Kafka-Spark Integration (Streaming Data Processing)

Sruthi Vijay
3 min read · Dec 17, 2018


This blog explains how to set up Kafka, create a sample real-time data stream, and process it with Spark.

Apache Kafka is an open-source stream-processing software platform developed by the Apache Software Foundation, written in Scala and Java. The project aims to provide a unified, high-throughput, low-latency platform for handling real-time data feeds.

(Figure: Kafka pipeline)

Recommended Platform:

OS: Linux is preferred as a development and deployment platform, whereas Windows is suitable for development only.

Kafka Architecture:

Kafka works on the publish-subscribe (pub-sub) model and consists of three main parts:

Producer

Broker

Consumer

The Kafka producer publishes messages to a topic, and the consumer acts as a receiver that subscribes to that topic.

Prerequisites:

Kafka

Spark

PySpark

Python 2.7+

ZooKeeper

Kafka setup:

Before you install Kafka, download ZooKeeper from the link.

Unzip the file, navigate to the bin folder, and run the command below:

zkServer.sh start
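
If you want to confirm that ZooKeeper is actually listening, a minimal Python check is sketched below (assuming the default client port 2181 and that the "ruok" four-letter command is enabled on your ZooKeeper version):

import socket

# Send ZooKeeper's "ruok" health-check command; a healthy server replies with "imok".
s = socket.create_connection(("localhost", 2181), timeout=5)
s.sendall(b"ruok")
print(s.recv(1024))  # expected output: b'imok'
s.close()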

Now download the Kafka version of your choice from the link.

Navigate to the Kafka folder and start the broker:

bin/kafka-server-start.sh config/server.properties

Create topic:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic topic-name
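
The console script above is the usual route. If you prefer to create the topic from Python, kafka-python also ships an admin client; a minimal sketch, assuming a recent kafka-python release, a broker on localhost:9092, and the placeholder topic name used above:

from kafka.admin import KafkaAdminClient, NewTopic

# Talk to the broker (not ZooKeeper) and create the topic with the same settings as the CLI command.
admin = KafkaAdminClient(bootstrap_servers="localhost:9092")
admin.create_topics([NewTopic(name="topic-name", num_partitions=1, replication_factor=1)])
admin.close()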

Start producer:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic-name

Start consumer:

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic topic-name --from-beginning

Python data simulator for the Kafka producer:

Below is a Python script that reads rows from a CSV file and publishes each one to the Kafka topic as a JSON message.

from time import sleep
from json import dumps
from kafka import KafkaProducer
import csv

# Serialize each record as a UTF-8 encoded JSON string before it is sent to Kafka.
producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                         value_serializer=lambda x: dumps(x).encode('utf-8'))

with open("/xxx/xxx/xxx/boston.csv") as file:
    reader = csv.DictReader(file, delimiter=";")
    for row in reader:
        # The value_serializer above already converts the dict to JSON,
        # so the row can be sent as-is.
        producer.send('streaming_data', value=row)
        sleep(1)
        print("Successfully sent data to kafka topic")

Once you run the above script, open a consumer console for the same topic and you will see the stream of messages flowing from the producer.
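
You can also verify from Python instead of the console consumer; a minimal sketch, assuming the same streaming_data topic and a broker on localhost:9092:

from kafka import KafkaConsumer

# Read the topic from the beginning and print each message value.
consumer = KafkaConsumer('streaming_data',
                         bootstrap_servers=['localhost:9092'],
                         auto_offset_reset='earliest',
                         value_deserializer=lambda m: m.decode('utf-8'))

for message in consumer:
    print(message.value)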

PySpark script to read the Kafka data stream:

import os
import json
from uuid import uuid1

# Pull in the Kafka streaming package (Spark 2.3.0, Scala 2.11) when the shell starts.
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.3.0 pyspark-shell'
# os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.0,org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0 pyspark-shell'

from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.sql import SQLContext, SparkSession
from pyspark.mllib.clustering import StreamingKMeans
from pyspark.mllib.linalg import SparseVector, DenseVector

sc = SparkContext(appName="Sparkstreaming")
spark = SparkSession.builder.appName("Spark-Kafka-Integration").master("local").getOrCreate()

# Micro-batches of 1 second.
ssc = StreamingContext(sc, 1)

# Receiver-based stream: connect through ZooKeeper as consumer group
# "raw-event-streaming-consumer" and read the "streaming_data" topic with one receiver thread.
kafka_stream = KafkaUtils.createStream(ssc, "localhost:2181", "raw-event-streaming-consumer", {"streaming_data": 1})

raw = kafka_stream.flatMap(lambda kafkaS: [kafkaS])

# Each record is a (key, value) tuple; keep the value and split it on "|".
lines = raw.map(lambda xs: xs[1].split("|"))
lines.pprint()

ssc.start()
ssc.awaitTermination()
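
Since the simulator publishes each row as a JSON string, an alternative to splitting on "|" is to parse each record's value with json.loads. A small variation of the map step above (a sketch, not part of the original script):

# Each Kafka record is a (key, value) tuple; parse the JSON value into a dict per row.
parsed = kafka_stream.map(lambda kv: json.loads(kv[1]))
parsed.pprint()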

On running the above script, you will see the consumed messages printed for each batch. In the next article, we will look at how to apply ML algorithms to this streaming data.

Thanks for reading. Hope it was useful.
