Confluent a annoncé fin Novembre 2019, l’ajout de nouvelles features à leur moteur SQL de streaming KSQL (contraction de Kafka et SQL). Deux grosses évolutions de leur framework qui les a poussés à changer son petit nom : bienvenue à ksqlDB !
- Kesako ?
- Quelles sont ces évolutions qui entraînent tout ce remue-ménage ?
- Est-ce que Confluent vient d’inventer une base de données temps-réel ?
Nous allons essayer de répondre à toutes ces questions dans cet article. Une chose est sûre, les équipes de Confluent cherchent à simplifier les architectures temps réel !
Petit rappel sur KSQL
On ne peut pas parler de ksqlDB directement sans remettre un peu de contexte et expliquer rapidement ce qu’est KSQL, le désormais aïeul de ksqlDB (ne l’enterrons pas, il existe encore dans les anciennes versions de Confluent).
KSQL est le moteur SQL de streaming permettant de manipuler des données temps réel sur Apache Kafka. Il offre une interface SQL-like simple et puissante pour le stream processing sans besoin d’utiliser un langage de programmation (tel que Python ou Java …). Il simplifie et démocratise l’écriture de flux et de manipulation des données dans Kafka en adoptant une syntaxe SQL. Notons tout de même que KSQL utilise un langage inspiré de ANSI SQL mais n’est pas compatible (par exemple, KSQL offre la possibilité de faire du fenêtrage que n’offre pas ANSI SQL).
Pour schématiser succinctement KSQL et sa place dans l’écosystème Kafka, voilà ce que cela donne :
KSQL permet d’abstraire l’utilisation de Kafka via REST API en profitant de la puissance et la simplicité de SQL. Il est disponible au travers d’une console (CLI). A l’instar de Kafka Streams sur lequel il se base, KSQL utilise la puissance des outils mis à disposition de Kafka.
Les principaux composants de KSQL sont les suivants :
- Engine : il traite les instructions et les requêtes SQL,
- REST : il permet au client d’accéder au moteur de traitement,
- CLI : console fournissant une interface de ligne de commande au moteur.
Les fonctionnalités qu’offre KSQL (avant ksqlDB) sont les suivantes :
- Agrégation (Supporte les User Defined Functions),
- Jointure,
- Requêtes continues (push queries),
- Windowing.
KSQL et les connecteurs
Une des premières annonces faites sur cette nouvelle feature est l’intégration des Kafka-Connectors dans KSQL.
Dans les faits
Dans beaucoup de cas, les données que l’on veut traiter ne sont pas encore dans Kafka mais résident en dehors, dans des RDBMS par exemple. C’est là qu’intervient Kafka Connect qui, via des connecteurs créés par la communauté ou spécifiques à notre cas d’utilisation, permet d’ingérer et d’extraire la donnée de Kafka.
Quand auparavant nous étions obligés de maintenir un/plusieurs Kafka Connect & un KSQL pour pouvoir ingérer/extraire notre donnée et la manipuler, aujourd’hui ksqlDB intègre cette fonctionnalité nativement.
Cette feature apporte de la simplicité et de la clarté dans les architectures temps-réel éligibles à l’utilisation de KSQL car là où nous avions besoin d’une brique Kafka particulière pour chaque couche d’un ELT (Extract, Load, Transform), ksqlDB les intègre nativement.
Une architecture simplifiée devient donc comme suit :
Dans le code
Analysons comment se connecter à une source externe depuis ksqlDB. Prenons l’exemple d’une connexion à une base PostgreSQL.
CREATE SOURCE CONNECTOR jdbc_source WITH (
'connector.class' = 'io.confluent.connect.jdbc.JdbcSourceConnector',
'connection.url' = 'jdbc:postgresql://localhost:5432/postgres',
'connection.user' = 'user',
'topic.prefix' = 'jdbc_',
'table.whitelist' = 'table1',
'mode' = 'incrementing',
'numeric.mapping' = 'best_fit',
'incrementing.column.name' = 'id',
'key' = 'id');
Le code ci-dessus va me permettre de créer un connecteur jdbc_source de type SOURCE utilisant la classe Java JdbcSourceConnector développée par la communauté. Ce connecteur va récupérer en temps réel tous changements de ma table 'table1' de mon instance PostgreSQL 'postgres' et va pousser ces changements dans un topic 'jdbc_table1'. La clé de mon message Kafka sera la valeur de ma colonne 'id'.
Tous les connecteurs ont une liste de paramètres différents à renseigner. De manière générique, pour créer un connecteur il suffit d’utiliser la commande suivante :
CREATE SOURCE | SINK CONNECTOR connector_name WITH( property_name = expression [, ...]);
Requêtes de type ‘pull’
Dans les faits
Jusque là, KSQL n’apportait que la possibilité de faire des requêtes de type ‘push’ sur les vues matérialisées. Le résultat de la requête était un flux continu de données agrégées ou transformées par la requête SQL. Il s’agit d’une requête ‘push’ car la requête pousse le message modifié par ma requête, en temps-réel, sur chaque changement de ma source. Via l’API REST de ksqlDB, une requête de type ‘push’ va renvoyer une réponse de longueur indéterminée.
KSQL n’offrait pas la possibilité d’avoir une vue de ma donnée à un instant T. C’est là qu’interviennent les requêtes de type ‘pull’. Une requête de type ‘pull’ récupère l’état courant d’une vue matérialisée. Via l’API REST de ksqlDB, une requête de type ‘pull’ renvoie une seule réponse.
Dans le code
Prenons l’exemple de transactions bancaires :
Je suis client d’une banque avec 2 cartes bleues en ma possession associées à deux comptes bancaires différents. Je veux avoir une application mobile qui me permet, en temps réel, d’avoir l’argent disponible sur mes comptes mais aussi la visibilité sur toutes mes transactions. On va voir comment faire en utilisant uniquement ksqlDB.
1/ Créons un STREAM de transactions bancaires :
CREATE STREAM transactions (compteId VARCHAR, transaction DOUBLE) WITH (kafka_topic='transactions', key=’compteId’, value_format='json', partitions=1);
‘compteId’ est l’identifiant de notre compte en banque. Transaction est la transaction bancaire effectuée sur le compte.
2/ Créons une TABLE représentant la vue matérialisée de nos comptes bancaires :
CREATE TABLE comptes AS SELECT compteId, SUM(txn) FROM transactions GROUP BY comptetId;
‘comptes’ est une vue matérialisée représentant l’état courant de nos comptes en banque.
3/ Une requête ‘push’ sur le stream ‘transactions’ nous permettra de voir toutes les transactions sur tous les comptes en temps réel :
SELECT * FROM transactions EMIT CHANGES;
A chaque transaction, la requête renvoie l’enregistrement correspondant.
4/ Une requête ‘push’ sur notre table ‘comptes’ nous permettra de voir l’état de tous nos comptes à chaque transaction :
SELECT * FROM comptes EMIT CHANGES;
A chaque transaction, la requête renvoie l’état de nos différents comptes en banque.
5/ Une requête ‘push’ sur notre table ‘comptes’ filtrée sur notre compte ‘compte_courant’ nous permet d’avoir l’état de notre compte ‘compte_courant’ à chaque transaction :
SELECT * FROM comptes WHERE compteId=’compte_courant’ EMIT CHANGES;
A chaque transaction, la requête renvoie l’état du ‘compte_courant’.
6/ Une requête de type ‘pull’ sur ma vue matérialisée, elle, nous permettra d’avoir l’état au moment de la requête du ‘compte_courant’ :
SELECT * FROM comptes WHERE ROWKEY=’compte_courant’;
A chaque exécution de la requête, UNE réponse est retournée.
Les deux points principaux que nous pouvons retirer de ces observations sont :
- Par défaut, une requête est considérée comme ‘pull’. Il faudra ajouter ‘EMIT CHANGES’ pour la transformer en ‘push .
- Les requêtes de type ‘pull’ requièrent une clause WHERE sur une ROWKEY spécifique afin d’assurer un excellent temps de réponse. Cela limite la façon de requêter les données en ‘point-in-time’. On ne pourra pas en une seule requête, dans notre exemple, avoir un état de tous nos comptes en banque comme ‘SELECT * FROM comptes’ ferait sur une base de données classique (autant de résultats que de comptes en banques).
Nous pouvons toutefois citer deux limitations :
- Asynchronisme : Comme la création de vue matérialisée est asynchrone, il n’y a pas de cohérence forte entre une donnée qui vient d’être écrite dans le système et sa disponibilité dans ma vue matérialisée.
- Performances : Tant que la donnée est assez petite pour rester en mémoire, la donnée sera rapidement accessible. Si jamais la donnée nécessite d’être persistée sur disque, alors la latence sera celle de RocksDB couplée aux contraintes de l’I/O.
L’architecture de mon application, tout en profitant de ces 2 nouvelles features, peut donc être simplifiée à son maximum en utilisant uniquement la puissance offerte par ksqlDB de la manière suivante :
Pourquoi base de données ?
Au final, ksqlDB, cette version évoluée de KSQL permet de stocker et de manipuler des tables de données ; ne serait-ce pas une base de données ?
Grâce à ksqlDB, Confluent cherche à offrir aux applications temps-réel la même simplicité qu’offre une base de données à une application CRUD mais n’a pas vocation de remplacer une base de données dont c’est le coeur de métier !
On peut quand même remettre en question ce choix vu qu’il amène une complexité sur la compréhension de ce qu’est Kafka. Le raccourci facile de se dire que c’est aussi une base de donnée peut amener à choisir ksqlDB pour de mauvaises raisons. Nous n’avons pas fini de voir des architectures avec de la rétention infinie sur tous les topics parce que cela s’appelle ksqlDB et avoir des clusters Kafka installés sur des machines avec des dizaines de téraoctets de stockage. Pour y remédier, nous aurons des initiatives comme cet article afin de lever l’alerte et nous attendons de la transparence et un peu plus de communication de la part Confluent. Attention aux quiproquos.
Conclusion
Mon avis sur ksqlDB, au cas où vous ne l’auriez pas compris … Je suis fan !
D’une part parce que les architectures de traitement de données temps réel, qui tendaient à devenir très complexes pour des cas d’utilisation assez simples dans beaucoup de cas (gestion de courses, notations de films, notation de point de restaurations, …), peuvent maintenant se simplifier drastiquement et d’autre part parce que Confluent ajoute une nouvelle corde à son arc : une gestion des requêtes interactives très simple. Ça commence à faire beaucoup de cordes pour un seul arc !
Seule ombre au tableau, la peur de voir se multiplier de mauvais design patterns suite à un manque de transparence, de communication et de choix de nommage ?