Is the Weather Actually Changing? — NOAA 100 Years of Weather Data — Time Series Analysis in Python
25GB Data, 100,790 files, Time Series Analysis & Forecasting, Hadoop, Spark, Matplotlib, Pandas, Machine Learning
I used the climate data from the database at ftp://ftp.ncdc.noaa.gov/pub/data/ghcn/daily/, which provides daily data files.
The data consists of monthly records collected at several weather stations. Each record is broken down into days of the month, so global climate data is available for every day of every month since 1900, including high and low temperature, precipitation broken down by type and depth, humidity, cloudiness, wind, and more.
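To make the record layout concrete, here is a minimal sketch of parsing a single station's .dly file with pandas, based on the fixed-width format described in the archive's readme.txt; the station file name is illustrative:
import pandas as pd

# each line: station id, year, month, element (e.g. TMAX, TMIN, PRCP),
# then 31 daily slots of a 5-char value followed by 3 flag characters
colspecs = [(0, 11), (11, 15), (15, 17), (17, 21)]
names = ['StationID', 'Year', 'Month', 'Element']
for day in range(1, 32):
    offset = 21 + (day - 1) * 8
    colspecs.append((offset, offset + 5))  # keep the value, skip the flags
    names.append('Day%d' % day)

# USW00094728.dly is an illustrative station file from the FTP archive
df = pd.read_fwf('USW00094728.dly', colspecs=colspecs, names=names)
df = df.replace(-9999, float('nan'))  # -9999 marks a missing observation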
Main Python Libraries Used — Pandas, NumPy, Matplotlib, Seaborn, Scikit-learn, SciPy, Plotly
Main Targets:
Combine data from various weather stations by uploading it into a Hadoop cluster, using MapReduce, and converting the output to Parquet files for easy processing
Identify temperature change around the world since 1900 and visualize it on a world map to identify the countries affected the most
Create time series for temperature and a bubble plot for precipitation over 100 years, broken down by season, using Pandas and Matplotlib
Make the time series stationary using rolling statistics (moving averages) to identify trends and seasonality
Predict the temperature trend in New York over the next 50 years using deep learning and ARIMA modelling
Code on Github.
I wrote Hadoop MapReduce (streaming) programs to combine the data from the different weather stations.
Mapper.py looks something like this:
import sys
import numpy as np

def isInt(value):
    try:
        int(value)
        return True
    except ValueError:
        return False

# input comes from STDIN (standard input)
for line in sys.stdin:
    line = line.strip()
    kv = line.split()
    k = kv[0].split("-")[0]  # the key is the first field, up to the first dash
    kv.pop(0)
    values = []  # renamed from `sum` to avoid shadowing the builtin
    for v in kv:
        if isInt(v.strip()) and int(v.strip()) != -9999:  # -9999 marks missing data
            values.append(int(v.strip()))
    print("%s, %f" % (str(k), np.mean(values)))
Reducer.py looks something like this:
import sys

current_k = None
current_v = 0
k = None

# Hadoop streaming sorts mapper output by key before it reaches the reducer,
# so all values for a key arrive consecutively
for line in sys.stdin:
    line = line.strip()  # in case there is any extra whitespace
    k, v = line.split(',', 1)
    try:
        v = float(v)
    except ValueError:
        continue
    if current_k == k:
        current_v += v
    else:
        if current_k:
            print('%s, %f' % (current_k, current_v))
        current_v = v
        current_k = k

# don't forget to emit the last key
if current_k == k:
    print('%s, %f' % (current_k, current_v))
Next, convert the tab-separated output of the MapReduce job into a Parquet file for faster processing:
import os
os.environ['SPARK_HOME'] = "/opt/cloudera/parcels/CDH-5.8.0-1.cdh5.8.0.p0.42/lib/spark"
import findspark
findspark.init()

import pyspark
conf = pyspark.SparkConf().\
    setAppName('test_app').\
    set('spark.port.maxRetries', 60).\
    set('spark.yarn.appMasterEnv.PYSPARK_PYTHON', '/home/deacuna/anaconda3/bin/python').\
    set('spark.yarn.appMasterEnv.PYSPARK_DRIVER_PYTHON', '/home/deacuna/anaconda3/bin/python').\
    setMaster('yarn-client').\
    set('executor.memory', '1000m').\
    set('spark.yarn.executor.memoryOverhead', '4098').\
    set('spark.sql.codegen', 'true').\
    set('spark.yarn.executor.memory', '500m').\
    set('yarn.scheduler.minimum-allocation-mb', '500m').\
    set('spark.dynamicAllocation.maxExecutors', '3').\
    set('jars', 'hdfs://eggs/graphframes-0.1.0-spark1.6.jar').\
    set('spark.driver.maxResultSize', '4g')

from pyspark.sql import SQLContext, HiveContext
sc = pyspark.SparkContext(conf=conf)
sqlContext = HiveContext(sc)
from pyspark.sql import functions as fn

mapRed_rdd = sc.textFile("/user/mohit/IST718/data/data.tsv")  # read the tab separated file
mapRed_rdd = mapRed_rdd.map(lambda x: x.split('\t'))
mapRed_df = mapRed_rdd.toDF(['StationID', 'CountryCode', 'Year', 'Month', 'Element', 'MonthAverage'])
mapRed_df.write.parquet('IST718/data/mapRed.parquet')  # save the parquet file
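As a quick sanity check, the Parquet file can be read back with the same context (a minimal sketch):
# read the Parquet file back and inspect the schema and a few rows
df = sqlContext.read.parquet('IST718/data/mapRed.parquet')
df.printSchema()
df.show(5)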
The next step was representing the change on a world map: the temperature change in each country was calculated over 100 years (1900–2016), as seen above.
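A world map like this can be drawn with Plotly, one of the libraries listed above. Here is a hedged sketch assuming a recent Plotly and a hypothetical country_change.csv holding the per-country temperature change keyed by ISO-3 country code:
import pandas as pd
import plotly.graph_objects as go

# country_change.csv is a hypothetical export with one row per country:
# CountryCode (ISO-3), TempChange (change in average temperature, 1900-2016)
changes = pd.read_csv('country_change.csv')

fig = go.Figure(data=go.Choropleth(
    locations=changes['CountryCode'],  # ISO-3 codes map to countries
    z=changes['TempChange'],
    colorscale='RdBu',
    reversescale=True,                 # red for warming, blue for cooling
    colorbar_title='Temp change (°C)'))
fig.update_layout(title_text='Temperature change by country, 1900-2016')
fig.show()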
The next step was to identify the seasonal trends in maximum temperature and precipitation over 100 years in the USA.
The full code for the visualizations is on Github.
As we can see clearly in these visualizations, the maximum summer temperature spiked around 2000, and precipitation also increased over time.
The plots were created using Spark, with the SQL and Hive contexts, to build pandas data frames from the Parquet file on the cluster and save them as .csv files.
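A minimal sketch of that step, assuming the Parquet schema written above (the aggregation and the output file name are illustrative):
from pyspark.sql import functions as fn

weather_df = sqlContext.read.parquet('IST718/data/mapRed.parquet')

# aggregate maximum-temperature records to one value per year and month
tmax_df = weather_df.where(fn.col('Element') == 'TMAX') \
    .groupBy('Year', 'Month') \
    .agg(fn.avg('MonthAverage').alias('AvgTMax'))

# the aggregate is small, so it is safe to collect it to the driver
tmax_pd = tmax_df.toPandas()
tmax_pd.to_csv('tmax_monthly.csv', index=False)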
The time series of temperature from 1900–2016 for each season was created using pandas and matplotlib.
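For the seasonal lines, a sketch along these lines works, assuming the illustrative tmax_monthly.csv produced above; the season mapping is the usual meteorological one:
import pandas as pd
import matplotlib.pyplot as plt

tmax_pd = pd.read_csv('tmax_monthly.csv')

# map calendar months onto the four meteorological seasons
seasons = {12: 'Winter', 1: 'Winter', 2: 'Winter',
           3: 'Spring', 4: 'Spring', 5: 'Spring',
           6: 'Summer', 7: 'Summer', 8: 'Summer',
           9: 'Fall',   10: 'Fall',  11: 'Fall'}
tmax_pd['Season'] = tmax_pd['Month'].map(seasons)

# one line per season: the yearly average of the monthly values
for season, grp in tmax_pd.groupby('Season'):
    yearly = grp.groupby('Year')['AvgTMax'].mean()
    plt.plot(yearly.index, yearly.values, label=season)
plt.xlabel('Year')
plt.ylabel('Average maximum temperature')
plt.legend(loc='best')
plt.title('Seasonal maximum temperature, 1900-2016')
plt.show()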
The time series was first tested for stationarity using rolling statistics (moving averages) and the Dickey-Fuller test. This also gives us the rolling mean and standard deviation, which indicate how stationary the time series is.
import pandas as pd
import matplotlib.pyplot as plt
from statsmodels.tsa.stattools import adfuller

def test_stationarity(timeseries):
    # rolling statistics (pd.rolling_mean/pd.rolling_std were removed from
    # pandas; the .rolling() accessor replaces them)
    rolmean = timeseries.rolling(window=12).mean()
    rolstd = timeseries.rolling(window=12).std()

    # plot the rolling statistics
    orig = plt.plot(timeseries, color='blue', label='Original')
    mean = plt.plot(rolmean, color='red', label='Rolling Mean')
    std = plt.plot(rolstd, color='orange', label='Rolling Std')
    plt.legend(loc='best')
    plt.title('Rolling Mean & Standard Deviation')
    plt.show(block=False)

    # Dickey-Fuller test to check whether the weather time series is stationary
    print('Results of Dickey-Fuller Test:')
    dftest = adfuller(timeseries, autolag='AIC')
    dfoutput = pd.Series(dftest[0:4], index=['Test Statistic', 'p-value',
                                             '#Lags Used', 'Number of Observations Used'])
    for key, value in dftest[4].items():
        dfoutput['Critical Value (%s)' % key] = value
    print(dfoutput)
Sample result of the rolling mean and standard deviation for a time series of maximum temperature in the US in summer, 1992–2016.
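When the test statistic stays above the critical values (p-value above 0.05), the series is not yet stationary; first-order differencing is a common fix before modelling. A sketch, assuming ts is the pandas Series tested above:
# difference the series once and re-run the stationarity check
ts_diff = ts.diff().dropna()
test_stationarity(ts_diff)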
Finally, the prediction of temperature change in New York was done using ARIMA modelling and deep learning.
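A minimal ARIMA sketch with a recent statsmodels, assuming ts is a yearly temperature Series for New York; the (p, d, q) order here is illustrative, not the tuned model from the full code:
from statsmodels.tsa.arima.model import ARIMA

# fit an ARIMA(2, 1, 2) model; d=1 applies one round of differencing
model = ARIMA(ts, order=(2, 1, 2))
fitted = model.fit()

# forecast the next 50 yearly steps
forecast = fitted.forecast(steps=50)
print(forecast.head())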
Refer to Github for full code.