ActiveMQ实战1:ActiveMQ Java_JAVA_编程开发_程序员俱乐部

中国优秀的程序员网站程序员频道CXYCLUB技术地图
热搜:
更多>>
 
您所在的位置: 程序员俱乐部 > 编程开发 > JAVA > ActiveMQ实战1:ActiveMQ Java

ActiveMQ实战1:ActiveMQ Java

 2015/4/2 16:03:49  _wy_  程序员俱乐部  我要评论(0)
  • 摘要:1.准备工作1)下载安装,启动activemq2)下载activemqjar包导入项目2.消息生产者packagecom.activemq.demo1;importjavax.jms.*;importorg.apache.activemq.*;/***消息生产者,用于生成并发送消息*/publicclassProducerTool{privateStringuser=ActiveMQConnection.DEFAULT_USER
  • 标签:Java

1.准备工作

? 1) 下载安装,启动activemq

? 2) 下载activemq?? jar包导入项目

2.消息生产者

class="java">package com.activemq.demo1;

import javax.jms.*;
import org.apache.activemq.*;
/**
 * 消息生产者,用于生成并发送消息
 */
public class ProducerTool {
	
    private String user = ActiveMQConnection.DEFAULT_USER;    
    private String password = ActiveMQConnection.DEFAULT_PASSWORD;    
    private String url = ActiveMQConnection.DEFAULT_BROKER_URL;    
    private String subject = "TOOL.DEFAULT";    
    private Destination destination = null;    
    private Connection connection = null;    
    private Session session = null;    
    private MessageProducer producer = null;    
   
    /**
     * 初始化    
     * @throws Exception
     */
    private void initialize() throws Exception {    
        ActiveMQConnectionFactory connectionFactory = 
        		new ActiveMQConnectionFactory(user, password, url);    
        connection = connectionFactory.createConnection();
        /* 创建Session,参数解释:            
                            第一个参数     是否使用事务:当消息发送者向消息提供者(即消息代理)发送消息时,消息发送者等待消息代理的确认,
        	没有回应则抛出异常,消息发送程序负责处理这个错误。            
                            第二个参数      消息的确认模式:            
              AUTO_ACKNOWLEDGE : 指定消息提供者在每次收到消息时自动发送确认。消息只向目标发送一次,
        		但传输过程中可能因为错误而丢失消息。           
         	  CLIENT_ACKNOWLEDGE : 由消息接收者确认收到消息,通过调用消息的acknowledge()方法
         	  	(会通知消息提供者收到了消息)            
              DUPS_OK_ACKNOWLEDGE : 指定消息提供者在消息接收者没有确认发送时重新发送消息
              	(这种确认模式不在乎接收者收到重复的消息)。*/
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);    
        destination = session.createQueue(subject);    
        producer = session.createProducer(destination);  
        //设置是否持久化
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);    
    }    
   
    /**
     * 发送消息    
     * @param message
     * @throws Exception
     */
    public void produceMessage(String message) throws Exception {    
        initialize(); 
        //发送TextMessage,还可发送MapMessage,ObjectMessage,StreamMessage
        TextMessage msg = session.createTextMessage(message);
        connection.start(); 
        System.out.println("Producer:-> send start.");
        producer.send(msg); 
        System.out.println("Producer:-> send complete.");
        close();
    }    
   
    /**
     * 关闭连接    
     * @throws JMSException
     */
    public void close() throws JMSException { 
    	System.out.println("Producer:->Closing Connection.");
        if (producer != null)    
            producer.close();    
        if (session != null)    
            session.close();    
        if (connection != null)    
            connection.close();    
    }    
}    

3.消息消费者

package com.activemq.demo1;

import javax.jms.*;
import javax.jms.Message;

import org.apache.activemq.*;
/**
 * 消息消费者,用于接收消息
 */
public class ConsumerTool implements MessageListener {  
	
    private String user = ActiveMQConnection.DEFAULT_USER;    
    private String password = ActiveMQConnection.DEFAULT_PASSWORD;    
    private String url = ActiveMQConnection.DEFAULT_BROKER_URL;    
    private String subject = "TOOL.DEFAULT";    
    private Destination destination = null;    
    private Connection connection = null;    
    private Session session = null;    
    private MessageConsumer consumer = null;    
   
    /**
     * 初始化    
     * @throws JMSException
     * @throws Exception
     */
    private void initialize() throws Exception {    
        ActiveMQConnectionFactory connectionFactory = 
        		new ActiveMQConnectionFactory(user, password, url);    
        connection = connectionFactory.createConnection();    
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);    
        destination = session.createQueue(subject);    
        consumer = session.createConsumer(destination);    
    }    
   
    /**
     * 消费消息    
     * @throws Exception
     */
    public void consumeMessage() throws Exception {    
        initialize();    
        connection.start();    
        System.out.println("Consumer:->Begin listening...");    
        // 开始监听    
        consumer.setMessageListener(this); 
    }    
   
    /**
     * 关闭连接    
     * @throws JMSException
     */
    public void close() throws JMSException {    
        System.out.println("Consumer:->Closing connection");    
        if (consumer != null)    
            consumer.close();    
        if (session != null)    
            session.close();    
        if (connection != null)    
            connection.close();    
    }    
   
    /**
     *  消息处理函数    
     */
    public void onMessage(Message message) { 
        try {    
            if (message instanceof TextMessage) {    
                TextMessage txtMsg = (TextMessage) message;    
                String msg = txtMsg.getText();    
                System.out.println("Consumer:->Received textMessage: " + msg);    
            } else {    
                System.out.println("Consumer:->Received: " + message);    
            } 
            close();
        } catch (JMSException e) {    
            e.printStackTrace();    
        }    
    }

}  

4.测试类

package com.activemq.demo1;

import javax.jms.*;

public class Test {    
   
    /**   
     * @param args   
     */   
    public static void main(String[] args) throws JMSException, Exception {    
        ConsumerTool consumer = new ConsumerTool();    
        ProducerTool producer = new ProducerTool();    
        // 开始监听    
        consumer.consumeMessage();    
            
        // 延时500毫秒之后发送消息    
        Thread.sleep(500);    
        producer.produceMessage("Hello, world!");    
            
    }    
}    

?

发表评论
用户名: 匿名