Handling different file formats with PySpark

Sagar Gangurde
Published in Data Engineering
Mar 14, 2022

Spark supports many file formats. In this article, we are going to cover the following file formats:

  • Text
  • CSV
  • JSON
  • Parquet
    Parquet is a columnar file format, which stores all the values for a given column across all rows together in a block. It has faster reads but slower writes, and it supports compression. Columnar file formats are the preferred choice when downstream jobs process only a subset of columns (see the short sketch after this list).
  • ORC
    ORC (Optimised Row Columnar) is a columnar file format. It has faster reads but slower writes and also supports compression.
  • Sequence
    A SequenceFile is a flat file consisting of binary key/value pairs. SequenceFiles are splittable and also support compression.
  • Avro
    Avro is a record-based (row-oriented) file format. It seamlessly supports record schema changes over time and has faster writes compared to columnar file formats. It is the preferred choice when downstream jobs process records in their entirety.
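
To make the columnar point concrete, here is a minimal sketch of a downstream job that touches only one column. It assumes the Spark session and the Parquet directory created later in this article; with a columnar format, Spark reads just that column's data blocks, while a row-based format would still scan whole records.
>>> # Reads only the 'email' column from the Parquet files written later in this article.
>>> spark.read.parquet("/user/hadoop/parquet/").select("email").show()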

Why do we need different file formats?

Each file format is suited to a specific use case. Using the correct file format for a given use case ensures that cluster resources are used optimally.

Handling different file formats using PySpark:

Let’s take a look at how we can write and read records in the above-mentioned file formats using PySpark.

Launch the PySpark shell with the spark-avro package (Avro support ships as a separate package):
pyspark --packages org.apache.spark:spark-avro_2.11:2.4.4
>>> spark.version
'2.4.4'
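
If you prefer a standalone script over the interactive shell, a roughly equivalent session can be built as in the sketch below. The app name is arbitrary, and the spark-avro coordinate must match your Spark and Scala versions; the one shown simply mirrors the shell command above.

from pyspark.sql import SparkSession

# Build a session and pull in the spark-avro package
# (coordinate is an example, matching the pyspark --packages invocation above).
spark = (SparkSession.builder
         .appName("file-formats-demo")
         .config("spark.jars.packages", "org.apache.spark:spark-avro_2.11:2.4.4")
         .getOrCreate())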

Let’s create a sample ‘person’ dataframe and use the `/user/hadoop/` directory on the local machine for all the read/write operations involving different file formats.

>>> person_df = spark.createDataFrame([(1, 'Joe', 'joe@gmail.com'), (2, 'Alex', 'alex@gmail.com'), (3, 'John', 'john@gmail.com')], ("id", "name", "email"))
>>> person_df.show()
+---+----+--------------+
| id|name|         email|
+---+----+--------------+
|  1| Joe| joe@gmail.com|
|  2|Alex|alex@gmail.com|
|  3|John|john@gmail.com|
+---+----+--------------+
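
The dataframe above relies on schema inference from the Python tuples. If you want explicit column types instead, a schema can be passed to createDataFrame; this is optional and just a sketch (person_schema is an illustrative name):
>>> from pyspark.sql.types import StructType, StructField, IntegerType, StringType
>>> person_schema = StructType([
...     StructField("id", IntegerType(), False),
...     StructField("name", StringType(), True),
...     StructField("email", StringType(), True)])
>>> person_df = spark.createDataFrame([(1, 'Joe', 'joe@gmail.com'), (2, 'Alex', 'alex@gmail.com'), (3, 'John', 'john@gmail.com')], schema=person_schema)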
  • Text
Generate a text file from the sample ‘person’ dataframe’s concatenated columns:
While writing a text file, each record needs to be a single string, so let’s concatenate the columns first. Here `F` is the pyspark.sql functions module and `fields` is the list of column names:
>>> from pyspark.sql import functions as F
>>> fields = person_df.columns
>>> person_df.withColumn("concatenated", F.concat_ws(',', *fields)).select("concatenated").show(truncate=False)
+---------------------+
|concatenated         |
+---------------------+
|1,Joe,joe@gmail.com  |
|2,Alex,alex@gmail.com|
|3,John,john@gmail.com|
+---------------------+
>>> person_df.withColumn("concatenated", F.concat_ws(',', *fields)).select("concatenated").write.text("/user/hadoop/text/")
Read the text file generated in the above step:
>>> text_df = spark.read.text("/user/hadoop/text/")
Confirm the content:
>>> text_df.show(truncate=False)
+---------------------+
|value                |
+---------------------+
|3,John,john@gmail.com|
|2,Alex,alex@gmail.com|
|1,Joe,joe@gmail.com  |
+---------------------+
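
Because the text source comes back as a single `value` column, a common next step (a sketch, not part of the original flow; `parsed_df` is just an example name) is to split it back into separate columns. Note that all three columns come back as strings:
>>> parsed_df = text_df.select(
...     F.split("value", ",").getItem(0).alias("id"),
...     F.split("value", ",").getItem(1).alias("name"),
...     F.split("value", ",").getItem(2).alias("email"))
>>> parsed_df.show()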
  • CSV
Generate a CSV file from the sample 'person' dataframe:
>>> person_df.write.option("header","true").csv("/user/hadoop/csv/")
Read the CSV file generated in the above step:
>>> csv_df = spark.read.option("header","true").csv("/user/hadoop/csv/")
Confirm the content:
>>> csv_df.show()
+---+----+--------------+
|id |name|email         |
+---+----+--------------+
|2  |Alex|alex@gmail.com|
|3  |John|john@gmail.com|
|1  |Joe |joe@gmail.com |
+---+----+--------------+
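
Everything read back from CSV is a string unless you say otherwise. A quick way to get typed columns is schema inference (or, better for production, an explicit schema); `typed_csv_df` below is just an illustrative name:
>>> typed_csv_df = spark.read.option("header", "true").option("inferSchema", "true").csv("/user/hadoop/csv/")
>>> typed_csv_df.printSchema()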
  • JSON
Generate a JSON file from the sample 'person' dataframe:
>>> person_df.write.json("/user/hadoop/json/")
Read the JSON file generated in the above step:
>>> json_df = spark.read.json("/user/hadoop/json/")
Confirm the content:
>>> json_df.show()
+--------------+---+----+
|email         |id |name|
+--------------+---+----+
|john@gmail.com|3  |John|
|alex@gmail.com|2  |Alex|
|joe@gmail.com |1  |Joe |
+--------------+---+----+
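
Notice the columns come back in alphabetical order (email, id, name): the JSON reader infers the schema from the data rather than preserving the write-time column order. Also, Spark expects JSON Lines (one object per line) by default; if each file holds a single pretty-printed document, enable the multiLine option. The second path below is hypothetical, shown only as a sketch:
>>> json_df.printSchema()
>>> multiline_df = spark.read.option("multiLine", "true").json("/user/hadoop/json_multiline/")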
  • Parquet
Generate a Parquet file from the sample 'person' dataframe:
>>> person_df.write.parquet("/user/hadoop/parquet/")
Read the Parquet file generated in the above step:
>>> parquet_df = spark.read.parquet("/user/hadoop/parquet/")
Confirm the content:
>>> parquet_df.show()
+---+----+--------------+
| id|name|         email|
+---+----+--------------+
|  2|Alex|alex@gmail.com|
|  3|John|john@gmail.com|
|  1| Joe| joe@gmail.com|
+---+----+--------------+
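
Parquet also supports compression codecs and partitioned directory layouts at write time. A minimal sketch (the output path and codec are chosen for illustration):
>>> person_df.write.option("compression", "snappy").partitionBy("name").parquet("/user/hadoop/parquet_partitioned/")
>>> spark.read.parquet("/user/hadoop/parquet_partitioned/").show()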
  • ORC (Optimised Row Columnar)
Generate an ORC file from the sample 'person' dataframe:
>>> person_df.write.orc("/user/hadoop/orc/")
Read the ORC file generated in the above step:
>>> orc_df = spark.read.orc("/user/hadoop/orc/")
Confirm the content:
>>> orc_df.show()
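Like Parquet, ORC accepts a compression option at write time; a sketch (the codec and path are illustrative):
>>> person_df.write.option("compression", "zlib").orc("/user/hadoop/orc_zlib/")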
  • SequenceFile
Generate a SequenceFile from the sample 'person' dataframe:
Since a SequenceFile stores data as key/value pairs, let's first prepare the data in key/value format and then write it (here we keep only the id and name columns):
>>> person_df.rdd.map(lambda x: (x[0], x[1])).saveAsSequenceFile("/user/hadoop/sequence_file/")
Read the SequenceFile generated in the above step:
>>> sequence_df = spark.sparkContext.sequenceFile("/user/hadoop/sequence_file/").toDF(["id", "name"])
Confirm the content:
>>> sequence_df.show()
+---+----+
| id|name|
+---+----+
|  2|Alex|
|  3|John|
|  1| Joe|
+---+----+
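
The example keeps only id and name because each SequenceFile record holds exactly one key and one value. One way to retain all three columns (a sketch, not from the original article; the path is an example) is to pack the extra fields into the value as a delimited string:
>>> person_df.rdd.map(lambda row: (row["id"], "%s,%s" % (row["name"], row["email"]))) \
...     .saveAsSequenceFile("/user/hadoop/sequence_file_full/")
>>> full_rdd = spark.sparkContext.sequenceFile("/user/hadoop/sequence_file_full/")
>>> full_rdd.map(lambda kv: (kv[0],) + tuple(kv[1].split(","))).toDF(["id", "name", "email"]).show()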
  • Avro
Generate an Avro file from the sample 'person' dataframe:
>>> person_df.write.format("avro").save("/user/hadoop/avro/")
Read the Avro file generated in the above step:
>>> avro_df = spark.read.format("avro").load("/user/hadoop/avro/")
>>> avro_df.show()
+---+----+--------------+
| id|name|         email|
+---+----+--------------+
|  3|John|john@gmail.com|
|  2|Alex|alex@gmail.com|
|  1| Joe| joe@gmail.com|
+---+----+--------------+
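
Avro output can be compressed as well; with the spark-avro package used here, the codec is taken from a session-level config. A sketch (codec and path chosen for illustration):
>>> spark.conf.set("spark.sql.avro.compression.codec", "snappy")
>>> person_df.write.format("avro").save("/user/hadoop/avro_snappy/")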

Conclusion

We need to choose the correct file format for a given use case to make optimal use of cluster resources. While making this decision, we need to consider the following factors:

  • The file format’s read/write performance
  • Whether the file format supports compression, and how much compression the workload needs
  • Whether the file format is splittable
  • How the downstream job is going to consume the data, e.g. process entire records or only a subset of columns
