Thermal Images and Sensor Data From Edge to Kafka to Iceberg and Beyond

Tim Spann
Cloudera
6 min read · Dec 18, 2023

Edge Imaging, Sensors, Apache Kafka, Apache NiFi, MiNiFi, Apache Iceberg

In another example, I am pushing sensor values to Kafka and Iceberg.

Let’s walk through the demo.

Data originates on our edge devices, where it is collected by MiNiFi Java agents.
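Each reading travels as a JSON record. A minimal Python sketch of what one such record might look like (field names follow the inferred schema shown later; note the timestamp is epoch time in microseconds, and the random values here are purely illustrative):

```python
import json
import random
import time

def make_sensor_record(sensor_id: int) -> dict:
    """Build one sensor reading in the shape consumed downstream.
    sensor_ts is epoch time in microseconds, matching values like
    1702913893242577 seen in the inferred schema."""
    record = {
        "sensor_id": sensor_id,
        "sensor_ts": int(time.time() * 1_000_000),
        "is_healthy": random.randint(0, 1),
    }
    # Twelve numbered sensor channels: sensor_0 .. sensor_11
    for i in range(12):
        record[f"sensor_{i}"] = random.randint(0, 100)
    return record

payload = json.dumps(make_sensor_record(50))
```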

NiFi Flows

NiFi receives the MiNiFi data over HTTPS. We then send the raw data to the Kafka iot topic.

Another NiFi flow consumes that Kafka record, then enriches and transforms it. It sends the data to Kafka topics as JSON and Avro, while at the same time writing that data to an Apache Iceberg table.
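As a rough illustration of what the enrichment step produces (the actual work is done by NiFi processors, not this code), here is a hypothetical Python function that attaches the response envelope seen in the inferred schema and re-serializes the record as JSON:

```python
import json

def enrich(raw_json: str) -> str:
    """Hypothetical enrichment step: attach a response envelope like
    the {"result": 0} field that appears in the inferred schema."""
    record = json.loads(raw_json)
    record["response"] = {"result": 0}
    return json.dumps(record)

enriched = enrich('{"sensor_id": 50, "sensor_ts": 1702913893242577, "is_healthy": 0}')
```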

We can monitor the events as they enter the Kafka topics.

Kafka events are also reported to Apache Atlas.

We can see that our data has arrived.

The Flink SQL job is then submitted, and we can monitor it as it runs its aggregations and inserts. All we had to do was write a simple INSERT statement over our aggregation SQL.

SQL Stream Builder (Flink SQL) for Insert
Flink Dashboard

Apache Iceberg Table

CREATE TABLE icesensors
(
sensor_id INT,
sensor_ts BIGINT,
sensor_0 DOUBLE,
sensor_1 DOUBLE,
sensor_2 DOUBLE,
sensor_3 DOUBLE,
sensor_4 DOUBLE,
sensor_5 DOUBLE,
sensor_6 DOUBLE,
sensor_7 DOUBLE,
sensor_8 DOUBLE,
sensor_9 DOUBLE,
sensor_10 DOUBLE,
sensor_11 DOUBLE,
is_healthy INT,
PRIMARY KEY (sensor_id, sensor_ts)
)
STORED BY ICEBERG;

Schema

{
  "type": "record",
  "name": "inferredSchema",
  "fields": [
    { "name": "sensor_id", "type": "long", "doc": "Type inferred from '50'" },
    { "name": "sensor_ts", "type": "long", "doc": "Type inferred from '1702913893242577'" },
    { "name": "is_healthy", "type": "long", "doc": "Type inferred from '0'" },
    {
      "name": "response",
      "type": {
        "type": "record",
        "name": "response",
        "fields": [
          { "name": "result", "type": "long", "doc": "Type inferred from '0'" }
        ]
      },
      "doc": "Type inferred from '{\"result\":0}'"
    },
    { "name": "sensor_0", "type": "long", "doc": "Type inferred from '3'" },
    { "name": "sensor_1", "type": "long", "doc": "Type inferred from '2'" },
    { "name": "sensor_2", "type": "long", "doc": "Type inferred from '2'" },
    { "name": "sensor_3", "type": "long", "doc": "Type inferred from '49'" },
    { "name": "sensor_4", "type": "long", "doc": "Type inferred from '51'" },
    { "name": "sensor_5", "type": "long", "doc": "Type inferred from '89'" },
    { "name": "sensor_6", "type": "long", "doc": "Type inferred from '70'" },
    { "name": "sensor_7", "type": "long", "doc": "Type inferred from '84'" },
    { "name": "sensor_8", "type": "long", "doc": "Type inferred from '9'" },
    { "name": "sensor_9", "type": "long", "doc": "Type inferred from '5'" },
    { "name": "sensor_10", "type": "long", "doc": "Type inferred from '9'" },
    { "name": "sensor_11", "type": "long", "doc": "Type inferred from '7'" }
  ]
}

Flink SQL Tables

CREATE TABLE `ssb`.`ssb_default`.`iotenriched` (
`sensor_id` BIGINT,
`sensor_ts` BIGINT,
`is_healthy` BIGINT,
`response` ROW<`result` BIGINT>,
`sensor_0` BIGINT,
`sensor_1` BIGINT,
`sensor_2` BIGINT,
`sensor_3` BIGINT,
`sensor_4` BIGINT,
`sensor_5` BIGINT,
`sensor_6` BIGINT,
`sensor_7` BIGINT,
`sensor_8` BIGINT,
`sensor_9` BIGINT,
`sensor_10` BIGINT,
`sensor_11` BIGINT,
`event_time` AS TO_TIMESTAMP_LTZ(`sensor_ts`, 3),
WATERMARK FOR `event_time` AS `event_time` - INTERVAL '3' SECOND
) WITH (
'scan.transform.js.code' = '// parse the JSON record
var parsedVal = JSON.parse(record.value);
// Convert sensor_ts from micro to milliseconds
parsedVal[''sensor_ts''] = Math.round(parsedVal[''sensor_ts'']/1000);
// serialize output as JSON
JSON.stringify(parsedVal);',
'deserialization.failure.policy' = 'ignore_and_log',
'properties.request.timeout.ms' = '120000',
'format' = 'json',
'properties.bootstrap.servers' = 'cdp.nip.io:9092',
'connector' = 'kafka',
'properties.transaction.timeout.ms' = '900000',
'topic' = 'iot_enriched',
'scan.transform.js.b64.encoded' = 'false',
'scan.startup.mode' = 'group-offsets',
'properties.auto.offset.reset' = 'earliest',
'properties.group.id' = 'ssb-iot-flink-1'
);
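The scan.transform.js.code snippet above rewrites sensor_ts from microseconds down to milliseconds before deserialization, so that TO_TIMESTAMP_LTZ(sensor_ts, 3) yields a correct event time. The same conversion expressed in Python, for reference:

```python
import json

def to_millis(record_value: str) -> str:
    """Mirror of the JS transform: parse the JSON record, round
    sensor_ts from microseconds to milliseconds, re-serialize."""
    parsed = json.loads(record_value)
    parsed["sensor_ts"] = round(parsed["sensor_ts"] / 1000)
    return json.dumps(parsed)

out = to_millis('{"sensor_ts": 1702913893242577}')
```

Without this step the microsecond values would be interpreted as timestamps thousands of years in the future, and the watermark logic would never fire.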

CREATE TABLE `sensor6_stats` (
`device_id` BIGINT,
`windowEnd` TIMESTAMP(3) NOT NULL,
`sensorCount` BIGINT NOT NULL,
`sensorSum` BIGINT,
`sensorAverage` FLOAT,
`sensorMin` BIGINT,
`sensorMax` BIGINT,
`sensorGreaterThan60` INT NOT NULL
) WITH (
'connector' = 'kafka: Local Kafka',
'format' = 'json',
'scan.startup.mode' = 'group-offsets',
'topic' = 'sensor6_stats'
);

Create Apache Iceberg Table in Apache Flink SQL

CREATE TABLE  `icebergsensors` (
sensor_id INT,
sensor_ts BIGINT,
sensor_0 DOUBLE,
sensor_1 DOUBLE,
sensor_2 DOUBLE,
sensor_3 DOUBLE,
sensor_4 DOUBLE,
sensor_5 DOUBLE,
sensor_6 DOUBLE,
sensor_7 DOUBLE,
sensor_8 DOUBLE,
sensor_9 DOUBLE,
sensor_10 DOUBLE,
sensor_11 DOUBLE,
is_healthy INT
) WITH (
'catalog-database' = 'default',
'catalog-name' = 'hive',
'catalog-table' = 'icesensors',
'connector' = 'iceberg',
'catalog-type' = 'hive',
'uri'='thrift://localhost:9083',
'clients'='5',
'property-version'='1',
'warehouse'='hdfs://cdp.44.216.100.85.nip.io:8020/warehouse/tablespace/external/hive/icesensors'
);

Insert Special Records Into Our New Table and Topic

INSERT INTO sensor6_stats
SELECT
sensor_id as device_id,
HOP_END(event_time, INTERVAL '1' SECOND, INTERVAL '30' SECOND) as windowEnd,
count(*) as sensorCount,
sum(sensor_6) as sensorSum,
avg(cast(sensor_6 as float)) as sensorAverage,
min(sensor_6) as sensorMin,
max(sensor_6) as sensorMax,
sum(case when sensor_6 > 70 then 1 else 0 end) as sensorGreaterThan60
FROM iotenriched
GROUP BY
sensor_id,
HOP(event_time, INTERVAL '1' SECOND, INTERVAL '30' SECOND)
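The HOP window above emits, every second, aggregates over the trailing 30 seconds of events. The per-window math can be sketched in plain Python (window assignment is omitted; this just computes the aggregate row for one device's sensor_6 readings in one window):

```python
def sensor6_stats(values: list[int]) -> dict:
    """Compute the same aggregates as the INSERT statement for one
    window of sensor_6 readings from a single device."""
    return {
        "sensorCount": len(values),
        "sensorSum": sum(values),
        "sensorAverage": sum(values) / len(values),
        "sensorMin": min(values),
        "sensorMax": max(values),
        # Note: the column is named sensorGreaterThan60, but the SQL
        # as written uses a threshold of 70; this mirrors the SQL.
        "sensorGreaterThan60": sum(1 for v in values if v > 70),
    }

stats = sensor6_stats([49, 51, 89, 70, 84])
```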

Apache Iceberg Query

select * from icesensors
order by sensor_ts desc

RESOURCES

FLaNK-Bing


Principal Developer Advocate, Zilliz. Milvus, Attu, Towhee, GenAI, Big Data, IoT, Deep Learning, Streaming, Machine Learning. https://www.datainmotion.dev/