Understanding Non Blocking I/O with Python — Part 1

Vaidik Kapoor
May 31, 2015 · 10 min read

What is Non Blocking I/O?

So first, let's see what blocking is. A function is blocking if it has to wait for something to complete. Yes, every function is blocking, no matter whether it is doing I/O or a CPU-bound task. Everything takes some time. If a function is doing work that keeps the CPU busy, that work blocks the function from returning. Similarly, if a function is trying to fetch something from the database, it will wait for the result to arrive and block until then before it continues processing. The difference is that in the second case the server is not making any use of the CPU while it waits for the database to send the response.
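
To make the difference between the two kinds of blocking concrete, here is a minimal sketch (written for Python 3, unlike the Python 2 listings in this article). The helper names measure, cpu_bound and io_bound are made up for illustration, and time.sleep() only stands in for waiting on a database or a socket. It compares wall-clock time with the CPU time the process actually used:

```python
import time


def measure(func):
    """Return (wall_seconds, cpu_seconds) spent inside func()."""
    wall_start = time.perf_counter()
    cpu_start = time.process_time()
    func()
    return time.perf_counter() - wall_start, time.process_time() - cpu_start


def cpu_bound():
    # Keeps the CPU busy for the whole call.
    total = 0
    for i in range(2000000):
        total += i * i
    return total


def io_bound():
    # Stand-in for waiting on a database or socket: the process just sleeps.
    time.sleep(0.2)


cpu_wall, cpu_cpu = measure(cpu_bound)
io_wall, io_cpu = measure(io_bound)
print("cpu_bound: wall=%.3fs cpu=%.3fs" % (cpu_wall, cpu_cpu))
print("io_bound:  wall=%.3fs cpu=%.3fs" % (io_wall, io_cpu))
```

Both calls block their caller, but only the first one spends that time on the CPU. The second one mostly waits, and that idle time is what non-blocking I/O tries to put to use.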

# A simple blocking TCP server: accepts one connection at a time and
# prints whatever the client sends.
import socket
import sys

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

port = 1234 if len(sys.argv) == 1 else int(sys.argv[1])
sock.bind(('localhost', port))
sock.listen(5)

try:
    while True:
        conn, info = sock.accept()

        data = conn.recv(1024)
        while data:
            print data
            data = conn.recv(1024)
except KeyboardInterrupt:
    sock.close()

# Client using a blocking socket: send() blocks while the kernel's
# send buffer is full.
import socket

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(('localhost', 1234))

data = 'foobar\n' * 10 * 1024 * 1024  # 70 MB of data
assert sock.send(data) == len(data)  # True
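
# The same client with a non-blocking socket: send() returns right away
# with the number of bytes the kernel accepted, which is less than len(data).
import socket

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(('localhost', 1234))
sock.setblocking(0)

data = 'foobar\n' * 10 * 1024 * 1024  # 70 MB of data
assert sock.send(data) == len(data)  # AssertionError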

# Non-blocking client that keeps sending until all the data is written,
# waiting in select() whenever the kernel's send buffer is full.
import errno
import select
import socket

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(('localhost', 1234))
sock.setblocking(0)

data = 'foobar\n' * 1024 * 1024
data_size = len(data)
print 'Bytes to send: ', len(data)

total_sent = 0
while len(data):
    try:
        sent = sock.send(data)
        total_sent += sent
        data = data[sent:]
        print 'Sending data'
    except socket.error as e:
        if e.errno != errno.EAGAIN:
            raise e
        print 'Blocking with', len(data), 'remaining'
        select.select([], [sock], [])  # This blocks until the socket is writable

assert total_sent == data_size  # True
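
The partial write and the EAGAIN error handled above can also be observed without running a separate server. The following is only a sketch under a few assumptions: it targets Python 3 (where the error surfaces as BlockingIOError), it uses socket.socketpair() instead of the article's client and server scripts, and the names writer, reader, first_sent and blocked_errno are illustrative.

```python
import errno
import socket

# socketpair() returns two connected sockets in the same process,
# so this sketch needs no separate server.
writer, reader = socket.socketpair()
writer.setblocking(False)

payload = b"foobar\n" * 10 * 1024 * 1024  # 70 MB, far more than a socket buffer holds

# The first send() copies only as much as the kernel buffer accepts.
first_sent = writer.send(payload)
print("first send accepted", first_sent, "of", len(payload), "bytes")

# Nobody reads from `reader`, so the buffer fills up and send()
# raises instead of waiting.
blocked_errno = None
try:
    while True:
        writer.send(payload)
except BlockingIOError as exc:
    blocked_errno = exc.errno
    print("send would block:", errno.errorcode[blocked_errno])

writer.close()
reader.close()
```

How many bytes the first send() accepts depends on the platform's buffer sizes, which is why the return value always has to be checked.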

Understanding select()

The previous example introduced the select module, which helps us deal with multiple file descriptors at once. The module includes implementations of select, poll, epoll and kqueue, which are used by libraries like eventlet, twisted, tornado and others. We will look at them in the coming articles of this series. Since we made our socket non-blocking, we don’t know when we can actually write to it unless we keep trying to write and hope that it does not fail. Retrying in a tight loop like that is a major waste of CPU time. In the above example, we call the select() function to avoid exactly that: it blocks until the socket is ready for writing, so we only retry when the write can make progress.
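
As a small illustration of watching several sockets at once, here is a sketch in Python 3. It assumes a socketpair() rather than real network connections, and the variable names are made up for the example. select.select() takes three lists (sockets to watch for reading, for writing, and for exceptional conditions) plus an optional timeout, and returns the subsets that are ready:

```python
import select
import socket

a, b = socket.socketpair()

# Ask about both sockets at once. A timeout of 0 means "poll, don't wait".
readable, writable, _ = select.select([a, b], [a, b], [], 0)
idle_readable = list(readable)   # nothing has been sent yet
idle_writable = list(writable)   # both send buffers are empty
print("readable:", len(idle_readable), "writable:", len(idle_writable))

# After one side sends, the other side shows up in the readable list.
a.send(b"ping")
readable, _, _ = select.select([a, b], [], [], 1.0)
ready_for_read = list(readable)
message = b.recv(1024)
print("received:", message)

a.close()
b.close()
```

Leaving the timeout out, as the earlier example does, makes select() wait until at least one of the sockets is ready.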

Introduction to event loops for network events

Now that we understand select better, let's use it to improve on our last example and actually benefit from making the socket non-blocking. We are going to use generators so that our script does not block while it waits on a socket and lets other code proceed in the meantime. Consider this example:

import errno
import select
import socket
import time


def other_task():
    i = 0
    while i < 2000:
        i += 1
        print i
        time.sleep(0.02)
        yield


def send_data_task(port, data):
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.connect(('localhost', port))
    sock.setblocking(0)

    data = (data + '\n') * 1024 * 1024
    print 'Bytes to send: ', len(data)

    total_sent = 0
    while len(data):
        try:
            sent = sock.send(data)
            total_sent += sent
            data = data[sent:]
            print 'Sending data'
        except socket.error as e:
            if e.errno != errno.EAGAIN:
                raise e
            # the socket is not writable right now: hand control back
            # to the loop until select() reports it as writable
            yield ('w', sock)

    print 'Bytes sent: ', total_sent


if __name__ == '__main__':
    tasks = [
        other_task(),
        send_data_task(port=1234, data='foo'),
        send_data_task(port=5678, data='bar'),
    ]

    fds = dict(w={}, r={})
    while len(tasks) or len(fds['w']) or len(fds['r']):
        new_tasks = []
        for task in tasks:
            try:
                resp = next(task)
                try:
                    iter(resp)
                    fds[resp[0]][resp[1]] = task
                except TypeError:
                    # the task yielded nothing, so it is not waiting on
                    # any fd and can run again on the next pass
                    new_tasks.append(task)
            except StopIteration:
                # function completed
                pass

        if len(fds['w'].keys()) or len(fds['r'].keys()):
            readable, writeable, exceptional = select.select(
                fds['r'].keys(), fds['w'].keys(), [], 0)
            for fd in readable:
                new_tasks.append(fds['r'][fd])
                del fds['r'][fd]
            for fd in writeable:
                new_tasks.append(fds['w'][fd])
                del fds['w'][fd]
            # ignore exceptional for now

        tasks = new_tasks
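
For readers on Python 3, the same idea can be tried without starting any servers. The following is a rough sketch, not a port of the script above: it assumes a socketpair() in place of the two TCP connections, and the names counter_task, send_task, recv_task and run are hypothetical. Tasks yield nothing to give up their turn, or ('r', sock) / ('w', sock) to wait for a socket.

```python
import select
import socket


def counter_task(limit, log):
    # A task that never waits on I/O: it yields None to give up its turn.
    for i in range(limit):
        log.append(i)
        yield


def send_task(sock, payload, stats):
    view = memoryview(payload)
    sent = 0
    while sent < len(view):
        try:
            sent += sock.send(view[sent:])
        except BlockingIOError:
            # Buffer full: wait until select() reports the socket writable.
            yield ('w', sock)
    stats['sent'] = sent
    sock.close()  # closing signals end-of-stream to the reader


def recv_task(sock, stats):
    received = 0
    while True:
        yield ('r', sock)  # wait until there is something to read
        chunk = sock.recv(65536)
        if not chunk:
            break
        received += len(chunk)
    stats['received'] = received
    sock.close()


def run(tasks):
    waiting = {'r': {}, 'w': {}}
    while tasks or waiting['r'] or waiting['w']:
        runnable = []
        for task in tasks:
            try:
                request = next(task)
            except StopIteration:
                continue  # task finished
            if request is None:
                runnable.append(task)
            else:
                kind, sock = request
                waiting[kind][sock] = task
        if waiting['r'] or waiting['w']:
            # Only block in select() when no other task could run.
            timeout = 0 if runnable else None
            readable, writable, _ = select.select(
                list(waiting['r']), list(waiting['w']), [], timeout)
            for sock in readable:
                runnable.append(waiting['r'].pop(sock))
            for sock in writable:
                runnable.append(waiting['w'].pop(sock))
        tasks = runnable


writer, reader = socket.socketpair()
writer.setblocking(False)
reader.setblocking(False)
stats, log = {}, []
run([
    counter_task(5, log),
    send_task(writer, b"foo\n" * 1024 * 1024, stats),
    recv_task(reader, stats),
])
print(stats, log)
```

One deliberate difference from the script above: this loop passes a timeout of 0 to select() only while some task is still runnable, and otherwise lets select() block, so it does not spin when every task is waiting on a socket.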

Examples

All the code examples in this article can be found here.

What’s next?

That was an introduction to making sockets non-blocking and using the select function from the select module to watch file descriptors for reading or writing. We saw how select() works, wrote a script that performs network I/O without blocking by using non-blocking sockets, and used generators and select() to implement a very simple I/O loop.


Vaidik Kapoor

Thoughts on software engineering and technology

Written by Vaidik Kapoor, Software Engineer, Building Tech at Grofers