Spatial and temporal partitioning of weather data with IBM Cloud Analytics Engine
Context
My team at IBM manages historical weather data. We have multiple systems that ingest, store, and retrieve the data.
Every hour, the system takes a snapshot of global weather conditions and ingests it in Parquet format (where each column is a weather variable) into a COS bucket. That means each hourly snapshot is stored as a separate set of files, and to retrieve data for 24 hours the system has to read 24 different sets of hourly files.
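As a rough illustration of this layout, the sketch below builds a one-row hourly snapshot as a Spark DataFrame and writes it as Parquet; the geohash id, the column names, and the output path are illustrative assumptions, not the production schema.

# Sketch only: schema and output path are illustrative, not the production layout.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("hourly-ingest-sketch").getOrCreate()

# One hourly snapshot: one row per grid point, one column per weather variable.
snapshot = spark.createDataFrame(
    [(1234567890123456789, 51.50, -0.12, 14.2, 5.1, 0.0)],
    ["geohash", "lat", "lon", "temperature", "wind_speed", "precipitation_rate"],
)

# In production each hour is written under its own prefix in the COS bucket,
# e.g. .../hourly/2020/06/01/13/; a local path is used here for the sketch.
snapshot.write.mode("overwrite").parquet("/tmp/hourly/2020/06/01/13/")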
Data
The weather data consists of multiple variables such as temperature, wind speed, precipitation rate, etc.
A snapshot of a weather variable is a 4096 x 8192 matrix of data points, so one snapshot is a uniformly distributed grid of 33,554,432 data points covering the entire globe. You can imagine an 8K TV screen where the pixels are data points: the plots below are created from 33,554,432 data points the same way a picture on an 8K TV screen is created from pixels. Note that the map projection issue is left out for simplicity.
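To make the grid concrete, here is a minimal sketch that maps a (lat, lon) coordinate to a cell of the 4096 x 8192 grid, assuming a regular global lat/lon grid; the exact grid registration of the real dataset may differ.

# A minimal sketch, assuming a regular global lat/lon grid; the exact grid
# registration of the real dataset may differ.
ROWS, COLS = 4096, 8192  # one 33,554,432-point snapshot per variable

def grid_index(lat: float, lon: float) -> tuple:
    """Map a (lat, lon) in degrees to the nearest cell of the 4096 x 8192 grid."""
    row = int((90.0 - lat) / 180.0 * ROWS) % ROWS
    col = int((lon + 180.0) / 360.0 * COLS) % COLS
    return row, col

print(grid_index(51.5, -0.12))  # roughly the grid cell covering London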
Data usage
Most of our clients use weather data as a variable in linear regression and other machine learning models. Therefore, in most cases they are interested in historical data for a specific location. The question is more likely to be “What was the temperature in London for the last 5 years?” than “What was the temperature on the Earth yesterday at 1pm UTC?”. The latter would probably only be asked by an alien :D
Problem
Ingesting data in the hourly structure is very efficient because the system just needs to create a new set of files when a new hourly batch arrives; it doesn't have to deal with concurrent modification and synchronization problems. However, data retrieval from these hourly files becomes less efficient as the dataset grows. For example, to get a month of data for one geographical location the system has to read up to 24 * 31 = 744 sets of hourly files.
Solution
Data organization
To improve retrieval speed for requests that cover a large time period, we had to reorganize the data from an hourly to a monthly format. Instead of storing data by hour, the data is stored by month and spatially partitioned.
In other words, we transformed the file structure from one set of files per hour into one spatially partitioned set of files per month.
In this way, retrieving June 2020 for one geographical location requires 720 (24 hours * 30 days) times fewer file reads with the monthly structure.
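The difference in read fan-out can be sketched in a few lines of Python; the prefix layout below is purely illustrative and is not the actual bucket structure.

# Illustrative prefixes only; the real bucket layout is not shown here.
from calendar import monthrange

year, month = 2020, 6
days_in_month = monthrange(year, month)[1]  # 30 days in June 2020

# Hourly layout: one set of files per hour -> 24 * 30 = 720 reads for the month.
hourly_prefixes = [
    f"hourly/{year}/{month:02d}/{day:02d}/{hour:02d}/"
    for day in range(1, days_in_month + 1)
    for hour in range(24)
]
print(len(hourly_prefixes))  # 720

# Monthly layout: one spatially partitioned set of files per month, so a request
# for a single location touches only the partition covering that location.
monthly_prefix = f"monthly/{year}/{month:02d}/"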
Partitioning
To spatially partition the data we use the Apache Spark repartitionByRange method, where the partition column is the 13 most significant bits of a geohash calculated from the data point coordinates (lat, lon). This yields 2¹³ = 8,192 partitions, which we empirically found to be the optimal number for our needs. We chose to partition by geohash because it is cheap to compute and it preserves a uniform distribution across partitions (every monthly file contains the same number of data points).
The Python script below reads the hourly files and reorganizes the data into monthly files. It can be run locally as well as on IBM Analytics Engine.
repartition.py
#!/usr/bin/env python3
import argparse

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col

BITS_NUM = 64    # the geohash is a 64-bit integer
BITS_DEPTH = 13  # keep the 13 most significant bits -> 2**13 = 8,192 partitions


def get_partition_id(gridpoint_id: int) -> str:
    # Render the geohash as a fixed-width binary string and keep only
    # its most significant bits as the partition id.
    binary_geohash = format(gridpoint_id, "b").zfill(BITS_NUM)
    truncated = binary_geohash[0:BITS_DEPTH]
    return truncated


def repartition(spark: SparkSession, params: dict):
    geohash_udf = udf(lambda geohash: get_partition_id(geohash))
    df = spark.read.format(params["input"]).load(params["source"])
    # 2**13 == 8192
    df.repartitionByRange(2 ** BITS_DEPTH, geohash_udf(col("geohash"))) \
        .write \
        .mode(params["mode"]) \
        .format(params["output"]) \
        .save(params["dest"])
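For completeness, a minimal entry point appended to repartition.py could look like the sketch below; the argument names and defaults are assumptions rather than part of the actual script.

# Hypothetical CLI wrapper; argument names and defaults are illustrative.
if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="Repartition hourly weather files into monthly, geohash-ranged files")
    parser.add_argument("--source", required=True, help="path of the hourly input files")
    parser.add_argument("--dest", required=True, help="path for the monthly output files")
    parser.add_argument("--input", default="parquet", help="input format")
    parser.add_argument("--output", default="parquet", help="output format")
    parser.add_argument("--mode", default="overwrite", help="Spark write mode")
    args = parser.parse_args()

    spark = SparkSession.builder.appName("monthly-repartition").getOrCreate()
    repartition(spark, vars(args))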
However, one month of data in the hourly structure is ~500 GB (e.g. June 2020 is ~540 GB). Therefore, it is not feasible to complete the partitioning in a reasonable time on a single machine, which brings us to a cluster-based solution.
Partitioning at Scale
IBM Cloud Analytics Engine provides Apache Spark as SaaS. In order to use it, we would need to perform the following steps:
- Provision an IBM Cloud COS instance and an IBM Cloud Analytics Engine instance.
- Create a COS bucket and link it as the home bucket of the serverless Analytics Engine instance.
- Upload the Python script into the home bucket. It could be a different bucket, but for convenience we use the same one.
- Submit the Spark application through the Analytics Engine API, referring to the Python script. The application must have access to the script.
IBM Cloud Analytics Engine and IBM Cloud COS are provisioned with Terraform:
// COS bucket where the IAE instance home and application files are stored
resource "ibm_cos_bucket" "analytics-engine-home-bucket" {
  bucket_name          = "analytics-engine-home-bucket"
  resource_instance_id = module.hdat_iae_object_storage.resource_id
  region_location      = "us-south"
  storage_class        = "standard"
  depends_on           = // ibm cos resource initialization
}

resource "ibm_resource_instance" "iae_instance" {
  name              = "analytics-engine"
  plan              = "standard-serverless-spark"
  location          = "us-south"
  service           = "ibmanalyticsengine"
  resource_group_id = data.ibm_resource_group.default.id
  depends_on        = [ibm_cos_bucket.analytics-engine-home-bucket]

  parameters_json = jsonencode({
    default_runtime = {
      spark_version = "3.1"
    }
    instance_home = {
      region          = "us-south"
      endpoint        = "https://s3.us-south.cloud-object-storage.appdomain.cloud"
      hmac_access_key = // cos_hmac_keys.access_key_id
      hmac_secret_key = // cos_hmac_keys.secret_access_key
    }
  })

  tags = module.metadata.tag_list

  timeouts {
    create = "30m"
    update = "15m"
    delete = "15m"
  }
}
At the beginning of each month, the ingestion system submits a repartition request for the previous month:
@Scheduled(cron = "${hod.ingest.archive.repartition-schedule}")
public void repartitionLastMonth() {
    YearMonth lastMonth = YearMonth.now().minusMonths(1);
    MonthlyRepartition repartition =
            repartitionService.repartitionByRange(lastMonth, sourceBucket, targetBucket, apiKey);
    log.info("Started monthly repartition of {}", lastMonth);
}
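Under the hood, such a request becomes a submission to the Analytics Engine Serverless API. The sketch below follows the shape of the IAE v3 spark_applications endpoint; the instance GUID, IAM token, COS paths, and script arguments are placeholders, and the Spark conf mirrors the resources described below.

# Sketch of the underlying submission, following the shape of the IBM Analytics
# Engine Serverless v3 "spark_applications" API; the instance GUID, IAM token,
# COS paths and script arguments are placeholders.
import requests

url = ("https://api.us-south.ae.cloud.ibm.com/v3/analytics_engines/"
       "<instance_guid>/spark_applications")

payload = {
    "application_details": {
        # The uploaded script in the home bucket.
        "application": "cos://analytics-engine-home-bucket.<cos_service>/repartition.py",
        "arguments": ["--source", "cos://<hourly-bucket>.<cos_service>/2020/06/",
                      "--dest", "cos://<monthly-bucket>.<cos_service>/2020/06/"],
        # Spark resources matching the figures quoted below.
        "conf": {
            "spark.executor.instances": "12",
            "spark.executor.cores": "6",
            "spark.executor.memory": "48g",
            "spark.driver.cores": "8",
            "spark.driver.memory": "16g",
        },
    }
}

response = requests.post(
    url,
    headers={"Authorization": "Bearer <IAM token>", "Content-Type": "application/json"},
    json=payload,
)
print(response.status_code, response.json())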
(Screenshot: repartition.py uploaded to the IBM Cloud COS home bucket.)
Repartitioning one month from the hourly to the monthly structure takes ~3 hours with 12 executors, each with 6 cores and 48 GB of memory, plus a driver with 8 cores and 16 GB of memory.
Total IAE cost: (12 * 6 + 8) cores * $0.1475 USD/core-hour * 3 hours + (12 * 48 + 16) GB * $0.014 USD/GB-hour * 3 hours = $35.40 + $24.86 ≈ $60.26.
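The estimate can be re-derived in a couple of lines with the rates quoted above:

# Re-deriving the estimate with the rates quoted above.
cores = 12 * 6 + 8           # executor cores + driver cores = 80
memory_gb = 12 * 48 + 16     # executor memory + driver memory = 592 GB
hours = 3

core_cost = cores * 0.1475 * hours       # $0.1475 per core-hour  -> $35.40
memory_cost = memory_gb * 0.014 * hours  # $0.014 per GB-hour     -> $24.86
print(round(core_cost + memory_cost, 2)) # ~60.26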
Conclusion
- Repartitioning data from the hourly to the monthly structure reduces file read operations by a factor of up to 744, greatly speeding up data retrieval.
- The combination of the repartitionByRange method and geohash provides an efficient way to have uniformly distributed partitions.
- IBM Cloud Analytics Engine provides a cost efficient way to process a large volume of data on demand.