Building Real-time Streaming Applications Using .NET Core and Kafka


By: Srinivasa Gummadidala

In the world of microservices, .NET Core has gained popularity because it carries the power of the original .NET Framework and can run on any platform.
The purpose of this post is to illustrate how to use Confluent’s .NET Core libraries to build streaming applications with Kafka.

As you might have already heard, Kafka is currently the most popular platform for distributed messaging or streaming data.

The key capabilities of Kafka are:
— Publish and subscribe to streams of records
— Store streams of records in a fault-tolerant way
— Process streams of records in real time
— Replay streams of records, whether historical or arriving in real time

In the Kafka world, a producer sends messages (records) to a Kafka node (broker), these messages get stored in topics, and consumers subscribe to those topics to receive new messages.

Kafka is popular among developers because it is easy to pick up and provides a powerful event streaming platform complete with just 4 APIs:
— Producer
— Consumer
— Streams
— Connect

Let’s take a simple use case of an e-commerce company. Assume we are building a simple “Order Management” API to sell products like “Unicorn Whistles.”

Our objective here is to build fast, scalable backend APIs that can take in more order requests and quickly process them or trigger other workflows to speed up the delivery process.

To address scaling the individual apps and other key performance metrics, let’s assume we have decided to build the two critical components below:

— An Order API (RESTful) that takes users’ orders and responds immediately with some acknowledgment info.
— A background service that actually processes these order requests.

Prerequisites:

  • VS Code or another .NET code editor
  • .NET Core 2.1
  • Docker
  • Kafka installation and topic setup (see the commands after this list)
  • Kafkacat command-line tool
  • To install Kafka locally using Docker, please follow the instructions here.
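As a rough guide, the two topics used in this post can be created with Kafka’s stock CLI. This assumes Kafka 2.2+, where kafka-topics accepts --bootstrap-server (older versions take --zookeeper localhost:2181 instead, and depending on your installation the tool may be named kafka-topics.sh):

kafka-topics --bootstrap-server localhost:9092 --create --topic orderrequests --partitions 1 --replication-factor 1
kafka-topics --bootstrap-server localhost:9092 --create --topic readytoship --partitions 1 --replication-factor 1

Topic names are case-sensitive, so make sure they match the names your code uses.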

In our scenario, the Order API takes users’ order requests and stores them in a topic called “OrderRequests.”
So this Order API is going to be one of the producers for the topic “OrderRequests.”

Let’s build the Order API as a .NET Core Web API and add some code to produce messages to the Kafka topic.

Step 1: Create a base template for web API
— Create a directory with the name “OrderHandler” and run the commands below.

dotnet new webapi -n Api
dotnet new xunit -n Test
dotnet add Test/ reference Api/

Project Structure
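After those commands, the OrderHandler directory should look roughly like this:

OrderHandler/
├── Api/     (the Web API project)
└── Test/    (the xunit test project, referencing Api)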

Step 2: Create OrderController and implement a POST handler to receive users’ order requests.

Note: Here, I have created a new POCO class called OrderRequest to represent the user’s order request. Refer to my GitHub repository for the complete code.
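As a rough sketch, such a POCO might look like the following; the field names here are my own illustration, not necessarily the ones in the repo:

using System;

public class OrderRequest
{
    // Illustrative fields -- see the GitHub repo for the actual model.
    public Guid OrderId { get; set; }
    public string ProductName { get; set; }
    public int Quantity { get; set; }
    public string UserId { get; set; }
}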

Step 3: Install the “Confluent.Kafka” NuGet package

dotnet add Api package Confluent.Kafka

With this, you get access to the producer and consumer APIs and their supporting classes.

Step 4: Use Confluent’s “Producer” class to connect and produce messages to Kafka.

  • How do we open the connection to Kafka brokers? Creating a producer instance with the recommended settings will maintain connections with all the brokers in the cluster. You should generally avoid creating multiple Consumer or Producer instances in your application; create one and reuse it.

new Producer<string, string>(this._config)

  • How do we produce messages to topics? Use the async ProduceAsync(…) method to write messages to one or more topics and await the result.

Note: Make sure that you import the Confluent.Kafka and Confluent.Kafka.Serialization namespaces, since you need them to access the Kafka APIs.

To do things in an OOP way and keep the code clean, wrap the above code inside a new wrapper class (here I named it “ProduceWrapper”). Refer to my GitHub repo.
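A minimal sketch of such a wrapper, written against the released Confluent.Kafka 1.0+ API (where the Producer constructor shown above was replaced by a ProducerBuilder); the class and method names are mine and may differ from the repo:

using System.Threading.Tasks;
using Confluent.Kafka;

public class ProduceWrapper
{
    private readonly IProducer<string, string> _producer;

    public ProduceWrapper(string bootstrapServers)
    {
        var config = new ProducerConfig { BootstrapServers = bootstrapServers };
        // One long-lived producer per application, per the guidance above.
        _producer = new ProducerBuilder<string, string>(config).Build();
    }

    public async Task WriteMessageAsync(string topic, string key, string value)
    {
        // Awaiting the delivery report surfaces broker errors as exceptions.
        await _producer.ProduceAsync(topic,
            new Message<string, string> { Key = key, Value = value });
    }
}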

Make sure that you invoke the producer wrapper code from your controller code, as below.
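With the hypothetical ProduceWrapper above registered as a singleton, the POST handler could look roughly like this (the JSON serialization via Newtonsoft.Json is my simplification):

using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Newtonsoft.Json;

[Route("api/[controller]")]
[ApiController]
public class OrderController : ControllerBase
{
    private readonly ProduceWrapper _producer;

    // Injected by DI; register ProduceWrapper as a singleton in Startup.
    public OrderController(ProduceWrapper producer) => _producer = producer;

    [HttpPost]
    public async Task<IActionResult> Post([FromBody] OrderRequest orderRequest)
    {
        await _producer.WriteMessageAsync(
            "orderrequests",
            orderRequest.UserId,
            JsonConvert.SerializeObject(orderRequest));
        return Ok(new { Status = "Order received" });
    }
}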

Issue the dotnet run command to start the web server.

Trigger user order requests from Postman or any REST client:
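For example, with the illustrative model and controller route sketched above (adjust the port to whatever dotnet run reports):

curl -X POST http://localhost:5000/api/order \
  -H "Content-Type: application/json" \
  -d '{ "productName": "Unicorn Whistle", "quantity": 2, "userId": "user-42" }'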

Verify that our requests were produced to Kafka’s “OrderRequests” topic:

  • Use the “kafkacat” command-line utility to verify, as shown below.
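For example, assuming a broker on localhost:9092:

kafkacat -b localhost:9092 -t orderrequests -C -o beginning

Here -b sets the broker, -t the topic, -C puts kafkacat in consumer mode, and -o beginning replays the topic from the start.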

Use Confluent’s “Consumer” class to connect and consume messages from a Kafka topic.

How do we open the Kafka connection and consume messages?

  • Create a consumer class instance with the recommended settings and subscribe to the topic.

var consumer = new Consumer<string,string>(this._consumerConfig);

consumer.Subscribe(topicName);

  • Use “Consume()” method to start reading messages from the topic.
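Put together, a minimal read loop against the released Confluent.Kafka 1.0+ API (where the preview Consumer constructor above became a ConsumerBuilder) might look like this; the class is my own sketch:

using System;
using System.Threading;
using Confluent.Kafka;

public class OrderReader
{
    private readonly ConsumerConfig _consumerConfig; // built as under “ConsumerConfig Settings” below

    public OrderReader(ConsumerConfig config) => _consumerConfig = config;

    public void ReadOrders(CancellationToken token)
    {
        using (var consumer = new ConsumerBuilder<string, string>(_consumerConfig).Build())
        {
            consumer.Subscribe("orderrequests");
            while (!token.IsCancellationRequested)
            {
                // Blocks until a record arrives or the token is cancelled.
                var result = consumer.Consume(token);
                Console.WriteLine($"Received: {result.Message.Value}");
            }
        }
    }
}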

ConsumerConfig Settings:

  • GroupId: Records will be load-balanced between consumer instances that share the same group id.
  • AutoOffsetReset: Determines where the consumer starts when it has no committed offset; you can consume from the beginning, from the latest record, and so on. Kafka also lets you reset to a particular position later.
  • EnableAutoCommit: Kafka uses a concept called OFFSET to track each consumer’s position in the topic. When this setting is on, your consumer instance automatically commits its offsets back to Kafka as it consumes records.
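In the released 1.0+ API, those settings map onto ConsumerConfig properties; for example (the group id here is illustrative):

var consumerConfig = new ConsumerConfig
{
    BootstrapServers = "localhost:9092",
    GroupId = "order-processor",                // consumers sharing this id split the partitions
    AutoOffsetReset = AutoOffsetReset.Earliest, // start from the beginning when no offset is committed
    EnableAutoCommit = true                     // commit consumed offsets back to Kafka automatically
};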

As we have already implemented a POST handler to capture user order requests into the “orderrequests” Kafka topic, we now need to build something to process these records and write them to a “readytoship” Kafka topic. And this should happen in real time, meaning as soon as records arrive in the “orderrequests” topic.

We can create a background service that streams records from the “orderrequests” topic.

Step 1: Create a new .NET Hosted Service called “ProcessOrdersService”

Step 2: Use the Confluent library’s Consumer class to read messages from the topic.
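A minimal sketch of the service, assuming the BackgroundService base class from Microsoft.Extensions.Hosting plus the hypothetical ProduceWrapper and ConsumerConfig shown earlier:

using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
using Microsoft.Extensions.Hosting;

public class ProcessOrdersService : BackgroundService
{
    private readonly ConsumerConfig _consumerConfig;
    private readonly ProduceWrapper _producer;

    public ProcessOrdersService(ConsumerConfig consumerConfig, ProduceWrapper producer)
    {
        _consumerConfig = consumerConfig;
        _producer = producer;
    }

    protected override Task ExecuteAsync(CancellationToken stoppingToken)
    {
        // Consume() blocks, so run the loop on its own thread to avoid
        // stalling web host startup.
        return Task.Run(() =>
        {
            using (var consumer = new ConsumerBuilder<string, string>(_consumerConfig).Build())
            {
                consumer.Subscribe("orderrequests");
                while (!stoppingToken.IsCancellationRequested)
                {
                    var result = consumer.Consume(stoppingToken);
                    // “Process” the order, then hand it to the next stage.
                    _producer.WriteMessageAsync("readytoship",
                        result.Message.Key, result.Message.Value)
                        .GetAwaiter().GetResult();
                }
            }
        }, stoppingToken);
    }
}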

.NET Core lets you host background services inside a web host or a generic host; please refer to this article.

Since we already have a web host (created for the Web API), keeping our HostedService inside the Web API project is sufficient to run this service.
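Wiring it up is then one line in Startup.ConfigureServices, plus singletons for the hypothetical helper types used above:

services.AddSingleton(new ConsumerConfig { /* settings as above */ });
services.AddSingleton(new ProduceWrapper("localhost:9092"));
services.AddHostedService<ProcessOrdersService>();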

The “dotnet run” command will start both the web server and this background service.

Project structure with the .NET Core HostedService

Verify that messages are produced to the “readytoship” topic using the Kafkacat utility.

Using Confluent’s .NET Producer and Consumer APIs is simple and straightforward, which makes them easy to adopt for real microservices and streaming apps.

For the detailed code, please refer to my GitHub repository.

Srini is an Agile Transformation Engineer at TribalScale based out of the Boston office, and a .NET web developer focused on microservice-first architecture design.

TribalScale is a global innovation firm that helps enterprises adapt and thrive in the digital era. We transform teams and processes, build best-in-class digital products, and create disruptive startups. Learn more about us on our website. Connect with us on Twitter, LinkedIn & Facebook!

Originally published at medium.com on March 18, 2019.
