Détection des abandonnistes avec ksqlDB

Julien Guyard
Jun 4, 2020 · 9 min read

Préambule

Dans mon travail d’Engineering Manager à la DSI AXA France, on me demande souvent dans les phases de design “Est-ce que dans ce cas là, je peux faire du streaming?” ou même parfois plus simplement “C’est quoi un stream?

Alors évidemment, vous ne lirez rien dans cet article qui concerne Netflix ou Youtube. C’est tout aussi ludique pourtant, mais on parle bien ici de data streaming. Et puisque chez AXA France, notre plateforme évènementielle est Kafka, on va parler… de Kafka Stream? Presque.

Un premier niveau de réponse pourrait être qu’un stream ce n’est que la représentation logicielle d’un flux de données continu et illimité. Pour faire encore plus simple: c’est un programme qui réagit à de la donnée en mouvement.

Si vous êtes novices dans le développement de stream, cette réponse n’est sûrement pas éclairante.

Et c’est là qu’intervient le sujet des abandonnistes. Kézako?

En marketing, abandonniste désigne un acheteur potentiel qui abandonne/quitte le processus d’achat dans lequel il était engagé avant la fin, et donc avant d’avoir finalisé son achat.

Finalement, quoi de mieux qu’un cas métier concret pour illustrer comment ce pattern de développement peut résoudre un problème qu’il aurait été plus compliqué de traiter avec une approche classique RPC/“batch processing”?

Le besoin métier auquel je vais tenter de répondre avec du streaming est donc: comment détecter ces fameux abandonnistes?

La notion de temps est évidemment essentielle dans ce sujet. Quand un client entre sur un parcours métier, il va prendre plus ou moins longtemps pour aller au bout. Cela peut dépendre de tout un tas de critères: complexité du parcours, appétence du client au digital, etc.

Décider de la durée à partir de laquelle un client doit être considéré comme abandonniste n’est donc pas une mesure universelle mais bien une propriété métier, définie en phase de design avec le métier.

Une architecture évènementielle est ici un excellent levier pour modéliser la situation d’un client abandonniste.

On peut en effet considérer que pour chaque “acteur” (prospect/client/etc.), l’entrée sur un parcours génère un évènement d’entrée, et que la conclusion d’un parcours génère à son tour un évènement de sortie.

Jusqu’ici tout va bien normalement…

Δmax est donc la durée maximum qu’on accorde à un utilisateur pour finir un parcours avant qu’il ne soit considéré comme abandonniste.

Une des difficultés lorsqu’on essaie de modéliser par l’informatique ce que doit être un comportement humain nominal est le traitement des cas à la marge qui génèrent ici des faux positifs. Par exemple:

  • (cas 1) un parcours métier à base de formulaires où le retour arrière navigateur déclenche une remise à 0 des entrées de la page précédente. Si l’utilisateur doit resaisir toutes ces données ne doit-on pas remettre à 0 le chrono du Δmax?
  • (cas 2) un utilisateur qui ferme sa page par erreur mais revient dans le parcours, comment peut-on le traiter pour qu’il ne soit pas considéré comme un abandonniste s’il finit son parcours dans les temps?
  • (cas n) et bien d’autres cas à la marge encore…

Ainsi, il faut accepter qu’il est compliqué de modéliser finement tous les scénarios et qu’il est essentiel d’avoir une discussion avec le métier pour comprendre la photographie d’ensemble. Chez AXA France, c’est en atelier d’Event Storming que l’on dessine les contours de la solution.

En choisissant par exemple un Δmax assez large, on peut imaginer se prémunir de la plupart des faux positifs du cas 1. On perd alors éventuellement en réactivité sur le traitement appliqué suite à cette alerte, mais on répond simplement au besoin à coûts acceptables.

Bon, la théorie c’est bien, mais ça ne vous dit pas qu’on le mette en œuvre?

Proof Of Concept

Introduction

Pour bien comprendre ce pattern de streaming, il est indispensable de pratiquer.

Imaginons un parcours métier complexe à plusieurs étapes.

La première étape lancera le chronomètre du Δmax en envoyant un évènement start. La sortie du parcours déclenchera un évènement stop. Pour ajouter un peu de “réalité opérationnelle”, je vais simuler des “ retours-arrière” parcours où certaines étapes critiques peuvent réinitialiser le chrono du Δmax en déclenchant elles-mêmes un évènement start.

La DSL Kafka Stream propose différentes façons de modéliser des évènements dans le temps. L’une de ces façons nous inspire ici, puisqu’elle représente assez bien la notion d’activité dans un parcours utilisateur: la window session

Copyright © Confluent, Inc. 2014–2020

Je vais donc essayer de travailler avec cette window session pour construire des agrégats pour chaque acteur de mon parcours.

Do It Yourself

Environnement

C’est maintenant que se crée l’ascenseur émotionnel. Normalement vous avez quand même lu le titre de l’article, je ne vais pas utiliser Kafka Stream, mais bien ksqlDB.

Ma motivation est assez simple. Si vous débutez dans le monde du streaming, ksqlDB est un formidable levier pour s’exercer, faire des POC, créer de l’effet “Waouh”, sans noyer votre interlocuteur dans du code Java.

ksqlDB en quelques mots, c’est l’engine SQL au dessus de la librairie Kafka Stream. Le premier génère le second.

Ce slide de Kai Wähner, évangéliste chez Confluent, résume assez bien la différence:

Copyright © Confluent, Inc. 2014–2020

Voilà ce dont nous aurons besoin:

  • kafka, évidemment
  • zookeeper, malheureusement encore ;)
  • ksqldb-server, l’engine nécessaire à l’exécution des requêtes SQL-like
  • ksqldb-cli, la command line interface qui va nous permettre de construire ensemble notre topologie de stream.
  • kafkahq, renommé dernièrement AKHQ, par confort et habitude dans les étapes de vérification de mon exercice. Et aussi parce que c’est développé avec passion par un français proche de chez moi et que c’est un outil qui mérite d’être connu ;-)

Pour injecter mon jeu de données, j’utiliserai en complément kafkacat.

Design

Travaillons avec une structure d’évènement simple pour nous concentrer uniquement sur la problématique des abandonnistes:

{
"eventId": "5ece8acbf9269864aea2c080",
"eventDateTime": "2020-05-28T10:00:00.000+02:00",
"actorId": "yellow",
"state": "start|stop"
}

Notre topic Kafka d’entrée s’appellera userActivity.

Je vais choisir arbitrairement un Δmax de 5 minutes, et chaque utilisateur sera nommé par une couleur dont voici la représentation dans notre jeu de données:

Représentation de mon jeu de données
Représentation de mon jeu de données
  • Le cas jaune est intéressant puisqu’il représente un faux-positif d’abandonniste. Pour le moment, j’accepte ce faux-positif et considère qu’il est le fruit d’un Δmax mal calibré.
  • Le cas violet représente un acteur qui n’est pas revenu en arrière et qui a terminé son parcours dans les temps. Il n’est pas abandonniste.
  • Le cas orange représente un acteur qui a fait un retour arrière et n’a pas terminé son parcours. Il est abandonniste.
  • Le cas vert représente un acteur qui a fait un retour arrière et qui a terminé son parcours dans les temps. Il n’est pas abandonniste puisque le Δmax a été réinitialisé lors de son retour arrière.

Voilà la topologie que vais construire:

Topologie de streaming
Topologie de streaming

Je représente mon topic d’entrée par un stream appelé stream_userActivity. A partir de ce stream, je construis un agrégat sur le temps que j’appelle table_leaverDetection. Cet agrégat est lui-même représenté par un topic leaverDetection.

Si vous êtes toujours perdu, rassurez-vous, c’est maintenant que tout peut devenir plus clair….

Allez on pratique

Commencez par cloner mon repository: https://github.com/juguyard/kafkadiving.git

…et naviguez dans le répertoire abandonnistes.

Lancement de l’environnement complet:

docker-compose up -d

On injecte ensuite le jeu de données avec kafkacat:

docker run --tty --mount type=bind,source=/input,target=/input --network abandonnistes_default confluentinc/cp-kafkacat kafkacat -b kafka:39092 -t userActivity -P -l -K: /input/userActivity.json

Par bonne pratique, j’ai déjà positionné en amont dans userActivity.json en clé de message l’actorId. Le partitionnement est donc déjà réalisé dans Kafka.

Dans kafkaHQ (localhost:8080), je retrouve bien mes 9 évènements correctement partitionnés mais - et c’est logique - l’heure du message kafka n’est pas l’heure du message contenue dans le json:

Évidemment, avec kafkacat j’ai tout injecté en 1 bloc, mais cette problématique est intéressante à traiter puisqu’elle reflète le scénario de la vraie vie où un évènement métier peut techniquement arriver en retard. D’où l’ intérêt dans nos applications de streaming de travailler avec l’heure définie dans l’objet métier.

Lancement de la CLI ksqlDB:

docker-compose exec ksqldb-cli ksql http://ksqldb-server:8088

Définition de stream_userActivity:

CREATE STREAM stream_userActivity
(
eventId STRING,
eventDateTime STRING,
actorId STRING,
state STRING
)
WITH
(
KAFKA_TOPIC='userActivity',
VALUE_FORMAT='JSON',
TIMESTAMP='eventDateTime',
TIMESTAMP_FORMAT='yyyy-MM-dd''T''HH:mm:ss.SSSXXX'
);
Message
----------------
Stream created
----------------

Définition de mon agrégat table_leaverDetection:

CREATE TABLE table_leaverDetection
WITH (KAFKA_TOPIC = 'leaverDetection', VALUE_FORMAT='JSON')
AS SELECT
actorId,
TIMESTAMPTOSTRING(WINDOWSTART,'yyyy-MM-dd''T''HH:mm:ss.SSSXXX') as windowStartTime,
TIMESTAMPTOSTRING(WINDOWEND,'yyyy-MM-dd''T''HH:mm:ss.SSSXXX') as windowEndTime,
(WINDOWEND-WINDOWSTART)/1000 AS windowDurationSeconds,
LATEST_BY_OFFSET(state) AS lastState
FROM stream_userActivity
WINDOW SESSION (5 MINUTES)
GROUP BY actorId
HAVING COUNT_DISTINCT(state) = 1
EMIT CHANGES;
Message
----------------------------------------------------
Created query with ID CTAS_TABLE_LEAVERDETECTION_0
----------------------------------------------------

Remarquez bien le HAVING COUNT_DISTINCT(state) = 1 dans la requête, qui me permet d’exclure dans l’agrégat les cas où le client termine son parcours dans les 5 minutes. En effet, dans cette session de temps, ce client aura bien un COUNT_DISTINCT(state) = 2 (le start et le stop)

Je peux maintenant interroger ce stream dans la CLI pour voir le résultat:

Select * from table_leaverDetection EMIT CHANGES;-------+-----------------+-----------------+-----------+----------+
ACTORID|WINDOWSTARTTIME |WINDOWENDTIME |WINDOWDURA |LASTSTATE |
| | |TIONSECONDS| |
-------+-----------------+-----------------+-----------+----------+
yellow |2020-05-28T08:00:|2020-05-28T08:00:|0 |start |
|00.000Z |00.000Z | | |
orange |2020-05-28T08:05:|2020-05-28T08:07:|120 |start |
|00.000Z |00.000Z | | |
yellow |2020-05-28T08:09:|2020-05-28T08:09:|0 |stop |
|00.000Z |00.000Z | | |
Press CTRL-C to interrupt

Je retrouve donc bien mon acteur orange qui est un abandonniste, et le jaune qui est un faux positif dont je vois bien les deux window sessions.

Évidemment un faux-positif c’est toujours gênant, et on voudra l’éviter. Deux scénarios s’offrent à nous:

  • Solution du perfectionniste: On peut complexifier la topologie de stream actuelle, en calculant d’autres agrégats en utilisant par exemple d’autres techniques de windowing, et en refaisant des jointures assez complexes. Allez lire la documentation, c’est passionnant.
  • Solution du “le mieux est l’ennemi du bien”: On peut considérer qu’il suffit de mieux calibrer le Δmax, en acceptant éventuellement de perdre en réactivité sur la capacité à prendre une action suite à la détection. C’est un trade-off acceptable pour garder une solution simple.

Vous l’aurez compris, cet article touche à sa fin, et c’est le chemin pragmatique que j’emprunte en augmentant juste mon Δmax.

Voilà un exemple de requête ksqlDB simplifiée non persistée (aucun topic n’est créé pour garder la donnée) avec 30 minutes de window session:

SELECT
actorId,
TIMESTAMPTOSTRING(WINDOWSTART,'yyyy-MM-dd''T''HH:mm:ss.SSSXXX') as windowStartTime,
TIMESTAMPTOSTRING(WINDOWEND,'yyyy-MM-dd''T''HH:mm:ss.SSSXXX') as windowEndTime
FROM stream_userActivity
WINDOW SESSION (30 MINUTES)
GROUP BY actorId
HAVING COUNT_DISTINCT(state) = 1
EMIT CHANGES;
+---------+-------------------------+-------------------------+
|ACTORID |WINDOWSTARTTIME |WINDOWENDTIME |
+---------+-------------------------+-------------------------+
|orange |2020-05-28T08:05:00.000Z |2020-05-28T08:07:00.000Z |
Press CTRL-C to interrupt

Seul orange est encore abandonniste. Le résultat est cohérent.

Perspectives

Parlons sans ambages: nous avons à peine gratté la surface du sujet dans cet article.

Néanmoins, j’espère que cette introduction vous aura donné envie d’en découvrir d’avantage. C’est un domaine passionnant, renversant également puisque vous découvrirez par vous-même à quel point ce nouveau paradigme de développement bat en brèche toutes nos habitudes.

Et oui, vos jointures vous ne les ferez plus sur une table statique dans une BDD, mais sur de la donnée qui bouge tout le temps…

Je vous conseille évidemment le blog de Confluent et leur chaine Youtube, ce sont parmi mes sites de référence pour faire ma veille techno.

Et c’est important de le rappeler, ce sont de vrais sujets de la DSI AXA France, j’ai d’ailleurs eu l’occasion d’en parler dans un meetup cette année:

https://www.youtube.com/watch?v=u-qVBekTcR4

Et le meilleur moyen de passer du POC à la réalité, c’est quand même de venir faire un câlin à nos équipes recrutement. En respectant bien sûr la distanciation sociale ;-)

Julien, https://twitter.com/juguyard

Originally published at https://github.com.

Just-Tech-IT

Sharing knowledge and best practices for valuable tech…

Just-Tech-IT

Sharing knowledge and best practices for valuable tech workers. This blog is Powered by @AXAJobs_fr

Julien Guyard

Written by

No badass bio

Just-Tech-IT

Sharing knowledge and best practices for valuable tech workers. This blog is Powered by @AXAJobs_fr