Microservices in Rust with Kafka

Marco Amann
Aug 4 · 15 min read

Microservices don’t always have to speak HTTP. Another common approach is asynchronous communication via events. A well-known middleware, Apache Kafka, is at home in the Java ecosystem but provides adapters for many platforms. How well can a Rust application be integrated into a mostly foreign environment? In this post we will find out by building another microservice.

Photo by pine watt on Unsplash

This is the second part of a series of articles on how you can create, deploy, monitor and scale a microservice-based application.

As mentioned in the intro, microservices don’t always have to expose a REST API; perhaps they don’t even speak HTTP. What if we chose to couple our services loosely through a messaging architecture, to gain the benefits of that design, such as dynamic assignment of communication partners? Of course that would be possible, but is it a good idea, and is it practical to do so using Rust? In this post we will find out how suitable Rust is for building microservices in a messaging environment dominated by Java. If you are interested in how we can deploy the service to Kubernetes for a more production-like deployment, you can refer to one of the next posts in this series. You can find the sources for this post in our GitHub repo.

The download-service

Let’s focus on how the application carries out the desired actions:

The basic idea of our service (we won’t cover the blue magic dust for now)

This shows that the user will not interact with our service directly. This is an important observation, since it gives us a bit more freedom in choosing the technology: we are not required to support web technologies.

So why should we consider using Rust to implement this application, especially if the rest of the ecosystem is implemented in Java? One reason is the scalability of the download-service: depending on user behavior, it might be desirable to have hundreds of instances running. Given the absence of a real runtime in a Rust program, we can achieve low resource usage and hence scale easily. Further, Rust provides excellent, high-level safety guarantees when working with concurrent and asynchronous code, which will come in handy when the user needs to ‘wait’ for a download to finish. The image crate allows for super fast and flexible image processing. It is an often observed practice to write performance-intensive applications in Rust and connect them to the rest of the (existing) infrastructure.

Connections

No fast replies required: It is acceptable for the user to wait a few seconds for their image to be downloaded
Self-Contained tasks: The individual download tasks are self-contained and do not require external information
Scalability: If the user-base grows, we might hit (bandwidth-)limits of a single host, therefore we want to be able to scale the download service independently of the management service
Elasticity: The service should not only be able to cope with long-term growth, but also short-term fluctuations in workload to scale in a cost-efficient manner
Resilience: If either one of the services is unavailable (e.g. due to maintenance or network segmentation), the other service should not be affected.

To achieve the desired properties mentioned above, I chose to integrate the services loosely with messaging (the reasons for this decision are discussed in the next section).

To be prepared for service-outages, our messaging system has to persist messages until they are successfully processed.
For this use case, I chose Apache Kafka: it is reliable software and, despite being a bit hard to set up properly, I like to work with it.

The basic idea with the message queue in place

The resulting control flow will be as follows:

  • The user submits a new URL that they want to have downloaded to the Java management application
  • After performing basic checks, like authenticating the user and checking the URL against a set of blacklisted domains (e.g. phncdn), the request is transformed into a message that is sent to a Kafka topic ‘urls_new’
  • The download microservice receives those messages, downloads the URL, creates a thumbnail and uploads the files to an S3 bucket
  • When the download service is done, it informs the management application about its completion and the resources are ready to be served to the user.

Using Kafka to integrate our Microservices

Basic Terminology

Scaling

Partitions (P0–3) assigned to consumers (C1–6) of two groups. From https://kafka.apache.org/24/images/consumer-groups.png
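
The assignment shown in the figure can be sketched in a few lines. This is only an illustration under simplifying assumptions: partitions are handed out round-robin within a single group, whereas Kafka's real assignment strategies are more involved. The helper `assign_round_robin` is hypothetical and not part of any Kafka client.

```rust
use std::collections::BTreeMap;

/// Distributes partition ids over the consumers of ONE group, round-robin.
/// Hypothetical helper for illustration; real Kafka assignors are more elaborate.
fn assign_round_robin(partitions: u32, consumers: &[&str]) -> BTreeMap<String, Vec<u32>> {
    // Every consumer gets an entry, even if it ends up without partitions.
    let mut assignment: BTreeMap<String, Vec<u32>> = consumers
        .iter()
        .map(|c| (c.to_string(), Vec::new()))
        .collect();
    if consumers.is_empty() {
        return assignment;
    }
    for p in 0..partitions {
        let owner = consumers[p as usize % consumers.len()];
        assignment
            .get_mut(owner)
            .expect("owner was inserted above")
            .push(p);
    }
    assignment
}
```

With four partitions, a fifth consumer in the same group ends up with an empty list: the partition count caps how many instances of a group can work in parallel.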

Using consumer groups, scaling up our system is easy: we just spin up a new instance and let it subscribe to the Kafka topic. But what about scaling down? Can we simply kill an application while it is running? To answer this, let’s talk about reliability.

Reliability

What delivery semantics do we want to achieve with our messages? We can basically choose from four:

  • Best-effort: The system gives us next to no guarantees if a message will be processed. If the receiver crashes, the message is lost. Think of UDP here.
  • At-Most-Once: Each message will be processed at most once under all circumstances. It is better not to process a message at all than to process it twice.
  • At-Least-Once: Each message will be processed eventually, perhaps even several times.
  • Exactly-Once: This would guarantee that every message is processed once-and-only-once. This is really tricky and we won’t do this.

Since we can tolerate a resource being downloaded multiple times (bandwidth is cheap), we go for At-Least-Once delivery semantics. We need to make use of another Kafka concept to achieve this: consumer offsets. When a consumer has processed a message, it commits the offset (of the message within its partition) to Kafka, and the server stores that offset per consumer group and partition. As long as we do not commit an offset, the offset stored in Kafka does not change, and when our service crashes or is scaled down, another consumer will receive that message again and, hopefully, process and commit it successfully.

So to achieve At-Least-Once semantics, we simply have to commit only after we are finished with processing. (More details later on)
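
To see why committing last gives At-Least-Once behavior, here is a small in-memory simulation. It does not talk to Kafka at all: `Partition` and `consume` are hypothetical stand-ins for one partition and the committed offset of one consumer group.

```rust
/// In-memory stand-in for one partition plus the committed offset of one group.
struct Partition {
    messages: Vec<String>,
    /// Offset of the next message to hand out after a (re)start.
    committed: usize,
}

impl Partition {
    /// Hands out messages starting at the committed offset.
    /// `handle` returns false to simulate a crash before the commit.
    /// Returns every message that was delivered, in order.
    fn consume<F: FnMut(&str) -> bool>(&mut self, mut handle: F) -> Vec<String> {
        let mut delivered = Vec::new();
        while self.committed < self.messages.len() {
            let msg = self.messages[self.committed].clone();
            delivered.push(msg.clone());
            if !handle(&msg) {
                // Crash: nothing is committed, so this message is delivered again.
                break;
            }
            // Commit only after processing has succeeded.
            self.committed += 1;
        }
        delivered
    }
}
```

If the handler fails on the second message, a later run starts again at exactly that message: it is seen twice, but never lost.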

Integration

Tracing

State

Building — Photo by Gerold Hinzen on Unsplash

Building the Microservice in Rust

Rust and Kafka

Since the focus of this post is not how those APIs work, I will spare you the details and we will start with a more conceptual overview.

The core logic of the download-service is as follows:

for msg in receive_messages(in_topic):
    parse_payload(msg)
    img = download_resource(msg)
    tmb = create_thumbnail(img)
    push_to_s3(img, tmb)
    send_ready_notification(out_topic, msg)
    if everything_is_ok:
        in_topic.commit(msg.offset)

Since our service most likely has more bandwidth than the servers hosting the images provide, it makes sense to download multiple images in parallel. We do this by running the above code multiple times in parallel; this is described in the next section.
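
As a rough, dependency-free picture of "the same loop, several times in parallel", the sketch below runs one independent worker per input queue. It uses plain OS threads only to stay self-contained, which is an assumption of this sketch and not how the service itself schedules its work. `handle_message` and `run_workers` are hypothetical names.

```rust
use std::thread;

/// Stand-in for the per-message work (download, thumbnail, upload).
fn handle_message(url: &str) -> usize {
    url.len()
}

/// Runs one independent worker loop per queue and returns how many
/// messages each worker handled, in the order of the input queues.
fn run_workers(queues: Vec<Vec<String>>) -> Vec<usize> {
    let handles: Vec<_> = queues
        .into_iter()
        .map(|queue| {
            // Each worker owns its queue, so no state is shared between loops.
            thread::spawn(move || {
                let mut handled = 0usize;
                for url in &queue {
                    handle_message(url);
                    handled += 1;
                }
                handled
            })
        })
        .collect();
    handles
        .into_iter()
        .map(|h| h.join().expect("worker panicked"))
        .collect()
}
```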

The rdkafka crate has a pretty nice API: receiving messages is implemented as an async stream, allowing us to write the receive loop like the following (pretty much like the pseudo-code):

The receive loop

A quick note on Kafka and Rust here: I chose to build parallel processing by using multiple Kafka consumers, each with its own processing logic from above. It would be nicer to have fewer consumers (or even only one) per downloader service that drive many parallel downloads. However, if you commit an offset in Kafka, that commit is cumulative. This means that if we commit offset 42, Kafka assumes we also acknowledge processing all messages up to 42. This is a problem in our case: if a large download arrives with offset 25 and takes a long time to finish (e.g. because the other service is slow or the resource is large), we cannot commit any messages until it is done downloading, perhaps for hours. This means we would lose hours of progress if the service crashes.
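
The effect of cumulative commits can be made concrete with a tiny helper. This is a sketch under the assumption that a single consumer tracks which offsets have finished processing; `committable` is a hypothetical function, not part of rdkafka.

```rust
use std::collections::BTreeSet;

/// Given the first offset that is not yet committed and the set of offsets
/// whose processing has finished, returns the first offset that is still open.
/// Everything below the returned offset may be committed safely.
fn committable(next_uncommitted: u64, finished: &BTreeSet<u64>) -> u64 {
    let mut next = next_uncommitted;
    // Advance only over a gap-free run of finished offsets.
    while finished.contains(&next) {
        next += 1;
    }
    next
}
```

As long as offset 25 is still downloading, the result stays at 25, no matter how many later messages are done. Only once 25 finishes does the commit position jump forward.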

Quick rdkafka walk-through

The first thing we need is a runtime. You can either create one by hand or use the provided attribute macro:

#[tokio::main]
async fn main() {
    ...
}

The next thing we need is a Consumer. They can be created with or without a custom Context, which allows for hooking up functionality to internal events, like when offsets are committed. At creation time, you need to know the group-id to join and a list of brokers to use as bootstrap-servers:

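use rdkafka::config::ClientConfig;
use rdkafka::consumer::StreamConsumer;

// `create()` is generic, so the consumer type has to be named somewhere.
let consumer: StreamConsumer = ClientConfig::new()
    .set("group.id", group_id)
    .set("bootstrap.servers", brokers)
    // optionally more configuration
    .create()?;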

Now the consumer can be used to subscribe to topics:

consumer.subscribe(&topics)?;

And when started, it yields a message stream:

let mut message_stream = consumer.start();

This asynchronous message stream allows us to process each and every message, e.g. by using a loop:

while let Some(message) = message_stream.next().await {
    match message {
        Err(e) => [...],
        Ok(m) => {
            // process the message
            [...]
            consumer.commit_message(&m, CommitMode::Async);
        }
    };
}

Notice the await here? This will be explained in the next section, so don’t worry.

We explicitly commit the message in this example; if the consumer was created with auto-commit, this is not necessary. Since Kafka has no idea of the message format, we have to handle this ourselves. In this case, I simply used the serde crate to deserialize JSON from a string view of the payload:

let payload = match m.payload_view::<str>() {
    None => "",
    Some(Ok(s)) => s,
    Some(Err(e)) => [...]
};

Note that a message can have no payload at all (the None case), and of course we need to handle the error case, e.g. when the bytes are not valid UTF-8.
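
The same three cases can be reproduced with the standard library alone. The sketch below assumes the payload arrives as optional raw bytes; `payload_as_str` is a hypothetical helper that only mirrors the shape of the match above, not rdkafka's own API.

```rust
use std::str::{self, Utf8Error};

/// Mirrors the three cases: no payload, valid UTF-8, invalid UTF-8.
fn payload_as_str(payload: Option<&[u8]>) -> Result<&str, Utf8Error> {
    match payload {
        // A message without payload is treated as an empty string.
        None => Ok(""),
        // Valid UTF-8 yields a borrowed view; invalid bytes yield an error.
        Some(bytes) => str::from_utf8(bytes),
    }
}
```

Returning a `Result` leaves it to the caller to decide whether a broken payload is skipped, logged, or treated as fatal.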

Async-Await

That way, we can wait for new messages on the kafka-consumers, finished downloads, completed uploads to S3, and even offload image-processing to a thread-pool.

This is the original process function; I only stripped away some type annotations for clarity (it matches the pseudo-code from above quite well):

It downloads a given URL, spawns the image processing on a special thread pool for CPU-intensive work, pushes the results to S3, logs success and returns.
Note the ? at the await statements: they leverage the return types. If one of the calls fails, the surrounding process function returns with a Result::Err. If you are coming from Java, think of a more precise, and therefore more limited, version of a throw. This limitation has both benefits and drawbacks: on the one hand, you know exactly which error type will be returned, and you cannot forget to handle a thrown exception. On the other hand, you need to know the exact types in advance and are limited to them.
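
A synchronous, dependency-free sketch shows how ? behaves with one fixed error type. All names here (`ProcessError`, `download`, `upload`, `process`) are hypothetical stand-ins and not the functions of the actual service.

```rust
use std::fmt;

/// The one error type the caller has to know about.
#[derive(Debug, PartialEq)]
enum ProcessError {
    Download(String),
    Upload(String),
}

impl fmt::Display for ProcessError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            ProcessError::Download(msg) => write!(f, "download failed: {}", msg),
            ProcessError::Upload(msg) => write!(f, "upload failed: {}", msg),
        }
    }
}

impl std::error::Error for ProcessError {}

/// Pretends to download: accepts only URLs starting with "http".
fn download(url: &str) -> Result<Vec<u8>, ProcessError> {
    if url.starts_with("http") {
        Ok(url.as_bytes().to_vec())
    } else {
        Err(ProcessError::Download(format!("unsupported URL: {}", url)))
    }
}

/// Pretends to upload: returns the number of bytes stored.
fn upload(data: &[u8]) -> Result<usize, ProcessError> {
    if data.is_empty() {
        Err(ProcessError::Upload("nothing to upload".to_string()))
    } else {
        Ok(data.len())
    }
}

/// Each `?` returns early with the error of the failing step.
fn process(url: &str) -> Result<usize, ProcessError> {
    let data = download(url)?;
    let stored = upload(&data)?;
    Ok(stored)
}
```

The signature of `process` tells the caller exactly which failures to expect, which is the precision (and the limitation) described above.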

Configuration

This outputs the following help-text (if called with --help):

downloader 0.1.0

USAGE:
    downloader [OPTIONS]

FLAGS:
    -h, --help       Prints help information
    -V, --version    Prints version information

OPTIONS:
    -r, --s3-region-name <s3-region-name>    [env: S3_REGION_NAME=] [default: eu-east2]

Using the environment-based configuration, we allow for configuring the service from within a docker-compose file or k8s ConfigMaps without the need to map the configuration to a file.
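
The help text suggests a lookup order of command-line value, then environment variable, then default. A dependency-free sketch of that order follows. It assumes this precedence and uses a hypothetical `resolve` helper; the service itself presumably leaves this to its argument-parsing crate.

```rust
use std::env;

/// Resolves one setting: command-line value first, then the environment
/// variable `env_name`, then the built-in default.
fn resolve(cli_value: Option<&str>, env_name: &str, default: &str) -> String {
    cli_value
        .map(str::to_owned)
        .or_else(|| env::var(env_name).ok())
        .unwrap_or_else(|| default.to_owned())
}
```

Calling `resolve(None, "S3_REGION_NAME", "eu-east2")` would then pick up a value set in a docker-compose file or a ConfigMap, and fall back to the default otherwise.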

Using S3 with Rust

There is rust-s3, a relatively small crate and the one I eventually chose. But more on that later.
Further, there is Rusoto, describing itself as ‘AWS SDK for Rust‘. While the code base is massive and has 139 contributors at the time of writing, I was unable to use it. There were several version conflicts of TLS libraries with other crates. There is no guide that describes how to use the more than 700 structs in the S3 crate alone, although the docs are really detailed. For S3, there is no example that would allow stealing code, and the error messages returned by S3 are more than cryptic.
I really hope they publish an overview of their crate: making such a powerful tool accessible to the masses could really increase the adoption of Rust in this space.

The rust-s3 crate is much simpler and somewhat broken in places, but it does its job and has nice examples. Unfortunately, the crate does not support async uploads, so its put operations have to be executed from within the blocking thread pool of tokio.
Moreover, the error handling is incomplete, so I would advise against using the crate in a production environment just yet.

The following shows a shortcoming of Rust in combination with not-yet-done APIs: The strict and sometimes verbose type system leads to beauties like this:

The Ok case of the result of the put_object function is mapped to the empty type () since it is broken at the moment. The Err case is mapped to another error type that can be communicated to the outside world (remember, we can’t implement foreign traits on foreign structs). So the str in the error and the hardcoded one are converted into String with .into(), which is a bit clumsy. Now the real problems start: the return type of the enclosing method is Result<(), Box<dyn Error + Send + Sync>> (a result of the empty type or a box of something implementing the Error, Send and Sync traits), since the creation of the bucket can already fail. But we create a concrete type here: Result<(), Box<NetworkError>>. This means we need another .into() to tell the compiler that we acknowledge that it may drop the detailed type information. Whilst this code could be moved to a From implementation, this only scatters the problems around in our source files.
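
The chain of conversions described here can be reproduced with the standard library alone. The sketch below is not the original code: `put_object` is a hypothetical stand-in with a made-up signature, and `NetworkError` is a minimal local error type assumed for illustration.

```rust
use std::error::Error;
use std::fmt;

/// Error type that can be shown to the outside world (hypothetical stand-in).
#[derive(Debug)]
struct NetworkError(String);

impl fmt::Display for NetworkError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "network error: {}", self.0)
    }
}

impl Error for NetworkError {}

/// Stand-in for a library call with an unhelpful Ok value and a `&str` error.
fn put_object(path: &str) -> Result<u16, &'static str> {
    if path.is_empty() {
        Err("empty path")
    } else {
        Ok(200)
    }
}

fn upload(path: &str) -> Result<(), Box<dyn Error + Send + Sync>> {
    put_object(path)
        // Discard the Ok value, we only care about success.
        .map(|_| ())
        // First .into(): &str becomes the String inside our own error type.
        .map_err(|e| NetworkError(e.into()))
        // Second .into(): the concrete error becomes a boxed trait object.
        .map_err(|e| e.into())
}
```

The second conversion is where the detailed type is given up: callers of `upload` only learn that something implementing `Error` went wrong.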

The code

Wires, having nothing to do with the topic — Photo by Victor Aznabaev on Unsplash

Wiring it all up

# ZooKeeper
➜ ./bin/zookeeper-server-start.sh config/zookeeper.properties
# Kafka
➜ ./bin/kafka-server-start.sh config/server.properties

We can directly create our topic for the tasks with a given partition count. Refer to this blog post if you would like to read more about that topic.

➜ ./bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --create --topic converter_tasks --partitions 4 --replication-factor 1

To store the downloaded resources, the microservice needs a storage backend compatible with the S3-API. You can (amongst others) use Amazon S3, ceph, or if you just want to run everything locally, minio:

➜ ./minio server ./data --address 127.0.0.1:9099

If everything is running, you can already start the microservices:

➜ cargo run --release  # for the downloader service
➜ ./gradlew bootRun # for the management service

Notice that it is irrelevant which service is available first, since they can operate independently and the loose coupling allows them to come online as they please.

Scaling up

If you scale your service for testing on a single host, you might want to tune OS parameters; otherwise you might run into problems caused by too many parallel downloads. Have a look at the parameters listed under sysctl net.

Scaling down

Summary

Now back to the question from the intro:

How hard is it to connect a Rust microservice to Kafka?

From the experience gained writing the application for this blog, I tend to say that it is straightforward to connect a Rust application to Kafka if you mostly use basic features. More exotic functionality in the crates might have harder-to-find documentation and less refined APIs. If you require your service to be maintainable and safe, you should consider Rust. However, you need to have enough people who know Rust or are willing to learn it in order to maintain your software. In any case, it makes sense to write a small prototype to verify that all required functionality is sufficiently supported in the crates you use.

Due to the isolated nature of a microservice, I would argue that the risk of implementing a tiny service in a new language is not that huge. This is especially true if your public interfaces are well-defined and you use off-the-shelf technologies to connect them.

In the end, the more people use Rust and contribute to its ecosystem, the better it gets, and the faster the smaller problems that slow you down are eliminated.

Thanks for reading! If you have any questions, suggestions or critique regarding the topic, feel free to respond or contact me. You might be interested in the other posts published in the Digital Frontiers blog, announced on our Twitter account.

Digital Frontiers — Das Blog

This is the blog of Digital Frontiers GmbH & Co. KG (http://www.digitalfrontiers.de). Here we publish on topics that interest and move us.

Thanks to François Fernandès, Benedikt Jerat, and Nikolai Neugebauer
