Help required using Loop handling large volume pgSQL query

Background: I have a large volume of records (up to millions) to query and update in a table (PostgreSQL). A single batch query always runs n8n out of memory, so I loop over items to split the work into separate batches of 10K or 5K each: count the total # of records, split them into batches, and use OFFSET + LIMIT in the SQL to bound the record range for each batch run.

Issue facing: the above loop runs well for the first ~50 batches, but becomes slow — even hanging for hours — once the batch # goes over 80. My guess is that OFFSET + LIMIT pagination past ~1M rows slows things down, since each batch must scan and discard all preceding rows, even in a batched run.
Question: Any idea to resolve it? Thanks in advance!

Json code

{
“name”: “TestLoop”,
“nodes”: [
{
“parameters”: {
"jsCode": "return [];"
},
“type”: “n8n-nodes-base.code”,
“typeVersion”: 2,
“position”: [
640,
64
],
“id”: “277db845-3cfe-41e5-ba93-a052bb8591e8”,
“name”: “Cleandata4”,
“alwaysOutputData”: true
},
{
“parameters”: {
“operation”: “executeQuery”,
"query": "SELECT l.* -- 直接查询,无需去重\nFROM logs l\nWHERE NOT EXISTS (\n SELECT 1\n FROM acl a\n WHERE\n l.sourceip <<= ANY(a.sourceip)\n AND l.targetip <<= ANY(a.destination)\n AND (\n '{any}' = a.port \n OR l.targetport::text = ANY(a.port) \n OR '{*}' = a.port \n )\n AND l.site = a.site\n)\nAND l.\"action\" IN ('允许', 'Allow')\nAND l.site = 'LY DT'\nAND l.\"Time\" >= CURRENT_TIMESTAMP - INTERVAL '7 days'\nLIMIT 10000 -- 每批行数\nOFFSET {{ $json.currentOffset }}; -- 偏移量(动态生成)",
“options”: {}
},
“type”: “n8n-nodes-base.postgres”,
“typeVersion”: 2.6,
“position”: [
-144,
-64
],
“id”: “547c6031-3dac-4747-b7e3-29a0b832ae4f”,
“name”: “ProcessSQL”,
“executeOnce”: false,
“alwaysOutputData”: true,
“credentials”: {
“postgres”: {
“id”: “6mEmJBqv1phLspWq”,
“name”: “Postgres account”
}
},
“onError”: “continueErrorOutput”
},
{
“parameters”: {
“rule”: {
“interval”: [
{
“field”: “weeks”,
“triggerAtDay”: [
6,
0
],
“triggerAtHour”: 12
},
{
“field”: “weeks”,
“triggerAtDay”: [
0,
6
],
“triggerAtHour”: 21
}
]
}
},
“type”: “n8n-nodes-base.scheduleTrigger”,
“typeVersion”: 1.2,
“position”: [
-1088,
112
],
“id”: “d71fb3ed-eaf2-4af0-bfc8-d8ba6b25005e”,
“name”: “Schedule Trigger4”
},
{
“parameters”: {
“operation”: “upsert”,
“schema”: {
“__rl”: true,
“mode”: “list”,
“value”: “public”
},
“table”: {
“__rl”: true,
“value”: “lognotmatch”,
“mode”: “list”,
“cachedResultName”: “lognotmatch”
},
“columns”: {
“mappingMode”: “defineBelow”,
“value”: {
“sourceport”: “={{ $json.sourceport }}”,
“targetport”: “={{ $json.targetport }}”,
“Time”: “={{ $json.Time }}”,
“securitylevel”: “={{ $json.securitylevel }}”,
“sourceip”: “={{ $json.sourceip }}”,
“targetip”: “={{ $json.targetip }}”,
“targetcountry”: “={{ $json.targetcountry }}”,
“protocol”: “={{ $json.protocol }}”,
“action”: “={{ $json.action }}”,
“securitypolicy”: “={{ $json.securitypolicy }}”,
“site”: “={{ $json.site }}”
},
“matchingColumns”: [
“Time”,
“sourceip”,
“targetip”,
“sourceport”,
“targetport”,
“site”
],
“schema”: [
{
“id”: “Time”,
“displayName”: “Time”,
“required”: false,
“defaultMatch”: false,
“display”: true,
“type”: “dateTime”,
“canBeUsedToMatch”: true,
“removed”: false
},
{
“id”: “securitylevel”,
“displayName”: “securitylevel”,
“required”: false,
“defaultMatch”: false,
“display”: true,
“type”: “string”,
“canBeUsedToMatch”: false
},
{
“id”: “sourceip”,
“displayName”: “sourceip”,
“required”: false,
“defaultMatch”: false,
“display”: true,
“type”: “string”,
“canBeUsedToMatch”: true,
“removed”: false
},
{
“id”: “targetip”,
“displayName”: “targetip”,
“required”: false,
“defaultMatch”: false,
“display”: true,
“type”: “string”,
“canBeUsedToMatch”: true,
“removed”: false
},
{
“id”: “targetcountry”,
“displayName”: “targetcountry”,
“required”: false,
“defaultMatch”: false,
“display”: true,
“type”: “string”,
“canBeUsedToMatch”: false
},
{
“id”: “sourceport”,
“displayName”: “sourceport”,
“required”: false,
“defaultMatch”: false,
“display”: true,
“type”: “number”,
“canBeUsedToMatch”: true,
“removed”: false
},
{
“id”: “targetport”,
“displayName”: “targetport”,
“required”: false,
“defaultMatch”: false,
“display”: true,
“type”: “number”,
“canBeUsedToMatch”: true,
“removed”: false
},
{
“id”: “protocol”,
“displayName”: “protocol”,
“required”: false,
“defaultMatch”: false,
“display”: true,
“type”: “string”,
“canBeUsedToMatch”: false
},
{
“id”: “action”,
“displayName”: “action”,
“required”: false,
“defaultMatch”: false,
“display”: true,
“type”: “string”,
“canBeUsedToMatch”: false
},
{
“id”: “securitypolicy”,
“displayName”: “securitypolicy”,
“required”: false,
“defaultMatch”: false,
“display”: true,
“type”: “string”,
“canBeUsedToMatch”: false
},
{
“id”: “site”,
“displayName”: “site”,
“required”: false,
“defaultMatch”: false,
“display”: true,
“type”: “string”,
“canBeUsedToMatch”: true,
“removed”: false
}
],
“attemptToConvertTypes”: false,
“convertFieldsToString”: false
},
“options”: {}
},
“type”: “n8n-nodes-base.postgres”,
“typeVersion”: 2.6,
“position”: [
192,
-64
],
“id”: “ee67c79e-5465-452a-a63f-b9a502fc81c2”,
“name”: “Insert/Update2”,
“alwaysOutputData”: true,
“credentials”: {
“postgres”: {
“id”: “6mEmJBqv1phLspWq”,
“name”: “Postgres account”
}
},
“onError”: “continueErrorOutput”
},
{
“parameters”: {
“operation”: “executeQuery”,
"query": "SELECT COUNT(*) AS total \nFROM logs l\nWHERE NOT EXISTS (\n SELECT 1\n FROM acl a\n WHERE\n l.sourceip <<= ANY(a.sourceip)\n AND l.targetip <<= ANY(a.destination)\n AND (\n '{any}' = a.port \n OR l.targetport::text = ANY(a.port) \n OR '{*}' = a.port \n )\n AND l.site = a.site\n)\nAND l.\"action\" IN ('允许', 'Allow')\nAND l.site = 'LT'\nAND l.\"Time\" >= CURRENT_TIMESTAMP - INTERVAL '7 days';",
“options”: {}
},
“type”: “n8n-nodes-base.postgres”,
“typeVersion”: 2.6,
“position”: [
-928,
112
],
“id”: “80446259-6da1-4ff4-83f2-2da8a5a7e288”,
“name”: “Execute a SQL query3”,
“credentials”: {
“postgres”: {
“id”: “6mEmJBqv1phLspWq”,
“name”: “Postgres account”
}
}
},
{
“parameters”: {
“options”: {
“reset”: false
}
},
“type”: “n8n-nodes-base.splitInBatches”,
“typeVersion”: 3,
“position”: [
-448,
-80
],
“id”: “967c8bba-6060-4936-b229-dab422faad0d”,
“name”: “Loop Over Items3”,
“alwaysOutputData”: true,
“onError”: “continueErrorOutput”
},
{
“parameters”: {
"jsCode": "// 假设总条数从count节点获取(替换为你的count节点名称)\nconst total = $input.first().json.total;\nconst batchSize = 10000; // 固定每批10000条\nconst offsets = [];\n\n// 计算需要多少批,生成offset数组(0, 10000, 20000, ...)\nfor (let offset = 0; offset < total; offset += batchSize) {\n offsets.push(offset);\n}\n\n// 输出数组,作为Loop Over Items的输入\nreturn offsets.map(offset => ({ json: { currentOffset: offset } }));"
},
“type”: “n8n-nodes-base.code”,
“typeVersion”: 2,
“position”: [
-752,
96
],
“id”: “5d55a48e-102d-4570-ab1e-10b32f378990”,
“name”: “Code in JavaScript”
}
],
“pinData”: {},
“connections”: {
“Cleandata4”: {
“main”: [
[
{
“node”: “Loop Over Items3”,
“type”: “main”,
“index”: 0
}
]
]
},
“ProcessSQL”: {
“main”: [
[
{
“node”: “Insert/Update2”,
“type”: “main”,
“index”: 0
}
]
]
},
“Schedule Trigger4”: {
“main”: [
[
{
“node”: “Execute a SQL query3”,
“type”: “main”,
“index”: 0
}
]
]
},
“Insert/Update2”: {
“main”: [
[
{
“node”: “Cleandata4”,
“type”: “main”,
“index”: 0
}
],
[]
]
},
“Execute a SQL query3”: {
“main”: [
[
{
“node”: “Code in JavaScript”,
“type”: “main”,
“index”: 0
}
]
]
},
“Loop Over Items3”: {
“main”: [
[],
[
{
“node”: “ProcessSQL”,
“type”: “main”,
“index”: 0
}
]
]
},
“Code in JavaScript”: {
“main”: [
[
{
“node”: “Loop Over Items3”,
“type”: “main”,
“index”: 0
}
]
]
}
},
“active”: false,
“settings”: {
“executionOrder”: “v1”,
“callerPolicy”: “workflowsFromSameOwner”,
“availableInMCP”: false,
“timeSavedPerExecution”: 0
},
“versionId”: “bb5e8adc-7dc6-4ea7-9466-f75c88505123”,
“meta”: {
“templateCredsSetupCompleted”: true,
“instanceId”: “ceca07d9d659b7504c68cc1b8228bf14f5c82355fb43847435c42909379c2197”
},
“id”: “SbwgHzrC7N2w1I30”,
"tags": []
}

Here you can find some info, but I would also suggest using sub-workflows, since an execution releases its memory once it completes.

  • Currently, all data from each loop iteration stays in memory, eventually causing out-of-memory issues

  • The sub-workflow only holds data for the current batch in memory, then frees it