Comment bien s'entendre avec avec Spring Data R2DBC... ou pas

Dans un article précédent, sur Spring Data JDBC, nous avons vu qu'il est possible d'exploiter une couche d'abstraction plus légère que JPA pour développer des applications relativement simples. Nous allons à présent voir s'il est possible d'en faire autant en mode réactif, et comment l'exposer en tant qu'API REST avec Spring.

Le web “réactif” (parfois appelé “web fonctionnel”) est non-bloquant, asynchrone et piloté par les événements. Spring 5 s’appuie sur le serveur d’applications Netty qui correspond à ces contraintes, et sur des clients HTTP “réactifs”, voire WebSocket, pour dialoguer. Cette architecture est plus efficace lorsque les dépendances auprès d’autres services ou de ressources externes sont nombreuses et les accès potentiellement dégradés.

Le module Spring WebFlux s’appuie lui-même sur le Project Reactor, qui fournit une implémentation des Reactive Streams basée sur leur interface Publisher<> (les autres concepts de cet ordre sont Subscriber, Subscription et Processor). Ils sont réutilisables (contrairement aux streams de Java 8), et prévus pour recevoir tout type de données en entrée, sans limite de durée, ouvrant la porte à la maîtrise de flux infinis.

Les streams du Project Reactor sont matérialisés par les classes Flux<> et Mono<>, qui prennent respectivement en charge des éléments multiples et isolés. Elles s’apparentent d’une part à la hiérarchie de classes issue de l’interface Collection<> (Collections Framework), et d’autre part à la classe Optional<>. Et elles implémentent toutes l'interface Publisher<>.

Bâtir une couche d'accès aux données réactive

Comme les caractéristiques du flux doivent être prises en charge dès la source, nous allons commencer par mettre en place une couche d’accès aux données qui soit non-bloquante. Et justement, le projet Spring Data a évolué pour proposer cette couche réactive, à partir de l’interface ReactiveCrudRepository. Le type de retour des fonctions DAO est naturellement passé à Flux et Mono, que vous pouvez exploiter dans vos propres méthodes de requêtes implicites.

NB: sachez qu’un Mono ne peut pas contenir de valeur nulle (on utilise Mono#empty).

Le projet R2DBC (Reactive Relational Database Connectivity)

Pour avancer, vous devrez choisir une couche de persistance adéquate en fonction des disponibilités du marché. Ce sont d’abord des bases de données NoSQL qui se sont révélées compatibles avec ces besoins, comme les systèmes MongoDB, Couchbase et Cassandra, ou encore Redis (in-memory data grid). Cependant le projet R2DBC propose des solutions réactives relationnelles, pour ce que cela vaut, compte tenu des contraintes transactionnelles et de l’architecture de ces systèmes. Pour l’instant ce sont essentiellement les bases H2, MariaDB, Microsoft SQL Server, MySQL et PostgreSQL qui sont gérées. Le projet cherche également à influencer les initiatives ADBA et Asynchronous Database Connectivity in Java (ADBCJ).

Nous allons donc créer un projet Spring Boot (vous utilisez bien le Spring Initializr, n'est-ce pas ?) et opter pour des dépendances vers Spring Data R2DBC et H2 Database :

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.6.3</version>
</parent>

...

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-r2dbc</artifactId>
</dependency>
<dependency>
    <groupId>com.h2database</groupId>
    <artifactId>h2</artifactId>
    <scope>runtime</scope>
</dependency>
<dependency>
    <groupId>io.r2dbc</groupId>
    <artifactId>r2dbc-h2</artifactId>
    <scope>runtime</scope>
</dependency>

Comme nous allons le voir, le projet Spring Data R2DBC propose des repositories réactifs et un template réactif d’accès aux données, dont la classe se nomme DatabaseClient et qui s’appuie elle-même sur un driver R2DBC dédié à la base de données choisie. Comme avec JDBC, l’activation de la couche d’accès aux données peut être contrôlée par annotation, et le script schema.sql doit correspondre à la base de données choisie :

@SpringBootApplication

@EnableR2dbcRepositories // actually optional if not parameterized

public class R2dbcApplication {

...

}

DROP TABLE IF EXISTS PERSONS_ADDRESS;

DROP TABLE IF EXISTS PERSONS;

DROP TABLE IF EXISTS ADDRESS;


CREATE TABLE ADDRESS (

id bigint GENERATED ALWAYS AS IDENTITY,

street_number int NOT NULL,

street varchar(30) NOT NULL,

city varchar(30) NOT NULL,

state varchar(30),

zip varchar(30) NOT NULL,

country varchar(30) NOT NULL,

PRIMARY KEY (id)

);

CREATE TABLE PERSONS (

id bigint GENERATED ALWAYS AS IDENTITY,

first_name varchar(30),

last_name varchar(30) NOT NULL,

birth_date date NOT NULL,

PRIMARY KEY (id),

UNIQUE (first_name, last_name, birth_date)

);

CREATE TABLE PERSONS_ADDRESS (

person bigint,

address bigint,

PRIMARY KEY (person, address),

FOREIGN KEY (person) REFERENCES PERSONS(id)

ON DELETE CASCADE,

FOREIGN KEY (address) REFERENCES ADDRESS(id)

);

En théorie, les entités JDBC n’ont pas à évoluer pour R2DBC. Et dans un premier temps, nous allons simplement adapter les repositories pour bénéficier de l’API réactive :

import reactor.core.publisher.Flux;

import reactor.core.publisher.Mono;

...

public interface PersonsRepository

extends ReactiveCrudRepository<Person, Long>, PersonsDao {

Flux<Person> findByFirstNameLike(String pattern); // use % as wildcard


Flux<Person> findByLastName(String lastname, Pageable pagination);

Mono<Long> countByLastName(String lastname);


@Modifying // handles void, int, long or boolean return types

@Query("DELETE FROM persons WHERE last_name = :name")

Mono<Long> removeByLastName(@Param("name") String lastname);

}

public interface AddressRepository

extends ReactiveCrudRepository<Address, Long> {

}

Nous allons pouvoir tester immédiatement ces fonctionnalités :

import static org.assertj.core.api.Assertions.assertThat;

import reactor.test.StepVerifier;

...

@DataR2dbcTest // beware: entity callbacks won't work

@Slf4j

class RepositoryDerivationTests {

@Autowired

private PersonsRepository personDao;

@Test

void dao_derivation_ShouldLoad() {

personDao.findByFirstNameLike("foo%")

.as(StepVerifier::create)

.thenConsumeWhile(person -> {

log.info("FOUND PERSON: {}", person);

assertThat(person.getId()).isNotNull();

assertThat(person.getFirstName()).startsWith("foo");

assertThat(person.getLastName()).isNotBlank();

assertThat(person.getBirthDate())

.isBeforeOrEqualTo(LocalDate.now());

return true;

}).verifyComplete();

}

@Test

void dao_derivation_ShouldLoadAndRemove() {

final String name = "bar0";

// blocking

assertThat(personDao.countByLastName(name).block())

.isEqualTo(1L);

// non-blocking

personDao.findByLastName(name, Pageable.unpaged())

.as(StepVerifier::create)

.expectNextCount(1L) // collection size test

.verifyComplete();

personDao.removeByLastName(name)

.as(StepVerifier::create)

.expectNext(1L) // mono value test

.verifyComplete();

}

}

Remarque : nous avions ajouté des requêtes DROP TABLE au script SQL de construction du schéma pour pouvoir jouer différentes méthodes dans ce test d’intégration.

L’interface StepVerifier permet à la fois de définir les vérifications à effectuer sur les données reçues de manière asynchrone, et de faire remonter les exceptions correspondantes dans le thread principal afin que JUnit se comporte normalement.

Gestion de transactions avec R2DBC

L’annotation @Transactional a toujours cours avec une couche d’accès aux données réactives avec R2DBC et la fonctionnalité peut être activée à tout moment :

@Configuration

@EnableTransactionManagement // R2dbcTransactionManager

class TransactionsConfiguration {}

Exploitation du cycle de vie des entités réactives

Malheureusement pour nous… les problèmes vont commencer ! Dans le meilleur des cas, nous constaterions que la relation JDBC n’a pas été suivie ; mais en fait, d’autres erreurs masqueraient le problème de toute façon. R2DBC ne gère pas (encore) les jointures relationnelles du SQL (un comble !), et surtout il ne supporte pas non plus les propriétés basées sur les collections (prends ça dans les dents, PostgreSQL !). C’est pourquoi nous allons devoir finalement reprendre la structure de nos entités et gérer leurs relations, si nous voulons bénéficier quand même d’une couche d’accès aux données réactive avec JDBC.

Voici un test minimal que nous voudrions voir passer à notre modèle de données :

@Test

void dao_ShouldLoadAggregate() {

personDao.count()

.as(StepVerifier::create)

.expectNext(3L) // check returned value

.verifyComplete();

personDao.findAll()

.as(StepVerifier::create)

.expectNextCount(3L) // check returned collection size

.verifyComplete();

personDao.findById(1L)

.as(StepVerifier::create)

.assertNext(person -> {

log.info("EXISTING PERSON: {}", person);

assertThat(person.getId()).isEqualTo(1);

assertThat(person.getFirstName()).isEqualTo("foo0");

assertThat(person.getLastName()).isEqualTo("bar0");

assertThat(person.getBirthDate())

.isEqualTo(LocalDate.of(2000, 1, 1));

assertThat(person.getOfficePlaces()).hasSize(2);

person.getOfficePlaces().forEach(address -> {

log.info("OFFICE PLACE: {}", address);

});

}).verifyComplete();

}

Mais pour cela, il va falloir nous appuyer sur le cycle de vie des entités pour charger et enregistrer les adresses lorsque nous manipulons des personnes… Mais d’abord nous devons modifier ces entités pour les adapter au besoin :

@Table("ADDRESS")

@Immutable @Value

@Builder

public class Address {

@Id Long id;

@Column("STREET_NUMBER") Integer number;

@NotBlank String street;

@NotBlank String city;

String state;

String zip;

@NotBlank String country;

}

@Table("PERSONS")

@Data @NoArgsConstructor

@RequiredArgsConstructor

public class Person {

@Id

private Long id;

@NonNull

private String firstName;

@NonNull

private String lastName;

@NonNull

private LocalDate birthDate;

@Setter(AccessLevel.NONE)

@Transient

private Set<Address> officePlaces = new HashSet<>();

}

Le véritable changement consiste à sortir la relation multiple de la gestion de R2DBC. Pour lui, la table PERSONS_ADDRESS n’existera tout simplement pas.

Comme on l’a dit, les observateurs fournis par R2DBC couvrent le chargement et la persistance des données pour les entités que nous avons définies en Java ; mais leur suppression n’est pas (encore ?) prise en charge. Alors voici ce que nous pouvons faire pour gérer nous-mêmes la relation many-to-many :

public interface AddressRepository extends ReactiveCrudRepository<Address, Long> {

@Query("SELECT * FROM address WHERE id IN " +

"(SELECT address FROM persons_address WHERE person = :id)")

Flux<Address> findOfficePlacesForPersonId(Long id);

}

import org.springframework.context.annotation.Lazy;

import org.springframework.data.r2dbc.mapping.event.AfterConvertCallback;

...

@Component

public class PersonLoadOfficeListener implements AfterConvertCallback<Person> {

@Autowired @Lazy

private AddressRepository dao;

@Override

public Publisher<Person> onAfterConvert(Person person, SqlIdentifier table) {

return dao.findOfficePlacesForPersonId(person.getId())

.map(address -> {

if (address != null) person.getOfficePlaces().add(address);

return person;

})

.takeLast(1) // shrink JOIN duplicates

.single(person); // avoid no address persons skip

}

}

D'abord, nous avons ajouté une méthode pour exploiter la relation entre les deux entités, via leur table de jointure. Et lorsque nous avons injecté ce repository dans l’observateur de chargement, nous avons déclaré la dépendance en mode “lazy” ; car cet observateur va lui-même être injecté dans le template de R2DBC (DatabaseClient), qui à son tour sera injecté au repository. Et par défaut cette situation de dépendance circulaire n’est pas tolérée par Spring.

L’enregistrement va nous donner un peu plus de fil à retordre. Nous avons décidé de procéder comme l’annotation @ElementCollection de JPA et de procéder en supprimant toutes les relations concernées avant de les recréer une à une. Bien sûr, cela implique d’exiger la présence d’une transaction, pour se couvrir, et d'effacer ensuite à la main les orphelins potentiels dans la base de données :

import org.springframework.data.r2dbc.mapping.event.AfterSaveCallback;

import org.springframework.r2dbc.core.DatabaseClient;

...

@Component

@Transactional(propagation = MANDATORY)

public class PersonSaveOfficeListener implements AfterSaveCallback<Person> {

@Autowired

private DatabaseClient client;

private static final String

SQL_DELETE = "DELETE FROM persons_address WHERE person = :id",

SQL_INSERT = "INSERT INTO persons_address (person, address) VALUES ($1, $2)";

@Override

public Publisher<Person> onAfterSave(Person person,

OutboundRow outboundRow, SqlIdentifier table) {

return client.sql(SQL_DELETE).bind("id", person.getId())

.fetch().rowsUpdated()

.flatMap(deleted -> {

if (person.getOfficePlaces().isEmpty()) {

return Mono.just(person);

} else {

return client.inConnection(cnx -> {

Statement stmt = cnx.createStatement(SQL_INSERT);

person.getOfficePlaces().forEach(address ->

stmt.bind(0, person.getId())

    .bind(1, address.getId())

    .add() // new INSERT line in batch

);

return Flux.from(stmt.execute())

.last().map(dontcare -> person);

});

}

});

}

}

Étant donné que la table que nous manipulons ici ne fait pas l’objet d’une implémentation en tant qu’entité, nous utilisons directement le template R2DBC et pas un repository. Il y a d’abord une requête de suppression des relations pour la personne qui vient d’être mise à jour, et ensuite une série de requêtes exécutées en batch pour recréer les relations correspondant à l’objet courant, le cas échéant. La syntaxe de ces requêtes est différente, car l’outil Statement (qui sert à l’exécution des scripts SCHEMA et DATA) se situe “en-dessous” du template DatabaseClient.

Comme on le voit, l'imbrication des opérations et la syntaxe "fluent" n’aident pas forcément à la compréhension de l’ensemble… Pour exécuter l’intégralité des commandes avant de sortir de la fonction, nous donc attendu (#last) et remplacé le compteur de lignes modifiées (toujours égal à 1) de la dernière opération par l’entité qui doit remonter vers le template.

Notre relation many-to-many est désormais prise en charge, mais nous devons encore nous charger de supprimer les adresses orphelines qui sont susceptibles de rester. En effet, on a supprimé toutes les lignes, puis on a seulement recréé celles qui sont figurées par la présence d'éléments dans la collection porteuse. Avant de nous en occuper, on va ajouter une relation one-to-many à notre modèle, histoire de faire bonne figure :

@Table("PERSONS")

...

public class Person {

...

@Transient

private Address homeAddress;

}

@Component

public class PersonLoadHomeListener implements AfterConvertCallback<Person> {

@Autowired @Lazy

private AddressRepository dao;

@Override

public Publisher<Person> onAfterConvert(Person person,

SqlIdentifier table) {

return dao.findHomeAddressForPersonId(person.getId())

.map(address -> {

person.setHomeAddress(address);

return person;

}).defaultIfEmpty(person); // no home address, that's all

}

}

@Component

@Transactional(propagation = MANDATORY)

public class PersonSaveHomeListener implements AfterSaveCallback<Person> {

@Autowired

private DatabaseClient client;

private static final String SQL_UPDATE =

"UPDATE persons SET home_address = :address WHERE id = :person";

@Override

public Publisher<Person> onAfterSave(Person person,

OutboundRow outboundRow, SqlIdentifier table) {

if (person.getHomeAddress() != null) {

return client.sql(SQL_UPDATE)

.bind("person", person.getId())

.bind("address", person.getHomeAddress().getId())

.fetch().rowsUpdated()

.map(count -> person);

} else {

return client.sql(SQL_UPDATE)

.bind("person", person.getId())

.bindNull("address", Long.class)

.fetch().rowsUpdated()

.map(count -> person);

}

}

}

À présent, nous pouvons prévoir un nettoyage en tâche de fond, à condition de nous assurer qu’il soit toujours effectué après le passage des autres observateurs :

@Component @Order(HIGHEST_PRECEDENCE) // last listener to be executed

@Transactional(propagation = MANDATORY)

public class PersonOrphansListener implements AfterSaveCallback<Person> {

@Autowired

private DatabaseClient client;

private static final String SQL_DELETE =

"DELETE FROM address WHERE " +

"id NOT IN (SELECT DISTINCT home_address FROM persons) AND " +

"id NOT IN (SELECT DISTINCT address FROM persons_address)";

@Override

public Publisher<Person> onAfterSave(Person person,

OutboundRow outboundRow, SqlIdentifier table) {

return client.sql(SQL_DELETE).fetch()

.rowsUpdated().map(deleted -> person);

}

}

Malheureusement, comme il n’y a pas de callback pour la suppression, notre mécanique restera bancale. La table de jointure ne sera pas réellement propre après une suppression de personne ; mais elle le deviendra lors d’une prochaine opération de mise à jour de n’importe quelle entité de cette nature.

Pour la beauté du geste, nous ajoutons un système de recyclage des adresses, histoire que notre base de données ne s’encombre pas inutilement et que la relation one-to-many soit optimale. Et c’est aussi l'occasion d'utiliser l’extraction des données par l’exemple fournie par Spring Data et à laquelle on ne pense pas forcément :

public interface AddressRepository extends

ReactiveCrudRepository<Address, Long>,

ReactiveQueryByExampleExecutor<Address> {

...

@Query("SELECT * FROM address WHERE id IN " +

"(SELECT home_address FROM persons WHERE id = :id)")

Mono<Address> findHomeAddressForPersonId(Long id);

}

import org.springframework.data.r2dbc.mapping.event.BeforeSaveCallback;

...

@Component

public class AddressSaveListener implements BeforeSaveCallback<Address> {

@Autowired @Lazy

private AddressRepository dao;

@Override

public Publisher<Address> onBeforeSave(Address address,

OutboundRow row, SqlIdentifier table) {

if (address.getId() == null) { // INSERT: try to recycle

ExampleMatcher matcher = ExampleMatcher.matchingAll()

.withIncludeNullValues()

.withIgnorePaths("id");

//FIXME cannot use #findOne(Example) until AfterDeleteCallback is supported

return dao.findAll(Example.of(address, matcher))

.defaultIfEmpty(address)

//DEBUG existing addresses might be "updated", eventually

.last();

} else { // UPDATE: just move along

return Mono.just(address);

}

}

}

Par souci d’exhaustivité, on peut signaler l’existence d’un module nommé lc-spring-data-r2dbc, dont les dernières mises à jour ne dataient que de quelques mois au moment où nous écrivions ces lignes. Il s’agit d’une proposition (sans gestion de cache, ni de lazy loading ou write behind) de prise en charge des aspects relationnels pour R2DBC qui est peut-être intéressante, mais nous ne l’avons pas essayée. Vous trouverez toutes les informations à l’adresse suivante :

https://github.com/lecousin/lc-spring-data-r2dbc

Du côté de JPA (et oui, quand même !) le projet Hibernate Reactive reste actif mais n’est pas à notre connaissance intégré à Spring Data. Voici où aller pour en apprendre plus :

https://github.com/hibernate/hibernate-reactive

Exposer et consommer une API web réactive

Bien… Nous sommes à présent en possession d’une couche d’accès aux données reactive à peu près utilisable, au prix d’un effort conséquent tout de même. C’est pourquoi nous allons maintenant remplacer la dépendance à Spring MVC et opter pour WebFlux ; cependant sachez que les deux pourraient cohabiter sans problème :

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

La première conséquence de ce changement est que le serveur d’applications par défaut passera de Tomcat à Netty. L’Actuator, quant à lui, s'adaptera automatiquement.

Les instruments fournis, destinés à rendre les applications réactives, concernent à la fois le côté serveur comme nous allons le voir, mais également le côté client. Depuis Spring framework version 5, le RestTemplate a donc son équivalent réactif : WebClient.

Évidemment, le changement de paradigme a un impact important sur le contrat des DAO, et nous devons donc adapter les services et les contrôleurs qui en dépendent à cette évolution. Le seul vrai changement étant que la classe ModelAndView a été remplacée par la classe Model qui serait injectée en paramètre au lieu d’être retournée dans un contexte d’application web. Enfin, la gestion des exceptions est différente, car l’architecture multi-thread ne permet pas de les faire remonter naturellement.

Nous allons d’abord remplir le contrat CREATE-READ-DELETE pour les personnes :

@RestController

@RequestMapping("/persons") // sounds familiar, so far...

public class PersonsController {

@Autowired

private PersonsRepository personDao;

@Autowired

private AddressRepository addressDao;


@PostMapping

@ResponseStatus(CREATED)

@Transactional

public Mono<Person> addPerson(@RequestBody Person person) {

Assert.isNull(person.getId(), "Invalid person creation request"); return personDao.save(person)

.flatMap(entity -> // force reload+callback

personDao.findById(entity.getId()));

}

@GetMapping

@Transactional(readOnly = true)

public Flux<Person> getPersons() {

return personDao.findAll();

}

@GetMapping("/{id}")

@Transactional(readOnly = true)

public Mono<Person> getPerson(@PathVariable Long id) {

return personDao.findById(id)

.switchIfEmpty(Mono.error(NoSuchElementException::new));

}

@DeleteMapping("/{id}")

@ResponseStatus(NO_CONTENT)

@Transactional

public Mono<Void> removeOnePerson(@PathVariable Long id) {

return personDao.existsById(id)

.flatMap(exists -> {

if (exists.booleanValue())

return personDao.deleteById(id);

throw new NoSuchElementException("PERSON");

});

}

}

Remarque : selon les comportements constatés du DAO, nous gérons nous-mêmes si besoin le cas NOT FOUND pour assurer l'homogénéité de l'API.

Voici, en guise d’opération UPDATE, comment gérer la résidence des personnes (one-to-many) :

@PutMapping("/{id}/home")

@ResponseStatus(CREATED)

@Transactional

public Mono<Person> addHomeAddress(@PathVariable Long id,

  @RequestBody Address address) {

Assert.isNull(address.getId(), "Invalid address add request");

return personDao.findById(id)

.switchIfEmpty(Mono.error(NoSuchElementException::new))

.flatMap(person -> addressDao.save(address)

.flatMap(entity -> {

if (entity != null)

person.setHomeAddress(entity);

else // well, that should not happen

person.setHomeAddress(null);

return personDao.save(person);

}))

.flatMap(entity -> // force reload+callback

personDao.findById(entity.getId()));

}

@DeleteMapping("/{id}/home")

@ResponseStatus(NO_CONTENT)

public Mono<?> removeHomeAddress(@PathVariable Long id) {

return personDao.removeHomeAddressById(id); // 404 support

//TODO 404 when homeAddress is null?

}

Voici à présent comment gérer les adresses de bureau (many-to-many) :

@PutMapping("/{id}/office")

@ResponseStatus(CREATED)

@Transactional

public Mono<Person> addOfficePlace(@PathVariable Long id,

  @RequestBody Address address) {

Assert.isNull(address.getId(), "Invalid address add request");

return personDao.findById(id)

.switchIfEmpty(Mono.error(NoSuchElementException::new))

.flatMap(person -> addressDao.save(address)

.flatMap(entity -> {

if (entity != null)

person.getOfficePlaces().add(entity);

return personDao.save(person);

}))

.flatMap(person -> // force reload+callback

personDao.findById(person.getId()));

}

@DeleteMapping("/{personId}/office/{addressId}")

@ResponseStatus(NO_CONTENT)

public Mono<?> removeOfficePlace(@PathVariable Long personId,

@PathVariable Long addressId) {

return personDao.existsById(personId)

.flatMap(personExists -> {

if (personExists.booleanValue()) {

return addressDao.existsById(addressId)

.flatMap(addressExists -> {

if (addressExists.booleanValue())

return personDao.removeOfficePlaceById(personId, addressId);

throw new NoSuchElementException("ADDRESS");

});

}

throw new NoSuchElementException("PERSON");

});

}

Et enfin, voici la façon de gérer les exceptions directement dans le contrôleur (rien de neuf) :

@ExceptionHandler(NoSuchElementException.class)

public ResponseEntity<String> handleError(NoSuchElementException exception) {

return ResponseEntity.notFound().build();

}

@ExceptionHandler(IllegalArgumentException.class) //TODO other ones, like NPE

public ResponseEntity<String> handleError(Exception exception) {

return ResponseEntity.internalServerError().build();

}

Encore une fois, la syntaxe est parfois difficile à comprendre lorsque plusieurs opérations doivent être enchaînées ; mais c’est le prix à payer pour garder la main sur l’exécution réactive, par rapport à la programmation procédurale. Malgré tout, grâce à la puissance de Spring Data et aux observateurs du cycle de vie des entités qui sont externalisés, on obtient une approche assez directe des fonctions souhaitées. L’API est désormais prête à partir en production !

Consommation des services d’une API réactive

Comme nous l’avons déjà dit, du point de vue de l’implémentation, la seule chose notable est le type de retour des méthodes endpoint, qui adoptent le typage Mono<> ou Flux<> du Project Reactor. Mais les annotations de mapping HTTP restent les mêmes. À défaut de ce typage, il est nécessaire de spécifier explicitement les routeurs réactifs du service.

Voici un exemple de consommation réactive par un client que nous pourrions créer :

WebClient http = WebClient.create();

Mono<Person> response = http.get()

.uri("http://localhost:8080/persons/{id}", "1")

// .header(HttpHeaders.AUTHORIZATION, bearerToken)

.retrieve()

.bodyToMono(Person.class);

...

Sachez que les streams streams (ou flux) proposent une méthode #subscribe au lieu de #forEach, en guise de visitor pattern. Ils disposent également de mécanismes de résilience intégrée qui sont fort complets :

WebClient http = WebClient.create("http://localhost:8080/persons");

Flux<Person> response = http.get().retrieve()

.bodyToFlux(Person.class)

.retryWhen(Retry.backoff(10, Duration.ofSeconds(1)))

.timeout(Duration.ofSeconds(1))

.onErrorResume(IllegalAccessError.class, ex -> Flux.empty()));

...

D’autre part, sachez qu’on peut facilement envelopper le Spring WebClient avec Feign pour implémenter très rapidement des clients réactifs personnalisés.

Tests spécifiques de la réactivité des API

Une API existe qui est destinée à tester le Project Reactor lui-même :

<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-test</artifactId>
    <scope>test</scope>
</dependency>

Ces outils permettent de tester des scenarii pas-à-pas (StepVerifier), et offrent la possibilité de prendre le contrôle sur l'enchaînement des événements (virtual time).

import org.springframework.test.web.reactive.server.WebTestClient;

import org.springframework.web.reactive.function.BodyInserters;

...

@SpringBootTest(webEnvironment = RANDOM_PORT)

class DemoApplicationTests {

@Autowired

private WebTestClient client;

@Test

void personRUD_API() {

client.get().uri("/persons")

.exchange()

.expectStatus().isOk()

.expectBodyList(Person.class).hasSize(3);

client.get().uri("/persons/" + 1)

.exchange()

.expectStatus().isOk()

.expectBody(Person.class).value(person -> {

assertThat(person.getId()).isNotNull();

assertThat(person.getFirstName()).isEqualTo("foo0");

assertThat(person.getLastName()).isEqualTo("bar0");

assertThat(person.getBirthDate())

.isBeforeOrEqualTo(LocalDate.now());

assertThat(person.getOfficePlaces()).hasSize(2);

});

// handle office places (many-to-many)

final Address address = Address.builder()

.number(0)

.street("the street")

.city("the city")

// no state

.zip("xxxxx")

.country("the country").build();

client.put().uri("/persons/" + 1 + "/office")

.accept(MediaType.APPLICATION_JSON)

.body(BodyInserters.fromValue(address))

.exchange()

.expectStatus().isCreated()

.expectBody(Person.class).value(person -> {

assertThat(person.getOfficePlaces()).hasSize(3);

});

// handle one person deletion

client.delete().uri("/persons/" + 1)

.exchange()

.expectStatus().isNoContent();

client.get().uri("/persons/" + 1)

.exchange()

.expectStatus().isNotFound();

client.delete().uri("/persons/" + 1)

.exchange()

.expectStatus().isNotFound();

// handle all persons deletion

client.delete().uri("/persons")

.exchange()

.expectStatus().isNoContent();

client.get().uri("/persons")

.exchange()

.expectStatus().isOk()

.expectBodyList(Person.class).hasSize(0);

}

}

On n’est pas bien, là ?... On pourrait continuer comme ça pendant des heures (la version complète de notre test d’API ne couvre le code qu’à 95%, parce que nous avons été un peu laxistes avec les annotations Lombok en qui nous avons confiance) !

Avant de passer à la suite, on doit signaler l'existence d’une annotation @WebFluxTest, qui est bien sûr l’équivalent reactive de @WebMvcTest, pour tester un contrôleur WebFlux à la fois, en bouchonnant éventuellement ses dépendances.

Pour conclure sur ce chapitre d’implémentation des API web réactives, nous vous signalons le projet suivant, qui montre comment exploiter WebFlux avec Spring Data JPA :

https://github.com/rxonda/webflux-with-jpa

Superviser une application web réactive

Désormais les nombreux endpoints de l’Actuator sont regroupés par défaut sous un chemin commun /actuator. Ils sont si nombreux que la plupart d’entre eux sont désactivés par défaut, afin d’économiser les ressources. Une propriété est prévue pour les activer tous à la fois ou en partie, via une liste et des wildcards :

  • management.endpoints.web.exposure.include=*

Les endpoints peuvent être étendus très souplement, grâce à la nouvelle annotation @EndpointExtension et à ses spécialisations, @EndpointWebExtension et @EndpointJmxExtension, qui sont à la fois compatibles avec Spring MVC et WebFlux.

D'autre part il est également possible de créer ses propres endpoints à partir de “rien” avec des annotations comme @Endpoint et @Selector.

Concernant le endpoint /health en particulier, notez qu’une nouvelle interface réactive nommée ReactiveHealthIndicator a été ajoutée au système pour l’implémentation de vos tests de vie. Des mécanismes sont proposés dans la foulée, pour les catégoriser.

Écouter le cycle de vie des endpoints réactifs

De la même façon que le RestTemplate de Spring MVC supporte la mise en place d’interceptors (ClientHttpRequestInterceptor), WebFlux propose lui des filters (ExchangeFilterFunction) pour auditer les échanges avec les services distants.

Conclusions

Bon ! On a bien travaillé avec R2BC. Un peu en mode DIY quand même... Certes. Est-ce que c'est un projet intéressant pour une exploitation professionnelle ? Certainement pas à ce niveau de maturité ; on ne s'imagine pas implémenter (ou maintenir) l'ORM complet d'un projet contenant des dizaines, voire des centaines d'entités. Et l'absence de support de l'événement DELETE dans la couche de persistence est probablement rédhibitoire dans le contexte d'un trafic important. Nous n'avons d'ailleurs pas encore pris le temps de mesurer nous-mêmes les performances de notre exemple, ne serait-ce que pour le comparer notamment à son cousin JDBC (ndlr : ajouter à la liste des choses à faire).

Un ultime désavantage à noter pour le moment : il n'est pas encore possible de monter des tests d’intégration transactionnels, faute de PlatformTransactionManager. C'est la raison pour laquelle nous avons privilégié des tests fonctionnels pointés directement sur l'API ; étant donné que nous avions imposé la présence d'une transaction dans l'implantation des callbacks. Cela ne change rien à la couverture de code, c'est juste qu'ils sont moins découplés qu'on pourrait le souhaiter idéalement.

Cependant, encore une fois, la compréhension des mécanismes internes à un niveau d'abstraction suffisamment bas est souvent bénéfique. Notamment pour utiliser la technologie Reactive qui constitue un changement de paradigme de programmation. Et aussi pour se lancer enfin vers la solution Spring WebFlux qui, en pratique, est réputée pour offrir de meilleures performances que Spring Servlet, sur certains cas d'utilisation comme les API web.