Run a Stateful Streaming Service with Apache Flink and RocksDB

Aviral Srivastava · Published in The Startup · Jun 29, 2020

What is Flink?

Apache Flink is an open-source, distributed platform that performs data processing in both stream and batch modes. Being a distributed system, Flink provides fault tolerance for the data streams it processes.

What is a Stateful Streaming Service?

To make our platform fault tolerant, we want to preserve the system’s state from time to time. Achieving fault tolerance means being capable of not losing any data when one or more nodes of the cluster go down.

How does Flink become stateful?

Flink offers three options for the state backend, one of which is RocksDB. However, only RocksDB can provide incremental checkpoints in Flink. In this blog post, we will go through a standard example of running Flink with the RocksDB backend to get incremental checkpoints.
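This post sets the backend through the configuration file, but the same thing can be done in code. Below is a minimal sketch of the programmatic route; the checkpoint path is a placeholder, and the second constructor argument is what enables incremental checkpoints:

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RocksDBBackendSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // "true" switches on incremental checkpointing; the path is a placeholder.
        env.setStateBackend(new RocksDBStateBackend("file:///tmp/flink-checkpoints", true));
        env.enableCheckpointing(5000); // snapshot the state every 5 seconds
        // ... sources, transformations, and sinks go here ...
    }
}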

Table of contents:

  1. Install Flink
  2. Configurational changes
  3. Running the example
  4. Checking the snapshots

Install Flink

Head over to the official documentation to install Flink.
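The paths used later in this post come from a Homebrew install on macOS; if you take that route, one command is enough (note that the apache-flink formula on your machine may install a different version than the 1.10.x line used here):

brew install apache-flink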

Configurational Changes

On macOS (with a Homebrew install), open the configuration file at the following path:

/usr/local/Cellar/apache-flink/1.10.1/libexec/conf/flink-conf.yaml

If you extracted the binary distribution instead, look for the same relative path inside the extracted folder:

cd flink-1.10.0
nano conf/flink-conf.yaml

The version (1.10.1 here) might differ on your system, so adjust the path accordingly.

In this file, make two changes:

Setting RocksDB as the state backend:

Search for state.backend and set it to: state.backend: rocksdb

Setting the directory for checkpoints

Search for state.checkpoints.dir and set it to your desired path. For example, this is my configuration:

file:///Users/aviralsrivastava/dev/utkarsh-adwait-flink/frauddetection/checkpoints

An hdfs:// path won't work unless you install Flink's Hadoop dependency. So, keeping it minimal, I decided to move forward with the plain local filesystem.
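Put together, the relevant lines of my flink-conf.yaml look like this (the path is specific to my machine; state.backend.incremental is an optional extra flag that makes the periodic checkpoints incremental from the configuration side):

state.backend: rocksdb
state.checkpoints.dir: file:///Users/aviralsrivastava/dev/utkarsh-adwait-flink/frauddetection/checkpoints
state.backend.incremental: true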

Running the example

We will be running the fraud-detection example from Flink's DataStream walkthrough.

Run the following command to create the project:

mvn archetype:generate \
-DarchetypeGroupId=org.apache.flink \
-DarchetypeArtifactId=flink-walkthrough-datastream-java \
-DarchetypeVersion=1.10.0 \
-DgroupId=frauddetection \
-DartifactId=frauddetection \
-Dversion=0.1 \
-Dpackage=spendreport \
-DinteractiveMode=false

We need to be able to run the above example in our IDE with breakpoints, and to set RocksDB as our backend. The following is my pom.xml:

<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>frauddetection</groupId>
<artifactId>frauddetection</artifactId>
<version>0.1</version>
<packaging>jar</packaging>
<name>Flink Walkthrough DataStream Java</name>
<url>https://flink.apache.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.10.0</flink.version>
<java.version>1.8</java.version>
<scala.binary.version>2.11</scala.binary.version>
<maven.compiler.source>${java.version}</maven.compiler.source>
<maven.compiler.target>${java.version}</maven.compiler.target>
</properties>
<repositories>
<repository>
<id>apache.snapshots</id>
<name>Apache Development Snapshot Repository</name>
<url>https://repository.apache.org/content/repositories/snapshots/</url>
<releases>
<enabled>false</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
</repositories>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-walkthrough-common_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Scope changed from the archetype's default (provided) to compile so the job can run directly in the IDE. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>compile</scope>
</dependency>
<!-- Add connector dependencies here. They must be in the default scope (compile). -->
<!-- Example:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
-->
<!-- Add logging framework, to produce console output when running in the IDE. -->
<!-- These dependencies are excluded from the application JAR by default. -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.7</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Java Compiler -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>${java.version}</source>
<target>${java.version}</target>
</configuration>
</plugin>
<!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
<!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.0.0</version>
<executions>
<!-- Run shade goal on package phase -->
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<excludes>
<exclude>org.apache.flink:force-shading</exclude>
<exclude>com.google.code.findbugs:jsr305</exclude>
<exclude>org.slf4j:*</exclude>
<exclude>log4j:*</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<!-- Do not copy the signatures in the META-INF folder.
Otherwise, this might cause SecurityExceptions when using the JAR. -->
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>spendreport.FraudDetectionJob</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
<pluginManagement>
<plugins>
<!-- This improves the out-of-the-box experience in Eclipse by resolving some warnings. -->
<plugin>
<groupId>org.eclipse.m2e</groupId>
<artifactId>lifecycle-mapping</artifactId>
<version>1.0.0</version>
<configuration>
<lifecycleMappingMetadata>
<pluginExecutions>
<pluginExecution>
<pluginExecutionFilter>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<versionRange>[3.0.0,)</versionRange>
<goals>
<goal>shade</goal>
</goals>
</pluginExecutionFilter>
<action>
<ignore/>
</action>
</pluginExecution>
<pluginExecution>
<pluginExecutionFilter>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<versionRange>[3.1,)</versionRange>
<goals>
<goal>testCompile</goal>
<goal>compile</goal>
</goals>
</pluginExecutionFilter>
<action>
<ignore/>
</action>
</pluginExecution>
</pluginExecutions>
</lifecycleMappingMetadata>
</configuration>
</plugin>
</plugins>
</pluginManagement>
</build>
</project>

After setting the above dependencies, we need to build the jar file so that we can submit our job to Flink.

Modify the code to create checkpoints

I added two lines to enable checkpoints:

// Enabling Checkpoint
long checkpointInterval = 5000;
env.enableCheckpointing(checkpointInterval);
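enableCheckpointing takes the interval in milliseconds, so 5000 means a snapshot of the state every five seconds. Flink's CheckpointConfig exposes further knobs if you need them; the following sketch is optional tuning, not part of the original change (it assumes the additional imports org.apache.flink.streaming.api.CheckpointingMode and org.apache.flink.streaming.api.environment.CheckpointConfig):

// placed right after env.enableCheckpointing(checkpointInterval):
CheckpointConfig checkpointConfig = env.getCheckpointConfig();
// exactly-once is the default mode; stated here for clarity
checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// leave at least 500 ms between the end of one checkpoint and the start of the next
checkpointConfig.setMinPauseBetweenCheckpoints(500);
// abort any checkpoint that takes longer than one minute
checkpointConfig.setCheckpointTimeout(60000);
// keep the latest checkpoint if the job is cancelled, so it can be restored later
checkpointConfig.enableExternalizedCheckpoints(
        CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);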

The complete file becomes:

/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package spendreport;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.walkthrough.common.sink.AlertSink;
import org.apache.flink.walkthrough.common.entity.Alert;
import org.apache.flink.walkthrough.common.entity.Transaction;
import org.apache.flink.walkthrough.common.source.TransactionSource;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;

// Needed only if you uncomment the Web UI block below:
// import org.apache.flink.configuration.Configuration;

/**
* Skeleton code for the datastream walkthrough
*/
public class FraudDetectionJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Alternative to flink-conf.yaml: set the backend in code.
        // The second argument (true) enables incremental checkpoints.
        // env.setStateBackend(new RocksDBStateBackend("file:///path/to/checkpoints", true));

        // Enabling Checkpoint
        long checkpointInterval = 5000;
        env.enableCheckpointing(checkpointInterval);

        // Enable Web UI
        // Configuration conf = new Configuration();
        // env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);

        DataStream<Transaction> transactions = env
                .addSource(new TransactionSource())
                .name("transactions");

        DataStream<Alert> alerts = transactions
                .keyBy(Transaction::getAccountId)
                .process(new FraudDetector())
                .name("fraud-detector");

        alerts
                .addSink(new AlertSink())
                .name("send-alerts");

        env.execute("Fraud Detection");
    }
}
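For context, the stateful logic lives in FraudDetector, the KeyedProcessFunction the archetype generates. Below is a condensed sketch of its simple, timer-free variant from the walkthrough; the per-account ValueState flag it keeps is precisely the state that RocksDB stores and that every checkpoint snapshots:

package spendreport;

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.walkthrough.common.entity.Alert;
import org.apache.flink.walkthrough.common.entity.Transaction;

public class FraudDetector extends KeyedProcessFunction<Long, Transaction, Alert> {

    private static final double SMALL_AMOUNT = 1.00;
    private static final double LARGE_AMOUNT = 500.00;

    // Keyed state: one Boolean flag per account id, kept by the
    // configured backend (RocksDB) and included in every checkpoint.
    private transient ValueState<Boolean> flagState;

    @Override
    public void open(Configuration parameters) {
        flagState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("flag", Types.BOOLEAN));
    }

    @Override
    public void processElement(Transaction transaction, Context context, Collector<Alert> collector)
            throws Exception {
        Boolean lastTransactionWasSmall = flagState.value();

        // A small transaction immediately followed by a large one raises an alert.
        if (lastTransactionWasSmall != null) {
            if (transaction.getAmount() > LARGE_AMOUNT) {
                Alert alert = new Alert();
                alert.setId(transaction.getAccountId());
                collector.collect(alert);
            }
            flagState.clear();
        }

        if (transaction.getAmount() < SMALL_AMOUNT) {
            flagState.update(true);
        }
    }
}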

Building the jar file

You can invoke a full lifecycle, which compiles, tests, and packages the artifact:

mvn install

Or invoke the compiler plugin on its own; the following three commands are equivalent, from shorthand to fully qualified (the pom pins the plugin at version 3.1):

mvn compiler:compile
mvn org.apache.maven.plugins:maven-compiler-plugin:compile
mvn org.apache.maven.plugins:maven-compiler-plugin:3.1:compile

Create the package

mvn package

This command should be executed in the frauddetection directory.

Once done, we will have the .jar file in the target folder.

Start the cluster:

/usr/local/Cellar/apache-flink/1.10.1/libexec/bin/start-cluster.sh

Run the example

/usr/local/Cellar/apache-flink/1.10.1/libexec/bin/flink run /Users/aviralsrivastava/dev/utkarsh-adwait-flink/frauddetection/target/frauddetection-0.1.jar
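To confirm that the job was accepted, the same CLI can list running jobs:

/usr/local/Cellar/apache-flink/1.10.1/libexec/bin/flink list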

Check the UI:

Open the Flink dashboard in your browser; it serves on http://localhost:8081 by default. This is how it looks:

[Screenshot: checkpoints of the Flink job in the dashboard]

We can also look at the checkpoint path configured earlier to verify that our application is persisting its state with RocksDB as the backend.
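On disk, checkpoints land under a per-job directory. Assuming the configuration above, the layout looks roughly like this (the job id and checkpoint number will differ on your machine; with incremental checkpoints, the shared directory holds the RocksDB files that consecutive checkpoints reuse):

$ ls /Users/aviralsrivastava/dev/utkarsh-adwait-flink/frauddetection/checkpoints
<job-id>
$ ls /Users/aviralsrivastava/dev/utkarsh-adwait-flink/frauddetection/checkpoints/<job-id>
chk-42  shared  taskowned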

Conclusion

In this blog post, we used RocksDB for stateful streaming in Flink. We began by installing Flink, then configured it to use RocksDB as the state backend with incremental checkpoints. We then modified our code to create checkpoints every five seconds, built the jar file, and observed the checkpoints created by Flink. In the next blog post of this series, I will publish the results of profiling this code.
