RabbitMQ for PHP Developers

Syed Sirajul Islam Anik
Sep 3 · 10 min read
Image from: http://bit.ly/2U3PyJz

In my last article, Easy Peasy RabbitMQ Squeezy, I explained how RabbitMQ works. I tried to cover it in depth with easy examples so that even a newcomer could pick it up quickly. Not sure if I was successful. That article also links a GitHub repository where you can find PHP code that interacts with RabbitMQ.

When you use the php-amqplib package, you end up repeating the same steps over and over. Say you've declared an exchange programmatically. Next you have to declare the queue if it isn't declared yet, and then bind the queue to that exchange with a bind-key or a headers argument. Only after that can you start consuming messages, and consuming always involves the same steps too. You can remove this redundancy by creating a class that wraps up those tasks and reusing that class everywhere.

I tried to ease these problems. I wrote a package that lets you publish messages to, or consume messages from, RabbitMQ painlessly.

N.B: Before we dive into the package and how it works, let’s just make sure that you have a good grasp of RabbitMQ architecture. It’s mandatory. You can just check my article mentioned above to have a better idea. After that, it’ll be easy for you to go with this article.


Installation

The package primarily works with Laravel, Lumen & Laravel Zero. Extending it so that you can use it outside of a Laravel project is on the roadmap.

So, to use the package with your Laravel, Lumen or Laravel Zero application, you’ll have to install it via composer.

composer require anik/amqp

The package has a few dependencies, which Composer will resolve. But your computer or container must have the following PHP version & extensions for the installation to succeed.

  • php >= 7.0
  • ext-bcmath
  • ext-sockets

If all the criteria are met, Composer will then install the package.

For Laravel

  • Add the provider in your config/app.php providers array.
'providers' => [
/// ...
Anik\Amqp\ServiceProviders\AmqpServiceProvider::class,
],
  • Add configuration file amqp.php in your config directory with the following command.
# Run the following as a single line; the quotes keep the shell from stripping the backslashes
php artisan vendor:publish --provider="Anik\Amqp\ServiceProviders\AmqpServiceProvider"

For Lumen

  • Add the service provider in your bootstrap/app.php file.
# The following is a single line of code
$app->register(Anik\Amqp\ServiceProviders\AmqpServiceProvider::class);
  • Add configuration file amqp.php in your config directory by copying it from vendor/anik/amqp/src/config/amqp.php.
  • Add $app->configure('amqp'); to your bootstrap/app.php to make the configuration available throughout the project. Lumen doesn’t load configurations by default.

N.B: For Lumen, you don’t need to enable Facade.

For Laravel Zero

  • Add provider in your config/app.php providers array.
'providers' => [
/// …
Anik\Amqp\ServiceProviders\AmqpServiceProvider::class,
]
  • Add configuration amqp.php in your config directory by copying it from vendor/anik/amqp/src/config/amqp.php.

Configuration

When you’re done copying, you can update your config/amqp.php according to your needs. You can define multiple connections, just as you can for Laravel’s other services such as database, queue and cache.


Tech talk

This package has a Facade class Anik\Amqp\Facades\Amqp. If you’re using Facade, then this class will proxy all the underlying calls to Anik\Amqp\AmqpManager class, which is actually responsible for doing all the magic.

If you’re not using facade then

  • app(Anik\Amqp\Facades\Amqp::class)
  • app()->make(Anik\Amqp\Facades\Amqp::class)
  • app('amqp')
  • app()->make('amqp')

will proxy the calls to Anik\Amqp\AmqpManager. This is how facades in Laravel/Lumen work. If you don’t know, I have an article on Facade. Have a look.

But if you prefer to use the Anik\Amqp\AmqpManager class directly, it’s a good idea to resolve it either through dependency injection or through the Service Container with app('Anik\Amqp\AmqpManager'). Using this class as a singleton is recommended, because it keeps the connections alive throughout the request life-cycle. When the classes are destructed, the connections and channels are freed.
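As a minimal sketch of the dependency-injection route, assuming a Laravel or Lumen application with the package installed: the namespace, the class name OrderNotifier and the method name notify are hypothetical and only serve the illustration. The publish arguments follow the signature described later in this article (message, routing key, configuration array).

```php
<?php

namespace App\Services; // hypothetical namespace for this example

use Anik\Amqp\AmqpManager;

class OrderNotifier // hypothetical class name
{
    /** @var AmqpManager */
    private $amqp;

    // The service container resolves and injects the manager
    // when it builds this class.
    public function __construct(AmqpManager $amqp)
    {
        $this->amqp = $amqp;
    }

    public function notify(string $body)
    {
        // Message, routing key, then per-call configuration.
        $this->amqp->publish($body, 'message', [
            'connection' => 'my-connection-name', // must exist in config/amqp.php
        ]);
    }
}
```

Resolving the class with app(App\Services\OrderNotifier::class) should then hand it the same manager instance that app('amqp') proxies to, provided the manager is bound as a singleton as described above.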

Source: http://bit.ly/2lPrnRU

In your configuration,

  • amqp.connections.*.exchange.declare and amqp.connections.*.queue.declare: when set to true, the package will declare (or force declare) the exchange or queue when required. Default is false.
  • amqp.connections.*.exchange.properties are the properties/arguments that will be used when declaring the exchange.
  • amqp.connections.*.queue.d_properties are the properties/arguments that will be used when declaring the queue.
  • amqp.connections.*.queue.b_properties are the properties/arguments that will be used when binding the queue to an exchange.
  • Using the package, you can change your properties dynamically. Precedence runs from lowest to highest, so a value on the right overrides the same value on the left.
default_properties < class_based_properties < dynamic_properties
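To make the hierarchy concrete, here is a small plain-PHP illustration. It is not the package's internal merge code; the three arrays are made-up stand-ins for the three levels, and array_replace is used only because it lets later arrays win on key conflicts.

```php
<?php

// Illustration only: stand-ins for the three levels of the hierarchy.
$defaultProperties = ['name' => 'amq.topic', 'type' => 'topic', 'declare' => false]; // as in config/amqp.php
$classBasedProperties = ['name' => 'direct.exchange', 'type' => 'direct'];           // as if set on a class
$dynamicProperties = ['declare' => true];                                             // as if passed per call

// Later arguments override earlier ones for the same key, mirroring
// default_properties < class_based_properties < dynamic_properties.
$effective = array_replace($defaultProperties, $classBasedProperties, $dynamicProperties);

var_export($effective);
// name => 'direct.exchange', type => 'direct', declare => true
```

The name and type come from the class-based level, while declare comes from the dynamic level because it sits highest in the hierarchy.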

Publish a message

To publish a message, we need the name of the exchange and the message itself, along with the following configuration. It ships with the package by default and lives in the amqp.php file. It has been slightly changed here for the example.

<?php
return [
    'default' => 'my-connection-name',

    'connections' => [
        'my-connection-name' => [
            'connection' => [
                'host' => 'localhost',
                'port' => 5672,
                'username' => 'username',
                'password' => 'password',
                'vhost' => '/',
                'connect_options' => [],
                'ssl_options' => [],
                'ssl_protocol' => 'ssl',
            ],

            'channel_id' => null,

            'message' => [
                'content_type' => 'text/plain',
                'delivery_mode' => 2,
                'content_encoding' => 'UTF-8',
            ],

            'exchange' => [
                'name' => 'amq.topic',
                'type' => 'topic',
                'declare' => false,
                'passive' => false,
                'durable' => true,
                'auto_delete' => false,
                'internal' => false,
                'nowait' => false,
                'properties' => [],
            ],
        ],
    ],
];

The package will use this configuration by default. Any value you pass when publishing a message with the following approaches will override the corresponding default.

To publish a message, we’ll use app('amqp'). But you can use any of the ways described earlier.

To publish a message we require an exchange and a routing key. That’s all. There are two ways to accomplish this.

Approach one — Dynamic

<?php

The above code mentions,

  • It wants to send a message hi to a direct exchange.
  • With a routing key message.
  • To an exchange named direct.exchange with the type direct, and it should declare the exchange programmatically when publishing the message. The values within the exchange array will override the default configuration shown in the snippet above.

That’s it. You can send the message with just 7 lines of code. No extra coding. No headache!
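A minimal sketch of the dynamic approach described above, assuming a Laravel or Lumen application with the package installed and a reachable RabbitMQ broker. The argument order (message, routing key, configuration array) and the exchange keys are the ones this article describes; treat the snippet as a reconstruction rather than the exact original code.

```php
<?php

app('amqp')->publish('hi', 'message', [
    'exchange' => [
        'type' => 'direct',          // exchange type
        'name' => 'direct.exchange', // exchange name
        'declare' => true,           // declare the exchange before publishing
    ],
]);
```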

Approach two — Class based

To replicate the exact same thing with class-based publishing, you can do the following.

<?php
use \Anik\Amqp\PublishableMessage;
use \Anik\Amqp\Exchange;

The above code mentions,

  • Create a PublishableMessage with a message body hi to direct exchange.
  • Set an exchange to the message with the name direct.exchange with the properties and asks to declare the exchange beforehand.
  • Publishes the message with a routing key message.

So, if you want to change the configuration/properties of the exchange or the message, all you have to do is pass values like in the following snippet.

<?php

N.B: Both ways of sending a message are equivalent, so use whichever you prefer. Even if you use the class-based approach, you can still pass configuration dynamically, and that configuration will override the class’s configuration. The publish method expects either a single message or an array of messages, each of type string or PublishableMessage. That gives you the flexibility to pass bulk messages or any single message of the allowed types.

Limitation on Publishing Messages

  • connection and channel_id can only be passed when publishing a message, in the third parameter.
  • You can only pass a connection name. That name should hold all the required values in the amqp.php file.
  • When publishing bulk messages (array of messages), it’ll take the first message’s exchange properties and use that for the whole message set.
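The limitations above can be sketched as follows, assuming a Laravel or Lumen application with the package installed and a connection named my-connection-name fully defined in config/amqp.php. The message bodies are placeholders.

```php
<?php

app('amqp')->publish(
    ['first message', 'second message'], // bulk: an array of string messages
    'message',                           // routing key shared by the whole set
    [
        // Only the name of a connection can be passed here; its details
        // must already be present in config/amqp.php.
        'connection' => 'my-connection-name',
        'exchange' => [
            'name' => 'direct.exchange',
            'type' => 'direct',
            'declare' => true,
        ],
    ]
);
```

Because plain strings carry no exchange of their own, the exchange passed in the third parameter applies to every message in the array.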

Consume Message

To consume a message, we primarily need an exchange and a queue, along with the following configuration. It ships with the package by default and lives in the amqp.php file. It has been slightly changed here for the example.

<?php
return [
    'default' => 'my-connection-name',

    'connections' => [
        'my-connection-name' => [
            'connection' => [
                'host' => 'localhost',
                'port' => 5672,
                'username' => 'username',
                'password' => 'password',
                'vhost' => '/',
                'connect_options' => [],
                'ssl_options' => [],
                'ssl_protocol' => 'ssl',
            ],

            'channel_id' => null,

            'exchange' => [
                'name' => 'amq.topic',
                'type' => 'topic',
                'declare' => false,
                'passive' => false,
                'durable' => true,
                'auto_delete' => false,
                'internal' => false,
                'nowait' => false,
                'properties' => [],
            ],

            // The queue, consumer and qos sections of amqp.php are not reproduced here.
        ],
    ],
];

The package will use this configuration by default. Any value you pass when consuming messages with the following approaches will override the corresponding default.

To consume a message, we’ll use app('amqp'). But you can use any of the ways described earlier.

To consume a message we need at least an exchange and a queue. We can extend the functionality with the qos and consumer settings. There are two ways to accomplish this.

Approach one — Closure based

<?php

The above snippet mentions,

  • Consume messages in a closure/anonymous function.
  • On the connection my-connection-name.
  • The exchange name is direct.exchange and the type of that exchange is direct. Declare the exchange.
  • Declare a queue named direct.exchange.queue first, then bind it to that exchange with the bind-key message.
  • Quality of Service (QoS) is enabled when receiving messages, and at most 5 messages can be unacknowledged at a time.
  • When a message is received, the closure sends an acknowledgment to the RabbitMQ broker.
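A hedged sketch of what such a closure-based consumer could look like, assuming a Laravel or Lumen application with the package installed and a reachable broker. The consume signature and the top-level keys (connection, exchange, queue, qos) come from this article. The qos key names (enabled, qos_prefetch_count) and the getStream() and getDeliveryInfo()->acknowledge() calls are assumptions recalled from the package's documentation, so verify them against the version you install.

```php
<?php

app('amqp')->consume(function ($message) {
    // Assumed accessor for the message body.
    echo $message->getStream() . PHP_EOL;

    // Assumed acknowledgment call; with QoS enabled, acknowledging
    // (or rejecting) is the consumer's responsibility.
    $message->getDeliveryInfo()->acknowledge();
}, 'message', [                                   // 'message' is the bind-key
    'connection' => 'my-connection-name',
    'exchange' => [
        'name' => 'direct.exchange',
        'type' => 'direct',
        'declare' => true,
    ],
    'queue' => [
        'name' => 'direct.exchange.queue',
        'declare' => true,
    ],
    'qos' => [
        'enabled' => true,                        // assumed key name
        'qos_prefetch_count' => 5,                // assumed key name: max unacknowledged messages
    ],
]);
```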

Approach two — Class based

To replicate the exact same thing with the class-based consumer, you can do the following.

<?php

The above code depicts,

  • Create a CustomConsumableMessage which extends the ConsumableMessage class. This class should define a method handle, responsible for handling the incoming messages.
  • Set an exchange named direct.exchange, from which the messages will be routed to the queues and passed on to this consumer.
  • The queue direct.exchange.queue will be used to get the messages from the previously defined direct.exchange.
  • QoS is enabled, and at most 5 messages can be unacknowledged at a time.
  • Use the connection my-connection-name, which is defined in the amqp.php configuration file.
  • The CustomConsumableMessage::handle method acknowledges the received message through the delivery channel deliveryInfo.

You can update your configuration before you start to consume the message using the third parameter of the AmqpManager::consume method. Like the following.

<?php
app('amqp')->consume(function ($message) {}, 'message', [
    'connection' => 'conn-name', // should exist in amqp.php file
    'channel_id' => 100, // int value or decided by the package
    'exchange' => [], // exchange configuration like amqp.php file
    'queue' => [], // queue configuration like amqp.php file
    'consumer' => [], // consumer configuration like amqp.php file
    'qos' => [], // qos configuration like amqp.php file
]);

Limitations of Consuming Message

  • Whenever the PHP code reaches app('amqp')->consume(), it starts listening for incoming messages. Any code after that call will never run.
  • If you enable the QoS, it’s your responsibility to acknowledge or reject messages within the closure or handle method. By default, it doesn’t acknowledge or reject any message.
  • connection and channel_id can only be defined within the third parameter array.

Good to know

  • I mentioned that for publishing a message you need an exchange and a message. But in AmqpManager::publish, the second parameter is the routing key. It can be an empty string if you wish, but it must be passed.
  • I also mentioned that for consuming messages you need an exchange and a queue. But AmqpManager::consume takes the bind-key as the second parameter. It can be an empty string if you wish, but it must be passed.
  • You can use the class-based approach with Exchange, Queue, Consumer and ConsumableMessage or PublishableMessage. QoS, Connection and Channel id cannot be used in a class-based way. They can only be passed in the array while consuming/publishing a message.
  • The queue will be declared if it doesn’t have a name, or if declaring is forced. If the no_wait property of the queue is set to true but the queue doesn’t have a name, no_wait will be set to false.
  • channel_id is set to null by default. But if you set a channel_id, the package will use that channel.

All these snippets are available in this repository. The branch name is: amqp-tutorial. Check out that branch and play with it.

Happy coding ❤
