Hadoop data storage from architecture to compression and serialization

Zihan Guo
Data Alchemist
Published in
6 min read · Sep 5, 2019

Motivations

Hadoop is fundamental for building data engineering pipelines, so let’s devote some time to summarizing and organizing what we know about Hadoop data storage, starting from an architecture perspective.

General Architecture

HDFS uses a master-worker architecture with two types of servers: the namenode and the datanodes. The namenode stores file system metadata (directory tree, file names, block locations, etc.), while the datanodes store the application data, replicated across nodes to ensure reliability. Files are broken into blocks of a default size (64 MB in Hadoop 1.x, 128 MB since Hadoop 2; this is configurable), and the blocks are what the datanodes store. Let’s look at some tricks for configuring HDFS properties.
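To make the block and replication arithmetic concrete, here is a minimal sketch. The function name block_layout and the 128 MB / 3-replica defaults are assumptions chosen for illustration (pass 64 MB to model older defaults); it relies on the fact that the last block of a file only occupies as much space as it actually contains.

```python
MB = 1024 * 1024

def block_layout(file_size_bytes, block_size_bytes=128 * MB, replication=3):
    """Return (number of blocks, size of the last block, raw bytes stored cluster-wide)."""
    if file_size_bytes == 0:
        return 0, 0, 0
    # Ceiling division: a partially filled last block still counts as a block.
    num_blocks = (file_size_bytes + block_size_bytes - 1) // block_size_bytes
    last_block = file_size_bytes - (num_blocks - 1) * block_size_bytes
    # Every block is stored `replication` times, but a short last block is not padded.
    raw_bytes = file_size_bytes * replication
    return num_blocks, last_block, raw_bytes

blocks, last, raw = block_layout(300 * MB)
print(blocks, last // MB, raw // MB)  # 3 44 900
```

A 300 MB file therefore becomes two full 128 MB blocks plus one 44 MB block, and with three replicas it consumes 900 MB of raw disk across the cluster.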

Some Tricks

Trick 1: To change the block size for a single upload, override the configuration setting on the command line.

hadoop fs -D fs.local.block.size=134217728 -put local_name remote_location
# 134217728 bytes = 128 MB. Other versions of Hadoop use dfs.block.size (renamed dfs.blocksize in Hadoop 2 and later) instead of fs.local.block.size.

Trick 2: Sometimes the datanode or namenode does not start up. What to do?

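step 1: kill all running servers
step 2: format the namenode (warning: this wipes the HDFS metadata, so any data already stored becomes inaccessible; only do this on a fresh or disposable cluster):
bin/hadoop namenode -format
step 3: restart the MapReduce and HDFS daemons.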

Trick 3: How to configure the replication factor and block size for HDFS

step 1: open conf/hdfs-site.xml
step 2: change the following properties
<property>
<name>dfs.replication</name>
<value>3</value>
<description>Block Replication</description>
</property>
note: similarly, you can change the block size as well, since the configuration files contain all the properties of the HDFS system (the property is named dfs.blocksize in Hadoop 2 and later)
<property>
<name>dfs.block.size</name>
<value>134217728</value>
<description>Block size</description>
</property>
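After editing hdfs-site.xml by hand, it can be worth checking that the file is still well-formed before restarting the daemons. The following is a small sketch using only the Python standard library; the HDFS_SITE string and the read_properties helper are hypothetical stand-ins for a real configuration file, not part of Hadoop.

```python
import xml.etree.ElementTree as ET

# Hypothetical file contents; in practice this would be read from conf/hdfs-site.xml.
HDFS_SITE = """<configuration>
  <property>
    <name>dfs.replication</name>
    <value>3</value>
  </property>
  <property>
    <name>dfs.block.size</name>
    <value>134217728</value>
  </property>
</configuration>"""

def read_properties(xml_text):
    """Parse the XML (raises ET.ParseError on unclosed tags) and return name -> value."""
    root = ET.fromstring(xml_text)
    return {p.findtext("name"): p.findtext("value") for p in root.iter("property")}

props = read_properties(HDFS_SITE)
print(props["dfs.replication"], int(props["dfs.block.size"]) // (1024 * 1024), "MB")  # 3 128 MB
```

A missing slash in a closing tag, such as writing <name> where </name> is needed, makes the parser raise an error instead of silently producing a broken configuration.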

Block Caching

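Frequently read blocks can be explicitly cached in a datanode’s memory; this is called block caching. Normally a datanode reads blocks from disk, but when a block is held in memory instead, YARN applications and other job schedulers can take advantage of the performance benefit by running tasks on the datanode where the block is cached.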

File Read Flow

When reading data from HDFS, the client opens a file by calling open on the FileSystem object, which then contacts the namenode using RPC calls. The namenode keeps block locations in memory for efficiency and does not serve any application data itself; it only returns metadata, i.e. the addresses of the datanodes that hold a copy of each block. Once FileSystem has obtained the locations from the namenode, it creates an FSDataInputStream object and returns it to the client, which uses it to retrieve block data from the datanodes. Next, the client calls read on the FSDataInputStream, which connects to the first (closest, in terms of network topology such as rack placement) datanode for the first block, based on the information from the namenode. As the client keeps reading, the stream connects to the best datanode for each subsequent block until the end of the file is reached, at which point the client calls close on the FSDataInputStream.
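The notion of the "closest" datanode can be sketched as a distance in a tree of data centers, racks and nodes, as described in Hadoop: The Definitive Guide: the distance between two nodes is the number of hops from each to their closest common ancestor. The location strings and function names below are assumptions for illustration, not Hadoop's actual API.

```python
def distance(a, b):
    """Network distance between two locations written as '/datacenter/rack/node'."""
    path_a = a.strip("/").split("/")
    path_b = b.strip("/").split("/")
    common = 0
    for x, y in zip(path_a, path_b):
        if x != y:
            break
        common += 1
    # Hops from each node up to the closest common ancestor.
    return (len(path_a) - common) + (len(path_b) - common)

def closest_replica(client, replicas):
    """Pick the replica location with the smallest distance to the client."""
    return min(replicas, key=lambda r: distance(client, r))

client = "/d1/r1/n1"
print(distance(client, "/d1/r1/n1"))  # 0: same node
print(distance(client, "/d1/r1/n2"))  # 2: same rack
print(distance(client, "/d1/r2/n3"))  # 4: different rack, same data center
print(distance(client, "/d2/r3/n4"))  # 6: different data center
print(closest_replica(client, ["/d2/r3/n4", "/d1/r2/n3", "/d1/r1/n2"]))  # /d1/r1/n2
```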

File Write Flow

Writing to a file in HDFS is similar, with two additional mechanisms. First, as data is written it is split into packets, which are added to an internal data queue. The queue is consumed by the DataStreamer, which asks the namenode to allocate new blocks and to pick suitable datanodes to store the replicas. The DataStreamer sends each packet to the first datanode, which forwards it to the second datanode, which in turn forwards it to the third, forming a pipeline. Second, FSDataOutputStream keeps an ack queue, from which a packet is removed only once it has been acknowledged by all the datanodes in the pipeline. If a datanode fails during the write, the current pipeline is closed and all packets in the ack queue are placed back at the head of the data queue, so that datanodes downstream of the failed node do not miss any packets. A new pipeline is then built from the two remaining good datanodes. The namenode notices that the block is under-replicated and arranges for another replica to be created on a further datanode. If more than one datanode fails, the write still succeeds as long as at least dfs.namenode.replication.min replicas (default = 1) are written.
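The interplay between the data queue, the ack queue and a failing datanode can be illustrated with a toy simulation. This is a deliberately simplified sketch, not how the HDFS client is implemented: acknowledgements are treated as instantaneous, so the ack queue never holds more than one packet, and the names write_block, failing_node and fail_on_packet are invented for the example.

```python
from collections import deque

def write_block(packets, pipeline, failing_node=None, fail_on_packet=None, min_replicas=1):
    """Simulate writing packets through a datanode pipeline with one optional failure."""
    data_queue = deque(packets)  # packets waiting to be sent
    ack_queue = deque()          # packets sent but not yet acknowledged by every datanode
    pipeline = list(pipeline)
    stored = {node: [] for node in pipeline}

    while data_queue:
        packet = data_queue.popleft()
        ack_queue.append(packet)
        if packet == fail_on_packet and failing_node in pipeline:
            # A datanode died mid-write: drop it from the pipeline and put every
            # unacknowledged packet back at the head of the data queue.
            pipeline.remove(failing_node)
            if len(pipeline) < min_replicas:
                raise OSError("not enough live datanodes to continue the write")
            while ack_queue:
                data_queue.appendleft(ack_queue.pop())
            continue
        for node in pipeline:
            stored[node].append(packet)
        ack_queue.popleft()  # acknowledged by all datanodes still in the pipeline
    return pipeline, stored

pipeline, stored = write_block(
    ["p0", "p1", "p2", "p3"], ["dn1", "dn2", "dn3"],
    failing_node="dn2", fail_on_packet="p2",
)
print(pipeline)       # ['dn1', 'dn3']
print(stored["dn1"])  # ['p0', 'p1', 'p2', 'p3']
print(stored["dn2"])  # ['p0', 'p1']
```

In this run dn2 fails while p2 is in flight, so p2 is re-queued and delivered through the shortened pipeline; the block ends up with two complete replicas, which the namenode would later bring back up to three.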

Tricks

Trick 1: What to do if data becomes imbalanced?

Ideally, we want to have more map tasks than nodes so that work spreads evenly across the cluster, but this is not always possible because other jobs might need access to the same nodes. When blocks end up unevenly distributed, the HDFS balancer can be used to even out the block distribution across datanodes.
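The balancer's notion of "balanced" can be sketched as follows: a datanode counts as balanced when its utilization (used space over capacity) is within a threshold of the cluster-wide utilization. The 10 percent default below mirrors the balancer's default threshold, which can be changed with hdfs balancer -threshold; the function name and the three-way classification are simplifications for illustration.

```python
def classify_datanodes(used, capacity, threshold_pct=10.0):
    """used / capacity: dicts mapping datanode name -> bytes. Returns (cluster %, labels)."""
    cluster_util = 100.0 * sum(used.values()) / sum(capacity.values())
    labels = {}
    for node in used:
        util = 100.0 * used[node] / capacity[node]
        if util > cluster_util + threshold_pct:
            labels[node] = "over-utilized"   # candidate source of block moves
        elif util < cluster_util - threshold_pct:
            labels[node] = "under-utilized"  # candidate target of block moves
        else:
            labels[node] = "balanced"
    return cluster_util, labels

used = {"dn1": 90, "dn2": 50, "dn3": 10}
capacity = {"dn1": 100, "dn2": 100, "dn3": 100}
cluster_util, labels = classify_datanodes(used, capacity)
print(cluster_util)  # 50.0
print(labels)        # {'dn1': 'over-utilized', 'dn2': 'balanced', 'dn3': 'under-utilized'}
```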

Data Compression

The major trade-off to consider is compression speed versus compression ratio. On the one hand, compression saves space on HDFS (usually around 25%) and reduces the I/O transfer load. On the other hand, the higher the compression ratio one seeks, the slower the compression. Here is a visualization of compression speed versus ratio for zstd, zstdmt, lz4 and zlib.
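The speed versus ratio trade-off is easy to observe on a small scale with zlib from the Python standard library. This is only a rough sketch: the sample data is synthetic and highly repetitive (an assumption), and absolute numbers will differ by machine and dataset, so no expected output is shown.

```python
import time
import zlib

def measure(data, level):
    """Compress at the given zlib level; return (compression ratio, seconds taken)."""
    start = time.perf_counter()
    compressed = zlib.compress(data, level)
    elapsed = time.perf_counter() - start
    assert zlib.decompress(compressed) == data  # round-trip sanity check
    return len(data) / len(compressed), elapsed

# Synthetic log-like sample; real data will compress differently.
sample = b"".join(b"user_%d,click,page_%d\n" % (i % 500, i % 37) for i in range(50_000))

for level in (1, 6, 9):
    ratio, seconds = measure(sample, level)
    print(f"zlib level {level}: ratio {ratio:.1f}x in {seconds * 1000:.1f} ms")
```

Higher levels typically yield a somewhat better ratio at the cost of more CPU time, which is the same trade-off one faces when choosing between fast codecs such as lz4 and denser ones such as zlib.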

How to use Data Compression on HDFS?

It is important to realize that data compression is not a cure-all. When and how to use it depends on the scenario. One major factor in deciding whether to use compression is whether the compression format is splittable, which is especially important in a distributed system like HDFS. It also depends on the task: if the data is created in the cluster and the machines in the cluster have little CPU power, compression might slow down the task significantly. In addition, if a data path is frequently overwritten or modified, repeated compression and decompression waste a lot of resources. Therefore, compression usage should be optimized through careful benchmarking, scenario by scenario. Several commonly used compression formats are listed below, along with whether each is splittable.
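Why splittability matters can be shown with a small lookup table and a helper that bounds how many map tasks can read one compressed file in parallel. The splittability flags follow Hadoop: The Definitive Guide; the CODECS table layout and the max_parallel_tasks helper are illustrative assumptions rather than any Hadoop API.

```python
MB = 1024 * 1024

# Splittability of codecs commonly used with Hadoop, for plain compressed files.
CODECS = {
    "gzip":   {"extension": ".gz",     "splittable": False},
    "bzip2":  {"extension": ".bz2",    "splittable": True},
    "lzo":    {"extension": ".lzo",    "splittable": False},  # splittable only once indexed
    "lz4":    {"extension": ".lz4",    "splittable": False},
    "snappy": {"extension": ".snappy", "splittable": False},
}

def max_parallel_tasks(file_size, block_size, codec):
    """Upper bound on map tasks that can process one compressed file in parallel."""
    if not CODECS[codec]["splittable"]:
        return 1  # a single task must read the whole file from the start
    return (file_size + block_size - 1) // block_size

print(max_parallel_tasks(1024 * MB, 128 * MB, "gzip"))   # 1
print(max_parallel_tasks(1024 * MB, 128 * MB, "bzip2"))  # 8
```

A 1 GB gzip file is stored as eight 128 MB blocks, yet only one map task can process it, and most of its input is not local to that task. Note that this applies to plain compressed files; container formats such as SequenceFile, Avro and Parquet compress data internally in chunks and stay splittable regardless of the codec.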

From Compression to Data Storage System on Hadoop

When selecting among data storage systems on Hadoop, we care about the following criteria: space efficiency, data ingestion, random look-up, and full-data scanning. There are four major options that we are interested in here: the file formats Parquet and Avro, and the storage engines HBase and Kudu.

From a space utilization perspective, HBase is less efficient than the others (HBase is designed for storing sparse data and for streaming random reads and writes against a large dataset). Kudu and Parquet are the top choices when looking at space utilization alone.

Parquet and Avro are best for data ingestion. Avro in particular has a compact encoding because it assumes the schema is always available (at both read and write time), so field names do not need to be stored with every record.

For random look-up, HBase and Kudu are the fastest because of their internal indexing mechanisms. Parquet’s column-based partitioning also makes it very fast at look-up time. Avro, unlike Parquet, has a row-based storage format, which is much slower at look-up (the entire dataset has to be scanned in order to retrieve the value).

For full-data scanning, Parquet’s column projection reduces the amount of input that has to be read, which lets it surpass all the other storage systems. HBase, however, is not ideal for data scanning. Avro and Parquet both benefit from parallelization because their unit of parallelism is an HDFS block, whereas for Kudu and HBase the unit is a table partition.
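The effect of column projection can be sketched by comparing a row-oriented layout (as in Avro) with a column-oriented layout (as in Parquet) for a query that needs a single column. This toy example only counts how many values have to be touched; the data and function names are made up, and real formats add encoding, compression and metadata on top.

```python
rows = [
    {"id": 1, "name": "a", "price": 10.0},
    {"id": 2, "name": "b", "price": 12.5},
    {"id": 3, "name": "c", "price": 7.5},
]

# Column-oriented layout: one list of values per column.
columns = {key: [r[key] for r in rows] for key in rows[0]}

def sum_price_row_layout(rows):
    """Row layout: every record is read in full, even though only 'price' is needed."""
    values_read = 0
    total = 0.0
    for record in rows:
        values_read += len(record)
        total += record["price"]
    return total, values_read

def sum_price_column_layout(columns):
    """Column layout: only the 'price' column is read (column projection)."""
    prices = columns["price"]
    return sum(prices), len(prices)

print(sum_price_row_layout(rows))        # (30.0, 9)
print(sum_price_column_layout(columns))  # (30.0, 3)
```

Both layouts give the same answer, but the columnar one touches a third of the values here, and the gap widens as tables get wider.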

When to use What?

For storage efficiency, Parquet or Kudu combined with Snappy is the best choice. Avro is great at data ingestion because of its compact encoding. For random access, HBase and Kudu are a blessing. For analytics, the column-based partitioning of Parquet and Kudu makes scans over a few columns easy.

Reference

1. https://intellipaat.com/blog/tutorial/hadoop-tutorial/hdfs-operations/
2. https://princetonits.com/how-to-configure-replication-factor-and-block-size-for-hdfs/
3. Hadoop: The Definitive Guide, 4th Edition, by Tom White. Published by O'Reilly Media, Inc., 2015
4. http://comphadoop.weebly.com/
5. https://db-blog.web.cern.ch/blog/zbigniew-baranowski/2017-01-performance-comparison-different-file-formats-and-storage-engines
