Creating a no-code AWS native OLTP to OLAP data pipeline — Part 2 (the nitty-gritty)

Leveraging Kafka Connect to stream data from MySQL RDS to S3 and make it available to Redshift without writing a line of code.

Haris Michailidis
Data rocks
10 min read · Mar 10, 2021

In this second part (Part one: here), we’re going to discuss all the pitfalls and caveats we discovered while executing this project at Nutmeg, so that you can go into battle prepared. Happy reading, and I’m looking forward to your comments! 🧑‍💻

For reference, this is the architecture of the project:

AWS Data Pipeline Architecture

Technical Sections:

  • Schema Transformation using SMT
  • Querying the Data and Compaction
  • Compaction on-read and at-rest
  • Lag, Query Performance and Object Size tradeoffs
  • Where is the Memory?
  • Glue Crawler is magical but…

More:

  • To Sum Up
  • Acknowledgements

Schema Transformation using SMT

Single Message Transforms (SMTs) are a Kafka Connect feature that lets you transform Kafka messages just before they leave Kafka (in Sink Connectors) or just before they are written into Kafka (in Source Connectors). The Debezium documentation linked below is a good resource for getting familiar with them.

We leveraged SMTs to bring the data into a more query-friendly schema than the original one. The schema in the source topic, produced by the Debezium Connector (our CDC Source Connector), looks as follows:

{
  "before": {
    "field_a": ...,
    "field_b": ...
  },
  "after": {
    "field_a": ...,
    "field_b": ...
  },
  "source": {
    ...
  },
  "op": "c/u/d",
  "ts_ms": ...
}

You can find more about the Debezium MySQL Connector and its schema here: https://debezium.io/documentation/reference/connectors/mysql.html

As you can see above, the schema is more than double the size of what we actually need (the “after” part). One might say we could keep just the “after” part and everything would work fine, but that’s not the case: the S3 Connector writes only the “value” of the Kafka messages to the S3 Objects, not the “key”. So a deleted record would have null in the “after” part of the value, but we wouldn’t be able to tell which “key” this null corresponds to, as keys are not included in the S3 Objects.

To the rescue came the Debezium SMTs https://debezium.io/documentation/reference/configuration/event-flattening.html.

Using the following configuration we managed to get the schema into an optimal, flattened structure without losing any of the useful information.

transforms=unwrap,insertOffset
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
transforms.unwrap.drop.tombstones=true
transforms.unwrap.delete.handling.mode=rewrite
transforms.unwrap.add.fields=op
transforms.insertOffset.type=org.apache.kafka.connect.transforms.InsertField$Value
transforms.insertOffset.offset.field=__offset

In more detail, the SMTs are doing the following actions:

  • unwrap.drop.tombstones=true: This removes the tombstones so we don’t get null records in the S3 Objects.
  • unwrap.delete.handling.mode=rewrite: Whenever there is a deletion, the “before” data is rewritten into the “after” part instead of a null, and the field “__deleted” is set to true; otherwise this field remains false.
  • unwrap.add.fields=op: This propagates the field op, short for “operation” (c: create, u: update, d: delete), from the original schema to the new one.
  • insertOffset.offset.field=__offset: This adds the “offset” of the original Kafka message as __offset in the output; it will be used later to find the latest message per key (see the next section: Querying the Data and Compaction).

The resulting schema, which is what ends up in the S3 Objects, looks as follows and is very easy to query, as you’ll see in the next section.

{
  "field_a": ...,
  "field_b": ...,
  "op": "c/u/d",
  "__deleted": true/false,
  "__offset": ...
}

Querying the Data and Compaction

When records are edited in the source tables, potentially multiple intermediate versions are sent to Kafka and thus stored in S3. This means that when you query S3 for a particular Primary Key (PK) you’ll potentially get multiple records, in contrast to the upstream DB, which will only ever return one.

If you understand that, you can leverage it to extract even more information about your data, since you now have at your disposal the whole history of modifications, not just the current view (mention this to your Data Scientists and excitement is guaranteed).

But if you want to get the “current view” of your data, you’ll need to implement Compaction. Specifically, I’m talking about the process of getting the latest version per Primary Key. To achieve this, you’ll need to know the PK of each table (single or composite) and leverage the Kafka offsets to get the latest version. (Kafka offsets are auto-incremented integers generated by Kafka, and they are what guarantees order within a topic partition.)

Note: This method works even if you have multiple partitions on your topic, as each key always ends up in the same partition, so taking the max offset per key is still valid.

The SQL statement that follows is an example of achieving this at query time.

In this example, the table with the S3 data (mapped as a Glue Table, see Part 1), source_table, has a composite PK consisting of two fields, pk_part_1 and pk_part_2. We first construct max_offset_table, which holds the maximum (highest) offset per combination of these two fields, effectively the maximum offset per PK of the upstream table.

In the main part of the query we inner join our original table (source_table) with the new table (max_offset_table), keeping only the entries corresponding to the maximum offset, i.e. the latest value per PK. Finally, we keep only the entries whose latest version is not a delete (WHERE source_table.__deleted = false).

WITH
max_offset_table AS (
    SELECT
        pk_part_1,
        pk_part_2,
        MAX(__offset) AS max_offset
    FROM
        source_table
    GROUP BY
        pk_part_1, pk_part_2
)

SELECT
    source_table.*
FROM
    source_table
INNER JOIN
    max_offset_table ON
        source_table.pk_part_1 = max_offset_table.pk_part_1
        AND source_table.pk_part_2 = max_offset_table.pk_part_2
        AND source_table.__offset = max_offset_table.max_offset
WHERE
    source_table.__deleted = false

Compaction on-read and at-rest

As mentioned in the previous section, we need to apply Compaction when querying our data to get the latest view. The query shown above is what you’d execute every time you read the underlying data.

A different approach would be to update the records on write, as opposed to appending newer versions, so that the “current view” is always what’s stored in S3. That is fairly complicated, would probably throttle your writes (S3 is an object store, not a database) and is not supported by the S3 Sink Connector, so you’d need to write custom code (a no-go for this project, see title).

A middle ground is to have a scheduled job that compacts the data at rest periodically, while still executing the on-read compaction query every time you access your data. This way you neither throttle your writes to S3 nor need to implement anything complicated on the write path, and running a scheduled job periodically is fairly easy nowadays. At the same time, it guarantees that the on-read query will not have to consume too many files (just the latest compacted files plus any new ones written since the last compaction run).

Note: When compacting the data at rest, you need to replace the original files in the directory the Glue Table points to, otherwise Glue will read both the original and the compacted data, resulting in even worse query performance. If you don’t want to delete the original data, you can always move it to an “archive” directory for reference.
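For illustration, if your Glue tables are also reachable from Athena, the scheduled compaction job could be as simple as a CTAS statement that materialises the result of the on-read query as Parquet under a separate, compacted prefix. This is only a sketch: the target table name and S3 location are hypothetical, and you would still need to swap the compacted files into the location your Glue Table points to (or re-point the table), as per the note above.

-- Hypothetical Athena CTAS compaction job: writes the latest version per PK
-- of source_table as Parquet files under a dedicated "compacted" prefix.
CREATE TABLE source_table_compacted
WITH (
    format = 'PARQUET',
    external_location = 's3://example-analytics-bucket/compacted/source_table/'
) AS
WITH max_offset_table AS (
    SELECT pk_part_1, pk_part_2, MAX(__offset) AS max_offset
    FROM source_table
    GROUP BY pk_part_1, pk_part_2
)
SELECT source_table.*
FROM source_table
INNER JOIN max_offset_table
    ON source_table.pk_part_1 = max_offset_table.pk_part_1
    AND source_table.pk_part_2 = max_offset_table.pk_part_2
    AND source_table.__offset = max_offset_table.max_offset
WHERE source_table.__deleted = false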

As with almost everything, it’s a tradeoff game and the path you’ll choose depends on your read/write access patterns and the throughput requirements.

Lag, Query Performance and Object Size tradeoffs

Two important settings that we tuned on a case-by-case basis were flush.size and rotate.schedule.interval.ms. For optimal query performance, Redshift Spectrum prefers large-ish files, and it’s recommended to avoid lots of KB-sized ones (https://aws.amazon.com/blogs/big-data/10-best-practices-for-amazon-redshift-spectrum/).

Reading the above, you might be tempted to set flush.size to a large enough number, and the rotation interval likewise, to avoid small files. As you can imagine, lag then becomes an issue: it might take hours or days to flush based on the flush.size trigger, and that could be outside the SLA you have with your consumers.

At the other extreme, a very short rotate.schedule.interval.ms, set in order to satisfy near-real-time analytics, could result in lots of KB-sized files and decrease query performance.
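For reference, both triggers live in the S3 Sink Connector configuration. The values below are purely illustrative, not the ones we settled on; whichever condition is met first causes the current file to be committed to S3.

# Illustrative values only: commit a file once 10,000 records have accumulated
# for a given output partition...
flush.size=10000
# ...or every 10 minutes of wall-clock time, whichever comes first.
# rotate.schedule.interval.ms requires a timezone to be configured.
rotate.schedule.interval.ms=600000
timezone=UTC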

Guess what, no silver bullet here either, this is again a tradeoff situation. For us, the best strategy was to understand the business use case, the maximum acceptable lag of our consumers, and tune appropriately.

Where is the Memory?

Starting your day with an Out-Of-Memory (OOM) Error from the Connect Cluster is always a fun feeling (not really). This happened more than once until we understood the root cause…

TL;DR: Cardinality explosion on one of the fields used by the FieldPartitioner.

Let’s look at an example. Say you’re sending a table to S3 that contains a “date” field; you know that processes usually write records for “today”, and that when analysts access the data they also query particular dates. All these signs naturally lead to using the FieldPartitioner and partitioning by that “date” field. The problem presents itself when someone decides to update records across many dates in a short time window.

Let me explain: the Sink Connector opens a new file in memory for every combination of Kafka partition assigned to the Task and S3 path (partition) as defined by the FieldPartitioner. So if you’re running one Connector with one Task, your input topic has 10 partitions, and all the messages you’ve read since the last flush are from the same date, you’ll have 10 open files in memory.

But if the upstream producer (in this case Debezium) produces messages for all the dates in a month, the Task suddenly needs to keep ~30*10 files open in memory; with the default s3.part.size of 25MB, that means 30 * 10 * 25MB = 7.5GB, which can easily cause the OOM issue.
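For context, these are the sink settings that feed into that back-of-the-envelope calculation. The values shown are the defaults or the ones from the example above, and the partition field name is purely illustrative.

# One Connector with a single Task, as in the example
tasks.max=1
# Partition S3 objects by the value of a "date" field (illustrative field name)
partitioner.class=io.confluent.connect.storage.partitioner.FieldPartitioner
partition.field.name=date
# Default part size: each open file buffers up to 25MB (26214400 bytes) in memory
s3.part.size=26214400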

You can check the following post that has some more detailed analysis: https://stackoverflow.com/questions/50971065/kafka-connect-s3-connector-outofmemory-errors-with-timebasedpartitioner/51160834#51160834

Solution: either make flush.size small enough that the Task flushes before it opens too many files, or, of course, throw memory at the problem.

Glue Crawler is magical but…

As mentioned in the “S3 & AWS Glue” section of Part 1, the Glue Crawler helped us massively by automatically reading the Parquet files and mapping them into metadata tables, but there are a couple of points that need attention.

1st Note: Something interesting we noticed is that if you use one of the table’s columns as a partition key, then after the crawler creates the table you’ll see two fields with the same name: one for the actual column and one for the partition key. This breaks queries from Redshift Spectrum, which can’t handle two columns with the same name, so you need to either delete the column-level one or rename one of the two.

It seems that others have faced the same issue, with no massively better solutions: https://stackoverflow.com/questions/59268673/aws-athena-duplicate-columns-due-to-partitionning

2nd Note: The Crawler can map partitions, which Spectrum later uses to efficiently query a subset of the data, but it needs to run again to recognise any new ones. For example, if you use the FieldPartitioner with a “date” field, you’ll get a new partition each day. The data under the new partition will not be discoverable until the crawler runs and maps that “directory” (S3 prefix, to be exact) as a new partition. You might therefore want to run the crawler periodically.


To Sum Up

Having such a pipeline in place enables us to stream any table from the upstream DBs to S3 by just setting up two Connectors (the CDC Source and the S3 Sink). On top of that, reading the data from Kafka means we’re not limited to MySQL as the source: we can send any piece of data we want to the analytical side, as long as it lives in Kafka!

Running analytical queries on the source data has become trivial, with minimal effort for each additional table. As described in this post, there are a few details that one needs to watch out for and tune, but being able to deliver such a project without writing a single line of code is definitely a sign of our times and a testament to the maturity of the tools around data processing.

Acknowledgements 🙇‍♂️

Once again, kudos to Francesco Nobilia and Javier Holguera for helping me put together this post and assisting me in finding solutions to many of these problems.
