User Session Analysis with Flink

Shreyans
Airtel Digital
Mar 11, 2022

Wynk Music is India’s most loved music streaming app with listeners all across the country. We at Wynk wanted to study the listening patterns of our users and draw actionable insights to improve their overall user experience. We wanted to understand how a user behaves in each of their music listening “sessions”: What factors affect this behavior? How does a new product/feature impact session time? What are the main reasons for sessions to end? How many sessions are being created/closed at any given time?

We define a session as a period of continuous music listening with only small breaks in between:

More formally,

  • Two sessions are separated by a gap of inactivity of at least 30 minutes
  • A session is longer than 30 seconds. Anything shorter than that cannot reasonably be considered intentional listening on the user’s part.
  • The end of the day does not affect the definition of a session.
  • Relaunching the app after a crash or an uninstall starts a new session.

But how do we measure user activity? Essentially, what we need are markers that can track and convey intent on our users’ behalf. We already have a sturdy app analytics system in place, so why reinvent the wheel? We leverage this system, which gives us access to more than 150 event markers and also serves as a starting point for defining our application requirements.

System Requirements

  • Our system should be able to process ~2TB of event logs per day while giving near real-time results.
  • It should be robust. Wynk has multiple client versions across Android, iOS, and Web. Each of these clients sends a dynamic payload with each event log. The system should be able to handle any such input thrown at it.
  • It should be easily recoverable. In case of any component failure, we should be able to get the system back up and running without any data loss.
  • It should give near accurate results. We have no control over when a client will send event logs. All of our client implementations buffer events for a brief period before pushing them, to reduce the number of API calls to our backend system. Consider a scenario where a client goes offline before it sends a batch of event logs. This batch of events will arrive for processing whenever the client comes back online, which might realistically be many hours later and, in the worst cases, days later. Building a completely accurate system in such a scenario would require immense resources. Hence, as a trade-off, we expect the system to give us near accurate results.

A stateful stream processing application would be able to do this job while simultaneously giving us near real-time results. We decided to use Flink to build our streaming application due to its support for native stream processing, checkpointing, handling of back-pressure, and ability to produce various side output streams.

Architecture

All Wynk clients ship event logs to a backend service through an API. This service runs a Logstash shipper that forwards these events to a Kafka cluster. A Flink job reads these events from Kafka and assigns a session ID to each event.

Let’s take a closer look at the Flink job. We’ve split it into three primary operators: the first maintains the structural integrity of the data; the second handles all operations in the time domain; and the last is where the crux of the business logic lies, windowing events into temporal cohorts or “sessions”. This is what the whole pipeline translates to with Flink’s DataStream API.

val kafkaConsumer: KafkaSource[Event]
val source: DataStream[Event] = env.fromSource(kafkaConsumer,
  WatermarkStrategy.noWatermarks(), "Events")

source
  .process(new EventTransformationProcess)
  .assignTimestampsAndWatermarks(new CustomWatermarkStrategy)
  .keyBy(EventKeySelector)
  .process(new SimpleEventTimeWrangler)
  .keyBy(EventKeySelector)
  .process(new Sessionizer)

Tips:

  1. It is important to partition the stream by a logical key since Flink only allows access to states on a KeyedStream. It ensures that the data corresponding to a particular key is always processed in the same subtask.
  2. The KeyBy operator also introduces additional network buffers between consecutive subtasks. This leads to more relaxed handling of back-pressure overflows from downstream subtasks.
  3. In case of uneven data load on operators:
    - Ensure that the logical key used for partitioning is not causing data skew. Usually, high-cardinality keys like userID or deviceID are a fitting choice as a partition key.
    - Ensure the maxParallelism value is a multiple of the parallelism. For an entirely even distribution (see the sketch after this list),
      number of key groups (maxParallelism) = k * number of parallel subtasks (parallelism)
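For instance, one could pin maxParallelism to an exact multiple of the parallelism (the numbers below are illustrative, not our production settings):

env.setParallelism(32)     // number of parallel subtasks per operator
env.setMaxParallelism(128) // 128 key groups = 4 * 32, so each subtask owns exactly 4 key groups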

Transformation Operator

The first step of the job is to filter and clean the data and make it fit for further processing. Here we fix structural errors, handle missing values and perform basic data validations. It is critical to sanitize the input data as early as possible to avoid unnecessary computation overhead in the downstream operators.
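As a rough sketch of what such an operator could look like (the Event fields, the side-output tag, and the validation rules here are illustrative, not the actual EventTransformationProcess):

case class Event(userId: String, eventType: String, timestamp: Long)

// Records that fail validation are routed to a side output instead of being silently dropped
val malformedTag = OutputTag[Event]("malformed-events")

class EventTransformationProcess extends ProcessFunction[Event, Event] {
  override def processElement(event: Event,
                              ctx: ProcessFunction[Event, Event]#Context,
                              out: Collector[Event]): Unit = {
    if (event.userId == null || event.userId.isEmpty || event.eventType == null) {
      ctx.output(malformedTag, event) // structurally broken record
    } else {
      out.collect(event.copy(userId = event.userId.trim)) // basic normalization
    }
  }
}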

Wrangler Operator

In this process, we assign timestamps and watermarks to the input data. (What are watermarks?)

Within any stream processing system, there are primarily two time domains involved: processing time and event time.

  • Processing time: The time when the event is processed by the application.
  • Event time: The time when the event was generated.

In a perfect scenario, the difference between these two domains would be constant. But various factors such as network delays and system failures induce a dynamic skew between them. Moreover, in a distributed system events can arrive out of order, so the system has to incur extra latency waiting for these events, which further impacts processing time.

We take a two-fold approach to dealing with out-of-order and late events.

  1. Firstly, we weed out events where the difference between the event time and the current watermark is larger than a certain time delta (a minimal watermark strategy for this is sketched after this list).
  2. We also maintain a state of previous session windows for events arriving late but within the time delta; each such event is then assigned to a window with the appropriate time bounds.
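Flink’s built-in bounded-out-of-orderness strategy is one common way to express such a lateness bound; the sketch below uses an illustrative 10-minute delta and the hypothetical Event class from earlier, not our actual CustomWatermarkStrategy:

val watermarkStrategy: WatermarkStrategy[Event] =
  WatermarkStrategy
    .forBoundedOutOfOrderness[Event](Duration.ofMinutes(10)) // watermark trails the max seen event time by 10 minutes
    .withTimestampAssigner(new SerializableTimestampAssigner[Event] {
      override def extractTimestamp(event: Event, recordTimestamp: Long): Long = event.timestamp
    })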

Session Operator:

Flink has a window API to aggregate data into sessions, but its higher-level abstraction limits expressiveness. We decided to implement a session window operator using a KeyedProcessFunction instead. These are the internal operator states:

var timerState: ValueState[Long] = _
var sessionState: ValueState[GenericRecord] = _
var closedSessions: ListState[GenericRecord] = _

var STATE_SERIALIZER: AvroSerializer[GenericRecord] = _
val MAX_SESSION_GAP: Long = Time.minutes(30).toMilliseconds
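These states could be registered in the function’s open() method along these lines (state names are illustrative, and STATE_SERIALIZER is assumed to have been built from the session’s Avro schema beforehand):

override def open(parameters: Configuration): Unit = {
  timerState = getRuntimeContext.getState(
    new ValueStateDescriptor[Long]("lastEventTime", classOf[Long]))
  sessionState = getRuntimeContext.getState(
    new ValueStateDescriptor[GenericRecord]("currentSession", STATE_SERIALIZER))
  closedSessions = getRuntimeContext.getListState(
    new ListStateDescriptor[GenericRecord]("closedSessions", STATE_SERIALIZER))
}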

We maintain three states for each key. The timer state holds the last seen event time for that key; we use it to check whether the current event is out of session bounds:

eventTimestamp - timerState.value() > MAX_SESSION_GAP

sessionState holds the current session object. We also maintain a list of closed sessions, with each closed session having a TTL equal to the maximum event-time lateness we tolerate.

Stale states are also periodically cleaned up by registering a processing time timer which triggers the onTimer method exposed by a KeyedProcessFunction.

context.timerService().registerProcessingTimeTimer(currentTime + CLEAN_UP_INTERVAL)
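Putting it together, a condensed sketch of the Sessionizer’s element handler might look like the following, assuming the key is a String, the elements are Avro GenericRecords, and mergeIntoSession / evictStaleState are hypothetical helpers:

override def processElement(event: GenericRecord,
                            ctx: KeyedProcessFunction[String, GenericRecord, GenericRecord]#Context,
                            out: Collector[GenericRecord]): Unit = {
  val eventTimestamp: Long = ctx.timestamp() // event time assigned upstream by the watermark strategy
  val currentSession = sessionState.value()

  if (currentSession != null && eventTimestamp - timerState.value() > MAX_SESSION_GAP) {
    // Session gap exceeded: emit and close the current session, keeping it around for late events
    closedSessions.add(currentSession)
    out.collect(currentSession)
    sessionState.clear()
  }

  // Fold the event into the current (possibly fresh) session and remember its timestamp
  sessionState.update(mergeIntoSession(sessionState.value(), event)) // hypothetical helper; starts a new session when none exists
  timerState.update(eventTimestamp)

  // Schedule periodic clean-up of stale state
  val currentTime = ctx.timerService().currentProcessingTime()
  ctx.timerService().registerProcessingTimeTimer(currentTime + CLEAN_UP_INTERVAL)
}

override def onTimer(timestamp: Long,
                     ctx: KeyedProcessFunction[String, GenericRecord, GenericRecord]#OnTimerContext,
                     out: Collector[GenericRecord]): Unit = {
  // Evict closed sessions older than the tolerated lateness and clear idle per-key state
  evictStaleState(timestamp) // hypothetical helper
}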

Output Sinks:

We output session-enriched events into two primary sinks: a real-time Kafka sink and a Parquet sink. We also collect relevant metadata such as session data, error messages, and out-of-order events in various side outputs.
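As a sketch, assuming the stream produced by the Sessionizer above is bound to sessionized, the wiring could look roughly like this (the topic, brokers, tag name, and JSON serialization are assumptions; the Parquet sink is omitted for brevity):

val outOfOrderEvents = sessionized.getSideOutput(OutputTag[GenericRecord]("out-of-order-events"))

val kafkaSink: KafkaSink[String] = KafkaSink.builder[String]()
  .setBootstrapServers("kafka-broker:9092")
  .setRecordSerializer(
    KafkaRecordSerializationSchema.builder[String]()
      .setTopic("sessionized-events")
      .setValueSerializationSchema(new SimpleStringSchema())
      .build())
  .build()

// GenericRecord.toString renders the record as JSON, which is handy for a quick real-time feed
sessionized.map(_.toString).sinkTo(kafkaSink)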

State Backend

Flink maintains a snapshot of the application state to guard against data loss and allow consistent recovery after job failures.

Flink offers two state backend configurations out of the box:

  • HashMapStateBackend — This stores data as Java objects in memory. The state size is restricted by the total memory available in the cluster.
  • EmbeddedRocksDBStateBackend — This stores data as serialized byte arrays in a RocksDB database. The state size in this case is limited by the total disk space available.

Performance-wise a HashMap state backend is faster than its RocksDB counterpart since each state access in the latter requires deserialization/serialization to read/store data from disk. However, HashMapStateBackend is limited by the total memory available. This has a performance impact particularly during checkpointing when the whole application state needs to be aggregated on the job manager before being persisted to a file system.

Owing to the large number of keys in our stream (on the order of 10⁷) and its support for incremental snapshots, we chose EmbeddedRocksDBStateBackend as our state backend.
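Enabling this in code is straightforward (the checkpoint path and interval below are illustrative; the same can be configured via flink-conf.yaml):

env.setStateBackend(new EmbeddedRocksDBStateBackend(true))  // true enables incremental snapshots
env.getCheckpointConfig.setCheckpointStorage("s3://<bucket>/flink-checkpoints") // illustrative path
env.enableCheckpointing(Time.minutes(5).toMilliseconds)     // illustrative checkpoint interval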

We also wanted to ensure schemas of all objects are fully evolvable. So before serializing objects to our state backend, we first convert them to an Avro generic record.
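For illustration, a session object could be mapped onto a GenericRecord along these lines (the schema and field names are assumptions, not our actual session schema):

val sessionSchema: Schema = new Schema.Parser().parse(
  """{"type": "record", "name": "Session", "fields": [
    |  {"name": "sessionId", "type": "string"},
    |  {"name": "startTime", "type": "long"},
    |  {"name": "songCount", "type": "int"}
    |]}""".stripMargin)

val record: GenericRecord = new GenericData.Record(sessionSchema)
record.put("sessionId", session.sessionId) // `session` is a hypothetical in-memory session object
record.put("startTime", session.startTime)
record.put("songCount", session.songCount)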

Deployment and Monitoring:

We deployed Flink as a YARN application on an AWS EMR cluster. We are also experimenting with deploying a Flink session cluster on Kubernetes.

For monitoring, we export the rich built-in metrics exposed by Flink to a Prometheus service, which is integrated with a Grafana dashboard.

Conclusion

With this system up and running, we’re now working towards unlocking some powerful features to drive up user engagement. We also found some interesting preliminary insights. For instance,

  1. The average session length ranges from 20 to 25 minutes.
  2. Users listen to ~6–8 songs per session on average.
  3. At peak hours, we have more than a million concurrent active sessions.
[Chart: Number of songs listened to per session in a day]
[Chart: Number of active sessions at every hour of the day (scale: 1 unit = 1K sessions)]

As a junior dev, someone who was completely new to the world of stream processing and Flink, this project was a daunting but rewarding experience. I’ve learned some important lessons in system design along the way. A huge thanks to Utkarsh Sopan for being an amazing mentor and constantly encouraging me to step out of my comfort zone.

PS: Stay tuned for part II, where we discuss how we tackled the problem of supporting evolving schemas in our parquet data sink with Avro.
