David Mudrauskas
Jan 12, 2019

Looping over Spark: an antipattern

I had a recent experience with Spark (specifically PySpark) that showed me what not to do in certain situations, even though it may be tempting or seem like the natural approach. This antipattern arose in a scenario where

  1. I had easily accessible non-Spark data structures,
  2. I had corresponding Spark structures, and
  3. I intended to combine the two.

The bottom line: when working with Spark, represent any collection of data you’ll act on as a Spark structure.

The scenario

Imagine you’re working with various periods of time, where each period is a continuous range of years. Your ranges are already (or can easily be) represented as simple structures:

current_year = {'name': 'current_year',
                'start': 2019,
                'end': 2019}
previous_year = {'name': 'previous_year',
                 'start': 2018,
                 'end': 2018}
last_three_years = {'name': 'last_three_years',
                    'start': 2017,
                    'end': 2019}

You also have larger, granular data easily represented in Spark:

>>> purchases
DataFrame[year: int, item: string, price: decimal(10,10)]

How will you get the particular customer purchases corresponding to each period?

A simple way to do it

If you only need the data for a single period in a given execution, you can simply call this function once:

from pyspark.sql import functions

def get_purchases_for_year_range(purchases, year_range):
    range_name = year_range['name']
    start_year = year_range['start']
    end_year = year_range['end']
    # Keep purchases inside the inclusive year range and tag each row with the range's name
    return purchases \
        .where((purchases.year >= start_year) &
               (purchases.year <= end_year)) \
        .select(functions.lit(range_name).alias('range_name'),
                purchases.item,
                purchases.price)

What not to do

The simple approach becomes the antipattern when you go beyond a one-off use case and start nesting it in a structure like a for loop. This is tempting even if you know that RDDs underlie your Spark entities and that you’ll be switching between them and non-RDD entities, because ostensibly all the pieces you need are already there. To get records for multiple periods of interest this way, you end up with the following, including an aggregator variable whose only job is to facilitate the antipattern:

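from pyspark.sql.types import *

# period_name holds the range's name, so it is a string column
schema = StructType([
    StructField('period_name', StringType()),
    StructField('item', StringType()),
    StructField('price', DecimalType(10, 10))])

# The enclosing function and its name are illustrative; `spark` is the active SparkSession
def get_purchases_for_year_ranges(purchases, year_ranges):
    # Empty DataFrame used as the aggregator for each iteration's result
    periods_and_purchases = spark.createDataFrame([], schema)
    for year_range in year_ranges:
        next_set = get_purchases_for_year_range(purchases, year_range)
        periods_and_purchases = periods_and_purchases.union(next_set)
    return periods_and_purchases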

For all but pretty trivial amounts of data, your application will end up throwing this error:

org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 5136 tasks (1024.0 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)

What went wrong

Spark is lazily evaluated, so in the for loop above each call to get_purchases_for_year_range does not return data. It returns a description of Spark operations to be executed later. The loop chains all of these descriptions together through union, and nothing runs until you later do something with periods_and_purchases that forces evaluation, that is, until a Spark Action gets called. At that point the whole accumulated plan executes at once. As the error message shows, that can mean thousands of tasks whose combined serialized results exceed the driver’s spark.driver.maxResultSize limit.
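To make the laziness concrete, here is a toy model in plain Python. It is only an analogy and not how Spark is implemented: the LazyTable class, the year_between helper, and the sample rows are all made up for illustration. It shows the loop building up a growing plan while no filtering happens until collect is called.

```python
class LazyTable:
    """Toy stand-in for a lazily evaluated table (an analogy, not Spark internals)."""

    def __init__(self, rows=None, parents=(), op=None):
        self._rows = rows        # concrete rows, only set for source tables
        self._parents = parents  # upstream tables this one depends on
        self._op = op            # how to compute rows from the parents' rows

    def where(self, predicate):
        # Record the filter; do not run it yet
        return LazyTable(
            parents=(self,),
            op=lambda inputs: [row for row in inputs[0] if predicate(row)],
        )

    def union(self, other):
        # Record the union; do not run it yet
        return LazyTable(
            parents=(self, other),
            op=lambda inputs: inputs[0] + inputs[1],
        )

    def plan_size(self):
        # Number of nodes in the recorded plan (a shared source counts once per use)
        return 1 + sum(parent.plan_size() for parent in self._parents)

    def collect(self):
        # The "action": only now does any recorded work actually execute
        if self._op is None:
            return list(self._rows)
        return self._op([parent.collect() for parent in self._parents])


predicate_calls = []  # records every row a filter actually inspects


def year_between(start, end):
    def predicate(row):
        predicate_calls.append(row)
        return start <= row[0] <= end
    return predicate


# Hypothetical sample data: (year, item)
purchases = LazyTable(rows=[(2017, 'book'), (2018, 'pen'), (2019, 'lamp')])
year_ranges = [(2019, 2019), (2018, 2018), (2017, 2019)]

result = LazyTable(rows=[])
for start, end in year_ranges:
    result = result.union(purchases.where(year_between(start, end)))

calls_before_collect = len(predicate_calls)  # 0: the loop only built a plan
plan_nodes = result.plan_size()              # 10 nodes after three iterations
rows = result.collect()                      # everything runs here, all at once
calls_after_collect = len(predicate_calls)   # 9: three filters over three rows
```

Every extra iteration adds more nodes to the plan, and all of that deferred work lands on the single call that finally asks for results.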

In a way we’re running into a conflict between two different representations: conventional structured coding with its implicit (or at least implied) execution patterns and independent, distributed, lazily-evaluated Spark representations.

The right way to do it

You’ll want to represent any collection of data you’ll rely on for Spark processing as a Spark structure. The fundamental unit of Spark is tabular data, instantiated as an object within the Spark framework. You want to coerce your input data into that, even if it ends up being as simple as a single column. We can actually define a schema like we did above, just at a different point in the overall flow, to achieve a workable solution:

schema = StructType([
    StructField('name', StringType()),
    StructField('start_year', IntegerType()),
    StructField('end_year', IntegerType())])

# Notice these are structured differently than above to make them compatible with the Spark DataFrame constructor
current_year = ['current_year', 2019, 2019]
previous_year = ['previous_year', 2018, 2018]
last_three_years = ['last_three_years', 2017, 2019]
periods = spark.createDataFrame([current_year, previous_year, last_three_years], schema)

periods.show()
# +----------------+----------+--------+
# |            name|start_year|end_year|
# +----------------+----------+--------+
# |    current_year|      2019|    2019|
# |   previous_year|      2018|    2018|
# |last_three_years|      2017|    2019|
# +----------------+----------+--------+

# The enclosing function and its name are illustrative
def get_purchases_for_periods(purchases, periods):
    # Match each purchase to every period whose inclusive range contains its year
    return purchases \
        .join(periods,
              (purchases.year >= periods.start_year) &
              (purchases.year <= periods.end_year)) \
        .select(periods.name.alias('period_name'),
                purchases.item,
                purchases.price)

We end up with what we originally intended, a list of purchases for each period of interest. We also eliminated a separate nested function and enclosing for loop, in exchange for whatever transformations we needed to perform to structure our periods of interest as a DataFrame.
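The transformation itself can be tiny. As a minimal sketch, assuming the periods start out as the dictionaries from the scenario, a helper like the hypothetical period_to_row below reorders each dictionary into a row matching the (name, start_year, end_year) schema. The final createDataFrame call is left as a comment because it assumes a SparkSession named spark and the schema defined above.

```python
# The period dictionaries from the scenario
current_year = {'name': 'current_year', 'start': 2019, 'end': 2019}
previous_year = {'name': 'previous_year', 'start': 2018, 'end': 2018}
last_three_years = {'name': 'last_three_years', 'start': 2017, 'end': 2019}


def period_to_row(period):
    """Order a period's values to match the (name, start_year, end_year) schema."""
    return [period['name'], period['start'], period['end']]


period_rows = [period_to_row(p)
               for p in (current_year, previous_year, last_three_years)]

# With a SparkSession named `spark` and the schema above, the rows could then be loaded:
# periods = spark.createDataFrame(period_rows, schema)
```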

You might find this unpalatable, especially from an object-oriented perspective. It can feel redundant, or like a violation of consistent, self-contained abstractions, because you end up breaking down some of the delineation between your structures. The benefit, though, is that everything lives in Spark: you can think about it in one way, and it’s compatible with the resource management and other behaviors of the Spark framework. We also introduce a join where we didn’t have one before, which seems unsavory since a join is a quick path to a combinatorial explosion of data. It’s the right move here, though, because, again, getting everything into Spark takes priority. Addressing any explosion should come after that, within the language and concepts of Spark, such as by avoiding multiple instances of join or by partitioning and reading only subsets of the data.
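To see where the extra rows come from, here is a plain-Python sketch of what a range join like this computes. It is not Spark code: the range_join function and the sample purchases are invented for illustration, and price is left out to keep it short. Because the periods overlap, a purchase is emitted once for every period whose range contains its year.

```python
# Periods as (name, start_year, end_year), matching the article's three ranges
periods = [
    ('current_year', 2019, 2019),
    ('previous_year', 2018, 2018),
    ('last_three_years', 2017, 2019),
]

# Hypothetical sample purchases as (year, item)
purchases = [
    (2017, 'book'),
    (2018, 'pen'),
    (2019, 'lamp'),
]


def range_join(purchases, periods):
    """Pair each purchase with every period whose inclusive range contains its year."""
    return [
        (name, item)
        for year, item in purchases
        for name, start, end in periods
        if start <= year <= end
    ]


joined = range_join(purchases, periods)
# Three purchases become five rows because last_three_years overlaps the other periods
```

The output grows with the number of matching (purchase, period) pairs, so the more the periods overlap, the more duplication you should expect from the join.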

Conceivably, we could have gotten around our issue by forcing sequential evaluation with an Action or perhaps with cache, but that seems unnecessary and more complicated than translating everything to the conceptual language of Spark. Being able to think about everything in this one way, with one consistent set of possible operations, strikes me as the correct approach. It reduces cognitive drag, helps avoid errors, and facilitates development.

General thoughts on Spark

I’ve had a good experience with Spark so far in processing considerable amounts of data in the cloud. Its advantage over conventional processing within relational databases is its scaling and parallel processing capacity. I’ve also had a better experience with its ecosystem compared to other big data processing frameworks (e.g., out-of-the-box Hive). For instance, it’s been far easier to construct fixtures, isolate transformations, and take care of other components of automated testing. It’s also worth noting conventional SQL enterprise solutions like SQL Server are now incorporating Spark as a parallel processing engine, but I don’t know what the exact environment for that looks like and whether it would have similar advantages in automating tests, etc.