Implementing Change Data Capture in practice — Part 2

Miguel Gamallo
Published in Fever Engineering
10 min read · Oct 2, 2023
Photo by CHUTTERSNAP on Unsplash

In the previous entry of this series, we explored the motives that led us to implement a Change Data Capture (CDC) solution at Fever, and presented the solution we adopted. However, implementing it in practice was quite challenging, and I think it’s worth sharing some of the surprises that inevitably appear when bridging the gap between theory and practice.

In this post, we’ll go through some of those challenges, analyzing how well the solution fit our initial needs, as well as thinking about the interesting new opportunities that arose from this implementation.

Bringing the solution down to Earth

Implementing Change Data Capture was relatively easy, but along the way we found some challenges that we did not foresee. This section details what they were and how we solved them.

Limited availability of previous values

Let’s think about the use case of our calendar session selector, in which we aimed to replace the materialized view that aggregated the information of all the sessions of an experience with a regular table that is only updated incrementally. Each row in this table is identified by the combination (experience_id, local_date) and contains the number of available tickets. Each time a ticket is sold or canceled, or the availability is updated for any other reason, for any of the sessions on that date, the calendar table should be updated. So we start listening to changes in the session table to keep the calendar table up to date. So far so good.

However, as explained in the previous post, the CDC events only contain the data of an updated row after the change. So if we receive an event informing us that a session now has 0 available tickets, how do we update the availability for the day if we don’t know how many tickets that particular session was contributing to the total?

GIF of a confused Vincent Vega in Pulp Fiction
How do we update availability for the date with only the values after the change?

The fact is, we can’t. What we can do instead is add a nullable invalidated_at column to the table that is populated when we receive a change event for a session in a given experience and date, and then add a cron job that recalculates the values only for invalidated rows.

This works, but is not enough. What if a given session is rescheduled to another day? In this case, we can invalidate the destination date, but there is no way to decrease the availability from the origin date without the information of the previous state of the row. To include this information in the UPDATE and DELETE events, we need to tweak a PostgreSQL table setting called Replica Identity, which determines how much information is available for the logical decoding plugin, and in turn, which values are available from the previous state of the row. It has 4 possible values:

  • DEFAULT → Only the information of the previous state for the primary key fields, in case one of its values changed
  • NOTHING → No information about the previous value
  • FULL → Contains the previous values for all columns of the table
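  • USING INDEX index_name → Contains the previous values for the columns covered by the given index, which must be unique, not partial, not deferrable, and include only NOT NULL columns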

It would seem that the best option is to always use FULL to have all the information available, but this setting has an impact on the source database: it increases the amount of information written to the WAL, so it increases the WAL generation rate (and disk consumption). In our case, we already had an index created over the field of interest, so we used the INDEX option as a compromise.
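Once the previous state is available, the consumer can invalidate both the origin and the destination date of a rescheduled session. The sketch below assumes the standard Debezium envelope, where the change event carries a `before` and an `after` object; the column names and the function name are hypothetical.

```python
# Assumes the table was configured with something like:
#   ALTER TABLE session REPLICA IDENTITY USING INDEX <index_name>;
# (table name is hypothetical) so that `before` carries the indexed columns.


def keys_to_invalidate(event):
    """Return the (experience_id, local_date) pairs touched by a change event.

    `event` is the payload of a Debezium change event. `before` and `after`
    hold the row state on each side of the change; either may be None
    (for example, `before` on inserts and `after` on deletes).
    """
    keys = set()
    for state in (event.get("before"), event.get("after")):
        if not state:
            continue
        experience_id = state.get("experience_id")
        local_date = state.get("local_date")
        # Columns outside the replica identity may be missing or null in `before`.
        if experience_id is not None and local_date is not None:
            keys.add((experience_id, local_date))
    return keys


# A session moved from Oct 2 to Oct 3: both dates must be recalculated.
rescheduled = {
    "op": "u",
    "before": {"experience_id": 10, "local_date": "2023-10-02"},
    "after": {"id": 1, "experience_id": 10, "local_date": "2023-10-03"},
}
touched = keys_to_invalidate(rescheduled)
```

Using a set means that the common case, where the date did not change, produces a single key instead of two identical invalidations.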

PostgreSQL TOASTED fields

PostgreSQL uses a fixed page size (8 kB by default) and does not allow a row to span multiple pages. For big field values that would not fit, it uses a storage technique known as TOAST (The Oversized-Attribute Storage Technique) to move the value out of line into a separate table. This mechanism is designed to be transparent for the regular user (except for performance matters; check out this great post by Haki Benita), but it’s not transparent for the Debezium connector.

Diagram of how PostgreSQL TOAST mechanism stores big fields in an auxiliary table
TOAST stores big values out of line

When a row is changed, unchanged TOASTed values are not available to the connector unless they are part of the table’s replica identity. By default, those values appear in the new state of the row as the placeholder __debezium_unavailable_value.

This behavior surprised us: we commonly use TOAST-able types like text or jsonb, and we started seeing derived data systems full of fields with this value. Fortunately, the Debezium developers have published an excellent blog post with four solution proposals:

  • Adding the TOAST-able columns to the replica identity → By using FULL or adding them to the index
  • Excluding unchanged values → The event consumer looks for the text of the placeholder and simply ignores the field.
  • Database triggers → Register a trigger in the destination database for the TOAST-able columns so that if the value to insert matches the placeholder, the last existing value is kept instead
  • Stateful Stream Processing → Create a stream processing application that saves the last known state of the fields in a state storage and replaces the placeholders with the actual value found in the state.

In our case, we use different approaches depending on the derived system. We discarded adding the TOAST-able columns to the replica identity because it increases the load on the source database, which is core for us and already under heavy load. For the catalog replication, with another PostgreSQL as the sink database, we use a trigger. For the cdc-worker case (the custom Python consumer detailed in part 1 of this series, in charge of invalidating the incremental aggregation rows), we ignore the placeholders in code.
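Ignoring the placeholders in a Python consumer can be as small as the sketch below. It assumes string-typed columns and the default placeholder text; the function name and the example row are hypothetical.

```python
# Default placeholder emitted by Debezium for unchanged TOASTed values.
UNAVAILABLE = "__debezium_unavailable_value"


def drop_unavailable(after):
    """Return the row state without the fields that carry the placeholder.

    A field holding the placeholder was not modified by this change, so the
    consumer should keep whatever value it already knows for it.
    """
    return {name: value for name, value in after.items() if value != UNAVAILABLE}


row = {"id": 1, "title": "Candlelight", "description": UNAVAILABLE}
usable = drop_unavailable(row)
```

The trade-off is that the consumer can only trust a field when it is actually present, so any logic that needs the full row has to read the remaining values from its own storage.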

We’ve avoided stream processing so far because it would be our first use case for such a type of application and for now we can avoid this complexity. However, the system described in this article is a good foundation for stream processing services, so it’s a possibility worth exploring in the future.

Schema evolution

Changes in database schema for the tables streamed through CDC now require additional care. Traditionally, we need to make changes in our core application forward compatible, as we first apply the database changes and then deploy the application with the changes in the code. With CDC, we also need to make the derived data systems forward compatible before any changes in the source database.

As an example, to delete a field we need to prepare all the derived data systems to ignore the field first, but to still support receiving it in the payload as the migration will happen later. Since the Kafka records contain the full schema, the only change in the system will be that from a certain point in time, messages will no longer include the deleted field in their schema nor in their payload.
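On the consumer side, one way to get this kind of forward compatibility is to read only the fields the consumer needs and to tolerate anything else in the payload. This is a sketch with hypothetical field names, not the exact code of our consumers.

```python
# Fields this consumer actually uses (hypothetical names).
KNOWN_FIELDS = ("id", "experience_id", "local_date")


def project(after, known_fields=KNOWN_FIELDS):
    """Keep only the known fields of a row state.

    Extra fields (such as a column that is about to be deleted) are ignored,
    and missing fields become None instead of raising a KeyError.
    """
    return {name: after.get(name) for name in known_fields}


# The same consumer handles messages produced before and after the migration.
old_message = {"id": 1, "experience_id": 10, "local_date": "2023-10-02", "legacy_notes": "x"}
new_message = {"id": 1, "experience_id": 10, "local_date": "2023-10-02"}
```

Since both message shapes produce the same projected row, the field can be dropped from the source table at any point after this consumer is deployed.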

Certain migrations require a last cleanup step (to remove the forward compatibility code), but this step changes with respect to traditional data systems: since messages are not deleted from the bus when they are consumed, we may need to reprocess existing messages with the old schema even though no new ones are being produced. So we can’t do this cleanup immediately after the migration, but only once we’re sure that no messages with the old schema remain (in our case, retention is configured to 7 days).

Handling tombstones

A computer that is also a tombstone

When deleting a record in the source database, by default the Debezium connector for PostgreSQL emits two messages: the delete event and a tombstone message. This is done to allow removal of keys when using Kafka log compaction. The idea is that Kafka retains at least the last known value for each message key, so that storage can be optimized. However, when a record is deleted, ideally we would want to remove all previous changes instead of keeping a last known state of “deleted”. To do that, every DELETE event is followed by a message called a tombstone, which has the same key and a null value.
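The effect of a tombstone is easier to see with a toy model of compaction. The function below is a simplified simulation written for this post, not Kafka’s actual algorithm: real compaction runs in the background on closed log segments and keeps tombstones around for a configurable time before removing them.

```python
def compact(log):
    """Simulate log compaction over a list of (key, value) records.

    Only the latest value per key survives; a tombstone (value None)
    removes the key altogether.
    """
    latest = {}
    for key, value in log:
        if value is None:
            latest.pop(key, None)  # tombstone: forget everything about this key
        else:
            latest[key] = value
    return latest


log = [
    (1, {"op": "c"}),
    (1, {"op": "u"}),
    (2, {"op": "c"}),
    (1, {"op": "d"}),  # delete event for key 1
]
without_tombstone = compact(log)
with_tombstone = compact(log + [(1, None)])
```

Without the tombstone, key 1 survives compaction forever with its delete event as the last known value; with it, only key 2 remains.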

We’ve found that some sink connectors, like the Snowflake connector, don’t know how to handle these messages and throw an exception when they find one. It’s possible to configure the source connector to avoid emitting tombstones, but this would break the JDBC connectors that expect this kind of message to delete rows. Fortunately, the Kafka Connect framework offers a simple solution that still allows us to take advantage of log compaction: Single Message Transforms (SMTs).

SMTs are lightweight transformations that perform simple operations on individual records and are managed solely through connector configuration. The beauty of them is that they can be used for different types of connectors, providing considerable flexibility. For source connectors, they are applied before sending the message to the broker. For sink connectors, they are applied before passing the message to the connector code.

The way conditional SMTs work is explained in KIP-585. In essence, we use a plug-in for the predicate, which is a Java class that implements the Predicate interface to identify certain records. There is also a plug-in for the transformation, which implements the Transformation interface. The transformation then references the predicate to indicate which records it applies to. The relevant fragment of our Snowflake connector configuration is as follows:

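# Predicate that matches tombstones (records with a null value)
predicates=isNullRecord
predicates.isNullRecord.type=org.apache.kafka.connect.transforms.predicates.RecordIsTombstone
# Filter transformation that drops every record matching the predicate
transforms=dropNullRecords
transforms.dropNullRecords.type=org.apache.kafka.connect.transforms.Filter
transforms.dropNullRecords.predicate=isNullRecord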

We define a single predicate called isNullRecord and a single transformation called dropNullRecords. The names are arbitrary, and only used to reference them in other configuration attributes. With the type property, we indicate which plug-in to use for each case. Finally, in the last line we specify the predicate to which the transformation should apply. Since both plug-ins are built-in, as can be seen from the FQNs (org.apache.kafka.connect), we didn’t need to modify the connector file. It’s also possible to implement custom transformations by implementing the aforementioned interfaces and placing the JAR files into the connector bundle.

Did the result match the initial needs?

Now that we have finished the implementation of the CDC system, it’s worth taking a step back to check whether the solution met all the requirements we had.

Materialized views scalability

The total database execution time of the query to calculate our calendar view went from 7.05 minutes per hour down to 79.24 seconds per hour. That’s a reduction of roughly 81% in database resource consumption for this view.
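For reference, the reduction follows directly from the two measurements once both are expressed in seconds. The helper name below is ours, introduced only for this calculation.

```python
def reduction_ratio(before_seconds, after_seconds):
    """Fraction of execution time saved when going from `before` to `after`."""
    return (before_seconds - after_seconds) / before_seconds


before_seconds = 7.05 * 60  # 7.05 minutes per hour, in seconds
after_seconds = 79.24       # seconds per hour
ratio = reduction_ratio(before_seconds, after_seconds)
print(f"{ratio:.0%}")  # prints 81%
```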

We also extended the use of this solution to another materialized view that aggregates information for an experience, such as minimum and maximum price and available venues, achieving similar improvements.

Real time data

At the time of writing, the real time data solution is still undergoing validation and is not yet in production. However, early results are encouraging: refresh time for the most important sales tables went down from over 10 minutes to 70 seconds.

Stop polling for derived data systems

Our first service whose database is populated solely by Change Data Capture is already in production. The information is near real time, with a delay of around 1 minute, since this derived system needs to re-process the replica tables and does so with a cron job that runs every minute.

But the biggest advantage here is that this service has zero impact on the source application, to the point that it is not even aware of its existence.

Where to go from here?

With the introduction of Change Data Capture as a solution for our needs, new opportunities are now within closer reach:

  • Stream processing applications → Since we are sending our database changes through Kafka topics, we could use these topics to build applications that are well suited to stream processing, such as anomaly detection. We started playing around with ksqlDB and it seems worth trying once we have a use case for stream processing.
  • Alternative storage systems → For our current use cases, both the origin and destination are relational databases. However, CDC has decoupled this and we could easily migrate the read views to any other read-optimized storage system such as Elasticsearch or Redis.
  • More systems of record → Currently, we have a single Debezium connector publishing data from our core database. We could extend this solution to other databases, as our core application currently polls data from other services using HTTP requests.
  • More efficient serialization → We could serialize records in a binary format such as Avro and store the schemas in an external schema registry, so that messages only include a reference to the schema version instead of the full schema. This would considerably decrease the disk space usage of our Kafka brokers.

Aside from this, there are important considerations such as monitoring and performance that have not been covered in this post. There is a lot to say about them, so they deserve their own article.

Conclusion

Implementing a Change Data Capture system at Fever has been a really interesting journey. We encountered many challenges and learned a lot along the way, and it ultimately became a success story. If you plan to implement such a system, be aware that it’s no simple task. This post is written in the hope that it will be as useful to someone out there as it would have been to us when we began to consider this idea.

We’re hiring!

We’re always on the lookout for the best talent out there. If you found the content of these articles interesting, please take a look at our Careers Page and join the party!
