Send custom Job Query to Big Query, parse answer and save to google sheet

Hey Buddies!

Here is an easy way to send your own customised job (query) to BigQuery and save the answer to a Google Sheet.

1. HTTP Request

URL (don’t forget to replace {Your project ID} with your own project ID)
https://bigquery.googleapis.com/bigquery/v2/projects/{Your project ID}/queries

JSON/RAW Parameters
true

Tip: You can simply Copy&Paste your Queries from Big Query Console :wink:

Job/Query (paste into Body Parameters and don’t forget to edit your location, e.g. europe-west3)
Body Parameters:
{"query": "Your Query",
"location": "europe-west3",
"useLegacySql": false,
"useQueryCache": true}

2. SET Schema

Keep only Set
true
Value to Set: String
Name: schema
Value: {{$json["data"]["schema"]}}

3. SET Rows

Keep only Set
true
Value to Set: String
Name: rows
Value: {{$json["data"]["rows"]}}

4. Merge

Mode: Merge By Index
Join: Inner Join

5. Convert (credits to Jan)

JavaScript Code:

/**
 * Flatten the `schema.fields` / `rows` pair of a BigQuery query response into
 * one plain object per result row.
 *
 * Shape of the produced keys:
 *  - scalar column           -> result[name] = raw cell value
 *  - REPEATED scalar column  -> result[name] = array of raw values
 *                               ([null] when the cell itself is null)
 *  - non-repeated RECORD     -> one "parent.child" key per nested field
 *  - REPEATED RECORD         -> result[name] = array of objects whose keys
 *                               are "parent.child"
 *
 * Cell values are passed through exactly as received (no type conversion).
 * Records nested inside a REPEATED RECORD are not flattened any further.
 *
 * @param {Array<Object>} schema - Field descriptors; `name`, `type`, `mode`
 *   and `fields` are the properties read here.
 * @param {Array<Object>|null|undefined} rows - Rows shaped like
 *   `{ f: [ { v: ... }, ... ] }`. A missing value is treated as "no rows".
 * @returns {Array<Object>} One flattened object per input row.
 */
function convertBQToMySQLResults(schema, rows) {
    // Strips the `{ v: ... }` wrapper from one cell.
    const unwrap = (cell) => cell.v;

    // Top-level columns keep their bare name; nested ones get a dotted path.
    const childKey = (prefix, name) => (prefix === "" ? name : prefix + "." + name);

    function flatten(result, schemaCur, rowsCur, colName) {
        if (Array.isArray(schemaCur)) {
            if (Array.isArray(result[colName])) {
                // A field list arriving while result[colName] is already an
                // array means we are looking at one element of a REPEATED RECORD.
                const element = {};
                schemaCur.forEach((field, index) => {
                    const value = rowsCur.v.f[index].v;
                    element[colName + "." + field.name] = Array.isArray(value) ? value.map(unwrap) : value;
                });
                result[colName].push(element);
            } else {
                // Field list of a whole row: cells are positional, so pair by index.
                schemaCur.forEach((field, index) => {
                    flatten(result, field, rowsCur.f[index], childKey(colName, field.name));
                });
            }
            return;
        }

        const isRepeated = schemaCur.mode === "REPEATED";

        if (schemaCur.type === "RECORD") {
            const fields = schemaCur.fields || [];
            if (isRepeated) {
                result[colName] = [];
                (rowsCur.v || []).forEach((element) => {
                    flatten(result, fields, element, colName);
                });
            } else {
                fields.forEach((field, index) => {
                    // A null record has no nested cells; hand the null cell down
                    // so every nested column still shows up (as null).
                    const cell = rowsCur.v === null ? rowsCur : rowsCur.v.f[index];
                    flatten(result, field, cell, colName + "." + field.name);
                });
            }
            return;
        }

        if (isRepeated) {
            result[colName] = rowsCur.v !== null ? rowsCur.v.map(unwrap) : [null];
        } else if (colName !== "") {
            result[colName] = rowsCur.v;
        }
    }

    // The `rows` property can be absent when the query returned no rows;
    // return an empty result instead of throwing on `rows.length`.
    const sourceRows = rows == null ? [] : rows;

    const resultRows = [];
    for (const row of sourceRows) {
        const result = {};
        flatten(result, schema, row, "");
        resultRows.push(result);
    }

    return resultRows;
}

const rows = convertBQToMySQLResults(items[0].json.schema.fields, items[0].json.rows);
return rows.map(row => ({json: row}));

6. Save to Sheet

Background
I’m a newbie with n8n and this is my first concept of how to use it. Your feedback is welcome. I personally use it to automate the whole processing.

  1. My Co-workers put daily Data into Sheets.
  2. Then they start AppScript and Data are uploaded automatically to BigQuery.
  3. After success, they call this n8n Scenario via Webhook/HTTP Request (via AppScript)
  4. n8n does this great job and my colleagues simply receive a nicely updated Sheet

Workflow code (copy it and paste it with Ctrl+V or Cmd+V anywhere in the n8n window)

{
  "nodes": [
    {
      "parameters": {},
      "name": "Start",
      "type": "n8n-nodes-base.start",
      "typeVersion": 1,
      "position": [
        210,
        300
      ]
    },
    {
      "parameters": {
        "authentication": "oAuth2",
        "requestMethod": "POST",
        "url": "https://bigquery.googleapis.com/bigquery/v2/projects/[Your-Project-ID]/queries",
        "responseFormat": "string",
        "jsonParameters": true,
        "options": {},
        "bodyParametersJson": "={\"query\": \"Your Query\", \"location\": \"europe-west3\", \"useLegacySql\": false, \"useQueryCache\": true}"
      },
      "name": "HTTP Request",
      "type": "n8n-nodes-base.httpRequest",
      "typeVersion": 1,
      "position": [
        360,
        300
      ],
      "credentials": {
        "oAuth2Api": "Slavomir BQ"
      }
    },
    {
      "parameters": {
        "keepOnlySet": true,
        "values": {
          "string": [
            {
              "name": "schema",
              "value": "={{$json[\"data\"][\"schema\"]}}"
            }
          ]
        },
        "options": {}
      },
      "name": "Schema",
      "type": "n8n-nodes-base.set",
      "typeVersion": 1,
      "position": [
        520,
        240
      ]
    },
    {
      "parameters": {
        "keepOnlySet": true,
        "values": {
          "string": [
            {
              "name": "rows",
              "value": "={{$json[\"data\"][\"rows\"]}}"
            }
          ]
        },
        "options": {}
      },
      "name": "Rows",
      "type": "n8n-nodes-base.set",
      "typeVersion": 1,
      "position": [
        520,
        380
      ]
    },
    {
      "parameters": {
        "operation": "append",
        "sheetId": "your sheet",
        "range": "test!A:E",
        "options": {}
      },
      "name": "Google Sheets",
      "type": "n8n-nodes-base.googleSheets",
      "typeVersion": 1,
      "position": [
        1040,
        310
      ],
      "credentials": {
        "googleApi": "Api-Bot"
      }
    },
    {
      "parameters": {
        "mode": "mergeByIndex",
        "join": "inner"
      },
      "name": "Prepare Input",
      "type": "n8n-nodes-base.merge",
      "typeVersion": 1,
      "position": [
        690,
        310
      ],
      "notesInFlow": true,
      "notes": "Merge Header & Rows"
    },
    {
      "parameters": {
        "functionCode": "function convertBQToMySQLResults(schema, rows) {\n\n    var resultRows = []\n    \n    function recurse (schemaCur, rowsCur, colName) {\n\n        if (Array.isArray(schemaCur) && !Array.isArray(result[colName])) {\n            for(var i=0, l=schemaCur.length; i<l; i++) {\n                if (colName === \"\")\n                    recurse(schemaCur[i], rowsCur.f[i], colName + schemaCur[i].name)\n                else\n                    recurse(schemaCur[i], rowsCur.f[i], colName + \".\" + schemaCur[i].name)\n            }    \n        }\n\n        if (schemaCur.type && schemaCur.type === \"RECORD\") {\n            if (schemaCur.mode !== \"REPEATED\") {\n                var valIndex = 0\n                for (var p in schemaCur.fields) {\n                    if (rowsCur.v === null) {\n                        recurse(schemaCur.fields[p], rowsCur, colName + \".\" + schemaCur.fields[p].name)\n                    } else {\n                        recurse(schemaCur.fields[p], rowsCur.v.f[valIndex], colName + \".\" + schemaCur.fields[p].name)\n                    }\n                    \n                    valIndex++\n                }\n            } \n            \n            if (schemaCur.mode === \"REPEATED\") {   \n                result[colName] = [] \n                for (var x in rowsCur.v) {\n                    recurse(schemaCur.fields, rowsCur.v[x], colName)\n                }\n            }\n        } else {\n            if (schemaCur.mode === \"REPEATED\") {\n                if (rowsCur.v !== null) {\n                    result[colName] = rowsCur.v.map( (value, index) => { return value.v })\n                } else {\n                    result[colName] = [ null ]\n                }\n                \n            } else if (Array.isArray(result[colName])) {\n                let nextRow = {} \n                for (var j in schemaCur) {\n                    nextRow[colName + \".\" + schemaCur[j].name] = Array.isArray(rowsCur.v.f[j].v) ? rowsCur.v.f[j].v.map( (value, index) => { return value.v }) : rowsCur.v.f[j].v \n                }\n                result[colName].push(nextRow)\n            } else {\n                if (colName !== \"\")\n                    result[colName] = rowsCur.v\n            }\n        }\n    }\n\n    for (var r=0, rowsCount=rows.length; r<rowsCount; r++) {\n        var result = {};\n        recurse(schema, rows[r], \"\")\n        resultRows.push(result)\n    }\n\n    return resultRows\n}\n\nconst rows = convertBQToMySQLResults(items[0].json.schema.fields, items[0].json.rows);\nreturn rows.map(row => ({json: row}));"
      },
      "name": "Convert",
      "type": "n8n-nodes-base.function",
      "typeVersion": 1,
      "position": [
        860,
        310
      ]
    }
  ],
  "connections": {
    "Start": {
      "main": [
        [
          {
            "node": "HTTP Request",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "HTTP Request": {
      "main": [
        [
          {
            "node": "Schema",
            "type": "main",
            "index": 0
          },
          {
            "node": "Rows",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "Schema": {
      "main": [
        [
          {
            "node": "Prepare Input",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "Rows": {
      "main": [
        [
          {
            "node": "Prepare Input",
            "type": "main",
            "index": 1
          }
        ]
      ]
    },
    "Prepare Input": {
      "main": [
        [
          {
            "node": "Convert",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "Convert": {
      "main": [
        [
          {
            "node": "Google Sheets",
            "type": "main",
            "index": 0
          }
        ]
      ]
    }
  }
}
6 Likes

Great @kallados, thanks a lot for sharing!

1 Like

Love that you posted this, @kallados — many thanks!

1 Like

We are evaluating n8n as an internal alert system.

One of our needs is to be able to launch queries to GBQ and depending on the result generate alerts or not.

The bigquery node was a bit disappointing as it could only pick up whole tables and not launch the query method. This solution has helped us to unblock the problem and to be able to raise n8n again as a valid alert system for us.

Thank you!

Even so, I would like to make 3 comments. One about this workflow and the others about the core nodes.

  1. The workflow can be simplified. There is no need for the two “set” nodes. The script works fine without them and without the need for the merge because the request already takes the schema and rows responses as usable data. This simplifies the workflow to 3 nodes only. We have added before a “set” node to define there the google cloud project and the query (and not having to edit the json of the http request all the time).

  2. As a disadvantage (I don’t understand why) the http request node only allows Oauth 2 credentials and not a service account (the bigquery one allows both but it doesn’t allow queries). Does anyone know if there are plans to add the service account? For bigquery oauth doesn’t make much sense because it doesn’t allow you to control cloud privileges.

  3. Could we take advantage of this script to transform this post into a request and improve the bigquery node? The functionality for create is great. But we really need to be able to make custom requests without js hacks. If we could also launch queries that generate tables, its potential would multiply a lot.

1 Like

Hi,

I am not sure if I got it right. I use the following query for creating tables (OAuth2).

URL (replace xxx with your needs)

https://bigquery.googleapis.com/bigquery/v2/projects/xxx/datasets/xxx/tables

Response Format

JSON

JSON/RAW Parameters

TRUE

Create Table

Create Dataset

Delete Dataset (just through Path)

1 Like

Hello ,

I probably explained myself wrong. The third point is not related to your workflow. With your approach I can make almost any call (as you say) through the http request node (which is limited to oauth2).

I was just suggesting that those actions should either be able to be done with the native bigquery node (and not with http request looking each time at the endpoint and json to send) or at least be able to use service accounts in the http request node.

Great work @kallados ! In my workplace we make use of your solution and works like a charm.
However, when parsing the answer, I found that Integer and Float fields from BigQuery are treated like a String when writing into CSV (at least when opened with Excel or LibreOffice Calc), so it has to be parsed again to allow calculations between them.

I have adapted your initial parser to allow Integers and Floats from BigQuery to be treated like so in CSV.
Note: I’m not proficient in JavaScript in any way, take this into consideration.

/**
 * Flatten the `schema.fields` / `rows` pair of a BigQuery query response into
 * one plain object per result row, converting top-level and record-nested
 * scalar INTEGER / FLOAT cells from strings to JavaScript numbers.
 *
 * Shape of the produced keys:
 *  - scalar column           -> result[name] = cell value (number for
 *                               INTEGER/INT64 and FLOAT/FLOAT64, otherwise raw)
 *  - REPEATED scalar column  -> result[name] = array of raw values
 *                               ([null] when the cell itself is null)
 *  - non-repeated RECORD     -> one "parent.child" key per nested field
 *  - REPEATED RECORD         -> result[name] = array of objects whose keys
 *                               are "parent.child"
 *
 * Values inside REPEATED columns are left as received (no numeric conversion).
 * NULL numeric cells stay null instead of becoming NaN. Integers outside the
 * JavaScript safe-integer range lose precision when converted to a number.
 *
 * @param {Array<Object>} schema - Field descriptors; `name`, `type`, `mode`
 *   and `fields` are the properties read here.
 * @param {Array<Object>|null|undefined} rows - Rows shaped like
 *   `{ f: [ { v: ... }, ... ] }`. A missing value is treated as "no rows".
 * @returns {Array<Object>} One flattened object per input row.
 */
function parseGBQResults(schema, rows) {
    const INTEGER_TYPES = ["INTEGER", "INT64"];
    const FLOAT_TYPES = ["FLOAT", "FLOAT64"];

    // Strips the `{ v: ... }` wrapper from one cell.
    const unwrap = (cell) => cell.v;

    // Top-level columns keep their bare name; nested ones get a dotted path.
    const childKey = (prefix, name) => (prefix === "" ? name : prefix + "." + name);

    // Converts one scalar cell according to its declared column type.
    function convertScalar(type, raw) {
        // parseInt(null) / parseFloat(null) yield NaN, so keep NULL cells as-is.
        if (raw == null) {
            return raw;
        }
        if (INTEGER_TYPES.includes(type)) {
            return Number.parseInt(raw, 10);
        }
        if (FLOAT_TYPES.includes(type)) {
            return Number.parseFloat(raw);
        }
        return raw;
    }

    function flatten(result, schemaCur, rowsCur, colName) {
        if (Array.isArray(schemaCur)) {
            if (Array.isArray(result[colName])) {
                // A field list arriving while result[colName] is already an
                // array means we are looking at one element of a REPEATED RECORD.
                const element = {};
                schemaCur.forEach((field, index) => {
                    const value = rowsCur.v.f[index].v;
                    element[colName + "." + field.name] = Array.isArray(value) ? value.map(unwrap) : value;
                });
                result[colName].push(element);
            } else {
                // Field list of a whole row: cells are positional, so pair by index.
                schemaCur.forEach((field, index) => {
                    flatten(result, field, rowsCur.f[index], childKey(colName, field.name));
                });
            }
            return;
        }

        const isRepeated = schemaCur.mode === "REPEATED";

        if (schemaCur.type === "RECORD") {
            const fields = schemaCur.fields || [];
            if (isRepeated) {
                result[colName] = [];
                (rowsCur.v || []).forEach((element) => {
                    flatten(result, fields, element, colName);
                });
            } else {
                fields.forEach((field, index) => {
                    // A null record has no nested cells; hand the null cell down
                    // so every nested column still shows up (as null).
                    const cell = rowsCur.v === null ? rowsCur : rowsCur.v.f[index];
                    flatten(result, field, cell, colName + "." + field.name);
                });
            }
            return;
        }

        if (isRepeated) {
            result[colName] = rowsCur.v !== null ? rowsCur.v.map(unwrap) : [null];
        } else if (colName !== "") {
            result[colName] = convertScalar(schemaCur.type, rowsCur.v);
        }
    }

    // The `rows` property can be absent when the query returned no rows;
    // return an empty result instead of throwing on `rows.length`.
    const sourceRows = rows == null ? [] : rows;

    const resultRows = [];
    for (const row of sourceRows) {
        const result = {};
        flatten(result, schema, row, "");
        resultRows.push(result);
    }

    return resultRows;
}

const rows = parseGBQResults(items[0].json.schema.fields, items[0].json.rows);

// return all rows in one item
// return [ { "json" : { queryResults : rows} } ];

// return each row as item
return rows.map(row => ({json: row}));