Data Orchestration using Prefect and RabbitMQ

Manjunath Hegde
Published in CivicDataLab
9 min read · Aug 8, 2023

At CivicDataLab, we use data pipelines to streamline tasks such as data acquisition through techniques like web scraping, followed by data transformation and loading into appropriate formats within databases or files, all to facilitate subsequent analysis. With the explosion of data, the challenge is not just obtaining it but also managing it efficiently. Data pipelines regulate this flow: they help collect, manage, and store data in a form that allows for analysis and the extraction of insights, and they reduce the number of manual interventions.

In this article, we’ll look at how to build a data pipeline with Prefect and RabbitMQ.

Prefect, a workflow management system, supports the construction, planning, and administration of data pipelines. It offers a user interface that makes it simple to create complex workflows. Furthermore, because Prefect is a Python package, pipelines are set up in ordinary Python code.

RabbitMQ, on the other hand, is a message broker that facilitates the decoupling of system components. It aids in the management of message queues and exchanges and provides a dependable, scalable mechanism for transmitting messages between system components.

Used together, Prefect and RabbitMQ offer a powerful way to create scalable, dependable, and easy-to-manage data pipelines. Prefect lets us create data pipelines, but a limitation arises when we try to scale these pipelines across multiple servers: relying on Prefect alone is not enough for this purpose. To overcome this challenge we need a message queue such as RabbitMQ. Here Prefect takes care of running the tasks sequentially, while the message queue looks after delegating the work to the right worker across the cluster of servers.

In addition, RabbitMQ supports several languages including Python, Java, C++ and Golang. Thus, by using Prefect and RabbitMQ together, we can overcome language dependency as well. Let’s look at an ETL (extract-transform-load) pipeline example to understand this architecture better.

Requirements to set up the pipeline:

  1. Python — 3.8 or later
  2. RabbitMQ — 3.11
  3. Prefect
  4. Pandas
  5. EC2 instances running Linux (Note: an EC2 t2.micro instance with Ubuntu 18.04 or higher is recommended)

Server setup

First, we need to install RabbitMQ on two servers (from here onwards the words EC2 instance, server and node are used interchangeably). One server is used as a publisher and the other as a task executor (worker). This way we are able to scale our tasks across multiple servers.

Image showing the communication between various servers over a message queue
Servers communicating with each other over a message queue

Assuming that the 2 servers are ready, follow these steps.

  1. On server-1 and server-2, run sudo vi /etc/hosts
  2. In the file, after the localhost entry that is already there, add two lines in the format private_ip1 node-1 and private_ip2 node-2 (private_ip1 and private_ip2 are the private IPs of server-1 and server-2 respectively). The private IPs can be found in the AWS console. Save and close the file.
Image showing the contents of /etc/hosts file to cluster RabbitMQ nodes together.
Contents of /etc/hosts file

3. Once these lines are added, run sudo hostnamectl set-hostname node-1 --static on server-1 and sudo hostnamectl set-hostname node-2 --static on server-2. This makes it easier to refer to the servers while clustering; otherwise we would have to refer to them by their private IPs, which are less readable.

4. To install RabbitMQ, copy the contents of this shell script into a file named rmq.sh on both servers and run sh rmq.sh. This installs RabbitMQ on both servers. The installation can be verified by running

sudo rabbitmqctl --version

which shows the version of RabbitMQ that is installed.

An image showing the current version of RabbitMQ running on the server
RabbitMQ version running on the server
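Clustering relies on each server being able to resolve the other by the names added to /etc/hosts, so it can be worth checking that before going further. The following is a minimal sketch using only the Python standard library; the function name resolve_nodes and the injectable resolver parameter are illustrative choices, not part of the original setup.

```python
import socket
from typing import Callable, Dict, Iterable, Optional


def resolve_nodes(
    names: Iterable[str],
    resolver: Callable[[str], str] = socket.gethostbyname,
) -> Dict[str, Optional[str]]:
    """Map each host name to its resolved IPv4 address, or None if lookup fails."""
    resolved: Dict[str, Optional[str]] = {}
    for name in names:
        try:
            resolved[name] = resolver(name)
        except OSError:  # socket.gaierror is a subclass of OSError
            resolved[name] = None
    return resolved
```

Calling resolve_nodes(["node-1", "node-2"]) on each server should return the two private IPs; a None value suggests that the corresponding /etc/hosts entry is missing or misspelled.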
  1. Now we need to cluster these two nodes. Clustering logically groups several nodes, which then share users, virtual hosts, queues, exchanges, bindings, runtime parameters and other distributed state. This is needed to establish a communication channel between the publisher and the workers. To do this, we need to ensure that the same Erlang cookie is present on both nodes. The steps are as follows:
  2. Copy the contents of the file /var/lib/rabbitmq/.erlang.cookie on node-1 to the file at the same location on node-2.
  3. The cookie on node-2 will initially be different, so it needs to be replaced with the cookie of node-1.
  4. By doing this, we are making node-1 a publisher node and node-2 a subscriber node. If we need to add more nodes to the cluster, say node-3 and node-4, their cookies also need to be replaced with the cookie of node-1.
  5. Now we need to restart the RabbitMQ service on node-2 and then stop the app on node-2. This can be achieved by running the following commands.
sudo systemctl restart rabbitmq-server
sudo rabbitmqctl stop_app

6. Now join node-2 to node-1 by running the following on node-2

sudo rabbitmqctl join_cluster rabbit@node-1

Here, node-1 is resolved to its private IP through the /etc/hosts file that we edited earlier.

7. Now start the app on node-2 by running

sudo rabbitmqctl start_app

8. Clustering can be confirmed by running the following command.

sudo rabbitmqctl cluster_status

The command shows the entry of node-1 on node-2 and vice versa, as shown in the following image.

Image showing the cluster status of the RabbitMQ nodes
clustering status of RabbitMQ nodes
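The same check can be scripted. The sketch below assumes that rabbitmqctl cluster_status is run with --formatter json and that the JSON output lists the live nodes under a running_nodes key; both the key name and the helper names are assumptions to verify against the installed RabbitMQ version.

```python
import json
import subprocess
from typing import Iterable, List


def missing_nodes(status_json: str, expected: Iterable[str]) -> List[str]:
    """Return the expected node names that are absent from running_nodes."""
    status = json.loads(status_json)
    running = set(status.get("running_nodes", []))  # assumed key name
    return [node for node in expected if node not in running]


def fetch_cluster_status() -> str:
    """Run rabbitmqctl on this server and return its JSON output as text."""
    result = subprocess.run(
        ["sudo", "rabbitmqctl", "cluster_status", "--formatter", "json"],
        check=True,
        capture_output=True,
        text=True,
    )
    return result.stdout
```

On either node, missing_nodes(fetch_cluster_status(), ["rabbit@node-1", "rabbit@node-2"]) returning an empty list would indicate that both nodes are running in the cluster.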

A simple ETL pipeline

Let’s take an example of a simple ETL operation as a use case.

  1. For E, i.e. extraction, we will use the colors CSV. This file contains 3 columns: Name (name of the color), HEX (hex code for the color) and RGB (values of the r, g and b components of the color).
The first 5 rows of the colors CSV file

2. For T, i.e. transformation, we will drop one column, HEX.

3. For L, i.e. loading, we will write the final result to a new CSV file on the local file system of the worker server.
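Before distributing anything, the three steps can be tried in a single process. The sketch below uses Python's csv module as a stand-in for the pandas calls so that it runs without extra packages; the sample rows are made up for illustration and are not taken from the actual colors file.

```python
import csv
import io
from typing import Dict, List

# Made-up sample with the same three columns as the colors CSV.
SAMPLE = (
    "Name,HEX,RGB\n"
    'Red,#FF0000,"255,0,0"\n'
    'Green,#008000,"0,128,0"\n'
)


def extract(text: str) -> List[Dict[str, str]]:
    """E: parse CSV text into a list of row dictionaries."""
    return list(csv.DictReader(io.StringIO(text)))


def transform(rows: List[Dict[str, str]], skip_column: str) -> List[Dict[str, str]]:
    """T: drop one column from every row."""
    return [{k: v for k, v in row.items() if k != skip_column} for row in rows]


def load(rows: List[Dict[str, str]]) -> str:
    """L: serialise the rows back to CSV text (a file would be used in practice)."""
    buffer = io.StringIO()
    fieldnames = list(rows[0]) if rows else []
    writer = csv.DictWriter(buffer, fieldnames=fieldnames, lineterminator="\n")
    writer.writeheader()
    writer.writerows(rows)
    return buffer.getvalue()


result = load(transform(extract(SAMPLE), "HEX"))
```

Here result holds CSV text whose header is Name,RGB, which is the shape the distributed pipeline is expected to produce in its output file.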

Publisher

As the name suggests, a publisher is the entity that publishes the tasks to the queue. These tasks are picked up by the corresponding workers. The publisher will be on server-1 and the workers will be running on server-2.

We will create a data pipeline using Prefect to ensure that the tasks extract, transform and load are run one after the other. Let’s define a Prefect flow named etl_flow and 3 Prefect tasks: publish_extract, publish_transform and publish_load. Though the tasks are defined, we still need publisher code that can publish a task to the RabbitMQ queue.

Message template

Each message published by the publisher needs to follow a template. Having a template for publishing a task is useful because it ensures that there is only one way in which a task can be defined: all tasks are published using the same template. It is at this stage that we decide what the message should look like.

When a message is published, a worker should pick the message up to process it. But how does a worker know which message to pick? This can be done based on the name of the task (i.e. the extract worker should pick the message that carries the key extract). So, both publisher and worker should agree on a key through which they communicate. In RabbitMQ’s terminology this is called the binding key.

An image showing ETL workers picking messages published by the publisher in the queue based on binding key
ETL workers pick messages in the queue based on binding key

The task also needs the context with which it can act upon the data. So our message should include the context, which contains the parameters required for the task. All messages should be published in string format.
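As an illustration of such a template, here is one possible shape for a message, assuming the task name travels as the key while the context and data are serialised together as a JSON string. The class name TaskMessage and the field names are hypothetical.

```python
import json
from dataclasses import dataclass
from typing import Any, Dict, Union


@dataclass
class TaskMessage:
    task_name: str           # doubles as the key agreed between publisher and worker
    context: Dict[str, Any]  # parameters the task needs
    data: str = ""           # optional payload

    def body(self) -> str:
        """Serialise context and data into the string that gets published."""
        return json.dumps({"context": self.context, "data": self.data})

    @classmethod
    def from_body(cls, task_name: str, body: Union[str, bytes]) -> "TaskMessage":
        """Rebuild a message on the worker side from the received body."""
        text = body.decode("utf-8") if isinstance(body, bytes) else body
        payload = json.loads(text)
        return cls(task_name, payload["context"], payload.get("data", ""))
```

A transform task could then be described as TaskMessage("transform", {"source_file": "extracted.csv", "skip_column": "HEX"}), with body() giving the string to publish.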

Once we agree on these terms we can now look at publisher code.

Publish and Process

Code to publish messages and get responses from workers

The code above serves two main purposes:

  1. Publishing task.
  2. Getting the response from the corresponding worker.

Here we define a class which, when instantiated, connects to the RabbitMQ instance on localhost. The constructor takes three parameters: task name, context and data. The task name is used as the binding key so that the workers can pick up the tasks meant for them.

The class defines two methods. One is the call method, which is responsible for publishing the message. The call method first sends a get-ack message just to see whether the worker is alive. After publishing this message it waits for up to 2 seconds (process_data_events). If the response is still None after that, it is assumed that there is no worker to process the message, and control is returned.

Otherwise, the actual task, i.e. the context and the data, is converted into a JSON string and published. This time there is no limit on the waiting time.

Hence it is important to make sure that the worker always returns something (processed data or an error message) to the publisher, to avoid a deadlock in which the publisher waits forever.
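The behaviour described above can be sketched roughly as follows. This is not the original listing: it assumes the pika client library, a direct exchange with the hypothetical name etl_tasks, and a reply queue in the style of RabbitMQ's RPC pattern. The names TaskPublisher and encode_task are illustrative.

```python
import json
import uuid

try:
    import pika  # RabbitMQ client assumed for this sketch
except ImportError:  # lets encode_task be tried without pika installed
    pika = None

EXCHANGE = "etl_tasks"   # hypothetical exchange name
ACK_PROBE = "get-ack"    # liveness probe described in the text
ACK_TIMEOUT_SECONDS = 2


def encode_task(context, data):
    """Convert the context and the data into the JSON string to publish."""
    return json.dumps({"context": context, "data": data})


class TaskPublisher:
    def __init__(self, task_name, context, data, host="localhost"):
        if pika is None:
            raise RuntimeError("pika is required to talk to RabbitMQ")
        self.task_name = task_name  # used as the routing/binding key
        self.context = context
        self.data = data
        self.response = None
        self.corr_id = None
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(host=host)
        )
        self.channel = self.connection.channel()
        self.channel.exchange_declare(exchange=EXCHANGE, exchange_type="direct")
        # Private queue on which the worker's replies arrive.
        result = self.channel.queue_declare(queue="", exclusive=True)
        self.callback_queue = result.method.queue
        self.channel.basic_consume(
            queue=self.callback_queue,
            on_message_callback=self.on_response,
            auto_ack=True,
        )

    def on_response(self, ch, method, props, body):
        # Accept only the reply that matches the request in flight.
        if props.correlation_id == self.corr_id:
            self.response = body.decode("utf-8")

    def _publish(self, body):
        self.response = None
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(
            exchange=EXCHANGE,
            routing_key=self.task_name,
            properties=pika.BasicProperties(
                reply_to=self.callback_queue,
                correlation_id=self.corr_id,
            ),
            body=body.encode("utf-8"),
        )

    def call(self):
        # Step 1: probe for a live worker and wait a bounded time.
        self._publish(ACK_PROBE)
        self.connection.process_data_events(time_limit=ACK_TIMEOUT_SECONDS)
        if self.response is None:
            return None  # no worker answered
        # Step 2: publish the real task and wait without a time limit.
        self._publish(encode_task(self.context, self.data))
        while self.response is None:
            self.connection.process_data_events(time_limit=None)
        return self.response
```

With a worker running, TaskPublisher("extract", context, "").call() would return whatever string the worker sends back, and None when no worker replies to the probe.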

Now, let’s use the above code with the Prefect tasks.

Prefect Tasks

As stated earlier, we will define a flow named etl_flow which calls 3 Prefect tasks, publish_extract, publish_transform and publish_load, one after the other.

Prefect flow for ETL

In the code, the flow (etl_flow) runs three tasks, publish_extract, publish_transform and publish_load, sequentially.
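A flow of this shape could look roughly like the sketch below, assuming the Prefect 2 decorators flow and task. The publish argument stands for any callable that sends a task name and context to the queue and returns the worker's reply. The context keys, the intermediate file names and the source URL are placeholders, not the values used in the original pipeline.

```python
from typing import Callable, Dict

Publish = Callable[[str, Dict[str, str]], str]

SOURCE_URL = "https://example.com/colors.csv"  # placeholder URL


def extract_context() -> Dict[str, str]:
    return {"source": SOURCE_URL, "result_file": "extracted.csv"}


def transform_context(source_file: str) -> Dict[str, str]:
    return {
        "source_file": source_file,
        "skip_column": "HEX",
        "result_file": "transformed.csv",
    }


def load_context(source_file: str) -> Dict[str, str]:
    return {
        "source_file": source_file,
        "format": "csv",
        "result_file": "out_file.csv",
    }


def build_etl_flow(publish: Publish):
    """Return a Prefect flow that publishes the three ETL tasks in order."""
    # Imported here so the context helpers above load without Prefect.
    from prefect import flow, task

    @task
    def publish_extract() -> str:
        return publish("extract", extract_context())

    @task
    def publish_transform(source_file: str) -> str:
        return publish("transform", transform_context(source_file))

    @task
    def publish_load(source_file: str) -> str:
        return publish("load", load_context(source_file))

    @flow(name="etl_flow")
    def etl_flow() -> str:
        # Each task receives the file name returned by the previous one,
        # which is what forces the sequential order.
        extracted = publish_extract()
        transformed = publish_transform(extracted)
        return publish_load(transformed)

    return etl_flow
```

Calling build_etl_flow(publish)() would then run the three tasks in sequence. The sketch does not handle the case where publish returns None because no worker answered; a real pipeline would need to stop or retry at that point.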

Note: Host the publisher in server-1.

All the published tasks are picked up by the corresponding workers. Now let us code the workers.

Workers

Extract — worker code

extract worker

Now, this is a daemon process. To start the daemon, run the following command in a terminal.

python extract_worker.py

If there’s a message with the binding key extract, the worker picks up the message and processes it.

If the message is “get-ack”, the worker sends back a “worker alive” message, upon receiving which the publisher sends the actual message, i.e. the task.

Then the context is retrieved from the task message and the task is executed; here that means reading data from the URL into a data frame and saving it in a CSV file. The file name is then sent back to the publisher for further processing. If there’s an error while carrying out the task, an error message is returned to the publisher.

The other two tasks have the same template except for their task-level logic.
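A worker following this template might be sketched as below. It again assumes pika and the hypothetical etl_tasks exchange, with one queue per task name bound by that same name. The function names handle_message, extract_task and serve, as well as the context keys, are illustrative rather than taken from the original worker.

```python
import json

try:
    import pika  # RabbitMQ client assumed for this sketch
except ImportError:  # lets handle_message be tried without pika installed
    pika = None

EXCHANGE = "etl_tasks"  # hypothetical; must match the publisher's exchange
ACK_PROBE = "get-ack"
ACK_REPLY = "worker alive"


def handle_message(body: bytes, run_task) -> str:
    """Turn one incoming message into the reply string for the publisher."""
    text = body.decode("utf-8")
    if text == ACK_PROBE:
        return ACK_REPLY
    try:
        message = json.loads(text)
        return str(run_task(message["context"], message.get("data", "")))
    except Exception as error:
        # Always answer, so the publisher is never left waiting.
        return f"error: {error}"


def extract_task(context, data):
    """Task-level logic: read the source CSV and save it locally."""
    import pandas as pd  # imported here so the handler works without pandas

    frame = pd.read_csv(context["source"])
    frame.to_csv(context["result_file"], index=False)
    return context["result_file"]


def serve(task_name, run_task, host="localhost"):
    """Consume messages whose key equals task_name and reply to each one."""
    if pika is None:
        raise RuntimeError("pika is required to talk to RabbitMQ")
    connection = pika.BlockingConnection(pika.ConnectionParameters(host=host))
    channel = connection.channel()
    channel.exchange_declare(exchange=EXCHANGE, exchange_type="direct")
    # auto_delete removes the queue when the worker disconnects, so probes
    # sent while no worker is running are dropped instead of piling up.
    channel.queue_declare(queue=task_name, auto_delete=True)
    channel.queue_bind(exchange=EXCHANGE, queue=task_name, routing_key=task_name)

    def on_request(ch, method, props, body):
        reply = handle_message(body, run_task)
        ch.basic_publish(
            exchange="",
            routing_key=props.reply_to,
            properties=pika.BasicProperties(correlation_id=props.correlation_id),
            body=reply.encode("utf-8"),
        )
        ch.basic_ack(delivery_tag=method.delivery_tag)

    channel.basic_qos(prefetch_count=1)
    channel.basic_consume(queue=task_name, on_message_callback=on_request)
    channel.start_consuming()
```

An extract worker script would end with serve("extract", extract_task). The transform and load workers would reuse handle_message and serve unchanged and swap in their own task function.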

Transformer — worker code

transformer worker

The transformer receives the source file name and the column to be skipped from the context and stores the result in another file. This file name is sent back to the publisher for further processing.

Run the above daemon in a separate terminal.

python transform_worker.py

Loader — worker code

loader worker

load_worker receives the source file name, the format to which the data needs to be converted and the resultant file name in the context. Using these, the result of the transformation is stored in the specified format and the specified file. The file name in which the output resides is sent back to the publisher.

Run the above daemon in a separate terminal.

python load_worker.py

Note: Host all these workers in server-2.

Executing the pipeline

Now that our publisher and workers are ready and hosted on their respective servers, we can run our pipeline. All the worker daemons need to be started first.

Running the pipeline is just triggering the publisher on server-1 by running the following.

python pipeline.py
Image showing the console output after running pipeline.py.
Console output after running pipeline.py

The content of out_file.csv in node-2 is as below.

Image showing the contents of out_file.csv
The first 5 rows of out_file.csv

In the image above we can see that the column HEX is dropped.

Possible Enhancements

This is just a basic example showing how we can design a pipeline that is distributed across multiple servers. The pipeline can further be improved with the following features.

  1. User-Friendly UI with Django Integration — A user interface (UI) can be developed using the Django framework to interact with the pipeline. A dashboard that displays the pipeline’s current status, progress, and any potential errors can be created.
  2. API Integration for Pipeline Control — An API layer can be implemented that allows users to trigger the pipeline remotely. Users can initiate pipeline tasks through API requests, enhancing automation and flexibility.
  3. Detailed Logging and Analytics — Logging mechanisms can be enhanced to capture detailed information about each step of every pipeline.

We hope that this guide is a useful resource for anyone who wants to learn more about using these tools to create and manage data pipelines. If you have any suggestions on how we can improve this further, please write to us at info@civicdatalab.in.
