How to build a real-time data-centric architecture

Yaling Hu
Published in gft-engineering · Oct 19, 2021

Data is an asset, and we want to treat it as such. To do so, we need to build (or evolve into) data-centric architectures that cover current trends such as:

  • Data availability in real-time
  • Agile data management
  • Ownership federation
  • Self-service
  • Easy access to data
  • Valuable data insights

That is why we decided to design and build the skeleton for this kind of architecture, covering an end-to-end flow that can then be enhanced by adding new sources, consumers, security & controls, real-time features, analytics, etc.

This article provides a guide to set up the following data pipeline, with the objective of capturing all the real-time data operations executed in the data source platform (in our case, a MySQL database) and visualising them.

bird’s-eye architecture

Firstly, we have the data stored in a MySQL database. With Debezium, we capture the real-time updates to that database: this technology gives you the state of a database row before and after the commit, and it also indicates the type of operation executed. We then set up Spark Structured Streaming in a Jupyter Notebook using the PySpark libraries, consuming the JSON events from an Apache Kafka topic and writing them to a Delta Lake table. This, on the one hand, helps unify streaming and batch data processing and, on the other hand, permits concurrent read/write operations and enables efficient inserts, updates, deletes, and rollback capabilities. Next, we store the data in Delta Lake format in HDFS, which acts as the data source for Dremio, a powerful solution to connect your applications and BI tools to your data lake. It provides a user-friendly data platform that allows users to create virtual datasets from multiple sources, like HDFS, AWS S3, etc. Finally, we visualize the data in Apache Superset, an open-source modern data exploration and visualization platform. In addition, it is recommended to connect Dremio with Jupyter to provide an environment for running machine learning algorithms based on different use case needs.
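In short, the flow looks like this:

MySQL → Debezium (Kafka Connect) → Kafka → Spark Structured Streaming → Delta Lake on HDFS → Dremio → Apache Superset / Jupyter Notebook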

Docker

First of all, I will use Docker and various container images to run the different technologies, which allows each service to run in a separate environment and simplifies the configuration.

So how do we bring the infrastructure up and running?

docker-compose.yml

version: '3'
services:
  namenode:
    image: bde2020/hadoop-namenode:2.0.0-hadoop2.7.4-java8
    container_name: namenode
    volumes:
      - hadoop_namenode:/hadoop/dfs/name
    environment:
      - CLUSTER_NAME=test
    env_file:
      - hadoop.env
    ports:
      - '9000:9000'
      - '10070:50070'
  datanode:
    image: dlpgft/datanode:hadoop2.7.4-0.2
    container_name: datanode
    volumes:
      - hadoop_datanode:/hadoop/dfs/data
    environment:
      SERVICE_PRECONDITION: "namenode:50070"
    env_file:
      - hadoop.env
    ports:
      - '10075:50075'
  zookeeper:
    image: debezium/zookeeper:1.5
    container_name: zookeeper
    ports:
      - '2181:2181'
      - '2888:2888'
      - '3888:3888'
  kafka:
    image: debezium/kafka:1.5
    container_name: kafka
    ports:
      - '9092:9092'
    links:
      - zookeeper
    environment:
      - ZOOKEEPER_CONNECT=zookeeper:2181
  mysql:
    image: debezium/example-mysql:1.5
    container_name: mysql
    ports:
      - '3306:3306'
    environment:
      - MYSQL_ROOT_PASSWORD=debezium
      - MYSQL_USER=mysqluser
      - MYSQL_PASSWORD=mysqlpw
  adminer:
    image: adminer
    container_name: mysql_ui
    restart: always
    ports:
      - '8090:8080'
  connect:
    image: debezium/connect:1.5
    container_name: connect
    ports:
      - '8083:8083'
    links:
      - kafka
      - mysql
    environment:
      - BOOTSTRAP_SERVERS=kafka:9092
      - GROUP_ID=1
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - OFFSET_STORAGE_TOPIC=my_connect_offsets
      - STATUS_STORAGE_TOPIC=my_connect_statuses
  jupyter:
    build: jupyter/
    container_name: jupyter
    ports:
      - '8888:8888'
    environment:
      - JUPYTER_TOKEN=easy
      - GRANT_SUDO="yes"
    user: "root"
  dremio:
    image: dremio/dremio-oss:latest
    container_name: dremio
    ports:
      - '9047:9047'
  superset:
    build: superset/
    container_name: superset
    ports:
      - '8080:8080'
volumes:
  hadoop_namenode:
  hadoop_datanode:

And now, we can start everything up with:

docker-compose up -d
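As a quick sanity check (optional, using standard Docker Compose commands), you can confirm that every container is up and tail the Kafka Connect logs before registering any connector:

# List the containers defined in docker-compose.yml and their state
docker-compose ps

# Follow the Kafka Connect logs until the REST API on port 8083 is ready
docker-compose logs -f connect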

Debezium

Debezium is an open-source distributed platform for change data capture (CDC) built on top of Apache Kafka. It has also built up a library of connectors which allow capturing changes from various databases. In this article, I will focus on the MySQL connector and deploy it in Kafka Connect to monitor the database by executing the following command:

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-mysql.json
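Kafka Connect exposes a REST API on port 8083, so you can verify that the connector was registered and is running (these are standard Kafka Connect REST endpoints; the connector name inventory-connector comes from the register-mysql.json file shown below):

# List registered connectors
curl -H "Accept:application/json" http://localhost:8083/connectors/

# Check the status of our connector and its task
curl -H "Accept:application/json" http://localhost:8083/connectors/inventory-connector/status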

Now let us check the detailed configuration of the connector which is defined in the register-mysql.json file:

{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "dbz",
    "database.server.id": "184054",
    "database.server.name": "dbserver1",
    "database.include.list": "inventory",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "schema-changes.inventory",
    "tombstones.on.delete": "false",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "false",
    "transforms.unwrap.delete.handling.mode": "rewrite"
  }
}

You can read a detailed explanation of each parameter in the official documentation; however, I would like to emphasize the function of the last five parameters.

Debezium normally generates data change events with a complex structure; however, in some cases consumers may only expect, for instance, the state of the data after the change. For these cases, we can configure the event flattening SMT (Single Message Transform) to reconstruct the structure needed by the consumers, for example:

"payload": {
  "id": 1002,
  "first_name": "George",
  "last_name": "Bailey",
  "email": "gbailey@foobar.com",
  "__deleted": "false"
}
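To see these flattened events arriving, one option is to read the topic with the console consumer shipped inside the Kafka container. This is a sketch that assumes the debezium/kafka image installs Kafka under /kafka and uses the topic dbserver1.inventory.customers that the connector creates for the customers table:

docker exec -it kafka /kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server kafka:9092 \
  --topic dbserver1.inventory.customers \
  --from-beginning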

Spark Streaming

We will use Spark Structured Streaming to listen to the Kafka messages. Once we have a streaming dataframe of the Kafka topic, we will map it to the Delta Lake format and store it in HDFS.

Why Delta Lake?

  • High Performance (data indexing, data skipping, compaction, data caching, etc.)
  • Data Reliability (ACID transaction, time travel, snapshot isolation, schema enforcement, checkpoints, upserts and deletes support, data versioning etc.)

In our demo, we will use PySpark to execute the streaming jobs. Firstly, we will configure the prerequisites and import the libraries that are needed later:

!pip install pyspark==3.0.2
!pip -q install findspark
!wget -q https://archive.apache.org/dist/spark/spark-3.0.2/spark-3.0.2-bin-hadoop3.2.tgz
!tar xf spark-3.0.2-bin-hadoop3.2.tgz
!unzip -q /home/jovyan/spark-3.0.2-bin-hadoop3.2/python/lib/py4j-0.10.9-src.zip
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/home/jovyan/spark-3.0.2-bin-hadoop3.2"
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages io.delta:delta-core_2.12:0.8.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.2 --conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog pyspark-shell'
import findspark
findspark.init()

Set up the Spark Session:

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
spark = SparkSession \
.builder \
.appName("Challenge") \
.getOrCreate()
spark

Let us have a look at how the messages arrive from Kafka. Once the connector is added via the curl command, it starts pushing the snapshot changes to Kafka. Here is a sample of the Kafka messages in our demo:

Sample Kafka Message

Back in the notebook script, we will first set up the connection with Kafka and then read the data with the predefined schema. Afterwards, we write the stream to an in-memory table to check that the data is read correctly:

from pyspark.sql.functions import *
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, BooleanType
import pyspark.sql.functions as f

schema_customers = StructType([
    StructField('id', StringType(), True),
    StructField('first_name', StringType(), True),
    StructField('last_name', StringType(), True),
    StructField('email', StringType(), True),
    StructField('__deleted', StringType(), True)])

Schema = StructType([
    StructField('schema', StringType()),
    StructField('payload', schema_customers)])

# Read Kafka stream
df_stream = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("subscribe", "dbserver1.inventory.customers") \
    .option("startingOffsets", "earliest") \
    .load() \
    .select(f.from_json(f.col("value").cast("string"), Schema).alias("parsed_value"))

# Write the stream to an in-memory table to check the data
df_stream.select("parsed_value.payload.*").writeStream\
    .outputMode("update")\
    .format("memory")\
    .queryName("Customers_query")\
    .start()

spark.sql("SELECT * FROM Customers_query").show(truncate=False)

At this point we will be able to visualize how the message is structured:

data stored temporarily in memory

Now that we have a streaming dataframe of our Kafka topic, we will map it to the Delta Lake format and store it in HDFS:

from delta.tables import *

spark.sql("""
CREATE TABLE customers
(id int, first_name string, last_name string, email string, __deleted boolean)
USING delta
LOCATION '<introduce_your_hdfs_path: hdfs://namenode:9000/*****>'
""")

deltaTable_customers = DeltaTable.forPath(spark, "<introduce_your_hdfs_path: hdfs://namenode:9000/*****>")

# Function to upsert microBatchOutputDF into the Delta table using merge
def upsertCustomersToDelta(microBatchOutputDF, batchId):
    deltaTable_customers.alias("t") \
        .merge(microBatchOutputDF.alias("s"), "s.id = t.id") \
        .whenMatchedDelete(condition = "s.__deleted = 'true'") \
        .whenMatchedUpdate(set = { "id": "s.id", "first_name": "s.first_name", "last_name": "s.last_name", "email": "s.email" } ) \
        .whenNotMatchedInsert(values = {"id": "s.id", "first_name": "s.first_name", "last_name": "s.last_name", "email": "s.email"}) \
        .execute()

df_stream.select("parsed_value.payload.*").writeStream\
    .format("delta") \
    .foreachBatch(upsertCustomersToDelta) \
    .outputMode("append") \
    .option("checkpointLocation", "<introduce_your_hdfs_path: hdfs://namenode:9000/checkpoint/*****>") \
    .start("<introduce_your_hdfs_path: hdfs://namenode:9000/*****>")
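Once the stream is running, you can query the Delta table from the same Spark session to verify the upserts, and try out the time-travel and versioning features mentioned in the "Why Delta Lake?" list. A minimal sketch, assuming the placeholder is replaced with the same HDFS path used above:

# Read the current state of the Delta table
spark.read.format("delta") \
    .load("<introduce_your_hdfs_path: hdfs://namenode:9000/*****>") \
    .show(truncate=False)

# Inspect the table history (one MERGE entry per streaming micro-batch)
deltaTable_customers.history().select("version", "timestamp", "operation").show(truncate=False)

# Time travel: read the table as of a previous version
spark.read.format("delta") \
    .option("versionAsOf", 0) \
    .load("<introduce_your_hdfs_path: hdfs://namenode:9000/*****>") \
    .show(truncate=False)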

Dremio

Dremio is defined as a Data Lake Engine, which allows you to run live, interactive queries on your cloud-based or on-premise data lake storage. Moreover, you can do high-performance business intelligence work directly on this data lake. Dremio provides both a free community edition and an enterprise edition. It is claimed that the free version is suitable for most small businesses, but you should consider the enterprise version if you require more features like data security, data governance, data lineage, etc.

Dremio Community Edition VS Enterprise Edition

In this demo, we will take HDFS as the micro-batching data source and AWS S3 as the static data source; later, we will create a new virtual dataset by joining these two datasets and visualize it in the BI tool. Let us achieve this step by step:

  1. Create an admin account.
  2. Add a Data Lake source, choose HDFS and set the source configuration. Most importantly, in the metadata settings, you should adjust the dataset details to fetch the data every minute. Then you will see the folders created by the Spark Streaming job at the HDFS path you used to create the delta table.
  3. Enable the delta lake format by setting the support key “dremio.deltalake.enabled” to true.
Enable the delta lake format

4. Create a space and save the needed datasets into it.

5. Create a virtual dataset by executing a new query and save it in the same space in Dremio.

SELECT p.first_name, p.last_name, h.hobby
FROM gft_data_pipeline.people AS p
INNER JOIN gft_data_pipeline.hobby AS h
ON p.email = h.email

Apache Superset

We have chosen Apache Superset as the BI tool, which is an open-source and cloud-native BI web application. It provides users with an intuitive, visual and interactive data exploration platform that is able to handle data at the petabyte scale.

Apache Superset provides first-class connectivity to Dremio via ODBC and Arrow Flight. We will use the ODBC approach in this demo, which requires installing the Dremio SQLAlchemy connector. You can check the detailed configuration in the following Dockerfile.

FROM preset/superset:latest
USER root
RUN apt-get update -y && apt-get install -y alien
RUN wget https://download.dremio.com/odbc-driver/1.4.2.1003/dremio-odbc-1.4.2.1003-1.x86_64.rpm -P /tmp/
RUN alien -k -i /tmp/dremio-odbc-1.4.2.1003-1.x86_64.rpm --scripts
RUN apt-get install -y unixodbc-bin && apt-get install -y unixodbc-dev
RUN pip install pyodbc && pip install sqlalchemy_dremio && pip install dremio_client[full]
USER superset
RUN superset fab create-admin --username admin --firstname Superset --lastname Admin --email admin@superset.com --password admin
RUN superset db upgrade
RUN superset init
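If you change this Dockerfile after the stack is already up, the Superset image can be rebuilt and the container recreated with standard Docker Compose commands:

docker-compose build superset
docker-compose up -d superset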

After the configuration of the connectors, you need to add a new database in Superset with the required SQLAlchemy URI. The correct format should be:

dremio://[Dremio username]:[Dremio password]@[Dremio host]:31010/dremio
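For example, with the demo values used later in the Jupyter section (hypothetical credentials admin/admin123, Dremio reachable under the host name dremio on port 31010), the URI would look like this:

dremio://admin:admin123@dremio:31010/dremio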

Then test whether the connection works correctly:

Once the database is added, we need to add a new dataset, as the following photo shows, filling in the Schema with the space name in Dremio and the Table with the dataset name in Dremio.

Finally, we can get our first simple visualization in Apache Superset.

Jupyter Notebooks

From real-life project experience, we quickly noticed that data and AI are often tightly bound. Therefore, in this demo we also connect Dremio with a Jupyter Notebook to provide data scientists with a working environment for their algorithms.

Firstly, similar to the connection between Dremio and Superset, we need to install the corresponding connector. Check the detailed configuration in the following Dockerfile:

FROM jupyter/pyspark-notebook
USER root
RUN apt-get update -y && apt-get install -y alien
RUN wget https://download.dremio.com/odbc-driver/1.4.2.1003/dremio-odbc-1.4.2.1003-1.x86_64.rpm
RUN alien -i dremio-odbc-1.4.2.1003-1.x86_64.rpm --scripts
RUN apt-get install -y unixodbc-dev
RUN pip install pyodbc && pip install sqlalchemy_dremio && pip install dremio_client[full]
RUN wget -q https://archive.apache.org/dist/spark/spark-3.0.2/spark-3.0.2-bin-hadoop3.2.tgz
RUN tar xf spark-3.0.2-bin-hadoop3.2.tgz
RUN unzip -q /home/jovyan/spark-3.0.2-bin-hadoop3.2/python/lib/py4j-0.10.9-src.zip
RUN pip install pyspark==3.0.2 && pip -q install findspark
# Switch back to the default notebook user of the jupyter/pyspark-notebook image
USER jovyan

After the connector installation, we should set up the parameters for the ODBC connection in the Jupyter Notebook.

import pyodbc
import pandas as pd
host = 'dremio'
port = 31010
uid = 'admin'
pwd = 'admin123'
driver = "/opt/dremio-odbc/lib64/libdrillodbc_sb64.so"

Build the connection string:

cnxn = pyodbc.connect("Driver={};ConnectionType=Direct;HOST={};PORT={};AuthenticationType=Plain;UID={};PWD={}".format(driver, host, port, uid, pwd), autocommit=True)

Finally, we execute a SQL query and print the results, or plan the needed algorithms based on the ingested dataset.

sql = 'SELECT * FROM <name_of_space_in_dremio>.<name_of_table>'
df = pd.read_sql(sql, cnxn)
df.head()
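From here, the dataframe can feed whatever analysis or model the use case requires. As a minimal sketch, assuming the virtual dataset joined earlier (with first_name, last_name and hobby columns) is the one being queried, a quick exploration with pandas could look like this:

# Count how many people share each hobby
print(df.groupby('hobby').size().sort_values(ascending=False))

# Basic sanity checks before feeding the data to a model
print(df.isna().sum())
print(df.describe(include='all'))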

Next Steps

This “skeleton architecture” could be enhanced into a full data-centric architecture with new features such as the following:

  • Building security into the architecture: for example, verifying that sensitive fields are properly protected by the corresponding encryption while the data is analyzed or manipulated, and addressing key management, networking, and similar concerns.
  • Enriching the data with new data sources: for instance, the Dremio platform allows adding different data sources, like AWS S3, Google Cloud Storage, and various databases.

Summary

This article has shown a simple end-to-end data pipeline which captures the real-time changes in a database, transforms the dataset from JSON to the Delta Lake format, and finally visualizes it in a BI tool or a Jupyter Notebook.

The combination of all these technologies is capable of providing not only a reliable and scalable architecture, but also a high-performance setup.
