Google Cloud Storage Transfer Service
Custom metrics to monitor transfer jobs using STS notification
You may have used Storage Transfer Service (STS), which provides options to move data between buckets or from other cloud providers. You can schedule recurring or one-time STS jobs for data backup, synchronization, and replication.
It’s a great managed service for various use cases until you start looking into monitoring. The STS UI is not particularly user friendly and is not integrated with Cloud Monitoring, and STS provides only a limited number of metrics out of the box in Cloud Monitoring. Your company probably already has an observability stack in place, so you may be thinking that you could create your own metrics to be ingested into it. You are certainly heading in the right direction, and I am going to show you how STS metrics can be instrumented using STS notifications and exported in Prometheus format for ingestion into any monitoring system that supports it.
How it works
The solution relies on STS notifications, which are Pub/Sub messages sent during the life cycle of a transfer job. The following diagram illustrates the high-level architecture and flow:
- STS sends a notification whenever a job changes status, e.g., from running to complete. Each notification contains detailed information about the job, such as its configuration, status, and error details if the job has failed.
- The metrics collection component processes the received notification messages and records metric data points. The component publishes the metrics through an HTTP endpoint, e.g., /metrics.
- The collection agent (e.g., a sidecar in Kubernetes) scrapes the metrics periodically (the scrape interval, a.k.a. sampling rate) by calling the endpoint. The agent then forwards the metrics to Cloud Monitoring or another observability stack.
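To make the "publish metrics through an endpoint" step concrete, here is a minimal sketch using only the JDK's built-in HTTP server. The real PoC uses the OpenCensus Prometheus exporter instead; the metric name, counter value, and port below are illustrative, not taken from the PoC.

```java
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicLong;

public class MetricsEndpoint {
  // Hypothetical counter, incremented as notifications are processed.
  private static final AtomicLong objectsCopied = new AtomicLong();

  // Render the counter in the Prometheus text exposition format.
  static String render() {
    return "# TYPE sts_objects_copied_total counter\n"
        + "sts_objects_copied_total " + objectsCopied.get() + "\n";
  }

  public static void main(String[] args) throws IOException {
    objectsCopied.addAndGet(42); // pretend a notification was processed
    HttpServer server = HttpServer.create(new InetSocketAddress(9090), 0);
    server.createContext("/metrics", exchange -> {
      byte[] body = render().getBytes(StandardCharsets.UTF_8);
      exchange.sendResponseHeaders(200, body.length);
      try (OutputStream os = exchange.getResponseBody()) {
        os.write(body);
      }
    });
    server.start(); // a collection agent would now scrape http://host:9090/metrics
  }
}
```

A Prometheus-compatible agent pointed at this endpoint would pick up the counter on every scrape; the exporter in the PoC does the same rendering for all registered OpenCensus views.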
PoC
I’ve developed a PoC for the above architecture. The implementation uses the OpenCensus metrics framework and its Prometheus exporter. The PoC requires a simple setup with the following GCP resources:
- Source Bucket — A GCS bucket as source
- Destination Bucket — A GCS bucket as destination
- Pub/Sub Topic — A topic for STS notifications
- Service Account
- STS: The service account created by Google when STS jobs are configured. The account should have read permission on the source bucket and write permission on the destination bucket.
- App: The service account that the example runs as. It is granted permission to manage STS jobs and to receive messages from the STS notification topic.
Check out the GitHub repo for the details on running the PoC. Here are some code snippets:
Metrics Instrumentation:
private static void registerAllViews() {
  // Distribution aggregation for operation latencies.
  Aggregation stsOpsLatencyDistribution =
      Distribution.create(BucketBoundaries.create(STS_OPS_DISTRIBUTION_BUCKETS));
  ViewManager viewManager = getViewManager();
  View[] views =
      new View[] {
        View.create(
            Name.create(METRIC_NAME_OBJECT_DELETED_NUM),
            "The number of deleted objects",
            M_OBJECT_DELETED_NUM,
            Aggregation.Sum.create(),
            STS_OPS_TAGS),
        View.create(
            Name.create(METRIC_NAME_OBJECT_DELETED_BYTES),
            "The number of bytes deleted",
            M_OBJECT_DELETED_BYTES,
            Aggregation.Sum.create(),
            STS_OPS_TAGS),
        View.create(
            Name.create(METRIC_NAME_STS_OPERATION_NUM),
            "The number of STS operations",
            M_STS_OPERATION_NUM,
            Aggregation.Count.create(),
            STS_OPS_TAGS),
        View.create(
            Name.create(METRIC_NAME_STS_OPERATION_LATENCY),
            "The latency of STS operations",
            M_STS_OPERATION_LATENCY_MS,
            stsOpsLatencyDistribution,
            STS_OPS_TAGS)
      };
  for (View view : views) {
    viewManager.registerView(view);
  }
}
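The latency view above uses a Distribution aggregation, which counts how many recorded values fall into each bucket defined by the boundaries. A stdlib-only sketch of that bucketing logic (the boundaries below are hypothetical, not the PoC's actual STS_OPS_DISTRIBUTION_BUCKETS):

```java
import java.util.List;

public class LatencyBuckets {
  // Hypothetical bucket boundaries in milliseconds.
  static final List<Double> BOUNDARIES = List.of(100.0, 500.0, 1000.0, 5000.0);

  // A value lands in the first bucket whose upper boundary exceeds it;
  // values at or above the last boundary go into the overflow bucket.
  static int bucketIndex(double valueMs) {
    for (int i = 0; i < BOUNDARIES.size(); i++) {
      if (valueMs < BOUNDARIES.get(i)) {
        return i;
      }
    }
    return BOUNDARIES.size(); // overflow bucket
  }
}
```

Choosing boundaries that bracket your typical transfer times matters: if most operations fall into the overflow bucket, latency percentiles computed from the distribution become meaningless.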
STS Notification Handling:
public void handleEvent(PubsubMessage message) {
  String messageId = message.getMessageId();
  logger.info(String.format("<MessageId=%s>: %s ", messageId, message.getData().toStringUtf8()));
  ObjectMapper mapper = new ObjectMapper();
  // Ignore notification fields this handler does not use.
  mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
  String operationName = null;
  String jobName = null;
  try {
    JsonNode root = mapper.readTree(message.getData().toStringUtf8());
    String status = root.get("status").asText();
    String projectId = root.get("projectId").asText();
    jobName = root.get("transferJobName").asText();
    operationName = root.get("name").asText();
    String startTime = root.get("startTime").asText();
    String endTime = root.get("endTime").asText();
    double latency =
        Duration.between(Instant.parse(startTime), Instant.parse(endTime)).toMillis();
    JsonNode counters = root.get("counters");
    long objectsCopied = 0L;
    if (counters.has("objectsCopiedToSink")) {
      objectsCopied = counters.get("objectsCopiedToSink").asLong();
    }
    long bytesCopied = 0L;
    if (counters.has("bytesCopiedToSink")) {
      bytesCopied = counters.get("bytesCopiedToSink").asLong();
    }
    String sourceBucket =
        root.get("transferSpec").get("gcsDataSource").get("bucketName").asText();
    String destBucket = root.get("transferSpec").get("gcsDataSink").get("bucketName").asText();
    // Overwriting with random values for demo purposes.
    latency = getRandomDouble(LATENCY_MIN, LATENCY_MAX);
    objectsCopied = getRandomLong(NUM_OBJ_COPIED_MIN, NUM_OBJ_COPIED_MAX);
    bytesCopied = getRandomLong(NUM_BYTES_MIN, NUM_BYTES_MAX);
    generateMetrics(
        sourceBucket, destBucket, projectId, status, latency, objectsCopied, bytesCopied);
  } catch (IOException e) {
    logger.error(
        String.format(
            "Failed to handle STS notification from job=%s operation=%s", jobName, operationName),
        e);
  }
}
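The handler derives the operation latency from the notification's RFC 3339 startTime and endTime fields before (for demo purposes) overwriting it with a random value. That computation in isolation, with made-up timestamps:

```java
import java.time.Duration;
import java.time.Instant;

public class OperationLatency {
  // Compute operation latency in milliseconds from RFC 3339 timestamps,
  // as the handler does with the notification's startTime/endTime fields.
  static double latencyMs(String startTime, String endTime) {
    return Duration.between(Instant.parse(startTime), Instant.parse(endTime)).toMillis();
  }
}
```

For example, latencyMs("2023-01-01T00:00:00Z", "2023-01-01T00:00:01.500Z") yields 1500.0. Instant.parse handles the fractional seconds that STS timestamps typically carry, so no custom date format is needed.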
Conclusion
STS notifications are your best friend if you need to create custom metrics in Cloud Monitoring or integrate them into your own monitoring system. I hope the PoC provides a good reference and starting point for you.