Composed Tasks on Spring Cloud Dataflow

Siddhant Sorann
MiQ Tech and Analytics
5 min read · Sep 22, 2020

Implementing composed tasks on SCDF to better schedule data pipelines.

Introduction

In our previous blog, we spoke about what we do at MiQ (Trading Lab) and how we are using Spring Cloud Dataflow to run our batch jobs on the cloud. We needed composed tasks to resolve multiple issues with our pipelines, mainly around dependent batch jobs and scheduling; this is explained in more detail in the later sections.

Why do we need composed tasks?

Being a programmatic media buying agency, we at MiQ work closely with DSPs (Demand-Side Platforms) such as Appnexus, The Trade Desk, and DV360 to buy inventory from them. To ensure that traders have up-to-date information available before making any decisions, we ingest data from these DSPs and aggregate it. To make this possible, we run 100+ batch jobs every day.

Some of these jobs depend on others completing before they start; otherwise, they might produce incorrect data.

Consider the example below -

Fig. 1: A basic representation of dependency between tasks.

In the above scenario -

  1. Task 1 and Task 2 have to run first and dump some data to our database.
  2. After they have completed, Task 3 has to fetch the dumped data, aggregate it, and finally store it back in the database.

Before composed tasks, we used to schedule all our tasks in such a way that Tasks 1 & 2 would complete before Task 3 started. That is, if we knew that Tasks 1 & 2 take approximately 30 mins each to complete, we could schedule Task 3 an hour after the schedule of Tasks 1 & 2.
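
As a sketch, and assuming SCDF's scheduling support with illustrative task and schedule names, the staggered schedules looked something like this in the SCDF shell (the exact cron format depends on the platform backing the scheduler):

# Tasks 1 & 2 kick off at midnight
dataflow:> task schedule create --definitionName task-1 --name task-1-daily --expression "0 0 * * *"
dataflow:> task schedule create --definitionName task-2 --name task-2-daily --expression "0 0 * * *"
# Task 3 is scheduled a full hour later, hoping Tasks 1 & 2 have finished
dataflow:> task schedule create --definitionName task-3 --name task-3-daily --expression "0 1 * * *"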

The following were the drawbacks -

  • Sometimes the dependent tasks (1 & 2) would take more than an hour to complete, which meant Task 3 would not have fetched the updated data to aggregate.
  • If either Task 1 or Task 2 failed, we knew Task 3 would not give us the correct data, so there was no point in running it.
  • The 30-minute buffer between Task 3 and the tasks it depends on might sometimes be insufficient and at other times result in unnecessary delay.

How did we solve this problem?

To run our batch jobs, we use an open-source platform called Spring Cloud Dataflow (SCDF). You can refer to our other blog about how we are using SCDF here.

SCDF's Composed Tasks were created for exactly this purpose. You can find more information on Composed Tasks here (official documentation).

A composed task lets the user combine multiple tasks into a hierarchical DAG (Directed Acyclic Graph). The graph is intuitive and can branch in multiple directions after a task execution, based on that task's exit status. It also allows you to run tasks in parallel and to wait for all of them to complete before moving on. Another useful feature is that composed tasks can be created through the RESTful API, the Spring Cloud Data Flow Shell, or the Spring Cloud Data Flow UI, which makes them effortless to create, modify, and use.
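
For instance, here is a minimal sketch of creating and launching a composed task from the SCDF shell (the names daily-aggregation, ingest-task, and aggregate-task are illustrative, and the child task apps are assumed to be registered already):

# Create a composed task definition: aggregate-task runs only after ingest-task succeeds
dataflow:> task create daily-aggregation --definition "ingest-task && aggregate-task"
# Launch the whole graph as a single task
dataflow:> task launch daily-aggregation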

Going back to the previous example of the three tasks, this is how the composed task looks when built using the SCDF UI.

Fig. 2: Representation of a basic composed task graph

In this graph, we wait for task-1 and task-2 to complete before starting task-3. The graph representation of this dependency is:

<task-1 || task-2> && task-3
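
The same definition can also be created over the RESTful API; a hedged sketch using curl, assuming the SCDF server is running on its default port 9393 and my-composed-task is an illustrative name:

curl -X POST http://localhost:9393/tasks/definitions \
  --data-urlencode "name=my-composed-task" \
  --data-urlencode "definition=<task-1 || task-2> && task-3"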

A little more about the Composed Task Definition -

Conditional execution is expressed using the double ampersand symbol (&&). This lets each task in the sequence launch only if the previous task completed successfully. The split symbol (||) expresses tasks that can run in parallel, and the angle brackets (<>) simply group tasks together. In our example, task-1 and task-2 start running in parallel as soon as the composed task is triggered. If and when both complete successfully, task-3 is executed. In other words, if either task-1 or task-2 fails, task-3 is not triggered, and task-3 starts only once both task-1 and task-2 have completed.
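
To summarize the operators with minimal illustrative definitions (task-A, task-B, and task-C are placeholders):

task-A && task-B (task-B launches only if task-A completes successfully)
<task-A || task-B> (task-A and task-B launch in parallel)
<task-A || task-B> && task-C (task-C launches only after both parallel tasks succeed)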

Transitions -

Composed tasks also support controlling which tasks get executed based on the ExitStatus of the previous task. This is done by specifying an ExitStatus after a task definition, followed by the -> operator and the task definition that should be launched for that result. Example:

Fig. 3: Composed Task Graph with conditional execution

The graph representation for this composed task is as follows:

task-1 'FAILED'->task-2 'COMPLETED'->task-3 && task-4

In the above example, when the composed task is triggered, it triggers task-1. Once task-1 returns an exit status, the next task is executed according to its value -

  1. If exitStatus = FAILED, task-2 will be triggered
  2. If exitStatus = COMPLETED, task-3 will be triggered
  3. If exitStatus is anything other than the above two, task-4 will be executed

After the execution of task-2, task-3, or task-4, the composed task will exit and execution will end.
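
For completeness, the DSL also supports a wildcard transition ('*') that, per the official documentation, matches any exit status not handled by an earlier transition; so the same flow could be written explicitly as:

task-1 'FAILED'->task-2 'COMPLETED'->task-3 '*'->task-4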

Sync Node

The sync node is a control node. It makes the composed task wait for all the tasks before the sync node to finish executing; only after they are complete does it trigger the tasks after the sync node.

Fig. 4: A basic composed task to represent the use of a Sync node.

DSL for the above graph -

<task-1: apn || task-2: apn> && <task-3: apn || task-4: apn>

In the above graph, task-1 and task-2 are executed in parallel. Once both of them have completed, the composed task triggers task-3 and task-4 in parallel. This is how the sync node helps control the flow of execution.
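
A note on the task-1: apn syntax in the DSL above: each node is a label applied to the same registered task application (apn), since SCDF requires unique labels when one app appears multiple times in a definition. Creating and launching this graph from the shell would then look roughly like this (sync-demo is an illustrative name):

# The same app, apn, appears four times under four distinct labels
dataflow:> task create sync-demo --definition "<task-1: apn || task-2: apn> && <task-3: apn || task-4: apn>"
dataflow:> task launch sync-demo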

Conclusion

Using the above two examples, you can combine the operators and transitions to create any kind of graph that suits your use case. Composed tasks ended up resolving all the drawbacks we had previously come across: our pipelines are more efficient, more reliable, and require less manual intervention. Overall, composed tasks shaved at least 30% off the time taken by our data pipelines, and they made it easier for us to handle failures and debug issues.
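
Tying this back to the original scheduling problem, a single schedule on the composed task replaces the staggered per-task schedules sketched earlier (the names and cron expression are illustrative):

# One schedule for the whole DAG; SCDF handles the ordering internally
dataflow:> task schedule create --definitionName my-composed-task --name pipeline-daily --expression "0 0 * * *"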
