Struggling with pagination

Hi,

I’ve been working with the Hubspot API, using the http node, and struggling to combine my paginated results for further processing.

I’ve put together a prototype flow that uses the Hubspot search endpoint to find “Companies” that have changed recently. I’ve managed to figure how to use the pagination, and have an IF node that is storing up each of the pages, but I’m struggling to merge the pages together.

I’ve cobbled together something from other forum topics, but it isn’t working, and I’m at the limit of my javascript capability (as in I have none :slight_smile:).

Here is the workflow

{
  "name": "Hubspot new Companies sync to data warehouse",
  "nodes": [
    {
      "parameters": {},
      "name": "Start",
      "type": "n8n-nodes-base.start",
      "typeVersion": 1,
      "position": [
        50,
        400
      ]
    },
    {
      "parameters": {
        "functionCode": "return [\n  {\n    json: {\n          \"filters\": [\n            {\n              \"propertyName\": \"createdate\",\n              \"operator\": \"GT\",\n              \"value\": $json[\"timestampoffset\"]\n            }\n          ]\n        }\n    }\n]\n"
      },
      "name": "offset: Timestamp",
      "type": "n8n-nodes-base.function",
      "typeVersion": 1,
      "position": [
        480,
        400
      ]
    },
    {
      "parameters": {
        "value": "2021-10-01 00:00:00",
        "dataPropertyName": "timestampoffset",
        "toFormat": "x",
        "options": {}
      },
      "name": "Date & Time",
      "type": "n8n-nodes-base.dateTime",
      "typeVersion": 1,
      "position": [
        270,
        400
      ]
    },
    {
      "parameters": {
        "requestMethod": "POST",
        "url": "https://api.hubapi.com/crm/v3/objects/companies/search",
        "options": {},
        "bodyParametersUi": {
          "parameter": [
            {
              "name": "filters",
              "value": "={{$node[\"offset: Timestamp\"].json[\"filters\"]}}"
            },
            {
              "name": "after",
              "value": "={{$json.paging.next.after}}"
            },
            {
              "name": "limit",
              "value": "10"
            }
          ]
        },
        "headerParametersUi": {
          "parameter": [
            {
              "name": "Authorization"
            },
            {
              "name": "Content-type",
              "value": "application/json"
            }
          ]
        },
        "queryParametersUi": {
          "parameter": []
        }
      },
      "name": "New Companies",
      "type": "n8n-nodes-base.httpRequest",
      "typeVersion": 1,
      "position": [
        710,
        400
      ],
      "executeOnce": false
    },
    {
      "parameters": {
        "conditions": {
          "string": [
            {
              "value1": "={{$json.paging.next.after}}",
              "operation": "isEmpty"
            }
          ]
        }
      },
      "name": "Empty Cursor",
      "type": "n8n-nodes-base.if",
      "typeVersion": 1,
      "position": [
        710,
        50
      ],
      "executeOnce": false
    },
    {
      "parameters": {
        "unit": "seconds"
      },
      "name": "Wait",
      "type": "n8n-nodes-base.wait",
      "typeVersion": 1,
      "position": [
        550,
        250
      ],
      "webhookId": "e3ab7eb9-12b5-44c3-b692-bd3acaf8a18b",
      "executeOnce": false
    },
    {
      "parameters": {
        "functionCode": "const allItems = []\nlet counter = 0\n\nwhile (true) {\n  try {\n    $items(\"Empty Cursor\", 0, counter).map(i => i.json.objects.map(o => allItems.push(o)))\n  } catch (err) {\n    break\n  }\n  counter++\n}\n\nreturn allItems.map(o => ({json: o}))\n"
      },
      "name": "Function",
      "type": "n8n-nodes-base.function",
      "typeVersion": 1,
      "position": [
        940,
        -110
      ]
    }
  ],
  "connections": {
    "Start": {
      "main": [
        [
          {
            "node": "Date & Time",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "offset: Timestamp": {
      "main": [
        [
          {
            "node": "New Companies",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "Date & Time": {
      "main": [
        [
          {
            "node": "offset: Timestamp",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "New Companies": {
      "main": [
        [
          {
            "node": "Empty Cursor",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "Empty Cursor": {
      "main": [
        [
          {
            "node": "Function",
            "type": "main",
            "index": 0
          }
        ],
        [
          {
            "node": "Wait",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "Wait": {
      "main": [
        [
          {
            "node": "New Companies",
            "type": "main",
            "index": 0
          }
        ]
      ]
    }
  },
  "active": false,
  "settings": {},
  "id": 1004
}

and here is a visual of the flow:

Any guidance on how to make the final function work to combine the pages would be helpful.

Thanks
Scott

Hey @scottjscott,

I don’t have the answer to your question but why not use the Hubspot node which has an option to get recently created / modified companies from a certain timestamp?

Hey @Jon, the short answer is that I will probably utilise the HubSpot connectors somewhere in my workflows.

The longer answer, for another time, is that I need to get an http style solution working that will allow me to deal with pagination because of data volumes: and it won’t be enough to rely upon webhooks / triggers.

Regards
Scott

1 Like

That makes sense, just wasn’t sure if taking the longer route was worth the effort if it is something you need now but for learning how to deal with the pages and merging it makes sense.

What does the data look like that is hitting the function node is it just the last page or everything and it just needs cleaning?

I’ve got a working solution which I’m just pulling together and will post here shortly - as ever, I have borrowed from a solution in another forum post :smiley:

1 Like

Ok, I’ve got myself a solution that works now - I’m sure there are quicker / more optimal ways of doing this, but this will do me for now, and it was a learning exercise too :smiley:

My aim was to setup a 1 way sync of new Companies created in Hubspot, to a staging table in our data warehouse (Postgres). To make this work in this workflow I’ve created an offsets table in the data warehouse to store a timestamp offset and a batch number.

The pagination works nicely now - I’m pulling 50 items per page when I search for new companies, and I’ve managed to combine the pages using a Function node (thanks @RicardoE105 , @jan for the inspiration in other forum posts).

Before I insert the items into the data warehouse, I also assign them a uuid, and I associate them with the batch number created earlier in the workflow (will help with data processing downstream).

There are a couple of points where I introduce waiting time so I avoid hitting Hubspot API limits (150 every 10 seconds on professional edition, lower on the entry level Hubspot, so I’ve gone for 100).

This data is going to be part of a data model build that uses dbt, so I’ve just inserted the Hubspot properties retrieved from the API into a single json column in the database (I’ll disassemble it later).

Now I’ve gone back over this I can see a couple of areas where there is room for improvement, especially when the workflow returns no items from the API - it just stops at the Combine Companies node, which still works, but I need to improve upon that in the future.

Anyway, I’ve learnt more about n8n in the process and I’ve got a working solution that with a few tweaks will get us synchronising hubspot entities to our data warehouse for use in real customer analytics.

I also learnt that when you see some workflows you want to try from the forums, you can simply copy them and then past them on to the workflow canvas! That has saved a lot of time!!!

{
  "name": "Hubspot company sync to data warehouse",
  "nodes": [
    {
      "parameters": {},
      "name": "Start",
      "type": "n8n-nodes-base.start",
      "typeVersion": 1,
      "position": [
        -650,
        520
      ]
    },
    {
      "parameters": {
        "requestMethod": "POST",
        "url": "https://api.hubapi.com/crm/v3/objects/companies/search",
        "options": {},
        "bodyParametersUi": {
          "parameter": [
            {
              "name": "filters",
              "value": "={{$node[\"offset: createdate Timestamp\"].json[\"filters\"]}}"
            },
            {
              "name": "after",
              "value": "={{$json.paging.next.after}}"
            },
            {
              "name": "limit",
              "value": "50"
            }
          ]
        },
        "headerParametersUi": {
          "parameter": [
            {
              "name": "Authorization",
              "value": "Bearer <insert your API key / token here>"
            },
            {
              "name": "Content-type",
              "value": "application/json"
            }
          ]
        },
        "queryParametersUi": {
          "parameter": []
        }
      },
      "name": "New Companies",
      "type": "n8n-nodes-base.httpRequest",
      "typeVersion": 1,
      "position": [
        490,
        520
      ],
      "executeOnce": false
    },
    {
      "parameters": {
        "functionCode": "const allData = []\n\nlet counter = 0;\ndo {\n  try {\n    const items = $items(\"New Companies\", 0, counter).map(item => item.json.results);\n                    \n    const aja = items[0].map(item => {\n      return { json: item }\n    })    \n    \n    allData.push.apply(allData, aja);\n    //allData.push($items(\"Increment\", 0, counter));\n  } catch (error) {\n    return allData;  \n  }\n\n  counter++;\n} while(true);\n"
      },
      "name": "Combine Companies",
      "type": "n8n-nodes-base.function",
      "typeVersion": 1,
      "position": [
        720,
        110
      ],
      "alwaysOutputData": false
    },
    {
      "parameters": {
        "functionCode": "return [\n  {\n    json: {\n          \"filters\": [\n            {\n              \"propertyName\": \"createdate\",\n              \"operator\": \"GT\",\n              \"value\": $json[\"timestampoffset\"]\n            }\n          ]\n        }\n    }\n]\n"
      },
      "name": "offset: createdate Timestamp",
      "type": "n8n-nodes-base.function",
      "typeVersion": 1,
      "position": [
        210,
        520
      ],
      "notesInFlow": true,
      "notes": "Create a json structure for the createdate filter that will be used in the request to the Hubspot API."
    },
    {
      "parameters": {
        "amount": 0.25,
        "unit": "seconds"
      },
      "name": "Pause for qtr sec",
      "type": "n8n-nodes-base.wait",
      "typeVersion": 1,
      "position": [
        284,
        318
      ],
      "webhookId": "e3ab7eb9-12b5-44c3-b692-bd3acaf8a18b",
      "executeOnce": false,
      "notesInFlow": true,
      "notes": "This is to ensure we don't get rate limited by the Hubspot API"
    },
    {
      "parameters": {
        "conditions": {
          "string": [
            {
              "value1": "={{$json.paging.next.after}}",
              "operation": "isEmpty"
            }
          ]
        }
      },
      "name": "API paging test",
      "type": "n8n-nodes-base.if",
      "typeVersion": 1,
      "position": [
        504,
        118
      ],
      "executeOnce": false
    },
    {
      "parameters": {
        "url": "=https://api.hubapi.com/crm/v3/objects/companies/{{$node[\"Combine Companies\"].json[\"id\"]}}",
        "options": {
          "batchInterval": 10000,
          "batchSize": 100
        },
        "headerParametersUi": {
          "parameter": [
            {
              "name": "Authorization",
              "value": "Bearer <insert your API key / token here>"
            },
            {
              "name": "Content-type",
              "value": "application/json"
            }
          ]
        },
        "queryParametersUi": {
          "parameter": [
            {
              "name": "properties",
              "value": "days_to_close,hs_analytics_first_timestamp,hs_analytics_first_touch_converting_campaign,hs_analytics_first_visit_timestamp,hs_analytics_last_timestamp,hs_lastmodifieddate,about_us,address,address2,annualrevenue,city,closedate,company_type,country,createdate,customer_status,description,domain,founded_year,hs_createdate,hs_last_sales_activity_timestamp,hs_lead_status,hs_num_child_companies,hs_num_open_deals,hs_object_id,hs_parent_company_id,hs_total_deal_value,industry,name,num_associated_deals,numberofemployees,pennington_sector,phone,recent_deal_amount,recent_deal_close_date,sector,total_revenue,type,web_technologies,website,zip,asbestos,building___surveying__b_q_,consultancy,epc,fire,lab,legionella,projects,resourcing,stock,technical_auditing,technical_monitoring,facebook_company_page,linkedin_company_page,hs_ideal_customer_profile,hs_is_target_account"
            }
          ]
        }
      },
      "name": "Get each Company",
      "type": "n8n-nodes-base.httpRequest",
      "typeVersion": 1,
      "position": [
        1120,
        110
      ],
      "executeOnce": false
    },
    {
      "parameters": {
        "amount": 7,
        "unit": "seconds"
      },
      "name": "Rate limit rest",
      "type": "n8n-nodes-base.wait",
      "typeVersion": 1,
      "position": [
        920,
        110
      ],
      "webhookId": "90976eb5-6367-42c2-a827-bfbd30772c38",
      "executeOnce": false,
      "notesInFlow": true
    },
    {
      "parameters": {
        "schema": "hubspot_stage",
        "table": "stage_hubspot_companies_new",
        "columns": "id, properties, createdAt, updatedAt, archived, uuid, processed_flag, batch_id",
        "additionalFields": {
          "mode": "independently"
        }
      },
      "name": "Postgres",
      "type": "n8n-nodes-base.postgres",
      "typeVersion": 1,
      "position": [
        1510,
        110
      ],
      "credentials": {
        "postgres": "PCL Datawarehouse DEV"
      }
    },
    {
      "parameters": {
        "operation": "executeQuery",
        "query": "select\n   lasttime_offset,\n   batchrun_id\nfrom \n   hubspot_stage.offsets \nwhere \n   offset_name = 'new_companies';",
        "additionalFields": {}
      },
      "name": "Get offsets",
      "type": "n8n-nodes-base.postgres",
      "typeVersion": 1,
      "position": [
        -450,
        520
      ],
      "credentials": {
        "postgres": "PCL Datawarehouse DEV"
      }
    },
    {
      "parameters": {
        "value": "={{$node[\"Get offsets\"].json[\"lasttime_offset\"]}}",
        "dataPropertyName": "timestampoffset",
        "toFormat": "x",
        "options": {}
      },
      "name": "Convert time offset",
      "type": "n8n-nodes-base.dateTime",
      "typeVersion": 1,
      "position": [
        -30,
        520
      ],
      "notesInFlow": true,
      "notes": "Convert the timestamp to a unix date for use in the query to the Hubspot API"
    },
    {
      "parameters": {
        "keepOnlySet": true,
        "values": {
          "string": [],
          "number": [
            {
              "name": "batch_id",
              "value": "={{$node[\"Get offsets\"].json[\"batchrun_id\"]+1}}"
            }
          ]
        },
        "options": {}
      },
      "name": "Set batch ID",
      "type": "n8n-nodes-base.set",
      "typeVersion": 1,
      "position": [
        -240,
        520
      ]
    },
    {
      "parameters": {
        "functionCode": "const uuid = require('uuid');\n\nitem.uuid = uuid.v4();\nitem.processed_flag = 0;\nitem.batch_id = $item(0).$node[\"Set batch ID\"].json[\"batch_id\"];\n\n// You can write logs to the browser console\nconsole.log('Done!');\n\nreturn item;\n\n\n\n\n\n"
      },
      "name": "FunctionItem",
      "type": "n8n-nodes-base.functionItem",
      "typeVersion": 1,
      "position": [
        1320,
        110
      ]
    },
    {
      "parameters": {
        "operation": "executeQuery",
        "query": "=select\nmax(\"createdAt\") as maximum_timestamp \nfrom\nhubspot_stage.stage_hubspot_companies_new where batch_id = '{{$node[\"Set batch ID\"].json[\"batch_id\"]}}'",
        "additionalFields": {
          "mode": "independently"
        }
      },
      "name": "Get max_timestamp",
      "type": "n8n-nodes-base.postgres",
      "typeVersion": 1,
      "position": [
        1700,
        110
      ],
      "credentials": {
        "postgres": "PCL Datawarehouse DEV"
      }
    },
    {
      "parameters": {
        "values": {
          "string": [
            {
              "name": "offset_name",
              "value": "new_companies"
            },
            {
              "name": "entity_name",
              "value": "Company"
            },
            {
              "name": "lasttime_offset",
              "value": "={{$node[\"Get max_timestamp\"].json[\"maximum_timestamp\"]}}"
            },
            {
              "name": "batchrun_id",
              "value": "={{$node[\"Set batch ID\"].json[\"batch_id\"]}}"
            }
          ]
        },
        "options": {}
      },
      "name": "Set",
      "type": "n8n-nodes-base.set",
      "typeVersion": 1,
      "position": [
        1890,
        110
      ],
      "executeOnce": true
    },
    {
      "parameters": {
        "operation": "update",
        "schema": "hubspot_stage",
        "table": "offsets",
        "updateKey": "offset_name",
        "columns": "entity_name, lasttime_offset, batchrun_id",
        "additionalFields": {
          "mode": "independently"
        }
      },
      "name": "Update offsets",
      "type": "n8n-nodes-base.postgres",
      "typeVersion": 1,
      "position": [
        2090,
        110
      ],
      "credentials": {
        "postgres": "PCL Datawarehouse DEV"
      }
    }
  ],
  "connections": {
    "New Companies": {
      "main": [
        [
          {
            "node": "API paging test",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "Pause for qtr sec": {
      "main": [
        [
          {
            "node": "New Companies",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "API paging test": {
      "main": [
        [
          {
            "node": "Combine Companies",
            "type": "main",
            "index": 0
          }
        ],
        [
          {
            "node": "Pause for qtr sec",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "Combine Companies": {
      "main": [
        [
          {
            "node": "Rate limit rest",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "Get offsets": {
      "main": [
        [
          {
            "node": "Set batch ID",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "Convert time offset": {
      "main": [
        [
          {
            "node": "offset: createdate Timestamp",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "offset: createdate Timestamp": {
      "main": [
        [
          {
            "node": "New Companies",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "Rate limit rest": {
      "main": [
        [
          {
            "node": "Get each Company",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "Get each Company": {
      "main": [
        [
          {
            "node": "FunctionItem",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "Start": {
      "main": [
        [
          {
            "node": "Get offsets",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "FunctionItem": {
      "main": [
        [
          {
            "node": "Postgres",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "Set batch ID": {
      "main": [
        [
          {
            "node": "Convert time offset",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "Postgres": {
      "main": [
        [
          {
            "node": "Get max_timestamp",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "Get max_timestamp": {
      "main": [
        [
          {
            "node": "Set",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "Set": {
      "main": [
        [
          {
            "node": "Update offsets",
            "type": "main",
            "index": 0
          }
        ]
      ]
    }
  },
  "active": false,
  "settings": {},
  "id": 1
}
4 Likes