Syncing data from PostgreSQL to Snowflake with Debezium CDC pipelines

Tianyao Zhang
Published in motive-eng
18 min read · Oct 13, 2023

In this blog post, we’ll talk about how the Data Platform team at Motive built the Debezium change data capture (CDC) pipelines to sync data from our main application database (PostgreSQL) into Snowflake to power downstream analytical use cases.

Why we did it

We have been using Fivetran to sync data from dozens of different sources (PostgreSQL, Salesforce, ad hoc CSVs, spreadsheets, etc.) into Snowflake to power our analytical use cases. For most data sources, Fivetran has worked well: it provides no-code connectors that any data analyst can configure, which accelerates our data engineering.

Over time, the Fivetran PostgreSQL connector became a central piece of infrastructure that powered our customer features built from Snowflake. As the company and the traffic to our main application database kept growing, we encountered some pain points with the Fivetran PostgreSQL connector:

  • Poor visibility into the sync progress: There were no metrics indicating the sync delay between the source tables in Postgres and the target tables in Snowflake. The only indicator Fivetran provides is a progress bar on the connector landing page, which doesn’t tell us much about the sync status of individual tables. Sometimes the sync for some tables was significantly delayed even though the connector progress bar was still green. Since those tables in Snowflake serve as the source tables for downstream tables used by business and engineering, unexpected sync delays without alerts can degrade the data quality of the downstream tables. Fivetran provides a connector to sync Fivetran connector log events to Snowflake, but those events don’t tell us about the lag at the table level. In addition, building a monitoring and alerting system around them would require considerable engineering effort, which is less than ideal.

Figure 1 below shows what a Fivetran connector progress bar looks like.

Figure 1
  • Lack of alerting: We integrated the Fivetran connector status API into an internal Airflow DAG (Directed Acyclic Graph) to periodically check connector status and alert us when a connector becomes unhealthy. However, as mentioned earlier, the connector status doesn’t tell us much about the sync progress at the individual table level, and it sometimes shows as healthy when the sync of some tables is known to be significantly delayed.
  • Low granularity of control over the sync process: The sync process is all or nothing. If even one table fails to sync, the whole connector is stuck.
  • Slow turnaround time for support: We had some severe outages with the Fivetran connectors, and the turnaround time for support was slow. During those times our data sync was broken and we couldn’t do anything. Fivetran has improved their SLAs over time, but considering how critical the sync is for our customers, we wanted more control over this process.

To address the pain points we encountered, we made the decision in early 2022 to develop an in-house solution as a replacement for the Fivetran PostgreSQL connector. This decision stemmed from our desire to mitigate the challenges we faced and gain more control over our data synchronization processes.

How we did it

We wanted to tackle the pain points we had with Fivetran, with the following goals:

  • Improved visibility of sync progress at the individual table level: We aimed to enhance our monitoring capabilities by providing precise information on the sync lag for each table. With this improved visibility, we can accurately assess the synchronization status and identify any potential delays or inconsistencies within our data pipelines.
  • Enhanced alerting mechanisms: Building upon the improved visibility, we implemented robust alerting systems. By setting up alerts based on the exact sync lag of each table, we can promptly detect abnormal increases in sync delays. This proactive alerting will enable us to take immediate action and mitigate any potential data quality issues arising from synchronization delays.
  • Greater control over the end-to-end sync process: By transitioning to an in-house solution, we aimed to gain more control and autonomy, allowing us to make the necessary modifications, optimizations, and updates to the sync process according to our own requirements and timeline.

High-level architecture

Debezium is a widely used open-source Kafka connector for change data capture. Given our experience with Kafka for other use cases, adopting Debezium was a natural choice to achieve our goal. The concept is straightforward: we utilize Debezium PostgreSQL connectors to stream change events from tables in our main application databases into Kafka topics. Subsequently, we develop custom change event consumers to process these events and synchronize data from PostgreSQL to Snowflake tables. Figure 2 illustrates the high-level architecture of the system.

Figure 2

While the high-level architecture may appear simple and straightforward, the implementation of each component entails a lot of details, interesting learnings, and trade-offs that we encountered. In the following discussion, we will highlight the noteworthy aspects of each component and share the valuable insights we gained throughout the process.

Configure Debezium connectors based on use cases

We use the Confluent cloud-managed Kafka cluster. Confluent saves us the effort of managing a production-ready Kafka cluster so that we can focus on configuring the connectors based on our needs. Confluent documents the configuration properties exposed by its cloud-managed Debezium PostgreSQL connectors; note that not all the configuration properties of the open-source Debezium PostgreSQL connector are exposed by Confluent Cloud.

Most of the configuration properties are straightforward and self-explanatory. Several important configuration parameters are worth mentioning, and we’ll discuss how we configured them in this section:

  1. output.data.format and output.key.format determine the serialization format used for the value and key of change data events before they are stored in Kafka topics. We use the AVRO serialization format.
  2. cleanup.policy determines the clean-up policy for the topics created. The default is “delete”; we use “compact” to turn on log compaction for those topics.
  3. snapshot.mode determines how the connector performs a consistent snapshot of the tables when it starts up. Note that Confluent Cloud allows users to choose from “initial”, “never” and “exported”. We used the “never” mode (the reason is discussed later).
  4. signal.data.collection names the signaling table, a powerful feature that allows us to trigger incremental snapshots for any table at any time.
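As a rough illustration, the four settings above could be collected into a configuration fragment like the one below, written here as a Python dict. The property names and values come from the list above; the signaling table name `public.debezium_signal` and the `check_settings` helper are hypothetical, and the other properties a real connector needs (connection details, table list, and so on) are omitted.

```python
import json

# Only the properties discussed in this section; a real connector config needs more.
CONNECTOR_SETTINGS = {
    "output.data.format": "AVRO",
    "output.key.format": "AVRO",
    "cleanup.policy": "compact",
    "snapshot.mode": "never",
    "signal.data.collection": "public.debezium_signal",  # hypothetical table name
}


def check_settings(settings):
    """Return a list of problems with the settings discussed in this section."""
    problems = []
    if settings.get("output.data.format") not in {"JSON", "JSON_SR", "AVRO", "PROTOBUF"}:
        problems.append("output.data.format must be JSON, JSON_SR, AVRO or PROTOBUF")
    if settings.get("snapshot.mode") == "never" and not settings.get("signal.data.collection"):
        # Without an initial snapshot, the signaling table is the only way to backfill.
        problems.append("snapshot.mode=never needs a signaling table for ad hoc snapshots")
    return problems


if __name__ == "__main__":
    print(json.dumps(CONNECTOR_SETTINGS, indent=2))
    print(check_settings(CONNECTOR_SETTINGS))
```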

Debezium connector allows us to choose one of JSON, JSON_SR, AVRO or PROTOBUF as the serialization format. For efficiency, we ruled out JSON for storing billions of change data records into Kafka. It’s better to choose a serialization format that can compact the message size so that we can save on both bandwidth and storage. JSON format, though human readable and easy to work with, is not as compact as AVRO or PROTOBUF. Also, with JSON format the schemas of the change data records are embedded in the message itself. A sample record would look like this:

{
  "schema": { ... },
  "payload": { ... },
  "schema": { ... },
  "payload": { ... }
}

The same schema info is stored in each record, consuming unnecessary extra storage.

JSON_SR is an extended version of JSON that integrates with the schema registry. This integration allows the schema information to be stored separately, eliminating redundant embedding in each record and improving data efficiency. However, JSON_SR also lacks built-in record compression, making it less suitable for scenarios where data compression is essential.

AVRO and PROTOBUF both offer record compression in binary format and support integration with the schema registry. Despite this commonality, AVRO was chosen over PROTOBUF due to our use of Golang to implement consumers. Figure 3 illustrates the schema registry’s functioning. Once the connector’s output.data.format is configured to a schema-supported format (e.g., Protobuf, Avro, JSON_SR), change events captured by the connector undergo serialization by the corresponding converter (e.g., ProtobufConverter, AvroConverter, JsonSchemaConverter) before being stored in Kafka. The converter also attempts to register the message schema in the schema registry and embeds the schema ID in the message following the wire format. To enable correct message deserialization, the consumer must access the appropriate schema from the schema registry using the schema ID provided in the message.

Figure 3

Image Source: https://docs.confluent.io/platform/current/schema-registry/fundamentals/index.html#how-it-works
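To make the wire format concrete, here is a minimal Python sketch of the first thing a consumer has to do with each message: split off the 5-byte header (one magic byte with value 0, then the schema ID as a 4-byte big-endian integer) and look up the schema for that ID. The `SchemaCache` class and its `fetch` callable are hypothetical stand-ins for a schema registry client, and decoding the AVRO payload itself is left out.

```python
import struct

MAGIC_BYTE = 0  # first byte of a schema-registry-framed message


def split_wire_format(message):
    """Split a framed message into (schema_id, payload)."""
    if len(message) < 5:
        raise ValueError("message too short for the 5-byte header")
    # ">bI": big-endian, 1-byte magic followed by a 4-byte unsigned schema ID
    magic, schema_id = struct.unpack(">bI", message[:5])
    if magic != MAGIC_BYTE:
        raise ValueError("unexpected magic byte %d" % magic)
    return schema_id, message[5:]


class SchemaCache:
    """Caches schemas by ID so the registry is queried once per schema."""

    def __init__(self, fetch):
        self._fetch = fetch  # hypothetical callable: schema_id -> schema
        self._schemas = {}

    def get(self, schema_id):
        if schema_id not in self._schemas:
            self._schemas[schema_id] = self._fetch(schema_id)
        return self._schemas[schema_id]
```

Caching by schema ID matters at this volume: schemas change rarely, so almost every message can be decoded without a round trip to the registry.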

In Golang, there is no built-in support for this kind of schema-registry-aware deserialization. While feature requests have been made for deserialization in packages like segmentio/kafka-go (the Kafka Go package used at Motive) and confluentinc/confluent-kafka-go (the Kafka Go package provided by Confluent), none have been implemented. After investigation, we discovered the srclient package, which facilitates programmatically retrieving AVRO/JSON_SR schemas from the schema registry (though it does not support PROTOBUF schema retrieval). Leveraging this, we successfully implemented our own AvroDeserializer, which is why we chose AVRO as the serialization format.

We opted for AVRO as the serialization format for storing change event messages in Kafka topics. Instead of setting a data retention time, we chose to enable log compaction for these topics. With log compaction, the key of a change event message is the primary key’s value at the event’s creation time. This allows us to always retain the last known value for each record in the corresponding topic, effectively maintaining a full snapshot of the latest table state in Kafka.

By utilizing log compaction, we gain redundancy and flexibility. In the event of rebuilding the synced table in Snowflake, we can simply replay the consumer from the topic’s start without needing to snapshot the table in PostgreSQL. This approach offers an additional layer of data resilience and adaptability. Figure 4 illustrates how log compaction works.

Figure 4

Image Source: http://cloudurable.com/blog/kafka-architecture-log-compaction/index.html
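The effect of log compaction on a topic can be modeled in a few lines of Python. This is a simplified sketch, not how Kafka implements it: real compaction runs in the background on older log segments and preserves offsets, but the end state it converges to is the same "latest value per key" view. Treating a null value as a tombstone that removes the key is standard Kafka behavior and is not specific to this pipeline.

```python
def compact(log):
    """Return the latest value per key, the state a compacted topic converges to.

    `log` is an iterable of (key, value) pairs in offset order.
    """
    latest = {}
    for key, value in log:
        if value is None:
            latest.pop(key, None)  # a tombstone (null value) removes the key
        else:
            latest[key] = value
    return latest


if __name__ == "__main__":
    events = [("1", "a"), ("2", "b"), ("1", "c")]
    print(compact(events))
```

Replaying a compacted topic from the beginning therefore yields at least the last known value of every row, which is what makes rebuilding a Snowflake table from Kafka possible.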

We have enabled log compaction for the topics managed by Debezium connectors. To ensure full synchronization of a PostgreSQL table to Snowflake, we must snapshot the table at least once, capturing the latest record values. By default, Debezium connectors use the “initial” snapshot.mode, which performs an initial scan of each table before processing the change data generated from the Write-Ahead Log (WAL). However, this approach has several limitations:

  1. Large tables can lead to time-consuming snapshot scans.
  2. WAL files can accumulate during a lengthy initial snapshot, consuming disk space and potentially impacting the PostgreSQL database.
  3. The “initial” mode does not snapshot tables that are added to the publication later, making it impossible to establish an initial state for these tables.

Fortunately, Debezium V1.9 introduced an ad hoc snapshot feature that addresses these limitations. By employing a signaling table, we can trigger ad hoc snapshots for any table whenever necessary. The snapshot is incremental: the table is read in chunks while live change events from the WAL continue to stream. Collisions between snapshotted data and live change events are resolved behind the scenes.

Given this feature, we decided to set the snapshot.mode to “never” for our connectors, fully relying on the signaling table to trigger incremental snapshots when:

  1. Starting a connector for the first time.
  2. Adding new tables to the publication used by a connector.
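Triggering an incremental snapshot comes down to inserting one row into the signaling table. The Python sketch below builds that insert as a parameterized statement. The `execute-snapshot` signal type and the `data-collections` payload follow Debezium's signaling format; the table name `public.debezium_signal` is a hypothetical placeholder for whatever signal.data.collection points to, and the `%s` placeholders assume a psycopg-style driver.

```python
import json
import uuid


def incremental_snapshot_signal(tables, signal_table="public.debezium_signal"):
    """Build a parameterized INSERT that asks Debezium for an incremental snapshot.

    `tables` is a list of fully qualified table names, e.g. ["public.users"].
    `signal_table` is a hypothetical name and must come from trusted config,
    since it is interpolated into the SQL text.
    """
    sql = "INSERT INTO %s (id, type, data) VALUES (%%s, %%s, %%s)" % signal_table
    data = json.dumps({"data-collections": tables, "type": "incremental"})
    # The id only needs to be unique; a random UUID is one simple choice.
    return sql, (str(uuid.uuid4()), "execute-snapshot", data)


if __name__ == "__main__":
    statement, params = incremental_snapshot_signal(["public.users"])
    print(statement)
    print(params)
```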

Determine the data type mappings from source to destination

Before delving into the implementation of our consumers, it’s crucial to discuss the foundation upon which they were built — the mapping of data types from the source system to the destination system. For instance, we need to determine how the INT type in PostgreSQL should be mapped to the corresponding data type in Snowflake.

These mappings depend largely on downstream use cases. In rare scenarios, we might opt to map all data types in PostgreSQL to VARCHAR in Snowflake. This approach syncs the literal values into Snowflake as VARCHAR, and works as long as downstream users do not require specific data types.

While PostgreSQL data types may not always have a direct counterpart in Snowflake, our objective is to map them as reasonably as possible. This can be a complex task, considering the data passes through multiple intermediate systems & processes, and the types might be represented differently in each of them. Figure 5 shows the data flow in our case.

Figure 5

Let’s begin with a simple example: mapping the PostgreSQL INT type to the Snowflake NUMBER type. In this case, Snowflake considers INT, INTEGER, BIGINT, SMALLINT, TINYINT, and BYTEINT as synonymous with NUMBER. The process follows these steps:

  1. The Debezium connector maps the PostgreSQL INT type to the INT32 literal type and serializes it as the AVRO int type (a 32-bit signed integer).
  2. In the Golang consumer, upon message consumption and deserialization, the AVRO int value is converted to the Golang int32 type.
  3. Finally, the Golang consumer stores the value into Snowflake as the NUMBER type, without requiring any further conversion.

Let’s explore another example that is more complex — mapping the JSON/JSONB PostgreSQL type to the closest analogy in Snowflake, the VARIANT type. The process involves several steps:

  1. The Debezium connector maps the JSON/JSONB PostgreSQL type to the STRING literal type and serializes it to AVRO format as a string AVRO type.
  2. In the Golang consumer, upon message consumption and deserialization, the string AVRO type is converted to the string Golang type.

Now arises a question — why should we store the string as a VARIANT in Snowflake, when the closest analogy in Snowflake to a string in Golang is VARCHAR?

The answer lies in the additional meta-information provided by Debezium in the message schemas (AVRO schemas, in our case). These extra details are known as “Debezium semantic types”. You can find more information about them in the Data type mappings section of the Debezium PostgreSQL connector documentation. For the JSON/JSONB PostgreSQL type, the corresponding field in the message’s AVRO schema would look like this:

{
  "default": null,
  "name": "dummy_column_name",
  "type": [
    "null",
    {
      "connect.name": "io.debezium.data.Json",
      "connect.version": 1,
      "type": "string"
    }
  ]
},

Pay attention to the “connect.name” field, whose value is “io.debezium.data.Json”. This is the Debezium semantic type for the JSON/JSONB PostgreSQL type. Armed with this meta-information, we can deduce that the value should be stored as a Snowflake VARIANT.
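The lookup described here, where the semantic type is checked first and the plain AVRO type is the fallback, can be sketched in Python as follows. Only the `io.debezium.data.Json` to VARIANT and `int` to NUMBER mappings come from this post; the remaining entries in `AVRO_TYPES` are illustrative assumptions, and the function name is hypothetical.

```python
# Semantic types take precedence over the underlying AVRO type.
SEMANTIC_TYPES = {
    "io.debezium.data.Json": "VARIANT",
}

# Fallback by plain AVRO type; entries other than "int" are assumptions.
AVRO_TYPES = {
    "int": "NUMBER",
    "long": "NUMBER",
    "string": "VARCHAR",
    "boolean": "BOOLEAN",
}


def snowflake_type(field):
    """Pick a Snowflake type for one field of a change event's AVRO schema."""
    avro_type = field["type"]
    # Nullable columns are a union such as ["null", {...}]; take the non-null branch.
    if isinstance(avro_type, list):
        avro_type = next(t for t in avro_type if t != "null")
    if isinstance(avro_type, dict):
        semantic = avro_type.get("connect.name")
        if semantic in SEMANTIC_TYPES:
            return SEMANTIC_TYPES[semantic]
        avro_type = avro_type["type"]
    return AVRO_TYPES[avro_type]
```

A column whose AVRO type is a plain string maps to VARCHAR, while the same string carrying the JSON semantic type maps to VARIANT.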

Another interesting challenge was handling PostgreSQL TOAST values for variable-length types such as varchar, text, json/jsonb, bytea, and arrays. When a TOASTed column is not changed by an update, Debezium places the placeholder “__debezium_unavailable_value” in the update event instead of the real value. This breaks for array types like varchar[]: the AVRO type for a PostgreSQL array column is an AVRO array, so the string placeholder cannot be serialized into it. Consequently, the connector fails to generate and send the change event messages to the corresponding topic.

To overcome this problem, we resolved to set the REPLICA IDENTITY to FULL for PostgreSQL tables with columns of these types. This allows us to bypass the serialization issue and ensure the successful generation and transmission of change event messages.

There are other PostgreSQL types with corresponding Debezium semantic types, and we won’t cover them individually. You are responsible for determining how the types will be mapped from PostgreSQL to Snowflake. Once you have decided on the type mappings, create a table similar to the following (part of the table we compiled):

This table serves as documentation for communication with downstream users and as a reference document when implementing consumer logic to parse/process change event messages based on the combination of Debezium semantic types and AVRO types.

Combine stream processing with batch processing

Having covered the foundation for implementing our consumers, let’s explore how they function. Before diving into the details, let’s examine a deserialized sample change event message to understand its structure:

key:

{
"id": 1
}

value:

{
"before":null,
"after":{
"training_db_complex_avro_3.public.data_sets.Value":{
"id":1,
"name":{
"string":"test1"
},
"username":{
"string":"john"
},
"created_at":{
"long":1621962657714832
},
"comment":{
"string":"test2"
},
"test_column":{
"string":"hello"
},
"test_column_2":{
"string":"world"
}
}
},
"op":"c"
}

The “op” field indicates the operation (“c”: create event, “u”: update event, “d”: delete event, “r”: read event). The “before” and “after” fields hold the record values before and after the operation. We are interested only in the “after” values, as they represent the actual record values after the operation.
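As an illustration, the sample message above can be flattened into a plain row with a few lines of Python. This is a sketch under assumptions: the function name is hypothetical, the `_deleted` flag anticipates the soft-delete column described later in this section, and the unwrapping rule treats any single-key dict as a nullable-column wrapper, which would need refinement for map or record columns.

```python
def to_row(key, value):
    """Flatten a deserialized change event into a row for the destination table."""
    if value["op"] == "d" or value.get("after") is None:
        # Deletes carry no "after" image; keep the key and mark the row deleted.
        row = dict(key)
        row["_deleted"] = True
        return row
    # "after" is a union wrapper: {"<record type name>": {...columns...}}
    (columns,) = value["after"].values()
    row = {}
    for name, wrapped in columns.items():
        if isinstance(wrapped, dict) and len(wrapped) == 1:
            # Nullable columns are wrapped as {"<avro type>": value}
            (wrapped,) = wrapped.values()
        row[name] = wrapped
    row["_deleted"] = False
    return row
```

Applied to the sample, this yields a row with `id` 1, `name` "test1", `username` "john", and so on, with `_deleted` set to False.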

Translating this change event message into an insert/update/delete DML query in Snowflake seems like a simple solution, but it works efficiently only for tables in PostgreSQL with a low rate of insert/delete/update operations. When dealing with tables that receive hundreds of such operations per second, executing DML queries one by one can lead to a significant lag between the consumer’s processing rate and the production rate of change event messages. This approach doesn’t scale well when dealing with several billion change event messages per month.

Instead of individually processing each change event message, we implemented our consumers to work in batches using Snowflake bulk data loading and Snowflake merge operations. This approach allows for more efficient handling of large amounts of data. Figure 6 illustrates how our consumer operates.

Figure 6

The consumer handles records deleted in PostgreSQL through soft deletion in Snowflake, setting an additional internal column called _deleted to True. From the consumer’s point of view, insert/update/delete operations in PostgreSQL all become insert/update operations, which allows us to use Snowflake MERGE operations.

The consumer maintains an internal buffer for change event messages from its corresponding Kafka topic, processing the messages when either the configured buffer size (batch_size) or the configured maximum wait time interval between each batch (batch_timeout) is reached.
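A buffer with these two flush conditions might look like the Python sketch below. The class and method names are hypothetical, and one detail is an assumption: the timeout here is measured from the first message buffered in the current batch, which is only one reasonable reading of "maximum wait time between batches". The clock is injectable so the behavior can be tested without sleeping.

```python
import time


class BatchBuffer:
    """Buffers messages until batch_size or batch_timeout (seconds) is reached."""

    def __init__(self, batch_size, batch_timeout, clock=time.monotonic):
        self.batch_size = batch_size
        self.batch_timeout = batch_timeout
        self._clock = clock
        self._messages = []
        self._started = clock()

    def add(self, message):
        if not self._messages:
            # Assumption: the timeout counts from the first buffered message.
            self._started = self._clock()
        self._messages.append(message)

    def ready(self):
        if not self._messages:
            return False
        full = len(self._messages) >= self.batch_size
        expired = self._clock() - self._started >= self.batch_timeout
        return full or expired

    def drain(self):
        """Return the buffered messages and start a new, empty batch."""
        batch, self._messages = self._messages, []
        return batch
```

A large batch_size with a short batch_timeout suits busy tables, since batches fill quickly and quiet periods still flush promptly.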

When processing change event messages in the buffer:

  1. The consumer deduplicates messages by key, keeping only the latest message for each key. This reduces data volume and avoids nondeterministic results in the MERGE operation, which can occur when several source rows match the same target row.
  2. The consumer parses each message and stores its values as a row in a local CSV file.
  3. The CSV file is loaded into a Snowflake internal stage using a PUT operation.
  4. The consumer checks for schema evolution and updates the destination table schema if necessary.
  5. A transient table is created in Snowflake, and the CSV file is loaded into it using the COPY statement.
  6. The consumer performs a MERGE operation to upsert the data from the transient table into the destination table.
  7. The offset of the last message in the batch is committed if all the previous steps are successful.
  8. If any of the previous steps fails, the consumer will fail and restart from the last committed offset, effectively re-processing the failed batch. This approach can help resolve any retryable errors, such as a timeout of the MERGE operation due to warehouse constraints. For non-retryable errors, like a consumer bug causing a step to fail, this ensures that the consumer won’t skip the failed batch. Engineers can redeploy the consumer after fixing the bug.
  9. The CSV file is removed from the internal stage, and the transient table is dropped, regardless of the batch’s success. This ensures that in the failure scenario the consumer can reprocess the last failed batch from a clean state, while also preventing additional storage costs for intermediate tables/files.
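Two of the steps above, deduplication (step 1) and the final MERGE (step 6), can be sketched in Python as follows. This is an illustration rather than the production consumer: the function names are hypothetical, the generated SQL assumes a single-column primary key, and identifiers are interpolated without quoting, so they must come from trusted configuration.

```python
def deduplicate(batch):
    """Keep only the last row per primary key.

    `batch` is a list of (key, row) pairs in offset order.
    """
    latest = {}
    for key, row in batch:
        latest[key] = row  # later offsets overwrite earlier ones
    return list(latest.values())


def merge_sql(target, staging, key, columns):
    """Build a MERGE that upserts rows from the staging table into the target."""
    updates = ", ".join("t.%s = s.%s" % (c, c) for c in columns if c != key)
    names = ", ".join(columns)
    values = ", ".join("s.%s" % c for c in columns)
    return (
        "MERGE INTO %s t USING %s s ON t.%s = s.%s "
        "WHEN MATCHED THEN UPDATE SET %s "
        "WHEN NOT MATCHED THEN INSERT (%s) VALUES (%s)"
    ) % (target, staging, key, key, updates, names, values)


if __name__ == "__main__":
    print(merge_sql("users", "users_tmp", "id", ["id", "name", "_deleted"]))
```

Because the batch is deduplicated first, every key in the staging table matches at most one target row, which is the condition the MERGE needs to be deterministic.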

One significant benefit of this approach is the idempotency of processing change event messages. Since consumers use MERGE (upsert) operations, the type of the change event messages (create, update, delete, read) becomes inconsequential. Only the values of the change events are relevant, ensuring that duplicate messages or reprocessing do not introduce inconsistencies in the destination tables.
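The idempotency claim is easy to check on a toy model. In the Python sketch below, a dict keyed by primary key stands in for the Snowflake table and `apply_batch` (a hypothetical name) stands in for the MERGE. Applying the same batch once or twice leaves the table in the same state.

```python
def apply_batch(table, rows, key="id"):
    """Upsert rows into an in-memory 'table' keyed by primary key."""
    for row in rows:
        existing = table.get(row[key], {})
        merged = dict(existing)
        merged.update(row)  # matched: update columns; not matched: insert
        table[row[key]] = merged
    return table


if __name__ == "__main__":
    batch = [
        {"id": 1, "name": "test1", "_deleted": False},
        {"id": 2, "_deleted": True},
    ]
    once = apply_batch({}, batch)
    twice = apply_batch(apply_batch({}, batch), batch)
    print(once == twice)
```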

Additionally, the consumer is designed with configurable parameters such as “batch_size,” “batch_timeout,” and “snowflake_warehouse,” allowing fine-tuning based on the traffic of the corresponding PostgreSQL table. This setup has proven effective in production.

Interestingly, our consumer operates with a mix of stream and batch processing. Although Kafka and Kafka’s consumers are typically associated with continuous stream processing in near-real time, our system incorporates batch processing under the hood. Similar patterns of combining stream and batch processing can be observed in other Kafka ecosystems, such as the Kafka Snowflake connector and Kafka DynamoDB connector.

Manage consumers in separate consumer groups and deployments

Figure 7

Figure 7 illustrates how our consumers are deployed in Kubernetes. Each consumer runs in a pod managed by its own deployment in our Kubernetes cluster and belongs to its own consumer group. This approach provides flexibility to control the sync process on a per-table basis. We can start or stop a specific consumer without affecting others, as they are managed independently in separate deployments and consumer groups. This proves useful when replaying change event messages for a table from a specific point in time.

The steps are as follows:

  1. Stop the current consumer of the topic by deleting the corresponding deployment.
  2. Set the last committed offset of the consumer group for the topic to the desired offset.
  3. Start the consumer by deploying the deployment again. The consumer will begin from the offset set in step 2.
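One way to script these three steps is with the standard `kubectl` and `kafka-consumer-groups` command-line tools. The Python sketch below only builds the command lines and leaves running them to the caller. All names passed in (deployment, manifest path, consumer group, topic, bootstrap server, client properties file) are hypothetical, and this is not necessarily the tooling used at Motive.

```python
def replay_commands(deployment, manifest, group, topic, offset, bootstrap, client_config):
    """Return the command lines for stopping, rewinding and restarting a consumer."""
    return [
        # Step 1: stop the consumer; offsets can only be reset for an inactive group.
        ["kubectl", "delete", "deployment", deployment],
        # Step 2: move the group's committed offset for the topic.
        [
            "kafka-consumer-groups",
            "--bootstrap-server", bootstrap,
            "--command-config", client_config,
            "--group", group,
            "--topic", topic,
            "--reset-offsets",
            "--to-offset", str(offset),
            "--execute",
        ],
        # Step 3: start the consumer again from the new offset.
        ["kubectl", "apply", "-f", manifest],
    ]


if __name__ == "__main__":
    for cmd in replay_commands(
        "users-consumer", "users-consumer.yaml", "users-consumer-group",
        "server.public.users", 1200, "broker:9092", "client.properties",
    ):
        print(" ".join(cmd))
```

Note that `--to-offset` applies the same offset to every partition of the topic; a single partition can be targeted with the `topic:partition` form.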

Another significant benefit of this design is the ability to set or adjust resource requests for each consumer independently. Since our PostgreSQL tables have varying traffic patterns and average record sizes, different “batch_size” and “batch_timeout” values are required for each consumer. As a result, each consumer needs different resource requests in terms of memory and CPU. By having separate deployments, we can define specific resource requests for each consumer.

While this approach offers substantial advantages, given the considerable number of tables we need to sync, manually setting and adjusting resource requests for a large number of deployments can be labor-intensive. To streamline this process, we adopted the VerticalPodAutoscaler (VPA), which automatically adjusts resource requests for pods based on their usage patterns. By setting up a VPA for each deployment, we significantly reduced the manual effort needed to manage resource requests.
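With one deployment per table, the VPA objects are repetitive enough to generate. The Python sketch below produces a minimal VerticalPodAutoscaler manifest for a given deployment. The `-vpa` naming convention, the namespace default, and the "Auto" update mode are assumptions made for illustration; the post does not say which update mode is used.

```python
import json


def vpa_manifest(deployment, namespace="default"):
    """Build a minimal VerticalPodAutoscaler manifest targeting one deployment."""
    return {
        "apiVersion": "autoscaling.k8s.io/v1",
        "kind": "VerticalPodAutoscaler",
        "metadata": {"name": deployment + "-vpa", "namespace": namespace},
        "spec": {
            "targetRef": {
                "apiVersion": "apps/v1",
                "kind": "Deployment",
                "name": deployment,
            },
            # "Auto" lets the VPA apply its recommendations; "Off" only reports them.
            "updatePolicy": {"updateMode": "Auto"},
        },
    }


if __name__ == "__main__":
    # kubectl accepts JSON manifests as well as YAML.
    print(json.dumps(vpa_manifest("users-consumer"), indent=2))
```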

Monitoring and alerting are essential

One of the significant improvements in the project was the enhanced visibility of the sync process. We utilize Datadog to collect metrics and traces from various systems and services. Our consumers are instrumented to send a wide range of metrics to Datadog, including the crucial consumer lag, which precisely indicates the sync lag of each table. Other vital metrics are also tracked, such as messages produced and consumed for each topic, batch size, and processing times for each batch. The dashboard we designed presents all these essential metrics, allowing us to filter by table name for specific insights. Figure 8 shows the dashboard with important metrics.

Figure 8

We also configured alerts for key metrics. For instance, if the consumer lag of a table abnormally increases over time, we receive an alert to take prompt action. The other metrics in the dashboard help us investigate the factors contributing to the consumer lag increase. A healthy consumer lag should oscillate up and down as the consumer processes messages batch by batch (Figure 9).

Figure 9

However, in cases where the consumer lag of a table continuously increases (Figure 10), it might be due to heavy updates on the PostgreSQL table, like a backfill operation, leading to higher traffic. This can be verified by examining the messages produced/consumed view (Figure 11).

Figure 10
Figure 11
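Consumer lag and the "continuously increasing" condition can be expressed precisely. In the Python sketch below, lag is the sum over partitions of the latest offset minus the committed offset, and a simple detector flags a table whose last few lag samples are strictly increasing. The function names and the window of five samples are illustrative choices, not the alert rule configured in Datadog.

```python
def consumer_lag(end_offsets, committed):
    """Total lag across partitions: latest offset minus committed offset.

    Both arguments map partition number -> offset.
    """
    return sum(max(end - committed.get(p, 0), 0) for p, end in end_offsets.items())


def keeps_growing(samples, window=5):
    """True if the last `window` lag samples are strictly increasing."""
    recent = samples[-window:]
    if len(recent) < window:
        return False
    return all(a < b for a, b in zip(recent, recent[1:]))


if __name__ == "__main__":
    healthy = [400, 50, 380, 20, 410, 60, 390]    # oscillates batch by batch
    backlog = [400, 900, 1500, 2300, 3100, 4200]  # keeps climbing
    print(keeps_growing(healthy), keeps_growing(backlog))
```

A detector like this ignores the normal sawtooth of a healthy consumer and fires only on a sustained climb.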

Another possible cause could be an overloaded Snowflake warehouse, resulting in longer processing times for each batch. We can check the Batch Processing Time view (measured in milliseconds) to confirm this assumption (Figure 12).

Figure 12

Having all these metrics available in one place provides us with clear visibility of the sync progress for each table. It simplifies the investigation of potential causes of abnormal sync lag increases.

How we improved the sync for partitioned tables

Lastly, let’s discuss an additional benefit of the Debezium CDC pipeline project. Our main application database includes several partitioned tables, which are among the largest. In logical replication, the partitions of a partitioned table are treated as separate tables. Previously, when using the Fivetran PostgreSQL connector to sync these tables into Snowflake, they were synced as separate Snowflake tables. Figure 13 shows the partitions synced separately.

Figure 13

To improve the user experience in Snowflake, we set up Snowflake streams (Snowflake’s version of CDC streams) for these partitions. We created Snowflake tasks (self-implemented PartitionMerge tasks) that read changes from the streams and issue MERGE statements to merge the changes into a parent table. This allowed users to query the parent table directly without worrying about which partitions to query.

However, this setup had some drawbacks:

  1. Lack of monitoring and alerting for streams and PartitionMerge tasks, making it challenging to detect failures or stale streams and monitor sync progress.
  2. It effectively doubled storage and computation resources, as data was first synced to partition tables in Snowflake by the Fivetran PostgreSQL connectors, then synced to the parent table via streams and PartitionMerge tasks, resulting in data being stored and written twice.

With the Debezium CDC pipelines, we have full control over our consumers. We improved the process by having the consumers of each partition directly execute MERGE statements against the parent table (Figure 14). This small change eliminated the need for PartitionMerge tasks, enhanced reliability and visibility of the sync process, and even reduced costs.

Figure 14
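The routing change amounts to mapping each partition's topic to the parent table instead of a per-partition table. A Python sketch of that mapping is shown below. Debezium topic names end in the table name by default, but the partition suffix pattern `_p<digits>` and the function name are hypothetical; real partition naming will differ from one schema to another.

```python
import re

# Hypothetical partition naming, e.g. events_p202301 is a partition of events.
PARTITION_SUFFIX = re.compile(r"_p\d+$")


def destination_table(topic):
    """Map a topic such as server.public.events_p202301 to its MERGE target."""
    table = topic.rsplit(".", 1)[-1]
    # Partition topics collapse onto the parent; other tables map to themselves.
    return PARTITION_SUFFIX.sub("", table)


if __name__ == "__main__":
    print(destination_table("server.public.events_p202301"))
    print(destination_table("server.public.users"))
```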

What we achieved

This project involved extensive design, implementation, testing, and learning effort. It addressed the pain points we set out to fix: the data sync process is now more reliable and more visible, and we have more control over it end to end. Surprisingly, we also managed to save around 30% in costs, although cost saving was not the main objective. Storing change events in Kafka also opens up new potential use cases, such as populating caches and building search indexes. We will build upon these achievements and leverage our newfound capabilities to drive further innovation in our data platform.
