Spark : Dataset c'est mal

Il m’arrive souvent de me lancer dans un débat sur l’API Dataset où je défends la thèse qu’il ne faut surtout pas l’utiliser dans les projets Spark. Je constate qu’il y a souvent une mauvaise compréhension de la manière dont cette API fonctionne et certaines sources la mettent au sommet de l'évolution de Spark, comme ce bouquin par exemple. Le fait que techniquement DataFrame n’est qu’un cas particulier de Dataset (type DataFrame = Dataset[Row]) alors qu’en réalité c’est plutôt l'inverse, ajoute encore de la confusion. J’ai donc décidé de donner mes arguments anti-Dataset dans cet article une fois pour toute.

Je vous assure, je n’ai rien contre la vérification à la compilation, bien au contraire, je me sens beaucoup plus à l’aise avec un langage fortement typé et un compilateur bien sévère qu’avec un certain langage de script devenu excessivement populaire dans le monde de la Data. J’adore Scala (même si je ne le maîtrise pas aussi bien que j'aurais voulu) ! Et oui, moi aussi j’étais tombé dans le piège, dans mon premier projet Spark j'utilisais exclusivement l’API Dataset. C’est bien pour cela que j’ai pu découvrir ses défauts qui pour moi étaient rédhibitoires.

Vous pouvez exécuter tous les exemples de code de cet article dans la console spark-shell.

Que peut-on vraiment vérifier à la compilation ?

La vérification des types à la compilation est l’argument principal de tous ceux qui plaident en faveur de Dataset. Rappelez-vous que DataFrame est un Dataset, rien ne nous oblige donc à écrire les transformations sous forme de fonctions Scala.

Ce bout de code, par exemple, compilera parfaitement alors qu’on essaie d’utiliser une colonne qui n’existe pas dans notre case class, l’erreur ne se produira qu’à l'exécution :

scala> import spark.implicits._

scala> case class Employee(name: String, salary: String)

scala> val employeeDS = Seq(Employee("Boris", "30000"), Employee("Jack", "130K")).toDS()

scala> employeeDS.select("bonus").show()
org.apache.spark.sql.AnalysisException: cannot resolve '`bonus`' given input columns: [name, salary];

Pour que le compilateur détecte l’erreur il faudrait utiliser la fonction map() pour réaliser cette transformation :

scala> employeeDS.map(_.bonus).show()
<console>:50: error: value bonus is not a member of Employee
       employeeDS.map(_.bonus).show()

Faisons une autre opération qui n’a pas de sens, essayons de multiplier le salaire par deux (belle augmentation !) alors que le champ salary est de type String :

scala> employeeDS.withColumn("salary", $"salary" * 2).show()
+-----+-------+
| name| salary|
+-----+-------+
|Boris|60000.0|
| Jack|   null|
+-----+-------+

On constate que cette opération est tout à fait valable pour Spark, il n’y a aucune exception, mais le résultat peut paraître surprenant (pauvre Jack !).

Regardons maintenant si le compilateur peut nous mettre en garde contre cette opération douteuse dans le cas où on l’écrit sous forme de fonction Scala :

scala> employeeDS.map{employee => employee.copy(salary = employee.salary * 2)}.show()
+-----+----------+
| name|    salary|
+-----+----------+
|Boris|3000030000|
| Jack|  130K130K|
+-----+----------+

Et non, le code est parfaitement valide car en Scala la classe String est enrichie d’une méthode def *(n: Int): String qui crée une nouvelle valeur String en concaténant celle d’origine n fois.

Pour écrire du code fonctionnel et robuste, faites-le avec des tests unitaires !

Surcoût encode/décode

Contrairement à ce qu’on pourrait penser, Spark ne manipule pas les objets de votre classe juste parce que vous avez créé un Dataset. Les données résident au format Tungsten (types optimisés et propres à Spark), les instances de votre classe ne sont créées qu’au moment où elles sont passées à la fonction Scala/Java que vous avez définie pour la transformation. Les résultats retournés par la fonction sont à nouveau convertis dans les types spécifiques de Spark. Cette conversion génère un surcoût et dégrade par conséquent les performances.

Si les fonctions intégrées de Spark ne vous suffisent pas et que vous avez vraiment besoin de toute la puissance d’un langage impératif, il vaut mieux utiliser une UDF. Elle fera également cette conversion des types mais uniquement sur les colonnes nécessaires à la transformation.

Pas de projection

La mécanique décrite ci-dessus a une autre conséquence bien plus grave à mon avis qu’une légère baisse de performance. Si aucune transformation nécessitant l’instantiation de votre classe n’est réalisée sur le Dataset, Spark ne fait pas de projection. Le Dataset[A] peut donc contenir plus de colonnes que d’attributs dans la classe A. Vous ne pouvez donc pas vous fier à la signature d’une méthode qui retourne un Dataset sans voir son implémentation.

Regardons cet exemple de code :

scala> import java.nio.file.Paths
scala> import java.util.Base64
scala> import java.nio.file.Files
scala> import org.apache.spark.sql.Dataset

scala> Files.write(Paths.get("/tmp/users.csv"), Base64.getDecoder.decode("ImZpcnN0X25hbWUiLCJsYXN0X25hbWUiLCJhZGRyZXNzIiwicGhvbmVfbnVtYmVyIgoiQm9yaXMiLCJQZXJldmFsb3YiLCIyIHJ1ZSBWYXZpbG92LCBUb21zaywgVVJTUyIsIis3MzgyMjI1OTM2NSI="))

scala> case class User(first_name: String, last_name: String)

scala> def loadUsers(): Dataset[User] = spark.read.option("header", "true").csv("/tmp/users.csv").as[User]

scala> val users = loadUsers()

scala> users.show(false)

+----------+---------+--------------------------+------------+
|first_name|last_name|address                   |phone_number|
+----------+---------+--------------------------+------------+
|Boris     |Perevalov|2 rue Vavilov, Tomsk, URSS|+73822259365|
+----------+---------+--------------------------+------------+

La fonction loadUsers() retourne un Dataset[User], la classe User contient deux attributs et vous êtes persuadés que vous n'exposez que ces informations-là en déclenchant une action (ici juste un show, un writer se comportera de la même manière), alors que vous avez divulgué les données personnelles.

Vous pouvez bien sûr forcer la création des instances la class User et donc la projection de cette manière-là :

scala> users.map(x => x).show(false)
+----------+---------+
|first_name|last_name|
+----------+---------+
|Boris     |Perevalov|
+----------+---------+

Cette technique est plus que douteuse pour des raisons de lisibilité de code et de performances. Un développeur qui ne connaît pas cette particularité de Dataset va s'interroger au sujet de votre santé mentale en lisant ce code. En termes de performances, non seulement cela génère un surcoût de conversion des types et d’instanciation des objets, mais cette technique empêche Spark d’optimiser le plan d'exécution en rendant impossible d’appliquer la projection au plus tôt (ou même directement à la source de données).

Pas de pushdown/partition filter

Comparons les plans d’exécution de ces deux transformations très similaires en termes syntaxe :

scala> users.filter($"first_name" === "Boris").explain()

== Physical Plan ==
*(1) Project [first_name#16, last_name#17, address#18, phone_number#19]
+- *(1) Filter (isnotnull(first_name#16) AND (first_name#16 = Boris))
   +- FileScan csv [first_name#16,last_name#17,address#18,phone_number#19] Batched: false, DataFilters: [isnotnull(first_name#16), (first_name#16 = Boris)], Format: CSV, Location: InMemoryFileIndex[file:/tmp/users.csv], PartitionFilters: [], PushedFilters: [IsNotNull(first_name), EqualTo(first_name,Boris)], ReadSchema: struct<first_name:string,last_name:string,address:string,phone_number:string>


scala> users.filter(_.first_name == "Boris").explain()

== Physical Plan ==
*(1) Filter $line31.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$Lambda$3528/720155857@73ce275a.apply
+- FileScan csv [first_name#16,last_name#17,address#18,phone_number#19] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[file:/tmp/users.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<first_name:string,last_name:string,address:string,phone_number:string>

On constate que dans le premier cas le plan contient PushedFilters:[IsNotNull(first_name), EqualTo(first_name,Boris)] et dans le deuxième il contient PushedFilters:[]. Le premier filtre exprimé avec la syntaxe DataFrame a donc pu être envoyé à la source (ici purement théorique car le format CSV ne le supporte pas), ce qui n'est pas le cas de celui exprimé avec la syntaxe Dataset.

Cela peut paraître surprenant de prime abord, mais c’est tout à fait logique : Spark ne peut pas interpréter le filtre exprimé sous forme d’une fonction Scala et donc l’envoyer à la source de données.

Bien évidemment, si les données sont partitionnées, le mécanisme de partition filter, qui réduit drastiquement le volume de données à scanner en visant directement les bons sous-dossiers, subira le même sort avec Dataset. Vous pouvez le vérifier vous-même.

Ces deux lignes de code très similaires représentent donc des mécanismes très différents. Le fait d’effacer des optimisations si importantes telles que pushdown filter et partition filter est à mon avis une faute grave et rédhibitoire. Carton rouge à Dataset !

Conclusion

L’idée de mélanger les paradigmes de programmation impératif et déclaratif en laissant le choix au développeur ne pouvait rien donner de bon à mon avis. On peut facilement écrire du code qui n’est ni optimisable par le moteur Spark ni garanti par un typage fort.

Si l’API Dataset a autant de défauts, faut-il la bannir totalement ? Ma réponse est non, je lui ai trouvé une utilisation dont je vais parler dans mon prochain article.