REST API Data Ingestion with PySpark

Putting executors to work.

Eduardo Senior
6 min readOct 5, 2023

We’ve all done it.

If you’re writing a PySpark application and you are trying to consume data from a REST API like this:

import requests
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("REST_API_with_PySpark").getOrCreate()

base_url = "https://api.example.com/data"
query_limit = 500
total_records = 2000

data = []
for offset in range(0, total_records, query_limit):
url = f"{base_url}?limit={query_limit}&offset={offset}"
response = requests.get(url)
if response.status_code == 200:
data.extend(response.json())

# Load DataFrame

# Transformations
...
🐐

There’s a better way.

This approach may be okay for initial testing, but it lacks scalability. The problem with this code is that it’s pure Python, and as a result will run solely on the driver. Spark is all about parallel execution 🚀, it’s got no time for that kind of single-threaded nonsense.

Single-threaded nonsense

To leverage Spark’s distributed processing power, you need to operate on distributed data structures like RDDs or DataFrames. By applying transformations and actions on these data structures, we enable Spark to distribute tasks across multiple executors and process data in parallel. Only then are we taking advantage of the computing resources of the entire cluster, allowing for significant improvements in performance.

Give this a try instead:

import requests
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, explode, lit
from pyspark.sql.types import ArrayType, StructType, StructField, StringType, IntegerType

spark = SparkSession.builder.appName("REST_API_with_PySpark_DF").getOrCreate()

schema = StructType([
StructField("field1", StringType(), True),
StructField("field2", IntegerType(), True),
# ... Define other fields based on the API's response
])

@udf(returnType=ArrayType(schema))
def fetch_data(offset: int, limit: int):
endpoint = "https://api.example.com/data"
params = {
"offset": offset,
"limit": limit
}
response = requests.get(endpoint, params=params)
return response.json() # assuming API returns a list of records

total_records = requests.get("https://api.example.com/data", params={"offset": 0, "limit": 1}).json().get('total', 0)
records_per_page = 500

offsets_df = spark.range(0, total_records, records_per_page).select(col("id").alias("offset"), lit(records_per_page).alias("limit"))
response_df = offsets_df.withColumn("response", fetch_data("offset", "limit"))
results_df = response_df.select(explode("response"))

Our latest code has two notable additions: 1) a user-defined function (UDF) named fetch_data, and 2) a schema, creatively named schema.

UDFs

In Spark, UDFs are custom functions authored by users to address specific needs not covered by Spark’s built-in functions (like sending HTTP requests). Put simply, they are functions that operate on the data, row by row, and can take one or more columns as input.¹

For our purposes, you can think of each record in the DataFrame as representing the configuration for a single API call. The UDF takes the different combinations of offset and limit values available in each row and “configures” a request. Using the withColumn transformation in conjunction with our fetch_data function, we create a new results_df DataFrame and store the response of each API call within a new field called “response”.

So, how does this improve upon our original code? At first glance, it might seem like we are still making an API call for each offset. And, indeed, we are. The difference, of course, is that we are no longer doing so sequentially. By turning our offset list into a DataFrame and encapsulating our logic within a UDF, we create an object that can be partitioned across an army of executors, making concurrent operations across the cluster possible.

No nonsense here.

Have expectations.

When working with UDFs that interact with structured data sources, it’s important to define a schema for the expected response. A predefined schema ensures that the data retrieved matches our expectations in terms of structure and data types. This not only safeguards data integrity and quality but also optimizes performance, as Spark can process data more efficiently when the structure is known in advance.²

When setting up the response schema for your specific use case, look for clues in the API provider’s documentation. Here’s an example of what the response for a GET /users API call might look like:

[
{
"name": "Jack",
"age": 30,
"isActive": true
},
{
"name": "Fred",
"age": 20,
"isActive": true
}
...
]

Based on this we would expect and array of objects, each object having three attributes: name, age, and isActive. Our schema would map these to Spark types as such:

schema = StructType([
StructField("name", StringType(), True),
StructField("age", IntegerType(), True),
StructField("isActive", BooleanType(), True),
# ... Define other fields based on the API's response
])

A note on performance.

It’s important to point out that when you use a Python UDF with DataFrames, we incur the overhead cost of transferring data back and forth between the JVM and the Python process (Py4J). This serialization and deserialization can impact performance.³

That being said, even with this overhead, the higher-level optimizations provided by Catalyst and Tungsten (Spark’s internal optimization engines) can still make DataFrame operations faster than equivalent RDD operations in many cases (assuming you continue to work with the data after bringing it into memory).

If performance is absolutely paramount, consider writing your functions in Scala or Java. These functions will run within the JVM, eliminating the serialization/deserialization overhead and boosting overall execution efficiency.⁴

Pagination problems.

We’ve made really good progress, but the truth is that offset / limit strategies for pagination are on their way out: they’re just not the (optimal) tool of choice for API designers anymore. While they lend themselves nicely for parallelization, they come with a multitude of drawbacks, and have thus declined in popularity in favor of other, more robust methods.⁵ Modern APIs will instead favor the use of keyset pagination, and more specifically, cursors, for managing the retrieval of large datasets.⁶

In short, cursor-based pagination uses tokens that represent a unique record and its position in a sorted list of records. When a client makes a request, the server responds with a subset of records and a token pointing to the next record in the set. The client can then use this token as the cursor in subsequent requests, continuing this process until all desired data has been retrieved.

And therein lies the rub. By providing us with pagination values only after our initial request is complete, providers have effectively stripped us of concurrency, forcing us again into a sequential back-and-forth exchange. We will explore this topic in more detail in a follow-up post and provide a set of guidelines for dealing with these new constraints. While admittedly more involved, it is absolutely possible.

Thanks for reading.

P.S. Be a good neighbor.

A quick note on rate limits, proper etiquette… Make sure to enable the right level of parallelization, and let those processes paginate in the traditional sense (for loop within their own executor processes). Consider using bulk API when possible for initial seeding (REST API for incremental). Always remember that when dealing with external systems, the principle of being a “good citizen” applies: don’t overwhelm the system, handle errors gracefully, and respect the rules of the game laid out by the API provider.

References

  1. Bill Chambers and Matei Zahari. Spark: The Definitive Guide, O’Reilly, Sebastopol, CA, 2018, p. 111–113.
  2. Chambers and Zahari, p. 60.
  3. Cristofol Torrens. “Avoiding UDFs in Python.” Damavis Blog, 26 May 2021, https://blog.damavis.com/en/avoiding-udfs-in-apache-spark/.
  4. Chambers and Zahari, p. 113.
  5. Michael Hahn. “Evolving API Pagination at Slack.” Slack Engineering, 2017, slack.engineering/evolving-api-pagination-at-slack/.
  6. Drew Martin. “Pagination with Relative Cursors.” Shopify Engineering, 12 August 2019, https://shopify.engineering/pagination-relative-cursors.

--

--