Asynchronous state machine with Symfony Workflows

Danil Khaliullin
Jan 27, 2024

A finite-state machine is an abstract machine that can be in exactly one of a finite number of states at any given time. The Symfony Workflow component lets you define and manage such state machines. Let’s explore how it helps when building complex business logic.

Use-case

Let’s imagine the following workflow: we initiate the creation of an order, send it to the order service, send it to the user’s email, and mark it as “sent” in the database.

All these actions should be executed successfully, but there are potential points of failure, such as invalid order data during creation, failures in the order service, or issues with the vendor’s email provider.

Use-case example

A Symfony workflow with retry logic lets us complete a complex business flow even when errors occur during execution. Let’s examine the main points that make the workflow fault-tolerant and reliable:

  • Any business logic flow is divided into state-machine transitions. Every transition is executed transactionally.
  • If the execution of a transition fails, we can retry it later, either by a command or asynchronously.
  • Every transition contains only business logic and doesn’t depend on the workflow implementation.

Let’s code!

First, we need to create a new Symfony workflow according to our workflow schema:

framework:
    workflows:
        order_send:
            type: state_machine
            supports:
                - App\Entity\WorkflowEntry
            marking_store:
                type: 'method'
                property: 'currentState'
            places:
                - initialised
                - verified
                - approved
                - sent_to_email
                - marked_as_sent
            transitions:
                verify_order:
                    from: initialised
                    to: verified
                approve_order:
                    from: verified
                    to: approved
                send_order_to_email:
                    from: approved
                    to: sent_to_email
                mark_order_as_sent:
                    from: sent_to_email
                    to: marked_as_sent
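
To make the configuration more concrete, here is a minimal plain-PHP sketch of the same transition table. It is an illustration only and does not use the Symfony Workflow API: the constant `ORDER_SEND_TRANSITIONS` and the functions `canApply()` and `applyTransition()` are hypothetical names introduced for this example. It shows the rule a state machine enforces: a transition can fire only from its `from` place.

```php
<?php
declare(strict_types=1);

// Plain-PHP mirror of the order_send transitions (illustration only).
const ORDER_SEND_TRANSITIONS = [
    'verify_order'        => ['from' => 'initialised',   'to' => 'verified'],
    'approve_order'       => ['from' => 'verified',      'to' => 'approved'],
    'send_order_to_email' => ['from' => 'approved',      'to' => 'sent_to_email'],
    'mark_order_as_sent'  => ['from' => 'sent_to_email', 'to' => 'marked_as_sent'],
];

/** Returns true when $transition may fire from $currentPlace. */
function canApply(string $currentPlace, string $transition): bool
{
    return array_key_exists($transition, ORDER_SEND_TRANSITIONS)
        && ORDER_SEND_TRANSITIONS[$transition]['from'] === $currentPlace;
}

/** Returns the place reached by $transition, or throws if it is not enabled. */
function applyTransition(string $currentPlace, string $transition): string
{
    if (!canApply($currentPlace, $transition)) {
        throw new LogicException(sprintf(
            'Transition "%s" is not enabled in place "%s".',
            $transition,
            $currentPlace
        ));
    }

    return ORDER_SEND_TRANSITIONS[$transition]['to'];
}

$place = 'initialised';
$place = applyTransition($place, 'verify_order');  // moves to "verified"
$place = applyTransition($place, 'approve_order'); // moves to "approved"
```

In the real project, the Workflow component performs this check for us based on the YAML definition.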

So, what is App\Entity\WorkflowEntry? It is an entity that holds all the information about a running workflow: it keeps the current state and the next transition, and stores the business logic data:

<?php
declare(strict_types=1);

namespace App\Entity;

...

#[ORM\Entity(repositoryClass: WorkflowEntryRepository::class)]
class WorkflowEntry implements WorkflowInterface
{
    #[ORM\Id]
    #[ORM\Column(type: "uuid", unique: true)]
    #[ORM\GeneratedValue(strategy: "CUSTOM")]
    #[ORM\CustomIdGenerator(class: UuidGenerator::class)]
    private Uuid $id;

    // Current place of the state machine (used by the marking store).
    #[ORM\Column(name: "current_state", type: "string")]
    private string $currentState = 'initialised';

    #[ORM\Column(name: "workflow_type", length: 32, enumType: WorkflowType::class, options: ["default" => "default"])]
    private WorkflowType $workflowType = WorkflowType::DefaultType;

    // Name of the transition to apply next; null once the workflow is done.
    #[ORM\Column(name: "next_transition", type: "string", nullable: true)]
    private ?string $nextTransition = null;

    // Normalized envelope stamps (the business logic data).
    #[ORM\Column(type: "json")]
    private array $stamps = [];

    #[ORM\Column(enumType: WorkflowStatus::class, options: ["default" => "started"])]
    private WorkflowStatus $status = WorkflowStatus::Started;

    #[ORM\Column(type: "smallint")]
    private int $retries = 0;

    #[ORM\Column(name: "created_at", type: "datetime_immutable")]
    private \DateTimeImmutable $createdAt;

    #[ORM\Column(name: "updated_at", type: "datetime_immutable")]
    private \DateTimeImmutable $updatedAt;

    public function __construct()
    {
        $this->createdAt = new \DateTimeImmutable();
        $this->updatedAt = new \DateTimeImmutable();
    }

    public static function create(
        WorkflowType $type,
        string $nextTransition,
        array $stamps,
    ): WorkflowEntry {
        $entry = new WorkflowEntry();
        $entry->setWorkflowType($type);
        $entry->setNextTransition($nextTransition);
        $entry->setStamps($stamps);

        return $entry;
    }

    ...
}

Business logic data can be stored in an “envelope” using stamps, which are classes that implement App\Service\Workflow\WorkflowStampInterface. For instance, we can store the order id in a stamp:

class OrderIdStamp implements WorkflowStampInterface
{
    private Uuid $orderId;

    public function getOrderId(): Uuid
    {
        return $this->orderId;
    }

    public function setOrderId(Uuid $orderId): void
    {
        $this->orderId = $orderId;
    }

    public static function createWithOrderId(Uuid $orderId): OrderIdStamp
    {
        $stamp = new OrderIdStamp();
        $stamp->setOrderId($orderId);

        return $stamp;
    }
}

Stamps are serialized in the envelope, App\Service\Workflow\Envelope\WorkflowEnvelope. The App\Entity\WorkflowEntry entity is stored in the database after every transition, so the process can be resumed after a failure.
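
To illustrate what “serialized” means here, the following is a minimal plain-PHP sketch of a stamp round trip: objects to a JSON-friendly array and back. It is not the project’s implementation (which relies on the Symfony Serializer); `StampInterface`, `toArray()`, `fromArray()`, `normalizeStamps()` and `denormalizeStamps()` are hypothetical names, and the order id is a plain string instead of a Uuid to keep the example dependency-free.

```php
<?php
declare(strict_types=1);

// Hypothetical, simplified stand-in for WorkflowStampInterface.
interface StampInterface
{
    public function toArray(): array;

    public static function fromArray(array $data): self;
}

final class OrderIdStamp implements StampInterface
{
    public function __construct(public readonly string $orderId)
    {
    }

    public function toArray(): array
    {
        return ['orderId' => $this->orderId];
    }

    public static function fromArray(array $data): self
    {
        return new self($data['orderId']);
    }
}

/** Turns stamp objects into an array keyed by class name. */
function normalizeStamps(array $stamps): array
{
    $normalized = [];
    foreach ($stamps as $stamp) {
        $normalized[$stamp::class][] = $stamp->toArray();
    }

    return $normalized;
}

/** Rebuilds stamp objects from the stored array. */
function denormalizeStamps(array $normalized): array
{
    $stamps = [];
    foreach ($normalized as $class => $items) {
        // Only instantiate classes that really are stamps.
        if (!is_a($class, StampInterface::class, true)) {
            throw new RuntimeException(sprintf('"%s" is not a stamp class', $class));
        }
        foreach ($items as $data) {
            $stamps[] = $class::fromArray($data);
        }
    }

    return $stamps;
}

// What would be written to and read back from the JSON column.
$stored = json_encode(normalizeStamps([new OrderIdStamp('order-42')]), JSON_THROW_ON_ERROR);
$restored = denormalizeStamps(json_decode($stored, true, 512, JSON_THROW_ON_ERROR));
```

Keying the stored data by class name is what allows a transition to ask the envelope for a specific stamp type later on.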

Let’s see how this approach improves the workflow:

Workflow Entry
  • We store data as the envelope in the WorkflowEntry that goes through all transitions.
  • Every transition is executed transactionally.
  • After the transition is done, we keep its result and any additional data in the envelope.
  • If one of the transitions fails, we can retry it in case of a temporary failure or fail the whole workflow.

<?php
declare(strict_types=1);

namespace App\Service\Workflow\Envelope;

use App\Service\Workflow\WorkflowStampInterface;

class WorkflowEnvelope
{
    /**
     * Stamps grouped by their class name.
     * Initialized to an empty array so an envelope without stamps is still usable.
     *
     * @var array<class-string, WorkflowStampInterface[]>
     */
    private array $stamps = [];

    /**
     * @param WorkflowStampInterface[] $stamps
     */
    public function __construct(array $stamps = [])
    {
        foreach ($stamps as $stamp) {
            $this->addStamp($stamp);
        }
    }

    public function addStamp(WorkflowStampInterface $stamp): void
    {
        $this->stamps[$stamp::class][] = $stamp;
    }

    /**
     * @return array<class-string, WorkflowStampInterface[]>
     */
    public function getStamps(): array
    {
        return $this->stamps;
    }

    /**
     * Returns the first stamp of the given type or throws if there is none.
     */
    public function getStamp(string $stampFqcn): WorkflowStampInterface
    {
        $stamps = $this->stamps[$stampFqcn] ?? [];

        if (count($stamps) === 0) {
            throw new \RuntimeException(sprintf('Stamp with type %s is not found', $stampFqcn));
        }

        return reset($stamps);
    }

    public function hasStamp(string $stampFqcn): bool
    {
        return isset($this->stamps[$stampFqcn]);
    }
}

How to manage it with Symfony Workflow?

Let’s use events to manage it.

Firstly, we separate every transition into a single class to decouple the business logic and follow the Single Responsibility Principle (SRP). For example:

class VerifyOrder implements WorkflowTransitionInterface
{
    public function __construct(
        private readonly OrderRepository $orderRepository,
    ) {
    }

    public function handle(WorkflowEnvelope $envelope): WorkflowEnvelope
    {
        /** @var OrderIdStamp $orderIdStamp */
        $orderIdStamp = $envelope->getStamp(OrderIdStamp::class);
        $orderId = $orderIdStamp->getOrderId();

        $order = $this->orderRepository->find($orderId);

        // Here we can make verification actions

        return $envelope;
    }

    public function getNextTransition(): ?string
    {
        return Transition::ApproveOrder->value;
    }

    public function getState(): ?string
    {
        return State::Verified->value;
    }
}

To apply a transition and update the current workflow state, we can use the Workflow::apply method. Let’s create a subscriber and subscribe to our custom WorkflowNextStateEvent to apply the next transition:

class WorkflowNextStateSubscriber implements EventSubscriberInterface
{
    public function __construct(
        private readonly ServiceLocator $workflows,
    ) {
    }

    public static function getSubscribedEvents(): array
    {
        return [
            WorkflowNextStateEvent::class => 'applyNextState',
        ];
    }

    public function applyNextState(WorkflowNextStateEvent $event): void
    {
        $workflowEntry = $event->getWorkflowEntry();
        $workflowType = $workflowEntry->getWorkflowType()->value;

        if (!$this->workflows->has($workflowType)) {
            throw new \RuntimeException(
                sprintf('There is no workflow with type %s', $workflowType)
            );
        }

        // Pick the workflow registered for this type and apply the next transition.
        $workflow = $this->workflows->get($workflowType);

        $workflow->apply(
            $workflowEntry,
            $workflowEntry->getNextTransition()
        );
    }
}

It gets the needed workflow by type and applies the next transition. To handle every transition transactionally, we can subscribe to workflow events and wrap each transition handle method in another subscriber:

class WorkflowTransitionSubscriber implements EventSubscriberInterface
{
    public function __construct(
        private readonly EntityManagerInterface $entityManager,
        private readonly EventDispatcherInterface $eventDispatcher,
        private readonly ServiceLocator $transitions,
        private readonly NormalizerInterface $normalizer,
        private readonly DenormalizerInterface $denormalizer,
    ) {
    }

    public static function getSubscribedEvents(): array
    {
        return [
            'workflow.transition' => 'handleTransition',
        ];
    }

    public function handleTransition(Event $event): void
    {
        /** @var WorkflowEntry $workflowEntry */
        $workflowEntry = $event->getSubject();
        $this->entityManager->getConnection()->beginTransaction();

        try {
            // Transition handlers are registered under "<workflow type>.<transition name>".
            $transitionKey = sprintf(
                '%s.%s',
                $workflowEntry->getWorkflowType()->value,
                $workflowEntry->getNextTransition(),
            );

            /** @var WorkflowTransitionInterface $transition */
            $transition = $this->transitions->get($transitionKey);

            // Rebuild the envelope from the stored stamps and run the business logic.
            $envelope = $this->denormalizer->denormalize($workflowEntry->getStamps(), WorkflowEnvelope::class);
            $envelope = $transition->handle($envelope);

            /** @var array $stamps */
            $stamps = $this->normalizer->normalize($envelope, 'array');

            $workflowEntry->setStamps($stamps);
            $workflowEntry->setCurrentState($transition->getState());
            $workflowEntry->setNextTransition($transition->getNextTransition());

            if ($workflowEntry->getNextTransition() === null) {
                $workflowEntry->setStatus(WorkflowStatus::Finished);
            }

            $this->entityManager->persist($workflowEntry);
            $this->entityManager->flush();

            $this->entityManager->getConnection()->commit();
        } catch (\Throwable $exception) {
            $this->entityManager->getConnection()->rollBack();

            throw $exception;
        }

        // Continue with the next transition, if there is one.
        if ($workflowEntry->getNextTransition() !== null) {
            $this->eventDispatcher->dispatch(new WorkflowNextStateEvent($workflowEntry));
        }
    }
}

Let’s go point by point:

  • To start the workflow, we dispatch the WorkflowNextStateEvent with the created WorkflowEntry object, which contains the “stamps”, that is, our order data.
  • The WorkflowNextStateSubscriber handles the event, determines which workflow should be applied, and calls the Workflow::apply method.
  • The WorkflowTransitionSubscriber subscribes to the workflow.transition event, which is dispatched when the WorkflowEntry goes through a transition. It begins a transaction, prepares the envelope with stamps, calls the WorkflowTransitionInterface::handle method, and commits or rolls back the transaction.
  • After that, the WorkflowTransitionSubscriber dispatches the WorkflowNextStateEvent to apply the next transition, and so on until the workflow is done.
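
The chain described in the points above can be reduced to a small plain-PHP sketch. It is a simplification under stated assumptions: the real project chains transitions through Symfony events, while this example uses a simple loop, and `Entry`, `$handlers` and `run()` are hypothetical names standing in for WorkflowEntry, the transition classes and the two subscribers.

```php
<?php
declare(strict_types=1);

// Hypothetical in-memory stand-in for WorkflowEntry: only the fields the loop needs.
final class Entry
{
    public string $currentState = 'initialised';
    public ?string $nextTransition = 'verify_order';
    /** @var string[] transitions that completed, in order */
    public array $log = [];
}

// Each handler returns [new state, next transition or null].
// The closures stand in for the WorkflowTransitionInterface classes.
$handlers = [
    'verify_order'        => fn (): array => ['verified', 'approve_order'],
    'approve_order'       => fn (): array => ['approved', 'send_order_to_email'],
    'send_order_to_email' => fn (): array => ['sent_to_email', 'mark_order_as_sent'],
    'mark_order_as_sent'  => fn (): array => ['marked_as_sent', null],
];

/** Applies transitions one by one until no next transition is left. */
function run(Entry $entry, array $handlers): void
{
    while ($entry->nextTransition !== null) {
        $transition = $entry->nextTransition;

        // If the handler throws, the entry keeps the state of the last
        // successful transition, so the run can be resumed from there.
        [$state, $next] = $handlers[$transition]();

        $entry->currentState = $state;
        $entry->nextTransition = $next;
        $entry->log[] = $transition;
    }
}

$entry = new Entry();
run($entry, $handlers);
```

Because each step updates the entry only after its handler succeeds, calling `run()` again after an exception picks up at the transition that failed.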

What about failures?

Since we store the result of every transition, it’s easy to continue and finish the workflow from any state. Consider the case when the email service fails:

Failure scenario

In this case, we can handle the service exception and mark the workflow as failed. After that, we can retry the workflow using a cron job or even send it to a queue to finish it asynchronously. If the error is permanent or we exceed the retry limit, we can stop the workflow completely.

<?php

class WorkflowHandler
{
    public function __construct(
        private readonly EventDispatcherInterface $eventDispatcher,
        private readonly LoggerInterface $logger,
        private readonly EntityManagerInterface $entityManager,
        private readonly NormalizerInterface $normalizer,
        private readonly DenormalizerInterface $denormalizer,
        private readonly MessageBusInterface $bus,
    ) {
    }

    public function handle(WorkflowEntry $workflowEntry): void
    {
        try {
            $this->eventDispatcher->dispatch(new WorkflowNextStateEvent($workflowEntry));
        } catch (StopWorkflowException $exception) {
            // Permanent error: stop the workflow, it will not be retried.
            $this->logger->error(
                sprintf(
                    'A permanent internal error occurred during handling workflow "%s". Workflow state: %s. The workflow stopped.',
                    $workflowEntry->getWorkflowType()->value,
                    $workflowEntry->getCurrentState(),
                ),
                ['exception' => $exception]
            );

            $workflowEntry->setStatus(WorkflowStatus::Stopped);

            $this->entityManager->persist($workflowEntry);
            $this->entityManager->flush();
        } catch (\Throwable $exception) {
            // Any other error (including WorkflowInternalErrorException) is treated
            // as temporary: mark the workflow as failed so it can be retried.
            $this->logger->error(
                sprintf(
                    'An internal error occurred during handling workflow "%s". Workflow state: %s',
                    $workflowEntry->getWorkflowType()->value,
                    $workflowEntry->getCurrentState(),
                ),
                ['exception' => $exception]
            );

            $workflowEntry->setStatus(WorkflowStatus::Failed);
            /** @var WorkflowEnvelope $envelope */
            $envelope = $this->denormalizer->denormalize($workflowEntry->getStamps(), WorkflowEnvelope::class);

            // Keep the error message in the envelope for later inspection.
            $envelope->addStamp(new WorkflowInternalErrorStamp(
                $exception->getMessage(),
            ));

            /** @var array $stamps */
            $stamps = $this->normalizer->normalize($envelope, 'array');
            $workflowEntry->setStamps($stamps);

            $this->entityManager->persist($workflowEntry);
            $this->entityManager->flush();
        }
    }

    public function retry(WorkflowEntry $workflowEntry): void
    {
        $workflowEntry->addRetry();
        $workflowEntry->setStatus(WorkflowStatus::Started);

        $this->entityManager->persist($workflowEntry);
        $this->entityManager->flush();

        $this->handle($workflowEntry);
    }
}
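
The handler above does not show where the retry limit is checked. One possible way to enforce it is sketched below in plain PHP. Everything here is an assumption made for illustration: the `MAX_RETRIES` value, the `Status` enum (mirroring the statuses used in this article), the `RetryableEntry` class and the `retryOnce()` function are hypothetical and not taken from the project.

```php
<?php
declare(strict_types=1);

// Assumed limit; the article does not state a concrete value.
const MAX_RETRIES = 3;

// Hypothetical mirror of the workflow statuses used in the article.
enum Status: string
{
    case Started = 'started';
    case Failed = 'failed';
    case Stopped = 'stopped';
    case Finished = 'finished';
}

/** Hypothetical slice of WorkflowEntry with only the retry-related fields. */
final class RetryableEntry
{
    public Status $status = Status::Failed;
    public int $retries = 0;
}

/**
 * Runs one retry attempt. $attempt stands in for WorkflowHandler::handle()
 * and returns true on success, false on a temporary failure.
 */
function retryOnce(RetryableEntry $entry, callable $attempt): void
{
    if ($entry->status !== Status::Failed) {
        return; // only failed workflows are retried
    }

    if ($entry->retries >= MAX_RETRIES) {
        $entry->status = Status::Stopped; // limit reached: give up
        return;
    }

    $entry->retries++;
    $entry->status = $attempt() ? Status::Finished : Status::Failed;
}

$entry = new RetryableEntry();
$flaky = function (): bool {
    static $calls = 0;

    return ++$calls >= 3; // fails twice, then succeeds
};

// A cron job or a queue consumer would trigger these attempts over time.
while ($entry->status === Status::Failed) {
    retryOnce($entry, $flaky);
}
```

With a cron job, the loop at the bottom would be replaced by a periodic query for failed entries; with a queue, each failed entry would be re-dispatched as a message.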

Conclusion

A finite-state machine is a good tool for managing complex business logic: it splits the logic into well-defined steps and makes failures easier to handle, which helps in building fault-tolerant systems. The Symfony Workflow component helps achieve this.

I hope you found this article helpful and that it provided some insights into working with Symfony Workflows.

You can check out the whole project on my GitHub: https://github.com/bifidokk/symfony-asynchronous-workflows

Feel free to leave your feedback or questions in the comments section below.

Happy coding! 😌
