How Does AsyncIO Work?

Tusamma Sal Sabil
14 min read · Aug 10, 2021

What is asyncio?

Asyncio stands for asynchronous input/output and refers to a programming paradigm that achieves high concurrency using a single thread and an event loop. Asynchronous programming is a style of concurrent programming in which a unit of work is allowed to run separately from the main flow of the application. When the work is complete, it notifies the main flow of its completion or failure.

Let’s understand asyncio with an example:

To understand the concept behind asyncio, let’s consider a restaurant with a single waiter. Suddenly, three customers, A, B and C show up. The three of them take a varying amount of time to decide what to eat once they receive the menu from the waiter.

Let’s assume A takes 5 minutes, B takes 10 minutes and C takes 1 minute to decide. Suppose the single waiter starts with B and spends 10 minutes taking B’s order, then serves A and spends 5 minutes noting down the order, and finally spends 1 minute finding out what C wants to eat. In total, the waiter spends 10 + 5 + 1 = 16 minutes taking their orders. Notice that in this sequence of events C ends up waiting 15 minutes before the waiter gets to him, A waits 10 minutes and B waits 0 minutes.

Now consider what happens if the waiter knows how long each customer takes to decide. He can hand out all three menus, take C’s order first after 1 minute, then A’s after 5 minutes and finally B’s after 10 minutes. This way no customer waits once they are ready to order. The result is an illusion of three waiters, one dedicated to each customer, even though there is only one.

Lastly, the total time it takes for the waiter to take all three orders is 10 minutes, much less than the 16 minutes in the other scenario.
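
The waiter scenario can be sketched with asyncio. This is only an illustration under stated assumptions: one "minute" of the story is scaled down to 0.01 seconds so the script finishes quickly, and the names customer, waiter and SCALE are made up for this sketch.

```python
import asyncio
import time

SCALE = 0.01  # assumption: one "minute" in the story lasts 0.01 s here


async def customer(name: str, minutes: int, served: list) -> None:
    # The customer studies the menu; the waiter is free to do other things.
    await asyncio.sleep(minutes * SCALE)
    # The customer is ready, so the waiter notes down the order.
    served.append(name)


async def waiter() -> list:
    served = []
    # Hand out all three menus at once, then take orders as they become ready.
    await asyncio.gather(
        customer("B", 10, served),
        customer("A", 5, served),
        customer("C", 1, served),
    )
    return served


start = time.perf_counter()
order = asyncio.run(waiter())
elapsed = time.perf_counter() - start

print(order)  # ['C', 'A', 'B']
print(f"Elapsed: about {elapsed / SCALE:.0f} story minutes")
```

The orders arrive in the order the customers become ready, and the total time is close to the longest single wait rather than the sum of all three.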

Let’s go through another example:

Suppose chess master Magnus Carlsen hosts a chess exhibition in which he plays against multiple amateur players. He has two ways of conducting the exhibition: synchronously and asynchronously.

Assumptions:

  • 24 opponents
  • Magnus Carlsen makes each chess move in 5 seconds
  • Opponents each take 55 seconds to make a move
  • Games average 30 pair-moves (60 moves total)

Synchronously: Magnus Carlsen plays one game at a time, never two at the same time, until the game is complete. Each game takes (55 + 5) * 30 == 1800 seconds, or 30 minutes. The entire exhibition takes 24 * 30 == 720 minutes, or 12 hours.

Asynchronously: Magnus Carlsen moves from table to table, making one move at each table. He leaves the table and lets the opponent make their next move during the wait time. One move on all 24 games takes Magnus Carlsen 24 * 5 == 120 seconds, or 2 minutes. The entire exhibition is now cut down to 120 * 30 == 3600 seconds, or just 1 hour.

There is only one Magnus Carlsen, who has only two hands and makes only one move at a time by himself. But playing asynchronously cuts the exhibition time down from 12 hours to one.
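
The arithmetic above can be checked with a few lines of Python. The constant names are made up for this sketch; the numbers come straight from the assumptions listed earlier.

```python
OPPONENTS = 24
MASTER_SECONDS = 5      # time the master needs per move
OPPONENT_SECONDS = 55   # time an opponent needs per move
PAIR_MOVES = 30         # average pair-moves per game

# Synchronous: finish one game completely before starting the next.
sync_total = OPPONENTS * (MASTER_SECONDS + OPPONENT_SECONDS) * PAIR_MOVES

# Asynchronous: one round = the master makes one move on each board.
round_seconds = OPPONENTS * MASTER_SECONDS
async_total = round_seconds * PAIR_MOVES

# The async estimate holds only if every opponent has moved by the time
# the master comes back to the board.
opponents_ready_in_time = round_seconds >= OPPONENT_SECONDS

print(sync_total / 3600)        # 12.0
print(async_total / 3600)       # 1.0
print(opponents_ready_in_time)  # True
```

The last check makes the hidden assumption explicit: a full round takes 120 seconds, which is longer than the 55 seconds an opponent needs, so the master never has to wait at a table.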

Coding Example:

Let’s demonstrate synchronous and asynchronous execution times with a code snippet.

Asynchronous — async_count.py

import asyncio
import time


async def count():
    print("One", end=" ")
    await asyncio.sleep(1)
    print("Two", end=" ")
    await asyncio.sleep(2)
    print("Three", end=" ")


async def main():
    await asyncio.gather(count(), count(), count(), count(), count())


if __name__ == "__main__":
    start_time = time.perf_counter()
    asyncio.run(main())
    end_time = time.perf_counter()
    execution_time = end_time - start_time
    print(f"\nExecuting - {__file__}\nExecution Starts: {start_time}\nExecutions Ends: {end_time}\nTotals Execution Time:{execution_time:0.2f} seconds.")

Asynchronous — Output:

One One One One One Two Two Two Two Two Three Three Three Three Three 
Executing - async_count.py
Execution Starts: 18453.442160108
Executions Ends: 18456.444719712
Totals Execution Time:3.00 seconds.

Synchronous — sync_count.py

import time


def count():
    print("One", end=" ")
    time.sleep(1)
    print("Two", end=" ")
    time.sleep(2)
    print("Three", end=" ")


def main():
    for _ in range(5):
        count()


if __name__ == "__main__":
    start_time = time.perf_counter()
    main()
    end_time = time.perf_counter()
    execution_time = end_time - start_time
    print(f"\nExecuting - {__file__}\nExecution Starts: {start_time}\nExecutions Ends: {end_time}\nTotals Execution Time:{execution_time:0.2f} seconds.")

Synchronous — Output:

One Two Three One Two Three One Two Three One Two Three One Two Three 
Executing - sync_count.py
Execution Starts: 18875.175965998
Executions Ends: 18890.189930292
Totals Execution Time:15.01 seconds.

Why use asyncio instead of multithreading in Python?

  • It’s very difficult to write thread-safe code. With asynchronous code, you know exactly where execution can switch from one task to the next (at each await), so race conditions are much harder to come by.
  • Threads consume a fair amount of memory since each thread needs its own stack. With async code, all the code shares the same stack, and the stack stays small because it is continuously unwound between tasks.
  • Threads are OS structures and therefore require more memory for the platform to support. There is no such problem with asynchronous tasks.
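
The first point can be illustrated with a shared counter. This is a minimal sketch, with the names counter and increment invented for the example: several tasks update the same variable without any lock, and no update is lost because a task can only be switched out at an await.

```python
import asyncio

counter = 0


async def increment(times: int) -> None:
    global counter
    for _ in range(times):
        # There is no await between reading and writing the counter,
        # so no other task can run in the middle of this update.
        counter += 1
        # Explicit switch point: other tasks may run here, and only here.
        await asyncio.sleep(0)


async def main() -> None:
    # Ten tasks, each incrementing the shared counter 100 times.
    await asyncio.gather(*(increment(100) for _ in range(10)))


asyncio.run(main())
print(counter)  # 1000
```

If an await were placed between reading the counter and writing it back, updates could be lost, so the guarantee holds only for code between two switch points.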

How does asyncio work?

Before going deeper, let’s recall Python generators.

Python Generator:

Functions containing a yield statement are compiled as generators. Using a yield expression in a function’s body causes that function to be a generator function. Calling it returns an object which supports the iteration protocol methods. The generator object automatically receives a __next__() method, so we can invoke __next__() directly on the generator object instead of using next():

def asynchronous():
    yield "Educative"


if __name__ == "__main__":
    gen = asynchronous()
    # Equivalent to next(gen); the name avoids shadowing the built-in str
    value = gen.__next__()
    print(value)

Remember the following about generators:

  • Generator functions allow you to defer computing expensive values. You only compute the next value when required. This makes generators memory and compute efficient; they refrain from saving long sequences in memory or doing all expensive computations upfront.
  • Generators, when suspended, retain the code location, which is the last yield statement executed, and their entire local scope. This allows them to resume execution from where they left off.
  • Generator objects are nothing more than iterators.
  • Remember to distinguish between a generator function and the associated generator object; the two terms are often used interchangeably. A generator function, when invoked, returns a generator object, and next() is invoked on the generator object to run the code within the generator function.

States of a generator:

A generator goes through the following states:

  • GEN_CREATED when a generator object has been returned for the first time from a generator function and iteration hasn’t started.
  • GEN_RUNNING when next() has been invoked on the generator object and its body is being executed by the Python interpreter.
  • GEN_SUSPENDED when a generator is suspended at a yield.
  • GEN_CLOSED when a generator has completed execution or has been closed.
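
These states can be observed with inspect.getgeneratorstate() from the standard library. The sketch below uses two throwaway generators, countdown and report_own_state, whose names are invented for this example.

```python
import inspect


def countdown(n):
    while n > 0:
        yield n
        n -= 1


gen = countdown(2)
states = [inspect.getgeneratorstate(gen)]      # before the first next()
next(gen)
states.append(inspect.getgeneratorstate(gen))  # paused at a yield
for _ in gen:                                  # run the generator to completion
    pass
states.append(inspect.getgeneratorstate(gen))  # finished


def report_own_state():
    # `probe` is the module-level generator object created just below.
    yield inspect.getgeneratorstate(probe)


probe = report_own_state()
states.append(next(probe))                     # observed from inside the body

print(states)
# ['GEN_CREATED', 'GEN_SUSPENDED', 'GEN_CLOSED', 'GEN_RUNNING']
```

GEN_RUNNING can only be seen while the generator body is actually executing, which is why the second generator inspects itself.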

Methods on generator objects:

A generator object exposes different methods that can be invoked to manipulate the generator. These are:

  • throw()
  • send()
  • close()
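
A small sketch of the three methods in action follows. The running_average generator and the events list are hypothetical names chosen for illustration; treating ValueError as a "reset" signal is likewise just a convention of this example.

```python
events = []


def running_average():
    total, count, average = 0.0, 0, None
    try:
        while True:
            try:
                # send(x) resumes the generator and makes this yield return x.
                value = yield average
            except ValueError:
                # throw(ValueError) raises the exception at the paused yield;
                # here it is used as a signal to reset the statistics.
                total, count, average = 0.0, 0, None
                continue
            total += value
            count += 1
            average = total / count
    finally:
        # close() raises GeneratorExit at the paused yield, so this runs.
        events.append("cleaned up")


avg = running_average()
next(avg)                                  # prime: advance to the first yield
first = avg.send(10)                       # 10.0
second = avg.send(20)                      # 15.0
after_reset = avg.throw(ValueError("reset"))  # None
third = avg.send(4)                        # 4.0
avg.close()

print(first, second, after_reset, third, events)
# 10.0 15.0 None 4.0 ['cleaned up']
```

The ability to push values and exceptions into a paused function is what later made generators usable as coroutines.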

Let’s dive deeper into the details.

The rules of asyncio:

  • The syntax async def introduces either a native coroutine or an asynchronous generator. The expressions async with and async for are also valid.
  • The keyword await passes function control back to the event loop. (It suspends the execution of the surrounding coroutine.) If Python encounters an await f() expression in the scope of g(), this is how await tells the event loop, "Suspend execution of g() until whatever I’m waiting on—the result of f()—is returned. In the meantime, go let something else run."

In code, that second bullet point looks roughly like this:

async def g():
    # Pause here and come back to g() when f() is ready
    r = await f()
    return r

There’s also a strict set of rules around when and how you can and cannot use async/await. These can be handy whether you are still picking up the syntax or already have exposure to using async/await:

  • A function that you introduce with async def is a coroutine. It may use await, return, or yield, but all of these are optional. Declaring async def noop(): pass is valid.
  • Using await and/or return creates a coroutine function. To call a coroutine function, you must await it to get its results.
  • It is less common to use yield in an async def block. This creates an asynchronous generator, which you iterate over with async for. Forget about async generators for the time being and focus on getting down the syntax for coroutine functions, which use await and/or return.
  • Anything defined with async def may not use yield from, which will raise a SyntaxError.
  • Just like it’s a SyntaxError to use yield outside of a def function, it is a SyntaxError to use await outside of an async def coroutine. You can only use await in the body of coroutines.

Here are some terse examples meant to summarize the above few rules:

async def f(x):
    y = await z(x)  # OK - `await` and `return` allowed in coroutines
    return y


async def g(x):
    yield x  # OK - this is an async generator


async def m(x):
    yield from gen(x)  # NO - SyntaxError


def m(x):
    y = await z(x)  # NO - SyntaxError (no `async def` here)
    return y
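
One consequence of these rules is easy to miss: calling a coroutine function does not run it. It only creates a coroutine object, which must be awaited or handed to the event loop. A minimal sketch, with add_one as a made-up coroutine:

```python
import asyncio
import inspect


async def add_one(x: int) -> int:
    await asyncio.sleep(0)
    return x + 1


coro = add_one(41)                   # nothing inside add_one has run yet
is_coro = inspect.iscoroutine(coro)  # it is just a coroutine object
result = asyncio.run(coro)           # the event loop actually executes it

print(is_coro, result)  # True 42
```

Inside another coroutine the same object would be consumed with `await add_one(41)` instead of asyncio.run().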

Generator Based Coroutine

Python created a distinction between ordinary generators and generators that were meant to be used as coroutines. The latter are called generator-based coroutines and are marked with the @asyncio.coroutine decorator on the function definition, though this isn’t strictly enforced.

Generator based coroutines use yield from syntax instead of yield. A coroutine can:

  • yield from another coroutine
  • yield from a future
  • return an expression
  • raise an exception

Coroutines in Python make cooperative multitasking possible. Cooperative multitasking is the approach in which the running process voluntarily gives up the CPU to other processes. A process may do so when it is logically blocked, say while waiting for user input or when it has initiated a network request and will be idle for a while. A coroutine can be defined as a special function that can give up control to its caller without losing its state.

So what’s the difference between coroutines and generators?

Generators are essentially iterators though they look like functions. The distinction between generators and coroutines, in general, is that:

  • Generators yield back a value to the invoker whereas a coroutine yields control to another coroutine and can resume execution from the point it gives up control.
  • A generator used as a plain iterator only produces values once started, whereas a coroutine can also receive values after it has started (in Python, through send()).
  • Generators are primarily used to simplify writing iterators. They are a type of coroutine and are sometimes called semicoroutines.

Generator Based Coroutine Example

The simplest generator based coroutine we can write is as follows:

# Legacy style: requires `import asyncio` and Python 3.10 or earlier
@asyncio.coroutine
def do_something_important():
    yield from asyncio.sleep(1)

The coroutine sleeps for one second. Note the decorator and the use of yield from.

Native Coroutine Example

By native it is meant that the language introduced syntax to specifically define coroutines, making them first class citizens in the language. Native coroutines can be defined using the async/await syntax. The simplest native coroutine we can write is as follows:

async def do_something_important():
    await asyncio.sleep(1)
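
Native coroutines are still driven the same way generators are, through send(). The toy scheduler below is a rough sketch of that idea, not how asyncio is actually implemented: pause, worker and run_round_robin are hypothetical names, and types.coroutine is used only to build a minimal awaitable that yields control.

```python
import types
from collections import deque


@types.coroutine
def pause():
    # A bare yield hands control back to whoever is driving the coroutine.
    yield


async def worker(name: str, steps: int, log: list) -> str:
    for i in range(steps):
        log.append(f"{name}:{i}")
        await pause()  # give other coroutines a turn
    return name


def run_round_robin(coros) -> list:
    """Resume each coroutine in turn until all of them have finished."""
    finished = []
    queue = deque(coros)
    while queue:
        coro = queue.popleft()
        try:
            coro.send(None)  # run until the next pause()
        except StopIteration as stop:
            finished.append(stop.value)  # the coroutine's return value
        else:
            queue.append(coro)  # not done yet, schedule it again
    return finished


log = []
finished = run_round_robin([worker("A", 2, log), worker("B", 1, log)])

print(log)       # ['A:0', 'B:0', 'A:1']
print(finished)  # ['B', 'A']
```

A real event loop adds timers, I/O readiness and callbacks on top, but the core loop of "resume, wait for the next suspension, move on" is the same.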

AsyncIO Design Patterns

AsyncIO comes with its own set of possible script designs, which we will discuss in this section.

1. Event loops

The event loop is a programming construct that waits for events to happen and then dispatches them to an event handler. An event can be a user clicking on a UI button or a process initiating a file download. The event loop sits at the core of asynchronous programming.

Example Code:

import asyncio
import random
import time
from threading import Thread
from threading import current_thread

# ANSI colors
colors = (
    "\033[0m",  # End of color
    "\033[31m",  # Red
    "\033[32m",  # Green
    "\033[34m",  # Blue
)


def is_loop_running() -> bool:
    # get_running_loop() raises RuntimeError if no loop runs in this thread
    try:
        return asyncio.get_running_loop().is_running()
    except RuntimeError:
        return False


async def do_something_important(sleep_for):
    print(colors[1] + f"Is event loop running in thread {current_thread().name} = {is_loop_running()}" + colors[0])
    await asyncio.sleep(sleep_for)


def launch_event_loops():
    # get a new event loop
    loop = asyncio.new_event_loop()

    # set the event loop for the current thread
    asyncio.set_event_loop(loop)

    # run a coroutine on the event loop
    loop.run_until_complete(do_something_important(random.randint(1, 5)))

    # remember to close the loop
    loop.close()


if __name__ == "__main__":
    thread_1 = Thread(target=launch_event_loops)
    thread_2 = Thread(target=launch_event_loops)

    start_time = time.perf_counter()
    thread_1.start()
    thread_2.start()

    # No event loop was started in the main thread, so this reports False
    print(colors[2] + f"Is event loop running in thread {current_thread().name} = {is_loop_running()}" + colors[0])

    thread_1.join()
    thread_2.join()
    end_time = time.perf_counter()
    execution_time = end_time - start_time
    print(colors[3] + f"Event Loop Start Time: {start_time}\nEvent Loop End Time: {end_time}\nEvent Loop Execution Time: {execution_time:0.2f} seconds." + colors[0])

Execution Command: python async_event_loop.py

Output:

Try it out yourself and examine the output; you’ll see that each spawned thread runs its own event loop.

Types of event loops

There are two types of event loops:

  • SelectorEventLoop: SelectorEventLoop is based on the selectors module and is the default loop on Unix. It was also the default on Windows before Python 3.8.
  • ProactorEventLoop: ProactorEventLoop is based on Windows’ I/O Completion Ports and is only supported on Windows.
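
To see which loop implementation your platform picks, you can ask the running loop for its class. This is a small sketch; loop_class_name is a made-up helper, and the exact class name printed depends on the operating system and Python version.

```python
import asyncio


async def loop_class_name() -> str:
    # get_running_loop() returns the loop that is executing this coroutine.
    return type(asyncio.get_running_loop()).__name__


name = asyncio.run(loop_class_name())
print(name)
```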

2. Futures

A Future represents a computation that is either in progress or will get scheduled in the future. It is a special low-level awaitable object that represents an eventual result of an asynchronous operation. Don’t confuse concurrent.futures.Future and asyncio.Future.

Example Code:

import time
import asyncio
from asyncio import Future

# ANSI colors
colors = (
    "\033[0m",  # End of color
    "\033[31m",  # Red
    "\033[32m",  # Green
    "\033[34m",  # Blue
)


async def bar(future):
    print(colors[1] + "bar will sleep for 3 seconds" + colors[0])
    await asyncio.sleep(3)
    print(colors[1] + "bar resolving the future" + colors[0])
    # set_result() marks the future as done and wakes up whoever awaits it
    future.set_result("future is resolved")


async def foo(future):
    print(colors[2] + "foo will await the future" + colors[0])
    await future
    print(colors[2] + "foo finds the future resolved" + colors[0])


async def main():
    future = Future()
    await asyncio.gather(foo(future), bar(future))


if __name__ == "__main__":
    start_time = time.perf_counter()
    asyncio.run(main())
    end_time = time.perf_counter()
    execution_time = end_time - start_time
    print(colors[3] + f"Future Start Time: {start_time}\nFuture End Time: {end_time}\nFuture Execution Time: {execution_time:0.2f} seconds." + colors[0])

Execution Command: python async_futures.py

Output:

Both coroutines are passed the same future. The foo() coroutine waits for the future to be resolved, while the bar() coroutine resolves the future after three seconds.
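
The life cycle of a future can also be shown in a few lines. In this sketch the future is resolved by a timer callback instead of a second coroutine; loop.create_future() and loop.call_later() are standard asyncio calls, while the 0.01 second delay and the "ready" value are arbitrary choices for the example.

```python
import asyncio


async def main():
    loop = asyncio.get_running_loop()
    future = loop.create_future()

    before = future.done()  # nothing has set a result yet

    # Ask the loop to resolve the future shortly after we start waiting.
    loop.call_later(0.01, future.set_result, "ready")

    value = await future    # suspends main() until set_result() is called
    return before, future.done(), value


before, after, value = asyncio.run(main())
print(before, after, value)  # False True ready
```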

3. Tasks

Tasks are like futures; in fact, Task is a subclass of Future. Tasks can be created using the following methods:

  • asyncio.create_task() accepts coroutines and wraps them as tasks.
  • loop.create_task() only accepts coroutines.
  • asyncio.ensure_future() accepts futures, coroutines and any awaitable objects.

Tasks wrap coroutines and run them in event loops. If a coroutine awaits on a Future, the Task suspends the execution of the coroutine and waits for the Future to complete. When the Future is done, the execution of the wrapped coroutine resumes.
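
The three creation methods can be compared side by side. This is a minimal sketch with a made-up square coroutine; it only checks what kind of object each call returns.

```python
import asyncio


async def square(x: int) -> int:
    await asyncio.sleep(0)
    return x * x


async def main():
    loop = asyncio.get_running_loop()

    t1 = asyncio.create_task(square(2))    # high-level API
    t2 = loop.create_task(square(3))       # same thing via the loop object
    t3 = asyncio.ensure_future(square(4))  # a coroutine gets wrapped in a Task

    # A Task (or any Future) passed to ensure_future() is returned unchanged.
    same_object = asyncio.ensure_future(t1) is t1

    results = await asyncio.gather(t1, t2, t3)
    all_tasks = all(isinstance(t, asyncio.Task) for t in (t1, t2, t3))
    return results, all_tasks, same_object


results, all_tasks, same_object = asyncio.run(main())
print(results, all_tasks, same_object)  # [4, 9, 16] True True
```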

Example Code:

import time
import asyncio
from asyncio import Future

# ANSI colors
colors = (
    "\033[0m",  # End of color
    "\033[31m",  # Red
    "\033[32m",  # Green
    "\033[34m",  # Blue
)


async def bar(future):
    print(colors[1] + "bar will sleep for 3 seconds" + colors[0])
    await asyncio.sleep(3)
    print(colors[1] + "bar resolving the future" + colors[0])
    # set_result() marks the future as done and wakes up whoever awaits it
    future.set_result("future is resolved")


async def foo(future):
    print(colors[2] + "foo will await the future" + colors[0])
    await future
    print(colors[2] + "foo finds the future resolved" + colors[0])


async def main():
    future = Future()

    # Inside a coroutine, ask for the loop that is currently running
    loop = asyncio.get_running_loop()
    t1 = loop.create_task(bar(future))
    t2 = loop.create_task(foo(future))

    # `await t2, t1` would await only t2, so await each task explicitly
    await t2
    await t1


if __name__ == "__main__":
    start_time = time.perf_counter()
    asyncio.run(main())
    end_time = time.perf_counter()
    execution_time = end_time - start_time
    print(colors[3] + f"Future Start Time: {start_time}\nFuture End Time: {end_time}\nFuture Execution Time: {execution_time:0.2f} seconds." + colors[0])

Execution Command: python async_tasks.py

4. Chaining Coroutines:

A key feature of coroutines is that they can be chained together. A coroutine object is awaitable, so another coroutine can await it. This allows you to break programs into smaller, manageable, reusable coroutines:

Example Code:

import sys
import asyncio
import random
import time

# ANSI colors
colors = (
    "\033[0m",  # End of color
    "\033[31m",  # Red
    "\033[32m",  # Green
    "\033[36m",  # Cyan
    "\033[34m",  # Blue
)


async def function1(n: int) -> str:
    i = random.randint(0, 10)
    print(colors[1] + f"function1({n}) is sleeping for {i} seconds." + colors[0])
    await asyncio.sleep(i)
    result = f"result{n}-1"
    print(colors[1] + f"Returning function1({n}) == {result}." + colors[0])
    return result


async def function2(n: int, arg: str) -> str:
    i = random.randint(0, 10)
    print(colors[2] + f"function2{n, arg} is sleeping for {i} seconds." + colors[0])
    await asyncio.sleep(i)
    result = f"result{n}-2 derived from {arg}"
    print(colors[2] + f"Returning function2{n, arg} == {result}." + colors[0])
    return result


async def chain(n: int) -> None:
    start = time.perf_counter()
    p1 = await function1(n)
    p2 = await function2(n, p1)
    end = time.perf_counter() - start
    print(colors[3] + f"--> Chained result{n} => {p2} (took {end:0.2f} seconds)." + colors[0])


async def main(*args):
    await asyncio.gather(*(chain(n) for n in args))


if __name__ == "__main__":
    random.seed(444)
    args = [1, 2, 3] if len(sys.argv) == 1 else map(int, sys.argv[1:])
    start_time = time.perf_counter()
    asyncio.run(main(*args))
    end_time = time.perf_counter()
    execution_time = end_time - start_time
    print(colors[4] + f"Program Start Time: {start_time}\nProgram End Time: {end_time}\nProgram Execution Time: {execution_time:0.2f} seconds." + colors[0])

Pay careful attention to the output, where function1() sleeps for a variable amount of time, and function2() begins working with the results as they become available:

Execution Command: python async_chained.py 11 8 5

5. Using a Queue:

In this design, there is no chaining of any individual consumer to a producer. The consumers don’t know the number of producers, or even the cumulative number of items that will be added to the queue, in advance.

Each producer and consumer takes a variable amount of time to put items into and take items out of the queue, respectively. The queue serves as a conduit between the producers and consumers, so they never have to talk to each other directly.

Example Code:

import asyncio
import argparse
import itertools as it
import os
import random
import time

# ANSI colors
colors = (
    "\033[0m",  # End of color
    "\033[31m",  # Red
    "\033[32m",  # Green
    "\033[36m",  # Cyan
    "\033[34m",  # Blue
)


async def generate_item(size: int = 5) -> str:
    return os.urandom(size).hex()


async def random_sleep(caller=None) -> None:
    i = random.randint(0, 10)
    if caller:
        print(colors[1] + f"{caller} sleeping for {i} seconds." + colors[0])
    await asyncio.sleep(i)


async def produce(name: int, producer_queue: asyncio.Queue) -> None:
    n = random.randint(0, 10)
    for _ in it.repeat(None, n):  # Synchronous loop for each single producer
        await random_sleep(caller=f"Producer {name}")
        i = await generate_item()
        t = time.perf_counter()
        await producer_queue.put((i, t))
        print(colors[2] + f"Producer {name} added <{i}> to queue." + colors[0])


async def consume(name: int, consumer_queue: asyncio.Queue) -> None:
    while True:
        await random_sleep(caller=f"Consumer {name}")
        i, t = await consumer_queue.get()
        now = time.perf_counter()
        print(colors[3] + f"Consumer {name} got element <{i}>" f" in {now - t:0.5f} seconds." + colors[0])
        consumer_queue.task_done()


async def main(no_producer: int, no_consumer: int):
    q = asyncio.Queue()
    producers = [asyncio.create_task(produce(n, q)) for n in range(no_producer)]
    consumers = [asyncio.create_task(consume(n, q)) for n in range(no_consumer)]
    await asyncio.gather(*producers)
    await q.join()  # Implicitly awaits consumers, too
    for consumer in consumers:
        consumer.cancel()


if __name__ == "__main__":
    random.seed(444)
    parser = argparse.ArgumentParser()
    parser.add_argument("-p", "--no_producer", type=int, default=10)
    parser.add_argument("-c", "--no_consumer", type=int, default=15)
    ns = parser.parse_args()
    start_time = time.perf_counter()
    asyncio.run(main(**ns.__dict__))
    end_time = time.perf_counter()
    execution_time = end_time - start_time
    print(colors[4] + f"Program Start Time: {start_time}\nProgram End Time: {end_time}\nProgram Execution Time: {execution_time:0.2f} seconds." + colors[0])

Execution Command: python async_queue.py -p 2 -c 4

Lastly, here is an example of how asyncio cuts down on wait time. Given a coroutine generate_random_int() that keeps producing random integers in the range [0, 10] until one of them exceeds a threshold, you want multiple calls of this coroutine to run without waiting for each other to complete in succession.

Example Code:

import time
import asyncio
import random

# ANSI colors
colors = (
    "\033[0m",  # End of color
    "\033[31m",  # Red
    "\033[32m",  # Green
    "\033[36m",  # Cyan
    "\033[35m",  # Magenta
    "\033[34m",  # Blue
)


async def generate_random_int(indx: int, threshold: int = 5) -> int:
    print(colors[indx + 1] + f"Initiated generate_random_int({indx}).")
    i = random.randint(0, 10)
    while i <= threshold:
        print(colors[indx + 1] + f"generate_random_int({indx}) == {i} too low; retrying.")
        await asyncio.sleep(indx + 1)
        i = random.randint(0, 10)
    print(colors[indx + 1] + f"---> Finished: generate_random_int({indx}) == {i}" + colors[0])
    return i


async def main():
    res = await asyncio.gather(*(generate_random_int(i, 10 - i - 1) for i in range(3)))
    return res


if __name__ == "__main__":
    random.seed(444)
    start_time = time.perf_counter()
    r1, r2, r3 = asyncio.run(main())
    print(colors[4] + f"\nRandom INT 1: {r1}, Random INT 2: {r2}, Random INT 3: {r3}\n" + colors[0])
    end_time = time.perf_counter()
    execution_time = end_time - start_time
    print(colors[5] + f"Program Start Time: {start_time}\nProgram End Time: {end_time}\nProgram Execution Time: {execution_time:0.2f} seconds." + colors[0])

Execution Command: python async_random.py

Note: If you’re writing any code yourself, prefer native coroutines for the sake of being explicit rather than implicit. Generator-based coroutines were deprecated in Python 3.8, and the @asyncio.coroutine decorator was removed in Python 3.11.

GitHub Repo: https://github.com/tssovi/asynchronous-in-python
