RabbitMQ trigger and limit executions

Hi!
We are giving a try with n8n because it’s extremely user friendly.
But we are at a point which it could be a no-go for this adoption. I explain…

We have a message broker, for example RabbitMQ, receiving messages (hundreds or thousands) in a specific topic from multiple apps, in a standardized format. All’s fine with this.
Now we add n8n to process these messages using RabbitMQ Trigger and it is a caos, the n8n simply dies because the server resources are immediately reached.

First attempt to solve this was add a line in RabbitMQTrigger.node.js:

const startConsumer = async () => {
await channel.prefetch(2);
await channel.consume(queue, async (message) => {

Ok, with this tweak the n8n processes the things and apparently does not die but if we see the execution list it is insane the processes at the same time.

About scaling the n8n and using redis, basically it is the same, we cannot limit workers at the same time (I thing).

For you, using this scenario (thousands of messages), what is the best approach to solve the problem?

Thanks :slight_smile:

Welcome to the community @powerPT !

Next to the prefetch can should also be able to add a wait-time between in gets the next batch. Both can then be optional parameters that can be set.

Hey @powerPT,

How is it going? Did the solution provided by @jan work for you?

Hi there!

Well no, my function below doesn’t works properly. You have my tweaks in bold.
Oh god, a controllable consumer will give to a lot of people the solutions for many problems!

async trigger() {
const sleep = (waitTimeInMs) => new Promise(resolve => setTimeout(resolve, waitTimeInMs));
const queue = this.getNodeParameter(‘queue’);
const options = this.getNodeParameter(‘options’, {});
const channel = await GenericFunctions_1.rabbitmqConnectQueue.call(this, queue, options);
const self = this;
const startConsumer = async () => {
await channel.prefetch(2);
await channel.consume(queue, async (message) => {

channel.ack(message);
}
});
};
await sleep(10000); // sleep for 10 seconds
startConsumer();
async function closeFunction()

Hi @powerPT

I had exactly the same complain as you.
The RabbitMQ trigger will try to initialize the same amount of processes as the amount of message in the queue, because of unlimited prefetch.
It will consume quite lots of memory, more than 10Gb in my case.
In the end, the container will be killed because of out of memory.

But I find kind of workaround later on that n8n could be implemented with execution mode = main.
Under this mode, all executions will be done using the main process.
It will explicitly lower down the resource usage(< 300Mb) and increase the performance(around 100 messages per second).
It really solve the pain point for me.

Maybe you could give a try on that.

More detail about the execution-modes and Pros/Cons you might be interested: