Scaling up Spanner Change Watcher

Knut Olav Løite
Google Cloud - Community
8 min read · Apr 13, 2021

Google Cloud Spanner Change Watcher is an open-source library for watching and publishing changes from a Google Cloud Spanner database. The library supports several standard implementations for watching tables for changes. The examples in this article require version 1.1.0 or higher of Spanner Change Watcher.

One of the most important aspects of using Spanner Change Watcher is keeping the queries that watch the table(s) for changes as efficient as possible. Adding a secondary index on the commit timestamp column is one of the most common options. This comes with the drawback that the index can become a write hotspot, as the indexed values are monotonically increasing. Adding a shard column to the secondary index is a common strategy to mitigate this drawback.

Adding a shard column to an existing database that is already in use by one or more applications can however be a challenge, as it could also require modifications to those applications. This article shows how you can:

  1. Add a shard column to an existing database without the need to modify any existing applications.
  2. Add a secondary index and use this to efficiently query the table for data modifications.
  3. Add an optional Processed (boolean) column that indicates whether a row has been processed or not, and use this column to remove rows from the secondary index when the rows have been processed.

Add a Computed Shard Column

This example assumes that your database does not already contain a logical shard column. If it does, you could also use that column instead of adding a computed shard column.

A good shard column should contain a value that ensures that writes are spread evenly across multiple Cloud Spanner servers. The number of distinct values in the shard column should be at least as large as the number of servers in your Cloud Spanner instance. Having too many distinct values is however not recommended, as the most efficient way to poll for changes is to include all possible shard values in the poll queries, or to set up a separate watcher for each shard.

The modulo of a hash value is therefore a good candidate for a shard column. Cloud Spanner supports multiple hash functions. This example uses the FARM_FINGERPRINT function, but any of the available hash functions would do. Taking the hash modulo 19 reduces the number of distinct values to 37: FARM_FINGERPRINT can return negative values and MOD preserves the sign of its first argument, so the shard values range from -18 to 18.
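
As a quick illustration, the following query computes the shard value for an arbitrary string (the input value is just an example); the result is always an integer in the range [-18, 18]:

-- Illustrative only: hash an arbitrary string and reduce it to one of 37 buckets.
SELECT MOD(FARM_FINGERPRINT('arbitrary input value'), 19) AS ShardId;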

Assume that our database already contains the following table:

Base Singers Table
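
A representative definition, assuming the familiar Singers sample columns and a LastModified commit timestamp column, could look like this:

CREATE TABLE Singers (
  SingerId     INT64 NOT NULL,
  FirstName    STRING(200),
  LastName     STRING(200) NOT NULL,
  -- Commit timestamp column that applications fill with the commit timestamp
  -- of each insert or update.
  LastModified TIMESTAMP OPTIONS (allow_commit_timestamp=true),
) PRIMARY KEY (SingerId);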

The following computed shard column can be added to the table definition without the need to alter any existing applications:

Add Computed ShardId Column Based on Full Name
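
A minimal sketch of such a column, assuming the FirstName and LastName columns from the table above (COALESCE keeps the hash input, and therefore the shard, non-null when FirstName is null):

ALTER TABLE Singers ADD COLUMN ShardId INT64 AS (
  -- Hash the full name and reduce it to one of 37 shard values (-18 .. 18).
  MOD(FARM_FINGERPRINT(COALESCE(FirstName, '') || ' ' || LastName), 19)
) STORED;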

This shard column will automatically be filled by Cloud Spanner for every row in the table, without the need to change any existing mutations or DML statements that are generated by existing applications. This example calculates the shard by hashing two data columns, but you could use any combination of columns, as long as the input differs enough between rows to produce well-distributed hash values.

Add a (Null-Filtered) Secondary Index

We can now add a secondary index on the shard and commit timestamp columns without having to worry about creating a hotspot. We will create the secondary index as a null-filtered index. This will enable us to automatically remove rows from the secondary index that are no longer needed, which keeps the index small for tables that receive mostly inserts and few or no updates.

Create Null-Filtered Index for Shard and Commit Timestamp
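
Assuming the column names from the previous snippets, the index could be defined as follows (the index name is arbitrary):

CREATE NULL_FILTERED INDEX Idx_Singers_ShardId_LastModified
  ON Singers (ShardId, LastModified);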

If either ShardId or LastModified is null for a given row, that row will not be included in the secondary index. ShardId will always be non-null in this example, as it is a computed hash value of the full name of a Singer. We could however choose to let a background task or Cloud Function set LastModified to null for rows that have not been updated recently. This will automatically remove those rows from the secondary index and keep the index smaller.

This is especially efficient for tables where most of the mutations are inserts. The data table will continue to grow, while the index can be kept small by removing entries that were inserted more than a certain amount of time ago, without the need to remove the actual data.

Use the Index in Spanner Change Watcher

The definition of our Singers table and index is now as shown in the script below:

Complete Table and Index Definition
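
Combining the previous snippets, a sketch of the complete definition looks like this:

CREATE TABLE Singers (
  SingerId     INT64 NOT NULL,
  FirstName    STRING(200),
  LastName     STRING(200) NOT NULL,
  LastModified TIMESTAMP OPTIONS (allow_commit_timestamp=true),
  -- Computed shard column: a hash of the full name reduced to the range [-18, 18].
  ShardId      INT64 AS (
    MOD(FARM_FINGERPRINT(COALESCE(FirstName, '') || ' ' || LastName), 19)
  ) STORED,
) PRIMARY KEY (SingerId);

CREATE NULL_FILTERED INDEX Idx_Singers_ShardId_LastModified
  ON Singers (ShardId, LastModified);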

We'll now set up a table watcher in Spanner Change Watcher that will monitor this table for changes, and force the watcher to use the secondary index that we have created. This is done by setting two values on the watcher:

  1. Set a ShardProvider: A ShardProvider can be used to watch only a specific part (shard) of the table. In this case we'll use the FixedShardProvider, which will add a WHERE ShardId IS NOT NULL AND ShardId IN (...) clause to each poll query. In this example we'll set the shard provider to watch all possible shard values, so the list of values will be all integers between -18 and 18 inclusive. It might seem strange to create a ShardProvider that includes all the possible shard values, but this actually helps Cloud Spanner generate a well-performing poll query.
  2. Set a table hint: A table hint can be used to force Cloud Spanner to use a specific secondary index for the query.

Example Setup for a Spanner Change Watcher
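
A sketch of this setup in Java, assuming the SpannerTableTailer, FixedShardProvider and TableId classes from the library and builder methods along the lines of setShardProvider and setTableHint (the exact signatures may differ between versions; the samples in the Spanner Change Watcher repository are the authoritative reference):

import com.google.cloud.spanner.DatabaseId;
import com.google.cloud.spanner.Spanner;
import com.google.cloud.spanner.SpannerOptions;
import com.google.cloud.spanner.Value;
import com.google.cloud.spanner.watcher.FixedShardProvider;
import com.google.cloud.spanner.watcher.SpannerTableTailer;
import com.google.cloud.spanner.watcher.TableId;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

public class SingersWatcherSample {
  public static void main(String[] args) {
    Spanner spanner = SpannerOptions.newBuilder().build().getService();
    // Placeholder identifiers; replace with your own project, instance and database.
    DatabaseId databaseId = DatabaseId.of("my-project", "my-instance", "my-database");

    // All possible shard values: the integers -18 through 18 inclusive.
    List<Long> allShards =
        LongStream.rangeClosed(-18L, 18L).boxed().collect(Collectors.toList());

    SpannerTableTailer watcher =
        SpannerTableTailer.newBuilder(spanner, TableId.of(databaseId, "Singers"))
            // Adds "WHERE ShardId IS NOT NULL AND ShardId IN (...)" to each poll query.
            .setShardProvider(FixedShardProvider.create("ShardId", Value.int64Array(allShards)))
            // Forces the poll query to use the secondary index created above.
            .setTableHint("@{FORCE_INDEX=Idx_Singers_ShardId_LastModified}")
            .build();
    watcher.addCallback(
        (table, row, commitTimestamp) ->
            System.out.printf("Received change for table %s at %s%n", table, commitTimestamp));
    watcher.startAsync().awaitRunning();
  }
}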

The above table watcher will poll the Singers table for changes using the secondary index on ShardId and LastModified. The poll query will include the following WHERE clause:

WHERE ShardId IS NOT NULL AND ShardId IN (-18, -17, ..., 17, 18) AND LastModified > @lastCommitTimestamp.

This WHERE clause implies that both ShardId and LastModified must be non-null, which is why we know that the null-filtered index can be used for this query.

Remove Entries from the Secondary Index

We can keep the secondary index that is used by the table watcher small by regularly removing entries that we no longer need. This is done by setting the value of LastModified to null for rows that have not been updated for more than a certain amount of time. It is important to only do this for rows that we know will not be updated by another process at the same time, as that could cause those transactional updates to be missed by the change watcher.

The example script below will set LastModified to null for all rows that have not been updated in the past 24 hours. This script should be executed as Partitioned DML to ensure that it does not exceed the transaction limits of Cloud Spanner.
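
A statement along these lines could do that (run it as Partitioned DML, for example through the gcloud command-line tool or your client library of choice):

-- Remove rows that have not been modified in the last 24 hours from the
-- null-filtered index by setting the commit timestamp column to null.
UPDATE Singers
SET LastModified = NULL
WHERE LastModified < TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 24 HOUR);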

Use a Processed Column

The above example assumes that it is OK to set LastModified to null for rows that have not been updated for a while. If you want to keep the LastModified value for older rows, you can introduce an additional Processed column that indicates whether an update has been processed and no longer needs to be considered by the table watcher. This Processed column can be used to automatically set the ShardId column to null when the row has been processed. As a null-filtered index will only contain entries where all included columns are non-null, it does not matter whether we set the ShardId column or the commit timestamp column to null. Both will cause the row to be removed from the secondary index.

The following table and index definition will create a table that automatically calculates a ShardId for rows that have not been marked as processed. The index will therefore only contain the rows that have not been marked as processed.

Table Definition with Processed Column
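
A sketch of this variant, reusing the column names from the earlier snippets:

CREATE TABLE Singers (
  SingerId     INT64 NOT NULL,
  FirstName    STRING(200),
  LastName     STRING(200) NOT NULL,
  Processed    BOOL,
  LastModified TIMESTAMP OPTIONS (allow_commit_timestamp=true),
  ShardId      INT64 AS (
    -- Rows that have not been processed (Processed is null or FALSE) get a shard
    -- value; processed rows get a null ShardId and drop out of the index below.
    CASE WHEN Processed THEN NULL
         ELSE MOD(FARM_FINGERPRINT(COALESCE(FirstName, '') || ' ' || LastName), 19)
    END
  ) STORED,
) PRIMARY KEY (SingerId);

CREATE NULL_FILTERED INDEX Idx_Singers_ShardId_LastModified
  ON Singers (ShardId, LastModified);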

The above table and index definition can be used in combination with a FixedShardProvider. It does however require that existing update statements are modified to also set the Processed column to FALSE (or null). Existing insert statements do not need to be modified, as these will insert null into the Processed column. The CASE WHEN Processed clause treats both null and FALSE as false, and will therefore compute a non-null ShardId for those rows.

Setting up a Change Watcher

The change watcher is set up in the exact same way for this table as for the version that did not include the Processed column.

The table watcher will not consider any rows where ShardId is null, and it will do so efficiently as it can use the secondary index that only contains the relevant rows.

Remove Entries from the Secondary Index

Removing entries from the secondary index that are no longer needed is slightly different from the previous example. Entries are now removed by setting the Processed column to true. This will automatically update the ShardId column to null. The LastModified column will remain intact.

Query for marking rows Processed
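
For example, the following Partitioned DML statement marks all rows that have not been modified in the past 24 hours as processed (the exact predicate depends on your own processing logic):

-- Marking a row as processed sets its computed ShardId to null, which in turn
-- removes the row from the null-filtered index. LastModified is left untouched.
UPDATE Singers
SET Processed = TRUE
WHERE (Processed IS NULL OR NOT Processed)
  AND LastModified < TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 24 HOUR);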

Benchmark Application

The Cloud Spanner Change Watcher samples directory also includes a small benchmark application that can be used to test and benchmark different configurations. The default configuration of the benchmark application is the recommended base configuration for change watchers that must process a high write throughput, and is equal to the example in this article. This configuration is able to handle approximately 3,000–5,000 mutations per second per table on a single node Cloud Spanner instance.

You can change the input parameters of the Benchmark Application to try out different configurations on your own Cloud Spanner instance. More information on the Benchmark Application can be found in this article.

Conclusion

Creating a secondary index on the commit timestamp column of a table that receives a large number of write operations can create a hotspot. Adding a shard column as the first column of the index can mitigate that. If your database does not contain a logical shard column, you could add one as a computed column that calculates a hash value of one or more of the data columns in the table. Computed columns are automatically filled by Cloud Spanner, which means that you do not need to alter any of your existing applications in order to use such a column.

Creating the secondary index on the shard and commit timestamp columns as a null-filtered index also makes it possible to remove old entries that are no longer needed from the index by setting one of the indexed values to null. The examples above show how this can be achieved by setting the commit timestamp value to null.

If you however want to preserve the commit timestamp values for old entries, you could also introduce a separate Processed column that indicates whether the row has been processed or not. This column can be used to automatically set the computed shard column to null, which also automatically removes the row from the secondary index.
