Java:JMS
Sommaire
- 1 But
- 2 Implémentations JMS open source
- 3 Modes de diffusion
- 4 Modes de consommation
- 5 Fabrique de connexion
- 6 La connexion
- 7 La session
- 8 Le message
- 9 Objet d'envoi de messages
- 10 Listener pour le traitement de la réception asynchrone
- 11 Objet de réception de messages
- 12 Gestion des exceptions en asynchrone
But
JMS est une spécification de SUN permettant d'envoyer et de recevoir des messages entre applications ou composants Java.
Implémentations JMS open source
Modes de diffusion
Deux modes de diffusion des messages sont possibles:
- Point à point (Point to point): Un producteur publie un message dans une queue et un consommateur le lit et envoi une notification de réception du message afin de le supprimer de la queue (les messages ont une date d'expiration). Dans ce cas, le producteur connaît la destination du message. Ce mode peut s'apparenter à une messagerie mail.
- Publication/souscription (publish/subscribe): Un producteur publie un message à un ou plusieurs consommateurs abonnés au sujet (ou topic) du message. Dans ce cas, le message peut avoir plusieurs destinataires et le producteur ne les connaît pas. Ce mode peut s'apparenter à un forum de discussion.
Modes de consommation
Deux modes de consommation des messages sont possibles:
- Synchrone: L'application est arrêtée jusqu'à l'arrivée du message.
- Asynchrone: Les réponses aux messages sont traitées dans des threads permettant à l'application de faire autre chose en l'absence de message.
Fabrique de connexion
Objet fourni par l'implémentation JMS, obtenu généralement via JNDI, et implémentant l'interface:
- javax.jms.ConnectionFactory pour gérer les deux modes de diffusion avec une instance unique,
- javax.jms.QueueConnectionFactory pour un mode de diffusion en point à point (compatibilité avec JMS 1.0),
- javax.jms.TopicConnectionFactory pour un mode de diffusion en publication/souscription (compatibilité avec JMS 1.0).
Exemple avec OpenJMS
context = new InitialContext(); connectionFactory = (ConnectionFactory)context.lookup("ConnectionFactory");
Fichier jndi.properties:
... java.naming.provider.url=tcp://localhost:3035 java.naming.factory.initial=org.exolab.jms.jndi.InitialContextFactory java.naming.security.principal=admin java.naming.security.credentials=openjms ...
La connexion
Une fois la fabrique de connexion obtenu, on peut obtenir l'objet connexion suivant le type de fabrique:
Connection connection = connectionFactory.createConnection();
QueueConnection queueConnection = queueConnectionFactory.createQueueConnection();
TopicConnection topicConnection = topicConnectionFactory.createTopicConnection();
Suivant le type de fabrique, l'objet connexion implémente l'une des trois interfaces suivantes:
Pour démarrer ou redémarrer la connexion:
connection.start();
Pour fermer la connexion (à faire une fois la connexion inutile):
connection.close();
Pour suspendre temporairement la connexion:
connection.stop();
Pour indiquer le listener de gestion des exceptions (mode asynchrone):
connection.setExceptionListener(listener);
La session
Une fois la connexion obtenu, on peut obtenir l'objet session suivant le type de connexion:
Session session = connection.createSession(transacted, acknowledgeMode);
QueueSession queueSession = queueConnection.createQueueSession(transacted, acknowledgeMode);
TopicSession topicSession = topicConnection.createTopicSession(transacted, acknowledgeMode);
Le paramètre transacted est un booléen qui indique si la session gère une transaction.
Le paramètre acknowledgeMode indique le mode d'accusé de réception (ignoré si la session gère une transaction):
- javax.jms.Session.AUTO_ACKNOWLEDGE pour un accusé de réception automatique,
- javax.jms.Session.CLIENT_ACKNOWLEDGE pour un accusé de réception géré par le client à l'aide de la méthode acknowledge() du message,
- javax.jms.Session.DUPS_OK_ACKNOWLEDGE pour dupliquer un message.
Suivant le type de connexion, l'objet session implémente l'une des trois interfaces suivantes:
L'objet session permet de créer des messages et des objets pour les envoyer et les recevoir. Dans une application multithread, chaque thread doit avoir une session qui lui est propre (les objets sessions sont monothread).
Pour fermer la session:
session.close();
Pour valider la transaction (si transacted = true):
session.commit();
Pour abandonner une transaction (si transacted = true):
session.rollback();
Le message
Une fois la session obtenu, on peut créer différents types d'objets messages qui vont contenir les informations à transmettre.
Pour un message contenant un objet sérialisé:
ObjectMessage message = session.createObjectMessage();
ou:
ObjectMessage message = session.createObjectMessage(object);
Pour un message contenant du texte:
TextMessage message = session.createTextMessage();
ou:
TextMessage message = session.createTextMessage(text);
Pour un message composé d'une collection de paire clé/valeur:
MapMessage message = session.createMapMessage();
Pour un message composé d'octets:
BytesMessage message = session.createBytesMessage();
Pour un message issu d'un flux:
StreamMessage message = session.createStreamMessage();
Suivant la méthode appelée pour créer l'objet message, ce dernier va implémenter l'une des interfaces suivantes:
- javax.jms.ObjectMessage
- javax.jms.TextMessage
- javax.jms.MapMessage
- javax.jms.BytesMessage
- javax.jms.StreamMessage
Pour envoyer un accusé de réception (si acknowledgeMode = CLIENT_ACKNOWLEDGE):
message.acknowledge();
Chaque message est constitué de trois parties:
- l'en-tête (header) qui contient les données techniques pour identifier et acheminer le message,
- les propriétés (properties) qui contient les données fonctionnelles qui peuvent être utilisées pour filtrer les messages,
- et le corps du message (body) qui contient les données du message.
Champs de l'en-tête du message
- JMSMessageID: Identifiant unique du message.
- JMSDestination: File d'attente ou topic destinataire du message.
- JMSCorrelationID et JMSCorrelationIDAsBytes: Dans le cas d'un message réponse, contient l'identifiant du message requête.
- JMSDeliveryMode
- JMSExpiration
- JMSPriority
- JMSRedelivered
- JMSReplyTo
- JMSTimestamp
- JMSType
La plupart de ces données sont renseignées lors de l'envoi du message.
Les propriétés du message
Pour mettre à jour et lire une propriété de type nombre entier 8 bits:
byte value = message.getByteProperty(name); message.setByteProperty(name, value);
Pour mettre à jour et lire une propriété de type nombre entier 16 bits:
short value = message.getShortProperty(name); message.setShortProperty(name, value);
Pour mettre à jour et lire une propriété de type nombre entier 32 bits:
int value = message.getIntProperty(name); message.setIntProperty(name, value);
Pour mettre à jour et lire une propriété de type nombre entier 64 bits:
long value = message.getLongProperty(name); message.setLongProperty(name, value);
Pour mettre à jour et lire une propriété de type nombre réel simple précision:
float value = message.getFloatProperty(name); message.setFloatProperty(name, value);
Pour mettre à jour et lire une propriété de type nombre réel double précision:
double value = message.getDoubleProperty(name); message.setDoubleProperty(name, value);
Pour mettre à jour et lire une propriété de type booléen:
boolean value = message.getBooleanProperty(name); message.setBooleanProperty(name, value);
Pour mettre à jour et lire une propriété de type chaîne de caractères:
String value = message.getStringProperty(name); message.setStringProperty(name, value);
Pour mettre à jour et lire une propriété de type objet quelconque:
Object value = message.getObjectProperty(name); message.setObjectProperty(name, value);
Pour supprimer toutes les propriétés:
message.clearProperties();
Pour obtenir la liste des clés des propriétés (name):
Enumeration propertyNames = message.getPropertyNames();
Pour tester la présence d'une propriété:
boolean exists = message.propertyExists(name);
Objet d'envoi de messages
Pour envoyer un message, il faut créer un objet chargé de l'envoi des messages. Avec un objet Session et pour le mode point à point:
Destination destination = session.createQueue("queue:///file.out?expiry=0&persistence=1&targetClient=1"); MessageProducer messageProducer = session.createProducer(destination);
Pour le mode publication/souscription:
Destination destination = session.createTopic(topicName); MessageProducer messageProducer = session.createProducer(destination);
Avec les autres types de sessions:
Queue queue = queueSession.createQueue("queue:///file.out?expiry=0&persistence=1&targetClient=1"); QueueSender queueSender = queueSession.createSender(queue);
Topic topic = topicSession.createTopic(topicName); TopicPublisher topicPublisher = topicSession.createPublisher(topic);
Il est possible d'obtenir l'objet Destination via JNDI:
Destination destination = (Destination)context.lookup("queue1");
L'objet d'envoi de messages va implémenter l'une des interfaces suivantes:
- javax.jms.MessageProducer
- javax.jms.QueueSender (qui hérite de MessageProducer)
- javax.jms.TopicPublisher (qui hérite de MessageProducer)
Pour envoyer un message:
messageProducer.send(message);
queueSender.send(message);
topicPublisher.publish(message);
Listener pour le traitement de la réception asynchrone
Le listener est un objet qui va implémenter l'interface javax.jms.MessageListener. Celle-ci impose de définir la méthode suivante:
void onMessage(Message message) { // Traitement de réception du message }
Objet de réception de messages
Pour recevoir un message, il faut créer un objet chargé de la réception des messages. Avec un objet Session et pour le mode point à point:
Destination destination = session.createQueue("queue:///file.out?expiry=0&persistence=1&targetClient=1"); MessageConsumer messageConsumer = session.createConsumer(destination);
Pour le mode publication/souscription:
Destination destination = session.createTopic(topicName); MessageConsumer messageConsumer = session.createConsumer(destination);
Avec les autres types de sessions:
Queue queue = queueSession.createQueue("queue:///file.out?expiry=0&persistence=1&targetClient=1"); QueueReceiver messageConsumer = queueSession.createReceiver(queue);
Topic topic = topicSession.createTopic(topicName); TopicSubscriber messageConsumer = topicSession.createSubscriber(topic);
Il est possible d'obtenir l'objet Destination via JNDI:
Destination destination = (Destination)context.lookup("queue1");
Il est possible de définir des filtres sur les valeurs de l'en-tête ou des propriétés des messages:
MessageConsumer messageConsumer = session.createConsumer(destination, "JMSCorrelationID = '" + correlId +"'");
QueueReceiver messageConsumer = queueSession.createReceiver(queue, "JMSCorrelationID = '" + correlId +"'");
TopicSubscriber messageConsumer = topicSession.createSubscriber(topic, "JMSCorrelationID = '" + correlId +"'");
Les opérateurs autorisés pour les filtres, qui s'inspirent de la norme SQL 92, sont: =, <>, >, >=, <, <=, between X and Y, in (a1,a2, ..., an), like, is null, etc...
L'objet de réception de messages va implémenter l'une des interfaces suivantes:
- javax.jms.MessageConsumer
- javax.jms.QueueReceiver (qui hérite de MessageConsumer)
- javax.jms.TopicSubscriber (qui hérite de MessageConsumer)
Pour rendre inactif la réception des messages:
messageConsumer.close();
Pour attendre l'arrivée d'un message (mode synchrone):
Message message = messageConsumer.receive();
Pour attendre l'arrivée d'un message pendant un nombre de millisecondes (mode synchrone):
Message message = messageConsumer.receive(timeout);
Pour renvoyer un message sans attendre s'il y en a un présent (mode synchrone):
Message message = messageConsumer.receiveNoWait();
Pour indiquer le listener (mode asynchrone):
messageConsumer.setMessageListener(listener);
Gestion des exceptions en asynchrone
Pour gérer les exceptions dans le mode asynchrone, on créé un objet implémentant l'interface javax.jms.ExceptionListener. Celle-ci impose de définir la méthode suivante:
void onException(JMSException exception) { // Traitement des exceptions }
L'enregistrement de cet objet se fait à l'aide de la méthode setExceptionListener de l'objet connexion.