Stream your Data in Google Cloud Spanner to BigQuery with Dataflow

Gopi Yudoyo
Published in tiket.com
7 min read · Oct 25, 2022
Photo by Tobias Carlsson on Unsplash

As a business grows, a company improves its key features and services to gain more customers and raise customer satisfaction. One way to do that is to invest in data analytics, in either infrastructure or manpower, so that captured data can be inspected and analyzed, customer behavior can be summarized quickly, and the service can be improved accordingly.

Google Cloud Spanner

As the number of customers grows, so does the transaction data. The transaction data should therefore be stored in a scalable SQL database service with distributed computing and high availability, all of which Google Cloud Spanner provides.

Google Cloud BigQuery

We also need another database service capable of in-depth analytics, so that the needs of the production database stay separate from the needs of the analytics database. BigQuery is a fully managed, serverless data warehouse that enables scalable analysis over petabytes of data.

Google Cloud Dataflow

To connect the two database services, we need ETL (Extract, Transform, Load) tooling capable of batching or even streaming data to support real-time analytics. A data pipeline is required to move data from one database to the other and to keep pace with the scale of both services. Google Cloud Dataflow offers unified stream and batch data processing that is serverless, fast, and cost-effective.

Before going any further, please read up on Apache Beam programming for a better understanding.

What I will cover in this article:

  • Understanding of architecture and approach.
  • Comprehensive explanation of each process in the code.

What I won’t cover in this article:

  • Apache Beam Programming guidelines.
  • Installation, deployment, monitoring, security, and administration of each service.

Spanner Change Streams

In this case we will use a new Google Cloud Spanner feature called change streams, which watch and capture the changes within the database as real-time streams.

A single change captured by a change stream is represented as a record called DataChangeRecord. It holds information such as the transaction ID, commit timestamp, modification type, keys, new values, and old values. Each record can be retained for a maximum of seven days.

DataChangeRecord

In this example, we create a change stream in the database that watches the tables payments and payments_details, with a retention period of 3 days.

CREATE CHANGE STREAM PaymentsChangeStream FOR payments, payments_details OPTIONS( retention_period = '3d' );

Approach and Architecture

In a nutshell, we gather all the changes within a span of seconds, group the keysJson values by tableName, query Cloud Spanner again with those keys to get the complete rows, and finally write the results to BigQuery.
This approach is based on the following considerations.

  • A limitation of DataChangeRecord is that, for the update modification type, it streams only the changed columns, not the entire row.
  • We want to reduce the number of connections that Dataflow opens to Cloud Spanner.
Architecture of the pipeline

Build the pipeline

Based on this approach and architecture, we will create a pipeline that streams every table watched by PaymentsChangeStream. It dynamically groups the keys by the tableName in each DataChangeRecord and maps each group to its destination table in BigQuery.

Prepare the schema
Writing the results to BigQuery requires a destination table along with its table schema. We therefore prepare a schema for each table as a JSON file named after the table and store it in Cloud Storage.

Later, we can load all the schemas from Cloud Storage into a Map object, with the table name as key and the JSON string containing the schema as value.
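To make the shape of that Map concrete, here is a minimal sketch in plain Java. It assumes the schema files are named <table>.json, as described above. The class SchemaMapSketch and its methods are hypothetical names for illustration, and the actual Cloud Storage read is replaced by an in-memory map of object name to file content, so this is not the code from the original pipeline.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper (not from the original pipeline): turns schema files
// named "<table>.json" into a map of table name -> schema JSON string.
final class SchemaMapSketch {

    // Derives the table name from an object name such as "schemas/payments.json".
    static String tableNameOf(String objectName) {
        String fileName = objectName.substring(objectName.lastIndexOf('/') + 1);
        if (!fileName.endsWith(".json")) {
            throw new IllegalArgumentException("Not a schema file: " + objectName);
        }
        return fileName.substring(0, fileName.length() - ".json".length());
    }

    // objectsByName stands in for the files read from Cloud Storage
    // (object name -> file content); the real read is out of scope here.
    static Map<String, String> toSchemaMap(Map<String, String> objectsByName) {
        Map<String, String> schemaMap = new LinkedHashMap<>();
        for (Map.Entry<String, String> object : objectsByName.entrySet()) {
            schemaMap.put(tableNameOf(object.getKey()), object.getValue());
        }
        return schemaMap;
    }
}
```

With this shape, looking up the schema for a record only needs the table name that the change stream already reports.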

Connect and initialize Spanner Change Stream
Now connect and initialize the Spanner change stream with the following steps.

  • Create the pipeline with options: the Dataflow configuration plus parameters such as the table names, Spanner project ID, and Spanner database ID.
  • Load the schema map into a PCollectionView with the same key and value types, so that we don't need to read from Cloud Storage on every write to BigQuery. We will use it as a side input in the write-to-BigQuery task.
  • Set the inclusive start timestamp. It can reach back at most as far as the retention period of the DataChangeRecord (3 days here), counted from when the change stream was created in the Spanner database. In this case we start 3 hours back.
  • Initialize the Spanner change stream by applying SpannerIO.readChangeStream() to the pipeline, with a SpannerConfig that selects the project ID, instance ID, and database ID, and with change stream settings such as the metadata instance, metadata database, change stream name, and inclusive start timestamp.
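The start-timestamp rule from the steps above can be sketched with java.time alone. This is an illustration under assumptions: StartTimestampSketch and inclusiveStart are hypothetical names, the 3-day retention mirrors the change stream created earlier, and converting the result to the timestamp type that the connector expects is left out.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper: computes the inclusive start timestamp for reading
// the change stream and rejects values outside the retention period.
final class StartTimestampSketch {

    // Retention period configured on the change stream in this article ('3d').
    static final Duration RETENTION = Duration.ofDays(3);

    // Returns now minus lookback, e.g. 3 hours back as used in this article.
    static Instant inclusiveStart(Instant now, Duration lookback) {
        if (lookback.isNegative() || lookback.compareTo(RETENTION) > 0) {
            throw new IllegalArgumentException(
                "Lookback must be between zero and the retention period: " + lookback);
        }
        return now.minus(lookback);
    }
}
```

Calling inclusiveStart(Instant.now(), Duration.ofHours(3)) gives the value used in this article, while a lookback of 4 days is rejected because the records would already be gone.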

Transform DataChangeRecord element
Before grouping DataChangeRecord elements in a span of seconds, we transform each one into an object that carries only the attributes we need: serverTransactionId, tableName, modType, commitTimestamp, key, value, and the data type of the key.

  • ChangeStream class to store the serverTransactionId, tableName, modType, and commitTimestamp.
  • ChangeStreamValues class to store the key, value, and data type of the key as part of a ChangeStream object. This class is needed to accommodate tables with composite keys.
Class ChangeStream to store serverTransactionId, tableName, modType, commitTimestamp
Class ChangeStreamValues to store key, value, and key type

Then we break down each DataChangeRecord element and transform it into a ChangeStream object with a ParDo transformation.

  • Break down the Mod object to collect the key and value of the data.
  • Break down the ColumnType to collect the data type of the key.
  • Store the key, value, and data type of the key in a ChangeStreamValues object.
  • Get the transaction ID, table name, mod type, and commit timestamp.
  • Store them in a ChangeStream object, with the ChangeStreamValues object converted into a string.
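As a rough picture of the two classes and the mapping step, here is a plain-Java sketch. The field names follow the attributes listed above, but the exact class layout is an assumption. The ChangeStreamMapper helper and its already-parsed Map inputs are hypothetical stand-ins for the Mod and ColumnType breakdown, and the sketch keeps the values as a list rather than converting them to a string as the steps above describe.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// One key column of a changed row: its name, value, and data type.
final class ChangeStreamValues {
    final String key;
    final String value;
    final String keyType;

    ChangeStreamValues(String key, String value, String keyType) {
        this.key = key;
        this.value = value;
        this.keyType = keyType;
    }
}

// The subset of a data change record that the rest of the pipeline needs.
final class ChangeStream {
    final String serverTransactionId;
    final String tableName;
    final String modType;
    final String commitTimestamp;
    final List<ChangeStreamValues> values; // more than one entry for composite keys

    ChangeStream(String serverTransactionId, String tableName, String modType,
                 String commitTimestamp, List<ChangeStreamValues> values) {
        this.serverTransactionId = serverTransactionId;
        this.tableName = tableName;
        this.modType = modType;
        this.commitTimestamp = commitTimestamp;
        this.values = Collections.unmodifiableList(new ArrayList<>(values));
    }
}

// Hypothetical mapper: keyValues (key column -> value) and keyTypes
// (key column -> type name) stand in for the parsed Mod and ColumnType data.
final class ChangeStreamMapper {
    static ChangeStream toChangeStream(String serverTransactionId, String tableName,
                                       String modType, String commitTimestamp,
                                       Map<String, String> keyValues,
                                       Map<String, String> keyTypes) {
        List<ChangeStreamValues> values = new ArrayList<>();
        for (Map.Entry<String, String> entry : keyValues.entrySet()) {
            values.add(new ChangeStreamValues(
                entry.getKey(), entry.getValue(), keyTypes.get(entry.getKey())));
        }
        return new ChangeStream(serverTransactionId, tableName, modType, commitTimestamp, values);
    }
}
```

Keeping one ChangeStreamValues per key column is what lets a table with a composite key travel through the pipeline as a single element.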

Group elements by span of second(s)
To group the elements, we apply a window after the transformation and group them by key (the table name). In this case we use a 10-second window.
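In Beam, this kind of grouping is typically expressed with a fixed window (for example Window.into(FixedWindows.of(...))) followed by a grouping transform. The sketch below only illustrates the idea in plain Java so it can run on its own: every change falls into the 10-second bucket that contains its commit time, and changes are then grouped per bucket and table. FixedWindowSketch, Change, and the "windowStart|tableName" key format are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java illustration of fixed 10-second windows keyed by table name.
final class FixedWindowSketch {

    static final long WINDOW_MILLIS = 10_000L;

    // Minimal stand-in for a transformed change element.
    static final class Change {
        final String tableName;
        final String keysJson;
        final long commitMillis;

        Change(String tableName, String keysJson, long commitMillis) {
            this.tableName = tableName;
            this.keysJson = keysJson;
            this.commitMillis = commitMillis;
        }
    }

    // Start of the window that contains the given timestamp.
    static long windowStart(long epochMillis) {
        return epochMillis - Math.floorMod(epochMillis, WINDOW_MILLIS);
    }

    // Groups the keys of all changes by "windowStart|tableName".
    static Map<String, List<String>> groupByWindowAndTable(List<Change> changes) {
        Map<String, List<String>> grouped = new TreeMap<>();
        for (Change change : changes) {
            String groupKey = windowStart(change.commitMillis) + "|" + change.tableName;
            grouped.computeIfAbsent(groupKey, k -> new ArrayList<>()).add(change.keysJson);
        }
        return grouped;
    }
}
```

A larger window means fewer, bigger queries back to Spanner at the cost of more delay before rows reach BigQuery.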

Generate Query
After collecting the list of ChangeStream objects for a span of seconds, we generate queries by combining 10 keys (or composite keys) of a table into a single query and emitting it as the output element. To do that, we need a new class that stores the table name as key and the query as the output element.

  • Create the class ChangeStreamQueries.
  • Prepare the necessary accumulator object for the input type ChangeStream in a CombineFn transformation.
  • Iterate in nested fashion over the list of tables, then the ChangeStream objects, then the ChangeStreamValues.
  • Shape the query from the objects' attribute values using the query template SELECT * FROM %s WHERE %s
for single key: SELECT * FROM payments WHERE (payment_id = 111) OR (payment_id = 222) ...
for composite key: SELECT * FROM payments WHERE (payment_id = 111 AND item_id = 111) OR (payment_id = 222 AND item_id = 222) ...
  • Store the table name and query in a ChangeStreamQueries object and return it as a string from the extractOutput method.

The table name is used as the key so that the results can be mapped to the destination table and to the table schema used when writing to BigQuery.
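The query shaping described above can be sketched without Beam. The code below uses the article's template and produces the single-key and composite-key forms shown earlier, in batches of a given size. QueryBuilderSketch and its method names are hypothetical. Values are interpolated as bare literals for brevity, so it only suits numeric keys; string keys would need quoting based on the stored key type, and parameterized statements would be the safer choice in a real pipeline.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical query builder following the template from this article.
final class QueryBuilderSketch {

    static final String QUERY_TEMPLATE = "SELECT * FROM %s WHERE %s";

    // One row's key (column -> literal) becomes "(col = v AND col2 = v2)".
    static String rowPredicate(Map<String, String> key) {
        List<String> parts = new ArrayList<>();
        for (Map.Entry<String, String> column : key.entrySet()) {
            parts.add(column.getKey() + " = " + column.getValue());
        }
        return "(" + String.join(" AND ", parts) + ")";
    }

    // Combines several row keys of one table into a single query with OR.
    static String buildQuery(String tableName, List<Map<String, String>> keys) {
        if (keys.isEmpty()) {
            throw new IllegalArgumentException("At least one key is required");
        }
        List<String> predicates = new ArrayList<>();
        for (Map<String, String> key : keys) {
            predicates.add(rowPredicate(key));
        }
        return String.format(QUERY_TEMPLATE, tableName, String.join(" OR ", predicates));
    }

    // Splits the keys into batches (10 per query in this article).
    static List<String> buildQueries(String tableName, List<Map<String, String>> keys,
                                     int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<String> queries = new ArrayList<>();
        for (int from = 0; from < keys.size(); from += batchSize) {
            int to = Math.min(from + batchSize, keys.size());
            queries.add(buildQuery(tableName, keys.subList(from, to)));
        }
        return queries;
    }
}
```

Batching keeps each statement small while still turning many changes into only a few round trips to Spanner.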

Query back to Spanner
After the query is generated, it is executed by the Spanner client to fetch the complete rows.

  • Set up the Spanner client connection configuration, covering the project ID, instance ID, database ID, and session pool, to create the database client.
  • Prepare String lists, one for each data type.
  • Load the schema from the PCollectionView created earlier in this pipeline, to map the data types of the query result.
Map<String, String> schemaMap = Collections.unmodifiableMap(c.sideInput(schemaPathView));
  • Execute the query.
  • Map the query results against the loaded schema and choose the ResultSet get method according to the String lists.
  • Load each value into a TableRow object and emit it as output.
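The idea of picking a getter per column type can be sketched as follows. This is an illustration only: the Row interface is a hypothetical stand-in for the typed getters of a query result, the type names (INTEGER, FLOAT, BOOLEAN, STRING) are assumed to come from the loaded schema, and a plain Map stands in for the TableRow.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch: choose a typed getter per column based on the schema.
final class RowMappingSketch {

    // Stand-in for the typed getters of a query result row.
    interface Row {
        boolean isNull(String column);
        String getString(String column);
        long getLong(String column);
        double getDouble(String column);
        boolean getBoolean(String column);
    }

    // columnTypes: column name -> type name taken from the loaded schema.
    // The returned map stands in for the TableRow written to BigQuery.
    static Map<String, Object> toRowMap(Row row, Map<String, String> columnTypes) {
        Map<String, Object> out = new LinkedHashMap<>();
        for (Map.Entry<String, String> column : columnTypes.entrySet()) {
            String name = column.getKey();
            if (row.isNull(name)) {
                out.put(name, null); // typed getters are only safe on non-null values
                continue;
            }
            switch (column.getValue()) {
                case "INTEGER": out.put(name, row.getLong(name)); break;
                case "FLOAT":   out.put(name, row.getDouble(name)); break;
                case "BOOLEAN": out.put(name, row.getBoolean(name)); break;
                case "STRING":  out.put(name, row.getString(name)); break;
                default:
                    throw new IllegalArgumentException(
                        "Unsupported type " + column.getValue() + " for column " + name);
            }
        }
        return out;
    }
}
```

Failing loudly on an unknown type makes a schema file that has drifted from the Spanner table visible early, rather than silently dropping the column.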

Here is the complete code of the pipeline's execute-query task.

Write the result to BigQuery
To write the results, we apply BigQueryIO as the last task of the pipeline. In that task we need to set up the destination table and its schema.

  • Get the schema from the PCollectionView created earlier in this pipeline and use it as a side input.
  • Get the destination (table name) from the value of the TableName column.
  • Create a TableDestination object based on the destination.
  • Create a TableSchema object from the side input based on the destination.
  • Add the write configuration, such as create disposition, write disposition, auto-sharding, optimized writes, and ignore unknown values.
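The routing part of these steps comes down to two lookups per row: where to write and which schema to use. Below is a minimal plain-Java sketch of that logic. DestinationSketch is a hypothetical name, the project and dataset are parameters you would supply yourself, and the "project:dataset.table" string is the table spec form that BigQuery accepts.

```java
import java.util.Map;

// Hypothetical routing helper: table name -> BigQuery table spec and schema.
final class DestinationSketch {

    // Builds a table spec of the form "project:dataset.table".
    static String tableSpec(String project, String dataset, String tableName) {
        return project + ":" + dataset + "." + tableName;
    }

    // Looks up the schema JSON for a destination from the side-input map.
    static String schemaFor(String tableName, Map<String, String> schemaMap) {
        String schema = schemaMap.get(tableName);
        if (schema == null) {
            throw new IllegalStateException("No schema loaded for table " + tableName);
        }
        return schema;
    }
}
```

Because both lookups are driven by the table name alone, a table added to the change stream only needs a matching schema file to be routed.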

Run the Pipeline
Last but not least, now that we have built all the tasks required to stream data from Cloud Spanner to BigQuery, we can run the pipeline.

pipeline.run();
Dataflow Pipeline
Dataflow Job Metrics : BigQuery Output Request per sec
Streaming buffer information in BigQuery table spanner.payments

Click here for the complete code of the pipeline.

Conclusion

With an understanding of the approach and of how each process works, we can build a Dataflow pipeline from Spanner to BigQuery, or between other sources and destinations.

Thanks for reading! I hope you enjoyed it.
Feel free to post some comments if you have any questions, or if you just want to say hi.
