Building highly concurrent systems using Python

Agustín Romano
Ordergroove Engineering
9 min readApr 19, 2022
MLenny / Getty Images

Ordergroove is a SAS product and it is architected to be a multi-tenant application — one system hosting and serving all its clients. Certain services in our platform have high concurrency requirements where we need to process large data in a time efficient manner. E.g. we need to send shipments to merchant’s ecommerce platforms, and for some merchants there are specific requirements to deliver these shipments at a set time, because some of them might have large warehouses where hourly wage workforce will be sitting idle if the shipment information is not received by a certain time, or some of them sell perishable goods that would go stale if they can not be shipped by a certain time. This means that our system will need to process these workloads of hundreds of thousands of tasks at a very high rate consistently.

In this post I will walk through a high level approach of how we can accomplish this.

Problem statement

Our clients use different ecommerce platforms, and even within a certain ecommerce platform there are different tiers which give them variable parallel processing capabilities. Some can handle five concurrent requests, others can handle five hundred. Their time to process a request and the response times to the API calls also vary.

Given this highly heterogeneous pool of clients, our goal is to implement a system that sends each one of them the shipments due for a given day through HTTP requests, where each shipment message is sent in a separate request.

Additionally, our system need to meet the following non-functional requirements:

  • Scalability: When new clients start using the system, it is easy to scale up the resources to serve them in a timely manner.
  • Reliability: Shipments due for a given day are reliably delivered to all our clients and they are delivered only once.
  • Predictability: A client can have an accurate estimate of when we are going to finish sending the due shipments.
  • Transparency: At any point in the process, the client can know how many shipments were delivered and how many are outstanding.
  • Fairness: A client is not impacted by high traffic or large volume of data being processed for other clients in the multi-tenant system.

Let’s start with the simplest solution

The naive solution for this problem would be a process that takes all shipment messages due for a given day and starts sending them one after the other. A new message would be sent as soon as we receive the response of the previous one until we finish with all of them and move on to the next client. We could also maintain a counter to keep track of what was already sent.

Basic diagram of naive solution

Even though this solution is functional, there are a lot of issues one can notice at plain sight:

  1. We have no way of scaling our system as new clients join. Thus, we will come to a point where we won’t be able to send all the shipments due for a given day.
  2. A client with slow response times or a large volume of requests to be sent will delay the clients that come after it.
  3. We are not leveraging from the ability of some clients to accept multiple requests at the same time.
  4. Every time we send a request we are wasting clock cycles by doing nothing until we receive a response.

Let’s try to tackle 3 and 4 first by using asyncio.

Squeezing our clock cycles with asyncio

Instead of waiting for a request’s response to arrive before starting sending a new one, we can use that time to send as many new requests as the client allows us to. For that, we will need to introduce our first flavor of concurrency.

In this case, the well-known Python built-in framework asyncio seems like the right tool for our needs. On top of it, we can use the aiohttp library, which enables us to send concurrent requests (httpx could also be a valid alternative).

Since asyncio has been around for a long time, there is a lot of material that can help you learn how to use it. Just to mention a few:

In a nutshell, asyncio allows us to declare and execute asynchronous functions called coroutines. By combining its primitives (and perhaps using some extra libraries) we can easily achieve that when a coroutine is waiting for the result of an I/O operation (e.g., sending an HTTP request), control is released so that another coroutine that finished waiting (if any) can resume its execution. The component that is constantly polling (or looping through) the pending coroutines to pick which one should gain control is called the event loop. Every time the keyword await is called inside a coroutine, control is given back to the event loop so that it looks for the next coroutine to resume execution.

In this way, asyncio provides us with a concurrent model which allows a single threaded process to advance several operations seemingly at the same time.

Going back to our example, let’s draft a possible solution using asyncio and aiohttp.

After creating a new instance of the ShipmentsSender class, we can start sending requests to a given client by calling run(). This will make the event loop give control to the main coroutine which will create concurrent tasks to send the requests. The number of tasks that can be created at a given time is limited by the max number of allowed requests, kept tracked by requests_counter.

When reaching the maximum number of tasks that can be created at that time (or if there are no messages left to be sent), the main coroutine releases control to the event loop by calling await. At that moment, the event loop can pick any of the created tasks to send the requests. Each of these tasks also releases control to the event loop when sending the request and waiting for the response, so that another task or the main coroutine can make some progress in the meantime.

Before exiting, the main coroutine makes sure that all other tasks have finished by calling _wait_for_all_pending_tasks.

Finally, ShipmentsSender exposes the terminate method in case it needs to be called to gracefully terminate the process. We could call this method inside a signal handler:

import signal
def function_running_shipments_sender():
shipments_sender = ShipmentsSender(...)
...
signal.signal(signal.SIGINT, shipments_sender.terminate)
...

With this simple example, and thanks to asyncio, we were able to enhance our program to make the most of our resources and be able to send several requests at the same time by introducing some concurrency at the Python process level.

Diagram for our solution with asyncio

Some final notes about asyncio that may be of interest:

  • You are not actually tied to use a single implementation of the event loop. If the one that comes by default doesn’t fit your needs for some reason, you have the choice to use a different one. One of the alternative event loops, mentioned in import asyncio: Learn Python’s AsyncIO, is uvloop, which claims to be 2x faster than the one that comes by default.
  • We need to account for the fact that our process may be terminated in the middle of its execution (for example, when a new release is deployed). When that happens we need to make sure our process can gracefully terminate by properly handling coroutines that haven’t finished their execution yet.
  • Even though a process using asyncio is single-threaded, we can still have situations similar to deadlocks where no pending coroutine can advance.
  • It can be a bit difficult to write unit tests where we need to mock aiohttp requests. There are some libraries, like aioresponses, that can help you with this task. Nevertheless, if for some reason the tools out there are not suitable for what you need to do, here’s a simple code snippet where I mock aiohttp requests in the message sender from the example above:

Introducing some parallelism with processes

Even though we have made some improvements in our system, clients are still served one after the other. As a consequence, a client’s execution impacts the ones that come after it.

In order to tackle that, we can introduce some parallelism by creating a new process for each client we need to serve.

The code for that can look like the following:

import subprocessclass ProcessSpawner:
...
def run(self):
while self.running:
if not self._can_spawn_a_new_process():
sleep(SECONDS_UNTIL_NEXT_RUN)
continue
command_to_execute = f‘python shipments_sender.py --url {url} ...’
p = subprocess.Popen(command_to_execute)

Also, we need to make sure that we do the proper clean up after a given process finishes. For that, we can keep track of the processes every time they are created and periodically poll them to see if they have finished. This will also be useful to keep the count of the processes that are still running and implement the _can_spawn_a_new_process method.

For that, we could extend the previous class in the following way:

import subprocessclass ProcessSpawner:
...
def run(self):
while self.running:
...
self.poll_running_processes()
p = subprocess.Popen(command_to_execute)
self.running_processes[p.pid] = p
def poll_running_processes(self):
for pid, process in self.running_processes.items():
return_code = process.poll()
if return_code is not None:
# process has finished, account for it…

Another possible approach to account for finished processes could have been listening and handling “child term” signals:

import subprocess
import os
class ProcessSpawner:
def __init__(self, ...):
...
signal.signal(signal.SIGCHLD, self._handle_child_term)
...
def _handle_child_term(self, signum, frame, *args, **kwargs):
os.waitpid(-1, os.WNOHANG)
# process has finished, account for it…

Even though this seems simpler than being constantly polling for all running processes, there are at least two drawbacks about this approach:

  • If we need to perform several tasks when a process finishes, we can end up having a lot of code in _handle_child_term. However, as a rule of thumb, I would avoid running complex operations on signal handlers. This is because whenever a signal is received, the Python program execution can be interrupted at any point and give control to the handler. Therefore, some unexpected race conditions may arise if the handler changes some state in the middle of, for example, writing something to Mongo.
  • In this sample code we are using os.waitpid to clean-up finished processes while we are using subprocess.Popen to create new ones. In my experience, I have had some issues when using different modules to manage processes. That is why I only used subprocess primitives in the original example.

After adding this enhancement, we are now able to serve different clients in parallel, making our system more fair, transparent and predictable. Also, we are better using our resources and improving throughput.

Diagram for our solution using processes

Horizontally scaling our system with kubernetes

Even though we now have the ability of serving each client in a different process, there are still some things we need to do on the scalability front.

If we want to support hundreds of clients with this version of the system, we would need to vertically scale by spawning hundreds of processes in a single server. This approach does not seem optimal at all.

What we could do instead is leveraging from kubernetes and spinning up or down different pods running our multi-process system depending on the scale we need at a given moment.

Diagram of our multi-pod solution

This last modification will require a new concurrency model at the architecture level so that the different components can interact with each other in a correct way. Since going though that model deserves a whole new post for itself, I will just pose some open-ended questions we should be asking ourselves when trying to build this new system:

  • How is the system going to choose the pod that is serving the next client?
  • How do we allow a given pod to communicate that it now has capacity to serve clients?
  • Which communication mechanism are we going to use between the different components of the system?
  • How do we monitor the different pods so that we don’t end up with a pod not sending any request?
  • And many more

Conclusion

Through these simple examples I shared some thoughts about working with concurrent systems using Python. We could also see that sometimes we need to structure a concurrent model in different application layers.

As we could see in this article, Python provides a lot of features to build a highly concurrent system, with their own pros and cons. There are some other languages, e.g. GO, that are more suited to implementing concurrent systems, something I would like to explore in a separate article in the future.

--

--