Scaling Queues — behind the scenes

Srikant Viswanath
Architectural Tapas
Jun 7, 2020 · 6 min read

Queues, as we know, are FIFO in nature and are usually used as a means to achieve asynchronous operations.

One end of the spectrum of asynchronous-processing consists of batch processing where the amount of input data is bounded, e.g. daily data reconciliation at an enterprise level. On the other end is stream processing — where data is processed in “real-time”, e.g., request to email a client on a successful order placement.

In this article, we shall peek at the fundamentals of the latter: stream processing.

Using Database as a Queue

We might have come across systems that use a database (or a file) for orchestrating asynchronous message queueing: a producer writes every event it generates to the database, and one or more consumers poll periodically for new events.

Database in this context means a traditional relational (or NoSQL) datastore that was likely not designed for such a polling use case at scale; when we want low latency, frequent polling starts to become expensive.
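To make the polling pattern concrete, here is a minimal sketch using SQLite (all table and function names are illustrative, not from any real system). The consumer tracks the last event id it has seen and re-queries on every poll, whether or not anything new has arrived, which is exactly the cost that grows as we poll more frequently:

```python
import sqlite3

# Toy database-as-a-queue: the producer INSERTs events, the consumer
# polls for rows with an id greater than the last one it has seen.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE events (id INTEGER PRIMARY KEY AUTOINCREMENT, payload TEXT)"
)

def produce(payload):
    conn.execute("INSERT INTO events (payload) VALUES (?)", (payload,))
    conn.commit()

def poll(last_seen_id):
    # Every poll runs a full query -- cheap here, but at scale this
    # query executes constantly even when there is nothing new to read.
    return conn.execute(
        "SELECT id, payload FROM events WHERE id > ? ORDER BY id",
        (last_seen_id,),
    ).fetchall()

produce("order-placed")
produce("email-sent")
new = poll(0)
print([payload for _, payload in new])  # ['order-placed', 'email-sent']
```

A poll after consuming both rows (`poll(2)`) comes back empty, yet still costs a query: that wasted work is what pushes people toward push-based brokers.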

I know what you’re thinking: have the database notify/push events out to consumers. Triggers have traditionally been used to achieve this, but they have serious scaling limitations for this use case.

Direct 1-to-1 communication

We could just have the producer publish directly to the consumer over, for example, a TCP connection. Of course, this is the bare-metal way of communicating, and most messaging tools build on top of it.

Viable options in this area are UDP multicast, ZeroMQ, and nanomsg.

A drawback here is that the application logic needs to be aware of, and account for, message loss. Even if packets can be retransmitted, this assumes that both producer and consumer are online.
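A bare-bones sketch of this direct style, with a producer and consumer talking over a raw TCP socket in one process (all names here are illustrative). There is no intermediary: if the consumer were not listening at send time, the connection would fail and the message would simply be lost unless the producer buffered and retried on its own:

```python
import socket
import threading

received = []

def consumer(server_sock):
    # The consumer must be online and listening, or delivery fails.
    conn, _ = server_sock.accept()
    received.append(conn.recv(1024).decode())
    conn.close()

server = socket.socket()
server.bind(("127.0.0.1", 0))   # OS-assigned free port
server.listen(1)
t = threading.Thread(target=consumer, args=(server,))
t.start()

# Producer connects straight to the consumer -- no broker in between.
producer = socket.create_connection(server.getsockname())
producer.sendall(b"order-placed")
producer.close()
t.join()
server.close()
print(received)  # ['order-placed']
```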

Enter message brokers

The problems encountered with direct communication can be solved by delegating the problem to another system — the message broker.

It runs as a server, with producers and consumers connecting to it as clients: producers publish messages to the broker, and consumers consume by reading from the broker.

Compared to databases

Traditional message brokers like TIBCO Enterprise Message Service, IBM MQ, Google Cloud Pub/Sub, RabbitMQ, ActiveMQ, and HornetQ, which implement standards like JMS and AMQP, have the following differences w.r.t. a database:

  • Databases persist data until explicitly asked to delete it, whereas message brokers delete a message (by committing an atomic transaction) as soon as a consumer has successfully consumed it.
  • Message brokers allow consumers to subscribe to a subset of messages matching some pattern, e.g., a topic routing key in RabbitMQ. Similarly, databases let users filter data via a secondary index.

Use case: Multiple consumers as workers

A very common use case is for a queue to hold units of work, i.e., asynchronous requests. To increase throughput, we can deploy multiple queue consumers that load-balance the work among themselves, i.e., each message is processed by only one worker.

So the job of load balancing must be done by the message broker, which arbitrarily assigns messages to consumers.

Think about this pattern when messages are expensive to process: add consumers to parallelize the work.
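The competing-consumers pattern can be sketched in-process with a shared queue and several worker threads (a stand-in for what a broker like RabbitMQ does across network connections; the worker names are made up). The key property is that each message is taken by exactly one worker:

```python
import collections
import queue
import threading

work = queue.Queue()
processed = collections.Counter()
lock = threading.Lock()

def worker(name):
    while True:
        msg = work.get()
        if msg is None:          # sentinel: shut this worker down
            break
        with lock:
            processed[name] += 1  # each message handled by exactly one worker
        work.task_done()

workers = [threading.Thread(target=worker, args=(f"w{i}",)) for i in range(3)]
for t in workers:
    t.start()
for i in range(30):
    work.put(f"job-{i}")
work.join()                      # block until every job has been processed once
for _ in workers:
    work.put(None)
for t in workers:
    t.join()
print(sum(processed.values()))   # 30
```

How the 30 jobs split across the three workers is arbitrary, mirroring the "broker arbitrarily assigns messages" behaviour above.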

Acknowledgements and redelivery

Consumers can crash at any time. To ensure that messages are not lost, brokers use acknowledgements as a way to safely mark a message for deletion: a consumer must explicitly notify the broker when it has finished processing a message.

If an acknowledgement is not received by the broker within a (usually configurable) timeout, the broker redelivers the message to another consumer.

A message can be acknowledged as processed if and only if the database transaction for processing the message was successfully committed.

This is implemented by atomically committing the message acknowledgement and the database writes in a single transaction, usually via two-phase commit (2PC).
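Here is a toy model of the ack-after-commit rule (the broker is just a dict, and real systems pair this with timeouts or 2PC rather than a try/except): the consumer acknowledges only after its database transaction commits, so a crash mid-processing leaves the message pending and it gets redelivered:

```python
import sqlite3

broker = {"m1": "charge-order"}   # message id -> payload, awaiting ack
db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE processed (msg_id TEXT PRIMARY KEY)")

def handle(msg_id, fail=False):
    try:
        with db:  # transaction: commits on success, rolls back on exception
            db.execute("INSERT INTO processed VALUES (?)", (msg_id,))
            if fail:
                raise RuntimeError("consumer crashed mid-processing")
        broker.pop(msg_id)        # ack only after the commit succeeded
    except RuntimeError:
        pass                      # no ack -> the broker keeps the message

handle("m1", fail=True)           # first attempt crashes: still pending
assert "m1" in broker
handle("m1")                      # redelivery succeeds and is acknowledged
print(broker)  # {}
```

Because the failed attempt's insert was rolled back, the redelivered message is processed exactly once from the database's point of view.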

No guarantee: order of message delivery

Given that the broker can redeliver an unacknowledged message to another consumer, a quick mental experiment shows that the order in which messages were published by the producer will not necessarily be the order in which they are delivered.

Consider the following scenario: the order in which the messages were produced is (msg1, msg2, msg3), but the order in which they were successfully delivered by the broker (and processed by consumers) is (msg1, msg3, msg2).
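The scenario above can be replayed deterministically in a few lines of toy code: msg2's first delivery fails (the consumer crashes), the broker re-queues it, and it is only processed after msg3:

```python
published = ["msg1", "msg2", "msg3"]
pending = list(published)   # the broker's queue
processed = []
attempt = 0

while pending:
    msg = pending.pop(0)
    attempt += 1
    if msg == "msg2" and attempt == 2:  # consumer crashes on msg2's first try
        pending.append(msg)             # broker re-queues it for redelivery
    else:
        processed.append(msg)

print(processed)  # ['msg1', 'msg3', 'msg2']
```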

Use case: Multiple consumers as fan-out

This is a more straightforward use case, where each consumer is independently interested in all of the messages.

This feature is provided by topic subscriptions in JMS and exchange bindings in AMQP.
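Fan-out reduces to giving every subscriber its own queue and copying each published message into all of them, roughly what an AMQP fanout exchange does behind the scenes (the subscriber names here are invented for illustration):

```python
import queue

subscribers = {"billing": queue.Queue(), "analytics": queue.Queue()}

def publish(msg):
    for q in subscribers.values():
        q.put(msg)   # every subscriber independently sees every message

publish("order-placed")
publish("order-shipped")

delivered = {
    name: [q.get() for _ in range(q.qsize())]
    for name, q in subscribers.items()
}
print(delivered)
# {'billing': ['order-placed', 'order-shipped'],
#  'analytics': ['order-placed', 'order-shipped']}
```

Contrast this with the worker pattern above: there, one shared queue splits messages across consumers; here, per-subscriber queues duplicate them.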

Logs for messaging and storage

Traditional message brokers are built with ephemeral message storage in mind, i.e., as soon as a message is acknowledged, it is deleted. The broker does persist messages on disk for fault-tolerance reasons, but they are deleted shortly after delivery; unlike a database, messages are not retained.

Message persistence is different than message retention

What if we want to attach/bind a new consumer that starts by reading messages from the past until it catches up, say for data-warehousing reasons? A traditional broker would politely ask you to “look elsewhere, that ain’t my job”.

Why can’t we have a hybrid combining the durable storage of databases with low-latency notification? This is the idea behind log-based message brokers like Apache Kafka, Amazon Kinesis Streams, AMPS, and Twitter’s DistributedLog.

A producer publishes a message by appending it to the end of a log on the broker. A consumer receives messages by reading the log sequentially. If it reaches the end of the log, it waits for a notification that a new message has been published.

What happens when multiple producers append messages to the same log, making it very bloated? We invoke the partitioning axis of scalability here, i.e.,

the log can be partitioned so that different partitions are then hosted on different machines (with replication backups for fault tolerance) — making each partition a separate log that can be read, written and scaled independently.

A topic is a group of partitions that carry messages of the same logical type.

Within each partition, the broker assigns a monotonically increasing sequence number, called the offset, to every message. There is no ordering guarantee across partitions.

Producers generally need not worry about which partition to append a message to; this is handled by the messaging layer.

For example, Kafka’s message payloads contain a key that is used as the partition key.
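These ideas can be sketched as a toy partitioned log (this is not the Kafka API; the key hashing, function names, and event names are all illustrative). Messages with the same key land in the same partition, each partition assigns its own offsets, and a consumer reads a partition sequentially from an offset:

```python
import zlib

NUM_PARTITIONS = 2
partitions = [[] for _ in range(NUM_PARTITIONS)]  # each partition is its own log

def append(key, payload):
    # Same key -> same partition, so per-key ordering is preserved.
    p = zlib.crc32(key.encode()) % NUM_PARTITIONS
    offset = len(partitions[p])   # monotonically increasing within the partition
    partitions[p].append((offset, payload))
    return p, offset

def read(partition, from_offset=0):
    # A consumer reads one partition sequentially from its current offset.
    return [payload for off, payload in partitions[partition] if off >= from_offset]

append("user-42", "u42-clicked")
append("user-42", "u42-purchased")
append("user-7", "u7-clicked")

p42 = zlib.crc32(b"user-42") % NUM_PARTITIONS
print(read(p42))
```

Reading `user-42`'s partition always yields its events in append order; whether `user-7`'s event shares that partition depends on the hash, which is exactly why there is no ordering guarantee across partitions.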

Since this solves many of the problems of traditional message brokers, an entirely new term has been coined for this hybrid approach of message retention plus low-latency delivery: streaming.

In situations where messages are expensive to process, you want to parallelize processing message by message, and message order is not important, JMS/AMQP message brokers are preferable.

On the other hand, in situations with high message throughput, where each message is fast to process and message ordering is important, the log-based approach should be preferred.

References

Designing Data-Intensive Applications, Martin Kleppmann
