(Part 1 of 3) Embrace Choice — Curate & Analyze NOAA Weather Dataset using Big Data Engines Hive/Spark & Presto

The Big Data ecosystem is insanely complex, yet two engines/technologies stand out: Hive and Spark. More recently, Facebook developed Presto, which plays a vital role in providing accelerated access to data analyzed in Hadoop and helps avoid moving activated/refined datasets into an on-prem or cloud MPP data warehouse for BI/Reporting use cases.

Apache Hive is a data warehouse that supports analysis of large datasets stored in Hadoop’s HDFS and compatible file systems such as Amazon S3, Azure Blob Storage, and Azure Data Lake Store. Hive has proven over the years to be great for batch ETL. Apache Spark is an open source cluster computing framework, originally developed at the University of California, Berkeley’s AMPLab, that excels in use cases like continuous applications, where streaming data must be processed, analyzed, and stored. Presto is an open source distributed SQL query engine for running interactive analytic queries. Presto can use multiple threads per worker across multiple machines when executing a query; if you have, say, 10 compute nodes each with a 4-core processor, you effectively have 40 cores executing your query across the cluster.

Though the publicly available NOAA daily Global Historical Climatology Network (GHCN-Daily) dataset cannot be categorized as a big data-class dataset, it has the breadth and depth of weather data for every single day since the late 1800s across many United States geographies, which makes it a really important dataset in the context of big data. Businesses often need to figure out how weather has been impacting their business, or to understand how weather correlates with equipment maintenance cycles for industrial preventative-maintenance use cases.

In this first part of a three-part series demonstrating the power of choice in Big Data, we will see how productive it can be to use Apache Hive to curate the daily Global Historical Climatology Network (GHCN-Daily) weather dataset. Below are the steps involved.

Step 1: Acquire the GHCN Daily weather data

$ curl -O ftp://ftp.ncdc.noaa.gov/pub/data/ghcn/daily/ghcnd_hcn.tar.gz
$ curl -O ftp://ftp.ncdc.noaa.gov/pub/data/ghcn/daily/ghcnd-stations.txt
$ curl -O ftp://ftp.ncdc.noaa.gov/pub/data/ghcn/daily/readme.txt
$ tar -zxf ghcnd_hcn.tar.gz
$ hadoop fs -mkdir s3://<bucket_name>/datasets/noaa_weather
$ hadoop fs -put ghcnd_hcn s3://<bucket_name>/datasets/noaa_weather/ghcnd_hcn
$ hadoop fs -mkdir s3://<bucket_name>/datasets/noaa_weather/ghcnd_stations
$ hadoop fs -put ghcnd-stations.txt s3://<bucket_name>/datasets/noaa_weather/ghcnd_stations/
$ hadoop fs -put readme.txt s3://<bucket_name>/datasets/noaa_weather/
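
As an optional sanity check, you can list what landed in S3 before creating any tables (the bucket name is a placeholder, as above):

$ hadoop fs -ls s3://<bucket_name>/datasets/noaa_weather/
$ hadoop fs -du -h s3://<bucket_name>/datasets/noaa_weather/ghcnd_hcn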

Step 2: Create External Tables to make the raw data queryable using HiveQL for BI, Reporting & ETL use cases. These external tables use the RegexSerDe to read the positional (fixed-width) encoding of the GHCN-Daily files (the field descriptions and their positions can be found in the readme document hosted by NOAA at ftp://ftp.ncdc.noaa.gov/pub/data/ghcn/daily/readme.txt ).

CREATE DATABASE IF NOT EXISTS NOAA_WEATHER;
DROP TABLE IF EXISTS NOAA_WEATHER.GHCN_DAILY_WEATHER;
CREATE EXTERNAL TABLE NOAA_WEATHER.GHCN_DAILY_WEATHER
(
STATION_ID STRING, YEAR STRING, MONTH STRING,ELEMENT STRING,
VALUE_1 STRING, MFLAG_1 STRING,QFLAG_1 STRING,SFLAG_1 STRING,
VALUE_2 STRING, MFLAG_2 STRING,QFLAG_2 STRING,SFLAG_2 STRING,
VALUE_3 STRING, MFLAG_3 STRING,QFLAG_3 STRING,SFLAG_3 STRING,
VALUE_4 STRING, MFLAG_4 STRING,QFLAG_4 STRING,SFLAG_4 STRING,
VALUE_5 STRING, MFLAG_5 STRING,QFLAG_5 STRING,SFLAG_5 STRING,
VALUE_6 STRING, MFLAG_6 STRING,QFLAG_6 STRING,SFLAG_6 STRING,
VALUE_7 STRING, MFLAG_7 STRING,QFLAG_7 STRING,SFLAG_7 STRING,
VALUE_8 STRING, MFLAG_8 STRING,QFLAG_8 STRING,SFLAG_8 STRING,
VALUE_9 STRING, MFLAG_9 STRING,QFLAG_9 STRING,SFLAG_9 STRING,
VALUE_10 STRING, MFLAG_10 STRING,QFLAG_10 STRING,SFLAG_10 STRING,
VALUE_11 STRING, MFLAG_11 STRING,QFLAG_11 STRING,SFLAG_11 STRING,
VALUE_12 STRING, MFLAG_12 STRING,QFLAG_12 STRING,SFLAG_12 STRING,
VALUE_13 STRING, MFLAG_13 STRING,QFLAG_13 STRING,SFLAG_13 STRING,
VALUE_14 STRING, MFLAG_14 STRING,QFLAG_14 STRING,SFLAG_14 STRING,
VALUE_15 STRING, MFLAG_15 STRING,QFLAG_15 STRING,SFLAG_15 STRING,
VALUE_16 STRING, MFLAG_16 STRING,QFLAG_16 STRING,SFLAG_16 STRING,
VALUE_17 STRING, MFLAG_17 STRING,QFLAG_17 STRING,SFLAG_17 STRING,
VALUE_18 STRING, MFLAG_18 STRING,QFLAG_18 STRING,SFLAG_18 STRING,
VALUE_19 STRING, MFLAG_19 STRING,QFLAG_19 STRING,SFLAG_19 STRING,
VALUE_20 STRING, MFLAG_20 STRING,QFLAG_20 STRING,SFLAG_20 STRING,
VALUE_21 STRING, MFLAG_21 STRING,QFLAG_21 STRING,SFLAG_21 STRING,
VALUE_22 STRING, MFLAG_22 STRING,QFLAG_22 STRING,SFLAG_22 STRING,
VALUE_23 STRING, MFLAG_23 STRING,QFLAG_23 STRING,SFLAG_23 STRING,
VALUE_24 STRING, MFLAG_24 STRING,QFLAG_24 STRING,SFLAG_24 STRING,
VALUE_25 STRING, MFLAG_25 STRING,QFLAG_25 STRING,SFLAG_25 STRING,
VALUE_26 STRING, MFLAG_26 STRING,QFLAG_26 STRING,SFLAG_26 STRING,
VALUE_27 STRING, MFLAG_27 STRING,QFLAG_27 STRING,SFLAG_27 STRING,
VALUE_28 STRING, MFLAG_28 STRING,QFLAG_28 STRING,SFLAG_28 STRING,
VALUE_29 STRING, MFLAG_29 STRING,QFLAG_29 STRING,SFLAG_29 STRING,
VALUE_30 STRING, MFLAG_30 STRING,QFLAG_30 STRING,SFLAG_30 STRING,
VALUE_31 STRING, MFLAG_31 STRING,QFLAG_31 STRING,SFLAG_31 STRING
)
ROW FORMAT SERDE "org.apache.hadoop.hive.contrib.serde2.RegexSerDe"
WITH SERDEPROPERTIES ("input.regex" = "(.{11})(.{4})(.{2})(.{4})(.{5})(.{1})(.{1})(.{1})(.{5})(.{1})(.{1})(.{1})(.{5})(.{1})(.{1})(.{1})(.{5})(.{1})(.{1})(.{1})(.{5})(.{1})(.{1})(.{1})(.{5})(.{1})(.{1})(.{1})(.{5})(.{1})(.{1})(.{1})(.{5})(.{1})(.{1})(.{1})(.{5})(.{1})(.{1})(.{1})(.{5})(.{1})(.{1})(.{1})(.{5})(.{1})(.{1})(.{1})(.{5})(.{1})(.{1})(.{1})(.{5})(.{1})(.{1})(.{1})(.{5})(.{1})(.{1})(.{1})(.{5})(.{1})(.{1})(.{1})(.{5})(.{1})(.{1})(.{1})(.{5})(.{1})(.{1})(.{1})(.{5})(.{1})(.{1})(.{1})(.{5})(.{1})(.{1})(.{1})(.{5})(.{1})(.{1})(.{1})(.{5})(.{1})(.{1})(.{1})(.{5})(.{1})(.{1})(.{1})(.{5})(.{1})(.{1})(.{1})(.{5})(.{1})(.{1})(.{1})(.{5})(.{1})(.{1})(.{1})(.{5})(.{1})(.{1})(.{1})(.{5})(.{1})(.{1})(.{1})(.{5})(.{1})(.{1})(.{1})(.{5})(.{1})(.{1})(.{1})(.{5})(.{1})(.{1})(.{1})(.{5})(.{1})(.{1})(.{1}).*" )
LOCATION "s3://<bucket_name>/datasets/noaa_weather/ghcnd_hcn";
DROP TABLE IF EXISTS NOAA_WEATHER.GHCN_STATIONS_MASTER;
CREATE EXTERNAL TABLE NOAA_WEATHER.GHCN_STATIONS_MASTER
(
ID STRING,
LATITUDE STRING,
LONGITUDE STRING,
ELEVATION STRING,
STATE STRING,
NAME STRING,
GSN_FLAG STRING,
HCN_FLAG STRING,
WMO_ID STRING
)
ROW FORMAT SERDE "org.apache.hadoop.hive.contrib.serde2.RegexSerDe"
WITH SERDEPROPERTIES ("input.regex" = "(.{11})(.{9})(.{10})(.{7})(.{3})(.{31})(.{4})(.{4})(.{6}).*" )
LOCATION "s3://<bucket_name>/datasets/noaa_weather/ghcnd_stations";

Step 3: Analyze/Process the data to make it viable for analytics use cases. In the raw layout, each row holds a station, year, month and element (TMAX, TMIN etc.) together with 31 value/flag column groups, one per day of the month. Here we will transform that layout into one row per day and save the result in a refined table using the optimized ORC file format. To do this we will use Hive’s built-in table-generating function (UDTF) explode() in conjunction with LATERAL VIEW and the MAP() function; a minimal sketch of the pattern appears right after this paragraph, followed by the full statements. We will also use CLUSTERED BY bucketing to improve query performance on station_id filters. Finally, we will blend in the stations master data to curate a refined weather table.
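
To make the explode()/LATERAL VIEW pattern concrete, here is a minimal, illustrative sketch (the station ID and values are made up, and it assumes a Hive version that supports FROM-less SELECT; it is not part of the pipeline itself):

-- Illustrative only: a two-entry map exploded into (day, val) rows
SELECT station_id, day, val['value'] AS daily_value
FROM (
SELECT 'USW00094728' AS station_id,
MAP(1, MAP('value', '25'), 2, MAP('value', '31')) AS tmp_column
) x
LATERAL VIEW EXPLODE(tmp_column) exptbl AS day, val;

Each key of the outer map becomes a row (day = 1, 2), and the inner map keeps the daily value addressable by name; this is exactly the trick the full query below applies across all 31 day columns.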

DROP TABLE IF EXISTS noaa_weather.GHCN_DAILY_WEATHER_RAW;
CREATE TABLE noaa_weather.GHCN_DAILY_WEATHER_RAW
(STATION_ID STRING, YEAR STRING, MONTH STRING, DAY STRING, ELEMENT STRING, VALUE DECIMAL(10,2), MFLAG STRING, QFLAG STRING, SFLAG STRING) CLUSTERED BY (STATION_ID) INTO 6 BUCKETS STORED AS ORC;
INSERT OVERWRITE TABLE noaa_weather.GHCN_DAILY_WEATHER_RAW
SELECT station_id,year, month , day, element ,cast(val["value"] as decimal(10,2)) as value, val["mflag"] as mflag, val["qflag"] as qflag, val["sflag"] as sflag
FROM
(
SELECT
station_id, element, year, month,
MAP(1, MAP('value', trim(value_1), 'mflag', trim(mflag_1), 'qflag', trim(qflag_1), 'sflag', trim(sflag_1)),
2, MAP('value', trim(value_2), 'mflag', trim(mflag_2), 'qflag', trim(qflag_2), 'sflag', trim(sflag_2)),
3, MAP('value', trim(value_3), 'mflag', trim(mflag_3), 'qflag', trim(qflag_3), 'sflag', trim(sflag_3)),
4, MAP('value', trim(value_4), 'mflag', trim(mflag_4), 'qflag', trim(qflag_4), 'sflag', trim(sflag_4)),
5, MAP('value', trim(value_5), 'mflag', trim(mflag_5), 'qflag', trim(qflag_5), 'sflag', trim(sflag_5)),
6, MAP('value', trim(value_6), 'mflag', trim(mflag_6), 'qflag', trim(qflag_6), 'sflag', trim(sflag_6)),
7, MAP('value', trim(value_7), 'mflag', trim(mflag_7), 'qflag', trim(qflag_7), 'sflag', trim(sflag_7)),
8, MAP('value', trim(value_8), 'mflag', trim(mflag_8), 'qflag', trim(qflag_8), 'sflag', trim(sflag_8)),
9, MAP('value', trim(value_9), 'mflag', trim(mflag_9), 'qflag', trim(qflag_9), 'sflag', trim(sflag_9)),
10, MAP('value', trim(value_10), 'mflag', trim(mflag_10), 'qflag', trim(qflag_10), 'sflag', trim(sflag_10)),
11, MAP('value', trim(value_11), 'mflag', trim(mflag_11), 'qflag', trim(qflag_11), 'sflag', trim(sflag_11)),
12, MAP('value', trim(value_12), 'mflag', trim(mflag_12), 'qflag', trim(qflag_12), 'sflag', trim(sflag_12)),
13, MAP('value', trim(value_13), 'mflag', trim(mflag_13), 'qflag', trim(qflag_13), 'sflag', trim(sflag_13)),
14, MAP('value', trim(value_14), 'mflag', trim(mflag_14), 'qflag', trim(qflag_14), 'sflag', trim(sflag_14)),
15, MAP('value', trim(value_15), 'mflag', trim(mflag_15), 'qflag', trim(qflag_15), 'sflag', trim(sflag_15)),
16, MAP('value', trim(value_16), 'mflag', trim(mflag_16), 'qflag', trim(qflag_16), 'sflag', trim(sflag_16)),
17, MAP('value', trim(value_17), 'mflag', trim(mflag_17), 'qflag', trim(qflag_17), 'sflag', trim(sflag_17)),
18, MAP('value', trim(value_18), 'mflag', trim(mflag_18), 'qflag', trim(qflag_18), 'sflag', trim(sflag_18)),
19, MAP('value', trim(value_19), 'mflag', trim(mflag_19), 'qflag', trim(qflag_19), 'sflag', trim(sflag_19)),
20, MAP('value', trim(value_20), 'mflag', trim(mflag_20), 'qflag', trim(qflag_20), 'sflag', trim(sflag_20)),
21, MAP('value', trim(value_21), 'mflag', trim(mflag_21), 'qflag', trim(qflag_21), 'sflag', trim(sflag_21)),
22, MAP('value', trim(value_22), 'mflag', trim(mflag_22), 'qflag', trim(qflag_22), 'sflag', trim(sflag_22)),
23, MAP('value', trim(value_23), 'mflag', trim(mflag_23), 'qflag', trim(qflag_23), 'sflag', trim(sflag_23)),
24, MAP('value', trim(value_24), 'mflag', trim(mflag_24), 'qflag', trim(qflag_24), 'sflag', trim(sflag_24)),
25, MAP('value', trim(value_25), 'mflag', trim(mflag_25), 'qflag', trim(qflag_25), 'sflag', trim(sflag_25)),
26, MAP('value', trim(value_26), 'mflag', trim(mflag_26), 'qflag', trim(qflag_26), 'sflag', trim(sflag_26)),
27, MAP('value', trim(value_27), 'mflag', trim(mflag_27), 'qflag', trim(qflag_27), 'sflag', trim(sflag_27)),
28, MAP('value', trim(value_28), 'mflag', trim(mflag_28), 'qflag', trim(qflag_28), 'sflag', trim(sflag_28)),
29, MAP('value', trim(value_29), 'mflag', trim(mflag_29), 'qflag', trim(qflag_29), 'sflag', trim(sflag_29)),
30, MAP('value', trim(value_30), 'mflag', trim(mflag_30), 'qflag', trim(qflag_30), 'sflag', trim(sflag_30)),
31, MAP('value', trim(value_31), 'mflag', trim(mflag_31), 'qflag', trim(qflag_31), 'sflag', trim(sflag_31))
) tmp_column
FROM noaa_weather.GHCN_DAILY_WEATHER
) x
LATERAL VIEW EXPLODE(tmp_column) exptbl AS day, val
CLUSTER by station_id;
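
Note that the raw GHCN-Daily files pad every month to 31 day slots and encode missing observations as -9999 (per the NOAA readme), so the exploded table will contain placeholder rows. A quick check of how many such rows exist per element might look like this:

SELECT element, COUNT(*) AS missing_rows
FROM noaa_weather.GHCN_DAILY_WEATHER_RAW
WHERE value = -9999
GROUP BY element;

These rows can be filtered out downstream (or dropped here) depending on your reporting needs.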
DROP TABLE IF EXISTS noaa_weather.GHCN_DAILY_WEATHER_REFINED;
CREATE TABLE noaa_weather.GHCN_DAILY_WEATHER_REFINED STORED AS ORC AS
SELECT w.station_id, s.latitude, s.longitude, s.elevation, s.name, s.state,
w.year, w.month, w.day, w.element, w.value, w.mflag, w.qflag, w.sflag
FROM noaa_weather.GHCN_DAILY_WEATHER_RAW w
JOIN noaa_weather.GHCN_STATIONS_MASTER s ON w.station_id = s.id;
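
With the refined table in place, typical analytic queries become short and readable. For example, here is a sketch of an annual maximum-temperature trend for a single station (the station ID is illustrative; pick any ID from GHCN_STATIONS_MASTER, and note that per the NOAA readme TMAX is stored in tenths of degrees Celsius, hence the division by 10):

SELECT year, ROUND(AVG(value / 10.0), 1) AS avg_tmax_c
FROM noaa_weather.GHCN_DAILY_WEATHER_REFINED
WHERE element = 'TMAX'
AND value <> -9999
AND qflag = ''
AND station_id = 'USW00094728'
GROUP BY year
ORDER BY year DESC;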

There you have it: the GHCN_DAILY_WEATHER_REFINED table now holds daily weather data from the late 1800s onward across most geographies and cities in the US. Along the way we also created the GHCN_STATIONS_MASTER lookup table, which can be joined or used to filter or trend weather for any particular geography for reporting or BI purposes.

In Part 2 of this series, we will see how to achieve the same result using Apache Spark.

Click here to proceed to Part 2

Note: The above content was curated using Qubole’s Big Data Activation Platform. You may sign up for a free Qubole account at https://www.qubole.com/products/pricing/