Real time stock con Confluent Kafka

Giulio Scotti
Quantyca
Published in
11 min readJul 23, 2019

Come usare Kafka per calcoli in real-time

Photo by Chris Liverani on Unsplash

Ciao a tutti, dopo aver pubblicato il mio primo post “Progettare un datawarehouse moderno”, in cui ho affrontato le principali tematiche di design di un datawarehouse al giorno d’oggi, in questo secondo articolo vorrei presentare un caso d’uso reale di Kafka e della Confluent Platform che si è presentato nella mia esperienza progettuale in Quantyca — Data At Core.

Un’applicazione che, per sua natura, si presta ad essere implementata per mezzo delle tecnologie basate su Kafka e sulla Confluent Platform è il calcolo dello stock in real-time. Si supponga, infatti, di dover processare le vendite di una catena di negozi, effettuabili sia da canale diretto (in store) sia online, e di voler rendere disponibile a diversi sistemi consumatori in tempo reale l’informazione sull'esatta giacenza di ogni prodotto presente in ciascun negozio della catena.

Per una buona comprensione dell’articolo, è consigliabile avere qualche conoscenza di base di Kafka, Zookeeper e degli altri tool disponibili nella Confluent Platform:

  • Kafka Connect
  • Schema Registry (e qualche infarinatura di Avro come protocollo di serializzazione)
  • KSQL

Descrizione del problema: overview

Nel caso reale che ci è capitato di affrontare, si partiva da una situazione as-is in cui lo stock veniva calcolato da flussi batch a frequenza giornaliera, in fascia notturna. In particolare, i contributi alle variazioni di giacenza erano determinati dai movimenti di magazzino, recepiti dal sistema in tempo reale, e dalle vendite in negozio, elaborate massivamente in modalità batch al termine della giornata di vendita.

Nel momento in cui la società decide di avviare il canale di vendita online, uno scenario simile non è più adeguato: il sito e-commerce, per poter accettare o rifiutare gli ordini ed evitare che l’utente ordini dei prodotti che nel frattempo si siano esauriti, ha bisogno di conoscere lo stato della giacenza dei prodotti nei negozi in tempo reale, non lo snapshot aggiornato al giorno precedente.

Inoltre, nella situazione as-is, le informazioni sullo stock venivano utilizzate solamente da un unico sistema consumatore, che rappresentava l’owner dei dati di stock. Nello scenario to-be, lo stock sarà calcolato in real time dai flussi e sarà fruito presumibilmente da diversi sistemi, che ne potranno far uso a proprio piacimento: uno di questi sarà appunto il sito web di e-commerce.

Tuttavia, sul sistema che che attualmente è l’owner dello stock sono basate delle logiche legacy che sarebbe troppo oneroso sostituire o riprogettare in toto, per via degli impatti su diversi altri sistemi del cliente che svolgono parti critiche dei processi di business. Per tali ragioni, è richiesto di garantire ancora l’alimentazione di questo sistema con i dati dei movimenti di magazzino, delle vendite fisiche e degli ordini e-commerce (contributo nuovo non presente nell’as-is), che il sistema continuerà a processare una volta al giorno per costruirsi la sua versione master dello stock. Per l’importanza che ricopre questo sistema dentro il panorama della società, è preferibile che questo rimanga l’owner dello stock, anche se in tutti gli istanti della giornata, eccetto appena dopo le sue elaborazioni massive, lo stato dello stock che contiene non sarà aggiornato in tempo reale. Per garantire che le due versioni dello stock (quella master e quella calcolata in real time) non divergano, si vuole prevedere la possibilità di riallineare il flusso dello stock real time con l’informazione del sistema owner a determinati checkpoint periodici, appena dopo che l’owner ha terminato le sue elaborazioni massive (negli unici istanti della giornata in cui la versione master dello stock risulta up-to-date).

Descrizione del problema: dettagli

E’ disponibile su un database Oracle una tabella SOURCE_MOVEMENTS contenente le informazioni dei movimenti di magazzino inserite in tempo reale, tra cui il codice del prodotto, il codice del negozio, il momento in cui il movimento (di carico o scarico dal magazzino) è avvenuto e la quantità movimentata, che determina una variazione di stock, positiva nel caso di un rifornimento, negativa nel caso di un trasferimento merce verso un altro negozio.

Si prevede, inoltre, di avere una tabella STOCK_TABLE_MASTER, dove viene inserito lo snapshot “master” dello stock calcolato dall’owner durante la sua elaborazione batch giornaliera. Appena reso disponibile, questo snapshot verrà considerato come nuovo stato della giacenza a t0 da cui far ripartire il flusso di calcolo dello stock in tempo reale, secondo la logica di riallineamento descritta in precedenza.

Il sistema owner è un database Oracle.

Gli obiettivi che ci si pone sono i seguenti:

  • Calcolare lo stock in real-time e rendere disponibile lo stato aggiornato nella tabella STOCK_TABLE su database Oracle;
  • Depositare su due tabelle dedicate (rispettivamente DEST_SALES e DEST_ORDERS) su Oracle i dati delle vendite retail e degli ordini e-commerce, per alimentare il sistema owner e permettere al sistema di continuare ad eseguire le sue elaborazioni batch
  • Implementare la procedura di riallineamento notturno del flusso di stock real-time con la versione “master” contenuta nella tabella STOCK_TABLE_MASTER presente su Oracle

Proposta di soluzione: overview

La soluzione che si prevede di adottare ha come elemento centrale il data bus, che fornisce una vera e propria streaming platform per l’implementazione dei flussi real time di stream processing.

Come primo step è necessaria l’importazione dei messaggi di ciascuno dei contributi che generano una variazione di stock dai rispettivi sistemi sorgente. Ogni contributo sarà importato, ad un primo livello, in una coda dedicata sul data bus.

Il secondo step richiede di convogliare gli stream dei diversi contributi in un unico stream che rappresenti l’insieme delle variazioni di stock: per far questo è necessario trasformare gli schemi dei messaggi sorgente dei singoli contributi in uno schema comune per l’unica coda contenente le variazioni di stock.

Dalla coda delle variazioni di stock sarà possibile prelevare le informazioni rilevanti dei singoli contributi, opportunamente filtrati, per esportarli nel database esterno che rimarrà l’owner dello stock.

La coda unica di delta stock così ricavata sarà la base per il calcolo dello snapshot corrente dello stock, che è calcolabile sommando in modo continuo le variazioni di giacenza che si osservano nella coda, raggruppando per ogni coppia <prodotto, negozio>.

I risultati in tempo reale del calcolo saranno depositati anch'essi su una coda dedicata, da cui potranno essere esportati verso il database esterno per garantire maggior facilità di consultazione.

L’immagine seguente illustra un diagramma dei flussi che si intende realizzare per arrivare alla soluzione desiderata.

Diagramma dei flussi della soluzione proposta

Vista la panoramica ad alto livello, approfondiamo meglio i dettagli.

E’ necessario importare sul data bus tramite flussi real-time i dati dei tre contributi che andranno a creare variazioni di stock:

  • movimenti di magazzino: estratti da database Oracle (tabella SOURCE_MOVEMENTS_TABLE) tramite JDBC Source Connector, che fa polling incrementale sulla tabella per ricevere i record nuovi o modificati e li produce come messaggi su un topic Kafka dedicato, in formato Avro;
  • vendite da negozio: prodotte da applicativi a monte su un topic Kafka dedicato in formato JSON
  • ordini online: prodotti da applicazioni terze su un topic Kafka dedicato in formato JSON

Avremo quindi a disposizione i tre contributi su tre topic e sarà necessario effettuare delle conversioni tramite il framework di stream processing KSQL per convertire i dati delle vendite e degli ordini (che sono prodotti in JSON) in formato Avro, che è il formato standard con cui vogliamo serializzare i messaggi da questo punto in avanti della pipe di processing. Si sceglie di usare KSQL come framework di stream processing perché permette di definire vere e proprie applicazioni streaming sotto forma di continuous queries definite in un linguaggio SQL-like, che fornisce un livello di astrazione superiore rispetto all'utilizzo delle API Java offerte dalla libreria Kafka Streams: KSQL dà la possibilità di svilippare applicazioni streaming senza richiedere una conoscenza profonda di Java e delle tecniche di sviluppo software.

La scelta di gestire il formato in Avro si coniuga bene con l’utilizzo dello Schema Registry come repository centrale di memorizzazione e versionamento degli schemi, che ne permette l’evoluzione controllata applicando dei controlli di compatibilità ogni qual volta un’applicazione producer o consumer chieda di registrare e utilizzare una nuova versione di un certo schema dati. L’introduzione dello Schema Registry garantisce maggior robustezza all'architettura, evitando la rottura delle applicazioni dovuta a incompatibilità tra lo schema usato per produrre i messaggi e quello usato per consumarli. Inoltre, salvando lo schema dei messaggi nello Schema Registry, questo può essere omesso dal body dei messaggi scritti sui topic Kafka, nei quali sarà inserito solamente un riferimento (id univoco) allo schema nello Schema Registry.

In aggiunta ai tre contributi ordinari, dobbiamo considerare anche un quarto contributo straordinario, che è rappresentato dal JDBC Source Connector utilizzato per importare lo snapshot master dello stock in seguito al riallineamento del sistema owner, in modalità bulk.

Per costruire le applicazioni di stream processing che ci servono, dobbiamo definire degli oggetti stream sopra ai topic corrispondenti ai quattro contributi. Uno stream è una delle possibili astrazioni con cui si può modellare il contenuto di un topic Kafka: è una struttura gestita in append, in cui ogni nuovo messaggio che viene prodotto nel topic viene appeso allo stream di eventi. Definiamo gli stream tramite KSQL.

A questo punto dobbiamo unire i quattro stream dei diversi contributi in un unico stream che rappresenti la UNION degli stream sorgente, per far convogliare in un unico flusso tutte le variazioni subite dalla giacenza di stock. Chiamiamo questo stream DELTA_STOCK: definiamo la logica che lo alimenta con quattro statement KSQL.

Nella logica di UNION nello stream DELTA_STOCK, ogni messaggio viene marcato con un campo che tiene traccia del tipo di contributo sorgente (indica se si tratta di un movimento, di una vendita o di un ordine). Questo è utile per poter passare al database Oracle, che è l’owner dello stock, i dati delle vendite e degli ordini, per permettere la sua elaborazione notturna. Ordini e vendite vengono filtrati dal DELTA_STOCK e inseriti in due tabelle dedicate in Oracle in modalità upsert tramite JDBC Sink Connector.

Sullo stream di DELTA_STOCK si basa il calcolo dello snapshot corrente della giacenza (real time stock): il calcolo viene fatto tramite aggregazione sulla coppia <PRODUCT_COD, STORE_COD>, applicando una somma continua delle variazioni di quantità (i delta). Si tratta di un’applicazione di stream processing stateful, creata tramite KSQL: ogni nuovo messaggio in ciascun contributo che genera una variazione di giacenza viene prodotto nello stream DELTA_STOCK e innesca l’aggiornamento della tabella di stock corrente.

Infatti, il risultato dell’aggregazione su uno stream da luogo ad una tabella, che è l’altro tipo di astrazione che si può costruire nelle applicazioni di stream processing: una tabella, così come uno stream, è persistita in un topic Kafka. A differenza di uno stream, una tabella è gestita in upsert e tiene solamente l’ultimo valore inserito nel topic per ogni chiave. Di fatto, mentre uno stream racconta la storia degli eventi accaduti, la tabella contiene lo snapshot dello stato corrente.

I consumatori interessati a fruire della fotografia dello stock aggiornato in tempo reale si possono sottoscrivere al topic che persiste la tabella dello stock: in questo modo ricevono in tempo reale gli aggiornamenti della giacenza di ogni prodotto in ogni negozio, ogni qual volta avviene un evento che ne provochi una variazione.

Come accennato in precedenza, per garantire una consultazione più agevole tramite un qualsiasi SQL client, si vogliono esportare in tempo reale gli aggiornamenti della giacenza prodotti sul topic nel database Oracle: a questo scopo si avvia un JDBC Sink Connector che consuma i dati del topic e li scrive in una tabella relazionale, in modalità upsert. Questa modalità permette di mantenere il numero di righe della tabella relazionale limitato ed evita la duplicazione delle chiavi su cui è calcolato lo stock: un’alimentazione che lavora in upsert ha però un costo maggiore rispetto ad una che opera in append e non è supportata per tutti i DBMS, perché dipende dalla particolare sintassi messa a disposizione dal DBMS per realizzare una logica di update o insert.

Esempio semplificato

Mostriamo ora un esempio di quello che ci aspettiamo di ottenere, considerando una situazione iniziale della giacenza e simulando di effettuare vendite e ordini.

Supponiamo di avere all'istante t0 uno stato di giacenza come il seguente, contenuto nella tabella Oracle STOCK_TABLE_MASTER:

Stato dello stock (master) al 17 Aprile 2019 07:36:40

A questo punto, il 17 Aprile alle ore 08:38 viene effettuata una vendita di 3 unità del prodotto PROD1 nel negozio STORE1. Produciamo questa informazione in formato JSON nel topic SALES_LINES_TOPIC.

curl -X POST -H "Content-Type: application/vnd.kafka.json.v2+json" --data '{"records":[{"key":"STORE1","value":{"STORE_COD":"STORE1", "PRODUCT_COD":"PROD1", "SOLD_QTY":3}}]}' "http://ext_broker:8082/topics/SALES_LINES_TOPIC"

Il giorno dopo alla stessa ora viene effettuato un ordine e-commerce di 4 unità del prodotto PROD2 dal negozio STORE2. Produciamo questa informazione in formato JSON nel topic ORDERS_LINES_TOPIC.

curl -X POST -H "Content-Type: application/vnd.kafka.json.v2+json" --data '{"records":[{"key":"STORE2","value":{"STORE_COD":"STORE2", "PRODUCT_COD":"PROD2", "SOLD_QTY":4}}]}' "http://ext_broker:8082/topics/ORDERS_LINES_TOPIC"

Interrogando la tabella con lo snapshot dello stock corrente su Oracle alle 08:38 del 17 Aprile, si ottiene la situazione di stock seguente:

Stato dello stock (aggiornato in tempo reale) al 18 Aprile 2019 08:38:02

L’elaborazione mostrata avviene in tempo reale. Lo stesso risultato sarà calcolato anche dal sistema legacy in modalità batch durante l’elaborazione nottura. Al termine dell’elaborazione legacy, una procedura di riallineamento forzerà lo snapshot calcolato dal sistema legacy come nuovo stato a t0 della giacenza nel sistema di calcolo in tempo reale, per assicurare di mantenere una consistenza tra i due flussi.

Componenti architetturali

Nell'immagine seguente si può osservare uno schema raffigurante i diversi componenti dell’architettura prevista.

Architettura della soluzione

Un DBMS relazionale (nel caso specifico viene utilizzato Oracle) rappresenta la sorgente dei movimenti di magazzino e della versione master dello stock, ma anche la destinazione delle vendite, degli ordini e dei dati di stock in tempo reale.

Kafka Connect viene utilizzato come framework in cui deployare i connettori di integrazione con il database, sia in input, tramite l’uso di JDBC Source Connector, sia in output, tramite i JDBC Sink Connector.

Per lo sviluppo e il deploy delle logiche di stream processing si fa uso del framework KSQL.

Sia Kafka Connect sia l’engine KSQL comunicano con lo Schema Registry per la registrazione e il recupero degli schemi Avro con cui sono prodotti e consumati i messaggi nei/dai topic.

Demo

Nel repository demo-dai-kafka trovate una demo semplificata con cui verificare la soluzione proposta. Nel file readme trovate le istruzioni per eseguire i comandi step-by-step (sono circa una ventina) e arrivare al risultato. La demo è pensata per essere semplice e user-friendly: vi permette di capire meglio la logica di calcolo dello stock e di prendere confidenza con le API che i vari componenti della Confluent Platform mettono a disposizione per eseguire i comandi senza richiedere installazioni di particolari tool client. Il tutto si basa su un’infrastruttura virtualizzata a container, facile da avviare, per permettervi di concentrarvi sui passaggi logici che eseguite. Enjoy!

Conclusione

In questo articolo si è visto come è possibile utilizzare l’ecosistema Kafka e i tool della Confluent Platform per creare un sistema in grado di calcolare in tempo reale lo stock dei prodotti per una qualunque attività di vendita, comprendendo sia vendite fisiche che ordini online. L’utilizzo di Kafka ha permesso di ottenere una soluzione estremamente affidabile e veloce, passando da quello che era un calcolo massivo ad un flusso real-time streaming, rendendo disponibile ai consumatori lo stato corrente dello stock.

L’applicazione si è limitata ovviamente ad un singolo caso d’uso ma non è difficile immaginare di espandere quanto fatto a possibili altri casi.

Ciascun tool della piattaforma mette a disposizione una serie di utility per potersi interfacciare e per permetterne le attività di amministrazione e di monitoraggio. Oltre alle utility ufficiali della distribuzione, che siano API REST, script di shell o vere e proprie CLI, è reperibile online un ampio panorama di altri tool sviluppati da terzi, per rendere più agevole l’esperienza utente con Kafka.

Vista la vastità di scelta, può risultare non banale selezionare il sottoinsieme di tool che fanno al proprio caso. Ad esempio, per il monitoraggio dei componenti della piattaforma, suggeriamo di utilizzare i seguenti tool:

Il monitoraggio dei componenti e delle applicazioni che utilizzano Kafka è di per se un aspetto di primaria importanza: in Quantyca stiamo approfondendo soluzioni basate sullo stack Elastic, che probabilmente sarà oggetto di un blogpost successivo.

Se ti è piaciuto questo post e sei interessato alle nostre attività, seguici su Linkedin!

--

--