Building, Tracking and Orchestrating End-to-End Machine Learning Pipelines with MLflow, Hyperopt, and Prefect

Rehab Reda
10 min readJul 12, 2023

--

The steps are the following:

  1. Introduction
  2. Set up the Environment
  3. Configure MLflow
  4. Load and split the data
  5. Train and tune the model
  6. Choose the best model
  7. Promote the best model for production
  8. Serve the best model
  9. Orchestrate the pipeline using Prefect
  10. Make a request to the deployed model

Introduction

Building and deploying machine learning models can be a complex and time-consuming process. However, there are a number of tools that can help to automate and streamline the process. In this blog post, we will discuss how to build, track and orchestrate end-to-end machine learning pipelines using MLflow, Hyperopt, and Prefect.

MLflow is a platform for managing the entire machine learning lifecycle, from experiment tracking to model deployment. Hyperopt is a library for hyperparameter optimization, which can help you to find the best hyperparameters for your machine learning models. Prefect is a workflow management tool that can help you to automate your machine learning pipelines.

By combining these three tools, you can build end-to-end machine learning pipelines that are reproducible, scalable, and easy to maintain. In this blog post, we will walk you through the steps involved in building and deploying a machine learning pipeline using MLflow, Hyperopt, and Prefect.

Set up the Environment

1- Instal libraries, in cmd run

pip install mlflow
pip install hyperopt
pip install xgboost
pip install prefect

2- Lunch mlflow server, in cmd run

mlflow server --backend-store-uri sqlite:///backend.db --default-artifact-root ./mlruns

This command starts an instance of the MLflow server with the following configurations:

  • --backend-store-uri sqlite:///backend.db: specifies the backend store URI where the MLflow server should persist metadata related to experiments, runs, parameters, metrics, and artifacts. In this case, the backend store uses an SQLite database file named backend.db.
  • --default-artifact-root ./mlruns: specifies the default artifact store location where the MLflow server should store artifacts generated by runs. In this case, the default artifact store location is the ./mlruns directory relative to the current working directory.

3- Lunch prefect server, in cmd run

prefect server start

The prefect server start command is used to start the Prefect Server. The Prefect Server is a central daemon that provides a variety of features for managing and executing Prefect flows, including:

  • Flow execution: The Prefect Server can be used to execute flows, both locally and in a distributed fashion.
  • Flow monitoring: The Prefect Server can be used to monitor the execution of flows, providing information such as the status of each task, the logs for each task, and the metrics for each task.
  • Flow scheduling: The Prefect Server can be used to schedule the execution of flows, either on a recurring basis or on demand.
  • Flow versioning: The Prefect Server can be used to version flows, providing a way to track changes to flows over time.

To view the Prefect UI, open a web browser and navigate to http://127.0.0.1:4200/

Prefect UI

Configure MLflow

  • The @task decorator tells Prefect that this function is a task.
  • The mlflow.set_tracking_uri() method connects to the MLflow tracking server.
  • The mlflow.set_experiment() method sets the experiment name.
  • The mlflow.get_experiment_by_name() method gets the experiment ID for the given experiment name.
  • The return value of the task is the experiment ID.

Load and split the data

two Prefect tasks that load and split the MNIST dataset. The first task, load_data(), loads the MNIST dataset using the datasets.load_digits() method from the sklearn.datasets library. The second task, split_data(), splits the dataset into a training set and a test set using the train_test_split() method from the sklearn.model_selection library.

The @task decorator tells Prefect that these functions are tasks. Prefect tasks are used to define the steps in a workflow. In this case, the tasks are loading and splitting the MNIST dataset.

The load_data() task first loads the MNIST dataset using the datasets.load_digits() method. This method returns a dictionary containing the features (x) and labels (y) of the dataset. The task then creates a Pandas DataFrame from the dictionary. The DataFrame has the features in the first n-1 columns and the label in the last column.

The split_data() task splits the DataFrame into a training set and a test set using the train_test_split() method. This method takes the DataFrame, the column containing the labels, the test size, and the random state as input. The method returns the training set, the test set, the training labels, and the test labels.

Train and tune the model

This code defines a Prefect task called train_hyperparameter_tuning that performs hyperparameter tuning for an XGBoost classifier on a given dataset.

The task takes as input the training and test sets (x_train, x_test, y_train, y_test) and a model_name parameter, which is the name of the registered model in MLflow.

The hyperparameters to be tuned are defined using the hp module from the hyperopt library. The search_space variable defines the range of values for each hyperparameter.

The objective function defines the objective to be minimized during hyperparameter tuning. It creates an XGBoost classifier with the given hyperparameters, fits the classifier on the training data, and evaluates the model on the test data. It logs the hyperparameters, accuracy, and F1-score of the model using MLflow. It also logs the trained model as an artifact in MLflow.

The fmin function from hyperopt is used to minimize the objective function by searching the hyperparameter space defined in search_space. tpe.suggest is used as the search algorithm, and max_evals is set to 5 to limit the number of evaluations.

The best_result variable contains the hyperparameters that resulted in the best accuracy during hyperparameter tuning. The function returns best_result.

By defining this task as a Prefect task with the @task decorator, it can be integrated into a larger Prefect workflow to perform hyperparameter tuning for an XGBoost classifier.

Specifically, the hyperparameters being tuned are:

  • learning_rate: the learning rate of the XGBoost algorithm
  • max_depth: the maximum depth of each tree in the XGBoost algorithm
  • gamma: the minimum loss reduction required to make a further partition on a leaf node of the tree
  • colsample_bytree: the subsample ratio of columns when constructing each tree
  • reg_alpha: the L1 regularization term on weights
  • reg_lambda: the L2 regularization term on weights
  • seed: the random seed used for the XGBoost algorithm.

Choose the best model

This code defines a Prefect task called get_best_model that retrieves the version and URI of the best performing model based on the accuracy metric from a given MLflow experiment.

The task takes as input the experiment_id for the experiment to search for the best model.

The MlflowClient class from the mlflow library is used to interact with the MLflow tracking server at the specified tracking_uri (in this case, http://127.0.0.1:5000).

The search_runs method is used to search for runs in the specified experiment that are currently active and are sorted by the accuracy metric in descending order. The [0] index is used to select the first run, which represents the best performing model.

The run_id variable is set to the ID of the selected run, and the model_uri variable is set to the URI of the model artifact in MLflow.

The search_model_versions method is used to search for the version of the model in the selected run. The version number is stored in the model_version variable.

The task returns a tuple containing the model_version and model_uri.

Promote the best model to production

This code defines a Prefect task called promote_best_model that promotes the specified model version to the "Production" stage in MLflow.

The task takes as input the model_version to promote and the model_name of the registered model in MLflow.

The MlflowClient class from the mlflow library is used to interact with the MLflow tracking server at the specified tracking_uri (in this case, http://127.0.0.1:5000).

The transition_model_version_stage method is used to transition the specified model_version of the registered model model_name to the "Production" stage. The archive_existing_versions parameter is set to False, which means that previous versions of the model in the "Production" stage will not be archived.

Serve the best model

This code defines a Prefect task called serve_model that deploys the specified MLflow model to a local server for inference.

The task takes as input the model_uri of the registered model in MLflow to serve.

The os.system function from the os library is used to execute shell commands.

The first command killport 8080 is used to terminate any running server on port 8080.

The second command os.chdir("D:\DEV\Prefect") sets the current working directory to a specific path on the local machine.

The third command sets the MLFLOW_TRACKING_URI environment variable to the address of the MLflow tracking server.

The fourth command uses the mlflow command line interface to serve the specified model on port 8080. The --no-conda flag is used to indicate that no conda environment is needed to serve the model.

Orchestrate the pipeline using Prefect

This code defines a Prefect flow called main that performs the following tasks:

  1. Retrieves the ID of the specified experiment_name in MLflow using the mlflow_environment function.
  2. Loads data using the load_data function.
  3. Splits the data into training and test sets using the split_data function.
  4. Performs hyperparameter tuning using the train_hyperparameter_tuning task and the specified model_name.
  5. Retrieves the best performing model in the specified experiment using the get_best_model task.
  6. Promotes the best performing model to the “Production” stage using the promote_best_model task.
  7. Deploys the best performing model to a local server using the serve_model task.

The @flow decorator is used to define the main function as a Prefect flow.

The if __name__ == "__main__": block is used to define the deployment of the Prefect flow using Deployment.build_from_flow. This creates a new deployment of the main flow with the following configuration:

  • Name: "model_training_and_tuning_weekly"
  • Parameters: {'experiment_name':'digits_experiment', 'model_name':'xgboost'}
  • Schedule: At 12:00 AM, only on Thursday, in the "Africa/Cairo" timezone
  • Version: 1
  • Work Queue Name: "ml"

The apply method is used to apply the deployment, which registers the flow and its configuration in the Prefect backend and schedules it to run according to the defined schedule.

Run the following command in cmd

prefect agent start --pool default-agent-pool --work-queue ml

This command starts a Prefect agent to manage the execution of Prefect flows on a specific pool and work queue.

The prefect agent start command starts a new Prefect agent process.

The --pool flag specifies the name of the pool that the agent should use for executing flows. In this case, the pool is named default-agent-pool.

The --work-queue flag specifies the name of the work queue that the agent should use for receiving work. In this case, the work queue is named ml.

Run the following command in cmd

python app.py

app.py script contains a Prefect flow defined with the @flow decorator, the flow will be registered with the Prefect backend and scheduled to run according to its defined schedule.

open a web browser and navigate to http://127.0.0.1:4200/

Weekly deployment of model training and tuning in the deployments tab
Review main flow steps in the Prefect UI
View main flow logs in the Prefect UI

open a web browser and navigate to http://127.0.0.1:5000/

MLflow experiment for digits classification
Deploy the best model to production using MLflow

Make a request to the deployed model

This code defines a dictionary called row that represents a single row of pixel values for a digit image. It then specifies the address of a local server running an ML model for digit classification using the host and port variables.

The url variable is used to specify the endpoint for the model's prediction API, which is http://{host}:{port}/invocations.

The headers variable is a dictionary that specifies the format of the data that will be sent to the prediction API.

The input_data variable is a dictionary that contains the input data to be sent to the prediction API. In this case, it contains a single record of digit image pixel values represented by the row dictionary.

The code then uses the requests library to send a POST request to the prediction API using the url, headers, and input_data variables. The json.dumps function is used to convert the input_data dictionary to a JSON-encoded string that can be sent in the request body.

Finally, the code prints the prediction result returned by the prediction API using the r.text property.

Response to a submitted request from a deployed model

conclusion

To summarize, this blog has demonstrated how to leverage MLflow, Hyperopt, and Prefect to build and orchestrate end-to-end machine learning pipelines. By combining these open-source technologies, data scientists and machine learning engineers can improve the efficiency and reproducibility of their work, and accelerate the development of machine learning applications. The blog has provided an overview of each technology and demonstrated how to integrate them into a single pipeline to solve a machine learning problem. Overall, these technologies offer a powerful toolset for managing the entire machine learning lifecycle from data preparation and model training to deployment.

you can find script on GitHub

For more information on hyperparameter tuning and model serving, you can also read the article “MLflow, Hyperopt, Prefect, Evidently, and Grafana: The Ultimate Guide to Building, Tracking, Orchestrating, and Monitoring Machine Learning Pipelines

--

--