Data Engineering End-to-End Project — Part 1 — Airflow, Kafka, Cassandra, MongoDB, Docker, EmailOperator, SlackWebhookOperator

Dogukan Ulu · Published in Apache Airflow · 10 min read · Sep 29, 2023

Tech Stack

  • Apache Airflow
  • Apache Kafka
  • Cassandra
  • MongoDB
  • Docker
  • Apache Zookeeper
  • EmailOperator
  • SlackWebhookOperator

Overview

In this article, we are going to create a data pipeline orchestrated end to end by Airflow. We will first create a Kafka topic if it does not exist. After creating it, we will produce messages containing an e-mail address and an OTP (one-time password) as records. This part illustrates streaming data arriving at the Kafka topic.

While the data is being produced into the Kafka topic, we are going to consume it as well. We will read the data from the Kafka topic and insert it into both a Cassandra table and a MongoDB collection. We will then check whether the expected data exists in both stores. If so, we will send an e-mail to the incoming e-mail address and a Slack message that includes the e-mail address and the OTP.

We can think of this project as a real-life e-mail validation flow. Let's say streaming records containing e-mail and OTP data arrive at the Kafka topic. We will illustrate that part with the Kafka producer. The Kafka consumer and data-check parts will then detect whether the e-mail and OTP already exist.

Services as Docker Containers

The first thing we have to do is run all the services as Docker containers. For this, we will use the official Airflow image. We first have to install all the necessary libraries and packages into the Airflow container. For that, we create a Dockerfile.

# Use the Apache Airflow 2.7.1 image as the base image
FROM apache/airflow:2.7.1

# Switch to the "airflow" user
USER airflow

# Install pip
RUN curl -O 'https://bootstrap.pypa.io/get-pip.py' && \
    python3 get-pip.py --user

# Install libraries from requirements.txt
COPY requirements.txt /requirements.txt
RUN pip install --user -r /requirements.txt

This Dockerfile first installs the pip command. Then it installs all the libraries listed in the requirements.txt file so that we won't get import errors later on. We will use this Dockerfile to build a container named install-requirements.
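The exact contents of requirements.txt live in the project repository; a minimal version covering the scripts in this article might look like the following (the package list is my assumption based on the tech stack, pin versions as needed):

confluent-kafka
cassandra-driver
pymongo
apache-airflow-providers-slack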

After creating the Dockerfile, we can follow the instructions in the official Airflow Docker Compose documentation. After completing all the instructions, we will have a docker-compose.yaml file locally.
After obtaining the docker-compose file, we are going to modify it before starting the services. The first thing to do is add the following container under the services section.

install-requirements:
  <<: *airflow-common
  container_name: install-requirements
  build:
    context: .
  volumes:
    - ./requirements.txt:/requirements.txt
  depends_on:
    - postgres
    - redis
  networks:
    - cassandra-kafka

This will be the container that installs all the necessary dependencies inside the Airflow image. After adding it under the services section, we should add the parameters below under the environment block of the x-airflow-common section.

# This sets the DAG directory scan interval to 30 seconds instead of the default 5 minutes.
AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
# The settings below let us use EmailOperator properly. I will explain them in the second part.
AIRFLOW__SMTP__SMTP_HOST: 'smtp.gmail.com'
AIRFLOW__SMTP__SMTP_MAIL_FROM: 'sample_email@my_email.com'
AIRFLOW__SMTP__SMTP_USER: 'sample_email@my_email.com'
AIRFLOW__SMTP__SMTP_PASSWORD: 'your_password'
AIRFLOW__SMTP__SMTP_PORT: '587'

We can now add the additional services to our docker-compose.yaml file. Having a single docker-compose file makes things easier: we can start all the services with a single command.

zoo1:
  image: confluentinc/cp-zookeeper:7.3.2
  container_name: zookeeper
  ports:
    - "2181:2181"
  environment:
    ZOOKEEPER_CLIENT_PORT: 2181
    ZOOKEEPER_SERVER_ID: 1
    ZOOKEEPER_SERVERS: zoo1:2888:3888
  networks:
    - kafka-network
    - cassandra-kafka

kafka1:
  image: confluentinc/cp-kafka:7.3.2
  container_name: kafka1
  ports:
    - "9092:9092"
    - "29092:29092"
  environment:
    KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka1:19092,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092,DOCKER://host.docker.internal:29092
    KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT
    KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
    KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
    KAFKA_BROKER_ID: 1
    KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
    KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer
    KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true"
  networks:
    - kafka-network
    - cassandra-kafka

kafka2:
  image: confluentinc/cp-kafka:7.3.2
  container_name: kafka2
  ports:
    - "9093:9093"
    - "29093:29093"
  environment:
    KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka2:19093,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9093,DOCKER://host.docker.internal:29093
    KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT
    KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
    KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
    KAFKA_BROKER_ID: 2
    KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
    KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer
    KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true"
  networks:
    - kafka-network
    - cassandra-kafka

kafka3:
  image: confluentinc/cp-kafka:7.3.2
  container_name: kafka3
  ports:
    - "9094:9094"
    - "29094:29094"
  environment:
    KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka3:19094,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9094,DOCKER://host.docker.internal:29094
    KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT
    KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
    KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
    KAFKA_BROKER_ID: 3
    KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
    KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer
    KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true"
  networks:
    - kafka-network
    - cassandra-kafka

kafka-connect:
  image: confluentinc/cp-kafka-connect:7.3.2
  container_name: kafka-connect
  ports:
    - "8083:8083"
  environment:
    CONNECT_BOOTSTRAP_SERVERS: kafka1:19092,kafka2:19093,kafka3:19094
    CONNECT_REST_PORT: 8083
    CONNECT_GROUP_ID: compose-connect-group
    CONNECT_CONFIG_STORAGE_TOPIC: compose-connect-configs
    CONNECT_OFFSET_STORAGE_TOPIC: compose-connect-offsets
    CONNECT_STATUS_STORAGE_TOPIC: compose-connect-status
    CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
    CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.storage.StringConverter
    CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
    CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
    CONNECT_REST_ADVERTISED_HOST_NAME: 'kafka-connect'
    CONNECT_LOG4J_ROOT_LOGLEVEL: 'INFO'
    CONNECT_LOG4J_LOGGERS: 'org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR'
    CONNECT_PLUGIN_PATH: '/usr/share/java,/usr/share/confluent-hub-components'
  networks:
    - kafka-network
    - cassandra-kafka

schema-registry:
  image: confluentinc/cp-schema-registry:7.3.2
  container_name: schema-registry
  ports:
    - "8081:8081"
  environment:
    SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka1:19092,kafka2:19093,kafka3:19094
    SCHEMA_REGISTRY_HOST_NAME: schema-registry
    SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
  networks:
    - kafka-network
    - cassandra-kafka

kafka-ui:
  container_name: kafka-ui
  image: provectuslabs/kafka-ui:latest
  ports:
    - 8888:8080
  depends_on:
    - kafka1
    - kafka2
    - kafka3
    - schema-registry
    - kafka-connect
  environment:
    KAFKA_CLUSTERS_0_NAME: local
    KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: PLAINTEXT://kafka1:19092,PLAINTEXT_HOST://kafka1:19092
    KAFKA_CLUSTERS_0_SCHEMAREGISTRY: http://schema-registry:8081
    KAFKA_CLUSTERS_0_KAFKACONNECT_0_NAME: connect
    KAFKA_CLUSTERS_0_KAFKACONNECT_0_ADDRESS: http://kafka-connect:8083
    DYNAMIC_CONFIG_ENABLED: 'true'
  networks:
    - kafka-network
    - cassandra-kafka

cassandra:
  image: cassandra:latest
  container_name: cassandra
  hostname: cassandra
  ports:
    - 9042:9042
  environment:
    - MAX_HEAP_SIZE=512M
    - HEAP_NEWSIZE=100M
    - CASSANDRA_USERNAME=cassandra
    - CASSANDRA_PASSWORD=cassandra
  volumes:
    - ./:/home
    - cassandra-data:/var/lib/cassandra
  networks:
    - cassandra-kafka

mongo:
  image: mongo
  container_name: mongo
  restart: always
  environment:
    MONGO_INITDB_ROOT_USERNAME: root
    MONGO_INITDB_ROOT_PASSWORD: root
  networks:
    - cassandra-kafka

mongo-express:
  image: mongo-express
  container_name: mongo-express
  restart: always
  ports:
    - 8082:8081
  environment:
    ME_CONFIG_MONGODB_ADMINUSERNAME: root
    ME_CONFIG_MONGODB_ADMINPASSWORD: root
    ME_CONFIG_MONGODB_URL: mongodb://root:root@mongo:27017/
  networks:
    - cassandra-kafka

volumes:
  cassandra-data:
  postgres-db-volume:

networks:
  kafka-network:
    driver: bridge
  cassandra-kafka:
    external: true

The additional services include:

  • Kafka
  • Zookeeper
  • MongoDB
  • Cassandra
  • Kafka UI
  • Mongo Express

Please don't forget to attach the external network cassandra-kafka to all the services. If it doesn't exist yet, you can create it with:

docker network create cassandra-kafka

Once we add the new services and parameters to the default Airflow docker-compose, we can run our containers.

docker compose up airflow-init

This command initializes the Airflow metadata database first. Then, we run the command below to start all the services.

docker compose up -d --build

This command builds the install-requirements image from the Dockerfile and starts all the other services. If we followed the instructions above, we will have a directory called dags. We should place all our scripts under the dags directory, including the DAG script itself. All the scripts that I explain in this article will be used as Airflow DAG tasks, as sketched below.
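To give a rough idea of how these scripts plug into Airflow before we get to part two, each script's main function can simply be wrapped with a PythonOperator. This is only a sketch: the module names, task ids, and task ordering below are placeholders of my own, not necessarily the ones used in the project.

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

# Hypothetical module names; the actual scripts live under the dags/ directory.
from kafka_producer import kafka_producer_main
from kafka_consumer_cassandra import kafka_consumer_cassandra_main

with DAG(
    dag_id="email_otp_pipeline_sketch",
    start_date=datetime(2023, 9, 1),
    schedule=None,
    catchup=False,
) as dag:
    produce = PythonOperator(
        task_id="kafka_producer",
        python_callable=kafka_producer_main,
    )
    consume_cassandra = PythonOperator(
        task_id="kafka_consumer_cassandra",
        python_callable=kafka_consumer_cassandra_main,
    )

    # Illustrative ordering only; the real DAG is covered in the second part.
    produce >> consume_cassandra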

Create Kafka Topic

The first thing we have to do is create a new Kafka topic. If the topic already exists, the script reports that instead of creating it again.

from confluent_kafka.admin import AdminClient, NewTopic
import logging

logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

admin_config = {
    'bootstrap.servers': 'kafka1:19092,kafka2:19093,kafka3:19094',
    'client.id': 'kafka_admin_client'
}

admin_client = AdminClient(admin_config)

def kafka_create_topic_main():
    """Checks if the topic email_topic exists or not. If not, create the topic."""
    topic_name = 'email_topic'

    existing_topics = admin_client.list_topics().topics
    if topic_name in existing_topics:
        return "Exists"

    # Create a new topic
    new_topic = NewTopic(topic_name, num_partitions=1, replication_factor=3)
    admin_client.create_topics([new_topic])
    return "Created"


if __name__ == "__main__":
    result = kafka_create_topic_main()
    logger.info(result)

We have used the bootstrap servers defined in the docker-compose file. We can set client.id to anything we like. The script returns "Exists" if the topic already exists and "Created" if it has just been created. We will use this return value with a BranchPythonOperator in the second part. We can set the replication factor to 3 since we run 3 Kafka brokers as containers.

We are going to create two DummyOperators that branch on the result of this task when we build the Airflow DAG; a rough sketch follows below. Our topic's name will be email_topic. We can check whether the topic exists via Kafka UI (http://localhost:8888).
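As a preview of that branching logic (the actual DAG comes in part two), here is one possible shape it could take; the task ids are placeholders I chose for illustration:

from airflow.operators.dummy import DummyOperator  # EmptyOperator in newer Airflow versions
from airflow.operators.python import BranchPythonOperator

def choose_branch():
    # kafka_create_topic_main comes from the topic-creation script above
    # and returns either "Created" or "Exists".
    result = kafka_create_topic_main()
    return "topic_created" if result == "Created" else "topic_already_exists"

create_topic_branch = BranchPythonOperator(
    task_id="create_kafka_topic",
    python_callable=choose_branch,
)

topic_created = DummyOperator(task_id="topic_created")
topic_already_exists = DummyOperator(task_id="topic_already_exists")

create_topic_branch >> [topic_created, topic_already_exists]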

Kafka Producer

For this project, we want to illustrate streaming data coming to our Kafka topic. That’s why we have to create a Kafka producer as well.

import logging
from confluent_kafka import Producer
import time

# Configure the logger
logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s')
logger = logging.getLogger(__name__)

class KafkaProducerWrapper:
    def __init__(self, bootstrap_servers):
        """
        Initializes the Kafka producer with the given bootstrap servers.
        """
        self.producer_config = {
            'bootstrap.servers': bootstrap_servers
        }
        self.producer = Producer(self.producer_config)

    def produce_message(self, topic, key, value):
        """
        Produces a message to the specified Kafka topic with the given key and value.
        """
        self.producer.produce(topic, key=key, value=value)
        self.producer.flush()

def kafka_producer_main():
    bootstrap_servers = 'kafka1:19092,kafka2:19093,kafka3:19094'
    kafka_producer = KafkaProducerWrapper(bootstrap_servers)

    topic = "email_topic"
    key = "sample_email@my_email.com"
    value = "1234567"

    start_time = time.time()

    try:
        while True:
            kafka_producer.produce_message(topic, key, value)
            logger.info("Produced message")

            elapsed_time = time.time() - start_time
            if elapsed_time >= 20:  # Stop after 20 seconds
                break

            time.sleep(5)  # Sleep for 5 seconds between producing messages
    except KeyboardInterrupt:
        logger.info("Received KeyboardInterrupt. Stopping producer.")
    finally:
        kafka_producer.producer.flush()
        logger.info("Producer flushed.")

if __name__ == "__main__":
    kafka_producer_main()

This script produces messages to email_topic with sample_email@my_email.com as the key and 1234567 as the value. The value is the one-time password and the key is the e-mail address arriving at the Kafka topic. The producer runs for 20 seconds; we can adjust this time period to suit our use case.

We can also check manually via Kafka UI whether the data has been produced to email_topic.
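Besides Kafka UI, confluent-kafka also lets us confirm delivery programmatically by passing a delivery callback when producing. A small sketch (the callback name is my own, not part of the original script):

def delivery_report(err, msg):
    """Called once per produced message to report delivery success or failure."""
    if err is not None:
        logger.error(f"Delivery failed for key {msg.key()}: {err}")
    else:
        logger.info(f"Delivered to {msg.topic()} [partition {msg.partition()}] at offset {msg.offset()}")

# Inside produce_message, the callback can be passed along with the message:
# self.producer.produce(topic, key=key, value=value, on_delivery=delivery_report)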

Kafka Consumer for Cassandra

Up to this point, we have created a Kafka topic and produced messages to email_topic. From now on, we have to consume the messages coming to email_topic. This happens in two parts: the first is for Cassandra and the second is for MongoDB. In this section, I will explain the one for Cassandra.
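The imports are omitted in the snippets that follow; based on the classes and functions used, they roughly correspond to the following (the logger format is an assumption on my part):

import logging
import time

from cassandra.cluster import Cluster
from confluent_kafka import Consumer, KafkaError

logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s')
logger = logging.getLogger(__name__)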

After importing all the necessary libraries, we have to connect to Cassandra and execute the necessary commands.

class CassandraConnector:
    def __init__(self, contact_points):
        self.cluster = Cluster(contact_points)
        self.session = self.cluster.connect()
        self.create_keyspace()
        self.create_table()

    def create_keyspace(self):
        self.session.execute("""
            CREATE KEYSPACE IF NOT EXISTS email_namespace
            WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1}
        """)

    def create_table(self):
        self.session.execute("""
            CREATE TABLE IF NOT EXISTS email_namespace.email_table (
                email text PRIMARY KEY,
                otp text
            )
        """)

    def insert_data(self, email, otp):
        self.session.execute("""
            INSERT INTO email_namespace.email_table (email, otp)
            VALUES (%s, %s)
        """, (email, otp))

    def shutdown(self):
        self.cluster.shutdown()

This class connects to the Cassandra server first. Then it creates a keyspace named email_namespace and a table named email_table. The messages consumed from the Kafka topic will then be inserted into this newly created table.

def fetch_and_insert_messages(kafka_config, cassandra_connector, topic, run_duration_secs):
    consumer = Consumer(kafka_config)
    consumer.subscribe([topic])
    start_time = time.time()
    try:
        while True:
            elapsed_time = time.time() - start_time
            if elapsed_time >= run_duration_secs:
                break

            msg = consumer.poll(1.0)
            if msg is None:
                continue
            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    logger.info('Reached end of partition')
                else:
                    logger.error('Error: {}'.format(msg.error()))
            else:
                email = msg.key().decode('utf-8')
                otp = msg.value().decode('utf-8')
                query = "SELECT email FROM email_namespace.email_table WHERE email = %s"
                existing_email = cassandra_connector.session.execute(query, (email,)).one()
                if existing_email:
                    logger.warning(f'Skipped existing email: Email={email}')
                else:
                    cassandra_connector.insert_data(email, otp)
                    logger.info(f'Received and inserted: Email={email}, OTP={otp}')

    except KeyboardInterrupt:
        logger.info("Received KeyboardInterrupt. Closing consumer.")
    finally:
        consumer.close()

The above function consumes all incoming messages for a predefined time period (30 seconds in this case) and populates the corresponding Cassandra table with them. If a record already exists in the table, it skips it and notes that in the task logs in Airflow.

def kafka_consumer_cassandra_main():
    cassandra_connector = CassandraConnector(['cassandra'])

    cassandra_connector.create_keyspace()
    cassandra_connector.create_table()

    kafka_config = {
        'bootstrap.servers': 'kafka1:19092,kafka2:19093,kafka3:19094',
        'group.id': 'cassandra_consumer_group',
        'auto.offset.reset': 'earliest'
    }
    topic = 'email_topic'
    run_duration_secs = 30

    fetch_and_insert_messages(kafka_config, cassandra_connector, topic, run_duration_secs)

    cassandra_connector.shutdown()

We will use this function for our Airflow task. It basically combines everything we have created so far: it connects to the Cassandra server, creates the keyspace and table, consumes the messages coming to email_topic, and inserts the ones that don't already exist into the Cassandra table.

We can manually check the data's existence by running the following commands in order.

docker exec -it cassandra /bin/bash
cqlsh -u cassandra -p cassandra
select * from email_namespace.email_table;

Kafka Consumer for MongoDB

In this section, I will explain how to connect to MongoDB and insert the incoming messages into the corresponding collection. After importing all the necessary libraries, we have to connect to MongoDB and execute the necessary commands.
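As with the Cassandra script, the imports and a module-level connector instance are omitted in the snippets below; they roughly correspond to the following (the database and collection names are placeholders, not necessarily the ones used in the project):

import logging
import time

from confluent_kafka import Consumer, KafkaError
from pymongo import MongoClient

logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s')
logger = logging.getLogger(__name__)

# The snippets below also rely on a module-level connector instance, created
# after the MongoDBConnector class definition. The URI matches the mongo
# service in docker-compose; the database and collection names are placeholders:
# mongodb_connector = MongoDBConnector('mongodb://root:root@mongo:27017/',
#                                      'email_database', 'email_collection')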

class MongoDBConnector:
    def __init__(self, mongodb_uri, database_name, collection_name):
        self.client = MongoClient(mongodb_uri)
        self.db = self.client[database_name]
        self.collection_name = collection_name

    def create_collection(self):
        # Check if the collection already exists
        if self.collection_name not in self.db.list_collection_names():
            self.db.create_collection(self.collection_name)
            logger.info(f"Created collection: {self.collection_name}")
        else:
            logger.warning(f"Collection {self.collection_name} already exists")

    def insert_data(self, email, otp):
        document = {
            "email": email,
            "otp": otp
        }
        self.db[self.collection_name].insert_one(document)

    def close(self):
        self.client.close()

We don't need to explicitly create a new database since MongoDB creates it on the fly when data is first written (this part is different from Cassandra).

class KafkaConsumerWrapperMongoDB:
    def __init__(self, kafka_config, topics):
        self.consumer = Consumer(kafka_config)
        self.consumer.subscribe(topics)

    def consume_and_insert_messages(self):
        start_time = time.time()
        try:
            while True:
                elapsed_time = time.time() - start_time
                if elapsed_time >= 30:
                    break
                msg = self.consumer.poll(1.0)
                if msg is None:
                    continue
                if msg.error():
                    if msg.error().code() == KafkaError._PARTITION_EOF:
                        logger.info('Reached end of partition')
                    else:
                        logger.warning('Error: {}'.format(msg.error()))
                else:
                    email = msg.key().decode('utf-8')
                    otp = msg.value().decode('utf-8')
                    # Look up the document through the module-level MongoDB connector
                    existing_document = mongodb_connector.db[mongodb_connector.collection_name].find_one({"email": email, "otp": otp})
                    if existing_document:
                        logger.warning(f"Document with Email={email}, OTP={otp} already exists in the collection.")
                    else:
                        mongodb_connector.insert_data(email, otp)
                        logger.info(f'Received and inserted: Email={email}, OTP={otp}')
        except KeyboardInterrupt:
            logger.info("Received KeyboardInterrupt. Closing consumer.")
        finally:
            mongodb_connector.close()

    def close(self):
        self.consumer.close()

The above class consumes all incoming messages for a predefined time period (30 seconds in this case) and populates the corresponding MongoDB collection with them. If a record already exists in the collection, it skips it.

def kafka_consumer_mongodb_main():
    # Uses the module-level mongodb_connector instance described above
    mongodb_connector.create_collection()

    kafka_config = {
        'bootstrap.servers': 'kafka1:19092,kafka2:19093,kafka3:19094',
        'group.id': 'consumer_group',
        'auto.offset.reset': 'earliest'
    }
    topics = ['email_topic']

    kafka_consumer = KafkaConsumerWrapperMongoDB(kafka_config, topics)
    kafka_consumer.consume_and_insert_messages()

We will use this function for our Airflow task. It combines the classes and methods we have created so far: it connects to the MongoDB server, creates the collection, consumes the messages coming to email_topic, and inserts the ones that don't already exist into the MongoDB collection.
We can check the data's existence manually via Mongo Express (http://localhost:8082).

In this part of the article, we have illustrated streaming data with a Kafka producer and consumed the messages coming to email_topic for both Cassandra and MongoDB. In the second part, we will check data correctness for both, then send an e-mail and a Slack message. At the end, I will explain the whole Airflow DAG.

Thanks for reading, hope it helps :)

Please reach out via LinkedIn and GitHub, all comments are appreciated 🕺
