Streaming LLM with Apache NiFi (HuggingFace)

Tim Spann
Cloudera
Aug 27, 2023

LLM, Vector Ingest with Apache NiFi, Apache Kafka, Apache Flink

See my talk on August 23, 2023 at NYC AI Dev Day.

My NiFi flow handles many sources of data and funnels them to various HuggingFace AI models via REST for processing. The flow then receives the results, enriches them, transforms them and sends them to sinks.

One source is a web page with a text box and a button. I am hosting a simple web page with this HTML form and a jQuery DataTable (https://datatables.net/).

The NiFi flow handles this with HandleHttpRequest and HandleHttpResponse to send and receive HTTP messages.

We route the data in, clean it up and call HuggingFace. We then query and process the results and enrich them with fields such as a timestamp and a primary key.
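The clean, call and enrich steps can be sketched outside NiFi in Python. This is only an illustration under assumptions, not the flow itself: the function names, the whitespace cleanup and the epoch-millisecond `ts` format are hypothetical. The field names follow the Flink table later in this post, and the `x-compute-*` style values are assumed to arrive as HTTP response headers.

```python
import json
import re
import urllib.request
import uuid
from datetime import datetime, timezone

BLOOM_URL = "https://api-inference.huggingface.co/models/bigscience/bloom"

# Assumed header names, derived from the column names of the Flink table.
METADATA_HEADERS = (
    "x-compute-type",
    "x-compute-time",
    "x-inference-time",
    "x-time-per-token",
    "x-compute-characters",
)


def clean_text(raw):
    """Collapse runs of whitespace and trim the ends (hypothetical cleanup)."""
    return re.sub(r"\s+", " ", raw).strip()


def build_request(text, token):
    """Build (but do not send) the POST request for the inference endpoint."""
    body = json.dumps({"inputs": text}).encode("utf-8")
    return urllib.request.Request(
        BLOOM_URL,
        data=body,
        headers={
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        },
        method="POST",
    )


def enrich(inputs, response_body, response_headers):
    """Merge the model output with a timestamp and a primary key."""
    # Assumption: the body is a JSON list of {"generated_text": ...} objects.
    results = json.loads(response_body)
    event = {
        "inputs": inputs,
        "generated_text": results[0]["generated_text"],
        "ts": str(int(datetime.now(timezone.utc).timestamp() * 1000)),
        "uuid": str(uuid.uuid4()),
    }
    # Copy selected metadata headers, e.g. x-compute-time -> x_compute_time.
    for name, value in response_headers.items():
        if name.lower() in METADATA_HEADERS:
            event[name.lower().replace("-", "_")] = value
    return event


def call_bloom(raw_text, token):
    """Send the cleaned text to the model and return the enriched event."""
    text = clean_text(raw_text)
    request = build_request(text, token)
    with urllib.request.urlopen(request, timeout=120) as resp:
        return enrich(text, resp.read(), dict(resp.headers.items()))
```

Only `call_bloom` touches the network, and it needs a valid Hugging Face API token. The other functions are pure, which makes them easy to check in isolation.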

Then we send the results plus the original text and enrichments as an event to Kafka to be received by consumers such as Apache NiFi, Spring, Python, Cloud Consumers and Flink SQL.

Above is an example of one of these JSON event messages.

We also send a formatted message to Slack to notify everyone we received a new result. This makes for an easy information radiator. We can also send a copy of the record to Apache Iceberg in the public cloud.

We then funnel the text that the user wrote to our processing flow. This same processing flow will take in all the various options for sources including Kafka, REST Endpoint, NiFi Site-to-Site, RSS article feeds and more. We could add social media, databases, logs and more with ease.

Above is an example of a Slack message that NiFi sent.

As an example of how we can distribute data to other consumers, we host a REST endpoint that returns the most recent result from a cache that we update whenever a new call happens. This lets secured NiFi clusters pull the latest result whenever they want it.

As mentioned, we send our formatted JSON event to Kafka. The topic above, hfbloom, can be viewed and validated easily in Cloudera’s Streams Messaging Manager.

Now that our data is in Kafka, we can start consuming it with Apache Flink SQL. The easiest way to do that is to use Cloudera’s SQL Stream Builder to build a virtual table from our topic by analyzing the data and building a schema from it.

Once our schema is detected from our data, SSB will build a Flink SQL table pointing to that topic. The DDL is pretty simple, but it’s nice to not have to manually make it.

Now that we have a table, we can query it. The simplest is basic SQL such as “select * from hfbloom”.

Now that we have a running Flink job, we can output a materialized view for use by non-streaming readers. I am doing this in the example HTML page; it’s the table at the bottom. SSB lets us build a Materialized View Endpoint that serves JSON over REST.

The output is shown below.

With that endpoint in place, we could read it from Jupyter notebooks or other tools. As I mentioned, I populate an HTML DataTable by consuming this REST endpoint and rendering the rows.
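From a notebook, reading the materialized view takes a single HTTP GET. Here is a minimal sketch in Python, assuming the endpoint returns a JSON array with one object per row. The function names are hypothetical, and the URL has to be the one SSB generates for your view.

```python
import json
import urllib.request


def parse_rows(payload, columns):
    """Turn the endpoint's JSON payload into a list of row tuples."""
    # Assumption: the payload is a JSON array with one object per row.
    records = json.loads(payload)
    return [tuple(record.get(col) for col in columns) for record in records]


def fetch_rows(url, columns):
    """GET the materialized view endpoint and return its rows."""
    with urllib.request.urlopen(url, timeout=30) as resp:
        return parse_rows(resp.read(), columns)
```

Columns missing from a record come back as `None`, so a partially populated row does not break the table.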

AI Models REST Endpoints

https://api-inference.huggingface.co/models/bigscience/bloom

https://api-inference.huggingface.co/models/google/flan-t5-xxl

https://api-inference.huggingface.co/models/google/flan-ul2

https://us-south.ml.cloud.ibm.com/ml/v1-beta/generation/text?version=2023-05-29

SLACK OUTPUT

==== NiFi to Hugging Face BLOOM LLM
AzureML Model Deployment: bloom-deployment
On Date: Thu, 17 Aug 2023 02:05:56 GMT
File Name: 7b8afb65-d5eb-49ee-9a89-d2b870310243
Request Duration: 1339
Request URL: https://api-inference.huggingface.co/models/bigscience/bloom
Compute Characters: 63
Compute Time: 1014
Compute Type: gpu+optimized
Inference Time: 968
Queue Time: 45
Request ID: iPX8lqeK9XwqSZA9e-wCH
SHA: 053d9cd9fbe814e091294f67fcfedb3397b954bb
Time Per Token: 48
Total Time: 1014
Validation Time: 0
R: Apache NiFi is a great tool for loading data into Apache Kafka. It is a very simple and easy to use tool. It is a very simple and easy to use
=====
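A message like the one above could be assembled from the event fields with a small template. The sketch below is hypothetical Python, not the NiFi configuration: the function name and the dictionary keys are assumptions, and only a few of the lines are reproduced. Posting to Slack is left out.

```python
def format_slack_message(event):
    """Render a plain-text notification from an event dictionary."""
    # Label/key pairs are illustrative; missing keys render as an empty value.
    fields = [
        ("On Date", "date"),
        ("Request URL", "request_url"),
        ("Compute Time", "x_compute_time"),
        ("Compute Type", "x_compute_type"),
        ("Inference Time", "x_inference_time"),
        ("Time Per Token", "x_time_per_token"),
        ("R", "generated_text"),
    ]
    lines = ["==== NiFi to Hugging Face BLOOM LLM"]
    lines += [f"{label}: {event.get(key, '')}" for label, key in fields]
    lines.append("=====")
    return "\n".join(lines)
```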

SOURCE CODE

HTML Page

Flink SQL Query

Create a table so Flink SQL can access the results of the Hugging Face BLOOM model calls.

CREATE TABLE `ssb`.`Meetups`.`hfbloom` (
  `generated_text` VARCHAR(2147483647),
  `ts` VARCHAR(2147483647),
  `x_compute_type` VARCHAR(2147483647),
  `inputs` VARCHAR(2147483647),
  `x_compute_time` VARCHAR(2147483647),
  `x_inference_time` VARCHAR(2147483647),
  `uuid` VARCHAR(2147483647),
  `x_time_per_token` VARCHAR(2147483647),
  `x_compute_characters` VARCHAR(2147483647),
  `eventTimeStamp` TIMESTAMP(3) WITH LOCAL TIME ZONE METADATA FROM 'timestamp',
  WATERMARK FOR `eventTimeStamp` AS `eventTimeStamp` - INTERVAL '3' SECOND
) WITH (
  'scan.startup.mode' = 'group-offsets',
  'properties.request.timeout.ms' = '120000',
  'properties.auto.offset.reset' = 'earliest',
  'format' = 'json',
  'properties.bootstrap.servers' = 'kafka:9092',
  'connector' = 'kafka',
  'properties.transaction.timeout.ms' = '900000',
  'topic' = 'hfbloom',
  'properties.group.id' = 'llmBloomProps'
)

NOTE

Calling WatsonX.AI requires an IBM Cloud account and project setup. You will also need to make another call against https://iam.cloud.ibm.com/identity/token to get a proper token. This is super easy in NiFi; if you need the code, let me know.
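For reference, the token call is a form-encoded POST. The sketch below shows it in Python rather than NiFi. It assumes you have an IBM Cloud API key, and the function names are hypothetical. The request is only built by `build_token_request`; nothing is sent until you call `fetch_token`.

```python
import json
import urllib.parse
import urllib.request

IAM_URL = "https://iam.cloud.ibm.com/identity/token"


def build_token_request(api_key):
    """Build (but do not send) the IAM token request for an API key."""
    form = urllib.parse.urlencode({
        "grant_type": "urn:ibm:params:oauth:grant-type:apikey",
        "apikey": api_key,
    }).encode("ascii")
    return urllib.request.Request(
        IAM_URL,
        data=form,
        headers={
            "Content-Type": "application/x-www-form-urlencoded",
            "Accept": "application/json",
        },
        method="POST",
    )


def fetch_token(api_key):
    """Exchange the API key for a bearer token (performs a network call)."""
    with urllib.request.urlopen(build_token_request(api_key), timeout=30) as resp:
        return json.loads(resp.read())["access_token"]
```

The returned token then goes into an `Authorization: Bearer ...` header on the generation request.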

BigScience Large Open-science Open-access Multilingual Language Model
Version 1.3 / 6 July 2022

BLOOM is an autoregressive Large Language Model (LLM), trained to continue text from a prompt on vast amounts of text data using industrial-scale computational resources. As such, it is able to output coherent text in 46 languages and 13 programming languages that is hardly distinguishable from text written by humans. BLOOM can also be instructed to perform text tasks it hasn’t been explicitly trained for, by casting them as text generation tasks.

Tim Spann
Cloudera

Principal Developer Advocate, Zilliz. Milvus, Attu, Towhee, GenAI, Big Data, IoT, Deep Learning, Streaming, Machine Learning. https://www.datainmotion.dev/