Asyncio Tarantool Queue, get in the queue

In this article, I focus on processing information with Tarantool queues. My colleagues have recently published several articles in Russian on the benefits of queues (Queue processing infrastructure on My World social network and Push messages in REST API by the example of Target Mail.Ru system). Today I’d like to add to them by describing how we solved our own tasks and by telling more about working with Tarantool Queue in Python and asyncio.

The task of notifying the entire user base

Mail.Ru runs several media sites: News, Auto, Lady, Health, Hi-Tech, etc. Each of them is visited by millions of users daily. The sites are adapted for mobile devices, and most of them offer a touch-screen version. For our users’ convenience, we’ve created the News mobile application, which is quite popular on Android and iOS devices. When a “hot” piece of news is published, the user receives a push message. It works like this: the chief editor picks a piece of news, presses “Fire” on the management platform, and off it goes! But what happens next? We need to deliver this news to all subscribers as quickly as possible. If a user receives the push message 30 minutes later, the news may no longer be hot, and the user may already have learned it from another source. That’s no good for us.

So, we have a user database stored in our beloved Tarantool. We need to query it as quickly as possible and send push messages to all subscribers. For each subscriber, the database stores a push token and some device info (app version, screen resolution, time zone, the best time for the user to receive a message). The time zone matters because it’s not a good idea to send push messages at night when everybody is asleep.
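To make the time-zone requirement concrete, here is a minimal sketch of a subscriber record and a “don’t wake the user” check. The field names, the quiet-hours window (23:00 to 08:00), and storing a fixed UTC offset instead of a named time zone are all hypothetical simplifications; the article does not describe the real schema.

```python
from dataclasses import dataclass
from datetime import datetime, time, timedelta, timezone

# Hypothetical subscriber record; the real schema is not described in the article.
@dataclass
class Subscriber:
    push_token: str
    platform: str             # "android" or "ios"
    app_version: str
    utc_offset_minutes: int   # simplification: a fixed offset instead of a named time zone

# Hypothetical quiet hours in the subscriber's local time.
QUIET_START = time(23, 0)
QUIET_END = time(8, 0)

def is_quiet_hours(sub: Subscriber, now_utc: datetime) -> bool:
    """Return True if it is night in the subscriber's local time."""
    local_tz = timezone(timedelta(minutes=sub.utc_offset_minutes))
    local = now_utc.astimezone(local_tz).time()
    # The window wraps around midnight, so check both sides of it.
    return local >= QUIET_START or local < QUIET_END

def subscribers_to_notify(subs, now_utc):
    """Keep only the subscribers for whom it is not night right now."""
    return [s for s in subs if not is_quiet_hours(s, now_utc)]
```

Subscribers skipped this way would have to be picked up again later; how that is scheduled is outside the scope of this sketch.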

The requirements are clear, let’s move on.

Solving the task

As a rule, we begin to solve a task with a straightforward approach. Simple code always looks nice and clear:

while «There are users»:
    Choose «batch» of users
    Send a push message to every user from the «batch»

The main while loop runs until all the users are notified. If the database isn’t that big, there is nothing else to do: problem solved. For a big database, however, this simple solution produces unacceptable delays in message delivery. So, what can be improved? How can we make this loop faster? How can we keep the send-out time constant regardless of the size of the database? To answer these questions, we need to look at the details of the send-out process.
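Here is the straightforward loop as a runnable Python sketch. It pages through users by primary key, remembering the last id it has seen, which is the same idea as the select_range calls mentioned later. USERS, select_range and the send callback are hypothetical in-memory stand-ins for the Tarantool space and a push client.

```python
import bisect

# Hypothetical in-memory stand-in for the user space, sorted by primary key.
USERS = [{"id": i, "token": f"tok{i}"} for i in range(1, 11)]
IDS = [u["id"] for u in USERS]

def select_range(after_id, limit):
    """Return up to `limit` users whose id is greater than `after_id` (keyset paging)."""
    start = bisect.bisect_right(IDS, after_id)
    return USERS[start:start + limit]

def notify_all(send, batch_size=3):
    """Straightforward loop: fetch a batch, push to every user in it, repeat."""
    last_id = 0
    sent = 0
    while True:
        batch = select_range(last_id, batch_size)
        if not batch:                  # no users left
            return sent
        for user in batch:
            send(user["token"])        # blocking send: total time grows with the user count
            sent += 1
        last_id = batch[-1]["id"]      # continue right after the last user we saw
```

Every send happens inside the loop, so the total time grows linearly with the number of users; that is exactly the problem the rest of the article tackles.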

Let’s look at the two platforms, Android and iOS. What does “sending a push notification” actually mean, and how is it done? This is described in the Google Cloud Messaging and Apple Push Notification Service documentation. There are Python libraries for sending push notifications to Android and iOS, designed to work in the usual “synchronous” mode. If we dig a little deeper, we’ll see that every platform has its own specifics. On Android, a push means sending JSON data over HTTPS; on iOS, it means writing binary data to an SSL socket. Apple has promised to support HTTP/2 soon. Android can dispatch a message to several devices at once. iOS lets you group several users and send the messages to the group. In other words, each platform has its own grouping characteristics.
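To make the difference concrete, here is a sketch that only builds the request bodies, with no networking. It assumes the legacy GCM HTTP JSON format (a registration_ids list of at most 1000 tokens per request) and the legacy APNs binary “simple notification” frame (command 0); both protocols have since been superseded, and the helper names are hypothetical.

```python
import json
import struct

GCM_MAX_IDS = 1000  # legacy GCM limit on registration_ids per request

def gcm_bodies(registration_ids, data):
    """Build JSON bodies for the legacy GCM HTTP API, one per chunk of up to 1000 devices."""
    for i in range(0, len(registration_ids), GCM_MAX_IDS):
        chunk = registration_ids[i:i + GCM_MAX_IDS]
        yield json.dumps({"registration_ids": chunk, "data": data})

def apns_frame(device_token_hex, alert):
    """Build one notification in the legacy APNs binary 'simple' format (command 0)."""
    token = bytes.fromhex(device_token_hex)  # 32-byte device token
    payload = json.dumps({"aps": {"alert": alert}}, separators=(",", ":")).encode()
    # command (1 byte), token length (2 bytes), token, payload length (2 bytes), payload;
    # all integers are big-endian ("!" in struct format strings)
    return (struct.pack("!BH", 0, len(token)) + token
            + struct.pack("!H", len(payload)) + payload)

def apns_stream(tokens_hex, alert):
    """Several frames can be concatenated and written to one SSL socket in a single write."""
    return b"".join(apns_frame(t, alert) for t in tokens_hex)
```

On Android, the grouping unit is the HTTP request; on iOS, it is whatever you write to an already open socket. This difference is why each platform gets its own queue later on.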

The queue-based solution is fairly obvious: separate picking users from the database from dispatching the messages to the platforms. But the devil is in the details. Since the two platforms need messages sent in two different ways, we can split the users from a given “batch” into iOS and Android users, group them, and put a dispatch message into the appropriate queue. The messages can then be processed, that is, sent out. Schematically, the process looks like this:

Going through the user database and processing messages via a queue.

What’s good about this approach? We separate going through the user database from sending push messages, so the “batch” selection (select_range) in our initial loop runs faster. If we later run into problems with one of the platforms during message processing (which happens frequently), the other platform is not affected. Since we now have logical queues, we can easily parallelize message processing across the server’s CPU cores. If we need to extend the system, we simply add new logical queues.
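The split-and-group step can be sketched like this. The per-platform group sizes, the message layout and the put(tube, message) callback are hypothetical; put stands for whatever enqueues a task into a logical queue.

```python
from collections import defaultdict

# Hypothetical number of devices packed into one dispatch message per platform.
GROUP_SIZE = {"android": 1000, "ios": 100}

def enqueue_batch(batch, news_id, put):
    """Split a batch of users by platform, group them, and put one message per group."""
    by_platform = defaultdict(list)
    for user in batch:
        by_platform[user["platform"]].append(user["token"])
    for platform, tokens in by_platform.items():
        size = GROUP_SIZE[platform]
        for i in range(0, len(tokens), size):
            # One logical queue (tube) per platform, so the platforms never block each other.
            put(platform, {"news_id": news_id, "tokens": tokens[i:i + size]})
```

Because each platform has its own tube, a slow or failing APNs connection only delays the "ios" messages, while the "android" consumers keep working.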

Solving load and scaling problems

If we increase the load on one server, we’ll soon run out of CPU. So, should we add one more server? Yes, a completely identical one. But it’s better to plan for that when designing the service. If the system works on two servers, we can easily add a couple dozen more. That’s our principle: at least two servers, even if there is no real load yet. Several servers also make the service more reliable. The service architecture in this case looks like this:

Working with the user database on two servers

So, we have two servers with their own queues. There is also the user database, of course; let’s say it lives somewhere close by and is available for select_range, but we won’t focus on it here. It’s very important to run the “go through” loop simultaneously on both servers. We could run the loop on one of the servers: select batches and distribute them evenly among the queues on both servers. With this approach, however, we would have to “move” the data around the network: selecting a “batch” on one server and putting it into a queue on another is the weak point. Instead, we need select_range to run in parallel across the servers.

To do that, we select a “batch” on one server and put a small message with the last user id from the current “batch” into the queue of the neighbor server. When the second server processes this small message, it selects a new “batch” starting from the specified id, puts an identical message into its own neighbor’s queue, and so on until the whole database has been covered. The current “batch” is always processed locally, in its own server’s queue. In effect, we move the code to the data, split “batch” building among the servers, and don’t move the data around the network.

The sequence diagram would look like this:

The “for all the users” loop is driven by queue.put(last_id) messages. The dispatch process ends when select_range runs out of users. It’s crucial that sending does not block on the database. This is similar to MapReduce in Hadoop: the same “divide and conquer” principle.
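The message flow can be simulated in a single Python process. In this sketch, each “server” is just a local deque, select_range is an in-memory stand-in, and the message kinds "next" and "push" are hypothetical names. Only the small last_id message crosses to the neighbor, while each selected batch is queued locally.

```python
import bisect
from collections import deque

USER_IDS = list(range(1, 21))  # hypothetical user ids, sorted by primary key
BATCH = 4

def select_range(after_id, limit):
    """Up to `limit` user ids greater than `after_id` (stand-in for select_range)."""
    start = bisect.bisect_right(USER_IDS, after_id)
    return USER_IDS[start:start + limit]

def run(num_servers=2):
    queues = [deque() for _ in range(num_servers)]   # one local queue per server
    delivered = {s: [] for s in range(num_servers)}  # users each server pushed to
    queues[0].append(("next", 0))                    # kick off with queue.put(last_id=0)
    while any(queues):
        for s, q in enumerate(queues):               # servers take turns, like parallel workers
            if not q:
                continue
            kind, arg = q.popleft()
            if kind == "next":
                batch = select_range(arg, BATCH)
                if not batch:
                    continue                         # users ran out: the cycle ends
                neighbor = (s + 1) % num_servers
                queues[neighbor].append(("next", batch[-1]))  # only the id travels
                q.append(("push", batch))            # the batch stays on this server
            else:
                delivered[s].extend(arg)             # stand-in for sending push messages
    return delivered
```

With two servers, batches alternate between them, and every user is handled exactly once.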

We use the same architecture in production. Each mobile device type and platform has its own logical queues, so they are processed independently and in parallel. Sending a news push message to all the users in our 2-million-user database takes about 2 minutes, and the eight-server cluster sends about 10 thousand push messages per second.

Specifics of writing code for Tarantool Queue

How do we work with a large number of logical queues? How can a single Python process both consume and produce data for all the queues at once? Asynchronous programming techniques come to the rescue. In the following examples, I’m using CentOS 6.4, Python 3, asyncio, aiotarantool_queue, Tarantool 1.6 and Tarantool Queue.

Tarantool Queue can handle quite a heavy load. It is described on GitHub. Several logical queues can be created in one Tarantool Queue instance via the queue.create_tube call; logical queues are called “tubes”. Several tube types are supported, such as the fifo type used below. Tarantool Queue has a “take/ack” mechanism: “take” marks a task as “in progress”, and “ack” deletes the task from the queue, confirming that it was completed successfully. If a taken task is never acknowledged, it returns to the queue, and another process can take it. Execution of a task can be postponed with the delay parameter. Few queue implementations offer comparable functionality and performance.
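To illustrate the take/ack life cycle, here is a toy in-memory model. It is not Tarantool Queue’s implementation or API: the ToyQueue class, the explicit now parameter (a fake clock that keeps the example deterministic), and release() as the way a lost consumer is modelled are all assumptions made for illustration.

```python
import itertools

class ToyQueue:
    """Toy model of take/ack semantics (not the Tarantool Queue code).

    ready --take--> taken --ack--> deleted
    taken --release--> ready   (e.g. the consumer died before ack)
    """

    def __init__(self):
        self._ids = itertools.count(1)
        self.tasks = {}  # task_id -> [state, data, available_at]

    def put(self, data, delay=0, now=0):
        task_id = next(self._ids)
        # A delayed task becomes available for take() only after `delay` seconds.
        self.tasks[task_id] = ["ready", data, now + delay]
        return task_id

    def take(self, now=0):
        for task_id, (state, data, available_at) in sorted(self.tasks.items()):
            if state == "ready" and available_at <= now:
                self.tasks[task_id][0] = "taken"  # no other consumer can take it now
                return task_id, data
        return None

    def ack(self, task_id):
        del self.tasks[task_id]  # completed: remove the task from the queue

    def release(self, task_id, delay=0, now=0):
        # The task was taken but not acknowledged: make it ready again.
        self.tasks[task_id][0] = "ready"
        self.tasks[task_id][2] = now + delay
```

A plain “pop” would lose the task if the worker crashed between popping and sending; with take/ack, the task stays in the queue until it is explicitly acknowledged.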

Using Tarantool both for user storage and as a queue system keeps our service very simple. You don’t even have to use Tarantool Queue: Tarantool and Lua make it possible to implement your own queue.

So, we install Tarantool and put github.com/tarantool/queue into the /usr/local/lua directory. In the /etc/tarantool/instances.enabled/q1.lua Tarantool configuration script, we specify:

#!/usr/bin/env tarantool
package.path = package.path .. ';/usr/local/lua/tnt-queue/?.lua'
box.cfg{listen = 3301, slab_alloc_arena = 2}
queue = require 'queue'
queue.start()
box.queue = queue

Let’s start our instance of the queue:

$ tarantoolctl start q1

Enter the console:

$ tarantoolctl enter q1
/usr/bin/tarantoolctl: Connecting to /var/run/tarantool/q1.control
/usr/bin/tarantoolctl: connected to ... /run/tarantool/q1.control
unix/:/var/run/tarantool/q1.control

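Allow guest access and create the q1, q2 and q3 logical queues (the asyncio example later in this article drains all three):

> box.schema.user.grant('guest','read,write,execute','universe')
> queue.create_tube('q1', 'fifo')
> queue.create_tube('q2', 'fifo')
> queue.create_tube('q3', 'fifo')
^D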

One queue can be drained like this:

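# Pseudo-code: Tarantool.Queue stands for a synchronous (blocking) queue client
queue = Tarantool.Queue(host="localhost", port=3301)
while True:
    task = queue.take(tube="q1")  # mark the next task as taken ("in progress")
    process(task)
    task.ack()                    # confirm completion: the task is deleted from the queue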

To drain N queues, we could start N processes, each connecting to its own queue and running exactly the same loop. This works, but with many queues there will be many connections to Tarantool Queue and many running processes consuming system resources. That many connections also make working with Tarantool less efficient. Finally, we also need connections to the Google and Apple servers, and again, the fewer of those connections we have, the lower the load and the more server resources remain available.

In my next article, I’ll explain why using just one Tarantool connection can significantly increase performance (this is very important for our use case). The same approach works here: we take our initial pseudo-code, extend it to several queues, and adapt it for asyncio.

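import asyncio
import aiotarantool_queue

@asyncio.coroutine
def worker(tube):
    # "take/ack" loop for one logical queue (tube)
    while True:
        task = yield from tube.take(.5)  # wait up to 0.5 s for a task
        if not task:
            break  # the tube stayed empty for the whole timeout: stop this worker
        # process(task.data)
        yield from task.ack()  # confirm completion: the task is deleted from the queue

loop = asyncio.get_event_loop()
# One connection to Tarantool Queue, shared by all the coroutines
queue = aiotarantool_queue.Queue("127.0.0.1", 3301, loop=loop)
# asyncio.ensure_future replaces the deprecated asyncio.async
workers = [asyncio.ensure_future(worker(tube), loop=loop)
           for tube in (queue.tube('q1'), queue.tube('q2'), queue.tube('q3'))]
loop.run_until_complete(asyncio.wait(workers))
loop.run_until_complete(queue.close())
loop.close()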

In one process, we create a single connection to the queue and one coroutine with a “take/ack” loop per logical queue. Then we start the event loop and drain all the queues. This is our basic pattern for working with queues.

Note that the code stays linear: there are no callbacks. Under the hood, there is a hidden feature: tasks are read from the queue in “batches”, which aiotarantool_queue provides out of the box. And there is no manual waiting, queue polling, or timeout juggling. Cool? Of course, you’ll still need several such processes to spread the load across all CPU cores, but that’s not a big deal. Handling the queues with plain Python processes would look about the same, just with processes instead of coroutines. A synchronous approach would make this code more convoluted and, more importantly, less efficient.
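The aiotarantool_queue code above uses the generator-based @asyncio.coroutine style of its time, which was removed in Python 3.11. Here is the same one-process, many-coroutines pattern in today’s async/await syntax. To keep it runnable without a Tarantool server, asyncio.Queue objects stand in for the tubes and task_done() stands in for ack, so this illustrates the pattern, not aiotarantool_queue’s API.

```python
import asyncio

async def worker(name, tube, results):
    """take/ack-style loop for one in-memory 'tube' (an asyncio.Queue stand-in)."""
    while True:
        try:
            # Short timeout, playing the role of take(.5) in the example above.
            task = await asyncio.wait_for(tube.get(), timeout=0.1)
        except asyncio.TimeoutError:
            break                      # tube stayed empty: stop this worker
        results.append((name, task))   # stand-in for process(task.data)
        tube.task_done()               # stand-in for task.ack()

async def main():
    tubes = {name: asyncio.Queue() for name in ("q1", "q2", "q3")}
    for name, tube in tubes.items():
        for i in range(3):
            tube.put_nowait(f"{name}-task{i}")
    results = []
    # One coroutine per logical queue, all in one process and one event loop.
    await asyncio.gather(*(worker(n, t, results) for n, t in tubes.items()))
    return results

results = asyncio.run(main())
```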

Using asyncio has its downsides, though. Our Python program may need other libraries that are not ready for an asynchronous environment. In that case, you may have to study the library code closely and adapt it to use asyncio calls instead of blocking ones; this is usually not hard. If you need an efficient service, the effort spent adapting external libraries pays off.
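When adapting a library is not practical right away, a common stopgap (a different technique from the one described above) is to push blocking calls into a thread pool with loop.run_in_executor, so they don’t stall the event loop. blocking_send below is a hypothetical stand-in for a synchronous library call.

```python
import asyncio
import time

def blocking_send(token):
    """Hypothetical blocking library call, e.g. a synchronous push client."""
    time.sleep(0.1)  # simulates network I/O that would otherwise block the event loop
    return f"sent:{token}"

async def send_all(tokens):
    loop = asyncio.get_running_loop()
    # Each blocking call runs in the default thread pool; the event loop stays responsive.
    futures = [loop.run_in_executor(None, blocking_send, t) for t in tokens]
    return await asyncio.gather(*futures)  # results come back in the order of `tokens`

results = asyncio.run(send_all(["a", "b", "c", "d"]))
```

Threads are heavier than coroutines and the pool size limits concurrency, so for a high-throughput sender, adapting the library to native asyncio calls remains the better option.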

But what about Redis and RabbitMQ?

Why do we use Tarantool Queue and not Redis or RabbitMQ? It wasn’t an easy choice: we considered both Redis and RabbitMQ, and we even had a prototype on Redis. All of these solutions perform decently. But it’s not just about “which one is faster”…

First of all, we need the queue to be reliable and not kept only in memory. Tarantool, which persists data with its write-ahead log (WAL), looks more reliable to us than Redis and RabbitMQ.

Every queue system has its own specifics. Redis offers a “pub/sub” mechanism, but that doesn’t suit our needs: we need an actual queue. Redis has lists with “rpush/blpop”, including blocking operations that wait for new data, but there is no “take/ack” mechanism. That mechanism is what provides reliability in our production, and it keeps proving its worth.

RabbitMQ offers lots of different queueing patterns, but we would need only a fraction of its functionality for our task. Its performance is really high; however, once messages are persisted to disk, performance drops significantly. Another point is that running RabbitMQ requires experienced system administrators who can diagnose production issues rather than just restart the RabbitMQ instance.

I’d like to pay special attention to the RabbitMQ Python API and its asyncio connector. The queue API is callback-based, and callback code quickly becomes complicated and hard to maintain. To call message.ack in asyncio, we have to create a Future and wait for it, which makes the code too complex. We also couldn’t get several “put/take” calls to go through a single connection.
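The Future-based bridge mentioned above looks roughly like this. CallbackChannel and its ack_with_callback method are hypothetical, not the API of any particular RabbitMQ client; the point is the plumbing (create a Future, pass its set_result as the callback, await it) that every callback-style call needs.

```python
import asyncio

class CallbackChannel:
    """Hypothetical callback-style client, standing in for a callback-based broker API."""

    def __init__(self, loop):
        self.loop = loop
        self.acked = []

    def ack_with_callback(self, delivery_tag, callback):
        # Pretend the broker confirms the ack a moment later, then fire the callback.
        def done():
            self.acked.append(delivery_tag)
            callback(delivery_tag)
        self.loop.call_later(0.01, done)

async def ack(channel, delivery_tag):
    """Wrap the callback API in a Future so the caller can simply await it."""
    future = channel.loop.create_future()
    channel.ack_with_callback(delivery_tag, callback=future.set_result)
    return await future

async def main():
    channel = CallbackChannel(asyncio.get_running_loop())
    return await ack(channel, 7), channel.acked

result = asyncio.run(main())
```

Each callback-style operation needs a wrapper like this, whereas a natively coroutine-based client lets you await the call directly.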

Redis does much better with asyncio: there is a great connector written by the creator of asyncio himself, and it works really fast.

To sum up

We discussed an architecture that uses queues on several servers to parallelize work with the user database. We reviewed patterns for using Tarantool Queue with asyncio and looked at some issues in writing code around queue systems. Finally, we compared the drawbacks of RabbitMQ and Redis with the benefits of Tarantool Queue.

I hope readers will find this information useful. Feel free to share your experience with queues and tell us why you chose one solution over another.

The links that were used for this article: