Google Trends is really helpful when it comes to analyzing global data on top search queries. In this article, we explain how we built our own model, and more precisely how we were able to draw a graph of the popularity of terms in posted ads over time.
The first step is the data source. Our data is stored in many places with different rules. To manage it more easily, we built some Kafka topics (discover the process in our previous article). One of them contains all the ad information for every update that has occurred over the past 10 months.
Every event contains the text of both the subject and the body of the ad, so we can easily filter it for published ads.
The next question is where to store the data!
Storing everything in memory
The simplest idea was to store everything in memory. Since we are using Go, we tried a map, with a channel to synchronize all the workers.
While each event has a precise timestamp, storing a full time series for each word would be more precise than required and would likely consume far too much memory. We decided on the next best thing and aggregated the data by day. The date of the first day was kept in memory and used to generate 0-based indexes for each day since the first.
That means that for each word we had a slice. The n-th element of the slice would be the number of times this word had appeared on the n-th day since the first event on the topic.
Here is the final data structure:
Multiple worker goroutines sharing the same consumer group ID would read events, clean and normalize the text, and send words one by one on a channel to the single goroutine that handles the map.
This worked in the staging environment, where we have a small amount of data, but it failed to scale in production. We saw elevated CPU usage (due to the map size and the garbage collector) and memory consumption (going above the pod limits, which triggered OOM kills). This was expected: even though the topic has only existed for approximately 300 days, it already holds 12 gigabytes of Avro-marshalled data.
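The worker layout described above might look like the following sketch. It assumes the Kafka consumers are replaced by a plain slice of ads, and the normalization rule (lowercase, then split on anything that is not a letter or digit) is a placeholder, since the article does not detail its cleaning step. Only one goroutine touches the map, so no lock is needed:

```go
package main

import (
	"fmt"
	"strings"
	"sync"
	"unicode"
)

// ad is a hypothetical stand-in for a Kafka event: the ad text and its day index.
type ad struct {
	text string
	day  int
}

// wordEvent is what workers send to the goroutine that owns the map.
type wordEvent struct {
	word string
	day  int
}

// normalize is a placeholder cleaning step: lowercase, then split on any
// rune that is not a letter or a digit.
func normalize(text string) []string {
	return strings.FieldsFunc(strings.ToLower(text), func(r rune) bool {
		return !unicode.IsLetter(r) && !unicode.IsDigit(r)
	})
}

// countWords fans ads out to nWorkers goroutines (nWorkers must be >= 1);
// a single goroutine owns the map, so it needs no lock.
func countWords(ads []ad, nWorkers int) map[string][]int {
	jobs := make(chan ad)
	words := make(chan wordEvent)
	done := make(chan map[string][]int)

	go func() { // sole owner of the map
		m := map[string][]int{}
		for ev := range words {
			s := m[ev.word]
			for len(s) <= ev.day {
				s = append(s, 0)
			}
			s[ev.day]++
			m[ev.word] = s
		}
		done <- m
	}()

	var wg sync.WaitGroup
	for i := 0; i < nWorkers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for a := range jobs {
				for _, w := range normalize(a.text) {
					words <- wordEvent{word: w, day: a.day}
				}
			}
		}()
	}

	for _, a := range ads {
		jobs <- a
	}
	close(jobs)
	wg.Wait()    // all workers have stopped sending
	close(words) // lets the owner goroutine finish
	return <-done
}

func main() {
	m := countWords([]ad{{"Velo enfant", 0}, {"velo de course", 1}}, 3)
	fmt.Println(m["velo"], m["course"]) // [1 1] [0 1]
}
```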
Storing everything in Redis
The second idea was to move storage to a dedicated service, in our case Redis. This is still an in-memory, non-durable storage solution, but the service is built to store huge numbers of key-value pairs. Redis isn't the database we're most experienced with, so we struggled a bit to find the proper data structure. The whole database is a key-value store, so it pretty much replaced the map of our naive implementation, but finding a replacement for the slices was harder.
We needed commands that were all atomic and that worked whether or not the word was already in Redis, and whether or not that day already had events. That way we could run our command without having to check whether the data structure was initialized, which ensures there is no data race between our workers and no lock in front of the Redis client.
So our winner is the Redis hash (HSET) data structure, and the fields in the hash are the 0-based day indexes of our counts.
So the basic command run by the Kafka consumers is:
HINCRBY word idx 1
As the doc says:
If key does not exist, a new key holding a hash is created. If field does not exist the value is set to 0 before the operation is performed.
Even though it's a hash, we are still using it as an array. The indexes are still generated by our Kafka consumers, relative to the first timestamp. So that a new consumer sharing the same Redis and consumer group can be started, the timestamps are also stored in Redis.
Now that we had a plan that worked in staging, we tried it in production. And guess what? It was flawless. 🎉
Except that, until then, we had only been indexing words from the ad subjects, to simplify the process and get an overview of the task. Indexing the body as well would probably multiply the size of the data we process by 5.
We began by increasing the size of our Redis instance from 250M to 1.2G, and restarted the consumers from the beginning of the topic.
Unfortunately, it failed halfway through the topic because we reached the maximum capacity of our Redis. We considered doubling the size of our Redis instance, but it would have been too complex to handle.
At this point two obvious solutions were possible: store fewer things, or store them in a less memory-hungry way. Redis has the ziplist optimization for small hashes: a hash with up to 512 fields is stored as a compact list, which reduces its memory overhead per element. Our use case falls right into it because we only have 300 days of data, so it seems we're already pretty optimal on that side.
Understanding our data
First things first: when dealing with a storage issue, you want to take a deeper look at your data. It's important to check whether there is anything specific you could do to reduce your memory usage. The next step is to reindex a smaller sample; in this particular case we worked with only 10% of the ads, using a modulo on the IDs, to get a better idea of what we were storing.
Then we noticed many “useless keys”: words that appear rarely, e.g. typos. Since we want to monitor trends for a word, these were useless.
We used a SCAN loop to check all the keys, grouped by the base-10 logarithm of their total count. This gave us the following distribution of word counts over the whole period:
This means that 2,864,648 words appear fewer than 10¹ times over the previous 300 days. Granted, since we only indexed 10% of the ads, this might be closer to ~100 times on the full data. Monitoring these words would be quite useless, so we won't store them.
Now we need to select which keys to store, but when reading a Kafka topic, we have no idea how many times a word will appear.
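The 10% sample can be taken with a simple filter on the ad ID. This sketch assumes numeric IDs spread evenly across remainders; which remainder the team kept is not stated, so 0 is an arbitrary choice:

```go
package main

import "fmt"

// inSample keeps an ad when its ID is divisible by 10, i.e. roughly 10% of
// ads if IDs are spread evenly. The remainder chosen (0) is arbitrary.
func inSample(adID int64) bool {
	return adID%10 == 0
}

func main() {
	kept := 0
	for id := int64(1); id <= 1000; id++ {
		if inSample(id) {
			kept++
		}
	}
	fmt.Println(kept) // 100
}
```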
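A sketch of the bucketing, with the Redis calls replaced by an in-memory map. Against a live instance, the loop would call SCAN with a cursor until it returns 0, and HVALS on each key to sum its daily counts. Bucket b holds the words whose total lies between 10^b and 10^(b+1) - 1; integer division is used instead of math.Log10 to avoid floating-point rounding at exact powers of ten:

```go
package main

import "fmt"

// bucket returns floor(log10(total)) for total >= 1, using integer division:
// 0 for 1..9, 1 for 10..99, 2 for 100..999, and so on.
func bucket(total int64) int {
	b := 0
	for total >= 10 {
		total /= 10
		b++
	}
	return b
}

// histogram counts how many words fall in each bucket. hashes stands in for
// the Redis keyspace: word -> (day index field -> count).
func histogram(hashes map[string]map[string]int64) map[int]int {
	h := map[int]int{}
	for _, days := range hashes {
		var total int64
		for _, c := range days {
			total += c
		}
		if total > 0 {
			h[bucket(total)]++
		}
	}
	return h
}

func main() {
	hashes := map[string]map[string]int64{
		"velo":     {"0": 5, "1": 7},
		"pistache": {"3": 2},
		"pistahce": {"4": 1}, // a typo
	}
	fmt.Println(histogram(hashes)) // map[0:2 1:1]
}
```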
Storing randomly in Redis
The simplest idea was to use a random draw so that each occurrence of a word is only stored with probability p/n (with p < n). We hoped this would leave us with far fewer small keys (fewer than 10 total occurrences), since many of them would never be drawn at all, while only losing precision on the useful keys. When fetching the data, you multiply the counts by n/p to get an approximation.
This worked well, and after a few attempts we settled on p = 1 and n = 4 without blowing up our Redis memory. The resulting graph was a bit too imprecise for us, but it confirmed that the keys with a small sum take up a lot of space in total.
Removing useless keys
Since it's impossible for us to identify at loading time which keys are important to keep, we have to delete the less useful ones. The trick is to remove them as soon as we know they are too small to be useful, before Redis fills up.
In practice, we wrote a worker that uses SCAN to iterate over the whole keyspace and applies a few rules:
- the number of entries for this key is less than 10
- we have processed at least 10 days
- no entry in the last two days
If all of these hold, the key gets deleted.
So we re-indexed the full topic with the worker running at the same time. There is a short window, until 10 days of the topic have been processed, during which the number of keys climbs to 8M; then it drops drastically and settles below 1M.
Another view is the memory used per key, which is low at first and then multiplies by four as keys get deleted.
We deleted 38 million keys while indexing the whole topic. Memory usage still came close to 1G, but settled around 420MiB. It only took 7 hours to process the 12G of data from Kafka.
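The rules above can be sketched as a single predicate. It assumes the worker has already fetched a key's hash (for instance with HGETALL after each SCAN page) and parsed the field names into day indexes, that currentDay is the 0-based index of the day being processed, and that "number of entries" means the sum of the daily counts. These are interpretations, not the original code:

```go
package main

import "fmt"

// shouldDelete applies the cleanup rules to one key.
// days maps a day index to that day's count (parsed from the hash fields);
// currentDay is the 0-based index of the day being processed.
// Assumption: "number of entries" means the sum of the daily counts.
func shouldDelete(days map[int]int64, currentDay int) bool {
	if currentDay+1 < 10 { // fewer than 10 days processed: too early to judge
		return false
	}
	var total int64
	lastSeen := -1
	for day, c := range days {
		total += c
		if c > 0 && day > lastSeen {
			lastSeen = day
		}
	}
	// Fewer than 10 occurrences, and nothing on currentDay or currentDay-1.
	return total < 10 && lastSeen < currentDay-1
}

func main() {
	fmt.Println(shouldDelete(map[int]int64{0: 3, 2: 1}, 12))  // true
	fmt.Println(shouldDelete(map[int]int64{0: 3, 11: 1}, 12)) // false
}
```

When the predicate holds, the worker would issue DEL on the key. A count written by a consumer between the read and the DEL is lost, which seems acceptable for keys this small.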
Now I can comfortably see how many people sold pistachio on leboncoin.
A few takeaways:
- Redis is a very powerful tool
- To optimize, know your data
- It’s fun to iterate quickly
- Also store word pairs if they are popular enough (like “animal crossing”)
- Detect growing trends