Data Sync to Snowflake Using Confluent Kafka Connect: Part 1

Moving On-Prem Oracle Databases to Snowflake in Azure with Kafka Connect

Venkat Sekar
Hashmap, an NTT DATA Company
12 min read · Apr 20, 2020


In my most recent engagement, I was tasked with data synchronization between an on-premise Oracle database and Snowflake using Confluent Kafka. While many blogs cover this topic, few address scenarios that deviate from the normal happy path or demonstrate how to overcome those deviations.

In this blog post, I will go over some of the hurdles we observed during the engagement and explain how we solved them.

The Scenario

The customer had an on-premise Oracle database. They wanted to synchronize the data into Snowflake in Azure and also wanted the ability to use the same data transport channel for future uses. They chose Confluent Kafka as the candidate for transferring the data from on-premise to cloud.

Here are some of the requirements that were mandated as part of the solution:

  • Develop a solution that could be adopted for multiple instances of Oracle databases.
  • Synchronization into Snowflake does not need to be real-time.
  • Adopt tools/libraries that are supplied by Confluent or the Confluent community before considering third-party tools.
  • Define a solution that is economical; preferably open-source and free.

The client wanted to adopt a pattern similar to this one below:

Source: Pipeline to the Cloud — Streaming On-Premises Data for Cloud Analytics

My Initial Thoughts Were…

The solution to this data synchronization did not seem complicated and appeared to be achievable. There are a number of implementations for this scenario as described in these articles:

The Hiccups

At first glance these solutions seemed workable; however, they were not suitable for this client's situation for the following reasons:

1) Adoption Cycle

We were in the early stages of Snowflake adoption. We still had to prove a use case and get buy-in from the business for complete adoption. For this reason, we wanted to limit and control the tech stack.

2) Timelines

There is a plethora of integration software that could achieve the solution we wanted. We did not have the necessary cycles, time, or resources in this project to evaluate new software vendors.

3) Enterprise Practices

Even if we had chosen a specific vendor (e.g. Attunity, Fivetran, Matillion, etc.), the timeline for getting contracts in place, negotiating the subscription/license, going through a security assessment, etc. was significant and didn’t fit the project timeline.

The Tactical Approach

So far we had Confluent and Snowflake already spun up, contracted for, and ready to go. We wanted to get started with what we already had, so we decided to develop our solution, even if it happened to be tactical, with the following stack:

  • Confluent JDBC connector: To pull data from the on-premise Oracle database.
  • Confluent Connect: The framework for connecting Kafka with external systems, including databases. It let us use community connectors and cut development cycles.
  • Snowflake Connector for Kafka: To push data into Snowflake.

The following diagram depicts the approach we wanted to implement:

Source: Pipeline to the Cloud — Streaming On-Premises Data for Cloud Analytics

In the following topics, I will highlight some of the hurdles we faced and discuss how we solved them. Though some of the solutions are tactical in nature, they gave us an edge in achieving our goals within the defined timeframe.

NOTE: I am using local Confluent and Oracle instances with Docker containers to capture some configurations to better demonstrate the process. Some changes may need to be made to suit your needs.

Data Acquisition

"Data acquisition" is the process and the steps involved in extracting data from the source systems (e.g., an on-premise database).

We started by quickly analyzing some tools to understand the limitations and functionality of data acquisition. We were hosted in Azure, so we needed a solution that was either Azure-specific or generic across cloud providers.

Debezium

We took a look at Debezium, an open-source distributed platform for change data capture (CDC). It is a well-known solution in the open-source community that captures data changes and publishes them into Kafka. It does this via log-based capture, thus avoiding strain on the database.

Here is a wonderful blog on the advantages of adopting log-based capture vs. traditional query-based capture.

Okay, so why didn't we use this approach? Well, as of this writing in March 2020:

  • The Oracle connector is incubating, meaning the published message format could change in future iterations.
  • The Oracle connector also requires an Oracle GoldenGate license, which means added cost. We also did not have any GoldenGate implementations across our team.

Kafka JDBC Connector

Our choice was to use the de-facto Kafka JDBC source connector.

The Kafka JDBC connector offers a polling-based solution, whereby the database is queried at regular intervals. The data can be retrieved in bulk mode or as incremental updates.
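
For reference, here is a minimal sketch of what an incremental configuration could look like, assuming the source table had a suitable timestamp column (LAST_UPDATED below is a hypothetical column name). We ended up using bulk mode, as shown in the configuration later in this post.

# Sketch only: incremental load driven by a timestamp column
# LAST_UPDATED is a hypothetical column; our actual configuration used mode=bulk
mode=timestamp
timestamp.column.name=LAST_UPDATED
# how often the connector polls the table
poll.interval.ms=60000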

Connecting to the Database

While it sounds simple, you might face connection or configuration issues. Do not worry — here is a wonderful article that aids in solving some issues:

Blog: Kafka Connect Deep Dive — JDBC Source Connector

I recommend reading the section “Checking that the JDBC driver has been loaded” to confirm that the Oracle driver is loaded.
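
As an additional sanity check in our local Docker setup, we listed the plugins the Connect worker had loaded and confirmed the Oracle JDBC driver jar was sitting next to the JDBC connector. This is only a sketch; the worker port (8083) and the plugin path are assumptions based on standard Confluent packaging and will differ in your environment.

# List the connector plugins the Connect worker has loaded
curl -s http://localhost:8083/connector-plugins

# Confirm the Oracle JDBC driver (e.g. ojdbc8.jar) sits alongside the JDBC connector
ls /usr/share/java/kafka-connect-jdbc/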

Confluent Kafka Connect Configuration

Here is the Connect configuration that performs a bulk load of the table into Kafka:

name=orcl-source
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:oracle:thin:@localhost:49161:xe
connection.user=books_admin
connection.password=password
dialect.name=OracleDatabaseDialect
schema.pattern=APEX_040000
table.whitelist=TEST_DEMO_EMP
# mode of operation
mode=bulk
# convertor definitions
topic.prefix=tpc-avro-
key.converter=org.apache.kafka.connect.json.JsonConverter
header.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
# To handle data type conversion
numeric.mapping=best_fit
transforms=Cast
transforms.Cast.type=org.apache.kafka.connect.transforms.Cast$Value
transforms.Cast.spec=TESTSAL:int32
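
Depending on how Connect is deployed, this configuration can be passed as a properties file (standalone mode) or submitted to a distributed worker through the Connect REST API. Below is a trimmed sketch of the latter; the worker port (8083) is an assumption, and the config is abbreviated to the key properties above.

curl -X POST -H "Content-Type: application/json" http://localhost:8083/connectors \
  -d '{
        "name": "orcl-source",
        "config": {
          "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
          "connection.url": "jdbc:oracle:thin:@localhost:49161:xe",
          "connection.user": "books_admin",
          "connection.password": "password",
          "schema.pattern": "APEX_040000",
          "table.whitelist": "TEST_DEMO_EMP",
          "mode": "bulk",
          "topic.prefix": "tpc-avro-"
        }
      }'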

Data Type Conversion

When the data gets pulled and transformed into a Kafka message, you typically will not have any issue with a VARCHAR/string data type in the source table. However, for numeric/timestamp data types you may need to handle some conversions.

Below is the connector configuration where we defined the value converter as JSON:

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true

Below is the sample JSON message that gets published in the topic:

{
  "schema": {
    "type": "struct",
    "fields": [
      { "type": "int32", "optional": true, "field": "EMPNO" },
      { "type": "string", "optional": true, "field": "ENAME" },
      { "type": "string", "optional": true, "field": "JOB" },
      { "type": "int64", "optional": true, "field": "MGR" },
      { "type": "int64", "optional": true, "name": "org.apache.kafka.connect.data.Timestamp", "version": 1, "field": "HIREDATE" },
      { "type": "double", "optional": true, "field": "SAL" },
      { "type": "double", "optional": true, "field": "COMM" },
      { "type": "int32", "optional": true, "field": "DEPTNO" },
      { "type": "string", "optional": true, "field": "WORK_CITY" },
      { "type": "string", "optional": true, "field": "WORK_COUNTRY" },
      { "type": "bytes", "optional": true, "name": "org.apache.kafka.connect.data.Decimal", "version": 1,
        "parameters": { "scale": "127" },
        "field": "TESTSAL"
      }
    ],
    "optional": false,
    "name": "TEST_DEMO_EMP"
  },
  "payload": {
    "EMPNO": 10000,
    "ENAME": "Anon-1k",
    "JOB": "Analyst",
    "MGR": 9047,
    "HIREDATE": 1085270400000,
    "SAL": 1300.0,
    "COMM": null,
    "DEPTNO": 40,
    "WORK_CITY": "Tokyo",
    "WORK_COUNTRY": "Japan",
    "TESTSAL": "AeS7O40u/lY+QxzguxN0y/x/PcCXu7UV4y6aX2i/jS1riYg2bNzSAAAAAAAAAAAAAAAAAAAAAA=="
  }
}

For the JSON value converter:

  • The message contains both the payload (data) and the schema.
  • The schema element reflects the column data types.
  • The column TESTSAL looks weird even though it is a decimal. Why?

Hurdle: Handling Numeric Data Types

To start off, here is the table DDL:

CREATE TABLE "APEX_040000"."TEST_DEMO_EMP" (
  "EMPNO"        NUMBER(8,0),
  "ENAME"        VARCHAR2(30),
  "JOB"          VARCHAR2(30),
  "MGR"          NUMBER(14,0),
  "HIREDATE"     DATE,
  "SAL"          NUMBER(9,2),
  "COMM"         NUMBER(9,2),
  "DEPTNO"       NUMBER(5,0),
  "WORK_CITY"    VARCHAR2(100),
  "WORK_COUNTRY" VARCHAR2(100),
  "TESTSAL"      NUMBER
);

When a numeric column, e.g. MGR, is defined with precision, e.g. NUMBER(14,0), the converter is able to handle the conversion. So in the above JSON output, you see the value "MGR": 9047.

When a numeric column, e.g. TESTSAL, is defined without precision, e.g. plain NUMBER, the converter maps it to the Connect Decimal logical type and serializes the unscaled value as a byte array, which shows up base64-encoded in the JSON. Hence the value: "AeS7O40u/lY+QxzguxN0y/x/PcCXu7UV4y6aX2i/jS1riYg2bNzSAAAAAAAAAAAAAAAAAAAAAA=="

How this can be handled is explained in the section: Bytes, Decimals, Numerics of the previously mentioned blog post. I would recommend reading this section as well.

In our case, we did not have the option of altering the original table in the database. Nor did we like the approach of defining a query in the database with the appropriate precision casts and having the connector use that query instead of the table.

We overcame this by adopting a Single Message Transform (SMT).

Single Message Transforms in Kafka Connect

Blog: How to Use Single Message Transforms in Kafka Connect

Below is the SMT defined in the configuration above:

transforms=Cast
transforms.Cast.type=org.apache.kafka.connect.transforms.Cast$Value
transforms.Cast.spec=TESTSAL:int32
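
As a side note, the Cast spec accepts a comma-separated list of field:type pairs, so several precision-less columns can be handled in a single transform (COL_A and COL_B below are hypothetical column names):

transforms.Cast.spec=TESTSAL:int32,COL_A:int64,COL_B:float64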

Below is the sample JSON output; as you can see, the conversion now happens:

{
  "schema": {
    "type": "struct",
    "fields": [
      { "type": "int32", "optional": true, "field": "EMPNO" },
      { "type": "string", "optional": true, "field": "ENAME" },
      { "type": "string", "optional": true, "field": "JOB" },
      { "type": "int64", "optional": true, "field": "MGR" },
      { "type": "int64", "optional": true, "name": "org.apache.kafka.connect.data.Timestamp", "version": 1, "field": "HIREDATE" },
      { "type": "double", "optional": true, "field": "SAL" },
      { "type": "double", "optional": true, "field": "COMM" },
      { "type": "int32", "optional": true, "field": "DEPTNO" },
      { "type": "string", "optional": true, "field": "WORK_CITY" },
      { "type": "string", "optional": true, "field": "WORK_COUNTRY" },
      { "type": "int32", "optional": true, "field": "TESTSAL" }
    ],
    "optional": false,
    "name": "TEST_DEMO_EMP"
  },
  "payload": {
    "EMPNO": 10000, "ENAME": "Anon-1k", "JOB": "Analyst", "MGR": 9047,
    "HIREDATE": 1085270400000, "SAL": 1300.0, "COMM": null, "DEPTNO": 40,
    "WORK_CITY": "Tokyo", "WORK_COUNTRY": "Japan", "TESTSAL": 2100
  }
}

Avro Converters

It is generally recommended that the message be in Avro format. Below is the configuration:

key.converter=org.apache.kafka.connect.json.JsonConverter
header.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081

Key & Header Convertor

The Avro converter does not work for Key & Header; they have to be a JSON convertor.

Handling Numeric Data Types

The configuration still needs the SMT defined if the table has a numeric column without precision. If it is not defined, the connector throws an exception like the one below:

Output

With the appropriate configuration defined, below is the screenshot of the AVRO message in the topic, via the Confluent Connect GUI:

Message
Schema

Hurdle: Handling Schema Changes

When the underlying table gets altered (for example, I altered the sample table to make EMPNO the primary key), the JDBC Source Connector fails when it tries to publish a record with the updated schema, as shown below:

This is caused by schema version compatibility checks in the Schema Registry, as explained in this blog: Error Registering Avro Schema | Multiple Schemas In One Topic. One way I solved the issue was to set the compatibility to NONE. I understand this is debatable:

curl -X PUT -H "Content-Type: application/json" --data '{"compatibility": "NONE"}' http://localhost:8081/config/tpc-avro-TEST_DEMO_EMP-value
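
To confirm the override took effect, the subject-level setting can be read back from the Schema Registry; it should return a small JSON document with the compatibility level:

curl http://localhost:8081/config/tpc-avro-TEST_DEMO_EMP-value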

Point of View on the JDBC Source Connector

As mentioned, the JDBC source connector is simplistic and offers a polling, query-based approach to retrieve data from the whitelisted tables:

  • Unless there are specific columns (a flag column or a timestamp column) in the source tables, you cannot tell whether a record is an insert or an update.
  • Only logical deletes can be handled, meaning there must be a column indicating that a record was deleted.
  • Table truncates and hard record deletes cannot be detected.
  • Dropping and recreating a table through an external process can cause issues for the connector.
  • If there are multiple updates to a record within a polling interval, only the last committed update will be sent to the topic.

Thus you have to evaluate whether the JDBC source connector is the right choice. This does not mean the connector cannot be used at all; here are some situations in which it can be adopted:

  • A truncate-and-load scenario once a day; this is workable for smaller datasets.
  • When there is no need for a Slowly Changing Dimension (SCD) implementation in Snowflake.

Data Ingestion & Materialization

“Data-Ingestion” is the process of loading/staging the data into the target datastore, in this case, Snowflake. “Data-Materialization” is the process of initial conversion of the staged data into a usable format in the target datastore, Snowflake. For example, converting a JSON record into a table format so that the user can readily query the data.

Snowflake Connector for Kafka

Snowflake provides its own Kafka Connect plugin, the Snowflake Connector for Kafka. Reading the various sections of the "Kafka Connector Overview" documentation is recommended; I particularly suggest the section "Workflow for the Kafka Connector" for a deeper understanding.

Connect Configuration

Below is a sample of the Kafka Connect configuration:

name=sflksink
connector.class=com.snowflake.kafka.connector.SnowflakeSinkConnector
tasks.max=1
topics=tpc-avro-TEST_DEMO_EMP
snowflake.topic2table.map=tpc-avro-TEST_DEMO_EMP:kfk_TEST_DEMO_EMP
buffer.count.records=10000
buffer.flush.time=60
buffer.size.bytes=5000000
snowflake.url.name=XXXXXX.east-us-2.azure.snowflakecomputing.com
snowflake.user.name=XXXXXXX
snowflake.private.key=MIIFXXXXXXX8w==
snowflake.private.key.passphrase=admin
snowflake.database.name=DSC_RAW
snowflake.schema.name=SOMESCHEMA
snowflake.metadata.offset.and.partition=false
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=com.snowflake.kafka.connector.records.SnowflakeAvroConverter
value.converter.schema.registry.url=http://localhost:8081

Note the configuration of the key and value converters.
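
The snowflake.private.key value comes from Snowflake key pair authentication. For completeness, below is a sketch of the OpenSSL commands, following the Snowflake documentation, to generate an encrypted key pair (the file names and the kafka_connect_user user are arbitrary/hypothetical); the public key is then assigned to the Snowflake user that the connector logs in as.

# Generate an encrypted private key; the passphrase you enter goes into snowflake.private.key.passphrase
openssl genrsa 2048 | openssl pkcs8 -topk8 -v2 des3 -inform PEM -out rsa_key.p8
# Derive the matching public key
openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
# In Snowflake, assign the public key (without the header/footer lines) to the connector user:
#   ALTER USER kafka_connect_user SET RSA_PUBLIC_KEY='MIIBIj...';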

Hurdle: Maven Repo Call

A big surprise for us: once the Snowflake connector plugin got instantiated, the process just hung and did not even proceed to connect to Kafka.

After much debugging and troubleshooting, we found the culprit: Snowflake Connector v1.1 makes a call to the Maven repository (see the source code for Utils.java) to check whether the currently installed version is older than the latest release and, if so, print a suggestion to upgrade.

You might be affected in your environment if it is locked down and cannot reach the public Maven repository.

Ingested Table in Snowflake

As documented, the connector can create the table in Snowflake or load into a pre-existing table. Be prepared, however, for some slight discomfort: the resulting table is more or less a log stream.

As new messages arrive on the topic, new records get appended to this table. Here is a sample screenshot of what this table looks like for the Avro format:

Below is the screenshot when the message in the Kafka topic is ingested in JSON format:

The JSON message contains the schema as part of each message, whereas with Avro you have the option of keeping the schema in the Schema Registry, so the message contains only the data payload.
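
Either way, the landing table exposes two VARIANT columns, RECORD_METADATA and RECORD_CONTENT, so peeking at the data is a matter of path expressions over the variant. Here is a minimal sketch (not from the actual engagement), assuming the Avro converter so that the fields sit at the top level of RECORD_CONTENT, and using the database, schema, and table names from the connector configuration above:

-- Sketch: pull typed columns out of the variant payload landed by the Snowflake connector
SELECT
    record_content:EMPNO::NUMBER        AS empno,
    record_content:ENAME::STRING        AS ename,
    record_content:SAL::NUMBER(9,2)     AS sal,
    record_content:WORK_COUNTRY::STRING AS work_country
FROM DSC_RAW.SOMESCHEMA.kfk_TEST_DEMO_EMP;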

Data Materialization

Pain points

What does this mean? Well, unlike a true data synchronization implementation you are responsible for:

  • Parsing the message payload.
  • Handling materialized table creation/alterations on the fly.
  • Applying the data conversion as the records get stored in the materialized table.
  • Upserts/merges of records in the materialized table (see the sketch after this list).
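
As an illustration of that last point, a periodic upsert from the landing table into a materialized table could look roughly like the sketch below. This is only a hedged sketch under the assumptions that EMPNO is the business key, that CreateTime is kept in RECORD_METADATA, and that EMP_MATERIALIZED is a hypothetical target table; Part 2 covers the approach we actually adopted.

-- Sketch only: keep the latest landed record per key in a materialized table
MERGE INTO DSC_RAW.SOMESCHEMA.EMP_MATERIALIZED tgt
USING (
    SELECT *
    FROM (
        SELECT record_content:EMPNO::NUMBER       AS empno,
               record_content:ENAME::STRING       AS ename,
               record_content:SAL::NUMBER(9,2)    AS sal,
               record_metadata:CreateTime::NUMBER AS create_time
        FROM DSC_RAW.SOMESCHEMA.kfk_TEST_DEMO_EMP
    )
    -- keep only the most recent record per EMPNO
    QUALIFY ROW_NUMBER() OVER (PARTITION BY empno ORDER BY create_time DESC) = 1
) src
ON tgt.empno = src.empno
WHEN MATCHED THEN UPDATE SET tgt.ename = src.ename, tgt.sal = src.sal
WHEN NOT MATCHED THEN INSERT (empno, ename, sal) VALUES (src.empno, src.ename, src.sal);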

So, the process could end up developing into a sample solution like the one below:

Limitations

Now, will this solution be suitable for all scenarios? I cannot guarantee that as your situation and mileage will vary. What do I mean by this? Here are a few potential points to think about:

  • The number of records in the source table.
  • Load on the source database to query and extract.
  • Network bandwidth speed for transferring data.
  • The resources on which Confluent Kafka Connect is running: the number of Kafka Connect instances, memory, etc.

So while what I've illustrated is workable, you will need to evaluate it against your own environment and situation.

Final Thoughts

I hope that describing the scenarios we faced while implementing data synchronization into Snowflake using Confluent has helped you. I have also reflected on the steps required once data is ingested into Snowflake so that users can start consuming the dataset and being productive.

In Part 2 of this series, I walk through one solution that was adopted to solve data materialization challenges.

Need Help with Your Cloud Initiatives?

If you are considering the cloud for migrating or modernizing data and analytics products and applications or if you would like help and guidance and a few best practices in delivering higher value outcomes in your existing cloud program, then please contact us.

Hashmap offers a range of enablement workshops and assessment services, cloud modernization and migration services, and consulting service packages as part of our Cloud (and Snowflake) service offerings.

How does Snowflake compare to other data warehouses? Our technical experts have implemented over 250 cloud/data projects in the last 3 years and conducted unbiased, detailed analyses across 34 business and technical dimensions, ranking each cloud data warehouse.

Other Tools and Content You Might Like

Feel free to share on other channels, and be sure to keep up with all new content from Hashmap here. To listen in on a casual conversation about all things data engineering and the cloud, check out Hashmap's Data Rebels on Tap podcast on Spotify, Apple, and other popular apps.

Venkat Sekar is Regional Director for Hashmap Canada and is an architect and consultant providing Data, Cloud, IoT, and AI/ML solutions and expertise across industries with a group of innovative technologists and domain experts accelerating high-value business outcomes for our customers.
