March 19, 2015
One of the most-cited advantages of Hadoop is that it enables a “schema-on-read” data analysis strategy. “Schema-on-read” means you do not need to know how you will use your data when you are storing it. This allows you to innovate quickly by asking different and more powerful questions after storing the data. However, few people have actually had hands-on experience with how schema-on-read works behind the scenes. In this blog, we will give an example of a schema-on-read approach, outline the mechanics behind it, and show you how to try it out yourself with minimal effort.
“Schema-on-write”
In many traditional data systems, users decide on the schema of their data before loading data into their system. For example, in an RDBMS such as MySQL, you first create a table:
create table users (id int, first_name varchar(64), last_name varchar(64), age int);
and then load data into the table:
load data local infile '/data/new_users_2015-03-17' into table users;
MySQL is then responsible for how the data is written to disk. In almost all systems, your underlying data is closely tied to the schema in order to ensure high performance. This also means that changing your schema often requires rewriting your data. For example, when you add a column to your data:
alter table users add column gender varchar(1);
your data system will often rewrite all your data. Schema-on-write is good when you understand your data, understand how you will access it, and want to enforce the schema you have chosen. However, many times you do not fully understand your data and how you will use it. This is especially true for analytics and data mining, where you may have a lot of data that you don’t fully understand yet or that doesn’t all fit the same schema.
“Schema-on-read” with Hadoop
The Hadoop ecosystem provides a cheap and efficient distributed file system (HDFS), allowing you to store all the data you have. At write time, you are more concerned about acquiring all the data that might be useful, as opposed to spending cycles on understanding it deeply or processing it in a complex way. In fact, the people responsible for loading data into your Hadoop cluster are often not the same people who read it. Typically, you first put your data on HDFS:
hdfs dfs -put /local/path/userdump /hdfs/path/data/users
and then create a Hive table over your data:
create external table users (id int, first_name string, last_name string, age int) row format delimited fields terminated by ',' location '/hdfs/path/data/users';
This flips the “schema-on-write” model on its head. Before, you would start out by understanding how you need to use your data, design appropriate schemas, and then fit your data to those schemas. Now, you start with your data and add schema to fit your needs. If you decide to add a column to your schema, Hive simply changes how it reads your data instead of rewriting all of it. In other words, changing the way you use your data no longer changes the way you store it. This flexible approach is much better when you have lots of data that can be used in many different ways, or when you are still trying to understand the data you have.
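To make this concrete, here is a minimal sketch (reusing the users table created above) of what such a schema change looks like on the Hive side. Adding a column only updates the table metadata; the files already sitting in HDFS are left untouched, and older rows simply return NULL for the new column:
-- metadata-only change; the files under /hdfs/path/data/users are not rewritten
alter table users add columns (gender string);
-- rows written before the change read as NULL for the new column
select id, first_name, gender from users limit 10;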
“Schema-on-read” in Action
Normally, to try this out you would have to set up a cluster with all the relevant Hadoop components. We will instead use the Cask Data Application Platform (CDAP) SDK, which provides an integrated experience while running all those Hadoop components underneath. We will use CDAP to take a hands-on look at how to ingest, explore, and refine your data.
STEP 1: Download and start the CDAP SDK:
Download the SDK here. Then execute:
unzip cdap-sdk-2.8.0-SNAPSHOT.zip
cd cdap-sdk-2.8.0-SNAPSHOT
bin/cdap.sh start
STEP 2: Start the CDAP Command Line Interface:
bin/cdap-cli.sh
STEP 3: Ingest data:
Data can be ingested into CDAP using Streams. Let’s create a Stream using the CLI:
> create stream log_events
One way to ingest data into a Stream is to load a file. Download an example log here and load it into the Stream:
> load stream log_events accesslog.txt
STEP 4: Take a first look at the data:
Now that we have ingested the data, we can query it to take a look at the raw content:
> execute 'select * from stream_log_events limit 2'
+================================================================================================+
| stream_log_events.ts: BIGINT | stream_log_events.headers: ma | stream_log_events.body: STRING |
| | p <string,string> | |
+================================================================================================+
| 1426784733541 | {"content.type":"text/plain"} | 69.181.160.120 - - [08/Feb/201 |
| | | 5:04:36:40 +0000] "GET /ajax/p |
| | | lanStatusHistoryNeighbouringSu |
| | | mmaries.action?planKey=COOP-DB |
| | | T&buildNumber=284&_=1423341312 |
| | | 519 HTTP/1.1" 200 508 "http:// |
| | | builds.cask.co/browse/COOP-DBT |
| | | -284/log" "Mozilla/5.0 (Macint |
| | | osh; Intel Mac OS X 10_10_1) A |
| | | ppleWebKit/537.36 (KHTML, like |
| | | Gecko) Chrome/38.0.2125.122 S |
| | | afari/537.36" |
|------------------------------------------------------------------------------------------------|
| 1426784733541 | {"content.type":"text/plain"} | 69.181.160.120 - - [08/Feb/201 |
| | | 5:04:36:47 +0000] "GET /rest/a |
| | | pi/latest/server?_=14233413125 |
| | | 20 HTTP/1.1" 200 45 "http://bu |
| | | ilds.cask.co/browse/COOP-DBT-2 |
| | | 84/log" "Mozilla/5.0 (Macintos |
| | | h; Intel Mac OS X 10_10_1) App |
| | | leWebKit/537.36 (KHTML, like G |
| | | ecko) Chrome/38.0.2125.122 Saf |
| | | ari/537.36" |
+================================================================================================+
You can see that the data ingested has information such as IP address, HTTP status code, and URL.
STEP 5: Attach a new schema to the stream:
After examining the data in its raw form, you recognize it as an Apache access log. You can now attach a format and schema that interprets the data as the Apache Combined Log Format:
> set stream format log_events clf
You can look at the updated Hive table schema:
> execute 'describe stream_log_events'
+==============================================================================+
| col_name: STRING | data_type: STRING | comment: STRING |
+==============================================================================+
| ts | bigint | from deserializer |
| headers | map<string,string> | from deserializer |
| remote_host | string | from deserializer |
| remote_login | string | from deserializer |
| auth_user | string | from deserializer |
| date | string | from deserializer |
| request | string | from deserializer |
| status | int | from deserializer |
| content_length | int | from deserializer |
| referrer | string | from deserializer |
| user_agent | string | from deserializer |
+==============================================================================+
and examine the data using the new schema:
> execute 'select remote_host, request, status from stream_log_events limit 2'
+==============================================================================+
| remote_host: STRING | request: STRING | status: INT |
+==============================================================================+
| 69.181.160.120 | GET /ajax/planStatusHis | 200 |
| | toryNeighbouringSummari | |
| | es.action?planKey=COOP- | |
| | DBT&buildNumber=284&_=1 | |
| | 423341312519 HTTP/1.1 | |
|------------------------------------------------------------------------------|
| 69.181.160.120 | GET /rest/api/latest/se | 200 |
| | rver?_=1423341312520 HT | |
| | TP/1.1 | |
+==============================================================================+
You can now perform advanced analysis on this data using SQL queries. For example, you can count the unique IPs:
> execute 'select COUNT(DISTINCT(remote_host)) from stream_log_events'
+=====================================+
| _c0: BIGINT |
+=====================================+
| 22 |
+=====================================+
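Because the Stream is exposed as an ordinary Hive table, any Hive query works here. As one more illustrative example (output omitted), you could break the traffic down by HTTP status code:
> execute 'select status, count(*) as requests from stream_log_events group by status'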
You can also retrieve statistics on the Stream:
> get stream-stats log_events
Analyzing 100 Stream events in the time range [0, 9223372036854775807]...
column: remote_host, type: STRING
Unique elements: 1
column: remote_login, type: STRING
Unique elements: 0
column: auth_user, type: STRING
Unique elements: 0
column: date, type: STRING
Unique elements: 97
column: request, type: STRING
Unique elements: 100
column: status, type: INT
Unique elements: 1
Histogram:
[200, 299]: 100 |+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
column: content_length, type: INT
Unique elements: 2
Histogram:
[0, 99]: 35 |++++++++++++++++++++
[500, 599]: 65 |++++++++++++++++++++++++++++++++++++++++++++++++
column: referrer, type: STRING
Unique elements: 1
column: user_agent, type: STRING
Unique elements: 1
You can attach any of these formats: TSV, CSV, Avro, and CLF. In a later release of CDAP, we will open up the APIs to support any custom format on Streams.
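For instance, if log_events carried comma-separated values instead of raw log lines, you could attach the CSV format together with an explicit schema, roughly along these lines (a sketch only; the way the schema is passed is an assumption, so check the CLI help for the exact syntax in your version):
> set stream format log_events csv "ip string, status int"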
Under the Hood
Now, let’s look at what happens behind the scenes to give you an idea of what you would have needed to do without CDAP.
Ingesting Data
A Stream is a sequence of time-ordered events and is stored as a collection of files on HDFS. CDAP provides a scalable StreamWriter component that exposes several RESTful APIs to write data to Streams. You can write events as you receive them in realtime, or you can add them in batches by sending entire files. In the example above, we loaded a file into a Stream. Behind the scenes, CDAP reads events from the file, attaches some metadata, and writes the events to a temporary location on HDFS. Once the entire file has been consumed, it is moved into place with some extra coordination to make sure realtime writes are not interrupted. More information on how Streams are internally organized and managed is available in our presentation.
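For example, besides loading whole files as we did above, individual events can be written as they arrive, either through the StreamWriter RESTful APIs or from the CLI. The command below is a sketch (the log line is made-up sample data; check the CLI help for the exact command in your version):
> send stream log_events '127.0.0.1 - - [17/Mar/2015:12:00:00 +0000] "GET /index.html HTTP/1.1" 200 512 "-" "curl/7.37.0"'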
In addition, CDAP creates an external table in Hive for the Stream with a column for event timestamp, a column for event headers, and a column interpreting the event body as text.
+================================================================+
| col_name: STRING | data_type: STRING | comment: STRING |
+================================================================+
| ts | bigint | from deserializer |
| headers | map<string,string> | from deserializer |
| body | string | from deserializer |
+================================================================+
Defining the Schema
By default, a Stream is configured with a default format and schema. A format defines how data is read, while a schema defines the structure of that data. For example, the “csv” format reads data as comma-separated values, while the schema defines names and types for those values. CDAP supports assigning a new format and schema to a Stream. If a new schema is assigned, CDAP drops the Hive table associated with the old schema and recreates a new table with the new schema.
Since Hive tables created by CDAP are external tables, no data is lost when a table is dropped. Moreover, since the schema is applied on read, schema changes can be made in parallel with writing data to a Stream.
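In Hive terms, this is just the standard external-table pattern. The simplified sketch below shows what the format change amounts to; the location is a placeholder path, and the real table is created through CDAP's storage handler rather than this plain DDL:
-- dropping an external table removes only metadata; the Stream's files stay on HDFS
drop table if exists stream_log_events;
-- recreate the table with the richer CLF schema over the same data
create external table stream_log_events (
  ts bigint,
  headers map<string,string>,
  remote_host string,
  remote_login string,
  auth_user string,
  `date` string,
  request string,
  status int,
  content_length int,
  referrer string,
  user_agent string
)
location '/path/to/stream/files';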
Querying Data
A query on a Stream translates directly to a Hive query on the Stream’s Hive table. A custom StreamStorageHandler is used to read data from Streams into a form that Hive can understand. The storage handler uses a RecordFormat to read Stream events, then uses Java reflection to flatten the record objects into the list of columns that Hive expects. When we ran queries in the example above, we used the CDAP CLI, which in turn uses CDAP RESTful APIs to submit queries.
Summary
You have now learned what “schema-on-read” means and how CDAP implements the concept. Ingesting and exploring data is often the first step in writing a data application. You can check out how to write a simple realtime application that processes the data you ingested by following one of the CDAP guides.