In this article, I’ll present the problem of regularly processing large amounts of data in NiFi data flows while being bounded by IOPS. Since NiFi is IOPS-intensive, the issue gets even worse when running in containers over NFS. We’ll also discuss how to keep our data flows monitored without dramatically increasing IOPS. Finally, we’ll design a new architecture for NiFi clusters and their monitoring: running NiFi with its content and FlowFile repositories in RAM (which also boosts our data flows’ performance), while still being able to monitor it without any IOPS bottleneck.
Apache NiFi is an easy-to-use and powerful system to process and distribute data. Using the large variety of built-in processors (and the ability to write new ones), we can build powerful data flows that extract data from sources, transform it as we wish, and push it to different endpoints.
Using NiFi, we can constantly stream huge amounts of data (hundreds of GB, or even TB), either through a few “heavy” data flows or through thousands of data flows. NiFi handles both easily.
When getting into big data technologies, and data flow systems specifically, one cannot ignore one of the main issues: reliability. We must trust our system to get all the data from our sources, transform it exactly as we wish, and send it to the correct output systems. Although NiFi itself is a highly reliable technology, we still want to make sure that the data flows we create work as intended and that no data gets lost along the way, whether because of a bug in a data flow or because of problems with specific NiFi nodes (crashes, disconnections, etc.).
To let us monitor our data flows, NiFi provides the Provenance Repository. Whenever a processor finishes processing a flow file, it produces a Provenance event: a record of what happened to that specific flow file, such as whether it was sent to an external system, cloned into another flow file, or had its content modified. The Provenance Repository is built into NiFi itself and uses Lucene to index these events.
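To make the idea of a Provenance event more concrete, here is a minimal Python sketch of a flow file’s lineage being recorded as processors finish with it. The classes (ProvenanceEvent, ToyProvenanceRepository) are hypothetical illustrations, not NiFi’s API, and the processor names are only labels; the event type names are a subset of the types NiFi records.

```python
import json
import time
import uuid
from dataclasses import asdict, dataclass
from enum import Enum


class ProvenanceEventType(Enum):
    # A subset of the event types NiFi records (the real list is longer).
    RECEIVE = "RECEIVE"
    CONTENT_MODIFIED = "CONTENT_MODIFIED"
    CLONE = "CLONE"
    SEND = "SEND"
    DROP = "DROP"


@dataclass
class ProvenanceEvent:
    event_id: int
    event_type: ProvenanceEventType
    component_name: str   # processor that emitted the event
    flowfile_uuid: str    # flow file the event describes
    timestamp_millis: int
    details: str = ""

    def to_json(self) -> str:
        record = asdict(self)
        record["event_type"] = self.event_type.value  # enums are not JSON-serializable
        return json.dumps(record, sort_keys=True)


class ToyProvenanceRepository:
    """Hypothetical in-memory stand-in for NiFi's Provenance Repository."""

    def __init__(self) -> None:
        self.events = []

    def record(self, event_type, component_name, flowfile_uuid, details=""):
        # Event ids simply increase with every recorded event.
        event = ProvenanceEvent(
            event_id=len(self.events),
            event_type=event_type,
            component_name=component_name,
            flowfile_uuid=flowfile_uuid,
            timestamp_millis=int(time.time() * 1000),
            details=details,
        )
        self.events.append(event)
        return event


# One flow file passing through three processors leaves three events behind.
repo = ToyProvenanceRepository()
flowfile = str(uuid.uuid4())
repo.record(ProvenanceEventType.RECEIVE, "ListenHTTP", flowfile)
repo.record(ProvenanceEventType.CONTENT_MODIFIED, "ReplaceText", flowfile)
repo.record(ProvenanceEventType.SEND, "PublishKafka", flowfile, "sent to Kafka")

lineage = [e.event_type.value for e in repo.events if e.flowfile_uuid == flowfile]
print(lineage)
```

Querying the events for one flow file id is what lets us reconstruct its path through the flows, which is the basis for the monitoring discussed below.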
As of NiFi 1.12.0, there are two Reporting Tasks that handle Provenance Events: AzureLogAnalyticsProvenanceReportingTask and SiteToSiteProvenanceReportingTask. In this article I’ll only discuss the SiteToSiteProvenanceReportingTask, as it is more widely used.
The SiteToSiteProvenanceReportingTask lets us send the Provenance Events to another NiFi instance. In that second instance, we can open an input port and do whatever we’d like with the events. In effect, we build a data flow that ingests the provenance events coming from the first NiFi instance, the one running our main data flows.
If you’re wondering why not use the SiteToSiteProvenanceReportingTask to send the Provenance Events to the same NiFi instance that runs our main data flows, remember that this means using the same resources (CPU, RAM and disk) for processing both your data and your Provenance Events. By separating them into two instances, we make sure the Provenance Events have no impact on our main data flows.
In the Provenance Events data flow on the second NiFi instance, we can add any further processing we wish and send the events to any other data sink.
So, our first NiFi cluster, running our main data flows, sends its Provenance data to a second NiFi cluster, which processes the events and sends them to an endpoint of our choice. This allows advanced investigation of our data flows and of the data ingested through them.
The IOPS Issue
Leaving the Provenance Events cluster aside, let’s look at our main NiFi cluster, which processes TBs of data a day. Every processor that reads, modifies or writes flow file content causes disk IO operations. With thousands of data flows processing thousands of flow files concurrently, we end up with thousands of IO operations per second (“IOPS”).
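A back-of-the-envelope way to see how quickly this adds up, sketched in Python under a deliberately simple assumption: every processor that touches content costs one IO operation per flow file, with no caching or batching. The function name and the load figures in the example are hypothetical.

```python
def estimate_content_iops(flows: int,
                          flowfiles_per_flow_per_sec: float,
                          content_ops_per_flowfile: int,
                          nodes: int = 1) -> float:
    """Rough per-node estimate of content-repository IO operations per second.

    Simplifying assumptions: each content read/modify/write costs exactly one
    IO operation, load is spread evenly across nodes, and there is no caching
    or batching. FlowFile and Provenance repository writes are not included.
    """
    total_ops = flows * flowfiles_per_flow_per_sec * content_ops_per_flowfile
    return total_ops / nodes


# Hypothetical load: 1,000 flows, 5 flow files/s each, 3 content-touching processors.
print(estimate_content_iops(1_000, 5, 3))           # whole cluster: 15000.0
print(estimate_content_iops(1_000, 5, 3, nodes=3))  # per node, 3 nodes: 5000.0
```

Even this modest hypothetical load puts thousands of operations per second on each node from the content repository alone, before the FlowFile and Provenance repositories add their own writes.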
Moreover, in this era of Docker and containers, if you’re like me and run NiFi on Kubernetes with NFS as the underlying storage, a high IOPS load becomes significantly worse: every disk operation now depends on the network.
Such a heavy IOPS load forces us to buy better machines for NiFi and to spend more money simply because we process more data.
As you can see, the main problem lies in the Content Repository, where the content of every flow file is kept. However, NiFi already ships with multiple Content Repository implementations, both persistent (on disk) and ephemeral (in RAM).
The implementation to use is configured in the nifi.properties file, under nifi.content.repository.implementation. The default value is FileSystemRepository, and we’d like to use VolatileContentRepository instead. It also makes sense to set nifi.flowfile.repository.implementation to VolatileFlowFileRepository: once the content is volatile and can be lost, there is no reason for the FlowFile repository to be persistent, since after a restart it would only hold metadata for content that no longer exists.
Changing the content repository implementation can bring new problems, such as Content Repository out-of-space errors (we probably have less RAM than disk) or java.lang.OutOfMemoryError: Java heap space. Thus, when using the VolatileContentRepository, we should configure two additional properties: nifi.volatile.content.repository.max.size, which caps how much content is held in memory, and nifi.volatile.content.repository.block.size, which sets the size of the memory blocks content is allocated in. Configuring them according to the resources we can give our NiFi cluster prevents the errors mentioned above.
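As a sketch of the configuration change, the Python helper below rewrites an existing nifi.properties text with the volatile settings. The helper itself is hypothetical, and the size values (10 GB, 32 KB) are placeholders to be sized for your own nodes. Since the volatile content repository holds content in the JVM heap (hence the heap-space errors above), the maximum heap set in bootstrap.conf needs headroom above nifi.volatile.content.repository.max.size.

```python
# Property names and class names as used by NiFi 1.x; verify them against
# the documentation of the version you run. Sizes are placeholders.
VOLATILE_SETTINGS = {
    "nifi.content.repository.implementation":
        "org.apache.nifi.controller.repository.VolatileContentRepository",
    "nifi.flowfile.repository.implementation":
        "org.apache.nifi.controller.repository.VolatileFlowFileRepository",
    "nifi.volatile.content.repository.max.size": "10 GB",
    "nifi.volatile.content.repository.block.size": "32 KB",
}


def apply_settings(properties_text: str, settings: dict) -> str:
    """Return properties_text with every key in `settings` set to its value."""
    remaining = dict(settings)
    out_lines = []
    for line in properties_text.splitlines():
        stripped = line.strip()
        key = stripped.split("=", 1)[0].strip()
        is_setting = "=" in stripped and not stripped.startswith("#")
        if is_setting and key in remaining:
            # Rewrite an existing setting in place.
            out_lines.append(f"{key}={remaining.pop(key)}")
        else:
            # Keep comments, blank lines and unrelated settings untouched.
            out_lines.append(line)
    # Append settings that were not present in the file at all.
    out_lines.extend(f"{k}={v}" for k, v in remaining.items())
    return "\n".join(out_lines) + "\n"


original = (
    "# Content Repository\n"
    "nifi.content.repository.implementation="
    "org.apache.nifi.controller.repository.FileSystemRepository\n"
    "nifi.flowfile.repository.implementation="
    "org.apache.nifi.controller.repository.WriteAheadFlowFileRepository\n"
)
patched = apply_settings(original, VOLATILE_SETTINGS)
print(patched)
```

Rewriting in place rather than appending duplicates avoids ambiguity about which value of a repeated key NiFi ends up using.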
By moving the content repository from disk to RAM, we not only dramatically reduce the IOPS NiFi causes but also improve our data flows’ performance, as RAM operations are much faster than disk operations (and the gap is even wider in containers, where disk operations go over the network). One might say that we now need expensive RAM instead of SSDs or cheap hard disks. However, thanks to the performance boost, our data doesn’t stay in the content repository for long, so we need far less RAM than the disk space we needed before.
For example, where I previously ran a NiFi cluster on Kubernetes with 7 nodes, each with 500 GB HDD, 32 GB RAM and 12 CPU cores, I can now run a cluster of 3 nodes, each with 50 GB HDD, 50 GB RAM and 8 CPU cores. The CPU reduction comes from the better performance of the RAM-based Content Repository. So we get a smaller cluster with fewer resources overall (only the RAM per node grew), and I’ve managed to reduce the IOPS from around 8,000 per node to around 400. Of course, besides the cost savings, we’ve also boosted our data flows’ performance, allowing the NiFi cluster to process about 5 times more data a day.
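The before/after figures are easier to compare as cluster-wide totals. The small Python sketch below only multiplies the per-node numbers from the example above by the node count; ClusterSpec is a hypothetical helper, and the IOPS values are the approximate per-node figures quoted in the text.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ClusterSpec:
    nodes: int
    disk_gb_per_node: int
    ram_gb_per_node: int
    cores_per_node: int
    iops_per_node: int  # approximate figure per node

    def totals(self) -> dict:
        """Cluster-wide totals: per-node figures multiplied by node count."""
        return {
            "disk_gb": self.nodes * self.disk_gb_per_node,
            "ram_gb": self.nodes * self.ram_gb_per_node,
            "cores": self.nodes * self.cores_per_node,
            "iops": self.nodes * self.iops_per_node,
        }


before = ClusterSpec(nodes=7, disk_gb_per_node=500, ram_gb_per_node=32,
                     cores_per_node=12, iops_per_node=8_000)
after = ClusterSpec(nodes=3, disk_gb_per_node=50, ram_gb_per_node=50,
                    cores_per_node=8, iops_per_node=400)

for name, b in before.totals().items():
    a = after.totals()[name]
    print(f"{name:8} {b:>7} -> {a:>6}  ({b / a:.1f}x smaller)")
```

Cluster-wide, disk drops from 3,500 GB to 150 GB, RAM from 224 GB to 150 GB, cores from 84 to 24, and IOPS from about 56,000 to about 1,200, so the extra RAM is a per-node increase only.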
As for IOPS, the only significant source left in our NiFi cluster is the Provenance Repository (which could also be made volatile, but as mentioned before, we use it to monitor our data flows, so we’d like to keep it persistent).
However, we now face two new problems:
1. Our NiFi is not reliable anymore. When the content repository was on disk, we could be sure that even if a NiFi node crashed, our data would survive (barring disk corruption). Now that our content lives in RAM, a crash means that data is lost.
2. Remember our second NiFi cluster for ingesting Provenance events? Now that the first cluster can process much more data per second, it produces even more Provenance events, so the second cluster causes even more IOPS.
Let’s solve both problems:
1. As mentioned before, reliability is probably one of the most important issues in Big Data, and moving to the Volatile Content Repository lowered the reliability of our data flows. Fortunately, NiFi is a streaming data flow system: the data we process is ingested into our flows from different sources. If we can simply reflow the data from its source on demand, the problem is solved: whenever a NiFi node crashes, we just reflow the data. If that’s not the case, we need some kind of “Store First” layer before the data reaches our NiFi cluster. This could be done, for example, by putting Kafka topics between the sources and NiFi, and configuring Kafka with a retention time long enough for us to re-consume the data in case of a node crash.
If we cannot control how we get the data from the sources, we could add another NiFi instance, again a persistent one, with very short flows: listen to the sources as we always do and publish to Kafka. Since each message only needs to be written once and read once, and the rest of the processing happens on the Volatile Content NiFi, we get slightly more IOPS than with the volatile cluster alone, but far fewer than before. For example, if a single flow had 10 processors that each read and write the flow file content, that is about 20 content operations per flow file; the short persistent flow needs only 2 (one write when receiving, one read when publishing), roughly 10 times fewer, and that saving is multiplied by every flow file the flow processes. Multiply it again by every flow that touches content, and you can see the scale of the reduction.
2. After switching the first cluster to the Volatile Content Repository, we significantly reduced the IOPS. However, we have solved only half of the IOPS problem, since the second NiFi cluster remains. When we are bounded by an IOPS limit, we cannot afford the large number of IOPS the Provenance Events cluster generates.
We could switch the second cluster to volatile repositories as well, but the Provenance Events are what we use to verify that none of our data is lost, so the flows that ingest them must be reliable. Don’t get me wrong: the data we process is obviously much more important than the Provenance data. But with the Volatile Content Repository, our NiFi cluster can hold less data, so we might hit back pressure during peaks, and that is exactly when the Provenance data becomes most valuable, because it keeps us updated about the delayed data. So we don’t want a volatile repository there, and the File System repository is too IO-intensive. Thus, we’d like a way to ingest our Provenance Events without a second NiFi instance. The problem is that NiFi only ships with the two Reporting Tasks mentioned above, so we’ll solve this by writing a new Reporting Task that consumes Provenance Events and publishes them to Kafka topics.
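Going back to the first solution, the “Store First” idea relies on one property of Kafka: records stay available for the retention period, so a consumer can rewind and read them again. The Python sketch below models that with a toy, count-based retention window. RetainedLog is hypothetical (real Kafka retention is time- or size-based, for example via the retention.ms topic setting), and the replay offset stands for whatever offset you rewind to, one known to precede the crash.

```python
class RetainedLog:
    """Toy stand-in for one Kafka topic partition (hypothetical).

    Keeps only the most recent `retention` records, a count-based
    simplification of Kafka's time/size-based retention.
    """

    def __init__(self, retention: int) -> None:
        self.retention = retention
        self.records = []      # (offset, value) pairs, oldest first
        self.next_offset = 0

    def append(self, value) -> int:
        offset = self.next_offset
        self.records.append((offset, value))
        self.next_offset += 1
        # Drop records that fell out of the retention window.
        if len(self.records) > self.retention:
            self.records = self.records[-self.retention:]
        return offset

    def read_from(self, offset: int) -> list:
        """Return every retained value from `offset` onward (a replay)."""
        earliest = self.records[0][0] if self.records else self.next_offset
        if offset < earliest:
            raise LookupError(
                f"offset {offset} already removed by retention; earliest is {earliest}")
        return [value for off, value in self.records if off >= offset]


log = RetainedLog(retention=100)
for i in range(10):
    log.append(f"msg-{i}")

# Suppose the volatile NiFi node had fully delivered msg-0..msg-3 before it
# crashed and lost everything it held in RAM: rewind to offset 4 and replay.
replayed = log.read_from(4)
print(replayed)

# With a retention window that is too short, the data we need is gone.
short = RetainedLog(retention=3)
for i in range(10):
    short.append(i)
try:
    short.read_from(4)
except LookupError as err:
    print("cannot replay:", err)
```

If the retention window is shorter than the time it takes to notice the crash and replay, the offsets we need are already gone, which is why the retention time has to be generous.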
Why Kafka? Because Kafka is a streaming platform with modest hardware requirements that generates very little IO. It writes by appending sequentially to its log and relies on the OS page cache, so as long as our consumers keep up with the producers, reads are served from memory and cause almost no disk IO.
Writing this new reporting task wasn’t difficult at all: I essentially combined the SiteToSiteProvenanceReportingTask with the PublishKafka processor (using the relevant Kafka client version).
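The core of such a task is a cursor over provenance event ids: read the next batch after the last id we shipped, publish each event, then store the new cursor. The Python sketch below models only that loop. In a real NiFi reporting task this would be Java code in onTrigger, reading events through the reporting context and sending them with a Kafka producer (as far as I can tell, SiteToSiteProvenanceReportingTask tracks its position in a similar way). Here the repository, the producer and the persisted state are plain in-memory stand-ins, and the class name, the topic name nifi-provenance and the JSON field names (eventId, eventType) are assumptions for illustration.

```python
import json
from typing import Callable, Dict, List

Event = Dict[str, object]


class ProvenanceToKafkaTask:
    """Toy model of a provenance-to-Kafka reporting task (hypothetical).

    fetch_events(first_id, max_records) stands in for reading the Provenance
    Repository, publish(topic, message) stands in for a Kafka producer, and
    self.state plays the role of the task's persisted state.
    """

    def __init__(self, fetch_events: Callable[[int, int], List[Event]],
                 publish: Callable[[str, str], None],
                 topic: str, batch_size: int = 1000) -> None:
        self.fetch_events = fetch_events
        self.publish = publish
        self.topic = topic
        self.batch_size = batch_size
        self.state = {"last_event_id": -1}  # nothing sent yet

    def on_trigger(self) -> int:
        """Send the next batch of events; return how many were sent."""
        first_id = self.state["last_event_id"] + 1
        events = self.fetch_events(first_id, self.batch_size)
        for event in events:
            # One JSON message per provenance event.
            self.publish(self.topic, json.dumps(event, sort_keys=True))
        if events:
            # Advance the cursor only after the whole batch was published, so
            # a failure mid-batch re-sends events (at-least-once) rather than
            # skipping them.
            self.state["last_event_id"] = events[-1]["eventId"]
        return len(events)


# In-memory stand-ins for the repository and the Kafka producer.
repository = [{"eventId": i, "eventType": "SEND"} for i in range(2_500)]
sent = []


def fetch(first_id: int, max_records: int) -> List[Event]:
    return [e for e in repository if e["eventId"] >= first_id][:max_records]


task = ProvenanceToKafkaTask(fetch, lambda topic, msg: sent.append((topic, msg)),
                             topic="nifi-provenance")
counts = [task.on_trigger() for _ in range(4)]
print(counts, len(sent), task.state)
```

Four triggers over 2,500 stored events send batches of 1,000, 1,000 and 500, then nothing, leaving the cursor at event 2,499. Publishing before advancing the cursor means a crash can cause duplicates in Kafka but not gaps, which suits monitoring, where missing events are worse than repeated ones.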
Once our Provenance Events are published to Kafka, we can consume them into any other data pipeline or CI/CD system and send them to any endpoint of our choice. I found Logstash to be the most suitable tool for my case. Logstash lets us consume from Kafka, process (“filter”) the Provenance Events as we wish (with dedicated filter plugins, or with arbitrary logic in the Ruby filter plugin) and send them wherever we want. In my case, I used the Ruby plugin to flatten any nested JSON and sent the results to Elasticsearch. The most important benefit of Logstash in our scenario is that it causes almost no IOPS: with its default in-memory queue, events pass through RAM while being processed. Moreover, Logstash is simple to install and maintain, especially on Kubernetes. So we achieve the processing our second NiFi cluster used to do, without the IOPS and probably with far fewer resources, as Logstash needs few resources while performing well. There is no reason to allocate as many resources as we would have for the NiFi cluster.
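The flattening step itself is small. In the setup described above it runs inside Logstash’s Ruby filter; the sketch below shows the same idea in Python to match the other examples, so it is an illustration of the logic rather than the actual filter code. The `_` separator is an assumption: Elasticsearch treats dots in field names as object paths, so a dot separator would effectively re-nest the fields. The sample event’s field names are illustrative.

```python
import json


def flatten(value, parent_key="", sep="_"):
    """Flatten nested dicts and lists into a single-level dict.

    Nested keys are joined with `sep`, and list items use their index as
    the key segment. Empty dicts and lists produce no keys.
    """
    flat = {}
    if isinstance(value, dict):
        children = value.items()
    elif isinstance(value, list):
        children = enumerate(value)
    else:
        # Leaf value: store it under the key path built so far.
        return {parent_key: value}
    for key, child in children:
        child_key = f"{parent_key}{sep}{key}" if parent_key else str(key)
        flat.update(flatten(child, child_key, sep))
    return flat


event = json.loads("""
{"eventId": 7, "eventType": "SEND",
 "updatedAttributes": {"filename": "a.csv", "kafka": {"topic": "t1"}},
 "parentIds": ["p1", "p2"]}
""")
print(flatten(event))
```

Every nested attribute becomes a top-level field such as updatedAttributes_filename, which keeps the documents sent to Elasticsearch flat and easy to query.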
In conclusion, with this new architecture we took a large NiFi cluster that caused a lot of IOPS and, using NiFi’s built-in properties combined with IOPS-friendly systems, turned it into a reliable, cheaper, monitored and faster NiFi cluster without much effort. Its only remaining IOPS come from the Provenance data we use to monitor our flows.