Data Transformation and Visualization on the YouTube dataset using Spark

Spark is an open source data processing engine, built to make iterative MapReduce-style workloads faster. Hadoop MapReduce, although very popular for similar workloads, writes intermediate results to disk between stages, which limits it for in-memory and iterative processing. Spark addresses this by keeping data in memory where it can, which makes iterative and interactive jobs much faster. Jobs can be written in Scala, a language that runs on the JVM and interoperates with Java, which keeps transformations concise and easy to iterate on.

In this tutorial, I will introduce the basic data transformations used in Spark and give a rudimentary introduction to Scala through examples. I will be using the Trending YouTube Video Statistics dataset hosted on Kaggle, and implementing several of the operations shown in this Notebook.

This post assumes some basic understanding of Spark data structures and the differences between them, such as RDD, DataFrame, and Dataset. This Stack Overflow post has responses that go into detail about the different structures, and is worth reviewing first.

Requirements:

For this project, I used the latest version of Spark at the time of writing (2.2.1), along with Apache Zeppelin. Zeppelin is an open source notebook that lets you write snippets of code and view their results quickly. Much like Jupyter Notebook for Python, Zeppelin supports writing Spark code in Scala, as well as SQL queries for data visualization.

With Zeppelin and Spark installed, let’s move on to the code.

Loading in the data

import org.apache.spark.sql.Row;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.functions.{lit, to_timestamp}
import java.sql.Timestamp
import java.time.temporal.ChronoUnit
import org.apache.spark.ml.feature.{RegexTokenizer, Tokenizer}
import org.apache.spark.ml.feature.NGram
import scala.collection.mutable.WrappedArray
spark.conf.set("spark.sql.crossJoin.enabled", "true")

We start by importing the packages we need. These include DataFrame and Row, utility functions like to_timestamp, and feature transformers like NGram. I also enabled crossJoin since I will be needing it later on.

val sqlContext = SparkSession.builder().getOrCreate();
val root_dir = "/Users/chawlar/workspace/zeppelin/data/";
var ca_data = sqlContext.read.format("csv").option("header", "true").option("inferSchema", "true")
.load(root_dir + "CAvideos.csv");
var de_data = sqlContext.read.format("csv").option("header", "true").option("inferSchema", "true")
.load(root_dir + "DEvideos.csv");
var fr_data = sqlContext.read.format("csv").option("header", "true").option("inferSchema", "true")
.load(root_dir + "FRvideos.csv");
var gb_data = sqlContext.read.format("csv").option("header", "true").option("inferSchema", "true")
.load(root_dir + "GBvideos.csv");
var us_data = sqlContext.read.format("csv").option("header", "true").option("inferSchema", "true")
.load(root_dir + "USvideos.csv");

If you browse the YouTube data, you will see that there is a separate file for each country: Canada, Germany, France, Great Britain (which the code later on labels as England), and the US. Each one has to be read into its own DataFrame, using the same options.

We can print the schema of the loaded data with printSchema:

Figure: Schema for the YouTube data

Initial Data Processing and cleaning

With the data loaded, we have to do some processing to remove invalid rows and duplicates, and to parse strings into the right data types.

First, let’s define a function that will convert the numerical attributes to Integer, and parse the date time attributes:

def cast_df_values(data: DataFrame, country: String): DataFrame = {
  // Cast the numeric columns to Integer, parse the date columns, and tag each row with its country
  data.withColumn("all_views", data("views").cast(IntegerType))
    .withColumn("all_likes", data("likes").cast(IntegerType))
    .withColumn("all_dislikes", data("dislikes").cast(IntegerType))
    .withColumn("all_comments", data("comment_count").cast(IntegerType))
    .withColumn("date_trending", to_timestamp(data("trending_date"), "yy.dd.MM"))
    .withColumn("time_published", to_timestamp(data("publish_time"), "yyyy-MM-dd"))
    .withColumn("country", lit(country))
    // Keep the converted columns along with the ones that were not transformed
    .select("video_id", "date_trending", "title", "channel_title", "category_id", "time_published", "tags", "all_views", "all_likes", "all_dislikes", "all_comments", "thumbnail_link", "comments_disabled", "ratings_disabled", "video_error_or_removed", "description", "country")
}

In the above function, we create new columns with the correct data types based on the original columns, and add a new column for the Country string. We then select the new columns along with the ones that were not transformed. We can now call it on the different datasets we have:
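To see what the "yy.dd.MM" pattern does, here is a small sketch in plain Scala that parses a trending_date-style string. The sample value is made up to match the year.day.month layout, and java.time’s DateTimeFormatter is only a stand-in for Spark’s own parser, so treat it as an illustration of the pattern letters rather than of Spark’s exact behavior.

```scala
import java.time.LocalDate
import java.time.format.DateTimeFormatter

// Hypothetical sample value in a year.day.month layout
val sample_trending_date = "17.14.11"

// yy = two-digit year, dd = day of month, MM = month (lowercase mm would mean minutes)
val trending_format = DateTimeFormatter.ofPattern("yy.dd.MM")
val parsed_date = LocalDate.parse(sample_trending_date, trending_format)

println(parsed_date) // 2017-11-14
```

Swapping dd and MM in the pattern would make this sample fail to parse, since 14 is not a valid month. That is a quick way to check that the pattern matches the data.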

val us_data_processed = cast_df_values(us_data, "US");
val ca_data_processed = cast_df_values(ca_data, "Canada");
val de_data_processed = cast_df_values(de_data, "Germany");
val fr_data_processed = cast_df_values(fr_data, "France");
val gb_data_processed = cast_df_values(gb_data, "England");

Finally, we need a large dataset that combines all of the rows of the different datasets we have:

val all_data_processed = us_data_processed.union(ca_data_processed).union(de_data_processed)
.union(fr_data_processed).union(gb_data_processed).dropDuplicates("video_id").filter($"all_views".isNotNull)
all_data_processed.createOrReplaceTempView("all_data_processed");

We remove duplicate videos, keeping one row per video_id, and drop rows where the view count is null. We also create a “Temp view”, which is a view that Spark SQL queries can be run against. Like the DataFrame behind it, this view is lazily evaluated, which means that nothing is processed until we actually call an action on it.
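Lazy evaluation can be seen without Spark at all. The following sketch uses a plain Scala Iterator as a loose analogy (it is not Spark code, and the names are made up). The map is only described at first, and the work happens when a result is requested, much like a Spark action triggers the transformations before it.

```scala
// Counts how many elements have actually been processed
var rows_processed = 0

// Describing the transformation does no work yet, similar to a Spark transformation
val doubled = Iterator(1, 2, 3).map { x =>
  rows_processed += 1
  x * 2
}
val processed_before_action = rows_processed // still 0

// Asking for the result forces the computation, similar to a Spark action such as count or show
val result = doubled.toList
val processed_after_action = rows_processed // now 3
```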

Data transformations and visualizations

Videos by total views, likes, dislikes, and comments

Let’s start by running a SQL query on the view to browse the titles along with their views, likes, dislikes, and total comments:

Figure: Videos by total views, likes, dislikes, and comments

When you run the query in Zeppelin, it shows a table, which can be sorted by any of the attributes.
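The query text itself is not reproduced in this post, so the following is only a guess at what such a query could look like, not the original one. To keep the sketch runnable on its own, it builds a tiny made-up view called videos_sample (a hypothetical name, chosen so that it does not overwrite the real view). Against the real data you would query all_data_processed instead, either through spark.sql or in a %sql paragraph in Zeppelin.

```scala
import org.apache.spark.sql.SparkSession

// Local session for illustration; in Zeppelin the predefined session can be used instead
val spark = SparkSession.builder().master("local[*]").appName("views-query-sketch").getOrCreate()
import spark.implicits._

// Made-up rows with the same column names as the processed data
Seq(("Video A", 1000, 100, 5, 20), ("Video B", 5000, 300, 50, 80))
  .toDF("title", "all_views", "all_likes", "all_dislikes", "all_comments")
  .createOrReplaceTempView("videos_sample")

// Hypothetical query: list titles with their counts, most viewed first
val top_videos = spark.sql(
  """SELECT title, all_views, all_likes, all_dislikes, all_comments
    |FROM videos_sample
    |ORDER BY all_views DESC
    |LIMIT 20""".stripMargin)

top_videos.show(false)
```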

Videos by percentage of likes, dislikes, and comments of total views

In this step, we will compute the likes, dislikes, and total comments of each video as a percentage of its views, and display the results.

Let’s start by declaring a case class that can hold the percentage values:

case class Percentages(video_name: String, percent_likes : Double, percent_dislikes : Double, percent_comments : Double)

We will compute the percentages in a map. A map is a transformation that iterates over each row of a Dataset, applies a function to the data in that row, and emits the result. The output is a new Dataset made up of the emitted values.

val videos_by_percentages = all_data_processed.map(s => Percentages(
  s.getAs[String]("title"),
  100 * s.getAs[Int]("all_likes") / s.getAs[Int]("all_views").toFloat,
  100 * s.getAs[Int]("all_dislikes") / s.getAs[Int]("all_views").toFloat,
  100 * s.getAs[Int]("all_comments") / s.getAs[Int]("all_views").toFloat))

videos_by_percentages.createOrReplaceTempView("videos_by_percentages");

In this map function, we map each row s to an instance of the Percentages case class, filling in the title and the percentages of likes, dislikes, and comments. This returns a Dataset where each row follows the Percentages schema. When we visualize it, we see:
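The .toFloat in the map matters, because dividing two Int values in Scala truncates the result. A minimal plain-Scala sketch with made-up numbers shows the difference:

```scala
// Made-up counts for a single video
val likes = 150
val views = 2000

// Int / Int truncates toward zero
val truncated_percent = 100 * likes / views // 7

// Converting the divisor to Float makes the whole division floating point
val percent_likes = 100 * likes / views.toFloat // 7.5
```

With a floating-point divisor, a video with zero views would produce Infinity or NaN instead of throwing an exception, so such rows may be worth filtering out before visualizing.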

Figure: Videos by percentage of likes, dislikes, and comments of total views

Trending Channels by total views

The next transformation is a little complicated, so I will walk through all the steps in it:

val value_count_type = Seq("value", "count");
val trending_channels = all_data_processed.map(s => (s.getAs[String]("channel_title"), s.getAs[Int]("all_views").toLong))
.groupByKey(t => t._1).reduceGroups((u, v) => (u._1, u._2 + v._2)).map(s => (s._1, s._2._2)).toDF(value_count_type :_*)
trending_channels.createOrReplaceTempView("trending_channels_count")

We are doing several things here. The first step is to select only the attributes we are looking at: channel_title and all_views. We convert the views to a Long as we map them, because summing many large Int values could overflow.

This is followed by a groupByKey, which groups the dataset by the key we provide, in this case the channel title. The ._1 gets the first element of the tuple returned by the previous transformation. We then pipe this to reduceGroups, which takes a function of two values that returns one. Both input values always share the same key, so the reduce function can simply add up the views, leaving one summed total per channel.

Finally, we map this output to a key-value pair, keeping the title and the total summed views. By calling toDF at the end, we can provide our own column names, which I have defined above as value_count_type. Let’s visualize the result:
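The same group-then-reduce idea can be tried on an ordinary Scala collection. This is a rough analogue with made-up channel names and view counts, using groupBy and reduce from the standard library rather than Spark’s groupByKey and reduceGroups. It also shows why the views are widened to Long before summing.

```scala
// Made-up (channel_title, views) pairs standing in for the mapped Dataset
val channel_views = Seq(("Channel A", 100L), ("Channel B", 50L), ("Channel A", 25L))

// Group by the first tuple element, then reduce each group pairwise into one (key, sum) tuple
val total_views = channel_views
  .groupBy(t => t._1)
  .map { case (_, group) => group.reduce((u, v) => (u._1, u._2 + v._2)) }

// Why Long: adding two large Int values silently wraps around
val big_a = 2000000000
val big_b = 2000000000
val wrapped_sum = big_a + big_b               // -294967296
val correct_sum = big_a.toLong + big_b.toLong // 4000000000
```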

Figure: Trending Channels by total views

The chart shows the channels with the most total views; in this case, the top one is Colbert.

Bigrams count on video titles

In the next step, I want to calculate bigrams on the video titles and visualize the most frequent ones by count. Spark has an ML module that provides the two feature transformers we need here, Tokenizer and NGram. We will use both.

val tokenizer = new Tokenizer().setInputCol("title").setOutputCol("title_words");
val tokenized_titles = tokenizer.transform(all_data_processed);
val ngramizer = new NGram().setN(2).setInputCol("title_words").setOutputCol("bigrams");
val bigrams = ngramizer.transform(tokenized_titles)

The NGram transformer outputs the bigrams of each title as an array. We will flatten these arrays out and do a simple group-by-key.

val bigrams_count_all = bigrams.select("bigrams").map(s => s.getAs[WrappedArray[String]]("bigrams").toSeq)
.flatMap(t => t).groupByKey(u => u).count.toDF(value_count_type: _*)
val reg = "[a-zA-Z0-9]{3,} [a-zA-Z0-9]{3,}".r;
val bigrams_count = bigrams_count_all.filter(s => 
reg.pattern.matcher(s.getAs[String]("value")).matches);
bigrams_count.createOrReplaceTempView("bigrams_count");

The first step should be easy to follow. A flatMap operation on an Array input flattens the elements of the Array, so each element becomes its own row. We then do a count. Next, I use a regular expression that only matches bigrams made of two words with at least three letters or digits each, because a lot of titles contain symbols that we want to avoid. The filter transformation only keeps rows where the provided condition is true.
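The whole bigram pipeline can be imitated on a few made-up titles with plain Scala collections. This is a sketch of the idea only: sliding(2) stands in for NGram, lowercasing plus a whitespace split roughly mimics Tokenizer, and the titles are invented.

```scala
// Made-up titles standing in for the title column
val titles = Seq("Official Trailer 2018", "Official Trailer HD", "Q&A - live")

// Lowercase and split on whitespace, then pair up neighbouring words
val all_bigrams = titles.flatMap { title =>
  title.toLowerCase.split("\\s+").toSeq
    .sliding(2)
    .filter(_.size == 2) // a one-word title has no bigram
    .map(_.mkString(" "))
}

// Count occurrences of each bigram
val counts_all = all_bigrams.groupBy(identity).map { case (bigram, hits) => (bigram, hits.size) }

// Keep only bigrams where both words have at least three letters or digits
val reg = "[a-zA-Z0-9]{3,} [a-zA-Z0-9]{3,}".r
val counts = counts_all.filter { case (bigram, _) => reg.pattern.matcher(bigram).matches }

println(counts)
```

Here "trailer hd" is dropped because "hd" has only two characters, and the bigrams built from "q&a - live" are dropped because of the symbols.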

Figure: Bigrams in video titles by total counts

Most frequent tags on trending videos

We also have a tags attribute, which contains a list of all tags assigned to each video by its publisher. In the next step, we will extract these tags and count them. The main operation is actually very similar to the previous step:

val tags_counts  = all_data_processed.map(s => s.getAs[String]("tags").replace("\"", "").split("\\|")).flatMap(t => t)
.filter(t => !t.equals("[none]")).groupByKey(t => t).count.toDF(value_count_type: _*);
tags_counts.createOrReplaceTempView("tags_counts");

In this snippet, we first get the tags and do some string manipulation to remove the unnecessary quotes around them, then split them on the vertical bar symbol |. We then remove the [none] tag, because it skews the results. Finally, we do a group by and count like before.
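The string handling is easy to try on a single value. The raw string below is made up to resemble the layout of the tags column; the rest uses only standard String and collection methods.

```scala
// Made-up raw value resembling the tags column
val raw_tags = "\"funny\"|\"comedy\"|\"late night\""

// Drop the quotes, then split on a literal pipe; split takes a regex, so | must be escaped
val tags = raw_tags.replace("\"", "").split("\\|").toList

// Add a placeholder and a repeated tag, then filter out [none] before counting
val all_tags = (tags ++ List("[none]", "funny")).filter(t => !t.equals("[none]"))
val tag_counts = all_tags.groupBy(identity).map { case (tag, hits) => (tag, hits.size) }

println(tag_counts)
```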

Figure: Most frequent tags on trending videos

We can see from this chart that “funny” and “comedy” are the most frequent tags on trending videos, followed by more specific ones like “football” and “rap”.

Time passed between Published and Trending

In the last graph, I want to visualize the time that passed between a video being published and it trending. The actual transformation is fairly simple, but we use ChronoUnit from the java.time package to calculate the difference between the dates in days.

val time_passed = all_data_processed.filter(s => (s.getAs[Timestamp]("date_trending") != null && s.getAs[Timestamp]("time_published") != null))
.map(s => ChronoUnit.DAYS.between(s.getAs[Timestamp]("time_published").toLocalDateTime, s.getAs[Timestamp]("date_trending").toLocalDateTime))
.groupByKey(t => t).count.toDF(value_count_type: _*)
time_passed.createOrReplaceTempView("time_passed");

Figure: Time passed between Published and Trending of videos

The visualization shows that most videos trend within a day of being published.
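The day difference can be checked on a pair of made-up timestamps. ChronoUnit.DAYS.between counts complete days only, which is worth keeping in mind when reading the chart: anything under 24 hours lands in the 0 bucket.

```scala
import java.sql.Timestamp
import java.time.temporal.ChronoUnit

// Made-up timestamps for one video
val time_published = Timestamp.valueOf("2017-11-13 00:00:00")
val date_trending = Timestamp.valueOf("2017-11-15 00:00:00")

// Whole days between the two moments
val days_to_trend = ChronoUnit.DAYS.between(
  time_published.toLocalDateTime, date_trending.toLocalDateTime) // 2

// Partial days are dropped: seven hours apart counts as 0 days
val same_evening = Timestamp.valueOf("2017-11-14 17:00:00")
val next_morning = Timestamp.valueOf("2017-11-15 00:00:00")
val truncated_days = ChronoUnit.DAYS.between(
  same_evening.toLocalDateTime, next_morning.toLocalDateTime) // 0
```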


Although Spark is not really meant for data visualization, I used it here to show how the different transformation methods in Spark work, and how they can be combined to do more advanced operations on the data. I used a fairly small dataset for this project, but Spark is designed to work with much larger data, as it parallelizes its operations. Spark’s documentation is very extensive, and you can find a lot of methods for doing exactly what you want with it. It supports joins between multiple datasets, aggregations, sorting, and more. Although there is a learning curve to Scala, Spark itself is a great tool for large data processing.