The recipe of instability

Spark UDFs + non pure computation

Yousry Mohamed
Nov 2

Spark is fast, Spark is flexible, Spark is awesome, but sometimes it happens to be exciting as well. When a non-trivial piece of logic is needed in a Spark application, the easiest way to handle it is to write a user defined function, or UDF for short. There is a long-standing view that UDFs are not great for performance because they prevent many optimizations like predicate pushdown, but sometimes they are inevitable.

In this post, I will share some ideas about how UDFs can be tricky, especially when they call non-pure functions.

Setup

Almost everything could be done on a local Spark installation, but sometimes the behaviour on a real cluster differs from a local instance, so I will use an Azure HDInsight Spark cluster. Any size is fine as long as the worker node count is greater than one. The version used in this example is Spark 2.4 on HDI 4.0. I will use Zeppelin, and the notebook will be shared at the end of the post.

Also in Zeppelin, the livy2 interpreter has to be updated to reference an external dependency that will be used later: the config value livy.spark.jars.packages should be set to io.sgr:s2-geometry-library-java:1.0.0

DataFrames and UDFs

Starting with some classic DataFrame and UDF basics, let’s fire up Zeppelin and create a simple DataFrame of 1 million rows with an id column plus a text column containing the padded id value.
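As a sketch of that setup (the column names and the padding width of 10 are my assumptions, not necessarily the notebook’s exact values):

```scala
// Spark 2.4 / Scala, run in a Zeppelin paragraph where `spark` is in scope.
import org.apache.spark.sql.functions.{col, lpad}

// 1M rows: an id column plus a text column holding the id left-padded with zeros.
val df = spark.range(1000000L).toDF("id")
  .withColumn("text", lpad(col("id").cast("string"), 10, "0"))

df.show(3, truncate = false)
```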

Let’s say there is a complex piece of logic required such that the text column has to be transformed into a tuple of 3 values representing the first, second and third characters of the text column. That could be implemented with the substring function in Spark SQL, but for now we will use a UDF. The UDF also references an accumulator that will be used to count how many times the UDF is invoked.
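A minimal sketch of such a UDF, assuming the 1M-row DataFrame with a `text` column described above (all names here are mine):

```scala
import org.apache.spark.sql.functions.{col, udf}

// Accumulator to count how many times the UDF body actually runs.
val callCounter = spark.sparkContext.longAccumulator("udf-calls")

val splitUdf = udf { (text: String) =>
  callCounter.add(1) // side effect, purely for instrumentation
  (text.substring(0, 1), text.substring(1, 2), text.substring(2, 3))
}

// `df` is the 1M-row DataFrame built in the setup step.
val withTuple = df.withColumn("transformed_text", splitUdf(col("text")))
```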

The UDF should work as expected, transforming the text column into a tuple with 3 elements.

It’s expected that if the above transformed column is materialized for the whole DataFrame, the UDF will be called once for every individual row, which is 1M times in our case.

Using the DataFrame’s explain function, the execution plan of the above query shows a single call of the UDF, which is expected, plus Spark being clever and folding the expression that creates the text column into the same computation.

But what happens if we want to extract the individual elements of that UDF-produced tuple and do something with them? Will the UDF still be called once per row?

Wow! 3 million UDF calls, meaning a call is placed for every expression that depends on the UDF output. Note that the number of calls is 3 million, not 4 million, because the transformed_text column itself is not required in the final action result; only the individual expressions pulling out tuple elements are needed to satisfy the query. The execution plan shows this clearly.
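To reproduce that observation, one can pull the tuple fields out as separate columns and force an action, assuming the `withTuple` DataFrame and `callCounter` accumulator sketched earlier (names are mine):

```scala
import org.apache.spark.sql.functions.col

callCounter.reset()

withTuple
  .withColumn("first",  col("transformed_text").getField("_1"))
  .withColumn("second", col("transformed_text").getField("_2"))
  .withColumn("third",  col("transformed_text").getField("_3"))
  .select("first", "second", "third")
  .foreach(_ => ()) // an action that forces every row to be evaluated

// With a deterministic UDF, each of the three dependent expressions triggers
// its own invocation, so this prints roughly 3,000,000 (task retries can add more).
println(callCounter.value)
```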

So why does this behaviour happen? Spark by default treats any UDF as a deterministic function, meaning it can be called many times for the same input with no risk, as the output will be the same. That concept sounds like pure functions, which are identified by two main attributes (copied from Wikipedia):

  1. Its return value is the same for the same arguments
  2. Its evaluation has no side effects

The decision to treat UDFs as deterministic by default most likely relates to how Spark optimizes the query execution plan.

Single call per row

Let’s assume we want the UDF to be called once per row; how do we achieve that? The first idea might be to introduce caching (plus an action after it) directly after the expression involving the UDF. That sounds like a plan, but it’s more of a hack than a proper solution.

The Spark community has already provided a solution to this problem since version 2.3.0: mark the UDF as non-deterministic, so Spark is forced to materialize its outcome and thus calls it once per row.
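The change is a one-liner on the UDF definition; a sketch using the same splitting logic as before:

```scala
import org.apache.spark.sql.functions.udf

// asNondeterministic (Spark 2.3+) tells the optimizer it may not duplicate,
// reorder or re-evaluate this UDF, so its result is materialized once per row.
val stableSplitUdf = udf { (text: String) =>
  (text.substring(0, 1), text.substring(1, 2), text.substring(2, 3))
}.asNondeterministic()
```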

The UDF is now called once for each row, and subsequent expressions continue from that point onward.

Because repeating the UDF for the same inputs could be computationally expensive if the logic inside it is complicated, some folks filed a Spark JIRA ticket to make UDFs non-deterministic by default in a future major version. The simple intuition around UDFs also suggests that non-deterministic should be the default, as this is how users naturally expect their code to run.

Surprisingly, when I tried the same code on a Databricks cluster on Azure, I did not need to mark the UDF non-deterministic to get the single-call-per-row behaviour. It seems this is baked into the Databricks Spark distribution by default for some reason.

What about instability?

Even if we had limitless compute and didn’t mind executing the UDF multiple times per row, there are other factors to consider. In certain edge cases, the code inside the UDF may return different results for the same inputs. That’s a clear indication of another problem, but due to the nature of big data applications it may not be very obvious, and it will definitely cause a lot of confusion.

Deviating a bit from the Spark world, let’s talk about a library used in spatial applications. S2 Geometry is a library from Google for spatial mapping and manipulating geometric shapes. Unfortunately, the Maven dependency for this library is based on a fork that has not been functionally updated since 2011. The library itself is robust when it comes to defining spatial shapes and performing operations like intersections.

Nevertheless, everything has its limits and working conditions. Let’s do a simple experiment using this library from Spark. As mentioned in the setup section, we can reference the library by updating the config values of the livy2 interpreter.

The intersection dilemma

The shapes involved in this experiment are basically a polygon representing a rectangular area near Melbourne and another polygon which is actually a line segment intersecting the rectangle.

Shapes like these can be found in practical applications; it’s not a very common use case, but it’s a good fit for our discussion here. They can be represented in our notebook as follows:

There are a couple of functions: one to create an S2 polygon out of an array of points, and another to intersect two polygons and return the intersection polygon. As I am awful at naming, just remember that shape1 is the rectangle and shape2 is the line segment. We can use S2 to find their respective areas.
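The helpers could look roughly like this. Function names and the exact coordinates below are my own illustrative choices, not the notebook’s values; the library calls come from the io.sgr:s2-geometry-library-java fork referenced in the setup:

```scala
import com.google.common.geometry.{S2LatLng, S2Loop, S2Polygon}
import scala.collection.JavaConverters._

// Build an S2Polygon from (lat, lng) pairs given in degrees.
def makePolygon(latLngDegrees: Seq[(Double, Double)]): S2Polygon = {
  val vertices = latLngDegrees.map { case (lat, lng) =>
    S2LatLng.fromDegrees(lat, lng).toPoint
  }
  new S2Polygon(new S2Loop(vertices.asJava))
}

// Intersect two polygons and return the intersection polygon.
def intersection(a: S2Polygon, b: S2Polygon): S2Polygon = {
  val out = new S2Polygon()
  out.initToIntersection(a, b)
  out
}

// shape1: a small rectangle near Melbourne; shape2: a nearly degenerate
// polygon standing in for a line segment. Coordinates are illustrative only.
val shape1 = makePolygon(Seq((-37.80, 144.90), (-37.80, 145.00),
                             (-37.70, 145.00), (-37.70, 144.90)))
val shape2 = makePolygon(Seq((-37.85, 144.85), (-37.65, 145.05),
                             (-37.8500001, 144.8500001)))

println(shape1.getArea) // unit-sphere (steradian) areas, hence the tiny numbers
println(shape2.getArea)
```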

It’s somewhat strange to see such tiny numbers, but there is an explanation. S2 projects points on Earth onto a unit sphere, so a polygon area from S2 represents the area of the polygon on that unit sphere. To get a human-readable area in, say, square kilometres, it can be calculated as:

(S2 Area) × (Earth Surface Area) / (4π)

In our case the rectangle has an area of around 0.06 square km. The line segment has an area of 5.15007217004638E-4 square meters 😯
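The conversion above is pure arithmetic and easy to sanity-check (Earth’s surface area is roughly 510.072 million km²):

```scala
// Convert an S2 unit-sphere area (steradians) into square kilometres.
val EarthSurfaceAreaKm2 = 5.10072e8

def s2AreaToKm2(s2Area: Double): Double =
  s2Area * EarthSurfaceAreaKm2 / (4 * math.Pi)

// Sanity check: the full unit sphere (4π) maps back to the whole Earth surface.
println(s2AreaToKm2(4 * math.Pi)) // 5.10072E8
```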

Anyway, let’s intersect the two shapes and get the area of the intersection. I will cut to the chase: repeat this operation 10 times and see what happens.

That’s completely wobbly: the S2 intersection randomly returns two different results for the same set of inputs. One of the results is the expected near-zero area, and the other is 12.566…, which is 4π, in other words the area of the entire unit sphere (the whole surface of the Earth once converted). I am not even interested in which one is right and which one is wrong; that could be a philosophical question. My main concern here is the non-deterministic nature of the computation.

How does that work with UDFs?

I think by now you can see what I am trying to highlight: non-deterministic computation will cause confusing results if used from a Spark UDF that is not marked non-deterministic, but let’s see for ourselves. To simulate the case, we need a UDF that intersects two shapes and returns the intersection area plus some other calculation, such as the ratio of the intersection area to the area of one of the shapes.

Next, a 10-row DataFrame is created where each row has two columns containing the points of the same two shapes we have seen before. Then the UDF is called, and two extra columns are created to pick up the individual elements of the tuple it generates.
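A sketch of that wiring, with the point lists encoded as array&lt;array&lt;double&gt;&gt; columns. All names and coordinates here are assumptions, and the UDF is deliberately left deterministic (the default) to reproduce the problem:

```scala
import com.google.common.geometry.{S2LatLng, S2Loop, S2Polygon}
import org.apache.spark.sql.functions.{col, typedLit, udf}
import scala.collection.JavaConverters._

def polygonOf(points: Seq[Seq[Double]]): S2Polygon = {
  val vertices = points.map(p => S2LatLng.fromDegrees(p(0), p(1)).toPoint)
  new S2Polygon(new S2Loop(vertices.asJava))
}

// Returns (intersection area, intersection area / first shape's area).
val intersectUdf = udf { (a: Seq[Seq[Double]], b: Seq[Seq[Double]]) =>
  val pa = polygonOf(a)
  val inter = new S2Polygon()
  inter.initToIntersection(pa, polygonOf(b))
  (inter.getArea, inter.getArea / pa.getArea)
}

// Illustrative coordinates: a rectangle and a nearly degenerate "segment".
val rect    = Seq(Seq(-37.80, 144.90), Seq(-37.80, 145.00),
                  Seq(-37.70, 145.00), Seq(-37.70, 144.90))
val segment = Seq(Seq(-37.85, 144.85), Seq(-37.65, 145.05),
                  Seq(-37.8500001, 144.8500001))

val result = spark.range(10)
  .withColumn("shape1", typedLit(rect))
  .withColumn("shape2", typedLit(segment))
  .withColumn("res",   intersectUdf(col("shape1"), col("shape2")))
  .withColumn("area",  col("res").getField("_1"))
  .withColumn("ratio", col("res").getField("_2"))
  .select("area", "ratio")

result.show(truncate = false)
```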

It’s expected that the area column will fluctuate between that very small number and 12.566. But the intersection ratio column is not even consistent for the same intersection area value. For example, the two red-highlighted rows have the same intersection area but totally different intersection ratios, although their column expressions pull from the same source column. If you remember the first few examples in this post, you will notice the root cause: the UDF is called multiple times for the same input row, and each invocation may produce a different intersection polygon, hence the inconsistency.

Now I think it’s obvious what the compound effect of deterministic Spark UDFs combined with non-pure, unstable computation would be.

If the UDF is marked non-deterministic, the results become much more sensible. There will still be fluctuations across rows, but there will be consistency within each single row. In other words, all the column values of each row will make sense together, as they are drawn from the same UDF invocation.

Wrap up

I am not blaming S2 or Spark for this behaviour; I just want to highlight a special case that helped me better understand how Spark UDFs work behind the scenes. Intersecting a straight line segment with a normal polygon is not a common use case, but it was a tool to make the pattern and its implications very clear.

My recommendation in similar cases is to mark UDFs non-deterministic, but more importantly to work out some method to make the underlying computation pure and stable. In our case, I’d add a prior step that filters out any rows having shapes with areas below a certain threshold, or whatever criteria identify them. Most likely they represent a tiny minority of the dataset, and if worth further investigation, those irregular shapes could be written to a log file. Anyway, like most software problems the answer starts with “It depends”, and the context and business domain will shape the optimal solution. Understanding how things work internally greatly helps in designing and verifying that solution.

Exported Notebook

Analytics Vidhya

Analytics Vidhya is a community of Analytics and Data Science professionals. We are building the next-gen data science ecosystem https://www.analyticsvidhya.com

Yousry Mohamed

Written by

Yousry is a senior developer working for Telstra Purple. He is very passionate about all things data including Big Data, Machine Learning and AI.
