Kafka event emission with Buffer message fails

I don’t actually know how to edit the custom nodes, but I was able to get the following function to work based on your codebase:


const {
    CompressionTypes,
    Kafka: apacheKafka,
    KafkaConfig,
    SASLOptions,
    TopicMessages,
} = require(`kafkajs`);


/**
 * Thin wrapper around a kafkajs producer that publishes one message to a
 * fixed topic per call.
 *
 * The payload is handed to the producer untouched (it is never cast to a
 * string), so `send` can be given a Buffer as well as a string.
 */
class Kafka {

    /**
     * @param {string} topic - Topic every message is published to.
     * @param {{brokers?: string, clientId?: string, ssl?: *}} credentials -
     *     Connection settings. `brokers` is a comma-separated list of
     *     broker addresses; `clientId` and `ssl` are forwarded to the
     *     kafkajs client unchanged.
     */
    constructor(topic, credentials) {
        this.topic = topic;
        this.credentials = credentials;
    }

    /**
     * Connects a fresh producer, publishes `message` to the configured topic
     * and disconnects again, even when connecting or sending fails.
     *
     * @param {*} message - Message value, forwarded as-is (string or Buffer).
     * @returns {Promise<Array<{json: Array<Object>}>>} A single-item array
     *     whose `json` holds the producer response, or `[{success: true}]`
     *     when the producer returned an empty response.
     * @throws Whatever the producer rejects with while connecting, sending
     *     or disconnecting.
     */
    async send(message) {
        const { clientId, ssl } = this.credentials;

        // Tolerate whitespace around the commas and stray/trailing commas so
        // the client never receives blank or space-padded broker entries.
        const brokers = (this.credentials.brokers || '')
            .split(',')
            .map((broker) => broker.trim())
            .filter((broker) => broker !== '');

        const kafka = new apacheKafka({
            clientId,
            brokers,
            ssl,
        });

        const producer = kafka.producer();

        const topicMessages = [
            {
                topic: this.topic,
                messages: [{
                    value: message,
                    headers: {},
                }],
            },
        ];

        try {
            await producer.connect();

            const responseData = await producer.sendBatch({
                topicMessages,
                timeout: 30000,
                compression: CompressionTypes.None,
                // NOTE(review): acks 0 presumably means the broker does not
                // acknowledge the write, which would explain the empty
                // response handled below - confirm against the kafkajs docs.
                acks: 0,
            });

            if (responseData.length === 0) {
                responseData.push({
                    success: true,
                });
            }

            return [{json: responseData}];
        } finally {
            // Always release the broker connections; previously a failed
            // send left the producer connected.
            await producer.disconnect();
        }
    }
}

// Expose the Kafka wrapper class as a named CommonJS export.
exports.Kafka = Kafka

I think this is either a bug or a feature request, i.e. allow Buffer messages, not only strings (which also implies not casting the message to a string). I don’t actually have enough knowledge to understand all the edge cases and propose an MR.

1 Like