Introduction to Spark DataFrames
Spark is the new hotness in data science, but the learning curve is steep. Scala is the only language that is fully supported by Spark. Using Python/PySpark may lead to misery and frustration because the documentation is sparse and every answer on Google is in Scala. This tutorial shows how to use a DataFrame to query a simple CSV file and write the output to a separate file.
Spark Scala Shell Introduction
Start the Spark Scala shell with the following command:
$ ./bin/spark-shell
Show a list of all the commands that are available in the Scala shell.
scala> :help
Exit the Spark shell.
scala> :quit
DataFrames with Scala
Start the Spark shell with the spark-csv package.
$ ./bin/spark-shell --packages com.databricks:spark-csv_2.11:1.3.0
Create a cities.csv file on your Desktop, so we have some data to play around with.
city,state
new york,ny
los angeles,ca
chicago,il
houston,tx
philadelphia,pa
phoneix,az
san antonio,tx
dallas,tx
san jose,ca
Create a DataFrame with the cities data.
The System.getProperty("user.home") code returns the home directory. It's the same as echo $HOME or echo ~ in a Bash shell.
Data can also be loaded directly from files on S3, but your computer has to be configured first. I'll write another blog post on how to work directly with files in S3.
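A minimal sketch of the load, assuming the spark-csv package is on the classpath (as started above) and cities.csv was saved to the Desktop:
scala> val df = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").load(System.getProperty("user.home") + "/Desktop/cities.csv")
The header option tells spark-csv to treat the first line of the file as column names rather than data.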
Use the show() method to get details about the DataFrame.
scala> df.show()
+------------+-----+
|        city|state|
+------------+-----+
|    new york|   ny|
| los angeles|   ca|
|     chicago|   il|
|     houston|   tx|
|philadelphia|   pa|
|     phoneix|   az|
| san antonio|   tx|
|      dallas|   tx|
|    san jose|   ca|
+------------+-----+
The printSchema() method can be used to show the schema.
scala> df.printSchema()
root
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
Filtering with a DataFrame
Create a new DataFrame that only includes states that are equal to “tx”.
scala> val tx_cities = df.where(df("state") === "tx")
Verify that the tx_cities DataFrame only includes cities that are in Texas.
scala> tx_cities.show()
+-----------+-----+
|       city|state|
+-----------+-----+
|    houston|   tx|
|san antonio|   tx|
|     dallas|   tx|
+-----------+-----+
Writing a DataFrame to a CSV File
The tx_cities DataFrame can be written to a CSV file with this code:
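A sketch of the write, assuming the same Desktop path convention used when loading the file:
scala> tx_cities.coalesce(1).write.format("com.databricks.spark.csv").option("header", "true").save(System.getProperty("user.home") + "/Desktop/texas_cities")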
This will create a texas_cities directory on your Desktop that contains a few different files. The part-00000 file will contain the following data:
city,state
houston,tx
san antonio,tx
dallas,tx
The coalesce(1) portion of the code forces the output into a single file. Forcing Spark to write to a single file is normally a bad idea, but it is used in this example for simplicity and because the data set is trivial.
Reading Multiple CSV Files into a DataFrame
Multiple CSV files can be read into a single DataFrame. Create a people/ directory in the Desktop folder and add the following colombians.csv file:
first,last,country
sofia,vergara,colombia
shakira,ripoll,colombia
james,rodriguez,colombia
Create another file called brazilians.csv in the people/ directory:
first,last,country
neymar,junior,brazil
gisele,bundchen,brazil
michel,telo,brazil
Start the Spark shell with the spark-csv package.
$ ./bin/spark-shell --packages com.databricks:spark-csv_2.11:1.3.0
Load both of the CSV files into a single DataFrame with the * wildcard matcher:
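A sketch of the load, assuming the people/ directory is on the Desktop:
scala> val df = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").load(System.getProperty("user.home") + "/Desktop/people/*.csv")
The *.csv glob matches every CSV file in the directory, so both colombians.csv and brazilians.csv are read into the same DataFrame.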
Use the show() method to demonstrate that the data from both of the CSV files has been loaded into the DataFrame:
scala> df.show()
+-------+---------+--------+
|  first|     last| country|
+-------+---------+--------+
| neymar|   junior|  brazil|
| gisele| bundchen|  brazil|
| michel|     telo|  brazil|
|  sofia|  vergara|colombia|
|shakira|   ripoll|colombia|
|  james|rodriguez|colombia|
+-------+---------+--------+
Loading Multiple Gzipped Files
Gzip the colombians.csv and brazilians.csv files:
$ gzip colombians.csv
$ gzip brazilians.csv
The ~/Desktop/people/ directory should now contain the following two files:
brazilians.csv.gz
colombians.csv.gz
Spark is very smart about loading gzipped files into DataFrames, so the code is basically identical except for the file extension.
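A sketch of the load, pointing the same reader at the gzipped files (only the glob pattern changes):
scala> val df = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").load(System.getProperty("user.home") + "/Desktop/people/*.csv.gz")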
We can see that both of the gzipped files are now included in the DataFrame:
scala> df.show()
+-------+---------+--------+
|  first|     last| country|
+-------+---------+--------+
| neymar|   junior|  brazil|
| gisele| bundchen|  brazil|
| michel|     telo|  brazil|
|  sofia|  vergara|colombia|
|shakira|   ripoll|colombia|
|  james|rodriguez|colombia|
+-------+---------+--------+