Kafka stream processing APIs

Merrin Kurian
Realtime Data Streaming
4 min read · Dec 24, 2017

There are a lot of resources for Apache Kafka, from Confluent and elsewhere: tutorials and videos on running Kafka in production and on scenarios such as low-latency or no-loss publishing.

Ever since I got interested in stream processing, I have been searching for solutions that need no additional infrastructure, which is where the Kafka Streams APIs come into the picture. There are two levels of APIs available: the low-level Processor API and the high-level Streams DSL. The best tutorial I found on the subject is of course on the Apache Kafka website itself: https://kafka.apache.org/documentation/streams/ It is a good introductory tutorial that helps grasp the concepts. However, I had a hard time translating those to my use cases.

So I searched around and came across this wonderful series from Bill Bejeck: http://codingjunkie.net/kafka-processor-part1/ It has code samples using the Processor API for both stateful and stateless processing.

Here is my own version for the simple use case I was trying to solve:

Let us say customers create invoices in their respective companies, and I need to keep track of both the total number of invoices and the total invoice amount per company. The following code samples provide the basics to achieve this; they follow the Stock examples from the blog series mentioned above.
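For contrast, the counting half of this use case in the high-level Streams DSL would look roughly like the sketch below. This is an assumption-laden sketch against the same 0.11-era API used in the rest of this post: invoiceLineSerde wraps the JSON (de)serializers shown later, and keys are assumed to have the form “<companyId>-<rest>”. The rest of the post uses the Processor API instead.

package com.aggregate.compute;

import com.aggregate.model.InvoiceLine;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStreamBuilder;

import java.util.Properties;

public class InvoiceCountDslSketch {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "invoice-count-dsl");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        // Serde wrapping the JSON (de)serializers shown later in this post.
        Serde<InvoiceLine> invoiceLineSerde = Serdes.serdeFrom(
                new JsonSerializer<>(), new JsonDeserializer<>(InvoiceLine.class));

        KStreamBuilder builder = new KStreamBuilder();
        builder.stream(Serdes.String(), invoiceLineSerde, "invoiceline")
                // Re-key each line by the companyId prefix of its key
                // (format assumed to be "<companyId>-<rest>").
                .groupBy((key, line) -> key.split("-", 2)[0],
                        Serdes.String(), invoiceLineSerde)
                .count("invoice-count-store")
                .to(Serdes.String(), Serdes.Long(), "invoice-count-by-company");

        new KafkaStreams(builder, new StreamsConfig(props)).start();
    }
}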

Multiple invoice lines, for the same company or for different companies, can be published to the input topic “invoiceline”. Each invoice line needs to be uniquely identified in order to support log compaction by Kafka, which is why the companyId is only one part of the key and has to be extracted from it. Partitioning is done on the companyId so that all invoice lines belonging to the same company are published and consumed in order. This is achieved using a custom partitioner, sketched below.
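A minimal sketch of such a partitioner, assuming keys of the form “<companyId>-<invoiceId>-<lineId>” (the exact format is my convention, not something Kafka dictates):

package com.aggregate.compute;

import java.util.Map;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.utils.Utils;

public class CompanyIdPartitioner implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        // Hash only the companyId prefix of the key so that every invoice
        // line for a company lands on the same partition.
        String companyId = ((String) key).split("-", 2)[0];
        int numPartitions = cluster.partitionsForTopic(topic).size();
        return Utils.toPositive(Utils.murmur2(companyId.getBytes())) % numPartitions;
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }

    @Override
    public void close() {
    }
}

It is wired into the producer via the partitioner.class config, as the driver at the end of this post shows.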

In this example, “invoicelines-count-store” and “invoicelines-total-store” are the state stores where the aggregates are kept. Every time a new event arrives, the existing counts are updated. The logic to update these counters is in InvoiceCountByCompany and InvoiceTotalsByCompany.

The processors “counter” and “totaler” then periodically forward these aggregates, and the results are published to their respective Kafka topics: “invoice-count-by-company” and “invoice-total-by-company”.

The InvoiceLine object model is included for reference. The JsonDeserializer and JsonSerializer classes are from the code samples in the blog post mentioned earlier; a minimal sketch of them is included below so this example is self-contained.
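Here is that sketch, Gson-based like the originals (Bill Bejeck's versions may differ in detail):

package com.aggregate.compute;

import com.google.gson.Gson;

import java.nio.charset.StandardCharsets;
import java.util.Map;

import org.apache.kafka.common.serialization.Serializer;

public class JsonSerializer<T> implements Serializer<T> {

    private final Gson gson = new Gson();

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
    }

    @Override
    public byte[] serialize(String topic, T t) {
        // Serialize any object to its JSON form as UTF-8 bytes.
        return gson.toJson(t).getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public void close() {
    }
}

package com.aggregate.compute;

import com.google.gson.Gson;

import java.nio.charset.StandardCharsets;
import java.util.Map;

import org.apache.kafka.common.serialization.Deserializer;

public class JsonDeserializer<T> implements Deserializer<T> {

    private final Gson gson = new Gson();
    private final Class<T> deserializedClass;

    public JsonDeserializer(Class<T> deserializedClass) {
        this.deserializedClass = deserializedClass;
    }

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
    }

    @Override
    public T deserialize(String topic, byte[] bytes) {
        if (bytes == null) {
            return null;
        }
        return gson.fromJson(new String(bytes, StandardCharsets.UTF_8), deserializedClass);
    }

    @Override
    public void close() {
    }
}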

package com.aggregate.compute;

import com.aggregate.model.InvoiceLine;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.kafka.streams.state.Stores;

import java.util.Properties;

public class Application {

    public static void main(String[] args) throws Exception {
        JsonDeserializer<InvoiceLine> invoiceLineDeserializer = new JsonDeserializer<>(InvoiceLine.class);
        JsonSerializer<InvoiceLine> invoiceLineSerializer = new JsonSerializer<>();
        StringSerializer stringSerializer = new StringSerializer();
        StringDeserializer stringDeserializer = new StringDeserializer();
        LongSerializer longSerializer = new LongSerializer();
        LongDeserializer longDeserializer = new LongDeserializer();

        // Serde for the Long values kept in the two state stores.
        Serde<Long> summarySerde = Serdes.serdeFrom(longSerializer, longDeserializer);

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "invoice-summary-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        StreamsConfig streamingConfig = new StreamsConfig(props);
        TopologyBuilder builder = new TopologyBuilder();

        // Read "invoiceline", fan out to the two stateful processors, and
        // attach a persistent state store to each of them.
        builder.addSource("source", stringDeserializer, invoiceLineDeserializer, "invoiceline")
                .addProcessor("counter", InvoiceCountByCompany::new, "source")
                .addStateStore(Stores.create("invoicelines-count-store").withStringKeys()
                        .withValues(summarySerde).persistent().build(), "counter")
                .addProcessor("totaler", InvoiceTotalsByCompany::new, "source")
                .addStateStore(Stores.create("invoicelines-total-store").withStringKeys()
                        .withValues(summarySerde).persistent().build(), "totaler")
                // Pass the raw invoice lines through, and publish each
                // processor's periodic output to its own topic.
                .addSink("sink", "invoicelines-out", stringSerializer, invoiceLineSerializer, "source")
                .addSink("sink-2", "invoice-count-by-company", stringSerializer, longSerializer, "counter")
                .addSink("sink-3", "invoice-total-by-company", stringSerializer, longSerializer, "totaler");

        System.out.println("Starting InvoiceSummaryStatefulProcessor Example");
        KafkaStreams streaming = new KafkaStreams(builder, streamingConfig);
        streaming.start();
        System.out.println("InvoiceSummaryStatefulProcessor Example now started");
    }
}
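To exercise the topology, a small hypothetical driver like the following could publish a few invoice lines; the sample keys and values are purely illustrative:

package com.aggregate.compute;

import com.aggregate.model.InvoiceLine;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class InvoiceLineDriver {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName());
        // Route records by companyId using the partitioner sketched earlier.
        props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CompanyIdPartitioner.class.getName());

        try (Producer<String, InvoiceLine> producer = new KafkaProducer<>(props)) {
            // Keys are "<companyId>-<invoiceId>-<lineId>": unique per line
            // (for log compaction) while still prefixed by the companyId.
            producer.send(new ProducerRecord<>("invoiceline", "acme-inv1-line1",
                    new InvoiceLine("acme", "inv1", "line1", 250.0, "n1", "Acme", "Sales", "a1")));
            producer.send(new ProducerRecord<>("invoiceline", "acme-inv1-line2",
                    new InvoiceLine("acme", "inv1", "line2", 100.0, "n1", "Acme", "Sales", "a1")));
        }
    }
}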
package com.aggregate.compute;

import com.aggregate.model.InvoiceLine;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

import java.util.Objects;


public class InvoiceCountByCompany extends AbstractProcessor<String, InvoiceLine> {

    private KeyValueStore<String, Long> summaryStore;
    private ProcessorContext context;

    @Override
    public void process(String key, InvoiceLine invoiceLine) {
        // Increment the running invoice-line count for this company.
        String companyId = extractCompanyIdFromKey(key);
        Long count = summaryStore.get(companyId);
        count = (count == null) ? 1L : count + 1L;
        summaryStore.put(companyId, count);
        System.out.println("updated store " + companyId + ": " + count);
        this.context.commit();
    }

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        this.context = context;
        // Request a punctuate() call every 10 seconds (stream time).
        this.context.schedule(10000);
        summaryStore = (KeyValueStore<String, Long>) this.context.getStateStore("invoicelines-count-store");
        Objects.requireNonNull(summaryStore, "State store can't be null");
    }

    @Override
    public void punctuate(long streamTime) {
        // Periodically forward the current count for every company to the
        // downstream sink ("invoice-count-by-company").
        try (KeyValueIterator<String, Long> it = summaryStore.all()) {
            while (it.hasNext()) {
                KeyValue<String, Long> kv = it.next();
                this.context.forward(kv.key, kv.value);
            }
        }
    }

    // The key is assumed to be "<companyId>-<rest>"; only the company part
    // is used for aggregation.
    private String extractCompanyIdFromKey(String key) {
        return key.split("-", 2)[0];
    }
}
package com.aggregate.compute;

import com.aggregate.model.InvoiceLine;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

import java.util.Objects;

public class InvoiceTotalsByCompany extends AbstractProcessor<String, InvoiceLine> {

    private KeyValueStore<String, Long> totalsStore;
    private ProcessorContext context;

    @Override
    public void process(String key, InvoiceLine invoiceLine) {
        // Add this line's amount to the company's running total. Note that
        // longValue() truncates the amount to whole units; use a decimal
        // type if cents matter.
        String companyId = extractCompanyIdFromKey(key);
        Long totals = totalsStore.get(companyId);
        long amount = invoiceLine.getAmount().longValue();
        totals = (totals == null) ? amount : totals + amount;
        totalsStore.put(companyId, totals);
        System.out.println("updated store " + companyId + ": " + totals);
        this.context.commit();
    }

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        this.context = context;
        // Request a punctuate() call every 10 seconds (stream time).
        this.context.schedule(10000);
        totalsStore = (KeyValueStore<String, Long>) this.context.getStateStore("invoicelines-total-store");
        Objects.requireNonNull(totalsStore, "State store can't be null");
    }

    @Override
    public void punctuate(long streamTime) {
        // Periodically forward the current total for every company to the
        // downstream sink ("invoice-total-by-company").
        try (KeyValueIterator<String, Long> it = totalsStore.all()) {
            while (it.hasNext()) {
                KeyValue<String, Long> kv = it.next();
                this.context.forward(kv.key, kv.value);
            }
        }
    }

    // The key is assumed to be "<companyId>-<rest>"; only the company part
    // is used for aggregation.
    private String extractCompanyIdFromKey(String key) {
        return key.split("-", 2)[0];
    }
}
package com.aggregate.model;

public class InvoiceLine {

    String companyId;
    String id;
    String lineId;
    Double amount;
    String nameId;
    String name;
    String account;
    String accountId;

    public InvoiceLine() {
    }

    public InvoiceLine(String companyId, String id, String lineId, Double amount, String nameId, String name, String account, String accountId) {
        this.companyId = companyId;
        this.id = id;
        this.lineId = lineId;
        this.amount = amount;
        this.nameId = nameId;
        this.name = name;
        this.account = account;
        this.accountId = accountId;
    }

    public String getCompanyId() {
        return companyId;
    }

    public void setCompanyId(String companyId) {
        this.companyId = companyId;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getLineId() {
        return lineId;
    }

    public void setLineId(String lineId) {
        this.lineId = lineId;
    }

    public Double getAmount() {
        return amount;
    }

    public void setAmount(Double amount) {
        this.amount = amount;
    }

    public String getNameId() {
        return nameId;
    }

    public void setNameId(String nameId) {
        this.nameId = nameId;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getAccount() {
        return account;
    }

    public void setAccount(String account) {
        this.account = account;
    }

    public String getAccountId() {
        return accountId;
    }

    public void setAccountId(String accountId) {
        this.accountId = accountId;
    }

    @Override
    public String toString() {
        return "InvoiceLine{" +
                "companyId='" + companyId + '\'' +
                ", id='" + id + '\'' +
                ", lineId='" + lineId + '\'' +
                ", amount=" + amount +
                ", nameId='" + nameId + '\'' +
                ", name='" + name + '\'' +
                ", account='" + account + '\'' +
                ", accountId='" + accountId + '\'' +
                '}';
    }
}
