In parts one and two of this series, we introduced the concept of dynamic sourcing pipelines for data, their architectural implementation, and the metadata. Armed with this knowledge, we’ll dive into the pipeline in detail. Part three shows how we start from external sources with very different characteristics and generalize them step by step.
This blog includes lots of code blocks and queries. Consider the code blocks as “telling the story”, while the text provides explanations and extra details. As the data travels through the pipeline, it is generalized, and we’ll pause each time we finish a level of generalization.
This blog follows the structure of a generic pipeline: test and build, extract, wrangle, data quality checks, and delta load.
You’ll recall that Airflow defines pipelines as Directed Acyclic Graphs (DAGs) in Python scripts. At GeoPhy, we created one DAG script that dynamically generates all DAGs: it receives a list of pipelines, loops over it, and calls the function create_dag(dag) for each one. This function defines several tasks, each backed by an underlying function.
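A minimal sketch of this one-script-many-DAGs pattern, assuming a hypothetical PIPELINES list and task names taken from the steps in this post. To keep it runnable without Airflow, the hypothetical DagSketch class stands in for airflow.DAG; in the real script, create_dag would build an Airflow DAG with one operator per task.

```python
from dataclasses import dataclass, field
from typing import List

# Hypothetical pipeline list; in practice it would come from metadata or a config file.
PIPELINES = ["source_a", "source_b"]

# Generic tasks shared by every pipeline, in the order described in this post (illustrative names).
GENERIC_TASKS = ["test_and_build", "extract", "wrangle", "run_quality_checks", "delta_load"]


@dataclass
class DagSketch:
    """Hypothetical stand-in for airflow.DAG so the sketch runs without Airflow."""
    dag_id: str
    tasks: List[str] = field(default_factory=list)


def create_dag(dag_id: str) -> DagSketch:
    # Every pipeline gets the same generic task chain; only the specific module differs.
    dag = DagSketch(dag_id=dag_id)
    for task_name in GENERIC_TASKS:
        dag.tasks.append(f"{dag_id}.{task_name}")
    return dag


# The classic pattern registers each generated DAG in globals(), because Airflow
# discovers DAG objects in the module's global namespace.
for pipeline in PIPELINES:
    globals()[f"dag_{pipeline}"] = create_dag(pipeline)
```

With this setup, adding a source to the list is enough to get a new DAG; the per-source differences live in the specific modules, not in the DAG script.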
Integration of specific components
We call the specific functions for each source with the importlib library, which imports a different script for each source. The name of the source defines which ‘specific’ script is imported.
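A sketch of the lookup with importlib.import_module, assuming a hypothetical layout in which each source has its own module in a sources package (sources/source_a.py, sources/source_b.py, ...) exposing the same specific functions:

```python
import importlib
from types import ModuleType
from typing import Optional


def load_specific_module(source_name: str, package: Optional[str] = "sources") -> ModuleType:
    """Import the source-specific script whose module name equals the source name.

    The 'sources' package is a hypothetical layout; pass package=None to import
    a top-level module directly.
    """
    module_path = f"{package}.{source_name}" if package else source_name
    # Raises ModuleNotFoundError when no specific script exists for this source.
    return importlib.import_module(module_path)
```

The generic tasks can then call, for example, load_specific_module("source_a").get_source_file_list() without knowing which source they are handling.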
Test and Build
The initial run takes as input a configuration file created by the developer. This file contains the data to populate the static tables. Any required tables are created during the first run. In subsequent runs, the pipeline recognizes that the metadata and tables are already in place and skips this step.
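A minimal sketch of the first-run check, using SQLite as a stand-in for the metadata database. The table names (source, staging_<source name>) and the configuration keys are hypothetical; column names are interpolated into the DDL, which is acceptable only because the configuration is written by the developer.

```python
import sqlite3
from typing import Any, Dict


def build_static_tables(conn: sqlite3.Connection, config: Dict[str, Any]) -> bool:
    """Create and populate the static tables on the first run only.

    Returns True when the build ran and False when it was skipped because the
    metadata was already in place. Table and key names are hypothetical.
    """
    source = config["source"]
    conn.execute(
        "CREATE TABLE IF NOT EXISTS source ("
        "source_name TEXT PRIMARY KEY, publish_type TEXT, loading_strategy TEXT)"
    )
    already_built = conn.execute(
        "SELECT 1 FROM source WHERE source_name = ?", (source["name"],)
    ).fetchone()
    if already_built:
        return False  # subsequent runs: metadata and tables exist, skip this step

    # One transaction, so a failed first run leaves no half-built metadata behind.
    with conn:
        conn.execute(
            "INSERT INTO source VALUES (?, ?, ?)",
            (source["name"], source["publish_type"], source["loading_strategy"]),
        )
        columns = ", ".join(f"{a['name']} {a['data_type']}" for a in config["attribute"])
        conn.execute(
            f"CREATE TABLE IF NOT EXISTS staging_{source['name']} "
            f"(delivery_sqn INTEGER, {columns})"
        )
    return True
```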
Extract
The extract step starts by retrieving metadata from metadata.source and instantiating the ‘specific’ source module. It then gets the source_file_list and the download_file_list. It loops through the source_file_list and ignores all files that are in the download_file_list. It also skips extracting files that are already on S3 but not registered in the metadata, which enables us to drop all database objects and rerun from a clean database. The specific extract function downloads all new files. After source.extract has been executed, the metadata is updated: a delivery_sqn is created for the source_file, the number of rows in the file on S3 is counted, and the file_size_kb is retrieved. Finally, the delivery_status is created.
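The generic extract loop could look roughly like this sketch. The in-memory deliveries list stands in for the metadata tables and the file_stats callable for inspecting a file on S3; both are hypothetical, as are the Delivery field names beyond those used in the text.

```python
from dataclasses import dataclass
from typing import Callable, List, Protocol, Tuple


class SpecificSource(Protocol):
    """The two functions each source-specific module provides."""

    def get_source_file_list(self) -> List[str]: ...

    def extract(self, source_file: str) -> None: ...


@dataclass
class Delivery:
    delivery_sqn: int
    source_file: str
    row_count: int       # hypothetical name for the row count on S3
    file_size_kb: int
    delivery_status: int  # 1 = the file is stored on S3


def generic_extract(source: SpecificSource,
                    download_file_list: List[str],
                    deliveries: List[Delivery],
                    file_stats: Callable[[str], Tuple[int, int]]) -> List[Delivery]:
    """Extract new files and register each one as a delivery.

    deliveries and file_stats are hypothetical stand-ins for the metadata
    tables and for counting rows / measuring size of a file on S3.
    """
    next_sqn = max((d.delivery_sqn for d in deliveries), default=0) + 1
    new_deliveries = []
    for source_file in source.get_source_file_list():
        if source_file in download_file_list:
            continue  # processed before: skip
        source.extract(source_file)  # source-specific download to S3
        row_count, file_size_kb = file_stats(source_file)
        delivery = Delivery(next_sqn, source_file, row_count, file_size_kb, delivery_status=1)
        deliveries.append(delivery)
        new_deliveries.append(delivery)
        next_sqn += 1
    return new_deliveries
```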
The specific extract components are divided into two parts: collecting the names of all available files, and downloading them. To collect all files we use a single function, get_source_file_list, that returns all filenames.
I’d like to explain this part with an example of two different sources:
- Source A publishes data with an API. All historical data is called with a single GET request.
- Source B periodically publishes a CSV with a snapshot on a download page. The CSVs never get removed from the download page.
Source A only needs a single call, while source B needs all files. The generic component described above must handle both, each with its own specific components.
For an API call (source A), we name each file with a timestamp (e.g. source_A_202104.json). This turns data from an API call into files. For files (source B), the task is straightforward: collect all filenames. This is the first generalization: subsequent steps in the pipeline don’t have to care how the data is sourced. The chosen date granularity (year and month) prevents files from being downloaded twice.
For source A, the function adds a filename with the current year and month to the list. It then looks on S3 for other files from the source and adds them to the source_list. The output of the function is a list of all available files. In the generic extract component, the previously processed files are filtered out.
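A sketch of get_source_file_list for source A. The S3 listing is passed in as existing_s3_keys (a hypothetical parameter; in practice it might come from boto3’s list_objects_v2) so the example runs offline, and today can be fixed for testing.

```python
from datetime import date
from typing import Iterable, List, Optional


def get_source_file_list(existing_s3_keys: Iterable[str], today: Optional[date] = None) -> List[str]:
    """Source A: one file per API call, named after the current year and month.

    existing_s3_keys is a hypothetical stand-in for listing this source's files on S3.
    """
    today = today or date.today()
    current_file = f"source_A_{today:%Y%m}.json"  # e.g. source_A_202104.json
    source_list = [current_file]
    for key in existing_s3_keys:
        # Keep only files that belong to this source.
        if key.startswith("source_A_") and key not in source_list:
            source_list.append(key)
    return sorted(source_list)
```

Calling it twice within the same month yields the same file name, which the generic extract then filters out, so the API is not called twice.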
For source A, the specific extract function is simple: it makes the actual call to the API and stores the response as a JSON file in the data bucket.
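A sketch of the specific extract for source A, using only the standard library. put_object is a hypothetical stand-in for an S3 upload (for example a thin wrapper around boto3’s put_object), and fetch can be swapped out for testing. In the specific module, the URL and S3 client would be bound up front so the generic code can simply call extract(source_file).

```python
import json
import urllib.request
from typing import Any, Callable


def fetch_json(url: str) -> Any:
    # Single GET request that returns all historical data (source A).
    with urllib.request.urlopen(url) as response:
        return json.load(response)


def extract(source_file: str, url: str,
            put_object: Callable[[str, bytes], None],
            fetch: Callable[[str], Any] = fetch_json) -> None:
    """Call the API and store the response as JSON under the given file name.

    put_object is a hypothetical S3 upload callable taking (key, body).
    """
    payload = fetch(url)
    put_object(source_file, json.dumps(payload).encode("utf-8"))
```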
Wrangle
The wrangle step retrieves metadata, instantiates the specific scripts, and enters the while loop. It looks for deliveries where the highest status is 1, which refers to the S3 step. It takes the earliest delivery_sqn and works its way to the latest. In the loop, the source_file is retrieved from the S3 bucket and the specific wrangle function is called. The wrangle function uploads the data to a database and returns processed_count_discarded.
Returning a count from the wrangle function is a bit unusual. The reason is that discarded records are counted while the data is being wrangled; after wrangling they are lost and can’t be retrieved. Records are discarded when they cannot be loaded into a database, for example when a CSV row has a different number of columns than the header. In that case you can never find out which column is missing. The processed_count is retrieved with a generic function that looks at the database table. The new status record is inserted into the metadata and the next minimum_delivery_sqn is retrieved. This repeats until no deliveries with a highest status of 1 remain.
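The wrangle loop, sketched with an in-memory stand-in for the delivery_status table. Status 1 is the S3 step, as described above; using 2 for “wrangled” and the dictionary layout are assumptions.

```python
from typing import Callable, Dict, List


def run_wrangle(status_history: Dict[int, List[int]],
                wrangle: Callable[[int], int],
                count_rows: Callable[[int], int],
                log: List[dict]) -> None:
    """Process every delivery whose highest status is 1, earliest delivery_sqn first.

    status_history maps delivery_sqn -> recorded statuses (hypothetical stand-in
    for delivery_status); status 2 = "wrangled" is an assumption.
    """
    while True:
        pending = [sqn for sqn, statuses in status_history.items() if max(statuses) == 1]
        if not pending:
            break  # no deliveries left with highest status 1
        delivery_sqn = min(pending)
        discarded = wrangle(delivery_sqn)     # specific: load to staging, count rejected rows
        processed = count_rows(delivery_sqn)  # generic: count rows in the staging table
        status_history[delivery_sqn].append(2)
        log.append({"delivery_sqn": delivery_sqn,
                    "processed_count": processed,
                    "processed_count_discarded": discarded})
```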
The task of a specific wrangle function is downloading the data from S3, renaming the columns to the target definitions, adding the delivery_sqn column, and uploading the data to the STAGING_IN database. The wrangling needs to transform the data into a row format with key attributes and value attributes. In many cases, wrangling takes minimal effort. However, some sources publish data in n-dimensional arrays or with a complex nested key, which complicates the wrangling.
The attributes are renamed from their source names to the target definitions, using the mapping in the attribute table. In the wrangle component, we retrieve the mapping and rename the columns. The target data types are already set correctly in the staging and target tables by the Test and Build task.
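A sketch of a specific wrangle for a CSV source, assuming the CSV text has already been downloaded from S3. It shows why processed_count_discarded has to be counted during wrangling: malformed rows are dropped here and never reach the database. The column mapping stands in for the attribute table, SQLite stands in for STAGING_IN, and the table name is hypothetical.

```python
import csv
import io
import sqlite3
from typing import Dict

import pandas as pd


def wrangle_csv(raw_csv: str, column_mapping: Dict[str, str], delivery_sqn: int,
                conn: sqlite3.Connection, table: str = "staging_in_source_b") -> int:
    """Specific wrangle for a CSV source; returns processed_count_discarded.

    column_mapping and the staging table name are hypothetical.
    """
    reader = csv.reader(io.StringIO(raw_csv))
    header = next(reader)
    good_rows, discarded = [], 0
    for row in reader:
        if not row:
            continue  # blank line, not a record
        if len(row) != len(header):
            # We cannot tell which column is missing, so the row is dropped;
            # this is the only moment it can still be counted.
            discarded += 1
            continue
        good_rows.append(row)

    df = pd.DataFrame(good_rows, columns=header).rename(columns=column_mapping)
    df["delivery_sqn"] = delivery_sqn
    df.to_sql(table, conn, if_exists="append", index=False)
    return discarded
```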
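It is essential to choose a sustainable key:
- A key that is too granular will not update: a change shows up as a new key (and a delete of the old one) instead of an update.
- A key that is too coarse will overload the historical layer with updates: different records share a key, so every delivery registers spurious changes.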
Proper analysis of the granularity and understanding of the data is important: it reduces technical debt and avoids surprises. It may be trivial for some sources, but don’t underestimate the effort needed to understand the granularity and nature of the data. The users of the data are a good source for this information.
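One quick check while analysing a candidate key is how many rows share a key within a single delivery. This helper is a hypothetical aid for that analysis, not part of the pipeline:

```python
from typing import List

import pandas as pd


def duplicate_key_rate(df: pd.DataFrame, key_columns: List[str]) -> float:
    """Share of rows whose candidate key also appears on another row (hypothetical helper).

    0.0 means the key is unique within this delivery.
    """
    if df.empty:
        return 0.0
    # keep=False marks every occurrence of a duplicated key, not just the repeats.
    return float(df.duplicated(subset=key_columns, keep=False).mean())
```

A non-zero rate points to a key that is too coarse. The opposite problem, a key that is too granular, only shows up across deliveries, as keys that keep appearing and disappearing instead of being updated.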
In most of our pipelines we use pandas DataFrames. pandas has easy-to-use read and write capabilities for databases and file formats like CSV and JSON, and it comes out of the box with many useful transformation methods. This comes in handy for sources that fit in memory.
It gets problematic with sources that are too big for memory, which require alternatives. We solved this issue with chunking.
Processing a big source through all stages consumes substantial memory and CPU resources. It is usually a long-running job, and when it fails, it needs to be restarted and run for a long time again. This makes the process fragile.
Worse, processes that fail due to resource issues generally fail in a non-deterministic way, which makes the root cause hard to find. To add robustness (and, unfortunately, sacrifice simplicity) we chose a chunking strategy: files are loaded to the database in chunks, using a generator that yields one chunk at a time.
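A sketch of such a generator, assuming a CSV source read with the standard csv module (the function name is hypothetical). pandas offers similar behaviour through read_csv(..., chunksize=n), which returns an iterator of DataFrames.

```python
import csv
from itertools import islice
from typing import Iterable, Iterator, List, Tuple


def iter_csv_chunks(lines: Iterable[str], chunk_size: int) -> Iterator[Tuple[int, List[str], List[List[str]]]]:
    """Yield (chunk_number, header, rows) so only one chunk is in memory at a time.

    lines can be an open file object, which csv.reader consumes lazily.
    """
    if chunk_size < 1:
        raise ValueError("chunk_size must be positive")
    reader = csv.reader(lines)
    header = next(reader, None)
    if header is None:
        return  # empty file: nothing to yield
    chunk_number = 0
    while True:
        rows = list(islice(reader, chunk_size))
        if not rows:
            return
        yield chunk_number, header, rows
        chunk_number += 1
```

Passing an open file object, such as open(path, newline=""), keeps memory use bounded by chunk_size rather than by the file size.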
A tricky thing with chunking is preserving idempotence. The DAG either needs to:
- delete all intermediate chunks that weren’t logged in the metadata, or
- recognize the intermediate chunks that were created by a previous run.
We chose the latter approach because it is more efficient. We made loading a chunk an atomic operation, which allows us to read the database state and find out which data still needs to be processed. In subsequent phases we have two branches for loading data: one for chunked and one for single-staged data. The delta load section below will show this.
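A sketch of the “recognize previous chunks” approach, with SQLite standing in for the database. Each chunk’s rows and its registry row are committed in one transaction, so after a crash the registry tells exactly which chunks are still missing. The staging and loaded_chunk tables and their columns are hypothetical.

```python
import sqlite3
from typing import Iterable, List, Tuple


def load_chunks(conn: sqlite3.Connection, delivery_sqn: int,
                chunks: Iterable[Tuple[int, List[Tuple[str, str]]]]) -> int:
    """Load chunks atomically and skip chunks that a previous run already committed.

    Table names are hypothetical. Returns the number of chunks loaded in this run.
    """
    conn.execute("CREATE TABLE IF NOT EXISTS staging (delivery_sqn INTEGER, id TEXT, v TEXT)")
    conn.execute("CREATE TABLE IF NOT EXISTS loaded_chunk ("
                 "delivery_sqn INTEGER, chunk_number INTEGER, "
                 "PRIMARY KEY (delivery_sqn, chunk_number))")
    done = {n for (n,) in conn.execute(
        "SELECT chunk_number FROM loaded_chunk WHERE delivery_sqn = ?", (delivery_sqn,))}
    loaded = 0
    for chunk_number, rows in chunks:
        if chunk_number in done:
            continue  # committed by an earlier, interrupted run
        with conn:  # rows and registry entry commit together, or both roll back
            conn.executemany("INSERT INTO staging VALUES (?, ?, ?)",
                             [(delivery_sqn, *row) for row in rows])
            conn.execute("INSERT INTO loaded_chunk VALUES (?, ?)", (delivery_sqn, chunk_number))
        loaded += 1
    return loaded
```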
We have completed the next step in generalization. After the wrangling phase has been completed, the data is in a generic format. All subsequent tasks are fully generic and therefore don’t require any source-specific components.
Data Quality Checks
As in previous steps, run_quality_checks retrieves metadata. It gets the earliest delivery_sqn and enters the while loop. It parses out the quality_checks and instantiates the Checks class, which contains all tests. The results are written to the metadata table data_quality_checks. We want to be able to link any test to any pipeline (many-to-many). The eval function provides the flexibility we need: we can link any source to any test just by adding the name of the test to the source. This means we can add and link more tests without touching the generic code. The effort to generalize the data and code really pays off!
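A sketch of the name-based linking, with a hypothetical Checks class and test names. Where the post uses eval, this sketch swaps in getattr, which resolves the test method by name without evaluating an arbitrary expression; the effect of linking tests to sources by name is the same.

```python
from typing import Any, Dict, List

Rows = List[Dict[str, Any]]


class Checks:
    """All data quality tests; each source lists the tests it needs by name (hypothetical tests)."""

    def __init__(self, rows: Rows, key_columns: List[str]):
        self.rows = rows
        self.key_columns = key_columns

    def not_empty(self) -> bool:
        return len(self.rows) > 0

    def unique_key(self) -> bool:
        keys = [tuple(r[c] for c in self.key_columns) for r in self.rows]
        return len(keys) == len(set(keys))

    def no_null_keys(self) -> bool:
        return all(r.get(c) is not None for r in self.rows for c in self.key_columns)


def run_quality_checks(rows: Rows, key_columns: List[str], quality_checks: List[str],
                       delivery_sqn: int) -> List[Dict[str, Any]]:
    """Run the tests named in the source's metadata and return result records."""
    checks = Checks(rows, key_columns)
    results = []
    for name in quality_checks:
        # getattr looks the test up by name instead of evaluating a string as code.
        test = getattr(checks, name)
        results.append({"delivery_sqn": delivery_sqn, "check": name, "passed": test()})
    return results
```

Adding a test then means adding a method to Checks and its name to a source’s quality_checks; the loop itself never changes.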
Delta Load
The delta_load function starts by parsing out the loading_strategy and the publish_type. The loading_strategy is a combination of insert, update, and delete. The publish_type is either snapshot or delta. Both are important in handling the load, as we will see shortly. The function instantiates the DeltaLoads class and enters the while loop with the earliest delivery_sqn.
In delta_load there are two branches to load data into the historical layer: one for sources that only deliver changes (delta) and one for sources that deliver snapshots. Delta is easy: the source indicates new, updated, and deleted records with every delivery. With snapshots, we have to compare the ‘active snapshot’ in the historical layer with the snapshot that is about to be loaded into it.
The snapshot is created with build_hist_snap_query (we’ll address the queries below). Then the individual loading strategies are called. Throughout the delta_load process, the numbers of processed records are collected and inserted into delivery_status with the set_status method. Finally, we validate that referential integrity with metadata.delivery still holds, to ensure every record links correctly to the metadata.
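How publish_type and loading_strategy could drive the steps for one delivery, as a hypothetical sketch. Encoding loading_strategy as the letters I, U, and D and the step names are assumptions; the post only names build_hist_snap_query, set_status, and the referential integrity check.

```python
from typing import List

# Hypothetical encoding of the loading strategies.
STRATEGY_STEPS = {"I": "insert", "U": "update", "D": "delete"}


def plan_delta_load(publish_type: str, loading_strategy: str) -> List[str]:
    """Return the ordered steps for one delivery (step names are hypothetical).

    publish_type is 'snapshot' or 'delta'; loading_strategy combines I, U and D.
    """
    if publish_type not in ("snapshot", "delta"):
        raise ValueError(f"unknown publish_type: {publish_type!r}")
    unknown = set(loading_strategy) - set(STRATEGY_STEPS)
    if unknown:
        raise ValueError(f"unknown loading strategy: {sorted(unknown)}")

    steps = []
    if publish_type == "snapshot":
        # Only snapshots need the active snapshot from the historical layer to compare against.
        steps.append("build_hist_snap")
    # Fixed order, regardless of how the strategy letters were written.
    steps += [STRATEGY_STEPS[s] for s in "IUD" if s in loading_strategy]
    steps += ["set_status", "check_referential_integrity"]
    return steps
```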
The snapshot consists of the latest, undeleted record for each key. The undeleted state is filtered with change_indicator != 'D'. The latest state is found by aggregating over the keys and taking the most recent record for each key.
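A sketch of the snapshot query on a simplified historical table with one key column and one value column (hypothetical names), run on SQLite. The design choice worth noting: the delete filter is applied after picking the latest record per key. Filtering first would resurrect a deleted key with its last pre-delete value.

```python
import sqlite3
from typing import List, Tuple

# Simplified, hypothetical historical table: hist(key_id, value, delivery_sqn, change_indicator).
SNAPSHOT_QUERY = """
    SELECT h.key_id, h.value, h.delivery_sqn
    FROM hist AS h
    JOIN (
        SELECT key_id, MAX(delivery_sqn) AS latest_sqn
        FROM hist
        GROUP BY key_id
    ) AS latest
      ON h.key_id = latest.key_id AND h.delivery_sqn = latest.latest_sqn
    WHERE h.change_indicator != 'D'  -- applied after picking the latest row per key
    ORDER BY h.key_id
"""


def active_snapshot(conn: sqlite3.Connection) -> List[Tuple]:
    """Return the latest, undeleted record for every key."""
    return conn.execute(SNAPSHOT_QUERY).fetchall()
```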
The new delivery in staging is compared against this set of active records with three queries, one for each loading strategy: insert, update, and delete.
We use Slowly Changing Dimension type 2 (SCD2) in the historical layer. This means that every change gets a new row. We use SCD2 because some of our sources change data retroactively.
Another key concept in the historical layer is immutability: once written, a record never changes. Immutability still allows deleting data (and inserting it again later). The query attaches a change_indicator to each new record to indicate the type of change. With the change_indicator, we don’t have to touch old records to make new records valid.
Recall that we count the number of records in each stage to verify completeness. The delta load step has some particularities when it comes to counting records. At first sight, a record processed from staging to the historical layer results in an insert, update, or delete. But if an identical record already exists, nothing is written to the historical layer, so we need to track how many records did not result in a write action to enable reconciliation on completeness. Finally, we designed the processes so that row duplicates are discarded during writing using ON CONFLICT DO NOTHING. Because those rows disappear silently, we count the duplicates before writing into the historical layer.
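The reconciliation boils down to simple arithmetic. The sketch assumes each staging row ends in exactly one of four outcomes: inserted, updated, unchanged (nothing written), or discarded as a row duplicate. Deletes are left out because they are generated for keys missing from staging and do not consume staging rows. The function names are hypothetical.

```python
from collections import Counter
from typing import Hashable, Iterable


def count_row_duplicates(rows: Iterable[Hashable]) -> int:
    """Rows that will be discarded as row duplicates: every copy except the first."""
    return sum(n - 1 for n in Counter(rows).values())


def is_complete(staging_count: int, inserted: int, updated: int,
                unchanged: int, duplicates: int) -> bool:
    """Every staging row must be accounted for by exactly one outcome.

    Deletes are excluded: they come from keys missing in staging, not from staging rows.
    """
    return staging_count == inserted + updated + unchanged + duplicates
```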
The insert query checks for keys that exist in staging but not in the historical layer. With a LEFT JOIN from STAGING, we identify new keys.
The update query checks for keys that are equal but rows that differ between staging and the historical layer.
The delete query checks for keys that exist in the historical layer but not in staging. Deletes are not delivered; they are simply missing records. Therefore a dummy ‘delete’ record is created that contains the key attributes and the delivery_sqn of the delivery in which the key was not found.
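The three snapshot queries, sketched against a simplified schema (hypothetical tables hist and staging with one key and one value column, SQLite syntax); staging is assumed to hold only the current delivery. The queries run in order, insert first, so keys inserted in this run are already part of the active snapshot when the update and delete queries look at it. IS NOT is SQLite’s null-safe inequality; PostgreSQL would use IS DISTINCT FROM.

```python
import sqlite3
from typing import Dict

# Active snapshot: latest record per key, unless that record is a delete.
# Table and column names (hist, staging, key_id, value) are hypothetical.
ACTIVE = """
    SELECT h.key_id, h.value
    FROM hist AS h
    JOIN (SELECT key_id, MAX(delivery_sqn) AS sqn FROM hist GROUP BY key_id) AS l
      ON h.key_id = l.key_id AND h.delivery_sqn = l.sqn
    WHERE h.change_indicator != 'D'
"""

# Insert: staging keys without a match in the snapshot (LEFT JOIN from staging).
INSERT_NEW = f"""
    INSERT INTO hist (key_id, value, delivery_sqn, change_indicator)
    SELECT s.key_id, s.value, s.delivery_sqn, 'I'
    FROM staging AS s LEFT JOIN ({ACTIVE}) AS a ON s.key_id = a.key_id
    WHERE a.key_id IS NULL
"""

# Update: equal keys, different values.
INSERT_UPDATED = f"""
    INSERT INTO hist (key_id, value, delivery_sqn, change_indicator)
    SELECT s.key_id, s.value, s.delivery_sqn, 'U'
    FROM staging AS s JOIN ({ACTIVE}) AS a ON s.key_id = a.key_id
    WHERE s.value IS NOT a.value
"""

# Delete: active keys missing from staging get a dummy record with the current delivery_sqn.
INSERT_DELETED = f"""
    INSERT INTO hist (key_id, value, delivery_sqn, change_indicator)
    SELECT a.key_id, NULL, :delivery_sqn, 'D'
    FROM ({ACTIVE}) AS a LEFT JOIN staging AS s ON a.key_id = s.key_id
    WHERE s.key_id IS NULL
"""


def apply_snapshot(conn: sqlite3.Connection, delivery_sqn: int) -> Dict[str, int]:
    """Append I/U/D rows for one snapshot delivery; existing rows are never modified."""
    with conn:  # all three statements commit together
        inserted = conn.execute(INSERT_NEW).rowcount
        updated = conn.execute(INSERT_UPDATED).rowcount
        deleted = conn.execute(INSERT_DELETED, {"delivery_sqn": delivery_sqn}).rowcount
    return {"I": inserted, "U": updated, "D": deleted}
```

A key whose values did not change produces no row at all, which is exactly the case the completeness count has to account for.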
Putting it all together: how to add a new pipeline
We went through the pipeline in great detail. You should have a decent understanding of how we designed the system. Let’s look at what it takes to add a pipeline once all generic components are in place.
First, we need to define how we extract and wrangle our source. This comes down to building the source-specific functions. Once we are satisfied with them, we add them to the generic pipeline.
Second, we need the configuration file. This JSON file contains all the elements needed to populate the static tables (source, attribute, and source_attribute).
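A sketch of what such a file and a sanity check before committing it could look like. The field names are hypothetical and only mirror the static tables named above; the real file may be structured differently.

```python
import json
from typing import Any, Dict, List

# Hypothetical configuration for a new pipeline; field names are illustrative.
EXAMPLE_CONFIG = """
{
  "source": {"name": "source_b", "publish_type": "snapshot", "loading_strategy": "IUD"},
  "attribute": [
    {"name": "building_id", "data_type": "TEXT"},
    {"name": "rent", "data_type": "NUMERIC"}
  ],
  "source_attribute": [
    {"source_column": "Building ID", "attribute": "building_id", "is_key": true},
    {"source_column": "Monthly Rent", "attribute": "rent", "is_key": false}
  ]
}
"""


def validate_config(config: Dict[str, Any]) -> List[str]:
    """Return a list of problems; an empty list means the static tables can be populated."""
    problems = [f"missing section: {s}" for s in ("source", "attribute", "source_attribute")
                if s not in config]
    if problems:
        return problems
    known = {a["name"] for a in config["attribute"]}
    for mapping in config["source_attribute"]:
        if mapping["attribute"] not in known:
            problems.append(f"unknown attribute: {mapping['attribute']}")
    if not any(m.get("is_key") for m in config["source_attribute"]):
        problems.append("no key attribute defined")
    if config["source"].get("publish_type") not in ("snapshot", "delta"):
        problems.append("publish_type must be 'snapshot' or 'delta'")
    return problems
```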
The script and configuration file need to be pushed to a git repository. The commit triggers the CI pipeline, which ships the files to the containers where the Airflow webserver and scheduler run.
Stuff we haven’t covered
In this series we covered a lot, but there is more to tell. We haven’t covered how we built a few cool dashboards that monitor the pipelines, which is another application where metadata is useful. We haven’t covered how we build the pipelines that transform data for the applications, and we haven’t discussed how we test the pipeline. We look forward to publishing these in future blog posts.
We discussed the pipeline in great detail. You have seen all the steps: download, wrangle, quality check, and delta load. Each step generalizes the data further. By the time it reaches the historical layer, the data is universally formatted and annotated with metadata. This setup makes it easy for data scientists to work with the data. It also simplifies using the data in applications and for exploration; it can serve basically any purpose within the company. On top of that, adding a new pipeline is easy because many features are abstracted into the generic components, which greatly shortens the time-to-market for new data sources. It is quite a lot of work to set it all up, but in the end it pays off in terms of maintainability and documentation.
The series has three parts:
Part 3: Pipeline in detail with metadata interactions
This blog is also published on the GeoPhy website.