Kafka event emission with Buffer message fails

Issue

When I try to emit a Kafka event with the Kafka node in n8n, I get the following error when giving a Buffer object as the message:

ERROR: The "string" argument must be of type string or an instance of Buffer or ArrayBuffer. Received an instance of Object

Workflow

myProto = MyProtoClass.create({prop1: '1', prop2: '2'})
o = Buffer.from(myProto, encoding='utf8')

Emission works if I apply o.toString() but then event processing fails with an IndexError (which I got locally when I tried to save with toString() but resolved if saving the Buffer object to a binary file directly).

Any suggestions as to what I should change would be most appreciated!

n8n setup

  • n8n version: 0.164.1
  • Running n8n with the execution process [own(default), main]: own
  • Running n8n via [Docker, npm, n8n.cloud, desktop app]: Docker

Hey @Delphine_Pessoa,

The error says you are passing an object and it is expecting a string, you would need to make your object a string or a buffer instance like the typescript / JavaScript error message is suggesting.

Hi @Jon ,

Thank you for getting back to me on this =)

As mentioned in my post, I am passing a Buffer object.

In a Function node, I have something like

myProto = MyProtoClass.create({prop1: '1', prop2: '2'})
o = Buffer.from(myProto, encoding='utf8')
return [{json: {message: o}}]

And in the Kafka node, the message is retrieved from the input JSON with an expression:

{{$json["message"]}}

When I do

console.log(o)

I get

{type: 'Buffer', data: Array(485)}
data: (485) [10, 24,  …]
type: "Buffer"
[[Prototype]]: Object

Hey @Delphine_Pessoa,

I may be missing more of this but the error is still clear, what you are passing is not being seen as the correct data type so while it looks like it should be a buffer it is actually an object when you use it.

I don’t have your protoclass so testing isn’t possible but at the moment I don’t think this is an error with n8n.

@Jon Do you think you could point me to which kafka library and function is used to produce the event, so maybe I can check how the “message” argument is being used?

EDIT: I have found the codebase for this: n8n/Kafka.node.ts at 69d6b7827f434665696bdf8a8183280874ed2b99 · n8n-io/n8n · GitHub

Adding some elements, from this documentation, Buffer.isBuffer() is the way to test if an object is a Buffer in Nodejs.

For my object we have:

Buffer.isBuffer(o) // True
typeof o // object
typeof (new Buffer(3)) // object

The latter showcases the issue, because in the Kafka node definition there is

let message: string | Buffer;

Do you have any example flow that uses Buffers in Kafka nodes?

Hey @Delphine_Pessoa,

Sadly I don’t have any example workflows that use a buffer with Kafka but as a test maybe you could try an http request node and have it download some binary data from somewhere and use that as the input for Kafka and see if it works.

1 Like

Either way, it wouldn’t matter because you cast it to string a bit below

message = this.getNodeParameter('message', i) as string;

As far as I can see from kafkajs documentation, there is no requirement for the value to be a string, it can be an object.

class InstrumentationEvent {
  /**
   * @param {String} type
   * @param {Object} payload
   */
  constructor(type, payload) {
    this.id = nextId()
    this.type = type
    this.timestamp = Date.now()
    this.payload = payload
  }
}

I’m going to try to modify the Kafka node to allow for Buffer objects.

I don’t actually know how to edit the custom nodes but I could get the following function to work based on your codebase:


const {
    CompressionTypes,
    Kafka: apacheKafka,
    KafkaConfig,
    SASLOptions,
    TopicMessages,
} = require(`kafkajs`);


class Kafka {

    constructor(topic, credentials) {
        this.topic = topic;
        this.credentials = credentials
    }

    async send(message) {

        const credentials = this.credentials;
        const brokers = (credentials.brokers || '').split(',');
        const clientId = credentials.clientId;
        const ssl = credentials.ssl

        const config = {
            clientId,
            brokers,
            ssl,
        };

        const kafka = new apacheKafka(config);

        const producer = kafka.producer();

        await producer.connect();

        let compression = CompressionTypes.None;

        const topic = this.topic

        let headers = {};

        const topicMessages = []

        topicMessages.push(
            {
                topic,
                messages: [{
                    value: message,
                    headers,
                }],
            });

        const timeout = 30000
        const acks = 0

        const responseData = await producer.sendBatch(
            {
                topicMessages,
                timeout,
                compression,
                acks,
            },
        );

        if (responseData.length === 0) {
            responseData.push({
                success: true,
            });
        }

        await producer.disconnect();

        return [{json: responseData}];
    }
}

exports.Kafka = Kafka

I think this is either a bug or a feature request, i.e. allow for Buffer messages, not only string (which also implies not casting it to string). I don’t actually have enough knowledge to understand all the edge cases and make a proposal MR.

1 Like