Python Generators/Coroutines/Async IO with examples

Alex Anto Navis L
Published in Analytics Vidhya
14 min read, Apr 12, 2020

For a consulting project, I used Python's asyncio and discovered how much difficulty hides behind its simple syntax. It took quite a bit of blog reading and experimenting with examples to understand how it works. The topic is overwhelming because of its history with generators and the older coroutine syntax, and it requires basic knowledge of a few concepts to understand well.

I want to share what I learned, with short definitions and simple code examples that cover the core fundamentals, so that working with asyncio becomes a little easier :).

Topics

1) Introduction
2) History & How it works
3) Summary
4) Simple HTTP API Crawler Example

1. Introduction

The following is a brief overview of the terminology used in this post.

Concurrency vs Parallelism

  • Concurrency: the ability to execute two or more tasks that can start, run and complete in overlapping time periods (truly parallel execution on multiple cores, or time-sliced on a single-core machine with the help of threads).
  • Parallelism: executing tasks at the same instant (e.g., at least two threads executing simultaneously).
  • Threads: threads execute multiple tasks and are one way to achieve concurrency/parallelism. In a single Python process, thread parallelism is not possible for Python code because of the Global Interpreter Lock (a CPython implementation detail). The GIL is a mutex that prevents multiple threads from executing simultaneously on Python objects. So only one thread can execute Python code at a time within a Python process, although a thread that is blocked on IO releases the GIL.
  • Multiple processes: multiple Python processes are required to achieve parallelism. The multiprocessing package offers both local and remote concurrency, effectively side-stepping the GIL by using subprocesses instead of threads.

Sync Vs Async

Sync/async describes how a computation/task is carried out within the context of a single thread.

  • Sync: blocking execution of tasks. The thread (CPU) executing a computation/task is blocked while it waits for an operation (IO) to complete.
    e.g. a single thread handling an HTTP request makes a DB call, waits for the DB response, returns the HTTP response, and only then picks up the next HTTP request.
  • Async: concurrent execution of tasks. Multiple tasks executed by a single thread can start, pause and complete independently of each other. Async tasks don't block on operations that usually wait for IO. This is mostly helpful for IO-bound (disk/network) use cases, since the CPU can be used for other work while we are waiting on IO. Coroutines in Python help to achieve this.
    e.g. a single thread handling an HTTP request makes a DB call and suspends the current task until the DB response arrives. Meanwhile, the same thread picks up a new request and starts processing it.
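
To make the difference concrete, here is a minimal sketch that runs the same three waits first synchronously and then asynchronously. The names sync_job, async_job, run_sync and run_async are made up for this illustration, and the sleeps merely stand in for real IO such as a DB call.

```python
import asyncio
import time


def sync_job(delay):
    time.sleep(delay)           # blocks the whole thread while "waiting on IO"
    return delay


async def async_job(delay):
    await asyncio.sleep(delay)  # suspends only this task; the thread stays free
    return delay


def run_sync(delays):
    start = time.perf_counter()
    results = [sync_job(d) for d in delays]
    return results, time.perf_counter() - start


async def run_async(delays):
    start = time.perf_counter()
    results = await asyncio.gather(*(async_job(d) for d in delays))
    return results, time.perf_counter() - start


delays = [0.2, 0.2, 0.2]
sync_results, sync_elapsed = run_sync(delays)
async_results, async_elapsed = asyncio.run(run_async(delays))
print("sync : %.2fs" % sync_elapsed)    # roughly the sum of the delays
print("async: %.2fs" % async_elapsed)   # roughly the longest single delay
```

Both versions run on a single thread; the async version is faster only because the waits overlap, not because any work happens in parallel.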

2. History & How it works

Iterators:

Iterators are objects that can be iterated over lazily, one item at a time, using __iter__ and __next__.

numbers = [1, 2]
iterator = iter(numbers)
print(next(iterator))
print(iterator.__next__())
print(next(iterator))
# Output:
1
2
StopIteration exception is raised by the third call, since there are no items left to iterate
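
For comparison, this is roughly what writing such an iterator by hand looks like. Countdown is a hypothetical class invented for this sketch; it only shows the two methods that the iterator protocol requires.

```python
class Countdown:
    """Hypothetical iterator that counts down from `start` to 1."""

    def __init__(self, start):
        self.current = start

    def __iter__(self):
        # An iterator returns itself from __iter__.
        return self

    def __next__(self):
        if self.current <= 0:
            raise StopIteration  # signals that the iterator is exhausted
        value = self.current
        self.current -= 1
        return value


print(list(Countdown(3)))  # [3, 2, 1]
```

The bookkeeping in __init__ and __next__ is exactly the boilerplate that generators remove.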

Generators:

Regular generators are iterators that simplify building an iterator with custom logic. A generator produces one value on the fly each time it is resumed (i.e., it is a lazy producer).

How to create one?

A generator function is any normal function that contains a yield statement.

yield is a keyword that is used like return, except that calling the function returns a generator. A yield statement pauses the function, saving all its state, and execution continues from that point on successive calls.

def simple_generator():
    yield 10
    yield 100


gen = simple_generator()
print(gen)
print(next(gen))
print(gen.__next__())
try:
    print(next(gen))
except StopIteration:
    print('iteration stopped')
# Output:
<generator object simple_generator at 0x100fa48b8>
10
100
iteration stopped

A generator object can also be consumed with a ‘for’ loop, since it is an iterator.

import random


def magic_pot(start=1, end=1000):
    while True:
        yield random.randint(start, end)


gen = magic_pot()
for a in gen:
    print(a)
# Output: prints numbers without stopping
569
...
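
Because the generator is lazy, an endless one can still be consumed safely by taking only a bounded number of items. A minimal sketch using itertools.islice from the standard library (magic_pot is repeated here so that the snippet runs on its own):

```python
import itertools
import random


def magic_pot(start=1, end=1000):
    while True:
        yield random.randint(start, end)


# islice stops after 5 items; the generator itself never finishes.
first_five = list(itertools.islice(magic_pot(), 5))
print(first_five)
```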

We can pipeline generators similar to Unix pipelines.

def read_file(file_name):
    # 'with' closes the file once the generator is exhausted
    with open(file_name, "r") as f:
        for row in f:
            yield row


def read_csv_row(file_name):
    for row in read_file(file_name):
        yield row.split(',')


def read_csv(file_name):
    for items in read_csv_row(file_name):
        print("Row: " + str(items))


read_csv('test.csv')
# Output:
Row: ['col1', 'col2', 'col3\n']
Row: ['1', '2', '3\n']
Row: ['3', '4', '5']

PEP-380 (yield from was introduced to simplify pipelines of generators)

  • The addition of yield from in Python 3.3 made it easier to refactor generators as well as chain them together.
# Example:
def read_file(file_name):
    for row in open(file_name, "r"):
        yield row

# or, equivalently:
def read_file(file_name):
    yield from open(file_name, "r")
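
yield from does a little more than replace the for loop: the value returned by the inner generator becomes the value of the yield from expression. A small sketch of that behaviour (read_rows and read_all are hypothetical names, and in-memory lists stand in for files):

```python
def read_rows(rows):
    count = 0
    for row in rows:
        yield row
        count += 1
    return count  # becomes the value of the `yield from` expression


def read_all(*sources):
    total = 0
    for rows in sources:
        total += yield from read_rows(rows)  # delegate, then collect the count
    yield "total rows: %d" % total


print(list(read_all(["a", "b"], ["c"])))
# ['a', 'b', 'c', 'total rows: 3']
```

This return-value plumbing is what later made yield from suitable for coroutines that hand a result back to their caller.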

Coroutine:

Coroutines are computer program components that generalise subroutines for nonpreemptive multitasking, by allowing multiple entry points for suspending and resuming execution at certain locations.

Coroutines help to achieve cooperative multitasking/concurrency. They are functions/tasks that enable concurrent execution through the ability to pause and resume within a thread (e.g., asynchronous I/O and other forms of event-driven programming or co-operative multitasking).

Evolving To “Generator Coroutine”:

Generators originally could not accept values during execution, which makes it hard to control the flow of logic inside the generator function (e.g., coroutine A waits on the response of an HTTP call made by coroutine B: coroutine A suspends -> coroutine B's HTTP call completes -> coroutine A resumes with the HTTP response from coroutine B).

Generators evolved to support coroutine-like abilities with PEP-342 (Python 2.5), which made it possible to pass values into generators to control the flow of execution. This eventually enabled coroutines and other forms of co-operative multitasking.

PEP-342

Python’s generator functions are almost coroutines — but not quite — in that they allow pausing execution to produce a value, but do not provide for values or exceptions to be passed in when execution resumes.

generators cannot yield control while other functions are executing, unless those functions are themselves expressed as generators, and the outer generator is written to yield in response to values yielded by the inner generator. This complicates the implementation of even relatively simple use cases like asynchronous communications, because calling any functions either requires the generator to block (i.e. be unable to yield control), or else a lot of boilerplate looping code must be added around every needed function call

However, if it were possible to pass values(send()) or exceptions into a generator at the point where it was suspended, a simple co-routine scheduler or trampoline function would let coroutines call each other without blocking — a tremendous boon for asynchronous applications. Such applications could then write co-routines to do non-blocking socket I/O by yielding control to an I/O scheduler until data has been sent or becomes available. Meanwhile, code that performs the I/O would simply do something like this:

data = (yield nonblocking_read(my_socket, nbytes))

in order to pause execution until the nonblocking_read() coroutine produced a value.

The PEP above added the send(), throw() and close() methods to generators. yield became an expression and behaves like a bidirectional communication channel, as explained in the example below.

Example: generator with send() and a yield expression

import random


def magic_pot(start=1, end=1000):
    while True:
        stop = (yield random.randint(start, end))
        print("stop %s" % stop)
        if stop is True:
            yield "magic pot stopped"
            break


gen = magic_pot()
print(gen)
print(gen.send(None))  # same as next(gen)
print("second")
print(next(gen))
print(gen.send(True))  # input value for generator
try:
    print(next(gen))
except StopIteration:
    print('iteration stopped')
# Output:
<generator object magic_pot at 0x10ab39840>
735
second
stop None # stop is printed after the next(gen)
506
stop True
magic pot stopped
iteration stopped

In the code,
- 'yield random.randint(start, end)' produces an output value, triggered by next(gen) or gen.send(None).
- 'stop = (yield random.randint(start, end))' captures the value passed in by the following call (gen.send(None) or gen.send(value)), which is used to control the flow inside the generator coroutine. In this example, gen.send(True) is passed to stop the coroutine.
- (yield) works bidirectionally: it outputs a value and receives an input.
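
A second small sketch of the same mechanism, this time with the generator acting mainly as a consumer: a running average that receives values through send() and is shut down with close(). The name running_average is made up for this illustration.

```python
def running_average():
    total = 0.0
    count = 0
    average = None
    try:
        while True:
            value = yield average   # hand out the average, wait for a value
            total += value
            count += 1
            average = total / count
    finally:
        # close() raises GeneratorExit at the paused yield, so this runs.
        print("averager closed")


averager = running_average()
next(averager)  # prime the coroutine: run it up to the first yield
outputs = [averager.send(v) for v in (10, 20, 30)]
print(outputs)  # [10.0, 15.0, 20.0]
averager.close()
```

The priming call to next() is needed because a value can only be sent to a generator that is already paused at a yield.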

Coroutine vs Generator:

A coroutine is a generator that follows certain conventions.

  • A coroutine yields control to another coroutine and can resume execution from the point where it gave up control.
  • Coroutines are mostly data consumers, while generators are data producers.
  • Values can be sent to generator-based coroutines (using send() and a (yield) expression) after they are started, while regular generators ignore such input.
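
The idea of coroutines handing control back and being resumed later can be sketched with a tiny round-robin scheduler built from plain generators. This is only a toy illustration of cooperative multitasking, not how asyncio is implemented; worker and run_round_robin are hypothetical names.

```python
from collections import deque


def worker(name, steps, log):
    for step in range(steps):
        log.append("%s step %d" % (name, step))
        yield  # give control back to the scheduler


def run_round_robin(coroutines):
    queue = deque(coroutines)
    while queue:
        coro = queue.popleft()
        try:
            next(coro)          # resume until the next yield
        except StopIteration:
            continue            # finished: do not reschedule
        queue.append(coro)      # still running: back of the queue


log = []
run_round_robin([worker("A", 2, log), worker("B", 3, log)])
print(log)
# ['A step 0', 'B step 0', 'A step 1', 'B step 1', 'B step 2']
```

Each worker runs only until its next yield, so the two interleave on a single thread without any preemption.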

AsyncIO — Asynchronous I/O, event loop, coroutines and tasks

From Python Doc:

This module provides infrastructure for writing single-threaded concurrent code using coroutines, multiplexing I/O access over sockets and other resources, running network clients and servers. We will look at each of the topics below.

Event loop is the core of every asyncio application. Event loops run asynchronous tasks and callbacks, perform network IO operations, and run subprocesses. Application developers should typically use the high-level asyncio functions, such as asyncio.run(), and should rarely need to reference the loop object or call its methods.

AsyncIO Generator Coroutine:

In Python 3.4, generator-based coroutines are created with the @asyncio.coroutine decorator from the then-new asyncio module. Asyncio generator coroutines use the yield from syntax to suspend the coroutine. Note that @asyncio.coroutine was deprecated in Python 3.8 and removed in Python 3.11, so the generator-based examples in this post only run on older versions (asyncio.run() itself requires Python 3.7+).

An asyncio coroutine can:

  • “yield from” another coroutine
  • “yield from” a future
  • return an expression
  • raise an exception

The yield from statement gives control back to the event loop to let other coroutines execute.

  • The “yield from” syntax, introduced in PEP 380, is used inside generator coroutines instead of the original yield syntax. It can delegate to an iterator, a native or generator coroutine, or a future.
  • The decorator enables the generator to use “yield from” to call native coroutines (async def), and also enables the generator coroutine to be called by native coroutines using an await expression.
  • A plain “yield” makes the function a regular generator/iterator, which has to be iterated and doesn’t work with asyncio methods.
import asyncio
import random


@asyncio.coroutine
def compute_coroutine(x):
    yield from asyncio.sleep(random.random())  # yield from native coroutine
    print(x * 2)


asyncio.run(compute_coroutine(2))
# Output
4
asyncio.run()
- starts the compute_coroutine() coroutine
- suspends compute_coroutine() and starts asyncio.sleep()
- resumes compute_coroutine()
- coroutines are suspended/resumed with the help of the asyncio event loop
--------------------------------------------------------------------
@asyncio.coroutine
def coroutine_generator(x):
    for i in range(0, x):
        yield i
    print("input=%s" % x)


asyncio.run(coroutine_generator(2))
# Output - a plain yield inside a coroutine cannot be used with asyncio
RuntimeError: Task got bad yield: 0

Native Coroutines:

The async/await keywords were introduced in Python 3.5 to make the grammar of coroutine programming more meaningful, and they are the current syntax.

  • async def returns a native coroutine object. async def functions are always coroutines, even without await.
  • Regular generators return a generator object.
  • asyncio.coroutine returns a generator-based coroutine object.
  • types.coroutine returns a generator-based coroutine object.
import asyncio
import types


@asyncio.coroutine
def async_gen_data():
    yield 10


@types.coroutine
def types_gen_data():
    yield 10


async def async_native_coroutine():
    print('async_native_coroutine')


async def async_native_coroutine_generator():
    print('async_native_coroutine_generator')
    yield 100


print(async_gen_data())
print(types_gen_data())
print(async_native_coroutine())
print(async_native_coroutine_generator())
# Output:
<generator object async_gen_data at 0x10b6c1e58>
<generator object types_gen_data at 0x10b6c1e58>
<coroutine object async_native_coroutine at 0x10b6d9748>
<async_generator object async_native_coroutine_generator at 0x10b709f28>
We will see more on @types.coroutine / async_generator in the following sections.

await

  • “await” can only be used inside async def. await is used to obtain the result of a coroutine object’s execution.
  • async/await is the latest equivalent of @asyncio.coroutine/‘yield from’.
  • await works on generator/native coroutine objects and on objects with an __await__ method returning an iterator.
  • Under the hood, await borrows its implementation from yield from, with an additional check that its argument is indeed an awaitable.
  • Native/generator coroutines, futures and tasks are awaitables.
# example 1: async native coroutine
import asyncio


async def f1():
    print('before')
    await asyncio.sleep(1)
    print('after 1 sec')


asyncio.run(f1())  # runs f1() in an event loop
# Output
before
after 1 sec
---------------------------------
print(f1())
# Output
<coroutine object f1 at 0x101d852c8>
RuntimeWarning: coroutine 'f1' was never awaited (a coroutine needs to be awaited)
---------------------------------
await f1()
# Output
SyntaxError: 'await' outside function (await should be used inside an async function/native coroutine)
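
As noted above, await also works on any object whose __await__ method returns an iterator. A minimal sketch of such a custom awaitable; Delayed is a hypothetical class that simply delegates to asyncio.sleep() and then produces a value.

```python
import asyncio


class Delayed:
    """Hypothetical awaitable that produces `value` after `delay` seconds."""

    def __init__(self, value, delay):
        self.value = value
        self.delay = delay

    def __await__(self):
        # Delegate to the iterator of asyncio.sleep(), then return the result.
        yield from asyncio.sleep(self.delay).__await__()
        return self.value


async def main():
    return await Delayed("done", 0.1)


result = asyncio.run(main())
print(result)  # done
```

The yield from inside __await__ is the same delegation mechanism that generator coroutines use, which is why await can be described as yield from plus an awaitable check.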

Concurrent Coroutines (async/await, @asyncio.coroutine/yield from):

import asyncio
import random


@asyncio.coroutine
def compute(tid, x):
    sleep = random.random()
    print("%s: input=%s with sleep=%s" % (tid, x, sleep))
    yield from asyncio.sleep(sleep)  # async future
    return x * 2


@asyncio.coroutine
def print_sum(tid, x):
    result = yield from compute(tid, x)  # return a value
    print("%s: result=%s" % (tid, result))


async def task(tid, x):
    return await print_sum(tid, x)  # await a coroutine


async def main():
    await asyncio.gather(
        task("t1", 2),
        print_sum("t2", 3),
        task("t3", 4),
    )


asyncio.run(main())
# Output - executes 3 tasks concurrently
t1: input=2 with sleep=0.7909687238238471
t2: input=3 with sleep=0.25100171976591423
t3: input=4 with sleep=0.4164068460815761
t2: result=6
t3: result=8
t1: result=4
await asyncio.gather() - runs all coroutines concurrently inside the event loop and gathers their results into a list.

In compute(), the “yield from” statement gives control back to the event loop (pause), and execution resumes after asyncio.sleep() has completed. Note that asyncio.sleep() is itself a coroutine, and the suspended coroutine resumes after the sleep. The event loop runs whichever coroutines are runnable, so the order in which the results are printed (t2, t3, t1 here) varies with the random delays.
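
One detail worth seeing in isolation: asyncio.gather() returns results in the order of its arguments, not in the order in which the coroutines finish. A minimal sketch with native coroutines and fixed delays (double_after and the finished list are made up for this illustration):

```python
import asyncio


async def double_after(x, delay, finished):
    await asyncio.sleep(delay)
    finished.append(x)      # records the completion order
    return x * 2


async def main():
    finished = []
    results = await asyncio.gather(
        double_after(1, 0.3, finished),
        double_after(2, 0.1, finished),
        double_after(3, 0.2, finished),
    )
    return results, finished


results, finished = asyncio.run(main())
print(results)   # [2, 4, 6]  -> order of the arguments
print(finished)  # [2, 3, 1]  -> order of completion
```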

Event Loop:

  • Event loops use cooperative scheduling, meaning the event loop runs one Task at a time. While a Task awaits the completion of a Future, the event loop runs other tasks and callbacks, or performs IO operations. Tasks can also be cancelled.
  • An event loop is associated with one thread.
  • SelectorEventLoop: event loop based on the selectors module (epoll()/kqueue()/select()) for high-level and efficient I/O multiplexing.
  • ProactorEventLoop: event loop for Windows that uses I/O Completion Ports (IOCP).
# event loop with multiple coroutine calls
import asyncio
import threading


async def f1():
    print('tid:%s - before' % threading.get_ident())
    await asyncio.sleep(1)
    print('tid:%s - after 1 sec' % threading.get_ident())


def run(fn):
    loop = asyncio.new_event_loop()
    loop.run_until_complete(fn)
    loop.close()


print('tid:%s - start' % threading.get_ident())
# each run() call creates a new event loop
run(f1())
run(f1())
# Output
tid:4638019008 - start
tid:4638019008 - before
tid:4638019008 - after 1 sec
tid:4638019008 - before
tid:4638019008 - after 1 sec
The event loop uses only one thread (the tid is the same in all outputs) to schedule tasks, even though each run() call creates a new event loop with new_event_loop().
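
Since the loop runs everything on one thread, a blocking call inside a coroutine stalls every other task. One common way around this is to hand the blocking call to a thread pool with loop.run_in_executor(). A minimal sketch, where blocking_io and ticker are hypothetical names and time.sleep() stands in for a blocking library call:

```python
import asyncio
import time


def blocking_io(delay):
    time.sleep(delay)       # a blocking call, e.g. a non-async library
    return delay


async def ticker(ticks):
    for _ in range(3):
        await asyncio.sleep(0.05)
        ticks.append(time.perf_counter())


async def main():
    loop = asyncio.get_running_loop()
    ticks = []
    start = time.perf_counter()
    # The blocking call runs in the default thread pool, so the loop
    # stays free to run ticker() in the meantime.
    result, _ = await asyncio.gather(
        loop.run_in_executor(None, blocking_io, 0.3),
        ticker(ticks),
    )
    return result, ticks[0] - start


result, first_tick = asyncio.run(main())
print(result, "first tick after %.2fs" % first_tick)
```

If blocking_io() were called directly inside a coroutine instead, the first tick could not happen until the 0.3 second sleep had finished.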

Futures

A Future is a special low-level awaitable object that represents the eventual result of an asynchronous operation. When a Future object is awaited, the coroutine waits until the Future is resolved somewhere else.

import asyncio


async def num_calc(name, number):
    f = 1
    for i in range(2, number + 1):
        await asyncio.sleep(1)
        f *= i
    print(f"Task {name}: multiplier({number}) = {f}")
    return f


async def main():
    # schedule calls concurrently & gather all async future results
    results = await asyncio.gather(num_calc("A", 2), num_calc("B", 3), num_calc("C", 4))
    print(results)


asyncio.run(main())
# Output
Task A: multiplier(2) = 2
Task B: multiplier(3) = 6
Task C: multiplier(4) = 24
[2, 6, 24]
# Results [2, 6, 24] are printed once all the awaited futures have completed

Coordination between Futures

import asyncio
from asyncio import Future


async def child_future(future):
    print("child sleep")
    await asyncio.sleep(1)
    print("child woke")
    # resolving the future wakes up whoever is awaiting it
    future.set_result("future is resolved")
    return "child complete"


async def parent(future):
    print("parent wait for child")
    print(await future)
    return "parent complete"


async def main():
    future = Future()
    print(await asyncio.gather(parent(future), child_future(future)))


asyncio.run(main())
# Output : parent waits for child to complete the task
parent wait for child
child sleep
child woke
future is resolved
['parent complete', 'child complete']

Tasks:

Task is a subclass of Future. Tasks are used to schedule coroutines concurrently in the event loop. When a coroutine is wrapped into a Task with a function like asyncio.create_task(), it is automatically scheduled to run soon. A Task has add_done_callback() to attach cleanup/completion logic.

import asyncio


async def nested():
    return 42


def result_callback(future):
    print("Callback: %s" % future.result())


async def main():
    # Schedule nested() to run soon concurrently with "main()".
    task = asyncio.create_task(nested())
    task.add_done_callback(result_callback)

    # "task" can now be used to cancel "nested()", or
    # can simply be awaited to wait until it is complete:
    await task


asyncio.run(main())
# Output
Callback: 42
The callback prints the result after the task is complete.
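
The comment in main() mentions that a task can be cancelled. A minimal sketch of what that looks like; slow() is a hypothetical long-running coroutine used only for this illustration.

```python
import asyncio


async def slow():
    try:
        await asyncio.sleep(10)
        return "finished"
    except asyncio.CancelledError:
        print("slow(): cleaning up")
        raise   # re-raise so the task is marked as cancelled


async def main():
    task = asyncio.create_task(slow())
    await asyncio.sleep(0.1)    # let slow() start and reach its await
    task.cancel()               # request cancellation
    try:
        await task
    except asyncio.CancelledError:
        pass                    # awaiting a cancelled task raises CancelledError
    return task.cancelled()


was_cancelled = asyncio.run(main())
print(was_cancelled)  # True
```

Cancellation is cooperative as well: CancelledError is raised inside the coroutine at the point where it is suspended, which gives it a chance to clean up.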

@types.coroutine:

Decorator to flag a generator as a coroutine. Very similar to asyncio.coroutine() and deserves less attention. It allows interoperability between existing generator-based coroutines in asyncio and native coroutines(async).

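import asyncio
import types


async def magic_pot(db):
    return 1


@types.coroutine
def process_magic_pot(db=None):
    data = yield from magic_pot(db)  # generator delegating to a native coroutine
    print(data)


async def main(coro):
    await coro  # a native coroutine awaiting the generator-based one


pmp = process_magic_pot()
print(magic_pot(None))
print(pmp)
asyncio.run(main(pmp))
# Output:
<coroutine object magic_pot at 0x107ed5c48>
<generator object process_magic_pot at 0x107f542a0>
1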

Asynchronous Generators:

‘yield’ inside a native coroutine function turns it into an asynchronous generator function; calling it returns an asynchronous generator.

Rationale (from PEP 525): Regular generators enabled an elegant way of writing complex data producers and have them behave like an iterator. However, currently there is no equivalent concept for the asynchronous iteration protocol (async for). This makes writing asynchronous data producers unnecessarily complex, as one must define a class that implements __aiter__ and __anext__ to be able to use it in an async for statement.

def func():  # a function
    return


def genfunc():  # a generator function
    yield


async def coro():  # a coroutine function
    await smth()


async def read(db):  # a coroutine function without await
    pass


async def asyncgen():  # an async generator function
    await smth()
    yield 42

The result of calling an asynchronous generator function is an asynchronous generator object, which implements the asynchronous iteration protocol defined in PEP 492.

Generators with async/await are supported through a new asynchronous iteration protocol.

Asynchronous Iteration Protocol

  1. An __aiter__ method returning an asynchronous iterator.
  2. An __anext__ method returning an awaitable object, which uses StopIteration exception to “yield” values, and StopAsyncIteration exception to signal the end of the iteration.
import asyncio


async def async_generator():
    for i in range(2):
        await asyncio.sleep(1)
        yield i * i


async def main():
    gen = async_generator()
    print(gen)
    print(await gen.__anext__())  # await for the 'yield'
    print(await gen.__anext__())
    await gen.aclose()  # close the generator for clean-up


asyncio.run(main())

# Output
<async_generator object async_generator at 0x...>
0
1
An async generator should be iterated until it is exhausted, or closed with aclose(), so that it is cleaned up properly. Otherwise, it can cause exceptions when the event loop shuts down.
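
For contrast, this is roughly the class-based version that the PEP rationale describes as unnecessarily complex. AsyncSquares is a hypothetical class that implements the protocol by hand.

```python
import asyncio


class AsyncSquares:
    """Hypothetical hand-written asynchronous iterator over squares."""

    def __init__(self, limit):
        self.limit = limit
        self.i = 0

    def __aiter__(self):
        return self                  # the object is its own async iterator

    async def __anext__(self):
        if self.i >= self.limit:
            raise StopAsyncIteration  # signals the end of the iteration
        await asyncio.sleep(0.01)     # stand-in for real async work
        value = self.i * self.i
        self.i += 1
        return value


async def main():
    return [value async for value in AsyncSquares(3)]


squares = asyncio.run(main())
print(squares)  # [0, 1, 4]
```

The async generator above needs four lines of body for the same behaviour, which is the motivation for adding it to the language.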

async for:

async for simplifies iterating over an asynchronous generator (yield).

# combining an async generator with coroutines
async def main():
    async for i in async_generator():
        print(i)


async def run_both():
    # run two main() coroutines concurrently on the same event loop
    await asyncio.gather(main(), main())


asyncio.run(run_both())
# Output
0
0
1
1

3. Summary

  • The ‘yield’ keyword creates regular generators. ‘yield from’ is a shortcut to iterate over and exhaust another generator.
  • generator.send() sends values into a coroutine. ‘(yield)’ is bidirectional, for communication between coroutines.
  • Event loops use cooperative scheduling. Concurrent tasks are scheduled in an event loop managed by a thread.
  • @asyncio.coroutine creates asyncio generator coroutine objects and uses ‘yield from’ to work with native/generator coroutines. The yield from statement gives control back to the event loop to let other coroutines execute.
  • @types.coroutine is similar to @asyncio.coroutine: it converts regular generators to coroutine objects and makes them interoperable with native coroutines.
  • asyncio is a library for writing concurrent code using the async/await syntax.
  • Coroutines declared with the async/await syntax are the preferred way of writing asyncio applications.
  • async/await is the latest equivalent of @asyncio.coroutine/‘yield from’.
  • Native/generator coroutines, futures and tasks are awaitables.
  • A Future is a special low-level awaitable object that represents the eventual result of an asynchronous operation.
  • Task is a subclass of Future. Tasks are used to schedule coroutines concurrently in the event loop for execution.
  • ‘async for’ is used to iterate over an asynchronous generator.
  • ‘async with’ is used to run async operations with guaranteed cleanup (an asynchronous context manager).
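
Since ‘async with’ has no example of its own in this post, here is a minimal sketch using contextlib.asynccontextmanager from the standard library (Python 3.7+). The connection() helper and the events list are made up for this illustration, and the sleeps stand in for a real async connect/close.

```python
import asyncio
from contextlib import asynccontextmanager


@asynccontextmanager
async def connection(name, events):
    events.append("open " + name)
    await asyncio.sleep(0.01)       # stand-in for an async connect
    try:
        yield name
    finally:
        await asyncio.sleep(0.01)   # stand-in for an async close
        events.append("close " + name)


async def main():
    events = []
    async with connection("db", events) as conn:
        events.append("use " + conn)
    return events


events = asyncio.run(main())
print(events)  # ['open db', 'use db', 'close db']
```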

4. Simple HTTP API Crawler Example

# Simple example to crawl HTTP URLs concurrently
import aiohttp
import asyncio
import time


async def get_req(page_no):
    print("called at time1: " + str(time.time()))
    async with aiohttp.ClientSession() as session:
        async with session.get("http://reqres.in/api/users?page=" + str(page_no), headers={}) as resp:
            print("called at time2: " + str(time.time()))
            return await resp.json()


async def fetch_all_urls():
    results = await asyncio.gather(*[get_req(page_no) for page_no in range(1, 5)], return_exceptions=True)
    # results = [await get_req(page_no) for page_no in range(1, 5)]
    for result in results:
        if isinstance(result, Exception):
            # with return_exceptions=True, failures are returned in the list
            print('request failed: %r' % result)
            continue
        print('page: %s, size: %s' % (result['page'], len(result['data'])))
    return results


def get_htmls():
    # asyncio.run() creates the event loop, runs the coroutine and closes the loop
    return asyncio.run(fetch_all_urls())


start = time.time()
print("start time: " + str(start))
get_htmls()
print("time taken: " + str(time.time() - start))
# Output:
start time: 1583848827.362924
called at time1: 1583848827.363832
called at time1: 1583848827.365168
called at time1: 1583848827.365639
called at time1: 1583848827.365942
called at time2: 1583848827.625053
called at time2: 1583848827.6382
called at time2: 1583848827.656707
called at time2: 1583848827.981515
page: 1, size: 6
page: 2, size: 6
page: 3, size: 0
page: 4, size: 0
time taken: 0.6193337440490723
# To compare, replace the "await asyncio.gather" line with the commented line below it and check the results. The requests are then made sequentially and take about 2 seconds to complete. Awaiting inside a loop like this loses all concurrency, and that is how I started this journey of understanding asyncio.
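
When crawling many more pages, it can help to cap how many requests are in flight at once. A minimal sketch using asyncio.Semaphore; fake_fetch() is a hypothetical stand-in that sleeps instead of making a real HTTP request, so the snippet runs without aiohttp or network access.

```python
import asyncio


async def fake_fetch(page_no, semaphore, stats):
    async with semaphore:                    # at most `limit` fetches at once
        stats["active"] += 1
        stats["peak"] = max(stats["peak"], stats["active"])
        await asyncio.sleep(0.05)            # stand-in for the HTTP request
        stats["active"] -= 1
        return {"page": page_no}


async def crawl(pages, limit):
    semaphore = asyncio.Semaphore(limit)
    stats = {"active": 0, "peak": 0}
    results = await asyncio.gather(
        *(fake_fetch(p, semaphore, stats) for p in pages)
    )
    return results, stats["peak"]


results, peak = asyncio.run(crawl(range(1, 7), limit=2))
print([r["page"] for r in results], "peak concurrency:", peak)
```

In the real crawler, the same pattern would wrap the session.get() call inside get_req(); the limit of 2 here is an arbitrary value chosen for the illustration.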

References:

Special thanks to everyone who helped me understand these complicated concepts through their detailed blogs and presentations. I have used parts of the definitions and examples from some of the reference links.

Please let me know your feedback/corrections on the definitions/code. If you are reading this last line, you have a lot of patience/interest to read such a long post, and thanks for that :).
