Practical Implementation of Event Sourcing in Symfony: A Case Study on Client Verification System

Jakub Skowron (skowron.dev)
7 min read · May 2, 2024

--

Introduction to Event Sourcing

Event Sourcing is a software architecture approach in which every change to the application’s state is recorded as a sequence of events. As Martin Fowler describes it, this makes it possible not only to reconstruct the application’s history but also to understand precisely why each state came about. The approach is particularly useful in systems that need precise tracking of change history or high fault tolerance. Event Sourcing can be implemented on its own or together with CQRS (Command Query Responsibility Segregation), which is often recommended because it separates read operations from write operations.
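The core idea can be sketched in a few lines of plain PHP: state is never stored directly, it is derived by replaying the recorded events. The `ScoreRecorded` class and the `replayScore()` function are hypothetical names used only for this illustration (PHP 8.1+ is assumed for `readonly` properties).

```php
<?php
// Hypothetical event: a score was recorded for a client at some verification stage.
final class ScoreRecorded
{
    public function __construct(
        public readonly string $clientId,
        public readonly float $points,
    ) {}
}

// Rebuilds the current score by folding over the event history.
function replayScore(array $events): float
{
    $score = 0.0;
    foreach ($events as $event) {
        if ($event instanceof ScoreRecorded) {
            $score += $event->points;
        }
    }
    return $score;
}

$history = [
    new ScoreRecorded('client-123', 50.0),
    new ScoreRecorded('client-123', 125.0),
];

echo replayScore($history), PHP_EOL;                    // 175
echo replayScore(array_slice($history, 0, 1)), PHP_EOL; // 50 (state after the first event)
```

Replaying only a prefix of the history gives the state at an earlier point in time, which is what makes the history auditable.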

System Architecture

Use Case
Our B2B platform requires a client verification system that performs multi-stage analysis, known as a semaphore system. Each “semaphore” is a stage of verification that assigns a score based on specific criteria. Based on this score, a decision is made whether to continue the verification process or to end it. The system must effectively manage the order of semaphores and respond to the results of previous stages to ensure maximum efficiency and accuracy of verification.

Example Semaphore Verifications
Semaphore 1: Verification at Service A (sample-verify-client.com)
The client is checked for registration data using a SocialID or BusinessID. The result from the service is multiplied by 10, which constitutes the semaphore’s score.
Semaphore 2: Verification at Service B (sample-bank-verify.com)
Similar verification, but the result is multiplied by 1.25. The result from this semaphore influences the decision to continue the verification process.
Semaphore 3: Verification at Service C (sample-fraud-verify.com)
The client is checked for potential financial fraud. The result from the service is multiplied by 8.79, which may lead to immediate termination of the verification if suspicions of fraud are confirmed.
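The scoring rule for the three semaphores can be sketched as simple arithmetic. The multipliers come from the descriptions above; the raw results are made-up values, since real ones would come from the external services, and `semaphoreScore()` is a hypothetical helper.

```php
<?php
// Multipliers taken from the three semaphores described above.
const SEMAPHORE_MULTIPLIERS = [
    'sample-verify-client.com' => 10.0,
    'sample-bank-verify.com'   => 1.25,
    'sample-fraud-verify.com'  => 8.79,
];

// A semaphore's score is the raw service result times its multiplier.
function semaphoreScore(string $service, float $rawResult): float
{
    return $rawResult * SEMAPHORE_MULTIPLIERS[$service];
}

// Assumed raw results, for illustration only.
$rawResults = [
    'sample-verify-client.com' => 7.0,
    'sample-bank-verify.com'   => 80.0,
    'sample-fraud-verify.com'  => 2.0,
];

foreach ($rawResults as $service => $raw) {
    printf("%s: %.2f\n", $service, semaphoreScore($service, $raw));
}
// sample-verify-client.com: 70.00
// sample-bank-verify.com: 100.00
// sample-fraud-verify.com: 17.58
```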

Dependencies

— Symfony Messenger: A component responsible for processing messages and events in the system. It enables asynchronous processing and easy management of event order.

composer require symfony/messenger

— Doctrine ORM: Needed for interacting with the database if the event store is backed by a relational database.

composer require symfony/orm-pack

— Broadway Bundle: A package for handling event sourcing and CQRS, which provides tools for managing events and their recording.

composer require broadway/broadway-bundle

Configuration

— Messenger Configuration: To enable asynchronous event processing, transport configurations in Messenger must be set up. Example configuration for an asynchronous transport:


# config/packages/messenger.yaml
framework:
    messenger:
        transports:
            async: '%env(MESSENGER_TRANSPORT_DSN)%'
        routing:
            # Route the event class that is actually dispatched by the aggregate
            'App\Event\VerificationEvent': async

— Doctrine Configuration: Configuring Doctrine ORM for database management, including the event store.

# config/packages/doctrine.yaml
doctrine:
    dbal:
        url: '%env(DATABASE_URL)%'
    orm:
        auto_generate_proxy_classes: true
        naming_strategy: doctrine.orm.naming_strategy.underscore_number_aware
        auto_mapping: true
        mappings:
            App:
                is_bundle: false
                # 'annotation' mapping was removed in Doctrine ORM 3; use it only on ORM 2.x
                type: attribute
                dir: '%kernel.project_dir%/src/Entity'
                prefix: 'App\Entity'
                alias: App

— Broadway Bundle Configuration: Configuration of Broadway Bundle, which allows for the management of aggregates, repositories, and the event store.


# config/packages/broadway.yaml
broadway:
    event_store: 'broadway.event_store.dbal' # Uses Doctrine DBAL as backend
    read_model: 'broadway.read_model.in_memory' # Example configuration for the read model

Implementation

Value Objects
Value Objects are crucial for ensuring immutability and a clear expression of intent in our system. Each value object represents a specific piece of data that is essential to the client verification process and does not change once it has been created. Much like the money objects in the wallet example, which are created with a specified currency and amount, our ClientID and Score objects define precisely what they are and what operations can be performed on them.


namespace App\ValueObject;

// Both classes are shown together for brevity; with PSR-4 autoloading
// each one lives in its own file.

class ClientID {
    private string $id;

    public function __construct(string $id) {
        $this->id = $id;
    }

    public function getId(): string {
        return $this->id;
    }
}

class Score {
    private float $value;
    private float $multiplier;

    public function __construct(float $value, float $multiplier) {
        $this->value = $value;
        $this->multiplier = $multiplier;
    }

    // Raw result returned by the verification service
    public function getValue(): float {
        return $this->value;
    }

    // Weight applied to the raw result by the semaphore
    public function getMultiplier(): float {
        return $this->multiplier;
    }
}
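As a possible variation, immutability can be enforced by the language itself. The sketch below assumes PHP 8.1+ (`readonly` properties); `ImmutableScore`, its `withMultiplier()` and `equals()` methods, and the rule that a multiplier must be positive are all assumptions made for illustration, not part of the classes above.

```php
<?php
final class ImmutableScore
{
    public function __construct(
        public readonly float $value,
        public readonly float $multiplier,
    ) {
        // Assumed business rule, for illustration only.
        if ($multiplier <= 0.0) {
            throw new InvalidArgumentException('Multiplier must be positive.');
        }
    }

    public function adjusted(): float
    {
        return $this->value * $this->multiplier;
    }

    // "Changing" a value object returns a new instance; the original stays untouched.
    public function withMultiplier(float $multiplier): self
    {
        return new self($this->value, $multiplier);
    }

    // Value objects are compared by their values, not by object identity.
    public function equals(self $other): bool
    {
        return $this->value === $other->value
            && $this->multiplier === $other->multiplier;
    }
}

$bankScore  = new ImmutableScore(100.0, 1.25);
$fraudScore = $bankScore->withMultiplier(8.79);

echo $bankScore->adjusted(), PHP_EOL; // 125
```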

Event Implementation
Events in our system function as recordings of key moments in the client’s history, similar to the event of creating a wallet. Each event is immutable and represents a state change that has already occurred and cannot be undone. By using these events, we can track every significant action in the system and reconstruct the system’s state at any point in time.


namespace App\Event;

use App\ValueObject\ClientID;

class VerificationEvent {
    private ClientID $clientId;
    private float $result;
    private float $multiplier;

    public function __construct(ClientID $clientId, float $result, float $multiplier) {
        $this->clientId = $clientId;
        $this->result = $result;
        $this->multiplier = $multiplier;
    }

    public function getClientId(): ClientID {
        return $this->clientId;
    }

    // Raw service result scaled by the semaphore's multiplier
    public function getAdjustedScore(): float {
        return $this->result * $this->multiplier;
    }
}
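To be stored and later replayed, an event has to be turned into plain data and back. The following is a minimal sketch of that round trip using JSON; `SerializableVerificationEvent`, `toArray()` and `fromArray()` are hypothetical names, and the class uses a plain string ID to stay self-contained. Libraries such as Broadway define their own serialization contract, which this sketch does not use.

```php
<?php
// Simplified stand-in for VerificationEvent, using a plain string id.
final class SerializableVerificationEvent
{
    public function __construct(
        private string $clientId,
        private float $result,
        private float $multiplier,
    ) {}

    public function getAdjustedScore(): float
    {
        return $this->result * $this->multiplier;
    }

    // Flattens the event into scalars that can be JSON-encoded and stored.
    public function toArray(): array
    {
        return [
            'clientId'   => $this->clientId,
            'result'     => $this->result,
            'multiplier' => $this->multiplier,
        ];
    }

    // Rebuilds the event from previously stored data.
    public static function fromArray(array $data): self
    {
        return new self(
            $data['clientId'],
            (float) $data['result'],
            (float) $data['multiplier'],
        );
    }
}

$event    = new SerializableVerificationEvent('client-123', 100.0, 1.25);
$payload  = json_encode($event->toArray(), JSON_THROW_ON_ERROR);
$restored = SerializableVerificationEvent::fromArray(
    json_decode($payload, true, 512, JSON_THROW_ON_ERROR)
);

echo $restored->getAdjustedScore(), PHP_EOL; // 125
```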

Aggregate Root
The Aggregate Root in our system, ClientVerificationAggregate, serves as the main point of managing events and decisions in the verification process. Analogous to a wallet, which is the central point for financial operations, our aggregate controls state changes based on incoming events, ensuring data integrity and consistency throughout the client’s lifecycle.


namespace App\Aggregate;

use App\Event\VerificationEvent;
use App\ValueObject\ClientID;
use App\ValueObject\Score;
use Symfony\Component\Messenger\MessageBusInterface;

class ClientVerificationAggregate {
    /** @var VerificationEvent[] Events applied so far, in order */
    private array $events = [];
    private float $currentScore = 0.0;

    public function __construct(private MessageBusInterface $eventBus) {}

    public function verifyClient(ClientID $clientId): void {
        $initialScore = $this->performInitialVerification($clientId);
        $this->adjustScoreBasedOnSemaphores($clientId, $initialScore);
    }

    public function getCurrentScore(): float {
        return $this->currentScore;
    }

    // Public so that the unit tests can apply an event directly
    public function apply(VerificationEvent $event): void {
        $this->events[] = $event;
        $this->currentScore += $event->getAdjustedScore();
        $this->eventBus->dispatch($event);
    }

    private function performInitialVerification(ClientID $clientId): Score {
        // Initial client verification (e.g., identity check)
        return new Score(50.0, 1); // Assume an example initial score
    }

    // Note: $initialScore is not used yet in this simplified example
    private function adjustScoreBasedOnSemaphores(ClientID $clientId, Score $initialScore): void {
        $scores = [
            $this->verifyAtService($clientId, "sample-verify-client.com", 10),
            $this->verifyAtService($clientId, "sample-bank-verify.com", 1.25),
            $this->verifyAtService($clientId, "sample-fraud-verify.com", 8.79),
        ];

        // Record one event per semaphore, in the order the semaphores ran
        foreach ($scores as $score) {
            $event = new VerificationEvent($clientId, $score->getValue(), $score->getMultiplier());
            $this->apply($event);
        }
    }

    private function verifyAtService(ClientID $clientId, string $serviceUrl, float $multiplier): Score {
        // Example logic for connecting to an external API
        $result = 100; // Assume this result from the service
        return new Score($result, $multiplier);
    }
}

Event Store
The Event Store in our system functions like a repository for events, collecting all events in an orderly manner. Similar to a transaction log in databases, the Event Store allows for the reconstruction of the aggregate’s state from its inception to any point in time using the stored events.


namespace App\EventStore;

class InMemoryEventStore {
    /** @var object[] Stored events, in the order they were recorded */
    private array $events = [];

    public function store(object $event): void {
        $this->events[] = $event;
    }

    public function getAllEvents(): array {
        return $this->events;
    }
}
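The store above keeps a single list for all events. To rebuild the state of one particular client, events are usually grouped per aggregate and read back in order. The following is a rough sketch of that idea; `StreamEventStore`, `append()` and `load()` are hypothetical names and are not part of Broadway or Symfony.

```php
<?php
final class StreamEventStore
{
    /** @var array<string, list<object>> Events grouped by aggregate id, in append order */
    private array $streams = [];

    // Appends one event to the aggregate's stream and returns its 0-based position.
    public function append(string $aggregateId, object $event): int
    {
        $this->streams[$aggregateId][] = $event;
        return count($this->streams[$aggregateId]) - 1;
    }

    // Returns the aggregate's events, starting at $fromPosition.
    public function load(string $aggregateId, int $fromPosition = 0): array
    {
        return array_slice($this->streams[$aggregateId] ?? [], $fromPosition);
    }
}

$store = new StreamEventStore();
$store->append('client-123', (object) ['adjustedScore' => 1000.0]);
$store->append('client-123', (object) ['adjustedScore' => 125.0]);
$store->append('client-456', (object) ['adjustedScore' => 879.0]);

echo count($store->load('client-123')), PHP_EOL;    // 2
echo count($store->load('client-123', 1)), PHP_EOL; // 1 (only events from position 1 onward)
```

Events are only ever appended, never updated or deleted, which is what lets the store act as a log.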

Semaphores/Verification Logic
Each semaphore can be treated as a separate class or method that uses results from external services and returns results after appropriate scaling. The implementation in the code above simulates connections to external services and applies multipliers to the results.

Thanks to this architecture, the client verification system is flexible, allowing for easy adjustment of the verification logic and integration with various external services. The implementation of events and aggregate logic ensures that all state changes are properly recorded and can be replayed or analyzed in the future.
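Treating each semaphore as a separate class also makes it straightforward to stop the process early, as the use case requires. The sketch below is one way this could look; the `Semaphore` interface, the `FixedResultSemaphore` stub, the `runSemaphores()` function and the minimum score of 50 are assumptions made for illustration.

```php
<?php
// Hypothetical contract: each semaphore returns its already-scaled score.
interface Semaphore
{
    public function name(): string;
    public function score(string $clientId): float;
}

// Stub semaphore: a fixed raw result stands in for the external service call.
final class FixedResultSemaphore implements Semaphore
{
    public function __construct(
        private string $name,
        private float $rawResult,
        private float $multiplier,
    ) {}

    public function name(): string
    {
        return $this->name;
    }

    public function score(string $clientId): float
    {
        return $this->rawResult * $this->multiplier;
    }
}

// Runs semaphores in order and stops after the first score below $minimumScore.
// Returns name => score for every semaphore that actually ran.
function runSemaphores(array $semaphores, string $clientId, float $minimumScore): array
{
    $scores = [];
    foreach ($semaphores as $semaphore) {
        $score = $semaphore->score($clientId);
        $scores[$semaphore->name()] = $score;
        if ($score < $minimumScore) {
            break; // a weak stage ends the verification early
        }
    }
    return $scores;
}

$scores = runSemaphores([
    new FixedResultSemaphore('service-a', 100.0, 10.0),
    new FixedResultSemaphore('service-b', 20.0, 1.25),  // 25.0, below the threshold
    new FixedResultSemaphore('service-c', 100.0, 8.79), // never reached
], 'client-123', 50.0);

echo implode(', ', array_keys($scores)), PHP_EOL; // service-a, service-b
```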

Snapshots in Event Sourcing
Snapshots in our system serve to optimize the reconstruction of aggregate states by recording their state at key moments, similar to recording the state of a wallet after each transaction. This allows the system to start from the last snapshot instead of processing all events from the beginning, significantly speeding up the reconstruction process.


namespace App\Snapshot;

use App\ValueObject\ClientID;

class Snapshot {
    private ClientID $clientId;
    private float $currentScore;

    public function __construct(ClientID $clientId, float $currentScore) {
        $this->clientId = $clientId;
        $this->currentScore = $currentScore;
    }

    public function getClientId(): ClientID {
        return $this->clientId;
    }

    public function getCurrentScore(): float {
        return $this->currentScore;
    }
}
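To actually skip earlier events, a snapshot also needs to remember how many events it already covers. Here is a minimal sketch of restoring state from a snapshot plus the events recorded after it; `ScoreSnapshot`, its `version` field and `restoreScore()` are hypothetical additions, not part of the Snapshot class above.

```php
<?php
// Hypothetical snapshot: the score after the first $version events.
final class ScoreSnapshot
{
    public function __construct(
        public readonly float $score,
        public readonly int $version,
    ) {}
}

// Restores the score from a snapshot plus only the events recorded after it.
// $adjustedScores is the full ordered list of adjusted scores, one per event.
function restoreScore(?ScoreSnapshot $snapshot, array $adjustedScores): float
{
    $score = $snapshot?->score ?? 0.0;
    $start = $snapshot?->version ?? 0;

    // Only the tail of the history is replayed.
    foreach (array_slice($adjustedScores, $start) as $delta) {
        $score += $delta;
    }
    return $score;
}

$history  = [1000.0, 125.0, 879.0];
$snapshot = new ScoreSnapshot(1125.0, 2); // state after the first two events

echo restoreScore($snapshot, $history), PHP_EOL; // 2004
echo restoreScore(null, $history), PHP_EOL;      // 2004 (full replay gives the same result)
```

Both paths must produce the same state; the snapshot only shortens the replay.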

Testing in Event Sourcing
Testing in Event Sourcing involves checking whether the events generated by the system are consistent with expectations and whether the state of the system after applying a series of events is correct. In this approach, it is key to test the apply() methods in aggregates, which are responsible for changing the state in response to events, and to test whether the appropriate events are generated in response to method calls in the aggregate.

Example Unit Tests:
— Testing event generation: Checking whether the appropriate events are generated after performing business operations.
— Testing event application: Verifying whether the state of the aggregate changes correctly after applying events.
— Testing state reconstruction: Tests that check whether the aggregate correctly reconstructs its state based on a sequence of events.

Sample Unit Tests
Let’s assume we want to test ClientVerificationAggregate, especially how it handles verification events for clients.


namespace Tests\App\Aggregate;

use App\Aggregate\ClientVerificationAggregate;
use App\Event\VerificationEvent;
use App\ValueObject\ClientID;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\MessageBusInterface;

class ClientVerificationAggregateTest extends TestCase {
    private MessageBusInterface $eventBusMock;
    private array $dispatched = [];

    protected function setUp(): void {
        $this->dispatched = [];
        $this->eventBusMock = $this->createMock(MessageBusInterface::class);
        // Envelope is final and cannot be mocked, so return a real one
        // and record every dispatched message for later assertions.
        // (withConsecutive() is not used because it was removed in PHPUnit 10.)
        $this->eventBusMock->method('dispatch')->willReturnCallback(
            function (object $message): Envelope {
                $this->dispatched[] = $message;
                return new Envelope($message);
            }
        );
    }

    public function testVerifyClientGeneratesCorrectEvents(): void {
        $clientId = new ClientID("client-123");
        $aggregate = new ClientVerificationAggregate($this->eventBusMock);
        $aggregate->verifyClient($clientId);
        $this->assertEquals([
            new VerificationEvent($clientId, 100, 10),
            new VerificationEvent($clientId, 100, 1.25),
            new VerificationEvent($clientId, 100, 8.79),
        ], $this->dispatched);
    }

    public function testAggregateAppliesEventCorrectly(): void {
        $clientId = new ClientID("client-123");
        $event = new VerificationEvent($clientId, 100, 10);
        $aggregate = new ClientVerificationAggregate($this->eventBusMock);
        $aggregate->apply($event);
        $this->assertEquals(1000, $aggregate->getCurrentScore());
    }
}
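The tests above cover event generation and event application. State reconstruction, the third kind of test listed earlier, can be sketched without any framework. `ReplayableVerification` and `fromHistory()` are hypothetical names for a stripped-down aggregate; plain `assert()` stands in for PHPUnit assertions and only runs when assertions are enabled in the PHP configuration.

```php
<?php
// Hypothetical minimal aggregate that can be rebuilt from its event history.
final class ReplayableVerification
{
    private float $currentScore = 0.0;

    // Rebuilds state by applying each recorded adjusted score in order.
    public static function fromHistory(array $adjustedScores): self
    {
        $aggregate = new self();
        foreach ($adjustedScores as $adjustedScore) {
            $aggregate->apply($adjustedScore);
        }
        return $aggregate;
    }

    public function apply(float $adjustedScore): void
    {
        $this->currentScore += $adjustedScore;
    }

    public function getCurrentScore(): float
    {
        return $this->currentScore;
    }
}

// Given a live aggregate that has handled two events...
$live = new ReplayableVerification();
$live->apply(1000.0);
$live->apply(125.0);

// ...when a second instance is rebuilt from the same history...
$rebuilt = ReplayableVerification::fromHistory([1000.0, 125.0]);

// ...then both must end up in the same state.
assert($rebuilt->getCurrentScore() === $live->getCurrentScore());
assert($rebuilt->getCurrentScore() === 1125.0);
```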

Conclusion

Event sourcing can add real value to a project: because every state change is recorded as an event, the system gains a complete, replayable history and better fault tolerance. We encourage you to experiment with different approaches in Symfony to get a better feel for what this technique can do.

--

Jakub Skowron (skowron.dev)

Poland-based PHP/Python web backend developer. I love working with the Symfony and FastAPI frameworks. In my spare time, a total gearhead.