Handling Upstream Data Changes Via Change Data Capture
Anyone who has managed a data pipeline knows how upstream data can change and how those changes can impact the entire pipeline. Imagine we have the following customer data in our datalake.
+---+-----+
| id| name|
+---+-----+
|id1|Alice|
|id2| Bob|
+---+-----+
One month later, we realized that the customer name had changed from Alice to Carol and that we had been using incorrect data for the past month. Such data inaccuracies can impact our data analysis and machine learning models. So, how can we detect such changes, and how can we automate handling them?
Fortunately, with enough information, it is easy to update the data without over-complicating our existing pipeline. We need two components:
- Change Event Generator — to generate the create/ update/ delete events of our data
- Change Event Resolver — to apply these changes on our existing data
The need for the Change Event Resolver is debatable if we can afford to live on the bleeding edge by using Apache Hudi or Delta Lake. Instead of resolving the changes and keeping a single copy of the data, Hudi and Delta Lake can persist each revision of our data (the timeline in Hudi and time travel in Delta Lake). This is undoubtedly more powerful, but there are scenarios where these components will simply not fit into existing pipelines, especially because they require the data to be stored in their own hudi or delta format instead of the common parquet or avro format. This means our data cataloging, data monitoring and data visualization services now need to understand hudi or delta, and that support may not be ready within a year, especially if these services are not developed in-house.
A model to capture the change events
Let’s try to model a class that we can use to capture the changes.
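The original class definition is not reproduced here, but a minimal Scala sketch based on the attributes discussed below might look like this (the field types are assumptions for illustration):

```scala
// A simplified change event model; field types are illustrative assumptions
case class ChangeEvent(
  changeType: String,          // insert / update / delete
  timestamp: Long,             // when the change happened, used for ordering
  columnNames: Seq[String],    // column names of the new data (insert / update)
  columnValues: Seq[String],   // column values of the new data (insert / update)
  oldKeyNames: Seq[String],    // primary key column names of the old data (update / delete)
  oldKeyValues: Seq[String]    // primary key values of the old data (update / delete)
)
```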
Most of the attributes are self-explanatory. oldKeyNames and oldKeyValues contain the primary key names and values of the old data, which are used for UPDATE and DELETE queries. We can also enrich the model with more attributes such as columnTypes and oldKeyTypes if we need to apply the changes differently based on the column data types.
This is commonly known as change data capture (CDC) and is supported by many databases, including PostgreSQL, MySQL and MongoDB. In fact, our ChangeEvent class is a simplified version of the output of wal2json, a PostgreSQL plugin used for logical replication.
Pros & Cons
There are 2 main advantages of using CDC:
- Although the primary purpose of this feature is to create database replicas and to migrate data, it becomes very powerful in data pipelines since the change is available in the pipeline almost immediately — making a real time pipeline possible.
- The upstream does not need to implement the notification logic in several places. To give an example, think of a movie review website. The pipeline needs to be informed when a user creates, updates or deletes their reviews, which means it needs to be notified from three different REST APIs. All it takes is one developer forgetting to notify the pipeline, and we will be losing data without anyone realising it.
The main drawback is a slight performance degradation on the source database due to replication, although the impact will differ based on the type of database.
Questions, questions, questions
Now, after hearing this proposal, we might want to ask ourselves some questions.
- What if the data does not have the concept of a primary key (i.e. no oldKeyNames or oldKeyValues)?
- What if the primary key changes?
- What if I add a column or rename a column?
- What if I remove a column?
- What if a column changes its data type?
Unfortunately, if our data does not have a primary key, CDC will not work at all (unless we are only interested in INSERTs and do not care about UPDATEs or DELETEs). In this case, the best option is to add a UUID or a sequence column to be used as the primary key. Primary key changes rarely happen to properly designed data models, but in the unavoidable case, it is best to treat the post-change data as a new data model and take a fresh snapshot of the data.
The rest of the questions can be addressed within our code when we apply the change events to our existing data but these will not be covered in this post.
Now, let’s see how it works with some examples. I am using Apache Spark 2.4 in this example, but the same logic can be reimplemented in other frameworks as well.
Mocking the existing data
First, we are going to mock some data that already exists in our datalake. Notice that the timestamp column is present in the data previously written to the datalake, since we need to apply the change events based on this column.
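A minimal sketch of how this existing data could be mocked in a Spark shell (the session setup is shown for completeness; in a real pipeline this data would already sit in the datalake):

```scala
import org.apache.spark.sql.SparkSession

// Local session for the examples; in practice this data is read from the datalake
val spark = SparkSession.builder().appName("cdc-example").master("local[*]").getOrCreate()
import spark.implicits._

val datalakeDf = Seq(
  (0L, "id1", "Alice"),
  (0L, "id2", "Bob")
).toDF("timestamp", "id", "name")

datalakeDf.show()
```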
+---------+---+-----+
|timestamp| id| name|
+---------+---+-----+
| 0|id1|Alice|
| 0|id2| Bob|
+---------+---+-----+
Change Event Generator
Next is to populate the change events that we want to apply to this data, which our Change Event Generator would produce. The implementation of this generator is source specific, so we won't be covering it in this post.
Note that oldKeyNames and oldKeyValues are not present for insert. Similarly, columnNames and columnValues are not required for delete.
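Since the generator itself is out of scope, here is a hypothetical mock of its output, built to match the events shown below (continuing the Spark session from the previous snippet):

```scala
// Mocked generator output; a real Change Event Generator would produce these
// events from a CDC source such as wal2json
val changeEventsDf = Seq(
  ("update", 1L, Seq("id", "name"), Seq("id1", "Angela"), Seq("id"), Seq("id1")),
  ("delete", 2L, null, null, Seq("id"), Seq("id2")),
  ("insert", 3L, Seq("id", "name"), Seq("id2", "Carol"), null, null)
).toDF("changeType", "timestamp", "columnNames", "columnValues", "oldKeyNames", "oldKeyValues")

changeEventsDf.show()
```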
+----------+---------+-----------+-------------+-----------+------------+
|changeType|timestamp|columnNames| columnValues|oldKeyNames|oldKeyValues|
+----------+---------+-----------+-------------+-----------+------------+
|    update|        1| [id, name]|[id1, Angela]|       [id]|       [id1]|
|    delete|        2|       null|         null|       [id]|       [id2]|
|    insert|        3| [id, name]| [id2, Carol]|       null|        null|
+----------+---------+-----------+-------------+-----------+------------+
Let’s split the columnNames and columnValues columns so that we have a dataframe similar to our existing data. But we will keep oldKeyNames and oldKeyValues since we still need to use them.
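One possible way to do the splitting, assuming we know the data columns of this table up front (map_from_arrays is available from Spark 2.4):

```scala
import org.apache.spark.sql.functions._

// Zip the parallel arrays into a map, then pull each known data column out of it;
// for delete events the map is null, so id and name resolve to null
val dataColumns = Seq("id", "name")
val fixedColumns = Seq("changeType", "timestamp", "oldKeyNames", "oldKeyValues").map(col)
val selectedColumns = fixedColumns ++ dataColumns.map(c => col("columnMap").getItem(c).as(c))

val flattenedCeDf = changeEventsDf
  .withColumn("columnMap", map_from_arrays($"columnNames", $"columnValues"))
  .select(selectedColumns: _*)

flattenedCeDf.show()
```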
+----------+---------+-----------+------------+----+------+
|changeType|timestamp|oldKeyNames|oldKeyValues| id| name|
+----------+---------+-----------+------------+----+------+
| update| 1| [id]| [id1]| id1|Angela|
| delete| 2| [id]| [id2]|null| null|
| insert| 3| null| null| id2| Carol|
+----------+---------+-----------+------------+----+------+
According to these change events, we should apply the changes in this order.
- update name of id1 from Alice to Angela
- delete id2
- insert id2 again with name = Carol
Change Event Resolver
Now it’s time to create our second component that applies the change events generated by our Change Event Generator. First, let's create a couple of helper functions:
- unionWithSchema: union dataframes while accounting for schema differences such as different column orders or mismatched columns, e.g. df1 has two columns, colA and colB, but df2 only has colA.
- applyChangeEventsByTimestamp: apply change events chronologically.
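The original implementations are not reproduced here; a minimal sketch of both helpers, assuming every dataframe involved carries a timestamp column, could look like this:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, lit, row_number}

// Union dataframes even when their columns differ in order or presence;
// columns missing from a dataframe are filled with typed nulls
def unionWithSchema(dfs: DataFrame*): DataFrame = {
  val allFields = dfs.flatMap(_.schema.fields)
    .groupBy(_.name).mapValues(_.head).values.toSeq
    .sortBy(_.name)
  dfs.map { df =>
    val columns = allFields.map { f =>
      if (df.columns.contains(f.name)) col(f.name)
      else lit(null).cast(f.dataType).as(f.name)
    }
    df.select(columns: _*)
  }.reduce(_ union _)
}

// Keep only the latest change event per primary key, ordered by timestamp
def applyChangeEventsByTimestamp(df: DataFrame, keyColumns: Seq[String]): DataFrame = {
  val window = Window.partitionBy(keyColumns.map(col): _*).orderBy(col("timestamp").desc)
  df.withColumn("row_num", row_number().over(window))
    .filter(col("row_num") === 1)
    .drop("row_num")
}
```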
Resolving change events
First, we will handle the most common scenario: new records getting added. We do not need to use the timestamp column at this point. Notice that there are two records for id2 since the delete change event has not been applied yet.
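A sketch of this step, reusing the union helper above (insertCeDf is a name introduced here for illustration):

```scala
// Only insert events are appended at this stage; update/delete are resolved later
val insertCeDf = flattenedCeDf
  .filter($"changeType" === "insert")
  .drop("oldKeyNames", "oldKeyValues")

val datalakeAndInsertDf = unionWithSchema(datalakeDf, insertCeDf)
datalakeAndInsertDf.show()
```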
+----------+---+-----+---------+
|changeType| id| name|timestamp|
+----------+---+-----+---------+
| insert|id2|Carol| 3|
| null|id1|Alice| 0|
| null|id2| Bob| 0|
+----------+---+-----+---------+
Now, let’s handle updating and deleting records. In our example, there is only one unique primary key combination, so the code inside the foldLeft will be executed only once.
Let’s go through each step and visualize the output.
Step 1: Enrich datalakeAndInsertDf to include information about the existing primary keys. In this example, it would be old_id.
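A sketch of this enrichment (keyNames is introduced here for illustration; it holds the primary key combination handled in this iteration):

```scala
// Copy each primary key value into an old_<key> column so that existing and
// inserted rows line up with the update/delete events
val keyNames = Seq("id")
val datalakeAndInsertCeDf = keyNames.foldLeft(datalakeAndInsertDf) { (df, k) =>
  df.withColumn(s"old_$k", col(k))
}
```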
scala> datalakeAndInsertCeDf.show
+----------+---+-----+---------+------+
|changeType| id| name|timestamp|old_id|
+----------+---+-----+---------+------+
| insert|id2|Carol| 3| id2|
| null|id1|Alice| 0| id1|
| null|id2| Bob| 0| id2|
+----------+---+-----+---------+------+
Step 2: Enrich updateDeleteDf to include information about the existing primary keys (old_id in this example).
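A sketch of this step; updateDeleteDf is assumed to be the update and delete events taken from the flattened dataframe above:

```scala
// Update/delete events carry their key in oldKeyValues; extract it positionally
// into old_<key> and drop the array columns afterwards
val updateDeleteDf = flattenedCeDf.filter($"changeType".isin("update", "delete"))
val updateDeleteCeDf = keyNames.zipWithIndex
  .foldLeft(updateDeleteDf) { case (df, (k, i)) =>
    df.withColumn(s"old_$k", $"oldKeyValues".getItem(i))
  }
  .drop("oldKeyNames", "oldKeyValues")
```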
scala> updateDeleteCeDf.show
+----------+---------+----+------+------+
|changeType|timestamp| id| name|old_id|
+----------+---------+----+------+------+
| update| 1| id1|Angela| id1|
| delete| 2|null| null| id2|
+----------+---------+----+------+------+
Step 3: Union the dataframes from steps 1 and 2.
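This is where the schema-aware union helper comes in, since the two dataframes have different column orders:

```scala
// Union the enriched datalake/insert rows with the update/delete events
val initialDf = unionWithSchema(datalakeAndInsertCeDf, updateDeleteCeDf)
```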
scala> initialDf.show
+----------+----+------+------+---------+
|changeType| id| name|old_id|timestamp|
+----------+----+------+------+---------+
| insert| id2| Carol| id2| 3|
| null| id1| Alice| id1| 0|
| null| id2| Bob| id2| 0|
| update| id1|Angela| id1| 1|
| delete|null| null| id2| 2|
+----------+----+------+------+---------+
Step 4: Apply change events chronologically, as defined by our applyChangeEventsByTimestamp() function, based on timestamp. At this point, we should only have one record per primary key (except for delete records, which will be removed in the next step).
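Applying the helper, partitioned by the old_<key> columns:

```scala
// Latest event per primary key wins; older rows (e.g. the original Alice record) are dropped
val resolvedDf = applyChangeEventsByTimestamp(initialDf, keyNames.map("old_" + _))
```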
scala> resolvedDf.show
+----------+---+------+------+---------+
|changeType| id| name|old_id|timestamp|
+----------+---+------+------+---------+
| update|id1|Angela| id1| 1|
| insert|id2| Carol| id2| 3|
+----------+---+------+------+---------+
Step 5: Remove records with the delete change type as they are not to be persisted. In our case, the delete record has already been overwritten by an insert, so there is nothing to be removed. old_id is also dropped since we do not need this information anymore.
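A sketch of this filtering (appliedDf is a hypothetical name; the null-safe comparison keeps untouched datalake rows, whose changeType is null):

```scala
// Drop rows whose final state is a delete, then remove the helper key column(s)
val appliedDf = resolvedDf
  .filter(!($"changeType" <=> "delete"))
  .drop(keyNames.map("old_" + _): _*)
appliedDf.show()
```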
+----------+---+------+---------+
|changeType| id| name|timestamp|
+----------+---+------+---------+
| update|id1|Angela| 1|
| insert|id2| Carol| 3|
+----------+---+------+---------+
Steps 1–5 are to be repeated if there are any more combinations of primary keys, but since we only have one (id) in this example, this is the end of the resolution. We just need to drop the changeType column and remove any duplicate records.
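The final clean-up, continuing from the hypothetical appliedDf above:

```scala
// Drop the bookkeeping column and deduplicate before persisting to the datalake
val finalDf = appliedDf.drop("changeType").dropDuplicates()
finalDf.show()
```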
+---+------+---------+
| id| name|timestamp|
+---+------+---------+
|id2| Carol| 3|
|id1|Angela| 1|
+---+------+---------+
And voila! We have a dataframe with the applied changes, ready to be persisted to our datalake in any format we want.
Originally published at https://dev.to on September 14, 2020.