Building a Modern Data Pipeline Part 5: Pipeline Code-base

Andy Sawyer
6 min read · Feb 27, 2024

This is the fifth part of a six-part series titled ‘Building a Modern Data Pipeline: A Journey from API to Insight’, related to this GitHub repo. It steps through setting up a data pipeline and running it end-to-end on your local machine.

Part 1 was a high-level overview, while part 2 stepped through how to download and run the pipeline. Part 3 looked at the configuration of Docker, and part 4 reviewed the Python file used by Airflow to orchestrate our data pipeline.

This is the ‘meaty’ section, where we get into the code that actually moves data around. Enjoy!

Where’s the Code?

This blog post covers the code that can be found within the pipelines folder of the repo:

folder structure

This can be broken into four parts:

  1. Re-usable code: The main folder contains a delta.py and a params.py file. The code in these is used over and over throughout the pipeline.
  2. Currency related code: In the currency_pipeline folder you will find the files that step through moving currency data from a CSV into the bronze, then silver, then ultimately gold buckets.
  3. Rates related code: Similar to the currency folder above, except here we are calling an API to get currency rates and moving them through the medallion architecture.
  4. Generic pipeline code: Here we have a generated dim_date table. It’s built from scratch and so doesn’t require any source data that would first get landed in bronze and staged through silver.

Re-Usable Code

To get it out of the way first: there is a simple params.py file. This contains the API key for the service we’ll be getting our rates from, as well as the rates we want, stored as a list of dictionaries. I’ll call it out now: in a production pipeline you should never store keys or passwords in the code. Ideally, you would store them in the AWS Parameter Store and retrieve them with boto3 or similar, which keeps your access to resources secure. I’d note that I got an almost immediate automated email from GitHub when I pushed this repo, letting me know that it appeared I’d committed private data. DON’T DO THIS IN PROD!!!
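
As a hedged example of that pattern, the snippet below fetches a secret from the Parameter Store with boto3; the parameter name is purely illustrative:

```python
import boto3


def get_api_key(parameter_name: str = "/pipelines/rates_api_key") -> str:
    """Fetch a secret from AWS Systems Manager Parameter Store instead of hard-coding it."""
    ssm = boto3.client("ssm")
    response = ssm.get_parameter(Name=parameter_name, WithDecryption=True)
    return response["Parameter"]["Value"]
```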

The second file, delta.py, contains a class that I’ve named DeltaS3. It has two methods:

  1. Read. This will read the data from the specified Delta table in an S3 bucket. You provide the bucket and the table name, and it will return a Polars DataFrame of the DeltaTable.
  2. Write. Unsurprisingly, this writes a DataFrame that is passed in to a table in a specified bucket. There are some additional parameters, including whether it should overwrite or append data, and what to do if the schema changes.
Reading DeltaTables from an S3 bucket

It’s a simple class, but reading and writing DataFrames to S3 happens over and over in this demo, so it made sense to keep the code DRY (don’t repeat yourself) and have this somewhere I could call on again and again.
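
The repo has its own version, but as a rough sketch, such a class can be a thin wrapper around Polars’ read_delta and write_delta; the bucket layout and storage options below are assumptions:

```python
import polars as pl


class DeltaS3:
    """Thin wrapper for reading/writing Delta tables in S3 (or an S3-compatible store)."""

    def __init__(self, storage_options: dict | None = None):
        # Credentials / endpoint for the object store -- illustrative only.
        self.storage_options = storage_options or {}

    def read(self, bucket: str, table: str) -> pl.DataFrame:
        """Return the Delta table at s3://<bucket>/<table> as a Polars DataFrame."""
        return pl.read_delta(
            f"s3://{bucket}/{table}", storage_options=self.storage_options
        )

    def write(
        self, df: pl.DataFrame, bucket: str, table: str, mode: str = "overwrite"
    ) -> None:
        """Write the DataFrame to s3://<bucket>/<table>; mode can be 'overwrite' or 'append'."""
        df.write_delta(
            f"s3://{bucket}/{table}", mode=mode, storage_options=self.storage_options
        )
```

With something like this in place, every step in the pipeline can move a DataFrame between buckets in a single call.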

Currency Related Code

The currency code is broken into three files, one for each of the steps we perform to move currency data through our pipeline:

  1. Get the raw currency data. get_currencies_to_bronze.py does just this. We use Polars to extract the list of currencies from a CSV file stored in the seeds/ folder. Once read in, we write it ‘as is’ to the bronze bucket for later use.
  2. Transform the currency data. transform_currencies_to_silver.py picks up where step 1 finished. It loads the data from the bronze bucket and performs some simple transformations: namely, it adds a hash value for each currency and makes the column headings lower case. Polars is case sensitive, so a column named Currency is different to a column named currency; cleaning this up helps ensure no issues downstream.
  3. Present the currency data. present_currencies_in_gold.py completes the currency journey. Picking up the transformed data from the silver bucket, we leverage the SQL capabilities of Polars to write a SQL query over the data. We ensure no duplicates by performing a SELECT DISTINCT, and then we add a ‘surrogate key pipeline’, which is used in Kimball fact and dimension modelling to accommodate late-arriving dimensions. Finally, we save the data back to the gold bucket for later use (see the sketch after this list).
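
As a hedged sketch of steps 2 and 3, assuming illustrative bucket, table and column names rather than the repo’s exact ones:

```python
import polars as pl

# Silver: read the raw currencies from bronze and clean them up.
bronze = pl.read_delta("s3://bronze/currencies")  # storage options omitted for brevity

silver = (
    bronze
    # Lower-case every column name so downstream code is case-consistent.
    .rename({col: col.lower() for col in bronze.columns})
    # Add a hash per currency row, built from its business key columns.
    .with_columns(
        pl.concat_str([pl.col("currency_code"), pl.col("currency_name")])
        .hash()
        .alias("currency_hash")
    )
)
silver.write_delta("s3://silver/currencies", mode="overwrite")

# Gold: use Polars' SQL interface to de-duplicate and present the dimension.
gold = (
    pl.SQLContext(currencies=silver)
    .execute("SELECT DISTINCT currency_hash, currency_code, currency_name FROM currencies")
    .collect()
)
gold.write_delta("s3://gold/dim_currency", mode="overwrite")
```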

I should note at this point that, throughout the code, I have opted to overwrite data in the DeltaTables. Ideally, in a production setting, we would want to leverage an append option. For the purposes of this demonstration, however, I felt it was OK to simply overwrite if someone chose to run the pipeline twice.

Rates Related Code

Following the same approach as with the currencies, we step through a pipeline of bronze, silver and gold buckets. This code, however, is slightly different:

  • Extracting from an API. While the currencies don’t change, and so can be stored in a CSV, the rates change frequently. As such, we connect to an API to get them. I’ve opted to pull the full history of rates each time, but in a production setting you would likely only pull recent rates, which allows you to append them to the existing dataset.
  • Iterating over the data. There isn’t just one API call to get everything we need. We have defined the rate pairs that we want, and we iterate over these, calling the API one pair at a time. We save the responses into our bronze bucket in their raw state, so one table per rate pair (see the sketches after this list).
  • Unnesting and unpivoting the data. Unlike a CSV, which is already in a nicely tabular format, the JSON returned by the API is nested and not in the best structure for analytics. As part of the transformation, we unnest the relevant portion of the JSON and then unpivot it to get it back into a usable format. Similarly to the currency process, we create some hash values and rename our columns. There is also an update to the data types of some of the columns, as Polars has read everything coming from the JSON as a string.
  • Combining our data. The transformation step is the point where we can bring all of those raw rates tables together. Now that we’ve flattened the data, fixed the column names, and defined the data types, we can concatenate everything into one larger table that holds all of our rates.
  • Saving our fact table. Similar to the currency dimension, we use SQL to build our fact table. Now that we’ve defined the data types for the columns, though, we’re able to perform some calculations. I’ve taken the daily opening and closing rates to get the movement, which can then be aggregated over time when performing analysis.
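
A rough sketch of that extraction loop might look like the following; the endpoint, query parameters and pair list here are placeholders rather than the actual API used in the repo:

```python
import polars as pl
import requests

# Placeholders -- the real endpoint, key and pairs live in params.py (or, better, Parameter Store).
API_URL = "https://example.com/rates/history"
API_KEY = "not-a-real-key"
RATE_PAIRS = [{"base": "USD", "quote": "EUR"}, {"base": "USD", "quote": "GBP"}]

for pair in RATE_PAIRS:
    response = requests.get(
        API_URL,
        params={"base": pair["base"], "symbol": pair["quote"], "api_key": API_KEY},
        timeout=30,
    )
    response.raise_for_status()

    # Land the payload as-is in bronze: one Delta table per rate pair.
    raw = pl.from_dicts([response.json()])  # nested JSON becomes struct columns
    table = f"rates_{pair['base']}_{pair['quote']}".lower()
    raw.write_delta(f"s3://bronze/{table}", mode="overwrite")
```
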
Transforming the rates table
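
And as a hedged illustration of the unnest-and-unpivot step (the table and column names assume a particular JSON shape, which may differ from the real API response):

```python
import polars as pl

raw = pl.read_delta("s3://bronze/rates_usd_eur")  # one raw table per pair

rates = (
    raw
    # Flatten the nested struct column that came back from the JSON payload.
    .unnest("rates")
    # Turn one-column-per-date into one-row-per-date (melt in older Polars versions).
    .unpivot(index=["base", "quote"], variable_name="date", value_name="rate")
    # Everything arrived as strings, so set proper data types.
    .with_columns(
        pl.col("date").str.to_date(),
        pl.col("rate").cast(pl.Float64),
    )
)

# Combining the pairs is then a simple concatenation before building the fact table:
# all_rates = pl.concat([rates_usd_eur, rates_usd_gbp, ...])
```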

Generic Pipeline Code

This code is different to the other processes. The data is generated within the pipeline and simply saved once generated. I’ve used Polars to create a series of dates from 01/01/2010 through to ‘today’, and then save this as a DataFrame. I’ve then added a number of columns that can be useful for analytics purposes. These include the day of year, the day (Monday, Tuesday, etc.), and the week number. This allows an analyst to group over different time periods to spot trends.

Generate a date table in Polars
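
As a minimal sketch of how such a table can be generated with Polars (the column names are my own illustration):

```python
from datetime import date

import polars as pl

# Build one row per day from 2010-01-01 through today, then derive useful attributes.
dim_date = pl.DataFrame(
    {"date": pl.date_range(date(2010, 1, 1), date.today(), interval="1d", eager=True)}
).with_columns(
    pl.col("date").dt.ordinal_day().alias("day_of_year"),
    pl.col("date").dt.strftime("%A").alias("day_name"),  # Monday, Tuesday, ...
    pl.col("date").dt.week().alias("week_number"),
    pl.col("date").dt.year().alias("year"),
)
```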

Next Steps

That’s all for this post. The next post will be coming shortly, and will finish off the series. We’ll jump into Jupyter and have a look at the tables we’ve created. Stay tuned, and please feel free to share your thoughts. Your feedback and questions are highly welcome. Follow me for updates on this series and more insights into the world of data engineering.


Andy Sawyer

Bringing software engineering best practices and a product driven mindset to the world of data. Find me at https://www.linkedin.com/in/andrewdsawyer/