LMAX : 6 millions d'opérations par seconde

logo
Pour rebondir sur le challenge USI, voici un cas à part dans les architectures hautes performances et haute disponibilité.

LMAX (http://www.lmaxtrader.co.uk : plate forme de trading) va à contre courant avec son architecture et montre que l’on peut faire du XTP uniquement avec le JDK mais non sans revoir quelques idées reçues.

En effet alors que la tendance est :

  1. coté hardware, à multiplier le nombre de core plutôt que leur puissance brute,
  2. coté logiciel à introduire le multi threading,

LMAX est spécialisé dans le passage d’ordres de bourse ce qui suppose du temps réel pour l’affichage des cotations, un volume de transactions très important, le respect de l’ordonnancement des ordres et un temps réduit pour leur exécution.

Leur architecture est dimensionnée pour des millions d’utilisateurs, que ce soit par leur site ou bien en utilisant les API publiées.

LMAX peut gérer 6 millions d’ordres par seconde avec une machine tout à fait courante (2 CPU, 4 core à 3Ghz et 32Go RAM) et le tout en Java et sans framework supplémentaire.

LMAX est une filiale de Betfair (site de paris en ligne) et devant les difficultés de mise au point et de maintenance ils ont décidé de revoir complètement l’architecture mise en place pour construire LMAX.

Ils ont tout d’abord pensé que le multithreading serait la solution idéale.

Après les premiers tests ils ont fait le choix délibéré de ne pas recourir au multi-thread qui comporte les inconvénients suivants :

  • Plus complexe à mettre en œuvre
  • Plus complexe à tester
  • Coût hardware plus élevé

On peut modérer ce dernier point car le nombre de cœurs va directement donner le nombre de traitements possibles comme nous le verrons par la suite.

Plus que l’architecture mise en place, se sont les observations faites qui sont intéressantes (sur Java, sur les CPU, …) et qui parfois remettent en cause tout ce que l’on pouvait penser jusqu’alors.

Leur choix d’architecture et surtout les tests de performances les ont amené à faire une analyse très poussée des CPU modernes.

Ils se sont rendu compte que ce qui est communément admis (et sans doute vrai avec les anciens processeurs) ne l’est plus du tout aujourd’hui.

Ils ont donc conçu un système en adéquation avec le fonctionnement des processeurs modernes (c’est ce qu’ils nomment « mechanical sympathy »).

Leur besoin extrême en performance les ont entraîné bien au delà des analyses habituelles.

Pour vous donner une idée on entend bien souvent la maxime suivante « disk is the new tape » sous entendu que les temps de lecture/écriture sur disque sont très pénalisants en terme de performances et que les capacités actuelles des machines (fiabilité, volume) encouragent une utilisation de la mémoire uniquement.

Pour LMAX même les performances de la mémoire principale (RAM) ne sont pas suffisantes et ils ont tout fait pour que leur code privilégie le cache du CPU (leur code est déclaré cache safe).

Comparaison des coûts de lecture en fonction du type de mémoire

**Latency from CPU t** **Approx. number of CPU cycles** **Approx. time in nanoseconds**
 NETWORK  240,000,000 cycles
 DISK 41,000,000 cycles
 Main memory 250 cycles ~65ns
 QPI bus transit between sockets ~20ns
 L3 cache ~40-45 cycles ~15ns
 L2 cache ~10 cycles ~3ns
L1 cache ~3-4 cycles ~1ns
Register 1 cycle <1ns< td>
Rapidement leur exigence de performance les a obligé a abandonner tout lock de ressource car synonyme de « context switch » (en basculant d’un thread à l’autre le CPU peut écraser les valeurs contenues dans le cache).

En effet en cas de lock, le système donne la main à un autre thread en attendant la libération de la ressource. Ce qui implique un changement de contexte (CPU et données) et le contenu du cache est donc susceptible d’être écrasé avant d’être de nouveau chargé.

Il ne suffit donc pas d’utiliser les caches du processeur, encore faut il comprendre et maîtriser leur fonctionnement.

En terme de méthodologies rien de révolutionnaire, bien au contraire, puisqu’ils approuvent et cautionnent le TDD ainsi que la notation UML.

Problématiques et phénomènes analysés :

  • False sharing
  • Memory barrier
  • Context switch
  • CAS
  • Cache line

Leur conclusion vis à vis des méthodes existantes :

  1. Les queues (java) sont coûteuses (temps de traitement et lock des ressources)
  2. Le multi-threading est coûteux (contentions sur les ressources communes) et difficile a mettre en œuvre.
  3. De surcroit le multi-threading n’est pas le plus efficace en terme de performances.

Évidemment leur conclusion est très liée au contexte de leur étude et leur architecture ne pourra convenir à tous.

Parmi les contraintes ont peut citer :

Les messages doivent être lus séquentiellement (un consommateur ne choisi pas ses messages)

Chaque message doit donc avoir un sens au niveau fonctionnel (et non pas un ensemble de messages).

Cerise sur le gâteau, en plus d’une transparence assez rare sur leur architecture, ils ont décidé de faire partager leur framework qui est maintenant disponible en open source LMAX Disruptor.

Ce qu’a mis en œuvre LMAX c’est une façon plus rapide de partager les données entre les threads, sans recourir aux mécanismes utilisés habituellement (variables synchronisées).

Particularités de l’architecture :

  • Utilisation du pattern Disruptor
  • Pas de base de données, tout en mémoire
  • Pas de queuing/messaging classique
  • Pas de locking

NB : l’avantage de ne pas recourir à un SGBD, en plus de n’avoir qu’un seul langage de programmation (exit SQL, EJBQL, etc.), est que donc seule une JVM est nécessaire (Java 5+). Il en ressort une architecture simplifiée sans pour autant sacrifier la disponibilité.

Pattern Disruptor

Tout l’intérêt du pattern Disruptor réside dans la façon de produire et consommer des messages stockés dans le Ring Buffer.

Les producteurs, le Ring Buffer et les consommateurs sont chacun dans des threads différents.

Les acteurs : Producteur, Consommateur

Les objets : RingBuffer, Barrier

Les stratégies : Wait, Claim

Cette structuration du code oblige a certaines adaptations.

Le traitement d’un message (le cœur du process) doit être le plus rapide possible. Si il s’agit d’un traitement long (persistance, IO, journalisation) ce traitement doit être déporté dans un process indépendant qui ne ralentira pas les autres. Ces traitements sont placés dans les consommateurs.

C’est le royaume de l’asynchronisme.

Un peu de théorie

Compare and Swap (CAS)

De nombreux processeurs modernes possèdent une instruction qui effectue une opération atomique de type comparaison et échange ou compare-and-swap (CAS) au niveau matériel.

Utile afin d’éviter l’utilisation des techniques habituelles d’accès concurrents telles que Sémaphores et Mutex.

Le principe est assez simple à implémenter en Java (ici c’est purement théorique puisque l’opération est réalisée par le CPU et non par code).

@ThreadSafe public class SimulatedCAS { @GuardedBy("this") private int value; public synchronized int get() { return value; } public synchronized int compareAndSwap(int expectedValue, int newValue) { int oldValue = value; if (oldValue == expectedValue) value = newValue; return oldValue; } public synchronized boolean compareAndSet(int expectedValue, int newValue) { return (expectedValue == compareAndSwap(expectedValue, newValue)); } }

Chaque client conserve l’ancienne valeur et lorsqu’il souhaite la modifier il suffit de comparer cette ancienne valeur avec la valeur actuelle et si elle est différente c’est que quelqu’un d’autre est passé par là.

Support CAS dans la JVM

Avant Java 5, il était impossible de forcer le processeur à utiliser une instruction CAS (à moins d’utiliser du code natif).Depuis un nouveau package à été introduit “java.util.concurrent.atomic”.

Les classes associées sont AtomicBoolean, AtomicInteger, AtomicLong, et AtomicReference pour les références objet.

CAS est plus rapide qu’un lock car :

  • Une seule instruction pour deux opérations (mise à jour conditionnelle)
  • Pas de context switch contrairement aux locks.

Toutefois CAS doit se limiter aux opérations simples sur des objets simples (incrément d’un compteur par exemple) car il peut s’avérer très complexe à maitriser (et pas plus rapide plus que les locks).

Comparaison des temps de traitement (incrément d’un long 500 millions de fois)

**Method****   Time (ms)   **
 Single thread 300
Single thread with lock10000
Two threads with lock224000
  Single thread with CAS5700
Two threads with CAS30000
Single thread with volatile write4700
### Memory Barrier

Les barrières mémoires sont une instruction CPU qui permet d’isoler une partie des instructions du reste du code.

Le compilateur mais aussi le CPU peuvent réorganiser les instructions (changer l’ordre d’exécution) tant que le résultat final est le même pour des raisons de performances.

Les barrières mémoires indiquent au CPU et au compilateur que les instructions entourées de barrières ne doivent pas en sortir et ne doivent pas être réorganisées. De plus lorsque le CPU franchi une barrière il rafraichit systématiquement ces caches afin d’être certain que le code travaille avec la dernière valeur d’une données.

Le Java Memory Model précise que lorsque l’on utilise des données volatiles, alors le compilateur insère automatiquement une barrière d’écriture après une modification du champ et une barrière de lecture avant toute lecture.

Les barrière mémoire apporte donc une certaine sécurité dans le code pour un coût certes non négligeable mais inférieure à un lock.

C’est peut être la réhabilitation/redécouverte du mot clé volatile en Java.

Cache Line/padding/false sharing

La mémoire est stockée dans le processeur dans ce que appelle des « caches lines » (blocs contigus de mémoire).

Ces blocs font de 32 à 256 octets en fonction des processeurs (64 octets est la valeur la plus courante).

Ainsi les données stockées dans ces caches ne sont pas isolées mais sont traitées par bloc, donc si deux variables sont contenues dans le même bloc alors elles sont traitées de manière commune.

Ce qui signifie qu’un lock sur une des variables bloquera obligatoirement toutes les données du bloc.

C’est ce qu’on appelle le « false sharing ».

Ce qui est acceptable pour un traitement mono-thread, l’est beaucoup moins lorsque l’on travaille en multi-threads.

Dans le cas de l’architecture mise en place par LMAX et le pattern Disruptor un des composants que l’on doit absolument protéger est le Ring Buffer et en particulier le curseur. De manière générale il s’agit des variables de type volatile.

Il existe heureusement une technique pour s’affranchir de ce problème, il s’agit du padding (remplissage).

Plutôt que de déclarer uniquement notre curseur on va déclarer tout un ensemble de variable qui ne serviront pas mais permettront de s’assurer que l’ensemble remplisse totalement la taille du cache line.

Extrait du code du RingBuffer

private volatile long cursor = INITIAL_CURSOR_VALUE; public long p8, p9, p10, p11, p12, p13, p14; // cache line padding

Au lieu d’un seul long ce sont 8 longs qui sont déclarés soit 8*taille d’un long : 64 octets.

Sur des processeurs utilisant des blocs de 64 octets ont est assuré que le bloc n’est pas partagés par plusieurs variables sensibles (sur une JVM 32 bits).

On pourrait croire cette technique contre productive (déclarer plus de variables) que ce dont on a réellement besoin mais les tests menés tendent à prouver le contraire.

Aperçu des performances obtenues en éliminant le false sharing (http://mechanical-sympathy.blogspot.com/2011/07/false-sharing.html).

NB : Comme il n’est décidément pas simple d’obtenir le meilleur de Java, Java 7 est plus intelligent que ces prédécesseurs (en tout cas le compilateur) et élimine les variables inutilisées rendant obsolète la technique utilisée ci dessus.

Heureusement LMAX travaille déjà sur une parade (http://mechanical-sympathy.blogspot.com/2011/08/false-sharing-java-7.html) en attendant un jour que Java, à l’image du C++ et du C# pour les structures, permette au développeur de gérer les caches lines pour les structures).

Disruptor

Ring Buffer

Le Ring Buffer est l’élément central puisque c’est lui qui stocke les messages. C’est une implémentation maison des collections Java. Afin de le rendre le plus performant possible, il est déchargé de certaines opérations habituellement dévolues à ce type de structure (cycle de vie des données, …).
Caractéristiques :

  • Buffer de taille fixe (tableau d’Entry).

  • Si la file est pleine on remplace le premier enregistrement (principe FIFO).

Deux pointeurs sont utilisés :

Cursor : Dernière insertion/modification d’un consommateur ou d’un producteur.

Next : Premier emplacement disponible dans le Ring Buffer.

On accède à un message par son numéro de séquence (long et non Long car plus rapide).

Chaque acteur accédant au Ring Buffer possède son propre compteur (emplacement actuellement accédé du Ring Buffer) et évidemment il est possible de synchroniser un producteur et un consommateur (un acteur gère son propre compteur pour les écritures mais peut lire les autres compteurs).

Ceci permet de gérer les multiples accès au Ring Buffer.

Le numéro de séquence n’est pas le numéro de slot du Ring Buffer mais un compteur incrémenté par le producteur (il augmente toujours).

Une fois le message consommé, l’emplacement n’est pas libéré (vidé), il est simplement disponible pour un nouveau message.

Un message est considéré comme consommé seulement si un accusé de réception est transmis assurant une certaine robustesse au système.

Une fois consommé le message est sous la responsabilité du consommateur (différence avec du queuing par exemple).

Messages (Entry)

Les éléments stockés sont des Entry (classe à surcharger obligatoirement, seule une classe abstraite est fournie) qui peuvent être soit directement des valeurs, soit des pointeurs vers ces valeurs. C’est pour cela que seule une classe abstraite est fournie, l’implémentation étant trop liée au contexte du projet.

Exemple simple ou le message est un long :

public final static class ValueEntry extends AbstractEntry { private long value; public long getValue() { return value; } public void setValue(final long value) { this.value = value; } public final static EntryFactory ENTRY_FACTORY = new EntryFactory() { public ValueEntry create() { return new ValueEntry(); } }; }

Consommateur :

Un même message peut être consommé par différents consommateurs en parallèle ce qui permet par exemple de gérer les logs en parallèle du traitement.

Définit comment est transmis le message : par copie ou par référence.

On peut chainer les consommateurs, ainsi un consommateur peut attendre qu’un premier consommateur ait traité la donnée avant d’intervenir (transformation, complétion du message par exemple).

L’implémentation d’un consommateur n’est pas aisé mais est facilité par la présence d’une implémentation par défaut « BatchConsumer » qui conviendra dans bien des cas.

Ainsi le consommateur implémente simplement l’interface “BatchHandler” (callback invoqué lorsqu’une nouvelle entrée est disponible), soit l’implémentation de la méthode « onAvailable(T entry) » qui sera appelée lorsqu’un nouveau message aura été déposé dans le Ring Buffer.

 /*** Callback interface to be implemented for processing {@link AbstractEntry}s as they become * available in the {@link RingBuffer} * @see BatchConsumer#setExceptionHandler(ExceptionHandler) if you want to handle exceptions * propagated out of the handler. * * @param AbstractEntry implementation storing the data for sharing during exchange or * parallel coordination of an event. */ public interface BatchHandler { /*** Called when a publisher has committed an {@link AbstractEntry} to the {@link RingBuffer} * * @param entry committed to the {@link RingBuffer} * @throws Exception if the BatchHandler would like the exception handled further up * the chain. */ void onAvailable(T entry) throws Exception; /** * Called after each batch of items has been have been processed before the next waitFor * call on a {@link ConsumerBarrier}. * * This can be taken as a hint to do flush type operations before waiting once again on * the {@link ConsumerBarrier}. * The user should not expect any pattern or frequency to the batch size. * @throws Exception if the BatchHandler would like the exception handled further * up the chain. */ void onEndOfBatch() throws Exception; }

Producteur :

C’est lui qui insère les messages dans le Ring Buffer.

Son interaction avec le Ring Buffer est le plus souvent assez simple :

Effectue une demande d’emplacement libre,

Commit le message une fois valorisé, le message est alors visible de tous.

Productor Barrier :

L’objet est obligatoire pour insérer des messages dans le Ring Buffer.

Les producteurs ne dialoguent pas directement avec le Ring Buffer mais avec le ProductorBarrier.

La ProductorBarrier est créée par le Ring Buffer.

Consumer Barrier :

Obligatoire pour consommer des messages.

Un consommateur accède au Ring Buffer au travers d’un ConsumerBarrier.

La ConsumerBarrier est créée par le Ring Buffer

Consommation d’un message :

Le consommateur doit connaître le numéro de séquence avant de consommer un message.

il peut :

  1. Interroger le Ring Buffer pour connaitre le curseur courant et obtenir le message par ce numéro de séquence.
  2. Obtenir le prochain message disponible.
  3. Gérer son propre compteur et interroger le Ring Buffer pour connaitre le nombre de messages disponibles avec un numéro de séquence supérieur (et les récupérer en masse).

En théorie les consommateurs ne font que lire les messages, toutefois il est possible pour un consommateur d’écrire dans le Ring Buffer. La condition est qu’une entrée ne peut être modifiée que par un seul consommateur.

Garbage Collecteur et Ring Buffer :

Le Ring Buffer est un tableau d’Entry.

Sa taille est fixe et allouée dès le début, la probabilité pour que l’ensemble du Ring Buffer soit contenu dans des espaces contigus est donc forte.

Le Ring Buffer permet de bénéficier du cache striding.

Les messages consommés sont remplacés par d’autres au besoin afin de limiter la consommation de la mémoire.

Les messages sont évidemment recyclés par le Garbage Collector mais pas le Ring Buffer.

Les objets ayant une durée de vie courte ou les données qui persistent tout le temps de vie d’une application ne sont pas problématique en terme de performance du Garbage Collector.

Ceux qui posent problèmes sont ceux qui peuvent survivre à un Garbage Collector pour être nettoyés ensuite.

Concrètement ce ne sont ni les variables locales ni les variables globales mais bien celles qui s’échangent entre fonctions ou entre thread.

LMAX a donc cherché à les minimiser.

Stratégies

Lorsque l’on crée un Ring Buffer on va préciser la manière dont les messages seront consommés (consommateur) et la gestion des numéro de séquence (producteur)

WaitStrategy

Stratégie utilisée par le consommateur en attente de messages postés par le producteur.

BLOCKING

Stratégie classique qui utilise un lock tant que le message n’est pas disponible, bloque le consommateur mais ne consomme pas de CPU.

C’est la stratégie par défaut.

YIELDING

Cette stratégie utilise un Thread.yield() pour rendre la main au CPU évitant ainsi de bloquer les traitements.

Cette stratégie est évidemment moins rapide mais permet de préserver les ressources.

BUSY_SPIN

On attend tant que la ressource n’est pas disponible mais sans poser de lock sur la ressource (la plus rapide mais mobilise CPU et consommateur).

ClaimStrategy

Stratégie utilisée pour récupérer le prochain numéro de séquence libre (producteur).

MULTI_THREADED : un thread pour chaque demande.

C’est la stratégie par défaut compatible avec plusieurs producteurs.

SINGLE_THREADED : La plus optimisée (un seul thread en écriture).

Gestion des erreurs :

La gestion des erreurs est particulière et surprenante.

Leur principe est le suivant : il n’y a pas d’erreur ! (les tests sont là pour adapter le code à toutes les éventualités).

Dans leur esprit une entrée (un message), ne doit pénétrer le système qu’après validation afin d’être certain qu’elle pourra être traitée correctement.

En cas de faille du système un autre prend le relai.

Leur framework ne permet pas le retour en arrière (rollback d’une transaction).

Toutefois pour les plus sceptiques il y a possibilité de surcharger la classe ExceptionHandler pour pouvoir exécuter du code en cas d’erreur de traitement d’une Entry.

NB : Seules les exceptions de type RuntimeException mettent fin au programme (après un log).

Failover :

Avec un tel système (event processing) il suffit de pouvoir sauvegarder les messages pour pouvoir les rejouer et donc rétablir une plateforme après un crash.

C’est le système retenu par LMAX. Toutefois afin d’éviter d’avoir a rejouer un trop grand nombre de messages et d’avoir un temps de rétablissement trop important il ont décider de prendre une photo (snapshot) de leur système quotidiennement (état des données en sortie) et de simplement journaliser les événements de la journée.

LMAX a plusieurs systèmes qui tournent en parallèle (3) mais dont un seul n’est réellement actif.

Chaque message est traité par l’ensemble des systèmes ce qui oblige à synchroniser les trois nœuds.

Ce système de cluster détecte en permanence si le maître est toujours actif (multicast). Un autre prend le relais en cas de défaillance.

Plus d’informations dans le Bliki de Martin FOWLER.

Remarques et problématiques :

Le pattern idéal pour le Disruptor est :

  1. Un seul producteur.
  2. Plusieurs consommateurs (dans des threads différents).
  3. Un seul thread par CPU (si 8 cœurs alors 8 threads maximum).

Avoir un seul producteur est un gain considérable :

  • Pas de contentions sur les écritures du Ring Buffer (donc pas de lock, de CAS, etc..).
  • Gestion plus simple du curseur (dernier ajout) qui ne dépend que d’un seul traitement.
  • Un seul thread responsable des écritures.
  • Meilleure utilisation du cache CPU.

Une amélioration du Disruptor est le batching effect.

Un consommateur conserve le numéro de séquence de la dernière entrée traitée (point de référence) ainsi lorsqu’il interroge le Ring Buffer pour savoir si un nouvel élément est disponible et que plusieurs entrées ont été mises à jour alors le Ring Buffer envoie l’ensemble des séquences ce qui permet de les traiter toutes sans multiplier les interrogations du Ring Buffer.

Comment s’assurer qu’un message n’est traité qu’une seule fois en cas de multiples consommateurs ?

Nativement impossible mais avec un peu d’ingéniosité on y arrive (par exemple si on utilise deux consommateurs un premier traitera les séquences paires, l’autre les impaires) en cas de consommateurs plus nombreux on utilisera les modulos.

Dans la majorité des cas un seul Ring Buffer est nécessaire même si les consommateurs sont chainés (contrairement aux queues par exemple) par contre il faut une consumerBarrier par consommateur.

Comparaison des systèmes possibles (traitement en parallèle)

Multithreading : Lock sur les ressources partagées pour gérer les accès concurrents.

SEDA (staged event-driven architecture) : Utilisation de queues pour les ressources partagées.

LMAX : Utilisation de RingBuffer (Array non synchronisé) pour les ressources partagées.

Conclusion

Nous avons le cas d’une recherche extrême de la performance.

Heureusement que toutes les applications n’ont pas besoin de grappiller les millisecondes.

Le modèle choisi par LMAX n’est surement pas applicable à tous les cas mais il permet de faire évoluer des certitudes :

  • Pour un traitement simple (un calcul) par exemple le multithreading n’est pas la solution miracle.
  • Ne partager des ressources que si on ne peut pas faire autrement.
Même si la courbe d’apprentissage n’est pas très  longue on est loin de la simplicité de Java 7 Fork/Join.
Enfin il semblerait qu’il n’y ait pas de solution universelle, j’en veux pour preuve ce benchmark tiré de [VanillaJava](http://vanillajava.blogspot.com/2011/07/synchronized-vs-lock-performance.html).
Dans ce test on compare les performances des trois techniques suivantes :
- Synchronized. - Lock. - Volatile (AtomicInteger).
Le tableau présente le temps de traitement en secondes de 25 millions de locks (Java 6U26).
Nombre de ThreadsSynchronized 1 compteurLock 1 compteurAtomicInteger 1 compteurSynchronized 2 compteursLock 2 compteursAtomicInteger 2 compteurs
10.9370.7860.4001.5321.4840.569
22.7664.5970.6765.3986.3551.278
43.9041.2670.6946.3302.6571.354
83.8840.9531.0115.4182.0732.761
163.2070.8691.1714.8171.6562.800
323.2130.8531.2404.9151.6802.843
643.3220.9211.2695.0491.6392.843
Conclusion du benchmark : moins il y a de threads, meilleures sont les performances.

Les performances ne sont pas linéaires avec le nombre de threads.

De manière globale, on peut extrapoler que les locks sont plus performants que l’utilisation des synchronized.

En fonction du nombre de threads, la meilleure technologie n’est pas la même. Ce qui milite fortement pour des tests systématiques lors du choix de la technologie la plus adaptée au contexte du projet.

Compléments :

Bliki M. FOWLER

Blog équipe LMAX

Exemple de code

Le code est là pour illustrer l’utilisation du pattern Disruptor (ne chercher pas l’utilité d’un tel code moi même je la cherche encore).

Nous avons un producteur qui va poster des messages correspondant à un chiffre de 0 à 1000.

Nous avons divisé la tache en trois :

  • Un consommateur pour calculer le cube des chiffre pair.
  • Un consommateur pour calculer le cube des chiffre impair.
  • Un consommateur pour afficher les résultats.

Le consommateur 3 (logger) a besoin des deux autres pour pouvoir fonctionner.

Cet exemple va donc illustrer une dépendance entre des consommateurs ainsi qu’un partage des tâches entre deux consommateurs.

Enfin les consommateurs de type calcul écrivent dans le Ring Buffer le résultat de l’opération.

Code d’une Entrée

 public final static class ValueEntry extends AbstractEntry { private long value; private long cubic; private String className; public long getCube() { return cubic; } public void setCube(long cube) { this.cubic = cube; } public long getValue() { return value; } public void setValue(final long value) { this.value = value; } public void setClassName(String className) { this.className = className; } public String getClassName() { return className; } public final static EntryFactory ENTRY_FACTORY = new EntryFactory() { public ValueEntry create() { return new ValueEntry(); } }; }

Il y a trois champs :

  1. La valeur initiale.
  2. Le résultat du calcul.
  3. Le nom de la classe qui va permettre d’identifier le consommateur qui a effectué le traitement.

Code du producteur

 for (long i = 0; i < =1001; i++) { // Producers claim entries in sequence ValueEntry entry = producerBarrier.nextEntry(); // copy data into the entry container entry.setValue(i); // make the entry available to consumers producerBarrier.commit(entry); }

Code des consommateurs (calcul)

 public void onAvailable(final ValueEntry entry) throws Exception { // process a new entry as it becomes available. entry.setCube((entry.value) * (entry.value) * (entry.value)); StackTraceElement trace = Thread.currentThread().getStackTrace()[3]; entry.setClassName(trace.getClassName()); }

Code du consommateur (logger)

 public void onAvailable(final ValueEntry entry) throws Exception { // process a new entry as it becomes available. System.out.println("Processing entry : " + entry.getValue()); System.out.println("Cube : " + entry.getCube()); System.out.println("Processed by : " + entry.getClassName().toString()); }

Modification de la classe BatchConsumer

On va modifier le consommateur fourni par défaut en deux classes distinctes, l’une traitant les séquences paires l’autre les séquences impaires.

Pour le consommateur impair on commence avec un décalage

private volatile long sequence = RingBuffer.INITIAL_CURSOR_VALUE + 1;

Dans les deux consommateurs on remplace le code suivant :

for (; nextSequence < = availableSequence; nextSequence++) {

par :

for (; nextSequence < = availableSequence; nextSequence = nextSequence + 2) {

Code complet

 package disruptor; import java.util.concurrent.Executor; import java.util.concurrent.Executors; import com.lmax.disruptor.AbstractEntry; import com.lmax.disruptor.BatchConsumer; import com.lmax.disruptor.BatchHandler; import com.lmax.disruptor.ClaimStrategy; import com.lmax.disruptor.ConsumerBarrier; import com.lmax.disruptor.EntryFactory; import com.lmax.disruptor.ProducerBarrier; import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.WaitStrategy; public class Disruptor { // Ring Buffer size private static int SIZE = 100; private static Executor EXECUTOR = Executors.newCachedThreadPool(); // Creation of the RB with optimized claim strategy and aggressive wait // strategy private static RingBuffer ringBuffer = new RingBuffer( ValueEntry.ENTRY_FACTORY, SIZE, ClaimStrategy.Option.SINGLE_THREADED, WaitStrategy.Option.BUSY_SPIN); /** * @param args * @throws InterruptedException */ public static void main(String[] args) throws InterruptedException { // Callback handler which can be implemented by consumers final BatchHandler batchHandler = new BatchHandler() { public void onAvailable(final ValueEntry entry) throws Exception { // process a new entry as it becomes available. entry.setCube((entry.value) * (entry.value) * (entry.value)); StackTraceElement trace = Thread.currentThread().getStackTrace()[3]; entry.setClassName(trace.getClassName()); } public void onEndOfBatch() throws Exception { // useful for flushing results to an IO device if necessary. } }; final BatchHandler batchLoger = new BatchHandler() { public void onAvailable(final ValueEntry entry) throws Exception { // process a new entry as it becomes available. System.out.println("Processing entry : " + entry.getValue()); System.out.println("Cube : " + entry.getCube()); System.out.println("Processed by : " + entry.getClassName().toString()); } public void onEndOfBatch() throws Exception { // useful for flushing results to an IO device if necessary. } }; ConsumerBarrier consumerBarrier1 = ringBuffer.createConsumerBarrier(); // batch consumers are tied to consumerBarrier1 BatchConsumerEven batchConsumerEven = new BatchConsumerEven(consumerBarrier1, batchHandler); BatchConsumerOdd batchConsumerOdd = new BatchConsumerOdd(consumerBarrier1, batchHandler); // consumerBarrier2 is tied to consumers batchConsumerEven and // batchConsumerOdd ConsumerBarrier consumerBarrier2 = ringBuffer.createConsumerBarrier(batchConsumerEven, batchConsumerOdd); // log consumer is tied to consumerBarrier2 BatchConsumer logConsumer = new BatchConsumer(consumerBarrier2, batchLoger); // Each consumer can run on a separate thread EXECUTOR.execute(batchConsumerEven); EXECUTOR.execute(batchConsumerOdd); EXECUTOR.execute(logConsumer); // Producer code ProducerBarrier producerBarrier = ringBuffer.createProducerBarrier(logConsumer); for (long i = 0; i < = 1001; i++) { // Producer claims entries in sequence ValueEntry entry = producerBarrier.nextEntry(); // copy data into the entry container entry.setValue(i); // make the entry available to consumers producerBarrier.commit(entry); } // Give 1 sec to end before exit Thread.sleep(1000); System.exit(0); } public final static class ValueEntry extends AbstractEntry { private long value; private long cubic; private String className; public long getCube() { return cubic; } public void setCube(long cube) { this.cubic = cube; } public long getValue() { return value; } public void setValue(final long value) { this.value = value; } public void setClassName(String className) { this.className = className; } public String getClassName() { return className; } public final static EntryFactory ENTRY_FACTORY = new EntryFactory() { public ValueEntry create() { return new ValueEntry(); } }; } }

A noter la taille du Ring Buffer, alors que l’on envisage de traiter 1000 données, on ne déclare qu’une taille de 100.

C’est pour illustrer le principe de recyclage des entrées du Ring Buffer.

De plus la taille est obligatoirement de type 2n (soit 128 dans notre cas).

Affichage en sortie (Extrait)

Processing entry : 0 Cube : 0 Processed by : disruptor.BatchConsumerEven Processing entry : 1 Cube : 1 Processed by : disruptor.BatchConsumerOdd Processing entry : 2 Cube : 8 Processed by : disruptor.BatchConsumerEven Processing entry : 3 Cube : 27 Processed by : disruptor.BatchConsumerOdd Processing entry : 4 Cube : 64 Processed by : disruptor.BatchConsumerEven Processing entry : 5 Cube : 125 Processed by : disruptor.BatchConsumerOdd Processing entry : 6 Cube : 216 Processed by : disruptor.BatchConsumerEven Processing entry : 7 Cube : 343 Processed by : disruptor.BatchConsumerOdd