Thinking In Big Data at Rapido — Cohort KPI

Vikash Singh
Rapido Labs
Jul 14, 2020

This post is a continuation of my earlier post, in which we built a basic understanding of cohorts and how they can be used for analyzing customer engagement with the product. In this post, we are going to see how we extract the KPIs for our customer cohorts.

In the transportation and mobility sector, data and independent research are at the core of solving potential problems, and Rapido aims to find core solutions to them.

The customer (rider) cohort KPI is an important dataset for analyzing customer engagement with the product. It gives us visibility into usage patterns as well as parameters like acquisition, retention, and loyal users.

This dataset represents customer cohort KPIs, with variations across time_level (weekly, monthly) and geo_level (city, panindia).

Let me deep-dive into how we implemented the cohort KPIs efficiently across a year of raw data using Apache Spark.

We used YAML to define the KPIs of the customer cohort:

A sample YAML code
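The original embedded gist is not reproduced here; as a rough, hypothetical reconstruction (field names assumed) based on the parser output below, it could look like:

cohort_kpis:
  selectColumns: _id, customer, createdon, status, service, subtotal, discount, rider, serviceobj_city, createdts
  groupColumns: customer, time_level, geo_level, geo_value, time_value
  columns:
    - expression: count(CASE WHEN ((service IN ('Link', 'Lite')) AND (status = 'dropped')) THEN _id END)
      alias: net_rides
    - expression: bround(sum(CASE WHEN ((service IN ('Link', 'Lite')) AND (status = 'dropped')) THEN amount ELSE 0 END), 2)
      alias: net_gmv
    - expression: bround(sum(CASE WHEN ((service IN ('Link', 'Lite')) AND (status = 'dropped')) THEN discount ELSE 0 END), 2)
      alias: discount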

We implemented a YAML parser to translate the KPIs defined in the above file into the following:

selectColumns: _id, customer, createdon, status, service, subtotal, discount, rider, serviceobj_city, createdts, time_level, time_value, geo_level, geo_value
groupColumns: customer, time_level, geo_level, geo_value, time_value
columns:
count(CASE WHEN ((service IN (Link, Lite)) AND (status = dropped)) THEN _id END) AS `net_rides`
bround(sum(CASE WHEN ((service IN (Link, Lite)) AND (status = dropped)) THEN amount ELSE 0 END), 2) AS `net_gmv`
bround(sum(CASE WHEN ((service IN (Link, Lite)) AND (status = dropped)) THEN discount ELSE 0 END), 2) AS `discount`
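For context, here is a minimal sketch (the actual parser is not shown in the post, so these definitions are assumptions) of how the parsed strings can become the selectColumns, groupColumns, and columns values used in the snippets below. The KPI expressions are parsed with Spark's expr(), with string literals quoted so they parse:

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.{col, expr}

// Hypothetical translation of the parsed YAML into Spark columns.
val selectColumns: Seq[Column] =
  "_id,customer,createdon,status,service,subtotal,discount,rider,serviceobj_city,createdts,time_level,time_value,geo_level,geo_value"
    .split(",").map(col).toSeq

val groupColumns: Seq[String] =
  Seq("customer", "time_level", "geo_level", "geo_value", "time_value")

// expr("... AS alias") parses each KPI into an aliased aggregate Column.
val columns: Seq[Column] = Seq(
  expr("count(CASE WHEN service IN ('Link', 'Lite') AND status = 'dropped' THEN _id END) AS net_rides"),
  expr("bround(sum(CASE WHEN service IN ('Link', 'Lite') AND status = 'dropped' THEN amount ELSE 0 END), 2) AS net_gmv"),
  expr("bround(sum(CASE WHEN service IN ('Link', 'Lite') AND status = 'dropped' THEN discount ELSE 0 END), 2) AS discount")
)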

Now, let’s take a step-by-step look at how we transformed the raw data into the cohort KPIs.

import org.apache.spark.sql.functions._
import spark.implicits._

// Hypothetical helper (not shown in the post): converts a yyyy-MM-dd date
// string into epoch milliseconds.
val epochUdf = udf((date: String) =>
  java.time.LocalDate.parse(date).atStartOfDay(java.time.ZoneOffset.UTC).toInstant.toEpochMilli)

val dataFrame = Seq(
  ("_id1", "cus1", "rider", "2020-03-01", "dropped", "Bangalore", 4.06),
  ("_id2", "cus1", "rider", "2020-02-04", "dropped", "Bangalore", 4.06),
  ("_id3", "cus1", "rider", "2020-01-05", "dropped", "Bangalore", 4.06),
  ("_id4", "cus1", "rider", "2020-01-10", "dropped", "Bangalore", 4.06),
  ("_id6", "cus3", "rider", "2020-03-10", "dropped", "Bangalore", 4.06),
  ("_id7", "cus1", "rider", "2020-02-05", "dropped", "Bangalore", 4.06),
  ("_id8", "cus1", "rider", "2020-02-08", "dropped", "Bangalore", 4.06),
  ("_id11", "cus7", "rider", "2020-01-15", "dropped", "Bangalore", 4.06),
  ("_id12", "cus7", "rider", "2020-03-12", "dropped", "Bangalore", 4.06),
  ("_id13", "cus8", "rider", "2020-01-12", "dropped", "Bangalore", 4.06)
).toDF("_id", "customer", "rider", "orderDate", "status", "serviceobj_city", "discount")
  .withColumn("createdOn", epochUdf($"orderDate"))
  .withColumn("serviceObj_service", lit("Link"))
  .withColumn("subtotal", lit(1.5))
  .withColumn("createdts", from_unixtime(col("createdOn") / 1000))
  .withColumn("created_ist", from_utc_timestamp(col("createdts"), "IST"))
  .withColumn("RideDate", date_format(col("created_ist"), "yyyy-MM-dd"))
Sample Order Data

In the first transformation, we exploded the data and added columns for time_level, time_value, geo_level, and geo_value.

Here, we created a user-defined function to extract time_level, time_value, geo_level, and geo_value.

import org.joda.time.{DateTime, DateTimeZone}
import org.joda.time.format.DateTimeFormat

case class GroupCohort(time_value: String, time_level: String, geo_value: String, geo_level: String)

val zone = DateTimeZone.forID("Asia/Kolkata")

def extractGroupCohort(createdOn: Long, city: String): Seq[GroupCohort] = {
  val time = DateTime.now().withZone(zone).withMillis(createdOn)
  val timeFmt = DateTimeFormat.forPattern("yyyyMMdd")
  val cityInLowerCase = city.replaceAll(" ", "_").toLowerCase()

  // Month start (first day of month) for the monthly cohorts,
  // week start (Monday) for the weekly cohorts; both at city and pan-India level.
  val monthStart = timeFmt.print(time.withDayOfMonth(1).withHourOfDay(0).withMinuteOfHour(0))
  val weekStart = timeFmt.print(time.withDayOfWeek(1).withHourOfDay(0).withMinuteOfHour(0))

  Seq(
    GroupCohort(monthStart, "monthly", cityInLowerCase, "city"),
    GroupCohort(monthStart, "monthly", "india", "panindia"),
    GroupCohort(weekStart, "weekly", cityInLowerCase, "city"),
    GroupCohort(weekStart, "weekly", "india", "panindia")
  )
}
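To make the fan-out concrete, here is a small usage sketch (the epoch literal is illustrative): each order maps to four cohort groups, and for a Sunday ride the weekly time_value is the preceding Monday.

// 1583020800000L is 2020-03-01 00:00 UTC (05:30 IST on the same date, a Sunday).
extractGroupCohort(1583020800000L, "Bangalore").foreach(println)
// GroupCohort(20200301,monthly,bangalore,city)
// GroupCohort(20200301,monthly,india,panindia)
// GroupCohort(20200224,weekly,bangalore,city)
// GroupCohort(20200224,weekly,india,panindia)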

After that, our data frame transformation looked as follows:

val extractGroupCohortUdf = udf(extractGroupCohort _)

// ordersBaseDf is the sample order data frame built above.
val orderWithTimeGeo = ordersBaseDf
.withColumn("group_cohort", explode(extractGroupCohortUdf($"createdOn", $"serviceobj_city")))
.withColumn("time_level", $"group_cohort.time_level")
.withColumn("time_value", $"group_cohort.time_value")
.withColumn("geo_level", $"group_cohort.geo_level")
.withColumn("geo_value", $"group_cohort.geo_value")
.drop("group_cohort")

orderWithTimeGeo.show(false)

Add time_level, time_value, geo_level and geo_value

In the next transformation, we computed the customer KPI aggregation based on time_level, geo_value, and geo_level, and associated it with time_value.

val customerKpiMonthlyAggregation = orderWithTimeGeo
  .select(selectColumns: _*)
  .groupBy(groupColumns.map(col): _*)
  // "values" packs each period together with its "@@"-joined KPI payload.
  .agg(array($"time_value", concat_ws("@@", columns: _*).as("payload")).as("values"))
Customer KPIs aggregation
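For instance, with the sample data above, cus1’s three February rides in Bangalore collapse into a single grouped row. Assuming amount maps to the subtotal column here (the sample data has no amount field), the payload would look roughly like:

// customer | time_level | geo_level | geo_value | values
// cus1     | monthly    | city      | bangalore | [20200201, "3@@4.5@@12.18"]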

Then, we created the cohort dataset with source_time_value as the previously appeared week and compare_time_value as the next appeared week.

The following user-defined function helped us create (previous, next) pairs of time values.

case class CohortKpiRecord(time_value: String, payload: String)
case class TimeValueWithKpi(source: String, compare: String, payload: String)

def createKpiCohort(values: Seq[Seq[String]], level: String): List[TimeValueWithKpi] = {
  val cohortKpiRecords = values.map({
    case Seq(time_value, payload) => CohortKpiRecord(time_value, payload)
  }).sortBy(_.time_value)

  // Every ordered pair of periods (combinations preserve the sorted order) ...
  cohortKpiRecords.toList.combinations(2)
    .map({
      case List(current, next) =>
        TimeValueWithKpi(current.time_value, next.time_value, next.payload)
    })
    .toList ++ cohortKpiRecords.map({
      // ... plus a self-row per period, with an empty compare value.
      case CohortKpiRecord(time_value, payload) =>
        TimeValueWithKpi(time_value, "", payload)
    })
}
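A quick illustration of what createKpiCohort produces for three sorted monthly periods (the payloads p1..p3 are placeholders):

createKpiCohort(Seq(
  Seq("20200101", "p1"), Seq("20200201", "p2"), Seq("20200301", "p3")
), "monthly").foreach(println)
// TimeValueWithKpi(20200101,20200201,p2)
// TimeValueWithKpi(20200101,20200301,p3)
// TimeValueWithKpi(20200201,20200301,p3)
// TimeValueWithKpi(20200101,,p1)
// TimeValueWithKpi(20200201,,p2)
// TimeValueWithKpi(20200301,,p3)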

The data transformation looked as follows:

import org.apache.spark.sql.catalyst.expressions.Alias
import org.apache.spark.sql.types.LongType

// Note: casting the bround-ed decimals to LongType truncates them to whole numbers.
val dataTypeMap = Map(
  "discount" -> LongType,
  "net_rides" -> LongType,
  "net_gmv" -> LongType
)

// Unpack the "@@"-joined payload back into typed, aliased KPI columns.
val payloadColumnsWithType = columns.zipWithIndex
  .map({
    case (field, index) =>
      val alias = field.expr.asInstanceOf[Alias]
      col("data")(index).as(alias.name)
        .cast(dataTypeMap(alias.name))
  }).toList

val kpiUdf = udf(createKpiCohort _)

val cohortBaseDf = customerKpiMonthlyAggregation
  .groupBy(groupColumns
    .filterNot(_.equals("time_value")).map(col): _*)
  .agg(collect_set($"values").as("values"))
  .withColumn("time_frame", explode(kpiUdf($"values", $"time_level")))
  .withColumn("source_time_value", $"time_frame.source")
  .withColumn("compare_time_value", $"time_frame.compare")
  .withColumn("payload", $"time_frame.payload")
  .withColumn("data", split($"payload", "@@"))
  .drop("values", "time_frame", "payload")
  .select($"customer" :: $"time_level" :: $"geo_level" :: $"geo_value" :: $"source_time_value" :: $"compare_time_value" :: payloadColumnsWithType: _*)
  // Self-rows carry an empty compare value; normalize them to the source period.
  .withColumn("compare_time_value", when($"compare_time_value" === "", $"source_time_value").otherwise($"compare_time_value"))

cohortBaseDf.show(false)

Customer dataset with source and compare time value

In the final transformation, we computed the customer cohort KPIs based on source_time_value, compare_time_value, time_level, geo_value, and geo_level.

import org.apache.spark.sql.expressions.Window

val result = cohortBaseDf.groupBy("source_time_value", "compare_time_value", "time_level", "geo_value", "geo_level")
  .agg(count("customer").as("compare_kpi_value"),
    sum("net_gmv").as("compare_net_gmv"),
    sum("net_rides").as("compare_net_rides"),
    sum("discount").as("compare_discount"))
  // The self-row (source == compare) carries the full cohort's values, so the max
  // over the source partition is expected to recover the source baseline on every row.
  .withColumn("source_kpi_value", max("compare_kpi_value") over Window.partitionBy("source_time_value", "geo_value"))
  .withColumn("source_net_gmv", max("compare_net_gmv") over Window.partitionBy("source_time_value", "geo_value"))
  .withColumn("source_net_rides", max("compare_net_rides") over Window.partitionBy("source_time_value", "geo_value"))
  .withColumn("source_discount", max("compare_discount") over Window.partitionBy("source_time_value", "geo_value"))
  .select("source_time_value", "compare_time_value", "time_level", "geo_level", "geo_value", "source_kpi_value", "compare_kpi_value",
    "source_net_gmv", "compare_net_gmv", "source_net_rides", "compare_net_rides", "source_discount", "compare_discount")

result.show(false)

Final Customer Cohort KPI
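To read the result: for, say, source_time_value 20200101 in Bangalore, the row with compare_time_value 20200101 gives the cohort’s own size and KPIs, while the row with compare_time_value 20200201 gives the subset that reappeared in February. If the January cohort had 3 customers and 2 of them rode again in February, source_kpi_value is 3, compare_kpi_value is 2, and month-1 retention is 2/3 (the numbers here are illustrative).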

Conclusion:

The above approach is not the conventional one of joining previous-appeared-week customers with next-appeared-week customers and pivoting to extract KPIs; instead, it derives the (previous, next) period pairs directly with combinations and window functions.

I would like to thank my teammates Pramod N, Pubali Bhaduri, Jaganreddyguda, and the Rapido data platform team for their valuable contributions and insights.

You can also reach out to me via LinkedIn or at vikash.singh@rapido.bike in case you are exploring roles in our Data team.

Happy coding and learning :)
