Writing a Generative AI Processor in Python

Tim Spann
Jan 22, 2024


Apache NiFi 2.0.0 — Python 3.x — IBM WatsonX — Vector Database — ChromaDB

It was a really snowy day when I started this. I saw the IBM WatsonX Python SDK and realized I needed to wire up my Gen AI model (LLM) to send my context-augmented prompt from Slack. Why not create a Python processor for Apache NiFi 2.0.0? I figured that wouldn’t be hard, and it was easy!

No time to sleep, let’s build a new model
Snowing on my 12-foot friend

It was really easy. I wrote a simple Python app to call my WatsonX API, wrapped it in the Python processor format, dropped it into the python/extensions folder and restarted NiFi.
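Deployment really is just a file copy followed by a restart. As a sketch, assuming NiFi's default extensions directory under NIFI_HOME and a single-file processor, the copy step looks like this (deploy_processor is a hypothetical helper, not part of NiFi):

```python
import shutil
from pathlib import Path


def deploy_processor(processor_file: str, nifi_home: str) -> Path:
    """Copy a single-file Python processor into <nifi_home>/python/extensions.

    Assumes the default extensions directory; NiFi still has to be restarted
    afterwards to pick up the processor.
    """
    target_dir = Path(nifi_home) / "python" / "extensions"
    # Create the folder if this is the first Python processor being deployed
    target_dir.mkdir(parents=True, exist_ok=True)
    # copy2 copies the file contents plus metadata such as timestamps
    return Path(shutil.copy2(processor_file, target_dir))
```

For example, deploy_processor("CallWatsonXAI.py", "/opt/nifi") would place the file in /opt/nifi/python/extensions, where /opt/nifi stands in for your own NiFi home.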

Let’s pick a model to use. (I should make the model a processor option, but the model list is tied to the SDK, so I have to think about that.)

After picking a model, I tested it in WatsonX’s Prompt Lab. Then I ported it to a simple Python program. Once that worked, I added the processor features, such as properties and the transform method. That’s it.
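The standalone program stage might look roughly like the sketch below. It reuses the endpoint, model and positional Model arguments from the processor code in this post; the environment variable names WATSONX_APIKEY and WATSONX_PROJECT_ID and both helper names are hypothetical. Only load_config runs without the SDK and an IBM Cloud account.

```python
from typing import Mapping

# Regional endpoint used by the processor in this post
WATSONX_URL = "https://us-south.ml.cloud.ibm.com"


def load_config(env: Mapping[str, str]) -> dict:
    """Collect credentials from environment variables (hypothetical names)."""
    missing = [name for name in ("WATSONX_APIKEY", "WATSONX_PROJECT_ID") if not env.get(name)]
    if missing:
        raise ValueError(f"missing environment variables: {', '.join(missing)}")
    return {
        "credentials": {"url": WATSONX_URL, "apikey": env["WATSONX_APIKEY"]},
        "project_id": env["WATSONX_PROJECT_ID"],
    }


def generate(prompt: str, config: dict) -> dict:
    """Send one prompt to Llama 2 70B Chat on WatsonX and return the raw response."""
    # Imported lazily so load_config() works even without the SDK installed
    from ibm_watson_machine_learning.foundation_models import Model
    from ibm_watson_machine_learning.foundation_models.utils.enums import ModelTypes

    # Same positional order as the processor:
    # model_id, credentials, params, project_id, space_id, verify
    model = Model(ModelTypes.LLAMA_2_70B_CHAT, config["credentials"], None,
                  config["project_id"], None, False)
    return model.generate(prompt, None)
```

After exporting the two variables, something like print(generate("What is Apache NiFi?", load_config(os.environ))) exercises the whole call. Once that works, the same body moves into the processor's transform method.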

Now we can drop our new LLM-calling processor into a flow and use it like any other built-in processor.

How to Build a Python Processor for NiFi: An Example

The Python API requires that Python 3.9+ is available on the machine hosting NiFi.

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import json
from nifiapi.flowfiletransform import FlowFileTransform, FlowFileTransformResult
from nifiapi.properties import PropertyDescriptor, StandardValidators, ExpressionLanguageScope

# transform() uses IBM's Watson Machine Learning SDK:
# pip install ibm-watson-machine-learning

class CallWatsonXAI(FlowFileTransform):
    class Java:
        implements = ['org.apache.nifi.python.processor.FlowFileTransform']

    class ProcessorDetails:
        version = '2.0.0-SNAPSHOT'
        description = """Output results of call to WatsonX.AI"""
        tags = ["ibm", "WatsonX", "WatsonXAI", "generativeai", "ai", "artificial intelligence", "ml", "machine learning", "text", "LLM"]

    PROMPT_TEXT = PropertyDescriptor(
        name="Prompt Text",
        description="The text (including the full prompt with context) to send",
        required=True,
        validators=[StandardValidators.NON_EMPTY_VALIDATOR],
        expression_language_scope=ExpressionLanguageScope.FLOWFILE_ATTRIBUTES
    )
    WATSONXAI_API_KEY = PropertyDescriptor(
        name="WatsonX AI API Key",
        description="The API Key used to authenticate with IBM WatsonX",
        sensitive=True,
        required=True,
        validators=[StandardValidators.NON_EMPTY_VALIDATOR],
        expression_language_scope=ExpressionLanguageScope.FLOWFILE_ATTRIBUTES
    )
    WATSONXAI_PROJECT_ID = PropertyDescriptor(
        name="WatsonX AI Project ID",
        description="The Project ID for WatsonX",
        sensitive=True,
        required=True,
        validators=[StandardValidators.NON_EMPTY_VALIDATOR],
        expression_language_scope=ExpressionLanguageScope.FLOWFILE_ATTRIBUTES
    )

    # Declared once at class level. __init__ must not append these again,
    # or every new processor instance would add duplicate descriptors.
    property_descriptors = [
        PROMPT_TEXT,
        WATSONXAI_API_KEY,
        WATSONXAI_PROJECT_ID
    ]

    def __init__(self, **kwargs):
        super().__init__()

    def getPropertyDescriptors(self):
        return self.property_descriptors

    def transform(self, context, flowfile):
        # SDK imports live inside transform() so the module itself only needs nifiapi
        from ibm_watson_machine_learning.foundation_models.utils.enums import ModelTypes
        from ibm_watson_machine_learning.foundation_models import Model

        # Evaluate Expression Language against the incoming flow file's attributes
        prompt_text = context.getProperty(self.PROMPT_TEXT).evaluateAttributeExpressions(flowfile).getValue()
        watsonx_api_key = context.getProperty(self.WATSONXAI_API_KEY).evaluateAttributeExpressions(flowfile).getValue()
        project_id = context.getProperty(self.WATSONXAI_PROJECT_ID).evaluateAttributeExpressions(flowfile).getValue()

        my_credentials = {
            "url": "https://us-south.ml.cloud.ibm.com",
            "apikey": watsonx_api_key
        }

        model_id = ModelTypes.LLAMA_2_70B_CHAT
        gen_parms = None    # use the model's default generation parameters
        space_id = None     # a project is used instead of a deployment space
        verify = False      # skips TLS certificate verification

        model = Model(model_id, my_credentials, gen_parms, project_id, space_id, verify)
        gen_parms_override = None
        generated_response = model.generate(prompt_text, gen_parms_override)

        self.logger.debug(f"Prompt: {prompt_text}")

        # The raw WatsonX response (a dict) becomes the JSON content of the output flow file
        attributes = {"mime.type": "application/json"}
        output_contents = json.dumps(generated_response)

        return FlowFileTransformResult(relationship="success", contents=output_contents, attributes=attributes)

Package-level Dependencies

Add the IBM SDK that transform() imports (ibm-watson-machine-learning) to requirements.txt.

Basic Format for the Python Processor

You need to import the processor classes from the nifiapi library and then set up your class, CallWatsonXAI. The class needs a nested Java class declaring which interface it implements, and a ProcessorDetails class that includes the processor version, its dependencies, a description and some tags.

class ProcessorDetails:
    version = '0.0.1-SNAPSHOT'
    dependencies = ['pandas']

Define All The Properties For the Processor

You need to set up a PropertyDescriptor for each property, including its name, description, whether it is required, validators, expression_language_scope and more.

Transform Main Method

Here we include any imports needed. You can read properties via context.getProperty, set attributes on the output via the attributes dictionary, and set the contents of the output flow file. Finally, we set the relationship, which in this example is always success. You should add something to handle errors; I still need to add that.
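Until real error handling is in place, here is one possible shape for it, as a sketch. It assumes the failure relationship that NiFi gives FlowFileTransform processors is the right destination for failed calls; the helper name call_with_routing and the watsonx.error attribute are hypothetical. Keeping the routing decision in a plain function makes it testable without a running NiFi.

```python
import json
from typing import Callable, Dict, Tuple


def call_with_routing(generate: Callable[[str], dict],
                      prompt: str) -> Tuple[str, str, Dict[str, str]]:
    """Return (relationship, contents, attributes) for one model call."""
    try:
        response = generate(prompt)
    except Exception as exc:  # any SDK, auth or network error goes to failure
        # Keep the prompt as content so the flow file can be retried or inspected;
        # "watsonx.error" is a hypothetical attribute name
        return "failure", prompt, {"watsonx.error": f"{type(exc).__name__}: {exc}"}
    return "success", json.dumps(response), {"mime.type": "application/json"}


# Inside transform() this could be used as:
#   relationship, contents, attributes = call_with_routing(
#       lambda p: model.generate(p, None), prompt_text)
#   return FlowFileTransformResult(relationship=relationship,
#                                  contents=contents, attributes=attributes)
```

Routing to failure instead of raising lets the flow decide what to do next, for example retrying with RetryFlowFile as the Kafka path already does.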

If You Need to Redeploy, Debug or Fix Something

While you may delete the entire work directory while NiFi is stopped, doing so may result in NiFi taking significantly longer to start up the next time, as it must source all extensions' dependencies from PyPI, as well as expand all Java extensions' NAR files.

See:

Example Application

Building on our previous application that receives Slack messages, we take those Slack queries, run them against the Pinecone or Chroma vector database, and send the returned context along with our call to IBM’s WatsonX AI REST API for generative AI (LLM).

You can find those previous details here:

Pinecone Vector Database Free Tier

Generative AI Flow Path

NiFi Flow

  1. ListenHTTP: listen on port 9518 at /slack. NiFi as a universal REST endpoint
  2. QueryRecord: JSON cleanup
  3. SplitJson: $.*
  4. EvaluateJsonPath: output attribute for $.inputs
  5. QueryChroma: call the Chroma server on port 9776 using the ONNX embedding model, returning 25 rows
  6. QueryRecord: JSON -> JSON, LIMIT 1
  7. SplitRecord: JSON -> JSON, into 1 row
  8. EvaluateJsonPath: export the context from $.document
  9. ReplaceText: make the context the new flow file content
  10. UpdateAttribute: update inputs
  11. CallWatsonXAI: our Python processor to call IBM
  12. SplitRecord: 1 record, JSON -> JSON
  13. EvaluateJsonPath: add attributes
  14. AttributesToJSON: make a new flow file from attributes
  15. QueryRecord: validate JSON
  16. UpdateRecord: add generated text, inputs, ts, uuid
  17. Kafka path: PublishKafkaRecord_2_6, send results to Kafka
  18. Kafka path: RetryFlowFile, if the Apache Kafka send fails, try again
  19. Slack path: SplitRecord, split into 1 record for display
  20. Slack path: EvaluateJsonPath, pull out fields to display
  21. Slack path: PutSlack, send a formatted message to the #chat channel
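Steps 3 and 4 are easy to reason about outside NiFi. The sketch below mimics them in plain Python: split the cleaned JSON with $.* and read $.inputs from each element. It assumes the cleanup QueryRecord emits a JSON array of objects; extract_inputs is a hypothetical helper, not part of NiFi.

```python
import json
from typing import List


def extract_inputs(cleaned_json: str) -> List[str]:
    """Mimic steps 3-4: split with $.* and read $.inputs from each part."""
    doc = json.loads(cleaned_json)
    # $.* selects every element of an array, or every value of an object
    parts = doc if isinstance(doc, list) else list(doc.values())
    # Elements without an "inputs" field are skipped in this sketch
    return [part["inputs"] for part in parts if isinstance(part, dict) and "inputs" in part]
```

Each returned string corresponds to one split flow file carrying an inputs attribute.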

Slack Message Template

==== Python NiFi 2 to IBM WatsonX.AI LLM

On Date: ${date} Created: ${created_at}

Prompt: ${inputs}

Response: ${generated_text}

Token: ${generated_token}
IBM Msg: ${message}
IBM Msg ID: ${message_id}
Model ID: ${model_id}
Stop Reason: ${stop_reason}
Token Count: ${tokencount}
UUID: ${uuid}
File Name: ${filename}
=====
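To preview the message without posting to Slack, Python's string.Template can render a subset of this template, because it uses the same ${name} placeholder syntax for plain attribute references (it does not understand Expression Language functions such as :trim()). This is a sketch; render_slack_message and the sample attribute values are illustrative.

```python
from collections import defaultdict
from string import Template

# A subset of the Slack message template above
SLACK_TEMPLATE = """==== Python NiFi 2 to IBM WatsonX.AI LLM
Prompt: ${inputs}
Response: ${generated_text}
Model ID: ${model_id}
Stop Reason: ${stop_reason}
====="""


def render_slack_message(attributes: dict) -> str:
    # defaultdict(str) renders missing attributes as empty strings,
    # matching how NiFi evaluates references to attributes that do not exist
    return Template(SLACK_TEMPLATE).substitute(defaultdict(str, attributes))
```

For example, render_slack_message({"inputs": "What is NiFi?"}) fills the Prompt line and leaves the other fields blank.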

UpdateRecord

${generated_text:urlDecode():trim()}

${inputs:replaceAll('\\\\n',''):replaceAll('Write a short summary.',''):replaceAll('Transcript:',''):replaceAll('\n',''):trim()}
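For testing the cleanup logic outside NiFi, these two functions approximate the expressions above. They are a sketch under stated assumptions: unquote_plus is assumed to match urlDecode (Java's URLDecoder also turns + into a space), and Python's strip() is close to, but not identical to, Java's trim(). The function names are hypothetical.

```python
import re
from urllib.parse import unquote_plus


def clean_generated_text(value: str) -> str:
    """Approximates ${generated_text:urlDecode():trim()}."""
    # unquote_plus decodes %XX escapes and turns "+" into a space
    return unquote_plus(value).strip()


def clean_inputs(value: str) -> str:
    """Approximates the replaceAll(...) chain applied to ${inputs}, in order."""
    value = re.sub(r"\\n", "", value)                     # literal backslash + n sequences
    value = re.sub(r"Write a short summary.", "", value)  # regex, so "." matches any character
    value = re.sub(r"Transcript:", "", value)
    value = value.replace("\n", "")                       # real newline characters
    return value.strip()
```

Running a few captured prompts through clean_inputs is a quick way to confirm the phrases you strip are the ones your prompt template actually adds.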

Attributes to JSON

transaction-id, date, cf-ray, created_at, generated_text, generated_token, inputs, invokehttp.request.duration, message, message_id, model_id, stop_reason, tokencount, uuid, x-correlation-id, x-global-transaction-id, x-proxy-upstream-service-time, x-request-id
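AttributesToJSON builds the new flow file content from the attributes listed above. A rough Python equivalent, handy for checking what a given set of attributes produces, is sketched here. It simply skips absent attributes, whereas the NiFi processor has its own setting for how missing attributes are written; attributes_to_json is a hypothetical name.

```python
import json
from typing import Dict, Iterable

# The attribute list configured on AttributesToJSON above
ATTRIBUTE_LIST = (
    "transaction-id, date, cf-ray, created_at, generated_text, generated_token, "
    "inputs, invokehttp.request.duration, message, message_id, model_id, "
    "stop_reason, tokencount, uuid, x-correlation-id, x-global-transaction-id, "
    "x-proxy-upstream-service-time, x-request-id"
).split(", ")


def attributes_to_json(attributes: Dict[str, str],
                       names: Iterable[str] = ATTRIBUTE_LIST) -> str:
    """Serialize only the listed attributes that are present."""
    selected = {name: attributes[name] for name in names if name in attributes}
    return json.dumps(selected)
```

Attributes outside the list, such as filename, are left out of the JSON.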

Just One Vector Record

SELECT * FROM FLOWFILE
LIMIT 1
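Together with the SplitRecord and EvaluateJsonPath steps, this query reduces the Chroma results to a single context string. A plain-Python sketch of that reduction follows. It assumes QueryChroma emits a JSON array whose first element is the best match and that each element carries a document field (the $.document path from step 8); top_context is a hypothetical name.

```python
import json
from typing import Optional


def top_context(records_json: str) -> Optional[str]:
    """Keep the first record (LIMIT 1) and return its "document" field."""
    records = json.loads(records_json)
    if not records:
        return None  # nothing matched in the vector database
    return records[0].get("document")
```

Returning None for an empty result makes the "no context found" case explicit instead of sending an empty prompt.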

Input Prompt

${inputs:prepend('\n\n\nInput: ')}

Build Prompt

{
  "inputs": {
    "question": "${question}",
    "context": "${context}"
  }
}
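One caveat with this template: if ${question} or ${context} contains a double quote or a newline, the substituted text is no longer valid JSON. In NiFi, the escapeJson() Expression Language function (for example ${context:escapeJson()}) addresses this; in Python, letting json.dumps build the document has the same effect. A sketch, with build_prompt as a hypothetical helper:

```python
import json


def build_prompt(question: str, context: str) -> str:
    """Build the {"inputs": {"question", "context"}} document with proper escaping."""
    # json.dumps escapes quotes, backslashes and newlines in both values
    return json.dumps({"inputs": {"question": question, "context": context}})
```

Retrieved documents often contain quotes and line breaks, so escaping matters more for the context field than for the question.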

Example Output

{
  "model_id" : "meta-llama/llama-2-70b-chat",
  "created_at" : "2024-01-19T22:54:04.413Z",
  "results" : [ {
    "generated_text" : "\n\nApache NiFi is a powerful tool for data integration and processing, and it can be",
    "generated_token_count" : 20,
    "input_token_count" : 1193,
    "stop_reason" : "max_tokens"
  } ],
  "system" : {
    "warnings" : [ {
      "message" : "This model is a Non-IBM Product governed by a third-party license that may impose use restrictions and other obligations. By using this model you agree to its terms as identified in the following URL. URL: https://dataplatform.cloud.ibm.com/docs/content/wsj/analyze-data/fm-models.html?context=wx",
      "id" : "disclaimer_warning"
    } ]
  }
}
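The Slack template reads flattened attributes such as generated_text, stop_reason and tokencount. The sketch below pulls those values out of a response shaped like the output above. Which JSON field feeds which attribute (for example tokencount from generated_token_count, message_id from the warning id) is an assumption, since the exact EvaluateJsonPath mappings are not shown; flatten_response is a hypothetical name.

```python
import json
from typing import Dict


def flatten_response(response_json: str) -> Dict[str, str]:
    """Map a WatsonX generate() response onto the Slack template's attributes."""
    response = json.loads(response_json)
    result = response["results"][0]  # one prompt in, one result out
    attrs = {
        "model_id": response.get("model_id", ""),
        "created_at": response.get("created_at", ""),
        "generated_text": result.get("generated_text", ""),
        "tokencount": str(result.get("generated_token_count", "")),
        "stop_reason": result.get("stop_reason", ""),
    }
    warnings = response.get("system", {}).get("warnings", [])
    if warnings:
        attrs["message"] = warnings[0].get("message", "")
        attrs["message_id"] = warnings[0].get("id", "")
    return attrs
```

A stop_reason of max_tokens, as in the output above, means the text was cut off at the token limit rather than finishing naturally.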

Source Code

NiFi Example Flow

CHROMA DB

http://IP:9776/api/v1/collections/nifi?tenant=default_tenant&database=default_database

{"name":"nifi","id":"5d4d0324-46cc-4002-91f7-7d2c1a0685e0","metadata":{"hnsw:space":"cosine"},"tenant":"default_tenant","database":"default_database"}

http://IP:9776/docs#/default/count

http://192.168.1.160:9776/openapi.json
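The collection lookup URL above can also be built and called from Python with only the standard library. In this sketch, collection_url reproduces the URL shape shown (port 9776, default tenant and database), and fetch_collection is a hypothetical helper that needs a reachable Chroma server; replace IP with your own host.

```python
import json
from urllib.parse import urlencode


def collection_url(host: str, name: str, port: int = 9776,
                   tenant: str = "default_tenant",
                   database: str = "default_database") -> str:
    """Build the v1 collection lookup URL used above."""
    query = urlencode({"tenant": tenant, "database": database})
    return f"http://{host}:{port}/api/v1/collections/{name}?{query}"


def fetch_collection(host: str, name: str) -> dict:
    # Network call: requires a running Chroma server at host:9776
    from urllib.request import urlopen
    with urlopen(collection_url(host, name), timeout=10) as resp:
        return json.load(resp)
```

Against the server above, fetch_collection should return a document like the nifi collection JSON shown, including its id and hnsw:space metadata.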

RESOURCES

https://www.datacamp.com/tutorial/chromadb-tutorial-step-by-step-guide


Tim Spann

Principal Developer Advocate, Zilliz. Milvus, Attu, Towhee, GenAI, Big Data, IoT, Deep Learning, Streaming, Machine Learning. https://www.datainmotion.dev/