Streaming from Kafka to PostgreSQL through Spark Structured Streaming

Knoldus Inc.
May 4, 2020


Hello everyone, in this blog we are going to learn how to do structured streaming in Spark with Kafka and PostgreSQL on our local system. We will be doing all of this in Scala, so without any further pause, let's begin.

Spark Structured Streaming with Kafka and PostgreSQL

Setting up the necessities first:

  1. Dependencies

Set up the required dependencies for Scala, Spark, Kafka, and PostgreSQL in your build.sbt:

scalaVersion := "2.11.12"

val postgresVersion = "42.2.2"
val sparkVersion = "2.4.3"

libraryDependencies ++= Seq(
  // spark
  "org.apache.spark" %% "spark-core" % sparkVersion,
  "org.apache.spark" %% "spark-sql" % sparkVersion,

  // kafka
  "org.apache.spark" %% "spark-sql-kafka-0-10" % sparkVersion,

  // postgresql
  "org.postgresql" % "postgresql" % postgresVersion
)

2. PostgreSQL setup

Let's start fresh by creating a user and a database.

First, go inside the postgres shell:

sudo -u postgres psql

Then, create a user and a database:

CREATE USER tom WITH ENCRYPTED PASSWORD 'password';
CREATE DATABASE pendulum WITH OWNER tom;

This creates a user named tom (whose password is, literally, password) and a database named pendulum owned by that user.

Coding Time:

As usual, create your Spark session first:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("streaming-app")
  .master("local[2]")
  .getOrCreate()

Before moving on, let us take a step back and look at what is going to happen.

  • JSON strings will be produced into a Kafka topic:
{"id":4,"name":"The Beatles","hometown":"Liverpool","year":1960}
{"id":3,"name":"Metallica","hometown":"Los Angeles","year":1981}
  • Spark will read them and perform some operations accordingly.
  • Spark will then populate a table in PostgreSQL.

Reading from Kafka

Since we know what kind of JSON strings are coming into Kafka, we are going to create a schema for them.

import org.apache.spark.sql.Encoders
import org.apache.spark.sql.types.StructType

case class Bands(id: Int,
                 name: String,
                 hometown: String,
                 year: String)

object Bands {
  val bandsSchema: StructType = Encoders.product[Bands].schema
}
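If you are curious about what Encoders.product derives from the case class, you can print the schema (the output shown in the comments is approximate):

// Inspect the derived schema; year comes out as a string because the case class declares it so
Bands.bandsSchema.printTreeString()
// root
//  |-- id: integer (nullable = false)
//  |-- name: string (nullable = true)
//  |-- hometown: string (nullable = true)
//  |-- year: string (nullable = true)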

Now, let's create a method to read those records from Kafka:

// Set up your configuration accordingly
def kafkaSourceOptions: Map[String, String] = Map(
  ("kafka.bootstrap.servers", brokers),
  ("group.id", groupId),
  ("startingOffsets", fromBeginning),
  ("subscribe", sourceTopic)
)
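The values referenced above (brokers, groupId, fromBeginning and sourceTopic) are not defined in the snippet; for a local setup they might look something like this (the values are assumptions, chosen to match the console-producer command used later):

// Assumed configuration values for a local setup; adjust to your environment
val brokers = "localhost:9092"  // Kafka bootstrap servers
val groupId = "bands-group"     // hypothetical consumer group id
val fromBeginning = "earliest"  // read the topic from its first offset
val sourceTopic = "bands"       // topic we will produce sample messages to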

import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.functions.{col, from_json}

// Method to read from Kafka as a Dataset
def readFromKafka(spark: SparkSession): Dataset[Bands] = {
  import spark.implicits._
  spark
    .readStream
    .format("kafka")
    .options(kafkaSourceOptions)
    .load() // a DataFrame is loaded up until here
    .selectExpr("cast(value as string) as value") // cast the binary values into strings
    .select(from_json(col("value"), Bands.bandsSchema).as[Bands]) // convert into a Dataset with the schema we created
}

Okay, so we have our Dataset. I am leaving it up to you to do some operations on it.
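For instance, a hypothetical transformation (not part of the original pipeline) could keep only the bands formed after 1965:

import org.apache.spark.sql.Dataset

// Hypothetical example: keep only bands formed after 1965.
// year is modelled as a String in Bands, so it is converted before comparing;
// parsing errors are not handled in this sketch.
def bandsAfter1965(bands: Dataset[Bands]): Dataset[Bands] =
  bands.filter(band => band.year.toInt > 1965)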

Writing into PostgreSQL

For this, we first need to set up the connection from Spark to PostgreSQL with the credentials we defined above.

// Create your PostgreSQL configuration
def postgresqlSinkOptions: Map[String, String] = Map(
  "dbtable" -> "public.bands", // table
  "user" -> "tom", // database username
  "password" -> "password", // password
  "driver" -> "org.postgresql.Driver",
  "url" -> "jdbc:postgresql://localhost:5432/pendulum"
)

Now, let's create a method to write our Dataset into PostgreSQL. Structured Streaming has no built-in JDBC sink, so we use foreachBatch to write each micro-batch with the regular batch JDBC writer:

import org.apache.spark.sql.{Dataset, SaveMode}

// Method to write the Dataset into PostgreSQL
def writeToPostgresql(dataset: Dataset[Bands], mode: SaveMode = SaveMode.Append): Unit = {
  dataset.writeStream
    .foreachBatch { (batch: Dataset[Bands], _: Long) =>
      batch.write
        .format("jdbc") // Spark's batch JDBC data source
        .options(postgresqlSinkOptions)
        .mode(mode)
        .save()
    }
    .start()
    .awaitTermination()
}

Let's run

Okay, so first let's sum up what we did so far by calling the methods:

//reading from kafka
val bandsDataset: Dataset[Bands] = readFromKafka(spark)

// after doing some operations on the dataset

//writing to db
writeToPostgresql(bandsDataset)
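Putting it all together, a minimal runnable skeleton could look like the sketch below (the object name is made up, and it assumes the options, case class and methods defined above are in scope):

import org.apache.spark.sql.{Dataset, SparkSession}

object KafkaToPostgresqlApp {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("streaming-app")
      .master("local[2]")
      .getOrCreate()

    // read the stream of bands from Kafka
    val bandsDataset: Dataset[Bands] = readFromKafka(spark)

    // write every micro-batch into the public.bands table (blocks until the query terminates)
    writeToPostgresql(bandsDataset)
  }
}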

Before running, make sure Kafka and PostgreSQL are up and running on your local system.

And let's produce some sample messages into the Kafka topic, say bands:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic bands
>>{"id":4,"name":"The Beatles","hometown":"Liverpool","year":1960}
>>{"id":3,"name":"Metallica","hometown":"Los Angeles","year":1981}
>>{"id":0,"name":"Led Zeppelin","hometown":"London","year":1968}

Now, after running your application, you should be able to see the data in your table; in our case it is the bands table:

Output in the PostgreSQL table.
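If you prefer to verify from Spark instead of psql, a plain batch read with the same sink options should show the rows (a quick sanity check, assuming the table has already been created by the first micro-batch):

// One-off batch read of the sink table to verify that the streamed rows landed
spark.read
  .format("jdbc")
  .options(postgresqlSinkOptions)
  .load()
  .show()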

For more, you can refer to this repository, which has the same implementation.

Thanks

