Building, Tracking and Orchestrating End-to-End Machine Learning Pipelines with MLflow, Hyperopt, and Prefect
The steps are the following:
- Introduction
- Set up the Environment
- Configure MLflow
- Load and split the data
- Train and tune the model
- Choose the best model
- Promote the best model to production
- Serve the best model
- Orchestrate the pipeline using Prefect
- Make a request to the deployed model
Introduction
Building and deploying machine learning models can be a complex and time-consuming process. However, there are a number of tools that can help to automate and streamline the process. In this blog post, we will discuss how to build, track and orchestrate end-to-end machine learning pipelines using MLflow, Hyperopt, and Prefect.
MLflow is a platform for managing the entire machine learning lifecycle, from experiment tracking to model deployment. Hyperopt is a library for hyperparameter optimization, which can help you to find the best hyperparameters for your machine learning models. Prefect is a workflow management tool that can help you to automate your machine learning pipelines.
By combining these three tools, you can build end-to-end machine learning pipelines that are reproducible, scalable, and easy to maintain. In this blog post, we will walk you through the steps involved in building and deploying a machine learning pipeline using MLflow, Hyperopt, and Prefect.
Set up the Environment
1- Install the libraries. In cmd, run:
pip install mlflow
pip install hyperopt
pip install xgboost
pip install prefect
2- Launch the MLflow server. In cmd, run:
mlflow server --backend-store-uri sqlite:///backend.db --default-artifact-root ./mlruns
This command starts an instance of the MLflow server with the following configurations:
- --backend-store-uri sqlite:///backend.db: specifies the backend store URI where the MLflow server should persist metadata related to experiments, runs, parameters, metrics, and artifacts. In this case, the backend store uses an SQLite database file named backend.db.
- --default-artifact-root ./mlruns: specifies the default artifact store location where the MLflow server should store artifacts generated by runs. In this case, the default artifact store location is the ./mlruns directory relative to the current working directory.
3- Launch the Prefect server. In cmd, run:
prefect server start
The prefect server start command starts the Prefect Server. The Prefect Server is a central daemon that provides a variety of features for managing and executing Prefect flows, including:
- Flow execution: The Prefect Server can be used to execute flows, both locally and in a distributed fashion.
- Flow monitoring: The Prefect Server can be used to monitor the execution of flows, providing information such as the status of each task, the logs for each task, and the metrics for each task.
- Flow scheduling: The Prefect Server can be used to schedule the execution of flows, either on a recurring basis or on demand.
- Flow versioning: The Prefect Server can be used to version flows, providing a way to track changes to flows over time.
To view the Prefect UI, open a web browser and navigate to http://127.0.0.1:4200/
Configure MLflow
- The @task decorator tells Prefect that this function is a task.
- The mlflow.set_tracking_uri() method connects to the MLflow tracking server.
- The mlflow.set_experiment() method sets the active experiment by name.
- The mlflow.get_experiment_by_name() method retrieves the experiment with the given name, from which the experiment ID is read.
- The return value of the task is the experiment ID.
Load and split the data
This section defines two Prefect tasks that load and split the digits dataset. The first task, load_data(), loads scikit-learn's handwritten digits dataset (a small, MNIST-like set of 8x8 images) using the datasets.load_digits() method from the sklearn.datasets library. The second task, split_data(), splits the dataset into a training set and a test set using the train_test_split() method from the sklearn.model_selection library.
The @task decorator tells Prefect that these functions are tasks. Prefect tasks define the steps in a workflow. In this case, the tasks load and split the digits dataset.
The load_data() task first loads the digits dataset using the datasets.load_digits() method. This method returns a dictionary-like object containing the features (x) and labels (y) of the dataset. The task then creates a Pandas DataFrame from it. The DataFrame has the features in the first n-1 columns and the label in the last column.
The split_data() task splits the DataFrame into a training set and a test set using the train_test_split() method. This method takes the DataFrame, the column containing the labels, the test size, and the random state as input. The method returns the training set, the test set, the training labels, and the test labels.
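A minimal sketch of the two functions follows. They are written as plain functions so that the snippet runs with only pandas and scikit-learn; in the pipeline each would carry the @task decorator. The label column name "target" and the defaults test_size=0.2 and random_state=42 are assumptions, not values stated in the article.

```python
import pandas as pd
from sklearn import datasets
from sklearn.model_selection import train_test_split


def load_data() -> pd.DataFrame:
    """Load the digits dataset into a DataFrame with the label last."""
    digits = datasets.load_digits()
    df = pd.DataFrame(digits.data, columns=digits.feature_names)
    df["target"] = digits.target  # assumed column name; the label is the last column
    return df


def split_data(df: pd.DataFrame, test_size: float = 0.2, random_state: int = 42):
    """Return x_train, x_test, y_train, y_test."""
    x = df.drop(columns="target")
    y = df["target"]
    return train_test_split(x, y, test_size=test_size, random_state=random_state)
```

Fixing random_state keeps the split identical between pipeline runs, which makes metrics logged in different runs comparable.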
Train and tune the model
This code defines a Prefect task called train_hyperparameter_tuning that performs hyperparameter tuning for an XGBoost classifier on a given dataset.
The task takes as input the training and test sets (x_train, x_test, y_train, y_test) and a model_name parameter, which is the name of the registered model in MLflow.
The hyperparameters to be tuned are defined using the hp module from the hyperopt library. The search_space variable defines the range of values for each hyperparameter.
The objective function defines the objective to be minimized during hyperparameter tuning. It creates an XGBoost classifier with the given hyperparameters, fits the classifier on the training data, and evaluates the model on the test data. It logs the hyperparameters, accuracy, and F1-score of the model using MLflow. It also logs the trained model as an artifact in MLflow.
The fmin function from hyperopt is used to minimize the objective function by searching the hyperparameter space defined in search_space. tpe.suggest is used as the search algorithm, and max_evals is set to 5 to limit the number of evaluations.
The best_result variable contains the hyperparameters that resulted in the best accuracy during hyperparameter tuning. The function returns best_result.
By defining this task as a Prefect task with the @task decorator, it can be integrated into a larger Prefect workflow to perform hyperparameter tuning for an XGBoost classifier.
Specifically, the hyperparameters being tuned are:
- learning_rate: the learning rate of the XGBoost algorithm
- max_depth: the maximum depth of each tree in the XGBoost algorithm
- gamma: the minimum loss reduction required to make a further partition on a leaf node of the tree
- colsample_bytree: the subsample ratio of columns when constructing each tree
- reg_alpha: the L1 regularization term on weights
- reg_lambda: the L2 regularization term on weights
- seed: the random seed used for the XGBoost algorithm
Choose the best model
This code defines a Prefect task called get_best_model that retrieves the version and URI of the best performing model based on the accuracy metric from a given MLflow experiment.
The task takes as input the experiment_id of the experiment to search for the best model.
The MlflowClient class from the mlflow library is used to interact with the MLflow tracking server at the specified tracking_uri (in this case, http://127.0.0.1:5000).
The search_runs method is used to search the specified experiment for active (that is, not deleted) runs, sorted by the accuracy metric in descending order. The [0] index is used to select the first run, which represents the best performing model.
The run_id variable is set to the ID of the selected run, and the model_uri variable is set to the URI of the model artifact in MLflow.
The search_model_versions method is used to search for the version of the model in the selected run. The version number is stored in the model_version variable.
The task returns a tuple containing the model_version and model_uri.
Promote the best model to production
This code defines a Prefect task called promote_best_model that promotes the specified model version to the "Production" stage in MLflow.
The task takes as input the model_version to promote and the model_name of the registered model in MLflow.
The MlflowClient class from the mlflow library is used to interact with the MLflow tracking server at the specified tracking_uri (in this case, http://127.0.0.1:5000).
The transition_model_version_stage method is used to transition the specified model_version of the registered model model_name to the "Production" stage. The archive_existing_versions parameter is set to False, which means that previous versions of the model in the "Production" stage will not be archived.
Serve the best model
This code defines a Prefect task called serve_model that deploys the specified MLflow model to a local server for inference.
The task takes as input the model_uri of the registered model in MLflow to serve.
The os.system function from the os library is used to execute shell commands.
The first command, killport 8080, is used to terminate any running server on port 8080.
The second command, os.chdir("D:\DEV\Prefect"), sets the current working directory to a specific path on the local machine.
The third command sets the MLFLOW_TRACKING_URI environment variable to the address of the MLflow tracking server.
The fourth command uses the mlflow command line interface to serve the specified model on port 8080. The --no-conda flag is used to indicate that no conda environment is needed to serve the model.
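The sketch below shows one way to express the serving step. It deliberately swaps os.system for subprocess and passes the tracking URI through the child process environment, and it omits the killport and os.chdir steps, so it is a variation on the approach described above rather than the article's code. The helper build_serve_command is a hypothetical name.

```python
import os
import subprocess


def build_serve_command(model_uri: str, port: int = 8080) -> list:
    """Build the mlflow CLI call that serves a model on the given port."""
    return [
        "mlflow", "models", "serve",
        "-m", model_uri,
        "-p", str(port),
        # Flag used in the article; newer MLflow releases express the
        # same intent as "--env-manager local".
        "--no-conda",
    ]


def serve_model(model_uri: str,
                tracking_uri: str = "http://127.0.0.1:5000",
                port: int = 8080) -> subprocess.Popen:
    """Start the model server in the background and return its process."""
    env = dict(os.environ, MLFLOW_TRACKING_URI=tracking_uri)
    return subprocess.Popen(build_serve_command(model_uri, port), env=env)
```

Returning the Popen handle lets the caller stop the server later with terminate() instead of killing whatever happens to listen on the port.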
Orchestrate the pipeline using Prefect
This code defines a Prefect flow called main that performs the following tasks:
- Retrieves the ID of the specified experiment_name in MLflow using the mlflow_environment function.
- Loads data using the load_data function.
- Splits the data into training and test sets using the split_data function.
- Performs hyperparameter tuning using the train_hyperparameter_tuning task and the specified model_name.
- Retrieves the best performing model in the specified experiment using the get_best_model task.
- Promotes the best performing model to the "Production" stage using the promote_best_model task.
- Deploys the best performing model to a local server using the serve_model task.
The @flow decorator is used to define the main function as a Prefect flow.
The if __name__ == "__main__": block is used to define the deployment of the Prefect flow using Deployment.build_from_flow. This creates a new deployment of the main flow with the following configuration:
- Name: "model_training_and_tuning_weekly"
- Parameters: {'experiment_name':'digits_experiment', 'model_name':'xgboost'}
- Schedule: at 12:00 AM every Thursday, in the "Africa/Cairo" timezone
- Version: 1
- Work Queue Name: "ml"
The apply method is used to apply the deployment, which registers the flow and its configuration in the Prefect backend and schedules it to run according to the defined schedule.
Run the following command in cmd
prefect agent start --pool default-agent-pool --work-queue ml
This command starts a Prefect agent to manage the execution of Prefect flows on a specific pool and work queue.
The prefect agent start command starts a new Prefect agent process.
The --pool flag specifies the name of the pool that the agent should use for executing flows. In this case, the pool is named default-agent-pool.
The --work-queue flag specifies the name of the work queue that the agent should use for receiving work. In this case, the work queue is named ml.
Run the following command in cmd
python app.py
The app.py script contains the Prefect flow defined with the @flow decorator, together with its deployment. Running the script registers the flow with the Prefect backend and schedules it to run according to its defined schedule.
To view the Prefect UI, open a web browser and navigate to http://127.0.0.1:4200/
To view the MLflow UI, open a web browser and navigate to http://127.0.0.1:5000/
Make a request to the deployed model
This code defines a dictionary called row that represents a single row of pixel values for a digit image. It then specifies the address of a local server running an ML model for digit classification using the host and port variables.
The url variable is used to specify the endpoint for the model's prediction API, which is http://{host}:{port}/invocations.
The headers variable is a dictionary that specifies the format of the data that will be sent to the prediction API.
The input_data variable is a dictionary that contains the input data to be sent to the prediction API. In this case, it contains a single record of digit image pixel values represented by the row dictionary.
The code then uses the requests library to send a POST request to the prediction API using the url, headers, and input_data variables. The json.dumps function is used to convert the input_data dictionary to a JSON-encoded string that can be sent in the request body.
Finally, the code prints the prediction result returned by the prediction API using the r.text property.
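A minimal sketch of such a request is given below. It assumes the "dataframe_records" payload format accepted by MLflow 2.x scoring servers, column names of the form pixel_0_0 to pixel_7_7 (the feature names scikit-learn gives the digits dataset), and an all-zero placeholder image. The helper names build_body and predict are hypothetical.

```python
import json

import requests

host = "127.0.0.1"
port = 8080
url = f"http://{host}:{port}/invocations"
headers = {"Content-Type": "application/json"}

# Placeholder 8x8 image with every pixel set to 0.0 (illustrative only).
row = {f"pixel_{i}_{j}": 0.0 for i in range(8) for j in range(8)}


def build_body(record: dict) -> str:
    """Wrap one record in the payload format assumed for MLflow 2.x."""
    input_data = {"dataframe_records": [record]}
    return json.dumps(input_data)


def predict(record: dict) -> str:
    """POST one record to the model server and return the raw response."""
    r = requests.post(url, headers=headers, data=build_body(record), timeout=10)
    r.raise_for_status()
    return r.text
```

With the model server running, print(predict(row)) shows the predicted digit for the placeholder image.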
Conclusion
To summarize, this blog has demonstrated how to leverage MLflow, Hyperopt, and Prefect to build and orchestrate end-to-end machine learning pipelines. By combining these open-source technologies, data scientists and machine learning engineers can improve the efficiency and reproducibility of their work, and accelerate the development of machine learning applications. The blog has provided an overview of each technology and demonstrated how to integrate them into a single pipeline to solve a machine learning problem. Overall, these technologies offer a powerful toolset for managing the entire machine learning lifecycle from data preparation and model training to deployment.
You can find the script on GitHub.
For more information on hyperparameter tuning and model serving, you can also read the article “MLflow, Hyperopt, Prefect, Evidently, and Grafana: The Ultimate Guide to Building, Tracking, Orchestrating, and Monitoring Machine Learning Pipelines”