Building a Stock Data Workflow with Google Cloud Composer, Cloud Storage, and BigQuery

Run Apache Airflow without having to manage the infrastructure

Amarachi Ogu
Feb 9, 2023

Workflow management has become a crucial aspect of modern data engineering, especially with the increasing popularity of cloud computing. A workflow management system allows you to automate and manage complex pipelines, ensuring that data is processed and analyzed efficiently.

Let’s assume you have a data workflow to collect, move, transform, and validate data, and each day, all of these tasks need to be completed concurrently. You must ensure you can schedule, monitor, restart, and even chain different tasks. You can perform all these with Google Cloud Composer, a fully-managed version of Apache Airflow.

For those already familiar with Airflow, Cloud Composer works the same way, but you do not have to worry about managing the infrastructure.

In this project, we will use Cloud Composer to build a workflow that creates a bucket in Google Cloud Storage (GCS), pulls stock data from yfinance into the bucket, moves the data to BigQuery for analysis, and then deletes the GCS bucket.

The next blog covers implementing a CI/CD approach to building this workflow.

Core concepts

Apache Airflow is an open-source platform for programmatically authoring, scheduling, and monitoring workflows. It is used to automate and manage complex data pipelines and is widely adopted by organizations for big data processing and analysis.

A DAG (Directed Acyclic Graph) is a collection of tasks that you want to schedule and run, organized in a way that reflects their relationships and dependencies. DAGs are defined in Python scripts, which describe the DAG structure (tasks and their dependencies) as code.

A Task is the basic unit of execution in Airflow. Tasks are arranged into DAGs, and then have upstream and downstream dependencies set between them to express the order they should run.

An Operator describes a single task. For example, the BashOperator is used to execute a bash command. Check the documentation for the list of available Airflow operators and Google Cloud operators.

XCom (Cross-Communication) is a mechanism that lets Tasks talk to each other or share data.

You can read more about the concepts in the Concepts documentation.

Necessary tools

Cloud Composer — A managed workflow orchestration service that is built on Apache Airflow. It helps you create, schedule, and monitor data pipelines. With Cloud Composer, you can easily automate complex workflows and manage them from a single platform.

Cloud Storage — A managed service for storing data in GCP. It will be used to store our DAG and stock data.

BigQuery — A fully managed and serverless data warehouse service.

Setting Up

Create a BigQuery Dataset

Tables and views are arranged and accessed using datasets, which are top-level containers in BigQuery.

How to create a dataset

The BigQuery command below creates a dataset named stock_dataset in the US multi-region. This is where the stock data will be kept for analysis.

bq --location=US mk -d \
--default_table_expiration 3600 \
--description "This is to store stock data." \
stock_dataset

Define The Workflow

The name of the DAG is Stock_data. Its schedule_interval is "@once", so the DAG runs a single time rather than on a recurring schedule:

with DAG('Stock_data',
         start_date=days_ago(1),
         schedule_interval="@once",
         catchup=False,
         default_args=default_args,
         tags=["gcs", "bq"]
         ) as dag:

Because start_date is set to yesterday with days_ago(1), Cloud Composer schedules the workflow to start immediately after the DAG file is uploaded.
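The snippet above refers to default_args without showing it. Here is a minimal sketch of what such a dictionary might contain. The owner name and the retry settings are assumptions for illustration, not the values used in the original project; each key is applied to every task in the DAG unless a task overrides it.

```python
from datetime import timedelta

# Hypothetical defaults applied to every task in the DAG.
# The values are illustrative assumptions, not taken from the project.
default_args = {
    "owner": "airflow",                   # shown in the Airflow UI
    "depends_on_past": False,             # each run is independent of the previous one
    "retries": 1,                         # retry a failed task once
    "retry_delay": timedelta(minutes=5),  # wait five minutes before retrying
}
```

Keeping these settings in one dictionary means a change such as raising the retry count is made in a single place.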

The DAG defined in this project is made up of five tasks, and each task is defined by an operator.

Task 1 — PythonOperator: The PythonOperator allows you to call a Python function and even pass its parameters.

This task uses the uuid module to generate a random UUID, which is appended to the GCS bucket name as a suffix to make it globally unique.

Note that the prefix the_demo_ is placed before the generated UUID.

generate_uuid = PythonOperator(
    task_id="generate_uuid",
    python_callable=lambda: "the_demo_" + str(uuid.uuid4()),
)
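To see what kind of name this produces, the same expression can be run outside Airflow. The sketch below also applies a simplified check of the main Cloud Storage naming rules. The helper names and the regular expression are assumptions made for this example, and the real rules include a few more restrictions than the pattern covers.

```python
import re
import uuid

BUCKET_PREFIX = "the_demo_"

# Simplified check of the main Cloud Storage naming rules: 3 to 63 characters,
# lowercase letters, digits, dashes, underscores and dots only, and a letter
# or digit at both ends. The full rules contain a few more restrictions.
_BUCKET_NAME_RE = re.compile(r"^[a-z0-9][a-z0-9._-]{1,61}[a-z0-9]$")


def make_bucket_name(prefix: str = BUCKET_PREFIX) -> str:
    """Return the prefix followed by a random version-4 UUID."""
    return prefix + str(uuid.uuid4())


def looks_like_valid_bucket_name(name: str) -> bool:
    """Apply the simplified naming check above."""
    return _BUCKET_NAME_RE.match(name) is not None


name = make_bucket_name()
print(name, len(name), looks_like_valid_bucket_name(name))
```

The string form of a UUID is 36 characters of lowercase hexadecimal digits and hyphens, so the full name is 45 characters long and stays within the length limit.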

Task 2 — GCSCreateBucketOperator: This creates a new bucket in Cloud Storage. Here, we create a bucket with the name generated in Task 1. Airflow's XCom feature is used to pull the bucket name from Task 1 into Task 2.

create_bucket = GCSCreateBucketOperator(
    task_id="create_bucket",
    bucket_name="{{ task_instance.xcom_pull('generate_uuid') }}",
    project_id=PROJECT_ID,
)
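The hand-off between Task 1 and Task 2 can be pictured with a toy model in plain Python. This is only a simplified stand-in written for this post, not Airflow's implementation: the dictionary and the two helper functions are hypothetical. What it mirrors is that a PythonOperator stores the return value of its callable under the key return_value, and that xcom_pull looks the value up by task ID.

```python
import uuid

# Toy stand-in for Airflow's XCom storage: {task_id: {key: value}}.
# Real Airflow keeps these values in its metadata database.
xcom_store = {}


def run_task(task_id, python_callable):
    """Run a callable and store its return value, as PythonOperator does."""
    result = python_callable()
    xcom_store.setdefault(task_id, {})["return_value"] = result
    return result


def xcom_pull(task_id, key="return_value"):
    """Look up a value pushed by an earlier task."""
    return xcom_store[task_id][key]


# Task 1 pushes the bucket name by returning it.
run_task("generate_uuid", lambda: "the_demo_" + str(uuid.uuid4()))

# Task 2 reads the same value, which is what the Jinja template
# "{{ task_instance.xcom_pull('generate_uuid') }}" resolves to at run time.
bucket_name = xcom_pull("generate_uuid")
print(bucket_name)
```

Because the template is rendered only when the task runs, every task that uses it receives the exact name generated in that DAG run.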

Task 3 — PythonOperator: This operator will be used to download stock data from yfinance and upload it to the GCS bucket created in Task 2. The stock tickers used here are Microsoft, Amazon, and Google.

Note: To match a specific pattern in the bucket name, the Python fnmatch module is used to check the bucket name against the prefix the_demo_, and only buckets that match the pattern are included in the filtered list buckets_with_prefix. The first matching bucket is then selected and the data is uploaded to it.

Be sure that there is just one bucket in your project that matches the specific prefix you choose, to avoid errors in the workflow.

# Imports used by get_data (they belong at the top of the DAG file)
import datetime as dt
import fnmatch

import yfinance as yf
from google.cloud import storage


def get_data():
    # Tickers list for data extraction from Yahoo Finance
    tickers = ['MSFT', 'AMZN', 'GOOGL']

    # Set start and end dates
    today = dt.datetime.now()
    start = dt.datetime(2023, 1, 1)
    end = dt.date(today.year, today.month, today.day)

    # API call to download data from Yahoo Finance
    data = yf.download(tickers=tickers, start=start, end=end, interval='1d')['Adj Close']

    # Convert the data to CSV and encode it as bytes
    data = data.to_csv(index=True).encode()

    # Create a storage client
    storage_client = storage.Client()

    # Get a list of all buckets
    buckets = list(storage_client.list_buckets())

    # Filter the list of buckets to only include those with the desired prefix
    buckets_with_prefix = [bucket for bucket in buckets if fnmatch.fnmatch(bucket.name, 'the_demo_*')]

    # Choose the first matching bucket to upload the data to
    bucket = buckets_with_prefix[0]

    # Upload the data to the selected bucket
    blob = bucket.blob('stock_data.csv')
    blob.upload_from_string(data)


pull_stock_data_to_gcs = PythonOperator(
    task_id='pull_stock_data_to_gcs',
    python_callable=get_data,
)
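The prefix filter used inside get_data can be tried on its own. In the sketch below the bucket names are made up for illustration, and the final check is an optional defensive variant that is not part of the original code: it stops the task when zero or several buckets match instead of silently picking the first one.

```python
import fnmatch

# Hypothetical bucket names as they might be returned by list_buckets().
bucket_names = [
    "the_demo_3f2b8c1e-5a4d-4e6f-9b7a-1c2d3e4f5a6b",
    "my-composer-environment-bucket",
    "the_demonstration_archive",
]

# Same pattern as in get_data(): "*" matches any run of characters.
matches = [name for name in bucket_names if fnmatch.fnmatch(name, "the_demo_*")]
print(matches)

# A defensive variant: refuse to guess when the prefix is ambiguous.
if len(matches) != 1:
    raise RuntimeError(f"expected exactly one matching bucket, found {len(matches)}")
target = matches[0]
```

Note that the_demonstration_archive is not selected, because the pattern requires the underscore right after the_demo.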

Task 4 — GCSToBigQueryOperator: Moves data from GCS to BigQuery. This operator will be used to move the stock data uploaded to the GCS bucket in Task 3 to a BigQuery table. The write disposition is set to WRITE_TRUNCATE which will overwrite any existing data in a table.

load_to_bq = GCSToBigQueryOperator(
    task_id='load_to_bq',
    bucket="{{ task_instance.xcom_pull('generate_uuid') }}",
    source_objects=['stock_data.csv'],
    destination_project_dataset_table=f'{PROJECT_ID}:{STAGING_DATASET}.stock_data_table',
    write_disposition='WRITE_TRUNCATE',
    source_format='CSV',
    allow_quoted_newlines=True,
    skip_leading_rows=1,
    schema_fields=[
        {'name': 'Date', 'type': 'DATE', 'mode': 'NULLABLE'},
        {'name': 'AMZN', 'type': 'FLOAT64', 'mode': 'NULLABLE'},
        {'name': 'GOOGL', 'type': 'FLOAT64', 'mode': 'NULLABLE'},
        {'name': 'MSFT', 'type': 'FLOAT64', 'mode': 'NULLABLE'},
    ],
)
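Since skip_leading_rows drops the header row, BigQuery maps CSV columns to the schema by position, so the order of schema_fields has to match the order of the columns in the file. The schema above lists the tickers alphabetically rather than in the order of the tickers list, which suggests the downloaded columns arrive sorted; that is an inference from this post, not something verified here. Below is a small sketch of building the schema from the ticker list and checking it against a CSV header. Both helper functions are hypothetical, and the sample values are placeholders.

```python
import csv
import io


def build_schema(tickers):
    """Build schema_fields: a Date column plus one FLOAT64 column per ticker."""
    fields = [{"name": "Date", "type": "DATE", "mode": "NULLABLE"}]
    for ticker in sorted(tickers):
        fields.append({"name": ticker, "type": "FLOAT64", "mode": "NULLABLE"})
    return fields


def header_matches_schema(csv_text, schema_fields):
    """Compare the first CSV row with the column names in the schema."""
    header = next(csv.reader(io.StringIO(csv_text)))
    return header == [field["name"] for field in schema_fields]


schema = build_schema(["MSFT", "AMZN", "GOOGL"])

# Placeholder rows in the shape the CSV is assumed to have.
sample_csv = "Date,AMZN,GOOGL,MSFT\n2023-01-03,1.0,2.0,3.0\n"
print(header_matches_schema(sample_csv, schema))
```

A check like this could run before the load step, so that a change to the ticker list fails early instead of writing prices into the wrong columns.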

Task 5 — GCSDeleteBucketOperator: This operator will be used to delete the GCS bucket we created in Task 2.

delete_bucket = GCSDeleteBucketOperator(
    task_id="delete_bucket",
    bucket_name="{{ task_instance.xcom_pull('generate_uuid') }}",
)
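The snippets in this post do not show the line that chains the five tasks together; the complete file is in the repo. The order they need to run in can be sketched with graphlib from the Python standard library (Python 3.9 or later), used here as a stand-in for Airflow's own dependency handling. The dependency mapping is written out by hand from the task descriptions above.

```python
from graphlib import TopologicalSorter

# Map each task to the set of tasks that must finish before it starts.
upstream = {
    "generate_uuid": set(),
    "create_bucket": {"generate_uuid"},
    "pull_stock_data_to_gcs": {"create_bucket"},
    "load_to_bq": {"pull_stock_data_to_gcs"},
    "delete_bucket": {"load_to_bq"},
}

# static_order() yields the tasks in an order that respects every dependency.
run_order = list(TopologicalSorter(upstream).static_order())
print(run_order)

# In the DAG file the same chain would typically be written with Airflow's
# bit-shift syntax:
# generate_uuid >> create_bucket >> pull_stock_data_to_gcs >> load_to_bq >> delete_bucket
```

Because the chain is strictly linear, there is only one valid order, and the bucket is deleted only after the load into BigQuery has finished.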

Visit the GitHub repo to see the complete code put together.

Create a Composer Environment

To run workflows in Cloud Composer, you first need to create an environment. Airflow depends on many micro-services to run, so Cloud Composer provisions Google Cloud components to run the workflows. These components are collectively known as a Cloud Composer environment.

Cloud Composer environments are based on Cloud Composer images. When you create an environment, you can select an image with a specific Airflow version.

Cloud Composer supports both Airflow 1 and Airflow 2. It has two different versions: Composer 1 and Composer 2. The major difference is that the Composer 2 version has autoscaling environments while the Composer 1 version has manual scaling.

To learn more about the differences, visit the documentation.

For this project, we will use Composer 1 which is selected by default by the Cloud Composer API. To use Composer 2, you must specify an image version.

Also, unless you specify otherwise, Cloud Composer environments run as the default Compute Engine service account. It is recommended that you set up a user-managed service account for your Cloud Composer environment instead.

Enable the Composer API with the following command:

gcloud services enable composer.googleapis.com

The command below creates a Composer environment named demo-environment in us-central1 and specifies a user-managed service account.

gcloud composer environments create demo-environment \
--location us-central1 \
--image-version composer-1.20.5-airflow-2.3.4 \
--service-account "example-account@example-project.iam.gserviceaccount.com"

To learn more about other possible Composer configurations, visit the documentation.

The Composer environment takes approximately 25 minutes to provision.

Add PyPI Package Dependency

Once the environment has been created, we need to install a dependency. Task 3 of our DAG uses the yfinance API to download the stock data, so the yfinance PyPI package needs to be installed in the Composer environment.

The command below adds yfinance to the Composer environment named demo-environment located in us-central1.

gcloud composer environments update demo-environment \
--location us-central1 \
--update-pypi-package "yfinance>=0.2.1"

This will take a couple of minutes to update.

Upload DAG to Cloud Storage

Cloud Composer uses a Cloud Storage bucket to store DAGs. The environment synchronizes DAGs from this bucket to Airflow components such as Airflow workers and schedulers.

When an environment is created, Cloud Composer creates a Cloud Storage bucket and associates the bucket with that environment. It schedules only the DAGs that are located in the /dags folder in the environment’s Cloud Storage bucket.

Once the environment is ready, go to the Cloud Composer page, where the new environment is listed.

Click the name of the environment (demo-environment) to see its details.

At the top of the page, click on OPEN DAGS FOLDER to open the bucket folder where your DAG Python file will go. You will notice that the airflow_monitoring.py file is already there; it was created automatically by the Composer environment for monitoring.

Click on UPLOAD FILES to upload your DAG Python file.

Back in the Composer tab, at the top of the page, click on OPEN AIRFLOW UI to open the Airflow web interface.

Then click on DAGs, and notice that the DAG has been picked up and has started running because its start date is one day in the past.

Click on the Stock_data DAG, then click on Graph to view details.

Once the tasks have run successfully, navigate to BigQuery and confirm that the table containing the stock data is present.

To understand the Airflow UI better, visit the Airflow UI documentation or the Composer documentation.

Conclusion

There you have it: we have successfully built a workflow using Cloud Composer.

While the approach of dropping your Python file in the Cloud Storage bucket may be simple to implement in the beginning, it is not centralized and presents a high risk of errors.

The next blog will cover how to implement a CI/CD approach to building DAGs in Cloud Composer using Cloud Build, which uses version control to test, synchronize, and deploy DAGs.

Thanks for reading. You are welcome to follow me on LinkedIn or Twitter.
