ETL with Argo Workflows
Using Argo Workflows with Kubernetes for Data Engineering
Retailo is one of the fastest growing startups in the MENAP region that serves thousands of retailers in Pakistan, Saudi Arabia and the UAE. The Retailo App allows retailers to browse through thousands of products, make instant price comparisons and get them delivered the next day. Since the company’s inception in 2020, we’ve seen tremendous month-on-month growth.
In order to support the increasing number of data initiatives at Retailo, we decided to invest in laying out a standardised infrastructure that could support our data workflows. For our implementation, we decided to deploy on Amazon EKS, a managed Kubernetes service offered by AWS.
This article assumes a working knowledge of Docker and Kubernetes.
Argo Workflows
Argo Workflows is an open source container-native workflow engine for orchestrating parallel jobs on Kubernetes.
Argo Workflows lets you set up a sequence of jobs in the shape of a directed acyclic graph (DAG). These workflows are executed step by step on your Kubernetes cluster. Being container-native, Argo Workflows allows users to run a container at each step of the workflow. Many additional options are provided to improve the overall developer experience and precisely capture the intent of a workflow, including workflow parameterisation, artefact exchange and exit handling. At Retailo, we use Argo to create workflows that serve the needs of our data analysts and scientists. Our workflows have a range of applications, such as SQL transformations using dbt in our warehouse, data ingestion from third parties, and training machine learning models.
Argo is cloud agnostic and can run on any Kubernetes cluster. It also ships with a powerful CLI and a built-in UI. The CLI can be used to remotely administer workflows. The Argo UI is built on top of the Argo server and provides options to create, run and inspect workflows through a GUI. Using Argo’s UI, our data analysts and data scientists can easily administer their mission-critical workflows directly from their favourite browser. Another benefit of running Argo in production is the extensibility the platform provides. Monitoring is one of the most important aspects of ensuring good data governance, and Argo allows you to customise alerting based on your needs. In our configuration, Slack notifications inform us of the status of our workflows so that we can troubleshoot them in a timely manner.
If you have kubectl set up, you can get started with Argo using the following commands. This will set up the Argo server in the argo namespace on your Kubernetes cluster.
kubectl create namespace argo
kubectl apply -n argo -f https://raw.githubusercontent.com/argoproj/argo-workflows/master/manifests/install.yaml
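Once the server is running, day-to-day administration can be done from the Argo CLI. A few typical commands are sketched below, assuming the argo binary is installed and kubectl points at your cluster; the workflow file name is illustrative.

```shell
# Submit a hypothetical workflow file and watch it run to completion
argo submit -n argo --watch my-workflow.yaml

# List workflows in the namespace
argo list -n argo

# Inspect and tail logs of the most recently started workflow
argo get -n argo @latest
argo logs -n argo @latest
```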
What does a workflow comprise?
The Workflow is the most important resource in Argo and serves two important functions: it defines the workflow to be executed, and it stores the state of the workflow. Workflows are defined in YAML format and are intuitive and easy to understand. Below is an example hello-world workflow in Argo.
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: hello-world
spec:
  entrypoint: whalesay
  templates:
    - name: whalesay
      steps:
        - - name: run-whalesay-1
            template: run-whalesay
            arguments:
              parameters:
                - name: message
                  value: "hello world"
        - - name: run-whalesay-2
            template: run-whalesay
            arguments:
              parameters:
                - name: message
                  value: "hello again, world"
    - name: run-whalesay
      inputs:
        parameters:
          - name: message
      container:
        image: docker/whalesay
        command: [cowsay]
        args: ["{{inputs.parameters.message}}"]
To define an Argo workflow, we populate the workflow.spec field. First, we define an entrypoint for the workflow. Templates are the most important constituent of a workflow: they are modular components that describe what each step in the workflow should achieve. Here, we define a template named whalesay as the entry point for this workflow. The steps field can be used to define a sequence of templates, one for each step of your workflow. For now, we have created two steps. Both are similar in function except for the input parameter passed to them: both use the run-whalesay template defined at the end of the file, but each time the whalesay container runs, it is passed a different inputs.parameters.message based on the parameter defined in the respective step.
While the template type used in this example is container for both steps of the workflow, users can also use a script template to define code directly in the workflow definition file. A resource template can also be used as part of the workflow to perform operations directly on the Kubernetes cluster, e.g. spinning up deployments.
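As an illustration, a script template inlines a step’s code in the workflow file instead of baking it into a custom image. The snippet below is a hypothetical sketch, not part of the original workflow; the template name, image and code are made up.

```yaml
# Hypothetical script template: the step's code is defined inline in the
# workflow file rather than shipped inside a custom image.
- name: print-timestamp
  script:
    image: python:3.11-alpine
    command: [python]
    source: |
      from datetime import datetime, timezone
      print(datetime.now(timezone.utc).isoformat())
```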
How we use Argo Workflows for ETL
Let’s say we want to run an ingestion job that fetches data from a data source in Google BigQuery and loads it into Amazon Redshift at an hourly interval. This time the workflow will be of kind CronWorkflow, as it will run on a fixed schedule. We use AWS ECR as our container registry: once the code is complete, the Docker image is pushed to its repository on ECR.
We can now start to define the workflow that will run the container from the image that we have created previously.
apiVersion: argoproj.io/v1alpha1
kind: CronWorkflow
metadata:
  name: bigquery-load
spec:
  schedule: "0 * * * *"
  timezone: "Asia/Karachi"
  failedJobsHistoryLimit: 1
  successfulJobsHistoryLimit: 1
  workflowSpec:
    entrypoint: bq-load-job
    templates:
      - name: bq-load-job
        container:
          image: <private>.ecr.amazonaws.com/bq-load-job:latest
          command: [python]
          args: ["bigquery-load.py"]
Under the spec field, we can define some useful fields to configure how our workflow runs. We have defined a cron schedule that runs the specified template every hour at the 0th minute. The failedJobsHistoryLimit and successfulJobsHistoryLimit fields limit the number of completed workflow runs retained after failure and success respectively. Once the workflow configuration file is complete, the workflow can be created with the following Argo CLI command, and it will run at the next scheduled interval.
argo cron create -n your-namespace bigquery-load.yaml
The entry point here is defined as a template of type container. The bq-load-job template pulls the image from our ECR repository and starts the workflow by running the Python file bigquery-load.py, which fetches the data from BigQuery and loads it into Redshift.
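For illustration, bigquery-load.py might be shaped roughly like the sketch below. This is not our production code: the query, table, column and bucket names are made up, and it assumes the google-cloud-bigquery and boto3 packages for the actual cloud calls (imported inside main so the pure helper has no cloud dependencies).

```python
import csv
import io


def rows_to_csv(rows, columns):
    """Serialise rows (list of dicts) to CSV text, e.g. for staging to S3
    before issuing a Redshift COPY."""
    buf = io.StringIO()
    writer = csv.DictWriter(buf, fieldnames=columns)
    writer.writeheader()
    writer.writerows(rows)
    return buf.getvalue()


def main():
    # Assumed third-party clients (google-cloud-bigquery, boto3);
    # imported here so rows_to_csv above stays testable without credentials.
    from google.cloud import bigquery
    import boto3

    bq = bigquery.Client()
    rows = [dict(r) for r in bq.query("SELECT id, name FROM dataset.source").result()]

    # Stage the extract in S3; a Redshift COPY from the staged file would follow.
    body = rows_to_csv(rows, ["id", "name"])
    boto3.client("s3").put_object(Bucket="staging-bucket", Key="load.csv", Body=body)

# In the container, the entrypoint invokes main().
```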
In addition to the fields defined in this spec, Argo has many other options that data engineers can benefit from. For some of our workflows, it is also useful to define the cpu and memory request and limit in the workflow configuration. inputs can also be configured to pass command-line arguments to a container at a particular step in a workflow. Workflows also have support for parallelism, which defines the number of pods to be run concurrently for a step in the workflow.
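As a sketch of how these options fit together, the bq-load-job template could be extended along the lines below. The parameter name and the resource values are illustrative assumptions, not our production settings.

```yaml
# Hypothetical extension of the bq-load-job template: an input parameter
# forwarded as a command-line argument, plus cpu/memory requests and limits.
- name: bq-load-job
  inputs:
    parameters:
      - name: table            # illustrative parameter name
  container:
    image: <private>.ecr.amazonaws.com/bq-load-job:latest
    command: [python]
    args: ["bigquery-load.py", "--table", "{{inputs.parameters.table}}"]
    resources:
      requests:
        memory: 256Mi
        cpu: 250m
      limits:
        memory: 512Mi
        cpu: 500m
```

Note that parallelism is set at the workflow-spec level, alongside entrypoint, rather than on an individual container.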
With a single set of rules dictating how complex data extractions and transformations take place within our stack, we depend far less on any one individual to support the workflows they have created: anyone on the team can read a workflow configuration file and quickly understand its components. As a result, the team can quickly resolve any data-related issues in production, and the pace of innovation never slows down.