Hive Design Patterns

Incremental Ingestion

Prabhath Vemula
DataKare Solutions
5 min read · Feb 17, 2019

Apache Hive

Apache Hive has evolved into one of the most popular interactive and analytical data stores in the Hadoop ecosystem. Because of this demand, Hive plays a major role in the design of a robust big data pipeline.

Hive handles many types of data sources, such as file systems, databases, and streaming feeds. It provides rich functionality for data aggregation, data analysis, and querying.

This article explains a few design patterns for ingesting incremental data into Hive tables. Its main focus is how to use Hive efficiently in big data pipelines.

Typically, the data ingestion process uses one of the following scenarios to add a new set of data to the Hadoop layer.

  • Complete Load: The entire table/partition is truncated and then loaded with a new, full set of data.
  • Append Load: New data is appended to the existing data on each batch refresh.
  • Insert or Update Ingestion: Data with a new key is inserted into the table, whereas if the key already exists in the partition/table, the record is updated with the latest information.

Let’s go through each of these scenarios in detail. For demonstration purposes, we will use an orders table.

The orders data is in JSON format and contains the following fields.

1) ID
2) Order_ID
3) Product_ID
4) Product_Name
5) Quantity
6) Product_Price
7) Customer_Id
8) Customer_Name
9) Order_Date

Step 1: Create Hive Stage

The first step in creating a data pipeline is preparing the data and ingesting it into a stage table. Data from the source system is fetched using the Hadoop API and then stored in an HDFS landing area/stage location. An external stage table is created pointing to this location.

In our example, the stage table for the ‘order’ data is defined with “/user/dks/datalake/orders_stg” as the HDFS landing location where incremental data is stored.

CREATE EXTERNAL TABLE IF NOT EXISTS orders_stg (id string,
customer_id string,
customer_name string,
product_id string,
product_name string,
product_price string,
quantity string,
order_date string,
src_update_ts string
)
ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'
-- source data location
LOCATION '/user/dks/datalake/orders_stg';
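
The JsonSerDe used by the stage table expects every line of a landing file to hold one complete JSON object. The Python sketch below shows how a producer might write such a line. The helper name `to_landing_line` and all record values are hypothetical; only the column names come from the stage table definition.

```python
import json

# Column names taken from the orders_stg definition. Every value is written as
# a string because the stage table declares all of its columns as string.
STAGE_COLUMNS = [
    "id", "customer_id", "customer_name", "product_id", "product_name",
    "product_price", "quantity", "order_date", "src_update_ts",
]

def to_landing_line(record):
    """Serialize one order as a single-line JSON object (hypothetical helper)."""
    missing = [c for c in STAGE_COLUMNS if c not in record]
    if missing:
        raise ValueError(f"missing columns: {missing}")
    # json.dumps emits no raw newlines by default, so one record stays on one line.
    return json.dumps({c: str(record[c]) for c in STAGE_COLUMNS})

# Hypothetical sample record, for illustration only.
sample = {
    "id": 1, "customer_id": "C100", "customer_name": "Jane Doe",
    "product_id": 42, "product_name": "Widget", "product_price": "19.99",
    "quantity": 3, "order_date": "2019-02-17",
    "src_update_ts": "2019-02-17 10:15:00",
}
line = to_landing_line(sample)
```

Each such line would be appended to a file under the landing location, one record per line.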

Step 2: Create final table

In this step, we create a Hive managed table that holds the final data. The table is stored in ORC format and partitioned by order_date.

Here is the DDL of the final table:

CREATE TABLE IF NOT EXISTS orders (id bigint,
customer_id string,
customer_name string,
product_id int,
product_name string,
product_price decimal(12,2),
quantity int,
src_update_ts timestamp,
src_file string,
ingestion_ts timestamp
)
PARTITIONED BY (order_date date)
STORED AS ORC
LOCATION '/user/dks/datalake/orders';
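
Because the stage table keeps every column as a string while the final table uses typed columns, each value is converted on its way into the final table. The Python sketch below models that conversion outside Hive. The function name `to_final_types` is hypothetical, and the timestamp layout `yyyy-MM-dd HH:mm:ss` is an assumption about the source data. It also raises on malformed input, whereas a failed cast in Hive typically yields NULL.

```python
from datetime import date, datetime
from decimal import Decimal, ROUND_HALF_UP

def to_final_types(stage_row):
    """Convert an all-string stage row to the column types of the final table."""
    return {
        "id": int(stage_row["id"]),                      # bigint
        "customer_id": stage_row["customer_id"],         # string
        "customer_name": stage_row["customer_name"],     # string
        "product_id": int(stage_row["product_id"]),      # int
        "product_name": stage_row["product_name"],       # string
        # decimal(12,2): keep two digits after the decimal point
        "product_price": Decimal(stage_row["product_price"]).quantize(
            Decimal("0.01"), rounding=ROUND_HALF_UP),
        "quantity": int(stage_row["quantity"]),          # int
        # timestamp; the format string is an assumption about the source
        "src_update_ts": datetime.strptime(
            stage_row["src_update_ts"], "%Y-%m-%d %H:%M:%S"),
        # partition column (date)
        "order_date": date.fromisoformat(stage_row["order_date"]),
    }

# Hypothetical stage row, for illustration only.
typed = to_final_types({
    "id": "1", "customer_id": "C100", "customer_name": "Jane Doe",
    "product_id": "42", "product_name": "Widget", "product_price": "19.990",
    "quantity": "3", "order_date": "2019-02-17",
    "src_update_ts": "2019-02-17 10:15:00",
})
```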

Now that the table structure has been defined, let’s go through each incremental ingestion option.

Step 3: Incremental Ingestion Types

Once the stage and final tables are created, the next step is to determine the ingestion scenario best suited to your data. Consider the source data format, volume, velocity of data flow, access criteria, and so on to find the ingestion type that meets your requirements.

Note that the orders table is partitioned, so ingestion is performed at the partition level.

1. Complete/Full Load

In this approach, overlapping partitions are dropped completely and recreated with the new set of data. This method is typically used when the incremental stage table holds the full data for each corresponding partition.

Set the following hive configurations.

set hive.support.quoted.identifiers=none;
set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;

Query for complete load

with new_data as(select * from orders_stg)
insert overwrite table orders partition(order_date)
select `(order_date)?+.+`, input__file__name, current_timestamp as ingestion_ts, cast(order_date as date) as part from new_data

In the above query, the Hive command “insert overwrite” drops each existing partition that receives data and recreates it with the new set of data from the stage table.

Hive maintains data integrity by acquiring a shared lock on the “orders” table and an exclusive lock on the partition that is being updated.
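
The partition-level behaviour of the complete load can be modelled in a few lines of Python. This is a simplified sketch, not Hive code: the function name `insert_overwrite`, the dictionary layout, and the sample rows are all hypothetical. It shows that only the partitions present in the incoming data are replaced, while every other partition is left untouched.

```python
def insert_overwrite(table, new_rows):
    """Model of a dynamic-partition INSERT OVERWRITE.

    `table` maps a partition value (order_date) to its list of rows.
    Partitions that appear in `new_rows` are replaced wholesale; partitions
    that receive no rows keep their existing data.
    """
    replacement = {}
    for row in new_rows:
        replacement.setdefault(row["order_date"], []).append(row)
    result = dict(table)        # untouched partitions carry over
    result.update(replacement)  # touched partitions are replaced
    return result

# Hypothetical data: two existing partitions, one of them refreshed.
orders = {
    "2019-02-15": [{"id": 1, "order_date": "2019-02-15", "quantity": 1}],
    "2019-02-16": [{"id": 2, "order_date": "2019-02-16", "quantity": 5}],
}
stage = [{"id": 3, "order_date": "2019-02-16", "quantity": 7}]
orders_after = insert_overwrite(orders, stage)
```

Here the 2019-02-16 partition ends up holding only the staged row, which is why this pattern assumes the stage table carries the full data for each partition it touches.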

2. Append Load

In an append load, new data is appended to the existing partition or table. This approach is typically used when the incremental data contains only new records, without any redundancy.

This can be achieved in two ways. Let’s go through each one. In the first approach, the append is done by overwriting the existing partition with the merged old and new data. The advantage of this method is that it avoids creating small files when appends are frequent.

Here is the query for this method:

-- Use the same Hive configuration as for the complete load
with new_data as (select * from orders_stg)
insert overwrite table orders partition(order_date)
select * from (
select * from orders where order_date in
(select distinct order_date from new_data) -- existing data
union all
select `(order_date)?+.+`, input__file__name, current_timestamp as ingestion_ts, cast(order_date as date) as part from new_data
) merged_data;

The second approach simply adds the new data to the existing partition.

-- Use the same Hive configuration as for the complete load
with new_data as (select * from orders_stg)
insert into table orders partition(order_date)
select `(order_date)?+.+`, input__file__name, current_timestamp as ingestion_ts, cast(order_date as date) as part from new_data;
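
The trade-off between the two append approaches can be illustrated with a toy Python model of the files in one partition. The function names and sample batches are hypothetical. As a deliberate simplification, the overwrite variant is shown producing a single file; in a real job the file count depends on how many tasks write the partition.

```python
def append_by_insert_into(files, batch):
    """Model of INSERT INTO: the batch is written as one more file."""
    return files + [list(batch)]

def append_by_overwrite(files, batch):
    """Model of INSERT OVERWRITE with old and new rows merged.

    Simplification: the rewritten partition is shown as a single file.
    """
    existing_rows = [row for f in files for row in f]
    return [existing_rows + list(batch)]

# Three hypothetical batches arriving for the same partition.
batches = [[{"id": 1}], [{"id": 2}], [{"id": 3}]]
into_files, overwrite_files = [], []
for batch in batches:
    into_files = append_by_insert_into(into_files, batch)
    overwrite_files = append_by_overwrite(overwrite_files, batch)
```

Both variants hold the same rows at the end, but the first accumulates one file per batch while the second pays the cost of rewriting the existing data on every load.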

3. Insert or Update Ingestion

If the requirement is to overwrite or update existing data with incoming data, the insert/update method is preferred.

New data is added to, or updated in, the existing partition/table based on a row key. A record is appended to the table if it contains a new key; otherwise, the existing record is updated with the latest information.

This option is only possible when a row in the table can be identified uniquely using one or more columns. In the orders table, the “ID” field is the row key, and the src_update_ts field holds the record timestamp.

Let me walk you through two different techniques to apply insert or update ingestion.

Method 1: use the row_number() window function with over() to find the latest record that needs to be updated/inserted.

with new_data as (select * from orders_stg)
insert overwrite table orders partition(order_date)
select `(rank_no)?+.+` from (
select *, row_number() over(partition by order_date, id order by src_update_ts desc) as rank_no from (
select * from orders where order_date in (select distinct order_date from new_data)
union all
select `(order_date)?+.+`, input__file__name, current_timestamp as ingestion_ts, cast(order_date as date) as part from new_data
) update_data
) ranked_data
where rank_no=1 distribute by order_date sort by id;
-- rank_no=1 marks the most recent record for each (order_date, id)
-- `(order_date)?+.+` selects all the columns except order_date
-- `(rank_no)?+.+` selects all the columns except rank_no
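
The effect of ranking by timestamp and keeping only rank 1 can be sketched in plain Python. This is a model of the logic rather than Hive code: the function name `latest_per_key` and the sample rows are hypothetical, and timestamps are assumed to be strings in a layout that sorts chronologically.

```python
def latest_per_key(rows):
    """Keep the row with the greatest src_update_ts per (order_date, id).

    This mirrors ranking rows by src_update_ts in descending order within
    each (order_date, id) group and keeping only the first one.
    """
    best = {}
    for row in rows:
        key = (row["order_date"], row["id"])
        if key not in best or row["src_update_ts"] > best[key]["src_update_ts"]:
            best[key] = row
    return sorted(best.values(), key=lambda r: (r["order_date"], r["id"]))

# Hypothetical data: id 1 is updated, id 2 is unchanged, id 3 is new.
existing = [
    {"id": 1, "order_date": "2019-02-16", "quantity": 1,
     "src_update_ts": "2019-02-16 08:00:00"},
    {"id": 2, "order_date": "2019-02-16", "quantity": 5,
     "src_update_ts": "2019-02-16 08:05:00"},
]
incoming = [
    {"id": 1, "order_date": "2019-02-16", "quantity": 4,
     "src_update_ts": "2019-02-16 09:30:00"},
    {"id": 3, "order_date": "2019-02-16", "quantity": 2,
     "src_update_ts": "2019-02-16 09:31:00"},
]
merged = latest_per_key(existing + incoming)
```

The merged partition holds exactly one row per key: the updated version of id 1, the untouched id 2, and the newly inserted id 3.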

Method 2:

In this method, a temporary table is created by merging data from both the original and incremental tables. The merged data is filtered to keep only the records with the latest timestamp.

In the last step, the original table is dropped and the temporary table is renamed to the original table name.

This approach is more suitable for non-partitioned tables. For example, if the orders table were not partitioned, the queries would be as follows.

Step 1 => Reconcile view: This view combines the record sets from both the original table and the incremental data.

create view recon_orders as
with new_data as (select * from orders_stg)
select final_data.* from
(select * from orders
union all
select `(order_date)?+.+`, input__file__name, current_timestamp as ingestion_ts, cast(order_date as date) as part from new_data
) final_data
inner join (select order_date, id, max(src_update_ts) as max_src_upd_ts from (select order_date, id, src_update_ts from orders union all
select order_date, id, src_update_ts from new_data) all_data group by order_date, id) max_data on final_data.id=max_data.id and final_data.src_update_ts=max_data.max_src_upd_ts;
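
The reconcile step joins every row to the maximum timestamp of its key. A small Python model makes one consequence visible: if two rows for the same key share the maximum timestamp, both survive the join. The function name `reconcile` and the sample rows are hypothetical, and the model keys on `id` only, as the join condition does.

```python
def reconcile(existing, incoming):
    """Keep rows whose src_update_ts equals the maximum for their id."""
    all_rows = existing + incoming
    max_ts = {}
    for row in all_rows:
        current = max_ts.get(row["id"])
        if current is None or row["src_update_ts"] > current:
            max_ts[row["id"]] = row["src_update_ts"]
    # Equivalent to an inner join on (id, src_update_ts = max timestamp).
    return [r for r in all_rows if r["src_update_ts"] == max_ts[r["id"]]]

# Hypothetical data: id 1 is superseded, id 2 arrives twice with the same timestamp.
existing = [{"id": 1, "quantity": 1, "src_update_ts": "2019-02-16 08:00:00"}]
incoming = [
    {"id": 1, "quantity": 4, "src_update_ts": "2019-02-16 09:30:00"},
    {"id": 2, "quantity": 2, "src_update_ts": "2019-02-16 09:31:00"},
    {"id": 2, "quantity": 9, "src_update_ts": "2019-02-16 09:31:00"},
]
reconciled = reconcile(existing, incoming)
```

In this example id 2 keeps both of its rows, so this technique relies on the update timestamp being unique per key.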

Step 2 => Insert into temp table

-- recon_orders is the view created in step 1
drop table if exists temp_orders;
create table temp_orders as select * from recon_orders;

Step 3 => Purge

drop table orders;
ALTER TABLE temp_orders RENAME TO orders;

In conclusion, this article covered several methods for ingesting incremental data into Hive tables.

Prabhath Vemula
DataKare Solutions

15 years of experience in big data, application development, and data warehousing. Successfully implemented several big data projects.