Data Engineering End-to-End Project — Part 2— Airflow, Kafka, Cassandra, MongoDB, Docker, EmailOperator, SlackWebhookOperator

Dogukan Ulu · Published in Apache Airflow · Oct 12, 2023

In this part of the project, we will check whether the correct data exists in the Cassandra table and the MongoDB collection. If it does, we will send an e-mail containing the OTP to the address we retrieved, and post a Slack message that includes both the e-mail address and the OTP in its body. The Slack message can be used to inform people in a Slack channel, while the e-mail notifies the related address directly.

In the end, we will create the whole Airflow DAG that includes all the necessary tasks.

Check Cassandra Data

In this section, we will check whether a specific e-mail address exists in the Cassandra table.

import logging

from cassandra.cluster import Cluster

logger = logging.getLogger(__name__)


class CassandraConnector:
    def __init__(self, contact_points):
        self.cluster = Cluster(contact_points)
        self.session = self.cluster.connect()

    def select_data(self, email):
        query = "SELECT * FROM email_namespace.email_table WHERE email = %s"
        result = self.session.execute(query, (email,))

        data_dict = {}

        for row in result:
            data_dict['email'] = row.email
            data_dict['otp'] = row.otp
            logger.info(f"Email: {row.email}, OTP: {row.otp}")

        # Return empty strings when no row was found
        if len(data_dict) == 0:
            data_dict['email'] = ''
            data_dict['otp'] = ''

        return data_dict

    def close(self):
        self.cluster.shutdown()

We connect to Cassandra and query the corresponding table. If the related data exists, we populate a dictionary with it; if not, we return a dictionary with empty strings. Returning a dictionary even when nothing is found matters because the EmailOperator task will fail when the e-mail field is empty, which is exactly how we detect that the correct data does not exist. We will pass the result of this script to the EmailOperator.

We will use the below function as our Airflow task. We will define the specific e-mail address and check the data’s existence.

def check_cassandra_main():
    cassandra_connector = CassandraConnector(['cassandra'])

    sample_email = 'sample_email@my_email.com'

    data_dict = cassandra_connector.select_data(sample_email)

    cassandra_connector.close()

    logger.info(f"Data found for email: {data_dict['email']}")
    logger.info(f"OTP: {data_dict['otp']}")

    # Return the dictionary so the DAG can read the e-mail and OTP values
    return data_dict

Check MongoDB Data

In this section, we will check whether the same e-mail address exists in the MongoDB collection.

import logging

from pymongo import MongoClient

logger = logging.getLogger(__name__)


def check_mongodb_main():
    mongodb_uri = 'mongodb://root:root@mongo:27017/'
    database_name = 'email_database'
    collection_name = 'email_collection'

    client = MongoClient(mongodb_uri)
    db = client[database_name]
    collection = db[collection_name]

    sample_email = 'sample_email@my_email.com'

    result = collection.find_one({'email': sample_email})

    data_dict = {}

    if result:
        logger.info(f"Data found for email: {result['email']}")
        logger.info(f"OTP: {result['otp']}")

        data_dict['email'] = result.get('email')
        data_dict['otp'] = result.get('otp')
    else:
        # Return empty strings when no document was found
        data_dict['email'] = ''
        data_dict['otp'] = ''

    client.close()

    return data_dict

We connect to MongoDB and query the corresponding collection. If the related data exists, we populate a dictionary with it; if not, we return a dictionary with empty strings. As with Cassandra, the EmailOperator task will fail when the e-mail field is empty, which is how we detect missing data, and the result of this script will be passed to the EmailOperator.

We will use the above function as our Airflow task. We will define the specific e-mail address and check the data’s existence.

The mongodb_uri value depends on how the MongoDB container is configured in the docker-compose file: here it uses the root/root credentials, the service name mongo as the host, and the default port 27017.
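
If you want to confirm that this URI can actually reach the container before running the DAG, a quick connectivity check with pymongo might look like the sketch below. This is not part of the project code; run it from a container attached to the same Docker network, since the mongo hostname only resolves there.

from pymongo import MongoClient
from pymongo.errors import ServerSelectionTimeoutError

# Same URI as in check_mongodb_main; a short timeout so failures surface quickly
client = MongoClient('mongodb://root:root@mongo:27017/', serverSelectionTimeoutMS=5000)

try:
    client.admin.command('ping')  # raises ServerSelectionTimeoutError if MongoDB is unreachable
    print("MongoDB is reachable")
except ServerSelectionTimeoutError as err:
    print(f"Could not reach MongoDB: {err}")
finally:
    client.close()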

EmailOperator

In this part, I will explain how to prepare for creating the task with the EmailOperator. I won't split it into separate Cassandra and MongoDB parts since the preparation is identical for both.

We are going to use the Gmail SMTP server to send an e-mail. Remember that we have added some parameters to our docker-compose file during the first part of the article. You can see them below:

AIRFLOW__SMTP__SMTP_HOST: 'smtp.gmail.com'
AIRFLOW__SMTP__SMTP_MAIL_FROM: 'sample_email@my_email.com'
AIRFLOW__SMTP__SMTP_USER: 'sample_email@my_email.com'
AIRFLOW__SMTP__SMTP_PASSWORD: 'your_password'
AIRFLOW__SMTP__SMTP_PORT: '587'

The first thing we have to do is replace sample_email@my_email.com with the e-mail address we want to send the e-mails from. For the password, we have to create a new Gmail app password that allows Airflow to send e-mails on our behalf. For that, we have to go to this link and generate the password. Once we have it, we can replace your_password with the value we obtained.

Once we get all of them ready, we can then run our docker-compose file. This will allow us to send e-mails in the end. I will explain how to create the EmailOperator task in the Airflow DAG section below.
Once the DAG runs successfully, we will receive an e-mail like the one below.
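
If you want to verify the SMTP settings before wiring the EmailOperator into the DAG, one option (not part of the original setup) is Airflow's built-in send_email helper, run from a Python shell inside the scheduler or webserver container. The recipient below is just a placeholder.

from airflow.utils.email import send_email

# Uses the AIRFLOW__SMTP__* values defined in the docker-compose file
send_email(
    to='sample_email@my_email.com',  # placeholder recipient
    subject='SMTP configuration test',
    html_content='<p>If you can read this, the Airflow SMTP configuration works.</p>'
)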

SlackWebhookOperator

For this project, we want to send a Slack message that contains e-mail and OTP information in the message body. I will explain how to send that message in this part without separating it into two parts for Cassandra and MongoDB since they are pretty similar to each other.

First of all, we have to go to this link to create a Slack Webhook token.

  • We should first click on the “Create App” button
  • It will redirect us to another page. On that page, we should log in to the account that we want to send the message to.
  • Once we are logged in, we should create a dedicated workspace.
  • Inside the workspace, we should also create a dedicated channel so that it will help us see the incoming messages better.
  • While creating the webhook, we should create a bot to send the messages to the dedicated channel. I named my bot Airflow Slack Webhook.
  • Once we create the token, it should look like this: https://hooks.slack.com/services/T… (we can quickly test it, as shown right after this list).
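
Before touching Airflow at all, the raw webhook URL can be sanity-checked with a few lines of Python. This is an optional extra; the URL below is a placeholder and the requests package needs to be installed.

import requests

# Placeholder: paste the full webhook URL generated for your workspace
webhook_url = "https://hooks.slack.com/services/T.../B.../XXX"

response = requests.post(webhook_url, json={"text": "Webhook test from Python"})
print(response.status_code, response.text)  # Slack answers 200 with body "ok" when the message is accepted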

To be able to send messages to Slack, we have to go to the Airflow UI and log in with username airflow and password airflow. From the Admin -> Connections section, we have to create a Slack connection with the following parameters:

  • Connection ID: We can name our connection however we want.
  • Connection Type: Slack Incoming Webhook
  • Slack Webhook Endpoint: https://hooks.slack.com/services/
  • Webhook Token: This will come from the webhook token we obtained recently. We should take the part that comes after the endpoint part. It generally starts with T.

After defining all of these, we can save the connection. From then on, SlackWebhookOperator can retrieve the token from this connection and use it to send messages to the dedicated workspace and channel.
Once the DAG runs successfully, we will receive a Slack message like the one below.
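
If you want to test the Airflow connection itself before the DAG runs, the operator can also be executed ad hoc from a Python shell inside the Airflow container. This sketch assumes the slack_webhook connection ID and the #data-engineering channel used later in the DAG.

from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator

# Executing the operator directly sends a one-off message through the stored connection
slack_test = SlackWebhookOperator(
    task_id='slack_connection_test',
    slack_webhook_conn_id='slack_webhook',
    message=':white_check_mark: Slack webhook connection test',
    channel='#data-engineering',
    username='airflow',
)
slack_test.execute(context={})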

Airflow DAG

Here comes the last part of the project. We are going to combine all the parts we have created so far. First of all, we have to import the necessary operators. These include:

  • DummyOperator
  • PythonOperator
  • BranchPythonOperator
  • EmailOperator
  • SlackWebhookOperator

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.email import EmailOperator
from airflow.operators.python import BranchPythonOperator, PythonOperator
from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator

As I mentioned earlier, we should locate all our scripts under the dags directory so that we can directly import the necessary methods as below.

from check_mongodb import check_mongodb_main
from kafka_producer import kafka_producer_main
from check_cassandra import check_cassandra_main
from kafka_create_topic import kafka_create_topic_main
from kafka_consumer_mongodb import kafka_consumer_mongodb_main
from kafka_consumer_cassandra import kafka_consumer_cassandra_main

After importing everything necessary, we can define the default arguments for the DAG as below.

start_date = datetime(2022, 10, 19, 12, 20)

default_args = {
    'owner': 'airflow',
    'start_date': start_date,
    'retries': 1,
    'retry_delay': timedelta(seconds=5)
}

Once the default arguments are ready, we can define the e-mail and OTP values coming from the data check scripts. These will be used by the EmailOperator and SlackWebhookOperator tasks.

We should also create a new function, decide_branch. The BranchPythonOperator will call it and follow one of two paths depending on the task ID it returns; we will connect this branch to two separate dummy operators.

email_cassandra = check_cassandra_main()['email']
otp_cassandra = check_cassandra_main()['otp']
email_mongodb = check_mongodb_main()['email']
otp_mongodb = check_mongodb_main()['otp']


def decide_branch():
    create_topic = kafka_create_topic_main()
    if create_topic == "Created":
        return "topic_created"
    else:
        return "topic_already_exists"

We can now create the main DAG. We will have 3 main parts for this DAG.
The first part will include:

  • Creating the new Kafka topic
  • Two dummy operators, one for each possible result of the topic-creation task (topic created vs. topic already exists).
  • Regardless of which branch ran, we will start producing messages to the Kafka topic (this requires a trigger rule such as one_success on the producer task, since the branch skips one of the dummy tasks).

The second part will include:

  • We will consume messages coming to the Kafka topic and insert them into the Cassandra table.
  • After inserting them, we will check the data in the table and create a dictionary accordingly. If the data exists in the table, the dictionary will contain the information. If not, it will be empty.
  • Depending on the result of the data check task, we will send an e-mail to the dedicated e-mail address.
  • After sending the e-mail, we will send the Slack message to the dedicated channel with the information coming from the Cassandra table.

The third part will include:

  • We will consume messages coming to the Kafka topic and insert them into the MongoDB collection.
  • After inserting them, we will check the data in the collection and create a dictionary accordingly. If the data exists in the collection, the dictionary will contain the information. If not, it will be empty.
  • Depending on the result of the data check task, we will send an e-mail to the dedicated e-mail address.
  • After sending the e-mail, we will send the Slack message to the dedicated channel with the information coming from the MongoDB collection.

All three parts will run in parallel with each other.

with DAG('airflow_kafka_cassandra_mongodb', default_args=default_args, schedule_interval='@daily', catchup=False) as dag:

    create_new_topic = BranchPythonOperator(task_id='create_new_topic', python_callable=decide_branch)

    kafka_consumer_cassandra = PythonOperator(task_id='kafka_consumer_cassandra', python_callable=kafka_consumer_cassandra_main,
                                              retries=2, retry_delay=timedelta(seconds=10),
                                              execution_timeout=timedelta(seconds=45))

    kafka_consumer_mongodb = PythonOperator(task_id='kafka_consumer_mongodb', python_callable=kafka_consumer_mongodb_main,
                                            retries=2, retry_delay=timedelta(seconds=10),
                                            execution_timeout=timedelta(seconds=45))

    # trigger_rule='one_success' lets the producer run after whichever dummy task the branch
    # selected, since the non-selected one will be skipped
    kafka_producer = PythonOperator(task_id='kafka_producer', python_callable=kafka_producer_main,
                                    trigger_rule='one_success',
                                    retries=2, retry_delay=timedelta(seconds=10),
                                    execution_timeout=timedelta(seconds=45))

    check_cassandra = PythonOperator(task_id='check_cassandra', python_callable=check_cassandra_main,
                                     retries=2, retry_delay=timedelta(seconds=10),
                                     execution_timeout=timedelta(seconds=45))

    check_mongodb = PythonOperator(task_id='check_mongodb', python_callable=check_mongodb_main,
                                   retries=2, retry_delay=timedelta(seconds=10),
                                   execution_timeout=timedelta(seconds=45))

    topic_created = DummyOperator(task_id="topic_created")

    topic_already_exists = DummyOperator(task_id="topic_already_exists")

    send_email_cassandra = EmailOperator(
        task_id='send_email_cassandra',
        to=email_cassandra,
        subject='One-Time Password',
        html_content=f"""
        <html>
        <body>
        <h1>Your OTP</h1>
        <p>{otp_cassandra}</p>
        </body>
        </html>
        """,
    )

    send_email_mongodb = EmailOperator(
        task_id='send_email_mongodb',
        to=email_mongodb,
        subject='One-Time Password',
        html_content=f"""
        <html>
        <body>
        <h1>Your OTP</h1>
        <p>{otp_mongodb}</p>
        </body>
        </html>
        """,
    )

    send_slack_cassandra = SlackWebhookOperator(
        task_id='send_slack_cassandra',
        slack_webhook_conn_id='slack_webhook',
        message=f"""
        :red_circle: New e-mail and OTP arrival
        :email: -> {email_cassandra}
        :ninja: -> {otp_cassandra}
        """,
        channel='#data-engineering',
        username='airflow',
    )

    send_slack_mongodb = SlackWebhookOperator(
        task_id='send_slack_mongodb',
        slack_webhook_conn_id='slack_webhook',
        message=f"""
        :red_circle: New e-mail and OTP arrival
        :email: -> {email_mongodb}
        :ninja: -> {otp_mongodb}
        """,
        channel='#data-engineering',
        username='airflow',
    )

    create_new_topic >> [topic_created, topic_already_exists] >> kafka_producer
    kafka_consumer_cassandra >> check_cassandra >> send_email_cassandra >> send_slack_cassandra
    kafka_consumer_mongodb >> check_mongodb >> send_email_mongodb >> send_slack_mongodb

This DAG will run daily, but we can change schedule_interval depending on our use case; for example, '0 */6 * * *' would run it every six hours.

EmailOperator:

  • to: The e-mail address obtained from the NoSQL database. We are going to send our e-mail to this address, and it will be sent from the e-mail address we defined in the docker-compose file.
  • subject: The e-mail subject
  • html_content: The message body that we will include in the e-mail

SlackWebhookOperator:

  • slack_webhook_conn_id: The connection ID that we defined during the creation of the new Slack connection.
  • message: The message we want to send to Slack. We can include emojis or define the message like a body which will include separate sections in the message itself.
  • channel: The channel the message should be sent to.
  • username: The message will be sent with the name of the bot we created. If we don’t use a bot, we can use the username parameter as well.

Below is what our DAG will look like once we complete working on it. You can check it out via the Airflow UI.

Thanks for reading, hope it helps :)

Please reach out via LinkedIn and GitHub; all comments are appreciated 🕺
