PySpark DataFrame: An Overview

Lackshu Balasubramaniam
18 min read · Aug 23, 2020


I started out this series of articles as prep for a Databricks certification, specifically the Apache Spark 2.4 with Python 3 exam. They have since evolved into general notes for learning Databricks and a reference for writing code.

This post covers the DataFrame, favouring breadth over depth. The rationale is to give a wider perspective of what is possible with DataFrames.

My references were:

Spark The Definitive Guide

Sample Code from Spark The Definitive Guide

Apache Spark 2.4 documentation

Spark In Action

Databricks website

Databricks Engineering blog

Jacek Laskowski’s amazing Spark SQL online book

Various fantastic developer blogs and Stack Overflow; there are too many to call out individually.

DataFrame — Expectation of Knowledge

  • Have a working understanding of every action, such as take(), collect(), and foreach()
  • Have a working understanding of the various transformations and how they work, such as producing a distinct set, filtering data, repartitioning and coalescing, performing joins and unions, and producing aggregates
  • Know how to cache data, specifically to disk, memory, or both
  • Know how to uncache previously cached data
  • Know how to convert a DataFrame to a global or temporary view
  • Know how to apply hints

DataFrames

The DataFrame is the key data structure for working with data in PySpark. DataFrames abstract away RDDs (the underlying building block) and simplify writing code for data transformations. Essentially, DataFrames are table-like data structures (with known schemas) containing a collection of records distributed across executors.

The schema determines the column names and their types in the DataFrame. Records in the distributed table are represented as Rows. DataFrames can be constructed from a multitude of sources such as files, Hive tables, external databases, or RDDs.

DataFrame is implemented in the class pyspark.sql.DataFrame. I’ll walk through the methods of the class by functional areas followed by properties.

Methods

I’ve broken down methods into the following areas based on how I tend to use them:

  • general
  • loop
  • data preparation
  • Hive
  • SQL
  • statistical
  • execution
  • actions

General

alias(alias)

Returns a new copy of the DataFrame with the specified alias as name.

Sample Call:

In the example below I’ve created two aliases and inner joined them over two columns. One of the use cases for alias is self-joins like the one below. To make it simpler, you could create just one alias and self-join it to the existing dataframe.

from pyspark.sql.functions import *

dfguide = "/databricks-datasets/definitive-guide/"
flightData = spark.read.format("parquet")\
    .option("mode", "FAILFAST")\
    .load(dfguide + "/data/flight-data/parquet/2010-summary.parquet")

df_as1 = flightData.alias("df_as1")
df_as2 = flightData.alias("df_as2")

joined_df = df_as1.join(
    df_as2,
    (col("df_as1.DEST_COUNTRY_NAME") == col("df_as2.ORIGIN_COUNTRY_NAME")) &
    (col("df_as2.DEST_COUNTRY_NAME") == col("df_as1.ORIGIN_COUNTRY_NAME")),
    "inner")

joined_df.select(
    "df_as1.ORIGIN_COUNTRY_NAME", "df_as1.DEST_COUNTRY_NAME", "df_as1.count",
    "df_as2.ORIGIN_COUNTRY_NAME", "df_as2.DEST_COUNTRY_NAME", "df_as2.count").show(5)

Output:

colRegex(colName)

Selects column(s) from the DataFrame whose names match the specified regex and returns them as a DataFrame. Useful when the column names to be pulled out adhere to a known pattern.

Parameter : colName represents a regex of column names.

Sample Call:

flightData.select(flightData.colRegex("`^.*COUNTRY.*`")).show()

Output:

printSchema()

Prints out the schema of the DataFrame in tree format. This method is generally used for eyeballing the dataframe schema after the dataframe is loaded from file(s) with the inferred schema option.

Sample Call:

flightData.printSchema()

Output:

Loop

foreach(f)

Applies a function f to all Rows of a DataFrame. This method is a shorthand for df.rdd.foreach() which allows for iterating through Rows.

I typically use this method when I need to iterate through rows in a DataFrame and apply an operation on each row, for example inspecting string values and doing string replacement or formatting.

There are performance implications around this method; it’s best not to use it on a large number of rows.

The example below is simplified for brevity.

Sample Call:

def f(flightdata):
    print(flightdata)

flightData.foreach(f)

Output:

In this instance, print writes each row of the dataframe to the executor logs on the cluster; the rows don’t show up in the command output.

foreachPartition(f)

Applies a function f to each partition of a DataFrame rather than each row. This method is a shorthand for df.rdd.foreachPartition() which allows for iterating through Rows in each partition.
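
A minimal sketch, assuming the flightData DataFrame from earlier (handle_partition is just an illustrative function name): the function receives an iterator over the Rows of one partition, which is handy when you need to set up a per-partition resource such as a database connection once rather than per row.

def handle_partition(rows):
    # rows is an iterator over the Rows in a single partition
    for row in rows:
        print(row)

flightData.foreachPartition(handle_partition)

As with foreach, the output goes to the executor logs rather than the command output.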

Data Preparation

This section covers methods that are useful for data preparation work.

distinct()

Returns a new DataFrame containing only the distinct rows of the DataFrame being referenced. It’s useful for emulating SQL UNION as opposed to UNION ALL: perform a union between two DataFrames and then call distinct().

Sample Call:

The call below returns the total row count, as each row in the DataFrame is unique.

flightData.distinct().count()

Output:

Out[8]: 255

Sample Call:

The sample below returns a dataframe with distinct country of origin values.

flightOrigin = flightData.select("ORIGIN_COUNTRY_NAME").distinct()
flightOrigin.show(5)

Sample Call:

The call below gives a distinct count of country of origin.

flightOrigin.distinct().count()

Output:

Out[18]: 131

toDF(*cols)

Returns a new DataFrame with the specified new column names replacing the existing ones. All column names need to be included, which makes this method useful when renaming every column in the DataFrame.

Sample Call:

flightData.toDF("DEST", "ORIGIN", "COUNT").first()

Output:

Out[2]: Row(DEST='United States', ORIGIN='Romania', COUNT=1)

withColumn(colName, col)

Returns a new DataFrame by adding a column or replacing an existing column that has the same name.

The column expression to derive the new column must be an expression over the current DataFrame.

Sample Call:

If we take between 2 and 3 flights per week (104 to 156 flights per year) to be medium frequency, we could derive the column as below.

from pyspark.sql.functions import expr, col
df = flightData.withColumn("Med-Frequency", expr("count >= 104 and count <= 156"))
df.filter(col("count") >= 156).show(5)

Output:

Sample Call:

The method can also be used for type casting columns.

from pyspark.sql.functions import col
flightData.withColumn("count2", col("count").cast("long")).show(5)

withColumnRenamed(existing, new)

Returns a new DataFrame by renaming an existing column. No changes are made if the schema doesn’t contain the given column.

Sample Call:

The call below renames columns DEST_COUNTRY_NAME and ORIGIN_COUNTRY_NAME.

flightData.withColumnRenamed("DEST_COUNTRY_NAME", "dest").withColumnRenamed("ORIGIN_COUNTRY_NAME", "origin").columns

Output:

Out[56]: ['dest', 'origin', 'count']

drop(*cols)

Returns a new DataFrame with the specified column(s) dropped. No changes are made if the schema doesn’t contain the given column name(s).

The parameter cols for the method represents names of columns to be dropped.

Sample Call:

In the example below the count column is dropped as it’s not needed.

flightData2 = flightData.drop("count")
flightData2.columns

Output:

flightData2:pyspark.sql.dataframe.DataFrame = [DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string]

Out[7]: ['DEST_COUNTRY_NAME', 'ORIGIN_COUNTRY_NAME']

dropDuplicates(subset=None)

Return a new DataFrame with duplicate rows removed, optionally considering only specified columns.

Sample Call:

In the example below the Spark context parallelizes a list of Rows and converts the result to a dataframe.

from pyspark.sql import Row
df = sc.parallelize([ \
    Row(name='Alice', age=5, height=80), \
    Row(name='Alice', age=5, height=80), \
    Row(name='Alice', age=8, height=80), \
    Row(name='Bob', age=10, height=100), \
    Row(name='Bob', age=10, height=100)]).toDF()
df.dropDuplicates(['name', 'height']).show()

Output:

dropna(how='any', thresh=None, subset=None)

Returns a new DataFrame omitting rows with null values. Useful for eliminating rows with null values in the DataFrame especially for a subset of columns i.e. key columns or required columns.

Sample Call:

from pyspark.sql import Row
df = sc.parallelize([ \
    Row(name='Alice', age=5, height=80), \
    Row(name='Alice', age=5, height=None), \
    Row(name='Bob', age=None, height=100), \
    Row(name='Bob', age=10, height=100)]).toDF()
df.dropna().show()

Output:

Sample Call:

df.dropna(how='any', subset=['age']).show()

Output:

filter(condition)

Filters rows using the given condition; where() (the SQL equivalent) is an alias for filter().

Sample Call:

flightData.filter(flightData["count"] < 2).show(5)

Sample Call:

Using col function.

from pyspark.sql.functions import col

flightData.filter(col("count") < 2).show(5)

Sample Call:

Conjunctive filter.

flightData.filter("count < 2 AND ORIGIN_COUNTRY_NAME != 'United States'").show(5)

fillna(value, subset=None)

Replace null values with a fixed value.

Sample Call:

from pyspark.sql import Row
df = sc.parallelize([ \
    Row(name='Alice', age=5, height=80), \
    Row(name='Alice', age=5, height=None), \
    Row(name='Bob', age=None, height=100), \
    Row(name=None, age=10, height=100)]).toDF()
df.fillna(100).show()

Output:

All null values in the numeric (int) columns are replaced with 100; the null name value in the string column is left unchanged.

Sample Call:

df.fillna({'age': 50, 'name': 'John Doe'}).show()

Output:

replace(to_replace, value=<no value>, subset=None)

Returns a new DataFrame replacing matching values with replacement values. Values to_replace and value must be of the same type and can only be numeric, boolean, or string; value can be None.

When replacing values, the new value will be cast to the type of the existing column. For numeric replacements all values to be replaced should have unique floating point representation. In case of conflicts for example with to_replace param being the dictionary {42: -1, 42.0: 1} , an arbitrary replacement will be used.

Sample Call:

from pyspark.sql import Row
df = sc.parallelize([ \
    Row(name='Alice', age=10, height=80), \
    Row(name='Jane', age=8, height=None), \
    Row(name='Bob', age=20, height=180), \
    Row(name='John', age=11, height=90)]).toDF()
df.replace(10, 11).show()

Output:

Sample Call:

Replacement with None

df.replace('John', None).show()

Sample Call:

List based replacement.

df.replace(['John', 'Jane'], ['Jean', 'Jeanne'], 'name').show()

Output:

sort(*cols, **kwargs)

Returns a new DataFrame sorted by the specified column(s) in ascending or descending order by each column.

Sample Call:

from pyspark.sql.functions import desc

flightData.sort(desc("count")).show(5)

Sample Call:

Alternate option for specifying column.

flightData.sort(flightData["count"]).show(5)

Sample Call:

Using ascending param.

flightData.sort("count", ascending=False).show(5)

sortWithinPartitions(*cols, **kwargs)

Returns a new DataFrame with each partition sorted by the specified column(s).
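
A minimal sketch, assuming the flightData DataFrame from earlier: each partition is sorted by count descending without triggering a full shuffle across partitions (the repartition to 4 is arbitrary, just so there is more than one partition to sort within).

from pyspark.sql.functions import desc

flightData.repartition(4).sortWithinPartitions(desc("count")).show(5)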

randomSplit(weights, seed=None)

Randomly splits a DataFrame with provided weights. This method is typically used for statistical or machine learning purposes.

Sample Call:

dataFrames = flightData.randomSplit([0.25,0.75], 10)
dataFrames[0].count()

Output:

Out[137]: 62

Sample Call:

dataFrames[1].count()

Output:

Out[136]: 193

sample(withReplacement=None, fraction=None, seed=None)

Returns a sampled subset of a DataFrame. This method is typically used for statistical or machine learning purposes.

Note:

  • This is not guaranteed to provide exactly the fraction specified of the total count of the given DataFrame.
  • fraction is required; withReplacement and seed are optional. When withReplacement is True, the same row can appear more than once in the sample; when False, each row is sampled at most once.

Sample Call:

seed = 5
withReplacement = False
fraction = 0.5
display(flightData.sample(withReplacement, fraction, seed))

sampleBy(col, fractions, seed=None)

Returns a new DataFrame that represents a stratified sample without replacement based on the fraction given on each stratum.

Sample Call:

Create the fractions for the count column as an example. The values generated below are somewhat arbitrary; in practice it could be an ordered list of values of interest with a corresponding fraction for each value.

from pyspark.sql.functions import lit

fractions = flightData.select("count")\
    .distinct().limit(10)\
    .withColumn("fraction", lit(0.1)).rdd.collectAsMap()
print(fractions)

Output:

{29: 0.1, 26: 0.1, 65: 0.1, 8305: 0.1, 293: 0.1, 442: 0.1, 243: 0.1, 54: 0.1, 19: 0.1, 113: 0.1}

Pull out the required data by using sampleBy

seed = 20
display(flightData.sampleBy(col="count", fractions=fractions, seed=seed))

Actions

Actions are operations that trigger Spark to compute results at the time they are executed. They are used to materialize the DataFrame.

collect()

Returns all the records as a list of Row.

Sample Call:

flightData.limit(5).collect()

count()

Returns the number of rows in the DataFrame.

Sample Call:

flightData.count()

first()

Returns the first row as a Row.

flightData.first()

take(num)

Returns the first num rows as a list of Rows.

flightData.take(10)

tail(num)

Returns the last num rows as a list of Rows. Running tail moves data into the application’s driver process, so it should only be run on small result sets.

flightData.tail(10)

Create Tables/Views

createGlobalTempView(name)

Creates a global temporary view from the given DataFrame, which can then be referenced in Spark SQL queries.

The lifetime of this temporary view is tied to the Spark application, and the view is registered under the global_temp database. The call throws a TempTableAlreadyExistsException if the view name already exists in the catalog.
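
A minimal sketch (the FlightDataGlobal name is just for illustration): queries need to qualify the view name with the global_temp database.

flightData.createGlobalTempView("FlightDataGlobal")
spark.sql("SELECT COUNT(*) FROM global_temp.FlightDataGlobal").show()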

createOrReplaceGlobalTempView(name)

Creates or replaces a global temporary view using the given name. The lifetime of this temporary view is tied to this Spark application.

createTempView(name)

Creates a local temporary view with the given DataFrame.

The lifetime of this temporary view is tied to the SparkSession that was used to create this DataFrame. The call throws a TempTableAlreadyExistsException if the view name already exists in the catalog.

createOrReplaceTempView(name)

Creates or replaces a local temporary view with the given DataFrame. I tend to prefer using this call.

The lifetime of this temporary table is tied to the SparkSession that was used to create this DataFrame.

Sample call:

Create a temporary view.

flightData.createOrReplaceTempView("FlightData")

Query the temporary view

%sql SELECT * FROM FlightData WHERE DEST_COUNTRY_NAME = 'Australia' OR ORIGIN_COUNTRY_NAME = 'Australia'

registerTempTable(name)

Registers this DataFrame as a temporary table using the given name. This method is deprecated; use createOrReplaceTempView() instead.

The lifetime of this temporary table is tied to the SparkSession that was used to create this DataFrame.

SQL Operations

select(*cols)

Projects a set of column expressions and returns a new DataFrame based on them.

Sample Call:

flightData.select("*").show(2)

equivalent to

%sql SELECT * FROM FlightData LIMIT 2

Sample Call:

flightData.select("DEST_COUNTRY_NAME", "ORIGIN_COUNTRY_NAME").show(2)

equivalent to

%sql SELECT DEST_COUNTRY_NAME, ORIGIN_COUNTRY_NAME FROM FlightData LIMIT 2

Using expression in select call, equivalent to selectExpr:

from pyspark.sql.functions import expr
#rename column
flightData.select(expr("DEST_COUNTRY_NAME as dest")).show(2)
#equivalent to
flightData.select(expr("DEST_COUNTRY_NAME").alias("dest")).show(2)

selectExpr(*expr)

Applies a set of SQL expressions and returns a new DataFrame.

Sample Call:

Using selectExpr to rename columns.

flightData.selectExpr("DEST_COUNTRY_NAME as dest", "ORIGIN_COUNTRY_NAME as origin", "count").show(2)

equivalent to

%sql SELECT DEST_COUNTRY_NAME dest, ORIGIN_COUNTRY_NAME origin, count FROM FlightData LIMIT 2

Applying operations across columns to derive columns

flightData.selectExpr(
    "*",  # all columns
    "(DEST_COUNTRY_NAME = ORIGIN_COUNTRY_NAME) AS withinCountry")\
    .show(2)

equivalent to

%sql SELECT *, (DEST_COUNTRY_NAME = ORIGIN_COUNTRY_NAME) as withinCountry 
FROM FlightData
LIMIT 2

Applying aggregations to derive columns

#aggregation
flightData.selectExpr("avg(count) as AvgCount", "count(distinct(DEST_COUNTRY_NAME)) as DestCount").show()

Output:

+-----------------+---------+
|         AvgCount|DestCount|
+-----------------+---------+
|1655.956862745098|      125|
+-----------------+---------+

equivalent to

%sql SELECT avg(count) as AvgCount, count(distinct(DEST_COUNTRY_NAME)) as DestCount FROM FlightData

agg(*exprs)

Aggregate on the entire DataFrame without groupings.

flightData.agg({"count": "sum"}).collect()

Alternate syntax for the operation.

from pyspark.sql.functions import *

flightData.agg(sum(flightData["count"])).collect()

crossJoin(other)

Returns the cartesian product of a join with another DataFrame.

Sample Call:

from pyspark.sql import Row
df = sc.parallelize([ \
    Row(name='Alice', age=5), \
    Row(name='Bob', age=8)]).toDF()
df2 = sc.parallelize([ \
    Row(height=80), \
    Row(height=100)]).toDF()
df.crossJoin(df2).select("name", "age", "height").collect()

Output:

Out[50]: [Row(name='Alice', age=5, height=80), Row(name='Alice', age=5, height=100), Row(name='Bob', age=8, height=80), Row(name='Bob', age=8, height=100)]

join(other, on=None, how=None)

Joins with another DataFrame, using the given join expression.

Sample Call:

flightData2010 = flightData.withColumnRenamed("count", "2010count")
flightData2010.join(
        flightData2011,
        (flightData2010["DEST_COUNTRY_NAME"] == flightData2011["DEST_COUNTRY_NAME"]) &
        (flightData2010["ORIGIN_COUNTRY_NAME"] == flightData2011["ORIGIN_COUNTRY_NAME"]),
        "inner")\
    .drop(flightData2011["DEST_COUNTRY_NAME"])\
    .drop(flightData2011["ORIGIN_COUNTRY_NAME"])\
    .withColumnRenamed("count", "2011count")\
    .show(10)

The join above could also be expressed in the simpler notation below, which has the added benefit of eliminating the repeated join columns:

flightData2010.join(flightData2011, ["ORIGIN_COUNTRY_NAME", "DEST_COUNTRY_NAME"], "inner")

Output:

intersect(other)

Return a new DataFrame containing rows only in both the current DataFrame and another DataFrame that’s supplied.

This is equivalent to INTERSECT in SQL.

Sample Call:

dfguide = "/databricks-datasets/definitive-guide/"
flightData = spark.read.format("parquet")\
    .option("mode", "FAILFAST")\
    .load(dfguide + "/data/flight-data/parquet/2010-summary.parquet")

lspark = "/databricks-datasets/learning-spark-v2/"
flightData2011 = spark.read.format("csv")\
    .option("header", "true")\
    .option("mode", "FAILFAST")\
    .option("inferSchema", "true")\
    .load(lspark + "flights/summary-data/csv/2011-summary.csv")

flightData.intersect(flightData2011).sort("DEST_COUNTRY_NAME", "ORIGIN_COUNTRY_NAME").collect()

intersectAll(other)

Return a new DataFrame containing rows in both the current dataframe and another dataframe while preserving duplicates.

Sample Call:

flightData.intersectAll(flightData2011).sort("DEST_COUNTRY_NAME", "ORIGIN_COUNTRY_NAME").collect()

subtract(other)

Return a new DataFrame containing rows in current DataFrame but not in another DataFrame.

This is equivalent to EXCEPT DISTINCT in SQL.

flightData.subtract(flightData2011).sort("DEST_COUNTRY_NAME", "ORIGIN_COUNTRY_NAME").show(20)

exceptAll(other)

Return a new DataFrame containing rows in current DataFrame but not in another DataFrame while preserving duplicates in the result.

This is equivalent to EXCEPT ALL in SQL.
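
A minimal sketch mirroring the subtract() call above, assuming the flightData and flightData2011 DataFrames loaded in the intersect() example:

flightData.exceptAll(flightData2011).sort("DEST_COUNTRY_NAME", "ORIGIN_COUNTRY_NAME").show(20)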

union(other)

Return a new DataFrame containing union of rows in current and another DataFrame.

This is equivalent to UNION ALL in SQL. To do a SQL-style union that dedupes rows, use union() followed by distinct(). As is standard in SQL, this function resolves columns by position, not by name.

Sample Call:

flightData.union(flightData2011).count()

Output:

Out[58]: 510

unionByName(other)

Returns a new DataFrame containing union of rows in current and another DataFrame similar to previous function.

The difference between this function and union() is that this function resolves columns by name, not by position.
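
A minimal sketch, assuming the same two DataFrames: reordering the columns of flightData2011 before the union shows that the rows are still lined up correctly by column name.

flightData.unionByName(
    flightData2011.select("count", "ORIGIN_COUNTRY_NAME", "DEST_COUNTRY_NAME")).count()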

where(condition)

Filters rows using the given condition. This function is an alias for filter()

Sample Call:

flightData.where("count < 2").show(5)

Sample Call:

flightData.where(col("count") < 2).where(col("ORIGIN_COUNTRY_NAME") != "United States").show(2)

SQL equivalent would be:

%sql SELECT * FROM FlightData WHERE count < 2 AND ORIGIN_COUNTRY_NAME != 'United States' LIMIT 2

groupBy(*cols)

Groups the DataFrame using the specified column(s), so we can run aggregation on them. The parameter is a list of columns to group by.

Sample Call:

display(flightData.groupBy("ORIGIN_COUNTRY_NAME").avg("count").orderBy("ORIGIN_COUNTRY_NAME"))

Sample Call:

display(flightData.groupBy("ORIGIN_COUNTRY_NAME").agg({'count': 'avg'}).collect())

orderBy(*cols, **kwargs)

Returns a new DataFrame sorted by the specified column(s).

Sample Call:

from pyspark.sql.functions import desc

flightData.where("ORIGIN_COUNTRY_NAME != 'Australia'").orderBy(desc("count"), "DEST_COUNTRY_NAME").show(5)

Sample Call:

from pyspark.sql.functions import col, desc, asc
flightData.orderBy(col("ORIGIN_COUNTRY_NAME").desc(), col("DEST_COUNTRY_NAME").asc()).show(5)

limit(num)

Limits the result set count to the number specified.

Sample Call:

from pyspark.sql.functions import col, desc, asc
flightData.orderBy(col("ORIGIN_COUNTRY_NAME").desc(), col("DEST_COUNTRY_NAME").asc()).limit(5).collect()

SQL Equivalent:

%sql SELECT * FROM FlightData ORDER BY ORIGIN_COUNTRY_NAME desc, DEST_COUNTRY_NAME asc LIMIT 5

cube(*cols)

Create a multi-dimensional cube for the current DataFrame using the specified columns to run aggregations across dimensions.

Sample Call:

flightData.cube("ORIGIN_COUNTRY_NAME").sum("count").orderBy(desc("sum(count)")).show()

Output:

In the output, the first row, with null for origin country, is the total across all origin countries.

rollup(*cols)

Create a multi-dimensional rollup for the current DataFrame using the specified columns, to run aggregations across dimensions. Rollup is a subset of cube in terms of summary numbers that are generated.

Sample Call:

flightData.rollup("ORIGIN_COUNTRY_NAME", "DEST_COUNTRY_NAME").sum("count").orderBy("ORIGIN_COUNTRY_NAME", "DEST_COUNTRY_NAME").show()

Statistical Operations

For more details on statistical and mathematical operations, please refer to this article on the Databricks blog, which explains them well. I’ll cover them at a high level here and intend to go deeper in a later article.

approxQuantile(col, probabilities, relativeError)

Calculates the approximate quantiles of numerical columns of a DataFrame.

Sample Call:

from pyspark.sql.types import *
from pyspark.sql.functions import *
retailSchema = StructType([
    StructField("InvoiceNo", IntegerType(), True),
    StructField("StockCode", StringType(), True),
    StructField("Description", StringType(), True),
    StructField("Quantity", IntegerType(), True),
    StructField("InvoiceDate", StringType(), True),
    StructField("UnitPrice", DoubleType(), True),
    StructField("CustomerID", DoubleType(), True),
    StructField("Country", StringType(), True),
])

retailData = spark.read.format("csv")\
    .option("header", "true")\
    .option("mode", "FAILFAST")\
    .schema(retailSchema)\
    .load("/databricks-datasets/definitive-guide/data/retail-data/by-day/2010-12-01.csv")

retailData.approxQuantile("UnitPrice", [0.25, 0.5, 0.75], 0.1)

corr(col1, col2, method=None)

Calculates the correlation of two columns of a DataFrame. The value returned is a double. Currently only supports the Pearson Correlation Coefficient; thus the param method would be “pearson”

Sample Call:

retailData.corr("Quantity", "UnitPrice")

cov(col1, col2)

Calculates the sample covariance for two columns of a DataFrame. The value returned is a double.

Sample Call:

retailData.cov("Quantity", "UnitPrice")

crosstab(col1, col2)

Computes a pair-wise frequency table of the given columns. It can generate a very wide table, so it’s a good idea to first filter down to the range of values of interest for one or both columns.

Sample Call:

display(retailData.crosstab("StockCode", "CustomerId"))

describe(*cols)

Computes basic statistics for numeric and string columns. If no columns are given, the function computes statistics for all numerical or string columns.

Sample Calls:

retailData.describe().show()

Output:

Sample Calls:

retailData.describe(["Quantity", "UnitPrice"]).show()

freqItems(cols, support=None)

Finds frequent items for the given columns, possibly with false positives.

Sample Calls:

Multiple Columns

display(retailData.freqItems(["StockCode", "CustomerID"], 0.5))

Single Column

display(retailData.freqItems(["UnitPrice"], 0.25))

Call me crazy, but I much prefer running a query like the one below. We could also include the total row count to get a percentage if needed, and we have more room to customize as required.

%sql select StockCode, count(InvoiceNo) from RetailData group by StockCode order by count(InvoiceNo) DESC

summary(*statistics)

Computes specified statistics for numeric and string columns of a dataframe. Available statistics are:

  • count
  • mean
  • stddev
  • min
  • max
  • arbitrary approximate percentiles specified as a percentage, 75% for example.

If no statistics are given, this function computes count, mean, stddev, min, approximate quartiles (percentiles at 25%, 50%, and 75%), and max.

Sample call:

retailData.summary().show()

Output:

Sample call:

retailData.select("StockCode", "CustomerID").summary("count").show()

Execution

cache()

Caches the DataFrame with the default storage level (MEMORY_AND_DISK) for quicker access. It’s useful when the dataframe is referenced multiple times across operations.

Sample Call:

flightData.cache()

checkpoint(eager=True)

Returns a checkpointed version of the DataFrame. Checkpointing can be used to truncate the logical plan of this DataFrame, which is especially useful in iterative algorithms where the plan may grow exponentially. You can then reference the DataFrame at the point it was checkpointed. This also avoids reapplying the operations to derive the required DataFrame.

The data will be saved to files inside the checkpoint directory set with SparkContext.setCheckpointDir(). Please note that there are some performance implications to this.

Parameter:

eager — Boolean, default True. Whether to checkpoint the DataFrame immediately
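
A minimal sketch, assuming a checkpoint directory has been configured; the /tmp/checkpoints path below is purely illustrative.

# the checkpoint directory is a hypothetical example path
spark.sparkContext.setCheckpointDir("/tmp/checkpoints")
flightDataCheckpointed = flightData.checkpoint(eager=True)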

coalesce(numPartitions)

Returns a new DataFrame that has numPartitions partitions. This operation results in a narrow dependency, e.g. if you go from 1000 partitions to 100 partitions, there will not be a shuffle. Each of the 100 new partitions will combine 10 of the current partitions. If a larger number of partitions is requested, it will stay at the current number of partitions.

However, if you’re doing a drastic coalesce, e.g. to numPartitions = 1, the computation may take place on fewer nodes than you’d like (a single node in the case of numPartitions = 1). I typically use numPartitions = 1 when I’m required to generate a single output file for publishing externally.

To avoid under-utilizing the cluster, you can call repartition() instead. This will add a shuffle step, but the current upstream partitions will be executed in parallel.

Sample Call:

flightData.coalesce(1).rdd.getNumPartitions()

localCheckpoint(eager=True)

Returns a locally checkpointed version of the DataFrame. Local checkpoints are stored in the executors using the caching subsystem and therefore not reliable. Refer to checkpoint section above for more details.

Parameter:

eager — Boolean, default True. Whether to checkpoint the DataFrame immediately

explain(extended=False)

Prints the (logical and physical) plans to the console for debugging purpose.

Parameter:

extended — Boolean, default False. If False, prints only the physical plan.

Sample Call:

flightData.explain()

Output:

== Physical Plan == *(1) FileScan parquet [DEST_COUNTRY_NAME#290,ORIGIN_COUNTRY_NAME#291,count#292L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[dbfs:/databricks-datasets/definitive-guide/data/flight-data/parquet/2010-summar…, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,ORIGIN_COUNTRY_NAME:string,count:bigint>

hint(name, *parameters)

Specifies some hint on the current DataFrame. Jacek Laskowski explains this well in the hint section of his online book.

Parameters:

name — the name of the hint, for example "broadcast". parameters — optional parameters for the hint.

Sample Call:

The example below isn’t a realistic use of the broadcast hint, since both dataframes are of similar size and would be partitioned in the same way; it’s included purely to demonstrate the syntax.

flightData.join(flightData2011.hint("broadcast"), ["ORIGIN_COUNTRY_NAME","DEST_COUNTRY_NAME"]).show()

isLocal()

Returns True if the collect() and take() methods can be run locally (without any Spark executors).

Sample Call:

flightData.isLocal()

Output:

Out[7]: False

persist(storageLevel=StorageLevel(True, True, False, False, 1))

Sets the storage level to persist the contents of the DataFrame after it is first computed. You can assign a new storage level only if the DataFrame does not have a storage level set yet. The default storage level is MEMORY_AND_DISK. There are other options such as MEMORY_ONLY, DISK_ONLY and others. The options can be found in the documentation.

unpersist(blocking=False)

Marks the DataFrame as non-persistent, and removes all blocks for it from memory and disk. You would call this method after caching the DataFrame when you need to ‘uncache’ it.
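
A minimal sketch covering both calls, assuming the flightData DataFrame from earlier: persist with an explicit storage level, trigger an action to materialize the cache, then unpersist to free the cached blocks.

from pyspark import StorageLevel

flightData.persist(StorageLevel.MEMORY_ONLY)
flightData.count()   # an action materializes the cache
flightData.unpersist()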

repartition(numPartitions, *cols)

Returns a new DataFrame partitioned by the given partitioning expressions.

The resulting DataFrame is hash partitioned. Hash Partitioning attempts to spread the data evenly across partitions based on partitioning key.

Sample Call:

flightData2 = flightData.repartition("ORIGIN_COUNTRY_NAME")
flightData2.rdd.getNumPartitions()

Output:

Out[9]: 200

Sample Call:

flightData2.repartition(10, "ORIGIN_COUNTRY_NAME")

repartitionByRange(numPartitions, *cols)

Returns a new DataFrame partitioned by the given partitioning expressions. The resulting DataFrame is range partitioned.

Some dataframes have keys that follow a particular order e.g. CustomerId. Range partitioning is an efficient partitioning technique for these cases.
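
A minimal sketch: range partition the flight data into 8 partitions by the count column (8 is an arbitrary number chosen for illustration).

flightData.repartitionByRange(8, "count").rdd.getNumPartitions()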

Conversion

toJSON(use_unicode=True)

Converts a DataFrame into an RDD of strings. Each row is turned into a JSON document as one element in the returned RDD.

Sample Call:

flightData.toJSON().first()

Output:

Out[16]: '{"DEST_COUNTRY_NAME":"United States","ORIGIN_COUNTRY_NAME":"Romania","count":1}'

toLocalIterator()

Returns an iterator that contains all the rows in this DataFrame. The iterator will consume as much memory as the largest partition in this DataFrame.

Sample Call:

The example below creates a list out of the iterator. The typical use case is to loop through the iterator.

listFlightData = list(flightData.toLocalIterator())

Output:

First 5 rows shown below

Out[23]: [Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=1), Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Ireland', count=264), Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='India', count=69), Row(DEST_COUNTRY_NAME='Egypt', ORIGIN_COUNTRY_NAME='United States', count=24), Row(DEST_COUNTRY_NAME='Equatorial Guinea', ORIGIN_COUNTRY_NAME='United States', count=1),

toPandas()

Returns the contents of the DataFrame as a pandas DataFrame. This is only available if pandas is installed and available.

Note: this method should only be used if the resulting pandas DataFrame is small, as all the data is loaded into the driver’s memory.

Sample Call:

pdFlightData = flightData.toPandas()

Properties

columns

Returns all column names as a list.

Sample Call:

flightData.columns

dtypes

Returns all column names and their data types as a list.

Sample Call:

flightData.dtypes

na

Returns a DataFrameNaFunctions for handling missing values.
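
A minimal sketch, assuming the flightData DataFrame from earlier: df.na exposes the same functionality as the dropna() and fillna() methods covered above.

flightData.na.drop(subset=["count"]).count()
flightData.na.fill({"count": 0}).show(5)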

rdd

Returns the content of the DataFrame as an RDD of Rows.
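
A minimal sketch: dropping down to the underlying RDD gives access to the RDD API, for example mapping each Row to a single column value.

flightData.rdd.map(lambda row: row["DEST_COUNTRY_NAME"]).take(3)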

schema

Returns the schema of the DataFrame as a StructType.

Sample Call:

flightData.schema

stat

Returns a DataFrameStatFunctions for statistic functions.
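
A minimal sketch, assuming the retailData DataFrame from the statistical section; this is equivalent to calling corr() directly on the DataFrame.

retailData.stat.corr("Quantity", "UnitPrice")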

storageLevel

Get the DataFrame’s current storage level.

Sample Call:

flightData.storageLevel

After caching.

flightData.cache().storageLevel

write

Interface for saving the DataFrame into storage. More on this in the DataFrameWriter article.

Returns: DataFrameWriter
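
A minimal sketch, assuming a writable location; the /tmp/flight-data-parquet path below is purely illustrative.

flightData.write.format("parquet")\
    .mode("overwrite")\
    .save("/tmp/flight-data-parquet")  # hypothetical output path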

Summary

This article walked through the methods and properties available on a DataFrame. I divided the methods by functionality and how I tend to think of them. I hope it was useful. I’ll revisit this article and fine-tune the content as I go.

The next article will cover Row and Column.


Lackshu Balasubramaniam

I’m a data engineering bloke who’s into books. I primarily work on Azure and Databricks. My reading interest is mostly around psychology and economics.