1 JMS应用程序有四个组成部分:JMS服务提供者、消息管理对象、消息的生产者
消费者和消息本身。
JMS服务提供者实现消息
队列和通知,同时实现消息管理的API。JMS已经是
J2EE API的一部分,J2EE服务器都提供JMS服务。
消息管理对象提供对消息进行操作的API。JMS API中有两个消息管理对象:ConnectionFactory和Destination,根据消息的消费方式的不同 ConnectionFactory可以分为QueueConnectionFactory和 TopicConnectionFactory,Destination可以分为Queue和Topic。用这两个管理对象可以建立到消息服务的会话。
消息的生产者和消费者。它们可以毫不相干,只需要消息的消费者知道如何使用消息即可。根据消息消费者数目的不同,消息的消费者分为两类:subscriber 和receiver,同样
消息发送者也分为两类:Publisher和Sender。
消息。JMS API规定了五种消息:Message、MapMessage、TextMessage、ByteMessage、StreamMessage和ObjectMessage。
2 API:
QueueConnectionFactory和TopicConnectionFactory 连接工厂用来生成QueueConnection和TopicConnection的实例
QueueConnection和TopicConnection连接对象用来建立到JMS的连接并生成会话实例
QueueSession和TopicSession会话对象用来创建消息、消息的生产者和消息的消费者(解释一下消息的生产者:它并不代表生成消息实例的对象而是指将消息发送到JMS的对象)
QueueSender、TopicPUblisher和QueueReciever、TopicSubscriber。消息的生产者和消费 者,QueueSender的send方法和TopicPublisher的publish方法发送消息到Destination。 QueueReciever和TopicSubscriber直接使用父
接口MessageConsumer中定义的方法receive、 recieveNoWait等方法来接收消息,setMessageListener方法来设置消息
监听器。QueueReciever的 getQueue方法得到Queue的引用,TopicSubscriber的getTopic方法得到Topic的引用。
MessageListener,消息监听器。改接口只有一个方法onMessage(),改方法只有一个Message类型的参数,通过 MessageConsumer(QueueReciever和TopicSubscriber共同的父接口)的setMessageListener方法注册后,系统在收到消息后调用方法。
Queue和Topic,消息Destination。主要的作用就是存储消息。
package com.learn;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.ejb.EJB;
import javax.ejb.Remote;
import javax.ejb.Stateless;
import javax.ejb.TransactionAttribute;
import javax.ejb.TransactionAttributeType;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.ObjectMessage;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.naming.InitialContext;
/**
* EventReceiver
*/
@Stateless(mappedName = EventReporterRemote.JNDI_NAME)
@Remote(EventReporterRemote.
class)
@TransactionAttribute(TransactionAttributeType.NOT_SUPPORTED)
public class EventHandler implements EventReporterRemote {
private static String destinationName = "/queue/MessageQueue";
private QueueConnection queueConnection = null;
private QueueSession queueSession = null;
private Queue queue = null;
@Override
public void notify(Event event) {
TO to = event.getEventTO();
if (to == null) {
System.out.println("Unable to get data from this event "
+ event.getEventType());
}
Message msg = null;
switch (event.getEventType()) {
case CREATE:
msg = new Message(Message.Type.CREATE, to);
break;
case DELETE:
msg = new Message(Message.Type.DELETE, pto);
break;
default:
System.out.println("Wrong event type from this event "
+ event.getEventType().toString());
}
if (msg != null) {
sendMessage(msg);
}
}
@PostConstruct
private void init() {
initQueue();
}
private void initQueue() {
InitialContext ctx = null;
try {
ctx = new InitialContext();
QueueConnectionFactory cf = (QueueConnectionFactory) ctx
.lookup("java:/ConnectionFactory");
queue = (Queue) ctx.lookup(destinationName);
try {
if (queueConnection != null)
queueConnection.close();
queueConnection = null;
} catch (Exception ex) {
System.out.println("Closing queueConnection error.");
}
queueConnection = cf.createQueueConnection();
queueSession = queueConnection.createQueueSession(false,
Session.AUTO_ACKNOWLEDGE);
queueConnection.start();
} catch (Exception e) {
System.out.println("Lookup ConnectionFactory error.", e);
try {
if (queueConnection != null)
queueConnection.close();
} catch (JMSException ignored) {
System.out.println("This Exception should be ignored.");
}
queueConnection = null;
} finally {
if (ctx != null) {
try {
ctx.close();
} catch (Exception e) {
System.out.println("Failed in closing context.", e);
}
}
}
}
@PreDestroy
private void closeConnection() {
try {
if (queueConnection != null) {
queueConnection.close();
}
} catch (JMSException jmse) {
System.out.println("Could not close connection.", jmse);
}
}
private void sendMessage(Message message) {
try {
System.out.println("Send message:" + message.getMessageType());
if (queueConnection == null || queueSession == null) {
initQueue();
}
if (queueConnection != null || queueSession != null) {
final ObjectMessage message = queueSession
.createObjectMessage(message);
QueueSender publisher = queueSession.createSender(queue);
publisher.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
publisher.send(message);
} else {
System.out.println("No available message queue.");
}
} catch (Exception ex) {
System.out.println("Failed to send message to queue." + ex);
}
}
}