Data driven enterprise applications with Apache Spark

Nils Müller-Sheffer
Published in NEW IT Engineering
10 min read · Feb 22, 2018

Introduction

This article covers the implementation of a credit risk engine on an Apache Hadoop / Spark stack from a technical perspective. The lessons and takeaways apply to any corporate environment with a large ecosystem of relational data stores whose data needs to be aggregated and turned into metrics.

If you are a technical architect, developer or project lead evaluating Spark SQL for ETL and batch workloads, or you want to migrate traditional batch processing (for example Spring Batch) to Spark jobs, this is for you.

After reading this you should take away:

  • the difficulties and challenges faced when introducing this technology stack in the team or organization for the first time
  • a few hands-on recommendations for implementing Spark-SQL-based business logic components

After some context in “Setting the scene”, we go over five lessons we learned in “Five Fallacies of the early days”, and then summarize six recommendations that are hopefully useful for anyone on a similar journey.

Setting the scene

Our objective in this project was to implement a batch-oriented on-premises application that computes credit risk scores following the so-called “Basel” standards. Financial services institutions, and banks in particular, are subject to industry regulation. Regulators require banks to report credit risk exposure on a monthly basis. In addition, banks are obliged to stress test the portfolio, for example by assuming downgrades in country or corporate ratings or by applying other stresses that result in higher demands on capital allocation. Basel II/III capital adequacy rules for banking institutions and the measurement techniques called “Standardized Approach” and “Advanced Internal Rating Based Approach” (IRB) are some key terms in this context.

What this means at the end of the day: for every loan in the bank’s portfolio (in our case about 4 million loans), credit risk scores are computed again and again, using different approaches and parametrizations. That is why banks implement and maintain sophisticated systems for handling this task, and that is where our team stepped in to help.

Strong functional architecture guidelines

Our journey began with a semi-formal functional specification written by a business consulting team. It translated the Basel legal and regulatory requirements into modular steps and processes. An individual step has input and output parameters and at first glance looks like a method or function. Heaven and bliss, we thought, let’s start programming! But wait, here is the catch. From whatever input parameters are passed into a particular step, the business logic inside the step can still “navigate” to any other point in the data model and even fetch additional data. In other words: by looking at the specified input parameters you cannot infer the data actually consumed by the step.

At the same time, there was strong emphasis on a solution where the technical implementation follows the modularity concepts defined by the functional architecture. The expected benefits were easy traceability, maintainability and extensibility of the application.

So here we were: a team of skilled Java developers with little prior knowledge of Apache Spark. Most of us came from online applications, object-oriented design, or traditional batch processing as exemplified by frameworks like Spring Batch. We were faced with the challenges of:

  • Transitioning from OO design principles to a more functional, data-driven programming style
  • Finding ways to re-interpret the functional spec to balance modularity constraints with implementation and performance constraints
  • Learning a complex big data processing framework and ecosystem (Apache Spark, HDFS, YARN)

Five Fallacies of the early days

[1/5] We don’t need the full Hadoop stack!

Based on some early proof-of-concepts we had concluded that simply using Spark by itself is “good enough”. We knew if we treated Spark as “just software”, it would be much less disruptive for the ops team in the organization. To accomplish that, we wanted to run Spark in local mode on a single “big” machine. We would be tied to this machine per batch execution. We expected one big machine to be sufficient for our purposes, but — what if not? This would severely limit the scalability of the system. Also, we found that a single huge executor usually cannot use the resources efficiently. Another point of debate was limited availability or restartability of the setup: Processing would always have to start from scratch in case of failures.

Spark standalone mode to the rescue? Not quite. Without other services from the Hadoop ecosystem we would not be able to fully achieve our non-functional requirements. For example, we needed HA for the master node, which is taken care of by ZooKeeper / YARN in Hadoop-based setups. We wanted to optimize performance and scalability through the data distribution and data locality provided by HDFS. The flexible resource scheduling capabilities of YARN were very useful when experimenting with different numbers and sizes of nodes in our scaling and performance tests.

We also concluded that assembling open source Hadoop services and securing the cluster ourselves would simply be too time consuming and error prone. Since vendor support for a complex system like a Hadoop cluster was essential, we opted for a commercial Hadoop distribution in the end.

[2/5] Program to interfaces and leave implementation as “pure Java”¹!

The Tao of object-oriented design, as supported by application frameworks like Spring, is: “Use POJO implementations and do not tie your core business logic implementation to execution framework APIs”. We gradually realized that “Spark inside!” is the better slogan when it comes to implementing this type of use case. We started by sending coarse-grained sub-processes into map() functions. However, because individual steps had complex data requirements, this led to a very large aggregate dataset that had to satisfy the data requirements of all steps within the sub-process. This was a dead end.

To overcome this, we decomposed the steps and sub-processes and optimized for fetching less complex datasets and applying smaller and smaller pieces of business logic to them². We also started applying Spark APIs “on the inside” of our business logic. Otherwise the Spark optimizer cannot work its magic, and building for-loops over collections to compute a derived value for each item is just plain silly when you have a powerful functional data processing framework at your disposal.

[3/5] No need for optimized data models when using Spark!

In an enterprise environment, where data comes from warehouses and other relational data stores, you inevitably start out with a highly normalized source data model. In the world of traditional batch processing (e.g. using Spring Batch) you would first try to de-normalize and pre-process the data in order to get a mostly flat layout best suited for batch processing.

After seeing stellar performance with Spark when joining a few tables (Parquet files) on a local machine in our first experiments, we felt that maybe we could just use the data model as-is. This changed once development advanced, we moved to a cluster setup, and joins became more numerous and more complex. Joins became expensive again, up to the point where execution would simply abort with out-of-memory exceptions. It is also much harder to reason about the system and the execution plan when complex joins are involved. Spark needs to start a lot of cluster communication (broadcast, shuffle) and send data to the nodes processing it.

So, it still holds true: de-normalization and redundancy are your friends in batch processing. Just to be clear: we still join datasets, and sometimes these joins are still large and complex. There is nothing wrong with that per se, and in a complex application you cannot avoid it. But we have learned to analyze and optimize this judiciously by closely monitoring the execution plan and the amount of data shuffled in the Spark UI.

[4/5] Data always complies with the design!

When a business analyst describes an algorithm for calculating an attribute, it is assumed that the data strictly complies with the functional data model that was defined beforehand. The reality is: data is messy. There are incomplete records, where mandatory fields, assumed to be not-nullable, are in fact NULL. We even encountered cases where cardinality rules were violated. Where only 1:1 relationships should exist, we found cases with 1:n. Oops. As a consequence, we faced crashing drivers, out-of-memory exceptions and overflowing exception logs that quickly filled up the available disk space.
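
The two anomalies just described (a NULL in a mandatory field, and several rows where the model allows exactly one) are typical of the known patterns a pre-processing pass can flag. The following is a minimal sketch in plain Java, not the project's actual routine: the LoanRow class, its field names and both method names are assumptions made for illustration, and java.util.stream merely stands in for the Spark job that would do this at scale.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/** Pre-processing sketch: flag known error patterns before the main calculation. */
final class DataQualityCheck {

    /** Illustrative stand-in for a source row; the field names are assumptions. */
    static final class LoanRow {
        final String loanId;
        final String ratingCode; // mandatory by design, yet sometimes null in practice

        LoanRow(String loanId, String ratingCode) {
            this.loanId = loanId;
            this.ratingCode = ratingCode;
        }
    }

    /** IDs of rows whose mandatory rating code is missing. */
    static List<String> missingRating(List<LoanRow> rows) {
        return rows.stream()
                .filter(row -> row.ratingCode == null)
                .map(row -> row.loanId)
                .collect(Collectors.toList());
    }

    /** IDs that occur more than once although the model demands one row per loan. */
    static List<String> violatesOneToOne(List<LoanRow> rows) {
        // Count rows per ID; LinkedHashMap keeps first-seen order for stable output.
        Map<String, Long> counts = rows.stream()
                .collect(Collectors.groupingBy(
                        row -> row.loanId, LinkedHashMap::new, Collectors.counting()));
        return counts.entrySet().stream()
                .filter(entry -> entry.getValue() > 1)
                .map(Map.Entry::getKey)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<LoanRow> rows = Arrays.asList(
                new LoanRow("L1", "AA"),
                new LoanRow("L2", null),
                new LoanRow("L3", "B"),
                new LoanRow("L3", "BB"));
        System.out.println("missing rating: " + missingRating(rows));
        System.out.println("1:1 violated:   " + violatesOneToOne(rows));
    }
}
```

Flagged IDs can then be excluded before the calculation starts. Checks like these only catch patterns that someone has already seen and encoded.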

We used a few techniques to overcome this hurdle:

  1. We applied pre-processing routines that flag corrupt records. We then filter out these records before the Spark-based calculation process even starts. The limitation: only known error patterns are caught this way. You must have seen the error pattern before and encoded it into the pre-processing routines.
  2. Filter conditions must be implemented with a no-throw guarantee, which often means you need to decide on a default value.
  3. As a last line of defense against exceptions inside closures, we developed a custom error-handling method with built-in exception handling, which is in turn wrapped in a flatMap call. Otherwise the whole process would terminate.
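
Techniques 2 and 3 can be sketched roughly as follows. This is an illustration under stated assumptions, not the project's code: plain Java streams stand in for Spark's flatMap, the names isMaterial, lastLineOfDefense and parseAll are hypothetical, and the chosen default (treat an unreadable amount as "not material") is only one of several defensible choices.

```java
import java.math.BigDecimal;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Sketch of a no-throw filter and a flatMap-based exception guard. */
final class SafeClosures {

    /** No-throw predicate: null or unparsable input falls back to a default. */
    static boolean isMaterial(String rawAmount) {
        if (rawAmount == null) {
            return false; // default decision for missing values
        }
        try {
            return new BigDecimal(rawAmount.trim()).signum() > 0;
        } catch (NumberFormatException e) {
            return false; // default decision for unparsable values
        }
    }

    /** Wraps a step so that a runtime exception yields zero results, not a crash. */
    static <T, R> Function<T, Stream<R>> lastLineOfDefense(Function<T, R> step) {
        return input -> {
            try {
                return Stream.of(step.apply(input));
            } catch (RuntimeException e) {
                // A real job would count or sample failures here rather than log
                // every one, since overflowing logs were part of the problem.
                return Stream.empty();
            }
        };
    }

    /** Parses what it can; bad records drop out because flatMap flattens them away. */
    static List<BigDecimal> parseAll(List<String> raw) {
        Function<String, Stream<BigDecimal>> safeParse =
                lastLineOfDefense((String s) -> new BigDecimal(s));
        return raw.stream().flatMap(safeParse).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> raw = Arrays.asList("100.50", null, "abc", "42");
        System.out.println(parseAll(raw));
        System.out.println(isMaterial("abc"));
    }
}
```

The design point is that flatMap lets a single input map to zero outputs, so a failing record simply disappears from the result instead of propagating an exception to the driver.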

[5/5] A reference portfolio is great for testing!

We started working on small reference test portfolios (about 0.1% of production volume), but once we had completed the build and the initial functional tests had passed, we were in for an unpleasant surprise. On the full portfolio, execution crashed or was painfully slow for the reasons already described: either joins and the resulting datasets grew too large on the actual data volume and we needed to optimize and refactor from this angle, or data quality was much more varied in the large dataset and many “bad records” gave us headaches.

We quickly adopted an approach where development datasets were at least about 5% of production volume, in order to balance rapid execution and turn-around in development with some initial insights into performance and memory consumption. Additionally, even before testing for functional correctness, we introduced test runs on production-sized datasets to verify that the application could process the volume within somewhat realistic time and memory constraints. Issues with data quality could also be caught early this way. For the latter, it was also crucial to work with data snapshots from different points in time, to expose the application to the greatest possible variety of data quality problems and to be sure we could handle or work around them.

For this we also needed to make sure our developers could run spark-submit from their local workstations against our dev Spark cluster. This makes it possible to run, optimize and re-run the application without going through a release and deployment cycle. This is not a given in an enterprise, and especially not in a financial services environment, where very strict rules around access to environments and data are enforced. We could work around this because the datasets did not contain personal data to begin with, and other restricted data was masked before it was even loaded into our dev/test environments.

Wrap-up and recommendations

To conclude, we have compiled these six actionable recommendations to help you build a well-designed application with Apache Spark.

#1 Prepare your data

Use multiple, simple data models — adapt your data through data virtualization abstraction³ depending on the sequence and needs of your processing logic.

  • Decouple from data sources by exporting the data into Apache Parquet or ORC (HDFS) and de-normalize your datasets by pre-joining your data.
  • Analyze data flows and requirements continuously and transform your existing datasets to fit your calculation needs.
  • Set the value for one attribute / column in exactly one place. Embrace the immutability of datasets by adding values at one central point.
  • Explode nested data into additional columns or simply add copies of the record as additional rows.

#2 Use large datasets

In our experience it is crucial to work with meaningfully sized datasets in the development cycle, in order to catch problems and tune performance.

  • Create a reduced and representative dataset from your production data (about 5% of expected volume)
  • Expose application to actual production volumes before functional tests
  • Work with a proper development cluster from the start
  • Have automated mass data integration tests in place to catch regressions

#3 Use Spark APIs on the inside

Spark can only optimize your computations if you are using its API inside the “business logic”.

  • Technology independence is a myth.
  • Apply functional purity pattern: avoid side-effects and don’t depend on any external mutable state.
  • Verify design ideas and assumptions through prototypes.
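
To make the functional purity bullet concrete, here is a small sketch in plain Java. It is illustrative only: the class and method names are hypothetical, and multiplying an exposure by a risk weight is a placeholder calculation, not the project's scoring logic. The first method reads a mutable static field, the second depends only on its arguments.

```java
import java.math.BigDecimal;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

/** Sketch contrasting a step that depends on mutable state with a pure one. */
final class PureStep {

    // Anti-pattern: shared mutable state. On a cluster every executor JVM has its
    // own copy of a static field, so a value set on the driver is not seen there.
    static BigDecimal currentRiskWeight = BigDecimal.ONE;

    /** Impure: the result silently changes whenever the static field changes. */
    static BigDecimal impureWeightedExposure(BigDecimal exposure) {
        return exposure.multiply(currentRiskWeight);
    }

    /** Pure: the result depends only on the arguments and nothing is modified. */
    static BigDecimal weightedExposure(BigDecimal exposure, BigDecimal riskWeight) {
        return exposure.multiply(riskWeight);
    }

    public static void main(String[] args) {
        BigDecimal riskWeight = new BigDecimal("0.5");
        List<BigDecimal> exposures =
                Arrays.asList(new BigDecimal("1000"), new BigDecimal("250"));
        // The parameter travels with the call instead of living in shared state.
        List<BigDecimal> weighted = exposures.stream()
                .map(exposure -> weightedExposure(exposure, riskWeight))
                .collect(Collectors.toList());
        System.out.println(weighted);
    }
}
```

A pure step like weightedExposure can be shipped to any worker and re-executed after a failure without changing the outcome, which is what a distributed batch engine relies on.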

#4 Truncate the lineage graph, if necessary

Your computation breaks if the DAG gets too complex.

  • Use local and/or reliable checkpointing to truncate the lineage graph
  • Use the DataFrame API, as lambdas inside Datasets cannot be optimized

#5 Invest in formal and informal training

  • Set up “know your data” sessions where a business analyst takes the development team through a dataset and explains its origin, characteristics and potential anomalies.
  • Formalize pair programming and peer review, to encourage sharing of ideas and knowledge in the team.
  • Bring in experts from time-to-time to coach and provide guidance on important design decisions.
  • Include refactoring in each individual sprint.

#6 Work with ops teams very closely

  • A Hadoop setup likely won’t fit into the existing standards. Still, try using your Hadoop distribution’s defaults (e.g. Kerberos), otherwise it can be very painful (non-recoverable states of Ambari, etc.)
  • Bring people together through workshops or offsites, especially when everyone inside the organization is doing this for the first time.
  • Consider cloud based Hadoop / Spark as-a-Service offerings

Co-authors:

Christian Quanz and Christian Ehmke are Technology Architects based in Frankfurt and Düsseldorf. Both are co-authors of this article and key members of the technical team working on this project.

Disclaimer:

My postings reflect my own views and do not necessarily represent the views of my employer.

Footnotes:

[1] Scala API was not used due to guidelines of the client organization and lack of prior knowledge in the developer team.

[2] If you are not familiar with functional-style programming and data processing this might sound strange. But in this paradigm functions are sent “to the data” and processed as close to the data as possible. You can think of a stored procedure as an equivalent concept. Whereas in CRUD-style applications data is selected from the database, sent over the network to the business layer, processed, and results are again returned to the DB layer, in an analytics platform *code* is serialized, shipped over the network to a worker node, where processing takes place locally, on a partition of the overall data. SAP HANA and similar analytics systems work in much the same way.

[3] See https://en.wikipedia.org/wiki/Data_virtualization

Nils Müller-Sheffer
NEW IT Engineering

Innovator and Tech-Lead working in custom software development @AccentureDACH. I help organizations drive their digital transformation. Also find me @_NilsMS