Flink Map, CoMap, RichMap and RichCoMap Functions

M Haseeb Asif
Big Data Processing
2 min read · May 18, 2020

Flink has a powerful functional streaming API that lets application developers specify high-level functions for data transformations. Developers can choose from a range of transformations.

The most commonly used transformation is probably Map, and several variants of it are available in Flink. It can be confusing to decide which one to use if you don't know the differences.

All transformations require a user-defined function to be provided by the application developer. For example, to map String values in a data stream to Integers, we transform each value using a MapFunction. MapFunction is used with DataStreams, and the user implements the business logic for each value in the map method, as follows:

class MyMapFunction implements MapFunction<String, Integer> {
    @Override
    public Integer map(String value) {
        return Integer.parseInt(value);
    }
}

CoMapFunction is similar to MapFunction, except that it is used with ConnectedStreams: we map the values of both streams using the respective map methods, map1 and map2. There is no guarantee which map method will be called first. The following sample has two input streams, of Integer and String, and returns Boolean values:

connectedStreams.map(new CoMapFunction<Integer, String, Boolean>() {
    @Override
    public Boolean map1(Integer value) {
        return true;
    }

    @Override
    public Boolean map2(String value) {
        return false;
    }
});
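Since a CoMapFunction merges two inputs into a single output stream, its behaviour can be sketched without a Flink cluster. The SimpleCoMap interface and CoMapSketch class below are illustrative stand-ins, not Flink API; the point is that elements from either side may arrive in any order, each is routed to its own map method, and the outputs merge into one stream:

```java
import java.util.ArrayList;
import java.util.List;

public class CoMapSketch {
    // Stand-in for Flink's CoMapFunction<IN1, IN2, OUT>; not the real API
    interface SimpleCoMap<IN1, IN2, OUT> {
        OUT map1(IN1 value);
        OUT map2(IN2 value);
    }

    static List<Boolean> run() {
        SimpleCoMap<Integer, String, Boolean> fn = new SimpleCoMap<Integer, String, Boolean>() {
            @Override public Boolean map1(Integer value) { return true;  }
            @Override public Boolean map2(String value)  { return false; }
        };

        // Elements from the two connected streams may interleave in any order;
        // each element goes through its own map method, outputs merge into one stream.
        List<Boolean> out = new ArrayList<>();
        out.add(fn.map2("a"));  // the String stream may fire first
        out.add(fn.map1(1));
        out.add(fn.map1(2));
        out.add(fn.map2("b"));
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [false, true, true, false]
    }
}
```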

Rich functions provide four additional methods beyond the map methods: open, close, getRuntimeContext and setRuntimeContext. Both RichMapFunction and RichCoMapFunction variants exist.

open is used to make a function stateful by initializing the state; it is called only once, before any records are processed. The RuntimeContext is used to access the different state types, e.g. ValueState or ListState. Similarly, we can do cleanup in the close method, which is called once processing is done.
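The lifecycle can be sketched in plain Java. The RichParseFunction and LifecycleSketch classes below are illustrative stand-ins, not Flink API; they only show the order in which open, map and close are invoked around the per-element processing:

```java
import java.util.ArrayList;
import java.util.List;

public class LifecycleSketch {
    // Stand-in mirroring the RichMapFunction lifecycle; not the real Flink classes
    static class RichParseFunction {
        private List<Integer> seen; // "state", initialized in open()

        void open() { seen = new ArrayList<>(); }  // called once, before any map()

        Integer map(String value) {                // called once per element
            int parsed = Integer.parseInt(value);
            seen.add(parsed);
            return parsed;
        }

        void close() { seen = null; }              // called once, after processing
    }

    static List<Integer> run(List<String> input) {
        RichParseFunction fn = new RichParseFunction();
        fn.open();
        List<Integer> out = new ArrayList<>();
        for (String s : input) out.add(fn.map(s));
        fn.close();
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("1", "2", "3"))); // [1, 2, 3]
    }
}
```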

Following is an example that uses a RichFlatMapFunction to compute the average of all values over a count window of 5. It uses ValueState to store the running count and sum of the values. We initialize the state in the open method of the rich function, using getRuntimeContext to register the state descriptor.

public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {

    /**
     * The ValueState handle: the first field is the count, the second the running sum.
     */
    private transient ValueState<Tuple2<Long, Long>> sum;

    @Override
    public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {
        // access the state value
        Tuple2<Long, Long> currentSum = sum.value();

        // update the count and the running sum
        currentSum.f0 += 1;
        currentSum.f1 += input.f1;
        sum.update(currentSum);

        // if the count reaches 5, emit the average and clear the state
        if (currentSum.f0 >= 5) {
            out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0));
            sum.clear();
        }
    }

    @Override
    public void open(Configuration config) {
        ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
                new ValueStateDescriptor<>(
                        "average",                                                 // the state name
                        TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // type information
                        Tuple2.of(0L, 0L));                                        // default value of the state, if nothing was set
        sum = getRuntimeContext().getState(descriptor);
    }
}
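The windowing arithmetic above can be checked outside Flink. The AverageSketch class below replays the same running-sum logic with two plain fields standing in for the Tuple2 ValueState (the class and method names are illustrative, not Flink API):

```java
import java.util.ArrayList;
import java.util.List;

public class AverageSketch {
    private long count = 0, sum = 0; // stand-in for the Tuple2 ValueState

    // Mirrors flatMap: accumulate, and emit the average once 5 values have arrived
    Long add(long value) {
        count += 1;
        sum += value;
        if (count >= 5) {
            long avg = sum / count;
            count = 0;
            sum = 0;   // stand-in for sum.clear()
            return avg;
        }
        return null;   // nothing emitted yet
    }

    public static void main(String[] args) {
        AverageSketch s = new AverageSketch();
        List<Long> emitted = new ArrayList<>();
        for (long v : new long[] {1, 2, 3, 4, 5}) {
            Long out = s.add(v);
            if (out != null) emitted.add(out);
        }
        System.out.println(emitted); // [3] : (1+2+3+4+5)/5 = 3
    }
}
```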

So to summarize: we have MapFunction for DataStreams and CoMapFunction for ConnectedStreams. Furthermore, appending Rich to these names makes them rich functions, adding four methods that are commonly used for state management.

References:
1. https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html
2. https://javadoc.io/static/org.apache.flink/flink-java_2.11/0.10.1/org/apache/flink/api/java/record/functions/MapFunction.html
3. https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/common/functions/RichMapFunction.html
4. https://ci.apache.org/projects/flink/flink-docs-stable/api/java/index.html?org/apache/flink/streaming/api/functions/co/RichCoMapFunction.html
