Time/Space Sessions in Apache Beam

Davide Anastasia
7 min read · Sep 7, 2019

Sessions are bounded time intervals of activity, either from a human or a device.

Aggregation of granular events into sessions is not a new problem, and there are many ways to solve it, depending on the tools at your disposal. It is a task usually performed to unlock additional analytics (e.g. dwell time), or as a summarization technique.

However, the same problem becomes more complex when we add a secondary dimension: location. In fact, many datasets nowadays contain position-related information (like GPS coordinates).

In this article I will show how to calculate time/space sessions in Apache Beam by writing a custom window operator.

What’s Apache Beam?

Apache Beam is a library that unifies batch and streaming data processing into the same programming model. On its own, it cannot run any computation and relies on an underlying runner to perform the job. So far, the two runners with the best support are Google Dataflow (available on GCP) and Apache Flink (open source), but several others exist.

While coding classic MapReduce-style jobs in Beam is definitely possible (and I have shown in the past how to build an inverted index, for instance), Beam shines when we reason about time and process timestamped events. This is where we need different window strategies.

Window Strategies

There are three main window strategies in Beam:

  • fixed window
  • sliding window
  • session

These strategies are very well described in the Streaming 101 article, as well as in the book Streaming Systems. For the large majority of use cases, these strategies are enough, and with little coding it is possible to build extremely complex processing pipelines.

Custom Window Strategy

Beyond the built-in strategies, however, Beam also allows you to extend the framework and write custom ones. All that is required is a new class derived from WindowFn.

This new class needs to implement two methods:

  • (mandatory) assign an element to a window, using a class derived from BoundedWindow (or the built-in IntervalWindow);
  • (optional) merge windows together.

The second method is particularly important for session handling, as it enables working with dynamic windows, an incredibly powerful feature of the framework.
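
In the Java SDK these are the two methods in question (signatures as in the Beam API; non-merging strategies such as fixed and sliding windows extend NonMergingWindowFn, which seals mergeWindows as a no-op):

```java
// From org.apache.beam.sdk.transforms.windowing.WindowFn<T, W extends BoundedWindow>:

// (mandatory) given an element and its timestamp (via AssignContext),
// return the window(s) it belongs to
public abstract Collection<W> assignWindows(AssignContext c) throws Exception;

// (optional) inspect the currently known windows (via MergeContext)
// and merge them into fewer, larger windows
public abstract void mergeWindows(MergeContext c) throws Exception;
```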

Location Data

Location data is a complex variable to work with, being continuous in a bi-dimensional (or multi-dimensional) space. This particular aspect has implications in many areas: storage, sorting, indexing and search, just to name the most important.

The easiest way to solve this problem is to split the continuous 2D space into tiles, with techniques like Geohash, Google S2 or Uber H3 (to name some well-known ones).
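
For instance, with Uber's H3 Java bindings (method names from the 3.x API, which later major versions renamed), mapping a coordinate pair to a tile identifier takes only a couple of lines:

```java
import com.uber.h3core.H3Core;

public class TileIdExample {
  public static void main(String[] args) throws Exception {
    H3Core h3 = H3Core.newInstance();

    // resolution controls the tile size; 8 gives cells of roughly 0.7 km^2
    int resolution = 8;

    // central Beijing, as in the T-Drive dataset used later in the article
    String hexAddr = h3.geoToH3Address(39.9042, 116.4074, resolution);
    System.out.println(hexAddr); // the H3 cell address containing this point
  }
}
```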

As we have seen above, we can leverage Beam to subdivide time into sessions, but working with location data poses a new challenge.

In fact, simply aggregating by time might lead to extremely large sessions with no spatial meaning anymore.

Let me try to give an intuitive explanation: let's imagine a beacon in a taxi that activates at the beginning of a ride and stops at the end, sending a sample every minute. We could calculate a time-based session and stop the session when there is a gap between samples of, say, at least 5 minutes: depending on the taxi ride, the session might cover a large area, and it would be very hard, for instance, to count how many taxis were in a certain location at a certain time.

Hence, we need to assign to each event not only the time window it belongs to, but also a tile identifier.

From Theory to Practice!

Let’s try to put together all the pieces. The first piece is a class derived from BoundedWindow that will keep track of the window each record belongs to:

Basic structure of TiledIntervalWindow
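
What follows is a minimal sketch of such a class; the intersects helper and the exact field layout are assumptions of this sketch, and a real implementation also needs a custom Coder so Beam can serialize the window (the full version is in the repository linked at the end):

```java
import java.util.Objects;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.joda.time.Instant;

// An interval window that also carries the identifier of the tile it covers.
public class TiledIntervalWindow extends BoundedWindow {
  private final String hexAddr; // tile identifier, e.g. an H3 address
  private final Instant start;  // inclusive
  private final Instant end;    // exclusive

  public TiledIntervalWindow(String hexAddr, Instant start, Instant end) {
    this.hexAddr = hexAddr;
    this.start = start;
    this.end = end;
  }

  public String hexAddr() { return hexAddr; }
  public Instant start() { return start; }
  public Instant end() { return end; }

  // true when the two windows overlap (or touch) in time
  public boolean intersects(TiledIntervalWindow other) {
    return !start.isAfter(other.end) && !other.start.isAfter(end);
  }

  @Override
  public Instant maxTimestamp() {
    return end.minus(1); // same convention as the built-in IntervalWindow
  }

  @Override
  public boolean equals(Object o) {
    if (!(o instanceof TiledIntervalWindow)) {
      return false;
    }
    TiledIntervalWindow w = (TiledIntervalWindow) o;
    return hexAddr.equals(w.hexAddr) && start.equals(w.start) && end.equals(w.end);
  }

  @Override
  public int hashCode() {
    return Objects.hash(hexAddr, start, end);
  }
}
```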

hexAddr can be calculated using one of the libraries mentioned above.

Assign/Merge

The second piece of the puzzle is the class derived from WindowFn, and in particular its two methods. The first assigns a record to a specific TiledIntervalWindow:

WindowFn assignWindows function
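
A simplified sketch of that function: TaxiEvent and its accessors are hypothetical stand-ins for the input type, and the class is declared abstract here only because the remaining required methods (mergeWindows, windowCoder, isCompatible, getDefaultWindowMappingFn) are shown or omitted separately:

```java
import java.util.Collection;
import java.util.Collections;
import com.uber.h3core.H3Core;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.joda.time.Duration;

public abstract class TiledSessions extends WindowFn<TaxiEvent, TiledIntervalWindow> {
  private final Duration gapDuration;
  private final int resolution;
  private transient H3Core h3; // not serializable, so built lazily per worker

  public TiledSessions(Duration gapDuration, int resolution) {
    this.gapDuration = gapDuration;
    this.resolution = resolution;
  }

  private H3Core h3() throws Exception {
    if (h3 == null) {
      h3 = H3Core.newInstance();
    }
    return h3;
  }

  @Override
  public Collection<TiledIntervalWindow> assignWindows(AssignContext c) throws Exception {
    TaxiEvent e = c.element();
    // spatial part: map the coordinates of the event to a tile identifier
    String hexAddr = h3().geoToH3Address(e.getLat(), e.getLng(), resolution);
    // temporal part: as in the built-in Sessions strategy, a proto-window
    // spanning [timestamp, timestamp + gap), merged later with its neighbours
    return Collections.singletonList(
        new TiledIntervalWindow(hexAddr, c.timestamp(), c.timestamp().plus(gapDuration)));
  }
}
```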

In this function I used H3 to calculate the tile identifier, alongside the standard way to assign time boundaries.

The second method to implement is the one that performs the merge of the TiledIntervalWindows: the canonical calculation in Beam sorts all the active intervals by time and then iteratively merges each interval with the previous one if the gap between them is smaller than the allowed gap. However, this would not be the best strategy in our scenario, as we need to disallow merging windows with different tile identifiers, as in the picture below.

Samples moving across space and time

To solve this problem we can choose between two strategies:

  • sort all the intervals by tile ID and time, essentially building sub-groups by tile ID:
Grouping events sorting by Tile ID and Time
  • sort all the intervals by time only, as in the standard algorithm, but stop merging if we cross the boundary of a tile:
Grouping events sorting by Time Only
Different comparators, used during sorting of TiledIntervalWindow
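
As a sketch, the two orderings can be expressed with standard Java comparators over the TiledIntervalWindow accessors defined earlier:

```java
import java.util.Comparator;

public final class TiledWindowComparators {
  // strategy 1: group by tile identifier first, then order by time,
  // effectively building per-tile sub-groups before merging
  public static final Comparator<TiledIntervalWindow> BY_TILE_THEN_TIME =
      Comparator.comparing(TiledIntervalWindow::hexAddr)
          .thenComparing(TiledIntervalWindow::start);

  // strategy 2: order by time only; the merge loop is then responsible
  // for breaking a session whenever it crosses a tile boundary
  public static final Comparator<TiledIntervalWindow> BY_TIME_ONLY =
      Comparator.comparing(TiledIntervalWindow::start);

  private TiledWindowComparators() {}
}
```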

Both strategies are useful, depending on the type of data you are dealing with, but while the first can also be obtained by building standard time sessions over KV<(ID, TILE_ID), ...>, the second can only be obtained using a custom WindowFn.

The rest of the merging algorithm is straight out of the official Beam library:

WindowFn mergeWindows function
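
A sketch of how the merge could look with the tile check added (the time-only comparator variant, continuing the TiledSessions class from above; the structure mirrors Beam's MergeOverlappingIntervalWindows):

```java
@Override
public void mergeWindows(MergeContext c) throws Exception {
  // sort the candidate windows by start time, as in the canonical algorithm
  List<TiledIntervalWindow> sorted = new ArrayList<>(c.windows());
  sorted.sort(TiledWindowComparators.BY_TIME_ONLY);

  List<TiledIntervalWindow> toMerge = new ArrayList<>();
  TiledIntervalWindow current = null;

  for (TiledIntervalWindow w : sorted) {
    if (current != null && current.hexAddr().equals(w.hexAddr()) && current.intersects(w)) {
      // same tile and overlapping in time: extend the running session
      toMerge.add(w);
      Instant end = current.end().isAfter(w.end()) ? current.end() : w.end();
      current = new TiledIntervalWindow(current.hexAddr(), current.start(), end);
    } else {
      // tile boundary crossed, or time gap too large: flush and start over
      if (toMerge.size() > 1) {
        c.merge(toMerge, current);
      }
      toMerge = new ArrayList<>();
      toMerge.add(w);
      current = w;
    }
  }
  if (toMerge.size() > 1) {
    c.merge(toMerge, current);
  }
}
```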

Testing

Unit testing streaming pipelines in Beam is explained very well in the official documentation. You need a TestPipeline object, which acts as a local runner during the execution of the unit test; then you need to define your input (timestamped data) and your expectations in terms of output at each moment in time (asserts). The code below is a small example of what a unit test might look like in this case.

Example of a unit test for a Beam streaming pipeline
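
Below is a sketch of such a test, using TestStream to feed timestamped input; TaxiEventCoder and the two event constants are hypothetical fixtures, and a complete (non-abstract) TiledSessions implementation is assumed:

```java
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TimestampedValue;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Rule;
import org.junit.Test;

public class TiledSessionsTest {

  @Rule public final transient TestPipeline p = TestPipeline.create();

  @Test
  public void eventsInSameTileAreMergedIntoOneSession() {
    Instant t0 = new Instant(0L);

    // TaxiEventCoder, EVENT_IN_TILE_A and EVENT_IN_TILE_A_LATER are
    // hypothetical fixtures: two events in the same tile, one minute apart
    TestStream<TaxiEvent> input =
        TestStream.create(TaxiEventCoder.of())
            .addElements(
                TimestampedValue.of(EVENT_IN_TILE_A, t0),
                TimestampedValue.of(
                    EVENT_IN_TILE_A_LATER, t0.plus(Duration.standardMinutes(1))))
            .advanceWatermarkToInfinity();

    PCollection<Long> perSessionCounts =
        p.apply(input)
            .apply(Window.into(new TiledSessions(Duration.standardMinutes(5), 8)))
            .apply(Combine.globally(Count.<TaxiEvent>combineFn()).withoutDefaults());

    // with a 5-minute gap and a shared tile, both events land in one session
    PAssert.that(perSessionCounts).containsInAnyOrder(2L);

    p.run().waitUntilFinish();
  }
}
```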

A simple usage example

The dataset I am using is the Microsoft T-Drive dataset: it contains 1 week of taxi routes in Beijing and it is an extremely interesting dataset if you want to experiment with location data.

After running a batch job to process the granular input data, it is possible to calculate from the sessions the time each taxi spent in a specific tile. Sessions are in fact a very powerful way to calculate dwell time: here, a weight is calculated as the proportion of the total ride time of that taxi spent in each tile over the considered time period, and used as the alpha factor in the picture below, hence highlighting the tiles with the highest usage.

Hex map of relative time spent in each tile
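
As a sketch of that weight computation: given, for one taxi, the summed session durations per tile, the per-tile alpha could be derived as follows (plain Java, names hypothetical):

```java
import java.util.HashMap;
import java.util.Map;

public final class DwellTimeWeights {

  // weight of a tile = time spent in it / total ride time across all tiles;
  // input maps a tile identifier to the sum of its session durations (ms)
  public static Map<String, Double> tileWeights(Map<String, Long> millisPerTile) {
    long total = millisPerTile.values().stream().mapToLong(Long::longValue).sum();
    Map<String, Double> weights = new HashMap<>();
    if (total == 0) {
      return weights;
    }
    millisPerTile.forEach((tile, ms) -> weights.put(tile, (double) ms / total));
    return weights; // each value in [0, 1], usable directly as an alpha factor
  }

  private DwellTimeWeights() {}
}
```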

This algorithm can be used in a streaming job too and is CPU-friendly. Some potential applications are:

  • target a certain user when they have spent more than X seconds in a location;
  • suggest places visited by users with a similar location pattern;
  • real-time tracking of vehicles (standing for too long in the same place?) or people (where are my doctors inside a hospital? Interesting, as it requires a 3D tiling system).

However, further work is necessary to prevent overly long sessions from causing an out-of-memory error, as all the data for a session is held in memory before being flushed when the session expires, a well-known problem when working with session aggregation.

Final Remarks

Source code for this article is available here: https://github.com/davideanastasia/apache-beam-spatial

The project contains a working implementation of the WindowFn class, as well as unit tests and an example of usage.

Alternatives

A more complex method was presented by Yelp at Flink Forward San Francisco 2019.

The method presented performs on-the-fly clustering: the idea is extremely interesting, but I couldn't find a way to achieve the same result in Apache Beam (Flink seems to have a more powerful API in this area?). Maybe an open door for one more article?

Another alternative is the method described by Uber on their tech blog: they use micro-batching over Spark Streaming to compute sessions. The article explains well how Spark Streaming is not a great platform for sessionization and out-of-order event handling.

The method I described in this article sits in between the two above: it is consistent, and it works even when the input data is imperfect (missing samples or out-of-order data).


A bit of a personal touch…

“I wish I had known this earlier!”

Have you ever caught yourself saying that?

I worked on a sessionization algorithm a few years ago: with my knowledge at the time, I tackled the problem using secondary sorts in Spark (as described in this great book), which worked reasonably well but was hard to write and test for correctness, and it worked only in batch mode (nearly impossible to extend to streaming).

I wish I had known about Apache Beam back then (albeit a very early-stage project at the time); it would have made my life so much simpler!

Thanks for reading my article. Any feedback is welcome! :-)
