Reading Kafka data through PySpark
In my previous post, we saw how to read data from a MySQL table into a local Kafka topic on a Windows machine. In this article, we will see how to read the data from that Kafka topic through PySpark. You can read Kafka data into Spark either as a batch or as a stream. Batch processing is preferred when you have static data, while stream processing handles data that is generated in real time. We will see both types of processing in this article. I will be executing this from my Windows machine, so as a prerequisite, please make sure PySpark is installed on your Windows machine.
Batch Processing
Open your PySpark shell with the spark-sql-kafka package by running the command below -
pyspark --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1
I am running Spark 3, so I am providing the org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1 package to import the required Kafka libraries. Please check your Spark version and use the appropriate package.
Execute the below command to read the Kafka records from the employee topic. You can replace "localhost" with your Kafka server IP and "employee" with your Kafka topic name. Setting "startingOffsets" to "earliest" fetches all the records from the beginning of the topic, while "latest" fetches only the newest, unprocessed records.
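A minimal sketch of that read, assuming Kafka is listening on its default port 9092:

# read the employee topic as a static DataFrame, starting from the earliest offset
df = spark.read \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "employee") \
    .option("startingOffsets", "earliest") \
    .load()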
You can execute the below command to verify the data. The data appears in the serialized format.
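Continuing with the df from the sketch above:

# key and value appear as binary (serialized) columns at this point
df.show()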
To deserialize, cast the value data as a string by executing the below command -
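A sketch, with df1 as an arbitrary name for the result:

# cast the binary value column to a human-readable string
df1 = df.selectExpr("CAST(value AS STRING)")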
You can verify the deserialized data by running show -
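For example:

# the value column now shows the JSON payload as plain text
df1.show(truncate=False)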
We can see that the data is in JSON format. To parse the data into individual columns, let's read this data through a JSON schema. The below code imports the required datatypes and defines a struct schema for our JSON data.
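A sketch of such a schema; the field names below are placeholders, so replace them with the actual columns of your employee table:

from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# struct schema matching the JSON produced from the employee table (placeholder fields)
json_schema = StructType([
    StructField("emp_id", IntegerType(), True),
    StructField("emp_name", StringType(), True),
    StructField("emp_dept", StringType(), True)
])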
The below code parses the data using the json_schema, and we can verify the parsed data by running show -
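Roughly, assuming the df1 and json_schema names from above:

from pyspark.sql.functions import from_json, col

# parse the JSON string into a struct and flatten it into individual columns
df2 = df1.withColumn("value", from_json(col("value"), json_schema)).select("value.*")
df2.show()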
Stream Processing
Now we will see how to stream the data using Spark. Once the stream is started, Spark keeps listening to the Kafka topic and fetches new records in real time.
To read the Kafka data as a stream, just replace read with readStream, as below -
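A sketch, again assuming the broker is on localhost:9092:

# readStream returns a streaming DataFrame instead of a static one
df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "employee") \
    .load()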
You can run printSchema to verify the schema -
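For example:

# the Kafka source exposes key, value, topic, partition, offset, timestamp and timestampType
df.printSchema()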
Execute the below code to fetch only the value field and cast it as a string-
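Same as in the batch case:

# keep only the value column, cast from binary to string
df1 = df.selectExpr("CAST(value AS STRING)")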
Execute the code below to convert the string to its actual JSON format and verify its schema-
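Roughly, reusing the json_schema defined in the batch section:

from pyspark.sql.functions import from_json, col

# parse the JSON string using the schema defined earlier and flatten it
df2 = df1.withColumn("value", from_json(col("value"), json_schema)).select("value.*")
df2.printSchema()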
Execute the code below to print the streamed records to the console -
df2.writeStream.format("console").outputMode("append").start().awaitTermination()
It keeps listening to the Kafka topic for new records until we kill the process. You can kill the process by pressing CTRL+C. You can also submit this as a Spark job with the Kafka package as below -
spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1 kafka-spark.py
You can choose the format to be console, json, parquet, or even Kafka. You can refer to writeStream for more options. You can get the complete code on my GitHub.
Happy Learning!