rabbitmq入门-工作队列_JAVA_编程开发_程序员俱乐部

中国优秀的程序员网站程序员频道CXYCLUB技术地图
热搜:
更多>>
 
您所在的位置: 程序员俱乐部 > 编程开发 > JAVA > rabbitmq入门-工作队列

rabbitmq入门-工作队列

 2013/11/2 15:19:01  yugouai  程序员俱乐部  我要评论(0)
  • 摘要:工作队列:TaskQueues,为了避免等待一些占用大量资源、时间的操作。[size=13px;line-height:18px;]当我们把[/size]任务(Task)[size=13px;line-height:18px;]当作消息发送到队列中,一个运行在后台的工作者(worker)进程就会取出任务然后处理。当你运行多个工作者(workers),任务就会在它们之间共享。[/size]消费者1输出[*]Waitingformessages
  • 标签:工作 队列


工作队列:Task Queues,为了避免等待一些占用大量资源、时间的操作。[size=13px; line-height: 18px;]当我们把[/size]任务(Task)[size=13px; line-height: 18px;]当作消息发送到队列中,一个运行在后台的工作者(worker)进程就会取出任务然后处理。当你运行多个工作者(workers),任务就会在它们之间共享。[/size]





消费者1输出
class="java">
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'hi hi. hi.. hi...1'
 [x] Done
 [x] Received 'hi hi. hi.. hi...3'
 [x] Done
 [x] Received 'hi hi. hi.. hi...5'
 [x] Done
 [x] Received 'hi hi. hi.. hi...7'
 [x] Done
 [x] Received 'hi hi. hi.. hi...9'
 [x] Done

消费者2输出


  • Waiting for messages. To exit press CTRL+C
  • [x] Received 'hi hi. hi.. hi...0'
    [x] Done
    [x] Received 'hi hi. hi.. hi...2'
    [x] Done
    [x] Received 'hi hi. hi.. hi...4'
    [x] Done
    [x] Received 'hi hi. hi.. hi...6'
    [x] Done
    [x] Received 'hi hi. hi.. hi...8'
    [x] Done
    默认来说,RabbitMQ会按顺序得把消息发送给每个消费者(consumer)。平均每个消费者都会收到同等数量得消息。这种发送消息得方式叫做——轮询(round-robin)。试着添加三个或更多得工作者(workers)。




    public class MultiMQClient {
        private final static String QUEUE_NAME="hello";
        public static void main( String[] args ) throws IOException
        {
           ConnectionFactory factory = new ConnectionFactory();
           factory.setHost("localhost");
           Connection connection = factory.newConnection();
           Channel channel = connection.createChannel();
           channel.queueDeclare(QUEUE_NAME, false, false, false, null);
           args = new String[]{"hi","hi.","hi..","hi..."};
           for (int i = 0; i
    消费者代码


    public class MultiMQServer {
    private final static String QUEUE_NAME = "hello";
    public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println("
  • Waiting for messages. To exit press CTRL+C");
  •     boolean autoAck = false;
        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(QUEUE_NAME, autoAck, consumer);

        while (true) {
          QueueingConsumer.Delivery delivery = consumer.nextDelivery();
          String message = new String(delivery.getBody());
                 //确认消息已经收到
          channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
          System.out.println(" [x] Received '" + message + "'");
          doWork(message);
          System.out.println(" [x] Done");
        }

    }
    private static void doWork(String message) throws InterruptedException {
    for (char ch : message.toCharArray()) {
    if (ch == '.') {
    Thread.sleep(1000);
    }
    }
    }
    }


    消息响应

    我们不想丢失任何任务消息。如果一个工作者(worker)挂掉了,我们希望任务会重新发送给其他的工作者(worker)。

    为了防止消息丢失,RabbitMQ提供了消息响应(acknowledgments)。消费者会通过一个ack(响应),告诉RabbitMQ已经收到并处理了某条消息,然后RabbitMQ就会释放并删除这条消息。

    如果消费者(consumer)挂掉了,没有发送响应,RabbitMQ就会认为消息没有被完全处理,然后重新发送给其他消费者(consumer)。这样,及时工作者(workers)偶尔的挂掉,也不会丢失消息。

    消息是没有超时这个概念的;当工作者与它断开连的时候,RabbitMQ会重新发送消息。这样在处理一个耗时非常长的消息任务的时候就不会出问题了。

    消息响应默认是开启的。之前的例子中我们可以使用no_ack=True标识把它关闭。是时候移除这个标识了,当工作者(worker)完成了任务,就发送一个响应。

    设置消息响应

    boolean autoAck = false;
    channel.basicConsume(QUEUE_NAME, autoAck, consumer);


    运行两个消费者进程,运行生产者,关闭其中一个消费者进程,输出如下:
  • Waiting for messages. To exit press CTRL+C
  • Waiting for messages. To exit press CTRL+C
  • [x] Received 'hi hi. hi.. hi...0'
    [x] Done
    [x] Received 'hi hi. hi.. hi...2'
    运行到此,kill掉进程

  • Waiting for messages. To exit press CTRL+C
  • [x] Received 'hi hi. hi.. hi...1'
    [x] Done
    [x] Received 'hi hi. hi.. hi...3'
    [x] Done
    [x] Received 'hi hi. hi.. hi...5'
    [x] Done
    [x] Received 'hi hi. hi.. hi...7'
    [x] Done
    [x] Received 'hi hi. hi.. hi...9'
    [x] Done
    [x] Received 'hi hi. hi.. hi...0'
    [x] Done
    [x] Received 'hi hi. hi.. hi...2'
    [x] Done
    [x] Received 'hi hi. hi.. hi...4'
    [x] Done
    [x] Received 'hi hi. hi.. hi...6'
    [x] Done
    [x] Received 'hi hi. hi.. hi...8'
    [x] Done

      [size=13px; line-height: 18px;]运行上面的代码,我们发现即使使用CTRL+C杀掉了一个工作者(worker)进程,消息也不会丢失。当工作者(worker)挂掉这后,所有没有响应的消息都会重新发送。[/size]

    消息持久化

    那么在它退出或者崩溃的时候,它将会流失所有的队列和消息。为了确保信息不会丢失,有两个事情是需要注意的:我们必须把“队列”和“消息”设为持久化。

    boolean durable = true;
    channel.queueDeclare("hello", durable, false, false, null);
    尽管这行代码本身是正确的,但是仍然不会正确运行。因为我们已经定义过一个叫hello的非持久化队列。RabbitMq不允许你使用不同的参数重新定义一个队列,它会返回一个错误。但我们现在使用一个快捷的解决方法——用不同的名字,例如task_queue
    boolean durable = true;
    channel.queueDeclare("task_queue", durable, false, false, null);
    这个queue_declare必须在生产者(producer)和消费者(consumer)对应的代码中修改。

    channel.basicPublish("", "task_queue",
    MessageProperties.PERSISTENT_TEXT_PLAIN,
    message.getBytes());
    注意:



    运行以上代码,rabbitmq没有按照我们期望的那样进行分发。比如有两个工作者(workers),处理奇数消息的比较繁忙,处理偶数消息的比较轻松。然而RabbitMQ并不知道这些,它仍然一如既往的派发消息。



    int prefetchCount = 1;
    channel.basicQos(prefetchCount);
    使用消息响应和prefetch_count你就可以搭建起一个工作队列了。这些持久化的选项使得在RabbitMQ重启之后仍然能够恢复。
    发表评论
    用户名: 匿名