Aggregate Vertex AI model training logs in a BigQuery Table

Nitin Dhir
Google Cloud - Community
5 min read · Feb 1, 2022

Vertex AI brings together the Google Cloud services for building ML under one unified UI and API. In Vertex AI, you can easily train and compare models using AutoML or custom code training, and all your models are stored in one central model repository. These models can then be deployed to the same endpoints on Vertex AI.

If you’re writing your own training code instead of using AutoML, there are multiple ways of doing custom training to consider. There are three types of Vertex AI resources you can create to train — custom jobs, hyperparameter tuning jobs, and training pipelines. Upon execution, these resources generate log entries with resource.type ”ml_job”.

Challenge —

Cloud Logging receives these ML job log entries through the Cloud Logging API, where they pass through the Log Router. The sinks in the Log Router check each log entry against their inclusion filters (severity, resource type) and exclusion filters to determine which destinations, including Cloud Logging buckets, the log entry should be sent to. Sinks control how Cloud Logging routes logs: using sinks, you can route some or all of your logs to supported destinations, and BigQuery is one of them. We can create a sink as shown below and push logs to the provided BigQuery dataset —

Sink details for aggregating log entries directly to BigQuery
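If you prefer to script this instead of using the Cloud Console, a minimal sketch with the google-cloud-logging Python client looks like the following (the project ID, sink name, and dataset are placeholders, and the sink’s writer identity still needs write access on the dataset) —

# pip install google-cloud-logging
from google.cloud import logging

client = logging.Client(project="my-project")  # placeholder project ID

# Route all Vertex AI training job entries (resource.type="ml_job")
# to a BigQuery dataset.
sink = client.sink(
    "ml-job-logs-to-bq",  # placeholder sink name
    filter_='resource.type="ml_job"',
    destination="bigquery.googleapis.com/projects/my-project/datasets/ml_job_logs",
)
sink.create()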

However, a new table is created in the selected BigQuery dataset for every job ID, and it becomes cumbersome to join multiple tables to get an aggregated analysis across similar training jobs, as shown below —

A new table being created for every job id

Solution —

Another supported destination for routing logs is Pub/Sub, which integrates with other services such as Dataflow, Splunk, etc. We can create a sink as shown below to push these log entries to Pub/Sub —

Sink Details for aggregating log entries to Pub/Sub
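The same client-side sketch works here; only the sink destination changes to the Pub/Sub topic (names are again placeholders) —

# pip install google-cloud-logging
from google.cloud import logging

client = logging.Client(project="my-project")  # placeholder project ID

# Route all Vertex AI training job entries (resource.type="ml_job")
# to a Pub/Sub topic consumed by the Dataflow pipeline described below.
sink = client.sink(
    "ml-job-logs-to-pubsub",  # placeholder sink name
    filter_='resource.type="ml_job"',
    destination="pubsub.googleapis.com/projects/my-project/topics/ml-job-logs",
)
sink.create()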

Dataflow is a unified stream and batch data processing service that’s serverless, fast, and cost-effective. It enables fast, simplified streaming data pipeline development with lower data latency. With the Google-provided Dataflow streaming template (Pub/Sub Subscription to BigQuery), we can push log entries to BQ in near real time.

The Pub/Sub Subscription to BigQuery template is a streaming pipeline that reads JSON-formatted messages from a Pub/Sub subscription and writes them to a BigQuery table. You can use the template as a quick solution to move Pub/Sub data to BigQuery. The template reads JSON-formatted messages from Pub/Sub and converts them to BigQuery elements.

The flow below enables us to aggregate logs from all ML training jobs into the same BQ table —

Architecture flow for moving data to BQ table

However, in the log entries created by ML jobs, certain log fields don’t adhere to BQ column naming conventions (BQ column names can only contain ‘_’ or alphanumeric characters, whereas ml_job log fields have names like “ml.googleapis.com/job_state”).

So we have to use a UDF to transform the log field names by changing all special characters to “_” so that they adhere to BQ column naming conventions, and then move the data to the BQ table. UDFs (user-defined functions) let customers extend certain Dataflow templates with custom logic to transform records on the fly.

UDF for transforming log field names into BQ-compatible column names —

/**
 * User-defined function (UDF) to transform events
 * as part of a Dataflow template job.
 *
 * @param {string} inJson input Pub/Sub JSON message (stringified)
 * @return {string} outJson output JSON message (stringified)
 */
function process(inJson) {
  var obj = JSON.parse(inJson);
  Object.keys(obj).forEach(function (key) {
    // Label maps (top-level keys containing "labels") hold entries such as
    // "ml.googleapis.com/job_state" whose names are not valid BigQuery
    // column names, so replace the offending characters with "_".
    if (key.indexOf("labels") !== -1) {
      Object.keys(obj[key]).forEach(function (item) {
        var newItem = item.replace(/[./!,]/g, "_");
        if (newItem !== item) {
          obj[key][newItem] = obj[key][item];
          delete obj[key][item];
        }
      });
    }
  });
  // Forward only entries without a jsonPayload (their shape matches the
  // output table schema); other entries are not returned.
  if (!obj.hasOwnProperty('jsonPayload')) {
    return JSON.stringify(obj);
  }
}

Dataflow job pipeline options, where we have to provide the output table spec, which is the BQ output table (the table’s schema must match the input JSON objects), and the UDF script path —

javascriptTextTransformGcsPath is the GCS path where the UDF script is located
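Instead of filling these parameters in through the console, the same Google-provided template can also be launched programmatically via the Dataflow templates API; a minimal sketch with the Python API client (project, region, subscription, table, and GCS paths are placeholders) —

# pip install google-api-python-client
from googleapiclient.discovery import build

dataflow = build("dataflow", "v1b3")

# Launch the Google-provided "Pub/Sub Subscription to BigQuery" template.
request = dataflow.projects().locations().templates().launch(
    projectId="my-project",     # placeholder
    location="us-central1",     # placeholder
    gcsPath="gs://dataflow-templates/latest/PubSub_Subscription_to_BigQuery",
    body={
        "jobName": "ml-job-logs-to-bq",
        "parameters": {
            # Subscription attached to the Pub/Sub topic the sink writes to
            "inputSubscription": "projects/my-project/subscriptions/ml-job-logs-sub",
            # Output table; its schema must match the (transformed) JSON entries
            "outputTableSpec": "my-project:ml_logs.ml_job_logs",
            # The UDF shown above, uploaded to GCS
            "javascriptTextTransformGcsPath": "gs://my-bucket/udf/transform_ml_job_logs.js",
            "javascriptTextTransformFunctionName": "process",
        },
    },
)
response = request.execute()
print(response["job"]["id"])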

Graph View of the Dataflow Job —

Graph View — Dataflow Job

The above process moves all ML job log entries to a single BQ table, from where they can be analyzed easily. The image below shows how log entries for multiple jobs land in the same table —

Log entries from multiple jobs
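As a quick sanity check on the aggregated table, a query like the one below counts log entries per training job (the table name and the resource.labels.job_id column are assumptions based on the ml_job log entry fields; adjust them to your actual schema) —

# pip install google-cloud-bigquery
from google.cloud import bigquery

client = bigquery.Client(project="my-project")  # placeholder project ID

# Count log entries per training job across the aggregated table.
query = """
    SELECT resource.labels.job_id AS job_id, COUNT(*) AS log_entries
    FROM `my-project.ml_logs.ml_job_logs`
    GROUP BY job_id
    ORDER BY log_entries DESC
"""
for row in client.query(query):
    print(row.job_id, row.log_entries)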

Wrap up —

This process can be used in a generic way to aggregate logs for any resource type or filter when aggregation into a single BQ table doesn’t happen out of the box. You only have to modify the UDF script to handle new log field transformations if required.
