Parallel batch processing with Temporal

Thierry Feuzeu
7 min read · Apr 7, 2025

Temporal is a durable execution platform that makes it easier to build and operate reliable, scalable applications. It provides many native SDKs, allowing developers to write the business logic of their applications in a language they are already familiar with.

While the ability of the Temporal SDKs to abstract away the complexities of distributed systems makes them well suited to complex applications, the reliability and scalability they provide can also benefit simpler use cases.

As an example, below is an implementation of a simple parallel batch with the Temporal PHP SDK. Although all the example code is in PHP, the same concepts apply to the other supported languages.

The batch definition

The batch applies the same processing to a given number of items. It takes a batch id and an optional array of options as input. The batch items are processed in parallel, but for each item there is a set of tasks to be executed sequentially.

After all the items are processed, a closing task is executed, for example to notify the batch termination and results availability.

Each sub-task of a batch item can fail, and such a failure must trigger a retry of that single sub-task only, not of those that already completed successfully.
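
The retry requirement can be illustrated without any Temporal code. The following plain PHP sketch only shows the expected behaviour, not how Temporal implements it: the helper names runWithRetry() and processItemTasks() are hypothetical, and the retry loop stands in for what the platform does for each activity.

```php
<?php

/**
 * Runs a single sub-task, retrying it until it succeeds or the
 * attempts are exhausted. Hypothetical helper, for illustration only.
 */
function runWithRetry(callable $task, int $maxAttempts)
{
    for ($attempt = 1; ; $attempt++) {
        try {
            return $task();
        } catch (Throwable $e) {
            if ($attempt >= $maxAttempts) {
                throw $e;
            }
        }
    }
}

/**
 * Runs the sub-tasks of one batch item sequentially. A failing sub-task is
 * retried on its own; the sub-tasks that already succeeded are not re-run.
 *
 * @param array<string, callable> $tasks
 * @return array<string, mixed> The output of each sub-task, keyed by name
 */
function processItemTasks(array $tasks, int $maxAttempts = 3): array
{
    $outputs = [];
    foreach ($tasks as $name => $task) {
        $outputs[$name] = runWithRetry($task, $maxAttempts);
    }
    return $outputs;
}

// Count how many times each sub-task is executed.
$calls = ['started' => 0, 'process' => 0, 'ended' => 0];

$outputs = processItemTasks([
    'started' => function () use (&$calls) {
        $calls['started']++;
        return true;
    },
    'process' => function () use (&$calls) {
        // Fails on the first attempt, succeeds on the second.
        if (++$calls['process'] < 2) {
            throw new RuntimeException('Temporary failure');
        }
        return 42;
    },
    'ended' => function () use (&$calls) {
        $calls['ended']++;
        return true;
    },
]);
```

In this run the middle sub-task is executed twice, while the first one is executed only once even though the task after it failed.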

The batch workflow

The workflow implements the interface below.

#[WorkflowInterface]
interface SimpleBatchWorkflowInterface
{
    #[WorkflowMethod(name: "SimpleBatch")]
    public function start(int $batchId, array $options);

    #[QueryMethod]
    public function getAvailableResults(): array;

    #[QueryMethod]
    public function getPendingTasks(): array;
}

In addition to the workflow start() method, there are two query methods that give real-time access to the workflow execution status.

This is the workflow method skeleton.

    public function start(int $batchId, array $options)
    {
        // Get the item ids
        ...

        foreach($itemIds as $itemId)
        {
            // Process the batch item asynchronously.
            Workflow::async(
                function() {
                    // Run the batch item tasks.
                    ...
                }
            );
        }

        // Wait for all the async calls to terminate.
        ...

        return $results;
    }

The list of items to be processed is not included in the workflow input, so it first needs to be retrieved, using the batch id and the options. The functions processing the items are then started asynchronously. Finally, the workflow waits for all these functions to terminate, runs a post-processing task, and returns their results in an array.

The batch item processing

The Workflow::async() function asynchronously executes a set of activities defined in a callback and returns a promise that will be resolved to the value returned by the callback.

In this example, each callback function executes 3 tasks sequentially, as follows.

    // Run the batch item tasks.
    Workflow::async(
        function() use($itemId, $batchId) {
            // Notify the item processing start.
            yield $this->batchActivity->itemProcessingStarted($itemId, $batchId);

            $output = yield $this->batchActivity->processItem($itemId, $batchId);

            // Notify the item processing end.
            yield $this->batchActivity->itemProcessingEnded($itemId, $batchId);

            // The promise returned by Workflow::async() resolves to this value.
            return $output;
        }
    );

In order to demonstrate the failure and retry handling, the processItem() method in the example randomly returns an integer or throws an error.
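
The activity class is not shown in this article. As a rough idea, a processItem() method with that behaviour could look like the sketch below. The class name, the failure rate and the output range are assumptions made for the illustration, and the Temporal activity attributes are left out so that the snippet runs on its own.

```php
<?php

class RandomItemProcessor
{
    /** @var int Chance of failure, as a percentage (assumed value). */
    private int $failureRate;

    public function __construct(int $failureRate = 30)
    {
        $this->failureRate = $failureRate;
    }

    /**
     * Randomly returns an integer or throws an exception.
     */
    public function processItem(int $itemId, int $batchId): int
    {
        if (random_int(1, 100) <= $this->failureRate) {
            throw new RuntimeException(
                "Processing of item $itemId of batch $batchId failed."
            );
        }
        // Made-up output, standing in for a real processing result.
        return random_int(1, 1000);
    }
}
```

Setting the failure rate to 0 or 100 makes the behaviour deterministic, which can be handy when testing the retry options.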

A first implementation

A simple implementation of the batch will start the items processing asynchronously, store the corresponding promises in an array, then yield on the promises to get the results.

    public function start(int $batchId, array $options)
    {
        $itemIds = yield $this->batchActivity->getBatchItemIds($batchId, $options);

        $promises = [];
        foreach($itemIds as $itemId)
        {
            $promises[$itemId] = Workflow::async(
                function() use($itemId, $batchId) {
                    // Set the item processing as started.
                    yield $this->batchActivity->itemProcessingStarted($itemId, $batchId);

                    // This activity randomly throws an exception.
                    $output = yield $this->batchActivity->processItem($itemId, $batchId);

                    // Set the item processing as ended.
                    yield $this->batchActivity->itemProcessingEnded($itemId, $batchId);

                    return $output;
                }
            );
        }

        // Wait for all the async calls to terminate.
        $this->results = [];
        foreach($promises as $itemId => $promise)
        {
            $this->results[$itemId] = yield $promise;
        }
        return $this->results;
    }

But there are two main issues with this implementation.

First, errors are not handled. If a task throws an error, the promise loop will end up hanging indefinitely.

Second, the results are made available in the order in which the item processing was started. A finished batch item will not be yielded until all the items started before it have also finished, which adds a delay between the moment a result is ready and the moment it can actually be read.
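
The second issue can be made concrete with a small computation. The sketch below is plain PHP and does not use Temporal: given hypothetical completion times for each item, listed in the order the items were started, it computes the time at which each result becomes readable when the promises are yielded in start order. The function name and the sample values are made up for the illustration.

```php
<?php

/**
 * @param array<int|string, int> $completionTimes Completion time of each
 *     item (in seconds), in the order the items were started.
 * @return array<int|string, int> Time at which each result can be read
 *     when the promises are yielded one after the other in that order.
 */
function availabilityInStartOrder(array $completionTimes): array
{
    $available = [];
    $latest = 0;
    foreach ($completionTimes as $itemId => $time) {
        // A result cannot be read before all the previous ones were yielded.
        $latest = max($latest, $time);
        $available[$itemId] = $latest;
    }
    return $available;
}

// Item 1 is slow, items 2 and 3 are fast (made-up values).
$completionTimes = [1 => 50, 2 => 10, 3 => 30];
$available = availabilityInStartOrder($completionTimes);
// $available is [1 => 50, 2 => 50, 3 => 50]
```

Here items 2 and 3 are done after 10 and 30 seconds, but their results only become readable after 50 seconds, once the slower item 1 has been yielded.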

Using the Promise API

The Workflow::async() call returns a React promise object that accepts 3 callbacks, which we will use to handle task errors and termination.

    $promise->then(onFulfilled, onRejected)
        ->always(onFulfilledOrRejected);

  • onFulfilled: called if the task has succeeded.
  • onRejected: called if the task has failed.
  • onFulfilledOrRejected: called when the task has ended, regardless of its result.

Using these callbacks, we are now able to store the batch items results as soon as they are available. A single call to Promise::all() can then be used to wait for all the asynchronous calls to terminate.

    $promises = [];
    foreach($itemIds as $itemId)
    {
        $this->pending[$itemId] = true;
        $promises[$itemId] = Workflow::async(
            function() use($itemId, $batchId) {
                ...

                return $output;
            }
        )
        ->then(
            fn($output) => $this->results[$itemId] = [
                'success' => true,
                'output' => $output,
            ],
            fn(Throwable $e) => $this->results[$itemId] = [
                'success' => false,
                'message' => $e->getMessage(),
            ]
        )
        ->always(fn() => $this->pending[$itemId] = false);
    }

    // Wait for all the async calls to terminate.
    yield Promise::all($promises);

Note: the always() method is deprecated in version 3 of the React promise library, where it is replaced by the finally() method.
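
For code that has to run with both versions of the library, one option is to pick the method at runtime. The helper below is a hypothetical sketch, not part of the Temporal SDK or of the React promise library, and the two stub classes only exist to make the snippet runnable without installing the library.

```php
<?php

/**
 * Registers a callback to be called when the promise settles, using
 * finally() when the promise provides it, and always() otherwise.
 * Hypothetical helper, for illustration only.
 */
function onSettled(object $promise, callable $callback): object
{
    return method_exists($promise, 'finally')
        ? $promise->finally($callback)
        : $promise->always($callback);
}

// Stand-in for a promise that only provides always().
class LegacyPromiseStub
{
    public string $called = '';

    public function always(callable $callback): self
    {
        $this->called = 'always';
        return $this;
    }
}

// Stand-in for a promise that provides finally().
class CurrentPromiseStub
{
    public string $called = '';

    public function finally(callable $callback): self
    {
        $this->called = 'finally';
        return $this;
    }
}
```

In the workflow, the end of the chain would then read onSettled($promise->then(...), fn() => $this->pending[$itemId] = false) instead of calling always() directly.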

The workflow and activity options

Some Temporal PHP SDK options need to be set for the workflow to operate properly.

Depending on the number of batch items and workers, there might be a delay between the moment an activity is scheduled and the moment it is started. A timeout for this delay must be set with the withScheduleToStartTimeout() method of the ActivityOptions class.
Appropriate values also need to be set for the duration of the activities, depending on how long they take to run. The corresponding timeouts are set with the withScheduleToCloseTimeout() and withStartToCloseTimeout() methods of the same class.

Since the tasks in the workflow can randomly fail and need to be retried, the options for the retry policy also need to be provided. The maximum number of attempts and the initial and maximum intervals are set using the withMaximumAttempts(), withInitialInterval() and withMaximumInterval() methods of the RetryOptions class.

The above options are set on the activity stub.

    $this->batchActivity = Workflow::newActivityStub(
        SimpleBatchActivityInterface::class,
        ActivityOptions::new()
            ->withStartToCloseTimeout(CarbonInterval::hours(6))
            ->withScheduleToStartTimeout(CarbonInterval::hours(1))
            ->withScheduleToCloseTimeout(CarbonInterval::hours(6))
            ->withRetryOptions(
                RetryOptions::new()
                    ->withMaximumAttempts(100)
                    ->withInitialInterval(CarbonInterval::second(10))
                    ->withMaximumInterval(CarbonInterval::seconds(100))
            )
    );
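
To get a feel for these retry settings, the delays between attempts can be computed by hand. The sketch below assumes the default backoff coefficient of 2.0, since the example does not set one, and the function name is hypothetical. Each retry waits for the previous interval multiplied by the coefficient, capped at the maximum interval.

```php
<?php

/**
 * Computes the wait times (in seconds) before each retry.
 * Hypothetical helper, assuming a backoff coefficient of 2.0 by default.
 *
 * @return int[] One entry per retry, i.e. $maximumAttempts - 1 entries
 */
function retryIntervals(
    int $initialInterval,
    int $maximumInterval,
    int $maximumAttempts,
    float $backoffCoefficient = 2.0
): array {
    $intervals = [];
    $interval = (float) $initialInterval;
    for ($retry = 1; $retry < $maximumAttempts; $retry++) {
        $intervals[] = (int) min($interval, $maximumInterval);
        // Grow the interval, without exceeding the maximum.
        $interval = min($interval * $backoffCoefficient, $maximumInterval);
    }
    return $intervals;
}

// Values from the retry options above: 10s initial, 100s maximum, 100 attempts.
$intervals = retryIntervals(10, 100, 100);
$firstIntervals = array_slice($intervals, 0, 6); // [10, 20, 40, 80, 100, 100]
$totalWait = array_sum($intervals);              // 9650 seconds
```

Under that assumption, the 99 possible retries of a single activity add up to 9650 seconds of waiting, about 2 hours and 40 minutes. This has to fit, together with the duration of the attempts themselves, within the 6-hour schedule-to-close timeout.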

The last option to set is the timeout for the entire batch execution, which is set on the workflow stub.

    $workflow = $this->workflowClient->newWorkflowStub(
        SimpleBatchWorkflowInterface::class,
        WorkflowOptions::new()
            ->withWorkflowExecutionTimeout(CarbonInterval::week())
    );

Wrapping up

The following workflow class is a complete implementation of parallel batch processing with the Temporal PHP SDK, which also provides real-time access to the status and results of the running tasks.

use Carbon\CarbonInterval;
use Temporal\Activity\ActivityOptions;
use Temporal\Common\RetryOptions;
use Temporal\Promise;
use Temporal\Workflow;
use Throwable;

class SimpleBatchWorkflow implements SimpleBatchWorkflowInterface
{
    /**
     * @var array
     */
    private array $results = [];

    /**
     * @var array
     */
    private array $pending = [];

    /**
     * @var SimpleBatchActivityInterface
     */
    private $batchActivity;

    public function __construct()
    {
        $this->batchActivity = Workflow::newActivityStub(
            SimpleBatchActivityInterface::class,
            ActivityOptions::new()
                ->withStartToCloseTimeout(CarbonInterval::hours(6))
                ->withScheduleToStartTimeout(CarbonInterval::hours(1))
                ->withScheduleToCloseTimeout(CarbonInterval::hours(6))
                ->withRetryOptions(
                    RetryOptions::new()
                        ->withMaximumAttempts(100)
                        ->withInitialInterval(CarbonInterval::second(10))
                        ->withMaximumInterval(CarbonInterval::seconds(100))
                )
        );
    }

    /**
     * @inheritDoc
     */
    public function start(int $batchId, array $options)
    {
        // Notify the batch processing start.
        yield $this->batchActivity->batchProcessingStarted($batchId, $options);

        $itemIds = yield $this->batchActivity->getBatchItemIds($batchId, $options);

        $promises = [];
        foreach($itemIds as $itemId)
        {
            // Set the batch item as pending.
            $this->pending[$itemId] = true;
            // Process the batch item.
            $promises[$itemId] = Workflow::async(
                function() use($itemId, $batchId) {
                    // Notify the item processing start.
                    yield $this->batchActivity->itemProcessingStarted($itemId, $batchId);

                    // This activity randomly throws an exception.
                    $output = yield $this->batchActivity->processItem($itemId, $batchId);

                    // Notify the item processing end.
                    yield $this->batchActivity->itemProcessingEnded($itemId, $batchId);

                    return $output;
                }
            )
            ->then(
                fn($output) => $this->results[$itemId] = [
                    'success' => true,
                    'output' => $output,
                ],
                fn(Throwable $e) => $this->results[$itemId] = [
                    'success' => false,
                    'message' => $e->getMessage(),
                ]
            )
            // We are calling always() instead of finally() because the Temporal PHP SDK depends on
            // react/promise 2.9. Will need to change to finally() when upgrading to react/promise 3.x.
            ->always(fn() => $this->pending[$itemId] = false);
        }

        // Wait for all the async calls to terminate.
        yield Promise::all($promises);

        // Notify the batch processing end.
        yield $this->batchActivity->batchProcessingEnded($batchId, $this->results);

        return $this->results;
    }

    /**
     * @inheritDoc
     */
    public function getAvailableResults(): array
    {
        return $this->results;
    }

    /**
     * @inheritDoc
     */
    public function getPendingTasks(): array
    {
        return array_keys(array_filter($this->pending, fn($pending) => $pending));
    }
}
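
The arrays returned by the two query methods are easy to consume on the client side. As an illustration, the hypothetical helper below, which is not part of the sample applications, builds a short progress summary from them, assuming the result format used in the workflow above. The sample data is made up.

```php
<?php

/**
 * Builds a progress summary of a running batch.
 * Hypothetical helper, for illustration only.
 *
 * @param array $results The value returned by getAvailableResults()
 * @param array $pendingIds The value returned by getPendingTasks()
 */
function summarizeBatch(array $results, array $pendingIds): array
{
    $succeeded = array_filter($results, fn(array $result) => $result['success']);

    return [
        'succeeded' => count($succeeded),
        'failed' => count($results) - count($succeeded),
        'pending' => count($pendingIds),
    ];
}

// Made-up sample data, in the format produced by the workflow.
$results = [
    101 => ['success' => true, 'output' => 37],
    102 => ['success' => false, 'message' => 'Unable to process item 102.'],
];
$pending = [101 => false, 102 => false, 103 => true, 104 => true];

// Same expression as in getPendingTasks(): keep the ids still set to true.
$pendingIds = array_keys(array_filter($pending, fn($pending) => $pending));

$summary = summarizeBatch($results, $pendingIds);
```

With this data, two items are still pending, one has succeeded and one has failed.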

Two sample applications including this workflow are available in these GitHub repositories: Temporal samples for Symfony and Temporal PHP SDK samples.

Below are some screenshots of the Temporal UI when the workflow is running.

How the user actually interacts with the workflow will depend on the PHP application.

This sample Symfony application provides a REST API and a Swagger-generated web page, while this other example, cloned from the Temporal PHP SDK samples, provides CLI commands.

In both applications, the simple batch sample allows the user to start a workflow and display its status in real time.

Below is an overview of a running workflow status in the Swagger UI.
