Introduction à RxJS

Si vous êtes un développeur débutant dans l’écosystème Angular, ou même si vous en avez déjà une certaine expérience, la partie que vous trouverez la plus difficile à comprendre est très probablement RxJS.

Ceci est dû au fait que RxJS et la programmation réactive en général ont une courbe d’apprentissage abrupte, ce qui rend difficile sa compréhension en s’y lançant tête baissée. Au lieu de cela, nous devons commencer par le commencement et apprendre d’abord quelques concepts de base sur la conception réactive.

Dans un premier temps, nous allons découvrir ce qu’est RxJS. Ensuite nous allons voir un peu plus en profondeur la notion de stream (flux de données) qui transite au travers des observables. De plus, nous verrons comment “souscrire” à nos observables et enfin comment transformer nos données via des operators.

RxJS, qu’est-ce que c’est ?

RxJS est une bibliothèque de programmation réactive utilisant des Observables pour faciliter la composition de code asynchrone.

Quelle est l’utilité de RxJS ?

ReactiveX combine l’Observer pattern avec l’Iterator pattern et la programmation fonctionnelle avec des collections pour répondre au besoin de gérer les séquences d'événements.

Un événement se produit par exemple lorsque l'utilisateur ou le navigateur modifie d’une quelconque façon une page. Lorsque la page se charge, cela s'appelle un événement. Lorsque l'utilisateur clique sur un bouton, ce clic est également un événement.

Les concepts essentiels de RxJS qui résolvent la gestion des événements asynchrones sont :

  • Observable : représente l’idée d’une collection invocable de valeurs ou d’événements futurs. (Équivalent aux streams)
  • Observer : est une collection de callbacks qui sait écouter les valeurs délivrées par l’Observable.
  • Subscription : représente l’exécution d’un Observable, est principalement utile pour annuler l’exécution.
  • Operators : sont des fonctions pures qui permettent un style de programmation fonctionnelle pour traiter les collections avec des opérations telles que map, filter, concat, reduce, etc.

Schéma d'observable avec plusieurs opérateurs

  • Subject : équivaut à un EventEmitter et est le seul moyen de multicaster une valeur ou un événement à plusieurs observateurs.

Schéma de subject notifiant plusieurs observables

Les streams

Nous allons introduire l’une des premières notions de base pour la compréhension de RxJS. Il s’agit de la notion de stream (flux) de données.

Souvent, les développeurs plongent directement dans la notion d’observables sans comprendre la notion de stream en premier. Mais c’est le meilleur point de départ pour ensuite bien comprendre les observables.

Comme vous le savez, dans un programme JavaScript, la notion d’asynchrone est fondamentale.

Listons quelques exemples de cas asynchrones que nous pouvons rencontrer :

  • Nous pouvons avoir des requêtes API à effectuer pour récupérer de la donnée d’un back-end.
  • Nous pouvons avoir des délais d’attente qui se produisent dans le front-end.
  • Nous avons des interactions d’utilisateurs via des événements clavier et souris.

Ce sont tous des événements asynchrones que nous devons traiter/combiner pour le bon fonctionnement de notre programme.

Voyons donc quelques exemples de streams.

document.addEventListener('click', 
    evt => console.log(evt);
);

Interceptons chaque clic de l’utilisateur sous forme de stream de données et affichons le résultat :

MouseEvent {isTrusted: true, screenX: 1008, screenY: 177, clientX: 228, clientY: 103, ...}
MouseEvent {isTrusted: true, screenX: 906, screenY: 197, clientX: 326, clientY: 104, ...}
...

Une série d’événements s’imprime à l’écran.

Donc, ce que nous pouvons observer ici, à chaque fois que nous cliquons sur la souris, est qu’un exemple d’un flux de valeur est émis.

Un autre exemple que nous trouvons souvent dans les programmes JavaScript est l’intervalle JavaScript, qui est périodiquement exécuté au moment de l’exécution.

let counter = 0;

setInterval(() => {
    console.log(counter);
    counter++;
}, 1000);

setTimeout(() => {
    console.log("finished...");
}, 3000);

Nous avons un premier flux de données qui, à chaque seconde, augmentera de 1 notre variable counter et un second flux de données qui, au bout de 3 secondes, affichera le message "finished...".

1
2 
3 
finished...
...

En effet, nous avons ici le second flux de données (setTimeout) qui a émis une valeur et le flux s’est terminé, ce qui signifie qu’il n’émettra plus de nouvelle valeur.

Cependant, le flux d’intervalle émet des valeurs en permanence. Donc ce flux est un flux à valeurs multiples : il continuera à émettre des valeurs au fil du temps et il ne se terminera jamais.

Le premier stream émet des valeurs en permanence et ne se termine jamais, tandis que le second émet une valeur et se termine.

C’est donc une notion importante. Les streams peuvent se compléter ou ne jamais se compléter.

Les observables

Nous avons défini ce qu’est un flux de données, par notre exemple de l'intervalle de temps.

Créons maintenant ici un observable intervalle. Nous pouvons faire cela en utilisant la méthode interval et en spécifiant l’intervalle de temps en millisecondes.

const interval$ = interval(1000);
Une bonne pratique est de nommer un observable de la sorte : maVariable$ avec un signe dollar à la fin du nom de variable pour indiquer qu’il s’agit d’un observable.

interval est un observable de nombre (Observable<number>). Cet observable émet donc des nombres.

Une fois que notre Observable est défini, nous devons préciser ce que nous devons en faire. Tant que l’observable n’est pas “utilisé”, nous ne constaterons pas de flux de valeurs.

La déclaration d’un observable est donc une définition d’un flux de valeurs. C’est comme un schéma de la façon dont le flux se comportera si nous l’instancions.

Pour que l’observable devienne un flux de valeurs, nous devons y souscrire avec la méthode subscribe. Une fois l’observablesubscribed”, nous pouvons enfin manipuler les données qui transitent via l’observable et les imprimer à l’écran par exemple :

const interval$ = interval(1000);
interval$.subscribe(val => console.log("stream => " + val));

Ce qui affichera le résultat suivant :

stream => 0
stream => 1
stream => 2
stream => 3
...

Méthode subscribe

this.http.get<Plant[]>(`${this.apiUrl}/plants`)
    .subscribe(
        (plants: Plants[]) => console.log(plants),
        (err) => throw err,
        () => console.log('finished'),
    );

Comme vu précédemment, un observable est un “plan d’exécution” pour un stream. Lorsque la donnée va transiter dans le stream, une succession d’opérations pourra être effectuée sur cette donnée.

La méthode subscribe va nous permettre d’activer le stream, c’est un peu comme un robinet qui permet d’activer ou désactiver le flux. Cette méthode subscribe dispose de quelques arguments supplémentaires que nous pouvons lui fournir :

  • Le 1er argument (obligatoire) est une fonction callback via laquelle transitent toutes les valeurs de l’observable.
  • Le 2ème argument (optionnel) est une fonction callback de gestion d’erreur.
  • Le 3ème argument (optionnel) est une fonction callback appelée lorsque l’observable s’arrête.

Schéma de souscription à un observable avec la méthode subscribe

Il est important de comprendre qu’un stream peut rencontrer une erreur. Nous pourrions avoir un flux qui représente une série d'appels que nous faisons à un back-end pour récupérer des données périodiquement. Un appel API peut retourner une erreur, et c’est dans ce cas précis que le second argument de subscribe est utile. Nous pouvons par exemple déterminer qu’en cas d’erreur, nous allons afficher un message d’erreur à l’utilisateur pour l’informer du problème rencontré.

Comme nous l’avions vu, un observable peut se terminer ou durer indéfiniment. Dans le cas où il peut se terminer, on pourra exécuter une dernière action avant que l’observable ne se complète. Enfin, nous avons la méthode unsubscribe qui permet l’arrêt du flux dans l’observable.

let homeViewSubscription = null;

function onEnterView() {
    homeViewSubscription = obs$.subscribe(data =>
        doSomethingWithDataReceived(data));
}

function onLeaveView() {
    homeViewSubscription.unsubscribe();
}
subscribe, c’est le robinet qui active un flux, et unsubscribe permet de le désactiver

unsubscribe est une méthode qui se trouve directement dans l’objet de type Subscription qui est retourné par la méthode subscribe. Il suffit de l’appeler au moment souhaité pour provoquer l’arrêt de l’observable.

Les operators

RxJS est surtout utile pour ses opérateurs, même si les Observables en sont la base. Les opérateurs sont des éléments essentiels qui permettent de composer facilement un code asynchrone complexe de manière déclarative.

Qu’est-ce qu’un opérateur ?

Les opérateurs sont des fonctions. Il en existe deux types :

  • Les opérateurs pipeables qui peuvent être redirigés vers des observables à l’aide de la syntaxe observableInstance.pipe(operator()). Ceux-ci incluent map(), pluck(), et bien d’autres. Lorsqu’ils sont appelés, ils ne modifient pas l’instance Observable existante. Au lieu de cela, ils renvoient un nouvel Observable, dont la logique d’abonnement est basée sur le premier Observable. Un opérateur Pipeable est une fonction qui prend un Observable en entrée et renvoie un autre Observable. C’est une opération pure : l’Observable précédent reste inchangé. Les opérateurs de création qui peuvent être appelés en tant que fonctions autonomes pour créer un nouvel observable. Par exemple : of(1, 2, 3) crée un observable qui émettra 1, 2 et 3, l’un après l’autre.

Quelques Operators pour débuter

of (création)

Émet une quantité variable de valeurs dans une séquence.

// RxJS v6+
import { of } from 'rxjs';
// emits any number of provided values in sequence
const source = of(1, 2, 3, 4, 5);
// output: 1,2,3,4,5
const subscribe = source.subscribe(val => console.log(val));

from (création)

Transforme un tableau, une promise ou un itérable en un observable.

// RxJS v6+
import { from } from 'rxjs';//emit array as a sequence of values
const arraySource = from([1, 2, 3, 4, 5]);
// output: 1,2,3,4,5
const subscribe = arraySource.subscribe(val => console.log(val));

map (pipeable)

Applique une fonction donnée à chaque valeur émise par l’Observable source et émet les valeurs résultantes en tant qu’Observable.

Diagrame de bille de l'opérateur map

// RxJS v6+
import { from } from 'rxjs';
import { map } from 'rxjs/operators';
// emit (1,2,3,4,5)
const source = of([1, 2, 3, 4, 5]);
// add 10 to each value
const example = source.pipe(map(val => val * 10));
// output: 10,20,30,40,50
const subscribe = example.subscribe(val => console.log(val));

combineLatest (pipeable)

Crée un observable qui combine les dernières valeurs de tous les observables passés et de la source dans des tableaux et les émet.

Diagrame de bille de l'opérateur combineLastest

// RxJS v6+
import { timer, combineLatest } from 'rxjs';
// timerOne emits first value at 1s, then once every 4s
const timerOne$ = timer(1000, 4000);
// timerTwo emits first value at 2s, then once every 4s
const timerTwo$ = timer(2000, 4000);
// timerThree emits first value at 3s, then once every 4s
const timerThree$ = timer(3000, 4000);// when one timer emits, emit the latest values from each timer as an array
combineLatest(timerOne$, timerTwo$, timerThree$).subscribe(
    ([timerValOne, timerValTwo, timerValThree]) => {
        /*
            Example:
          timerThree first tick: 'Timer One Latest: 0, Timer Two Latest: 0, Timer Three Latest: 0
          timerOne second tick: 'Timer One Latest: 1, Timer Two Latest: 0, Timer Three Latest: 0
          timerTwo second tick: 'Timer One Latest: 1, Timer Two Latest: 1, Timer Three Latest: 0
        */
        console.log(
           `Timer One Latest: ${timerValOne},
            Timer Two Latest: ${timerValTwo},
            Timer Three Latest: ${timerValThree}`
        );
    }
);

merge (pipeable)

Fusionne différentes sources d’observable pour n’en faire qu’un observable.

Diagrame de bille de l'opérateur merge

// RxJS v6+
import { mapTo } from 'rxjs/operators';
import { interval, merge } from 'rxjs';
// emit every 2.5 seconds
const first = interval(2500);
// emit every 2 seconds
const second = interval(2000);
// emit every 1.5 seconds
const third = interval(1500);
// emit every 1 second
const fourth = interval(1000);//emit outputs from one observable
const example = merge(
    first.pipe(mapTo('FIRST!')),
    second.pipe(mapTo('SECOND!')),
    third.pipe(mapTo('THIRD')),
    fourth.pipe(mapTo('FOURTH'))
);
// output: "FOURTH", "THIRD", "SECOND!", "FOURTH", "FIRST!", "THIRD", "FOURTH"
const subscribe = example.subscribe(val => console.log(val));

Pour conclure

RxJS est un outil remarquable pour la gestion asynchrone et les évènements. De plus, l’utilisation du declarative pattern permet une très bonne visibilité du code.

Voici quelques exemples impressionnants de mini-jeux que certains ont pu réaliser avec RxJS :