October 13, 2015
The Cask Data Application Platform (CDAP) is an open-source platform to build and deploy data applications on Apache Hadoop™. In a previous blog post we introduced Workflows, a core component of CDAP, in comparison with Apache Oozie. In this post we will discuss the CDAP Workflow engine in greater detail.
CDAP Workflows are used to execute batch programs in an order specified by users. Currently, CDAP Workflows support MapReduce programs, Spark programs, and custom Java code. MapReduce and Spark programs are added to an application and referred to inside a Workflow by name. This gives users the ability to execute a program independently, outside of the Workflow, if required. The same program can also be referred to by multiple Workflows, so existing program code can be reused. Users can also execute lightweight Java classes inside the same JVM as the Workflow Driver program. These are called custom actions and are added directly inside the Workflow.
Workflow Representation
A Workflow is represented as a list of nodes that are executed in sequence. A Workflow can have three types of nodes: ACTION, CONDITION, or FORK. The nodes are uniquely identified by a node id. The following illustration explains the hierarchy of Workflow nodes:
ACTION node: When a user adds a program (such as MapReduce or Spark) or a custom action to a Workflow, an ACTION node is generated for it. The id of the ACTION node is the same as the name of the underlying program or custom action that the node executes.
For a program, the ACTION node stores only its name. For a custom action, the ACTION node also stores other details, such as the name of the custom action class, which are required to instantiate and execute the class at runtime. If the ACTION node executes successfully, Workflow execution continues to the next node; if the ACTION node fails, Workflow execution is aborted.
A program can be added to the Workflow by name:
// Add myMapReduce program to the Workflow
addMapReduce("myMapReduce");
// Add mySpark program to the Workflow
addSpark("mySpark");
A custom action can be added to the Workflow by passing an instance of it:
// Add custom action 'MyCustomAction' to the Workflow.
// The custom Java code in the 'run' method of 'MyCustomAction'
// will be called during the execution.
addAction(new MyCustomAction());
CONDITION node: Sometimes a user may want to execute certain actions in the Workflow only under some condition, such as the availability of valid data for processing. A CONDITION node allows the Workflow to execute different actions based on the result of a Predicate at runtime. Think of it as an ‘if-else’ construct in the programming model. The class representing the predicate should be a public named class, which allows the Workflow driver to instantiate it at runtime using its name. It should also implement the Predicate<WorkflowContext> interface. The boolean return value of the apply method determines whether the if branch or the else branch is executed. The node id of the CONDITION node is the simple name of the predicate class. Apart from the predicate class name, a CONDITION node also stores two lists of nodes: one to be executed when the predicate evaluates to true and another to be executed when it evaluates to false. Each node in these lists can in turn be of type ACTION, CONDITION, or FORK, which allows arbitrary nesting of conditions and forks in the Workflow.
A CONDITION node can be added to the Workflow as follows:
// Add condition in the Workflow.
// 'MyPredicate' is a public class which overrides the 'apply' method.
// Based on the return value of the 'apply' method, either 'if' or 'else'
// branch in the condition node gets executed.
condition(new MyPredicate())
.addMapReduce("OneMR").addSpark("OneSpark")... // list of nodes on the 'if' branch
.otherwise()
.addMapReduce("AnotherMR")... // list of nodes on the 'else' branch
.end();
FORK node: A Workflow supports the execution of multiple actions in parallel using a FORK node. When the actions are independent of each other, executing them in parallel reduces the overall runtime of the Workflow. A FORK node contains a list of branches, and each branch of the fork is in turn a list of nodes. Branches are executed in parallel by individual threads. As with CONDITION nodes, the nodes on the branches can themselves be of type ACTION, CONDITION, or FORK.
A FORK node can be added to the Workflow as follows:
fork()
.addMapReduce("Branch1MR")... // list of nodes on one branch of the fork
.also()
.addSpark("Branch2Spark")... // list of nodes on another branch of the fork
.also()
.... // fork can have arbitrary number of branches
.join();
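To make the hierarchy concrete, the three node types described above can be modelled as a few plain Java classes. This is only a sketch: the names WorkflowNode, ActionNode, ConditionNode, ForkNode and NodeModelSketch are hypothetical and are not CDAP's internal classes, and the id given to the fork is an assumed placeholder, since this post does not say how fork ids are chosen. The program names are the ones used in the snippets above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of the node hierarchy; these are not CDAP classes.
interface WorkflowNode {
    String getNodeId();
}

// ACTION: the id is the name of the program or custom action.
final class ActionNode implements WorkflowNode {
    private final String nodeId;

    ActionNode(String nodeId) {
        this.nodeId = nodeId;
    }

    @Override
    public String getNodeId() {
        return nodeId;
    }
}

// CONDITION: the id is the simple name of the predicate class;
// the node holds one list of nodes per branch.
final class ConditionNode implements WorkflowNode {
    private final String predicateClassName;
    final List<WorkflowNode> ifBranch;
    final List<WorkflowNode> elseBranch;

    ConditionNode(String predicateClassName,
                  List<WorkflowNode> ifBranch, List<WorkflowNode> elseBranch) {
        this.predicateClassName = predicateClassName;
        this.ifBranch = ifBranch;
        this.elseBranch = elseBranch;
    }

    @Override
    public String getNodeId() {
        return predicateClassName;
    }
}

// FORK: a list of branches, each branch being a list of nodes.
final class ForkNode implements WorkflowNode {
    private final String nodeId; // assumed placeholder id
    final List<List<WorkflowNode>> branches;

    ForkNode(String nodeId, List<List<WorkflowNode>> branches) {
        this.nodeId = nodeId;
        this.branches = branches;
    }

    @Override
    public String getNodeId() {
        return nodeId;
    }
}

final class NodeModelSketch {
    static List<WorkflowNode> nodes(WorkflowNode... nodes) {
        return Arrays.asList(nodes);
    }

    // Collects ACTION ids depth-first, descending into conditions and forks.
    static List<String> actionIds(List<WorkflowNode> nodes) {
        List<String> ids = new ArrayList<>();
        for (WorkflowNode node : nodes) {
            if (node instanceof ActionNode) {
                ids.add(node.getNodeId());
            } else if (node instanceof ConditionNode) {
                ConditionNode condition = (ConditionNode) node;
                ids.addAll(actionIds(condition.ifBranch));
                ids.addAll(actionIds(condition.elseBranch));
            } else if (node instanceof ForkNode) {
                for (List<WorkflowNode> branch : ((ForkNode) node).branches) {
                    ids.addAll(actionIds(branch));
                }
            }
        }
        return ids;
    }

    static List<WorkflowNode> exampleWorkflow() {
        return nodes(
            new ActionNode("myMapReduce"),
            new ConditionNode("MyPredicate",
                nodes(new ActionNode("OneMR"), new ActionNode("OneSpark")),
                nodes(new ActionNode("AnotherMR"))),
            new ForkNode("fork", Arrays.asList(
                nodes(new ActionNode("Branch1MR")),
                nodes(new ActionNode("Branch2Spark")))));
    }

    public static void main(String[] args) {
        System.out.println(actionIds(exampleWorkflow()));
    }
}
```

Because every branch is itself a list of nodes, the same recursive walk handles any depth of nested conditions and forks.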
Workflow Runtime
A CDAP Workflow runs as a YARN application managed by Apache Twill. A Workflow can be started explicitly from the UI, the CLI, or a REST endpoint, or it can be triggered by a schedule. When the CDAP Master receives a request to start a Workflow, it uses Twill to launch the Workflow application master (AM) in the Hadoop cluster. The Master also maintains a controller which is used to manage the lifecycle of the Workflow application. The Workflow AM in turn launches a new YARN container which runs the Workflow Driver.
The Workflow Driver is responsible for executing the nodes in the Workflow in order. It runs as a single thread which goes through startUp, run and shutDown phases.
During the startup phase a Netty HTTP Service is initialized for every run of the Workflow. This service exposes RESTful APIs for serving requests for the list of nodes that are currently in execution.
During the run phase, the driver retrieves the list of nodes to be executed for a Workflow and starts executing them serially. As explained earlier, when a CONDITION node is encountered, its Predicate is evaluated first to determine whether to execute the true branch or the false branch. When a FORK node is encountered, the driver thread starts executing the branches of the FORK node in separate threads. It then blocks until all of these threads finish executing the nodes in their respective branches.
When an ACTION node is encountered, the driver executes it in a new thread and blocks until it completes. The launch of a MapReduce program from a Workflow is mainly controlled by the MapReduce runtime environment, which is started by the Workflow Driver. During the startUp phase of this service, a MapReduce job is created with the supplied Hadoop configuration. The service also copies the jar containing the MapReduce program to the Hadoop cluster and enables it for localization by adding its URI to the job configuration under the property “mapreduce.job.cache.files”. Once the job is configured, it is submitted to the Hadoop cluster. The MapReduce YARN AM is then launched in a new container and takes care of executing the map and reduce tasks in separate containers. The Driver still maintains a controller to manage the lifecycle of the MapReduce program.
Similarly, when the Driver encounters a Spark ACTION node in the Workflow, it starts the Spark runtime environment to control the launch of the Spark program.
Since custom actions are lightweight Java classes, they are executed in the same container as the Workflow Driver, which avoids the overhead of creating containers for them.
Once all the nodes are executed, the Netty HTTP Service started during the startup phase is stopped and the Workflow is marked as COMPLETED.
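The run phase described above can be sketched in plain Java. This is a simplified illustration, not the CDAP implementation: WorkflowDriverSketch and its Node interface are hypothetical names, actions are plain Runnables that only record their name, and a cached thread pool stands in for the threads the driver creates. It shows the three behaviours from the text: an action runs in another thread while the driver blocks, a condition picks one branch, and a fork runs its branches in parallel and waits for all of them.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.BooleanSupplier;

// Hypothetical sketch of the driver's run phase; not CDAP code.
final class WorkflowDriverSketch {
    interface Node {
        void execute(ExecutorService pool) throws Exception;
    }

    // Nodes in a list are executed serially, in order.
    static void runAll(List<Node> nodes, ExecutorService pool) throws Exception {
        for (Node node : nodes) {
            node.execute(pool);
        }
    }

    // ACTION: run in another thread and block until it completes.
    // A failure surfaces as an exception and aborts the remaining nodes.
    static Node action(Runnable body) {
        return pool -> pool.submit(body).get();
    }

    // CONDITION: evaluate the predicate, then run exactly one branch.
    static Node condition(BooleanSupplier predicate, List<Node> ifBranch, List<Node> elseBranch) {
        return pool -> runAll(predicate.getAsBoolean() ? ifBranch : elseBranch, pool);
    }

    // FORK: start every branch in its own thread, then wait for all of them.
    static Node fork(List<List<Node>> branches) {
        return pool -> {
            List<Future<?>> running = new ArrayList<>();
            for (List<Node> branch : branches) {
                running.add(pool.submit(() -> {
                    runAll(branch, pool);
                    return null;
                }));
            }
            for (Future<?> future : running) {
                future.get();
            }
        };
    }

    // Runs a small workflow and returns the action names in execution order.
    static List<String> runExample(boolean predicateResult) {
        List<String> log = Collections.synchronizedList(new ArrayList<String>());
        // Unbounded pool, so nested forks and actions never wait for a free thread.
        ExecutorService pool = Executors.newCachedThreadPool();
        try {
            List<Node> workflow = Arrays.asList(
                action(() -> log.add("myMapReduce")),
                condition(() -> predicateResult,
                    Arrays.asList(action(() -> log.add("OneMR"))),
                    Arrays.asList(action(() -> log.add("AnotherMR")))),
                fork(Arrays.asList(
                    Arrays.asList(action(() -> log.add("Branch1MR"))),
                    Arrays.asList(action(() -> log.add("Branch2Spark"))))),
                action(() -> log.add("mySpark")));
            runAll(workflow, pool);
            return new ArrayList<>(log);
        } catch (Exception e) {
            throw new IllegalStateException("workflow aborted", e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runExample(true));
    }
}
```

The two fork branches may finish in either order, but the node after the fork always runs last because the driver waits for every branch before moving on.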
Besides normal execution, a Workflow can also be stopped, suspended, and resumed. When a Workflow is explicitly stopped, its Driver thread is interrupted. This in turn interrupts the other threads started by the Driver while executing Workflow nodes. In this case, the Workflow is marked as KILLED. When a Workflow is suspended, any ACTION node that is currently running is allowed to complete, and then the Driver thread blocks. A SUSPENDED Workflow can be resumed at a later stage. When a Workflow is resumed, its Driver thread is unblocked and continues executing the subsequent nodes to completion.
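The stop and suspend/resume behaviour can be illustrated with a small sketch built on standard Java threading. Everything here is an assumption made for illustration: SuspendableDriverSketch is a hypothetical class, nodes are plain Runnables executed directly on the driver thread (the real Driver uses additional threads), and the suspend flag is checked only between nodes, which is what lets a running node finish before the driver blocks.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch of stop and suspend/resume; not CDAP code.
final class SuspendableDriverSketch {
    enum Status { RUNNING, SUSPENDED, COMPLETED, KILLED }

    private final Object lock = new Object();
    private final List<Runnable> nodes;
    private final Thread driver;
    private boolean suspended; // guarded by lock
    private volatile Status status = Status.RUNNING;

    SuspendableDriverSketch(List<Runnable> nodes) {
        this.nodes = nodes;
        this.driver = new Thread(this::runNodes, "workflow-driver");
    }

    void start() { driver.start(); }

    void suspend() {
        synchronized (lock) { suspended = true; }
    }

    void resume() {
        synchronized (lock) {
            suspended = false;
            lock.notifyAll();
        }
    }

    // Stopping interrupts the driver thread.
    void stop() { driver.interrupt(); }

    Status awaitFinish() throws InterruptedException {
        driver.join();
        return status;
    }

    private void runNodes() {
        try {
            for (Runnable node : nodes) {
                synchronized (lock) {
                    // Block between nodes while suspended; a running node is never cut short.
                    while (suspended) {
                        status = Status.SUSPENDED;
                        lock.wait();
                    }
                    status = Status.RUNNING;
                }
                if (Thread.interrupted()) {
                    throw new InterruptedException();
                }
                node.run();
            }
            status = Status.COMPLETED;
        } catch (InterruptedException e) {
            status = Status.KILLED;
        }
    }

    // Node "A" requests a suspend while it is running; "B" must wait for resume().
    static String demoSuspendThenResume() {
        List<String> log = Collections.synchronizedList(new ArrayList<String>());
        CountDownLatch suspendRequested = new CountDownLatch(1);
        AtomicReference<SuspendableDriverSketch> self = new AtomicReference<>();
        SuspendableDriverSketch driver = new SuspendableDriverSketch(Arrays.<Runnable>asList(
            () -> { log.add("A"); self.get().suspend(); suspendRequested.countDown(); },
            () -> log.add("B")));
        self.set(driver);
        driver.start();
        try {
            suspendRequested.await();
            String seenWhileSuspended = log.toString();
            driver.resume();
            Status finalStatus = driver.awaitFinish();
            return seenWhileSuspended + " -> " + log + " " + finalStatus;
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }

    // A suspended driver that is stopped never runs its remaining nodes.
    static String demoStopWhileSuspended() {
        List<String> log = Collections.synchronizedList(new ArrayList<String>());
        SuspendableDriverSketch driver =
            new SuspendableDriverSketch(Arrays.<Runnable>asList(() -> log.add("A")));
        driver.suspend();
        driver.start();
        driver.stop();
        try {
            return driver.awaitFinish() + " " + log;
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

In the first demo the suspend request arrives while node "A" is still running, so "A" completes and "B" runs only after resume() is called. In the second, the interrupt wakes the blocked driver, which ends as KILLED without running any node.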
Workflow Token
As a Workflow executes its actions, it is important to carry state about them, such as their success status, Hadoop counters, or Spark metrics. This information can be used by subsequent nodes in the Workflow. For example, CONDITION nodes can decide which branch to execute based on it. This information is passed along as a Workflow token.
At a high level, a token simply contains key-value pairs where both key and value are of type String. An empty token is created when the Workflow starts its execution. This token is then passed along from one node to the next as the execution of the Workflow continues. During its execution, a node can read the contents of the token as well as put more data into it. When a FORK node is encountered in the Workflow, multiple copies of the token are made and one is passed along each branch of the FORK. At the join, the tokens from all branches are merged into a single token, which is then passed further along the Workflow.
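The copy-at-fork and merge-at-join behaviour can be sketched with an ordinary map. TokenSketch below is a hypothetical class, not CDAP's WorkflowToken, and the keys and values are made up for illustration. This post does not say what happens when two branches write the same key, so the sketch simply assumes that the branch merged last wins.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of a token as String key-value pairs; not CDAP's WorkflowToken.
final class TokenSketch {
    private final Map<String, String> values = new LinkedHashMap<>();

    void put(String key, String value) {
        values.put(key, value);
    }

    String get(String key) {
        return values.get(key);
    }

    // At a fork, each branch receives its own copy of the token.
    TokenSketch copy() {
        TokenSketch copy = new TokenSketch();
        copy.values.putAll(values);
        return copy;
    }

    // At the join, branch tokens are merged back into this token.
    // Assumption: on a key conflict, the branch merged last wins.
    void mergeFrom(List<TokenSketch> branchTokens) {
        for (TokenSketch branchToken : branchTokens) {
            values.putAll(branchToken.values);
        }
    }

    Map<String, String> asMap() {
        return new LinkedHashMap<>(values);
    }

    static Map<String, String> demo() {
        TokenSketch token = new TokenSketch();       // empty token at Workflow start
        token.put("started.by", "schedule");         // written by a node before the fork

        TokenSketch branch1 = token.copy();          // fork: one copy per branch
        TokenSketch branch2 = token.copy();
        branch1.put("branch1.records", "120");
        branch2.put("branch2.records", "45");

        token.mergeFrom(Arrays.asList(branch1, branch2)); // join
        return token.asMap();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

While the branches run, neither sees the other's writes; only after the join does the single merged token contain the data from both.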
Piecing it together
Consider an example of processing web logs of an e-commerce website and getting some analytics. Particularly, let’s say we are interested in getting the top 1000 active users and top 1000 popular products. The workflow for this problem might have the following structure:
- User Records Verifier (URV): MapReduce program to verify the user records from user logs
- Product Records Verifier (PRV): MapReduce program to verify the product records from product access logs
- Predicate (EnoughDataToProceed): Used to determine whether enough valid user and product records are available for processing
- TOP 1000 Active User Finder (TAUF): MapReduce program to compute top 1000 active users
- TOP 1000 Popular Products Finder (TPPF): MapReduce program to compute top 1000 popular products
- Workflow Run Statistics Logger (StatisticsLogger): Custom Java code to log the statistics of the Workflow run
The above Workflow can be configured using the Java API as follows:
@Override
public void configure() {
  // Add User Record Verifier node
  addMapReduce("URV");
  // Add Product Record Verifier node
  addMapReduce("PRV");
  // Predicate EnoughDataToProceed can be used for branching in the Workflow
  condition(new EnoughDataToProceed())
    .fork()
      .addMapReduce("TAUF") // Add Top 1000 Active User Finder
    .also()
      .addMapReduce("TPPF") // Add Top 1000 Popular Product Finder
    .join()
  .end();
  // Add custom Java code to log the statistics of the Workflow run
  addAction(new StatisticsLogger());
}
Predicate EnoughDataToProceed can read the number of valid user and product records from the Workflow token.
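public class EnoughDataToProceed implements Predicate<WorkflowContext> {
  @Override
  public boolean apply(WorkflowContext context) {
    WorkflowToken token = context.getToken();
    // The first lookup is not scoped to a node; the second is scoped to the "URV" node.
    // minProductCount and minUserCount are thresholds assumed to be defined elsewhere,
    // and the comparison assumes the token values are numeric counts.
    return token.get("valid.records") > minProductCount && token.get("valid.records", "URV") > minUserCount;
  }
}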
If the numbers of valid user and product records are above their thresholds, the Workflow runs the MapReduce programs “TAUF” and “TPPF” in parallel using a fork. This helps reduce the overall execution time of the Workflow.
StatisticsLogger is a custom action that is configured to execute at the end of the Workflow run. It can be used to log or email the statistics of the Workflow run as read from the Workflow token.
Summary
In this blog post, we talked about the representation of Workflows in CDAP. The hierarchical node structure allows users to define Workflows with complex nesting of forks and conditions. We discussed the Workflow runtime, and we also discussed Workflow Tokens, a feature that allows passing information between the nodes of a Workflow.
We hope you enjoyed this post! Please check out the CDAP Workflow guide and the Wikipedia Pipeline for an end-to-end example, and reach out to us with any questions.