Custom queue mechanism for Data Factory pipelines
As a data engineer, you might come across the requirement to queue your Data Factory pipelines, to ensure that no other instance of the same pipeline is already running. For example, your pipeline might run a configuration activity on a database or storage account dedicated to the current job; running the same pipeline in parallel would cause those configurations to conflict. Currently, there is no out-of-the-box feature in Data Factory to queue the runs of the same pipeline. A solution proposed on the web is to use a Web Activity in a so-called proxy pipeline to query the running instances of the pipeline. The job is executed only if no active instances are running. In this blog post, we demonstrate a possible implementation of this approach.
Scenario
When triggered multiple times, pipeline A is only allowed to run sequentially. A proxy pipeline checks whether pipeline A is available. If it is, the proxy triggers pipeline A; otherwise, it waits 1 minute and checks again.
Step-by-step guide
- Create a target pipeline
Create the pipeline that the proxy pipeline will trigger. This is the pipeline for which only one instance at a time is allowed to run. We call it target_pipeline. For the purpose of this blog post, our pipeline has a single activity: a Wait of 2 minutes.
- Create a proxy pipeline
We call this pipeline pipeline_check_availability. It has one pipeline variable and two activities, the first of which contains several subactivities.
Pipeline Variables
activeRuns; the number of active runs of the target pipeline. Type: String (the expressions below convert it with int() and string()). Default value: 1.
Activities
Until; as long as the target pipeline is not available, we repeatedly run activities that check its availability.
On the Settings tab, add the expression that defines when this loop stops:
@equals(int(variables('activeRuns')),0)
In other words, until the value of the variable activeRuns equals 0, we repeatedly run the activities within the Until block. Since the default value of this variable is 1, the first thing the pipeline does is check the target pipeline’s availability.
We add four subactivities to the Until activity:
1. Web Activity, named Get Active Runs, to get the active runs: call the Azure Management REST API to query the runs of the target pipeline. Settings:
- URL: https://management.azure.com/subscriptions/{SubscriptionId}/resourceGroups/{ResourceGroupName}/providers/Microsoft.DataFactory/factories/{DataFactoryName}/queryPipelineRuns?api-version=2018-06-01
- Method: POST
- Body, to filter on the target pipeline:
{
  "lastUpdatedAfter": "@{adddays(utcnow(),-2)}",
  "lastUpdatedBefore": "@{utcnow()}",
  "filters": [
    {
      "operand": "PipelineName",
      "operator": "Equals",
      "values": ["target_pipeline"]
    }
  ]
}
- Authentication: choose your preferred authentication method, for example a service principal or a managed identity.
- Resource: https://management.core.windows.net/
2. Filter activity, named Filter Running Pipelines, to filter the output of the Web Activity:
On the Settings tab, set the Items and Condition values:
Items; use the output of the Web Activity:
@activity('Get Active Runs').output.value
Condition; filter on status “InProgress” or “Queued”:
@or(equals(item().status,'InProgress'),equals(item().status,'Queued'))
3. Set Variable activity, to set the value of the variable activeRuns:
We use the output of the Filter activity to count the number of active runs and assign this count to activeRuns. As explained above, this variable is what the Until activity uses to determine whether the target pipeline is available.
@string(activity('Filter Running Pipelines').output.FilteredItemsCount)
4. If Condition activity, to determine whether the target pipeline is available. If the value of activeRuns is greater than 0, the target pipeline is not available. In that case, the True branch runs a Wait activity set to 1 minute (60 seconds). After that minute, the Until activity starts another iteration, because the value of activeRuns is still not 0. If the value of activeRuns is 0, the False branch runs no activities and the Until loop ends. Use the following expression:
@greater(int(variables('activeRuns')),0)
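To make the Web Activity, Filter and Set Variable steps above concrete outside Data Factory, here is a minimal Python sketch, assuming you call the same queryPipelineRuns endpoint yourself. It builds the URL and request body used by the Web Activity, then applies the same InProgress/Queued filter and count to the value array of the response. The helper names (build_query_url, build_query_body, count_active_runs) are hypothetical; authentication, the HTTP call itself and paging via the continuationToken that the API returns when more results exist are left out.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

# Hypothetical helpers mirroring the Web Activity, Filter and Set Variable steps.
API_VERSION = "2018-06-01"
ACTIVE_STATUSES = {"InProgress", "Queued"}  # statuses kept by the Filter activity


def build_query_url(subscription_id: str, resource_group: str, factory_name: str) -> str:
    """Return the queryPipelineRuns URL used by the Web Activity."""
    return (
        "https://management.azure.com"
        f"/subscriptions/{subscription_id}"
        f"/resourceGroups/{resource_group}"
        "/providers/Microsoft.DataFactory"
        f"/factories/{factory_name}"
        f"/queryPipelineRuns?api-version={API_VERSION}"
    )


def build_query_body(pipeline_name: str, now: Optional[datetime] = None) -> dict:
    """Return the request body: runs of one pipeline updated in the last 2 days."""
    now = now or datetime.now(timezone.utc)
    return {
        "lastUpdatedAfter": (now - timedelta(days=2)).isoformat(),  # adddays(utcnow(),-2)
        "lastUpdatedBefore": now.isoformat(),                        # utcnow()
        "filters": [
            {"operand": "PipelineName", "operator": "Equals", "values": [pipeline_name]}
        ],
    }


def count_active_runs(response: dict) -> str:
    """Keep InProgress/Queued runs and return their count as a string,
    like @string(...FilteredItemsCount) for the String variable activeRuns."""
    active = [run for run in response.get("value", []) if run.get("status") in ACTIVE_STATUSES]
    return str(len(active))
```

For example, a response whose value array holds one InProgress, one Succeeded and one Queued run yields "2", so the proxy would keep waiting.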
Execute Pipeline; after the Until activity, add an Execute Pipeline activity that triggers target_pipeline. Once the Until loop ends because activeRuns equals 0, the target pipeline is triggered.
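The control flow of the whole proxy pipeline (Until loop, If Condition with Wait, then Execute Pipeline) can be sketched in Python as follows. This is an illustration under stated assumptions, not a Data Factory implementation: run_proxy, get_active_runs and trigger_target are hypothetical names, the check and the trigger are passed in as functions, and sleep is injectable so the loop can be tested without actually waiting.

```python
import time
from typing import Callable


def run_proxy(
    get_active_runs: Callable[[], str],
    trigger_target: Callable[[], None],
    wait_seconds: float = 60,
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Hypothetical mirror of pipeline_check_availability.

    Returns the number of loop iterations, for illustration only.
    """
    active_runs = "1"  # default value of the String variable activeRuns
    iterations = 0
    # Until: keep looping until int(activeRuns) == 0
    while int(active_runs) != 0:
        iterations += 1
        active_runs = get_active_runs()  # Web Activity + Filter + Set Variable
        if int(active_runs) > 0:         # If Condition: target pipeline busy
            sleep(wait_seconds)          # Wait activity (1 minute by default)
    trigger_target()                     # Execute Pipeline after the Until loop
    return iterations
```

With checks that return "1", "1" and then "0", the loop runs three iterations, waits twice and then triggers the target once. Note that checking and triggering are separate steps, so two proxy runs that check at the same moment could both see zero active runs.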
Next steps
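- Determine the best wait time for your proxy pipeline. In this blog post we wait 1 minute before checking again, but this interval is unlikely to suit a production environment; tune it to how long your target pipeline typically runs.
- When working with multiple environments, parameterize environment-specific values, such as the subscription ID, resource group name and Data Factory name in the Web Activity URL.
- You can check for more running pipelines by adding their names to the values array of the filter in the body of the Web Activity (using the In operator instead of Equals when matching several names).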
Azure Tutorials frequently publishes tutorials, best practices, insights or updates about Azure Services, to contribute to the Azure Community. Azure Tutorials is driven by two enthusiastic Azure Cloud Engineers, combining over 15 years of IT experience in several domains. Stay tuned for weekly blog updates and follow us if you are interested!
https://www.linkedin.com/company/azure-tutorials