How to use RabbitMQ with Symfony Messenger
In this article I will first explain how queues work. In the second part we will install Symfony, the Messenger component and RabbitMQ, and build a simple example project: a product order basket in a store.
A queue is a First In, First Out (FIFO) structure: new items are added at the end (enqueue) and removed from the front (dequeue), so items are served in the order in which they arrived.
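The FIFO rule can be tried out with SplQueue, which ships with PHP. This is only a minimal sketch; the order IDs are made up for illustration.

```php
<?php
declare(strict_types=1);

// SplQueue is part of PHP's SPL extension; the order IDs are hypothetical.
$queue = new SplQueue();
$queue->enqueue('order-1'); // added at the end
$queue->enqueue('order-2');
$queue->enqueue('order-3');

$served = [];
while (!$queue->isEmpty()) {
    $served[] = $queue->dequeue(); // removed from the front
}

echo implode(', ', $served), PHP_EOL; // order-1, order-2, order-3
```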
Types of queues:
• Cyclic
• Double-ended
• Priority
A cyclic queue has a ring structure: when it reaches the last slot of the list, it automatically continues from the beginning.
In a double-ended queue, both ends can be used for adding as well as removing nodes.
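Both structures can be sketched in a few lines of PHP. CyclicQueue below is a hypothetical class written for this article, not a library type; the double-ended part uses SplDoublyLinkedList from PHP's SPL.

```php
<?php
declare(strict_types=1);

// Hypothetical fixed-size cyclic queue backed by a plain array.
final class CyclicQueue
{
    private $items;
    private $capacity;
    private $head = 0;  // index of the oldest element
    private $count = 0; // number of stored elements

    public function __construct(int $capacity)
    {
        $this->capacity = $capacity;
        $this->items = array_fill(0, $capacity, null);
    }

    public function enqueue($item): void
    {
        if ($this->count === $this->capacity) {
            throw new OverflowException('Queue is full.');
        }
        // The modulo makes the write position wrap around to index 0.
        $tail = ($this->head + $this->count) % $this->capacity;
        $this->items[$tail] = $item;
        $this->count++;
    }

    public function dequeue()
    {
        if ($this->count === 0) {
            throw new UnderflowException('Queue is empty.');
        }
        $item = $this->items[$this->head];
        $this->head = ($this->head + 1) % $this->capacity;
        $this->count--;

        return $item;
    }
}

$ring = new CyclicQueue(2);
$ring->enqueue('a');
$ring->enqueue('b');
$first = $ring->dequeue();  // 'a'
$ring->enqueue('c');        // wraps around and reuses index 0
$second = $ring->dequeue(); // 'b'
$third = $ring->dequeue();  // 'c'

// Double-ended queue: add and remove at both ends.
$deque = new SplDoublyLinkedList();
$deque->push('middle');
$deque->push('back');         // add at the end
$deque->unshift('front');     // add at the beginning
$fromFront = $deque->shift(); // 'front'
$fromBack = $deque->pop();    // 'back'
```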
In a priority queue, each element is added with a predefined priority. An element with a higher priority moves ahead of lower-priority elements and is served earlier, unlike in a classic queue, where it would simply be added at the end.
Examples of such queues are services with free accounts and paid subscriptions, or an insurance application for reporting damage, where it is predefined that more serious damage (such as an accident) is handled with a higher priority than a minor bump.
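The insurance example can be sketched with PHP's built-in SplPriorityQueue. The claim names and priority numbers are invented for illustration; a higher number means the claim is served earlier.

```php
<?php
declare(strict_types=1);

// insert($value, $priority): the highest priority is extracted first.
$claims = new SplPriorityQueue();
$claims->insert('parking bump', 1);
$claims->insert('road accident', 10);
$claims->insert('broken window', 5);

$order = [];
while (!$claims->isEmpty()) {
    $order[] = $claims->extract();
}

echo implode(', ', $order), PHP_EOL; // road accident, broken window, parking bump
```

The priorities here are all distinct on purpose: SplPriorityQueue does not guarantee any particular order among elements with the same priority.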
Symfony Messenger
Main concepts:
- Message: any PHP object that holds the data to be processed
- Bus: dispatches messages
- Message Handler: a class that processes a given message
- Transport: sends and receives messages, for example through a queue such as RabbitMQ
- Worker: consumes messages from a transport and passes them to their handlers
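To show how these concepts fit together, here is a toy sketch in plain PHP. It does not use the Messenger component: Ping and ToyBus are hypothetical names, and an in-memory SplQueue stands in for a real transport such as RabbitMQ.

```php
<?php
declare(strict_types=1);

// Message: a plain object that only carries data.
final class Ping
{
    public $text;

    public function __construct(string $text)
    {
        $this->text = $text;
    }
}

// Hypothetical toy bus; the SplQueue plays the role of the transport.
final class ToyBus
{
    private $queue;
    private $handlers = [];

    public function __construct()
    {
        $this->queue = new SplQueue();
    }

    public function register(string $messageClass, callable $handler): void
    {
        $this->handlers[$messageClass] = $handler;
    }

    // Bus: dispatching only puts the message on the queue.
    public function dispatch(object $message): void
    {
        $this->queue->enqueue($message);
    }

    // Worker: takes messages off the queue and calls the matching handler.
    public function consume(): int
    {
        $handled = 0;
        while (!$this->queue->isEmpty()) {
            $message = $this->queue->dequeue();
            ($this->handlers[get_class($message)])($message);
            $handled++;
        }

        return $handled;
    }
}

$log = [];
$bus = new ToyBus();

// Message handler: the code that does the actual work.
$bus->register(Ping::class, function (Ping $ping) use (&$log): void {
    $log[] = 'handled: ' . $ping->text;
});

$bus->dispatch(new Ping('hello'));
$handledBeforeConsume = count($log); // 0, nothing has run yet
$handled = $bus->consume();          // the handler runs here
```

The point of the sketch is the split between dispatch() and consume(): the code that dispatches a message does not wait for the handler, which is what makes asynchronous processing possible.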
Stack: Symfony 4.3.11, PHP 7.3.16, MySQL 5.7.29
1. Create a project with the following command.
composer create-project symfony/website-skeleton queue-project "4.3.*"
2. Start the server with the command
bin/console server:run
3. Install the Messenger component
composer require symfony/messenger
4. Create a controller
bin/console make:controller
5. Messages are dispatched in the basket method.
<?php

declare(strict_types=1);

namespace App\Controller;

use App\Message\Command\OrderProduct;
use Symfony\Bundle\FrameworkBundle\Controller\AbstractController;
use Symfony\Component\HttpFoundation\Response;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Routing\Annotation\Route;

class ShopBasketController extends AbstractController
{
    // Sample product names, used only for the response text.
    private const PRODUCTS = ['Backpack', 'Cap', 'Bag'];

    private $messageBus;

    public function __construct(MessageBusInterface $messageBus)
    {
        $this->messageBus = $messageBus;
    }

    /**
     * @Route("/", name="shop")
     */
    public function index(): Response
    {
        return $this->render('shop_basket/index.html.twig', [
            'controller_name' => 'ShopBasketController',
        ]);
    }

    /**
     * @Route("/shop/basket", name="shop_basket")
     * @throws \Exception
     */
    public function basket(): Response
    {
        $productNumber = 1;
        $productAmount = 9.90;

        try {
            // Hand the message to the bus; the handler does the actual work.
            $this->messageBus->dispatch(new OrderProduct($productNumber, $productAmount));
        } catch (\Exception $exception) {
            // Keep the original exception as "previous" so the cause is not lost.
            throw new \Exception('Error while ordering the product.', 0, $exception);
        }

        return new Response(sprintf('%s successfully added to the basket!', $this->randProduct()));
    }

    // Returns a random product name instead of echoing it.
    private function randProduct(): string
    {
        return self::PRODUCTS[array_rand(self::PRODUCTS)];
    }
}
6. Next, we create OrderProductHandler. The handler is invoked with the OrderProduct message. In this example it sleeps for 9 seconds to simulate slow work and then dumps the message. This is where your domain logic goes.
<?php

declare(strict_types=1);

namespace App\MessageHandler\Command;

use App\Message\Command\OrderProduct;
use Symfony\Component\Messenger\Handler\MessageHandlerInterface;

final class OrderProductHandler implements MessageHandlerInterface
{
    public function __invoke(OrderProduct $orderProduct)
    {
        // Simulate a slow task, then dump the message that was received.
        sleep(9);
        var_dump($orderProduct);
    }
}
7. Create the directory Message/Command and, inside it, the PHP class OrderProduct. This class is the message: a plain PHP object that carries the order data and is processed by the handler.
<?php

declare(strict_types=1);

namespace App\Message\Command;

class OrderProduct
{
    private $productNumber;
    private $productAmount;

    public function __construct(int $productNumber, float $productAmount)
    {
        $this->productNumber = $productNumber;
        $this->productAmount = $productAmount;
    }

    public function getProductNumber(): int
    {
        return $this->productNumber;
    }

    public function getProductAmount(): float
    {
        return $this->productAmount;
    }
}
8. Register the handler.
# config/services.yaml
services:
    App\MessageHandler\:
        resource: '../src/MessageHandler'
        tags: ['messenger.message_handler']
9. Doctrine transport configuration.
# config/packages/messenger.yaml
framework:
    messenger:
        transports:
            async:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
        routing:
            'App\Message\Command\OrderProduct': async
10. We also need to define the Messenger transport DSN. The Doctrine transport was introduced in Symfony 4.3.
# .env
MESSENGER_TRANSPORT_DSN=doctrine://default
11. The following command consumes the messages from the queue.
bin/console messenger:consume
1. Then install RabbitMQ and start the service. The default login and password are guest/guest.
brew install rabbitmq
brew services start rabbitmq
2. You also need to install the AMQP transport and enable the PHP extension by adding extension=amqp.so to php.ini (the configuration below works on Symfony 4.2).
composer require symfony/amqp-pack
3. Define the transport
# .env
MESSENGER_TRANSPORT_DSN=amqp://guest:guest@localhost:5672/%2f/messages
4. For the RabbitMQ transport, the rest of the configuration is analogous to the Doctrine transport.
# config/packages/messenger.yaml
framework:
    messenger:
        transports:
            amqp: '%env(MESSENGER_TRANSPORT_DSN)%'
        routing:
            'App\Message\Command\OrderProduct': amqp
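To see what the AMQP DSN from step 3 encodes, we can take it apart with PHP's parse_url(). The sketch assumes that the first path segment is the URL-encoded vhost (%2f decodes to "/") and the second is the exchange name. That mapping is my reading of the AMQP transport, so treat it as an assumption and check it against the Messenger documentation for your Symfony version.

```php
<?php
declare(strict_types=1);

$dsn = 'amqp://guest:guest@localhost:5672/%2f/messages';

$parts = parse_url($dsn);
// The path is "/%2f/messages"; split it into its two segments.
$segments = explode('/', trim($parts['path'], '/'));

$connection = [
    'host'     => $parts['host'],
    'port'     => $parts['port'],
    'user'     => $parts['user'],
    'password' => $parts['pass'],
    'vhost'    => urldecode($segments[0]), // assumption: first segment is the vhost
    'exchange' => $segments[1],            // assumption: second segment is the exchange
];

print_r($connection);
```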
Summary
You have learned how the Messenger component works. We used two transports: Doctrine and RabbitMQ.