Build Your Own Processors In NiFi

Tomer Dayan
Oct 2, 2023


Apache NiFi, a powerful data integration tool, lets users automate the flow of data between systems. One of its key strengths is extensibility: developers can write custom processors tailored to specific data processing needs. Writing a custom processor in Java lets you extend NiFi’s functionality for requirements that the built-in processors do not cover. In this article, we will walk through the essential steps of creating a custom processor, so you can use NiFi’s data flow orchestration while handling your own data manipulation needs.

The first step in creating a custom Java processor for Apache NiFi is to create a new Java class that extends the AbstractProcessor class. This serves as the foundation for your processor’s functionality. By inheriting from AbstractProcessor, you gain access to built-in methods and utilities that handle communication with NiFi’s framework. Because AbstractProcessor declares onTrigger as abstract, the class below only compiles once you implement that method, which we do in a later section.

public class ExampleProcessor extends AbstractProcessor { }

Relationships

In Apache NiFi, a relationship is a named outcome to which a processor routes each flow file; the connections attached to that relationship carry the flow file to downstream components. Think of it as a language that the processor uses to tell downstream elements what happened during its operation.

At this stage, we’ll define two relationships: “success” and “failure.” You can add more relationships to suit your specific processing needs.

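import java.util.Set;

import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.Relationship;

public class ExampleProcessor extends AbstractProcessor {

    // flow files that could not be processed are routed here
    public static final Relationship REL_FAILURE = new Relationship.Builder()
            .name("failure")
            .description("Failed processing")
            .build();

    // flow files that were processed successfully are routed here
    public static final Relationship REL_SUCCESS = new Relationship.Builder()
            .name("success")
            .description("Successfully processed")
            .build();

    // tells NiFi which relationships this processor exposes
    @Override
    public Set<Relationship> getRelationships() {
        return Set.of(REL_SUCCESS, REL_FAILURE);
    }

    // onTrigger(...) is implemented in the next section
}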

The Processor Logic

The onTrigger method is where a custom processor’s logic lives. It is declared abstract in AbstractProcessor, so every processor must implement it. NiFi calls it each time the processor is scheduled to run, according to the processor’s scheduling strategy (for example, timer-driven or CRON-driven), typically to handle flow files waiting in its incoming queue.

For example, let’s read the incoming flow file and add a single attribute to it. processSession.get() returns the next flow file from the incoming queue, or null if there is none, so check for null before using it. To read an attribute, use flowFile.getAttribute(). To update an attribute or add a new one, use processSession.putAttribute(), which takes the flow file, the attribute’s key, and its value. Flow files are immutable, so putAttribute() returns an updated flow file, and that returned reference is the one to keep working with. The final step is to send the flow file onward with processSession.transfer(), passing the relationship that describes the outcome (here, REL_SUCCESS).

@Override
public void onTrigger(ProcessContext processContext, ProcessSession processSession) throws ProcessException {
    FlowFile flowFile = processSession.get();
    if (flowFile == null) {
        return; // nothing is waiting in the incoming queue
    }
    // the attribute you are looking for (null if the flow file does not have it)
    String attribute = flowFile.getAttribute("attribute");
    // create or update "attribute" with the value "value"; keep the returned reference
    flowFile = processSession.putAttribute(flowFile, "attribute", "value");
    // route to REL_SUCCESS: the "end" of the processor
    processSession.transfer(flowFile, REL_SUCCESS);
}
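NiFi treats a flow file as immutable, which is why the updated reference returned by putAttribute() has to be kept. The plain-Java sketch below is a toy model of that behavior, not NiFi code: ToyFlowFile and withAttribute are hypothetical names chosen for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for a FlowFile (NOT the NiFi API):
// attributes can only be "changed" by producing a new instance, the same
// way ProcessSession.putAttribute hands back an updated FlowFile.
final class ToyFlowFile {
    private final Map<String, String> attributes;

    ToyFlowFile(Map<String, String> attributes) {
        this.attributes = Map.copyOf(attributes); // unmodifiable defensive copy
    }

    String getAttribute(String key) {
        return attributes.get(key);
    }

    // Returns a NEW instance carrying the extra attribute; this one is untouched.
    ToyFlowFile withAttribute(String key, String value) {
        Map<String, String> copy = new HashMap<>(attributes);
        copy.put(key, value);
        return new ToyFlowFile(copy);
    }
}
```

Calling original.withAttribute("attribute", "value") and discarding the result leaves original.getAttribute("attribute") returning null, which is the same mistake as ignoring the return value of putAttribute().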

The FlowFile Content

Content is the actual data carried by a flow file, as opposed to its attributes (the key/value metadata). It can range from structured formats such as JSON or CSV to unstructured documents, and NiFi lets you process, transform, and route it as it moves through the data flow.

Let’s write two simple helper functions for working with content: one that reads a flow file’s content as a String, and one that replaces the content with a new String:

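import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

// Reads the whole content of the flow file and decodes it as UTF-8
private String getContent(FlowFile flowFile, ProcessSession session) {
    final var byteArrayOutputStream = new ByteArrayOutputStream();
    session.exportTo(flowFile, byteArrayOutputStream);
    return byteArrayOutputStream.toString(StandardCharsets.UTF_8);
}

// Replaces the content of the flow file. Like putAttribute, importFrom returns
// the updated FlowFile, so call it as: flowFile = updateContent(flowFile, session, text);
private FlowFile updateContent(FlowFile flowFile, ProcessSession session, String content) {
    InputStream inputStream = new ByteArrayInputStream(content.getBytes(StandardCharsets.UTF_8));
    return session.importFrom(inputStream, flowFile);
}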
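Converting between bytes and text depends on a character set, and the no-argument String.getBytes() and ByteArrayOutputStream.toString() use the JVM’s default charset, which can differ between machines. The plain-Java sketch below performs the same round trip as the two helpers, with UTF-8 named on both sides; ContentCodec, toStream and fromStream are hypothetical names and do not touch the NiFi API.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Hypothetical helper (not part of NiFi) doing the byte/text conversions
// of getContent/updateContent with the charset fixed to UTF-8.
final class ContentCodec {
    private ContentCodec() {}

    // String -> stream, like the ByteArrayInputStream handed to session.importFrom
    static InputStream toStream(String content) {
        return new ByteArrayInputStream(content.getBytes(StandardCharsets.UTF_8));
    }

    // stream -> String, like the ByteArrayOutputStream filled by session.exportTo
    static String fromStream(InputStream in) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try {
            in.transferTo(out); // copies all remaining bytes (Java 9+)
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.toString(StandardCharsets.UTF_8); // Charset overload needs Java 10+
    }
}
```

If one side used UTF-8 and the other ISO-8859-1, a string such as "h\u00e9llo" would not survive the round trip, because the two charsets encode non-ASCII characters differently.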

Processor’s Properties

In NiFi, processor properties refer to configurable settings that allow users to customize the behavior and functionality of data processing components.

Here’s an example showing how to add a basic custom property to a NiFi processor.

import java.util.List;

import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.processor.util.StandardValidators;

public static final PropertyDescriptor EXAMPLE_PROPERTY = new PropertyDescriptor.Builder()
        .name("example_property")            // internal name, used e.g. in tests
        .displayName("Example Property")     // how it is displayed in the UI
        .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
        .addValidator(StandardValidators.ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR)
        .build();

@Override
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    return List.of(EXAMPLE_PROPERTY);
}

How to read the ‘EXAMPLE_PROPERTY’ property inside onTrigger:

@Override
public void onTrigger(ProcessContext processContext, ProcessSession processSession) throws ProcessException {
// code …
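// EXAMPLE_PROPERTY supports Expression Language, so evaluate it before reading the value
var exampleValue = processContext.getProperty(EXAMPLE_PROPERTY).evaluateAttributeExpressions().getValue();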
// rest of the code …
}

How To Test It

The moment we’ve all been waiting for: writing tests. Testing plays a vital role in making sure a NiFi processor written in Java is reliable. Unit tests let us verify the processor’s behavior across different scenarios and confirm that it works as intended.

To begin, we instantiate our new processor and wrap it in a TestRunner from NiFi’s nifi-mock module. Properties are set with testRunner.setProperty(). Next, we prepare the input: a map of flow file attributes plus the content. testRunner.enqueue() takes the content bytes and the attribute map and queues a flow file for the processor, and testRunner.run() triggers it. Finally, we retrieve the output flow files with testRunner.getFlowFilesForRelationship(), passing the relationship we expect them on, such as REL_SUCCESS from the preceding examples.

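import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.util.Map;

import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.Test;

public class ExampleProcessorTest {

    @Test
    public void testAddingAttributesNifiRunner() {
        ExampleProcessor processor = new ExampleProcessor();
        TestRunner testRunner = TestRunners.newTestRunner(processor);

        // set the property by its descriptor (or by its name, "example_property")
        testRunner.setProperty(ExampleProcessor.EXAMPLE_PROPERTY, "VALUE");
        Map<String, String> exampleFlowFile = // …
        String content = // …

        testRunner.enqueue(new ByteArrayInputStream(content.getBytes(StandardCharsets.UTF_8)), exampleFlowFile);
        testRunner.run();

        MockFlowFile resultFlowFile = testRunner.getFlowFilesForRelationship(ExampleProcessor.REL_SUCCESS).get(0);
        String resultContent = new String(testRunner.getContentAsByteArray(resultFlowFile), StandardCharsets.UTF_8);
        // assertions …
    }
}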

Full example

Our example handles an attribute of the incoming flow file whose value is a JSON object. The task is to take each key of that JSON object and add it, together with its value, as a separate attribute on the same flow file. This requires parsing the JSON stored in the attribute, iterating over its keys, and writing each key/value pair back as a new attribute; the flow file content is left untouched.
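The naming rule at the heart of this processor can be sketched in plain Java before looking at the NiFi-specific code. This sketch assumes the JSON has already been parsed into a Map<String, Object> (the processor uses Jackson’s ObjectMapper for that) and that an empty or missing prefix means “no prefix”; AttributeExploder and explode are hypothetical names. Each key becomes attribute.key, or prefix.attribute.key when a prefix is set.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: computes the attributes to add, given JSON that has
// already been parsed into a Map (for example by Jackson).
final class AttributeExploder {
    private AttributeExploder() {}

    static Map<String, String> explode(String attributeName, String prefix, Map<String, Object> json) {
        // a null or empty prefix means "no prefix" in this sketch
        String base = (prefix == null || prefix.isEmpty())
                ? attributeName
                : prefix + "." + attributeName;
        Map<String, String> result = new LinkedHashMap<>();
        for (Map.Entry<String, Object> entry : json.entrySet()) {
            // attribute values must be Strings, so numbers, booleans and nested
            // objects are converted with String.valueOf (not re-serialized as JSON)
            result.put(base + "." + entry.getKey(), String.valueOf(entry.getValue()));
        }
        return result;
    }
}
```

For {"param1": "value1"} stored in abc.param and no prefix, this yields the single entry abc.param.param1 = value1. A map like this can be applied to a flow file in one call with ProcessSession.putAllAttributes(), which, like putAttribute(), returns the updated flow file.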

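import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.util.StandardValidators;

public class ExplodeJsonAttribute extends AbstractProcessor {
    public static final Relationship REL_FAILURE = new Relationship.Builder()
            .name("failure")
            .description("Failed adding attributes")
            .build();

    public static final Relationship REL_SUCCESS = new Relationship.Builder()
            .name("success")
            .description("Added attributes to flow file")
            .build();

    public static final PropertyDescriptor ATTRIBUTE_TO_EXPLODE = new PropertyDescriptor.Builder()
            .name("attributeToExplode")
            .displayName("Attribute To Explode")
            .required(true)
            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
            .addValidator(StandardValidators.ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR)
            .build();

    // optional: when it is not set, no prefix is added
    public static final PropertyDescriptor EXPLODED_ATTRIBUTE_PREFIX = new PropertyDescriptor.Builder()
            .name("explodedAttributePrefix")
            .displayName("Exploded Attribute Prefix")
            .required(false)
            .addValidator(StandardValidators.ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR)
            .build();

    // ObjectMapper is thread-safe once configured, so one shared instance is enough
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    @Override
    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return List.of(ATTRIBUTE_TO_EXPLODE, EXPLODED_ATTRIBUTE_PREFIX);
    }

    @Override
    public Set<Relationship> getRelationships() {
        return Set.of(REL_SUCCESS, REL_FAILURE);
    }

    @Override
    public void onTrigger(ProcessContext context, ProcessSession session) {
        // read flowFile; null means the incoming queue is empty
        FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }

        // read properties (ATTRIBUTE_TO_EXPLODE supports Expression Language, so evaluate it)
        String attributeToExplode = context.getProperty(ATTRIBUTE_TO_EXPLODE).evaluateAttributeExpressions().getValue();
        String explodedAttributePrefix = context.getProperty(EXPLODED_ATTRIBUTE_PREFIX).getValue();

        String json = flowFile.getAttribute(attributeToExplode);
        if (json == null) {
            // the attribute we were asked to explode does not exist on this flow file
            session.transfer(flowFile, REL_FAILURE);
            return;
        }

        // new attribute names: "<attribute>.<key>" or "<prefix>.<attribute>.<key>"
        String base = (explodedAttributePrefix == null || explodedAttributePrefix.isEmpty())
                ? attributeToExplode
                : explodedAttributePrefix + "." + attributeToExplode;

        Map<String, String> newAttributes = new HashMap<>();
        try {
            // use ObjectMapper to parse the JSON object whose keys we want to 'explode' into attributes
            Map<String, Object> jsonToExplode = OBJECT_MAPPER.readValue(json, new TypeReference<Map<String, Object>>() {});
            for (Map.Entry<String, Object> jsonEntry : jsonToExplode.entrySet()) {
                // non-string JSON values (numbers, booleans, nested objects) are stored via String.valueOf
                newAttributes.put(base + "." + jsonEntry.getKey(), String.valueOf(jsonEntry.getValue()));
            }
        } catch (IOException e) {
            // invalid JSON (or not a JSON object): route to failure and stop here
            getLogger().error("Could not parse attribute " + attributeToExplode + " as JSON", e);
            session.transfer(flowFile, REL_FAILURE);
            return;
        }

        // putAllAttributes returns the updated FlowFile; transfer that reference
        flowFile = session.putAllAttributes(flowFile, newAttributes);
        session.transfer(flowFile, REL_SUCCESS);
    }
}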

Example’s Basic Test

To validate this processor, we follow the testing pattern from the preceding section and add the assertion logic. First, we set up the input flow file and set the property that names the attribute to explode. The flow file carries a single attribute, abc.param, holding the JSON; its content is empty because the processor only reads and writes attributes.

import static org.junit.jupiter.api.Assertions.assertEquals;

import java.util.Map;

import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.Test;

public class ExplodeJsonAttributeTest {

    @Test
    public void testExplodeJson() {
        ExplodeJsonAttribute explodeJsonAttribute = new ExplodeJsonAttribute();
        TestRunner testRunner = TestRunners.newTestRunner(explodeJsonAttribute);

        // the input flow file has a single attribute holding a JSON object
        String params = "{\"param1\": \"value1\", \"param2\": \"value2\"}";
        Map<String, String> attributes = Map.of("abc.param", params);

        // the processor only works on attributes, so the content can stay empty
        String content = "";
        testRunner.enqueue(content, attributes);
        testRunner.setProperty(ExplodeJsonAttribute.ATTRIBUTE_TO_EXPLODE, "abc.param");
        testRunner.run();

        // exactly one flow file should have been routed to success
        testRunner.assertAllFlowFilesTransferred(ExplodeJsonAttribute.REL_SUCCESS, 1);
        MockFlowFile resultFlowFile = testRunner.getFlowFilesForRelationship(ExplodeJsonAttribute.REL_SUCCESS).get(0);
        Map<String, String> resultAttributes = resultFlowFile.getAttributes();

        // expected: the original attribute plus one attribute per JSON key
        Map<String, String> attributesWithExploded = Map.of(
                "abc.param", params,
                "abc.param.param1", "value1",
                "abc.param.param2", "value2");

        // assert (other attributes, such as uuid, are ignored)
        for (Map.Entry<String, String> param : attributesWithExploded.entrySet()) {
            assertEquals(param.getValue(), resultAttributes.get(param.getKey()));
        }
    }
}
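The loop at the end of the test checks that every expected attribute is present with the right value, and it stops at the first mismatch. If you write many tests like this, a small plain-Java helper can report all mismatches at once; AttributeAssertions and assertContainsAll are hypothetical names, and the helper throws a plain AssertionError so it works with any test framework.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical test helper: checks that 'actual' contains every entry of
// 'expected' (extra attributes such as uuid are ignored) and lists all mismatches.
final class AttributeAssertions {
    private AttributeAssertions() {}

    static void assertContainsAll(Map<String, String> expected, Map<String, String> actual) {
        List<String> problems = new ArrayList<>();
        for (Map.Entry<String, String> entry : expected.entrySet()) {
            String actualValue = actual.get(entry.getKey());
            if (!Objects.equals(entry.getValue(), actualValue)) {
                problems.add(entry.getKey() + ": expected <" + entry.getValue()
                        + "> but was <" + actualValue + ">");
            }
        }
        if (!problems.isEmpty()) {
            throw new AssertionError("Attribute mismatches:\n" + String.join("\n", problems));
        }
    }
}
```

In the test above, the final loop could then become AttributeAssertions.assertContainsAll(attributesWithExploded, resultAttributes).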

In Summary

This article walked through implementing the main logic of a custom NiFi processor in Java, focusing on reading and writing flow file content and attributes in the examples above. It also covered testing: setting up a TestRunner, setting properties, and creating flow file content and attributes to validate the processor’s behavior.

Good luck writing your own custom processors in Java :)

https://github.com/tomerdayan168/CustomProcessorExample
