Guidelines for uploading HDFS FSimage into Hive for monitoring and analytics

Boris Murashin
X5 Tech
Published in
9 min readFeb 7, 2023
Image generated via Midjourney

Hello Everyone! My name is Boris Murashin, and I am a system architect responsible for bigdata platform development at X5 Tech. In this article, I will talk about my experience with a Hadoop cluster: I will show how using a third-party library, I managed to implement fast upload of an HDFS file system image into Hive. And not just that. I hope this guide will help you save a lot of time dealing with the cluster.

How much disk space do Hive tables use in HDFS? Which of them have lots of small files? How do these numbers change with time? What’s happening in users’ home directories? Who is creating a table with partitioning by timestamp right now and will soon put the NameNode down with a GC pause? Let’s figure it out!

The foundation of the Hadoop cluster is its HDFS file system. Sooner or later, system administrators will have to monitor its performance. Even experienced and responsible users can forget a million files here and there. For a large cluster, with thousands of active users, including probationary employees, contractors, and managers, proper monitoring is indispensable.

Cluster users usually don’t keep track of the number and size of their files — this is understandable, since basically it’s not their job. And they can easily create tables with an average file size of, say, 100Kb, when they could have it 10Mb easily. For NameNodes, that means 100 times the Heap size and 100 times the number of RPC requests.

Quite often, we see people with RDBMS background (Oracle, GreenPlum, etc.), who come and do partitioning by three fields with 1000 unique values in each. In this case NameNode is trying to create 1000³ folders — and even if a GC pause doesn’t get it down when these folders are created, this will definitely happen when they are deleted.

What are the options?

So, what out-of-the-box HDFS monitoring options do we have? Not that many, actually:

1.

hdfs dfs -count -v /apps/hive/warehouse/*/* /user/* — with this command we can see what is going on in the Hive storage, home directories, etc. This is a completely viable option. We cannot build a distribution by file size — since we will not see tables where there are several large and a bunch of small files / empty partitions, but we will be able to evaluate the average size of CONTENT_SIZE / (DIR_COUNT + FILE_COUNT) and find the ‘industry leaders’.

If NameNodes’ heap is small, this will work rather quickly. I have 130 million files in HDFS, 70 million files in Warehouse, and the Heap of 90Gb — this command forces the JVM to look through the half of its heap, which, frankly, is not fast (in the screenshot above it took more than two minutes).

In addition, the execution of this command results in the higher RPC latencies in the NameNode — this means that the NameNode slows down, which is not great at all.

2.

hdfs oiv – Offline Image Viewer. It allows to run a truncated version of the NameNode and can export the image to XML or CSV, which can be fed into Hive directly.
A solution we looked for? Not exactly. Of course, this way we avoid loading the active NameNode with extra work and it opens up a direct path to analyzing the FSImage with SQL. But oiv works in a single thread, and its very slow with larger images. For instance, it takes 20 minutes to load a 14Gb FSImage, and an hour to export it to CSV:

Still, there’s room for improvement: it would be nice if it takes just minutes and leverages the power and flexibility of SQL for analysis. The HFSA library developed by Marcel May allows to make it happen. It uses part of the Apache Hadoop code to load the FSImage and implements multi-threaded image traversal.

Another great thing about HFSA is that it has Hadoop 3.3 for dependencies, which supports Java 11. The difference in loading speed of large FSImages compared to Java 8 is dramatic — 70 seconds versus 15 minutes for a 14Gb image:

By the way, marcelmay also has an exporter for Prometheus based on this library, which collects statistics by users/groups/directories, including the number of files, folders and size distribution. In some cases this may be sufficient.

Our own solution

I also wanted to be able to find abandoned tables, which haven’t been accessed for read or write for at least six months. Besides, we use Zabbix for monitoring at X5 Tech. So, I opted to build my own image handler that supports saving to Hive.

We need to make a short digression to explain the choice of Hive. Ranger (a tool used for managing and auditing access to a cluster) saves audit logs to HDFS in JSON format, which allows to conveniently hook them up as an external table in Hive without additional effort:

CREATE TEMPORARY EXTERNAL TABLE monitoring.ranger_log_hiveServer2_20221025_tmp
(
repoType INT,
repo STRING,
reqUser STRING,
evtTime STRING,
access STRING,
resource STRING,
..................
)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
WITH SERDEPROPERTIES ('ignore.malformed.json' = 'true')
LOCATION '/ranger/audit/hiveServer2/20221025';

CREATE TEMPORARY TABLE monitoring.ranger_log_hiveServer2_20221025_tmp2
STORED AS ORC
AS SELECT * FROM monitoring.ranger_log_hiveServer2_20221025_tmp;

INSERT OVERWRITE TABLE monitoring.ranger_log_hiveServer2 PARTITION (\`dt\`=20221025)
SELECT * FROM monitoring.ranger_log_hiveServer2_20221025_tmp2;

Besides, these logs are quite large — about 700Gb per day in JSON, 35Gb in ORC with Snappy compression. If you want to store the logs, say, for 4 years and process them fast enough, Hadoop cluster and Hive are good fit.

Information from FSImage becomes a valuable addition to the Ranger audit data: if we look, for example, at user activity on a table, it’s good to immediately have figures on the number, size, and age of the files in it.

However, parsing the Ranger logs and building corresponding data marts (the raw logs cannot be given to end users, since they contain the full text of requests with all the confidential information that may be contained there) is a topic for a another article.

So, we will save the image in the following table:

CREATE TABLE IF NOT EXISTS monitoring.fsimage
(
Path string COMMENT 'Full path of the file or directory.',
Replication int COMMENT 'Replication factor.',
ModificationTime bigint COMMENT 'The date of modification.',
AccessTime bigint COMMENT 'Date of last access.',
PreferredBlockSize int COMMENT 'The size of the block used.',
BlocksCount int COMMENT 'Number of blocks.',
FileSize bigint COMMENT 'File size.',
NSQUOTA bigint COMMENT 'Files+Dirs quota, -1 is disabled.',
DSQUOTA bigint COMMENT 'Space quota, -1 is disabled.',
Permission string COMMENT 'Permissions used, user, group (Unix permission).',
UserName string COMMENT 'Owner.',
GroupName string COMMENT 'Group.'
)
PARTITIONED BY (Parsed int)
STORED AS ORC;

This structure allows us to save an image to a new partition each time, without affecting current requests. And of cause, it allows to collect the history.

And here is the parser (an excerpt, full code is available by the link below), which writes two dozen ORC files in parallel:

// Traverse file hierarchy
new FsVisitor.Builder()
.parallel()
.visit(fsimageData, new FsVisitor(){
Object lock = new Object();

@Override
public void onFile(FsImageProto.INodeSection.INode inode, String path) {
// get file path, permissions and size
String fileName = ("/".equals(path) ? path : path + '/') + inode.getName().toStringUtf8();
FsImageProto.INodeSection.INodeFile f = inode.getFile();
PermissionStatus p = fsimageData.getPermissionStatus(f.getPermission());
long size = FsUtil.getFileSize(f);

// check if orcWriter is already created in this thread
int threadId = (int) Thread.currentThread().getId();
int threadIndex = ArrayUtils.indexOf(index, threadId);
Writer orcWriter = null;
VectorizedRowBatch batch = null;
if (threadIndex == -1) {
// if not – create a new orcWriter
try {
synchronized (lock) {
orcWriter = OrcFile.createWriter(new Path(args[1] + threadId + ".orc"),
OrcFile.writerOptions(conf)
.compress(CompressionKind.SNAPPY)
.setSchema(schema));
batch = schema.createRowBatch();

orcWriters[next] = orcWriter;
orcBatches[next] = batch;
index[next] = threadId;
next++;
}
}
}
else
{
// if it is created, we use the existing orcWriter
orcWriter = orcWriters[threadIndex];
batch = orcBatches[threadIndex];
}

int row = batch.size++;

// write a new row to batch
((BytesColumnVector) batch.cols[0]).setVal(row, fileName.getBytes(StandardCharsets.UTF_8));
((LongColumnVector) batch.cols[1]).vector[row] = f.getReplication();
// and so on – ModificationTime, AccessTime, PreferredBlockSize, NsCo NsQuota, DsQuota, Permission, User, Group – см. полный исходник на github по ссылке далее
...........................................................................

// if the batch size reaches the limit, we add it to orcWriter and reset it
if (batch.size == batch.getMaxSize()) {
try {
orcWriter.addRowBatch(batch);
}
batch.reset();
}

}

@Override
public void onDirectory(FsImageProto.INodeSection.INode inode, String path) {
// more or less the same as in onfile(), see the full source code on github at the link below
..
}

}
);

The full source can be downloaded here.

The compiled .jar and the necessary native libraries (libhadoop.so, libhdfs.so, libsnappy.so, …) can be found here.

Besides, native Hadoop libraries can be taken from the official archive in the lib/native folder. The Snappy library can be found on any cluster node — the specific location will vary for each distribution, for HDP it is in /usr/hdp/current/hadoop-client/lib/native/

Results:

It takes about a minute to fetch the actual image from the NameNode (hdfs dfsadmin -fetchImage). Then one more minute to load the image (~130 million files, 14Gb) into memory and another 3 minutes to write ORC files.

The resulting .orc files are written into a new partition — this takes about a minute more:

timestamp=$(date +"%s");
hdfs dfs -mkdir /apps/hive/warehouse/monitoring.db/fsimage/parsed=${timestamp}
hdfs dfs -put ./orc/fsimage*.orc /apps/hive/warehouse/monitoring.db/fsimage/parsed=${timestamp}

/usr/bin/beeline -u "jdbc:hive2://hive-jdbc-url" -e "MSCK REPAIR TABLE monitoring.fsimage"

And after about 6 minutes, the actual FSImage is in Hive.

Image analysis

So, what are the benefits that we get from all this?

We can get a distribution of last access time to files in tables, home directories and other folders by users:

SELECT
username,
REGEXP_EXTRACT(path,'(\/apps\/hive\/warehouse\/[^\/]+)|(\/user\/[^\/]+)|(\/some\/folder\/[^\/]+)|(\/[^\/]+)',0) AS folder,
COUNT(1) AS 30days,
SUM(IF (accesstime < UNIX_TIMESTAMP(CURRENT_TIMESTAMP() - INTERVAL '60' DAY)*1000, 1, 0)) as 60days,
SUM(IF (accesstime < UNIX_TIMESTAMP(CURRENT_TIMESTAMP() - INTERVAL '90' DAY)*1000, 1, 0)) as 90days,
SUM(IF (accesstime < UNIX_TIMESTAMP(CURRENT_TIMESTAMP() - INTERVAL '180' DAY)*1000, 1, 0)) as 180days,
SUM(IF (accesstime < UNIX_TIMESTAMP(CURRENT_TIMESTAMP() - INTERVAL '360' DAY)*1000, 1, 0)) as 360days
FROM
monitoring.fsimage
WHERE
accesstime > 0
AND
accesstime < UNIX_TIMESTAMP(CURRENT_TIMESTAMP() - INTERVAL '30' DAY)*1000
AND
parsed IN (SELECT MAX(parsed) as max_parsed FROM monitoring.fsimage)
GROUP BY username, REGEXP_EXTRACT(path,'(\/apps\/hive\/warehouse\/[^\/]+)|(\/user\/[^\/]+)|(\/some\/folder\/[^\/]+)|(\/[^\/]+)',0)
WITH ROLLUP
ORDER BY username, 360days;

A distribution of file sizes in tables/folders by users:

SELECT
username,
REGEXP_EXTRACT(path,'(\/apps\/hive\/warehouse\/[^\/]+)|(\/user\/[^\/]+)|(\/some\/folder\/[^\/]+)|(\/[^\/]+)',0) AS folder,
SUM(IF (filesize < 131072, 1, 0)) as 128Kb,
SUM(IF (filesize < 262144, 1, 0)) as 512Kb,
SUM(IF (filesize < 2097152, 1, 0)) as 2Mb,
SUM(IF (filesize < 8388608, 1, 0)) as 8Mb,
COUNT(1) AS 16Mb
FROM
monitoring.fsimage
WHERE
filesize < 16777216
AND
parsed IN (SELECT MAX(parsed) as max_parsed FROM monitoring.fsimage)
GROUP BY username, REGEXP_EXTRACT(path,'(\/apps\/hive\/warehouse\/[^\/]+)|(\/user\/[^\/]+)|(\/some\/folder\/[^\/]+)|(\/[^\/]+)',0)
WITH ROLLUP
ORDER BY username, 128Kb

A list of databases/tables with last access/modification time, number of files and folders, and total size:

SELECT
MAX(accesstime) AS last_accessed,
MAX(modificationtime) AS last_modified,
SUM(filesize) as size,
SUM(IF (accesstime = 0, 1, 0)) as dirs,
SUM(IF (accesstime > 0, 1, 0)) as files,
folder,
REGEXP_EXTRACT(folder, '\/apps\/hive\/warehouse\/([^\/]+)\.db', 1) AS db,
REGEXP_EXTRACT(folder, '\/apps\/hive\/warehouse\/[^\/]+\.db\/([^\/]+)', 1) AS tbl
FROM (
SELECT
accesstime,
modificationtime,
filesize,
REGEXP_EXTRACT(path,'(\/apps\/hive\/warehouse\/[^\/]+)|(\/user\/[^\/]+)|(\/some\/folder\/[^\/]+)|(\/[^\/]+)',0) AS folder
FROM
monitoring.fsimage
WHERE
(SUBSTR(path, 0, 20) == '/apps/hive/warehouse' or substr(path, 0, 5) == '/user' or substr(path, 0, 13) == '/some/folder')
AND
parsed IN (SELECT MAX(parsed) as max_parsed FROM monitoring.fsimage)
) t
GROUP BY folder

And so on and so forth — it all depends upon your enough imagination and the knowledge of SQL.

The results of such requests are sent to PostgreSQL, and then presented in Grafana:

Conclusion

Using a third-party library, I implemented fast upload of an HDFS file system image to Hive, which makes it possible to analyze it using SQL, build reports in BI systems, create complex triggers in monitoring systems, etc.

This also helps maintain the optimal cluster performance, promptly respond to dangerous events (such as an explosive growth in the number of files), reasonably increase HDFS quotas for users (or, conversely, restrict their usage of HDFS space).

Besides, I hope that my experiment will save you a lot of time. It took me five weeks to find and test the solutions described in this article. With this instruction, I think it can be done in a few days.

--

--