2) Loading Data to BigQuery

Given all of the different methods of loading data to BigQuery, what’s the best way to do it?

Danilo Drobac
10 min read · Nov 22, 2022
Data With Dro — Loading Data to BigQuery

This article is Part 2 of a series titled “Real World Python for Data Engineering” (linked).

Each article builds on what was built in the previous one, so it’s best to check those out first if you wish to follow along!

Intro

In Part 1 of this series, we successfully made requests to the Rick and Morty API, paginating through all of the results and storing them all in a local variable.

Now, as in the real world, we want to store that data somewhere useful, like a Data Warehouse.

For this tutorial, and because of my background as a Google Cloud Platform data engineer, I’ll be choosing to store this data in BigQuery.

You could choose from alternatives: Azure Synapse Analytics (formerly Azure SQL Data Warehouse) for Microsoft folk, Redshift for AWS lovers, or Snowflake for cool kids who like to stay current.

You can read more about the role Data Warehouses play in the Modern Data Stack here:

There’s a plethora of ways to load data into BigQuery…

You can load it using the bq command-line tool (installed alongside the gcloud CLI). Load from a DataFrame. A CSV file. JSON records. A file hosted in GCS.

The common theme is that under the hood, these all create a LoadJob which is used to load the data into a specified table with some additional bits of information that BigQuery needs (such as a table schema).

The other approach is to skip the load job entirely and stream records directly into the table instead. The difference between the two is for another article, but I’m just making you aware that there’s an alternative approach.

Methodology

Let’s break down the different parts of this tutorial:

  1. Load our data into pandas and create the row_hash and ingestion_date fields.
  2. Denormalise the data into an un-nested DataFrame.
  3. Create a schema object for BigQuery.
  4. Load the data to BigQuery

A few additional pieces of information/pre-requisites for this:

  • You will need a Google Cloud Platform account and project set-up: https://cloud.google.com/free
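  • You’ll need to install the gcloud CLI, authenticate locally using gcloud auth login, and set your project ID to the project you created/will be using for this tutorial (the Python client library reads Application Default Credentials, so you may also need to run gcloud auth application-default login): https://cloud.google.com/sdk/docs/install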
  • You’ll need to enable the BigQuery API
  • A dataset called rick_and_morty needs to be created within BigQuery
  • I’m using WSL2 on Windows 22H2
  • I’ll be implementing this in Python 3.8
  • Dependency management will be done through pipenv. The requirements are requests, pandas and google-cloud-bigquery, with black as a dev dependency for auto-formatting (another topic of discussion).

Processing Order

In our finished main.py file, the processing order is as follows:

1) Processing order
  • We use the results to run prepare_data and get our finished json_records and schema.
  • We create a BigQuery Client instance.
  • We build a LoadJobConfig from our schema
  • Then finally, we load our data to BigQuery.

I. Loading Data to DataFrame

If we remember back to the last tutorial, it ended with us having a list of CharacterSchema objects stored in a variable called results.

By default, pandas can load a DataFrame from a list of dataclass objects, but as we have nested data within this, we want to use another function to load the data.

The function that does this is wrapped in the prepare_data function which is in the main.py file and looks like this:

2) prepare_data() function in main.py

We can see that the first line in the function creates the DataFrame by using the pd.json_normalize() function. This function takes JSON input and converts any nested structures into separate fields separated by “.” (default). For example, location which has name and url as fields would be turned into location.name and location.url in the DataFrame.

We use a list comprehension and the asdict function imported from dataclasses to convert our CharacterSchema objects to dictionaries, and pass that list in as the value to normalise.
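Here’s a minimal sketch of that first step. The CharacterSchema and Location classes below are hypothetical, trimmed-down stand-ins for the dataclass from Part 1 (the real one has more fields), and the sample values are made up for illustration.

```python
from dataclasses import asdict, dataclass, field
from typing import List

import pandas as pd


@dataclass
class Location:
    name: str
    url: str


@dataclass
class CharacterSchema:
    # Hypothetical, simplified version of the Part 1 dataclass.
    id: int
    name: str
    location: Location
    episode: List[str] = field(default_factory=list)


results = [
    CharacterSchema(1, "Rick Sanchez", Location("Citadel of Ricks", "loc/3"), ["ep/1", "ep/2"]),
    CharacterSchema(2, "Morty Smith", Location("Earth", "loc/20"), ["ep/1"]),
]

# asdict() recurses into nested dataclasses, so json_normalize receives plain
# dicts and flattens location into location.name and location.url.
df = pd.json_normalize([asdict(r) for r in results])
print(df.columns.tolist())
```

Note that lists (like episode here) are left as they are; only nested dictionaries get flattened into dotted column names.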

Now that we have a DataFrame, we can go ahead and run the transform_dataframe() function…

II. Create row_hash and ingestion_date Fields

Note — in general, in our new ELT world, we want to leave source data as raw as possible, meaning as few transformations as we need. Adding metadata fields is fine, but the records themselves shouldn’t be changed.

All of the following code is held in another file called transforms.py.

3) create_row_hash() function

Why do we need a row_hash? In many API instances, as a Data Engineer, you might be pulling from some reporting endpoint which is a combined view of lots of data. Depending on the API, you may or may not get a unique ID for each record, so for that reason, we’ll create our own to show how you would address it.

This function is a neat way to take a DataFrame and use the pd.util.hash_pandas_object function to create a hash for each row. Note: because our DataFrame can contain complex objects that are unhashable (for example a list), we hash a version of the DataFrame in which every column has been cast to string.

We set the index of the DataFrame to the hash value, and then reset the index and name it as row_hash .

Et voilà, we have a unique ID field of the type uint64.
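A minimal sketch of how such a function could look, following the description above (cast to string, hash each row, move the hash into a row_hash column). The exact body is my assumption, not necessarily what’s in the repository.

```python
import pandas as pd


def create_row_hash(df: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of df with a uint64 row_hash column at the front."""
    # Lists and dicts are unhashable, so hash the string form of every value.
    # index=False keeps the hash dependent on the row content only.
    hashes = pd.util.hash_pandas_object(df.astype(str), index=False)
    # Use the hashes as the index, then reset it so it becomes a column.
    return df.set_index(hashes.rename("row_hash")).reset_index()


example = pd.DataFrame({"id": [1, 2], "episode": [["ep/1"], ["ep/1", "ep/2"]]})
hashed = create_row_hash(example)
print(hashed.dtypes)
```

Because the hash is derived from the row content, two identical rows produce the same row_hash, which is handy for spotting duplicates later on.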

4) add_current_datetime() function

This creates a field at the start of the DataFrame called ingestion_date using the pd.to_datetime() function and setting the value to the current UTC time.
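Something along these lines would do it. This is a sketch under the assumption that the function returns a new DataFrame rather than modifying its input; the original may differ.

```python
from datetime import datetime, timezone

import pandas as pd


def add_current_datetime(df: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of df with ingestion_date (current UTC time) as the first column."""
    out = df.copy()
    # insert(0, ...) places the new column at the start; the single timestamp
    # is broadcast to every row.
    out.insert(0, "ingestion_date", pd.to_datetime(datetime.now(timezone.utc)))
    return out


stamped = add_current_datetime(pd.DataFrame({"id": [1, 2]}))
print(stamped.dtypes)
```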

5) transforms.py — Combining DataFrame transformations

The above is the skeleton of what the transforms.py file looks like. It displays a functional approach to handling transformations with DataFrames.

We have a compose function, which takes any number of Callable objects (functions) and chains them so that they’re called in order, returning a single “combined” function that we can call on a value.

So, we can see that the transform_dataframe function:

  • Takes the DataFrame df
  • Creates a preprocessor variable using the compose function with all of the functions we want to run (in order)
  • Runs the preprocessor on the df value
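A minimal sketch of this pattern, assuming compose is built on functools.reduce (a common way to write it). The two step functions, add_double and add_total, are hypothetical placeholders standing in for the real transformations.

```python
import functools
from typing import Callable

import pandas as pd

Transform = Callable[[pd.DataFrame], pd.DataFrame]


def compose(*functions: Transform) -> Transform:
    """Chain functions left to right, so compose(f, g)(x) == g(f(x))."""
    # The identity function as the initial value makes compose() with no
    # arguments return its input unchanged.
    return functools.reduce(lambda f, g: lambda x: g(f(x)), functions, lambda x: x)


def add_double(df: pd.DataFrame) -> pd.DataFrame:
    # Placeholder step 1.
    return df.assign(double=df["value"] * 2)


def add_total(df: pd.DataFrame) -> pd.DataFrame:
    # Placeholder step 2: relies on the column created by step 1.
    return df.assign(total=df["value"] + df["double"])


def transform_dataframe(df: pd.DataFrame) -> pd.DataFrame:
    preprocessor = compose(add_double, add_total)
    return preprocessor(df)


print(transform_dataframe(pd.DataFrame({"value": [1, 2]})))
```

The nice part of this design is that adding or re-ordering a transformation is a one-line change in the compose call.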

In this example, because we’re not doing many transformations, we could have easily added the ingestion_date to each of the records without creating a DataFrame, and added a hash to each of the records (it would have been a little more tricky than what we did in image 3). I’ve decided to do it this way to show you how to use this methodology because it can be really useful in downstream processing of tables or essentially anywhere you’re using pandas.

I know what you’re thinking — there’s a 3rd function in preprocessor called df_denormalize that we haven’t gone through… We’ll do that next.

III. Denormalise the data into an un-nested DataFrame

At this point, our DataFrame has been normalized so our location record has now been turned into location.name and location.url. If we were to convert this back to JSON, we’d end up with something like this:

{
    "location.name": "",
    "location.url": ""
}

This isn’t what we want in our final JSON. We need it nested like it was originally, for example:

{
    "location": {
        "name": "",
        "url": ""
    }
}

To do this, we’ll use this process:

  1. Find all parent nested fields (the ones with “.” in them).
  2. Sort them in order of how deeply nested they are.
  3. For each of the parents, find the list of children.
  4. Take a subset of the DataFrame with only the children.
  5. Rename its columns so that only the field name is left and not the path to the field (e.g. something like location.country.state would just be called state).
  6. Create a new field for the parent, holding the renamed subset from step 5 as a JSON record.
  7. Delete the children from the DataFrame.

In code, it looks like this (inside of transforms.py):

6) Final functions in transforms.py
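The seven steps can be sketched as a single function. This is a minimal version, assuming the parent records are stored as Python dictionaries and that no existing column already has the same name as a parent; the original splits the work across several helpers and may differ in the details.

```python
import pandas as pd


def df_denormalize(df: pd.DataFrame, sep: str = ".") -> pd.DataFrame:
    """Collapse dotted columns (location.name, ...) back into nested records."""
    out = df.copy()
    # Step 1: every prefix of a dotted column name is a parent.
    parents = {
        sep.join(parts[:i])
        for parts in (col.split(sep) for col in out.columns if sep in col)
        for i in range(1, len(parts))
    }
    # Step 2: deepest parents first, so inner records exist before outer ones.
    for parent in sorted(parents, key=lambda p: p.count(sep), reverse=True):
        prefix = parent + sep
        # Steps 3 and 4: the children of this parent, as a subset.
        children = [col for col in out.columns if col.startswith(prefix)]
        # Step 5: strip the path so only the field name is left.
        subset = out[children].rename(columns=lambda col: col[len(prefix):])
        # Step 6: one dict per row becomes the parent field (added at the end).
        out[parent] = subset.to_dict(orient="records")
        # Step 7: the flattened children are no longer needed.
        out = out.drop(columns=children)
    return out


flat = pd.json_normalize([{"id": 1, "location": {"name": "Earth", "url": "loc/20"}}])
nested = df_denormalize(flat)
print(nested.loc[0, "location"])
```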

IV. Create a schema object for BigQuery

Looking back at image 2, it’s starting to make more sense. We’ve covered everything that gets us to line 3 and returns a df value we’re happy with. Our DataFrame is essentially complete.

But, before we go ahead and abandon the DataFrame, we’ll use it to create a BigQuery schema object.

A BigQuery SchemaField looks something like this:

SchemaField(name="", field_type="", mode="", fields=[]),

Field Type & Mode

The field_type is specific to BigQuery, so we need to have some conversion to handle mapping a type in pandas to a type in BigQuery.

We can use a handy attribute called dtype.kind (available on the dtype of every pandas column) to give us a single letter representing the type, and use this for our mappings. More info here…

TYPE_MAPPING = {
    "i": "INTEGER",
    "u": "NUMERIC",
    "b": "BOOLEAN",
    "f": "FLOAT",
    "O": "STRING",
    "S": "STRING",
    "U": "STRING",
    "M": "TIMESTAMP",
}

The mode value can be NULLABLE, REQUIRED or REPEATED (REPEATED is how BigQuery stores lists/arrays). We won’t mark any fields as REQUIRED because we don’t have information telling us which fields can or cannot be NULL. We could infer it for fields like ID but I trust that it’s correct for this use case.

If we wanted to extend this function to specify fields as REQUIRED we could accept another parameter req_fields and use that to determine which are required.

Fields

The fields value is only needed if the record is nested, i.e. like a dict. In that instance, the value you pass through is a BigQuery schema definition (a list of SchemaField objects) so we can use our function recursively to handle this.

There is one distinction we have to make here. If we specify a fields value (the record is nested), then we also have to specify a field_type of RECORD.

The code for doing everything we’ve just mentioned is here, in main.py :

7) _generate_bigquery_schema() function

Essentially all we’re doing here is looping through each column in the DataFrame and setting values for the SchemaField based on certain conditions.

The most complex logic comes in the fields definition. We have two scenarios where fields are required (i.e. the record is nested):

  1. When the datatype of the records in the DataFrame is a dict
  2. When the record is a repeated record (a list), and the values within the list are a dict.

In both of these instances, we need to recursively run the _generate_bigquery_schema() function with the dict value loaded as a DataFrame.

The output is a schema which we can use directly in a BigQuery load job.
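Here’s a sketch of that logic. To keep it runnable without the BigQuery client installed, it emits plain dictionaries in BigQuery’s JSON schema representation instead of SchemaField objects, which is a deliberate swap on my part. The function name generate_bigquery_schema and the choice to inspect the first non-null value of each column are assumptions for illustration.

```python
from typing import Any, Dict, List

import pandas as pd

TYPE_MAPPING = {
    "i": "INTEGER", "u": "NUMERIC", "b": "BOOLEAN", "f": "FLOAT",
    "O": "STRING", "S": "STRING", "U": "STRING", "M": "TIMESTAMP",
}


def generate_bigquery_schema(df: pd.DataFrame) -> List[Dict[str, Any]]:
    """Build a BigQuery-style schema (list of dicts) from a DataFrame."""
    schema = []
    for column in df.columns:
        values = df[column].dropna()
        first = values.iloc[0] if len(values) else None
        repeated = isinstance(first, list)
        if repeated:
            # Flatten the lists so we can inspect the element type.
            items = [item for row in values for item in row]
            values = pd.Series(items) if items else pd.Series(dtype=object)
            first = values.iloc[0] if len(values) else None
        field: Dict[str, Any] = {
            "name": column,
            "mode": "REPEATED" if repeated else "NULLABLE",
        }
        if isinstance(first, dict):
            # Nested record: recurse with the dicts loaded as a DataFrame.
            field["type"] = "RECORD"
            field["fields"] = generate_bigquery_schema(pd.DataFrame(values.tolist()))
        else:
            # Unknown kinds fall back to STRING (an assumption of this sketch).
            field["type"] = TYPE_MAPPING.get(values.dtype.kind, "STRING")
        schema.append(field)
    return schema


example = pd.DataFrame({
    "row_hash": pd.Series([1, 2], dtype="uint64"),
    "id": [1, 2],
    "episode": [["ep/1", "ep/2"], ["ep/1"]],
    "location": [{"name": "Earth", "url": "loc/20"}, {"name": "Citadel", "url": "loc/3"}],
})
schema = generate_bigquery_schema(example)
print(schema)
```

If you’d rather work with SchemaField objects, each dictionary can be converted with bigquery.SchemaField.from_api_repr().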

V. Load Data to BigQuery

Now that we’ve generated the schema, the final part of prepare_data is to convert the DataFrame to JSON using the “records” orientation, ISO-formatted dates, and lines=True, which returns it in newline-delimited format (the JSON format needed to load data to BigQuery).
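As a small illustration of that conversion, here’s a sketch using a made-up two-row DataFrame (the column values are placeholders, not real API output):

```python
import json

import pandas as pd

df = pd.DataFrame({
    "ingestion_date": pd.to_datetime(["2022-11-22T10:00:00Z", "2022-11-22T10:00:00Z"]),
    "id": [1, 2],
    "location": [{"name": "Earth", "url": "loc/20"}, {"name": "Citadel", "url": "loc/3"}],
})

# orient="records" writes one JSON object per row, lines=True puts each object
# on its own line, and date_format="iso" writes timestamps as ISO 8601 strings.
json_records = df.to_json(orient="records", date_format="iso", lines=True)

for line in json_records.splitlines():
    print(json.loads(line))
```

Each line is a complete JSON object, and the nested location record is preserved as a nested object rather than a string.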

We’ve got everything we need to load the data; let’s do it!

This is the final bit of code that is missing from main.py:
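A sketch of what this last step could look like, assuming the newline-delimited JSON string is loaded with Client.load_table_from_file and that the table is overwritten on every run (WRITE_TRUNCATE is my assumption). The helper names ndjson_to_file and load_to_bigquery are hypothetical, and schema is expected to be a list of SchemaField objects.

```python
import io


def ndjson_to_file(json_records: str) -> io.BytesIO:
    """Wrap newline-delimited JSON text in the binary file object a load job reads."""
    return io.BytesIO(json_records.encode("utf-8"))


def load_to_bigquery(
    json_records: str,
    schema: list,
    table_id: str = "rick_and_morty.character",
    location: str = "EU",
) -> int:
    """Run a load job and return the number of rows written."""
    # Imported here so ndjson_to_file() stays usable without the client library.
    from google.cloud import bigquery

    client = bigquery.Client(location=location)
    job_config = bigquery.LoadJobConfig(
        schema=schema,
        source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
        # Assumption: replace the table contents on each run.
        write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
    )
    job = client.load_table_from_file(
        ndjson_to_file(json_records), table_id, job_config=job_config
    )
    job.result()  # Blocks until the job finishes; raises if it failed.
    return job.output_rows
```

Calling load_to_bigquery(json_records, schema) needs valid credentials and an existing dataset, so it will only work once the pre-requisites at the top of the article are in place.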

Now we can run our script and all of the data for the character endpoint will be available to view in BigQuery!

Note — you’ll need to make sure you’ve already created the rick_and_morty dataset in location EU for this to work.

Schema tab for `rick_and_morty.character` in BigQuery
Preview tab for `rick_and_morty.character` in BigQuery

Closing Thoughts 💭

Photo by Rebe Pascual on Unsplash

We’ve made some good progress; we’ve pulled data from an API and been able to load it successfully into a table in BigQuery so it can be queried with SQL.

The data is in its rawest form with no transformations but a couple of additional metadata fields that can help us with downstream processing.

While this has worked perfectly and done exactly what we wanted, it’s still not entirely practical — we don’t really want to have to run this script from our laptops every time we want to pull an updated version of the data into BigQuery, do we?

In the next part of the series, we’ll look at deploying the code into a Google Cloud Function so that we can run it via any traditional HTTP request.

Feel free to drop me a follow on both Medium and LinkedIn for more similar content, and reach out with any questions. I love connecting with new people :)

Lastly, if you have any requests or ideas of other things I could run through in similar tutorial format, drop them in the comments down below and I’ll work out which topics I can cover!

You can find the full code for this here: GitHub

Danilo Drobac

Data Engineer. Director @ N-ZYTE // Data Nerd 🤓 // CrossFit and Food Addict