From data streams to a data lake
Streams are what we call the raw measurement sequences that define a Strava activity, such as latitude/longitude, time, heart rate, power, elevation, or speed. Strava has more than 150 TB of stream data from over 1 billion activities. This dataset offers enormous potential to build amazing things such as the Global Heatmap, Route Builder, and Clusterer. However, until recently our streams were persisted only in a legacy format designed for the normal production access pattern of reading a single stream per request.
Two aspects of our historical stream storage schema make rapid processing of streams difficult. First, each stream is stored as a single file on S3, as a gzipped JSON array. S3 charges per GET request, so at today’s scale iterating through the streams for all activities just once would cost about $5000 in S3 requests alone. This is why it is generally not a good idea to store billions of tiny files on S3 if you want to access all of them frequently. Extracting and deserializing the gzipped JSON would also take around 1000 CPU hours.
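To make the per-request cost concrete, here is a back-of-the-envelope sketch. It assumes the S3 Standard list price of $0.0004 per 1,000 GET requests (an assumption: prices vary by region and over time) and exactly one GET per stored stream file.

```python
# Assumed S3 Standard GET price in USD per 1,000 requests; check current
# AWS pricing for your region before relying on this number.
GET_PRICE_PER_1000 = 0.0004


def get_request_cost(num_requests: int, price_per_1000: float = GET_PRICE_PER_1000) -> float:
    """USD cost of issuing num_requests GETs (one per stored object)."""
    return num_requests / 1000 * price_per_1000


def requests_for_budget(dollars: float, price_per_1000: float = GET_PRICE_PER_1000) -> float:
    """How many GETs a given dollar amount buys at the assumed price."""
    return dollars / price_per_1000 * 1000


cost_10b = get_request_cost(10_000_000_000)
gets_for_5000 = requests_for_budget(5000)
```

At this assumed price, 10 billion GETs cost about $4,000, and $5,000 corresponds to roughly 12.5 billion GETs. Those are the right orders of magnitude for one GET per stream file when each of the billion-plus activities has several streams.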
The second obstacle to rapid processing is how activities are prefixed in S3. The prefix for an activity’s streams is its auto-incrementing activity ID, a practice AWS specifically recommends against. Data for successive keys (ordered lexicographically) is clustered on the same S3 shards during writes, so a sequential read does not distribute the load well and easily triggers S3 rate limits. As a result, rapid sequential stream access is not reliably possible, even if we are willing to pay the cost.
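The clustering effect can be illustrated with a simplified model in which keys sharing a leading prefix land on the same partition. The key layouts and the 2-character "partition" width below are hypothetical. The hashed layout is a commonly recommended remedy for sequential keys. It is not what Strava adopted, because bulk access moved to a separate data lake instead.

```python
import hashlib
from typing import Iterable, Set


def sequential_key(activity_id: int) -> str:
    # Hypothetical legacy-style key: the auto-incrementing ID leads the key.
    return f"{activity_id}/streams.json.gz"


def hashed_key(activity_id: int) -> str:
    # Hypothetical remedy: lead with a short hash so neighbouring IDs scatter.
    digest = hashlib.md5(str(activity_id).encode()).hexdigest()
    return f"{digest[:4]}/{activity_id}/streams.json.gz"


def distinct_prefixes(keys: Iterable[str], width: int) -> Set[str]:
    """Leading prefixes of a given width, a crude stand-in for S3 partitions."""
    return {key[:width] for key in keys}


ids = range(1_000_000_000, 1_000_001_000)  # 1,000 consecutive activity IDs
seq = distinct_prefixes((sequential_key(i) for i in ids), width=2)
hsh = distinct_prefixes((hashed_key(i) for i in ids), width=2)
```

All 1,000 consecutive IDs share the single prefix "10", so a sequential scan hammers one partition. The hashed keys spread across most of the 256 possible two-hex-character prefixes.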
The infrastructure bottlenecks around bulk stream access have long been a major limitation in our ability to explore our data and develop new capabilities.
We needed a denormalized stream dataset optimized for rapid bulk access, and we ended up choosing a Spark-based “data lake” solution. With this solution, stream data is still stored in S3, but in a highly compressed format using Parquet and Snappy. Rather than storing raw stream values, Parquet stores them using delta encoding, which is fantastically efficient at compressing typical stream data. Using these methods we achieved a compression ratio of ~15x. The stream data is also grouped into chunks of around 100 MB per file, so Spark workers can now read vast amounts of stream data in a small number of S3 requests.
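To see why delta encoding suits stream data, here is a minimal sketch of plain delta encoding on integer samples. It is a simplification. Parquet’s DELTA_BINARY_PACKED encoding additionally bit-packs the deltas in blocks relative to a per-block minimum, and Snappy compression is applied on top. The sample streams are hypothetical.

```python
from itertools import accumulate
from typing import List


def delta_encode(values: List[int]) -> List[int]:
    """Keep the first value, then store each sample as the change from its predecessor."""
    if not values:
        return []
    return [values[0]] + [b - a for a, b in zip(values, values[1:])]


def delta_decode(encoded: List[int]) -> List[int]:
    """Invert delta_encode: a running sum restores the original samples."""
    return list(accumulate(encoded))


def max_bits(values: List[int]) -> int:
    """Bits needed for the largest magnitude, plus one sign bit."""
    return max(abs(v) for v in values).bit_length() + 1


# Hypothetical one-hour time stream sampled at 1 Hz (seconds since start).
time_s = list(range(3600))
encoded_time = delta_encode(time_s)

# Hypothetical elevation samples in metres; deltas can be negative.
elevation_m = [120, 121, 121, 123, 122]
encoded_elev = delta_encode(elevation_m)
```

Every delta after the first time sample is 1, so `max_bits` drops from 13 bits for the raw values to 2 bits for the deltas. Long runs of small, repetitive deltas like these are what a bit-packing encoder and a general-purpose compressor can shrink dramatically.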
The costs of this approach are also relatively low. Because our data is stored on S3 rather than in an always-running database, the only standing cost is the relatively low passive cost of S3 storage. We pay for compute resources only when they are needed: with Mesos and Spark we can use an arbitrary amount of compute for each job.
Strava’s ETL Job PR (Personal Record)?
To materialize the new dataset, we wrote a huge ETL job that performs one expensive read of all the stream data and writes it into the new format. The job cost several thousand dollars to run and needed to make nearly 10 billion requests with zero failures, which required several layers of error handling:
- A wrapping streams service performs internal S3 retries and timeouts, and detects fatal errors, for example when stream data is corrupted or missing.
- In Spark, the Finagle client accessing the streams service has its own retry budget and timeouts, as well as a sophisticated backoff strategy.
- Each Spark task within a stage is retried multiple times, and these retries are typically scheduled on a different agent every time.
- The Spark driver blacklists machines if they are responsible for too many task failures (for example, a hardware/network failure).
- The job is coarsely batched, so that completed progress is kept if part of the job fails.
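Two of the layers above, client-side retries with backoff and coarse batching, can be sketched as follows. This is a hypothetical illustration, not the Finagle client or Spark configuration the job actually used. `FatalError`, `call_with_backoff` and `run_in_batches` are invented names, and the backoff is capped exponential with full jitter. The in-memory `done` set stands in for whatever durable record of completed batches a real job would keep.

```python
import random
import time
from typing import Callable, List, Set, TypeVar

T = TypeVar("T")


class FatalError(Exception):
    """Hypothetical marker for errors that retrying cannot fix, e.g. corrupt stream data."""


def call_with_backoff(
    fn: Callable[[], T],
    max_attempts: int = 5,
    base_delay: float = 0.1,
    max_delay: float = 5.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn, retrying transient failures with capped exponential backoff and full jitter."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(max_attempts):
        try:
            return fn()
        except FatalError:
            raise  # retrying will not help; surface immediately
        except Exception:
            if attempt == max_attempts - 1:
                raise  # retry budget exhausted
            delay = min(max_delay, base_delay * 2 ** attempt)
            sleep(random.uniform(0, delay))
    raise AssertionError("unreachable")


def run_in_batches(
    items: List[T],
    batch_size: int,
    process_batch: Callable[[List[T]], None],
    done: Set[int],
) -> Set[int]:
    """Process items in coarse batches, skipping batch indices already recorded in done."""
    for start in range(0, len(items), batch_size):
        index = start // batch_size
        if index in done:
            continue  # finished in an earlier run
        process_batch(items[start:start + batch_size])
        done.add(index)  # record progress only after the batch succeeds
    return done
```

Calling `run_in_batches` again with the same `done` set after a failure skips the batches that already completed, so only unfinished work is repeated.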
The job ran in less than a day, and triggered our first-ever AWS spending alert, courtesy of Cloudability. The final dataset is only 10 TB. We typically see about 100 MB/s of read throughput from S3 per Spark machine, so a job making a single pass over all stream data, parallelized across 30 machines, would take about an hour to complete.
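The one-hour estimate follows from dividing the dataset size by the aggregate read throughput. The sketch below assumes decimal units (1 TB = 10^12 bytes, 1 MB = 10^6 bytes), perfectly even load across machines, and linear scaling. It also counts how many ~100 MB files the 10 TB dataset implies, which is what keeps the number of S3 requests small.

```python
TB = 10**12  # bytes, decimal units (assumed)
MB = 10**6   # bytes, decimal units (assumed)


def single_pass_seconds(dataset_bytes: int, bytes_per_sec_per_machine: int, machines: int) -> float:
    """Idealised wall-clock time for one full scan: size divided by aggregate throughput."""
    return dataset_bytes / (bytes_per_sec_per_machine * machines)


scan_s = single_pass_seconds(10 * TB, 100 * MB, 30)
scan_minutes = scan_s / 60
num_files = (10 * TB) // (100 * MB)  # number of ~100 MB files in the lake
```

This gives about 3,333 seconds, or roughly 56 minutes, and about 100,000 files to read, compared with one GET per stream in the legacy layout.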
Our infrastructure and data teams are rapidly gaining the Spark expertise needed to utilize this resource. I look forward to an internal renaissance of engineering efforts focused around activity streams data and innovation. We have also already begun work on updating the global heatmap using this dataset. A jam also used this data to generate an improved Grade Adjusted Pace (GAP) for runs. Look forward to more related posts on this subject!