Kafka-Spark Integration: (Streaming data processing)
This blog explains how to set up Kafka, create a sample real-time data stream, and process it using Spark.
Apache Kafka is an open-source stream-processing software platform developed by the Apache Software Foundation, written in Scala and Java. The project aims to provide a unified, high-throughput, low-latency platform for handling real-time data feeds.
Kafka-pipeline:
Recommended Platform:
OS: Linux is preferred for both development and deployment, whereas Windows is used for development only.
Kafka Architecture:
Kafka works on the publish-subscribe (Pub-Sub) model and consists of three main parts:
Producer
Broker
Consumer
The producer publishes messages to a topic on the broker, and the consumer acts as a receiver that subscribes to that topic.
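To make the Pub-Sub idea concrete, here is a minimal in-memory sketch of the three roles. The ToyBroker and ToyConsumer classes are hypothetical and are not part of Kafka or any Kafka client library; they only illustrate that a topic is an ordered log and that each consumer keeps its own read position.

```python
from collections import defaultdict


class ToyBroker:
    """In-memory stand-in for a broker (hypothetical, not Kafka's API)."""

    def __init__(self):
        # topic name -> ordered list of messages
        self.topics = defaultdict(list)

    def publish(self, topic, message):
        # A producer appends a message to the end of the topic's log.
        self.topics[topic].append(message)

    def read(self, topic, offset):
        # A consumer reads everything from its own offset onward.
        return self.topics[topic][offset:]


class ToyConsumer:
    """Tracks its own offset, so consumers read the same topic independently."""

    def __init__(self, broker, topic):
        self.broker = broker
        self.topic = topic
        self.offset = 0

    def poll(self):
        messages = self.broker.read(self.topic, self.offset)
        self.offset += len(messages)
        return messages


broker = ToyBroker()
consumer_a = ToyConsumer(broker, "streaming_data")
consumer_b = ToyConsumer(broker, "streaming_data")

broker.publish("streaming_data", "row-1")
broker.publish("streaming_data", "row-2")
first_poll = consumer_a.poll()    # consumer_a reads both messages
broker.publish("streaming_data", "row-3")
second_poll = consumer_a.poll()   # only the message published since its last poll
late_poll = consumer_b.poll()     # consumer_b has not read yet, so it gets all three
print(first_poll, second_poll, late_poll)
```

A real broker adds persistence, partitions, and replication on top of this, but the publish and subscribe flow is the same.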
Prerequisites:
Kafka
Spark
PySpark
Python 2.7 or later
ZooKeeper
Kafka-setup:
Before you install Kafka, download ZooKeeper from the link.
Unzip the file, navigate to the bin folder, and run the command below.
zkServer.sh start
Now download the Kafka version of your choice from the link.
Navigate to the Kafka folder and start the broker:
bin/kafka-server-start.sh config/server.properties
Create topic:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic topic-name
Start producer:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic-name
Start consumer:
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic topic-name --from-beginning
Python script to create a data simulator for the Kafka producer:
The Python script below reads a CSV file and sends one row per second to the streaming_data topic through a Kafka producer.
from time import sleep
from json import dumps
import csv

from kafka import KafkaProducer

# Serialize every message value as UTF-8 encoded JSON.
producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                         value_serializer=lambda x: dumps(x).encode('utf-8'))

with open("/xxx/xxx/xxx/boston.csv") as file:
    reader = csv.DictReader(file, delimiter=";")
    for row in reader:
        # Send the row as a dict; the serializer handles the JSON encoding.
        producer.send('streaming_data', value=dict(row))
        sleep(1)

# Make sure buffered messages are delivered before the script exits.
producer.flush()
print("Successfully sent data to kafka topic")

Once you run the above script, open a consumer console for the same topic and you will see a stream of messages flowing from the producer.
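To see what actually travels over the wire, the serialization step can be tried without a running broker. The sketch below is only an illustration: the two-row sample and its column names are made up and stand in for the real boston.csv, whose layout is not shown here. The serialize function applies the same transformation as the producer's value_serializer, and deserialize is what a consumer could do to recover a row.

```python
import csv
import io
import json

# Hypothetical two-row sample standing in for boston.csv (column names are assumptions).
sample_csv = "crim;rm;medv\n0.00632;6.575;24.0\n0.02731;6.421;21.6\n"


def serialize(value):
    # Same transformation as the producer's value_serializer:
    # Python object -> JSON text -> UTF-8 bytes.
    return json.dumps(value).encode("utf-8")


def deserialize(payload):
    # Reverse step a consumer could apply: UTF-8 bytes -> JSON text -> Python object.
    return json.loads(payload.decode("utf-8"))


reader = csv.DictReader(io.StringIO(sample_csv), delimiter=";")
payloads = [serialize(dict(row)) for row in reader]
decoded = [deserialize(p) for p in payloads]

print(payloads[0])   # bytes, as handed to Kafka
print(decoded[0])    # dict again on the consumer side
```

Note that the csv module reads every field as text, so numeric columns arrive as strings and need to be cast on the consumer side before any numeric processing.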
PySpark script to read the Kafka data stream:
import os

# Pull in the Kafka 0.8 connector; this must be set before the SparkContext is created.
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.3.0 pyspark-shell'
#os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.0,org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0 pyspark-shell'

from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

sc = SparkContext(appName="Sparkstreaming")
spark = SparkSession.builder.appName("Spark-Kafka-Integration").master("local").getOrCreate()
ssc = StreamingContext(sc, 1)  # 1-second batch interval

# Receiver-based stream: ZooKeeper address, consumer group id, {topic: number of partitions to consume}
kafka_stream = KafkaUtils.createStream(ssc, "localhost:2181", "raw-event-streaming-consumer", {"streaming_data": 1})
raw = kafka_stream.flatMap(lambda kafkaS: [kafkaS])
# Each record is a (key, value) tuple; split the message value on "|".
lines = raw.map(lambda xs: xs[1].split("|"))
lines.pprint()

ssc.start()
ssc.awaitTermination()
When you run the above script, you will see the consumed messages. In the next article, we will see how to apply ML algorithms to process the streaming data.
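The per-record logic of the map step can also be tried in plain Python, without Spark or Kafka running. This is a rough sketch under a few assumptions: the records are hypothetical (key, value) tuples shaped like the elements of kafka_stream, the key is None because the producer script sends no key, and the field names are made up. split_value mirrors the split used in the script, while parse_value is an alternative that could be used when the message value is JSON.

```python
import json

# Hypothetical row; the field names are assumptions, not the real file's columns.
row = {"crim": "0.00632", "medv": "24.0"}

# Records shaped like the (key, value) tuples in kafka_stream.
records = [
    (None, json.dumps(row)),               # value JSON-encoded once
    (None, json.dumps(json.dumps(row))),   # value JSON-encoded twice
]


def split_value(record):
    # Mirrors: raw.map(lambda xs: xs[1].split("|"))
    return record[1].split("|")


def parse_value(record):
    # Alternative for JSON payloads: decode the value into a dict.
    value = json.loads(record[1])
    # A payload that was JSON-encoded twice decodes to a string first; decode once more.
    if isinstance(value, str):
        value = json.loads(value)
    return value


split_rows = [split_value(r) for r in records]
parsed_rows = [parse_value(r) for r in records]

print(split_rows[0])
print(parsed_rows)
```

In the streaming script, a function like parse_value could be passed to raw.map in place of the split if dictionaries are more convenient for later processing.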
Thanks for reading. Hope it was useful.