We added an AI cat “checker” to our “CatLoader” project using this trick.

Let Flask apps fly with “fire & forget”

Defer slow tasks via threaded calls.

Rinaldo Nani
Variance Digital

--

Using Flask, you will sooner or later hit the situation shown below:

@app.route('/my_call')
def my_call():
    ...
    a = do_something_fast()
    b = other_fast_task()
    ...
    heavy_and_slow_task()  # <-- not nice
    return render_template('my-page.html', a=a, b=b)

This code, triggered when the /my_call endpoint is called, tries to deliver the my-page.html web page after completing a series of steps. In doing so, the code stumbles over heavy_and_slow_task(), and the caller has to wait (potentially for a long time) until the job completes.

One could try executing the heavy_and_slow_task() function in the background and asynchronously to help the call return quickly; there are many ways to achieve this, such as:

  • using async Python code [complicated];
  • using queueing mechanisms, like Redis Queue or Celery [require additional packages and know-how];
  • using pub/sub mechanisms of SQL databases [may not be portable between database engines].

In this article, we suggest a simple alternative to the above methods. In a nutshell: defer the heavy_and_slow_task() by wrapping it into a Thread call.
As simple as that.

The suggested solution is “fire & forget” because when the cumbersome job is “fired”, the subsequent code runs immediately without waiting for further acknowledgements.
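
To make the idea concrete before bringing Flask into the picture, here is a minimal sketch that uses only the standard library. The 0.5-second sleep is an assumed stand-in for the real slow job, and the results list and the final join() exist only so the demo can be observed; they are not part of the pattern.

```python
import threading
import time

results = []  # filled in by the background task, for demonstration only


def heavy_and_slow_task():
    """Stand-in for the slow job: sleeps to simulate real work."""
    time.sleep(0.5)
    results.append("done")


def my_call():
    """Fire the slow task in a thread and return without waiting for it."""
    worker = threading.Thread(target=heavy_and_slow_task)
    worker.start()  # "fire"
    return worker   # returned only so this demo can join it below


start = time.perf_counter()
worker = my_call()
elapsed = time.perf_counter() - start  # measured before the task has finished

worker.join()  # a real request handler would NOT join: that is the "forget"
```

The caller gets control back as soon as start() returns, while the slow task keeps running in the background.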

Sections 2 and 3 below show two concrete examples of using “fire & forget” in a Flask project.

The “fire & forget” solution comes in three different options.

  • OPTION 1 - The heavy_and_slow_task() is deferred to an internal HTTP service: an endpoint in the same Flask application executes the cumbersome task, and the call to this endpoint is spawned in the wrapping thread.
  • OPTION 2 - The heavy_and_slow_task() is deferred to an external HTTP service, i.e. a service that runs as a separate application on a separate site; again, the call to this service is spawned in the wrapping thread.
  • OPTION 3 - The heavy_and_slow_task() is not deferred to any HTTP service: it is executed directly in a separate thread within the same “process” of the Flask application.

Depending on your scenarios and needs, you may choose one of the three options to distribute the computation load and gain speed.

1. The “fire & forget” with the HTTP call (Options 1 and 2)

The idea behind the “fire & forget” solution for the first two options is this: instead of calling the heavy_and_slow_task() function directly, call a custom-made HTTP service that implements the heavy-duty job. But this is only half of the trick: if Flask has to wait until the HTTP service is completed, there would be no benefit in moving the blocking code behind an HTTP endpoint, right?

The other half of the trick: wrap the HTTP call inside a Python thread.

Here is how the whole thing works in practice. The code snippet from the opening paragraph becomes:

@app.route('/my_call')
def my_call():
    ...
    a = do_something_fast()
    b = other_fast_task()
    ...
    defer_heavy_and_slow_task()  # <-- much better now!
    return render_template('my-page.html', a=a, b=b)

Then comes the snippet defining the defer_heavy_and_slow_task() function, along with the actual mechanism for the HTTP call:

import requests
from threading import Thread

def request_task(url):
    requests.get(url)

# CALLING "async": CREATE THREAD
def defer_heavy_and_slow_task():
    service_url = 'https://my.site.com/heavy_duty_url'
    # args must be a tuple: note THE EXTRA , after service_url
    Thread(target=request_task, args=(service_url,)).start()

This code has two main actors.

The first is the obvious one: the requests.get() instruction. As we moved the heavy-duty stuff behind an HTTP service, the service must be called somewhere.

The second hero is Thread().start(). As you can see, the request_task() function is wrapped in a Python thread thanks to the Thread().start() call. Note that the target parameter of Thread(...) is the function to be run asynchronously, i.e. request_task, while the URL of the service API is passed through the args parameter, which must be a tuple (hence the trailing comma). Other parameters could be passed to the request_task function, if needed, by adding them to that tuple.
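
As a small illustration of passing more than one parameter, the sketch below uses both args and kwargs of Thread. The timeout and label parameters are hypothetical additions, and this request_task only records what it receives instead of performing an HTTP call.

```python
from threading import Thread

calls = []  # records what the worker received, for demonstration only


def request_task(url, timeout=10, label=None):
    # Hypothetical stand-in: stores its arguments instead of doing HTTP.
    calls.append((url, timeout, label))


worker = Thread(
    target=request_task,
    args=("https://my.site.com/heavy_duty_url",),  # one-element tuple
    kwargs={"timeout": 5, "label": "nightly"},     # passed as keyword arguments
)
worker.start()
worker.join()  # joined here only so the recorded call can be inspected
```

The trailing comma matters: without it, ("https://...") is just a string in parentheses, and Thread would try to pass each character as a separate argument.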

As said above, wrapping requests.get(url) inside a separate thread is crucial: if it were not for this wrapping, the requests.get(url) would perform synchronously, and the Flask execution would have to wait for its completion.
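
Since nobody waits for the thread, a call that fails or hangs goes unnoticed. The sketch below is one possible way to guard against this, not part of the original snippet: it adds a timeout and logs failures. To stay dependency-free it uses the standard library's urllib.request in place of requests; the logger name and the 5-second default are assumptions.

```python
import logging
import urllib.request
from threading import Thread

log = logging.getLogger("fire_and_forget")  # hypothetical logger name


def request_task(url, timeout=5):
    """Call the service; log a failure instead of letting it vanish."""
    try:
        with urllib.request.urlopen(url, timeout=timeout) as response:
            return response.status
    except OSError as exc:
        # URLError, HTTPError and socket timeouts all derive from OSError.
        log.warning("deferred call to %s failed: %s", url, exc)
        return None


def defer_heavy_and_slow_task(service_url):
    """Fire the HTTP call in a thread and return immediately."""
    worker = Thread(target=request_task, args=(service_url,))
    worker.start()
    return worker
```

With requests, the equivalent idea would be requests.get(url, timeout=5) wrapped in a try/except for requests.RequestException.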

The requests.get() is one of the valuable methods of the powerful “requests” external Python package. This package must be installed in the runtime environment. Use pip with
pip install requests
to install it, and
pip freeze > requirements.txt
if you need to update the app’s requirements.

The last piece of the “fire & forget” puzzle is the HTTP service which executes the heavy_and_slow_task(). This can be easily implemented with the Flask code snippet below.

@app.route('/heavy_duty_url')
def heavyjobapi():
    heavy_and_slow_task()
    return {'data': 'OK', 'error': 0}

Figure 1 shows the first two schemes for the “fire & forget” pattern.

Fig. 1 — Architecture for “fire & forget” (options 1 and 2)

In the first option, a single Flask application exposes both the main call(s) and the route for the heavy-duty service; thus, it contains the code for both core functions plus the heavy_and_slow_task().

In the second option, all the code that must be executed asynchronously is moved into an ad hoc Flask application which runs separately. In this case, the database of the main Flask app is “shared” and is accessed by the ad hoc application.

2. An up-and-running example (for OPTION 2)

In our “CatLoader” demo project, we used a simple “fire & forget” pattern to implement an image-checking job called “Cat checker”.

Fig.2 — The “CatLoader” app and Cat Checker component

The “CatLoader” app lets its users upload images taken from the camera or from the file system (go here for a demo). The app also validates each uploaded image to check if it portrays a feline subject; the validation task is delegated to the “Cat Checker” service.

Figure 2 shows how the “CatLoader” app and the Cat Checker component cooperate using the “fire & forget” architecture; the “Cat checker” is an excellent example of a heavy and slow task.

These are the steps that the “Cat checker” needs to do:

  1. get from the “CatLoader” database the list of all non-validated images, obtaining the cloud repository URLs of each image (and its thumbnail);
  2. for each image of step 1, analyse the image (and its thumbnail) to check if it has cats in it; if this is the case, the image is valid — otherwise, it’s not;
  3. update the database, saving the validity for each image;
  4. log the operations for future analysis.
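
The four steps above can be sketched as follows. This is not the actual Cat Checker code: the in-memory IMAGES list stands in for the database, contains_cat() is a placeholder for the real image-analysis call, thumbnails are left out for brevity, and every name is hypothetical.

```python
import logging

log = logging.getLogger("cat_checker")  # hypothetical logger name

# Hypothetical in-memory stand-in for the "CatLoader" images table.
IMAGES = [
    {"id": 1, "url": "https://example.com/cat.jpg", "valid": None},
    {"id": 2, "url": "https://example.com/dog.jpg", "valid": None},
    {"id": 3, "url": "https://example.com/old.jpg", "valid": True},
]


def contains_cat(url):
    # Placeholder classifier: a real service would analyse the image itself.
    return "cat" in url


def check_cats(images, classifier=contains_cat):
    """Validate every image that has not been checked yet."""
    pending = [img for img in images if img["valid"] is None]   # step 1
    for img in pending:
        img["valid"] = classifier(img["url"])                   # steps 2 and 3
        log.info("image %s valid=%s", img["id"], img["valid"])  # step 4
    return len(pending)


checked = check_cats(IMAGES)
```

Images that were already validated (id 3 here) are skipped, so calling the service repeatedly only processes new uploads.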

Every time the “CatLoader” API needs to validate the uploaded images, it calls the non-blocking upl_check_cats_async() function, which is defined as follows:

import os
import threading
import requests

# CALLING CATCHECKER: request
def request_task(url):
    requests.get(url)  # External URL

# CALLING CATCHECKER "async": CREATE THREAD
def upl_check_cats_async():
    threading.Thread(target=request_task,
                     args=(os.environ["CATCHECKER_URL"],)).start()

The request_task() function runs in the new thread, receiving the URL of the Cat Checker service as a parameter. The Cat Checker, a stand-alone Flask application running on a different server than the “CatLoader” app, is triggered by the HTTP GET issued by the requests.get() call.

Our Cat Checker service is a Flask Application that uses Microsoft Computer Vision Image Analysis service to validate images. If you want to know how this is done, read the article below.

3. Another example (OPTION 3)

Suppose that one of the tasks of your Flask endpoint is devoted to deleting a large number of images from a cloud repository. Whichever technology you are using, deleting many pictures on the cloud — one by one — is a cumbersome task.

We had a similar scenario when deleting a user’s account from the “CatLoader” application. The endpoint/view is shown in the snippet below.

@bp.route('/deleteaccount')
@login_required
def deleteaccount():
    images_to_be_removed = db_delete_account(g.user_id)
    user_logout(...)
    upl_delete_s3_files_async(images_to_be_removed)  # Fire & forget
    flash("Account deleted")
    return redirect(url_for('bl_backoffice.listuploads'))

The code shows the fast db_delete_account() function followed by the potentially very slow image deletion task. Again, one good solution is using the “fire & forget” pattern; this time, we have chosen its third version. Figure 3 shows a sketch of what is going on.

Fig. 3— Architecture for “fire & forget” (option 3)

The upl_delete_s3_files_async() definition is shown below: it wraps the request to remove the images in a Thread().

import os
import threading

# Fire & forget pattern (but without HTTP call)
def request_remove_images_in_recordset(images):
    for record in images:
        awskey = record['img_filename']
        awskey_sq = record['img_th_filename']
        bucket_name = os.environ["AWS_BUCKET_NAME"]
        bucket_sq_name = os.environ["AWS_BUCKET_SQ_NAME"]
        delete_files_from_s3(awskey, awskey_sq,
                             bucket_name, bucket_sq_name)

# REMOVE IMAGES FROM S3 "async": CREATE THREAD
def upl_delete_s3_files_async(images):
    threading.Thread(target=request_remove_images_in_recordset,
                     args=(images,)).start()  # note THE EXTRA ,

The thread will asynchronously cycle through the set of images that must be removed from the repository and call the delete_files_from_s3() function to delete each file.
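
One thing to keep in mind with this option: an exception raised inside the thread stops the loop, and no caller is there to catch it. The sketch below shows one possible way to keep going when a single deletion fails. It is not the CatLoader implementation: the delete function is injected as a parameter, and fake_delete() is a hypothetical stand-in that simulates one failure.

```python
import logging
import threading

log = logging.getLogger("s3_cleanup")  # hypothetical logger name


def remove_images(images, delete_one):
    """Delete each record's files; one failure does not stop the rest."""
    for record in images:
        try:
            delete_one(record["img_filename"], record["img_th_filename"])
        except Exception as exc:  # broad on purpose: nobody else will catch it
            log.warning("could not delete %s: %s", record["img_filename"], exc)


def remove_images_async(images, delete_one):
    """Fire the deletion loop in a thread and return immediately."""
    worker = threading.Thread(target=remove_images, args=(images, delete_one))
    worker.start()
    return worker


deleted = []


def fake_delete(key, thumb_key):
    # Hypothetical stand-in for the real S3 deletion; fails for one key.
    if key == "broken.jpg":
        raise RuntimeError("simulated failure")
    deleted.append(key)


records = [
    {"img_filename": "a.jpg", "img_th_filename": "a_th.jpg"},
    {"img_filename": "broken.jpg", "img_th_filename": "broken_th.jpg"},
    {"img_filename": "c.jpg", "img_th_filename": "c_th.jpg"},
]
remove_images_async(records, fake_delete).join()  # joined only for the demo
```

The failure on the second record is logged, and the third record is still deleted.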

The CatLoader Flask application uses AWS S3 buckets as repositories for user images and the boto3 Python package to manage the S3 buckets. Please look at our CatLoader API and HTML Flask application (specifically the file s3_operations.py) to see the details of our implementation.

4. FAQs — TL;DR :)

> Where can I get the complete code examples?

  • Feel free to get the code from our public GitHub repos: here for the Cat Checker HTTP service (which implements the asynchronous heavy-duty function, checking all uploaded images). We are going to publish the “CatLoader” app code very soon. Stay tuned!

> Are there demos running somewhere?

  • Sure! The “CatLoader” demo site is here.

> How does the Cat Checker work?

  • Read this article to learn more.
