Prep for Databricks Exam 3a : DataFrameReader

Lackshu Balasubramaniam
9 min read · Mar 18, 2020


As I walk through the Databricks exam prep for Apache Spark 2.4 with Python 3, I’m collating notes based on the knowledge expectation of the exam. This is a snapshot of my review of materials. It’s a work in progress.

This post includes the following:

DataFrameReader (Only covering the reader so that I can go into it in more depth)

My references were:

Spark The Definitive Guide

Apache Spark 2.4 Docos

Spark In Action

Databricks website

Databricks Engineering blog

Various developer blogs that are fantastic. There are too many to be called out.

DataFrameReader — Expectation of Knowledge

  • Read data for the “core” data formats (CSV, JSON, JDBC, ORC, Parquet, text and tables)
  • How to configure options for specific formats
  • How to read data from non-core formats using format() and load()
  • How to specify a DDL-formatted schema
  • How to construct and specify a schema using the StructType classes

DataFrameReader

Spark has the capability to read from a variety of sources into a DataFrame. The interface for reading from a source into a DataFrame is called pyspark.sql.DataFrameReader.

For more details on DataFrameReader operations please refer to:

· Databricks documentation on Data Sources

· PySpark API documentation on DataFrameReader

The general pattern for calling the DataFrameReader is as follows:

DataFrameReader.format(<format>)
.option(<key>, <value>)
.schema(<schema>)
.load(<path>)

DataFrameReader is accessible through the SparkSession, i.e. spark.read.
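
As a minimal sketch of this pattern (the format, path and schema below are hypothetical, purely for illustration):

# Hypothetical example of the general reader pattern via spark.read
exampleDF = spark.read.format("csv")\
.option("header", "true")\
.schema("id INT, name STRING")\
.load("/tmp/example.csv")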

The parameters supplied to DataFrameReader are:

Table 1: DataFrameReader Options

Note:

For the case of read mode:

PERMISSIVE — all fields in a corrupted record are nullified. A column called _corrupt_record stores the corrupted record. The column name for corrupt record can be configured through columnNameOfCorruptRecord option. Additionally, nulls are inserted for missing fields in the record and extra fields are ignored. If a schema is specified, the schema must include the corrupt record column, otherwise the corrupt record column would be dropped during parsing.

DROPMALFORMED — drops rows with issues, i.e. corrupted records.

FAILFAST — fails and throws an exception when it encounters a corrupted record.
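
As an illustrative sketch of PERMISSIVE mode with a custom corrupt-record column (the path and schema below are made up for this example):

# Hypothetical sketch: capture malformed CSV rows in a custom corrupt-record column.
# Note the corrupt-record column must be included in the supplied schema.
badRowsDF = spark.read\
.option("mode", "PERMISSIVE")\
.option("columnNameOfCorruptRecord", "_bad_record")\
.schema("id INT, amount DOUBLE, _bad_record STRING")\
.csv("/tmp/transactions.csv")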

CSV

CSV files are one of the more common source files and have many options due to the different formats the source files can come in.

Commonly used options for CSV are as follows:

Table 2: CSV Options
Table 3: CSV Formatting Options

Refer to documentation for more options that are available.

Sample calls:

In the example below we attempt to load a CSV file with headers, inferring a schema by parsing half the rows in the file (samplingRatio of 0.5). Please note that the options must be specified before the csv(path) call if we use this pattern.

dfguide = "/databricks-datasets/definitive-guide/"flightData = spark.read.option("header","true")\
.option("mode","FAILFAST")\
.option("inferSchema","true")\
.option("samplingRatio",0.5)\
.csv(dfguide + "/data/flight-data/csv/2010-summary.csv")

Output:

flightData:pyspark.sql.dataframe.DataFrame = [DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string ... 1 more fields]

Next we display the inferred schema

flightData.printSchema()

Output:

root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: integer (nullable = true)

— — —

We can also supply a schema in the form of StructType when loading the file as per example below.

from pyspark.sql.types import *

diamondSchema = StructType([
StructField("_c0", IntegerType(), True),
StructField("carat", DoubleType(), True),
StructField("cut", StringType(), True),
StructField("color", StringType(), True),
StructField("clarity", StringType(), True),
StructField("depth", DoubleType(), True),
StructField("table", DoubleType(), True),
StructField("price", IntegerType(), True),
StructField("x", DoubleType(), True),
StructField("y", DoubleType(), True),
StructField("z", DoubleType(), True)])

diamondsWithSchema = spark.read.format("csv")\
.option("header", "true")\
.schema(diamondSchema)\
.load("/databricks-datasets/Rdatasets/data-001/csv/ggplot2/diamonds.csv")

Output:

diamondsWithSchema:pyspark.sql.dataframe.DataFrame = [_c0: integer, carat: double … 9 more fields]

Next we display the contents of the DataFrame

display(diamondsWithSchema)

Output:

Shows the first 1000 rows of the DataFrame

— — —

We can also supply a schema in the form of a DDL-formatted string when loading the file, as per the example below. Note the csv() shorthand, with the options supplied as keyword arguments in the same call.

from pyspark.sql.types import *

dfguide = "/databricks-datasets/definitive-guide/"

flightDataSchema = "DEST_COUNTRY_NAME String, ORIGIN_COUNTRY_NAME String, count Long"

flightData = spark.read.csv(dfguide + "/data/flight-data/csv/2010-summary.csv",
header=True,
mode="FAILFAST",
schema=flightDataSchema)
display(flightData)

Output:

Shows the first 1000 rows of the DataFrame

— — —

We can also supply a schema and fail the load if the data in the file doesn’t match the schema, as per the example below. We set all columns to LongType, which causes the load to fail in this case.

from pyspark.sql.types import *

dfguide = "/databricks-datasets/definitive-guide/"

flightDataSchema = StructType([
StructField("DEST_COUNTRY_NAME", LongType(), True),
StructField("ORIGIN_COUNTRY_NAME", LongType(), True),
StructField("count", LongType(), True)])

spark.read.format("csv")\
.option("header","true")\
.option("mode","FAILFAST")\
.schema(flightDataSchema)\
.load(dfguide + "/data/flight-data/csv/2010-summary.csv").show(5)

Output:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 1, localhost, executor driver): com.databricks.sql.io.FileReadException: Error while reading file dbfs:/databricks-datasets/definitive-guide/data/flight-data/csv/2010-summary.csv.

JSON

JSON is a popular format, as it’s commonly returned by REST APIs.

There are two approaches to reading a JSON file:

  • Single-line mode: there is one JSON object per line.
  • Multi-line mode: if a JSON object occupies multiple lines, you must enable multi-line mode for Spark to load the file(s). Each file is loaded as a whole entity and cannot be split.

It is advisable to use line-delimited JSON rather than a single large JSON object or array per file.
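
To make the distinction concrete, here is a small sketch (the paths are hypothetical):

# Single-line (line-delimited) JSON: one object per line, splittable.
#   {"id": 1, "name": "a"}
#   {"id": 2, "name": "b"}
singleLineDF = spark.read.json("/tmp/events.jsonl")

# Multi-line JSON: pretty-printed objects or an array spanning lines;
# each file is read as a whole and cannot be split.
multiLineDF = spark.read.option("multiLine", True).json("/tmp/events_pretty.json")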

Commonly used options for JSON are:

Table 4: JSON Options

Sample calls:

In the example below we read a JSON file and infer the schema from the data in the file.

iotDevices = spark.read.json("/databricks-datasets/iot/iot_devices.json")
iotDevices.printSchema()

Output:

iotDevices:pyspark.sql.dataframe.DataFrame = [battery_level: long, c02_level: long … 13 more fields]

root
 |-- battery_level: long (nullable = true)
 |-- c02_level: long (nullable = true)
 |-- cca2: string (nullable = true)
 |-- cca3: string (nullable = true)
 |-- cn: string (nullable = true)
 |-- device_id: long (nullable = true)
 |-- device_name: string (nullable = true)
 |-- humidity: long (nullable = true)
 |-- ip: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- lcd: string (nullable = true)
 |-- longitude: double (nullable = true)
 |-- scale: string (nullable = true)
 |-- temp: long (nullable = true)
 |-- timestamp: long (nullable = true)

In the example below we read a JSON file with a supplied schema.

from pyspark.sql.types import *

dfguide = "/databricks-datasets/definitive-guide/"

flightDataSchema = StructType([
StructField("DEST_COUNTRY_NAME", StringType(), True),
StructField("ORIGIN_COUNTRY_NAME", StringType(), True),
StructField("count", LongType(), True)])

flightDataJson = spark.read.format("json")\
.option("mode","FAILFAST")\
.schema(flightDataSchema)\
.load(dfguide + "/data/flight-data/json/2010-summary.json")

Output:

flightDataJson:pyspark.sql.dataframe.DataFrame = [DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string … 1 more fields]

In the example below the file contains multi-line entries in addition to single-line entries, so multi-line mode is enabled.

filePath = "/FileStore/tables/multi.json"multiJson = spark.read.option("multiLine", True)\
.json(filePath)

display(multiJson)

Output:

Displays objects as rows

JDBC

https://spark.apache.org/docs/2.4.0/sql-data-sources-jdbc.html

Databricks can query many SQL databases using JDBC drivers.

Commonly used options for JDBC are:

Table 5: JDBC Options

Sample call:

In the example below the data is read from a SQLite database included in the sample datasets. There’s no concept of user and password in this instance; however, most databases require a user and password (or equivalent) for authentication.

driver = "org.sqlite.JDBC"
path = "/dbfs/databricks-datasets/definitive-guide/data/flight-data/jdbc/my-sqlite.db"
url = "jdbc:sqlite:{0}".format(path)
tableName = "flight_info"
flightDataDB = spark.read.format("jdbc")\
.option("url",url)\
.option("dbtable",tableName)\
.option("driver", driver).load()
display(flightDataDB)

Output:

Rows from the table were returned.
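
For databases that do require credentials, the same pattern applies with user and password options. A hypothetical PostgreSQL sketch (server, database, table and secret scope are all placeholders, not from the example above):

# Hypothetical JDBC read with credentials; none of these values come from the article.
jdbcUrl = "jdbc:postgresql://dbserver:5432/sales"
ordersDF = spark.read.format("jdbc")\
.option("url", jdbcUrl)\
.option("dbtable", "public.orders")\
.option("user", "spark_reader")\
.option("password", dbutils.secrets.get(scope="jdbc", key="password"))\
.option("driver", "org.postgresql.Driver")\
.load()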

The examples below push a query down to the database engine. They are interchangeable; however, I prefer pattern 1 because the syntax makes the intent obvious.

Pattern 1:

driver = "org.sqlite.JDBC"
path = "/dbfs/databricks-datasets/definitive-guide/data/flight-data/jdbc/my-sqlite.db"
url = "jdbc:sqlite:{0}".format(path)
query = "select * from flight_info where DEST_COUNTRY_NAME = 'United States'"
flightDataDB = spark.read.format("jdbc")\
.option("url",url)\
.option("query",query)\
.option("driver", driver).load()
display(flightDataDB)

Pattern 2:

driver = "org.sqlite.JDBC"
path = "/dbfs/databricks-datasets/definitive-guide/data/flight-data/jdbc/my-sqlite.db"
url = "jdbc:sqlite:{0}".format(path)
tableName = "(select * from flight_info where DEST_COUNTRY_NAME = 'United States')"
flightDataDB = spark.read.format("jdbc")\
.option("url",url)\
.option("dbtable",tableName)\
.option("driver", driver).load()

display(flightDataDB)

Output:

Rows were returned with DEST_COUNTRY_NAME being United States

The example below is for reading from JDBC connections across multiple workers by leveraging partitions.

df = spark.read.jdbc(url=jdbcUrl, table="employees", column="emp_no", lowerBound=1, upperBound=150000, numPartitions=100)
display(df)
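
The same partitioned read can also be expressed through the generic format/option pattern. Note that partitionColumn, lowerBound and upperBound only control how the read is split into range queries; they do not filter rows (jdbcUrl and the employees table are assumed, as in the snippet above):

# Equivalent partitioned read using option(); jdbcUrl and the table are assumed as above.
df = spark.read.format("jdbc")\
.option("url", jdbcUrl)\
.option("dbtable", "employees")\
.option("partitionColumn", "emp_no")\
.option("lowerBound", "1")\
.option("upperBound", "150000")\
.option("numPartitions", "100")\
.load()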

ORC

The Optimized Row Columnar (ORC) format is a columnar file format with a built-in schema that’s optimized for Hive. It’s optimized for large streaming reads and has integrated support for quick row seeks. There are no options for reading ORC files, as the format is well known to Spark.

Sample call:

The example below reads an ORC file into a DataFrame.

dfguide = "/databricks-datasets/definitive-guide/"flightData = spark.read.format("orc")\
.load(dfguide + "/data/flight-data/orc/2010-summary.orc")
flightData.show(10)

Output:

The first 10 rows of the dataframe were shown.

Parquet

Apache Parquet is a columnar file format that provides optimizations to speed up queries and is far more efficient than CSV or JSON. It is the default file format for Spark and is heavily optimized for it. Parquet also supports storing complex types like arrays, maps and structs. Parquet enforces its own schema when storing data, so the options for reading Parquet files are limited.

Option:

Table 6: Parquet Option
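
One read option commonly used with Parquet is mergeSchema, which merges the schemas of all part files instead of picking a single one; a hedged sketch (the path is hypothetical):

# Hypothetical sketch: merge schemas across Parquet part files at read time.
mergedDF = spark.read.option("mergeSchema", "true")\
.parquet("/tmp/events-parquet/")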

Sample call:

In the example below we are reading a Parquet file and retrieving the schema from the file.

dfguide = "/databricks-datasets/definitive-guide/"

flightData = spark.read.format("parquet")\
.option("mode","FAILFAST")\
.load(dfguide + "/data/flight-data/parquet/2010-summary.parquet")
flightData.dtypes

Output:

flightData:pyspark.sql.dataframe.DataFrame = [DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string … 1 more fields]

Out[24]: [('DEST_COUNTRY_NAME', 'string'), ('ORIGIN_COUNTRY_NAME', 'string'), ('count', 'bigint')]

Text

Spark can also read plain text files. Each row in the file becomes a record in the resulting DataFrame. The DataFrame will have a single string column named “value”, followed by partitioned columns if there are any. Partitioned columns are set up when the files are written.

Commonly used options for plain text files are:

Table 7: Text Options

Sample Calls:

The example below reads the entire file as a single record (wholetext).

readmeText = spark.read.text("/databricks-datasets/README.md", wholetext=True)
display(readmeText)

Output:

Displays the entire file as a single row.

The example below reads a text file row by row.

dfguide = "/databricks-datasets/definitive-guide/"movieDataText = spark.read.text(dfguide + "data/sample_movielens_ratings.txt")
display(movieDataText)

Output:

Displays the first 1000 rows in the file.

The example below reads a text file and splits the rows into arrays for easy processing.

dfguide = "/databricks-datasets/definitive-guide/"
movieDataRows = spark.read.text(dfguide + "data/sample_movielens_ratings.txt")\
.selectExpr("split(value,'::') as rows")
movieDataRows.show()

Output:

Shows each row split into an array of strings.

Table

Returns the specified Hive table as a DataFrame.

Option:

Table 8: Table Options

Sample Call:

The example below reads a Parquet file into a DataFrame, registers it as a temporary view, and then reads it back as a table.

dfguide = "/databricks-datasets/definitive-guide/"

flightData = spark.read.format("parquet")\
.option("mode","FAILFAST")\
.load(dfguide + "/data/flight-data/parquet/2010-summary.parquet")
flightData.createOrReplaceTempView("Flight2010")
spark.read.table("Flight2010").show(10)

Output:

Shows the first 10 rows of the table.

Format

Data in non-core formats can be read using format() and load(). This pattern is also used interchangeably with the core formats because the syntax is intuitive to read.

Options:

Table 9: Format Options

Sample Call:

There are examples for known formats earlier in the article which make use of the format and load pattern.
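
For a format that isn’t built in, the pattern is the same; a hedged sketch assuming the Avro data source (spark-avro) is available on the cluster and using a made-up path:

# Hypothetical sketch: reading a non-core format via format()/load().
# Assumes the spark-avro package is attached to the cluster; the path is made up.
avroDF = spark.read.format("avro")\
.load("/tmp/events.avro")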

schema

Some data sources, e.g. JSON, can infer the input schema automatically from the data. By supplying a schema up front, the underlying data source can skip the schema inference step and thus speed up data loading. Examples of schemas built with the StructType classes and of DDL-formatted strings are in the sample calls for the CSV format.
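
A compact sketch of supplying the same schema in both forms (the JSON path is hypothetical):

from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# The same schema expressed as a DDL string and as StructType classes.
ddlSchema = "id INT, name STRING"
structSchema = StructType([
StructField("id", IntegerType(), True),
StructField("name", StringType(), True)])

df1 = spark.read.schema(ddlSchema).json("/tmp/people.json")
df2 = spark.read.schema(structSchema).json("/tmp/people.json")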

parallelism

Multiple executors cannot read from the same file at the same time; however, when reading from a folder containing multiple files, the executors can read different files in parallel. Each file then generally becomes a partition in the resulting DataFrame.
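
A small sketch to see this in action (the folder is hypothetical):

# Reading a folder of text files; each file typically becomes at least one partition.
logsDF = spark.read.text("/tmp/logs/")
print(logsDF.rdd.getNumPartitions())  # roughly tracks the number of files read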

Coming next

DataFrameWriter

DataFrame
