(Part 2 of 3) Embrace Choice — Curate & Analyze NOAA Weather Dataset using Big Data Engines Hive/Spark & Presto

In the first part of this series, we saw how productive Apache Hive can be for curating and analyzing the NOAA weather dataset. In this part we will focus on Apache Spark, achieve the same results using its fast in-memory processing, and look at the trade-offs. Below are the steps involved.

Step 1: Acquire the GHCN historical weather data. Use Scala’s sys.process._ API to run shell commands that download the data and stage it in HDFS.

import sys.process._
// Download the GHCN-Daily HCN subset and the stations master file, then stage both in HDFS
"curl -O ftp://ftp.ncdc.noaa.gov/pub/data/ghcn/daily/ghcnd_hcn.tar.gz".!
"tar -zxf ghcnd_hcn.tar.gz".!
"curl -O ftp://ftp.ncdc.noaa.gov/pub/data/ghcn/daily/ghcnd-stations.txt".!
"hadoop fs -put ghcnd_hcn /tmp/".!
"hadoop fs -put ghcnd-stations.txt /tmp/".!

Step 2: Write a helper function to handle the fixed-width field encoding of the GHCN format, as specified at ftp://ftp.ncdc.noaa.gov/pub/data/ghcn/daily/readme.txt. We will use Java’s Pattern and Matcher regex APIs for this.

import java.util.regex.{Matcher, Pattern}

// Split one fixed-width record into fields; each capture group in the pattern is one column
def regExSplitter(row: String, pattern: String): Array[String] = {
  val inputPattern = Pattern.compile(pattern, Pattern.DOTALL)
  val m = inputPattern.matcher(row)
  var colArr = Array[String]()
  if (m.matches()) {
    for (a <- 1 to m.groupCount()) {
      colArr = colArr :+ m.group(a).trim
    }
  }
  colArr
}
val stations_regex_pattern="(.{11})(.{9})(.{10})(.{7})(.{3})(.{31})(.{4})(.{4})(.{6}).*"
val ghnc_daily_regex_pattern="(.{11})(.{4})(.{2})(.{4})(.{5})(.{1})(.{1})(.{1})(.{5})(.{1})(.{1})(.{1})(.{5})(.{1})(.{1})(.{1})(.{5})(.{1})(.{1})(.{1})(.{5})(.{1})(.{1})(.{1})(.{5})(.{1})(.{1})(.{1})(.{5})(.{1})(.{1})(.{1})(.{5})(.{1})(.{1})(.{1})(.{5})(.{1})(.{1})(.{1})(.{5})(.{1})(.{1})(.{1})(.{5})(.{1})(.{1})(.{1})(.{5})(.{1})(.{1})(.{1})(.{5})(.{1})(.{1})(.{1})(.{5})(.{1})(.{1})(.{1})(.{5})(.{1})(.{1})(.{1})(.{5})(.{1})(.{1})(.{1})(.{5})(.{1})(.{1})(.{1})(.{5})(.{1})(.{1})(.{1})(.{5})(.{1})(.{1})(.{1})(.{5})(.{1})(.{1})(.{1})(.{5})(.{1})(.{1})(.{1})(.{5})(.{1})(.{1})(.{1})(.{5})(.{1})(.{1})(.{1})(.{5})(.{1})(.{1})(.{1})(.{5})(.{1})(.{1})(.{1})(.{5})(.{1})(.{1})(.{1})(.{5})(.{1})(.{1})(.{1})(.{5})(.{1})(.{1})(.{1})(.{5})(.{1})(.{1})(.{1})(.{5})(.{1})(.{1})(.{1})(.{5})(.{1})(.{1})(.{1}).*"
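
A quick spot check (purely illustrative) helps confirm the capture groups line up with the column positions in the readme, by parsing the first record of the stations file we staged earlier:

// Illustrative spot check: parse the first line of the stations file
sc.textFile("/tmp/ghcnd-stations.txt")
  .take(1)
  .map(regExSplitter(_, stations_regex_pattern))
  .foreach(fields => println(fields.mkString("|")))
// Expect 9 pipe-delimited fields: id, latitude, longitude, elevation, state, name, gsn, hcn, wmo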

Step 3: Load the GHCN datasets into Spark as DataFrames for in-memory analysis. In this step, we use the map function to transform the fixed-width fields of each line into a row of the DataFrame.

// One row per station, parsed from the fixed-width stations master file
case class NOAA_GHCND_STATION(STATION_ID: String, LATITUDE: String, LONGITUDE: String, ELEVATION: String, STATE: String, NAME: String, GSN_FLAG: String, HCN_FLAG: String, WMO_ID: String)
val stations_dataset = sc.textFile("/tmp/ghcnd-stations.txt")
  .map(regExSplitter(_, stations_regex_pattern))
  .map { record: Array[String] =>
    // Guard against short rows: missing trailing fields default to empty strings
    val STATION_ID = if (record.size >= 1) record(0) else ""
    val LATITUDE   = if (record.size >= 2) record(1) else ""
    val LONGITUDE  = if (record.size >= 3) record(2) else ""
    val ELEVATION  = if (record.size >= 4) record(3) else ""
    val STATE      = if (record.size >= 5) record(4) else ""
    val NAME       = if (record.size >= 6) record(5) else ""
    val GSN_FLAG   = if (record.size >= 7) record(6) else ""
    val HCN_FLAG   = if (record.size >= 8) record(7) else ""
    val WMO_ID     = if (record.size >= 9) record(8) else ""
    NOAA_GHCND_STATION(STATION_ID, LATITUDE, LONGITUDE, ELEVATION, STATE, NAME, GSN_FLAG, HCN_FLAG, WMO_ID)
  }
  .toDF.as[NOAA_GHCND_STATION]
stations_dataset.createOrReplaceTempView("GHCN_STATIONS_MASTER")
case class NOAA_GHCND_RECORD(
STATION_ID :String, YEAR :String, MONTH :String,ELEMENT :String,
VALUE_1 :Long, MFLAG_1 :String,QFLAG_1 :String,SFLAG_1 :String,
VALUE_2 :Long, MFLAG_2 :String,QFLAG_2 :String,SFLAG_2 :String,
VALUE_3 :Long, MFLAG_3 :String,QFLAG_3 :String,SFLAG_3 :String,
VALUE_4 :Long, MFLAG_4 :String,QFLAG_4 :String,SFLAG_4 :String,
VALUE_5 :Long, MFLAG_5 :String,QFLAG_5 :String,SFLAG_5 :String,
VALUE_6 :Long, MFLAG_6 :String,QFLAG_6 :String,SFLAG_6 :String,
VALUE_7 :Long, MFLAG_7 :String,QFLAG_7 :String,SFLAG_7 :String,
VALUE_8 :Long, MFLAG_8 :String,QFLAG_8 :String,SFLAG_8 :String,
VALUE_9 :Long, MFLAG_9 :String,QFLAG_9 :String,SFLAG_9 :String,
VALUE_10 :Long, MFLAG_10 :String,QFLAG_10 :String,SFLAG_10 :String,
VALUE_11 :Long, MFLAG_11 :String,QFLAG_11 :String,SFLAG_11 :String,
VALUE_12 :Long, MFLAG_12 :String,QFLAG_12 :String,SFLAG_12 :String,
VALUE_13 :Long, MFLAG_13 :String,QFLAG_13 :String,SFLAG_13 :String,
VALUE_14 :Long, MFLAG_14 :String,QFLAG_14 :String,SFLAG_14 :String,
VALUE_15 :Long, MFLAG_15 :String,QFLAG_15 :String,SFLAG_15 :String,
VALUE_16 :Long, MFLAG_16 :String,QFLAG_16 :String,SFLAG_16 :String,
VALUE_17 :Long, MFLAG_17 :String,QFLAG_17 :String,SFLAG_17 :String,
VALUE_18 :Long, MFLAG_18 :String,QFLAG_18 :String,SFLAG_18 :String,
VALUE_19 :Long, MFLAG_19 :String,QFLAG_19 :String,SFLAG_19 :String,
VALUE_20 :Long, MFLAG_20 :String,QFLAG_20 :String,SFLAG_20 :String,
VALUE_21 :Long, MFLAG_21 :String,QFLAG_21 :String,SFLAG_21 :String,
VALUE_22 :Long, MFLAG_22 :String,QFLAG_22 :String,SFLAG_22 :String,
VALUE_23 :Long, MFLAG_23 :String,QFLAG_23 :String,SFLAG_23 :String,
VALUE_24 :Long, MFLAG_24 :String,QFLAG_24 :String,SFLAG_24 :String,
VALUE_25 :Long, MFLAG_25 :String,QFLAG_25 :String,SFLAG_25 :String,
VALUE_26 :Long, MFLAG_26 :String,QFLAG_26 :String,SFLAG_26 :String,
VALUE_27 :Long, MFLAG_27 :String,QFLAG_27 :String,SFLAG_27 :String,
VALUE_28 :Long, MFLAG_28 :String,QFLAG_28 :String,SFLAG_28 :String,
VALUE_29 :Long, MFLAG_29 :String,QFLAG_29 :String,SFLAG_29 :String,
VALUE_30 :Long, MFLAG_30 :String,QFLAG_30 :String,SFLAG_30 :String,
VALUE_31 :Long, MFLAG_31 :String,QFLAG_31 :String,SFLAG_31 :String)
// One row per (station, year, month, element), with 31 day value/flag groups per row
val weather_dataset=sc.textFile("/tmp/ghcnd_hcn/*")
.map( regExSplitter(_,ghnc_daily_regex_pattern))
.map({ record: Array[String] => NOAA_GHCND_RECORD(record(0),record(1),record(2),record(3),record(4).toLong,record(5),record(6),record(7),record(8).toLong,record(9),record(10),record(11),record(12).toLong,record(13),record(14),record(15),record(16).toLong,record(17),record(18),record(19),record(20).toLong,record(21),record(22),record(23),record(24).toLong,record(25),record(26),record(27),record(28).toLong,record(29),record(30),record(31),record(32).toLong,record(33),record(34),record(35),record(36).toLong,record(37),record(38),record(39),record(40).toLong,record(41),record(42),record(43),record(44).toLong,record(45),record(46),record(47),record(48).toLong,record(49),record(50),record(51),record(52).toLong,record(53),record(54),record(55),record(56).toLong,record(57),record(58),record(59),record(60).toLong,record(61),record(62),record(63),record(64).toLong,record(65),record(66),record(67),record(68).toLong,record(69),record(70),record(71),record(72).toLong,record(73),record(74),record(75),record(76).toLong,record(77),record(78),record(79),record(80).toLong,record(81),record(82),record(83),record(84).toLong,record(85),record(86),record(87),record(88).toLong,record(89),record(90),record(91),record(92).toLong,record(93),record(94),record(95),record(96).toLong,record(97),record(98),record(99),record(100).toLong,record(101),record(102),record(103),record(104).toLong,record(105),record(106),record(107),record(108).toLong,record(109),record(110),record(111),record(112).toLong,record(113),record(114),record(115),record(116).toLong,record(117),record(118),record(119),record(120).toLong,record(121),record(122),record(123),record(124).toLong,record(125),record(126),record(127)) })
.toDF.as[NOAA_GHCND_RECORD]
weather_dataset.createOrReplaceTempView("GHCN_DAILY_WEATHER")
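
As an optional, illustrative checkpoint before the heavier transformation in the next step, we can peek at a few parsed rows and count the daily records:

// Illustrative checkpoint: inspect the parsed data and count the daily records
stations_dataset.show(5, truncate = false)
weather_dataset.select("STATION_ID", "YEAR", "MONTH", "ELEMENT", "VALUE_1").show(5)
sqlContext.sql("SELECT COUNT(*) AS record_count FROM GHCN_DAILY_WEATHER").show()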

Step 4: Process and analyze the DataFrame using Spark SQL’s built-in table-generating function EXPLODE in conjunction with LATERAL VIEW and MAP, pivoting the 31 per-day value/flag columns into one row per day.

val weather_dataset_exploded = sqlContext.sql("""
SELECT station_id,year, month , day, element ,cast(val["value"] as decimal(10)) as value, val["mflag"] as mflag, val["qflag"] as qflag, val["sflag"] as sflag
FROM
(
SELECT
station_id, element, year, month,
MAP(1, MAP('value', trim(value_1), 'mflag', trim(mflag_1), 'qflag', trim(qflag_1), 'sflag', trim(sflag_1)),
2, MAP('value', trim(value_2), 'mflag', trim(mflag_2), 'qflag', trim(qflag_2), 'sflag', trim(sflag_2)),
3, MAP('value', trim(value_3), 'mflag', trim(mflag_3), 'qflag', trim(qflag_3), 'sflag', trim(sflag_3)),
4, MAP('value', trim(value_4), 'mflag', trim(mflag_4), 'qflag', trim(qflag_4), 'sflag', trim(sflag_4)),
5, MAP('value', trim(value_5), 'mflag', trim(mflag_5), 'qflag', trim(qflag_5), 'sflag', trim(sflag_5)),
6, MAP('value', trim(value_6), 'mflag', trim(mflag_6), 'qflag', trim(qflag_6), 'sflag', trim(sflag_6)),
7, MAP('value', trim(value_7), 'mflag', trim(mflag_7), 'qflag', trim(qflag_7), 'sflag', trim(sflag_7)),
8, MAP('value', trim(value_8), 'mflag', trim(mflag_8), 'qflag', trim(qflag_8), 'sflag', trim(sflag_8)),
9, MAP('value', trim(value_9), 'mflag', trim(mflag_9), 'qflag', trim(qflag_9), 'sflag', trim(sflag_9)),
10, MAP('value', trim(value_10), 'mflag', trim(mflag_10), 'qflag', trim(qflag_10), 'sflag', trim(sflag_10)),
11, MAP('value', trim(value_11), 'mflag', trim(mflag_11), 'qflag', trim(qflag_11), 'sflag', trim(sflag_11)),
12, MAP('value', trim(value_12), 'mflag', trim(mflag_12), 'qflag', trim(qflag_12), 'sflag', trim(sflag_12)),
13, MAP('value', trim(value_13), 'mflag', trim(mflag_13), 'qflag', trim(qflag_13), 'sflag', trim(sflag_13)),
14, MAP('value', trim(value_14), 'mflag', trim(mflag_14), 'qflag', trim(qflag_14), 'sflag', trim(sflag_14)),
15, MAP('value', trim(value_15), 'mflag', trim(mflag_15), 'qflag', trim(qflag_15), 'sflag', trim(sflag_15)),
16, MAP('value', trim(value_16), 'mflag', trim(mflag_16), 'qflag', trim(qflag_16), 'sflag', trim(sflag_16)),
17, MAP('value', trim(value_17), 'mflag', trim(mflag_17), 'qflag', trim(qflag_17), 'sflag', trim(sflag_17)),
18, MAP('value', trim(value_18), 'mflag', trim(mflag_18), 'qflag', trim(qflag_18), 'sflag', trim(sflag_18)),
19, MAP('value', trim(value_19), 'mflag', trim(mflag_19), 'qflag', trim(qflag_19), 'sflag', trim(sflag_19)),
20, MAP('value', trim(value_20), 'mflag', trim(mflag_20), 'qflag', trim(qflag_20), 'sflag', trim(sflag_20)),
21, MAP('value', trim(value_21), 'mflag', trim(mflag_21), 'qflag', trim(qflag_21), 'sflag', trim(sflag_21)),
22, MAP('value', trim(value_22), 'mflag', trim(mflag_22), 'qflag', trim(qflag_22), 'sflag', trim(sflag_22)),
23, MAP('value', trim(value_23), 'mflag', trim(mflag_23), 'qflag', trim(qflag_23), 'sflag', trim(sflag_23)),
24, MAP('value', trim(value_24), 'mflag', trim(mflag_24), 'qflag', trim(qflag_24), 'sflag', trim(sflag_24)),
25, MAP('value', trim(value_25), 'mflag', trim(mflag_25), 'qflag', trim(qflag_25), 'sflag', trim(sflag_25)),
26, MAP('value', trim(value_26), 'mflag', trim(mflag_26), 'qflag', trim(qflag_26), 'sflag', trim(sflag_26)),
27, MAP('value', trim(value_27), 'mflag', trim(mflag_27), 'qflag', trim(qflag_27), 'sflag', trim(sflag_27)),
28, MAP('value', trim(value_28), 'mflag', trim(mflag_28), 'qflag', trim(qflag_28), 'sflag', trim(sflag_28)),
29, MAP('value', trim(value_29), 'mflag', trim(mflag_29), 'qflag', trim(qflag_29), 'sflag', trim(sflag_29)),
30, MAP('value', trim(value_30), 'mflag', trim(mflag_30), 'qflag', trim(qflag_30), 'sflag', trim(sflag_30)),
31, MAP('value', trim(value_31), 'mflag', trim(mflag_31), 'qflag', trim(qflag_31), 'sflag', trim(sflag_31))
) tmp_column
FROM GHCN_DAILY_WEATHER
) x
LATERAL VIEW EXPLODE(tmp_column) exptbl AS day, val""")
val noaa_weather_refined=weather_dataset_exploded.join(stations_dataset, Seq("STATION_ID"))
noaa_weather_refined.write.saveAsTable("NOAA_WEATHER.GHCN_DAILY_WEATHER_REFINED");
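
With the refined table in place, analysis becomes plain SQL. As an illustrative example (the station id below is a placeholder; GHCN stores TMAX in tenths of a degree Celsius and uses -9999 for missing values), the yearly average maximum temperature for a single station could be computed like this:

// Illustrative query: yearly average TMAX (in degrees C) for one station, skipping missing values
sqlContext.sql("""
SELECT year, ROUND(AVG(value) / 10.0, 1) AS avg_tmax_c
FROM NOAA_WEATHER.GHCN_DAILY_WEATHER_REFINED
WHERE element = 'TMAX' AND value <> -9999
AND station_id = 'USC00011084' -- placeholder station id
GROUP BY year
ORDER BY year""").show(10)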

Evidently, curating the same dataset to attain similar results is more complex than the Apache Hive approach we covered in Part 1 of this series. But one distinct advantage of Spark is that we could take the ETL above further and build a predictive model with Spark MLlib, engineering features from the different historical weather elements to predict, say, the next day’s weather for any given city (a rough sketch of what that might look like follows). The rational architect in me would also argue that it is better to curate the dataset as Hive tables in Apache Hive and then load them into Apache Spark for predictive/advanced analytics use cases.
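
As a rough, hypothetical sketch only (not something built in this post), predicting a day’s TMAX from the previous day’s reading with Spark MLlib might look like the snippet below; real feature engineering would pull in many more elements (PRCP, SNOW, TMIN, and so on) and a proper train/test split.

// Hypothetical sketch: next-day TMAX from previous-day TMAX via linear regression
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val tmaxByDay = sqlContext.table("NOAA_WEATHER.GHCN_DAILY_WEATHER_REFINED")
  .filter("element = 'TMAX' AND value <> -9999")
  .withColumn("label", col("value").cast("double"))
  .withColumn("prev_tmax", lag(col("value").cast("double"), 1)
    .over(Window.partitionBy("station_id").orderBy("year", "month", "day")))
  .na.drop(Seq("prev_tmax"))          // drop the first day of each station (no previous reading)

val assembled = new VectorAssembler()
  .setInputCols(Array("prev_tmax"))
  .setOutputCol("features")
  .transform(tmaxByDay)

val model = new LinearRegression().fit(assembled)
println(s"Training RMSE (tenths of a degree C): ${model.summary.rootMeanSquaredError}")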

Having done all the hard work here, in the last and final part of this series we will cover Presto, to demonstrate accelerated access to the curated data for BI and reporting purposes.

Click here to proceed to Part 3

**Note:** The above content was curated using Qubole’s Big Data Activation Platform. You may sign up for Qubole at https://www.qubole.com/products/pricing/