Use case: Updating Apache Spark major version of an enterprise data platform

Daniel Freitas
IBM Data Science in Practice
11 min readOct 26, 2022

Imagine a big tech company.

This company has hundreds of thousands of employees. Some employees work in the Finance sector, others work in the Marketing, some others are from HR, and so on. Each of these areas make business decisions on a regular basis. To support such decisions they use their data segregated for their purpose, which works so well, right? But now, let’s add a little bit more complexity to that scenario: consider a director of a certain business unit. He or she wants to know how much revenue for the second quarter of 2022 came from the engineering contracts, grouped by region and industry sector. Ok, now things are getting more interesting. Where should he or she seek for that information? Perhaps different sources of data can lead to different results. That is misleading and dangerous. Maybe, the data to serve that conclusion cannot be aggregated because they are in different standards.

Solution: establish a queryable single source of truth.

Photo by Markus Winkler on Unsplash

Right! But I’m here for the technical part, c’mon!

I hear you! Let’s move to the good part!

The long paragraph above was to present the business context that the platform I will talk about aims to solve.

Let’s call this solution as Enterprise Data Platform (EDP). The EDP stack is:

1. Python 3

2. Apache Spark on Kubernetes 2.4.2

3. Airflow 2.2.1

4. Kubernetes 1.23

5. IBM Cloud Products (Cloud Object Storage, Db2, Cloudant, …)

EDP runs different data pipelines. More than 100 flows for different purposes. From different sources, all the way to a data lake and then to a data warehouse organized in star-schema.

Let me give you some numbers to better visualize the platform size. A daily overview on the resource allocations can be retrieved from the Cloud Monitoring Service, Sysdig:

Daily overview of the EPD’s resource allocation

5 TiB of memory allocation and 2.17K cores of cpu used by the pods in the Kubernetes cluster.

When narrowing to the main data pipeline that runs in the product, it takes:

Sysdig resource allocation metrics

Apache Spark

Apache Spark is a distributed data framework that makes our data pipeline faster. It is the core of our data pipeline engine. Thus, major changes on Apache Spark is critical as it presents a major risk of breaking the entire production operation.

Since EDP’s inception, the official Apache Spark version was a custom image based off of the 2.4.2 version. All the data ingestion pipelines were using Spark 2.4.2 taking advantage of its features.

Why Migrate To Spark 3?

The migration is critical because we have to ensure no existing pipeline will break when switching to the newer version.

The major topics when it comes to the decision about the version upgrade were:

  1. Security
  2. Bug fixes
  3. Community support
  4. New features (Adaptive Query Execution, Dynamic Partition Pruning, etc)

In the next paragraph I will talk about some of the hurdles I had to overcome to succeed on this migration.

Test Plan

The first challenge that came up was to try building the whole EDP system on top of the raw Spark 3 image. A naive approach showed to be a complete disaster, as expected. Attempts to compile the code to make at least one single successful build failed miserably.

EDP uses a Python 3.7-stretch with Debian image as base. On top of it is the SparkBase image, which installs some packages dependencies, such as Java and a proprietary framework.

After all the stack of images builds, it is time to get to the actual tests to verify if any breakage appears.

Unit Tests

The EDP is widely tested with over 522 unit tests and 56% of code coverage at a certain point. Thus, the first hurdle to overcome was the unit tests. All the suit should pass like they did for Spark 2.

The issue encountered relates to the Spark version chosen. The higher release version at the time was 3.2.1, even though the latest was 3.1.3, given the minor patch applied. Those versions were too different from each other.

It happens that, in 3.2.1, the unit test out of nowhere gets stuck and the issue was not 100% reproducible. Other than that, the stuck point was never the same, it could vary, making it really hard to predict. The Spark UI also didn’t bring any insight since the Spark session was up but the next task would never start. The adopted hypothesis was a possible issue with Pytest when running with the Spark version 3.2.1. (*Spoiler*: Initial tests show that spark 3.3.0 does not hang as well).

The solution was to try other Spark distributions to try reproducing the issue. It turns out that it does not. On Spark version 3.1.3 the Pytest does not hang.

A few adjustments made to have the tests passing:

  • Switch the SQL Context into SparkSession
  • Adapt the binary string return on some DataFrame columns
  • Remove sql_context.tableNames() of the tests

The first decision happens here. We have selected the version 3.1.3 to keep going and proceed with the next set of tests.

Integration Tests

Along with the unit tests, EDP also provides a set of integration tests. The integration tests checks every core component of the solution. It does so by creating test cases that isolate each one of those.

The test consists of:

1. Input data

2. Expected data

3. YAML file that describes the pipeline execution plan with the single operation to be tested

For example, a data join between two input sources. After every new change in the core, the test suit runs and verify if the new change introduced a point of failure.

Given that context, it was essential having the integration test passing after the Spark migration. It is also important to mention how useful the Integration Test Suit has proven to be on this migration. Without it, the Spark version upgrade would have been a lot more painful. Imagine running pipelines with real data to illustrate all the different kind of scenarios of data manipulation. It would be roughly unfeasible.

Getting back to the story. It turns out that the integration tests failed at first. They failed because the new version of Spark does not accept applying na.fill operation into columns that are not present in the DataFrame. Moreover, the naFillColumns was mandatory on one of the most used transformations. The only way out was to make the naFillColumns an optional parameter. Along with that, create a wrapper on the na.fill called safe_fill_na. This way, it first verifies if the column exist and then apply the filter.

The change to make integration tests pass was:

  • Create a safe fill.na that only applies to columns that exists

After all 157 tests were green, it was then time to move to the tests on the staging environment of the cluster.

AB Tests

When proceeding to the cluster tests, we should avoid the abrupt switching versions. This way, a comparison between versions could be done. The AB Tests approach was then developed to enable running two different versions of the EDP engine on staging, taking turns. We also didn’t want to test all pipelines and we thought that selecting only some would be biased.

When moving to AB tests on staging, one new issue appeared. This one related to SPARK-31404. The issue occurs when trying to extract a dateType that is too old and was ingested using Spark 2 but now it is being extracted by Spark 3.

The first attempt to solve it was by setting the flag:

spark.sql.legacy.parquet.datetimeRebaseModeInRead: LEGACY

After doing that, many pipelines started to work. Though, it didn’t resolve for all.

Some runs were failing in the ApplySchema step. The ApplySchema is a step where EDP enforces the schema of the DataFrame. This step makes sure that the data will look like as expected in the data warehouse.

We noticed that even though the flag was set, the value was not affecting the ApplySchema operation when transforming the DataFrame into an RDD. Then, we decided to build a custom Spark 3.1.3 image forcing the datetimeRebaseMode to be always Legacy. This is acceptable for our use case. Here is the change reference that we made on top of the v3.1.3 tag.

With that, all the images went through adjustments and rebuild before going back to test.

The pipelines that were failing now begun to pass and we could move on with more testing.

Regression Tests

The test engineers know that a robust unit test suit is not enough. Along with the UT, a well written integration test suit decrease the amount of errors in the code base. But, there are some situations that only pop up when a Regression Test or a Stress Test is executed. There are some edge cases that only appears when pushing the system to the limit.

With that in mind, we built a temporary infrastructure to run those Regression Tests. For that we:

  • Deployed a dedicated Airflow instance
  • Created a Regression Test DAG where we included the most important data pipelines
  • Ran those pipelines using a custom code built on top of our latest Spark 3 version

Those tests were important in many ways. First, we have confirmed that the most important DAGs were covered by this new Spark version.

Second, we encountered another specific issue.

The issue relates to the Dynamic Hive Partitioning feature that we use to load data into the Data Lake in a smart way. It was tricky to reproduce the issue. It shows up after a manual movement of folders in the data lake. For example, when someone moves parquet files across different folders using a cloud storage browser. During the next data ingestion the dynamic partitioning fails because it can’t rename hidden files generated in this movement.

To solve that, we chose to educate users to avoid manual manipulations of files in the data lake. Everything works fine if the user replaces the manual move by the CLI.

Minor Issues Along The Way

Small issues happened along the way and it is worth mentioning them even though they got fixed right away.

AWS and Hadoop JAR

The Hadoop version was also upgraded when moving to the Spark 3 version. We went from the Hadoop 2.7 to the Hadoop 3.3.1, which is also a major update. It happens that, to build the Spark 3.1.3 with Hadoop 3.3.1, the directive of the version has to set explicitly.

./dev/make-distribution.sh — name custom-spark — pip — tgz -Phive -Phive-thriftserver -Dhadoop.version=3.3.1 -Pkubernetes

Spark 3.1.3 with Hadoop 3.3.1 Build command

However, the build doesn’t bring the Hadoop 3 dependencies JAR files that we need on EDP. Due to that, the JAR were also brought to the image. Those are:

  • aws-java-sdk-bundle-1.11.901.jar
  • hadoop-aws-3.3.1.jar
  • wildfly-openssl-1.0.7.Final.jar

Results

When we first came up with the idea of upgrading to Spark 3, we were looking for bug fixes, new features and security improvements. However, we also wanted to verify improvements in two areas:

  1. Runtime performance
  2. Stuck Pods Resolution

For number 1, we were not expecting any improvements on this first phase. We were not enabling performance improvement features, which are the breakthrough changes of the Spark 3. Also, they get enabled by default on Spark 3.2.1 and on. We decided to not turn it on now and first observe the Spark 3 raw in production for some time. Only some pipelines have the Adaptive Query Execution enabled for experimenting.

About number 2, there is an issue that happens on Spark on Kubernetes where Spark is not able to finish itself. So even though the container execution reaches the end, the K8s pod remains in Running state. This is bad because our scheduler, the Airflow, relies on the pod state to take actions. The Spark 3 showed the same issue that Spark 2 already presented.

Pipeline runs results

In development, during a week, there is a total of 733 runs. Because of the AB Tests approach, we could compare between the versions. That was how we identified that no runtime performance degradation or improvement occurred.

After that, in one month, a total of 9,712 runs occurred in the EDP production and all of them were through Spark 3.1.3. The success rate of those is 98,29% and none of the failures were related to the Spark version.

Vulnerabilities

To check the vulnerabilities in the image I have used the Anchore Engine. The number of vulnerabilities in the image has decreased after the upgrade. The image has reached 17% less vulnerabilities. Here is one example of vulnerability that was fixed.

Conclusion

The key takeaways of the upgrade process are:

Test tooling matters

A solid set of test tooling increase the confidence in the process. It is important having different kinds of tests. Unit, integration, smoke, regression, stress and so on. The more the better. This is a truth in software development in general. But, in data engineering, we have to be mindful that we handle data objects and those artifacts also need test. The data cannot get affected by a platform change.

Without a trustworthy data test platform the upgrade would be a pandora box. We would never know what data pipeline would brake if we didn’t test them all.

Stick to the plan

Sometimes it is frustrating and it seems that we have reached the end of the line. Where all possibilities went trough exhaustion, running out of ideas. One example of that was when the flag of datetimeRebaseMode was not reflecting the expected, even though that was the solution suggested by the Apache Spark Community. It is not an easy decision creating a custom image of a solid code base given current version didn’t fit our needs. We could have given up and said that “Our platform does not conform with the requirements”. But, in our minds, this would mean failure. How come only EDP is not able to upgrade? This was not acceptable.

It is important to have clarity on the procedures. In a complex problem you should always try to break it into smaller problems. Change one variable at a time to narrow down where the root cause of the issue may be. Establishing a clear procedure helps avoiding the feeling of “running in circles”.

Next steps

The next steps of this continuous works is the migration to the Spark 3.3.0, which released on 16th June 2022. The highlight of this release is the log4j version migration from 1 to 2 to account for the famous security breach of log4j. Other than that, performance improvements and feature enablements are also a path to follow after this present work.

References:

--

--