EXPEDIA GROUP TECHNOLOGY — DATA
Working with JSON in Apache Spark
Denormalising human-readable JSON for sweet data processing
JSON is omnipresent. However, nested JSON datasets aren’t always easy to process. In this tutorial, I discuss working with JSON datasets using Apache Spark™️. Previously, I’ve published several blogs about Apache Spark, listed below:
- Start Your Journey with Apache Spark — Part 1
- Start Your Journey with Apache Spark — Part 2
- Start Your Journey with Apache Spark — Part 3
- Deep Dive into Apache Spark DateTime Functions
- Deep Dive into Apache Spark Window Functions
- Deep Dive into Apache Spark Array Functions
- Apache Spark Structured Streaming
Please have a look if you haven’t already.
Let’s play with a sample JSON record. We will use the data below throughout this blog. You can also find the data file on GitHub®.
{
  "id": "0001",
  "type": "donut",
  "name": "Cake",
  "ppu": 0.55,
  "batters": {
    "batter": [
      { "id": "1001", "type": "Regular" },
      { "id": "1002", "type": "Chocolate" },
      { "id": "1003", "type": "Blueberry" }
    ]
  },
  "topping": [
    { "id": "5001", "type": "None" },
    { "id": "5002", "type": "Glazed" },
    { "id": "5005", "type": "Sugar" },
    { "id": "5007", "type": "Powdered Sugar" },
    { "id": "5006", "type": "Chocolate with Sprinkles" },
    { "id": "5003", "type": "Chocolate" },
    { "id": "5004", "type": "Maple" }
  ]
}
Import Required Libraries
Before we begin to read the JSON file, let’s import useful libraries.
from pyspark.sql.functions import explode
Read Sample JSON File
Now let’s read the JSON file. You can save the above data as a JSON file, or you can get the file from here. We will use the json method of the DataFrameReader class, which returns a DataFrame that preserves the nested structure.
rawDF = spark.read.json("<PATH_to_JSON_File>", multiLine=True)
You must provide the location of the file to be read. We also passed multiLine=True because our JSON record spans multiple lines. You can find a detailed list of options supported by the json method here.
Explore DataFrame Schema
We use printSchema() to display the schema of the DataFrame.
rawDF.printSchema()
Output
root
|-- batters: struct (nullable = true)
| |-- batter: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- id: string (nullable = true)
| | | |-- type: string (nullable = true)
|-- id: string (nullable = true)
|-- name: string (nullable = true)
|-- ppu: double (nullable = true)
|-- topping: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- id: string (nullable = true)
| | |-- type: string (nullable = true)
|-- type: string (nullable = true)
Looking at the above output, you can see that this is a nested DataFrame containing structs, arrays, and simple types. Feel free to compare the schema with the JSON data to better understand it before proceeding.
For example, the batters column is a struct containing an array of structs, and topping is an array of structs. Columns id, name, ppu, and type are plain string, string, double, and string columns respectively.
Convert Nested “batters” to Structured DataFrame
Now let's work with the batters column, which is nested. First of all, let's rename the top-level “id” column, because there is another “id” key inside the element struct under batters.
sampleDF = rawDF.withColumnRenamed("id", "key")
Let’s explore the “batters” column now. Extract the batter element from batters, which is a struct containing an array, and check the schema.
batDF = sampleDF.select("key", "batters.batter")
batDF.printSchema()
Output
root
|-- key: string (nullable = true)
|-- batter: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- id: string (nullable = true)
| | |-- type: string (nullable = true)
Let's check the content of the DataFrame “batDF”. You can find more details about show function here.
batDF.show(1, False)
Output
+----+-------------------------------------------------------+
|key |batter                                                 |
+----+-------------------------------------------------------+
|0001|[[1001, Regular], [1002, Chocolate], [1003, Blueberry]]|
+----+-------------------------------------------------------+
We got all the batter details in a single row because batter is an array of structs. Let’s create a separate row for each element of the “batter” array by exploding the “batter” column.
bat2DF = batDF.select("key", explode("batter").alias("new_batter"))
bat2DF.show()
Output
+----+--------------------+
| key| new_batter|
+----+--------------------+
|0001| [1001, Regular]|
|0001| [1002, Chocolate]|
|0001| [1003, Blueberry]|
+----+--------------------+
Let’s check the schema of the bat2DF.
bat2DF.printSchema()
Output
root
|-- key: string (nullable = true)
|-- new_batter: struct (nullable = true)
| |-- id: string (nullable = true)
| |-- type: string (nullable = true)
Now we can extract the individual fields from the “new_batter” struct. We can use the dot (“.”) operator to extract a single field, or “*” with the dot operator to select all the fields at once.
bat2DF.select("key", "new_batter.*").show()
Output
+----+----+------------+
| key| id| type|
+----+----+------------+
|0001|1001| Regular|
|0001|1002| Chocolate|
|0001|1003| Blueberry|
+----+----+------------+
Now we have converted the nested JSON into a structured, flat DataFrame.
Let’s put together everything we discussed so far.
finalBatDF = (sampleDF
.select("key",
explode("batters.batter").alias("new_batter"))
.select("key", "new_batter.*")
.withColumnRenamed("id", "bat_id")
.withColumnRenamed("type", "bat_type"))
finalBatDF.show()
Output
+----+------+------------+
| key|bat_id| bat_type|
+----+------+------------+
|0001| 1001| Regular|
|0001| 1002| Chocolate|
|0001| 1003| Blueberry|
+----+------+------------+
Convert Nested “topping” to Structured DataFrame
Let’s convert the nested “topping” structure to a simple DataFrame, applying the techniques we learned above for extracting elements from structs and arrays.
topDF = (sampleDF
.select("key", explode("topping").alias("new_topping"))
.select("key", "new_topping.*")
.withColumnRenamed("id", "top_id")
.withColumnRenamed("type", "top_type")
)
topDF.show(10, False)
Output
+----+------+------------------------+
|key |top_id|top_type |
+----+------+------------------------+
|0001|5001 |None |
|0001|5002 |Glazed |
|0001|5005 |Sugar |
|0001|5007 |Powdered Sugar |
|0001|5006 |Chocolate with Sprinkles|
|0001|5003 |Chocolate |
|0001|5004 |Maple |
+----+------+------------------------+
I hope you have enjoyed learning about working with JSON data in Apache Spark. For easy reference, a notebook containing the examples above is available on GitHub.
Apache Spark, Spark, Apache, the Apache feather logo, and the Apache Spark project logo are either registered trademarks or trademarks of The Apache Software Foundation in the United States and other countries.
Photo by Anna Sullivan on Unsplash