From Hours to Minutes: Optimizing Data Pipelines with Airflow’s Daily Upsert

Aldo Pranata
Published in EDTS
5 min read · Sep 30, 2024

A data engineer needs to be able to manage large datasets efficiently. Unfortunately, I wasn’t one of those engineers, at least not at first. Initially, I relied on the classic “truncate-insert” method in my Airflow pipelines, which worked fine until my dataset started to grow. Suddenly, what used to be a manageable task turned into a resource-hungry process that took hours to complete. There had to be a better way, which led me to the upsert method. In this article, I’ll share how making this switch not only saved time but also improved the overall performance and scalability of my pipeline.

It’s important to note that the upsert method isn’t suitable for every type of data or use case. Upserts are ideal for cases where data changes incrementally, allowing the addition of new records or modification of existing ones without affecting the entire dataset. For very large datasets with frequent and complex changes, or when dealing with append-only data like immutable logs, upserts can become inefficient or overly complex. In my case, the dataset consisted primarily of incremental updates to vehicle trips, where only a small portion of the data changed each day. This made it ideal for an upsert approach, as it allowed me to efficiently handle changes without reprocessing the entire table, providing significant performance gains.

Why truncate-insert becomes an issue as data grows

The limitations of the truncate-insert approach became evident as my dataset grew. Every day, the pipeline would clear the entire table and reinsert all the data, whether it had changed or not. This worked perfectly fine when the dataset was small, but as it expanded, the job’s runtime grew longer and system resources were constantly maxed out. I needed a way to process only new or updated data while leaving the rest untouched. That’s when I decided to switch to an upsert strategy (updating existing records and inserting new ones), which led to a drastic improvement in both speed and efficiency.

So, what exactly is the truncate-insert approach? It’s a simple method: before inserting new data, the entire table is wiped clean using the TRUNCATE command, and then all the data, both old and new, is reinserted from scratch. This method is perfectly fine for smaller datasets because it ensures a clean and consistent table. However, as datasets grow, this process becomes problematic: you end up reprocessing data that hasn’t changed, which consumes time and resources unnecessarily.
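
To make that cost concrete, here is a minimal sketch of a truncate-insert load. It is not my production code: it uses Python’s built-in sqlite3 module with an in-memory database so it runs anywhere, the trips table and its columns are hypothetical, and DELETE FROM stands in for TRUNCATE, which SQLite does not have.

```python
import sqlite3


def truncate_insert(conn, rows):
    """Wipe the hypothetical trips table and reload every row.

    Returns the number of rows written, which is always the full dataset size.
    """
    with conn:  # commit on success, roll back on error
        conn.execute("DELETE FROM trips")  # stand-in for TRUNCATE TABLE
        conn.executemany(
            "INSERT INTO trips (id, vehicle, distance_km) VALUES (?, ?, ?)",
            rows,
        )
    return len(rows)


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE trips (id INTEGER PRIMARY KEY, vehicle TEXT, distance_km REAL)"
)

day1 = [(1, "truck-a", 12.5), (2, "truck-b", 30.0), (3, "van-c", 7.2)]
# On day 2 only one trip is new, yet all four rows are written again.
day2 = day1 + [(4, "van-d", 18.9)]

written_day1 = truncate_insert(conn, day1)
written_day2 = truncate_insert(conn, day2)
total_rows = conn.execute("SELECT COUNT(*) FROM trips").fetchone()[0]
print(written_day1, written_day2, total_rows)
```

The number of rows written each day tracks the size of the whole table rather than the size of the change, which is exactly what stops scaling.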

Switching to Upserts

To solve this issue, I needed a different approach, something that wouldn’t involve processing unchanged data. I switched to an upsert strategy. Unlike the truncate-insert method, an upsert approach leaves unchanged data intact while updating only what’s necessary and inserting new records. This means we avoid the heavy workload of reprocessing the entire dataset, significantly improving pipeline speed and reducing resource consumption. The key was to use MySQL’s INSERT ... ON DUPLICATE KEY UPDATE statement (or a similar query depending on the database) to handle these changes efficiently. Here’s a simplified example of the SQL upsert approach that I used:

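INSERT INTO target_table (id, column1, column2)
VALUES (%s, %s, %s)
ON DUPLICATE KEY UPDATE
    -- VALUES(col) refers to the value the INSERT attempted to write
    -- (deprecated since MySQL 8.0.20 in favor of row aliases, but still accepted)
    column1 = VALUES(column1),
    column2 = VALUES(column2);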

With this SQL upsert query, a row whose id collides with an existing primary key or unique index value is updated in place, while a row with a new id is inserted. This allowed me to efficiently process only the new or modified data.
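
If you want to try the behavior without a MySQL server, the sketch below reproduces it with Python’s built-in sqlite3 module. Note the assumptions: SQLite does not support ON DUPLICATE KEY UPDATE, so the example uses SQLite’s own counterpart, INSERT ... ON CONFLICT(id) DO UPDATE (available from SQLite 3.24), and the sample rows are made up for illustration.

```python
import sqlite3

# SQLite's counterpart to MySQL's ON DUPLICATE KEY UPDATE (needs SQLite 3.24+).
# "excluded" refers to the row the INSERT attempted to write.
UPSERT_SQL = """
INSERT INTO target_table (id, column1, column2)
VALUES (?, ?, ?)
ON CONFLICT(id) DO UPDATE SET
    column1 = excluded.column1,
    column2 = excluded.column2
"""

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE target_table (id INTEGER PRIMARY KEY, column1 TEXT, column2 TEXT)"
)
conn.executemany(
    "INSERT INTO target_table VALUES (?, ?, ?)",
    [(1, "a", "old"), (2, "b", "old")],
)

# Hypothetical daily batch: id 2 already exists (update), id 3 is new (insert).
batch = [(2, "b", "new"), (3, "c", "new")]
with conn:  # commit on success, roll back on error
    conn.executemany(UPSERT_SQL, batch)

rows = conn.execute(
    "SELECT id, column1, column2 FROM target_table ORDER BY id"
).fetchall()
print(rows)
```

Row 1 is never touched, row 2 is updated, and row 3 is inserted, all from a single statement per batch row.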

With the upsert approach in mind, I needed to restructure the Airflow DAG to schedule daily updates and trigger the upsert process. Airflow’s flexibility makes it easy to manage task schedules and dependencies, which was crucial for making sure the pipeline processes only the latest data. Here’s an example of how I set up the Airflow DAG for daily execution and integrated the upsert logic into it.

from airflow import DAG
# Legacy import path; in Airflow 2.x the same operator is provided by
# airflow.providers.mysql.operators.mysql.
from airflow.operators.mysql_operator import MySqlOperator
from datetime import datetime, timedelta

# Retry a failed task twice, waiting five minutes between attempts.
default_args = {
    'owner': 'airflow',
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    dag_id='daily_upsert_job',
    default_args=default_args,
    schedule_interval='@daily',  # Executes daily
    start_date=datetime(2024, 1, 1),
    catchup=False,  # Do not backfill runs between start_date and today
) as dag:

    upsert_task = MySqlOperator(
        task_id='upsert_data',
        sql='sql/upsert_query.sql',  # Path to SQL file containing the upsert logic
        mysql_conn_id='your_mysql_connection',
        autocommit=True,
    )

    upsert_task

This DAG is designed to execute the upsert query every day. It processes the new data added since the last run, ensuring that the system is always up-to-date without the need for a resource-intensive truncate-insert operation.
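
The DAG itself does not decide which rows count as new; that filtering lives in the SQL it runs. One way to do it, sketched here under assumptions rather than copied from my pipeline, is to land incoming rows in a staging table with an updated_at date and upsert only the rows stamped with the run date. The staging_trips and trips tables, the updated_at column, and the upsert_for_date function are all hypothetical, and sqlite3 (with its ON CONFLICT syntax) stands in for MySQL so the example runs anywhere. In Airflow, run_date would typically come from the run’s logical date, for example the {{ ds }} template variable.

```python
import sqlite3


def upsert_for_date(conn, run_date):
    """Copy staging rows stamped with run_date into trips, updating on id clash.

    Returns the number of staging rows that were processed.
    """
    changed = conn.execute(
        "SELECT id, vehicle, distance_km FROM staging_trips WHERE updated_at = ?",
        (run_date,),
    ).fetchall()
    with conn:  # commit on success, roll back on error
        conn.executemany(
            """
            INSERT INTO trips (id, vehicle, distance_km)
            VALUES (?, ?, ?)
            ON CONFLICT(id) DO UPDATE SET
                vehicle = excluded.vehicle,
                distance_km = excluded.distance_km
            """,
            changed,
        )
    return len(changed)


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE trips (id INTEGER PRIMARY KEY, vehicle TEXT, distance_km REAL)"
)
conn.execute(
    "CREATE TABLE staging_trips "
    "(id INTEGER, vehicle TEXT, distance_km REAL, updated_at TEXT)"
)
conn.executemany(
    "INSERT INTO trips VALUES (?, ?, ?)",
    [(1, "truck-a", 12.5), (2, "truck-b", 30.0)],
)
conn.executemany(
    "INSERT INTO staging_trips VALUES (?, ?, ?, ?)",
    [
        (1, "truck-a", 12.5, "2024-01-01"),  # unchanged, from an earlier day
        (2, "truck-b", 31.4, "2024-01-02"),  # corrected distance
        (3, "van-c", 7.2, "2024-01-02"),     # brand-new trip
    ],
)

processed = upsert_for_date(conn, "2024-01-02")
final_rows = conn.execute(
    "SELECT id, vehicle, distance_km FROM trips ORDER BY id"
).fetchall()
print(processed, final_rows)
```

Only the two rows stamped with the run date are read and written; the unchanged trip from the previous day is skipped entirely.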

Setting up the DAG to run daily was a critical part of this optimization. Scheduling it to run every day meant that only the latest data was processed — drastically reducing the workload compared to reprocessing everything each time. By doing this, I minimized redundant data operations and kept the pipeline efficient. Airflow’s built-in features allowed me to set retries and specify time intervals, making the workflow more robust to potential failures. This setup guaranteed that the data was up to date while keeping system usage low and manageable.

What I learned

Switching from the “truncate-insert” approach to using upserts in my Airflow pipeline was a game-changer, allowing me to optimize data processing significantly. By processing only new or updated data, I was able to drastically reduce runtime and resource consumption, making the job more efficient and scalable. This optimization is particularly effective in environments where data grows rapidly and daily updates are needed without reprocessing the entire dataset.

When facing performance bottlenecks in your pipeline, consider whether a different data handling approach might yield better results. In my case, switching to upserts led to massive performance gains. However, it’s crucial to assess whether an upsert strategy fits your specific use case. Upserts are ideal for scenarios involving incremental changes, but they may not be the best solution for all situations, especially with very large datasets requiring complex updates or append-only data.
