Bringing it all Together: OCI Streaming, NoSQL, Object Storage, and GoldenGate on Oracle Cloud Infrastructure

Published in Oracle Developers, 11 min read, May 25, 2023

Written by Shadab Mohammad, Principal Cloud Solutions Architect@Oracle

Photo by Chang Duong on Unsplash

Introduction

At a time when we’re practically swimming in data, it’s no surprise that being able to access, store, and make sense of it instantly has become critical. Handling these tasks calls for sturdy, trustworthy, and adaptable tools, particularly because fast-moving data insights are steering more and more business decisions. Oracle Cloud Infrastructure (OCI), with its growing family of data management services, offers strong capabilities for real-time data processing and analysis.

As a Senior Consulting Architect, I’ve seen up close over the past 3 years how Oracle Cloud Infrastructure (OCI) has reshaped the data scene across a wide range of sectors for many of our clients. OCI, with its strong lineup of data management and processing resources, provides a dependable platform to tap into the strength of real-time data. A standout part of this lineup is Oracle GoldenGate for Big Data, a well-rounded, adaptable solution geared up to take on the challenge of real-time data integration and replication.

In this blog post, we’ll demonstrate how to use OCI GoldenGate to stream data in real time from OCI Streaming Service to OCI NoSQL Database and OCI Object Storage.

Pre-requisites

Ensure you have access to the following before getting started:

  1. An active Oracle Cloud Infrastructure (OCI) account.
  2. Access to OCI Streaming Service.
  3. Access to OCI NoSQL Database.
  4. Access to OCI Object Storage.
  5. Access to OCI GoldenGate for Big Data.
  6. Familiarity with Kafka protocols.

High-Level Architecture

Core components of our architecture are OCI Streaming Service, OCI NoSQL Database, OCI Object Storage, and OCI GoldenGate for Big Data.

  1. OCI Streaming Service receives real-time data streams from producers.
  2. OCI GoldenGate for Big Data serves as the conduit for this data, facilitating its streaming to the NoSQL Database and Object Storage.
  3. OCI NoSQL Database takes in the streamed data for real-time data querying.
  4. OCI Object Storage offers durable, secure, scalable, and highly available storage for the streamed data.

For the diagram, visualize OCI GoldenGate as the central hub connecting the OCI Streaming Service, NoSQL Database, and Object Storage. Arrows indicate the data flow from the Streaming Service, through OCI GoldenGate, and then to both the NoSQL Database and Object Storage.

Step 1: Set Up API Keys for Your OCI User Account

Sign in to the OCI console with your credentials. Make sure you have the permissions required to create and manage OCI Streaming, OCI GoldenGate, NoSQL, and Object Storage resources in your compartment.

Refer to the OCI documentation on how to create an API key for your user, then proceed as follows:

1.1. Navigate to User Settings

1.2. Navigate to API Keys Section

1.3. Generate an API Key Pair

There are two options to generate an API Key Pair:

  • Generate an API Key Pair in the Console. Click “Add API Key” and choose “Generate API Key Pair” to create a new key pair. Download the private key and keep it secure.
  • Generate an API Key Pair manually using the openssl command in your local environment and then upload the public key to the console.

1.4. Add the Public Key to OCI

In the “Add Public Key” dialog box, paste your public key value if you generated the key pair manually. If you generated it in the console, the value is populated automatically. Click “Add” to finalize the process.

1.5. Save the Private Key Securely

If you’ve made the key pair through the console, it’s really important that you download and stash away the private key — that’s the .pem file — somewhere safe.

Note down the key’s fingerprint; it will be needed for future configurations.
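If you generated the key pair manually, you may want to compute the fingerprint yourself and compare it with the one shown in the console. The sketch below assumes the fingerprint is the MD5 digest of the DER-encoded public key, written as colon-separated hex pairs, and that the public key file is in the standard "BEGIN PUBLIC KEY" PEM format. The function name is hypothetical and not part of any OCI tool.

```python
import base64
import hashlib


def fingerprint_from_public_pem(pem_text: str) -> str:
    """Return a colon-separated MD5 fingerprint for a PEM public key.

    Assumption: the fingerprint is the MD5 digest of the DER bytes that
    the PEM body encodes.
    """
    # Keep only the base64 body between the BEGIN/END marker lines.
    body_lines = []
    for raw_line in pem_text.strip().splitlines():
        line = raw_line.strip()
        if line and not line.startswith("-----"):
            body_lines.append(line)

    der_bytes = base64.b64decode("".join(body_lines))
    digest = hashlib.md5(der_bytes).hexdigest()

    # Split the 32 hex characters into 16 pairs joined by colons.
    return ":".join(digest[i:i + 2] for i in range(0, len(digest), 2))
```

With a real key you would pass in the contents of the public key file. If the assumption above holds, the result should match the fingerprint listed under API Keys in the console.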

By following these steps, you will have successfully set up API keys for your OCI user account.

Step 2: Setting Up OCI Streaming Service

Initiate the process by setting up the OCI Streaming Service. Here, you will create a stream and configure the stream pool.

2.1. Access the Oracle Cloud Infrastructure console.

2.2. Navigate to Analytics & AI > Messaging > Streaming

2.3. Create a new stream called ‘demo’ in a stream pool

Go to OCI Console > OCI Streaming > Stream Pools > Kafka Connection Settings.

Note the SASL connection string. It contains the username that we will use later when creating the OCI GoldenGate connection to OCI Streaming.

Create an auth token for your OCI user. It will be used as the password in the OCI GoldenGate connection.

Refer to the OCI documentation on how to create an auth token.
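To make the username easier to pick out, here is a small sketch that extracts it from a SASL connection string. It assumes the string has the usual Kafka JAAS shape (username="..." password="...") and that the username is made up of the tenancy name, the user name, and the stream pool OCID separated by slashes. The sample values and the function name are placeholders, not real identifiers.

```python
import re


def parse_sasl_username(sasl_string: str) -> dict:
    """Extract the username and its parts from a SASL connection string."""
    match = re.search(r'username="([^"]+)"', sasl_string)
    if match is None:
        raise ValueError("no username found in the SASL string")

    username = match.group(1)
    parts = username.split("/")
    if len(parts) < 3:
        raise ValueError("expected tenancy/user/stream-pool-OCID")

    # The user name itself may contain a slash (for example, a federated
    # user), so only the first and last parts are treated as fixed.
    return {
        "username": username,
        "tenancy": parts[0],
        "user": "/".join(parts[1:-1]),
        "stream_pool_ocid": parts[-1],
    }


sample = (
    "org.apache.kafka.common.security.plain.PlainLoginModule required "
    'username="mytenancy/jane@example.com/ocid1.streampool.oc1..example" '
    'password="AUTH_TOKEN";'
)
print(parse_sasl_username(sample)["username"])
```

The full username value is what goes into the GoldenGate connection later, and the auth token takes the place of AUTH_TOKEN.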

2.4. Use the sample streaming_producer.py script below to write to your OCI stream. It uses the API key created in Step 1, together with the stream’s message endpoint and OCID from above.

Copy this code to a machine that has python3 and the OCI Python SDK installed, and where the API key details from Step 1 have been added to the ~/.oci/config file.

import oci
import json
from base64 import b64encode
import random
from datetime import datetime, timedelta
import time
import threading
import queue

ociMessageEndpoint = "<your OCI Streaming endpoint>"
ociStreamOcid = "<your OCI Streaming OCID>"
ociConfigFilePath = "/home/opc/.oci/config"
ociProfileName = "DEFAULT"

def produce_messages(client, stream_id, input_json):
    key = input_json["sensor_id"]
    value = json.dumps(input_json)

    encoded_key = b64encode(key.encode()).decode()
    encoded_value = b64encode(value.encode()).decode()

    message = oci.streaming.models.PutMessagesDetailsEntry(key=encoded_key, value=encoded_value)

    print("Publishing message to the stream {} ".format(stream_id))
    messages = oci.streaming.models.PutMessagesDetails(messages=[message])
    put_message_result = client.put_messages(stream_id, messages)

    for entry in put_message_result.data.entries:
        if entry.error:
            print("Error ({}) : {}".format(entry.error, entry.error_message))
        else:
            print("Published message to partition {} , offset {}".format(entry.partition, entry.offset))

# Producer function to generate fake sensor data and put it in the queue
def generate_sensor_data(sensor_data_queue):
    while True:
        sensor_id = "s{:03d}".format(random.randint(1, 100))
        temperature = round(random.uniform(60, 100), 2)
        humidity = round(random.uniform(30, 70), 2)
        timestamp = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ")

        record = {
            "sensor_id": sensor_id,
            "temperature": temperature,
            "humidity": humidity,
            "timestamp": timestamp
        }

        sensor_data_queue.put(record)
        time.sleep(1)  # Adjust the sleep duration to control the rate of data generation

# Consumer function to read sensor data from the queue and publish it to OCI Stream
def publish_to_oci_stream(sensor_data_queue):
    config = oci.config.from_file(ociConfigFilePath, ociProfileName)
    stream_client = oci.streaming.StreamClient(config, service_endpoint=ociMessageEndpoint)

    while True:
        record = sensor_data_queue.get()
        produce_messages(stream_client, ociStreamOcid, record)

def main():
    sensor_data_queue = queue.Queue()

    # Create and start producer and consumer threads
    producer_thread = threading.Thread(target=generate_sensor_data, args=(sensor_data_queue,))
    consumer_thread = threading.Thread(target=publish_to_oci_stream, args=(sensor_data_queue,))

    producer_thread.start()
    consumer_thread.start()

    producer_thread.join()
    consumer_thread.join()

if __name__ == "__main__":
    main()

When you run this code, it generates base64-encoded JSON messages and writes them to the OCI stream demo:

python3 streaming_producer.py
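If the script fails while loading the configuration, one thing worth checking is whether the profile in ~/.oci/config is complete. The sketch below parses the file contents as INI text and lists any missing entries. It assumes that an API-key profile needs the user, fingerprint, tenancy, region, and key_file entries. The helper name is made up for this example.

```python
import configparser

# Assumed set of entries for an API-key based profile.
REQUIRED_KEYS = ("user", "fingerprint", "tenancy", "region", "key_file")


def missing_config_keys(config_text: str, profile: str = "DEFAULT") -> list:
    """Return the required entries that are absent or empty in a profile."""
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(config_text)

    # DEFAULT is always present in configparser; other profiles must exist.
    if profile != "DEFAULT" and not parser.has_section(profile):
        return list(REQUIRED_KEYS)

    section = parser[profile]
    return [key for key in REQUIRED_KEYS if not section.get(key)]


example_config = """
[DEFAULT]
user=ocid1.user.oc1..example
fingerprint=aa:bb:cc
tenancy=ocid1.tenancy.oc1..example
region=ap-seoul-1
"""
print(missing_config_keys(example_config))
```

In this example the key_file entry is left out on purpose, so it is the one reported as missing. To check a real file, pass in the text read from the path used in the script.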

Sample Key-Value Pair

Key : s056

Value:

{"sensor_id": "s056",
"temperature": 78.0,
"humidity": 44.89,
"timestamp": "2023-05-25T03:05:30Z"}
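The producer base64-encodes both the key and the JSON value before calling the API. The sketch below mirrors that encoding and shows the reverse step a reader of the stream would apply to get the original record back. The two function names are illustrative only, and the record is the sample shown above.

```python
import base64
import json


def encode_record(record: dict) -> tuple:
    """Encode a sensor record the same way the producer script does."""
    key = base64.b64encode(record["sensor_id"].encode()).decode()
    value = base64.b64encode(json.dumps(record).encode()).decode()
    return key, value


def decode_message(encoded_key: str, encoded_value: str) -> tuple:
    """Reverse the encoding: base64-decode, then parse the JSON value."""
    key = base64.b64decode(encoded_key).decode()
    value = json.loads(base64.b64decode(encoded_value).decode())
    return key, value


record = {
    "sensor_id": "s056",
    "temperature": 78.0,
    "humidity": 44.89,
    "timestamp": "2023-05-25T03:05:30Z",
}
encoded_key, encoded_value = encode_record(record)
decoded_key, decoded_value = decode_message(encoded_key, encoded_value)
print(decoded_key, decoded_value["temperature"])
```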

Step 3: Configuring OCI GoldenGate for Big Data

Next, set up OCI GoldenGate for Big Data. This step is vital for integrating the different components and enabling smooth data flow.

3.1. In the OCI console, go to Oracle Database > GoldenGate

3.2. Navigate to Deployments and create a deployment for Big Data

3.3. Create a Connection for OCI Streaming and assign it to the GG Big Data deployment

From OCI Console > GoldenGate > Connections

Important note: The username is the one we got earlier from OCI Console > OCI Streaming > Stream Pools > Kafka Connection Settings, and the password is the auth token.

Assign the connection to the Big Data deployment.

3.4. Create a Connection for OCI Object Storage and assign it to the GG Big Data deployment.

Important note: Use the private key file created in Step 1 and the fingerprint of the API key displayed in OCI Console > User Settings > API Keys.

Assign the connection to the OCI GoldenGate Big Data deployment.

3.5. Create a Connection for OCI NoSQL Database and assign it to the GG Big Data deployment.

Use the API keys created in Step 1.

Step 4: Connecting the Components

The final step involves connecting OCI Streaming Service, NoSQL Database, and Object Storage through OCI GoldenGate. This step sets the data highway in motion.

4.1. Create a new credential for Kafka

Following Oracle’s official documentation, add a credential:

  1. Open the navigation menu, and then click Configuration.
  2. On the Credentials page, click Add Credential (plus icon).
  3. Enter the following details in the fields provided, and then click Submit:
  • For Credential Domain, enter OracleGoldenGate
  • For Credential Alias, enter kafka
  • For User ID, enter kafka://
  • For Password, enter a password
  • For Verify Password, enter the password again

4.2. Create an Extract in OCI GoldenGate for the OCI Streaming connection, then start it to begin capturing messages from OCI Streaming

OCI Streaming Extract

Select the Connection created earlier for OCI Streaming

Leave all parameters as default

-- Parameter file for Kafka extract.
EXTRACT STRMICN
EXTTRAIL ss
-- TODO: Create a credential store entry with a userid prefix kafka://
SOURCEDB USERIDALIAS OCIStreams DOMAIN GGS
JVMOPTIONS CLASSPATH $OGG_HOME/ggjava/ggjava.jar:$THIRD_PARTY_DIR/kafka/*
-- Connection alias: OCIStreams
TRANLOGOPTIONS _CONNECTIONID ocid1.goldengateconnection.oc1.ap-seoul-1.amaaaaaap77apcqaovc474df4pftfakeurzrs4oakxl77utvf2mqey6j5cvq
TABLE source.*;

4.3. Create one Replicat for the OCI NoSQL Database and one for Object Storage, then start them to apply the changes.

NoSQL Replicat

Parameters :

REPLICAT nosql
MAP source.demo, TARGET source.sensor_data;

Properties file:

# Properties file for Replicat nosql
# NoSQL Handler Template
gg.handlerlist=nosql
gg.handler.nosql.type=nosql
gg.handler.nosql.connectionId=ocid1.goldengateconnection.oc1.ap-seoul-1.amaaaaaap77apcqadb7vzzgykj2twstqzn3yfmhxd57r3jnns3s54pgwahaq
gg.handler.nosql.ddlHandling=CREATE,ADD,DROP
gg.handler.nosql.compartmentID=ocid1.compartment.oc1..aaaaaaaak72343qs5qs7wvjvjxlqzfbahtdi2mritj3fwostqqbocvfnk7za
gg.handler.nosql.interactiveMode=true
gg.handler.nosql.storageGb=10
gg.handler.nosql.readUnits=50
gg.handler.nosql.writeUnits=50
gg.handler.nosql.mode=op
gg.classpath=$THIRD_PARTY_DIR/nosqlsdk/*
jvm.bootoptions=-Xmx512m -Xms32m

Object Storage Replicat

Properties file:

# Properties file for Replicat OSSRP
gg.target=oci
# OCI Event Handler Template
gg.eventhandler.oci.connectionId=ocid1.goldengateconnection.oc1.ap-seoul-1.amaaaaaap77apcqaxq4imus7s2l2yrq4fuprl56j7wvyopsoqrqqcejnmxja
#TODO: Edit the OCI compartment OCID
gg.eventhandler.oci.compartmentID=ocid1.compartment.oc1..aaaaaaaak72343qs5qs7wvjvjxlqzfbahtdi2mritj3fwostqqbocvfnk7za
# TODO: Edit the OCI bucket name
gg.eventhandler.oci.bucketMappingTemplate=OBJ_${fullyQualifiedTableName}
gg.classpath=$THIRD_PARTY_DIR/oci/*
# Change file format on OSS
gg.handler.oci.format=delimitedtext
gg.handler.oci.format.fieldDelimiter=CDATA[;]
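With the delimited text format and a semicolon as the field delimiter, the objects written to the bucket can be read with any CSV parser. The sketch below is a minimal example that assumes semicolon-separated fields and default double-quote handling. The sample line is invented for illustration. The actual column layout depends on the formatter settings and is not shown in this post.

```python
import csv
import io


def parse_delimited(text: str, delimiter: str = ";") -> list:
    """Split delimited text into rows of fields, skipping empty lines."""
    reader = csv.reader(io.StringIO(text), delimiter=delimiter)
    return [row for row in reader if row]


# Hypothetical content of one object; the real layout may differ.
sample_object = "s056;78.0;44.89;2023-05-25T03:05:30Z\n"
for row in parse_delimited(sample_object):
    print(row)
```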

4.4. Monitor the system to ensure data is flowing properly between all components.

Check the status of the one Extract and the two Replicats in the GoldenGate console.

Green indicates that capture and apply are working normally. Each message captured from the OCI stream is written to the NoSQL table and to the Object Storage bucket.

Check the NoSQL table for incoming records. A table called sensor_data was created automatically, and when you query it you can see records arriving.

Finally, check the Object Storage bucket. A bucket called OBJ_source_demo was created automatically, and it holds all the data streaming in from the OCI stream demo.

By following these steps, you can set up a scalable data management infrastructure that combines multiple OCI data integration and data persistence technologies.

Business Use Cases

Finance

Real-time Fraud Detection

Financial institutions can use this solution to analyze real-time transactions coming through the OCI Streaming Service. These transactions can be processed and analyzed using advanced algorithms in near real-time in the NoSQL Database. When a potentially fraudulent transaction is detected, an alert can be created and forwarded to a fraud risk analyst. This can significantly reduce the time to detect and prevent fraud, protecting both the institution and its customers.

Retail

Real-time Inventory Management

In a retail environment, maintaining an optimal inventory level is crucial. Retailers can use IoT sensors on shelves and in warehouses to send data to the OCI Streaming Service whenever a product’s inventory level changes. This data can then be processed in the NoSQL Database, allowing for real-time inventory tracking and automated reordering of products when inventory levels get too low. Additionally, historical inventory data can be stored in Object Storage for trend analysis and future planning.

Healthcare

Remote Patient Monitoring

Healthcare providers can take advantage of IoT medical devices to send live updates of patient data, like heart rate or blood sugar levels, straight to the OCI Streaming Service. This info can then be worked through and examined in the NoSQL Database, which lets healthcare pros keep a real-time watch on a patient’s health and spring into action fast if there’s a sudden crucial change in their health status. Plus, patient data over the long haul can be safely tucked away in Object Storage. This makes it easy to spot trends and map out long-term care plans.

Industrial Manufacturing

Predictive Maintenance

In an industrial manufacturing setting, IoT sensors installed on machinery can continuously send data regarding machine operations to the OCI Streaming Service. This data can be processed in real-time in the NoSQL Database to monitor machine health and predict potential failures based on abnormal readings. This predictive maintenance approach can prevent unexpected downtime and costly repairs. Additionally, the machinery data can be stored long-term in Object Storage, providing valuable insights for improving machinery lifespan and factory efficiency.
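As a rough illustration of what flagging abnormal readings could look like, the sketch below keeps a short rolling window of temperature values per sensor and flags a reading that falls far from the recent mean. This is a simple stand-in and not the method used by any Oracle service. The class name, window size, and threshold are assumptions chosen for the example, and the record shape follows the sensor messages from Step 2.

```python
from collections import defaultdict, deque
from statistics import mean, pstdev


class AbnormalReadingDetector:
    """Flag readings far from the rolling mean of recent values per sensor."""

    def __init__(self, window: int = 20, threshold: float = 3.0, min_history: int = 5):
        self.threshold = threshold
        self.min_history = min_history
        # One bounded history of recent temperatures per sensor_id.
        self._history = defaultdict(lambda: deque(maxlen=window))

    def is_abnormal(self, record: dict) -> bool:
        history = self._history[record["sensor_id"]]
        value = record["temperature"]

        abnormal = False
        if len(history) >= self.min_history:
            spread = pstdev(history)
            # Only judge the reading when the history shows some variation.
            if spread > 0:
                abnormal = abs(value - mean(history)) > self.threshold * spread

        history.append(value)
        return abnormal


detector = AbnormalReadingDetector()
readings = [70.0, 71.0, 69.0, 70.0, 71.0, 70.0, 95.0]
flags = [
    detector.is_abnormal({"sensor_id": "s001", "temperature": t})
    for t in readings
]
print(flags)
```

Only the last reading in this example sits well outside the range of the earlier ones, so it is the only one flagged. A real deployment would likely use a model suited to the machinery and its failure modes.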

These are just a few examples of how this OCI-based solution can be used across industries. The possibilities are vast when you can process and analyze data in real time.

Conclusion

Oracle Cloud Infrastructure offers us a well-rounded toolkit for handling data on the fly. By weaving together the OCI Streaming Service, NoSQL Database, Object Storage, and OCI GoldenGate, we’re in a great position to make a really effective, adaptable system for live-streaming data and storing it solidly on OCI.

Our digital landscape is always shifting, which means we need to keep updating our strategies for managing data. So, stay tuned, we’re still out there exploring new frontiers in data management on Oracle Cloud Infrastructure.

Cloud Solutions Architect@Oracle (The statements and opinions expressed here are my own & do not necessarily represent those of my employer)