NIFI Custom Processor Series Part 1: Writing your first custom processor

Mr Sinchan Banerjee
7 min read · Mar 13, 2023

In this series, we will discuss how you can write a new Apache NIFI custom processor from scratch. Over this seven (7) part series, you will learn the various nuances of writing NIFI custom processors. To follow these discussions, you need basic knowledge of, and some working experience with, Apache NIFI and Java.

NOTE: If you are new to Apache NIFI, I suggest you check the official Apache NIFI documentation. If you want a concise discussion covering the basic concepts of Apache NIFI, you can refer to Chapter 8 of my book, Scalable Data Architecture with Java.

Since Apache NIFI is a well-written Java product that uses the Spring framework, any custom component, such as a Processor or a ControllerService, has to be written in Java. These Processors and ControllerServices are bundled into a NAR (NiFi ARchive) file, which is deployed in NIFI.

In this discussion, we will build a simple NIFI processor that takes two properties, Salutation and BeforeOrAfter, and produces a flowfile with the salutation added to its content. For example, suppose the content of the incoming flowfile is Sinchan, the Salutation property is set to Good Morning, and the BeforeOrAfter value is B. The output flowfile should then have the content Good Morning Sinchan.
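
The intended behaviour can be pinned down in plain Java before any NIFI API is involved. The sketch below is only an illustration: the names SalutationSketch and applySalutation are hypothetical and are not part of NIFI or of the processor built in the following steps. Rejecting an unknown flag with an exception is also an assumption made for this sketch.

```java
// Illustrative only: SalutationSketch and applySalutation are hypothetical names,
// not part of NiFi or of the processor developed in this article.
final class SalutationSketch {

    private SalutationSketch() {
    }

    // Returns the content with the salutation prepended ("B") or appended ("A").
    // The flag is compared case-insensitively; any other value is rejected.
    static String applySalutation(String content, String salutation, String beforeOrAfter) {
        if ("B".equalsIgnoreCase(beforeOrAfter)) {
            return salutation + " " + content;
        }
        if ("A".equalsIgnoreCase(beforeOrAfter)) {
            return content + " " + salutation;
        }
        throw new IllegalArgumentException("BeforeOrAfter must be B or A, got: " + beforeOrAfter);
    }

    public static void main(String[] args) {
        System.out.println(applySalutation("Sinchan", "Good Morning", "B"));
        System.out.println(applySalutation("Sinchan", "Good Morning", "A"));
    }
}
```

Keeping the string logic separate like this makes it easy to check without a running NIFI instance.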

Step 1: Set up the project

From your favorite Java editor (preferably JetBrains' IntelliJ IDEA), create a new Maven project. First, we will add a parent project to automatically enable nar generation for our custom processor:

    <parent>
        <groupId>org.apache.nifi</groupId>
        <artifactId>nifi-nar-bundles</artifactId>
        <version>1.16.0</version>
    </parent>

Next, we will add a new property in the properties block of the pom.xml file

    <properties>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
        <nifi.version>1.16.0</nifi.version>
    </properties>

Then add the following NIFI dependencies in the pom.xml file.

    <!-- https://mvnrepository.com/artifact/org.apache.nifi/nifi-api -->
    <dependency>
        <groupId>org.apache.nifi</groupId>
        <artifactId>nifi-api</artifactId>
        <version>${nifi.version}</version>
        <scope>provided</scope>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.nifi/nifi-utils -->
    <dependency>
        <groupId>org.apache.nifi</groupId>
        <artifactId>nifi-utils</artifactId>
        <version>${nifi.version}</version>
    </dependency>

Step 2: Develop the Custom Processor

To create a custom processor, we must implement a concrete subclass of the class org.apache.nifi.processor.AbstractProcessor. Since this NIFI library class is abstract, we must implement the onTrigger() method in our class. This is shown below:

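    import org.apache.nifi.processor.AbstractProcessor;
    import org.apache.nifi.processor.ProcessContext;
    import org.apache.nifi.processor.ProcessSession;
    import org.apache.nifi.processor.exception.ProcessException;

    public class SalutationProcessor extends AbstractProcessor {
        @Override
        public void onTrigger(ProcessContext processContext, ProcessSession processSession) throws ProcessException {
            // Processing logic is added in the following steps
        }
    }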

The onTrigger() method is called whenever a flowfile is available in the input queue of the NIFI processor while the processor is running. It should contain the logic or functionality that we intend to apply to the incoming flowfile(s). For the Salutation processor, the logic depends on the processor properties. So, first, let us add code to define and declare those properties. Properties can be defined in a Processor class using the PropertyDescriptor API. The following code snippet shows how we can define the two required processor properties in the code.

    public static final PropertyDescriptor SALUTATION = new PropertyDescriptor.Builder()
            .name("Salutation")
            .displayName("Salutation")
            .description("The salutation that you want to prepend or append to the name coming in the flowfile")
            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
            .required(true)
            .build();

    public static final PropertyDescriptor BEFORE_OR_AFTER = new PropertyDescriptor.Builder()
            .name("BeforeOrAfter")
            .displayName("Insert Before Or After")
            .description("Whether you want to prepend or append the salutation. To prepend, use B; to append, use A")
            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
            .required(true)
            .build();

Let us understand the above code snippet. Each processor property is defined using a PropertyDescriptor object. Here, we create the SALUTATION object and build its definition using the PropertyDescriptor.Builder() API. The builder calls used here are:

  1. Name: Name of the property Descriptor
  2. DisplayName: Name of the property that is displayed in NIFI UI
  3. Description: The description that is displayed when you hover over the question mark (?) adjacent to the Property Name in NIFI UI
  4. AddValidator: Adds validation logic to the property. There are many StandardValidators, such as the NON_EMPTY_EL_VALIDATOR used here, which checks whether the field is empty. If it is empty, the property is flagged as invalid.
  5. ExpressionLanguageSupported: The scope in which NIFI Expression Language is evaluated for the property. Here it is ExpressionLanguageScope.FLOWFILE_ATTRIBUTES, so an expression in the property value can refer to flowfile attributes.
  6. Required: A boolean that indicates whether the property is mandatory or optional.
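
The NON_EMPTY_EL_VALIDATOR used above only rejects empty values, so a value such as X for BeforeOrAfter would pass validation and be caught only when a flowfile is processed. The stricter check one might want is small. Below is a plain-Java sketch of it, where FlagCheck and isValidFlag are hypothetical names. Wiring such a check into NIFI, for example through a custom Validator or by restricting the property to a fixed set of allowable values, is not shown here.

```java
import java.util.Locale;
import java.util.Set;

// Illustrative only: FlagCheck is a hypothetical helper, not a NiFi class.
final class FlagCheck {

    // The two values the article assigns a meaning to: B (before) and A (after).
    private static final Set<String> ALLOWED = Set.of("A", "B");

    private FlagCheck() {
    }

    // True when the value is non-null and, ignoring case, equals A or B.
    static boolean isValidFlag(String value) {
        if (value == null) {
            return false;
        }
        return ALLOWED.contains(value.toUpperCase(Locale.ROOT));
    }
}
```

Note that a check like this applies to a literal value. When the property holds an expression, the final value is known only after it has been evaluated against a flowfile.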

Now, let us add the possible outgoing relationships of this processor. We can define a relationship of the processor using Relationship.Builder() API as shown below:

    public static final Relationship SUCCESS = new Relationship.Builder()
            .name("SUCCESS")
            .description("Success relationship")
            .build();

    public static final Relationship FAILURE = new Relationship.Builder()
            .name("FAILURE")
            .description("Failure relationship")
            .build();

Then we initialize the processor's properties and relationships fields with the already defined property descriptors and relationships, using the init() method of the AbstractProcessor class. We also need to override the getRelationships() and getSupportedPropertyDescriptors() methods to expose the relationships and properties to the NIFI UI. The following code snippet shows these implementations:

    private List<PropertyDescriptor> properties;
    private Set<Relationship> relationships;

    @Override
    public void init(final ProcessorInitializationContext processContext) {
        // Collect the supported properties and expose them as a read-only list
        List<PropertyDescriptor> properties = new ArrayList<>();
        properties.add(SALUTATION);
        properties.add(BEFORE_OR_AFTER);
        this.properties = Collections.unmodifiableList(properties);

        // Collect the outgoing relationships and expose them as a read-only set
        Set<Relationship> relationships = new HashSet<>();
        relationships.add(SUCCESS);
        relationships.add(FAILURE);
        this.relationships = Collections.unmodifiableSet(relationships);
    }

    @Override
    public Set<Relationship> getRelationships() {
        return relationships;
    }

    @Override
    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return properties;
    }
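
Because the pom targets Java 11, read-only collections like these could also be built with List.of and Set.of instead of wrapping mutable ones. The sketch below uses plain strings as stand-ins for the PropertyDescriptor and Relationship objects so that it runs without NIFI on the classpath. DescriptorHolder is a hypothetical name.

```java
import java.util.List;
import java.util.Set;

// Illustrative only: strings stand in for PropertyDescriptor and Relationship objects.
final class DescriptorHolder {

    // List.of preserves the order in which the elements are given.
    static final List<String> PROPERTIES = List.of("Salutation", "BeforeOrAfter");

    // Set.of gives no ordering guarantee; a set has no defined order anyway.
    static final Set<String> RELATIONSHIPS = Set.of("SUCCESS", "FAILURE");

    private DescriptorHolder() {
    }

    public static void main(String[] args) {
        System.out.println(PROPERTIES);
        try {
            PROPERTIES.add("Extra"); // immutable: this call throws
        } catch (UnsupportedOperationException e) {
            System.out.println("properties list is read-only");
        }
    }
}
```

Either approach gives callers a collection they cannot modify. The choice is mostly a matter of brevity.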

Now, we will add the functional logic that needs to run when a flowfile arrives and triggers the processor. This logic is written inside the onTrigger() method. The implemented onTrigger() method looks like the following:

    @Override
    public void onTrigger(ProcessContext processContext, ProcessSession processSession) throws ProcessException {

        FlowFile flowFile = processSession.get();
        if (flowFile == null) {
            return;
        }
        String beforeOrAfter = processContext.getProperty(BEFORE_OR_AFTER).evaluateAttributeExpressions(flowFile).getValue();
        String salutationStr = processContext.getProperty(SALUTATION).evaluateAttributeExpressions(flowFile).getValue();
        if (!("B".equalsIgnoreCase(beforeOrAfter) || "A".equalsIgnoreCase(beforeOrAfter))) {
            // Unknown flag: route to FAILURE and stop, so the flowfile is transferred only once
            processSession.transfer(flowFile, FAILURE);
            return;
        }

        final AtomicReference<String> result = new AtomicReference<>();

        // Read the incoming content and build the new content in memory
        processSession.read(flowFile, inputStream -> {
            String flowContent = IOUtils.toString(inputStream, "UTF-8");

            if ("B".equalsIgnoreCase(beforeOrAfter)) {
                result.set(salutationStr + " " + flowContent);
            } else if ("A".equalsIgnoreCase(beforeOrAfter)) {
                result.set(flowContent + " " + salutationStr);
            }
        });

        // write() returns the updated flowfile, so keep that reference for the transfer
        flowFile = processSession.write(flowFile, outputStream -> {
            outputStream.write(result.get().getBytes(StandardCharsets.UTF_8));
        });
        processSession.transfer(flowFile, SUCCESS);

    }

Let us understand the implementation of the onTrigger() method shown in the above code. The onTrigger() method takes two arguments, of type ProcessContext and ProcessSession.

  • The ProcessSession object (the second argument) lets us fetch the flowfile, read and write its content, and add attributes to it. It also gives us the API to transfer a processed flowfile to the desired relationship queue (here, the SUCCESS or FAILURE queue).
  • The ProcessContext object, on the other hand, lets us obtain context values such as the values of the processor properties set in the NIFI UI.

In the code snippet shown above, we first use the processSession.get API to obtain the flowfile that triggered the onTrigger method. If the flowfile is null, we ignore the trigger. Then, using the processContext object, we obtain the values of the Salutation and BeforeOrAfter properties. Please note that both of these properties support NIFI Expression Language, so we must call the evaluateAttributeExpressions() API before obtaining their values. If BeforeOrAfter is neither B nor A, we transfer the flowfile to the FAILURE relationship. Otherwise, we define an AtomicReference variable to store the output flowfile content. We then use processSession.read to read the flowfile content from the inputStream object, transform the content, and store it in the AtomicReference variable called result. We write the result to the output flowfile using the processSession.write API. Finally, we transfer the flowfile to the SUCCESS relationship.
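
The AtomicReference is needed because a Java lambda can only capture local variables that are effectively final, so a plain String local could not be reassigned from inside the read callback. The plain-Java sketch below mimics the read-then-write shape with in-memory streams. StreamReader and StreamSketch are hypothetical stand-ins, not the NIFI callback interfaces, and InputStream.readAllBytes (Java 9+) is used in place of IOUtils.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicReference;

// Illustrative only: StreamReader is a hypothetical stand-in for a read callback.
@FunctionalInterface
interface StreamReader {
    void process(InputStream in) throws IOException;
}

final class StreamSketch {

    private StreamSketch() {
    }

    // Hands the bytes to the callback as a stream, similar in shape to a session read.
    static void read(byte[] content, StreamReader reader) throws IOException {
        try (InputStream in = new ByteArrayInputStream(content)) {
            reader.process(in);
        }
    }

    static byte[] prependSalutation(byte[] content, String salutation) {
        // A plain "String result" local could not be assigned inside the lambda below.
        final AtomicReference<String> result = new AtomicReference<>();
        try {
            read(content, in -> {
                String text = new String(in.readAllBytes(), StandardCharsets.UTF_8);
                result.set(salutation + " " + text);
            });

            // Write the transformed text back out as UTF-8 bytes.
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            out.write(result.get().getBytes(StandardCharsets.UTF_8));
            return out.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Both this sketch and the processor hold the whole content in memory, which is fine for short names but worth keeping in mind for large flowfiles.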

NOTE: Our code must transfer the flowfile to exactly one relationship, either SUCCESS or FAILURE. Otherwise, the processor will fail with an error stating that the transfer relationship is not specified.
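
The rule behind this note is that each flowfile must be routed exactly once per trigger: not zero times and not twice. The toy model below illustrates why an early return is needed after routing to FAILURE. ToySession is a hypothetical stand-in written for this article, not NIFI's ProcessSession, and the exception it throws only approximates how the framework reacts.

```java
// Illustrative only: ToySession is a hypothetical model, not NiFi's ProcessSession.
final class ToySession {

    private String relationship; // null until the flowfile has been routed

    // Records the route; a second call is rejected because routing twice is an error.
    void transfer(String target) {
        if (relationship != null) {
            throw new IllegalStateException("already transferred to " + relationship);
        }
        relationship = target;
    }

    String relationship() {
        return relationship;
    }

    // Mirrors the control flow of onTrigger: an invalid flag goes to FAILURE and stops.
    static String route(String beforeOrAfter) {
        ToySession session = new ToySession();
        boolean valid = "B".equalsIgnoreCase(beforeOrAfter) || "A".equalsIgnoreCase(beforeOrAfter);
        if (!valid) {
            session.transfer("FAILURE");
            return session.relationship(); // without this return, the next transfer would throw
        }
        session.transfer("SUCCESS");
        return session.relationship();
    }
}
```

Every path through route ends in exactly one transfer, which is the property the processor's onTrigger() method needs as well.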

Step 3: Packaging and Building the nar

First, let us ensure that the packaging type of the generated artifact is nar. To do so, add an explicit packaging tag to the pom.xml file as shown below:

    <groupId>org.example</groupId>
    <artifactId>SalutationProcessor</artifactId>
    <version>1.0</version>
    <packaging>nar</packaging>

Next, create a META-INF folder under the resources folder, and create a subfolder called services under the META-INF folder. Then add a blank file named org.apache.nifi.processor.Processor to the services folder. The final project structure will look as follows:

Figure — Structure of the project

Now, we will add the fully qualified class name of the processor, i.e. org.example.SalutationProcessor, to the file org.apache.nifi.processor.Processor.

Now, we will build the project by executing the following command, either from the command line or from your Java IDE:

mvn clean install

Once you have run the above command, you should see the generated nar file in the target folder.

Step 4: Deploying the nar and testing the processor

Copy the generated nar file to the lib folder of your local NIFI installation. Once copied, your nar should be present in the $NIFI_HOME/lib folder, as shown in the screenshot below:

Figure — Copied Nar containing Salutation Processor in $NIFI_HOME/lib

Now, restart your NIFI instance. You should then be able to drag and drop a processor of type SalutationProcessor from your NIFI UI. We create a simple NIFI flow to test our processor, as shown below:

Figure — Flow to test our Custom Processor (SalutationProcessor)

We generate a flowfile with simple plain text content using GenerateFlowFile, as shown in the screenshot below:

Figure — Configuration of the GenerateFlowFile processor

Then, we set the properties of the Salutation processor as follows:

Figure — Configuring Salutation Processor

Once we run the flow, we should get an output flowfile with the content “Good Morning Sinchan”, as shown below:

Figure — Content of the output flowfile

As seen in this example, we were able to successfully develop and test our first Apache NIFI custom processor. You can find the full code base for this blog at: https://github.com/perfecxus/SalutationProcessor

I hope you found this read useful. Please feel free to discuss or post questions in the comment section below.

I will see you again in Part 2 of the blog series. Till then, happy learning and stay safe!
