Data Engineering End-to-End Project — Part 1 — Spark, Kafka, Elasticsearch, Kibana, MinIO, Docker, Airflow, Hadoop YARN, HDFS, Zookeeper, Pandas
Tech Stack
- Apache Airflow
- Apache Zookeeper
- Apache Kafka
- Apache Hadoop HDFS
- Apache Spark (PySpark)
- Apache Hadoop YARN
- Elasticsearch
- Kibana
- MinIO
- Docker
- Python
- SQL
Overview
- This project consists of two main parts; this article focuses on the first part.
- Take a compressed data source from a URL
- Process the raw data either with PySpark or Pandas and use HDFS as the file storage, check resources with Apache Hadoop YARN.
- Use a data generator to simulate streaming data, and send the data to Apache Kafka (producer).
- Read the streaming data from the Kafka topic (consumer) using PySpark (Spark Structured Streaming).
- Write the streaming data to Elasticsearch, and visualize it using Kibana.
- Write the streaming data to MinIO (S3-compatible object storage).
- Use Apache Airflow to orchestrate the whole data pipeline.
- Use Docker to containerize Elasticsearch, Kibana, and MinIO.
Steps of the Project
- We should have Apache Kafka, Apache Spark, and Apache Hadoop installed locally. Elasticsearch, Kibana, and MinIO can be used via docker-compose.yaml.
- All steps of the data pipeline can be seen via Airflow DAG. They are all explained here as well.
- All scripts were written according to my local file/folder locations, but all the mentioned scripts can be found in the repo.
You may see all localhost ports we will be using during the project below.
Elasticsearch -> localhost:9200
Kibana -> localhost:5601
Airflow -> localhost:1502
MinIO -> localhost:9001
Spark Jobs -> localhost:4040
pgAdmin -> localhost:5050
Kafka -> localhost:9092
Hadoop Namenode -> localhost:9870
Hadoop YARN -> localhost:8088/cluster
Hadoop HDFS -> localhost:9000
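A quick way to verify which of these services are actually listening is to probe the ports with a short script. This is a convenience sketch of mine, not part of the project; the helper name and the port subset below are my own choices:

```python
import socket

def port_open(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port succeeds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

# A subset of the ports listed above
services = {"Kafka": 9092, "Namenode UI": 9870, "YARN": 8088, "MinIO": 9001}

for name, port in services.items():
    status = "up" if port_open("localhost", port) else "down"
    print(f"{name:12} {status}")
```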
Start All Services
To make Apache Airflow, Docker, Apache Hadoop, Apache Kafka, and Apache Zookeeper available, we should run the following commands (this step may differ depending on how we installed these locally):
sudo systemctl start docker
sudo systemctl start airflow
sudo systemctl start airflow-scheduler
sudo systemctl start zookeeper
sudo systemctl start kafka
start-all.sh
cd /<location_of_docker_compose.yaml>/ && docker-compose up -d
You can access the docker-compose file in the repo.
Download the Data
Before starting to work with the data and run the scripts, we should first download the data locally with the following command.
wget -O <your_local_directory>/sensors.zip https://github.com/dogukannulu/datasets/raw/master/sensors_instrumented_in_an_office_building_dataset.zip
This zip file contains a folder named KETI. Each folder inside KETI represents a room number. Each room contains five csv files, and each represents a property belonging to that room. These properties are:
- CO2
- Humidity
- Light
- Temperature
- PIR (Passive Infrared Sensor Data)
Each CSV also includes a timestamp column.
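Each of these files is headerless: every line holds just the Unix timestamp and the measured value. A small sketch of how one such file loads (the sample rows here are invented, not taken from the dataset):

```python
import io
import pandas as pd

# Two headerless columns: unix timestamp, measured value (rows invented for illustration)
sample = io.StringIO("1364356800,578.0\n1364356830,580.5\n")

df = pd.read_csv(sample, names=["ts_min_bignt", "co2"])
print(df.head())
```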
Unzip the Downloaded Data and Remove README.txt
We should then unzip this data via the following command:
unzip <location_of_zip_file>/sensors.zip -d /<desired_location_of_unzipped_folder>/
Then, we have to remove README.txt since the algorithm of the Spark script requires only folders under KETI, not files:
rm /<location_of_KETI>/KETI/README.txt
Put Data into HDFS
The KETI folder is now downloaded locally. Since PySpark reads the data from HDFS, we should put the local folder into HDFS as well using the following command:
hdfs dfs -put /<location_of_KETI>/KETI/ /<desired_location_to_put_KETI>/
We can browse the HDFS location we put the data in via the Namenode UI at localhost:9870
NOTE: The Spark and Airflow scripts are running inside a virtualenv. The purpose of doing this is not to have a library-related issue while running these. The related libraries can be installed globally as well.
Read and Write Initial Data with Pandas Script
Both read_and_write_pandas.py and read_and_write_spark.py can be used to modify the initial data. They both do the same job.
All the methods and operations are described with comments and docstrings in the scripts. I will list all the code snippets here as well.
First, we will start with pandas. We have to first import the necessary libraries and packages and define the global variables.
import os
import pandas as pd
from functools import reduce
directory = '/home/train/datasets/KETI'
dataframes = {}
dataframes_room = {}
columns = ['co2', 'humidity', 'light', 'pir', 'temperature']
Since the data is located locally as separate CSV files, we have to first create separate pandas data frames out of them.
def create_separate_dataframes() -> dict:
    """
    Creates a dictionary that includes room numbers as keys and dataframes per room as values
    """
    count2 = 0
    for filename in os.listdir(directory):
        new_directory = directory + '/' + filename
        count = 0
        for new_files in sorted(os.listdir(new_directory)):  # sorted so the files line up with the columns list
            f = os.path.join(new_directory, new_files)
            my_path = new_directory.split('/')[-1] + '_' + new_files.split('.')[0]  # e.g. 656_co2
            dataframes[my_path] = pd.read_csv(f, names=['ts_min_bignt', columns[count]])  # Sample key: 656_co2. Dataframe columns: ts_min_bignt, co2
            count += 1
        count2 += 1
        # Below creates the dataframe per room
        dataframes_room[filename] = reduce(lambda left, right:
                                           pd.merge(left, right, on='ts_min_bignt', how='inner'),
                                           [dataframes[f'{filename}_co2'], dataframes[f'{filename}_humidity'],
                                            dataframes[f'{filename}_light'], dataframes[f'{filename}_pir'],
                                            dataframes[f'{filename}_temperature']])
        dataframes_room[filename]['room'] = filename  # adds room number as a column
    return dataframes_room
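The core of this function is the reduce over pd.merge: the five property frames of a room are inner-joined on ts_min_bignt, so only timestamps present in all five files survive. A self-contained illustration on invented data (three properties instead of five, values made up):

```python
from functools import reduce
import pandas as pd

# Invented per-property frames for one room; timestamps deliberately overlap only partially
co2      = pd.DataFrame({"ts_min_bignt": [1, 2, 3], "co2":      [500, 510, 520]})
humidity = pd.DataFrame({"ts_min_bignt": [2, 3, 4], "humidity": [40, 41, 42]})
light    = pd.DataFrame({"ts_min_bignt": [1, 2, 3], "light":    [100, 120, 130]})

# Inner-join on the timestamp: only ts 2 and 3 exist in all three frames
room = reduce(lambda left, right: pd.merge(left, right, on="ts_min_bignt", how="inner"),
              [co2, humidity, light])
room["room"] = "656"
print(room)
```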
Since we will be using a data generator that sends the data row by row to the Kafka topic, we need a single pandas data frame. Below, a single main data frame is created from the separate data frames and saved as a single CSV file.
def create_main_dataframe(separate_dataframes: dict):
    """
    Concats all per-room dataframes vertically. Creates final dataframe.
    """
    dataframes_to_concat = []
    for i in separate_dataframes.values():
        dataframes_to_concat.append(i)
    df = pd.concat(dataframes_to_concat, ignore_index=True)
    df = df.sort_values('ts_min_bignt')  # Sort by ts_min_bignt so the data streams in timestamp order.
    df.dropna(inplace=True)
    df["event_ts_min"] = pd.to_datetime(df["ts_min_bignt"], unit='s')  # Create datetime column
    return df

def write_main_dataframe(df):
    """
    Writes the final dataframe to local disk.
    """
    df.to_csv('/home/train/data-generator/input/sensors.csv', index=False)
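On two tiny invented frames, the concat-sort-timestamp sequence looks like this:

```python
import pandas as pd

# Two made-up per-room frames, deliberately out of timestamp order
a = pd.DataFrame({"ts_min_bignt": [1364356860, 1364356800], "co2": [510, 500], "room": "656"})
b = pd.DataFrame({"ts_min_bignt": [1364356830], "co2": [450], "room": "413"})

df = pd.concat([a, b], ignore_index=True).sort_values("ts_min_bignt")
df["event_ts_min"] = pd.to_datetime(df["ts_min_bignt"], unit="s")  # unix seconds -> datetime
print(df)
```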
We can check localhost:8088 to see the resource usage (YARN) of the running jobs while the Spark script is running.
Written data:
NOTE: With this step, we have our data ready. You can see it as sensors.csv.
Read and Write Initial Data with PySpark Script
We are going to continue with PySpark. All the steps accomplish the same process. We have to first import the necessary libraries and packages and create a Spark session.
import os
import findspark
from functools import reduce
from pyspark.sql import DataFrame
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import *
findspark.init("/opt/manual/spark") # This is where local spark is installed
spark = SparkSession.builder \
    .appName("Spark Read Write") \
    .master("local[2]") \
    .getOrCreate()
Since the data is located locally as separate CSV files, we have to first create separate data frames out of them.
def create_separate_dataframes() -> dict:
    """
    Creates a dictionary that includes room numbers as keys and dataframes per room as values
    """
    dataframes = {}  # This dict saves the property dataframe per room as values.
    directory = '/home/train/datasets/KETI'
    dataframes_room = {}  # This dict saves the dataframes per room.
    columns = ['co2', 'humidity', 'light', 'pir', 'temperature']
    count2 = 0
    for filename in os.listdir(directory):  # loop through the folders under KETI
        new_directory = directory + '/' + filename  # e.g. /home/train/datasets/KETI/656
        count = 0
        for new_files in sorted(os.listdir(new_directory)):  # sorted so the files line up with the columns list
            f = os.path.join(new_directory, new_files)  # e.g. /home/train/datasets/KETI/656/co2.csv
            f_hdfs = f.replace('home', 'user')  # e.g. /user/train/datasets/KETI/656/co2.csv
            my_path = filename + '_' + new_files.split('.')[0]  # e.g. 656_co2
            dataframes[my_path] = spark.read.csv(f'{f_hdfs}')
            dataframes[my_path] = dataframes[my_path].toDF('ts_min_bignt', columns[count])  # Sample key: 656_co2. Dataframe columns: ts_min_bignt, co2
            count += 1
        count2 += 1
        dataframes[f'{filename}_co2'].createOrReplaceTempView('df_co2')
        dataframes[f'{filename}_humidity'].createOrReplaceTempView('df_humidity')
        dataframes[f'{filename}_light'].createOrReplaceTempView('df_light')
        dataframes[f'{filename}_pir'].createOrReplaceTempView('df_pir')
        dataframes[f'{filename}_temperature'].createOrReplaceTempView('df_temperature')
        # Below sql joins on ts_min_bignt and creates the dataframe per room
        dataframes_room[filename] = spark.sql('''
            select
                df_co2.*,
                df_humidity.humidity,
                df_light.light,
                df_pir.pir,
                df_temperature.temperature
            from df_co2
            inner join df_humidity
                on df_co2.ts_min_bignt = df_humidity.ts_min_bignt
            inner join df_light
                on df_humidity.ts_min_bignt = df_light.ts_min_bignt
            inner join df_pir
                on df_light.ts_min_bignt = df_pir.ts_min_bignt
            inner join df_temperature
                on df_pir.ts_min_bignt = df_temperature.ts_min_bignt
        ''')
        dataframes_room[filename] = dataframes_room[filename].withColumn("room", F.lit(filename))
    return dataframes_room
Since we will be using a data generator that sends the data row by row to the Kafka topic, we need a single data frame. Below, a single main data frame is created from the separate data frames and saved as a single CSV file.
def unionAll(dfs):
    """
    Merges multiple dataframes vertically.
    """
    return reduce(lambda df1, df2: df1.union(df2.select(df1.columns)), dfs)

def create_main_dataframe(separate_dataframes: dict):
    """
    Merges all per-room dataframes vertically. Creates final dataframe.
    """
    dataframes_to_concat = []
    for i in separate_dataframes.values():
        dataframes_to_concat.append(i)
    df = unionAll(dataframes_to_concat)
    df = df.sort(F.col("ts_min_bignt"))  # Sort by ts_min_bignt so the data streams in timestamp order.
    df = df.dropna()
    df_main = df.withColumn("event_ts_min", F.from_unixtime(F.col("ts_min_bignt")).cast(TimestampType()))  # TimestampType keeps the time-of-day; DateType would truncate it
    df_main = df_main.withColumn("event_ts_min", F.date_format(F.col("event_ts_min"), "yyyy-MM-dd HH:mm:ss"))  # Create datetime column
    return df_main
def write_main_dataframe(df):
    """
    Writes the final dataframe to local disk.
    """
    df = df.toPandas()
    df.to_csv("/home/train/data-generator/input/sensors.csv", index=False)
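As a cross-check on the timestamp logic: from_unixtime followed by date_format('yyyy-MM-dd HH:mm:ss') maps a Unix epoch to a formatted string. The same transformation in plain Python (using UTC here for reproducibility; Spark applies the session time zone, so local results can differ):

```python
from datetime import datetime, timezone

def event_ts_min(ts_min_bignt: int) -> str:
    """Same shape as Spark's from_unixtime + date_format('yyyy-MM-dd HH:mm:ss'), in UTC."""
    return datetime.fromtimestamp(ts_min_bignt, tz=timezone.utc).strftime("%Y-%m-%d %H:%M:%S")

print(event_ts_min(1364356800))
```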
We can check localhost:8088 to see the resource usage (YARN) of the running jobs while the Spark script is running.
Written data:
NOTE: With this step, we have our data ready. You can see it as sensors.csv.
Creating the Kafka Topic
The script kafka_admin_client.py under the folder kafka_admin_client can be used to create a Kafka topic, or to print the already_exists message if there is already a Kafka topic with that name.
from kafka.admin import KafkaAdminClient, NewTopic
from kafka.errors import TopicAlreadyExistsError

admin_client = KafkaAdminClient(
    bootstrap_servers=['localhost:9092'],  # This is the host and port Kafka runs on.
    client_id='kafka_admin_client'
)
topic_list = admin_client.list_topics()

def create_new_topic():
    """Checks if the topic office_input exists or not. If not, creates the topic."""
    try:
        admin_client.create_topics(new_topics=[NewTopic('office_input', num_partitions=1, replication_factor=1)])
        return "Topic office_input successfully created"
    except TopicAlreadyExistsError:  # catch only the expected error instead of a bare except
        return "Topic office_input already exists"
We can check if the topic has been created as follows:
kafka-topics.sh --bootstrap-server localhost:9092 --list
Running data-generator
Instructions on how to install the data generator can be found here
We can directly run the data generator by running data-generator.sh from the location of the data generator. The data in the CSV file we created from the initial data will be sent row by row to the Kafka topic we created earlier.
#!/usr/bin/env bash
set -e
source /home/train/data-generator/datagen/bin/activate
python /home/train/data-generator/dataframe_to_kafka.py -i /home/train/data-generator/input/sensors.csv -t office_input -rst 2
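The data generator lives in a separate repo, but its core loop is simple to sketch: read the CSV, serialize each row, send it to the topic, sleep. Below is a hypothetical, stripped-down version; the function name and parameters are mine and the real dataframe_to_kafka.py differs. The sender is injected as a callable so the sketch runs without a broker:

```python
import csv
import time
from typing import Callable

def stream_csv(path: str, send: Callable[[str], None], row_sleep_time: float = 2.0) -> int:
    """Send each CSV row as a comma-joined string; returns the number of rows sent."""
    sent = 0
    with open(path, newline="") as f:
        for row in csv.reader(f):
            send(",".join(row))  # one Kafka message per CSV row
            sent += 1
            time.sleep(row_sleep_time)  # throttle, like the -rst flag above
    return sent

# With a real broker, `send` could wrap a kafka-python KafkaProducer:
#   producer = KafkaProducer(bootstrap_servers="localhost:9092")
#   stream_csv("input/sensors.csv", lambda m: producer.send("office_input", m.encode()))
```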
Streaming data example:
Up to this point, we have obtained the remote data from the GitHub repo: downloaded the zip file, extracted it, and read it with either the pandas or the PySpark script. We modified the data using these scripts, created a single data frame, and saved it as a CSV file. Then, we created a Kafka topic and sent the CSV data row by row to that topic.