Let’s make workflow engines fun again

Arik Cohen
4 min read · Jan 19, 2018


Workflow engines are far from being a new thing.

In fact, type “workflow engine” into the GitHub search box and you’ll find well over 600 open-source workflow engines to choose from, not to mention the commercial offerings.

So why another one?

Well, after turning over more than a few stones, I found them lacking in two main areas:

They are cumbersome and over-engineered to a fault.

They don’t scale very well.

I was tasked by a client with building a system that could transcode and process thousands of pieces of video content (trailers, movies, clips, etc.), which is a very CPU-intensive type of workload.

If you have ever transcoded a movie on your laptop or home computer, you know what I’m talking about. It takes time and horsepower.

So scaling out is the only option. The problem is that these workflow engines are typically designed to run as a single process, not distributed across machines.

So I set out to build one.

My requirements were fairly straightforward:

  1. Simple to use. Creating a new workflow should not require a PhD in business management or extensive training.
  2. Scalable. I want to be able to process dozens of videos simultaneously and also to split the processing of a single video across multiple machines (a.k.a. split & stitch).
  3. General purpose. While not every computational task is suitable for a workflow engine, I’d like to be able to handle other types of workloads, perhaps completely unrelated to video processing.
  4. Extensible. That means well-defined interfaces that can be used to extend the system and its functionality without having to mess with its “guts”.
  5. Fault tolerant. Long-running processes, especially ones that run in a distributed environment, tend to break, a lot. Machines go down, hard disks fill up, a third-party service becomes temporarily unavailable, etc. The system should be able to handle these sorts of failures gracefully, retry when appropriate, and not lose any significant amount of work.

Meet Piper

Piper is my answer to this problem. It is a distributed workflow engine designed to run on a single node or thousands of nodes — depending on your needs.

In Piper, work to be done is defined as a set of tasks called a Pipeline. Pipelines can be sourced from many locations, but typically they live in a Git repository where they can be versioned and tracked.

Here’s an example of a pipeline:

name: Hello World

inputs:
  - name: yourName
    label: Your Name
    type: string
    required: true

tasks:
  - name: randomNumber
    label: Generate a random number
    type: randomInt
    startInclusive: 0
    endInclusive: 10000

  - label: Print a greeting
    type: print                # each task specifies a type
    text: Hello ${yourName}

  - label: Sleep a little
    type: sleep
    millis: ${randomNumber}

  - label: Call a webservice
    type: rest
    url: http://some.shaky.service/api
    retry: 3                   # will retry up to 3 times
    timeout: 5s                # will time out after 5 seconds

And that’s basically it as far as syntax goes. There are a few specialized task types, such as each (looping over a list), parallel (running tasks in parallel), and fork (forking the execution path of the workflow), but these are nothing more than tasks themselves.
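For instance, a parallel block that works on the video and audio tracks side by side might look roughly like the following. This is only a sketch; the exact field names of these control-flow tasks may differ slightly from what is shown here:

- label: Do two things at once
  type: parallel
  tasks:
    - type: print
      text: processing the video track
    - type: print
      text: processing the audio track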

For fault tolerance, every task may specify a retry property, which tells Piper how many times the task may be retried before giving up, as well as a timeout property, which specifies how long to wait before terminating the execution of a task.

Anyone can create their own TaskHandler by implementing its interface, without having to know the intimate details of the rest of the system.
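Once such a handler is registered, pipeline authors can use it like any built-in task. The extractAudio type below is made up purely for illustration; it stands in for whatever custom TaskHandler you might write:

- label: Extract the audio track
  type: extractAudio           # hypothetical custom task type backed by your own TaskHandler
  input: /videos/trailer.mov
  output: /tmp/trailer.aac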

As far as architecture goes, the system is composed of two main pieces:

  1. Coordinator — The Coordinator is like the central nervous system of Piper. It keeps track of jobs, dishes out work to be done by Worker machines, and keeps track of failures, retries, and other administrative details. Unlike Worker nodes, it does not execute actual work but delegates all task activities to Worker instances.
  2. Worker — Workers are the workhorses of Piper. These are the nodes that actually execute the tasks requested by the Coordinator. Unlike the Coordinator, workers are stateless: they do not interact with a database or keep any in-memory state about the job. This makes it very easy to scale the number of workers in the system up and down without fear of losing application state.

All communication between the Coordinator and the Worker nodes is done through a messaging broker (RabbitMQ by default). This has many advantages:

  1. If all workers are busy, the message broker will simply queue the message until they can handle it.
  2. When workers boot up, they subscribe to the appropriate queues for the type of work they are intended to handle.
  3. If a worker crashes, the task will automatically get re-queued to be handled by another worker.
  4. Last but not least, workers and TaskHandler implementations can be written in any language, since the two sides are completely decoupled.

For more details and getting-started tutorials, feel free to check out the project page.
