Apache Beam: Reading from S3 and writing to BigQuery

In this article we look at how to use Apache Beam to extract data from AWS S3 (or Google Cloud Storage), run some aggregations over the data, and store the results in BigQuery. In most use cases I would recommend storing and loading your data directly from Google Cloud Storage, as you avoid paying inter-cloud network egress charges, but this approach can be useful if you are performing a one-off data migration or operating in a multi-cloud environment.

Before we begin, let’s take a brief look at Apache Beam and Cloud Dataflow.

Apache Beam is an open source, unified programming model for defining and running both batch and stream processing pipelines. This allows us to perform a variety of data processing tasks, including:

  • Extract, Transform, and Load tasks
  • Real-time data analysis
  • Data migration and replication

When it comes to running Apache Beam pipelines, Cloud Dataflow is my preference as it handles all of the infrastructure provisioning, horizontal scaling, and work rebalancing for us. This makes it an incredibly performant and cost-efficient environment for running our pipelines.


Running the example

The goal of the example code is to calculate the total amount transferred for each user_id in the transfers_july.csv file shown in Figure 1.

Figure 1: transfers_july.csv located in the root of the GitHub project

This is a purely fictitious example that covers the following steps:

  • Reading a CSV file from AWS S3
  • Converting the CSV file into a Java Object
  • Creating key/value pairs, where user_id is the key and amount is the value
  • Calculating the total amount transferred by each user_id
  • Writing the result to BigQuery
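Before wiring these steps into a Beam pipeline, it can help to see the record-level logic in isolation. The sketch below is plain Java (no Beam dependencies) that mirrors steps 2–4; the Transfer class and the two-column user_id,amount layout are my assumptions for illustration, not the repo's exact code.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch of the per-record logic the pipeline applies.
// Field names (user_id, amount) follow the article; the real Beam
// transforms in the repo may be structured differently.
public class TransferAggregation {

    // Step 2: convert a CSV line into a simple Java object.
    static final class Transfer {
        final String userId;
        final double amount;

        Transfer(String userId, double amount) {
            this.userId = userId;
            this.amount = amount;
        }

        // Assumes a two-column layout: user_id,amount
        static Transfer fromCsvLine(String line) {
            String[] fields = line.split(",");
            return new Transfer(fields[0].trim(), Double.parseDouble(fields[1].trim()));
        }
    }

    // Steps 3 and 4: key each transfer by user_id and sum the amounts,
    // mirroring what a Beam sum-per-key transform produces.
    static Map<String, Double> totalPerUser(List<String> csvLines) {
        Map<String, Double> totals = new HashMap<>();
        for (String line : csvLines) {
            Transfer t = Transfer.fromCsvLine(line);
            totals.merge(t.userId, t.amount, Double::sum);
        }
        return totals;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("user_1,100.50", "user_2,20.00", "user_1,49.50");
        // Prints the total amount transferred per user_id
        System.out.println(totalPerUser(lines));
    }
}
```

In the real pipeline, step 4 is a distributed combine rather than a single HashMap, but the per-element semantics are the same.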

Let’s step through running the Apache Beam pipeline on Cloud Dataflow. N.B. When creating your storage buckets and BigQuery datasets, and when running your Cloud Dataflow job, make sure that you place them all in the same region.

Setting up your environment:

  1. Prerequisites:
  • Create a Google Cloud Platform account and an AWS account if you do not have them already. Make a note of your Google Cloud Platform project name.
  • Install Maven 3, Java 8, and the Git CLI

  2. Clone the code to your machine:

git clone git@github.com:asaharland/beam-pipeline-examples.git

Configuring your Google Cloud Platform project:

  1. Ensure that you have billing enabled for your project
  2. Enable the following Google Cloud Platform APIs: Cloud Dataflow, Compute Engine, Stackdriver Logging, Google Cloud Storage, Google Cloud Storage JSON, BigQuery, Google Cloud Pub/Sub, Google Cloud Datastore, and Google Cloud Resource Manager APIs.
  3. Create a Google Cloud Storage bucket to stage your Cloud Dataflow code. Make sure you note the bucket name as you will need it later.
  4. Create a BigQuery dataset called finance. Keep a note of the fully qualified dataset name, which is in the format projectName:finance.

Running Cloud Dataflow to read from AWS S3:

  1. Create an S3 bucket in your AWS project and make a note of the bucket name.
  2. Upload the file transfers_july.csv to your S3 bucket. This file can be found in the root of the GitHub repo you downloaded.
  3. Download your Access Key and Secret Key from AWS IAM.
  4. Run the following command from the root of the beam-pipeline-examples project, substituting in the values you generated in the previous steps:
mvn compile exec:java \
-Dexec.mainClass=com.harland.example.batch.BigQueryImportPipeline \
-Dexec.args="--project=<GCP PROJECT ID> \
--bucketUrl=s3://<S3 BUCKET NAME> \
--awsRegion=eu-west-1 \
--bqTableName=<BIGQUERY TABLE e.g. project:finance.transactions> \
--awsAccessKey=<YOUR ACCESS KEY> \
--awsSecretKey=<YOUR SECRET KEY> \
--runner=DataflowRunner \
--region=europe-west1 \
--stagingLocation=gs://<DATAFLOW BUCKET>/stage/ \
--tempLocation=gs://<DATAFLOW BUCKET>/temp/"
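For context on what this command actually runs, the wiring inside a pipeline like BigQueryImportPipeline typically looks something like the following simplified sketch. This is not the repo's exact code, transform names and the inline CSV parsing are illustrative, and it will not compile without the Beam SDK (plus its S3 and BigQuery IO modules) on the classpath.

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;
import com.google.api.services.bigquery.model.TableRow;

public class BigQueryImportSketch {
    public static void main(String[] args) {
        // In the real pipeline, options are built from the command-line
        // flags shown above (--bucketUrl, --bqTableName, and so on).
        Pipeline p = Pipeline.create(/* options */);

        p.apply("ReadFromS3", TextIO.read().from("s3://<S3 BUCKET NAME>/transfers_july.csv"))
         // Steps 2 and 3: parse each CSV line into a KV<user_id, amount>
         .apply("ToKV", MapElements
             .into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.doubles()))
             .via(line -> KV.of(line.split(",")[0],
                                Double.parseDouble(line.split(",")[1]))))
         // Step 4: sum the amounts per user_id
         .apply("SumPerUser", Sum.doublesPerKey())
         // Step 5: convert each result into a BigQuery row and write it
         .apply("ToTableRow", MapElements
             .into(TypeDescriptor.of(TableRow.class))
             .via(kv -> new TableRow()
                 .set("user_id", kv.getKey())
                 .set("amount", kv.getValue())))
         .apply("WriteToBigQuery", BigQueryIO.writeTableRows().to("<BIGQUERY TABLE>"));

        p.run();
    }
}
```

Because Beam separates the pipeline definition from the runner, the same wiring works for the Cloud Storage variant later in this article by simply swapping the s3:// input path for a gs:// one.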

Viewing your pipeline

If your pipeline has been deployed successfully, you should see the following graph in Cloud Dataflow:

Cloud Dataflow graph showing our Cloud Dataflow Pipeline

Once your pipeline has finished running, you will be able to view the result in BigQuery:

Total amount transferred by user_id as viewed in BigQuery

And that’s it! Hopefully that was useful, and please leave a comment if you have any questions.

OPTIONAL: Running Cloud Dataflow to read from Cloud Storage:

  1. Create a Cloud Storage bucket in your Google Cloud Platform project.
  2. Upload the file transfers_july.csv to your Cloud Storage bucket. This file can be found in the root of the GitHub repo you downloaded.
  3. Run the following command from the root of the beam-pipeline-examples project. As the Cloud Dataflow workers access Cloud Storage using your project’s service account, you do not need to configure access credentials:
mvn compile exec:java \
-Dexec.mainClass=com.harland.example.batch.BigQueryImportPipeline \
-Dexec.args="--project=<GCP PROJECT ID> \
--bucketUrl=gs://<GCS BUCKET NAME> \
--bqTableName=<BIGQUERY TABLE e.g. project:finance.transactions> \
--runner=DataflowRunner \
--region=europe-west1 \
--stagingLocation=gs://<DATAFLOW BUCKET>/stage/ \
--tempLocation=gs://<DATAFLOW BUCKET>/temp/"