Get started with BigQuery ML & Cloud Composer (Airflow)

Ronnie Gregersen · Jun 21, 2022

In my work as a Data Engineer consultant I often position BigQuery as the best data warehouse solution for clients. There could be several reasons for this (scalability, price, speed, stability, etc.), but one really cool feature to add on top of all the functional requirements a client might have is BigQuery ML.

With BigQuery ML we give data professionals a tool to build an ML model, host it, and receive predictions from it, all via SQL. The model performance is not perfect, but it is useful in low-business-impact use cases and/or to confirm a hypothesis.

A typical tool I more or less always face as a data engineer is Apache Airflow, so it made sense to see how it could fit in with BigQuery ML.

Here I will walk through a small example of how you can include BigQuery ML model retraining in an Airflow DAG.

Let’s get to it!

I assume that you already have a project ready in GCP. Otherwise, read more here: https://cloud.google.com/resource-manager/docs/creating-managing-projects

We of course need data to test this out, so I chose this dataset, which should do the job: https://www.kaggle.com/datasets/mlg-ulb/creditcardfraud

1: Download the dataset
2: Go to https://console.cloud.google.com/storage/browser?referrer=search&cloudshell=false
3: Create a bucket and upload the file. (Choose a name, use the US location and the Standard storage class, and leave the rest as default; a scripted alternative is sketched right after this list.)
4: Take note of the bucket name; we will use it in the DAG code in step 6.
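
If you would rather script steps 2–4 than click through the console, here is a minimal sketch using the google-cloud-storage Python client. The bucket name and local file name are placeholders, not values from this walkthrough; replace them with your own.

# Minimal sketch (assumes the google-cloud-storage package is installed and
# you are authenticated, e.g. via `gcloud auth application-default login`).
# BUCKET_NAME and the local CSV path are placeholders.
from google.cloud import storage

BUCKET_NAME = "my-bqml-demo-bucket"  # placeholder, must be globally unique

client = storage.Client()

# Create the bucket in the US multi-region with the Standard storage class
bucket = client.bucket(BUCKET_NAME)
bucket.storage_class = "STANDARD"
client.create_bucket(bucket, location="US")

# Upload the Kaggle credit card fraud CSV
bucket.blob("creditcard.csv").upload_from_filename("creditcard.csv")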

5: Then we need a Cloud Composer environment. Run the gcloud command below in Cloud Shell; this will take approximately 10 minutes to finish:

gcloud composer environments create mycomposer \
    --location us-central1 \
    --image-version composer-2.0.17-airflow-2.2.5

Now that we have Cloud Composer set up with Airflow 2, we can add a DAG.

6: Open a new file on your local PC. Copy the following code and save it as airflow_dag.py. Remember to change PROJECT to your project ID and BUCKET to the bucket where you placed the CSV file in step 3.

import datetime

from airflow import models
from airflow.operators import bash
from airflow.providers.google.cloud.operators.bigquery import (
    BigQueryInsertJobOperator,
    BigQueryCreateEmptyTableOperator,
    BigQueryCreateEmptyDatasetOperator,
    BigQueryDeleteDatasetOperator,
)
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator

# If you are running Airflow in more than one time zone
# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
# for best practices
YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

PROJECT = "INSERT YOUR PROJECT ID HERE"
BUCKET = "INSERT YOUR BUCKET NAME HERE"

default_args = {
    'owner': 'Composer Example',
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'start_date': YESTERDAY,
}

# BigQuery ML query that (re)trains a boosted tree classifier on the materialized view
createModelQuery = (
    f"""CREATE MODEL `{PROJECT}.initial_load.test_model`
    OPTIONS(MODEL_TYPE='BOOSTED_TREE_CLASSIFIER',
        BOOSTER_TYPE = 'GBTREE',
        NUM_PARALLEL_TREE = 1,
        MAX_ITERATIONS = 50,
        TREE_METHOD = 'HIST',
        EARLY_STOP = FALSE,
        SUBSAMPLE = 0.85,
        INPUT_LABEL_COLS = ['string_field_30'])
    AS SELECT * FROM `{PROJECT}.initial_load.test_materialized_view`;
    """)

with models.DAG(
        'bq_dl',
        catchup=False,
        default_args=default_args,
        schedule_interval=datetime.timedelta(days=1)) as dag:

    # Print the dag_run id to the Airflow logs
    print_dag_run_conf = bash.BashOperator(
        task_id='print_dag_run_conf',
        bash_command='echo {{ dag_run.id }}')

    # Recreate the initial_load dataset from scratch on every run
    delete_dataset = BigQueryDeleteDatasetOperator(
        task_id="delete_dataset",
        dataset_id="initial_load",
        delete_contents=True)

    create_dataset = BigQueryCreateEmptyDatasetOperator(
        task_id="create_dataset",
        dataset_id="initial_load")

    create_table = BigQueryCreateEmptyTableOperator(
        task_id="create_table",
        dataset_id="initial_load",
        table_id="test_table")

    # Load the credit card fraud CSV from the GCS bucket into BigQuery
    load_csv = GCSToBigQueryOperator(
        task_id='load_data',
        bucket=BUCKET,
        source_objects=['*.csv'],
        destination_project_dataset_table=f'{PROJECT}.initial_load.test_table',
        source_format="CSV",
        create_disposition="CREATE_IF_NEEDED",
        write_disposition="WRITE_APPEND",
        autodetect=True)

    # TODO: You could add more preprocessing and/or feature selection to improve the model
    create_materialized_view = BigQueryCreateEmptyTableOperator(
        task_id="create_materialized_view",
        dataset_id="initial_load",
        table_id="test_materialized_view",
        materialized_view={
            "query": f"""
                SELECT
                    string_field_4,
                    string_field_7,
                    string_field_8,
                    string_field_10,
                    string_field_11,
                    string_field_12,
                    string_field_13,
                    string_field_14,
                    string_field_17,
                    string_field_19,
                    string_field_20,
                    string_field_21,
                    string_field_26,
                    string_field_28,
                    string_field_29,
                    string_field_30
                FROM `{PROJECT}.initial_load.test_table`
                WHERE string_field_30 != 'Class'""",
            "enableRefresh": True,
            "refreshIntervalMs": 86400000,
        },
    )

    # Train (or retrain) the BigQuery ML model defined in createModelQuery
    insert_query_job = BigQueryInsertJobOperator(
        task_id="insert_query_job",
        configuration={
            "query": {
                "query": createModelQuery,
                "useLegacySql": False,
            }
        },
        location="US",
    )

    print_dag_run_conf >> delete_dataset >> create_dataset >> create_table >> load_csv >> create_materialized_view >> insert_query_job

7: Now go to https://console.cloud.google.com/composer/environments?cloudshell=false&, choose your Cloud Composer environment, and click “OPEN DAGS FOLDER”. This will open a bucket. Upload airflow_dag.py here.
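
If you prefer not to use the console for this step, here is a minimal sketch using the google-cloud-storage client. The DAGs bucket name below is a hypothetical placeholder; copy the real one from the “OPEN DAGS FOLDER” link of your environment.

# Minimal sketch for uploading the DAG file without the console.
# DAGS_BUCKET is a hypothetical placeholder; copy the real bucket name
# from the "OPEN DAGS FOLDER" link of your Composer environment.
from google.cloud import storage

DAGS_BUCKET = "us-central1-mycomposer-xxxx-bucket"  # placeholder

client = storage.Client()
bucket = client.bucket(DAGS_BUCKET)
# Composer picks up DAG files placed under the dags/ prefix
bucket.blob("dags/airflow_dag.py").upload_from_filename("airflow_dag.py")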

8: You can now go back to https://console.cloud.google.com/composer/environments?cloudshell=false& and click on your Cloud Composer environment.

9: Then click “AIRFLOW UI” to open the Airflow UI. After a few minutes you should be able to see the bq_dl DAG.

10: Click “bq_dl” and go to the “Graph” tab to get a nice graph view of the pipeline.

11: You can now click the play button in the upper right corner and choose “Trigger DAG”.

Follow the progress in the “Tree” or “Graph” tab. It should take approximately 15 minutes to finish.

When the DAG has finished, you can go to https://console.cloud.google.com/bigquery?authuser=0& and find the newly created dataset, table, materialized view, and model.
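
If you want to double-check from code rather than the console, here is a small sketch assuming the google-cloud-bigquery client; the project ID is a placeholder.

# Small sketch: list the objects the DAG created in the initial_load dataset.
from google.cloud import bigquery

client = bigquery.Client(project="your-project-id")  # placeholder project ID

for table in client.list_tables("initial_load"):
    print(table.table_id, table.table_type)   # test_table, test_materialized_view
for model in client.list_models("initial_load"):
    print(model.model_id, model.model_type)   # test_model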

We can now review the model’s evaluation metrics in the BigQuery console:

It’s clear that this is not a fantastic model, but keep in mind that we did no preprocessing and only a bit of feature selection, and still got a model performance that could be enough for some use cases. Also, the compute time spent importing the data, training the model, and hosting it was less than 15 minutes.
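
If you want to inspect the model outside the console, here is a minimal sketch assuming the google-cloud-bigquery Python client; the project ID is a placeholder, and the prediction column name follows BigQuery ML’s convention of prefixing the label column with predicted_.

# Minimal sketch: evaluate the trained model and request a few predictions.
# PROJECT is a placeholder; replace it with your project ID.
from google.cloud import bigquery

PROJECT = "your-project-id"  # placeholder
client = bigquery.Client(project=PROJECT)

# Standard BigQuery ML evaluation metrics for the classifier
eval_sql = f"SELECT * FROM ML.EVALUATE(MODEL `{PROJECT}.initial_load.test_model`)"
for row in client.query(eval_sql).result():
    print(dict(row))

# Predictions: BigQuery ML adds a predicted_<label> column, here predicted_string_field_30
predict_sql = f"""
SELECT predicted_string_field_30
FROM ML.PREDICT(MODEL `{PROJECT}.initial_load.test_model`,
                (SELECT * FROM `{PROJECT}.initial_load.test_materialized_view` LIMIT 10))
"""
for row in client.query(predict_sql).result():
    print(dict(row))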

This was a quick walkthrough of the two tools. Adding in Airflow allows us to automate retraining on a schedule, but retraining could also be triggered by other conditions such as data drift or dropping accuracy. Further transformations and configurations, such as partitioning and clustering, could be made to optimize BigQuery.
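
As a sketch of what condition-based retraining could look like in the same DAG (this is not part of the original pipeline; the task id, metric choice, and threshold are assumptions), a ShortCircuitOperator could check the current model’s ROC AUC via ML.EVALUATE and skip the downstream retraining tasks while the model still performs well:

# Hypothetical extension of the DAG above: only retrain when ROC AUC drops
# below an assumed threshold. Requires google-cloud-bigquery in the Composer env.
from airflow.operators.python import ShortCircuitOperator
from google.cloud import bigquery

ROC_AUC_THRESHOLD = 0.95  # assumed threshold, tune for your use case

def _model_needs_retraining():
    client = bigquery.Client(project=PROJECT)
    rows = list(client.query(
        f"SELECT roc_auc FROM ML.EVALUATE(MODEL `{PROJECT}.initial_load.test_model`)"
    ).result())
    # Retrain if there is no model/metrics yet or performance has degraded
    return (not rows) or rows[0].roc_auc < ROC_AUC_THRESHOLD

check_model_quality = ShortCircuitOperator(
    task_id="check_model_quality",
    python_callable=_model_needs_retraining,
)

# e.g. print_dag_run_conf >> check_model_quality >> delete_dataset, so the check
# runs against the existing model before the dataset is recreated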

Vertex AI / Kubeflow is most probably a better fit for this than Airflow; however, Airflow could in some cases be easier to operate if the team is already using it.

I hope this gave you some inspiration to use BigQuery ML, perhaps together with Airflow. Feel free to reach out!

Have a good one ;)
