Asyncio Coroutine Patterns: Errors and cancellation

This is the second part of a two part series on coroutine patterns in asyncio, to fully benefit from this article please read the first installment: Asyncio Coroutine Patterns: Beyond await

In the first part of this series we concluded that asyncio is awesome, coroutines are awesome and our code is awesome. But sometimes the outside world is not as awesome and we have to deal with it.

Now, for this second part of the series, I’ll run over the options asyncio gives us to handle errors when using these patterns as well as cancelling tasks so as to make our asynchronous systems robust and performant.

If you are really new to asyncio I recommend having a read through my very first article on the subject, Asyncio for the Working Python Developer, before diving into this series.

I will continue using the Hacker News API and also be using the async/await syntax introduced Python 3.5+. All example code is also available in the Github repo for this series.

Let’s get to it!

Error handling

If you recall at the end of the previous article we had a lovely system using periodic task that fetched the top stories in HN and recursively calculates the number of comments for each of them. Here’s the full listing:

Notice how URLFetcher raises a BoomException after a set number of fetches and there’s no exception handling anywhere in the code.

Here’s our periodic coroutine one more time:

When we run the script we get something like:

[16:13:16] Calculating comments for top 5 stories. (1)
[16:13:16] Waiting for 5 seconds...
[16:13:17] Exception in callback poll_top_stories_for_comments.<locals>.callback(<Task finishe...ion('BOOM!',)>) at 01_error_handling.py:126
handle: <Handle poll_top_stories_for_comments.<locals>.callback(<Task finishe...ion('BOOM!',)>) at 01_error_handling.py:126>
Traceback (most recent call last):
File "/Users/yeray/.pyenv/versions/3.6.0/lib/python3.6/asyncio/events.py", line 126, in _run
self._callback(*self._args)
File "01_error_handling.py", line 127, in callback
fetch_count = fut.result()
File "01_error_handling.py", line 104, in get_comments_of_top_stories
results = await asyncio.gather(*tasks)
File "01_error_handling.py", line 71, in post_number_of_comments
response = await fetcher.fetch(session, url)
File "01_error_handling.py", line 60, in fetch
raise BoomException('BOOM!')
BoomException: BOOM!

[16:13:21] Calculating comments for top 5 stories. (2)
[16:13:21] Waiting for 5 seconds...
...

See how the system did not crash? Our Tasks completed and when we retrieved their result in the callback an unhandled exception was raised. Usually this would have caused the Python interpreter to stop but it simply continued on attempting to fetch posts and comments a second time.

This is an important point, when using ensure_future, exceptions will not crash the system and might go unnoticed. So you need to ensure you’re handling exceptions and logging them appropriately.

Let’s change our periodic coroutine to handle this exception, remember since we’re using the Future.result() method to retrieve the result of the Task it will raise the exception if that was its result:

[16:21:03] Calculating comments for top 5 stories. (1)
[16:21:03] Waiting for 5 seconds…
[16:21:04] Something went BOOM
Traceback (most recent call last):
File “01b_error_handling.py”, line 128, in callback
fetch_count = fut.result()
File “01b_error_handling.py”, line 104, in get_comments_of_top_stories
results = await asyncio.gather(*tasks)
File “01b_error_handling.py”, line 71, in post_number_of_comments
response = await fetcher.fetch(session, url)
File “01b_error_handling.py”, line 60, in fetch
raise BoomException(‘BOOM!’)
BoomException: BOOM!
[16:21:08] Calculating comments for top 5 stories. (2)
[16:21:08] Waiting for 5 seconds…
[16:21:08] Something went BOOM
Traceback (most recent call last):
File “01b_error_handling.py”, line 128, in callback
fetch_count = fut.result()
File “01b_error_handling.py”, line 104, in get_comments_of_top_stories
results = await asyncio.gather(*tasks)
File “01b_error_handling.py”, line 71, in post_number_of_comments
response = await fetcher.fetch(session, url)
File “01b_error_handling.py”, line 60, in fetch
raise BoomException(‘BOOM!’)
BoomException: BOOM!

This has not completely solved the issue as the system will still not crash, but at least the exception is being logged appropriately.

However, as good practice dictates, we’re catching a specific type of exceptions. What if some other type of exception occurs? Let’s simulate this with as small change in URLFetcher:

[16:23:22] Calculating comments for top 5 stories. (1)
[16:23:22] Waiting for 5 seconds…
[16:23:22] Exception in callback poll_top_stories_for_comments.<locals>.callback(<Task finishe… exception’,)>) at 01c_error_handling.py:129
handle: <Handle poll_top_stories_for_comments.<locals>.callback(<Task finishe… exception’,)>) at 01c_error_handling.py:129>

Traceback (most recent call last):
File “/Users/yeray/.pyenv/versions/3.6.0/lib/python3.6/asyncio/events.py”, line 126, in _run
self._callback(*self._args)
File “01c_error_handling.py”, line 131, in callback
fetch_count = fut.result()
File “01c_error_handling.py”, line 107, in get_comments_of_top_stories
results = await asyncio.gather(*tasks)
File “01c_error_handling.py”, line 74, in post_number_of_comments
response = await fetcher.fetch(session, url)
File “01c_error_handling.py”, line 63, in fetch
raise Exception(‘Random generic exception’)
Exception: Random generic exception

Again producing a silent unlogged error in our system.

The key points here are:

  1. when using await handle exceptions with try..except as usual
  2. when using ensure_future remember to catch generic exceptions and act accordingly

Here’s a complete listing of the system handling both cases:

Ok, so given that we’ve been hitting errors from the HN API we should probably stop the system, otherwise it’s just going to keep raising error after error.

We can do it the hard way and add loop.stop() to our handling of errors in the callback code:

[16:34:00] Calculating comments for top 5 stories. (1)
[16:34:00] Waiting for 5 seconds...
[16:34:01] Error retrieving post 15052691: BOOM!
[16:34:01] Error retrieving comments for top stories: BOOM!
Traceback (most recent call last):
File "02b_error_handling.py", line 175, in <module>
loop, session, args.period, args.limit))
File "/Users/yeray/.pyenv/versions/3.6.0/lib/python3.6/asyncio/base_events.py", line 464, in run_until_complete
raise RuntimeError('Event loop stopped before Future completed.')
RuntimeError: Event loop stopped before Future completed.

Well, that stopped it all right, but asyncio did not like it, we’re forcibly closing the loop while other Futures are still pending, not cool.

In order to exit cleanly we need to return from the periodic coroutine which the loop will detect as complete and close itself.

However the exception is handled inside a callback, but since the callback is triggered independently we need some way of reading the error from the main loop and act accordingly.

We can do that by defining a list of errors in the enclosing scope and mutating it in the callback so we can exit in the next iteration if there are any:

[16:48:59] Calculating comments for top 5 stories. (1)
[16:48:59] Waiting for 5 seconds...
[16:48:59] Error retrieving top stories: Random generic exception
[16:48:59] Unexpected error
Traceback (most recent call last):
File "02c_error_handling.py", line 153, in callback
fetch_count = fut.result()
File "02c_error_handling.py", line 112, in get_comments_of_top_stories
response = await fetcher.fetch(session, TOP_STORIES_URL)
File "02c_error_handling.py", line 64, in fetch
raise Exception('Random generic exception')
Exception: Random generic exception
[16:49:04] Error detected, quitting

I know, not pretty, and notice the timestamp as well, the return happened after the sleep which is not ideal but at least we get a clean exit from the loop.

Remember this happens only when using ensure_future. You can always go back to using await as described my first article if this is unacceptable.

To raise or not to raise

Let’s step back for a moment though. In our example we’re being quite extreme and raising errors after just a few fetches. But if we were to increase that number when the first exception occurs we may have actually calculated some of them, they are, after all, separate tasks. Is there a way to retrieve these possibly completed tasks?

Why, yes, there is! More than one in fact, but let’s put a pin on that and check the gather docs, notice there’s a return_exceptions argument that default to False:

If return_exceptions is True, exceptions in the tasks are treated the same as successful results, and gathered in the result list; otherwise, the first raised exception will be immediately propagated to the returned future.

Note that this means gather will not raise an exception anymore, so we don’t need the try..except clause in our coroutine. We do, however, need to manually check the list of results to see if there were any errors.

In the following example I increased the number of allowed fetches to allow for some results to come back.

[17:04:43] Calculating comments for top 5 stories. (1)
[17:04:43] Waiting for 5 seconds...
[17:04:47] Error retrieving comments for top stories: BOOM!
[17:04:47] Error retrieving comments for top stories: BOOM!
[17:04:47] Post 15051645 has 75 comments (1)
[17:04:47] Error retrieving comments for top stories: BOOM!
[17:04:47] Post 15052192 has 13 comments (1)
[17:04:47] > Calculating comments took 3.93 seconds and 511 fetches
[17:04:48] Calculating comments for top 5 stories. (2)
[17:04:48] Waiting for 5 seconds...

Look at that! We managed results for 2 out of the 5 top stories!

This particular example might require you to change the MAXIMUM_FETCHES depending on how popular the top stories in HN are at the point you’re running the script, I suggest increasing it to a high number allowing all tasks to complete and then lowering it to just below the fetch counter.

Let’s come back to that pin we put on the different ways to retrieve completed tasks:

Gather vs wait (vs as_completed)

In the asyncio API there are two main functions for scheduling a set tasks at the same time, our familiar gather and wait. The main differences between them are:

  • wait returns a tuple of two sets, done and pending Task objects, while gather returns the results of those tasks.
  • gather returns the results in order, i.e. the first element of the returned list is the result of the Task object passed as a first parameter to it. In contrast wait returns the objects out of order we need to manually keep track of which result corresponds to which Task.
  • gather by default will return when an exception is raised by any of the tasks, or whenever all tasks are done if no exception is raised. As we’ve seen, if return_exceptions is True it will return when all tasks are “done” even if some raised an exception. Conversely wait has a specific parameter return_when that can be one of FIRST_COMPLETED, FIRST_EXCEPTION or ALL_COMPLETED allowing us finer control on when it returns.
  • Finally, wait includes a timeout argument, gather does not have it but it’s possible to combine it with wait_for to mimic that behaviour.

Additionally, asyncio includes as_completed which returns an iterator of futures you can await as they are done. I cover both wait and as_completed in my other article Asyncio for the Working Python Developer if you’re interested.

Why do I mention all this? Because we’re going to be needing it very soon.

Cancelling tasks

It may not look like it, but we’re sending potentially thousands of requests to the HN API, more if we were to increase the number of top stories to get comments for, even more if any of the top stories happen to be controversial. We’re basically DDoS-ing the poor thing. Up until now it’s been fine with it but if we abuse it we may be getting some 429 Too Many Requests or 420 Enhance your calm.

So, ideally, as an exception is raised we should avoid making things worse for ourselves and cancel any scheduled tasks. Yeah, you heard me: you can cancel the tasks!

… but you need a handle on them, ideally we need handles as soon as an exception is raised. Now, as we mentioned before, gather returns the results of the tasks, but wait returns the Task objects in two tuples, done and pending.

Let’s give wait a go:

[17:20:04] Calculating comments for top 5 stories. (1)
[17:20:04] Waiting for 5 seconds...
Post ??? has 13 comments (1)
Error retrieving comments for top stories: BOOM!
[17:20:07] > Calculating comments took 3.79 seconds and 501 fetches
[17:20:09] Calculating comments for top 5 stories. (2)
[17:20:09] Waiting for 5 seconds...
Post ??? has 13 comments (2)
Post ??? has 82 comments (2)

Error retrieving comments for top stories: BOOM!
[17:20:13] > Calculating comments took 4.05 seconds and 501 fetches
[17:20:14] Calculating comments for top 5 stories. (3)

A few things to note on this example. First off the signature for wait is different, it accepts a list of Task objects so we don’t need to unpack the list as we were doing before with gather.

Secondly, we’re passing return_when=FIRST_EXCEPTION, specifically requesting it to return the two sets as soon as there’s an exception, at that point there will be pending tasks so we cancel them. If none are raised the second set will simply be empty. That’s why we managed to stop immediately after hitting our upper limit for fetches (500 in this example).

However, remember done != successful, if an exception was raised then one and only one Task object’s result is an exception, and calling result on it will raise it, so we need to be prepared to catch it.

Now for the caveats, notice the ??? in the post numbers? that’s because up until now we’ve been relying on gather returning the results in order, but we’ve lost that using wait.

What we need to do is keep track of which Task object corresponds to each post ID. Up until now we’ve been allowing gather and wait to create these Task objects from coroutine objects, but we can actually create the Task objects ourselves and pass the list to wait.

[17:35:33] Calculating comments for top 5 stories. (1)
[17:35:33] Waiting for 5 seconds...
Error retrieving comments for top stories: BOOM!
Post 15051645 has 86 comments (1)
Post 15052192 has 16 comments (1)

[17:35:37] > Calculating comments took 4.06 seconds and 521 fetches

As you can see we can now get the post ID corresponding to our completed Task object from the set returned by wait.

Notice how we’re creating Task objects using ensure_future (we could’ve also used loop.create_task) and then combining them using wait. Keeping references to our Task objects can be quite handy.

Handling cancellation

Since we’re talking about cancelling tasks, the actual workflow is quite interesting. When task.cancel() is called a CancelledError exception is sent to the coroutine. The coroutine can actually catch that exception and act accordingly, it may even choose to ignore it.

In our case we wouldn’t want to ignore the cancellation entirely, but it might be useful to catch the exception to cancel all child tasks if we have any.

[18:29:54] Calculating comments for top 5 stories. (1)
[18:29:54] Waiting for 5 seconds…
Post 13851386 has 1 comments (1)
Post 13851349 has 2 comments (1)
Error retrieving comments for top stories: BOOM!
[18:29:55] > Calculating comments took 1.76 seconds and 151 fetches
[18:29:55] Comments for post 13851706 cancelled, cancelling 3 child tasks
[18:29:55] Comments for post 13852103 cancelled, cancelling 1 child tasks
[18:29:55] Comments for post 13851611 cancelled, cancelling 2 child tasks

[... more messages like the above ...]
[18:29:59] Calculating comments for top 5 stories. (2)
[18:29:59] Waiting for 5 seconds…

The lesson here is that if there’s a chance that your coroutines can be cancelled remember you can catch the CancelledError and do any clean up that’s necessary. This also highlights the usefulness of storing references to any Tasks you schedule.

I find the solution quite elegant and Pythonic, exceptions are at the core of the language and still are even on a complicated feature like cancelling asynchronous tasks.

Conclusion

And that’s all I have on asyncio, I hope these two articles have satisfied your hunger for coroutine knowledge and you’ve learned a few ideas to keep in mind while working with asyncio.

As you can see there’s quite a bit of housekeeping to perform, especially in situations where you want or need to deviate from async/await and start using different patterns. Asyncio has been subject of detailed analysis from very clever people and sparked third party libraries that try a different approaches or aim to paliate our suffering.

Personally I think it is not without its flaws, but it’s had a tremendous impact in the community to embrace asynchronous programming. It’s really in its infancy and the core team is hard at work adding new features and polishing the API while the ecosystem around it has flourished and it’s only getting better and better.

I believe asyncio has, without a doubt, pushed Python to the next level.

One clap, two clap, three clap, forty?

By clapping more or less, you can signal to us which stories really stand out.