Part I: Develop stream processing apps using Apache Kafka Streams on Oracle Cloud

Abhishek Gupta
Published in Oracle Developers
6 min read · Sep 5, 2017


This blog demonstrates how to develop and deploy a stateful stream processing microservice using the Kafka Streams library and Oracle Cloud services: Oracle Event Hub Cloud, Oracle Application Container Cloud & Oracle Developer Cloud

It's split into two parts: Part I (this one) covers the core concepts and development aspects, and Part II deals with the setup, deployment & CI/CD

Oracle Event Hub Cloud

Simply put, Oracle Event Hub Cloud is managed Kafka (as a service). Its supported Kafka version has been bumped up to 0.10.2 (from 0.9.0). A Kafka fan probably knows what that implies: Kafka Streams support! The sample application in this blog will leverage it

Oracle Application Container Cloud

It is an application platform-as-a-service (aPaaS) with support for polyglot runtimes (Java SE, Node.js, PHP, Ruby, Python etc.)

In the context of the use case presented in this blog, it plays two major roles

  • serve as a horizontally scalable platform for our stateful stream processing (micro)services
  • its caching functionality makes it a great choice as a distributed store for the (intermediate) state of our stream processing application (details to follow)

Oracle Developer Cloud

Serves as the CI/CD and agile development platform

Let’s start with the sample application overview, core concepts and then look at some code

Application: high level overview

The application and its goal are simple — given a continuous stream of tweets, keep track of the number of occurrences of a select set of keywords (e.g. java, paas, kafka etc.). It is similar to the canonical word count problem and hopefully easy enough to demonstrate the required concepts

There are three major components at play here — here is a quick overview of what they are and how they interact

Kafka Connect

The Kafka Connect Twitter (source) connector ingests the tweet stream and pushes the tweets to a Kafka topic. Check out this cool project here

Covering Kafka Connect is out of scope of this blog — read more about it in the Kafka documentation

Managed Kafka broker (Oracle Event Hub Cloud)

Oracle Event Hub Cloud hosts the Kafka cluster and the topic (more in the Setup & Deployment section) and serves as the streaming data platform

Tweet analytics application

Oracle Application Container Cloud hosts the (Kafka Streams based) tweet analysis application as well as the state store (cache) — more in the Setup & Deployment section

Here is what the system looks like at a high level

High level overview

Concepts

This section covers some of the fundamental concepts around Kafka Streams as well as (specific aspects of) Oracle Application Container Cloud which are relevant in the context of this blog

For details, I would recommend diving into the documentation for Kafka Streams and Oracle Application Container Cloud

Kafka Streams overview

In simple words, Kafka Streams is a library which you can include in your Java-based applications to build stream processing applications on top of Apache Kafka. Other distributed computing platforms such as Apache Spark and Apache Storm are widely used in the big data stream processing world, but Kafka Streams brings some unique propositions to this area

  • Built on top of Kafka: leverages its scalable and fault tolerant capabilities. If you use Kafka in your ecosystem, it makes perfect sense to leverage Kafka Streams to churn streaming data to/from the Kafka topics
  • Flexible deployment & elastic in nature: You’re not restricted to a specific deployment model (e.g. cluster-based). The application can be packaged and deployed in a flexible manner and scaled up and down easily
  • For fast data (as well): Harness the power of Kafka Streams to crunch high-volume data in real-time systems — it does not need to be at big data scale
  • Others: supports stateful, fault-tolerant, one-record-at-a-time and windowed operations

Kafka Streams APIs

Kafka Streams has two types of APIs for writing stream processing applications

  • DSL API: a high-level, fluent API which makes it easy to express stream processing topologies
  • Processor API: a low-level API which can be used when you need more control, flexibility and customization
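To give a feel for the DSL, here is a minimal keyword-count topology sketch against the Kafka Streams 0.10.2 API. The topic names, broker address and keyword list are placeholders, and note that the sample application in this blog actually uses the Processor API instead (for reasons covered below).

```java
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;

public class KeywordCountDslSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "tweet-keyword-count");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
        props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        KStreamBuilder builder = new KStreamBuilder();
        KStream<String, String> tweets = builder.stream("tweets"); // hypothetical topic name

        KTable<String, Long> counts = tweets
                // split each tweet into lowercase words
                .flatMapValues(text -> Arrays.asList(text.toLowerCase().split("\\W+")))
                // keep only the keywords we are tracking
                .filter((key, word) -> word.equals("java") || word.equals("kafka") || word.equals("paas"))
                // re-key by the word itself and count occurrences
                .groupBy((key, word) -> word)
                .count("keyword-counts"); // backed by the default (RocksDB) state store

        // publish the running counts to an output topic
        counts.to(Serdes.String(), Serdes.Long(), "keyword-counts-topic");

        new KafkaStreams(builder, props).start();
    }
}
```

Running this requires the kafka-streams 0.10.2 dependency and a reachable Kafka broker.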

Kafka Stream State Stores

Kafka Streams provides a State Store feature with which applications can store their local processing results (the state). RocksDB is used as the default state store, and it can run in persistent or in-memory mode. There are a few options as far as these are concerned; one can either

  • use the out-of-the-box state store implementations e.g. RocksDB, in-memory, or,
  • write a custom state store implementation i.e. implement/extend Kafka Streams interfaces/classes, or,
  • use the low-level Processor API to plug in your own state store — this is the implementation choice for our example (more on this in the next sub-section)

Please note that as of now (Kafka Streams 0.11.0.0), custom implementations (options #2 and #3 above) can only be used with the low-level Processor API
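As an illustration of the Processor API, here is a hypothetical skeleton of a keyword-counting Processor. The sample application's TweetStreamProcessor follows a similar shape, but stores its counts in the Oracle Application Container Cloud cache rather than the local map used as a stand-in here; the class name and keyword list are illustrative.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;

public class KeywordCountProcessor implements Processor<String, String> {

    private static final Set<String> KEYWORDS =
            new HashSet<>(Arrays.asList("java", "kafka", "paas"));

    // stand-in state store; the real implementation plugs in the ACCS cache
    private final Map<String, Long> counts = new ConcurrentHashMap<>();

    private ProcessorContext context;

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
    }

    @Override
    public void process(String key, String tweet) {
        for (String word : tweet.toLowerCase().split("\\W+")) {
            if (KEYWORDS.contains(word)) {
                counts.merge(word, 1L, Long::sum); // increment the running count
            }
        }
        context.commit(); // mark progress with the streams runtime
    }

    @Override
    public void punctuate(long timestamp) {
        // no scheduled work in this sketch
    }

    @Override
    public void close() {
        // nothing to release for the in-memory stand-in
    }
}
```

A Processor like this is wired into a topology via the low-level TopologyBuilder API rather than the DSL.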

Oracle Application Container Cloud Cache as the State Store

In our sample application, the state which we care about is the count of occurrences of the keywords which we chose to follow — how is it implemented?

Oracle Application Container Cloud provides access to a scalable in-memory cache, and it's used as the custom state store in our use case

  • the word and its count are stored as key-value pairs
  • the cache-specific logic is plugged into the low-level Processor API based implementation
  • the Oracle Application Container Cloud Java Cache SDK is used
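To make the key-value idea concrete, here is a sketch of the put/get operations such a state store wrapper might expose. A ConcurrentHashMap stands in for the ACCS cache, and the class and method names are illustrative, not the actual Cache SDK API.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for a cache-backed state store: keywords are
// keys, running counts are values. The real wrapper delegates these
// calls to the ACCS Java Cache SDK instead of a local map.
public class KeywordCountStore {

    private final Map<String, Long> cache = new ConcurrentHashMap<>();

    // store/overwrite the count for a keyword
    public void put(String keyword, long count) {
        cache.put(keyword, count);
    }

    // fetch the current count, defaulting to zero for unseen keywords
    public long get(String keyword) {
        return cache.getOrDefault(keyword, 0L);
    }

    // read-increment-write, as the stream processor does on each match
    public long increment(String keyword) {
        return cache.merge(keyword, 1L, Long::sum);
    }
}
```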

More details to follow in the upcoming Code section

You can read more about the Cache feature in the official documentation

Here are some of the benefits

  • the most important one is that access to the local state (in the cache) is fast and easy to implement (compared to the native StreamsMetadata API provided by Kafka Streams); for details please refer to the documentation
  • the Oracle Application Container Cloud cache is distributed in nature (all application instances can access the same cache), is highly available and scalable, and provides access via a Java SDK as well as a REST API
  • it's also possible for other applications/microservices (deployed in Oracle Application Container Cloud) to access the cache contents (i.e. the stream processing state in this case) if required

Scalability

It's possible to scale our stream processing service elastically, in both directions (details in the documentation)

  • Up and down — increase/decrease the memory (RAM)
  • In and out — decrease/increase the number of instances of the application

In case of a scale out, Kafka Streams distributes the (stream) processing load among all the application instances; the degree of parallelism is determined by the number of partitions in the topic, e.g. if you have 10 partitions and 5 application instances, each instance will crunch data from two partitions
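The arithmetic above can be sketched as a ceiling division (assuming an even spread, which Kafka Streams' task assignment approximates):

```java
public class PartitionSpread {
    // Roughly how many partitions each application instance handles:
    // ceiling of partitions / instances, since some instances get one
    // extra partition when the counts do not divide evenly.
    public static int partitionsPerInstance(int partitions, int instances) {
        return (partitions + instances - 1) / instances;
    }
}
```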

Here is a slightly more granular view of the stream processing service on Oracle Application Container Cloud

Implementation details

Code

Components

  • Kafka Streams library
  • Jersey (JDK based) HTTP container
  • Java client SDK for Oracle Application Container Cloud Cache

Let’s look at some code snippets — the project is available here

KafkaStreamsAppBootstrap.java bootstraps the Jersey HTTP container and the Kafka Streams application

KafkaStreamsTweetAnalysisJob.java sets up the Kafka Streams application along with the processor topology

TweetStreamProcessor.java implements org.apache.kafka.streams.processor.Processor & encapsulates the main processing logic

TweetAnalysisStateStore.java acts as a wrapper on top of Oracle ACCS Java Cache client SDK and exposes put and get operations

TweetStateResource.java is the REST endpoint which exposes the state i.e. the current count of the keywords we are following. It queries the Oracle ACCS cache to get the results

Others

  • TweetCountSerializer.java — custom serialization (& de-serialization) logic (implements com.oracle.cloud.cache.basic.io.Serializer) for cache contents (converts between Long and byte[])
  • Stat.java — JAXB annotated class to represent the state (counts)
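As a rough sketch of the Long to byte[] conversion such a serializer performs (the ACCS Serializer interface itself is omitted here so the snippet stays self-contained, and the class name is illustrative):

```java
import java.nio.ByteBuffer;

public class LongBytesCodec {

    // encode a Long count as an 8-byte big-endian array for the cache
    public static byte[] serialize(Long value) {
        return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
    }

    // decode the 8-byte array back into the Long count
    public static Long deserialize(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getLong();
    }
}
```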

That's it for this part. Move on to Part II for the following

  • Configuration & deployment of the Oracle Cloud components
  • How to run/test the application


The views expressed in this post are my own and do not necessarily reflect the views of Oracle.
