Stream data from Pub/Sub to Cloud Storage using Dataproc Serverless
Google Cloud Pub/Sub is a messaging service used in streaming analytics and data integration pipelines to ingest and distribute data. In this post we will explore how to stream data from a Pub/Sub topic to Google Cloud Storage using Dataproc Serverless.
Key Benefits
- Use Dataproc Serverless to run Spark batch workloads without provisioning and managing your own cluster.
- The PubSubToGCS template is open source, fully customisable and ready to use for simple jobs.
- You can stream data from Pub/Sub to GCS in AVRO and JSON formats.
Basic Usage
1. Ensure the subnet you will use has Private Google Access enabled. Even if you use the “default” VPC network generated by GCP, you still need to enable private access as shown below:
gcloud compute networks subnets update default --region=us-central1 --enable-private-ip-google-access
2. Create a GCS bucket and a staging folder for jar files.
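For example, the bucket can be created with gsutil (the bucket name and region below are illustrative placeholders):
gsutil mb -l us-central1 gs://my-gcs-bucket
# GCS has no real folders; the staging "folder" is created implicitly
# the first time an object is written under gs://my-gcs-bucket/temp/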
3. Create a Pub/Sub topic with a schema (use binary serialisation when streaming AVRO data). Also create a subscription for this topic.
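A minimal sketch of these resources with gcloud (the schema, topic, and subscription names are illustrative, and schema.avsc is assumed to hold your AVRO schema definition):
gcloud pubsub schemas create my-schema --type=AVRO --definition-file=schema.avsc
gcloud pubsub topics create my-topic --schema=my-schema --message-encoding=BINARY
gcloud pubsub subscriptions create my-subscription --topic=my-topic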
4. Clone the git repo in Cloud Shell, which comes pre-installed with the required tools. Alternatively, you may use any machine with JDK 8+, Maven and Git installed.
git clone https://github.com/GoogleCloudPlatform/dataproc-templates.git
cd dataproc-templates/java
5. Obtain authentication credentials (to submit the Dataproc job).
gcloud auth application-default login
6. Execute the Pub/Sub to GCS Dataproc Serverless template:
export GCP_PROJECT=my-gcp-project-id
export REGION=project-region
export GCS_BUCKET=gcs-bucket-name
export GCS_STAGING_LOCATION=gs://gcs-bucket-name/temp
export SUBNET=projects/$GCP_PROJECT/regions/$REGION/subnetworks/NAME
export SUBSCRIPTION=pubsub-subscription-name
# DATA_FORMAT can be either JSON or AVRO
export DATA_FORMAT=JSON
export BATCH_SIZE=50

bin/start.sh \
-- \
--template PUBSUBTOGCS \
--templateProperty pubsubtogcs.input.project.id=$GCP_PROJECT \
--templateProperty pubsubtogcs.input.subscription=$SUBSCRIPTION \
--templateProperty pubsubtogcs.gcs.output.project.id=$GCP_PROJECT \
--templateProperty pubsubtogcs.gcs.bucket.name=$GCS_BUCKET \
--templateProperty pubsubtogcs.gcs.output.data.format=$DATA_FORMAT \
--templateProperty pubsubtogcs.batch.size=$BATCH_SIZE
NOTE: If the Dataproc API is not already enabled, the script will ask you to enable it.
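Once the batch job is running, you can publish a test message and check that objects appear in the bucket. A quick sketch, assuming DATA_FORMAT=JSON and the illustrative topic name from step 3:
gcloud pubsub topics publish my-topic --message='{"id": 1, "name": "test"}'
# List the objects the template has written (the output layout depends on the template version)
gsutil ls -r gs://gcs-bucket-name/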
Setting additional Spark properties
If you need to specify Spark properties supported by Dataproc Serverless, such as adjusting the number of drivers, cores, or executors, you can edit the OPT_PROPERTIES values in the start.sh file.
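For example, such an override might look like the following inside start.sh (the exact variable handling depends on the template version; the property names are standard Spark properties accepted by Dataproc Serverless):
# Illustrative OPT_PROPERTIES override in start.sh
OPT_PROPERTIES="spark.executor.instances=4,spark.executor.cores=4,spark.driver.cores=4"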
References
https://medium.com/google-cloud/cloud-spanner-export-query-results-using-dataproc-serverless-6f2f65b583a4
https://cloud.google.com/pubsub/docs/overview
https://github.com/GoogleCloudPlatform/dataproc-templates