Asynchronous IO using Apache Flink

Amar Pandey
Jun 6, 2021

What motivated the idea of implementing asynchronous IO (Async IO)?

The asynchronous communication mechanism addresses the network latency that arises when a Flink application interacts frequently with external systems. The external system can be a REST server, HBase, MySQL, etc.

In general, an I/O operation is an expensive and time-consuming process. Consider a machine learning (ML) streaming job in which the data stream must look up data in HBase (a table containing billions of records) and then perform calculations based on it. The bottleneck of this job is the operator that accesses HBase. Although HBase is highly optimized, the TPS (transactions per second) of each subtask cannot be very high because of the slow I/O operations, so the QPS (queries per second) of the entire cluster stays low.

Let us take a database as the external system. The product ID is stored in the order table; when we need product details, we query them in the database by product ID. The common pattern is serial synchronous access: send request A to the database, wait for the result to return, and only then send the next request, as shown on the left side of the figure below.

Sync. I/O vs Async. I/O

This is a synchronous access mode, and the time spent waiting on the network severely hurts throughput and latency. Async I/O can process multiple requests and responses concurrently: it keeps sending query requests to the database without waiting between consecutive requests, which greatly improves the throughput of the Flink application, as shown on the right side of the figure above.
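For contrast, here is a minimal sketch of the synchronous pattern described above, using a blocking lookup inside a RichMapFunction. DatabaseClient, its blockingQuery method, and the connection parameters are placeholders rather than a real client API:

// Synchronous access: each lookup blocks the task thread until the database responds
class SyncDatabaseRequest extends RichMapFunction<String, Tuple2<String, String>> {

    private transient DatabaseClient client;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Placeholder client and connection parameters, as in the async example later
        client = new DatabaseClient(host, port, credentials);
    }

    @Override
    public Tuple2<String, String> map(String key) throws Exception {
        // Blocking call: the next record cannot be processed until this one returns
        String dbResult = client.blockingQuery(key);
        return new Tuple2<>(key, dbResult);
    }
}

Every record pays the full round-trip latency here, which is exactly the waiting shown on the left side of the figure.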

The functions provided by Async IO are as follows:

  • Asynchronous access
  • Message sequence guarantee
  • State consistency guarantee

The following is an analysis of each function through examples and source code.

// Inherit RichAsyncFunction
class AsyncDatabaseRequest extends RichAsyncFunction<String, Tuple2<String, String>> {

    // Database client
    private transient DatabaseClient client;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Initialize the client
        client = new DatabaseClient(host, port, credentials);
    }

    @Override
    public void close() throws Exception {
        client.close();
    }

    @Override
    public void asyncInvoke(String key, final ResultFuture<Tuple2<String, String>> resultFuture) throws Exception {

        // Send an asynchronous request and get back a Future for the result
        final Future<String> result = client.query(key);

        // When the request completes, pass the result to resultFuture for collection
        CompletableFuture.supplyAsync(new Supplier<String>() {

            @Override
            public String get() {
                try {
                    return result.get();
                } catch (InterruptedException | ExecutionException e) {
                    // Normally handled explicitly.
                    return null;
                }
            }
        }).thenAccept((String dbResult) -> {
            resultFuture.complete(Collections.singleton(new Tuple2<>(key, dbResult)));
        });
    }
}

// Create the original data stream
DataStream<String> stream = ...;

// Perform AsyncI/O for the original data stream through AsyncDataStream to get the converted data stream
DataStream<Tuple2<String, String>> resultStream =
    AsyncDataStream.orderedWait(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100);

Using Flink’s Async I/O as above is concise and clear. There are two main steps:

  1. Inherit RichAsyncFunction, which contains the business logic for asynchronous access.
  2. Use AsyncDataStream.orderedWait (or unorderedWait) to perform the asynchronous I/O conversion on the original stream and get the converted stream.

Async-I/O asynchronous access

AsyncDataStream

AsyncDataStream has two static methods, unorderedWait and orderedWait, corresponding to the unordered and ordered modes respectively.

public static <IN, OUT> SingleOutputStreamOperator<OUT> unorderedWait(
    DataStream<IN> in,
    AsyncFunction<IN, OUT> func,
    long timeout,
    TimeUnit timeUnit,
    int capacity)

public static <IN, OUT> SingleOutputStreamOperator<OUT> orderedWait(
    DataStream<IN> in,
    AsyncFunction<IN, OUT> func,
    long timeout,
    TimeUnit timeUnit,
    int capacity)

  • Ordered: the order in which results are emitted matches the order in which the input messages arrived, i.e., first in, first out.
  • Unordered: with ProcessingTime the results are completely unordered, and whichever result returns first is emitted first. With EventTime, watermarks define the boundaries of the disorder: messages between two watermarks may be emitted out of order, but results never cross a watermark. A short usage sketch of both modes follows.
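As a small sketch, the same AsyncDatabaseRequest from the example above can be wired in either mode; only the order in which results are emitted differs:

// Ordered mode: results are emitted in the same order as the input records arrived
DataStream<Tuple2<String, String>> ordered =
    AsyncDataStream.orderedWait(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100);

// Unordered mode: results are emitted as soon as they complete (lower latency),
// constrained only by watermarks when event time is used
DataStream<Tuple2<String, String>> unordered =
    AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100);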

The orderedWait and unorderedWait methods take 5 parameters:

  • in: the input data stream.
  • func: implements the business logic of the asynchronous request.
  • timeout, timeUnit: the timeout; an asynchronous operation that does not complete within this time is treated as timed out.
  • capacity: the maximum number of asynchronous requests that may be in flight at the same time.

The main job of AsyncDataStream.unorderedWait is to create an AsyncWaitOperator. AsyncWaitOperator is the operator implementation that supports Async I/O; it calls the AsyncFunction internally and processes the returned results.

private static <IN, OUT> SingleOutputStreamOperator<OUT> addOperator(
        DataStream<IN> in,
        AsyncFunction<IN, OUT> func,
        long timeout,
        int bufSize,
        OutputMode mode) {

    ......
    // Create AsyncWaitOperatorFactory, a factory that internally creates the AsyncWaitOperator
    AsyncWaitOperatorFactory<IN, OUT> operatorFactory = new AsyncWaitOperatorFactory<>(
        in.getExecutionEnvironment().clean(func),
        timeout,
        bufSize,
        mode);

    return in.transform("async wait operator", outTypeInfo, operatorFactory);
}

AsyncWaitOperator

AsyncWaitOperator processes each element as follows: first, the element is added to a queue whose size is the configured capacity; when the queue is full, adding the next element blocks.

If a timeout is configured, a timer is registered for the element.

Finally, the asynchronous function is invoked.

// Processing of elements by AsyncWaitOperator
public void processElement(StreamRecord<IN> element) throws Exception {
    // Add the element to the queue
    final ResultFuture<OUT> entry = addToWorkQueue(element);

    final ResultHandler resultHandler = new ResultHandler(element, entry);

    // Register a timer for the timeout
    if (timeout > 0L) {
        final long timeoutTimestamp = timeout + getProcessingTimeService().getCurrentProcessingTime();

        final ScheduledFuture<?> timeoutTimer = getProcessingTimeService().registerTimer(
            timeoutTimestamp,
            timestamp -> userFunction.timeout(element.getValue(), resultHandler));

        resultHandler.setTimeoutTimer(timeoutTimer);
    }

    // Asynchronous I/O call
    userFunction.asyncInvoke(element.getValue(), resultHandler);
}
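The timer registered above calls userFunction.timeout() when the deadline expires; by default this completes the entry exceptionally and fails the job. As a sketch, AsyncDatabaseRequest could override timeout() to drop the timed-out record instead (whether dropping is acceptable depends on the use case):

@Override
public void timeout(String key, ResultFuture<Tuple2<String, String>> resultFuture) throws Exception {
    // Emit nothing for this record; completing with an empty collection marks the entry as done
    resultFuture.complete(Collections.emptyList());
}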

When the asynchronous I/O completes, the resultHandler.complete() method is called and the result is collected in the resultHandler.

// The complete method of ResultHandler
@Override
public void complete(Collection<OUT> results) {
    Preconditions.checkNotNull(results, "Results must not be null, use empty collection to emit nothing");

    // Atomically ensure that the entry is completed only once
    if (!completed.compareAndSet(false, true)) {
        return;
    }

    // Send the result to the next processing node
    processInMailbox(results);
}

private void processInMailbox(Collection<OUT> results) {
    // Send the results in the mailbox thread; processResults() does the actual processing
    mailboxExecutor.execute(
        () -> processResults(results),
        "Result in AsyncWaitOperator of input %s", results);
}

private void processResults(Collection<OUT> results) {
    // The result has arrived, so cancel the timeout timer
    if (timeoutTimer != null) {
        // canceling in mailbox thread avoids https://issues.apache.org/jira/browse/FLINK-13635
        timeoutTimer.cancel(true);
    }

    // Update the queue's entry
    resultFuture.complete(results);
    // Output the completed results from the queue
    outputCompletedElement();
}

// Send the result
private void outputCompletedElement() {
    if (queue.hasCompletedElements()) {
        // emit only one element to not block the mailbox thread unnecessarily
        queue.emitCompletedElement(timestampedCollector);
        // if there are more completed elements, emit them with subsequent mails
        if (queue.hasCompletedElements()) {
            mailboxExecutor.execute(this::outputCompletedElement, "AsyncWaitOperator#outputCompletedElement");
        }
    }
}

The resultFuture.complete() call above updates the entry in the queue, and outputCompletedElement() then emits the completed elements from the queue.

Message sequence guarantee

AsyncWaitOperator uses a StreamElementQueue to guarantee the order of messages. It has two subclasses: OrderedStreamElementQueue and UnorderedStreamElementQueue.

OrderedStreamElementQueue

OrderedStreamElementQueue implements the ordered mode; its internal data structure is an ordinary Java Queue. Only when the element at the head of the queue has completed is it emitted.

The code is shown below:

@Override
public boolean hasCompletedElements() {
    // The element at the head of the queue has completed and can be emitted
    return !queue.isEmpty() && queue.peek().isDone();
}

// Emit the element
@Override
public void emitCompletedElement(TimestampedCollector<OUT> output) {
    // Check whether the element at the head of the queue can be emitted
    if (hasCompletedElements()) {
        final StreamElementQueueEntry<OUT> head = queue.poll();
        head.emitResult(output);
    }
}

UnorderedStreamElementQueue

static class Segment<OUT> {
    // Unfinished input elements.
    private final Set<StreamElementQueueEntry<OUT>> incompleteElements;

    // Undrained finished elements.
    private final Queue<StreamElementQueueEntry<OUT>> completedElements;
}

public final class UnorderedStreamElementQueue<OUT> implements StreamElementQueue<OUT> {

    private static final Logger LOG = LoggerFactory.getLogger(UnorderedStreamElementQueue.class);

    // Capacity of this queue.
    private final int capacity;

    // Queue of queue entries segmented by watermarks.
    private final Deque<Segment<OUT>> segments;

    // Take the first segment and check whether it contains a completed element.
    @Override
    public boolean hasCompletedElements() {
        return !this.segments.isEmpty() && this.segments.getFirst().hasCompleted();
    }
}

Segment is itself a queue, and UnorderedStreamElementQueue wraps another layer of queue around it.

This two-layer queue handles the different ordering semantics of ProcessingTime and EventTime:

ProcessingTime disorder: there is always exactly one Segment in segments, so all elements are placed in a single queue and may be emitted in whatever order they complete.

EventTime disorder: every time a watermark arrives, a new empty Segment is appended to the segments queue, and subsequent elements go into that new Segment. This ensures that elements may only be reordered between two watermarks, never across one.
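As a sketch of what this means for a job: if the input stream carries event-time timestamps and watermarks, unorderedWait may reorder results between two watermarks but never across one. The watermark strategy and the extractEventTime helper below are illustrative only:

// Assign timestamps and watermarks before the async operator (illustrative)
DataStream<String> keysWithWatermarks = keys.assignTimestampsAndWatermarks(
    WatermarkStrategy
        .<String>forBoundedOutOfOrderness(Duration.ofSeconds(5))
        .withTimestampAssigner((key, previousTimestamp) -> extractEventTime(key)));

// Results between two consecutive watermarks may be reordered,
// but no result is emitted past the watermark that follows its input record
DataStream<Tuple2<String, String>> enriched = AsyncDataStream.unorderedWait(
    keysWithWatermarks, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100);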

State consistency guarantee

@Override
public void snapshotState(StateSnapshotContext context) throws Exception {
    super.snapshotState(context);

    ListState<StreamElement> partitionableState =
        getOperatorStateBackend().getListState(new ListStateDescriptor<>(STATE_NAME, inStreamElementSerializer));
    partitionableState.clear();

    try {
        // Simply save the elements in the queue into the operator state.
        partitionableState.addAll(queue.values());
    } catch (Exception e) {
        partitionableState.clear();

        throw new Exception("Could not add stream element queue entries to operator state " +
            "backend of operator " + getOperatorName() + '.', e);
    }
}

The snapshotState function saves the state information, which is the basis of state consistency.

AsyncWaitOperator’s snapshot logic is very simple. From the code you can see the following steps:

  1. Clear the existing state storage first.
  2. Take all of the in-flight elements out of the queue and put them into the state storage area.
  3. If that fails, clear the state and rethrow the exception.

That’s all! In my next blog, I will show Flink-powered AI model serving.
