Optimizing Connect+: Enhancing Efficiency and Predictability with Apache NiFi

Saurav Behera
Capillary Technologies
14 min read · Jul 8, 2024

Introduction:

Connect+ is a no-code data integration platform developed at Capillary Technologies. It is designed to simplify onboarding, reduce go-live time, and standardize our custom data integration processes (both ingress and egress) for our customers. The Connect+ tech stack is built around Apache NiFi at its core. The Connect+ interface is intuitive and user-friendly, allowing our customers to get started quickly. We offer predefined templates for common tasks, such as customer registration and transaction processing, providing an efficient way for our customers to integrate with our system.

For more information about Connect+, our no-code integration tool, please refer to this blog.

About:

Apache NiFi is an ideal choice for us because of its powerful and highly configurable dataflow management features. As we offered more templates as integration solutions to our customers, the number and complexity of dataflows grew drastically, making our NiFi system very heterogeneous. It gradually became a significant challenge to maintain optimal performance and ensure predictable behavior across dataflows.

Some of our dataflows in Connect+ call APIs on our CDP platform, while others are CPU-intensive, handling tasks like encryption and decryption. NiFi's global thread pools cannot be partitioned by task type, such as CPU-intensive versus data-I/O-intensive, so we could not assign separate pools to each. As a result, we faced challenges in ensuring predictable execution times and optimizing the performance of our NiFi cluster, particularly when multiple dataflows that depend heavily on either CPU or I/O resources ran simultaneously.

In this blog, we explore the strategies and best practices that have helped us enhance Apache NiFi's performance and bring predictability to execution times. We cover the significance of NiFi's key components, fine-tuning their configurations, optimizing resource usage, and leveraging NiFi's built-in features, so that by the end of this post you will have a good understanding of how to maximize the efficiency and reliability of your own dataflows.

Approach:

Fine-tuning a NiFi workflow depends on many parameters. There is no single solution for every use case and every type of dataflow. A dataflow might process small or large Flowfiles, handle different numbers of events per second, and use different processors with unique characteristics. Also, there is no resource isolation between dataflows running in the same NiFi environment. Therefore, when running multiple dataflows of different types in the same environment, you must fine-tune the configurations of individual processors and the controller to prevent resource misallocation.

Key Components of NiFi:

The first step in optimizing any system is to understand its nuts and bolts properly. By identifying the critical elements and implementing targeted improvements, we can achieve the desired results for our business needs.

[Fig 1. Detailed diagram of the Flowfile lifecycle]

Before diving deep, we need to cover some basics. The following is the significance of each major component of the NiFi system:

  • Flowfiles and Repositories: Provide fine-grained control over individual data records and enable dynamic routing and transformation of data. The repositories maintain the state of Flowfiles for reliability and recovery, store the actual data payloads, and track data lineage for auditing, debugging, and monitoring.
  • Processors: Processors are NiFi's execution entities; they perform specific data processing tasks and expose various configurations that enable flexibility and scaling in dataflow designs.
  • Connections: Proper configuration of connections is vital for efficient data movement and for avoiding bottlenecks in the workflow.
  • Controller: Provides a centralized way to manage configurations and resources across multiple processors and dataflows, improving efficiency and maintainability.

Strategies for Optimization and Improvement:

Based on the above components, we came up with the following strategies:

  • Optimizing Hardware Resources
  • Efficient Dataflow Designs
  • Optimizing Controller and Processor Configurations
  • Optimization through Monitoring and Alerting

Optimizing Hardware Resources:

  • Processor idle time:

The nifi.bored.yield.duration configuration determines how long a component should wait before checking for tasks when it has no work to perform. Lowering this duration can reduce processor latency but will increase CPU load due to more frequent checks. Therefore, this value should be adjusted based on the type of timer-driven processor used in the dataflow.

Example Configuration:

The default value for this setting is 10 ms. For example, suppose a timer-driven processor like ListSFTP triggers the rest of the dataflow and is scheduled to run every 5 minutes to check whether new files have been uploaded for processing. There is no need to keep the default 10 ms of nifi.bored.yield.duration here; we can pick a value that shifts the processor's effective schedule by at most 0.5%-1%. In this case you can safely set the bored.yield.duration to 500 ms to 1 second, which reduces CPU usage significantly.

Similarly, if we are using a Kafka consumer (e.g., ConsumeKafka) as a timer-driven processor scheduled to run every 30 seconds, we can safely set the bored.yield.duration to 150-300 ms.

To change this setting, update the nifi.properties file as follows:

nifi.bored.yield.duration=500 millis
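
As a rough illustration of the heuristic above (our own rule of thumb, not an official NiFi formula), the yield duration can be derived from a processor's run schedule so that the added latency stays within about 0.5%-1% of it:

    # Sketch of the rule of thumb above: pick a bored.yield.duration that
    # delays a timer-driven processor by at most `fraction` of its schedule.
    def suggest_bored_yield_ms(run_schedule_seconds, fraction=0.005):
        return int(run_schedule_seconds * 1000 * fraction)

    print(suggest_bored_yield_ms(30))   # Kafka consumer every 30 s -> 150 ms
    print(suggest_bored_yield_ms(300))  # ListSFTP every 5 min -> 1500 ms
                                        # (we chose a more conservative 500 ms-1 s)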

  • Flowfile Repository Checkpoint Interval:

The checkpoint interval determines how frequently the Flowfile Repository saves its state to disk. This process involves writing Flowfile metadata to disk, consuming CPU and I/O resources. Setting the interval too short can lead to unnecessary resource usage, while setting it too long can increase recovery times.

Example Configuration:

Let's take the example of two dataflows: in one, an EncryptionProcessor is primarily used, and in the other, a SplitFileProcessor splits Flowfiles into multiple small Flowfiles and runs some custom transformations on them. As the EncryptionProcessor takes longer to complete on large Flowfiles, saving state at a high frequency is unnecessary; if encryption takes an average of 15 minutes per Flowfile, we can set the checkpoint interval to 5 minutes.

In the other dataflow, the custom transformations run on much smaller Flowfiles and execution is relatively quick, so the checkpoint interval should be set to a lower value, such as 30 seconds to 1 minute.

In our case, we have a variety of dataflows whose completion times range from 5 minutes to 5 hours and whose Flowfile sizes vary from a few KBs to hundreds of MBs. We settled on a checkpoint interval of 2 minutes after observing the average execution time across all our dataflows.

To change this setting, update the nifi.properties file as follows:

nifi.flowfile.repository.checkpoint.interval=2 mins
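
To make the rule of thumb above concrete, here is a small sketch (our own heuristic, not anything NiFi-provided) that derives a checkpoint interval from the average dataflow execution time, checkpointing roughly a few times per run and clamping the result to a sensible range:

    # Heuristic sketch: checkpoint a few times per average dataflow run,
    # clamped between 30 seconds and 5 minutes.
    def suggest_checkpoint_interval_seconds(avg_flow_duration_seconds):
        candidate = avg_flow_duration_seconds / 3
        return max(30, min(300, int(candidate)))

    print(suggest_checkpoint_interval_seconds(15 * 60))  # 15-min encryption flow -> 300 s
    print(suggest_checkpoint_interval_seconds(90))       # quick transformation flow -> 30 s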

  • Content Repository Retention Period:

Retaining content for a longer period ensures that data remains available for reprocessing or analysis if needed, for example for debugging or compliance purposes. By setting an appropriate retention period, you can balance data retention needs against the availability of system resources.

Example Configuration:

If your dataflows involve processes where content is needed only for short-term use, you can set a shorter retention period to conserve storage space. If long-term retention is required for compliance or auditing, you can set a longer one.

To change this setting, update the nifi.properties file as follows:

nifi.content.repository.archive.max.retention.period=48 hours
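
Note that archived content is bounded by disk usage as well as time; the companion archive properties are worth keeping in view alongside the retention period (the values below are illustrative, not recommendations):

    nifi.content.repository.archive.enabled=true
    nifi.content.repository.archive.max.usage.percentage=50%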

  • Provenance Repository Index and Query Threads:

Configuring the Provenance Repository Index and Query Threads appropriately ensures efficient indexing and querying of provenance data. This is essential for maintaining a balance between system performance and the ability to effectively track, audit, and troubleshoot data flows in NiFi.

Example configuration:

Suppose your NiFi instance handles a high volume of data on a large deployment and requires frequent provenance queries; you might then configure a higher number of index and query threads to maintain optimal performance. The following is a rough guideline for the recommended thread counts:

  • Medium volume, standard hardware: 2-4 threads
  • High volume, high-end hardware: 4-8 threads
  • Very large volume, large-scale hardware: 8-16+ threads

To change this setting, update the nifi.properties file as follows:

nifi.provenance.repository.index.threads=4
nifi.provenance.repository.query.threads=4

Note:

As all these settings apply at the NiFi level, the right values are easier to find when your NiFi instance/cluster runs similar types of dataflows. In practice, many scenarios lead to a heterogeneous system, with dataflows varying in processing type, execution time, and Flowfile size, and it becomes difficult to find settings that are efficient for every use case. One way to solve this is to group dataflows into different NiFi clusters based on the type of workload they handle, which lets us find the right configuration for each cluster individually.

Efficient Dataflow Designs:

Avoid Data Duplication:

  • Unique Identifiers: Ensure that each Flowfile has a unique identifier (UUID) and use this to check for duplicates. Use NiFi’s DetectDuplicate processor, which can identify duplicate Flowfiles based on attributes.
  • Idempotent Operations: Design processors and dataflows to be idempotent, meaning that applying the same operation multiple times does not change the result beyond the initial operation.
  • State Management: Use the UpdateAttribute and DetectDuplicate processors together to manage the state and track which Flowfiles have already been processed.

Example configuration:

Configure the DetectDuplicate processor to use the attributes set in the UpdateAttribute processor to check for duplicates.

<property name="Unique Identifier" value="${uuid}-${filename}"/>
<property name="Cache Entry Identifier" value="myUniqueIdentifier"/>

Use Batching in the Right Place:

Batching can significantly enhance performance and resource utilization in NiFi by reducing the overhead associated with processing individual Flowfiles. Batch in the right parts of the dataflow, where the downstream processors can take advantage of bulk processing. Selecting the right batch size is crucial for performance, as too large a value can push the processor or downstream services to their limits. The SplitRecord processor can be used for batching: we define the batch size, and the batched records proceed through the flow as a single unit, as sketched below.
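
For illustration, a SplitRecord configuration batching 1,000 records per Flowfile might look like the following (shown in the same property-snippet style as above; the reader and writer values stand in for controller services you have configured, and the batch size is just an example):

    <property name="Record Reader" value="CSVReader"/>
    <property name="Record Writer" value="JsonRecordSetWriter"/>
    <property name="Records Per Split" value="1000"/>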

Transformation in the Right Place:

Placing data transformation at the appropriate stage in your dataflow is crucial for maintaining data integrity and ensuring scalability. Choose the right processor for each specific transformation task, such as ConvertRecord, UpdateRecord, JoltTransformJSON, or ExecuteScript. For example, these processors can transform CSV data into JSON payloads for APIs, catering to different business requirements without changing the codebase each time. Keep transformations simple and transform only the data necessary for the next processing step.

Optimizing Controller and Processor Configurations:

Controller Threads Configuration:

In Apache NiFi, thread allocation is managed at two levels: controller thread allocation and processor-level thread allocation. The controller thread pool sets the upper limit for the total number of threads that NiFi can use across all processors, while each processor can be assigned a specific number of threads from this global pool. It’s crucial to ensure that the total number of threads allocated to all processors does not exceed the global thread limit.

Since there are two types of processors, Timer Driven and Event Driven, the controller can allocate different thread configurations for these processor types. Assigning the right number of controller threads shouldn’t depend on the number of processors in a dataflow but on the number of parallel dataflows running and the concurrency configuration defined in the processors.

Example Configuration:

For instance, if you run 3 to 4 dataflows in parallel and some processors have a concurrency of 5, the controller threads should be set to around 10–15.

These limits are set from the NiFi UI rather than nifi.properties: under Controller Settings, set Maximum Timer Driven Thread Count = 15 and Maximum Event Driven Thread Count = 1.

Processor-Level Thread Configuration:

Processor-level thread configuration is essential for optimizing the performance and efficiency of NiFi dataflows. By allocating the right number of threads to each processor, you can enhance parallel processing, balance resource utilization, and customize performance for specific tasks. This fine-grained control is key to achieving high throughput and responsive data processing in complex dataflow environments. However, the controller thread limit must always be set higher than any individual processor's thread configuration for this to be effective.

Example Configuration:

If a processor handles a high volume of small, independent tasks, you might allocate multiple threads to it to process these tasks in parallel.

If our dataflow uses the OAuthClient processor to generate OAuth tokens and make API calls, we can set its concurrency to 5 threads to properly utilize the downstream service's capacity. We also need to set the controller thread limit above 5 for this to take effect.

Processor concurrency is set per processor in the NiFi UI (Processor Configuration → Scheduling tab → Concurrent Tasks = 5), while the controller-level limit is raised under Controller Settings (Maximum Timer Driven Thread Count = 7).

Back Pressure and Threshold Configurations:

Back pressure and threshold configurations are crucial for maintaining control over the data flow rate in NiFi. Back pressure is a strategy that prevents processors and the system from being overloaded by regulating the rate at which data is ingested and processed. This approach effectively manages system resources, ensuring smooth data processing without bottlenecks or crashes. These configurations ensure system stability, optimize resource utilization, prevent data loss, and enable the system to handle large data volumes efficiently.

Example Configuration:

In NiFi, back pressure is configured at the connection level, which links different processors in a dataflow. You can set thresholds for two main parameters: the number of Flowfiles and the total data size queued in the connection. When these thresholds are exceeded, back pressure is applied, stopping upstream processors from sending more data until the queue is processed to a manageable level.

These thresholds are set per connection in the NiFi UI; the defaults applied to newly created connections can be changed in the nifi.properties file as follows:

nifi.queue.backpressure.count=10000
nifi.queue.backpressure.size=2 GB

Optimization through Monitoring and Alerting:

Real-Time Performance Insights Using Provenance Events:

We need to regularly monitor key performance metrics such as processor throughput, data flow latency, back pressure status, and resource utilization (CPU, memory, disk I/O). This provides insights into how well your data flows are performing. Combining this information with the provenance events will greatly improve the performance insights. Provenance events are records that capture the lineage of data as it flows through the NiFi system. These events help to understand where data came from, how it was processed, and where it went.

Using these insights alongside our business commitments, we can find the bottlenecks and fine-tune accordingly.

Monitoring & Alerting Setup:

Utilize NiFi's built-in monitoring tools and configure the Prometheus Reporting Task to export metrics to a Prometheus server. Set up Grafana dashboards to visualize these metrics, providing real-time insights into your NiFi instance's performance. In Grafana, configure alert thresholds to send notifications via channels like email, Slack, etc., when critical metrics, such as high CPU usage or back pressure buildup, exceed predefined limits.

[Example Grafana dashboards]

Early Issue Detection:

We can set up alerts to notify us of potential issues before they escalate, for example for high processor latency, excessive back pressure, or resource exhaustion. These are the symptoms that lead to bigger issues later.

We can also set up alerts based on NiFi's bulletin events to notify us of any runtime issues during dataflow execution, as sketched below.
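
As a minimal sketch of such an alert (assuming an unsecured NiFi REST API at localhost and a Slack incoming-webhook URL, both placeholders), a small script can poll NiFi's bulletin board and forward new bulletins to Slack:

    # Minimal sketch: poll NiFi's bulletin board and forward bulletins to Slack.
    # NIFI_URL and SLACK_WEBHOOK are placeholders; a secured cluster would also
    # need authentication.
    import requests

    NIFI_URL = "http://localhost:8080/nifi-api"
    SLACK_WEBHOOK = "https://hooks.slack.com/services/XXX/YYY/ZZZ"

    def forward_bulletins(after_id=0):
        resp = requests.get(f"{NIFI_URL}/flow/bulletin-board",
                            params={"after": after_id})
        resp.raise_for_status()
        bulletins = resp.json().get("bulletinBoard", {}).get("bulletins", [])
        for entry in bulletins:
            b = entry.get("bulletin", {})
            text = "[{}] {}: {}".format(b.get("level"), b.get("sourceName"),
                                        b.get("message"))
            requests.post(SLACK_WEBHOOK, json={"text": text})
        # Remember the highest bulletin id so the next poll fetches only new ones.
        return max([e.get("id", after_id) for e in bulletins] or [after_id])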

[Example of a NiFi bulletin-based Slack alert]

Problem Resolution Through Automated Action:

By setting up automated detection and resolution mechanisms in NiFi, we can make our dataflows more resilient and reduce manual intervention. Utilizing NiFi's processors and reporting tasks, we can create robust error-handling and recovery workflows.

The following are some example use cases:

  • Use processors like RetryFlowFile to enable a retry mechanism in the dataflow.
  • Route failed Flowfiles (via a processor's failure relationship, with a router such as RouteOnAttribute) to an error-handling path where automated actions can be applied.
  • Use the ExecuteScript processor to run custom scripts (in Python, JavaScript, etc.) to handle complex error scenarios, as sketched after this list.
  • Use the PutFile or PutSFTP processors to move problematic files to a specific area for manual review if they cannot be automatically corrected.
  • Use the UpdateAttribute processor along with the NiFi Expression Language to automatically correct common issues, like missing fields.
  • Use provenance events to generate an error file for reporting and easy debugging; there is a dedicated Medium blog post on this.
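
For instance, an ExecuteScript error handler might look like the following Jython sketch (session, REL_SUCCESS, and REL_FAILURE are bindings that ExecuteScript provides to the script; the attribute names here are purely illustrative):

    # Jython sketch for ExecuteScript: repair a known-recoverable error by
    # defaulting a missing field, otherwise route to the failure path.
    flowfile = session.get()
    if flowfile is not None:
        error_type = flowfile.getAttribute("error.type")  # illustrative attribute
        if error_type == "missing.field":
            flowfile = session.putAttribute(flowfile, "record.type", "unknown")
            session.transfer(flowfile, REL_SUCCESS)
        else:
            session.transfer(flowfile, REL_FAILURE)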

Capacity Planning & Forecasting Time Consumption:

Capacity planning is a broad topic, but we will cover the essentials briefly. Effective capacity planning in Apache NiFi needs a good understanding of the workload, and analyzing historical performance data helps identify trends and anticipate future resource requirements.

The following are some key points for understanding the workload:

Data Volume: Determine the amount of data currently being processed and estimate how much the volume can grow over time.

Data Velocity: The rate of data ingestion into NiFi, and the ingestion capacity of any downstream services.

Data Variety: The types of data involved, such as transactional records, logs, and binary files.

NiFi Dataflow Type: What type of operations will be performed on the data? Are they CPU-intensive or I/O-intensive? Can they be parallelized? Can batching be applied during processing?

Establishing a Baseline: What are the baseline estimates with the current data load?

By answering these questions, you will have a good understanding of your workloads and be able to create a better plan accordingly.

An Example of Forecasting Time Consumption:

We will take the example of a dataflow that sends transactional data to our platform through our APIs using the OAuthClient processor. The inputs are files containing the data in .csv format, and we run some transformations on them to create the required payload. The average latency of the POST API call is about 250 ms, so a single thread can make 3 to 4 POST calls per second. We take the average value (3.5) to compensate for other delays and latencies in the system, which gives about 210 calls per minute; we round this down to 200 calls per minute. The following is an approximate calculation of the time consumption.

Input Parameters:

Dataflow number, D = 3 [ the number of dataflows we need to run in parallel ]

File size, F = 320,000 [ the number of transactional data entries in each file ]

Data rate per minute, DR = 200 [ the average number of POST calls a single thread makes in a minute ]

Rate limit, R = 2 [ the downstream services' rate limit of 2,000 calls/min, taken in thousands; since each dataflow makes DR × PT = 1,000 calls/min, R is also the number of dataflows the rate limit can support ]

Processor threads, PT = 5 [ the number of threads assigned to the OAuthClient processor ]

Controller threads, CT = 15 [ the number of threads assigned to the controller ]

Batching, B = 15 [ the batching value for the payloads ]

Output Parameter:

Time, T = ?

Calculations, Formula:

T = ( ⌈ D / min(R, CT/PT) ⌉ / B ) × ( F / (DR × PT) ) minutes

where ⌈x⌉ is the ceiling function (round up to the nearest whole number) and min(R, CT/PT) is the number of dataflows that can actually run in parallel, whichever of the rate-limit cap or the thread-pool cap is tighter.

T = ( ⌈ 3 / min(2, 15/5) ⌉ / 15 ) × ( 320,000 / (200 × 5) )

=> ( ⌈ 3 / 2 ⌉ / 15 ) × 320

=> ( ⌈ 1.5 ⌉ / 15 ) × 320

=> ( 2 / 15 ) × 320

=> ≈ 43 mins

It will take approximately 43 minutes to send 3 files with 320k entries each, using 3 parallel dataflows and a batching value of 15. If we instead want the task to complete within 30 minutes, the same formula lets us calculate the required values for processor threads, controller threads, batching size, and downstream rate-limit capacity. This type of planning brings visibility and predictability to our system.
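
The same calculation, expressed as a small script (a sketch of the formula above, using the example's inputs):

    # Sketch of the forecasting formula above, with the example's inputs.
    from math import ceil

    def forecast_minutes(D, F, DR, rate_limit, PT, CT, B):
        R = rate_limit / (DR * PT)         # dataflows the downstream rate limit supports
        waves = ceil(D / min(R, CT / PT))  # serial "waves" of parallel dataflows
        per_wave = F / (DR * PT)           # minutes per file without batching
        return (waves / B) * per_wave      # batching of B cuts the call count B-fold

    # D=3 dataflows, F=320,000 entries/file, DR=200 calls/min per thread,
    # rate limit=2,000 calls/min, PT=5, CT=15, B=15.
    print(forecast_minutes(3, 320000, 200, 2000, 5, 15, 15))  # ~42.7 -> ~43 minutes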

Conclusion:

By understanding our dataflow workload types, getting familiar with the different NiFi configurations available, and utilizing the provenance events and bulletin messages NiFi generates, we were able to find bottlenecks and optimize our dataflows accordingly. Key strategies such as fine-tuning hardware resources, designing efficient dataflows, and optimizing controller and processor configurations have significantly enhanced our system's performance. Monitoring and alerting mechanisms have provided real-time insights, enabling us to proactively manage potential bottlenecks.

For those embarking on similar optimization journeys with Apache NiFi, understanding your specific workload characteristics and aligning configurations accordingly will be crucial. By considering the strategies outlined in this blog, you can try to maximize the efficiency, stability, and scalability of your dataflows, ultimately driving greater value for your organization.

Co-author: Shaswat Ranjan
