In its drive to consistently enhance the user experience, the Engineering Team @ Tokopedia continually works to tame system complexity and build highly reliable systems.
One of our projects required asynchronous processing along with real-time updates to the client over Server-Sent Events: NSQ consumers process the heavy jobs, and the real-time progress is returned to the client through an API.
The flow looks pretty straightforward, and it is. The complexity arose when data processing by an NSQ consumer took longer than usual and we started noticing re-queues. The same message was then consumed and processed by consumers running on other instances. As a result, the message was processed by multiple consumers simultaneously, which led to corrupted data in Redis.
Sherlock Holmes at Play
On investigation, we found that once the default time (60 seconds) allocated for message processing elapses, the message is timed out by nsqd and re-queued. The other, hungry handlers then consume the message for processing.

NSQ provides the TOUCH command, whose responsibility is to inform nsqd that a message needs more time to be processed. We started sending TOUCH events to nsqd at an interval of 30 seconds. This provided relief to some extent; the re-queued messages reduced, but were not gone completely.
Deep diving into the documentation and the code base led to interesting findings. The -max-msg-timeout parameter on nsqd acts as a global ceiling on the time a consumer may take to finish processing a message, and its value in our case is 300 seconds. TOUCH can keep a message on hold only up to this limit.
Since the nsqd server is centralized, modifying the parameter was out of scope, and we needed another approach to handle the re-queued messages. Because Redis contains the real-time processing details of the data, we used it as the source of truth.
The data on Redis for a key combination of (id, fileName) records the count of records processed. The consumers were modified to check this count before processing a message: if the count > 0, the consumer discards the message, as it is already being processed by another consumer.
In the handler, the function GetProgress() fetches this data from Redis and stores it in currProgress, which holds the number of records processed so far. When the message is re-queued, another consumer fetches the same information, and if currProgress.ProcessedCount > 0, the new consumer discards the message.
This implementation helped us attain atomic consumption of messages. While it could not reduce or avoid the re-queues themselves, it led to a stable process.
Stay tuned for the next chapter, on Graceful Shutdown of Consumers!