Event Driven File Ingestion using Flink Source API

Majid Azimi
Published in DataReply
Jun 23, 2024

Introduction

In the ever-evolving landscape of data processing, the ability to manage real-time data streams efficiently is crucial for the success of many businesses. Tools like Snowflake’s SnowPipe and similar technologies like ClickPipe have become essential in enabling seamless data ingestion from various sources into their respective platforms. However, these tools come with their own limitations and dependencies that might not align perfectly with every system architecture or use case.

Apache Flink, renowned for its powerful stream processing capabilities, offers an alternative with its highly flexible and scalable architecture. Despite its comprehensive set of built-in functionalities, Flink lacks a direct equivalent to SnowPipe or ClickPipe, tools that simplify continuous data ingestion with minimal latency. Recognizing this gap, I explored the possibility of simulating these functionalities using the Flink Source API. This approach not only leverages Flink’s robust processing capabilities but also introduces a way to customize data ingestion to fit more specific needs or constraints.

Throughout the post, we look at why Flink’s FileSource falls short for such scenarios and outline a tailored architecture that better suits the dynamic requirements of streaming data pipelines. Through a custom source implementation, this effort aims to provide at-least-once semantics while maintaining the flexibility and power of Flink's stream processing engine.

The following sections will guide you through the architectural considerations, detailed class diagrams, step-by-step code explanations, and a comprehensive end-to-end example.

Why not FileSource?

The FileSource API in Apache Flink is a versatile API for processing files as input data streams. Here's a simple example of how to utilize the FileSource for reading data from a directory:

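import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FileSourceExample {
    public static void main(String[] args) throws Exception {
        // Set up the streaming execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Create a FileSource that reads text files from a specified path.
        // In recent Flink releases the equivalent format class is TextLineInputFormat.
        // Without monitorContinuously(Duration) on the builder, the source is bounded
        // and reads the directory once.
        FileSource<String> source = FileSource
                .forRecordStreamFormat(new TextLineFormat(), new Path("path/to/input/directory"))
                .build();

        // Add the source to the execution environment
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "FileSource")
                .print();

        // Execute the program
        env.execute("Flink FileSource Example");
    }
}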

In this example, FileSource is set up to read text files from a given directory. The use of TextLineFormat allows the source to read data line-by-line, making it suitable for simple text processing tasks.

While the FileSource API is generally effective for file-based data streams, it runs into a specific problem with unbounded file sources: the enumerator (the component that discovers files to be processed) keeps track of the paths of all already processed files. This record-keeping is essential to avoid reprocessing the same files. However, as the system operates over time, this state can grow quite large, potentially leading to scalability and performance issues. This is particularly problematic for object storage where incoming files are continuously added to newer partitions (as in a data lake).

This limitation is documented in the Flink docs:

For Unbounded File Sources, the enumerator currently remembers paths of all already processed files, which is a state that can, in some cases, grow rather large. There are plans to add a compressed form of tracking already processed files in the future (for example, by keeping modification timestamps below boundaries).

According to the Flink documentation, there are plans to address this issue by introducing a more compressed form of tracking already processed files. One proposed method is to keep track of file modification timestamps below certain boundaries, which could significantly reduce the state size by abstracting the exact paths into broader timestamp categories. This improvement would help in managing the state more efficiently, though it is not yet implemented.
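To make the difference in state size concrete, here is a minimal sketch of the two tracking strategies. It is not Flink's implementation: FileInfo, PathTracker and TimestampBoundaryTracker are hypothetical names, and the timestamp variant assumes files are discovered in non-decreasing modification-time order (a file arriving late with an older timestamp would be skipped).

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical file descriptor: a path plus its last-modified time (epoch millis). */
final class FileInfo {
    final String path;
    final long modifiedAt;

    FileInfo(String path, long modifiedAt) {
        this.path = path;
        this.modifiedAt = modifiedAt;
    }
}

/** Remembers every processed path, so the state grows with the number of files. */
class PathTracker {
    private final Set<String> seen = new HashSet<>();

    boolean markIfNew(FileInfo f) {
        return seen.add(f.path);
    }

    int stateSize() {
        return seen.size();
    }
}

/** Remembers only a timestamp boundary plus the paths that sit exactly on it. */
class TimestampBoundaryTracker {
    private long boundary = Long.MIN_VALUE;
    private final Set<String> atBoundary = new HashSet<>();

    boolean markIfNew(FileInfo f) {
        if (f.modifiedAt < boundary) {
            return false;               // older than the boundary: treated as already processed
        }
        if (f.modifiedAt > boundary) {
            boundary = f.modifiedAt;    // newer file: advance the boundary
            atBoundary.clear();
        }
        return atBoundary.add(f.path);  // same timestamp: fall back to path comparison
    }

    int stateSize() {
        return atBoundary.size();
    }
}
```

With a thousand files carrying distinct timestamps, the first tracker holds a thousand paths while the second holds a single one, at the cost of the ordering assumption mentioned above.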

For more details on the current capabilities and future plans for FileSource, you can refer to the Flink documentation.

In the next section, we will explore an event-driven architecture designed to handle the continuous ingestion of data streams in a way that addresses these limitations of the FileSource. This will provide a more tailored solution that fits the dynamic requirements of data lakes where the entire dataset keeps growing (new files are added continuously).

Architecture

The architecture of our custom Flink Source API implementation is designed to leverage the event notification systems provided by cloud services, such as AWS S3. These event-driven mechanisms are crucial for enabling efficient, real-time data processing pipelines. By using these notifications, our solution can immediately respond to new data, avoiding the need for continuous polling and reducing the latency typically associated with batch processing.

AWS S3 event notifications are designed to alert subscribers shortly after certain events occur on objects in an S3 bucket. These events can include creating, deleting, or modifying objects. Users can configure these notifications to trigger a response from various AWS services, such as Lambda functions, SQS queues, or SNS topics, which can then be integrated into larger processing workflows.

Here’s a typical use case in our context:

  • File Upload: When a file is uploaded to an S3 bucket, an event notification is generated.
  • Notification Receipt: This notification can be configured to be sent to an AWS Lambda function, an Amazon Simple Notification Service (SNS) topic, or directly to an Amazon Simple Queue Service (SQS) queue.
  • Triggering Processing: For our Flink Source API, the notification can trigger the processing of the newly uploaded file without waiting for a scheduled scan of the bucket, thus reducing latency and overhead.

Example of Configuring AWS S3 Event Notifications

To configure S3 event notifications that integrate with a Flink streaming job, follow these steps:

  • Set up an S3 Bucket: Ensure your S3 bucket is ready and has the correct permissions.
  • Configure Event Notifications: Set up event notifications on the S3 bucket to send a message to an SQS queue whenever a new file is uploaded. For example, in the AWS Management Console, select the bucket for which you want to set up notifications, navigate to the “Properties” tab, and scroll to “Event notifications”. Click “Create event notification” and enter a name for it. Under “Event types”, select “All object create events”. Under “Destination”, choose “SQS Queue” and select the queue to which notifications should be sent. The queue's access policy must allow S3 to send messages to it.

Once a new file is uploaded to the bucket, an event is pushed to the SQS queue. The following is a sample JSON payload for such an event:

{
  "Records": [
    {
      "eventVersion": "2.1",
      "eventSource": "aws:s3",
      "awsRegion": "eu-central-1",
      "eventTime": "2024-03-26T15:19:47.914Z",
      "eventName": "ObjectCreated:Put",
      "userIdentity": {
        "principalId": "AWS:*******"
      },
      "requestParameters": {
        "sourceIPAddress": "************"
      },
      "responseElements": {
        "x-amz-request-id": "************",
        "x-amz-id-2": "************"
      },
      "s3": {
        "s3SchemaVersion": "1.0",
        "configurationId": "tf-s3-queue-20240326151721372100000001",
        "bucket": {
          "name": "<BUCKET_NAME>",
          "ownerIdentity": {
            "principalId": "************"
          },
          "arn": "<S3_ROLE_ARN>"
        },
        "object": {
          "key": "file.jpeg",
          "size": 5562,
          "eTag": "440d663d81a95249495183196b0f3b66",
          "sequencer": "006602E793DC885EA2"
        }
      }
    }
  ]
}
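One detail of this payload is easy to miss: S3 URL-encodes the object key in event notifications, so a key containing spaces or special characters does not arrive verbatim (a space shows up as +). The key in the sample above contains no such characters, which hides the issue. The following is a small sketch of decoding the key before requesting the object; S3KeyDecoder is a hypothetical helper, not part of the AWS SDK.

```java
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper for keys taken from S3 event notifications. */
final class S3KeyDecoder {
    private S3KeyDecoder() {
    }

    /** Decodes the URL-encoded object key found in the "s3.object.key" field. */
    static String decode(String encodedKey) {
        return URLDecoder.decode(encodedKey, StandardCharsets.UTF_8);
    }
}
```

For example, S3KeyDecoder.decode("landing/my+file%281%29.csv") yields "landing/my file(1).csv", which is the key to pass to the S3 client.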

We can use this event to enable event-driven ingestion of files into a Flink DataStream. Here is the architecture diagram:

[Figure: architecture diagram]

Here is the high-level flow of the data pipeline:

  1. Once a file is uploaded to the bucket, an S3 event notification message is sent to the queue, which in turn is delivered to our newly implemented source.
  2. Upon delivery, our source opens the file on S3 and emits its content line by line.
  3. Once the file is finished, the message is deleted from the SQS queue.
  4. Rinse and repeat the process.
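The receive, process, delete order of these steps is what gives the pipeline its delivery guarantee. As a rough illustration, the sketch below replays the flow against an in-memory stand-in for the queue. FakeQueue and IngestionLoop are hypothetical names and not the SQS API, and for brevity the message body carries the file content itself rather than a pointer to an S3 object.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** In-memory stand-in for a queue with receive/delete semantics. */
class FakeQueue {
    private final Deque<String> visible = new ArrayDeque<>();
    private String inFlight;   // received but not yet deleted

    void send(String body) {
        visible.addLast(body);
    }

    String receive() {
        inFlight = visible.pollFirst();
        return inFlight;
    }

    void delete() {
        inFlight = null;
    }

    /** Simulates the visibility timeout expiring after a consumer failure. */
    void expireVisibility() {
        if (inFlight != null) {
            visible.addFirst(inFlight);
            inFlight = null;
        }
    }
}

class IngestionLoop {
    /** Emits the lines of the next file; a negative crashAfterLines means "never crash". */
    static void processNext(FakeQueue queue, List<String> sink, int crashAfterLines) {
        String file = queue.receive();
        if (file == null) {
            return;                  // nothing to do
        }
        int emitted = 0;
        for (String line : file.split("\n")) {
            if (emitted == crashAfterLines) {
                return;              // simulated failure: the message is NOT deleted
            }
            sink.add(line);
            emitted++;
        }
        queue.delete();              // step 3: acknowledge only after the last line
    }
}
```

If processing fails after two of three lines and the message becomes visible again, the second attempt emits all three lines, so the sink ends up with five records: duplicates, but nothing missing.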

Using this approach, we’ve fixed the two main issues:

  1. There is no need to constantly poll S3 for new files.
  2. There is no need to keep track of the list of already processed files in the state store. In other words, the S3 bucket can grow indefinitely with as many partitions and files as needed.

Flink Source API

When creating a new source for Apache Flink, the process involves implementing several key interfaces to integrate smoothly with Flink’s runtime. Specifically, you need to implement the Source and SourceReader interfaces. In scenarios where file splitting is not required, as is the case with our implementation, we manage file handling without actual splits, using a placeholder class for managing file paths and metadata.

The Source interface is the main entry point for integrating custom data sources into Flink. This interface requires defining how the source behaves, including how it reads from the underlying data system and how it interacts with potential splits. For our implementation, where we handle files as a whole, the main tasks are:

  • Initializing and maintaining the state of the source.
  • Creating a SourceReader to actually read the data.

import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.core.io.SimpleVersionedSerializer;

public class S3EventNotificationSource implements Source<String, NoSplitFile, Void> {

    // URL of the SQS queue that receives the S3 event notifications
    private final String queueUrl;

    public S3EventNotificationSource(String queueUrl) {
        this.queueUrl = queueUrl;
    }

    @Override
    public Boundedness getBoundedness() {
        return Boundedness.CONTINUOUS_UNBOUNDED;
    }

    @Override
    public SplitEnumerator<NoSplitFile, Void> createEnumerator(SplitEnumeratorContext<NoSplitFile> enumContext) {
        return new NoOpEnumerator(enumContext);
    }

    @Override
    public SplitEnumerator<NoSplitFile, Void> restoreEnumerator(SplitEnumeratorContext<NoSplitFile> enumContext, Void checkpoint) {
        // There is no enumerator state to restore
        return new NoOpEnumerator(enumContext);
    }

    @Override
    public SimpleVersionedSerializer<NoSplitFile> getSplitSerializer() {
        return new NoSplitFileSerializer();
    }

    @Override
    public SimpleVersionedSerializer<Void> getEnumeratorCheckpointSerializer() {
        return new VersionedVoidSerializer();
    }

    @Override
    public SourceReader<String, NoSplitFile> createReader(SourceReaderContext readerContext) {
        return new S3EventNotificationSourceReader(readerContext.getConfiguration(), queueUrl);
    }
}

Explanation:

  • Note that we define this source as unbounded using Boundedness.CONTINUOUS_UNBOUNDED.
  • Since we don’t enumerate over input files, we use a placeholder class (NoOpEnumerator) for SplitEnumerator; it never assigns any splits. The code for the other placeholder classes follows below.
  • Since we process files one by one and don’t support exactly-once semantics, we don’t need any state. As a result, there is nothing to keep track of during checkpointing. VersionedVoidSerializer is a placeholder class that does nothing.

The main class that reads input files is S3EventNotificationSourceReader. For the sake of simplicity, we emit each line as a String.

The following is an implementation of NoSplitFile class:

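import java.io.Serializable;

import org.apache.flink.api.connector.source.SourceSplit;

public class NoSplitFile implements SourceSplit, Serializable {
    private static final long serialVersionUID = 1L;

    @Override
    public String splitId() {
        // No split is ever assigned; a constant ID avoids handing null to the runtime
        return "no-split";
    }
}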

NoSplitFileSerializer also does nothing:

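import java.io.IOException;

import org.apache.flink.core.io.SimpleVersionedSerializer;

public class NoSplitFileSerializer implements SimpleVersionedSerializer<NoSplitFile> {
    @Override
    public int getVersion() {
        return 1;
    }

    @Override
    public byte[] serialize(NoSplitFile obj) throws IOException {
        // The placeholder split carries no data
        return new byte[0];
    }

    @Override
    public NoSplitFile deserialize(int version, byte[] serialized) throws IOException {
        // Return a fresh placeholder rather than null
        return new NoSplitFile();
    }
}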

Another placeholder class is VersionedVoidSerializer:

import java.io.IOException;

import org.apache.flink.core.io.SimpleVersionedSerializer;

public class VersionedVoidSerializer implements SimpleVersionedSerializer<Void> {
    @Override
    public int getVersion() {
        return 1;
    }

    @Override
    public byte[] serialize(Void obj) throws IOException {
        return new byte[0];
    }

    @Override
    public Void deserialize(int version, byte[] serialized) throws IOException {
        return null;
    }
}

The main class that takes care of reading the input file and emitting each line is S3EventNotificationSourceReader:

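import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.InputStatus;
import org.apache.flink.util.IOUtils;

import software.amazon.awssdk.eventnotifications.s3.model.S3EventNotification;
import software.amazon.awssdk.eventnotifications.s3.model.S3EventNotificationRecord;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.DeleteMessageRequest;
import software.amazon.awssdk.services.sqs.model.Message;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse;

public class S3EventNotificationSourceReader implements SourceReader<String, NoSplitFile> {

    private final Configuration config;

    private final SqsClient sqsClient;
    private final S3Client s3Client;

    // File currently being read, and the SQS message that announced it
    private BufferedReader currentFile = null;
    private Message currentMessage = null;

    private final String queueUrl;

    public S3EventNotificationSourceReader(Configuration config, String queueUrl) {
        this.config = config;

        // The region is hard-coded for the sake of the example
        this.sqsClient = SqsClient.builder().region(Region.EU_CENTRAL_1).build();
        this.s3Client = S3Client.builder().region(Region.EU_CENTRAL_1).build();

        this.queueUrl = queueUrl;
    }

    @Override
    public void start() {
        System.out.println("Source Reader Started");
    }

    @Override
    public InputStatus pollNext(ReaderOutput<String> output) throws IOException {
        if (currentFile == null) {
            return InputStatus.NOTHING_AVAILABLE;
        }
        String currentLine = currentFile.readLine();
        if (currentLine != null) {
            output.collect(currentLine);
            return InputStatus.MORE_AVAILABLE;
        } else {
            // End of file: close it; the SQS message is deleted in isAvailable()
            IOUtils.closeQuietly(currentFile);
            currentFile = null;
            return InputStatus.NOTHING_AVAILABLE;
        }
    }

    @Override
    public List<NoSplitFile> snapshotState(long checkpointId) {
        // No state is checkpointed
        return Collections.emptyList();
    }

    @Override
    public CompletableFuture<Void> isAvailable() {
        if (currentFile == null) {
            // Delete the SQS message of the file that has just been fully read
            if (currentMessage != null) {
                sqsClient.deleteMessage(DeleteMessageRequest.builder()
                        .queueUrl(this.queueUrl)
                        .receiptHandle(currentMessage.receiptHandle())
                        .build());
                currentMessage = null;
            }

            // Long-poll for the next notification (blocks for up to 5 seconds)
            ReceiveMessageRequest receiveMessageRequest = ReceiveMessageRequest.builder()
                    .queueUrl(this.queueUrl)
                    .maxNumberOfMessages(1)
                    .visibilityTimeout(Math.toIntExact(Duration.ofHours(12).toSeconds())) // max visibility timeout
                    .waitTimeSeconds(5)
                    .build();
            ReceiveMessageResponse receiveMessageResponse = sqsClient.receiveMessage(receiveMessageRequest);
            if (receiveMessageResponse.hasMessages()) {
                currentMessage = receiveMessageResponse.messages().get(0);
                // Only the first record of the notification is processed
                S3EventNotificationRecord eventRecord = S3EventNotification.fromJson(currentMessage.body()).getRecords().get(0);
                // Object keys are URL-encoded in S3 event notifications
                String key = URLDecoder.decode(eventRecord.getS3().getObject().getKey(), StandardCharsets.UTF_8);
                GetObjectRequest getFile = GetObjectRequest.builder()
                        .bucket(eventRecord.getS3().getBucket().getName())
                        .key(key)
                        .build();
                currentFile = new BufferedReader(new InputStreamReader(s3Client.getObject(getFile), StandardCharsets.UTF_8));
            }
        }

        return CompletableFuture.completedFuture(null);
    }

    @Override
    public void addSplits(List<NoSplitFile> splits) {
        // No splits are ever assigned
    }

    @Override
    public void notifyNoMoreSplits() {
        // Nothing to do
    }

    @Override
    public void close() {
        IOUtils.closeQuietly(currentFile);
        s3Client.close();
        sqsClient.close();
    }
}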

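Let’s walk through the key parts:

  • In the constructor, SqsClient and S3Client are instantiated.
  • As long as we return InputStatus.MORE_AVAILABLE, the Flink runtime keeps calling pollNext(). Once we return InputStatus.NOTHING_AVAILABLE, it calls isAvailable().
  • isAvailable() returns a CompletableFuture<Void>. Once that future completes, the runtime switches back to pollNext().
  • Within isAvailable(), we first delete the pending SQS message (the one whose file has just been fully read). Then we poll for a new message. Finally, a BufferedReader is created over the S3 object, which lets us read the file line by line. If nothing is available on the SQS queue, the receive call times out after 5 seconds and an already completed future is returned; pollNext() then reports NOTHING_AVAILABLE again, which triggers another round of isAvailable(). Under a sustained workload, this doesn’t create significant polling overhead. Note that the SQS call blocks inside isAvailable(), so the reader waits for up to 5 seconds per call when the queue is empty.
  • Job failure is not an issue for the source, as we delete the incoming message from the SQS queue only after the file has been fully read. If the job fails midway, the message becomes visible again once its visibility timeout expires (12 hours in this example) and the file is reprocessed. In other words, there is a chance of data duplication, but the source does not skip files (at-least-once semantics). One caveat: the message is deleted as soon as the last line has been emitted, so records still buffered downstream at that moment (for example in a sink batch) are not covered by this guarantee.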
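The blocking receive inside isAvailable() keeps the baseline simple. An alternative, which is not what the reader above does, is to poll on a separate thread and hand back a future that completes only once a message has actually arrived. Here is a minimal sketch of that pattern using only the JDK; AsyncAvailability is a hypothetical name and a BlockingQueue stands in for SQS.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Polls a queue off-thread and signals availability through a future. */
class AsyncAvailability implements AutoCloseable {
    private final BlockingQueue<String> queue;   // stand-in for the SQS queue
    private final ExecutorService poller = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "queue-poller");
        t.setDaemon(true);                       // do not keep the JVM alive
        return t;
    });
    private volatile String pending;             // fetched but not yet consumed

    AsyncAvailability(BlockingQueue<String> queue) {
        this.queue = queue;
    }

    /** Returns immediately; the future completes once a message has been fetched. */
    CompletableFuture<Void> isAvailable() {
        if (pending != null) {
            return CompletableFuture.completedFuture(null);
        }
        return CompletableFuture.runAsync(() -> {
            try {
                while (pending == null) {
                    // Short timed poll as a stand-in for an SQS long poll
                    pending = queue.poll(100, TimeUnit.MILLISECONDS);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, poller);
    }

    /** Hands over the fetched message, or null if none is ready. */
    String take() {
        String message = pending;
        pending = null;
        return message;
    }

    @Override
    public void close() {
        poller.shutdownNow();
    }
}
```

The trade-off is an extra thread and some shared state in exchange for a caller that never waits on the network.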

Now that we have a baseline SourceReader, we can create a DataStream:

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class DataStreamJob {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        String queueUrl = "<QUEUE_URL>";
        S3EventNotificationSource mySource = new S3EventNotificationSource(queueUrl);

        env.fromSource(mySource, WatermarkStrategy.noWatermarks(), "S3 File Reader")
                // Parse each CSV line (id,name,age) into a Person
                .map((s) -> {
                    String[] parts = s.split(",");
                    return new Person(Long.valueOf(parts[0]), parts[1], Integer.valueOf(parts[2]));
                })
                .addSink(JdbcSink.sink(
                        "INSERT INTO person (id, name, age) VALUES (?, ?, ?)",
                        (statement, person) -> {
                            statement.setLong(1, person.id);
                            statement.setString(2, person.name);
                            statement.setInt(3, person.age);
                        },
                        JdbcExecutionOptions.builder()
                                .withBatchSize(10)
                                .withBatchIntervalMs(200)
                                .withMaxRetries(10)
                                .build(),
                        new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                                .withUrl("jdbc:postgresql://localhost:5432/testdb")
                                .withDriverName("org.postgresql.Driver")
                                .withUsername("postgres")
                                .withPassword("123456")
                                .build()
                ));

        // Execute program, beginning computation.
        env.execute("S3 Event Notification Source");
    }
}

Explanation:

  • For the sake of example, a JDBC sink is created against a local PostgreSQL instance.
  • The input file is a CSV file with three fields (id INT, name VARCHAR, age INT). Each line is mapped directly to the sink table without further transformation: we only split the line and emit a Person POJO.
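The inline map function assumes every line is well formed. As a hedged sketch of a slightly more defensive variant, the parsing can be pulled into a helper that trims fields and rejects lines with the wrong number of columns. The Person class below is a guess at the POJO used by the job (the original is not shown), and PersonParser is a hypothetical name.

```java
/** Assumed shape of the POJO used by the job; the original class is not shown. */
class Person {
    public long id;
    public String name;
    public int age;

    public Person() {
    }

    public Person(long id, String name, int age) {
        this.id = id;
        this.name = name;
        this.age = age;
    }
}

final class PersonParser {
    private PersonParser() {
    }

    /** Parses one CSV line of the form "id,name,age"; throws on malformed input. */
    static Person parse(String line) {
        String[] parts = line.split(",", -1);   // -1 keeps trailing empty fields
        if (parts.length != 3) {
            throw new IllegalArgumentException(
                    "Expected 3 fields but got " + parts.length + ": " + line);
        }
        return new Person(
                Long.parseLong(parts[0].trim()),
                parts[1].trim(),
                Integer.parseInt(parts[2].trim()));
    }
}
```

In the job, the lambda would then become .map(PersonParser::parse). Whether a malformed line should fail the job or be routed elsewhere is a design decision this sketch leaves open.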

Once a file is uploaded, our job immediately emits the input records and writes them to the JDBC sink:

SELECT * FROM person;

This would return

+--+-----+---+
|id|name |age|
+--+-----+---+
|1 |majid|23 |
|4 |ali |75 |
|2 |javad|43 |
+--+-----+---+

Enhancing Semantics and Reliability

The current implementation of our custom Flink source primarily supports at-least-once semantics. While this level of reliability ensures that no data is lost, it does mean that in the event of a system failure, some data might be processed more than once. Additional deduplication mechanisms must be implemented either within the streaming pipeline or at the sink.
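One way to absorb duplicates at the sink is to make writes idempotent, for instance with a keyed upsert (in PostgreSQL, an INSERT ... ON CONFLICT ... DO UPDATE statement, assuming id is the primary key of the person table). The sketch below models that behavior with an in-memory map to show why replaying a file leaves the table unchanged; UpsertTable is a hypothetical stand-in, not a JDBC API.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** In-memory stand-in for a table with a primary key on id. */
class UpsertTable {
    private final Map<Long, String> rowsById = new LinkedHashMap<>();

    /** Insert-or-overwrite keyed by id, so replaying a row has no further effect. */
    void upsert(long id, String name, int age) {
        rowsById.put(id, name + "," + age);
    }

    int rowCount() {
        return rowsById.size();
    }

    String row(long id) {
        return rowsById.get(id);
    }
}
```

Writing the three sample rows twice, as would happen if a file were reprocessed after a failure, still leaves three rows in the table.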

To support exactly-once semantics, which ensures that each piece of data is processed exactly once (no more, no less) even in the event of failures, we need to implement a more sophisticated state management system. The key steps involved include:

  • State Management: Enhance the source to manage and store the state more granularly. For files, this could involve not just remembering which files have been processed, but also tracking which lines or records within those files have been handled.
  • Checkpoint Integration: Integrate with Flink’s checkpointing mechanism to store and restore this state. In the event of a failure, Flink would use this checkpointed state to accurately determine the next record to process, thereby avoiding duplicates.
  • Line Number Tracking: Specifically, we need to keep track of the last line number processed for each file. Store these line numbers in the state store, which is saved at each checkpoint. Upon recovery, processing can resume from the next line, ensuring no data is reprocessed unnecessarily.
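The line-number idea can be sketched without any Flink machinery. In the example below, a reader counts the lines it has emitted, exposes that count as its snapshot, and skips that many lines when it is reopened after a failure. ResumableLineReader is a hypothetical name; in a real source the offset would travel inside the split state that the reader returns at each checkpoint.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.UncheckedIOException;

/** Reads lines and remembers how many have been emitted so far. */
class ResumableLineReader {
    private final BufferedReader reader;
    private long linesEmitted;

    /** Opens the input and skips the lines emitted before the last checkpoint. */
    ResumableLineReader(Reader input, long checkpointedOffset) {
        this.reader = new BufferedReader(input);
        try {
            while (linesEmitted < checkpointedOffset && reader.readLine() != null) {
                linesEmitted++;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Returns the next line, or null at end of input. */
    String nextLine() {
        try {
            String line = reader.readLine();
            if (line != null) {
                linesEmitted++;
            }
            return line;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Value to store at checkpoint time. */
    long snapshotOffset() {
        return linesEmitted;
    }
}
```

Skipping by re-reading is linear in the offset; for large files, a byte offset combined with a ranged read would be a possible refinement.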

Conclusion

We explored an approach to simulating SnowPipe and ClickPipe functionalities using Apache Flink’s Source API. Starting from the limitations of the existing FileSource in handling unbounded file sources, we dug into creating a custom source implementation that leverages event-driven mechanisms provided by cloud services like AWS S3. This approach not only enhances the efficiency of data ingestion processes but also aligns with the dynamic requirements of real-time data streaming architectures.

We delved into the detailed architecture of our solution, examining each component from the creation of a custom Source and SourceReader to the integration with cloud-based event notifications.

Looking forward, we discussed potential enhancements to improve the robustness of our solution, specifically moving from at-least-once to exactly-once semantics. I will keep this enhancement for another post.
