Spark Read from & Write to HBase table using DataFrames
Is it possible to manipulate unstructured data using a structured API?
The need for NoSQL databases has become pressing: we manipulate data in many different forms and structures. HBase (Hadoop Database) was created to satisfy this need. Built on top of HDFS (Hadoop Distributed File System), it provides distributed storage of object-based data across multiple nodes.
Like any other NoSQL database, HBase represents data as objects composed of fields (columns). Each object (the equivalent of a row in a relational database) has a unique key, and may hold more or fewer columns than its neighbors.
Furthermore, HBase introduced the concept of column families to group related data together for more efficient read and write operations. The figure below gives more details about this:
Each row key is indexed to enable fast data retrieval and update operations.
I hope you enjoyed this brief introduction. It’s clear that HBase has a lot of interesting features, from data distribution and NoSQL data representation to support for update operations. Pretty cool, right? Yet we still haven’t mentioned a very important point, especially in a production environment: how do we integrate it with an existing Big Data environment?
To narrow the spectrum of answers, I would like to focus in this article on Spark-HBase integration. Spark is present in almost every cluster deployment and is considered one of the most popular Big Data tools, so it’s only natural that the two would eventually talk to each other.
HBase provides a native API for data manipulation; the main methods are the following:
- Scan : Scan a range of rows
- Get : Get a specific row using a key
- Put : Insert a row into a table
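As a rough sketch of what this low-level API looks like in practice, here is a minimal example using the standard hbase-client classes (the table and column names reuse the WAR_PLAN example introduced later in this article):

```scala
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Get, Put, Scan}
import org.apache.hadoop.hbase.util.Bytes

val connection = ConnectionFactory.createConnection(HBaseConfiguration.create())
val table = connection.getTable(TableName.valueOf("default:WAR_PLAN"))

// Put: insert a row under key "1"
val put = new Put(Bytes.toBytes("1"))
put.addColumn(Bytes.toBytes("cavalry"), Bytes.toBytes("number"), Bytes.toBytes("65"))
table.put(put)

// Get: fetch that row back by its key
val result = table.get(new Get(Bytes.toBytes("1")))

// Scan: iterate over a range of rows
val scanner = table.getScanner(new Scan())
scanner.close()

table.close()
connection.close()
```

Notice how everything is expressed in raw bytes and per-row calls; this is exactly the level of abstraction the DataFrame approach below will let us escape.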
This low-level communication is supported by the HBase-Spark community:
Apache HBase ™ Reference Guide
It works well with data at small scale, but with big datasets, performance issues pop up one after the other; trust me, I went through a similar situation when I first started working with HBase.
Have we reached the end of the road? What is the solution? How do we preserve the NoSQL benefits, and not at the cost of performance?
Spark is not popular for no reason; it has a rich community. When I first faced this difficulty at work, it made my days hell, but in the end I found a solution to all my problems: a way to read from and write to HBase in an optimized manner, with the help of Spark DataFrames.
I hope I have your attention by now. Curious to find out what the solution is? Let’s discover it together.
Reading data from HBase
Spark DataFrames are a structured representation of data, with support for SQL-like operations. The key to interacting with HBase in the same manner is to create a mapping between the object fields and our DataFrame columns.
To elaborate our use case, we start by creating a test table using the HBase shell, and then populate it with some data:
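The exact shell commands are not shown here in full; a reconstruction consistent with the scan output that follows would look like this:

```
hbase(main):001:0> create 'default:WAR_PLAN', 'cavalry', 'infantry'
hbase(main):002:0> put 'default:WAR_PLAN', '1', 'cavalry:number', '65'
hbase(main):003:0> put 'default:WAR_PLAN', '2', 'infantry:number', '35'
hbase(main):004:0> put 'default:WAR_PLAN', '3', 'cavalry:number', '50'
hbase(main):005:0> put 'default:WAR_PLAN', '3', 'infantry:number', '50'
```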
I have created two column families, cavalry and infantry (I am a big fan of ancient wars). A full scan of the table displays the available data:
hbase(main):154:0> scan 'default:WAR_PLAN'
ROW  COLUMN+CELL
 1   column=cavalry:number, timestamp=1609327934978, value=65
 2   column=infantry:number, timestamp=1609327954343, value=35
 3   column=cavalry:number, timestamp=1609327983183, value=50
 3   column=infantry:number, timestamp=1609327965760, value=50
Now, let’s write some Spark code to retrieve this data. I will use Scala in the examples below; my apologies to Python lovers, though it is possible to do the same using PySpark.
We start by initiating a connection between HBase and Spark. HBase relies completely on ZooKeeper for state communication between nodes, so we first provide its quorum (the list of hosts on which ZooKeeper servers are running) and the client port.
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.spark.HBaseContext

val conf = new HBaseConfiguration()
conf.set("hbase.zookeeper.quorum", "hostname1,hostname2...")
conf.set("hbase.zookeeper.property.clientPort", "2181")

new HBaseContext(spark.sparkContext, conf)

val hbaseTable = "default:WAR_PLAN"
val columnMapping =
  """id string :key,
    |infantryNumber string infantry:number,
    |cavalryNumber string cavalry:number""".stripMargin
val hbaseSource = "org.apache.hadoop.hbase.spark"
The HBase table refers to the one we are trying to load; its name follows the namespace:tableName syntax.
The mapping is a comma-separated list; each element follows this rule:
DataframeColumnName Type ColumnFamily:HbaseColumnName
Finally, hbaseSource corresponds to the library responsible for ensuring communication between Spark and HBase. The following line should be added to your sbt dependencies in order to use it (I am using the Cloudera CDH provided libraries):
libraryDependencies += "org.apache.hbase" % "hbase-spark" % "2.1.0-cdh6.3.4"
Below is the corresponding Maven dependency, if you are using Maven rather than sbt:
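The equivalent Maven dependency, using the same coordinates as the sbt line above, would look like this (note that the Cloudera repository itself must be declared separately in your pom.xml):

```xml
<dependency>
  <groupId>org.apache.hbase</groupId>
  <artifactId>hbase-spark</artifactId>
  <version>2.1.0-cdh6.3.4</version>
</dependency>
```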
Now let’s combine the already defined parameters into a single line of code and load our data into a DataFrame:
val hbaseData = spark.read.format(hbaseSource)
  .option("hbase.columns.mapping", columnMapping)
  .option("hbase.table", hbaseTable)
val hbaseDf = hbaseData.load()
The work is almost done; with a bit of magic added to our recipe, this is what our data looks like:
+-------------+--------------+---+
|cavalryNumber|infantryNumber| id|
+-------------+--------------+---+
|           65|          null|  1|
|         null|            35|  2|
|           50|            50|  3|
+-------------+--------------+---+
It’s a Spark structured DataFrame representing data that actually resides in HBase. This is really cool: the Spark DataFrame API has become fully available to us (some operations, like joins, should be avoided on HBase-backed DataFrames; further explanation will be provided in a separate article).
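To illustrate, ordinary DataFrame operations now run transparently against HBase. This is a sketch, assuming the hbaseDf defined above and spark.implicits._ in scope; note that the columns were mapped as strings, so Spark casts them for the numeric comparison:

```scala
import spark.implicits._

// Filter and project, just like any other DataFrame
val strongCavalry = hbaseDf
  .filter($"cavalryNumber" > 55)
  .select("id", "cavalryNumber")
strongCavalry.show()

// Or register the DataFrame and use plain SQL
hbaseDf.createOrReplaceTempView("warPlan")
spark.sql("select id from warPlan where infantryNumber is not null").show()
```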
Writing data to HBase
Writing to an HBase table is not difficult once a corresponding Spark DataFrame has been created (which is exactly what we did in the previous step).
We will use the same Spark-HBase API as before; not only is it useful for reading, it also makes it possible to write structured DataFrames, built using Hive SQL queries, into an HBase table with a non-structured schema.
I have already prepared a Hive table with matching columns and a few rows; below is the table content:
hive> select * from work.war_plan
id  infantrynumber  cavalrynumber
4   56              7
5   6               99
6   50              27
Time to get back to coding, we build first a spark Dataframe on top of our Hive table:
val hiveTmp = spark.sql("select * from work.war_plan")

// Align the Hive column order with the HBase DataFrame's columns
val columns: Array[String] = hbaseDf.columns
val hiveDf = hiveTmp.select(columns.head, columns.tail: _*)

hiveDf.createOrReplaceTempView("hiveDataframe")
hbaseDf.createOrReplaceTempView("hbaseDataframe")
Then, we use a Spark SQL insert statement to move data from the Hive data warehouse into HBase storage:
val insertStatement = "insert into hbaseDataframe select * from hiveDataframe"
spark.sql(insertStatement)
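As a side note, the SQL insert is not the only route: the same connector also exposes a DataFrame writer, so the Hive DataFrame could be written directly. A sketch, reusing the options from the read path:

```scala
hiveDf.write
  .format(hbaseSource)
  .option("hbase.columns.mapping", columnMapping)
  .option("hbase.table", hbaseTable)
  .save()
```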
All the magic here happens under the hood: Spark first translates the insert statement into its equivalent HBase Put calls, then the generated instructions are run using the native HBase-Spark API; this link provides sample code.
The Spark-HBase DataFrame API is not only easy to use; it also gives a huge performance boost for both reads and writes. During the connection establishment step, each Spark executor communicates directly with the HBase regions on the same local node, so read and write operations are executed in a distributed fashion.
Through this tutorial we managed to interact with a NoSQL database, HBase, using the Spark DataFrame API; the full code is provided in my GitHub repository.
I hope you found the post useful and informative. Don’t forget to discuss it in the comment section; all questions are welcome. Thank you for reading.