Concurrent Inference
Introduction
This is a post about getting multiple models to run on the GPU at the same time using PyTorch and the torch.multiprocessing module.
This can be useful when serving a model as an API, where multiple instances of the same model run inference concurrently on a single GPU. Alternatively, one could also use TorchServe.
This post is divided into 4 sections:
- Context — Qualitative Evaluation of Trackers, a skippable section that provides context in the form of a use case.
- Two Methods for Concurrent Inference, a section where I illustrate two ways to go about concurrent inference.
- A Simple Use Case, the main section which contains the code on how to perform concurrent inference.
- Footnotes, additional stuff that may be of help.
I got a bit carried away making diagrams for this post; anyway, here ☝️ is the legend for the different components I have used in the diagrams. In all the diagrams the flow of data is from left to right unless stated otherwise.
Context — Qualitative Evaluation of Trackers
You can skip this section; I have added it to provide context, but it isn't required to understand concurrent inference.
Background
I was working on a system that involved multi-object trackers, i.e. trackers that can follow multiple objects, such as people, over the course of a video.
To evaluate the tracker I needed to annotate the input video with the output of the tracker, i.e. bounding boxes or segments with a color assigned to each ID.
Since I could run the tracker in two modes, 1. selective tracking, where I can select the IDs I want to track, and 2. non-selective tracking, there were three output videos that needed to be annotated separately: one with just the detector's output and one for each of the tracker modes.
One of the things being drawn onto the output frames of each of the three output videos was instance segments, and a piece of code used in that particular function (mask.filter(ImageFilter.FIND_EDGES) for drawing edges) was a massive bottleneck.
This caused processing of the test video (~4 minutes at 15 fps) to take around an hour and forty minutes for the video with the most annotations. This was bad: there were several network components to test along with the two modes of the tracker, so every time I wanted to evaluate the tracker it would have taken more than 3 hours.
Wanting to speed up qualitative evaluation, as opposed to relying solely on metrics (for which annotations wouldn't have been required), I decided to use concurrency, which led me down a rabbit hole involving the GIL and other such friends and foes of concurrency in Python programming lore.
The Setup
The detector was independent of the trackers and the trackers were independent of each other, so it made sense to have each of the trackers and the detector on separate processes.
And since I was just learning the concept I got carried away and had the frame reader on a separate process too, so there were 6 different processes running including the __main__ process, which was fine because it was being run on a hex-core Ryzen. The number of extra processes could have been limited to 3 by keeping the reader and the detector on the __main__ process.
The data generation and consumption speeds were different for each process, so IPC called for buffers, and since my PC isn't a Google datacenter, this excursion into over-engineering called specifically for bounded buffers (are unbounded buffers used anywhere? seems like a perilous choice). Buffers were maintained using the mp.Queue class.
- The input video [S] was read frame by frame by a reader [R] process which put the frames into a buffer.
- The detector process [D] obtained frames from the buffer, ran inference on them, then copied the output [C] into three buffers, one for each of the output videos.
- The annotator process [A] just annotated on the basis of the detector output to generate the output video [O3]; the other two processes [TA1,TA2] ran tracking and then annotations to generate their outputs [O1, O2].
Time Savings Due To Concurrent Inference
I had anticipated the speed-up to be much more than what was obtained; this is because I had not taken into account the bottlenecking caused by filled buffers. To illustrate this I have included this ☝️ time-taken plot.
In the plot, ignore_new=True is the selective tracking and annotation process; the number of annotations for it is fewer than for ignore_new=False, which is the non-selective tracking and annotation process, and for detection anno, which annotates the detection output. Each curve pertains to one of the five processes, excluding __main__.
The initial increase in the blue curve is because the reader-to-detector buffer fills up, which is caused by the detection-time bottleneck. The max size of all the buffers was set to 512, hence the first slowdown occurs slightly after frame 500.
The second increase in blue and the increase in yellow are because the tracker-annotator processes (green and purple) are slow and cause the buffers to fill up. This also causes red to increase, because detection can't occur as quickly anymore due to the slowdown in yellow.
This initial phase, roughly the first ~1500 frames, is like the spool-up time of the processes; this wouldn't happen with unbounded buffers: all the curves would be flat and the task wouldn't complete because of an out-of-memory error.
The time taken for all three annotation processes together was an hour and forty minutes, which was the time taken by the most time-consuming annotation process alone; this still cut the total processing time roughly in half.
Since the code written for my use case is a bit verbose for a blog post, in the sections below I'll explain (with code) a simpler use case with a single producer and multiple consumers.
Note: In concurrency parlance a producer is one that generates data and a consumer is one that processes the generated data.
Two Methods for Concurrent Inference
Batched Concurrency
Batch multiple requests and execute them using a single model instance at certain intervals.
This is what occurs when a model is trained: the DataLoader loads items from the Dataset and batches them into a single Tensor before it is sent to the model.
The disadvantage of this is that there is waiting involved, either until the batch fills up with samples or until a timeout occurs.
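As a rough illustration of this mode (not code from this post), here is a minimal sketch of a batching worker: requests are collected from a queue until the batch is full or a timeout expires, then run through a single model instance. The names model and request_queue, the batch size and the timeout value are assumptions, and each request is assumed to be an image tensor of the same shape.

```python
# A minimal sketch of batched concurrency: collect requests until the batch is
# full or a timeout expires, then run one forward pass for the whole batch.
import queue
import time

import torch

def batched_worker(model, request_queue, batch_size=8, timeout=0.05):
    model.eval()
    with torch.no_grad():
        while True:
            batch, deadline = [], time.monotonic() + timeout
            # Wait until the batch fills up or the timeout expires.
            while len(batch) < batch_size and time.monotonic() < deadline:
                try:
                    remaining = max(0.0, deadline - time.monotonic())
                    batch.append(request_queue.get(timeout=remaining))
                except queue.Empty:
                    break
            if not batch:
                continue
            outputs = model(torch.stack(batch))  # single forward pass for the batch
            # ... route each row of `outputs` back to the request that produced it ...
```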
Multiprocess Concurrency
Run multiple instances of a model, each on a separate process, with each instance running inference on a batch size of one.
If the resources allow for it, this form of concurrency gives quicker response times: since no process has to wait for batching, each model instance can perform inference as soon as the data is obtained.
Actuality is a Combination of Both
Ideally, to maximize resource usage and throughput, both modes would be combined, with multiple instances of the model running inference on batch sizes greater than one. This allows for better usage of GPU memory, because PyTorch allocates at least ~495 MiB to an individual process even if only a (1,)-dimensioned tensor is cast onto or created on the GPU.
In the simple use case I will be concentrating on Multiprocess Concurrency to increase GPU utilization.
A Simple Use Case
To showcase multiprocessing I will be explaining a simple setup where a process [R] reads images from a folder [Fo] and multiple detection processes [D#] are used to obtain the class-wise count of objects in the images and write it to a file [Fi].
It is a single-producer, multiple-consumer situation.
Aside 1: Message Passing Structures
In this use case two message-passing structures have been used: queues and events. Queues are implemented on top of pipes; both pipes and queues serialize (pickle) objects before sending, so only picklable objects can be transmitted, else a TypeError will be raised.
Event
- An event is used for signalling by setting and unsetting an underlying boolean variable.
- Any process that the Event object is passed to can set and unset the object's internal boolean using .set() and .clear()
- It can also be polled using .is_set() and used for blocking using .wait(...)
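A small, self-contained example of Event signalling between two processes; the worker function and the half second of pretend work are just for illustration.

```python
import multiprocessing as mp
import time

def worker(done_event):
    time.sleep(0.5)           # pretend to do some work
    done_event.set()          # signal the parent that the work is finished

if __name__ == "__main__":
    mp.set_start_method("spawn")
    done = mp.Event()
    p = mp.Process(target=worker, args=(done,))
    p.start()
    print(done.is_set())      # poll: almost certainly False right after starting
    done.wait()               # block until the worker calls .set()
    print(done.is_set())      # True
    p.join()
```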
Pipe
- A pipe is used to transfer messages between two processes; it shouldn't be used when there are multiple producers and/or multiple consumers, because data may get corrupted if two processes read from or write to the same end of the pipe at the same time.
- The pipe can be simplex or duplex. Calling Python's Pipe(...) returns the two endpoints of the pipe (a pair of Connection objects); if it is duplex then both ends can be written to and read from using .send(...) and .recv()
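A minimal example of a duplex Pipe between two processes; the child simply echoes back an upper-cased version of whatever it receives.

```python
import multiprocessing as mp

def child(conn):
    msg = conn.recv()          # blocks until the parent sends something
    conn.send(msg.upper())     # duplex pipe, so the child can send back
    conn.close()

if __name__ == "__main__":
    mp.set_start_method("spawn")
    parent_conn, child_conn = mp.Pipe(duplex=True)  # the two Connection endpoints
    p = mp.Process(target=child, args=(child_conn,))
    p.start()
    parent_conn.send("hello")
    print(parent_conn.recv())  # prints "HELLO"
    p.join()
```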
Queue
- Python's multiprocessing module has 3 types of queues available: SimpleQueue, Queue and JoinableQueue.
- The SimpleQueue is unbounded and has no option for a non-blocking .get(), but it has .empty() which can be used for polling.
- The Queue has a lot of additional functionality, such as non-blocking .put(...) and .get(...) calls that can also be used in blocking mode with a timeout.
- The JoinableQueue has all the functionality of Queue along with .join(), which blocks until all the items in the queue have been processed, as indicated by calling .task_done() after every call to .get(...)
All three queues can be used with multiple producers and consumers.
Both Queue and JoinableQueue use a background "feeder" thread for pickling items and putting them into the underlying pipe that backs the queue; this can cause problems if not handled properly.
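A toy JoinableQueue example showing the .task_done()/.join() handshake described above; the sentinel value None and the queue size of 8 are arbitrary choices for the sketch.

```python
import multiprocessing as mp

def consumer(q):
    while True:
        item = q.get()             # blocking get
        if item is None:           # sentinel: nothing more will arrive
            q.task_done()
            break
        print(f"processed {item}")
        q.task_done()              # mark this item as handled

if __name__ == "__main__":
    mp.set_start_method("spawn")
    q = mp.JoinableQueue(maxsize=8)   # bounded, unlike SimpleQueue
    p = mp.Process(target=consumer, args=(q,))
    p.start()
    for i in range(5):
        q.put(i)
    q.put(None)
    q.join()                       # blocks until every put item has a matching task_done
    p.join()
```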
The Producer Function
The producer function has two primary tasks: 1. reading images from the folder, and 2. putting the images into the queue.
What the code is doing (a sketch is given after this list) is:
- Creating a list of image paths having .jpg as their extension using Path.
- Popping a path from the list if there is space in the queue, then opening it using PIL.Image, after which transform is used to convert it into a tensor.
- This transformed image is put onto the queue along with the image_path for logging purposes.
- Along with the queue, an event is used to indicate that all the images have been read, by calling event.set()
- If the queue has no space the process will wait for some time and then check again. Here time.sleep(...) is used to prevent the process from checking the queue at a high frequency, which would lead to more CPU utilization; alternatively queue.put(...) can be called directly, since by default it is a blocking call, i.e. the process will wait until there is space in the queue to put an item.
- queue.join() blocks the process from continuing until all the items in it have been processed.
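Here is a minimal sketch of such a producer, reconstructed from the steps above rather than copied from the repository; the name read_images, the tuple put on the queue and the 0.1 s sleep are assumptions, and transform is expected to be something like torchvision.transforms.ToTensor().

```python
import time
from pathlib import Path

from PIL import Image

def read_images(image_dir, image_queue, done_event, transform):
    paths = list(Path(image_dir).glob("*.jpg"))    # 1. collect the .jpg paths
    while paths:
        if image_queue.full():
            time.sleep(0.1)                        # queue is full: wait and check again
            continue
        image_path = paths.pop()                   # 2. take a path, open and transform it
        image = transform(Image.open(image_path))
        image_queue.put((image, str(image_path)))  # 3. keep the path for logging
    done_event.set()                               # 4. signal that all images were read
    image_queue.join()                             # block until every item is processed
```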
The Consumer Function
The consumer function has to do the following: 1. read an image from the queue, 2. run inference on the image using the detector to obtain the output, 3. process the output and finally, 4. write the output to a file.
Here points 3. and 4. are handled by the function handle_output.
Part 1 — Points 1. and 2.
The detect_objects function does the following (a sketch is given below):
Opens the output file in append mode; all the detector processes write to the same file but open it individually rather than using a common io object.
The detector is put into evaluation mode and cast onto the device, which may be a GPU.
Until the producer process sends an indication (by setting the event) that all the images have been put onto the queue, and the queue becomes empty, the process will do the following:
- Get the image from the queue with a .get(...) call that has a timeout.
- Run inference on the image using the detector.
- Call .task_done() to indicate that the item removed from the queue has been processed; if this isn't called then the .join() call in the producer process will block forever.
- Call handle_output for processing the output and writing to the file.
Clean up: finally the file is closed.
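A sketch of the consumer along the lines described above; the parameter names, the 1-second timeout and the assumption that the detector is a torchvision-style model (taking a list of image tensors and returning a list of dicts) are mine, not necessarily the exact code from the repository.

```python
from queue import Empty   # multiprocessing queues raise the standard queue.Empty

import torch

def detect_objects(detector, image_queue, done_event, lock, output_path, device="cuda"):
    out_file = open(output_path, "a")        # each consumer opens its own handle
    detector = detector.eval().to(device)    # evaluation mode, cast onto the device
    with torch.no_grad():
        while not (done_event.is_set() and image_queue.empty()):
            try:
                image, image_path = image_queue.get(timeout=1.0)  # 1. timed get
            except Empty:
                continue
            output = detector([image.to(device)])[0]   # 2. inference, batch size of one
            image_queue.task_done()          # 3. else the producer's .join() blocks forever
            handle_output(output, image_path, out_file, lock)  # 4. process and write
    out_file.close()                         # clean up
```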
Aside 2: Locks
- The lock is a synchronization primitive that is used to synchronize the usage of shared resources such as a file.
- The Lock object has two functions: .acquire(...) and .release()
- When a shared resource has to be used, .acquire(...) is called; this locks the resource, and other calls to acquire will either block or return False depending on the block and timeout arguments of .acquire(...).
- After the process that acquired the lock is done with the resource, .release() is called so that other processes can acquire the lock.
In this case the shared resource is the file that all the consumer processes are writing to.
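A tiny illustration of .acquire() and .release(); the equivalent with-statement form at the end is usually the safer way to write this.

```python
import multiprocessing as mp

lock = mp.Lock()

lock.acquire()            # blocks until the lock is free (block=True by default)
try:
    pass                  # ... use the shared resource, e.g. write to the file ...
finally:
    lock.release()        # let other processes acquire the lock

with lock:                # same acquire/release pattern as a context manager
    pass                  # ... use the shared resource ...
```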
Part 2 — Points 3. and 4.
It is a simple function that does the following (a sketch is given after this list):
- Filters the detection output by the detection confidence score.
- Gets an output string to write to the file.
- Acquires the lock so that other processes can’t write to the file then writes to the file.
- .flush() is called after .write(...) to ensure that the output string isn't buffered and is immediately written to the file.
- Finally the lock is released so that other processes are free to acquire the lock.
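A sketch of handle_output following the four steps above; the detector output format (a dict with "labels" and "scores", as returned by torchvision detection models) and the 0.5 score threshold are assumptions.

```python
from collections import Counter

def handle_output(output, image_path, out_file, lock, score_threshold=0.5):
    keep = output["scores"] > score_threshold          # 1. filter by confidence score
    counts = Counter(output["labels"][keep].tolist())  # class-wise object counts
    line = f"{image_path}\t{dict(counts)}\n"           # 2. output string for the file
    lock.acquire()                                     # 3. no other process can write now
    out_file.write(line)
    out_file.flush()                                   # push the line to the file immediately
    lock.release()                                     # 4. free the lock for the others
```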
The Caller Function
The caller function basically sets up the execution of the producer and consumer functions and then starts them and waits for them to complete.
The caller does the following (a sketch is given after this list):
- Initializes the JoinableQueue for passing images, the Event to indicate completion of the reading process and the Lock for file writes.
- Initializes a Process for the reader and for each of the detectors by giving it the function (target) that has to be run in a separate process along with the args for that function.
- Starts all the processes by calling .start(), then blocks execution until each of the processes returns using .join()
- Clean up: the queue is closed.
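A sketch of the caller built on the producer and consumer sketches above; num_workers, the queue size of 32 and the helpers transform and build_detector() are placeholders for whatever the real script uses.

```python
import multiprocessing as mp

def caller(image_dir, output_path, num_workers=2):
    image_queue = mp.JoinableQueue(maxsize=32)   # images travel through this buffer
    done_event = mp.Event()                      # reader -> detectors: "all images read"
    lock = mp.Lock()                             # guards writes to the shared output file

    # `transform` and `build_detector()` stand in for however the preprocessing
    # and the detector model are actually constructed.
    reader = mp.Process(target=read_images,
                        args=(image_dir, image_queue, done_event, transform))
    detectors = [mp.Process(target=detect_objects,
                            args=(build_detector(), image_queue, done_event,
                                  lock, output_path))
                 for _ in range(num_workers)]

    for p in [reader, *detectors]:
        p.start()                                # start every process
    for p in [reader, *detectors]:
        p.join()                                 # block until each one returns
    image_queue.close()                          # clean up
```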
Calling the caller.
- The main thing before calling the caller is to set the start method to "spawn" and wrap the call to the caller in the if __name__ == "__main__" block (see the sketch after this list).
- There are three start methods: spawn, fork and forkserver; fork is not supported if the GPU is to be used, since CUDA in subprocesses requires spawn (or forkserver).
- When the start method is set to spawn, the entire file is run for each process that is created; using the if block prevents the caller function from being called each time a process is spawned.
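Putting it together, the entry point could look something like this (the arguments are hypothetical):

```python
import multiprocessing as mp

if __name__ == "__main__":
    mp.set_start_method("spawn")   # spawn is required when CUDA is used in subprocesses
    caller("path/to/images", "counts.txt", num_workers=2)  # hypothetical arguments
```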
Footnotes
Source Code
The repository where all of the above code is hosted. The only difference is the use of tqdm along with a Pipe for logging. The code is standalone and can be run as a script by passing an input folder of images and an output file.
Additional Notes and Observations
Notes and observations about a few things. I came across these by trial and error; they may help in debugging.
Message Passing and Synchronization
Any argument passed to a different process through the mp.Process constructor must be serializable when the spawn start method is used.
If possible use SimpleQueues, because they don't use an additional thread for serializing data, but these queues are unbounded and have fewer features.
Queue.qsize() is used for getting the count of items in the queue; it is not available on all operating systems (e.g. it raises NotImplementedError on macOS).
Using Queue may cause errors such as ConnectionResetError, FileNotFoundError or BrokenPipeError, which may be due to the "feeder" thread; this can be avoided by making joins explicit with a JoinableQueue.
Lock doesn't seem to be required; there was no overlapping of lines in the output when not using a lock, so file IO is probably synchronized behind the scenes.
Memory Usage
Use nvidia-smi for checking GPU utilization and debugging OOMs.
Check that the worker count is memory-safe for the given input data size, else an out-of-memory error will be thrown.
If tensors are cast onto the GPU in a different process from the one where they are going to be used, additional GPU memory is assigned to that process (a minimum of ~495 MiB), though the advantage is that the transfer time is lower. So if the data is not created directly on the GPU (such as when using torch.rand(...) with a cuda device), it may be better to cast the data onto the GPU in the process where it will be used by the model.
GPU Utilization
The speed-up from using multiple processes depends on GPU utilization, not on the memory being used by the processes. If GPU utilization is < 100% then speed-ups can be obtained by using multiple processes for inference; if utilization is already at 100% then there is just context switching occurring on the GPU.
As batch size increases, GPU utilization increases, but until utilization reaches its peak (100%) the time required to process a batch remains roughly the same; after the peak, processing time increases with batch size.
GPU utilization will be low if the interval between inference requests is high, which means that if the intervals between inference requests are large then:
- Larger batch sizes can be used for processing, with a waiting time for collecting the batch samples (Batched Concurrency).
- Multiple inference processes can be used without waiting time (Multiprocess Concurrency).
Resources
Thanks for reading 🙂. If some information is incorrect please let me know.