Dealing with Data Skew in Flink
Introduction
Processing large amounts of data comes with a series of challenges and trade-offs that we must live with. At Sitecore Data Engineering we have encountered and dealt with several of them while writing our own analytics ETL solution with Flink.
However, one issue seems to be little talked about, and we struggled to find enough resources and information on how to resolve it: data skew.
That is why I think it’s a good idea to share our journey in dealing with this issue in Flink.
Background
As Sitecore’s Analytics Team, we have been tasked with providing real-time reporting on the performance of customers’ site integrations, right down to the page level.
Our CDP (customer data platform) already captures clickstream data on visitor interactions with the customers’ sites: individual page view events and the like. The platform sessionizes that data and attributes it to visitor profiles. This enriched event stream is then made available via an internal Kafka topic, which streams on average 3,000 of these events per second, with spikes of up to 5,000.
We were tasked with providing real time updates on metrics such as:
- The number of guests currently active on the site
- The most viewed pages
- The average duration of a visit to the site (session duration)
The product requirements we were given were:
- Produce minute-level granularity timeseries metrics for queries like the above
- Allow the end user to view and select aggregates of these metrics by site and over variable time windows.
To accomplish this, we chose AWS’s Kinesis Data Analytics (KDA) managed Flink service as our stream processing engine. We decided to window and aggregate the metrics within our Flink job and store the aggregated minute-level results in a small Postgres DB.
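For context, the sink side of that job could look roughly like the sketch below, using Flink’s JDBC connector. The SiteViews result type, the site_views table and the connection URL are hypothetical placeholders; our actual sink code and schema differ.

import java.sql.PreparedStatement
import org.apache.flink.connector.jdbc.{JdbcConnectionOptions, JdbcExecutionOptions, JdbcSink, JdbcStatementBuilder}

// Hypothetical per-minute result row; the real schema is different.
case class SiteViews(site: String, minuteStart: java.sql.Timestamp, views: Long)

// Sketch: write the aggregated per-minute rows to Postgres via Flink's JDBC connector.
val postgresSink = JdbcSink.sink[SiteViews](
  "INSERT INTO site_views (site, minute_start, views) VALUES (?, ?, ?)",
  new JdbcStatementBuilder[SiteViews] {
    override def accept(ps: PreparedStatement, row: SiteViews): Unit = {
      ps.setString(1, row.site)
      ps.setTimestamp(2, row.minuteStart)
      ps.setLong(3, row.views)
    }
  },
  JdbcExecutionOptions.builder().withBatchSize(100).build(),
  new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
    .withUrl("jdbc:postgresql://localhost:5432/metrics") // placeholder connection URL
    .withDriverName("org.postgresql.Driver")
    .build()
)
// windowResult.addSink(postgresSink)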
This post is centered around that Flink job. We found that this incoming stream of data carries an unavoidable skew, and that skew affected the performance of the job.
What is Data Skew?
Before plunging into the ins and outs of our solution, we should properly outline the context and issue at hand.
Data skew refers to an unbalanced distribution of a dataset. This imbalance is usually observed through the lens of a specific metric or field. We could say a dataset of population in a country is skewed when grouped by population centres (assuming way more people live in a few big cities while the rest of places will have fewer people in them).
This, in and of itself, is not a bad thing. Most datasets have an inherent skew that is unavoidable (most people do actually live in big cities after all). The problem comes when treating this data in a parallelized manner, be it in an ETL processing application or the DB itself. If not managed properly, we can end up with a few nodes processing the overwhelming majority of the data while the rest are kept idle with little to process.
In our case, the Flink job that we use to generate and store these per-minute aggregations faced this exact problem. One of the fields we aggregate on is heavily skewed, and keying by that field was hurting performance.
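To see why a skewed key hurts, it helps to remember how Flink routes keyed records. The sketch below is a simplified rendition of what Flink’s key-group assignment does internally (the subtaskFor helper is just for illustration): every record with the same key always ends up on the same subtask.

import org.apache.flink.util.MathUtils

// Simplified version of Flink's key-to-subtask routing: the key is hashed into a
// key group, and each key group is owned by exactly one subtask. A key that
// dominates the stream therefore floods a single subtask.
def subtaskFor(key: String, maxParallelism: Int, parallelism: Int): Int = {
  val keyGroup = MathUtils.murmurHash(key.hashCode) % maxParallelism
  keyGroup * parallelism / maxParallelism
}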
Initial Implementation
To explain our experience with this issue, let’s look at an example very close to our own real use case: an online shop.
Imagine a large online clothing retailer. They produce all sorts of clothing and apparel in different styles and, to cater to each specific market, they have built a number of different storefronts, each with its own branding and website.
All the events, orders and general data traffic are then forwarded to a central Kafka topic. The company can then consume from that topic and build all sorts of metrics and processes on top of it.
For our example you could expect a message format along these lines:
{
  "typeOfEntity": "EVENT",
  "typeOfEvent": "VIEW",
  "site": "mainstream.com",
  "page": "/homepage",
  "guestID": "<<UUID>>",
  "sessionID": "<<UUID>>",
  "timestamp": "2022-10-17 12:57:20"
}
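For reference, here is a minimal case class such a message could be parsed into. The field names follow the JSON above; the parsing helper Message.fromLine used below is left out, and the exact shape of our real model differs.

import java.time.Instant

// Hypothetical shape of a parsed message, mirroring the JSON fields above.
case class Message(
  typeOfEntity: String,
  typeOfEvent: String,
  site: String,
  page: String,
  guestID: String,
  sessionID: String,
  timestamp: Instant
)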
Expecting these kinds of messages, we wanted to simply calculate how many views each site is getting each minute, in real time. With this goal in mind, we built our ETL pipeline using Flink and modeled it in this way:
val windowResult = env.addSource(source)
  .name("Messages")
  .map(Message.fromLine(_))
  .filter(m => m.typeOfEntity == "EVENT" && m.typeOfEvent == "VIEW")
  .name("FilteredEvents")
  .keyBy(_.site)
  .window(TumblingEventTimeWindows.of(Time.minutes(1)))
  .aggregate(new AggregateViews)
  .name("ViewEventsAggregation")
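AggregateViews is our aggregate function. The post doesn’t show its implementation, but for this first pipeline a minimal version that simply counts the view events in each per-site window could look like the sketch below; our real implementation carries more than a bare count.

import org.apache.flink.api.common.functions.AggregateFunction

// Minimal counting version of AggregateViews: the accumulator is just the
// number of view events seen in the window for the current key (site).
class AggregateViews extends AggregateFunction[Message, Long, Long] {
  override def createAccumulator(): Long = 0L
  override def add(value: Message, accumulator: Long): Long = accumulator + 1
  override def getResult(accumulator: Long): Long = accumulator
  override def merge(a: Long, b: Long): Long = a + b
}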
The important part here is the keyBy on the event’s site field. When we run the pipeline in a Flink cluster, we can see that the messages are distributed to the subtasks in a skewed way (following the number of events each site produces): most of the messages, coming from the bigger sites, end up in just a few subtasks while the rest sit mostly idle.
In this case, we ran a test with 5M records and almost 4M went to the same subtask.
To solve this issue, we first tried to rebalance using weights.
Using static and/or dynamic weights
One way of trying to rebalance the processing of the messages is to factor in the different amounts of messages that we can expect relative to each site. These are, in essence, weights.
{
  "mainstream.com": 500,
  "niche1.com": 20,
  "niche2.com": 15,
  "niche3.com": 12
}
Knowing more or less how popular each site is, we can build a map specifying these weights that we assign to each site and then use this map to rebalance the aggregation.
The specific way we used this map was by generating a salt (a small, semi-random suffix) that incorporated these weights and concatenating it with the site to form a new keyBy value.
In this way, we split the biggest sites into smaller chunks that are then processed more evenly.
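Here is a minimal sketch of the idea (not our exact code), treating each site’s weight as the number of sub-keys to spread that site over:

import scala.util.Random

// Hypothetical salting sketch: sites with a larger weight are split across
// more sub-keys, so their events are spread over more subtasks.
val weights: Map[String, Int] = Map(
  "mainstream.com" -> 500,
  "niche1.com" -> 20,
  "niche2.com" -> 15,
  "niche3.com" -> 12
)

def saltedKey(site: String): String = {
  val buckets = weights.getOrElse(site, 1)   // number of sub-keys for this site
  val salt = Random.nextInt(buckets)         // pick one of them at random
  s"$site#$salt"
}

// Used as .keyBy(m => saltedKey(m.site)) instead of .keyBy(_.site); the partial
// per-salt aggregates then have to be merged back per site further downstream.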
This would work sufficiently well, but then imagine that these sites are localized in different parts of the world. Now, depending on the time of day some sites might be more active than others and change in a fairly unpredictable manner that our static weights cannot account for.
To that end, we proposed using dynamic weights. This basically entails storing the relative amount of events that we received in the past few minutes from each site and then using that information to generate a new set of weights every X minutes.
In the end we scrapped this solution: the overhead it introduced (an additional job for calculating the weights and an additional Kafka stream) was deemed too much compared to the simpler and more elegant solution we ended up going with.
Map-Reduce Combiner Pattern
A simpler way to mitigate the issue is to reduce the number of records that we send to the windowing function. This is usually done by performing a pre-aggregation, merging batches of records into one so that the final result is less spread out and faster to process.
This is a well-known pattern used in some Map-Reduce processes. It is known as the Map-Reduce Combiner pattern, and it is nicely explained in more detail here.
In our case, we’d like to bundle the incoming events into smaller partial accumulations (counts of views) and then send those bundles to the final aggregation step.
We would be moving from a model like this:
To something like this:
Looking at the diagrams above you can see that the number of records (the number inside the boxes) is much smaller and better distributed when we reach the final aggregation.
The most direct and intuitive way we tried to go about doing this was writing a simple flat map function as pre-aggregation.
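A rough sketch of what that first attempt looked like is shown below; the buffering logic, the NaivePreAggregator name and the (site, count) output are illustrative rather than our exact code.

import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.util.Collector
import scala.collection.mutable

// Illustrative naive pre-aggregator: buffer per-site view counts and flush a
// partial (site, count) pair once maxBuffered events have been seen. Note that
// this buffering happens completely outside Flink's windowing and watermarking.
class NaivePreAggregator(maxBuffered: Int) extends FlatMapFunction[Message, (String, Int)] {
  private val counts = mutable.Map.empty[String, Int]
  private var seen = 0

  override def flatMap(in: Message, out: Collector[(String, Int)]): Unit = {
    counts(in.site) = counts.getOrElse(in.site, 0) + 1
    seen += 1
    if (seen >= maxBuffered) {
      counts.foreach { case (site, n) => out.collect((site, n)) }
      counts.clear()
      seen = 0
    }
  }
}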
However, the flat map has no awareness of the watermarking process, and by the time the pre-aggregation finished, some of the records fell outside the watermark and were counted as late events. To solve this we struggled with timers and pondered writing our own operator that would be aware of the watermarking process, until we found an even better solution.
We happened to come across Felipe Gutierrez’s article where he details his experience with a similar issue and how he ended up writing his own Flink operator to implement a Map-Reduce Combiner pattern. As it turns out, Flink 1.13 (latest version in KDA as of Oct 2022) does not provide an operator out of the box that allows us to use this pattern in our apps.
Because of this, we considered writing our own operator as well, but in the end we found something that already existed and did all of that for us.
Bundle Operator
The solution we are talking about is the Bundle operator, part of Flink’s 1.15 release.
Unfortunately for us, as of October 2022, Kinesis Data Analytics does not support that version of Flink yet, so we are stuck with 1.13 for now. We had to either go back to implementing it ourselves or… use Blink!
Blink is a fork of Flink made by the engineers at Alibaba. Initially created to accommodate their own needs, Blink is now a readily accessible open-source branch and, even better, it implements a version of this Bundle operator.
As the name of the class implies, it allows us to bundle records together during the windowing process and flush those bundles as they reach either the record limit or the windowing time (both of which we can customize).
In our specific case we didn’t want to completely switch to Blink, so we just added a dependency that enabled us to use the BundleOperator while still using the main Flink branch for the rest.
The way the BundleOperator works is by accumulating values (in our case, event views) into a single record (per site) until the closing condition for the bundle is met, normally that a certain number of records have been bundled. Then, this bundle is sent to the second aggregation which calculates the final numbers and sends the result to the sink (our PostgreSQL DB).
Our Solution
Now all we had to do was write a function for this operator and add a new step to our pipeline that pre-aggregates the records using it.
val windowResult = env.addSource(source)
  .name("Messages")
  .map(Message.fromLine(_))
  .filter(m => m.typeOfEntity == "EVENT" && m.typeOfEvent == "VIEW")
  .name("FilteredEvents")
  .transform(
    "PreAggregation",
    new MapBundleOperator[(String, Instant), Int, Message, Message](
      new AggregateViewsBundleFunction,
      new CountBundleTrigger(100),
      new KeySelector[Message, (String, Instant)] {
        override def getKey(in: Message) = (in.site, in.timestamp)
      }
    )
  )
  .keyBy(_.site)
  .window(TumblingEventTimeWindows.of(Time.minutes(1)))
  .aggregate(new AggregateViews)
  .name("ViewEventsAggregation")
As you can see, the main difference now is that we have this PreAggregation step that uses the MapBundleOperator and a MapBundleFunction:
// buffer.asScala below requires: import scala.collection.JavaConverters._
class AggregateViewsBundleFunction extends MapBundleFunction[(String, Instant), Int, Message, Message] {

  // Every incoming event adds one view to the running count for its (site, timestamp) key.
  override def addInput(value: Int, input: Message): Int = value + 1

  // When the bundle is flushed, turn each buffered (key, count) entry back into a Message and emit them, newest first.
  override def finishBundle(
    buffer: java.util.Map[(String, Instant), Int],
    out: Collector[Message]
  ): Unit = {
    val outputValues = buffer.asScala.map(Message.fromMap _)
      .toSeq.sortBy(_.timestamp.toEpochMilli)(Ordering[Long].reverse)
    outputValues.foreach(out.collect)
  }
}
The record limit that we set (that 100 in the CountBundleTrigger) is simply one that worked for us.
In low-throughput situations this record limit will rarely be reached, as the watermark interval (200 ms by default) will almost always elapse first and force a flush of the bundle anyway.
Both of those parameters (bundle size and watermark interval) can be adjusted to accommodate your usual data throughput and watermarking needs; in the end it is a matter of fine-tuning, as sketched below.
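For reference, a minimal sketch of where those two knobs live (200 ms is Flink’s default auto-watermark interval; the bundle size is whatever suits your throughput, and the variable name is just illustrative):

// How often watermarks are emitted, which also bounds how long a bundle
// can sit before a watermark forces a flush (200 ms is Flink's default).
env.getConfig.setAutoWatermarkInterval(200L)

// How many records a bundle may accumulate before the count trigger fires;
// CountBundleTrigger comes from the Blink table-runtime dependency mentioned above.
val preAggregationTrigger = new CountBundleTrigger[Message](100L)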
Results and final thoughts
Going back to the Flink cluster we can compare the distribution of the workload among the subtasks and see how it has changed. Now we have way fewer records and they are, if not perfectly, noticeably better distributed.
Not only that, but this improves the performance (more than double for this simple example) and allows us to manage this inherent skew in our dataset without compromising our processing speed.
Going from here, we could fine tune the record limit for the bundles and play around with the watermarking window length to get an even better improvement on the distribution and performance.
The biggest improvement for us, though, was the flexibility it gave us in accommodating huge, unexpected spikes of skewed throughput (like a big sale on one site, for example). Before applying these changes, we would lag behind whenever these spikes or deviations from the usual flow happened.
There are more challenges that we faced involving things like serialization, but that might be a story for another time :)
In conclusion, it is hard to generalize, but some amount of skew is going to be present in most datasets. Implementing a Map-Reduce Combiner pattern allowed us to mitigate the issue in our use case, though finding a way to make it work in Flink proved to be a challenge. Fortunately, the BundleOperator turned out to be an optimal solution in that regard.
Hopefully this explanation has saved you some time and headaches if or when you are faced with a similar data skew problem in your own datasets.
Additionally, if you want to try for yourself and play with the example I provided, you can find all the code used in this repo.
Brought to you by the engineering team at Sitecore, including Alberto Lago, Eoin Gillen, Marcelo Moreira and myself, Santiago Ruiz.