Effective data enrichment in Spark jobs from external API

Henadz Varantsou
Flo Health UK
Published in
13 min readNov 18, 2022

Usually, when you work with Spark, data processing flow is quite straightforward. Dataframes are processed by using straight transformations like .map(), .filter(), or .aggregate(). They also could be smashed together by using complex transformations like .join() or .union(). But sometimes there are unusual cases where in the middle of the data pipeline you need to get an additional portion of data or send some data from a dataframe somewhere to be processed or stored. Here are some examples of these cases:

  • Enriching a dataframe with some data gathered from external API
  • Reading/writing data from some database that doesn’t have a usual Spark connector
  • Creating files as progress markers in some object storage (e.g., AWS S3)
  • Sending a command event to a message broker

All of these operations share one thing in common: from a data processing perspective, they are all synchronous and blocking because you cannot proceed to the next step until all data manipulations are done on the current step. So the first challenge is how to organize such operations properly in Spark code, from both clean code and performance perspectives. The second challenge is that Spark has its own control system of parallel data processing, which takes into account all resources available for the job and tries to utilize them in the most efficient way. But if we want to exchange data with some other external system, we need to consider the limits of this system (e.g., connections, input/output data size, rate). Then, in order to reach the best performance, we need to construct communication in such a way that we don’t exceed the limits but are quite close to them. In this article, I would like to share with you an example of such communication between a Spark job and an external API, focusing on pitfalls you might encounter and how to avoid them.

Example description

In the following image, you can see the high-level description of the system used as an example in this article:

The main entities here are the weather processing job (WPJ) and the rain forecaster service (RFS). The weather processing job, as the name suggests, processes input weather data stored in CSV format (downloaded from here). The rain forecaster service exposes the endpoint, which takes temperature, humidity, and pressure from the request body and sends back the probability of rain. So the task of WPJ is to get temperature, humidity, and pressure from each row, send it to RFS, get the probability of rain in return, and insert it in the original dataframe as a new column. It’s as simple as that. Also, you can see the Prometheus service on the diagram, which collects useful metrics from RFS like received_requests and currently_processing_requests, which will help us to track the RFS request processing workload.

As for deployment setup, RFS and Prometheus run in a Docker container. Whereas for WPJ, the whole standalone Spark cluster is provided, which also runs in Docker containers. I decided to set up the whole Spark cluster here because we will need multiple executors (at least two) for WPJ to demonstrate how it will influence the parallelism of communication with RFS.

In order to implement HTTP communication and concurrent request processing in WPJ and RFS, I decided to use the Cats Effect library because I have more experience with it. You are free to choose any alternative (e.g., built-in Scala Futures, ZIO, or anything else you prefer). Here is the full list of libraries with versions I used:

  • Scala 2.12.14
  • Spark 3.3.0
  • Cats 2.7.0
  • Cats Effect 3.3.12
  • Http4s 1.0.0-M30
  • Circe 0.14.2

The code for both WPJ and RFS, as well as Docker container configuration and running scripts, are available in this GitHub repository.

First approach, the most straightforward one

Let’s start with the rain forecaster service. Inside it, there is a class called Predictor, which is responsible for, obviously enough, making predictions. Here is its code:

It actually generates predictions randomly from the range [0, 1], but for our purpose, it’s more than fine. Also, you can see that it sleeps for some random time between minimumDelay and maximumDelay milliseconds to simulate long-running calculations. By default, minimumDelay is equal to 200 ms, and maximumDelay is equal to 800 ms. Such values will help to represent RFS processing metrics in a more clear way.

Move on to the endpoint logic:

It starts with the increment of receivedRequests metric. Next comes the request parsing. Then the result of parsing is passed to method .predict() of the aforementioned predictor object. After the prediction result is returned, the successfullyProcessedRequests counter is incremented by 1, and then the prediction result is transformed into JSON format and sent back.

The server setup is simple as well.

The endpoint for providing metrics to the Prometheus is exposed on the port 8086, whereas the main endpoint that makes rain predictions is exposed on the port 8085.

Now let’s consider WPJ implementation. The most basic class is the WeatherMeasurement case class, which is just a container for all fields related to one weather measurement.

There is also an EnrichedWeatherMeasurement case class which contains the same fields plus the probability of rain packed inside of Option(to store Nonein case there was an error during requesting rain probability).

The next important class is PredictionProvider, whose task is to manage HTTP request sending and response processing.

As you can see, it sequentially iterates over all provided measurements. For each of them, it constructs a request with a JSON body (temperature, humidity, and pressure, as you may remember) and then sends this request to RFS using http4s client. If prediction is successful, the result is packed into Some and added to all other fields of original measurement. If prediction fails for some reason, the probability of rain is set to None.

Let’s not forget about the classic “Serializable” issue in Spark jobs. The thing is that any method used in Spark transformation operations (like .map(), .mapPartitions(), .filter(), etc.) must be serializable, as well as the object containing it and all objects referred to by this method. Because of these restrictions, we cannot call .processRows() directly, and in order to overcome it, here is an additional object called PredictionProviderWrapper:

Inside this method both httpClient and predictionProvider are created and used.

And finally, we consider our main Spark processing pipeline, which looks as follows:

The CSV file with weather history is read with proper schema. Then, we limit the number of rows to be processed to 200 (for the current step, that’s enough) and call PredictionProviderWrapper.processRows() for each partition in this dataframe to gather rain predictions for each row. After enrichment is done, we calculate the count of successful predictions and its average to make sure that everything is fine. Aggregation results are printed to the console as a one-line table. As for job configuration, we’ll use two executors, 1 core and 512 MB for each.

Now let’s run these versions of RFS and WPJ and see what happens. The job executes successfully, and its output seems to be fine:

And, for the RFS, we can check the number of concurrently processed requests and processing rate at each moment in time. It can be done through Prometheus by using the following expressions:

received_requests_total — successfully_processed_requests_total
rate(successfully_processed_requests_total[10s])

Prometheus UI is available on port 9090. There we can find the following RFS processing results:

As you can see, the processing rate is around 2 rows per second. And during the considered period of time, RFS was processing approximately one request at a time. Why do we see such a low level of parallelism here? Well, the answer lies in this line of code from PredictionProvider in WPJ:

It sequentially iterates over measurements within the partition and tries to enrich them, which means it won’t send the next request until the previous one is done. Of course, intuition says that overall throughput might be way better, so let’s find out how to reach that.

So let’s parallelize!

Instead of sequential iteration over measurements, we’re going to break bad and increase measurement processing parallelism on WPJ to the max. Fortunately, not much needs to be changed:

Yep, only the method name changed, plus a couple of additional imports and implicits were added to make it work. Also, since this time we expect to see significantly higher throughput, let’s limit the number of rows to be processed to 10,000.

So let’s run this setup and see how it goes. WPJ executed successfully, but looking at the result in the console output, we can definitely say that something is wrong:

Metric charts look like this:

Where are our 10,000 predictions? Why do we have only 512? And besides these strange results, there is also the following logging output on each executor:

The answer is that we have only a small portion of predictions because the rest of them failed with the errors related to the HttpClient restrictions. The method which we are using for sending HTTP requests (.expectOptionOr()) uses a fixed size data structure under the hood to handle those requests. New requests are added to the wait queue until it’s overloaded, and then each new request just fails with error because there is no place for it. Thus, maximum parallelism wasn’t quite a good idea in this case, and in order to proceed, we need to limit the level of parallelism properly.

Ok, let’s parallelize but not that much

There is a convenient method available in Cats Effect library which looks like this:

It’s almost identical to the .parTraverse() but with the additional parameter of parallelism level, exactly what we need. If we use it, instead of an unlimited amount of concurrent request tasks, we will have at most 50 of them simultaneously, and HttpClient won’t be overloaded. Executing WPJ with this code confirms that:

All 10,000 predictions are in their place now. And RFS was processing up to 50 requests simultaneously instead of one as it was before:

Also we see that processing rate is around 100 rows per second:

It’s a way better result, and now, WPJ is able to process its dataset significantly faster, of course, considering that RFS is able to keep up with this pace. It already looks like an acceptable solution for our case, but there is another significant dimension of parallelism that we have to consider.

Scaling up partitions

As you probably already know, Spark uses its own way to parallelize data processing. Roughly speaking, the whole processing pipeline described by Spark code is split into stages by shuffle operations (.join(), .sort(), etc.). At the same time, the dataframe at any point in the pipeline is split into partitions, proper chunks of data that can be handled in parallel. Thus, a bunch of transformations within one stage (from one shuffle to the other shuffle) for one partition form a task in terms of Spark. It can be thought of as an indivisible unit of data processing. How these tasks are going to be processed depends on the number of available computing resources for the Spark job. There are three configuration parameters that determine concurrent execution strategy:

  • Number of executors (spark.executor.instances)
  • Number of cores per executor (spark.executor.cores)
  • Number of cores per task (spark.task.cpus)

If we consider that executorCores is multiple of taskCores, then executorNum multiplied by executorCores and divided by taskCores produces the amount of so-called “execution slots”. At any moment in time, one execution slot can process no more than one task. Taking all the aforementioned into consideration, for our case, we can derive the following formulas, which calculate the minimum number of partitions required to fill in all execution slots and desired parallelism within each partition:

As we can see, although WPJ had two executors in the previous attempts, it didn’t influence the overall parallelism level because the measurement dataframe had only one partition, which means only one task to be processed on two execution slots. So we can easily achieve double parallelism by just adding .repartition(2) right before .mapPartitions() transformation. But instead of that, let’s build on the total parallelism level we need and construct a more flexible and configurable code:

Just for our case, target parallelism is set to 100. executorNum, executorCores and taskCores are extracted from Spark configuration in order to calculate executionSlots, which is essentially the same as minNumberOfPartitions. maxParallelismPerPartition is calculated right inside PredictionProvider, that’s why we pass targetParallelism and shareCount (just renamed executionSlots) to the .processRows() method. And inside PredictionProvider we just do this:

Also in order to use minNumberOfPartitions properly, I added an extension method for Dataset, which checks the current number of partitions and does repartition only in cases where this number is less than the desired number of partitions. By running WPJ with these improvements, we get the following results:

predictionCount and averageProbabilityOfRain look fine, as well as the amount of concurrently processed requests and processing rate on the RFS side.

So far it might look like a final production-ready solution but there is still one more specific case to be considered.

Treacherous rate limit

Usually, services with an exposed API set a rate limit on their endpoints to prevent overloading by incoming requests (intentional or unintentional). If they receive more requests per second than their limit states, they will just decline the rest with an error in response. The same can be valid for other external services like databases or message queues. So if you plan to communicate with some external service, you must consider the rate limit in your code. Unfortunately, our previous parallelism restriction won’t help to guarantee that the rate limit won’t be reached because it strongly depends on the request processing time. For example, let’s consider an API with a rate limit of 100 requests per second. We will call it from our Spark job with parallelism set to 10. We see that on average, requests are handled longer than 100 ms, and there are no errors. But at some point, someone optimized the service, and requests began to be processed in 50 ms on average. Suddenly, we observe that half of our requests are failing even though we didn’t change anything in the job. So let’s find out how to protect against this.

First of all, a rate limit needs to be set up in RFS. We can achieve that just by wrapping the Router object with the utility method:

The content of this method is straightforward. If the rate limit value is provided in a configuration file, httpApp will be wrapped by the Throttle object, available in the http4s library.

Let’s assume a realistic rate limit to experiment with: 10 requests per second. If we set up RFS with this limit, change nothing in WPJ, and run the testing, we will definitely see this picture:

Obviously, most requests will fail because our job hits RFS with 200 requests per second.

In order to adjust WPJ and guarantee that we won’t exceed the limit, we can use stream abstraction. In a Cats Effect stack, there are streams from the fs2 library that I’m going to use, but for other stacks, the same abstraction is also available. So, the logic is as follows: stream emits one measurement each 110 ms (a bit more than 100 ms to make sure that we are not exceeding the limit) multiplied by shareCount (to split rate limit between partitions), and for each measurement, the request is asynchronously sent to RFS so they are not blocking each other. Besides that, we also limit the number of parallel requests by our previous parallelism value. Yep, it still makes sense to keep this limit because we might hit that when request processing time becomes too high. In general, it looks like this:

Also, let’s change the number of rows to be processed to 1,000 so as not to wait too long for the results. And if we run the testing with these adjustments, we can see that it helped:

All requests were processed successfully, and RFS worked at its maximum performance without exceeding the rate limit.

Conclusion

Each time you develop Spark job interactions with external systems, take care with how you communicate with them. Besides the aforementioned restrictions (concurrent connections, rate limit), the system might have its own specific limitations (e.g., request body size) that you have to consider and formalize in the code. In general, Spark jobs are huge load generators because of the amount of resources they use, so try your best to sync Spark job power with the capabilities of the services it uses. Also keep in mind that services usually have other clients, so it’s better to organize your Spark jobs in such a way that they won’t affect them too much.

--

--