Creating dynamic sourcing pipelines in Airflow: Automation(3/3)

In part one and part two of this series, we introduced the concept of dynamic sourcing pipelines for data, the architectural implementation, and the metadata. Armed with this knowledge, we’ll dive into the pipeline in detail. Part three shows how we start from external sources with very different characteristics and generalize them step by step.

This blog includes lots of code blocks and queries. Consider the code blocks as “telling the story”, while the text provides explanations and extra details. As the data travels through the pipeline, it is generalized. We’ll pause after each level of generalization is finished.

This blog follows the structure of a generic pipeline:

[Image: overview of the generic pipeline]

DAG creation

Integration of specific components

Test and Build

Extract

Extract starts with retrieving metadata from metadata.source and instantiating the ‘specific’ source module. The extract process then gets the source_file_list and the download_file_list. It loops through the source_file_list and ignores all files that are already in the download_file_list, so extraction is skipped for files that are on S3 even if they are not registered in metadata. This enables us to drop all database objects and rerun with a clean database. The specific extract function downloads all new files. After source.extract has been executed, the metadata is updated: a delivery_sqn is created for the source_file, the number of rows on S3 is counted, and the file_size_kb is retrieved. Finally, the delivery_status is created.
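To make that flow concrete, below is a minimal sketch of the generic extract step. The helper names (get_source_metadata, instantiate_source_module, get_download_file_list, register_delivery, update_delivery, count_rows_on_s3, get_file_size_kb, set_status) are illustrative assumptions standing in for the metadata and S3 interactions described above, not the real implementation.

```python
# Sketch only: the helper functions are illustrative assumptions.
def generic_extract(source_name: str) -> None:
    source_meta = get_source_metadata(source_name)             # read metadata.source
    source = instantiate_source_module(source_meta)            # load the 'specific' module

    source_file_list = source.get_source_file_list()           # all files available at the source
    download_file_list = get_download_file_list(source_name)   # files already on S3

    for source_file in source_file_list:
        if source_file in download_file_list:
            continue                                            # already on S3, skip re-extraction
        source.extract(source_file)                             # specific extract: download to S3

        # Register the new delivery in the metadata.
        delivery_sqn = register_delivery(source_name, source_file)
        update_delivery(
            delivery_sqn,
            row_count=count_rows_on_s3(source_file),
            file_size_kb=get_file_size_kb(source_file),
        )
        set_status(delivery_sqn, status=1)                      # status 1: file landed on S3
```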

Specific

The specific extract components are divided into two parts: get_source_file_list and extract.

To collect all files, we use a single function, get_source_file_list, that returns a list of all available filenames.

I’d like to explain this part with an example of two different sources:

  • Source A publishes data with an API. All historical data is called with a single GET request.
  • Source B periodically publishes a CSV with a snapshot on a download page. The CSVs never get removed from the download page.

Source A only needs to do a single call. Source B needs all files. The generic component (described above) is required to handle both, but with different specific components.

For an API call (source A) we name the files with a timestamp attached to the name (e.g. source_A_202104.json). This turns data from an API call into files. For files (source B), the task is straightforward: collect all filenames. This is the first generalization, and subsequent steps in the pipeline don’t have to bother with how the data is sourced. The chosen date granularity prevents files from being downloaded twice.

An example function for source A looks like the code below. The current year and month are used to add the name of the newest file to the list. The function then looks on S3 for other files from the source and adds them to the source_list. The output of the function is a list of all available files. In the generic extract component, the previously processed files are filtered out.
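A minimal sketch of such a function, assuming boto3 and placeholder bucket and prefix names (data-bucket, source_a/); the real names and signatures differ.

```python
import datetime

import boto3


def get_source_file_list(bucket: str = "data-bucket", prefix: str = "source_a/") -> list:
    """Return all available files for source A: the current period plus files already on S3."""
    # Name the current API call as a file with a year-month timestamp, e.g. source_A_202104.json.
    source_list = [f"source_A_{datetime.date.today():%Y%m}.json"]

    # Add files from earlier runs that are already stored on S3.
    s3 = boto3.client("s3")
    response = s3.list_objects_v2(Bucket=bucket, Prefix=prefix)
    for obj in response.get("Contents", []):
        filename = obj["Key"].rsplit("/", 1)[-1]
        if filename and filename not in source_list:
            source_list.append(filename)

    return source_list
```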

For source A, a simple specific extract function looks like the code below. It makes the actual call to the API and stores the response as JSON in the data bucket.
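A minimal sketch of that extract function, again with placeholder names (the API URL, bucket, and prefix are illustrative assumptions):

```python
import json

import boto3
import requests


def extract(source_file: str, bucket: str = "data-bucket", prefix: str = "source_a/") -> None:
    """Call the source A API and store the response as JSON on S3 under the given file name."""
    response = requests.get("https://api.example.com/source-a/data")  # placeholder URL
    response.raise_for_status()

    s3 = boto3.client("s3")
    s3.put_object(
        Bucket=bucket,
        Key=f"{prefix}{source_file}",
        Body=json.dumps(response.json()),
    )
```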

Wrangle

The wrangle step retrieves metadata, instantiates the specific scripts, and enters a while loop. The loop looks for deliveries whose highest status is 1, referring to the S3 step, starting with the earliest delivery_sqn and working its way to the latest. In each iteration, the source_file is retrieved from the S3 bucket and the specific wrangle function is activated. The wrangle function uploads the data to a database and returns the processed_count_discarded.

It’s a bit of an odd way of activating a function. The reason is that discarded records are counted while the data is being wrangled; after wrangling they are lost and can’t be retrieved. Records are discarded because they cannot be loaded into the database, for example when a row in a CSV has a different number of columns than the header, in which case you’ll never find out which column is missing. The processed_count is retrieved with a generic function that looks at the database table. The new status record is inserted into the metadata and the new minimum delivery_sqn is requested again. This goes on until no deliveries with a highest status of 1 exist anymore.
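A rough sketch of that loop is shown below. The helper names (get_min_delivery_sqn, read_from_s3, get_processed_count, set_status) and the status code for ‘wrangled’ are assumptions for illustration.

```python
# Sketch only: helper functions and status codes are illustrative assumptions.
def generic_wrangle(source) -> None:
    # Earliest delivery whose highest status is 1 (file on S3, not yet wrangled).
    delivery_sqn = get_min_delivery_sqn(source.name, status=1)

    while delivery_sqn is not None:
        source_file = read_from_s3(source.name, delivery_sqn)

        # The specific wrangle loads the data into STAGING_IN and counts the
        # records it had to discard along the way.
        processed_count_discarded = source.wrangle(source_file, delivery_sqn)

        # Records that did load can be counted generically from the staging table.
        processed_count = get_processed_count(source.name, delivery_sqn)

        set_status(
            delivery_sqn,
            status=2,  # assumed status code for 'wrangled'
            processed_count=processed_count,
            processed_count_discarded=processed_count_discarded,
        )

        delivery_sqn = get_min_delivery_sqn(source.name, status=1)
```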

Specific

The task of a specific wrangle is downloading the data from S3, renaming columns to the target definitions, adding the columns id and delivery_sqn, and uploading the data to the STAGING_IN database. The wrangling needs to transform the data into row format with key attributes and value attributes. In many cases, wrangling data is a minimal effort. However, some sources publish data in n-dimensional arrays or with complex nested keys, which further complicates the wrangling.

The attributes are renamed from the source names to the target definition. We map this with the attribute table: in the wrangle component, we retrieve the mapping and rename the columns. The target data types are already correctly set in the staging and target table by the Test and Build task.
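In pandas, that renaming step boils down to something like the sketch below, assuming the source-to-target mapping has already been read from the attribute tables into a dict (the example mapping in the comment is made up).

```python
import pandas as pd


def rename_to_target(df: pd.DataFrame, mapping: dict, delivery_sqn: int) -> pd.DataFrame:
    """Rename source columns to target definitions and add the id and delivery_sqn columns."""
    # mapping comes from the attribute tables, e.g. {"OBJ_VAL": "valuation_amount"} (illustrative).
    df = df.rename(columns=mapping)
    df["delivery_sqn"] = delivery_sqn            # link every row to its delivery
    df.insert(0, "id", range(1, len(df) + 1))    # simple surrogate id within the delivery
    return df
```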

It is essential that a sustainable key is chosen:

  • A key with too much granularity will never produce updates; every change shows up as a new record.
  • A key that is chosen too coarse will be overloaded with too many updates.

Proper analysis of the granularity and understanding of the data is important: it reduces technical debt and avoids surprises. It may be trivial for some sources, but don’t underestimate the task of getting an understanding of the granularity and nature of the data. The user is a good source for this information.

In the example (and in most of our pipelines) we use Pandas DataFrames. It has easy-to-use read and write capabilities with databases and file formats like CSV and JSON. It comes out-of-the-box with tons of useful transformation methods. This comes in handy for sources that fit in memory.

It gets problematic with sources that are too big for memory, requiring alternatives. We solved this issue with chunking.

Chunking

Worse, processes that fail due to resource issues generally fail in a non-deterministic way: it’s hard to find the root cause of the problem. To add robustness (and, unfortunately, sacrifice some simplicity) we chose a chunking strategy: files are loaded to the database in chunks. To chunk a big file we use a generator along the lines of the one below.
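The original generator is not reproduced here; a minimal pandas-based version, assuming CSV input and a chunk size that fits in memory, could look like this:

```python
from typing import Iterator

import pandas as pd


def chunk_file(path: str, chunk_size: int = 100_000) -> Iterator[pd.DataFrame]:
    """Yield a large CSV file in chunks of chunk_size rows, so each chunk fits in memory."""
    for chunk in pd.read_csv(path, chunksize=chunk_size):
        yield chunk
```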

A tricky thing with chunking is preserving idempotence. The DAG either:

  1. needs to delete all intermediate chunks that weren’t logged in the metadata, or
  2. it needs to recognize the intermediate chunks that were created by a previous run.

We chose the latter approach because it is more efficient. We ensured that chunks are atomic operations, which allows us to read the database state and find out which data still needs to be processed. In subsequent phases we have two branches for loading data: one for chunked and one for single-staged deliveries. The delta load section below will show this.
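As an illustration of that second option, a rerun can ask the staging table which chunks already landed and resume from there. The table and column names below (staging_in.source_a, chunk_nr) are assumptions.

```python
import sqlalchemy as sa


def completed_chunks(engine: sa.engine.Engine, delivery_sqn: int) -> set:
    """Return the chunk numbers of a delivery that are already present in staging.

    Because each chunk is written atomically, any chunk number found here is
    complete and can safely be skipped on a rerun.
    """
    query = sa.text(
        "SELECT DISTINCT chunk_nr FROM staging_in.source_a WHERE delivery_sqn = :sqn"
    )
    with engine.connect() as conn:
        return {row[0] for row in conn.execute(query, {"sqn": delivery_sqn})}
```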

We have completed the next step in generalization. After the wrangling phase has been completed, the data is in a generic format. All subsequent tasks are fully generic and therefore don’t require any source-specific components.

Data Quality Checks

Delta Load

Process

In delta_load there are two branches to load data to the historical layer: one for sources that only deliver new data (delta) and one for sources that deliver snapshots. Delta is really easy: the source indicates new, updated, and deleted records with every delivery. With snapshots, we have to compare the ‘active snapshot’ in the historical layer with the snapshot that is about to be loaded into it.

The snapshot is created with build_hist_snap_query (we’ll address queries below). Then the individual loading strategies are called. Throughout the delta_load process the numbers of processed records are collected and inserted into the delivery_status with the set_status method. Finally, we validate that the referential integrity with metadata.delivery still holds, to ensure the correct link to the metadata.

The snapshot consists of the undeleted, latest version of each key. The undeleted state is filtered by change_indicator != 'D'. The latest version is obtained by aggregating over the keys and taking the highest delivery_sqn.
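The query that build_hist_snap_query generates is not shown here, but it boils down to something like the following (the table name and key columns are placeholders; in reality they come from the metadata):

```python
# Illustrative shape of the query built by build_hist_snap_query.
hist_snap_query = """
    SELECT h.*
    FROM   hist.source_a AS h
    JOIN  (
        SELECT key_1, key_2, MAX(delivery_sqn) AS delivery_sqn
        FROM   hist.source_a
        GROUP  BY key_1, key_2
    ) AS latest
      ON  h.key_1 = latest.key_1
      AND h.key_2 = latest.key_2
      AND h.delivery_sqn = latest.delivery_sqn
    WHERE h.change_indicator != 'D'
"""
```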

The new delivery in staging is compared against the set of active records. The comparison works as follows:
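The comparison query itself is not reproduced here; conceptually, the staged keys are classified against the active snapshot roughly as in this sketch, where each dict maps a key to its value attributes (names and structure are illustrative):

```python
def classify(staging_rows: dict, snapshot_rows: dict) -> dict:
    """Classify staged records against the active snapshot as insert, update, or delete."""
    inserts = [key for key in staging_rows if key not in snapshot_rows]       # new keys
    updates = [key for key in staging_rows
               if key in snapshot_rows and staging_rows[key] != snapshot_rows[key]]  # changed values
    deletes = [key for key in snapshot_rows if key not in staging_rows]       # keys that disappeared
    # Keys present in both with identical values fall in no bucket: nothing is written for them.
    return {"I": inserts, "U": updates, "D": deletes}
```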

We use Slowly Changing Dimensions type 2 (SCD2) in the historical layer. This means that every change gets a new row. We use SCD2 because we have sources that change retroactively.

Another key concept in the historical layer is immutability: a record never changes once it is written, although immutability does allow deleting a record and inserting it again. The query attaches the change_indicator to indicate the change. With the change_indicator we don’t have to touch old records to make new records valid.

Recall that we count the number of records in each stage to verify completeness. The delta load step has some particularities when it comes to counting records. At first sight, a record processed from staging to historical results in an insert, update, or delete. But if a record already exists unchanged, nothing is written to the historical layer, so we need to track how many records did not result in a write action to enable reconciliation on completeness. Finally, we designed the process so that row duplicates are discarded during writing using ON CONFLICT DO NOTHING; we count the duplicates before writing into the historical layer.
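As an illustration of that duplicate handling, the insert into the historical table has roughly this shape (table, column, and key names are placeholders; the real statement is generated from the metadata):

```python
# Illustrative insert: duplicates on the table's unique constraint are silently
# dropped by ON CONFLICT DO NOTHING, which is why they are counted beforehand.
insert_query = """
    INSERT INTO hist.source_a (key_1, key_2, value_1, delivery_sqn, change_indicator)
    SELECT key_1, key_2, value_1, :delivery_sqn, 'I'
    FROM   staging_in.source_a
    WHERE  delivery_sqn = :delivery_sqn
    ON CONFLICT DO NOTHING
"""
```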

Insert

Update

Delete

Putting it all together: how to add a new pipeline

First, we need to define how we extract and wrangle our source. This basically comes down to building the source-specific functions. Once we are satisfied with them, we add them to the generic pipeline.

Second, we need the configuration file. This JSON file has all the elements to populate the static tables (source, attribute, and source_attribute). An example configuration file is found here:
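The linked example is not reproduced here, but the rough shape of such a configuration is sketched below as a Python dict; every field name is an assumption based on the static tables named above, not the actual schema.

```python
# Hypothetical shape of a source configuration (illustrative field names only).
source_config = {
    "source": {
        "source_name": "source_a",
        "load_strategy": "snapshot",  # or "delta"
    },
    "attribute": [
        {"attribute_name": "key_1", "data_type": "text", "is_key": True},
        {"attribute_name": "value_1", "data_type": "numeric", "is_key": False},
    ],
    "source_attribute": [
        {"source_name": "source_a", "source_column": "KEY1", "attribute_name": "key_1"},
        {"source_name": "source_a", "source_column": "VAL1", "attribute_name": "value_1"},
    ],
}
```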

The script and configuration file need to be pushed to a git repository. The commit triggers the CI pipeline and this ships the files to the containers where the webserver and scheduler are running.

Stuff we haven’t covered

Conclusion

The series has three parts:

Part 1: The reasons to start this project.

Part 2: The implementation’s architecture and metadata.

Part 3: The pipeline in detail, with its metadata interactions.

This blog is also published at the GeoPhy website

Data Engineer @ GeoPhy
