Building a Real-Time Streaming Data Pipeline: A Journey through Apache Kafka, Airflow, Blob Storage, DBT, Snowflake, Elasticsearch, Logstash and Kibana

Murat_Aydin
Nov 19, 2023


Tech Stack

  • Apache Kafka
  • Apache Airflow
  • Azure Blob Storage
  • Snowflake
  • DBT
  • Elasticsearch
  • Logstash
  • Kibana
  • Docker

Note: given the number of technologies used and the complex architecture of the pipeline, this project requires many files to be configured. It may take quite a bit of time to implement and run, so I suggest starting only when you have at least 2–3 free hours.

Introduction

In the ever-evolving landscape of data engineering, the need for real-time streaming solutions has become paramount, and many different technological stacks are used for this purpose. In my quest to master the complexity of real-time streaming, I was looking for a project that would not only challenge my understanding but also contribute meaningfully to an industry's efficiency, and I eventually got inspired by a taxi trip that I cancelled.

The Taxi Dilemma

Around two months ago, I called a taxi through an app in Milano. When I booked it, the driver was initially 5 km away from me, and after around five minutes (maybe less?), when the driver was only about 3 km away, I had to cancel for some reason.

Some days later, I found myself pondering: how many people cancel their trips on a given day, and what does this cost the drivers? What if the taxi app could predict cancellations and spare drivers unnecessary trips? Inspired by these thoughts, I decided to create fake taxi-app data, simulate the process, and analyze it to develop a solution that would mitigate such costs.

In this article, we are going to build a robust and scalable real-time data streaming pipeline using cutting-edge technologies like Apache Kafka, Apache Airflow, Azure Blob Storage, Snowflake, DBT, Elasticsearch, Logstash, and Kibana, all orchestrated within the containerized environment of Docker, to find a solution to the taxi dilemma.

Creating Synthetic Data: The Faker Library

To simulate real-world scenarios, I turned to the Faker library in Python, which can generate almost any kind of fake data in many different locales, producing a diverse and extensive dataset. The dataset I created for this project includes user information, driver details, geographical locations, booking and cancellation times, weather conditions, and many other fields; in short, any information a taxi-calling app needs to work.
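To give a feel for what the generator produces, here is a minimal sketch of how a single fake taxi-call record might be built with Faker. The field names and value ranges are illustrative assumptions; the real schema lives in fake_data.py in the repository.

# Illustrative sketch only: field names and ranges are assumptions,
# not the exact schema used in fake_data.py.
import random
import uuid

from faker import Faker

fake = Faker()

def generate_taxi_call() -> dict:
    booking_time = fake.date_time_between(start_date="-1h", end_date="now")
    return {
        "trip_id": str(uuid.uuid4()),
        "user_name": fake.name(),
        "driver_name": fake.name(),
        "pickup_location": fake.street_address(),
        "dropoff_location": fake.street_address(),
        "distance_km": round(random.uniform(1, 25), 2),
        "booking_time": booking_time.isoformat(),
        "weather": random.choice(["sunny", "rainy", "cloudy", "foggy"]),
        # Status is assigned at creation time for simplicity (see the data-model section).
        "status": random.choices(["completed", "canceled"], weights=[0.8, 0.2])[0],
    }

if __name__ == "__main__":
    print(generate_taxi_call())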

To take the experimentation a step further, I established an endpoint using Flask, a lightweight web application framework in Python. I made the real-time data stream accessible through the following endpoint:

http://localhost:8081/fake_taxi_data

The Kafka producer polls http://localhost:8081/fake_taxi_data in real time and publishes each record to a Kafka topic; the Kafka consumer reads from that topic and eventually writes the data to Azure Blob Storage.
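For reference, a minimal Flask endpoint along these lines could look like the sketch below. It reuses the illustrative generate_taxi_call() helper from the Faker sketch above and only shows the shape of the endpoint, not the exact contents of fake_data.py; the producer side is sketched further down, where the individual scripts are described.

# Minimal sketch of a Flask endpoint serving fake taxi-call records.
from flask import Flask, jsonify

# generate_taxi_call is the illustrative generator sketched in the Faker section;
# adjust the import to your own module layout.
from fake_data import generate_taxi_call

app = Flask(__name__)

@app.route("/fake_taxi_data")
def fake_taxi_data():
    # Return one freshly generated record per request.
    return jsonify(generate_taxi_call())

if __name__ == "__main__":
    app.run(host="0.0.0.0", port=8081)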

IMPORTANT NOTE:

The dataset size and the frequency at which the synthetic data is generated are up to us. For example, we can create 100 records within 10 seconds, or 1,000. This depends on how robust and scalable our architecture is. However, keep in mind that my aim here is not to build the “most robust or scalable” architecture; I just want to show those who are interested how real-time streaming can be done, which problems we face, which solutions we can come up with, and which technologies we can use.

ARCHITECTURE

Real Time Streaming Pipeline For Taxi Call App

Once the data reaches Blob Storage, Airflow triggers the pipeline, which uploads the raw data to Snowflake, runs transformations in DBT, loads the results back into Snowflake, creates a machine learning mart in Snowflake, pulls the data from the ML mart into a Docker container, applies a RandomForest model to predict whether a given taxi call will be cancelled or not, uploads the predictions to Snowflake, and finally uses the ELK stack to visualize the results.

ON ELK STACK

For those who have never seen the ELK stack, it gives us the ability to aggregate logs from all our systems and applications, analyze them, and create visualizations for application and infrastructure monitoring and faster troubleshooting.

How does it work?

Logstash is employed to collect, process, and filter raw data from various sources, such as logs, application events, or metrics. It transforms the data into a standardized format suitable for storage and analysis and then sends the processed data to Elasticsearch, where it is indexed and stored. Elasticsearch’s powerful search capabilities enable fast and flexible querying of the stored data.

What does it mean that the data is indexed? Elasticsearch does not scan columns; it searches indices built over the data, so retrieval is faster. Indexing in this context means that field values are mapped to the documents (rows) that contain them, which makes searching much faster.
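As a tiny illustration of what indexing buys us, here is a hedged sketch using the official Elasticsearch Python client: we index one document and then query it back by a field value. The index and field names are made up for the example; the pipeline itself uses the snowflake_metrics index fed by Logstash.

# Hedged sketch: index a document and search it with the Elasticsearch Python client.
# The index and field names here are illustrative, not the pipeline's real mapping.
from elasticsearch import Elasticsearch

es = Elasticsearch("http://localhost:9200")

# Indexing maps field values back to the documents that contain them (an inverted index),
# so the search below does not need to scan every row.
es.index(index="demo_taxi_metrics", document={"pickup_location": "Navigli", "cancelled_trips": 4})
es.indices.refresh(index="demo_taxi_metrics")

result = es.search(index="demo_taxi_metrics", query={"match": {"pickup_location": "Navigli"}})
print(result["hits"]["total"]["value"])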

And lastly, Kibana connects to Elasticsearch, allowing users to create visualizations and dashboards based on the indexed data.

DATA MODEL

I would like to present the data model through the DBT lineage graph, which I think makes things much easier. Before diving into the data model, I suggest you have a look at my previous project on the Airbnb dataset, where I explained the DBT and Snowflake pipeline orchestrated by Airflow in more detail:

Lineage graph provided by DBT

The fake data, with all its fields, is uploaded in JSON format to the TAXI_CALLS_JSON table in Snowflake. This table has a row for each unique taxi call along with its status, which is either canceled or completed. The JSON values are then converted and loaded into taxi_calls_table in a standard columnar format.

Here we need to make an assumption. Obviously, when a taxi call is made, the status (whether it will be canceled or not) is not known a priori; we would need to wait some time for it. However, for the sake of simplicity I preferred not to, and included the status at the creation time of the data. So, once a call is made, we get a random status for it (you can check the Python code later to see how this randomness works).

Alternatively, we could first generate the fake data without the status and, after some time depending on how long the trip would take (it's all random, check the Python code), create a status for each trip_id, create an extra table in Snowflake, and eventually join it with taxi_calls_table. That would be more realistic but increases the complexity.

From taxi_calls_table we generate two views: ml_mart and calls_result. ml_mart is where the data is ready for machine learning to predict the status of a taxi call. I created the ml_mart view from taxi_calls_table without doing any transformations. In a real-world scenario, however, we would do many transformations, add or derive columns, and so on, the process any standard ML project goes through. You can therefore assume that the data in taxi_calls_table goes through complex transformations before becoming the ml_mart view.

calls_result, instead, is the view that holds the status of each taxi call. This view, too, is created from the source table without any transformations. Again, we could have produced it through a separate process without using the source data, but I think this already serves our purpose well. So, for the sake of simplicity, you can think of this view as a table separate from taxi_calls_table.

The locations, users, and drivers tables are fake metadata created only once and uploaded via the dbt seed command.

Once we get the data from ml_mart, we run the machine learning step and predict the status, then upload the results to Azure; from there they are staged into Snowflake in the taxi_calls_predictions_json table in JSON format and then converted to columnar format in the predictions table.
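A hedged sketch of that scoring step is shown below: it loads the pickled RandomForest, scores the rows pulled from ml_mart, and returns a small predictions frame ready to be uploaded. The column names and feature handling are assumptions; the actual logic lives under airflow/ML in the repository.

# Hedged sketch of the scoring step. Column names and the feature set are assumptions;
# see airflow/ML in the repository for the actual implementation.
import pickle

import pandas as pd

def predict_cancellations(ml_mart_df: pd.DataFrame,
                          model_path: str = "airflow/ML/model/random_forest_model.pkl") -> pd.DataFrame:
    with open(model_path, "rb") as f:
        model = pickle.load(f)

    # Assume the mart already exposes the encoded features the model was trained on.
    feature_cols = [c for c in ml_mart_df.columns if c not in ("TRIP_ID", "STATUS")]
    predicted_status = model.predict(ml_mart_df[feature_cols])

    # Keep only what the downstream predictions table needs.
    return pd.DataFrame({"TRIP_ID": ml_mart_df["TRIP_ID"],
                         "PREDICTED_STATUS": predicted_status})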

All these views then feed the metrics_to_kibana view, which involves several joins and some transformations; you will see the specifics in the SQL file below. This view is ingested by Logstash, sent to Elasticsearch, and the result is visualized in Kibana.

SETTING UP THE ENVIRONMENT

Repository

To run this project successfully, you will need an Azure account and a Snowflake account; trial accounts for both are free. You then need to clone the following Git repository:

git clone https://github.com/Murataydinunimi/Real_Time_Streaming_Kafka_Airflow_DBT_Snowflake_ELK-Stack

You then need to create a storage account in Azure and four containers like the following:

The taxi-app-data container will act as a kind of data lake, storing the fake data consumed by Kafka. Once the taxi-calls data has been processed and uploaded to Snowflake, Airflow will move the file to the taxi-app-data-uploaded container, renaming it to current_timestamp_taxi-app-data_uploaded.json.

A similar process applies to the predictions. When the data reaches the ML_MART view, it is scored by the RandomForest model and the predictions are uploaded to the predictions container; they are then ingested into the Snowflake data warehouse, after which the file is moved to the predictions-uploaded container in the same manner.
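The “move” between containers is really a copy followed by a delete, since Blob Storage has no rename operation. A hedged sketch with the azure-storage-blob SDK is shown below; the container and blob names are illustrative, and in production you would poll the copy status before deleting the source.

# Hedged sketch: "move" a blob by copying it to the target container and deleting the source.
# Container and blob names are illustrative.
from datetime import datetime, timezone

from azure.storage.blob import BlobServiceClient

def move_blob(conn_str: str, src_container: str, src_blob: str, dst_container: str) -> None:
    service = BlobServiceClient.from_connection_string(conn_str)

    source = service.get_blob_client(container=src_container, blob=src_blob)
    new_name = f"{datetime.now(timezone.utc):%Y%m%d%H%M%S}_{src_blob.removesuffix('.json')}_uploaded.json"
    target = service.get_blob_client(container=dst_container, blob=new_name)

    # Server-side copy, then remove the original (a real task would wait for the copy to finish).
    target.start_copy_from_url(source.url)
    source.delete_blob()

# Example (the connection string is the one from the .env file mentioned below):
# move_blob(conn_str, "taxi-app-data", "taxi-app-data.json", "taxi-app-data-uploaded")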

You should then go to the fake_taxi_data folder, where you will find four scripts.

fake_data.py → produces the fake data and publishes it at http://localhost:8081/fake_taxi_data

thread_script_to_kafka.py → this script fetches the data in quasi real time from http://localhost:8081/fake_taxi_data and sends it to Kafka via the producer.
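A hedged sketch of what that producer loop could look like with kafka-python is below; the topic name and polling interval are assumptions, not necessarily what thread_script_to_kafka.py actually uses.

# Hedged sketch of the producer loop: poll the Flask endpoint and publish each record
# to a Kafka topic. The topic name and the sleep interval are assumptions.
import json
import time

import requests
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

while True:
    record = requests.get("http://localhost:8081/fake_taxi_data", timeout=5).json()
    producer.send("taxi_calls", value=record)  # "taxi_calls" is an assumed topic name
    time.sleep(2)  # roughly matches the 60-records-per-120-seconds pace described later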

consume_kafka_messages.py → this script consumes the messages published by the producer and uploads them to the Azure taxi-app-data container that we created above. It uses load_to_azure.py to do so.
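The consumer side might look roughly like this: collect messages from the topic in batches and hand each full batch to the Azure upload helper. Again, the topic name, the batch size, and the upload_batch_to_azure() helper are assumptions standing in for consume_kafka_messages.py and load_to_azure.py.

# Hedged sketch of the consumer: gather messages in batches of 60 and upload each batch
# to the taxi-app-data container. upload_batch_to_azure() is a hypothetical stand-in
# for the logic in load_to_azure.py.
import json

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "taxi_calls",  # assumed topic name, matching the producer sketch
    bootstrap_servers="localhost:9092",
    value_deserializer=lambda m: json.loads(m.decode("utf-8")),
    auto_offset_reset="earliest",
)

batch = []
for message in consumer:
    batch.append(message.value)
    if len(batch) == 60:
        upload_batch_to_azure(batch, container="taxi-app-data")  # hypothetical helper
        batch = []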

What I like about this project is that it is highly customizable. For instance, you can modify fake_data.py, adding or removing fields and conditions to make it more realistic, and you can eventually write your own transformations in DBT, making the whole project quite personal.

You should also modify the .env file here with your Azure storage account and Blob Storage information.

Then go to the airflow folder and open the azure.env file and modify that one as well:

Then go to the dbtlearn folder, open the .env file there, and modify the values for the Snowflake configuration as follows:

Also modify the profiles.yml file in the same directory as follows:

Then open the snowflake_config.sql file in the snowflake_config folder at the root. Copy and paste it into a Snowflake worksheet and run it all at once.

This file creates the necessary roles, grants the necessary permissions, and creates the database and schemas. It also creates two Azure stages, which act as a sort of bridge between Blob Storage and Snowflake. Scroll down a bit in the file and modify the following part:

When the fake data arrives in Blob Storage, we need to read it in Snowflake. To do that, we use Snowflake stages: once the data lands in Blob Storage, Snowflake can see it in near real time through its stages.

Notice that this script also creates the taxi_calls_json and taxi_calls_predictions_json VARIANT tables, into which the raw fake data is ingested as JSON. Have another look at the dbt lineage graph for a better understanding.
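For illustration, the kind of statement that pulls a staged JSON file into one of those VARIANT tables can also be run from Python with the Snowflake connector, as in the hedged sketch below. The stage, database, and schema names are assumptions; the real ones are defined in snowflake_config.sql.

# Hedged sketch: load staged JSON into the TAXI_CALLS_JSON VARIANT table via the
# Snowflake Python connector. The stage name (@taxi_app_stage) and the database/schema
# names are assumptions; check snowflake_config.sql for the real ones.
import snowflake.connector

conn = snowflake.connector.connect(
    account="your_account",
    user="your_user",
    password="your_password",
    warehouse="COMPUTE_WH",
    database="TAXI_DB",
    schema="RAW",
)

with conn.cursor() as cur:
    cur.execute("""
        COPY INTO TAXI_CALLS_JSON
        FROM @taxi_app_stage
        FILE_FORMAT = (TYPE = 'JSON')
    """)
conn.close()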

You also need to configure the mount path of the DockerOperator that runs the dbt commands. Open airflow/dags/pipeline.py:

You need to paste your own full path to the dbtlearn folder in the project repository.

Under the airflow/dags folder you can find all the DAGs and their tasks and see the specifics. You can also find the random_forest_model.pkl file under the airflow/ML/model folder; it was trained on 1,000 fake records, found in completed_transactions.csv in the data folder at the root.
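To give an idea of how these pieces fit together in a DAG, here is a stripped-down, hedged sketch combining the Azure blob sensor, a SnowflakeOperator task, and a DockerOperator running dbt. The connection IDs, blob name, schedule, stage name, image, and mount paths are assumptions; the real DAGs under airflow/dags are more elaborate.

# Hedged, stripped-down DAG sketch. Connection IDs, names, paths, and the schedule are
# assumptions; the real pipeline lives under airflow/dags in the repository.
from datetime import datetime

from airflow import DAG
from airflow.providers.docker.operators.docker import DockerOperator
from airflow.providers.microsoft.azure.sensors.wasb import WasbBlobSensor
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator
from docker.types import Mount

with DAG(
    dag_id="taxi_pipeline_sketch",
    start_date=datetime(2023, 11, 1),
    schedule="*/2 * * * *",  # roughly every two minutes, matching the data cadence
    catchup=False,
) as dag:

    wait_for_blob = WasbBlobSensor(
        task_id="wait_for_taxi_app_data",
        container_name="taxi-app-data",
        blob_name="taxi-app-data.json",      # assumed blob name
        wasb_conn_id="azure_blob_default",   # the Azure connection created in the Airflow UI
        poke_interval=120,
    )

    copy_raw_json = SnowflakeOperator(
        task_id="copy_raw_json",
        snowflake_conn_id="snowflake_default",  # the Snowflake connection created in the Airflow UI
        sql="COPY INTO TAXI_CALLS_JSON FROM @taxi_app_stage FILE_FORMAT=(TYPE='JSON')",
    )

    run_dbt = DockerOperator(
        task_id="dbt_run",
        image="custom_dbt_image",
        command="dbt run",
        docker_url="unix://var/run/docker.sock",
        mounts=[Mount(source="/full/path/to/dbtlearn", target="/usr/app/dbt", type="bind")],
    )

    wait_for_blob >> copy_raw_json >> run_dbt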

We now need to configure the ELK Stack! This is the last one, I promise.

To establish a connection between Snowflake and Logstash, we use the JDBC driver, a software component that enables Java applications to interact with databases through the JDBC API. Modify the logstash.conf file above as shown in the picture. When all the configs are set correctly, Logstash will pull the data from the metrics_to_kibana view and send it to Elasticsearch. Elasticsearch will index the data under the snowflake_metrics index, as specified in logstash.conf, and make it available to Kibana.

Well, that was a lot of configuration! We are now ready to build the images. We have two docker-compose.yml files: one for all the services except the ELK stack, and one for the ELK stack. Now cd into the docker folder and run the docker compose up command.

When the above command completes, cd into the ELK_stack folder and run another docker compose up. Then run the following command to give the Airflow DockerOperator the permission it needs to work properly:

sudo chmod 777 /var/run/docker.sock

When everything is up, go to the Airflow UI at:

http://localhost:8080/

The username and password are both airflow. We need to create two connections to be able to use the SnowflakeOperator and the Azure sensors. To create the Snowflake connection, click Admin > Connections > Create and enter the following info:

The password is your Snowflake login password.

We then need to create the connection for the Azure sensor: go to Admin > Connections and create the following connection:

You should provide three things here: the connection ID, the connection type, and your Blob Storage connection string, which you should already have pasted into the .env file we saw before.

You should now create the dbt seeds. Run docker ps, get the container ID of the dbt container (called custom_dbt_image), and exec into the container:

docker exec -it container_id /bin/bash

and then run the following commands:

You can now see the seeds, which are the metadata about drivers, locations, and users, in Snowflake.

We can now start the fake-data and Kafka scripts. Open three different terminals and run the scripts in the fake_taxi_data folder in the following order:

python .\fake_data.py
python .\thread_script_to_kafka.py
python .\consume_kafka_messages.py

fake_data.py creates 60 records in 120 seconds, and consume_kafka_messages.py uploads the data to Azure in batches of 60 records. So roughly every 2 minutes, 60 records are uploaded to the Azure taxi-app-data container. The Airflow Azure sensor is scheduled every 120 seconds and pokes (checks) the taxi-app-data container at the same interval. It is therefore important to run the commands within the time constraint imposed by the fake-data production.

Otherwise, you might get a “blob already exists” error from consume_kafka_messages.py, or the move_taxi_app_file task in the pipeline DAG might fail with “the specified blob does not exist”, because the previous file may not yet have been ingested and moved to the taxi-app-data-uploaded container if the upload time and the ingestion time are out of sync. The point is that by the time Kafka writes a new file to Blob Storage, the previous file written by Kafka should already have been moved to the taxi-app-data-uploaded container. That is why it matters when you start the scripts and the DAGs. Even if a DAG run fails, though, it does not mean something is broken; it just means that for that run the file was not found because the previous run had already picked it up, so we simply wait for the next batch of data.

Now, go to the Airflow UI and unpause the DAGs.

The fake data is now being created and is available at http://localhost:8081/fake_taxi_data.

After around two minutes, in the terminal where you ran consume_kafka_messages.py, you should see the following output:

The data is now available in the taxi-app-data container. Right after that, go to the Airflow UI and observe that dag_azure_storage is triggered, which in turn triggers the pipeline DAG.

Wait for the first run of the pipeline DAG to complete so that Logstash can ingest the data from the metrics_to_kibana view. You should see the following output in the Airflow UI:

You can now go to Kibana at:

http://localhost:5601/

Click on the Management button, then Saved Objects, and then Import. Browse and choose taxi_calls_dashboard.ndjson in the ELK_stack/dashboard folder.

You should see the output above. These are individual visualizations, and Taxi_app_Trips_Live_Report is the dashboard that collects them all. Click on the dashboard and it will ask you to create a new index. As the index name, write “snow”, choose “I do not want to use the Time filter”, and create the index.

Click on the dashboard and watch the live dashboard update itself roughly every 2 minutes.

When you see that the other DAG runs have completed, like below:

Check the Kibana dashboard again and observe that the numbers have increased in quasi real time.

Here we can see the total number of cancelled trips; the total KM of all the trips the drivers did not make because we correctly predicted they would be cancelled; the total number of minutes the drivers did not waste driving there; the total estimated avoided cost in euros, i.e. the fuel cost the drivers did not incur; the pickup location with the most cancelled calls; the driver with the most cancelled trips; and the user who cancelled the most trips.

HOW CAN YOU CONTRIBUTE TO THIS PROJECT?

This project is far from perfect. My aim, though, was not to create a perfect solution but to share a sample architecture with the community, because when I was researching for this project I could not really find anything useful for some of the things I did here. I therefore think this project could be useful for anyone who wants to sharpen their skills on this kind of pipeline.

That does not mean there are no points to improve, though. Below I list some of the main things you can work on, contribute to, improve, customize, and maybe push to the repo:

  1. Create separate DAGs per technology or group of similar tasks. For example, create one DAG for all the Snowflake and DBT tasks. Use ExternalTaskSensor to make a task in one DAG wait for a task in another DAG.
  2. Make things faster! Produce 100 or 1,000 fake records in 20 seconds and ingest them immediately. Lower the poke interval of the Azure sensor.
  3. Create a separate process for the calls_result view so that the status of a taxi call is determined some time after the call is made, as in real life: another script, another table, another ingestion process. You could eventually create a fact table where you join them all.
  4. Create dimension and fact tables, do more analytics in DBT, build more models, and apply tests to those models.
  5. Build better visualizations in Kibana. Developing other models and tables can give you more insights, which you can then visualize in Kibana!

CONCLUSION

Congratulations to anyone who made it this far!

We built an end-to-end, quasi-real-time data engineering pipeline using many different technologies together. I know that was a lot of configuration, but the architecture is quite complex. I might have made mistakes along the way; please do report them to me!

If you have any questions, please do not hesitate to reach out to me either here or on LinkedIn!

Thanks for reading.

