Avoid the ML dependencies syncing black-hole

Using MLFlow and Apache Spark to isolate inference and training dependencies

Yerachmiel Feltzman
Israeli Tech Radar
5 min readMar 27, 2023

--

Have you ever tried to deploy a machine-learning model just to receive an error similar to the one that follows?

RuntimeError: Running pandas version ('1.5.3') is incompatible with min ('1.1.0'} and max ('1.2.5') versions

If you deployed and received an error, that’s your lucky day. Better an error than a model dependency that yields slightly different results for slightly different versions.

Every engineer working with MLOps will soon or later face the challenge of syncing between the dependencies used during model training and those used during model serving.

If you have deployed just enough ML models I don’t need to prove to you this story is real, but just for the record here are some more voices telling you similar stories:

Photo by Nihal Karkala on Unsplash

The solution? Change the inference dependency version to be compatible with the same version used during training, right? Might be, but since that’s rather cumbersome and can lead anyone close to insanity, the community has developed several solutions.

Let’s explore how MLFlow helps us achieve full isolation between training and inference dependencies for batch scoring using Apache Spark.

(If you already know the solution, and want to immediately dive deep into MLFLow Spark UDF internals, check out the next post in the series.)

A compatibility issue — how to solve it?

Let’s create a model with a compatibility issue we can play with:

class MyModel():
NAME = "my_model"

def __init__(self):
# compatibility limitation
self._min_compatible_pandas_version: Version = version.parse("1.1.0")
self._max_compatible_pandas_version: Version = version.parse("1.2.5")

def _check_compatibility(self):
inference_pandas_version: Version = version.parse(pd.__version__)
logger.info(f"running pandas version: {inference_pandas_version}")

if inference_pandas_version < self._min_compatible_pandas_version \
or inference_pandas_version > self._max_compatible_pandas_version:
raise RuntimeError(
"Running pandas version ('%s') is incompatible with min ('%s'} and max ('%s') versions."
% (inference_pandas_version, self._min_compatible_pandas_version, self._max_compatible_pandas_version)
)

def predict(self, context, model_input: pd.DataFrame):
self._check_compatibility()
return model_input.apply(lambda column: column * 10)

Since the model above can only run on Pandas>=1.1.0,<=1.2.5 , if we want to implement an inference batch pipeline that for some reason needs Pandas==1.5.3 we are going to face the sync training and inference dependencies challenge.

A Spark pipeline for batch ML inference — image by author

Let’s try to run it anyway:

» poetry run python blog_post_mlflow_spark_udf/batch_predict.py "local"
RuntimeError: Running pandas version ('1.5.3') is incompatible with min ('1.1.0'} and max ('1.2.5') versions.

Ok. We expected that. 😃

Run on an isolated environment — step by step

A Spark pipeline for batch ML inference with the isolated environment — image by author

Would you be happy if the ML model could run completely isolated from your inference service, in this case, a Spark environment? Having the model and the inference service running together but completely isolated will mean we don’t need to solve the training and inference dependencies syncing challenge. In this way, the ML model can have whatever dependency it needs and the inference service can have whatever dependency it needs. The end of the conflicts. Peace on earth. Let’s try it out.

1. Define our model as an MLFlow compatible python model.

For this, our streamlined model will simply inherit from mlflow.pyfunc.PythonModel:

class MyModel(mlflow.pyfunc.PythonModel):
NAME = "my_model"
...

Note that I am using the PythonModelgeneric solution, but MLFlow has many built-in model flavors you can use out-of-the-box for registering your trained model to the MLFlow model registry.

2. Define the model training environment in a yaml file

name: my-model-env
channels:
- conda-forge
dependencies:
- python >= 3.8,<=3.10
- pandas = 1.2

Let’s call it model-env.yml.

You can read more about conda environment files on conda’s official documentation.

3. Register the model to the MLFlow registry

def register():
my_model = MyModel()

model_info: ModelInfo = mlflow.pyfunc.log_model(artifact_path=my_model.NAME,
registered_model_name=my_model.NAME,
python_model=my_model,
code_path=[str(Path("blog_post_mlflow_spark_udf"))],
conda_env=str(Path("blog_post_mlflow_spark_udf", "my_model",
"model-env.yml")))
return model_info

if __name__ == '__main__':
register()

Note how we are registering our model alongside its training environment definition in the conda_env parameter.

» poetry run python blog_post_mlflow_spark_udf/register_model.py                            
Created version '1' of model 'my_model'.

This article won’t cover the MLFlow model registry, in more depth. The current example is registering the model to the local filesystem, instead of a remote MLFlow registry, although the code is virtually the same, except for configuration. You can read more about machine-learning models’ life-cycle and where the registry fits-in in this article.

4. Run batch inference using the isolated environment

def batch_predict(env_manager: str):
spark = SparkSession \
.builder \
.getOrCreate()

input_data = generate_dummy_input_data(spark)

# for more on URIs format for loading models see:
# https://www.mlflow.org/docs/2.2.2/concepts.html#referencing-artifacts
model_uri = f"models:/{MyModel.NAME}/latest"

predict_udf = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri, env_manager=env_manager)
return input_data.withColumn("prediction", predict_udf(struct("value")))

if __name__ == '__main__':
result = batch_predict(env_manager=(sys.argv[1]))
result.show()

Note the env_manager parameter. It will allow us to run inference isolated as we want. We will pass it to the mlflow.pyfunc.spark_udf. To run inference in the isolation mode we must pass "conda" to env_manager.

This UDF will:

  1. retrieve our model from the MLFlow registry, based on the given model_uri ;
  2. unpickle the retrieved model and prepare it for inference
  3. run the model inference function, in our case my_model.predict

Once the Spark UDF is ready, we will just use it as any other UDF passing the relevant columns as input to let Spark use it to process and calculate results in a distributed fashion. And, since it’s just a Spark UDF, we are able to add more logic before (pre-processing) and after (post-processing) as we wish.

Let’s try it out and run it with conda :

» poetry run python blog_post_mlflow_spark_udf/batch_predict.py "conda"
+----+-----+----------+
| id|value|prediction|
+----+-----+----------+
|id_1| 1| 10.0|
|id_2| 2| 20.0|
|id_3| 3| 30.0|
+----+-----+----------+

Voila! It works!

Photo by Wil Stewart on Unsplash

Happy (ML) pipeline!

____

(Now that you already know the solution, and want to dive deep into MLFLow Spark UDF internals, check out the next post in the series.)

Notes:

  1. All code can be found here: https://github.com/Yerachmiel-Feltzman/blog-post_mlflow-spark-udf
  2. If you want to understand how this magic works under the hood, keep an eye on my next posts to get notified when I post the follow-up article on “Uncovering MLFlow’s Spark UDF”.

--

--