Ingest real-time data in OCI OpenSearch using API Gateway, Streaming service and Connector Hub
Introduction
In this article I will explain how to ingest real-time data (like sensor data) to an Oracle Cloud Infrastructure (OCI) OpenSearch managed service.
I will leverage other OCI native services like API Gateway, Streaming, Connector Hub, and OCI Function to implement this solution.
Architecture
Here is high level architecture of the solution.
In this architecture, real-time data (for example, temperature and humidity data from IOT sensors) will send to the deployment in OCI API Gateway. then API Gateway using OSS (OCI Streaming Service) APIs to ingest data in OSS. OCI connector hub will consume messages in OSS and execute OCI function to ingest sensors data to OCI OpenSearch. Finally, you can use OpenSeach dashboard to validate and search data.
Implementation
Basic configuration like networking and identity and access management resources are out of scope of this article.
I presume you have already provisioned a VCN (Virtual Cloud Network) with two private and public subnets (including route tables, security rules, gateways, etc.).
Here is the high level network diagram for this solution (if you want to implement a more secure solution and devices that communicate with Oracle Cloud privately, you can use the private API gateway and implement VPN/FastConnect connection between on-premises network and OCI. This is out of scope for this article).
First, I will provision managed services, including Stream Pool and Stream, API Gateway and OpenSearch instances.
Provision OCI Streaming resources.
Provision public stream pool (because connector hub only supports public stream at the time of this article).
Next, provision a stream instance with only one partition (you can choose more partition according to your load).
Test publish message to the stream using OCI CLI. First create the following sample message payload.
[
{
"key": "MjAwMA==",
"value": "ewog4oCcc2Vuc29yIl9pZDogIDIwMDAsCiDigJx0ZW1wcmF0dXJlIjogMjMuNSwKIOKAnGh1bWlkaXR5OiwgOTIuNywKIOKAnFRpbWVzdGFtcCI6IDE3MDIzNDY5MTA3NTIKfQ=="
}
]
Message key and value should be base64 encoded before publishing in OCI Stream. The actual message value is in the following format.
{
“sensor"_id: 2000,
“temprature": 23.5,
“humidity:, 92.7,
“Timestamp": 1702346910752
}
Also, use the following command to publish a message to the stream.
oci streaming stream message put --stream-id <Stream OCID> --messages file://message.json --endpoint https://cell-1.streaming.ap-sydney-1.oci.oraclecloud.com
{
"data": {
"entries": [
{
"error": null,
"error-message": null,
"offset": 179,
"partition": "0",
"timestamp": "2024-01-10T03:12:08.056000+00:00"
}
],
"failures": 0
}
}
Now you can see the message in OCI Console.
Provision OCI API Gateway
API Gateway is the entry point of this solution. Messages will ingest to OpenSearch by calling API gateway deployment endpoint, so I am going to provision it in this step.
First, I need to provision API gateway instance, then create a deployment inside the gateway to publish messages to OSS.
Here is the API Gateway instance configuration.
As I mentioned, I am going to use OSS Rest APIs to publish messages inside the stream. You can use this document for more information on using OSS Rest APIs.
Keep in mind, before calling OCI Rest APIs you need to sign the request and pass the signing string, plus other required information, as an Authorization header in the request. You can use this document for more information on signing the request.
I have implemented a request signature in the client application I have created to call an API endpoint, and I will explain the code later in this article.
Here is the API gateway deployment that I have provisioned for this solution.
I have configured OCI Streaming Rest API (PutMessage) as the back-end of our API deployment route.
Keep in mind, the OSS Rest URL for publishing messages to the stream is
/20180418/streams/{streamId}/messages
Because of message signatures (Authorization header will pass to OSS Rest API), you should use the same path prefix for API deployment.
Provision OCI OpenSearch
Finally, provision the OCI OpenSearch managed service with one leader node, one data node, and one dashboard node.
Then we need to create mapping for sensor data in OpenSearch instance using the following command.
Mapping json file:
{
"mappings": {
"properties": {
"sensor_id": {"type": "integer"},
"temperature": {"type": "float"},
"humidity": {"type": "float"},
"timestamp": {"type": "double"}
}
}
}
CURL Command:
curl -XPUT https://<OpenSearch Endpoint>:<OpenSearch Port>/sensor -H 'Content-Type: application/json' -k -u osadmin:<OpenSearch Admin credential> -d @opensearch-mapping.json
{"acknowledged":true,"shards_acknowledged":true,"index":"sensor"}
I am using “sensor”” for the index name.
We can use the following command to ingest test data in OpenSearch.
Sensor data:
{"index":{"_index":"sensor","_id":100}}
{"sensor_id":100,"temperature":22.4,"humidity":75.3,"timestamp":181220231023}
CURL Command:
curl -H 'Content-Type: application/x-ndjson' -XPOST "https://<OpenSearch Endpoint>:<OpenSearch Port>/sensor/_bulk?pretty" --data-binary @sensor_data.json -k -u osadmin:<OpenSearch Admin Credential>
{
"took" : 142,
"errors" : false,
"items" : [
{
"index" : {
"_index" : "oci4”,
"_id" : "100",
"_version" : 1,
"result" : "created",
"_shards" : {
"total" : 2,
"successful" : 1,
"failed" : 0
},
"_seq_no" : 0,
"_primary_term" : 1,
"status" : 201
}
}
]
}
Create an OCI Function to write OSS messages to OpenSearch instance.
I have created the following application to host the function.
Create the following configurations in the application.
Finally, create an OCI function in this application to ingest messages from OCI Stream to OpenSearch instance (you can follow OCI documentation to learn how to create OCI Functions).
import io
import json
import logging
import requests
import base64
from fdk import response
# soda_insert uses the Autonomous Database REST API to insert JSON documents
def opensearch_insert(apiEndpoint, username, password, searchIndex, openSearchData):
auth=(username, password)
bulkinserturl = apiEndpoint + '/' + searchIndex + "/_bulk?pretty"
headers = {'Content-Type': 'application/x-ndjson'}
resp = requests.post(bulkinserturl, auth=auth, headers=headers, data=openSearchData)
return resp.json()
def handler(ctx, data: io.BytesIO=None):
logger = logging.getLogger()
logger.info("OpenSearch Ingestion function start")
# Retrieving the Function configuration values
try:
cfg = dict(ctx.Config())
apiEndpoint = cfg["openSearchAPIEndpoint"]
username = cfg["openSearchUser"]
password = cfg["openSearchPassword"]
searchIndex = cfg["searchIndex"]
except:
logger.error('Missing configuration keys: openSearchAPIEndpoint, openSearchUser, openSearchPassword')
raise
# Retrieving the log entries from Connector Hub as part of the Function payload
try:
sensorDataList = json.loads(data.getvalue())
if not isinstance(sensorDataList, list):
raise ValueError
except:
logger.error('Invalid payload')
raise
# The log entries are in a list of dictionaries. We can iterate over the the list of entries and process them.
# For example, we are going to put the Id of the log entries in the function execution log
logger.info("Processing the following sensor data's:")
openSearchData = ""
for sensorData in sensorDataList:
logger.info("Message ID: " + str(base64.b64decode(sensorData["key"])))
decryptedSensorData = json.loads(base64.b64decode(sensorData["value"]))
decryptedSensorData["sensor_id"] = int(str(decryptedSensorData["sensor_id"]).replace("s",""))
decryptedSensorData["timestamp"] = int(str(decryptedSensorData["timestamp"]).replace("-","").replace("T","").replace("Z","").replace(":",""))
logger.info("Sensor ID: " + str(decryptedSensorData["sensor_id"]))
openSearchData = openSearchData + '{"index":{"_index":"' + searchIndex + '","_id":' + str(decryptedSensorData["sensor_id"]) + '}\n'
openSearchData = openSearchData + '{"sensor_id":' +str(decryptedSensorData["sensor_id"]) + ',"temperature":' +str(decryptedSensorData["temperature"]) + ',"humidity":' +str(decryptedSensorData["humidity"]) + ',"timestamp":' +str(decryptedSensorData["timestamp"]) + '}\n'
logger.info("OpenSearch Data: " + openSearchData)
# Now, we are inserting the log entries in the JSON Database
resp = opensearch_insert(apiEndpoint, username, password, searchIndex, openSearchData)
logger.info(resp)
if "items" in resp:
logger.info("Sensor Data's are successfully inserted")
logger.info(json.dumps(resp))
else:
raise Exception("Error while inserting Sensor Data's into the OpenSearch: " + json.dumps(resp))
# The function is done. Return empty response.
logger.info("OpenSearch Ingestion function end")
return response.Response(
ctx,
response_data="",
headers={"Content-Type": "application/json"}
)
This function receives message data from the OCI stream (list of messages) and ingests messages in the OpenSearch instance.
Create a connector in OCI connector hub to transfer data between OCI Stream and OpenSearch
I have created the following connector in OCI connector hub to transfer batches of data from OCI Stream to an OCI Function that I have created in the previous step, and function will ingest a list of messages to OCI OpenSearch.
Here is connector hub configuration:
Create a python script to send sensor data using API deployment endpoint.
Here is a python script I’m using to send sensor data to OCI OpenSearch using the API deployment endpoint. This script creates message data, creates a request signature, and calls the API endpoint.
import requests
from oci.config import from_file
from oci.signer import AbstractBaseSigner, load_private_key, load_private_key_from_file
import oci
import os
from base64 import b64encode
import random
import json
from datetime import datetime, timedelta
import time
## This is a custom class based on the original Signer() class from signer.py file to allow calling from API Gateway on the signing process.
class MySigner(AbstractBaseSigner):
def __init__(self, tenancy, user, fingerprint, private_key_file_location, pass_phrase=None, private_key_content=None):
self.api_key = tenancy + "/" + user + "/" + fingerprint
if private_key_content:
self.private_key = load_private_key(private_key_content, pass_phrase)
else:
self.private_key = load_private_key_from_file(private_key_file_location, pass_phrase)
generic_headers = ["date", "(request-target)"]
body_headers = ["content-length", "content-type", "x-content-sha256"]
self.create_signers(self.api_key, self.private_key, generic_headers, body_headers)
## This is function to publish message data
def putMessages(endpoint, messages):
headers = {'Content-type': 'application/json'}
response = requests.post(endpoint+'/messages', auth=auth, headers=headers, json=messages,verify=False)
# Load config from your local OCI CLI instalation, usually located at ~/.oci/config
config = from_file(file_location="<OCI Config Path>", profile_name="<OCI Profile>")
# Any request to OCI API must be signed, refer https://docs.oracle.com/en-us/iaas/Content/API/Concepts/signingrequests.htm#Request_Signatures
auth = MySigner(
tenancy=config['tenancy'],
user=config['user'],
fingerprint=config['fingerprint'],
private_key_file_location=config['key_file'],
pass_phrase=config['pass_phrase']
)
# set your endpoint and stream OCID properly
base_endpoint = 'https://<API Deployment Endpoint>/20180418/streams/<Stream OCID>'
endpoint = base_endpoint
sensor_id = random.randint(501, 9999)
temperature = round(random.uniform(60, 100), 2)
humidity = round(random.uniform(30, 70), 2)
timestamp = datetime.utcnow().strftime("%Y%m%d%H%M%S")
## Sensor data record
record = {
"sensor_id": sensor_id,
"temperature": temperature,
"humidity": humidity,
"timestamp": timestamp
}
key = record["sensor_id"]
value = json.dumps(record)
encoded_key = b64encode(str(key).encode()).decode()
encoded_value = b64encode(value.encode()).decode()
## Create OSS message format
messages={"messages":
[
{
"key": encoded_key,
"value": encoded_value
}
]
}
putMessages(endpoint, messages)
Conclusion
In this article I have explained how to ingest data from real-time solutions like temperature sensors into OCI OpenSearch. You can extend this solution to ingest data to other OCI services like Database or Object Storage.
References
OCI Request Signature: https://docs.oracle.com/en-us/iaas/Content/API/Concepts/signingrequests.htm
OCI Streaming Rest API: https://docs.oracle.com/en-us/iaas/api/#/en/streaming/20180418
OCI Function: https://docs.oracle.com/en-us/iaas/Content/Functions/Tasks/functionsquickstartguidestop.htm
OCI OpenSearch: https://docs.public.oneportal.content.oci.oraclecloud.com/en-us/iaas/Content/search-opensearch/home.htm
OCI Streaming: https://docs.public.oneportal.content.oci.oraclecloud.com/en-us/iaas/Content/Streaming/home.htm
OCI Connector Hub: https://docs.public.oneportal.content.oci.oraclecloud.com/en-us/iaas/Content/connector-hub/home.htm