[Data Engineering] : Flink Stateful (KeyedProcessFunction) Streaming Kafka Events To Delta Lakehouse (ADLS Gen2) Sink — Scala

Keshav Singh
6 min readMay 22, 2024

--

In this article we begin exploring stateful realtime streaming and will demonstrate the power of stream processing with Flink.

Continuing with our Sales event example, we review an illustrate and example of stream processing where — we want to aggregate the Sales Amount by Product Name. In performing such we intend to handle duplicate arriving events or late arriving events. We intend to ignore all duplicate & late events based on EventProcessingTime stamp. To be clear for Product_A if we have processed aggregations until T1 (“2024–05–10T19:04:40.563Z”) we intend to ignore all events for Product_A which are either less or equal to T1. Further we want to be fault tolerant and real-time in processing these event.

Event

{
"SalesId": "acbb7118-a45a-4929-a3ec-0bf176716312",
"ProductName": "Product_23",
"SalesDateTime": "2024-05-10T19:04:40.563Z",
"SalesAmount": 426,
"EventProcessingTime": "2024-05-10T19:04:40.563Z"
}

Approach — To aggregate the Sales Amount by Product Name while handling duplicate and late-arriving events, we utilize Apache Flink’s stateful stream processing capabilities. Our goal is to ensure accurate and fault-tolerant aggregation by ignoring all duplicate and late events based on the EventProcessingTime stamp.

Here’s the detailed approach in navigating the solution:

Event Identification and Handling:

  • Each event includes a unique SalesId, a ProductName, a SalesDateTime, a SalesAmount, and an EventProcessingTime.
  • We aggregate sales amounts by ProductName.

Duplicate and Late Event Handling:

  • We keep track of the latest processed EventProcessingTime for each product.
  • If an event arrives with an EventProcessingTime less than or equal to the latest processed time for that product, it is considered a duplicate or late event and is ignored.

Fault Tolerance and Real-Time Processing:

  • We leverage Flink’s state management to store the latest EventProcessingTime for each product.
  • Ensure stateful processing by leveraging Flink’s checkpointing and state backends, which provide fault tolerance and recovery capabilities.

GitHub Code Reference https://github.com/keshavksingh/FlinkKafkaDelta

package com.kafkadelta.project
import com.typesafe.config.ConfigFactory
import io.delta.flink.sink.DeltaSink
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.datastream.{DataStream,KeyedStream}
import org.apache.flink.table.data.{GenericRowData, RowData, StringData, TimestampData}
import org.apache.flink.table.types.logical.{IntType, RowType, TimestampType, VarCharType}
import org.apache.hadoop.conf.Configuration

import java.util
import org.apache.flink.streaming.api.CheckpointingMode

object StatefulStreamProcessingKafkaSourceDeltaSinkStreamJob {
def main(args: Array[String]):Unit= {
val TOPIC = "saleseventhub"
val config = ConfigFactory.load("kafka.consumer.conf").getConfig("confighome")
val kafkaconfig = config.getConfig("kafka-consumer")
val deltaTablePath_sink = "abfss://flink@<storage>.dfs.core.windows.net/Streams/SalesOrderAggregatesByProduct"

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.enableCheckpointing(10000)
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)

val ROW_TYPE: RowType = new RowType(util.Arrays.asList(new RowType.RowField("SalesId", new VarCharType(VarCharType.MAX_LENGTH))
, new RowType.RowField("ProductName", new VarCharType(VarCharType.MAX_LENGTH))
, new RowType.RowField("SalesDateTime", new TimestampType)
, new RowType.RowField("TotalSalesAmount", new IntType)
, new RowType.RowField("EventProcessingTime", new TimestampType)))

val kafkaSource = KafkaSource.builder()
.setBootstrapServers(kafkaconfig.getString("bootstrap.servers"))
.setProperty("sasl.mechanism", kafkaconfig.getString("sasl.mechanism"))
.setProperty("sasl.jaas.config", kafkaconfig.getString("sasl.jaas.config"))
.setProperty("security.protocol", kafkaconfig.getString("security.protocol"))
.setTopics(TOPIC)
.setGroupId("$Default")
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(new KafkaSalesEventSchema())
.build()
val stream:DataStream[SalesOrderEvent] = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "KafkaSource")
val newStream: DataStream[RowData] = stream
.keyBy((value: SalesOrderEvent) => value.ProductName)
.process(new DeduplicateAndAggregateSales())

createADLSDeltaSink(newStream, deltaTablePath_sink, ROW_TYPE)
env.execute("AzureEventHubKafkaReadADLSDeltaStatefulWriteExampleJob")
}
def createADLSDeltaSink(stream: DataStream[RowData], deltaTablePath: String, rowType: RowType): DataStream[RowData] = {
val deltaSink = DeltaSink.forRowData(new Path(deltaTablePath), new Configuration, rowType).build()
stream.sinkTo(deltaSink)
stream
}
}

This Flink application reads sales events from a Kafka topic, processes the events to remove duplicates and aggregate sales amounts by product name, and writes the results to a Delta table in Azure Data Lake Storage. The job is fault-tolerant and ensures exactly-once processing using Flink’s checkpointing capabilities.

package com.kafkadelta.project

import org.apache.flink.api.common.state.{MapState, MapStateDescriptor}
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.table.data.{GenericRowData, RowData, StringData, TimestampData}
import org.apache.flink.util.Collector
import java.sql.Timestamp

class DeduplicateAndAggregateSales extends KeyedProcessFunction[String, SalesOrderEvent, RowData] {
// Map to store the last processed event's EventProcessingTime for each product
private lazy val lastProcessedTimeMap: MapState[String, Timestamp] = getRuntimeContext.getMapState(
new MapStateDescriptor[String, Timestamp]("lastEventProcessingTimeMap", classOf[String], classOf[Timestamp])
)

// Map to store the aggregated sales amount for each product
private lazy val productSalesAmountMap: MapState[String, Int] = getRuntimeContext.getMapState(
new MapStateDescriptor[String, Int]("productSalesAmountMap", classOf[String], classOf[Int])
)

override def processElement(
event: SalesOrderEvent,
ctx: KeyedProcessFunction[String, SalesOrderEvent, RowData]#Context,
out: Collector[RowData]): Unit = {
val productName = event.ProductName
val lastProcessedTime = Option(lastProcessedTimeMap.get(productName))
if (lastProcessedTime.isEmpty || event.EventProcessingTime.compareTo(lastProcessedTime.get) > 0) {
// Update the last processed time for the product
lastProcessedTimeMap.put(productName, event.EventProcessingTime)
// Update the aggregated sales amount for the product
val currentSalesAmount = Option(productSalesAmountMap.get(productName)).getOrElse(0)
productSalesAmountMap.put(event.ProductName, currentSalesAmount + event.SalesAmount)

// Emit the event with aggregated sales amount
val rowData = new GenericRowData(5)
rowData.setField(0, StringData.fromString(event.SalesId))
rowData.setField(1, StringData.fromString(event.ProductName))
rowData.setField(2, TimestampData.fromTimestamp(event.SalesDateTime))
rowData.setField(3, currentSalesAmount + event.SalesAmount) // Aggregate sales amount
rowData.setField(4, TimestampData.fromTimestamp(event.EventProcessingTime))
out.collect(rowData)
}
}
}

This class ensures that only the latest events for each product are processed by maintaining a state of the last processed EventProcessingTime. It aggregates sales amounts by product and emits the processed events downstream. The approach effectively handles deduplication by comparing the event’s EventProcessingTime against the stored time for each product, ensuring that only new events are considered.

Stateful Stream Processing Kafka Source Delta Sink Stream Job

At this point we are ready to test our Flink applications.

Submit Flink Stateful Streaming Job

Initial Events

We begin with 2 events for Product_A and Product_B

Kafka Source Events

Both get processed to product the Azure Data Lake Gen2 Delta Sink. The current Sales Amount for Product_A is 200 (EventProcessingTime 2024–05–20 19:04:40.563) and Product_B is 100 (EventProcessingTime 2024–05–20 19:04:41.563)

Sink Output

At this point Flink manages the state for the Keys and their respective EventProcessingTime and Total SalesAmount.

Scenario 2

Further at this point we emitt 3 more events with Product_A,Product_B & Product_C. Notice the EventProcessingTime for Product_A equal to last event and hence based on our logic deserves to be ignored.

New Events
Sink

Flink Ignores Product_A, adds Product_C to sink. Lastly for Product_B it looks up the state and adds the last SalesAmount 100 with 500 to produce 600.

Scenario 3

To be very clear we do this one more time. 3 new event, Product_C, Product_A and Product_B. Based on the EventProcessingTime Product_B & Product_A will be ignored since their respective EventProcessingTime is ≤the last processed event’s state. Hence we only fine Product_C to processed.

Product_C is processed with 200 + 100 depicting 300 as the latest Total Sales Amount from the event.

Sink

In this article, we explore a powerful solution for real-time, stateful stream processing using Apache Flink. By integrating Kafka as a data source and Delta Lake for storage, we demonstrate how to effectively aggregate sales data while handling duplicate and late-arriving events. This approach ensures accurate, up-to-date results by maintaining state and utilizing Flink’s fault-tolerant realtime processing capabilities. For businesses, this means having reliable, real-time insights into sales performance, enabling timely decision-making and improving operational efficiency. The practical example provided showcases the potential of Flink to transform data streams into actionable intelligence, highlighting its robustness in managing complex event processing scenarios. This solution empowers organizations to harness their streaming data with confidence, driving better outcomes and fostering a data-driven culture.

--

--

Keshav Singh

Principal Engineering Lead, Microsoft Purview Data Governance PG | Data ML Platform Architecture & Engineering | https://www.linkedin.com/in/keshavksingh