PySpark: How to Modify a Nested Struct Field

Alex Fragotsis
Published in The Startup
Aug 29, 2020

In our adventures trying to build a data lake, we use dynamically generated Spark clusters to ingest data from MongoDB, our production database, into BigQuery. To do that, we use PySpark DataFrames, and since MongoDB doesn’t enforce a schema, we infer one from a sample of the data.

# Infer the schema from a sample of 50,000 documents
collection_schema = spark.read.format("mongo") \
    .option("database", db) \
    .option("collection", coll) \
    .option("sampleSize", 50000) \
    .load() \
    .schema

# Read the collection again, this time with the cleaned-up schema
ingest_df = spark.read.format("mongo") \
    .option("database", db) \
    .option("collection", coll) \
    .load(schema=fix_spark_schema(collection_schema))

Our fix_spark_schema method just converts NullType columns to String.

In the users collection, we have the groups field, which is an array, because users can join multiple groups.

root
 |-- groups: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- programs: struct (nullable = true)
 |    |    |    |-- { program id }: struct (nullable = true)
 |    |    |    |    |-- Date: timestamp (nullable = true)
 |    |    |    |    |-- Name: string (nullable = true)
 |    |    |    |    |-- Some_Flags: struct (nullable = true)
 |    |    |    |    |    |-- abc: boolean (nullable = true)
 |    |    |    |    |    |-- def: boolean (nullable = true)
 |    |    |    |    |    |-- ghi: boolean (nullable = true)
 |    |    |    |    |    |-- xyz: boolean (nullable = true)

Each group also has its own set of programs that users can join. Under programs, we store a JSON object whose keys are the ids of the programs the user has joined and whose values hold some extra data, such as the date they joined. The data looks like this:

"groups" : [
    {
        … some other fields …
        "programs" : {
            "123c12b123456c1d76a4f265f10f20a0" : {
                "name" : "test_program_1",
                "some_flags" : {
                    "abc" : true,
                    "def" : true,
                    "ghi" : false,
                    "xyz" : true
                },
                "date" : ISODate("2019-11-16T03:29:00.000+0000")
            }
        }
    }
]

As a result, BigQuery creates a new column for each program id and we end up with hundreds of columns, most of them empty for most users. So, how can we fix that? We can convert programs from a struct to a string and store the whole JSON in there. That adds some friction for anyone who wants to access those fields, but it makes our columns much cleaner.
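
To make the trade-off concrete, here is a small plain-Python sketch (no Spark involved) of one group before and after the change. It reuses the sample record above, leaving out the date because ISODate is not valid JSON; the variable names are made up for illustration.

```python
import json

# One element of "groups", in its original nested form
group = {
    "programs": {
        "123c12b123456c1d76a4f265f10f20a0": {
            "name": "test_program_1",
            "some_flags": {"abc": True, "def": True, "ghi": False, "xyz": True},
        }
    }
}

# After the change: programs is a single string value, no matter how many
# program ids it contains, so it maps to a single column
flattened = {**group, "programs": json.dumps(group["programs"])}
print(type(flattened["programs"]).__name__)  # str

# The friction: consumers have to parse the string before reaching a field
programs = json.loads(flattened["programs"])
print(programs["123c12b123456c1d76a4f265f10f20a0"]["name"])  # test_program_1
```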

Attempt 1:

So, if the field wasn’t nested we could easily just cast it to string.

ingest_df = ingest_df.withColumn("groups.programs", col("groups.programs").cast('string'))

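but since it’s nested, this doesn’t work: withColumn treats "groups.programs" as the literal name of a new top-level column, not as a path into the struct. withColumn can only add or replace root-level columns, so the following command works only if we want to convert the whole groups field or move programs to the root level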

ingest_df = ingest_df.withColumn("programs_json", col("groups.programs").cast('string'))

Attempt 2:

After a lot of research and many different tries, I realized that if we want to change the type of a nested field, or edit, rename, add or remove one, we need to modify the schema. The steps we have to follow are these:

  1. Iterate through the schema of the nested Struct and make the changes we want
  2. Create a JSON version of the root level field, in our case groups, and name it for example groups_json and drop groups
  3. Then convert the groups_json field to groups again using the modified schema we created in step 1.

If we know the schema and we’re sure that it’s not going to change, we could hardcode it but … we can do better. We can write (search on StackOverflow and modify) a dynamic function that iterates through the whole schema and changes the type of the fields we want. The following method converts the fields_to_change into Strings, but you can modify it to do whatever you want. Each entry in fields_to_change is a dotted path relative to the schema you pass in, for example "programs".

from pyspark.sql.types import ArrayType, StringType, StructField, StructType


def change_nested_field_type(schema, fields_to_change, parent=""):
    # Only structs have fields to walk. Anything else (for example the
    # element type of an array of strings or ints) is returned unchanged.
    if not isinstance(schema, StructType):
        return schema

    new_schema = []

    for field in schema:
        # Build the dotted path of this field, e.g. "programs.abc"
        full_field_name = field.name
        if parent:
            full_field_name = parent + "." + full_field_name

        if full_field_name not in fields_to_change:
            if isinstance(field.dataType, StructType):
                # Recurse into nested structs
                inner_schema = change_nested_field_type(field.dataType, fields_to_change, full_field_name)
                new_schema.append(StructField(field.name, inner_schema))
            elif isinstance(field.dataType, ArrayType):
                # Recurse into the element type of arrays
                inner_schema = change_nested_field_type(field.dataType.elementType, fields_to_change, full_field_name)
                new_schema.append(StructField(field.name, ArrayType(inner_schema)))
            else:
                new_schema.append(StructField(field.name, field.dataType))
        else:
            # Here we change the field type to String
            new_schema.append(StructField(field.name, StringType()))

    return StructType(new_schema)

and now we can do the conversion like this:

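from pyspark.sql.functions import from_json, to_json

# Step 1: build the modified schema for the elements of groups
new_schema = ArrayType(change_nested_field_type(ingest_df.schema["groups"].dataType.elementType, ["programs"]))
# Step 2: serialize groups to JSON and drop the original column
ingest_df = ingest_df.withColumn("groups_json", to_json("groups")).drop("groups")
# Step 3: parse the JSON back into groups using the modified schema
ingest_df = ingest_df.withColumn("groups", from_json("groups_json", new_schema)).drop("groups_json")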

and voila! groups.programs is converted to a string.
