How We Migrated from Python Multithreading to Asyncio

Dor Indivo
6 min read · Jan 12, 2022


Background

At Lemonade, we seek to improve the insurance experience by incorporating Machine Learning into many aspects of the process, from onboarding users with our chat experience, Maya, to predicting catastrophes. In practice, we currently have dozens of ML models in production, all of which significantly rely on our in-house feature store for real-time inference and batch training.

The feature store service is exactly what we aim to improve.

Feature…what?

In this context, a feature is a data point for a machine learning model. If you’re interested in understanding a bit more about machine learning platforms, this is a great resource to get you started on feature stores.

Our feature store, like many others, was designed and developed to increase our Data Science velocity by addressing the below challenges:

  1. Feature (data) parity during model training, batch, and real-time predictions
  2. Feature (data) reuse for Data Scientists on various teams

Architecture

The feature store service was written in pure Python (version 3.7) on the FastAPI web framework, along with a variety of auxiliary packages.

A single HTTP call to the service can request numerous CPU- or I/O-bound features. In the past, both FastAPI and our internal multi-feature concurrency implementation used threading for simplicity.

Let’s take a closer look at how we used threading:

In this example, a model requests three different features (feature1, feature2, feature3). Our web framework (FastAPI) handles the request behind the scenes using a thread pool, and we use an internal thread pool to calculate the features concurrently by requesting data from upstream services.
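Conceptually, the threaded version looked something like the sketch below (the function names and the upstream URL are illustrative, not our actual code):

from concurrent.futures import ThreadPoolExecutor
from typing import List

import requests  # blocking HTTP client used by the threaded implementation


def calculate_feature(feature_name: str) -> dict:
    # Blocking I/O: fetch the feature value from a (hypothetical) upstream service.
    response = requests.get(f"https://upstream.internal/features/{feature_name}")
    response.raise_for_status()
    return response.json()


def calculate_features(feature_names: List[str]) -> List[dict]:
    # The internal thread pool fans out one blocking call per requested feature.
    with ThreadPoolExecutor(max_workers=len(feature_names)) as pool:
        return list(pool.map(calculate_feature, feature_names))


values = calculate_features(["feature1", "feature2", "feature3"])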

So why migrate?

The feature store service has evolved into a key component of many customer flows, and over time we realized that the current architecture would be difficult to maintain as demand grows.

Why? There are several compelling reasons:

  1. Performance — As noted previously, we have dozens of models in production, and a reduction in latency will have an impact on all of them, and in turn our customers.
  2. Scale — The service uses hundreds of threads for concurrency. We frequently ran into problems such as "can't start new thread" errors from the host system, making even horizontal scaling extremely difficult.
  3. Resource consumption — the service utilized up to 18GB of RAM to handle our current workload.

Asyncio to the rescue

Compared to the thread-based concurrency paradigm, the (mostly) I/O-bound feature store workload is a great fit for an event loop.

I'm not going to explain what asynchronous programming is, but if you're familiar with the topic you've undoubtedly heard of asyncio, Python's standard library implementation.

Starting from the bottom

The main idea was to start with the infrastructure code, for example making our HTTP client support asynchronous methods (async get, async post, and so on).

Once we were done with the HTTP client, we could move on to the feature resolver (where the majority of the complicated code lives), and only then move to the top and make the FastAPI request-handling functions asynchronous.
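As a rough illustration of that first step, here is a minimal async HTTP client wrapper, assuming aiohttp as the underlying library (the class and method names are illustrative, not our actual client):

import aiohttp


class AsyncHttpClient:
    def __init__(self, base_url: str):
        self._base_url = base_url
        self._session = None

    async def _get_session(self) -> aiohttp.ClientSession:
        # Create the session lazily, inside the running event loop, and reuse it.
        if self._session is None:
            self._session = aiohttp.ClientSession()
        return self._session

    async def get(self, path: str, **kwargs) -> dict:
        session = await self._get_session()
        async with session.get(self._base_url + path, **kwargs) as resp:
            resp.raise_for_status()
            return await resp.json()

    async def post(self, path: str, **kwargs) -> dict:
        session = await self._get_session()
        async with session.post(self._base_url + path, **kwargs) as resp:
            resp.raise_for_status()
            return await resp.json()

    async def close(self) -> None:
        # Release the underlying connection pool on shutdown.
        if self._session is not None:
            await self._session.close()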

To keep production stable throughout this long migration, we divided it into three phases (in each phase we migrate a group of features from synchronous to asynchronous and validate performance):

  1. Features that are based on low-latency services
  2. Features that are based on medium/high latency services
  3. Cache

This is a fantastic idea, but there is one snag:

How do we handle asynchronous and synchronous models at the same time without degrading performance?

One coroutine to rule them all

A single dispatching coroutine allows us to calculate some features asynchronously, while others are calculated concurrently using the thread pool!
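A minimal sketch of such a coroutine might look like this, assuming each feature object exposes a calculate method that is either a plain function or a coroutine function (an illustration, not our production code):

import asyncio
import inspect


async def calculate_all(features):
    # Coroutine-based features run directly on the event loop; synchronous
    # (CPU-bound or not-yet-migrated) features are offloaded to the default
    # thread pool, so both kinds are calculated concurrently.
    loop = asyncio.get_running_loop()
    tasks = []
    for feature in features:
        if inspect.iscoroutinefunction(feature.calculate):
            tasks.append(feature.calculate())
        else:
            tasks.append(loop.run_in_executor(None, feature.calculate))
    return await asyncio.gather(*tasks)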

Lessons learned along the way

Pay attention to warnings

As always, warnings can help you find bugs in your code. In our case, pay attention to this specific warning:

RuntimeWarning: coroutine 'my_lovely_coro' was never awaited
  my_lovely_coro()

It means that you probably forgot to use the await keyword. Finding this error quickly when refactoring a large amount of complex code can be quite valuable.
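A minimal reproduction of that mistake (the coroutine name mirrors the warning above and is purely illustrative):

import asyncio


async def my_lovely_coro():
    return 42


async def main():
    my_lovely_coro()                # bug: creates a coroutine object but never runs it,
                                    # which triggers the "was never awaited" RuntimeWarning
    value = await my_lovely_coro()  # correct: the coroutine actually runs
    print(value)


asyncio.run(main())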

Be aware of loops

When migrating synchronous code to asynchronous code, we should keep performance in mind, especially when it comes to loops.

values = [feature.calculate() for feature in features]

In this example, we have a simple list comprehension that calls the calculate method for each feature in the list.

To run this code asynchronously, we can simply add the await keyword before the call to this coroutine (assuming we have already converted calculate into a coroutine):

values = [await feature.calculate() for feature in features]

The issue with this approach is that it still runs in a synchronous manner. We wait for each feature calculation to finish before moving on to the next iteration.

We can run the calculations concurrently with asyncio.gather:

values = await asyncio.gather(*[feature.calculate() for feature in features])

Keep track of your threads

Using the threading library’s active_count is a great approach to keep track of the number of active threads (in our case, this number should be 1):

print(f"Number of threads: {threading.active_count()}")

This method was useful in identifying cases when we were still creating unnecessary threads, such as when handling HTTP requests.

In theory, converting the FastAPI request-handling functions to async should result in a significant reduction in the number of threads, because a single thread (the one running the event loop) now handles requests.

Here's the catch: all of your dependencies (e.g. get_put_features_service) must be marked as async too, otherwise FastAPI will continue to use the thread pool for concurrency!
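As a hedged illustration of that point (the route and dependency names here are made up, with get_features_service standing in for a dependency like the get_put_features_service mentioned above):

from fastapi import Depends, FastAPI

app = FastAPI()


class FeaturesService:
    async def get_features(self, model_name: str) -> dict:
        # Resolve the model's features asynchronously (details omitted).
        return {"model": model_name, "features": {}}


async def get_features_service() -> FeaturesService:
    # Because the dependency is async, FastAPI resolves it on the event loop
    # instead of dispatching it to the thread pool.
    return FeaturesService()


@app.get("/features/{model_name}")
async def get_features(
    model_name: str,
    service: FeaturesService = Depends(get_features_service),
) -> dict:
    return await service.get_features(model_name)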

Inheritance ❤️ async

The code can be structured in an inheritance-based manner to accommodate both async and sync models.

We have a basic implementation of Feature, which has a single method, calculate, that computes the value of the specific feature.

FeatureProviderService is responsible for getting the feature value from the right service.

There are two features, A and B. The first can be calculated asynchronously by the AsyncFeatureProviderService, while B is calculated synchronously by the FeatureProviderService.
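A rough reconstruction of that structure (the class names follow the description above, but the bodies are illustrative rather than our actual implementation):

class FeatureProviderService:
    def get_value(self, feature_name: str):
        ...  # blocking call to an upstream service


class AsyncFeatureProviderService:
    async def get_value(self, feature_name: str):
        ...  # non-blocking call to an upstream service


class Feature:
    name = "base"

    def calculate(self):
        raise NotImplementedError


class FeatureA(Feature):
    name = "feature_a"

    def __init__(self, provider: AsyncFeatureProviderService):
        self._provider = provider

    async def calculate(self):
        # Asynchronous feature: awaited directly on the event loop.
        return await self._provider.get_value(self.name)


class FeatureB(Feature):
    name = "feature_b"

    def __init__(self, provider: FeatureProviderService):
        self._provider = provider

    def calculate(self):
        # Synchronous feature: the dispatching coroutine runs it in the thread pool.
        return self._provider.get_value(self.name)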

This implementation gives us more flexibility when it comes to running features concurrently:

  1. It's simple to migrate a feature — when feature B is implemented asynchronously, we only need to add the async keyword to its calculate method.
  2. We have control over how a feature is calculated — we mark a feature as synchronous (no async keyword on calculate) if it is CPU-bound, and the thread pool will run it concurrently.

Use a persistent client

We discovered that when we switched from a boto3-based cache client to an async aioboto3 cache client, the performance of the feature store service suffered significantly.

The main reason was that, instead of reusing a single persistent object, we were creating multiple client and table objects.

There are numerous ways to solve this problem, such as creating a global client, but before jumping into implementation, please read your client’s documentation first. In most cases, you’ll find the right answer for you.

TL;DR: Making the client a class member solved this problem in our scenario.
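For example, here is one way to hold a single persistent client as a class member, assuming a DynamoDB-backed cache accessed through aioboto3 (the exact setup depends on your aioboto3 version, and the names are illustrative):

import aioboto3


class CacheClient:
    def __init__(self, table_name: str):
        self._table_name = table_name
        self._session = aioboto3.Session()
        self._resource_cm = None
        self._table = None

    async def start(self) -> None:
        # Enter the resource context manager once (e.g. on application startup)
        # and reuse the same table object, instead of creating a new client
        # for every request.
        self._resource_cm = self._session.resource("dynamodb")
        resource = await self._resource_cm.__aenter__()
        self._table = await resource.Table(self._table_name)

    async def get(self, key: str):
        response = await self._table.get_item(Key={"key": key})
        return response.get("Item")

    async def stop(self) -> None:
        # Close the underlying client cleanly on application shutdown.
        if self._resource_cm is not None:
            await self._resource_cm.__aexit__(None, None, None)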

Asyncio and pytest

The async code was written as coroutines, making it slightly more difficult to test with standard testing tools, but we found some great libraries that made testing really simple.

To mark tests as asynchronous, we used the pytest.mark.asyncio decorator from the pytest-asyncio library.

We also needed to mock coroutines and other async objects, so we used the asyncio-test library to do so.
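A minimal example of such a test with pytest-asyncio (the coroutine under test and the fake provider are illustrative; a hand-rolled fake stands in for a mocking library here):

import pytest


async def calculate_feature(provider, feature_name):
    # Coroutine under test: resolves a feature value through an async provider.
    return await provider.get_value(feature_name)


class FakeProvider:
    async def get_value(self, feature_name):
        # Stands in for the real async provider during the test.
        return 7


@pytest.mark.asyncio
async def test_calculate_feature():
    assert await calculate_feature(FakeProvider(), "feature_a") == 7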

Benchmarking and results

The procedure

To make the benchmark as precise and reliable as possible, we followed this process:

  1. Run all JMeter performance tests on the same machine.
  2. Mimic diverse requests to reproduce a real-world workload.
  3. Use Hoverfly to simulate dependencies — so that production services are not impacted.

Results:

We saw impressive results in both performance and resource utilization:

  1. Latency was reduced by up to 40%, which has a significant impact in production (for example, one of our core models saw a clear drop in latency after we deployed phase 1)
  2. RAM usage was reduced by up to 70%
  3. Throughput increased by up to 25%

With these excellent results, the feature store service can easily handle increased load and scale horizontally more effectively.
