The story of battling 10x traffic
Last year, in my previous post, I described the overall architecture of the real-time Spark job pipeline for Walmart's A/B testing platform, Expo. Since then, there have been a lot of changes in our systems. Given the pandemic, the number of online shoppers has grown significantly, and we observed a 5 to 10x increase in the amount of traffic ingested by our monitoring pipeline overnight. In this post, I will briefly go over how we moved to the cloud smoothly and the changes we made to our architecture in response to the challenges we faced.
In a nutshell, we have a Spark Structured Streaming job that ingests data from Kafka, keeps the state of sessions in memory and checkpoints, and pushes metrics into our time-series database for further evaluation by the experimenters. While more traffic is, overall, very good for Walmart.com, it also means more stress on our monitoring system, causing delays when pushing metrics through the pipeline.
In this post, I will go through the challenges we faced and the actions we took to handle this enormous traffic. The current system has been running for the last six months with almost zero downtime. It processes 150k-250k messages per second, pushes out around 3 million records per minute, and the delay between the time we start an experiment and the time its metrics appear in our database is less than 3 minutes. On a normal day, our system runs around 100-150 concurrent experiments, which amplifies the volume of data the monitoring system processes.
To recap, I have copied the old architecture here; for more detail, you can read my previous post. I have also included the new architecture below as a reference for the rest of this post.
The issues and the actions
In this section, I categorize the problems we faced and the actions we took to overcome them. One major change was moving the entire pipeline to Google Cloud to make sure we would have enough resources to scale out. The previous job ran in our private data center, and we were running out of our team's reserved capacity, so we decided to move the entire pipeline to GCP. My focus for the rest of the article will be on the problems we observed in the pipeline itself.
Kafka and dispatcher rate
In general, a streaming job is either CPU-intensive or I/O-intensive; our monitoring system, though, is both. The streaming pipeline has three stages. In the first stage, the job fetches around 150GB (2-3GB/second) of data from Kafka, which is a heavy I/O operation. In the second stage, it processes the data from the first stage and creates or updates the sessions in the system; this step is CPU-intensive. The third stage sends the metrics out to the database and doesn't take much time.
- Ingestion rate: We have a stream dispatcher that evaluates each event and decides whether to redirect it to other topics for further processing. This redirection is practically free, so the dispatcher can produce data faster than the consumer can keep up. We noticed the dispatcher was writing to the topic at a higher rate than we could consume (I/O) and process (CPU).
- Drop unnecessary data as early as possible: One of the first actions we took was to thoroughly reevaluate the types of beacons that go to the topic. Expo has been running since 2015, and we have a decent amount of historical metadata showing which types of beacons our experimenters are most interested in. We evaluated this historical metadata against the beacons in the topic and found that around 43% of the data in the topic never affects the results of the experiments. So we filtered those beacons out early, in the dispatcher. Filtering in the dispatcher is cheaper than filtering in the job's pipeline: every second matters in a streaming job, and small delays of a few milliseconds each can accumulate into a big one. This change alleviated the delay, and the pipeline was able to catch up.
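As a rough illustration of the idea (the beacon types and field names here are hypothetical, not Expo's actual schema), a dispatcher-side filter can be as simple as an allow-list check applied before an event is ever produced to the downstream topic:

```python
# Hypothetical dispatcher-side filter: drop beacon types that never
# influence experiment results before they reach the downstream topic.
RELEVANT_BEACON_TYPES = {"page_view", "add_to_cart", "order"}  # illustrative

def should_forward(event: dict) -> bool:
    """Return True only for beacons the experiment analysis can use."""
    return event.get("beacon_type") in RELEVANT_BEACON_TYPES

events = [
    {"beacon_type": "page_view", "session": "s1"},
    {"beacon_type": "heartbeat", "session": "s1"},  # never used downstream
    {"beacon_type": "order", "session": "s2"},
]
forwarded = [e for e in events if should_forward(e)]
```

The cheap set-membership test runs once in the dispatcher instead of costing milliseconds per event inside the streaming job.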
- Split data by traffic pattern: We separated the mobile and web traffic and redirected each to its own topic. This gave us the opportunity to create a separate job for each topic and process them independently of each other.
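A sketch of the split, assuming a platform field on each event and illustrative topic names (the real field and topic names are not from the source):

```python
# Hypothetical routing: send web and mobile beacons to separate topics
# so each can be consumed by its own Spark job at its own pace.
def target_topic(event: dict) -> str:
    platform = event.get("platform", "web")  # default assumed for the sketch
    return "beacons-mobile" if platform == "mobile" else "beacons-web"
```

Each downstream job then subscribes to exactly one of the two topics and scales on its own traffic curve.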
- Add more partitions and compute: The degree of parallelism in a Kafka consumer group is bounded by the number of partitions; if you have enough resources, more partitions lead to higher throughput. We increased the number of partitions from 360 to 490 for each topic, which helped us fetch data from Kafka faster. We also added more compute nodes to our Spark cluster to handle the CPU-intensive part of the job.
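The reason more partitions help: within a consumer group, each partition is assigned to at most one consumer, so readers beyond the partition count sit idle. A back-of-the-envelope check (the consumer counts here are illustrative, not our actual cluster size):

```python
def effective_parallelism(num_partitions: int, num_consumers: int) -> int:
    # Each partition is read by exactly one consumer in a group, so
    # extra consumers beyond the partition count contribute nothing.
    return min(num_partitions, num_consumers)
```

With 360 partitions, even a hypothetical 490-reader cluster could use only 360 of them; raising the topic to 490 partitions lifts that ceiling.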
The other issue was KairosDB, a wrapper around Cassandra that exposes a REST API to the outside world.
- KairosDB ingestion rate: Before the pandemic, we were happy with KairosDB, but with each spike in traffic we found that we could not trust it as a reliable component and needed to think seriously about this part of the system. KairosDB internally uses a local queue to handle high-throughput traffic, but during traffic bursts it drops data and pushes the rest with delay. Even before COVID-19, there were a few occasions, spikes from bots or traffic bursts (like PS5 deals), when KairosDB disappointed us.
- KairosDB memory management: Another problem with KairosDB was out-of-memory exceptions when we queried a wide time range. Because of this memory mismanagement, we had to restart the service on multiple occasions.
- Data retention policy: The metrics in KairosDB are not compressed, so it was hard for us to accept requests from other teams to increase the retention period or add more metrics to the pipeline. We set a two-day TTL policy in our real-time pipeline, despite many requests to increase that number. Even adding new metrics was hard; every time we had a request for one, we had to proceed with caution. We also had to push back on requests such as capturing click and impression metrics for images, since that alone would have almost doubled the metrics we capture.
- Evaluate other options: Even before the traffic increase, we had noticed that KairosDB could become an issue, so we started looking for a replacement time-series database. We evaluated multiple databases and ended up with two options: Elasticsearch and M3DB. While Elasticsearch is a powerful search engine and its aggregation framework could help us a lot, we found that it takes a lot of memory and disk space compared to other time-series databases, and its queries slowed down as the data grew. M3DB, on the other hand, really shined. We doubled the load on the system by running two separate jobs pushing production data to M3DB and let it run for two weeks; M3DB handled the load easily. The other factor that made M3DB stand out was its data compression, which let us extend the metrics' TTL to 30 days in production. We also added metrics like impressions and image clicks (with multiple attributes and tags) to the system.
- Give the database time to ingest: We put Kafka as middleware between our Spark app and M3DB. Pushing to a Kafka topic is faster than pushing to a database, since it just appends to a segment file and can be batched. With this setup, another job ships the metrics to the desired database: for KairosDB we set up a Logstash cluster, and for M3DB we used an internal Java application. We extensively tested both databases head to head, comparing the results from KairosDB with those from M3DB through Grafana charts, to make sure the transition introduced no inconsistencies.
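The shipper between the buffer topic and the database can then batch writes at its own pace. A minimal sketch of that batching loop (in Python rather than the internal Java application, with a stand-in for the database client):

```python
from typing import Callable, Iterable, List

def ship_in_batches(metrics: Iterable[dict],
                    write_batch: Callable[[List[dict]], None],
                    batch_size: int = 500) -> int:
    """Drain metrics from the buffer topic and write them in batches.

    `write_batch` stands in for the database client; the real pipeline
    used Logstash for KairosDB and an internal Java app for M3DB.
    """
    batch: List[dict] = []
    written = 0
    for metric in metrics:
        batch.append(metric)
        if len(batch) >= batch_size:
            write_batch(batch)
            written += len(batch)
            batch = []
    if batch:  # flush the final partial batch
        write_batch(batch)
        written += len(batch)
    return written
```

Because the topic absorbs bursts, the shipper can keep a steady write rate even when the Spark job produces metrics faster than the database can ingest them.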
With the above changes, we were able to add more metrics without worrying about the stability of the system, increased the retention time to 30 days, and have had zero data drops in metrics for the last six months.
Code and config coupling
- Convolution of metadata and code: The previous UI implementation called KairosDB directly to get the metrics and computed hardcoded formulas based on them, which made it hard to add a new formula to the system. A formula is a combination of different metrics; for example, conversion rate comes from dividing two metrics: the number of customers divided by the total number of visitors. Experimenters often asked for a new formula that was just a combination of metrics already in our system, but adding it required a code change. This part of the code was convoluted with the Expo UI services and hard to maintain.
- Time-series database wrapper service (tsdb-ws): When we decided to swap KairosDB for another, yet-to-be-determined data source, we knew we would eventually need to change the UI code as well. So we decided to unify the interface for communicating with our current and future data sources (KairosDB, M3DB, Elasticsearch).
For this purpose, we developed a time-series database wrapper service (tsdb-ws) using Koa, a Node.js framework. We separated each tenant into a logical keyspace, defined the meaning of each formula for that tenant, and put this relationship into a repository. Each formula can be a template literal, in which case the UI sends the proper payload to complete the full expression.
Besides the formulas section, the config file also has a metadata section that provides everything the UI needs to render the chart (like the chart title, or whether it is a percentage chart or a number chart, etc.).
The UI app hits /<:keyspace> with the proper payload, asking for multiple formulas in batch for a given time period. The tsdb-ws service looks at the list of formulas, finds the dependency graph between the formulas and the metrics, and based on that fetches all the required data with one call. The return values are decorated with each formula's metadata.
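The core of that dependency resolution, sketched in Python (the formula names, templates, and shapes here are illustrative; the real service is written in Koa/Node.js):

```python
import string

# Illustrative formula repository for one keyspace: each formula is a
# template whose placeholders are the metric names it depends on.
FORMULAS = {
    "conversion_rate": "{orders} / {visitors}",
    "avg_order_value": "{revenue} / {orders}",
}

def metrics_needed(formula_names):
    """Walk the templates and collect every metric they depend on,
    so the union can be fetched from the database in a single call."""
    needed = set()
    for name in formula_names:
        for _, field, _, _ in string.Formatter().parse(FORMULAS[name]):
            if field:
                needed.add(field)
    return needed

def evaluate(formula_names, metric_values):
    # Each formula is computed from the shared, once-fetched result set.
    # eval() on the filled-in template is for illustration only.
    return {name: eval(FORMULAS[name].format(**metric_values))
            for name in formula_names}
```

A new formula then becomes a one-line addition to the repository rather than a code change.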
With this architecture, we don't need any code changes. Whenever an experimenter needs a new formula shown in the UI, we can simply add it to the repository, and it will be rendered as a chart.
This setup also gave us a smooth transition from KairosDB to M3DB. The UI doesn't need to know which underlying database we are using; when we felt M3DB was ready and properly set up, we just switched a flag in tsdb-ws, and the UI never even noticed.
We started building a highly available monitoring system for Expo around two years ago, and over those two years our architecture has adapted to different circumstances. The current system is working smoothly, and we have plans to make sure it scales for our next 10x of traffic. We are now confident that this is the right strategy for us and for our clients.