End-to-End Data Engineer DATA LAKE Project (Scala Spark 3.5.1 — Airflow — HDFS — Trino— 12 Containers)

M. Cagri AKTAS
16 min readJun 15, 2024

--

  1. Our Senerio
  2. Containers
  3. Airflow and Spark-master ssh connection
  4. Scala Apache Spark Scrips
  5. SUMMARY

Why do we need to deploy this project

Because in this century data is everything, if you want to plan for the future in your organization, you need to know everything about it. Who is working efficiently and who is not? What are our customers satisfied with? You can add a lot of questions. In this case, if you can make sense of your data, you can see your future.

So, in this case, we need to create our data strategy. As I said, if we can make sense of our data, it becomes valuable. If we want to make sense of data, we need to analyze it. This is where Data Engineering comes in. We need to deploy a data storage strategy in our organization and in the end, analyze and check the results. This is what we call a DATA LAKE.

What’s DataLake

A data lake is a place where you keep all of your organization’s data, both structured and unstructured. It uses a simple system that lets you store raw data without needing to organize it first. Instead of setting up rules and formats for your data in advance, you use tools to tag and identify data. This way, you can search and analyze only the data you need for a specific business question. This analysis can include real-time analytics, big data analytics, machine learning, dashboards, and data visualizations to help you find insights and make better decisions. Data teams can build ETL pipelines and transformations to make data in a data lake ready for data science, machine learning, analytics, and business intelligence tools. Managed data lake creation tools help you avoid the problems of slow, hand-coded scripts and limited engineering resources.

https://www.qlik.com/us/data-lake

1. Our Senerio

Let’s create a scenario for our data lake. Firstly, we have a logistics company, and we want to optimize our main storage. We aim to set up a system so that the warehouse operates efficiently. Therefore, I have created three tables: users table, humanresource table, and a documentation table called texts table.

PS: You can find full files in my github: https://github.com/mcagriaktas/End-to-End-Data-Lake-Project

# I created random sample data using Python. 
# You can find everything in my GitHub repository.

docker exec -it mariadb bash
mysql -u admin -p
password: admin
MariaDB [test_db]> select * from users limit 5;
+------+-----------------+-----------+-------------+---------------------+
| id | signal_strength | frequency | location | timestamp |
+------+-----------------+-----------+-------------+---------------------+
| 78 | 29 | 2400 | Location_80 | 2023-08-13 07:12:39 |
| 87 | 28 | 2490 | Location_76 | 2023-07-18 09:23:17 |
| 76 | 5 | 2483 | Location_85 | 2023-07-29 16:22:10 |
| 43 | 48 | 2460 | Location_65 | 2024-02-13 17:52:11 |
| 83 | 72 | 2462 | Location_22 | 2023-09-14 04:18:50 |
+------+-----------------+-----------+-------------+---------------------+

MariaDB [test_db]> select * from humanresource limit 5;
+---------+------------+------------+-----------+------------+
| user_id | department | position | salary | hire_date |
+---------+------------+------------+-----------+------------+
| 9 | HR | Manager | 47083.98 | 2004-06-21 |
| 49 | Finance | Analyst | 141352.96 | 2011-06-08 |
| 51 | HR | Consultant | 137997.17 | 2008-01-21 |
| 16 | Sales | Developer | 68564.00 | 2017-08-18 |
| 91 | Finance | Analyst | 75175.91 | 2008-06-22 |
+---------+------------+------------+-----------+------------+

# I'M USING MONGODN IN TRINO OR MONGO COMPASS, I CAN GIVE A ADVISE FOR THAT :D
in mongodb test_db => texts collection;
user_id, content, timestamp
41 Lorem dolor amet consectetur Lorem dolor adipiscing adipiscing Lorem consectetur 2024-05-22 09:07:54.792
25 ipsum elit sit Lorem adipiscing sit ipsum consectetur amet consectetur 2024-02-09 05:58:36.792
55 consectetur dolor elit amet consectetur sit Lorem ipsum Lorem amet 2024-01-09 21:16:38.792

2. Containers

I used seven different tools in this scenario, and the worker node count for this container came up to 11.

Firstly, we need to deploy our database because our human resources team wants to add papers to digital documents, and our user sensors are not coming in.

PS: I defined IP addresses for all containers because, during testing, the IP addresses kept changing, so I needed to fix them. However, this required changing some configuration files, which I handled. No problem. :) Also, if you’re unfamiliar with Docker, you can check my other article.

  1. https://medium.com/@mucagriaktas/deploying-postgresql-and-pgadmin4-with-docker-on-centos-7-a-step-by-step-guide-6133c7ff530f
  2. https://medium.com/@mucagriaktas/deploying-mini-project-with-docker-compose-on-centos-7-a-step-by-step-guide-23bd36964a6e

2.1. DATABASES:

2.1.1. Mariadb:

MariaDB is an open-source relational database that offers improved performance, enhanced security features, and better scalability compared to MySQL. It provides more storage engine options and maintains compatibility as a drop-in replacement for MySQL.

# You can find the entire line in the docker-compose.yaml 
# file in my GitHub repository.

mariadb:
container_name: mariadb
image: mariadb:10.5.8
ports:
- 3308:3306
environment:
MYSQL_ROOT_PASSWORD: admin
MYSQL_USER: admin
MYSQL_PASSWORD: admin
MYSQL_DATABASE: test_db
volumes:
- ./data/mariadb_data:/var/lib/mysql
networks:
dahbest:
aliases:
- mariadb
ipv4_address: 172.80.0.100

2.2.1. mongodb:

MongoDB is a NoSQL database that offers flexibility and scalability. It stores data in JSON-like documents, making it easy to handle unstructured data. MongoDB allows for high performance with fast read and write operations and is horizontally scalable, supporting large-scale, distributed applications. It also provides robust querying and indexing capabilities.

MONGODB ❤ ❤ ❤

# You can find the entire line in the docker-compose.yaml 
# file in my GitHub repository.

mongodb:
container_name: mongodb
hostname: mongodb
image: bitnami/mongodb:7.0.2
ports:
- "27017:27017"
environment:
MONGODB_ROOT_USER: admin
MONGODB_ROOT_PASSWORD: admin
MONGODB_USERNAME: cagri
MONGODB_PASSWORD: 3541
MONGODB_DATABASE: test_db
MONGODB_DISABLE_ENFORCE_AUTH: "true"
volumes:
- ./data/mongodb_data:/bitnami/mongodb
networks:
dahbest:
aliases:
- mongodb
ipv4_address: 172.80.0.101

3.3.1. HDFS (Hadoop Distributed File System):

I use HDFS because it runs really fast with Apache Spark. So, our main storage is HDFS. You can also read my article on Apache Spark and Hadoop.

  1. https://medium.com/@mucagriaktas/apache-hadoop-hdfs-yarn-mapreduce-and-apache-hive-echosystem-64260f58309f
  2. https://medium.com/@mucagriaktas/apache-spark-hadoop-apache-spark-and-parquet-orc-format-d352bf95833
# You can find the entire line in the docker-compose.yaml 
# file in my GitHub repository.

namenode:
image: bde2020/hadoop-namenode:2.0.0-hadoop3.2.1-java8
container_name: namenode
hostname: namenode
environment:
- CLUSTER_NAME=test
- CORE_CONF_fs_defaultFS=hdfs://namenode:8020
- HDFS_CONF_dfs_replication=1
- HDFS_CONF_dfs_namenode_datanode_registration_ip_hostname_check=false
ports:
- "9870:9870"
- "9000:9000"
volumes:
- ./data/hadoop_namenode:/hadoop/dfs/name
- ./hdfs/hadoop/conf:/etc/hadoop/conf
networks:
dahbest:
aliases:
- namenode
ipv4_address: 172.80.0.102

datanode:
image: bde2020/hadoop-datanode:2.0.0-hadoop3.2.1-java8
container_name: datanode
hostname: datanode
environment:
- CLUSTER_NAME=test
- NAMENODE_HOST=namenode
- CORE_CONF_fs_defaultFS=hdfs://namenode:8020
- HDFS_CONF_dfs_datanode_data_dir=file:///hadoop/dfs/data
ports:
- "9864:9864"
volumes:
- ./data/hadoop_datanode:/hadoop/dfs/data
- ./hdfs/hadoop/conf:/etc/hadoop/conf
networks:
dahbest:
aliases:
- datanode

3.1.2. How to use and managemt hdfs:

Firstly, check if your NameNode can access your DataNode. You need to see the DataNode IP address on the second line. I set a config file in our volumes, which you’ll find in the project files.

You can use HDFS like the Ubuntu CLI:

# CREATE FOLDER IN HDFS
hdfs dfs -mkidr /data

# GIVE ALL USERS FOR PERMISIND
hdfs dfs -chmod 777 /data

Also you can check your folder and datanode connection:

# YOU CAN USE -ls FOR SHOW THE HDFS PATH:
root@namenode:/# hdfs dfs -ls /
Found 1 items
drwxrwxrwx - root supergroup 0 2024-06-11 18:49 /data

# In this scenario, you can typically remove a folder using:
root@namenode:/# hdfs dfs -rm -r -f
# CHECK THE NODES:
hdfs dfsadmin -report

Configured Capacity: 102301270016 (95.28 GB)
Present Capacity: 2644082688 (2.46 GB)
...
...
-------------------------------------------------
Live datanodes (1):

Name: 172.80.0.4:9866 (datanode.dahbest)
Hostname: datanode
Decommission Status : Normal
...
...

PS: I select PostgreSQL use it for the Airflow database.

2.2. Apache Spark

You can check the GitHub repository for our Spark Dockerfile and configuration files. I set up volumes and added Python and Scala scripts (.sh) for easy use; otherwise, we would always need to enter the container to start our scripts. These are located in the (/spark/submitfiles) folder. I also added a script (.sh) to download your JAR files since we'll use JARs to connect to our databases. If you want to read more about Apache Spark, you can check my other article.

PS: When you bring up your containers, you can use

--scale=2

For the worker nodes, I fixed all IPs by manually defining them in the (docker-compose.yaml) file.

  spark-master:
container_name: spark-master
deploy:
resources:
limits:
cpus: '4'
memory: "6g"
build:
context: ./sparkMaster/spark/master
dockerfile: Dockerfile
ports:
- "4040:4040"
- "7077:7077"
- "18080:18080"
- "22:22"
- "5050:8080"
volumes:
- ./sparkMaster/submitfiles:/opt/submitfiles
networks:
dahbest:
aliases:
- spark-master
ipv4_address: 172.80.0.105

spark-worker-1:
container_name: spark-worker-1
deploy:
resources:
limits:
cpus: '4'
memory: "6g"
build:
context: ./sparkMaster/spark/worker
dockerfile: Dockerfile
depends_on:
- spark-master
ports:
- "8081-8089:8081"
volumes:
- ./sparkMaster/submitfiles:/opt/submitFiles
networks:
dahbest:
aliases:
- spark-worker-1
ipv4_address: 172.80.0.110

spark-worker-2:
container_name: spark-worker-2
deploy:
resources:
limits:
cpus: '4'
memory: "6g"
build:
context: ./sparkMaster/spark/worker
dockerfile: Dockerfile
depends_on:
- spark-master
ports:
- "8081-8089:8081"
volumes:
- ./sparkMaster/submitfiles:/opt/submitFiles
networks:
dahbest:
aliases:
- spark-worker-2
ipv4_address: 172.80.0.111

We’ll always use our script in the (sparkMaster => submitfiles) path. I attached our Apache Spark architecture. I added a (download_jar.sh) script, which can help you download a JAR file from MEAVEN Additionally, I defined (scala-spark-submit.sh) and (spark-submit.sh). These scripts will help you start your Python and Scala scripts in the Apache Spark container. Actually, you can use them locally too, but I just wanted to show you how to start them in the container.

For JARs, I built .jar files for our Airflow.

PS: You can enter spark UI, localhost:5050

2.3. Trino

I think Trino is the most powerful tool for big data. :D You can use different databases in a single SQL query, and you can even use MongoDB with SQL queries. Actually, I don’t like to use NoSQL queries, so Trino is really helpful for us. We’ll use Trino for our analysis step.

I set up a lot of volume paths because we’ll use this configuration file. I also thought about adding Snowflake, but I prefer that this whole project remain open-source. However, I can provide files for you to create a free trial account and try to learn Snowflake. Snowflake offers $400 for the first month.

  coordinator:
container_name: mn-coordinator
image: 'trinodb/trino:422'
ports:
- "9091:8080"
volumes:
- ./trino/coordinator:/etc/trino
- ./hdfs/hadoop/conf/core-site.xml:/etc/hadoop/conf/core-site.xml:ro
- ./hdfs/hadoop/conf/hdfs-site.xml:/etc/hadoop/conf/hdfs-site.xml:ro
- ./trino/coordinator/password_authenticator.properties:/etc/trino/password_authenticator.properties:ro
- ./trino/coordinator/password.db:/etc/trino/password.db:ro
#- ./trino/coordinator/plugin/snowflake:/usr/lib/trino/plugin/snowflake
networks:
dahbest:
aliases:
- coordinator

worker1:
container_name: mn-worker1
image: 'trinodb/trino:422'
ports:
- "8081:8081"
volumes:
- ./trino/worker:/etc/trino
- ./hdfs/hadoop/conf/core-site.xml:/etc/hadoop/conf/core-site.xml:ro
- ./hdfs/hadoop/conf/hdfs-site.xml:/etc/hadoop/conf/hdfs-site.xml:ro
- ./trino/worker/password_authenticator.properties:/etc/trino/password_authenticator.properties:ro
- ./trino/worker/password.db:/etc/trino/password.db:ro
#- ./trino/worker/plugin/snowflake:/usr/lib/trino/plugin/snowflake
networks:
dahbest:
aliases:
- worker1

2.3.1. Trino Configuration:

By the way, this Trino architecture build is mine. I spent a lot of time on it.

If you want to add a new database, you need to put the your propertis files in (tools_name.properties) file in coordinator => catalog and worker => catalog.

Actually, you don’t need to deploy all files for configuration. You just need to focus on building (config.properties) and (jvm.config) mainly.

config.properties:

coordinator=true
node-scheduler.include-coordinator=false
http-server.http.port=8080
discovery.uri=http://coordinator:8080

jvm.config:

-server
-Xms2G
-Xmx2G
-XX:InitialRAMPercentage=80
-XX:MaxRAMPercentage=80
-XX:G1HeapRegionSize=32M
-XX:+ExplicitGCInvokesConcurrent
-XX:+ExitOnOutOfMemoryError
-XX:+HeapDumpOnOutOfMemoryError
-XX:-OmitStackTraceInFastThrow
-XX:ReservedCodeCacheSize=512M
-XX:PerMethodRecompilationCutoff=10000
-XX:PerBytecodeRecompilationCutoff=10000
-Djdk.attach.allowAttachSelf=true
-Djdk.nio.maxCachedBufferSize=2000000
-XX:+UnlockDiagnosticVMOptions
-XX:+UseAESCTRIntrinsics

After that, we need to create our database properties files. We'll use MariaDB and MongoDB for our raw and clean data, so let's build it.

mariadb.properties:

connector.name=mariadb
# PS: DON'T FORGET IF YOUR CONTAINER IS NOT SAME NETWORK YOU NEED TO DEFINE
# mariadb TO mariadb CONTAINER'S IP ADDRESS!
connection-url=jdbc:mariadb://mariadb:3306
# PS: WE DEFINED USER AND PASSWORD IN docker-compose.yaml file.
connection-user=admin
connection-password=admin

mongodb.properties:

# WE DEFINED USER AND PASSWORD IN docker-compose.yaml file.
connector.name=mongodb
mongodb.connection-url=mongodb://admin:admin@mongodb:27017/

PS: If you want to add a new plugin (I mean tools), you can check (node.properties). Our plugin path is defined in the file.

node.environment=docker
node.data-dir=/data/trino
plugin.dir=/usr/lib/trino/plugin

DBeaver is indeed a great choice for database management. Its user-friendly interface makes it easy to work with various databases.

2.4. Airflow

Airflow can trigger our scripts because in real life, organizations, companies use millions or billions of data, so you need to start the scripts when the company is not working, just think like that, our bronze line is raw data then we need to append our raw data to HDFS, but we’re using our servers in shifts so we need to run out of shift time.

  airflow:
build:
context: ./airflow
dockerfile: Dockerfile
container_name: airflow
environment:
- AIRFLOW_DATABASE_NAME=airflowdb
- AIRFLOW_DATABASE_USERNAME=cagri
- AIRFLOW_DATABASE_PASSWORD=3541
- AIRFLOW_EXECUTOR=LocalExecutor
- AIRFLOW_USERNAME=admin
- AIRFLOW_PASSWORD=admin
- AIRFLOW_EMAIL=admin@example.com

ports:
- '8080:8080'
volumes:
- ./airflow/dags:/opt/bitnami/airflow/dags
- ./airflow/logs:/opt/bitnami/airflow/logs
- ./airflow/airflow.cfg:/opt/bitnami/airflow/airflow.cfg
- ./sparkMaster/submitfiles:/opt/submitfiles
depends_on:
- spark-master
networks:
dahbest:
aliases:
- airflow

postgresql:
image: docker.io/bitnami/postgresql:15
container_name: airflow-database
volumes:
- ./data/postgresql_data:/bitnami/postgresql
environment:
- POSTGRESQL_DATABASE=airflowdb
- POSTGRESQL_USERNAME=cagri
- POSTGRESQL_PASSWORD=3541
- ALLOW_EMPTY_PASSWORD=yes
networks:
dahbest:
aliases:
- postgresql

airflow-scheduler:
image: docker.io/bitnami/airflow-scheduler:2
container_name: airflow-scheduler
environment:
- AIRFLOW_DATABASE_NAME=airflowdb
- AIRFLOW_DATABASE_USERNAME=cagri
- AIRFLOW_DATABASE_PASSWORD=3541
- AIRFLOW_EXECUTOR=LocalExecutor
- AIRFLOW_WEBSERVER_HOST=airflow
volumes:
- ./airflow/dags:/opt/bitnami/airflow/dags
- ./airflow/logs:/opt/bitnami/airflow/logs
- ./airflow/airflow.cfg:/opt/bitnami/airflow/airflow.cfg
depends_on:
- postgresql
- airflow
networks:
dahbest:
aliases:
- airflow-scheduler

2.4.1 Airflow UI and folders:

I simply deploy dags and logs file, also you can check all config in bitnami’s dockerhub https://hub.docker.com/r/bitnami/airflow

When you put your dag file in dags folder, you can check the dags in localhost:8080, User and password = admin

2.4.2 Airflow dags folder

You need to create airflow dag for our trigger. You can find full code and project in my github repostry.

from datetime import datetime, timedelta
from airflow import DAG
from airflow.providers.ssh.operators.ssh import SSHOperator
from airflow.operators.bash import BashOperator
from airflow.models import Variable

start_date = datetime(2024, 6, 7)

default_args = {
'owner': 'admin',
'start_date': start_date,
'retry_delay': timedelta(seconds=30),
}

# Connection Informatino
# MariaDB
maria_url = Variable.get("maria_url")
maria_port = Variable.get("maria_port")
maria_db = Variable.get("maria_db")
maria_user = Variable.get("maria_user")
maria_password = Variable.get("maria_password")

# MongoDB
mongo_url = Variable.get("mongo_url")
mongo_port = Variable.get("mongo_port")
mongo_db = Variable.get("mongo_db")
mongo_user = Variable.get("mongo_user")
mongo_password = Variable.get("mongo_password")

# HDFS
hdfsUrl = Variable.get("hdfsUrl")
hdfsPort = Variable.get("hdfsPort")

# Spark Information
spark_master_url = Variable.get("spark_master_url")
spark_master_port = Variable.get("spark_master_port")


with DAG('ETL', default_args=default_args, schedule_interval='@daily', catchup=False) as dag:

t0 = SSHOperator(
task_id="starting",
ssh_conn_id='spark_master_ssh',
cmd_timeout=None,
command="echo 'Starting the script'",
)

t1 = SSHOperator(
task_id="raw_mariadb",
ssh_conn_id='spark_master_ssh',
cmd_timeout=None,
command=f"""
/opt/spark/bin/spark-submit \
--class com.example.ETL_raw_mariadb \
--master spark://{spark_master_url}:{spark_master_port} \
/opt/submitfiles/ETL_raw_mariadb.jar \
{maria_url} {maria_port} {maria_db} {maria_user} {maria_password} {hdfsUrl} {hdfsPort} {spark_master_url} {spark_master_port}
"""
)

t2 = SSHOperator(
task_id="raw_mongodb",
ssh_conn_id='spark_master_ssh',
cmd_timeout=None,
command=f"""
/opt/spark/bin/spark-submit \
--class com.example.ETL_raw_mongodb \
--master spark://{spark_master_url}:{spark_master_port} \
--packages io.delta:delta-core_2.12:2.4.0 \
/opt/submitfiles/ETL_raw_mongodb.jar \
{mongo_url} {mongo_port} {mongo_db} {mongo_user} {mongo_password} {hdfsUrl} {hdfsPort} {spark_master_url} {spark_master_port}
"""
)

t3 = SSHOperator(
task_id="clean_mariadb",
ssh_conn_id='spark_master_ssh',
cmd_timeout=None,
command=f"""
/opt/spark/bin/spark-submit \
--class com.example.ETL_clean_mariadb \
--master spark://{spark_master_url}:{spark_master_port} \
/opt/submitfiles/ETL_clean_mariadb.jar \
{maria_url} {maria_port} {maria_db} {maria_user} {maria_password} {hdfsUrl} {hdfsPort} {spark_master_url} {spark_master_port}
"""
)

t4 = SSHOperator(
task_id="clean_mongodb",
ssh_conn_id='spark_master_ssh',
cmd_timeout=None,
command=f"""
/opt/spark/bin/spark-submit \
--class com.example.ETL_clean_mongodb \
--master spark://{spark_master_url}:{spark_master_port} \
--packages io.delta:delta-core_2.12:2.4.0 \
/opt/submitfiles/ETL_clean_mongodb.jar \
{mongo_url} {mongo_port} {mongo_db} {mongo_user} {mongo_password} {hdfsUrl} {hdfsPort} {spark_master_url} {spark_master_port}
"""
)

t5 = SSHOperator(
task_id="the_end",
ssh_conn_id='spark_master_ssh',
cmd_timeout=None,
command="echo 'The script has ended'",
)

t0 >> t1 >> t2 >> t3 >> t4 >> t5

We defined the (example_url) variable in Airflow and we passed this argument to the Scala scripts.

object ETL_clean_mongodb {
def main(args: Array[String]): Unit = {

val mongo_url = args(0)
val mongo_port = args(1)
val mongo_db = args(2)
val mongo_user = args(3)
val mongo_password = args(4)
val hdfsUrl = args(5)
val hdfsPort = args(6)
val spark_master_url = args(7)
val spark_master_port = args(8)
...
...
...

3. Airflow and Spark-master ssh connection:

If we want to run our script in the Airflow container, we need to set up an SSH tunnel for our container. You can think of a container like a server in real life, so we need to open an SSH tunnel for our two servers.

I added all configurations in the Airflow Dockerfile and Spark Dockerfile.

# YOU CAN CHECK spark-master CONTAINER FOR OUR SSH SERVICE IS RUNNIN
root@4b24a53cff6b:/# service ssh status
sshd is running.

You can check the Dockerfile for SSH tunnel information and password in the following location: sparkMaster => spark => master => Dockerfile

So, our password is: screencast

# YOU CAN FIND FULL DOCKER FILES IN sparkMaster FOLDER.
# SSH configuration
RUN mkdir -p /var/run/sshd
# THE PASSWORD FOR SPARK-MASTER, THE WILL BE SSH PASSS
RUN echo 'root:screencast' | chpasswd
RUN sed -i 's/#PermitRootLogin prohibit-password/PermitRootLogin yes/' /etc/ssh/sshd_config
RUN sed -i 's/#PasswordAuthentication yes/PasswordAuthentication yes/' /etc/ssh/sshd_config

# Generate SSH keys
RUN ssh-keygen -t rsa -f /root/.ssh/id_rsa -q -N ""
RUN cp /root/.ssh/id_rsa.pub /root/.ssh/authorized_keys

on contination I make some change in bitnami airflow image, I added a user for our ssh,

FROM bitnami/airflow:2.9.1

# I ADDED BELOW LINE BECAUSE WE'LL NEED TO USE SSH FOR OUR
# spark-master CONTAINER.

USER root
RUN apt-get update && apt-get install -y openssh-client && apt-get clean

RUN useradd -m -u 1001 -s /bin/bash airflow_user
RUN chown -R airflow_user:airflow_user /opt/bitnami/airflow

RUN chmod 777 /opt/bitnami/airflow

USER 1001

After that, you can enter the Airflow container from the Spark master container.

cagri@cagri:~/Desktop/trino_test_1$ docker exec -it airflow bash
airflow_user@7927c833abe1:/$ ssh -i /opt/bitnami/airflow/.ssh/spark-master-key root@spark-master
root@spark-master's password:
Linux 4b24a53cff6b 6.5.0-41-generic #41~22.04.2-Ubuntu SMP PREEMPT_DYNAMIC Mon Jun 3 11:32:55 UTC 2 x86_64

The programs included with the Debian GNU/Linux system are free software;
the exact distribution terms for each program are described in the
individual files in /usr/share/doc/*/copyright.

Debian GNU/Linux comes with ABSOLUTELY NO WARRANTY, to the extent
permitted by applicable law.
Last login: Wed Jun 12 11:34:14 2024 from 172.80.0.6
root@4b24a53cff6b:~# cd /opt/spark/
root@4b24a53cff6b:/opt/spark# ls
LICENSE NOTICE R README.md RELEASE bin conf data examples history jars kubernetes licenses logs python sbin yarn
root@4b24a53cff6b:/opt/spark# pwd
/opt/spark
root@4b24a53cff6b:/opt/spark#

Lastly, you need to add SSH information in Airflow:

  • Go to localhost:8080 => admin => connection => add a new record

PS: You can also add the Spark master container’s IP address, that’ll be okay.

3.1. Scrips Security for Airflow

Lastly, you need to set variables in Airflow because most scripts stay open on the server. However, we can get all our important connection information in Airflow, so you must be careful with the Airflow user and password.

4. Scala Apache Spark Scrips:

You can find all the scripts in our project folder under scala_scripts and I have also added the SBT files. You can check all the code locally. I'm using IntelliJ, so first, we need to compile our scripts.# MAKE SURE YOU ARE IN THE SAME DIRECTORY AS THE build.sbt FILE.
sbt assembly

[success] Total time: 13 s, completed Jun 12, 2024, 2:50:37 PM
cagri@cagri:~/Desktop/code/gitrepom/scala/scala2/pjairflow

4.1. How to use scala scrip in airflow:

After that, you can use this JAR file in Airflow. Also, don’t worry, you can change the name of your JAR file.

jar tf ETL_raw_mongodb.jar | grep 'ETL_raw_mongodb.class'
# IF YOUR ETL_raw_mongodb.class COMPILE GOOD,
# YOU NEED TO SEE OUTPUT LIKE THAT:
com/example/ETL_raw_mongodb.class
# YOU CAN TEST YOUR JAR FILE IN YOUR LOCAL WITH THE spark-submit.
/opt/spark/bin/spark-submit --class com.example.Main /
--master spark://spark-master:7077 --packages io.delta:delta-core_2.12:2.4.0 /
/opt/submitfiles/ETL_raw_mongodb.jar

PS: I deploy a .sh for your scrips when you want to use this .sh please make sure your class name and .jar file’s name must be the same.

#!/bin/bash

# WARNING
echo "YOUR CLASS NAME AND JAR FILE NAME MUST BE THE SAME!"

# EXTRACT THE BASE NAME OF THE JAR FILE (REMOVE THE DIRECTORY AND .jar EXTENSION)
jar_base_name=$(basename "$1" .jar)

# CONSTRUCT THE CLASS NAME DYNAMICALLY
class_name="com.example.${jar_base_name}"

# RUN THE DOCKER EXEC COMMAND WITH THE CONSTRUCTED CLASS NAME
docker exec -it spark-master /opt/spark/bin/spark-submit --class "${class_name}" --master spark://172.80.0.105:7077 /opt/submitfiles/"$@"

After than you can check your application in localhost:5050

Then your script will start automatically.

IMPORTANT:

I’ll be going to military service next month, so I’ll deploy Tableau and Spark containers in Kubernetes and airflow mail system. We can call the version 1, then I’ll build version 2 soon…

5. SUMMARY and Fast Build

To set up the desired system in the project, we first created an architecture. This architecture involved collecting raw data from human resources, users within the warehouse, and organizational documents. After cleaning the data, it was written to the relevant databases, making it available for analysis using visualization tools like Tableau or SQL queries. Initially, the raw data was sent to HDFS through an ETL pipeline. As information on the required analysis was received, the data was transferred to clean tables. The analysis was then conducted using Trino, and the relevant reports were prepared.

1. Build containers:

docker-compose up -d --build

2. Wait 1 minute and check the containers status.

docker-compose ps -a

3. Check the SSH connection for Airflow and Spark master.

ssh -i /opt/bitnami/airflow/.ssh/spark-master-key root@spark-master

4. Create the /data folder in HDFS and grant permissions.

5. Lastly you need to create users and humanresource tables

You need to create users and humanresource tables. You can use Trino on DBeaver:

CREATE TABLE users (
id INT,
signal_strength INT,
frequency INT,
location VARCHAR(255),
timestamp DATETIME
);

CREATE TABLE humanresource (
user_id INT,
department VARCHAR(255),
position VARCHAR(255),
salary DECIMAL(10, 2),
hire_date DATE
);

Then insert the data. I created a Python script in the create_data folder.

Also don’t forget to download jar file,

6. Add your airflow variables and spark_master_ssh

Check the (3.1. Scrips Security for Airflow) part.

7. Start your airflow dags: localhost:8080

8. I’m using DBrave, the database managment tools is really easy and user firently.

The-End

Thank you for reading all project, if you wish you can send me a mail… :D

cheers in peace… See ya V2…

--

--