Python 3: fight for nonblocking pipe

Denis Makogon
6 min read · Sep 25, 2017

--

Intro

This story is all about pain and gain. I’ve been a Python fan since university. Right after C and C++, I was so damn surprised to see something conceptually different from static typing. Later I wrote a lot of Python code: OpenStack helped me grow into a skillful Python developer, and as gratitude I made a lot of meaningful contributions to it, as well as starting my own pet project in Python.

There’s one very special thing that all developers who’ve been using Python wanted: nonblocking IO. With the help of the Python developer community we got Python 3 and its asyncio library, which was designed to give us the ability to develop highly scalable concurrent network applications like web servers, database bindings, etc. And I can confirm things are getting even better with newer AIO libs and faster libraries for CPython like uvloop, which has shown nearly the same concurrency level as Golang concurrent network applications. Okay, network programming is comfortable, tools are good, am I satisfied? Hell no!

Ugly stdlib, ugly STDIN

I decided to implement the following thing: a nonblocking STDIN pipe. So, the idea I’m pursuing is to implement an async nonblocking process that awaits data on STDIN from another process.

The first thing I decided to look at was the stdlib. Everyone is aware that you can use STDIN through the sys module:

import sys

if __name__ == "__main__":
    print(sys.stdin.read(), flush=True)

So, STDIN here acts as nothing but a file (to be more precise, a TextIOWrapper). More important to us: the type of this object implements two major methods, read and write. Okay, I know how to read and write data to a file, so my next question is: does it block? My (wrong) assumption was that it doesn’t, or at least shouldn’t: every IO device, socket or pipe is a file, so there should be no problem reading from and writing to a file simultaneously without getting blocked. Here’s a very bad piece of code:

import sys

if __name__ == "__main__":
    while True:
        print(sys.stdin.read(), flush=True)

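Here’s how sys.stdin.read acts. The first read blocks until the process “on the other side” closes its end of the pipe, which is what signals EOF. This means that you can write as much as you want to STDIN, but print will write the STDIN content to STDOUT ONLY when EOF appears. After that, every further read returns an empty string immediately, so the loop just spins. Okay, sys.stdin.read is BAD.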
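To see this behaviour without typing into a terminal, here is a minimal sketch that feeds a child process through a pipe. The helper name feed_child and the one-line child script are made up for the illustration; the child simply upper-cases whatever sys.stdin.read() returns.

```python
import subprocess
import sys

# Hypothetical child program: reads ALL of stdin, then echoes it upper-cased.
CHILD = "import sys; sys.stdout.write(sys.stdin.read().upper())"


def feed_child(chunks):
    """Write chunks to the child's stdin, then close it to signal EOF."""
    proc = subprocess.Popen(
        [sys.executable, "-c", CHILD],
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
    )
    for chunk in chunks:
        proc.stdin.write(chunk)
        proc.stdin.flush()
    # The child cannot have finished yet: its read() is still waiting for EOF.
    still_running = proc.poll() is None
    # Closing our end of the pipe is what the child sees as EOF.
    proc.stdin.close()
    output = proc.stdout.read()
    proc.stdout.close()
    proc.wait()
    return still_running, output


if __name__ == "__main__":
    print(feed_child([b"hello ", b"world"]))
```

No matter how many chunks are written, the child only produces its output once the writer closes the pipe.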

What about sys.stdin.readline? Well, the key difference is that sys.stdin.read reads everything, but sys.stdin.readline reads line by line. Strictly speaking it still blocks, but only until the next newline (or EOF) arrives, so data shows up as soon as the writer finishes a line.

import sys

if __name__ == "__main__":
    while True:
        print(sys.stdin.readline(), flush=True)

This code actually works, but the downside is that it splits the input line by line, so in order to rebuild the data that has been sent you need to know its exact initial structure. And what about concurrent writes? My verdict: it’s good, but not good enough. It appears that there’s no fine way to implement a STDIN pipe with it. With that in mind, my overall verdict for sys.stdin: I’m sorry, but you’re ugly.

What’s wrong with STDIN in Python? On Unix we know that everything is a file, so why are STDIN and a socket two different things in Python (one blocks but the other doesn’t)? A socket is nothing but a network pipe: someone writes (client), someone reads (server). From this point on I tried to solve the STDIN problem as if I were implementing a TCP server. Comparing Python’s socket to STDIN, I found two things they have in common: I can read from and write to both.

First of all I tried to implement an echoing socket server, and here’s what I eventually had:

import asyncio


async def handle_echo(reader, writer):
    # Read up to 100 bytes from the client and send them straight back.
    data = await reader.read(100)
    message = data.decode()
    writer.write(data)
    await writer.drain()
    writer.close()


loop = asyncio.get_event_loop()
# The explicit loop=... argument was removed in Python 3.10, so it is omitted.
coro = asyncio.start_server(
    handle_echo,
    '127.0.0.1',
    9000,
)
server = loop.run_until_complete(coro)

try:
    loop.run_forever()
except KeyboardInterrupt:
    pass
finally:
    server.close()
    loop.run_until_complete(server.wait_closed())
    loop.close()

Okay, here we have a server that listens on a TCP socket for incoming data and sends it back. So, you might ask me how all this is related to a STDIN pipe? It appears that the event loop has a very interesting method: AbstractEventLoop.add_reader.

Again, I was very hyped when I found that you can register a callback that is invoked whenever a file descriptor becomes readable, and here’s how it looks:

import asyncio
import sys

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    # add_reader calls the callback with no arguments, so the lambda takes none.
    loop.add_reader(
        sys.stdin.fileno(), lambda: print(sys.stdin.read()))
    loop.run_forever()

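I thought that this should work too. It didn’t: STDIN still blocks on read, because the callback calls sys.stdin.read, which waits for EOF no matter how it was triggered. Fortunately, the following example worked, but it doesn’t fit my needs either: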

import asyncio
import sys

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    # add_reader calls the callback with no arguments, so the lambda takes none.
    loop.add_reader(
        sys.stdin.fileno(), lambda: print(sys.stdin.readline()))
    loop.run_forever()
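For comparison, here is a rough sketch of add_reader used on a raw file descriptor that has been switched to nonblocking mode, reading with os.read instead of going through sys.stdin. It uses an os.pipe as a stand-in for STDIN so it can run on its own; the function name read_chunks_from_pipe is invented for the example, and it assumes a Unix selector-based event loop (add_reader is not available on the default Windows loop).

```python
import asyncio
import os


async def read_chunks_from_pipe(payloads):
    """Write payloads into a pipe and collect them via an add_reader callback."""
    loop = asyncio.get_running_loop()
    read_fd, write_fd = os.pipe()
    os.set_blocking(read_fd, False)  # os.read will never block on this fd
    received = []
    done = loop.create_future()

    def on_readable():
        # Called by the event loop whenever read_fd has data or hit EOF.
        try:
            chunk = os.read(read_fd, 4096)
        except BlockingIOError:
            return  # spurious wake-up, nothing to read yet
        if chunk:
            received.append(chunk)
        else:
            # Empty read means the writer closed its end (EOF).
            loop.remove_reader(read_fd)
            done.set_result(None)

    loop.add_reader(read_fd, on_readable)
    for payload in payloads:
        os.write(write_fd, payload)
        await asyncio.sleep(0.01)  # give the loop a chance to run the callback
    os.close(write_fd)
    await done
    os.close(read_fd)
    return b"".join(received)


if __name__ == "__main__":
    print(asyncio.run(read_chunks_from_pipe([b"hello ", b"world"])))
```

The callback only ever reads what is already available, so the loop is never stuck waiting for EOF or for a newline.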

A file reader is not my case, moving on. What’s next? Please note to yourselves, this is the most important thought of this post: writing low-level code is not always a good thing, especially when everybody is trying to be so low level, so hard core…

Asyncio transport, protocol, lie.

Going back to the socket example, the way it’s implemented doesn’t let you understand how to make the transition from sockets to pipes. At this point I found myself out of options. I’d spent a month or so (a month of being angry and somewhat desperate) trying to figure out what else I could do to accomplish my task.

A few days ago I started googling again, but still wasn’t successful. I decided to go over the Python source code, trying to understand why STDIN is so bad. I couldn’t believe my eyes when I saw it: another way to implement a socket server. Here’s an example:

import asyncio
import sys


class EchoServer(asyncio.Protocol):

    def connection_made(self, transport):
        # Keep the transport so data_received can write back to the client.
        self.transport = transport
        print(transport.get_extra_info('peername'), file=sys.stderr, flush=True)
        super(EchoServer, self).connection_made(transport)

    def data_received(self, data):
        print('received {!r}'.format(data), file=sys.stderr, flush=True)
        self.transport.write(data)
        print('sent {!r}'.format(data), file=sys.stderr, flush=True)
        super(EchoServer, self).data_received(data)

    def eof_received(self):
        if self.transport.can_write_eof():
            self.transport.write_eof()
        super(EchoServer, self).eof_received()

    def connection_lost(self, exc):
        super(EchoServer, self).connection_lost(exc)


if __name__ == "__main__":
    host_port = ('127.0.0.1', 9000)
    loop = asyncio.get_event_loop()
    factory = loop.create_server(EchoServer, *host_port)
    server = loop.run_until_complete(factory)
    try:
        loop.run_forever()
    finally:
        server.close()
        loop.run_until_complete(server.wait_closed())
        loop.close()

The main feature for developers here is that you get a certain level of flexibility to define handlers for different stages of socket IO events. Why was this so interesting to me? First of all, the transport: it hides the actual transport type (socket, file, pipe, etc.); all you need to know is that you can read from and write to a transport, that’s all. Okay, so maybe I can do the same for a STDIN pipe?

asyncio docs lie!

asyncio currently implements transports for TCP, UDP, SSL, and subprocess pipes. The methods available on a transport depend on the transport’s kind.

asyncio provides base classes that you can subclass to implement your network protocols

This is a total lie, and here’s why. It appears that an asyncio.Transport can be whatever you want as long as it implements the read/write interface. The docs also say that you don’t have to create your own transports and should use the platform-specific ones (I’m still not sure how to react to this). asyncio.Protocol can represent anything that works as a file. Moreover, the docs say that there’s no need to use asyncio.Protocol as a parent class, i.e. the purpose of this class/abstraction is to be some kind of hint or note for developers.

Okay, things seem clear at this point: we have a Transport and a transport-bound Protocol, and the only problem left is the socket host and port. We know that the protocol is responsible for processing the data that goes through the transport, so the only thing left is to define that transport (the protocol may be left as is). Here’s what asyncio offers:

asyncio.BaseEventLoop.connect_read_pipe(self, protocol_factory, pipe)
Register read pipe in event loop. Set the pipe to non-blocking mode.
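What “non-blocking mode” means at the file-descriptor level can be sketched with a plain os.pipe. This is only an illustration, not what asyncio does internally line for line; the function name read_empty_pipe_nonblocking is invented, and os.set_blocking is assumed to be available (Unix, or Python 3.12+ for pipes on Windows).

```python
import os


def read_empty_pipe_nonblocking():
    """Show that a nonblocking read on an empty pipe fails fast instead of waiting."""
    read_fd, write_fd = os.pipe()
    try:
        os.set_blocking(read_fd, False)
        try:
            # Nothing has been written yet; a blocking fd would hang here.
            os.read(read_fd, 1024)
        except BlockingIOError:
            outcome = "would block"
        else:
            outcome = "got data"
        # Once data is available, the very same call returns it immediately.
        os.write(write_fd, b"x")
        return outcome, os.read(read_fd, 1024)
    finally:
        os.close(read_fd)
        os.close(write_fd)


if __name__ == "__main__":
    print(read_empty_pipe_nonblocking())
```

An event loop builds on exactly this property: it only asks for data when the kernel says there is some, and a read that would have to wait raises instead of freezing the process.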

Shocking! I was surprised like never before. I was like: “Seriously? After a month of wasted time?”. This function actually does real magic, and here’s how it looks:

import asyncio
import sys

import uvloop


class MyProtocol(asyncio.Protocol):

    def connection_made(self, transport):
        print('pipe opened', file=sys.stderr, flush=True)
        super(MyProtocol, self).connection_made(transport=transport)

    def data_received(self, data):
        # Called by the event loop each time a chunk of bytes arrives on STDIN.
        print('received: {!r}'.format(data), file=sys.stderr, flush=True)
        print(data.decode(), file=sys.stderr, flush=True)
        super(MyProtocol, self).data_received(data)

    def connection_lost(self, exc):
        print('pipe closed', file=sys.stderr, flush=True)
        super(MyProtocol, self).connection_lost(exc)


if __name__ == "__main__":
    # Open STDIN as an unbuffered binary file so the transport sees raw bytes.
    with open("/dev/stdin", "rb", buffering=0) as stdin:
        asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
        loop = asyncio.get_event_loop()
        try:
            stdin_pipe_reader = loop.connect_read_pipe(MyProtocol, stdin)
            loop.run_until_complete(stdin_pipe_reader)
            loop.run_forever()
        finally:
            loop.close()

This code is almost equivalent to:

if __name__ == "__main__":
    asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
    loop = asyncio.get_event_loop()
    try:
        stdin_pipe_reader = loop.connect_read_pipe(MyProtocol, sys.stdin)
        loop.run_until_complete(stdin_pipe_reader)
        loop.run_forever()
    finally:
        loop.close()

And you know what makes them special? They both make STDIN nonblocking for reading. You may wonder how it works. Basically, no matter what kind of file you use, when someone writes data into it the kernel marks the file descriptor as readable. The event loop sits on top of an event-notification mechanism (epoll on Linux), knows the set of registered descriptors with their handlers, and polls the kernel for IO events; when a match is found, the event loop triggers the “callback” for that event. This is the key concept of event-driven programming. In my particular case, when someone writes data into STDIN, the kernel reports the event and the event loop invokes the callback bound by our protocol (data_received).
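The polling step can be sketched with the stdlib selectors module, which wraps epoll, kqueue or whatever the platform offers. This is a simplified illustration of one loop iteration, not asyncio’s actual implementation; the function name poll_pipe_once and the "on_readable" label are made up, and an os.pipe stands in for STDIN.

```python
import os
import selectors


def poll_pipe_once(payload, timeout=1.0):
    """Register a pipe for read events and poll it before and after a write."""
    read_fd, write_fd = os.pipe()
    selector = selectors.DefaultSelector()
    # The data field is where an event loop would keep the callback to run.
    selector.register(read_fd, selectors.EVENT_READ, data="on_readable")
    try:
        # Nothing written yet: the kernel reports no ready descriptors.
        before = selector.select(timeout=0)
        os.write(write_fd, payload)
        # After the write the kernel reports read_fd as readable.
        after = selector.select(timeout=timeout)
        chunk = os.read(read_fd, len(payload)) if after else b""
        return len(before), len(after), chunk
    finally:
        selector.unregister(read_fd)
        selector.close()
        os.close(read_fd)
        os.close(write_fd)


if __name__ == "__main__":
    print(poll_pipe_once(b"ping"))
```

A real event loop repeats this select call forever and dispatches the stored callback for every descriptor the kernel reports as ready.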

In conclusion, I want to say that asyncio brought a lot of useful features to network programming, with awesome UX and design. But there’s huge room for contribution, especially to the docs, which are not so explicit and don’t go into edge cases.

The solution I’ve made shows the key feature of event-driven programming: how to do truly nonblocking IO, no matter whether we’re talking about sockets, files or pipes.
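If you want to try the connect_read_pipe approach without piping anything into the process, here is a self-contained sketch that swaps STDIN for an os.pipe and uses the default asyncio loop instead of uvloop. The names CollectingProtocol and read_pipe_with_protocol are invented for this example, and it assumes a Unix event loop.

```python
import asyncio
import os


class CollectingProtocol(asyncio.Protocol):
    """Collects every chunk and resolves a future when the pipe closes."""

    def __init__(self, closed):
        self.chunks = []
        self.closed = closed

    def data_received(self, data):
        self.chunks.append(data)

    def connection_lost(self, exc):
        # Called after EOF; the transport closes the read end for us.
        if not self.closed.done():
            self.closed.set_result(exc)


async def read_pipe_with_protocol(payloads):
    loop = asyncio.get_running_loop()
    closed = loop.create_future()
    read_fd, write_fd = os.pipe()
    # connect_read_pipe expects a file-like object, so wrap the descriptor.
    read_file = os.fdopen(read_fd, "rb", buffering=0)
    transport, protocol = await loop.connect_read_pipe(
        lambda: CollectingProtocol(closed), read_file)
    for payload in payloads:
        os.write(write_fd, payload)
        await asyncio.sleep(0)  # yield so the loop can deliver the data
    os.close(write_fd)  # EOF for the reader
    await closed
    return b"".join(protocol.chunks)


if __name__ == "__main__":
    print(asyncio.run(read_pipe_with_protocol([b"hello ", b"world"])))
```

The protocol never calls read itself: the loop pushes bytes into data_received as they arrive and reports the closed pipe through connection_lost.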
