Prep for Databricks Exam 3b : DataFrameWriter

Lackshu Balasubramaniam
6 min read · Apr 12, 2020


As I walk through the Databricks exam prep for Apache Spark 2.4 with Python 3, I’m collating notes based on the knowledge expectation of the exam. This is a snapshot of my review of materials. It’s a work in progress.

This post includes the following:

DataFrameWriter (Only covering this, so it’s going to be a short post)

My references were:

Spark The Definitive Guide

Apache Spark 2.4 Docos

Spark In Action

Databricks website

Databricks Engineering blog

Learning Spark, 2nd Edition

Munging Data

Various other fantastic developer blogs, too many to call out individually.

DataFrameWriter — Expectation of Knowledge

· Write data to the “core” data formats (csv, json, jdbc, orc, parquet, text and tables)

· Overwriting existing files

· How to configure options for specific formats

· How to write a data source to 1 single file or N separate files

· How to write partitioned data

· How to bucket data by a given set of columns

DataFrameWriter

Spark has the capability to write from a DataFrame into a variety of targets. The interface for writing from a DataFrame into a target is called pyspark.sql.DataFrameWriter, which is exposed as the write property of the DataFrame.

The general pattern for calling the DataFrameWriter is as follows:

DataFrameWriter.format(<format>)
  .option(<key>, <value>)
  .save(<path>)

DataFrameWriter is accessible through a DataFrame's write property, i.e. flightDf.write when the DataFrame is named flightDf.
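For example, a minimal sketch, assuming a DataFrame named flightDf has already been loaded and using an illustrative output path:

# write flightDf out as Parquet, replacing anything already at the target path
flightDf.write.format("parquet")\
  .mode("overwrite")\
  .save("/tmp/parquet/flight-summary")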

The key methods of DataFrameWriter are format, mode, option and options, partitionBy, bucketBy, sortBy, save, saveAsTable and insertInto, plus the format shortcuts csv, json, parquet, orc, text and jdbc.

Note: there are two options when naming the target, either a filename with an extension that indicates the format, or simply a folder name. I prefer specifying a folder, because the target ends up being a folder of underlying files either way, and a filename-style path is misleading.

Files can also be written to DBFS FileStore for downloading to a local location.
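As a sketch of that, assuming the same hypothetical flightDf and an illustrative path under /FileStore (files written there can be downloaded through the workspace's /files/ URL):

# coalesce to one partition so a single CSV file is produced for download
flightDf.coalesce(1).write.format("csv")\
  .option("header", "true")\
  .mode("overwrite")\
  .save("/FileStore/exports/flight-summary")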

Writing Data in Parallel

The idea is to have writes and reads operate in parallel as much as possible. This depends on the DataFrame and the write options. The number of files created in the target folder depends on the number of partitions in the DataFrame at write time: by default one file is written per partition. When partitionBy is used, the write creates one subfolder per distinct value of the partitioning column(s), with files inside each.
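This is also how the exam expectation of writing one single file versus N separate files is met: adjust the partition count before the write. A minimal sketch, assuming a DataFrame named flightData (as used in the samples later in this post) and illustrative paths:

# one partition in, one part file out
flightData.coalesce(1).write.format("csv")\
  .mode("overwrite")\
  .save("/tmp/csv/one-file")

# eight partitions in, eight part files out
flightData.repartition(8).write.format("csv")\
  .mode("overwrite")\
  .save("/tmp/csv/eight-files")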

There are a few options for optimal reads of the files generated.

Partitioning

Partitioning separates the output into folders and files by the values of the partitioning column(s), which makes downstream reads quicker, particularly when filtering on those columns (partition pruning).
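A minimal read-side sketch of the benefit, assuming data was written with partitionBy("ORIGIN_COUNTRY_NAME") to a hypothetical /tmp/parquet/flights folder:

# the filter on the partitioning column prunes the scan to a single folder,
# i.e. only ORIGIN_COUNTRY_NAME=Australia files are read
df = spark.read.format("parquet").load("/tmp/parquet/flights")
df.filter(df.ORIGIN_COUNTRY_NAME == "Australia").count()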

Bucketing

Bucketing organizes files into buckets based on how the data will be read and processed, if that is known ahead of time. This is a strategy to avoid shuffles when the files are later read and processed.
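Note that in Spark 2.4 bucketing only works when writing to a table with saveAsTable, not to a plain path; the Table section at the end of this post has the full sample call. A minimal sketch of the idea, using a hypothetical table name:

# 10 buckets hashed on the "count" column, sorted within each bucket,
# so later joins or aggregations on "count" can avoid a shuffle
flightData.write.format("parquet")\
  .mode("overwrite")\
  .bucketBy(10, "count")\
  .sortBy("count")\
  .saveAsTable("flight_data_bucketed")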

Number of Files vs File Size

To avoid overgrown files, which cause a performance hit, it's possible to cap the number of records written per file with the maxRecordsPerFile option. To avoid the small-files problem, where many tiny files have to be read and processed, maxRecordsPerFile should be set to a sensible value rather than one that is too low.
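maxRecordsPerFile is set per write via option; the Parquet sample later in this post uses it with a deliberately tiny value to force multiple files. A sketch with a more realistic cap (the number and path are illustrative):

# roll over to a new output file once a file reaches one million rows
flightData.write.format("parquet")\
  .mode("overwrite")\
  .option("maxRecordsPerFile", 1000000)\
  .save("/tmp/parquet/capped")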

Common methods

Common methods across formats are format, mode, option (and options), partitionBy and save; mode controls what happens when the target already exists.
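The save modes accepted by mode are worth memorising; a minimal sketch, assuming flightData and an illustrative path:

# append                          : add new files alongside whatever is already at the target
# overwrite                       : replace the existing target completely
# ignore                          : silently skip the write if the target already exists
# error / errorifexists (default) : fail if the target already exists
flightData.write.format("json")\
  .mode("overwrite")\
  .save("/tmp/json/mode-demo")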

Output Formats

CSV

CSV files are commonly used for smaller data sets.

Commonly used options for writing CSV files include sep, header, quote, escape, nullValue, dateFormat and compression.

Refer to documentation for more options that are available.

Sample call:

#read
dfguide = "/databricks-datasets/definitive-guide/"

flightData = spark.read.format("csv")\
  .option("header", "true")\
  .option("mode", "FAILFAST")\
  .option("inferSchema", "true")\
  .load(dfguide + "/data/flight-data/csv/2010-summary.csv")

#write
flightData.write.format("csv").mode("overwrite")\
  .partitionBy("ORIGIN_COUNTRY_NAME")\
  .save("/tmp/csv/2010-summary")

Output:

A folder was created for every origin country. Since the row counts are small, each folder had one CSV file plus log files.

Sample call:

#write
flightData.write.format("csv").mode("overwrite")\
  .option("sep", "\t")\
  .save("/tmp/csv/2010-summary")

Output:

A csv file was written to the folder with corresponding log files.

JSON

Commonly used options for writing JSON files include dateFormat, timestampFormat and compression.

Refer to documentation for more options that are available.

Sample call:

#read
from pyspark.sql.types import *

dfguide = "/databricks-datasets/definitive-guide/"

flightDataSchema = StructType([
  StructField("DEST_COUNTRY_NAME", StringType(), True),
  StructField("ORIGIN_COUNTRY_NAME", StringType(), True),
  StructField("count", LongType(), True)])

flightData = spark.read.format("csv")\
  .option("header", "true")\
  .option("mode", "FAILFAST")\
  .schema(flightDataSchema)\
  .load(dfguide + "/data/flight-data/csv/2010-summary.csv")

#write
flightData.write.format("json").mode("append").save("/tmp/json/2010-summary.json")

Output:

A json file was written to the folder with corresponding log files.

JDBC

Commonly used options for JDBC writes include url, dbtable, driver, user, password, numPartitions, batchsize, truncate and createTableOptions.

Sample call:

#read
driver = "org.sqlite.JDBC"
path = "/dbfs/databricks-datasets/definitive-guide/data/flight-data/jdbc/my-sqlite.db"
url = "jdbc:sqlite:{0}".format(path)
tableName = "flight_info"

flightDataDB = spark.read.format("jdbc")\
  .option("url", url)\
  .option("dbtable", tableName)\
  .option("driver", driver)\
  .load()

#write
dbutils.fs.mkdirs("/tmp/jdbc/flight-data/")

props = {"driver": "org.sqlite.JDBC"}
path = "/dbfs/tmp/jdbc/flight-data/my-sqlite.db"
url = "jdbc:sqlite:{0}".format(path)
tableName = "flight_info"

flightDataDB.write.format("jdbc")\
  .mode("overwrite")\
  .option("url", url)\
  .option("dbtable", tableName)\
  .option("driver", driver)\
  .save()

Output:

Database and table are created

Sample call:

#validate table
spark.read.jdbc(url, tableName, properties = props).count()

Output:

Table row count is returned

Out[17]: 255

ORC

The Optimized Row Columnar (ORC) format is a columnar file format with a built-in schema that's optimized for Hive. It's optimized for large streaming reads and has integrated support for quick row seeks.

Commonly used options for writing ORC files include compression (none, snappy, zlib or lzo).

Sample call:

#read
dfguide = "/databricks-datasets/definitive-guide/"

flightData = spark.read.format("orc")\
  .load(dfguide + "/data/flight-data/orc/2010-summary.orc")

#write
flightData.write.orc("/tmp/orc/2010-summary", partitionBy = "DEST_COUNTRY_NAME")

Output:

Creates a folder for each partition, i.e. /tmp/orc/2010-summary/DEST_COUNTRY_NAME=Australia/

A single snappy-compressed ORC file was written in each folder since the row counts were small.

Parquet

Apache Parquet is a columnar file format that provides optimizations to speed up queries and is much more efficient than CSV or JSON. It is also Spark's default file format and is well optimized for Spark. Parquet also supports storing complex types like array, map and struct.

Commonly used options for writing Parquet files include compression (e.g. none, snappy, gzip); the general maxRecordsPerFile option is also frequently set for Parquet output.

Sample call:

#read
dfguide = "/databricks-datasets/definitive-guide/"

flightData = spark.read.format("parquet")\
  .option("mode", "FAILFAST")\
  .load(dfguide + "/data/flight-data/parquet/2010-summary.parquet")

#write
flightData.write.format("parquet").mode("overwrite")\
  .partitionBy("ORIGIN_COUNTRY_NAME")\
  .option("maxRecordsPerFile", 10)\
  .save("/tmp/parquet/2010-summary")

Output:

Creates a folder for each partition, i.e. /tmp/parquet/2010-summary/ORIGIN_COUNTRY_NAME=Australia/

The folder "/tmp/parquet/2010-summary/ORIGIN_COUNTRY_NAME=United States/" had multiple snappy parquet files because its row count exceeded the 10-records-per-file cap.

Text

Spark can also write plain text files. The limitation is that the format only supports a single column of type string.

The main commonly used option for writing plain text files is compression.

Sample Call:

#write concatenated rows
from pyspark.sql.functions import *

flightData.select(concat(col("DEST_COUNTRY_NAME"), lit(","),
                         col("ORIGIN_COUNTRY_NAME"), lit(", "),
                         col("count")))\
  .write.text("/tmp/text/2010-summary")

Output:

A text file and corresponding log files are created in the folder.

Sample Call:

#write a column
flightData.select(“DEST_COUNTRY_NAME”).distinct().write.text(“/tmp/text/dest_country”)

Output:

Multiple text files and corresponding log files are created in the folder

Table

This writes the specified DataFrame as a table in the Hive metastore. As outlined earlier in this series, the options for writing to tables are saveAsTable and insertInto.

Sample Call:

#write
bucketCount = 10

flightData.write.format("parquet").mode("overwrite")\
  .bucketBy(bucketCount, "count")\
  .sortBy("DEST_COUNTRY_NAME")\
  .saveAsTable("flightDataBucketed")

Output:

Data written to table in Hive default db.
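The other route is insertInto, which writes into a table that already exists and matches columns by position rather than by name. A minimal sketch, assuming a pre-existing table named flight_summary with a compatible schema:

# the target table must already exist; columns are matched by position, not name
flightData.write.mode("append").insertInto("flight_summary")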

Sample Call:

#verify table
spark.read.table("flightDataBucketed").show(10)

Output:

Coming next

DataFrame. This will be a longer post as there is more material to cover.

