Streaming word count app with Grainite

Build a streaming word count application with real-time aggregations on Grainite

Gautam Mulchandani
Grainite
7 min readDec 19, 2022

--

In this article, you will learn how to build a complete word count application using Grainite. After a brief introduction to stream processing and Grainite, we will walk through the process of building the word count application. We picked word count because it is considered to be the “Hello world!” of big data systems.

What is Stream Processing?

Stream processing is a way to handle data as it arrives instead of batching it up and then periodically processing the batched data. Stateful stream processing involves joining an entity’s current state with streaming events that result in the entity getting updated. As organizations are looking to become nimble and respond quickly to changing customer needs, stateful stream processing has become increasingly prevalent.

Downsides of traditional approaches

Building applications that require event stream processing using existing technologies is complicated, to say the least. This is due to the need for integrating disparate components for stream ingest and storage as well as stream compute engine. Add in a few databases, and caches to store and manage the application state. These components, usually deployed in independent clusters, must be in place before attempting to build such applications.

Overall, building stateful event stream processing applications involves a combination of distributed programming and system design. Developers need to learn the APIs of these components and reason about integrations, failures, and slowdowns. Operations teams need to manage these components and ensure their smooth running, security & scaling as load spikes. This makes building, deploying, and running these applications difficult & expensive, bringing risk to the business initiative.

Introduction to Grainite

Grainite is a platform for building and running stateful stream processing and event-driven applications. It converges Event ingest & storage, Stateful computation, a NoSQL database, and a host of other capabilities into a single platform. Grainite is built from the ground up and comes with a single set of APIs. It has built-in, adaptive caching that adjusts to the load requirements of the applications.

As a result, developers can build and test entire applications with just a single Grainite docker image on a laptop. The operations team can deploy these applications unchanged to Grainite on any public or private cloud. Apart from Kubernetes for cluster deployment and storage, Grainite has no other dependencies. The Grainite conceptual model explains how to think about Grainite. The rest of this blog assumes knowledge of the Grainite conceptual model.

You can request a free Grainite trial (or optionally download a docker image) here.

Wordcount with Grainite

We will build a streaming word count application that receives a stream of lines from documents and produces counts of words and documents.

The complete project, along with instructions to run it and prerequisites is in this Gitlab repository. The Grainite API Javadocs are also available from the same repository here.

In Grainite, the central artifact is the app.yaml. The app.yaml file contains the definition of the app including its tables, topics, action handlers, and their subscriptions to topics. Here is a snippet of the word count app.yaml.

app_name: wordcount
package_id: com.grainite.samples.wordcount
# Location of jar file relative to this config file
jars:
- target/wordcount-0.1-SNAPSHOT-jar-with-dependencies.jar

topics:
- topic_name: line_topic
key_name: hash
key_type: string

tables:
- table_name: line_table
key_type: string
action_handlers:
- name: LineEventHandler
type: java
class_name: com.grainite.samples.wordcount.handlers.LineEventHandler
actions:
- action_name: handleLineEvent
subscriptions:
- subscription_name: lineUpdates
topic_name: line_topic
topic_key: hash
  1. The application name: wordcount
  2. The package that contains the application code: com.grainite.samples.wordcount
  3. The jar file(s) for the application
  4. Topic named line_topic
  5. A table named line_table

line_table has an action handler LineEventHandler with an action handleLineEvent in it. The handleLineEvent action in turn subscribes to events that arrive in the line_topic

Grainite requires events to have keys. In this example, the key is simply the hash of the first 3 letters of the document from where the events are originating.

The figure below shows how the application is set up in Grainite, all based on the contents of app.yaml:

The Wordcount app contains 3 tables, each with a single action handler and a single topic for stream ingestion. The LineEventHandler subscribes to the line topic so all events sent from clients that arrive at this topic are processed by the LineEventHandler.

When clients send events to topics, Grainite stores them and acks the client. Grainite also invokes the action(s) that subscribe to the events. Grainite does this in an exactly-once manner, from the event ingestion to action invocation, and storing the results of the actions are done transactionally.

Developers only need to code the action handlers using plain java and upload them to Grainite. Here is the code in the handleLineEvent action which is part of LineEventHandler.java

// actions accept the event payload and current state of the "grain" to the action
public ActionResult handleLineEvent(Action action, GrainContext context) {
LineEvent event = ((TopicEvent) action).getPayload().asType(LineEvent.class);
// split line into words, strip each word, increment total word count of line
final AtomicLong wordCount = new AtomicLong(0);
Stream.of(event.getLine().split("[. ]+")).forEach(w -> {
// for each word, send to word stats table, and increment document count.
if (w.trim().length() == 0) return;
w = w.toLowerCase();
wordCount.incrementAndGet();
// send event to each word to increment itself
GrainOp.Invoke invoke = new GrainOp.Invoke(Constants.HANDLE_WORD_EVENTS_ACTION, Value.of(w));
context.sendToGrain(Constants.WORDS_TABLE, Value.of(w.substring(0, 1)), invoke, null);
});
long numPeriods = countPeriods(event.getLine());

GrainOp.Invoke invokeDoc = new GrainOp.Invoke(Constants.HANDLE_DOC_STATS_EVENT,
Value.ofObject(new DocumentStats(numPeriods, wordCount.get())));
// now update document count.
context.sendToGrain(Constants.DOC_STATS_TABLE, Value.of(event.getDocName()), invokeDoc, null);
return ActionResult.success(action);
}

The parameters to handleLineEvent are the incoming event and the state of the entity. In this case, the incoming event contains a line of text as the payload and the entity is simply a hash of the first 2 characters of the incoming line.

Note: Grainite, unlike many other systems, does not require users to partition/shard the tables and topics in order to do parallel processing. Instead, topics and tables are required to have a key and Grainite automatically partitions them by key. So the unit of partitioning is the key. This enables massive parallel processing of incoming events.

Here, handleLineEvent splits the line into words and sends an event to the handleWordsEvent action handler that is associated with the words table. It also sends an event to the handleDocEvent that is associated with the doc stats table.

Note: Grainite guarantees the atomic execution of action handlers. So, after running the actions, all the side effects — such as updating the state of the grain and/or sending events to other entities — are guaranteed to successfully execute together or not execute at all. (Other systems use the outbox pattern to achieve this. Grainite automatically natively handles this)

For completeness, here are the actions from DocStatsHandler and WordStatsHandler:

public class DocStatsHandler {
public ActionResult handleDocStatsEvent(Action action, GrainContext context) {
// get current state of the document from passed in context
DocumentStats prevStats = context.getValue().asType(DocumentStats.class);
// extract event payload from event sent by handleLineEvent
DocumentStats event =
((GrainRequest)action).getPayload().asType(DocumentStats.class);
context.getLogger().info("DocStatsHandler: Received wordCount for doc - " +
context.getKey().asString());
// update state of the document based on incoming event
prevStats.numPeriods += event.numPeriods;
prevStats.numWords += event.numWords;
// save new state of document
context.setValue(Value.ofObject(prevStats));
return ActionResult.success(action);
}
}
public class WordStatsHandler {
// increments the count of this word. note this count is across all docs.
public ActionResult handleWordEvent(Action action, GrainContext context) {
Value word = ((GrainRequest)action).getPayload();
context.getLogger().info("WordStatsHandler: Received word - " +
word.asString());
// word_table stores all the words and their counts in a single map
context.mapPut(0, word, Value.of(context.mapGet(0, word).asLong(0) + 1));
return ActionResult.success(action);
}
}

Under the covers, Grainite first hydrates the document and then invokes handleDocStatsEvent action. This is an example of data scheduled compute — executing compute not only close to the data but also after fetching the data.

As can be seen above, the task for the developer is to simply implement the business logic of handling the incoming event using straight-line Java. Besides data scheduled compute, Grainite serializes all access to the document so that there is no contention and thus no need for locking the document. After the action has been completed, Grainite persists the document state such that the execution of the action was a single atomic operation.

Similarly, the handleWordEvent action maintains statistics for all words in the word stats table. There is a single grain in the word stats table that maintains the statistics for all words. This information is put in the map of the grain.

Here is a snippet of the client code to send events to the line event. (The callback is removed just for brevity):

Topic topic = client.getTopic(Constants.APP_NAME, Constants.LINE_TOPIC);
Path inputPath = Paths.get(inputFile);

AtomicLong numEventsSent = new AtomicLong(0);
Files.lines(inputPath).forEach(line -> {
if (!line.isEmpty()) {
numEventsSent.incrementAndGet();
topic.appendAsync(
new Event(Value.of(line.substring(0, 2)),
Value.ofObject(new LineEventHandler.LineEvent(
inputPath.getFileName().toString(), line))));
}
});

Serve the Word Count data to clients

Grainite is not only a platform for event streaming, but it also serves queries with its built-in database. Here is code from the client that queries Grainite for a document and prints out the statistics for that document.

Table docTable = client.getTable(Constants.APP_NAME, Constants.DOC_STATS_TABLE);
Grain docGrain = docTable.getGrain(Value.of(docName), false);
if (docGrain == null) {
System.out.printf("Document %s not found\n", docName);
System.exit(1);
}
DocumentStats stats = docGrain.getValue().asType(DocumentStats.class);
System.out.printf("Document %s { Sentences: %d, Words: %d }\n", docName, stats.numPeriods, stats.numWords);

Summary

Streaming applications require the underlying platform to provide several capabilities. Whether the application is cloud-native or not, each of these capabilities requires a separate component. At a minimum, even the simplest streaming apps require stream storage, processing, and state storage (database). It is usually left to the application developer to integrate these components and reason about correctness, scalability, resiliency, security & SLAs.

Grainite takes the opposite approach — it coalesces these capabilities into a single component that can be deployed anywhere Kubernetes is available. As a result, it can offer several platform guarantees i.e.: exactly-once processing and message delivery, observability, and security. The application developer can now focus on the logic to process the streaming events while rapidly building and deploying these apps to production.

To summarize: In this simple example, you saw how Grainite can

  • Ingest events from producers
  • Execute processing logic on those incoming events statefully
  • Update the state of entities, including their history
  • Serve client queries with its database

This convergence of capabilities is what makes Grainite extremely powerful, simple to use, and the perfect platform for building stateful event streaming applications.

--

--