Data stream processing in Oracle Cloud — An introduction

Soma Dey
Published in Oracle Developers
Dec 13, 2022

Data streaming is booming in popularity as companies look to process and analyze data in near real time and deliver relevant business insights to their users.

What is data streaming? It’s a continuous flow of data being generated from various sources. The data can be stored, analyzed, and acted upon by using various stream processing techniques.

A few of the popular use cases for data streaming include:

  1. Messaging
  2. Operation telemetry
  3. Web click stream data
  4. Application logs

All of these use cases can be built with services in Oracle Cloud.

The Oracle Cloud Infrastructure (OCI) Streaming service is a fully managed, scalable, and durable solution for ingesting and consuming high-volume data streams in real time, with enterprise-grade security.

Let’s examine a simple use case of streaming with the beginnings of a weather app.

The goal is to ingest data from a weather API into OCI Streaming and store it in a JSON database that can later be used for analytics.

High-level steps:

  • Invoke a third-party weather API
  • Publish each message to an OCI stream using the OCI Python SDK
  • Use an OCI service connector to invoke an OCI function whenever data is published to the stream
  • Transform the data in the OCI function
  • Store the data in an Autonomous JSON Database (AJD) through a REST API
  • Secure the REST API
  • Use OCI Vault to store the secrets for the REST API

Here, AJD (Autonomous JSON Database) is used as the data store. Its advantages include:

  • It gives developers schema flexibility
  • It gives developers the benefits of both relational and document databases

Architecture:

Components:

To implement this use case, the following OCI resources need to be deployed in an OCI tenancy:

  • OCI compartment
  • Networking resources (VCN, subnet, security list, route tables)
  • IAM policies and dynamic groups
  • OCI Linux VM for invoking the weather API, setting up OCI Functions, and running the Python SDK
  • Stream pool with private access
  • Service connector
  • OCI Function
  • OCI Vault
  • Autonomous JSON database

Implementation steps:

1. Create a stream pool using the Console, the OCI CLI, or the Python SDK.

2. Create a Python script named produce_weather_data.py to invoke the weather API.
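A minimal sketch of produce_weather_data.py follows. It assumes the third-party provider is WeatherAPI.com and its current-conditions endpoint (the article does not name the provider; this one is chosen because its response contains the "location" and "current" objects queried at the end of this post). The helper names are hypothetical, and the API key is passed in rather than hard-coded.

```python
import json
import urllib.parse
import urllib.request

# Assumption: WeatherAPI.com's current-conditions endpoint; replace with your provider's.
BASE_URL = "https://api.weatherapi.com/v1/current.json"


def build_weather_url(api_key: str, city: str) -> str:
    """Build the request URL for the current weather in `city`."""
    return f"{BASE_URL}?{urllib.parse.urlencode({'key': api_key, 'q': city})}"


def fetch_weather(api_key: str, city: str, timeout: float = 10.0) -> dict:
    """Call the weather API and return its parsed JSON response."""
    with urllib.request.urlopen(build_weather_url(api_key, city), timeout=timeout) as resp:
        return json.load(resp)


def to_stream_record(response: dict) -> dict:
    """Keep only the "location" and "current" objects that end up in WEATHER_DATA."""
    return {"location": response["location"], "current": response["current"]}
```

For example, `to_stream_record(fetch_weather(api_key, "London"))` returns the record that the next step publishes to the stream.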

3. Create a new stream named weatherstrm, or fetch the details of the existing stream using the stream pool OCID. The OCI Python SDK can be used for this (publish_weather_data_strm.py).
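One way publish_weather_data_strm.py could look, as a sketch using the OCI Python SDK (`oci`): it looks up an ACTIVE stream by name in the pool, creates it if missing, and publishes a JSON record. Message keys and values are base64-encoded because the Streaming API expects them that way. The single partition, the helper names, and the use of the city as the message key are assumptions for illustration.

```python
import base64
import json

try:
    import oci  # OCI Python SDK: pip install oci
except ImportError:  # lets encode_entry() run without the SDK installed
    oci = None


def encode_entry(key: str, record: dict) -> tuple:
    """Return (key, value) base64-encoded, as the Streaming API expects."""
    value = json.dumps(record).encode("utf-8")
    return (base64.b64encode(key.encode("utf-8")).decode("ascii"),
            base64.b64encode(value).decode("ascii"))


def get_or_create_stream(admin_client, stream_pool_id: str, name: str = "weatherstrm"):
    """Return the ACTIVE stream called `name` in the pool, creating it if absent."""
    active = oci.streaming.models.StreamSummary.LIFECYCLE_STATE_ACTIVE
    found = admin_client.list_streams(stream_pool_id=stream_pool_id, name=name,
                                      lifecycle_state=active).data
    if found:
        return admin_client.get_stream(found[0].id).data
    details = oci.streaming.models.CreateStreamDetails(
        name=name, partitions=1, stream_pool_id=stream_pool_id)  # 1 partition: assumption
    stream_id = admin_client.create_stream(details).data.id
    # Wait until the new stream can accept messages.
    return oci.wait_until(admin_client, admin_client.get_stream(stream_id),
                          "lifecycle_state", "ACTIVE").data


def publish(config: dict, stream, key: str, record: dict) -> None:
    """Publish one JSON record to the stream's messages endpoint."""
    client = oci.streaming.StreamClient(config, service_endpoint=stream.messages_endpoint)
    k, v = encode_entry(key, record)
    entry = oci.streaming.models.PutMessagesDetailsEntry(key=k, value=v)
    result = client.put_messages(
        stream.id, oci.streaming.models.PutMessagesDetails(messages=[entry])).data
    if result.failures:
        errors = [e.error_message for e in result.entries if e.error]
        raise RuntimeError(f"put_messages failed: {errors}")
```

Typical usage on the VM: `config = oci.config.from_file()`, `admin = oci.streaming.StreamAdminClient(config)`, `stream = get_or_create_stream(admin, "<stream pool OCID>")`, then `publish(config, stream, "London", record)`. Because the pool uses a private endpoint, this must run from a host inside the VCN, such as the Linux VM listed above.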

4. Create a demo function named weatherstrmfunc.

Note: The necessary policies and setup must be completed beforehand, following this Quick Start guide:

https://docs.oracle.com/en-us/iaas/Content/Functions/Tasks/functionsquickstartocicomputeinstance.htm

5. Create a service connector with the stream as its source and the function as its target.

6. Provision an Autonomous JSON database

7. Create a DEMO user and REST-enable its schema.

8. Create a table named WEATHER_DATA in the DEMO schema to store the weather data:

CREATE TABLE "DEMO"."WEATHER_DATA" (
  "location" CLOB CONSTRAINT VALID_JSON_1 CHECK ("location" IS JSON),
  "current"  CLOB CONSTRAINT VALID_JSON_2 CHECK ("current" IS JSON)
);

9. REST-enable the WEATHER_DATA table.

10. Secure the REST API with OAuth client credentials (a client ID and client secret).

11. Create a vault and store the OAuth client ID and client secret in it as secrets.

12. Modify the function to perform the following tasks:

  • Read the messages delivered by the service connector in JSON format
  • Decode each message
  • Get the OAuth client credentials from OCI Vault and use them to obtain a bearer token
  • Insert the data into the Autonomous JSON Database by invoking the REST API with the bearer token

Function details:

a. func.py

b. func.yaml

c. requirements.txt
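A minimal sketch of what func.py might look like, assuming requirements.txt lists the Fn Project Python FDK (`fdk`) and the OCI Python SDK (`oci`). Several details are assumptions to verify against your setup: the service connector is assumed to deliver a JSON array of messages whose `value` field is base64-encoded (consistent with the "Decode each message" task; log one raw payload to confirm); the function configuration keys `ORDS_BASE_URL`, `CLIENT_ID_OCID`, and `CLIENT_SECRET_OCID` are hypothetical names; the ORDS paths `/oauth/token` and `/weather_data/` assume the default schema and table aliases; and the CLOB columns are sent as JSON text.

```python
import base64
import io
import json
import logging
import urllib.request

try:
    import oci                      # OCI Python SDK
    from fdk import response        # Fn Project Python FDK
except ImportError:                 # lets the pure helpers run without them
    oci = response = None


def decode_messages(body: bytes) -> list:
    """Decode the JSON array of stream messages sent by the service connector."""
    return [json.loads(base64.b64decode(msg["value"])) for msg in json.loads(body)]


def to_row(record: dict) -> dict:
    """Map a weather record to the WEATHER_DATA columns, as JSON text for the CLOBs."""
    return {"location": json.dumps(record["location"]),
            "current": json.dumps(record["current"])}


def read_secret(secrets_client, secret_ocid: str) -> str:
    """Return the plain-text value of a Vault secret (its content is base64-encoded)."""
    bundle = secrets_client.get_secret_bundle(secret_ocid).data
    return base64.b64decode(bundle.secret_bundle_content.content).decode("utf-8")


def post(url: str, data: bytes, headers: dict) -> dict:
    """POST `data` and return the parsed JSON response (empty dict if no body)."""
    req = urllib.request.Request(url, data=data, headers=headers, method="POST")
    with urllib.request.urlopen(req, timeout=10) as resp:
        body = resp.read()
    return json.loads(body) if body else {}


def get_token(ords_base_url: str, client_id: str, client_secret: str) -> str:
    """OAuth2 client-credentials grant against the ORDS token endpoint."""
    basic = base64.b64encode(f"{client_id}:{client_secret}".encode("utf-8")).decode("ascii")
    token = post(f"{ords_base_url}/oauth/token", b"grant_type=client_credentials",
                 {"Authorization": f"Basic {basic}",
                  "Content-Type": "application/x-www-form-urlencoded"})
    return token["access_token"]


def handler(ctx, data: io.BytesIO = None):
    cfg = ctx.Config()  # function configuration (hypothetical keys, see above)
    ords_base_url = cfg["ORDS_BASE_URL"].rstrip("/")  # e.g. https://<host>/ords/demo
    # Resource principals let the function call Vault without an API key.
    signer = oci.auth.signers.get_resource_principals_signer()
    secrets = oci.secrets.SecretsClient(config={}, signer=signer)
    token = get_token(ords_base_url,
                      read_secret(secrets, cfg["CLIENT_ID_OCID"]),
                      read_secret(secrets, cfg["CLIENT_SECRET_OCID"]))
    records = decode_messages(data.getvalue())
    for record in records:
        post(f"{ords_base_url}/weather_data/", json.dumps(to_row(record)).encode("utf-8"),
             {"Authorization": f"Bearer {token}", "Content-Type": "application/json"})
    logging.getLogger().info("Inserted %d weather record(s)", len(records))
    return response.Response(ctx, response_data=json.dumps({"inserted": len(records)}),
                             headers={"Content-Type": "application/json"})
```

Posting one row per message keeps the sketch simple; the function's resource principal also needs an IAM policy that allows it to read the secrets.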

13. Add the OCIDs of the client_id and client_secret secrets to the function configuration so that the function can fetch their values from OCI Vault.

Time to see it in action!

  • Publish the weather data to the stream
  • Check the stream
  • The service connector invokes the function automatically
  • Check the function log
  • Check the data in the AJD table; it should be stored in JSON format
  • Query the data in a user-friendly tabular format:
SELECT w."location".region,
       w."location".name,
       w."location".country,
       w."current".last_updated,
       w."current".temp_c,
       w."current".condition.text
FROM   WEATHER_DATA w;
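Besides the Console, the stream can be checked from code. The sketch below, using the OCI Python SDK, opens a TRIM_HORIZON cursor on partition "0" (the oldest retained message) and decodes the base64-encoded values it reads. Reading only partition "0" assumes a single-partition stream; the function names are hypothetical.

```python
import base64

try:
    import oci  # OCI Python SDK: pip install oci
except ImportError:  # lets decode_message() run without the SDK installed
    oci = None


def decode_message(message) -> str:
    """Stream message values come back base64-encoded."""
    return base64.b64decode(message.value).decode("utf-8")


def read_from_start(config: dict, stream_id: str, messages_endpoint: str, limit: int = 10) -> list:
    """Read up to `limit` messages from partition 0, starting at the oldest one."""
    client = oci.streaming.StreamClient(config, service_endpoint=messages_endpoint)
    cursor_details = oci.streaming.models.CreateCursorDetails(
        partition="0", type=oci.streaming.models.CreateCursorDetails.TYPE_TRIM_HORIZON)
    cursor = client.create_cursor(stream_id, cursor_details).data.value
    messages = client.get_messages(stream_id, cursor, limit=limit).data
    return [decode_message(m) for m in messages]
```

Each returned string should be one JSON weather record as published in step 3.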

Conclusion:

This is just a simple example of how OCI Streaming and cloud native services can help process data in real time. You can apply the same pattern to the other use cases mentioned at the beginning of this blog, or to your own. Drop into our developer Slack to let us know what you’re making, or ask questions. Of course, you can try this out on our Free Tier as well.

Happy Reading!

/* Opinions expressed here are my own & do not express the views or opinions of my employer */