2) Loading Data to BigQuery
Given all of the different methods of loading data to BigQuery, what’s the best way to do it?
This article is Part 2 of a series titled “Real World Python for Data Engineering”.
Each article builds on what was created in the previous one, so it’s best to check those out first if you wish to follow along!
Intro
In Part 1 of this series, we successfully made requests to the Rick and Morty API, paginating through all of the results and storing them all in a local variable.
Now, as in the real world, we want to store that data somewhere useful, like a Data Warehouse.
For this tutorial, and because of my background as a Google Cloud Platform data engineer, I’ll be choosing to store this data in BigQuery.
You could choose from alternatives: Azure Data Warehouse for Microsoft folk, Redshift for AWS lovers, or Snowflake for cool kids who like to stay current.
You can read more about the role Data Warehouses play in the Modern Data Stack here:
There’s a plethora of ways to load data into BigQuery…
You can load it using the `gcloud` CLI. Load from a DataFrame. A CSV file. JSON records. A file hosted in GCS.
The common theme is that under the hood, these all create a `LoadJob`, which loads the data into a specified table along with some additional bits of information that BigQuery needs (such as a table schema).
The alternative approach is to not load the data at all, but rather stream records directly into the table. The difference between the two is a topic for another article; I’m just making you aware that an alternative exists.
Methodology
Let’s break down the different parts of this tutorial:
- Load our data into `pandas` and create the `row_hash` and `ingestion_date` fields.
- Denormalise the data into an un-nested DataFrame.
- Create a `schema` object for BigQuery.
- Load the data to BigQuery.
A few additional pieces of information/pre-requisites for this:
- You will need a Google Cloud Platform account and project set up: https://cloud.google.com/free
- You’ll need to install the `gcloud` CLI, authenticate locally using `gcloud auth login`, and set your project ID to the project you created/will be using for this tutorial: https://cloud.google.com/sdk/docs/install
- You’ll need to enable the BigQuery API.
- A dataset called `rick_and_morty` needs to be created within BigQuery.
- I’m using WSL2 on Windows 22H2.
- I’ll be implementing this in Python 3.8.
- Dependency management will be done through `pipenv`; the requirements are `requests`, `pandas` and `google-cloud-bigquery`, with `black` as a dev dependency for auto-formatting (another topic of discussion).
Processing Order
In our finished `main.py` file, the processing order is as follows:
- We use the `results` to run `prepare_data` and get our finished `json_records` and `schema`.
- We create a BigQuery `Client` instance.
- We build a `LoadJobConfig` from our schema.
- Then finally, we load our data to BigQuery.
I. Loading Data to DataFrame
If we remember back to the last tutorial, it ended with us having a list of `CharacterSchema` objects stored in a variable called `results`.
By default, `pandas` can load a DataFrame from a list of `dataclass` objects, but because we have nested data within ours, we want to use another function to load it.
That call is wrapped in the `prepare_data` function, which lives in the `main.py` file and looks like this:
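As a rough sketch of the idea (using a cut-down stand-in for the full `CharacterSchema` from Part 1), the first step looks something like this:

```python
from dataclasses import dataclass, asdict

import pandas as pd


@dataclass
class Location:
    name: str
    url: str


@dataclass
class CharacterSchema:  # cut-down stand-in for the real dataclass
    id: int
    name: str
    location: Location


results = [CharacterSchema(1, "Rick Sanchez", Location("Citadel of Ricks", "https://example.com"))]

# asdict() recursively converts dataclasses (including nested ones) to dicts,
# and json_normalize() flattens the nesting into dotted column names.
df = pd.json_normalize([asdict(r) for r in results])
print(df.columns.tolist())  # ['id', 'name', 'location.name', 'location.url']
```
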
We can see that the first line in the function creates the DataFrame using the `pd.json_normalize()` function. This function takes JSON input and converts any nested structures into separate fields, separated by “.” by default. For example, `location`, which has `name` and `url` as fields, would be turned into `location.name` and `location.url` in the DataFrame.
We use a list comprehension and the `asdict` function imported from `dataclasses` to convert our `CharacterSchema` objects to dictionaries, and pass that in as the value to normalise.
Now that we have a DataFrame, we can go ahead and run the `transform_dataframe()` function…
II. Create `row_hash` and `ingestion_date` Fields
Note — in general, in our new ELT world, we want to leave source data as raw as possible, meaning as few transformations as we need. Adding metadata fields is fine, but the records themselves shouldn’t be changed.
All of the following code is held in another file called `transforms.py`.
Why do we need a `row_hash`? In many cases, as a Data Engineer, you might be pulling from a reporting endpoint that provides a combined view of lots of data. Depending on the API, you may or may not get a unique ID for these records, so we’ll create our own to show how you would address that.
This function is a neat way to take a DataFrame and use the `pd.util.hash_pandas_object` function to create a hash for each row. Note — because our DataFrame can contain complex objects that are unhashable (for example, a `list`), we hash a version of the DataFrame where every value has been cast to `string`.
We set the index of the DataFrame to the hash value, then reset the index and name it `row_hash`.
Et voilà, we have a unique ID field of type `uint64`.
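A minimal sketch of that hashing step (the function name `df_add_row_hash` is illustrative):

```python
import pandas as pd


def df_add_row_hash(df: pd.DataFrame) -> pd.DataFrame:
    # Hash a stringified copy of the frame: the real data contains
    # unhashable objects such as lists, so everything is cast to string first.
    df.index = pd.util.hash_pandas_object(df.astype(str), index=False)
    df.index.name = "row_hash"
    # reset_index moves the hash out of the index into a leading column.
    return df.reset_index()


df = pd.DataFrame({"name": ["Rick", "Morty"], "episode": [["S01E01"], ["S01E02"]]})
df = df_add_row_hash(df)
print(df["row_hash"].dtype)  # uint64
```
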
Next, we create a field at the start of the DataFrame called `ingestion_date`, using the `pd.to_datetime()` function and setting the value to the current UTC time.
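A sketch of that step (again, the function name is illustrative):

```python
import pandas as pd


def df_add_ingestion_date(df: pd.DataFrame) -> pd.DataFrame:
    # Insert a single UTC timestamp as the first column of the frame;
    # the scalar is broadcast to every row.
    df.insert(0, "ingestion_date", pd.to_datetime("now", utc=True))
    return df


df = df_add_ingestion_date(pd.DataFrame({"id": [1, 2]}))
print(df.columns[0])  # ingestion_date
```
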
The `transforms.py` file takes a functional approach to handling transformations with DataFrames.
We have a `compose` function, which takes `Callable` objects (functions) as arguments and chains them into a sequence, calling each in order and finally returning the result as another “combined” function that we can call on a value.
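A minimal sketch of such a `compose` function, demonstrated here with toy functions:

```python
from functools import reduce
from typing import Callable


def compose(*functions: Callable) -> Callable:
    # Fold the functions into one: the output of each call feeds
    # the input of the next, left to right.
    return reduce(lambda f, g: lambda x: g(f(x)), functions)


# A toy demonstration with plain functions:
add_one = lambda x: x + 1
double = lambda x: x * 2
pipeline = compose(add_one, double)
print(pipeline(3))  # (3 + 1) * 2 = 8
```

In `transforms.py`, the same idea is applied to DataFrame functions, something like `compose(df_add_row_hash, df_add_ingestion_date, df_denormalize)` (the first two names here are illustrative).
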
So, we can see that the `transform_dataframe` function:

- Takes the DataFrame `df`.
- Creates a `preprocessor` variable using the `compose` function with all of the functions we want to run (in order).
- Runs the `preprocessor` on the `df` value.
In this example, because we’re not doing many transformations, we could easily have added the `ingestion_date` to each of the records without creating a DataFrame, and added a hash to each record (though that would have been a little trickier than the DataFrame approach). I’ve decided to do it this way to show you this methodology, because it can be really useful in downstream processing of tables, or essentially anywhere you’re using `pandas`.
I know what you’re thinking — there’s a third function in `preprocessor` called `df_denormalize` that we haven’t gone through… We’ll do that next.
III. Denormalise the data into an un-nested DataFrame
At this point, our DataFrame has been normalised, so our `location` record has been turned into `location.name` and `location.url`. If we were to convert this back to JSON, we’d end up with something like this:
{
"location.name": "",
"location.url": ""
}
This isn’t what we want in our final JSON; we need it nested as it was originally. For example:
{
"location": {
"name": "",
"url": ""
}
}
To do this, we’ll use the following process:

- Find all parent nested fields (the ones with “.” in them).
- Sort them in order of how deeply nested they are.
- For each of the parents, find the list of children.
- Take a subset of the DataFrame with only the children.
- Rename the subset so only the field name remains, not the path to the field (e.g. `location.country.state` would just be called `state`).
- Create a new field for the parent which is a JSON record built from that renamed subset.
- Delete the children from the DataFrame.

In code, it looks like this (inside of `transforms.py`):
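A sketch of how those steps could be implemented as `df_denormalize` (a reconstruction of the process described above, not the article’s exact code):

```python
import pandas as pd


def df_denormalize(df: pd.DataFrame) -> pd.DataFrame:
    # Steps 1 & 2: collect every parent path of a dotted column, deepest
    # first, so "location.country" is re-nested before "location".
    parents = set()
    for col in df.columns:
        parts = col.split(".")
        for i in range(1, len(parts)):
            parents.add(".".join(parts[:i]))
    for parent in sorted(parents, key=lambda p: p.count("."), reverse=True):
        # Steps 3 & 4: take the subset of columns belonging to this parent...
        children = [c for c in df.columns if c.startswith(parent + ".")]
        # Step 5: ...strip the path, leaving only the leaf field names...
        subset = df[children].rename(columns=lambda c: c.rsplit(".", 1)[-1])
        # Step 6: ...pack each row of the subset into a single dict column...
        df[parent] = subset.to_dict(orient="records")
        # Step 7: ...and drop the now-redundant children.
        df = df.drop(columns=children)
    return df


df = pd.DataFrame({"id": [1], "location.name": ["Earth"], "location.url": ["https://example.com"]})
df = df_denormalize(df)
print(df.loc[0, "location"])  # {'name': 'Earth', 'url': 'https://example.com'}
```
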
IV. Create a schema object for BigQuery
Looking back at the processing order in `main.py`, it’s starting to make more sense. We’ve covered everything that gets us to the point of returning a `df` value we’re happy with. Our DataFrame is essentially complete.
But, before we go ahead and abandon the DataFrame, we’ll use it to create a BigQuery schema object.
A BigQuery `SchemaField` looks something like this:
SchemaField(name="", field_type="", mode="", fields=[]),
Field Type & Mode
The `field_type` is specific to BigQuery, so we need some conversion to handle mapping a type in `pandas` to a type in BigQuery.
We can use a useful `pandas` attribute, `dtype.kind`, which gives us a single letter representing each type, and use this for our mappings:
TYPE_MAPPING = {
"i": "INTEGER",
"u": "NUMERIC",
"b": "BOOLEAN",
"f": "FLOAT",
"O": "STRING",
"S": "STRING",
"U": "STRING",
"M": "TIMESTAMP",
}
The `mode` value can be `NULLABLE`, `REQUIRED` or `REPEATED` — the last of which is how lists/arrays are stored. We won’t mark any fields as `REQUIRED`, because we don’t have information to explain which fields can or cannot be NULL. We could infer it for fields like `id`, but I trust that it’s correct for this use case.
If we wanted to extend this function to specify fields as `REQUIRED`, we could accept another parameter, `req_fields`, and use that to determine which fields are required.
Fields
The `fields` value is only needed if the record is nested, i.e. like a `dict`. In that instance, the value you pass through is a BigQuery schema definition (a list of `SchemaField` objects), so we can use our function recursively to handle this.
There is one distinction we have to make here: if we specify a `fields` value (i.e. the record is nested), then we also have to specify a `field_type` of `RECORD`.
The code for doing everything we’ve just mentioned lives in `main.py`:
Essentially, all we’re doing here is looping through each column in the DataFrame and setting values for the `SchemaField` based on certain conditions.
The most complex logic comes in the `fields` definition. We have two scenarios where `fields` is required (i.e. the record is nested):

- When the datatype of the records in the DataFrame is a `dict`.
- When the record is a repeated record (a list), and the values within the list are `dict`s.

In both of these instances, we need to recursively run the `_generate_bigquery_schema()` function with the `dict` values loaded as a DataFrame.
The output is a `schema` which we can use directly in a BigQuery load job.
V. Load Data to BigQuery
Now we’ve generated the `schema`, the final part of `prepare_data` is to convert the DataFrame “records” to JSON (with ISO-formatted dates), passing `lines=True` so the result comes back in newline-delimited format — the JSON format needed to load data into BigQuery.
We’ve got everything we need to load the data; let’s do it!
This is the final bit of code missing from `main.py`:
Now we can run our script, and all of the data for the `character` endpoint will be available to view in BigQuery!
Note — you’ll need to make sure you’ve already created the `rick_and_morty` dataset in location `EU` for this to work.
Closing Thoughts 💭
We’ve made some good progress; we’ve pulled data from an API and been able to load it successfully into a table in BigQuery so it can be queried with SQL.
The data is in its rawest form with no transformations but a couple of additional metadata fields that can help us with downstream processing.
While this has worked perfectly and done exactly what we wanted, it’s still not entirely practical — we don’t really want to have to run this script from our laptops every time we want to pull an updated version of the data into BigQuery, do we?
In the next part of the series, we’ll look at deploying the code into a Google Cloud Function so that we can run it via any traditional HTTP request.
Feel free to drop me a follow on both Medium and LinkedIn for more similar content, and reach out with any questions — I love connecting with new people :)
Lastly, if you have any requests or ideas of other things I could run through in similar tutorial format, drop them in the comments down below and I’ll work out which topics I can cover!
You can find the full code for this here: GitHub