Predict User Churn @Sparkify
This project analyzes user log data from Sparkify to build a model that predicts churn.
1. Introduction
Sparkify is a fictitious music streaming company. They make money from two sources:
- A free tier supported by advertising
- A premium tier with paid subscription
Users from both the free tier and the premium tier are revenue generators. Thus churn is defined as the event of a user, from either tier, canceling his/her service. Sparkify needs ways to predict which users are likely to churn before it actually happens so they can intervene ahead of time. Options to intervene include offering a longer trial period, or offering more credits/discounts for converting to the premium tier or renewing for a longer term.
The full dataset is 12GB and is hosted in an AWS S3 bucket. To analyze data this big, Spark is an ideal choice: it is built for large-scale data processing and lets you spread data and computations across a cluster, whose nodes (computers) run the processing in parallel.
A good practice for analyzing big data is to take a small sample dataset (in this case, the 128MB sample provided by Udacity) and analyze it on a local machine to explore relevant features and develop a working model. Then similar steps, with cleaner/refactored code, are used to develop a final working model for the full dataset (12GB) on the cloud, such as on an AWS EMR cluster. The detailed analysis can be found at my Github repo.
PySpark is the Python API for Spark. I chose to analyze the sample dataset first on my local machine (Windows 10) in a Jupyter notebook. I followed Chang's instructions to install PySpark on my Windows 10 PC. It took me quite a while to install and run PySpark locally in Jupyter notebook on Windows. Given the effort, I should have tried installing pyspark-notebook with Docker instead.
2. Analyze sample dataset on local machine
2.1 Data preparation
2.1.1 Define churn
Overall, I followed the CRISP-DM framework. In terms of business understanding, the goal of this project is to analyze user log data from Sparkify to build a model to predict user churn and to find out which factors contribute to user churn the most. Churn is defined as the event of a user, from either the free tier or the premium tier, canceling his/her service. In the dataset, the event corresponds to Cancellation Confirmation in the page column.
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
flag_cancellation_event = udf(lambda x: 1 if x == "Cancellation Confirmation" else 0, IntegerType())
df = df.withColumn("churn", flag_cancellation_event("page"))
After understanding the business problems to solve, the next part is about data analysis. The overall structure is summarized in the graph below.
2.1.2 Clean data
The schema and first row of the data are below.
Data preparation includes data cleaning, feature engineering and feature selection. The sample dataset has 286,500 rows. After filtering out rows with an empty userId, there are 278,154 rows left.
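A minimal sketch of that filtering step, assuming the empty userId values are stored as empty strings:
# drop log rows that are not tied to a known user
df = df.filter(df.userId != "")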
2.1.3 Create additional features
The next step is to create additional features that might be helpful for building the model. Thinking of what might help predict churn, the first thing that came to my mind is how long a user has stayed with Sparkify.
df = df.withColumn("daysSinceRegistration", F.ceil((df.ts - df.registration)/(1000*60*60*24)))
The next feature comes from userAgent, which records the hardware/system a user used to access Sparkify. I categorized userAgent into four broad categories: Windows, Apple, Linux, and other (which turned out to contain no users).
from pyspark.sql.functions import when, col
df = df.withColumn('system',
    when(col("userAgent").contains('Windows'), 'Windows')
    .when((col("userAgent").contains('Macintosh')) | (col("userAgent").contains('iPhone')) | (col("userAgent").contains('iPad')), 'Apple')
    .when(col("userAgent").contains('Linux'), 'Linux')
    .otherwise('other'))
Theoretically, a user could churn multiple times, use various forms of userAgent, or switch between ‘paid’ and ‘free’ levels. To simplify the analysis, only the latest churn, gender, userAgent, and level of each user were used.
To get the latest, I created a window function to rank each user’s activity from latest to earliest. Both columns — ts and itemInSession — have to be used to avoid ties. This is critical because when grouping by each individual user to come up with one row holding all of that user’s characteristics/features, we want one and only one row per user. A tie would give us more than one row for a user.
from pyspark.sql import Window
from pyspark.sql.functions import desc, rank
windowval = Window.partitionBy("userId").orderBy(desc("ts"), desc("itemInSession"))
df = df.withColumn("rank", rank().over(windowval))
Next, I calculated each user’s:
- songHours — total length/duration of songs listened, in hours
- n_NextSong — total number of songs listened
# only NextSong page events correspond to actual listening
df2 = df.filter(df.page == 'NextSong').groupBy("userId") \
    .agg((F.sum('length')/(60*60)).alias('songHours'), F.count('userId').alias('n_NextSong'))
Then I calculated each user’s:
- totalDays — total number of days from registration date to latest activity date
- activeDaysPct — actual number of active days divided by totalDays (active days are days when a user used Sparkify)
- session_perActiveDay — total number of sessions divided by the number of active days
- item_perSession — total number of page items divided by the total number of sessions
df3 = df.groupBy("userId") \
.agg(F.max('daysSinceRegistration').alias('totalDays'), F.count('itemInsession').alias('item_ct'), \
F.countDistinct('sessionId').alias('session_ct'), F.countDistinct('date').alias('activeDays'))df3 = df3.filter(df3.totalDays >= 1)
df3 = df3.withColumn('activeDaysPct', df3.activeDays/df3.totalDays) \
.withColumn('session_perActiveDay', df3.session_ct/df3.activeDays) \
.withColumn('item_perSession', df3.item_ct/df3.session_ct) \
.drop('item_ct', 'session_ct')
Next, for each user, I calculated the number of views of each page type per song. Then I dropped the raw page-view counts for each user.
df4 = df.filter(~df.page.contains('Cancel')).groupby("userId").pivot("page").count().fillna(0)
pages_column_list = list(map(lambda x: 'n_' + x.replace(" ", ""), df4.columns[1:]))
new_columns_list = pages_column_list.copy()
new_columns_list.insert(0, df4.columns[0])
df4 = df4.toDF(*new_columns_list)
for col_name in df4.columns:
    if col_name not in ('userId', 'n_NextSong'):
        df4 = df4.withColumn(col_name + '_perSong', round(col(col_name)/col("n_NextSong"), 5))
df4 = df4.drop(*pages_column_list)
Next, I joined all engineered data frames together and created two more numeric features.
df5 = df1.join(df2, on='userId').join(df3, on='userId').join(df4, on='userId')
df5 = df5.withColumn('songs_perActiveDay', round(df5.n_NextSong/df5.activeDays, 1))
df5 = df5.withColumn('songHours_perActiveDay', round(df5.songHours/df5.activeDays, 2))
This df5 has all the features that I think are helpful to build the model. At this stage, the data looks like below.
2.1.4 Collinearity
Of course, not all features will stay. For categorical features, I kept gender, system and level. For numeric features, I plotted feature correlations and dropped those features that have a correlation of .8 or higher with another feature.
Based on the correlation matrix graph, I dropped activeDays, n_NextSong, songHours, songs_perActiveDay, and item_perSession.
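The plotting code is not shown in the post; a minimal sketch of how such a correlation matrix can be produced, assuming the kept numeric feature names are collected in a list numeric_cols and that seaborn/matplotlib are available:
import matplotlib.pyplot as plt
import seaborn as sns

# Spark -> pandas for plotting; the per-user table is small enough to collect
corr = df5.select(numeric_cols).toPandas().corr()
plt.figure(figsize=(12, 10))
sns.heatmap(corr, annot=True, fmt=".2f", cmap="coolwarm")
plt.show()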
2.1.5 Exploratory Data Analysis
The countplots below show the number of users who churned vs. did not churn across the categorical features — gender, system and level. It seems Linux users tend to churn more often than Apple or Windows users. This can be deceptive, though; later, the final model will tell us more.
The boxplots of all numeric features between churn and not churn below show:
● Compared to not-churn users, churn users have, on average, higher ThumbsDown_perSong, session_perActiveDay, RollAdvert_perSong, Upgrade_perSong, Home_perSong, Settings_perSong and SaveSettings_perSong.
● Compared to not-churn users, churn users have, on average, lower songHours_perActiveDay, About_perSong, totalDays, AddFriend_perSong and ThumbsUp_perSong.
2.1.6 StringIndexer, OneHotEncoder, VectorAssembler and possibly Scaler
Numeric data is needed for modeling with Spark. The majority of the features are already numeric. However, there are also 3 categorical features — gender, system and level. I will use StringIndexer and OneHotEncoder to convert them into numeric features and use VectorAssembler to assemble the newly created numeric features into vectors.
After converting categorical features into vectors, I need to decide whether to scale the numeric features. For a Logistic Regression classifier, scaling numeric features makes comparing feature importance much easier, because scaled features are all on the same scale. On the other hand, Random Forest and Gradient-Boosted Trees are tree-based classifiers, and tree-based classifiers do not require feature scaling. For this project, I will compare model metrics before doing any feature scaling. If the Logistic Regression model’s metrics are no better than those of the other two models, then feature scaling is not needed.
The stages of the data transformation pipeline are:
[StringIndexer_fb6916ca0d2c,
OneHotEncoderEstimator_6904f39c0afc,
StringIndexer_21434ff34d73,
OneHotEncoderEstimator_7dbd51fd24cd,
StringIndexer_57305b26c6c5,
OneHotEncoderEstimator_4932306fdaa7,
StringIndexer_01af795c385b,
VectorAssembler_20bbf84088e2]
The code to create the stages is below:
stages = []
for c in categorical_cols:  # 'c' rather than 'col', to avoid shadowing pyspark.sql.functions.col
    stringIndexer = StringIndexer(inputCol=c, outputCol=c + "Index")
    encoder = OneHotEncoderEstimator(inputCols=[c + "Index"], outputCols=[c + "Vec"], dropLast=False)
    stages += [stringIndexer, encoder]
label_indexer = StringIndexer(inputCol='churn', outputCol='label')
stages += [label_indexer]
numericInputs = numeric_cols
categoricalInputs = [c + "Vec" for c in categorical_cols]
assembler = VectorAssembler(inputCols=numericInputs + categoricalInputs, outputCol='features')
stages += [assembler]
And finally, I created a pipeline, and transformed df5 to the desired vector format.
pipeline = Pipeline(stages=stages)
df6 = pipeline.fit(df5).transform(df5)
Below is a screenshot of the last several columns. Categorical columns were indexed and one-hot encoded correctly. The features column is indeed in vector format. Only the features and label columns will be used for modeling.
df7 = df6.select('label', 'features')
The next step is to add a classifier. If more than one classifier is tested (three in our case), the same stages of StringIndexer, OneHotEncoder, and VectorAssembler will need to be performed more than once. This is not efficient. To offer more flexibility to try various classifiers without repeatedly processing the data, my pipeline will exclude the classifier part.
2.2 Modeling using sample dataset
The first step of modeling is to split the data into train and test datasets. In our case, the data is heavily imbalanced: among all 225 users, only 52 churned, a churn rate of 52/(52 + 173) = 23.1%. I decided to use stratified sampling to make sure the train/test datasets have churn/not-churn ratios similar to that of the whole dataset. I chose to use 80% of the data (191 users) for training and the remaining 20% (34 users) for testing. After stratified sampling, the churn ratio in the training dataset is 44/(147 + 44) = 23.0%, very close to that of the whole sample dataset.
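The split code is not shown in the post; a minimal sketch using Spark’s stratified sampleBy (the per-label fractions and the left_anti join are assumptions):
# sample 80% of users from each label value for training
train = df6.stat.sampleBy('label', fractions={0.0: 0.8, 1.0: 0.8}, seed=42)
test = df6.join(train, on='userId', how='left_anti')  # everyone not sampled into train
train = train.select('label', 'features')
test = test.select('label', 'features')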
Because of the imbalanced nature of sample data, I selected F1 score as the main metric to compare model performances. F1 score keeps a balance between precision and recall. For F1 to be high, both precision and recall need to be high. In addition to F1 score, I also considered accuracy, precision, recall, and areaUnderROC.
F1 = 2/(1/precision + 1/recall)
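For example, with precision = 0.806964 and recall = 0.535678 (the GBTClassifier training metrics from section 3.2), F1 = 2/(1/0.806964 + 1/0.535678) ≈ 0.644, matching the table there.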
I chose Logistic Regression as my first classifier. Below are the parameter grid and CrossValidator. The evaluator is BinaryClassificationEvaluator with areaUnderROC as the default metric.
lr = LogisticRegression()
evaluator = BinaryClassificationEvaluator()  # default metric: areaUnderROC
grid1 = ParamGridBuilder() \
    .addGrid(lr.regParam, [0, 0.01]) \
    .addGrid(lr.elasticNetParam, [0, .5]) \
    .addGrid(lr.maxIter, [50, 100]) \
    .build()
cv_lr = CrossValidator(estimator=lr,
                       estimatorParamMaps=grid1,
                       evaluator=evaluator,
                       numFolds=4,
                       seed=42)
2.3 Logistic regression model metrics on sample dataset
I used 2³ = 8 combinations of parameters and numFolds was set to 4. It took 6,169 seconds to run. Unfortunately, BinaryClassificationEvaluator does not readily provide accuracy, precision, recall or F1, so I calculated them manually (a sketch of the calculation follows the numbers below). The numbers are:
- test accuracy: 0.765
- test precision: 0.000
- test recall: 0.000
- test F1 score: 0.000
- test Area Under ROC: 0.644
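As a minimal sketch of that manual calculation, assuming a predictions DataFrame with label and prediction columns, as produced by cv_lr.fit(train).transform(test):
# count the four confusion-matrix cells
tp = predictions.filter((predictions.label == 1) & (predictions.prediction == 1)).count()
fp = predictions.filter((predictions.label == 0) & (predictions.prediction == 1)).count()
fn = predictions.filter((predictions.label == 1) & (predictions.prediction == 0)).count()
tn = predictions.filter((predictions.label == 0) & (predictions.prediction == 0)).count()
accuracy = (tp + tn) / (tp + tn + fp + fn)
precision = tp / (tp + fp) if (tp + fp) > 0 else 0.0  # guard against zero churn predictions
recall = tp / (tp + fn) if (tp + fn) > 0 else 0.0
f1 = 2 * precision * recall / (precision + recall) if (precision + recall) > 0 else 0.0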
This logistic regression model did horribly on precision, recall and F1 for the test dataset. The accuracy is no better than an educated guess: because the data is imbalanced, always betting ‘not churn’ gets about 77% of users classified correctly, which is essentially the test accuracy of 0.765. More analysis definitely needs to be done.
2.4 Refactor code and create functions
After the data exploration part is done, I created functions with refactored codes for feature engineering, train test split, modeling and calculation of model metrics. Instead of creating just one function for ETL, I chose to create two separate functions, data_processing
and data_assembler_scaler
. Between these two functions, there is an extra step to plot feature correlations. This extra step gives me more flexibility to drop any feature/column that has high correlation with other features/columns as the data structure may change in the near future.
2.5 Try three classifiers
I applied these functions to the sample dataset with three classifiers: LogisticRegression, RandomForestClassifier, and GBTClassifier.
For the sample dataset, model metrics show LogisticRegression is the worst classifier. RandomForestClassifier and GBTClassifier have similar performance on the training dataset. However, on the test dataset, RandomForestClassifier performs better and takes significantly less training time than GBTClassifier.
3. Analyze full dataset on AWS
3.1 AWS EMR instance and notebook driverMemory setup
The next part is about applying what I learned from part I (the 128MB sample dataset) to the full dataset (12GB) on AWS. I followed detailed instructions from Udacity to create, configure and launch an EMR cluster and notebook. The instance type is m5.xlarge, and the number of instances was set to 6.
During the initial setup of the notebook, I found the system default driverMemory of 2048MB was not enough for the full dataset, so I increased it to 10GB, which triggered a kernel restart. Then I had trouble installing Python packages in AWS EMR: the restart reset spark.pyspark.virtualenv.enabled to false, and install_pypi_package can only be called when spark.pyspark.virtualenv.enabled is set to true. Thanks to Udacity tutor Survesh for suggesting configuring all parameters in one code cell. It worked. The working code is below.
%%configure -f
{"driverMemory": "10000M",
"conf":{
"spark.pyspark.python": "python3",
"spark.pyspark.virtualenv.enabled": "true",
"spark.pyspark.virtualenv.type":"native",
"spark.pyspark.virtualenv.bin.path":"/usr/bin/virtualenv"
}
}
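With the virtualenv settings applied, packages can then be installed from within the EMR notebook, for example (the package choices here are just for illustration):
# install analysis/plotting packages into the notebook's virtualenv
sc.install_pypi_package("pandas")
sc.install_pypi_package("matplotlib")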
At this stage, I was ready to apply all the functions from part I to the full dataset. Due to time and budget constraints, I only checked 2³ = 8 parameter combinations each for LogisticRegression and GBTClassifier and 3³ = 27 combinations for RandomForestClassifier. This still gave me reasonably good results.
# 1 LogisticRegression
lr = LogisticRegression()
grid1 = ParamGridBuilder() \
    .addGrid(lr.regParam, [0, 0.01]) \
    .addGrid(lr.elasticNetParam, [0, .5]) \
    .addGrid(lr.maxIter, [50, 100]) \
    .build()

# 2 RandomForestClassifier
rf = RandomForestClassifier()
grid2 = ParamGridBuilder() \
    .addGrid(rf.numTrees, [10, 20, 30]) \
    .addGrid(rf.maxBins, [16, 32, 48]) \
    .addGrid(rf.maxDepth, [3, 5, 10]) \
    .build()

# 3 GBTClassifier
gbt = GBTClassifier()
grid3 = ParamGridBuilder() \
    .addGrid(gbt.maxDepth, [3, 7]) \
    .addGrid(gbt.maxBins, [16, 32]) \
    .addGrid(gbt.maxIter, [10, 20]) \
    .build()
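A minimal sketch of how the three grids can then be cross-validated with the same setup (the loop itself is an assumption; the post runs each classifier through a CrossValidator as in section 2.2, reusing the evaluator and the train/test split):
# fit each (classifier, grid) pair and keep the test predictions
results = {}
for name, clf, grid in [('lr', lr, grid1), ('rf', rf, grid2), ('gbt', gbt, grid3)]:
    cv = CrossValidator(estimator=clf, estimatorParamMaps=grid,
                        evaluator=evaluator, numFolds=4, seed=42)
    results[name] = cv.fit(train).transform(test)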
3.2 Compare model metrics
Comparing the metrics of the three classifiers, LogisticRegression clearly has the worst performance, on both the train and test datasets, in pretty much every aspect. Compared to RandomForestClassifier, GBTClassifier has almost the same areaUnderROC but a slightly better F1 score. Overall, GBTClassifier is the best classifier in terms of F1 and areaUnderROC. GBTClassifier does take more time per parameter combination (1490/8 = 186 seconds) than RandomForestClassifier (1010/27 = 37 seconds).
TRAIN
model                     accuracy   precision  recall     F1         areaUnderROC  training time (s)
LogisticRegression        0.826737   0.745799   0.345729   0.472446   0.809884      830.472881
RandomForestClassifier    0.882950   0.933515   0.515075   0.663860   0.904375      1010.639215
GBTClassifier             0.867050   0.806964   0.535678   0.643914   0.882227      1490.684941

TEST
model                     accuracy   precision  recall     F1         areaUnderROC
LogisticRegression        0.823125   0.736730   0.340196   0.465459   0.802923
RandomForestClassifier    0.841988   0.800781   0.401961   0.535248   0.835776
GBTClassifier             0.839991   0.736177   0.456863   0.563823   0.833191
3.3 Feature importance of GBTClassifier
Since GBTClassifier performs the best for the test sample dataset, I plotted feature importance for this model. The function to extract feature importance was borrowed directly from
For decision tree models, feature importance indicates how useful each feature is in constructing the decision trees (splitting the data into different nodes).
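Since the borrowed function itself is not reproduced here, a sketch of the common pattern (an assumption, not necessarily the exact function used): read the feature names from the metadata that VectorAssembler attaches to the features column, and pair them with the model’s featureImportances vector.
import pandas as pd

def extract_feature_importance(importances, dataset, features_col="features"):
    # VectorAssembler stores each input's name and vector index in the column metadata
    attrs = dataset.schema[features_col].metadata["ml_attr"]["attrs"]
    flat = sorted((a for group in attrs.values() for a in group), key=lambda a: a["idx"])
    return pd.DataFrame({"feature": [a["name"] for a in flat],
                         "importance": [importances[a["idx"]] for a in flat]}) \
             .sort_values("importance", ascending=False)

# e.g. extract_feature_importance(best_gbt_model.featureImportances, df6)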
The feature importance graph shows that totalDays is the most important feature for predicting user churn. Most features related to page views per song are important. The top six most important features:
- totalDays — total number of days from registration date to latest activity date
- activeDaysPct — actual number of active days divided by totalDays (active days are days when a user used Sparkify)
- songHours_perActiveDay — number of listening hours per active day
- n_RollAdvert_perSong — number of RollAdvert per song
- n_Downgrade_perSong — ratio of Downgrade per song
- n_ThumbsUp_perSong — ratio of ThumbsUp per song
User systems (PC/Apple/Linux) and user gender (M/F) are the least important features.
Among the top most important features that affect user churn, four seem to have a wider spread in average values between users who churn and those who do not (see the boxplots in 2.1.5).
- Users who have stayed with Sparkify for a shorter time are more likely to churn.
- Users with fewer listening hours per active day are more likely to churn.
- Users who encountered more RollAdvert are more likely to churn.
- Users with higher ThumbsDown_perSong are more likely to churn.
Also, the majority of users who churn do so within two months after registration. This means that, during this two-month window, special attention has to be directed to all newly registered users, especially those who begin to show signs of churn as detected by the model.
The remaining three items on the list are closely related to user experience and behavior. They suggest that Sparkify may need to continuously improve its music library (i.e. add more songs, singers and genres, or provide higher-quality songs) to better cater to users’ tastes, while carefully balancing the number of RollAdvert. By doing so, hopefully users will spend more hours listening to their favorite songs and have a more positive experience with Sparkify.
4. Limitations
To simplify the analysis, the final model does not consider trends in user behavior, which may be a significant factor in predicting user churn. For a regular user, if there is a declining trend in positive user activity (songHours_perActiveDay, n_ThumbsUp_perSong, etc.), or an increasing trend in negative user experience (n_ThumbsDown_perSong, n_RollAdvert_perSong, n_Downgrade_perSong, etc.), that user may have a higher tendency to churn.
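As a hedged sketch of what one such trend feature could look like (entirely an assumption, not part of the final model): compare each user’s listening in the most recent 30 days of the log with the 30 days before that.
from pyspark.sql import functions as F

# latest timestamp in the log (epoch milliseconds)
max_ts = df.agg(F.max('ts')).first()[0]
day_ms = 1000 * 60 * 60 * 24

recent = df.filter((df.page == 'NextSong') & (df.ts > max_ts - 30 * day_ms)) \
    .groupBy('userId').agg(F.count('*').alias('songs_recent'))
prior = df.filter((df.page == 'NextSong') &
                  (df.ts <= max_ts - 30 * day_ms) & (df.ts > max_ts - 60 * day_ms)) \
    .groupBy('userId').agg(F.count('*').alias('songs_prior'))

# ratio < 1 suggests declining listening activity (users with no prior plays drop out of the join)
trend = recent.join(prior, on='userId') \
    .withColumn('listen_trend', F.col('songs_recent') / F.col('songs_prior'))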
Lastly, due to time and budget constraints, I only checked 8 parameter combinations each for LogisticRegression and GBTClassifier and 27 combinations for RandomForestClassifier. The model could be improved with more time and budget.
5. Conclusion
In this project, I analyzed user log data from Sparkify using PySpark and successfully built a gradient-boosted tree model to predict user churn. I identified the most important factors contributing to user churn and provided actionable insights on how to improve user engagement with Sparkify.