Tutorial | Function usage and extension in DolphinScheduler

Apache DolphinScheduler | Published in CodeX | Feb 9, 2023

Author | Wang Jipeng, Apache DolphinScheduler Committer

Apache DolphinScheduler is an open-source, distributed, and scalable visual DAG workflow task scheduling system. It is suitable for enterprise-level scenarios and provides a visual solution for operating tasks, workflows, and the entire data processing life cycle.

When using Apache DolphinScheduler, we often write time expressions to specify business dates or execution times, and apply them to individual tasks or to entire workflows.

For example, to set a global parameter in a workflow that outputs the date one month before today, we would use the following function: $[add_months(yyyyMMdd,-1)]

When the task runs, the parameter resolves to the date one month earlier, formatted as yyyyMMdd.

In other words, add_months shifts the time by the number of months given in its second argument and outputs the result in the specified format. For other time ranges, such as next month, last year, or next week, the Apache DolphinScheduler documentation describes the following expressions:

The add_months() function adds or subtracts months. Its first argument, yyyyMMdd, specifies the format of the returned time; its second argument is the number of months to add (positive) or subtract (negative).

  • N years later: $[add_months(yyyyMMdd,12*N)]
  • N years earlier: $[add_months(yyyyMMdd,-12*N)]
  • N months later: $[add_months(yyyyMMdd,N)]
  • N months earlier: $[add_months(yyyyMMdd,-N)]
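To make the semantics of $[add_months(yyyyMMdd,N)] concrete, here is a minimal Java sketch written with java.time. The class and method names are hypothetical, and this is only an illustration of the documented behavior: DolphinScheduler's own implementation may handle edge cases such as month ends differently.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;

// Hypothetical illustration of add_months(format, N); not DolphinScheduler's code.
final class AddMonthsSketch {

    private AddMonthsSketch() {
    }

    // Shifts the base date by the given number of months and formats the result.
    // LocalDate.plusMonths clamps to the last valid day of the target month,
    // e.g. 31 March minus one month gives 28 February in a non-leap year.
    static String addMonths(LocalDate base, String pattern, int months) {
        return base.plusMonths(months).format(DateTimeFormatter.ofPattern(pattern));
    }

    // "N years later/earlier" is expressed as 12*N or -12*N months.
    static String addYears(LocalDate base, String pattern, int years) {
        return addMonths(base, pattern, 12 * years);
    }
}
```

For a base date of 2023-02-09, addMonths(base, "yyyyMMdd", -1) returns "20230109", which corresponds to $[add_months(yyyyMMdd,-1)].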

For offsets in days, weeks, hours, or minutes, add or subtract the number directly after the custom time format using “+” or “-”:

  • N weeks later: $[yyyyMMdd+7*N]
  • N weeks earlier: $[yyyyMMdd-7*N]
  • N days later: $[yyyyMMdd+N]
  • N days earlier: $[yyyyMMdd-N]
  • N hours later: $[HHmmss+N/24]
  • N hours earlier: $[HHmmss-N/24]
  • N minutes later: $[HHmmss+N/24/60]
  • N minutes earlier: $[HHmmss-N/24/60]
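The list above implies that the offset is counted in days: N/24 of a day is N hours, and N/24/60 of a day is N minutes. The minimal Java sketch below models that reading with java.time. The class name is hypothetical, and rounding the fractional day to whole seconds is an assumption of this sketch, not documented DolphinScheduler behavior.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// Hypothetical illustration of the "+/- offset" rule: the offset is a
// (possibly fractional) number of days. Not DolphinScheduler's code.
final class DayOffsetSketch {

    private static final long SECONDS_PER_DAY = 24L * 60 * 60;

    private DayOffsetSketch() {
    }

    // Shifts the base time by a fractional number of days, rounded to whole seconds.
    // Callers must pass a double: in Java, 3 / 24 is integer division and yields 0,
    // so write 3 / 24.0 for "3 hours".
    static String shift(LocalDateTime base, String pattern, double days) {
        long seconds = Math.round(days * SECONDS_PER_DAY);
        return base.plusSeconds(seconds).format(DateTimeFormatter.ofPattern(pattern));
    }
}
```

With a base time of 2023-02-09 10:00:00, shift(base, "yyyyMMdd", 7 * 2) models $[yyyyMMdd+7*2] and returns "20230223", while shift(base, "HHmmss", 3 / 24.0) models $[HHmmss+3/24] and returns "130000".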

These functions cover most business needs in Apache DolphinScheduler. But what should we do when the built-in functions can no longer support a new business requirement?

Take, for example, getting the last five days of the first month of the previous quarter. If the current date falls in Q4 2022, the previous quarter is Q3 and its first month is July 2022, so the output should be 20220727, 20220728, 20220729, 20220730, 20220731. A slightly more complicated requirement is getting the last five working days of that month; the output should then be 20220725, 20220726, 20220727, 20220728, 20220729, because 20220730 and 20220731 fall on a weekend.
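The date logic of this example can be sketched in Java as follows. The class and method names are hypothetical, and the sketch assumes that only Saturdays and Sundays are non-working days; a real workday calendar would also have to account for public holidays.

```java
import java.time.DayOfWeek;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.time.temporal.TemporalAdjusters;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch: last n (working) days of the first month of the previous quarter.
final class PreviousQuarterSketch {

    private static final DateTimeFormatter FORMAT = DateTimeFormatter.ofPattern("yyyyMMdd");

    private PreviousQuarterSketch() {
    }

    // First day of the first month of the quarter before the one containing `today`.
    static LocalDate firstMonthOfPreviousQuarter(LocalDate today) {
        int quarterStartMonth = (today.getMonthValue() - 1) / 3 * 3 + 1;
        return LocalDate.of(today.getYear(), quarterStartMonth, 1).minusMonths(3);
    }

    // Assumption: weekends are the only non-working days.
    static boolean isWorkday(LocalDate day) {
        DayOfWeek dow = day.getDayOfWeek();
        return dow != DayOfWeek.SATURDAY && dow != DayOfWeek.SUNDAY;
    }

    // Walks backwards from the month end, collects up to n days,
    // and returns them in ascending order.
    static List<String> lastDays(LocalDate today, int n, boolean workdaysOnly) {
        LocalDate monthStart = firstMonthOfPreviousQuarter(today);
        LocalDate day = monthStart.with(TemporalAdjusters.lastDayOfMonth());
        List<String> result = new ArrayList<>();
        while (result.size() < n && !day.isBefore(monthStart)) {
            if (!workdaysOnly || isWorkday(day)) {
                result.add(day.format(FORMAT));
            }
            day = day.minusDays(1);
        }
        Collections.reverse(result);
        return result;
    }
}
```

With today set to 2022-10-15, lastDays(today, 5, false) yields 20220727 to 20220731, and lastDays(today, 5, true) yields 20220725 to 20220729, matching the two expected outputs above.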

This scenario involves several factors: the time range, the time offset, the range of the result, and whether a given day is a working day. Apache DolphinScheduler currently only supports time functions based on calendar days, so we need to extend the function calculation to support such complex business scenarios.

Fortunately, Apache DolphinScheduler provides an extension point in the dolphinscheduler-service module, under the src/main/java/org/apache/dolphinscheduler/service/expand path. Its interface has two methods: timeFunctionNeedExpand determines whether the current function should be calculated by the extension, and timeFunctionExtension evaluates the extended function expression and returns the result.

Time function extension
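public interface TimePlaceholderResolverExpandService {
    // Returns true if this placeholder should be evaluated by the extension
    boolean timeFunctionNeedExpand(String placeholderName);

    // Evaluates the extended function expression and returns the result
    String timeFunctionExtension(Integer processInstanceId, String timeZone, String placeholderName);
}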

A default implementation of this interface is provided in the same directory. By default, no function requires extended calculation, and the extension method returns null.

Time function default extension implementation
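import org.springframework.stereotype.Component;

@Component
public class TimePlaceholderResolverExpandServiceImpl implements TimePlaceholderResolverExpandService {

    // By default, no placeholder is handled by the extension
    @Override
    public boolean timeFunctionNeedExpand(String placeholderName) {
        return false;
    }

    // No extended functions are defined, so there is nothing to calculate
    @Override
    public String timeFunctionExtension(Integer processInstanceId, String timeZone, String placeholderName) {
        return null;
    }
}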
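To show how the two methods are meant to cooperate, here is a hypothetical resolver sketch: it asks the extension first and falls back to the built-in time functions otherwise. This is not DolphinScheduler's actual resolver code; the class name and the Function used to stand in for the built-in functions are assumptions for illustration.

```java
import java.util.function.Function;

// Hypothetical dispatch sketch; not DolphinScheduler's resolver.
final class ResolverDispatchSketch {

    // Same shape as the extension interface above.
    interface TimePlaceholderResolverExpandService {
        boolean timeFunctionNeedExpand(String placeholderName);

        String timeFunctionExtension(Integer processInstanceId, String timeZone, String placeholderName);
    }

    private final TimePlaceholderResolverExpandService expandService;
    // Stand-in for the built-in time functions such as add_months
    private final Function<String, String> builtInResolver;

    ResolverDispatchSketch(TimePlaceholderResolverExpandService expandService,
                           Function<String, String> builtInResolver) {
        this.expandService = expandService;
        this.builtInResolver = builtInResolver;
    }

    // The extension gets the first chance; otherwise the built-in logic applies.
    String resolve(Integer processInstanceId, String timeZone, String placeholderName) {
        if (expandService.timeFunctionNeedExpand(placeholderName)) {
            return expandService.timeFunctionExtension(processInstanceId, timeZone, placeholderName);
        }
        return builtInResolver.apply(placeholderName);
    }
}
```

With an extension like the default implementation, which always returns false, every placeholder takes the built-in path, so installing the default has no visible effect.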

With the interface and its default implementation in place, half of the work for complex business time calculations is done. Next, we write a custom implementation of the extension methods:

Custom function implementation 
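import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CustomerTimePlaceholderResolverExpandServiceImpl implements TimePlaceholderResolverExpandService {

    private static final Logger log = LoggerFactory.getLogger(CustomerTimePlaceholderResolverExpandServiceImpl.class);

    @Override
    public boolean timeFunctionNeedExpand(String placeholderName) {
        // Add the logic that identifies extension functions here. Returning true
        // routes every time placeholder to this class, so narrow it to your custom functions.
        return true;
    }

    @Override
    public String timeFunctionExtension(Integer processInstanceId, String timeZone, String placeholderName) {
        try {
            // Add the parsing and calculation logic of user-defined functions here
            // and return the calculated result
        } catch (Exception e) {
            log.error("time function extension error, placeholder: {}", placeholderName, e);
        }
        return null;
    }
}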
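As a concrete but hypothetical instance of such an implementation, the sketch below handles a made-up function prev_quarter_last_workdays(yyyyMMdd,5) for the scenario described earlier. It assumes the placeholder name arrives without the $[ ] delimiters, treats only weekends as non-working days, and injects a java.time.Clock so the result can be tested. The function name, class name, and constructor are illustrative, not part of DolphinScheduler.

```java
import java.time.Clock;
import java.time.DayOfWeek;
import java.time.LocalDate;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.time.temporal.TemporalAdjusters;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Same shape as the extension interface shown earlier.
interface TimePlaceholderResolverExpandService {
    boolean timeFunctionNeedExpand(String placeholderName);

    String timeFunctionExtension(Integer processInstanceId, String timeZone, String placeholderName);
}

// Hypothetical function prev_quarter_last_workdays(<pattern>,<n>): the last n working days
// (weekends excluded) of the first month of the previous quarter, comma-separated.
final class PrevQuarterWorkdaysExpandService implements TimePlaceholderResolverExpandService {

    private static final Pattern FUNCTION =
            Pattern.compile("^prev_quarter_last_workdays\\((\\w+),\\s*(\\d+)\\)$");

    private final Clock clock;

    PrevQuarterWorkdaysExpandService(Clock clock) {
        this.clock = clock;
    }

    @Override
    public boolean timeFunctionNeedExpand(String placeholderName) {
        // Only claim placeholders that match the custom function syntax.
        return placeholderName != null && FUNCTION.matcher(placeholderName.trim()).matches();
    }

    @Override
    public String timeFunctionExtension(Integer processInstanceId, String timeZone, String placeholderName) {
        Matcher m = FUNCTION.matcher(placeholderName.trim());
        if (!m.matches()) {
            return null;
        }
        DateTimeFormatter format = DateTimeFormatter.ofPattern(m.group(1));
        int n = Integer.parseInt(m.group(2));
        ZoneId zone = timeZone == null ? clock.getZone() : ZoneId.of(timeZone);
        LocalDate today = LocalDate.now(clock.withZone(zone));

        // First month of the previous quarter
        int quarterStartMonth = (today.getMonthValue() - 1) / 3 * 3 + 1;
        LocalDate monthStart = LocalDate.of(today.getYear(), quarterStartMonth, 1).minusMonths(3);

        // Walk back from the month end, keeping weekdays only
        List<String> days = new ArrayList<>();
        for (LocalDate d = monthStart.with(TemporalAdjusters.lastDayOfMonth());
             days.size() < n && !d.isBefore(monthStart); d = d.minusDays(1)) {
            DayOfWeek dow = d.getDayOfWeek();
            if (dow != DayOfWeek.SATURDAY && dow != DayOfWeek.SUNDAY) {
                days.add(d.format(format));
            }
        }
        Collections.reverse(days);
        return String.join(",", days);
    }
}
```

Because timeFunctionNeedExpand matches only the custom syntax, built-in expressions such as add_months(yyyyMMdd,-1) are still left to the default logic.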

By implementing custom calculation logic, we can meet the needs of the business scenario. Keep in mind, however, that as business requirements grow more complex and varied, the extension may need more parameters, and changing the method signature each time makes maintaining the existing function calculations unnecessarily complicated.

I recommend wrapping the required parameters in a single aggregate object. However the business changes later, you only need to add fields for the new scenario. The code is as follows:

Time function extension 
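public interface TimePlaceholderResolverExpandService {
    boolean timeFunctionNeedExpand(String placeholderName);

    // All inputs travel in one object, so adding fields does not change the signature
    String timeFunctionExtension(FunctionExpandContent functionExpandContent);
}

import java.util.Map;

public class FunctionExpandContent {
    private boolean global;
    private String parameters;
    private Integer processInstanceId;
    private String timezone;
    private String placeholderName;
    // Type arguments assumed; they were lost in the original listing
    private Map<String, String> paramsMap;
    // Getters and setters omitted
}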
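To illustrate why the aggregate object helps, here is a hypothetical sketch in which a new requirement, skipping holidays, is met by reading an extra workflow parameter from paramsMap instead of changing the method signature. The function name prev_workday(), the holidays parameter, and the assumption that the business date is available in paramsMap under system.biz.date in yyyyMMdd format are all illustrative; the class is also trimmed to the fields it uses.

```java
import java.time.DayOfWeek;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Trimmed version of the aggregate object with accessors for the fields used below.
final class FunctionExpandContent {
    private String placeholderName;
    private Map<String, String> paramsMap = new HashMap<>();

    String getPlaceholderName() { return placeholderName; }
    void setPlaceholderName(String placeholderName) { this.placeholderName = placeholderName; }
    Map<String, String> getParamsMap() { return paramsMap; }
    void setParamsMap(Map<String, String> paramsMap) { this.paramsMap = paramsMap; }
}

interface TimePlaceholderResolverExpandService {
    boolean timeFunctionNeedExpand(String placeholderName);

    String timeFunctionExtension(FunctionExpandContent functionExpandContent);
}

// Hypothetical function prev_workday(): the working day before the business date,
// skipping weekends and any yyyyMMdd dates listed in a hypothetical "holidays" parameter.
final class PrevWorkdayExpandService implements TimePlaceholderResolverExpandService {

    private static final DateTimeFormatter FORMAT = DateTimeFormatter.ofPattern("yyyyMMdd");

    @Override
    public boolean timeFunctionNeedExpand(String placeholderName) {
        return "prev_workday()".equals(placeholderName);
    }

    @Override
    public String timeFunctionExtension(FunctionExpandContent content) {
        Map<String, String> params = content.getParamsMap();
        // Assumption: the business date is passed as system.biz.date (yyyyMMdd).
        LocalDate day = LocalDate.parse(params.get("system.biz.date"), FORMAT);
        Set<String> holidays = new HashSet<>(Arrays.asList(params.getOrDefault("holidays", "").split(",")));
        // Step back at least one day, then keep stepping over weekends and holidays.
        do {
            day = day.minusDays(1);
        } while (day.getDayOfWeek() == DayOfWeek.SATURDAY
                || day.getDayOfWeek() == DayOfWeek.SUNDAY
                || holidays.contains(day.format(FORMAT)));
        return day.format(FORMAT);
    }
}
```

If the business date is 20221003 (a Monday) and holidays is 20220930, the result is 20220929; supporting holidays needed a new parameter value, not a new method signature.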

