Going Serverless with Databricks: Part 1

Başak Tuğçe Eskili
Published in Marvelous MLOps · Apr 24, 2023

Databricks serverless model endpoints have finally arrived, and we have already started playing with them. Serverless architectures have pros and cons: despite the limited control over the underlying infrastructure, we like the automated scalability, the absence of infrastructure management, and the potential cost savings.

Motivation

Having deployed multiple models to multiple environments, we know how much work it can take to manage all the resources. We have been using Databricks in our current solution architecture for a while for model training, batch prediction, and the model registry. Replacing AKS with Databricks for our real-time inference models has been a long-awaited addition, as we want to simplify the process.

In the next sections, we’ll walk you through the step-by-step process of creating model endpoints for an sklearn model and a custom model, and share our findings so far.

Use case: Amazon reviews dataset

We wanted to mimic two retail use cases:

  • predicting the product category based on a review
  • providing product recommendations

The public Amazon reviews dataset is perfect for that. AmazonReviewsDataLoader downloads data from https://archive.org/download/amazon-reviews-1995-2013/ for the categories ‘Gourmet_Foods’, ‘Pet_Supplies’, ‘Health’, and ‘Baby’, merges it with the category file, and returns a dataframe.

Prerequisites

If you want to run the provided notebook Train_and_deploy_models.py from https://github.com/marvelousmlops/amazon-reviews-databricks in Databricks, you need to do the following first:

export DATABRICKS_HOST=<DATABRICKS_HOST> 
export DATABRICKS_TOKEN=<DATABRICKS_TOKEN>
git clone https://github.com/marvelousmlops/amazon-reviews-databricks.git
cd amazon-reviews-databricks
python3 setup.py bdist_wheel
dbfs rm dbfs:/upload/amazon_reviews/amazon-reviews-0.0.1-py3-none-any.whl
dbfs cp dist/amazon_reviews-0.0.1-py3-none-any.whl dbfs:/upload/amazon_reviews/amazon-reviews-0.0.1-py3-none-any.whl

IMPORTANT: the DATABRICKS_HOST value should have the format https://<your workspace>/, for example: https://adb-123456781234.2.azuredatabricks.net/

DATABRICKS_TOKEN can simply be a personal access token; for now, the user needs to have admin permissions. In the second part of the article, we will talk about automation using GitHub Actions and deploying workflows on Azure Databricks with Service Principals.

Model Endpoint: sklearn model

If you have already come across the documentation on Databricks model endpoints, you know they are integrated with MLflow, the most common open-source platform for tracking experiments and registering models. MLflow supports many built-in model flavors: sklearn, PyTorch, LightGBM, etc. Logging, saving, or loading models can easily be done via the mlflow library, e.g. mlflow.sklearn.autolog() or mlflow.sklearn.load_model().
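For example, here is a minimal sketch of logging a fitted sklearn model and loading it back; the tiny training data and the artifact path are only illustrative:

import mlflow
from sklearn.linear_model import LogisticRegression

# Log a fitted model to an MLflow run, then load it back by run id
with mlflow.start_run() as run:
    model = LogisticRegression().fit([[0.0], [1.0]], [0, 1])
    mlflow.sklearn.log_model(model, artifact_path="model")

loaded = mlflow.sklearn.load_model(f"runs:/{run.info.run_id}/model")
print(loaded.predict([[0.5]]))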

The following code shows how to load data from DBFS (Databricks File System), train a simple model, and create a model endpoint for real-time prediction.

1. Load data. AmazonReviewsDataLoader is a Python class that downloads and returns data from the public Amazon reviews dataset for the following categories: ‘Gourmet_Foods’, ‘Pet_Supplies’, ‘Health’, ‘Baby’.
from amazon_reviews.data_loader.data_loader import AmazonReviewsDataLoader
import nltk
from nltk.corpus import stopwords
from sklearn.feature_extraction.text import CountVectorizer, TfidfTransformer
from sklearn.pipeline import Pipeline
from sklearn.linear_model import LogisticRegression
import mlflow
import requests
import os

# Location of the Amazon reviews data on DBFS
PATH = '/dbfs/FileStore/shared_uploads/amazon_reviews'
data_loader = AmazonReviewsDataLoader(path=PATH)
df = data_loader.create_reviews_dataframe()

2. Train a simple sklearn model, then log and register it with MLflow.

nltk.download('stopwords')

pipeline = Pipeline([
    ('vect', CountVectorizer(stop_words=stopwords.words('english'))),
    ('tfidf', TfidfTransformer()),
    ('lr', LogisticRegression(max_iter=3000))
])

# Autologging logs parameters, metrics, and the fitted model to MLflow
mlflow.sklearn.autolog()

mlflow.set_experiment(experiment_name='/Shared/Amazon_category_model')
with mlflow.start_run(run_name='amazon-category-model') as run:
    pipeline.fit(df.Text.values, df.Cat1.values)
    mlflow_run_id = run.info.run_id

model_version = mlflow.register_model(model_uri=f"runs:/{mlflow_run_id}/model",
                                      name='amazon-category-model')

If the run is successful, you’ll see the following message:

Successfully registered model 'amazon-category-model'.
mlflow.tracking._model_registry.client: Waiting up to 300 seconds for the model version to finish creation.
Model name: amazon-category-model, version 1
Created version '1' of model 'amazon-category-model'.

Now you can see the model in the Registered Models section under Databricks Machine Learning.

There are two ways to create a model endpoint: via the UI ("Use model for inference" at the top right) or via the REST API.

3. Create a model endpoint with the REST API.

'''
Set credentials for the environment. The following code sets credentials
in a Databricks notebook; it uses dbutils to get the API token.
'''
os.environ['DATABRICKS_TOKEN'] = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiToken().get()
os.environ['DATABRICKS_HOST'] = spark.conf.get("spark.databricks.workspaceUrl")


def databricks_host():
    # Workspace URL is taken from the environment variables
    dtb_url = os.environ["DATABRICKS_HOST"]
    dtb_url = dtb_url.strip("/")
    if dtb_url.startswith("https://"):
        dtb_url = dtb_url[8:]
    return dtb_url


def serve_ml_model_endpoint(endpoint_name, endpoint_config) -> requests.Response:
    """
    Args:
        endpoint_name (str): Name of the endpoint
        endpoint_config (dict): Endpoint config
    Returns:
        requests.Response
    """
    config = {
        "name": endpoint_name,
        "config": endpoint_config
    }

    response = requests.put(
        f"https://{databricks_host()}/api/2.0/serving-endpoints/{endpoint_name}/config",
        headers={"Authorization": f"Bearer {os.environ['DATABRICKS_TOKEN']}"},
        json=config,
    )
    return response


endpoint_name = 'amazon-category-model'

config = {
    "served_models": [{
        "model_name": "amazon-category-model",
        "model_version": f"{dict(model_version)['version']}",
        "workload_size": "Small",
        "scale_to_zero_enabled": False,
    }]
}

serve_ml_model_endpoint(endpoint_name=endpoint_name, endpoint_config=config)

Expected output log: <Response [200]>

Once you have successfully created the model endpoint, you can see it in the Serving section.

4. Send a query to the endpoint.

It does give some meaningful results! The review "Oh man, the best cereals I’ve ever had! A lot of nuts, and I love nuts." is categorized as "grocery gourmet food", and "Best face serum, contains vitamin C, panthenol, and your skin glows!" is categorized as "beauty".

import requests

# DATABRICKS_HOST holds the bare workspace URL here, so prepend the scheme
endpoint = f"https://{os.environ['DATABRICKS_HOST']}/serving-endpoints/{endpoint_name}/invocations"
response = requests.post(
    endpoint,
    headers={"Authorization": f"Bearer {os.environ['DATABRICKS_TOKEN']}"},
    json={"inputs": ["Oh man, the best cereals I've ever had! A lot of nuts, and I love nuts.",
                     "Best face serum, contains vitamin C, panthenol, and your skin glows!"]})

print("Response status:", response.status_code)
print("Response text:", response.text)

## Response status: 200
## Response text: {"predictions": ["grocery gourmet food", "beauty"]}

Model Endpoint: custom model

Logging, saving, and loading custom Python models with MLflow can be done using the mlflow.pyfunc module. In our approach, we created a model wrapper class inheriting from mlflow.pyfunc.PythonModel, which allows us to log and register our model to MLflow.

There are two main challenges we faced with custom models while creating the model endpoint: 1) adding a custom Python library as a dependency, and 2) the data structure of the model input provided when querying the model endpoint.

Custom model implementation based on collaborative filtering:

from pyspark.sql import functions as F
from sklearn.decomposition import TruncatedSVD
import numpy as np
import mlflow


class AmazonProductRecommender:
    """
    Amazon product recommender: recommends a list of products based on a product id.
    Model-based collaborative filtering; inspired by:
    https://medium.com/@gazzaazhari/model-based-collaborative-filtering-systems-with-machine-learning-algorithm-d5994ae0f53b
    """

    def __init__(self, spark_df):
        """
        Creates a dictionary of the form {'product1': ['product7', 'product3'],
        'product2': ['product5', 'product6']}
        :param spark_df: spark dataframe
        """
        self.spark_df = spark_df
        self.model = {}

    def train(self):
        # Keep users with at least 5 reviews and products with at least 30 reviews
        active_user_counts = self.spark_df.groupBy("userId").agg(F.count('*').alias("user_count")).filter(
            "user_count >= 5")
        self.spark_df = self.spark_df.join(active_user_counts, "userId", "inner")
        product_counts = self.spark_df.groupBy("productId").agg(F.count('*').alias("product_count")).filter(
            "product_count >= 30")
        self.spark_df = self.spark_df.join(product_counts, "productId", "inner").drop("user_count", "product_count")
        spark_df_new = self.spark_df.select('userId', 'productId', 'Score').withColumn(
            "Score", self.spark_df.Score.cast("float"))
        # Build the user x product ratings matrix
        spark_df_pivoted = spark_df_new.groupBy("userId").pivot("productId").avg("Score").na.fill(0)
        ratings_matrix = spark_df_pivoted.toPandas()
        ratings_matrix = ratings_matrix.set_index('userId')
        X = ratings_matrix.T
        # Reduce dimensionality and compute product-to-product correlations
        SVD = TruncatedSVD(n_components=10)
        decomposed_matrix = SVD.fit_transform(X)
        correlation_matrix = np.corrcoef(decomposed_matrix)
        product_names = list(X.index)
        for product in product_names:
            product_id = product_names.index(product)
            # Top 9 most correlated products, excluding the product itself
            recom = list(np.array(X.index)[(-correlation_matrix[product_id]).argsort()[1:10]])
            self.model[product] = recom
        return self

Model wrapper class inheriting mlflow.pyfunc.PythonModel:

class AmazonProductRecommenderWrapper(mlflow.pyfunc.PythonModel):
    def __init__(self, model):
        """Initializes the model in the wrapper.
        Args:
            model: the trained recommendation dictionary from AmazonProductRecommender.
        Returns:
            None
        """
        self.model = model

    def predict(self, context, model_input: dict):
        """Returns the model's prediction based on the model input.
        Args:
            model_input: dictionary in the format {'basket': List[str], 'customer_id': str}.
        Returns:
            The prediction of the model based on the model input.
        """
        # The serving layer passes values as numpy objects; cast them back to str and list
        model_input = {'customer_id': str(model_input['customer_id']),
                       'basket': list(model_input['basket'])}
        return self.model[model_input['basket'][0]]

Train the custom model (AmazonProductRecommender) and save it to MLflow via the model wrapper (AmazonProductRecommenderWrapper):

from pyspark.sql import SparkSession
from amazon_reviews.recommender_model.amazon_product_recommender import AmazonProductRecommenderWrapper, \
    AmazonProductRecommender
from mlflow.utils.environment import _mlflow_conda_env

mlflow.set_experiment(experiment_name='/Shared/Amazon_recommender')
with mlflow.start_run(run_name="amazon-recommender") as run:
    spark = SparkSession.builder.getOrCreate()
    spark_df = spark.createDataFrame(df)
    recom_model = AmazonProductRecommender(spark_df=spark_df)
    recom_model.train()
    wrapped_model = AmazonProductRecommenderWrapper(recom_model.model)
    mlflow_run_id = run.info.run_id

    '''
    The following code shows how to add the custom library amazon_reviews
    as a dependency to the MLflow model.
    '''
    conda_env = _mlflow_conda_env(
        additional_conda_deps=None,
        additional_pip_deps=["code/amazon_reviews-0.0.1-py3-none-any.whl",
                             "pyspark==3.3.0"],
        additional_conda_channels=None,
    )
    mlflow.pyfunc.log_model("model",
                            python_model=wrapped_model,
                            conda_env=conda_env,
                            code_path=["/dbfs/upload/amazon-reviews/amazon_reviews-0.0.1-py3-none-any.whl"])

model_version = mlflow.register_model(model_uri=f"runs:/{mlflow_run_id}/model",
                                      name='amazon-recommender')

Creating a model endpoint works the same way as for the sklearn model:

endpoint_name = 'amazon-recommender'

config = {
    "served_models": [{
        "model_name": "amazon-recommender",
        "model_version": f"{dict(model_version)['version']}",  # model version comes from mlflow.register_model above
        "workload_size": "Small",
        "scale_to_zero_enabled": False,
    }]
}

serve_ml_model_endpoint(endpoint_name=endpoint_name, endpoint_config=config)

Send a query to the endpoint:

import requests

model_input = {
    'customer_id': 'abcdefg12345678abcdefg',
    'basket': ['B00000J0FW'],  # Sassy Who Loves Baby Photo Book; baby products
}

url = f"https://{os.environ['DATABRICKS_HOST']}/serving-endpoints/{endpoint_name}/invocations"

headers = {'Authorization': f"Bearer {os.environ['DATABRICKS_TOKEN']}", 'Content-Type': 'application/json'}
data_json = {'inputs': model_input}
response = requests.request(method='POST', headers=headers, url=url, json=data_json)

if response.status_code != 200:
    raise Exception(f'Request failed with status {response.status_code}, {response.text}')

print(response.text)

# {"predictions": ["B00005MKYI", "B000056HMY", "B000056JHM", "B000046S2U", "B000056JEG", "B000099Z9K", "B00032G1S0", "B00005V6C8", "B0000936M4"]}
# B00005MKYI: Deluxe Music In Motion Developmental Mobile; B000056HMY: The First Years Nature Sensations Lullaby Player; B000056JHM: Bebe Sounds prenatal gift set

Problems, limitations and attention points

Problem 1 & solution. PySpark dependency with custom library

This documentation shows how to add a custom library as a dependency to an MLflow model. However, it does not work if your custom library depends on PySpark. For example, in the use case above, AmazonProductRecommender training uses PySpark.

If your custom model does not require a custom Python library, you don’t need to define anything. But if a custom library must be installed in the environment where predictions run, in this case the container created behind the model endpoint, then you need to add it to the MLflow model’s dependencies.

The solution suggested here for adding custom libraries uses mlflow.models.utils.add_libraries_to_model(<model_uri>) and wheel files for all dependencies. If you add all your dependencies, including PySpark, to additional_pip_deps and run mlflow.models.utils.add_libraries_to_model(<model_uri>), your code will look similar to this:

conda_env = _mlflow_conda_env(
    additional_conda_deps=None,
    additional_pip_deps=["/dbfs/upload/amazon-reviews/amazon_reviews-0.0.1-py3-none-any.whl",
                         "pyspark==3.3.0"],
    additional_conda_channels=None,
)

mlflow.models.utils.add_libraries_to_model("models:/amazon-recommender/latest")

Then you’ll see the following error: "An error occurred while loading the model. No module named ‘pyspark’"

We assume that adding the PySpark dependency via mlflow.models.utils.add_libraries_to_model does not work because there is no wheel file for it. To solve this issue, we upload our custom library as a wheel file to DBFS and provide the dependency as code via code_path (shown in the code above where we trained and saved the custom model).

Problem 2 & solution. Model Input

The model wrapper class takes the trained model as an input. The model_input provided to the predict function is a dictionary with customer_id and basket. When a query is sent to the model endpoint, model_input is passed as a dictionary of a string (customer_id) and a list (basket); however, the values are transformed into numpy objects by default. To guarantee the expected data structure, we transform the numpy objects back to str and list (shown in the model wrapper class above).
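As an illustration, the conversion inside predict roughly looks like this; the exact numpy types produced by the serving layer are an assumption based on what we observed:

import numpy as np

# What the wrapper may receive from the serving layer (values arrive as numpy objects)
raw_input = {
    'customer_id': np.str_('abcdefg12345678abcdefg'),
    'basket': np.array(['B00000J0FW']),
}

# Cast the values back to plain Python types before using them in the model
model_input = {
    'customer_id': str(raw_input['customer_id']),
    'basket': list(raw_input['basket']),
}

print(type(model_input['customer_id']), type(model_input['basket']))
# <class 'str'> <class 'list'>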

Attention point. Transition to the prod stage

It is important to always transition the model to the Production stage before deploying an endpoint. If an endpoint already exists with a Production-stage model version behind it, updating the endpoint with another version of that model that is not in the Production stage will have no effect.
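A minimal sketch of that transition with the MLflow client, reusing the model_version returned by mlflow.register_model above:

from mlflow.tracking import MlflowClient

client = MlflowClient()
# Move the newly registered version to the Production stage before updating the endpoint
client.transition_model_version_stage(
    name="amazon-recommender",
    version=dict(model_version)["version"],
    stage="Production",
)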

Limitation. Request body & response body format

As can be seen in the documentation and in our example above, there are specific formats required for the API request and response.

The request should be sent as JSON with one of the accepted keys (dataframe_split, dataframe_records, instances, inputs).
There are only four formats, and your input should follow one of them depending on your case.
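For example, the category model query above could be framed in either of these two shapes; this is only a sketch, and the column name in the dataframe_records variant is an assumption, since the model was trained on raw text values:

# "inputs" format: raw values, as used in our queries above
payload_inputs = {"inputs": ["Oh man, the best cereals I've ever had!"]}

# "dataframe_records" format: list of records keyed by column name
payload_records = {"dataframe_records": [{"Text": "Oh man, the best cereals I've ever had!"}]}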

The response from the endpoint is always in the following format:

{
  "predictions": "<json-output-from-model>"
}

The output is always wrapped in a "predictions" key.

In big organizations, APIs are consumed by other teams (front-end, back-end, etc.) that have specific expectations regarding the format of the response body.

If the APIs are already running on Kubernetes and calls to them are integrated into other systems, it can be challenging to migrate to Databricks serverless deployment because the payload of the API will inevitably have to change. That is especially difficult when even a small change may cost thousands of euros and take a few months to complete.

Conclusion

Databricks serverless model endpoints are a great step forward in simplifying real-time model deployments, and we plan to migrate our APIs running on Kubernetes to Databricks. Although the feature is still in its early stages with limited documentation and there is room for improvement, we got it working for our use case, and we hope to see more flexibility around custom payloads in the future.
