Azure Data Factory: Solving Parallel Execution Issue

Vanderson Gonçalves
4 min read · Oct 15, 2023


Photo by Barna Bartis on Unsplash

Recently, I encountered an issue while developing an Azure Data Factory pipeline that retrieves data from an API, and something unexpected happened.

To set the scene, let me explain the API and the data processing pipeline. The primary goal of this pipeline is to consume an API that exposes survey data. The first step is to call the API to retrieve the list of surveys, which is then stored in a delta table. Afterward, we iterate over this list, making an API call for each survey to obtain its details.
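In plain code, the flow looks roughly like the sketch below. This is a minimal Python analogy of what the pipeline does, not the real implementation; the base URL, endpoint paths, and field names are assumptions for illustration only.

import json
import requests

BASE_URL = "https://api.example.com"  # hypothetical base URL, not the real API

# Step 1: fetch the list of surveys (the pipeline lands this in a delta table).
surveys = requests.get(f"{BASE_URL}/surveys", timeout=30).json()["values"]

# Step 2: call the detail endpoint once per survey and persist each response
# (the pipeline does this with a Copy activity inside a ForEach loop).
for survey in surveys:
    detail = requests.get(f"{BASE_URL}/surveys/{survey['Id']}", timeout=30).json()
    with open(f"survey_{survey['Id']}.json", "w") as f:
        json.dump(detail, f)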

Problem

Since we needed to retrieve data for a hundred surveys, the pipeline was built to run in parallel, with the ForEach activity set to a batch count of 25 to comply with the API's request limits.

When testing the pipeline, I noticed that some of the files generated by the second API call were incorrect: a survey's file contained data from another survey.

"activities": [
{
"name": "Lookup Surveys",
"type": "Lookup",
"dependsOn": [],
"policy": {
"timeout": "0.12:00:00",
"retry": 0,
"retryIntervalInSeconds": 30,
"secureOutput": false,
"secureInput": false
},
"userProperties": [],
"typeProperties": {
"source": {
"type": "AzureDatabricksDeltaLakeSource",
"query": {
"value": "@concat(\n string('SELECT `values.Id` FROM raw.surveys WHERE dt_ingestion = '''),\n pipeline().parameters.filedate,\n string('''')\n)",
"type": "Expression"
}
},
"dataset": {
"referenceName": "DS_DELTALAKE",
"type": "DatasetReference",
"parameters": {
"Database": "raw",
"table": "surveys"
}
},
"firstRowOnly": false
}
},
{
"name": "ForEach Survey",
"type": "ForEach",
"dependsOn": [
{
"activity": "Lookup Surveys",
"dependencyConditions": [
"Succeeded"
]
}
],
"userProperties": [],
"typeProperties": {
"items": {
"value": "@activity('Lookup Surveys').output.value",
"type": "Expression"
},
"isSequential": false,
"batchCount": 25,
"activities": [
{
"name": "Copy Data API Survey",
"type": "Copy",
"dependsOn": [
{
"activity": "Set url_endpoint",
"dependencyConditions": [
"Succeeded"
]
}
],
"policy": {
"timeout": "0.12:00:00",
"retry": 0,
"retryIntervalInSeconds": 30,
"secureOutput": false,
"secureInput": false
},
"userProperties": [],
"typeProperties": {
"source": {
"type": "RestSource",
"httpRequestTimeout": "00:01:40",
"requestInterval": "00.00:00:00.010",
"requestMethod": "GET",
"paginationRules": {
"AbsoluteUrl": "$['Links']['Next']['Href']"
}
},
"sink": {
"type": "JsonSink",
"storeSettings": {
"type": "AzureBlobFSWriteSettings"
},
"formatSettings": {
"type": "JsonWriteSettings",
"filePattern": "setOfObjects"
}
},
"enableStaging": false
},
"inputs": [
{
"referenceName": "DS_API",
"type": "DatasetReference",
"parameters": {
"url_path": {
"value": "@variables('url_endpoint')",
"type": "Expression"
}
}
}
],
"outputs": [
{
"referenceName": "DS_ADLS_JSON",
"type": "DatasetReference",
"parameters": {
"container": {
"value": "@pipeline().parameters.container",
"type": "Expression"
},
"file": {
"value": "@concat(\n pipeline().parameters.table_sink, '_',\n string(json(string(item()))['values.Id']), '_',\n replace(pipeline().parameters.filedate, '-', ''),\n '.json'\n)",
"type": "Expression"
},
"folder": {
"value": "@pipeline().parameters.path",
"type": "Expression"
}
}
}
]
},
{
"name": "Set url_endpoint",
"type": "SetVariable",
"dependsOn": [],
"policy": {
"secureOutput": false,
"secureInput": false
},
"userProperties": [],
"typeProperties": {
"variableName": "url_endpoint",
"value": {
"value": "@replace(\n pipeline().parameters.query,\n '{survey_id}',\n string(json(string(item()))['values.Id'])\n)",
"type": "Expression"
}
}
}
]
}
}
]

The problem was the Set Variable activity inside the ForEach loop. According to the Data Factory documentation, this is a limitation of parallel execution in the ForEach activity: variables are global to the pipeline rather than scoped to each iteration, so during parallel execution multiple iterations write to the same variable at the same time. By the time a Copy activity read url_endpoint, another iteration could already have overwritten it with a different survey's URL.
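The race is easier to see outside Data Factory. The sketch below is a Python analogy, not ADF code: every worker writes its value into one shared variable before reading it back, so a worker can end up reading a value written by a different worker, just as parallel ForEach iterations did with url_endpoint. The URL format and IDs are made up for illustration.

import time
from concurrent.futures import ThreadPoolExecutor

url_endpoint = None  # one shared "pipeline variable" for every iteration

def process_survey(survey_id: str) -> str:
    global url_endpoint
    # "Set Variable" step: every parallel iteration writes the same global.
    url_endpoint = f"https://api.example.com/surveys/{survey_id}"
    time.sleep(0.01)  # another iteration may overwrite it in this window
    # "Copy" step: reads the shared variable, possibly another iteration's value.
    return f"survey {survey_id} fetched from {url_endpoint}"

with ThreadPoolExecutor(max_workers=25) as pool:
    for result in pool.map(process_survey, [str(i) for i in range(5)]):
        print(result)  # some lines report a URL belonging to a different survey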

Solution

The workarounds suggested by the documentation include using a sequential ForEach or calling an Execute Pipeline activity inside the ForEach loop. The first option wasn't ideal because it would increase execution time, while the latter would add unnecessary complexity. In our case, the chosen solution was to eliminate the Set Variable activity and build the URL directly with an expression in the url_path parameter of the Copy activity.
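In the Python analogy from the previous section, the fix amounts to deriving the URL from the iteration's own item instead of going through shared state; the real change, visible in the pipeline JSON below, moves the same replace() expression into the url_path dataset parameter of the Copy activity.

def process_survey_fixed(survey_id: str) -> str:
    # The URL is a local value computed from this iteration's item, so parallel
    # iterations can no longer overwrite each other.
    url = f"https://api.example.com/surveys/{survey_id}"
    return f"survey {survey_id} fetched from {url}"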

"activities": [
{
"name": "Lookup Surveys",
"type": "Lookup",
"dependsOn": [],
"policy": {
"timeout": "0.12:00:00",
"retry": 0,
"retryIntervalInSeconds": 30,
"secureOutput": false,
"secureInput": false
},
"userProperties": [],
"typeProperties": {
"source": {
"type": "AzureDatabricksDeltaLakeSource",
"query": {
"value": "@concat(\n string('SELECT `values.Id` FROM raw.surveys WHERE dt_ingestion = '''),\n pipeline().parameters.filedate,\n string('''')\n)",
"type": "Expression"
}
},
"dataset": {
"referenceName": "DS_DELTALAKE",
"type": "DatasetReference",
"parameters": {
"Database": "raw",
"table": "surveys"
}
},
"firstRowOnly": false
}
},
{
"name": "ForEach Survey",
"type": "ForEach",
"dependsOn": [
{
"activity": "Lookup Surveys",
"dependencyConditions": [
"Succeeded"
]
}
],
"userProperties": [],
"typeProperties": {
"items": {
"value": "@activity('Lookup Surveys').output.value",
"type": "Expression"
},
"isSequential": false,
"batchCount": 25,
"activities": [
{
"name": "Copy Data API Survey",
"type": "Copy",
"dependsOn": [],
"policy": {
"timeout": "0.12:00:00",
"retry": 0,
"retryIntervalInSeconds": 30,
"secureOutput": false,
"secureInput": false
},
"userProperties": [],
"typeProperties": {
"source": {
"type": "RestSource",
"httpRequestTimeout": "00:01:40",
"requestInterval": "00.00:00:00.010",
"requestMethod": "GET",
"paginationRules": {
"AbsoluteUrl": "$['Links']['Next']['Href']"
}
},
"sink": {
"type": "JsonSink",
"storeSettings": {
"type": "AzureBlobFSWriteSettings"
},
"formatSettings": {
"type": "JsonWriteSettings",
"filePattern": "setOfObjects"
}
},
"enableStaging": false
},
"inputs": [
{
"referenceName": "DS_API",
"type": "DatasetReference",
"parameters": {
"url_path": {
"value": "@replace(\n pipeline().parameters.query,\n '{survey_id}',\n string(json(string(item()))['values.Id'])\n)",
"type": "Expression"
}
}
}
],
"outputs": [
{
"referenceName": "DS_ADLS_JSON",
"type": "DatasetReference",
"parameters": {
"container": {
"value": "@pipeline().parameters.container",
"type": "Expression"
},
"file": {
"value": "@concat(\n pipeline().parameters.table_sink, '_',\n string(json(string(item()))['values.Id']), '_',\n replace(pipeline().parameters.filedate, '-', ''),\n '.json'\n)",
"type": "Expression"
},
"folder": {
"value": "@pipeline().parameters.path",
"type": "Expression"
}
}
}
]
}
]
}
}
]

After implementing this pipeline change, we successfully resolved the issue, and our data retrieval process is now operating as expected.
