Google Dataflow to Bigtable: Optimising AI

Nicholas Ord
Google Cloud - Community
11 min read · Jan 11, 2024

Optimising AI training data on Google Cloud is really a function of getting pipelines streamlined in Memorystore, through Pub/Sub, and into Dataflow before the data ever reaches Bigtable or BigQuery.

Text and image processing for AI is growing at a remarkable rate, and data trained directly from the internet still needs to be curated with human intervention.

But text and images are only a small part of the AI landscape. Here we show optimisation techniques for industrial data on GCP.

Make your data engineers happy :)

Data sample from BigQuery (2 seconds, device sn obfuscated)

sn012395695xxxxxxxxx 2024-01-09 13:02:35.305000 10 light array 4 "{
""measurementRGBW"": {
""r"": ""17042.0"",
""g"": ""0.0"",
""b"": ""17040.0"",
""w"": ""0.0""
}
}"
sn012395695xxxxxxxxx 2024-01-09 13:02:35.573000 33 modbusMeter array 26 [233.7255,233.74023,233.72798,0.0,0.0,0.0,0.0,0.0,0.22018927,0.22043875,50.030506,21.28782,-0.038750924,0.033866644,0.05146439,NaN,NaN,1.3419399,1.3419399,-1.144221,3.644008,3.642516,3.64363,0.0,0.0,107.32218]
sn012395695xxxxxxxxx 2024-01-09 13:02:35.783000 13 emiFrequency 50,03 1
sn012395695xxxxxxxxx 2024-01-09 13:02:35.899000 13 emiFft array 20 "{
""measurementFFT"": {
""magnitudes"": [""0.0"", ""0.0"", ""0.0"", ""0.0"", ""0.0"", ""0.0"", ""9.596029"", ""0.0"", ""0.0"", ""0.0"", ""0.0"", ""0.0"", ""0.0"", ""0.0"", ""0.0"", ""0.0"", ""0.0"", ""0.0"", ""0.0"", ""0.6645943""],
""frequency_resolution"": ""7.81"",
""time_window"": ""1024.0""
}
}"
sn012395695xxxxxxxxx 2024-01-09 13:02:36.305000 24 fHumidity 66,07037 1
sn012395695xxxxxxxxx 2024-01-09 13:02:36.305000 8 fPressure 964,76 1
sn012395695xxxxxxxxx 2024-01-09 13:02:36.305000 22 eco2 400 1
sn012395695xxxxxxxxx 2024-01-09 13:02:36.305000 22 tvoc 3 1
sn012395695xxxxxxxxx 2024-01-09 13:02:36.305000 24 fTemperature 22,063828 1
sn012395695xxxxxxxxx 2024-01-09 13:02:36.783000 13 emiFrequency 50,01 1
sn012395695xxxxxxxxx 2024-01-09 13:02:36.923000 13 emiFft array 20 "{
""measurementFFT"": {
""magnitudes"": [""0.0"", ""0.0"", ""0.0"", ""0.0"", ""0.0"", ""0.0"", ""9.767947"", ""0.0"", ""0.0"", ""0.0"", ""0.0"", ""0.0"", ""0.0"", ""0.0"", ""0.0"", ""0.0"", ""0.0"", ""0.0"", ""0.0"", ""0.5804497""],
""frequency_resolution"": ""7.81"",
""time_window"": ""1024.0""
}
}"
sn012395695xxxxxxxxx 2024-01-09 13:02:37.305000 22 eco2 400 1
sn012395695xxxxxxxxx 2024-01-09 13:02:37.305000 22 tvoc 3 1
sn012395695xxxxxxxxx 2024-01-09 13:02:37.305000 24 fHumidity 66,10852 1
sn012395695xxxxxxxxx 2024-01-09 13:02:37.305000 8 fPressure 964,71 1
sn012395695xxxxxxxxx 2024-01-09 13:02:37.305000 24 fTemperature 22,056274 1
sn012395695xxxxxxxxx 2024-01-09 13:02:37.572000 33 modbusMeter array 26 [233.37244,233.39503,233.38272,0.0,0.0,0.0,0.0,0.0,0.22127527,0.22143851,50.028717,21.28782,-0.039031968,0.033813953,0.051641826,NaN,NaN,1.345221,1.345222,-1.1543154,3.7023478,3.6994975,3.702797,0.0,0.0,107.
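The quoted JSON payloads in the export above arrive CSV-escaped (the inner quotes are doubled). A minimal Python sketch for unwrapping one emiFft payload; the two-bin magnitudes array is shortened from the sample for brevity:

```python
import json

# One emiFft payload as it appears in the BigQuery export above
# (CSV-style escaping doubles the inner quotes; magnitudes shortened).
raw = '''"{
""measurementFFT"": {
""magnitudes"": [""0.0"", ""9.596029""],
""frequency_resolution"": ""7.81"",
""time_window"": ""1024.0""
}
}"'''

# Strip the outer quotes and un-double the inner ones, then parse.
payload = json.loads(raw[1:-1].replace('""', '"'))
fft = payload["measurementFFT"]
magnitudes = [float(m) for m in fft["magnitudes"]]
resolution_hz = float(fft["frequency_resolution"])

# Bin i sits at i * frequency_resolution Hz; keep the non-zero peaks.
peaks = [(i * resolution_hz, m) for i, m in enumerate(magnitudes) if m > 0.0]
```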

Snapshot analysis:

5 things to get Dataflow right for AI, reading the sample from the left:

Time assigned for AI: the most important part of AI curation

Dataflow pipe number: so we can later find the stream at any point in the data value chain in GCP and measure latencies using server_times vs time (the purple asset-local time above)

Variables: the blue section where the assigned timestamps appear to be identical. That's fine: they are all read at the same time, in bursts

Different variables mean the Dataflow stream numbers will sort themselves out just fine without Dataflow exceptions (which are an AI nightmare), as long as the pipe number matches

For info: the DPU (data processing unit) used for this demo is a dual-core ARM processor with an FPGA front end, fast-reading and writing industrial data at microsecond resolution. It is NTP-refreshed against a 2.5 ppm TCXO every 2 hours, which means close to GPS time-reference accuracy (without drilling for GPS antennas and line of sight to the sky).
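As a back-of-envelope sketch (not a measured figure), the worst-case drift of a 2.5 ppm oscillator between 2-hour NTP refreshes can be bounded directly:

```python
# Worst-case clock drift between NTP refreshes for a 2.5 ppm TCXO
# refreshed every 2 hours -- an upper bound, not a measurement.
PPM = 2.5
REFRESH_S = 2 * 3600  # seconds between NTP syncs

max_drift_s = PPM * 1e-6 * REFRESH_S
print(f"max drift per refresh window: {max_drift_s * 1e3:.1f} ms")
```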

Arrays:

19k baud over old 1979-era interfaces (still the majority of the installed base in industry) means bursts of 26 measurements for Dataflow:

Meter: Schneider PM3255
Reading Index Register Parameter Units
0 3027 V L1-N Volts
1 3029 V L2-N Volts
2 3031 V L3-N Volts
3 3019 V L1-L2 Volts
4 3021 V L2-L3 Volts
5 3023 V L3-L1 Volts
6 2999 I L1 Amps
7 3001 I L2 Amps
8 3003 I L3 Amps
9 3005 I N Amps
10 3109 Frequency Hz
11 3131 Temperature °C
12 3067 S Total kVA
13 3059 P Total kW
14 3075 Q Total kVAR
15 3077 PF L1 N.A.
16 3079 PF L2 N.A.
17 3081 PF L3 N.A.
18 3083 PF Total N.A.
19 3107 Tangent ɸ Total N.A.
20 45119 THD V L1-N %
21 45121 THD V L2-N %
22 45123 THD V L3-N %
23 45099 THD I L1 %
24 45101 THD I L2 %
25 45103 THD I L3 %
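For illustration, the register map above can be used to label one 26-value modbusMeter burst (the first burst from the BigQuery sample). Parameter names and units are shortened, and keying the values into a dict is my assumption about a convenient layout, not the production decoder:

```python
# Register map transcribed from the PM3255 table above:
# (register, parameter, unit), in reading-index order 0..25.
REGISTER_MAP = [
    (3027, "V L1-N", "V"), (3029, "V L2-N", "V"), (3031, "V L3-N", "V"),
    (3019, "V L1-L2", "V"), (3021, "V L2-L3", "V"), (3023, "V L3-L1", "V"),
    (2999, "I L1", "A"), (3001, "I L2", "A"), (3003, "I L3", "A"),
    (3005, "I N", "A"), (3109, "Frequency", "Hz"), (3131, "Temperature", "degC"),
    (3067, "S Total", "kVA"), (3059, "P Total", "kW"), (3075, "Q Total", "kVAR"),
    (3077, "PF L1", ""), (3079, "PF L2", ""), (3081, "PF L3", ""),
    (3083, "PF Total", ""), (3107, "Tangent phi Total", ""),
    (45119, "THD V L1-N", "%"), (45121, "THD V L2-N", "%"), (45123, "THD V L3-N", "%"),
    (45099, "THD I L1", "%"), (45101, "THD I L2", "%"), (45103, "THD I L3", "%"),
]

# The first modbusMeter burst from the 2-second BigQuery sample.
burst = [233.7255, 233.74023, 233.72798, 0.0, 0.0, 0.0, 0.0, 0.0,
         0.22018927, 0.22043875, 50.030506, 21.28782, -0.038750924,
         0.033866644, 0.05146439, float("nan"), float("nan"), 1.3419399,
         1.3419399, -1.144221, 3.644008, 3.642516, 3.64363, 0.0, 0.0, 107.32218]

# Label each reading with its parameter name and unit.
decoded = {name: (value, unit)
           for (reg, name, unit), value in zip(REGISTER_MAP, burst)}
```

Note how the sample lines up with the map: index 10 is the mains frequency (~50.03 Hz), and the two NaN readings land on PF L1 and PF L2, matching the `-nan` values in the M4 buffer dump further down.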

Plus 20 magnetic B-fields (as a fast Fourier transform array) appear as “instant” in Bigtable, in parallel with the other sensors.

AI loves tiny wide parallel data.

It enriches value and significantly reduces training times, from nine months to a few weeks.

Goal: zero data curation

  1. Ensure assigned time is in microseconds with tick structure (details below)
  2. Run firmware diagnostics to ensure the AI gets:

one timestamp

one value

from one variable

synced with all other DPUs (about 2000 per cluster)
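The “one timestamp, one value, one variable” rule can be sketched as a gate on incoming records. This is a hypothetical illustration (the record keys ts/var/val are invented for the example), not the actual firmware diagnostics:

```python
import math

def is_clean(record, seen):
    """Reject records that would poison a training set."""
    ts, variable, value = record.get("ts"), record.get("var"), record.get("val")
    if ts is None or variable is None or value is None:
        return False                       # missing timestamp, variable, or value
    if isinstance(value, float) and math.isnan(value):
        return False                       # NaN value
    key = (ts, variable)
    if key in seen:
        return False                       # duplicate (timestamp, variable) pair
    seen.add(key)
    return True

seen = set()
records = [
    {"ts": 1704805355305, "var": "fTemperature", "val": 22.063828},
    {"ts": 1704805355305, "var": "fTemperature", "val": 22.063828},  # duplicate
    {"ts": 1704805355305, "var": "fHumidity", "val": float("nan")},  # NaN
]
clean = [r for r in records if is_clean(r, seen)]  # keeps only the first
```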

Result: high octane AI fuel where data value > data cost

AI training data, 1 month:
54 data points per second
86,400 seconds in a day
4,665,600 data points per device per day
9,331,200,000 per day for a small city or factory (2,000 devices)
2.79936E+11 for a 1-month (30-day) AI training set
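The arithmetic above, spelled out in code:

```python
# The month-of-training data-volume arithmetic from the article.
points_per_sec = 54
seconds_per_day = 86_400
devices = 2_000
days = 30

per_device_day = points_per_sec * seconds_per_day   # 4,665,600
fleet_day = per_device_day * devices                # 9,331,200,000
training_set = fleet_day * days                     # 2.79936e11
```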

That’s nine billion three hundred thirty-one million two hundred thousand data points per day for a typical city distributing renewable energy at scale, or a large factory with robots and logistics.

Why so much Big Data so fast?

The basic measurement time interval is a 10/12-cycle window (~200 ms at 50/60 Hz) and a 150/180-cycle window (~3 s at 50/60 Hz), used to hunt dips, swells and interruptions in industrial processes.
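The window lengths follow directly from cycle count divided by mains frequency (these 10/12- and 150/180-cycle intervals match the aggregation style of IEC 61000-4-30, noted here as context):

```python
# Measurement window = cycles / mains frequency.
def window_s(cycles, mains_hz):
    return cycles / mains_hz

short_50, short_60 = window_s(10, 50), window_s(12, 60)    # ~200 ms each
long_50, long_60 = window_s(150, 50), window_s(180, 60)    # ~3 s each
```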

One industrial data training set is 27,993,600,000,000,000 data points, provided it is ultra-clean data (30 days instead of 9 months). Then throttle back to roughly 28 million points per month to retain the use cases' low latency, but at 1/10,000 the insertion cost.

This is how AI makes money for industrial data.

AI Optimisation Flow:

Imagine a few rogue double values, missing timestamps, or zeros with no variable hitting your cluster at that rate. Toxic data. Curating that much data (cloud-side, at scale) can cost millions of dollars unnecessarily.

Step 1

Ensure high-speed data sources are non-toxic.

Step 2

9 billion data points a day are only needed while the AI is being trained, which, if the data is clean, takes just a few weeks, not years.

Step 3

Once the AI is ready, throttle data rates back over the air via firmware update, four orders of magnitude (10⁴) lower, and let the AI do all the heavy lifting.
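Throttling by four orders of magnitude takes the fleet from the training-phase rate back to roughly the 28 million points per month mentioned earlier:

```python
# Post-training throttle: 10^4 down from the training-phase fleet rate
# (numbers from the article's earlier arithmetic).
fleet_per_day = 9_331_200_000
throttle = 10_000

steady_per_day = fleet_per_day // throttle   # 933,120
steady_per_month = steady_per_day * 30       # ~28 million
```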

Step 4

After field verification, the AI neural net is partitioned: half is exported to the federated lossless section at the edge, in SRAM, for detecting individual asset anomalies (see below), and the other half serves as collective AI for the devices in each cluster.

M4 and M7 cores on the left, SRAM AI section on the right. Not shown: the FPGA section (about the same size)

Below are terminal dumps from the industrial ARM cores showing the raw data structures needed before the data reaches GCP clusters.

For more info on GCP architectures, see box 3 in the Google Cloud Blog. I used a J-Trace Pro for firmware diagnostics before deployment.

In the next article we will look at how the GCP AI training is deployed and the resulting 10⁴ reduction in data rate while maintaining the data value.

ARM M7 terminal time block 1 to DataFlow_write

(14:03:03.913) (7:11.699) D: Network device is connected
(14:03:03.913) (7:11.699) D: <LF><LF>serverManager: new report at 1704805383301
(14:03:03.913) (7:11.699) D: getFilesFromDatabase: 43 added since last get (linked list)
(14:03:03.913) (7:11.699) D: Stream 28 report 10 items, ts: 1704805354783, tick: 1000000
(14:03:03.913) (7:11.699) D: Stream 34 report 2 items, ts: 1704805363068, duration: 1024
(14:03:03.913) (7:11.699) D: Stream 7 report 10 items, ts: 1704805355305, tick: 1000000
(14:03:03.913) (7:11.699) D: Stream 8 report 10 items, ts: 1704805355305, tick: 1000000
(14:03:03.913) (7:11.699) D: Stream 10 report 10 items, ts: 1704805355305, tick: 1000000
(14:03:03.913) (7:11.699) D: Stream 11 report 10 items, ts: 1704805355305, tick: 1000000
(14:03:03.913) (7:11.699) D: Stream 12 report 10 items, ts: 1704805355305, tick: 1000000
(14:03:03.913) (7:11.699) D: Stream 30 report 2 items, ts: 1704805363568, tick: 1000000
(14:03:03.913) (7:11.699) D: Stream 34 report 2 items, ts: 1704805364092, duration: 1024
(14:03:03.913) (7:11.699) D: Stream 34 report 2 items, ts: 1704805365116, duration: 1024
(14:03:03.913) (7:11.699) D: Stream 30 report 2 items, ts: 1704805365567, tick: 1000000
(14:03:03.913) (7:11.699) D: Stream 34 report 2 items, ts: 1704805366140, duration: 1024
(14:03:03.913) (7:11.699) D: Stream 34 report 2 items, ts: 1704805367164, duration: 1024
(14:03:03.913) (7:11.699) D: Stream 30 report 2 items, ts: 1704805367565, tick: 1000000
(14:03:03.913) (7:11.699) D: Stream 34 report 2 items, ts: 1704805368188, duration: 1024
(14:03:03.913) (7:11.699) D: Stream 34 report 2 items, ts: 1704805369212, duration: 1024
(14:03:03.913) (7:11.699) D: Stream 30 report 2 items, ts: 1704805369564, tick: 1000000
(14:03:03.913) (7:11.699) D: Stream 34 report 2 items, ts: 1704805370236, duration: 1024
(14:03:03.913) (7:11.699) D: Stream 34 report 2 items, ts: 1704805371260, duration: 1024
(14:03:03.913) (7:11.699) D: Stream 30 report 2 items, ts: 1704805371562, tick: 1000000
(14:03:03.913) (7:11.699) D: Stream 34 report 2 items, ts: 1704805372284, duration: 1024
(14:03:03.913) (7:11.699) D: Stream 28 report 10 items, ts: 1704805364783, tick: 1000000
(14:03:03.913) (7:11.699) D: Stream 7 report 10 items, ts: 1704805365305, tick: 1000000
(14:03:03.913) (7:11.699) D: Stream 8 report 10 items, ts: 1704805365305, tick: 1000000
(14:03:03.913) (7:11.699) D: Stream 10 report 10 items, ts: 1704805365305, tick: 1000000
(14:03:03.913) (7:11.699) D: Stream 11 report 10 items, ts: 1704805365305, tick: 1000000
(14:03:03.913) (7:11.699) D: Stream 12 report 10 items, ts: 1704805365305, tick: 1000000
(14:03:03.913) (7:11.699) D: Stream 34 report 2 items, ts: 1704805373308, duration: 1024
(14:03:03.913) (7:11.699) D: Stream 30 report 2 items, ts: 1704805373562, tick: 1000000
(14:03:03.913) (7:11.699) D: Stream 34 report 2 items, ts: 1704805374332, duration: 1024
(14:03:03.913) (7:11.699) D: Stream 34 report 2 items, ts: 1704805375356, duration: 1024
(14:03:03.913) (7:11.699) D: Stream 30 report 2 items, ts: 1704805375561, tick: 1000000
(14:03:03.913) (7:11.699) D: Stream 34 report 2 items, ts: 1704805376380, duration: 1024
(14:03:03.913) (7:11.699) D: Stream 34 report 2 items, ts: 1704805377404, duration: 1024
(14:03:03.913) (7:11.699) D: Stream 30 report 2 items, ts: 1704805377560, tick: 1000000
(14:03:03.913) (7:11.699) D: Stream 34 report 2 items, ts: 1704805378428, duration: 1024
(14:03:03.913) (7:11.699) D: Stream 34 report 2 items, ts: 1704805379452, duration: 1024
(14:03:03.913) (7:11.699) D: Stream 30 report 2 items, ts: 1704805379577, tick: 1000000
(14:03:03.913) (7:11.699) D: Stream 34 report 2 items, ts: 1704805380476, duration: 1024
(14:03:03.913) (7:11.699) D: Stream 34 report 2 items, ts: 1704805381500, duration: 1024
(14:03:03.913) (7:11.699) D: Stream 30 report 2 items, ts: 1704805381575, tick: 1000000
(14:03:03.913) (7:11.699) D: getFilesFromDatabase: process 4124 bytes in 43 entries in 7 ms
(14:03:03.913) (7:11.699) D: GET_NET_INFO
(14:03:03.913) (7:11.699) D: Network info:
(14:03:03.913) (7:11.699) Type = 3
(14:03:03.913) (7:11.699) Name =
(14:03:03.913) (7:11.699) rssi = -113.50000
(14:03:03.913) (7:11.699) qual
(14:03:04.054) (7:11.840) = -8.00000
(14:03:04.054) (7:11.840) indc = 1
(14:03:04.054) (7:11.840) D: createReport successfully encoded 5434 bytes; maxHeap=29760, maxMessage=5562

ARM M7 terminal time block 2 to DataFlow_write

(14:03:23.920) (7:31.706) D: Stream 34 report  2 items, ts: 1704805382524, duration: 1024       
(14:03:23.920) (7:31.706) D: Stream 28 report 10 items, ts: 1704805374783, tick: 1000000
(14:03:23.920) (7:31.706) D: Stream 7 report 10 items, ts: 1704805375305, tick: 1000000
(14:03:23.920) (7:31.706) D: Stream 8 report 10 items, ts: 1704805375305, tick: 1000000
(14:03:23.920) (7:31.706) D: Stream 10 report 10 items, ts: 1704805375305, tick: 1000000
(14:03:23.920) (7:31.706) D: Stream 11 report 10 items, ts: 1704805375305, tick: 1000000
(14:03:23.920) (7:31.706) D: Stream 12 report 10 items, ts: 1704805375305, tick: 1000000
(14:03:23.920) (7:31.706) D: Stream 34 report 2 items, ts: 1704805383549, duration: 1024
(14:03:23.920) (7:31.706) D: Stream 30 report 2 items, ts: 1704805383576, tick: 1000000
(14:03:23.920) (7:31.706) D: Stream 34 report 2 items, ts: 1704805384573, duration: 1024
(14:03:23.920) (7:31.706) D: Stream 30 report 2 items, ts: 1704805385572, tick: 1000000
(14:03:23.920) (7:31.706) D: Stream 34 report 2 items, ts: 1704805385597, duration: 1024
(14:03:23.920) (7:31.706) D: Stream 34 report 2 items, ts: 1704805386621, duration: 1024
(14:03:23.920) (7:31.706) D: Stream 30 report 2 items, ts: 1704805387572, tick: 1000000
(14:03:23.920) (7:31.706) D: Stream 34 report 2 items, ts: 1704805387645, duration: 1024
(14:03:23.920) (7:31.706) D: Stream 34 report 2 items, ts: 1704805388669, duration: 1024
(14:03:23.920) (7:31.706) D: Stream 30 report 2 items, ts: 1704805389570, tick: 1000000
(14:03:23.920) (7:31.706) D: Stream 34 report 2 items, ts: 1704805389693, duration: 1024
(14:03:23.920) (7:31.706) D: Stream 34 report 2 items, ts: 1704805390717, duration: 1024
(14:03:23.920) (7:31.706) D: Stream 30 report 2 items, ts: 1704805391569, tick: 1000000
(14:03:23.920) (7:31.706) D: Stream 34 report 2 items, ts: 1704805391741, duration: 1024
(14:03:23.920) (7:31.706) D: Stream 28 report 10 items, ts: 1704805384783, tick: 1000000
(14:03:23.920) (7:31.706) D: Stream 34 report 2 items, ts: 1704805392765, duration: 1024
(14:03:23.920) (7:31.706) D: Stream 7 report 10 items, ts: 1704805385305, tick: 1000000
(14:03:23.920) (7:31.706) D: Stream 8 report 10 items, ts: 1704805385305, tick: 1000000
(14:03:23.920) (7:31.706) D: Stream 10 report 10 items, ts: 1704805385305, tick: 1000000
(14:03:23.920) (7:31.706) D: Stream 11 report 10 items, ts: 1704805385305, tick: 1000000
(14:03:23.920) (7:31.706) D: Stream 12 report 10 items, ts: 1704805385305, tick: 1000000
(14:03:23.920) (7:31.706) D: Stream 30 report 2 items, ts: 1704805393567, tick: 1000000
(14:03:23.920) (7:31.706) D: Stream 34 report 2 items, ts: 1704805393789, duration: 1024
(14:03:23.920) (7:31.706) D: Stream 34 report 2 items, ts: 1704805394813, duration: 1024
(14:03:23.920) (7:31.706) D: Stream 30 report 2 items, ts: 1704805395568, tick: 1000000
(14:03:23.920) (7:31.706) D: Stream 34 report 2 items, ts: 1704805395837, duration: 1024
(14:03:23.920) (7:31.706) D: Stream 34 report 2 items, ts: 1704805396861, duration: 1024
(14:03:23.920) (7:31.706) D: Stream 30 report 2 items, ts: 1704805397564, tick: 1000000
(14:03:23.920) (7:31.706) D: Stream 34 report 2 items, ts: 1704805397885, duration: 1024
(14:03:23.920) (7:31.706) D: Stream 34 report 2 items, ts: 1704805398909, duration: 1024
(14:03:23.920) (7:31.706) D: Stream 30 report 2 items, ts: 1704805399563, tick: 1000000
(14:03:23.920) (7:31.706) D: Stream 34 report 2 items, ts: 1704805399933, duration: 1024
(14:03:23.920) (7:31.706) D: Stream 34 report 2 items, ts: 1704805400957, duration: 1024
(14:03:23.920) (7:31.706) D: Stream 30 report 2 items, ts: 1704805401562, tick: 1000000
(14:03:23.920) (7:31.706) D: Stream 34 report 2 items, ts: 1704805401981, duration: 1024
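The report lines in these dumps can be parsed into structured records with a small regex (written against the log format shown here, not part of the firmware):

```python
import re

# Matches e.g. "Stream 34 report 2 items, ts: 1704805363068, duration: 1024"
LINE = re.compile(
    r"Stream (?P<stream>\d+) report\s+(?P<items>\d+) items, "
    r"ts: (?P<ts>\d+), (?P<kind>tick|duration): (?P<period>\d+)"
)

def parse(line):
    """Return a dict of the stream report fields, or None if no match."""
    m = LINE.search(line)
    if not m:
        return None
    # Numeric fields become ints; 'kind' stays a string.
    return {k: int(v) if v.isdigit() else v for k, v in m.groupdict().items()}

rec = parse("(14:03:03.913) (7:11.699) D: Stream 34 report 2 items, "
            "ts: 1704805363068, duration: 1024")
```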

ARM M4 Terminal read in from FPGA multiplexer (sensor_read)

(13:26:42.650) (5:37.490) D: Modbus block , ts: 1704803202177 (35)
(13:26:42.917) (5:37.757) D: Watchdog fed (15)
(13:26:43.361) (5:38.201) I: Since last read , 1000, 8ms, 964.67, 67.29, 21.35,-9999.00,-9999.00,280.00,-9999.00, 4.00E+02, 3.00E+00,-1.00E+04,-1.00E+04,-1.00E+04,-1.00E+04, 9.33E+00, -1.0000, 0.0000 (176)
(13:26:43.566) (5:38.406) D: buffer[ 6] : reg[ 2999] = 0.000000 (37)
(13:26:43.566) (5:38.406) D: buffer[ 7] : reg[ 3001] = 0.000000 (37)
(13:26:43.566) (5:38.406) D: buffer[ 8] : reg[ 3003] = 0.302053 (37)
(13:26:43.566) (5:38.406) D: buffer[ 9] : reg[ 3005] = 0.302257 (37)
(13:26:43.566) (5:38.406) D: buffer[ 3] : reg[ 3019] = 0.000000 (37)
(13:26:43.566) (5:38.406) D: buffer[ 4] : reg[ 3021] = 0.000000 (37)
(13:26:43.566) (5:38.406) D: buffer[ 5] : reg[ 3023] = 0.000000 (37)
(13:26:43.566) (5:38.406) D: buffer[ 0] : reg[ 3027] = 235.665619 (39)
(13:26:43.566) (5:38.406) D: buffer[ 1] : reg[ 3029] = 235.683350 (39)
(13:26:43.566) (5:38.406) D: buffer[ 2] : reg[ 3031] = 235.674484 (39)
(13:26:43.566) (5:38.406) D: buffer[13] : reg[ 3059] = 0.045604 (37)
(13:26:43.566) (5:38.406) D: buffer[12] : reg[ 3067] = -0.054661 (38)
(13:26:43.566) (5:38.406) D: buffer[14] : reg[ 3075] = 0.071186 (37)
(13:26:43.566) (5:38.406) D: buffer[15] : reg[ 3077] = -nan (33)
(13:26:43.566) (5:38.406) D: buffer[16] : reg[ 3079] = -nan (33)
(13:26:43.566) (5:38.406) D: buffer[17] : reg[ 3081] = 1.359375 (37)
(13:26:43.566) (5:38.406) D: buffer[18] : reg[ 3083] = 1.359375 (37)
(13:26:43.566) (5:38.406) D: buffer[19] : reg[ 3107] = -1.198602 (38)
(13:26:43.566) (5:38.406) D: buffer[10] : reg[ 3109] = 50.002655 (38)
(13:26:43.566) (5:38.406) D: buffer[11] : reg[ 3131] = 20.537350 (38)
(13:26:43.644) (5:38.484) D: buffer[23] : reg[45099] = 0.000000 (37)
(13:26:43.644) (5:38.484) D: buffer[24] : reg[45101] = 0.000000 (37)
(13:26:43.644) (5:38.484) D: buffer[25] : reg[45103] = 113.768578 (39)
(13:26:43.644) (5:38.484) D: buffer[20] : reg[45119] = 3.606540 (37)
(13:26:43.644) (5:38.484) D: buffer[21] : reg[45121] = 3.605746 (37)
(13:26:43.644) (5:38.484) D: buffer[22] : reg[45123] = 3.609017 (37)
(13:26:43.644) (5:38.484) D: Modbus block , ts: 1704803203177 (35)
(13:26:43.644) (5:38.484) D: Modbus report 2 items, ts: 1704803202177, tick: 1000000 (58)
(13:26:44.358) (5:39.198) I: Since last read , 999, 8ms, 964.76, 67.26, 21.36,-9999.00,-9999.00,280.00,-9999.00, 4.00E+02, 3.00E+00,-1.00E+04,-1.00E+04,-1.00E+04,-1.00E+04, 9.22E+00, -1.0000, 0.0000 (176)
(13:26:44.572) (5:39.411) D: buffer[ 6] : reg[ 2999] = 0.000000 (37)
M7 (yellow JTAG) on the left, M4 (blue) on the right, for terminal diagnostics and Dataflow pipelining
My favorite game for Christmas which inspired this article <80)
