Wix’s distributed system of 2000 clustered microservices is required to process billions of business events every day with very high speed in a highly concurrent fashion.
There is a need to balance the load between the various cluster nodes, such that no bottlenecks are created. For serving HTTP requests, this can be done by load balancers such as NGINX or Amazon’s ELB — this is out of scope for this article.
A service acting as a client may also need to load-balance its calls in certain cases, for example when it initializes an internal cache from data retrieved from a different service.
There are also many cases where events and actions have to be processed atomically so that the stored data remains valid, e.g. changing an account balance or updating inventory.
In this blog post, we will explore various practices used by Wix microservices that ensure atomic operation for updating the state of some resource (e.g. a cache or a DB entry), thus keeping the data valid but without compromising on high throughput and low latency.
The following practices are divided by their operation “granularity”:
- Selecting a single leader service node to run a task or a bunch of tasks
- Sharding the retrieval of a large dataset by multiple “leader” nodes
- Processing events sequentially for a single domain entity by any random service node
1. Selecting a Leader for scheduling tasks using ZooKeeper
There are many services at Wix that are required to perform scheduled tasks.
Consider, for example, the Contacts Importer service, which imports Wix site owners’ contacts from external sources such as Gmail.
The Importer service DB accumulates metadata for many import jobs. This metadata becomes stale and can be deleted or archived once the import process is completed. Otherwise the DB will keep growing and its response times will get slower.
Scheduling a cron job
A periodic cleaning job needs to be scheduled that will perform the DB deletion operations.
Note that when a clustered microservice comprising multiple nodes schedules cron jobs, only one node must be in charge of scheduling any given task.
Otherwise, the cleaning task can run more than once at the same time. This can cause race conditions, such as ending up with an incorrect import job state, and it also puts more load on the DB.
Wix has a Cron Scheduler service called Cronulla that makes sure that jobs are scheduled in just one of the client service nodes. It accepts requests to schedule a REST call to the client service with some cron expression string.
"0 7 * * * *"
This cron expression, whose first of six fields is seconds, means: run once every hour, at the 7th minute.
Zookeeper and Curator
In order to make sure the requested job is sent to the Contacts Importer service only once every hour, Cronulla enlists the help of Apache ZooKeeper. ZooKeeper is a centralized service used to coordinate distributed systems.
Cronulla uses the Apache Curator library, which is a high-level, robust ZooKeeper client. It offers built-in recipes, including shared counters and locks. The relevant recipe for Cronulla’s case is leader election.
The following steps configure Curator to execute some task on a single leader:
1. First, the Curator client is built, including the ZooKeeper connection string.
2. Then a LeaderSelector (which is the leader election recipe abstraction) is created. It is provided with the following parameters:
- The Curator client
- The path to a unique ZooKeeper ZNode representing this leadership group
- A LeaderSelectorListenerAdapter, which defines the action to take once this node becomes leader (more details in item 4)
3. The LeaderSelector is then set to autoRequeue() so that it puts itself back in the election pool after it has relinquished leadership.
4. LeaderSelectorListenerAdapter defines a takeLeadership callback, where actions can be performed, as this node is now the leader. In our case, the actions that are performed are scheduled cron tasks.
It is important to periodically check whether this thread has been interrupted, which is an indicator that leadership needs to be relinquished and that cron jobs should no longer be executed on this node.
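The interruption check inside takeLeadership can be sketched as follows. This is a minimal, library-free illustration and not Cronulla's actual code: LeadershipLoop, runWhileLeader and the pause interval are hypothetical names, and the Curator wiring is shown only as a comment with a made-up ZNode path.

```java
/**
 * Hypothetical helper: runs a task repeatedly while this node is leader.
 * Curator interrupts the thread executing takeLeadership when leadership
 * must be given up, so the loop exits as soon as the interrupt flag is set.
 */
final class LeadershipLoop {
    private final Runnable task;
    private final long pauseMillis;

    LeadershipLoop(Runnable task, long pauseMillis) {
        this.task = task;
        this.pauseMillis = pauseMillis;
    }

    /** Runs the task until the current thread is interrupted; returns the number of runs. */
    int runWhileLeader() {
        int runs = 0;
        while (!Thread.currentThread().isInterrupted()) {
            task.run();
            runs++;
            try {
                Thread.sleep(pauseMillis);
            } catch (InterruptedException e) {
                // Restore the flag so the caller can also see the interruption.
                Thread.currentThread().interrupt();
                break;
            }
        }
        return runs;
    }
}

// Wiring sketch (needs Apache Curator on the classpath; "/cronulla/leader" is a made-up path):
//   LeaderSelector selector = new LeaderSelector(client, "/cronulla/leader",
//       new LeaderSelectorListenerAdapter() {
//           @Override
//           public void takeLeadership(CuratorFramework c) {
//               loop.runWhileLeader(); // returning from here relinquishes leadership
//           }
//       });
//   selector.autoRequeue();
//   selector.start();
```

Returning from the callback is what hands leadership back, which is why the loop returns instead of swallowing the interruption and carrying on.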
For more information about Curator API and usage visit this detailed blog post.
The ZooKeeper server together with the Curator client provides a powerful and relatively simple way to coordinate distributed microservices, especially for leader election and for guaranteeing atomic processing of scheduled tasks.
In reality, Wix’s Cron Scheduler service is more complex and also uses Apache Kafka and Greyhound (Wix’s Kafka client) in order to guarantee that tasks are eventually processed successfully, using consumer-side retries.
Having a single leader eliminates concurrency issues such as race conditions and the need to heal corrupted state, but on the other hand it introduces a single point of failure and limits the possibility of scaling.
2. Sharding large dataset retrieval using a Kafka Topic’s assigned partitions
Let’s take for example an e-commerce product reseller service. The service needs to cache, for quick access, all items that are part of a very large product catalog of a 3rd party distributor (e.g. eBay). The only API available is:
public Collection<String> listAllItemIds()
public Item getItemData(Collection<String> items)
Fetching all items’ data in order to fill the cache can take a long time (minutes) and will also take up a lot of memory. Sharding the fetch operation between many service nodes can help reduce both latency and memory usage.
Apache Kafka to the rescue
A Kafka topic can be used to distribute retrieval of the large product catalog dataset among many service instances.
For the readers who are unfamiliar with it, Apache Kafka is a message broker. Each message is produced to some topic. Each topic is divided into partitions. Each consumer (Service instance) in a consumer group is assigned some of these partitions to process messages from.
Topic assigned partitions as sharding key
These partitions can be used as the sharding key for retrieval of large datasets among many service instances. This is especially useful if the dataset is retrieved through a call to a 3rd party API which cannot be altered.
The procedure should be as follows:
1. Create a Kafka topic with the minimal number of partitions for your current scale.
2. In your service, implement a Kafka consumer such that the currently assigned partitions that are used to calculate the shard are always up-to-date, and the caching job is restarted when the partitions change.
3. Implement a caching job that:
- Fetches all item ids using listAllItemIds
- Calculates which ids in the response are part of this instance’s shard according to its assigned partitions
- Calls getItemData only with the items that are part of this instance’s shard
As mentioned in step 2, in order to make sure the shard calculation is always up-to-date, you need to implement the ConsumerRebalanceListener when subscribing to the sharding topic.
- In the onPartitionsRevoked callback, make sure to stop the ongoing job that updates the cache, as it is working on a stale list of partitions, and also clear the currentPartitions collection.
- In the onPartitionsAssigned callback, update the currentPartitions collection with the newly assigned partitions and restart the caching job, as it can now calculate the shard correctly again.
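The bookkeeping behind the two callbacks can be sketched without the Kafka client as follows. ShardAssignmentTracker and its job hooks are hypothetical names. In a real service the class would implement Kafka's ConsumerRebalanceListener, whose callbacks receive a Collection of TopicPartition objects rather than plain integers. The sketch also assumes an eager rebalance, in which every partition is revoked before the full new assignment is handed over.

```java
import java.util.Collection;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for a ConsumerRebalanceListener implementation. */
final class ShardAssignmentTracker {
    // Read by the caching job on another thread, hence a concurrent set.
    private final Set<Integer> currentPartitions = ConcurrentHashMap.newKeySet();
    private final Runnable stopCachingJob;
    private final Runnable restartCachingJob;

    ShardAssignmentTracker(Runnable stopCachingJob, Runnable restartCachingJob) {
        this.stopCachingJob = stopCachingJob;
        this.restartCachingJob = restartCachingJob;
    }

    /** Mirrors onPartitionsRevoked: the running job now works on a stale shard. */
    void onPartitionsRevoked(Collection<Integer> revoked) {
        stopCachingJob.run();
        currentPartitions.clear();
    }

    /** Mirrors onPartitionsAssigned: record the new shard, then refill the cache. */
    void onPartitionsAssigned(Collection<Integer> assigned) {
        currentPartitions.addAll(assigned);
        restartCachingJob.run();
    }

    /** Immutable snapshot for the caching job to use in its shard calculation. */
    Set<Integer> currentPartitions() {
        return Set.copyOf(currentPartitions);
    }
}
```

Handing the caching job a snapshot rather than the live set means a rebalance in the middle of a run cannot change the shard under it; the job is stopped and restarted instead.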
The last piece of the puzzle is how to calculate the items that are part of the shard. This involves a very simple sharding operation. The steps include:
- Retrieve the hashCode for each item id.
- Calculate the partition this id belongs to by using the % (remainder) operator on the total number of partitions the topic has.
- Filter the complete items list such that only the ones that are associated with currently assigned partitions remain.
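A minimal sketch of these three steps is shown below. ShardFilter, partitionFor and idsInShard are hypothetical names. One detail is an addition to the steps above: String.hashCode() can be negative, and the plain % operator would then yield a negative partition number, so the sketch uses Math.floorMod to keep the result within the valid partition range.

```java
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

/** Hypothetical helper that maps item ids to partitions and filters by shard. */
final class ShardFilter {
    private ShardFilter() {}

    /** Partition an id belongs to: always in the range [0, totalPartitions). */
    static int partitionFor(String itemId, int totalPartitions) {
        // floorMod instead of % because hashCode() may be negative.
        return Math.floorMod(itemId.hashCode(), totalPartitions);
    }

    /** Keeps only the ids whose partition is currently assigned to this instance. */
    static List<String> idsInShard(Collection<String> allIds,
                                   Set<Integer> assignedPartitions,
                                   int totalPartitions) {
        return allIds.stream()
                .filter(id -> assignedPartitions.contains(partitionFor(id, totalPartitions)))
                .collect(Collectors.toList());
    }
}
```

The caching job would then pass the result of idsInShard(listAllItemIds(), currentPartitions, totalPartitions) to getItemData. Every id lands in exactly one partition, so instances holding disjoint partitions fetch disjoint sets of items.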
As you can see, with a few lines of code, a very powerful dynamic sharding algorithm is implemented using the power of Kafka topic partitions and the partition rebalance mechanism.
The number of shards can easily grow by simply adding more partitions and more service nodes.
Be careful not to have fewer partitions than nodes, as the surplus nodes will be idle.
If, instead of polling datasets, there is an alternative API through which your service can listen to data updates, then you should probably choose it, as sharding is then provided out-of-the-box via Kafka record keys (see the next practice).
3. Producing a Kafka event with a key in order to guarantee sequential processing per domain entity
Due to GDPR regulations, Wix is required to provide its users with access to all of their personal data that it keeps. It is also required to delete this information in certain cases.
As personal data is kept in many different services at Wix, a centralized GDPR service was created. It sends requests to fetch or delete data to all services that can potentially contain personal information on a user (e.g. Wix Stores, Wix Bookings, Wix Restaurants, etc.).
Fanout request using Kafka
Upon user request, the GDPR service produces a Kafka event carrying a request Id, which asks for the personal data related to this user to be collected.
Once this request is consumed by the various services at Wix, the data is gathered and a series of responses is produced by each service.
Responses produced with request-id Key
The responses include various data related to the user and are produced back to Kafka with the request Id as the record key. Attaching the same key to these events guarantees that they will all be produced to the same partition of the Kafka topic.
Once all data of the user is produced, a completed status event is produced to the same topic, indicating that all the data has been produced. This status is also produced with the same request id.
Sequential processing by request-id Key
The GDPR service’s Kafka consumer can then safely consume all data related to a user sequentially, with a guarantee that once it processes a Completed status, all of the data has been received and the status of the original GDPR access request can be updated.
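The consumer-side logic can be sketched as follows. This is an illustration under stated assumptions and not the GDPR service's actual code: GdprResponseAggregator, the COMPLETED marker and the string payloads are hypothetical. The sketch also assumes a single completed status per request and that onRecord is called in partition order.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical aggregator fed by a consumer that reads one partition in order. */
final class GdprResponseAggregator {
    static final String COMPLETED = "COMPLETED"; // hypothetical status marker

    private final Map<String, List<String>> pending = new HashMap<>();
    private final Map<String, List<String>> completed = new HashMap<>();

    /** Handles one record; requestId is the Kafka record key. */
    void onRecord(String requestId, String payload) {
        if (COMPLETED.equals(payload)) {
            // Same key means same partition, so all data for this request was seen earlier.
            List<String> data = pending.remove(requestId);
            completed.put(requestId, data == null ? List.of() : List.copyOf(data));
        } else {
            pending.computeIfAbsent(requestId, id -> new ArrayList<>()).add(payload);
        }
    }

    boolean isCompleted(String requestId) {
        return completed.containsKey(requestId);
    }

    /** Data collected for a completed request, in arrival order. */
    List<String> dataFor(String requestId) {
        return completed.getOrDefault(requestId, List.of());
    }
}
```

No locking is needed here because records that share a key are handled one at a time by whichever consumer owns their partition.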
Greyhound Blocking retries
In case of a processing failure, the GDPR service consumer automatically retries processing the same message with exponential backoff. This is one of the features of Wix’s Greyhound Kafka client.
Sequential nature is guaranteed, because the message will not be committed unless successfully processed.
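The idea of a blocking retry can be sketched in plain Java as follows. This is not Greyhound's API: BlockingRetry, processWithRetry and the backoff parameters are hypothetical, and the sketch only shows why the next message cannot be processed before the current one succeeds.

```java
import java.util.function.Consumer;
import java.util.function.LongConsumer;

/** Hypothetical blocking retry loop with exponential backoff. */
final class BlockingRetry {
    private BlockingRetry() {}

    /**
     * Retries the handler on the same message until it succeeds, doubling the
     * pause after each failure up to maxBackoffMillis. Returns the attempt count.
     * The offset would be committed only after this method returns.
     */
    static <T> int processWithRetry(T message, Consumer<T> handler,
                                    long initialBackoffMillis, long maxBackoffMillis,
                                    LongConsumer pause) {
        long backoff = initialBackoffMillis;
        int attempts = 0;
        while (true) {
            attempts++;
            try {
                handler.accept(message);
                return attempts;
            } catch (RuntimeException e) {
                pause.accept(backoff); // blocks, so later messages wait their turn
                backoff = Math.min(backoff * 2, maxBackoffMillis);
            }
        }
    }

    /** Default pause: blocks the consuming thread, aborting the retries if interrupted. */
    static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while backing off", e);
        }
    }
}
```

A consumer loop could call processWithRetry(record, handler, 100, 10_000, BlockingRetry::sleep) for each record in turn. The trade-off is that a message that keeps failing holds up everything behind it on the same partition.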
By producing response events keyed by the request Id, the GDPR service is able to guarantee that processing is done sequentially per user request, which keeps the processing atomic per user and the status correct along the way. In order to sustain high throughput as the list of services with potentially relevant data grows, the number of partitions for this topic can easily be increased together with the number of GDPR service nodes.
The different practices demonstrated in this blog post all guarantee atomic operation on various levels.
Having a single leader node do the work is very simple, but it naturally constrains the possible scale, which is why it is important to keep the scope of the work to a minimum.
Utilizing Kafka topic partitions, whether for sharding, or for sequential processing, is a great way to achieve atomic operations in a distributed system without compromising on the scalability and performance aspects.
Thank you for reading!