No Code Sentiment Analysis with Hugging Face and Apache NiFi for Article Summaries

Tim Spann
Cloudera
Aug 5, 2023

Apache NiFi, Hugging Face, NY Times RSS, Sentiment Analysis, Deep Learning, REST API, Classification, Distilbert, SST-2, NLP

Photo by Mahdis Mousavi on Unsplash

Model: distilbert-base-uncased-finetuned-sst-2-english

I have been working on some LLM, ChatGPT, Hugging Face, vectorization and other interesting AI integration projects with some awesome partners. While getting those started, I came across how easy it is to build an application against the regular Hugging Face Inference API.
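As a quick illustration, here is a minimal Python sketch of that kind of call, assuming the requests library and an access token exported as HF_TOKEN (both of those are my choices for the example, not part of the NiFi flow):

import os
import requests

# Hosted Inference API endpoint for the model used throughout this article
API_URL = ("https://api-inference.huggingface.co/models/"
           "distilbert-base-uncased-finetuned-sst-2-english")

# Assumes your Hugging Face access token is exported as HF_TOKEN
HEADERS = {"Authorization": f"Bearer {os.environ['HF_TOKEN']}"}

def classify(text: str):
    # The Inference API expects a JSON body with an "inputs" field
    response = requests.post(API_URL, headers=HEADERS, json={"inputs": text})
    response.raise_for_status()
    return response.json()

# Prints something like:
# [[{'label': 'POSITIVE', 'score': 0.9998}, {'label': 'NEGATIVE', 'score': 0.0002}]]
print(classify("Apache NiFi makes building data pipelines a pleasure."))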

SOURCE CODE

HOW TO

Set up your Hugging Face account; it is really useful, and if you don’t have one, create one. Hugging Face is awesome.

Create an Access Token for REST Access.

Hugging Face

Read up on the Inference API.

Check out the parameters.
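For text inputs, the request body is a JSON document with an inputs field; the API also accepts an options object to control caching and cold starts. A typical body looks something like this (values are illustrative):

{
  "inputs": "Text to classify",
  "options": {"wait_for_model": true, "use_cache": true}
}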

Example Stories RSS Feed

Let’s build a flow to ingest article summaries, convert them to a format that can be fed to Hugging Face’s inference API and then send them there. We will then build a flow to process the results, enrich them and route them to Kafka for further processing with Flink SQL. We may also directly save them in Apache Kudu, Apache Hive, Apache Impala, Apache HBase, S3, HDFS, Apache Ozone, Apache Iceberg, MongoDB, Oracle, MySQL, Postgresql or all of these places at once. Once the data is in Apache Kafka we could also consume it at the same time with Apache Spark Structured Streaming, a Java consumer, a Python consumer, a C++ consumer, a Golang consumer, Tinybird, RisingWave and many others. Let’s get building.

NiFi Ingest Flow

First we need to retrieve RSS XML files from the NY Times (or other articles, social media, whatever you want to run sentiment analysis on). We could also read PDFs, Word documents, Google Docs, Salesforce reports or database records.

InvokeHTTP

Retrieve RSS data

QueryRecord

SELECT * FROM FLOWFILE

Then we convert the RSS XML files to JSON for easier processing.

Split JSON (JSONPath)

$.[0].channel.item

Next we pull out the story lines using JSONPath, which lets us split individual JSON records out of the large array in the single file.
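After the XML-to-JSON conversion, the feed looks roughly like this (abbreviated; the field names follow the standard RSS structure, and description can arrive as an array of lines, which is why the next step indexes into it):

[
  {
    "channel": {
      "item": [
        {"title": "...", "description": ["First line of summary...", "Second line..."]},
        {"title": "...", "description": ["..."]}
      ]
    }
  }
]

So $.[0].channel.item emits one flowfile per story item.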

EvaluateJsonPath (JSONPath)

$.description[0]

$.description[1]

$.description[2]

Next we extract all the lines from the description of the story and turn them into new NiFi attributes, without changing the main flowfile content.

UpdateAttribute

${description1:append(${description2})}

Now we combine those fields together into one attribute.
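For example, if description1 is "The markets rallied." and description2 is " Investors cheered.", the appended attribute value is "The markets rallied. Investors cheered." (made-up values, just to show the append).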

AttributesToJSON

inputs

We then build a new JSON flow file from our story description attribute.
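With the combined attribute named inputs in the Attributes List, the new flowfile content is simply a one-field JSON document along these lines (text abbreviated):

{"inputs": "First line of the story description. Second line of the story description."}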

Now we can send this to HuggingFace for inference.

Main Hugging Face Data Flow

Once the document comes in we extract the text into an inputs attribute.

EvaluateJsonPath (JSONPath)

$.inputs

We then pass this JSON, in the format Hugging Face expects, to the Hugging Face Inference REST API.

InvokeHTTP (Post)

https://api-inference.huggingface.co/models/distilbert-base-uncased-finetuned-sst-2-english
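The request body is the {"inputs": ...} document we just built. For this model the API responds with a nested array of label/score pairs, roughly like this (scores are illustrative):

[[
  {"label": "NEGATIVE", "score": 0.9984},
  {"label": "POSITIVE", "score": 0.0016}
]]

That nesting is why the next step splits on $.*.* to get at the individual records.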

We will now split out the JSON records returned by Hugging Face.

SplitJSON

$.*.*

The call returns two values per result, one for positive and one for negative. We use QueryRecord to extract whichever one scores over 50%.

QueryRecord (JSON-to-JSON)

SELECT lower(label) AS sentiment, label, score
FROM FLOWFILE
WHERE label = 'NEGATIVE'
AND CAST(score AS FLOAT) > CAST(0.5 AS FLOAT)

SELECT lower(label) AS sentiment, label, score
FROM FLOWFILE
WHERE label = 'POSITIVE'
AND CAST(score AS FLOAT) > CAST(0.5 AS FLOAT)

Two queries, one for positive and one for negative, so we can send them to different places if we need to. We could also add or remove fields in each query.

We then use UpdateRecord to build a new JSON record with a timestamp, a unique ID and the textbody of the original input, alongside the existing sentiment.

UpdateRecord

using Literal Value

/textbody ${inputs}

/ts ${now():toNumber()}

/uuid ${uuid}

In case we want to use a schema, we can set that here, along with any other attributes you want.

UpdateAttribute

Set schema and content type variables.

Send these new JSON records to our Kafka cluster.

PublishKafkaRecord_2_6 (JSON-to-JSON)

SchemaJsonTreeReader

SchemaJsonRecordSetWriter

Brokers: kafka:9092

If our record can’t get to Kafka, let’s keep trying.

RetryFlowFile

Use the defaults and retry sending to Kafka; if some odd issue persists, route the failure somewhere for handling.

Output Data

[{"sentiment":"negative","label":"NEGATIVE","score":0.9984495639801025,"textbody":"The Justice Department's devastating findings left the city searching for next steps. Some residents found little closure. George Floyd's tombstone at the cemetery and Say Their Names installation near George Floyd Square in Minneapolis on Friday.","ts":"1691162971971","uuid":"41fffa6e-9aba-4bf4-b4d9-49580a6c97ea"}]

Photo by Filip Mishevski on Unsplash

For some more details, see the references below.

Kafka (topic is hfsentiment)
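As noted earlier, anything can consume this topic alongside Flink. Here is a minimal sketch of one of those Python consumers, assuming the kafka-python package and the broker and topic names from the flow above (the consumer group id is just an example):

import json
from kafka import KafkaConsumer  # pip install kafka-python

# Broker and topic match the NiFi flow: kafka:9092, topic hfsentiment
consumer = KafkaConsumer(
    "hfsentiment",
    bootstrap_servers="kafka:9092",
    group_id="hfsentiment-python-demo",  # illustrative consumer group
    auto_offset_reset="earliest",
    value_deserializer=lambda v: json.loads(v.decode("utf-8")),
)

for message in consumer:
    record = message.value
    print(record["sentiment"], record["score"], record["textbody"][:60])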

Flink SQL

CREATE TABLE `ssb`.`Meetups`.`hfsentiment` (
  `sentiment` VARCHAR(2147483647),
  `label` VARCHAR(2147483647),
  `score` DOUBLE,
  `textbody` VARCHAR(2147483647),
  `ts` VARCHAR(2147483647),
  `uuid` VARCHAR(2147483647),
  `eventTimestamp` TIMESTAMP(3) WITH LOCAL TIME ZONE METADATA FROM 'timestamp',
  WATERMARK FOR `eventTimestamp` AS `eventTimestamp` - INTERVAL '3' SECOND
) WITH (
  'scan.startup.mode' = 'group-offsets',
  'deserialization.failure.policy' = 'ignore_and_log',
  'properties.request.timeout.ms' = '120000',
  'properties.auto.offset.reset' = 'earliest',
  'format' = 'json',
  'properties.bootstrap.servers' = 'kafka:9092',
  'connector' = 'kafka',
  'properties.transaction.timeout.ms' = '900000',
  'topic' = 'hfsentiment',
  'properties.group.id' = 'hfsentimentflinkssb1'
)

We will run a simple query to see our data arrive. The example is just “select * from hfsentiment”.

Once the SQL job is deployed, you can see it in the Apache Flink Dashboard.

Materialized View as REST Endpoint with JSON

Example Abbreviated JSON

[
  {"sentiment":"negative","label":"NEGATIVE",
   "score":"0.902526438236237",
   "textbody":"Much of the ....",
   "ts":"1691256307665",
   "uuid":"6ddde6e4-f5b3-4edb-935b-26a7f704d8b6",
   "eventTimestamp":"2023-08-05 17:25:24.934"}, ...
]

Video Demo
