Parsing Weather Feeds to Add to Real-Time Streams

Tim Spann
4 min read · Sep 17, 2022


Apache Pulsar — Weather — Apache Flink — SQL — Continuous Analytics — Java

In the world of real-time streaming data, we often need to enrich events as they flow. In this example, Apache NiFi reads the entire 15-minute NOAA feed for all the weather stations in the United States. This is a single tarred, gzipped set of XML files that NiFi parses and converts into JSON, which is streamed as events to the Apache Pulsar topic “persistent://public/default/weather”.
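A single event on that topic might look roughly like this (a hypothetical example; the field names match the ones used in the join query later):

{
  "station_id": "KEWR",
  "location": "Newark Liberty International Airport, NJ",
  "latitude": 40.68,
  "longitude": -74.17,
  "weather": "Fair",
  "temperature_string": "66.0 F (18.9 C)",
  "pressure_string": "1019.2 mb",
  "dewpoint_string": "55.9 F (13.3 C)",
  "wind_string": "Calm"
}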

I have written a Pulsar function in Java that takes that topic as input and outputs enriched, cleaned data to “persistent://public/default/aircraftweather”.

(Source Code)

Our function does not need to do too much.

import java.util.UUID;

import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.schema.JSONSchema;
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;

public class WeatherFunction implements Function<byte[], Void> {

    // ... constants such as MESSAGE_JSON, PERSISTENT_PUBLIC_DEFAULT,
    // LANGUAGE, JAVA, and ERROR are defined here ...

    @Override
    public Void process(byte[] input, Context context) {
        if (input == null || context == null) {
            return null;
        }

        // Deserialize the raw JSON bytes into a Weather POJO
        WeatherParserService service = new WeatherParserService();
        Weather weather = service.deserialize(input);

        if (weather != null) {
            if (context.getLogger() != null
                    && context.getLogger().isDebugEnabled()) {
                context.getLogger().debug(MESSAGE_JSON + weather.toString());
            }
            try {
                // Build and send the enriched record to the output topic
                context.newOutputMessage(
                        PERSISTENT_PUBLIC_DEFAULT,
                        JSONSchema.of(Weather.class))
                    .key(UUID.randomUUID().toString())
                    .property(LANGUAGE, JAVA)
                    .value(weather)
                    .send();
            } catch (PulsarClientException ex) {
                if (context.getLogger() != null) {
                    context.getLogger().error(ERROR + ex.getLocalizedMessage());
                }
            }
        }
        return null;
    }
}

As we can see, the function receives a byte[] of data, which we deserialize and send back out with a JSON schema. The return type is Void because we determine the output topic dynamically; this way we can send each event to as many topics as we like, or none, based on the data we receive. We could also send it to another sink such as a database, or to anything we have a Java library for, including ML.
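Since the output message is built with JSONSchema.of(Weather.class), the Weather POJO defines the schema of the enriched topic. Here is a minimal, hypothetical sketch of that class, limited to the fields referenced in the join SQL later; the real class in the source code carries more fields:

public class Weather {
    // NOAA station metadata (field names match the JSON seen by Flink SQL)
    private String station_id;
    private String location;
    private double latitude;
    private double longitude;

    // Human-readable observation strings from the NOAA feed
    private String weather;
    private String temperature_string;
    private String pressure_string;
    private String dewpoint_string;
    private String heat_index_string;
    private String wind_string;

    // Getters and setters omitted for brevity
}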

LocalRunner gives us an easy way to do integration testing. We can send real data to the input topic and exercise the function as it runs locally.
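A minimal sketch of that local test harness, assuming a broker at pulsar://localhost:6650; the topic and class names match the deploy command below:

import java.util.Collections;

import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.functions.LocalRunner;

public class WeatherFunctionLocalTest {
    public static void main(String[] args) throws Exception {
        FunctionConfig functionConfig = new FunctionConfig();
        functionConfig.setName("Weather");
        functionConfig.setInputs(Collections.singleton(
                "persistent://public/default/weather"));
        functionConfig.setClassName(
                "dev.pulsarfunction.weather.WeatherFunction");
        functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);

        // Run the function in-process against a local broker
        LocalRunner localRunner = LocalRunner.builder()
                .brokerServiceUrl("pulsar://localhost:6650")
                .functionConfig(functionConfig)
                .build();
        localRunner.start(false); // false = don't block the main thread
    }
}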

Our next step is to deploy our function to a runtime. I like to stop it in case it is still running, then delete it. Then we can create the function from the JAR file and the class name.

bin/pulsar-admin functions stop --name Weather \
  --namespace default --tenant public

bin/pulsar-admin functions delete --name Weather \
  --namespace default --tenant public

bin/pulsar-admin functions create \
  --auto-ack true \
  --jar /weather-1.0.jar \
  --classname "dev.pulsarfunction.weather.WeatherFunction" \
  --dead-letter-topic "persistent://public/default/aircraftweatherdead" \
  --inputs "persistent://public/default/weather" \
  --log-topic "persistent://public/default/aircraftweatherlog" \
  --name Weather --namespace default --tenant public \
  --max-message-retries 5
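
Once created, we can confirm the function is running and check its instance status:

bin/pulsar-admin functions status --name Weather \
  --namespace default --tenant public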

I wanted this data cleaned up and prepared for joining with aircraft ADS-B data on nearby latitude and longitude points. This gives us the forecast from what is probably the closest airport weather station. A more precise geolocation algorithm is in order, but we leave that for future improvements.

(Join SQL)

SELECT
  COALESCE(location, station_id, '?') || ' ' ||
    CAST(lat AS STRING) || ',' ||
    CAST(lon AS STRING) AS PlaneLocation,
  CAST(latitude AS STRING) || ',' ||
    CAST(longitude AS STRING) AS WeatherLocation,
  COALESCE(flight, '-') || ' * ' ||
    COALESCE(hex, '-') AS FlightNum,
  CAST(alt_baro AS STRING) || ' / ' ||
    CAST(alt_geom AS STRING) AS Altitude,
  gs AS Speed,
  temperature_string || weather AS Weather,
  mach, pressure_string, dewpoint_string,
  heat_index_string, wind_string,
  baro_rate,
  NOW() AS now
FROM aircraft /*+ OPTIONS('scan.startup.mode' = 'earliest') */,
     aircraftweather /*+ OPTIONS('scan.startup.mode' = 'earliest') */
WHERE (aircraft.lat > aircraftweather.latitude - 0.1)
  AND (aircraft.lat < aircraftweather.latitude + 0.1)
  AND (aircraft.lon < aircraftweather.longitude + 0.1)
  AND (aircraft.lon > aircraftweather.longitude - 0.1);

We are using a Flink SQL hint so the query reads from the earliest events in each topic; there are other ways to do this, such as setting scan.startup.mode in the table definition.

For this to work, you need to set it in your Flink SQL session.

set table.dynamic-table-options.enabled = true;
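
For completeness, here is a rough sketch of how the aircraftweather table could be declared, assuming the StreamNative Pulsar connector for Flink. The exact option names (topics, service-url, admin-url, format) vary by connector version, so treat them as placeholders:

CREATE TABLE aircraftweather (
  station_id STRING,
  location STRING,
  latitude DOUBLE,
  longitude DOUBLE,
  weather STRING,
  temperature_string STRING,
  pressure_string STRING,
  dewpoint_string STRING,
  heat_index_string STRING,
  wind_string STRING
) WITH (
  'connector' = 'pulsar',
  'topics' = 'persistent://public/default/aircraftweather',
  'service-url' = 'pulsar://localhost:6650',
  'admin-url' = 'http://localhost:8080',
  'format' = 'json'
);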

An example of the code can be seen in this short video.


Tim Spann

Principal Developer Advocate, Zilliz. Milvus, Attu, Towhee, GenAI, Big Data, IoT, Deep Learning, Streaming, Machine Learning. https://www.datainmotion.dev/