Background: I need to run SQL queries against, and update, PostgreSQL tables that hold a large volume of records (up to millions). A single query over the whole set always makes n8n run out of memory, so I use Loop Over Items to split the work into batches of 5K or 10K records: I count the total number of records, then use OFFSET + LIMIT in the SQL so that each batch run processes its own range of records.
Issue: The loop runs well for roughly the first 50 batches, but it becomes slow, and can even hang for hours, once the batch number goes above 80. My guess is that OFFSET + LIMIT paging at offsets around 1M rows slows down the workflow even though it runs in batches.
Question: Any ideas on how to resolve this? Thanks in advance!
JSON code:
{
“name”: “TestLoop”,
“nodes”: [
{
“parameters”: {
"jsCode": "return [];"
},
“type”: “n8n-nodes-base.code”,
“typeVersion”: 2,
“position”: [
640,
64
],
“id”: “277db845-3cfe-41e5-ba93-a052bb8591e8”,
“name”: “Cleandata4”,
“alwaysOutputData”: true
},
{
“parameters”: {
“operation”: “executeQuery”,
"query": "SELECT l.* -- 直接查询,无需去重\nFROM logs l\nWHERE NOT EXISTS (\n SELECT 1\n FROM acl a\n WHERE\n l.sourceip <<= ANY(a.sourceip)\n AND l.targetip <<= ANY(a.destination)\n AND (\n '{any}' = a.port \n OR l.targetport::text = ANY(a.port) \n OR '{*}' = a.port \n )\n AND l.site = a.site\n)\nAND l.\"action\" IN ('允许', 'Allow')\nAND l.site = 'LY DT'\nAND l.\"Time\" >= CURRENT_TIMESTAMP - INTERVAL '7 days'\nLIMIT 10000 -- 每批行数\nOFFSET {{ $json.currentOffset }}; -- 偏移量(动态生成)",
“options”: {}
},
“type”: “n8n-nodes-base.postgres”,
“typeVersion”: 2.6,
“position”: [
-144,
-64
],
“id”: “547c6031-3dac-4747-b7e3-29a0b832ae4f”,
“name”: “ProcessSQL”,
“executeOnce”: false,
“alwaysOutputData”: true,
“credentials”: {
“postgres”: {
“id”: “6mEmJBqv1phLspWq”,
“name”: “Postgres account”
}
},
“onError”: “continueErrorOutput”
},
{
“parameters”: {
“rule”: {
“interval”: [
{
“field”: “weeks”,
“triggerAtDay”: [
6,
0
],
“triggerAtHour”: 12
},
{
“field”: “weeks”,
“triggerAtDay”: [
0,
6
],
“triggerAtHour”: 21
}
]
}
},
“type”: “n8n-nodes-base.scheduleTrigger”,
“typeVersion”: 1.2,
“position”: [
-1088,
112
],
“id”: “d71fb3ed-eaf2-4af0-bfc8-d8ba6b25005e”,
“name”: “Schedule Trigger4”
},
{
“parameters”: {
“operation”: “upsert”,
“schema”: {
“__rl”: true,
“mode”: “list”,
“value”: “public”
},
“table”: {
“__rl”: true,
“value”: “lognotmatch”,
“mode”: “list”,
“cachedResultName”: “lognotmatch”
},
“columns”: {
“mappingMode”: “defineBelow”,
“value”: {
“sourceport”: “={{ $json.sourceport }}”,
“targetport”: “={{ $json.targetport }}”,
“Time”: “={{ $json.Time }}”,
“securitylevel”: “={{ $json.securitylevel }}”,
“sourceip”: “={{ $json.sourceip }}”,
“targetip”: “={{ $json.targetip }}”,
“targetcountry”: “={{ $json.targetcountry }}”,
“protocol”: “={{ $json.protocol }}”,
“action”: “={{ $json.action }}”,
“securitypolicy”: “={{ $json.securitypolicy }}”,
“site”: “={{ $json.site }}”
},
“matchingColumns”: [
“Time”,
“sourceip”,
“targetip”,
“sourceport”,
“targetport”,
“site”
],
“schema”: [
{
“id”: “Time”,
“displayName”: “Time”,
“required”: false,
“defaultMatch”: false,
“display”: true,
“type”: “dateTime”,
“canBeUsedToMatch”: true,
“removed”: false
},
{
“id”: “securitylevel”,
“displayName”: “securitylevel”,
“required”: false,
“defaultMatch”: false,
“display”: true,
“type”: “string”,
“canBeUsedToMatch”: false
},
{
“id”: “sourceip”,
“displayName”: “sourceip”,
“required”: false,
“defaultMatch”: false,
“display”: true,
“type”: “string”,
“canBeUsedToMatch”: true,
“removed”: false
},
{
“id”: “targetip”,
“displayName”: “targetip”,
“required”: false,
“defaultMatch”: false,
“display”: true,
“type”: “string”,
“canBeUsedToMatch”: true,
“removed”: false
},
{
“id”: “targetcountry”,
“displayName”: “targetcountry”,
“required”: false,
“defaultMatch”: false,
“display”: true,
“type”: “string”,
“canBeUsedToMatch”: false
},
{
“id”: “sourceport”,
“displayName”: “sourceport”,
“required”: false,
“defaultMatch”: false,
“display”: true,
“type”: “number”,
“canBeUsedToMatch”: true,
“removed”: false
},
{
“id”: “targetport”,
“displayName”: “targetport”,
“required”: false,
“defaultMatch”: false,
“display”: true,
“type”: “number”,
“canBeUsedToMatch”: true,
“removed”: false
},
{
“id”: “protocol”,
“displayName”: “protocol”,
“required”: false,
“defaultMatch”: false,
“display”: true,
“type”: “string”,
“canBeUsedToMatch”: false
},
{
“id”: “action”,
“displayName”: “action”,
“required”: false,
“defaultMatch”: false,
“display”: true,
“type”: “string”,
“canBeUsedToMatch”: false
},
{
“id”: “securitypolicy”,
“displayName”: “securitypolicy”,
“required”: false,
“defaultMatch”: false,
“display”: true,
“type”: “string”,
“canBeUsedToMatch”: false
},
{
“id”: “site”,
“displayName”: “site”,
“required”: false,
“defaultMatch”: false,
“display”: true,
“type”: “string”,
“canBeUsedToMatch”: true,
“removed”: false
}
],
“attemptToConvertTypes”: false,
“convertFieldsToString”: false
},
“options”: {}
},
“type”: “n8n-nodes-base.postgres”,
“typeVersion”: 2.6,
“position”: [
192,
-64
],
“id”: “ee67c79e-5465-452a-a63f-b9a502fc81c2”,
“name”: “Insert/Update2”,
“alwaysOutputData”: true,
“credentials”: {
“postgres”: {
“id”: “6mEmJBqv1phLspWq”,
“name”: “Postgres account”
}
},
“onError”: “continueErrorOutput”
},
{
“parameters”: {
“operation”: “executeQuery”,
"query": "SELECT COUNT(*) AS total \nFROM logs l\nWHERE NOT EXISTS (\n SELECT 1\n FROM acl a\n WHERE\n l.sourceip <<= ANY(a.sourceip)\n AND l.targetip <<= ANY(a.destination)\n AND (\n '{any}' = a.port \n OR l.targetport::text = ANY(a.port) \n OR '{*}' = a.port \n )\n AND l.site = a.site\n)\nAND l.\"action\" IN ('允许', 'Allow')\nAND l.site = 'LT'\nAND l.\"Time\" >= CURRENT_TIMESTAMP - INTERVAL '7 days';",
“options”: {}
},
“type”: “n8n-nodes-base.postgres”,
“typeVersion”: 2.6,
“position”: [
-928,
112
],
“id”: “80446259-6da1-4ff4-83f2-2da8a5a7e288”,
“name”: “Execute a SQL query3”,
“credentials”: {
“postgres”: {
“id”: “6mEmJBqv1phLspWq”,
“name”: “Postgres account”
}
}
},
{
“parameters”: {
“options”: {
“reset”: false
}
},
“type”: “n8n-nodes-base.splitInBatches”,
“typeVersion”: 3,
“position”: [
-448,
-80
],
“id”: “967c8bba-6060-4936-b229-dab422faad0d”,
“name”: “Loop Over Items3”,
“alwaysOutputData”: true,
“onError”: “continueErrorOutput”
},
{
“parameters”: {
"jsCode": "// 假设总条数从count节点获取(替换为你的count节点名称)\nconst total = $input.first().json.total;\nconst batchSize = 10000; // 固定每批1000条\nconst offsets = [];\n\n// 计算需要多少批,生成offset数组(0, 1000, 2000, ...)\nfor (let offset = 0; offset < total; offset += batchSize) {\n offsets.push(offset);\n}\n\n// 输出数组,作为Loop Over Items的输入\nreturn offsets.map(offset => ({ json: { currentOffset: offset } }));"
},
“type”: “n8n-nodes-base.code”,
“typeVersion”: 2,
“position”: [
-752,
96
],
“id”: “5d55a48e-102d-4570-ab1e-10b32f378990”,
“name”: “Code in JavaScript”
}
],
“pinData”: {},
“connections”: {
“Cleandata4”: {
“main”: [
[
{
“node”: “Loop Over Items3”,
“type”: “main”,
“index”: 0
}
]
]
},
“ProcessSQL”: {
“main”: [
[
{
“node”: “Insert/Update2”,
“type”: “main”,
“index”: 0
}
]
]
},
“Schedule Trigger4”: {
“main”: [
[
{
“node”: “Execute a SQL query3”,
“type”: “main”,
“index”: 0
}
]
]
},
“Insert/Update2”: {
“main”: [
[
{
“node”: “Cleandata4”,
“type”: “main”,
“index”: 0
}
],
[]
]
},
“Execute a SQL query3”: {
“main”: [
[
{
“node”: “Code in JavaScript”,
“type”: “main”,
“index”: 0
}
]
]
},
“Loop Over Items3”: {
“main”: [
[],
[
{
“node”: “ProcessSQL”,
“type”: “main”,
“index”: 0
}
]
]
},
“Code in JavaScript”: {
“main”: [
[
{
“node”: “Loop Over Items3”,
“type”: “main”,
“index”: 0
}
]
]
}
},
“active”: false,
“settings”: {
“executionOrder”: “v1”,
“callerPolicy”: “workflowsFromSameOwner”,
“availableInMCP”: false,
“timeSavedPerExecution”: 0
},
“versionId”: “bb5e8adc-7dc6-4ea7-9466-f75c88505123”,
“meta”: {
“templateCredsSetupCompleted”: true,
“instanceId”: “ceca07d9d659b7504c68cc1b8228bf14f5c82355fb43847435c42909379c2197”
},
“id”: “SbwgHzrC7N2w1I30”,
"tags": []
}

