Delta Lake : la taille compte

Delta Lake nous offre le confort d’un niveau d’abstraction similaire à celui d’une BDD relationnelle. En retrouvant nos vieilles habitudes du monde SQL, nous avons tendance à ne plus prendre en considération la façon dont les données sont stockées dans des fichiers, alors que les performances (surtout en lecture) en dépendent énormément. Contrairement aux SGBD classiques et aux solutions de Data Warehouse, l’optimisation du stockage d’une table Delta Lake est la responsabilité des ingénieurs qui mettent en place les pipelines d'alimentation.

Si le bon réflexe de partitionner correctement une table Delta Lake est plutôt acquis par la majorité des ingénieurs Data, l’optimisation de la taille des fichiers Parquet sous-jacents est souvent négligée. D’autant plus que ce n’est pas toujours une tâche triviale.

L’objectif de cet article est de vous montrer les différentes techniques pour maîtriser la taille des fichiers d’une table Delta Lake en fonction de la façon dont elle est alimentée (append, overwrite, merge) et dont elle est structurée (partitionnée ou pas).

L’article ne parlera pas des autres optimisations au stockage possibles comme z-ordering, bucketing ou partitioning by range.

Quelle est la taille optimale ?

Toute optimisation est une question d’équilibre entre différentes contraintes. Je vais donc vous expliquer quelles contraintes il faut prendre en compte, et cela sera à vous de déterminer la taille optimale des fichiers pour votre usage.

Mises à jour de données

Delta Lake utilise exclusivement l’approche copy on write (contrairement à Apache Hudi qui offre le choix entre copy on write et merge on read) pour mettre à jour les données dans la table. Cela veut dire que chaque mise à jour des données implique la création des copies complètes des fichiers dans lesquelles elles sont stockées. Imaginez que vous mettez à jour une seule ligne de la table qui est stockée dans un fichier Parquet de 1GB avec un million d’autres lignes, Delta Lake va créer un nouveau fichier de 1GB avec une ligne modifiée et un million de lignes copiées à l’identique. Cela veut donc dire que plus les fichiers seront gros plus les petites mises à jour seront coûteuses en termes de stockage et de trafic réseau.

Pour économiser l'espace/coût de stockage vous pouvez être tentés de faire l’opération vacuum(0) régulièrement sur votre table, mais je vous rappelle que ce n’est absolument pas recommandé ! La suppression de tout l’historique vous prive de la fonctionnalité time travel et de la possibilité d’éventuellement faire un rollback. Cela peut même corrompre vos données si d’autres processus sont en train d’écrire dans la table. Ce n’est pas pour rien qu’il y a un garde-fou désactivable qui n’autorise pas de supprimer l’historique inférieure à 168 heures (7 jours). La profondeur de l’historique à garder doit être supérieure à la durée du traitement le plus long mettant à jour les données dans la table.

Stockage et lecture de données

Peu importe que vous utilisiez un filesystem distribué (ex. HDFS) ou un service de stockage objet dans le Cloud (ex. AWS S3), la fragmentation de vos données en très grand nombre de petits fichiers est néfaste.

Sur HDFS cette fragmentation exerce une pression sur la RAM de son Namenode, d’où la recommandation de créer des fichiers dont la taille est supérieure à la taille d’un bloc HDFS (128MB par défaut).

Dans le Cloud ce sont des requêtes sur le service de stockage objet (surtout GET et LIST) qui vont poser des problèmes, elles vont notamment générer des coûts importants. Tandis que vous pouvez estimer assez facilement les coûts directs qui seront proportionnels au nombre de fichiers, les coûts indirects sont moins évidents à anticiper. La baisse des performances générales des traitements (Spark, Athena, etc.) liée au surcoût de lecture des métadonnées de chaque fichier va rendre votre facture plus salée.

Le driver du job Spark doit gérer autant de tasks dans le premier stage que de fichiers à traiter. Un nombre trop grand de fichiers exerce donc une pression trop importante sur le scheduler du driver. Le premier signe observable de faiblesse provoqué par ce phénomène est l'apparition de l’erreur Out Of Memory du driver (Attention ! OOM du driver peut également être provoquée par les problèmes liés à votre code, comme l’utilisation imprudente de collect(), par exemple). Vous pouvez bien sûr augmenter la mémoire du driver, mais à partir d’un certain nombre de fichiers cela ne va plus suffire, le job va être en erreur. D’autre part, si les données sont trop condensées, les fichiers volumineux exercent également une pression sur la RAM, celle des executors cette fois-ci. Le faible nombre de fichiers réduit la parallélisation, ce qui implique une sous-exploitation des ressources disponibles dans le cas où il est inférieur au nombre de slots (nombre total de CPUs des executors).

Une recommandation quand même ?

Sachant que dans la plupart des cas on optimise une table Delta Lake pour la lecture (requêtes analytiques), la taille des fichiers souvent recommandée est de 1GB. La commande OPTIMIZE disponible dans l’environnement Databricks vise cette taille par défaut pour les fichiers Parquet. Vous trouverez d’ailleurs des recommandations similaires dans la documentation de Athena/Presto.

Opération de compactage

Le moyen le plus simple de garder la taille optimale des fichiers Parquet de votre table Delta est d’effectuer une opération de compactage (commande OPTIMIZE de Databricks ou votre propre implémentation) de manière récurrente (synchrone ou asynchrone) suite à des opérations d’écriture. Cette mécanique est valide pour tout mode d’alimentation de la table. En revanche, elle engendre un coût non négligeable (compute, stockage et trafic réseau).

En effet, cette opération consiste à lire (charger sur le cluster) et réécrire tout ou partie des fichiers Parquet de la table. Les fichiers d’origine occupent l’espace de stockage jusqu’à ce qu’ils soient supprimés par l’opération VACUUM.

Il est souvent plus judicieux d’écrire directement les fichiers de bonne taille au lieu de les réécrire à posteriori. Dans les paragraphes qui suivent je vous présente les différentes techniques pour le faire en fonction du mode d’alimentation de la table.

Append

Si vous alimentez la table en mode append uniquement par des petits batches ou en streaming (micro batch), votre seule solution sera de programmer une opération de compactage. C’est à vous de déterminer la fréquence optimale pour votre use case en tenant compte du coût de cette opération, de la fréquence des batches et du niveau de performance souhaité.

Il est bien sûr possible d’effectuer cette opération de manière synchrone après chaque opération d’écriture dans votre code ou en configurant l’auto-compaction de la table si vous êtes dans l’environnement Databricks. Vu que cette opération a un coût (stockage et compute) considérable, il vaut mieux opter pour le mode asynchrone.

Overwrite

Table non partitionnée

Dans le cas où la table est alimentée en mode full ou en append mais par des gros batches, il vaut mieux configurer le job pour qu’il écrive directement les fichiers de la bonne taille.

Si vous connaissez le volume de données, vous pouvez très facilement estimer le nombre de fichiers Parquet à écrire en fonction de la taille souhaitée. Il suffira donc de faire une transformation coalesce(n) (ou repartition(n) s’il est nécessaire de rééquilibrer les partitions) sur le DataFrame avant de l’écrire.

Dans le cas où le volume de données n’est pas connu d’avance, ou s’il est variable, cette technique simple n’est plus applicable. Dans ce cas, la solution est de configurer finement la fonctionnalité adaptive coalesce disponible à partir de Spark 3.0. Voici l’exemple de configuration qui m’a permis d’obtenir les fichiers Parquet proche de 1GB pour un job implémenté en Spark 3.0.1 :

spark.sql.adaptive.enabled = true
spark.sql.adaptive.coalescePartitions.enabled = true
spark.sql.adaptive.advisoryPartitionSizeInBytes = 2G
spark.sql.adaptive.coalescePartitions.minPartitionNum = 1

Notez que la taille de partition cible est de 2GB vu qu’il s'agit de sa taille en RAM, le fichier Parquet sera toujours plus petit car les données sont compressées.

Faites néanmoins attention à ne pas trop dégrader les performances de votre job en réduisant le niveau de parallélisme.

Notez également que deux nouveaux paramètres pour configurer adaptive coalesce ont été ajoutés à partir de Spark 3.2.0 :

spark.sql.adaptive.coalescePartitions.parallelismFirst = true (par défaut)
spark.sql.adaptive.coalescePartitions.minPartitionSize = 1M (par défaut)

L’option spark.sql.adaptive.coalescePartitions.parallelismFirst dont la valeur par défaut est true fait ignorer par Spark la valeur du paramètre spark.sql.adaptive.advisoryPartitionSizeInBytes et respecter uniquement la valeur de spark.sql.adaptive.coalescePartitions.minPartitionSize. Il est donc nécessaire soit de désactiver le mode parallelism first, soit d'utiliser le paramètre spark.sql.adaptive.coalescePartitions.minPartitionSize pour obtenir les partitions de la taille souhaitée.

Table partitionnée

Les choses se compliquent légèrement si votre table est partitionnée. Imaginez que vous écriviez les données dans une table dont la clé de partition a une cardinalité de 100. Dans ce cas, chaque partition obtenue suite à des transformations va être découpée en 100 morceaux par le writer (écrite dans 100 fichiers Parquet) si les données sont distribuées aléatoirement dans les partitions. Cela veut dire que même si vous avez configuré auto coalesce vous vous retrouvez quand même avec des fichiers Parquet 100 fois plus petits que souhaité.

La solution dans ce cas sera de ne pas tuner auto coalesce mais de re-partitionner le DataFrame par la clé de partition de la table avant de l’écrire. Exemple pour une table partitionnée par date :

df
 .repartition($"date")
 .write
 .partitionBy("date")
 .format("delta")
 .mode("overwrite")
 .save(<path>)

Le problème est que la répartition par clé va créer une nouvelle partition pour chaque valeur de la clé sans limite de taille. On risque donc de se retrouver avec des fichiers Parquet beaucoup trop gros. La solution est de limiter le nombre de lignes écrites dans un fichier. Exemple :

spark.sql.files.maxRecordsPerFile = 5000000

Le rapport entre le nombre de lignes et la taille du fichier Parquet est difficile à estimer. Il va dépendre du nombre de colonnes, de leurs types, de leurs cardinalités et donc du taux de compression. Vous pouvez déterminer la valeur de ce paramètre de façon empirique en faisant plusieurs essais.

Merge

Table non partitionnée

L’opération merge fait des jointures entre les données présentes dans la table cible et celles provenant du DataFrame source. La solution qui consiste à tuner auto coalesce est donc parfaitement valable pour le merge.

Table partitionnée

Cela se complique lorsque la table est partitionnée pour les mêmes raisons que dans le cas de overwrite. Contrairement à ce dernier, le niveau d’abstraction de l’opération merge ne nous permet pas d'actionner le re-partitionnement des données par la clé de partition de la table avant l’écriture comme on peut le faire sur un DataFrame. Cela est faisable uniquement via ce paramètre de configuration Spark pour toutes les opérations merge de la session :

spark.databricks.delta.merge.repartitionBeforeWrite.enabled = true

Conclusion

Lorsque vous constatez une dégradation des performances d’une table Delta Lake, vous êtes probablement face à un problème de taille… Ne soyez pas complexés, vous avez désormais tous les outils nécessaires pour le résoudre.