Consume External Messages Using Symfony Messenger

Mokhtar Tlili
5 min readNov 27, 2022

--

One of the biggest features of Symfony messenger component is the ability to consume messages from external services which are usually out of your control.

As you may know, it’s quite simple to dispatch and handle messages internally. We just need:

  • A Message class that holds data
  • A Handler class will read the Message and perform one or more tasks
  • A Transport for sending messages (e.g. to a queueing system) and receiving them via a worker

In theory, it’s exactly what we need to receive and handle messages coming from an external service with a slight difference.

As mentioned above, any transport does two things: send and receive messages, and since our object is consuming messages from an external service, we need to configure transport to be a receiver only.

How to do that? let’s see an example.

Let’s assume that a service called X will send us posts messages in this JSON format:

{
"title": "xxx",
"content": "xxxxx"
}

So our Message class might be like:

# src/Message/Post.php
namespace App\Message;

class Post
{
private string $title;
private string $content;

# getters and setters or make properties public
}

and Handler class:

# src/MessageHandler/PostHandler.php
namespace App\MessageHandler;

use App\Message\Post;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;

#[AsMessageHandler]
class PostHandler
{
public function __invoke(Post $message)
{
dd($message);
# perform some business logic
}
}

and last but not the least Transport configuration:

framework:
messenger:
...
transports:
external_messages:
dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
options:
queues:
external_queue: ~
...

Let’s pretend that queue exists in the broker (e.g: RabbitMQ) and have some messages.

Is our transport willing to receive the messages in the queue if we run the worker?

bin/console messenger:consume external_messages

Not yet, per defaults, any transport will use PHP serializer to convert the message to PHP object message (e.g: Post message) but will end up throwing an exception in case the message is not a PHP serialized format whereas in our example we use JSON format, so we will get this exception message:

Could not decode message using PHP serialization {...}

To settle this issue we need to create a custom messenger serializer for our transport.

# src/Serializer/ExternalMessageSerializer.php
namespace App\Serializer;

use App\Message\Post;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface as MessageSerializerInterface;
use Symfony\Component\Serializer\SerializerInterface;

class ExternalMessageSerializer implements MessageSerializerInterface
{
public function __construct(private SerializerInterface $serializer)
{
}

public function decode(array $encodedEnvelope): Envelope
{
$body = $encodedEnvelope['body'];

try {
$message = $this->serializer->deserialize($body, Post::class, 'json');
} catch (\Throwable $throwable) {
throw new MessageDecodingFailedException($throwable->getMessage())
}

return new Envelope($message);
}


public function encode(Envelope $envelope): array
{
throw new \Exception('Transport does not support sending messages').
}
}

The serializer is quite simple it has 2 methods only:

  • decode: This method will be called when we receive a message from the broker. It has $encodedEnvelope argument that describes the envelope and its content (body and headers).
  • encode: This method will be called when we send a message to the broker. It is not important for now because our transport does not support sending messages.

Notes:

  • $encodedEnvelope has 2 keys:
    - Headers represent some additional information related to the body
    - Body represents the payload data message (e.g: JSON post message)
  • Throwing MessageDecodingFailedException means removing the message from the queue.
  • Any exception thrown during decode or encode will lead to killing the worker process.
  • If the exception is not MessageDecodingFailedException the message will still be in the queue forever and whenever your run the worker will be killed again.

Now we need to adapt our transport configuration to use our custom serializer instead of PHP serializer (the default one):

# config/packages/messenger.yaml
framework:
messenger:
...
transports:
external_messages:
dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
serializer: App\Serializer\ExternalMessageSerializer
options:
queues:
external_queue: ~

When we run the worker again the message will end up on the dump line in the handler without any issues.

But what if the handler throws an exception somehow, and we want to retry consuming the message later based on the default retry strategy? Does our custom serializer support redelivering the message to the broker?

No, as you may know, our serializer throws an exception when we try to send a message through `external_messages` transport. But for the purpose of supporting sending or redelivering messages to the broker, it will be necessary to adjust some stuff in our serializer.

# src/Serializer/PostMessageSerializer.php
namespace App\Serializer;

use App\Message\Post;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface as MessageSerializerInterface;
use Symfony\Component\Serializer\SerializerInterface;

class ExternalMessageSerializer implements MessageSerializerInterface
{
public function __construct(private SerializerInterface $serializer)
{
}

public function decode(array $encodedEnvelope): Envelope
{
$body = $encodedEnvelope['body'];
$headers = $encodedEnvelope['headers'];

try {
$message = $this->serializer->deserialize($body, Post::class, 'json');
} catch (\Throwable $throwable) {
throw new MessageDecodingFailedException($throwable->getMessage())
}

$stamps = [];
if (!empty($headers['stamps'])) {
$stamps = unserialize($headers['stamps']);
}

return new Envelope($message, $stamps);
}


public function encode(Envelope $envelope): array
{
$message = $envelope->getMessage();
$stamps = $envelope->all();

if ($message instanceof Post) {
$data = [
'title' => $message->getTitle(),
'content' => $message->getContent(),
];
} else {
throw new \Exception(sprintf('Serializer does not support message of type %s.', $message::class));
}

return [
'body' => json_encode($data),
'headers' => [
'stamps' => serialize($stamps)
]
];
}
}

1- The encode method:
As long as our transport support retries messages by default, it will call `encode` method and pass us the envelope object containing the message Post object and some stamps that are additional information about the message like the message bus that’s handling it or if it’s being retried after failure. So our job there is to turn the message to the original JSON format and serialize stamps and put them in the header.

2- The decode method:
After sending the message back to the broker, the transport will receive it again, so we need to check and deserialize the stamps in the header to let Messenger keep track of information about the message such as how many times to retry after failure, which message bus and so on…

Done.

Now our transport is ready to receive messages from the external service and redeliver them over to the broker in case something goes wrong with the handler in order to retry their consumption.

Cheers!!

Thanks for reading and feel free to criticize

--

--