Cross-thread event dispatching in python

This article will discuss dispatching events across threads in python 2, using a parallel file downloading utility as a working example. Before we dive into that, let’s spend a couple of minutes talking about python 2 and threads in general. Mention the two things together in a sentence and many developers will respond with puzzlement. “Python isn’t multi-threaded,” they say, ending with an invocation of the dreaded GIL, or Global Interpreter Lock. This is incorrect. Python is multi-threaded. What python is not is concurrent. There’s a difference.

Python can run lots of threads, but because of the GIL the python interpreter can only run one thread at a time. That means that if a thread wants to do work it has to acquire the interpreter lock, which it can only do when another thread doesn’t already have it. The GIL is the synchronization primitive that serializes requests to the interpreter from different threads. So splitting some work into multiple threads doesn’t actually get it done any faster. We can see this clearly in a simple example:

import numpy.random

def sort_arrays(a1, a2):
    sorted(a1)
    sorted(a2)

test_arr_1 = numpy.random.randint(0, high=1000, size=1000000)
test_arr_2 = numpy.random.randint(0, high=1000, size=1000000)

%timeit sort_arrays(test_arr_1, test_arr_2)

1 loops, best of 3: 858 ms per loop

This snippet just creates two arrays of one million random integers each and then sorts them. The timer reports a best-of-3 time of 858 ms to perform the sorts. Looking at core usage with mpstat while the program is running, we can get a picture of CPU usage:

04:20:15 PM CPU %usr ... %idle
04:20:16 PM all 26.18 ... 72.82
04:20:16 PM 0 3.00 ... 97.00
04:20:16 PM 1 97.00 ... 0.00
04:20:16 PM 2 2.00 ... 98.00
04:20:16 PM 3 2.00 ... 98.00

Core 1 is almost completely saturated, but we have three full cores sitting there damn near idle. I bet we can get those arrays sorted a lot faster!

import numpy.random
from threading import Thread

def sort_arrays_t(a1, a2):
    def sort_array(a):
        sorted(a)

    t = Thread(target=lambda: sort_array(a2))
    t.start()
    sorted(a1)
    t.join()
    return

test_arr_1 = numpy.random.randint(0, high=1000, size=1000000)
test_arr_2 = numpy.random.randint(0, high=1000, size=1000000)

%timeit sort_arrays_t(test_arr_1, test_arr_2)

1 loops, best of 3: 892 ms per loop

Ouch. That didn’t help. Let’s look at the core usage during “concurrent” sorting:

12:54:12 AM CPU %usr ... %idle
12:54:13 AM all 24.37 ... 74.12
12:54:13 AM 0 2.02 ... 95.96
12:54:13 AM 1 50.50 ... 47.52
12:54:13 AM 2 40.38 ... 58.65
12:54:13 AM 3 2.04 ... 95.92

What the heck is going on here? The parallel version utilizes two cores at roughly half capacity and the overall run time is slower than the sequential version. The reason for this is pretty simple: python delegates thread scheduling to the host operating system. That means the host, linux in this case, is free to follow its normal scheduling algorithm, and that results in each of the active threads getting assigned to a different idle core. However, in order to execute python bytecode each thread first has to acquire the interpreter lock. This effectively throttles the two threads so that only one can run at a time. If we created a third thread we would see each one getting roughly 33% of CPU time, and so on. To make matters worse, acquiring the GIL is an expensive operation in its own right, which is why the program runs slower overall when using threads. Thanks Global Interpreter Lock!
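If you want to see that effect for yourself, here is a minimal variation on the snippet above that splits the work across three threads (the function and variable names are mine, not part of the earlier example). Watching mpstat while it runs should show three cores each sitting at roughly a third of capacity, with none of them saturated:

import numpy.random
from threading import Thread

def sort_arrays_3t(arrays):
    # one thread per array; all of them still contend for the single GIL
    threads = [Thread(target=lambda arr=a: sorted(arr)) for a in arrays]
    for t in threads:
        t.start()
    for t in threads:
        t.join()

test_arrays = [numpy.random.randint(0, high=1000, size=1000000) for _ in range(3)]
%timeit sort_arrays_3t(test_arrays)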

Python is multi-threaded. What python is not is concurrent. There’s a difference.

Clearly python is not concurrent. If it were we would expect the process to take something like half as long on a multi-core system where no context switching is required. So does that mean that multiple threads are of no value in python? It does not. There is one specific situation where multiple threads are very valuable indeed, and that is when we need to wait on a long-running I/O operation, essentially doing nothing until it completes. Consider the following example:

At time 0 in this scenario the main thread receives a request over a network connection. It makes a call to a data access layer, which in turn queries the database using a blocking call. Connections 2 and 3 cannot be serviced until that blocking call returns and a response is sent on connection 1. This is generally considered a bad thing, and developers working on server software will strive mightily to avoid it. One way to avoid it is to create a new thread for each connection (or more typically, to assign a thread from a managed pool).

Even though python is not concurrent and will not allow these threads to execute at the same time, the reality is that they spend most of their time waiting on the database, and while blocked waiting for a response they release the interpreter lock and allow other threads to run. So this strategy definitely increases the number of connections that can be handled concurrently, and it is the approach used by mainstream http daemons like apache and by the threaded servers that typically host python web frameworks like flask.
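To see the contrast with the CPU-bound sorting example, here is a minimal sketch that simulates five blocking I/O calls with time.sleep (the names and the one-second delay are mine, standing in for a database query). Because a sleeping thread releases the GIL, the five calls should complete in roughly one second of wall time rather than five:

import time
from threading import Thread

def fake_query(n):
    # stands in for a blocking database or network call; time.sleep
    # releases the GIL while it waits, so other threads can run
    time.sleep(1)
    print "query {} done".format(n)

start = time.time()
threads = [Thread(target=fake_query, args=(n,)) for n in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print "elapsed: {:.2f} seconds".format(time.time() - start)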

There’s a problem with this kind of threaded server design, though: threads in linux have a fair amount of overhead. This overhead includes the memory that needs to be allocated for the thread’s stack and any thread-local storage. On a typical linux system the default stack reservation alone is 8 MB per thread, though much of that is virtual memory that is only committed as it is used. That’s not very much if all you need to handle is a few connections. Scale that up to thousands of connections and it starts to become an issue. When running servers at scale the amount of RAM you need affects costs in a very significant way. Rather than having all these expensive threads waiting on connections to databases or other resources, wouldn’t it be nice if we could use just one thread, kicking off requests and then coming back to deal with the results when they’re available?
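For what it’s worth, that stack reservation can be reduced with threading.stack_size, which has been in the standard library since python 2.5. This only softens the problem rather than removing it, and the 512 KB figure below is simply an illustrative choice of mine, not a recommendation:

import threading

# request a smaller stack for threads created after this call;
# it must be called before those threads are started
threading.stack_size(512 * 1024)

t = threading.Thread(target=lambda: None)
t.start()
t.join()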

This pattern is often referred to as single-threaded event-driven I/O. It’s single threaded because it, well, uses one thread. It’s event-driven because after kicking off a long, blocking I/O request the processing thread goes on to handle other requests, coming back to deal with the results of each only when they are available. That “when they are available” bit is the event part. This sort of programming is also referred to as “asynchronous” because the individual processing steps needed to complete a request do not necessarily happen in sequential order. It is the underlying pattern behind high performance networking libraries like twisted, tornado, and node.js.

Which brings us back to the title of this article. In order to build an example of the above pattern we will make a very simple parallel file downloader. To make it work we need three things:

  1. A main thread to simulate the server. We won’t handle any incoming connections. Instead we will take input from a text file just to keep it simple.
  2. At least one worker thread, started by the main thread, which does the actual downloading. This simulates the long-running, blocking database I/O described in the scenario above.
  3. Some means of communicating results from the worker thread back to the main thread when I/O completes. Results might mean a file successfully downloaded, or it might mean an error.

To feed data to our main thread we will grab lines from a text file:

$ cat test_input.txt
http://download.geonames.org/export/dump/CN.zip
http://download.geonames.org/export/dump/ID.zip
http://download.geonames.org/export/dump/IN.zip
http://download.geonames.org/export/dump/IR.zip
http://download.geonames.org/export/dump/NO.zip
$

These urls point to some files at geonames.org that happen to be the right size for a demo. If you don’t know Geonames.org they are a great organization that provides a lot of excellent geographical information for free. The following code reads the file and does a little prep on the contents:

input_file = u"test_input.txt"

if __name__ == "__main__":
    # open the input file, read the urls, for each clean up the
    # text, determine the local filename, etc.
    with open(input_file, "r") as f:
        for url in f:
            clean_url = url.strip()
            short_url = '/'.join(clean_url.split('/')[:-1])
            local_name = clean_url.split('/')[-1]
            print "Downloading {} from {} ...".format(local_name, short_url)

To this we have to add three things: some code to start the worker threads, a place to receive events from them, and a loop to process those events as they come in. Here is the completed code for the main thread:

from Queue import Queue
import threading

if __name__ == "__main__":
    # a queue to receive events for the main thread
    msg_queue = Queue()

    # track the thread count against this baseline
    thread_count = threading.active_count()

    # open the input file, read the urls, for each clean up the
    # text, determine the local filename, and then kick off a worker
    with open(input_file, "r") as f:
        for url in f:
            clean_url = url.strip()
            short_url = '/'.join(clean_url.split('/')[:-1])
            local_name = clean_url.split('/')[-1]
            print "Downloading {} from {} ...".format(local_name, short_url)
            t = threading.Thread(target=download,
                                 args=(clean_url, local_name, msg_queue))
            t.start()

    # keep processing events in the message queue until the thread
    # count returns to the baseline
    while threading.active_count() > thread_count:
        event = msg_queue.get()
        event()

    print "All downloads completed!"

The first thing we do is create an instance of the Queue type and store it in the msg_queue variable. Queue is a thread-safe python data structure that implements a FIFO queue. Its operations are protected by an internal lock, so only one thread can modify it at a time, and since it can hold any python object it can hold callables, which will be important when we get to dispatching events into it.
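As a quick illustration of that last point, here is a tiny sketch (separate from the downloader) of a callable flowing through a Queue; the greet function is just a stand-in:

from Queue import Queue

q = Queue()

def greet(name):
    print "hello, {}".format(name)

# the lambda defers the call; only the callable itself goes onto the queue
q.put(lambda: greet("world"))

# later, typically on another thread, pop it off and invoke it
event = q.get()
event()   # prints: hello, world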

We then grab the current active thread count and store it in a variable for later reference. We’ll use it to coordinate our main thread so that it keeps processing events as long as any of its worker threads are alive. There are other, and probably more robust, ways to do this, but a simple approach works here.
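One such alternative, sketched below and assuming the same input_file, msg_queue, threading import, and download function as the surrounding code, is to count the workers as they are started and then process exactly that many events, rather than watching the global thread count:

expected_events = 0
with open(input_file, "r") as f:
    for url in f:
        clean_url = url.strip()
        local_name = clean_url.split('/')[-1]
        threading.Thread(target=download,
                         args=(clean_url, local_name, msg_queue)).start()
        expected_events += 1

# each worker puts exactly one event on the queue, so wait for that many
for _ in range(expected_events):
    msg_queue.get()()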

In the url processing loop that was introduced above we have added code to create a new thread (type threading.Thread). To the constructor's 'target' argument we pass a reference to the (as yet unwritten) function that we want the new thread to call, which in this case we'll name 'download'. We also pass a tuple of arguments: the url we want to download, the local file name we want the content written to, and a reference to the queue we created above. The Thread.start() method is then called, which causes the thread to be created with our download function as its entry point. In this simple loop we create one thread for each url in the file, meaning that if someone puts 10,000 urls in the input file we will get 10,000 threads. In actual practice we would use a managed thread pool.
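For a feel of what a pooled version might look like, here is a minimal sketch using multiprocessing.dummy.Pool, which provides a thread-backed (not process-backed) pool in the python 2 standard library. It assumes the urls have already been read into a list and that download and msg_queue exist as described in this article; the pool size of four is an arbitrary choice:

from multiprocessing.dummy import Pool   # Pool backed by threads, not processes

def start_download(clean_url):
    # derive the local file name and hand off to the download() worker
    local_name = clean_url.split('/')[-1]
    download(clean_url, local_name, msg_queue)

pool = Pool(4)                        # at most four downloads in flight at once
pool.map_async(start_download, urls)  # returns immediately; work runs in the pool

# the main thread's event loop is unchanged: one event arrives per url
for _ in range(len(urls)):
    msg_queue.get()()

pool.close()
pool.join()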

There is one specific situation where multiple threads are very valuable indeed, and that is when we need to wait on a long-running I/O operation…

With the threads created and presumably working, the main thread drops into a loop that runs until the active thread count returns to what it was when we started. All the main thread does in this loop is call the blocking Queue.get() method to get an object, which it treats as a callable and calls. This is the main “event loop” in our example. Now let’s create the worker method that will do the downloading and dispatch events into the queue:

from contextlib import closing
import requests

def download(url, local_name, msg_queue):
    """
    The download worker entrypoint.
    """
    try:
        with closing(requests.get(url, stream=True)) as response:
            if not response.ok:
                response.raise_for_status()
            with open(local_name, 'wb') as local_file:
                for chunk in response.iter_content(chunk_size=1024):
                    if chunk:
                        local_file.write(chunk)
    except Exception as e:
        msg_queue.put(lambda: on_download_error(url, e.message))
    else:
        msg_queue.put(lambda: on_download_complete(url, local_name))

The first thing to note is that the signature of this function matches the target and the arguments we passed to the Thread constructor above. The second thing to note is that we’re using the requests package, an easy-to-use client-side http library, to do the downloading for us. The contextlib.closing helper wraps the response from requests.get in a context manager that closes it when the block exits, which helps make sure the connection is released in streaming downloads like the ones we’re running. Inside the with block we iterate over the chunks of data returned in the response and write them out to the file. If the request returns an http status code other than OK we raise it as an exception.

The whole thing is wrapped in a try block, and it is in the handlers for that try block that the actual event dispatching takes place. An event is dispatched to the main thread by putting a callable onto the queue for it to process. In this case we’ve defined two “events” as outcomes of the download process: on_download_complete and on_download_error. In both cases we use a lambda to create a closure over the callable and the arguments we want to pass to it; otherwise the function would be called right there on the worker thread and only its return value would be placed onto the queue, which is not what we want.
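A lambda is not the only way to bundle up a call like this; functools.partial does the same job and some find it more explicit. This is just an optional variation on the dispatch above, not something the downloader requires:

from functools import partial

# equivalent to the lambda: binds the callable and its arguments into a
# single object that the main thread can invoke later with no arguments
msg_queue.put(partial(on_download_complete, url, local_name))

Either form works; the important thing is that what lands on the queue is something the main thread can simply call. The last thing we need to do is actually define the two event handlers: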

def on_download_complete(url, file_name):
    """
    Callback to be fired on the main thread when the download has
    completed.
    """
    print "{} completed!".format(file_name)

def on_download_error(url, error):
    """
    Callback to be fired on the main thread in case of error.
    """
    print "{} failed due to {}".format(url, error)

These don’t do anything but print out some details of the event so that we can see what’s happening. And with that our very rudimentary parallel downloader is complete. You can view the full python source on github. So let’s fire it up and see what happens:

$ python download.py
Downloading CN.zip from http://download.geonames.org/export/dump
Downloading ID.zip from http://download.geonames.org/export/dump
Downloading IN.zip from http://download.geonames.org/export/dump
Downloading IR.zip from http://download.geonames.org/export/dump
Downloading NO.zip from http://download.geonames.org/export/dump
ID.zip completed!
IR.zip completed!
NO.zip completed!
IN.zip completed!
CN.zip completed!
All downloads completed!
$ ll *.zip
-rw-rw-r-- 1 mark mark 20774626 Apr 11 15:56 CN.zip
-rw-rw-r-- 1 mark mark 8433991 Apr 11 15:55 ID.zip
-rw-rw-r-- 1 mark mark 12322642 Apr 11 15:55 IN.zip
-rw-rw-r-- 1 mark mark 10570624 Apr 11 15:55 IR.zip
-rw-rw-r-- 1 mark mark 15368628 Apr 11 15:55 NO.zip

While this example is extremely simple, the same basic pattern and techniques are employed in high-performance network servers, not to mention just about every GUI framework ever developed. One of the reasons the pattern occurs so often in frameworks is that, as you can see, it requires collaboration between the main thread, which processes events in a loop, and worker threads that communicate events to the main thread via a queue. It would not be easy to bolt an asynchronous handler for a database call, for example, onto an application where you had no control over what the main thread was doing. It could be done, perhaps by using timers or signals to interrupt the main thread and cause it to process events, but that is a lot more intrusive and risky than having all the pieces working together.