La première partie de ce post a introduit les MBeans que WebLogic expose concernant les ressources JMS.
Nous avions commencé à les utiliser pour monitorer les queues JMS.
Dans cette deuxième partie, nous allons utiliser ces même Mbeans pour agir sur ces queues.
(Là aussi, ce sont des fonctionnalités proposées par la console WebLogic, pour des besoins ponctuels elle suffira amplement)
Supprimer des messages
Il est parfois nécessaire de supprimer des messages JMS d’une queue car ils ne sont pas/plus aptes à être consommés par l’application.
Une première approche serait tout simplement d’écrire un client qui consomme les messages en question avec les apis JMS. Cette approche fonctionnerait alors quelque soit le middleware JMS.
L’approche décrite ici effectue cette suppression via les apis JMX spécifiques de WebLogic.
Le premier avantage est que cela se révèle très simple (Groovy étant là pour simplifier l’usage de JMX)
Ensuite, on peut supposer que cette approche est plus performante (en particulier, il n’est pas nécessaire de récupérer le message pour le supprimer) et cela fonctionne même avec des messages Delayed : un message avec un Time-To-Deliver n’est pas visible immédiatement par un simple client JMS.
JMSDestinationRuntimeMBean propose donc l’opération deleteMessages pour supprimer les messages. Elle prend un unique argument qui doit être un selector JMS (On trouvera une description des selector jms dans l’api de l’interface Message : http://java.sun.com/j2ee/sdk_1.3/techdocs/api/javax/jms/Message.html).
Voici donc un premier script Groovy qui supprime tous les messages de type MyCustomJMSType de la queue MyFirstDistributedQueue sur tous les serveurs sur laquelle elle est déployée ( la requête JMX renvoyant un DestinationMBean pour chacun des serveurs ) :
import javax.management.ObjectName; import javax.management.Query; import fr.ippon.groovy.util.WebLoMBean; def mbs = WebLoMBean.getDomainMBeanServer('t3://localhost:7001', 'weblogic', 'weblogic') def destFilter = javax.management.Query.with { match(attr('Name'),value('@MyFirstDistributedQueue')) } def destinationMBeanNames = mbs.queryNames(new ObjectName('com.bea:Type=JMSDestinationRuntime,'), destFilter) def destinationMbeans = destinationMBeanNames.collect { new GroovyMBean(mbs, it) } def selector = "JMSType = 'MyCustomJMSType'" destinationMbeans.each { mbean -> def nbMsg = mbean.deleteMessages(selector) def server = mbean.name().getKeyProperty('Location') println "$nbMsg message(s) removed on jms server $server" }
On pourra aussi spécifier une liste précise de message à supprimer en se basant sur leur JMSMessageID :
En groovy, on pourra alors construire le selector de cette façon :
def ids = """ ID:<294882.1256553770265.0> ID:<294882.1256553799937.1> ID:<294882.1256554027953.0> """ def selector = "JMSMessageID IN ('"+ids.readLines().join("','")+"')";
Ce qui donne :
JMSMessageID IN (”,’ID:<294882.1256553770265.0>’,’ID:<294882.1256553799937.1>’,’ID:<294882.1256554027953.0>’)
Cette fonctionnalité est disponible dans la console WebLogic depuis l’onglet Monitoring de chaque queue JMS
Déplacer des messages
La raison la plus classique pour déplacer un message d’une queue à une autre est le rejeu des messages arrivés en queue d’erreur.
On veut alors déplacer ces messages de la queue d’erreur vers la queue d’origine afin que le système réessaye de les traiter.
Là aussi l’approche classique est d’utiliser les apis JMS : consommer le message puis le reposter dans la queue cible.
Les apis jmx de WebLogic nous proposent une solution plus souple et certainement plus performante.
Il faut utiliser l’opération moveMessages de JMSDestinationRuntimeMBean.
Elle prend en particulier deux arguments : un selector JMS, la queue cible (targetDestination).
La queue source étant la queue gérée par l’instance de JMSDestinationRuntimeMBean manipulée
(Une seconde signature de moveMessages est disponible, et propose un troisième argument qui permet de spécifier le statut WebLogic des messages à déplacer, nous en parlerons dans la troisième partie)
L’argument targetDestination est de type javax.management.openmbean.CompositeData.
CompositeData est un type JMX standard permettant de décrire et d’échanger une structure de donnée arbitrairement complexe entre un client et serveur jmx.
Ici cet argument n’a pas besoin d’être construit, il doit être récupéré sur le MBean de la queue cible, via son attribut DestinationInfo
Le script suivant déplace tous les messages (le selector est vide ici) de la queue MyFirstDistributedQueue_error vers la queue MyFirstDistributedQueue.
Il est légèrement plus compliqué que le précédent car il doit recupérer le destinationInfo de la queue cible. S’agissant de queues distribuées sur plusieurs noeuds, il se débrouille pour récupérer la queue physique cible déployée sur le même serveur jms que la queue physique source. ( On notera que moveMessages fonctionne même si le déplacement s’effectue entre deux serveurs jms différents )
import javax.management.ObjectName; import javax.management.Query; import fr.ippon.groovy.util.WebLoMBean; def mbs = WebLoMBean.getDomainMBeanServer('t3://localhost:7001', 'weblogic', 'weblogic') def sourceFilter = javax.management.Query.with { match(attr('Name'),value('@MyFirstDistributedQueue_error')) } def sourceMBeanNames = mbs.queryNames(new ObjectName('com.bea:Type=JMSDestinationRuntime,'), sourceFilter) def sourceMbeans = sourceMBeanNames.collect { new GroovyMBean(mbs, it) } sourceMbeans.each { sourceMbean -> def jmsServerName = sourceMbean.name().getKeyProperty('JMSServerRuntime') // recherche de la queue cible sur le même serveur JMS : def targetFilter = javax.management.Query.with { match(attr('Name'),value('@MyFirstDistributedQueue')) } def targetPattern = "com.bea:Type=JMSDestinationRuntime,JMSServerRuntime=$jmsServerName," def targetMBeanNames = mbs.queryNames(new ObjectName(targetPattern), targetFilter) def targetMBeanName = (targetMBeanNames as List)[0] def targetMbean = new GroovyMBean(mbs, targetMBeanName) def targetDestinationInfo = targetMbean.DestinationInfo // déplacement de tous les messages def selector = null def nbMsgMoved = sourceMbean.moveMessages(selector, targetDestinationInfo) println "$nbMsgMoved message(s) moved on server $jmsServerName" }
Cette fonctionnalité est disponible dans la console WebLogic depuis l’onglet Monitoring de chaque queue JMS. Il faudra faire l’opération autant de fois que queues physiques en passant par la fonctionnalité ‘Show Messages’.
Mettre en pause la consommation des messages dans une queue
JMSDestinationRuntimeMBean propose deux méthodes très simples pour mettre en pause la consommation des messages d’une queue (sans empêcher leur dépôt) et pour ré-activer cette consommation : pauseConsumption et resumeConsumption
Le script suivant dé-active ou ré-active la consommation (suivant l’état courant) de chaque queue physique correspondant à la queue distribuée MyFirstDistributedQueue
import javax.management.ObjectName; import javax.management.Query; import fr.ippon.groovy.util.WebLoMBean; def mbs = WebLoMBean.getDomainMBeanServer('t3://localhost:7001', 'weblogic', 'weblogic') def destFilter = javax.management.Query.with { match(attr('Name'),value('@MyFirstDistributedQueue')) } def destinationMBeanNames = mbs.queryNames(new ObjectName('com.bea:Type=JMSDestinationRuntime,'), destFilter) def destinationMbeans = destinationMBeanNames.collect { new GroovyMBean(mbs, it) } destinationMbeans.each { mbean -> def isPaused = mbean.ConsumptionPaused def jmsServer = mbean.name().getKeyProperty('JMSServerRuntime') if(isPaused) { println "The consumption is paused on server $jmsServer, re-enabling it ..." mbean.resumeConsumption() } else { println "Pausing comsumption on $jmsServer ..." mbean.pauseConsumption() } }
Cette fonctionnalité est disponible dans la console WebLogic : au niveau chaque serveurs JMS : Onglet Monitoring, sous-onglet Active Destinations (pour une queue distribuée, il faudra donc faire la manip pour chacun des serveurs jms sur laquelle elle est déployée)
Cela clos cette partie sur la gestion des queues. Vous trouverez ci-joint les scripts groovy mettant en oeuvre le code proposé dans cet article.
Dans la dernière partie, nous verrons comment lire les messages d’une queue en JMX : ce qui permet en particulier de lire ceux à l’état pending.