End-to-end Twitter Data pipeline from scratch with Docker and Apache Airflow

Tabinda Bhat
TrialX Inc.
Published in
8 min readDec 18, 2022

Introduction

This post describes a simple implementation of a sample Data Engineering Pipeline to extract the data from twitter using python and transform it and deploy it on Apache Airflow with Docker and further store the data into Postgresql database running inside docker and Amazon S3, covering all the steps needed for setting up the a local working environment from scratch.

Problem Statement

Let’s imagine a scenario where we can fetch the input data from Twitter using twitter API v2.0 to enable programmatic access to Twitter, and process it using python code.

Therefore, we want our pipeline to:

  1. Fetch the data from API using twitter free API
  2. Process the data obtained using python
  3. Store the data into csv file and load that csv file to postgresql
  4. Store the csv formed into Amazon S3 bucket
Image by author

Environment Setup

To setup airflow we need to have docker compose pre-installed

The official Airflow docker compose yaml file needs to be applied some changes:

  1. Setting AIRFLOW__CORE__EXECUTOR to LocalExecutor to run it locally
  2. Remove the definitions for worker,flower as they are not needed for local executor
  3. Set the AIRFLOW__CORE__LOAD_EXAMPLES to false to delete all the pre-loaded examples
  4. Create docker volumes to store the data into postgres docker container for loading it into postgres db

4.1 ./dags:/data

The docker compose file looks like this:

version: '3'
x-airflow-common:
&airflow-common
image: ${AIRFLOW_IMAGE_NAME:-extending_airflow_now}
# build: .
environment:
&airflow-common-env
AIRFLOW__CORE__EXECUTOR: LocalExecutor
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
AIRFLOW__CORE__FERNET_KEY: ''
AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
AIRFLOW__API__AUTH_BACKENDS: 'airflow.api.auth.backend.basic_auth'
_PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-}
volumes:
- ./dags:/opt/airflow/dags
- ./logs:/opt/airflow/logs
- ./plugins:/opt/airflow/plugins
user: "${AIRFLOW_UID:-50000}:0"
depends_on:
&airflow-common-depends-on
postgres:
condition: service_healthy
services:
postgres:
image: postgres:13
environment:
POSTGRES_USER: airflow
POSTGRES_PASSWORD: airflow
POSTGRES_DB: airflow
volumes:
- postgres-db-volume:/var/lib/postgresql/data
- ./dags:/data
ports:
- 15433:5432
healthcheck:
test: ["CMD", "pg_isready", "-U", "airflow"]
interval: 5s
retries: 5
restart: always
airflow-webserver:
<<: *airflow-common
command: webserver
ports:
- 8080:8080
healthcheck:
test: ["CMD", "curl", " - fail", "http://localhost:8080/health"]
interval: 10s
timeout: 10s
retries: 5
restart: always
depends_on:
<<: *airflow-common-depends-on
airflow-init:
condition: service_completed_successfully
airflow-scheduler:
<<: *airflow-common
command: scheduler
healthcheck:
test: ["CMD-SHELL", 'airflow jobs check - job-type SchedulerJob - hostname "$${HOSTNAME}"']
interval: 10s
timeout: 10s
retries: 5
restart: always
depends_on:
<<: *airflow-common-depends-on
airflow-init:
condition: service_completed_successfully
airflow-triggerer:
<<: *airflow-common
command: triggerer
healthcheck:
test: ["CMD-SHELL", 'airflow jobs check - job-type TriggererJob - hostname "$${HOSTNAME}"']
interval: 10s
timeout: 10s
retries: 5
restart: always
depends_on:
<<: *airflow-common-depends-on
airflow-init:
condition: service_completed_successfully
airflow-init:
<<: *airflow-common
entrypoint: /bin/bash
# yamllint disable rule:line-length
command:
- -c
- |
function ver() {
printf "%04d%04d%04d%04d" $${1//./ }
}
airflow_version=$$(AIRFLOW__LOGGING__LOGGING_LEVEL=INFO && gosu airflow airflow version)
airflow_version_comparable=$$(ver $${airflow_version})
min_airflow_version=2.2.0
min_airflow_version_comparable=$$(ver $${min_airflow_version})
if (( airflow_version_comparable < min_airflow_version_comparable )); then
echo
echo -e "\033[1;31mERROR!!!: Too old Airflow version $${airflow_version}!\e[0m"
echo "The minimum Airflow version supported: $${min_airflow_version}. Only use this or higher!"
echo
exit 1
fi
if [[ -z "${AIRFLOW_UID}" ]]; then
echo
echo -e "\033[1;33mWARNING!!!: AIRFLOW_UID not set!\e[0m"
echo "If you are on Linux, you SHOULD follow the instructions below to set "
echo "AIRFLOW_UID environment variable, otherwise files will be owned by root."
echo "For other operating systems you can get rid of the warning with manually created .env file:"
echo " See: https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html#setting-the-right-airflow-user"
echo
fi
one_meg=1048576
mem_available=$$(($$(getconf _PHYS_PAGES) * $$(getconf PAGE_SIZE) / one_meg))
cpus_available=$$(grep -cE 'cpu[0–9]+' /proc/stat)
disk_available=$$(df / | tail -1 | awk '{print $$4}')
warning_resources="false"
if (( mem_available < 4000 )) ; then
echo
echo -e "\033[1;33mWARNING!!!: Not enough memory available for Docker.\e[0m"
echo "At least 4GB of memory required. You have $$(numfmt - to iec $$((mem_available * one_meg)))"
echo
warning_resources="true"
fi
if (( cpus_available < 2 )); then
echo
echo -e "\033[1;33mWARNING!!!: Not enough CPUS available for Docker.\e[0m"
echo "At least 2 CPUs recommended. You have $${cpus_available}"
echo
warning_resources="true"
fi
if (( disk_available < one_meg * 10 )); then
echo
echo -e "\033[1;33mWARNING!!!: Not enough Disk space available for Docker.\e[0m"
echo "At least 10 GBs recommended. You have $$(numfmt - to iec $$((disk_available * 1024 )))"
echo
warning_resources="true"
fi
if [[ $${warning_resources} == "true" ]]; then
echo
echo -e "\033[1;33mWARNING!!!: You have not enough resources to run Airflow (see above)!\e[0m"
echo "Please follow the instructions to increase amount of resources available:"
echo " https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html#before-you-begin"
echo
fi
mkdir -p /sources/logs /sources/dags /sources/plugins
chown -R "${AIRFLOW_UID}:0" /sources/{logs,dags,plugins}
exec /entrypoint airflow version
# yamllint enable rule:line-length
environment:
<<: *airflow-common-env
_AIRFLOW_DB_UPGRADE: 'true'
_AIRFLOW_WWW_USER_CREATE: 'true'
_AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow}
_AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow}
_PIP_ADDITIONAL_REQUIREMENTS: ''
user: "0:0"
volumes:
- .:/sources
airflow-cli:
<<: *airflow-common
profiles:
- debug
environment:
<<: *airflow-common-env
CONNECTION_CHECK_MAX_COUNT: "0"
# Workaround for entrypoint issue. See: https://github.com/apache/airflow/issues/16252
command:
- bash
- -c
- airflow
volumes:
postgres-db-volume:

Save and run:

docker-compose up -f docker-compose.yml up -d

Check if docker containers are running:

docker ps
docker running image

Run Airflow scheduler in the background:

airflow scheduler
airflow scheduler

Confirm that the web UI is running correctly by browsing on localhost:8080

Airflow UI

Since we will connect to PostgreSQL database and amazon S3 bucket we need to create a connection object, select admin on homepage and go to the connections from the drop down click on + icon and add PostgreSQL connection

To add PostgreSQL connection select PostgreSQL in connection type

PostgreSQL connection

To add S3 connection select Amazon Web Services from the drop down and add your AWS Access Key ID and AWS Secret Access Key

S3 connection

PIPELINE IMPLEMENTATION

In airflow, workflow is a sequence of tasks and is defined as DAG i.e directed acyclic graph

A simple DAG may look like this:

Sample dag

When a DAG run is triggered its tasks are going to be executed one after the other and each task will go through different stages from start to completion.

We can now create a python file twitter_dag.py under dags folder in our project:

from datetime import timedelta,datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago
from airflow.providers.postgres.operators.postgres import PostgresOperator
import psycopg2
import psycopg2.extras
from twitter_etl import run_twitter_etl, upload_to_s3, delete_from_s3



default_args = {
'owner': 'airflow',
'depends_on_past' : False,
'start_date' : datetime(2021,10,6),
'email' : ['tabu@example.com'],
'email_on_failure' : False,
'email_on_retry' : False,
'retries' : 1,
'retry_delay' : timedelta(minutes=10)
}


with DAG(
default_args=default_args,
dag_id='complete_twitter_etl',
description='My first etl code for airflow twitter',
start_date=datetime(2021, 10, 6),
schedule_interval='@daily',

) as dag:
run_etl = PythonOperator(
task_id='run_twitter_etl',
python_callable=run_twitter_etl

)

create_table_in_postgres = PostgresOperator(
task_id='create_postgres_table',
postgres_conn_id='postgres_localhost',
sql="""
create table if not exists twitter_table_data1 (
index TEXT,
tweets TEXT,
created_at date,
reply_count int,
retweet_count int,
like_count int,
quote_count int,
domain_names TEXT,
entity_names TEXT
)
"""
)


load_data_to_postgresql = BashOperator(
task_id="load_data_to_postgresql",
bash_command="python /opt/airflow/dags/load_data.py "
)

delete_data = PostgresOperator(
task_id='delete_data_from_table',
postgres_conn_id='postgres_localhost',
sql="""
delete from twitter_table_data1;
"""
)

task_delete_from_s3 = PythonOperator(
task_id='delete_from_s3',
python_callable=delete_from_s3,
op_kwargs={
'keys': 'elonmusk_tweets.csv',
'bucket_name': 'tabinda-airflow-bucket'
}
)

task_upload_to_s3 = PythonOperator(
task_id='upload_to_s3',
python_callable=upload_to_s3,
op_kwargs={
'filename': '/opt/airflow/dags/elonmusk_tweets.csv',
'key': 'elonmusk_tweets.csv',
'bucket_name': 'tabinda-airflow-bucket'
}
)

# load_to_s3
run_etl >> create_table_in_postgres >> delete_data >> load_data_to_postgresql >> task_delete_from_s3 >>task_upload_to_s3

All tasks are created from pre defined templates named operators, we used Bash Operator, Python Operator, Postgres Operator to execute some python code. The utility functions can be created in a separate file inside the dags folder where we can create functions like to run twitter ETL, upload files to S3 and store files to PostgreSQL. We now create a file twitter_etl.py .

import tweepy
import pandas as pd
import json
from datetime import datetime
from airflow.hooks.S3_hook import S3Hook
import s3fs
import os

def run_twitter_etl():
client = tweepy.Client(bearer_token='your-bearer-token-here')

# Replace with your own search query
query = 'from:elonmusk'
# query = 'from:BillGates'
# query = 'from:JamesMelville'


tweets = tweepy.Paginator(client.search_recent_tweets, query=query,
tweet_fields=['context_annotations', 'created_at','public_metrics'], max_results=100).flatten(limit=1000)

tweet_lis = []

lis = []
for tweet in tweets:
# print(tweet.text)
lis.append(tweet.text)
if len(tweet.context_annotations) > 0:
domain_name_lis = []
entity_name_lis = []

for dic in tweet.context_annotations:
for key,value in dic.items():
if key == 'domain':
try:
domain_names = value['name']
domain_name_lis.append(domain_names)
except KeyError as e:
pass
elif key == 'entity':
try:
entity_names = value['name']
entity_name_lis.append(entity_names)

except KeyError as e:
entity_description = None
pass
refined_tweets = {'tweets':tweet.text,'created_at': tweet.created_at ,'reply_count':tweet.public_metrics["reply_count"],
'retweet_count': tweet.public_metrics["retweet_count"],
'like_count': tweet.public_metrics["like_count"],
'quote_count' : tweet.public_metrics["quote_count"],
'domain_names' : domain_name_lis,
'entity_names' : entity_name_lis,}
tweet_lis.append(refined_tweets)


print(tweet_lis)


df = pd.DataFrame(tweet_lis)
df.to_csv("/opt/airflow/dags/elonmusk_tweets.csv")


def upload_to_s3(filename: str, key: str, bucket_name: str) -> None:
hook = S3Hook('aws-s3-conn')
hook.load_file(filename=filename, key=key, bucket_name=bucket_name)


def delete_from_s3(bucket_name: str,keys: str) -> None:
hook = S3Hook('aws-s3-conn')
hook.delete_objects(bucket=bucket_name, keys=keys)

However, I have used a Bash Operator to run a bash script to load data to PostgreSQL in order to implement more operators.

import psycopg2

conn = psycopg2.connect(
host="172.17.0.1",
port = "15433",
database="test",
user="user",
password="password")


cur = conn.cursor()


sql2 = '''copy public.twitter_table_data1(index,tweets,created_at,\
reply_count,retweet_count,like_count,quote_count,domain_names,entity_names)
FROM '/data/elonmusk_tweets.csv'
DELIMITER ','
CSV HEADER;'''
cur.execute(sql2)
sql3 = '''select * from twitter_table_data1;'''
cur.execute(sql3)
for i in cur.fetchall():
print(i)

conn.commit()

conn.close()
print('Connection closed')

The project folder structure looks as follows:

Project Structure

When we open our browser at port localhost:8080 our dags appear to be visible

Browser

Now we open dbeaver-ce to check if all the tweets have been inserted successfully

PostgreSQL output

Similarly we can check if the batch data is saved to S3 after fetching

S3 bucket

CONCLUSION

Apache-airflow is a popular open-source orchestrator. We can install it through airflow and configure it to run in our local environment. We also implemented the end-to-end operational twitter data pipeline, additional connections to postgresql and amazon s3 was included

REFERENCES

[1] https://airflow.apache.org/

[2]https://developer.twitter.com/en/docs/twitter-api

[3] https://airflow.apache.org/docs/apache-airflow/stable/concepts/dags.html

[4] https://docs.docker.com/compose/install/

[5] https://airflow.apache.org/docs/docker-stack/build.html

[6] https://airflow.apache.org/docs/apache-airflow/stable/best-practices.html

--

--

Tabinda Bhat
TrialX Inc.

Data science and Machine learning enthusiast. Passionate Computer Science Engineer.