How to split tasks across multiple workers

Hello.
I have a simple workflow: take data from the database and push rows to customer.io.
The only thing I'm interested in from the response is whether the request returns 200 (success). I'd love to add something like this:

  • run the queries with multiple workers. Right now the requests run sequentially: each request waits for its response before the next one starts, and with my amount of data it takes a few hours to run. It would be great to speed this up, for example by adding threading. How can I do this?
  • if any of the responses fails, pass the request params to a next node which will save them to a file

I’ve tried Split In Batches with, for example, a size of 10 and duplicated the customer.io node, but each node receives the same data. I’d like to send different packs of records to different nodes. Is this possible?

Hey @Kacper!

In the node, go to the Settings tab and toggle Continue On Fail to true. This will let the workflow keep executing after a failure. You can then check with an IF node whether the previous node had any failed items and design your workflow accordingly.
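As a sketch of that failure check, here is what the routing could look like in a Function node. This assumes that items which failed with Continue On Fail enabled carry an `error` property on their JSON payload; `splitByFailure` and the sample items are illustrative, not part of the workflow above:

```javascript
// Function node sketch: separate failed items from successful ones.
// Assumption: with Continue On Fail enabled, a failed item exposes an
// `error` property on its JSON payload.
function splitByFailure(items) {
  const failed = items.filter((item) => item.json.error !== undefined);
  const succeeded = items.filter((item) => item.json.error === undefined);
  return { failed, succeeded };
}

// Example input: two successful items and one failed one.
const items = [
  { json: { id: 1 } },
  { json: { id: 2, error: 'Request failed with status 500' } },
  { json: { id: 3 } },
];

const { failed, succeeded } = splitByFailure(items);
// `failed` (still holding the original request params) can be routed to
// a node that writes them to a file; `succeeded` continues as normal.
```

An IF node checking whether `{{$json["error"]}}` is set achieves the same split without custom code.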

Can you also share your workflow (select the nodes and copy them and paste it here)? This would help me understand your issues better and help me replicate it. :slightly_smiling_face:

Thanks @harshil1712

{
  "nodes": [
    {
      "parameters": {},
      "name": "Start",
      "type": "n8n-nodes-base.start",
      "typeVersion": 1,
      "position": [
        200,
        300
      ]
    },
    {
      "parameters": {
        "triggerTimes": {
          "item": [
            {
              "mode": "everyX",
              "value": 4
            }
          ]
        }
      },
      "name": "Cron",
      "type": "n8n-nodes-base.cron",
      "typeVersion": 1,
      "position": [
        200,
        500
      ]
    },
    {
      "parameters": {
        "functionCode": "const sqlQuery = items[0].json.query;\nconst {BigQuery} = require('@google-cloud/bigquery');\nconst bigquery = new BigQuery();\nconst options = {\n  query: sqlQuery,\n  location: \"europe-west2\",\n};\nconst [rows] = await bigquery.query(options);\nreturn rows.map((row) => {\n  return {\"json\": row}\n});\n"
      },
      "name": "BigQuery Request",
      "type": "n8n-nodes-base.function",
      "typeVersion": 1,
      "position": [
        600,
        400
      ]
    },
    {
      "parameters": {
        "id": "={{$json[\"employee_id\"]}}",
        "jsonParameters": true,
        "additionalFieldsJson": "={\"email\": \"{{$json[\"employee_details_work_email\"]}}\",\n\"id\": \"{{$json[\"customer_id\"]}}\"}"
      },
      "name": "CustomerIO",
      "type": "n8n-nodes-base.customerIo",
      "typeVersion": 1,
      "position": [
        800,
        400
      ],
      "retryOnFail": true,
      "maxTries": 5,
      "credentials": {
        "customerIoApi": "CustomerIO"
      },
      "continueOnFail": true
    },
    {
      "parameters": {
        "values": {
          "string": [
            {
              "name": "query",
              "value": "SELECT * FROM `table`"
            }
          ]
        },
        "options": {}
      },
      "name": "SQL Query",
      "type": "n8n-nodes-base.set",
      "typeVersion": 1,
      "position": [
        400,
        400
      ]
    },
    {
      "parameters": {
        "channel": "#customer-io",
        "attachments": [
          {
            "fields": {
              "item": [
                {
                  "title": "Failed node",
                  "value": "={{$json[\"execution\"][\"lastNodeExecuted\"]}}",
                  "short": true
                },
                {
                  "title": "URL",
                  "value": "={{$json[\"execution\"][\"url\"]}}",
                  "short": true
                },
                {
                  "title": "Mode",
                  "value": "={{$json[\"execution\"][\"mode\"]}}",
                  "short": true
                }
              ]
            },
            "color": "#ff0000",
            "title": "=Workflow {{$json[\"workflow\"][\"name\"]}} (#{{$json[\"workflow\"][\"id\"]}}) :thumbsdown:",
            "author_link": "={{$json[\"execution\"][\"url\"]}}",
            "footer": "Inform: @here"
          }
        ],
        "otherOptions": {
          "mrkdwn": true,
          "link_names": true
        },
        "blocksUi": {
          "blocksValues": []
        }
      },
      "name": "Slack",
      "type": "n8n-nodes-base.slack",
      "typeVersion": 1,
      "position": [
        480,
        60
      ],
      "credentials": {
        "slackApi": "Slack Token"
      }
    },
    {
      "parameters": {},
      "name": "Error Trigger",
      "type": "n8n-nodes-base.errorTrigger",
      "typeVersion": 1,
      "position": [
        280,
        60
      ]
    }
  ],
  "connections": {
    "Start": {
      "main": [
        [
          {
            "node": "SQL Query",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "Cron": {
      "main": [
        [
          {
            "node": "SQL Query",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "BigQuery Request": {
      "main": [
        [
          {
            "node": "CustomerIO",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "SQL Query": {
      "main": [
        [
          {
            "node": "BigQuery Request",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "Error Trigger": {
      "main": [
        [
          {
            "node": "Slack",
            "type": "main",
            "index": 0
          }
        ]
      ]
    }
  }
}

This is my workflow. The problem is that I have only one CustomerIO node updating the data. I’d love to split the rows across multiple CustomerIO workers and push a different batch to each of them.

@Kacper thanks for sharing the workflow. I now have a much better understanding of your question. Nodes in n8n can’t execute in parallel; only after one node has finished its execution is the next one executed. Hence, splitting the data and sending it to different CustomerIO nodes will not help.
One solution off the top of my head is to create multiple such workflows and query a limited slice of the data in each of them. This is not an ideal solution, though.
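Another workaround, since a Function node can run async code, is to fire the requests concurrently from a single node instead of relying on multiple nodes. A minimal sketch of bounded-concurrency batching with `Promise.all` is below; `pushToCustomerIo` is a placeholder for the real HTTP call (the actual customer.io request and its auth are not shown here and would need to be filled in):

```javascript
// Placeholder for the real API call; swap in an actual HTTP request
// to customer.io here. It just pretends the API accepted the row.
async function pushToCustomerIo(row) {
  return { id: row.id, status: 200 };
}

// Split an array of rows into consecutive chunks of `size`.
function chunk(rows, size) {
  const chunks = [];
  for (let i = 0; i < rows.length; i += size) {
    chunks.push(rows.slice(i, i + size));
  }
  return chunks;
}

// Push all rows with at most `concurrency` requests in flight:
// requests within a chunk run concurrently, chunks run one after another.
async function pushAll(rows, concurrency) {
  const results = [];
  for (const group of chunk(rows, concurrency)) {
    const settled = await Promise.all(group.map(pushToCustomerIo));
    results.push(...settled);
  }
  return results;
}
```

With the rows coming from the BigQuery node, `await pushAll(rows, 10)` would keep ten requests in flight at a time instead of one. Note that `Promise.all` rejects on the first failure, so to collect failed request params for the file-save branch you would likely want `Promise.allSettled` instead.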

How about creating a new workflow which splits the data into 10 groups and then fires an Execute Workflow node (running the customer.io workflow) for each group?

OK, this doesn’t work :confused:

If you want to take this route, you can programmatically set the value of the workflow ID. Assuming the workflows are saved consecutively, they will have incremental IDs, for example 1, 2, 3, and so on.
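Splitting the rows and computing the target workflow ID could then be done in one Function node. A sketch under the incremental-ID assumption above; `assignGroups` and `baseWorkflowId` are illustrative names, with `baseWorkflowId` being the assumed ID of the first sub-workflow:

```javascript
// Split rows round-robin into `groupCount` groups and tag each group
// with the ID of the sub-workflow that should process it.
// Assumption: sub-workflows were saved consecutively, so their IDs are
// baseWorkflowId, baseWorkflowId + 1, and so on.
function assignGroups(rows, groupCount, baseWorkflowId) {
  const groups = Array.from({ length: groupCount }, (_, i) => ({
    workflowId: baseWorkflowId + i,
    rows: [],
  }));
  rows.forEach((row, i) => {
    groups[i % groupCount].rows.push(row);
  });
  return groups;
}
```

Each resulting item could then drive an Execute Workflow node whose Workflow ID parameter is set from an expression like `{{$json["workflowId"]}}`.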

I’ll try to contact customer.io about adding bulk actions to their API; that would solve my problem.
