Schedule GCP STS Transfer Job with Cloud Composer

Pradeep Kumar Singh
Google Cloud - Community
Jul 9, 2022 · 2 min read

Storage Transfer Service (STS) can be used to transfer data from on-premises storage systems to GCS buckets. It provides many useful features, such as one-time and scheduled transfers, data integrity checks, and metadata preservation.

Storage Transfer Service currently supports a minimum scheduling frequency of one hour. If transfers need to be scheduled at minute-level frequency, an external scheduler such as Cloud Scheduler or Cloud Composer is required. My colleague Bambang Satrijotomo has written a very nice article on how to schedule an STS job using Cloud Scheduler. Cloud Scheduler is simpler to use, but it does not yet support features like VPC Service Controls. If Cloud Composer is already being used in your environment, the recommendation is to use Cloud Composer.

In this article we will implement a Cloud Composer DAG that runs an STS transfer job. We will schedule this DAG to run every ten minutes.

Prerequisites

  1. You have already deployed a Cloud Composer environment and are able to schedule DAGs in it.
  2. You have an existing, working STS job that needs to run periodically.
  3. Get the service account used by the Composer worker nodes. You can run the gcloud command below to retrieve it:
gcloud composer environments describe <composer_env_name> --location <region> --project <composer_env_project_id> --format json | jq '.config.nodeConfig.serviceAccount'

4. Get the GCS location where Composer stores its DAGs. You can run the gcloud command below to retrieve it:

gcloud composer environments describe <composer_env_name> --location <region> --project <composer_env_project_id> --format json | jq '.config.dagGcsPrefix'

5. Assign the Storage Transfer User role to the Composer worker service account (a quick sanity-check sketch follows the command below):

gcloud projects add-iam-policy-binding <sts_job_project_id> --member='serviceAccount:<composer_worker_sa_email>' --role="roles/storagetransfer.user"
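
Optionally, you can confirm the permissions before touching Airflow. The following is a minimal sketch, not part of the original setup, that simply fetches the transfer job through the Storage Transfer API; it assumes Application Default Credentials resolve to the Composer worker service account and uses placeholder project and job names:

import googleapiclient.discovery

# Placeholders: replace with your own values.
PROJECT_ID = "<sts_job_project_id>"
TSOP_JOB_NAME = "<transfer_job_name>"  # full name, e.g. "transferJobs/<job_id>"

# build() picks up Application Default Credentials when none are passed.
client = googleapiclient.discovery.build(
    "storagetransfer", "v1", cache_discovery=False)

# transferJobs.get requires both the job name and the owning project ID.
job = client.transferJobs().get(
    jobName=TSOP_JOB_NAME, projectId=PROJECT_ID).execute()
print(job["name"], job.get("status"))

If this call succeeds, the service account has enough access to run the job from the DAG below.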

DAG Details

Create a DAG using the code below. Replace the values of the PROJECT_ID and TSOP_JOB_NAME variables according to your environment. I am scheduling the DAG to run every 10 minutes; you can choose this interval according to your requirements.

import logging

from airflow import models, utils as airflow_utils
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
import googleapiclient.discovery
from oauth2client.client import GoogleCredentials

# Replace these placeholder values according to your environment.
PROJECT_ID = "<sts_job_project_id>"
TSOP_JOB_NAME = "<transfer_job_name>"  # full name, e.g. "transferJobs/<job_id>"

DEFAULT_ARGS = {
    'owner': 'airflow',
    'start_date': airflow_utils.dates.days_ago(7),
}


class TSOPJobRunOperator(BaseOperator):
    """Runs the TSOP (Storage Transfer Service) job."""

    @apply_defaults
    def __init__(self, project, job_name, *args, **kwargs):
        self._tsop = None
        self.project = project
        self.job_name = job_name
        super(TSOPJobRunOperator, self).__init__(*args, **kwargs)

    def get_tsop_api_client(self):
        # Build the Storage Transfer API client lazily and cache it.
        if self._tsop is None:
            credentials = GoogleCredentials.get_application_default()
            self._tsop = googleapiclient.discovery.build(
                'storagetransfer', 'v1', cache_discovery=False,
                credentials=credentials)
        return self._tsop

    def execute(self, context):
        logging.info('Running Job %s in project %s',
                     self.job_name, self.project)
        # Trigger a run of the existing transfer job.
        self.get_tsop_api_client().transferJobs().run(
            jobName=self.job_name,
            body={'project_id': self.project}).execute()


with models.DAG(
        "run_tsop_transfer_job",
        default_args=DEFAULT_ARGS,
        schedule_interval='*/10 * * * *',
        tags=["tsop_job"],
        user_defined_macros={"TSOP_JOB_NAME": TSOP_JOB_NAME},
) as dag:
    run_tsop_job = TSOPJobRunOperator(
        project=PROJECT_ID, job_name=TSOP_JOB_NAME,
        task_id='run_tsop_job')

Upload the DAG to the Composer DAGs bucket folder (the dagGcsPrefix location retrieved earlier). To test, trigger the DAG from the Airflow UI; it should finish successfully, and the STS job run can be verified from the Cloud console, or programmatically as sketched below.
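
If you prefer to verify the run outside the console, the sketch below lists recent transfer operations for the job via the same Storage Transfer API. It is an illustrative snippet, not part of the original article, and reuses the placeholder PROJECT_ID and TSOP_JOB_NAME values:

import json

import googleapiclient.discovery

PROJECT_ID = "<sts_job_project_id>"
TSOP_JOB_NAME = "<transfer_job_name>"  # full name, e.g. "transferJobs/<job_id>"

client = googleapiclient.discovery.build(
    "storagetransfer", "v1", cache_discovery=False)

# transferOperations.list takes a JSON-encoded filter naming the project
# and the transfer jobs whose operations should be returned.
response = client.transferOperations().list(
    name="transferOperations",
    filter=json.dumps({"projectId": PROJECT_ID,
                       "jobNames": [TSOP_JOB_NAME]})).execute()

for op in response.get("operations", []):
    meta = op.get("metadata", {})
    print(meta.get("name"), meta.get("status"), meta.get("endTime"))

Each operation's status should show SUCCESS once a scheduled run completes.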

Suggestions and comments are welcome!!! Happy Reading.
