CI/CD and Scheduling for Dataform Pipelines

Moukhtar Ahmed
Google Cloud - Community
Jan 24, 2023

Continuous integration (CI) and continuous delivery (CD) are essential practices for ensuring the quality, maintainability, adaptability, and reliability of any data processing pipeline.

Dataform CI/CD Process

CI/CD flow for Dataform pipelines on GCP:
  • Push a new change to the Dataform project Git repository.
  • The CI pipeline is triggered to execute the compile, test, and dry-run stages:
dataform_compile:
  stage: compile
  image:
    name: dataformco/dataform:latest
    entrypoint: [""]
  script:
    - dataform install
    - dataform compile

dataform_test:
  stage: test
  image:
    name: dataformco/dataform:latest
    entrypoint: [""]
  script:
    - dataform install
    - dataform test

dataform_dry_run:
  stage: dry_run
  image:
    name: dataformco/dataform:latest
    entrypoint: [""]
  script:
    - dataform install
    - dataform run --dry-run
  • Build a Docker image for the Dataform project and push it to the Docker artifacts registry:
FROM dataformco/dataform:latest

# Set working directory
ENV DATAFORM_DIR /dataform/
RUN mkdir $DATAFORM_DIR
WORKDIR $DATAFORM_DIR

# Copy the Dataform project files into the image
COPY . $DATAFORM_DIR

# Install dataform dependencies
RUN dataform install

# Run dataform
ENTRYPOINT ["dataform"]
build_and_push:
  stage: build_and_push
  tags:
    - $GITLAB_RUNNER_TAG
  image:
    name: gcr.io/kaniko-project/executor:debug
    entrypoint: [""]
  script:
    - /kaniko/executor --context $CI_PROJECT_DIR --dockerfile $CI_PROJECT_DIR/Dockerfile --destination $DOCKER_ARTIFACTS_REGISTRY/$PIPELINE_NAME:$ENVIRONMENT --destination $DOCKER_ARTIFACTS_REGISTRY/$PIPELINE_NAME:$ENVIRONMENT-$CI_COMMIT_SHA
  • Deploy the DAG to the Cloud Composer bucket [Pipeline Scheduler]:
deploy_dag:
  tags:
    - $GITLAB_RUNNER_TAG
  extends: .ci-tools-auth
  needs: ["test"]
  stage: deploy_dag
  image: google/cloud-sdk:latest
  script:
    - base64 --decode token.txt > token2.txt
    - gcloud alpha storage cp -R dags/* $DAG_BUCKET/dags --access-token-file=./token2.txt
  • The Composer DAG tasks will run and execute the pipeline code.
  • All tables will be created/updated in BigQuery.

A full reference implementation of the CI pipeline is available using [gitlab-ci].

Scheduling

Cloud Composer is a fully managed workflow orchestration service built on Apache Airflow.

Cloud Composer as the Dataform scheduler:

  • Dataform can be scheduled using a DAG/task in Cloud Composer.
  • Dataform runs on top of a Kubernetes cluster (rather than on a local machine or VM).
  • The Kubernetes cluster can be a standalone cluster or the Cloud Composer GKE cluster. When using the Cloud Composer GKE cluster, each Dataform run executes in a dedicated Kubernetes pod:
def run_dataform_on_kubernetes(namespace, image, job_id, tags, cmd=None, timeout='30s', **context):
    """This function will execute the KubernetesPodOperator as an Airflow task"""
    from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
    KubernetesPodOperator(
        task_id=f'dataform_cli_{cmd}',
        name=f'dataform_cli_{cmd}',
        cmds=["bash", "-c", f"dataform {cmd} --vars=jobId='{job_id}' --tags '{tags}' --timeout {timeout}"],
        namespace=namespace,
        service_account_name='default',
        image_pull_policy='Always',
        image=image,
        get_logs=True,  # Capture logs from the pod
        log_events_on_failure=True,  # Capture and log events in case of pod failure
        is_delete_operator_pod=True,  # To clean up the pod after runs
    ).execute(context)
  • Cloud Composer can manage Airflow variables and environment variables, which can also be consumed by the Dataform pipelines (see the sketch below).
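As an illustration only, the sketch below shows how an Airflow variable and an environment variable could be forwarded from Composer into the Dataform pod; the names dev_citibike_dataform_image and DATAFORM_ENV are hypothetical and stand in for whatever your project uses.

from airflow import models
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator

# Airflow variable managed via the Cloud Composer / Airflow UI (name is hypothetical)
DATAFORM_IMAGE = models.Variable.get("dev_citibike_dataform_image")

# Task shown outside a DAG for brevity
run_dataform = KubernetesPodOperator(
    task_id="dataform_run",
    name="dataform_run",
    namespace="default",
    image=DATAFORM_IMAGE,
    cmds=["bash", "-c", "dataform run"],
    # Environment variables are injected into the Dataform container and can be
    # read by the pipeline at runtime (name and value are hypothetical)
    env_vars={"DATAFORM_ENV": "dev"},
)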

The following example is a DAG implementation for the Citibike Data Vault 2.0 pipeline:

"""CITIBIKE Data Vault Ingestion Pipeline DAG"""

import uuid
from airflow import models
from airflow.utils.dates import days_ago
from airflow.operators.python_operator import PythonOperator

## ENV CONFIG
ENV = "dev"
PIPELINE_NAME = "citibike"

## SOURCE CONFIG
PROJECT_ID = models.Variable.get(f"{ENV}_project_id")

## PIPELINE_CONFIG
IMAGE = models.Variable.get(f"{ENV}_{PIPELINE_NAME}_dataform_image")
K8S_NAMESPACE = models.Variable.get(f"{ENV}_k8s_operator_namespace", "default")
LOAD_ID = str(uuid.uuid4().hex)

PIPELINE_NAME = f"{ENV}_{PIPELINE_NAME}_data_vault_ingestion"

default_args = {
"start_date": days_ago(1)
}


def run_dataform_on_kubernetes(namespace, image, job_id, tags, cmd=None, timeout='30s', **context):
"""This function will execute the KubernetesPodOperator as an Airflow task"""
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
KubernetesPodOperator(
task_id=f'dataform_cli_{cmd}',
name=f'dataform_cli_{cmd}',
cmds=["bash", "-c", f"dataform {cmd} --vars=jobId='{job_id}' --tags '{tags}' --timeout {timeout}"],
namespace=namespace,
service_account_name='default',
image_pull_policy='Always',
image=image,
get_logs=True, # Capture logs from the pod
log_events_on_failure=True, # Capture and log events in case of pod failure
is_delete_operator_pod=True, # To clean up the pod after runs
).execute(context)


with models.DAG(
PIPELINE_NAME,
default_args=default_args,
schedule_interval='@once',
tags=[ENV, PIPELINE_NAME, "raw"]
) as dag:
src_to_stg = PythonOperator(
task_id='from_source_to_stage_zone',
provide_context=True,
python_callable=run_dataform_on_kubernetes,
op_kwargs={
"namespace": K8S_NAMESPACE,
"image": IMAGE,
"load_id": LOAD_ID,
"tags": "stage",
"cmd": "run"
}
)

stg_to_raw = PythonOperator(
task_id='from_stage_zone_to_raw_zone',
provide_context=True,
python_callable=run_dataform_on_kubernetes,
op_kwargs={
"namespace": K8S_NAMESPACE,
"image": IMAGE,
"load_id": LOAD_ID,
"tags": "raw",
"cmd": "run"
}
)

src_to_stg >> stg_to_raw

Logging and monitoring

We can see the Dataform logs for each DAG run in the Cloud Composer UI, which is integrated with Cloud Logging, or in the Airflow UI.

Dataform run log in the Airflow UI
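As a programmatic alternative to the UI, the same task logs can also be queried from Cloud Logging directly. The sketch below is only an assumption-laden example: the project id, DAG id, task id, and the label names (workflow, task-id) depend on your Composer environment and version.

from google.cloud import logging as cloud_logging

client = cloud_logging.Client(project="my-gcp-project")  # hypothetical project id

# Composer forwards Airflow task logs to Cloud Logging. The label names below
# (workflow = DAG id, task-id = Airflow task id) are assumptions to adapt to your setup.
log_filter = (
    'resource.type="cloud_composer_environment" '
    'AND labels.workflow="dev_citibike_data_vault_ingestion" '
    'AND labels."task-id"="from_source_to_stage_zone"'
)

for entry in client.list_entries(filter_=log_filter, order_by=cloud_logging.DESCENDING, page_size=50):
    print(entry.timestamp, entry.payload)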

Backfilling

One of the best features of Airflow is how it handles backfills. This Airflow feature can be used together with Dataform.

Dataform DAG run Example

As with any Airflow DAG, you can rerun any of the tasks or DAG runs to backfill the data. With the correct setup, Dataform can handle the backfill mechanism from Airflow, as sketched below.
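A minimal sketch of such a setup, assuming the Dataform project reads an execution_date compilation variable (via dataform.projectConfig.vars) to scope its incremental loads; the variable name and this helper are illustrative variations of run_dataform_on_kubernetes above, not part of the original pipeline.

def run_dataform_backfill_on_kubernetes(namespace, image, job_id, tags, cmd='run', timeout='30s', **context):
    """Variation of run_dataform_on_kubernetes that forwards Airflow's logical date to Dataform."""
    from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator

    # 'ds' is the logical (execution) date injected into the task context by Airflow;
    # for a backfilled DAG run this is the historical date being re-processed, not "today".
    execution_date = context["ds"]

    KubernetesPodOperator(
        task_id=f'dataform_cli_{cmd}',
        name=f'dataform_cli_{cmd}',
        # The execution_date compilation variable is an assumption: the SQLX code would
        # read it via dataform.projectConfig.vars.execution_date to filter the load window.
        cmds=["bash", "-c",
              f"dataform {cmd} --vars=jobId='{job_id}',execution_date='{execution_date}' "
              f"--tags '{tags}' --timeout {timeout}"],
        namespace=namespace,
        service_account_name='default',
        image_pull_policy='Always',
        image=image,
        get_logs=True,  # Capture logs from the pod
        log_events_on_failure=True,  # Capture and log events in case of pod failure
        is_delete_operator_pod=True,  # To clean up the pod after runs
    ).execute(context)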
