Demand Forecasting Tech Stack @ Walmart
Choosing the right foundational elements for the biggest forecasting platform at Walmart
Continuing the discussion started in Pillars of Walmart’s Demand Forecasting: the technology choices you make in any product go a long way toward ensuring a solid foundation for the features built on it.
In this post, we will look at the most important aspects that should never be ignored when choosing technology components.
Overall Product Needs
- Non-negotiable run-time SLAs to generate the forecast (less than 4 hours every day)
- Host a few hundred billion data points for Demand Workbench, with serving SLAs of less than 100ms
- Flexible runtimes for Data Scientists to try out different algorithms
- Cloud-native approach, enabling us to onboard (a few or all components) to private or public clouds
Guiding Principles in selecting the Tech Stack
- Be clear on what you are looking for. Write down the Non-Functional needs (like SLAs, latency, throughput, etc.)
- Prefer Open Source with good community backing, but evaluate support options. It is difficult to have in-team expertise for every component, even if it is Open Source.
- Prefer components built with a Scale-Out design
- Always do a Proof Of Concept (POC) to check your product needs. Never go solely by evaluations in blogs or technical articles.
- Keep the core functionality cloud-agnostic
- On-Demand scaling for most of the infrastructure components
- Configurable Telemetry to ensure we capture key metrics around the health & performance of the system
What we need
- Scalable & Distributed storage, supporting a variety of datasets, not just SQL tables
- Support multiple compute engines like Spark & Hive
- Scheduling & Orchestration of data engineering pipelines
- Prefer a programmer-friendly tool rather than a configuration-driven one
- Easy Monitoring. Web Interface is better
- Access controls over actions. Ex: who can monitor a pipeline vs. who can trigger it on demand
- An HDFS (Hadoop Distributed File System) compatible storage was the strong contender & probably the easiest decision.
- Also, many cloud providers offer HDFS-compatible file systems (Azure Blob, S3, Google Cloud Storage, etc.).
- Parquet with Snappy compression also fared better than the alternatives. We could have chosen ORC, but we stuck with Parquet as we did not find any major difference in the performance of jobs using either format. By the way, there are a lot of articles comparing these two formats & choosing one over the other. But remember to test-drive with your own use case & pick.
Map/Reduce is the oldest & among the most popular of all the engines supporting BigData workloads. It has served its purpose quite well for many years, but it is time to look ahead.
Hive is probably the most commonly used compute engine for BigData workloads. With Hive, we always have to express jobs as SQL queries. While it is not impossible to build complicated SQL, doing so has its own challenges:
- Not all operations can be easily expressed via SQL. Ex: recursion
- The only way to make the execution configurable is to templatize the SQL code or generate the queries via code
- This not only complicates development but also makes the code base very difficult to maintain
- It is not uncommon to see SQL queries spanning a few hundred lines!
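To make the pain concrete, here is a minimal sketch (with hypothetical table & column names) of the "generate the queries via code" workaround described above. It works, but every new knob adds another branch to the generator, & the SQL itself disappears behind string plumbing:

```python
# Hypothetical query generator: the query shape is driven by configuration
# (metric, dimensions, date window), which is exactly what makes the code
# base hard to read & maintain over time.
def build_rollup_query(table, metric, dims, start_date):
    select_cols = ", ".join(dims + [f"SUM({metric}) AS total_{metric}"])
    group_cols = ", ".join(dims)
    return (
        f"SELECT {select_cols} "
        f"FROM {table} "
        f"WHERE ds >= '{start_date}' "
        f"GROUP BY {group_cols}"
    )

sql = build_rollup_query("daily_sales", "units",
                         ["store_id", "item_id"], "2019-01-01")
```

Multiply this by dozens of pipelines & the appeal of expressing the logic directly in a programming language becomes obvious.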
Spark needs no introduction & has revolutionized how Data Engineering is realized in products dealing with BigData. It also supports SQL & is backed by a strong community (ever-growing).
We also chose Java as our mainstream programming language. This is an interesting choice, given how popular Scala is for Spark workloads.
Some of the considerations (debatable 😃) that favored Java in our use case are,
- While functional programming is awesome, Scala is not a pure functional language. You can easily end up writing Java-style code in Scala.
- Hiring Java engineers is relatively easier than hiring good Scala developers with functional programming skills.
- Agreed, Java is very verbose (when we use Spark), but most of our initial set of engineers were very good Java programmers coming from the Map/Reduce era.
Now, after 2+ years of development & production runs, looking at the codebase & adding more features, we don’t regret our decision.
Scheduling & Orchestration
Most of the early Data Engineering workloads followed a design of configuring pipelines, rather than authoring them as code. Some of them went a little ahead in designing a User Interface to support development via Drag & Drop. While this approach served the Data Engineering community for some time & some enterprise tools are still in heavy use today, the community is shifting towards more programmer-friendly tools.
Oozie is probably the most widely used tool for both scheduling & orchestration. But the problem with Oozie is its inability to create DAGs & dependencies dynamically & programmatically.
Apache Airflow to the rescue. It was a tough decision to make & we did try out the tool for close to a month before finalizing.
Some of the positives first,
- Very easy to monitor workflows & manage their runs. You do not need to leave the Web UI to debug issues
- Programmable workflows. We can use Jinja templating & even generate the DAG dynamically, say, based on certain configurations
- Heavily Customizable. You can define your own operators.
- Good support for BigData workloads, but can be used for any workflows (Non-Data as well)
- Schedule DAGs on time or on data availability (via Sensors)
- Rich CLI & decent REST support
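The "generate the DAG dynamically" point deserves a sketch. Below is a plain-Python stand-in (no Airflow import; the pipeline names are hypothetical) showing how one configuration can drive both task creation & dependency wiring. In real Airflow code, each name would become an operator inside a `DAG` context & each edge a `set_upstream` call:

```python
# Config-driven pipeline definition: each entry becomes a task, & the
# dependency edges are derived from the same configuration, so adding a
# pipeline is a one-line config change rather than new orchestration code.
PIPELINES = {
    "ingest_sales":   {"upstream": []},
    "build_features": {"upstream": ["ingest_sales"]},
    "train_models":   {"upstream": ["build_features"]},
}

def build_edges(pipelines):
    """Return (task, upstream_task) pairs, i.e. the dependency graph."""
    edges = []
    for name, cfg in pipelines.items():
        for up in cfg["upstream"]:
            edges.append((name, up))
    return edges

edges = build_edges(PIPELINES)
```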
Some of the drawbacks now,
- More than 1 scheduler will create a mess. So, to achieve HA, you need to use an active-passive design (implemented by teamclairvoyant)
- A setup with many moving parts (RabbitMQ/Redis, MySQL, Celery, etc.) is a little intimidating, especially in a non-container world. In the Kubernetes world, you will definitely love the Airflow Helm Chart
- You can easily create havoc in your data if you don’t understand some concepts very well.
- Ex: Backfills are very helpful in many cases, but can be very error-prone
- Imagine a DAG (with a cron schedule) turned off for a few days (for whatever reason); upon re-enabling, catchup will run every missed schedule.
- If your jobs are not designed to be idempotent, God Save You!
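To illustrate what idempotent means here, a minimal sketch (hypothetical paths & schema): each logical date owns its own partition, & a rerun overwrites that partition instead of appending to it, so a surprise catchup run is harmless:

```python
import json
import os
import tempfile

def write_partition(base_dir, run_date, rows):
    """Idempotent write: each logical date owns one partition directory,
    & a rerun replaces its contents instead of appending duplicates."""
    part_dir = os.path.join(base_dir, f"ds={run_date}")
    os.makedirs(part_dir, exist_ok=True)
    out_path = os.path.join(part_dir, "part-0.json")
    with open(out_path, "w") as f:   # overwrite, never append
        json.dump(rows, f)
    return out_path

base = tempfile.mkdtemp()
write_partition(base, "2019-06-01", [{"store": 1, "units": 10}])
# A backfill rerun of the same date leaves exactly one copy of the data:
path = write_partition(base, "2019-06-01", [{"store": 1, "units": 10}])
```

The same pattern with Spark would be a partitioned overwrite keyed on the DAG's execution date, rather than an `INSERT`-style append.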
What we need
- Development workspace allowing Scientists to test their code in isolation
- Model Propagation. This is like the CI/CD for ML models.
- Ability to run different algorithms needing different runtimes like R, Python, C++, etc.
- Scale & run models at Walmart scale (at least 30,000 clusters to train & score)
Scaling our production workloads can be achieved in two ways:
- Distributed. Frameworks like TensorFlow allow this
- Parallel. When we can run 1 group in isolation, we can also run N in parallel, provided we can coordinate the runs
The approach of dividing the store-items (via a suitable Clustering algorithm) & then taking up training or scoring for each worked well for our needs.
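As a toy illustration of the parallel approach (the clusters & the mean "model" below are stand-ins, not our actual algorithm): because each store-item cluster trains & scores in isolation, N clusters are simply N independent workers to coordinate.

```python
from concurrent.futures import ThreadPoolExecutor

def score_cluster(cluster):
    """Stand-in for training/scoring one store-item cluster in isolation.
    Here the 'model' is just a naive mean over the demand history."""
    cluster_id, demand_history = cluster
    forecast = sum(demand_history) / len(demand_history)
    return cluster_id, forecast

clusters = [("c1", [10, 12, 11]), ("c2", [5, 7, 6]), ("c3", [0, 1, 2])]

# Each cluster is independent, so the runs can fan out in parallel;
# in production each worker is a container, not a thread.
with ThreadPoolExecutor(max_workers=3) as pool:
    results = dict(pool.map(score_cluster, clusters))
```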
Containers (Docker) allowed us to have flexibility in isolating each algorithm environment differently.
From a very high level, we ended up having a different base Docker image for each type of runtime & extending it with our algorithm code:
- GPU based (Including NVIDIA libraries & Custom GPU algorithms)
- CPU based (Including custom algorithms & dependent libraries)
K8S (Kubernetes) allows us to schedule & orchestrate the Docker containers as below:
- Create Job specifications (including what code to run, resourcing needs, etc.)
- Submit the Job specifications to K8S, via the Master
- The Master will then take over, scheduling these Jobs & running them
- Monitor the job runs & take actions to ensure Job completion
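The steps above can be sketched as a minimal Job manifest, shown here as the Python dict you would serialize to YAML/JSON & submit via `kubectl apply` or the API. The image name, arguments & resource numbers are hypothetical:

```python
# Hypothetical training Job for one store-item cluster. `backoffLimit`
# lets K8S retry a failed pod a few times toward Job completion.
def make_training_job(cluster_id, image="forecast-trainer:latest"):
    return {
        "apiVersion": "batch/v1",
        "kind": "Job",
        "metadata": {"name": f"train-{cluster_id}"},
        "spec": {
            "backoffLimit": 3,
            "template": {
                "spec": {
                    "restartPolicy": "Never",
                    "containers": [{
                        "name": "trainer",
                        "image": image,
                        "args": ["--cluster-id", cluster_id],
                        "resources": {
                            "requests": {"cpu": "2", "memory": "8Gi"},
                            "limits":   {"cpu": "4", "memory": "16Gi"},
                        },
                    }],
                },
            },
        },
    }

job = make_training_job("c42")
```

Generating one such spec per cluster is what lets a coordinator fan out tens of thousands of these Jobs.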
We are running close to 30,000 jobs in this manner, orchestrating the runs to finish within 2–3 hours & generating all forecasts, every single day.
Model & Meta Store
- Models, which are the result of Training, have to be stored. These are serialized versions of calculations that will be used during scoring.
- Though they are blob-like in nature, they can be significantly large in size. Storing them in traditional databases like MySQL might not always be a good idea
- Sizes can vary from a few hundred MBs to a few GBs, as the training data & features are derived from many years of historical data feeds
- Storing the actual blob in an Object store (like OpenStack Swift or Azure Blob) & saving its location with other meta information like date/time, container details, etc. is more meaningful
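A sketch of that layout (a local directory stands in for the object store, & a JSON file stands in for the metadata record; all names are hypothetical):

```python
import hashlib
import json
import os
import pickle
import tempfile
from datetime import datetime, timezone

def save_model(model, store_dir, meta_path, container="models"):
    """Write the serialized model blob to the 'object store' & record its
    location plus metadata separately, as a metadata DB row would."""
    blob = pickle.dumps(model)
    key = hashlib.sha1(blob).hexdigest()   # content-addressed object key
    os.makedirs(store_dir, exist_ok=True)
    with open(os.path.join(store_dir, key), "wb") as f:
        f.write(blob)
    meta = {
        "container": container,
        "object_key": key,
        "size_bytes": len(blob),
        "created_at": datetime.now(timezone.utc).isoformat(),
    }
    with open(meta_path, "w") as f:
        json.dump(meta, f)
    return meta

tmp = tempfile.mkdtemp()
meta = save_model({"weights": [0.2, 0.8]},
                  os.path.join(tmp, "swift"),
                  os.path.join(tmp, "meta.json"))
```

Scoring then looks up the small metadata record first & fetches the (possibly multi-GB) blob only when needed.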
What we need
- APIs powering some of the descriptive analytics applications, including our own Demand Workbench
- Ability to record transactions like managing forecasts
- Throughput of at least 5,000 RPS (Requests Per Second)
- Latencies of less than 100ms for all APIs
- Potentially used in some realtime pipelines of fulfillment which cannot tolerate high latencies
- Prefer a polyglot approach, rather than trying to make one database fit all needs
- Most of the transactional data is also relational in nature & needs ACID guarantees
- Fit for any RDBMS systems like MySQL/MariaDB
- Size of the data set: 500 million+ store-item metrics & 100 billion+ data points, amounting to 50–60 TB of raw data
- Multiple forms of searches (usually on more than 5 fields of an item/store attribute)
- Need for fast, on-the-fly aggregations across different dimensions
- Some facts (even history!) & dimensions change every day, some every week
- Loading throughput in the range of 1.5 Million documents per second
These unique needs necessitated looking at systems like Cassandra, Solr & Elasticsearch.
Finally, after a thorough POC simulating the real workloads, we selected Elasticsearch for our needs. It allowed us to build indices where we can search on a multitude of item/store attributes & also aggregate the results within 100ms.
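For a flavor of what such a request looks like, here is the general shape of an Elasticsearch query combining multi-attribute filters with an on-the-fly aggregation, as a plain dict you would POST to the `_search` endpoint (field names are hypothetical):

```python
# Filter on several item/store attributes, then aggregate forecast units
# per store on the fly. `size: 0` skips the raw hits, since only the
# aggregation is needed for the dashboard view.
query = {
    "size": 0,
    "query": {
        "bool": {
            "filter": [
                {"term": {"dept": "grocery"}},
                {"term": {"region": "south"}},
                {"range": {"forecast_date": {"gte": "2019-06-01",
                                             "lte": "2019-06-07"}}},
            ]
        }
    },
    "aggs": {
        "by_store": {
            "terms": {"field": "store_id", "size": 50},
            "aggs": {"total_units": {"sum": {"field": "forecast_units"}}},
        }
    },
}
```

Keeping the filters inside `bool.filter` (rather than `must`) lets Elasticsearch skip scoring & cache the filter results, which matters at a sub-100ms latency budget.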
In this post, we looked at the factors that went into choosing the tech stack for the most demanding forecasting application at Walmart.
In subsequent posts, I’ll cover some of the best practices that we learned over the course of building & operating this application.