Edge To Cloud

Hayrullah Karaman
İstanbul Data Science Academy
5 min readJul 7, 2023

Docker,Airflow, Kafka, Nifi, MongoDB, BigQuery

Edge to cloud Pipline

About Project

In this project, I will get you the fuel prices in Germany, eliminate the diesel fuel information, transfer the gasoline information to MongoDB, then process it in BigQuery and visualize it in Looker.

General project view

Infrastructure

The technologies used in the infrastructure of the project were built on google cloud. It was installed with docker to automate Apache airflow,
Kafka and Nifi were installed with a separate compute instance, and python was installed to run the python file on it. MongoDB limited version at cloud was used as MongoDB Atlas with Google Cloud Dataflow, data was taken from MongoDb Atlas and transferred to BigQuery and it is aimed to be visualized with Looker.

Extraction & Transform Data

Python :

I get the fuel pump information at the fuel station in a certain region of Germany from the site with the Python requests method.Since the data comes in nested Json format, the data is organized and editing with the Pandas Json library to avoid problems later.

import requests
import pandas as pd
import json
from kafka import KafkaProducer
import time
from airflow import DAG
from airflow.operators.python import PythonOperator
import pendulum
import os

def getdata():
url='https://creativecommons.tankerkoenig.de/api/v4/stations/search?apikey=cffa4fb8-7a16-cd85-7946-263722530f15&lat=48.8&lng=9.24&rad=25'

response = requests.get(url).json()
df_fuel = pd.json_normalize(data=response['stations'],record_path='fuels',
meta=[['category','name']],errors='ignore')

df_fuel.rename(columns={'name':'category_name'},inplace=True)
df_fuel=df_fuel[['category','category_name','price','lastChange.amount','lastChange.timestamp']][0:300]

df_v2=pd.json_normalize(response,record_path=['stations'])
df_station=df_v2[['country','id','name','brand','street','postalCode','place','isOpen','dist','volatility','coords.lat','coords.lng']][0:300]

df=pd.concat([df_station,df_fuel],axis=1)

Apache Kafka :

A topic named Gas is created and Procudered via python


server="ipv4:9092"
topic_name='gas'
producer = KafkaProducer(
bootstrap_servers = server,
value_serializer= lambda x: json.dumps(x).encode("utf-8"))

for _, row in df.iterrows():
to_json = row.to_dict()
time.sleep(1)
print(_)
producer.send("gas", value=to_json)
producer.flush()
producer.close()

Apache AirFlow

To pull daily data, DAGS named Gas was created and set to work daily.


with DAG(
dag_id="1_gas_bash",
schedule="@daily",
start_date=pendulum.datetime(2023,6,11,tz="UTC")#Task Start date
) as dag:

gas = PythonOperator(
task_id ="gas",#task name
python_callable=getdata#Task execute
)

gas

Apache Nifi

We organize the topics that Kafka sends as joltform with Apche Nifi kafka_consumure, then we filter only gasoline information with Route Atrribute and send them to mongodb by making PutMongo db connection settings.

General nifi info

RouteOnAttribute Variable created to filter diesel information

To send the filtered data to Mongodb, a connection is created as follows, collection, table b and the action to be taken are selected. I chose insert because I will only insert

MongoDb ATLAS

Mongodb is a popouler NoSql database that supports document-based jason files that are used a lot in NoSql databases. Mongodb atlas is offered as a cloud solution, you can use a limited cluster after registration. Here I am creating a table called Gasoline in the collection named Gas, I am sending it here to save the data I have filtered in nifi.

MongoDB Atlas Compose using

The information received from the edge device is automated with Airflow, Apache Nifi is transferred with kafka in the form of a topic, and MongoDb is sent for storage by performing gasoline filtering.

Google Cloud Dataflow

After I transferred the data to Mongo DB, I used Google Cloud Dataflow to transfer the data to Bigquery, I created a job on the dataflow, I choose Mongodb to BigQuery, then the URL, database and collection information of mongodb is entered and the job is created with create button . It is necessary to choose which dataset to send on the BigQuery side, the trick is to set the table inside the dataset. it has to be empty.This process is a batch process, it works once and transfers the existing data in batches.

Dataflow Job create
Job Graph and İnfo

Google Cloud BigQuery

I have previously created the data we transferred from Dataflow under Gas dataset.Job transfers the gasolinev1 he created as follows, where lob verbir adds timestamp and id to the sourcedata by adding the information in the json structure.

Data transferred from dataflow

The transferred data was in a separate json structure under sourcedata, in order to be able to use and process the files, I had to extract the files from the json format in accordance with the Dataware structure. For this problem, I converted each json key into the column structure of the table with the Extract_Json function, I created a table under gasoline_v2

Extract_Json Function

The table gasoline_v2 created with the json_excract function is as follows.

Gasoline_v2 Table

Google Cloud Looker Studio

Since the gasoline information is now in our hands, the desired graphs can be made, of course, the more data, the better ML models, analysis and visualization we can provide, since there are limited accounts here, I could only prepare visualizations with the data we took in 2–3 days.

I made a location-based display of the location coordination area codes and the fuel pump information obtained from Lat and Lng in the table.

Edge Device is Location

Price distribution ratio of fuel prices according to brands and price distribution according to brands

Price and Brand info

I finished the project by passing the end-to-end edge device information through various data engineer processes in the cloud, thank you for your time, see you in other articles.

if you want o view source code on github link

#DataEngineer #Docker #Kafka #Nifi #MongoDg #BigQuery

--

--