Data Materialization in Snowflake Using Confluent Kafka Connect: Part 2

Making On-Prem Oracle Databases Useable with Snowflake in Azure with Kafka Connect

Venkat Sekar
Hashmap, an NTT DATA Company
Apr 21, 2020


In Part 1, Data Sync to Snowflake Using Confluent Kafka Connect, we discussed the reasons behind adopting Confluent to ingest data into Snowflake. I also highlighted some of the hurdles that we had faced, and the various ways we ended up solving them.

In this Part 2 post, I’ll review the steps you need to take in order to make the data readily available for consumption once it’s ingested into Snowflake via the Kafka Connector, namely:

  • Parsing the message payload.
  • Handling materialized table creation/alterations on the fly.
  • Applying data conversion as the records get stored in the materialized table.
  • Upserts/Merging of records in the materialized table.

I’ll walk through how each of these was solved. By sharing the implementation process, I hope to give you a good starting point if you are looking to use Kafka with Snowflake; once you’re familiar with the approach, you may well end up with something more refined than what I present here.

Scenario

To review, in my example, I want to synchronize data from an on-prem database into Snowflake using Kafka.

Source: Pipeline to the Cloud — Streaming On-Premises Data for Cloud Analytics

The software stack is below:

  • Confluent Kafka
  • Confluent Connect
  • JDBC Source connector
  • Snowflake Kafka connector

Point of View on the JDBC Source Connector

The JDBC Kafka connector does have some limitations, as mentioned in Part 1. The JDBC source connector is simplistic and uses a polling-query approach to retrieve data from the whitelisted tables:

  • Unless the source tables have specific columns (a flag column or timestamp column), you cannot tell whether a record is an update or an insert.
  • Only logical deletes can be handled, meaning there must be a column indicating a delete.
  • Table truncates and hard record deletes cannot be detected.
  • Dropping and recreating a table via an external process can cause issues with the connector.
  • If multiple updates occur within a polling interval, only the last committed update is sent to the topic.

For these reasons, you have to evaluate if the JDBC source connector is the right choice; this does not mean the connector cannot be used at all.

Here are some situations where I think it can be adopted:

  • A truncate-and-load scenario once per day. This is doable in the case of a smaller dataset.
  • There is no need for Snowflake to adopt a Slowly Changing Dimension (SCD) implementation.

Let’s say for this discussion that you are ok with doing a snapshot-based load into Snowflake and therefore decided to use the JDBC Kafka Connector.

Limitations

Now, will this solution be suitable for all scenarios? I cannot guarantee that as your situation and mileage will vary. What do I mean by this? Here are a few potential points to think about:

  • The number of records in the source table.
  • Load on the source database to query and extract.
  • Network bandwidth speed for transferring data.
  • Resources on which Confluent Kafka Connect is running: the number of Kafka Connect instances, memory, etc.

While what I’ve illustrated is workable, you will need to evaluate it against your own environment and situation. With that, let’s get to the implementation.

Data Acquisition

Though AVRO is the preferred message format for Kafka topics, I’ll be using the JSON message format for this implementation.

I chose JSON because the schema, which is the table metadata, is part of each and every message. I’ll use this information to create the final materialized table after the data is ingested into Snowflake.
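As a concrete illustration, here is a minimal query sketch against the landing table used later in this post (DSC_RAW.SOMESCHEMA.CDC_EMPLOYEE, with the RECORD_CONTENT column the Snowflake connector creates). It pulls out the table name and the field definitions that ride along in every message; this is essentially the information CDC_CTAS_SP, described later, uses to derive the materialized table’s columns:

SELECT RECORD_CONTENT:schema:name::STRING AS source_table,   -- e.g. TEST_DEMO_EMP
       f.value:field::STRING              AS column_name,
       lower(f.value:type::STRING)        AS column_type
FROM DSC_RAW.SOMESCHEMA.CDC_EMPLOYEE,
     LATERAL FLATTEN(input => RECORD_CONTENT:schema, path => 'fields') f
LIMIT 10;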

Confluent Kafka Connect Configuration

Here is the Confluent Kafka Connect source configuration that gives us a bulk load of the table into Kafka:

name=orcl-source
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:oracle:thin:@localhost:49161:xe
connection.user=books_admin
connection.password=password
dialect.name=OracleDatabaseDialect
schema.pattern=APEX_040000
table.whitelist=TEST_DEMO_EMP
# mode of operation
mode=bulk
#convertor definitions
topic.prefix=tpc-json-
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true
#To handle data type conversion
numeric.mapping=best_fit
transforms=Cast
transforms.Cast.type=org.apache.kafka.connect.transforms.Cast$Value
transforms.Cast.spec=TESTSAL:int32

Data Ingestion

Snowflake Kafka Connect Configuration

Below is a sample of the Kafka Connect configuration:

name=sflksink
connector.class=com.snowflake.kafka.connector.SnowflakeSinkConnector
tasks.max=1
topics=tpc-json-TEST_DEMO_EMP
snowflake.topic2table.map=tpc-json-TEST_DEMO_EMP:CDC_EMPLOYEE
buffer.count.records=10000
buffer.flush.time=60
buffer.size.bytes=5000000
snowflake.url.name=XXXXXX.east-us-2.azure.snowflakecomputing.com
snowflake.user.name=XXXXXXX
snowflake.private.key=MIIFXXXXXXX8w==
snowflake.private.key.passphrase=admin
snowflake.database.name=DSC_RAW
snowflake.schema.name=SOMESCHEMA
snowflake.metadata.offset.and.partition=false
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=com.snowflake.kafka.connector.records.SnowflakeJsonConverter

Data Materialization in Snowflake

I’ll kick things off with a summary of key terminology based on the diagram below:

  • CDC Table: The table into which Kafka Connect inserts the raw records. In this example, the table name is CDC_EMPLOYEE.
  • Materialized Table: The final table that users query. The data is converted to the appropriate data types as reflected by the schema element. In this example, the table name is TEST_DEMO_EMP.
  • CDC_CTAS_SP: The stored procedure that creates (and alters) the materialized table.
  • CDC_DATASYNC_SP: The stored procedure that ingests the data into the materialized table.

CDC Table

Below is a view of a sample message from the CDC_EMPLOYEE table. The records were inserted by the Snowflake Kafka Connector:
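The connector lands each message into two VARIANT columns: RECORD_METADATA (the topic, partition, offset, and similar Kafka metadata) and RECORD_CONTENT (the JSON envelope with the schema and payload elements). A quick way to peek at what has landed, with the database and schema names taken from the sink configuration above, is a query along these lines:

SELECT RECORD_METADATA:topic::STRING   AS topic,
       RECORD_METADATA:offset::NUMBER  AS kafka_offset,
       RECORD_CONTENT:schema           AS message_schema,
       RECORD_CONTENT:payload          AS message_payload
FROM DSC_RAW.SOMESCHEMA.CDC_EMPLOYEE
LIMIT 5;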

Materialized table

Here is a view of sample records from the materialized table (TEST_DEMO_EMP in our example). Using the stored procedures explained in the following sections, we will create this table and insert records into it based on the messages in the CDC table.

Stored Procedure: CDC_CTAS_SP

The CDC_CTAS_SP stored procedure provides the following functionality:

  • Determines if the materialized table exists in the target schema.
  • If the table does not exist, then it will create the table.
  • If the table does exist, it will determine if there are any new columns present in the payload.
  • If there are new columns, then it will add these columns to the existing materialized table.
  • This will not ingest/merge data into the materialized table.

To avoid potential failures, this stored procedure should be called every time before CDC_DATASYNC_SP.

Execution

On a successful execution the output looks like below:
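For example, a call against the example CDC table (database, schema, and table names from the configurations above), with an illustrative first-run result shaped by the keys the procedure returns:

-- Create or alter the materialized table based on whatever is sitting in the CDC table.
CALL cdc_ctas_sp('DSC_RAW', 'SOMESCHEMA', 'CDC_EMPLOYEE');

-- Illustrative first-run result (keys come from the procedure; your values will vary):
-- {
--   "NO_UPDATES_TO_APPLY": false,
--   "CREATE_TABLE": true,
--   "ALTER_TABLE": false,
--   "Success": 1,
--   "Failures": 0,
--   "Failure_error": []
-- }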

The output JSON results reflect:

  • ALTER_TABLE: Flag indicating whether the materialized table was altered. Set to true only if the table existed and new columns were found in the messages.
  • CREATE_TABLE: Flag indicating that the materialized table did not exist and was created.
  • FAILURE_ERROR: The list of all failures thrown during execution. Potential failures include insufficient privileges to create the table, not being able to read the CDC table, etc.
  • NO_UPDATES_TO_APPLY: Flag indicating no changes to the materialized table structure. Set to true only if the table existed and there were no new columns.
  • SUCCESS: Flag indicating whether the procedure executed successfully.

On successful execution, you should see that the table TEST_DEMO_EMP has been created with columns of the appropriate data types.
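If you want to verify the result yourself, something along these lines works (database and schema names assumed from the example):

USE SCHEMA DSC_RAW.SOMESCHEMA;
SHOW TABLES LIKE 'TEST_DEMO_EMP';
DESCRIBE TABLE TEST_DEMO_EMP;  -- column names and data types derived from the message schema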

Stored Procedure — CDC_DATASYNC_SP

The CDC_DATASYNC_SP stored procedure performs the following functionality:

  1. Retrieves the table name from the payload.
  2. Queries the Information Schema to retrieve the columns defined.
  3. Truncates the materialized table.
  4. Ingests the data from the message payload into the materialized table using the INSERT SELECT statement.
  5. Truncates the CDC table.

You could argue that ‘INSERT OVERWRITE’ could have replaced steps #3 and #4, and that a stream on the CDC table could have been used for #5 (both are sketched below). While true and valid, I chose to keep it simple for this write-up; mileage will vary depending on your needs.
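For reference, here is a sketch of those alternatives. The column list is illustrative (only TESTSAL appears in the earlier source config; EMPNO and ENAME are made-up names), and a real implementation would still generate the projection from INFORMATION_SCHEMA the way the procedure does:

-- INSERT OVERWRITE truncates and loads the target in a single statement,
-- replacing steps 3 and 4 (illustrative column list):
INSERT OVERWRITE INTO DSC_RAW.SOMESCHEMA.TEST_DEMO_EMP (EMPNO, ENAME, TESTSAL)
SELECT RECORD_CONTENT:payload:EMPNO::NUMBER,
       RECORD_CONTENT:payload:ENAME::STRING,
       RECORD_CONTENT:payload:TESTSAL::NUMBER
FROM DSC_RAW.SOMESCHEMA.CDC_EMPLOYEE;

-- Instead of truncating the CDC table (step 5), a stream would let the next run
-- consume only the messages that arrived since the last load:
CREATE OR REPLACE STREAM DSC_RAW.SOMESCHEMA.CDC_EMPLOYEE_STRM
    ON TABLE DSC_RAW.SOMESCHEMA.CDC_EMPLOYEE;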

Code It Up

The implementation code is listed in the sections below:

Background

The following is the code for the stored procedures; they have been written in a generic manner, allowing you to adopt them without any modifications. For the procedures to run, though, you need to ensure the following (a sketch of the corresponding grants appears after the list):

  • The caller has read permission on the database, the schema, and the CDC table.
  • The caller can create and alter tables in the same database and schema where the CDC table resides.
  • The caller has SELECT, INSERT, UPDATE, and DELETE privileges on the materialized table.
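Here is a sketch of the corresponding grants, assuming a dedicated role (CDC_LOADER is a hypothetical name) and the object names from the example:

GRANT USAGE ON DATABASE DSC_RAW TO ROLE CDC_LOADER;
GRANT USAGE ON SCHEMA DSC_RAW.SOMESCHEMA TO ROLE CDC_LOADER;
-- Read (and later truncate) the CDC table:
GRANT SELECT, TRUNCATE ON TABLE DSC_RAW.SOMESCHEMA.CDC_EMPLOYEE TO ROLE CDC_LOADER;
-- Create the materialized table; a table created by the role is owned by it,
-- which also covers the ALTER TABLE case:
GRANT CREATE TABLE ON SCHEMA DSC_RAW.SOMESCHEMA TO ROLE CDC_LOADER;
-- DML on the materialized table (if the role does not already own it):
GRANT SELECT, INSERT, UPDATE, DELETE, TRUNCATE ON TABLE DSC_RAW.SOMESCHEMA.TEST_DEMO_EMP TO ROLE CDC_LOADER;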

I suggest modifying how this is achieved to fit your specific needs.

Stored Procedure — CDC_CTAS_SP

create or replace procedure cdc_ctas_sp(PARAM_TBL_DB VARCHAR, PARAM_TBL_SCHEMA VARCHAR, PARAM_KFK_PAYLOAD_TBL VARCHAR)
returns VARIANT NOT NULL
language javascript
as
$$
var failure_err_msg = [];
var return_result_as_json = {};

// -- This is a single big query that:
// -- 1. projects the schema section of the payload (BASE)
// -- 2. transposes the schema constituent fields into a table (project_dtype)
// -- 3. creates a 'CREATE OR REPLACE TABLE' statement (create_tbl_stmt)
// -- 4. checks the information schema to see if the table already exists (tbl_exists_chk)
// -- 5. generates alter table statements, which are to be issued to add columns (alter_col_stmts)
// -- 6. issues a unionized select query over the create_tbl_stmt & alter_col_stmts aliases.
// -- The result will contain either
// --    => a create table statement if the table does not exist, or
// --    => a set of alter statements if the table does exist.
// -- NOTES:
// -- Since there are high chances of timezone variations, the timestamp fields will be stored
// -- as epoch time (number). We leave the conversion to the end-user to handle as appropriate.
const create_or_alterstmt_generator_qry = `
WITH BASE AS (
    SELECT RECORD_CONTENT RC, parse_json(RC:schema) skema
    FROM ${PARAM_TBL_DB}.${PARAM_TBL_SCHEMA}.${PARAM_KFK_PAYLOAD_TBL}
), project_dtype AS (
    SELECT '' || skema:name::STRING tbl_name,
           value:field::STRING col_name,
           lower(value:type::STRING) org_col_type,
           lower(value:name::STRING) org_col_type_name,
           NVL(org_col_type_name, org_col_type) pseudocol_tname,
           CASE
               WHEN pseudocol_tname = 'string' THEN 'TEXT'
               WHEN pseudocol_tname = 'int32' THEN 'NUMBER'
               WHEN pseudocol_tname = 'int64' THEN 'NUMBER'
               WHEN pseudocol_tname = 'double' THEN 'NUMBER'
               WHEN pseudocol_tname = 'org.apache.kafka.connect.data.timestamp' THEN 'NUMBER'
               ELSE 'TEXT'
           END col_dtype,
           UPPER('${PARAM_TBL_DB}') TABLE_CATALOG,
           UPPER('${PARAM_TBL_SCHEMA}') TABLE_SCHEMA,
           count(*)
    FROM BASE b, lateral flatten(input => skema, path => 'fields')
    GROUP BY tbl_name, col_name, col_dtype, org_col_type, org_col_type_name, pseudocol_tname
), create_tbl_stmt as (
    SELECT tbl_name,
           'create or replace TABLE ${PARAM_TBL_DB}.${PARAM_TBL_SCHEMA}.' || tbl_name || '( '
           || LISTAGG(col_name || ' ' || col_dtype, ' ,')
           || ');' create_stmt
    FROM project_dtype
    GROUP BY tbl_name
), tbl_exists_chk as (
    select distinct tbl_name, NVL2(TABLE_NAME, 1, 0) tbl_exists
    from information_schema.tables itbl
    RIGHT OUTER JOIN project_dtype p
         ON p.tbl_name = itbl.table_name
        AND p.TABLE_CATALOG = itbl.TABLE_CATALOG
        AND p.TABLE_SCHEMA = itbl.TABLE_SCHEMA
), alter_col_stmts as (
    select delta_tbl.TBL_NAME, COL_NAME, col_dtype, ORG_COL_TYPE, org_col_type_name,
           itbl.column_name, itbl.data_type,
           'ALTER TABLE ${PARAM_TBL_DB}.${PARAM_TBL_SCHEMA}.' || delta_tbl.TBL_NAME
           || ' ADD COLUMN ' || COL_NAME || ' ' || col_dtype || ';' alter_stmts
    FROM information_schema.columns itbl
    RIGHT OUTER JOIN project_dtype delta_tbl
         ON delta_tbl.tbl_name = itbl.table_name
        AND delta_tbl.col_name = itbl.column_name
        AND delta_tbl.TABLE_CATALOG = itbl.TABLE_CATALOG
        AND delta_tbl.TABLE_SCHEMA = itbl.TABLE_SCHEMA
        -- AND delta_tbl.col_dtype != itbl.data_type
    WHERE 1 = (select tbl_exists from tbl_exists_chk) -- if the table does not exist, don't generate the missing columns
      AND itbl.data_type IS NULL
      -- OR delta_tbl.col_dtype != itbl.data_type -- for future; detect column data type changes
)
SELECT te_chk.tbl_exists, cstmt.create_stmt
FROM tbl_exists_chk te_chk
JOIN create_tbl_stmt cstmt ON cstmt.tbl_name = te_chk.tbl_name
WHERE 0 = te_chk.tbl_exists -- generate only if the table does not exist
UNION
SELECT 100, alter_stmts
FROM alter_col_stmts
ORDER BY tbl_exists;
`;

var table_modification_sqls = [];
var tbl_exists = -1;
try {
    var rs = snowflake.execute({ sqlText: create_or_alterstmt_generator_qry });
    while (rs.next()) {
        tbl_exists = rs.getColumnValue(1);
        table_modification_sqls.push(rs.getColumnValue(2));
    }
} catch (err) {
    failure_err_msg.push(` {sqlstatement : '${create_or_alterstmt_generator_qry}',
        error_code : '${err.code}',
        error_state : '${err.state}',
        error_message : '${err.message}',
        stack_trace : '${err.stackTraceTxt}'} `);
}

return_result_as_json['NO_UPDATES_TO_APPLY'] = (tbl_exists == -1);
return_result_as_json['CREATE_TABLE'] = (tbl_exists == 0);
return_result_as_json['ALTER_TABLE'] = (tbl_exists == 100);

var failure_count = 0;
var sucess_count = 0;
table_modification_sqls.forEach(function(statement) {
    try {
        snowflake.execute({ sqlText: statement });
        sucess_count = sucess_count + 1;
    } catch (err) {
        failure_count = failure_count + 1;
        failure_err_msg.push(` {sqlstatement : '${statement}',
            error_code : '${err.code}',
            error_state : '${err.state}',
            error_message : '${err.message}',
            stack_trace : '${err.stackTraceTxt}'} `);
    }
});

return_result_as_json["Success"] = sucess_count;
return_result_as_json["Failures"] = failure_count;
return_result_as_json["Failure_error"] = failure_err_msg;

return return_result_as_json;
$$;

Stored Procedure — CDC_DATASYNC_SP

create or replace procedure cdc_datasync_sp(PARAM_TBL_DB VARCHAR, PARAM_TBL_SCHEMA VARCHAR, PARAM_KFK_PAYLOAD_TBL VARCHAR)
returns VARIANT NOT NULL
language javascript
as
$$
var failure_err_msg = [];
var return_result_as_json = {};
var table_col_projection_str_withtype = '';
var table_col_projection_str = '';
var table_to_load = '';

// ----------------------------------------------------------------------
// Retrieve the table to load, which is present in the cdc record content payload json.
const retreive_tblname_from_payload_qry = `
    SELECT RECORD_CONTENT:schema:name::STRING tbl_name
    FROM ${PARAM_TBL_DB}.${PARAM_TBL_SCHEMA}.${PARAM_KFK_PAYLOAD_TBL}
    LIMIT 1;
`;
try {
    var rs = snowflake.execute({ sqlText: retreive_tblname_from_payload_qry });
    while (rs.next()) {
        table_to_load = rs.getColumnValue(1);
    }
} catch (err) {
    failure_err_msg.push(` {sqlstatement : '${retreive_tblname_from_payload_qry}',
        error_code : '${err.code}',
        error_state : '${err.state}',
        error_message : '${err.message}',
        stack_trace : '${err.stackTraceTxt}'} `);
}

// ----------------------------------------------------------------------
// Assume for now that the table exists.
// Query the information schema to retrieve the column information for the table (table_to_load).
var info_schema_qry = `
    SELECT LISTAGG(COLUMN_NAME, ','),
           LISTAGG(CONCAT(IFF(DATA_TYPE = 'TIMESTAMP_NTZ', '(', ''),
                          'P:', COLUMN_NAME,
                          IFF(DATA_TYPE = 'TIMESTAMP_NTZ', '::Number/1000)::', '::'),
                          DATA_TYPE), ',')
    FROM INFORMATION_SCHEMA."COLUMNS"
    WHERE TABLE_NAME = '${table_to_load}'
      AND TABLE_CATALOG = UPPER('${PARAM_TBL_DB}')
      AND TABLE_SCHEMA = UPPER('${PARAM_TBL_SCHEMA}');
`;
try {
    var rs = snowflake.execute({ sqlText: info_schema_qry });
    while (rs.next()) {
        table_col_projection_str = rs.getColumnValue(1);
        table_col_projection_str_withtype = rs.getColumnValue(2);
    }
} catch (err) {
    failure_err_msg.push(` {sqlstatement : '${info_schema_qry}',
        error_code : '${err.code}',
        error_state : '${err.state}',
        error_message : '${err.message}',
        stack_trace : '${err.stackTraceTxt}'} `);
}

// Insert the records from the message payload into the materialized table.
var insert_sql = `
    INSERT INTO ${PARAM_TBL_DB}.${PARAM_TBL_SCHEMA}.${table_to_load} (${table_col_projection_str})
    SELECT * FROM (
        WITH BASE AS (
            SELECT RECORD_CONTENT RC, parse_json(RC:payload) P
            FROM ${PARAM_TBL_DB}.${PARAM_TBL_SCHEMA}.${PARAM_KFK_PAYLOAD_TBL}
        )
        SELECT ${table_col_projection_str_withtype}
        FROM BASE
    );
`;
const truncate_stmt = `TRUNCATE TABLE ${PARAM_TBL_DB}.${PARAM_TBL_SCHEMA}.${table_to_load};`;
const truncate_cdctablestmt = `TRUNCATE TABLE ${PARAM_TBL_DB}.${PARAM_TBL_SCHEMA}.${PARAM_KFK_PAYLOAD_TBL};`;

var data_loaded = false;
try {
    snowflake.execute({ sqlText: truncate_stmt });
    var rs = snowflake.execute({ sqlText: insert_sql });
    data_loaded = true;
    // TODO: enable if you want to clean out the CDC table for a fresh snapshot load
    // snowflake.execute({ sqlText: truncate_cdctablestmt });
} catch (err) {
    failure_err_msg.push(` {sqlstatement : '${insert_sql}',
        error_code : '${err.code}',
        error_state : '${err.state}',
        error_message : '${err.message}',
        stack_trace : '${err.stackTraceTxt}'} `);
}

return_result_as_json = {"DataLoaded" : data_loaded, "Failure_error" : failure_err_msg};
return return_result_as_json;
$$;
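For the once-a-day truncate-and-load scenario discussed earlier, the two procedures could be chained with Snowflake tasks. A minimal sketch follows; the task names, warehouse, and schedule are assumptions:

CREATE OR REPLACE TASK CDC_CTAS_TASK
    WAREHOUSE = LOAD_WH
    SCHEDULE  = 'USING CRON 0 5 * * * UTC'
AS
    CALL cdc_ctas_sp('DSC_RAW', 'SOMESCHEMA', 'CDC_EMPLOYEE');

CREATE OR REPLACE TASK CDC_DATASYNC_TASK
    WAREHOUSE = LOAD_WH
    AFTER CDC_CTAS_TASK
AS
    CALL cdc_datasync_sp('DSC_RAW', 'SOMESCHEMA', 'CDC_EMPLOYEE');

-- Tasks are created suspended; resume the child first, then the root.
ALTER TASK CDC_DATASYNC_TASK RESUME;
ALTER TASK CDC_CTAS_TASK RESUME;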

Final Thoughts

After reviewing and digesting this solution approach, you might be thinking that other methods would have been more appropriate. While that is valid and arguable, some of the factors I chose to consider were the number of resources and code artifacts to maintain, the development team’s needs, etc. There were certainly some limitations and guardrails that we had to deal with, as outlined above. Always evaluate according to your specific needs.

While we demonstrated data synchronization to Snowflake using Confluent Kafka Connect, the same pattern could be extended to cloud-native services like Azure Event Hub.

I hope this mini-series on Confluent Kafka and Snowflake has helped you and provided valuable insight into getting even more value from your Snowflake cloud data warehouse. It would be great to hear your thoughts and what your next set of moves is with Snowflake!

Need Help with Your Cloud Initiatives?

If you are considering the cloud for migrating or modernizing data and analytics products and applications or if you would like help and guidance and a few best practices in delivering higher value outcomes in your existing cloud program, then please contact us.

Hashmap offers a range of enablement workshops and assessment services, cloud modernization and migration services, and consulting service packages as part of our Cloud (and Snowflake) service offerings.

How does Snowflake compare to other data warehouses? Our technical experts have implemented over 250 cloud/data projects in the last 3 years and conducted unbiased, detailed analyses across 34 business and technical dimensions, ranking each cloud data warehouse.

Other Tools and Content You Might Like

Feel free to share on other channels, and be sure to keep up with all new content from Hashmap here. To listen in on a casual conversation about all things data engineering and the cloud, check out Hashmap’s Data Rebels on Tap podcast on Spotify, Apple, and other popular apps.

Venkat Sekar is Regional Director for Hashmap Canada and is an architect and consultant providing Data, Cloud, IoT, and AI/ML solutions and expertise across industries with a group of innovative technologists and domain experts accelerating high-value business outcomes for our customers.
