Mastering Message Brokering in Symfony: A Practical Guide to Three Essential Patterns

Jakub Skowron (skowron.dev)
6 min read · Aug 24, 2023

Quick intro

Symfony, one of the most popular PHP frameworks, offers the Messenger component, which allows our application to send and receive messages. These messages can be handled immediately or passed through transports (e.g., queues) for later processing. Many believe that real-life examples speak louder than abstract theories. Hence, in this article, we’ll focus on the practical implementation of three message broker patterns, using an online store as an example.

Real-life use case

Imagine you run an online store. When a customer makes a purchase, you want to send them an email confirmation, update the stock inventory, and notify the marketing department of a new sale.

Basics: Installation and configuration of the Messenger component

To install the Messenger component in applications using Symfony Flex, simply run the following command:

$ composer require symfony/messenger

After installation, you can configure the transports in the config/packages/messenger.yaml file. An example configuration for a transport named "async" might look like this:

# config/packages/messenger.yaml
framework:
    messenger:
        transports:
            async: "%env(MESSENGER_TRANSPORT_DSN)%"

Running the consumer

To process messages from the queue, you need to run the consumer. In Symfony, you do this using the messenger:consume command.

$ php bin/console messenger:consume [transport]

Where [transport] is the name of the transport you want to process, for example, async.

The messenger:consume command has several useful options:

  • --limit=: The number of messages to process before the worker stops, e.g. --limit=10.
  • --memory-limit=: The memory usage at which the worker stops, e.g. --memory-limit=128M.
  • --time-limit=: The maximum running time in seconds before the worker stops, e.g. --time-limit=3600.

Asynchronous Request-Reply pattern

This pattern processes requests asynchronously: the backend accepts the request right away and does the actual work later, which decouples the request from the response and keeps the request short even when the work behind it is slow. Modern applications often rely on remote APIs to deliver business logic. These APIs can be directly tied to the application or be shared services provided by a third party. Typical requests to these APIs are made using the HTTP(S) protocol and adhere to REST semantics.

When to use?:

  • For client-side code, such as browser applications, where providing callback endpoints is challenging.
  • For service calls where only the HTTP protocol is available, and the returning service cannot make callback calls due to client-side firewall restrictions.
  • For service calls that need to integrate with legacy architectures that lack modern callback technologies like WebSockets or webhooks.

When to avoid?:

  • When responses need to be delivered to the client in real-time.
  • When the client needs to collect a large number of results, and the delay in receiving those results is crucial.

Figure: Request lifecycle. Photo credits: google.com

In the context of our online store, when a customer makes a purchase, the email confirmation request is placed in a queue for later processing.

Code:

// src/Message/OrderConfirmation.php
namespace App\Message;

class OrderConfirmation {
    public function __construct(private string $orderId) {}

    public function getOrderId(): string {
        return $this->orderId;
    }
}

// src/MessageHandler/OrderConfirmationHandler.php
namespace App\MessageHandler;

use App\Message\OrderConfirmation;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;

// The attribute belongs on a handler class (or on a method inside one).
#[AsMessageHandler]
class OrderConfirmationHandler {
    public function __invoke(OrderConfirmation $message): void {
        // ... sending order confirmation logic!
    }
}
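
The listing above defines the message and its handler, but not the code that puts the message on the bus. Below is a minimal sketch of that step. It assumes a hypothetical OrderPlacer service (the class name, file path, and place() method are illustrative and not from the original listing). It also assumes App\Message\OrderConfirmation is routed to the async transport under framework.messenger.routing; without such a routing entry, Messenger calls the handler synchronously during dispatch.

```php
<?php
// src/Service/OrderPlacer.php (hypothetical service, shown only for illustration)
namespace App\Service;

use App\Message\OrderConfirmation;
use Symfony\Component\Messenger\MessageBusInterface;

final class OrderPlacer {
    // Symfony autowires the default message bus for this type hint.
    public function __construct(private MessageBusInterface $bus) {}

    public function place(string $orderId): void {
        // ... persisting the order would happen here (omitted) ...

        // Hand the confirmation over to Messenger. When the message is routed
        // to a transport, dispatch() returns once the message is queued and
        // the email is sent later by a worker.
        $this->bus->dispatch(new OrderConfirmation($orderId));
    }
}
```

A controller can call place() and respond immediately (for example with HTTP 202 Accepted), since the slow part now runs in the messenger:consume worker.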

Publisher-Subscriber pattern

This pattern creates an asynchronous messaging system with an input channel for the publisher and an output channel for each subscriber. The sender is called the publisher, and the receivers are called subscribers. Typically, a mediator, such as a message broker or event bus, copies each message from the input channel to the output channels of all interested subscribers.

When to use?:

  • When the application needs to broadcast information to multiple users.
  • When the application needs to communicate with one or more independently produced applications or services.
  • When the application needs to deliver data to multiple consumers who might have different availability requirements.

When to avoid?:

  • When the application has only a few consumers who require information significantly different from what the producer application requires.
  • When the application requires near-instant contact with users.

In the context of our store, the marketing department subscribes to sales messages and is notified, by email and by SMS, whenever a product is sold.

Code:

# config/packages/messenger.yaml
framework:
    messenger:
        transports:
            email_notifications: 'dsn://email_transport'
            sms_notifications: 'dsn://sms_transport'
        routing:
            'App\Message\ProductSoldMessage': ['email_notifications', 'sms_notifications']

Example message class:

// src/Message/ProductSoldMessage.php
namespace App\Message;

class ProductSoldMessage {
    private string $productId;

    public function __construct(string $productId) {
        $this->productId = $productId;
    }

    public function getProductId(): string {
        return $this->productId;
    }
}

Example handler:

// src/MessageHandler/ProductSoldMessageHandler.php
namespace App\MessageHandler;

use App\Message\ProductSoldMessage;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;

#[AsMessageHandler]
class ProductSoldMessageHandler {
    public function __invoke(ProductSoldMessage $message): void {
        // ... logic to notify the marketing department of a product sale!
    }
}

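Each transport needs its own consumer. To process both transports for this pattern, run one worker per transport:

$ php bin/console messenger:consume email_notifications
$ php bin/console messenger:consume sms_notifications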
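
Because ProductSoldMessage is routed to two transports, every handler registered for it is called for each transport the message is received from, so a single handler would run once for the email copy and once for the SMS copy. The fromTransport option of the AsMessageHandler attribute binds a handler to one transport instead. Below is a minimal sketch with one subscriber per channel. The class names EmailSaleNotifier and SmsSaleNotifier and the error_log() placeholders are assumptions for illustration, not from the original code, and the two classes share one listing only for brevity.

```php
<?php
// src/MessageHandler/ (hypothetical handlers; normally one class per file)
namespace App\MessageHandler;

use App\Message\ProductSoldMessage;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;

// Called only for messages received from the "email_notifications" transport.
#[AsMessageHandler(fromTransport: 'email_notifications')]
final class EmailSaleNotifier {
    public function __invoke(ProductSoldMessage $message): void {
        // Placeholder: a real handler would email the marketing department.
        error_log('Email: product ' . $message->getProductId() . ' was sold');
    }
}

// Called only for messages received from the "sms_notifications" transport.
#[AsMessageHandler(fromTransport: 'sms_notifications')]
final class SmsSaleNotifier {
    public function __invoke(ProductSoldMessage $message): void {
        // Placeholder: a real handler would call an SMS gateway here.
        error_log('SMS: product ' . $message->getProductId() . ' was sold');
    }
}
```

With this split, each subscriber reacts exactly once per sale, and either channel can be paused or scaled by stopping or adding workers for its transport only.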

Competing Consumers pattern

The Competing Consumers pattern involves multiple consumers trying to process messages from the same queue. This is especially useful when you want to evenly distribute the load among different consumers.

When to use?:

  • When the application needs to process many messages in a short time.
  • When the application needs to scale depending on the load.
  • When the application needs to be fault-tolerant.

When to avoid?:

  • When the order of processing messages is essential.
  • When the application needs to process only one message at a time.

In the context of our online store, when many orders arrive at the same time, several worker instances can pull confirmation messages from the same queue and send the emails in parallel, with each message handled by only one of them.

To implement the Competing Consumers pattern in Symfony, you need to run multiple workers listening to the same queue. The configuration in messenger.yaml might look the same as for other patterns, but the key is to run multiple worker instances for the same transport.

Example configuration for the “async” transport might look like this:

# config/packages/messenger.yaml
framework:
    messenger:
        transports:
            async: "%env(MESSENGER_TRANSPORT_DSN)%"

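Each worker is started with the command below. The --limit option does not create more workers; it makes a worker exit after the given number of messages so that a process manager can restart it with fresh memory. To get competing consumers, start the same command several times:

$ php bin/console messenger:consume async --limit=1000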

If you want to run multiple workers in the background, you can use a simple bash script:

#!/bin/bash
for i in {1..5}
do
    php bin/console messenger:consume async &
done

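If you want to manage multiple workers using supervisord, you can add the following configuration to the supervisord.conf file. Note that when numprocs is greater than 1, Supervisor requires process_name to include %(process_num)s so that each process gets a unique name:

[program:symfony_worker]
command=php /path-to-your-project/bin/console messenger:consume async
numprocs=4
process_name=%(program_name)s_%(process_num)02d
autostart=true
autorestart=true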

Code:

// src/Message/OrderConfirmation.php
namespace App\Message;

class OrderConfirmation {
    private string $orderId;
    private string $email;

    public function __construct(string $orderId, string $email) {
        $this->orderId = $orderId;
        $this->email = $email;
    }

    public function getOrderId(): string {
        return $this->orderId;
    }

    public function getEmail(): string {
        return $this->email;
    }
}


// src/MessageHandler/OrderConfirmationHandler.php
namespace App\MessageHandler;

use App\Message\OrderConfirmation;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;

#[AsMessageHandler]
class OrderConfirmationHandler {
    public function __invoke(OrderConfirmation $message): void {
        // Here we can have the logic of sending an email confirmation
        mail($message->getEmail(), "Order Confirmation", "Thank you for your order with ID: " . $message->getOrderId());
    }
}
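
On the producer side nothing changes when more workers are added: the code only dispatches messages, and whichever worker is idle picks up the next one. Below is a minimal sketch of queuing several confirmations at once. It assumes a hypothetical ConfirmationQueuer helper (its name, file path, and queueAll() method are illustrative and not from the original code), and it assumes OrderConfirmation is routed to the async transport.

```php
<?php
// src/Service/ConfirmationQueuer.php (hypothetical helper, shown only for illustration)
namespace App\Service;

use App\Message\OrderConfirmation;
use Symfony\Component\Messenger\MessageBusInterface;

final class ConfirmationQueuer {
    public function __construct(private MessageBusInterface $bus) {}

    /**
     * @param array<string, string> $orders map of order ID => customer email
     * @return int number of messages handed to the bus
     */
    public function queueAll(array $orders): int {
        $queued = 0;
        foreach ($orders as $orderId => $email) {
            // Each message is independent, so any idle worker may process it.
            // The cast is needed because PHP turns numeric string keys into ints.
            $this->bus->dispatch(new OrderConfirmation((string) $orderId, $email));
            $queued++;
        }

        return $queued;
    }
}
```

Since the workers compete for messages, the emails may go out in a different order than they were queued, which is why this pattern is a poor fit when ordering matters.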

The Messenger component in Symfony offers powerful tools for implementing various message broker patterns. This allows us to easily scale our applications and ensure better performance and reliability.

To learn more about the Messenger component in Symfony, visit the official Symfony documentation.

Jakub Skowron (skowron.dev)

Poland-based PHP/Python web backend developer. I love working with the Symfony and FastAPI frameworks. In my spare time, a total gearhead.