Motivation and Introduction
As an engineer in the Data Factory group at SimilarWeb, you quickly learn that providing our clients with accurate data and insights is our top priority. Doing so often requires processing gigabytes and even terabytes of data in a short timeframe.
This is a challenge I enjoy taking on, which is why I'm excited to tell you about the time we optimized one of our most demanding services. In doing so, we increased its processing speed 28x and decreased resource usage 10x!
The task: make ~150M calls to an external API to fetch the data and process it so it's ready for use by our data analysis team, all within a couple of days.
If you’re in any way familiar with big data processing you must be thinking “Processing 150M results in a distributed way using Spark/MapReduce shouldn’t take too long. Nick, that’s not a challenge”. I mean… you’re not wrong.
The challenge is fetching the data from this API quickly enough for it to be processed in time, given the rate limits of that API.
What we started with
Initially we wanted to verify the relevancy of the data to our needs.
To avoid committing a lot of engineering time to a task that might not pan out, we decided to use some of our existing code to make all these API calls as a test run. We could reuse that code because it was already being used for continuous processing of similar-looking data on a smaller scale.
Said existing code was a NodeJs service running in a Docker container (node:10-slim image), orchestrated by Nomad and given 256MHz of CPU and 128MB of memory.
To get to around 500 requests/sec we deployed 700 containers; each instance could process requests at an astonishing rate of just over 1 request/sec. That means 179,200MHz (179.2GHz) of CPU and 89,600MB (89.6GB) of memory were reserved for this process ALONE on our cluster! (And even then we were barely hitting the halfway mark of the rate allowed by this API.)
Now, I'm willing to acknowledge this service was not optimized for speed, or maybe not optimized at all, but it worked great for our ongoing use case.
Some notes regarding the lack of optimization, for the curious folks:
- Input messages were read from Amazon SQS one by one.
- Output records were written to Kinesis Firehose one by one.
- Barely any asynchronous code (remind me, why did we use Node here if we’re not utilizing its strengths?)
Optimizing the NodeJs service
Knowing I’m (not) a NodeJs expert, I started this adventure by putting my thinking cap on and getting my best rubber ducky to debug whatever code I might produce and started optimizing.
First things first, the simple fixes: increasing the number of messages pulled from SQS at a time from 1 to 10, to save on the network overhead of each call. Batch puts to Kinesis Firehose were another easy fix that saves tons of time on network overhead, since in this case you can batch up to 500 records at a time.
With the simple fixes out of the way I had to get some proper asynchronicity into this service. This is where my thinking cap came into play and I started looking into a way to process the 10 messages I was now getting on every pull from SQS.
In my search for an efficient way to work asynchronously I came across the workerpool package. Looking at the documentation, it seemed just perfect. I mean, look at the features:
- Easy to use (you love to hear it)
- Runs in the browser and on node.js (NodeJs ✓ )
- Dynamically offload functions to a worker (exactly what I’m looking for)
- Cancel running tasks (great in case of a premature shutdown)
- Set a timeout on tasks (good to put up boundaries)
- Handles crashed workers (shit happens)
- Small: 5 kB minified and gzipped (*chef’s kiss*)
Told you, it's perfect!
In Node.js everything runs in parallel, except your code. What this means is that all I/O code that you write in Node.js is non-blocking, while (conversely) all non-I/O code that you write in Node.js is blocking.
Did I mention that I’m no NodeJs expert?
The problem I encountered with this package, though, was a DataCloneError whenever I attempted to use functions from other files in the project. Basically, those functions went unrecognized by the workers and could not be serialized.
Why use a fancy package to do it for me when I can start the number of workers I need and reuse them?
What I ended up doing
In each instance I start, let's say, 5 worker threads (https://nodejs.org/api/worker_threads.html). Each thread runs in a loop as long as there are messages to process: it reads 10 messages and processes them asynchronously by offloading each one to another function, then blocks while waiting for all of them to finish. That means we now have 50 messages being processed asynchronously, up from 1!
One more thing
If you read the foreshadowing warning a few paragraphs ago you might have guessed about the main problem with using NodeJs for this workload.
NodeJs is great for IO, which is what we have here, right?
- Get messages from SQS. IO
- Getting the data from the external API. IO
- Gzip compressing the data so it’s small enough to be sent over Kinesis Firehose, especially in batches. NOT IO
- Base64 encoding the data to be sure it’s of a valid format for transfer through Kinesis Firehose. NOT IO
- Sending the resulting data to Kinesis Firehose. IO
Whelp… 2 of the 5 steps are not IO and can only be optimized by horizontally scaling the service.
All my efforts were rewarded, though: just under 2 requests/sec per instance, 2x faster than before, running on ~300 containers, down from ~700.
Some wise words from Python core developer Raymond Hettinger came to mind.
Rewriting in Go
Another language I’m familiar with and am actually more fluent in is Go.
It's a language with a great standard library, an amazing community and, most importantly for this specific case, the ability to handle non-IO tasks efficiently.
After you’re done reading you might want to check it out here:
There’s a nice tour you can take in order to discover all the amazing features this language has. And when you’re done you can play around with Go right there in the playground, no installation or download required.
Back to business.
This service, just like our original one, runs in a Docker container (golang:1.13.4-alpine image), is orchestrated by Nomad and given 1000MHz of CPU and 512MB of memory.
Some basic Go terminologies
Goroutine — A goroutine is a lightweight thread managed by the Go runtime. You can start a goroutine simply by using the go keyword, e.g. go SomeFunction(). https://tour.golang.org/concurrency/1
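A runnable version of that one-liner (SomeFunction is replaced with an inline closure here, and the WaitGroup only exists to keep main alive until the goroutine finishes):

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	var wg sync.WaitGroup
	wg.Add(1)

	// The go keyword starts this function concurrently with main.
	go func() {
		defer wg.Done()
		fmt.Println("hello from a goroutine")
	}()

	// Without this wait, main could exit before the goroutine ever runs.
	wg.Wait()
}
```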
Channels — Channels are a typed conduit through which you can send and receive values with the channel operator, <-. By default, sends and receives block until the other side is ready. This allows goroutines to synchronize without explicit locks or condition variables. This is the behavior of an unbuffered channel. https://tour.golang.org/concurrency/2
Buffered Channels — This type of channel has an internal buffer of a predefined size, meaning messages can be put into it and received at a later time. In other words, sends to a buffered channel block only when the buffer is full, and receives block when the buffer is empty. https://tour.golang.org/concurrency/3
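A minimal, runnable illustration of the difference between the two channel types:

```go
package main

import "fmt"

func main() {
	// Unbuffered: the send blocks until a receiver is ready, so we need a
	// second goroutine to hand the value over.
	unbuffered := make(chan int)
	go func() { unbuffered <- 1 }()
	fmt.Println(<-unbuffered) // 1

	// Buffered: sends succeed immediately while there is room in the
	// buffer, even though no one is receiving yet.
	buffered := make(chan int, 2)
	buffered <- 2
	buffered <- 3
	fmt.Println(<-buffered, <-buffered) // 2 3
}
```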
Using these basic built-in features we can achieve greatness: goroutines for concurrency, channels for synchronization and communication between goroutines.
Before we get to the design schematic please refer to the legend.
The initial design schematic.
What we have here is a single goroutine reading messages from SQS, 10 at a time, and putting them into A, an unbuffered channel. Listening to that channel is a single goroutine whose job is to parse every message into something we can use and pass it on into B, another unbuffered channel. Reading from it are multiple goroutines that process all incoming messages.
At this point any available goroutine will pull a message from B, process it and put the result into C, a buffered channel that has an internal buffer size matching the batch size we use when putting messages into Firehose. Due to message size constraints in this case I limited the batch size to 50.
On the other hand, if all message-processing goroutines are busy, the parsing goroutine will block on its send to B (no one is ready to receive from it), which in turn causes the message-receiving goroutine to block on its send to A for the same reason.
The SQS-reading and message-parsing goroutines unblock as soon as any processing goroutine is ready to receive a message from B.
This redesign and rewrite resulted in a significant improvement over our initial NodeJs service. We got to ~700 requests/sec (up from 500) with 25 instances (down from ~300), or around 28 requests/sec per instance.
If you’re still following along, this translates to:
- More than a 14x improvement in per-instance processing speed.
- A total allocation of 25GHz of CPU (down from 179.2GHz) and 12.5GB of memory (down from 89.6GB), which is more than a 7x decrease in CPU and memory allocation compared to our original service.
Now this is fast! And as a bonus to the performance boost we are saving some serious dough ($$$) with the reduction in resource allocation.
Finally! I was done and could move on to my next task for that iteration.
Or so I thought.
On second glance, the way this service works reminded me of something I heard about a while back: the pipeline pattern in Go. "There's no formal definition of a pipeline in Go; it's just one of many kinds of concurrent programs. Informally, a pipeline is a series of stages connected by channels, where each stage is a group of goroutines running the same function." (https://blog.golang.org/pipelines)
Sounds a lot like what is shown in the design schematic, but we’re not there yet, not exactly. We do have a series of stages connected by channels. But only one of our stages is a group of goroutines, other than that we have single goroutines doing the work.
Pipeline pattern, the right way
In addition to using groups of goroutines instead of single goroutines in most stages, I decided to switch to buffered channels, to save time on overhead at each stage.
For example: if all processing goroutines are busy, the parsing goroutine used to block and wait for someone to be ready to receive a message from it, meaning parsing work only happened when someone was ready to process its result. Using a buffered channel, we can "pre-parse" lots of messages even if no goroutine is available to process them at the moment. That way a processing goroutine never has to wait for parsing, because a bank of parsed messages is already waiting in the buffer.
The overhead is even worse for the goroutine reading messages from SQS: the network overhead there is far larger than the overhead of parsing.
The final design schematic:
For the following explanation let’s define X=10.
As you can see, this schematic doesn’t look that different from the initial one.
- X goroutines are reading messages from SQS (each 10 at a time) and putting them into channel A, a buffered channel of size X*10.
- X goroutines are parsing the messages from channel A before passing them on to channel B, a buffered channel of size X*10.
- 75 goroutines read the parsed messages and process them (75 is a number I arrived at by benchmarking different values). Each result is then put into channel C, a buffered channel of size 50, matching the batch size we use when putting messages into Firehose.
- Only 2 goroutines read those results and put them into Firehose in batches of 50. 2 goroutines are enough for the job because batch puts happen less frequently, only once every 50 results.
These simple optimizations resulted in even more improvement over the first Go version. We ended up with ~840 requests/sec (up from ~700) with 17 instances (down from 25), or around 50 requests/sec per instance.
Comparing our final version to the first version of the Go service, we see nearly a 2x improvement in processing speed, or 28x compared to our original NodeJs service, with a total allocation of 17GHz of CPU (down from 25GHz) and 8.5GB of memory (down from 12.5GB). That is more than a 10x decrease compared to our original NodeJs service.
The number of calls we need to make can vary from month to month; it might end up being 150, 160 or even 170 million calls. But with our newfound speed we are able to go through the whole list in just over 2 days!
What this all demonstrates is that you should always strive to find and use the right tool for the job. Going with the easy solution would have ended up costing us at minimum 8 times more than what we ended up paying for the used resources in our final version.
Some key takeaways :
- NodeJs is great for asynchronous IO operations but falls short when it comes to CPU intensive work.
- Go can handle both CPU intensive and concurrent workloads well.
- Picking the right tool for the job is one of the most important decisions you will make in any project.
- The pipeline pattern is a simple way you can dramatically speed up your services.
There are further optimizations possible. Using the GOGC and GOMAXPROCS environment variables we could speed up each container even more, though tuning them would require allocating more CPU/memory per container.
For more information about these and other useful environment variables, check out Dave Cheney’s blog https://dave.cheney.net/tag/gogc