Gestion des ressources JMS de WebLogic en JMX avec Groovy – 3ème partie accès au contenu des messages

Les deux premières parties de ce post ont introduit le MBean que WebLogic expose pour les ressources JMS et montré comment l’utiliser pour monitorer ou manipuler les qeues JMS, cette troisième et dernière partie l’utilisera pour lire le contenu des messages JMS.

Vous allez me dire à quoi cela sert-il d’utiliser JMX pour lire le contenu d’un message ? Et vous aurez raison, la plupart du temps l’utilisation des apis JMS (en particulier QueueBrowser) est préférable puisque le code est alors portable quelque soit le provider jms.
L’utilisation d’une client JMS dédié (tel que HermesJMS) est encore plus simple.
Dans les deux cas, il faudra juste faire attention dans le cas de queues distribuées (Sous Weblogic 9 et 10, pour des queues uniformément distribuées, il faut préfixer le nom jndi de la queue distribuée par le nom du serveur jms pour atteindre une queue physique en particulier : par exemple : JMSServer-0@jms/MyFirstDistributedQueue)

Il existe toutefois une très bonne raison pour utiliser les api jmx de WebLogic pour lire le contenu d’un message jms : lorsque l’on veut pouvoir lire les messages pending.

Les messages pending dans WebLogic

Un message est à l’état pending lorsqu’il est présent dans une queue mais qu’il n’est pas encore ou plus visible par un client utilisant les APIs JMS.
Un message est pending dans les trois cas suivant :

  • il a été déposé dans la queue dans le cadre d’une transaction (qu’elle soit JMS ou JTA) qui n’a pas encore été commitée (ou rollbackée, auquel cas, il ne sera jamais visible dans la queue ) => ‘Pending sent’ dans la console WebLogic
  • le message est en cours de consommation par un client, là encore dans le cadre d’une transaction qui n’est pas terminée. D’autres clients jms lisant la queue ne voit alors plus le message. => ‘Pending received’ dans la console WebLogic
  • le message a été déposé avec un “DeliveryTime” dans le futur (on trouve aussi “TimeToDeliver” dans la doc) : cette fonctionnalité de WebLogic permet de poster un message mais de faire en sorte qu’il ne soit visible et donc consommable que plus tard ( cf http://download.oracle.com/docs/cd/E13222_01/wls/docs103/jms/manage_apps.html#SettingMessageDeliveryTimes), on le voit alors dans la console au statut “Delayed”.

Dans les deux premiers cas, l’état pending dure normalement peu de temps (le temps de la transaction), dans le dernier cas, cela peut durer quelque milli-secondes comme des jours ou des semaines.

Dans mon cas, j’ai du m’intéresser aux messages pending pour pouvoir lire les messages jms de WLI (WebLogic Integration) qui utilise des messages jms au statut “Delayed” pour implémenter les timer.

1 – Lister les messages d’une queue

Rentrons maintenant dans le vif du sujet, comme nous l’avions vu très rapidement dans la première partie, le MBean JMSDestinationRuntime propose une opération getMessages permettant de sélectionner une liste de messages dans une queue avec éventuellement un selector jms.
Nous allons voir ici comment vraiment l’utiliser.

getMessages prend en paramètre un selector jms et un timeout déterminant la durée de validité du curseur renvoyée. (Ce curseur est un String identifiant le résultat de la recherche resté sur le serveur)
Il faut ensuite utiliser des apis de pagination permettant de manipuler ce curseur : j’utiliserais ici uniquement l’opération getNext()
getNext() prend en paramètre le curseur et le nombre de message à renvoyer. Les messages sont renvoyés sous la forme de CompositeData (que l’on avait déjà rencontré la dernière fois).

Ce CompositeData a ici une structure simple : c’est uniquement une liste de clef/valeur.
Dans le code groovy ci-dessous, la closure describeCompositeData a pour rôle d’afficher le contenu de l’objet CompositeData.

Voici donc un premier script, nous permettant de voir le contenu de ces CompositeData pour l’ensemble des messages JMS de type MyCustomJMSType (filtrer via le selector) de la queue MyFirstDistributedQueue (sur tous les noeuds du domaine WebLogic) :

import javax.management.ObjectName;
import javax.management.Query;
import javax.management.openmbean.CompositeData;

import fr.ippon.groovy.util.WebLoMBean;

def mbs = WebLoMBean.getDomainMBeanServer('t3://localhost:7001', 'weblogic', 'weblogic')

def destFilter = javax.management.Query.with {
  match(attr('Name'),value('*@MyFirstDistributedQueue'))
}

def destinationMBeanNames = mbs.queryNames(new ObjectName('com.bea:Type=JMSDestinationRuntime,*'), destFilter)
def destinationMbeans = destinationMBeanNames.collect { new GroovyMBean(mbs, it) }

def selector = "JMSType = 'MyCustomJMSType'"

// closure d'analyse des CompositeData utilisée plus bas
def describeCompositeData = { CompositeData item ->
  def compositeType = item.getCompositeType()
  def keySet = compositeType.keySet()
  keySet.each { key ->
    def opentype = compositeType.getType(key)
    // rq : tous les champs de ce compositeData sont des
    // javax.management.openmbean.SimpleType
    // Le type est accessible directement :
    def type = opentype.className
    def description = compositeType.getDescription(key)
    def value = item.get(key)
    printf " %-15s : %-17s -> %s %n %s %n", key , type, description, value
  }
}

destinationMbeans.each { mbean ->

  def timeout = 1000
  def cursor = mbean.getMessages(selector, timeout)

  def nbMsg = mbean.getCursorSize(cursor)
  println "Found $nbMsg msg(s) on queue $mbean.Name"

  def batchSize = 10
  def readMessage = 0
  while(readMessage < nbMsg) {
    def items = mbean.getNext(cursor,batchSize)
    items.each { CompositeData item ->
      ++readMessage
      println "\nMsg n°$readMessage :"
      describeCompositeData(item)
    }
  }
  mbean.closeCursor(cursor)
}

Voici le résultat d’une exécution :

Found 1 msg on queue MyModule!JMSServer-0@MyFirstDistributedQueue

Msg n°1 :
  BodyIncluded : java.lang.Boolean -> A boolean that indicates whether the JMS message item includes the body.
    false
  ConsumerID : java.lang.String -> Information that identifies the consumer of the message
    null
  DestinationName : java.lang.String -> The destination name on which the message is pending.
    MyModule!JMSServer-0@MyFirstDistributedQueue
  Handle : java.lang.Long -> A handle that identifies this object in the cursor.
    9
  MessageSize : java.lang.Long -> The size of the message in bytes.
    49
  MessageXMLText : java.lang.String -> The message in XML String representation. Note that the message body may be ommitted if the IncludeBody attribute is false.
<mes:WLJMSMessage xmlns:mes="http://www.bea.com/WLS/JMS/Message">
  <mes:Header>
    <mes:JMSMessageID>ID:&lt;841334.1277232706718.0></mes:JMSMessageID>
    <mes:JMSDeliveryMode>PERSISTENT</mes:JMSDeliveryMode>
    <mes:JMSExpiration>0</mes:JMSExpiration>
    <mes:JMSPriority>4</mes:JMSPriority>
    <mes:JMSRedelivered>false</mes:JMSRedelivered>
    <mes:JMSTimestamp>1277232706718</mes:JMSTimestamp>
    <mes:JMSType>MyCustomJMSType</mes:JMSType>
    <mes:Properties>
      <mes:property name="JMS_BEA_DeliveryTime">
        <mes:Long>2555936567849</mes:Long>
      </mes:property>
      <mes:property name="JMSXDeliveryCount">
        <mes:Int>0</mes:Int>
      </mes:property>
    </mes:Properties>
  </mes:Header>
</mes:WLJMSMessage>
  SequenceNumber  : java.lang.Long    -> The sequence number of the message that indicates its position in the FIFO ordering of the destination.
    9
  State           : java.lang.Integer -> The state of the message at the time of the management operation invocation.
    32
  VersionNumber   : java.lang.Integer -> The JMS version number.
    1
  XidString       : java.lang.String  -> The Xid of the transaction for which this message is pending.
    null

La description accessible via le CompositeType du ComposiData permettent d’expliquer le rôle de chaque clef.
En particulier, on peut lire que le contenu du message (body) n’est accessible dans le contenu de la clef MessageXMLText que si la clef BodyIncluded est à vrai. Ce qui n’est manifestement pas le cas ici …

2 – Lire le contenu d’un message

Pour avoir le contenu du message, il faut appeler l’opération getMessage par chaque message.
Le MBean JMSDestinationRuntime propose 3 signatures de l’opération getMessage :

String [] { "java.lang.String" }                     => JMSMessageID
String [] { "java.lang.String", "java.lang.Long" }   => curseur, handle
String [] { "java.lang.String", "java.lang.String" } => curseur, JMSMessageID

Les trois renvoient à nouveau un CompositeData, mais seules les deux dernières permettent de voir les messages pending. J’utiliserais ici la deuxième.
Le paramètre handle doit être récupérée depuis le CompositeData déjà obtenue. (Notons qu’en Groovy, une méthode get(key) peut être appelée sur n’importe quelle objet avec la syntaxe [], le handle peut donc s’obtenir simplement via : item[‘Handle’])

Normalement il suffirait donc de faire cela dans la boucle du précédent script pour obtenir un nouveau CompositeData mais avec le contenu chargé cette fois :

def msgHandle = item['Handle']

def itemWithBody = mbean.getMessage(cursor,msgHandle)
describeCompositeData(itemWithBody)

Toutefois, sous WebLogic 9, nous tomberons sur un bug/une limitation de GroovyMBean : (http://jira.codehaus.org/browse/GROOVY-2807) : en gros lorsqu’une opération JMX a été surchargée, GroovyMBean ne peut pas différencier celle qui ont le même nombre de paramètre et appelle toujours l’une ou l’autre, ce qui provoque alors des exceptions côtés serveur car les paramètres n’auront pas forcément le bon type. C’est le cas de l’opération getMessage. (Rq : sous WebLogic 10, le bug se manifestera si on essaye d’appeler la troisième signature de getMessage).
Pour contourner le bug, il est possible d’utiliser directement les apis JMX : (Grrr… justement celles qu’on voulait éviter… ;-() ) :

def itemWithBody = mbean.server().invoke(
  mbean.name(),
  'getMessage',
  [cursor,msgHandle] as Object[],
  ['java.lang.String', 'java.lang.Long'] as String[])

Ces corrections apportées voici un exemple de sortie du script modifié :

Found 1 msg(s) on queue MyModule!JMSServer-0@MyFirstDistributedQueue

Msg n°1 :
  BodyIncluded : java.lang.Boolean -> A boolean that indicates whether the JMS message item includes the body.
    true
  ConsumerID : java.lang.String -> Information that identifies the consumer of the message
    null
  DestinationName : java.lang.String -> The destination name on which the message is pending.
    MyModule!JMSServer-0@MyFirstDistributedQueue
  Handle : java.lang.Long -> A handle that identifies this object in the cursor.
    9
  MessageSize : java.lang.Long -> The size of the message in bytes.
    49
  MessageXMLText : java.lang.String -> The message in XML String representation. Note that the message body may be ommitted if the IncludeBody attribute is false.
<mes:WLJMSMessage xmlns:mes="http://www.bea.com/WLS/JMS/Message">
  <mes:Header>
    <mes:JMSMessageID>ID:&lt;841334.1277232706718.0></mes:JMSMessageID>
    <mes:JMSDeliveryMode>PERSISTENT</mes:JMSDeliveryMode>
    <mes:JMSExpiration>0</mes:JMSExpiration>
    <mes:JMSPriority>4</mes:JMSPriority>
    <mes:JMSRedelivered>false</mes:JMSRedelivered>
    <mes:JMSTimestamp>1277232706718</mes:JMSTimestamp>
    <mes:JMSType>MyCustomJMSType</mes:JMSType>
    <mes:Properties>
      <mes:property name="JMS_BEA_DeliveryTime">
        <mes:Long>2555936567849</mes:Long>
      </mes:property>
      <mes:property name="JMSXDeliveryCount">
        <mes:Int>0</mes:Int>
      </mes:property>
    </mes:Properties>
  </mes:Header>
  <mes:Body>
    <mes:Text>Text Payload of my delayed message</mes:Text>
  </mes:Body>
</mes:WLJMSMessage>
  SequenceNumber  : java.lang.Long    -> The sequence number of the message that indicates its position in the FIFO ordering of the destination.
    9
  State           : java.lang.Integer -> The state of the message at the time of the management operation invocation.
    32
  VersionNumber   : java.lang.Integer -> The JMS version number.
    1
  XidString       : java.lang.String  -> The Xid of the transaction for which this message is pending.
    null

La valeur de clef MessageXMLText du CompositeData contient maintenant une balise <mes:Body> contenant le contenu du message.
Ici il s’agit d’un message jms de type texte : la payload est contenue dans la balise <mes:Text>
Les messages de type objet seront stockés dans une balise <mes:Object> (objet java sérialisé, encodé en base64)
(Les autres types de message (Bytes, Map, Stream) ont probablement leur balise dédiée, je ne les ai pas rencontrés)

3 – Extraire les informations du message

Les informations obtenues jusqu’à présent sont encore peu exploitables. Je vais maintenant m’attacher à deux points : décoder la clef State du CompositeData et extraire les différents éléments du message jms de la clef MessageXMLText

3.1 – Decodage du statut

Dans le CompositeData représentant le message jms, la clef State contient le statut du message. Sa valeur est un entier qu’il faut décoder à partir des bits de sa représentation binaire.

En comparant la valeur de ce champ State avec ce qu’affiche la console WebLogic, j’ai pu déduire la signification de la plupart de ces bits :

WebloJMSBitState.jpg

Ainsi un état à 32 correspond uniquement au bit 5 activé : le message est Delayed.
Un état à 12 (4+8, 00001100 en binaire) correspond à un message en cours de réception ( 4 : “Pending Receive”) dans le cadre d’une transaction (8 : “In transaction”) (je suppose qu’il s’agit ici d’une transaction JTA, je n’ai jamais pris le temps de vérifier en détail)

Certaines combinaisons de bit ne sont pas possibles : par exemple le bit 0 à 1 (message visible) est incompatible avec l’activation de tous les autres bits.

Je n’ai jamais rencontré le bit 4 activé (c’est probablement un état très bref, peut-être la 2ème phase d’un Two-Phase commit ?). Les bit 6 et 7 ne sont probablement pas utilisés. En tout cas, si vous rencontrez des messages avec ces bits activés, merci de nous en faire profiter.

Voici un bout de code Groovy permettant de décoder le champ State du CompositeData :

def state = item['State']

def BIT_TO_DESCRIPTION = [
  (1 << 0) : "Visible",
  (1 << 1) : "Pending sent",
  (1 << 2) : "Pending receive",
  (1 << 3) : "In transaction",
  (1 << 4) : "Bit 2^4 (?)",
  (1 << 5) : "Delayed",
  (1 << 6) : "Bit 2^6 (?)",
  (1 << 7) : "Bit 2^7 (?)"
]

// décodage du statut :
def stateElement = [];
BIT_TO_DESCRIPTION.each { bit, description ->
  if(state & bit) {
    stateElement << description
  }
}
def stateDescription = stateElement.join(", ")
println "State : $stateDescription"

3.2 – Parsing du XML

Pour extraire les headers, les properties et la payload du champ MessageXMLText, il faut parser le xml.

Groovy offre plusieurs méthodes de parsing XML, j’utiliserais ici XMLParser, qui est très simple d’utilisation. Cette classe permet de naviguer dans un document XML d’une manière librement inspirée du langage XPath (et adaptée à la syntaxe Groovy)

Parsing du xml :

def messageXMLText = itemWithBody['MessageXMLText']
def parser = new XmlParser(false,false)
def xml = parser.parseText(messageXMLText)

Rq : ici, XmlParser est utilisé sans gestion des namespaces (2ème param du constructeur à false), c’est moins robuste mais simplifie beaucoup le code de parsing.

La variable xml correspond ici au noeud racine : ‘mes:WLJMSMessage’
On sélectionne ensuite l’unique noeud “mes:Header” de cette façon :

def header = xml.'mes:Header'[0]

Le noeud “mes:Header” contient un enfant par headers JMS (les headers sont standarts) plus un sous-élément “mes:Properties” qui contient les propriétés JMS custom présentent sur le message

L’extraction des header jms peut s’effectuer en considérant tous les noeuds enfants ( header.’*’ ) à l’exception du noeud “mes:Propertes” (le findAll permet de le filtrer ) :

// headers jms:
println "Headers : "
header.'*'.findAll { it.name() != 'mes:Properties' }.each { node ->
  def headerName = node.name()[4..-1] // suppression du préfixe mes:
  def value = node.text()
  printf "  %-20s : %s%n" , headerName, value
}

L’extraction des properties, s’effectue en considérant tous les sous-éléments “mes:property” de l’élément “mes:Properties” (header.’mes:Properties'[0].’mes:property’). Pour chacun de ces éléments :
– l’attribut name donne le nom de la propriété (node.’@name’)
– un unique sous-élément nommé suivant le type de la propriété : par exemple “mes:Long” ou “mes:Int” ( node.’*'[0].name() ) ; contient la valeur de la propriété : ( node.’*'[0].text() )

// custom properties :
println "Properties : "
header.'mes:Properties'[0].'mes:property'.each { node ->
  def propertyName = node.'@name'
  def valueNode = node.'*'[0]
  // le type peut-être en particulier : String, Boolean, Int, Long, ...
  def type = valueNode.name()[4..-1] // suppression du préfixe mes: ...
  def value = valueNode.text()
  printf "  %-20s (type : %5s) : %s%n" , propertyName, type, value
}

Finalement, la payload doit être extrait de l’élément “mes:Body” ( xml.’mes:Body'[0] ) : s’il s’agit d’un message jms de type texte, il est contenu dans un élément “mes:Text” ( xml.’mes:Body'[0].’mes:Text'[0] ).
Si ce n’est pas un message text, le sous-élément de “mes:Body” sera différent et il faudra décoder sa valeur ( ainsi s’il s’agit d’un “mes:object”, son contenu étant un objet java sérialisé, encodé en base64, il faudra décoder son contenu avec Base64Decoder par exemple puis lire les octets obtenues via un ObjectInputStream pour obtenir l’objet java , ce qui n’est pas montré ici). Pour simplifier, le code suivant ne gère que les messages de type texte :

def payload = xml.'mes:Body'[0]

if(payload.'mes:Text') {
  println "Content : (this is a text message) : "
  println payload.'mes:Text'[0].text();
} else {
  def payloadType = payload.'*'[0].name()[4..-1] // suppression du préfixe mes: ...
  println "Content : The type of this message is '$payloadType', extraction not implemented"
}

Exemple de sortie pour le message des exemples précédents :

State : Delayed
Headers :
  JMSMessageID : ID:<841334.1277232706718.0>
  JMSDeliveryMode : PERSISTENT
  JMSExpiration : 0
  JMSPriority : 4
  JMSRedelivered : false
  JMSTimestamp : 1277232706718
  JMSType : MyCustomJMSType
Properties :
  JMS_BEA_DeliveryTime (type : Long) : 2555936567849
  JMSXDeliveryCount (type : Int) : 0
Content : (this is a text message) :
Text Payload of my delayed message

4 – Paramètre state de l’opération getMessages

Un dernier mot sur l’opération getMessages, elle a une deuxième signature :

String [] { "java.lang.String", "java.lang.Integer", "java.lang.Integer" } -> selector, timeout, state

Le troisième paramètre permet de filtrer les messages obtenues sur le champ State de ceux-ci.
Il s’agit d’un masque (bitmask) appliqué sur le champ State des messages JMS.

Pour obtenir tous les messages, on peut utiliser : 1111 1111 (255)
Pour obtenir les messages visibles on peut utiliser : 0000 0001 (1)
Pour obtenir les messages pending quelqu’il soit : 1111 1110 (254)
(C’est à dire tous les messages sauf les visibles)
Pour obtenir uniquement les messages delayed : 1111 1000 (248)
(C’est à dire ni visible, ni pending receive, ni pending sent)

Voilà, notre grand tour de JMSDestinationRuntime est terminé. J’espère que les courageux qui seront arrivés jusque là y auront trouvés des informations utiles pour leur projet actuel ou futur mettant en oeuvre le module jms de WebLogic.

Vous trouverez ci-joint 2 scripts groovy mettant en oeuvre le code proposé dans cette article.

Tweet about this on TwitterShare on FacebookGoogle+Share on LinkedIn

Laisser un commentaire

Votre adresse de messagerie ne sera pas publiée. Les champs obligatoires sont indiqués avec *


*