An exercise in Discovery, Streaming data in the analytical world. -Part 4

George Leonard
9 min readAug 14, 2024

--

Apache Kafka, Apache Flink, Apache Iceberg on S3, Apache Paimon on HDFS, Apache Hive with internal standalone metastore (DerbyDB), external on PostgreSQL & on HDFS.

(See: Part 3)

(14 August 2024)

Computations/Aggregations:

Well in the end this is what all this is about actually (everything above was scaffolding, but nevertheless important to know and understand). This is where we want to take our raw feeds (salesbaskets & salespayments) and do the magic, analysis/aggregations to derive value/insight into what’s happening on the floor, how is business doing, really.

What follows is an overview end to end of the plan.

Important to know, if you pay attention, you will realize the same output is created using different methods, that was intentional. See how the different options can be used to get to the same end point and what’s involved, how is one easier or not than the other and what comes with the easy or what comes with a little bit more effort.

So now that we have an Overview, below I will first attempt to demonstrate the “kSql high level” version after which I discuss the “Apache Flink” option:

Next up is a more detailed view of what we’re going to be doing using KSQL inside the Apache Kafka cluster to create kStreams and kTables objects.

Aggregations via KSQL & kTable

What I do here is use KSQL to first create a stream object for the salesbaskets and salespayment source from the Kafka topics.

We then use KSQL to create a new KSQL object calls salescompleted, as a join between salesbaskets and salespayment, based on the invoiceNumber column.

The salescompleted stream is then used to create kTable objects, which output (using tumbling windows):

  • Sales per store per terminal per 5 min & hour
  • Sales per store per 5 min & hour

See crekSqlFlows directory for the SQL utilized to create the various kStream and kTable objects.

Example:

Create a stream object from source salesbaskets Kafka topic, same format/serialization as source. This becomes an input table for us.

CREATE STREAM avro_salesbaskets (
InvoiceNumber VARCHAR,
SaleDateTime_Ltz VARCHAR,
SaleTimestamp_Epoc VARCHAR,
TerminalPoint VARCHAR,
Nett DOUBLE,
Vat DOUBLE,
Total DOUBLE,
Store STRUCT<
Id VARCHAR,
Name VARCHAR>,
Clerk STRUCT<
Id VARCHAR,
Name VARCHAR,
Surname VARCHAR>,
BasketItems ARRAY< STRUCT<
id VARCHAR,
Name VARCHAR,
Brand VARCHAR,
Category VARCHAR,
Price DOUBLE,
Quantity integer >>)
WITH (KAFKA_TOPIC='avro_salesbaskets',
VALUE_FORMAT='Avro',
PARTITIONS=1);

Create a stream object from source salespayments Kafka topic. This is our second input table.

CREATE STREAM avro_salespayments (
InvoiceNumber VARCHAR,
FinTransactionId VARCHAR,
PayDateTime_Ltz VARCHAR,
PayTimestamp_Epoc VARCHAR,
Paid DOUBLE )
WITH (
KAFKA_TOPIC='avro_salespayments',
VALUE_FORMAT='Avro',
PARTITIONS=1);

Now let’s create our salescompleted stream, this will hold a joined output document constructed from the previous 2 streams… We demostrate a similar join/output later using Flink SQL.

CREATE STREAM avro_salescompleted WITH (
KAFKA_TOPIC='avro_salescompleted',
VALUE_FORMAT='Avro',
PARTITIONS=1)
as
select
b.InvoiceNumber,
as_value(p.InvoiceNumber) as InvNumber,
b.SaleDateTime_Ltz,
b.SaleTimestamp_Epoc,
b.TerminalPoint,
b.Nett,
b.Vat,
b.Total,
b.store,
b.clerk,
b.BasketItems,
p.PayDateTime_Ltz,
p.PayTimestamp_Epoc,
p.Paid,
p.FinTransactionId
from
avro_salespayments p INNER JOIN
avro_salesbaskets b
WITHIN 7 DAYS
on b.InvoiceNumber = p.InvoiceNumber
emit changes;

With the above created we can now do an aggregation, the below creates a output kTable, with a tumbling window over 5 minutes.

CREATE TABLE avro_sales_per_store_per_5min WITH (
KAFKA_TOPIC='avro_sales_per_store_per_5min',
VALUE_FORMAT='AVRO',
PARTITIONS=1)
as
SELECT
store->id as store_id,
as_value(store->id) as storeid,
from_unixtime(WINDOWSTART) as Window_Start,
from_unixtime(WINDOWEND) as Window_End,
count(1) as sales_per_store
FROM avro_salescompleted
WINDOW TUMBLING (SIZE 5 MINUTE)
GROUP BY store->id
EMIT FINAL;

Aggregations via Apache Flink:

In this scenario we use Apache Flink to mirror some of what was done above using kSql previously, but this time using Apache Flink SQL and some additional magic.

In the Apache Flink case we do things in 2 steps (I just found it easier), first we create a Apache Flink table (I like to think of this as a virtual table as nothing is actually persisted in the table, further, because the table itself actually only points to the salesbaskets and salespayments Kafka topic’s as sources), we then execute a insert statement into the table selecting from source/s.

When interacting with the Apache Flink table it engages a Kafka consumer via the configured source topic specified.

When the virtual table is defined a connector parameter is configured which is either “upsert-kafka” or “kafka”. The “kafka” connector works perfectly for sourcing data or inserting/appending data/records to the back’ing Kafka topic. Pretty much how Kafka works as an immutable log.

The upsert-kafka, is however useful when consuming data from a topic, updating the Apache Flink table/record. As such upsert-kafka requires a primary key to be defined to enable it to find the record being manipulated.

See: for more on the subject.

Once the 2 source virtual tables are created, we create our 3rd table, this time it’s an output, called ‘salescompleted’. An Insert/join statement is then executed that join the 2 input tables: salesbasket and salespayments on the invoiceNumber column. By executing this insert statement data is published onto the ‘salescompleted’ topic hosted on the Kafka cluster. Now the fun begins.

The above Apache Flink SQL run on the Apache Flink cluster. If executed as per above, you will see a shorted version of you command as the description. To make it more descriptive see the SET syntax, i.e.:

SET 'pipeline.name' = 'Sales Basket[Source/Target] - Kafka[Topic/Table ]' ;

Using the above will assign the value in the quotes as the description in Running Jobs view.

From here we do aggregations, first up was sales per store per terminal per 5 min. Again, we create an output table followed by the required insert statement. That was the simple / easy one… Next up we want to compute:

  • sales per store per product (name key) per time window (hour or minutes or xxx),
  • sales per store per brand per time window
  • sales per store per category per time window
  • sales per store per terminal per time window

What I have not mentioned previously, if you look at the basketItems array of objects, you will realize it’s a nested data set (complex structure). If we want to execute aggregations on the objects in the array, we will first need to unnest the array into a flat structure.

This is done by creating a table that is flat, for which each record will be inserted into the output table for each object from the basketItems arrays, associated with the original salesbasket invoiceNumber.

unnested_sales document

  {
"invoiceNumber": "1341243123341232",
"saleDateTime_Ltz": "2023-12-23T16:53:39.911+02:00",
"saleTimestamp_Epoc": "1718117619911",
"storeid" : "1033",
"product":"Minty Frsh",
"brand": "Colgate",
"salesvalue":12412.00,
"category": "Healthcare"
}

This table can then be used as a source for the required select statements with required group by clauses based on time of sales. To improvement performance, we include a filter to run against recent data only.

Consider the difference in output that emit change vs emit final has.

  • A “emit changes” outputs data, new value for the aggregations as it arrives, in this case into the salescompleted table followed by the unnested_sales table.
  • A “emit final” outputs data at the end of the window tumble period.

Because we have a unnested structure we now have a record that can also happily, easily be sink’d into a “old style” RDBMS (rows and columns), even though our old-style RDBMS database engines themselves are extending their capabilities to include storage of JSON structured records as a field/column type.

Data serialization format’s

First there was Protobuf and then Avro came, lets not forget the JSON and then serialized/schema’s JSON known as JSOND.

What is it and why…

  • Well, what’s transferred across the wire by Kakfa, and stored inside the topic/partitions/data files is a byte representation of what you published…
  • But working with RAW bytes are no fun, so we Serialise the text into a format (above…), transport it across Kafka and then Deserialise the other end.
  • Associated with the Serialisation/de-serialisation commonly known as Serdes is the schema registry. A full set of articles, but at the simplest, the old adage of Garbage in Garbage out applies, schema registry is your contract, making sure what’s publish abide by what was agreed on, which results in trust, quality, value out.

Avro Schema

Lesson: A word of warning: Case Sensitivity between Apache Flink SQL’s: Create table <>, Select <> From <> and Kafka schema registry entries matters.

Create your pipeline, bit by bit, processing using small pieces of work, making sure each step does what you want, before moving onto the next, at least initially, once they work you can consolidate them. The old saying of, how do you an elephant, one byte at a time applies.

Data Lineage

Below is a Diagram depicting how the original 2 topics are “grown” moved around, joined and pushed to the various storage options via the different sections of this project. There will be a more section specific diagram with each.

Examples:

The below builds a table avro_salescompleted as a Flink Table, backed/sourced from the Kafka topic/kSql created table via a Stream join.

CREATE TABLE avro_salescompleted (
INVNUMBER STRING,
SALEDATETIME_LTZ STRING,
SALETIMESTAMP_EPOC STRING,
TERMINALPOINT STRING,
NETT DOUBLE,
VAT DOUBLE,
TOTAL DOUBLE,
STORE row<ID STRING, NAME STRING>,
CLERK row<ID STRING, NAME STRING, SURNAME STRING>,
BASKETITEMS array<row<ID STRING, NAME STRING, BRAND STRING, CATEGORY STRING, PRICE DOUBLE, QUANTITY INT>>,
FINTRANSACTIONID STRING,
PAYDATETIME_LTZ STRING,
PAYTIMESTAMP_EPOC STRING,
PAID DOUBLE,
SALESTIMESTAMP_WM AS TO_TIMESTAMP(FROM_UNIXTIME(CAST(SALETIMESTAMP_EPOC AS BIGINT) / 1000)),
WATERMARK FOR SALESTIMESTAMP_WM AS SALESTIMESTAMP_WM
) WITH (
'connector' = 'kafka',
'topic' = 'avro_salescompleted',
'properties.bootstrap.servers' = 'broker:29092',
'scan.startup.mode' = 'earliest-offset',
'properties.group.id' = 'testGroup',
'value.format' = 'avro-confluent',
'value.avro-confluent.schema-registry.url' = 'http://schema-registry:8081',
'value.fields-include' = 'ALL'
);

We now going to use the above as a source, where we going to output the group by from this into the below table, backed by topic which we will sink to MongoDB via connector.

CREATE TABLE avro_sales_per_store_per_terminal_per_5min (
store_id STRING,
terminalpoint STRING,
window_start TIMESTAMP(3),
window_end TIMESTAMP(3),
salesperterminal BIGINT,
totalperterminal DOUBLE,
PRIMARY KEY (store_id, terminalpoint, window_start, window_end) NOT ENFORCED
) WITH (
'connector' = 'upsert-kafka',
'topic' = 'avro_sales_per_store_per_terminal_per_5min',
'properties.bootstrap.servers' = 'broker:29092',
'key.format' = 'avro-confluent',
'key.avro-confluent.url' = 'http://schema-registry:8081',
'value.format' = 'avro-confluent',
'value.avro-confluent.url' = 'http://schema-registry:8081',
'value.fields-include' = 'ALL'
);
Insert into avro_sales_per_store_per_terminal_per_5min
SELECT
`STORE`.`ID` as STORE_ID,
TERMINALPOINT,
window_start,
window_end,
COUNT(*) as salesperterminal,
SUM(TOTAL) as totalperterminal
FROM TABLE(
TUMBLE(TABLE avro_salescompleted, DESCRIPTOR(SALESTIMESTAMP_WM), INTERVAL '5' MINUTES))
GROUP BY `STORE`.`ID`, TERMINALPOINT, window_start, window_end;

Aggregations via MongoDB Change Stream Processing

  • Sales per store per terminal per hour / per day
  • Sales per store per product per hour / per day
  • Sales per store per brand per hour / per day
  • Sales per store per category per hour / per day

Each of the above include a count and a monetary value.

What to do with the results:

Dashboards/Charts in MongoDB

The thinking is to output the aggregated values from the MongoDB Change Streams and sink the values back into collections and then build dashboards using MongoDB Charts, all nicely inside the MongoDB Atlas platform.

The big very obvious bit here is, it’s very simple, it all local inside the MongoDB eco system, it’s all build on the very well know MongoDB aggregation framework.

Edit: (15 Aug) Removed “unnested_sales” collection as it’s not required in MongoDB Atlas for computations/aggregations on nested objects/arrays in a document. Above is the before, below is the after edit.

Apache Flink Catalog

See catalogs-in-flink-sql-a-primer & catalogs-in-flink-sql-hands-on by Robbin Moffatt. You will notice in his Decodable GIT repo he also shows how to do a catalog using Hive and how to use PostgreSQL and JDBC driver.

See my GIT repo for the entire document and code/article.

About Me

I’m a techie, a technologist, always curious, love data, have for as long as I can remember always worked with data in one form or the other, Database admin, Database product lead, data platforms architect, infrastructure architect hosting databases, backing it up, optimizing performance, accessing it. Data data data… it makes the world go round.

In recent years, pivoted into a more generic Technology Architect role, capable of full stack architecture.

George Leonard

georgelza@gmail.com

--

--

George Leonard

I'm a techie, a technologist, technology architect, full stack architect, Always curious, Love data and data platforms.