Kafka : Migrer un consommateur vers Streams et Connect

Il n’est plus nécessaire de présenter Kafka de nos jours, cette technologie de stream processing connaît une popularité de plus en plus grande au fil des derniers mois, sa grande communauté, sa constante évolution et sa simplicité d'utilisation en font une référence dans la pléthore de technologies big data sur le marché, ci dessous se trouve un graphique montrant le pourcentage d'apparition des technologies de stream processing "connues" dans les questions de StackOverflow

Malheureusement, la popularité d’un projet open-source n’est pas proportionnelle à sa bonne utilisation, il arrive fréquemment qu’un projet utilisant du Kafka acquiert une dette technique dès le premier commit à cause d’une mauvaise architecture et d’une mauvaise identification du use-case métier.

Ceci entraîne sur le long terme un projet difficile à maintenir, difficile à faire évoluer, des microservices ressemblant à des monolithes distribués entraînant des développeurs frustrés d’y toucher car ils considèrent le projet comme une espèce de zone radioactive …

Je vais donc expliquer dans cet article, au moyen d’un use case simple, comment passer d’un consommateur Kafka “monolithique”, à des microservices orientés sur les technologies Kafka Streams et Kafka Connect.

1 - Use case :

L’application décrite dans cet article est une API permettant d’envoyer des e-mails et des SMS.

Pour chaque message envoyé, l’API doit être capable d’identifier si le message est d’abord passé par l’api (événement REQUEST), qu’il a été remis au destinataire (événement SENT), il doit aussi être possible de déterminer si le destinataire a ouvert son message et de détecter s’il a cliqué sur les liens présents dans le message.

De plus, l’API doit savoir si le message n’a pas pu être remis au destinataire et d’en connaître la raison.

Pour ce faire, une architecture construite autour de Kafka a été mise en place.

Chaque message envoyé dans l’API, et ensuite envoyé dans Kafka pour être consommé et envoyé vers un serveur SMTP si c’est un email ou SMPP si c’est un SMS.

Après le retour du serveur, le message et tous ses événements sont sauvegardés dans Couchbase, puis chaque événement associé au message est envoyé dans un topic de retour.

{
	"messageType" :  "EMAIL"
	"id" : "xxx-yyy-zzz"
	"events" : [
		{
			"eventType" : "REQUEST" // Le message à été pris en compte par l'api
			"date" : "2019-01-01 00:00:00"
		},
		{
			"eventType" : "SENT" // Le message à été envoyé au destinataire
			"date" : "2019-01-01 00:00:01"
		}
	]
}

Exemple de message envoyé et remis au destinataire.

Par la suite, nous allons nous pencher sur la partie d’envoi de message, nous ne discuterons pas du tracking de ces messages.

2 - Premiers développements

L’API a été développée à l’origine avec des Producers et des Consumers Kafka, au bout de quelques mois de développement, celle-ci a été mise en production avec les éléments d’architecture suivants :

Cette architecture bien que naïve, présente une grande simplicité.

Le consommateur se charge de distinguer les messages entre e-mail et SMS, puis les envoie au serveur associé au type de message. Si le message n’a pas été envoyé alors celui-ci est immédiatement sauvegardé dans Couchbase avec l’erreur associée, ensuite tous les événements associés au message sont envoyés dans le topic retour.

Cependant, des problèmes sont rapidement survenus après la première mise en production :

  • En cas de problème dans Couchbase, impossible de sauvegarder le message, celui-ci est perdu dans la file Kafka
  • En cas de problème d’envoi, impossible de pouvoir rejouer le message automatiquement
  • À cause des appels synchrones, lorsque le serveur SMTP a des latences d’envoi, cela impacte les envois des SMS et inversement
  • La maintenance de code est difficile car les code pour envoyer un e-mail et un SMS est différent, mais tout est mutualisé dans une seule classe
  • Les événements sont sauvegardés uniquement à la fin du traitement, ce qui engendre un risque de perte de message au milieu de l’application

C’est ainsi que le travail vers l’architecture V2 commença. Par soucis de simplicité, je vais vous présenter les évolutions étape par étape pour ensuite arriver à l’architecture finale.

Étape 1 : Séparation email / sms

Il s’agit ici de la première étape, la plus évidente, séparer chaque type de message en leur créant un topic dédié. Ceci permet de ne plus impacter les envois des emails lorsque le serveur SMPP a de la latence, mais aussi de simplifier le code en séparant toutes les classes traitant les SMS et les e-mails dans des classes spécifiques.

Afin que le client puisse s’adapter au changement d’appel pour l’api, l’ancien microservice d’exposition était encore disponible temporairement.

Étape 2 : Simplification des consommateurs

Maintenant que la séparation est faite, on peut encore simplifier le code du consommateur.

Pour rappel, chaque fois que le message à été envoyé par le consommateur, celui-ci sauvegarde tous les événements associés à l’id du message dans Couchbase, puis dans le topic de retour.

Afin de simplifier notre code, il suffit donc de séparer l’envoi des événements au topic retour dans une instance Kafka Streams, puis de déplacer la partie sauvegarde dans Couchbase en la remplaçant par du Kafka Connect.

Afin de pouvoir mettre en place du Kafka Streams et du Kafka Connect, nous devons implémenter un nouveau topic en sortie des consommateurs d’envoi sur lesquels nos nouveaux composants pourront se brancher, nous allons donc créer un topic par type de message et les appeler SendMailOk et SendSmsOk

Par la suite, afin de simplifier le schéma d’architecture, je vais parler uniquement de la partie d’envoi d’email.

Étape 3 : Gérer les erreurs et le rejeu automatique des messages en erreur

Afin de pouvoir gérer les messages en échec nous avons dû réfléchir à un moyen de renvoyer les messages automatiquement tout en sauvegardant les messages dans un endroit de secours en attendant que Couchbase se rétablisse.

Pour cela, nous avons décidé d’envoyer tous les messages en échec dans un topic dédié afin de pouvoir les rejouer instantanément. Nous avons donc créé un topic SendMailRetry.

Pour traiter les messages de ce topic, nous avons démarré une nouvelle instance du consommateur d’envoi déjà implémenté que l’on a baptisé comme son topic parent : SendMailRetry.

Nous avons en plus ajouté une nouvelle fonctionnalité permettant au consommateur de récupérer les informations du topic toutes les minutes afin de ne pas rejouer les messages en erreur instantanément. Cela laisse le temps au serveur d’envoi (SMTP ou SMPP) de se remettre en place pour augmenter les chances de réussite d’envoi du message.

Dans le cas où le deuxième envoi ne fonctionne pas, nous renvoyons le message dans le topic SendMailRetry avec un nouvel événement “RETRY”, si le troisième envoi ne fonctionne pas, nous ajoutons un nouvel événement puis nous renvoyons le message dans le topic et ainsi de suite ...

{
    "messageType" :  "EMAIL"
    "id" : "xxx-yyy-zzz"
    "events" : [
        {
            "eventType" : "REQUEST" // Le message à été pris en compte par l'api
            "date" : "2019-01-01 00:00:00"
        },
        {
            "eventType" : "RETRY" // Le message n’a pas été remis, on rejoue
            "date" : "2019-01-01 00:00:01"
        },
        {
            "eventType" : "RETRY" // Le message n’a pas été remis, on rejoue
            "date" : "2019-01-01 00:00:01"
        },
        {
            "eventType" : "SENT" // Le message à été remis au bout de 3 tentatives
            "date" : "2019-01-01 00:00:01"
        }
    ]
}

Exemple de message lors de rejeu

Si au bout de la cinquième fois le serveur d’envoi ne répond toujours pas, alors nous considérons que le message est définitivement perdu. On ajoute un événement “DEAD” au message, puis nous le mettons dans un topic “ Dead letter queue” appellée SendMailDlq.

{
    "messageType" :  "EMAIL"
    "id" : "xxx-yyy-zzz"
    "events" : [
        {
            "eventType" : "REQUEST" // Le message à été pris en compte par l'api
            "date" : "2019-01-01 00:00:00"
        },
        {
            "eventType" : "RETRY" // Le message n’a pas été remis, on rejoue
            "date" : "2019-01-01 00:00:01"
        },
        {
            "eventType" : "RETRY" // Le message n’a pas été remis, on rejoue 2 fois
            "date" : "2019-01-01 00:00:01"
        },
    …….
    …….
        {
            "eventType" : "RETRY" // Le message n’a pas été remis, on rejoue 5 fois
            "date" : "2019-01-01 00:00:01"
        },
        {
            "eventType" : "DEAD" //Le message n’est pas envoyé il termine dans la DLQ
            "date" : "2019-01-01 00:00:01"
        }
    ]
}

Exemple de message non envoyé

Voici maintenant à quoi ressemble notre architecture en appliquant ces évolutions :

Étape 4 : La sauvegarde des événements au fil de l’eau

Avant de valider la dernière architecture, il restait encore un dernier problème à adresser : Notre projet sauvegarde tous les événements lors de l’arrivée dans le topic SendMailOk ou SendMailDlq, mais comment gérer la sauvegarde de tous les événements dans la base et dans le topic de retour sans avoir à attendre que tous les événements arrivent jusqu’au bout ?

Pour cela, il faut se pencher sur la vie d’un message dans notre application. Prenons l’exemple d’un e-mail, à chaque fois qu’un message s’apprête à passer dans un service, nous lui ajoutons un événement, puis nous l’envoyons dans le topic associé à cet événement.

Ainsi, chaque message passant par un topic aura son dernier événement associé à celui-ci.

Étant donné que notre topic Retour contient un message par type d'événement, il nous suffit de brancher l’instance Streams existante sur nos nouveaux topics puis de regarder à chaque fois le dernier événement pour chaque message et de l’envoyer dans le topic Retour.

Pour la sauvegarde dans la base de donnée, nous faisons une écriture du message dans Couchbase en utilisant l’id comme clé. Le client Couchbase va se charger de faire un upsert afin de ne pas faire de prélecture et gagner en performances.

Voici donc le schéma final pour la partie e-mail :

Dans ce schéma, chaque case Kafka Connect et Streams représente une seule instance du même service.

Étape 5 - Future work : Gérer les messages en erreur dans les instances Streams et Connect.

Et comment faire dans le cas où notre instance Streams rencontre un mauvais format ?

Même si nous ne sommes pas censés récupérer des messages au mauvais format, nous ne sommes pas à l’abri qu’à cause d’un changement de format de message non rétro compatible suite à une mauvaise mise en prod notre instance Streams n’arrive plus à désérialiser un message et se mette en erreur.

Pour régler ce problème Il existe le paramètre default.deserialization.exception.handler  qui permet de définir la stratégie à adopter en cas d’erreur de désérialisation. Par défaut celui-ci est mis à FAIL. C’est à dire que l’instance va se stopper lorsqu’elle ne va pas réussir à désérialiser un message, entraînant ainsi une rupture de service.

Pour éviter ça, nous avons mis ce paramètre à CONTINUE car notre use-case nous permet d’avoir une perte de message lorsque celui-ci à déjà été envoyé par le SMPP/SMTP.

Au niveau des erreurs dans Kafka Streams, il existe deux paramètres permettant de dire à l’instance d’envoyer le message dans une DLQ en cas d’erreur :

errors.tolerance
errors.deadletterqueue.topic.name

Ainsi, en cas d'échec de l’envoi dans Couchbase d’un message, celui-ci est identifié et sauvegardé dans une file Kafka dédiée afin d’être rejoué par une autre instance Kafka Streams lancée à la demande.

Avantages et retour d’expérience :

Après plusieurs mois d’utilisations de cette architecture, voici les avantages que nous avons rencontrés :

  • Maintenance aisée :

Nous avons remarqué une maintenance du code bien plus simple, chaque modification pour un type message est facilement réalisable en modifiant le consommateur Kafka d’envoi associé.

  • Configurabilité :

Cette architecture nous permet de gérer le multi-client ou la priorisation de messages en paramétrant chaque microservice vers un Topic dédié. Tout se fait via un fichier de configuration.

  • Scale on demand :

Nous avons aussi pu vérifier la scalabilité horizontale de nos microservices lors des périodes de fortes activités en augmentant le nombre d’instances de consommateurs d’envoi.

  • Monitoring détaillé :

Cette architecture permet aussi d’avoir un monitoring bien plus détaillé de notre API, nous sommes capables d’analyser finement les logs de chaque microservice afin de déterminer si celui-ci est en bonne santé. Kafka Connect est monitoré grâce à son API REST et Kafka Streams par l’envoi des logs dans un ELK.

Cependant, cette architecture demande une bonne connaissance de Kafka et de son  écosystème car elle nécessite un cluster en bonne santé et bien maintenu.

De même, pendant toute notre expérience, nous n’avons fait qu'enlever du code à notre consommateur d’envoi principal, un travail à faire serait de convertir ce consommateur en Kafka Streams.