Building a Cost-Effective Data Pipeline with Airflow, Power BI, and DuckDB

Mariusz Kujawski
Aug 21, 2023


Introduction

In this post, I will demonstrate how to create a cost-effective and user-friendly data pipeline for analyzing Polish flat prices in relation to salaries, the M1 money supply indicator, the USD exchange rate, and gold prices. The data will be imported from the National Bank of Poland (NBP) website.

My primary objective is to leverage accessible tools. While I appreciate Apache Spark for its versatility in combining Python and SQL, it may not be the most cost-effective choice for this particular project. Instead, I will introduce an excellent alternative called DuckDB, which can read data from sources such as Pandas data frames, Parquet, and CSV files. DuckDB lets us perform data analysis and manipulation with SQL without running a database server. In this post, I'll demonstrate how to use DuckDB with Python for common activities such as data transformation and cleansing, and finally populate a data model. To explore DuckDB further, visit this link: https://duckdb.org/
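To give a feel for how this works, here is a minimal sketch (with an illustrative DataFrame, not data from this project) of DuckDB querying a Pandas data frame with plain SQL and no database server involved:

import duckdb
import pandas as pd

# Illustrative data only; DuckDB resolves the DataFrame by its Python variable name.
prices = pd.DataFrame({"date": ["2023-01-31", "2023-02-28"], "price": [250.0, 255.5]})

result = duckdb.sql("select date, price from prices order by date").to_df()
print(result)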

The code provided in the repository accompanying this post supports data processing on a local machine and integrates with Airflow, GCP storage, and BigQuery for more extensive projects. For data visualization, I’ll be using Power BI, a powerful tool that offers intuitive and insightful data representations.

By following the steps outlined in this post, you’ll be able to build an efficient data pipeline without incurring unnecessary expenses while benefiting from the ease of use and flexibility provided by DuckDB. Happy data engineering!

The data workflow will have the following structure:

Airflow DAG:

The code can be found in my repository.

Environment Configuration

The most straightforward method to establish an Airflow orchestrator environment is by utilizing GCP Composer. However, considering costs, GCP Composer proves to be rather expensive for my specific case. Since my data processing requirements involve only a brief span of time and don’t necessitate a high-powered machine, opting for a Virtual Machine (VM) offers a more economical solution.

Here are the essential steps that need to be executed:

  1. Create a Virtual Machine and install Airflow (the deployment script is in my repo).
  2. Establish a storage account.
  3. Set up a BigQuery dataset.
  4. Generate a Service Account and its corresponding key.

In this section, I’ll solely concentrate on detailing the process of configuring a VM with Airflow using docker-compose. I’ll assume that readers are acquainted with the remaining steps or can easily locate instructions within the GCP documentation.

To initiate the VM setup, navigate to the Compute Engine section in the GCP Console and select “VM instances.” For my configuration, I selected Ubuntu 22.04 as the operating system and a machine equipped with 16 GB of RAM. To ensure the Airflow web server is accessible, I enabled HTTP traffic.

This approach allows me to effectively create a VM tailored to my data processing needs, while also considering cost-efficiency as a pivotal factor.

After setting up the VM, I connected to it using SSH to install docker-compose and configure my Airflow.

To configure Airflow automatically, you need to clone my repository using ‘git clone’ and then execute the ‘deploy.sh’ file.

git clone https://github.com/MariuszKu/analiza-duckdb-mieszkania.git
cd analiza-duckdb-mieszkania
chmod +x deploy.sh
./deploy.sh

deploy.sh:

#!/bin/bash

sudo apt-get update
sudo apt-get install ca-certificates curl gnupg

sudo install -m 0755 -d /etc/apt/keyrings
curl -fsSL https://download.docker.com/linux/ubuntu/gpg | sudo gpg --dearmor -o /etc/apt/keyrings/docker.gpg
sudo chmod a+r /etc/apt/keyrings/docker.gpg

echo \
  "deb [arch="$(dpkg --print-architecture)" signed-by=/etc/apt/keyrings/docker.gpg] https://download.docker.com/linux/ubuntu \
  "$(. /etc/os-release && echo "$VERSION_CODENAME")" stable" | \
  sudo tee /etc/apt/sources.list.d/docker.list > /dev/null

sudo apt-get update

sudo apt-get install -y docker-ce docker-ce-cli containerd.io docker-buildx-plugin docker-compose-plugin
cd ../
mkdir -p ./dags ./logs ./plugins ./config
echo -e "AIRFLOW_UID=$(id -u)" > .env
cp analiza-duckdb-mieszkania/*.* dags/
cp -r analiza-duckdb-mieszkania/data dags/
cp analiza-duckdb-mieszkania/docker-compose.yaml .
cp analiza-duckdb-mieszkania/Dockerfile .
cp analiza-duckdb-mieszkania/requirements.txt .
sudo docker compose up

The scripts attached to the project configure Python 3.10 and install the required libraries in the Docker image. I chose Python 3.10 because it supports the modern type-hinting syntax that I use in my Python scripts. It's also possible to mount the dags folder from a bucket, but that isn't included in this project.
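For illustration, these are the kinds of type hints I mean (a hypothetical snippet, not code from the repo); built-in generics like list[tuple[str, str]] and the X | None union syntax work out of the box on Python 3.10:

def parse_prices(rows: list[tuple[str, str]]) -> dict[str, float]:
    # Built-in generic types (PEP 585) without importing from typing.
    return {date: float(price) for date, price in rows}

def latest_price(prices: dict[str, float]) -> float | None:
    # The X | None union syntax (PEP 604) requires Python 3.10+.
    return max(prices.values()) if prices else None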

Airflow job execution

Once Airflow is operational, you can access it by visiting the external address of your VM.

Note: Make sure you open the address with http:// rather than https://.

The username and password are both ‘airflow’. Once logged in, you’ll encounter a list of DAGs. To execute a DAG, simply click the play button located on the right side of the respective row.

Following a successful job execution, new files should appear in the airflow/dags/data location.

GCP Configuration

If you intend to work with GCP Storage and BigQuery, you’ll need to create a service account, upload its key file to dags/key.json, and make the necessary adjustment to the ‘Link’ variable in the env.py file.
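For reference, a minimal sketch of what env.py amounts to, based on how env.LINK is used in the code later in this post (the bucket name below is a placeholder):

# env.py -- a minimal sketch; the pipeline code prefixes every file path with LINK,
# so switching between a GCS bucket and a local folder is a one-line change.
LINK = "gs://bucket/data/"   # placeholder bucket; use e.g. "data/" for local runs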

Once the job is re-executed, you should see new files in your GCP bucket. From there, proceed to BigQuery, where you can create a dataset and define a table using the command below. The command creates an external table, which means BigQuery reads the data from the files located in a bucket. Replace “bucket” with your bucket name.

CREATE OR REPLACE EXTERNAL TABLE mk.flats_report 
OPTIONS(
format = "PARQUET",
uris = ['gs://bucket/data/report.parquet']
);
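If you want to sanity-check the table outside the console, a minimal sketch using the google-cloud-bigquery client (assuming the package is installed and the dataset is named mk, as in the statement above) could look like this:

from google.cloud import bigquery

# Relies on GOOGLE_APPLICATION_CREDENTIALS pointing at the service-account key.
client = bigquery.Client()
rows = client.query("select date, flat_price, usd from mk.flats_report limit 5").result()
for row in rows:
    print(row.date, row.flat_price, row.usd)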

The result of querying the new table should look like this:

Power BI Reports

Power BI, a robust business analytics service developed by Microsoft, empowers users to visualize and analyze data from diverse sources. It boasts the ability to connect with a wide range of data sources, such as Excel workbooks, SQL databases, and BigQuery. The platform offers an extensive array of visualization options, encompassing charts, graphs, maps, tables, and other interactive visual elements. To explore its capabilities, you can download and utilize Power BI Desktop for testing purposes.

When it comes to importing data from a source, simply click the ‘Get Data’ button, then select ‘More.’ Within this menu, you’ll discover the BigQuery connection, enabling seamless integration of your data.

I won’t be covering how to create a report in Power BI in this post, as there are numerous tutorials available on the internet for that purpose. Below, you’ll find several data visualizations that I’ve created based on the imported data from the project. These visualizations depict flat prices in relation to salaries, prices in USD and gold, as well as the M1 indicator.

Here is a visualization of flat prices in Warsaw in relation to USD and Gold prices:

Exploring the correlation between flat prices in Warsaw and the M1 indicator (Volume of money):

Flat prices in Warsaw in relation to salaries:

Flat prices in Warsaw month-to-month percent growth:

Data import using Python

Within the project, various functions retrieve data from HTTP APIs using the requests library and then save it to the local file system or a GCP bucket. An example that downloads historical gold prices is presented below.

import json
import requests


def import_gold_prices(date: tuple) -> list[tuple[str, str]]:
    """
    Return a list of (date, gold price) pairs.

    Args:
        date: tuple: date range (start, end) in YYYY-MM-DD format.

    Returns:
        list[tuple[str, str]]: list of prices.
    """
    url = f"https://api.nbp.pl/api/cenyzlota/{date[0]}/{date[1]}"
    response = requests.get(url)
    response.raise_for_status()
    data = json.loads(response.text)
    gold_prices = []

    # Each API entry holds the quotation date ("data") and the price ("cena").
    for cena_zlota in data:
        date = cena_zlota["data"]
        price = cena_zlota["cena"]
        gold_prices.append((date, price))

    return gold_prices
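A quick usage sketch (the date range is illustrative; the NBP API expects YYYY-MM-DD dates):

prices = import_gold_prices(("2023-01-02", "2023-01-31"))
print(len(prices), prices[:3])   # number of quotes and the first few (date, price) pairs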

Data transformation with Pandas and DuckDB

DuckDB integrates smoothly with popular Python libraries like Pandas and Polars. As the code below illustrates, it's possible to load data from CSV and XLSX files located either on the local file system or in cloud storage. With Pandas, the data can be refined by casting it to the necessary data types, adding new columns, unpivoting, and even substituting Roman-numeral quarters like III and IV with dates (as demonstrated by the ‘convert_to_last_day_of_quarter’ function). Because the NBP only publishes data on working days, I needed to fill gaps so that a value is available at the end of each month. To accomplish this, I used DuckDB with an SQL statement that fills empty dates with the value from the last non-empty day.
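The post doesn't show convert_to_last_day_of_quarter itself, so here is a hedged sketch of what such a helper could look like, assuming quarter labels of the form “III 2015” (the actual label format in the NBP workbook may differ):

from calendar import monthrange

def convert_to_last_day_of_quarter(label: str) -> str:
    # Assumed label format: Roman-numeral quarter followed by the year, e.g. "III 2015".
    roman_to_month = {"I": 3, "II": 6, "III": 9, "IV": 12}
    roman, year = label.split()
    month = roman_to_month[roman]
    return f"{year}-{month:02d}-{monthrange(int(year), month)[1]:02d}"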


def clean_flats():
    flats = pd.read_excel(f"{env.LINK}flat_prices.xlsx", header=6, sheet_name="Rynek pierwotny")
    cities = ['Białystok', 'Bydgoszcz', 'Gdańsk', 'Gdynia', 'Katowice', 'Kielce', 'Kraków', 'Lublin', 'Łódź',
              'Olsztyn', 'Opole', 'Poznań', 'Rzeszów', 'Szczecin', 'Warszawa', 'Wrocław', 'Zielona Góra']
    flats_unpivot = pd.melt(flats, id_vars='Kwartał', value_vars=cities)
    flats_unpivot['date'] = flats_unpivot.apply(lambda row: convert_to_last_day_of_quarter(row['Kwartał']), axis=1)
    flats_unpivot['date'] = pd.to_datetime(flats_unpivot['date'])
    flats_unpivot['city'] = flats_unpivot['variable']

    flats_unpivot.to_parquet(f"{env.LINK}flats_price.parquet")


def clean_currency():
    gold = pd.read_csv(f"{env.LINK}gold.csv", names=["date", "price"], header=None)
    gold2013 = pd.read_csv(f"{env.LINK}gold_2006_2012.csv", names=["date", "price"], header=None)
    gold['date'] = pd.to_datetime(gold['date'])
    gold2013['date'] = pd.to_datetime(gold2013['date'])
    gold['currency'] = 'gold'
    gold2013['currency'] = 'gold'
    calendar = pd.DataFrame(generate_days_in_years(2006, 2023), columns=["date", "last_date"])
    calendar['date'] = pd.to_datetime(calendar['date'])
    usd = pd.read_csv(f"{env.LINK}usd.csv", names=["date", "price"], header=None)
    usd['date'] = pd.to_datetime(usd['date'])
    usd['currency'] = 'usd'
    currency = pd.concat([gold, gold2013, usd], ignore_index=True, sort=False)

    # Fill gaps: for every calendar date take the latest quote from the preceding 3 days.
    usd = duckdb.sql("""
        select
            date,
            price,
            currency
        from
        (
            select
                row_number() over (partition by currency, a.date order by b.date desc) lp,
                a.date,
                b.date org_date,
                b.price,
                currency
            from
                calendar a left join currency b on b.date between a.date - INTERVAL 3 DAY and a.date
        )
        where
            lp = 1
        order by date
    """).to_df()

    usd.to_parquet(f"{env.LINK}currency.parquet")

Once my data is set up and ready for analysis, I can efficiently query it using DuckDB. While I could read the Parquet files directly, I chose to work with Pandas data frames because they can read data from both the local file system and cloud storage. Pandas uses the gcsfs library behind the scenes, enabling seamless reading from and writing to buckets. This approach also makes it easy to connect to Azure Blob Storage or AWS S3.
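In practice this means the same call works for a local path and a bucket path (a sketch with a placeholder bucket name, assuming gcsfs is installed and credentials are configured):

import pandas as pd

local_df = pd.read_parquet("data/currency.parquet")              # local file system
cloud_df = pd.read_parquet("gs://bucket/data/currency.parquet")  # GCS via gcsfs; placeholder bucket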

In the following example, an SQL statement joins the tables, filters the data for the city of Warsaw, and computes derived columns.

def report():
    calendar = pd.DataFrame(generate_days_in_years(2006, 2023), columns=["date", "last_date"])
    flats_price = pd.read_parquet(f"{env.LINK}flats_price.parquet")
    currency = pd.read_parquet(f"{env.LINK}currency.parquet")
    salary = pd.read_parquet(f"{env.LINK}salary.parquet")
    m1 = pd.read_parquet(f"{env.LINK}m1.parquet")

    df_data = duckdb.sql("""
        select
            a.date,
            a.value flat_price,
            b.price gold,
            b.price * 31 ounce,
            c.price usd,
            a.value / c.price flat_price_usd,
            a.value / (b.price * 31) flat_price_gold,
            d.salary,
            a.value / cast(d.salary as Double) salaries_m2,
            (a.value - lag(a.value) over (order by a.date)) / lag(a.value) over (order by a.date) m2mgrowth,
            m1.value m1
        from
            flats_price a
            left join currency b on a.date = b.date and b.currency = 'gold'
            left join currency c on a.date = c.date and c.currency = 'usd'
            left join salary d on a.date = d.date
            left join m1 m1 on a.date = m1.date
        where
            city = 'Warszawa'
        order by a.date
    """).to_df()

    # df_data.to_csv("data.csv", encoding='utf-8', index=False)
    df_data.to_parquet(f"{env.LINK}falts_report.parquet")

To automate my processes, I utilize Airflow hosted on a Virtual Machine. Presented below is the DAG responsible for fetching data from sources, performing data cleansing, storing it, and generating a report consumed by Power BI. Airflow offers the flexibility to schedule our tasks for daily or monthly data refreshes. Additionally, we can incorporate error handling and a retry mechanism to prevent process interruptions stemming from network connection issues.

import datetime
import os

from airflow import DAG
from airflow.decorators import task, task_group
# The import/clean/report helpers used below come from the project's own modules in the dags folder.

# Creating an environment variable for the service key configuration
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = '/opt/airflow/dags/key.json'

default_args = {
    "start_date": datetime.datetime.today(),
    "schedule_interval": "0 0 * * *",  # Run every day at midnight
}

with DAG(dag_id="NBP-flats", default_args=default_args, catchup=False) as dag:

    @task
    def import_currency():
        save_usd_df()

    @task
    def import_flat_data():
        import_flat_price()

    @task
    def clean_data_flats():
        clean_flats()

    @task
    def clean_data_salary():
        clean_salary()

    @task
    def clean_data_m1():
        clean_m1()

    @task
    def clean_data_currency():
        clean_currency()

    @task_group
    def clean():
        clean_data_flats()
        clean_data_salary()
        clean_data_m1()
        clean_data_currency()

    @task
    def create_report():
        report()

    # Dependencies
    import_currency() >> import_flat_data() >> clean() >> create_report()
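The DAG above relies on Airflow's defaults; a retry policy like the one mentioned earlier can be added by extending default_args (the values below are illustrative, not taken from the repo):

import datetime

default_args = {
    "start_date": datetime.datetime.today(),
    "schedule_interval": "0 0 * * *",
    "retries": 3,                                  # re-run a failed task up to 3 times
    "retry_delay": datetime.timedelta(minutes=5),  # wait 5 minutes between attempts
}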

Summary

These examples show that, with cost-effective tools, we can proficiently import data, run transformations, and generate comprehensive reports that fulfill our analytical requirements. DuckDB and Pandas are excellent substitutes for Apache Spark when datasets are not exceedingly large. This stack lets us combine data from diverse sources such as CSV, XLSX, HTTP APIs, Parquet files, and DataFrames using SQL, enabling customized ETL processes and analyses. It offers remarkable flexibility in working with data without the need for expensive platforms.

While this post primarily showcases GCP as a cloud provider, it’s important to note that this solution can seamlessly integrate with Azure or AWS as well. If you’re interested in discussing your specific use case involving Airflow, GCP, Azure, AWS, or DuckDB, please feel free to reach out to me on LinkedIn. I’m more than happy to engage in insightful conversations about your unique requirements and how these tools can be tailored to suit them.
