Delta Lake & CDC : Walkthrough

L’opération merge de l’API Delta Lake, qui combine les opérations INSERT, UPDATE et DELETE, permet de réduire sensiblement votre effort cognitif lorsque vous avez besoin de développer un pipeline d’alimentation d’une table Delta Lake par CDC (Change Data Capture). Néanmoins, tout n’est pas aussi simple que la documentation le laisse penser.

Le but de cet article est de vous montrer à travers des exemples de code (que vous pouvez très facilement exécuter sur votre poste), les difficultés auxquelles vous allez être confrontés en implémentant ce genre de pipeline et vous donner des solutions éprouvées pour les surmonter.

Setup

La seule chose dont vous avez besoin pour exécuter le code des exemples est une installation de Spark 3.0 sur votre poste.

Dans cette démonstration, on va utiliser spark-shell, l'interpréteur de commandes Spark en Scala. Les résultats de nos opérations, c'est-à-dire le contenu de la table Delta Lake, seront affichés à l'écran. Le style du code a été adapté pour spark-shell qui l'exécute à la volée, ligne par ligne.

Bien évidemment, pour le code destiné à la production, la logique implémentée devra être validée par des tests automatisés. Pour les projets Spark en Scala, je vous conseille vivement d’utiliser la librairie ScalaTest.

Après avoir téléchargé et décompressé Spark 3.0, vous pouvez lancer spark-shell avec la commande suivante :

./bin/spark-shell \
--packages io.delta:delta-core_2.12:0.7.0 \
--conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension \
--conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog

Comme vous pouvez le voir, on inclut la librairie Delta Lake et on définit deux paramètres de configuration. Ces deux paramètres ne sont pas indispensables pour l’utilisation de Delta Lake mais permettent d’utiliser plus de fonctionnalités de l’API, notamment la possibilité d’exécuter la commande delete sur la table.

Commençons par ajouter les imports nécessaires et créer quelques fonctions utilitaires que nous allons utiliser dans les exemples :

import io.delta.tables.DeltaTable
import org.apache.spark.sql.Row
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import spark.implicits._

val tablePath = "/tmp/delta/my_table"

val initEmptyTable = (schema: StructType) => {
  spark
    .createDataFrame(spark.sparkContext.emptyRDD[Row], schema)
    .write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .save(tablePath)
}

val deleteTable = () => DeltaTable.forPath(tablePath).delete

val showTable = () => DeltaTable.forPath(tablePath).toDF.show

Pour vérifier nos implémentations, utilisons la méthode test qui :

  • supprime le contenu de la table,
  • appelle la fonction merge sur chaque batch en leur appliquant la fonction transform préalablement,
  • affiche le contenu de la table.
def test(merge: DataFrame => Unit, transform: DataFrame => DataFrame = x => x)(batches: DataFrame*): Unit = {
  deleteTable()
  batches.foreach{events => merge(transform(events))}
  showTable()
}

Les enregistrements qui représentent les modifications CDC seront dénommés events.

Pour nos exemples, prenons des events composés de quatre colonnes :

  • time - date et temps où la modification s’est produite,
  • type - type de modification (INSERT, UPDATE ou DELETE),
  • id - identifiant unique de la donnée modifiée (et surtout pas celui de l’event),
  • value - une valeur métier.

Pour la simplicité, la colonne time est de type Int et initialisée à zéro. Dans un cas réel, elle serait plutôt de type Timestamp.

Créons cinq events regroupés dans trois batches :

val eventColumns = List("time", "type", "id", "value")

val events1 = List(
  (0, "INSERT", "A", "inserted")
).toDF(eventColumns: _*)

val events2 = List(
  (1, "UPDATE", "A", "updated"),
  (2, "INSERT", "B", "inserted"),
).toDF(eventColumns: _*)

val events3 = List(
  (3, "UPDATE", "A", "updated 2nd time"),
  (4, "DELETE", "B", null)
).toDF(eventColumns: _*)

Ces events nous permettront d’alimenter la table Delta Lake avec trois colonnes : id, value et updated_at. Voici son schéma :

val tableSchema = StructType.fromDDL("id STRING, value STRING, updated_at INT")

Approche naïve

Implémentons la première version de notre fonction merge en tenant compte des trois types d’events :

val mergeV1 = (events: DataFrame) => {
  DeltaTable
    .forPath(tablePath)
    .as("table")
    .merge(events.as("event"), "table.id = event.id")
    .whenNotMatched("event.type = 'INSERT'")
    .insert(Map(
      "id" -> $"event.id",
      "value" -> $"event.value",
      "updated_at" -> $"event.time"
    ))
    .whenMatched("event.type = 'UPDATE' AND table.updated_at < event.time")
    .update(Map(
      "value" -> $"event.value",
      "updated_at" -> $"event.time"
    ))
    .whenMatched("event.type = 'DELETE' AND table.updated_at < event.time")
    .delete
    .execute
}

Testons cette implémentation en ingérant nos trois batches dans l’ordre :

initEmptyTable(tableSchema)

test(mergeV1)(events1, events2, events3)
//+---+----------------+----------+
//| id|           value|updated_at|
//+---+----------------+----------+
//|  A|updated 2nd time|         3|
//+---+----------------+----------+

Le résultat correspond parfaitement à ce qu'on attendait :

  • La ligne A contient la toute dernière mise à jour,
  • La ligne B n’existe plus car supprimée par le DELETE.

Imaginons maintenant que le système qui transmet les events ne respecte pas le principe exactly once et qu’il y ait des doublons des events de type INSERT dans le premier batch. Pour tester, faisons union du batch events1 avec lui-même :

test(mergeV1)(events1.union(events1), events2, events3)
//+---+----------------+----------+
//| id|           value|updated_at|
//+---+----------------+----------+
//|  A|updated 2nd time|         3|
//|  A|updated 2nd time|         3|
//+---+----------------+----------+

On constate que les deux lignes ont été insérées et mises à jour.

Regardons maintenant ce qui se passe si on regroupe les events2 et events3 dans un seul batch, nous aurons donc deux UPDATE pour la ligne A dans le même batch :

test(mergeV1)(events1, events2.union(events3))
//java.lang.UnsupportedOperationException: Cannot perform MERGE as multiple source rows matched and attempted to update the same
//target row in the Delta table. By SQL semantics of merge, when multiple source rows match
//on the same target row, the update operation is ambiguous as it is unclear which source
//should be used to update the matching target row.
//You can preprocess the source table to eliminate the possibility of multiple matches.

Une exception est levée, dont le message nous explique qu'il y a une ambiguïté car le batch contient de multiples UPDATE pour la même ligne.

Regardons ensuite ce qui se passe si tous les cinq events sont regroupés dans le même batch :

test(mergeV1)(events1.union(events2).union(events3))
//+---+--------+----------+
//| id|   value|updated_at|
//+---+--------+----------+
//|  A|inserted|         0|
//|  B|inserted|         2|
//+---+--------+----------+

Etonnamment, il n'y a plus d'exception, mais le résultat n'est pas du tout celui qu'on souhaite. On voit que les events UPDATE et DELETE ont été ignorés.

Comme vous pouvez le constater, nous sommes très rapidement arrivés à des limites de notre approche naïve, qui ne sera sans doute pas satisfaisante pour un cas réel.

La règle est simple : lorsqu'il y a plusieurs events avec le même id (c'est-à-dire plusieurs modifications de la même donnée), il y a une ambiguïté que le merge ne peut pas résoudre. C'est donc à vous d'implémenter la transformation pour consolider les events avant le merge en respectant vos règles métier spécifiques.

Si le dernier event suffit

Dans le cas où chaque UPDATE porte l'état complet de la donnée qu'on souhaite persister, on peut tout simplement filtrer les events en ne gardant que le dernier pour chaque id et considérer un UPDATE comme un INSERT si la donnée n'existe pas dans la table.

Pour cela il faut modifier légèrement notre fonction merge, plus précisément la condition dans whenNotMatched() :

val mergeV2 = (events: DataFrame) => {
  DeltaTable
    .forPath(tablePath)
    .as("table")
    .merge(events.as("event"), "table.id = event.id")
    .whenNotMatched("event.type IN ('INSERT', 'UPDATE')")
    .insert(Map(
      "id" -> $"event.id",
      "value" -> $"event.value",
      "updated_at" -> $"event.time"
    ))
    .whenMatched("event.type = 'UPDATE' AND table.updated_at < event.time")
    .update(Map(
      "value" -> $"event.value",
      "updated_at" -> $"event.time"
    ))
    .whenMatched("event.type = 'DELETE' AND table.updated_at < event.time")
    .delete
    .execute
}

Ensuite, on utilise une window function pour filtrer les events en ne gardant que le dernier pour chaque id :

val latestEventsV1 = (events: DataFrame) => {
  events
    .withColumn("n", row_number over Window.partitionBy("id").orderBy($"time".desc))
    .where($"n" === 1)
    .drop("n")
}

Désormais, peu importe la manière dont les events sont groupés dans les batches, le résultat sera toujours le même tant que les batches sont ingérés dans le bon ordre :

test(mergeV2, latestEventsV1)(events1, events2, events3)
//+---+----------------+----------+
//| id|           value|updated_at|
//+---+----------------+----------+
//|  A|updated 2nd time|         3|
//+---+----------------+----------+

test(mergeV2, latestEventsV1)(events1, events2.union(events3))
//+---+----------------+----------+
//| id|           value|updated_at|
//+---+----------------+----------+
//|  A|updated 2nd time|         3|
//+---+----------------+----------+

test(mergeV2, latestEventsV1)(events1.union(events2).union(events3))
//+---+----------------+----------+
//| id|           value|updated_at|
//+---+----------------+----------+
//|  A|updated 2nd time|         3|
//+---+----------------+----------+

La documentation Databricks propose une autre technique qui devrait être équivalente en termes de performance (un seul shuffle dans les deux cas) :

val latestEventsV2 = (events: DataFrame) => {
  events
    .select($"id", struct("time", "type", "value") as "other_cols")
    .groupBy("id")
    .agg(max("other_cols").as("latest"))
    .select($"id", $"latest.*")
}

Je vous laisse tester cette fonction, le résultat sera identique.

Néanmoins, je trouve cette technique moins naturelle et moins générique car la fonction doit "connaître" toutes les colonnes du DataFrame à filtrer et non seulement la colonne à rendre unique (id) et la colonne de tri (time).

L’ordre des batches compte

Il est évident que l’event DELETE de la ligne B ne sera pas pris en compte si on ingère les batches dans l'ordre inverse :

test(mergeV2, latestEventsV1)(events3, events2, events1)
//+---+----------------+----------+
//| id|           value|updated_at|
//+---+----------------+----------+
//|  A|updated 2nd time|         3|
//|  B|        inserted|         2|
//+---+----------------+----------+

Le même problème se posera si on ré-ingère le 2ème batch après avoir ingéré les trois :

test(mergeV2, latestEventsV1)(events1, events2, events3, events2)
//+---+----------------+----------+
//| id|           value|updated_at|
//+---+----------------+----------+
//|  A|updated 2nd time|         3|
//|  B|        inserted|         2|
//+---+----------------+----------+

Dans le cas où le workflow ne peut pas garantir que les batches sont ingérés dans l'ordre, on est obligé de garder une trace de la donnée supprimée dans la table. On peut alors considérer le DELETE comme un UPDATE ou un INSERT qui met toutes les colonnes à null, sauf id et updated_at, et le flag deleted à true. Voici à quoi va ressembler la fonction merge dans ce cas là :

val mergeV3 = (events: DataFrame) => {
  DeltaTable
    .forPath(tablePath)
    .as("table")
    .merge(events.as("event"), "table.id = event.id")
    .whenNotMatched()
    .insert(Map(
      "id" -> $"event.id",
      "value" -> when($"event.type" === "DELETE", null).otherwise($"event.value"),
      "updated_at" -> $"event.time",
      "deleted"-> when($"event.type" === "DELETE",true).otherwise(false)
    ))
    .whenMatched("event.type IN ('UPDATE', 'DELETE') AND table.updated_at < event.time")
    .update(Map(
      "value" -> when($"event.type" === "DELETE", null).otherwise($"event.value"),
      "updated_at" -> $"event.time",
      "deleted"-> when($"event.type" === "DELETE",true).otherwise(false)
    ))
    .execute
}

Les tests montrent que la manière dont les events sont groupés dans les batches et l’ordre dans lequel les batches sont ingérés n’ont plus d’importance :

initEmptyTable(tableSchema.add("deleted", BooleanType))

test(mergeV3, latestEventsV1)(events3, events2, events1)
//+---+----------------+----------+-------+
//| id|           value|updated_at|deleted|
//+---+----------------+----------+-------+
//|  A|updated 2nd time|         3|  false|
//|  B|            null|         4|   true|
//+---+----------------+----------+-------+

test(mergeV3, latestEventsV1)(events1, events2, events3)
//+---+----------------+----------+-------+
//| id|           value|updated_at|deleted|
//+---+----------------+----------+-------+
//|  A|updated 2nd time|         3|  false|
//|  B|            null|         4|   true|
//+---+----------------+----------+-------+

test(mergeV3, latestEventsV1)(events1.union(events2).union(events3))
//+---+----------------+----------+-------+
//| id|           value|updated_at|deleted|
//+---+----------------+----------+-------+
//|  A|updated 2nd time|         3|  false|
//|  B|            null|         4|   true|
//+---+----------------+----------+-------+

Cette approche est tout de même très contraignante car nécessite d'ajouter le filtre WHERE deleted = false dans toutes les requêtes qui vont interroger cette table.

Il vaut mieux élaborer le workflow qui garantit que les batches sont ingérés dans le bon ordre. On peut choisir parmi deux approches :

  • Le batch N ne peut être ingéré qu’une seule fois et uniquement si le batch N-1 a été ingéré avec succès ;
  • Chaque (ré)ingestion du batch N implique la (ré)ingestion de tous les batches suivants.

Merge avant merge

Jusqu'ici, tout a été plutôt simple. Les choses se compliquent lorsque nous avons besoin de persister des informations qui ne sont présentes que dans les events de type INSERT. Dans ce cas, il ne suffit plus de garder juste le dernier event pour chaque id. Il est donc nécessaire d'implémenter une transformation qui consolidera les events avant le merge dans la table Delta Lake.

Par exemple, on souhaite ajouter la colonne created_at qui stockera le time de l'INSERT afin de savoir à quel moment la donnée a été créée.

val mergeEvents = (events: DataFrame) => {
  val windowById = Window.partitionBy("id").orderBy($"time".desc)

  val isUpsert = $"type" === "UPDATE" and $"previous_event.type" === "INSERT"

  events
    .withColumn(
      "row_number",
      row_number over Window.partitionBy("id", "type").orderBy($"time".desc)
    )
    .where($"row_number" === 1)
    .drop("row_number")
    .select(
      $"*",
      row_number over windowById as "row_number",
      lag(struct($"*"), -1, null) over windowById as "previous_event"
    )
    .where(col("row_number") === 1)
    .select(
      $"id",
      $"value",
      $"time" as "updated_at",
      when(isUpsert, $"previous_event.time") otherwise $"time" as "created_at",
      when(isUpsert, "UPSERT") otherwise $"type" as "type"
    )
}

Cette transformation filtre d’abord les events pour ne garder que les plus récents pour chaque id et type (donc jusqu’à trois events par id). Ensuite, elle ne garde que le dernier event pour chaque id en transformant chaque UPDATE précédé par un INSERT en UPSERT dont la valeur de created_at letimede l’INSERT.

La fonction merge doit être à nouveau adaptée pour prendre en compte la nouvelle colonne created_at et le nouveau type d’event UPSERT :

val mergeV4 = (events: DataFrame) => {
  DeltaTable
  .forPath(tablePath)
  .as("table")
  .merge(events.as("event"), "table.id = event.id")
  .whenNotMatched("event.type IN ('INSERT', 'UPSERT')")
  .insert(Map(
    "id" -> $"event.id",
    "value" -> $"event.value",
    "updated_at" -> $"event.updated_at",
    "created_at" -> $"event.created_at"
  ))
  .whenMatched("event.type IN ('UPDATE','UPSERT') AND table.updated_at < event.updated_at")
  .update(Map(
    "value" -> $"event.value",
    "updated_at" -> $"event.updated_at"
  ))
  .whenMatched("event.type = 'DELETE' AND table.updated_at < event.updated_at")
  .delete
  .execute
}

Comme d’habitude, ne me croyez pas sur parole, testez par vous même :

initEmptyTable(tableSchema.add("created_at", IntegerType))

test(mergeV4, mergeEvents)(events1, events2, events3)
//+---+----------------+----------+----------+
//| id|           value|updated_at|created_at|
//+---+----------------+----------+----------+
//|  A|updated 2nd time|         3|         0|
//+---+----------------+----------+----------+

test(mergeV4, mergeEvents)(events1, events2.union(events3))
//+---+----------------+----------+----------+
//| id|           value|updated_at|created_at|
//+---+----------------+----------+----------+
//|  A|updated 2nd time|         3|         0|
//+---+----------------+----------+----------+

test(mergeV4, mergeEvents)(events1.union(events2).union(events3))
//+---+----------------+----------+----------+
//| id|           value|updated_at|created_at|
//+---+----------------+----------+----------+
//|  A|updated 2nd time|         3|         0|
//+---+----------------+----------+----------+

Puisqu'on a déjà implémenté le merge des events avec des transformations de DataFrame, il est très simple de créer une fonction qui va régénérer la table à partir de tous les events cumulés depuis la création de la table source :

val overwriteTable = (events: DataFrame) => {
  mergeEvents(events)
    .where($"type" === "INSERT" or $"type" === "UPSERT")
    .drop("type")
    .write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .save(tablePath)
}

overwriteTable(events1.union(events2).union(events3))
showTable()
//+---+----------------+----------+----------+
//| id|           value|updated_at|created_at|
//+---+----------------+----------+----------+
//|  A|updated 2nd time|         3|         0|
//+---+----------------+----------+----------+

Il est donc intéressant de sauvegarder l'ensemble des events en guise de données brutes. Cela permet de recréer la table à tout moment suite à des modifications de la pipeline, comme :

  • l'ajout de nouvelles colonnes qui n'étaient pas prises en compte auparavant,
  • toutes sortes de transformations sur les colonnes existantes (changement de type, parsing des dates, anonymisation, etc.).

Si vous ne disposez pas de tous les events produits depuis la création de la table source, il est judicieux de créer et de sauvegarder un snapshot de cette table sous forme d’events de type INSERT dont la valeur de colonne time correspond au moment de la création du snapshot. Ces events représentent donc l'état des données à l'instant T qui sont complétés par des events qui arrivent au fil de l'eau. Cette approche permet de régénérer la table Delta Lake à tout moment sans refaire une copie de la table source (souvent dans une BDD opérationnelle).

Conclusion

L'opération merge de l’API Delta Lake, tout en vous simplifiant la vie, ne va pas vous mettre au chômage pour autant. Une bonne maîtrise des transformations Spark, de vos règles métier spécifiques et des outils d’orchestration est toujours nécessaire pour concevoir des pipelines d’ingestion fiables et performantes.