1.准备工作
? 1) 下载安装,启动activemq
? 2) 下载activemq?? jar包导入项目
2.消息生产者
class="java">package com.activemq.demo1; import javax.jms.*; import org.apache.activemq.*; /** * 消息生产者,用于生成并发送消息 */ public class ProducerTool { private String user = ActiveMQConnection.DEFAULT_USER; private String password = ActiveMQConnection.DEFAULT_PASSWORD; private String url = ActiveMQConnection.DEFAULT_BROKER_URL; private String subject = "TOOL.DEFAULT"; private Destination destination = null; private Connection connection = null; private Session session = null; private MessageProducer producer = null; /** * 初始化 * @throws Exception */ private void initialize() throws Exception { ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url); connection = connectionFactory.createConnection(); /* 创建Session,参数解释: 第一个参数 是否使用事务:当消息发送者向消息提供者(即消息代理)发送消息时,消息发送者等待消息代理的确认, 没有回应则抛出异常,消息发送程序负责处理这个错误。 第二个参数 消息的确认模式: AUTO_ACKNOWLEDGE : 指定消息提供者在每次收到消息时自动发送确认。消息只向目标发送一次, 但传输过程中可能因为错误而丢失消息。 CLIENT_ACKNOWLEDGE : 由消息接收者确认收到消息,通过调用消息的acknowledge()方法 (会通知消息提供者收到了消息) DUPS_OK_ACKNOWLEDGE : 指定消息提供者在消息接收者没有确认发送时重新发送消息 (这种确认模式不在乎接收者收到重复的消息)。*/ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); destination = session.createQueue(subject); producer = session.createProducer(destination); //设置是否持久化 producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); } /** * 发送消息 * @param message * @throws Exception */ public void produceMessage(String message) throws Exception { initialize(); //发送TextMessage,还可发送MapMessage,ObjectMessage,StreamMessage TextMessage msg = session.createTextMessage(message); connection.start(); System.out.println("Producer:-> send start."); producer.send(msg); System.out.println("Producer:-> send complete."); close(); } /** * 关闭连接 * @throws JMSException */ public void close() throws JMSException { System.out.println("Producer:->Closing Connection."); if (producer != null) producer.close(); if (session != null) session.close(); if (connection != null) connection.close(); } }
3.消息消费者
package com.activemq.demo1; import javax.jms.*; import javax.jms.Message; import org.apache.activemq.*; /** * 消息消费者,用于接收消息 */ public class ConsumerTool implements MessageListener { private String user = ActiveMQConnection.DEFAULT_USER; private String password = ActiveMQConnection.DEFAULT_PASSWORD; private String url = ActiveMQConnection.DEFAULT_BROKER_URL; private String subject = "TOOL.DEFAULT"; private Destination destination = null; private Connection connection = null; private Session session = null; private MessageConsumer consumer = null; /** * 初始化 * @throws JMSException * @throws Exception */ private void initialize() throws Exception { ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url); connection = connectionFactory.createConnection(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); destination = session.createQueue(subject); consumer = session.createConsumer(destination); } /** * 消费消息 * @throws Exception */ public void consumeMessage() throws Exception { initialize(); connection.start(); System.out.println("Consumer:->Begin listening..."); // 开始监听 consumer.setMessageListener(this); } /** * 关闭连接 * @throws JMSException */ public void close() throws JMSException { System.out.println("Consumer:->Closing connection"); if (consumer != null) consumer.close(); if (session != null) session.close(); if (connection != null) connection.close(); } /** * 消息处理函数 */ public void onMessage(Message message) { try { if (message instanceof TextMessage) { TextMessage txtMsg = (TextMessage) message; String msg = txtMsg.getText(); System.out.println("Consumer:->Received textMessage: " + msg); } else { System.out.println("Consumer:->Received: " + message); } close(); } catch (JMSException e) { e.printStackTrace(); } } }
4.测试类
package com.activemq.demo1; import javax.jms.*; public class Test { /** * @param args */ public static void main(String[] args) throws JMSException, Exception { ConsumerTool consumer = new ConsumerTool(); ProducerTool producer = new ProducerTool(); // 开始监听 consumer.consumeMessage(); // 延时500毫秒之后发送消息 Thread.sleep(500); producer.produceMessage("Hello, world!"); } }
?