Pivot Key-Value Pairs via Hive

Robert Fehrmann
Snagajob Engineering
7 min read · Jun 4, 2014


Collecting Event-based data can provide great insight into user behavior and preferences. At Snagajob we utilize Event-based data not only to track vital company metrics but also to personalize the user experience. Events are issued to track all sorts of data points, from connecting to Snagajob for the first time, searching for jobs, and creating a profile, to submitting an application. Over the years the number of Events has virtually exploded, not only due to ever-increasing traffic but also because we now track events in much more detail.

Recently it became obvious that our traditional SQL-based Event tracking solution was not only not scalable enough but also too rigid. Adding a new Event Type required changes in multiple systems. We needed a new solution that addressed both problems: it had to be horizontally scalable and flexible enough to support adding a new Event Type with no or minimal changes.

Let’s look at some sample data first. For the purpose of this blog post we will look at just one Event type (we currently have 199 Event types) and only 4 attributes (we currently have 49 attributes across all Event types). Below are 4 attributes collected for a “Search” Event. A Search Event is fired when a Jobseeker issues a JobSearch. Among other attributes, the data points include the EventType, the browser used by the client, the query string, and the Result, i.e. a list of Job Posting IDs.
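
For illustration, a single Search Event might carry attribute values like these (the values are made up for this post):

EventType: Search
Browser:   Chrome
Query:     cashier richmond va
Result:    101234, 105678, 109876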

To decouple the Event generation from the Event processing we utilize a basic producer/consumer model via our message-bus infrastructure (based on RabbitMQ). The client (WebService or API) serializes the Event into key/value pairs and generates a fact in a message queue (producer). These key/value pairs are consumed by a subscriber to that queue and sent to our Hadoop infrastructure.

So the serialized Events look like this:
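
A minimal sketch of such a serialized Event, with every attribute flattened into a Key/Value pair sharing the same RecordId (the RecordId and all values are made up):

RecordId=5f2d9c42, Key=EventType, Value=Search
RecordId=5f2d9c42, Key=Browser,   Value=Chrome
RecordId=5f2d9c42, Key=Query,     Value=cashier richmond va
RecordId=5f2d9c42, Key=Result,    Value=101234,105678,109876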

This post is about how we collect the key/value pairs via flume, store them in HDFS, rebuild the original Events via hive, and send them for further processing into a SQL-based DW via sqoop.

Event Collection (flume)

The flume agent (a1) has a pretty straightforward configuration. It consists of a source (r1), a sink (k1), and a channel (c1) connecting source and sink.

The source (r1) is an http source listening on port 5140 on any interface of the local host. It accepts a JSON-based data structure. The 2 interceptors at the end allow us to track the host receiving the event as well as the date/time the event was received. We will see below how those two attributes are used.
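
A sketch of the source portion of the configuration, assuming flume’s standard http source and its built-in timestamp and host interceptors (the interceptor names i1/i2 and all property values other than the port are assumptions based on the description above):

a1.sources = r1

# http source on port 5140, any interface, JSON payload
a1.sources.r1.type = http
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 5140
a1.sources.r1.handler = org.apache.flume.source.http.JSONHandler

# interceptors: capture receive time and receiving host
a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.type = timestamp
a1.sources.r1.interceptors.i2.type = host
a1.sources.r1.interceptors.i2.useIP = false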

The sink (k1) is an hdfs sink that creates a text-based file, rolling over every 300 sec (5 minutes) or 100,000 entries (whichever comes first). Remember the 2 interceptors (timestamp and host) we discussed before? Both interceptors are used to generate the file name and path for storing the event data in HDFS. The timestamp is used to partition files into directories by day. The host is used as one part of the file name prefix. The “inUsePrefix” attribute is particularly important since it makes files actively in use by flume hidden from other processes in the Hadoop framework, in particular hive (as we shall see later).
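
A sketch of the sink portion, assuming flume’s hdfs sink; the path layout is an assumption, while the roll settings follow the description above, and an inUsePrefix of “.” makes the in-flight file a hidden file in HDFS:

a1.sinks = k1

# hdfs sink: daily directories from the timestamp, file prefix from the host header
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://nameservice1/flume/events/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = %{host}
a1.sinks.k1.hdfs.inUsePrefix = .
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text

# roll every 300 seconds or 100000 events, whichever comes first
a1.sinks.k1.hdfs.rollInterval = 300
a1.sinks.k1.hdfs.rollCount = 100000
a1.sinks.k1.hdfs.rollSize = 0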

The last object missing is the conduit between source and sink, i.e. a channel. In our case we are using a 2 MB memory channel.
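
A sketch of the channel, assuming the 2 MB limit is expressed via the memory channel’s byteCapacity setting (the event-count capacities are assumptions):

a1.channels = c1

# in-memory channel, capped at roughly 2 MB of event data
a1.channels.c1.type = memory
a1.channels.c1.byteCapacity = 2000000
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 1000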

The last step is to bind source and sink via the channel, and events flow from an http endpoint directly into HDFS.
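
The binding itself is just two more properties:

# wire source and sink together through the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1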

Event Storage (HDFS)

As expected, files are being generated every 5 minutes, they are organized by date, and their file names include the host name of the flume instance that created the file.
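
An illustrative layout (directory names, host names, and file suffixes are made up, following the assumed path from the sink sketch above):

/flume/events/2014-06-03/web01.1401836700123
/flume/events/2014-06-03/web02.1401837000456
/flume/events/2014-06-04/web01.1401923100789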

Event Processing

To process the Events data we will use hive. All we have to do to access the Events data is to create metadata information indicating that we have an external partitioned table, let’s call it events, with each partition in the events table pointing to a directory in HDFS.
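
A sketch of the corresponding DDL; only the table name events comes from the text, while the column names (a RecordId plus one Key/Value pair per row), the tab delimiter, and the day partition column dt are assumptions that the later snippets reuse:

CREATE EXTERNAL TABLE events (
  recordid    STRING,
  event_key   STRING,
  event_value STRING
)
PARTITIONED BY (dt STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE;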

To ensure that we always collect all events for a particular day, we always look for data in the partition for the current day as well as the partition for the previous day. Adding 2 partitions, one for the previous day and one for the current day, can be accomplished by the code below. Please note that we are using a highly available namenode service called nameservice1. Namenode HA is a feature in CDH (that’s the Cloudera Hadoop Distribution). In case you don’t use Cloudera, just replace nameservice1 with the host running the namenode service.
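
A sketch with concrete dates, reusing the assumed path layout from the flume sink sketch above:

ALTER TABLE events ADD IF NOT EXISTS PARTITION (dt='2014-06-03')
LOCATION 'hdfs://nameservice1/flume/events/2014-06-03';

ALTER TABLE events ADD IF NOT EXISTS PARTITION (dt='2014-06-04')
LOCATION 'hdfs://nameservice1/flume/events/2014-06-04';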

A simple hive script with 2 parameters keeps the partitions up-to-date. It’s called once a day via bash, with some logic to generate the parameters.
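
A minimal sketch of that driver, assuming the two ALTER TABLE statements above live in a script (the file name add_event_partitions.hql is made up) that references ${hivevar:yesterday} and ${hivevar:today} instead of the literal dates:

#!/bin/bash
# compute the two partition dates and hand them to the hive script
TODAY=$(date +%Y-%m-%d)
YESTERDAY=$(date -d "yesterday" +%Y-%m-%d)

hive -hivevar yesterday="$YESTERDAY" -hivevar today="$TODAY" -f add_event_partitions.hql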

Now that we have access to the Events data in a Key/Value format, we can start thinking about how to pivot it back into the original row-based format. The first step is to find all RecordIds we want to pivot. For right now let’s assume we want to pivot all Key/Value pairs for all Events of a particular day. The list of RecordIds can be determined by the statement below.
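
A sketch of that statement; the result is materialized here into a table called RecordIds (a name introduced for this post) so the later steps have something to join against:

DROP TABLE IF EXISTS RecordIds;
CREATE TABLE RecordIds AS
SELECT DISTINCT recordid
FROM events
WHERE dt IN ('${hivevar:yesterday}', '${hivevar:today}');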

The next step is to cleanse the data, just to make sure that we don’t have any null values for either Key or Value and that we don’t have any non-printable characters (sqoop seems to have problems with those). We accomplish that with a simple regular expression (an inline function in hive) that replaces each non-printable character with a “ ” (space).
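
A sketch of that cleansing step using hive’s regexp_replace; the table name EventsClean is used in the text below, while the exact regular expression (Java’s \p{Print} class, since hive uses Java regexes) is an assumption:

DROP TABLE IF EXISTS EventsClean;
CREATE TABLE EventsClean AS
SELECT
  e.recordid,
  regexp_replace(e.event_key,   '[^\\p{Print}]', ' ') AS event_key,
  regexp_replace(e.event_value, '[^\\p{Print}]', ' ') AS event_value
FROM events e
WHERE e.dt IN ('${hivevar:yesterday}', '${hivevar:today}')
  AND e.event_key   IS NOT NULL
  AND e.event_value IS NOT NULL;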

At this point we have the Events (by RecordId) that we want to pivot as well as the individual Key/Value pairs (cleansed). The next step is to take all Key/Value pairs and re-assemble the original events. This can be accomplished fairly easily by multiple self joins of the original data set. Let’s assume the cleansed Key/Value pairs are available in EventsClean; then it would take 4 self joins to re-assemble the original events (see below).
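
A sketch of those 4 self joins for the Search Event from the beginning of the post; the attribute key names ('EventType', 'Browser', 'Query', 'Result') and the RecordIds driver table are assumptions:

SELECT
  r.recordid,
  e1.event_value AS eventtype,
  e2.event_value AS browser,
  e3.event_value AS query,
  e4.event_value AS result
FROM RecordIds r
JOIN EventsClean e1 ON r.recordid = e1.recordid
JOIN EventsClean e2 ON r.recordid = e2.recordid
JOIN EventsClean e3 ON r.recordid = e3.recordid
JOIN EventsClean e4 ON r.recordid = e4.recordid
WHERE e1.event_key = 'EventType'
  AND e2.event_key = 'Browser'
  AND e3.event_key = 'Query'
  AND e4.event_key = 'Result';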

There’s only one problem with that approach. We are collecting between 300 and 400 million Key/Value pairs, i.e. this approach does not work due to the size of the result sets unless we are able to narrow down the self joins to only the data that’s necessary for each specific self join. Surprisingly, it’s not that difficult to do. All we have to do is partition the EventsClean data set by Event type, and the approach works like a charm. Instead of EventsClean we will call the table EventsPartitioned.
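
A sketch of that table; only the fact that it is partitioned by Event type comes from the text, the column names are the same assumptions as above:

CREATE TABLE EventsPartitioned (
  recordid    STRING,
  event_key   STRING,
  event_value STRING
)
PARTITIONED BY (eventtype STRING);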

This way, instead of joining the full data set, we only merge the data that is actually needed into the existing data set. Putting it all together we get the scripts below. The LEFT SEMI JOIN logic is a workaround for an ‘IN’ subquery, i.e. find all Events that have a RecordId in ( … subset of Events from above … ). The Key/Value pairs returned by this logic are then stored in a table that is partitioned by Event type.
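
A sketch of that load, assuming the RecordIds and EventsClean tables from above and hive’s dynamic partitioning; each row’s Event type is looked up from that record’s own 'EventType' pair (the key name is an assumption):

-- let hive create the Event type partitions on the fly
SET hive.exec.dynamic.partition=true;
SET hive.exec.dynamic.partition.mode=nonstrict;

INSERT OVERWRITE TABLE EventsPartitioned PARTITION (eventtype)
SELECT
  c.recordid,
  c.event_key,
  c.event_value,
  -- the dynamic partition column has to come last
  t.event_value AS eventtype
FROM EventsClean c
-- workaround for "recordid IN (subset of Events from above)"
LEFT SEMI JOIN RecordIds r ON (c.recordid = r.recordid)
JOIN EventsClean t ON (c.recordid = t.recordid)
WHERE t.event_key = 'EventType';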

So the last step is to perform the self joins as explained above.
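
A sketch for the Search Event; because eventtype is a partition column, every self join now only scans the 'Search' partition instead of the full data set (key and column names are the same assumptions as above):

SELECT
  e1.recordid,
  e1.event_value AS browser,
  e2.event_value AS query,
  e3.event_value AS result
FROM EventsPartitioned e1
JOIN EventsPartitioned e2 ON e1.recordid = e2.recordid
JOIN EventsPartitioned e3 ON e1.recordid = e3.recordid
WHERE e1.eventtype = 'Search' AND e1.event_key = 'Browser'
  AND e2.eventtype = 'Search' AND e2.event_key = 'Query'
  AND e3.eventtype = 'Search' AND e3.event_key = 'Result';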

Stats

Our current Cloudera-based Hadoop infrastructure consists of 6 data nodes and 3 flume nodes. We collect about 350 million Key/Value pairs for about 20 million events. This equates to about 13 GB net data volume (about 40 GB total).

Data collection doesn’t put any noticeable load on the environment. Pivoting a daily data set takes about 60 minutes. However, we run the pivoting process once an hour. Due to the overhead of finding the necessary RecordIds the hourly process takes about 15 minutes.

Originally published at engineering.snagajob.com on June 4, 2014.
