How we extract data for our DW

Felipe T. Farias
Published in Alice Tech
Jun 29, 2021

A long long time ago…

About a year ago I arrived at Alice with the commitment to make the world healthier. We had fewer than 100 employees and were living off our Series A, which means tight money. A big challenge and a lot of work to do!

The first challenges

As a data engineer, my responsibility was to provide data for everybody. The team was very small (actually just me, plus half of my boss's time) and the systems were changing very fast.

Facing the situation, the plan needed to be as simple and direct as possible:

  • Use a relational database as the data warehouse. We chose PostgreSQL hosted in AWS RDS. Easy to configure and maintain.
  • Use a simple tool to schedule the ETL pipelines. The choice was Airflow since it's a very stable, well-known tool and we had experience with it (and we love Python).
  • A visualization interface. Metabase was the winner for many reasons: free, previous experience with it, easy to install and configure, self-service, very stable, and users usually like it.

OK… That's something we can make happen! I reused some code I had written in the past to copy data from one place to another, and data started to flow! Great! Mission accomplished!

Problems always happen!

BUT… almost every day I had broken DAGs and had to add tables, fix table structures, re-run DAGs, and deploy emergency PRs. The problem was that every new column, every modification in size or type, even the order of some selects could break something. How to solve it?

For every problem, a solution!

At that point, I started to work on an Airflow plugin for a smarter extractor, which was baptized, given my total lack of imagination, "SmartTransfer". The main idea was to check the existence and structure of the table in the destination staging area and automatically create or re-create it as needed.

The intention was for it to be really easy to call, with minimal parameters. Below is an example of a very simple DAG using it:

(all code is available at https://github.com/alice-health/airflow_plugins )
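Something along these lines (the operator and parameter names below are only a sketch; the README in the repository documents the real interface):

```python
from datetime import datetime

from airflow import DAG
# Hypothetical import path and operator name; the real plugin lives at
# https://github.com/alice-health/airflow_plugins (see its README).
from smart_transfer import SmartTransferOperator

with DAG(
    dag_id="copy_users_to_staging",
    start_date=datetime(2021, 1, 1),
    schedule_interval="@hourly",
    catchup=False,
) as dag:
    # Copy one production table into the warehouse staging area.
    # Connection ids, table names and parameter names are illustrative only.
    SmartTransferOperator(
        task_id="copy_users",
        source_conn_id="production_db",
        destination_conn_id="warehouse_db",
        source_table="public.users",
        destination_table="staging.users",
    )
```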

So, how does it work? No big secret, but there are a few steps in the process.

First, we need to check the structure of the source table. We run a query that fetches one row from the source table and analyze its columns.
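With psycopg2, that inspection can be sketched like this (the helper name and the use of cursor.description are illustrative, not the plugin's exact code):

```python
def get_source_structure(conn, table_name):
    """Fetch one row and read the column metadata from the cursor.

    `conn` is assumed to be an open psycopg2 connection.
    """
    with conn.cursor() as cur:
        cur.execute(f"SELECT * FROM {table_name} LIMIT 1")
        cur.fetchone()
        # cursor.description holds (name, type_code, ...) for every column,
        # even when the table is empty.
        return [(col.name, col.type_code) for col in cur.description]
```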

From that result, we can build the 'create table' statement.
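A sketch of that step, assuming the column type codes have already been mapped to SQL type names (something the real code has to handle):

```python
def build_create_table(destination_table, columns):
    """Assemble the CREATE TABLE statement for the staging copy.

    `columns` is assumed to be a list of (column_name, sql_type) pairs.
    """
    cols_sql = ",\n    ".join(f"{name} {sql_type}" for name, sql_type in columns)
    # UNLOGGED: see the note below about skipping the write-ahead log.
    return f"CREATE UNLOGGED TABLE {destination_table} (\n    {cols_sql}\n)"
```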

We used the 'unlogged table' PostgreSQL feature because it should be, in principle, faster. Unlogged tables are faster but not crash-safe, so be careful! Since staging is just a copy of production, it's not a problem to lose data. You can learn more in the PostgreSQL documentation. We didn't notice much difference, but we only have a few gigabytes. We will benchmark again in the future as our database grows.

With everything set, the logic is quite straightforward:

  • If the destination table doesn't exist, create it
  • If the destination table exists but the structure is different, drop it and recreate it
  • If the tables have the same structure and the same primary keys, just keep the table

It's important to remember that, in the case of (re)creating the destination table, we need to do a full import, so we set the flag "need_full_import" to True. The sketch below illustrates this decision logic.
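Here is roughly how it can be done, using information_schema to read the current destination structure (helper and parameter names are illustrative, and the primary-key comparison is left out of the sketch):

```python
def prepare_destination(dest_conn, destination_table, create_sql, source_columns):
    """Create or re-create the destination table when needed.

    Returns True when a full import is required. `source_columns` is assumed
    to be a list of (column_name, data_type) pairs and `create_sql` the
    statement built earlier.
    """
    schema, table = destination_table.split(".")
    with dest_conn.cursor() as cur:
        # Current structure of the destination table (empty list if missing).
        cur.execute(
            """
            SELECT column_name, data_type
            FROM information_schema.columns
            WHERE table_schema = %s AND table_name = %s
            ORDER BY ordinal_position
            """,
            (schema, table),
        )
        dest_columns = cur.fetchall()

        if dest_columns == list(source_columns):
            return False  # same structure: keep the table, incremental import

        if dest_columns:
            cur.execute(f"DROP TABLE {destination_table}")
        cur.execute(create_sql)
    dest_conn.commit()
    return True  # need_full_import: the table was (re)created
```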

Finally, we transfer the data. The first step is to know whether we are going to do a full import or an incremental one:
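Something like this, assuming an "updated_at" column and a stored cutoff are used to find changed rows (the real plugin may track increments differently):

```python
def build_select(source_table, need_full_import, last_sync=None,
                 updated_at_column="updated_at"):
    """Return the extraction query and its parameters: full or incremental."""
    if need_full_import or last_sync is None:
        return f"SELECT * FROM {source_table}", ()
    # Only the rows changed since the last synchronization.
    return (
        f"SELECT * FROM {source_table} WHERE {updated_at_column} >= %s",
        (last_sync,),
    )
```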

and then prepare the insert statements. The syntax is PostgreSQL-specific, using "ON CONFLICT DO UPDATE" to create a mixed insert/update statement:
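A sketch of how such an upsert statement can be assembled (column handling here is illustrative):

```python
def build_upsert(destination_table, columns, primary_keys):
    """Build a PostgreSQL upsert: INSERT ... ON CONFLICT DO UPDATE.

    `columns` and `primary_keys` are lists of column names.
    """
    col_list = ", ".join(columns)
    placeholders = ", ".join(["%s"] * len(columns))
    # Non-key columns take the incoming value on conflict.
    updates = ", ".join(
        f"{col} = EXCLUDED.{col}" for col in columns if col not in primary_keys
    )
    return (
        f"INSERT INTO {destination_table} ({col_list}) "
        f"VALUES ({placeholders}) "
        f"ON CONFLICT ({', '.join(primary_keys)}) DO UPDATE SET {updates}"
    )
```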

The main loop performs the data transfer. The most important part here is the grouper function: it lazily consumes the source_cursor, keeping in memory only the data that will be sent in the current block.
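A fetchmany-based sketch of the same idea (the real grouper may be implemented differently):

```python
def grouper(source_cursor, block_size=5000):
    """Lazily yield blocks of rows, keeping only one block in memory."""
    while True:
        block = source_cursor.fetchmany(block_size)
        if not block:
            return
        yield block


def transfer(source_cursor, dest_conn, upsert_sql, block_size=5000):
    """Main loop: stream blocks from the source and upsert them into staging."""
    with dest_conn.cursor() as dest_cursor:
        for block in grouper(source_cursor, block_size):
            dest_cursor.executemany(upsert_sql, block)
    dest_conn.commit()
```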

More details of installing, using, and testing SmartTransfer are in the README file in the official repository at https://github.com/alice-health/airflow_plugins.

Conclusion

I tried to cover the most important (and interesting) parts of the code and show how we solved the problem of moving, not hundreds of terabytes with 100 people, but a couple of gigabytes with two people, in a way that automates the whole table synchronization and copying process and minimizes the data engineer's manual work.
