Airflow vs. Prefect vs. Kestra — Which is Best for Building Advanced Data Pipelines?

Remote REST API + S3 + remote Postgres database — an advanced use-case comparison of Airflow, Prefect, and Kestra.

Dario Radečić
Geek Culture


Photo by Christophe Dion on Unsplash

Apache Airflow isn’t the only data orchestration platform out there. It might not even be the best choice for some use cases and work environments. There are many Airflow alternatives you can use for free, such as Prefect and Kestra. Today you’ll learn all about them.

Last week’s article showed you just what goes into building two simple data pipelines with all three data orchestration platforms.

Today we’ll kick things up a notch and write a pipeline that uses some of the more advanced features. To be exact, the pipeline will download a JSON file from a remote API, upload it to an AWS S3 bucket, add a new column, and load it into a Postgres database.
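Before touching any orchestrator, it helps to see the same logic as a minimal plain-Python sketch, just to make the moving parts concrete. The credentials, bucket name, and connection string below are placeholders for the resources we’ll create in the next section:

# Plain-Python sketch of the pipeline we'll orchestrate three times below.
# Credentials, bucket name, and connection string are placeholders.
import json

import boto3
import pandas as pd
import requests
from sqlalchemy import create_engine

# 1. Download users from the remote REST API
users = requests.get("https://gorest.co.in/public/v2/users").json()

# 2. Save the raw JSON locally and upload it to S3
with open("users.json", "w") as f:
    json.dump(users, f)
s3 = boto3.Session(
    aws_access_key_id="<access-key>",
    aws_secret_access_key="<secret-key>"
).client("s3")
s3.upload_file("users.json", "<bucket-name>", "users.json")

# 3. Add a column and load the data into Postgres
df = pd.DataFrame(users)
df["inserted_from"] = "plain_python"
engine = create_engine("postgresql://<username>:<password>@<host>:5432/<database>")
df.to_sql("users", engine, if_exists="append", index=False)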

It’s a lot of work, so let’s dive straight in.

AWS Configuration for S3 and Postgres Database

Before actually writing the pipeline, there’s a bit of a setup process we must go over. It’s assumed you have an AWS account (the free tier is fine).

Log into your console and create a new S3 bucket. You can also reuse an existing one; the choice doesn’t matter. Just take note of the bucket name. For example, ours is called demos3bucket-dr:

Image 1 — S3 bucket on AWS (image by author)

Every data orchestration platform will need a way to access this bucket, and the name alone won’t be enough.

Go to the AWS IAM console and create a new access key under Security credentials, as shown below:

Image 2 — S3 bucket access key (image by author)

Make sure to copy the Access key and Secret access key somewhere safe.

Moving on to the Postgres database. Navigate to RDS — Databases and create a new Postgres database instance. A free-tier database is enough for what we need today. Make sure to remember the username and password you enter.

Overall, this is what you will see once the database is provisioned:

Image 3 — Provisioning a Postgres database in AWS RDS (image by author)

Almost there. The next step is to allow inbound traffic to the database on port 5432. Open the security group properties for the security group that’s assigned to the database, and add a new inbound rule as shown below:

Image 4 — Port 5432 inbound rule (image by author)

That’s the S3 and RDS database taken care of.

The final step is to create a new table in this database. Establish a connection through any GUI software (we’re using the free version of TablePlus) and run the following CREATE TABLE statement:

CREATE TABLE users(
    id INTEGER,
    name VARCHAR(128),
    email VARCHAR(256),
    gender VARCHAR(32),
    status VARCHAR(32),
    inserted_from VARCHAR(32)
);

This is what you should see when you run the statement:

Image 5 — Table creation on Postgres (image by author)

The table is needed because we’ll fetch some dummy user data from a free REST API and insert it into this table. The additional inserted_from column will allow us to monitor from which data orchestration platform the rows were inserted.
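Optionally, you can confirm the table is reachable from Python before wiring up any orchestrator, which also verifies the port 5432 inbound rule from earlier. Here’s a minimal sketch with SQLAlchemy, where the connection string values are placeholders for your own RDS instance:

# Minimal connectivity check; replace the placeholders with your RDS values.
from sqlalchemy import create_engine, text

engine = create_engine("postgresql://<username>:<password>@<host>:5432/<database>")
with engine.connect() as conn:
    count = conn.execute(text("SELECT COUNT(*) FROM users")).scalar()
    print(f"users table reachable, currently holding {count} rows")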

Setup — done! Let’s start writing the pipeline in Airflow first.

Data Pipeline in Airflow

Writing this data pipeline in Airflow is a two-step process. First, we need to add connections for the REST API, S3, and Postgres, and then we can start writing the pipeline.

Connection Configuration

Assuming you have the Airflow webserver and scheduler running, open the homepage and navigate to Admin — Connections. Click on the blue plus icon to add a new connection.

The first connection is for the REST API. We’ll fetch some random users from the free GoRest API, so make sure to add a new HTTP connection and specify the host (don’t add the /users route):

Image 6 — Airflow REST API connection (image by author)

The following connection is responsible for communicating with our S3 bucket. You only need to specify the connection type to be Amazon S3 and then write a bit of JSON in the Extra field. This is where you’ll specify your access and secret key:

Image 7 — Airflow S3 connection (image by author)

And finally, we can add a connection for the Postgres database. There are more fields to fill out, but nothing you can’t manage. Make sure to copy the Host from the Amazon RDS page for your database, and don’t forget to specify your login credentials:

Image 8 — Airflow Postgres connection (image by author)

That’s our connections configured. This will ensure no sensitive data is hardcoded into the Python scripts.
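If you want to double-check that Airflow can resolve these connections before writing any tasks, you can look one up by its ID from a Python shell inside the Airflow environment; users_api is the ID we gave the REST API connection:

# Optional sanity check: resolve a configured connection by its ID.
from airflow.hooks.base import BaseHook

conn = BaseHook.get_connection("users_api")
print(conn.host)  # should print the GoRest host, with no credentials hardcoded anywhere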

Writing the DAG in Python

It’s likely you’ll need two additional Airflow dependencies for communicating with S3 and Postgres, so make sure to install them:

pip install 'apache-airflow[amazon]'
pip install 'apache-airflow[postgres]'

Now onto the DAG itself. It implements four tasks, so let’s go over them one by one:

  1. task_api_fetch: Uses Airflow’s SimpleHttpOperator to establish a connection to the REST API and fetch the response in JSON format. This task leverages the first connection we’ve configured.
  2. task_save_locally: A simple PythonOperator that calls the save_users_locally() function. This function pulls the API response from the previous task via Airflow’s XComs and then dumps it to a JSON file on disk.
  3. task_save_to_s3: Calls the save_users_to_s3() Python function and specifies keyword arguments for the file name on the local machine, the file name on the S3 bucket, and the S3 bucket name. This function also leverages Airflow’s S3Hook to load the file onto the bucket.
  4. task_save_to_pg: Calls the save_users_to_pg() function that reads the JSON file from disk (written by task 2), converts it into a Pandas DataFrame, adds a new column to specify from which orchestration platform the data was inserted, and finally combines Pandas and SQLAlchemy to push the data to the database.

If you prefer code over text, this is everything you need to run the DAG:

import json
from datetime import datetime

import pandas as pd
from airflow.models import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook


# Save to disk in JSON format
def save_users_locally(ti) -> None:
    # Pull the parsed API response pushed to XCom by the get_users task
    users = ti.xcom_pull(task_ids="get_users")
    with open("/home/airflow/users.json", "w") as f:
        json.dump(users, f)


# Save users to AWS S3
def save_users_to_s3(filename, key, bucket_name):
    hook = S3Hook("s3_conn")
    hook.load_file(filename, key, bucket_name)


# Save users to Postgres database
def save_users_to_pg():
    with open("/home/airflow/users.json", "r") as f:
        users = json.load(f)

    users = pd.DataFrame(users)
    users["inserted_from"] = "airflow"
    hook = PostgresHook("postgres_db")
    users.to_sql("users", hook.get_sqlalchemy_engine(), if_exists="append", index=False)


with DAG(
    dag_id="airflow_pg_s3",
    start_date=datetime(2023, 7, 17),
    schedule_interval="@daily",
    catchup=False
) as dag:
    # 1. Get the data from a remote API
    task_api_fetch = SimpleHttpOperator(
        task_id="get_users",
        http_conn_id="users_api",
        endpoint="users/",
        method="GET",
        response_filter=lambda response: json.loads(response.text)
    )

    # 2. Save the file locally
    task_save_locally = PythonOperator(
        task_id="save_users_locally",
        python_callable=save_users_locally
    )

    # 3. Push the raw file to S3
    task_save_to_s3 = PythonOperator(
        task_id="save_users_s3",
        python_callable=save_users_to_s3,
        op_kwargs={
            "filename": "/home/airflow/users.json",
            "key": f"users_{int(datetime.timestamp(datetime.now()))}.json",
            "bucket_name": "demos3bucket-dr"
        }
    )

    # 4. Push to a Postgres database
    task_save_to_pg = PythonOperator(
        task_id="save_users_pg",
        python_callable=save_users_to_pg
    )

    task_api_fetch >> task_save_locally >> task_save_to_s3 >> task_save_to_pg
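As a side note, recent Airflow 2.x releases (2.5 and newer) let you smoke-test the whole DAG in a single process, without the scheduler, by appending a small block to the same file. A minimal sketch:

# Optional (Airflow 2.5+): run the DAG once in-process for quick debugging.
if __name__ == "__main__":
    dag.test()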

The DAG is scheduled to run daily, but you can also trigger it manually as we did. Don’t mind the red execution bars, as it took some trial and error to get it running successfully:

Image 9 — Airflow DAG run (image by author)

As you can see, the JSON file was successfully uploaded to the S3 bucket:

Image 10 — File stored on S3 (image by author)

And as you would assume, the data was also loaded into the Postgres database:

Image 11 — Users stored in database (image by author)
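If you prefer verifying the load from code rather than a GUI client, the same Postgres connection can be queried through the hook. A minimal sketch, run from the Airflow environment, where postgres_db is the connection ID configured earlier:

# Optional verification: count the inserted rows per orchestration platform.
from airflow.providers.postgres.hooks.postgres import PostgresHook

hook = PostgresHook("postgres_db")
rows = hook.get_records(
    "SELECT inserted_from, COUNT(*) FROM users GROUP BY inserted_from"
)
print(rows)  # e.g. [('airflow', 10)]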

That’s a full Airflow data pipeline for you. Let’s see how Prefect compares.

Data Pipeline in Prefect

We find Prefect to be a bit more Pythonic than Airflow, which means you can easily store sensitive data such as login credentials in an .env file. That’s what we’ll do first, and only then will we write the Python code for the data pipeline.

Environment File Configuration

There’s an additional dependency you’ll have to install if you want to store the credentials in an .env file:

pip install python-dotenv

Now create a new file in the same directory as your Prefect Python script and name it .env. That’s right: no file name, just the extension.

Paste the following into the file:

API_USERS=
AWS_KEY=
AWS_SECRET=
S3_BUCKET=
PG_HOST=
PG_PORT=
PG_USER=
PG_PASS=

And, of course, populate the values with your credentials. There’s no need to surround the values with quotes.
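A quick way to confirm the file is picked up is to load it and print one of the values. This assumes the .env file sits next to the script you run, which is where load_dotenv() looks by default:

# Sanity check: make sure the .env file is found and parsed.
import os
from dotenv import load_dotenv

load_dotenv()
print(os.environ.get("S3_BUCKET"))  # should print your bucket name, not None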

Writing the Prefect Pipeline

Now onto the pipeline. You’ll also need one more dependency to communicate with the S3 bucket, so make sure to install it:

pip install boto3

Now onto the Prefect flow itself. We’ll explain it task by task, just like we did for Airflow:

  1. get_users(): A Prefect task that’s responsible for getting a JSON response from a defined URL.
  2. save_users_locally(): Dumps the provided dictionary to a file path. This one is used to save the users JSON file to disk.
  3. save_users_to_s3(): Uses boto3 to establish a new session to AWS by providing the AWS API key and secret from the .env file. Once the session is established, it creates a new S3 client and uploads the file to the bucket.
  4. save_users_to_pg(): Leverages SQLAlchemy to establish a connection with the Postgres database, converts the dictionary of users to a Pandas DataFrame, adds a new column to specify from where the data was inserted, and finally appends the data to the table.

These are the four actual Prefect tasks, but they have to be combined into a Prefect flow.

Think of this as a main function for the program where you essentially call one function after the other to specify in which order the tasks should run.

You’ll find the entire code snippet below:

import json
import os
from datetime import datetime

import boto3
import pandas as pd
import requests
from dotenv import load_dotenv
from prefect import flow, task
from prefect.deployments import Deployment
from sqlalchemy import create_engine

# Load environment variables file
load_dotenv()


# Get users from remote API
@task
def get_users(url: str):
    req = requests.get(url=url)
    res = req.json()
    return res


# Save to disk
@task
def save_users_locally(users: dict, path: str):
    with open(path, "w") as f:
        json.dump(users, f)


# Save to an S3 bucket
@task
def save_users_to_s3(local_file_path: str, bucket_name: str, file_path: str):
    session = boto3.Session(
        aws_access_key_id=os.environ["AWS_KEY"],
        aws_secret_access_key=os.environ["AWS_SECRET"]
    )
    s3_client = session.client("s3")
    s3_client.upload_file(local_file_path, bucket_name, file_path)


# Save to Postgres database
@task
def save_users_to_pg(users):
    engine = create_engine(
        f"postgresql://{os.environ['PG_USER']}:{os.environ['PG_PASS']}"
        f"@{os.environ['PG_HOST']}:{os.environ['PG_PORT']}"
    )
    conn = engine.connect()

    users = pd.DataFrame(users)
    users["inserted_from"] = "prefect"
    users.to_sql("users", conn, if_exists="append", index=False)


# Actual flow - calls individual tasks
@flow
def s3_pg_flow():
    p_local_file = "users.json"
    p_s3_file_name = f"users_{int(datetime.timestamp(datetime.now()))}.json"

    users = get_users(os.environ["API_USERS"])
    save_users_locally(users, p_local_file)
    save_users_to_s3(p_local_file, os.environ["S3_BUCKET"], p_s3_file_name)
    save_users_to_pg(users)


if __name__ == "__main__":
    s3_pg_flow()
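One optional improvement: since get_users() and save_users_to_s3() hit the network, they are natural candidates for Prefect’s built-in retries. A minimal sketch, where retries and retry_delay_seconds are standard Prefect 2 task arguments and the values are arbitrary:

# Optional hardening: retry transient API failures automatically.
@task(retries=3, retry_delay_seconds=10)
def get_users(url: str):
    req = requests.get(url=url)
    return req.json()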

You can run the Prefect flow by executing the Python script from the Terminal:

Image 12 — Running the Prefect flow (image by author)

It looks like there are no errors in the flow run, but let’s double-check by verifying that the data was saved to both S3 and Postgres.

For reference, we’ve deleted all the bucket contents before the flow execution. It looks like Prefect has managed to transfer the file to an S3 bucket:

Image 13 — Prefect flow results stored in S3 (image by author)
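If you’d rather check from code than from the AWS console, a short boto3 snippet reusing the same .env credentials will list what landed in the bucket:

# Optional check: list the bucket contents with the same .env credentials.
import os
import boto3
from dotenv import load_dotenv

load_dotenv()
session = boto3.Session(
    aws_access_key_id=os.environ["AWS_KEY"],
    aws_secret_access_key=os.environ["AWS_SECRET"]
)
response = session.client("s3").list_objects_v2(Bucket=os.environ["S3_BUCKET"])
for obj in response.get("Contents", []):
    print(obj["Key"], obj["Size"])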

And it also managed to insert new rows into our table:

Image 14 — Prefect flow results stored in Postgres (image by author)

But what about deployment? Let’s cover that next.

Deploying a Prefect Pipeline

To deploy a Prefect data pipeline, you’ll need to add one more function to your Python file. We’ve named it deployment() just to be extra explicit. It builds the deployment from a Prefect flow, and the function is then called when the Python file is executed:

...

# Deploy the flow so you can schedule it
def deployment():
    deployment = Deployment.build_from_flow(
        flow=s3_pg_flow,
        name="s3_pg_deployment"
    )
    deployment.apply()


if __name__ == "__main__":
    deployment()

Run the Python file, and then immediately run the prefect server start command from the Terminal.

It will open a Prefect GUI on port 4200, from where you can click on Deployments and add a custom schedule for your Prefect flow:

Image 15 — Deployed Prefect pipeline (image by author)
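If you’d rather keep the schedule in code instead of setting it through the GUI, Deployment.build_from_flow() also accepts a schedule. Here’s a sketch under that assumption; note that the CronSchedule import path can differ between Prefect 2.x releases:

# Sketch only: attach a cron schedule when building the deployment.
# The CronSchedule import path may vary across Prefect 2.x versions.
from prefect.deployments import Deployment
from prefect.server.schemas.schedules import CronSchedule

def scheduled_deployment():
    deployment = Deployment.build_from_flow(
        flow=s3_pg_flow,
        name="s3_pg_deployment",
        schedule=CronSchedule(cron="0 0 * * *")  # every day at midnight
    )
    deployment.apply()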

And that’s it — simple but effective!

Let’s see what our final Airflow alternative — Kestra — brings to the table.

Data Pipeline in Kestra

Unlike Airflow and Prefect, Kestra takes a slightly different approach to writing data flows and pipelines. You write everything in YAML, which might take some time to get used to but, on the other hand, makes the whole system more flexible.

Long story short, you can run scripts written in all sorts of programming languages, and even non-technical users can help you build data pipelines since YAML is easier to read than Python.

Let’s first write the flow and then we’ll talk about scheduling it.

Writing the Kestra Flow

Assuming you have Kestra installed, open up the web UI on port 8080, navigate to Flows, and create a new flow.

We’re working with files saved locally, so it’s a best practice to have the tasks running in a WorkingDirectory. That’s where the identically-named Kestra plugin comes into play.

Let’s go over the tasks one by one:

  1. getUsers: Runs a Python script that makes a GET request to the REST API, fetches the response as JSON, and dumps the file to disk.
  2. saveUsersLocally: Declares users.json as an output of the working directory so the file can be accessed from other Kestra tasks.
  3. saveUsersS3: Uses the S3 Upload plugin to push the JSON file from the previous task to the S3 bucket. Make sure to populate the values that are missing (they’re specific to you).
  4. input: Makes the JSON file available as a local input file (data.users) for the following task, which saves the data to the Postgres database.
  5. saveUsersPg: Runs a Python script that loads the JSON file from disk, converts it into a Pandas DataFrame, adds a new column to note from where the rows were inserted, and finally inserts the data by combining SQLAlchemy and Pandas.

This is the entire YAML file you need for the Kestra flow:

id: kestra-s3-pg
namespace: dev

tasks:
  - id: wdir
    type: io.kestra.core.tasks.flows.WorkingDirectory
    tasks:
      - id: getUsers
        type: io.kestra.plugin.scripts.python.Script
        runner: DOCKER
        docker:
          image: python:3.11-slim
        beforeCommands:
          - pip install requests > /dev/null
        warningOnStdErr: false
        script: |
          import json
          import requests

          URL = "https://gorest.co.in/public/v2/users"
          req = requests.get(url=URL)
          res = req.json()

          with open("users.json", "w") as f:
              json.dump(res, f)

      - id: saveUsersLocally
        type: io.kestra.core.tasks.storages.LocalFiles
        outputs:
          - users.json

      - id: saveUsersS3
        type: io.kestra.plugin.aws.s3.Upload
        from: "{{outputs.saveUsersLocally.uris['users.json']}}"
        key: users-kestra.json
        bucket:
        region:
        accessKeyId:
        secretKeyId:

      - id: input
        type: io.kestra.core.tasks.storages.LocalFiles
        inputs:
          data.users: "{{outputs.saveUsersLocally.uris['users.json']}}"

      - id: saveUsersPg
        type: io.kestra.plugin.scripts.python.Script
        beforeCommands:
          - pip install requests pandas psycopg2 sqlalchemy > /dev/null
        warningOnStdErr: false
        script: |
          import json
          import pandas as pd
          from sqlalchemy import create_engine

          with open("data.users", "r") as f:
              users = json.load(f)

          df_users = pd.DataFrame(users)
          df_users['inserted_from'] = 'kestra'

          engine = create_engine(
              "postgresql://<username>:<password>@<host>:<port>"
          )

          df_users.to_sql("users", engine, if_exists="append", index=False)

Kestra makes scheduling super simple, but first, let’s test the flow by running it manually. As soon as you save the file in the editor, you’ll be able to click on the New execution button (bottom right corner).

It will redirect you to a Gantt view of your flow run. Green means good and red means bad. It’s all green for us, luckily:

Image 16 — Running a Kestra flow (image by author)

As you can see from the following image, Kestra has successfully uploaded the JSON file to the S3 bucket:

Image 17 — Kestra flow results stored in S3 (image by author)

Likewise, it has successfully loaded the data into Postgres:

Image 18 — Kestra flow results stored in Postgres (image by author)

Now, if you don’t mind writing a lengthier Python task, the whole Kestra flow can be significantly simplified.

Simplifying the Kestra Flow

Unlike Airflow, Kestra is not a Python orchestrator. It favors larger, self-contained Python jobs because that simplifies passing data in and out of tasks.

For that reason, you can simplify the flow you’ve seen earlier to have only two tasks:

  1. apiToPostgres: Uses a Python script task to download the JSON file from a remote API, dump it locally to disk, convert it to Pandas DataFrame, add a new column, and push it to Postgres.
  2. s3upload: Pushes the JSON file written to the task’s output directory to an S3 bucket.

Or, in code:

id: postgresS3PythonScript
namespace: dev

tasks:
  - id: apiToPostgres
    type: io.kestra.plugin.scripts.python.Script
    beforeCommands:
      - pip install requests pandas psycopg2 sqlalchemy > /dev/null
    warningOnStdErr: false
    script: |
      import json
      import pandas as pd
      import requests
      from sqlalchemy import create_engine

      URL = "https://gorest.co.in/public/v2/users"
      req = requests.get(url=URL)
      res = req.json()

      with open("{{outputDir}}/users.json", "w") as f:
          json.dump(res, f)

      df_users = pd.DataFrame(res)
      df_users["inserted_from"] = "kestra"

      engine = create_engine("postgresql://<username>:<password>@<host>:<port>")

      df_users.to_sql("users", engine, if_exists="append", index=False)

  - id: s3upload
    type: io.kestra.plugin.aws.s3.Upload
    from: "{{outputs.apiToPostgres.outputFiles['users.json']}}"
    key: kestra-users.json
    bucket:
    region:
    accessKeyId:
    secretKeyId:

This is the output you will see when you run the flow:

Image 19 — Simplifying the Kestra flow (image by author)

Overall, our flow now has only two tasks, is significantly shorter in code, and doesn’t pass data back and forth between small functions. In plain English — it’s more elegant.

The only thing remaining to explore is flow scheduling with Kestra. Let’s cover that next.

Scheduling a Kestra Flow

To schedule a Kestra flow, simply add a Schedule trigger at the bottom of your YAML file. Unlike the individual Kestra tasks, this block sits at the top level of the file and isn’t nested under tasks:

triggers:
  - id: schedule
    type: io.kestra.core.models.triggers.types.Schedule
    cron: "0 0 * * *"

In plain English, this Schedule will make sure the Kestra flow runs at midnight every day.

You can see it in action by inspecting the Triggers tab, and you’ll also see an “S” displayed next to the flow run, indicating it was started by the scheduler:

Image 20 — Scheduled Kestra flow (image by author)

And that’s it for Kestra!

The question remains — is Airflow still the king, or is it time to seriously consider Airflow alternatives such as Kestra and Prefect? Let’s answer that next.

The Verdict — Which Data Orchestration Platform is the Best for 2023 and Beyond?

Today’s article covered a somewhat realistic data pipeline — as realistic as it can be in the scope of a single article. The previous article covered two simpler pipelines, so now it’s time for a final verdict.

The data orchestration platform you choose will depend mostly on your personal preference and the preferences of the company you work for.

If you like writing orchestration logic in Python, Airflow or Prefect may be a good fit. However, more lightweight alternatives that aren’t tied to Python, such as Kestra, can often be beneficial, as they allow contributions from non-Python developers and make dependency management easier.

Which data orchestration platform is your company using? Let me know in the comment section below.

Loved the article? Become a Medium member to continue learning without limits. I’ll receive a portion of your membership fee if you use the following link, with no extra cost to you.
