I look forward to continuing to receive messages from subscribed topics. However, the result ends after receiving the message once. How do I keep receiving messages?
Reading through the code it looks at first sight all correct.
Does it still call the function which got defined under container.on('message', .... with each message that got received?
When I check the console.log, contactor.on ('message, ....) is called once and closeFunction is called.
editor-ui receiving the message once, after the message is not received. (MQTT Broker is continuously publishing messages…)
The code I wrote is as follows.
MqttTrigger.node.ts
import { ITriggerFunctions } from 'n8n-core';
import {
INodeType,
INodeTypeDescription,
ITriggerResponse,
} from 'n8n-workflow';
import * as mqtt from 'mqtt';
import {
IClientOptions,
} from 'mqtt';
export class MqttTrigger implements INodeType {
description: INodeTypeDescription = {
displayName: 'Mqtt Trigger',
name: 'mqttTrigger',
group: ['trigger'],
version: 1,
description: 'Listens to MQTT events',
defaults: {
name: 'Mqtt Trigger',
color: '#00FF00',
},
inputs: [],
outputs: ['main'],
credentials: [{
name: 'mqtt',
required: true,
}],
properties: [
// Node properties which the user gets displayed and
// can change on the node.
{
displayName: 'Topics',
name: 'topics',
type: 'string',
default: '',
description: `Topics to subscribe to, multiple can be defined with comma.<br/>
wildcard characters are supported (+ - for single level and # - for multi level)`,
},
]
};
async trigger(this: ITriggerFunctions): Promise<ITriggerResponse> {
const credentials = this.getCredentials('mqtt');
if (!credentials) {
throw new Error('Credentials are mandatory!');
}
const topics = (this.getNodeParameter('topics') as string).split(',');
const brokerUrl = credentials.brokerUrl;
const port = credentials.port as number;
const clientOptions: IClientOptions = {
port,
};
if (credentials.username && credentials.password) {
clientOptions.username = credentials.username as string;
clientOptions.password = credentials.password as string;
}
const container = mqtt.connect(brokerUrl, clientOptions);
const self = this;
container.on('connect', () => {
console.log(`MQTT Broker(${brokerUrl}) : Connected`);
container.subscribe(topics, (err, granted) => {
if (!err) {
console.log(`MQTT Broker(${brokerUrl}) : Subscription(Success) : ${topics}`);
} else {
console.log(`MQTT Broker(${brokerUrl}) : Subscription(Fail) : ${topics}`);
}
});
});
container.on('message', (topic: string, message: Buffer) => { // tslint:disable-line:no-any
self.emit([self.helpers.returnJsonArray([{ message: message.toString(), topic }])]);
console.log(`MQTT Broker(${brokerUrl}) : Message : ${message.toString()}`);
});
// The "closeFunction" function gets called by n8n whenever
// the workflow gets deactivated and can so clean up.
async function closeFunction() {
container.end();
}
// The "manualTriggerFunction" function gets called by n8n
// when a user is in the workflow editor and starts the
// workflow manually. So the function has to make sure that
// the emit() gets called with similar data like when it
// would trigger by itself so that the user knows what data
// to expect.
// async function manualTriggerFunction() {
// await new Promise(( resolve, reject ) => {
// container.on('connect', () => {
// container.subscribe(topics, (err, granted) => {
// if (!err) {
// container.on('message', (topic: string, message: Buffer) => { // tslint:disable-line:no-any
// self.emit([self.helpers.returnJsonArray([{ message: message.toString(), topic }])]);
// resolve(true);
// });
// } else {
// reject(err);
// }
// });
// });
// });
// }
return {
closeFunction,
// manualTriggerFunction,
};
}
}
Mqtt.credentials.ts
import {
ICredentialType,
NodePropertyTypes,
} from 'n8n-workflow';
export class Mqtt implements ICredentialType {
name = 'mqtt';
displayName = 'MQTT';
properties = [
// The credentials to get from user and save encrypted.
// Properties can be defined exactly in the same way
// as node properties.
{
displayName: 'Broker URL',
name: 'brokerUrl',
type: 'string' as NodePropertyTypes,
default: '',
},
{
displayName: 'Port',
name: 'port',
type: 'number' as NodePropertyTypes,
default: 1883,
},
{
displayName: 'Username',
name: 'username',
type: 'string' as NodePropertyTypes,
default: '',
},
{
displayName: 'Password',
name: 'password',
type: 'string' as NodePropertyTypes,
typeOptions: {
password: true,
},
default: '',
},
];
}
Ah yes, then everything is working 100% correctly.
The editor-ui is only meant to create/test workflows. So it stays active till one message got received and then disconnects. The data of that one message can then be used to create the workflow.
Once the workflow is ready and works correctly it can be activated and then the connection stays active as long n8n is running and the workflow is active.
Sadly have not the slightest idea why it would not work. All n8n does when a workflow gets activated, is to call the trigger function. That seems to work fine for all other nodes. So I would advice to add another console.log as first command and see if that one gets called (what I expect that it does).
Then going forward from there and see where it breaks exactly.