Mise en place d'une architecture Data Lakehouse conforme au RGPD avec Apache Iceberg sur AWS

Dans le cadre du Règlement Général sur la Protection des Données (RGPD), chaque client d’une entreprise a le droit de demander la suppression de ses données personnelles - sous réserve des obligations légales - et les entreprises ont l'obligation de donner suite à cette demande. Ce principe, appelé  “droit à l’oubli”, peut être difficile à mettre en place pour les entreprises, en particulier lorsqu’elles possèdent des Data Lakes avec d'énormes quantités de données, souvent dispersées à travers différentes tables, fichiers et formats au sein du même Data Lake.

L’utilisation d’un Lakehouse avec Iceberg s’avère très utile dans ce cas, notamment car Iceberg propose des fonctionnalités avancées pour la suppression des données. Avec AWS, il est possible de mettre en place une architecture robuste pour gérer les suppressions de données personnelles.

Dans cet article, nous verrons d'abord ce qu’implique le droit à l'oubli et comment Iceberg et AWS nous aident à y répondre. Nous verrons ensuite l'implémentation concrète d’une solution, illustrée par des exemples de code.

Si vous n'êtes pas encore familier avec Iceberg, j'ai rédigé un article dédié que vous pouvez consulter ici.

💡 Pressé de découvrir la solution ? Rendez-vous directement à la section Implémentation !

Comprendre le droit à l'oubli

L’Union Européenne a mis en place le Règlement Général sur la Protection des Données (RGPD) qui vise à protéger les données personnelles des citoyens. Un des principes fondamentaux du RGPD est le droit à l’oubli. Lorsqu’un citoyen demande à une entreprise de supprimer ses données, elle se doit de le faire dans un délai de 30 jours maximum.

Cette obligation légale pose plusieurs défis aux entreprises :

  • retrouver toutes les données d'un utilisateur dans les différents systèmes,
  • supprimer ces données de manière définitive et sans en oublier aucune,
  • garder le système performant pendant ces opérations.

La difficulté est d'autant plus grande lorsque les données d'un utilisateur sont dispersées :

  • dans différentes tables (profil, commandes, interactions...),
  • dans différents formats (structurés et non structurés),
  • dans différents systèmes (production, analytics, backup...).

C’est ici que Iceberg intervient.


Les fonctionnalités clés d'Iceberg pour le RGPD

Plusieurs fonctionnalités d'Iceberg sont intéressantes pour gérer le droit à l'oubli.

Différents modes d'écriture (COW et MOR)

Iceberg propose deux modes d'écriture : COW (copy-on-write) et MOR (merge-on-read). Ce sont deux façons différentes d’écrire les fichiers de données. 

Le Copy-On-Write, qui est le mode par défaut des tables Iceberg, consiste à réécrire un fichier lorsqu’une transaction a lieu. Par exemple, lors d’une opération DELETE, un nouveau fichier va être créé sans la donnée supprimée. L’ancien fichier, qui contient l’ancienne donnée, correspond maintenant au snapshot précédent. La couche de catalogue pointera alors vers le nouveau fichier pour que l'utilisateur puisse accéder à la donnée à jour. Le COW est plus avantageux pour les opérations de lecture, en effet il permet au catalogue d’accéder directement au dernier snapshot. Toutefois lorsque des opérations de suppression ou modification doivent être réalisées fréquemment, les performances peuvent être réduites en raison de la réécriture de nombreux fichiers.

Figure 1 : Schéma du fonctionnement du Copy-On-Write (COW)
Image avec Titre Centré
Figure 1 : Schéma du fonctionnement du Copy-On-Write (COW)

Le Merge-on-Read, quant à lui, ne touche pas aux fichiers de données. Il crée plutôt un nouveau fichier, que l’on appelle delete file. Ce fichier contient l’emplacement de la donnée à supprimer, c'est-à-dire le nom du fichier dans lequel la donnée à supprimer est présente mais également sa position dans le fichier. Ces delete files vont permettre, lors des suppressions, de ne pas réécrire directement les fichiers contenant les données d’un utilisateur à supprimer. Cela permet ainsi de ne pas affecter les performances du système lorsqu’une grande quantité de fichiers est impactée par des suppressions et doivent donc être toutes réécrite. Ce mode est très utile pour des opérations d’écritures fréquentes mais peu pour de la lecture car elle implique de fusionner les fichiers de données avec leur delete file pour que l'utilisateur puisse avoir les données à jour.

Image avec Titre Centré
Figure 2 : Schéma du fonctionnement du Merge-On-Read (MOR)


Le choix du mode d’écriture dépend des besoins de chacun. Il n’y a pas de mode meilleur qu’un autre. Le COW est intéressant pour les tables avec des lectures fréquentes tandis que le MOR convient mieux aux cas de modifications régulières. Toutefois, pour garantir pleinement le respect du droit à l'oubli, le mode d'écriture seul n'est pas suffisant. D'autres étapes sont nécessaires, à commencer par la compaction.


Compaction

L’opération qui permet de fusionner physiquement les delete files avec les fichiers de données auxquels ils sont rattachés est appelée Compaction. Cette fonctionnalité permet de prendre en compte la suppression des données au niveau des fichiers (et pas seulement à la lecture de la table) mais également d’éviter de se retrouver avec une multitude de petits fichiers, optimisant ainsi nos requêtes. En effet, en fusionnant les petits fichiers en fichiers plus volumineux, cela réduit le nombre d'opérations d'accès aux fichiers lors des requêtes. Cette fonctionnalité est très utile dans notre cas d’usage RGPD pour s’assurer que le fichier de données soit bien réécrit sans les données à supprimer.

Image avec Titre Centré
Figure 3 : Processus de Compaction dans un Lakehouse

Cleaning

Iceberg conserve l’historique des données grâce à sa fonctionnalité de Time Travel. Pour être en totale conformité avec le RGPD, Iceberg, grâce à l’expiration des snapshots, permet de supprimer toutes les anciennes versions d’un fichier. Ainsi, il est possible de conserver seulement les versions les plus récentes des fichiers de données. On est alors sûr qu’aucune trace des données d’un utilisateur supprimé ne soit encore présente.

Image avec Titre Centré
Figure 4 : Processus de Cleaning dans un Lakehouse

Proposition d’architecture sur AWS

Image avec Titre Centré
Figure 5 : Architecture AWS pour la gestion du droit à l'oubli

Pour mettre en place une solution RGPD pour le droit à l’oubli avec Iceberg sur AWS, nous allons utiliser :

  • S3 : stockage des tables Iceberg (données et métadonnées),
  • Glue : exécution du job de suppression de données,
  • Lake Formation : couche de sécurité pour gérer les permissions d'accès aux données,
  • Terraform : outil d'Infrastructure as Code pour déployer et maintenir notre architecture AWS.

Pour illustrer la mise en place de cette architecture, prenons l'exemple d'un service de streaming vidéo.

Deux types de données personnelles sont stockés :

  • les informations de profil des utilisateurs (email, nom, date de naissance, pays...),
  • l'historique de visionnage (contenu regardé, durée, appareil utilisé, adresse IP...).

Dans notre exemple, nous allons suivre le cas de Marie qui décide de résilier son abonnement. En plus de la résiliation, elle demande que toutes ses données personnelles soient supprimées de nos systèmes. Notre architecture doit donc :

  • supprimer son profil utilisateur,
  • effacer tout son historique de visionnage,
  • vérifier que la suppression est complète.

Voici comment va fonctionner notre architecture de suppression : chaque mois, un job Glue se déclenche et lit les utilisateurs à supprimer. Il procède alors à la suppression des données correspondantes dans nos tables Iceberg stockées sur S3. Une fois les suppressions effectuées, nous pouvons utiliser Athena pour vérifier que nos tables sont bien à jour et ne contiennent plus les données des utilisateurs concernés.


Implémentation de la solution

Voyons maintenant comment implémenter concrètement notre solution. Nous allons au préalable mettre tout cela en place avec Terraform.

Étape 1 : Création d’un bucket S3

Tout d’abord il est nécessaire d’avoir un bucket S3 pour stocker les données.

locals {
  iceberg-bucket-name = "iceberg-gdpr-streaming-data"
}

resource "aws_s3_bucket" "iceberg_data" {
  bucket = local.iceberg-bucket-name

  tags = {
    Name        = "Iceberg bucket"
  }
}

Étape 2 : Création de la base de donnée

Créons ensuite la base de données qui contiendra nos tables :

resource "aws_glue_catalog_database" "streaming_db" {
  name = "streaming_db"
  description = "Database for streaming service data with Iceberg tables"
}

Étape 3 : Création des rôles

Nous allons avoir besoin d’un rôle permettant à nos jobs Glue d’accéder à S3, Glue et Lake Formation.

resource "aws_iam_role" "glue_delete_data_role" {
  name = "glue-delete-data-role"
  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Action = "sts:AssumeRole"
        Effect = "Allow"
        Principal = {
          Service = "glue.amazonaws.com"
        }
      }
    ]
  })
}

resource "aws_iam_role_policy" "glue_permissions" {
  name = "glue-permissions"
  role = aws_iam_role.glue_delete_data_role.id
  
  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Effect = "Allow"
        Action = [
          "glue:*",
          "lakeformation:*",
          "s3:*"
        ]
        Resource = ["*"]
      },
      {
        Effect = "Allow"
        Action = [
          "iam:PassRole"
        ]
        Resource = [aws_iam_role.glue_delete_data_role.arn]
      }
    ]
  })
}

resource "aws_iam_role_policy_attachment" "glue_service" {
  role       = aws_iam_role.glue_delete_data_role.name
  policy_arn = "arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole"
}

Étape 4 : Configuration des permissions dans Lake Formation

Lake Formation permet d’ajouter une couche de sécurité à notre Lakehouse afin de sécuriser davantage notre table en plus des permissions IAM. Pour cela, nous devons définir quels rôles peuvent réaliser quelles actions sur notre base de données et ses tables.

Dans notre cas, deux rôles nécessitent des permissions Lake Formation :

  1. Le rôle "glue-delete-data-role" : utilisé par nos jobs Glue pour créer les tables Iceberg et supprimer les données. Il a besoin des permissions complètes sur la base de données et les tables.
  2. le rôle AWS SSO permet d'exécuter des requêtes Athena avec les permissions SELECT et DELETE sur la base et les tables.

La configuration de Lake Formation commence par établir des administrateurs Lake Formation. Ces administrateurs possèdent les droits pour configurer et gérer toutes les permissions du Data Lake. Ils peuvent accéder à toutes les données mais aussi déléguer des permissions à d'autres rôles.

resource "aws_lakeformation_data_lake_settings" "admins" {
  admins = [
            aws_iam_role.glue_delete_data_role.arn,
            "arn:aws:iam::${var.account_id}:role/aws-reserved/sso.amazonaws.com/${var.region}/AWSReservedSSO_VotreRoleSSO_XXXXX"  # Votre rôle SSO personnel
            ]

}

Au niveau de la base de données, nous définissons des permissions comme CREATE_TABLE pour permettre la création de nouvelles tables, DROP pour la suppression.

# Permissions au niveau de la base de données pour le rôle SSO personnel
resource "aws_lakeformation_permissions" "glue_database_admin" {
 principal = "arn:aws:iam::${var.account_id}:role/aws-reserved/sso.amazonaws.com/${var.region}/AWSReservedSSO_VotreRoleSSO_XXXXX"  # Votre rôle SSO personnel
 permissions = ["CREATE_TABLE", "DROP", "DESCRIBE", "ALTER"]
 database {
   name = aws_glue_catalog_database.streaming_db.name
 }
}


# Permissions au niveau de la base de données pour le rôle Glue
resource "aws_lakeformation_permissions" "glue_database_job" {
 principal = aws_iam_role.glue_delete_data_role.arn
 permissions = ["CREATE_TABLE","DROP"]
 database {
   name = aws_glue_catalog_database.streaming_db.name
 }
}

Au niveau des tables, nous pouvons être encore plus précis avec des permissions comme SELECT pour la lecture des données, INSERT pour l'ajout de nouvelles données ou DELETE pour la suppression de données. Ces permissions sont importantes pour les services qui consomment nos données comme Athena pour les requêtes SQL.

# Permissions au niveau des tables pour le rôle Glue
resource "aws_lakeformation_permissions" "glue_tables_job" {
 principal = aws_iam_role.glue_delete_data_role.arn
 permissions = ["DELETE"]
 table {
   database_name = aws_glue_catalog_database.streaming_db.name
   wildcard = true  
 }
}


# Permissions au niveau des tables pour le rôle SSO personnel
resource "aws_lakeformation_permissions" "glue_tables_admin" {
 principal = "arn:aws:iam::${var.account_id}:role/aws-reserved/sso.amazonaws.com/${var.region}/AWSReservedSSO_VotreRoleSSO_XXXXX"  
 permissions = ["SELECT", "INSERT", "DELETE", "DESCRIBE", "ALTER", "DROP"]
 table {
   database_name = aws_glue_catalog_database.streaming_db.name
   wildcard = true 
 }
}

Étape 5 : Création des tables Iceberg

La création des tables Iceberg se réalise via un job Glue. Quel est l'intérêt d’utiliser un job ? Tout simplement car Terraform n’est pas capable de créer des tables Iceberg quand on leur spécifie un partitionnement. (cf https://github.com/hashicorp/terraform-provider-aws/issues/36531)

Nous allons donc devoir écrire un script avec Spark SQL pour créer nos tables : 

import sys
from awsglue.utils import getResolvedOptions
from pyspark.sql.session import SparkSession


args = getResolvedOptions(sys.argv, ["JOB_NAME", "S3_BUCKET_NAME"])

spark = (
    SparkSession.builder
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .config("spark.sql.catalog.iceberg_catalog", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.iceberg_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog")
    .config("spark.sql.catalog.iceberg_catalog.warehouse", f"s3://{args['S3_BUCKET_NAME']}/")
    .config("spark.sql.catalog.glue.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
    .config("spark.sql.iceberg.handle-timestamp-without-timezone", "true")
    .config("spark.sql.iceberg.use-timestamp-without-timezone-in-new-tables", "true")
    .config("spark.sql.parquet.int96AsTimestamp", "true")
    .config("spark.sql.defaultCatalog", "iceberg_catalog")
    .getOrCreate()
)

spark.sql("""
            CREATE TABLE streaming_db.users (
                user_id STRING,
                email STRING,
                name STRING,
                birth_date DATE,
                country STRING,
                subscription_type STRING,
                terms_accepted BOOLEAN,
                created_at TIMESTAMP,
                updated_at TIMESTAMP
            )
            USING iceberg
            PARTITIONED BY (years(created_at))
            TBLPROPERTIES (
                'write.format.default' = 'parquet',
                'write.parquet.compression-codec' = 'snappy',
                'write.delete.mode' = 'merge-on-read', 
                'format-version' = '2'
            )
        """).show(truncate=False)

spark.sql("""
            CREATE TABLE streaming_db.viewing_history (
                activity_id STRING,
                user_id STRING,
                content_id STRING,
                watch_duration_seconds INT,
                device_type STRING,
                ip_address STRING,
                started_at TIMESTAMP,
                ended_at TIMESTAMP,
                viewing_progress FLOAT
            )
            USING iceberg
            TBLPROPERTIES (
                'write.format.default' = 'parquet',
                'write.parquet.compression-codec' = 'snappy',
                'write.delete.mode' = 'merge-on-read',
                'format-version' = '2'
            )
        """).show(truncate=False)

Pour utiliser Iceberg, plusieurs configurations sont nécessaires dans la Spark Session :

  • "spark.sql.extensions" : active les fonctionnalités Iceberg dans Spark, permettant l'utilisation des requêtes comme MERGE INTO, DELETE FROM, UPDATE ou les procédures Spark,

  • "spark.sql.catalog.iceberg_catalog" : définit le type de catalogue utilisé par Iceberg pour gérer les métadonnées des tables,

  • "spark.sql.catalog.iceberg_catalog.catalog-impl" : spécifie l'utilisation du catalogue AWS Glue,

  • "spark.sql.catalog.iceberg_catalog.warehouse" : définit l'emplacement des données Iceberg dans S3,

  • "spark.sql.catalog.glue.io-impl" : configure l'utilisation de S3 comme système de stockage pour les opérations de lecture/écriture,

  • "spark.sql.iceberg.handle-timestamp-without-timezone" : active la gestion des timestamps sans fuseau horaire,

  • "spark.sql.iceberg.use-timestamp-without-timezone-in-new-tables" : force l'utilisation du format sans timezone pour les nouvelles tables,

  • "spark.sql.parquet.int96AsTimestamp" : garantit la compatibilité des timestamps avec le format Parquet,

  • "spark.sql.defaultCatalog" : définit Iceberg comme le catalogue par défaut.

Pour plus de configurations, consultez la documentation officielle d'Iceberg

Dans notre cas, nous utiliserons le mode d'écriture Merge-On-Read puis Copy-On-Write afin de présenter chacun de leur comportement.

Une fois que le script est prêt, vous pouvez le terraformer en utilisant le code suivant :

resource "aws_glue_workflow" "create-iceberg-tables-workflow" {
  name = "create-iceberg-tables-workflow"
}

resource "aws_s3_object" "create-iceberg-tables-script" {
  bucket = aws_s3_bucket.iceberg_data.id 
  key    = "scripts/create_iceberg_tables.py"
  source = "${path.module}/../scripts/create_iceberg_tables.py"
  etag = filemd5("${path.module}/../scripts/create_iceberg_tables.py")
}


resource "aws_glue_job" "create-iceberg-tables" {
  name         = "create-iceberg-tables"
  role_arn     = aws_iam_role.glue_delete_data_role.arn
  glue_version = "4.0"
  max_capacity = 10
  description  = "Glue job that create Iceberg Tables"
  timeout      = 1440
  max_retries  = 0

  default_arguments = {
    "--job-bookmark-option"              = "job-bookmark-disable"
    "--encryption-type"                  = "sse-s3"
    "--job-language"                     = "python"
    "--enable-metrics"                   = ""
    "--enable-continuous-cloudwatch-log" = "true"
    "--enable-continuous-log-filter"     = "true"
    "--S3_BUCKET_NAME"                   = aws_s3_bucket.iceberg_data.id
    "--enable-job-insights"              = "true"
    "--enable-spark-ui"                  = "true"
    "--datalake-formats"                 = "iceberg"
  }

  command {
    script_location = "s3://${aws_s3_bucket.iceberg_data.id}/scripts/create_iceberg_tables.py"
    python_version  = "3"
  }
}

Dans la configuration du job, n'oubliez pas d'inclure l'argument --datalake-formats sans quoi le job échouera car il ne pourra pas utiliser Iceberg. Si vous souhaitez utiliser des versions spécifiques d’Iceberg, vous pouvez consulter cette documentation.

Ensuite, vous pouvez exécuter terraform apply, lancer le job via la console AWS puis vérifier que vos tables ont bien été créées.

Votre bucket contient désormais les dossiers suivants : 

  • streaming_db.db/
    • users/
      • metadata/
        • file.metadata.json
    • viewing_history/
      • metadata/
        • file.metadata.json

Chaque fichier de métadonnées contient des informations sur les manifestes, les snapshots, le schéma des tables, les fichiers de données et leur emplacement dans S3.

Étape 6 : Ajout de données dans la table

Ajoutons de la donnée dans nos tables via Athena.

INSERT INTO users 
VALUES
('user-001', 'marie@email.com', 'Marie', DATE '1990-01-15', 'France', 'premium', true, TIMESTAMP '2023-01-01 10:00:00', TIMESTAMP '2023-01-01 10:00:00'),
('user-002', 'thomas@email.com', 'Thomas', DATE '1985-03-20', 'France', 'basic', true, TIMESTAMP '2023-02-01 11:00:00', TIMESTAMP '2023-02-01 11:00:00'),
('user-003', 'julie@email.com', 'Julie', DATE '1992-07-10', 'Belgium', 'premium', true, TIMESTAMP '2023-03-15 09:00:00', TIMESTAMP '2023-03-15 09:00:00'),
('user-004', 'alex@email.com', 'Alex', DATE '1988-12-05', 'Spain', 'basic', true, TIMESTAMP '2023-04-01 14:30:00', TIMESTAMP '2023-04-01 14:30:00'),
('user-005', 'sophia@email.com', 'Sophia', DATE '1995-06-25', 'Italy', 'premium', true, TIMESTAMP '2023-05-10 16:45:00', TIMESTAMP '2023-05-10 16:45:00'),
('user-006', 'lucas@email.com', 'Lucas', DATE '1987-09-30', 'France', 'basic', true, TIMESTAMP '2023-06-15 08:20:00', TIMESTAMP '2023-06-15 08:20:00'),
('user-007', 'emma@email.com', 'Emma', DATE '1993-04-12', 'Germany', 'premium', true, TIMESTAMP '2023-07-01 11:15:00', TIMESTAMP '2023-07-01 11:15:00'):
INSERT INTO viewing_history
VALUES
('activity-001', 'user-003', 'movie-001', 7200, 'smart_tv', '192.168.1.1', TIMESTAMP '2024-01-15 20:00:00', TIMESTAMP '2024-01-15 22:00:00', 1.0),
('activity-002', 'user-005', 'series-001', 1800, 'mobile', '192.168.1.1', TIMESTAMP '2024-01-16 09:00:00', TIMESTAMP '2024-01-16 09:30:00', 0.75),
('activity-003', 'user-009', 'movie-002', 5400, 'tablet', '192.168.1.1', TIMESTAMP '2024-01-16 14:00:00', TIMESTAMP '2024-01-16 15:30:00', 0.9),
('activity-004', 'user-003', 'series-002', 3600, 'laptop', '192.168.1.1', TIMESTAMP '2024-01-17 21:00:00', TIMESTAMP '2024-01-17 22:00:00', 1.0),
('activity-005', 'user-003', 'movie-003', 6300, 'smart_tv', '192.168.1.1', TIMESTAMP '2024-01-18 18:00:00', TIMESTAMP '2024-01-18 19:45:00', 0.85),
('activity-006', 'user-003', 'series-003', 2700, 'mobile', '192.168.1.1', TIMESTAMP '2024-01-19 12:00:00', TIMESTAMP '2024-01-19 12:45:00', 0.95),
('activity-007', 'user-005', 'movie-004', 8100, 'smart_tv', '192.168.1.1', TIMESTAMP '2024-01-20 20:30:00', TIMESTAMP '2024-01-20 22:45:00', 1.0),
('activity-008', 'user-006', 'series-004', 1500, 'tablet', '192.168.1.1', TIMESTAMP '2024-01-21 14:00:00', TIMESTAMP '2024-01-21 14:25:00', 0.6),
('activity-009', 'user-007', 'movie-005', 5800, 'laptop', '192.168.1.1', TIMESTAMP '2024-01-22 19:30:00', TIMESTAMP '2024-01-22 21:00:00', 0.88),
('activity-010', 'user-002', 'series-005', 3000, 'mobile', '192.168.1.1', TIMESTAMP '2024-01-23 17:00:00', TIMESTAMP '2024-01-23 17:50:00', 1.0),
('activity-011', 'user-001', 'movie-006', 7500, 'smart_tv', '192.168.1.1', TIMESTAMP '2024-01-24 20:00:00', TIMESTAMP '2024-01-24 22:05:00', 0.95),
('activity-012', 'user-001', 'series-006', 2100, 'tablet', '192.168.1.1', TIMESTAMP '2024-01-25 13:00:00', TIMESTAMP '2024-01-25 13:35:00', 0.8),
('activity-013', 'user-001', 'movie-007', 6000, 'laptop', '192.168.1.1', TIMESTAMP '2024-01-26 21:00:00', TIMESTAMP '2024-01-26 22:40:00', 1.0),
('activity-014', 'user-001', 'series-007', 2400, 'mobile', '192.168.1.1', TIMESTAMP '2024-01-27 16:00:00', TIMESTAMP '2024-01-27 16:40:00', 0.7),
('activity-015', 'user-001', 'movie-008', 7800, 'smart_tv', '192.168.1.1', TIMESTAMP '2024-02-01 19:00:00', TIMESTAMP '2024-02-01 21:10:00', 0.92),
('activity-016', 'user-001', 'series-008', 1800, 'tablet', '192.168.1.1', TIMESTAMP '2024-02-02 15:00:00', TIMESTAMP '2024-02-02 15:30:00', 1.0),
('activity-017', 'user-001', 'movie-009', 5400, 'laptop', '192.168.1.1', TIMESTAMP '2024-02-03 20:00:00', TIMESTAMP '2024-02-03 21:30:00', 0.85),
('activity-018', 'user-001', 'series-009', 3300, 'mobile', '192.168.1.1', TIMESTAMP '2024-02-04 18:00:00', TIMESTAMP '2024-02-04 18:55:00', 0.9),
('activity-019', 'user-001', 'movie-010', 6600, 'smart_tv', '192.168.1.1', TIMESTAMP '2024-02-05 21:00:00', TIMESTAMP '2024-02-05 22:50:00', 1.0),
('activity-020', 'user-001', 'series-010', 2000, 'tablet', '192.168.1.1', TIMESTAMP '2024-02-06 14:00:00', TIMESTAMP '2024-02-06 14:33:00', 0.75),
('activity-021', 'user-001', 'movie-011', 7200, 'laptop', '192.168.1.1', TIMESTAMP '2024-02-07 19:00:00', TIMESTAMP '2024-02-07 21:00:00', 0.88),
('activity-022', 'user-001', 'series-011', 2700, 'mobile', '192.168.1.1', TIMESTAMP '2024-02-08 17:00:00', TIMESTAMP '2024-02-08 17:45:00', 1.0),
('activity-023', 'user-001', 'movie-012', 5100, 'smart_tv', '192.168.1.1', TIMESTAMP '2024-02-09 20:00:00', TIMESTAMP '2024-02-09 21:25:00', 0.95);

Cette opération crée un nouveau dossier dans les dossiers de nos tables nommé data. Ce dossier contient tous les fichiers de données de notre table au format parquet :

  • streaming_db.db/
    • users/
      • metadata/
        • file.metadata.json
        • file2.metadata.json
        • file.avro
      • data/
        • file.parquet
    • viewing_history/
      • metadata/
        • file.metadata.json
        • file2.metadata.json
        • file.avro
      • data/
        • file.parquet

Étape 7 : Création du Job de suppressions des données

Maintenant, il faut mettre en place le script de suppression. Dans notre cas, nous savons que nous voulons supprimer les données liées à Marie, dont le user_id est user-001

Dans un scénario réel, il serait plus intéressant d’utiliser, par exemple, une table DynamoDB ou RDS pour stocker les identifiants des données à supprimer. Ces tables pourraient ensuite être lues pour récupérer dynamiquement les identifiants à traiter.

Deux situations peuvent se présenter :

  • Vous travaillez en Copy-On-Write
  • Vous travaillez en Merge-On-Read

Copy-On-Write

La suppression sera réalisé en plusieurs étapes : 

  1. Un simple DELETE FROM table pour supprimer la donnée de la table ce qui va conduire à la création d’un nouveau fichier de données sans les données supprimées.

  1. (Optionnel) La Compaction permet de réécrire les fichiers parquets en les fusionnant pour régler les problèmes de small files. Cette étape se réalise grâce à la procédure spark rewrite_data_files. Dans notre cas cette étape n’est pas obligatoire car nous n’avons qu’un seul fichier parquet. De plus, par défaut, Iceberg fusionne les fichiers uniquement si une fois combinés ils atteignent environ 512 MB.

  1. Enfin, le Cleaning garantit la suppression de tous les fichiers liés à d’anciens snapshots de votre table, empêchant tout rollback de récupérer les données supprimées. Cette étape se réalise grâce à la procédure spark expire_snapshot.

import datetime
import sys
from typing import Dict, List

from awsglue.utils import getResolvedOptions
from pyspark.sql.session import SparkSession


args = getResolvedOptions(sys.argv, ["JOB_NAME", "S3_BUCKET_NAME"])


spark = (
    SparkSession.builder
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .config("spark.sql.catalog.iceberg_catalog", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.iceberg_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog")
    .config("spark.sql.catalog.iceberg_catalog.warehouse", f"s3://{args['S3_BUCKET_NAME']}/")
    .config("spark.sql.catalog.glue.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
    .config("spark.sql.iceberg.handle-timestamp-without-timezone", "true")
    .config("spark.sql.parquet.int96AsTimestamp", "true")
    .config("spark.sql.defaultCatalog", "iceberg_catalog")
    .getOrCreate()
)

ids_to_delete = ['user-001']

def delete_data(ids_to_delete : List[str]) -> None:

    table_name : str = f"streaming_db.viewing_history"

    #Étape du Delete
    ids_to_delete = ', '.join(f"'{id}'" for id in ids_to_delete)
    where_clause = f"user_id IN ({ids_to_delete})"
    spark.sql(f"""
        DELETE FROM {table_name} WHERE {where_clause}
    """)

    # (OPTIONNEL) Étape de la Compaction
    spark.sql(f"""
        CALL iceberg_catalog.system.rewrite_data_files(
        table => '{table_name}',
        strategy => 'binpack'
        )""").show(truncate=False)

    #Étape du Cleaning
    retention_timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    spark.sql(f"""
    CALL iceberg_catalog.system.expire_snapshots(
        table => '{table_name}',
        older_than => TIMESTAMP '{retention_timestamp}',
        retain_last => 1
        )
        """).show(truncate=False)


delete_data(ids_to_delete)

Merge-On-Read

Concernant le cas des tables en Merge-On-Read, cette suppression sera réalisé également en plusieurs étapes : 

1. Un DELETE FROM table pour supprimer la donnée de la table, ce qui génère le delete file.

2. La Compaction permet de réécrire les fichiers Parquet associés à un delete file. L’utilisation du paramètre delete-file-treshold est important car il définit qu’un fichier doit être réécrit dès qu'un delete file associé à un fichier de données est détecté.

3. Enfin, l’étape du Cleaning  supprime tout fichier lié à d’anciens snapshots de votre table.

import datetime
import sys
from typing import Dict, List

from awsglue.utils import getResolvedOptions
from pyspark.sql.session import SparkSession


args = getResolvedOptions(sys.argv, ["JOB_NAME", "S3_BUCKET_NAME"])


spark = (
    SparkSession.builder
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .config("spark.sql.catalog.iceberg_catalog", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.iceberg_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog")
    .config("spark.sql.catalog.iceberg_catalog.warehouse", f"s3://{args['S3_BUCKET_NAME']}/")
    .config("spark.sql.catalog.glue.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
    .config("spark.sql.iceberg.handle-timestamp-without-timezone", "true")
    .config("spark.sql.parquet.int96AsTimestamp", "true")
    .config("spark.sql.defaultCatalog", "iceberg_catalog")
    .getOrCreate()
)

ids_to_delete = ['user-001']

def delete_data(ids_to_delete : List[str]) -> None:

    table_name : str = f"streaming_db.viewing_history"

    #Étape du Delete
    ids_to_delete = ', '.join(f"'{id}'" for id in ids_to_delete)
    where_clause = f"user_id IN ({ids_to_delete})"
    spark.sql(f"""
        DELETE FROM {table_name} WHERE {where_clause}
    """)

    #Étape de la Compaction
    spark.sql(f"""
        CALL iceberg_catalog.system.rewrite_data_files(
        table => '{table_name}',
        strategy => 'binpack',
        options => map(
                'delete-file-threshold','1'
            )
        )""").show(truncate=False)

    #Étape du Cleaning
    retention_timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    spark.sql(f"""
    CALL iceberg_catalog.system.expire_snapshots(
        table => '{table_name}',
        older_than => TIMESTAMP '{retention_timestamp}',
        retain_last => 1
        )
        """).show(truncate=False)


delete_data(ids_to_delete)

Lorsque le script de suppression est prêt, il faut le terraformer.

resource "aws_glue_workflow" "delete-data-workflow" {
  name = "delete-data-workflow"
}

resource "aws_s3_object" "delete-data-script" {
  bucket = aws_s3_bucket.iceberg_data.id 
  key    = "scripts/delete_data.py"
  source = "${path.module}/../scripts/delete_data.py"
  etag = filemd5("${path.module}/../scripts/delete_data.py")
}


resource "aws_glue_job" "delete-data" {
  name         = "delete-data"
  role_arn     = aws_iam_role.glue_delete_data_role.arn
  glue_version = "4.0"
  max_capacity = 10
  description  = "Glue job that delete user data from the bucket"
  timeout      = 1440
  max_retries  = 0

  default_arguments = {
    "--job-bookmark-option"              = "job-bookmark-disable"
    "--encryption-type"                  = "sse-s3"
    "--job-language"                     = "python"
    "--enable-continuous-cloudwatch-log" = "true"
    "--enable-continuous-log-filter"     = "true"
    "--S3_BUCKET_NAME"                   = aws_s3_bucket.iceberg_data.id
    "--enable-job-insights"              = "true"
    "--enable-spark-ui"                  = "true"
    "--datalake-formats"                 = "iceberg"
  }

  command {
    script_location = "s3://${aws_s3_bucket.iceberg_data.id}/scripts/delete_data.py"
    python_version  = "3"
  }

}

resource "aws_glue_trigger" "delete-data-trigger" {
  name          = "delete-data-trigger"
  schedule      = "cron(0 3 30 * ? *)"
  type          = "SCHEDULED"
  workflow_name = "delete-data-workflow"

  actions {
    job_name = aws_glue_job.delete-data.name
  }
}

Vous pouvez désormais exécuter terraform apply et vous rendre sur la console AWS pour lancer le job de suppression.

Étapes de la solution en images

En Copy-On-Write

Lors de la création de la table viewing_history, un dossier metadata est généré, contenant le fichier de métadonnées.

Image avec Titre Centré
Figure 6 : Dossier de la table viewing_history à sa création

Ensuite, lors de l’ajout des premières données, le dossier data est créé.

Image avec Titre Centré
Figure 7 : Dossier de la table viewing_history après la création de données

Ce dossier contient nos données au format Parquet.

Image avec Titre Centré
Figure 8 : Fichier Parquet contenant les données ajoutées

Nous pouvons interroger le fichier et constater que 69 lignes liées à l'utilisateur 'user-001' sont présentes.

Image avec Titre Centré
Figure 9 : Nombre de données appartenant à Marie (user-001)

Nous pouvons ensuite lancer le DELETE FROM. Nous constatons qu'un second fichier est créé avec une taille inférieure, en raison de la suppression de données.

Image avec Titre Centré
Figure 10 : Création d'un nouveau fichier Parquet

Nous interrogeons ensuite ce nouveau fichier et constatons qu'il ne contient plus aucune donnée de Marie.

Image avec Titre Centré
Figure 11 : Nombre de données appartenant à Marie dans le nouveau fichier

Lorsque l’étape du Cleaning est correctement exécuté, nous constatons qu'il ne reste plus qu'un seul fichier dans le dossier data, celui créé après le DELETE.

Image avec Titre Centré
Figure 12 : Fichier restant après le Cleaning

En Merge-On-Read

Au départ, un seul fichier est présent dans le dossier data.

Image avec Titre Centré
Figure 13 : Fichier Parquet contenant les données ajoutées

Lorsque le DELETE FROM est exécuté, un nouveau dossier intitulé '/' est créé dans data.

Image avec Titre Centré
Figure 14 : Création d'un nouveau dossier après la suppression des données de Marie

Ce dossier contient notre delete file. En l'ouvrant, nous pouvons voir toutes les positions des lignes contenant les données de Marie dans notre fichier Parquet.

Image avec Titre Centré
Figure 15 : Contenu du delete file généré par la suppression

Après la Compaction et le Cleaning, nous constatons que le fichier a été réécrit en prenant en compte les informations du fichier de suppression et le dossier qui le contenait a été supprimé.

Image avec Titre Centré
Figure 16 : Fichier restant après la Compaction et le Cleaning


Conclusion 

Le droit à l'oubli impose aux entreprises de supprimer les données personnelles des utilisateurs dans un délai court, ce qui peut devenir compliqué lorsque ces données sont dispersées dans un Data Lake. Iceberg permet de gérer cette tâche grâce à ses modes d'écriture COW et MOR, ses fonctionnalités de compaction et d’expiration des snapshots.

En associant Iceberg avec les services AWS comme S3 et Glue, nous pouvons construire une architecture efficace pour supprimer les données tout en maintenant les performances. 

Dans un prochain article, nous nous intéresserons à la migration des tables Hive vers Iceberg avec un cas pratique afin de pouvoir profiter de cette architecture.