Creating Microservices with .NET Core and Kafka: A Step-by-Step Approach

Manan Patadiya
Simform Engineering
7 min read · Oct 23, 2023


Learn to build, integrate, and communicate microservices with Kafka in .NET Core.

For the past few years, most tech teams have preferred microservice architecture in product development because of its benefits like scalability, reliability, and high performance. .NET Core, meanwhile, has gained significant popularity because it carries forward the strengths of the original .NET Framework while being able to run on any platform.

To let the microservices in our .NET application talk to each other, we need a messaging system to facilitate communication between them. There are quite a few messaging systems out there, and Apache Kafka is one of the most widely used: an open-source, versatile stream-processing platform that provides high-throughput, low-latency messaging for distributed applications.

Before diving into the topic at hand, I recommend that you first explore a comprehensive blog that covers the basics of Kafka, its advantages and disadvantages, the scenarios in which it is best employed, and the steps for installing it on your system. Here’s the blog link:

https://medium.com/simform-engineering/kafka-unleashed-a-comprehensive-guide-to-setting-up-kafka-fb607d5d37b4

In this blog, we will learn about how to establish communication between microservices using Kafka in a .NET Core application.

To carry out the practical implementation, you must ensure that the following items are installed on your system:

  • Visual Studio 2022
  • .NET 7.0
  • ASP.NET 7.0 Runtime
  • Apache Kafka
  • Java Runtime Environment (JRE)

You can download these tools from the following sources:

Visual Studio 2022: Download Visual Studio Tools — Install Free for Windows, Mac, Linux (microsoft.com)

Apache Kafka: Download Apache Kafka

JRE: Download Java for Windows

Note: Kafka itself runs on the JVM, so before using it for microservice communication, install Java and configure system variables like JAVA_HOME and PATH. The Kafka startup scripts rely on these to locate your Java installation.
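For example, on Windows you could set JAVA_HOME from a command prompt as shown below, and then add %JAVA_HOME%\bin to PATH via System Properties → Environment Variables. The install path here is only an illustration; point it at wherever Java actually landed on your machine:

setx JAVA_HOME "C:\Program Files\Java\jre1.8.0_381"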

Now, let’s create a demo application to use Apache Kafka in ASP.NET Core.

In this segment, we will implement simple Inventory Processing APIs.

So, let’s open Visual Studio 2022. Click on the “Create a new project” option, select “Blank Solution,” and click the “Next” button.

In the “Configure your new project” window, specify the solution name as “InventoryManagement” and the path where it should be created.

After creating the blank solution, right-click on the solution file within the “Solution Explorer.” Next, select “Add,” followed by “New Project.” Choose “ASP.NET Core Web API” as the project type and name it “InventoryProducer.”

Next, we’ll create an “InventoryConsumer” project, configuring it similarly to the InventoryProducer.

Note that you can also choose any meaningful name for both projects.

Now, add new folders named ‘Models’ and ‘Services’ in both projects.

After completing these steps, your solution should look like the image below:

Install necessary NuGet Packages:

Next, install the required NuGet packages. To produce and consume messages, you need a Kafka client. We will use one of the most popular ones: Confluent’s Kafka .NET client, “Confluent.Kafka.”

To install the necessary packages, right-click the solution and choose “Manage NuGet Packages for Solution…” Then, in the search field, enter Confluent.Kafka, select the Confluent.Kafka package, and install it into both projects. You can refer to the image below:

Alternatively, you can execute the following command in the Package Manager Console:

PM> Install-Package Confluent.Kafka
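Or, if you prefer the .NET CLI, run the following from each project’s directory:

dotnet add package Confluent.Kafka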

Next, in the “Models” folder of the “InventoryProducer” project, add a new class named “InventoryUpdateRequest.cs” with the following code:

namespace InventoryProducer.Models
{
    public class InventoryUpdateRequest
    {
        public int Id { get; set; }

        public string ProductId { get; set; }

        public int Quantity { get; set; }
    }
}

Also, create a new class in the same directory named “InventoryDbContext” with the code below. Note that DbContext comes from Entity Framework Core, so you will also need the Microsoft.EntityFrameworkCore NuGet package:

using Microsoft.EntityFrameworkCore;

namespace InventoryProducer.Models
{
    public class InventoryDbContext : DbContext
    {
        public InventoryDbContext(DbContextOptions<InventoryDbContext> options) : base(options) { }

        public DbSet<InventoryUpdateRequest> InventoryUpdates { get; set; }
    }
}
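The demo itself never queries this context (the producer only publishes to Kafka), but if you want the context resolvable through dependency injection, a minimal registration sketch for Program.cs could look like this. The in-memory provider is my assumption for illustration and additionally requires the Microsoft.EntityFrameworkCore.InMemory package:

using InventoryProducer.Models;
using Microsoft.EntityFrameworkCore;

// Hypothetical registration; swap UseInMemoryDatabase for your real database provider.
builder.Services.AddDbContext<InventoryDbContext>(options =>
    options.UseInMemoryDatabase("InventoryDb"));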

Now, in the appsettings.json file, we will add a Kafka section containing the address and port of the Kafka broker. This allows us to configure and connect to Kafka without hardcoding these details in our code.

"Kafka": { 
"BootstrapServers": "localhost:9092"
}

Next, we will write our producer’s business logic. Create a new file named “ProducerService” under the “Services” folder. It will be responsible for sending data to Apache Kafka. The ProduceAsync method sends a message to a specified Kafka topic: it wraps the message in a Kafka-specific format and uses the configured producer to send it.

using Confluent.Kafka;

namespace InventoryProducer.Services
{
    public class ProducerService
    {
        private readonly IConfiguration _configuration;
        private readonly IProducer<Null, string> _producer;

        public ProducerService(IConfiguration configuration)
        {
            _configuration = configuration;

            // Read the broker address from appsettings.json.
            var producerConfig = new ProducerConfig
            {
                BootstrapServers = _configuration["Kafka:BootstrapServers"]
            };

            _producer = new ProducerBuilder<Null, string>(producerConfig).Build();
        }

        public async Task ProduceAsync(string topic, string message)
        {
            // Wrap the payload in a Kafka message; we don't use a key, hence Null.
            var kafkaMessage = new Message<Null, string> { Value = message };

            await _producer.ProduceAsync(topic, kafkaMessage);
        }
    }
}
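One optional refinement not covered in the article: Confluent’s producer buffers messages in the background, so it is good practice to flush them on shutdown. A minimal sketch, assuming you change the class declaration to public class ProducerService : IDisposable and add this method:

public void Dispose()
{
    // Wait up to 10 seconds for any buffered messages to reach the broker,
    // then release the underlying client resources.
    _producer.Flush(TimeSpan.FromSeconds(10));
    _producer.Dispose();
}

Because ProducerService is registered as a singleton (see below), the DI container will call Dispose automatically when the application stops.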

Let’s create a new controller named “InventoryController” in the “Controllers” folder of the “InventoryProducer” project. The controller will be responsible for handling HTTP requests: it receives inventory update requests, converts them into JSON messages, and sends them to a Kafka topic for further processing.

Controller code:

using System.Text.Json;
using InventoryProducer.Models;
using InventoryProducer.Services;
using Microsoft.AspNetCore.Mvc;

namespace InventoryProducer.Controllers
{
    [Route("api/[controller]")]
    [ApiController]
    public class InventoryController : ControllerBase
    {
        private readonly ProducerService _producerService;

        public InventoryController(ProducerService producerService)
        {
            _producerService = producerService;
        }

        [HttpPost]
        public async Task<IActionResult> UpdateInventory([FromBody] InventoryUpdateRequest request)
        {
            // Serialize the request and publish it to the "InventoryUpdates" topic.
            var message = JsonSerializer.Serialize(request);

            await _producerService.ProduceAsync("InventoryUpdates", message);

            return Ok("Inventory Updated Successfully...");
        }
    }
}

Lastly, we have to register ProducerService for dependency injection in Program.cs, as shown below:

builder.Services.AddSingleton<ProducerService>();
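For reference, a minimal Program.cs for the producer project might look like the sketch below. Everything apart from the AddSingleton registration is the standard ASP.NET Core Web API template code, so your generated file may differ slightly:

using InventoryProducer.Services;

var builder = WebApplication.CreateBuilder(args);

builder.Services.AddControllers();
builder.Services.AddEndpointsApiExplorer();
builder.Services.AddSwaggerGen();

// Register our Kafka producer wrapper so controllers can receive it.
builder.Services.AddSingleton<ProducerService>();

var app = builder.Build();

if (app.Environment.IsDevelopment())
{
    app.UseSwagger();
    app.UseSwaggerUI();
}

app.UseHttpsRedirection();
app.UseAuthorization();
app.MapControllers();

app.Run();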

We have pretty much completed the InventoryProducer part. Now let’s continue with “InventoryConsumer.”

Add a ‘Models’ folder; under it, add the ‘InventoryDbContext’ and ‘InventoryUpdateRequest’ classes just as we did in the ‘InventoryProducer’ project (remember to change their namespaces to InventoryConsumer.Models).

Similarly, configure a Kafka server in the appsettings.json file as we did for ‘InventoryProducer’.

Now, let’s create a ConsumerService class in the Services folder.

It will receive incoming messages from a Kafka topic and process them. It connects to Kafka using the provided configuration settings, subscribes to the topic, and continuously checks for new messages. When a message arrives, it logs the content as an inventory update. If any errors occur during the process, it logs those errors as well.

In short, it is responsible for consuming messages from a Kafka topic in the background.

ConsumerService.cs:

using Confluent.Kafka;

namespace InventoryConsumer.Services
{
    public class ConsumerService : BackgroundService
    {
        private readonly IConsumer<Ignore, string> _consumer;
        private readonly ILogger<ConsumerService> _logger;

        public ConsumerService(IConfiguration configuration, ILogger<ConsumerService> logger)
        {
            _logger = logger;

            var consumerConfig = new ConsumerConfig
            {
                BootstrapServers = configuration["Kafka:BootstrapServers"],
                GroupId = "InventoryConsumerGroup",
                // Start from the earliest offset when no committed offset exists.
                AutoOffsetReset = AutoOffsetReset.Earliest
            };

            _consumer = new ConsumerBuilder<Ignore, string>(consumerConfig).Build();
        }

        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            _consumer.Subscribe("InventoryUpdates");

            while (!stoppingToken.IsCancellationRequested)
            {
                ProcessKafkaMessage(stoppingToken);

                // Consume (below) already blocks until a message arrives, so this
                // delay only throttles the loop; shorten or remove it if you want
                // messages handled back to back.
                await Task.Delay(TimeSpan.FromMinutes(1), stoppingToken);
            }

            _consumer.Close();
        }

        public void ProcessKafkaMessage(CancellationToken stoppingToken)
        {
            try
            {
                // Blocks until a message is available or the token is cancelled.
                var consumeResult = _consumer.Consume(stoppingToken);

                var message = consumeResult.Message.Value;

                _logger.LogInformation($"Received inventory update: {message}");
            }
            catch (OperationCanceledException)
            {
                // Expected during shutdown; no need to log an error.
            }
            catch (Exception ex)
            {
                _logger.LogError($"Error processing Kafka message: {ex.Message}");
            }
        }
    }
}
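A small operational note: by default, Kafka brokers auto-create topics on first use (auto.create.topics.enable=true), so the “InventoryUpdates” topic used above will appear automatically. If your broker has auto-creation disabled, create the topic manually with the CLI bundled with Kafka, for example on Windows:

.\bin\windows\kafka-topics.bat --create --topic InventoryUpdates --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1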

Create an “InventoryController” in the consumer project with the code below. It will receive inventory update requests and respond with a success message.

InventoryController.cs:

using InventoryConsumer.Models;
using Microsoft.AspNetCore.Mvc;

namespace InventoryConsumer.Controllers
{
    [Route("api/[controller]")]
    [ApiController]
    public class InventoryController : ControllerBase
    {
        private readonly ILogger<InventoryController> _logger;

        // Note: the generic ILogger<InventoryController> is required here; the
        // non-generic ILogger is not registered in the DI container by default.
        public InventoryController(ILogger<InventoryController> logger)
        {
            _logger = logger;
        }

        [HttpPost]
        public IActionResult ProcessInventoryUpdate([FromBody] InventoryUpdateRequest request)
        {
            // For the demo, simply acknowledge the update.
            return Ok("Inventory update processed successfully.");
        }
    }
}

Next, we have to register ConsumerService for dependency injection, as we did in the ‘InventoryProducer’ project. Because ConsumerService derives from BackgroundService, it is registered as a hosted service rather than a singleton. Add the following line to the Program.cs file:

builder.Services.AddHostedService<ConsumerService>(); 

Note: Set both InventoryProducer and InventoryConsumer as startup projects. To do this, right-click on the ‘InventoryManagement’ solution and go to Properties.

Next, under Common Properties, go to ‘Startup Project’, select ‘Multiple startup projects’, and set the Action to ‘Start’ for both projects.

Now, let’s run our application and test it with Swagger.
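For example, you can post a body like the following to the producer’s POST /api/Inventory endpoint. The field names come from InventoryUpdateRequest; the values here are arbitrary:

{
  "id": 1,
  "productId": "P-100",
  "quantity": 25
}

The consumer’s console should then log a line along the lines of: Received inventory update: {"Id":1,"ProductId":"P-100","Quantity":25}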

We will get the output in the console as shown below:

Conclusion:

In this article, we have shared practical demo code to understand Apache Kafka and how it’s used in communication between microservices. Using Confluent’s .NET Producer and Consumer APIs is simple and straightforward, which makes it easy to adopt them for real microservices/streaming apps. Note that you can also use Docker to configure Apache Kafka. You can learn more about Apache Kafka from the Apache Kafka documentation.

For the detailed code, please refer to this GitHub repository:

manan-p-simformsolutions/InventoryManagement_Blog (github.com)

To stay up-to-date with the latest trends in the development ecosystem, follow Simform Engineering.
