Asyncio basics in python
--
Python 3.5 brought with it asyncio. An event loop based paradigm previously available as a library but now it is built in as a standard library. There are other async libraries out there, but I am going to focus on the one built in to Python.
Creating an event loop
This is the basic example to get you up and running. This is the core of asyncio. Starting an event loop and running some function on top of it. Some frameworks abstract this from you and handle it on the bootstrap of the application layer (aiohttp, for example).
The async function
Just like defining a regular function, except we add the async
keyword. This not only let's you know the function is asynchronous, but it also let's the interpreter know that it’s a special function. The interpreter wraps up the function inside a coroutine
and then hands it to you so you can handle it.
In our first example we started an event loop and ran our function inside run_until_complete()
That will run your function until all synchronous and non-synchronous calls are complete. I like to think of this step as an instantiation of our asynchronous paradigm.
The real magic — await
Cool. We have our code starting to come together. Let’s add a new async function and rename some_func
to run
since that is all we are going to have it do -- run our async logic.
await
tells our event loop to pause here and wait for the called coroutine to finish work before continuing on.
What happens if you leave out await
? There are times that do not require you to immediately await
your coroutine, but at some point before the execution of your code is complete, you will need to handle it in some way, or you will receive an RuntimeWarning once our coroutine is out of scope.
Starting a function and getting back to it later.
The first thing you might need an async pattern for is starting a job in the background, do other tasks in the front and then coming back to your original task.
If you run this you will see the results:
That’s what we wanted right? Close. We wanted to start our async job speak()
and when it was done, return the results. However, the first thing we printed to our terminal was A
not C
. We did not actually start our job, we just defined it. We started it’s execution later when we called await will_speak
. To start the job running while we move on to other tasks, we need to use asyncio.ensure_future()
. (Update 12/04/18) ensure_future()
returns a Task
object, which is correct, but is not really its purpose. In Python 3.7, we now have asyncio.create_task()
which gives us a succinct method for creating a Task.
Note: I added a 1 second sleep after we did create_task()
That's because will_speak
is now running concurrently with the rest of the function, and you cannot guarantee that C
will print before A
and B
, so putting a sleep()
in there ensures the speak()
function can finish it's task first. This is done just for the illustration purposes and not necessarily necessary in practice. (that was fun to say)
Cool. We now have a function that branches and runs while we continue on.
Order mucking
Let’s look at how our coroutines are handled. In the above example, I am showing that you can define your work in any order. Then await
them out of order. Or in order. Or out of order and then back in order. Or any combination. Great. They all run sequentially though in the order they are awaited. What about if we wanted all those to run at the same time. Well, we could create_task
all the things (which is just fine). OR we can show you another way.
Concurrent runnings
Here we are creating a list and then adding all of our items to said list. In this case, we are just iterating over a range()
and appending those coroutines to that list. Then with asyncio.wait()
we can wait for all those coroutines to complete. (Don't forget to await
the asyncio.wait()
, a common mistake.)
One thing you should keep in mind, if you do not wrap your coroutines in create_task()
then you cannot ensure order.
This is because wait
(and gather
, which we’ll talk about in a minute) puts coroutines in a set, which are not ordered, and Task
objects are able to maintain order. (Tested in Python 3.6)
Update 03/20/18:
Python 3.5–3.6 iterates over a set which causes jobs to be processed out of order. 3.7 corrects this. Shout out to dzunukwa for his additional research on this. So in 3.7 order is maintained and create_task()
is no longer “required”. And as dzunukwa points out, it can also be written like this.
async def run():
coros = [meow(x) for x in range(6)]
x = await asyncio.gather(*coros)
print(x)
Which utilizes a list comprehension and looks a bit cleaner. Rather nice, if all you are doing is iterating through jobs.
Handling async function results
Our previous example handles the concurrency beautifully (albeit out of order), but what if you want to do something with the results of those functions, a la “scatter-gather”? Use gather()
!
Typically asyncio.gather()
is actually what you want, over asyncio.wait()
, since it gives you the added benefit of collecting the results.
That it? What else you got?
A couple things come to mind. Look at this example.
create_task
wraps up our coroutine in a Task
. A task is more robust than a coroutine. First a task begins execution immediately when it is created, and then stores the results inside of its object. Then, any time in the future if you call await
on your future, you can retrieve the result, without having to re-execute the item again. A coroutine dies as soon as it is await
’ed, and will raise an exception if you try and use it again.
And lastly, just have fun with it. You can do all sorts of crazy stuff with asyncio. Execute things in weird orders, or whatever makes sense for your application.
Update (03–31–18): Blocking the loop
One of the most frustrating things you will run into is blocking the event loop. This happens when using a non async function or library. For example if you use requests
(non-async as of this update) it will block the event loop, halting all execution until the http call completes and produces a result. Other things that might block the event loop: time.sleep()
If you need a non-blocking sleep use asyncio.sleep()
Http requests, use a library like aiohttp
for async requesting. AMQP, aioamqp
is a great alternative to Pika. Last example, Databases. Postgres users I recommend asyncpg
and asyncpgsa
.
Example of blocking the event loop.
import asyncio
import timeasync def bark():
await asyncio.sleep(3)
time.sleep(3) # This here is the culpritasync def speak():
await bark()
return True
async def run():
r = [speak() for x in range(10)]
await asyncio.wait(r)loop = asyncio.get_event_loop()
loop.run_until_complete(run())
If we were to profile the above example in Pycharm, you can see that the first 3 seconds all jobs run at the same. Then we hit a blocking call, and each task is forced to wait until the previous jobs sleep finishes.
Update (12–04–18): Unblock the Blockers
I ran into a situation recently where I was using a library that was synchronous. This threw a wrench into how I had designed my worker service. Now, instead of being able to run many jobs simultaneously, all jobs stopped to wait for my synchronous code to run. I started to overthink everything and even started playing the idea of re-architecting the service.
Thankfully I have Nick Humrich at my disposal and after only a few minutes of talking, he reminded me that there is a simple solution. loop.run_in_executor()
allows you to take a non async function and put it into an executor, without getting technical makes a sync function async. Take the following code for example:
import asyncio
import timedef blocking_func():
print('before block')
time.sleep(3)
print('after block')async def run():
print('before run')
blocking_func()
print('after run')async def start():
jobs = [run(), run()]
await asyncio.wait(jobs)loop = asyncio.get_event_loop()loop.run_until_complete(start())
As you can see, the above is an example of a blocking function. The first run()
function has to pass the blocking_func()
before the second run()
is able to execute. So rather than calling blocking_func()
lets wrap it inside of an executor.
import asyncio
import timedef blocking_func():
print('before block')
time.sleep(3)
print('after block')async def run():
print('before run')
await loop.run_in_executor(None, blocking_func)
print('after run')async def start():
jobs = [run(), run()]
await asyncio.wait(jobs)loop = asyncio.get_event_loop()loop.run_until_complete(start())
Since loop.run_in_ececutor()
is an async function, don’t forget to await
it. The first parameter allows you to pass in a separate executor, and uses the default if we pass in None
. The second parameter is an instance of the function to call inside the executor, and any additional parameters are passed into the function when it is called. Viola! No more blocking the event loop!
Skyler Lewis makes no claims to expertise or exactness of the information presented and may not reflect the opinions of the owners or advertisers. Parental guidance isn’t recommended. Any rebroadcast, retransmission, or account of this article, without the express written consent of Major League Baseball, is not prohibited. Use as directed. May cause sleepiness, headaches, backaches, or thoughts of bacon wrapped shrimp.