I don’t actually know how to edit the custom nodes but I could get the following function to work based on your codebase:
const {
CompressionTypes,
Kafka: apacheKafka,
KafkaConfig,
SASLOptions,
TopicMessages,
} = require(`kafkajs`);
class Kafka {
constructor(topic, credentials) {
this.topic = topic;
this.credentials = credentials
}
async send(message) {
const credentials = this.credentials;
const brokers = (credentials.brokers || '').split(',');
const clientId = credentials.clientId;
const ssl = credentials.ssl
const config = {
clientId,
brokers,
ssl,
};
const kafka = new apacheKafka(config);
const producer = kafka.producer();
await producer.connect();
let compression = CompressionTypes.None;
const topic = this.topic
let headers = {};
const topicMessages = []
topicMessages.push(
{
topic,
messages: [{
value: message,
headers,
}],
});
const timeout = 30000
const acks = 0
const responseData = await producer.sendBatch(
{
topicMessages,
timeout,
compression,
acks,
},
);
if (responseData.length === 0) {
responseData.push({
success: true,
});
}
await producer.disconnect();
return [{json: responseData}];
}
}
exports.Kafka = Kafka
I think this is either a bug or a feature request, i.e. allow for Buffer messages, not only string (which also implies not casting it to string). I don’t actually have enough knowledge to understand all the edge cases and make a proposal MR.