La désérialisation dynamique avec dbt / Snowflake

Long story short : TL;DR

🖥️ Besoin d'avoir le code rapidement ? Manque de temps pour retrouver le snippet ? Flemme de lire du français ? Envie de lire du dbt ?

😌 Pas de panique, allons droit au but, sans fioriture, voici le code :

{{
    config(
        materialized="incremental",
        incremental_strategy='append',
        on_schema_change='append_new_columns'
) }}

{%- call statement('json_field_list', fetch_result=True) -%}
    select distinct 'donnee_brute:' || f.path || '::string as ' || lower(replace(f.path, '.', '_')) as select_element
    from
         {{ source("dbt", "deplacement_presidentiel_raw") }},
        lateral flatten(input => donnee_brute, recursive => true, mode => 'object') f
{%- endcall -%}

{%- set json_field_list = load_result('json_field_list')['data'] -%}

with
    get_source as (select * from {{ source("dbt", "deplacement_presidentiel_raw") }}),
    dynamic_deserialization as (
        select 
            {%- for json_field_name in json_field_list %}
                {{ json_field_name[0] }},
            {%- endfor %}
            sysdate() as date_histo
        from get_source
    )
select *
from dynamic_deserialization

deserialisation_dynamique.sql

💪 Que le 4 Mai soit avec vous.

Introduction

📆 En 2024, le format de données JSON se retrouve partout. Dans un contexte d'entreprises où de plus en plus de solutions logicielles sont amenées à échanger des informations, le JSON a su s’imposer comme un format de données lisible à la fois par l’être humain, et par la machine.

🪄 Fréquemment utilisé pour formater les requêtes d’API ainsi que leurs réponses, le JSON présente l’avantage d’être indépendant de tout langage de programmation, mais également d’être flexible. En effet, son mode de fonctionnement étant basé sur des combinaisons attributs-valeurs, il est relativement aisé d’ajouter de nouvelles informations à un JSON déjà existant. C’est par ailleurs ce qui lui vaut d’être considéré comme un format de données dit “semi-structuré”.

☁️ Également, l’avènement des cloud providers, couplés à la forte baisse des coûts de stockage motive de plus en plus d’entreprises à mettre en place des Data Lakes, où les données sont stockées dans le format le plus naturel / brut possible, avant d’être traitées par des pipelines de données. Parmi ces formats, on retrouve les formats structurés (colonnes / lignes), les formats non-structurés (eg : des PDFs), des données binaires (eg : vidéos), et naturellement, des données semi-structurées (eg : JSON).

🏞️ Cependant, bien que les Data Lakes répondent à la problématique de stockage brut de l’information, son exploitation reste, quant à elle, entière.

🏗️ Une des méthodes populaires pour exploiter de la donnée brute consiste à la structurer dans des tables composées de lignes et de colonnes.
Dans le cadre du format JSON, cette opération est appelée la “désérialisation”.

✈️ Dans cet article, à travers l’ingestion brute d’un fichier de l’OpenData Français recensant les déplacements des présidents de la République et des premiers ministres depuis 1945, nous étudierons les bases de la désérialisation de données en SQL avec Snowflake, ainsi que ses limites, puis nous proposerons une solution utilisant la technologie dbt pour venir outrepasser ces limites.

Table des matières

🔎 Jump faster into the jungle

  1. Long story short : TL;DR
  2. Introduction
  3. Table des matières
  4. Description du jeu de données
  5. Présentation du scénario
  6. Chargement des données
  7. Désérialisation manuelle
  8. Flatten option Recursive : La clef du dynamisme
  9. Génération du select dynamique et injection du code dans la clause select
  10. Pépin : La structure du JSON évolue
  11. La solution : L’option on_schema_change
  12. Nettoyage de l’environnement de travail
  13. Conclusion

Description du jeu de données

🌐 Le site officiel hébergeant le jeu de données le décrit comme suit :

“Le service des voyages officiels, [...], s'occupe des mesures de sécurité [...], ainsi que de l'organisation des déplacements officiels. [...]. Les différents inventaires détaillés ont permis d'établir une liste [...] des déplacements des présidents de la République et des premiers ministres depuis 1945.

Cette liste de plus de 2500 déplacements en France et à l'étranger [...]. Sont ainsi accessibles :

  • pour le Président de la République : 1166 déplacements, 118 pays, pour la période 1945-2008 ;
  • pour le Premier ministre : 1342 déplacements, 71 pays, pour la période 1964-2008.

Le jeu de données mis à disposition est constitué des informations suivantes :

  • Fonction : Président de la République ou Premier ministre ;
  • Individu ;
  • Lieu : d'après le contenu de l'inventaire initial ;
  • Lieu normalisé : département ou région pour la France ;
  • Pays ;
  • Date de début ;
  • Date de fin ;
  • URL(s) de la notice ;
  • Cote(s) ;
  • Contenu initial de l'inventaire : ce champ peut permettre d'avoir des compléments d'informations sur le type de déplacement, les différentes étapes d'un voyage (notamment à l'étranger), voire la mention de photographies.”

Déplacements des présidents de la République et des premiers ministres depuis 1945 — Ministère de la Culture

🔎 Ci-dessous, un exemple d’aperçu de la structuration du JSON :

{
  "annee_du_voyage": "2008",
  "code_pays": "MEX",
  "contenu_initial": "Mexico (Mexique) du 14 au 16 mai 2008.",
  "cote_du_dossier": "20220621/11",
  "date_de_debut": null,
  "date_de_fin": "2008-05-16",
  "fonction": "Président de la République",
  "geo_point_2d": {
    "lat": 23.95106846478874,
    "lon": -102.52762948245407
  },
  "individu": "Nicolas Sarkozy",
  "lieu": "Mexique",
  "lieu_contours": {
    "geometry": {
      "coordinates": [
        /** Données supprimées pour faciliter l'affichage de l'aperçu */
      ],
      "type": "MultiPolygon"
    },
    "properties": {},
    "type": "Feature"
  },
  "lieu_normalise": "Mexique",
  "pays": "Mexique",
  "url_de_la_notice": "https://www.siv.archives-nationales.culture.gouv.fr/siv/UD/FRAN_IR_060502/cyyncofeez5-18zouwywi2lux"
}

📥 Le jeu de données est téléchargeable au lien suivant (Prendre le format JSON).

Présentation du scénario

alt_text
Schéma haut niveau du scénario

📜 Pour les besoins de cet article, nous envisagerons le scénario suivant

  1. On admet un loader indépendant de données (eg : airbyte) venant déposer les données brutes dans une table de base de données.
  2. On met en place un système de désérialisation des données semi-structurées vers une table de données structurées, avec une correspondance entre les attributs des JSON sources, et le nom des colonnes cibles.Cette table sera une table d’historisation, ce qui indique que les données seront empilées, avec une date d’historisation, à chaque exécution.

⚠️
Précisions :
- La partie Data Loading étant extérieure au sujet de l’article, par souci de simplicité, nous émulerons ce chargement via un script SQL détaillé ci-après
- La table de réception étant une landing zone, on assume que cette dernière est vidée avant chaque exécution du loader
- Si vous souhaitez pousser le scénario au bout, la documentation de l’API est disponible au lien suivant

Chargement des données

📥 Après avoir téléchargé le fichier source et l’avoir placé dans le répertoire de votre choix, vous pourrez déclencher l’exécution du script SQL ci-dessous qui s’occupera de créer le matériel nécessaire, et de charger les données.

🤖 Ce script SQL est à exécuter dans la console SQL de votre choix (eg : Snowsight ou DBeaver).

Important :
- Exécutez au préalable un use database pour préciser la base de données sur laquelle vous souhaitez travailler
- La commande PUT de ce script n’est utilisable que par le biais d’un client lourd (eg : DBeaver), si vous souhaitez utiliser la console web snowflake pour cette partie, utilisez l’interface graphique snowsight de chargement de fichier dans un stage.
/** 
 * création du schéma de travail snowflake
 */
create or replace schema dbt;

/** 
 * positionnement sur le schéma de travail
 */
use schema dbt;


/** 
 * Création du stage snowflake accueillant le fichier brute
 */
create or replace stage json_file_stage;

/** 
 * Création de la table source dans laquelle les données brutes vont être insérées
 */
create or replace table deplacement_presidentiel_raw (
	donnee_brute variant
);

/** 
 * Création du descripteur de fichier permettant à Snowflake de savoir comment gérer le fichier
 */
create or replace file format json_file_format 
	type = json
	strip_outer_array = true
	allow_duplicate = true;

/** 
 * Chargement du fichier sur le stage
 * /!\ Attention : Cette commande ne peut être exécuter que avec un client lourd (Eg : DBeaver)
 * Sinon, utiliser l'interface graphique web de snowflake pour charger manuellement le fichier dans le stage
 */
put 'file://g:\\my drive\\articles\\dbt\\Dynamic Deserialization\\deplacements-presidents-republique-et-premiers-ministres-depuis-1945.json' @json_file_stage auto_compress = true;

/** 
 * Listing du contenu du stage pour s'assurer que le fichier est bien présent
 */
list @json_file_stage;

/** 
 * Chargement des données du fichier dans la table
 */
copy
into
	deplacement_presidentiel_raw (donnee_brute)
from
	(
	select
		*
	from
		@json_file_stage (file_format => json_file_format,
		pattern => '.*deplacements-presidents.*[.]json.gz') t)
on_error = 'continue';

/** 
 * Vérification de l'insertion des données
 */
select *
from deplacement_presidentiel_raw

/** 
 * Penser à Commit si votre connexion n'est pas en auto-commit
 */
commit;


/**
 * Vérification des l'ingestion des données
 */
select * from deplacement_presidentiel_raw;

deploy_environment.sql

Désérialisation manuelle

⌨️ On en vient au cœur du sujet : La méthode “basique” de désérialisation de notre JSON.

📜 Ci-dessous, le code dbt de notre modèle deserialisation_manuelle.sql :

{{
    config(
        materialized="incremental",
        incremental_strategy='append',
) }}

with
    get_source as (select * from {{ source("dbt", "deplacement_presidentiel_raw") }}),
    manual_deserialization as (
        select 
            donnee_brute:annee_du_voyage::varchar as annee_du_voyage,
            donnee_brute:code_pays::varchar as code_pays,
            donnee_brute:fonction::varchar as fonction,
            donnee_brute:geo_point_2d:lat::varchar as geo_point_2d_lat,
            donnee_brute:geo_point_2d:lon::varchar as geo_point_2d_lon,
            donnee_brute:individu::varchar as individu,
            sysdate() as date_histo
        from get_source
    )
select *
from manual_deserialization

deserialisation_manuelle.sql

🔎 Et ci-dessous, le résultat après avoir filtré sur un voyage en particulier :

alt_text
Résultat de l'exécution du modèle deserialisation_manuelle.sql

🧠 Explications :

  • Il existe différentes syntaxes pour désérialiser du JSON, ici nous avons pris le partie d’utiliser la syntaxe dite des “deux-points”
  • La syntaxe générique est la suivante
    • Nom_de_colonne:attribut_racine:attribut_imbrique_1::cast_type
  • Si vous n’êtes pas à l’aise avec les mots-clefs dbt config et source, vous pouvez trouver davantage d’explications sur la documentation officielle dbt liée à la configuration des modèles et à la fonction source()
  • In a nutshell :
    • Block config => Permet de faire la configuration pour une table d’historique telle que décrite dans le scénario.
    • Block source => Permet de référencer la table source
Avantages Inconvénients
On choisit de désérialiser uniquement les informations de notre choix C’est fastidieux. La structure du JSON pris dans ce scénario est relativement petite, et c’était déjà pénible de rédiger uniquement ces six champs.
On peut renommer un attribut comme bon nous semble C’est rigide. En cas d’ajout d’un nouvel attribut au JSON, il sera ignoré. Il faudra passer par une étape de développement et de mise en production pour pouvoir considérer ce nouvel attribut.
On peut déclencher le cast de données dès l’historisation si on le souhaite
Ca marche et c’est simple à comprendre

Flatten option Recursive : La clef du dynamisme

📜 Dans la liste des fonctions SQL existantes, Snowflake met à disposition des développeurs la fonction Flatten, permettant de collecter diverses informations sur le contenu d’une colonne JSON.

🕵️ Ici, nous nous attarderons en particulier sur la colonne PATH, décrit par la documentation comme étant :

📚
PATH : Le chemin vers l’élément d’une structure de données qui doit être aplati.

🎯 L’objectif de l’exploitation de cette colonne PATH étant de constituer des chaînes de caractère que l’on pourra par la suite injecter dans une clause select.

✍️ Ainsi, la requête SQL ci-dessous nous permettra d’obtenir le formatage suivant :

select
	distinct path, 'donnee_brute:' || f.path || '::string as ' || lower(replace(f.path, '.', '_')) as select_element
from
	deplacement_presidentiel_raw,
	lateral flatten(input => donnee_brute, recursive => true, mode => 'object') f
alt_text
Résultat de la requête de formatage

🧠 Avec quelques précisions sur le paramètres de la fonction Flatten :

  • input => Indique la colonne contenant le JSON à analyser
  • recursive => Permet de drill down dans les nested objects.
    • Typiquement, sans ce paramètre, l’objet geo_point_2d serait resté un objet.
    • Ici, le paramètre recursive nous permet de récupérer unitairement la lat et la lon
  • mode => Permet de choisir de drill down uniquement dans les objects et d’ignorer les arrays. Pourquoi ?
    • Parce que flatten des arrays présente un risque de duplication de données
    • Parce que le path est plus complexe à exploiter.

Génération du select dynamique et injection du code dans la clause select

⌨️ Maintenant que nous avons toutes les clefs en main, il est temps de wrap-up tout ça dans un modèle deserialisation_dynamique.sql :

{{
    config(
        materialized="incremental",
        incremental_strategy='append',
) }}

{%- call statement('json_field_list', fetch_result=True) -%}
    select distinct 'donnee_brute:' || f.path || '::string as ' || lower(replace(f.path, '.', '_')) as select_element
    from
         {{ source("dbt", "deplacement_presidentiel_raw") }},
        lateral flatten(input => donnee_brute, recursive => true, mode => 'object') f
{%- endcall -%}

{%- set json_field_list = load_result('json_field_list')['data'] -%}

with
    get_source as (select * from {{ source("dbt", "deplacement_presidentiel_raw") }}),
    dynamic_deserialization as (
        select 
            {%- for json_field_name in json_field_list %}
                {{ json_field_name[0] }},
            {%- endfor %}
            sysdate() as date_histo
        from get_source
    )
select *
from dynamic_deserialization

deserialisation_dynamique.sql

🧠
Explications :
- Dans le cadre de ce scénario, nous cherchons à constituer une table d’historisation brute et structurée. Aussi, c’est la raison pour laquelle nous souhaitons empiler les résultats dans la table de destination via l’utilisation de la fonction append de la configuration incremental_strategy
- Le block call statement nous permet d’exécuter notre requête d’analyse du JSON en amont de l’exécution du modèle. Plus d’informations sur le block call statement dans la documentation officielle dbt.
- Le block set / load_result fonctionne conjointement avec le block call_statement et nous permet de stocker dans une variable le résultat de notre requête d’analyse du JSON.
- Le block for nous permet de lire notre variable précédemment définie, et ainsi, générer dynamiquement le contenu de notre clause select
Avantages Inconvénients
Peu importe le nombre d’attributs JSON, ainsi que la complexité de sa structure, le temps d’implémentation reste le même. Le code du modèle est, dans un premier temps, plus compliqué à appréhender que dans le cas d’une désérialisation manuelle, et peut faire peur.
On réduit le risque d’une faute de frappe dans la constitution de la clause select Les données en sortie sont toutes castées en string. C’est un inconvénient assumé.
  • Ici on cherche “juste” à structurer la donnée.
  • On veut éviter un plantage d’exécution lié à un mauvais type.
  • Cette étape de data quality pourra être réalisée à postériori dans une autre couche de notre architecture.
On conserve une cohérence forte entre la structure du JSON et les noms de colonne dans la table structurée. On ne maîtrise que partiellement la liste des données désérialisées, ce qui augmente les coûts de stockage.
  • Cependant, on rappelle que les coûts de stockage tendent à être de plus en plus accessibles.
  • A la date de rédaction de cet article, le coût du Terabyte sur Snowflake est estimé à $23/mois.
  • Dans l’absolu, une clause where / like dans la requête d’analyse du JSON permet de ne sélectionner que les attributs JSON répondant au pattern de nommage de notre choix
  • La solution proposée ne désérialise que les objects, pas les arrays. C’est un choix, à la fois technique et fonctionnel.
Les nested objects sont à la fois stockés en brut et en désérialisé : On peut faire notre choix à postériori de sélectionner ce qui nous arrange le plus d’utiliser.
On est résilient à l’évolution de la structure du JSON. C’est le sujet de la prochaine section de cet article.

Pépin : La structure du JSON évolue

🤯 Pour l’exemple, nous injecterons dans les données sources un JSON supplémentaire avec les modifications suivantes d’attributs :

  • Suppression des attributs :
    • lieu_contours
    • lieu_normalise
  • Ajout d’attribut
    • easter_egg

⌨️ Ci-dessous, le script SQL pour injecter ce JSON dans les données sources :

insert
	into
	deplacement_presidentiel_raw
select
	parse_json(column1)
from
values ('{
  "annee_du_voyage": "2013",
  "code_pays": "JPY",
  "contenu_initial": "Asie : Japon, du 01 Mars au 31 Mars 2013",
  "cote_du_dossier": "/=4£4<13(_)><",
  "date_de_debut": "2013-03-01",
  "fonction": "Falsificateur de fonction politique",
  "date_de_fin": "2013-03-31",
  "geo_point_2d": {
    "lat": 35.7040744,
    "lon": 139.5577317
  },
  "individu": "Paul Colinmaire",
  "lieu": "Tamagawa Aqueduct Greenway ",
  "pays": "Japon",
  "url_de_la_notice": "https://www.google.com/maps/@35.7040744,139.5577317,3a,90y,290.66h,74.25t/data=!3m6!1e1!3m4!1sgT28ssf0BB2LxZ63JNcL1w!2e0!7i13312!8i6656?coh=205409&entry=ttu",
  "easter_egg": "true"
}')

add_fake_data.sql

❓ En essayant d’exécuter notre modèle deserialisation_dynamique.sql, on s’aperçoit d’une bonne nouvelle : L’exécution du modèle reste fonctionnelle

  • Les attributs supprimés peuplent les colonnes existantes à NULL
  • Cependant, les attributs ajoutés manquent à la table de destination

⛰️ Si l’on tient à conserver une certaine stabilité dans notre table de destination, alors la solution déjà en place nous conviendra.

🪛 En cas de volonté d’intégrer les nouveaux attributs, un simple ALTER TABLE ADD COLUMN suffira à peupler la colonne à l’avenir.

🧑‍🔧 Ce sera, certes, une action de maintenance à réaliser en production, mais le code source dbt restera inchangé, ce qui nous évite une nouvelle procédure de mise en production.

🤔 Cependant, que faire si l’on souhaite que la structure de notre table de destination évolue dynamiquement avec l’évolution du JSON ?

La solution : L’option on_schema_change

🛠️ Pour ce faire, dbt implémente une configuration spécifique aux modèles incrémentaux appelée on_schema_changes.

🪛 Ainsi, en paramétrant cette configuration à la valeur append_new_columns, dbt sera en mesure de faire évoluer la table de destination en conservant les colonnes de destination existantes et dont les attributs ont été supprimés.

➕ Mais également, dbt ajoutera des colonnes en conséquence des attributs supplémentaires.
😮‍💨 Pour les données historiques déjà existantes, la valeur pour cette nouvelle colonne sera renseignée à NULL

⌨️ Ainsi, il suffit de remplacer le block config existant de notre modèle par le suivant :

{{
    config(
        materialized="incremental",
        incremental_strategy='append',
        on_schema_change='append_new_columns'
) }}

Block config à actualiser pour le modèle deserialisation_dynamique.sql

Nettoyage de l’environnement de travail

🧹 Un bon atelier étant un atelier rangé, on pense à nettoyer l’espace de travail une fois que l’on a terminé.

drop schema dbt;

Conclusion

💃 Dans cet article, nous avons réussi à proposer une solution élégante et adaptable à une problématique moderne fréquente dans le domaine du traitement de la donnée : La désérialisation de données semi-structurée.

⛓️ En combinant la puissance analytique de snowflake avec les capacités génératives de dbt, nous avons été en mesure de mettre en place un code propageable dans différents contextes et pouvant drastiquement réduire les temps de développement.

🧱 Bien évidemment, cette solution se présente comme une brique technique unitaire à inclure dans une architecture plus globale.

🪜 On peut notamment envisager une étape suivante cherchant à effectuer un contrôle qualité des données avant de réaliser les cast de types.
Faites nous savoir en commentaire si un article à ce sujet pourrait vous intéresser.

📈 Également, en cas d’utilisation de la configuration append_new_columns, se pose la question du monitoring de l’évolution de la structure de la table.

💊 La panacée n’existe pas, et de nombreuses questions et problématiques restent en suspens et donnent matière à réfléchir, mais nous pouvons tout de même nous réjouir de cette proposition d’implémentation de désérialisation dynamique.

❄️ Good Luck & Have Fun pour gravir les montagnes de données et atteindre les snowflakes éternels à leurs cîmes.

💖 From Data, with love.