Adding Generative AI Results to SQL Streams

Tim Spann
Cloudera
11 min read · Mar 20, 2024

Cloudera SQL Stream Builder, Cloudera DataFlow, Apache NiFi, Apache Flink, SQL, JavaScript, Java, REST, CML, CSP, Google — Gemma-7b, HuggingFace

A use case came up where I wanted to call an LLM in real time, as fast, clean, and easy as possible. Cloudera DataFlow makes this happen very easily.

QUICK UPDATE! THE EASIEST BUTTON!

I added another mechanism for making the call without using Apache NiFi: I can call Cloudera Machine Learning directly from Cloudera SQL Stream Builder.

Once you have a model deployed (use our convenient AI and ML AMPs), you can call it with your access key from SSB (or from NiFi or any REST consumer).

So you can do Real-Time GenAI with just Cloudera Stream Processing, using SQL Stream Builder and Cloudera Machine Learning. You can run all of this in the public cloud of your choice, in a private cloud, or in a hybrid mix.

It is easy to build and customize your function to call your model.

In this example it is a CML-hosted model, but it could also be one hosted by Hugging Face, OpenAI, Amazon, Microsoft, or Google, or a model from H2O.ai, Mistral, or many others.

Code:

function CALLAI(input) {
  try {
    var con = new java.net.URL("https://modelservice.yoururl.cloudera.site/model").openConnection();
    con.requestMethod = "POST";
    con.setRequestProperty("Content-Type", "application/json");
    var data = "{\"accessKey\":\"youraccesskeywillvarycheckcml\",\"request\":{\"text\":\"" + input + "\",\"task\":\"ai_help\"}}";

    // Send the POST request
    con.doOutput = true;
    var wr = new java.io.DataOutputStream(con.outputStream);
    wr.writeBytes(data);
    wr.flush();
    wr.close();

    // Read the response
    var reader = new java.io.BufferedReader(new java.io.InputStreamReader(con.inputStream));
    var inputLine;
    var out = new java.lang.StringBuilder();
    while ((inputLine = reader.readLine()) != null) {
      out.append(inputLine);
    }
    reader.close();
    return out.toString();
  } catch (err) {
    return "Unknown: " + err;
  }
}
CALLAI($p0); // this line must exist
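For quick testing outside of SSB, the same call can be sketched in Python with the standard library. The endpoint URL and access key below are the same placeholders as in the function above (yours will differ); json.dumps also handles the quote escaping that the JavaScript builds by hand.

```python
import json
import urllib.request

# Placeholder values -- copy your real CML model URL and access key.
MODEL_URL = "https://modelservice.yoururl.cloudera.site/model"
ACCESS_KEY = "youraccesskeywillvarycheckcml"

def build_payload(text, access_key=ACCESS_KEY, task="ai_help"):
    """Build the CML model request body as JSON."""
    return json.dumps({"accessKey": access_key,
                       "request": {"text": text, "task": task}})

def call_ai(text):
    """POST the prompt to the CML-hosted model and return the raw response body."""
    req = urllib.request.Request(
        MODEL_URL,
        data=build_payload(text).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST")
    with urllib.request.urlopen(req) as resp:
        return resp.read().decode("utf-8")

# call_ai("What is Apache NiFi?")  # requires a deployed model
```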

Try out the free Docker-hosted Community Edition and call Generative AI models now!

If you want to skip the tutorial and just get the code, download it below.

Full Source Code

Step 1: Create a Streaming SQL Function: CALLLLM

It has one STRING input parameter and an Output Type of STRING.

The source code example is:

function CALLLLM(input) {
  try {
    var c = new java.net.URL("http://yourserver:9676/query?calltype=llm&key="
        + java.net.URLEncoder.encode(input, "UTF-8")).openConnection();
    c.requestMethod = "GET";
    var reader = new java.io.BufferedReader(new java.io.InputStreamReader(c.inputStream));
    var inputLine;
    var out = new java.lang.StringBuilder();
    while ((inputLine = reader.readLine()) != null) {
      out.append(inputLine);
    }
    reader.close();
    return out.toString();
  } catch (err) {
    return "Unknown: " + err;
  }
}
CALLLLM($p0);

As you can see, the JavaScript code uses a little Java 8 interop to call an Apache NiFi-hosted REST endpoint. We could call Hugging Face directly, but we would have to do a lot of parsing and couldn't cache or add other functions as easily. Using NiFi as our LLM proxy makes a lot of sense for speed, reliability, enrichment, storage, caching, and more.

You can test from the command line before you add it to the project. jjs, the Nashorn JavaScript tool shipped with Java 8, will run your JavaScript.

print(CALLLLM("What is Apache NiFi?"));

We add that print statement to call and test our function.

jjs testllm.js

We can then test in SSB.

Once we test and everything looks good, we can move on to building a job.

Step 2: Create a Flink Job via SSB

We can use it in a Flink SQL query.

SELECT CALLLLM(CAST(messagetext AS STRING)) AS generatedtext,
       messagerealname, messageusername, messagetext,
       messageusertz, messageid, threadts, ts
FROM flankslackmessages
WHERE messagetype = 'message'

We will then get back the text as a field in our result set. We could send that to any Flink SQL output such as Kafka, JDBC, or Kudu. We could also expose a Materialized View as a REST JSON endpoint.

Step 3: Use Cloudera Streams Messaging Manager to create/view Kafka Topics

The main data came from this topic in Apache Kafka.

Create a Flink SQL Table against this topic in SSB.

CREATE TABLE `ssb`.`Meetups`.`flankslackmessages` (
  `messagerealname` VARCHAR(2147483647),
  `messagetype` VARCHAR(2147483647),
  `inputs` VARCHAR(2147483647),
  `messageid` VARCHAR(2147483647),
  `uuid` VARCHAR(2147483647),
  `messageusername` VARCHAR(2147483647),
  `messagetimestamp` VARCHAR(2147483647),
  `messagepermalinkpublic` VARCHAR(2147483647),
  `filename` VARCHAR(2147483647),
  `messagechannel` VARCHAR(2147483647),
  `messagetext` VARCHAR(2147483647),
  `messageusertz` VARCHAR(2147483647),
  `threadts` VARCHAR(2147483647),
  `ts` VARCHAR(2147483647),
  `messagesubtype` VARCHAR(2147483647),
  `eventTimeStamp` TIMESTAMP(3) WITH LOCAL TIME ZONE METADATA FROM 'timestamp',
  WATERMARK FOR `eventTimeStamp` AS `eventTimeStamp` - INTERVAL '3' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'flank-slack-messages',
  'properties.bootstrap.servers' = 'kafka:9092',
  'format' = 'json',
  'scan.startup.mode' = 'earliest-offset',
  'deserialization.failure.policy' = 'ignore',
  'properties.auto.offset.reset' = 'earliest',
  'properties.request.timeout.ms' = '120000',
  'properties.transaction.timeout.ms' = '900000'
)
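For testing, here is a small Python sketch that builds a sample JSON event matching the table's columns (field values are illustrative and the helper name is mine; any Kafka client could publish the result to the flank-slack-messages topic):

```python
import json
import time
import uuid

def make_slack_message(text, user="tspann", realname="Timothy Spann"):
    """Build a sample event with the same string fields as the Flink table.
    (eventTimeStamp is metadata supplied by Kafka, so it is not in the body.)"""
    now = str(time.time())
    return {
        "messagerealname": realname,
        "messagetype": "message",
        "inputs": text,
        "messageid": str(uuid.uuid4()),
        "uuid": str(uuid.uuid4()),
        "messageusername": user,
        "messagetimestamp": now,
        "messagepermalinkpublic": "",
        "filename": "",
        "messagechannel": "general",
        "messagetext": text,
        "messageusertz": "America/New_York",
        "threadts": "",
        "ts": now,
        "messagesubtype": "",
    }

# Serialize one record; publish it with the Kafka client of your choice.
record = json.dumps(make_slack_message("What is Apache Flink?"))
```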

Step 4: Check your query

In SSB, you can run your query; if you need more data, add more rows to Kafka. It is easy to add these Slack messages via Apache NiFi.


Step 5: Build a Materialized View

Now that the output looks good, let’s make it available to HTML pages, Jupyter Notebooks, Spring Boot apps, NiFi and other systems that can ingest REST JSON feeds.

To build one, you merely need to click the Materialized View button. We set a primary key, the number of rows we want to show at once, and an API key for REST calls. This builds a Materialized View endpoint.

Step 6: View the Endpoint in the browser

An example URL will look like:

http://localhost:18131/api/v1/query/5283/genai?key=fadafba4-bc9f-4424-4d36-d4b7e4f135tgf&limit=100

The output will look like this:

[ {
"generatedtext" : "What is the Trenton Computer Festival? Founded in 1986, the Trenton Computer Festival / Trenton Computer Club’s mission is to promote the understanding and appreciation of computers and related technology by providing educational and networking forums that empower people from beginners to extremeists.Classes sponsored by the club include computers, application programs, networking, electronics, database, biotechnology and non-traditional topics. Many members also use the club's computing facilities to work in their own areas of expertise. The club is based in a festival",
"messagerealname" : "Timothy Spann",
"messageusername" : "tspann",
"messagetext" : "What is the Trenton Computer Festival?",
"messageusertz" : "America/New_York",
"messageid" : "9d6322ef-8dd1-473e-b0ca-a61d6923b287",
"threadts" : "",
"ts" : "1710514690.622059"
}, {
"generatedtext" : "What is Apache NiFi? 1. The company Apache pits a “bacon” initiative, which adds clear operation Open Innovative Network Improvement. The strategy is often used for advanced knowledge, such as big augmentation to dictate soon.Whenever companies begin to evaluate how NiFi works, it’s often worried about the obsolete looks of the console UI. Through the forthcoming identity and appearance and the large foundation and Battle-Perfect of the sea, a knowledge researcher has even been produced and is the first presentation, showcasing about 70%",
"messagerealname" : "Timothy Spann",
"messageusername" : "tspann",
"messagetext" : "What is Apache NiFi?",
"messageusertz" : "America/New_York",
"messageid" : "f425793b-2e2f-473c-a153-7aaac513176c",
"threadts" : "",
"ts" : "1710514989.885429"
}, {
"generatedtext" : "ty 2023Edit descriptionweb.cvent.comKafka Topic Event Data with Generated TextFlink SQL Against Kafka Topic Messages From Enriched LLM Results in NiFiSecond DemoSource CodeGitHub - tspannhw/FLaNK-watsonx.ai: FLaNK Stack with <http://watsonx.ai|watsonx.ai> for google/flan-ul2…[/INST] User: What is Generative AI?&lt;/s&gt;Assistant: Generative AI is a subset of artificial intelligence that uses machine learning models to generate new data similar to the data it was trained on. It can create various types of content, such as text, images, and audio. In the context of the meetup mentioned, Generative AI was integrated into real-time streaming pipelines using tools like Apache NiFi, Apache Kafka, and Apache Flink, and connected to Large Language Models (LLM) to enable the pipeline=========================================== Data for nerds ====HF URL: <https://api-inference.huggingface.co/models/mistralai/Mixtral-8x7B-Instruct-v0.1>TXID: e3001e18-c318-4cc3-b736-e8ecc1b48ebd== Slack Message Meta Data ==ID: 3ff671cf-370e-486d-8864-5d53b84177bd Name: Timothy Spann [tspann]Time Zone: America/New_York== HF Mixtral 8x7B Meta Data ==Compute Characters/Time/Type: 3958 / 3.019911372/2-a100Generated/Prompt Tokens/Time per Token: 100 / 1197 : 30Inference Time: 3011 // Queue Time: 5Request ID/SHA: 09ardzfmJ7v2CPc_1zMcY / 1e637f2d7cb0a9d6fb1922f305cb784995190a83Validation/Total Time: 2 / 3019=============================================================================================================== Czech : : : : Aplikace pro přečtení田舎 THROUGH outlier data-device interface [ssh1d]uninamed-EPRTM0021: Slovakia : : : Eva You do not have enough information to sequence the getWe communicated in this meeting with Lucia. The lines below with / indicate a transfer.Fed 0: controlled via bool controlSource 0: controlled via bool controlRead 1: controlFormat uintData dest EP 11:3",
"messagerealname" : "Timothy Spann",
"messageusername" : "bunkertor",
"messagetext" : "ty 2023Edit descriptionweb.cvent.comKafka Topic Event Data with Generated TextFlink SQL Against Kafka Topic Messages From Enriched LLM Results in NiFiSecond DemoSource CodeGitHub - tspannhw/FLaNK-watsonx.ai: FLaNK Stack with <http://watsonx.ai|watsonx.ai> for google/flan-ul2…[/INST] User: What is Generative AI?&lt;/s&gt;\nAssistant: Generative AI is a subset of artificial intelligence that uses machine learning models to generate new data similar to the data it was trained on. It can create various types of content, such as text, images, and audio. In the context of the meetup mentioned, Generative AI was integrated into real-time streaming pipelines using tools like Apache NiFi, Apache Kafka, and Apache Flink, and connected to Large Language Models (LLM) to enable the pipeline\n\n=========================================== Data for nerds ====\n\nHF URL: <https://api-inference.huggingface.co/models/mistralai/Mixtral-8x7B-Instruct-v0.1>\nTXID: e3001e18-c318-4cc3-b736-e8ecc1b48ebd\n\n== Slack Message Meta Data ==\n\nID: 3ff671cf-370e-486d-8864-5d53b84177bd Name: Timothy Spann [tspann]\nTime Zone: America/New_York\n\n== HF Mixtral 8x7B Meta Data ==\n\nCompute Characters/Time/Type: 3958 / 3.019911372/2-a100\n\nGenerated/Prompt Tokens/Time per Token: 100 / 1197 : 30\n\nInference Time: 3011 // Queue Time: 5\n\nRequest ID/SHA: 09ardzfmJ7v2CPc_1zMcY / 1e637f2d7cb0a9d6fb1922f305cb784995190a83\n\nValidation/Total Time: 2 / 3019\n===============================================================================================================",
"messageusertz" : "America/New_York",
"messageid" : "",
"threadts" : "1710606616.216039",
"ts" : "1710606622.538169"
} ]

Step 7: Save our project to GitHub

When we make large changes to our “code”, we push our project to GitHub. Mine is here: https://github.com/tspannhw/meetup-ssb

Step 8: Test that endpoint

We can ingest this data in Jupyter notebooks, front end websites or even Apache NiFi.
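As a minimal sketch of that ingestion, here is how a Python client might hit the endpoint with just the standard library. The base URL and query id are copied from the example above; the API key is a placeholder and the helper names are mine.

```python
import json
import urllib.parse
import urllib.request

# Base URL from the example endpoint above; your host, query id, and key will differ.
MV_BASE = "http://localhost:18131/api/v1/query/5283/genai"

def mv_url(api_key, limit=100):
    """Build the Materialized View URL with its API key and row limit."""
    return MV_BASE + "?" + urllib.parse.urlencode({"key": api_key, "limit": limit})

def fetch_results(api_key, limit=100):
    """GET the endpoint and return the rows as a list of dicts."""
    with urllib.request.urlopen(mv_url(api_key, limit)) as resp:
        return json.loads(resp.read().decode("utf-8"))

# rows = fetch_results("your-api-key")   # requires a running SSB endpoint
# print(rows[0]["generatedtext"])
```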

That was the first half of the application: a call from Flink SQL to an external HTTP URL. Now let’s build the server side of that request. We will use Apache NiFi to receive the HTTP GET call.

NiFi Flow to Respond to REST Call

Entire Flow
Process Call

Flow Steps

  1. HandleHttpRequest: receive the call from the function
  2. RouteOnAttribute: filter on mime.type of application/json
  3. UpdateAttribute: trim our data: ${http.query.param.key:trim()}
  4. RouteOnAttribute: filter based on the call type parameter: ${http.query.param.calltype:trim():equalsIgnoreCase('llm')}
  5. ReplaceText: build a JSON body to send to HuggingFace: { "inputs": "${messagetext:trim():replaceAll('"',''):replaceAll('\n', '')} " }
  6. InvokeHTTP: send the call to HuggingFace https://api-inference.huggingface.co/models/google/gemma-7b
  7. (other branch) PublishKafkaRecord: send the prompt to Kafka
  8. QueryRecord: convert results from HuggingFace: SELECT generated_text FROM FLOWFILE
  9. UpdateRecord: add some fields to the output from metadata
  10. UpdateAttribute: set schema and topic names
  11. PublishKafkaRecord: send the enriched results to a Kafka topic
  12. RetryFlowFile: if there is an error, try again
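The ReplaceText and QueryRecord steps above can be approximated in Python to show the request and response shapes (the cleanup mirrors the NiFi Expression Language expression; the helper names are mine, and the response shape assumes the HuggingFace Inference API's list of generated_text records):

```python
import json

# Model endpoint used by the InvokeHTTP step above.
HF_URL = "https://api-inference.huggingface.co/models/google/gemma-7b"

def build_hf_body(messagetext):
    """ReplaceText step: trim, strip quotes and newlines, wrap in {"inputs": ...}."""
    cleaned = messagetext.strip().replace('"', '').replace('\n', '')
    return json.dumps({"inputs": cleaned})

def extract_generated_text(response_body):
    """QueryRecord step: SELECT generated_text FROM FLOWFILE."""
    records = json.loads(response_body)
    return [r["generated_text"] for r in records]
```

An InvokeHTTP-style client would POST build_hf_body(...) to HF_URL with your HuggingFace token in an Authorization header, then feed the body to extract_generated_text.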

Slack User Data in PostgreSQL


Principal Developer Advocate, Zilliz. Milvus, Attu, Towhee, GenAI, Big Data, IoT, Deep Learning, Streaming, Machine Learning. https://www.datainmotion.dev/