Scaling Microservices @ Jet.com
Written By: James Novino
In the previous post, I talked about how we build our microservices using the decode → handle → interpret pipeline. This post covers how we scale those microservices, since scalability is an important part of the microservice development process.
One of the main benefits of microservices is that they are small, independent services, which allows for a variety of scaling mechanisms. The sections below discuss some of the mechanisms we employ to scale the throughput (number of messages processed per second) of each microservice. This post does not explain how we deal with scaling infrastructure, APIs, or many of the other aspects of load balancing or systems scaling. We will only discuss how we deal with scalability for individual microservices, including:
- Parallel Consumption (Batch Processing)
- Horizontal Scaling
- Process Parallelism
Processing Incoming Messages
Microservices need to communicate with one another to orchestrate business flows, typically through some service-to-service communication medium. At Jet, we rely heavily on asynchronous communication channels like Kafka. When communicating via such channels, a service needs a mechanism to listen to (consume from) the channel or topic and process those messages. Our shared infrastructure wrapper provides an OmsService.consume function that does just that. It also provides a message representation that enables batch/parallel processing. In the OMS system we have three different types of message processing parallelism:
- Serial : Serially processing messages is probably the simplest case: we process one message after the other. Serial processing gives strong ordering guarantees but sacrifices throughput (Queueing Theory 101 states that throughput decreases as the queue delay or message-processing latency increases). Scaling microservices that process serially comes with its own set of challenges.
- Parallel : Parallel message processing means the consumer attempts to process messages as fast as they arrive, which is possible as long as message ordering is not a requirement. Processing all incoming messages in parallel may reduce latency, but it requires careful implementation, as one could easily overwhelm the underlying threading system by spawning an unbounded number of threads. In our implementation, we limit both the message batch size being processed and the number of threads that are spawned for the service.
- Hybrid : Hybrid or data-driven parallelism is a more realistic mechanism, where the messages in a group are processed either in parallel or serially depending on a set of correlation identifiers. This type of parallelism gives us more flexibility, as we have a guarantee that messages are processed serially when duplicate correlationIds exist in a single batch.
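As a rough illustration of the throughput trade-off described in the list above (a simplification that assumes a fixed mean processing time $t$ per message and ignores batching and coordination overhead), the throughput $X$ of a serial consumer is bounded by

$$ X_{\text{serial}} \le \frac{1}{t} $$

while a consumer that processes up to $c$ independent messages concurrently is bounded by

$$ X_{\text{parallel}} \le \frac{c}{t} $$

For example, with $t = 20\,\text{ms}$ a serial consumer cannot exceed $1 / 0.02 = 50$ messages per second, whereas $c = 8$ concurrent tasks raise the ceiling to $8 / 0.02 = 400$ messages per second, provided the messages really are independent.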
We implemented this selector so that consumers can decide how they want to process incoming messages. They specify their decision using OmsService.handle. This function accepts a function f : 'a -> 'a -> bool that determines whether the messages should be processed in parallel, serially, or using the hybrid method. Consumers make this decision by selecting one of the three available functions from the Parallelism module:
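// Any two messages may always run in parallel.
let Always = fun _ _ -> true
// No two messages may run in parallel (serial processing).
let Never = fun _ _ -> false
// Two messages may run in parallel only when their keys differ.
let basedOn f =
    fun a b -> (f a) <> (f b)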
The Always function is used when you always want parallel processing, the Never function when you always want serial processing, and the basedOn function allows for more dynamic parallelism while ensuring that messages with the same correlationId are never processed in parallel if they are contained in either the same Batch or the same Bunch. Consider the following example:
// Extract the correlation ids carried by an input message.
let correlation = function
    | Input.Trigger(cid, _) -> [cid]
    | _ -> []

OmsService.handle log (Parallelism.basedOn (fst >> correlation)) handler
We can see that the basedOn function determines whether the input message can be processed in parallel based on the correlationIds that are extracted from the Input payload. An example of such a correlationId would be an orderId, which is a unique guid representing an order. In most use cases we don’t want to process messages with the same orderId in parallel or out of order. Note: The implementation for Parallelism.Always is based on a refined version of the Fork-Join concurrency model.
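To make the behavior of basedOn concrete, here is a minimal, self-contained sketch. The Input type and the order ids are hypothetical stand-ins invented for illustration (the real OMS message types are not shown in this post); only the three Parallelism functions are taken from the text above.

```fsharp
// Hypothetical message type, for illustration only.
type Input =
    | Trigger of orderId: string * payload: string
    | Other of payload: string

// The three selector functions as listed earlier in the post.
module Parallelism =
    let Always = fun _ _ -> true
    let Never = fun _ _ -> false
    let basedOn f = fun a b -> (f a) <> (f b)

// Extract the correlation ids carried by a message.
let correlation = function
    | Trigger (cid, _) -> [cid]
    | Other _ -> []

// true means "these two messages may be processed in parallel".
let canRunTogether = Parallelism.basedOn correlation

let a = Trigger ("order-1", "created")
let b = Trigger ("order-1", "shipped")
let c = Trigger ("order-2", "created")

let sameOrder = canRunTogether a b        // false: same orderId, process serially
let differentOrders = canRunTogether a c  // true: different orderIds
let noIds = canRunTogether (Other "x") (Other "y") // false: [] <> [] is false
```

One consequence worth noticing in this sketch: two messages that both yield an empty list of correlation ids compare as equal, so they fall back to serial processing.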
Horizontal Parallelism (Horizontal Scaling)
Another form of parallelism we employ is horizontal scaling, which is the process of adding or removing instances of a microservice. Note: The parallelism described above is also a form of horizontal scaling. The difference is in the scope: above, we are adding instances of concurrent Async computations, whereas here we are adding microservice instances. Both should be approached similarly, i.e., by adding instances when the existing ones can’t keep up.
Horizontal scalability is the ability to increase capacity by connecting multiple hardware or software entities so that they work as a single logical unit.
At Jet, we can utilize horizontal scaling because we leverage parallel communication layers, meaning channels that support partitioning or multiple consumers, such as Kafka and Azure Service Bus. To scale a service horizontally, we need to ensure it behaves correctly under at-least-once delivery semantics, which we do by implementing idempotence checks in the handle function. At Jet, we use Hashicorp Nomad as our process scheduling layer.
Nomad is a tool for managing a cluster of machines and running applications on them. Nomad abstracts away machines and the location of applications, and instead enables users to declare what they want to run and Nomad handles where they should run and how to run them.
Nomad allows us to dynamically scale our services horizontally by increasing and decreasing instance counts, either through an operational UI or via the Nomad CLI. In addition to scaling horizontally, it allows us to scale vertically when needed across our multiple regions/environments. Mohit Arora gave a talk last year on how we use Nomad at Jet.
Process Parallelism
In addition to horizontal scaling, we can also control the parallelism of the thread pool within each microservice independently. We talked about how consumers choose whether to process in parallel or not, but we haven’t discussed how that works. This requires some background: the way we deal with parallelism is through a simplification of the Fork-Join model. The incoming stream of messages is processed as follows:
1. As a message arrives, check the current state to see if it can be processed immediately. If not, “suspend” on the task that is processing a different message in the same group.
2. If yes, then check to see if we have reached the max level of concurrency. If yes, “suspend” until one of the other tasks is completed.
3. If no, then spawn a task that processes this message. Update the state to include this new task.
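The three steps above can be sketched roughly as follows. This is not the OMS implementation: the names schedule and State are hypothetical, the sketch uses .NET Tasks and blocks the calling thread where the real code presumably suspends an Async computation, and it assumes the handler f never throws.

```fsharp
open System
open System.Collections.Concurrent
open System.Threading.Tasks

// Hypothetical state: each in-flight message paired with the task processing it.
type State<'input> = ('input * Task) list

let rec schedule
    (maxParallelism: int)
    (canRunTogether: 'input -> 'input -> bool)
    (f: 'input -> unit)
    (state: State<'input>)
    (msg: 'input)
    : State<'input> =
    // Forget tasks that have already finished.
    let live = state |> List.filter (fun (_, t) -> not t.IsCompleted)
    match live |> List.tryFind (fun (other, _) -> not (canRunTogether other msg)) with
    | Some (_, conflicting) ->
        // Step 1: a message of the same group is in flight, so wait for it and re-check.
        conflicting.Wait()
        schedule maxParallelism canRunTogether f live msg
    | None when List.length live >= maxParallelism ->
        // Step 2: max concurrency reached, so wait until any task completes and re-check.
        Task.WaitAny(live |> List.map snd |> List.toArray) |> ignore
        schedule maxParallelism canRunTogether f live msg
    | None ->
        // Step 3: spawn a task for this message and record it in the state.
        (msg, Task.Run(Action(fun () -> f msg))) :: live

// Usage: messages are (orderId, sequence number) pairs; same orderId means same group.
let processed = ConcurrentQueue<string * int>()
let handler (orderId: string, n: int) = processed.Enqueue((orderId, n))
let canRunTogether a b = fst a <> fst b

let messages = [ ("order-1", 1); ("order-2", 1); ("order-1", 2); ("order-3", 1) ]
let finalState = messages |> List.fold (schedule 2 canRunTogether handler) []
Task.WaitAll(finalState |> List.map snd |> List.toArray)
```

In this sketch the second order-1 message is only spawned after the first one has completed, so per-order ordering is preserved while unrelated orders overlap, up to the configured limit of two concurrent tasks.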
The model above introduces some challenges around correctly committing the read position to get an at-least-once delivery semantic. The complexity of maintaining an at-least-once delivery semantic lies in the fact that any stream processing service needs to manage offsets/checkpoints explicitly. It is possible, given the implementation, to end up reading duplicate messages on service restarts, which is why idempotency is a core tenet of our microservice and systems design. The core of process parallelism lies in the runParallelOrBlock function. This function either runs the specified function f with the given args in parallel, or waits until all tasks specified in the state are completed, depending on canRunTogether : 'input -> 'input -> bool. This is the same function produced by Parallelism.basedOn; it’s just being passed into the internals of the handler.
Parallelism is still best-effort and is not always guaranteed. The parallelism field is passed to runParallelOrBlock via the OMS.Infrastructure.Configuration module; this module allows consumers to control the number of parallel tasks the service will attempt to use. The implementation is simple: if canRunTogether is false, we wait until all the tasks specified in the state are completed. The State returned will have all completed tasks removed.
Note: This is not the best implementation for handling errors and such. It is expected that f will always be a happy function, i.e., one that does not throw. If exceptions bubble up to the handler, they cause the batch/bunch to cancel processing, which can lead to a consumption loop where the same messages are consumed over and over; we deal with this via alerting & monitoring.
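Because messages can be redelivered (on service restarts, or in a consumption loop like the one described in the note above), the handler has to tolerate duplicates. The following is a minimal sketch of an idempotence check, not the OMS implementation: the Message type, its MessageId field, and the idempotent wrapper are all hypothetical, and the in-memory dictionary stands in for whatever durable store a real service would need.

```fsharp
open System.Collections.Concurrent

// Hypothetical message shape; MessageId is assumed to be unique per message.
type Message = { MessageId: string; OrderId: string }

// Wrap a handler so that a message that was already handled is skipped.
let idempotent (handler: Message -> unit) =
    // In-memory record of handled ids (illustration only; lost on restart).
    let seen = ConcurrentDictionary<string, unit>()
    fun (msg: Message) ->
        if not (seen.ContainsKey(msg.MessageId)) then
            handler msg
            // Record the id only after the handler succeeded, so that a
            // failed attempt is retried when the message is redelivered.
            seen.TryAdd(msg.MessageId, ()) |> ignore

// Usage: the second delivery of "m-1" is ignored.
let handled = ResizeArray<string>()
let handle = idempotent (fun msg -> handled.Add(msg.MessageId))

handle { MessageId = "m-1"; OrderId = "order-1" }
handle { MessageId = "m-1"; OrderId = "order-1" }
handle { MessageId = "m-2"; OrderId = "order-1" }
```

The check-then-record order used here relies on duplicates of the same message never running concurrently, which matches the guarantee that messages sharing a correlationId are processed serially.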
Microservice architectures are more complicated than most legacy systems, as they require more infrastructure and more services to orchestrate tasks. This makes the environment more complex, which introduces a new set of challenges:
- Scaling : Process-level parallelism increases the throughput of individual services, and adding microservice instances increases the throughput of the system of services, but both are constrained by dependencies. Amdahl’s Law talks about this relationship in more detail.
- Failure : When more services are interacting, you increase the possible points of failure. This requires effort to ensure services and systems are fault tolerant. The watchword with microservices is “interdependence”. Services have to be able to withstand outright failures and inexplicable timeouts. Failures can cascade quickly, causing widespread issues. Fault tolerance in these types of environments is much more complicated.
- Monitoring : Adding more services makes configuring and monitoring them more challenging.
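For reference, Amdahl’s Law, mentioned in the Scaling item above, bounds the speedup $S$ obtainable from $n$ parallel workers when only a fraction $p$ of the work can be parallelized:

$$ S(n) = \frac{1}{(1 - p) + \frac{p}{n}} $$

As a worked example with assumed numbers: if $p = 0.9$ of a flow can run in parallel, then $n = 10$ instances give $S = 1 / (0.1 + 0.09) \approx 5.26$, and no number of instances can exceed $1 / (1 - p) = 10$. The serial remainder, for instance a shared dependency that every message must pass through, sets the ceiling.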
At Jet, we tackle these challenges with a variety of tooling and practices that may be the subject of future posts.
This post covered various methods we employ to manage consumption throughput. It’s important to note that while we can increase performance in a variety of ways, it is still a worthwhile pursuit to ensure that the processing of incoming messages is as efficient as possible. Since microservices are scaled independently of one another, scaling the system as a whole can be a challenge. Scaling systems requires a combination of proper load testing, configuration, monitoring, service partitioning, etc.