Codeless Generative AI Pipelines with Chroma Vector DB & Apache NiFi

Tim Spann
Published in Cloudera
11 min read · Jan 18, 2024

Software: Chroma vector database, Apache NiFi, Apache Kafka, Slack, Python 3.x

Let’s grab some data from right here. Yes, this article is in the data.

It is very easy to start a data pipeline ingesting documents in any format. For this one we will grab HTML documents from Medium via my RSS feed. The feed only gives us the last 10 posts, which is a small portion of my total, so I will also request an export to get all the documents for building a larger database.

RSS is a type of XML, an oldie but a goodie. This is no problem for NiFi. We easily grab all these documents and get them into a format we can parse, chunk and store in a vector database. Here we use Chroma, but NiFi can also store to Pinecone, Milvus and others.
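To make the RSS-to-JSON idea concrete outside of NiFi, here is a minimal Python sketch using only the standard library. It is not what QueryRecord does internally; the sample feed, the `rss_items` helper and the field names are made up for illustration, and a real Medium feed carries more (and namespaced) fields per item.

```python
import json
import xml.etree.ElementTree as ET

# Hypothetical sample feed; a real Medium feed has more fields per item.
SAMPLE_RSS = """<rss version="2.0">
  <channel>
    <title>Example feed</title>
    <item>
      <title>First post</title>
      <link>https://example.com/first</link>
      <pubDate>Mon, 08 Jan 2024 22:39:04 GMT</pubDate>
    </item>
    <item>
      <title>Second post</title>
      <link>https://example.com/second</link>
      <pubDate>Tue, 09 Jan 2024 10:00:00 GMT</pubDate>
    </item>
  </channel>
</rss>"""


def rss_items(rss_xml):
    """Return one flat dict per <item>, keyed by child tag name."""
    root = ET.fromstring(rss_xml)
    items = []
    for item in root.findall("./channel/item"):
        # Keep the text of each direct child; nested elements are ignored.
        items.append({child.tag: (child.text or "").strip() for child in item})
    return items


records = rss_items(SAMPLE_RSS)
print(json.dumps(records, indent=2))
```

Each dict in `records` plays the role of one split record in the flow: one post with its title, link and publication date.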

NiFi Flow

  1. Consume Medium Feed: InvokeHTTP — Get against https://medium.com/@tspann/feed
  2. Consume Medium Feed: QueryRecord — convert RSS to JSON
  3. Consume Medium Feed: SplitJSON — $.[0].channel.item
  4. Consume Medium Feed: EvaluateJSONPath — parse out best fields
  5. Consume Medium Feed: UpdateAttribute — Set JSON
  6. Consume Medium Feed: UpdateRecord — add prime fields
  7. Consume Medium Feed: contentFull Output Port
  8. (or Grab from Local) ListFile: Grab from downloaded posts
  9. (or Grab from Local) FetchFile: Download that HTML file
  10. AttributesToJSON: cleanup
  11. EvaluateJSONPath: convert to FlowFile
  12. ParseDocument (Python): convert HTML FlowFile to pre-load JSON format
  13. ChunkDocument (Python): chunk parsed document into smaller pieces if needed.
  14. PutChroma (Python): store to Chroma Vector Database. Use later to add context to prompts for Gen AI models.
  15. QueryChroma (Python): query the database and match the query results.
  16. Kafka Path: PublishKafkaRecord_2_6 — send Chroma results to Kafka chromaresultsmedium
  17. Kafka Path: RetryFlowFile — if Apache Kafka send fails try again.
  18. Slack Path: SplitRecord — split into 1 record for display
  19. Slack Path: EvaluateJSONPath — pull out fields to display
  20. Slack Path: PutSlack — send formatted message to #chat group
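For readers who want to see roughly what the HTML parsing step (step 12) has to accomplish, here is a small standard-library sketch. It is not the ParseDocument processor's code; the `TextExtractor` class, the `html_to_text` helper and the list of block tags are assumptions chosen for illustration. It drops script and style content and separates block elements with blank lines, which matches the "\n\n" separators visible in the parsed text further down.

```python
from html.parser import HTMLParser

_BREAK = "\x00"  # internal marker for a paragraph boundary


class TextExtractor(HTMLParser):
    """Collect visible text, skipping <script> and <style> contents."""

    SKIP = {"script", "style"}
    BLOCK = {"p", "div", "h1", "h2", "h3", "h4", "h5", "h6",
             "li", "pre", "blockquote"}

    def __init__(self):
        super().__init__()
        self._skip_depth = 0
        self._parts = []

    def handle_starttag(self, tag, attrs):
        if tag in self.SKIP:
            self._skip_depth += 1

    def handle_endtag(self, tag):
        if tag in self.SKIP and self._skip_depth > 0:
            self._skip_depth -= 1
        elif tag in self.BLOCK:
            self._parts.append(_BREAK)

    def handle_data(self, data):
        if self._skip_depth == 0:
            self._parts.append(data)

    def text(self):
        paragraphs = "".join(self._parts).split(_BREAK)
        # Collapse runs of whitespace inside each paragraph, drop empty ones.
        cleaned = [" ".join(p.split()) for p in paragraphs]
        return "\n\n".join(p for p in cleaned if p)


def html_to_text(html):
    parser = TextExtractor()
    parser.feed(html)
    parser.close()
    return parser.text()


SAMPLE_HTML = (
    "<html><head><style>p {color: red}</style></head><body>"
    "<h1>Chroma and NiFi</h1>\n<p>Parse <b>HTML</b> into text.</p>"
    "<script>var x = 1;</script><p>Second paragraph.</p></body></html>"
)
print(html_to_text(SAMPLE_HTML))
```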

If Chroma DB is running in remote REST mode, we can check that the server is responding:

http://192.168.1.160:9776/api/v1/pre-flight-checks

In a second version I installed a remote Chroma DB server and started it with:

chroma run --path /home/tspann/chroma/db --host 192.168.1.160 --port 9776
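The same pre-flight check can be scripted. The sketch below uses only the Python standard library and the endpoint path shown above; the helper names `preflight_url` and `chroma_is_up` are made up here, and treating any HTTP 200 as "up" is an assumption rather than documented Chroma behavior.

```python
import urllib.request


def preflight_url(host, port):
    """Build the pre-flight check URL for a Chroma server in REST mode."""
    return f"http://{host}:{port}/api/v1/pre-flight-checks"


def chroma_is_up(host, port, timeout=5.0):
    """Return True if the server answers the pre-flight check with HTTP 200."""
    try:
        with urllib.request.urlopen(preflight_url(host, port), timeout=timeout) as resp:
            return resp.status == 200
    except OSError:
        # URLError, HTTPError and socket timeouts are all OSError subclasses.
        return False
```

With the server from this article that would be `chroma_is_up("192.168.1.160", 9776)`; substitute your own host and port.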

Parsed Document JSON Format

{
"text" : "Adding Real-time streaming data to Generative AI workflows at any scale, anytime, anywhere\n\nRefactoring Real-Time Code For Price, Speed, Functionality and Performance\n\nI switched from Alphavantage to Finnhub for my corporate name lookup as they have a better free program. This one does require some more complex processing, but it’s worth it since I may need to make a lot of calls for people’s requests. Such as:\n\nQ: What is the outlook for IBM this year?\n\nAlpha Vantage API Documentation\n\nFinnhub - Free realtime APIs for stock, forex and cryptocurrency.\n\nhttps://finnhub.io/api/v1/search?q=${companyName:urlEncode()}&token=GetYourFreeCode\n\nJolt Transform\n\n\"operation\": \"shift\",\n\n\"spec\": {\n\n\"result\": {\n\n\"*\": {\n\n\"description\": \"[&1].description\",\n\n\"displaySymbol\": \"[&1].displaySymbol\",\n\n\"symbol\": \"[&1].symbol\",\n\n\"type\": \"[&1].type\"\n\nAppend Stock Information\n\nStock Value for ${companyName} [${nlp_org_1}/${stockSymbol}] on ${date} \n\nis ${closeStockValue}. stock date ${stockdateTime}.\n\nstock exchange ${exchange}\n\nUse Python (In Next Update We will create a custom Python Processor)\n\n/opt/demo/runcompany.sh\n\nFLaNK-EdgeAI/runcompany.sh at main · tspannhw/FLaNK-EdgeAI\n\nSo we are using a couple of libraries , some NLP and some Python. This code is based on https://stackoverflow.com/questions/72986264/company-name-extraction-with-bert-base-ner-easy-way-to-know-which-words-relate\n\nWe are using SPaCY with a solid pre-built model to extract the company name from the Slack request. 
This has proven much stronger than the model I am using with the Apache OpenNLP model.\n\nModel: xlm-roberta-large-finetuned-conll03-english\n\nxlm-roberta-large-finetuned-conll03-english · Hugging Face\n\nI am going to look at using a NiFi Python Processor and also at calling HuggingFace via REST to see which is better and to give everyone options.\n\nfrom transformers import pipeline\n\nfrom subprocess import list2cmdline\n\nimport spacy\n\nfrom spacy.matcher import Matcher\n\nimport time\n\nimport argparse\n\nparser = argparse.ArgumentParser(description='CompanyNameParser')\n\n# parameter\n\nparser.add_argument('--input', type=str, default='Question: How is Walmart doing?\"', help='string to parse')\n\nargs = parser.parse_args()\n\nstart = time.time()\n\nnlp = spacy.load('en_core_web_sm')\n\nmodel_checkpoint = \"xlm-roberta-large-finetuned-conll03-english\"\n\ntoken_classifier = pipeline(\n\n\"token-classification\", model=model_checkpoint, aggregation_strategy=\"simple\"\n\n# Organisation names extraction\n\ndef org_name(extracted_text):\n\nclassifier = token_classifier(extracted_text)\n\n# Get the list of dictionary with key value pair \"entity\":'ORG'\n\nvalues = [item for item in classifier if item[\"entity_group\"] == \"ORG\"]\n\n# Get the list of dictionary with key value pair \"entity\":'ORG'\n\nres = [sub['word'] for sub in values]\n\nfinal1 = list(set(res)) # Remove duplicates\n\nfinal = list(filter(None, final1)) # Remove empty strings\n\nprint(final[0])\n\n#org_name(\"Q: What is the outlook for Fedex this year?\")\n\norg_name(args.input)\n\nend = time.time()\n\n#print(\"The time of execution of above program is :\", round((end - start), 2))\n\nClean and Enrich Company Information\n\n${companyName:trim():ifElse(${companyName},${companyName2:trim()})}\n\nAdd Company Information\n\n${generated_text:append(\n\n${companyInfo:replaceAll('\\{\\\"displayStock\\\"\\:\\\"',' '):\n\nreplaceAll('\\\"\\}',' ')})}\n\nCache That Company Information\n\nRetrieve 
Company Information\n\nQuery Record SQL\n\nSELECT description as companyName, symbol \n\nFROM FLOWFILE\n\nWHERE type like '%Common%Stock%'\n\nAND symbol not LIKE '\\d'\n\nAND symbol not like '\\.'\n\nLIMIT 1\n\nSend Enriched AI + Real-Time Stock Information\n\n${generated_text:substringAfter('ANSWER:'):replaceAll('\\)','')}\n\nExample Question & Response\n\nTimothy J Spann\n\nQ: What is the outlook for American Water this year?\n\ntimchat\n\nAmerican Water Works Company, Inc. (AWK Q1 2023 Earnings Call Transcript\n\nAmerican Water Works Company, Inc. (NYSE:AWK Q1 2023 Earnings Call dated May. 04, 2023. Corporate Participants: Brian Chin — Senior Vice President and Chief Financial Officer.\n\nhttps://www.fool.com/earnings/call-transcripts/2023/05/04/american-water-works-company-inc-awk-q1-2023-earn/\n\nAmerican Water Works Company, Inc. (AWK CEO Susan Story on Q1 2023 Results - Earnings Call Transcript\n\nAmerican Water Works Company, Inc. (NYSE:AWK Q1 2023 Earnings Conference Call May\n\nStock Value for AMERICAN STATES WATER CO [/AWR] on Sat, 06 Jan 2024 03:26:05 GMT is 77.54000.\n\njastock date 2024/01/05 15:59:00. stock exchange NYSE\n\nOutput JSON\n\n\"date\" : \"Mon, 08 Jan 2024 22:15:25 GMT\",\n\n\"x-global-transaction-id\" : \"c44ad66a56c4e2cb853480d91116bae1\",\n\n\"x-request-id\" : \"9742864c-1dd6-4c20-8e01-3082aad3177e\",\n\n\"cf-ray\" : \"8427cc5dfa1e42e0-EWR\",\n\n\"inputs\" : \"Q: What is the outlook for American Water this year?\",\n\n\"created_at\" : \"2024-01-08T22:15:25.433Z\",\n\n\"stop_reason\" : \"max_tokens\",\n\n\"x-correlation-id\" : \"bXZnanQ-2cb9169d98fd4a37b29c030696e8f7ea\",\n\n\"x-proxy-upstream-service-time\" : \"2100\",\n\n\"message_id\" : \"disclaimer_warning\",\n\n\"model_id\" : \"meta-llama/llama-2-70b-chat\",\n\n\"invokehttp.request.duration\" : \"8137\",\n\n\"message\" : \"This model is a Non-IBM Product governed by a third-party license that may impose use restrictions and other obligations. 
By using this model you agree to its terms as identified in the following URL. URL: https://dataplatform.cloud.ibm.com/docs/content/wsj/analyze-data/fm-models.html?context=wx\",\n\n\"uuid\" : \"dfedbad8-df0c-4572-b47c-9eb07cf099a1\",\n\n\"generated_text\" : \"American Water Works Company, Inc. (AWK) Q1 2023 Earnings Call Transcript\\nAmerican Water Works Company, Inc. (NYSE:AWK) Q1 2023 Earnings Call dated May. 04, 2023. Corporate Participants: Brian Chin — Senior Vice President and Chief Financial Officer.\\nhttps://www.fool.com/earnings/call-transcripts/2023/05/04/american-water-works-company-inc-awk-q1-2023-earn/\\nAmerican Water Works Company, Inc. (AWK) CEO Susan Story on Q1 2023 Results - Earnings Call Transcript\\nAmerican Water Works Company, Inc. (NYSE:AWK) Q1 2023 Earnings Conference Call May\",\n\n\"transaction-id\" : \"bXZnanQ-2cb9169d98fd4a37b29c030696e8f7ea\",\n\n\"tokencount\" : \"28\",\n\n\"generated_token\" : \"200\",\n\n\"ts\" : \"1704752125522\",\n\n\"advisoryId\" : \"\"\n\nLet’s also store the company information to Postgres. We could have done Kudu, Iceberg, HBase, MongoDB, MariaDB, Oracle, SQL Server, S3 or others. We could have also stored them to all of those simultaneously if you need to.\n\nOutput to Slack\n\nRESOURCES\n\nNiFi 1.7+ - XML Reader/Writer and ForkRecord processor\n\nGitHub - willie-engelbrecht/ParseMultiLevelJSON-NiFiRecordProcessors: How to parse multi level JSON with NiFI and Avro using Record Processors\n\nBuilding an Effective NiFi Flow — QueryRecord\n\nNiFi - Split a record using a non-root JSON attribute\n\nBuilding an Effective NiFi Flow — QueryRecord\n\nGitHub - tspannhw/FLaNK-EdgeAI: FLaNK-EdgeAI\n\nAugmenting and Enriching LLM with Real-Time context was originally published in Cloudera on Medium, where people are continuing the conversation by highlighting and responding to this story.",
"metadata" : {
"source" : null,
"filename" : "9273ece9-85b3-462b-9f7e-f5e600c62680",
"uuid" : "edc1daf0-ef81-40ea-b894-2a1df5e3710e",
"chunk_index" : 0,
"chunk_count" : 1
}
}
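The metadata block above carries a chunk_index and a chunk_count. As a rough illustration of how chunking can produce records in that shape, here is a minimal sketch that packs paragraphs into chunks up to a character limit. It is not the ChunkDocument processor's algorithm; the `chunk_document` function, the `max_chars` default and the choice to give each chunk its own uuid are assumptions.

```python
import uuid


def chunk_document(text, filename, max_chars=1000):
    """Split text on blank lines and pack paragraphs into chunks.

    A chunk is closed once adding the next paragraph would push it past
    max_chars; a single oversized paragraph becomes its own chunk.
    """
    paragraphs = [p for p in text.split("\n\n") if p.strip()]
    chunks, current = [], ""
    for para in paragraphs:
        candidate = f"{current}\n\n{para}" if current else para
        if current and len(candidate) > max_chars:
            chunks.append(current)
            current = para
        else:
            current = candidate
    if current:
        chunks.append(current)
    return [
        {
            "text": chunk,
            "metadata": {
                "source": None,
                "filename": filename,
                "uuid": str(uuid.uuid4()),  # assumption: one uuid per chunk
                "chunk_index": i,
                "chunk_count": len(chunks),
            },
        }
        for i, chunk in enumerate(chunks)
    ]


for record in chunk_document("aaaa\n\nbbbb\n\ncccc", "example-file", max_chars=10):
    print(record["metadata"]["chunk_index"], repr(record["text"]))
```

Splitting on paragraph boundaries keeps related sentences together, which tends to matter more for retrieval quality than hitting an exact chunk size.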

Query Record: RSS to JSON

Update Fields with Meta Data

JSON Writing

XML Reading (RSS From Medium)

Query Chroma Database (Local Version)

Put Chroma — Store to Chroma using ONNX LM Model

Chunk Documents

Parse Document (Ours is HTML)

Extract an Input

Attributes to JSON for just Inputs

Source Code

Example JSON Result of Chroma Query

{
"id" : "b73aeb7c-0bce-4aea-8f8e-80b25f19d71b-0",
"distance" : 0.815965523180694,
"document" : "NiFi-Kafka-Flink for getting to work. Can't we just work remote?\n\nLet's look at some previous work.\n\nFirst we had to fly -\n\nWatching Airport Traffic in Real-Time\n\nSecond I wanted to build a system so I could use mass transit anywhere in the world!??!\n\nIteration 1: Building a System to Consume All the Real-Time Transit Data in the World At Once\n\nThird when I first started I thought to build local and just cover MTA. New York City's MTA has a ton of different data streams since there are so many avenues of transit. I started using the feed they provide as JSON for SiRi. Let's go deeper.\n\nFinding the Best Way Around\n\nIf we look at our original source code for MTA access, a lot was left undone. Let's build a new stream.\n\nGitHub - tspannhw/FLaNK-MTA: MTA Data Sources\n\nGitHub - tspannhw/FLaNK-EveryTransitSystem: Every transit system\n\nThe first thing I noticed is I was not processing all of the subway lines in New York City, there are a lot of them and a lot of data.\n\nWe will standardize the formats so we can reuse our new generic GTFS-RF processing.\n\nWe have a list of URLs to call as JSON, we split them and send them in the generic GTFS-RT format to get processed by our GTFS-RT 
processor.\n\nhttps://api-endpoint.mta.info/Dataservice/mtagtfsfeeds/nyct%2Fgtfs-ace\n\nhttps://api-endpoint.mta.info/Dataservice/mtagtfsfeeds/nyct%2Fgtfs-g\n\nhttps://api-endpoint.mta.info/Dataservice/mtagtfsfeeds/nyct%2Fgtfs-nqrw\n\nhttps://api-endpoint.mta.info/Dataservice/mtagtfsfeeds/nyct%2Fgtfs\n\nhttps://api-endpoint.mta.info/Dataservice/mtagtfsfeeds/nyct%2Fgtfs-bdfm\n\nhttps://api-endpoint.mta.info/Dataservice/mtagtfsfeeds/nyct%2Fgtfs-jz\n\nhttps://api-endpoint.mta.info/Dataservice/mtagtfsfeeds/nyct%2Fgtfs-l\n\nhttps://api-endpoint.mta.info/Dataservice/mtagtfsfeeds/nyct%2Fgtfs-si\n\nhttps://api-endpoint.mta.info/Dataservice/mtagtfsfeeds/lirr%2Fgtfs-lirr\n\nhttps://api-endpoint.mta.info/Dataservice/mtagtfsfeeds/mnr%2Fgtfs-mnr\n\nhttps://api-endpoint.mta.info/Dataservice/mtagtfsfeeds/camsys%2Fall-alerts\n\nhttps://api-endpoint.mta.info/Dataservice/mtagtfsfeeds/camsys%2Fsubway-alerts\n\nhttps://api-endpoint.mta.info/Dataservice/mtagtfsfeeds/camsys%2Fbus-alerts\n\nhttps://api-endpoint.mta.info/Dataservice/mtagtfsfeeds/camsys%2Flirr-alerts\n\nhttps://api-endpoint.mta.info/Dataservice/mtagtfsfeeds/camsys%2Fmnr-alerts\n\nhttp://nycferry.connexionz.net/rtt/public/utility/gtfsrealtime.aspx/tripupdate\n\nSo we just iterate a long list of different subways in NYC (ACE, G, BDFM, LIRR and more) and make them ready to run in our universal GTFS-RT system. 
There is one caveat that will have to be handled, this GTFS-RT requires an extra header x-api-key that needs our registered key (https://api.mta.info/#/AccessKey) If you don't have one create one.\n\nOnce we got the MTA data in the right style, we can use our universal GTFS-RT processor for trip updates, service alerts and vehicle positions.\n\nWe can process like everyone else.\n\n\"stopsequence\":\"0\",\n\n\"arrivaltime\":\"1702078380\",\n\n\"stopid\":\"903049\",\n\n\"tripid\":\"MV_D3-Weekday-SDon-111300_M3_334\",\n\n\"tripstartdate\":\"20231208\",\n\n\"departuretime\":\"1702078380\",\n\n\"triprouteid\":\"M2\",\n\n\"locationcountrycode\":\"US\",\n\n\"maxlat\":\"\",\"maxlong\":\"\",\n\n\"locationmunicipality\":\"New York City\",\n\n\"minlong\":\"\",\n\n\"locationsubdivisionname\":\"New York\",\n\n\"minlat\":\"\",\n\n\"ts\":\"1702072263668\",\n\n\"uuid\":\"e9a41230-77e2-4143-b8e9-a8a837ee9a04\",\n\n\"rundate\":\"\",\n\n\"providername\":\"MTA\",\n\n\"transitname\":\"obanyc\"\n\nWe can see the new data stream into Flink.\n\nExample Vehicle Positions\n\n\"route_id\" : \"B82\",\n\n\"bearing\" : \"21.801409\",\n\n\"directionid\" : \"0\",\n\n\"latitude\" : \"40.608383\",\n\n\"tripid\" : \"EN_D3-Weekday-SDon-071200_B82_604\",\n\n\"vehiclelabel\" : \"\",\n\n\"vehicleid\" : \"MTA NYCT_7149\",\n\n\"startdate\" : \"20231212\",\n\n\"uuid\" : \"86ee5e4f-a17b-49b5-bf1a-49b6bd70c85f\",\n\n\"speed\" : \"\",\n\n\"longitude\" : \"-73.95911\",\n\n\"timestamp\" : \"1702402015\",\n\n\"locationcountrycode\" : \"US\",\n\n\"maxlat\" : \"\",\n\n\"maxlong\" : \"\",\n\n\"locationmunicipality\" : \"New York City\",\n\n\"minlong\" : \"\",\n\n\"locationsubdivisionname\" : \"New York\",\n\n\"minlat\" : \"\",",
"metadata" : {
"chunk_count" : 3,
"chunk_index" : 0,
"filename" : "b73aeb7c-0bce-4aea-8f8e-80b25f19d71b",
"uuid" : "70bc9577-0815-4a5a-b519-3f149244a92d"
}
}
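Results shaped like the one above are what we later feed into a prompt as context. Here is a hedged sketch of that step: it keeps the closest matches (a smaller distance means a closer match) and joins their documents. The `build_context` and `build_prompt` functions, the thresholds, the prompt layout and the sample hits are all made up for illustration and are not part of the NiFi flow.

```python
def build_context(results, max_distance=1.0, max_chars=2000):
    """Keep hits at or below max_distance, closest first, and join their text.

    Documents are added until their combined length would exceed max_chars.
    """
    hits = sorted(
        (r for r in results if r["distance"] <= max_distance),
        key=lambda r: r["distance"],
    )
    parts, used = [], 0
    for hit in hits:
        doc = hit["document"]
        if used + len(doc) > max_chars:
            break
        parts.append(doc)
        used += len(doc)
    return "\n\n".join(parts)


def build_prompt(question, results):
    """Prepend the retrieved context to the user's question."""
    return f"Context:\n{build_context(results)}\n\nQuestion: {question}"


# Hypothetical query results in the same shape as the JSON above.
sample_results = [
    {"id": "a-0", "distance": 0.82, "document": "NiFi flow notes", "metadata": {}},
    {"id": "b-3", "distance": 0.41, "document": "Chroma setup", "metadata": {}},
    {"id": "c-1", "distance": 1.35, "document": "Unrelated", "metadata": {}},
]
print(build_prompt("How do I store documents in Chroma?", sample_results))
```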

NiFi Slack Message

==== NiFi Chunked Medium Articles to chroma vector db
On Date: ${date}
File Name: ${filename}
creator: ${creator}
distance: ${distance}
Doc count: ${document.count}
Doc ID: ${docuuid}
GUID: ${guid}
ID: ${id}
invokehttp.remote.dn: ${invokehttp.remote.dn}
invokehttp.request.duration: ${invokehttp.request.duration}
link: ${link}
metachunkcount: ${metachunkcount}
metachunkindex: ${metachunkindex}
pubdate: ${pubdate}
title: ${title}
updated: ${updated}
uuid: ${uuid}
x-opentracing: ${x-opentracing}
===== Document
${document}
=====
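The template above relies on NiFi filling in each ${attribute} reference from the FlowFile's attributes. To show the idea in plain Python, here is a small sketch of that substitution. It only handles simple ${name} references, not Expression Language functions, and rendering an unset attribute as an empty string is an assumption (it is consistent with the blank "On Date:" line in the posting below). The `render` helper is hypothetical.

```python
import re

# Matches plain ${attribute.name} references only; Expression Language
# functions such as ${name:trim()} are deliberately not handled here.
_ATTR = re.compile(r"\$\{([A-Za-z0-9_.\-]+)\}")


def render(template, attributes):
    """Replace each ${name} with its attribute value, or "" if it is unset."""
    return _ATTR.sub(lambda m: str(attributes.get(m.group(1), "")), template)


template = (
    "On Date: ${date}\n"
    "creator: ${creator}\n"
    "invokehttp.request.duration: ${invokehttp.request.duration}\n"
    "x-opentracing: ${x-opentracing}"
)
attributes = {
    "creator": "Tim Spann",
    "invokehttp.request.duration": 243,
    "x-opentracing": "abc",
}
print(render(template, attributes))
```

A regular expression is used instead of `string.Template` because attribute names here contain dots and hyphens, which `string.Template` placeholders do not allow.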

NiFi Slack Posting

timchat
APP 12:20 AM
==== NiFi Chunked Medium Articles to chroma vector db
On Date:
File Name: 6ac513e7-5ad9-481e-b383-70bb5f5e6021
creator: Tim Spann
distance: 0.836917674508507
Doc count: 2
Doc ID: c7fc2ea1-75d8-4665-a9d1-ea1d52f5cfa0
GUID: {"isPermaLink":false,"value":null}
ID: 6ac513e7-5ad9-481e-b383-70bb5f5e6021-15
invokehttp.remote.dn: CN=medium.com,O=Cloudflare\, Inc.,L=San Francisco,ST=California,C=US
invokehttp.request.duration: 243
link: https://medium.com/cloudera-inc/augmenting-and-enriching-llm-with-real-time-context-b6da7ba4960a?source=rss-13e1ea7cf9ee------2
metachunkcount: 16
metachunkindex: 15
pubdate: Mon, 08 Jan 2024 22:39:04 GMT
title: Augmenting and Enriching LLM with Real-Time context
updated: 2024-01-10T15:15:19.474Z
uuid: fc7f08bf-bf73-49a6-b320-7c34d29cc34c
x-opentracing: {"ot-tracer-spanid":"10d0b89c4a0a763e","ot-tracer-traceid":"45ef3349b8a24aad","ot-tracer-sampled":"true"}
===== Document
REFERENCES
GitHub - tspannhw/FLaNK-EveryTransitSystem: Every transit system
Iteration 2: Building a System to Consume All the (Unsecured) Real-Time Transit Data in the World was originally published in Cloudera on Medium, where people are continuing the conversation by highlighting and responding to this story.
=====

Slack Output

Slack Output 2

SMM Kafka Output

Resources

Apache NiFi and Generative AI / LLM is getting hot.


Tim Spann
Cloudera

Principal Developer Advocate, Cloudera. Principal Engineer - Big Data, IoT, Deep Learning, Streaming, Machine Learning, Cloud. https://www.datainmotion.dev/