Practical Tips for Dask, vol2: Partition Maps

Vojtech Tuma
Inside Amp X

--

Practical Tips for Dask is a series of short blog posts accompanied by code snippets or notebooks, each focusing on a particular data engineering problem and its solution in Dask.

Throughout the series, a common example of aggregating sensor measurement data will be used.

  1. Schema transforms and parquet engines.
  2. Mapping partitions vs partition-wide groupby (this post).
  3. Repartitioning with a limited fan-in (in progress).
  4. … and more TBD.

In this volume, I would like to demonstrate how we managed to avoid an expensive groupby by just mapping partitions while solving a real problem with Dask.

Problem Statement

As in the previous post, we are analysing data from about five hundred sensors that provide measurements roughly once a minute. The metrics we need to produce are uptimes and per-day deltas of the measured values. The data occupies about 4GB when stored as snappy-compressed parquet. We had multiple files per day, each around 100MB; when read by Dask, those correspond to individual partitions and are pretty right-sized (that is, the uncompressed data took up between 50–80% of a worker's memory while being processed).

In the previous post, we dealt with data ingestion and the initial transformation of the schema. Here, we will tackle the aggregation itself. Essentially, we would like two things: for every day, compute the difference between the first and the last value measured by the sensor, to obtain the delta; and calculate how many measurements did not arrive, that is, during which minutes the sensor failed to communicate. Both of these per day and per sensor.

Solution

If the data were small, we would just read it into Pandas and run a groupby. However, the original raw data does not fit into memory, so we either have to aggregate in Dask itself, or somehow reduce the data size first before descending to Pandas.
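
For reference, here is a minimal sketch of the small-data version, assuming hypothetical column names (sensorId, day, timestamp, value), a hypothetical path, and one expected reading per minute:

```python
import pandas as pd

# Reference version, assuming everything fits in memory.
# Column names and the path are assumptions for illustration.
df = pd.read_parquet("measurements/")
df = df.sort_values("timestamp")  # so that "first"/"last" follow time order

per_day = df.groupby(["sensorId", "day"]).agg(
    first=("value", "first"),
    last=("value", "last"),
    count=("value", "size"),
)
per_day["delta"] = per_day["last"] - per_day["first"]
per_day["uptime"] = per_day["count"] / (24 * 60)  # one expected reading per minute
```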

A naive approach starts with daskDf.groupby(["sensorId", "day"]).agg(...). The problem is that this causes a wide shuffle. Without a shuffle, Dask has each of its workers process partitions (at the start, the input parquet files) sequentially, discarding intermediate results and keeping only the final outcome; this is quite efficient as it needs much less memory. But if you do a groupby out of the blue, you essentially require Dask to materialise all the data at once, which may cause memory errors, excessive disk spilling, and overall slowness.

Dask best practice says “prefer groupby on indexes only”; why does that matter? If the column you group by is the index, Dask knows that every resulting group requires just a limited number of input partitions, and can thus run the whole operation without materialising the whole dataset first. Can we do that here? Our data is stored in partitions that correspond to dates, so if we were grouping by date only, we could explicitly tell Dask which boundaries our partitions correspond to. However, we also group by the sensor id, and sensor ids are spread across files randomly.
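
To illustrate that first point, here is a minimal sketch of supplying the partition boundaries explicitly; the one-month date range, the one-partition-per-day layout, and the column names are assumptions for illustration:

```python
import pandas as pd
import dask.dataframe as dd

ddf = dd.read_parquet("measurements/")

# Hypothetical one-month example with one partition per day: the divisions
# (partition boundaries) are the days themselves, npartitions + 1 values,
# so no data needs to be scanned to establish them. sorted=True asserts the
# data is already ordered by day across partitions.
days = list(pd.date_range("2021-01-01", "2021-02-01", freq="D"))
ddf = ddf.set_index("day", sorted=True, divisions=days)

# Grouping on the now-known, sorted index no longer needs a full shuffle.
daily_counts = ddf.groupby("day")["value"].count().compute()
```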

A solution is at hand by going down to Pandas with map_partitions again. For every partition, we run an intermediate aggregation that calculates, for every sensor id, how many measurements there were and what the delta of the sensor's measurements was. The dataframe that results from this aggregation is much smaller, and thus can be materialised to Pandas, where we can then run a standard groupby on it. We just need to combine the results correctly: sum the measurement counts (and then divide by the expected count, 24*60, to get the percentage uptime), and compute the delta from the first and the last of the intermediate aggregation results.
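
A minimal sketch of the whole approach, under the same assumptions as above (column names, path, one expected reading per minute); the helper names partial_agg and combine are made up:

```python
import pandas as pd
import dask.dataframe as dd

def partial_agg(pdf: pd.DataFrame) -> pd.DataFrame:
    # Intermediate, per-partition aggregation in plain Pandas.
    pdf = pdf.sort_values("timestamp")
    return (
        pdf.groupby(["sensorId", "day"])
           .agg(first_ts=("timestamp", "first"),
                first_val=("value", "first"),
                last_ts=("timestamp", "last"),
                last_val=("value", "last"),
                count=("value", "size"))
           .reset_index()
    )

ddf = dd.read_parquet("measurements/")

# One small result per input partition; for tricky schemas you may need to
# pass meta= explicitly so Dask knows the output columns and dtypes.
intermediate = ddf.map_partitions(partial_agg).compute()  # now a small Pandas frame

def combine(group: pd.DataFrame) -> pd.Series:
    # Counts simply add up; the delta uses the earliest first value and the
    # latest last value across all partial results for this (sensor, day).
    first_val = group.sort_values("first_ts")["first_val"].iloc[0]
    last_val = group.sort_values("last_ts")["last_val"].iloc[-1]
    return pd.Series({
        "delta": last_val - first_val,
        "uptime": group["count"].sum() / (24 * 60),  # one expected reading per minute
    })

final = intermediate.groupby(["sensorId", "day"]).apply(combine)
```

The .compute() in the middle is the point where we descend to Pandas; only the small intermediate frame is ever held in memory on the client, while the heavy lifting happens partition by partition on the workers.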

Note that this rests on two assumptions. The first is what can be called functoriality or commutativity of the aggregation. A function F has this property with respect to some combinator + if F(input1) + F(input2) == F(input1 + input2), which is just a symbolic way of saying that a result can be computed by dividing the problem in two, computing subresults, and then combining them. For example, computing percentage uptime is not functorial: if I tell you that in some part of the day the uptime was 70%, and in another 85%, you cannot compute the uptime for the whole day without also knowing how long each part was. But if I instead give you just the counts of events in those parts, you can combine them by summing, and divide by the expected number only at the very end. A similar example is that computing the average of a list is not functorial, whereas outputting the sum of the list together with its length is. A notorious example that essentially cannot be made functorial is counting distinct IDs across events: you basically need to output all the distinct IDs at every stage, leading to large intermediate datasets (or you resort to HyperLogLog and pay with reduced accuracy).
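
A toy numeric illustration of the uptime example (all numbers are made up):

```python
# Two parts of one day, and the counts of measurements seen in each part.
part1_minutes, part2_minutes = 8 * 60, 16 * 60
part1_count, part2_count = 400, 850

# The per-part uptime percentages cannot be combined on their own,
# because they forget how long each part was...
uptime1 = part1_count / part1_minutes   # ~0.83
uptime2 = part2_count / part2_minutes   # ~0.89

# ...but the raw counts combine by a plain sum; divide only at the very end.
day_uptime = (part1_count + part2_count) / (24 * 60)   # ~0.87
```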

The second assumption is that the input partitions do not overlap, that is, there are no duplicated measurements spread across multiple input partitions. The proper way to handle this would be to merge the input partitions that possibly contain duplicates, and de-duplicate in the intermediate aggregation step. This is of course possible only if those merged partitions are not too large. We show in the next post how to do this in a slightly more general way.
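
A minimal sketch of that variant, assuming duplicates can only occur within a single day, that (sensorId, timestamp) uniquely identifies a measurement, and a made-up files_per_day to coalesce each day's files into one partition:

```python
import dask.dataframe as dd

ddf = dd.read_parquet("measurements/")

# Hypothetical: coalesce adjacent partitions so that (roughly) all files of one
# day end up in the same partition; this works when files are ordered by day,
# and the next post shows a more precise way to do it.
files_per_day = 3  # assumption for illustration
ddf = ddf.repartition(npartitions=max(1, ddf.npartitions // files_per_day))

def partial_agg_dedup(pdf):
    # Drop duplicated measurements before aggregating within the partition.
    pdf = pdf.drop_duplicates(subset=["sensorId", "timestamp"])
    return partial_agg(pdf)  # per-partition aggregation from the sketch above

intermediate = ddf.map_partitions(partial_agg_dedup).compute()
```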

Conclusion

Instead of a groupby on un-indexed columns of a Dask dataframe, consider one of:

  • Set the index explicitly if it corresponds to how the input data came to be.
  • Compute an intermediate aggregation to reduce the data size, and then descend to Pandas.

A data shuffle in Dask is a major source of degraded performance and requires more careful memory/IO management, so it is well worth some extra thinking and coding to avoid it.

--
