OCI Streaming: create a producer/consumer use case using ATP, OIC and Kafka Connect
In this article I will explain how to use the Oracle Cloud Infrastructure (OCI) Streaming service to integrate systems. I will create a stream in OCI Streaming that ingests data from an Autonomous Transaction Processing (ATP) table whenever a new record is inserted, and an Oracle Integration Cloud (OIC) integration that polls the stream and consumes the messages. I also use Oracle Kubernetes Engine (OKE) and Oracle Container Registry (OCIR) to deploy Kafka Connect. This is a very simple use case, and you can extend the solution to many real-world scenarios.
In this architecture, I have deployed all services in a private network. You can use ATP and Streaming instances with public access, but a private deployment is more secure; the right choice depends on your use case.
When you insert a new record into the database table (in the ATP database, in our case), the Kafka source connector picks up the record and puts it into the OCI stream. The OIC polling integration then consumes the message from the stream through the OIC connectivity agent (the polling function of the OIC streaming adapter is only supported through the connectivity agent).
To implement this architecture, we need to create the following services and deployments in the OCI tenancy:
- Create compartments.
- Provision networking (subnets for the DB, stream, connectivity agent and Oracle Kubernetes cluster, and all firewall rules between them).
- Create IAM policies and groups.
- Provision an Autonomous Transaction Processing DB with private access only.
- Provision an OCI Streaming pool with private access only.
- Create a Kafka Connect configuration.
- Provision an Oracle Kubernetes cluster.
- Provision an OIC instance.
- Provision a compute instance and run the OIC connectivity agent.
- Build a Docker container image that ingests records into the OCI stream (polling the ATP table) and push it to the OCI container registry.
- Create a deployment in OKE using the container image built in the previous step.
- Build an integration in OIC to poll the OCI stream.
- Test the end-to-end solution.
I am not going to explain all of the above steps in detail. I will only discuss the important notes and the specific configurations that I implemented (such as the deployment in OKE, the stream configuration and the OIC integration). You can find the rest of the information in the OCI documentation.
Let’s start building this use case.
I assume you already have an OCI tenancy and have created your compartments and network infrastructure according to the architecture. Here is the simple networking architecture that I am using for this use case.
- db-private-subnet hosts the ATP database.
- app-private-subnet hosts the OIC connectivity agent and the OCI stream.
- nodes-private-subnet hosts the OKE worker nodes.
- lb-public-subnet hosts the public load balancer (you don’t need it to implement this architecture).
- mgmt-subnet hosts public instances, such as the Bastion host, used to access services in the private network (you can use the OCI Bastion service instead).
Keep in mind that you need security list or network security group (NSG) rules that allow the OKE worker nodes to access the ATP DB and the OCI stream, and the OIC connectivity agent to access the OCI stream.
For example, the ATP NSG needs an ingress rule that allows the application subnet to access the DB on port 1522,
and the Stream NSG needs ingress rules that allow the OKE worker nodes and the OIC connectivity agent to access the stream on port 9092.
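Once the rules are in place, a quick way to confirm them is to try opening a TCP connection from a host in the source subnet (for example an OKE worker node or the connectivity agent instance). The following is a minimal sketch that uses only the Python standard library; the host names in the usage comment are placeholders, not values from this setup.

```python
import socket


def can_connect(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port can be opened."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and DNS failures.
        return False


def check_targets(targets: dict) -> dict:
    """Check several (host, port) pairs and return name -> reachable."""
    return {name: can_connect(host, port) for name, (host, port) in targets.items()}


# Example usage (placeholder endpoints, replace with your own):
# check_targets({
#     "ATP": ("<ATP private endpoint hostname>", 1522),
#     "Streaming": ("<OCI Streaming pool FQDN>", 9092),
# })
```

A result of False only tells you that the connection could not be opened; it does not say whether the cause is an NSG rule, a route table or DNS resolution.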
Provision ATP database
Provision an ATP instance with a private endpoint inside db-private-subnet (only accessible from inside the VCN).
Connect to the database using SQL Developer (you can use port forwarding over the Bastion host to reach a database in a private subnet), then execute the following SQL commands to create the user and the table.
CREATE USER StreamingDemo IDENTIFIED BY "xxxxxxx";
GRANT CONNECT, RESOURCE TO StreamingDemo;
GRANT UNLIMITED TABLESPACE TO StreamingDemo;
CREATE TABLE Users (
    ID NUMBER(10,0) GENERATED BY DEFAULT ON NULL AS IDENTITY,
    USERNAME VARCHAR2(50) NOT NULL,
    FIRST_NAME VARCHAR2(50) NOT NULL,
    MIDDLE_NAME VARCHAR2(50),
    LAST_NAME VARCHAR2(50) NOT NULL,
    AGE NUMBER(5,0) DEFAULT 0 NOT NULL,
    CREATED_ON TIMESTAMP(9) NOT NULL,
    CONSTRAINT TEST_PK PRIMARY KEY (ID) ENABLE
);
You can use any username and table structure you like, but you will then need to adjust the later configuration accordingly.
Provision OCI Streaming pool
Navigate to Analytics & AI in the OCI console and select Streaming under the Messaging tab. You first need to create a Stream Pool with private connectivity.
After entering a name and selecting the VCN, subnet and NSG for the stream pool, select “Show Advanced Options” and check the “Auto create topics” option for this pool. With this option, the stream is created in the pool automatically after you create the Kafka source connector. You can also update the number of partitions and the retention period.
After provisioning the stream pool, you need to capture its Kafka connection settings. On the stream pool details page, select “Kafka Connection Settings” in the Resources tab and capture the bootstrap servers, the username in the SASL connection settings, the security protocol and the security mechanism.
You also need to generate an authentication token for your user and use it as the Kafka connection password. To do that, navigate to your user settings in the OCI console, select Auth Tokens, generate a new token and capture it.
Create Kafka Connect Configuration
OCI streaming is compatible with most Kafka APIs, allowing you to use applications written for Kafka to send messages to and receive messages from the Streaming service without having to rewrite your code.
Streaming can also utilize the Kafka Connect ecosystem to interface directly with external sources like databases, object storage, or any microservice on the Oracle Cloud. Kafka connectors can easily and automatically create, publish to, and deliver topics while taking advantage of the Streaming service’s high throughput and durability.
Kafka Connect lets you build connectors that integrate Apache Kafka compatible streaming services with other apps and data systems (in our case, the ATP database). There are two types of Kafka connectors: source and sink.
To use your Kafka connectors with OCI Streaming, you need to create a Kafka Connect configuration. To do that, navigate to Streaming in the OCI console, select Kafka Connect Configurations in the menu bar, then select “Create Kafka Connect Configuration”. This creates three compacted topics (config, offset and status). These topics are meant to be used by Kafka Connect and Streaming to store configuration and state management data, and should not be used to store your data.
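The three topic names are derived from the OCID of the Kafka Connect configuration by appending a suffix, which is the same convention the docker run test in this article relies on. As a small sketch (the OCID in the usage note is a made-up placeholder, and the function name is only for illustration):

```python
def connect_topics(connect_config_ocid: str) -> dict[str, str]:
    """Map each Kafka Connect storage setting to its topic name.

    Assumes the '<OCID>-config', '<OCID>-offset', '<OCID>-status' naming
    convention for the topics of a Kafka Connect configuration.
    """
    return {
        "config.storage.topic": f"{connect_config_ocid}-config",
        "offset.storage.topic": f"{connect_config_ocid}-offset",
        "status.storage.topic": f"{connect_config_ocid}-status",
    }
```

For example, `connect_topics("ocid1.connectharness.oc1..example")` returns the three values you would put in the Kafka Connect properties file for a configuration with that (hypothetical) OCID.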
Provision OKE cluster
In this part, you need to provision a Kubernetes cluster in OCI. You can use Oracle's managed Kubernetes engine (OKE) to provision it. I assume you have already provisioned and configured your OKE cluster.
Build docker container
Before building the Docker image, you need to download its dependencies.
- Download the JDBC driver and extract it into the libs folder (https://www.oracle.com/database/technologies/appdev/jdbc-downloads.html)
- Download the Kafka Connect JDBC connector and extract it into the kafka-connect-jdbc folder (https://www.confluent.io/hub/confluentinc/kafka-connect-jdbc)
- Download the ATP wallet file and extract it into the wallet folder
Create the Kafka source connector properties file (connect-distributed.propertes). Use the information you captured while provisioning the OCI Streaming pool and the Kafka Connect configuration to fill in this file.
bootstrap.servers=<OCI Streaming Pool FQDN>:9092
sasl.mechanism=PLAIN
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<SASL Connection Username>" password="<User Auth Token>";
producer.sasl.mechanism=PLAIN
producer.security.protocol=SASL_SSL
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<SASL Connection Username>" password="<User Auth Token>";
consumer.sasl.mechanism=PLAIN
consumer.security.protocol=SASL_SSL
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<SASL Connection Username>" password="<User Auth Token>";
config.storage.replication.factor=1
config.storage.topic=<Kafka Connect Configuration Config>
status.storage.replication.factor=1
status.storage.topic=<Kafka Connect Configuration Status>
offset.storage.replication.factor=1
offset.storage.topic=<Kafka Connect Configuration Offset>
offset.flush.interval.ms=10000
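Because the same SASL settings repeat for the worker, the producer and the consumer, it is easy to make a copy/paste mistake in this file. The following is a minimal sketch that renders the properties from the values captured earlier; the function names are illustrative, the topic names assume the `<OCID>-config/-status/-offset` convention, and setting `sasl.mechanism=PLAIN` at the worker level (not only for the producer and consumer) is my assumption.

```python
def jaas_config(username: str, auth_token: str) -> str:
    """Build the PlainLoginModule JAAS line used by all three client sections."""
    return (
        "org.apache.kafka.common.security.plain.PlainLoginModule required "
        f'username="{username}" password="{auth_token}";'
    )


def render_connect_properties(
    bootstrap_servers: str,
    username: str,
    auth_token: str,
    connect_config_ocid: str,
) -> str:
    """Return the text of a distributed Kafka Connect worker properties file."""
    jaas = jaas_config(username, auth_token)
    lines = [f"bootstrap.servers={bootstrap_servers}"]
    # Same security settings for the worker itself, its producers and consumers.
    for prefix in ("", "producer.", "consumer."):
        lines += [
            f"{prefix}sasl.mechanism=PLAIN",
            f"{prefix}security.protocol=SASL_SSL",
            f"{prefix}sasl.jaas.config={jaas}",
        ]
    # Internal topics created by the Kafka Connect configuration.
    for kind in ("config", "status", "offset"):
        lines += [
            f"{kind}.storage.replication.factor=1",
            f"{kind}.storage.topic={connect_config_ocid}-{kind}",
        ]
    lines.append("offset.flush.interval.ms=10000")
    return "\n".join(lines) + "\n"
```

You could write the returned text to the properties file before building the image; keep in mind that the file then contains your auth token in clear text.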
I am using the Debezium base image to build the Docker image (there are several ways to build your image, and you can choose a different method if you like).
FROM debezium/connect:<tag>
RUN mkdir /wallet
COPY libs/* /kafka/libs/
COPY kafka-connect-jdbc/lib/* /kafka/libs/
COPY wallet/* /wallet/
COPY connect-distributed.propertes /kafka/config/connect-distributed.properties
Then use the following command to build the Docker image:
docker build -t atp-kafka-source-connect:v1 .
You can test your Docker image locally by creating a container with the following command on your local machine. Keep in mind that this test needs connectivity from your local machine to the streaming pool on port 9092, which you can achieve with port forwarding.
(Note: before executing this command, update the Kafka Streaming pool endpoint and the Kafka Connect configuration OCID.)
CONNECT_CONFIG_ID=<OCID of Kafka connect configuration>
CONFIG_STORAGE_TOPIC=$CONNECT_CONFIG_ID-config
OFFSET_STORAGE_TOPIC=$CONNECT_CONFIG_ID-offset
STATUS_STORAGE_TOPIC=$CONNECT_CONFIG_ID-status
docker run -it --rm --name atp-kafka-source-connect -p 8083:8083 \
-e GROUP_ID=atp-kafka-source-connect-group \
-e BOOTSTRAP_SERVERS=<Kafka Streaming pool FQDN>:9092 \
-e CONFIG_STORAGE_TOPIC=$CONFIG_STORAGE_TOPIC \
-e OFFSET_STORAGE_TOPIC=$OFFSET_STORAGE_TOPIC \
-e STATUS_STORAGE_TOPIC=$STATUS_STORAGE_TOPIC \
-v `pwd -P`/connect-distributed.propertes:/kafka/config/connect-distributed.properties \
atp-kafka-source-connect:v1
I am not going to test the image from my local machine; I will deploy and test it inside the Kubernetes cluster later.
Push the Docker image to the OCIR repository (first create a private oss-kafka-connect repository in OCIR)
In the next part, I am going to push the image to the OCI container registry service (OCIR). You can use the following commands to log in to OCIR, tag the image and push it (update the OCIR username, password, region and tenancy before running them).
docker login <Region>.ocir.io
Username: <OCIR User - OCI username>
Password: <Auth token>
docker tag atp-kafka-source-connect:v1 <Region>.ocir.io/<Tenancy>/oss-kafka-connect:v1
docker push <Region>.ocir.io/<Tenancy>/oss-kafka-connect:v1
In this step, I am going to configure the Kubernetes cluster. First, configure your local environment to access the cluster (by creating a kubeconfig in your local environment).
Create a Kubernetes namespace for the Kafka Connect deployment, and a Kubernetes secret for pulling images from OCIR, using the following commands:
kubectl create namespace oss-kafka-connect
kubectl create secret docker-registry ocir-secret --docker-server=<Region>.ocir.io --docker-username='<Tenancy>/<OCIR Username>' --docker-password='<Auth Token>' --docker-email='<Email Address>' -n oss-kafka-connect
Create the following ConfigMap in Kubernetes (it contains the Kafka Connect properties). Update the ConfigMap with your Kafka Connect configuration and OCI Streaming details before moving to the next step.
Execute the following command to create the ConfigMap:
kubectl apply -f atp-kafka-source-connect-configmap.yaml -n oss-kafka-connect
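As a rough illustration of what such a ConfigMap could contain, here is a sketch that generates a manifest carrying the same environment variables that the local docker run test passes to the container. This is an assumption on my side, not necessarily the exact ConfigMap used here; the ConfigMap name and the group ID are hypothetical. The manifest is written as JSON, which kubectl also accepts because JSON is valid YAML.

```python
import json


def build_configmap(
    bootstrap_servers: str,
    connect_config_ocid: str,
    name: str = "atp-kafka-source-connect-config",  # hypothetical name
    namespace: str = "oss-kafka-connect",
    group_id: str = "atp-kafka-source-connect-group",  # hypothetical group ID
) -> dict:
    """Build a ConfigMap manifest with the Kafka Connect worker settings."""
    return {
        "apiVersion": "v1",
        "kind": "ConfigMap",
        "metadata": {"name": name, "namespace": namespace},
        "data": {
            "GROUP_ID": group_id,
            "BOOTSTRAP_SERVERS": bootstrap_servers,
            "CONFIG_STORAGE_TOPIC": f"{connect_config_ocid}-config",
            "OFFSET_STORAGE_TOPIC": f"{connect_config_ocid}-offset",
            "STATUS_STORAGE_TOPIC": f"{connect_config_ocid}-status",
        },
    }


def write_manifest(manifest: dict, path: str) -> None:
    """Write the manifest as JSON so that 'kubectl apply -f' can read it."""
    with open(path, "w", encoding="utf-8") as fh:
        json.dump(manifest, fh, indent=2)
```

Calling `write_manifest(build_configmap("<OCI Streaming pool FQDN>:9092", "<OCID of Kafka connect configuration>"), "atp-kafka-source-connect-configmap.yaml")` produces a file you can pass to the kubectl command above.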
Create the Kubernetes deployment and service (the service creates a public load balancer that listens on port 80 and forwards requests to the Kubernetes deployment on port 8083). Update the deployment with your Kafka Connect configuration and OCI Streaming details before moving to the next step.
Execute the following command to create the deployment in the Kubernetes cluster:
kubectl apply -f atp-kafka-source-connect-deployment.yaml -n oss-kafka-connect
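To make the shape of that deployment and service more concrete, here is a minimal sketch that generates both manifests: a Deployment that pulls the image from OCIR with the ocir-secret pull secret and exposes port 8083, and a LoadBalancer Service that maps port 80 to 8083. The resource names, the label and the ConfigMap name are hypothetical, and loading the environment from a ConfigMap via `envFrom` is an assumption about how the pieces fit together.

```python
import json

APP = "atp-kafka-source-connect"  # hypothetical name and label value


def build_deployment(
    image: str,
    namespace: str = "oss-kafka-connect",
    configmap: str = "atp-kafka-source-connect-config",  # hypothetical
    pull_secret: str = "ocir-secret",
) -> dict:
    """Deployment running one Kafka Connect worker on port 8083."""
    labels = {"app": APP}
    container = {
        "name": APP,
        "image": image,
        "ports": [{"containerPort": 8083}],
        # Load all keys of the ConfigMap as environment variables.
        "envFrom": [{"configMapRef": {"name": configmap}}],
    }
    return {
        "apiVersion": "apps/v1",
        "kind": "Deployment",
        "metadata": {"name": APP, "namespace": namespace},
        "spec": {
            "replicas": 1,
            "selector": {"matchLabels": labels},
            "template": {
                "metadata": {"labels": labels},
                "spec": {
                    "imagePullSecrets": [{"name": pull_secret}],
                    "containers": [container],
                },
            },
        },
    }


def build_service(namespace: str = "oss-kafka-connect") -> dict:
    """LoadBalancer service: listens on port 80, forwards to 8083."""
    return {
        "apiVersion": "v1",
        "kind": "Service",
        "metadata": {"name": f"{APP}-lb", "namespace": namespace},
        "spec": {
            "type": "LoadBalancer",
            "selector": {"app": APP},
            "ports": [{"protocol": "TCP", "port": 80, "targetPort": 8083}],
        },
    }


def write_manifests(path: str, *manifests: dict) -> None:
    """Wrap the objects in a v1 List so that one JSON file holds them all."""
    doc = {"apiVersion": "v1", "kind": "List", "items": list(manifests)}
    with open(path, "w", encoding="utf-8") as fh:
        json.dump(doc, fh, indent=2)
```

The Kafka Connect REST API has no authentication of its own, so for anything beyond a demo you may prefer an internal load balancer or a restrictive NSG in front of this service.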
After deploying Kafka Connect in the Kubernetes cluster, you should create the connector configuration for your database. You can use the Kafka Connect REST API to create the configuration in the Kafka Connect instance. Create the following JSON file, connector-config.json (update the ATP connection name, database password, topic prefix and the list of tables you want to whitelist), and use the following REST API call to apply the configuration to the Kafka connector.
curl -iX POST -H "Accept:application/json" -H "Content-Type:application/json" -d @connector-config.json http://<Kubernetes deployment load balancer public IP>/connectors
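For reference, a connector configuration for the Confluent JDBC source connector could look roughly like the output of the sketch below. The connector name, the incrementing mode on the ID column and the `TNS_ADMIN=/wallet` connection URL are my assumptions based on the table and the Docker image described in this article; adjust them to your own setup. The second function builds the same POST request as the curl command, using only the standard library.

```python
import json
import urllib.request


def build_connector_config(
    name: str,
    tns_alias: str,
    db_user: str,
    db_password: str,
    topic_prefix: str,
    tables: list,
    wallet_dir: str = "/wallet",
) -> dict:
    """Build the JSON body for POST /connectors (JDBC source connector)."""
    return {
        "name": name,
        "config": {
            "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
            "tasks.max": "1",
            # The wallet was copied to /wallet in the Docker image.
            "connection.url": f"jdbc:oracle:thin:@{tns_alias}?TNS_ADMIN={wallet_dir}",
            "connection.user": db_user,
            "connection.password": db_password,
            # Assumption: pick up new rows by the ever-increasing ID column.
            "mode": "incrementing",
            "incrementing.column.name": "ID",
            "table.whitelist": ",".join(tables),
            # Each table is published to the topic '<topic_prefix><table name>'.
            "topic.prefix": topic_prefix,
        },
    }


def build_request(connect_url: str, connector: dict) -> urllib.request.Request:
    """Build the POST request that registers the connector."""
    return urllib.request.Request(
        url=f"{connect_url.rstrip('/')}/connectors",
        data=json.dumps(connector).encode("utf-8"),
        headers={"Accept": "application/json", "Content-Type": "application/json"},
        method="POST",
    )
```

To send it, pass the result of `build_request("http://<Kubernetes deployment load balancer public IP>", config)` to `urllib.request.urlopen`; `json.dumps(config, indent=2)` gives you the content for connector-config.json if you prefer the curl command.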
Provision OIC instance
In this step you need to provision an OIC instance in your tenancy. To do that, navigate to Developer Services in the OCI console and select Integration under the Application Integration tab. You can then create a new integration instance in the selected compartment.
Deploy OIC connectivity agent
After provisioning the OIC instance, you need to provision a compute instance in one of the private subnets (for example, app-private-subnet). Then connect to the OIC instance service console (select “Service Console” on the OIC instance details page) and navigate to the Agents page under the Integrations menu item.
Create an agent group:
Download the connectivity agent zip file and transfer it to the compute instance.
Connect to the compute instance and unzip the connectivity agent file. Then update the following information in the InstallerProfile.cfg file:
# Required Parameters
# oic_URL format should be https://hostname:sslPort
oic_URL=https://<OIC Instance URL>:<OIC Instance Port>
agent_GROUP_IDENTIFIER=<Agent Group ID>
oic_USER=<Username>
Finally, run the connectivity agent using the following command:
java -jar connectivityagent.jar &
To create a connection to the OCI Streaming service, you need to create a keystore that contains the OCI Streaming certificate, using the following commands.
echo -n | openssl s_client -connect <OCI Streaming pool endpoint>:9092 | sed -ne '/-BEGIN CERTIFICATE-/,/-END CERTIFICATE-/p' > app-streaming.cert
keytool -keystore app_streaming_truststore.jks -alias OSSStream -import -file app-streaming.cert
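If openssl is not available on the machine, the certificate download step can also be done with the Python standard library. This is only a sketch of an alternative to the openssl/sed pipeline: like that pipeline, it saves the server certificate without validating it, and you still need the keytool command to import the file into the truststore. The function names are illustrative.

```python
import ssl
from pathlib import Path


def fetch_server_certificate(host: str, port: int = 9092) -> str:
    """Return the server certificate of host:port as PEM text (not validated)."""
    return ssl.get_server_certificate((host, port))


def save_certificate(pem: str, path: str = "app-streaming.cert") -> Path:
    """Write PEM text to a file after a basic sanity check."""
    if not pem.lstrip().startswith("-----BEGIN CERTIFICATE-----"):
        raise ValueError("not a PEM-encoded certificate")
    target = Path(path)
    target.write_text(pem, encoding="ascii")
    return target
```

A typical call would be `save_certificate(fetch_server_certificate("<OCI Streaming pool endpoint>"))`, run from a host that can reach the stream pool on port 9092.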
Create OIC integration to poll messages from OCI stream
First you need to create a connection using the OCI Streaming adapter. To do that, navigate to Connections under the Integrations menu in the OIC console and create a new connection using the OCI Streaming adapter.
After creating the connection, update the following connection properties using the Kafka Connection Settings:
Bootstrap Servers: <OCI Streaming pool endpoint>:9092
SASL Username: <SASL Connection Username>
SASL Password: <Auth Token>
TrustStore: <Upload the keystore created in the previous step>
TrustStore Password: <Keystore password, if any>
Agent Group: <Select the agent group created in the previous steps>
Finally, test and save the connection.
As the last step, you need to create an app-driven integration in OIC that polls messages from the OCI stream using the connection created in the previous step.
Add the connection as the start of the integration.
Select the stream, partition and consumer group.
Finally, select the message JSON structure:
Save and activate the integration.
You can test this use case by inserting a new record into the table in the ATP database.
If all of the configuration is correct, the integration will be triggered in OIC.
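If you want to script the test insert instead of using SQL Developer, the following sketch shows one way to do it. It works with any Python DB-API connection that supports named bind variables, for example a connection created with the python-oracledb driver (the driver choice is my assumption). The ID column is left out on purpose so that the identity column generates it.

```python
from datetime import datetime

INSERT_USER_SQL = (
    "INSERT INTO Users "
    "(USERNAME, FIRST_NAME, MIDDLE_NAME, LAST_NAME, AGE, CREATED_ON) "
    "VALUES (:username, :first_name, :middle_name, :last_name, :age, :created_on)"
)


def insert_user(conn, username, first_name, last_name,
                age=0, middle_name=None, created_on=None):
    """Insert one row into the Users table and commit it.

    'conn' is an open DB-API connection; the connector only sees the
    new row after the transaction has been committed.
    """
    params = {
        "username": username,
        "first_name": first_name,
        "middle_name": middle_name,
        "last_name": last_name,
        "age": age,
        "created_on": created_on if created_on is not None else datetime.now(),
    }
    cursor = conn.cursor()
    try:
        cursor.execute(INSERT_USER_SQL, params)
    finally:
        cursor.close()
    conn.commit()
```

After calling, for example, `insert_user(conn, "jdoe", "John", "Doe", age=30)`, the new row should show up as a message in the stream and then trigger the OIC integration.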
This was a simple use case showing how to use OCI Streaming to ingest and consume messages. You can extend it to many scenarios, such as ingesting IoT sensor data into OCI Streaming and using an integration instance in OIC to poll the stream and enrich the sensor data.