No Code Sentiment Analysis with Hugging Face and Apache NiFi for Article Summaries
Apache NiFi, Hugging Face, NY Times RSS, Sentiment Analysis, Deep Learning, REST API, Classification, Distilbert, SST-2, NLP
Model: distilbert-base-uncased-finetuned-sst-2-english
I have been working on some LLM, ChatGPT, Hugging Face, vectorization, and other AI integration projects with some interesting and awesome partners. While getting those started, I discovered how easy it is to build an application against the standard Hugging Face Inference API.
SOURCE CODE
HOW TO
Set up a Hugging Face account if you don't have one already. Hugging Face is awesome.
Create an Access Token for REST access.
Read up on the Inference API.
Check out the available parameters.
Example Stories RSS Feed
Let's build a flow to ingest article summaries, convert them to a format that Hugging Face's Inference API accepts, and send them there. We will then build a flow to process the results, enrich them, and route them to Kafka for further processing with Flink SQL. We could also land them directly in Apache Kudu, Apache Hive, Apache Impala, Apache HBase, S3, HDFS, Apache Ozone, Apache Iceberg, MongoDB, Oracle, MySQL, PostgreSQL, or all of these places at once. Once the data is in Apache Kafka, we could also consume it at the same time with Apache Spark Structured Streaming, a Java consumer, a Python consumer, a C++ consumer, a Golang consumer, Tinybird, RisingWave, and many others. Let's get building.
First we need to retrieve the RSS XML feeds from the NY Times (or other articles, social media, or whatever you want to run sentiment analysis on). We could just as easily read PDFs, Word documents, Google Docs, Salesforce reports, or database records.
InvokeHTTP
Retrieve RSS data
QueryRecord
SELECT * FROM FLOWFILE
Then we convert the RSS XML files to JSON for easier processing.
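Outside of NiFi, the same XML-to-JSON conversion can be sketched in a few lines of Python using only the standard library. The feed snippet below is a trimmed, hypothetical sample in the general shape of an RSS channel, not actual NY Times output.

```python
import json
import xml.etree.ElementTree as ET

# Trimmed RSS snippet in the general shape of a news feed (hypothetical sample).
rss = """<rss version="2.0"><channel>
  <item>
    <title>Sample headline</title>
    <description>A short summary of the story.</description>
    <link>https://example.com/story</link>
  </item>
</channel></rss>"""

def rss_items_to_json(xml_text: str) -> str:
    """Convert each <item> element in an RSS feed into a JSON object."""
    root = ET.fromstring(xml_text)
    items = [{child.tag: child.text for child in item} for item in root.iter("item")]
    return json.dumps(items)

print(rss_items_to_json(rss))
```

In the flow itself, QueryRecord does this declaratively with an XML record reader and a JSON record set writer, so no code is needed.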
Split JSON (JSONPath)
$.[0].channel.item
Next we pull out the story lines using JSONPath, which lets us split individual JSON records out of the large array in the single file.
EvaluateJsonPath (JSONPath)
$.description[0]
$.description[1]
$.description[2]
Next we extract all the lines from the story's description and turn them into new NiFi attributes, without changing the main flowfile.
UpdateAttribute
${description1:append(${description2})}
Now we combine those fields into one attribute.
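The extract-then-append steps above can be sketched in plain Python. The record shape and line values are hypothetical; in the flow, EvaluateJsonPath pulls `$.description[0]` through `$.description[2]` into attributes and UpdateAttribute appends them.

```python
import json

# Hypothetical flowfile content after the split: one story whose
# description field is an array of lines (shape assumed for illustration).
story = json.loads('{"description": ["Line one.", " Line two.", " Line three."]}')

# EvaluateJsonPath puts each line into its own attribute; pad with empty
# strings so stories with fewer than three lines still work.
description1, description2, description3 = (story["description"] + ["", "", ""])[:3]

# UpdateAttribute then appends them into a single value, like
# ${description1:append(${description2})} in NiFi Expression Language.
combined = description1 + description2 + description3
print(combined)
```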
AttributesToJSON
inputs
We then build a new JSON flow file from our story description attribute.
Now we can send this to HuggingFace for inference.
Main Hugging Face Data Flow
Once the document comes in we extract the text into an inputs attribute.
EvaluateJSONPath (JSONPath)
$.inputs
We then pass this JSON payload to the Hugging Face Inference REST API.
InvokeHTTP (Post)
https://api-inference.huggingface.co/models/distilbert-base-uncased-finetuned-sst-2-english
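For reference, here is a minimal Python sketch of what InvokeHTTP is doing: a POST with a bearer token and an `{"inputs": ...}` body. The token handling is an assumption (substitute your own access token), and `sample_response` is a canned example of the nested label/score list this model returns, not a live call.

```python
import json
import urllib.request

API_URL = ("https://api-inference.huggingface.co/models/"
           "distilbert-base-uncased-finetuned-sst-2-english")

def score_sentiment(text: str, token: str):
    """POST an article summary to the Inference API and return the parsed JSON.
    `token` is your Hugging Face access token (placeholder here)."""
    payload = json.dumps({"inputs": text}).encode("utf-8")
    req = urllib.request.Request(
        API_URL,
        data=payload,
        headers={"Authorization": f"Bearer {token}",
                 "Content-Type": "application/json"},
    )
    with urllib.request.urlopen(req) as resp:
        return json.load(resp)

# The API returns one inner list per input, with a label/score pair per class.
# Canned sample response for illustration (not a live call):
sample_response = [[{"label": "NEGATIVE", "score": 0.998},
                    {"label": "POSITIVE", "score": 0.002}]]
top = max(sample_response[0], key=lambda r: r["score"])
print(top["label"])
```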
We will now split out the JSON records returned by Hugging Face.
SplitJSON
$.*.*
We get two values back per result, one for positive and one for negative. We use QueryRecord to extract the label whose score is over 50%.
QueryRecordProcessor (JSON-to-JSON)
SELECT lower(label) as sentiment, label, score
FROM FLOWFILE
WHERE label = 'NEGATIVE'
AND CAST(score AS FLOAT) > cast(0.5 AS FLOAT)
SELECT lower(label) as sentiment, label, score
FROM FLOWFILE
WHERE label = 'POSITIVE'
AND CAST(score AS FLOAT) > cast(0.5 AS FLOAT)
Two queries, one for positive and one for negative, so we can route them to different places if we need to. We could also add or remove fields for each query.
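The two QueryRecord queries can be mirrored in a small Python filter, useful for testing the threshold logic outside NiFi. Field names match the flow; the sample records are hypothetical.

```python
# Keep a result only when its score exceeds the threshold, lower-casing
# the label into a "sentiment" field, as the two SQL queries do.
def filter_by_label(records, wanted_label, threshold=0.5):
    return [
        {"sentiment": r["label"].lower(), "label": r["label"], "score": r["score"]}
        for r in records
        if r["label"] == wanted_label and float(r["score"]) > threshold
    ]

records = [{"label": "NEGATIVE", "score": 0.9984},
           {"label": "POSITIVE", "score": 0.0016}]

negatives = filter_by_label(records, "NEGATIVE")
positives = filter_by_label(records, "POSITIVE")
print(negatives, positives)
```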
We then use UpdateRecord to build a new JSON record containing a timestamp, a unique ID, and the text body of the original input alongside the existing sentiment.
UpdateRecord
using Literal Value
/textbody ${inputs}
/ts ${now():toNumber()}
/uuid ${uuid}
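The enrichment step can be sketched in Python as follows. Field names (`textbody`, `ts`, `uuid`) come from the flow; the sample record and summary text are hypothetical.

```python
import time
import uuid

# Sketch of the UpdateRecord step: stamp the record with the original text,
# a millisecond epoch timestamp (like ${now():toNumber()}), and a unique ID.
def enrich(record, inputs_text):
    record["textbody"] = inputs_text
    record["ts"] = int(time.time() * 1000)
    record["uuid"] = str(uuid.uuid4())
    return record

enriched = enrich({"sentiment": "negative", "label": "NEGATIVE", "score": 0.998},
                  "A sample article summary.")
print(enriched)
```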
If we want to use a schema, we can set it here, along with any other attributes we need.
UpdateAttribute
Set schema and content type variables.
Send these new JSON records to our Kafka cluster.
PublishKafkaRecord_2_6 (JSON-to-JSON)
SchemaJsonTreeReader
SchemaJsonRecordSetWriter
Brokers: kafka:9092
If our record can’t get to Kafka, let’s keep trying.
RetryFlowFile
Use the defaults and retry sending to Kafka; if there is a persistent issue, route the failure somewhere for handling.
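The retry behavior can be sketched as a small loop. This is a hedged illustration, not NiFi's actual implementation: `publish_fn` stands in for a real Kafka producer send, and the flaky broker below is simulated so the sketch is self-contained.

```python
import time

# Retry a publish callable a few times with a pause between attempts,
# then give up and report failure (RetryFlowFile-style behavior, sketched).
def publish_with_retry(publish_fn, record, max_retries=3, delay_seconds=0.01):
    for attempt in range(1, max_retries + 1):
        try:
            publish_fn(record)
            return "success"
        except Exception:
            if attempt == max_retries:
                return "failure"
            time.sleep(delay_seconds)

# Simulate a broker that fails twice and then accepts the record.
attempts = {"n": 0}
def flaky_publish(record):
    attempts["n"] += 1
    if attempts["n"] < 3:
        raise ConnectionError("broker unavailable")

result = publish_with_retry(flaky_publish, {"sentiment": "negative"})
print(result)
```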
Output Data
[{"sentiment":"negative","label":"NEGATIVE","score":0.9984495639801025,"textbody":"The Justice Department's devastating findings left the city searching for next steps. Some residents found little closure. George Floyd's tombstone at the cemetery and Say Their Names installation near George Floyd Square in Minneapolis on Friday.","ts":"1691162971971","uuid":"41fffa6e-9aba-4bf4-b4d9-49580a6c97ea"}]
For some more details, see the references below.
Kafka (topic is hfsentiment)
Flink SQL
CREATE TABLE `ssb`.`Meetups`.`hfsentiment` (
`sentiment` VARCHAR(2147483647),
`label` VARCHAR(2147483647),
`score` DOUBLE,
`textbody` VARCHAR(2147483647),
`ts` VARCHAR(2147483647),
`uuid` VARCHAR(2147483647),
`eventTimestamp` TIMESTAMP(3) WITH LOCAL TIME ZONE METADATA FROM 'timestamp',
WATERMARK FOR `eventTimestamp` AS `eventTimestamp` - INTERVAL '3' SECOND
) WITH (
'scan.startup.mode' = 'group-offsets',
'deserialization.failure.policy' = 'ignore_and_log',
'properties.request.timeout.ms' = '120000',
'properties.auto.offset.reset' = 'earliest',
'format' = 'json',
'properties.bootstrap.servers' = 'kafka:9092',
'connector' = 'kafka',
'properties.transaction.timeout.ms' = '900000',
'topic' = 'hfsentiment',
'properties.group.id' = 'hfsentimentflinkssb1'
)
We will run a simple query to see our data arrive: select * from hfsentiment.
Once the SQL job is deployed, you can see it in the Apache Flink Dashboard.
Materialized View as REST Endpoint with JSON
Example Abbreviated JSON
[
{"sentiment":"negative","label":"NEGATIVE",
"score":"0.902526438236237",
"textbody":"Much of the ....",
"ts":"1691256307665",
"uuid":"6ddde6e4-f5b3-4edb-935b-26a7f704d8b6",
"eventTimestamp":"2023-08-05 17:25:24.934"}, ...
]
Video Demo
REFERENCES
- https://huggingface.co/distilbert-base-uncased-finetuned-sst-2-english
- https://huggingface.co/docs/api-inference/detailed_parameters
- https://huggingface.co/docs
- https://huggingface.co/inference-api
- https://huggingface.co/docs/inference-endpoints/index
- https://huggingface.co/docs/transformers/main/en/model_doc/distilbert#transformers.DistilBertForSequenceClassification
- https://community.cloudera.com/t5/Community-Articles/Call-a-CML-Deployed-Model-From-Apache-NiFi-in-10-minutes-Or/ta-p/374853