Finding Common User Journeys with Apache Spark

Understanding the behaviour of your users as they navigate and interact with your websites and applications is one of the key sources of insight for improving your products and your business model. One especially useful thing to measure is the common journeys your users take, which helps you to understand user intent.



Google Analytics provides the “Behavior flow” report, shown below, which provides detailed information about common user journeys. But if you don’t want to use Google Analytics (perhaps you want to create more advanced or customised behaviour flow tools, or you want to keep all your data private in your own datastores), then you can recreate this functionality using Apache Spark, regardless of how big your dataset is.

Google Analytics’ behavior flow tab

This blog post should be accessible to any developer, though the examples are in Scala. By the end you will see how to create your own behaviour flow functionality, and you’ll also see why Apache Spark is becoming massively popular: it is powerful, easy to use, and fast.

Context — Sample Data

For this post I’ll be storing some logs in a text file, though Apache Spark is also able to directly query Amazon S3 (even gzip compressed files). After querying the logs I’ll use Spark to find the most common journeys and print them to the console.

Each log in the text file is a JSON object of the following format:

{
  "event": "pageview",
  "url": "http://www.ntcoding.co.uk",
  "sessionId": "session1",
  "timestamp": "2015-01-07T05:07:40+00:00"
}

In a real application you’ll have more fields than this, but for this example only these 4 are needed. The event field is used to identify page view events. The url is used to work out which page the user viewed. The sessionId is necessary to group each user’s page views, and the timestamp is crucial so that each session can be sorted chronologically.
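
As a minimal sketch, the four fields could be modelled in Scala roughly as follows. The Event name and the optional url are inferred from how the code later in this post uses them (for example _.url.get); the definition in the real project may well differ, and EventSketch is purely a hypothetical holder for the example.

```scala
// Hypothetical model of one log line; field names mirror the JSON keys above.
// `url` is optional on the assumption that some events may not carry one.
final case class Event(
  event: String,
  url: Option[String],
  sessionId: String,
  timestamp: String // ISO-8601 string, e.g. "2015-01-07T05:07:40+00:00"
)

object EventSketch {
  // The sample record shown above, expressed as an Event.
  val sample: Event = Event(
    event = "pageview",
    url = Some("http://www.ntcoding.co.uk"),
    sessionId = "session1",
    timestamp = "2015-01-07T05:07:40+00:00"
  )

  // True only when the event field is exactly "pageview".
  def isPageView(e: Event): Boolean = e.event == "pageview"
}
```

Keeping the timestamp as a plain string here defers the choice of date library to the sorting step.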



The actual log files contain a single event per line. The document itself is not one big JSON array; it’s one JSON event per line, not comma-separated, as shown below. As you’ll see shortly, this is important because Spark splits files up by line.

{ "event": "pageview", "url": "...", "sessionId": "...", "timestamp": "..." }
{ "event": "pageview", "url": "...", "sessionId": "...", "timestamp": "..." }
{ "event": "pageview", "url": "...", "sessionId": "...", "timestamp": "..." }

Setting up a Spark Context

To use Apache Spark you need an instance running. When processing giant datasets you will probably want to spin up a cluster. But in this short example, an embedded instance of Spark will be used.



SparkContext is the fundamental abstraction for connecting to an instance of Apache Spark. In the sample code below, a SparkContext is being created that starts up an embedded instance of Apache Spark.

import org.apache.spark.{SparkConf, SparkContext}

object Demo {
  def main(args: Array[String]): Unit = {
    val spark = createEmbeddedSparkInstance()
    ...
  }

  private def createEmbeddedSparkInstance() = {
    // see spark documentation: https://spark.apache.org/docs/1.1.1/configuration.html
    val conf = new SparkConf()
      .set("spark.storage.memoryFraction", "0.1")
      .set("spark.executor.memory", "3g")
    // "local" runs Spark inside this JVM rather than connecting to a cluster
    new SparkContext(master = "local", appName = "common_user_journey_example", conf)
  }
}


To follow along you’ll need to add a reference to Apache Spark in your project. Adding the following line to your build.sbt file will do the trick (you can change the version number if you want to use a different version of Spark).

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.1.0"

Once Spark is running, you can start crunching the data into common user journeys, as shown next.

Calculating Common User Journeys

To calculate common user journeys, the following technique will be used. Note that to keep this example simple, only the first 3 page views in each user session will be used (like the Google Analytics behaviour flow).
  1. filter out all non-page view events (to reduce the size of the working set)
  2. group the page views into individual user sessions ordered chronologically (so each user session is ordered)
  3. select the first 3 page views for each user session and store as a comma-delimited string
  4. group the values by the comma-delimited url string (representing a unique journey) along with the number of occurrences
You can see how this is translated into code in the following snippet:

import org.apache.spark.SparkContext._ // pair-RDD operations such as reduceByKey
import org.apache.spark.rdd.RDD

def main(args: Array[String]): Unit = {
  val spark = createEmbeddedSparkInstance()

  // could be an Amazon S3 path e.g: s3n://user:pwd@host/bucket/logs.log.gz
  val testDataFilePath = this.getClass.getResource("sample_events.log").getPath

  // reads each line from the log file into a separate string
  // RDD is Spark's abstraction for a distributed collection
  val logs: RDD[String] = spark.textFile(testDataFilePath)

  // reduce the working set first to improve performance
  val pageViews: RDD[Event] = logs.filter(_.contains("pageview"))
    .map(Json.parse[Event])
    .filter(_.event == "pageview")

  val userSessions = pageViews.groupBy(_.sessionId)
    .filter(_._2.size > 0) // session must have at least 1 page view

  val sortedSessions = sortEachSessionChronologically(userSessions)

  // keep only the first 3 page views of each session
  // the "1" is used to count the totals (see reduceByKey in the next step)
  val journeys: RDD[(String, Int)] = sortedSessions.map { s =>
    (s._2.take(3).map(_.url.get).mkString(","), 1)
  }

  // use second item in journey tuples to count occurrences of that journey
  val journeysWithTotals = journeys.reduceByKey((count1, count2) => count1 + count2)
    .sortBy(-_._2) // sort by count descending

  printToConsole(journeysWithTotals.take(10))

  spark.stop()
}




Near the top of this code snippet is the "logs" variable. This is a collection of strings, one for each line in the file. RDD (Resilient Distributed Dataset) is the Spark abstraction for a collection whose elements are spread across the cluster and processed in parallel.



Below that is the "pageViews" variable. This contains a collection of Event objects, each corresponding to a line in the text file. You'll notice a filter both before and after the Event is parsed from the JSON string. The initial filter cheaply discards any line that definitely isn't a page view, avoiding the unnecessary overhead of parsing it as JSON. But it can let false positives through, for example a non-page view event whose URL happens to contain the text "pageview".

The final filter checks the parsed event field, so it is guaranteed to remove any non-page view events that crept through.
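
The two-stage filter can be sketched on plain Scala collections. This is only an illustration: the regex-based eventType helper is a hypothetical stand-in for a real JSON parser and assumes flat, single-line objects with no escaped quotes.

```scala
object TwoStageFilterSketch {
  // Stand-in for a JSON parser: extracts the value of the "event" field.
  // Assumption: flat objects whose string values contain no escaped quotes.
  private val EventField = "\"event\"\\s*:\\s*\"([^\"]*)\"".r

  def eventType(line: String): Option[String] =
    EventField.findFirstMatchIn(line).map(_.group(1))

  def pageViewLines(lines: Seq[String]): Seq[String] =
    lines
      .filter(_.contains("pageview"))                        // cheap substring check, no parsing
      .filter(line => eventType(line).contains("pageview"))  // exact check on the extracted field
}
```

A click event whose URL contains "pageview" passes the first filter but is rejected by the second, which is the false-positive case the second filter exists for.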



Once only the page view events remain, they are grouped into user sessions using Spark's groupBy(), which mirrors Scala's groupBy(). The sessions are then sorted chronologically so that the user journeys can be reconstructed. They are simply sorted by timestamp using Joda-Time; you can see the code on GitHub.
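
The grouping and sorting step might look something like the sketch below, written against plain Scala collections rather than an RDD. It is not the helper from the repository: it uses java.time in place of Joda-Time, and the PageView type and sortedSessions name are hypothetical.

```scala
import java.time.OffsetDateTime

object SessionSortSketch {
  // Hypothetical, trimmed-down event: only the fields needed for sorting.
  final case class PageView(sessionId: String, url: String, timestamp: String)

  // Convert the ISO-8601 timestamp to epoch milliseconds so that events
  // recorded with different UTC offsets still compare correctly.
  private def epochMillis(p: PageView): Long =
    OffsetDateTime.parse(p.timestamp).toInstant.toEpochMilli

  // Group page views by session, then order each session oldest-first.
  def sortedSessions(pageViews: Seq[PageView]): Map[String, Seq[PageView]] =
    pageViews
      .groupBy(_.sessionId)
      .map { case (id, views) => id -> views.sortBy(epochMillis) }
}
```

Sorting on a parsed instant rather than the raw string avoids ordering mistakes when timestamps carry different offsets.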



Having sorted the sessions, each user journey is then extracted from each session into the "journeys" variable. Each journey is a tuple of type (String, Int). The String contains the user journey as a comma-separated list of URLs, and the Int is just the integer 1, which will make sense shortly.



At this point the hard work is almost done. The journeys just need to be grouped into distinct groups. The number of journeys in each group is then the number of occurrences, which shows you the most common user journeys.



Spark's reduceByKey() is perfect for this. It groups the collection using the first element in the tuple, and takes a lambda that is used to combine the second elements in the tuple. In this case, the lambda is used to sum all of those 1s together - resulting in the count for each group.
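
To make the counting step concrete, here is a rough plain-collections sketch of what reduceByKey does with the (journey, 1) pairs. It runs without Spark; the reduceByKey, toJourney and topJourneys functions below are hypothetical local helpers that only imitate the behaviour described above.

```scala
object JourneyCountSketch {
  // Plain-collections imitation of reduceByKey: group the pairs by key,
  // then combine each group's values with the supplied function.
  def reduceByKey(pairs: Seq[(String, Int)])(f: (Int, Int) => Int): Map[String, Int] =
    pairs.groupBy(_._1).map { case (key, group) => key -> group.map(_._2).reduce(f) }

  // Turn an ordered list of URLs into a (journey, 1) pair,
  // keeping only the first three pages of the session.
  def toJourney(urls: Seq[String]): (String, Int) =
    (urls.take(3).mkString(","), 1)

  // Count identical journeys and return the n most common, highest count first.
  def topJourneys(sessions: Seq[Seq[String]], n: Int): Seq[(String, Int)] =
    reduceByKey(sessions.map(toJourney))(_ + _).toSeq.sortBy(-_._2).take(n)
}
```

On a real cluster Spark combines values within each partition before shuffling, which is why reduceByKey tends to be preferred over grouping and then summing.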

Testing it Out

If you then fire up SBT (Scala’s build tool) and run the sample application, you’ll see output similar to the snippet below, depending on the sample data set you use. My tiny sample dataset is on GitHub along with the rest of the code for this blog post.

3 - www.ntcoding.co.uk,www.ntcoding.co.uk/bookReviews,www.ntcoding.co.uk/bookReview?title=lean-analytics
2 - www.ntcoding.co.uk/bookReview?title=the-lean-startup,www.ntcoding.co.uk/bookReview?title=business-model-generation,www.ntcoding.co.uk/bookReview?title=impact-mapping
2 - www.ntcoding.co.uk,www.ntcoding.co.uk/bookReviews,www.ntcoding.co.uk/bookReview?title=the-lean-startup

From this output you can see that the most common journey is for users to hit my home page, navigate to my book reviews page, and then read my book review of Lean Analytics.



Now that you've sampled the basics, you can go ahead and perform more complicated analytical processes with the delightful Apache Spark.

--

--

Nick Tune
Strategy, Architecture, Continuous Delivery, and DDD

Principal Consultant @ Empathy Software and author of Architecture Modernization (Manning)