Car Wash Pattern: Parallelizing Non-Thread Safe and/or CPU-intensive Processes with Future Based Queue Centric Approach in Python

Bahadir · VLMedia Tech Blog · Mar 25, 2022 · 9 min read

If we have to parallelize code that is not thread-safe and/or has a time-consuming cold start, the approach explained here can be used as an effective solution. This method is particularly useful when you need to turn heavy-workload modules into a web service.

The title seems more appropriate for scholarly literature, but the content will not be like that, don't worry. Besides, in my opinion, trying to explain a parallelism subject in a formal way is a separate pain for both the writer and the reader; the best approach is to avoid parallelism unless it is truly necessary. Yes, for your code, its execution, and most importantly your mental health, you should avoid concurrency/parallelism if possible. If you can't avoid it, it's a good idea to choose a language designed especially for parallelism, like Go or Rust (think twice before choosing Rust, and please read what's written against Rust, not just what's written by its fans).

David Baron’s popular note in Mozilla’s San Francisco office says: “Must be this tall to write multi-threaded code”*

The concept of parallelism, which is known to be one of the weak points of Python, is not actually an insurmountable obstacle for some well-defined problems. The important thing is to express your situation as one of these problems. We will be discussing one such problem, which can be explained with a car wash analogy. This will give us an environment that allows CPU-bound and/or non-thread-safe processes to run with controlled parallelism later on.

But first, as in every article that touches on the subject of parallelism in Python, it is obligatory to mention, albeit briefly, the thread ambiguity originating from the GIL.

Disambiguation of Threads and Pythonic Threads

I will not go into the depths of concurrency, parallelism, threads, and processes in this article, because I myself refrain from reading articles that start with the creation of the universe. But a quick recap on concurrency and parallelism might be helpful here. If you are already familiar with these concepts, you can proceed to the next section.

Concurrency is the composition of independently executing computations, and concurrency is not parallelism: concurrency is about dealing with lots of things at once but parallelism is about doing lots of things at once. *

If this explanation I quoted (it originates from Rob Pike's well-known "Concurrency is not Parallelism" talk) was not sufficient, please review those topics for the sake of understanding the rest of the article.

While concurrency can be provided by a single thread (as in NodeJS or Java NIO), parallelism can only be provided by multiple threads due to its simultaneous nature. Looking at the general definitions, we can roughly say that threads are units of execution inside processes: every thread is part of a process, every process has at least one thread, and normally you can run many threads in parallel within a process. But in Python (more precisely, in CPython), as a consequence of the Global Interpreter Lock (introduced because the interpreter itself is not thread-safe), you cannot run threads in parallel, only concurrently. So in the world of Python, threads mean concurrency, not parallelism. That's why we reach for processes, not threads, when we need parallelism in Python.
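To see the difference in practice, here is a small, hypothetical benchmark sketch (not from the original post): the same CPU-bound function is dispatched to a thread pool and to a process pool. Under CPython's GIL, only the process pool should give a real speedup; exact numbers depend on your machine.

```python
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def burn_cpu(n):
    # Pure-Python, CPU-bound work: the GIL serializes this across threads.
    total = 0
    for i in range(n):
        total += i * i
    return total

def timed(executor_cls, label):
    with executor_cls(max_workers=4) as ex:
        start = time.perf_counter()
        list(ex.map(burn_cpu, [5_000_000] * 4))
        print(f"{label}: {time.perf_counter() - start:.2f}s")

if __name__ == "__main__":
    timed(ThreadPoolExecutor, "threads")     # little to no speedup: GIL
    timed(ProcessPoolExecutor, "processes")  # real parallelism
```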

But as you will remember, threads are light and processes are heavy. Spawning and killing too many processes means heavy overhead, so it's wise to reserve processes for CPU-intensive parts that survive for the whole program runtime. I think this is a peculiarity of Python that is often overlooked.

Another important nuance is the inter-process serialization/deserialization overhead. Threads access the memory of the process they belong to directly, but for data sharing between processes, Python uses a serialize/deserialize mechanism (pickling). This of course adds overhead, and it creates a serializability requirement for everything you pass across.
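A quick illustration of that requirement: arguments and results sent to a worker process are pickled behind the scenes, so anything unpicklable (a lambda, an open socket, a loaded model handle) cannot cross the process boundary.

```python
import multiprocessing as mp

def square(x):
    return x * x  # top-level functions pickle fine (by reference)

if __name__ == "__main__":
    with mp.Pool(2) as pool:
        print(pool.map(square, range(5)))      # arguments/results are pickled
        # pool.map(lambda x: x * x, range(5))  # fails: lambdas can't be pickled
```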

Queue Based Parallel Processing

Queues are particularly useful as cross-process sharing tools. Go uses the channel mechanism for this purpose, and channels are wonderful; you can even nest them, like a channel of channels. I have to stop here or my Go flattery may take over the rest of the article.

Queues (from the multiprocessing package) regulate the most critical parts of the execution. You can see many examples around built with an approach similar to the flow below.

On the left, we see the work packages coming into the system. These are transferred to a thread-safe queue, providing an FCFS (first come, first served) structure. Then, on the right, we see the processes that take jobs from this queue in order and process them. In this example, you can think of a work package as a letter you have written or a parcel you will ship. When you hand your package to the post office, it is put in a queue (at least we hope so) and sent out for distribution. Distribution takes place in parallel thanks to the many couriers or postal workers. And you don't wait in front of the post office for the recipient to write back after you deliver your package. You go home; if someone sends you mail, you receive it, and if not, you move on with your life. We can call this action fire-and-forget (or, more technically, publish, as in the pub-sub pattern).
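As a minimal sketch of this fire-and-forget flow (the names and sizes here are illustrative, not from the original code): producers put work packages on a multiprocessing.Queue and walk away, while a few worker processes consume them in FCFS order.

```python
import multiprocessing as mp

def worker(job_queue):
    while True:
        job = job_queue.get()
        if job is None:          # poison pill: time to go home
            break
        print(f"delivering {job}")

if __name__ == "__main__":
    job_queue = mp.Queue()
    workers = [mp.Process(target=worker, args=(job_queue,)) for _ in range(3)]
    for w in workers:
        w.start()
    for i in range(10):
        job_queue.put(f"package-{i}")  # fire and forget: no reply expected
    for _ in workers:
        job_queue.put(None)            # one pill per worker
    for w in workers:
        w.join()
```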

Implementing Queue Centric, Future Based Request-Reply Supported Parallel Processing System or Simply: Car Wash Pattern

But what if we have to wait for a response after we submit our request? At this stage, I deliberately used the terms request and response (or reply), because almost all web services work with this pattern: users send us a request and then wait for a response from us. Most of the web undoubtedly works like this. So how can we extend the fire-and-forget flow above to work with the req-rep pattern?

We can explain this new problem with the car wash analogy. I don't know if this service works the same way all over the world, but let's assume it works like this: when you take your car to a washing service, the person who greets you accepts your washing request if the waiting queue is short enough for your car to be finished that day. Then you hand over your car, get a token that probably has your car's license plate on it, and start waiting. If there is no other vehicle in line ahead of you, an available wash worker will start working on your vehicle. After the work is finished, you receive your car by handing over the token. You requested to have your car washed, and you got your washed car back in response. Meanwhile, many workers operated in parallel through a queue system inside.

In this analogy, we can match the washing action to the job, the car to the content (payload) of the job, the wash worker to the process (we assume that only one person washes each car, for the sake of simplicity), the line where you leave your car to the queue, and the token in your hand to the Future. You still get to be the user.

At this very stage, it would be nice to recall the Future topic. Futures are akin to the promises we are accustomed to from JavaScript. These objects mediate getting the result of work you have delegated: in exchange for the work you hand over to me, I give you a Future object so you can go to sleep. When I'm done, I report the result to this object, and it wakes you up and notifies you. But in Python you can't exchange Future objects between processes, because these objects are not serializable, and serialization, as you remember, is how Python communicates between processes. In fact, we don't have to transfer them. We can keep each Future in its own process, give it an ID, and pass that ID around instead of the Future.
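Here is a tiny demonstration of that sleep-and-wake behavior using concurrent.futures.Future directly (the thread and the sleep are just stand-ins for real work):

```python
import threading
import time
from concurrent.futures import Future

future = Future()  # the "token" you get when you hand over your car

def wash_car():
    time.sleep(1)                     # scrubbing...
    future.set_result("a clean car")  # reporting the result wakes the waiter

threading.Thread(target=wash_car).start()
print(future.result())  # blocks until set_result is called
```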

Car Wash Pattern

The flow I've drawn above roughly explains the whole solution.

First of all, every incoming request is converted into a Job item. This item contains a JobID and the payload required for the job. Before the job is queued, a Future associated with it is created and saved in a map (a dict). Then the job is queued, and the current thread yields, waiting on this Future's result. Meanwhile, the processors (P1, P2, P3) waiting on the queue receive the job, process it, and put the result on the results queue. Each result item contains the JobID of the relevant job and the result payload. A loop running in a thread of the startup process constantly listens to the result queue; when a result arrives, it finds the relevant Future via the Future map and sets the result payload on it, which wakes the thread waiting on that Future. It sounds confusing in words, but the important thing is that communication between processes happens only through the queue structures, while the Futures stay inside the initial process.

Now let's look at the implementation, and everything will become completely clear. The first file implements the pattern and the second one is an example usage.
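The original post embeds the two files as gists. Since they don't render here, below is a minimal sketch of what the first file, the QP (Queue Processor), might look like based on the description above; the names referenced in the article (`start`, `_work`, `__listen_results`, and the worker's `load`/`run`) are kept, while the remaining details are illustrative assumptions.

```python
import multiprocessing as mp
import threading
import uuid
from concurrent.futures import Future

class QueueProcessor:
    def __init__(self, worker_cls, num_workers=3, max_queued_jobs=100):
        self._worker_cls = worker_cls
        self._num_workers = num_workers
        self._job_queue = mp.Queue(maxsize=max_queued_jobs)  # bounded: enables back-pressure
        self._result_queue = mp.Queue()
        self._futures = {}  # JobID -> Future; lives only in the startup process

    def start(self):
        # Spawn the worker processes; each one builds its own worker instance.
        for _ in range(self._num_workers):
            mp.Process(
                target=self._work,
                args=(self._worker_cls, self._job_queue, self._result_queue),
                daemon=True,
            ).start()
        # The listener thread stays in the startup process, next to the Future map.
        threading.Thread(target=self.__listen_results, daemon=True).start()

    @staticmethod
    def _work(worker_cls, job_queue, result_queue):
        worker = worker_cls()  # built inside the process: no pickling, no thread-safety needed
        worker.load()          # one-time cold start (models, connections, ...)
        while True:
            job_id, payload = job_queue.get()
            result_queue.put((job_id, worker.run(payload)))

    def __listen_results(self):
        while True:
            job_id, result = self._result_queue.get()
            future = self._futures.pop(job_id, None)  # removed before it is set (see the Caveat below)
            if future is not None:
                future.set_result(result)

    def process(self, payload, timeout=None):
        job_id = uuid.uuid4().hex
        future = Future()
        self._futures[job_id] = future
        try:
            # put() with a timeout raises queue.Full when the system is saturated
            self._job_queue.put((job_id, payload), timeout=timeout)
            return future.result(timeout=timeout)  # yield until the listener sets the result
        finally:
            self._futures.pop(job_id, None)  # don't let abandoned Futures pile up
```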

The code above does the work I mentioned. When you run Usage.py, it will randomly simulate successful and timed-out operations by varying the operation durations.
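Again, the original Usage.py gist isn't reproduced here, but based on that description it might look roughly like this (assuming the sketch above is saved as queue_processor.py; the worker below is purely illustrative):

```python
import random
import time
from concurrent.futures import TimeoutError

from queue_processor import QueueProcessor  # the sketch above

class SleepyWorker:
    def load(self):
        time.sleep(2)  # pretend to load a heavy model: runs once per process

    def run(self, payload):
        time.sleep(random.uniform(0.1, 2.0))  # some jobs will exceed the timeout
        return payload.upper()

if __name__ == "__main__":
    qp = QueueProcessor(SleepyWorker, num_workers=3)
    qp.start()
    for i in range(10):
        try:
            print(qp.process(f"job-{i}", timeout=1.0))
        except TimeoutError:
            print(f"job-{i} timed out")
```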

As you can see, there are some extra things that the implementation of QP (Queue Processor) does:

  • Timeout: Python's queues come with a timeout feature. By exposing this feature on the job and result queues, we get a back-pressure mechanism: we can warn the party sending us a job when there is more work than we can handle. The system in front of us can then scale us up in these situations, or, if that is not possible, return a "system is busy right now!" warning to the user. Otherwise, the user could wait for hours for her job to finish. (See the back-pressure sketch after this list.)
  • Dynamic instantiation of the worker class: We talked about some of Python's disadvantages, but the painless dynamic class instantiation I'll mention here lets us build a much more flexible and generic system. As you can see in the `_work` method, we take the class implemented by the user and instantiate it ourselves in each process. Thus, the instance is isolated inside its process, and the implemented class does not need to be thread-safe and/or serializable: since it is created in the very process where it will be used, inter-process transfer is not in question.
  • Separated cold start and work phases: As you can see, the worker class has two methods that need to be implemented. The `load` method runs only once during the QP's lifecycle, when qp.start() is called. The `run()` method is called each time a task is received. Therefore, we can do all the costly loading operations the work requires inside `load`, and during operation simply access the attributes that have already been loaded.
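As promised, a small sketch of how a caller (say, a web handler) might exploit those timeouts for back-pressure; it builds on the hypothetical QueueProcessor sketch above:

```python
import queue
from concurrent.futures import TimeoutError

def handle_request(qp, payload):
    try:
        return qp.process(payload, timeout=1.0)
    except queue.Full:
        # the job queue stayed full for the whole timeout: shed load
        return "system is busy right now!"
    except TimeoutError:
        # the job was accepted but took too long to complete
        return "your job timed out, please try again"
```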

Caveat: There is also a point that could be considered a deficiency in this implementation. As you can see when you examine the `__listen_results` method, when a result is received, the Future connected to it is removed from the map and only then is the result set. In this case, if nobody has started waiting on the Future by that moment, anyone who tries to wait on it afterwards will not find it. This happens with jobs that finish before we even get around to waiting for them. You can think of it as the car wash finishing your car before giving you your token. If such a scenario is possible for you, there are several ways to handle it. For example, Futures could be removed only when the waiting side is done with them; in that case, care must be taken that Futures nobody waits on don't accumulate and clog the system. To prevent that, a mechanism could be added that automatically removes Futures that have sat unclaimed for a certain period of time.

I hope the article is useful to you. If you see anything missing or wrong, or have any questions, please feel free to write.
