

Generating Session ids for user event ids — Part 1

I have been looking at all kinds of interesting use cases for processing event data, and I recently came upon this problem:

Let's say you have a series of events emitted as users click around a web application. In its simplest form, an event object would look like this:

{
  "userid": "u1",
  "emit-time": "2018-01-22 13:04:35",
  "action": "get",
  "path": "/orders"
}

Basically, this event represents a user clicking a link to see their orders. Let's assume the user interacts with the web service continually and we get an event stream that records his or her activity. As long as the user keeps interacting with the web service, the user session keeps getting extended. Let's say the session expires if there is no user interaction for more than 10 hours (the session timeout). If the user resumes activity after the session has timed out, a new session is created for the user. Our aim is to annotate the event data with this session information: each group of user activity in which consecutive events occurred within 10 hours of each other gets a unique session id. This then allows us to extract more useful information about the user’s usage trends. For example, what is the length of an average user session?
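To make the labelling rule concrete, here is a minimal sketch in plain Scala (no Spark). It assumes one user's events are already sorted by emit time. The `Event` case class, the `tagSessions` helper and the `<userid>-s<n>` session-id format are hypothetical names chosen for illustration. Notice that the fold has to carry the previous event's time and the current session number from one event to the next. That carried state is exactly what a row-by-row DataFrame transformation would need to remember.

```scala
import java.time.{Duration, LocalDateTime}
import java.time.format.DateTimeFormatter

// Hypothetical event model mirroring the JSON fields above ("emit-time" becomes emitTime).
case class Event(userid: String, emitTime: LocalDateTime, action: String, path: String)

object Sessionize {
  val fmt: DateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")
  val timeout: Duration = Duration.ofHours(10)

  // Assumes `events` belong to a single user and are already sorted by emitTime.
  // Returns each event paired with a session id of the (hypothetical) form "<userid>-s<n>".
  def tagSessions(events: Seq[Event]): Seq[(Event, String)] = {
    val init = (Option.empty[LocalDateTime], 0, Vector.empty[(Event, String)])
    // The fold carries state between events: the previous emit time and the current session number.
    val (_, _, tagged) = events.foldLeft(init) { case ((prev, session, acc), e) =>
      // A new session starts on the first event, or when the gap since the previous event exceeds the timeout.
      val startsNew = prev.forall(p => Duration.between(p, e.emitTime).compareTo(timeout) > 0)
      val current = if (startsNew) session + 1 else session
      (Some(e.emitTime), current, acc :+ (e -> s"${e.userid}-s$current"))
    }
    tagged
  }
}
```

For example, events for `u1` at 2018-01-22 13:04:35, 2018-01-22 20:00:00 and 2018-01-23 09:00:00 are tagged `u1-s1`, `u1-s1`, `u1-s2`. The first gap is under ten hours, but the second gap is thirteen. In this sketch, a gap of exactly 10 hours still counts as the same session.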

In this post, I will concern myself less with analytics on the tagged sessions. Instead, the focus will be on how to tag the user activity with session ids. What immediately intrigued me was that we need to track state when performing this operation. If a user activity occurs within the session timeout of the previous user activity, it inherits the session id from that previous event. The challenge is that the transformations you can apply to a DataFrame are typically stateless. It was clear that a UDF would be needed. But how does one capture state in a UDF? Spark provides no mechanism for shared mutable state that every worker can both read and update.

  • Broadcast variables can only be read on the workers. Once a value has been broadcast to the worker nodes, it cannot be updated. This won’t work for us, since we need to hold on to the session id and timestamp from the previous Row when processing the current Row. The previous timestamp changes with every Row processed, so a read-only broadcast variable cannot carry it.
  • Accumulators have the reverse drawback. Tasks on the worker nodes can add to them, but they cannot read their value. That value can only be read on the driver node. Another issue is that the built-in accumulators cover only a few types, such as longAccumulator and doubleAccumulator. There is no StringAccumulator, for example, so anything else requires writing a custom AccumulatorV2. Besides, an accumulator is simply not the right design pattern for our problem.
  • Next, let's look at closures in Spark. Driver variables can be shipped to the worker nodes through a closure, but each worker gets its own copy of the value. No error is thrown if such a variable is updated on a worker. However, the result is inconsistent, since an update on one worker is not seen by any other worker or by the driver.
  • Apache Ignite seems to be an interesting option for truly shared state, but I haven’t had a chance to investigate the pros and cons of this approach. More on that in the future.

Next, I began searching for ways to write stateful UDFs. On StackOverflow, I found a very interesting approach to tagging pairs of user activities. Check out the nice answer on that question. The relevant snippets from that SO post follow. The events look like this:

u1,0,2018-01-22 13:04:32
u2,0,2018-01-22 13:04:35
u2,1,2018-01-25 18:55:08
u3,0,2018-01-25 18:56:17
u1,1,2018-01-25 20:51:43
u2,0,2018-01-31 07:48:43
u3,1,2018-01-31 07:48:48
u1,0,2018-02-02 09:40:58
u2,1,2018-02-02 09:41:01
u1,1,2018-02-05 14:03:27

Basically, each session is signified by a single pair of events. The entry event has a start_or_stop value of 0, whereas the exit event has a value of 1. From there, the strategy follows quickly. For a given DataFrame:

  • partition by userid
  • order by timestamp
  • choose a window frame that spans each start event and the stop event that follows it.

The final script looks like this (from the SO post):

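import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.last
import spark.implicits._ // enables the 'column symbol syntax; assumes a SparkSession named spark

// UniqueID is the user id column in this code; the frame covers the current row and the next one
val wSpec = Window.partitionBy('UniqueID).orderBy('timestamp).rowsBetween(0, 1)
val sessions = df.withColumn("Last", last('timestamp) over wSpec) // add a new col having the stop time
  .where("start_or_stop = 0")                    // just need the alternate (entry) rows
  .drop("start_or_stop")                         // drop the flag column
  .withColumnRenamed("timestamp", "start_time")  // rename to start
  .withColumnRenamed("Last", "stop_time")        // rename to stop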

There are some nuances here that can give the intermediate data incorrect values. For example, when we choose last('timestamp), each exit event gets a stop_time taken from the next row for that userid. However, this does not affect the final answer, since we filter out the exit events anyway (where("start_or_stop = 0")). One could then generate a session id by hashing the userid + last timestamp (stop_time) combination, effectively tagging each entry and exit pair as a session. The problem, however, is that this code has no notion of a session timeout. Assume that a session times out after one hour, and that the entry event happens at 10:01 am while the exit event happens at 11:30 am. The pair will still be tracked as the same session.
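Both points in that paragraph can be sketched in plain Scala. The first is deriving a session id by hashing the userid with the stop timestamp. The second is checking whether an entry/exit pair actually fits inside the session timeout. `SessionPair`, `PairCheck`, `sessionId` and `exceedsTimeout` are hypothetical names. `MurmurHash3` from the Scala standard library is an assumption standing in for whatever hash you would really use. Inside Spark, a column function such as `md5` or `sha2` would be the more natural choice.

```scala
import java.time.{Duration, LocalDateTime}
import java.time.format.DateTimeFormatter
import scala.util.hashing.MurmurHash3

// Hypothetical row shape mirroring the query output: one row per entry/exit pair.
case class SessionPair(userid: String, startTime: String, stopTime: String)

object PairCheck {
  private val fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")

  // Session id derived from the userid + stop_time combination (the hash function is an assumption).
  def sessionId(p: SessionPair): String =
    Integer.toHexString(MurmurHash3.stringHash(p.userid + "|" + p.stopTime))

  // True when entry and exit are further apart than the timeout, i.e. the pairing
  // query labels as one session something a timeout-aware rule would have split.
  def exceedsTimeout(p: SessionPair, timeout: Duration): Boolean = {
    val gap = Duration.between(LocalDateTime.parse(p.startTime, fmt), LocalDateTime.parse(p.stopTime, fmt))
    gap.compareTo(timeout) > 0
  }
}
```

With the 10:01 am / 11:30 am example and a one-hour timeout, `exceedsTimeout` returns true (the gap is 89 minutes). The pairing query still gives that pair a single session id.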

Whew, that’s quite a spiel for now! In Part 2, we will examine a more robust solution to session tracking.

Till later…


