Ingesting data into Apache Hudi using Spark SQL

Sivabalan Narayanan
May 13, 2023

We already saw here that we can write Apache Hudi tables in four different ways using the Spark engine. We have already covered write tutorials for the Spark datasource, Spark Structured Streaming, and DeltaStreamer. That leaves us with Spark SQL. In this blog, let's take a look at how you can write to Hudi tables using Spark SQL.

Note: we are going to use the New York taxi dataset for this tutorial. I will be using Hudi version 0.13.0 with Spark 2 and Scala 2.11.

Spark-sql launch command:

./bin/spark-sql --packages org.apache.hudi:hudi-spark2.4-bundle_2.11:0.13.0 \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
--driver-memory 4g --executor-memory 6g

Create

Spark SQL comes with its own grammar and syntax for operating on tables. For table creation there are two commands: CTAS and the regular CREATE command.

CTAS

CTAS stands for "Create Table As Select". As the name suggests, it lets you create a table directly from records selected from another table, i.e. create and insert in a single step.

Let's create a Parquet table to serve as our input.

create table ny_parquet using parquet location 'file:///tmp/new_york_taxi/part-0000*.parquet';

Let's create a Hudi table using the CTAS command.

create table hudi_ctas_cow
using hudi
location 'file:///tmp/hudi/hudi_ctas_cow'
tblproperties (
type = 'cow',
primaryKey = 'tpep_pickup_datetime',
preCombineField = 'date_col')
as
select * from ny_parquet;

As the table properties suggest, this is a non-partitioned table of type COPY_ON_WRITE. The record key is "tpep_pickup_datetime" and the pre-combine field is "date_col".
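Once the CTAS completes, the new table is immediately queryable. A quick way to confirm Hudi populated it, and to peek at the metadata columns Hudi adds to every record, is a query like the following (the column selection here is just illustrative, not from the original walkthrough):

select _hoodie_commit_time, _hoodie_record_key, tpep_pickup_datetime
from hudi_ctas_cow limit 5;

The _hoodie_record_key values should mirror the tpep_pickup_datetime field we configured as the primary key.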

If you prefer to create a partitioned table:

create table hudi_ctas_cow_partitioned
using hudi
location 'file:///tmp/hudi/hudi_ctas_cow_pt'
tblproperties (
type = 'cow',
primaryKey = 'tpep_pickup_datetime',
preCombineField = 'tpep_dropoff_datetime')
partitioned by (date_col)
as
select * from ny_parquet;

You need to add `partitioned by (date_col)` to the previous CTAS command.
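If the partitioned CTAS succeeded, the standard Spark SQL SHOW PARTITIONS command should list one entry per distinct date_col value (a quick verification step added here for illustration):

show partitions hudi_ctas_cow_partitioned;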

CREATE

You could also create the table first to get started, and then later ingest records using INSERT, UPDATE, or other Spark SQL constructs.

Let's create a new Hudi table with an explicit schema.

create table hudi_cow_tbl (
VendorID int,
tpep_pickup_datetime string,
tpep_dropoff_datetime string,
passenger_count int,
trip_distance double,
RatecodeID int,
store_and_fwd_flag string,
PULocationID int,
DOLocationID int,
payment_type int,
fare_amount double,
extra double,
mta_tax double,
tip_amount double,
tolls_amount double,
improvement_surcharge double,
total_amount double,
congestion_surcharge double,
date_col string
) using hudi
tblproperties (
type = 'cow',
primaryKey = 'tpep_pickup_datetime',
preCombineField = 'date_col'
)
location 'file:///tmp/hudi/hudi_cow_sql';

As the properties suggest, this is a non-partitioned table. If you prefer to create a partitioned table, you can add `partitioned by (date_col)` to the above command.
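Before inserting any data, you can sanity-check that the schema and the Hudi table properties were registered as expected. DESCRIBE TABLE EXTENDED is plain Spark SQL and should surface the Hudi-specific properties alongside the columns (a verification step, not part of the original walkthrough):

describe table extended hudi_cow_tbl;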

Insert

Now that we have created the table, let's insert into it.

insert into hudi_cow_tbl select * from ny_parquet;

Let's read the data back and verify that everything was inserted.

select count(*) from hudi_cow_tbl;
select count(*) from ny_parquet;

Both of these should return the same count.
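If you want to dig deeper than a row count, Hudi ships Spark SQL procedures (invoked via CALL, available since Hudi 0.11.0) that let you inspect the commit the insert produced; the exact output columns may vary by version:

call show_commits(table => 'hudi_cow_tbl', limit => 5);

You should see a single commit corresponding to the insert above.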

Update

Let's see how we can update records using Spark SQL. You define predicates and target values in the UPDATE statement: the predicates determine which records need to be updated, and the target values dictate how those records are updated.

update hudi_cow_tbl set DOLocationID = 500, store_and_fwd_flag = 'Y'
where VendorID = '4' and DOLocationID = 200;

In the example above, we are updating all records in the Hudi table that have VendorID = '4' and DOLocationID = 200, setting DOLocationID to 500 and store_and_fwd_flag to 'Y' for the matching records.
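To verify the update took effect, you can re-run the original predicate; it should now return zero rows, while the new predicate matches the rewritten records (verification queries added for illustration):

select count(*) from hudi_cow_tbl where VendorID = '4' and DOLocationID = 200;
select count(*) from hudi_cow_tbl where VendorID = '4' and DOLocationID = 500;

The first count should be 0; the second should be at least the number of records that were just updated.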

Delete

Spark SQL also has a construct to delete records. The grammar is almost the same as UPDATE, except that we don't define any target values.

delete from hudi_cow_tbl where VendorID = '4' and DOLocationID = 100;

After this statement, if we query the table with the same predicate, we should not find any matching records.

select count(*) from hudi_cow_tbl where VendorID = '4' and DOLocationID = 100;

Merge Into

Spark SQL has a special construct, MERGE INTO, that has no direct equivalent in Spark datasource writes. It is an interesting way to ingest records into Hudi tables: the target table's records are joined with a source table, and different actions are taken depending on the matching clauses. The most common pattern is to INSERT when not matched and UPDATE when matched.

Let's create the input table for MERGE INTO.

create table ny_parquet_mit using parquet location
'file:///tmp/new_york_taxi/part-00010-a1b1fb65-de8d-453b-9c10-6067c160c0e2-c000.snappy.parquet';

Let's try MERGE INTO.

MERGE INTO hudi_cow_tbl
USING (
select VendorID, tpep_pickup_datetime, tpep_dropoff_datetime, passenger_count,
trip_distance, RatecodeID, store_and_fwd_flag, PULocationID, DOLocationID,
payment_type, fare_amount, extra, mta_tax, tip_amount, tolls_amount,
improvement_surcharge, total_amount, congestion_surcharge, date_col
from ny_parquet_mit
) tbl_a
ON hudi_cow_tbl.tpep_pickup_datetime = tbl_a.tpep_pickup_datetime
WHEN MATCHED THEN
UPDATE SET *
WHEN NOT MATCHED THEN
INSERT *;

When joining the input table ny_parquet_mit with hudi_cow_tbl on the configured record key (tpep_pickup_datetime), the matching records are updated (WHEN MATCHED THEN UPDATE SET *), while the non-matching records are inserted (WHEN NOT MATCHED THEN INSERT *).
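UPDATE SET * and INSERT * expand to a one-to-one column mapping between source and target. If you only want to touch a few columns on a match, you can spell out the assignments explicitly. Here is a sketch (the chosen columns are illustrative, not from the original post; note that depending on your Hudi version, the pre-combine field, date_col here, may also need to be assigned on update):

MERGE INTO hudi_cow_tbl
USING (select * from ny_parquet_mit) tbl_a
ON hudi_cow_tbl.tpep_pickup_datetime = tbl_a.tpep_pickup_datetime
WHEN MATCHED THEN
UPDATE SET fare_amount = tbl_a.fare_amount,
total_amount = tbl_a.total_amount,
date_col = tbl_a.date_col
WHEN NOT MATCHED THEN
INSERT *;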

Conclusion

Apache Hudi gives you four different ways to write using the Spark engine to cater to different use cases, and Spark SQL is one of the most common ways to write data lake tables. Hopefully this blog helps newcomers get started with Apache Hudi.
