Workload Distribution Dynamics: Mastering Efficiency in Application Performance

Guillermo Areosa
Mercado Libre Tech

--

If you’re passionate about implementing high-impact solutions, you’re in the right place. In this article, we’ll explore how Mercado Libre’s Stream Service faced and overcame a challenging workload distribution issue, resulting in a more resilient system, a 95% decrease in lag alerts, and a more developer-friendly environment within our ecosystem.

To give context, the Stream Service manages real-time information streaming within the Fury platform, an internal tool designed for Mercado Libre’s developers. This service handles a massive load, transmitting approximately 30 million messages per minute to multiple destinations, including queues, key-value storage, and analytical databases such as BigQuery. For further insights on Fury, you can explore this post.

The Challenge of Managing Workload to Avoid Exceeding Limits

Imagine you’ve developed a cloud-based app following the guidelines outlined in the Twelve-Factor App (12F). Its main function is to process information and send the results to a complementary service, such as a database or an external app.

Figure 1: Example of information processing in a backing service

Under certain conditions, like unexpected peak traffic, your application may need to handle increased loads. Since your application is cloud-native and adheres to the 12F principles, you would naturally choose a horizontal scaling approach.

Figure 2: Information processing in a backing service from various instances

So far, so good. Your application’s horizontal scalability, aided by a load balancer, enables efficient management of the surge in data. This strategy is effective whether your application operates as a web API or a worker because the data source is a minor factor in this context.

The challenge arises when an unexpected event further escalates the processing volume, surpassing the capacity of your data destination. The destination could be a legacy database or an external application beyond your control, making horizontal scaling on that side unfeasible.

At this point, it is crucial to consider your destination’s data capacity; let’s assume it can manage 100,000 requests per minute. Now, the question is: How can your application control the workload to avoid surpassing 100K RPM, especially amid fluctuating instance numbers?

Before revealing the solution to this challenge, we should underscore this problem’s impact on the Stream team. As mentioned earlier, the team handles the transportation of a substantial amount of the company’s data. This involves data ingested from various sources, including CDC (Change Data Capture) from databases and standalone origins. All these data points are delivered to multiple destinations, impacting nearly all vital company processes. We are talking about over 30 million messages per minute from over 12,000 origins, connected to over 15,000 destinations, representing the communication of 2,500 microservices that underpin platform operations. Adding to this complexity, the delivery of these messages (the workload processing) is carried out from more than 800 instances whose number fluctuates constantly.

During the last months of 2023, the team faced approximately 600 alerts. Each alert required an average of 30 minutes of attention from a team member, time that is critical for maintaining the platform’s uptime. These alerts usually occurred unexpectedly, putting significant pressure on the team. But this is a problem of the past! Let’s see how we solved it.

Our First Iteration

The impressive 95% decrease in lag alerts didn’t happen overnight. It resulted from several iterations and adjustments based on the team’s available resources.

During this first iteration, we focused on balancing the load per instance, taking into account the number of active instances. We implemented a system where the total number of permits was distributed evenly. For example, if the destination could manage 100,000 requests per minute and the application had ten active instances, each instance was assigned 10,000 permits.

This system was based on three key components: a message queue, a database, and an additional application responsible for compiling and storing data, as depicted in the diagram below:

  1. Every time a new instance started processing, it sent an alive signal to this new application, which we will call worker-balancer, through the message queue.
  2. This signal was registered in the database with a limited Time-To-Live (TTL). Since real-time transmission was not crucial, reasonable efforts were made to ensure its delivery.
  3. Subsequently, our application, which we will call worker, interacted with the worker-balancer to determine the number of alive instances and distributed the permits evenly based on this information (a minimal sketch of this flow follows Figure 3 below).
Figure 3: Equal load distribution in the first iteration
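
To make this flow concrete, here is a minimal sketch in Go of the first iteration, assuming a hypothetical worker-balancer client interface and a fixed 100,000-permit budget; none of the names come from the actual Stream Service code.

```go
package main

import "fmt"

// BalancerClient abstracts the worker-balancer described above. In the real
// system the alive signal travels through a message queue and is stored in a
// database with a short TTL; here it is only an interface.
type BalancerClient interface {
	SendAliveSignal(instanceID string) error // step 1: the heartbeat
	AliveInstances() (int, error)            // step 3: how many hosts are currently alive
}

// staticBalancer is a stand-in used only for this sketch.
type staticBalancer struct{ alive int }

func (b staticBalancer) SendAliveSignal(string) error { return nil }
func (b staticBalancer) AliveInstances() (int, error) { return b.alive, nil }

// evenShare implements the first-iteration rule: the destination's total
// permit budget divided equally among the alive instances.
func evenShare(totalPermits int, balancer BalancerClient) (int, error) {
	alive, err := balancer.AliveInstances()
	if err != nil {
		return 0, err // without the balancer we cannot size our share safely
	}
	if alive == 0 {
		alive = 1 // at minimum, this instance itself is alive
	}
	return totalPermits / alive, nil
}

func main() {
	balancer := staticBalancer{alive: 10}
	_ = balancer.SendAliveSignal("instance-a")

	// Hypothetical budget: the destination tolerates 100,000 requests per minute.
	permits, _ := evenShare(100_000, balancer)
	fmt.Println(permits) // 10000 permits for each of the 10 alive instances
}
```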

The initial approach of evenly distributing processing permits among instances seemed perfectly suited to our requirements. However, we faced a significant challenge: this strategy assumed that each instance would execute the same processing load, which is not always the case. Factors such as availability-zone (AZ) affinity, load balancer behavior, and uneven traffic distribution can contribute to this disparity.

It is essential to account for scenarios where different processing tasks may not demand the same time or effort. For example, processing an invoice containing thousands of lines will naturally take longer than one with only a few lines. Furthermore, even if the load is evenly distributed, the data source may not allocate tasks equally, potentially directing specific task types to particular instances to maintain processing sequences. Consequently, this approach may leave instances with higher processing capacity underused.

Let’s illustrate with a concrete example: suppose there are 100,000 requests per minute and two available application instances, A and B. In this case, the system allocates 50,000 requests to each instance. However, if instance A can only process 20,000 RPM while B can handle 80,000 RPM, our equal distribution model caps the total at 70,000 RPM: 50,000 from B and 20,000 from A. As a result, 30,000 requests per minute are left unprocessed, leading to processing lag. Although this approach prevents overloading the destination with excessive requests, it also underuses the application’s full capacity, causing processing delays and ultimately failing to meet business demands.

Figure 4: Example of resource squandering due to equal distribution

After obtaining these insights, we decided to revamp the application architecture. We restructured the existing framework to report host availability along with comprehensive details about each host’s current load. Additionally, we adjusted the worker-balancer to analyze this information and serve as the load distribution center.

New Information Model

As previously stated, in the first iteration the information sent was limited to reporting the host’s activity status, which was a relatively simple task. With this new approach, however, a mere approximation no longer suffices: the data is now crucial for workload distribution calculations and must include details such as instance_id, timestamp, designated permits, and used permits, among others.
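
As a rough illustration, the payload each instance emits might look like the Go struct below; the field and JSON tag names mirror the details listed above but are assumptions rather than the service’s actual schema.

```go
package report

import "time"

// UsageReport is a hypothetical shape for the per-instance measurement
// described above; field and JSON tag names are illustrative only.
type UsageReport struct {
	InstanceID        string    `json:"instance_id"`        // host that produced the measurement
	Timestamp         time.Time `json:"timestamp"`          // when the reporting window closed
	DesignatedPermits int       `json:"designated_permits"` // permits the host was allowed to use
	UsedPermits       int       `json:"used_permits"`       // permits the host actually consumed
}
```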

On the other hand, sending this information synchronously while processing work is neither practical nor feasible, since it would require additional calculations and hinder work processing. To address this, we adopted an approach similar to that of monitoring libraries like Datadog or New Relic.

For this new outline, illustrated in the image below, we created a custom in-memory data structure (an asynchronous queue) that compiles, consolidates, and sends information on an asynchronous thread.

The workflow operates as follows (a minimal sketch of this buffering pattern follows Figure 5 below):

  1. Initiation of a new workload processing
  2. Activation of the supporting service
  3. Accumulation of processing details and permits in a memory buffer
  4. Periodic compression and clearance of the buffer by a dedicated thread every minute
  5. Dispatch of the compressed information to the message queue
  6. Storage of persistent data by the worker-balancer for subsequent processing
Figure 5: Flow of information transmission
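
Here is a minimal Go sketch of that buffering pattern, assuming a mutex-protected in-memory buffer, a one-minute ticker, and a publish callback that stands in for the real message-queue client.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// usage accumulates the permit activity recorded during the current window.
type usage struct {
	designated int // permits this host was allowed to use
	used       int // permits it actually consumed
}

// Reporter buffers processing details in memory (step 3) and flushes a
// consolidated snapshot once per interval (steps 4 and 5). The flush runs in
// its own goroutine so it never blocks workload processing.
type Reporter struct {
	mu      sync.Mutex
	current usage
	publish func(u usage) // stand-in for "send to the message queue"
}

// Record is called from the processing hot path; it only touches the buffer.
func (r *Reporter) Record(designated, used int) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.current.designated += designated
	r.current.used += used
}

// Start launches the background flusher that consolidates and clears the buffer.
func (r *Reporter) Start(interval time.Duration, stop <-chan struct{}) {
	go func() {
		ticker := time.NewTicker(interval)
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				r.mu.Lock()
				snapshot := r.current
				r.current = usage{} // clear the buffer once the snapshot is taken
				r.mu.Unlock()
				r.publish(snapshot) // step 5: dispatch to the queue
			case <-stop:
				return
			}
		}
	}()
}

func main() {
	stop := make(chan struct{})
	r := &Reporter{publish: func(u usage) {
		fmt.Printf("flush: designated=%d used=%d\n", u.designated, u.used)
	}}
	r.Start(time.Minute, stop) // the article flushes every minute
	r.Record(100, 80)          // hypothetical workload: 100 permits designated, 80 used
	close(stop)                // in a real worker this runs for the life of the process
}
```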

So far, we’ve figured out how to send valuable information. Now, we have to focus on what to do with this information and how to add value to the solution, which is when the worker-balancer takes center stage.

The Main Cog in the New Mechanism

The renewed application has three functions: persisting valuable information, handling distribution, and making it available with high efficiency.

Persistence

To address persistence, we implemented a streamlined process that receives information from the queue and executes a bulk UPSERT in the database, overwriting each tuple only if its stored timestamp is older than the incoming element. This approach lessens the queue’s workload and removes any dependency on message order. A relevant implementation detail is that inserted elements have a three-minute TTL, which handles scenarios such as a host shutting down.
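
For illustration, a single-row version of that UPSERT with the timestamp guard could look like the PostgreSQL-flavored sketch below, issued from Go. The table and column names are assumptions, real rows arrive in bulk, and the three-minute TTL is left to a separate cleanup process.

```go
package persistence

import (
	"context"
	"database/sql"
	"time"
)

// SaveReport upserts one instance report, overwriting the stored row only if
// the incoming timestamp is newer. Table and column names are illustrative.
func SaveReport(ctx context.Context, db *sql.DB, instanceID string, ts time.Time, designated, used int) error {
	const upsert = `
		INSERT INTO instance_usage (instance_id, reported_at, designated_permits, used_permits)
		VALUES ($1, $2, $3, $4)
		ON CONFLICT (instance_id) DO UPDATE
		SET reported_at        = EXCLUDED.reported_at,
		    designated_permits = EXCLUDED.designated_permits,
		    used_permits       = EXCLUDED.used_permits
		WHERE instance_usage.reported_at < EXCLUDED.reported_at` // older data never overwrites newer data
	_, err := db.ExecContext(ctx, upsert, instanceID, ts, designated, used)
	return err
}
```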

Figure 6: Architecture with information persistence flow

Distribution

For effective distribution management, we need to save each host’s permit percentage. This is a crucial decision because reprocessing all the results won’t be necessary if a user changes the number of permits per processing. For example, suppose we set 100,000 requests per minute and distribute them at an 80% ratio to host A and a 20% ratio to host B. This percentage scale remains intact even with an increase to 200,000 permits, enabling the change to be promptly reflected in the worker instances. The distribution algorithm is simple: if a host underuses its permits, the surplus permits are reallocated to the hosts that used all of their permits in that iteration. Reassessing the distribution minute by minute allows for ongoing adjustments until the optimal distribution is achieved.
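
A simplified Go sketch of this rebalancing step follows: it reclaims the unused fraction of each host’s share and hands it to the hosts that exhausted theirs. The data shapes and the equal-parts handoff are assumptions; the real worker-balancer reassesses the distribution every minute rather than relying on a single pass.

```go
package distribution

// HostUsage summarizes one host's minute: its current share of the permit
// budget and how much of that share it actually consumed.
type HostUsage struct {
	ID    string
	Share float64 // fraction of the total permit budget assigned to this host
	Used  float64 // fraction of its own share that was consumed (0.0 to 1.0)
}

// Rebalance moves the unused portion of each host's share to the hosts that
// exhausted theirs. Storing percentages (not absolute permits) lets a change
// in the total budget take effect immediately.
func Rebalance(hosts []HostUsage) []HostUsage {
	surplus := 0.0
	var saturated []int // indices of hosts that used all of their permits
	for i, h := range hosts {
		if h.Used >= 1.0 {
			saturated = append(saturated, i)
			continue
		}
		// Reclaim the unused fraction of this host's share.
		unused := h.Share * (1.0 - h.Used)
		hosts[i].Share -= unused
		surplus += unused
	}
	if len(saturated) == 0 || surplus == 0 {
		return hosts // nothing to move this minute
	}
	// Hand the reclaimed surplus to the saturated hosts in equal parts.
	bonus := surplus / float64(len(saturated))
	for _, i := range saturated {
		hosts[i].Share += bonus
	}
	return hosts
}
```

Running one pass on the earlier example (host A consumes 20,000 of its 50,000 permits while host B exhausts its 50,000) already moves the shares to roughly 20% for A and 80% for B.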

Figure 7: Architecture with distribution generation flow

Availability of distribution

To prevent prolonged and inefficient response times caused by querying available permits for each instance on demand, we implemented a system where distribution processing occurs every minute. Once the distributions are assessed, they are persisted in long-term storage, specifically Amazon S3. When a request for host distribution information arrives, we can promptly fetch the file, retrieve the data, and return the results to the client with minimal response time.
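
A rough Go sketch of this read path is below, with the long-term storage hidden behind a small interface and an in-memory stand-in so the example stays self-contained; the endpoint and all names are assumptions, not the actual service API.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
)

// DistributionStore abstracts the long-term storage (Amazon S3 in our case);
// the worker-balancer writes a fresh distribution file once per minute, so the
// read path only has to fetch it.
type DistributionStore interface {
	LatestDistribution() (map[string]float64, error) // host ID -> share of the permit budget
}

// memoryStore is a stand-in for the S3-backed implementation, used only here.
type memoryStore struct{ shares map[string]float64 }

func (m memoryStore) LatestDistribution() (map[string]float64, error) { return m.shares, nil }

// handler answers a worker's "what is my share?" query from the precomputed
// file, so no distribution math runs on the request path.
func handler(store DistributionStore) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		shares, err := store.LatestDistribution()
		if err != nil {
			http.Error(w, "distribution unavailable", http.StatusServiceUnavailable)
			return
		}
		w.Header().Set("Content-Type", "application/json")
		_ = json.NewEncoder(w).Encode(shares)
	}
}

func main() {
	store := memoryStore{shares: map[string]float64{"instance-a": 0.2, "instance-b": 0.8}}
	http.HandleFunc("/distribution", handler(store))
	fmt.Println("serving precomputed distribution on :8080")
	_ = http.ListenAndServe(":8080", nil)
}
```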

Figure 8: Architecture with distribution availability flow

Conclusion and Next Steps

Restructuring our application’s architecture marked a pivotal stride in enhancing user experience, increasing system resilience, and significantly reducing the number of latency alerts. Through iterations and steady adjustments, we successfully optimized workload distribution, catering to the sector’s evolving demands. We’re excited about the positive impact of these improvements and are committed to continuing to innovate, optimize our systems, and ensure peak performance.
