Development and Deployment of a Big Data Infrastructure leveraging Apache Kafka, Apache NiFi, Apache Hive, Spark Streaming, and Spark ML

Ahmed Laaziz
Dec 26, 2023


Figure 1 : Project architecture

I. Project Objective

Our project aims to establish a system for dynamic processing and real-time prediction of data extracted from major cryptocurrency websites. The primary goal is to create a robust and scalable system capable of continuously processing data, performing periodic (weekly) self-training of multiple machine learning models, and conducting daily self-evaluation to identify the model offering the most accurate predictions.

II. Prerequisites for the Project — Installation and Configuration

This project requires the prior installation and configuration of multiple tools and environments. Follow these steps carefully to ensure a smooth progression of the project.

1. Development Environment

  • Operating System: Ensure that you have an operating system compatible with the necessary tools (here, we’re using Windows).
  • RAM: Make sure you have at least 13 GB of RAM.
  • Docker: Download and install Docker by following the official installation instructions for your operating system.
  • Verify the installation with the command: docker --version
  • Docker Compose: Download and install Docker Compose by following the official installation instructions for your operating system.
  • Verify the installation with the command: docker-compose --version

2. Configuration of the Dockerized Environment

- Creating Containers

Create a docker-compose.yml file describing the necessary services for this application. Refer to the docker-compose.yml file in the project folder.

- Running the Containers

To start the containers defined in your docker-compose.yml file and run them in the background, use the following command:

docker-compose up -d

Once the containers are running, you can list the IDs of the running containers and their associated ports using the command: docker ps

Figure 2.1 the IDs of the running containers and their associated ports

To access the services in a web browser, use the following URLs:

· Apache Zeppelin : http://localhost:8082

· Apache NiFi : http://localhost:9999

· Confluent : http://localhost:9021

· Apache Spark : http://localhost:8080

· Apache Airflow : http://localhost:3000

· Namenode : http://localhost:9870

Ensure that the services are properly started and that the ports specified in your docker-compose.yml file are not being used by other applications on your system.
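As a quick way to check the ports, here is a minimal Python sketch that tries a TCP connection to each one. The port numbers are the ones from the URL list above; the names port_is_open and report are only illustrative, and you should adjust the mapping to match your own docker-compose.yml.

```python
import socket

# Ports taken from the URL list above; adjust them to your docker-compose.yml.
SERVICES = {
    "Apache Zeppelin": 8082,
    "Apache NiFi": 9999,
    "Confluent": 9021,
    "Apache Spark": 8080,
    "Apache Airflow": 3000,
    "Namenode": 9870,
}


def port_is_open(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if something accepts TCP connections on host:port."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.settimeout(timeout)
        # connect_ex returns 0 on success instead of raising an exception.
        return sock.connect_ex((host, port)) == 0


def report(host: str = "localhost") -> dict:
    """Map each service name to whether its port currently answers."""
    return {name: port_is_open(host, port) for name, port in SERVICES.items()}
```

Run report() before docker-compose up -d to spot ports that another application already occupies (True means taken), and again afterwards to confirm that each service is listening.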

- Running a Specific Container

To access the shell of a specific container in bash mode, use the following command, replacing ‘container_id’ with the desired container’s identifier: docker exec -it container_id /bin/bash

Figure 2.2 Running a Specific Container

III. Data Collection and Processing

1. Data Extraction and Ingestion

For our project, we use BeautifulSoup, a Python library, to retrieve data from the site https://crypto.com/price every 5 minutes.

To access the Zeppelin graphical interface, click on this link: http://localhost:8082. Once there, create a new notebook. Then, copy the code from the file named ‘kafka_stream’ to extract and ingest the data with Kafka.

Please note that the automatic execution of the code will be done via Apache Airflow, except for the ‘sparkStream.ipynb’ notebook, which will need to be launched manually.

Figure 2.3 Data extraction
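The extraction code itself lives in the ‘kafka_stream’ file. To give an idea of its general shape, here is a minimal sketch under several assumptions: the page exposes its data as an HTML table whose first five cells match the fields below, the kafka-python client is used, and the topic name and broker address are placeholders. None of these details are confirmed by the project files.

```python
import json
from datetime import datetime

# Field names mirror the columns of the crypto_data1 table created later.
FIELDS = ["Name", "Price", "24H_CHANGE", "24H_VOLUME", "Market_Cap"]


def parse_rows(html: str) -> list:
    """Return the cell texts of each table row (assumed page layout)."""
    from bs4 import BeautifulSoup  # imported lazily: only needed when scraping

    soup = BeautifulSoup(html, "html.parser")
    rows = []
    for tr in soup.select("table tbody tr"):
        cells = [td.get_text(strip=True) for td in tr.find_all("td")]
        if len(cells) >= len(FIELDS):
            rows.append(cells[: len(FIELDS)])
    return rows


def to_record(cells: list, now: datetime) -> dict:
    """Pair the cell texts with field names and stamp the extraction time."""
    record = dict(zip(FIELDS, cells))
    record["Datetime"] = now.strftime("%Y-%m-%d %H:%M:%S")
    return record


def to_kafka_value(record: dict) -> bytes:
    """Serialize one record as UTF-8 JSON, the payload sent to Kafka."""
    return json.dumps(record).encode("utf-8")


def publish(records, topic="crypto_topic", servers="localhost:9092"):
    """Send records to Kafka; topic and servers are hypothetical values."""
    from kafka import KafkaProducer  # kafka-python, an assumed client library

    producer = KafkaProducer(bootstrap_servers=servers,
                             value_serializer=to_kafka_value)
    for record in records:
        producer.send(topic, record)
    producer.flush()
```

Keeping the parsing, record building, and publishing steps separate makes each one easy to check on its own before Airflow schedules the whole thing every 5 minutes.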

2. Data Validation with Apache NiFi

Apache NiFi validates data in real time by applying filters, rules, and quality controls to the records passing through its flows. This keeps the data accurate, secure, and consistent, so that downstream processing is reliable and aligned with the defined requirements.

To validate the data coming from the Kafka topic, you need to:

· Access the graphical interface of Apache NiFi at the following link: http://localhost:9999

· Drag and drop a processor of type ‘ConsumeKafka_2_0’.

· Right-click to configure your processor.

· Configure your processor as follows.

· Once you’ve clicked on “APPLY,” proceed to drag another processor of type ‘ValidateRecord’.

· Click on ADD, then proceed to configure the processor.

· Click on APPLY, then create a connection between the two processors.

· Click on ADD.

· If an error occurred while establishing the connection, please navigate to the configuration button.

· Enable the JsonRecordSetWriter and the JsonTreeReader.

· After successfully establishing the connection, drag a processor of type LogAttribute.

· Configure it as follows.

· Click on APPLY and duplicate it.

· Then, connect both processors with the previous processor.

· Repeat the same process for the second processor.

· Your processing schema should look like this.

· Start your validation pipeline.
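NiFi does this through processor configuration rather than code, but the logic of the flow can be mirrored in a few lines of Python. The sketch below is only an illustration: it assumes the schema requires every field of the crypto record to be a non-empty string, and it splits records into a valid and an invalid group in the way ValidateRecord routes them to its valid and invalid relationships (presumably the two LogAttribute processors above). The actual schema configured in the project may be stricter.

```python
# Assumed schema: every field must be present as a non-empty string.
REQUIRED_FIELDS = ("Name", "Price", "24H_CHANGE", "24H_VOLUME",
                   "Market_Cap", "Datetime")


def is_valid(record: dict) -> bool:
    """A record is valid when every required field is a non-empty string."""
    return all(
        isinstance(record.get(field), str) and record[field].strip()
        for field in REQUIRED_FIELDS
    )


def route(records):
    """Split records into (valid, invalid) lists, preserving their order."""
    valid, invalid = [], []
    for record in records:
        (valid if is_valid(record) else invalid).append(record)
    return valid, invalid
```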

3. Creation of Required Tables

To access the shell of the container in bash mode, use the following command, replacing ‘container_id’ with the container’s identifier:

docker exec -it container_id /bin/bash

Afterwards, execute the following HiveQL statement to create the ‘crypto_data1’ table, which will store the data coming from the Kafka topic:

CREATE TABLE IF NOT EXISTS crypto_data1 (
  Name STRING,
  Price STRING,
  24H_CHANGE STRING,
  24H_VOLUME STRING,
  Market_Cap STRING,
  Datetime STRING
)
STORED AS Parquet;

4. Processing and Storing Data with Spark Streaming

To perform the processing, import the notebook named ‘sparkStream.ipynb’.
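The notebook contains the actual processing logic. As a rough orientation only, the sketch below shows one common way to read a Kafka topic and append it to the ‘crypto_data1’ table with Spark Structured Streaming, which may differ from what the notebook does. It assumes a SparkSession created with Hive support, the spark-sql-kafka connector on the classpath, JSON messages whose fields match the table columns, and placeholder values for the broker address, topic, and checkpoint path.

```python
# Column order matches the crypto_data1 DDL; insertInto is position-based.
COLUMNS = ["Name", "Price", "24H_CHANGE", "24H_VOLUME", "Market_Cap", "Datetime"]


def build_stream(spark, servers: str, topic: str):
    """Return a streaming DataFrame with one string column per table column."""
    from pyspark.sql.functions import col, from_json
    from pyspark.sql.types import StringType, StructField, StructType

    schema = StructType([StructField(name, StringType()) for name in COLUMNS])
    raw = (
        spark.readStream.format("kafka")
        .option("kafka.bootstrap.servers", servers)
        .option("subscribe", topic)
        .load()
    )
    # Kafka delivers the payload as bytes in the "value" column.
    parsed = raw.select(
        from_json(col("value").cast("string"), schema).alias("record")
    )
    return parsed.select("record.*")


def write_batch(batch_df, batch_id):
    """Append one micro-batch to the Hive table."""
    batch_df.write.insertInto("crypto_data1")


def start(parsed, checkpoint_dir: str):
    """Start the query; checkpoint_dir is a hypothetical writable path."""
    return (
        parsed.writeStream
        .option("checkpointLocation", checkpoint_dir)
        .foreachBatch(write_batch)
        .start()
    )
```

Writing through foreachBatch keeps the sink code identical to a normal batch write, which is convenient when the target is an existing Hive table.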

IV. Training and Evaluation of Multiple Machine Learning Models

a. Implementation

Before proceeding with model training and evaluation, please create the following tables using the following commands:

CREATE TABLE models_infos_table (
  name STRING,
  training_date STRING
)
ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe';

CREATE TABLE models_testing_infos_table (
  rmse STRING,
  testing_date STRING
)
ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe';

· The first table, models_infos_table, stores details about the top models saved each week in the HDFS file system.

· The second table, models_testing_infos_table, records the performance of the best models trained each week.

b. Machine learning code

You’ll find the machine learning code in the notebook machine_learning.ipynb in the shared folder. Please import it into Zeppelin.

c. Model Evaluation

Evaluating a machine learning model daily ensures its reliability and ongoing adaptation. This practice monitors the model’s performance, swiftly detects any decline in its predictions, and promptly adjusts parameters in case of data changes or anomalies. It fosters continuous improvement, ensuring the model remains accurate and effective in daily application.

You’ll find the code for the continuous evaluation of models in the notebook ‘real_time_predictions.ipynb’ in the shared folder; import it into Zeppelin.
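The selection step at the heart of this evaluation can be illustrated in plain Python. This is a minimal sketch, assuming RMSE is the comparison metric (as the rmse column of models_testing_infos_table suggests) and that the model with the lowest RMSE wins. The model names in the usage example are hypothetical, and the notebook itself relies on Spark ML rather than on this code.

```python
import math


def rmse(actual, predicted) -> float:
    """Root mean squared error between two equally long sequences."""
    if len(actual) != len(predicted) or not actual:
        raise ValueError("inputs must be non-empty and of equal length")
    squared = [(a - p) ** 2 for a, p in zip(actual, predicted)]
    return math.sqrt(sum(squared) / len(squared))


def best_model(actual, predictions_by_model: dict):
    """Return (name of the lowest-RMSE model, all scores by model name)."""
    scores = {name: rmse(actual, preds)
              for name, preds in predictions_by_model.items()}
    winner = min(scores, key=scores.get)
    return winner, scores
```

For example, best_model([1, 2, 3], {"linear": [1, 2, 3], "tree": [2, 3, 4]}) picks "linear", since its predictions match the actual values exactly.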

V. Scheduling with Apache Airflow

Apache Airflow is a powerful tool for scheduling and automating workflows. It allows defining, scheduling, and managing complex data pipelines by orchestrating tasks and managing their dependencies. Using Directed Acyclic Graphs (DAGs), Airflow specifies the order and relationships between different tasks, enabling sequential or parallel execution.

It also provides features to monitor task progress, handle errors and retries, while offering a comprehensive overview of workflow status and performance. Airflow comes with a user interface allowing you to view DAGs and their tasks, trigger DAG runs, display logs, and perform limited debugging and issue resolution with your DAGs.

To access the interface, use the following credentials:

  • Username: admin
  • Password: admin

To automate your pipeline:

  • Use your preferred code editor (such as VSCode, PyCharm, etc.).
  • Create a new folder named “dags” at the location where you want to store your DAGs.
  • Inside the “dags” folder, create three Python files for your DAGs.
  • In these files, write the code describing the different stages of the process.
  • You’ll find the code in the files (dag1.py, dag2.py, dag3.py) in the project directory. Copy and paste the code into your files and perform the necessary installations.
  • Install all required packages using pip or, if you’re using PyCharm, through the interpreter.
  • In the DAG code, modify the notebook addresses according to their IDs.
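To make the last step more concrete, here is a hedged sketch of what such a DAG file might look like. It is not the content of dag1.py: it assumes each DAG triggers a Zeppelin notebook through Zeppelin's REST endpoint for running all paragraphs of a note, and that Airflow 2.4 or later is installed (older versions use schedule_interval instead of schedule). The note ID, DAG ID, and cron expression are placeholders.

```python
import urllib.request

# Zeppelin address used in this article; from inside the Airflow container
# you would likely need the Zeppelin service name instead of localhost.
ZEPPELIN_URL = "http://localhost:8082"


def run_note_url(note_id: str, base_url: str = ZEPPELIN_URL) -> str:
    """URL of the Zeppelin REST endpoint that runs all paragraphs of a note."""
    return f"{base_url.rstrip('/')}/api/notebook/job/{note_id}"


def run_note(note_id: str) -> int:
    """Ask Zeppelin to run the note and return the HTTP status code."""
    request = urllib.request.Request(run_note_url(note_id), method="POST")
    with urllib.request.urlopen(request, timeout=30) as response:
        return response.status


def build_dag(dag_id: str, schedule: str, note_id: str):
    """Create a one-task DAG that runs the given notebook on a schedule."""
    from datetime import datetime

    from airflow import DAG
    from airflow.operators.python import PythonOperator

    with DAG(dag_id=dag_id, start_date=datetime(2023, 12, 1),
             schedule=schedule, catchup=False) as dag:
        PythonOperator(task_id="run_zeppelin_note",
                       python_callable=run_note, op_args=[note_id])
    return dag
```

In a real DAG file, the result must be assigned at module level so that Airflow discovers it, for instance dag = build_dag("extract_data", "*/5 * * * *", "<your note ID>") for the 5-minute extraction.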

Activate your DAGs in the following order:

1. « Extract data DAG »

2. « Weekly machine learning training DAG »

3. « Daily machine learning evaluation DAG »

To verify that the pipeline works as expected, query the tables with HQL commands, for example:

select * from <table name>

VI. Visualization with Streamlit

Streamlit is a powerful Python library that significantly simplifies the process of creating interactive web applications for data visualization. Designed to be user-friendly and efficient, Streamlit enables developers to quickly generate dynamic dashboards without delving into complex coding details.

In this section, we’ll explore how to integrate Streamlit into your data pipelines and leverage its features to create interactive user interfaces. We’ll look into presenting your data attractively while providing users the ability to explore and interact with the results.

To visualize real-time streaming data:

  • Use your preferred code editor (such as VSCode, PyCharm, etc.).
  • Open the Visual_Stream project.
  • Execute the .py files in the following order:
1. The first file, scrapper.py, is responsible for extracting the data from the cryptocurrency table using BeautifulSoup and Selenium.

2. The second file, server.py, will allow us to render the extracted data as an API using FastAPI.

3. The files writer.py and overwriter.py will be used to store the data in CSV files that we’ll utilize in the visualization.

  • Writer.py :
  • Overwriter.py :
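The two scripts are in the Visual_Stream project. Judging only by their names, one plausibly appends each new reading to a history file while the other replaces a snapshot file with the latest values. The sketch below illustrates that difference with the standard csv module. The column list and function names are assumptions, not the project's actual code.

```python
import csv
from pathlib import Path

# Hypothetical column set; the project's CSV files may hold more fields.
FIELDS = ["Name", "Price", "Datetime"]


def append_rows(path, rows):
    """Writer-style: add rows at the end, writing the header only once."""
    path = Path(path)
    is_new = not path.exists() or path.stat().st_size == 0
    with path.open("a", newline="", encoding="utf-8") as handle:
        writer = csv.DictWriter(handle, fieldnames=FIELDS)
        if is_new:
            writer.writeheader()
        writer.writerows(rows)


def overwrite_rows(path, rows):
    """Overwriter-style: replace the file content with the latest rows."""
    with Path(path).open("w", newline="", encoding="utf-8") as handle:
        writer = csv.DictWriter(handle, fieldnames=FIELDS)
        writer.writeheader()
        writer.writerows(rows)
```

An appended file suits a price-evolution chart, while an overwritten file suits a ranking that only needs the current state.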

· Create an app.py file for the Streamlit interface:

  • In the app.py file, we need to define the following components:

· The session information, stored in the Streamlit application session.

· A function read_crypto_data to read data from the CSV files.

· The Streamlit sidebar component.

· The Streamlit container component.
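To show how these four components can fit together, here is a minimal sketch of an app.py. It is not the project's file: the CSV file names, the column names Name and Price, the assumption that prices are stored as plain numbers, and the rerun counter kept in the session state are all illustrative choices.

```python
import csv


def read_crypto_data(path: str) -> list:
    """Read a CSV file into a list of dicts, one per row."""
    with open(path, newline="", encoding="utf-8") as handle:
        return list(csv.DictReader(handle))


def render_app(ranking_csv="ranking.csv", history_csv="history.csv"):
    """Draw the two pages; the file names are hypothetical."""
    import streamlit as st  # imported lazily so read_crypto_data stays reusable

    # Session information: survives the reruns Streamlit performs on each action.
    if "reruns" not in st.session_state:
        st.session_state["reruns"] = 0
    st.session_state["reruns"] += 1

    # Sidebar component: page selector.
    page = st.sidebar.radio("Page", ["Ranking", "Price evolution"])
    st.sidebar.caption(f"Reruns this session: {st.session_state['reruns']}")

    # Container component: main content area.
    with st.container():
        if page == "Ranking":
            st.title("Real-time cryptocurrency ranking")
            st.dataframe(read_crypto_data(ranking_csv))
        else:
            rows = read_crypto_data(history_csv)
            names = sorted({row["Name"] for row in rows})
            name = st.sidebar.selectbox("Cryptocurrency", names)
            # Assumes the Price column holds plain numbers such as 43000.5.
            prices = [float(row["Price"]) for row in rows if row["Name"] == name]
            st.title(f"Price evolution: {name}")
            st.line_chart({"Price": prices})
```

With this layout, app.py would end with a call to render_app(), so that Streamlit redraws the page every time it reruns the script.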

Now that we’ve created our Streamlit application, we can launch this application using the command:

streamlit run app.py

  • The first page is dedicated to presenting the real-time ranking of cryptocurrencies.
  • The second page is for tracking the price evolution of different cryptocurrencies.

Conclusion

Through the integration of robust data processing, machine learning, and interactive visualization tools like Apache Kafka, Apache NiFi, Spark, Streamlit, and FastAPI, this project achieves a comprehensive ecosystem for real-time cryptocurrency data management. It establishes an efficient workflow from data ingestion to visualization, enabling dynamic insights into cryptocurrency rankings and price trends. This seamless integration of technologies empowers users to explore and comprehend real-time data effortlessly, fostering informed decision-making in the ever-evolving landscape of cryptocurrency markets.

Authors:

  • LAAZIZ Ahmed
  • MHANI Mohamed Amine
