ETL using Python and Apache Airflow

Felix Pratamasan
Aug 5, 2023


In this article, I will share a project that uses Python and Apache Airflow to build an ETL pipeline for toll data. The code is on my GitHub: https://github.com/lixx21/IBM-Data-Engineer/blob/main/ETL-and-Data-Pipelines-with-Shell-Airflow-and-Kafka/Project5/etl_toll_data.py

We have several tasks to do:

  1. DAG Arguments
  2. Define DAG
  3. Task Definition
  4. Task Pipeline
  5. Submit Scripts into Airflow

But first, create a Python file named etl_toll_data.py (or any name you like) and import the libraries we need:

from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, date, timedelta

DAG ARGUMENTS

In this section, we define the default arguments for our DAG in the Python script:

#dag arguments
default_dag = {
    "owner": "Felix Pratamasan",
    "start_date": date.today().isoformat(),
    "email": ["felixpratama242@gmail.com"],
    "email_on_failure": True,
    "email_on_retry": True,
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}

Here is what each DAG argument means:

  1. owner is the name of the owner of this DAG.
  2. start_date is the date from which your DAG starts being scheduled.
  3. email is the address (or list of addresses) that receives notifications about your DAG.
  4. email_on_failure is a boolean that sets whether to send an email when a task has failed.
  5. email_on_retry is a boolean that sets whether to send an email when a task is retried.
  6. retries is the number of times a failed task is retried; here it is retried only once.
  7. retry_delay is the time to wait between tries; here it is five minutes.
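To make retries and retry_delay more concrete, here is a minimal plain-Python sketch of the behaviour they describe. It does not use Airflow at all: run_with_retries and flaky_task are hypothetical names invented for this illustration, and the real scheduler does much more than this loop.

```python
import time
from datetime import timedelta


def run_with_retries(task, retries, retry_delay, sleep=time.sleep):
    """Call task(); on failure wait retry_delay and try again, up to `retries` extra times."""
    attempts = 0
    while True:
        attempts += 1
        try:
            return task(), attempts
        except Exception:
            if attempts > retries:
                raise  # out of retries: the task counts as failed
            sleep(retry_delay.total_seconds())  # wait before the next attempt


# Hypothetical flaky task: fails on the first call, succeeds on the second.
calls = {"count": 0}


def flaky_task():
    calls["count"] += 1
    if calls["count"] == 1:
        raise RuntimeError("temporary failure")
    return "done"


waits = []  # record the delays instead of really sleeping for five minutes
result, attempts = run_with_retries(
    flaky_task, retries=1, retry_delay=timedelta(minutes=5), sleep=waits.append
)
print(result, attempts, waits)
```

With retries set to 1, the task gets one second chance after a five-minute wait; a second failure would be final.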

DAG DEFINITION

The next step is to define (instantiate) the DAG. It takes parameters like this:

#define dag
dag = DAG(
    'ETL_toll_data',
    schedule=timedelta(days=1),
    default_args=default_dag,
    description="Apache Airflow Final Assignment",
)

Here is what the DAG definition parameters mean:

  1. The DAG ID identifies our DAG; in this example my DAG ID is ETL_toll_data.
  2. schedule defines when the DAG runs; in this case the DAG runs repeatedly at an interval of one day once it is deployed.
  3. default_args is the parameter where you pass the DAG arguments that you created before.
  4. description is a short description of your DAG.
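As a rough illustration of what a one-day schedule means, the sketch below lists evenly spaced dates from a start date. It is a simplification that does not use Airflow: logical_dates is a hypothetical helper and the start date is arbitrary, while the real scheduler also deals with catchup, data intervals and time zones.

```python
from datetime import datetime, timedelta


def logical_dates(start_date, schedule, count):
    """Return the first `count` evenly spaced dates, starting at start_date."""
    return [start_date + i * schedule for i in range(count)]


# Arbitrary start date chosen for the illustration.
runs = logical_dates(datetime(2023, 8, 5), timedelta(days=1), 3)
for run in runs:
    print(run.isoformat())
```

Each date is exactly one schedule interval after the previous one, which is the idea behind schedule=timedelta(days=1).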

TASK DEFINITION

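In this part, we define the tasks that make up our DAG, that is, our data pipeline:

  1. Unzip Toll Data

In this task we unzip the toll data with the tar command, which extracts the tolldata.tgz file. Here, and in the tasks below, {file_path} is a placeholder for the directory that holds your data; replace it with the real path. The -C option tells tar to extract into that directory, because BashOperator runs each command in a temporary working directory by default.

#task to unzip data
unzip_data = BashOperator(
    task_id="unzip_data",
    bash_command="tar -xvzf {file_path}/tolldata.tgz -C {file_path}",
    dag=dag,
)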
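For readers who want to see what the extraction step does without running tar, here is a small sketch using Python's standard tarfile module. The function name unzip_data mirrors the task name, but the sample archive and its contents are made up so the example can run without the real tolldata.tgz.

```python
import tarfile
import tempfile
from pathlib import Path


def unzip_data(archive_path, dest_dir):
    """Extract a gzip-compressed tar archive into dest_dir, like `tar -xzf archive -C dest_dir`."""
    with tarfile.open(archive_path, "r:gz") as archive:
        archive.extractall(dest_dir)
    return sorted(p.name for p in Path(dest_dir).iterdir())


# Build a tiny stand-in archive (hypothetical content, not the real toll data).
work_dir = Path(tempfile.mkdtemp())
sample = work_dir / "vehicle-data.csv"
sample.write_text("1,Mon,125094,car\n")
archive_path = work_dir / "sample.tgz"
with tarfile.open(archive_path, "w:gz") as archive:
    archive.add(sample, arcname="vehicle-data.csv")

out_dir = work_dir / "extracted"
out_dir.mkdir()
extracted = unzip_data(archive_path, out_dir)
print(extracted)
```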

2. Extract Data from CSV

In this task we use the cut command to extract fields from vehicle-data.csv and save them to another file named csv_data.csv. Because CSV uses a comma (,) as its delimiter, the command looks like this:

# task to extract_data_from_csv
extract_data_from_csv = BashOperator(
    task_id="extract_data_from_csv",
    bash_command="cut -d, -f1,2,3,4 {file_path}/vehicle-data.csv > {file_path}/csv_data.csv",  # -d for delimiter
    dag=dag,
)

-d, sets the delimiter to a comma, and -f1,2,3,4 selects fields 1, 2, 3 and 4.
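The field selection can be mimicked in a few lines of Python, which may help if cut is unfamiliar. This is only a simplified sketch: cut_fields is a hypothetical helper, the sample row is invented, and like cut it knows nothing about quoted CSV values.

```python
def cut_fields(line, delimiter, fields):
    """Keep the 1-based fields listed in `fields`, like `cut -d<delimiter> -f<fields>`."""
    parts = line.split(delimiter)
    # Field numbers that the line does not have are skipped.
    kept = [parts[i - 1] for i in fields if i <= len(parts)]
    return delimiter.join(kept)


# Hypothetical row; the real vehicle-data.csv columns may differ.
row = "1,Mon Aug 2 2021,125094,car,2,VC965"
selected = cut_fields(row, ",", [1, 2, 3, 4])
print(selected)
```

The last two values of the sample row are dropped, and the first four are kept in their original order.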

3. Extract Data from TSV

As before, we use the cut command, this time to extract data from a TSV file: we read tollplaza-data.tsv and save the result to tsv_data.csv. Because TSV uses TAB as its delimiter, we pass -d$'\t'. We then pipe the output through tr to turn the tabs into commas, so that tsv_data.csv really is comma-separated.

#task to extract data from tsv
extract_data_from_tsv = BashOperator(
    task_id="extract_data_from_tsv",
    bash_command="cut -d$'\\t' -f 5,6,7 {file_path}/tollplaza-data.tsv | tr '\\t' ',' > {file_path}/tsv_data.csv",  # -d$'\t' for delimiter tab
    dag=dag,
)
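One Python detail is worth knowing when a bash command containing \t lives inside a Python string: in a normal string literal Python itself converts \t into a TAB character before bash ever sees it. The sketch below only compares string literals (nothing is executed in a shell) to show the difference between the plain, escaped and raw forms.

```python
plain = "cut -d$'\t' -f 5,6,7"      # Python turns \t into a real TAB character
escaped = "cut -d$'\\t' -f 5,6,7"   # bash receives the two characters \ and t
raw = r"cut -d$'\t' -f 5,6,7"       # raw string: same text as `escaped`

print("\t" in plain)      # a real TAB is inside the plain string
print("\t" in escaped)    # no TAB here, only backslash followed by t
print(escaped == raw)     # the escaped and raw forms are identical
```

Inside bash's $'...' quoting, a literal TAB and the escape \t both yield a TAB, so either form gives cut the same delimiter; the escaped or raw form simply makes the intent visible in the source.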

4. Extract Data from Fixed Width

In this task, we extract data from payment-data.txt into fixed_width_data.csv. The file has fixed-width columns, so instead of a delimiter we use -c, which selects characters by position: a single character, a list of characters, or a range. For example, -c 59-62 selects characters 59 through 62. Here we pass two adjacent ranges, 59-62 and 63-67, which together cover characters 59 through 67.

# task to extract_data_from_fixed_width
extract_data_from_fixed_width = BashOperator(
    task_id="extract_data_from_fixed_width",
    bash_command="cut -c 59-62,63-67 {file_path}/payment-data.txt > {file_path}/fixed_width_data.csv",  # -c for --characters=LIST
    dag=dag,
)
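Character ranges in cut are 1-based and inclusive, which maps onto Python slices as shown in this small sketch. The helper cut_chars and the sample line are hypothetical; the real payment-data.txt layout may differ.

```python
def cut_chars(line, ranges):
    """Keep the 1-based, inclusive character ranges, like `cut -c a-b,c-d`."""
    return "".join(line[start - 1:end] for start, end in ranges)


# Hypothetical fixed-width line: 58 filler characters, then two made-up fields.
line = "x" * 58 + "PTE " + "VC965" + " tail"
two_ranges = cut_chars(line, [(59, 62), (63, 67)])
one_range = cut_chars(line, [(59, 67)])
print(repr(two_ranges))
print(two_ranges == one_range)
```

Because the two ranges are adjacent, selecting 59-62 and 63-67 gives the same characters as selecting 59-67 in one go.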

5. Consolidate Data

In this task, we merge the files created earlier (csv_data.csv, tsv_data.csv, and fixed_width_data.csv) into extracted_data.csv. We use the paste command, which joins the files line by line; -d ',' makes it join them with commas instead of its default TAB:

csv_data = "{file_path}/csv_data.csv"
tsv_data = "{file_path}/tsv_data.csv"
fixed_width_data = "{file_path}/fixed_width_data.csv"
extracted_data = "{file_path}/extracted_data.csv"
# task to consolidate_data
consolidate_data = BashOperator(
    task_id="consolidate_data",
    bash_command=f"paste -d ',' {csv_data} {tsv_data} {fixed_width_data} > {extracted_data}",  # paste for merge files
    dag=dag,
)
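To picture what paste does, the sketch below joins lists of lines side by side in plain Python. The function paste here is a hypothetical stand-in for the shell command, and the sample lines are invented rather than taken from the real files.

```python
from itertools import zip_longest


def paste(columns, delimiter="\t"):
    """Join the i-th line of every input side by side, like `paste -d<delimiter>`."""
    rows = zip_longest(*columns, fillvalue="")  # shorter inputs contribute empty fields
    return [delimiter.join(row) for row in rows]


# Hypothetical contents of the three intermediate files.
csv_lines = ["1,Mon,125094,car", "2,Tue,174434,truck"]
tsv_lines = ["2,4856,PC7C042B7", "3,4154,PC2C2EF9E"]
fixed_lines = ["PTE VC965", "PTP VC476"]

merged = paste([csv_lines, tsv_lines, fixed_lines], delimiter=",")
print(merged[0])
```

Without an explicit delimiter the pieces would be joined with TAB characters, which matters for any later step that splits the line on commas.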

6. Transform Data

This is the final task. We use the awk command to convert one column of extracted_data.csv to uppercase and write the result to transformed_data.csv.

# task to Transform and load the data
transform_data = BashOperator(
    task_id="transform_data",
    bash_command="awk 'BEGIN {FS=OFS=\",\"} { $4= toupper($4) } 1' {file_path}/extracted_data.csv > {file_path}/transformed_data.csv",
    dag=dag,
)

FS=OFS=\",\" sets both the input and the output field separator to a comma (,), since our file is comma-separated. $4= toupper($4) replaces the fourth field ($4) with its uppercase version using the toupper function. The trailing 1 is a common awk idiom: a pattern that is always true, which triggers the default action of printing the (modified) line.
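The same transformation can be written in plain Python, which may make the awk one-liner easier to read. This is a simplified sketch: uppercase_field is a hypothetical helper, the sample row is invented, and unlike awk it leaves lines with fewer than four fields unchanged.

```python
def uppercase_field(line, field=4, separator=","):
    """Uppercase one 1-based field, similar to awk's `$4 = toupper($4)` with FS=OFS=","."""
    parts = line.split(separator)
    if field <= len(parts):
        parts[field - 1] = parts[field - 1].upper()
    return separator.join(parts)


# Hypothetical merged row; the real extracted_data.csv may look different.
row = "1,Mon,125094,car,2,4856"
transformed = uppercase_field(row)
print(transformed)
```

Only the fourth field changes; every other field, and the commas between them, pass through untouched.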

TASK PIPELINES

This is the last thing to do before we submit our Python script to Airflow. We define the task pipeline with this code:

# Define task pipelines
unzip_data >> extract_data_from_csv >> extract_data_from_tsv >> extract_data_from_fixed_width \
>> consolidate_data >> transform_data

The double greater-than operator (>>) sets the task on its right as downstream of the task on its left, so the tasks run in the order written.
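To see why a whole chain can be written in one expression, here is a toy model of the operator in plain Python. Task is a hypothetical stand-in class, not Airflow's operator; it only records downstream tasks and returns the right-hand operand so that the chain continues.

```python
class Task:
    """Toy stand-in for an Airflow operator; not the real class."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = []

    def __rshift__(self, other):
        # a >> b records b as downstream of a, then returns b so chains keep going
        self.downstream.append(other)
        return other


unzip = Task("unzip_data")
extract = Task("extract_data_from_csv")
transform = Task("transform_data")

last = unzip >> extract >> transform
print([t.task_id for t in unzip.downstream])
print([t.task_id for t in extract.downstream])
print(last.task_id)
```

Because each >> hands back the task on its right, a >> b >> c links a to b and then b to c, rather than linking a to both.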

SUBMIT SCRIPT INTO AIRFLOW

Submitting our script to Airflow is as simple as copying it into the DAG directory. You can find the DAG directory path in airflow.cfg. After you submit the file, you can start the Airflow webserver with this command:

airflow webserver

Then you can see your DAG in the Airflow webserver UI and run it from there.
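The copy step described earlier in this section can also be scripted. The sketch below assumes the DAG directory is stored as dags_folder in the [core] section of airflow.cfg; find_dags_folder and submit_dag are hypothetical helpers, and the configuration file here is a stand-in created in a temporary directory so the example runs without Airflow installed.

```python
import configparser
import shutil
import tempfile
from pathlib import Path


def find_dags_folder(cfg_path):
    """Read the dags_folder setting from the [core] section of airflow.cfg."""
    parser = configparser.ConfigParser(interpolation=None)  # tolerate % in values
    parser.read(cfg_path)
    return Path(parser["core"]["dags_folder"])


def submit_dag(script_path, cfg_path):
    """Copy the DAG script into the configured DAG directory."""
    target = find_dags_folder(cfg_path) / Path(script_path).name
    shutil.copy(script_path, target)
    return target


# Stand-in files so the sketch runs without a real Airflow installation.
home = Path(tempfile.mkdtemp())
(home / "dags").mkdir()
cfg = home / "airflow.cfg"
cfg.write_text(f"[core]\ndags_folder = {home / 'dags'}\n")
script = home / "etl_toll_data.py"
script.write_text("# DAG definition goes here\n")

copied = submit_dag(script, cfg)
print(copied.name, copied.exists())
```

On a real installation you would point cfg_path at your own airflow.cfg; the setting can also be overridden by an environment variable, so treat this as an illustration rather than a complete deployment script.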

THEN FINALLY 🎉🎉: all the steps to build an ETL pipeline using Python and Airflow are complete!!! 👍👨🏻‍💻👏🏻

If you have any questions or want to keep in touch, you can reach me on LinkedIn: https://www.linkedin.com/in/felix-pratamasan/

Felix Pratamasan

Hi everyone 🙋🏻‍♂️, I am a software engineer from Indonesia who is hungry to learn 👨🏻‍💻. I hope my articles help you learn about tech 🚀.