Code Is the Best DSL for Building Workflows
As a back-end developer, you are often asked to build some structured orchestration of tasks, such as microservice orchestration, distributed transactions, data pipelines or business logic. In all these cases you must ensure that the different types of tasks are processed according to a given scenario. This scenario can be as simple as a sequence of tasks, or much more complex: it may depend on time, follow a complex decision diagram, or depend on external signals, for example.
As soon as you are facing moderately complex workflows, you may be tempted to use a DSL, coupled with a dedicated engine to operate those workflows, such as Airflow, Luigi or Conductor.
In this article, I will show how a programming language can be used as a DSL by a “workflow as code” engine, and why it’s probably your best long-term option for building reliable automation at any scale.
The examples provided below use Kotlin, as this is the language I use in Infinitic, the “workflow as code” engine I work on. Kotlin was chosen for its great integration with Apache Pulsar, as described here.
Context
Even if you can benefit from a “workflow as code” engine in a simple infrastructure, I am considering the more complex situation where you have distributed stateless workers in different locations. You are able to distribute tasks to those workers through some mechanism (for example a queuing system), each task usually being described by its name and some data.
The problem we are trying to solve is: how do we orchestrate the processing of those tasks on different servers in a way that is reliable, scalable, and easy to manage and monitor?
Example
Let’s consider the sequential workflow below as an example:
- The workflow starts with a URL pointing to an image and a target size
- The first task downloads the image
- The second task resizes the image according to the target size
- The last task uploads the resized image and returns a URL pointing to it
Using Infinitic, this workflow could be coded like this:
Let’s look at it line by line:
- Line 1: we import the Workflow interface, which provides useful functions here (see below).
- Lines 3–7: we do not use an actual implementation of ImageUtil, and we do not need to know one to code the workflow. This interface declares how tasks can be called in workers. Here we declare that workers will receive a task named ImageUtil. This task can be called with 3 methods: download, which takes a string as input and returns an array of bytes; resize, which takes an array of bytes and an integer as input and returns an array of bytes; and upload, which takes an array of bytes as input and returns a string.
- Line 9: the Workflow interface provides the useful proxy function used below.
- Line 10: the context variable is injected by workers when processing the workflow. It contains the workflow history. I’ll explain later how it is used.
- Line 12: the proxy function builds a pseudo-instance of the provided interface. This pseudo-instance is used below to capture which methods are actually applied as the workflow is processed.
- Lines 14–18: these 5 lines implement the workflow! You can read them literally as “please process the task ImageUtil with method download and input imageUrl. Then process the task ImageUtil with method resize, using the output of the previous task as first parameter and the provided size as second parameter. Then process the task ImageUtil with method upload, using the output of the last task as input. Return the last output as the output of the workflow.”
As you can see, using code provides a very concise and efficient way to describe this workflow. As additional benefits, the description of your workflow:
- is entirely contained in one file, easy to read.
- is included in your versioning system
- is automatically type checked by your IDE
How Does It Work?
To make it work, Infinitic uses the following architecture:
As you can see, around a transport layer, we have:
- stateless workers, which process two kinds of tasks: normal tasks, defined by users, whose interfaces are the building blocks used in a workflow’s definition; and special tasks called WorkflowTasks, whose implementation is provided by Infinitic. WorkflowTasks use the workflow definition and history to discover what to do next.
- stateful workers (Engine), whose main role is to maintain the state of each workflow instance by looking at everything that happens to it. They trigger WorkflowTasks based on the workflow history, and trigger normal tasks based on the output of WorkflowTasks.
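One way to picture how a WorkflowTask can use the workflow definition and its history to discover what to do next is to replay the workflow code against the recorded task outputs. The following sketch is an assumption about the general technique, not a description of Infinitic's internals; ReplayContext, Suspend, Dispatch, Completed and runEngine are hypothetical names.

```kotlin
// Raised when replay reaches a task whose output is not in the history yet.
class Suspend(val nextTask: String) : RuntimeException()

// Hypothetical context: serves recorded outputs in order, then suspends.
class ReplayContext(private val history: List<String>) {
    private var position = 0

    fun run(task: String): String =
        if (position < history.size) history[position++] else throw Suspend(task)
}

// The workflow definition: three sequential tasks.
fun workflow(ctx: ReplayContext): String {
    ctx.run("download")
    ctx.run("resize")
    return ctx.run("upload")
}

sealed class Decision
data class Dispatch(val task: String) : Decision()
data class Completed(val output: String) : Decision()

// What a WorkflowTask does in this sketch: replay, then report the next step.
fun workflowTask(history: List<String>): Decision =
    try {
        Completed(workflow(ReplayContext(history)))
    } catch (s: Suspend) {
        Dispatch(s.nextTask)
    }

// What the engine does in this sketch: keep the history and react to decisions.
fun runEngine(): List<Decision> {
    val history = mutableListOf<String>()
    val decisions = mutableListOf<Decision>()
    do {
        val decision = workflowTask(history)
        decisions.add(decision)
        // A fake worker output is appended to the history for each dispatched task.
        if (decision is Dispatch) history.add("${decision.task}-output")
    } while (decision is Dispatch)
    return decisions
}

fun main() {
    runEngine().forEach(::println)
}
```

In this sketch the workers stay stateless: everything needed to resume a workflow is in the history held by the engine, so any worker can run the next WorkflowTask.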
Serialization: as data used within a workflow instance flows between workers, it is serialized and deserialized along the way. Infinitic does this automatically, and transports and stores the data in an Avro format that guarantees durability and the ability to evolve. This format also lets workers process tasks in programming languages other than Java/Kotlin. We provide examples in Node.js.
Scalability: all stateless workers can easily be scaled horizontally. For the engines, horizontal scalability can also be ensured as long as all messages related to a specific workflow instance are handled in time by the same engine worker. For example, with Pulsar this constraint is met by using a Key_Shared subscription (the key being the id of the workflow instance). If an engine worker fails completely, it can be stopped and the corresponding traffic will be automatically redistributed to the remaining workers.
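The routing constraint can be illustrated with a deliberately simplified rule. The sketch below is an assumption made for illustration, not how Pulsar assigns keys to consumers: it picks an engine worker from the hash of the workflow id, so that every message for a given workflow instance lands on the same worker. The engineWorkerFor function and the message names are hypothetical.

```kotlin
// Simplified stand-in for key-based routing (an assumption, not Pulsar's algorithm).
fun engineWorkerFor(workflowId: String, workerCount: Int): Int =
    Math.floorMod(workflowId.hashCode(), workerCount)

fun main() {
    val messages = listOf(
        "wf-1" to "TaskCompleted",
        "wf-2" to "TaskCompleted",
        "wf-1" to "TaskDispatched"
    )
    // Messages sharing a workflow id are always routed to the same engine worker.
    for ((id, event) in messages) {
        println("$event for $id -> engine worker ${engineWorkerFor(id, 3)}")
    }
}
```

With such a rule, removing a failed worker amounts to recomputing the assignment with a smaller workerCount, which redistributes the keys among the remaining workers.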
At Infinitic we are working on an off-the-shelf integration with Apache Pulsar.
The exciting benefit of such a “workflow as code” pattern is that once your infrastructure is set up, you can use it for whatever workflows you need to implement, as shown below.
Beyond Sequential Workflows
Child Workflows
Workflows can trigger child workflows as easily as tasks. For example, this (inefficient) workflow calculates n! in a distributed way using a classical recursive algorithm. At the last level (n=1), we have a workflow without any task that directly returns 1.
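The recursion can be sketched in plain Kotlin. This is only an illustration under assumptions: the Multiply task interface and the Factorial class are hypothetical, and the recursive call, which in the engine would start a child workflow that may run on another worker, is here a simple local call.

```kotlin
// Hypothetical task: the only real work, done by a worker.
fun interface Multiply {
    fun multiply(a: Long, b: Long): Long
}

class Factorial(private val task: Multiply) {
    // compute(n - 1) plays the role of the child workflow.
    // At n = 1 no task is called and 1 is returned directly.
    fun compute(n: Long): Long =
        if (n <= 1L) 1L else task.multiply(n, compute(n - 1L))
}

fun main() {
    val factorial = Factorial(Multiply { a, b -> a * b })
    println(factorial.compute(5L)) // 120
}
```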
Asynchronous tasks
If you do not care about a task output, you may want to process it asynchronously, i.e. without stopping the workflow’s flow. Infinitic provides a syntax for that:
// T being method’s return type
val d: Deferred<T> = async(myTask) { method(args) }
This triggers myTask.method(args) without interrupting the workflow’s flow. The same syntax can be used to trigger a child workflow asynchronously. Please have a look at the “Using Deferred” section below to learn what you can do with the provided Deferred instance.
As an example, if you want to send yourself an email when the CropImage workflow above starts and completes, you can do it like so:
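A rough equivalent can be written with the JDK's CompletableFuture as a stand-in for Infinitic's async and Deferred. Both this substitution and the Emailer task interface are assumptions made so that the example runs without the engine; neither is taken from Infinitic's API.

```kotlin
import java.util.concurrent.CompletableFuture
import java.util.concurrent.CopyOnWriteArrayList

// Hypothetical task interface for sending emails.
interface Emailer {
    fun send(message: String)
}

// Returns the workflow output plus a handle on the two notifications.
fun cropImageAndNotify(imageUrl: String, emailer: Emailer): Pair<String, CompletableFuture<Void>> {
    // Started asynchronously: the workflow does not wait for the email to be sent.
    val started = CompletableFuture.runAsync { emailer.send("CropImage started") }
    val resizedUrl = "$imageUrl?resized" // stand-in for the download, resize and upload tasks
    val completed = CompletableFuture.runAsync { emailer.send("CropImage completed") }
    return resizedUrl to CompletableFuture.allOf(started, completed)
}

fun main() {
    val sent = CopyOnWriteArrayList<String>()
    val emailer = object : Emailer {
        override fun send(message: String) {
            sent.add(message)
        }
    }
    val (url, notifications) = cropImageAndNotify("https://example.com/image.png", emailer)
    notifications.join() // only so that this demo does not exit before the emails are sent
    println("$url, emails sent: ${sent.size}")
}
```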
Asynchronous branches
In more complex cases, you may want to run asynchronously not just one task, but a complete series of tasks. The async function can also be used for that:
// T being the return type of … instructions
val d: Deferred<T> = async { … }
For example, the following code will resize an image to all sizes from sizeMin to sizeMax, all branches being processed in parallel:
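The shape of such a fan-out can be sketched with the JDK's CompletableFuture standing in for Infinitic's async (an assumption made to keep the example runnable on its own). The resizeAll function and its resizeAndUpload parameter are hypothetical names for the series of tasks run in each branch.

```kotlin
import java.util.concurrent.CompletableFuture

fun resizeAll(
    image: ByteArray,
    sizeMin: Int,
    sizeMax: Int,
    resizeAndUpload: (ByteArray, Int) -> String
): List<String> {
    // One asynchronous branch per size; all branches are started before any is awaited.
    val branches = (sizeMin..sizeMax).map { size ->
        CompletableFuture.supplyAsync { resizeAndUpload(image, size) }
    }
    // Wait for every branch and collect the results in size order.
    return branches.map { it.join() }
}

fun main() {
    val urls = resizeAll(ByteArray(0), 100, 103) { _, size -> "https://example.com/image-$size.png" }
    urls.forEach(::println)
}
```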
Using Deferred
When using the async function you get a Deferred<T> instance, T being the return type you’ll get once the task (or child workflow) is completed.
This Deferred is a reference that can be used with its 3 methods:
// trigger a task asynchronously, get immediately a Deferred
val d: Deferred<T> = async(myTask) { method(args) }
// returns the deferred status (ongoing, completed, canceled)
val status: DeferredStatus = d.status()
// waits for completion (or cancellation) and returns itself
d.await()
// waits for completion (or cancellation) and returns result
val result: T = d.result()
But more importantly, Deferred instances can be logically combined:
- (d1 or d2 or d3).result() waits for at least one deferred to complete and returns its result
- (d1 and d2 and d3).result() waits for all deferreds to complete and returns a list of results
- (d1 or (d2 and d3)).result() waits for the completion of d1 or of (d2 and d3), and returns the result of d1 or a list of the d2 and d3 results, depending on which completes first.
Any logical combination is possible. For example, it can be used to easily code a fan-out of asynchronous branches (line 32 below):
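To give a feel for how such combinators can behave, here is a minimal sketch that defines or and and as infix functions over the JDK's CompletableFuture. This is an assumption-laden stand-in, not Infinitic's Deferred: the types are looser, and anyOf also completes when one side fails.

```kotlin
import java.util.concurrent.CompletableFuture

// Completes as soon as one side completes, with that side's result.
infix fun CompletableFuture<*>.or(other: CompletableFuture<*>): CompletableFuture<Any> =
    CompletableFuture.anyOf(this, other)

// Completes when both sides are complete, with the list of both results.
infix fun CompletableFuture<*>.and(other: CompletableFuture<*>): CompletableFuture<List<Any?>> =
    CompletableFuture.allOf(this, other).thenApply { listOf(this.join(), other.join()) }

fun main() {
    val d1 = CompletableFuture<String>() // never completes in this demo
    val d2 = CompletableFuture.completedFuture("resized-100")
    val d3 = CompletableFuture.completedFuture("resized-200")

    println((d2 and d3).join())         // [resized-100, resized-200]
    println((d1 or (d2 and d3)).join()) // [resized-100, resized-200]
}
```

Named infix functions all share the same precedence in Kotlin and associate to the left, which is why the parentheses matter in mixed expressions. In this sketch, chaining and three times produces nested lists rather than the flat list of results described above.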
Conclusions
This article is an introduction to the “workflow as code” pattern built into Infinitic. More features will be added soon, before the first release:
- Ability for a workflow to pause for a duration or up to a date in time
- Ability for a workflow to wait to receive an object from outside (this can be used for example to influence a workflow by an external signal such as a human validation)
- and more…
As you may start to realize, this pattern provides a very generic way to code and maintain any workflow within your infrastructure, while having them defined in versioned files and automatically monitored.
Actually, this approach is so versatile that even if you need to define and use your own DSL (let’s say in a JSON or a BPM format), you can use this engine to process it. The only thing you have to do is code a workflow that takes this definition file as input and converts it into actual tasks to run.
If you are interested in a deeper understanding of how it works, please have a look at under the hood of an event-driven "workflow as code" engine.
I'm building [Infinitic](https://docs.infinitic.io) in public; you can subscribe here to receive more news.
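As a minimal sketch of that idea, assume the definition has already been parsed into an ordered list of task names. The interpret function, the Task alias and the task names below are all hypothetical; in a real setup each task would be a call to a remote worker instead of a local lambda.

```kotlin
// Hypothetical task registry entry: takes the current data, returns the new data.
typealias Task = (String) -> String

// A generic workflow: reads the definition and runs the named tasks in sequence.
fun interpret(definition: List<String>, tasks: Map<String, Task>, input: String): String =
    definition.fold(input) { data, taskName -> tasks.getValue(taskName)(data) }

fun main() {
    val tasks: Map<String, Task> = mapOf(
        "trim" to { s: String -> s.trim() },
        "reverse" to { s: String -> s.reversed() }
    )
    // The definition would typically come from a parsed definition file.
    println(interpret(listOf("trim", "reverse"), tasks, "  abc ")) // cba
}
```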