Debouncing Consumer Queues in Go
I’m going to make this a short one since I have a tendency to ramble. At my current place of work, we have a pretty extensive infrastructure based on Go and Kafka. Our pipeline is complex and we’re always looking for ways to keep the machine well-oiled and as smooth as can be. Most of us in this industry have a stake in our infrastructure and do our part to keep things solid.
In one particular situation we have a Kafka-consuming service written in Go that runs as a medium-sized cluster of workers. This cluster’s job is to index data into Elasticsearch as events enter this area of our pipeline from services upstream. Nothing too earth-shattering so far, but we had a recent fire where we realized that in some cases our consumers can get into a state where they re-process the indexing work for a given document over and over…kinda like a machine gun. Obviously this is not ideal for a number of reasons, and we identified the source of our runaway event stream. The real fix lies in making sure our upstream service doesn’t get into a runaway state where it keeps asking for the same work to be done over and over again. Maybe we can chat about that in another post.
But…is there anything we can do to help mitigate our issue at the consumer level? Is there a way to make our consumer a bit smarter so we don’t have to bang so hard on Elasticsearch should we get an unreasonable stream of events from somewhere upstream?
As I worked through this problem with some of my counterparts at work I realized that I had already solved this in a previous setting. For those of you who might have done desktop UI work or even web UI work: sometimes either the browser or the OS can pump user events into your app at a pace much higher than ideal. In those situations, you may end up reprocessing your work to calculate something such as a position on the screen or a particular state of a UI component. Sometimes this processing step is expensive and time consuming, and recalculating it that often is usually unnecessary.
This is where I remembered that back in the day I used a technique called “debouncing”, which can be considered a form of rate limiting or throttling. As it turns out, doing something like this in Go works extremely well when you pair the select statement with Go’s powerful concurrency primitive: the channel.
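To make the select-plus-channel idea concrete, here is a minimal sketch of a non-blocking send on a buffered channel. The helper name tryEnqueue and the string event type are hypothetical, picked only for illustration.

```go
package main

import "fmt"

// tryEnqueue is a hypothetical helper: it attempts a non-blocking send on a
// buffered channel and reports whether the event was accepted. A false result
// means the buffer was already full and the event was dropped.
func tryEnqueue(queue chan<- string, event string) bool {
	select {
	case queue <- event:
		return true
	default:
		// The send would block, so pending work already exists.
		return false
	}
}

func main() {
	queue := make(chan string, 1) // depth of 1: at most one pending event

	fmt.Println(tryEnqueue(queue, "abc")) // true: the buffer had room
	fmt.Println(tryEnqueue(queue, "abc")) // false: buffer full, event dropped
	<-queue                               // a consumer takes the pending event
	fmt.Println(tryEnqueue(queue, "abc")) // true: there is room again
}
```

The default case is what turns a blocking channel send into "write it or drop it", and that is the whole trick behind the debounce.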
I’m going to propose some code which recreates a very simplified solution in Go to debounce a consumer that is consuming from an upstream producer. In the example code below, we have 3 things which illustrate the problem at hand.
- We have a Consumer which reads off some queue (or topic in the Kafka case) and, when faced with re-processing the same exact data, simply won’t be able to keep up with the rate of events published.
- We have a function or task that does some useful work but is either time consuming or expensive in terms of CPU.
- We have a Producer that produces a stream of events that, in our case, we know will overwhelm our Consumer.
So let’s be clear that this strategy won’t work for all cases, especially if you’re aggregating streams of data. But in our case we know that if we’re currently processing some work and we’re asked to reprocess that same work, it’s reasonable to expect that in a high-throughput scenario some of that reprocessing is completely useless and really just adds contention to our overall pipeline.
The proposed solution is as follows:
- An event enters our system with the key of: “abc” and we start processing (indexing) the data.
- If another event enters our system with a key other than “abc”, we can process that in parallel along with more keys up to a reasonable bound. That is all fine and dandy and not the problem domain for this post.
- If, while processing “abc”, we get the same event to reprocess “abc”, we will enqueue that work in a queue with a depth of 1. Perhaps this new event came in and we genuinely should re-index the document in Elasticsearch. Also, in our case we’d rather not process the same index in parallel, so this queue gives us sequential indexing for same-key units of work.
- If yet another event comes into our consumer with key “abc” and our queue is already full, let’s just throw that event out the window, because we already have a re-indexing event in our queue that must be processed once the original “abc” unit of work completes.
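The steps above can be sketched per key as follows. This is only an illustration under my own assumptions: the KeyedDebouncer type, its Submit method and the process callback are hypothetical names, there is no bound on the number of keys handled in parallel, and each key's goroutine is never cleaned up.

```go
package main

import (
	"fmt"
	"sync"
)

// KeyedDebouncer is a hypothetical type: one depth-1 queue and one worker
// goroutine per key, so work for the same key runs sequentially.
type KeyedDebouncer struct {
	mu      sync.Mutex
	pending map[string]chan struct{}
	process func(key string)
}

func NewKeyedDebouncer(process func(key string)) *KeyedDebouncer {
	return &KeyedDebouncer{pending: make(map[string]chan struct{}), process: process}
}

// Submit reports whether the event was accepted (true) or tossed (false).
func (d *KeyedDebouncer) Submit(key string) bool {
	d.mu.Lock()
	q, ok := d.pending[key]
	if !ok {
		q = make(chan struct{}, 1)
		d.pending[key] = q
		go func() {
			for range q { // one unit of work at a time for this key
				d.process(key)
			}
		}()
	}
	d.mu.Unlock()

	select {
	case q <- struct{}{}:
		return true
	default:
		return false // a re-run for this key is already queued
	}
}

func main() {
	started := make(chan string)
	release := make(chan struct{})
	d := NewKeyedDebouncer(func(key string) {
		started <- key // signal that work on this key has begun
		<-release      // stand-in for slow indexing; never released in this demo
	})

	fmt.Println(d.Submit("abc")) // true: picked up by the worker
	<-started                    // "abc" is now in flight
	fmt.Println(d.Submit("abc")) // true: queued behind the in-flight work
	fmt.Println(d.Submit("abc")) // false: tossed, a re-run is already queued
	fmt.Println(d.Submit("xyz")) // true: a different key is independent
}
```

A real consumer would also want to cap the number of keys in flight and retire idle workers, which this sketch leaves out.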
One side note: in our current implementation, our consumers consume off our queue asynchronously depending on the number of free workers in our worker pool. Also, the model I’m talking about is a bit simplified in the example below, and the code I will show is just a small Go script that illustrates a non-distributed example. However, the principle still applies.
On to the code already:
Here we have a single main.go file with 4 functions. The main function kicks off our consumer and producer in a goroutine each and will block waiting for a signal.
startConsumer is easy: it just loops indefinitely, reading from a buffered queue with a depth of 1.
startProducer is modest as well: it simply loops indefinitely and generates a random number on each iteration. If we generate a 99 or greater, we enqueue an event if our buffer is not full; otherwise we toss out that event. This really illustrates the power of using a select statement to write data or…get off the pot. This function is meant to illustrate a producer that produces too fast to be meaningful, where we don’t really care whether we have a queue depth of 1 or 20…as long as we know we have another round of work to process, that round accounts for ALL of that work.
The timeConsumingWorker represents the actual function that does the actual work and simulates this by taking at least a second to complete for each invocation.
The great thing about this code is that it uses very basic Go constructs to either write data on the channel or, if the queue is full, recognize that the work is already accounted for and throw the excess away. You’ll notice that we have 2 atomic variables which keep track of how many items were tossed vs. processed. The tossed portion represents all those events that were wasteful and would have bombarded our system by asking Elasticsearch to constantly re-index data.
And that pretty much sums up a small strategy for debouncing queues in Go. This technique can be applied to really any evented publish/subscribe system where you don’t really need to reprocess the same data but you do care that you don’t miss that perhaps crucial and last event.
Thanks for reading and always happy to get feedback on my posts.