Building a simple data stack with Docker

Iago S. Ochôa
Feb 24, 2023 · 13 min read


I’m preparing to take the DCA (Docker Certified Associate) exam, and as a way to study, I decided to build a simple data stack using Docker. All the code presented in this article is available on my GitHub page. So, let’s get to work.

NOTE: I used a laptop with 32 GB of RAM and an i7 CPU. Depending on your machine’s specs, I encourage you to adjust the deploy resource limits in the docker-compose file.

ARCHITECTURE

We’ll use docker-compose to deploy all the containers in our data stack.
As a data source, we are going to use a MySQL database, which will be loaded with NYC green taxi data by a Python script. The data is then extracted from MySQL and landed in S3 using Airbyte (an EL tool); S3 serves as a data lake for future aggregations. As the pipeline orchestrator, we will use Airflow, which offers operators for triggering and monitoring jobs in Airbyte and Spark. Finally, we will use Apache Spark to aggregate the data in the data lake (S3) and store the result in a Postgres instance that acts as a data warehouse. The architecture is shown in the image below.

Data Stack Architecture

DATA SOURCE LAYER

As mentioned before, in this layer we are going to use a MySQL database that will be populated via a Python script. The compose snippet for this part is shown below. Note that a volume is mapped, so the data and configuration persist even if we stop the container.

  mysql_db:
    container_name: datasource-mysql-db
    image: mysql:latest
    restart: unless-stopped
    environment:
      MYSQL_DATABASE: ${MY_SQL_DATABASE}
      MYSQL_USER: ${MY_SQL_USER}
      MYSQL_PASSWORD: ${MY_SQL_PASSWORD}
      MYSQL_ROOT_PASSWORD: ${MY_SQL_ROOT_PASSWORD}
    deploy:
      resources:
        limits:
          cpus: "0.5"
          memory: "256M"
    ports:
      - '3306:3306'
    volumes:
      - ./mysql-data:/var/lib/mysql
    profiles:
      - datasource
    networks:
      - datastack
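The ${...} references above are resolved from a .env file that sits next to the docker-compose file. The real values are in the repository; the sketch below only illustrates the variable names used throughout this article, with placeholder values you should replace:

# .env (illustrative placeholders only)
MY_SQL_DATABASE=taxis
MY_SQL_USER=user
MY_SQL_PASSWORD=changeme
MY_SQL_ROOT_PASSWORD=changeme_root
MY_SQL_HOST=mysql_db
MY_SQL_PORT=3306
MY_SQL_TABLE_NAME=green_taxi
POSTGRES_USER=analyst
POSTGRES_PASSWORD=analyst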

As noted, the actual variable values live in the .env file. We also limit the resources available to the container. In this layer we will additionally deploy Adminer, a graphical interface for accessing the database.

  # DBMS client
  adminer:
    container_name: datasource-adminer
    image: adminer:latest
    deploy:
      resources:
        limits:
          cpus: "0.2"
          memory: "256M"
    restart: always
    ports:
      - 8888:8080
    profiles:
      - datasource
    networks:
      - datastack

Each deployed service is assigned a profile; docker-compose uses profiles to bring up groups of services separately. This way we can deploy all services that share a profile in isolation from the rest of the stack, as in the sketch below.
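To make the profile mechanics concrete, here is a stripped-down compose sketch (not the article’s full file) showing how two services on different profiles share one network and are started independently:

# Minimal illustration of compose profiles; service bodies are trimmed.
services:
  mysql_db:
    image: mysql:latest
    profiles:
      - datasource
    networks:
      - datastack
  spark-master:
    image: docker.io/bitnami/spark:3.3
    profiles:
      - processing
    networks:
      - datastack

networks:
  datastack:
    driver: bridge

# docker compose --profile datasource up -d   -> starts only mysql_db
# docker compose --profile processing up -d   -> starts only spark-master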
To deploy the data source layer we just need to run docker compose --profile datasource up -d. After the services are up, we can access Adminer on localhost:8888 and connect to the MySQL database, using the credentials defined in the .env file.

NOTE: The server name in Adminer is the MySQL service name from the docker-compose file, mysql_db.

Adminer connected to MySql

If you access the taxis database, you will notice that there is no data in it yet. We are going to fix that now.

INGESTION SCRIPT LAYER

We are going to use the following Python script to ingest the data.

from dotenv import load_dotenv
import os
import pandas as pd
from sqlalchemy import create_engine
import time
import sys


def main():

    load_dotenv('.env')

    user = os.environ.get('MY_SQL_USER')
    password = os.environ.get('MY_SQL_PASSWORD')
    host = os.environ.get('MY_SQL_HOST')
    port = os.environ.get('MY_SQL_PORT')
    db = os.environ.get('MY_SQL_DATABASE')
    table_name = os.environ.get('MY_SQL_TABLE_NAME')

    time.sleep(15)

    print(f'mysql+pymysql://{user}:{password}@{host}:{port}/{db}')
    sys.stdout.flush()

    engine = create_engine(f'mysql+pymysql://{user}:{password}@{host}:{port}/{db}')
    pd.io.parquet.get_engine('auto')
    print('Reading data...')
    sys.stdout.flush()
    df = pd.read_parquet('./data/jan-2021/green_tripdata_2022-01.parquet')
    print('Inserting data...')
    sys.stdout.flush()
    df.to_sql(name=table_name, con=engine, if_exists='append')


if __name__ == '__main__':
    main()

As we can see, the Python script loads the environment variables from the .env file and uses pandas to insert the Parquet data into MySQL. To run this script, we are going to use the following Dockerfile.

FROM python:3.9.1

RUN pip3 install pandas sqlalchemy psycopg2 python-dotenv pymysql cryptography pyarrow fastparquet

WORKDIR /app
COPY ./scripts/ingest_data.py ingest_data.py
COPY ./data data

ENTRYPOINT ["python"]
CMD ["ingest_data.py"]
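If you want to test the image outside of compose first, you can build and run it by hand. The network name below is an assumption — compose prefixes the network with the project (directory) name — so adjust it to match yours:

docker build -f Dockerfiles/ingestion.dockerfile -t ingestion-python .
docker run --rm --env-file .env --network <project>_datastack ingestion-python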

In docker-compose, this Dockerfile is built by the ingestion-python service.

  ingestion-python:
    container_name: ingestionscript-ingestion-python
    build:
      dockerfile: ./Dockerfiles/ingestion.dockerfile
    depends_on:
      - mysql_db
    profiles:
      - ingestionscript
    env_file:
      - .env
    networks:
      - datastack

To ingest the data we run docker compose --profile ingestionscript up -d. The ingestion may take some time. When the ingestionscript-ingestion-python container exits, we can access Adminer to verify that the ingestion was successful.

Ingested data on green_taxi table.
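If you prefer the command line over Adminer, a quick row count also confirms the load. The database and table names below assume the values used in this article (taxis and green_taxi), and you will be prompted for the root password from the .env file:

docker exec -it datasource-mysql-db \
  mysql -u root -p -e "SELECT COUNT(*) FROM taxis.green_taxi;"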

WAREHOUSE LAYER

Before we deploy the other layers, we will deploy the warehouse layer. In this layer a Postgres instance is used. On container startup, the table that will store the aggregated data is created by the following .sql script.

CREATE SCHEMA aggregations;

CREATE TABLE aggregations.tolls_avg_distance (
    id serial PRIMARY KEY,
    DOLocationID integer,
    avg_distance DECIMAL(15,4)
);

The docker-compose service mounts a volume to persist the data, just like we did for MySQL.

  warehouse:
    container_name: warehouse-warehouse
    image: postgres:13
    restart: always
    environment:
      POSTGRES_USER: ${POSTGRES_USER}
      POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
    deploy:
      resources:
        limits:
          cpus: "0.5"
          memory: "256M"
    volumes:
      - ./postgres-data:/var/lib/postgresql/data
      - ./configs/postgres-config:/docker-entrypoint-initdb.d
    ports:
      - "5432:5432"
    profiles:
      - warehouse
    networks:
      - datastack

Run docker compose --profile warehouse up -d to deploy the warehouse layer. The Postgres instance can be accessed through Adminer as well.

Warehouse table.
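You can also confirm that the init script ran directly from the container. This sketch assumes the default behaviour of the official Postgres image when POSTGRES_DB is not set, i.e. the default database is named after POSTGRES_USER:

docker exec -it warehouse-warehouse \
  sh -c 'psql -U "$POSTGRES_USER" -c "\dt aggregations.*"'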

INGESTION LAYER

In this layer, we are going to use Airbyte, which is an EL tool; for more details, the official documentation can be found here. We are going to use the docker-compose file provided by Airbyte itself, which will look like this.

init:
container_name: ingestion-airbyte-init
image: airbyte/init:${VERSION}
deploy:
resources:
limits:
cpus: "0.2"
memory: "128M"
command: /bin/sh -c "./scripts/create_mount_directories.sh /local_parent ${HACK_LOCAL_ROOT_PARENT} ${LOCAL_ROOT}"
environment:
- LOCAL_ROOT=${LOCAL_ROOT}
- HACK_LOCAL_ROOT_PARENT=${HACK_LOCAL_ROOT_PARENT}
volumes:
- ${HACK_LOCAL_ROOT_PARENT}:/local_parent
profiles:
- ingestion
networks:
- datastack
bootloader:
container_name: ingestion-airbyte-bootloader
image: airbyte/bootloader:${VERSION}
deploy:
resources:
limits:
cpus: "0.4"
memory: "256M"
logging: *default-logging
environment:
- AIRBYTE_VERSION=${VERSION}
- DATABASE_PASSWORD=${DATABASE_PASSWORD}
- DATABASE_URL=${DATABASE_URL}
- DATABASE_USER=${DATABASE_USER}
- LOG_LEVEL=${LOG_LEVEL}
depends_on:
init:
condition: service_completed_successfully
profiles:
- ingestion
networks:
- datastack

db:
image: airbyte/db:${VERSION}
logging: *default-logging
container_name: ingestion-airbyte-db
restart: unless-stopped
deploy:
resources:
limits:
cpus: "0.1"
memory: "256M"
environment:
- CONFIG_DATABASE_PASSWORD=${CONFIG_DATABASE_PASSWORD:-}
- CONFIG_DATABASE_URL=${CONFIG_DATABASE_URL:-}
- CONFIG_DATABASE_USER=${CONFIG_DATABASE_USER:-}
- DATABASE_PASSWORD=${DATABASE_PASSWORD}
- DATABASE_URL=${DATABASE_URL}
- DATABASE_USER=${DATABASE_USER}
- POSTGRES_PASSWORD=${DATABASE_PASSWORD}
- POSTGRES_USER=${DATABASE_USER}
volumes:
- db:/var/lib/postgresql/data
profiles:
- ingestion
networks:
- datastack

worker:
image: airbyte/worker:${VERSION}
logging: *default-logging
container_name: ingestion-airbyte-worker
restart: unless-stopped
deploy:
resources:
limits:
cpus: "2"
memory: "4GB"
environment:
- AIRBYTE_VERSION=${VERSION}
- AUTO_DISABLE_FAILING_CONNECTIONS=${AUTO_DISABLE_FAILING_CONNECTIONS}
- CONFIG_DATABASE_PASSWORD=${CONFIG_DATABASE_PASSWORD:-}
- CONFIG_DATABASE_URL=${CONFIG_DATABASE_URL:-}
- CONFIG_DATABASE_USER=${CONFIG_DATABASE_USER:-}
- CONFIGS_DATABASE_MINIMUM_FLYWAY_MIGRATION_VERSION=${CONFIGS_DATABASE_MINIMUM_FLYWAY_MIGRATION_VERSION:-}
- CONFIG_ROOT=${CONFIG_ROOT}
- DATABASE_PASSWORD=${DATABASE_PASSWORD}
- DATABASE_URL=${DATABASE_URL}
- DATABASE_USER=${DATABASE_USER}
- DEPLOYMENT_MODE=${DEPLOYMENT_MODE}
- INTERNAL_API_HOST=${INTERNAL_API_HOST}
- JOBS_DATABASE_MINIMUM_FLYWAY_MIGRATION_VERSION=${JOBS_DATABASE_MINIMUM_FLYWAY_MIGRATION_VERSION:-}
- JOB_MAIN_CONTAINER_CPU_LIMIT=${JOB_MAIN_CONTAINER_CPU_LIMIT}
- JOB_MAIN_CONTAINER_CPU_REQUEST=${JOB_MAIN_CONTAINER_CPU_REQUEST}
- JOB_MAIN_CONTAINER_MEMORY_LIMIT=${JOB_MAIN_CONTAINER_MEMORY_LIMIT}
- JOB_MAIN_CONTAINER_MEMORY_REQUEST=${JOB_MAIN_CONTAINER_MEMORY_REQUEST}
- LOCAL_DOCKER_MOUNT=${LOCAL_DOCKER_MOUNT}
- LOCAL_ROOT=${LOCAL_ROOT}
- LOG_LEVEL=${LOG_LEVEL}
- LOG_CONNECTOR_MESSAGES=${LOG_CONNECTOR_MESSAGES}
- MAX_CHECK_WORKERS=${MAX_CHECK_WORKERS}
- MAX_DISCOVER_WORKERS=${MAX_DISCOVER_WORKERS}
- MAX_SPEC_WORKERS=${MAX_SPEC_WORKERS}
- MAX_SYNC_WORKERS=${MAX_SYNC_WORKERS}
- MAX_NOTIFY_WORKERS=${MAX_NOTIFY_WORKERS}
- SHOULD_RUN_NOTIFY_WORKFLOW=${SHOULD_RUN_NOTIFY_WORKFLOW}
- NORMALIZATION_JOB_MAIN_CONTAINER_MEMORY_LIMIT=${NORMALIZATION_JOB_MAIN_CONTAINER_MEMORY_LIMIT}
- NORMALIZATION_JOB_MAIN_CONTAINER_MEMORY_REQUEST=${NORMALIZATION_JOB_MAIN_CONTAINER_MEMORY_REQUEST}
- NORMALIZATION_JOB_MAIN_CONTAINER_CPU_LIMIT=${NORMALIZATION_JOB_MAIN_CONTAINER_CPU_LIMIT}
- NORMALIZATION_JOB_MAIN_CONTAINER_CPU_REQUEST=${NORMALIZATION_JOB_MAIN_CONTAINER_CPU_REQUEST}
- SECRET_PERSISTENCE=${SECRET_PERSISTENCE}
- SYNC_JOB_MAX_ATTEMPTS=${SYNC_JOB_MAX_ATTEMPTS}
- SYNC_JOB_MAX_TIMEOUT_DAYS=${SYNC_JOB_MAX_TIMEOUT_DAYS}
- TEMPORAL_HOST=${TEMPORAL_HOST}
- TRACKING_STRATEGY=${TRACKING_STRATEGY}
- WEBAPP_URL=${WEBAPP_URL}
- WORKSPACE_DOCKER_MOUNT=${WORKSPACE_DOCKER_MOUNT}
- WORKSPACE_ROOT=${WORKSPACE_ROOT}
- METRIC_CLIENT=${METRIC_CLIENT}
- OTEL_COLLECTOR_ENDPOINT=${OTEL_COLLECTOR_ENDPOINT}
- JOB_ERROR_REPORTING_STRATEGY=${JOB_ERROR_REPORTING_STRATEGY}
- JOB_ERROR_REPORTING_SENTRY_DSN=${JOB_ERROR_REPORTING_SENTRY_DSN}
- ACTIVITY_MAX_ATTEMPT=${ACTIVITY_MAX_ATTEMPT}
- ACTIVITY_INITIAL_DELAY_BETWEEN_ATTEMPTS_SECONDS=${ACTIVITY_INITIAL_DELAY_BETWEEN_ATTEMPTS_SECONDS}
- ACTIVITY_MAX_DELAY_BETWEEN_ATTEMPTS_SECONDS=${ACTIVITY_MAX_DELAY_BETWEEN_ATTEMPTS_SECONDS}
- WORKFLOW_FAILURE_RESTART_DELAY_SECONDS=${WORKFLOW_FAILURE_RESTART_DELAY_SECONDS}
- AUTO_DETECT_SCHEMA=${AUTO_DETECT_SCHEMA}
- USE_STREAM_CAPABLE_STATE=${USE_STREAM_CAPABLE_STATE}
- MICRONAUT_ENVIRONMENTS=${WORKERS_MICRONAUT_ENVIRONMENTS}
- APPLY_FIELD_SELECTION=${APPLY_FIELD_SELECTION}
- FIELD_SELECTION_WORKSPACES=${FIELD_SELECTION_WORKSPACES}
- STRICT_COMPARISON_NORMALIZATION_WORKSPACES=${STRICT_COMPARISON_NORMALIZATION_WORKSPACES}
- STRICT_COMPARISON_NORMALIZATION_TAG=${STRICT_COMPARISON_NORMALIZATION_TAG}
configs:
- flags
volumes:
- /var/run/docker.sock:/var/run/docker.sock
- workspace:${WORKSPACE_ROOT}
- ${LOCAL_ROOT}:${LOCAL_ROOT}
ports:
- "9000"
depends_on:
bootloader:
condition: service_completed_successfully
profiles:
- ingestion
networks:
- datastack

airbyte-server:
image: airbyte/server:${VERSION}
logging: *default-logging
container_name: ingestion-airbyte-server
deploy:
resources:
limits:
cpus: '0.3'
memory: "1GB"
restart: unless-stopped
environment:
- AIRBYTE_ROLE=${AIRBYTE_ROLE:-}
- AIRBYTE_VERSION=${VERSION}
- CONFIG_DATABASE_PASSWORD=${CONFIG_DATABASE_PASSWORD:-}
- CONFIG_DATABASE_URL=${CONFIG_DATABASE_URL:-}
- CONFIG_DATABASE_USER=${CONFIG_DATABASE_USER:-}
- CONFIGS_DATABASE_MINIMUM_FLYWAY_MIGRATION_VERSION=${CONFIGS_DATABASE_MINIMUM_FLYWAY_MIGRATION_VERSION:-}
- CONFIG_ROOT=${CONFIG_ROOT}
- DATABASE_PASSWORD=${DATABASE_PASSWORD}
- DATABASE_URL=${DATABASE_URL}
- DATABASE_USER=${DATABASE_USER}
- JOB_MAIN_CONTAINER_CPU_LIMIT=${JOB_MAIN_CONTAINER_CPU_LIMIT}
- JOB_MAIN_CONTAINER_CPU_REQUEST=${JOB_MAIN_CONTAINER_CPU_REQUEST}
- JOB_MAIN_CONTAINER_MEMORY_LIMIT=${JOB_MAIN_CONTAINER_MEMORY_LIMIT}
- JOB_MAIN_CONTAINER_MEMORY_REQUEST=${JOB_MAIN_CONTAINER_MEMORY_REQUEST}
- JOBS_DATABASE_MINIMUM_FLYWAY_MIGRATION_VERSION=${JOBS_DATABASE_MINIMUM_FLYWAY_MIGRATION_VERSION:-}
- LOG_LEVEL=${LOG_LEVEL}
- NEW_SCHEDULER=${NEW_SCHEDULER}
- SECRET_PERSISTENCE=${SECRET_PERSISTENCE}
- TEMPORAL_HOST=${TEMPORAL_HOST}
- TRACKING_STRATEGY=${TRACKING_STRATEGY}
- JOB_ERROR_REPORTING_STRATEGY=${JOB_ERROR_REPORTING_STRATEGY}
- JOB_ERROR_REPORTING_SENTRY_DSN=${JOB_ERROR_REPORTING_SENTRY_DSN}
- WEBAPP_URL=${WEBAPP_URL}
- WORKER_ENVIRONMENT=${WORKER_ENVIRONMENT}
- WORKSPACE_ROOT=${WORKSPACE_ROOT}
- GITHUB_STORE_BRANCH=${GITHUB_STORE_BRANCH}
- MICRONAUT_ENVIRONMENTS=${WORKERS_MICRONAUT_ENVIRONMENTS}
- AUTO_DETECT_SCHEMA=${AUTO_DETECT_SCHEMA}
- MAX_NOTIFY_WORKERS=5
- SHOULD_RUN_NOTIFY_WORKFLOWS=false
ports:
- "8001"
configs:
- flags
volumes:
- workspace:${WORKSPACE_ROOT}
- data:${CONFIG_ROOT}
- ${LOCAL_ROOT}:${LOCAL_ROOT}
depends_on:
bootloader:
condition: service_completed_successfully
profiles:
- ingestion
networks:
- datastack

airbyte-webapp:
image: airbyte/webapp:${VERSION}
logging: *default-logging
container_name: ingestion-airbyte-webapp
restart: unless-stopped
deploy:
resources:
limits:
cpus: "1"
memory: "1GB"
ports:
- 3569:80
environment:
- AIRBYTE_ROLE=${AIRBYTE_ROLE:-}
- AIRBYTE_VERSION=${VERSION}
- API_URL=${API_URL:-}
- CONNECTOR_BUILDER_API_URL=${CONNECTOR_BUILDER_API_URL:-}
- INTERNAL_API_HOST=${INTERNAL_API_HOST}
- CONNECTOR_BUILDER_API_HOST=${CONNECTOR_BUILDER_API_HOST}
- OPENREPLAY=${OPENREPLAY:-}
- PAPERCUPS_STORYTIME=${PAPERCUPS_STORYTIME:-}
- TRACKING_STRATEGY=${TRACKING_STRATEGY}
depends_on:
bootloader:
condition: service_completed_successfully
profiles:
- ingestion
networks:
- datastack

airbyte-temporal:
image: airbyte/temporal:${VERSION}
logging: *default-logging
container_name: ingestion-airbyte-temporal
restart: unless-stopped
deploy:
resources:
limits:
cpus: "0.5"
memory: "512M"
environment:
- DB=postgresql
- DB_PORT=${DATABASE_PORT}
- DYNAMIC_CONFIG_FILE_PATH=config/dynamicconfig/development.yaml
- LOG_LEVEL=${LOG_LEVEL}
- POSTGRES_PWD=${DATABASE_PASSWORD}
- POSTGRES_SEEDS=${DATABASE_HOST}
- POSTGRES_USER=${DATABASE_USER}
volumes:
- ./temporal/dynamicconfig:/etc/temporal/config/dynamicconfig
profiles:
- ingestion
networks:
- datastack

airbyte-cron:
image: airbyte/cron:${VERSION}
logging: *default-logging
container_name: ingestion-airbyte-cron
restart: unless-stopped
deploy:
resources:
limits:
cpus: "0.3"
memory: "256M"
environment:
- AIRBYTE_VERSION=${VERSION}
- CONFIGS_DATABASE_MINIMUM_FLYWAY_MIGRATION_VERSION=${CONFIGS_DATABASE_MINIMUM_FLYWAY_MIGRATION_VERSION}
- DATABASE_PASSWORD=${DATABASE_PASSWORD}
- DATABASE_URL=${DATABASE_URL}
- DATABASE_USER=${DATABASE_USER}
- DEPLOYMENT_MODE=${DEPLOYMENT_MODE}
- LOG_LEVEL=${LOG_LEVEL}
- REMOTE_CONNECTOR_CATALOG_URL=${REMOTE_CONNECTOR_CATALOG_URL}
- TEMPORAL_HISTORY_RETENTION_IN_DAYS=${TEMPORAL_HISTORY_RETENTION_IN_DAYS}
- UPDATE_DEFINITIONS_CRON_ENABLED=${UPDATE_DEFINITIONS_CRON_ENABLED}
- WORKSPACE_ROOT=${WORKSPACE_ROOT}
- MICRONAUT_ENVIRONMENTS=${CRON_MICRONAUT_ENVIRONMENTS}
configs:
- flags
volumes:
- workspace:${WORKSPACE_ROOT}
depends_on:
bootloader:
condition: service_completed_successfully
profiles:
- ingestion
networks:
- datastack

airbyte-connector-builder-server:
image: airbyte/connector-builder-server:${VERSION}
logging: *default-logging
container_name: ingestion-airbyte-connector-builder-server
restart: unless-stopped
deploy:
resources:
limits:
cpus: "0.2"
memory: "128M"
ports:
- 80
environment:
- AIRBYTE_VERSION=${VERSION}
depends_on:
bootloader:
condition: service_completed_successfully
profiles:
- ingestion
networks:
- datastack

airbyte-proxy:
image: airbyte/proxy:${VERSION}
container_name: ingestion-airbyte-proxy
restart: unless-stopped
deploy:
resources:
limits:
cpus: "0.2"
memory: "128M"
ports:
- "8000:8000"
- "8001:8001"
- "8003:8003"
environment:
- BASIC_AUTH_USERNAME=${BASIC_AUTH_USERNAME}
- BASIC_AUTH_PASSWORD=${BASIC_AUTH_PASSWORD}
- BASIC_AUTH_PROXY_TIMEOUT=${BASIC_AUTH_PROXY_TIMEOUT}
depends_on:
- airbyte-webapp
- airbyte-server
profiles:
- ingestion
networks:
- datastack

Running docker compose --profile ingestion up -d will deploy Airbyte; this may take some time. After Airbyte is deployed, create a source connection for MySQL.

Airbyte source connector.
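If the UI does not load, it is worth checking that the Airbyte proxy answers at all. A minimal check from the host, assuming the default basic-auth credentials (airbyte/password) from the upstream Airbyte .env and the proxy port mapped above; any status other than 401 or 502 means the proxy reached the webapp:

curl -s -o /dev/null -w '%{http_code}\n' -u airbyte:password http://localhost:8000/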

After creating the source connector, let’s create the destination connector.

Airbyte destination connector.

Finally, let’s create the connection between source and destination, using the connectors created above. The connection is set to Manual (since Airflow will trigger the syncs) and Full Load.

Airbyte connection between source and destination.

After creating the connection, DO NOT SYNC the data. We are going to deploy the Scheduling layer to do the sync automatically.

PROCESSING LAYER

Before we deploy our scheduling layer, we are going to deploy Spark using the Bitnami image. The compose snippet looks like this.

  spark-master:
    container_name: processing-spark-master
    image: docker.io/bitnami/spark:3.3
    environment:
      - SPARK_MODE=master
      - SPARK_RPC_AUTHENTICATION_ENABLED=no
      - SPARK_RPC_ENCRYPTION_ENABLED=no
      - SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=no
      - SPARK_SSL_ENABLED=no
    ports:
      - '4001:8080'
    profiles:
      - processing
    networks:
      - datastack

  spark-worker:
    container_name: processing-spark-worker
    image: docker.io/bitnami/spark:3.3
    environment:
      - SPARK_MODE=worker
      - SPARK_MASTER_URL=spark://spark-master:7077
      - SPARK_WORKER_MEMORY=6G
      - SPARK_WORKER_CORES=3
      - SPARK_RPC_AUTHENTICATION_ENABLED=no
      - SPARK_RPC_ENCRYPTION_ENABLED=no
      - SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=no
      - SPARK_SSL_ENABLED=no
    profiles:
      - processing
    networks:
      - datastack

Note that we have one master and one worker. You can set SPARK_WORKER_MEMORY and SPARK_WORKER_CORES to whatever values fit your machine best. Running docker compose --profile processing up -d will deploy the processing layer. After Spark is deployed, we can access the Spark UI at localhost:4001.

Spark UI.
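Before wiring Spark into Airflow, a quick smoke test against the cluster can save some debugging later. The sketch below runs the bundled SparkPi example from inside the master container; the examples jar path is where the Bitnami image normally ships it, but it may differ between image versions:

docker exec processing-spark-master bash -c \
  'spark-submit --master spark://spark-master:7077 \
    --class org.apache.spark.examples.SparkPi \
    /opt/bitnami/spark/examples/jars/spark-examples_*.jar 10'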

SCHEDULING LAYER

Finally, we are going to deploy the scheduling layer. We are going to use a custom Dockerfile that takes Airflow 2.5.1 as the base image and adds the Airbyte and Spark provider packages.

FROM apache/airflow:2.5.1
USER root
RUN apt-get update \
    && apt-get install -y git libpq-dev python3 python3-pip \
    && apt-get install -y openjdk-11-jdk \
    && apt-get autoremove -yqq --purge \
    && apt-get clean \
    && rm -rf /var/lib/apt/lists/*

ENV JAVA_HOME /usr/lib/jvm/java-11-openjdk-amd64/
RUN export JAVA_HOME

COPY Dockerfiles/requirements.txt .
USER airflow
RUN pip3 install -r requirements.txt

In our requirements.txt we have the following packages:

apache-airflow
pyspark==3.3.2
apache-airflow-providers-airbyte==3.2.0
apache-airflow-providers-apache-spark==4.0.0

Our docker-compose services are based on the docker-compose file available on the Airflow website, with some changes.

    postgres:
container_name: scheduling-airflow-postgres
image: postgres:13
environment:
POSTGRES_USER: airflow
POSTGRES_PASSWORD: airflow
POSTGRES_DB: airflow
deploy:
resources:
limits:
cpus: "0.40"
memory: 1200M
volumes:
- postgres-db-volume:/var/lib/postgresql/data
healthcheck:
test: ["CMD", "pg_isready", "-U", "airflow"]
interval: 5s
retries: 5
restart: always
profiles:
- scheduling
networks:
- datastack

redis:
container_name: scheduling-airflow-redis
image: redis:latest
expose:
- 6379
healthcheck:
test: ["CMD", "redis-cli", "ping"]
interval: 5s
timeout: 30s
retries: 50
restart: always
profiles:
- scheduling
networks:
- datastack

airflow-webserver:
<<: *airflow-common
container_name: scheduling-airflow-webserver
command: webserver
ports:
- 8080:8080
deploy:
resources:
limits:
cpus: "0.50"
memory: 1200M
healthcheck:
test: ["CMD", "curl", "--fail", "http://localhost:8081/health"]
interval: 10s
timeout: 10s
retries: 5
restart: always
depends_on:
<<: *airflow-common-depends-on
airflow-init:
condition: service_completed_successfully
profiles:
- scheduling
networks:
- datastack

airflow-scheduler:
<<: *airflow-common
container_name: scheduling-airflow-scheduler
command: scheduler
deploy:
resources:
limits:
cpus: "2"
memory: "6GB"
healthcheck:
test: ["CMD-SHELL", 'airflow jobs check --job-type SchedulerJob --hostname "$${HOSTNAME}"']
interval: 10s
timeout: 10s
retries: 5
restart: always
depends_on:
<<: *airflow-common-depends-on
airflow-init:
condition: service_completed_successfully
profiles:
- scheduling
networks:
- datastack

airflow-worker:
<<: *airflow-common
container_name: scheduling-airflow-worker
command: celery worker
deploy:
resources:
limits:
cpus: "1"
memory: 1200M
healthcheck:
test:
- "CMD-SHELL"
- 'celery --app airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"'
interval: 10s
timeout: 10s
retries: 5
environment:
<<: *airflow-common-env
# Required to handle warm shutdown of the celery workers properly
# See https://airflow.apache.org/docs/docker-stack/entrypoint.html#signal-propagation
DUMB_INIT_SETSID: "0"
restart: always
depends_on:
<<: *airflow-common-depends-on
airflow-init:
condition: service_completed_successfully
profiles:
- scheduling
networks:
- datastack

airflow-triggerer:
<<: *airflow-common
container_name: scheduling-airflow-triggerer
command: triggerer
deploy:
resources:
limits:
cpus: "0.5"
memory: 512M
healthcheck:
test: ["CMD-SHELL", 'airflow jobs check --job-type TriggererJob --hostname "$${HOSTNAME}"']
interval: 10s
timeout: 10s
retries: 5
restart: always
depends_on:
<<: *airflow-common-depends-on
airflow-init:
condition: service_completed_successfully
profiles:
- scheduling
networks:
- datastack

airflow-init:
<<: *airflow-common
container_name: scheduling-airflow-init
entrypoint: /bin/bash
deploy:
resources:
limits:
cpus: "0.5"
memory: 512M
# yamllint disable rule:line-length
command:
- -c
- |
function ver() {
printf "%04d%04d%04d%04d" $${1//./ }
}
airflow_version=$$(AIRFLOW__LOGGING__LOGGING_LEVEL=INFO && gosu airflow airflow version)
airflow_version_comparable=$$(ver $${airflow_version})
min_airflow_version=2.2.0
min_airflow_version_comparable=$$(ver $${min_airflow_version})
if (( airflow_version_comparable < min_airflow_version_comparable )); then
echo
echo -e "\033[1;31mERROR!!!: Too old Airflow version $${airflow_version}!\e[0m"
echo "The minimum Airflow version supported: $${min_airflow_version}. Only use this or higher!"
echo
exit 1
fi
if [[ -z "${AIRFLOW_UID}" ]]; then
echo
echo -e "\033[1;33mWARNING!!!: AIRFLOW_UID not set!\e[0m"
echo "If you are on Linux, you SHOULD follow the instructions below to set "
echo "AIRFLOW_UID environment variable, otherwise files will be owned by root."
echo "For other operating systems you can get rid of the warning with manually created .env file:"
echo " See: https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html#setting-the-right-airflow-user"
echo
fi
one_meg=1048576
mem_available=$$(($$(getconf _PHYS_PAGES) * $$(getconf PAGE_SIZE) / one_meg))
cpus_available=$$(grep -cE 'cpu[0-9]+' /proc/stat)
disk_available=$$(df / | tail -1 | awk '{print $$4}')
warning_resources="false"
if (( mem_available < 4000 )) ; then
echo
echo -e "\033[1;33mWARNING!!!: Not enough memory available for Docker.\e[0m"
echo "At least 4GB of memory required. You have $$(numfmt --to iec $$((mem_available * one_meg)))"
echo
warning_resources="true"
fi
if (( cpus_available < 2 )); then
echo
echo -e "\033[1;33mWARNING!!!: Not enough CPUS available for Docker.\e[0m"
echo "At least 2 CPUs recommended. You have $${cpus_available}"
echo
warning_resources="true"
fi
if (( disk_available < one_meg * 10 )); then
echo
echo -e "\033[1;33mWARNING!!!: Not enough Disk space available for Docker.\e[0m"
echo "At least 10 GBs recommended. You have $$(numfmt --to iec $$((disk_available * 1024 )))"
echo
warning_resources="true"
fi
if [[ $${warning_resources} == "true" ]]; then
echo
echo -e "\033[1;33mWARNING!!!: You have not enough resources to run Airflow (see above)!\e[0m"
echo "Please follow the instructions to increase amount of resources available:"
echo " https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html#before-you-begin"
echo
fi
mkdir -p /sources/logs /sources/dags /sources/plugins
chown -R "${AIRFLOW_UID}:0" /sources/{logs,dags,plugins}
exec /entrypoint airflow version
# yamllint enable rule:line-length
environment:
<<: *airflow-common-env
_AIRFLOW_DB_UPGRADE: 'true'
_AIRFLOW_WWW_USER_CREATE: 'true'
_AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow}
_AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow}
_PIP_ADDITIONAL_REQUIREMENTS: ''
user: "0:0"
volumes:
- ${AIRFLOW_PROJ_DIR:-.}:/sources
profiles:
- scheduling
networks:
- datastack
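The services above reference YAML anchors (*airflow-common, *airflow-common-env, *airflow-common-depends-on) that are defined at the top of the upstream Airflow compose file and are not repeated here. For orientation, here is a trimmed sketch of what that shared block looks like when pointed at the custom image; the build path is an assumption, and the official file carries more environment entries than shown:

x-airflow-common: &airflow-common
  build:
    dockerfile: ./Dockerfiles/airflow.dockerfile   # assumed path to the custom Dockerfile
  environment: &airflow-common-env
    AIRFLOW__CORE__EXECUTOR: CeleryExecutor
    AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
    AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
  volumes:
    - ${AIRFLOW_PROJ_DIR:-.}/dags:/opt/airflow/dags
    - ${AIRFLOW_PROJ_DIR:-.}/logs:/opt/airflow/logs
    - ${AIRFLOW_PROJ_DIR:-.}/plugins:/opt/airflow/plugins
  user: "${AIRFLOW_UID:-50000}:0"
  depends_on: &airflow-common-depends-on
    redis:
      condition: service_healthy
    postgres:
      condition: service_healthy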

Finally, to deploy the scheduling layer we just need to run docker compose --profile scheduling up -d. The deploy may take some time. When it finishes, we can access the Airflow UI on localhost:8080 using the default user and password (airflow/airflow).

Airflow UI.

Notice that we have two DAGs. Before we run them, however, we need to create the connections to Airbyte and Spark. To do so, go to Admin > Connections. The first connection we are going to set up is for Airbyte. The default user/password for Airbyte is airbyte/password.

Airbyte connection.

Then we are going to create the Spark connection. This one is simple.

Spark connection.
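If you prefer the CLI over the UI, the same two connections can be registered from inside the webserver container. The host values below are assumptions based on the service names in this stack (Airbyte could just as well be reached via airbyte-server on port 8001), so adjust them to match what you entered in the UI:

docker exec scheduling-airflow-webserver airflow connections add airbyte_conn \
  --conn-type airbyte --conn-host airbyte-proxy --conn-port 8000 \
  --conn-login airbyte --conn-password password

docker exec scheduling-airflow-webserver airflow connections add spark_default \
  --conn-type spark --conn-host spark://spark-master --conn-port 7077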

Now that we have our environment deployed, we can run the Airbyte DAG. The code is shown below.

from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.providers.airbyte.operators.airbyte import AirbyteTriggerSyncOperator
from airflow.providers.airbyte.sensors.airbyte import AirbyteJobSensor

with DAG(dag_id='airbyte_dag_example',
         default_args={'owner': 'airflow'},
         schedule_interval='@daily',
         start_date=days_ago(1)
         ) as dag:

    mysql_to_s3 = AirbyteTriggerSyncOperator(
        task_id='mysql_to_s3',
        airbyte_conn_id='airbyte_conn',
        connection_id='ff8917bd-bfd6-4a86-ab07-74acad1825a6',
        asynchronous=True,
    )

    airbyte_sensor = AirbyteJobSensor(
        task_id='airbyte_sensor_mysql',
        airbyte_conn_id='airbyte_conn',
        airbyte_job_id=mysql_to_s3.output
    )

    mysql_to_s3 >> airbyte_sensor

NOTE: In the code we have a connection_id attribute. This connection ID can be retrieved from the Airbyte UI.

Airbyte connection ID.
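The same ID can also be pulled from the Airbyte API, which is handy if you want to template the DAG instead of hard-coding the UUID. The endpoints below exist in recent Airbyte OSS versions but may change between releases, so treat this as a hedged alternative to the UI:

# list workspaces, grab the workspaceId, then list the connections in it
curl -s -u airbyte:password -X POST http://localhost:8001/api/v1/workspaces/list \
  -H "Content-Type: application/json" -d '{}'

curl -s -u airbyte:password -X POST http://localhost:8001/api/v1/connections/list \
  -H "Content-Type: application/json" -d '{"workspaceId": "<workspace-uuid>"}'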

With everything configured, we can now run the Airbyte DAG. In the Airflow UI, just turn the DAG on; you can then follow the data sync in the Airbyte UI.

Airbyte sync operation.

When the sync finishes in Airbyte, the DAG will be marked as successful in Airflow. If you want further confirmation, you can log in to AWS and check the S3 bucket. Now we are going to run our sparkjob DAG, which performs a simple aggregation. The DAG is shown below; note that we call the Spark job using the BashOperator.

import airflow
from datetime import datetime, timedelta
from airflow.operators.bash_operator import BashOperator

default_args = {
    'owner': 'iago',
    'start_date': datetime(2020, 11, 18),
    'retries': 10,
    'retry_delay': timedelta(hours=1)
}

with airflow.DAG('dag_sparkjob',
                 default_args=default_args,
                 catchup=False,
                 schedule_interval='0 1 * * *') as dag:

    bash_sparkjob = BashOperator(
        task_id='sparkjob',
        bash_command="python ${AIRFLOW_HOME}/dags/sparkjob.py",
    )

Our Spark job is simple too: it calculates the average trip distance, grouped by drop-off location ID, for trips that have tolls. The code is shown below.

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession \
    .builder \
    .appName("ETL") \
    .config("spark.jars.packages",
            "org.apache.spark:spark-hadoop-cloud_2.12:3.3.2,postgresql:postgresql:9.1-901.jdbc4") \
    .config("spark.hadoop.fs.s3.endpoint", "s3-us-east-1.amazonaws.com") \
    .config("spark.hadoop.com.amazonaws.services.s3.enableV4", "true") \
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .config('spark.hadoop.fs.s3a.aws.credentials.provider', 'org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider') \
    .config('spark.hadoop.fs.s3a.access.key', 'youraccesskey') \
    .config('spark.hadoop.fs.s3a.secret.key', 'yoursecretkey') \
    .config("spark.executor.memory", "6g") \
    .config("spark.executor.cores", 3) \
    .master("spark://spark-master:7077") \
    .getOrCreate()


df = spark.read.parquet('s3a://mysql-iago/taxis/green_taxi/', inferSchema=True)

df_agg = (
    df.filter(
        F.col('tolls_amount') > 0
    ).groupBy(
        F.col('DOLocationID')
    ).agg(
        F.avg('trip_distance').alias('avg_distance')
    ).orderBy('DOLocationID')
)

mode = "overwrite"
url = "jdbc:postgresql://warehouse:5432/analyst"
properties = {"user": "analyst", "password": "analyst", "driver": "org.postgresql.Driver"}
df_agg.write.option('truncate', 'true').jdbc(url=url, table="aggregations.tolls_avg_distance", mode=mode, properties=properties)

NOTE: Don’t forget to put your AWS access key and secret key in the Spark config.

The DAG should run quickly. After you trigger it and it completes, you can check the data in Adminer by connecting to the warehouse.

Aggregated data stored on Warehouse.
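For a quick check straight from SQL, the query below reads the aggregated table; the user and database names follow the JDBC URL used in the Spark job:

docker exec -it warehouse-warehouse \
  psql -U analyst -d analyst \
  -c "SELECT * FROM aggregations.tolls_avg_distance ORDER BY avg_distance DESC LIMIT 10;"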

And voilà! Our stack is complete. Feel free to change the docker-compose file to test different services and settings, and also feel free to create new connections, DAGs, and Spark jobs on top of the stack!

That’s it! I hope you enjoyed it!

Created by Iago S. Ochôa.
Reviewed by Gabriel Bonifácio.
