How to use RabbitMQ with Symfony messenger

Lukas Sprezak
May 20, 2020

In this article I will first explain how queues work. In the second part we will install Symfony, the Messenger component and RabbitMQ, and build a simple example project: a product order basket for a store.

As a rule, new items are added at the end of a queue (enqueue) and removed once they reach the front of the queue (dequeue). This is the First In, First Out (FIFO) principle.
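The FIFO rule can be tried out with PHP's built-in SplQueue class. This is only a small sketch; the order names are made up for the example.

```php
<?php
declare(strict_types=1);

// SplQueue ships with PHP; the order names are invented for this example.
$queue = new SplQueue();

// enqueue: new items join at the end of the queue
$queue->enqueue('order-1');
$queue->enqueue('order-2');
$queue->enqueue('order-3');

// dequeue: items leave from the front, in arrival order
$first = $queue->dequeue();
$second = $queue->dequeue();

echo $first, PHP_EOL;        // order-1
echo $second, PHP_EOL;       // order-2
echo count($queue), PHP_EOL; // 1 (order-3 is still waiting)
```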

Types of queues:
• Cyclic
• Double-ended
• Priority

A cyclic queue has a ring structure: when it reaches the last element of the list, it automatically continues from the beginning.
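One possible way to picture the ring structure is a fixed-size buffer whose read and write positions wrap around with the modulo operator. The CyclicQueue class below is hypothetical and written only for this illustration; it is not part of PHP or Symfony.

```php
<?php
declare(strict_types=1);

// Hypothetical fixed-capacity cyclic queue, for illustration only.
final class CyclicQueue
{
    private $items = [];
    private $capacity;
    private $head = 0;  // index of the oldest element
    private $count = 0; // number of stored elements

    public function __construct(int $capacity)
    {
        if ($capacity < 1) {
            throw new InvalidArgumentException('Capacity must be at least 1.');
        }
        $this->capacity = $capacity;
    }

    public function enqueue($item): void
    {
        if ($this->count === $this->capacity) {
            throw new OverflowException('Queue is full.');
        }
        // The write position wraps around to index 0 after the last slot.
        $tail = ($this->head + $this->count) % $this->capacity;
        $this->items[$tail] = $item;
        $this->count++;
    }

    public function dequeue()
    {
        if ($this->count === 0) {
            throw new UnderflowException('Queue is empty.');
        }
        $item = $this->items[$this->head];
        unset($this->items[$this->head]);
        // The read position wraps around in the same way.
        $this->head = ($this->head + 1) % $this->capacity;
        $this->count--;

        return $item;
    }
}

$queue = new CyclicQueue(3);
$queue->enqueue('a');
$queue->enqueue('b');
$queue->enqueue('c');
$removed = $queue->dequeue(); // frees slot 0
$queue->enqueue('d');         // wraps around and reuses slot 0
$order = [$queue->dequeue(), $queue->dequeue(), $queue->dequeue()];

echo $removed, ' ', implode(' ', $order), PHP_EOL; // a b c d
```

The element 'd' is stored in the slot that 'a' left free, yet it still comes out last, so the FIFO order is preserved.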

In a double-ended queue (deque), elements can be added and removed at both ends.
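A queue that is open at both ends can be sketched with PHP's built-in SplDoublyLinkedList, which offers push/pop for the back and unshift/shift for the front. The letters are placeholder values.

```php
<?php
declare(strict_types=1);

$deque = new SplDoublyLinkedList();

$deque->push('B');    // add at the back
$deque->push('C');    // add at the back
$deque->unshift('A'); // add at the front, the list is now A, B, C

$front = $deque->shift(); // remove from the front
$back = $deque->pop();    // remove from the back

echo $front, ' ', $back, ' ', count($deque), PHP_EOL; // A C 1
```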

In a priority queue, elements are added according to a pre-defined priority. An element with a higher priority moves ahead of elements with a lower priority and is served earlier, unlike in a classic queue, where every new element is appended at the end.

Examples of such queues are services with free and paid subscription accounts, or an insurance application for reporting damage, where it is pre-defined that more serious cases (such as accidents) are handled with a higher priority than ordinary bumps.
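The insurance example can be sketched with PHP's built-in SplPriorityQueue, where a larger number means a higher priority. The claim descriptions and priority values are invented for the illustration.

```php
<?php
declare(strict_types=1);

$claims = new SplPriorityQueue();

// insert(value, priority): claims arrive in this order ...
$claims->insert('scratched bumper', 1);
$claims->insert('road accident', 10);
$claims->insert('broken mirror', 2);

// ... but are served by priority, highest first.
$served = [];
while (!$claims->isEmpty()) {
    $served[] = $claims->extract();
}

echo implode(', ', $served), PHP_EOL;
// road accident, broken mirror, scratched bumper
```

Note that SplPriorityQueue does not guarantee any particular order for elements with the same priority, which is why the example uses distinct values.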

Symfony Messenger

Main concepts:

  • Message: any PHP object that carries data
  • Bus: dispatches messages
  • Message Handler: the class that processes a given message
  • Transport: sends and receives messages, for example through a queue such as RabbitMQ
  • Worker: consumes messages from a transport and passes them to handlers
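To make the relation between message, bus and handler more concrete, here is a deliberately simplified sketch in plain PHP. All classes in it (OrderPlaced, OrderPlacedHandler, SimpleBus) are hypothetical stand-ins written for this example; they are not Symfony's classes, and the real bus does much more (middleware, envelopes, transports).

```php
<?php
declare(strict_types=1);

// Message: a plain object that only carries data.
final class OrderPlaced
{
    public $productNumber;

    public function __construct(int $productNumber)
    {
        $this->productNumber = $productNumber;
    }
}

// Message handler: an invokable class that does the work for one message type.
final class OrderPlacedHandler
{
    public $handled = [];

    public function __invoke(OrderPlaced $message): void
    {
        $this->handled[] = $message->productNumber;
    }
}

// Bus: looks up the handler registered for the message class and calls it.
final class SimpleBus
{
    private $handlers = []; // message class name => callable

    public function register(string $messageClass, callable $handler): void
    {
        $this->handlers[$messageClass] = $handler;
    }

    public function dispatch(object $message): void
    {
        $class = get_class($message);
        if (!isset($this->handlers[$class])) {
            throw new LogicException(sprintf('No handler for message "%s".', $class));
        }
        ($this->handlers[$class])($message);
    }
}

$handler = new OrderPlacedHandler();
$bus = new SimpleBus();
$bus->register(OrderPlaced::class, $handler);

$bus->dispatch(new OrderPlaced(1));
$bus->dispatch(new OrderPlaced(7));

echo implode(',', $handler->handled), PHP_EOL; // 1,7
```

This sketch handles messages synchronously. With a transport and a worker in between, the dispatch call only stores the message and the handler runs later in a separate process.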

Stack: Symfony 4.3.11, PHP 7.3.16, MySQL 5.7.29

1. Create a project with the following command.

composer create-project symfony/website-skeleton queue-project "4.3.*"

2. Start the server with the following command.

bin/console server:run

3. Install Messenger component

composer require symfony/messenger

4. Create controller

bin/console make:controller

5. Messages are dispatched in the basket() method.

<?php
declare(strict_types=1);

namespace App\Controller;

use App\Message\Command\OrderProduct;
use Symfony\Bundle\FrameworkBundle\Controller\AbstractController;
use Symfony\Component\HttpFoundation\Response;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Routing\Annotation\Route;

class ShopBasketController extends AbstractController
{
    // Sample product names used to build the response text.
    private const PRODUCTS = [
        'Backpack',
        'Cap',
        'Bag',
    ];

    private $messageBus;

    /**
     * ShopBasketController constructor.
     *
     * @param MessageBusInterface $messageBus
     */
    public function __construct(MessageBusInterface $messageBus)
    {
        $this->messageBus = $messageBus;
    }

    /**
     * @Route("/", name="shop")
     */
    public function index(): Response
    {
        return $this->render('shop_basket/index.html.twig', [
            'controller_name' => 'ShopBasketController',
        ]);
    }

    /**
     * @Route("/shop/basket", name="shop_basket")
     *
     * @throws \Exception
     */
    public function basket(): Response
    {
        $productNumber = 1;
        $productAmount = 9.90;

        try {
            // Hand the message over to the bus.
            $this->messageBus->dispatch(
                new OrderProduct($productNumber, $productAmount)
            );
        } catch (\Exception $exception) {
            // Keep the original exception as "previous" for debugging.
            throw new \Exception('Could not order the product.', 0, $exception);
        }

        return new Response(sprintf(
            '%s successfully added to basket!',
            $this->randProduct()
        ));
    }

    // Returns (rather than echoes) a random product name so it can be used in the response.
    private function randProduct(): string
    {
        return self::PRODUCTS[array_rand(self::PRODUCTS)];
    }
}

6. Next, create OrderProductHandler. The handler is invoked with the OrderProduct message. In this example it waits 9 seconds to simulate slow processing and then dumps the message. This is where your domain logic belongs.

<?php
declare(strict_types=1);

namespace App\MessageHandler\Command;

use App\Message\Command\OrderProduct;
use Symfony\Component\Messenger\Handler\MessageHandlerInterface;

final class OrderProductHandler implements MessageHandlerInterface
{
    public function __invoke(OrderProduct $orderProduct): void
    {
        // Simulate slow work, then print the message; replace with real domain logic.
        sleep(9);
        var_dump($orderProduct);
    }
}

7. Create the directory src/Message/Command and, inside it, the PHP class OrderProduct. This class is the message: a plain PHP object that carries the order data and is processed by the handler.

<?php
declare(strict_types=1);

namespace App\Message\Command;

class OrderProduct
{
    private $productNumber;
    private $productAmount;

    public function __construct(int $productNumber, float $productAmount)
    {
        $this->productNumber = $productNumber;
        $this->productAmount = $productAmount;
    }

    public function getProductNumber(): int
    {
        return $this->productNumber;
    }

    public function getProductAmount(): float
    {
        return $this->productAmount;
    }
}
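Because an asynchronous transport has to store the message and rebuild it later in the worker, the message object should survive serialization. The sketch below checks this with PHP's native serialize() and unserialize(), which are used here only for illustration (Messenger's serializer is configurable). The class is a local, non-namespaced copy of OrderProduct so that the snippet runs on its own.

```php
<?php
declare(strict_types=1);

// Local copy of the message class so the snippet is self-contained.
final class OrderProduct
{
    private $productNumber;
    private $productAmount;

    public function __construct(int $productNumber, float $productAmount)
    {
        $this->productNumber = $productNumber;
        $this->productAmount = $productAmount;
    }

    public function getProductNumber(): int
    {
        return $this->productNumber;
    }

    public function getProductAmount(): float
    {
        return $this->productAmount;
    }
}

$original = new OrderProduct(1, 9.90);

// Turn the object into a string, as a transport would before storing it ...
$payload = serialize($original);

// ... and rebuild it, as a worker would after fetching it.
$restored = unserialize($payload, ['allowed_classes' => [OrderProduct::class]]);

echo $restored->getProductNumber(), ' ', $restored->getProductAmount(), PHP_EOL; // 1 9.9
```

Keeping messages small and free of services or open resources (database connections, file handles) makes this round trip reliable.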

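8. Register the handler. With the default autoconfigure: true setting, classes implementing MessageHandlerInterface are tagged automatically; the explicit configuration looks like this.

# config/services.yaml
services:
    App\MessageHandler\:
        resource: '../src/MessageHandler'
        tags: ['messenger.message_handler']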

9. Doctrine transport configuration.

# config/packages/messenger.yaml
framework:
    messenger:
        transports:
            async:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
        routing:
            'App\Message\Command\OrderProduct': async

10. We also need to define the DSN of the Messenger transport. The Doctrine transport was introduced in Symfony 4.3.

# .env
MESSENGER_TRANSPORT_DSN=doctrine://default

11. The following command consumes the messages from the queue (async is the name of the transport configured above).

bin/console messenger:consume async
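Conceptually, a worker is a loop: it takes the oldest message from the transport, passes it to the handler and repeats. The sketch below imitates this with an in-memory SplQueue as a stand-in for the transport and a closure as a stand-in for the handler. Both are assumptions made for the illustration, and the real worker is considerably more involved.

```php
<?php
declare(strict_types=1);

// Hypothetical in-memory stand-in for a transport; real transports persist messages.
$transport = new SplQueue();
$transport->enqueue(['productNumber' => 1, 'productAmount' => 9.90]);
$transport->enqueue(['productNumber' => 2, 'productAmount' => 4.50]);

// Stand-in for a message handler: records which products it processed.
$processed = [];
$handler = function (array $message) use (&$processed): void {
    $processed[] = $message['productNumber'];
};

// A real worker keeps polling until it is stopped;
// this sketch simply stops when the queue is empty.
while (!$transport->isEmpty()) {
    $message = $transport->dequeue(); // receive the oldest message
    $handler($message);               // hand it to the handler
}

echo implode(',', $processed), PHP_EOL; // 1,2
```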

1. Next, install and start RabbitMQ, then open the management panel in the browser. The default login and password are guest/guest.

brew install rabbitmq

brew services start rabbitmq

http://localhost:15672/

2. You still need to install the AMQP transport and add extension=amqp.so to php.ini (the configuration below also works on Symfony 4.2).

composer require symfony/amqp-pack

3. Define the transport

# .env
MESSENGER_TRANSPORT_DSN=amqp://guest:guest@localhost:5672/%2f/messages
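The DSN packs several settings into one string. As a rough illustration, PHP's parse_url() can take it apart: user and password, host, port, and a path with two segments. The first segment, %2f, is the URL-encoded form of "/", RabbitMQ's default virtual host. Reading the trailing segment as a name used for the exchange or queue is my interpretation here, not something this snippet verifies.

```php
<?php
declare(strict_types=1);

$dsn = 'amqp://guest:guest@localhost:5672/%2f/messages';
$parts = parse_url($dsn);

// The path holds two segments: the URL-encoded vhost and a trailing name.
$segments = explode('/', trim($parts['path'], '/'));
$vhost = urldecode($segments[0]); // "%2f" decodes to "/"
$name = $segments[1];

printf(
    "%s@%s:%d vhost=%s name=%s\n",
    $parts['user'],
    $parts['host'],
    $parts['port'],
    $vhost,
    $name
);
// guest@localhost:5672 vhost=/ name=messages
```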

4. For the RabbitMQ transport, the rest of the configuration is analogous to the Doctrine transport.

# config/packages/messenger.yaml
framework:
    messenger:
        transports:
            amqp: '%env(MESSENGER_TRANSPORT_DSN)%'

        routing:
            'App\Message\Command\OrderProduct': amqp

Summary

You have learned the basics of the Messenger component. We used two transports: Doctrine and RabbitMQ.
