Parallel processing challenges when using message queues in PHP applications

A typical messaging queue setup

Message queues are an easy way to handle many performance issues in web apps. For example, you might have a frontend app that takes orders in an e-commerce store, but processing an order is slow and you don’t want it to hog the whole system. An easy fix is to put the order on a queue and process it later. In this article I will be using RabbitMQ as a reference messaging system.

Queues also give you predictability over your resources. The resources used are determined by the number of consumers reading from the queues, and that number is fixed. Even if you have a spike in traffic the system will not go down; the messages will just take longer to process.

They also give you flexibility. If you notice (or better yet, have alerts in place to tell you) that some queues are busier than they should be, you can always spin up more consumers or more machines and process the messages faster.

However, there is a caveat. Most of the time you will have more than one consumer running in parallel on a queue to speed things up, which will introduce you to the wonderful world of parallel processing. The thing with parallel processing is that it seems so simple that you can’t possibly imagine how badly things can go wrong. And they can go really badly. Here are some of the things you should consider when running message queues.

Entity Duplication

Let’s go back for a second to our first example and imagine the following message:

{
"order_id": 123,
"customer_name": "John Smith",
"customer_email": "john@sample.com",
"products": [1,2,3]
}

Let’s assume that when the consumer processes this message it also checks whether a customer with that e-mail exists, and creates one if it doesn’t. In reality this is a fairly common scenario when syncing data between applications.

Now let’s assume that the same customer placed another order, and that the two orders are picked up by two different consumers and processed simultaneously.

{
"order_id": 124,
"customer_name": "John Smith",
"customer_email": "john@sample.com",
"products": [4,5,6]
}

What will happen is that both processes will check whether the customer exists in the database, both will see that it doesn’t, and both will create it. This leads to duplicated data if your database does not have unique constraints.

A couple of ways to fix this:

  • Make sure you have proper unique constraints and handle the error. You could handle this error by fetching the customer from the database and using it. Another way would be to simply re-queue the message.
  • Prevent the error from happening. You can do this by using locks on the data that is unique. A very simple but very crude way to do this if you’re using MySQL is to use named locks. (https://dev.mysql.com/doc/refman/5.7/en/miscellaneous-functions.html#function_get-lock)
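The first option above can be sketched as follows. This is a minimal illustration, not a definitive implementation: the `customers` table, its columns and the function name `insertOrFetchCustomer` are assumptions made for the example, and an in-memory SQLite database stands in for the real one so the snippet runs on its own. It inserts first and falls back to a lookup when the unique constraint rejects the row.

```php
<?php
declare(strict_types=1);

/**
 * Returns the id of the customer with the given e-mail, creating the row if needed.
 * Relies on a UNIQUE constraint on customers.email (assumed schema) to reject duplicates.
 */
function insertOrFetchCustomer(PDO $db, string $name, string $email): int
{
    try {
        $insert = $db->prepare('INSERT INTO customers (name, email) VALUES (?, ?)');
        $insert->execute([$name, $email]);
        return (int) $db->lastInsertId();
    } catch (PDOException $e) {
        // SQLSTATE class 23 means "integrity constraint violation".
        // Anything else is a real error and is passed on to the caller.
        if (strncmp((string) $e->getCode(), '23', 2) !== 0) {
            throw $e;
        }
    }

    // Another consumer created the customer first: use that row.
    $select = $db->prepare('SELECT id FROM customers WHERE email = ?');
    $select->execute([$email]);
    return (int) $select->fetchColumn();
}

// Demo: two orders from the same customer end up with a single customer row.
$db = new PDO('sqlite::memory:');
$db->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);
$db->exec('CREATE TABLE customers (id INTEGER PRIMARY KEY, name TEXT, email TEXT UNIQUE)');

$first  = insertOrFetchCustomer($db, 'John Smith', 'john@sample.com');
$second = insertOrFetchCustomer($db, 'John Smith', 'john@sample.com');
$rows   = (int) $db->query('SELECT COUNT(*) FROM customers')->fetchColumn();
```

In a consumer where most customers already exist, a `SELECT` before the insert would probably save some failed inserts. The constraint is still what makes the operation safe when two consumers race.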

The order the messages arrive in the queue is not the same as the order in which they are processed

The processing of the messages in order can only be guaranteed if you have only one consumer on the queue. This can lead to tricky situations. Let’s imagine you have a system that updates the prices of products and sends this data to the frontend. An example message might look something like this:

{
"product_id": 1,
"price": 100,
"currency": "USD"
}

The frontend reads the message and updates its internal storage price to $100.

But what happens if, immediately after changing the price to $100, the user realizes it was a mistake and changes it to $60? We now have the message below published on the queue and possibly processed by another worker at the same time.

{
"product_id": 1,
"price": 60,
"currency": "USD"
}

So what happens then? Which value will show on the frontend? The truth is you don’t know. It can be either one of them, depending on which worker saved to the database last. Obviously this is not acceptable and leaves the data out of sync.

A quick and easy fix is to have a version, or at least a generation timestamp, on each message. The messages then become something like this:

{
"product_id": 1,
"price": 100,
"currency": "USD",
"version": 1
}

And the next update will be:

{
"product_id": 1,
"price": 60,
"currency": "USD",
"version": 2
}

The consumers also need to be changed to store the version in the database alongside the data, and to save a message only when its version is greater than the one already stored. This way, if the first message happens to be saved after the second one, the system won’t make any change to the data.
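One way to express that check is to put it in the `WHERE` clause of the update, so comparing and writing happen in a single statement. The sketch below assumes a `products` table with a `version` column and a row that already exists for the product. The table layout and the function name `applyPriceUpdate` are made up for the illustration, and SQLite is used only so the example is self-contained.

```php
<?php
declare(strict_types=1);

/**
 * Applies a price message only if its version is newer than the stored one.
 * Returns true when the row was updated, false when the message was stale.
 */
function applyPriceUpdate(PDO $db, array $message): bool
{
    $stmt = $db->prepare(
        'UPDATE products
            SET price = :price, currency = :currency, version = :new_version
          WHERE id = :id AND version < :incoming_version'
    );
    $stmt->bindValue(':price', $message['price'], PDO::PARAM_INT);
    $stmt->bindValue(':currency', $message['currency'], PDO::PARAM_STR);
    $stmt->bindValue(':new_version', $message['version'], PDO::PARAM_INT);
    $stmt->bindValue(':id', $message['product_id'], PDO::PARAM_INT);
    $stmt->bindValue(':incoming_version', $message['version'], PDO::PARAM_INT);
    $stmt->execute();

    // No affected row means the stored version was already equal or newer.
    return $stmt->rowCount() === 1;
}

// Demo: version 2 is processed before version 1.
$db = new PDO('sqlite::memory:');
$db->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);
$db->exec('CREATE TABLE products (id INTEGER PRIMARY KEY, price INTEGER, currency TEXT, version INTEGER)');
$db->exec("INSERT INTO products (id, price, currency, version) VALUES (1, 50, 'USD', 0)");

$newer = applyPriceUpdate($db, ['product_id' => 1, 'price' => 60, 'currency' => 'USD', 'version' => 2]);
$older = applyPriceUpdate($db, ['product_id' => 1, 'price' => 100, 'currency' => 'USD', 'version' => 1]);

$row = $db->query('SELECT price, version FROM products WHERE id = 1')->fetch(PDO::FETCH_ASSOC);
```

After both calls the stored price is still the one from version 2, even though version 1 arrived last.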

Messages can get processed more than once

Let’s assume that you have a message that lowers the price of a product by $10, and a consumer that applies it and then logs this information to disk. What happens if the disk is full? A usual scenario is that the logging throws an error, the messaging system does not receive the ACK signal, and it sends the message again, to the same consumer or maybe another. See what will happen? The price gets dropped by another $10. This can lead to funny situations like prices below zero and infinite loops.

The only way to fix this is to make sure that message processing is idempotent: no matter how many times a message is processed, the result stays the same. You can do this by setting the price as an absolute value, or by storing a hash of each message and skipping any message whose hash you have already seen. You can even use the versioning system described above. This use case should also raise flags when you think about what happens if two of these messages are processed simultaneously.
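Here is a rough sketch of the hash approach. It assumes a `processed_messages` table with the hash as primary key, and a hypothetical message format with `message_id`, `product_id` and `amount` fields. The unique `message_id` is there so that two intentional "lower by 10" messages do not produce the same hash. The hash and the price change are written in one transaction, so a redelivered message either finds its hash and is skipped, or finds nothing because the earlier attempt was rolled back.

```php
<?php
declare(strict_types=1);

/**
 * Lowers a product price at most once per message body.
 * Returns true if the message was applied, false if it was a redelivery.
 */
function lowerPriceOnce(PDO $db, string $body): bool
{
    $message = json_decode($body, true, 512, JSON_THROW_ON_ERROR);
    $hash = hash('sha256', $body);

    $db->beginTransaction();
    try {
        // The primary key on hash makes this insert fail for a message seen before.
        $mark = $db->prepare('INSERT INTO processed_messages (hash) VALUES (?)');
        $mark->execute([$hash]);

        $update = $db->prepare('UPDATE products SET price = price - :amount WHERE id = :id');
        $update->bindValue(':amount', $message['amount'], PDO::PARAM_INT);
        $update->bindValue(':id', $message['product_id'], PDO::PARAM_INT);
        $update->execute();

        $db->commit();
        return true;
    } catch (PDOException $e) {
        $db->rollBack();
        // SQLSTATE class 23: the hash already exists, so this is a duplicate.
        if (strncmp((string) $e->getCode(), '23', 2) === 0) {
            return false;
        }
        throw $e;
    }
}

// Demo: the same message is delivered twice, the price drops only once.
$db = new PDO('sqlite::memory:');
$db->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);
$db->exec('CREATE TABLE products (id INTEGER PRIMARY KEY, price INTEGER)');
$db->exec('CREATE TABLE processed_messages (hash TEXT PRIMARY KEY)');
$db->exec('INSERT INTO products (id, price) VALUES (1, 100)');

$body = '{"message_id": "a1", "product_id": 1, "amount": 10}';
$firstDelivery  = lowerPriceOnce($db, $body);
$secondDelivery = lowerPriceOnce($db, $body);
$price = (int) $db->query('SELECT price FROM products WHERE id = 1')->fetchColumn();
```

Side effects that are not part of the transaction, such as the disk logging from the example, should come after the commit. That way a failure there cannot trigger a second price change.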

One message can hang up the whole queue and even the whole system

Error handling is very important when processing queues. Not doing it properly can bring the whole system to a halt. Worse still, if this does not get solved quickly (imagine it happening on a Friday night) the queues can grow huge. Huge queues can cause the messaging system to freeze up, as these systems are mostly built for speed, not size, and that can lead to data loss.

Let’s imagine your consumer expects all messages to be JSON, but somehow a message that is not JSON gets on the queue. Maybe someone published it manually, maybe there was an error in the publisher; this situation will happen eventually. If you do nothing, the consumer will throw an error and quit, the messaging system will not get an ACK, and it will send the message again to another consumer because the current one just died. It will do this until it effectively kills all workers on the queue. Even when using a system like supervisor (which you should always do) to make sure workers are up, there are thresholds after which it stops restarting processes.

The trick is to be very careful about which messages are sent back to the queue. I would argue that the only time a message should be re-queued is when the error is temporary (a 503 on an external API, a duplicate key etc.), and even then you should implement a counter that tracks how many attempts were made to process that message. When a threshold is met, the message should just be logged as an error.
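That policy can be sketched as a small decision function. Everything named here is an assumption for the sake of the example: the `TemporaryFailure` exception class, the `handleDelivery` function and the idea that the number of previous attempts is passed in by the caller (for instance from a message header the consumer maintains). The function only decides what should happen. Turning `ack`, `requeue` or `discard` into calls on the messaging client is left to the consumer.

```php
<?php
declare(strict_types=1);

/** Hypothetical marker for errors worth retrying (a 503, a duplicate key, ...). */
final class TemporaryFailure extends RuntimeException
{
}

/**
 * Processes one delivery and returns 'ack', 'requeue' or 'discard'.
 * $attempts is the number of attempts made before this one.
 */
function handleDelivery(string $body, int $attempts, callable $process, int $maxAttempts = 3): string
{
    try {
        $message = json_decode($body, true, 512, JSON_THROW_ON_ERROR);
        $process($message);
        return 'ack';
    } catch (JsonException $e) {
        // A malformed message will never become valid: log it, do not re-queue.
        error_log('Discarding malformed message: ' . $e->getMessage());
        return 'discard';
    } catch (TemporaryFailure $e) {
        if ($attempts + 1 >= $maxAttempts) {
            error_log("Giving up after $maxAttempts attempts: " . $e->getMessage());
            return 'discard';
        }
        return 'requeue';
    } catch (Throwable $e) {
        // Unknown errors are treated as permanent so one message cannot loop forever.
        error_log('Discarding message after error: ' . $e->getMessage());
        return 'discard';
    }
}

// Demo with a handler that always hits a temporary error.
$flaky = function (array $message): void {
    throw new TemporaryFailure('external API returned 503');
};
$ok = function (array $message): void {
};

$malformed    = handleDelivery('this is not json', 0, $ok);
$success      = handleDelivery('{"order_id": 123}', 0, $ok);
$firstFailure = handleDelivery('{"order_id": 123}', 0, $flaky);
$lastFailure  = handleDelivery('{"order_id": 123}', 2, $flaky);
```

Because every exception is caught inside the function, a bad message costs one log line instead of one dead worker.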

Loss of messages is inevitable

This is something I have come to realize over the past years of using queues. No matter how much you try, eventually something happens and some messages get lost. It can either be an error on the messaging system or maybe the consumer had a bug and didn’t do what it was supposed to do.

You should always make sure you have a way to get them back somehow. Ideally the publisher has a way to push the same data to the queues again. If the publisher cannot do this and you are only building the consumer, then consider making the first thing you do with a message storing it somewhere for a limited period of time. This is also crucial for debugging: it is the only way you can know what happened. Ideally you would store the messages in an ELK cluster so you can also search them easily, but if that is not possible you can use a simple MongoDB capped collection.
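As a very small illustration of "store first, process second", the sketch below appends each raw message to a file, one JSON document per line. The flat file is a stand-in chosen so the example runs anywhere. It is not the ELK or MongoDB setup mentioned above, and the function name `archiveMessage` and the record fields are assumptions.

```php
<?php
declare(strict_types=1);

/**
 * Appends the raw message body to an archive file before any processing happens.
 */
function archiveMessage(string $file, string $queue, string $body): void
{
    $line = json_encode(
        [
            'received_at' => gmdate('c'),
            'queue'       => $queue,
            'body'        => $body, // kept as a string, even when it is not valid JSON
        ],
        JSON_THROW_ON_ERROR | JSON_INVALID_UTF8_SUBSTITUTE
    );

    // LOCK_EX keeps parallel consumers from interleaving their lines.
    if (file_put_contents($file, $line . "\n", FILE_APPEND | LOCK_EX) === false) {
        throw new RuntimeException("Could not archive message to $file");
    }
}

// Demo: archive two messages, one of them malformed, then read them back.
$file = tempnam(sys_get_temp_dir(), 'queue-archive-');
archiveMessage($file, 'orders', '{"order_id": 123}');
archiveMessage($file, 'orders', 'this is not json');

$lines  = file($file, FILE_IGNORE_NEW_LINES);
$second = json_decode($lines[1], true);
unlink($file);
```

Whatever the storage is, it needs some form of expiry (log rotation, a capped collection, an index lifecycle) so the archive itself does not fill the disk.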

Long running processes in PHP

PHP was not designed with long running processes in mind. It was designed to run inside a web server such as Apache, where everything is set up and thrown away again on every request. This raises a couple of problems, and probably the first one you will encounter is memory leaks.

If you build a consumer with Symfony and the Doctrine ORM you will see that the more messages it consumes, the bigger its memory footprint grows. That is because Doctrine keeps track of every entity it has loaded (in its identity map), even if you no longer need it. You can attempt to clean this up, but then you will find another thing that doesn’t get garbage collected properly, and you will keep doing this on and on. The easiest way to handle this is to just stop the consumer after it has processed a certain number of messages and rely on the supervisor to start it right up again. This only works when bringing up the process is fast, so keep this in mind.

Another common issue is that when a consumer has been idle for a long time, the first message it gets results in an error. A famous error for MySQL users is “MySQL server has gone away”. This happens because the TCP connections to the other servers time out, and because they are not being used no error is raised until the next query. You can handle this by restarting the process every now and then or by sending keep-alive requests to the servers.
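A minimal sketch of such a self-limiting loop is shown below. The function names, the callbacks and the default limits are illustrative assumptions, and the memory check is an extra guard beyond the message count described above. In a real consumer `$nextMessage` would block while waiting for the broker. Here it returns `null` when there is nothing left, so the demo terminates.

```php
<?php
declare(strict_types=1);

/** True when the process should stop and let the supervisor start a fresh one. */
function shouldRestart(int $processed, int $maxMessages, int $memoryBytes, int $maxMemoryBytes): bool
{
    return $processed >= $maxMessages || $memoryBytes >= $maxMemoryBytes;
}

/**
 * Handles messages until a limit is reached, then returns the number handled.
 * The calling script is expected to exit afterwards.
 */
function consume(
    callable $nextMessage,
    callable $handle,
    int $maxMessages = 1000,
    int $maxMemoryBytes = 128 * 1024 * 1024
): int {
    $processed = 0;
    while (!shouldRestart($processed, $maxMessages, memory_get_usage(true), $maxMemoryBytes)) {
        $body = $nextMessage();
        if ($body === null) {
            break; // demo only: nothing left to read
        }
        $handle($body);
        $processed++;
    }
    return $processed;
}

// Demo: five messages are waiting, but this process stops after three.
$pending = ['m1', 'm2', 'm3', 'm4', 'm5'];
$handled = [];

$count = consume(
    function () use (&$pending) {
        return array_shift($pending); // null once the array is empty
    },
    function (string $body) use (&$handled): void {
        $handled[] = $body;
    },
    3
);
```

The limit is checked between messages, never in the middle of one, so a restart does not interrupt work in progress.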
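One possible shape for the keep-alive idea is a small wrapper that pings the connection before handing it out and reconnects when the ping fails. The class name `ReconnectingConnection` and the factory callback are assumptions for this sketch. In a real consumer the factory would open the MySQL connection, while the demo uses in-memory SQLite so it can run on its own.

```php
<?php
declare(strict_types=1);

/** Hands out a PDO connection, replacing it when a cheap test query fails. */
final class ReconnectingConnection
{
    /** @var callable Returns a freshly opened PDO instance. */
    private $factory;

    private ?PDO $pdo = null;

    public function __construct(callable $factory)
    {
        $this->factory = $factory;
    }

    public function get(): PDO
    {
        if ($this->pdo !== null) {
            try {
                // A trivial query acts as the keep-alive check.
                if ($this->pdo->query('SELECT 1') !== false) {
                    return $this->pdo;
                }
            } catch (PDOException $e) {
                // Connection is no longer usable; fall through and reconnect.
            }
            $this->pdo = null;
        }

        $this->pdo = ($this->factory)();
        return $this->pdo;
    }
}

// Demo: the factory is only called once while the connection stays healthy.
$created = 0;
$connection = new ReconnectingConnection(function () use (&$created): PDO {
    $created++;
    $pdo = new PDO('sqlite::memory:');
    $pdo->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);
    return $pdo;
});

$first  = $connection->get();
$second = $connection->get();
$answer = (int) $second->query('SELECT 41 + 1')->fetchColumn();
```

Calling `get()` at the start of each message keeps the check out of the business logic, at the cost of one extra round trip per message.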

Wrap-up

These are some of the most common issues I have seen when dealing with message queues in PHP. Solving them doesn’t require much work but finding out what happened is usually very difficult. Hopefully now that you know them in advance it will be much easier.

Also, when possible, use APIs for data exchange.