RxJS

RxJS (pour Reactive Extensions for JavaScript) est l’implémentation javascript de ReactiveX. ReactiveX est une API basée sur le pattern Observer et la programmation fonctionnelle pour gérer des événements asynchrones. ReactiveX est activement développé par Microsoft.

Aujourd’hui, RxJS est largement utilisée dans Angular 2, en particulier dans HTTP et EventEmitter. RxJS5 est pour l’instant en bêta et c’est ce qui est utilisé dans Angular2.

Jusqu’à maintenant les problématiques asynchrones en JavaScript étaient gérées principalement via le pattern Promise ou les EventEmitter. RxJS apporte une nouvelle approche qui permet de tirer partie du meilleur de ces deux approches.

Les Observables en bref

Les Observables sont un des concepts clés de RxJS et il est intéressant de se familiariser avec eux puisqu’ils sont introduits dans ES7. Un Observable est un producteur de données qui peut être Observer. On le mettra sous observation avec la méthode subscribe et cette observation sera exécutée par un Observer.

var observable = Rx.Observable.create(function(observer)  { observer.next("one result"); }).subscribe(e => console.log(e));

one result

Un équivalent avec les promesses donnerait quelque chose comme ça :

var promise = new Promise(function(resolve, reject) {   resolve("one result"); }).then((e) => console.log(e));

one result

Il peut également produire des données de façon asynchrone.

Rx.Observable.create(function(observer) {   setTimeout(() => observer.next("aaa"), 700);   setTimeout(() => observer.next("bbb"), 400);   setTimeout(() => observer.next("ccc"), 300);   setTimeout(() => observer.next("ddd"), 600);   setTimeout(() => observer.next("eee"), 100);   setTimeout(() => observer.next("fff"), 1100); }) .take(4) .subscribe(e => console.log(e));

eee > ccc > bbb > ddd

.take(4) nous permet de ne prendre que les quatre premières valeurs renvoyées par l’Observable.

Jusqu’ici on ne gagne pas grand chose à utiliser les Observables plutôt que les promesses. Mais maintenant regardons un peu plus ce que les Observables permettent de faire.

Stream et programmation fonctionnelle

Il est possible de merger des Observables, ce qui nous permet d’écouter plusieurs Observables avec un même subscriber. On peut également écouter un même Observable avec plusieurs subscribers par l’intermédiaire des Subjects. Il est également possible de faire les deux à la fois. Nous verrons plus loin dans cet article comment faire cela. Ce sont toutes ces possibilités offertes par les Observables qui font leur force.

L’une des différences notable entre les promesses et les Observables est que les promesses sont très utiles pour gérer les opérations asynchrones qui ne vont générer qu’une seule valeur, comme lorsque l’on fait une requête Ajax. Les Observables, en revanche, peuvent gérer une infinité de valeurs de retour.

Rx.Observable.create(function(observer) {   setTimeout(() => observer.next("valeur A"), 700);   setTimeout(() => observer.next("valeur B"), 400); }) .subscribe(e => console.log(e));

valeur B > valeur A

Ceci est possible parce que les Observables nous fournissent des streams. Et cela nous amène à l’un des aspects très intéressant de RxJS. RxJS met à notre disposition tout l’arsenal de la programmation fonctionnelle.

Rx.Observable.of(1, 2, 3, 4, 5, 6)  .filter(e => e % 2 == 0 )  .do((e) => console.log("filtred value: " + e))  .map(e => e*3)  .subscribe(e => console.log(e));

filtred value: 2 > 6 > filtred value: 4 > 12 > filtred value: 6 > 18

Notez ici l’utilisation du .do() très utile pour débugger.

Ici, j’ai mis quelques exemples plus ou moins classiques d’opérateurs de programmation fonctionnelle. Mais évidemment, il y en a plein d’autres. Je ne saurais que vous conseiller de jeter un coup d’oeil à la documentation pour voir tous ce qui est possible de faire.

Il n’y a pas que des .next() que nous pouvons passer à l’observer : le .error() et le .complete() peuvent être utilisés pour indiquer qu’un Observable a fini d’émettre des données. De manière générale, le .complete() n’est pas obligatoire, d’autant que pour certains Observables on ne sait jamais si l’envoi des données est terminé (ex: un Observable qui écoute les clics sur un bouton). Mais si c’est possible cela peut être vu comme une bonne pratique d’utiliser le .complete().

Rx.Observable.create(function(observer) {   observer.next("vue"); observer.next("vuue");   observer.complete("inutile");   observer.next("pas vue"); }).subscribe(   (e) => console.log("next: " + e) , (e) => console.log("error: " + e) , (e) => console.log("complete") );

next: vue > next: vuue > complete

Dans cet exemple on voit que le dernier .next() après le .complete() n’est pas envoyé et .complete() ne permet pas de renvoyer de valeur (.error() le peut).

Comment créer des Observables

Il y a une multitude de façons de créer un Observable :
****

  • Observable.create
  • Observable.of

On a déjà vu ces deux là dans les exemples précédents. On peut aussi créer un Observable avec :

  • Observable.fromEvent

var button = document.querySelector('button'); Rx.Observable.fromEvent(button, 'click')  .subscribe(() => console.log("Bonjour!"));

Il y en encore plusieurs autres façons par exemple un .fromPromise().

Subject

Regardons cet exemple:

var observable = Rx.Observable.create(function(observer)  {     observer.next("a");     observer.next("b"); }); console.log(“before subscribe”); observable.subscribe(e => console.log(e));

before subscribe > a > b

Ici on voit bien que l’appel au .subscribe() revient à faire un .call(). Et c’est là qu’on peut se demander comment faire pour avoir plusieurs subscribers sur un observable : en utilisant les Subjects.

Un Subject est à la fois un Observable et un Observer, c’est à dire que l’on va à la fois pouvoir .subscribe() et appeler la méthode .next() dessus.

subjectvar subject = new Rx.Subject()   .do((e) => console.log("Doing 11*"+e))   .map(e => 11*e); .filter((e) => e % 2 == 0)   .subscribe({     next: (e) => console.log('Odd numbers: ' + e)   }); subject   .filter((e) => e % 2 != 0)   .subscribe({     next: (e) => console.log('Even numbers: ' + e)   }); subject.next(11); subject.next(24); subject.next(7);

Doing 1111 > Even numbers: 121 > Doing 1124 > Odd numbers: 264 > Doing 1124 > Doing 117 > Even numbers: 77

Merge

Le merge va nous permettre de regrouper plusieurs Observables en un seul. Il peut être intéressant de constater la capacité qu’a le merge de regrouper des données de sources potentiellement très différentes. On peut par exemple traiter des données qui viendraient d’un événement, d’une Promesse ou d’un simple array avec un même Observer, le tout en conservant l’asynchronisme des différentes opérations.

var button = document.querySelector('.allo'); var observable_clicks = Rx.Observable.fromEvent(button, 'click'); var observable_timer = Rx.Observable.create(function (observer) { setInterval(() => observer.next('tick'), 500); });

Rx.Observable .merge(observable_clicks, observable_timer) .scan((e) => e + 1, 0) .subscribe((e) => console.log(e));

Ici, je log un compteur qui va s’incrémenter lorsque je clic sur les boutons avec la class .allo. Il s’incrémente aussi toutes les demi secondes.

En Conclusion

Le but de cet article était de montrer les différents points d’intérêt de RxJS, comme l’aspect fonctionnel ou les avantages que cela apporte sur les techniques existantes. Bien sûr, pour maîtriser tout cela, il ne reste plus qu’à utiliser concrètement les bases que nous avons vues dans cet article. Il y a énormément d’exemples de cas où RxJS peut être utile et je pense qu’il ne faut pas hésiter à y faire appel. Et comme je le disais en introduction ceux qui vont se lancer dans Angular2 vont de toute manière en avoir l’occasion.

Quelques liens et références utile

https://github.com/ReactiveX/rxjs
http://rxmarbles.com/ <- <3
http://reactivex.io/rxjs/