A Beginner’s Guide to Custom Python Processors in NiFi 2.0

Sudeep Singh
4 min read · Mar 28, 2024

Apache NiFi, a powerful data flow tool, has already proven its robustness across many use cases. The upcoming NiFi 2.0 release adds a new capability: building processors in native Python.

But Why Python in NiFi?
Wider developer base: Until now, custom NiFi processors were written primarily in Java. Earlier releases supported Jython for scripting, but Jython implements only Python 2.7 and cannot load C-extension libraries such as NumPy, so it was far less capable than native Python. Native Python support opens NiFi up to a broader range of data analytics use cases.
Extensive Python libraries: Python has a rich ecosystem of libraries for data manipulation, analysis, and visualization. These libraries can now be used directly inside NiFi processors for complex data processing tasks.

Let’s look at how to create a simple Python processor for NiFi 2.0.

Creating a Python processor in NiFi is straightforward:

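Python version: Use Python 3.9 or later. Python 3.9 through 3.11 is the safest choice, since 3.12 might not work (see the troubleshooting notes below).
Import libraries: Import any Python libraries you need and use them in the processor code. Third-party packages can be listed in the dependencies attribute of the processor’s ProcessorDetails class so that NiFi downloads them when it loads the processor.
Extend FlowFileTransform: Subclass FlowFileTransform (from nifiapi.flowfiletransform), declare the Java interface it implements and its ProcessorDetails, and implement the transform(context, flowFile) method. This method holds the processing logic and returns a FlowFileTransformResult that names the relationship (and, optionally, the new content) for the FlowFile.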
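Put together, these steps produce a processor skeleton like the one below. It is a minimal sketch modeled on the structure of the full example that follows: in the nifiapi package shipped with NiFi 2.0, the base class is FlowFileTransform and the method to implement is transform(context, flowFile). The processor name UppercaseContent, its upper-casing logic, and the FakeFlowFile helper are hypothetical. The try/except fallback defines tiny stand-in classes (an assumption for local testing, not part of NiFi) so the file can also be imported and exercised with plain Python outside NiFi.

```python
# Minimal FlowFileTransform skeleton (hypothetical example processor).
try:
    # Available when the file is loaded by NiFi 2.0
    from nifiapi.flowfiletransform import FlowFileTransform, FlowFileTransformResult
except ImportError:
    # Stand-ins so the module can be tested outside NiFi (assumption, not the NiFi API)
    class FlowFileTransform:
        pass

    class FlowFileTransformResult:
        def __init__(self, relationship, attributes=None, contents=None):
            self.relationship = relationship
            self.attributes = attributes
            self.contents = contents


class UppercaseContent(FlowFileTransform):
    class Java:
        implements = ['org.apache.nifi.python.processor.FlowFileTransform']

    class ProcessorDetails:
        version = '0.0.1'
        description = 'Hypothetical example: upper-cases the FlowFile content.'
        tags = ['example', 'python']

    def __init__(self, **kwargs):
        pass

    def transform(self, context, flowFile):
        # Read the incoming content, transform it, and route it to "success"
        text = flowFile.getContentsAsBytes().decode('utf-8')
        return FlowFileTransformResult(relationship='success',
                                       contents=text.upper().encode('utf-8'))


class FakeFlowFile:
    """Hypothetical test double exposing only getContentsAsBytes()."""

    def __init__(self, data: bytes):
        self.data = data

    def getContentsAsBytes(self) -> bytes:
        return self.data
```

Outside NiFi you can call UppercaseContent().transform(None, FakeFlowFile(b"hello")) and inspect the relationship and contents of the result; inside NiFi the framework supplies the real context and FlowFile.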

Example: a Python processor that runs zgrep on an incoming stream of FlowFiles.

This processor is especially useful when the input stream carries gzip-compressed FlowFiles and we want to filter them by content without adding a separate decompression step to the flow; zgrep decompresses the data on the fly.

```python
import gzip
import subprocess

from nifiapi.flowfiletransform import FlowFileTransform, FlowFileTransformResult
from nifiapi.properties import PropertyDescriptor, StandardValidators

ZGREP_COMMAND = "zgrep"
RG_COMMAND = "rg"


class PythonZgrepProcessor(FlowFileTransform):
    class Java:
        implements = ['org.apache.nifi.python.processor.FlowFileTransform']

    class ProcessorDetails:
        version = '1.0.0'
        description = """Reads an incoming gzip-compressed FlowFile and runs grep on it with the configured pattern. The matching lines are gzip-compressed and returned as the FlowFile content."""
        tags = ["zip", "gz", "zgrep", "grep", "rg", "python", "re"]

    def __init__(self, jvm=None, **kwargs):
        self.jvm = jvm

        self.GREP_COMMAND_TO_USE = PropertyDescriptor(
            name="System grep command to be used",
            description="""Specifies which grep command to use. rg has better performance, but ripgrep (rg) must be installed on the system if selected""",
            allowable_values=[ZGREP_COMMAND, RG_COMMAND],
            required=True,
            default_value=ZGREP_COMMAND
        )

        self.IGNORE_ZERO_BYTE_OUTPUT = PropertyDescriptor(
            name="Ignore Zero Byte Output",
            description="""Specifies whether to skip creating a zero-byte FlowFile when nothing matches the pattern""",
            allowable_values=["Yes", "No"],
            required=True,
            default_value="Yes"
        )

        self.INPUT_STRING_OR_REGEX = PropertyDescriptor(
            name="String or Regex to match",
            description="""Specifies the string or regex to match against the input FlowFiles""",
            required=True,
            validators=[StandardValidators.NON_EMPTY_VALIDATOR]
        )

        self.property_descriptors = [self.GREP_COMMAND_TO_USE, self.IGNORE_ZERO_BYTE_OUTPUT, self.INPUT_STRING_OR_REGEX]

    def onScheduled(self, context):
        # Read the property values once, when the processor is scheduled
        self.grep_command = context.getProperty(self.GREP_COMMAND_TO_USE).getValue()
        self.search_regex = context.getProperty(self.INPUT_STRING_OR_REGEX).getValue()
        self.ignore_zero_byte = context.getProperty(self.IGNORE_ZERO_BYTE_OUTPUT).getValue() == "Yes"

    def getPropertyDescriptors(self):
        return self.property_descriptors

    def transform(self, context, flowFile):
        try:
            # Pipe the FlowFile content to the grep command via stdin ('-');
            # '-e' keeps a pattern that starts with '-' from being parsed as an option
            result = subprocess.run(
                [self.grep_command, '-e', self.search_regex, '-'],
                input=flowFile.getContentsAsBytes(),
                capture_output=True
            )

            # grep convention: exit code 0 = match, 1 = no match, 2 or more = error
            if result.returncode > 1:
                self.logger.error(result.stderr.decode(errors="replace"))
                return FlowFileTransformResult(relationship="failure")

            # Test for matches BEFORE compressing: gzip.compress(b"") is never empty
            if result.stdout:
                return FlowFileTransformResult(relationship="success", contents=gzip.compress(result.stdout))
            if self.ignore_zero_byte:
                return FlowFileTransformResult(relationship="original")
            return FlowFileTransformResult(relationship="success", contents=b"")
        except Exception as e:
            self.logger.error(str(e))
            return FlowFileTransformResult(relationship="failure")
```

The processor lets the user choose between rg and zgrep as the grep command (rg must be installed on the NiFi host beforehand).
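If neither zgrep nor rg is available on the NiFi host, or you want to unit-test the filtering step without NiFi, the same decompress, filter, recompress idea can be sketched in pure Python with the standard gzip and re modules. This is a swapped-in technique, not what the processor above does: it uses Python’s regex syntax (which differs from grep’s basic regular expressions) and holds the whole FlowFile in memory. The function name gzip_grep is hypothetical.

```python
import gzip
import re
from typing import Optional


def gzip_grep(compressed: bytes, pattern: str) -> Optional[bytes]:
    """Return the gzip-compressed matching lines, or None when nothing matches.

    Hypothetical helper that mimics `zgrep -e PATTERN -` on a gzip payload.
    """
    regex = re.compile(pattern.encode("utf-8"))
    lines = gzip.decompress(compressed).splitlines(keepends=True)
    matched = b"".join(line for line in lines if regex.search(line))
    # Check for matches before compressing: gzip.compress(b"") still returns a
    # non-empty gzip header, so testing the compressed bytes would always be truthy.
    if not matched:
        return None
    return gzip.compress(matched)
```

For example, with data = gzip.compress(b"INFO start\nERROR disk full\nINFO done\n"), decompressing gzip_grep(data, r"ERROR") yields b"ERROR disk full\n", while gzip_grep(data, r"WARN") returns None. Returning None rather than an empty gzip stream makes the "no match" case easy to route separately.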

Here are a few troubleshooting tips if you run into issues getting Python working in your NiFi environment:

1. Java 21 is required (any platform).
2. Python 3.9+ is required (any platform); Python 3.12 might NOT work.
3. If several Java versions are installed, make sure Java 21 is the default, e.g. with “sudo update-alternatives --config java”.
4. Make sure JAVA_HOME is set to the path of your Java 21 installation.
5. Make sure Python 3.9+ is the default python3 before starting NiFi, e.g. with “sudo update-alternatives --config python3”.
6. Running “python3 --version” should show the version you set as the default, and it should be between 3.9 and 3.11.
7. To see which Python version NiFi copied, run “./work/python/controller/bin/python3 --version”.
8. If this shows anything other than the version you selected (3.9 through 3.11), delete the work folder, follow the steps above, and try again.
9. If you build a processor from scratch, the Developer Guide says to use this for your __init__:

```python
def __init__(self, **kwargs):
    super().__init__(**kwargs)
```

10. You’ll get an error. Replace super().__init__(**kwargs) with pass, as the example processors that come with the install do.

11. Changes to your Python extensions are not picked up immediately: NiFi polls the extensions directory periodically to detect changes, download dependencies, and load the updated processors. If a previous code update left NiFi in a bad state, you may need to restart NiFi before it detects your changes.

12. ./logs/nifi-python.log will be your friend for Python extension issues.

13. If your Python extension has dependencies and NiFi fails to download them, nifi-python.log shows the command it attempted.

14. The contents returned in a FlowFileTransformResult should be a string or a byte array.
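Items 2 and 6 through 8 boil down to checking which interpreter NiFi will actually use. The script below is a minimal sketch of that check; the 3.9 to 3.11 range is the rule of thumb from this troubleshooting list, not an official NiFi limit, and the helper names are hypothetical. Pass it the interpreter NiFi copied (./work/python/controller/bin/python3) or your default python3; with no argument it checks the interpreter running the script.

```python
import re
import subprocess
import sys
from typing import Tuple

# Range suggested by the troubleshooting list above (assumption: 3.12+ treated as unsupported)
MIN_VERSION = (3, 9)
MAX_VERSION = (3, 11)


def parse_python_version(text: str) -> Tuple[int, ...]:
    """Parse output such as 'Python 3.11.4' into (3, 11, 4)."""
    match = re.search(r"Python (\d+)\.(\d+)\.(\d+)", text)
    if match is None:
        raise ValueError(f"unrecognised version string: {text!r}")
    return tuple(int(part) for part in match.groups())


def is_supported(version: Tuple[int, ...]) -> bool:
    """True if major.minor falls inside MIN_VERSION..MAX_VERSION."""
    return MIN_VERSION <= tuple(version[:2]) <= MAX_VERSION


def interpreter_version(python_path: str) -> Tuple[int, ...]:
    """Run '<python_path> --version' and parse what it prints."""
    completed = subprocess.run([python_path, "--version"],
                               capture_output=True, text=True, check=True)
    # Older interpreters print the version on stderr, newer ones on stdout
    return parse_python_version(completed.stdout + completed.stderr)


if __name__ == "__main__":
    path = sys.argv[1] if len(sys.argv) > 1 else sys.executable
    version = interpreter_version(path)
    status = "OK" if is_supported(version) else "outside the 3.9-3.11 range"
    print(f"{path}: Python {'.'.join(map(str, version))} ({status})")
```

Running it against both the system python3 and NiFi’s copied interpreter quickly shows whether the work folder is stale (item 8).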

Sources: NiFi Python Developer’s Guide; NiFi 2.0 installation issues
