RabbitMQ trigger and limit executions

Hi!
We are giving n8n a try because it’s extremely user friendly.
But we have reached a point that could be a no-go for adopting it. Let me explain…

We have a message broker, for example RabbitMQ, receiving messages (hundreds or thousands) on a specific topic from multiple apps, in a standardized format. All is fine with this.
Now we add n8n to process these messages using the RabbitMQ Trigger and it is chaos: n8n simply dies because the server resources are exhausted immediately.

Our first attempt to solve this was to add a prefetch line in RabbitMQTrigger.node.js:

const startConsumer = async () => {
    await channel.prefetch(2);
    await channel.consume(queue, async (message) => {

OK, with this tweak n8n processes the messages and apparently does not die, but the execution list shows an insane number of executions running at the same time.
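
A possible explanation, offered as a sketch rather than a confirmed diagnosis: prefetch only caps the number of unacknowledged messages per consumer. If the trigger acknowledges each message as soon as it arrives (an assumption about the node), the broker sends the next one right away and executions still pile up. The snippet below illustrates the difference with a hypothetical in-memory FakeChannel (it is not amqplib and not n8n code); measurePeak and its parameters are made up for the illustration.

```javascript
// Hypothetical in-memory stand-in for an AMQP channel (NOT amqplib).
// Like a broker, it delivers a message only while unacked < prefetch limit.
class FakeChannel {
  constructor(messages) {
    this.pending = [...messages];
    this.unacked = 0;
    this.limit = Infinity;
    this.handler = null;
  }
  prefetch(count) { this.limit = count; }
  consume(_queue, handler) { this.handler = handler; this.deliver(); }
  ack(_message) { this.unacked -= 1; this.deliver(); }
  deliver() {
    while (this.handler && this.pending.length > 0 && this.unacked < this.limit) {
      this.unacked += 1;
      const message = this.pending.shift();
      setImmediate(() => this.handler(message)); // deliver asynchronously
    }
  }
}

const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

// Pushes `total` fake messages through the channel and returns the highest
// number of handlers that were running at the same time.
async function measurePeak({ total, prefetch, ackEarly }) {
  const channel = new FakeChannel(Array.from({ length: total }, (_, i) => i));
  channel.prefetch(prefetch);
  let running = 0;
  let peak = 0;
  let done = 0;
  await new Promise((resolve) => {
    channel.consume('demo', async (message) => {
      if (ackEarly) channel.ack(message); // broker may send the next one at once
      running += 1;
      peak = Math.max(peak, running);
      await sleep(20); // stands in for the workflow execution
      running -= 1;
      if (!ackEarly) channel.ack(message); // frees a prefetch slot only now
      done += 1;
      if (done === total) resolve();
    });
  });
  return peak;
}

(async () => {
  console.log('ack on arrival :', await measurePeak({ total: 10, prefetch: 2, ackEarly: true }));
  console.log('ack after work :', await measurePeak({ total: 10, prefetch: 2, ackEarly: false }));
})();
```

When the ack is deferred until the work is done, the peak cannot exceed the prefetch value, because every running handler holds one unacknowledged message.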

As for scaling n8n and using Redis, it is basically the same: we cannot limit how many workers run at the same time (I think).

For this scenario (thousands of messages), what would you say is the best approach to solve the problem?

Thanks :slight_smile:

Welcome to the community @powerPT !

Besides the prefetch, we should also be able to add a wait time before it gets the next batch. Both can then be optional parameters that can be set.
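
As a rough sketch of that idea (not the actual n8n implementation): both values are passed as optional settings, and the wait happens before the ack, so the broker does not hand over the next message until the pause is over. The startConsumer signature, the option names prefetch and waitTimeMs, and the stub channel are assumptions made for this example. Only prefetch, consume and ack mirror the channel calls already used in this thread. Error handling (for example rejecting a failed message) is left out.

```javascript
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

// Hypothetical consumer with two optional settings:
//   prefetch   - maximum number of unacknowledged messages in flight
//   waitTimeMs - pause after each message before it is acknowledged
async function startConsumer(channel, queue, processMessage, options = {}) {
  const { prefetch, waitTimeMs = 0 } = options;
  if (prefetch !== undefined) {
    await channel.prefetch(prefetch);
  }
  await channel.consume(queue, async (message) => {
    await processMessage(message);               // do the work first
    if (waitTimeMs > 0) await sleep(waitTimeMs); // then pause
    channel.ack(message);                        // ack last: frees a prefetch slot
  });
}

// Usage with a stub channel that only records what was called.
const calls = [];
const stubChannel = {
  prefetch: async (count) => { calls.push(['prefetch', count]); },
  consume: async (queue, handler) => {
    calls.push(['consume', queue]);
    await handler({ id: 1 }); // simulate one delivered message
  },
  ack: (message) => { calls.push(['ack', message.id]); },
};

startConsumer(
  stubChannel,
  'events',
  async (message) => { calls.push(['process', message.id]); },
  { prefetch: 2, waitTimeMs: 10 },
).then(() => console.log(calls));
```

Because the ack comes after the pause, the throughput per consumer is bounded by roughly prefetch messages per (processing time + wait time).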

Hey @powerPT,

How is it going? Did the solution provided by @jan work for you?

Hi there!

Well, no, my function below doesn’t work properly. My tweaks are in bold.
Oh god, a controllable consumer would solve many problems for a lot of people!

async trigger() {
    const sleep = (waitTimeInMs) => new Promise(resolve => setTimeout(resolve, waitTimeInMs));
    const queue = this.getNodeParameter('queue');
    const options = this.getNodeParameter('options', {});
    const channel = await GenericFunctions_1.rabbitmqConnectQueue.call(this, queue, options);
    const self = this;
    const startConsumer = async () => {
        await channel.prefetch(2);
        await channel.consume(queue, async (message) => {

                channel.ack(message);
            }
        });
    };
    await sleep(10000); // sleep for 10 seconds
    startConsumer();
    async function closeFunction()