Apache NiFi 2.0.0 — Python 3.x — IBM WatsonX — Vector Database — ChromaDB
It was a really snowy day when I started this. I saw the IBM WatsonX Python SDK and realized I needed to wire up my Gen AI Model (LLM) to send my context-augmented prompt from Slack. Why not create a Python Processor for Apache NiFi 2.0.0? I guess that won’t be hard. It was easy!
It was really easy: I just wrote a simple Python app to call my WatsonX API, wrapped it in the Python processor format, dropped it in the python/extensions folder, and restarted NiFi.
Let’s pick a model to use (I should add that as an option, but it’s tied to the SDK, so I have to think about that).
After picking a model, I tested it in WatsonX’s Prompt Lab. Then I ported it to a simple Python program. Once that worked, I started adding features such as properties and the transform method. That’s it.
Now we can drop our new LLM calling processor into a flow and use it as any other built-in processor.
How to Build a Python Processor for NiFi, example:
The Python API requires that Python 3.9+ is available on the machine hosting NiFi.
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
from nifiapi.flowfiletransform import FlowFileTransform, FlowFileTransformResult
from nifiapi.properties import PropertyDescriptor, StandardValidators, ExpressionLanguageScope

# Model and ModelTypes (imported in transform) come from the IBM Watson Machine Learning SDK:
# pip install ibm-watson-machine-learning


class CallWatsonXAI(FlowFileTransform):
    class Java:
        implements = ['org.apache.nifi.python.processor.FlowFileTransform']

    class ProcessorDetails:
        version = '2.0.0-SNAPSHOT'
        description = """Output results of call to WatsonX.AI"""
        tags = ["ibm", "WatsonX", "WatsonXAI", "generativeai", "ai", "artificial intelligence", "ml", "machine learning", "text", "LLM"]

    PROMPT_TEXT = PropertyDescriptor(
        name="Prompt Text",
        description="The text to send, including the full prompt with context",
        required=True,
        validators=[StandardValidators.NON_EMPTY_VALIDATOR],
        expression_language_scope=ExpressionLanguageScope.FLOWFILE_ATTRIBUTES
    )
    WATSONXAI_API_KEY = PropertyDescriptor(
        name="WatsonX AI API Key",
        description="The API Key used to authenticate with IBM WatsonX",
        sensitive=True,
        required=True,
        validators=[StandardValidators.NON_EMPTY_VALIDATOR],
        expression_language_scope=ExpressionLanguageScope.FLOWFILE_ATTRIBUTES
    )
    WATSONXAI_PROJECT_ID = PropertyDescriptor(
        name="WatsonX AI Project ID",
        description="The Project ID for WatsonX",
        sensitive=True,
        required=True,
        validators=[StandardValidators.NON_EMPTY_VALIDATOR],
        expression_language_scope=ExpressionLanguageScope.FLOWFILE_ATTRIBUTES
    )

    def __init__(self, **kwargs):
        super().__init__()
        # Each instance gets its own list, with every property listed exactly once
        self.property_descriptors = [
            self.PROMPT_TEXT,
            self.WATSONXAI_API_KEY,
            self.WATSONXAI_PROJECT_ID
        ]

    def getPropertyDescriptors(self):
        return self.property_descriptors

    def transform(self, context, flowfile):
        # SDK imports live inside transform(); Python caches them after the first call
        from ibm_watson_machine_learning.foundation_models.utils.enums import ModelTypes
        from ibm_watson_machine_learning.foundation_models import Model

        # Evaluate Expression Language against the incoming FlowFile's attributes
        prompt_text = context.getProperty(self.PROMPT_TEXT).evaluateAttributeExpressions(flowfile).getValue()
        watsonx_api_key = context.getProperty(self.WATSONXAI_API_KEY).evaluateAttributeExpressions(flowfile).getValue()
        project_id = context.getProperty(self.WATSONXAI_PROJECT_ID).evaluateAttributeExpressions(flowfile).getValue()

        my_credentials = {
            "url": "https://us-south.ml.cloud.ibm.com",
            "apikey": watsonx_api_key
        }
        model_id = ModelTypes.LLAMA_2_70B_CHAT
        gen_parms = None   # use the model's default generation parameters
        space_id = None    # calls are scoped to a project, not a deployment space
        verify = False     # False disables TLS certificate verification
        model = Model(model_id, my_credentials, gen_parms, project_id, space_id, verify)

        self.logger.debug(f"Prompt: {prompt_text}")
        gen_parms_override = None  # no per-call parameter overrides
        generated_response = model.generate(prompt_text, gen_parms_override)

        # The full JSON response becomes the new FlowFile content
        attributes = {"mime.type": "application/json"}
        output_contents = json.dumps(generated_response)
        return FlowFileTransformResult(relationship="success", contents=output_contents, attributes=attributes)
Package-level Dependencies
Add to requirements.txt
Basic Format for the Python Processor
You need to import various things from the nifiapi library. You then set up your class, CallWatsonXAI. You need to include a class Java definition and a ProcessorDetails class that holds the processor version, any dependencies, a description, and some tags.
class ProcessorDetails:
    version = '0.0.1-SNAPSHOT'
    dependencies = ['pandas']
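As a sketch, the WatsonX processor could declare its SDK in `ProcessorDetails` the same way, instead of relying only on requirements.txt. This assumes the PyPI package name `ibm-watson-machine-learning` and that NiFi installs the listed packages for this processor as it does for the `pandas` example above; the shortened tag list is illustrative.

```python
class ProcessorDetails:
    # Processor version as a plain string (a trailing comma would turn it into a tuple)
    version = '2.0.0-SNAPSHOT'
    description = "Output results of call to WatsonX.AI"
    tags = ["ibm", "WatsonX", "LLM"]
    # PyPI packages for NiFi to install for this processor (assumed package name)
    dependencies = ['ibm-watson-machine-learning']
```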
Define All The Properties For the Processor
You need to set up a PropertyDescriptor for each property, including things like a name, description, required, validators, expression_language_scope and more.
Transform Main Method
Here we include any imports needed. You can access properties via context.getProperty. You can then set attributes for the output, as shown via attributes, and set the contents of the output FlowFile. Finally, we set the relationship, which in this guide is always success. You should add something to handle errors. I need to add that.
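One way to add that error handling is to route problems to the `failure` relationship that NiFi's FlowFileTransform processors offer alongside `success`. The sketch below leaves out the NiFi and IBM SDK calls so it runs anywhere: `generate` stands in for `model.generate`, and the `watsonx.error` attribute name is hypothetical. Inside `transform()`, the returned values would be passed to `FlowFileTransformResult(relationship=..., contents=..., attributes=...)`.

```python
import json


def build_result(prompt_text, generate):
    """Call generate(prompt_text) and pick the relationship to route to.

    Returns a (relationship, contents, attributes) tuple.
    """
    # The validator checks the configured value, but Expression Language
    # can still evaluate to an empty string at runtime
    if not prompt_text or not prompt_text.strip():
        return "failure", prompt_text or "", {"watsonx.error": "empty prompt"}
    try:
        response = generate(prompt_text)
    except Exception as exc:  # network, authentication, or quota errors from the SDK
        return "failure", prompt_text, {"watsonx.error": str(exc)}
    return "success", json.dumps(response), {"mime.type": "application/json"}
```

Keeping the original prompt as the failure content makes it easy to inspect or replay the request from the failure queue.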
If you need to redeploy, debug, or fix something:
While you may delete the entire work directory while NiFi is stopped, doing so may result in NiFi taking significantly longer to start up the next time, as it must source all extensions' dependencies from PyPI, as well as expand all Java extensions' NAR files.
See:
Example Application
Building on our previous application that receives Slack messages, we take those Slack queries, send them to the Pinecone or Chroma vector database, and send the returned context along with our call to IBM’s WatsonX AI REST API for Generative AI (LLM).
You can find those previous details here:
Pinecone Vector Database Free Tier
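The retrieval-augmented step boils down to taking the Slack question, taking the top document(s) returned by the vector database, and combining them into one prompt. A minimal sketch, assuming the documents arrive as a list of strings; the `Context:`/`Question:`/`Answer:` layout is a hypothetical prompt format, not the exact text this flow sends. The default `max_docs=1` mirrors the `LIMIT 1` QueryRecord step in the flow.

```python
def build_prompt(question, documents, max_docs=1):
    """Combine the top retrieved documents with the user's question into one prompt."""
    # Keep only the first max_docs non-blank documents
    selected = [doc.strip() for doc in documents if doc and doc.strip()][:max_docs]
    context = "\n".join(selected)
    return f"Context:\n{context}\n\nQuestion: {question.strip()}\nAnswer:"
```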
Generative AI Flow Path
NiFi Flow
- ListenHTTP: On port 9518, path /slack. NiFi is a universal REST endpoint
- QueryRecord: JSON cleanup
- SplitJSON: $.*
- EvalJSONPath: output attribute for $.inputs
- QueryChroma: Call the Chroma server on port 9776 using the ONNX model; return 25 rows
- QueryRecord: JSON->JSON. Limit 1
- SplitRecord: JSON->JSON. Into 1 row.
- EvalJSONPath: Export the context from $.document
- ReplaceText: Make context the new Flow File
- UpdateAttribute: Update inputs.
- CallWatsonX: Our Python processor to call IBM
- SplitRecord: 1 Record, JSON -> JSON.
- EvalJSONPath: Add attributes
- AttributesToJSON: Make new flow file from attributes
- QueryRecord: Validate JSON.
- UpdateRecord: Add generated text, inputs, ts, uuid.
- Kafka Path: PublishKafkaRecord_2_6 — send results to Kafka
- Kafka Path: RetryFlowFile — if Apache Kafka send fails try again.
- Slack Path: SplitRecord — split into 1 record for display
- Slack Path: EvaluateJSONPath — pull out fields to display
- Slack Path: PutSlack — send formatted message to #chat group
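The Kafka path's RetryFlowFile step follows a bounded-retry pattern: try the send again a limited number of times, then give up and route elsewhere. The sketch below shows that pattern in plain Python. `publish` is a hypothetical stand-in for the Kafka send, and the sketch does not model RetryFlowFile's own configuration (for example, how it tracks the retry count on the FlowFile).

```python
import time


def publish_with_retry(publish, record, max_retries=3, delay_seconds=0.0):
    """Call publish(record); on an exception, retry up to max_retries more times.

    Returns (relationship, attempts): ("success", n) after the first successful
    call, or ("failure", n) once every retry has failed.
    """
    attempts = 0
    while True:
        attempts += 1
        try:
            publish(record)
            return "success", attempts
        except Exception:
            if attempts > max_retries:
                return "failure", attempts
            time.sleep(delay_seconds)  # wait before the next attempt
```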
Slack Message Template
==== Python NiFi 2 to IBM WatsonX.AI LLM
On Date: ${date} Created: ${created_at}
Prompt: ${inputs}
Response: ${generated_text}
Token: ${generated_token}
IBM Msg: ${message}
IBM Msg ID: ${message_id}
Model ID: ${model_id}
Stop Reason: ${stop_reason}
Token Count: ${tokencount}
UUID: ${uuid}
File Name: ${filename}
=====
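NiFi fills this template with Expression Language, replacing each `${name}` with the FlowFile attribute of that name. Python's `string.Template` uses the same `${name}` syntax for simple names, so a rough local preview can be rendered as below. This uses an abbreviated copy of the template and assumes missing attributes should render as empty strings, which is how NiFi typically treats a reference to an attribute that does not exist.

```python
from collections import defaultdict
from string import Template

# Abbreviated copy of the Slack message template above
SLACK_TEMPLATE = Template(
    "==== Python NiFi 2 to IBM WatsonX.AI LLM\n"
    "Prompt: ${inputs}\n"
    "Response: ${generated_text}\n"
    "Model ID: ${model_id}\n"
    "Stop Reason: ${stop_reason}\n"
    "====="
)


def render_slack_message(attributes):
    """Substitute FlowFile-style attributes into the template; missing ones become ''."""
    return SLACK_TEMPLATE.substitute(defaultdict(str, attributes))
```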
UpdateRecord
${generated_text:urlDecode():trim()}
${inputs:replaceAll('\\\\n',''):replaceAll('Write a short summary.',''):replaceAll('Transcript:',''):replaceAll('\n',''):trim()}
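To try these cleanup expressions outside NiFi, here is a rough Python equivalent. It assumes that `urlDecode()` behaves like Java's `URLDecoder` (so `+` becomes a space, matched here by `unquote_plus`), that the EL literal `'\\\\n'` reaches the regex engine as `\\n` (a literal backslash followed by `n`), and that `'\n'` is a real newline. As with `replaceAll`, every pattern is a regular expression.

```python
import re
from urllib.parse import unquote_plus


def clean_generated_text(text):
    """Approximate ${generated_text:urlDecode():trim()}: URL-decode, then trim whitespace."""
    return unquote_plus(text).strip()


def clean_inputs(text):
    """Approximate the replaceAll(...) chain applied to ${inputs}."""
    text = re.sub(r"\\n", "", text)                     # literal backslash followed by n
    text = re.sub(r"Write a short summary.", "", text)  # regex, so '.' matches any character
    text = re.sub(r"Transcript:", "", text)
    text = re.sub(r"\n", "", text)                      # actual newline characters
    return text.strip()
```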
Attributes to JSON
transaction-id, date, cf-ray, created_at, generated_text, generated_token, inputs, invokehttp.request.duration, message, message_id, model_id, stop_reason, tokencount, uuid, x-correlation-id, x-global-transaction-id, x-proxy-upstream-service-time, x-request-id
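AttributesToJSON turns the listed attributes into a flat JSON object that becomes the new FlowFile content. A sketch of the same idea using a shortened version of the attribute list above. It writes missing attributes as empty strings; AttributesToJSON itself can emit either empty strings or JSON nulls for those, depending on how its Null Value property is set.

```python
import json

# Shortened copy of the attribute list configured on AttributesToJSON
SELECTED_ATTRIBUTES = ["date", "created_at", "generated_text", "inputs",
                       "model_id", "stop_reason", "tokencount", "uuid"]


def attributes_to_json(attributes, names=SELECTED_ATTRIBUTES):
    """Return a JSON object containing only the selected attributes, in list order."""
    return json.dumps({name: attributes.get(name, "") for name in names})
```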
Just One Vector Record
SELECT * FROM FLOWFILE
LIMIT 1
Input Prompt
${inputs:prepend('\n\n\nInput: ')}
Build Prompt
{ "inputs": {
"question": "${question}",
"context": "${context}"
}
}
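Because `${question}` and `${context}` are pasted straight into the JSON text, a question or document containing a double quote, backslash, or newline would produce invalid JSON. Inside NiFi, Expression Language's `escapeJson()` function (for example `${question:escapeJson()}`) addresses that. The Python sketch below builds the same payload with `json.dumps`, which handles the escaping automatically.

```python
import json


def build_prompt_payload(question, context):
    """Build the {"inputs": {"question": ..., "context": ...}} document with proper escaping."""
    return json.dumps({"inputs": {"question": question, "context": context}}, indent=2)
```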
Example Output
{
"model_id" : "meta-llama/llama-2-70b-chat",
"created_at" : "2024-01-19T22:54:04.413Z",
"results" : [ {
"generated_text" : "\n\nApache NiFi is a powerful tool for data integration and processing, and it can be",
"generated_token_count" : 20,
"input_token_count" : 1193,
"stop_reason" : "max_tokens"
} ],
"system" : {
"warnings" : [ {
"message" : "This model is a Non-IBM Product governed by a third-party license that may impose use restrictions and other obligations. By using this model you agree to its terms as identified in the following URL. URL: https://dataplatform.cloud.ibm.com/docs/content/wsj/analyze-data/fm-models.html?context=wx",
"id" : "disclaimer_warning"
} ]
}
}
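The EvalJSONPath steps after CallWatsonX pull individual fields out of this response into attributes for the Slack message. Here is a Python sketch of that extraction against the structure shown above. The mapping of JSON fields to attribute names such as `tokencount` and `message_id` is an assumption based on the Slack template, not the flow's exact JSONPath configuration. Values are returned as strings because FlowFile attributes are strings.

```python
import json


def flatten_watsonx_response(body):
    """Extract the fields the Slack message uses from a WatsonX generate() response."""
    data = json.loads(body)
    first = (data.get("results") or [{}])[0]                # first generation result, if any
    warnings = data.get("system", {}).get("warnings") or [{}]  # first system warning, if any
    return {
        "model_id": data.get("model_id", ""),
        "created_at": data.get("created_at", ""),
        "generated_text": first.get("generated_text", "").strip(),
        "tokencount": str(first.get("generated_token_count", "")),
        "stop_reason": first.get("stop_reason", ""),
        "message": warnings[0].get("message", ""),
        "message_id": warnings[0].get("id", ""),
    }
```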
Source Code
NiFi Example Flow
CHROMA DB
http://IP:9776/api/v1/collections/nifi?tenant=default_tenant&database=default_database
{"name":"nifi","id":"5d4d0324-46cc-4002-91f7-7d2c1a0685e0","metadata":{"hnsw:space":"cosine"},"tenant":"default_tenant","database":"default_database"}
http://IP:9776/docs#/default/count
http://192.168.1.160:9776/openapi.json
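The collection lookup above can be scripted with only the standard library. A minimal sketch, assuming a Chroma server that exposes the same `/api/v1` REST paths shown above (newer Chroma releases may use a different API version). `IP` is a placeholder for your server's address, and only the URL builder works without a live server.

```python
import json
from urllib.parse import quote, urlencode
from urllib.request import urlopen


def collection_url(host, port, name, tenant="default_tenant", database="default_database"):
    """Build the Chroma v1 'get collection' URL used above."""
    query = urlencode({"tenant": tenant, "database": database})
    return f"http://{host}:{port}/api/v1/collections/{quote(name)}?{query}"


def get_collection(host, port, name, timeout=10):
    """GET the collection metadata as a dict (requires a reachable Chroma server)."""
    with urlopen(collection_url(host, port, name), timeout=timeout) as resp:
        return json.loads(resp.read().decode("utf-8"))
```

Against a running server, `get_collection("IP", 9776, "nifi")` should return a dict like the collection JSON above, including its `id` and `metadata`.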
RESOURCES
https://www.datacamp.com/tutorial/chromadb-tutorial-step-by-step-guide