Serve Many Forecasting Models with Databricks Model Serving at Once.

Anastasia Prokaieva
15 min read · May 16, 2023


Photo by Robynne Hu on Unsplash

Time Series Forecasting is a vital area of machine learning that has become increasingly important in today’s data-driven world. It involves the use of statistical models to analyze and predict patterns in time series data, which can then be used to make informed decisions and develop effective strategies. Time Series Forecasting has a wide range of applications, including finance, healthcare, energy, retail, weather forecasting, and many others.

However, despite its importance and potential benefits, putting a Time Series Forecasting model into production in real time can still be a complex and challenging task. This is due to a range of factors: the need for accurate and reliable data, for robust and scalable infrastructure, for effective model evaluation and tuning, and for ongoing monitoring and maintenance. Additionally, different domains and industries may have their own unique challenges and requirements, which can further complicate the process of deploying a Time Series Forecasting model in production.

Serving a machine learning model in real-time typically involves deploying the model as an API (Application Programming Interface) that can be accessed by other applications or services. There are several ways to serve a machine learning model in real-time, depending on the specific requirements and constraints of the system:

  1. Containerization
  2. Serverless Functions
  3. Microservices Architecture
  4. In-Memory Data Grids
  5. API Gateway

Ultimately, the choice of approach will depend on factors such as the size and complexity of the model, the expected workload, and the available resources and infrastructure. It’s important to consider factors like security, reliability, and scalability when designing and deploying a real-time machine learning system.

Recently Databricks announced General Availability of its managed Model Serving Endpoints. This is a managed service that creates concurrent endpoints for you. Even though it’s fully managed, you still have a lot of flexibility, such as adding your own custom code, libraries, files, etc. to be served by the underlying service.

Databricks Model Serving brings a highly available, low-latency and serverless service for deploying models behind an API. You no longer have to deal with the hassle and burden of managing a scalable infrastructure. It’s a fully managed service that takes care of all the heavy lifting for you, eliminating the need to manage instances, maintain version compatibility, and patch versions. Endpoints automatically scale up or down to meet demand changes, saving infrastructure costs while optimizing latency performance.

https://www.databricks.com/blog/2023/03/07/announcing-general-availability-databricks-model-serving.html

It’s straightforward to serve a classic model on Databricks Model Serving. In a few words, to serve a classic SkLearn model you need the following:

  • fit your SkLearn model and log it with MLFlow on the Databricks Lakehouse platform
  • log all necessary dependencies into the conda.yaml under the MLFlow artifact
  • register your model in the MLFlow Registry
  • enable a Databricks Model Serving Endpoint with the UI or API

You are set to go :)
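For reference, here is a minimal sketch of those four steps (the toy data, model, and registry name are illustrative, not taken from the original notebooks):

import mlflow
import numpy as np
from mlflow.models.signature import infer_signature
from sklearn.ensemble import RandomForestRegressor

# Toy data just to make the sketch self-contained.
X_train = np.random.rand(100, 4)
y_train = np.random.rand(100)

with mlflow.start_run() as run:
    # 1. Fit your SkLearn model.
    model = RandomForestRegressor().fit(X_train, y_train)
    # 2. Log it under MLFlow; dependencies are captured in the conda.yaml artifact
    #    (extra packages can be added via pip_requirements if needed).
    signature = infer_signature(X_train, model.predict(X_train))
    mlflow.sklearn.log_model(model, "model", signature=signature)

# 3. Register the model in the MLFlow Registry (the name is illustrative).
mlflow.register_model(f"runs:/{run.info.run_id}/model", "my-sklearn-model")

# 4. Enable a Databricks Model Serving Endpoint for it via the UI or the API.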

Simple but …

I was asked to serve a more complex solution: real-time forecasting for many models at the same time. In the beginning, I thought this would take me no longer than a classic use case. The more I worked on it, the more I realized it was not as trivial as I had initially thought. And here we are!

So, let’s try to do it together — serve many forecasting models under one endpoint in real time on a Databricks Serving Endpoint.

Here is what you will be able to cover while reading this blog:

  1. How to train many independent forecasting models on top of a Delta table using PandasUDF of Apache Spark.
  2. How to track each model using MLFlow and Delta tables.
  3. How to serialize all your models under a single MLFlow object.
  4. How to prepare your input data payload to be consumed by a single Endpoint.

Before you begin reading, please keep in mind that I am assuming you have knowledge of MLFlow and are familiar with the concepts of tracking, logging, the model registry, etc. If this is the first time you’ve heard of MLFlow, you may want to check the main Documentation page, this simple End-2-End demo example created by the Databricks team, or the “Introducing MLflow for End-to-End Machine Learning on Databricks” recording from the 2020 Spark and AI Summit.

Part 1 — Is this really that hard?

Photo by Yosh Ginsu on Unsplash

For this blog post, I am going to use a well-known dataset from a Kaggle forecasting competition — the Rossmann Dataset. It’s a perfect dataset because it contains more than 1,000 individual drug stores across Germany for which you need to forecast sales. In this case, we would like to create so-called local model forecasting — one model per store.

Store sales are influenced by many factors, including promotions, competition, school and state holidays, seasonality, and locality. With thousands of individual managers predicting sales based on their unique circumstances, the accuracy of results can be quite varied.

Example of the data subset

First of all, let’s talk about why classic Pythonic methods lead to long execution times and why you’d better avoid them in this case. If you need to forecast with more than one independent model, you would probably do something like the following:

def mocking_fct_model(df: pd.DataFrame) -> pd.DataFrame:
    """
    Please pay attention: this is a mock function,
    most of the code is not provided for simplicity reasons.
    """
    fct_model.fit()
    predictions = fct_model.predict()
    return predictions

stores = df["stores"].unique()
predictions_dict = {
    f"{store_id}": mocking_fct_model(df[df["stores"] == str(store_id)])
    for store_id in stores
}

The code example above may seem completely fine to most data scientists or analysts. However, it is not the most optimal way when you need to scale your solution to thousands of models.

Let’s talk about time

Let’s pretend for a minute that your business requires more granular forecasting: your business team has come to you and asked for forecasts not just per store but also per state (you have 10 states). This increases the number of models you have to train from 1,000 to 10,000. Next week your business team comes back and tells you they need forecasts for more than one horizon: daily forecasts for the next 10 weeks. That just increased the number of your models to 100,000!

It’s quite simple to estimate a benchmark for how long it would take to train 100K models following a classic Pythonic approach, assuming your fit/predict takes around 5-10 seconds to execute:

# *****
# BENCHMARK
# *****
import time

def benchmark_calc(store_id):
    # This assumes you have a really simple model.
    # The more complex your model, the longer it takes to train.
    # On average, models like Prophet will take ~10 sec.
    time.sleep(5)
    return 1

benchmark_dict = {f"{store_id}": benchmark_calc(store_id) for store_id in range(1, YOUR_AMOUNT, 1)}

Something that may look so small and run so quickly suddenly becomes a real problem. If you execute the code above for just 1,000 models, you will realize that it takes a significant amount of time: 1,000 models × ~5 seconds is roughly 83 minutes, more than an hour! Wow, more than an hour for just 1,000 models; what about 100,000 then? My business unit cannot wait that long. This is where distributed systems such as Apache Spark come to the rescue.

Part 2 — Train your models with PandasUDF

Photo by shiyang xu on Unsplash

In this part, I will show you how simple it is to distribute your Python code using PandasUDF. I have already downloaded the data from Kaggle and ingested it into Delta tables (an example of how to do so is attached under the GettingDataFromKaggle Notebook on the GitHub Page).

Before we start, I would like to show you the “compute” configuration I used to run the benchmarks and my code on the Databricks Lakehouse Platform. Please take into account that I am not trying to find the cheapest or most optimal cluster in this case; I simply selected something that seemed reasonable to me -> 1 Driver and 4 Workers with 8 cores each — this allows me to run around 32 tasks in parallel (by default it’s 1 CPU per task unless you change the configuration).

The Cluster Configuration I used to run everything.

In order to read data from Delta Tables I am using Apache Spark:

salesDF = spark.read.table("hive_metastore.ap.hack_ap_rossmann_blog")

I am keeping my data in a long (vertical) format rather than a wide one. To increase the complexity of the calculations, I am adding a new variable — horizon, which grows the dataset 7 times (I end up with more than 5M rows and need to train Stores × Horizons models, more than 7K models):
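The snippet for that step looks roughly like the following (a sketch under my own assumptions about the horizon values; the original notebook may construct the column differently):

# Cross-join each store's history with 7 horizon values (1..7);
# this grows the dataset 7x and yields one model per Store x Horizon combination.
horizons = spark.range(1, 8).withColumnRenamed("id", "Horizon")
train_horizons = salesDF.crossJoin(horizons)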

Apache Spark is a powerful open-source tool that keeps evolving, but there may be times when we need to be smarter. Here I am going to give you a TIP on how to accelerate your training when your dataset is skewed. You simply have to do two things: turn off AQE (Adaptive Query Execution) if it is enabled, and repartition your dataset (ideally one partition per individual subset):

# AQE is a great tool, but it may cause issues when you have skewed tasks.
spark.conf.set("spark.sql.adaptive.enabled", "false")

# It's better to repartition your dataset beforehand to ensure 1 task per store;
# this does the shuffle in advance for you.
stores_amount = len(train_horizons.select("Store").distinct().collect())
df_full = train_horizons.repartition(stores_amount, "Store")
# Repartitioning is lazy, so you need to call an action to trigger it.
df_full.count()

The reason for repartitioning is quite simple: Spark works with the concept of data partitioning, and when your dataset is skewed, Spark will likely shuffle everything first and only then build an execution plan. In our case, we are going to apply Python code that Spark cannot optimize; Spark simply acts as an orchestrator, running our individual models in parallel over multiple CPUs. Repartitioning prepares your data and avoids an additional shuffle when the Python code is mapped; this, together with disabling AQE, can significantly accelerate the process.

In order to train so many models in a distributed fashion, I am going to use the groupBy("...").applyInPandas(func_to_apply, schema=output_schema) method of PandasUDF.

import pyspark.sql.types as t
import pyspark.sql.functions as F

# PandasUDF requires a schema of the output to be provided.
trainReturnSchema = t.StructType([
    t.StructField("Store", t.StringType()),        # unique store ID
    t.StructField("Horizon", t.StringType()),      # unique horizon ID
    t.StructField("n_used", t.IntegerType()),      # number of records used in training
    t.StructField("model_path", t.StringType()),   # path to the model for a given combination
    t.StructField("encode_model", t.StringType()), # encoded string of the model
])

# Passing an experimentID to log models appropriately.
experiment_id = experiment_fct.experiment_id
with mlflow.start_run() as run:
    # Passing also the runID to create nested runs.
    run_id = run.info.run_id

    # Our dataset will be subsampled by the GroupBy operation,
    # but it does not contain information about the run and experiment.
    # We add those as new columns with Spark's withColumn and then
    # apply our pre-created function to the Spark DataFrame.
    modelDirectoriesDF = (df_full
        .withColumn("run_id", F.lit(run_id))
        .withColumn("experiment_id", F.lit(experiment_id))
        .groupby("Store", "Horizon")
        .applyInPandas(fit_final_model_udf,
                       schema=trainReturnSchema)
        .cache()
    )
    # Combining our results with the original dataset by joining both.
    combinedDF = (df_full.join(modelDirectoriesDF,
                               on=["Store", "Horizon"],
                               how="left")
    )

The code above took me ~6 min (to train and write) for an individual model per Store and ~23 min per Store & Horizon (7,805 models) — WOW!

Reminder: it takes more than an hour just to train 1,000 models if we use the classic Pythonic loop.

This is what my MLFlow Experiment page looks like (as you can see, I have unique models per Store in that case), together with the corresponding object artifact that was created during model tracking:

MLFlow Experiment UI
MLFlow Artifact Run UI

Now let’s actually dig a bit more into the fit_final_model_udf() creation and see the output of this function.

In order to perform individual training using PandasUDF technique, you should keep in mind a few simple things:

  • your input data is a subset of the main dataset, selected by the GroupBy operation; the Apache Arrow library takes care of serializing and deserializing the data
  • if your model requires the input data to be ordered, do it inside your training function
  • your output types and names should correspond to the schema that was given to applyInPandas
  • your output will be a data frame per subset, which applyInPandas appends back into the main one

Here is the main function that I’ve applied to a 5M rows dataset in order to train individual models per Store and Horizon:

from datetime import date
import pickle
import json
from base64 import urlsafe_b64decode, urlsafe_b64encode

import mlflow
import pandas as pd


def fit_final_model_udf(df_pandas: pd.DataFrame) -> pd.DataFrame:
    """
    Your input DataFrame will have the following columns:

    "Store", "Horizon" -> columns on which we perform the groupBy to separate our individual datasets
    "Date", "Sales", "StateHoliday", "SchoolHoliday" -> columns that will be used for training
    "run_id", "experiment_id" -> columns that are necessary for logging under the MLFlow artifact
    """
    from ServingForecasting import wrapper_model
    model = wrapper_model.ForecastingModelProphet()

    df_pandas = df_pandas.fillna(0).sort_values("Date").copy()
    X = df_pandas[["Store", "Horizon", "Date", "Sales", "StateHoliday", "SchoolHoliday"]]
    y = df_pandas.loc[:, ["Sales"]]

    store = df_pandas["Store"].iloc[0]
    horizon = df_pandas["Horizon"].iloc[0]
    n_used = df_pandas.shape[0]
    run_id = df_pandas["run_id"].iloc[0]  # pulls the runID to do a nested run
    experiment_id = df_pandas["experiment_id"].iloc[0]
    artifact_name = f"model_custom_{store}_{horizon}"

    # Resume the top-level training run.
    with mlflow.start_run(
        run_id=run_id,
        experiment_id=experiment_id
    ) as outer_run:

        # Create a nested run for the specific store/horizon combination.
        with mlflow.start_run(
            run_name=f"store_{store}_{horizon}",
            nested=True,
            experiment_id=experiment_id
        ) as run:

            # Fit the model pipeline here.
            model.fit(X)

            mlflow.pyfunc.log_model(
                artifact_path=artifact_name,
                python_model=model,
            )
            # Logging the date of training.
            mlflow.log_param("model_trained", date.today())
            # Pay attention that the artifact you are writing
            # is the same as your model.
            artifact_uri = f"runs:/{run.info.run_id}/{artifact_name}"

    # We are going to encode our model.
    # This is a TRICK to reduce future inference time,
    # and it will help us in the next step to wrap everything into one object.
    model_encoder = str(urlsafe_b64encode(pickle.dumps(model)).decode("utf-8"))

    # Create a return pandas DataFrame that matches the schema above.
    returnDF = pd.DataFrame(
        [[store, horizon, n_used, artifact_uri, model_encoder]],
        columns=["Store", "Horizon", "n_used", "model_path", "encode_model"],
    )
    return returnDF

Part 3 — Create a custom Wrapper for your models

Now that we have trained all 7805 individual models, we have to figure out how we can use them at once when needed.

In order to serve all your models at once, we have to create a single model object that will be serialized as an MLFlow artifact. But how do we do that?

Let’s first have a look at the final MLFlow logging; you will spot a parameter in the logging method: artifacts = "/dbfs/tmp/ap/artifact.json"

from mlflow.tracking import MlflowClient
from mlflow.models.signature import infer_signature

client = MlflowClient()

model_serving_name = "multimodel-serving-fct-custom-wrapper"

with mlflow.start_run() as run:
    model_info = mlflow.pyfunc.log_model(
        "augmented-fct-model-custom",
        python_model=MultiModelPyfunc(),
        artifacts={
            "models_encoded": "/dbfs/tmp/ap/artifact.json"
        },
    )
    print("Your Run ID is: ", run.info.run_id)

mv = mlflow.register_model(
    f"runs:/{run.info.run_id}/augmented-fct-model-custom",
    f"{model_serving_name}"
)
client.transition_model_version_stage(
    f"{model_serving_name}",
    mv.version,
    "Production",
    archive_existing_versions=True,
)

So this JSON file that we’ve passed under the MLFlow Artifact actually contains all our serialized binary models and corresponding key-value pairs for each individual model.

import json

with open('/dbfs/tmp/ap/artifact.json', 'w') as outfile:
    json.dump(dict_stores_models, outfile)
# {"299_1": "gASV73EAAAAAAACMIFNlcnZpbmdGb3JlY2FzdGluZy53cm.....",}

Now let’s go further and understand how this file will actually be used during the scoring process:

  1. When the Serving Endpoint is created, it loads the MLFlow artifact into memory, so the model context is loaded only once.
  2. When we invoke the model (call the API), the model takes the input and treats it row by row (we will see in Part 4 how to pass the dataset per row).
  3. Since we need to score more than one model at a time, we need a way to fetch the right model from the context that was loaded into memory.

class MultiModelPyfunc(mlflow.pyfunc.PythonModel):
    """
    ids_store (list, str) ::
        ids that correspond to the keys of the artifacts dict;
        the keys are used to pull the object that corresponds to a particular model:
        {id_name}: model_object_per_id
    """

    def __init__(self):
        super().__init__()

    def load_context(self, context):
        # Get the dictionary with the artifacts for all models here.
        json_load = context.artifacts["models_encoded"]
        with open(json_load) as json_file:
            data = json.load(json_file)
        self.models_context_dict = data

    # An example if you need to add data processing on the fly.
    def process_input(self, raw_input):
        pass

    def select_model(self, model_input):
        if not isinstance(model_input, pd.DataFrame):
            raise ValueError("Sample model requires DataFrame inputs")
        # Getting the model id from the Store and Horizon columns.
        locale_id = f'{model_input["Store"].iloc[0]}_{model_input["Horizon"].iloc[0]}'
        return locale_id

    def predict(self, context, raw_input):
        """
        Expecting the input to be an array.
        """
        # We check whether this is a pandas DataFrame,
        # because this is what the Serving API passes into predict.
        if type(raw_input) is type(pd.DataFrame()):
            raw_input = eval(raw_input.values[0][0])

        # We create a DataFrame with the expected input columns.
        # If you want to retrain the model on the fly,
        # place this under try and add another branch on the except side.
        model_input = pd.DataFrame(
            raw_input, columns=["Store", "Horizon", "Date", "StateHoliday", "SchoolHoliday"]
        )
        model_input["StateHoliday"] = model_input["StateHoliday"].astype("bool")
        model_input["SchoolHoliday"] = model_input["SchoolHoliday"].astype("bool")
        model_input.sort_values("Date", inplace=True)

        try:
            selected_store = self.select_model(model_input)
            print(f"Selected model {selected_store}")
            model_context = self.models_context_dict[str(selected_store)]
            model = pickle.loads(urlsafe_b64decode(model_context.encode("utf-8")))
            return model.predict(None, model_input)
        except:
            # Here you could fit a model instead; add an assert on the expected number of columns,
            # and note that the columns will not be the same as for the predict path.
            return f"This ID was not yet pre-trained, your input is {raw_input}, with a type {type(raw_input)}"

As you’ve seen above, I am leveraging the MLFlow context for this: it loads our object into memory once and keeps it there while the Endpoint is running.

Part 4 — Create your serving endpoints

Photo by Alev Takil on Unsplash

As mentioned in Part 3, Model Serving treats your input data row by row by default (as expected, since it’s a service built for low-latency responses). Let’s have a look at an example of the input data expected by Databricks Model Serving (one of the options):

{
  "dataframe_split": {
    "index": [0, 1],
    "columns": ["sepal length (cm)", "sepal width (cm)", "petal length (cm)", "petal width (cm)"],
    "data": [[5.1, 3.5, 1.4, 0.2], [4.9, 3.0, 1.4, 0.2]]
  }
}

But for a forecasting case, we need the dataset as a whole, not row by row! For this, we need to be more “artistic” in preparing the input format for the Endpoint. This can be a tricky one, so let’s have a look at how I dealt with it:

# You need to serialize your dataset into 1 row.
# For this I've created this small NumpyEncoder class.
# Please note that you would pass this serializer under the serving function
# when the API is invoked. Below is a demonstration.
import json
import numpy as np

class NumpyEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, np.ndarray):
            return obj.tolist()
        return json.JSONEncoder.default(self, obj)

json_dump = json.dumps(input_df.values, cls=NumpyEncoder)

ds_dict_serving = {
    "dataframe_split": {
        "index": [0],  # [0, 1, 2] each index corresponds to a DF
        "columns": ["input_data"],
        "data": [json_dump]
    }
}

And this is it: once the model has been serialized and registered, you just need to create a serving Endpoint on the Databricks Lakehouse (by using the UI or the API).

I created my Endpoint via Databricks UI, which took me 3 clicks.
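If you prefer the API route instead of the UI, the same can be done with the Databricks Serving Endpoints REST API, roughly like this (the endpoint name, workload size, and the workspace_url/token variables are illustrative assumptions):

import requests

endpoint_config = {
    "name": "multimodel-serving-fct",
    "config": {
        "served_models": [{
            "model_name": model_serving_name,
            "model_version": mv.version,
            "workload_size": "Small",
            # With a large in-memory artifact, scaling to zero is not recommended.
            "scale_to_zero_enabled": False,
        }]
    },
}
response = requests.post(
    f"{workspace_url}/api/2.0/serving-endpoints",
    headers={"Authorization": f"Bearer {token}"},
    json=endpoint_config,
)
print(response.json())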

You can invoke the Endpoint like this (this is a very simple example, you can modify it accordingly):

import json
import requests

def score_model(ds_dict, token, url):
    headers = {'Authorization': f'Bearer {token}', 'Content-Type': 'application/json'}
    ds_dict_testing = json.dumps(ds_dict, allow_nan=True)
    response = requests.request(method='POST', headers=headers, url=url, data=ds_dict_testing)
    if response.status_code != 200:
        raise Exception(f'Request failed with status {response.status_code}, {response.text}')
    return response.json()

score_model(ds_dict_serving, token, url)

Later you will probably want to re-parse the output, since by default it comes back as a dictionary.
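For example, the response arrives as a dictionary with a "predictions" key, which you can turn back into something tabular (a sketch; the exact shape depends on what your wrapped model returns):

import pandas as pd

response = score_model(ds_dict_serving, token, url)
# The serving layer wraps whatever the pyfunc returns under "predictions".
predictions_df = pd.DataFrame(response["predictions"])
print(predictions_df.head())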

Something I would like to draw your attention to: if you want to run the same model in a batch or streaming fashion, you do not have to change anything.

Let’s have a look at how I scored my WrappedModel in batch (you simply need to load it from your MLFlow Experiment or the MLFlow Model Registry):
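A minimal sketch of that batch scoring, assuming the registered name and Production stage used earlier and the serialized payload from Part 4:

import mlflow
import pandas as pd

# Load the wrapped multi-model from the MLFlow Model Registry (Production stage).
wrapped_model = mlflow.pyfunc.load_model(f"models:/{model_serving_name}/Production")

# Feed it the same one-row, serialized payload the endpoint receives;
# the custom predict() deserializes it and routes to the right store/horizon model.
batch_input = pd.DataFrame({"input_data": [json_dump]})
predictions = wrapped_model.predict(batch_input)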

Hope you liked the content and that it will help you serve your many models under a single Endpoint. A small note before you move to the repository with the code example: keep in mind that there are limitations and added overheads when you use a single-Endpoint concept. Be aware of the concurrency (the number of requests you can execute concurrently, which can be adjusted with your Endpoint size) and the size of the object that is loaded into memory (if the object is big, allowing the endpoint to scale to zero is strongly not recommended). The choice of the number of Endpoints depends on the use case you’re implementing (in simple words, putting fraud detection and forecasting under the same endpoint will definitely lead to further issues, for example with model and data quality monitoring), so try to be realistic.

If you would like to see the full code you can check it on my GitHub page.

I would like to thank Artem Sheiko, Michael Shtelma, and Jose Alfonso for reviewing the content.
