Microservices in Rust with Kafka

Marco Amann
Digital Frontiers — Das Blog
15 min read · Aug 4, 2020

Microservices don’t always have to speak HTTP. Another common approach is asynchronous communication via events. Apache Kafka, a well-known middleware, is at home in the Java ecosystem but provides adapters for many platforms. How well can a Rust application be integrated into a mostly foreign environment? In this post we will find out by building another microservice.

Photo by pine watt on Unsplash

This is the second part of a series of articles on how you can create, deploy, monitor and scale a microservice-based application. The other parts in this series are (so far):

As mentioned in the intro, microservices don’t always have to expose a REST API; perhaps they don’t even speak HTTP. What if we chose to couple our services loosely through a messaging architecture, to benefit from properties of that design such as the dynamic assignment of communication partners? Of course, that would be possible, but is it a good idea, and is it practical to do so using Rust? In this post we will find out how suitable Rust is for building microservices in a messaging environment dominated by Java. If you are interested in how we can deploy the service to Kubernetes for a more production-like deployment, you can refer to one of the next posts in this series. You can find the sources for this post in our GitHub repo.

The download-service

We will re-use the long-running example from the last post: our image-printing company. Our task in this post is to create the download service: the user supplies a URL, and we download the resource for them and create thumbnails. If you want a bit of context on how this service fits into the system, refer to the previous post.

Let’s focus on how the application carries out the desired actions:

The basic idea of our service (we won’t cover the blue magic dust for now)

This shows that the user will not interact with our service directly. This is an important observation, since it gives us a bit more freedom in choosing the technology: we are not required to support web technologies.

So why should we consider using Rust to implement this application, especially if the rest of the ecosystem is implemented in Java? One reason is the scalability of the download service: depending on user behavior, it might be desirable to have hundreds of instances running. Since a Rust program has no real runtime, we can achieve low resource usage and hence scale easily. Further, Rust provides excellent, high-level safety guarantees when working with concurrent and asynchronous code, which will come in handy when the user needs to ‘wait’ for a download to finish. The image crate allows for fast and flexible image processing. It is a common practice to write performance-intensive components in Rust and connect them to the rest of the (existing) infrastructure.

Connections

Our services are required to communicate with one another. There are many options to choose from, so let’s start by investigating our language agnostic requirements:

No fast replies required: It is acceptable for the user to wait a few seconds for their image to be downloaded
Self-Contained tasks: The individual download tasks are self-contained and do not require external information
Scalability: If the user-base grows, we might hit (bandwidth-)limits of a single host, therefore we want to be able to scale the download service independently of the management service
Elasticity: The service should not only be able to cope with long-term growth, but also short-term fluctuations in workload to scale in a cost-efficient manner
Resilience: If either one of the services is unavailable (e.g. due to maintenance or network segmentation), the other service should not be affected.

To achieve the desired properties mentioned above, I chose to integrate the services loosely with messaging (the reasons for this decision are discussed in the next section).

To be prepared for service-outages, our messaging system has to persist messages until they are successfully processed.
For this use case, I chose Apache Kafka, since it is reliable software and despite being a bit hard to set up properly, I like to work with it.

The basic idea with the message queue in place

The resulting control flow will be as follows:

  • The user submits a new URL that they want to have downloaded to the Java management application
  • After performing basic checks, such as authenticating the user and matching the URL against a set of blacklisted domains (e.g. phncdn), the request is transformed into a message that is sent to a Kafka topic ‘urls_new’
  • The download microservice receives those messages, downloads the URL, creates a thumbnail and uploads the files to an S3 bucket
  • When the download service is done, it informs the management application about its completion and the resources are ready to be served to the user.

Using Kafka to integrate our Microservices

Kafka has a lot of concepts that need to be understood for it to be used efficiently. Let’s look into the most important concepts of Kafka, that we will need:

Basic Terminology

Kafka is based on the concept of messages being written to topics (they can have names like “things_to_download”). The part that writes messages is called a producer; the part that reads them is a consumer. One topic can have zero or more consumers. One topic consists of one or more partitions, each of which stores messages along an increasing sequence number called the offset.
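To make the terminology concrete, here is a minimal in-memory sketch of a topic with partitions and offsets. It is only a mental model and not how Kafka is implemented; the names `Topic`, `Partition`, `append` and `read_from` are made up for this illustration.

```rust
/// A partition is an append-only log; a message's offset is its position in that log.
#[derive(Default)]
struct Partition {
    messages: Vec<String>,
}

impl Partition {
    /// Producer side: append a message and return the offset it was stored at.
    fn append(&mut self, payload: &str) -> u64 {
        self.messages.push(payload.to_string());
        (self.messages.len() - 1) as u64
    }

    /// Consumer side: read every message starting at `offset`.
    fn read_from(&self, offset: u64) -> &[String] {
        let start = (offset as usize).min(self.messages.len());
        &self.messages[start..]
    }
}

/// A topic is a named collection of one or more partitions.
struct Topic {
    name: String,
    partitions: Vec<Partition>,
}

impl Topic {
    fn new(name: &str, partition_count: usize) -> Self {
        Topic {
            name: name.to_string(),
            partitions: (0..partition_count).map(|_| Partition::default()).collect(),
        }
    }
}
```

Note that offsets are counted per partition: the first message in every partition gets offset 0, so an offset only identifies a message together with its partition.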

Scaling

We want our application to scale up dynamically in the case of a request peak and back down when the load decreases. Therefore, our service has to be able to adapt to changing load; after all, that was one of the selling points of microservices in the first place. Luckily, this part can be easily achieved by using Kafka:
Kafka supports consumer-groups, a set of individual (physical) consumers that form one logical consumer. Kafka then assigns partitions within a topic to consumers within the group, thereby distributing messages between the consumers.

Partitions (P0–3) assigned to consumers (C1–6) of two groups. From https://kafka.apache.org/24/images/consumer-groups.png
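The effect of a consumer group can be sketched with a simple round-robin distribution of partitions over the group members. This is a simplification: Kafka's actual assignment strategy is configurable and handled by the client library and the broker, and the function name `assign_partitions` is an assumption made for this example.

```rust
use std::collections::BTreeMap;

/// Distribute partition ids over the members of one consumer group, round-robin.
/// Each partition ends up with exactly one consumer of the group.
fn assign_partitions(partitions: u32, consumers: &[&str]) -> BTreeMap<String, Vec<u32>> {
    // Every consumer gets an entry, even if it ends up without partitions.
    let mut assignment: BTreeMap<String, Vec<u32>> = consumers
        .iter()
        .map(|c| (c.to_string(), Vec::new()))
        .collect();
    if consumers.is_empty() {
        return assignment;
    }
    for p in 0..partitions {
        let owner = consumers[p as usize % consumers.len()];
        assignment
            .get_mut(owner)
            .expect("owner was inserted above")
            .push(p);
    }
    assignment
}
```

With four partitions, a group of two consumers gets two partitions each and a group of four gets one each. A group with more consumers than partitions leaves the surplus consumers idle, which is why the partition count limits how far a single group can scale.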

Using consumer-groups, scaling up our system is easy, we just spin up a new instance and let it subscribe to the Kafka topic. But what about scaling down? Can we simply kill an application while it is running? To answer this, let’s talk about reliability.

Reliability

To be able to answer whether scaling down simply means killing a process, we need to talk about the semantics of our download tasks: a user surely expects their URL to be downloaded once it is submitted. So once a task is published to Kafka as a message, we depend on the delivery semantics of that message.

What delivery semantics do we want to achieve with our messages? We can basically choose from four:

  • Best-effort: The system gives us next to no guarantees if a message will be processed. If the receiver crashes, the message is lost. Think of UDP here.
  • At-Most-Once: Each message will be processed at most once under all circumstances. Rather do not process it at all than to process it twice.
  • At-Least-Once: Each message will be processed eventually, perhaps even several times.
  • Exactly-Once: This would guarantee that every message is processed once-and-only-once. This is really tricky and we won’t do this.

Since we can tolerate a resource being downloaded multiple times (bandwidth is cheap), we go for At-Least-Once delivery semantics. We need to make use of another Kafka concept to achieve this: consumer offsets. When a consumer has processed a message, it commits the offset (of the message within its partition) to Kafka, and the server stores that offset per consumer group and partition. As long as we do not commit an offset, the offset stored in Kafka does not change, and when our service crashes or is scaled down, another consumer will receive that message again and hopefully process and commit it successfully.

So to achieve At-Least-Once semantics, we simply have to commit only after we are finished with processing. (More details later on)
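The commit-after-processing rule can be simulated without a broker. The following sketch models one partition and the committed offset of one consumer group; `PartitionLog` and `run_consumer` are hypothetical names, and a `false` return value from the processing closure stands in for a crash.

```rust
/// One partition plus the committed offset of a single consumer group.
struct PartitionLog {
    messages: Vec<String>,
    /// Offset of the next message the group still has to process.
    committed: usize,
}

impl PartitionLog {
    /// What a (re)starting consumer receives: the first uncommitted message, if any.
    fn poll(&self) -> Option<(usize, &str)> {
        self.messages
            .get(self.committed)
            .map(|m| (self.committed, m.as_str()))
    }

    /// Mark `offset` as processed; the next poll starts after it.
    fn commit(&mut self, offset: usize) {
        self.committed = offset + 1;
    }
}

/// Process messages until the log is drained or `process` reports a failure.
/// The commit happens only after successful processing, so a failure (or crash)
/// leaves the offset untouched and the message is delivered again later.
fn run_consumer(log: &mut PartitionLog, mut process: impl FnMut(&str) -> bool) -> usize {
    let mut handled = 0;
    while let Some((offset, msg)) = log.poll() {
        if !process(msg) {
            break; // simulated crash: nothing is committed for this message
        }
        log.commit(offset);
        handled += 1;
    }
    handled
}
```

If a first consumer fails on a message, a second call to `run_consumer` sees that same message again. Processing therefore has to tolerate duplicates, which is exactly the trade-off accepted above.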

Integration

Since we integrate a Java application and a Rust application, there needs to be some common exchange format or at least some transformations to and from a canonical representation. There will be a whole post about this challenge, for now, we will simply use JSON and hope for it to work out of the box.
I will link that post here, once published

Tracing

In our scenario, the downloader tells the management service that it has finished downloading a resource. With (blocking) RPC, this would be easy: the management service blocks until the downloader is done and has returned the result values. But with messaging, we won’t block. So we need a way to match “done-messages” with tasks in the management service. In the messaging world, this is done with a Correlation-ID that is stored and can be used to match replies to requests. This has the further benefit that we are able to track the processing of a single request across multiple services in external systems, e.g. for logging.
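On the requesting side, matching replies to requests boils down to a lookup table keyed by the Correlation-ID. The sketch below is written in Rust for consistency with the rest of the post, although the management service itself is a Java application. The type `PendingTasks` and the use of a simple counter as ID are assumptions for illustration; real systems often use UUIDs.

```rust
use std::collections::HashMap;

/// Requests that have been sent but not yet answered.
#[derive(Default)]
struct PendingTasks {
    next_id: u64,
    by_correlation_id: HashMap<u64, String>,
}

impl PendingTasks {
    /// Remember the task and return the correlation id to put into the outgoing message.
    fn register(&mut self, url: &str) -> u64 {
        let id = self.next_id;
        self.next_id += 1;
        self.by_correlation_id.insert(id, url.to_string());
        id
    }

    /// Match a "done" message to its task. Returns `None` for unknown or
    /// duplicate replies, which can happen with at-least-once delivery.
    fn complete(&mut self, correlation_id: u64) -> Option<String> {
        self.by_correlation_id.remove(&correlation_id)
    }
}
```

Because a task may be processed more than once, a second “done-message” with the same ID simply finds no pending entry and can be ignored.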

State

Since the Kafka cluster stores our consumer offsets, the downloader can be completely stateless across tasks. Of course, downloading a resource is inherently stateful.

Building — Photo by Gerold Hinzen on Unsplash

Building the Microservice in Rust

Since we covered the basics, we can proceed to implement the application. The first thing to do is to select appropriate crates: we don’t want to reinvent the wheel.

Rust and Kafka

There exist two crates for connecting to Kafka with Rust: ‘kafka’ and ‘rdkafka’, the first one being a pure rust implementation, the second one is based on librdkafka. For this project, I chose rdkafka due to its slightly more accessible documentation.

Since the focus of this post is not how those APIs work, I will spare you the details and we will start with a more conceptual overview.

The core logic of the download-service is as follows:

for msg in receive_messages(in_topic):
    parse_payload(msg)
    img = download_resource(msg)
    tmb = create_thumbnail(img)
    push_to_s3(img, tmb)
    send_ready_notification(out_topic, msg)
    if everything_is_ok:
        in_topic.commit(msg.offset)

Since our service most likely has more bandwidth than the servers hosting the images provide, it makes sense to download multiple images in parallel. We do this by running the above code multiple times in parallel; this is described in the next section.

The rdkafka crate has a pretty nice API, receiving messages is implemented as an async-stream, allowing us to write the receive loop like the following (pretty much like the pseudo-code):

The receive loop

A quick note on Kafka and Rust here: I chose to build parallel processing by using multiple Kafka consumers, each with its own processing logic from above. It would be nicer to have fewer consumers (or even only one) per downloader service that drive many parallel downloads. However, if you commit an offset in Kafka, that commit is cumulative. This means that if we commit offset 42, Kafka assumes we also acknowledge processing all messages up to 42. This is a problem in our case: if a large download arrives with offset 25 and takes a long time to finish (e.g. because the other service is slow or the resource is large), we cannot commit any later messages until it is done downloading, perhaps for hours. This means we would lose hours of progress if the service crashes.
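To see why cumulative commits hurt with out-of-order completion, consider a helper that computes the highest offset that may safely be committed. It follows the simplified model used in this post, where committing offset n acknowledges everything up to and including n; the function name `highest_committable` is made up for this sketch.

```rust
use std::collections::BTreeSet;

/// Given the last committed offset and the offsets whose processing has
/// finished (possibly out of order), return the highest offset that may be
/// committed without acknowledging unfinished work.
fn highest_committable(last_committed: Option<u64>, finished: &[u64]) -> Option<u64> {
    let finished: BTreeSet<u64> = finished.iter().copied().collect();
    let mut result = last_committed;
    // The first offset that has not been acknowledged yet.
    let mut next = last_committed.map_or(0, |o| o + 1);
    // Advance only over a gap-free run of finished offsets.
    while finished.contains(&next) {
        result = Some(next);
        next += 1;
    }
    result
}
```

If offsets 24 and 26 to 42 are finished but 25 is still downloading, the result stays at 24. Only when 25 completes does it jump to 42. A single consumer driving many parallel downloads would need this kind of bookkeeping, which is what using one consumer per processing loop avoids.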

Quick rdkafka walk-through

The rdkafka library has a great set of examples but due to the many concepts of Kafka, they are a bit convoluted. So let me give you a quick walk-through of how the crate works.

The first thing we need is a runtime. You can either create one by hand or use the provided annotation:

#[tokio::main]
async fn main() {
    ...
}

The next thing we need is a Consumer. They can be created with or without a custom Context, which allows for hooking up functionality to internal events, like when offsets are committed. At creation time, you need to know the group-id to join and a list of brokers to use as bootstrap-servers:

// The type annotation tells create() which kind of consumer to build.
let consumer: StreamConsumer = ClientConfig::new()
    .set("group.id", group_id)
    .set("bootstrap.servers", brokers)
    // optionally more configuration
    .create()?;

Now the consumer can be used to subscribe to topics:

consumer.subscribe(&topics)?;

And when started, it yields a message stream:

let mut message_stream = consumer.start();

This asynchronous message stream allows us to process each and every message, e.g. by using a loop:

while let Some(message) = message_stream.next().await {
    match message {
        Err(e) => [...],
        Ok(m) => {
            // process the message
            [...]
            consumer.commit_message(&m, CommitMode::Async);
        }
    };
}

Notice the await here? This will be explained in the next section, so don’t worry.

We explicitly commit the message in this example; if the consumer was created with auto-commit enabled, this is not necessary. Since Kafka has no idea of the message format, we have to handle this ourselves. In this case, I simply used the serde crate to deserialize JSON from a string view of the payload:

let payload = match m.payload_view::<str>() {
    None => "",
    Some(Ok(s)) => s,
    Some(Err(e)) => [...]
};

Note that a message can have no payload at all (the None case), and of course we need to handle the error case, e.g. when the bytes are not valid UTF-8.
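The same three cases can be reproduced with the standard library alone, which makes them easy to try out without a broker. The helper `payload_as_str` below is a hypothetical stand-in and not part of rdkafka; it treats a missing payload as an empty string, as in the snippet above, and returns the UTF-8 error to the caller.

```rust
use std::str::Utf8Error;

/// Mirrors the three cases from above with plain standard-library types:
/// no payload, a valid UTF-8 payload, and bytes that are not UTF-8.
fn payload_as_str(payload: Option<&[u8]>) -> Result<&str, Utf8Error> {
    match payload.map(std::str::from_utf8) {
        None => Ok(""),           // message without payload
        Some(Ok(s)) => Ok(s),     // valid UTF-8, ready for JSON parsing
        Some(Err(e)) => Err(e),   // let the caller decide how to report it
    }
}
```

Whether an empty payload should be treated as an empty string or rejected right away is a design decision; an empty string will fail JSON parsing in the next step anyway.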

Async-Await

Async-await is a new feature in the Rust language. It allows us to ‘await’ the completion of a function marked with ‘async’. The Rust team has written a whole book about these features, so please refer to that one for details. For now, the only important semantic is the following: when we use await on a function call, our code is paused there and resumed when the task has finished, without blocking a thread in the underlying thread pool.

That way, we can wait for new messages on the kafka-consumers, finished downloads, completed uploads to S3, and even offload image-processing to a thread-pool.

This is the original process function; I only stripped away some type annotations for clarity (it matches the pseudo-code from above quite well):

It downloads a given URL, spawns the image processing on a special thread pool for CPU-intensive work, pushes the results to S3, logs success and returns.
Note the ? at the await statements: they leverage the return types. If one of the calls fails, the surrounding process function returns with a Result::Err value. If you are coming from Java, think of a more precise, and therefore more limited, version of a throw. This limitation has both benefits and drawbacks: on the one hand, you know exactly what error type will be returned and you cannot forget to handle a thrown exception. On the other hand, you need to know the exact types in advance and are limited to them.
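Here is a small synchronous sketch of how ? behaves, independent of the process function. `TaskError` and `thumbnail_width` are hypothetical and not taken from the downloader; they only show the early return and the automatic error conversion through a From implementation.

```rust
use std::num::ParseIntError;

/// The one error type this function exposes to its callers.
#[derive(Debug, PartialEq)]
enum TaskError {
    MissingField(&'static str),
    BadNumber(ParseIntError),
}

/// Lets `?` convert a `ParseIntError` into a `TaskError` automatically.
impl From<ParseIntError> for TaskError {
    fn from(e: ParseIntError) -> Self {
        TaskError::BadNumber(e)
    }
}

/// Parse "width=<n>" out of a task description. Each `?` returns early with
/// `Err(..)` if the step before it failed.
fn thumbnail_width(task: &str) -> Result<u32, TaskError> {
    let raw = task
        .strip_prefix("width=")
        .ok_or(TaskError::MissingField("width"))?;
    let width = raw.trim().parse::<u32>()?; // ParseIntError -> TaskError via From
    Ok(width)
}
```

The signature lists every way the function can fail, and the compiler rejects a ? on an error type for which no conversion into `TaskError` exists. That is the precision, and the limitation, mentioned above.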

Configuration

Our microservice should be configurable, for example, to tell it where it can find the Kafka brokers or what credentials should be used for S3.
For this, the structopt crate can be used. It feels close to cheating: by annotating a struct and its fields, one can define a configuration that is manageable with command-line parameters or environment variables.

This outputs the following help-text (if called with --help ):

downloader 0.1.0

USAGE:
    downloader [OPTIONS]

FLAGS:
    -h, --help       Prints help information
    -V, --version    Prints version information

OPTIONS:
    -r, --s3-region-name <s3-region-name>    [env: S3_REGION_NAME=] [default: eu-east2]

Using the environment-based configuration, we allow for configuring the service from within a docker-compose file or k8s ConfigMaps without the need to map the configuration to a file.
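The environment fallback itself is easy to picture with the standard library only. The following is not the structopt-based code of the service but a hand-written sketch of the same idea for a single setting; the `Config` struct and its methods are assumptions, while the variable name and default value are taken from the help text above.

```rust
use std::env;

/// Settings of the downloader; only one field is shown here.
#[derive(Debug, PartialEq)]
struct Config {
    s3_region_name: String,
}

impl Config {
    /// Build the config from any key -> value lookup, falling back to defaults.
    fn from_lookup(lookup: impl Fn(&str) -> Option<String>) -> Self {
        Config {
            s3_region_name: lookup("S3_REGION_NAME")
                .unwrap_or_else(|| "eu-east2".to_string()),
        }
    }

    /// Read the settings from the process environment.
    fn from_env() -> Self {
        Self::from_lookup(|key| env::var(key).ok())
    }
}
```

Taking the lookup as a parameter keeps the defaulting logic testable without touching the real process environment. With structopt, this plumbing, the command-line parsing and the help text are all derived from the annotated struct.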

Using S3 with Rust

Whilst the available crates for Kafka were great, this is not the case for the S3 crates.

There is rust-s3, a relatively small crate and the one I eventually chose. But more on that later.
Further, there is Rusoto, describing itself as ‘AWS SDK for Rust’. While the code base is massive and has 139 contributors at the time of writing, I was unable to use it. There were several version conflicts of TLS libraries with other crates. There is no guide that describes how to use the more than 700 structs in the S3 crate alone, although the docs are really detailed. For S3, there is no example to borrow code from, and the error messages returned by S3 are more than cryptic.
I really hope they publish an overview of their crate: making such a powerful tool accessible to the masses could really increase the adoption of Rust in this space.

The rust-s3 crate is much simpler and somewhat broken in places, but it does its job and has nice examples. Unfortunately, the crate does not support async uploads, so its put operations have to be executed from within the blocking thread pool of tokio.
Moreover, the error handling is incomplete, so I would advise against using the crate in a production environment just yet.

The following shows a shortcoming of Rust in combination with unfinished APIs: the strict and sometimes verbose type system leads to beauties like this:

The Ok case of the result of the put_object function is mapped to the empty type (), since it is broken at the moment. The Err case is mapped to another error type that can be communicated to the external world (remember, we can’t implement foreign traits on foreign structs). So the str in the error and the hardcoded one are converted into String with .into(), which is a bit clumsy. Now the real problems start: the return type of the enclosing method is Result<(), Box<dyn Error + Send + Sync>> (a result of the empty type or a box of something implementing the Error, Send and Sync traits), since the creation of the bucket can already fail. But we create a concrete type here: Result<(), Box<NetworkError>>. This means we need another .into() to tell the compiler that we acknowledge that it may drop the detailed type information. Whilst this code could be moved to a From implementation, this only scatters the problems around in our source files.
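The shape of that conversion chain can be reproduced with the standard library. In this sketch, `put_object` is a made-up stand-in that fails with a `&str`, and `NetworkError` is a hypothetical definition, since the real types live in rust-s3 and the service. The sketch converts the concrete error directly rather than going through a boxed `NetworkError` first.

```rust
use std::error::Error;
use std::fmt;

/// Hypothetical concrete error type that can be shown to the outside world.
#[derive(Debug)]
struct NetworkError(String);

impl fmt::Display for NetworkError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "network error: {}", self.0)
    }
}

impl Error for NetworkError {}

/// Stand-in for a library call that reports failures as a `&str`.
fn put_object(status: u16) -> Result<u16, &'static str> {
    if status == 200 { Ok(status) } else { Err("upload rejected") }
}

/// Discard the `Ok` value, wrap the `Err` into a concrete type, then erase
/// that type so it fits the boxed trait object in the signature.
fn upload(status: u16) -> Result<(), Box<dyn Error + Send + Sync>> {
    put_object(status)
        .map(|_| ())                             // drop the Ok value
        .map_err(|msg| NetworkError(msg.into())) // &str -> String -> concrete error
        .map_err(|e| e.into())                   // concrete error -> boxed trait object
}
```

The concrete type is not lost completely: a caller can still recover it from the box with `downcast_ref::<NetworkError>()`, at the price of a runtime check.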

The code

You can find the source code of the described microservice over at our GitHub repo.

Wires, having nothing to do with the topic — Photo by Victor Aznabaev on Unsplash

Wiring it all up

Now that all components exist, we can wire them up and go for a test run. For now, we will only run everything in local processes. First, we need a Kafka broker, and that in turn needs a ZooKeeper. Luckily, the official Kafka distribution zip contains everything we need:

# ZooKeeper
➜ ./bin/zookeeper-server-start.sh config/zookeeper.properties
# Kafka
➜ ./bin/kafka-server-start.sh config/server.properties

We can directly create our topic for the tasks with a given partition count. Refer to this blog post if you would like to read more about that topic.

➜ ./bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --create --topic converter_tasks --partitions 4 --replication-factor 1

To store the downloaded resources, the microservice needs a storage backend compatible with the S3-API. You can (amongst others) use Amazon S3, ceph, or if you just want to run everything locally, minio:

➜ ./minio server ./data --address 127.0.0.1:9099

If everything is running, you can already start the microservices:

➜ cargo run --release  # for the downloader service
➜ ./gradlew bootRun # for the management service

Notice that it is irrelevant which service is available first, since they can operate independently and the loose coupling allows them to come online as they please.

Scaling up

Now we can test scaling our downloader service: just spin up another instance! If you have at least two partitions in your topic, download requests should now be distributed between the two instances. Depending on the already assigned partitions, you might need to send a few requests to see the effect.

If you scale your service for testing on a single host, you might want to tune OS parameters, otherwise you might run into problems caused by too many parallel downloads. Have a look at the parameters listed by sysctl net.

Scaling down

To test how the service scales down, either download a huge file or insert a delay. Then, with two consumers running, kill one process after it has started processing. You should observe that the message is re-consumed by another consumer after a few seconds (see your timeout configuration). Yay: At-Least-Once delivery!

Summary

It turned out to be quite easy to implement a small service with Rust once the decision was made which libraries to use. However, as is often the case in the ever-changing Rust ecosystem, documentation was slightly out of sync, requiring some deeper digging.

Now back to the question from the intro:

How hard is it to connect a Rust microservice to Kafka?

From the experience gained writing the application for this blog, I tend to say that it is straightforward to connect a Rust application to Kafka if you mostly use basic features. More exotic functionality in the crates might have harder-to-find documentation and less refined APIs. If you require your service to be maintainable and safe, you should consider Rust. However, you need to have enough people who know Rust or are willing to learn it in order to maintain your software. In any case, it makes sense to write a small prototype to verify that all required functionality is sufficiently supported by the crates you use.

Due to the isolated nature of a microservice, I would argue that the risk of implementing a tiny service in a new language is not that huge. This is especially true if your public interfaces are well-defined and you use off-the-shelf technologies to connect them.

In the end, the more people use Rust and contribute to its ecosystem, the better it gets, and the faster the smaller problems that slow you down are eliminated.

Thanks for reading! If you have any questions, suggestions or critique regarding the topic, feel free to respond or contact me. You might be interested in the other posts published in the Digital Frontiers blog, announced on our Twitter account.
