Hive — Hive ACID Transactional Table and Compaction

Randy
9 min readDec 4, 2021

Since version 0.13, Apache Hive introduced transactions since to fully support ACID semantics on Hive tables, including INSERT/UPDATE/DELETE/MERGE statements, streaming data ingestion, etc. In Hive 3.0, this feature is further improved by optimizing the underlying data file structure, reducing constraints on table schemes, and supporting predicate push down and vectorized query. Examples and setup can be found on Hive wiki and other tutorials, while this article will focus on how a transactional table is saved on HDFS and will take a closer look at the read-write process.

File Structure

Insert Data

CREATE TABLE employee (id int, name string, salary int)STORED AS ORC TBLPROPERTIES ('transactional' = 'true');INSERT INTO employee VALUES
(1, 'Jerry', 5000),
(2, 'Tom', 8000),
(3, 'Kate', 6000);

An INSERT statement is executed in a single transaction. It will create a delta directory containing information about this transaction and its data.

/user/hive/warehouse/employee/delta_0000001_0000001_0000
/user/hive/warehouse/employee/delta_0000001_0000001_0000/_orc_acid_version
/user/hive/warehouse/employee/delta_0000001_0000001_0000/bucket_00000

The schema of this folder’s name is delta_minWID_maxWID_stmtID, i.e., the “delta” prefix, transactional write's range (minimum and maximum write ID), and statement ID. In detail:

  1. All INSERT statements will create a delta directory. The UPDATE statement will also create a delta directory right after a delete directory. The delete directory is prefixed with delete_delta.
  2. Hive will assign a globally unique ID for every transaction, both read and write. For transactional writes like INSERT and DELETE, it will also assign a table-wise unique ID, also known as a write ID. The write ID range will be encoded in the delta and delete directory names.
  3. The Statement ID is used when multiple writes into the same table happen in one transaction.

For its content, _orc_acid_version always contains “2”, indicating that this directory is in ACID version 2 format. Compared with previous version, the main difference is that UPDATE now uses a split-update technique to support predicate push down and other features (HIVE-14035). bucket_00000 is the inserted records. Because this table is not bucketed, there is only one file, and it's in ORC format. We can take a look at its content with orc-tools:

$ orc-tools data bucket_00000{"operation":0,"originalTransaction":1,"bucket":536870912,"rowId":0,"currentTransaction":1,"row":{"id":1,"name":"Jerry","salary":5000}}{"operation":0,"originalTransaction":1,"bucket":536870912,"rowId":1,"currentTransaction":1,"row":{"id":2,"name":"Tom","salary":8000}}{"operation":0,"originalTransaction":1,"bucket":536870912,"rowId":2,"currentTransaction":1,"row":{"id":3,"name":"Kate","salary":6000}}

The file content is displayed — row-wise — in JSON. We can see the actual data is in row, while other keys work for the transaction mechanism:

  1. operation 0 means INSERT, 1 UPDATE, and 2 DELETE. UPDATE will not appear because of the split-update technique mentioned above.
  2. originalTransaction is the previous write ID. For INSERT, it's the same as currentTransaction. For DELETE, it's the write ID when this record is first created.
  3. bucket is a 32-bit integer defined by BucketCodec class. Their meanings are:
bit 1–3: Bucket codec version, currently 001.
bit 4: Reserved for future use.
bit 5–16: The bucket ID, 0-based. This ID is determined by CLUSTERED BY columns and number of buckets. It matches the bucket_N prefixed files.
bit 17–20: Reserved for future use.
bit 21–32: Statement ID.
For instance, the binary form of 536936448 is 00100000000000010000000000000000, showing it is a version 1 codec, and bucket ID is 1.`

4. rowId is the auto-generated unique ID within the transaction and bucket.currentTransaction is the current write ID.

5. row contains the actual data. For DELETE, row will be null.

Streaming Data Ingest V2

Hive 3.0 also upgrades the former Streaming API. Now users of third-party tools, like Flume, can use the ACID feature to write data continuously into the Hive table. These operations will also create delta directories. However, mutation is no longer supported.

StreamingConnection connection = HiveStreamingConnection.newBuilder().connect();
connection.beginTransaction();
connection.write("11,val11,Asia,China".getBytes());
connection.write("12,val12,Asia,India".getBytes());
connection.commitTransaction();
connection.close();

Update Data

UPDATE employee SET salary = 7000 WHERE id = 2;

This statement will first run a query to learn the row__id of the updating records, and then create a delete directory along with a delta directory:

/user/hive/warehouse/employee/delta_0000001_0000001_0000/bucket_00000
/user/hive/warehouse/employee/delete_delta_0000002_0000002_0000/bucket_00000
/user/hive/warehouse/employee/delta_0000002_0000002_0000/bucket_00000

Content of delete_delta_0000002_0000002_0000/bucket_00000:

{"operation":2,"originalTransaction":1,"bucket":536870912,"rowId":1,"currentTransaction":2,"row":null}

Content of delta_0000002_0000002_0000/bucket_00000:

{"operation":0,"originalTransaction":2,"bucket":536870912,"rowId":0,"currentTransaction":2,"row":{"id":2,"name":"Tom","salary":7000}}

The DELETE statement works similarly to UPDATE, i.e., to find the record but generate only the delete directory.

Merge Statement

MERGE is like MySQL’s INSERT ON UPDATE. It can update a target table with a source table. For instance:

CREATE TABLE employee_update (id int, name string, salary int);
INSERT INTO employee_update VALUES
(2, 'Tom', 7000),
(4, 'Mary', 9000);
MERGE INTO employee AS a
USING employee_update AS b ON a.id = b.id
WHEN MATCHED THEN UPDATE SET salary = b.salary
WHEN NOT MATCHED THEN INSERT VALUES (b.id, b.name, b.salary);

This statement will update the salary of Tom, and insert a new row of Mary. WHENs are considered different statements. The INSERT clause generates delta_0000002_0000002_0000, containing the row of Mary, while UPDATE generates delete_delta_0000002_0000002_0001 and delta_0000002_0000002_0001, deleting and inserting the row of Tom.

/user/hive/warehouse/employee/delta_0000001_0000001_0000
/user/hive/warehouse/employee/delta_0000002_0000002_0000
/user/hive/warehouse/employee/delete_delta_0000002_0000002_0001
/user/hive/warehouse/employee/delta_0000002_0000002_0001

Compaction

As time goes, there will be an increasing number of delta and delete directories in the table. This will affect the read performance, because reading is a process of merging the results of valid transactions. Small files are never friendly to file systems like HDFS. So Hive uses two kinds of compactors, namely minor and major, to merge these directories while preserving the transaction information.

Minor compaction will merge multiple delta and delete files into one delta and delete file, respectively. The transaction ID will be preserved in folder name as write ID range ?preserved in a folder named write ID range?, as mentioned above, while omitting the statement ID. Compactions will be initiated automatically in the Hive metastore process based on some configured thresholds. We can also trigger it manually with the following SQL:

ALTER TABLE employee COMPACT 'minor';

Take the result of MERGE statement for an instance. After minor compaction, the folder structure will become:

/user/hive/warehouse/employee/delete_delta_0000001_0000002
/user/hive/warehouse/employee/delta_0000001_0000002

In delta_0000001_0000002/bucket_00000, rows are simply ordered and concatenated, i.e., two rows of Tom will be both included. Minor compaction does not delete any data.

Major compaction will, on the other hand, merge and write the current table into a single directory, with the name base_N, where N is the latest write ID. Deleted data will be removed in major compaction. row_id remains untouched.

/user/hive/warehouse/employee/base_0000002

Note that after minor or major compaction, the original files will not be deleted immediately. Deletion is carried out by a cleaner thread, so there will be multiple files containing the same transaction data simultaneously. Take this into account when understanding the reading process.

Reading Process

Now we see three kinds of files in an ACID table, base, delta, and delete. Each contains data rows that can be identified by row__id and sorted by it, too. Reading data from an ACID table is a process of merging these files, and reflecting the result of the last transaction. This process is written in OrcInputFormat and OrcRawRecordMerger class, and it is basically a merge-sort algorithm.

Take the following files for instance. This structure can be generated by inserting three rows, doing a major compaction, and then updating two rows. 1-0-0-1 is short for originalTransaction - bucketId (not encoded) - rowId - currentTransaction.

+----------+    +----------+    +----------+
| base_1 | | delete_2 | | delta_2 |
+----------+ +----------+ +----------+
| 1-0-0-1 | | 1-0-1-2 | | 2-0-0-2 |
| 1-0-1-1 | | 1-0-2-2 | | 2-0-1-2 |
| 1-0-2-1 | +----------+ +----------+
+----------+

Merging process:

  • Sort rows from all files by (originalTransaction, bucketId, rowId) ascendingly, (currentTransaction) descendingly. For example:
1-0-0-1
1-0-1-2
1-0-1-1

2-0-1-2
  • Fetch the first record.
  • If the row__id is the same as the previous, skip.
  • If the operation is DELETE, skip. (As a result, for 1-0-1-2 and 1-0-1-1, this row will be skipped.)
  • Otherwise, emit the row.
  • Repeat.

The merging is done in a streaming way. Hive will open all the files, read the first record, and construct a ReaderKey class, storing originalTransaction, bucketId, rowId, and currentTransaction. ReaderKey class implements the Comparable interface, so they can be sorted in an customized order.

public class RecordIdentifier implements WritableComparable<RecordIdentifier> {
private long writeId;
private int bucketId;
private long rowId;
protected int compareToInternal(RecordIdentifier other) {
if (other == null) {
return -1;
}
if (writeId != other.writeId) {
return writeId < other.writeId ? -1 : 1;
}
if (bucketId != other.bucketId) {
return bucketId < other.bucketId ? - 1 : 1;
}
if (rowId != other.rowId) {
return rowId < other.rowId ? -1 : 1;
}
return 0;
}
}

public class ReaderKey extends RecordIdentifier {
private long currentWriteId;
private boolean isDeleteEvent = false;
public int compareTo(RecordIdentifier other) {
int sup = compareToInternal(other);
if (sup == 0) {
if (other.getClass() == ReaderKey.class) {
ReaderKey oth = (ReaderKey) other;
if (currentWriteId != oth.currentWriteId) {
return currentWriteId < oth.currentWriteId ? +1 : -1;
}
if (isDeleteEvent != oth.isDeleteEvent) {
return isDeleteEvent ? -1 : +1;
}
} else {
return -1;
}
}
return sup;
}
}

Then, the ReaderKey and the file handler will be put into a TreeMap, so every time we poll for the first entry, we can get the desired file handler and read data.

public class OrcRawRecordMerger {
private TreeMap<ReaderKey, ReaderPair> readers = new TreeMap<>();
public boolean next(RecordIdentifier recordIdentifier, OrcStruct prev) {
Map.Entry<ReaderKey, ReaderPair> entry = readers.pollFirstEntry();
}
}

Select Files

Previously we pointed out that different transaction files may co-exist, so Hive needs to first select the files that are valid for the latest transaction. For instance, the following directory structure is the result of these operations: two inserts, one minor compact, one major compact, and one delete.

delta_0000001_0000001_0000
delta_0000002_0000002_0000
delta_0000001_0000002
base_0000002
delete_delta_0000003_0000003_0000

Filtering process:

  • Consult the Hive metastore to determine the valid write ID list.
  • Extract transaction information from files names, including file type, write ID range, and statement ID.
  • Select the base file with the maximum valid write ID.
  • Sort delta and delete files by write ID range:
* Smaller minWID orders first;
* If minWID is the same, larger maxWID orders first;
* Otherwise, sort by stmtID; files w/o stmtID orders first.
  • Use the base file’s write ID as the current write ID, then iterate and filter delta files:
* If maxWID is larger than the current write ID, keep it and update the current write ID.
* If write ID range is the same as the previous, keep the file too.

There are some special cases in this process, such as no base file, multiple statements, contains original data files - even ACID version 1 files. More details can be found in AcidUtils#getAcidState.

Parallel Execution

When executing in a parallel environment, such as multiple Hadoop mappers, delta files need to be reorganized. In short, base and delta files can be divided into different splits, while all delete files have to be available to all splits. This ensures deleted records will not be emitted.

Vectorized Query

For a vectoried query, Hive will first try to load all delete files into memory and construct an optimized data structure that can be used to filter out deleted rows when processing row batches. If the delete files are too large, it falls back to a sort-merge algorithm.

public class VectorizedOrcAcidRowBatchReader {
private final DeleteEventRegistry deleteEventRegistry;

protected static interface DeleteEventRegistry {
public void findDeletedRecords(ColumnVector[] cols, int size, BitSet selectedBitSet);
}
static class ColumnizedDeleteEventRegistry implements DeleteEventRegistry {}
static class SortMergedDeleteEventRegistry implements DeleteEventRegistry {}

public boolean next(NullWritable key, VectorizedRowBatch value) {
BitSet selectedBitSet = new BitSet(vectorizedRowBatchBase.size);
this.deleteEventRegistry.findDeletedRecords(innerRecordIdColumnVector,
vectorizedRowBatchBase.size, selectedBitSet);
for (int setBitIndex = selectedBitSet.nextSetBit(0), selectedItr = 0;
setBitIndex >= 0;
setBitIndex = selectedBitSet.nextSetBit(setBitIndex+1), ++selectedItr) {
value.selected[selectedItr] = setBitIndex;
}
}
}

Transaction Management

Hive introduced a new lock manager to support transactional tables. DbTxnManager will detect the ACID operations in the query plan and contact the Hive Metastore to open and commit new transactions. It also implements the read-write lock mechanism to support normal locking requirements.

The Hive Metastore is responsible for allocating new transaction IDs. This is done in a database transaction so that multiple Metastore instances will not conflict with each other.

abstract class TxnHandler {
private List<Long> openTxns(Connection dbConn, Statement stmt, OpenTxnRequest rqst) {
String s = sqlGenerator.addForUpdateClause("select ntxn_next from NEXT_TXN_ID");
s = "update NEXT_TXN_ID set ntxn_next = " + (first + numTxns);
for (long i = first; i < first + numTxns; i++) {
txnIds.add(i);
rows.add(i + "," + quoteChar(TXN_OPEN) + "," + now + "," + now + ","
+ quoteString(rqst.getUser()) + "," + quoteString(rqst.getHostname()) + "," + txnType.getValue());
}
List<String> queries = sqlGenerator.createInsertValuesStmt(
"TXNS (txn_id, txn_state, txn_started, txn_last_heartbeat, txn_user, txn_host, txn_type)", rows);
}
}

References

--

--