PySpark DataFrame API: CSV File Handling, Examples and Explanation
Exploring PySpark Data Processing: Reading and Writing CSV Files
Intro
In PySpark, a data source API is a set of interfaces and classes that allow developers to read and write data from various data sources such as HDFS, HBase, Cassandra, JSON, CSV, and Parquet. The data source API provides a consistent interface for accessing and manipulating data, regardless of the underlying data format or storage system.
CSV DataFrame Reader
The data source API is used in PySpark by creating a DataFrameReader or DataFrameWriter object and using it to read or write data from or to a specific data source. For example, to read data from a CSV file, you can create a DataFrameReader object and specify the path to the CSV file using the “csv” method. Then, you can use the resulting DataFrame object to perform various operations such as filtering, grouping, and aggregation.
Read CSV
There are various ways to read CSV files using PySpark. Here are a few examples:
1. Using spark.read.csv method:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Read CSV").getOrCreate()
df = spark.read.csv("path/to/csv/file.csv", header=True, inferSchema=True)
In this example, we first create a SparkSession object, then use the spark.read.csv method to read the CSV file located at "path/to/csv/file.csv". We also specify two options: header=True to indicate that the first row of the CSV file contains column names, and inferSchema=True to automatically infer the data types of the columns. The resulting DataFrame will take its column names from the header row, with data types inferred from the values in the file.
2. Using spark.read.format method:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Read CSV").getOrCreate()
df = spark.read.format("csv") \
.option("header", "true") \
.option("inferSchema", "true") \
.load("path/to/csv/file.csv")
In this example, we use the spark.read.format method to specify the format of the file we want to read, in this case "csv". We also use the .option method to set the header and inferSchema options, which have the same meaning as in the previous example. Finally, we use the .load method to load the CSV file.
Sample Data
Here’s an example of sample data that we can use to test the above code snippets:
This CSV file contains three columns (name, age, and city) and three rows of data. The first row contains the column names.
name,age,city
John,25,New York
Alice,30,San Francisco
Bob,35,Boston
When we use one of the above methods to read this CSV file, we will get a DataFrame object that looks like this:
+-----+---+-------------+
| name|age| city|
+-----+---+-------------+
| John| 25| New York|
|Alice| 30|San Francisco|
| Bob| 35| Boston|
+-----+---+-------------+
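As a rough sketch of what header=True and inferSchema=True accomplish, the snippet below parses the same sample data with Python's standard csv module: the first row becomes the column names, and numeric strings are converted to integers. This only illustrates the semantics; PySpark performs the equivalent work in a distributed fashion.

```python
import csv
import io

# The sample data from above; in a real job this would come from a file.
data = """name,age,city
John,25,New York
Alice,30,San Francisco
Bob,35,Boston
"""

# header=True: treat the first row as column names.
rows = list(csv.DictReader(io.StringIO(data)))

# inferSchema=True (greatly simplified): convert numeric strings to ints.
for row in rows:
    row["age"] = int(row["age"])

print(rows[0])  # {'name': 'John', 'age': 25, 'city': 'New York'}
```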
Some of the common parameters that can be used while reading a CSV file using PySpark are:
- path: The path to the CSV file.
- header: A boolean value indicating whether the first row of the CSV file contains column names.
- inferSchema: A boolean value indicating whether to automatically infer the data types of the columns.
- sep: The delimiter used to separate fields in the CSV file (default is comma).
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Read CSV").getOrCreate()
df = spark.read.csv("path/to/csv/file.csv", header=True, inferSchema=True, sep=';')
In this example, we use the sep parameter to specify the delimiter used to separate fields in the CSV file. In this case, the delimiter is a semicolon instead of the default comma.
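To see what the sep option controls, here is a small sketch using Python's standard csv module on a hypothetical semicolon-delimited version of the sample data; the module's delimiter argument plays the same role as sep does in PySpark's CSV reader.

```python
import csv
import io

# Hypothetical semicolon-delimited file contents.
data = "name;age;city\nJohn;25;New York\n"

# delimiter=';' tells the parser which character separates fields,
# just as sep=';' does for PySpark's CSV reader.
rows = list(csv.reader(io.StringIO(data), delimiter=";"))

print(rows)  # [['name', 'age', 'city'], ['John', '25', 'New York']]
```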
- quote: The character used to quote fields in the CSV file (default is double quote).
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Read CSV").getOrCreate()
df = spark.read.csv("path/to/csv/file.csv", header=True, inferSchema=True, quote='"')
In this example, we use the quote parameter to specify the character used to quote fields in the CSV file. Here we explicitly set it to a double quote, which is also the default, so this call behaves the same as omitting the parameter.
- escape: The character used to escape special characters in the CSV file (default is backslash).
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Read CSV").getOrCreate()
df = spark.read.csv("path/to/csv/file.csv", header=True, inferSchema=True, escape='\\')
In this example, we use the escape parameter to specify the character used to escape special characters in the CSV file. Here we explicitly set it to a backslash, which is also the default, so this call behaves the same as omitting the parameter.
- nullValue: The string used to represent null values in the CSV file (default is empty string).
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Read CSV").getOrCreate()
df = spark.read.csv("path/to/csv/file.csv", header=True, inferSchema=True, nullValue='NA')
In this example, we use the nullValue parameter to specify the string used to represent null values in the CSV file. In this case, null values are represented by the string "NA" instead of the default empty string.
- nanValue: The string used to represent NaN values in the CSV file (default is "NaN").
- mode: The mode for parsing the CSV file (default is "PERMISSIVE"). Other options are "DROPMALFORMED" and "FAILFAST".
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Read CSV").getOrCreate()
df = spark.read.csv("path/to/csv/file.csv", header=True, inferSchema=True, mode='DROPMALFORMED')
In this example, we use the mode parameter to specify how malformed records are handled while parsing the CSV file. Here we use "DROPMALFORMED", which drops rows that contain malformed data instead of failing the whole read operation. The other options for the mode parameter are "PERMISSIVE" (the default), which keeps malformed rows and sets fields it cannot parse to null, and "FAILFAST", which throws an exception on the first malformed record.
Note that these examples are not exhaustive and there are many more parameters that can be used while reading CSV files in PySpark. You can refer to the PySpark documentation for a complete list of parameters and their descriptions.
Write CSV
There are various ways in which we can write to CSV files using PySpark. Here are some examples:
1. Using write method:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Write CSV").getOrCreate()
df = spark.read.csv("path/to/csv/file.csv", header=True, inferSchema=True)
df.write.csv("path/to/csv/output/file.csv", header=True, sep=',')
In this example, we read a CSV file using PySpark and then write it back out using the write method. We use the header parameter to include the column names in the output file, and the sep parameter to specify the delimiter used to separate fields. Note that Spark writes the output path as a directory containing one or more part files, not as a single CSV file named file.csv.
2. Using option method:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Write CSV").getOrCreate()
df = spark.read.csv("path/to/csv/file.csv", header=True, inferSchema=True)
df.write.option("header", True).option("sep", ",").csv("path/to/csv/output/file.csv")
In this example, we use the option method to specify the output options for the CSV file. We use the header option to include the column names in the output file, and the sep option to specify the delimiter used to separate fields.
3. Using format method:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Write CSV").getOrCreate()
df = spark.read.csv("path/to/csv/file.csv", header=True, inferSchema=True)
df.write.format("csv").option("header", True).option("sep", ",").save("path/to/csv/output/file.csv")
In this example, we use the format method to specify the output file format, in this case "csv", and the save method to write the data. We also use the option method to specify the output options, as in the previous example.
Now, let’s take a look at some of the commonly used parameters while writing a CSV file using PySpark:
- header: Includes or excludes the header row in the output file. It takes a boolean value, with True indicating that the header row should be included and False indicating that it should be excluded.
- sep: The delimiter used to separate fields in the output file. It takes a string value, with the default being a comma (,).
- quote: The character used to quote fields in the output file. It takes a string value, with the default being a double quote (").
- escape: The character used to escape special characters in the output file. It takes a string value, with the default being a backslash (\).
- nullValue: The string used to represent null values in the output file. It takes a string value, with the default being an empty string ("").
Here is an example of how to use some of these parameters:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Write CSV").getOrCreate()
df = spark.read.csv("path/to/csv/file.csv", header=True, inferSchema=True)
df.write.csv("path/to/csv/output/file.csv", header=True, sep=';', quote='"', nullValue='NULL')
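To make the write-side options concrete, the sketch below reproduces the effect of sep=';', quote='"', and nullValue='NULL' with Python's standard csv module, writing to an in-memory buffer rather than a real file. It only illustrates the option semantics, not Spark's distributed writer.

```python
import csv
import io

rows = [["name", "age", "city"], ["John", 25, None]]

buf = io.StringIO()
# delimiter=';' mirrors sep=';' and quotechar='"' mirrors quote='"'.
writer = csv.writer(buf, delimiter=";", quotechar='"', lineterminator="\n")
for row in rows:
    # nullValue='NULL' (simplified): write None values as the string NULL.
    writer.writerow("NULL" if value is None else value for value in row)

print(buf.getvalue())
# name;age;city
# John;25;NULL
```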
Note that these examples are not exhaustive and there are many more parameters that can be used while writing CSV files in PySpark. You can refer to the PySpark documentation for a complete list of parameters and their descriptions.
Conclusion
PySpark provides several ways to read and write CSV files, and each method has its own advantages and limitations. The write.csv() method is the simplest way to write data to CSV, though the CSV format itself is not the most efficient choice for large datasets. On the read side, spark.read.csv() and spark.read.format("csv") are equivalent; keep in mind that inferSchema=True requires an extra pass over the data, so supplying an explicit schema is faster for large files.
This article is part of a DataFrame API series. Do check out my articles on PySpark SQL, built-in functions, and others. Happy reading.