Real-Time Flight Analytics

Tim Spann
Cloudera
Dec 8, 2023

More than one source of data in real-time — Join All The Things

(Image: Bing AI)
SELECT COALESCE(aircraftweather.location, aircraftweather.station_id, '?') || ' ' ||
       CAST(adsb.lat AS STRING) || ',' ||
       CAST(adsb.lon AS STRING) AS Location,
       COALESCE(adsb.flight, '-') || ' ' ||
       COALESCE(adsb.hex, '-') AS FlightNum,
       CAST(adsb.alt_baro AS STRING) || '|' ||
       CAST(adsb.alt_geom AS STRING) AS Altitude,
       adsb.gs AS Speed,
       aircraftweather.temperature_string ||
       aircraftweather.weather AS Weather,
       adsb.mach,
       adsb.baro_rate,
       adsb.nav_heading,
       adsb.squawk,
       adsb.category,
       aircraftweather.observation_time,
       aircraftweather.temperature_string,
       aircraftweather.wind_string,
       aircraftweather.dewpoint_string,
       (adsb.uuid || '-' || aircraftweather.uuid || '-' ||
        adsb.flight || '-' ||
        CAST(adsb.lat AS STRING) || '-' ||
        CAST(adsb.lon AS STRING)) AS jointkey
FROM `schemareg1`.`default_database`.`adsb`, aircraftweather
WHERE adsb.flight IS NOT NULL
  AND (adsb.lat > aircraftweather.latitude - 0.3)
  AND (adsb.lat < aircraftweather.latitude + 0.3)
  AND (adsb.lon < aircraftweather.longitude + 0.3)
  AND (adsb.lon > aircraftweather.longitude - 0.3)

The magic of Flink SQL is being able to join live Kafka topics on whatever matches; here I am doing a rough match on latitude and longitude.
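To make the join condition concrete, here is a minimal Python sketch of the same "rough match": an aircraft position joins a weather station if it falls inside a 0.3-degree box around the station. The record shapes and the station values (`KTTN`, its coordinates) are made up for illustration.

```python
def rough_match(adsb, station, box=0.3):
    """Return True if the aircraft is within +/- box degrees of the station,
    mirroring the four lat/lon comparisons in the WHERE clause."""
    return (abs(adsb["lat"] - station["latitude"]) < box
            and abs(adsb["lon"] - station["longitude"]) < box)

aircraft = {"flight": "JBU1601", "lat": 40.209682, "lon": -74.908468}
station = {"station_id": "KTTN", "latitude": 40.28, "longitude": -74.81}

print(rough_match(aircraft, station))  # True: within 0.3 degrees on both axes
```

Note this is a box, not a true distance check, which is exactly why the article calls it a rough match.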

Another cool feature of Flink SQL (powered by Apache Calcite) is COALESCE, which returns the first non-null value in its argument list. If location is null it falls back to the station id, and then I concatenate a number of fields together into a single field.

COALESCE(aircraftweather.location, aircraftweather.station_id, '?') || ' ' ||
CAST(adsb.lat AS STRING) || ',' ||
CAST(adsb.lon AS STRING) AS Location,

Sometimes you need a value as a different type (usually a number as a string).

CAST(adsb.lat AS STRING)

CAST will convert your value to another type if possible.
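The COALESCE fallback plus string concatenation can be sketched in a few lines of Python; the `station_id` value here is made up, and `str()` stands in for `CAST(... AS STRING)`.

```python
def coalesce(*args):
    """Return the first argument that is not None, like SQL COALESCE."""
    for arg in args:
        if arg is not None:
            return arg
    return None

location = None            # pretend the weather record has no location
station_id = "KPHL"        # hypothetical fallback value
lat, lon = 40.209682, -74.908468

# COALESCE(location, station_id, '?') || ' ' || CAST(lat AS STRING) || ',' || ...
label = coalesce(location, station_id, "?") + " " + str(lat) + "," + str(lon)
print(label)  # KPHL 40.209682,-74.908468
```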


https://www.slideshare.net/bunkertor/jconworld-continuous-sql-with-kafka-and-flink

Other SQL examples:

Left Join on Latitude and Longitude Exact Match

SELECT `schemareg1`.`default_database`.`adsb`.hex AS ICAO,
       `schemareg1`.`default_database`.`adsb`.flight AS IDENT,
       adsb.baro_rate, adsb.category, adsb.mach, adsb.nav_heading,
       adsb.squawk,
       adsb.alt_baro AS altitudefeet,
       adsb.alt_geom AS gaaltitudefeet,
       adsb.gs AS groundspeed,
       weather1.longitude, weather1.latitude, weather1.observation_time,
       weather1.temperature_string, weather1.wind_string,
       weather1.dewpoint_string, weather1.weather
FROM `schemareg1`.`default_database`.`adsb`
LEFT JOIN weather1
  ON `schemareg1`.`default_database`.`adsb`.lat = weather1.latitude
 AND adsb.lon = weather1.longitude
 AND adsb.flight IS NOT NULL

Maximum — Minimum — Average with Group by on ADSB Kafka Topic

SELECT MAX(alt_baro) AS MaxAltitudeFeet,
       MIN(alt_baro) AS MinAltitudeFeet,
       AVG(alt_baro) AS AvgAltitudeFeet,
       MAX(alt_geom) AS MaxGAltitudeFeet,
       MIN(alt_geom) AS MinGAltitudeFeet,
       AVG(alt_geom) AS AvgGAltitudeFeet,
       MAX(gs) AS MaxGroundSpeed,
       MIN(gs) AS MinGroundSpeed,
       AVG(gs) AS AvgGroundSpeed,
       COUNT(alt_baro) AS RowCount,
       hex AS ICAO, flight AS IDENT
FROM `schemareg1`.`default_database`.`adsb`
WHERE flight IS NOT NULL
GROUP BY flight, hex;
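For readers less familiar with GROUP BY, here is a small Python sketch of the same aggregation, keyed on (flight, hex) with max/min/avg per group; the sample records are made up.

```python
from collections import defaultdict

# Hypothetical ADS-B records, a subset of the fields the query aggregates.
records = [
    {"flight": "JBU1601", "hex": "a72b6b", "alt_baro": 24125, "gs": 347.0},
    {"flight": "JBU1601", "hex": "a72b6b", "alt_baro": 24350, "gs": 351.0},
    {"flight": "UAL123",  "hex": "ab1234", "alt_baro": 36000, "gs": 460.0},
]

groups = defaultdict(list)
for r in records:
    if r["flight"] is not None:          # WHERE flight IS NOT NULL
        groups[(r["flight"], r["hex"])].append(r)

for (flight, icao), rows in groups.items():
    alts = [r["alt_baro"] for r in rows]
    speeds = [r["gs"] for r in rows]
    print(flight, icao,
          max(alts), min(alts), sum(alts) / len(alts),
          max(speeds), min(speeds), sum(speeds) / len(speeds),
          len(rows))
```

The key difference from this batch sketch: Flink runs the same aggregation continuously, updating the results as new Kafka messages arrive.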

Flink SQL Table for Aircraft Weather

CREATE TABLE `ssb`.`Meetups`.`aircraftweather` (
`uuid` VARCHAR(2147483647),
`ts` BIGINT,
`credit` VARCHAR(2147483647),
`credit_URL` VARCHAR(2147483647),
`image` ROW<`url` VARCHAR(2147483647), `title` VARCHAR(2147483647), `link` VARCHAR(2147483647)>,
`suggested_pickup` VARCHAR(2147483647),
`suggested_pickup_period` BIGINT,
`location` VARCHAR(2147483647),
`station_id` VARCHAR(2147483647),
`latitude` DOUBLE,
`longitude` DOUBLE,
`observation_time` VARCHAR(2147483647),
`observation_time_rfc822` VARCHAR(2147483647),
`weather` VARCHAR(2147483647),
`temperature_string` VARCHAR(2147483647),
`temp_f` DOUBLE,
`temp_c` DOUBLE,
`relative_humidity` BIGINT,
`wind_string` VARCHAR(2147483647),
`wind_dir` VARCHAR(2147483647),
`wind_degrees` BIGINT,
`wind_mph` DOUBLE,
`wind_kt` BIGINT,
`pressure_in` DOUBLE,
`dewpoint_string` VARCHAR(2147483647),
`dewpoint_f` DOUBLE,
`dewpoint_c` DOUBLE,
`visibility_mi` DOUBLE,
`icon_url_base` VARCHAR(2147483647),
`two_day_history_url` VARCHAR(2147483647),
`icon_url_name` VARCHAR(2147483647),
`ob_url` VARCHAR(2147483647),
`disclaimer_url` VARCHAR(2147483647),
`copyright_url` VARCHAR(2147483647),
`privacy_policy_url` VARCHAR(2147483647),
`eventTimeStamp` TIMESTAMP(3) WITH LOCAL TIME ZONE METADATA FROM 'timestamp',
WATERMARK FOR `eventTimeStamp` AS `eventTimeStamp` - INTERVAL '3' SECOND
) WITH (
'deserialization.failure.policy' = 'ignore_and_log',
'properties.request.timeout.ms' = '120000',
'format' = 'json',
'properties.bootstrap.servers' = 'kafka:9092',
'connector' = 'kafka',
'properties.transaction.timeout.ms' = '900000',
'topic' = 'weather',
'scan.startup.mode' = 'group-offsets',
'properties.auto.offset.reset' = 'earliest',
'properties.group.id' = 'aircraftweatherflink'
)
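The WATERMARK clause above declares `eventTimeStamp` as event time with 3 seconds of allowed out-of-orderness. A rough Python sketch of how such a bounded-out-of-orderness watermark advances, assuming timestamps in milliseconds:

```python
DELAY_MS = 3000  # INTERVAL '3' SECOND from the DDL

def watermark(event_times_ms):
    """The watermark trails the maximum event time seen so far by the
    configured delay, so events up to 3 seconds late still count."""
    return max(event_times_ms) - DELAY_MS

# Hypothetical event timestamps arriving out of order.
events = [1700583205061, 1700583204000, 1700583207500]
print(watermark(events))  # 1700583207500 - 3000 = 1700583204500
```

Any record with an event time below the current watermark is considered late for windowed aggregations.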

Before we can get the data for aircraft, we have to use Apache NiFi to clean it up and make it ready for streaming to its Kafka topic.

Overall flow
End of flow, terminating in alerts and Kafka messages

Our first step is to use SplitRecord to break the giant JSON file down into individual records so we can parse them. We then add some metadata with UpdateAttribute.

An important step is to use EvaluateJsonPath to extract the timestamp and the messages from this group.

We then use SplitJson to split out each aircraft’s records.

We use UpdateRecord to add some important metadata fields: adsbnow, ts and uuid. These are timestamps and the unique ID of the record.

We use QueryRecord to filter out any records missing latitude or longitude. Those are not good records for us, since they cannot be located properly.

Finally we use EvaluateJsonPath to grab all the fields we want to post to Slack.

At the same time we send our JSON record to the Kafka topic adsb so we can do real-time analytics with Apache Flink.
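The enrich-and-filter steps (UpdateRecord plus QueryRecord) can be sketched in plain Python; the record shapes are simplified from the ADS-B JSON, and the field names `ts`, `adsbnow` and `uuid` match the metadata fields described above.

```python
import time
import uuid

def enrich(record):
    """Add the metadata fields the flow sets: ts, adsbnow and uuid."""
    now = time.time()
    record["ts"] = str(int(now * 1000))   # epoch milliseconds as a string
    record["adsbnow"] = str(now)          # epoch seconds as a string
    record["uuid"] = str(uuid.uuid4())    # unique id for the record
    return record

def has_position(record):
    """Keep only records that carry a usable latitude and longitude."""
    return record.get("lat") is not None and record.get("lon") is not None

aircraft = [{"hex": "a72b6b", "lat": 40.209682, "lon": -74.908468},
            {"hex": "deadbf"}]  # no position -> filtered out

cleaned = [enrich(r) for r in aircraft if has_position(r)]
print(len(cleaned))  # 1
```

In the real flow NiFi does this with record processors against a schema, which scales far better than per-record scripting, but the logic is the same.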

Example JSON Data

{
"hex" : "a72b6b",
"alt_baro" : 24125,
"version" : 2,
"nic_baro" : 1,
"nac_p" : 9,
"sil" : 3,
"sil_type" : "perhour",
"gva" : 2,
"sda" : 2,
"mlat" : [ ],
"tisb" : [ ],
"messages" : 2319,
"seen" : 0.2,
"rssi" : -25.7,
"flight" : "JBU1601 ",
"alt_geom" : 24350,
"gs" : 347.0,
"track" : 257.7,
"geom_rate" : 3072,
"category" : "A3",
"nac_v" : 1,
"ias" : 276,
"tas" : 396,
"mach" : 0.648,
"track_rate" : 0.03,
"roll" : 0.2,
"mag_heading" : 270.2,
"baro_rate" : 3072,
"squawk" : "7172",
"emergency" : "none",
"nav_qnh" : 1013.6,
"nav_altitude_mcp" : 28000,
"nav_heading" : 0.0,
"lat" : 40.209682,
"lon" : -74.908468,
"nic" : 8,
"rc" : 186,
"seen_pos" : 0.8,
"nav_altitude_fms" : null,
"nav_modes" : null,
"ts" : "1700583205061",
"adsbnow" : "1.7005832046E9",
"uuid" : "546e2b49-bf3a-4298-8369-bd5adf88f976"
}

We can now watch the flow run and see that a single batch can contain 95,407 messages. Provenance events and lineage in Apache NiFi are amazing for building, testing, debugging and auditing your results. Anything in provenance can be automatically sent elsewhere with reporting tasks such as QueryNiFiReportingTask, SiteToSiteProvenanceReportingTask and AzureLogAnalyticsProvenanceReportingTask.


RESOURCES
