Long-Running NSQ Consumers

Ankit Agarwal
Sep 23 · 3 min read

In our constant effort to improve the user experience, the Engineering Team @ Tokopedia keeps refining system complexity to build highly reliable systems.

One of our projects required asynchronous processing combined with real-time updates to the client via Server-Sent Events. NSQ consumers were used to process the heavy jobs, and the real-time progress was returned to the client through an API.

[Figure: Client Initiates Process]

[Figure: Real-Time Progress Response]
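For reference, this is roughly how such a consumer is wired up with go-nsq; the topic name, channel name, and nsqlookupd address below are placeholders, not taken from the post.

    package main

    import (
        "log"

        "github.com/nsqio/go-nsq"
    )

    // fileHandler processes one heavy job per NSQ message and writes its
    // progress to Redis so the API can stream it back to the client.
    type fileHandler struct{}

    func (h *fileHandler) HandleMessage(m *nsq.Message) error {
        // ... heavy processing and Redis progress updates go here ...
        return nil // nil FINishes the message; returning an error re-queues it
    }

    func main() {
        cfg := nsq.NewConfig()

        // "file_process" and "worker" are illustrative names only.
        consumer, err := nsq.NewConsumer("file_process", "worker", cfg)
        if err != nil {
            log.Fatal(err)
        }
        consumer.AddHandler(&fileHandler{})

        if err := consumer.ConnectToNSQLookupd("127.0.0.1:4161"); err != nil {
            log.Fatal(err)
        }
        <-consumer.StopChan
    }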

The flow looks pretty straightforward, and yes, it is. The complexity arose when the data processing by the NSQ consumer took longer than usual and we started noticing re-queuing: the same message was then consumed and processed by consumers running on other instances. As a result, the message was processed by multiple consumers simultaneously, which led to corrupted data in Redis.

Sherlock Holmes at Play

On investigation, we found that once the default time (60 seconds) allocated for processing a message elapses, the message is timed out by nsqd and re-queued. The other hungry handlers then consume the message for processing.
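For context, that 60-second window is nsqd's default per-message timeout. With go-nsq, a consumer can negotiate a different per-connection value through its configuration; the post does not change this setting, so the fragment below (assuming the go-nsq and time packages are imported) is purely illustrative.

    cfg := nsq.NewConfig()

    // nsqd times out an in-flight message after 60s by default. A consumer can
    // negotiate a different per-connection timeout at IDENTIFY time (the 5m here
    // is purely illustrative), but nsqd will not accept a value above its own
    // -max-msg-timeout setting.
    cfg.MsgTimeout = 5 * time.Minute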

The Climax

NSQ provides the TOUCH command, whose responsibility is to inform nsqd that a message needs more time to be processed. We started sending TOUCH events to nsqd at an interval of 30 seconds. This provided relief to some extent: the number of re-queued messages dropped, but they were not gone completely.
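Extending the handler from the earlier sketch (with "time" added to the imports), this is roughly how a 30-second TOUCH loop can be wrapped around the heavy job using go-nsq's Message.Touch():

    func (h *fileHandler) HandleMessage(m *nsq.Message) error {
        done := make(chan struct{})
        defer close(done)

        // Send TOUCH every 30 seconds so nsqd keeps resetting the message
        // timeout while the heavy job is still in progress.
        go func() {
            ticker := time.NewTicker(30 * time.Second)
            defer ticker.Stop()
            for {
                select {
                case <-ticker.C:
                    m.Touch()
                case <-done:
                    return
                }
            }
        }()

        // ... heavy processing and Redis progress updates go here ...
        return nil
    }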

Deep diving into the documentation and the code base led to interesting findings. The max-msg-timeout parameter on nsqd acts as a global ceiling on the time allocated to a consumer to complete processing. TOUCH can keep a message on hold only up to this value, so jobs running longer than that were still timed out and re-queued.

Resolution

Since nsqd is a centralized server, modifying the max-msg-timeout parameter was out of scope, and we needed to find another approach to handle the re-queued messages. Because Redis contains the real-time processing details of the data, we used it as the source of information.

The data in Redis for a key combination of (id, fileName) contains the count of records processed. The consumers were modified to check this count before processing a message: if the count > 0, the consumer discards the message, as it is already being processed by another consumer.

In the handler code (sketched below), the function GetProgress() returns the data from Redis, which is stored in currProgress. It contains information about the records processed so far by a consumer. When a message is re-queued, the same information is fetched by the new consumer, and if currProgress.ProcessedCount > 0, the new consumer discards the message.
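The original snippet is not reproduced here, so the following is a minimal sketch of that guard, again extending the earlier handler (with "encoding/json" added to the imports). The names GetProgress, currProgress, and ProcessedCount come from the post; the Progress struct and the (id, fileName) payload shape are assumptions.

    // Progress mirrors the per-(id, fileName) record kept in Redis.
    type Progress struct {
        ProcessedCount int `json:"processed_count"`
        // ... other progress fields ...
    }

    // GetProgress is assumed to read and unmarshal the (id, fileName) key
    // from Redis; its body is omitted here.
    func GetProgress(id, fileName string) (Progress, error) {
        // Redis lookup omitted.
        return Progress{}, nil
    }

    func (h *fileHandler) HandleMessage(m *nsq.Message) error {
        var job struct {
            ID       string `json:"id"`
            FileName string `json:"file_name"`
        }
        if err := json.Unmarshal(m.Body, &job); err != nil {
            return err
        }

        currProgress, err := GetProgress(job.ID, job.FileName)
        if err != nil {
            return err
        }

        // If another consumer has already started on this (id, fileName),
        // discard the re-queued copy instead of processing it a second time.
        if currProgress.ProcessedCount > 0 {
            return nil // nil FINishes (discards) the message
        }

        // ... heavy processing, TOUCH loop, and Redis progress updates ...
        return nil
    }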

This implementation helped us attain atomic behavior in the consumption of messages. It could not reduce or avoid the re-queues themselves, but it led to a stable process.

Stay tuned for the next chapter on Graceful Shutdown of Consumers!

References

  1. https://nsq.io/clients/building_client_libraries.html
  2. https://nsq.io/clients/tcp_protocol_spec.html
  3. https://github.com/nsqio/go-nsq/issues/266
  4. https://github.com/nsqio/nsq/issues/1028
  5. https://godoc.org/github.com/nsqio/go-nsq#Touch
