Provenance Events in Apache Nifi — Key to Building Observability in Dataflows

Ankit Verma
Capillary Technologies
9 min read · Dec 16, 2023

Introduction

Efficient data management is at the heart of every successful enterprise operation, and at Capillary, Apache Nifi provides a robust platform for orchestrating data flows. In this blog, we dive into the critical operational aspects of Nifi dataflows: monitoring, error handling, and reporting. By implementing a standard approach, organizations can enhance the reliability and maintainability of their data processing pipelines. If you are a beginner in Nifi, please read "No-code-way-forward-for-development" before starting with this blog.

Nifi is a robust, well-orchestrated tool that can implement a wide variety of use cases just by connecting a few components on the Nifi canvas. This blog is about expanding the operational capabilities of those dataflows: the volume of processing done, the speed of processing, pipeline status, error handling, and error report generation for each file a dataflow processes.

A Few Terms to Understand Before We Dig Deep

  • FlowFile: The FlowFile represents a single piece of data in Nifi. A FlowFile is made up of two components: FlowFile Attributes and FlowFile Content. Content is the data that is represented by the FlowFile. Attributes are characteristics that provide information or context about the data; they are made up of key-value pairs.
  • Connection: A DataFlow Manager (DFM) creates an automated dataflow by dragging components from the Components part of the Nifi toolbar to the canvas and then connecting the components together via Connections. Each Connection consists of one or more Relationships. For each Connection that is drawn, a DFM can determine which Relationships should be used for the Connection. This allows data to be routed in different ways based on its processing outcome. Each Connection houses a FlowFile Queue. When a FlowFile is transferred to a particular Relationship, it is added to the queue belonging to the associated Connection.
  • Relationship: Each Processor has zero or more Relationships defined for it. These Relationships are named to indicate the result of processing a FlowFile. After a Processor has finished processing a FlowFile, it will route (or “transfer”) the FlowFile to one of the Relationships. A DFM is then able to connect each of these Relationships to other components to specify where the FlowFile should go next under each potential processing result.
  • Lineage Start Date (lineageStartDate): Any time that a FlowFile is cloned, merged, or split, this results in a “child” FlowFile being created. As those children are then cloned, merged, or split, a chain of ancestors is built. This value represents the date and time at which the oldest ancestor entered the system. Another way to think about this is that the attribute lets you measure the latency of the FlowFile through the system: the current time minus this value. The value is a number that represents the number of milliseconds since midnight, Jan. 1, 1970 (UTC).
  • Backpressure: Backpressure limits the number or total size of FlowFiles that can be queued up in a Connection. Once the threshold value is reached, the Connection doesn’t allow any further FlowFiles to build up, and the upstream component is not scheduled to run until the queue drains. The sample flow above shows backpressure in action.
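
To make the Lineage Start Date definition concrete, here is a minimal Python sketch of how latency could be derived from it. The function names and the sample timestamps are made up for illustration; only the "milliseconds since the epoch" encoding comes from the definition above.

```python
from datetime import datetime, timezone


def to_utc(epoch_ms: int) -> datetime:
    """Convert an epoch-milliseconds value (the lineageStartDate encoding) to a UTC datetime."""
    return datetime.fromtimestamp(epoch_ms / 1000, tz=timezone.utc)


def lineage_latency_ms(lineage_start_ms: int, now_ms: int) -> int:
    """Milliseconds elapsed since the oldest ancestor of the FlowFile entered the system."""
    return now_ms - lineage_start_ms


# Hypothetical values: the oldest ancestor entered the flow 90 seconds ago.
entered_ms = 1_702_684_800_000
now_ms = entered_ms + 90_000

print(to_utc(entered_ms).isoformat())
print(lineage_latency_ms(entered_ms, now_ms))  # 90000
```

Because every child FlowFile inherits the same lineage start, this latency describes the whole input file rather than a single split record.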

Tracking all of these operational aspects needs a common source of information about what happened to each FlowFile, and Provenance Events emerge as the key component for this purpose. In the following sections, we will explore in more detail what Provenance Events are and how they can be leveraged to accomplish specific use cases.

What are provenance events?

Each point in a dataflow where a FlowFile is processed in some way is considered a ‘provenance event’. Various types of provenance events occur, depending on the dataflow design. For example,

  • when data is brought into the flow, a RECEIVE event occurs,
  • when data is sent out of the flow, a SEND event occurs.

Other types of processing events may occur, such as

  • if the data is cloned (CLONE event)
  • routed (ROUTE event)
  • modified (CONTENT_MODIFIED or ATTRIBUTES_MODIFIED event)
  • split (FORK event)
  • combined with other data objects (JOIN event)
  • ultimately removed from the flow (DROP event).

Why do we need them?

As these events function much like logs, they are very useful for building generic systems on top of Nifi. Various systems, such as custom APM metrics, error file generation, and more, can be created by leveraging the information encapsulated in these events.

Provenance Events are generated whenever a processor runs and processes a flow file. These events are invaluable because they reflect the state of the flow file at each step of processing. As explained earlier, dataflows consist of processors, so when a file is picked up for processing, numerous provenance events are generated. These events offer insight into every step of the process, giving engineers the information they need to build generic systems that work with current and future dataflows.

Fig 1.

Fig 1 illustrates two use cases where provenance events can be used in a generic way: generating custom metrics and generating reports.

  • Custom Metrics: Nifi inherently produces a substantial amount of default data, such as JVM parameters and Prometheus events, which can be leveraged to establish effective monitoring. For deeper insights aligned with business requirements, provenance events can be utilized to generate custom metrics.
  • Reporting: In a dataflow operating on a file with multiple records, each record may either succeed or fail. This success-failure information is crucial for business teams to identify records not processed successfully by the system, enabling necessary corrective actions.

Our exploration begins with an understanding of the reporting aspect.

  • When a dataflow starts processing an input file, it generates flow files that the processors handle one after another, in the order in which they are connected.
  • Each processor performs operations such as UPDATE, CREATE, and DROP, generating the corresponding provenance events. These events are captured using a Nifi reporting task called SiteToSiteProvenanceReportingTask.
  • The reporting task is configured to send events to a port connected to an InvokeHTTP processor, which calls an API responsible for pushing these events to RabbitMQ.
  • Given the large number of provenance events, processing them directly without an asynchronous layer would create backpressure on Nifi. To mitigate this, a RabbitMQ layer is introduced so that events are pushed straight to RabbitMQ without building backpressure.
  • These events are then processed by the RabbitMQ Listener, which performs two functions: first, it stores the received event in MongoDB, and second, it uses this event to calculate dataflow completion.
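
The decoupling described in the steps above can be sketched in a few lines of Python. This is only an illustration under stated assumptions: an in-memory `queue.Queue` stands in for RabbitMQ, a dictionary stands in for the MongoDB collection, and the `lineageStart` and `eventType` field names are hypothetical rather than the actual event schema.

```python
import queue
from collections import defaultdict

event_queue = queue.Queue()      # stand-in for the RabbitMQ queue (assumption)
event_store = defaultdict(list)  # stand-in for the MongoDB collection (assumption)


def publish(event: dict) -> None:
    """Roughly what the API behind InvokeHTTP does: enqueue the event and return at once."""
    event_queue.put(event)


def drain(on_event) -> int:
    """Listener side: persist each queued event, then hand it to the completion logic."""
    handled = 0
    while True:
        try:
            event = event_queue.get_nowait()
        except queue.Empty:
            return handled
        event_store[event["lineageStart"]].append(event)  # function 1: store the event
        on_event(event)                                    # function 2: completion tracking
        handled += 1


# Two hypothetical events that belong to the same input file.
seen = []
publish({"lineageStart": 1702684800000, "eventType": "CREATE"})
publish({"lineageStart": 1702684800000, "eventType": "FETCH"})
handled = drain(seen.append)
print(handled, len(event_store[1702684800000]))  # 2 2
```

The point of the design is that `publish` never waits on the listener, so slow downstream processing does not push back on Nifi.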

Nifi dataflows are continuous and have no defined end point, so it becomes crucial to determine when all provenance events for an input have been received. This is essential for associating metrics with processing completion. The example below illustrates the process: it fetches a file from an SFTP server and makes API calls to a specified endpoint.

In Fig 2, the ListSFTP processor checks the SFTP directory for files every 5 minutes, using a cron-based trigger. When it discovers a file, the FetchSFTP processor retrieves the file from the SFTP server, generating a flow file in the process. The SplitRecord processor then divides the input FlowFile, which is in a record-oriented data format, into multiple smaller FlowFiles. These smaller FlowFiles are used by the OAuthClientProcessor to make API calls.

Fig 2.

In general, each processor within the dataflow pipeline takes input, performs processing, and produces output. For instance, consider a file processed by the SplitRecord processor, which splits an input FlowFile in a record-oriented data format into multiple smaller FlowFiles. Fig 3 shows this with a sample file.

Fig 3.

SplitRecord takes the one flow file shown in Fig 3 as input and produces 10 flow files as output. The provenance event it generates looks like Fig 4.

Fig 4.

Fig 4 shows the FORK provenance event generated by the SplitRecord processor. Here we are interested in parentUuid and childUuid, which hold the flow-file ids of the input and output flow-files. In this case we have one flow file as input and ten flow files as output.
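
As a rough illustration of what is being read from such an event, the Python sketch below models a FORK event as a dictionary and counts its inputs and outputs. The key names `parentUuids` and `childUuids` and the placeholder ids are assumptions modelled on the field names mentioned above; the real event in Fig 4 carries many more fields.

```python
# Hypothetical FORK event: one parent flow-file split into ten children.
fork_event = {
    "eventType": "FORK",
    "componentName": "SplitRecord",
    "parentUuids": ["parent-0"],
    "childUuids": [f"child-{i}" for i in range(10)],
}


def fan_out(event: dict) -> tuple:
    """Return (number of input flow-files, number of output flow-files) for an event."""
    return len(event.get("parentUuids", [])), len(event.get("childUuids", []))


print(fan_out(fork_event))  # (1, 10)
```

The second number is what the completion check relies on: it tells us how many flow-files the next processor should expect.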

Now let us devise an algorithm to check for data-flow completion, after which error file generation can happen. We will use childUuid (in Fig 4) to calculate the number of output flow files. The lineage start time is constant for a flow-file and all of its descendants, so all the provenance events that belong to one input file can be tracked using the lineage start time.

We arrange each processor's provenance events into two categories: success or failure. We then use these events to calculate the provenance event counts for each processor.

Each processor in Fig 2 generates provenance events, which can be monitored as described in the steps below.

  1. The ListSFTP processor generates one CREATE event when a file is found, so its success count is incremented to 1. Because it is the first processor of the block, its total is also incremented to 1. The algorithm then looks up the next processor, FetchSFTP, and increments its total by the childUuids size, which is 1.
  2. The FetchSFTP processor generates one FETCH event when the file is fetched, so its success count is incremented to 1. The algorithm looks up the next processor, SplitRecord, and increments its total by the childUuids size, which is 1.
  3. The SplitRecord processor generates one FORK event (Fig 4) when the file is split, so its success count is incremented to 1. The algorithm looks up the next processor, OAuthClientProcessor, and increments its total by the childUuids size, which is 10.
  4. The OAuthClientProcessor is the last processor of the dataflow, so its success relationship is terminated, which generates one DROP event for each flow-file it processes, 10 in total. Its success count is therefore incremented to 10. The algorithm looks for a next processor and finds none.
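
The four steps above can be sketched in Python as follows. This is a simplified illustration, not the production implementation: the `FLOW` list, the counter layout, and the `apply_event` helper are hypothetical, and the sketch assumes that only successful events pass flow-files on to the next processor.

```python
from collections import defaultdict

# Processor order taken from Fig 2.
FLOW = ["ListSFTP", "FetchSFTP", "SplitRecord", "OAuthClientProcessor"]


def new_counters():
    """Per-processor counters: expected total, successes, and failures."""
    return defaultdict(lambda: {"total": 0, "success": 0, "failed": 0})


def apply_event(counters, processor, outcome, child_count=1):
    """Fold one provenance event into the counters.

    child_count is the childUuids size of the event (assumed to be 1 when
    the flow-file simply moves on to the next processor).
    """
    if outcome not in ("success", "failed"):
        raise ValueError(f"unknown outcome: {outcome}")
    idx = FLOW.index(processor)
    counters[processor][outcome] += 1
    if idx == 0:
        # The first processor has no upstream processor to set its total.
        counters[processor]["total"] += 1
    if outcome == "success" and idx + 1 < len(FLOW):
        # Tell the next processor how many flow-files to expect.
        counters[FLOW[idx + 1]]["total"] += child_count


counters = new_counters()
apply_event(counters, "ListSFTP", "success", child_count=1)      # CREATE
apply_event(counters, "FetchSFTP", "success", child_count=1)     # FETCH
apply_event(counters, "SplitRecord", "success", child_count=10)  # FORK
for _ in range(10):                                              # ten DROP events
    apply_event(counters, "OAuthClientProcessor", "success")

for name in FLOW:
    print(name, counters[name])
```

After replaying the events from the walkthrough, every processor has a success count equal to its total, which is the condition the completion check looks for.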

How do we calculate the percentage? Percentage = ((Success + Failed) / Total) * 100. If the total is 0, the percentage is evaluated as 0%. Fig 5 shows how this percentage is used to evaluate completion of the data-flow.
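
The formula translates directly into a small Python function. The function name is made up for illustration; the zero-total rule is the one stated above.

```python
def completion_percentage(success: int, failed: int, total: int) -> float:
    """Share of expected flow-files that have finished, whether they succeeded or failed."""
    if total == 0:
        return 0.0  # nothing expected yet, so completion is reported as 0%
    return 100 * (success + failed) / total


print(completion_percentage(10, 0, 10))  # 100.0
print(completion_percentage(4, 1, 10))   # 50.0
print(completion_percentage(0, 0, 0))    # 0.0
```

Note that failed flow-files count towards completion: a processor is "done" once every expected flow-file has produced an outcome, not only when all of them succeed.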

Fig 5.

The Error File Generation cron runs periodically and checks whether every processor in the data flow has reached 100% completion. If all processors are at 100%, an error file is generated from the data saved from provenance events. If any processor is not yet complete, the cron checks again in the next cycle. It uses lineageStartTime to select only the provenance events that belong to a particular input flow-file. Large files may take time to process, so their provenance events may also take time to accumulate and may not all be available within a single cron cycle.
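
A minimal Python sketch of one cron cycle is shown below. The data layout (a dictionary keyed by lineageStartTime that holds per-processor counters) and both function names are assumptions made for illustration; the actual error file generation is left out.

```python
def flow_is_complete(counters: dict) -> bool:
    """True only when every processor has accounted for all of its expected flow-files."""
    if not counters:
        return False
    return all(
        c["total"] > 0 and c["success"] + c["failed"] == c["total"]
        for c in counters.values()
    )


def cron_cycle(runs: dict, generated: set) -> list:
    """Return the lineageStartTime values whose error file can be generated in this cycle.

    runs maps lineageStartTime -> per-processor counters; generated remembers
    the runs that were already handled in an earlier cycle.
    """
    ready = []
    for lineage_start, counters in runs.items():
        if lineage_start in generated:
            continue
        if flow_is_complete(counters):
            generated.add(lineage_start)
            ready.append(lineage_start)
    return ready


# Two hypothetical input files: one finished, one still waiting for events.
runs = {
    1000: {"SplitRecord": {"total": 1, "success": 1, "failed": 0},
           "OAuthClientProcessor": {"total": 10, "success": 9, "failed": 1}},
    2000: {"SplitRecord": {"total": 1, "success": 1, "failed": 0},
           "OAuthClientProcessor": {"total": 10, "success": 6, "failed": 0}},
}
generated = set()
first_cycle = cron_cycle(runs, generated)
print(first_cycle)  # [1000]
```

The incomplete run is simply skipped and picked up again in a later cycle, once its remaining provenance events have arrived.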

Fig 6 presents a few examples of observability stacks that can be built on top of dataflows.

Fig 6.

Conclusion

  • Provenance events are a very useful tool for creating generic systems for Nifi clusters. Used properly, they can solve a wide variety of use cases.
  • Understanding concepts such as Nifi backpressure is critical to designing scalable systems that solve a variety of problems without introducing side effects.
  • Nifi dataflows are continuous, which makes some seemingly obvious problems tricky to solve. As we have seen, even checking whether a flow-file has completed processing required us to write an algorithm.
