Understanding Non Blocking I/O with Python — Part 1

Dated: May 31, 2015

As someone working with the web stack and languages like Python or Ruby, there are high chances that you have heard of Non Blocking I/O. You might as well be using it with some of your projects or have tried your hands with libraries like Gevent or Tornado. But how do these libraries make network requests non blocking. This is something that I had always wondered when I tried Gevent. I just couldn’t get my head around the fact that when you send something to a socket or receive from it, it will block execution for the at least the amount of time it takes to transmit the data. So how do I make it possible to execute something else while I/O is happening? So I started digging in, trying to understand how to make some network request non blocking in Python.

With this article series, I will try to introduce the topic and go in as much detail as possible.

What is Non Blocking I/O?

So first lets see what is Blocking? A function is blocking if it has to wait for something to complete. Yes, every function is blocking — no matter if you are doing I/O or doing CPU task. Everything takes some time. If a function is doing some task which is making the CPU work, then it is blocking the function from returning. Similarly, if a function is trying to get something from the database, then it is going to wait for the result to come and will block until then to continue the processing. But it so happens that the server is not making any use of the CPU while it is waiting for the database to send the response.

So if a function is blocking (for whatever reasons), it is capable of delaying execution of other tasks. And the overall progress of the entire system may get suffered. If the function is blocking because it is doing some CPU task, well then we cannot do much. But if it is blocking because of I/O, we know that the CPU is idle and can be used for starting another task that needs CPU.

Lets see an example of blocking network request. I have a very simple TCP server written in Python:

import socket
import sys

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

port = 1234 if len(sys.argv) == 1 else int(sys.argv[1])
sock.bind(('localhost', port))
sock.listen(5)

try:
while True:
conn, info = sock.accept()

data = conn.recv(1024)
while data:
print data
data = conn.recv(1024)
except KeyboardInterrupt:
sock.close

And here is a simple client to connect to our server:

import socket

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(('localhost', 1234))

data = 'foobar\n' * 10 * 1024 * 1024 # 70 MB of data
assert sock.send(data) == len(data) # True

The above code will block for a long period of time. If there was code after the last time, it will not get executed until send method returns. What is going on here? The send() method will try to transmit all the data while the write buffer will get filled up. The kernel will put the process to sleep until the data in the buffer is transferred to destination and the buffer is empty again. When the buffer becomes empty, the kernel will wake the process up again to get the next chunk of data that is to be transferred. In short, your code will block and it will not let anything else proceed.

Lets make the above example NOT block:

import socket

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(('localhost', 1234))
sock.setblocking(0)

data = 'foobar\n' * 10 * 1024 * 1024 # 70 MB of data
assert sock.send(data) == len(data) # AssertionError

When you run the above client, you will notice that it did not block at all. But there is a problem with the client — it did not send all the data. socket.send method returns the number of bytes sent. When you make a socket non-blocking by calling setblocking(0), it will never wait for the operation to complete. So when you call the send() method, it will put as much data in the buffer as possible and return. As this is read by the remote connection, the data is removed from the buffer. If the buffer gets full and we continue to send data, socket.error will be raised. When you try to send data more than the buffer can accommodate, only the amount of data that can be accommodated is actually sent and send() returns the number of bytes sent. This is useful so that we can try to send the remaining data when the buffer becomes empty. Let’s try to achieve that:

import errno
import select
import socket

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(('localhost', 1234))
sock.setblocking(0)

data = 'foobar\n' * 1024 * 1024
data_size = len(data)
print 'Bytes to send: ', len(data)

total_sent = 0
while len(data):
try:
sent = sock.send(data)
total_sent += sent
data = data[sent:]
print 'Sending data'
except socket.error, e:
if e.errno != errno.EAGAIN:
raise e
print 'Blocking with', len(data), 'remaining'
select.select([], [sock], []) # This blocks until

assert total_sent == data_size # True

In the above example, we make sure that we keep trying to send the remaining data as long as we have not sent all of it. When the write buffer is full and cannot accommodate more data, EAGAIN error is raised asking us to try again. If you examine the exception object, the exception message is “Resource temporarily unavailable”. So we keep trying to send the remaining data until we have sent it all.

Understanding select()

The last line of the above example introduces the select module. select module helps us with dealing with multiple file descriptors at once. The select module includes implementations of select, poll, epoll and kqueue, which are used by libraries like eventlet, twisted, tornado and others. We will look at them later in the coming articles of this series. Since we made our socket non-blocking, we don’t know when can we actually write to it unless we keep trying to write to it and expect it to not fail. This is a major waste of CPU time. In the above example, we call the select() function to avoid exactly that.

select() expects three arguments - list of file descriptors to watch for reading, list of file descriptors to watch for writing and list of file descriptors to watch for errors. Timeout can be passed as an optional 4th argument which can be used to prevent select() from blocking indefinitely. It returns a subset of all the three lists passed in the same order i.e. all the file descriptors that are ready for reading, writing or have caused some error.

We call the select() function and pass it file descriptors asking it to tell us which of these are ready for reading or writing. In the above example, select() blocks if there is no file descriptor that is ready to work with. You might say that this is still blocking the execution of our program but this is just the foundation for building better things. As of now, select() will just block until our sock object becomes writeable again. If we remove that line, our script will continue to work but a lot more useless while loop iterations will be run as most of them will result in exceptions.

But how does select() really work? Well, select() is nothing but an interface to the Unix select() system call. And its pretty easy to understand as the usage does not differ so much from the Python interface. For the curious ones, you can read more about in the man page for select and at these links:

Introduction to event loops for network events

Now that we understand select better, lets make use of it to do better than our last example where we actually make use of making a socket non-blocking. We are going to make use of generators to make sure that our script does not block execution of other things and let other code proceed as well. Consider this example:

import errno
import select
import socket
import time


def other_task():
i = 0
while i < 2000:
i += 1
print i
time.sleep(0.02)
yield


def send_data_task(port, data):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(('localhost', port))
sock.setblocking(0)

data = (data + '\n') * 1024 * 1024
print 'Bytes to send: ', len(data)

total_sent = 0
while len(data):
try:
sent = sock.send(data)
total_sent += sent
data = data[sent:]
print 'Sending data'
except socket.error, e:
if e.errno != errno.EAGAIN:
raise e
yield ('w', sock)

print 'Bytes sent: ', total_sent


if __name__ == '__main__':
tasks = [
other_task(),
send_data_task(port=1234, data='foo'),
send_data_task(port=5678, data='bar'),
]

fds = dict(w={}, r={})
while len(tasks) or len(fds['w']) or len(fds['r']):
new_tasks = []
for task in tasks:
try:
resp = next(task)
try:
iter(resp)
fds[resp[0]][resp[1]] = task
except TypeError:
# this task has to be done since not
# dependent on any fd
new_tasks.append(task)
except StopIteration:
# function completed
pass

if len(fds['w'].keys()) or len(fds['r'].keys()):
readable, writeable, exceptional = select.select(
fds['r'].keys(), fds['w'].keys(), [], 0)
for readable_sock in readable:
new_tasks.append(fds['r'][fd])
del fds['r'][fd]
for fd in writeable:
new_tasks.append(fds['w'][fd])
del fds['w'][fd]
# ignore exceptional for now

tasks = new_tasks

For running this example, we are going to run two instances of servers and make our script do two things simultaneously — run a function that just increments a variable in a loop and send data to two servers. Both the servers are essentially the same. They are just running on different ports so that our script can connect to two servers at the same time.

In the above example, we have two functions send_data_task() and other_task(). send_data_task() tries to send a lot of data to a TCP server. other_task() just runs a counter. We want to execute both the functions. If we execute them one by one, then we end up blocking the one executing later. However, we can have both the functions proceed simultaneously. We make use of non-blocking sockets and generators to make two functions proceed co-operatively.

The send_data_task() function creates a non-blocking socket and tries to send data just like our previous example. The only difference is that it yields when the write buffer gets full and an exception is raised. Since we cannot send any more data through the socket any more, we can have another block of code proceed that does not depend on this function. other_task() function is a simple function for this example that just runs a counter in a loop. This function also yields after every iteration of the loop.

In case of send_data_task(), we yield when we cannot send more data since our write buffer is full. But when we yield, we also return a tuple with the type of operations on the socket (‘w’ for write, ‘r’ for read) and the socket object itself. When the execution is returned to the callee, we maintain a mapping of socket objects to the generator that returned it.

In case of other_task, we yield after every iteration. Why? If we don’t do that, the function will continue to execute until it has completed all that it has to do and our other function send_data_task() will not get a chance to proceed. So we consciously try to switch execution to another function whenever possible. Since this function does not depend on any fd or socket object, we don’t return anything when we yield. This is just a way we have designed our implementation - ugly as it may look but it keeps things simple to understand.

In our main block, we maintain a list of functions that we want to call in a list called tasks. To be precise, both our functions use yield and hence return generators when we call them. So tasks actually maintains a list of generators returned by the functions we want to co-operatively execute. We run a loop as long as our tasks don’t complete their execution. On every iteration of the loop, we run each task one-by-one using the next() function. The function resumes its execution and yields whenever it can.

The while loop runs as long as tasks list is not empty or we have any fds or socket objects to watch. We run every task one-by-one. When we call send_data_task, it yields a tuple with the operation (reading or writing) we were performing on the socket and the socket object itself. We keep the socket object in a dictionary called fds where we maintain two different dictionaries of objects - one for those which we are writing to and another for those we are reading from. Then we run the other_task() and it yields nothing.

The execution gets returned to the main block. After executing the tasks, we see if there are any socket objects or fds that we need to watch using select() and call it accordingly. select() returns a subset of sockets/fds that can be read from or written to. If we can read from a or write to any socket/fd, we look for the corresponding generator in the corresponding dictionary in fds and append it to a new list of tasks that shall be executed in the next iteration of our main while loop. Finally, we just replace tasks with new_tasks so that the while loops picks up the new tasks.

This keeps on running until we have no tasks left in the tasks list and no more sockets/fds to watch. And this way, our functions co-operatively let each other proceed. The example is perhaps ugly and not even close to real-world usage but its simple and it gets the idea across.

That large while loop that you see in our main block is our implementation of event loop for our script. What does it do? It just watches for network events and schedules corresponding code blocks to run as and when they can. However, our implementation is a very simple implementation and it surely does not handle the most common things. It is just enough for undersanding the concept.

Examples

All the code examples in this article can be found here.

What’s next?

That was an introduction on how you can make sockets non-blocking and use select function from the select module to watch file descriptors for reading or writing. We understood how select() works, made a script that does network I/O not block using non-blocking sockets and made use of generators and select() to implement a very simple I/O loop.

In the next article, we will look at more examples and look at other infrastructure for handling non-blocking sockets such as poll and epoll.


Originally published at vaidik.in.