Weak Reference and Object Management within Python Threads

Sergei
Published in Pipedrive R&D Blog
Dec 23, 2019
[Photo on fastcompany.com]

While working on the high availability of our AI services, we deal with multiple processes and threads that are started, terminated, and revived again. When a process is terminated, its allocated resources are simply freed, but with threads it isn’t so simple. In this article I will explain how weak references can be useful in these situations.

Some of our services must react to real-time events that accumulate in a fault-tolerant queue. To handle these events, we’ve developed a high-availability consumer that is agnostic to the queue type.

Our queue consumer starts workers in individual threads or processes, monitors them, restarts them if they terminate, and detects hung threads or processes.

All a project needs to do is define a handler, which is called whenever an event is consumed. For this example, let’s say the handler retrieves an AWS S3 path from the event, downloads the file, and does some ‘AI’ stuff with its content. To work with AWS there is an excellent library called boto3. When using boto3, AWS recommends creating a separate resource instance for each thread or process in a multithreaded or multiprocess application.

This means that, on the one hand, we only want to write the event handler and ignore threads and their management; on the other hand, we should create an AWS resource instance when a thread starts (specifically, on the first handler call in that thread) and release it when the thread finishes.

Creating and removing an AWS resource on every handler call isn’t optimal: the lifetime we care about is the thread’s, from start to finish, and a single thread can make several thousand handler calls while it runs.

That’s why we use Python’s weakref.WeakKeyDictionary with a tweak: we use the thread object as the key and the AWS resource as the value. Because the dictionary holds its keys only through weak references, once a key object (a thread instance) is removed by the garbage collector (GC), its entry is removed as well, together with the dictionary’s reference to the value (the resource). The GC can then delete the resource too (provided nothing else references it, of course). Keep in mind that CPython frees objects primarily by reference counting, so an object is deleted as soon as its last strong reference disappears; a separate cycle collector handles reference cycles.
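The weak-key behaviour itself can be seen in a few lines. In this minimal sketch, `Key` and `Resource` are hypothetical classes standing in for a thread object and an AWS resource; the expected output assumes CPython’s reference counting.

```python
import weakref


class Key:
    """Hypothetical key type standing in for a thread object.

    Instances of ordinary classes support weak references; plain object() does not.
    """


class Resource:
    """Hypothetical resource that counts its own deletions."""

    deleted = 0

    def __del__(self):
        Resource.deleted += 1


pool = weakref.WeakKeyDictionary()

key = Key()
pool[key] = Resource()  # only the key is held weakly; the value is held strongly
print(len(pool), Resource.deleted)  # 1 0

del key  # last strong reference to the key: CPython frees it right away
print(len(pool), Resource.deleted)  # 0 1
```

Nothing deletes the entry explicitly: dropping the key is enough for both the entry and the resource to go away.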

Here is how we assign S3 resources to threads:

Since we use AWS SQS as the fault-tolerant queue, we follow the same AWS recommendation there as well and keep a weak-reference pool of SQS clients:
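The same pattern can be packaged as a small reusable helper. This is a sketch under stated assumptions: `PerThreadPool` is a hypothetical name, and the factory shown is a placeholder; a real SQS pool could pass something like `lambda: boto3.session.Session().client("sqs")` instead.

```python
import threading
import weakref


class PerThreadPool:
    """Hypothetical helper: one lazily created object per thread.

    Entries are keyed by the Thread object, held weakly, so an entry
    disappears once the Thread object itself is garbage-collected.
    """

    def __init__(self, factory):
        self._factory = factory
        self._items = weakref.WeakKeyDictionary()

    def get(self):
        thread = threading.current_thread()
        try:
            return self._items[thread]
        except KeyError:
            item = self._factory()
            self._items[thread] = item
            return item

    def __len__(self):
        return len(self._items)


# Placeholder factory; see the lead-in for what a real one might look like.
sqs_clients = PerThreadPool(factory=object)
```

Each thread calls `sqs_clients.get()` and always receives its own object, while different threads never share one.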

The consumer’s supervisor watches the threads, removing dead ones and starting new ones. Once the supervisor drops a dead thread, the boto3 objects related to that thread are removed from the pools as well.
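One supervisor pass can be sketched as follows. `supervise_once`, `worker`, and `Resource` are hypothetical names, and the worker simply registers a resource and exits instead of consuming queue events. The point is that replacing a dead thread drops the last reference to its Thread object, which removes its pool entry.

```python
import threading
import weakref

pool = weakref.WeakKeyDictionary()  # Thread -> per-thread resource


class Resource:
    """Hypothetical stand-in for a boto3 object."""


def worker():
    # Hypothetical worker: creates its thread's resource, then exits
    # (a real consumer would loop over queue events here).
    pool[threading.current_thread()] = Resource()


def supervise_once(workers, target):
    """One supervisor pass: keep live threads and replace dead ones.

    The returned list no longer references dead threads, so once the
    caller drops the old list they can be collected, which also removes
    their entries (and resources) from `pool`.
    """
    result = []
    for thread in workers:
        if thread.is_alive():
            result.append(thread)
        else:
            thread.join()
            replacement = threading.Thread(target=target)
            replacement.start()
            result.append(replacement)
    return result
```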

What about thread-safety? Is it safe to put items into the dictionary from concurrent threads? The answer is both ‘yes’ and ‘no’. It isn’t thread-safe if one thread iterates over the dictionary while a concurrent thread adds a new element to it, but if threads only add items, it’s fine. In CPython, the global interpreter lock (GIL) makes many individual operations on built-in types effectively atomic, and adding an item to a built-in dict is one of them. weakref.WeakKeyDictionary, however, isn’t built in: it’s a wrapper around a built-in dict written in Python. To make sure it is thread-safe as well, we investigated its source code. We didn’t find any reason why it wouldn’t be, and we haven’t run into any problems using it.
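A quick experiment along the same lines: many threads insert into one WeakKeyDictionary concurrently, each keyed by its own Thread object. An experiment like this can only fail to find a problem, not prove that none exists; `register` is a hypothetical name.

```python
import threading
import weakref

pool = weakref.WeakKeyDictionary()


def register(value):
    # Each worker performs a single insertion keyed by its own Thread object.
    pool[threading.current_thread()] = value


threads = [threading.Thread(target=register, args=(i,)) for i in range(50)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()

# Iterate only once no other thread is inserting: iterating while another
# thread adds items may raise "RuntimeError: dictionary changed size during iteration".
print(len(pool), sorted(pool.values()) == list(range(50)))  # 50 True
```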

Code to add a new item:

```python
def __setitem__(self, key, value):
    self.data[ref(key, self._remove)] = value
```

As you can see, it simply adds a new element to a built-in dictionary, since self.data is a plain dict ({}). The trick is that the key is wrapped in a weak reference with the callback self._remove registered on it, and that callback is called when the key object is removed by the GC. In our case, this happens when the thread has finished and the supervisor has removed it from its pool of threads.
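The underlying mechanism, a weak reference used as a dict key with a callback that deletes that key, can be reproduced with a plain dict. In this sketch `Key`, `data`, and `_remove` are hypothetical names that only mimic the roles they play inside WeakKeyDictionary; the expected output assumes CPython.

```python
import weakref


class Key:
    """Hypothetical key type standing in for a thread object."""


data = {}      # plays the role of WeakKeyDictionary.data
removed = []   # records which weak references were cleaned up


def _remove(wr):
    # The weakref machinery calls this with the (now dead) weak reference,
    # which is exactly the dictionary key to delete.
    removed.append(wr)
    del data[wr]


key = Key()
data[weakref.ref(key, _remove)] = "resource"
print(len(data))  # 1

del key  # refcount hits zero, so CPython runs _remove right away
print(len(data), len(removed))  # 0 1
```

Deleting by the dead reference works because a weak reference caches its hash while the referent is alive, and the dictionary finds the key by identity.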

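Let’s look at self._remove, which WeakKeyDictionary defines in its __init__ method, to make sure it is thread-safe too (the exact code differs slightly between Python versions):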

```python
def remove(k, selfref=ref(self)):
    self = selfref()
    if self is not None:
        if self._iterating:
            self._pending_removals.append(k)
        else:
            del self.data[k]
self._remove = remove
```

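As you can see, the remove function also performs only atomic operations: by default a single del self.data[k], or, while an iteration over the dictionary is in progress, a list append that defers the removal until the iteration is over.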
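The deferred branch can be observed without threads. In this hypothetical script, a key loses its last strong reference in the middle of a loop over the dictionary, so the removal callback fires mid-iteration. The loop still finishes without a RuntimeError and simply skips the dead key; `Key` is a stand-in for a thread object, and the expected output assumes CPython.

```python
import weakref


class Key:
    """Hypothetical key type standing in for a thread object."""


pool = weakref.WeakKeyDictionary()
keys = [Key(), Key(), Key()]
for i, key in enumerate(keys):
    pool[key] = f"resource-{i}"
del key  # the loop variable still pointed at the last key

visited = []
for k in pool:
    visited.append(pool[k])
    if len(visited) == 1:
        # Drop the only strong reference to the last key while iterating:
        # its weakref callback fires now, in the middle of the loop.
        keys.pop()

print(visited)    # ['resource-0', 'resource-1']
print(len(pool))  # 2
```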

Now that we understand how the weak-reference callback is created and called in WeakKeyDictionary, we can extend it to do extra work when a thread is removed. Let’s imagine that we need to start a server when a thread is created and stop it when the thread finishes. The code can then be reworked like this:
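One possible way to do this is sketched below. Rather than modifying the private `_remove` callback, it subclasses WeakKeyDictionary and attaches the extra cleanup with `weakref.finalize`, a public API; `FakeServer`, `CallbackWeakKeyDictionary`, and `get_server` are hypothetical names, and the server only flips a flag.

```python
import threading
import weakref


class FakeServer:
    """Hypothetical stand-in for a server started per thread."""

    def __init__(self):
        self.running = True  # pretend the server starts here

    def stop(self):
        self.running = False


class CallbackWeakKeyDictionary(weakref.WeakKeyDictionary):
    """WeakKeyDictionary that also calls on_remove(value) when a key dies.

    Assumes each key (thread) is assigned only once.
    """

    def __init__(self, on_remove):
        super().__init__()
        self._on_remove = on_remove

    def __setitem__(self, key, value):
        super().__setitem__(key, value)
        # weakref.finalize keeps `value` alive until `key` is collected,
        # then calls self._on_remove(value) exactly once.
        weakref.finalize(key, self._on_remove, value)


servers = CallbackWeakKeyDictionary(on_remove=FakeServer.stop)


def get_server():
    """Return the calling thread's server, starting one on first use."""
    thread = threading.current_thread()
    server = servers.get(thread)
    if server is None:
        server = FakeServer()
        servers[thread] = server
    return server
```

Only plain item assignment goes through the overridden `__setitem__`; in CPython’s implementation, methods such as `setdefault` and `update` write to the underlying dict directly and would skip the extra callback.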

Let’s also make sure that it works. We change the code a bit to make the result more visible:

Running the script prints:

```
$ python script.py
thread is done
pool size is 1
before callback
callback is called
resource is deleted
after callback
pool size is 0
```

It also works if the thread raises an exception:

Call result:

```
$ python script.py
Exception: BOOM!
pool size is 1
before callback
callback is called
resource is deleted
after callback
pool size is 0
```
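The exception case can be sketched in the same spirit. In this hypothetical script, the worker stores a resource in the pool and then raises; a custom `threading.excepthook` (a documented hook since Python 3.8) records only the error message. `Resource`, `quiet_excepthook`, and `failing_worker` are illustrative names.

```python
import gc
import threading
import weakref

pool = weakref.WeakKeyDictionary()
errors = []
deleted = []


class Resource:
    """Hypothetical per-thread resource that reports its own deletion."""

    def __del__(self):
        deleted.append(True)


def quiet_excepthook(args):
    # Record only the message. Keeping args.thread or args.exc_value (whose
    # traceback references the thread's frames) could keep the Thread alive.
    errors.append(f"{args.exc_type.__name__}: {args.exc_value}")


threading.excepthook = quiet_excepthook


def failing_worker():
    pool[threading.current_thread()] = Resource()
    raise Exception("BOOM!")


thread = threading.Thread(target=failing_worker)
thread.start()
thread.join()
print(errors, len(pool))  # ['Exception: BOOM!'] 1

del thread    # drop the last reference to the finished thread
gc.collect()  # also collect any reference cycles before checking
print(deleted, len(pool))  # [True] 0
```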

Weak references are a powerful feature that can be used in many places to avoid memory management problems. If this has piqued your interest, you can also look into how to avoid memory leaks caused by circular references in Python.

Software Engineer. Senior Backend Developer at Pipedrive. PhD in Engineering. My interests are IT, High-Tech, coding, debugging, sport, active lifestyle.