Data Engineering End-to-End Project — Part 1 — Spark, Kafka, Elasticsearch, Kibana, MinIO, Docker, Airflow, Hadoop YARN, HDFS, Zookeeper, Pandas

Dogukan Ulu
8 min read · Nov 28, 2023

Tech Stack

  • Apache Airflow
  • Apache Zookeeper
  • Apache Kafka
  • Apache Hadoop HDFS
  • Apache Spark (PySpark)
  • Apache Hadoop YARN
  • Elasticsearch
  • Kibana
  • MinIO
  • Docker
  • Python
  • SQL

Overview

  • This project will consist of 2 main parts. This article will focus on the first part.
  • Take a compressed data source from a URL
  • Process the raw data with either PySpark or Pandas, use HDFS as the file storage, and check resource usage with Apache Hadoop YARN.
  • Use a data generator to simulate streaming data, and send the data to Apache Kafka (producer).
  • Read the streaming data from the Kafka topic (consumer) using PySpark (Spark Structured Streaming).
  • Write the streaming data to Elasticsearch, and visualize it using Kibana.
  • Write the streaming data to MinIO (S3-compatible object storage).
  • Use Apache Airflow to orchestrate the whole data pipeline.
  • Use Docker to containerize Elasticsearch, Kibana, and MinIO.

Steps of the Project

  • We should have Apache Kafka, Apache Spark, and Apache Hadoop installed locally. Elasticsearch, Kibana, and MinIO can be used via docker-compose.yaml.
  • All steps of the data pipeline can be seen via Airflow DAG. They are all explained here as well.
  • All scripts were written according to my local file/folder locations. But all the mentioned scripts can be found in the repo.

You may see all localhost ports we will be using during the project below.

Kibana -> localhost:5601
Airflow -> localhost:1502
MinIO -> localhost:9001
Spark Jobs -> localhost:4040
pgAdmin -> localhost:5050
Kafka -> localhost:9092
Hadoop Namenode -> localhost:9870
Hadoop YARN -> localhost:8088/cluster
Hadoop HDFS -> localhost:9000

Start All Services

To make Apache Airflow, Docker, Apache Hadoop, Apache Kafka, and Apache Zookeeper available, we should run the following commands (this step may differ depending on how we installed these locally):

sudo systemctl start docker
sudo systemctl start airflow
sudo systemctl start airflow-scheduler
sudo systemctl start zookeeper
sudo systemctl start kafka
start-all.sh
cd /<location_of_docker_compose.yaml>/ && docker-compose up -d
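
Once the services are started, it can be handy to confirm that the ports listed earlier are actually accepting connections. The following is a minimal sketch using only the Python standard library; the function names and the SERVICES dictionary are illustrative and not part of the project repo, and a successful TCP connection only shows that something is listening on the port, not that the service is healthy.

```python
import socket

# Ports taken from the list above; adjust them to your own setup.
SERVICES = {
    "Kafka": 9092,
    "Hadoop Namenode": 9870,
    "Hadoop YARN": 8088,
    "MinIO": 9001,
}


def is_port_open(host, port, timeout=1.0):
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


def report_services(services, host="localhost"):
    """Map each service name to whether its port accepts connections."""
    return {name: is_port_open(host, port) for name, port in services.items()}
```

Calling report_services(SERVICES) returns a dictionary such as {"Kafka": True, ...}, which makes it easy to spot a service that did not come up.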

The docker-compose file can be found in the repo.

Download the Data

Before starting to work with the data and run the scripts, we should first download the data locally with the following command.

wget -O <your_local_directory>/sensors.zip https://github.com/dogukannulu/datasets/raw/master/sensors_instrumented_in_an_office_building_dataset.zip

This zip file contains a folder named KETI. Each folder inside this main folder represents a room number. Each room contains five csv files, and each represents a property belonging to these rooms. These properties are:

  • CO2
  • Humidity
  • Light
  • Temperature
  • PIR (Passive Infrared Sensor Data)

Each CSV also includes a timestamp column.

Unzip the Downloaded Data and Remove README.txt

We should then unzip this data via the following command:

unzip <location_of_zip_file>/sensors.zip -d /<desired_location_of_unzipped_folder>/

Then, we have to remove README.txt since the algorithm of the Spark script requires only folders under KETI, not files:

rm /<location_of_KETI>/KETI/README.txt
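
The unzip and cleanup steps above can also be done from Python. This is a rough sketch with the standard library, assuming (as described above) that the archive extracts to a top-level KETI folder; the function name extract_room_folders is made up for illustration. Instead of deleting README.txt by name, it removes every loose file directly under KETI so that only the room folders remain.

```python
import zipfile
from pathlib import Path


def extract_room_folders(zip_path, target_dir, root_name="KETI"):
    """Extract the archive, then delete loose files directly under the root folder."""
    target = Path(target_dir)
    with zipfile.ZipFile(zip_path) as archive:
        archive.extractall(target)

    root = target / root_name
    for entry in root.iterdir():
        if entry.is_file():  # e.g. README.txt
            entry.unlink()

    # Return the remaining room folder names, sorted for readability
    return sorted(p.name for p in root.iterdir() if p.is_dir())
```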

Put Data into HDFS

The KETI folder is now available locally. Since PySpark gets the data from HDFS, we should put the local folder in HDFS as well using the following command:

hdfs dfs -put /<location_of_KETI>/KETI/ /<desired_location_to_put_KETI>/

We can browse the HDFS location we put the data in via the NameNode web UI at localhost:9870 (HDFS itself listens on localhost:9000).

NOTE: The Spark and Airflow scripts run inside a virtualenv to avoid library-related issues. The related libraries can be installed globally as well.

Read and Write Initial Data with Pandas Script

Both read_and_write_pandas.py and read_and_write_spark.py can be used to modify the initial data. They both do the same job.

All the methods and operations are described with comments and docstrings in the scripts. I will list all the code snippets here as well.

We will start with pandas. First, we have to import the necessary libraries and packages and define the global variables.

import os
import pandas as pd
from functools import reduce

directory = '/home/train/datasets/KETI'
dataframes = {}
dataframes_room = {}
columns = ['co2', 'humidity', 'light', 'pir', 'temperature']

Since the data is located locally as separate CSV files, we have to first create separate pandas data frames out of them.

def create_separate_dataframes() -> dict:
    """
    Creates a dictionary that includes room numbers as keys and dataframes per room as values
    """
    for filename in os.listdir(directory):
        new_directory = directory + '/' + filename
        count = 0
        # sorted() keeps the files in alphabetical order so that they line up with `columns`
        for new_files in sorted(os.listdir(new_directory)):
            f = os.path.join(new_directory, new_files)
            my_path = new_directory.split('/')[-1] + '_' + new_files.split('.')[0]  # e.g. 656_co2
            dataframes[my_path] = pd.read_csv(f, names=['ts_min_bignt', columns[count]])  # Sample key: 656_co2. Dataframe columns: ts_min_bignt, co2
            count += 1

        # Below creates dataframe per room
        dataframes_room[filename] = reduce(
            lambda left, right: pd.merge(left, right, on='ts_min_bignt', how='inner'),
            [dataframes[f'{filename}_co2'], dataframes[f'{filename}_humidity'],
             dataframes[f'{filename}_light'], dataframes[f'{filename}_pir'],
             dataframes[f'{filename}_temperature']])
        dataframes_room[filename]['room'] = filename  # adds room number as column.

    return dataframes_room
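
To make the per-room merge easier to picture, here is a toy version of the same reduce plus pd.merge pattern. The data values and the helper name merge_on_timestamp are invented for illustration; only the column names follow the script above. Because the join is an inner join, a timestamp that is missing from any one sensor file drops out of the room's dataframe.

```python
from functools import reduce

import pandas as pd


def merge_on_timestamp(frames, key="ts_min_bignt"):
    """Inner-join a list of dataframes on a shared timestamp column."""
    return reduce(lambda left, right: pd.merge(left, right, on=key, how="inner"), frames)


# Toy data: the third timestamp is missing from the humidity readings.
co2 = pd.DataFrame({"ts_min_bignt": [1, 2, 3], "co2": [400.0, 410.0, 420.0]})
humidity = pd.DataFrame({"ts_min_bignt": [1, 2], "humidity": [45.1, 45.3]})

room = merge_on_timestamp([co2, humidity])
room["room"] = "656"  # add the room number as a column, as in the script
```

Here room ends up with two rows (timestamps 1 and 2) and the columns ts_min_bignt, co2, humidity, and room.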

Since we will be using a data generator that sends the data row by row to the Kafka topic, we should have a single pandas data frame. Below, a single main data frame is created from the separate per-room data frames and saved as a single CSV file.

def create_main_dataframe(separate_dataframes:dict):
    """
    Concats all per-room dataframes vertically. Creates final dataframe.
    """
    dataframes_to_concat = []

    for i in separate_dataframes.values():
        dataframes_to_concat.append(i)

    df = pd.concat(dataframes_to_concat, ignore_index=True)
    df = df.sort_values('ts_min_bignt')  # All data is sorted according to ts_min_bignt. We want it to stream according to timestamp.

    df.dropna(inplace=True)
    df["event_ts_min"] = pd.to_datetime(df["ts_min_bignt"], unit='s')  # Create datetime column
    return df


def write_main_dataframe(df):
    """
    Writes the final dataframe to the local.
    """
    df.to_csv('/home/train/data-generator/input/sensors.csv', index=False)
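
The event_ts_min column is derived from ts_min_bignt, which holds Unix epoch seconds; pd.to_datetime with unit='s' interprets such values as seconds since 1970-01-01 in UTC. As a small sketch for spot-checking a value from sensors.csv without pandas, the same conversion can be done with the standard library (the helper name epoch_to_utc_string is made up for this example):

```python
from datetime import datetime, timezone


def epoch_to_utc_string(epoch_seconds):
    """Format Unix epoch seconds as 'YYYY-MM-DD HH:MM:SS' in UTC."""
    moment = datetime.fromtimestamp(int(epoch_seconds), tz=timezone.utc)
    return moment.strftime("%Y-%m-%d %H:%M:%S")
```

For example, epoch_to_utc_string(0) gives "1970-01-01 00:00:00".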

NOTE: With this step, we have our data ready. You can see it as sensors.csv.

Read and Write Initial Data with PySpark Script

We are going to continue with PySpark. All the steps follow the same process. First, we have to import the necessary libraries and packages and create a Spark session.

import os
import findspark
from functools import reduce

# This is where local spark is installed. It is called before the pyspark imports so that pyspark can be found.
findspark.init("/opt/manual/spark")

from pyspark.sql import DataFrame
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import *

spark = SparkSession.builder \
    .appName("Spark Read Write") \
    .master("local[2]") \
    .getOrCreate()

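Since the data is stored as separate CSV files, we have to first create separate data frames out of them. The script lists the local KETI folder to discover the folder and file names, but reads each CSV from the matching path in HDFS.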

def create_separate_dataframes() -> dict:
    """
    Creates a dictionary that includes room numbers as keys and dataframes per room as values
    """
    dataframes = {}  # This dict saves property dataframe per room as values.
    directory = '/home/train/datasets/KETI'
    dataframes_room = {}  # This dict saves dataframes per room.
    columns = ['co2', 'humidity', 'light', 'pir', 'temperature']
    for filename in os.listdir(directory):  # loop through the folders under KETI
        new_directory = directory + '/' + filename  # e.g. /home/train/datasets/KETI/656
        count = 0
        # sorted() keeps the files in alphabetical order so that they line up with `columns`
        for new_files in sorted(os.listdir(new_directory)):  # loop through the files under each room folder
            f = os.path.join(new_directory, new_files)  # e.g. /home/train/datasets/KETI/656/co2.csv
            f_hdfs = f.replace('home', 'user')  # e.g. /user/train/datasets/KETI/656/co2.csv
            my_path = filename + '_' + new_files.split('.')[0]  # e.g. 656_co2
            dataframes[my_path] = spark.read.csv(f'{f_hdfs}')

            dataframes[my_path] = dataframes[my_path].toDF('ts_min_bignt', columns[count])  # Sample key: 656_co2. Dataframe columns: ts_min_bignt, co2
            count += 1

        dataframes[f'{filename}_co2'].createOrReplaceTempView('df_co2')
        dataframes[f'{filename}_humidity'].createOrReplaceTempView('df_humidity')
        dataframes[f'{filename}_light'].createOrReplaceTempView('df_light')
        dataframes[f'{filename}_pir'].createOrReplaceTempView('df_pir')
        dataframes[f'{filename}_temperature'].createOrReplaceTempView('df_temperature')

        # Below sql joins on ts_min_bignt and creates the dataframe per room
        dataframes_room[filename] = spark.sql('''
            select
                df_co2.*,
                df_humidity.humidity,
                df_light.light,
                df_pir.pir,
                df_temperature.temperature
            from df_co2
            inner join df_humidity
                on df_co2.ts_min_bignt = df_humidity.ts_min_bignt
            inner join df_light
                on df_humidity.ts_min_bignt = df_light.ts_min_bignt
            inner join df_pir
                on df_light.ts_min_bignt = df_pir.ts_min_bignt
            inner join df_temperature
                on df_pir.ts_min_bignt = df_temperature.ts_min_bignt
        ''')

        dataframes_room[filename] = dataframes_room[filename].withColumn("room", F.lit(filename))
    return dataframes_room
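
The script maps each local path to its HDFS counterpart with a plain string replace of 'home' by 'user', which would also rewrite any later 'home' in the path. A slightly more defensive sketch swaps only the leading root folder. The helper name local_to_hdfs_path and the default roots are assumptions based on the example paths in the comments above, not something taken from the repo.

```python
from pathlib import PurePosixPath


def local_to_hdfs_path(local_path, local_root="/home/train", hdfs_root="/user/train"):
    """Swap the local root prefix for the HDFS root, leaving the rest untouched."""
    # relative_to raises ValueError if local_path is not under local_root
    relative = PurePosixPath(local_path).relative_to(local_root)
    return str(PurePosixPath(hdfs_root) / relative)
```

With these defaults, /home/train/datasets/KETI/656/co2.csv maps to /user/train/datasets/KETI/656/co2.csv.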

Since we will be using a data generator that sends the data row by row to the Kafka topic, we should have a single data frame. Below, a single main data frame is created from the separate per-room data frames and saved as a single CSV file.

def unionAll(dfs):
    """
    Merges multiple dataframes vertically.
    """
    return reduce(lambda df1, df2: df1.union(df2.select(df1.columns)), dfs)


def create_main_dataframe(separate_dataframes:dict):
    """
    Merges all per-room dataframes vertically. Creates final dataframe.
    """
    dataframes_to_concat = []
    for i in separate_dataframes.values():
        dataframes_to_concat.append(i)

    df = unionAll(dataframes_to_concat)
    df = df.sort(F.col("ts_min_bignt"))  # All data is sorted according to ts_min_bignt. We want it to stream according to timestamp.

    df = df.dropna()

    # TimestampType keeps the time of day; casting to DateType would reset it to 00:00:00
    df_main = df.withColumn("event_ts_min", F.from_unixtime(F.col("ts_min_bignt")).cast(TimestampType()))
    df_main = df_main.withColumn("event_ts_min", F.date_format(F.col("event_ts_min"), "yyyy-MM-dd HH:mm:ss"))  # Create datetime column

    return df_main


def write_main_dataframe(df):
    """
    Writes the final dataframe to the local.
    """
    df = df.toPandas()
    df.to_csv("/home/train/data-generator/input/sensors.csv", index=False)  # index=False matches the output of the pandas script

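While the Spark script is running, we can follow the job in the Spark UI at localhost:4040. Because the session above uses the local[2] master, the job shows up in the YARN UI (localhost:8088) only if it is submitted with the yarn master instead.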

NOTE: With this step, we have our data ready. You can see it as sensors.csv.

Creating the Kafka Topic

The script kafka_admin_client.py under the folder kafka_admin_client can be used to create a Kafka topic or print the already_exists message if there is already a Kafka topic with that name.

from kafka.admin import KafkaAdminClient, NewTopic
from kafka.errors import TopicAlreadyExistsError

admin_client = KafkaAdminClient(
    bootstrap_servers=['localhost:9092'],  # This is the host and port Kafka runs.
    client_id='kafka_admin_client'
)


def create_new_topic():
    """Checks if the topic office_input exists or not. If not, creates the topic."""
    try:
        admin_client.create_topics(new_topics=[NewTopic('office_input', num_partitions=1, replication_factor=1)])
        return "Topic office_input successfully created"
    except TopicAlreadyExistsError:  # any other error (e.g. broker not reachable) is raised as usual
        return "Topic office_input already exists"

We can check if the topic has been created as follows:

kafka-topics.sh --bootstrap-server localhost:9092 --list
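
An alternative to relying on an exception is to ask the broker for its topic list first. The sketch below shows that check-then-create pattern; ensure_topic and make_topic are hypothetical names, and the function only assumes that the client object offers list_topics() and create_topics(new_topics=...), as kafka-python's KafkaAdminClient does.

```python
def ensure_topic(admin_client, name, make_topic):
    """Create the topic only if the broker does not already list it."""
    if name in admin_client.list_topics():
        return f"Topic {name} already exists"
    admin_client.create_topics(new_topics=[make_topic(name)])
    return f"Topic {name} successfully created"


# Possible usage with kafka-python (requires a running broker):
# from kafka.admin import KafkaAdminClient, NewTopic
# client = KafkaAdminClient(bootstrap_servers=['localhost:9092'])
# ensure_topic(client, 'office_input',
#              lambda n: NewTopic(n, num_partitions=1, replication_factor=1))
```

There is a small window between the check and the creation, so another client could still create the topic in between; in that case create_topics raises its usual error.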

Running data-generator

Instructions on how to install the data generator can be found here

We can run the data generator directly via data-generator.sh, adjusting the paths to wherever the data generator is installed. Each row of the CSV file we created from the initial data will be sent to the Kafka topic we created earlier.

#!/usr/bin/env bash

set -e
source /home/train/data-generator/datagen/bin/activate
python /home/train/data-generator/dataframe_to_kafka.py -i /home/train/data-generator/input/sensors.csv -t office_input -rst 2
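
To illustrate what sending a CSV row by row means, here is a simplified sketch. It is not the implementation of dataframe_to_kafka.py: the function name stream_rows, the JSON message format, and the delay_seconds parameter are all assumptions made for this example. The send argument is any callable that accepts one message, so the logic can be tried without a broker.

```python
import csv
import json
import time


def stream_rows(csv_file, send, delay_seconds=0.0):
    """Read a CSV with a header row and pass each row to `send` as a JSON string."""
    sent = 0
    for row in csv.DictReader(csv_file):
        send(json.dumps(row))
        sent += 1
        if delay_seconds:
            time.sleep(delay_seconds)  # pause between rows to mimic a live stream
    return sent


# Possible usage with kafka-python (requires a running broker):
# from kafka import KafkaProducer
# producer = KafkaProducer(bootstrap_servers='localhost:9092')
# with open('/home/train/data-generator/input/sensors.csv', newline='') as f:
#     stream_rows(f, lambda msg: producer.send('office_input', msg.encode('utf-8')), delay_seconds=0.5)
# producer.flush()
```

Passing the sender in as a callable keeps the CSV handling separate from the Kafka client, which makes the row-by-row logic easy to check with a plain list.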

Up to this point, we have downloaded the zip file from the GitHub repo, extracted it, and read it with either the Pandas or the PySpark script. We modified the data using these scripts, created a single data frame from them, and saved it as a CSV file. Then, we created a Kafka topic and sent the CSV data row by row to that topic.

Please reach out via LinkedIn and GitHub. All comments are appreciated 🕺
