Long-Running NSQ Consumers
In our constant effort to improve the user experience, the Engineering Team @ Tokopedia keeps refining system complexity to build highly reliable systems.
One of our projects required asynchronous processing combined with real-time updates delivered via Server-Sent Events. NSQ consumers process the heavy jobs, and the real-time progress is returned to the client through an API.


The flow looks pretty straightforward, and it is. The complexity arose when the NSQ consumer took longer than usual to process the data and we started noticing re-queuing: the same message was now consumed and processed by consumers running on other instances. Because the message was being processed by multiple consumers simultaneously, the data on Redis got corrupted.
Sherlock Holmes at Play
On investigation, we found that once the default time allocated for message processing (60 seconds) elapses, the message is timed out by nsqd and re-queued, and another hungry handler consumes it for processing.
The Climax
NSQ provides TOUCH, which informs nsqd that the message needs more time to be processed. We started sending TOUCH events to nsqd every 30 seconds. This provided some relief: the number of re-queued messages dropped, but they were not gone completely.
Deep diving into the documentation and the code base led to interesting findings. The -max-msg-timeout parameter on nsqd acts as a global ceiling on the time a consumer gets to finish processing a message, and its value in our case is 300 seconds. TOUCH can keep the message on hold only up to this value.
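For reference, both timeouts are configured on nsqd itself, not on the consumer. A sketch of the relevant flags follows; the values shown match those mentioned above, while the shipped defaults may differ across NSQ versions:

```shell
# --msg-timeout: time before an in-flight message is re-queued (60s here)
# --max-msg-timeout: the hard ceiling that TOUCH cannot extend past (300s here)
nsqd --msg-timeout=60s --max-msg-timeout=300s
```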
Resolution
Because the nsqd server is centralized, modifying the parameter was out of scope, and we needed another approach to handle the re-queued messages. Since Redis already holds the real-time processing details of the data, we used it as the source of information.
The data on Redis for a key combination of (id, fileName) contains the count of records processed. The consumers were modified to check this count before processing the message: if the count > 0, the consumer discards the message, as it is already being processed by another consumer.
In the consumer's handler, the function GetProgress() returns the data from Redis, which is stored in currProgress and contains information about the records processed so far. When the message is re-queued, the same information is fetched by the new consumer, and if currProgress.ProcessedCount > 0 holds, the new consumer discards the message.
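The handler described above might look roughly like the sketch below. Only GetProgress and the ProcessedCount check come from the text; the Progress struct, the handler signature, and the Redis wiring are assumptions made for illustration.

```go
package main

import "fmt"

// Progress mirrors the real-time processing state stored in Redis
// for a (id, fileName) key (field name is an assumption).
type Progress struct {
	ProcessedCount int
}

// GetProgress is a stand-in for the real Redis lookup; declared as a
// variable so it can be swapped out when exercising the logic locally.
var GetProgress = func(id, fileName string) Progress {
	// In production this would read the progress stored under (id, fileName).
	return Progress{ProcessedCount: 0}
}

// handleMessage discards re-queued messages that another consumer
// has already started processing.
func handleMessage(id, fileName string) (processed bool) {
	currProgress := GetProgress(id, fileName)
	if currProgress.ProcessedCount > 0 {
		// Another consumer is already working on this message; drop it.
		return false
	}
	// ... heavy processing, updating the count in Redis as it goes ...
	return true
}

func main() {
	fmt.Println(handleMessage("42", "upload.csv")) // fresh message

	// Simulate a re-queue: Redis now reports records already processed.
	GetProgress = func(id, fileName string) Progress {
		return Progress{ProcessedCount: 10}
	}
	fmt.Println(handleMessage("42", "upload.csv")) // duplicate is discarded
}
```

Returning without an error here lets the NSQ handler FIN the duplicate message so it is not re-queued yet again.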
This implementation gave us effectively atomic consumption of the messages. It could not reduce or avoid the re-queues themselves, but it led to a stable process.
Stay tuned for the next chapter on the Graceful Shutdown of Consumers!