Machine Learning Operations (MLOps) Pipeline using Google Cloud Composer

By Engineering@ZenOfAI
Published in ZenOf.AI, Jul 29, 2019 (10 min read)

In an earlier post, we described the need for automating the Data Engineering pipeline for Machine Learning based systems. Today, we will expand the scope and set up a fully automated MLOps pipeline using Google Cloud Composer.

Cloud Composer

Cloud Composer is officially defined as a fully managed workflow orchestration service that empowers you to author, schedule, and monitor pipelines that span across clouds and on-premises data centers. Built on the popular Apache Airflow open source project and operated using the Python programming language, Cloud Composer is free from lock-in and easy to use.

So let’s go through the steps required to create this MLOps infrastructure on Google Cloud Platform.

Creating a Cloud Composer Environment

Step1: Please enable the Cloud Composer API.

Step2: Go to the Create environment page in the GCP console. Composer is listed in the Big Data section.

Step3: Click on Create to start creating a Composer environment.

Step4: Please select a service account that has the required permissions to access GCS, BigQuery, ML Engine and the Composer environment. The roles required for accessing the Composer environment are Composer Administrator and Composer Worker.
For more details about access control in a Composer environment, please see the Cloud Composer access control documentation.

Step5: Please use Python version 3 and the latest image version.

Step6: Click on create. It will take about 15–20 minutes to create the environment. Once it completes, the environment page shall look like the following.

Click on Airflow to open the Airflow WebUI. The Airflow WebUI looks as follows.

The DAGs folder is where our DAG file is stored. It is simply a folder inside the GCS bucket that the environment creates. To learn more about the concept of a DAG and for a general introduction to Airflow, please refer to this post.

You can see Composer-related logs in Logging.

Step7: Please add the following PyPI packages to the Composer environment.

Click on the created environment, navigate to PyPI packages, and click on Edit to add packages.

The required packages are:

# to read data from MongoDB
pymongo==3.8.0
oauth2client==4.1.3
# to read data from firestore
google-cloud-firestore==1.3.0
firebase-admin==2.17.0
google-api-core==1.13.0

Create an ML model

Step1: Please create a folder structure like the following on your instance.

ml_model
├── setup.py
└── trainer
    ├── __init__.py
    └── train.py

Step2: Please place the following code in the train.py file. It reads the training data from BigQuery, fits a logistic regression model, and uploads the pickled model to a GCS bucket. This model is used to create model versions, as explained a bit later. Please make sure you replace the placeholders below (in caps) with appropriate values for your use case.

from google.cloud import bigquery
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
import numpy as np
from google.cloud import storage
import datetime
import json
import pickle

# Read the training data from BigQuery
client = bigquery.Client()
sql = '''
SELECT *
FROM `<PROJECT_ID>.<DATASET>.<TABLENAME>`
'''
df = client.query(sql).to_dataframe()

# Keep only the columns we need; copy so that the column assignments
# below do not operate on a view of the original frame
df = df[['is_stressed', 'is_engaged', 'status']].copy()
df['is_stressed'] = df['is_stressed'].fillna('n')
df['is_engaged'] = df['is_engaged'].fillna('n')

# Encode the 'y'/'n' flags and the status as 0/1
df['stressed'] = np.where(df['is_stressed'] == 'y', 1, 0)
df['engaged'] = np.where(df['is_engaged'] == 'y', 1, 0)
df['status'] = np.where(df['status'] == 'complete', 1, 0)

feature_cols = ['stressed', 'engaged']
X = df[feature_cols]
y = df.status
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.25, random_state=0)

logreg = LogisticRegression()
logreg.fit(X_train, y_train)

# Serialize the trained model to a local file
pkl_filename = "model.pkl"
with open(pkl_filename, 'wb') as file:
    pickle.dump(logreg, file)

BUCKET_NAME = '<BUCKET_NAME>'

# Upload the model to GCS
bucket = storage.Client().bucket(BUCKET_NAME)
file_path = datetime.datetime.now().strftime('machine_learning/models/%Y%m%d_%H%M%S')
blob = bucket.blob('{}/{}'.format(
    file_path,
    pkl_filename))
blob.upload_from_filename(pkl_filename)

# Record the model folder so that the DAG can pick it up when creating a version
file_location = 'gs://{bucket_name}/{file_path}'.format(bucket_name=BUCKET_NAME, file_path=file_path)
file_config = json.dumps({'file_location': file_location})
COMPOSER_BUCKET = '<GCS-bucket-created-by-composer-environment>'
bucket = storage.Client().bucket(COMPOSER_BUCKET)
blob = bucket.blob('data/file_config.json')
blob.upload_from_string(file_config)
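
The hand-off between training and deployment rests on a small JSON file: train.py writes the timestamped model folder into data/file_config.json, and the DAG later reads file_location from it. The following is a minimal sketch of that round trip without any GCS calls. The helper names (build_model_location, dump_file_config, load_export_uri) and the bucket name my-bucket are hypothetical and used only for illustration.

```python
import datetime
import json


def build_model_location(bucket_name, now):
    """Return the folder path inside the bucket and its gs:// URI."""
    file_path = now.strftime('machine_learning/models/%Y%m%d_%H%M%S')
    file_location = 'gs://{}/{}'.format(bucket_name, file_path)
    return file_path, file_location


def dump_file_config(file_location):
    """Serialize the location the same way train.py does."""
    return json.dumps({'file_location': file_location})


def load_export_uri(file_config_text):
    """Read the location back, as the DAG does before creating a version."""
    return json.loads(file_config_text)['file_location']


# 'my-bucket' and the fixed timestamp are illustrative assumptions.
run_time = datetime.datetime(2019, 7, 29, 10, 30, 0)
file_path, file_location = build_model_location('my-bucket', run_time)
export_uri = load_export_uri(dump_file_config(file_location))
print(export_uri)  # gs://my-bucket/machine_learning/models/20190729_103000
```

Because the URI points at the folder rather than at model.pkl itself, it can be passed as the deployment location of a model version.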

Step3: Create an empty __init__.py file inside the trainer directory.

Step4: Please place the following code in the setup.py file. The setup.py file lists the packages required to run the training code.

import setuptools

REQUIRED_PACKAGES = [
    'pandas-gbq==0.3.0',
    'cloudml-hypertune',
    'google-cloud-bigquery==1.14.0',
    'urllib3'
]

setuptools.setup(
    name='ml_model',
    version='1.0',
    install_requires=REQUIRED_PACKAGES,
    packages=setuptools.find_packages(),
    include_package_data=True,
    description='',
)

Step5: Package the code using the following command, run from the ml_model directory. It creates a .tar.gz file in the dist folder inside the ml_model directory.

python3 setup.py sdist

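Step6: The package file name is derived from the name and version specified in the setup.py file, so here it becomes ml_model-1.0.tar.gz.
Copy the package to gs://{your-GCS-bucket}/machine_learning/. This becomes the base directory for the machine learning activities described in this post. Note that the DAG shown later builds the package path as the base directory followed by packages/ and the package name, so the package has to be reachable at that location.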
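
To make the expected location concrete, here is a small sketch of how the package file name and the URI used by the DAG fit together. It mirrors the os.path.join(BASE_DIR, 'packages', PACKAGE_NAME) expression from the DAG file, using posixpath so that the result is the same on any operating system. The helper names and the bucket my-bucket are hypothetical.

```python
import posixpath


def sdist_filename(name, version):
    """File name produced by 'python3 setup.py sdist' for this name and version."""
    return '{}-{}.tar.gz'.format(name, version)


def trainer_package_uri(base_dir, package_name):
    """Location where the DAG expects to find the training package."""
    return posixpath.join(base_dir, 'packages', package_name)


package_name = sdist_filename('ml_model', '1.0')
# 'my-bucket' is an illustrative assumption, not a value from this post.
package_uri = trainer_package_uri('gs://my-bucket/machine_learning/', package_name)
print(package_uri)
# gs://my-bucket/machine_learning/packages/ml_model-1.0.tar.gz
```

The join behaves the same whether or not the base directory ends with a slash, so either form can be used for BASE_DIR.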

Creating a DAG

In this use case, the DAG exports table data from a MongoDB instance into a GCS bucket and then creates a BigQuery table from that exported data. It then trains a model and creates a version of that model, as shown in the code below. The DAG supports both full and daily data extraction, controlled by a variable named tot_data. This variable is read from an Airflow Variable set by the user. The process to set this configuration is described later in this post.
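
The switch between full and daily extraction comes down to which MongoDB filter is used. The sketch below isolates that decision in a hypothetical helper, build_export_query, so it can be checked without a database. It assumes, as the DAG does, that edit_datetime is stored as an ISO-8601 string, so a string range comparison selects yesterday's records.

```python
from datetime import datetime, timedelta


def build_export_query(tot_data, now):
    """Return the MongoDB filter for a full or a daily export.

    tot_data == 'no'  -> only records edited yesterday (daily extraction)
    anything else     -> empty filter, i.e. all records (full extraction)
    """
    if tot_data.lower() == 'no':
        today = str(now.date()) + 'T00:00:00.000Z'
        yesterday = str(now.date() - timedelta(days=1)) + 'T00:00:00.000Z'
        return {'edit_datetime': {'$gte': yesterday, '$lt': today}}
    return {}


daily_query = build_export_query('no', datetime(2019, 7, 29, 8, 0, 0))
full_query = build_export_query('yes', datetime(2019, 7, 29, 8, 0, 0))
print(daily_query)
print(full_query)
```

If edit_datetime were stored as a native date type instead of a string, the bounds would need to be datetime objects rather than strings.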

Please place the following code in the DAG file.

import airflow
from airflow import DAG
from airflow.models import Variable
from airflow.operators.bash_operator import BashOperator
from datetime import timedelta, datetime
from airflow.operators.python_operator import PythonOperator
import pprint
import json
import re
from pymongo import MongoClient
from google.cloud import storage
from google.cloud.storage import blob
import os
from airflow import models
from mlengine_operator import MLEngineTrainingOperator, MLEngineVersionOperator
ts = datetime.now()
today = str(ts.date()) + 'T00:00:00.000Z'
yester_day = str(ts.date() - timedelta(days = 1)) + 'T00:00:00.000Z'
# %M is minutes (%m would repeat the month)
str_ts = ts.strftime('%Y_%m_%d_%H_%M_%S')

# All pipeline settings come from the "mongo_conf" Airflow Variable
config = Variable.get("mongo_conf", deserialize_json=True)
host = config['host']
db_name = config['db_name']
table_name = config['table_name']
file_prefix = config['file_prefix']
bucket_name = config['bucket_name']
# file_path = file_prefix + '/' + table_name + '.json'
file_path = '{file_prefix}/{table_name}/{table_name}_{str_ts}.json'.format(file_prefix=file_prefix, str_ts=str_ts, table_name=table_name)
file_location = 'gs://' + bucket_name + '/' + file_prefix + '/' + table_name + '/' + table_name + '_*.json'
config['file_location'] = file_location
bq_dataset = config['bq_dataset']
tot_data = config['tot_data'].lower()
BUCKET_NAME = config['ml_configuration']['BUCKET_NAME']
BASE_DIR = config['ml_configuration']['BASE_DIR']
PACKAGE_NAME = config['ml_configuration']['PACKAGE_NAME']
TRAINER_BIN = os.path.join(BASE_DIR, 'packages', PACKAGE_NAME)
TRAINER_MODULE = config['ml_configuration']['TRAINER_MODULE']
RUNTIME_VERSION = config['ml_configuration']['RUNTIME_VERSION']
PROJECT_ID = config['ml_configuration']['PROJECT_ID']
MODEL_NAME = config['ml_configuration']['MODEL_NAME']
MODEL_FILE_BUCKET = config['ml_configuration']['MODEL_FILE_BUCKET']
model_file_loc = config['ml_configuration']['MODEL_FILE_LOCATION']
bucket = storage.Client().bucket(MODEL_FILE_BUCKET)
blob = bucket.get_blob(model_file_loc)
file_config = json.loads(blob.download_as_string().decode("utf-8"))
export_uri = file_config['file_location']


def flatten_json(y):
    # Flatten nested dicts and lists into a single-level dict whose keys
    # are the path components joined with '_'
    out = {}

    def flatten(x, name=''):
        if type(x) is dict:
            for a in x:
                flatten(x[a], name + a + '_')
        elif type(x) is list:
            i = 0
            for a in x:
                flatten(a, name + str(i) + '_')
                i += 1
        else:
            out[name[:-1]] = x

    flatten(y)
    return out


def mongoexport():
    client = storage.Client()
    bucket = client.get_bucket(bucket_name)
    blob = bucket.blob(file_path)
    client = MongoClient(host)
    db = client[db_name]
    tasks = db[table_name]
    pprint.pprint(tasks.count_documents({}))
    # if tot_data is set to 'yes' in airflow configurations, full data
    # is processed.
    if tot_data == 'no':
        query = {"edit_datetime": { "$gte": yester_day, "$lt": today}}
        print(query)
        data = tasks.find(query)
    else:
        data = tasks.find()
    # Serialize each record (non-JSON types such as ObjectId become strings)
    emp_list = []
    for record in data:
        emp_list.append(json.dumps(record, default=str))
    flat_list =[]
    for data in emp_list:
        flat_list.append((flatten_json(json.loads(data))))
    # Write newline-delimited JSON with cleaned, lower-case column names
    data = '\n'.join(json.dumps({re.sub('[^0-9a-zA-Z_ ]+', '', str(k)).lower().replace(' ', '_'): str(v) for k, v in record.items()}) for record in flat_list)
    blob.upload_from_string(data)


default_args = {
    'start_date': airflow.utils.dates.days_ago(0),
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

with DAG('ml_pipeline', schedule_interval=None, default_args=default_args) as dag:
    # priority_weight has type int in Airflow DB, uses the maximum.
    pymongo_export_op = PythonOperator(
        task_id='pymongo_export',
        python_callable=mongoexport,
    )

    # Drop the existing table and reload it from the exported JSON files
    update_bq_table_op = BashOperator(
        task_id='update_bq_table',
        bash_command='''
        bq rm -f {bq_dataset}.{table_name}
        bq load --autodetect --source_format=NEWLINE_DELIMITED_JSON --ignore_unknown_values=True {bq_dataset}.{table_name} {file_location}
        '''.format(bq_dataset=bq_dataset, table_name=table_name, file_location=file_location)
    )

    date_nospecial = '{{ execution_date.strftime("%Y%m%d") }}'
    # %M is minutes (%m would repeat the month)
    date_min_nospecial = '{{ execution_date.strftime("%Y%m%d_%H%M") }}'
    uuid = '{{ macros.uuid.uuid4().hex[:8] }}'

    training_op = MLEngineTrainingOperator(
        task_id='submit_job_for_training',
        project_id=PROJECT_ID,
        job_id='{}_{}_{}'.format(table_name, date_nospecial, uuid),
        package_uris=[os.path.join(TRAINER_BIN)],
        training_python_module=TRAINER_MODULE,
        training_args=[
            '--base-dir={}'.format(BASE_DIR),
            '--event-date={}'.format(date_nospecial),
        ],
        region='us-central1',
        runtime_version=RUNTIME_VERSION,
        python_version='3.5')

    create_version_op = MLEngineVersionOperator(
        task_id='create_version',
        project_id=PROJECT_ID,
        model_name=MODEL_NAME,
        version={
            'name': 'version_{}_{}'.format(date_min_nospecial, uuid),
            'deploymentUri': export_uri,
            'runtimeVersion': RUNTIME_VERSION,
            'pythonVersion': '3.5',
            'framework': 'SCIKIT_LEARN',
        },
        operation='create')

    pymongo_export_op >> update_bq_table_op >> training_op >> create_version_op
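
To see what the export step does to a single MongoDB document before it reaches BigQuery, the flattening and key-cleaning logic can be run on its own. The sketch below restates flatten_json from the DAG and pulls the key clean-up out into a hypothetical helper, clean_key. The sample record is made up for illustration.

```python
import json
import re


def flatten_json(nested):
    """Flatten nested dicts and lists into one level, joining keys with '_'."""
    out = {}

    def flatten(value, prefix=''):
        if isinstance(value, dict):
            for key in value:
                flatten(value[key], prefix + key + '_')
        elif isinstance(value, list):
            for index, item in enumerate(value):
                flatten(item, prefix + str(index) + '_')
        else:
            out[prefix[:-1]] = value

    flatten(nested)
    return out


def clean_key(key):
    """Drop special characters, lower-case, and turn spaces into underscores."""
    return re.sub('[^0-9a-zA-Z_ ]+', '', str(key)).lower().replace(' ', '_')


# Illustrative record, not taken from a real collection.
record = {
    '_id': 'abc',
    'User Info': {'Name': 'Ann', 'tags': ['x', 'y']},
    'is-stressed': 'y',
}
row = {clean_key(k): str(v) for k, v in flatten_json(record).items()}
print(json.dumps(row))
```

For this record the output keys are _id, user_info_name, user_info_tags_0, user_info_tags_1 and isstressed. Note that the hyphen in is-stressed is removed rather than replaced, so source field names should be chosen with the resulting column names in mind.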

Once the file is created, please upload it to the DAGs folder. Please also add the following plugin dependency file, named mlengine_operator.py, to the DAGs folder.
Place the following code in the mlengine_operator.py file.

import re

from apiclient import errors

from airflow.contrib.hooks.gcp_mlengine_hook import MLEngineHook
from airflow.exceptions import AirflowException
from airflow.operators import BaseOperator
from airflow.utils.decorators import apply_defaults
from airflow.utils.log.logging_mixin import LoggingMixin

log = LoggingMixin().log


def _normalize_mlengine_job_id(job_id):
    # Add a prefix when a job_id starts with a digit or a template
    match = re.search(r'\d|\{{2}', job_id)
    if match and match.start() == 0:
        job = 'z_{}'.format(job_id)
    else:
        job = job_id

    # Clean up 'bad' characters except templates
    tracker = 0
    cleansed_job_id = ''
    for m in re.finditer(r'\{{2}.+?\}{2}', job):
        cleansed_job_id += re.sub(r'[^0-9a-zA-Z]+', '_',
                                  job[tracker:m.start()])
        cleansed_job_id += job[m.start():m.end()]
        tracker = m.end()

    # Clean up last substring or the full string if no templates
    cleansed_job_id += re.sub(r'[^0-9a-zA-Z]+', '_', job[tracker:])

    return cleansed_job_id


class MLEngineBatchPredictionOperator(BaseOperator):

    template_fields = [
        '_project_id',
        '_job_id',
        '_region',
        '_input_paths',
        '_output_path',
        '_model_name',
        '_version_name',
        '_uri',
    ]

    @apply_defaults
    def __init__(self,
                 project_id,
                 job_id,
                 region,
                 data_format,
                 input_paths,
                 output_path,
                 model_name=None,
                 version_name=None,
                 uri=None,
                 max_worker_count=None,
                 runtime_version=None,
                 gcp_conn_id='google_cloud_default',
                 delegate_to=None,
                 *args,
                 **kwargs):
        super(MLEngineBatchPredictionOperator, self).__init__(*args, **kwargs)

        self._project_id = project_id
        self._job_id = job_id
        self._region = region
        self._data_format = data_format
        self._input_paths = input_paths
        self._output_path = output_path
        self._model_name = model_name
        self._version_name = version_name
        self._uri = uri
        self._max_worker_count = max_worker_count
        self._runtime_version = runtime_version
        self._gcp_conn_id = gcp_conn_id
        self._delegate_to = delegate_to

        if not self._project_id:
            raise AirflowException('Google Cloud project id is required.')
        if not self._job_id:
            raise AirflowException(
                'An unique job id is required for Google MLEngine prediction '
                'job.')

        if self._uri:
            if self._model_name or self._version_name:
                raise AirflowException('Ambiguous model origin: Both uri and '
                                       'model/version name are provided.')

        if self._version_name and not self._model_name:
            raise AirflowException(
                'Missing model: Batch prediction expects '
                'a model name when a version name is provided.')

        if not (self._uri or self._model_name):
            raise AirflowException(
                'Missing model origin: Batch prediction expects a model, '
                'a model & version combination, or a URI to a savedModel.')

    def execute(self, context):
        job_id = _normalize_mlengine_job_id(self._job_id)
        prediction_request = {
            'jobId': job_id,
            'predictionInput': {
                'dataFormat': self._data_format,
                'inputPaths': self._input_paths,
                'outputPath': self._output_path,
                'region': self._region
            }
        }

        if self._uri:
            prediction_request['predictionInput']['uri'] = self._uri
        elif self._model_name:
            origin_name = 'projects/{}/models/{}'.format(
                self._project_id, self._model_name)
            if not self._version_name:
                prediction_request['predictionInput'][
                    'modelName'] = origin_name
            else:
                prediction_request['predictionInput']['versionName'] = \
                    origin_name + '/versions/{}'.format(self._version_name)

        if self._max_worker_count:
            prediction_request['predictionInput'][
                'maxWorkerCount'] = self._max_worker_count

        if self._runtime_version:
            prediction_request['predictionInput'][
                'runtimeVersion'] = self._runtime_version

        hook = MLEngineHook(self._gcp_conn_id, self._delegate_to)

        # Helper method to check if the existing job's prediction input is the
        # same as the request we get here.
        def check_existing_job(existing_job):
            return existing_job.get('predictionInput', None) == \
                prediction_request['predictionInput']

        try:
            finished_prediction_job = hook.create_job(
                self._project_id, prediction_request, check_existing_job)
        except errors.HttpError:
            raise

        if finished_prediction_job['state'] != 'SUCCEEDED':
            self.log.error('MLEngine batch prediction job failed: {}'.format(
                str(finished_prediction_job)))
            raise RuntimeError(finished_prediction_job['errorMessage'])

        return finished_prediction_job['predictionOutput']


class MLEngineModelOperator(BaseOperator):

    template_fields = [
        '_model',
    ]

    @apply_defaults
    def __init__(self,
                 project_id,
                 model,
                 operation='create',
                 gcp_conn_id='google_cloud_default',
                 delegate_to=None,
                 *args,
                 **kwargs):
        super(MLEngineModelOperator, self).__init__(*args, **kwargs)
        self._project_id = project_id
        self._model = model
        self._operation = operation
        self._gcp_conn_id = gcp_conn_id
        self._delegate_to = delegate_to

    def execute(self, context):
        hook = MLEngineHook(
            gcp_conn_id=self._gcp_conn_id, delegate_to=self._delegate_to)
        if self._operation == 'create':
            return hook.create_model(self._project_id, self._model)
        elif self._operation == 'get':
            return hook.get_model(self._project_id, self._model['name'])
        else:
            raise ValueError('Unknown operation: {}'.format(self._operation))


class MLEngineVersionOperator(BaseOperator):

    template_fields = [
        '_model_name',
        '_version_name',
        '_version',
    ]

    @apply_defaults
    def __init__(self,
                 project_id,
                 model_name,
                 version_name=None,
                 version=None,
                 operation='create',
                 gcp_conn_id='google_cloud_default',
                 delegate_to=None,
                 *args,
                 **kwargs):
        super(MLEngineVersionOperator, self).__init__(*args, **kwargs)
        self._project_id = project_id
        self._model_name = model_name
        self._version_name = version_name
        self._version = version or {}
        self._operation = operation
        self._gcp_conn_id = gcp_conn_id
        self._delegate_to = delegate_to

    def execute(self, context):
        if 'name' not in self._version:
            self._version['name'] = self._version_name

        hook = MLEngineHook(
            gcp_conn_id=self._gcp_conn_id, delegate_to=self._delegate_to)

        if self._operation == 'create':
            assert self._version is not None
            return hook.create_version(self._project_id, self._model_name,
                                       self._version)
        elif self._operation == 'set_default':
            return hook.set_default_version(self._project_id, self._model_name,
                                            self._version['name'])
        elif self._operation == 'list':
            return hook.list_versions(self._project_id, self._model_name)
        elif self._operation == 'delete':
            return hook.delete_version(self._project_id, self._model_name,
                                       self._version['name'])
        else:
            raise ValueError('Unknown operation: {}'.format(self._operation))


class MLEngineTrainingOperator(BaseOperator):

    template_fields = [
        '_project_id',
        '_job_id',
        '_package_uris',
        '_training_python_module',
        '_training_args',
        '_region',
        '_scale_tier',
        '_runtime_version',
        '_python_version',
        '_job_dir'
    ]

    @apply_defaults
    def __init__(self,
                 project_id,
                 job_id,
                 package_uris,
                 training_python_module,
                 training_args,
                 region,
                 scale_tier=None,
                 runtime_version=None,
                 python_version=None,
                 job_dir=None,
                 gcp_conn_id='google_cloud_default',
                 delegate_to=None,
                 mode='PRODUCTION',
                 *args,
                 **kwargs):
        super(MLEngineTrainingOperator, self).__init__(*args, **kwargs)
        self._project_id = project_id
        self._job_id = job_id
        self._package_uris = package_uris
        self._training_python_module = training_python_module
        self._training_args = training_args
        self._region = region
        self._scale_tier = scale_tier
        self._runtime_version = runtime_version
        self._python_version = python_version
        self._job_dir = job_dir
        self._gcp_conn_id = gcp_conn_id
        self._delegate_to = delegate_to
        self._mode = mode

        if not self._project_id:
            raise AirflowException('Google Cloud project id is required.')
        if not self._job_id:
            raise AirflowException(
                'An unique job id is required for Google MLEngine training '
                'job.')
        if not package_uris:
            raise AirflowException(
                'At least one python package is required for MLEngine '
                'Training job.')
        if not training_python_module:
            raise AirflowException(
                'Python module name to run after installing required '
                'packages is required.')
        if not self._region:
            raise AirflowException('Google Compute Engine region is required.')

    def execute(self, context):
        job_id = _normalize_mlengine_job_id(self._job_id)
        training_request = {
            'jobId': job_id,
            'trainingInput': {
                'scaleTier': self._scale_tier,
                'packageUris': self._package_uris,
                'pythonModule': self._training_python_module,
                'region': self._region,
                'args': self._training_args,
            }
        }

        if self._runtime_version:
            training_request['trainingInput']['runtimeVersion'] = self._runtime_version

        if self._python_version:
            training_request['trainingInput']['pythonVersion'] = self._python_version

        if self._job_dir:
            training_request['trainingInput']['jobDir'] = self._job_dir

        if self._mode == 'DRY_RUN':
            self.log.info('In dry_run mode.')
            self.log.info('MLEngine Training job request is: {}'.format(
                training_request))
            return

        hook = MLEngineHook(
            gcp_conn_id=self._gcp_conn_id, delegate_to=self._delegate_to)

        # Helper method to check if the existing job's training input is the
        # same as the request we get here.
        def check_existing_job(existing_job):
            return existing_job.get('trainingInput', None) == \
                training_request['trainingInput']

        try:
            finished_training_job = hook.create_job(
                self._project_id, training_request, check_existing_job)
        except errors.HttpError:
            raise

        if finished_training_job['state'] != 'SUCCEEDED':
            self.log.error('MLEngine training job failed: {}'.format(
                str(finished_training_job)))
            raise RuntimeError(finished_training_job['errorMessage'])
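
Both the training and the batch prediction operators pass their job id through the job id normalization helper before submitting it. The standalone sketch below restates that logic under the name normalize_job_id (a renamed copy for illustration, not an Airflow API) so its effect on a few sample ids can be checked without Airflow installed. The sample ids are made up.

```python
import re


def normalize_job_id(job_id):
    """Prefix ids that start with a digit or a template, and replace runs of
    non-alphanumeric characters with '_' while leaving {{ ... }} templates
    untouched."""
    match = re.search(r'\d|\{{2}', job_id)
    if match and match.start() == 0:
        job = 'z_{}'.format(job_id)
    else:
        job = job_id

    tracker = 0
    cleansed = ''
    for m in re.finditer(r'\{{2}.+?\}{2}', job):
        # Clean the text before the template, then copy the template verbatim
        cleansed += re.sub(r'[^0-9a-zA-Z]+', '_', job[tracker:m.start()])
        cleansed += job[m.start():m.end()]
        tracker = m.end()
    # Clean whatever follows the last template (or the whole id if none)
    cleansed += re.sub(r'[^0-9a-zA-Z]+', '_', job[tracker:])
    return cleansed


print(normalize_job_id('2019-run.1'))                 # z_2019_run_1
print(normalize_job_id('tasks_{{ ds_nodash }}_x-y'))  # tasks_{{ ds_nodash }}_x_y
```

This is why the DAG can build job ids from a table name, a templated date and a templated uuid: the templates survive normalization and are rendered by Airflow at run time.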

Import the variables from the composer_conf.json file into Airflow Variables.
Go to Airflow WebUI → Admin → Variables, then either browse to the file path and import it or configure the variables manually.
Please place the following in composer_conf.json.

{
    "mongo_conf": {
        "host": "mongodb://<instance-internal-ip>:27017",
        "db_name": "DBNAME",
        "table_name": "TABLENAME",
        "file_prefix": "Folder In GCS Bucket",
        "bq_dataset": "BigQuery Dataset",
        "bucket_name": "GCS Bucket",
        "tot_data": "yes",
        "ml_configuration": {
            "BUCKET_NAME": "GCS Bucket",
            "BASE_DIR": "gs://GCS Bucket/machine_learning/",
            "PACKAGE_NAME": "PACKAGE NAME FROM setup.py FILE in ML",
            "TRAINER_MODULE": "trainer.train",
            "RUNTIME_VERSION": "1.13",
            "PROJECT_ID": "GCP Project",
            "MODEL_FILE_BUCKET": "BUCKET CREATED BY Composer Environment",
            "MODEL_FILE_LOCATION": "data/MODEL LOCATION FILE",
            "MODEL_NAME": "MODEL_NAME"
        }
    }
}
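
Since the DAG reads every one of these keys at parse time, a missing key makes the DAG fail to load. A quick local check before importing the file can catch that early. The sketch below is one possible way to do it. The missing_keys helper is hypothetical, the key lists are taken from the lookups in the DAG file above, and the sample configuration is deliberately incomplete.

```python
import json

# Keys the DAG file reads from the "mongo_conf" variable.
TOP_LEVEL_KEYS = ['host', 'db_name', 'table_name', 'file_prefix',
                  'bucket_name', 'bq_dataset', 'tot_data']
ML_KEYS = ['BUCKET_NAME', 'BASE_DIR', 'PACKAGE_NAME', 'TRAINER_MODULE',
           'RUNTIME_VERSION', 'PROJECT_ID', 'MODEL_NAME',
           'MODEL_FILE_BUCKET', 'MODEL_FILE_LOCATION']


def missing_keys(conf_text):
    """Return the keys the DAG expects but the configuration lacks."""
    conf = json.loads(conf_text).get('mongo_conf', {})
    ml_conf = conf.get('ml_configuration', {})
    missing = [key for key in TOP_LEVEL_KEYS if key not in conf]
    missing += ['ml_configuration.' + key
                for key in ML_KEYS if key not in ml_conf]
    return missing


# Illustrative config that omits tot_data and MODEL_NAME on purpose.
sample = json.dumps({
    'mongo_conf': {
        'host': 'mongodb://10.0.0.2:27017',
        'db_name': 'db', 'table_name': 'tasks', 'file_prefix': 'exports',
        'bucket_name': 'my-bucket', 'bq_dataset': 'my_dataset',
        'ml_configuration': {
            'BUCKET_NAME': 'my-bucket',
            'BASE_DIR': 'gs://my-bucket/machine_learning/',
            'PACKAGE_NAME': 'ml_model-1.0.tar.gz',
            'TRAINER_MODULE': 'trainer.train',
            'RUNTIME_VERSION': '1.13',
            'PROJECT_ID': 'my-project',
            'MODEL_FILE_BUCKET': 'composer-bucket',
            'MODEL_FILE_LOCATION': 'data/file_config.json',
        },
    }
})
problems = missing_keys(sample)
print(problems)  # ['tot_data', 'ml_configuration.MODEL_NAME']
```

An empty list means every key the DAG looks up is present; it does not validate the values themselves.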

Please store any configuration or credentials files used by Composer in the data folder of the bucket created by the Composer environment.

After configuring the variables accordingly, you can see the DAG named ml_pipeline in the Airflow WebUI.

Please trigger the DAG from the Airflow WebUI. Once the DAG has run successfully, it looks like the following:

Thanks for reading. We look forward to your comments.

This story is authored by PV Subbareddy. Subbareddy is a Big Data Engineer specializing in Cloud Big Data Services and the Apache Spark ecosystem.

Originally published at http://blog.zenof.ai on July 29, 2019.
