Argo Workflows as the choice to run data pipelines in Casavo

Nicolo' Lino · Published in Casavo · Dec 21, 2021

(Image credits: akuity.io)

Once upon a time…

Right from the start, Casavo has focused on technology, and the entire company has relied on the Data Team to provide a quality set of data as a foundation for all business processes and decisions, and to bring amazing new features to life.

To do so, the Data Team extracts, transforms, and loads lots of data that is used to make business decisions or to train ML models that compare and/or evaluate properties.

But the old data infrastructure in Casavo consisted of Docker containers running on a Linux VM at different hours (cron jobs).

What if we have to handle dependencies between jobs?
What if we want to retry a job that fails or get notified about an event?
As you can imagine, this doesn’t scale well…

We needed a workflow engine that would easily schedule and run our data pipelines.
It had to make sure that each task gets executed in the correct order and gets the required resources, and provide an easy way to handle errors and to notify us in case of success or failure of a workflow.

And we weren’t able to do that properly with that setup.
So we started looking around to understand what we could do to meet our needs.

What options do we have?

Based on that, we took into consideration Apache Airflow and Luigi:
two platforms to programmatically schedule and monitor workflows.

We were also pretty confident about using Airflow and even tried it in a PoC, but we found it hard to handle in our infrastructure based on Kubernetes clusters and Docker.

It turns out that neither of them is pleasant to use with Docker.
Airflow in particular requires the end user to do hacky things that distract the team from what it actually wants to be doing: defining and running pipelines.

And, exactly when we were struggling with this, one of our DevOps engineers told us about Argo Workflows!

What is Argo Workflows?

Argo Workflows is an open-source container-native workflow engine for orchestrating parallel jobs on Kubernetes (K8s).
It is implemented as a K8s custom resource definition (CRD).

CRDs are used to define custom API objects. They allow for the extension of the vanilla K8s experience in a K8s-compliant fashion.

Also, the community is really active, with weekly update meetings, a lot of contributors working on issues, and maintainers always available to discuss new ideas, problems, and more.

That sounds great! It should fit our infrastructure perfectly :D
We will see how in a moment. First, let’s get familiar with some core concepts of Argo Workflows!

Workflow

The Workflow is the main resource in Argo and is used to define the workflow to be executed and store its state.

HelloWorld Workflow example
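Here is a minimal sketch along the lines of the canonical hello-world example from the Argo docs:

```yaml
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: hello-world-   # K8s appends a unique suffix per run
spec:
  entrypoint: whalesay         # the template to start from
  templates:
    - name: whalesay           # a container template (see below)
      container:
        image: docker/whalesay
        command: [cowsay]
        args: ["hello world"]
```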

Template Types

There are 6 types of templates, divided into two categories: Template Definitions and Template Invocators.

Template Definitions

This kind of template defines the work that has to be done.

  • Container — the most common template type, used to schedule a container (same spec as the K8s container spec)
  • Script — essentially a wrapper around a container. It has the same spec but adds the source field, which allows you to define a script “hardcoded” in the YAML definition (see the sketch right after this list)
  • Resource — defines and performs operations on the K8s cluster. It can be used to get, create, apply, delete, replace, or patch resources on your cluster
  • Suspend — a really useful template that can be used to suspend the execution of the workflow, either for a duration or until it is resumed manually (from the CLI, the API, or the UI)
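Script template example: a minimal sketch along the lines of the Argo docs (the Python one-liner is illustrative):

```yaml
- name: gen-random-int
  script:                      # same spec as a container template...
    image: python:alpine3.9
    command: [python]
    source: |                  # ...plus the inline script source
      import random
      print(random.randint(1, 100))
```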

Template Invocators

These kinds of templates, instead, are used to invoke other templates (template definitions).

  • Steps — This template allows you to define your tasks as a series of steps. The structure of this template is a “list of lists”: outer lists run sequentially, while inner lists run in parallel.
    We can also control the execution with a when clause to conditionally execute a step.

For example, here, we run step1 first. When finished, step2a and step2b will run in parallel:

Steps template example
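A sketch of such a template, assuming all three steps invoke a whalesay container template defined elsewhere in the Workflow:

```yaml
- name: main
  steps:
    - - name: step1            # first outer item: runs alone
        template: whalesay
    - - name: step2a           # second outer item: two inner steps...
        template: whalesay
      - name: step2b           # ...that run in parallel
        template: whalesay
```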
  • DAG — With DAG templates, we can define our tasks as a graph of dependencies.

In this example A runs first. Once it is completed, B and C will run in parallel and once they both complete, D will run:

Dag template example
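A sketch of this “diamond” DAG, assuming an echo template defined elsewhere in the Workflow:

```yaml
- name: diamond
  dag:
    tasks:
      - name: A
        template: echo
      - name: B
        dependencies: [A]      # B waits for A
        template: echo
      - name: C
        dependencies: [A]      # C also waits for A, runs in parallel with B
        template: echo
      - name: D
        dependencies: [B, C]   # D waits for both B and C
        template: echo
```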

WorkflowTemplate

This kind of Workflow allows you to create a library of frequently-used templates and reuse them either by submitting them directly or by referencing them from your Workflows.

Template VS WorkflowTemplate

TL;DR
A template is a task within a Workflow or a WorkflowTemplate.
A WorkflowTemplate defines a Workflow that can be referenced and used by other Workflows or WorkflowTemplates.

The terms WorkflowTemplate and template have created an unfortunate naming collision and some confusion. But let’s try to describe them and clarify the differences:

A template (lower-case) is a task within a Workflow or a WorkflowTemplate, under the field templates.
Whenever you define a Workflow, you must define at least one template to run. This template can be of one of the types we described above (container, script, dag, steps, resource, or suspend) and can be referenced by the entrypoint or by dag and steps templates.

A WorkflowTemplate, instead, is a definition of a Workflow that lives in your cluster. Since it is a definition of a Workflow, it also contains templates. These templates can be referenced from within the WorkflowTemplate itself and from other Workflows and WorkflowTemplates on your cluster.

WorkflowTemplate VS template
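As an illustration, a Workflow can invoke a template that lives in a WorkflowTemplate through a templateRef. A minimal sketch (the names my-shared-templates and say-hello are hypothetical):

```yaml
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: use-shared-template-
spec:
  entrypoint: main
  templates:
    - name: main
      steps:
        - - name: call-shared
            templateRef:
              name: my-shared-templates   # a WorkflowTemplate on the cluster
              template: say-hello         # a template defined inside it
```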

CronWorkflow

A CronWorkflow is a Workflow that runs on a preset schedule. CronWorkflows are designed to be easily converted from Workflows and to mimic the same options as the Kubernetes CronJob.

In essence, CronWorkflow = Workflow + some specific cron options.

How we work with Argo Workflows

So, based on these concepts, we decided to structure our project to separate the main elements we use:

  • crons: contains all the CronWorkflows
  • templates: contains all the WorkflowTemplates frequently used by workflows (e.g. Slack notifications, common pipeline actions, etc.)
  • workflows: contains all the Workflows that we sometimes need to run manually
  • resources: contains the K8s resources used by the application (e.g. ConfigMaps)

The crons, templates, and resources directories have subfolders that reflect our staging and production environments. This way, with the help of Kustomize, we can build our YAMLs for the desired environment.
In our case, besides Secrets/ConfigMaps, the only difference we have right now between a staging and a production workflow is the suspend field:
we want all the CronWorkflows in the staging environment to be suspended, as we don’t want them to run periodically in a test environment.
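As a sketch of how this can be done (the file layout and names are illustrative), a staging overlay can patch suspend onto every CronWorkflow:

```yaml
# staging/kustomization.yaml
apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization
resources:
  - ../base                  # the environment-agnostic manifests
patches:
  - target:
      kind: CronWorkflow     # applies to every CronWorkflow in the build
    patch: |-
      - op: add
        path: /spec/suspend
        value: true
```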

Then, our CI builds the YAMLs using Kustomize and pushes them to another repository where ArgoCD (another project by Argo) listens for changes and applies them to our K8s cluster.

Notifications

As we said, besides orchestrating our data pipelines, another feature we were missing is notifications: being notified when a workflow completes or when it terminates with an error.

For a couple of workflows that run frequently (every hour), we decided to get notified only if the workflow fails, so as not to create too much unnecessary noise.

What we did was create a WorkflowTemplate that accepts a channel, a title, an icon, the workflow status, and the workflow failures (if any) as parameters.
We build the message using that information and, in case of failure, we attach a link to our logs too.

The message is then sent to a Slack channel, so we are always aware of our workflows’ status!

NOTE: a “plugin” feature is currently under development that will allow customizing the Argo Workflows experience without having to wait for new features to be picked up and released. Here, for example, we could build a “slack-notification” plugin instead of a template.

Retries

Speaking of failures, another advantage we gained is that we can set up retries!

So, if a step in a workflow fails, we can retry it n times before throwing an error and stopping the workflow.
This allows us to prevent the workflow from failing because of transient errors like temporary network issues.
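A sketch of what this looks like on a template, with illustrative values:

```yaml
templates:
  - name: flaky-step
    retryStrategy:
      limit: "3"               # retry up to 3 times before failing the workflow
      retryPolicy: OnFailure   # only retry when the step fails
      backoff:
        duration: "30s"        # wait 30s before the first retry...
        factor: "2"            # ...doubling the wait each time
    container:
      image: alpine:3.15
      command: [sh, -c, "exit 1"]   # always fails, just for illustration
```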

There are a lot of other things to know about Argo Workflows but this is a good first introduction.

CronWorkflow Example

After this review of what Argo Workflows is and how we use it in Casavo, wouldn’t it be nice to see some working examples?
Unfortunately, sharing our actual workflows here would be inefficient (hard to read).

But we can build a simple HelloWorld CronWorkflow that runs every minute and sends a completion message to a Slack channel.

First, let’s create our WorkflowTemplate to send the Slack message:
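Here is a sketch reconstructing the WorkflowTemplate described below (the parameter defaults and environment variable names are illustrative):

```yaml
apiVersion: argoproj.io/v1alpha1
kind: WorkflowTemplate
metadata:
  name: slack-notifier
spec:
  templates:
    - name: send-notification
      inputs:
        parameters:                      # 4 parameters with default values
          - name: channel
            value: "#data-notifications"
          - name: title
            value: "Workflow update"
          - name: icon
            value: ":robot_face:"
          - name: wf-status
            value: "Unknown"
      container:
        image: casavo/slack-notifier
        envFrom:
          - secretRef:
              name: slack-hook           # provides the Slack hook URL
        env:                             # parameters exposed as env vars
          - name: CHANNEL
            value: "{{inputs.parameters.channel}}"
          - name: TITLE
            value: "{{inputs.parameters.title}}"
          - name: ICON
            value: "{{inputs.parameters.icon}}"
          - name: WF_STATUS
            value: "{{inputs.parameters.wf-status}}"
```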

This WorkflowTemplate has one template of type container that runs the casavo/slack-notifier image, a simple image that calls the Slack hook.
The Slack hook is retrieved from K8s secrets with the following:

envFrom:
  - secretRef:
      name: slack-hook

Also, this template accepts 4 parameters with default values (channel, title, icon, and wf-status) that are used by the Docker image as environment variables to build the HTTP call to Slack.
This way, every Workflow that uses this template can customize those values.

Now that we have our YAML definition of the WorkflowTemplate, we can paste it in the Argo Workflows UI in the templates section or, if you have installed the Argo Workflows CLI, you can run:

argo template create slack-notifier.yml

Now, let’s create the CronWorkflow that will print a “hello world” every minute using the docker/whalesay image:
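A sketch matching the description that follows (the names and cron options are illustrative):

```yaml
apiVersion: argoproj.io/v1alpha1
kind: CronWorkflow
metadata:
  name: hello-cron
spec:
  schedule: "* * * * *"            # run every minute
  concurrencyPolicy: Replace       # don't let runs pile up
  workflowSpec:
    entrypoint: whalesay
    onExit: exit-handler           # runs whenever the workflow finishes
    templates:
      - name: whalesay
        container:
          image: docker/whalesay
          command: [cowsay]
          args: ["hello world"]
      - name: exit-handler
        steps:
          - - name: send-notification
              templateRef:
                name: slack-notifier         # the WorkflowTemplate above
                template: send-notification
              arguments:
                parameters:
                  - name: wf-status          # pass the workflow status...
                    value: "{{workflow.status}}"
                  - name: title              # ...and overwrite the title
                    value: "HelloWorld CronWorkflow"
```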

Here you can notice some cron configuration in the spec field, like the schedule or the concurrencyPolicy.

While in the workflowSpec we have the workflow definition with two templates:
one is of type container and runs the whalesay image;
the other is of type steps, calls the slack-notifier template, and is used as an exit handler.
This means that every time the workflow finishes, no matter the outcome, this exit handler will be called.
You will also notice that we pass the workflow status to the template and overwrite the title.

NOTE: the send-notification template is invoked by a steps template. That’s because, as we saw earlier, steps and dag are the only two types of templates that can invoke other templates.

Once we’re done, paste it in the UI under the CronWorkflows section or run the CLI command:

argo cron create hello-cron.yml

And that’s it! Now you have a CronWorkflow running every minute and a Slack notification that alerts you about its status!

These are the main reasons that led us to adopt Argo Workflows for our data pipeline infrastructure, and we are continuously improving the way we use it to get the best out of it!
