How I Engineered my First Data Pipeline using only Open Source Software

DL · 13 min read · Sep 15, 2022


scientia potentia est

Knowledge is power

Every business collects data one way or another. Information is data processed in a meaningful way to derive inferences and narratives. Intelligence is when you use that information to strategise and execute.

Business Intelligence is a powerful tool every business owner should have at their fingertips.

Back when I ran my own eCommerce brand as a solo venture in 2021, I managed the paid media, the finances, the fulfilment, chargeback fraud and more.

I had to digest A LOT of information, from 891,243 different platforms!

I’m a strong advocate for automation and using technology to ease our lives. So I decided to go the extra mile to reduce my admin work — by adding more technical work!

In this post, I’ll share with you how I engineered my first data pipeline using only open source software to enable business intelligence for an eCommerce brand.

Before we begin, let’s set some context. I wanted a solution that was:

  • Open source, self-hostable and with minimal vendor lock-in.
  • Scalable, for posterity.
  • Driven by a Web UI, unless that really isn’t possible.
  • As easy to deploy as docker-compose up -d.
  • Minimal out-of-the-box configuration to run, but with the full range of “tinkerability” when I need it.
  • As close to no-code as possible, and certainly no diving deep into editing software files to enable a feature.

What’s data engineering without its challenges?

My first challenge in this journey was that I had several MariaDB databases that needed to be replicated into a Postgres data warehouse on a regular basis. The Postgres data warehouse would then serve as the database that BI and analytics tools pull from.

You’d ask me now, “DL, why do you not have data marts, and why doesn’t your analytics software just pull directly from MariaDB?”

And the answer to that is: pulling directly from an application database could significantly impact its performance as you run queries against it. We want to ensure that the applications keep running smoothly while we build the BI solution on top of them. On top of that, I didn’t have enough data to justify the additional complexity of a data mart.

When I first started all this, I didn’t have the confidence to write my own code, but I had an instinct for what such a pipeline should look like from prior experience. I needed a few stable, tried-and-tested, scalable applications that could handle this robustly.

Drawing from Prior Experience

Previously in my career, as an analyst in the Analytical CRM department of a retail bank, I used an ETL tool by SAS Institute called Enterprise Guide. It is a graphical interface that allows users to easily join different data tables and manipulate data.

So naturally, I started looking for alternatives to SAS EG. Ideally, it should be a graphical interface that aids transformation and then the flow can be saved as a service that automatically syncs data when I’m not looking.

The first prominent contender I found was the Pentaho suite. Although I didn’t look too deeply into its interface, it appeared to run as a graphical-only tool, which meant I would have to manually click to run the flow, or leave my graphical computer powered on to manage it. But I needed it to run on a headless server instead. So that crossed ~~Pentaho~~ off the list for me, and on to the next contender!

It’s better to have a reference on your first time than going in blind.

As my research wore on, I realised a structured platform was the best way to guide my engineering, lest my pipeline exploded before it actually got to do any transformations! In all seriousness, I felt it was much easier to build on an existing platform at that time.

So this is when I found Singer, Meltano and the new kid on the block, Airbyte.

Singer was a CLI-only option, and Airbyte claims that Singer’s connectors are built as independent repositories on GitHub, so code quality can be inconsistent. Not only that, Singer doesn’t actually have any scheduling; it has to be paired with some other scheduling software.

Meltano is built as a graphical Web UI on top of Singer, with scheduling, so plenty of points there. Their website, however, gave me the impression that it required some configuration out of the box before it would run.

Airbyte aims to redefine open source data pipelines, and had some significant funding too, which is relatively uncommon in the open source space. Their USP is that it is a turnkey solution. This REALLY piqued my interest.

From these options it was pretty clear which one I decided to go with, in the interest of time.

Testing Airbyte

docker-compose up -d and voila, our system was up and running in no time!

Added the connectors, configured databases etc, and everything worked as it should! … or did it??

This is where the REAL roller coaster began!

Airbyte, at that time, was still very new and the software was nowhere near “mature”. Rough around the edges did not begin to describe it at this stage.

The two connectors I used, which I believe to be some of the most basic database types out there, are MySQL and Postgres.

I used the MySQL source (which supports MariaDB too, although it isn’t listed on their site or GitHub and took me some time to figure out by testing) to connect to the Postgres destination (our data warehouse).

I’ll get into a few technical difficulties I experienced using Airbyte:

1. Schemas

From first principles, we do a

SELECT * FROM table;

to get data from a table in both MySQL and Postgres. When I did that in our source database, it worked fine. When I did that in our destination Postgres, it showed a blank result. Perplexed, I searched high and low on GitHub and even joined their Slack to ask about it.

It turns out Airbyte does not save into the public schema on the destination, but instead saves data into a separate schema named after the source database. To find the synced data, I had to write:

SELECT * FROM sourcedb.table;

OK, no problem: a minor inconvenience and some inexperience on my part, I thought.
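For anyone hitting the same blank result, a quick sanity check is to list the schemas and tables that actually exist in the destination. Here’s a minimal sketch using psycopg2; the connection details are placeholders for whatever your warehouse uses:

```python
# Sketch: list schemas and tables in the destination Postgres to find
# where Airbyte actually wrote the synced data.
# The connection details below are placeholders, not my real setup.
import psycopg2

conn = psycopg2.connect(
    host="localhost", port=5432,
    dbname="warehouse", user="warehouse_user", password="change_me",
)

with conn, conn.cursor() as cur:
    # Non-system schemas in the destination
    cur.execute("""
        SELECT schema_name
        FROM information_schema.schemata
        WHERE schema_name NOT IN ('pg_catalog', 'information_schema');
    """)
    print("Schemas:", [row[0] for row in cur.fetchall()])

    # Tables per schema, so you can spot the one named after the source DB
    cur.execute("""
        SELECT table_schema, table_name
        FROM information_schema.tables
        WHERE table_schema NOT IN ('pg_catalog', 'information_schema')
        ORDER BY table_schema, table_name;
    """)
    for schema, table in cur.fetchall():
        print(f"{schema}.{table}")

conn.close()
```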

2. Normalisation

Airbyte comes with the ability to “normalise” data for select destination database types, and Postgres was fortunately one of the supported ones. What this does is convert the source data types into suitable data types for Postgres without having to pre-set the destination table schema. This includes things like converting DATETIME (MySQL) to TIMESTAMP (Postgres) and so on. Or so I thought it would do this intelligently.

Here are some major errors I encountered in my data:

  • DATETIME did not, in fact, turn into TIMESTAMP but turned into TEXT!
  • TINYINT(1) was assumed to be BOOLEAN, which is wrong: BOOLEAN can be expressed as TINYINT(1), but not every TINYINT(1) is a BOOLEAN.
  • A few fields of type INT were cast as TEXT, etc.

I couldn’t edit this easily in the UI out of the box. The behaviour is hidden behind many layers of code in the repository, which went against the ethos of this exercise.

My only option was to raise issue tickets on Airbyte’s Github repo or speak with the dev team on Slack.
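In the meantime, a quick way to see exactly what the normalisation produced is to inspect the destination column types via information_schema. A small sketch along the same lines, where the schema and table names (“sourcedb”, “orders”) are just placeholders:

```python
# Sketch: inspect the data types Airbyte's normalisation produced in Postgres,
# to spot DATETIME columns that arrived as TEXT, INTs cast as TEXT, etc.
# "sourcedb" and "orders" are placeholder names.
import psycopg2

conn = psycopg2.connect(
    host="localhost", port=5432,
    dbname="warehouse", user="warehouse_user", password="change_me",
)

with conn, conn.cursor() as cur:
    cur.execute("""
        SELECT column_name, data_type
        FROM information_schema.columns
        WHERE table_schema = %s AND table_name = %s
        ORDER BY ordinal_position;
    """, ("sourcedb", "orders"))
    for column_name, data_type in cur.fetchall():
        print(f"{column_name}: {data_type}")

conn.close()
```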

3. SQL Views

I had to create some SQL views on the Postgres database after syncing with Airbyte. These act as a pseudo “data mart” for the BI tools to grab data from, and they also remove the need to join data later on if the BI tool doesn’t support joins easily.

There was no problem at all doing this, but after each data sync scheduled by Airbyte, the views disappeared completely.

Once again, I spoke with the friendly devs at Airbyte to look into this.

Apparently this was due to the way dbt handles dependencies for views and is out of scope for Airbyte.

I had no prior experience with how dbt works, and this was a little out of my depth.

The devs assured me that there would be a way to recreate the views with dbt more easily after their next patch, but I still didn’t quite understand how to do this at the time.

4. Sync errors

I set the sync frequency to every hour and, sometimes, the system would run into problems and not sync as it should. Logs were sent to the dev team but nothing could really be gleaned from them other than “Do let us know if this happens again”. I wasn’t satisfied that this could be used in production.

P/S: it is clear now that Airbyte was nowhere near its goal of being a turnkey solution, but I do tip my hat to the speed at which they iterate every day. As such, I needed a more robust solution that allowed me to customise where I needed to.

Enter Airflow

Airflow has been around for a much longer time and is clearly quite mature at this point.

This one took me down a rabbit hole I’m glad to have gone into.

Airflow is often used or described as an ETL tool, but that is a misconception of its objective: Airflow is in fact a WMS (Workflow Management System), not an ETL tool.

This, however, did not stop me trying to turn it into an ETL tool, with painful lessons learnt.

Airflow comes pre-packaged with many “operators” that perform commands through different connector hooks.

A DAG (Directed Acyclic Graph) calls different tasks in a flowchart-like structure.

Naturally, I wanted to replicate the ETL in a simple two-step flow:

t1 = Extract from MariaDB
t2 = Push to Postgres

where t1 >> t2

Airflow does not do the normalisation that Airbyte does, at least not without coding your own customisations. Each destination table had to be pre-defined before the data was passed through. Not ideal or scalable in my situation, but it does provide a good amount of customisability.

Now, the question is, how do I pass the data from one task to the other using Airflow?

Airflow comes with a feature called XCom, which enables communication between tasks through functions like xcom_push and xcom_pull.

At first, I used this method to push thousands of rows of data to memory and then pull it in the next task.

This method, however, is NOT the intended purpose of XCom. XCom is meant to push and pull small variables that can easily be changed and used across different tasks.
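For contrast, this is roughly what the intended use of XCom looks like: pushing and pulling a small value, such as a file path, rather than the rows themselves. A minimal sketch assuming Airflow 2.x; the task names and path are illustrative:

```python
# Sketch: the intended use of XCom, pushing and pulling a small value
# (here, a file path) between tasks rather than the data itself.
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator


def extract(ti):
    # Write the heavy data to disk and only push a small reference to it.
    ti.xcom_push(key="csv_path", value="/tmp/extract.csv")


def load(ti):
    # Pull the small reference back out; the actual rows never touch XCom.
    csv_path = ti.xcom_pull(task_ids="extract", key="csv_path")
    print(f"Loading data from {csv_path}")


with DAG(
    dag_id="xcom_intended_use",
    start_date=datetime(2022, 1, 1),
    schedule_interval="@hourly",
    catchup=False,
) as dag:
    t1 = PythonOperator(task_id="extract", python_callable=extract)
    t2 = PythonOperator(task_id="load", python_callable=load)
    t1 >> t2
```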

I found this out the hard way as I kept running into memory errors using xcom as a data pipeline!

Airflow simply is not designed with data pipeline performance in mind.

Solution:

I read later on that a best practice in ETL and data pipelines is to stage a CSV (Comma Separated Values) file in between steps, to allow users to troubleshoot or extract data when necessary. It also serves as a log of the data being piped. This idea gave me a lightbulb moment! 💡

So I redesigned the tasks as such:

t1 = Extract from MariaDB as a dataframe and push to_csv()
t2 = read_csv() to a dataframe and push to Postgres
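Here’s a rough sketch of what that DAG looked like, using pandas and SQLAlchemy inside PythonOperator tasks. The connection strings, table name and CSV path are placeholders, and it assumes the pymysql and psycopg2 drivers are installed; the real DAG covered more tables and had some error handling:

```python
# Sketch: extract from MariaDB to a CSV file, then load the CSV into Postgres.
# Connection strings, table names and paths are placeholders.
from datetime import datetime

import pandas as pd
from sqlalchemy import create_engine

from airflow import DAG
from airflow.operators.python import PythonOperator

CSV_PATH = "/tmp/orders.csv"  # the CSV staged between the two tasks


def extract_from_mariadb():
    engine = create_engine("mysql+pymysql://app_user:change_me@mariadb-host/appdb")
    df = pd.read_sql("SELECT * FROM orders", engine)
    df.to_csv(CSV_PATH, index=False)


def load_to_postgres():
    engine = create_engine("postgresql+psycopg2://warehouse_user:change_me@pg-host/warehouse")
    df = pd.read_csv(CSV_PATH)
    # Full refresh: drop and recreate the destination table on every run
    df.to_sql("orders", engine, schema="public", if_exists="replace", index=False)


with DAG(
    dag_id="mariadb_to_postgres_via_csv",
    start_date=datetime(2022, 1, 1),
    schedule_interval="@hourly",
    catchup=False,
) as dag:
    t1 = PythonOperator(task_id="extract_from_mariadb", python_callable=extract_from_mariadb)
    t2 = PythonOperator(task_id="load_to_postgres", python_callable=load_to_postgres)
    t1 >> t2
```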

A wonderful solution! (For what I was trying to achieve with this step, at least. Little did I know of the troubles ahead.)

Some keen readers may say, “You should use Singer or some pipeline package with Airflow!” And you’d be right! But what did I know back then?!

Don’t worry, the journey’s still going and is nowhere near finished!

So, a quick break ☕ and I hope you’re enjoying the journey so far. Here are a few pictures of my goofy cat:

Isn’t she lovely?

More manual = more room for error

Pre-defining tables means doing everything manually and ad-hoc

Unfortunately, there were encoding errors when I ran the above DAG.

I hadn’t realised this when the MariaDB application DBs were created. It turned out they were using latin1 encoding instead of utf8mb4.
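A quick way to confirm which columns were affected is to ask MariaDB’s information_schema for the character set of each text column. A small sketch with pymysql; the credentials and database name (“appdb”) are placeholders:

```python
# Sketch: list the character set of each text column in the application DB,
# to confirm which ones are still latin1 rather than utf8mb4.
# Credentials and the database name are placeholders.
import pymysql

conn = pymysql.connect(
    host="mariadb-host", user="app_user", password="change_me", database="appdb"
)

with conn.cursor() as cur:
    cur.execute("""
        SELECT table_name, column_name, character_set_name
        FROM information_schema.columns
        WHERE table_schema = %s AND character_set_name IS NOT NULL
        ORDER BY table_name, ordinal_position;
    """, ("appdb",))
    for table_name, column_name, charset in cur.fetchall():
        print(f"{table_name}.{column_name}: {charset}")

conn.close()
```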

Encoding is another rabbit hole I dove deep into.

Alas, I had no tangible way of correcting the data from latin1 to utf8mb4 as a misstep here could lead to disastrous consequences in a production database. It’s just a note-to-self for the future.

I sidestepped this issue by running HEX(column) to convert the problem binaries into VARCHAR where I could, but I gave up on Airflow before finishing this, as I ran into yet another problem with the Airflow approach.

Sync the final view or sync the source tables?

In one of the source applications, I had 5 meaningful, normalised tables that combine into one massive table through a bunch of left joins.

Question is: Do I (1) create a view in the source MariaDB and sync the view to the destination, or do I (2) sync the 5 tables to the destination and then create a view there?

The problem with method (1) is that it creates a lot of load on the source server just to generate the massive 130-column view, although it does minimise the amount of code required from me in the DAG.

Perhaps it would be much simpler to sync the 5 tables separately and then create the view afterwards.

There’s also another reason for this as I’ll explain soon.

Incremental vs Full Refresh

The whole purpose of syncing the application data to another database is to reduce the load on the application database when queries are being made by the BI tool.

Now, what I could achieve with Airflow at the time was querying the complete data from the source DB, and doing so each time meant the source DB server had to allocate resources for the full SELECT query on every run.

This defeats the purpose of keeping the load on the application server low.

So I had to rethink this strategy.

I recalled that, when I was testing Airbyte, there was an option to select either a Full Refresh of the data (overwriting the destination) OR an Incremental sync (which only syncs changes made to the origin database since the last sync).

In order to do this (known as Change Data Capture, or CDC), the binary log (binlog) has to be enabled on the source MariaDB.
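Before attempting CDC, it’s worth confirming the source server is actually set up for it. The exact requirements depend on the CDC tool, but row-based binary logging is the usual baseline. A small sketch that checks the relevant MariaDB variables, again with placeholder credentials:

```python
# Sketch: check whether the source MariaDB is ready for CDC-style replication.
# Most CDC tools expect the binary log to be enabled with ROW format.
# Credentials are placeholders.
import pymysql

conn = pymysql.connect(
    host="mariadb-host", user="app_user", password="change_me", database="appdb"
)

with conn.cursor() as cur:
    for variable in ("log_bin", "binlog_format", "binlog_row_image"):
        cur.execute("SHOW VARIABLES LIKE %s", (variable,))
        row = cur.fetchone()
        print(row if row else f"{variable}: not set")

conn.close()
```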

I had no idea at this stage how to perform this with Airflow.

Airflow as a whole really isn’t a good ETL tool, but an excellent WMS.

Holy Grail or Fail?

It was around this time when I realised that Airbyte could be used in conjunction with Airflow, since there’s an Airbyte Operator in Airflow!

Have I found *the* solution to all this madness?

My plan here was to create a DAG as such:

t1 = Trigger Airbyte to sync data
t2 = Create the SQL view using SQL commands
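A sketch of that DAG, using the Airbyte provider’s trigger operator followed by a Postgres operator that recreates the view. The connection IDs, the Airbyte connection UUID and the view definition are placeholders, and the real view has far more columns; the casts in it also patch up the normalisation quirks from earlier:

```python
# Sketch: trigger an Airbyte sync, then recreate the SQL view that the sync drops.
# Connection IDs, the Airbyte connection UUID and the view SQL are placeholders.
from datetime import datetime

from airflow import DAG
from airflow.providers.airbyte.operators.airbyte import AirbyteTriggerSyncOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator

RECREATE_VIEW_SQL = """
CREATE OR REPLACE VIEW public.orders_reporting AS
SELECT
    o.id,
    CAST(o.created_at AS TIMESTAMP) AS created_at,  -- DATETIME that landed as TEXT
    CAST(o.quantity AS INTEGER)     AS quantity,    -- INT that landed as TEXT
    c.email
FROM sourcedb.orders o
LEFT JOIN sourcedb.customers c ON c.id = o.customer_id;
"""

with DAG(
    dag_id="airbyte_sync_then_rebuild_views",
    start_date=datetime(2022, 1, 1),
    schedule_interval="@hourly",
    catchup=False,
) as dag:
    t1 = AirbyteTriggerSyncOperator(
        task_id="trigger_airbyte_sync",
        airbyte_conn_id="airbyte_default",  # Airflow connection to the Airbyte API
        connection_id="00000000-0000-0000-0000-000000000000",  # Airbyte connection UUID
        asynchronous=False,  # wait for the sync to finish before moving on
    )
    t2 = PostgresOperator(
        task_id="recreate_reporting_view",
        postgres_conn_id="warehouse_postgres",
        sql=RECREATE_VIEW_SQL,
    )
    t1 >> t2
```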

This way, I can use Airbyte’s normalisation (which had improved moderately since my Airflow adventures), combined with the flexibility that Airflow offers.

With Airbyte, I can take full advantage of the incremental sync without having to load the origin servers too much.

Remember the decision to sync the tables first and then create the view after? It would be impossible to perform an incremental sync on a view, since no data is actually written to the view, which means there are no trackable changes in the binlog. As such, syncing the original source tables incrementally is the ideal approach.

And now, courtesy of Airflow, I can automatically recreate the destination view that gets dropped after each Airbyte sync.

Here were my initial problems with Airbyte, and here’s how they look with Airflow + Airbyte:

1. Schemas

  • Not too big an issue; I can live with this. Plus, there’s a GitHub issue on this and the devs are closing it soon.

2. Normalisation

  • I managed to bypass this by recasting the values to the right types in an SQL view (as in the DAG sketch above), so that the BI tools can read TIMESTAMP columns appropriately.

3. SQL View

  • Airflow will automatically regenerate this after each sync, so no fuss there!

4. Sync Errors

  • Airflow will make sure it syncs, and notifies me if it doesn’t.

Airflow also opens the door to a whole new world of automation as we move forward.

The end solution is near

Now that I’ve come full circle, I’ve learnt so much through practical first-hand experience, including that I was not as averse to code as I thought I would be. Further chats with the Airbyte team on Slack made me realise that dbt was quite the “meta” tool to use, and so I picked it up soon after.

That’s when I realised how powerful dbt is in a production pipeline. With dbt, I built the T part of the ELT on top of Airbyte, with the transformations versioned in my self-hosted instance of Gitea. This removed the need for a WMS to manage the flow, since Airbyte could schedule the syncs itself and run the transformations automatically with the help of dbt. Airflow was then removed from this pipeline. It’s a fantastic tool that I would end up using in other projects, but this simply wasn’t the time or place for it.

In the end, the stack of open source tools that allowed a full 360° view of the business was Airbyte + dbt, PostgreSQL and Apache Superset. Honourable mentions to the countless Docker containers I deployed and removed with help from docker-compose and Portainer, as well as the tests I ran with Airflow, Singer, Meltano, Pentaho etc. Today, some, if not most, of these tools have matured since I worked on this project in 2021 and are in a great position to be used in production. I am not affiliated with any of these services.

Today, I’ve released the dbt transforms and the monster SQL that went into this project on GitHub. I hope it helps someone out there; feel free to reach out.

All good things come to an end

Well, having tried every solution possible and pushed the UX of each to its limits, I have learnt so much to kickstart my skillset. I hope the solution I’ve stumbled upon is worth the effort I put into testing these systems. As far as I know, there isn’t any article out there of anyone attempting something like this. If you know of one, please put me in touch with whoever has done it! :)

Until then!


DL is a full-stack data guy, from engineering for analytics to machine learning and deep statistical analysis of data. He’s always happy to connect with others.