Validate File Against BigQuery Schema
Recently I faced a requirement where incoming data files need to be validated against table schemas in BigQuery. The goal is to optimize the data pipeline: any file that does not conform to the schema never enters the preprocessing pipelines, which saves both time and money.
Data files can arrive in a range of different formats, while the target environment is always a BigQuery table. We decided to use Spark to cover the wide range of incoming file formats.
All the sample files and scripts can be found in the repository under the file-validation directory.
Algorithm
The step-by-step process of validating a data file can be described as follows:
- Export the schema from BigQuery
- Parse the BQSchema and create a Spark StructType
- Create a Spark DataFrame using the StructType as its schema
- Try to read the data file into the DataFrame
If the last step fails, the data does not conform to the requirements of the schema.
Exporting schema from BigQuery
The script reads the BQSchema from a file instead of connecting to and querying the BigQuery API, to keep the coupling loose. You can of course change this behaviour to fulfil the requirements of your project.
I used the following bq command to export the schema into a file.
bq show --format prettyjson <project>:<dataset>.<table> | jq '.schema.fields' > schema.txt
The output of the command is a JSON file, as seen below; our sample script is written to process a schema in this format.
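A minimal exported schema might look like the following (the field names here are illustrative, not the actual sample schema):
[
  {
    "mode": "REQUIRED",
    "name": "id",
    "type": "INTEGER"
  },
  {
    "mode": "NULLABLE",
    "name": "name",
    "type": "STRING"
  },
  {
    "mode": "NULLABLE",
    "name": "created_at",
    "type": "TIMESTAMP"
  }
]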
Converting BQSchema into Spark StructType
The last validation step tries to read the data file into a Spark DataFrame with an assigned schema. To assign a BQSchema to a DataFrame, we first need to convert the existing BQSchema into a StructType. I created a helper method to handle this conversion, which can be seen below.
from pyspark.sql.types import (StructType, IntegerType, StringType, TimestampType,
                               DateType, BooleanType, LongType, DecimalType,
                               FloatType, ByteType)

def convert_bqschema_to_structtype(bq_schema):
    '''
    ' Converts exported BQ Schema to Spark StructType schema
    ' Schema has been exported from BigQuery by following command
    ' `bq show --format prettyjson <project>:<dataset>.<table> | jq '.schema.fields'`
    '''
    new_schema = StructType()
    for row in bq_schema:
        # A field is nullable when its BQ mode is NULLABLE
        nullable = row['mode'] == 'NULLABLE'
        if row['type'] == 'INTEGER':
            new_schema.add(row['name'], IntegerType(), nullable)
        elif row['type'] == 'STRING':
            new_schema.add(row['name'], StringType(), nullable)
        elif row['type'] == 'DATETIME':
            new_schema.add(row['name'], TimestampType(), nullable)
        elif row['type'] == 'TIME':
            new_schema.add(row['name'], TimestampType(), nullable)
        elif row['type'] == 'DATE':
            new_schema.add(row['name'], DateType(), nullable)
        elif row['type'] == 'TIMESTAMP':
            new_schema.add(row['name'], TimestampType(), nullable)
        elif row['type'] == 'BOOLEAN':
            new_schema.add(row['name'], BooleanType(), nullable)
        elif row['type'] == 'BIGNUMERIC':
            new_schema.add(row['name'], LongType(), nullable)
        elif row['type'] == 'NUMERIC':
            new_schema.add(row['name'], DecimalType(), nullable)
        elif row['type'] == 'FLOAT':
            new_schema.add(row['name'], FloatType(), nullable)
        elif row['type'] == 'BYTES':
            new_schema.add(row['name'], ByteType(), nullable)
    return new_schema
Unfortunately there was no automatic conversion of a BQSchema into a Spark schema, so I created the mapping myself using the BigQuery and Spark documentation.
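For completeness, here is a minimal sketch of how the helper might be invoked, assuming the schema was exported to a file named schema.txt as shown above:

import json

# Load the exported BQSchema (a JSON array of field definitions)
with open('schema.txt') as schema_file:
    bq_schema = json.load(schema_file)

# Convert it into a Spark StructType that can be assigned to a DataFrame
converted_schema = convert_bqschema_to_structtype(bq_schema)
print(converted_schema.simpleString())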
Create a Spark DataFrame and read data into it
Spark documentation defines DataFrames as
A DataFrame is a Dataset organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood.
By leveraging the DataFrame’s named columns and structured dataset properties, the data in the file can be tested against the schema. You can see how we did this in the code sample below.
try:
    # Read the data file with the converted schema; FAILFAST raises an error on any non-conforming record
    df = ss.read.format(data_file_format) \
        .schema(converted_schema) \
        .option("header", "true") \
        .option("mode", "FAILFAST") \
        .load(data_file)
    df.printSchema()
    df.show()
except Exception as e:
    print("Schema could not be validated")
    exit(1)
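The mode option set to FAILFAST is what makes this work as a validation: Spark throws an exception as soon as it encounters a record that does not match the assigned schema, whereas the default PERMISSIVE mode would silently set malformed fields to null and the read would appear to succeed.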
Demo
I created the code to run both on your local computer and on Google Cloud Platform. The script requires three arguments: the first is the environment and can be local or gcp, the second is the data file and the last one is the BQSchema file.
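The actual entry point lives in the repository; as a rough sketch (the names and details below are assumptions for illustration, not the exact script), the three arguments might be wired together like this:

import sys

from pyspark.sql import SparkSession

if __name__ == '__main__':
    # environment ('local' or 'gcp'), path to the data file, path to the BQSchema file
    environment, data_file, schema_file = sys.argv[1], sys.argv[2], sys.argv[3]

    builder = SparkSession.builder.appName('file-validation')
    if environment == 'local':
        # Run against a local Spark master; on GCP the Dataproc cluster configuration is picked up automatically
        builder = builder.master('local[*]')
    ss = builder.getOrCreate()

    # Assume the data file format matches the file extension, e.g. csv
    data_file_format = data_file.rsplit('.', 1)[-1]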
With the valid data file, running the command python3 file-validation/src/validate_file.py local file-validation/util/correct.csv file-validation/util.schema.txt produces the output seen in the screenshot. The script runs df.show() as a Spark Action to overcome lazy evaluation; without an Action the read is never executed, so the validation does not happen.
With the invalid data file, running the command python3 file-validation/src/validate_file.py local file-validation/util/corrupted.csv file-validation/util.schema.txt produces the output seen in the screenshot.
On Google Cloud Platform, we use a Dataproc cluster to run the Spark job. You can modify the code to run on Serverless Dataproc, or trigger jobs via Composer.
When you submit the job, you should select PySpark as the Job Type, and all files (the Spark script, the data file and the schema file) must be in Google Cloud Storage buckets. The results can be found in the logs under the job details.
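For example, the job can also be submitted from the command line with gcloud (the cluster, region and bucket paths below are placeholders, not the actual project values):

gcloud dataproc jobs submit pyspark gs://<bucket>/validate_file.py \
    --cluster=<cluster-name> \
    --region=<region> \
    -- gcp gs://<bucket>/correct.csv gs://<bucket>/schema.txt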
When the job reads a corrupted or invalid file, it raises an exception and the job completes in the Failed state; if the file is valid, the job status is Succeeded, as you can see in the job list.
Conclusion
In this post, we have seen how you can validate a data file against a BQSchema using Apache Spark. Using a similar approach, you can discard corrupted files early in the data pipeline and achieve cost savings.
If you find this post helpful, please feel free to like, share or comment with your thoughts. Thanks for reading.