[Data Engineering] Scale Real-Time Streams to a Delta Lakehouse: Flink Streaming on "Azure HDInsight on AKS"

Keshav Singh
9 min read · May 8, 2024


Generated by DALL•E3

In the past, my series on Flink demonstrated the streaming proposition, establishing the infrastructure and a solution on Azure from scratch:

  1. Flink on Azure Kubernetes
  2. Bootstrap Flink Maven Project
  3. Flink Stream Processing — Azure EventHub on Kafka Protocol

This was a great starting point for Azure data engineering enthusiasts keen on building a stream-centric, low-latency data processing capability. At the time, it seemed a shiny new proposition with limitations for production-grade adoption, since it —

  • Had infrastructure management overhead
  • Lacked service-level, out-of-the-box (OOB) scalability and security guarantees
  • Had limited native support for Azure Data Lake Storage and formats such as Delta
  • Lacked OOB configuration management, extensibility, and integrations

Things have evolved a lot since! Over the past year Azure has progressed immensely with the Lakehouse, OneLake, Microsoft Fabric, Microsoft Purview for Data + AI governance (Public Preview) and, specifically, HDInsight on AKS (in Public Preview). This is an enticing new offering on Azure and enables the Apache Flink community to come onboard and create scalable streaming solutions.

Offering
HDI On AKS

I highly recommend revisiting my previous blog series on Flink as a whole to learn more about the technology; in this blog we build on that past work to further establish a lakehouse story with Flink through the HDInsight on AKS service.

Here is a great blog about the service -

Delta Format on Flink

In this blog we turn the page and learn about enabling the Delta format as a source and sink for stream processing. Why is it critical? Delta has become the default ACID-compliant lakehouse format for the ecosystem, enabling petabyte-scale processing while serving as a single source of truth; it is at the core of Microsoft's Fabric story. Data engineering in the Delta format unifies diverse data sources into a single mode for analytics. Lastly, as technologies such as the Fabric endpoint, BigQuery, and Synapse Serverless SQL get more efficient by the day, direct-mode Delta access will become cheaper and faster, with no real need for an edge copy for analytics.

Streaming events can now be unified in the Delta format as a sink, enabling real-time analytics.

Stream Processing

Illustration

Let's make it happen!

Consider a sales event scenario; the event has the structure demonstrated below.

Sales Source Event

The Sales Source Event is stored in the Delta format on ADLS Gen2.

Delta Storage for SaleSource Events
About 20 Million Events
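
For reference, a representative sales event might look like the following. The field names match the RowType defined later in the job; the values are purely illustrative:

{
  "SalesId": "7f1c2a9e-5d3b-4e8a-9c2f-0b1d6a4e8f21",
  "ProductName": "Surface Laptop",
  "SalesDateTime": "2024-05-08T10:15:30.000",
  "SalesAmount": 1299,
  "EventProcessingTime": "2024-05-08T10:15:31.250"
}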

HDInsight On AKS Cluster Pool

Create a cluster pool to host a set of clusters; these could be Spark, Trino, or Flink clusters. Provision the pool.

HDI on AKS Pool

Next, provision a Flink cluster; I went with a Session cluster. In a nutshell, a Session cluster can share resources amongst multiple jobs, while an Application cluster dedicates its resources to a particular application.

Flink Session Cluster

Once the cluster is provisioned, update the flink-configs to add the Hadoop class path and ensure the cluster's native class loaders are used.

Hadoop Class Path
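
For orientation only, the kind of entries involved look roughly like the sketch below. classloader.resolve-order is a standard open-source Flink configuration key; the exact keys and values exposed under flink-configs in the HDI on AKS portal may differ, so treat this as an assumption to verify against the service documentation:

# Prefer the cluster's (parent-first) class loaders over classes bundled in the job JAR
classloader.resolve-order: parent-first
# plus the Hadoop class path entry exposed by the service for ADLS Gen2 (ABFS) access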

Upon applying the changes the cluster will restart; click on the Flink Dashboard and verify it is available. This is the single place for the DAG, execution logs, and stream-processing details.

Flink Cluster

Application Code

Here is our code for the StreamProcessingJob. It simply reads data from a Delta source and stream-processes it into a Delta sink.

package org.example;

import io.delta.flink.sink.DeltaSink;
import io.delta.flink.source.DeltaSource;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.hadoop.conf.Configuration;

import java.util.Arrays;

public class StreamProcessingJob {

    // Schema of the sales event, shared by the Delta source and sink
    public static final RowType ROW_TYPE = new RowType(Arrays.asList(
            new RowType.RowField("SalesId", new VarCharType(VarCharType.MAX_LENGTH)),
            new RowType.RowField("ProductName", new VarCharType(VarCharType.MAX_LENGTH)),
            new RowType.RowField("SalesDateTime", new TimestampType()),
            new RowType.RowField("SalesAmount", new IntType()),
            new RowType.RowField("EventProcessingTime", new TimestampType())
    ));

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Delta commits are issued on checkpoints; checkpoint every 10 seconds
        env.enableCheckpointing(10000);

        // Define the sink Delta table path
        String deltaTablePath_sink = "abfss://flink@<storage>.dfs.core.windows.net/Streams/SaleSink";

        // Define the source Delta table path
        String deltaTablePath_source = "abfss://flink@<storage>.dfs.core.windows.net/Streams/SaleSource";

        // Create a bounded Delta source for all columns
        DataStream<RowData> deltaStream = createBoundedDeltaSourceAllColumns(env, deltaTablePath_source);

        // Wire the stream into the Delta sink
        createDeltaSink(deltaStream, deltaTablePath_sink, ROW_TYPE);

        // Execute the Flink job
        env.execute("FlinkDeltaSourceSinkExample");
    }

    public static DataStream<RowData> createBoundedDeltaSourceAllColumns(
            StreamExecutionEnvironment env,
            String deltaTablePath) {

        DeltaSource<RowData> deltaSource = DeltaSource
                .forBoundedRowData(
                        new Path(deltaTablePath),
                        new Configuration())
                .build();

        return env.fromSource(deltaSource, WatermarkStrategy.noWatermarks(), "deltaSource");
    }

    public static DataStream<RowData> createDeltaSink(
            DataStream<RowData> stream,
            String deltaTablePath,
            RowType rowType) {

        DeltaSink<RowData> deltaSink = DeltaSink
                .forRowData(
                        new Path(deltaTablePath),
                        new Configuration(),
                        rowType)
                .build();

        stream.sinkTo(deltaSink);
        return stream;
    }
}
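
The job above uses a bounded Delta source, which reads the table as of its latest snapshot and then finishes. The delta-flink connector also offers a continuous mode that keeps tailing the table for new commits. A minimal sketch in the style of the method above (not part of the job we ran; the table path placeholder is the same):

    public static DataStream<RowData> createContinuousDeltaSourceAllColumns(
            StreamExecutionEnvironment env,
            String deltaTablePath) {

        // Continuous mode: after the initial snapshot, keep polling the Delta log for new data
        DeltaSource<RowData> deltaSource = DeltaSource
                .forContinuousRowData(
                        new Path(deltaTablePath),
                        new Configuration())
                .build();

        return env.fromSource(deltaSource, WatermarkStrategy.noWatermarks(), "deltaSourceContinuous");
    }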

Let me also share the pom.xml for my Java project.

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>deltaflinkproject</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <flink.version>1.17.0</flink.version>
        <java.version>1.8</java.version>
        <scala.binary.version>2.12</scala.binary.version>
        <hadoop-version>3.4.0</hadoop-version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>io.delta</groupId>
            <artifactId>delta-standalone_2.12</artifactId>
            <version>3.0.0</version>
        </dependency>
        <dependency>
            <groupId>io.delta</groupId>
            <artifactId>delta-flink</artifactId>
            <version>3.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-parquet</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop-version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-runtime</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <appendAssemblyId>false</appendAssemblyId>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

We build the fat JAR (mvn clean package produces it, since the assembly plugin is bound to the package phase) and upload it to a convenient directory on ADLS Gen2.

Jar
Uploaded JAR ADLS Gen2

At this point we are ready to submit the StreamProcessingJob to the Flink cluster. Point to the JAR location on storage and provide the entry class details (org.example.StreamProcessingJob).

Submit Job on Flink Cluster
Job
Flink Processing Delta Source:Sink

The job processed the streams based on the logic defined in the Java JAR.

Callouts and Validations

Flink periodically commits into Delta based on the configured checkpointing: env.enableCheckpointing(10000). In our case, commits are issued every 10 seconds.

NOTE: A critical observation to keep in mind for the initial dataset: if your job duration is shorter than the checkpoint interval (say you have only 10 records and the job completes well before the first checkpoint), you will observe only Parquet files with no _delta_log directory, since the first Delta commit was never issued. This edge case is worth calling out as a reminder that streaming (unbounded) semantics are quite different from batch (bounded) processing semantics.

The screenshots below depict the periodic processing of this data —

The initial run creates the Parquet files but has yet to issue the first Delta commit, hence we observe only Parquet files.

Initial processing

Upon the first commit, the Delta log is initialized, and the streams continue to be processed based on the checkpoint interval.

Stream Processing Continues

Upon completion and the final checkpoints, all in-progress files are committed.

Stream Processing Complete.

Checkpointing is a crucial feature in distributed stream processing frameworks like Flink to ensure fault tolerance and exactly-once semantics.

Relevance of env.enableCheckpointing(10000):

  1. Ensures Fault Tolerance: Checkpointing allows Flink to take snapshots of the state of the streaming application at regular intervals. In case of failures, Flink can use these snapshots to restore the state of the application and continue processing from the last successful checkpoint. This ensures fault tolerance and resilience against failures such as machine crashes, network issues, or software bugs.
  2. Consistent State: Checkpointing helps in maintaining consistent state in the face of failures. By periodically saving the state of the application, Flink guarantees that even if failures occur, the state can be recovered to a consistent point.
  3. Exactly-once Processing: Checkpointing, combined with Flink’s processing model, enables exactly-once semantics. With exactly-once processing, each record in the input streams is processed exactly once, even in the presence of failures and restarts. This is crucial for applications where data correctness is paramount, such as financial transactions or real-time analytics.
  4. Performance Considerations: The checkpointing interval (in this case, 10000 milliseconds or 10 seconds) is a trade-off between fault tolerance and performance. Shorter intervals provide better fault tolerance but can impact performance due to the overhead of taking and managing checkpoints. Longer intervals reduce this overhead but increase the potential amount of data loss in case of failures. Choosing an appropriate interval depends on the specific requirements of the application.
  5. Configuration Flexibility: Flink provides flexibility in configuring checkpointing behavior. Developers can tune various parameters such as the checkpointing interval, checkpointing mode (e.g., exactly-once, at-least-once), state backend, and storage options based on the specific needs of their application and the underlying infrastructure; a short sketch follows this list.
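
As an illustration of that flexibility, here is a minimal sketch of how checkpointing could be tuned on the same StreamExecutionEnvironment. The specific values and the checkpoint storage path are placeholders for illustration, not what we ran in this job:

// Additional imports assumed:
// org.apache.flink.streaming.api.CheckpointingMode
// org.apache.flink.streaming.api.environment.CheckpointConfig

// Checkpoint every 10 seconds with exactly-once guarantees
env.enableCheckpointing(10000, CheckpointingMode.EXACTLY_ONCE);

CheckpointConfig checkpointConfig = env.getCheckpointConfig();
// Leave at least 5 seconds between the end of one checkpoint and the start of the next
checkpointConfig.setMinPauseBetweenCheckpoints(5000);
// Fail any checkpoint that takes longer than 2 minutes
checkpointConfig.setCheckpointTimeout(120000);
// Tolerate a couple of checkpoint failures before failing the job
checkpointConfig.setTolerableCheckpointFailureNumber(2);
// Persist checkpoints to durable storage (placeholder path)
checkpointConfig.setCheckpointStorage("abfss://flink@<storage>.dfs.core.windows.net/checkpoints");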

Finally, let's validate the Delta sink for our processed streams.

Stream Sink Validations
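
Beyond eyeballing the files in the storage account, the delta-standalone library already declared in our pom.xml can read the sink table's transaction log directly. A minimal sketch, assuming the Hadoop configuration it runs with already carries the ADLS Gen2 credentials, and using the same placeholder table path as the job:

import io.delta.standalone.DeltaLog;
import io.delta.standalone.Snapshot;
import org.apache.hadoop.conf.Configuration;

public class DeltaSinkValidation {
    public static void main(String[] args) {
        String sinkPath = "abfss://flink@<storage>.dfs.core.windows.net/Streams/SaleSink";

        // Open the Delta transaction log of the sink table
        DeltaLog deltaLog = DeltaLog.forTable(new Configuration(), sinkPath);
        Snapshot snapshot = deltaLog.snapshot();

        // Each checkpoint-driven commit bumps the table version; data files are the committed Parquet files
        System.out.println("Latest table version : " + snapshot.getVersion());
        System.out.println("Committed data files : " + snapshot.getAllFiles().size());
    }
}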

Architectural Considerations

When compared with Azure Stream Analytics or Spark Streaming —

  • Flink offers built-in support for managing complex stateful computations efficiently. It provides a unified runtime for both batch and stream processing, allowing seamless integration of stateful operations into streaming jobs. Flink’s state management capabilities include fault tolerance, exactly-once semantics, and flexible state backend options (e.g., memory, RocksDB).
  • Flink provides strong consistency guarantees with exactly-once processing semantics out of the box. It ensures that each event is processed exactly once, even in the presence of failures or restarts, making it suitable for mission-critical applications.
  • Flink is designed for high throughput and low-latency processing at scale. It supports fine-grained control over resource allocation and dynamic scaling, allowing efficient utilization of cluster resources. Flink’s pipelined execution model and advanced optimizations contribute to its superior performance.
  • Flink integrates seamlessly with other components of the Apache ecosystem, such as Apache Kafka, Apache Hadoop, and Apache Hive. It also provides connectors for integrating with various cloud platforms and data sources.

Flink excels in handling complex stateful workloads with its advanced state management, processing guarantees, scalability, and performance optimizations.

In this blog we reviewed Delta sources and sinks, which can be complemented with the Kafka sources and sinks from my earlier blog to cover diverse stream-processing scenarios, as sketched below. This helps light up much-needed real-time streaming analytical stories.
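
For instance, a Kafka source (or Azure Event Hubs exposed over the Kafka protocol) could feed the very same Delta sink. A minimal sketch, assuming the flink-connector-kafka dependency is added to the pom.xml; the bootstrap server, topic, and group id are placeholders:

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KafkaToDeltaSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(10000);

        // Unbounded Kafka source; with Event Hubs this would point at the Kafka endpoint
        // (Event Hubs additionally needs SASL/SSL properties via setProperty(...), omitted here)
        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
                .setBootstrapServers("<namespace>.servicebus.windows.net:9093")
                .setTopics("sales-events")
                .setGroupId("flink-delta-sink")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        DataStream<String> events =
                env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafkaSource");

        // Map each raw event into RowData and hand it to the Delta sink, as shown earlier in StreamProcessingJob
        events.print(); // placeholder for the RowData mapping + DeltaSink wiring

        env.execute("KafkaToDeltaSketch");
    }
}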

In Closing…

We have introduced Azure HDInsight on AKS (Flink) and emphasized the Delta Lakehouse story. This blog is for those passionate data engineers who are data natives and love to design systems that are resilient, frugal, and precise, write those hard lines of code, control their destiny, and are curious to understand what lies under the hood. I dedicate this blog to all of them; let's welcome a fully managed Apache Flink on Azure!


Keshav Singh

Principal Engineering Lead, Microsoft Purview Data Governance PG | Data ML Platform Architecture & Engineering | https://www.linkedin.com/in/keshavksingh