Thermal Images and Sensor Data From Edge to Kafka to Iceberg and Beyond
Edge Imaging, Sensors, Apache Kafka, Apache NiFi, MiNiFi, Apache Iceberg
In another example, I am pushing sensor values to Kafka and Iceberg.
Let’s walk through the demo.
Data begins on our devices which we consume with MiNiFi Java agents.
NiFi receives the MiNiFi data over HTTPS. We then send the data raw to Kafka iot topic.
Another NiFi flow consumes that Kafka record, enriches and transforms it. It then sends the data to Kafka topics as JSON and AVRO, and at the same time writes that data to an Apache Iceberg table.
We can monitor the events as they enter the Kafka topics.
We can see that our data has arrived.
The Flink SQL job is then submitted and we can monitor it as it runs its aggregations and inserts. All we had to do was write a simple INSERT statement over our aggregation SQL.
Apache Iceberg Table
-- Hive DDL: target Iceberg table for enriched IoT sensor readings.
-- One row per (sensor_id, sensor_ts); populated by the Flink INSERT further below.
-- NOTE(review): Hive does not enforce constraints and normally requires
-- `PRIMARY KEY (...) DISABLE NOVALIDATE` — confirm this bare PRIMARY KEY
-- clause parses on the target Hive version.
CREATE TABLE icesensors
(
sensor_id INT,
sensor_ts BIGINT,    -- event timestamp; upstream values are epoch microseconds
sensor_0 DOUBLE,
sensor_1 DOUBLE,
sensor_2 DOUBLE,
sensor_3 DOUBLE,
sensor_4 DOUBLE,
sensor_5 DOUBLE,
sensor_6 DOUBLE,
sensor_7 DOUBLE,
sensor_8 DOUBLE,
sensor_9 DOUBLE,
sensor_10 DOUBLE,
sensor_11 DOUBLE,
is_healthy INT,      -- 0/1 health flag from the enrichment step
PRIMARY KEY (sensor_id, sensor_ts)
)
-- Hive 4 / CDP syntax for creating an Iceberg-format table
STORED BY ICEBERG;
Schema
{
"type": "record",
"name": "inferredSchema",
"fields": [
{
"name": "sensor_id",
"type": "long",
"doc": "Type inferred from '50'"
},
{
"name": "sensor_ts",
"type": "long",
"doc": "Type inferred from '1702913893242577'"
},
{
"name": "is_healthy",
"type": "long",
"doc": "Type inferred from '0'"
},
{
"name": "response",
"type": {
"type": "record",
"name": "response",
"fields": [
{
"name": "result",
"type": "long",
"doc": "Type inferred from '0'"
}
]
},
"doc": "Type inferred from '{\"result\":0}'"
},
{
"name": "sensor_0",
"type": "long",
"doc": "Type inferred from '3'"
},
{
"name": "sensor_1",
"type": "long",
"doc": "Type inferred from '2'"
},
{
"name": "sensor_2",
"type": "long",
"doc": "Type inferred from '2'"
},
{
"name": "sensor_3",
"type": "long",
"doc": "Type inferred from '49'"
},
{
"name": "sensor_4",
"type": "long",
"doc": "Type inferred from '51'"
},
{
"name": "sensor_5",
"type": "long",
"doc": "Type inferred from '89'"
},
{
"name": "sensor_6",
"type": "long",
"doc": "Type inferred from '70'"
},
{
"name": "sensor_7",
"type": "long",
"doc": "Type inferred from '84'"
},
{
"name": "sensor_8",
"type": "long",
"doc": "Type inferred from '9'"
},
{
"name": "sensor_9",
"type": "long",
"doc": "Type inferred from '5'"
},
{
"name": "sensor_10",
"type": "long",
"doc": "Type inferred from '9'"
},
{
"name": "sensor_11",
"type": "long",
"doc": "Type inferred from '7'"
}
]
}
Flink SQL Tables
-- Flink SQL (SSB) source table over the `iot_enriched` Kafka topic (JSON records).
-- A per-record JS transform converts sensor_ts from microseconds to milliseconds
-- so TO_TIMESTAMP_LTZ(..., 3) can derive the event-time column and watermark.
CREATE TABLE `ssb`.`ssb_default`.`iotenriched` (
`sensor_id` BIGINT,
`sensor_ts` BIGINT,
`is_healthy` BIGINT,
`response` ROW<`result` BIGINT>,
`sensor_0` BIGINT,
`sensor_1` BIGINT,
`sensor_2` BIGINT,
`sensor_3` BIGINT,
`sensor_4` BIGINT,
`sensor_5` BIGINT,
`sensor_6` BIGINT,
`sensor_7` BIGINT,
`sensor_8` BIGINT,
`sensor_9` BIGINT,
`sensor_10` BIGINT,
`sensor_11` BIGINT,
-- computed event-time column: sensor_ts is milliseconds after the JS transform
`event_time` AS TO_TIMESTAMP_LTZ(`sensor_ts`, 3),
-- tolerate up to 3 seconds of out-of-order events
WATERMARK FOR `event_time` AS `event_time` - INTERVAL '3' SECOND
) WITH (
-- SSB-specific JavaScript transform applied to each raw Kafka record
-- (doubled single quotes are SQL string escaping)
'scan.transform.js.code' = '// parse the JSON record
var parsedVal = JSON.parse(record.value);
// Convert sensor_ts from micro to milliseconds
parsedVal[''sensor_ts''] = Math.round(parsedVal[''sensor_ts'']/1000);
// serialize output as JSON
JSON.stringify(parsedVal);',
-- skip and log malformed records instead of failing the job
'deserialization.failure.policy' = 'ignore_and_log',
'properties.request.timeout.ms' = '120000',
'format' = 'json',
'properties.bootstrap.servers' = 'cdp.nip.io:9092',
'connector' = 'kafka',
'properties.transaction.timeout.ms' = '900000',
'topic' = 'iot_enriched',
'scan.transform.js.b64.encoded' = 'false',
-- resume from committed group offsets; fall back to earliest on first run
'scan.startup.mode' = 'group-offsets',
'properties.auto.offset.reset' = 'earliest',
'properties.group.id' = 'ssb-iot-flink-1'
);
-- Flink SQL (SSB) sink table: per-device sliding-window statistics for sensor_6,
-- written as JSON to the `sensor6_stats` Kafka topic. Populated by the INSERT below.
-- NOTE(review): the column name implies a threshold of 60, but the feeding
-- INSERT counts readings where sensor_6 > 70 — reconcile the two.
CREATE TABLE `sensor6_stats` (
`device_id` BIGINT,
`windowEnd` TIMESTAMP(3) NOT NULL,   -- end of the 30-second hop window
`sensorCount` BIGINT NOT NULL,       -- readings in the window
`sensorSum` BIGINT,
`sensorAverage` FLOAT,
`sensorMin` BIGINT,
`sensorMax` BIGINT,
`sensorGreaterThan60` INT NOT NULL   -- readings above the alert threshold
) WITH (
-- 'kafka: Local Kafka' is the SSB data-provider-qualified connector name
'connector' = 'kafka: Local Kafka',
'format' = 'json',
'scan.startup.mode' = 'group-offsets',
'topic' = 'sensor6_stats'
);
Create Apache Iceberg Table in Apache Flink SQL
-- Flink SQL table mapped onto the Hive-catalog Iceberg table `default.icesensors`
-- (created by the Hive DDL above), so Flink jobs can write to it directly.
CREATE TABLE `icebergsensors` (
sensor_id INT,
sensor_ts BIGINT,
sensor_0 DOUBLE,
sensor_1 DOUBLE,
sensor_2 DOUBLE,
sensor_3 DOUBLE,
sensor_4 DOUBLE,
sensor_5 DOUBLE,
sensor_6 DOUBLE,
sensor_7 DOUBLE,
sensor_8 DOUBLE,
sensor_9 DOUBLE,
sensor_10 DOUBLE,
sensor_11 DOUBLE,
is_healthy INT
) WITH (
-- Iceberg connector resolves the physical table via the Hive Metastore
'catalog-database' = 'default',
'catalog-name' = 'hive',
'catalog-table' = 'icesensors',
'connector' = 'iceberg',
'catalog-type' = 'hive',
-- environment-specific endpoints: Hive Metastore thrift URI and HDFS warehouse path
'uri'='thrift://localhost:9083',
'clients'='5',                -- metastore client pool size
'property-version'='1',
'warehouse'='hdfs://cdp.44.216.100.85.nip.io:8020/warehouse/tablespace/external/hive/icesensors'
);
Insert Special Records Into Our New Table and Topic
-- Continuous aggregation: 30-second hopping windows sliding every 1 second,
-- computing per-device statistics for sensor_6 and writing them to the
-- Kafka-backed sensor6_stats table.
INSERT INTO sensor6_stats
SELECT
sensor_id AS device_id,
-- window close time, matching the windowEnd sink column
HOP_END(event_time, INTERVAL '1' SECOND, INTERVAL '30' SECOND) AS windowEnd,
COUNT(*) AS sensorCount,
SUM(sensor_6) AS sensorSum,
-- cast before AVG so the average keeps its fractional part
AVG(CAST(sensor_6 AS FLOAT)) AS sensorAverage,
MIN(sensor_6) AS sensorMin,
MAX(sensor_6) AS sensorMax,
-- Fixed: threshold was 70, contradicting the sensorGreaterThan60 column name
SUM(CASE WHEN sensor_6 > 60 THEN 1 ELSE 0 END) AS sensorGreaterThan60
FROM iotenriched
GROUP BY
sensor_id,
-- HOP(time_col, slide, size): 30s windows advancing every 1s
HOP(event_time, INTERVAL '1' SECOND, INTERVAL '30' SECOND);
Apache Iceberg Query
-- Verification query: newest readings first.
-- Explicit column list (matching the icesensors DDL) instead of SELECT *,
-- so the query keeps working predictably if the schema later gains columns.
SELECT
    sensor_id,
    sensor_ts,
    sensor_0,
    sensor_1,
    sensor_2,
    sensor_3,
    sensor_4,
    sensor_5,
    sensor_6,
    sensor_7,
    sensor_8,
    sensor_9,
    sensor_10,
    sensor_11,
    is_healthy
FROM icesensors
ORDER BY sensor_ts DESC;