
Symfony Messenger is here, but your project is on Symfony 3.4, no problem! (part 2)

Hi, this is the second part of my article. In the first part, I explained how to install and configure the MessageBus and HandleMessageMiddleware services, which handle dispatched messages synchronously. I advise you to read the first part first, because this part builds on its code.

Before starting, you can find the code of a demo project on GitHub.

In this part, I will show how to configure the Messenger to send a message to a message broker (I will use RabbitMQ) and to receive it, then activate the asynchronous mode of Messenger for Symfony 3.4.

We need to configure 4 services in this order:

  • Transport — sends and receives messages,
  • SendersLocator — contains senders for each message class (exactly like a HandlersLocator),
  • SendMessageMiddleware — uses the SendersLocator and calls all senders registered for a dispatched message,
  • ConsumeMessagesCommand — runs a worker which consumes messages.

1. Transport

The role of the transport is to send and receive messages. Messenger provides an AMQP transport class based on the AMQP PHP extension.

Note: in this article, Max Kotliar compares many PHP AMQP clients, and the PHP extension appears to be the fastest one.

Install this extension via PECL:

pecl install amqp

Note: you can find more details for installing the extension here

To configure the AmqpTransport service, we will use the AmqpTransportFactory class provided by Messenger, which we declare as a service:

# messenger.yaml
services:
    # Transport Factory - AMQP transport
    Symfony\Component\Messenger\Transport\AmqpExt\AmqpTransportFactory:
        tags: ['messenger.transport_factory']
        arguments:
            $debug: '%kernel.debug%'

To use this factory, you must either install the Symfony Serializer component or pass, as the first argument, your own instance of a class that implements the interface Symfony\Component\Messenger\Transport\Serialization\SerializerInterface.
The serializer serializes the message when it is sent and unserializes it when it is received.

To install Symfony Serializer use this command:

composer require symfony/serializer

AmqpTransportFactory uses Symfony Serializer by default if no serializer is passed as the first argument.

Now, we can create the AmqpTransport service using the factory and a DSN parameter for connecting to the message broker:

parameters:
    env(RABBITMQ_DSN): 'amqp://guest:guest@127.0.0.1:5672/%2f'

services:
    # Transport
    messenger.transport.amqp:
        class: Symfony\Component\Messenger\Transport\TransportInterface
        factory: 'Symfony\Component\Messenger\Transport\AmqpExt\AmqpTransportFactory:createTransport'
        arguments:
            $dsn: '%env(RABBITMQ_DSN)%'
            $options: []

2. SendersLocator

The SendersLocator contains sender services mapped by a message class name:

App\Message\MessageClassOrInterface => SenderClass to use 

SendMessageMiddleware asks the locator for the senders registered for the dispatched message. If at least one sender is found, it sends the message; otherwise, the message is passed to the next middleware.
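
To make the lookup concrete, here is a minimal plain-PHP sketch of a locator keyed by class name. It is a simplified model and not Messenger's own SendersLocator: the function findSenders, the interface AsyncMessage, the class OtherMessage and the string standing in for a sender are all hypothetical, and matching on parent classes and interfaces is an assumption that should be checked against the component's source.

```php
<?php
// Simplified model of a senders locator: message class (or interface) => senders.
// All names here are hypothetical; this is not Messenger's own code.

interface AsyncMessage {}

final class SayHelloCommand implements AsyncMessage {}

final class OtherMessage {}

/**
 * Returns every sender registered for the message's class,
 * one of its parent classes, or one of its interfaces.
 */
function findSenders(object $message, array $sendersByType): array
{
    $types = array_merge(
        [get_class($message)],
        array_values(class_parents($message)),
        array_values(class_implements($message))
    );

    $senders = [];
    foreach ($types as $type) {
        foreach ($sendersByType[$type] ?? [] as $alias => $sender) {
            $senders[$alias] = $sender;
        }
    }

    return $senders;
}

// The map has the same shape as the $senders argument: type => [alias => sender].
$map = [
    AsyncMessage::class => ['my_sender_alias' => 'amqp transport'],
];

$found = findSenders(new SayHelloCommand(), $map); // matched through the interface
$none = findSenders(new OtherMessage(), $map);     // no sender registered
```

A message with no registered sender yields an empty list, which is the case where the middleware simply moves on to the next one.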

Let’s configure the Locator service for sending this simple message:

namespace App\SayHello\Command;

class SayHelloCommand
{
    private $recipient;

    public function __construct(string $recipient)
    {
        $this->recipient = $recipient;
    }

    public function getRecipient(): string
    {
        return $this->recipient;
    }
}

and SendersLocator service:

# messenger.yaml
services:
    messenger.transport.senders_locator:
        class: Symfony\Component\Messenger\Transport\Sender\SendersLocator
        arguments:
            $senders:
                App\SayHello\Command\SayHelloCommand:
                    my_sender_alias: '@messenger.transport.amqp'

Note: the second argument of the SendersLocator constructor is optional. It takes an array with a boolean value for each message class that must also be passed to the next middleware after a sender has sent it, for example:

$sendAndHandle:
    App\SayHello\Command\SayHelloCommand: true
    OtherMessageClass: true

3. SendMessageMiddleware

In this step, we will configure the middleware which takes a dispatched message and tries to send it.

We need to create the service:

# messenger.yaml
services:
    messenger.middleware.send_message:
        class: Symfony\Component\Messenger\Middleware\SendMessageMiddleware
        abstract: true

and change the middleware DI parameter created in the first part:

# messenger.yaml
parameters:
    my_super_bus.middleware:
        - id: send_message
          arguments:
              - '@messenger.transport.senders_locator'
        - { id: handle_message }

Note 1: the handle_message middleware was configured in the first part of this article.

Note 2: the order of the middlewares in this parameter is important. Messages are passed to the middlewares in the same order.

Note 3: because we declare the messenger.middleware.send_message service as abstract, the Messenger compiler pass (MessengerPass) will declare a concrete service and pass it the arguments from the DI parameter.
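
The effect of this ordering can be modelled in a few lines of plain PHP. This is only a sketch of the behaviour described above, with a hypothetical function name and strings instead of real senders; the rule that a message coming from a receiver skips the send step is my reading of SendMessageMiddleware and should be treated as an assumption.

```php
<?php
// Simplified model of a two-middleware stack: send first, then handle.
// Hypothetical names; it only mirrors the behaviour described in this article.

/**
 * @param string[] $senders       senders registered for the message (may be empty)
 * @param bool     $sendAndHandle true if the message must also reach the next middleware
 * @param bool     $received      true if the message comes from a receiver (worker)
 *
 * @return string[] the actions performed, in order
 */
function dispatchThroughStack(array $senders, bool $sendAndHandle, bool $received = false): array
{
    $log = [];

    // 1. send_message middleware
    if (!$received) {
        foreach ($senders as $sender) {
            $log[] = 'sent via ' . $sender;
        }
        if ([] !== $senders && !$sendAndHandle) {
            return $log; // sent only: the chain stops here
        }
    }

    // 2. handle_message middleware
    $log[] = 'handled';

    return $log;
}

$asyncOnly = dispatchThroughStack(['amqp'], false);      // sent, not handled
$sendAndHandle = dispatchThroughStack(['amqp'], true);   // sent, then handled
$noSender = dispatchThroughStack([], false);             // handled synchronously
$consumed = dispatchThroughStack(['amqp'], false, true); // handled by the worker
```

The last case is what makes one bus usable on both sides: the controller's dispatch ends at the send step, while the worker's dispatch goes straight to the handler.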

Now we can test the first half of the asynchronous mode: sending messages. I will send a message to a RabbitMQ queue, so let's start by launching RabbitMQ in Docker:

docker run -d -p 5672:5672 -p 15672:15672 rabbitmq:3-management

Note: rabbitmq:3-management is the official RabbitMQ Docker image with the management plugin.

Let’s try to send a message using this simple controller:

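namespace App\SayHello\Controller;

use App\SayHello\Command\SayHelloCommand;
use Symfony\Bundle\FrameworkBundle\Controller\AbstractController;
use Symfony\Component\HttpFoundation\Response;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Routing\Annotation\Route;

/**
 * @Route("/say-hello")
 */
class SayHelloController extends AbstractController
{
    public function __invoke(MessageBusInterface $bus): Response
    {
        // Dispatch the message; SendMessageMiddleware publishes it to the queue
        $bus->dispatch(new SayHelloCommand('world@email.dev'));

        return new Response();
    }
}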

The SayHelloCommand message is passed to the first middleware in the stack, SendMessageMiddleware, which uses the registered sender to send the message to the queue:

RabbitMQ management web interface

Note: here we can see that the message SayHelloCommand was serialized with its private property recipient.
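
As a rough illustration of what ends up in the queue, the plain-PHP sketch below turns the message into a JSON body plus a header carrying the class name, then rebuilds it on the other side. The helpers encodeMessage and decodeMessage are hypothetical, and the exact body and header layout is an assumption; in the real setup this work is delegated to the Messenger serializer.

```php
<?php
// Rough model of how a message with a private property can be turned into
// a JSON body plus a "type" header, and rebuilt on the consumer side.
// Hypothetical helper names; Messenger delegates this work to its serializer.

final class SayHelloCommand
{
    private $recipient;

    public function __construct(string $recipient)
    {
        $this->recipient = $recipient;
    }

    public function getRecipient(): string
    {
        return $this->recipient;
    }
}

function encodeMessage(SayHelloCommand $message): array
{
    return [
        // The private property is read through its public getter
        'body' => json_encode(['recipient' => $message->getRecipient()]),
        'headers' => ['type' => get_class($message)],
    ];
}

function decodeMessage(array $encoded): object
{
    $data = json_decode($encoded['body'], true);
    $class = $encoded['headers']['type'];

    // The class name from the header tells the consumer what to rebuild
    return new $class($data['recipient']);
}

$encoded = encodeMessage(new SayHelloCommand('world@email.dev'));
$decoded = decodeMessage($encoded);
```

The point of the type header is that the consumer may be a different process: it has nothing but the payload to decide which message class to instantiate.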

4. ConsumeMessagesCommand

ConsumeMessagesCommand is a Symfony Console command provided by the Messenger component. The command consumes published messages from a queue and dispatches them to the bus.
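
Conceptually, the worker behind this command is a loop that pulls messages from a receiver and hands each one to the bus. The sketch below models that loop in plain PHP; the function consume and the in-memory queue are hypothetical stand-ins, not the component's Worker class.

```php
<?php
// Simplified model of a worker: pull messages from a receiver and hand each
// one to the bus. Hypothetical names; not the component's own Worker class.

/**
 * @param iterable $receiver yields the messages waiting in the queue
 * @param callable $dispatch stands in for the message bus
 *
 * @return int number of messages consumed
 */
function consume(iterable $receiver, callable $dispatch): int
{
    $count = 0;
    foreach ($receiver as $message) {
        $dispatch($message);
        ++$count;
    }

    return $count;
}

// An in-memory queue replaces RabbitMQ for the purpose of the example
$queue = new ArrayIterator(['hello world@email.dev', 'hello other@email.dev']);
$handled = [];

$consumed = consume($queue, function (string $message) use (&$handled): void {
    $handled[] = $message;
});
```

Unlike this example, a real worker does not stop when the queue is empty: it keeps waiting for new messages until the process is stopped.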

To configure it, we should create a receiver locator service:

# messenger.yaml
services:
    messenger.receiver_locator:
        tags: [container.service_locator]
        arguments: [[]]

and pass it to the command service:

# messenger.yaml
services:
    # Receiver command
    console.command.messenger_consume_messages:
        class: Symfony\Component\Messenger\Command\ConsumeMessagesCommand
        arguments:
            - ~ # left empty: bus locator, expected to be set by the Messenger compiler pass
            - '@messenger.receiver_locator'
            - '@Psr\Log\LoggerInterface'
            - [] # receiver names
            - [] # bus names

We do not have any receiver yet, so we need to declare that our AMQP transport service is also a receiver.
To do this, we modify the transport service created earlier and add the DI tag messenger.receiver:

# messenger.yaml
services:
    # Transport
    messenger.transport.amqp:
        class: Symfony\Component\Messenger\Transport\TransportInterface
        factory: 'Symfony\Component\Messenger\Transport\AmqpExt\AmqpTransportFactory:createTransport'
        tags:
            - { name: messenger.receiver, alias: my_receiver_alias }
        arguments:
            $dsn: '%env(RABBITMQ_DSN)%'
            $options: []

Note: the alias of the tag is important, we need it for launching the console command

Before testing the command we need to create a message Handler:

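namespace App\SayHello\Handler;

use App\SayHello\Command\SayHelloCommand;
use Symfony\Component\Messenger\Handler\MessageHandlerInterface;

class SayHelloHandler implements MessageHandlerInterface
{
    public function __invoke(SayHelloCommand $command): void
    {
        // Print to the console instead of sending a real e-mail
        echo sprintf('Send mail to %s', $command->getRecipient());
        /*
        $this->mailer->send(
            new Mail($command->getRecipient())
        );
        */
    }
}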

and do not forget its service:

# services.yaml
services:
    App\SayHello\Handler\SayHelloHandler:
        tags:
            - { name: messenger.message_handler, bus: my_super_bus }

That is all. Now we can consume the published message and complete the requested task (SayHelloCommand).

Run this command:

bin/console messenger:consume-messages my_receiver_alias

and this is the output:

❯ bin/console messenger:consume-messages my_receiver_alias
Send mail to world@email.dev

The consumed message was dispatched to the bus, passed through HandleMessageMiddleware, and then handled by SayHelloHandler.


You can find on my GitHub page the entire project-demo 🚀


Bonus:

Symfony Serializer:

If you want to use serialization groups to serialize only specific fields of a message, you should configure your own Messenger serializer that wraps Symfony Serializer.

# messenger.yaml
services:
    # Message Serializer with its alias
    Symfony\Component\Messenger\Transport\Serialization\Serializer:
        arguments:
            $serializer: '@Symfony\Component\Serializer\SerializerInterface'

    Symfony\Component\Messenger\Transport\Serialization\SerializerInterface: '@Symfony\Component\Messenger\Transport\Serialization\Serializer'

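Use case:

use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Stamp\SerializerStamp;

// Wrap the message in an envelope carrying the serialization context
$messengerBus->dispatch(
    new Envelope(
        new SayHelloCommand('world@email.dev'),
        new SerializerStamp(['groups' => ['public']])
    )
);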

DataCollector:

Messenger provides a DataCollector class for debugging the message dispatching process in the DEV environment.

To activate this data collector:

1) Create a service:

# services_dev.yaml
services:
    data_collector.messenger:
        class: Symfony\Component\Messenger\DataCollector\MessengerDataCollector
        tags:
            - { name: data_collector, id: messenger, template: profiler/messenger.html.twig }

2) Copy this Twig template from the Symfony repository on GitHub into your project: /templates/profiler/messenger.html.twig

3) Copy the Messenger SVG icon used by that template from GitHub:

You can copy its XML code and add it as a custom block to the template after {% extends ... %}, for example:

{% extends '@WebProfiler/Profiler/layout.html.twig' %}

{% block messenger_svg %}
<svg version="1.1" xmlns="http://www.w3.org/2000/svg" x="0px" y="0px" width="24" height="24" viewBox="0 0 24 24" enable-background="new 0 0 24 24" xml:space="preserve"><path fill="#aaa" d="M16 9a2 2 0 0 0 2 2h4a2 2 0 0 0 2-2V7a2 2 0 0 0-2-2h-4a2 2 0 0 0-2 2h-3V4a1 1 0 0 0-1-1H8a2 2 0 0 0-2-2H2a2 2 0 0 0-2 2v2a2 2 0 0 0 2 2h4a2 2 0 0 0 2-2h3v6H8a2 2 0 0 0-2-2H2a2 2 0 0 0-2 2v2a2 2 0 0 0 2 2h4a2 2 0 0 0 2-2h3v9a1 1 0 0 0 2 0v-5h3a2 2 0 0 0 2 2h4a2 2 0 0 0 2-2v-2a2 2 0 0 0-2-2h-4a2 2 0 0 0-2 2h-3V9zm2.52-2.5h3a.5.5 0 0 1 0 1h-3a.5.5 0 0 1 0-1zm0 1.63h3a.5.5 0 0 1 .5.5.5.5 0 0 1-.5.5h-3a.5.5 0 0 1-.5-.5.5.5 0 0 1 .5-.52zm-13-2.82h-3a.5.5 0 0 1-.5-.5.5.5 0 0 1 .5-.5h3a.5.5 0 0 1 .5.5.5.5 0 0 1-.54.48zm0-1.62h-3a.5.5 0 0 1-.5-.5.5.5 0 0 1 .5-.5h3a.5.5 0 0 1 .5.5.5.5 0 0 1-.54.48zm0 9.62h-3a.5.5 0 0 1-.5-.5.5.5 0 0 1 .5-.5h3a.5.5 0 0 1 .5.5.5.5 0 0 1-.54.48zm0-1.62h-3a.5.5 0 0 1-.5-.5.5.5 0 0 1 .5-.5h3a.5.5 0 0 1 .5.5.5.5 0 0 1-.54.48zm13 2.81h3a.5.5 0 0 1 0 1h-3a.5.5 0 0 1 0-1zm0 1.63h3a.5.5 0 0 1 .5.5.5.5 0 0 1-.5.5h-3a.5.5 0 0 1-.5-.5.5.5 0 0 1 .5-.52z"/></svg>
{% endblock %}
...

and, in the template /templates/profiler/messenger.html.twig, replace all occurrences of:

{{ include('@WebProfiler/Icon/messenger.svg') }}

with:

{{ block('messenger_svg') }}

Symfony Messenger profiler panel

If you encounter any issue with the Messenger component, please report it on GitHub.


Any feedback? Don’t hesitate to comment.