Introduction to Event-Driven Systems with Stream Processing

Ratros Y.
Google Cloud - Community
Sep 5, 2019

This document discusses the basics of stream processing event-driven systems. It is a part of the Building Event-Driven Cloud Applications and Services tutorial series.

Introduction

The stream processor is the centerpiece of a stream processing event-driven system, and it sets this pattern apart from its reactive cousin: the processor bundles input, data processing, and output together while managing its own execution single-handedly. Developers no longer write all the code from scratch themselves; instead, they manipulate data in the context of the chosen stream processor.

This pattern provides a higher level of abstraction in terms of events. Instead of reacting to each and every event, most stream processing systems speak in the language of streams, applying actions to the flow as a whole. Stream processors help extract state from the stream and pass it back to developers for further action or analysis. Many stream processing solutions are also built with distributed processing in mind; they can easily handle large amounts of streaming data with little to no human intervention, provided that you have a powerful enough cluster to support the workflow.

It might be easier to understand the distinct nature of stream processing event-driven systems with the example of view counting on YouTube: every time you open a YouTube video, Google displays its current view count and increases the number by one. The use case looks simple, but it can be fairly challenging engineering-wise considering the scale of YouTube (5+ billion views per day), as the functionality is both read- and write-intensive:

  • If you use a SQL/NoSQL database to save the counts, the constant locking and releasing caused by swarms of write (increment view count by 1) requests will impact performance heavily. This is especially true for popular videos, which may receive up to 30,000 views per minute.
  • Constant writes will not be a problem if you persist data with event sourcing; however, with these solutions, getting the view count becomes computationally inefficient, as each read requires scanning and aggregating all the related data. You may have to cache the results and risk serving stale data to viewers.

Stream processing event-driven systems, on the other hand, champion this use case by focusing on the big picture: each view becomes an event in the stream, and the stream processor calculates the view count on the fly in memory. In other words, it extracts the state (aggregated view counts) from the stream of individual view events so that downstream consumers can get what they truly want (the view count) without having to worry about the less important individual pieces (the views). With a little magic of distributed processing, this pattern can handle (almost) real-time view counting (and a large number of similar use cases) easily at any scale, and many businesses and organizations have adopted it in production.

In this tutorial you will build a similar (but basic) view counting stream processing system with Apache Flink, one of the commonly adopted stream processors in the field, where an app sends view events via Cloud Pub/Sub and Apache Flink helps count the views using the event stream. The demo project uses Java (for Apache Flink) and Python (for the example app).

Architectural Overview

The workflow is as follows:

  1. The app publishes a number of view events via Cloud Pub/Sub.
  2. An Apache Flink cluster pulls the events from Cloud Pub/Sub and starts processing the stream. More specifically,
  • (keyBy) The cluster partitions the stream for parallel, distributed processing, in accordance with the IDs of videos
  • (filter) The cluster filters out all stale or duplicate events
  • (flatMap) The cluster casts each event into a format that is easier to process, removing the event ID attribute
  • (keyBy) The cluster partitions the stream again, in accordance with the IDs of videos
  • (timeWindow) The cluster groups all events that arrive within the last 10 seconds
  • (reduce) The cluster calculates the total view count for each video in the window
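The six steps above can be sketched end-to-end in plain Python. This is only an illustration of the data flow; the event IDs and video IDs are made up, and the actual implementation is the Java code in StreamingJob.java:

```python
from collections import defaultdict

# Hypothetical sample events: (event_id, video_id) pairs, with one duplicate.
events = [("e1", "v1"), ("e2", "v1"), ("e2", "v1"), ("e3", "v2")]

# filter: drop events whose IDs have already been seen.
seen = set()
deduped = [e for e in events if not (e[0] in seen or seen.add(e[0]))]

# flatMap: keep only (video_id, 1) tuples; the event ID is no longer needed.
tuples = [(video_id, 1) for _, video_id in deduped]

# keyBy + timeWindow + reduce: here, sum the counts per video for the batch.
counts = defaultdict(int)
for video_id, n in tuples:
    counts[video_id] += n

print(dict(counts))  # {'v1': 2, 'v2': 1}
```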

Setup

  1. Install Java 8 (use Gradle as the build automation tool) and Python 3 on your machine.
  2. Install Google Cloud SDK.
  3. Download Apache Flink.
  4. Create a Cloud Pub/Sub topic and a Cloud Pub/Sub subscription to the topic in your Google Cloud Platform project. Use the Pull type for the subscription.
  5. Clone the source code from GitHub.
git clone https://github.com/michaelawyu/stream-processing-demo

Understanding the code

Apache Flink, as the stream processor of this project, manages the input, data transformation, and output of the workflow; the specifics are available in StreamingJob.java.

Input

Apache Flink provides built-in support for connecting to a Cloud Pub/Sub subscription, from which it pulls messages (events) automatically. The configuration is as follows:

After setting up the input, Flink returns a DataStream which you can operate on. At this moment, since no data transformation is specified, the stream looks as follows (suppose a number of events, or views, have been published):
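Conceptually, each element of the DataStream at this point is one raw view event. A hypothetical Python representation (the field names here are illustrative; see the project's event specification for the actual schema):

```python
# A hypothetical view event as it might arrive from Cloud Pub/Sub.
event = {
    "event_id": "evt-0001",  # unique ID, used later for deduplication
    "video_id": "1",         # the key that keyBy partitions on
}

# At this stage the DataStream is just a sequence of such events.
stream = [event, {"event_id": "evt-0002", "video_id": "2"}]
print([e["video_id"] for e in stream])  # ['1', '2']
```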

Step #1: Partitioning (keyBy)

The first action you will apply is to partition the stream using the IDs of videos:

After this step the stream looks as follows:

Step #2: Filtering (filter)

Next, filter the stream and remove all the duplicate events (if any). Cloud Pub/Sub guarantees only at-least-once delivery, and occasionally it may return the same message twice or more; consequently, to use Cloud Pub/Sub as a reliable source for view counting, you have to deduplicate events in the stream.

In this step, you will use a custom filter for deduplication. Inside the filter resides a LoadingCache provided by Google Core Libraries for Java; every time an event arrives, the custom filter checks whether the ID of the event is in the cache, and on a cache hit Flink removes the event from the stream. The contents of the cache expire automatically after 10 minutes, so you do not have to worry about memory overflowing.
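The actual filter is Java code in StreamingJob.java; the idea can be sketched in Python with a simple expiring cache standing in for Guava's LoadingCache (the 10-minute TTL matches the description above):

```python
import time

class ExpiringSet:
    """Minimal stand-in for the Guava LoadingCache used by the filter."""
    def __init__(self, ttl_seconds=600):
        self.ttl = ttl_seconds
        self.entries = {}  # event_id -> insertion time

    def seen(self, event_id, now=None):
        now = now if now is not None else time.time()
        # Evict expired IDs so memory stays bounded.
        self.entries = {k: t for k, t in self.entries.items()
                        if now - t < self.ttl}
        if event_id in self.entries:
            return True  # cache hit: the event is a duplicate
        self.entries[event_id] = now
        return False

cache = ExpiringSet()
stream = ["e1", "e2", "e1", "e3"]          # event IDs; "e1" is duplicated
kept = [e for e in stream if not cache.seen(e)]
print(kept)  # ['e1', 'e2', 'e3']
```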

After this step the stream looks as follows:

Note

In reality, Cloud Pub/Sub rarely duplicates events; duplication usually happens when the subscriber fails to acknowledge an event in time. The diagram is for demonstration purposes only, and the proportion of filtered events shown is not representative.

Step #3 and Step #4: Mapping (flatMap) and Partitioning (keyBy)

In these two steps, you will map each event from a custom Java class, PubSubEvent, to a tuple of two items, and partition the stream again with the IDs of videos (the first item of the tuple):

These two steps help make the final calculation step a little easier. After this step the stream looks as follows:
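In Python terms, the mapping from a PubSubEvent-like record to a two-item tuple might look like this (the field names are assumptions, not the project's actual schema):

```python
# Hypothetical event records before the flatMap step.
events = [
    {"event_id": "e1", "video_id": "1"},
    {"event_id": "e2", "video_id": "2"},
]

# Map each event to (video_id, 1); the event ID is no longer needed
# once deduplication is done, so it is dropped here.
tuples = [(e["video_id"], 1) for e in events]
print(tuples)  # [('1', 1), ('2', 1)]
```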

Step #5: Windowing (timeWindow)

At this moment the event stream flows infinitely, and it is impossible to output total view counts for each video if new views keep piling up. In this step, you will ask Flink to window the stream, grouping all the tuples that arrive within the last 10 seconds:

After this step the stream looks as follows:
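A tumbling 10-second window can be sketched in plain Python by bucketing tuples on their arrival time (the timestamps below are illustrative):

```python
from collections import defaultdict

# (video_id, arrival_time_in_seconds) pairs; timestamps are illustrative.
arrivals = [("1", 3), ("2", 5), ("1", 9), ("1", 14)]

# Tumbling 10-second windows: window index = arrival_time // 10.
windows = defaultdict(list)
for video_id, t in arrivals:
    windows[t // 10].append((video_id, 1))

print(dict(windows))
# {0: [('1', 1), ('2', 1), ('1', 1)], 1: [('1', 1)]}
```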

Step #6: Reducing (reduce)

Now you can calculate the total view counts for each video within the last 10 seconds using the reduce function. Reduce is a concept from functional programming: the system builds the final return value by repeatedly combining elements, two at a time, into an accumulated result.

Below is a diagram showcasing how reduce works in principle:

And in this step Flink calculates the final output, the view counts of videos, in the same manner:
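The same pairwise combination can be reproduced with Python's functools.reduce; summing the second field of the tuples in one window yields that window's view count:

```python
from functools import reduce

# One window's tuples for a single video after keyBy.
window = [("1", 1), ("1", 1), ("1", 1)]

# Combine two tuples at a time, just as a Flink reduce function does:
# keep the key, add the counts.
total = reduce(lambda a, b: (a[0], a[1] + b[1]), window)
print(total)  # ('1', 3)
```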

Output

For simplicity, the workflow in this demo project outputs the view counts of videos to the terminal (stdout). In production, however, you may want to save the output in a database for queries. In essence, this workflow outputs aggregated view counts for each video periodically (every 10 seconds), effectively eliminating the need to update the view count database every time a new view happens, thus greatly improving the performance of your application.

Scalability

Apache Flink is capable of executing the workflow in this demo project in parallel across a number of nodes, which you may set up to increase the throughput of your system. Flink guarantees that events with the same key will always hit the same partition, regardless of how many partitions there might be, which makes it easier for developers to design scalable stream processing systems.

The second step of the workflow, for example, uses an in-memory cache for deduplication. The cache itself is bound to a Flink partition instead of the whole cluster; with parallelization enabled, every partition will have a cache of its own. Without the Flink partitioning guarantee, duplicate events may be sent to different partitions, effectively bypassing the deduplication mechanism and eventually poisoning the final view counts.
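The guarantee can be pictured as deterministic hashing: the same key always maps to the same partition index. This is a simplification of Flink's actual key-group assignment, but the property it illustrates is the same:

```python
import hashlib

def partition_for(key, num_partitions):
    # A deterministic hash: a given key always lands on the same partition.
    digest = hashlib.sha256(key.encode()).digest()
    return int.from_bytes(digest[:8], "big") % num_partitions

# Duplicate events for video "1" always reach the same partition,
# so that partition's in-memory cache can catch them.
p1 = partition_for("1", 4)
p2 = partition_for("1", 4)
print(p1 == p2)  # True
```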

Other stream processing solutions may have deduplication functionality built in: Cloud Dataflow, for instance, uses a Bloom filter for fast and accurate duplicate detection. You may want to implement one instead of the basic cache used in this application.
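To show the idea, here is a minimal Bloom filter sketch (not Cloud Dataflow's implementation): membership tests may yield false positives but never false negatives, which makes Bloom filters attractive for cheap duplicate checks:

```python
import hashlib

class BloomFilter:
    """A tiny illustrative Bloom filter; not Cloud Dataflow's implementation."""
    def __init__(self, size=1024, hashes=3):
        self.size = size
        self.hashes = hashes
        self.bits = [False] * size

    def _positions(self, item):
        # Derive several bit positions from independent hashes of the item.
        for i in range(self.hashes):
            h = hashlib.sha256(f"{i}:{item}".encode()).digest()
            yield int.from_bytes(h[:8], "big") % self.size

    def add(self, item):
        for pos in self._positions(item):
            self.bits[pos] = True

    def might_contain(self, item):
        # True means "probably seen"; False means "definitely not seen".
        return all(self.bits[pos] for pos in self._positions(item))

bf = BloomFilter()
bf.add("e1")
print(bf.might_contain("e1"))  # True
print(bf.might_contain("e2"))  # almost certainly False for an empty-ish filter
```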

Checkpoints

Every application crashes, and stream processing event-driven systems are no exception. Normally developers have to write their own logic for error handling; however, since stream processors manage their own execution, many of them have the capability to recover from errors on their own. Apache Flink uses a checkpoint-based mechanism for disaster recovery: the system backs up state automatically during execution at a specified interval as checkpoints; should an exception be raised, Apache Flink will restore state from them.

If you are merely manipulating data in a Flink workflow, such as adding two values passed by Flink and returning the sum in step 6, no additional setup is required for using checkpoints. On the other hand, developers must tell Flink how to back up and restore state if they introduce a custom variable as state in the workflow. The cache used in step 2, for example, is such a custom variable:
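The idea of snapshotting custom state can be sketched as an object that knows how to serialize its cache and rebuild it, a loose analogy to the snapshot/restore pair Flink expects from stateful functions:

```python
import json

class DedupState:
    """Illustrative analogy to a Flink stateful function's
    snapshot/restore pair; not the project's actual Java code."""
    def __init__(self):
        self.seen = {}  # event_id -> arrival time

    def snapshot(self):
        # Serialize the cache so it can be stored in a checkpoint.
        return json.dumps(self.seen)

    @classmethod
    def restore(cls, checkpoint):
        # Rebuild the cache from a previously taken checkpoint.
        state = cls()
        state.seen = json.loads(checkpoint)
        return state

state = DedupState()
state.seen["e1"] = 100
recovered = DedupState.restore(state.snapshot())
print(recovered.seen)  # {'e1': 100}
```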

See it in action

Now you are ready to run this workflow in your local cluster:

export GCP_PROJECT=YOUR-PROJECT
export PUBSUB_TOPIC=YOUR-PUBSUB-TOPIC
export PUBSUB_SUBSCRIPTION=YOUR-PUBSUB-SUBSCRIPTION

Replace YOUR-PROJECT, YOUR-PUBSUB-TOPIC, and YOUR-PUBSUB-SUBSCRIPTION with values of your own.

  • Change to the directory of the cloned project and run the example app with the command below. It is recommended that you use a Python virtualenv.
cd helper
pip install -r requirements.txt
python main.py

The helper application publishes 60 events to your Cloud Pub/Sub topic, including 20 views for video 1, 30 views for video 2, and 10 views for video 3. You may edit the values yourself in main.py.

View the event specification here. The application also uses an event library prepared by CloudEvents Generator for publishing events.

You should see the following output:

Waiting for Cloud Pub/Sub to complete publishing events (20s)…
  • Next, build the Apache Flink workflow with Gradle:
cd ..
gradle build

The compiled JAR file lives in build/libs.

  • Change to the directory where Apache Flink is installed. If you installed Flink on macOS using brew, you may find the path with brew info apache-flink.
  • Start a local Flink cluster:
./bin/start-cluster.sh # Linux
./libexec/bin/start-cluster.sh # macOS (via brew)
./bin/start-cluster.bat # Windows
  • Run the compiled workflow:
./bin/flink run PATH-TO-PROJECT/build/libs/stream-processing-demo-0.1.0-all.jar # Linux
flink run PATH-TO-PROJECT/build/libs/stream-processing-demo-0.1.0-all.jar # macOS (via brew)
./bin/flink.exe run PATH-TO-PROJECT/build/libs/stream-processing-demo-0.1.0-all.jar # Windows
  • You can now check whether the view counts calculated by Flink match the values published earlier:
tail log/flink-*-standalonesession-*.log

You should see

(1, 20)
(2, 30)
(3, 10)

or its equivalent in the output, which matches the numbers of views specified in main.py.
