Pawel Garbacki, Mao Ye, Changshu Liu & Jooseong Kim | Pinterest engineers
As we continue to build in a fast and dynamic environment, we need a workflow manager that’s flexible and can keep up with our data processing needs. After trying a few options, we decided to build one in-house. Today we’re open-sourcing Pinball, which is designed to accommodate the needs of a wide range of data processing pipelines composed of jobs ranging from simple shell scripts to elaborate Hadoop workloads. Pinball joins our other open-sourced projects like Secor and Bender, available on our Github.
Building a highly customizable workflow manager
Development of new product features is often constrained by the availability of data that powers them. The raw data, typically originating from logs, is sliced and diced, merged and filtered across multiple dimensions before it reaches a shape suitable for ingestion by downstream applications. This process of data transformation is often modeled as a workflow which, in abstract terms, is a directed graph whose nodes represent processing steps and whose edges describe run-after dependencies.
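Such a workflow can be sketched as a plain directed graph. The job names and structure below are invented for illustration (this is not Pinball's internal representation), but they show how run-after edges constrain execution order:

```python
from graphlib import TopologicalSorter  # Python 3.9+

# Hypothetical workflow: each job maps to the set of jobs it must
# run after (its run-after dependencies).
workflow = {
    "extract_logs": set(),
    "filter_spam": {"extract_logs"},
    "join_users": {"extract_logs"},
    "build_report": {"filter_spam", "join_users"},
}

# Any valid execution order respects every run-after edge:
# dependencies always appear before their dependents.
order = list(TopologicalSorter(workflow).static_order())
```

A workflow manager's scheduler effectively computes and enforces such an ordering, while also handling retries, parallelism and failure cleanup that a one-shot topological sort cannot.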
Workflows can be arbitrarily complex. In realistic settings it’s not uncommon to encounter a workflow composed of hundreds of nodes. Building, running and maintaining workflows of that complexity requires specialized tools. A Bash script won’t do.
After experimenting with a few open-source workflow managers, we found none of them to be flexible enough to accommodate the ever-changing landscape of our data processing solutions. In particular, currently available solutions are either scoped to support a specific type of job (e.g. Apache Oozie, optimized for Hadoop computations) or abstractly broad and hard to extend (e.g. the monolithic Azkaban). With that in mind, we took on the challenge of implementing a highly customizable workflow manager built to survive the evolution of data processing use cases, ranging from the execution of basic shell commands to elaborate ETL-style computations on top of Hadoop, Hive and Spark.
Pinball is used by all of our engineering teams, and handles hundreds of workflows with thousands of jobs that process almost three petabytes of data on a daily basis in our Hadoop clusters. The largest workflow has more than 500 jobs. Workflows generate analytics reports, build search indices, train ML models and perform a myriad of other tasks.
Platform vs. end product
Pinball offers a complete end-to-end solution ready to use right out of the box. At the same time, its component-wise design allows for easy alterations. The workflow management layer of Pinball is built on top of generic abstractions implementing atomic state updates.
Conceptually, Pinball’s architecture follows a master-worker (or master-client, to avoid naming confusion with a special type of client that we introduce below) paradigm where the stateful central master acts as the source of truth about the current system state for stateless clients. Clients come in different flavors, ranging from workers taking care of job execution, to a scheduler controlling when a workflow should run, to a UI allowing users to interact with the system, to command line tools. All clients talk the same language (protocol) exposed by the master, and they don’t communicate directly. Consequently, clients are independent and can be easily replaced with alternative implementations. Because of the flexibility of Pinball’s design, it’s a platform for building customized workflow management solutions.
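The master-as-source-of-truth idea can be illustrated with a toy in-process token store. This is a hypothetical sketch, not Pinball's actual master protocol: the real master exposes a network API, and the class and method names below are invented. The key property shown is that clients only ever read and update state through atomic operations on the master, so they never need to talk to each other:

```python
import threading

class Master:
    """Toy stand-in for a stateful master: a store of versioned,
    named tokens that stateless clients read and claim atomically."""
    def __init__(self):
        self._lock = threading.Lock()
        self._tokens = {}  # name -> (version, payload)

    def post(self, name, payload):
        # Publish (or overwrite) a token; returns its new version.
        with self._lock:
            version = self._tokens.get(name, (0, None))[0] + 1
            self._tokens[name] = (version, payload)
            return version

    def claim(self, name, expected_version):
        # Atomic compare-and-swap: only one client can claim a given
        # version of a token, so two workers never run the same job.
        with self._lock:
            version, payload = self._tokens.get(name, (0, None))
            if version != expected_version:
                return None  # someone else got there first
            self._tokens[name] = (version + 1, payload)
            return payload

master = Master()
v = master.post("job:report", {"command": "report.sh"})
claimed = master.claim("job:report", v)  # first claim succeeds
stale = master.claim("job:report", v)    # second claim is rejected
```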
While customization is possible, it’s worth emphasizing that Pinball comes with default implementations of clients allowing users to define, run and monitor workflows.
Workflow life cycle
A workflow is defined through a configuration file or a UI workflow builder, or even imported from another workflow management system. Pinball offers a pluggable parser concept allowing users to express their workflows in a format that makes the most sense to them. The parser translates an opaque workflow definition into a collection of tokens representing workflow jobs in a format understandable by Pinball. (Read more about Pinball’s features.)
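A pluggable parser boils down to a function from some user-facing definition format to flat job tokens. The definition syntax and field names below are invented for illustration; the point is that because the parser is pluggable, any format can be mapped to tokens this way:

```python
# Hypothetical parser: translates a user-facing workflow definition
# into flat job tokens (workflow name, job name, command, deps).
def parse_workflow(config):
    tokens = []
    for name, job in config["jobs"].items():
        tokens.append({
            "workflow": config["name"],
            "job": name,
            "command": job["command"],
            "upstream": job.get("upstream", []),
        })
    return tokens

# An invented Python-dict definition format, for illustration only.
config = {
    "name": "daily_report",
    "jobs": {
        "extract": {"command": "extract.sh"},
        "report": {"command": "report.sh", "upstream": ["extract"]},
    },
}
tokens = parse_workflow(config)
```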
A workflow gets deployed through a command line tool or a UI component. Deployment invokes the parser to extract a schedule token from the workflow configuration and stores it in the master. A schedule token contains metadata such as the location of the workflow config, the time at which the workflow should run, the recurrence of executions and an overrun policy. The policy describes how the system should behave if the previous run of the workflow hasn’t finished by the time a new execution is due. Example policies allow aborting the currently running workflow, starting another workflow instance in parallel to the running one, or delaying the workflow start until the previous run finishes.
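The fields of a schedule token described above can be sketched as a small data structure. The class, field and policy names here are hypothetical, not Pinball's identifiers; the overrun policies mirror the examples just given:

```python
from dataclasses import dataclass
from enum import Enum

# Illustrative overrun policies mirroring the examples above.
class OverrunPolicy(Enum):
    ABORT_RUNNING = "abort the currently running instance"
    START_NEW = "start a new instance in parallel"
    DELAY = "delay until the previous run finishes"

@dataclass
class ScheduleToken:
    workflow_config: str      # location of the workflow config
    next_run_time: float      # epoch seconds when the next run is due
    recurrence_seconds: int   # how often the workflow repeats
    overrun_policy: OverrunPolicy

token = ScheduleToken(
    workflow_config="s3://configs/daily_report.py",
    next_run_time=1_700_000_000.0,
    recurrence_seconds=24 * 3600,
    overrun_policy=OverrunPolicy.DELAY,
)
```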
When the time comes, the scheduler uses the information stored in the schedule token to locate the workflow config, parse it and generate job tokens representing individual jobs in that workflow. Job tokens are posted to the master under a unique workflow instance ID. Workflow instances are controlled independently of one another, giving the user the flexibility to run multiple instances of the same workflow in parallel.
Job tokens are claimed and executed by idle workers. A job is described by a command line that the worker runs in a subprocess. The output of the subprocess is captured and exposed in the UI. Pinball interprets specially formatted log lines as values to be exposed in the UI or passed on to downstream jobs. This allows us to directly embed a link to a Hadoop job tracker page in the Pinball UI or propagate parameters from an upstream job to its downstream dependents.
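A worker's core loop of running a job in a subprocess and scanning its output for specially formatted lines can be sketched as follows. The `PINBALL:link=` / `PINBALL:param=` marker syntax is invented for this example; Pinball's real log-line format may differ, but the mechanism of capturing stdout and extracting links and downstream parameters is the same:

```python
import subprocess

# Hypothetical marker format, for illustration only:
#   PINBALL:link=<url>          expose a link in the UI
#   PINBALL:param=<key>:<value> pass a value to downstream jobs
def run_job(command):
    result = subprocess.run(
        command, shell=True, capture_output=True, text=True)
    links, params = [], {}
    for line in result.stdout.splitlines():
        if line.startswith("PINBALL:link="):
            links.append(line.split("=", 1)[1])
        elif line.startswith("PINBALL:param="):
            key, value = line.split("=", 1)[1].split(":", 1)
            params[key] = value
    return result.returncode, links, params

# A job that emits a job tracker link and a parameter for its
# downstream dependents.
code, links, params = run_job(
    'echo "PINBALL:link=http://tracker/job/42"; '
    'echo "PINBALL:param=rows:1000"')
```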
If any post-processing is needed on job failure (e.g. one may want to remove partial output), Pinball offers the ability to attach an arbitrary cleanup command to a job configuration. Cleanups are guaranteed to run even if the worker that claimed the job died in the middle of its execution.
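The weak form of this guarantee, within a single healthy worker, is just a try/finally around the job command. This toy sketch (names invented) cannot express the stronger guarantee Pinball provides, where another worker picks up an orphaned cleanup after a worker crash:

```python
import os
import subprocess
import tempfile

# Run a job command, then always run its attached cleanup command,
# whether the job succeeded or failed.  (Within one worker only; a
# worker crash needs the master to hand the cleanup to someone else.)
def run_with_cleanup(command, cleanup_command):
    try:
        return subprocess.run(command, shell=True).returncode
    finally:
        subprocess.run(cleanup_command, shell=True)

# Demonstrate that the cleanup runs even when the job fails.
marker = os.path.join(tempfile.mkdtemp(), "cleanup_ran")
code = run_with_cleanup("false", f"touch '{marker}'")
cleanup_ran = os.path.exists(marker)
```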
Failed jobs may be retried automatically or manually. Users can choose any subset of jobs to retry by interacting with the workflow diagram. Bulk actions significantly improve usability when working with larger job hierarchies.
When the workflow instance finishes (either failing or succeeding), optional email notifications are sent out to workflow owners.
Workflow configuration and job templates
To end users, a workflow manager is often a black box that employs faerie magic to schedule and execute their jobs, but the workflow itself needs to be defined in one way or another. While designing Pinball, we made a conscious choice not to make the configuration syntax part of the system core, in order to give developers a lot of flexibility to define workflow configurations in a way that makes the most sense in a given setting. At the same time, we wanted to offer a complete package with a low barrier to entry. So, we decided to include a simplified version of the parser and job templates that we use in our open-source release.
Out of the box, we support a Python-based workflow configuration syntax. We also provide a number of job templates for configuring simple shell scripts as well as more elaborate computations on the Hadoop platform. We offer native support for the EMR and Qubole platforms with some power features such as embedding of job links in the Pinball UI and cleaning up resources after failed jobs. We also propose the notion of a condition that allows users to model data dependencies between jobs (think of a job being delayed until the data it needs becomes available).
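The condition idea can be sketched as a small predicate a job waits on before running. The class and method names below are hypothetical, not Pinball's API; the example uses file existence as a stand-in for "the data this job needs is available":

```python
import os
import tempfile

class FileExistsCondition:
    """Hypothetical condition: a job is delayed until its input data
    (here, a file) becomes available."""
    def __init__(self, path):
        self.path = path

    def satisfied(self):
        return os.path.exists(self.path)

path = os.path.join(tempfile.mkdtemp(), "daily_logs_ready")
cond = FileExistsCondition(path)

before = cond.satisfied()   # data not yet produced: job stays queued
open(path, "w").close()     # upstream job lands the data
after = cond.satisfied()    # condition met: job becomes runnable
```

In a real deployment the predicate would typically poll an external store (e.g. a partition landing in HDFS or S3) rather than the local filesystem.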
Pawel Garbacki is a software engineer on the Monetization team. Mao Ye, Changshu Liu and Jooseong Kim are software engineers on the Data team.
Acknowledgements: Thanks to Krishna Gade, Mohammad Shahangian, Tongbo Huang, Julia Oh and Roger Wang for their invaluable contributions to the Pinball project.