How to create a deep learning inference pipeline model using MLflow in three steps

Yong Liu
Aug 30, 2022 · 6 min read


If you have a trained and fine-tuned deep learning model that meets your business requirements and performance metrics, it is time to deploy it into production as an inference service. How would you do that?

Depending on your runtime business requirements and model hosting platform, this seemingly straightforward step is actually quite complex, and it is a focus of many modern MLOps (Machine Learning Operations) tools and platforms.

There are many complexities when deploying a trained model to production. Here we want to show a concrete example of how to create a new inference pipeline that wraps a trained model with preprocessing and postprocessing steps that you may not encounter during model training.

So what are common preprocessing and postprocessing steps that you may encounter at inference time but not during training?

Let’s use a natural language processing (NLP) model as an example. Suppose you already have a trained NLP Named Entity Recognition (NER) model and you want to deploy it as a service. This NER model takes an English sentence and extracts entities such as PERSON or PLACE. However, at runtime, you may encounter inputs in languages other than English, and you are not yet ready to tackle multilingual NER extraction. Adding a preprocessing step to detect the input language seems to be a good strategy, which allows you to do two things:

  1. If the input is English, then proceed to invoke the NER model
  2. If the input is not English, then bypass the NER model and output something that notifies the caller that only English is supported (a minimal sketch of this gating logic follows this list).
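
To make this concrete, here is a minimal sketch of the gating logic in plain Python (the helper function gated_ner and the en_core_web_sm pipeline are illustrative stand-ins, not part of the MLflow-based implementation built in the steps below):

import gcld3
import spacy

detector = gcld3.NNetLanguageIdentifier(min_num_bytes=0, max_num_bytes=1000)
ner_model = spacy.load("en_core_web_sm")  # stand-in for your trained NER model


def gated_ner(text):
    # preprocessing: detect the input language first
    lang = detector.FindLanguage(text=text).language
    if lang != "en":
        # bypass the NER model and notify the caller
        return {"error": "only English is currently supported",
                "language_detected": lang}
    # English input: run the NER model as usual
    doc = ner_model(text)
    return {"entities": [(ent.text, ent.label_) for ent in doc.ents],
            "language_detected": lang}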

A byproduct of adding this preprocessing step is that you can also measure the distribution of input languages (for example, how many inputs are English vs. non-English, and what the top three non-English languages are). This will allow you to prioritize developing a new multilingual NER model if non-English inputs make up a significant share of the traffic.

A common postprocessing step is reformatting the model output and adding metadata (including the detected language, the model version URI, etc.). This allows the caller of the model inference service to better process the output with good provenance tracking, knowing which version of the model was invoked.
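
For illustration, a postprocessed response could look something like the following (the values are made up, and the field names follow the implementation shown later in this post):

example_response = {
    'response': {
        'prediction_entities': [('London', 'GPE')],
        'prediction_tokens': [('I', '', 2), ('love', '', 2), ('London', 'GPE', 3)],
    },
    'metadata': {
        'language_detected': 'en',
    },
    'model_metadata': {
        'trained_model_uri': 'runs:/<run_id>/model',
        'inference_pipeline_model_uri': 'runs:/<run_id>/inference_pipeline_model',
    },
}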

Now that we know these preprocessing and postprocessing steps, let’s see how MLflow can help us to build this new inference pipeline containing these two additional steps illustrated below:

Figure 1: three-step inference pipeline

If you haven’t used MLflow before, it is time to check it out and see how it works for tracking code, parameters, metrics, model versions, and more. Here, I want to show you a powerful but less talked about mechanism that MLflow provides: creating an inference pipeline as a new MLflow model after you already have a trained model. In our current example, we want to create a new inference pipeline that has the three steps shown in Figure 1 and log it as a new MLflow model that can be loaded directly for inference. Encapsulating the inference pipeline as a model lets us treat the pipeline as a model artifact, which allows us to reuse the same MLflow model API to load and deploy it as if it were a single-step model.

Let’s follow three steps below to create and deploy the inference pipeline as an MLflow model artifact:

Step 1: Log the trained NER model as an MLflow model

In this blog post, I am going to reuse (with some modifications) the spaCy NER model provided in the official MLflow GitHub repo example: https://github.com/mlflow/mlflow/tree/master/examples/spacy

The modification is mostly a bug fix to make sure the code works with the latest spaCy 3.x release. The updated code is posted here:
https://github.com/yliu2018/mlflow-examples/blob/main/spacy/train.py

If you have an MLflow tracking server (either in Databricks or your local environment), you should have a logged NER model in the MLflow model registry after you run the above spaCy training code (see Figure 2, a screenshot of the model artifact in the MLflow tracking server).
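
The heart of Step 1 is a single mlflow.spacy.log_model call inside an MLflow run. Here is a minimal sketch (the blank pipeline is only a stand-in for the trained NER model produced by train.py):

import mlflow
import mlflow.spacy
import spacy

nlp = spacy.blank("en")  # stand-in for the trained spaCy NER pipeline

with mlflow.start_run() as run:
    # serialize the spaCy pipeline as an MLflow model under artifact path "model"
    mlflow.spacy.log_model(spacy_model=nlp, artifact_path="model")
    print("NER model logged under run:", run.info.run_id)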

Figure 2: A spaCy NER model logged as an MLflow model

Step 2: Use MLflow’s mlflow.pyfunc.PythonModel API to create a new inference pipeline model

By subclassing mlflow.pyfunc.PythonModel, we can create a new inference pipeline model that encapsulates a preprocessing step (language detection), the trained NER model, and a postprocessing step (adding model metadata and the detected language to the NER extraction output), all in one serialized model that can be loaded in a single call.

The mlflow.pyfunc.PythonModel class has two methods we can use: load_context and predict. In load_context, we can load anything, including models that cannot be serialized with Python's pickle. In our current example, we want to detect languages using a language detector called gcld3 (a Google neural-network language detector that has a Python API but is implemented in C++). This gcld3 model cannot be pickled; however, we can initialize it in load_context. Similarly, we can load the logged spaCy NER model using mlflow.spacy.load_model as follows:

import json

import mlflow.pyfunc
import mlflow.spacy


class InferencePipeline(mlflow.pyfunc.PythonModel):
    def load_context(self, context):
        import gcld3
        # initialize the (non-picklable) gcld3 language detector
        self.detector = gcld3.NNetLanguageIdentifier(min_num_bytes=0,
                                                     max_num_bytes=1000)
        # load the previously logged spaCy NER model
        self.trained_ner_model = mlflow.spacy.load_model(self.trained_model_uri)

load_context is called after the InferencePipeline class's __init__ method, so we can pass in trained_model_uri, which is the URI of the logged NER model:

    def __init__(self, trained_model_uri, inference_pipeline_uri=None):
        self.trained_model_uri = trained_model_uri
        self.inference_pipeline_uri = inference_pipeline_uri

In the same class, we can add a new method called preprocessing_step_lang_detect, which detects the input's language:

    def preprocessing_step_lang_detect(self, row):
        # detect the language of the text in the row's first column
        language_detected = self.detector.FindLanguage(text=row[0])
        if language_detected.language != 'en':
            print("found Non-English language text.")
        return language_detected.language

Then we can put the preprocessing and postprocessing steps together with the NER output in one method that handles a single row of input (the input to the pipeline is a pandas DataFrame, and each row is processed in turn):

    def ner_model(self, row):
        # preprocessing: language detection
        language_detected = self.preprocessing_step_lang_detect(row)
        # model inference
        doc = self.trained_ner_model(row[0])
        pred_entities = [(ent.text, ent.label_) for ent in doc.ents]
        pred_tokens = [(t.text, t.ent_type_, t.ent_iob) for t in doc]
        # postprocessing: add additional metadata
        response = json.dumps({
            'response': {
                'prediction_entities': pred_entities,
                'prediction_tokens': pred_tokens
            },
            'metadata': {
                'language_detected': language_detected,
            },
            'model_metadata': {
                'trained_model_uri': self.trained_model_uri,
                'inference_pipeline_model_uri': self.inference_pipeline_uri
            },
        })
        return response

Finally, we can implement the required predict method as follows:

    def predict(self, context, model_input):
        # apply the three-step pipeline to each row of the input DataFrame
        results = model_input.apply(self.ner_model,
                                    axis=1,
                                    result_type='broadcast')
        return results

Step 3: Use MLflow’s mlflow.pyfunc.log_model to log the inference pipeline as a new model artifact in the main function

To use the newly implemented InferencePipeline class, we need to call mlflow.pyfunc.log_model to log the inference pipeline as a new MLflow model artifact, where the python_model parameter is assigned an instance of InferencePipeline constructed with the trained_model_uri and inference_pipeline_uri defined below:

def task(trained_model_run_id, pipeline_run_name):
    with mlflow.start_run(run_name=pipeline_run_name) as mlrun:
        trained_model_uri = f'runs:/{trained_model_run_id}/model'
        inference_pipeline_uri = f'runs:/{mlrun.info.run_id}/{MODEL_ARTIFACT_PATH}'
        # MODEL_ARTIFACT_PATH, CONDA_ENV, and signature are defined earlier in the full script
        mlflow.pyfunc.log_model(artifact_path=MODEL_ARTIFACT_PATH,
                                conda_env=CONDA_ENV,
                                python_model=InferencePipeline(trained_model_uri,
                                                               inference_pipeline_uri),
                                signature=signature,
                                registered_model_name=MODEL_ARTIFACT_PATH)

        mlflow.log_param("finetuned_model_uri", trained_model_uri)
        mlflow.log_param("inference_pipeline_uri", inference_pipeline_uri)
        mlflow.set_tag('pipeline_step', __file__)


# In the full script, trained_model_run_id and pipeline_run_name are supplied
# as command-line arguments (see the repo link below).
if __name__ == '__main__':
    task()

The complete code for steps 2 and 3 can be found in the GitHub repo: https://github.com/yliu2018/mlflow-examples/blob/main/spacy/inference.py

Once this inference pipeline model is logged and registered, you can load it with the standard mlflow.pyfunc.load_model by passing the inference pipeline URI as the parameter, and then calling predict will execute the whole three-step inference pipeline. See Figure 3 for a screenshot of the logged inference pipeline model in the MLflow tracking server.
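
Here is a minimal sketch of that loading step (the run ID and artifact path in the URI are placeholders; use the inference_pipeline_uri logged in Step 3):

import mlflow.pyfunc
import pandas as pd

# placeholder URI; substitute your own run ID and artifact path
pipeline_uri = 'runs:/<run_id>/<MODEL_ARTIFACT_PATH>'
loaded_pipeline = mlflow.pyfunc.load_model(pipeline_uri)

# a single-column DataFrame; the first column of each row is the input text
input_df = pd.DataFrame(["I love London and Berlin."])
print(loaded_pipeline.predict(input_df))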

Figure 3: Logged inference pipeline model in the MLflow tracking server

If you want a practical guide to using MLflow for implementing deep learning applications end to end, including setting up a full-fledged local MLflow tracking server for development purposes, please take a look at my new book “Practical Deep Learning at Scale with MLflow”!

New posts on building deep learning at scale will be published soon. Stay tuned!
