How to handle bad/corrupt records in Apache Spark

Amit Kumar
3 min read · Aug 23, 2020


Hi Everyone,

In this article I will talk about how you can handle bad/corrupt records in Apache Spark. In most ETL jobs we add a step that deals with these bad/corrupt records.

Let’s look at what we have in our source file

%fs head /FileStore/tables/EmployeeInfo.csv

You can see we have 5 records, one of which is a bad record: for ID 5, the Salary column contains the text "Thirteen Hundred" instead of a number.
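(The original file contents aren't reproduced in this post; a file with the same shape, four columns with dates in dd.MM.yyyy format, would look roughly like the hypothetical sample below. Names and numbers are made up for illustration; only the non-numeric Salary on ID 5 matters.)

ID,Name,DOJ,Salary
1,John,12.01.2015,2000
2,Priya,03.06.2017,2500
3,Rahul,21.09.2018,1800
4,Meena,14.02.2019,2200
5,Arjun,05.11.2019,Thirteen Hundred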

Now you want to handle this record for ID 5, and three approaches come to mind.

  1. Load only the correct records and also capture the corrupt/bad records in some folder.
  2. Ignore the corrupt/bad records and load only the correct records.
  3. Don’t load anything from the source; throw an exception as soon as the first corrupt/bad record is encountered.

Let’s start with the first approach: load the correct records and capture the corrupt data in some folder. Before doing that, first load the file in PERMISSIVE mode and see what Spark does with the bad record.

Observe that for the bad record the Salary column contains a String value instead of an Integer, so Spark stores the value as null.

val EmployeeInfoDF = spark.read
.schema("ID Integer,Name String,DOJ Date,Salary Integer")
.option("header",true)
.option("mode", "PERMISSIVE")
.option("dateFormat","dd.MM.yyyy")
.csv("/FileStore/tables/EmployeeInfo.csv")
display(EmployeeInfoDF)

See, the data for ID 5 is now misleading: we know the Salary value was corrupt, so Spark put null there, but someone on the business side looking at this DataFrame could assume the salary for ID 5 simply hasn’t been assigned yet.
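One way to keep PERMISSIVE mode but make the bad row explicit (not used in this walkthrough, but worth knowing) is Spark’s columnNameOfCorruptRecord option, which keeps the raw text of any malformed row in a dedicated string column. A minimal sketch, assuming the same file and schema as above:

import org.apache.spark.sql.types._

// The corrupt-record column must be part of the user-defined schema, as a string field
val schemaWithCorrupt = StructType(Seq(
  StructField("ID", IntegerType),
  StructField("Name", StringType),
  StructField("DOJ", DateType),
  StructField("Salary", IntegerType),
  StructField("_corrupt_record", StringType)
))

val PermissiveDF = spark.read
.schema(schemaWithCorrupt)
.option("header",true)
.option("mode", "PERMISSIVE")
.option("columnNameOfCorruptRecord","_corrupt_record") // raw text of malformed rows lands here
.option("dateFormat","dd.MM.yyyy")
.csv("/FileStore/tables/EmployeeInfo.csv")

// Spark does not allow filtering a raw CSV read on only the internal corrupt-record column,
// so cache the DataFrame before isolating the bad rows
PermissiveDF.cache()
display(PermissiveDF.filter(PermissiveDF("_corrupt_record").isNotNull))

Here the good rows keep _corrupt_record as null, while the row for ID 5 carries its original text, so a null Salary is no longer ambiguous.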

To fix this, we will load only the correct records and capture the corrupt/bad record separately for further analysis or data fixing. On Databricks this is done with the badRecordsPath option.

val EmployeeInfoDF = spark.read
.schema("ID Integer,Name String,DOJ Date,Salary Integer")
.option("header",true)
.option("badRecordsPath","/tmp/badRecordsPath")
.option("dateFormat","dd.MM.yyyy")
.csv("/FileStore/tables/EmployeeInfo.csv")
display(EmployeeInfoDF)

If you look at the data, you will observe that ID 5 is not included in the DataFrame, and the corrupt record has been captured under the /tmp/badRecordsPath folder.

%fs head /tmp/badRecordsPath/20200823T171620/bad_records/part-00000-2de8bdda-c687-4531-9500-799a86d04beb
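On Databricks, the files written under bad_records are JSON, so instead of just peeking at them with %fs head you can load them back into a DataFrame for analysis. A small sketch, assuming the same badRecordsPath as above (the wildcard covers the timestamped run folder):

// Read every bad_records file produced under the configured badRecordsPath
val badRecordsDF = spark.read.json("/tmp/badRecordsPath/*/bad_records")
display(badRecordsDF) // typically one row per captured record, with the source path, the raw record and the failure reason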

Suppose you don’t want to load ID 5 into your final table/file and you are also not interested in capturing the corrupt record. In that case you can use DROPMALFORMED mode.

val EmployeeInfoDF = spark.read
.schema("ID Integer,Name String,DOJ Date,Salary Integer")
.option("header",true)
.option("mode", "DROPMALFORMED")
.option("dateFormat","dd.MM.yyyy")
.csv("/FileStore/tables/EmployeeInfo.csv")
display(EmployeeInfoDF)
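DROPMALFORMED discards bad rows silently, so if you also want to know how many were dropped, one option (a sketch, not from the original walkthrough) is to compare the raw line count of the file with what survived parsing. The DataFrame is cached first because, on some Spark versions, column pruning can cause a plain count() over a CSV read to still include malformed rows:

val rawCount = spark.read.text("/FileStore/tables/EmployeeInfo.csv").count() - 1 // subtract the header line
val loadedCount = EmployeeInfoDF.cache().count() // materialise all columns so malformed rows are really dropped
println(s"Dropped ${rawCount - loadedCount} malformed record(s)")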

Sometimes you don’t want to load anything from the file at all if it contains even one corrupt record. In that case you can use FAILFAST mode.

val EmployeeInfoDF = spark.read
.schema("ID Integer,Name String,DOJ Date,Salary Integer")
.option("header",true)
.option("mode", "FAILFAST")
.option("dateFormat","dd.MM.yyyy")
.csv("/FileStore/tables/EmployeeInfo.csv")
display(EmployeeInfoDF)
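Because DataFrame reads are lazy, the FAILFAST error usually surfaces when an action such as display or count runs, not at the spark.read call itself. If you want your ETL job to fail gracefully instead of crashing, one option (a sketch, assuming the same file) is to wrap the action in scala.util.Try:

import scala.util.{Failure, Success, Try}

val result = Try {
  spark.read
    .schema("ID Integer,Name String,DOJ Date,Salary Integer")
    .option("header",true)
    .option("mode", "FAILFAST")
    .option("dateFormat","dd.MM.yyyy")
    .csv("/FileStore/tables/EmployeeInfo.csv")
    .count() // action forces parsing; throws on the first malformed record
}

result match {
  case Success(n)  => println(s"Loaded $n clean records")
  case Failure(ex) => println(s"Load aborted, file contains corrupt records: ${ex.getMessage}")
}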

Thanks for reading this article

Happy Learning


Amit Kumar

Data Engineer | Azure SQL | Azure Data Factory | Azure Data Lake | Azure Analysis Service | Azure Databricks | PySpark | Azure ML | Power BI | Snowflake