Distributed delay queues based on Dynomite
Netflix’s Content Platform Engineering runs a number of business processes which are driven by asynchronous orchestration of micro-services based tasks, and queues form an integral part of the orchestration layer amongst these services.
A few examples of these processes are:
- IMF based content ingest from our partners
- Process of setting up new titles within Netflix
- Content Ingest, encode and deployment to CDN
Traditionally, we have been using a Cassandra based queue recipe along with Zookeeper for distributed locks, since Cassandra is the de facto storage engine at Netflix. Using Cassandra for a queue-like data structure is a known anti-pattern. In addition, taking a global lock on the queue while polling limits concurrency on the consumer side, because the lock ensures that only one consumer can poll from the queue at a time. This can be mitigated somewhat by sharding the queue, but concurrency is still limited within each shard. As we started to build out a new orchestration engine, we looked at Dynomite for handling the task queues.
We wanted the following in the queue recipe:
- No external locks (e.g. Zookeeper locks)
- Highly concurrent
- At-least-once delivery semantics
- No strict FIFO
- Delayed queue (message is not taken out of the queue until some time in the future)
- Priorities within the shard
The queue recipe described here is used to build a message broker server that exposes various operations (push, poll, ack etc.) via REST endpoints and can potentially be exposed by other transports (e.g. gRPC). Today, we are open sourcing the queue recipe.
Using Dynomite & Redis for building queues
Dynomite is a generic dynamo implementation that can be used with many different key-value pair storage engines. Currently, it provides support for the Redis Serialization Protocol (RESP) and Memcached write protocol. We chose Dynomite for its performance, multi-datacenter replication and high availability. Moreover, Dynomite provides sharding, and pluggable data storage engines, allowing us to scale vertically or horizontally as our data needs increase.
We chose to build the queues using Redis as a storage engine for Dynomite.
- Redis's architecture lends itself nicely to a queuing design by providing the data structures required for building queues. Moreover, Redis's in-memory design provides superior performance (low latency).
- Dynomite, on top of Redis, provides high availability, peer-to-peer replication and required semantics around consistency (DC_SAFE_QUORUM) for building queues in a distributed cluster.
A queue is stored as a sorted set (ZADD, ZRANGE etc. operations) within Redis. Redis sorts the members in a sorted set using the provided score. When storing an element in the queue, the score is computed as a function of the message priority and timeout (for timed queues).
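As a rough illustration of how such a score could be built, the sketch below puts the delivery time in the integer part and the priority in the fractional part. The function name, the 0 to 99 priority range, and the division by 100 are assumptions made for this example, not details confirmed by the text above.

```python
def compute_score(now_ms: int, delay_ms: int, priority: int) -> float:
    """Return a sorted-set score: delivery time plus a fractional priority tie-breaker."""
    if not 0 <= priority <= 99:
        raise ValueError("priority must be in [0, 99] (assumed range)")
    # Integer part: earliest time (epoch ms) at which the message may be delivered.
    # Fractional part: priority, so lower values sort first within the same millisecond.
    return float(now_ms + delay_ms) + priority / 100.0


now = 1_700_000_000_000
urgent = compute_score(now, delay_ms=0, priority=0)
normal = compute_score(now, delay_ms=0, priority=50)
delayed = compute_score(now, delay_ms=5_000, priority=0)
```

With this encoding, a delay always dominates priority: `urgent` sorts before `normal`, and both sort before `delayed`.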
Push & Pop Using Redis Primitives
The following sequence describes the high level operations used to push/poll messages into the system. For each queue, three sets of Redis data structures are maintained:
- A sorted set containing queued elements by score.
- A hash that contains the message payload, with the message ID as key.
- A sorted set containing messages consumed by a client but yet to be acknowledged (the un-ack set).
Push:
- Calculate the score as a function of message timeout (delayed queue) and priority
- Add the message ID to the sorted set for the queue
- Add the message payload into the Redis hash, with the message ID as key.
Poll:
- Calculate max score as current time
- Get messages with score between 0 and max
- Add the message ID to the un-ack set and remove it from the sorted set for the queue.
- If the previous step succeeds, retrieve the message payload from the Redis hash based on ID
Ack:
- Remove from the un-ack set by ID
- Remove from the message payload hash
Messages that are not acknowledged by the client are pushed back to the queue (at-least once semantics).
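The push, poll and ack steps above can be sketched with an in-memory model in which plain dictionaries stand in for the Redis sorted sets and hash. The class name, method signatures, default un-ack timeout, and the "+1" used to cover the fractional priority part of the score are all assumptions for illustration; the comments name the Redis commands each step would roughly correspond to.

```python
import time
from typing import Optional, Tuple


class InMemoryDelayQueue:
    """Toy model of the recipe; each dict stands in for one Redis structure."""

    def __init__(self) -> None:
        self.queue = {}     # sorted set: message ID -> score
        self.unack = {}     # sorted set: message ID -> un-ack deadline (ms)
        self.payloads = {}  # hash: message ID -> payload

    def push(self, msg_id: str, payload: str, delay_ms: int = 0,
             priority: int = 0, now_ms: Optional[int] = None) -> None:
        now_ms = int(time.time() * 1000) if now_ms is None else now_ms
        score = now_ms + delay_ms + priority / 100.0
        self.queue[msg_id] = score       # ZADD on the queue
        self.payloads[msg_id] = payload  # HSET on the payload hash

    def poll(self, unack_timeout_ms: int = 60_000,
             now_ms: Optional[int] = None) -> Optional[Tuple[str, str]]:
        now_ms = int(time.time() * 1000) if now_ms is None else now_ms
        # ZRANGEBYSCORE from 0 to "now": only messages whose delivery time has passed.
        # The +1 lets the fractional priority part of a due score still qualify.
        due = sorted((s, m) for m, s in self.queue.items() if s < now_ms + 1)
        for _, msg_id in due:
            if msg_id in self.unack:     # add-if-absent failed, try the next ID
                continue
            self.unack[msg_id] = now_ms + unack_timeout_ms  # ZADD NX on un-ack set
            del self.queue[msg_id]                          # ZREM on the queue
            return msg_id, self.payloads[msg_id]            # HGET
        return None

    def ack(self, msg_id: str) -> bool:
        if self.unack.pop(msg_id, None) is None:  # ZREM on the un-ack set
            return False
        self.payloads.pop(msg_id, None)           # HDEL on the payload hash
        return True


q = InMemoryDelayQueue()
q.push("m1", "encode title", now_ms=1_000)
q.push("m2", "deploy to CDN", delay_ms=5_000, now_ms=1_000)
first = q.poll(now_ms=1_000)   # m1 is due; m2 is still delayed
second = q.poll(now_ms=1_000)  # nothing else is due yet
```

The model is single-threaded and keeps everything in process memory, so it only illustrates the order of operations, not the durability or replication behaviour of a real Dynomite cluster.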
Availability Zone / Rack Awareness
Our queue recipe was built on top of Dynomite’s Java client, Dyno. Dyno provides connection pooling for persistent connections, and can be configured to be topology aware (token aware). Moreover, Dyno provides application-specific local rack affinity when routing requests to Dynomite nodes (in AWS a rack is a zone, e.g. us-east-1a, us-east-1b etc.). A client in us-east-1a will connect to a Dynomite/Redis node in the same AZ (unless the node is not available, in which case the client will fail over). This property is exploited for sharding the queues by availability zone.
Queues are sharded based on the availability zone. When pushing an element to the queue, the shard is determined by round robin. This ensures that all the shards are eventually balanced. Each shard is a sorted set in Redis whose key is a combination of queueName and AVAILABILITY_ZONE.
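A minimal sketch of round-robin shard selection is shown below. The `ShardSelector` class, the `queueName.zone` key format, the queue name, and the third zone in the example are hypothetical choices for illustration; the text above only states that the key combines the queue name and the availability zone.

```python
import itertools
from typing import Iterable


class ShardSelector:
    """Cycle through availability zones to pick the shard for each push."""

    def __init__(self, queue_name: str, zones: Iterable[str]) -> None:
        self.queue_name = queue_name
        self._cycle = itertools.cycle(list(zones))

    def next_shard_key(self) -> str:
        zone = next(self._cycle)
        # Assumed key format: "<queueName>.<availabilityZone>".
        return f"{self.queue_name}.{zone}"


selector = ShardSelector("encode_tasks", ["us-east-1a", "us-east-1b", "us-east-1c"])
keys = [selector.next_shard_key() for _ in range(4)]
```

After three pushes every shard has received one message, and the fourth push wraps around to the first zone again.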
The message broker uses a Dynomite cluster with consistency level set to DC_SAFE_QUORUM. Reads and writes are propagated synchronously to quorum number of nodes in the local data center and asynchronously to the rest. The DC_SAFE_QUORUM configuration writes to the number of nodes that make up a quorum. A quorum is calculated, and then rounded down to a whole number. This consistency level ensures all the writes are acknowledged by majority quorum.
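The quorum arithmetic can be illustrated as follows, assuming the usual majority definition (half the replicas, rounded down, plus one). The function name is hypothetical, and the exact formula Dynomite uses internally is not stated in the text above.

```python
def quorum_size(replicas: int) -> int:
    """Smallest number of nodes that forms a majority of `replicas` (assumed formula)."""
    if replicas < 1:
        raise ValueError("replicas must be at least 1")
    # Integer division rounds down; adding one yields a strict majority.
    return replicas // 2 + 1


three_rack_quorum = quorum_size(3)
```

Under this assumption, a data center with three replicas of each token needs two acknowledgements before a write is considered successful.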
Avoiding Global Locks
- Each node (N1…Nn in the above diagram) has affinity to the availability zone and talks to the Redis servers in that zone.
- A Dynomite/Redis node serves only one request at a time. Dynomite can hold thousands of concurrent connections, but requests are processed by a single thread inside Redis. This ensures that when two concurrent calls are issued to poll an element from the queue, they are served sequentially by the Redis server, avoiding any local or distributed locks on the message broker side.
- In the event of a failover, the DC_SAFE_QUORUM write ensures that no two client connections are given the same message out of a queue, as the write to the UNACK collection will only succeed for a single node for a given element. If the same element is picked up by two broker nodes (in the event of a failover connection to Dynomite), only one will be able to add the message to the UNACK collection and the other will receive a failure. The failed node then moves on to peek another message from the queue to process.
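The "only one broker wins" behaviour can be sketched as an add-if-absent on the un-ack collection, similar in spirit to Redis's `ZADD` with the `NX` option. The `try_claim` helper and the dictionary standing in for the UNACK sorted set are assumptions for illustration; in Redis the check and the add happen inside one command, which the single-threaded server executes without interleaving.

```python
def try_claim(unack: dict, msg_id: str, deadline_ms: int) -> bool:
    """Add msg_id to the un-ack set only if it is absent; report whether it was added."""
    if msg_id in unack:
        return False  # another broker already claimed this message
    unack[msg_id] = deadline_ms
    return True


unack_set = {}
broker_a = try_claim(unack_set, "m1", deadline_ms=61_000)  # first claim succeeds
broker_b = try_claim(unack_set, "m1", deadline_ms=61_500)  # second claim is rejected
```

A broker whose claim is rejected would simply skip that ID and peek the next one, so no lock is needed on the broker side.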
Queue Maintenance Considerations
Rebalancing the queue shards is useful when queues are not balanced, or when a new availability zone is added or an existing one is removed permanently.
Handling Un-Ack’ed messages
A background process monitors for the messages in the UNACK collections that are not acknowledged by a client in a given time (configurable per queue). These messages are moved back into the queue.
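One pass of such a background process might look like the sketch below. The function name, the use of plain dictionaries in place of the Redis sorted sets, and the choice to re-queue with the current time as the score are assumptions for illustration.

```python
from typing import Dict, List


def requeue_expired(queue: Dict[str, float], unack: Dict[str, int],
                    now_ms: int) -> List[str]:
    """Move messages whose un-ack deadline has passed back into the queue."""
    expired = [m for m, deadline in unack.items() if deadline <= now_ms]
    for msg_id in expired:
        queue[msg_id] = float(now_ms)  # ZADD on the queue: deliverable immediately
        del unack[msg_id]              # ZREM on the un-ack set
    return expired


queue = {}
unack = {"m1": 1_000, "m2": 9_000}
moved = requeue_expired(queue, unack, now_ms=5_000)
```

The payload hash is left untouched here, so a re-queued message can be delivered again with its original payload, which is what gives the recipe its at-least-once behaviour.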
A modified version can be implemented, where the consumer can “subscribe” to a message type (message type being metadata associated with a message) and a message is delivered to all the interested consumers.
Ephemeral queues have messages with a specified TTL, which are only available to consumers until the TTL expires. Once expired, the messages are removed from the queue and are no longer visible to consumers. The recipe can be modified to add a TTL to messages, thereby creating an ephemeral queue. When adding elements to the Redis collections, they can be TTLed, and will be removed from the collection by Redis upon expiry.
Other messaging solutions considered
Kafka provides a robust messaging solution with at-least-once delivery semantics. Kafka lends itself well to message streaming use cases. However, Kafka makes it harder to implement the semantics around priority queues and time based queues (both are required for our primary use case). A case can be made for creating a large number of partitions in a queue to handle client usage, but then again, adding a message broker in the middle will complicate things further.
Amazon SQS is a viable alternative and, depending upon the use case, might be a good fit. However, SQS does not support priority queues, or time based queues beyond a 15-minute delay.
Disque is a project that aims to provide distributed queues with Redis-like semantics. At the time we started working on this project, Disque was in beta (RC is out).
- Zookeeper (or comparable) distributed locks / coordinator based solutions.
A distributed queue can be built with Cassandra or a similar backend, with Zookeeper as the global locking solution. However, Zookeeper quickly becomes the bottleneck as the number of clients grows, adding to the latencies. Cassandra itself is known to have queues as an anti-pattern use case.
Below are some of the performance numbers for the queues implemented using the above recipe. The numbers here measure the server side latencies and do not include the network time between client and server. The Dynomite cluster, as noted above, runs with the DC_SAFE_QUORUM consistency level guarantee.
The Dynomite cluster is deployed across 3 regions, providing higher availability in case of region outages. The broker talks to the Dynomite cluster in the same region (unless the entire region fails over), as the test focuses on measuring latencies within the region. For very high availability use cases, the message broker could be deployed in multiple regions along with the Dynomite cluster.
We built the queue recipe based on the need for micro-services orchestration. Building the recipe on top of Dynomite gives us the flexibility to port the solution to other storage engines depending upon the workload needs. We think the recipe is hackable enough to support further use cases. We are releasing the recipe to open source: https://github.com/Netflix/dyno-queues.
If you like the challenges of building distributed systems and are interested in building the Netflix studio eco-system and the content pipeline at scale, check out our job openings.
— by Viren Baraiya
Originally published at techblog.netflix.com on August 16, 2016.