Near Real-Time State Management of billion+ entities in HBase with just 3 Nodes

DataOrc


by Navdeep

Problem Statements

  • Handle user clickstream data to keep track of user behavior attributes.
  • Clickstream data arrives at rates of up to 500 thousand events per second. → Velocity
  • Clickstream data comes from different sources and needs to be consolidated. → Variety
  • Need to keep track of the history and the latest values of 20+ attributes for around 1.4 billion entities, with data coming to around 300–400 GB. → Volume

Design Approach

  • The problem statement is a classic definition of big data, since we had all 3 Vs (Volume, Variety, Velocity). We need a distributed setup, since volume and velocity are bound to increase.
  • One advantage we already had is that we receive data from Kafka, which allows horizontally scalable distributed consumers, and that data is replicated to S3/Google Storage in a timely fashion (currently every 10 minutes, but this can be reduced or increased based on the use case). The takeaway is that the source data is splittable.
  • Since the requirement was near real-time, we had to choose between micro-batching and true streaming. We chose Spark for being massively scalable, for its well-defined data abstractions (dataframes), and for its good integration with the Hadoop ecosystem, with the added benefit of being able to reuse the same code for a streaming job.
  • We needed not only historical and latest values but also an ever-growing set of user behavior attributes, so we needed horizontally scalable, columnar, schema-less storage. We chose HBase for its sheer scale, its openness (we have custom co-processors doing merging and pre/post-processing), and, most importantly, the factor that enabled us to scale → offline uploading.

Now let's get to the meat: the code. I will partition the code into 3 parts: reading, processing, and loading.

Reading

Input: Our source was S3 files written into 10-minute partitions. We had a cursor (epoch seconds) maintained in our MySQL server (already part of the stack) which tells us which ten-minute partition to read from. Only on success of the job do we update the cursor to the end time of the processed 10-minute partition. If a job fails for any reason, the next run picks up 20 minutes of files. Input example:

spark-submit <other-options> --base-path s3a://popularity-data/realtime/ --start_time 1560611400 --end_time 1560612000
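
A minimal sketch of that cursor bookkeeping, assuming a hypothetical job_cursor table in MySQL with one row per job (the table, column, and class names here are illustrative):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

public class JobCursor {
    private final String jdbcUrl; // e.g. jdbc:mysql://host/db?user=...&password=...

    public JobCursor(String jdbcUrl) {
        this.jdbcUrl = jdbcUrl;
    }

    // End time (epoch seconds) of the last successfully processed 10-minute window.
    public long read(String jobName) throws Exception {
        try (Connection c = DriverManager.getConnection(jdbcUrl);
             PreparedStatement ps = c.prepareStatement(
                 "SELECT last_end_epoch FROM job_cursor WHERE job_name = ?")) {
            ps.setString(1, jobName);
            try (ResultSet rs = ps.executeQuery()) {
                rs.next();
                return rs.getLong(1);
            }
        }
    }

    // Advance the cursor only after the Spark job for the window succeeds; on failure
    // the cursor stays put, so the next run naturally picks up 20 minutes of files.
    public void commit(String jobName, long endEpoch) throws Exception {
        try (Connection c = DriverManager.getConnection(jdbcUrl);
             PreparedStatement ps = c.prepareStatement(
                 "UPDATE job_cursor SET last_end_epoch = ? WHERE job_name = ?")) {
            ps.setLong(1, endEpoch);
            ps.setString(2, jobName);
            ps.executeUpdate();
        }
    }
}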

Spark code in the reading section reads these JSON files as normal files, creates dataframes (DFs), and forwards the DFs to the processing section.

Simple Spark Loader
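
The loader boils down to expanding the start and end times into 10-minute partition paths and reading them as JSON. A minimal sketch, assuming one sub-directory per window named by its epoch start second (the layout and class name are assumptions for illustration):

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class SimpleSparkLoader {
    // basePath, startTime, and endTime map to the spark-submit arguments shown above.
    public static Dataset<Row> load(SparkSession spark, String basePath, long startTime, long endTime) {
        // Assumed layout: one sub-directory per 10-minute window, named by its epoch start second.
        List<String> paths = new ArrayList<>();
        for (long t = startTime; t < endTime; t += 600) {
            paths.add(basePath + t + "/");
        }
        // Read every window's JSON files into a single dataframe and hand it to processing.
        return spark.read().json(paths.toArray(new String[0]));
    }
}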

Processing

Input: dataframes from the Spark loader. The processing code is independent of the file format. Just give me a dataframe and I will take care of it!

The main motive of processing is to convert the dataframe columns into key-values with a timestamp, since this is what HBase understands. The timestamp column is a mandatory requirement and the most important attribute for achieving idempotency in our job. Super critical!!

We built a library that takes a dataframe and converts it into HBase key-value pairs. The code sample below converts the dataframes into pairs of (key, HBase KeyValue). Why did we store it in this format? Because later on we need to build the HFiles in sorted order, and the key of this pair is what we sort on.

Convert Spark dataframes to key-value pairs
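
A minimal sketch of such a conversion, assuming an entity_id row-key column, an epoch-millis event_ts column, and a single column family d; these names are illustrative, not the library's actual schema:

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import scala.Tuple2;

public class DataFrameToKeyValues {
    private static final byte[] CF = Bytes.toBytes("d"); // assumed column family

    public static JavaPairRDD<ImmutableBytesWritable, KeyValue> convert(Dataset<Row> df) {
        String[] columns = df.columns();
        // One output record per (row, attribute column): this is the row explosion mentioned below.
        return df.javaRDD().flatMapToPair(row -> {
            byte[] rowKey = Bytes.toBytes(row.<String>getAs("entity_id"));
            long ts = row.<Long>getAs("event_ts"); // the timestamp that drives idempotency in HBase
            List<Tuple2<ImmutableBytesWritable, KeyValue>> cells = new ArrayList<>();
            for (String col : columns) {
                if (col.equals("entity_id") || col.equals("event_ts") || row.isNullAt(row.fieldIndex(col))) {
                    continue;
                }
                Object value = row.getAs(col);
                KeyValue kv = new KeyValue(rowKey, CF, Bytes.toBytes(col), ts,
                        Bytes.toBytes(String.valueOf(value)));
                // The pair key is what we later partition and sort on to build sorted HFiles.
                cells.add(new Tuple2<>(new ImmutableBytesWritable(rowKey), kv));
            }
            return cells.iterator();
        });
    }
}

Keeping the row key as the pair key is what lets the later partition-and-sort step line each Spark partition up with an HBase region.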

The above part explodes the number of rows to (# rows in the original dataset × # columns); for example, 10 million input rows with 20 attribute columns become 200 million key-values, so Spark jobs will take some time at this stage.

The next task is where the magic happens. It is also the classic bottleneck of big data: shuffling and sorting, which will be part of the loading stage.

Loading

The requirement for HBase bulk loading is to create HFiles, which are nothing but files containing key-values with timestamps, where the keys must be sorted lexicographically in non-decreasing order. We will now sort all the keys generated in the previous step and create HFiles from them on HDFS.

Sorting

We need to sort on the key that will become our HBase row key. Some of the challenges we faced during sorting, the most compute-intensive step of our whole pipeline, are described below in order.

Earlier we were doing a total sort ordering of all keys, so our Spark jobs were failing with OOM and fetch exceptions (one of the symptoms of executors failing/dying). Then I stumbled upon the idea of sorting within each partition only, which should save a lot of CPU cycles, and it actually did. After using the snippet below, my jobs became less compute-intensive and now complete in half the time.

rdd.repartitionAndSortWithinPartitions(new HfilePartitioner(startKeys), new ByteArrayComparator());

The obvious question is: what in the world is the first parameter, and what is its relation to our job? The first parameter is a list of start keys according to which Spark distributes the data into partitions. In our case we already had existing partitions in HBase (remember, HBase is a key-value store where key-values are partitioned into regions; we created regions based on the starting character of the key, hence we had 36 pre-existing partitions).
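
One possible shape for such a partitioner, as a sketch rather than our exact HfilePartitioner, assuming startKeys holds the region start keys of the table sorted lexicographically with the empty key first:

import java.util.List;

import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.spark.Partitioner;

public class RegionStartKeyPartitioner extends Partitioner {
    // Sorted region start keys of the target table; the first region's start key is empty.
    private final List<byte[]> startKeys;

    public RegionStartKeyPartitioner(List<byte[]> startKeys) {
        this.startKeys = startKeys;
    }

    @Override
    public int numPartitions() {
        return startKeys.size();
    }

    @Override
    public int getPartition(Object key) {
        byte[] rowKey = ((ImmutableBytesWritable) key).get();
        // Pick the last region whose start key is <= rowKey, so each Spark partition
        // lines up with exactly one pre-existing HBase region.
        for (int i = startKeys.size() - 1; i >= 0; i--) {
            if (Bytes.compareTo(rowKey, startKeys.get(i)) >= 0) {
                return i;
            }
        }
        return 0;
    }
}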

The second obvious question is about the second parameter, the custom comparator used to sort the keys. Here is one gotcha: we have to sort the keys not only lexicographically; on a collision of keys, they also need to be sorted on the timestamp in descending order. Very important!!
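
One way to get that ordering with a byte-level comparator is to have the processing step append an 8-byte big-endian timestamp to each sort key and break ties on that suffix in reverse; the sketch below assumes exactly that encoding and is not our production comparator:

import java.io.Serializable;
import java.util.Comparator;

import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;

public class KeyThenTimestampDescComparator
        implements Comparator<ImmutableBytesWritable>, Serializable {

    private static final int TS_LEN = Bytes.SIZEOF_LONG;

    @Override
    public int compare(ImmutableBytesWritable a, ImmutableBytesWritable b) {
        byte[] ka = a.get(), kb = b.get();
        int rowLenA = ka.length - TS_LEN, rowLenB = kb.length - TS_LEN;
        // 1. Lexicographic order on the row-key part, as HFiles require.
        int cmp = Bytes.compareTo(ka, 0, rowLenA, kb, 0, rowLenB);
        if (cmp != 0) {
            return cmp;
        }
        // 2. On a collision, newest timestamp first (descending).
        long tsA = Bytes.toLong(ka, rowLenA);
        long tsB = Bytes.toLong(kb, rowLenB);
        return Long.compare(tsB, tsA);
    }
}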

So now we are ready with all the armory to create HFiles at an HDFS location, which is achievable through:

flattenKeyvalues.saveAsNewAPIHadoopFile(
    ConfigurationManagement.getConfig("hbase.hfiles.tmp.location"),
    ImmutableBytesWritable.class,
    KeyValue.class,
    HFileOutputFormat2.class,
    config);
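
The config passed above has to carry the target table's region boundaries and column-family settings. One way to build it, sketched here under the assumption of HBase 1.x client APIs rather than as our exact wiring, is to let HFileOutputFormat2 configure a throwaway MapReduce Job against the table and reuse that job's configuration:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.mapreduce.Job;

public class HFileConfig {
    public static Configuration build(String tableName) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Table table = connection.getTable(TableName.valueOf(tableName));
             RegionLocator locator = connection.getRegionLocator(TableName.valueOf(tableName))) {
            // configureIncrementalLoad wires the table's region boundaries and column
            // family settings into the job configuration used by saveAsNewAPIHadoopFile.
            Job job = Job.getInstance(conf, "hfile-config");
            HFileOutputFormat2.configureIncrementalLoad(job, table, locator);
            return job.getConfiguration();
        }
    }
}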

Phew!! With that done, the newly created HFiles can be attached to the HBase cluster with the help of an incremental load:

LoadIncrementalHFiles loader = new LoadIncrementalHFiles(config);
loader.doBulkLoad(
    new Path(ConfigurationManagement.getConfig("hbase.hfiles.tmp.location")),
    new HTable(config, ConfigurationManagement.getConfig("hbase.table.name")));

Now it's time for some interesting stats.

Non-Peak Hours

  • Incoming traffic → 100K events per second
  • Size of a 10-minute S3 window → 6–8 GB
  • Time taken by Spark jobs to process the data → 50–60 seconds with 6 executors of 2 GB executor memory each
  • Total time → 60–70 seconds
  • Max latency until data is available for querying → 11 minutes for the last 10-minute window

Peak Hours

  • Incoming traffic → 600K events per second
  • Size of a 10-minute S3 window → 40–60 GB
  • Time taken by Spark jobs to process the data → 120–180 seconds with 6 executors of 2 GB executor memory each
  • Total time → 110–120 seconds
  • Max latency until data is available for querying → 12 minutes for the last 10-minute window

Hardware

  • 3 m4.2xlarge instances
  • The HBase cluster (highly read-optimized) consists of a minimum of 3 m4.2xlarge instances and serves most of our data warehouse requirements, both real-time and bulk.

Incremental (bulk) loading into HBase is a pretty common exercise at scale; Yahoo, Mozilla, LinkedIn, Twitter, and others have used it and shared code snippets in Apache Pig, Apache Kylin, Apache Hive, and Apache Spark showing how bulk load can handle HBase insertion at any scale.

At Dataorc we always design solutions with scale and cost-effectiveness in mind. With the experience of building data platforms from scratch for enterprises as well as startups, we believe in delivering solutions that grow with your business as well as help it grow. Do reach out to us for free open consulting sessions with our tech team.
