From Pub/Sub to BigQuery: Streaming Data in Near Real Time

Raevat Kumar
Google Cloud - Community
3 min read · Dec 8, 2022

In this post, we’ll use a Dataproc Serverless template to stream data from Pub/Sub to BigQuery.

Cloud Dataproc is the way to go if you want to move your current Hadoop/Spark cluster to the cloud, or if you want to take advantage of the abundance of skilled Hadoop/Spark engineers on the market. And what’s better than an existing template that you can extend to fit your use case?

Before we go deeper into the process, let’s understand how our medium (Dataproc) works:

Dataproc separates compute and storage. Assume an external application sends you logs that you want to analyse, and you save them to a data source. Dataproc reads the data from Cloud Storage (GCS), processes it, and stores the results back in GCS, BigQuery, or Bigtable. Additionally, you could use the data for analysis in a notebook and send logs to Cloud Logging and Cloud Monitoring.

Because storage is isolated from compute, you can run a separate cluster per job instead of relying on one long-lived cluster. To reduce costs, you can use ephemeral clusters that are grouped together and selected by label. To fit the demands of your application, you can also choose the appropriate combination of memory, CPU, and disk.

So, let’s get started…

# This program prints Hello, world!

print('Hello, world!')

Just kidding, we’re already past the good old program ;)

Source Setup

1. Create a Pub/Sub topic

First, we need to create a Pub/Sub topic. We’ll use Cloud Shell to create it.

gcloud pubsub topics create DataprocBlog

2. Create a subscription to the topic

Next, we need to create a subscription so that we can receive data from the topic.

gcloud pubsub subscriptions create --topic DataprocBlog yourSubscription

You can check the Pub/Sub console to verify that both the topic and the subscription exist.
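
To test the pipeline end to end later on, it helps to have a few messages waiting on the subscription. Below is a minimal Python sketch that publishes JSON test messages to the DataprocBlog topic with the google-cloud-pubsub client library. The helper names and the payload fields (id, message) are made up for illustration, and this post does not state which message format the template expects, so treat the JSON shape as an assumption.

```python
import json


def encode_message(record: dict) -> bytes:
    """Serialize a record as UTF-8 JSON, since Pub/Sub payloads are bytes."""
    return json.dumps(record, sort_keys=True).encode("utf-8")


def publish_test_messages(project_id: str, topic_id: str, count: int = 3) -> list:
    """Publish `count` sample messages and return their message IDs.

    Requires `pip install google-cloud-pubsub` and application default
    credentials (already available in Cloud Shell).
    """
    # Imported here so encode_message works without the client library.
    from google.cloud import pubsub_v1

    publisher = pubsub_v1.PublisherClient()
    topic_path = publisher.topic_path(project_id, topic_id)
    message_ids = []
    for i in range(count):
        # Hypothetical payload shape; adjust it to whatever your table expects.
        payload = encode_message({"id": i, "message": f"hello {i}"})
        future = publisher.publish(topic_path, data=payload)
        message_ids.append(future.result())  # blocks until the publish completes
    return message_ids


# Example call (not executed here):
# publish_test_messages("<gcp-project-id>", "DataprocBlog")
```

The client import sits inside the function so that the encoding helper can be tried out on a machine where the library is not installed.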

Prepare the target

1. Create a BigQuery dataset to hold the table that stores the streaming data

bq mk --dataset $DEVSHELL_PROJECT_ID:DataprocBlog

2. Create a bucket for the staging / temporary landing zone

gsutil mb gs://$DEVSHELL_PROJECT_ID

Onboard the Dataproc template

1. Even if you intend to use the “default” VPC network generated by GCP, you must still enable Private Google Access on its subnet, as shown below.

gcloud compute networks subnets update default --region=us-central1 --enable-private-ip-google-access

2. Clone the Git repo in Cloud Shell, which comes pre-installed with various tools. Alternatively, use any machine that has JDK 8+, Maven, and Git installed.

git clone https://github.com/GoogleCloudPlatform/dataproc-templates.git
cd dataproc-templates/java

3. Obtain authentication credentials to submit the job.

gcloud auth application-default login

4. Execute the PubSubToBQ.java template.

General Execution

export GCP_PROJECT=<gcp-project-id>
export REGION=<region>
export SUBNET=<subnet>
export GCS_STAGING_LOCATION=<gcs-staging-bucket-folder>
# ID of Dataproc cluster running permanent history server to access historic logs.
#export HISTORY_SERVER_CLUSTER=<gcp-project-dataproc-history-server-id>


bin/start.sh \
-- --template PUBSUBTOBQ \
--templateProperty pubsub.input.project.id=<pubsub project id> \
--templateProperty pubsub.input.subscription=<pubsub subscription> \
--templateProperty pubsub.bq.output.dataset=<bq output dataset> \
--templateProperty pubsub.bq.output.table=<bq output table>
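
If you would rather script the invocation than type it out, the command above can be assembled from a dictionary of properties. This is only a sketch: the helper name build_start_command is hypothetical, and the values "my-project", "my-subscription", and "events" are placeholders to replace with your own (DataprocBlog is the dataset created earlier).

```python
import shlex


def build_start_command(properties: dict, template: str = "PUBSUBTOBQ") -> list:
    """Return the bin/start.sh argument list, one --templateProperty per entry."""
    args = ["bin/start.sh", "--", "--template", template]
    for key, value in properties.items():
        args += ["--templateProperty", f"{key}={value}"]
    return args


# Placeholder values for illustration only.
example = build_start_command({
    "pubsub.input.project.id": "my-project",
    "pubsub.input.subscription": "my-subscription",
    "pubsub.bq.output.dataset": "DataprocBlog",
    "pubsub.bq.output.table": "events",
})

# Print the command as a single shell-quoted line for review before running it.
print(shlex.join(example))
```

The resulting list can be passed to subprocess.run(example, check=True) from the dataproc-templates/java directory once the environment variables above are exported.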

Configurable Parameters

The following properties can be set on the command line or in the template.properties file:

## Project that contains the input Pub/Sub subscription to be read
pubsub.input.project.id=<pubsub project id>
## PubSub subscription name
pubsub.input.subscription=<pubsub subscription>
## Stream timeout, for how long the subscription will be read
pubsub.timeout.ms=60000
## Streaming duration, how often will writes to BQ be triggered
pubsub.streaming.duration.seconds=15
## Number of streams that will read from Pub/Sub subscription in parallel
pubsub.total.receivers=5
## Project that contains the output table
pubsub.bq.output.project.id=<pubsub to bq output project id>
## BigQuery output dataset
pubsub.bq.output.dataset=<bq output dataset>
## BigQuery output table
pubsub.bq.output.table=<bq output table>
## Number of records to be written per message to BigQuery
pubsub.bq.batch.size=1000
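
To get a feel for how the timeout and the streaming duration interact, here is a small Python sketch that parses properties in the format above and estimates how many write triggers fit into one run. It assumes the two settings behave exactly as their comments describe, so the result is a rough estimate rather than a guarantee. The function names are hypothetical.

```python
def parse_properties(text: str) -> dict:
    """Parse key=value lines, skipping blank lines and '#' comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def estimated_write_triggers(props: dict) -> int:
    """Rough count of BigQuery write triggers per run: timeout / streaming duration."""
    timeout_s = int(props["pubsub.timeout.ms"]) // 1000
    interval_s = int(props["pubsub.streaming.duration.seconds"])
    return timeout_s // interval_s


sample = """
## Stream timeout
pubsub.timeout.ms=60000
## Streaming duration
pubsub.streaming.duration.seconds=15
"""
props = parse_properties(sample)
print(estimated_write_triggers(props))  # 60 s / 15 s = 4
```

With the default values shown above, a 60-second timeout and a 15-second duration leave room for about four write triggers before the stream stops.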

NOTE: You will be asked to enable the Dataproc API if it is not already enabled.
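
Once the job has run, a quick row count is one way to confirm that data reached BigQuery. The sketch below uses the google-cloud-bigquery client library. The function names are hypothetical, and "my-project" and "events" are placeholders for your own project ID and output table.

```python
def build_count_query(project_id: str, dataset: str, table: str) -> str:
    """Return a standard SQL query that counts the rows in the output table."""
    return f"SELECT COUNT(*) AS row_count FROM `{project_id}.{dataset}.{table}`"


def count_rows(project_id: str, dataset: str, table: str) -> int:
    """Run the count query; requires `pip install google-cloud-bigquery`."""
    # Imported here so build_count_query works without the client library.
    from google.cloud import bigquery

    client = bigquery.Client(project=project_id)
    rows = client.query(build_count_query(project_id, dataset, table)).result()
    return next(iter(rows)).row_count


# Placeholder project and table names; DataprocBlog is the dataset created earlier.
print(build_count_query("my-project", "DataprocBlog", "events"))
```

Calling count_rows before and after publishing test messages should show the count increasing if the stream is working.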

And with that, we’re done with our plain-sailing process to stream data from Pub/Sub to BigQuery.

Stay tuned & kindly follow for more such blogs :)

References
https://medium.com/google-cloud/cloud-spanner-export-query-results-using-dataproc-serverless-6f2f65b583a4
https://cloud.google.com/pubsub/docs/overview
https://github.com/GoogleCloudPlatform/dataproc-templates
