Optimizing an I/O Intensive Data Pipeline

Nim Wijetunga
Super.com
Published in
7 min readJan 8, 2020

As an intern on the Data & Infrastructure team at SnapTravel, I had the opportunity to work on several interesting projects, ranging from building ELTs using Stitch and DBT, to using Terraform for creating metrics dashboards on Datadog. However, the most interesting project I worked on (and quite frankly the one that took the longest) is what I’ll be talking about in this article.

To give you some context, at SnapTravel, we use Airflow for orchestrating tasks that ingest data from external sources and then load this data into our various AWS Cloud Services (S3, RDS, Elasticache, etc..). I’m going to assume you know a bit about Airflow for the rest of this article (if not this is a great introduction). One of our largest (in terms of Depth) and longest-running DAGs on Airflow is the infamous Catalog DAG, taking over 15 days from start to finish!

Airflow DAG Visualisation of Catalog

Think about Catalog as the child you never wanted. You had a good time (hopefully?) while you were making it, but 2–3 years down the road you’re questioning your decisions!

In essence, this DAG (which is an ETL -> Extract, Transform, Load) is responsible for building our inventory which consists of hotel metadata from our 3 data providers (Provider 1 -> P1, Provider 2 -> P2 and Provider 3 -> P3). Our initial stack consisted of:

  • Python: Call Provider APIs and transform the data as needed (Used for the Extract and Transform steps)
  • Sqlalchemy: Load ingested data into our Postgres Application Databases (Used for the Load Step)
  • Airflow: Orchestrate the entire data pipeline (fetching data -> loading into Postgres)

The major bottleneck for this DAG was fetching data from P1 and P2 which took 4–5d and 5–7d respectively.

Major Bottlenecks in the Catalog Pipeline

There were several issues contributing to this long runtime which included (but were not necessarily limited to):

Overhead of Multithreading

  • Creating and destroying thread pools for 1–2 API requests

IO Bottlenecks

  • API Rate Limits, Large number of API requests (4M+ API calls!), Single row upserts to our Postgres DB

Memory Bottlenecks

  • Chunking to avoid loading large amounts of data in memory

Just Make it a Spark Job!

Whenever you search up potential solutions for an ETL task that runs really slowly, the answer always seems to be: “Run that job on a Spark cluster!”. While I agree that Spark is a really efficient data processing framework, it did not fit our needs at the time for the following reasons:

Our main bottleneck was waiting on IO calls rather than the actual processing/transformation of data. While Spark can help with parallelizing network calls, it’s primary purpose is to enable map-reduce operations across several nodes in a cluster. In other words, Spark was built mainly for the T rather than the EL in ETL.

The second, more profound reason was a simple one, limited QPS (Queries Per Second). Many of the endpoints we were using had tight QPS limits, so using Spark to parallelize API requests would only marginally improve our IO bottlenecks, compared to a more straightforward multi-threading solution.

Further, the infrastructure costs and technical complexity (in terms of $$ and engineer-hours) required to run a Spark cluster made the choice of using Spark much less desirable.

AsyncIO to the Rescue

To solve our IO network bottlenecks, we decided to utilize Python’s asynchronous framework: AsyncIO. For those not familiar with concurrency using Python (i.e Multithreading vs AsyncIO vs Multiprocessing), I found this article really helpful.

The backbone of Asyncio is its event loop which keeps track of all the tasks currently running. Once a certain task is blocked waiting on an operation such as a network call, the event loop diverts its resources to run the next ready task.

AsyncIO Network Flow

Using AsyncIO helped improve our data retrieval times by over 70%. Although this improvement was a large step forward, we had really only tackled the ET in ETL, and boy were we in for a really big L!

Data Staging Layer

In our original Catalog DAG, we would write directly to Postgres, however, there were a few issues with this:

  • CRUD Operations in Postgres were slow, especially when updating + committing the same row several times
  • Failure to write to Postgres meant needing to retry several network calls (to re-fetch data), increasing DAG runtime
  • In the event of bad data, it became harder to track whether it was due to our transformations or the raw data itself (provided by 3rd Party APIs)

In order to solve these problems we needed an intermediary database that could:

  1. Store large amounts of JSON data (API response payloads could vary making our raw data fairly unstructured)
  2. Have a high (preferably parallel) write throughput
  3. Relatively fast read throughput
  4. Fault Tolerance when reading data from the database

Enter DynamoDB

After some discussions, we decided to move forward with DynamoDB, Amazon’s managed NoSQL database. Before committing to Dynamo, we read up on its initial implementation to see if it would fit our use cases. Here’s how it fared, given the requirements above:

  1. Storing Unstructured Data: Dynamo is a NoSQL database so this one is a pretty obvious fit
  2. Write Throughput: Dynamo utilizes Eventually Consistent writes, meaning that write acknowledgment is sent without data being replicated across all worker nodes, ensuring a high write throughput
  3. Read Throughput: Dynamo uses a single primary key (hash key) allowing for very fast data retrieval if queried on that key (which was the case for us as we always queried on an id)
  4. Fault Tolerance: Dynamo utilizes Consistent Hashing to replicate data across several nodes, allowing for efficient failure recovery (ensuring high availability)
New Data Ingestion Flow

Adding the extra layer of DynamoDB helped us in the following ways:

  • The source of bad data could be identified more easily (i.e bad raw data or bad transformations)
  • We no longer had to update Postgres multiple times, now all these CRUD operations would occur in Dynamo before final ingestion into Postgres

While the end was near, we still had to find a way to speed up our final data ingestion into Postgres

Enter Asyncpg

Since AsyncIO worked well for handling our network calls, we wanted to see if using an Asynchronous Postgres client would help speed up our reads and writes. There were several Async Postgres clients available for Python (aiopg, asyncpg, etc..) and this article compares the most popular ones. We decided to go with asyncpg as it seemed to have the highest read/write throughput out of all the available options.

Read Throughputs of Different Asynchronous Postgres Clients

For an ORM (Object Relational Mapping) we used Gino, which is built on top of SQLAlchemy Core and supports the asyncpg dialect. Gino has good community support and is used by companies such as Grab so we felt confident introducing it to our stack.

Using Asyncpg helped speed up Postgres writes by over 50%, especially for repeated update operations.

Our Hotels DB Model using Asyncpg & Gino

Impact of These Changes

Finally, after a long 1.5 months of re-architecting, the Catalog DAG had become a much faster and more reliable data pipeline.

DAG Runtime Comparison for Processing Hotel Data from our Suppliers (Provider 1 and 2)

Developer Iteration Speed

Some of our team's major Q4 initiatives revolved around the data catalog, one of which included introducing a new hotel data provider. Through trial and error, it was discovered that a developer needs to process around 20,000 hotels before discovering any subtle errors in their logic (or bad data). The time comparison for this task (processing 20,000 hotels) between our old and new Catalog implementation is shown below.

# Hours Required to Process 20,000 Hotels

Decoupling the ETL Process

Since we have decoupled the E, T, and L into their own Airflow tasks, failures are less likely to occur due to reduced complexity. Additionally, any modifications needed for a single step (i.e loading to/from a different data source) becomes much easier to handle, due to related tasks having low cohesion.

Data Integrity

With our previous catalog implementation, any bad data could only be fixed with the next catalog run (monthly), as we needed to make the same slow API calls. With our layer of DynamoDB and Asyncio, not only could we identify the root cause of bad data, but additionally, rerun Postgres hotel ingestion without needing to make any repeated API calls.

What’s Next?

With the IO and Postgres bottlenecks addressed, there remain some low hanging fruits such as moving all tasks (within the Catalog DAG) that are reading from/writing to Postgres, to use asyncpg. Additionally, streaming data between tasks using a Queue (i.e SQS) can help parallelize certain portions of the DAG, reducing runtimes drastically.

Regardless of what’s next, it’s always important to remember these three things:

  1. Don’t be afraid to use open source software, it’s there for a reason
  2. Before diving into new tools always read up on how they can benefit your needs
  3. Remember the big picture

“Premature optimisation is the root of all evil!”

Thanks to Nehil Jain for giving me the opportunity to work on such an interesting project, and for helping me write this article!

--

--

Nim Wijetunga
Super.com

Software Engineering Student @ The University of Waterloo.