select COALESCE(location,aircraftweather.station_id,'?') || ' '
|| cast(adsb.lat as string) || ',' ||
cast(adsb.lon as string) as Location,
COALESCE(adsb.flight,'-') || ' ' ||
COALESCE(adsb.hex,'-') as FlightNum,
cast(adsb.alt_baro as string) || '|' ||
cast(adsb.alt_geom as string) as Altitude,
adsb.gs as Speed,
aircraftweather.temperature_string || ' ' ||
aircraftweather.weather as Weather,
adsb.mach,
adsb.baro_rate,
adsb.nav_heading,
adsb.squawk, adsb.category,
aircraftweather.observation_time,
aircraftweather.temperature_string,
aircraftweather.wind_string,
aircraftweather.dewpoint_string,
(adsb.uuid || '-' || aircraftweather.uuid
|| '-' || adsb.flight || '-' ||
cast(adsb.lat as string) || '-' ||
cast(adsb.lon as string) ) as jointkey
FROM `schemareg1`.`default_database`.`adsb` , aircraftweather
WHERE adsb.flight is not null
AND (adsb.lat > aircraftweather.latitude - 0.3)
and (adsb.lat < aircraftweather.latitude + 0.3)
and (adsb.lon < aircraftweather.longitude + 0.3)
and (adsb.lon > aircraftweather.longitude - 0.3)
The magic of Flink SQL is being able to join live Kafka topics on whatever matches; here I am doing a rough match on latitude and longitude.
Another cool feature of Flink SQL (powered by Apache Calcite) is COALESCE, which provides a replacement value when location is null; I then concatenate a number of fields together into a single field.
COALESCE(location,aircraftweather.station_id,'?') || ' '
|| cast(adsb.lat as string) || ',' ||
cast(adsb.lon as string) as Location,
Sometimes you need a value as a different type (usually a Number as a String).
cast(adsb.lat as string)
CAST will convert your value to another type if possible.
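If bad data arrives on a live topic, a plain CAST can fail at runtime. A minimal sketch, assuming a recent enough Flink release that ships TRY_CAST, which returns NULL on bad input instead of erroring:

```sql
-- CAST converts between types when possible;
-- TRY_CAST (newer Flink releases) yields NULL on bad input
-- instead of failing the query.
select cast(adsb.lat as string) as lat_string,
       TRY_CAST(adsb.squawk as int) as squawk_number
from `schemareg1`.`default_database`.`adsb`
```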
https://www.slideshare.net/bunkertor/jconworld-continuous-sql-with-kafka-and-flink
Other SQL examples:
Left Join on Latitude and Longitude Exact Match
select `schemareg1`.`default_database`.`adsb`.hex as ICAO,
`schemareg1`.`default_database`.`adsb`.flight as IDENT,
adsb.baro_rate, adsb.category, adsb.mach, adsb.nav_heading,
adsb.squawk,
adsb.alt_baro as altitudefeet,
adsb.alt_geom as gaaltitudefeet,
adsb.gs as groundspeed,
weather1.longitude, weather1.latitude, weather1.observation_time,
weather1.temperature_string, weather1.wind_string, weather1.dewpoint_string, weather1.weather
from `schemareg1`.`default_database`.`adsb` LEFT JOIN
weather1 on `schemareg1`.`default_database`.`adsb`.lat = weather1.latitude
and adsb.lon = weather1.longitude
and adsb.flight is not null
Maximum, Minimum, and Average with Group by on ADSB Kafka Topic
select max(alt_baro) as MaxAltitudeFeet,
min(alt_baro) as MinAltitudeFeet,
avg(alt_baro) as AvgAltitudeFeet,
max(alt_geom) as MaxGAltitudeFeet,
min(alt_geom) as MinGAltitudeFeet,
avg(alt_geom) as AvgGAltitudeFeet,
max(gs) as MaxGroundSpeed,
min(gs) as MinGroundSpeed,
avg(gs) as AvgGroundSpeed,
count(alt_baro) as RowCount,
hex as ICAO, flight as IDENT
from `schemareg1`.`default_database`.`adsb`
where flight is not null
group by flight, hex;
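Because the adsb topic is unbounded, this GROUP BY emits continuously updating results. If the adsb table also declared an event-time column and watermark (as the aircraftweather table below does), the same statistics could be scoped to time windows. This is a sketch only, assuming a hypothetical eventTimeStamp attribute on adsb; exact window syntax varies by Flink version:

```sql
-- Per-minute stats per flight; assumes adsb declares an event-time
-- column named eventTimeStamp with a watermark (hypothetical here).
select max(alt_baro) as MaxAltitudeFeet,
       avg(gs) as AvgGroundSpeed,
       count(alt_baro) as RowCount,
       hex as ICAO, flight as IDENT
from `schemareg1`.`default_database`.`adsb`
where flight is not null
group by TUMBLE(eventTimeStamp, INTERVAL '1' MINUTE), flight, hex;
```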
Flink SQL Table for Aircraft Weather
CREATE TABLE `ssb`.`Meetups`.`aircraftweather` (
`uuid` VARCHAR(2147483647),
`ts` BIGINT,
`credit` VARCHAR(2147483647),
`credit_URL` VARCHAR(2147483647),
`image` ROW<`url` VARCHAR(2147483647), `title` VARCHAR(2147483647), `link` VARCHAR(2147483647)>,
`suggested_pickup` VARCHAR(2147483647),
`suggested_pickup_period` BIGINT,
`location` VARCHAR(2147483647),
`station_id` VARCHAR(2147483647),
`latitude` DOUBLE,
`longitude` DOUBLE,
`observation_time` VARCHAR(2147483647),
`observation_time_rfc822` VARCHAR(2147483647),
`weather` VARCHAR(2147483647),
`temperature_string` VARCHAR(2147483647),
`temp_f` DOUBLE,
`temp_c` DOUBLE,
`relative_humidity` BIGINT,
`wind_string` VARCHAR(2147483647),
`wind_dir` VARCHAR(2147483647),
`wind_degrees` BIGINT,
`wind_mph` DOUBLE,
`wind_kt` BIGINT,
`pressure_in` DOUBLE,
`dewpoint_string` VARCHAR(2147483647),
`dewpoint_f` DOUBLE,
`dewpoint_c` DOUBLE,
`visibility_mi` DOUBLE,
`icon_url_base` VARCHAR(2147483647),
`two_day_history_url` VARCHAR(2147483647),
`icon_url_name` VARCHAR(2147483647),
`ob_url` VARCHAR(2147483647),
`disclaimer_url` VARCHAR(2147483647),
`copyright_url` VARCHAR(2147483647),
`privacy_policy_url` VARCHAR(2147483647),
`eventTimeStamp` TIMESTAMP(3) WITH LOCAL TIME ZONE METADATA FROM 'timestamp',
WATERMARK FOR `eventTimeStamp` AS `eventTimeStamp` - INTERVAL '3' SECOND
) WITH (
'deserialization.failure.policy' = 'ignore_and_log',
'properties.request.timeout.ms' = '120000',
'format' = 'json',
'properties.bootstrap.servers' = 'kafka:9092',
'connector' = 'kafka',
'properties.transaction.timeout.ms' = '900000',
'topic' = 'weather',
'scan.startup.mode' = 'group-offsets',
'properties.auto.offset.reset' = 'earliest',
'properties.group.id' = 'aircraftweatherflink'
)
Before we can get the data for aircraft, we have to use Apache NiFi to clean it up and make it ready for streaming to its Kafka topic.
Our first step is to use SplitRecord to break the giant JSON file down so we can parse it one record at a time. We then add some metadata with UpdateAttribute.
An important step is to use EvaluateJSONPath to extract the timestamp and the message count for this group.
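Concretely, that usually means user-defined properties on EvaluateJSONPath whose values are JSONPath expressions. Against a dump1090-style aircraft.json payload they might look like this (the attribute names are illustrative):

```
end_timestamp = $.now       # capture time for this batch
messagecount  = $.messages  # messages decoded so far
```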
We then use SplitJson to split out each aircraft’s records.
We use UpdateRecord to add some important metadata fields: adsbnow, ts, and uuid. The first two are timestamps and the last is the unique id of the record.
We use QueryRecord to filter out any records missing latitude or longitude; those records can't be located properly and are of no use to us.
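QueryRecord evaluates Calcite SQL against the flowfile's records; the filter for this step might look like the following sketch (FLOWFILE is the NiFi convention for the incoming record set, field names taken from the example JSON below):

```sql
-- keep only records we can place on the map
SELECT * FROM FLOWFILE
WHERE lat IS NOT NULL
  AND lon IS NOT NULL
```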
Finally we use EvaluateJsonPath to grab all the fields we want to post to Slack.
At the same time we send our JSON record to the Kafka topic adsb so we can do real-time analytics with Apache Flink.
Example JSON Data
{
"hex" : "a72b6b",
"alt_baro" : 24125,
"version" : 2,
"nic_baro" : 1,
"nac_p" : 9,
"sil" : 3,
"sil_type" : "perhour",
"gva" : 2,
"sda" : 2,
"mlat" : [ ],
"tisb" : [ ],
"messages" : 2319,
"seen" : 0.2,
"rssi" : -25.7,
"flight" : "JBU1601 ",
"alt_geom" : 24350,
"gs" : 347.0,
"track" : 257.7,
"geom_rate" : 3072,
"category" : "A3",
"nac_v" : 1,
"ias" : 276,
"tas" : 396,
"mach" : 0.648,
"track_rate" : 0.03,
"roll" : 0.2,
"mag_heading" : 270.2,
"baro_rate" : 3072,
"squawk" : "7172",
"emergency" : "none",
"nav_qnh" : 1013.6,
"nav_altitude_mcp" : 28000,
"nav_heading" : 0.0,
"lat" : 40.209682,
"lon" : -74.908468,
"nic" : 8,
"rc" : 186,
"seen_pos" : 0.8,
"nav_altitude_fms" : null,
"nav_modes" : null,
"ts" : "1700583205061",
"adsbnow" : "1.7005832046E9",
"uuid" : "546e2b49-bf3a-4298-8369-bd5adf88f976"
}
We can now watch things run and see that there can be 95,407 messages in a single batch. Provenance Events and Lineage in Apache NiFi are amazing for building, testing, debugging, and auditing your results. Anything in provenance can be automatically sent somewhere with ReportingTasks like QueryNiFiReportingTask, SiteToSiteProvenanceReportingTask, and AzureLogAnalyticsProvenanceReportingTask.