How do I subscribe to MQTT Topic and continue to receive messages?

Hello! I have created a Trigger Node that subscribes to a specific topic of MQTT Broker and receives messages as shown below.

I look forward to continuing to receive messages from subscribed topics. However, the result ends after receiving the message once. How do I keep receiving messages?

Welcome to the community @whiter8t!

Reading through the code it looks at first sight all correct.
Does it still call the function which got defined under container.on('message', .... with each message that got received?

When I check the console.log, contactor.on ('message, ....) is called once and closeFunction is called.
editor-ui receiving the message once, after the message is not received. (MQTT Broker is continuously publishing messages…)

The code I wrote is as follows.

MqttTrigger.node.ts

import { ITriggerFunctions } from 'n8n-core';
import {
	INodeType,
	INodeTypeDescription,
	ITriggerResponse,
} from 'n8n-workflow';

import * as mqtt from 'mqtt';

import {
	IClientOptions,
} from 'mqtt';

export class MqttTrigger implements INodeType {
	description: INodeTypeDescription = {
		displayName: 'Mqtt Trigger',
		name: 'mqttTrigger',
		group: ['trigger'],
		version: 1,
		description: 'Listens to MQTT events',
		defaults: {
			name: 'Mqtt Trigger',
			color: '#00FF00',
		},
		inputs: [],
		outputs: ['main'],
		credentials: [{
			name: 'mqtt',
			required: true,
		}],
		properties: [
			// Node properties which the user gets displayed and
			// can change on the node.
			{
				displayName: 'Topics',
				name: 'topics',
				type: 'string',
				default: '',
				description: `Topics to subscribe to, multiple can be defined with comma.<br/>
				wildcard characters are supported (+ - for single level and # - for multi level)`,
			},
		]
	};


	async trigger(this: ITriggerFunctions): Promise<ITriggerResponse> {

		const credentials = this.getCredentials('mqtt');
		if (!credentials) {
			throw new Error('Credentials are mandatory!');
		}

		const topics = (this.getNodeParameter('topics') as string).split(',');

		const brokerUrl = credentials.brokerUrl;
		const port = credentials.port as number;

		const clientOptions: IClientOptions = {
			port,
		};

		if (credentials.username && credentials.password) {
			clientOptions.username = credentials.username as string;
			clientOptions.password = credentials.password as string;
		}

		const container = mqtt.connect(brokerUrl, clientOptions);

		const self = this;

		container.on('connect', () => {

			console.log(`MQTT Broker(${brokerUrl}) : Connected`);
			
			container.subscribe(topics, (err, granted) => {
				if (!err) {
					console.log(`MQTT Broker(${brokerUrl}) : Subscription(Success) : ${topics}`);
				} else {
					console.log(`MQTT Broker(${brokerUrl}) : Subscription(Fail) : ${topics}`);
				}
			});
		});
		
		container.on('message', (topic: string, message: Buffer) => { // tslint:disable-line:no-any
			self.emit([self.helpers.returnJsonArray([{ message: message.toString(), topic }])]);
			console.log(`MQTT Broker(${brokerUrl}) : Message : ${message.toString()}`);
		});


		// The "closeFunction" function gets called by n8n whenever
		// the workflow gets deactivated and can so clean up.
		async function closeFunction() {
			container.end();
		}

		// The "manualTriggerFunction" function gets called by n8n
		// when a user is in the workflow editor and starts the
		// workflow manually. So the function has to make sure that
		// the emit() gets called with similar data like when it
		// would trigger by itself so that the user knows what data
		// to expect.
		// async function manualTriggerFunction() {
		// 	await new Promise(( resolve, reject ) => {
		// 		container.on('connect', () => {
		// 			container.subscribe(topics, (err, granted) => {
		// 				if (!err) {
		// 					container.on('message', (topic: string, message: Buffer) => { // tslint:disable-line:no-any
		// 						self.emit([self.helpers.returnJsonArray([{ message: message.toString(), topic }])]);
		// 						resolve(true);
		// 					});
		// 				} else {
		// 					reject(err);
		// 				}
		// 			});
		// 		});
		// 	});
		// }

		return {
			closeFunction,
			// manualTriggerFunction,
		};

	}
}

Mqtt.credentials.ts

import {
	ICredentialType,
	NodePropertyTypes,
} from 'n8n-workflow';


export class Mqtt implements ICredentialType {
	name = 'mqtt';
	displayName = 'MQTT';
	properties = [
		// The credentials to get from user and save encrypted.
		// Properties can be defined exactly in the same way
		// as node properties.
		{
			displayName: 'Broker URL',
			name: 'brokerUrl',
			type: 'string' as NodePropertyTypes,
			default: '',
		},
		{
			displayName: 'Port',
			name: 'port',
			type: 'number' as NodePropertyTypes,
			default: 1883,
		},
		{
			displayName: 'Username',
			name: 'username',
			type: 'string' as NodePropertyTypes,
			default: '',
		},
		{
			displayName: 'Password',
			name: 'password',
			type: 'string' as NodePropertyTypes,
			typeOptions: {
				password: true,
			},
			default: '',
		},
	];
}

Ah yes, then everything is working 100% correctly.

The editor-ui is only meant to create/test workflows. So it stays active till one message got received and then disconnects. The data of that one message can then be used to create the workflow.

Once the workflow is ready and works correctly it can be activated and then the connection stays active as long n8n is running and the workflow is active.

Aha, I understood.

Click ‘Execute Workflow’ as shown below to confirm that each node operates normally. and I activated this workflow.

I expect MQTT data to continue posting to Slack channel via HTTP Request node. However, it is not posted to Slack channel after ‘Active’.

If I click ‘Executions’ on the sidebar of the menu, I think the workflow ordered by ‘Active’ above is not running.

Is there any reason why it doesn’t work?

Did you configure n8n globally or the workflow to save executions that did succeed/fail?

And I also see that you have the debug console.logs in the code. Do you seem them printing the data in the console as expected?

Console logs do not occur after Active.

The following is the console log when running ‘Execute Workflow’.

n8n: [Node] MQTT Broker(mqtt://101.0.0.200) : Connected
n8n: [Node] MQTT Broker(mqtt://101.0.0.200) : Subscription(Success) : /oneM2M/req/STemperatureHumiditySensor/Mobius2/json
n8n: [Node] MQTT Broker(mqtt://101.0.0.200) : Message : {"op":"1","to":"/Mobius/TemperatureHumiditySensor/temperaturehumidity","fr":"STemperatureHumiditySensor","rqi":"iYoDkn8XB","ty":"4","pc":{"m2m:cin":{"con":{"ct":1597825744503,"temperature":28,"humidity":16,"cpu_usage":8,"mem_total":2048,"mem_used":592,"mem_free":1456,"uptime":1595223342000,"uptime_second":1318909,"disk_size":14987608,"disk_used":7419232,"disk_available":7568376,"receive":null,"transmit":null,"interface":"disable","ssid":"disable","quality":null}}}}

The following message is posted on the Slack channel via HTTP Request Node.

It has been confirmed that it is operating normally in ‘Execute Workflow’. However, I unable to determine post-active behavior.

(I am sorry for my short English. and thank you for your help!)

Sadly have not the slightest idea why it would not work. All n8n does when a workflow gets activated, is to call the trigger function. That seems to work fine for all other nodes. So I would advice to add another console.log as first command and see if that one gets called (what I expect that it does).
Then going forward from there and see where it breaks exactly.

thank you for your answer.

I will environment EXECUTIONS_DATA_SAVE_ON_ERROR and EXECUTIONS_DATA_SAVE_ON_SUCCESS set all and check by adding console.log.

1 Like

Initializing and resetting the database will work properly! :slight_smile: Thank you for taking the time to help.

Ah great to hear! Have fun!

Hi @whiter8t, do you plan to send in a PR for the MQTT nodes? Would love to use these in some IoT workflows :slight_smile:

2 Likes

I’ll try PR. have a nice day! :slight_smile:

1 Like

I sent the PR! :slight_smile:

https://github.com/n8n-io/n8n/pull/862

1 Like

Wow, so cool! Thanks a lot @whiter8t :raised_hands: