Asynchronous Feature Extraction — 1

Sushant Chaudhary
Feb 11

Introduction

At Attentive AI, a major chunk of our tech resources is aimed at providing our customers with highly accurate map features in the shortest possible time. These features could be roads, building footprints, trees, parking lots, swimming pools, etc. The list is far from exhaustive, as we are constantly adding more features to our catalog to create more solutions for organizations looking for highly specific map features. To achieve this goal seamlessly, our machine learning and engineering teams have worked side by side to create an ML-powered feature extraction pipeline that delivers feature outputs asynchronously through API endpoints.

Goal

Our objective was to divide our feature extraction services into independent components that act as computational black boxes and can be invoked remotely given the appropriate parameters. Each remote call (an API endpoint) sends a message from the end user containing the parameters needed to perform the computation for a particular task (or set of tasks). The black boxes perform the complex computation asynchronously, so multiple clients can send multiple remote calls and the delivery of outputs is guaranteed (as long as the input data is valid). This can be achieved with the broker pattern, backed by a task-queue system that uses RabbitMQ as the messaging broker and Celery for the task-queue implementation. This pattern bypasses the standard HTTP request-response cycle, since clients do not have to wait for an individual request to finish before sending more.

What is a broker pattern?

The broker pattern is an architectural pattern that can be used to structure distributed software systems with decoupled components that interact by remote service invocations.

What is RabbitMQ?

It is a messaging broker. Let's say a client (producer) produces a structured message describing certain information that needs processing. This message is sent to the RabbitMQ server (broker), which stores it temporarily in a queue. On the other side of the broker there can be multiple clients (consumers) listening for messages stored in the broker. If a consumer's configuration matches any of the messages currently held by the broker, the broker forwards the message to that consumer, which starts processing it. The broker keeps track of all the consumers listening to it at any given moment, so it can report the status of a client's request whenever the client asks for it. Once the result is generated, it is stored in a results back-end, from where it is forwarded to the client. Here's a graphic to explain the flow:

[Diagram: producer → broker (queue) → consumer flow. Image source: https://tests4geeks.com/]
  • A producer could be a client that sends the request received from an API endpoint to the broker.
  • A broker stores the request as a structured message. It is attached to multiple consumers, and whenever it finds an idle consumer (not processing anything) that matches the appropriate configuration, it forwards the message to it.
  • A consumer is an instance of programming logic (could be a function) that starts working whenever it receives a message from the broker.
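
To make this flow concrete, here is a minimal producer/consumer sketch written directly against RabbitMQ with the pika client. pika is not used later in this post, since Celery will handle the messaging for us; the queue name and message body are purely illustrative, and in practice the two halves would run in separate processes.

import pika

# Producer: connect to the broker and publish a message onto a queue
connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
channel = connection.channel()
channel.queue_declare(queue="feature_requests")
channel.basic_publish(exchange="",
                      routing_key="feature_requests",
                      body="extract building footprints for order 42")

# Consumer: register a callback and block, waiting for messages from the broker
def on_message(ch, method, properties, body):
    print("received:", body.decode())

channel.basic_consume(queue="feature_requests",
                      on_message_callback=on_message,
                      auto_ack=True)
channel.start_consuming()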

What is Celery?

It is a task-queue implementation in Python, generally used to run scheduled tasks and long-running background jobs. For the first case, say there is a website that emails its users a newsletter every Monday morning at 9 am; Celery can be used to create asynchronous scheduled tasks that are called automatically at the given time. An example of the second case could be a Celery task invoked asynchronously to perform a long-running job, such as processing huge amounts of data or doing a complex calculation that requires a lot of computing power. A Celery client can be used both as a producer and as a consumer.
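
As a quick illustration of the scheduled-task case, a periodic task can be wired up with Celery's beat scheduler. The newsletter app below is hypothetical and not part of the pipeline described in this post; the broker URL simply mirrors the local RabbitMQ setup used later.

from celery import Celery
from celery.schedules import crontab

# hypothetical newsletter app (illustration only)
app = Celery("newsletter", broker="amqp://guest:guest@localhost:5672/test")

app.conf.beat_schedule = {
    "monday-newsletter": {
        "task": "newsletter.send_newsletter",  # default task name: module.function
        "schedule": crontab(hour=9, minute=0, day_of_week="mon"),
    },
}

@app.task
def send_newsletter():
    # placeholder for the actual mailing logic
    print("sending newsletter to all subscribers")

Running celery beat alongside a worker would then enqueue send_newsletter every Monday at 9 am.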

Implementation Example

Problem Statement

We want to deliver a super complex computational task (a function that takes two integer arguments and adds them) to our clients and we want it to work asynchronously.

Setup

DISCLAIMER: Availability of a Linux based system is assumed.

First, we need to set up the RabbitMQ server that will be used as our broker. There are multiple ways to configure it; the easiest is to run a docker container of the official RabbitMQ image. A good primer on how docker images work is available on FreeCodeCamp. We'll be using docker and docker-compose quite generously, as the purpose of this series is to explain Celery use cases, not the inner workings of RabbitMQ. Now you'll need to create a rabbitmq.conf file and a Dockerfile.

The project structure could look like this:

.
├── adder_task_app.py
├── celery_conf
│   ├── celeryconfig.py
│   └── __init__.py
├── docker
│   ├── config
│   │   └── rabbitmq.conf
│   └── dockerfile-rabbit.df
├── docker-compose.yml
└── send_task_app.py

dockerfile-rabbit.df (custom Dockerfile)

FROM rabbitmq:3.7-management
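
Presumably the custom Dockerfile also copies our rabbitmq.conf into the image; a minimal sketch, assuming the project layout above and RabbitMQ's standard config location:

FROM rabbitmq:3.7-management
# copy our vhost configuration into the image (paths assumed)
COPY config/rabbitmq.conf /etc/rabbitmq/rabbitmq.conf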

rabbitmq.conf

default_vhost=test

Next, we'll run the official Postgres image, which will be used to store the results of our Celery tasks. When the Celery worker connects, the relevant result tables are generated automatically in the configured database. Combining the back-end and the broker, we can write a docker-compose file.

docker-compose.yml

version: "3"
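
A minimal compose file matching the setup described could look like the sketch below; the service names, the Postgres image tag, and the credentials and database name are assumptions.

version: "3"
services:
  rabbit:
    build:
      context: ./docker
      dockerfile: dockerfile-rabbit.df
    ports:
      - "5672:5672"     # AMQP
      - "15672:15672"   # management UI
  postgres:
    image: postgres:11
    environment:
      POSTGRES_USER: celery
      POSTGRES_PASSWORD: celery
      POSTGRES_DB: celery_results
    ports:
      - "5432:5432"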

Go to the terminal and cd to the project root (or wherever your docker-compose.yml is located). Enter docker-compose build to build the required images on your system. After the build, enter docker-compose up to get your containers up and running. Now you have the broker and back-end configured and ready to use.

Celery Application

Now that we have our broker up and running, we can create our Celery task that adds two integer arguments and returns the output. First, we'll need to create a celeryconfig.py with the following content:

celeryconfig.py

broker_url = "amqp://guest:guest@localhost:5672/test"
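
A sketch of what the full config module could look like, with the Postgres results back-end and worker concurrency included; the credentials and database name are assumptions matching the compose sketch above.

# celeryconfig.py (sketch; Postgres credentials and database name are assumptions)
broker_url = "amqp://guest:guest@localhost:5672/test"
result_backend = "db+postgresql://celery:celery@localhost:5432/celery_results"
worker_concurrency = 2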

The first two configuration options are self-explanatory. Here, worker_concurrency is the number of concurrent worker processes you want to enable for the client; if it is not set, Celery defaults to the number of cores available on the system. This config will let us create Celery applications (clients) for both the producer side and the consumer side. Let's create a file called adder_task_app.py which will act as a minimal client with a task that adds two numbers and returns the output. We'll use Celery's @task decorator to mark the function as a Celery task. Now, we create a Celery application using the config we made earlier.

adder_task_app.py

from celery import Celery
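
A minimal sketch of this application setup, assuming the config module from the project layout above:

from celery import Celery
from celery_conf import celeryconfig

# create the Celery application and load the configuration module defined earlier
app = Celery("adder_task_app")
app.config_from_object(celeryconfig)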

Now that we have our app set up, we can add a task to it.

from celery import Celery
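
A sketch of the full module with the adder task registered; the function body simply follows the problem statement, and the app setup is the same as before.

from celery import Celery
from celery_conf import celeryconfig

app = Celery("adder_task_app")
app.config_from_object(celeryconfig)

@app.task
def adder(x, y):
    # the "super complex" computation: add two integers
    return x + y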

Our task is ready. We can use the celery worker command to get it up and running so that it listens for messages from the broker. Make sure your docker-compose setup is still running, then enter this in the terminal to bring the consumer up:

celery worker -A adder_task_app --loglevel INFO

Terminal output of celery worker

Celery has connected to the services mentioned in the config, and it is clear that our task has been registered. In order to invoke this task asynchronously, we'll create a Celery application for our producer. One way of calling the task is by using Celery's send_task function. Repeating our config setup from above, we'll create another Celery app and use it to call send_task with the information needed to invoke our adder function. Add this to your send_task_app.py file:

send_task_app.py

from celery import Celery
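
A sketch of the producer script, following the description below; the example arguments are arbitrary.

from celery import Celery
from celery_conf import celeryconfig

# a second Celery client that only acts as a producer
app = Celery("send_task_app")
app.config_from_object(celeryconfig)

# send_task takes the registered task name and a tuple of positional arguments
out = app.send_task("adder_task_app.adder", (2, 3))
print(out.get())  # blocks until the worker has stored the result, then prints 5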

Here, the first argument of send_task is the registered name of the task, which can be found in the celery worker output under [tasks]. For our adder app, the registered task name is adder_task_app.adder, and it requires two arguments. The second argument of send_task is a tuple of inputs that are passed to the adder task as its arguments. Running this Python file from the terminal triggers our first invocation of the adder task, which can be seen in the output of the running celery worker:

Terminal output of celery worker

And out.get() will give us the result of the task on the producer's end.

That’s it. You have just built your first Celery task with the bare minimum configurations. In the next part of the series, we’ll dive deeper into the different ways to invoke and get status reports of long-running tasks.

Source files: https://github.com/attentiveai/engineering-blog-1/tree/master/part-1


P.S. We are looking for developers to join our team! If you’re interested in creating innovative solutions for the geospatial industry, connect with me or visit our career section on LinkedIn.
