Initial thoughts about Reliable Embedded Message Broker for PHP

Building message-driven applications is essential when you implement Domain-Driven Design and/or create a microservice as part of a bigger distributed system. There are many tools for such applications, but few offer transactional messaging, message scheduling, and advanced concurrency control over message consumption, especially in PHP.

Our team currently uses an in-house solution for this purpose. In our opinion, the lack of such a solution in the PHP ecosystem keeps the ideas of Domain-Driven Design and microservices from spreading widely in the community. Soon, we plan to rethink and summarize our experience in an open-source library: REMBO (yeah, it’s an empty repo for now, but you can “Watch” it for updates :) ). In this post, I describe our initial thoughts about this library. So, let’s start.

Features

Strategic

Embedded: the library is used as a dependency inside an application and can work with the application’s storage. An external message broker is not required.

Reliable: messages can be produced within the application’s transaction, together with changes to the model’s data. This is possible because the same storage connection can be used for both activities.

Universal: both persistent and in-memory, and scalable vertically and horizontally (thanks to location transparency).

Composable: de-facto standards are used wherever possible.

Interfaces are separated from implementations, so replacing concrete implementations/dependencies is not a problem. Non-abstract dependencies such as amphp, symfony console, etc. are used via standalone bridges.
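The “Reliable” feature boils down to writing the model change and the message through one connection inside one transaction. The sketch below is a minimal illustration under assumptions: `InMemoryConnection` and `InboxRepository` are hypothetical stand-ins (REMBO’s repository is still empty), and with real PDO the same shape would use `PDO::beginTransaction()`, `PDO::commit()` and `PDO::rollBack()`.

```php
<?php
// Hypothetical stand-in for the application's storage connection.
// Writes are buffered and only become visible on commit, like a DB transaction.
final class InMemoryConnection
{
    /** @var array<string, array[]> committed rows per table */
    public array $tables = [];
    /** @var array[] pending [table, row] writes of the open transaction */
    private array $pending = [];

    public function insert(string $table, array $row): void
    {
        $this->pending[] = [$table, $row];
    }

    public function transactional(callable $work): void
    {
        $this->pending = []; // begin
        try {
            $work($this);
            foreach ($this->pending as [$table, $row]) { // commit
                $this->tables[$table][] = $row;
            }
        } finally {
            $this->pending = []; // on an exception this acts as the rollback
        }
    }
}

// Hypothetical Inbox Repository that writes through the SAME connection.
final class InboxRepository
{
    private InMemoryConnection $connection;

    public function __construct(InMemoryConnection $connection)
    {
        $this->connection = $connection;
    }

    public function add(string $message, string $consumer): void
    {
        $this->connection->insert('inbox', ['message' => $message, 'consumer' => $consumer]);
    }
}

$connection = new InMemoryConnection();
$inbox = new InboxRepository($connection);

// Success: the order row and its message are committed together.
$connection->transactional(function (InMemoryConnection $c) use ($inbox) {
    $c->insert('orders', ['id' => 1]);
    $inbox->add('OrderPlaced:1', 'billing');
});

// Failure: neither the order row nor its message is persisted.
try {
    $connection->transactional(function (InMemoryConnection $c) use ($inbox) {
        $c->insert('orders', ['id' => 2]);
        $inbox->add('OrderPlaced:2', 'billing');
        throw new \RuntimeException('domain invariant violated');
    });
} catch (\RuntimeException $e) {
    // Expected: the whole transaction was discarded.
}
```

Because both writes share one transaction, a failed domain operation leaves no orphaned message behind and a committed one always has its message stored, which is what removes the need for a two-phase commit.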

Tactical

Scheduling of message producing into the future. The library supports both one-time and periodic scheduling.

Concurrency control via a powerful system for limiting simultaneous consumption of messages.

Use cases

  • Scheduling background activity
  • Moving heavy activity from HTTP controllers to the background
  • Complying with external services’ limits on simultaneous requests
  • Reliably producing domain events to an external message broker without two-phase commit
  • Consuming one message at a time per aggregate root
  • Cross-process messaging/execution
  • Distributing load over time and computing resources

Design

Top level architecture

Further details about different aspects of REMBO are in the following sections.

Producing direct messages

$system->produce($message, $consumer);

The application produces the message to the consumer. Behind the scenes, the pair of the message and the consumer is stored in the Inbox Repository. The Inbox Repository can be backed by:

  • application’s storage
  • separated storage
  • in-memory storage

The Inbox continuously pulls pairs of messages and consumers from the Inbox Repository and builds an in-memory queue of messages that are ready to be consumed.

An arbitrary number of Workers continuously pull new messages from the Inbox and pass them to the corresponding consumers.
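To make this flow concrete, here is a minimal in-process sketch, assuming a single Inbox and a single synchronous Worker. All names (`Envelope`, `InMemoryInboxRepository`, `Inbox`, `runWorker`) are hypothetical; they only illustrate storage holding the pairs while the ready queue lives in memory.

```php
<?php
// Hypothetical pair of a message and the name of its consumer, plus a storage id.
final class Envelope
{
    public int $id;
    public string $message;
    public string $consumer;

    public function __construct(int $id, string $message, string $consumer)
    {
        $this->id = $id;
        $this->message = $message;
        $this->consumer = $consumer;
    }
}

// In-memory stand-in for the Inbox Repository; a PDO-backed one would use a table.
final class InMemoryInboxRepository
{
    /** @var array<int, Envelope> */
    private array $rows = [];
    private int $nextId = 1;

    public function store(string $message, string $consumer): void
    {
        $id = $this->nextId++;
        $this->rows[$id] = new Envelope($id, $message, $consumer);
    }

    /** @return Envelope[] stored pairs with an id greater than $afterId, oldest first */
    public function fetchAfter(int $afterId): array
    {
        return array_values(array_filter($this->rows, fn (Envelope $e) => $e->id > $afterId));
    }

    public function delete(int $id): void
    {
        unset($this->rows[$id]);
    }
}

// The Inbox keeps the actual queue in memory and only reads new rows from storage.
final class Inbox
{
    private InMemoryInboxRepository $repository;
    private \SplQueue $ready;
    private int $lastSeenId = 0;

    public function __construct(InMemoryInboxRepository $repository)
    {
        $this->repository = $repository;
        $this->ready = new \SplQueue();
    }

    public function pull(): void
    {
        foreach ($this->repository->fetchAfter($this->lastSeenId) as $envelope) {
            $this->ready->enqueue($envelope);
            $this->lastSeenId = $envelope->id;
        }
    }

    public function next(): ?Envelope
    {
        return $this->ready->isEmpty() ? null : $this->ready->dequeue();
    }

    // Deleting only on acknowledgment means unacknowledged pairs stay in storage.
    public function ack(Envelope $envelope): void
    {
        $this->repository->delete($envelope->id);
    }
}

// A Worker takes the next ready pair, calls its consumer, then acknowledges it.
function runWorker(Inbox $inbox, array $consumers): int
{
    $processed = 0;
    while (($envelope = $inbox->next()) !== null) {
        ($consumers[$envelope->consumer])($envelope->message);
        $inbox->ack($envelope);
        $processed++;
    }
    return $processed;
}

$repository = new InMemoryInboxRepository();
$repository->store('hello', 'foo');
$repository->store('world', 'foo');

$received = [];
$consumers = ['foo' => function (string $message) use (&$received) { $received[] = $message; }];

$inbox = new Inbox($repository);
$inbox->pull();
$count = runWorker($inbox, $consumers);
```

If a Worker died before calling `ack()`, its pair would remain in the repository, and a restarted Inbox (starting again from id 0) would pull it once more.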

Producing broadcast messages

Sometimes it makes sense to produce broadcast messages instead of direct messages, e.g. messages that are events. When no consumer is specified, the Router consumer is used as the default consumer.

$system->produce($message);

The Router consumer uses previously registered consumers and produces direct messages to those consumers that handle the given message type.
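A Router consumer can be sketched as a map from message type to subscribed consumer names. This is an illustration only: the `Router` class, its `register()`/`route()` methods, and the `$produceDirect` callback (standing in for `$system->produce($message, $consumer)`) are assumptions, and matching here is by exact class name.

```php
<?php
final class OrderPlaced {}
final class UserRegistered {}

// Hypothetical Router consumer: fans a broadcast message out as direct messages.
final class Router
{
    /** @var array<string, string[]> message class => subscribed consumer names */
    private array $subscriptions = [];
    /** @var callable(object, string): void produces one direct message */
    private $produceDirect;

    public function __construct(callable $produceDirect)
    {
        $this->produceDirect = $produceDirect;
    }

    public function register(string $consumerName, string ...$messageClasses): void
    {
        foreach ($messageClasses as $class) {
            $this->subscriptions[$class][] = $consumerName;
        }
    }

    // Returns how many direct messages were produced.
    public function route(object $message): int
    {
        $consumers = $this->subscriptions[get_class($message)] ?? [];
        foreach ($consumers as $consumerName) {
            ($this->produceDirect)($message, $consumerName);
        }
        return count($consumers);
    }
}

$produced = [];
$router = new Router(function (object $message, string $consumer) use (&$produced) {
    $produced[] = [get_class($message), $consumer];
});
$router->register('billing', OrderPlaced::class);
$router->register('mailer', OrderPlaced::class, UserRegistered::class);

$fanOut = $router->route(new OrderPlaced());
```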

Consuming messages

class FooConsumer
{
    const NAME = 'foo';

    public function onBarMessage(BarMessage $message): Handler
    {
        return Handler::define(
            function () use ($message) {
                ...
            }
        );
    }

    ...
}

A concrete consumer must have a NAME constant and one or more handler methods that receive a Message and return its Handler. In the standard case, a Handler is just a wrapper around a Closure that consumes the Message.

Both messages and consumers must be discovered by the System during application initialization, because the Worker process must be able to construct them.

$system = new System();
$system->discoverMessage(BarMessage::class);
$foo = new FooConsumer();
$system->discoverConsumer($foo);

Once the Worker process has constructed the message and the consumer, it chooses the particular handler method by the type hint of the method’s argument.
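Choosing a handler method by type hint can be done with PHP’s Reflection API. The sketch assumes a simplified `Handler` that only wraps a `Closure` and a hypothetical `findHandler()` helper; it is one possible approach, not necessarily how REMBO will resolve handlers.

```php
<?php
final class BarMessage {}
final class BazMessage {}

// Simplified stand-in for Handler: it only wraps a Closure.
final class Handler
{
    private \Closure $closure;

    private function __construct(\Closure $closure) { $this->closure = $closure; }

    public static function define(\Closure $closure): self { return new self($closure); }

    public function __invoke() { return ($this->closure)(); }
}

final class FooConsumer
{
    const NAME = 'foo';

    public function onBarMessage(BarMessage $message): Handler
    {
        return Handler::define(function () { return 'bar handled'; });
    }

    public function onBazMessage(BazMessage $message): Handler
    {
        return Handler::define(function () { return 'baz handled'; });
    }
}

// Hypothetical helper: finds the public one-argument method whose parameter
// type matches the message's class, and calls it to obtain the Handler.
function findHandler(object $consumer, object $message): Handler
{
    $reflection = new \ReflectionObject($consumer);
    foreach ($reflection->getMethods(\ReflectionMethod::IS_PUBLIC) as $method) {
        $params = $method->getParameters();
        if (count($params) !== 1) {
            continue;
        }
        $type = $params[0]->getType();
        if ($type instanceof \ReflectionNamedType && !$type->isBuiltin()
            && is_a($message, $type->getName())) {
            return $method->invoke($consumer, $message);
        }
    }
    throw new \LogicException(get_class($consumer) . ' cannot handle ' . get_class($message));
}

$consumer = new FooConsumer();
$barHandler = findHandler($consumer, new BarMessage());
$bazHandler = findHandler($consumer, new BazMessage());
```

A real implementation would likely cache the class-to-method map per consumer class instead of reflecting on every message.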

Some consumers need to wrap entities:

class ConcreteEntityConsumer implements Consumer
{
    const NAME = 'concrete_entity';

    /**
     * @var Entity
     */
    private $entity;

    /**
     * ConcreteEntityConsumer constructor.
     * @param Entity $entity
     */
    public function __construct(Entity $entity)
    {
        $this->entity = $entity;
    }

    public function onBarMessage(BarMessage $message): Handler
    {
        return Handler::define(
            function () use ($message) {
                $this->entity->domainMethod($message->baz());
            }
        );
    }

    public function backup(): array
    {
        return ['entityId' => $this->entity->id()];
    }

    ...
}

Such consumers can’t be discovered by the System during application initialization, so their factories must be discovered instead.

$system = new System();
$concreteEntityConsumerFactory = new ConcreteEntityConsumerFactory();
$system->discoverConsumerFactory($concreteEntityConsumerFactory);

A consumer factory can use the consumer’s dump (the result of calling the backup method) to fetch an entity from the entity repository, so the Worker can construct the target consumer via this factory.
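A consumer factory round trip might look like this, assuming the stored pair keeps only the consumer’s `NAME` and its JSON-encoded dump. `EntityRepository`, `restore()` and the JSON storage format are hypothetical choices for illustration.

```php
<?php
final class Entity
{
    private string $id;
    public function __construct(string $id) { $this->id = $id; }
    public function id(): string { return $this->id; }
}

// Hypothetical entity repository used by the factory.
final class EntityRepository
{
    /** @var array<string, Entity> */
    private array $entities = [];

    public function save(Entity $entity): void
    {
        $this->entities[$entity->id()] = $entity;
    }

    public function get(string $id): Entity
    {
        if (!isset($this->entities[$id])) {
            throw new \OutOfBoundsException("No entity with id $id");
        }
        return $this->entities[$id];
    }
}

final class ConcreteEntityConsumer
{
    const NAME = 'concrete_entity';
    private Entity $entity;

    public function __construct(Entity $entity) { $this->entity = $entity; }
    public function entity(): Entity { return $this->entity; }

    // The dump holds only what is needed to find the entity again.
    public function backup(): array
    {
        return ['entityId' => $this->entity->id()];
    }
}

// Hypothetical factory: rebuilds a consumer from its dump (the output of backup()).
final class ConcreteEntityConsumerFactory
{
    private EntityRepository $repository;

    public function __construct(EntityRepository $repository) { $this->repository = $repository; }

    public function restore(array $dump): ConcreteEntityConsumer
    {
        return new ConcreteEntityConsumer($this->repository->get($dump['entityId']));
    }
}

$repository = new EntityRepository();
$repository->save(new Entity('42'));

// Producer side: only the consumer's name and its dump are stored with the message.
$consumer = new ConcreteEntityConsumer($repository->get('42'));
$stored = ['consumer' => ConcreteEntityConsumer::NAME, 'dump' => json_encode($consumer->backup())];

// Worker side: pick the factory by consumer name and restore the consumer.
$factories = [ConcreteEntityConsumer::NAME => new ConcreteEntityConsumerFactory($repository)];
$restored = $factories[$stored['consumer']]->restore(json_decode($stored['dump'], true));
```

Storing a small dump instead of a serialized entity means the Worker always acts on the entity’s current state from the repository.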

Limitation of simultaneous consuming of messages

The Inbox process can control which messages are consumed simultaneously.

Consuming messages sequentially within one consumer is a common need. For this purpose, the consumer must implement the SequentialConsumer interface.

interface SequentialConsumer
{
    public function getSequentialKey(): string;
}

A Worker doesn’t receive pairs of messages and consumers associated with a particular sequential key while another pair with that key is being consumed. For example, the sequential key for a simple consumer is its unique name.

class FooConsumer implements SequentialConsumer
{
    const NAME = 'foo';
    public function getSequentialKey(): string
    {
        return self::NAME;
    }
    ...
}

Consumers that wrap entities usually have a more complex sequential key:

class ConcreteEntityConsumer implements Consumer, SequentialConsumer
{
    const NAME = 'concrete_entity';

    /**
     * @var Entity
     */
    private $entity;

    /**
     * ConcreteEntityConsumer constructor.
     * @param Entity $entity
     */
    public function __construct(Entity $entity)
    {
        $this->entity = $entity;
    }

    public function getSequentialKey(): string
    {
        // One sequence per entity: messages for different entities may run in parallel.
        return self::NAME . '|' . $this->entity->id();
    }

    ...
}
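The Inbox-side rule for sequential keys can be sketched as a queue that skips pairs whose key is currently busy. `SequentialGate` and its `take()`/`release()` methods are hypothetical names; in this sketch `release()` is meant to be called when a Worker acknowledges the pair.

```php
<?php
// Hypothetical gate that hands out waiting pairs, skipping any whose
// sequential key is currently being consumed.
final class SequentialGate
{
    /** @var array<int, array{key: string, message: string}> waiting pairs, FIFO */
    private array $waiting = [];
    /** @var array<string, true> sequential keys currently being consumed */
    private array $busy = [];

    public function push(string $sequentialKey, string $message): void
    {
        $this->waiting[] = ['key' => $sequentialKey, 'message' => $message];
    }

    // Returns the oldest waiting pair whose key is free and marks that key busy.
    public function take(): ?array
    {
        foreach ($this->waiting as $i => $pair) {
            if (!isset($this->busy[$pair['key']])) {
                unset($this->waiting[$i]);
                $this->busy[$pair['key']] = true;
                return $pair;
            }
        }
        return null;
    }

    // Frees the key once the Worker has acknowledged the pair.
    public function release(array $pair): void
    {
        unset($this->busy[$pair['key']]);
    }
}

$gate = new SequentialGate();
$gate->push('concrete_entity|1', 'm1');
$gate->push('concrete_entity|1', 'm2');
$gate->push('concrete_entity|2', 'm3');

$first = $gate->take();   // m1, entity 1 becomes busy
$second = $gate->take();  // m3, because m2 waits while entity 1 is busy
$third = $gate->take();   // null: only m2 is left and its key is still busy
$gate->release($first);
$fourth = $gate->take();  // m2, now that entity 1 is free again
```

Skipping rather than blocking lets pairs for other entities proceed, which is also why the consumption order can differ from the production order.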

Sometimes particular handlers need additional limitations. A handler can limit simultaneous consumption of messages by a concurrency key with an integer value. This value caps the number of pairs of messages and consumers with this concurrency key that can be consumed simultaneously.

class FooConsumer
{
    const NAME = 'foo';
    public function onBarMessage(BarMessage $message): Handler
    {
        return Handler::define(
            function () use ($message) {
                ...
            }
        )->withConcurrencyLimitation(new ConcurrencyLimitation('some', 5));
    }
    ...
}

A handler can specify more than one concurrency key, because withConcurrencyLimitation accepts a variadic argument.
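Concurrency keys behave like counting semaphores. Below is a minimal sketch, assuming `ConcurrencyLimitation` just holds a key and a limit (matching the constructor call above) and that a hypothetical `ConcurrencyLimiter` in the Inbox acquires all of a handler’s keys atomically or none of them.

```php
<?php
// Assumed shape of ConcurrencyLimitation: a key plus the maximum number of
// handlers holding that key that may run at the same time.
final class ConcurrencyLimitation
{
    public string $key;
    public int $limit;

    public function __construct(string $key, int $limit)
    {
        $this->key = $key;
        $this->limit = $limit;
    }
}

// Hypothetical Inbox-side bookkeeping: one counter per concurrency key.
final class ConcurrencyLimiter
{
    /** @var array<string, int> running handlers per key */
    private array $running = [];

    // Acquires every limitation of a handler, or none of them.
    // (Assumes each key appears at most once per call.)
    public function tryAcquire(ConcurrencyLimitation ...$limitations): bool
    {
        foreach ($limitations as $limitation) {
            if (($this->running[$limitation->key] ?? 0) >= $limitation->limit) {
                return false;
            }
        }
        foreach ($limitations as $limitation) {
            $this->running[$limitation->key] = ($this->running[$limitation->key] ?? 0) + 1;
        }
        return true;
    }

    // Called when the Worker acknowledges the message.
    public function release(ConcurrencyLimitation ...$limitations): void
    {
        foreach ($limitations as $limitation) {
            $this->running[$limitation->key]--;
        }
    }
}

$limiter = new ConcurrencyLimiter();
$api = new ConcurrencyLimitation('external_api', 2);
$db = new ConcurrencyLimitation('heavy_db', 1);

$a = $limiter->tryAcquire($api);       // true: external_api 1/2
$b = $limiter->tryAcquire($api, $db);  // true: external_api 2/2, heavy_db 1/1
$c = $limiter->tryAcquire($api);       // false: external_api is full
$d = $limiter->tryAcquire($db);        // false: heavy_db is full
$limiter->release($api, $db);
$e = $limiter->tryAcquire($db);        // true: heavy_db was released
```

Acquiring all keys or none avoids a handler holding one slot while waiting for another, which could otherwise starve other handlers.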

Scheduled producing of messages

Instead of producing messages immediately, it is possible to schedule them for the future. Messages can also be scheduled for periodic producing. The Scheduler continuously pulls messages whose scheduled time has come from the Schedule Repository and produces them in the usual way.

$system->produceDelayed(
    Once::after(5)->hours(),
    new BarMessage(),
    $consumer
);
$system->produceDelayed(
    Once::on(new \DateTimeImmutable('2020-01-01')),
    new BarMessage()
);
$system->produceRepeatable(
    Repeat::every()->days()->since(new \DateTimeImmutable('16:35')),
    new BarMessage(),
    $consumer
);
$system->produceRepeatable(
    Repeat::every()->hours()->since(new \DateTimeImmutable('12:00')),
    new BarMessage()
);

Repeatable messages can be canceled:

$system->cancelRepeatable(
    Repeat::every()->days()->since(new \DateTimeImmutable('16:35')),
    new BarMessage(),
    $consumer
);
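One way a Scheduler tick could handle repeatable messages is sketched below, assuming each entry stores its next run time and a `DateInterval`. `RepeatableEntry` and `schedulerTick()` are hypothetical; the fluent `Repeat::every()` builder from the examples above would presumably produce something equivalent. Times use UTC explicitly to avoid server-timezone surprises.

```php
<?php
// Hypothetical schedule entry: produce $message every $interval, starting at $since.
final class RepeatableEntry
{
    public \DateTimeImmutable $nextRun;
    public \DateInterval $interval;
    public string $message;

    public function __construct(\DateTimeImmutable $since, \DateInterval $interval, string $message)
    {
        $this->nextRun = $since;
        $this->interval = $interval;
        $this->message = $message;
    }
}

// One Scheduler tick: produce every due entry and move it to its next future run.
function schedulerTick(array $entries, \DateTimeImmutable $now, callable $produce): void
{
    foreach ($entries as $entry) {
        if ($entry->nextRun <= $now) {
            $produce($entry->message);
            // Skip missed runs so one tick produces each message at most once.
            while ($entry->nextRun <= $now) {
                $entry->nextRun = $entry->nextRun->add($entry->interval);
            }
        }
    }
}

$utc = new \DateTimeZone('UTC');
$daily = new RepeatableEntry(new \DateTimeImmutable('2020-01-01 16:35', $utc), new \DateInterval('P1D'), 'BarMessage');
$hourly = new RepeatableEntry(new \DateTimeImmutable('2020-01-01 12:00', $utc), new \DateInterval('PT1H'), 'BazMessage');

$produced = [];
$produce = function (string $message) use (&$produced) { $produced[] = $message; };

// 13:30: only the hourly entry is due.
schedulerTick([$daily, $hourly], new \DateTimeImmutable('2020-01-01 13:30', $utc), $produce);
// 16:40: both are due; the hourly entry produces once despite missing 14:00 to 16:00.
schedulerTick([$daily, $hourly], new \DateTimeImmutable('2020-01-01 16:40', $utc), $produce);
```

In this model, canceling a repeatable message would amount to deleting the matching entry from the Schedule Repository. Skipping missed runs is a design choice; another policy could produce one message per missed interval instead.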

Integration with external message brokers

Some produced messages can be configured to be proxied to an external broker. In that case, an implicit consumer is used for this purpose.

Consuming messages from an external broker can be implemented in pull or push mode. In pull mode, an External Adapter continuously pulls new messages from the external broker and produces them in the usual way. In push mode, the application should implement an endpoint that the external broker calls. Such an endpoint should produce the received messages to the System.
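Both consumption modes can be sketched with a hypothetical `ExternalBroker` interface. Nothing here is a real Pub/Sub or RabbitMQ API: `FakeBroker` is a test double, and the `$produce` callback stands in for producing into the System.

```php
<?php
// Hypothetical minimal view of an external broker client.
interface ExternalBroker
{
    /** @return string[] up to $max raw payloads, possibly none */
    public function pull(int $max): array;

    public function ack(string $payload): void;
}

// Test double standing in for a real subscription.
final class FakeBroker implements ExternalBroker
{
    public array $queue;
    public array $acked = [];

    public function __construct(array $queue) { $this->queue = $queue; }

    public function pull(int $max): array { return array_splice($this->queue, 0, $max); }

    public function ack(string $payload): void { $this->acked[] = $payload; }
}

// Pull mode: the External Adapter polls the broker and produces each message locally.
final class ExternalAdapter
{
    private ExternalBroker $broker;
    /** @var callable(string): void stands in for producing into the System */
    private $produce;

    public function __construct(ExternalBroker $broker, callable $produce)
    {
        $this->broker = $broker;
        $this->produce = $produce;
    }

    // One polling iteration; a real adapter would loop (and sleep) continuously.
    public function pollOnce(int $max = 10): int
    {
        $payloads = $this->broker->pull($max);
        foreach ($payloads as $payload) {
            ($this->produce)($payload);   // persist locally first...
            $this->broker->ack($payload); // ...then acknowledge to the broker
        }
        return count($payloads);
    }
}

// Push mode: an endpoint the broker calls; it only produces the received body.
function handlePush(string $requestBody, callable $produce): int
{
    $produce($requestBody);
    return 204; // a 2xx status tells the broker the message was accepted
}

$local = [];
$produce = function (string $payload) use (&$local) { $local[] = $payload; };

$broker = new FakeBroker(['a', 'b', 'c']);
$adapter = new ExternalAdapter($broker, $produce);
$n1 = $adapter->pollOnce(2);
$n2 = $adapter->pollOnce(2);
$status = handlePush('d', $produce);
```

Producing locally before acknowledging to the external broker gives at-least-once semantics on this boundary too: a crash between the two steps leads to a redelivery rather than a lost message.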

Repositories

  • rembo — main logic and common interfaces
  • amphp-bridge — usage of amphp
  • symfony-console-bridge — usage of symfony console
  • pdo-storage — implementation of PDO storage
  • google-pub-sub-external-broker — adapter for Google Cloud Pub/Sub as the external broker
  • etc

Limitations

The order of consumed messages can differ from the order of produced messages when concurrency control is used. Use consumers that do not depend on message order.

Delivery of messages to consumers is guaranteed “at least once” rather than “exactly once”: the Inbox may not receive an acknowledgment from a Worker after a message has been successfully consumed, and in that case the message can be consumed again. Use idempotent consumers.
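A minimal idempotent consumer, assuming every message carries a unique id (the `id` property below is hypothetical): the consumer records processed ids and ignores redeliveries. For brevity it applies the message directly instead of returning a `Handler`.

```php
<?php
final class PaymentCaptured
{
    public string $id;   // hypothetical unique message id
    public int $amount;

    public function __construct(string $id, int $amount)
    {
        $this->id = $id;
        $this->amount = $amount;
    }
}

// Idempotent consumer: remembers processed message ids and ignores duplicates.
final class BalanceConsumer
{
    public int $balance = 0;
    /** @var array<string, true> a real consumer would persist this with the balance, in one transaction */
    private array $processed = [];

    public function onPaymentCaptured(PaymentCaptured $message): void
    {
        if (isset($this->processed[$message->id])) {
            return; // duplicate delivery: already applied
        }
        $this->balance += $message->amount;
        $this->processed[$message->id] = true;
    }
}

$consumer = new BalanceConsumer();
$message = new PaymentCaptured('msg-1', 100);
$consumer->onPaymentCaptured($message);
// Simulated redelivery: the acknowledgment was lost, so the same message arrives again.
$consumer->onPaymentCaptured($message);
$consumer->onPaymentCaptured(new PaymentCaptured('msg-2', 50));
```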

Be ready for distributed systems ;)

FAQ

Q: An application’s storage is usually not a queue. How do you deal with, for example, MySQL, which is not a perfect fit for queue storage?

A: The actual queue lives in the memory of the Inbox process in an appropriate data structure, so the application’s storage is used only for persistence in case the process crashes.

Q: This library looks somewhat like an implementation of the Actor Model or a port of Akka, doesn’t it?

A: This library uses some ideas and approaches from the Actor Model and Akka, but the underlying technique is different due to PHP specifics, so it would not be accurate to describe it that way.

Q: Is REMBO trying to replace external message brokers such as RabbitMQ, Redis, Google Cloud Pub/Sub, etc.?

A: No, it complements them, whatever purpose you use them for.

Q: Can REMBO be used to integrate several microservices?

A: No, use an external message broker for that job. REMBO’s goal is to solve messaging problems inside a microservice and to be a reliable mediator between the microservice and an external message broker.

Conclusion

I hope this post clarifies the goals, features and overall design of the project. Are the described problems familiar to you? Could this library help your application? Do you have particular expectations about the mentioned features? We would appreciate your feedback!
