Save Money by shutting down GCE instances using Cloud Composer

Pradeep Kumar Singh
Google Cloud - Community
3 min read · Jul 15, 2022

Most GCP users rely on Compute Engine, which makes it easy to set up VMs of different sizes and flavors on GCP. These resources are billed based on their running hours, so a lot of money can be saved by wisely reducing the hours your GCE instances run.

There are various ways to optimise GCE cost. One of them is shutting down GCE instances when they are not being used. For example, we can shut down all instances in the ‘dev’ environment on weekends, which can save us a lot of money. The best way to do this is to automate it using the tools and services available on GCP. There is a pretty nice guide that uses Cloud Scheduler, Pub/Sub, and Cloud Functions to stop and start instances on a given schedule.

This article focuses on using Cloud Composer to schedule DAGs that stop and start the instances. Choosing between the Cloud Scheduler + Pub/Sub + Cloud Functions combination and Cloud Composer depends on many factors, which we will not discuss in detail here. One factor: if you are already using Cloud Composer in your environment, it makes sense to use it for this automation as well.

Prerequisites

1. You have already deployed a Cloud Composer environment and are able to schedule your DAGs.

2. Get the service account used by the Composer worker nodes. You can run the gcloud command below to fetch it:
gcloud composer environments describe <composer_env_name> --location <region> --project <composer_env_project_id> --format json | jq '.config.nodeConfig.serviceAccount'

3. Assign the Compute Instance Admin role to the Composer worker service account so that the DAGs can stop and start instances. (A quick way to verify the binding is shown just after this list.)

gcloud projects add-iam-policy-binding <compute_instance_project_id> --member='serviceAccount:<composer_worker_sa_email>' --role="roles/compute.instanceAdmin.v1"

4. Get the GCS location where Composer stores its DAGs. You can run the gcloud command below to fetch it; we will upload the DAGs to this bucket.

gcloud composer environments describe <composer_env_name> --location <region> --project <composer_env_project_id> --format json | jq '.config.dagGcsPrefix'
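
If you want to double-check that the binding from step 3 took effect, a filtered view of the project’s IAM policy should list the Composer worker service account. The command below is one way to slice the output, assuming the same placeholders as above:

gcloud projects get-iam-policy <compute_instance_project_id> --flatten="bindings[].members" --filter="bindings.role:roles/compute.instanceAdmin.v1" --format="value(bindings.members)"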

DAG Details

Create a DAG to stop instances using the code below. Replace the PROJECT_ID and ZONE variables’ values according to your environment. This DAG will stop all instances with the label ‘env:dev’ in the given project and zone at 12 AM (UTC, Airflow’s default) every Saturday.

import logging

from airflow import models, utils as airflow_utils
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
from airflow.operators.dummy_operator import DummyOperator
import googleapiclient.discovery
from oauth2client.client import GoogleCredentials

ENVIRONMENT = "dev"
PROJECT_ID = "<project_id>"  # replace with your project ID
ZONE = "<zone>"  # replace with your zone, e.g. us-central1-a

DEFAULT_ARGS = {
    'owner': 'airflow',
    'start_date': airflow_utils.dates.days_ago(7)
}


class StopInstanceOperator(BaseOperator):
    """Stops the virtual machine instances."""

    @apply_defaults
    def __init__(self, project, zone, *args, **kwargs):
        self._compute = None
        self.project = project
        self.zone = zone
        super(StopInstanceOperator, self).__init__(*args, **kwargs)

    def get_compute_api_client(self):
        # Lazily build the Compute Engine API client using the
        # application default credentials of the Composer worker.
        if self._compute is None:
            credentials = GoogleCredentials.get_application_default()
            self._compute = googleapiclient.discovery.build(
                'compute', 'v1', cache_discovery=False,
                credentials=credentials)
        return self._compute

    def list_instances(self):
        # Return the names of all instances in the zone with label env=ENVIRONMENT.
        instance_res = self.get_compute_api_client().instances().list(
            project=self.project, zone=self.zone,
            filter=f"labels.env={ENVIRONMENT}").execute()
        return [instance['name']
                for instance in instance_res.get('items', [])]

    def execute(self, context):
        for instance in self.list_instances():
            logging.info(
                'Stopping instance %s in project %s and zone %s',
                instance, self.project, self.zone)
            self.get_compute_api_client().instances().stop(
                project=self.project, zone=self.zone,
                instance=instance).execute()


with models.DAG(
        "stop_gce_instances",
        default_args=DEFAULT_ARGS,
        schedule_interval='0 0 * * 6',  # 12 AM every Saturday (UTC)
        tags=["stop_gce_instances"],
) as dag:
    begin = DummyOperator(task_id='begin')
    end = DummyOperator(task_id='end')
    stop_gce_instances = StopInstanceOperator(
        project=PROJECT_ID, zone=ZONE, task_id='stop_gce_instances')
    begin >> stop_gce_instances >> end
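
Before waiting for the schedule to fire, it can help to preview which instances the labels.env=dev filter actually matches. The gcloud equivalent of the DAG’s list call (using the same project and zone placeholders) is:

gcloud compute instances list --project <project_id> --zones <zone> --filter="labels.env=dev"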

Create a DAG to start instances using the code below. Replace the PROJECT_ID and ZONE variables’ values according to your environment. This DAG will start all instances with the label ‘env:dev’ in the given project and zone at 5 AM (UTC) every Monday.

import logging

from airflow import models, utils as airflow_utils
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
from airflow.operators.dummy_operator import DummyOperator
import googleapiclient.discovery
from oauth2client.client import GoogleCredentials

ENVIRONMENT = "dev"
PROJECT_ID = "<project_id>"  # replace with your project ID
ZONE = "<zone>"  # replace with your zone, e.g. us-central1-a

DEFAULT_ARGS = {
    'owner': 'airflow',
    'start_date': airflow_utils.dates.days_ago(7)
}


class StartInstanceOperator(BaseOperator):
    """Starts the virtual machine instances."""

    @apply_defaults
    def __init__(self, project, zone, *args, **kwargs):
        self._compute = None
        self.project = project
        self.zone = zone
        super(StartInstanceOperator, self).__init__(*args, **kwargs)

    def get_compute_api_client(self):
        # Lazily build the Compute Engine API client using the
        # application default credentials of the Composer worker.
        if self._compute is None:
            credentials = GoogleCredentials.get_application_default()
            self._compute = googleapiclient.discovery.build(
                'compute', 'v1', cache_discovery=False,
                credentials=credentials)
        return self._compute

    def list_instances(self):
        # Return the names of all instances in the zone with label env=ENVIRONMENT.
        instance_res = self.get_compute_api_client().instances().list(
            project=self.project, zone=self.zone,
            filter=f"labels.env={ENVIRONMENT}").execute()
        return [instance['name']
                for instance in instance_res.get('items', [])]

    def execute(self, context):
        for instance in self.list_instances():
            logging.info(
                'Starting instance %s in project %s and zone %s',
                instance, self.project, self.zone)
            self.get_compute_api_client().instances().start(
                project=self.project, zone=self.zone,
                instance=instance).execute()


with models.DAG(
        "start_gce_instances",
        default_args=DEFAULT_ARGS,
        schedule_interval='0 5 * * 1',  # 5 AM every Monday (UTC)
        tags=["start_gce_instances"],
) as dag:
    begin = DummyOperator(task_id='begin')
    end = DummyOperator(task_id='end')
    start_gce_instances = StartInstanceOperator(
        project=PROJECT_ID, zone=ZONE,
        task_id='start_gce_instances')
    begin >> start_gce_instances >> end

Upload the DAGs to the Composer DAGs bucket (the dagGcsPrefix fetched in the prerequisites). To test, trigger the DAGs from the Airflow UI in Composer; they should finish successfully.
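
For reference, a minimal upload-and-trigger sequence could look like the one below. The file names are just whatever you saved the two DAGs as, and the dags trigger subcommand assumes an Airflow 2 based Composer environment:

gsutil cp stop_gce_instances.py start_gce_instances.py <dag_gcs_prefix>/
gcloud composer environments run <composer_env_name> --location <region> dags trigger -- stop_gce_instances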

I borrowed most of the code from this article with very minor modifications.

Save Money!! Happy Reading!!! Comments and Suggestions are welcome.
