Google Analytics Data Processing Using PySpark DataFrames and AWS EMR

Fast data processing for understanding customers

Chewy
Chewy Innovation Blog
Sep 22, 2020

By Yan Yang, Data Scientist @ Chewy

One of the biggest challenges for an e-commerce company is processing customer-generated session data. Here at Chewy’s Recommendations and Personalization Management team, we are constantly exploring ways to process session data to improve our website and better our customers’ shopping experience.

PySpark running on Amazon Web Services' Elastic MapReduce (AWS EMR) is a tool I often use for this kind of high-volume data processing. In this blog post, I walk through a step-by-step example from a past project to showcase this tool and discuss how to use it effectively. This post is for people who are already familiar with PySpark and basic AWS services such as S3, but who want to learn more about running PySpark on EMR and about a useful PySpark feature, DataFrames.

What is the problem?

In this example, I am faced with processing a year’s worth of Google Analytics session data. The processed data is for training a machine learning model that predicts whether a customer is going to make a purchase by the end of her session. The purpose of making this prediction is to proactively help the customer. If the probability of making a purchase is low, perhaps the customer is having trouble finding the products she needs and Chewy can make more helpful recommendations; if the probability is high, Chewy can make sure the user can access a shortcut to checkout. In this blog post, I am going to focus on the data processing part (I won’t be talking about the model itself). But interested readers can look at a paper by Baumann A. et al. for a general introduction to the problem of predicting purchase probability.

My hypothesis for the model is that three types of features are important to the model: the query part of the URL, event actions, and user browsing patterns. Whenever a customer inputs a search term into a query, I hypothesize that she has a strong inclination to purchase a particular product. If she also looks at the second and the third pages of the returned results, this inclination is even stronger. Event actions are also informative regarding the customer’s intention. Two of the most revealing actions are adding a product to the shopping cart and removing a product from the shopping cart. User browsing patterns are the way a customer browses the webpages. Is she browsing the pages linearly and never going back to a previous page, or is she browsing between two products repeatedly? The paper by Baumann A. et al. indicates that there is a strong association between the type of behaviors and a user’s intention to buy.

These three types of features correspond to three processing tasks. For the query part of the URL, I extract three features: 1) whether the URL contains a query, 2) how many equal signs are in the query, and 3) how many numbers are in the query. For example, in the URL (not an actual URL, just for demonstration purposes):

The query part of this example is `rh=number288anothernumber335thirdnumber1539equal=3`. This first processing task would return three features:

  • ‘is_query’ = 1: this URL contains a query.
  • ‘num_equals’ = 2: there are two equal signs in the query string.
  • ‘num_numbers’ = 4: there are four numbers in the query string, namely 288, 335, 1539 and 3.
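
The three features can be reproduced with a few lines of standard-library Python. This is only a sketch: the function name `extract_query_features` and the host and path of `example_url` are illustrative assumptions, and only the query string comes from the example above.

```python
import re
from urllib.parse import urlparse


def extract_query_features(url):
    """Return (is_query, num_equals, num_numbers) for one URL."""
    query = urlparse(url).query
    is_query = 1 if query else 0
    num_equals = query.count("=")
    # Each maximal run of digits counts as one number.
    num_numbers = len(re.findall(r"\d+", query))
    return is_query, num_equals, num_numbers


# Hypothetical URL wrapped around the query string from the example.
example_url = (
    "https://www.example.com/s"
    "?rh=number288anothernumber335thirdnumber1539equal=3"
)
print(extract_query_features(example_url))  # (1, 2, 4)
```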

The second processing task is to create a boolean list of the event actions that happened on a page. The event actions are defined in an event action list, which contains events such as `add_to_cart`, recorded when a customer clicks the “Add to Cart” button on the page. There are five events in the list, and the task returns a boolean list in which a one means the event happened and a zero means it did not. For example, if the customer added a product to the cart on this page but subsequently removed it, there are two events, `add_to_cart` and `remove_from_cart`. If no other events happened on this page, the second processing task produces the boolean list `[1, 1, 0, 0, 0]`, where the two ones correspond to the two events that happened and the zeros to the events that did not.
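
As a plain-Python sketch of this encoding: only `add_to_cart` and `remove_from_cart` are named in this post, so the other three event names are hypothetical placeholders. The sketch also assumes the raw events arrive as one comma-separated string, which may not match the real column format.

```python
# Only the first two names come from the post; the remaining three are
# hypothetical placeholders for the other events in the list.
EVENT_ACTIONS = [
    "add_to_cart",
    "remove_from_cart",
    "checkout",
    "search",
    "autoship_signup",
]


def encode_event_actions(raw_events, event_actions=EVENT_ACTIONS):
    """Map a comma-separated event string to a list of 0/1 flags."""
    seen = {name.strip() for name in (raw_events or "").split(",")}
    return [1 if action in seen else 0 for action in event_actions]


print(encode_event_actions("add_to_cart,remove_from_cart"))  # [1, 1, 0, 0, 0]
```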

The third processing task is more involved. For each sequence of web pages in a single session, this task is to create a graph and calculate graph metrics, such as the number of simple cycles, at each step of the sequence. These graph metrics summarize the browsing patterns numerically. For example, suppose a customer visited site A → site C → site D → site A → site B. I represent this session sequence as a graph in Figure 1. Each node represents a site the customer visited during this session, and each directed edge represents the visiting direction. Figure 1 also labels one of the graph metrics, called “cycle”. This metric counts the number of simple cycles so far in the sequence. The simple cycle appears at the fourth step in the example, when the customer revisits site A, so the cycle is A to C to D to A.

Figure 1. An example of processing task 3. The visiting sequence is displayed in the row labelled “Path”. “Cycle” denotes the number of simple cycles so far in the sequence. There is only one simple cycle in the sequence that happens at the fourth step when the customer revisited site A. Each node represents a site the customer visited during this session, and each directed edge represents the visiting direction.

What does the raw data look like?

I stored the raw data as compressed CSV files in an S3 bucket named “s3://my_bucket/raw-data”. There are several million rows in total, stored under multiple S3 keys. Each row has five columns: session ID, page URL, page title, the sequence of the page in the session, and all event actions on this page. The table below describes in detail what these columns look like and gives an example of each column.

Since I have to process several million rows, PySpark is useful for speeding up the processing tasks. I chose an AWS EMR cluster for executing PySpark because AWS describes its Spark runtime on EMR as optimized to run up to 3x faster than standard Apache Spark. EMR also has the added benefit of the AWS ecosystem, including direct access to S3 buckets, which can make organizing your data much easier. AWS EMR jobs can be launched through the console, the AWS CLI, or the AWS Python SDK, the Boto3 library. I usually prefer launching jobs through Boto3, because this is the easiest way to automate a job pipeline. Boto3 has detailed documentation on the EMR client, including methods to launch and terminate jobs. The overall steps to launch EMR jobs via Boto3 are in the following sections.

Step 1. Modularize PySpark code in a self-contained folder

Each EMR job uses the PySpark code stored in a specific S3 bucket to process data. For this to happen, I have to upload the PySpark code from my project root folder to my project S3 bucket. My common practice is to keep all PySpark-related scripts in a self-contained folder, `src/emr`, under my project root directory. Figure 2 shows my typical project structure. By separating my launch script from my PySpark scripts, I can automate the S3 upload easily without affecting the rest of the code in the project directory. File upload is handled through the Boto3 upload_file method.
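
The upload can be sketched as a small helper that walks the folder and calls `upload_file` once per file. The helper names `list_upload_pairs` and `upload_folder`, and the idea of passing the S3 client in as an argument, are assumptions made for illustration rather than the original script.

```python
import os


def list_upload_pairs(local_dir, key_prefix):
    """Yield (local_path, s3_key) for every file under local_dir."""
    prefix = key_prefix.strip("/")
    for root, _dirs, files in os.walk(local_dir):
        for name in sorted(files):
            local_path = os.path.join(root, name)
            relative = os.path.relpath(local_path, local_dir)
            # S3 keys always use forward slashes, whatever the local OS.
            key_parts = ([prefix] if prefix else []) + relative.split(os.sep)
            yield local_path, "/".join(key_parts)


def upload_folder(s3_client, local_dir, bucket, key_prefix):
    """Upload every file under local_dir with the client's upload_file."""
    uploaded = []
    for local_path, key in list_upload_pairs(local_dir, key_prefix):
        s3_client.upload_file(local_path, bucket, key)
        uploaded.append(key)
    return uploaded


# Typical call (bucket name and prefix are placeholders):
#   import boto3
#   upload_folder(boto3.client("s3"), "src/emr", "my_bucket", "src")
```

Passing the client in keeps the helper easy to exercise with a stub before pointing it at a real bucket.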

Figure 2. An example folder structure that separates the driver script of launching EMR jobs (launch_emr.py) from the actual PySpark code that runs data processing (the code in the folder src/emr).

Version 5.29.0 of EMR supports Python version 3.4. My PySpark code usually requires additional Python libraries to be installed on the worker nodes. To install them, I add a `requirement.sh` file to my `src/emr` folder and upload it along with the other PySpark scripts to the aforementioned S3 bucket. This `requirement.sh` file handles the bootstrap installation of the additional Python modules used by the PySpark code. In the example described in this blog post, `requirement.sh` looks like Figure 3.

Figure 3. An example requirement.sh

The shell script first installs pip and then uses pip to install all Python libraries I need to the worker node for this project.

Step 2. Set up job launch environment and compose job launch code

Boto3 provides comprehensive documentation on the job launch method. The procedure is to first create an EMR service client through boto3 Session object:

The AWS access key ID and AWS secret access key can be created through AWS IAM. I followed the procedure outlined in the AWS documentation to obtain them. I recommend not writing these keys in the code directly, so that they are not accidentally revealed through a code commit. One practice I follow is to store my keys in a local, password-protected file and access them through a file read or environment variables. In the above code, I also need to provide my region, for example, “us-east-1”.

The second part of the driver script is to create job flow through the client:

The code above kicks off an EMR job. I have omitted the parameters with “…”. The Boto3 documentation lists all possible parameters. I will mention three of them in this blog post. The first one, `Instances=`, describes the Amazon EC2 instances of the job flow. It takes a dictionary, and one of the keys of this dictionary is “InstanceGroups”, which defines the details of the instance groups in a cluster. Figure 4 displays an example.

Figure 4. An example instance group
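
As a rough illustration of the shape this dictionary takes, here is a sketch with one master group and one core group. The instance types, counts, group names and the `build_instances` helper are placeholders, not the values used in the original project.

```python
def build_instances(master_type="m5.xlarge", core_type="m5.xlarge", core_count=2):
    """Return a value for the Instances parameter of run_job_flow."""
    return {
        "InstanceGroups": [
            {
                "Name": "Master node",
                "Market": "ON_DEMAND",
                "InstanceRole": "MASTER",
                "InstanceType": master_type,
                "InstanceCount": 1,
            },
            {
                "Name": "Core nodes",
                "Market": "ON_DEMAND",
                "InstanceRole": "CORE",
                "InstanceType": core_type,
                "InstanceCount": core_count,
            },
        ],
        # Assumption: keep the cluster alive after the last step so the
        # launch script decides when to terminate it.
        "KeepJobFlowAliveWhenNoSteps": True,
        "TerminationProtected": False,
    }
```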

The second parameter I want to mention is “Steps=”. It is a list of StepConfig items to be executed by the job flow. Figure 5 shows an example. In this example, “Args” in “HadoopJarStep” is where I specify the spark-submit command line in list format. The example is equivalent to “spark-submit --deploy-mode cluster s3://my_bucket/src/process_data.py”, where the S3 key “s3://my_bucket/src/process_data.py” contains the PySpark code I uploaded in Step 1.

Figure 5. An example of Steps parameter
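
A sketch of one such step follows. The step name, the `ActionOnFailure` choice and the `build_spark_step` helper are assumptions; `command-runner.jar` is the jar EMR provides for running commands such as spark-submit.

```python
def build_spark_step(script_s3_path, name="Process session data"):
    """Return one StepConfig that runs spark-submit in cluster mode."""
    return {
        "Name": name,
        "ActionOnFailure": "CONTINUE",
        "HadoopJarStep": {
            "Jar": "command-runner.jar",
            # The command line is passed as a list, one token per element.
            "Args": ["spark-submit", "--deploy-mode", "cluster", script_s3_path],
        },
    }


steps = [build_spark_step("s3://my_bucket/src/process_data.py")]
print(" ".join(steps[0]["HadoopJarStep"]["Args"]))
# spark-submit --deploy-mode cluster s3://my_bucket/src/process_data.py
```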

The third parameter is “BootstrapActions=”. This parameter specifies the location of the `requirement.sh` script from Step 1, which installs all necessary libraries. Figure 6 displays an example. In this example, the `requirement.sh` file is first uploaded to an S3 bucket named “my_bucket” with the prefix “src”. During the job launch, `requirement.sh` executes first and installs all the libraries shown in Figure 3.

Figure 6. An example of the BootstrapActions parameter
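
The parameter is a list of small dictionaries, one per script. The sketch below assumes a single script with no arguments; the action name and the `build_bootstrap_actions` helper are illustrative.

```python
def build_bootstrap_actions(script_s3_path):
    """Return a BootstrapActions list that runs one shell script."""
    return [
        {
            "Name": "Install Python libraries",
            "ScriptBootstrapAction": {"Path": script_s3_path, "Args": []},
        }
    ]


bootstrap_actions = build_bootstrap_actions("s3://my_bucket/src/requirement.sh")
```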

After the EMR client performs `run_job_flow` (reproduced in Line 1 of Figure 7), the cluster ID can be obtained with get(“JobFlowId”). In Line 2, I use this cluster ID to find the pending step in the cluster, which is the PySpark code I just submitted. Line 4 returns the status of the job, and Lines 5 to 9 make the command window wait until the submitted job finishes. The wait is often necessary in an automated data pipeline so that the next step in the pipeline executes only after the data processing step completes. The last line terminates the cluster. Once the script is launched, an EMR job is created and accessible in the AWS EMR console.

Figure 7. The complete code for submitting EMR jobs
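
The launch-and-wait logic can be sketched roughly as follows. The function name `run_and_wait`, the polling interval and the set of states treated as finished are assumptions, and the line numbers quoted in the text refer to the original figure, not to this sketch.

```python
import time

# Step states after which there is nothing left to wait for.
FINISHED_STATES = {"COMPLETED", "CANCELLED", "FAILED", "INTERRUPTED"}


def run_and_wait(emr_client, job_flow_kwargs, poll_seconds=30):
    """Launch a job flow, wait for its step to finish, then terminate."""
    cluster_id = emr_client.run_job_flow(**job_flow_kwargs).get("JobFlowId")

    # The step submitted with the job flow starts out in the PENDING state.
    pending = emr_client.list_steps(ClusterId=cluster_id, StepStates=["PENDING"])
    step_id = pending["Steps"][0]["Id"]

    state = "PENDING"
    while state not in FINISHED_STATES:
        time.sleep(poll_seconds)
        step = emr_client.describe_step(ClusterId=cluster_id, StepId=step_id)
        state = step["Step"]["Status"]["State"]

    emr_client.terminate_job_flows(JobFlowIds=[cluster_id])
    return cluster_id, state
```

Boto3 also exposes a `step_complete` waiter through `get_waiter` on the EMR client, which may be a simpler alternative to a hand-written loop when no custom handling of failed steps is needed.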

Example PySpark code

In the previous section, I showed the process of submitting a PySpark job to EMR. One thing I omitted is the PySpark code itself. In Figure 5 from the previous section, the PySpark code is the script at the location “s3://my_bucket/src/process_data.py”. In this section, I am going to walk through the three processing tasks in this script: 1) three query features, 2) event actions, and 3) graph metrics. The entire processing procedure is done with PySpark DataFrames.

PySpark DataFrames

The DataFrame API was introduced in Spark 1.3. The concept is analogous to a Pandas DataFrame in Python, but a PySpark DataFrame is built on top of RDDs and inherits Spark SQL’s optimized execution engine. DataFrames are conceptually equivalent to tables in a relational database, and they can be created from a wide array of sources such as structured data files, tables in Hive, external databases, or existing RDDs. A DataFrame object is organized into named columns. In addition, a DataFrame shares characteristics with an RDD: it is immutable, it is lazily evaluated, and it is distributed. As such, a DataFrame can handle petabytes of data.

Spark session setup and data input

Set up the Spark session

I am using PySpark version 2.4.5. This version supports `SparkSession`, part of the DataFrame API that lets me issue Spark SQL queries without explicitly creating a SparkContext or SQLContext. The session builder in Line 3 calls `getOrCreate()`, which instantiates a SparkSession object only if one does not already exist. The benefit of using this call instead of constructing a SparkSession directly is that only one SparkContext can be active per JVM, so the call is useful when Spark applications need to share a context. Line 4 calls a method that I created to delete previously processed data if it exists. Figure 8 below defines this method, which mainly uses Boto3’s list_objects and delete_object to delete all keys in a bucket. Line 5 creates an input PySpark DataFrame from the compressed CSV files, which are read from “s3://my_bucket/raw-data”. The schema is specified rather than inferred; it is shown in Figure 9.

Figure 8. The method definition for delete_data
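
A cleanup helper along these lines could look like the sketch below. The signature, the `prefix` argument and the returned count are assumptions; only the use of `list_objects` and `delete_object` comes from the description.

```python
def delete_data(s3_client, bucket, prefix=""):
    """Delete every key under prefix and return how many were removed."""
    deleted = 0
    while True:
        # list_objects returns at most 1000 keys per call, so list again
        # after each batch of deletions until nothing is left.
        response = s3_client.list_objects(Bucket=bucket, Prefix=prefix)
        contents = response.get("Contents", [])
        if not contents:
            return deleted
        for obj in contents:
            s3_client.delete_object(Bucket=bucket, Key=obj["Key"])
            deleted += 1


# Typical call (bucket and prefix are placeholders):
#   import boto3
#   delete_data(boto3.client("s3"), "my_bucket", "processed-data/")
```
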
Figure 9. Data schema for the input

Processing task 1: URL features

The goal of processing task 1 is to extract three features from the URL: 1) whether the URL contains a query, 2) how many equal signs are in the query, and 3) how many numbers are in the query. Figure 10 displays the `process_url` method, which takes the original DataFrame, the Spark session and the URL column name and outputs the three extracted features. Lines 7 to 11 define and register a user-defined function that extracts the three features. A user-defined function turns a regular Python function into a DataFrame function. Line 9 specifies the return type of the function, which is an array of integers. Lines 13 to 20 define the regular Python function for feature extraction. It first uses a URL parser to get the query part of the URL and then returns the three features by checking whether the query exists, how many equal signs were found, and how many numbers were matched. In Line 23, `.withColumn` uses the user-defined function to create a DataFrame column called “all_features”, which holds the three features for each row. Line 24 caches “all_features”. Because DataFrame functions are lazily executed, any task failure would otherwise restart the calculation of “all_features” from the beginning. By caching the results before proceeding, a restart can reuse the “all_features” values already cached. Lines 25 to 27 create a column for each feature.

Figure 10. The method definition for processing URL

Processing task 2: event action features

Event actions are converted to boolean columns. For example, if the events “add to cart” and “remove from cart” happened on a page and no other events happened, then the boolean columns have ones for these two events and zeros for all other events. In Figure 11, Line 8 defines a list of all event action booleans. Line 9 uses the col object to specify the event column, and Line 10 matches it against one of the event types in event_actions. Line 11 gives the result the column name “event_{event_type}”. Line 14 applies the matched columns to the DataFrame and returns the results. For example, if the customer added a product to the cart on this page but subsequently removed it, there are two events, `add_to_cart` and `remove_from_cart`. If no other events happened on this page, the method would create five columns, of which only event_add_to_cart and event_remove_from_cart have ones and all other columns have zeros.

Figure 11. The method definition for processing event actions

Processing task 3: Graph features

Computing the graph features is the most complicated of the three processing tasks. It consists of two parts. The first part is to create Python functions that convert a page visiting sequence into graphs and calculate graph metrics. The second part is to convert these functions and apply them to a PySpark DataFrame.

Part I. Convert a page visiting sequence into graphs and calculate graph metrics

The `create_graph_for_each_step` method in Figure 12 constructs a directed graph for each step in a sequence. For example, if the page sequence is [“page1”, “page2”, “page3”], then the method creates a graph for “page1”, for “page1” -> “page2”, and for “page1” -> “page2” -> “page3”. For each graph, the method then uses `calculate_graph_measures` to get a dictionary of graph metrics. To construct the directed graph in the `build_digraph` method on Line 26, I used the Python networkx library.

Figure 12. The method definitions for converting a visiting sequence into graphs and calculating graph measures

Figure 13 displays the `calculate_graph_measures` method. I originally calculated thirteen different graph metrics. However, for the sake of brevity, I list only the “simple cycles” calculation here. Interested readers can look at the paper by Baumann A. et al. for the other metrics.

Figure 13. The method definition for graph measure calculations
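
The three methods can be sketched with networkx as follows. The function names follow the text, but the bodies are a reconstruction under assumptions, not the original code, and only the simple-cycle count is included.

```python
import networkx as nx


def build_digraph(pages):
    """Directed graph with one edge per consecutive pair of pages."""
    graph = nx.DiGraph()
    graph.add_nodes_from(pages)
    graph.add_edges_from(zip(pages, pages[1:]))
    return graph


def calculate_graph_measures(graph):
    """Return graph metrics; only the simple-cycle count is sketched."""
    return {"simple_cycles": sum(1 for _ in nx.simple_cycles(graph))}


def create_graph_for_each_step(pages):
    """Compute the metrics for every prefix of the visiting sequence."""
    return [
        calculate_graph_measures(build_digraph(pages[: step + 1]))
        for step in range(len(pages))
    ]


# The session from Figure 1: A -> C -> D -> A -> B.
path = ["A", "C", "D", "A", "B"]
print([m["simple_cycles"] for m in create_graph_for_each_step(path)])
# [0, 0, 0, 1, 1]
```

Rebuilding the graph for every prefix keeps the sketch short; for long sessions it may be cheaper to grow a single graph one edge at a time.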

Part II. Convert and apply graph metrics to a PySpark DataFrame

In the original input DataFrame, each page is a row. I need to apply the graph methods in Part I to a session, instead of a page, so I need to aggregate all pages belonging to the same session in the DataFrame into one row. For example, if the original DataFrame looks like this:

session id    page sequence    page name
1             1                a
1             2                b

I need to convert it into this:

session id    page name
1             [a, b]

I accomplish this with the method `create_session_based_sequence` defined in Figure 14. Lines 14 to 16 demonstrate how to use SQL-like commands on a DataFrame. In Line 14, the DataFrame selects the columns for session ID, sequence and page name. Line 15 groups the rows by session ID, and Line 16 aggregates rows with the same session ID into a list of (sequence, page name) tuples and stores the list in a column called “seq_and_page_name_tuple”. The method then invokes the `create_sort_by_first_udf` method, also shown in Figure 14, to sort the tuples in the list by sequence number from lowest to highest and return the page names. This step puts the page names in the correct visit order for the session.

Figure 14. The method definition to aggregate sessions into a single row

Finally, in Figure 15, the graph measures are calculated for the DataFrame.

Figure 15. Method definition to calculate graph measures in DataFrame

Ready to try PySpark on AWS EMR?

According to AWS, running Spark on EMR costs less than half as much as traditional on-premises solutions and is over 3 times faster than standard Apache Spark. In this blog post, I shared one of my data processing examples using PySpark DataFrames. If you want to use AWS EMR in your project, the Amazon EMR FAQs are a good place to start. Follow the information and tips in this post to add another tool to your data processing toolbox today.

If you have any questions about careers at Chewy, please visit https://www.chewy.com/jobs
