How does AsyncIO works?
What is asyncio?
Asyncio stands for asynchronous input output and refers to a programming paradigm which achieves high concurrency using a single thread or event loop. Asynchronous programming is a type of parallel programming in which a unit of work is allowed to run separately from the primary application thread. When the work is complete, it notifies the main thread about completion or failure of the worker thread.
Let’s have a look in below image:
Let’s understand asyncio with an example:
To understand the concept behind asyncio, let’s consider a restaurant with a single waiter. Suddenly, three customers, A, B and C show up. The three of them take a varying amount of time to decide what to eat once they receive the menu from the waiter.
Let’s assume A takes 5 minutes, B 10 minutes and C 1 minute to decide. If the single waiter starts with B first and takes B’s order in 10 minutes, next he serves A and spends 5 minutes on noting down his order and finally spends 1 minute to know what C wants to eat. So, in total, waiter spends 10 + 5 + 1 = 16 minutes to take down their orders. However, notice in this sequence of events, C ends up waiting 15 minutes before the waiter gets to him, A waits 10 minutes and B waits 0 minutes.
Now consider if the waiter knew the time each customer would take to decide. He can start with C first, then go to A and finally to B. This way each customer would experience a 0 minute wait. An illusion of three waiters, one dedicated to each customer is created even though there’s only one.
Lastly, the total time it takes for the waiter to take all three orders is 10 minutes, much less than the 16 minutes in the other scenario.
Let’s go through another example:
Suppose, Chess master Magnus Carlsen hosts a chess exhibition in which he plays with multiple amateur players. He has two ways of conducting the exhibition: synchronously and asynchronously.
Assumptions:
- 24 opponents
- Magnus Carlsen makes each chess move in 5 seconds
- Opponents each take 55 seconds to make a move
- Games average 30 pair-moves (60 moves total)
Synchronously: Magnus Carlsen plays one game at a time, never two at the same time, until the game is complete. Each game takes (55 + 5) * 30 == 1800 seconds, or 30 minutes. The entire exhibition takes 24 * 30 == 720 minutes, or 12 hours.
Asynchronously: Magnus Carlsen moves from table to table, making one move at each table. He leaves the table and lets the opponent make their next move during the wait time. One move on all 24 games takes Judit 24 * 5 == 120 seconds, or 2 minutes. The entire exhibition is now cut down to 120 * 30 == 3600 seconds, or just 1 hour
There is only one Magnus Carlsen, who has only two hands and makes only one move at a time by himself. But playing asynchronously cuts the exhibition time down from 12 hours to one.
Coding Example:
Let try to demonstrate Synchronous and Asynchronous execution time using code snippet.
Asynchronous — async_count.py
import asyncio
import time
async def count():
print("One", end=" ")
await asyncio.sleep(1)
print("Two", end=" ")
await asyncio.sleep(2)
print("Three", end=" ")
async def main():
await asyncio.gather(count(), count(), count(), count(), count())
if __name__ == "__main__":
start_time = time.perf_counter()
asyncio.run(main())
end_time = time.perf_counter()
execution_time = end_time - start_time
print(f"\nExecuting - {__file__}\nExecution Starts: {start_time}\nExecutions Ends: {end_time}\nTotals Execution Time:{execution_time:0.2f} seconds.")
Asynchronous — Output:
One One One One One Two Two Two Two Two Three Three Three Three Three
Executing - async_count.py
Execution Starts: 18453.442160108
Executions Ends: 18456.444719712
Totals Execution Time:3.00 seconds.
Synchronous — sync_count.py
import time
def count():
print("One", end=" ")
time.sleep(1)
print("Two", end=" ")
time.sleep(2)
print("Three", end=" ")
def main():
for _ in range(5):
count()
if __name__ == "__main__":
start_time = time.perf_counter()
main()
end_time = time.perf_counter()
execution_time = end_time - start_time
print(f"\nExecuting - {__file__}\nExecution Starts: {start_time}\nExecutions Ends: {end_time}\nTotals Execution Time:{execution_time:0.2f} seconds.")
Synchronous — Output:
One Two Three One Two Three One Two Three One Two Three One Two Three
Executing - sync_count.py
Execution Starts: 18875.175965998
Executions Ends: 18890.189930292
Totals Execution Time:15.01 seconds.
Why use asyncio instead of multithreading in Python?
- It’s very difficult to write code that is thread safe. With asynchronous code, you know exactly where the code will shift from one task to the next and race conditions are much harder to come by.
- Threads consume a fair amount of data since each thread needs to have its own stack. With async code, all the code shares the same stack and the stack is kept small due to continuously unwinding the stack between tasks.
- Threads are OS structures and therefore require more memory for the platform to support. There is no such problem with asynchronous tasks.
How does asyncio works?
Before going deep let’s recall Python Generator
Python Generator:
Functions containing a yield
statement are compiled as generators. Using a yield expression in a function’s body causes that function to be a generator. These functions return an object which supports the iteration protocol methods. The generator object created automatically receives a __next()__
method. Going back to the example from the previous section we can invoke __next__
directly on the generator object instead of using next()
:
def asynchronous():
yield "Educative"
if __name__ == "__main__":
gen = asynchronous() str = gen.__next__()
print(str)
Remember the following about generators:
- Generator functions allow you to procrastinate computing expensive values. You only compute the next value when required. This makes generators memory and compute efficient; they refrain from saving long sequences in memory or doing all expensive computations upfront.
- Generators, when suspended, retain the code location, which is the last yield statement executed, and their entire local scope. This allows them to resume execution from where they left off.
- Generator objects are nothing more than iterators.
- Remember to make a distinction between a generator function and the associated generator object which are often used interchangeably. A generator function when invoked returns a generator object and
next()
is invoked on the generator object to run the code within the generator function.
States of a generator:
A generator goes through the following states:
GEN_CREATED
when a generator object has been returned for the first time from a generator function and iteration hasn’t started.GEN_RUNNING
when next has been invoked on the generator object and is being executed by the python interpreter.GEN_SUSPENDED
when a generator is suspended at a yieldGEN_CLOSED
when a generator has completed execution or has been closed.
Methods on generator objects:
A generator object exposes different methods that can be invoked to manipulate the generator. These are:
throw()
send()
close()
Let’s deep dive into more details explanations
The rules of asyncio:
- The syntax
async def
introduces either a native coroutine or an asynchronous generator. The expressionsasync with
andasync for
are also valid. - The keyword
await
passes function control back to the event loop. (It suspends the execution of the surrounding coroutine.) If Python encounters anawait f()
expression in the scope ofg()
, this is howawait
tells the event loop, "Suspend execution ofg()
until whatever I’m waiting on—the result off()
—is returned. In the meantime, go let something else run."
In code, that second bullet point looks roughly like this:
async def g():
# Pause here and come back to g() when f() is ready
r = await f()
return r
There’s also a strict set of rules around when and how you can and cannot use async
/await
. These can be handy whether you are still picking up the syntax or already have exposure to using async
/await
:
- A function that you introduce with
async def
is a coroutine. It may useawait
,return
, oryield
, but all of these are optional. Declaringasync def noop(): pass
is valid: - Using
await
and/orreturn
creates a coroutine function. To call a coroutine function, you mustawait
it to get its results. - It is less common to use
yield
in anasync def
block. This creates an asynchronous generator, which you iterate over withasync for
. Forget about async generators for the time being and focus on getting down the syntax for coroutine functions, which useawait
and/orreturn
. - Anything defined with
async def
may not useyield from
, which will raise aSyntaxError
. - Just like it’s a
SyntaxError
to useyield
outside of adef
function, it is aSyntaxError
to useawait
outside of anasync def
coroutine. You can only useawait
in the body of coroutines.
Here are some terse examples meant to summarize the above few rules:
async def f(x):
y = await z(x) # OK - `await` and `return` allowed in coroutines
return yasync def g(x):
yield x # OK - this is an async generatorasync def m(x):
yield from gen(x) # NO - SyntaxErrordef m(x):
y = await z(x) # NO - SyntaxError (no `async def` here)
return y
Generator Based Coroutine
Python created a distinction between Python generators and generators that were meant to be used as coroutines. These coroutines are called generator-based coroutines and require the decorator @asynio.coroutine
to be added to the function definition, though this isn’t strictly enforced.
Generator based coroutines use yield from
syntax instead of yield
. A coroutine can:
- yield from another coroutine
- yield from a future
- return an expression
- raise exception
Coroutines in Python make cooperative multitasking possible. Cooperative multitasking is the approach in which the running process voluntarily gives up the CPU to other processes. A process may do so when it is logically blocked, say while waiting for user input or when it has initiated a network request and will be idle for a while. A coroutine can be defined as a special function that can give up control to its caller without losing its state.
So what’s the difference between coroutines and generators?
Generators are essentially iterators though they look like functions. The distinction between generators and coroutines, in general, is that:
- Generators yield back a value to the invoker whereas a coroutine yields control to another coroutine and can resume execution from the point it gives up control.
- A generator can’t accept arguments once started whereas a coroutine can.
- Generators are primarily used to simplify writing iterators. They are a type of coroutine and sometimes also called as semicoroutines.
Generator Based Coroutine Example
The simplest generator based coroutine we can write is as follows:
@asyncio.coroutine
def do_something_important():
yield from asyncio.sleep(1)
The coroutine sleeps for one second. Note the decorator and the use of yield from
.
Native Based Coroutine Example
By native it is meant that the language introduced syntax to specifically define coroutines, making them first class citizens in the language. Native coroutines can be defined using the async/await
syntax. The simplest native based coroutine we can write is as follows:
async def do_something_important():
await asyncio.sleep(1)
AsyncIO Design Patterns
AsyncIO comes with its own set of possible script designs, which we will discuss in this section.
1. Event loops
The event loop is a programming construct that waits for events to happen and then dispatches them to an event handler. An event can be a user clicking on a UI button or a process initiating a file download. At the core of asynchronous programming, sits the event loop.
Example Code:
import asyncio
import random
import time
from threading import Thread
from threading import current_thread
# ANSI colors
colors = (
"\033[0m", # End of color
"\033[31m", # Red
"\033[32m", # Green
"\033[34m", # Blue
)
async def do_something_important(sleep_for):
print(colors[1] + f"Is event loop running in thread {current_thread().getName()} = {asyncio.get_event_loop().is_running()}" + colors[0])
await asyncio.sleep(sleep_for)
def launch_event_loops():
# get a new event loop
loop = asyncio.new_event_loop()
# set the event loop for the current thread
asyncio.set_event_loop(loop)
# run a coroutine on the event loop
loop.run_until_complete(do_something_important(random.randint(1, 5)))
# remember to close the loop
loop.close()
if __name__ == "__main__":
thread_1 = Thread(target=launch_event_loops)
thread_2 = Thread(target=launch_event_loops)
start_time = time.perf_counter()
thread_1.start()
thread_2.start()
print(colors[2] + f"Is event loop running in thread {current_thread().getName()} = {asyncio.get_event_loop().is_running()}" + colors[0])
thread_1.join()
thread_2.join()
end_time = time.perf_counter()
execution_time = end_time - start_time
print(colors[3] + f"Event Loop Start Time: {start_time}\nEvent Loop End Time: {end_time}\nEvent Loop Execution Time: {execution_time:0.2f} seconds." + colors[0])
Execution Command: python async_event_loop.py
Output:
Try it out yourself and examine the output and you’ll realize that each spawned thread is running its own event loop.
Types of event loops
There are two types of event loops:
- SelectorEventLoop: SelectorEventLoop is based on the selectors module and is the default loop on all platforms.
- ProactorEventLoop: ProactorEventLoop is based on Windows’ I/O Completion Ports and is only supported on Windows.
2. Futures
Future represents a computation that is either in progress or will get scheduled in the future. It is a special low-level awaitable object that represents an eventual result of an asynchronous operation. Don’t confuse threading.Future
and asyncio.Future
.
Example Code:
import time
import asyncio
from asyncio import Future
# ANSI colors
colors = (
"\033[0m", # End of color
"\033[31m", # Red
"\033[32m", # Green
"\033[34m", # Blue
)
async def bar(future):
print(colors[1] + "bar will sleep for 3 seconds" + colors[0])
await asyncio.sleep(3)
print(colors[1] + "bar resolving the future" + colors[0])
future.done()
future.set_result("future is resolved")
async def foo(future):
print(colors[2] + "foo will await the future" + colors[0])
await future
print(colors[2] + "foo finds the future resolved" + colors[0])
async def main():
future = Future()
await asyncio.gather(foo(future), bar(future))
if __name__ == "__main__":
start_time = time.perf_counter()
asyncio.run(main())
end_time = time.perf_counter()
execution_time = end_time - start_time
print(colors[3] + f"Future Start Time: {start_time}\nFuture End Time: {end_time}\nFuture Execution Time: {execution_time:0.2f} seconds." + colors[0])
Execution Command: python async_futures.py
Output:
Both the coroutines are passed a future. The foo()
coroutine awaits for the future to get resolved, while the bar()
coroutine resolves the future after three seconds.
3. Tasks
Tasks are like futures, in fact, Task is a subclass of Future and can be created using the following methods:
asyncio.create_task()
accepts coroutines and wraps them as tasks.loop.create_task()
only accepts coroutines.asyncio.ensure_future()
accepts futures, coroutines and any awaitable objects.
Tasks wrap coroutines and run them in event loops. If a coroutine awaits on a Future, the Task suspends the execution of the coroutine and waits for the Future to complete. When the Future is done, the execution of the wrapped coroutine resumes.
Example Code:
import time
import asyncio
from asyncio import Future
# ANSI colors
colors = (
"\033[0m", # End of color
"\033[31m", # Red
"\033[32m", # Green
"\033[34m", # Blue
)
async def bar(future):
print(colors[1] + "bar will sleep for 3 seconds" + colors[0])
await asyncio.sleep(3)
print(colors[1] + "bar resolving the future" + colors[0])
future.done()
future.set_result("future is resolved")
async def foo(future):
print(colors[2] + "foo will await the future" + colors[0])
await future
print(colors[2] + "foo finds the future resolved" + colors[0])
async def main():
future = Future()
loop = asyncio.get_event_loop()
t1 = loop.create_task(bar(future))
t2 = loop.create_task(foo(future))
await t2, t1
if __name__ == "__main__":
start_time = time.perf_counter()
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
end_time = time.perf_counter()
execution_time = end_time - start_time
print(colors[3] + f"Future Start Time: {start_time}\nFuture End Time: {end_time}\nFuture Execution Time: {execution_time:0.2f} seconds." + colors[0])
Execution Command: python async_tasks.py
Output:
4. Chaining Coroutines:
A key feature of coroutines is that they can be chained together. A coroutine object is awaitable, so another coroutine can await
it. This allows you to break programs into smaller, manageable, recyclable coroutines:
Example Code:
import sys
import asyncio
import random
import time
# ANSI colors
colors = (
"\033[0m", # End of color
"\033[31m", # Red
"\033[32m", # Green
"\033[36m", # Cyan
"\033[34m", # Blue
)
async def function1(n: int) -> str:
i = random.randint(0, 10)
print(colors[1] + f"function1({n}) is sleeping for {i} seconds." + colors[0])
await asyncio.sleep(i)
result = f"result{n}-1"
print(colors[1] + f"Returning function1({n}) == {result}." + colors[0])
return result
async def function2(n: int, arg: str) -> str:
i = random.randint(0, 10)
print(colors[2] + f"function2{n, arg} is sleeping for {i} seconds." + colors[0])
await asyncio.sleep(i)
result = f"result{n}-2 derived from {arg}"
print(colors[2] + f"Returning function2{n, arg} == {result}." + colors[0])
return result
async def chain(n: int) -> None:
start = time.perf_counter()
p1 = await function1(n)
p2 = await function2(n, p1)
end = time.perf_counter() - start
print(colors[3] + f"--> Chained result{n} => {p2} (took {end:0.2f} seconds)." + colors[0])
async def main(*args):
await asyncio.gather(*(chain(n) for n in args))
if __name__ == "__main__":
random.seed(444)
args = [1, 2, 3] if len(sys.argv) == 1 else map(int, sys.argv[1:])
start_time = time.perf_counter()
asyncio.run(main(*args))
end_time = time.perf_counter()
execution_time = end_time - start_time
print(colors[4] + f"Program Start Time: {start_time}\nProgram End Time: {end_time}\nProgram Execution Time: {execution_time:0.2f} seconds." + colors[0])
Pay careful attention to the output, where function1()
sleeps for a variable amount of time, and function2()
begins working with the results as they become available:
Execution Command: python async_chained.py 11 8 5
Output:
5. Using a Queue:
In this design, there is no chaining of any individual consumer to a producer. The consumers don’t know the number of producers, or even the cumulative number of items that will be added to the queue, in advance.
It takes an individual producer or consumer a variable amount of time to put and extract items from the queue, respectively. The queue serves as a throughput that can communicate with the producers and consumers without them talking to each other directly.
Example Code:
import asyncio
import argparse
import itertools as it
import os
import random
import time
# ANSI colors
colors = (
"\033[0m", # End of color
"\033[31m", # Red
"\033[32m", # Green
"\033[36m", # Cyan
"\033[34m", # Blue
)
async def generate_item(size: int = 5) -> str:
return os.urandom(size).hex()
async def random_sleep(caller=None) -> None:
i = random.randint(0, 10)
if caller:
print(colors[1] + f"{caller} sleeping for {i} seconds." + colors[0])
await asyncio.sleep(i)
async def produce(name: int, producer_queue: asyncio.Queue) -> None:
n = random.randint(0, 10)
for _ in it.repeat(None, n): # Synchronous loop for each single producer
await random_sleep(caller=f"Producer {name}")
i = await generate_item()
t = time.perf_counter()
await producer_queue.put((i, t))
print(colors[2] + f"Producer {name} added <{i}> to queue." + colors[0])
async def consume(name: int, consumer_queue: asyncio.Queue) -> None:
while True:
await random_sleep(caller=f"Consumer {name}")
i, t = await consumer_queue.get()
now = time.perf_counter()
print(colors[3] + f"Consumer {name} got element <{i}>" f" in {now - t:0.5f} seconds." + colors[0])
consumer_queue.task_done()
async def main(no_producer: int, no_consumer: int):
q = asyncio.Queue()
producers = [asyncio.create_task(produce(n, q)) for n in range(no_producer)]
consumers = [asyncio.create_task(consume(n, q)) for n in range(no_consumer)]
await asyncio.gather(*producers)
await q.join() # Implicitly awaits consumers, too
for consumer in consumers:
consumer.cancel()
if __name__ == "__main__":
random.seed(444)
parser = argparse.ArgumentParser()
parser.add_argument("-p", "--no_producer", type=int, default=10)
parser.add_argument("-c", "--no_consumer", type=int, default=15)
ns = parser.parse_args()
start_time = time.perf_counter()
asyncio.run(main(**ns.__dict__))
end_time = time.perf_counter()
execution_time = end_time - start_time
print(colors[4] + f"Program Start Time: {start_time}\nProgram End Time: {end_time}\nProgram Execution Time: {execution_time:0.2f} seconds." + colors[0])
Execution Command: python async_queue.py -p 2 -c 4
Output:
Lastly, let’s have an example of how asyncio cuts down on wait time: given a coroutine generate_random_int()
that keeps producing random integers in the range [0, 10], until one of them exceeds a threshold, you want to let multiple calls of this coroutine not need to wait for each other to complete in succession.
Example Code:
import time
import asyncio
import random
# ANSI colors
colors = (
"\033[0m", # End of color
"\033[31m", # Red
"\033[32m", # Green
"\033[36m", # Cyan
"\033[35m", # Magenta
"\033[34m", # Blue
)
async def generate_random_int(indx: int, threshold: int = 5) -> int:
print(colors[indx + 1] + f"Initiated generate_random_int({indx}).")
i = random.randint(0, 10)
while i <= threshold:
print(colors[indx + 1] + f"generate_random_int({indx}) == {i} too low; retrying.")
await asyncio.sleep(indx + 1)
i = random.randint(0, 10)
print(colors[indx + 1] + f"---> Finished: generate_random_int({indx}) == {i}" + colors[0])
return i
async def main():
res = await asyncio.gather(*(generate_random_int(i, 10 - i - 1) for i in range(3)))
return res
if __name__ == "__main__":
random.seed(444)
start_time = time.perf_counter()
r1, r2, r3 = asyncio.run(main())
print(colors[4] + f"\nRandom INT 1: {r1}, Random INT 2: {r2}, Random INT 3: {r3}\n" + colors[0])
end_time = time.perf_counter()
execution_time = end_time - start_time
print(colors[5] + f"Program Start Time: {start_time}\nProgram End Time: {end_time}\nProgram Execution Time: {execution_time:0.2f} seconds." + colors[0])
Execution Command: python async_random.py
Output:
Note: If you’re writing any code yourself, prefer native coroutines for the sake of being explicit rather than implicit. Generator based coroutines will be removed in Python 3.10.
GitHub Repo: https://github.com/tssovi/asynchronous-in-python