Architecture Lambda, Cassandra et synchronisation des données

Les différents patterns permettant de garantir qu’une donnée stockée dans Cassandra et pouvant être mise à jour de façon concurrente par un flux batch et un flux temps réel ait toujours la valeur la plus fraîche.

Cet article a été co-écrit par Guillaume Hélouis et Sébastien Bergia.

Petit rappel : qu’est-ce qu’une architecture Lambda ?

Dans l’un de ses posts, Christophe Parageaud explique :

Une architecture Lambda permet de stocker et de traiter de larges volumes de données (batch) tout en intégrant dans les résultats les données les plus récentes.
Une architecture Lambda est composée de trois couches: :

  • Couche batch (Batch Layer) :
    • Stockage de l’ensemble des données.
    • Traitements massifs et réguliers afin de produire des vues consultables par les utilisateurs.
    • La fréquence des traitements ne doit pas être trop importante afin de minimiser les tâches de fusion des résultats et de constituer les vues.
  • Couche temps réel (Speed Layer) :
    • Ne traite que les données récentes (flux).
    • Calcul des vues incrémentales qui vont compléter les vues batch afin de fournir des résultats plus récents.
    • Suppression des vues temps réel obsolètes (postérieures à un traitement batch)
  • Couche de service (Serving Layer) :
    • Permet de stocker et d’exposer aux clients les vues créées par les couches batch et temps réel.
    • Aussi capable de calculer dynamiquement ces vues.
    • N’importe quelle base NoSQL peut convenir.

Architecture Lambda

Source : Big Data : Panorama des solutions 2016

De nombreuses technologies pourraient être utilisées dans la mise en œuvre de cette architecture. Une implémentation concrète serait par exemple :

  • Hadoop et Spark pour les traitements batch ;
  • Flink pour les traitements temps réel ;
  • Cassandra pour la couche de service.

Dans le cadre de cet article nous allons considérer l’utilisation de Cassandra pour la couche de service.

La problématique

Supposons qu’une même donnée puisse être envoyée dans une base Cassandra à la fois via un batch nocturne quotidien et au fil de l’eau (quasi temps réel).

Comment garantir que l’information qui sera restituée par la couche de service sera bien la plus fraîche ? Comment gérer le fait que durant l’exécution d’un batch l’une des données qu’il contient (le numéro de téléphone d’un client par exemple) soit modifiée et transmise en streaming ? Comment garantir que la donnée qui sera restituée par la couche de service sera bien la dernière saisie et envoyée par le canal rapide et non celle du batch ?

Autrement dit, dans une architecture lambda, une même donnée peut provenir de deux flux différents. Ce qui peut engendrer une collision. Tout le challenge est d’effectuer une réconciliation sans perte ou désynchronisation, afin que le client puisse toujours récupérer les données les plus récentes.

Dans ce qui suit, nous vous proposons une série de solutions possibles agrémentées de leurs avantages et inconvénients respectifs.

Les solutions possibles

Ne rien faire et attendre une cohérence à terme

Le batch étant exécuté quotidiennement, à un moment donné la dernière valeur saisie par l’utilisateur sera bien celle remontée par le flux batch.

  • Avantage : il n’y a rien à faire justement :)
  • Inconvénient : il saute aux yeux puisqu’il y aura un laps de temps non négligeable durant lequel l’utilisateur ne visualisera pas la bonne information.

Suspendre la consommation du flux temps réel durant l’exécution du batch

Les batchs s’exécutent généralement à l’initiative d’un ordonnanceur. Parmi les tâches orchestrées par ce dernier, il est tout à fait envisageable, juste avant de lui faire déclencher un batch, de lui faire stopper temporairement le processus consommant les données du flux temps réel pour le lui faire reprendre à l’issue du traitement batch.

Bien entendu, dans ce cas de figure, seule la consommation des données temps réel est mise en pause, pas leur production : les demandes de modification continueront à être prises en compte, mais elles s’accumuleront en vue d’être traitées (dans l’ordre) dès la reprise du processus de consommation.

  • Avantage : simplicité de mise en oeuvre et indépendance vis-à-vis de la base de données (adaptée aussi bien à Cassandra qu’à toute autre base).
  • Inconvénient : l’utilisateur ne visualisera pas sa dernière modification avant la fin du traitement batch (et de celui des données accumulées via le flux temps réel durant son exécution) ; risque pour une raison ou une autre de ne pas pouvoir redémarrer le consommateur (par exemple, au hasard, si le consommateur en question charge des informations en mémoire au démarrage et que pour une raison ou une autre certains de ces paramètres ont été (mal) modifiés depuis le dernier redémarrage).

Utiliser des transactions légères

On pourrait imaginer la manipulation d’un champ “date de dernière modification” qui serait valorisé avec une date fonctionnelle, transmise aussi bien dans le cas du streaming que du batch, indiquant la date à partir de laquelle la donnée est valide.

Avant chaque écriture en base, on commencerait par lire cette valeur puis, si la date de validité est plus récente que celle en base de données on effectuerait la modification.

Le problème de cette technique est qu’il n’est pas exclu qu’entre le moment où on lit et on compare les dates et celui où on effectue l’écriture en base pour le mode batch ou streaming une autre écriture soit effectuée dans l’autre mode.

Dans le cadre de l’utilisation de bases de données SQL on gère habituellement ce genre de cas de figure de façon pessimiste ou optimiste.

Dans le cas d’une gestion pessimiste d’accès concurrentiels les lignes à modifier sont verrouillées jusqu’à la fin de l’écriture et l’ouverture du verrou, rendant impossible une écriture parallèle durant ce laps de temps. On privilégie ce type de verrouillage lorsque sa durée est extrêmement courte.

Dans le cas d’une gestion optimiste aucun verrou n’est appliqué. C’est alors aux services appelant de gérer la problématique, en vérifiant que l’enregistrement à modifier ne l’a pas déjà été depuis la dernière lecture. La détection des violations d’accès simultanés peut se faire par exemple via un horodatage des lignes des tables de bases de données ou via la comparaison de la ligne lue initialement avec la ligne réellement en base de données.

Pour répondre à cette problématique d’accès concurrents Cassandra propose un mécanisme de transactions légères (ou LWT, Lightweight Transactions), fonctionnant sur le même principe que l’optimist lock. Ces transactions légères permettent d’effectuer des modifications conditionnées à la valeur d’un champ (un timestamp ou un numéro de version par exemple) récupéré lors de la lecture : si au moment de l’écriture la valeur du champ témoin est la même que celle lue initialement la modification est acceptée, si la valeur est différente une exception est lancée.

Grâce aux LWT on peut imaginer une requête qui insère une donnée seulement si sa date de mise à jour est plus récente que celle présente dans la base. Ce qui donnerait concrètement quelque chose comme :

UPDATE person
SET name = ‘Bob’, update_time = ’1527163894’
WHERE id = ‘1234’
IF update_time < ‘1527163894’;
  • Avantage : simplicité de mise en oeuvre basée sur un mécanisme standard de Cassandra.
  • Inconvénient : induit un coût non négligeable en termes de performance puisque quatre requêtes sont en fait effectuées (l’algorithme de consensus utilisé est Paxos), ce qui peut être tout à fait acceptable dans le cas d’un flux quasi temps réel (avec possibilité de résolution d’un cas d’erreur par l’utilisateur en ligne qui validerait la donnée à conserver lors d’un éventuel conflit), mais beaucoup plus impactant dans le cas d’une alimentation batch puisque la durée de ces quatre opérations se cumulerait sur l’ensemble des enregistrements à traiter. Dans une topologie multi-datacenters, les LWT requérant nécessairement une niveau de consistance QUORUM (une majorité de réplicas doivent acquitter la bonne écriture de la donnée) la lenteur de traitement se ferait d’autant plus ressentir.

Stocker séparément les données envoyées en streaming et celles envoyées par batch

L’idée est de séparer les données de streaming et de batch au niveau du stockage, en utilisant par exemple deux tables séparées ou mieux encore un champ d’une unique table constituant une clé de clustering. Celui-ci pourrait alors être valorisé à ‘BATCH’ ou ‘STREAMING’ selon le canal par lequel la donnée arrive (on aurait donc au maximum deux lignes pour une même donnée). Chaque flux peut dans ce cas mettre à jour ses données sans risque de collision.

Charge alors au service appelant de déterminer lequel des deux enregistrements retourner en comparant les dates de modification associées (et non les dates d’écriture en base).

  • Avantage : totale transparence au niveau de l’alimentation de la base de données, quel que soit le mode.
  • Inconvénient : augmentation potentiellement significative du volume de stockage (au pire x2), et requiert l’implémentation d’une logique (triviale) côté client.

Forcer le timestamp des enregistrements

Chaque valeur de n’importe quelle table dans Cassandra possède un horodatage. Par défaut, cet horodatage est valorisé avec la date de dernière modification (renseignée automatiquement par le driver Cassandra par exemple). Par exemple :

INSERT INTO my_table(c1, c2) values (1, 1);
SELECT c1, c2, WRITETIME (c1), WRITETIME (c2) FROM my_table;

c1 | c2 | writetime(c1) | writetime(c2)
1  | 1  | 1235846791    | 1235846791

UPDATE my_table SET c1 = 2;
SELECT c1, c2, WRITETIME (c1), WRITETIME (c2) FROM my_table;

c1 | c2 | writetime(c1) | writetime(c2)
2  | 1  | 1315647123    | 1235846791

Il est cependant possible de renseigner explicitement la valeur de ce champ avec la syntaxe suivante :

INSERT INTO my_table(c1, c2) values (1, 1);
UPDATE my_table SET c1 = 2 USING TIMESTAMP 1527710651;
SELECT c1, c2, WRITETIME (c1), WRITETIME (c2) FROM my_table;

c1 | c2 | writetime(c1) | writetime(c2)
2  | 1  | 1527710651    | 1235846791

Dans tous les cas de figure, quel que soit l’ordre dans lequel les opérations d’écriture sont effectuées, Cassandra conserve toujours uniquement les données dont la valeur d’horodatage est la plus récente.

L’idée serait donc que chaque source émettrice de données transmette, que ce soit en batch ou en streaming, la date “fonctionnelle” de modification d’une donnée (l’heure exacte à laquelle la donnée a effectivement été modifiée par un utilisateur dans une application amont) et que cette date “fonctionnelle” soit utilisée comme valeur d’horodatage en base : quel que soit le mode d’alimentation, quel que soit l’ordre dans lequel les données sont effectivement écrites en base (par exemple si un batch quotidien rencontre un problème et s’arrête et que de nouvelles mises à jour temps réel sont réalisées avant que le batch en question ne soit rejoué), et même en cas d’écritures “simultanées”, Cassandra conservera uniquement la donnée ayant l’horodatage le plus récent. À noter qu'en cas de conflit entre deux timestamps (identiques jusqu'à la microseconde), Cassandra effectuera une réconciliation champ par champ (la plus grande valeur l'emportant).

  • Avantage : transparent, utilisation d’un mécanisme de base de Cassandra.
  • Inconvénient : tout repose sur la date “fonctionnelle”, il faut donc garantir que les systèmes amonts la génèrent et la transmettent correctement.

Conclusion

À travers cet article, nous vous avons proposé différentes façons de gérer l’écriture concurrente dans une base Cassandra utilisée comme base de données de la couche service d’une architecture lambda. Comme toujours il vous reviendra d’apprécier la solution la plus adaptée à vos besoins et contraintes, notamment par rapport à des critères de performance et de fraîcheur de donnée attendue.

Références