rabbitmq学习4:Routing_JAVA_编程开发_程序员俱乐部

中国优秀的程序员网站程序员频道CXYCLUB技术地图
热搜:
更多>>
 
您所在的位置: 程序员俱乐部 > 编程开发 > JAVA > rabbitmq学习4:Routing

rabbitmq学习4:Routing

 2013/12/10 12:26:08  yimeng528  程序员俱乐部  我要评论(0)
  • 摘要:在《rabbitmq学习3:Publish/Subscribe》中已经学习了发送一个消息,所有消费者端都能收到。那现在这节准备介绍通过路由规则来接受生产者端所发送的消费。Routing的工作示意图如下:对于Routing的示意图与Publish/Subscribe中的示意图区别:第一:Publish/Subscribe的Exchange的类型为“fanout”,而Routing的类型为“direct”第二:Publish/Subscribe的路由为默认的,而Routing的路由是自定义的
  • 标签:学习

? 在《rabbitmq学习3:Publish/Subscribe?》中已经学习了发送一个消息,所有消费者端都能收到。那现在这节准备介绍通过路由规则来接受生产者端所发送的消费。Routing的工作示意图如下:


对于Routing的示意图与Publish/Subscribe中的示意图区别:

第一:Publish/Subscribe的Exchange的类型为“fanout”,而Routing的类型为“direct”

第二:Publish/Subscribe的路由为默认的,而Routing的路由是自定义的。

可能从上图的示意图如可以发现可以把Routing的模式也可以转化Publish/Subscribe的模式,如示意图


我们也可能把所有的数据发送到一个Queue中去,示意图如下:


下面我们就开始程序吧。

P端的程序如下:

Java代码??收藏代码class="star" src="/Upload/Images/2013121012/40B102E0EF997EA6.png">
  1. package?com.abin.rabbitmq;??
  2. ??
  3. import?com.rabbitmq.client.Channel;??
  4. import?com.rabbitmq.client.Connection;??
  5. import?com.rabbitmq.client.ConnectionFactory;??
  6. ??
  7. public?class?EmitLogDirect?{??
  8. ????private?static?final?String?EXCHANGE_NAME?=?"direct_logs";??
  9. ??
  10. ????public?static?void?main(String[]?argv)?throws?Exception?{??
  11. ??
  12. ????????ConnectionFactory?factory?=?new?ConnectionFactory();??
  13. ????????factory.setHost("localhost");??
  14. ????????Connection?connection?=?factory.newConnection();??
  15. ????????Channel?channel?=?connection.createChannel();??
  16. ??
  17. ????????channel.exchangeDeclare(EXCHANGE_NAME,?"direct");//rounting模式??
  18. ??
  19. ????????String?routingKeyOne?=?"error";//定义一个路由名为“error”??
  20. ????????for?(int?i?=?0;?i?<=?1;?i++)?{??
  21. ????????????String?messageOne?=?"this?is?a?error?logs:"?+?i;??
  22. ????????????channel.basicPublish(EXCHANGE_NAME,?routingKeyOne,?null,?messageOne??
  23. ????????????????????.getBytes());??
  24. ????????????System.out.println("?[x]?Sent?'"?+?routingKeyOne?+?"':'"?+?messageOne??
  25. ????????????????????+?"'");??
  26. ????????}??
  27. ??
  28. ????????System.out.println("################################");??
  29. ????????String?routingKeyTwo?=?"info";??
  30. ????????for?(int?i?=?0;?i?<=?2;?i++)?{??
  31. ????????????String?messageTwo?=?"this?is?a?info?logs:"?+?i;??
  32. ????????????channel.basicPublish(EXCHANGE_NAME,?routingKeyTwo,?null,?messageTwo??
  33. ????????????????????.getBytes());??
  34. ????????????System.out.println("?[x]?Sent?'"?+?routingKeyTwo?+?"':'"?+?messageTwo??
  35. ????????????????????+?"'");??
  36. ????????}??
  37. ??
  38. ????????System.out.println("################################");??
  39. ????????String?routingKeyThree?=?"all";??
  40. ????????for?(int?i?=?0;?i?<=?3;?i++)?{??
  41. ????????????String?messageThree?=?"this?is?a?all?logs:"?+?i;??
  42. ????????????channel.basicPublish(EXCHANGE_NAME,?routingKeyThree,?null,??
  43. ????????????????????messageThree.getBytes());??
  44. ????????????System.out.println("?[x]?Sent?'"?+?routingKeyThree?+?"':'"??
  45. ????????????????????+?messageThree?+?"'");??
  46. ????????}??
  47. ??
  48. ????????channel.close();??
  49. ????????connection.close();??
  50. ????}??
  51. }??

?

运行结果可能如下:

Java代码??收藏代码
  1. ?[x]?Sent?'error':'this?is?a?error?logs:0'??
  2. ?[x]?Sent?'error':'this?is?a?error?logs:1'??
  3. ################################??
  4. ?[x]?Sent?'info':'this?is?a?info?logs:0'??
  5. ?[x]?Sent?'info':'this?is?a?info?logs:1'??
  6. ?[x]?Sent?'info':'this?is?a?info?logs:2'??
  7. ################################??
  8. ?[x]?Sent?'all':'this?is?a?all?logs:0'??
  9. ?[x]?Sent?'all':'this?is?a?all?logs:1'??
  10. ?[x]?Sent?'all':'this?is?a?all?logs:2'??
  11. ?[x]?Sent?'all':'this?is?a?all?logs:3'??

?

C端的代码如下:

Java代码??收藏代码
  1. package?com.abin.rabbitmq;??
  2. ??
  3. import?com.rabbitmq.client.Channel;??
  4. import?com.rabbitmq.client.Connection;??
  5. import?com.rabbitmq.client.ConnectionFactory;??
  6. import?com.rabbitmq.client.QueueingConsumer;??
  7. ??
  8. public?class?ReceiveLogsDirect?{??
  9. ????private?static?final?String?EXCHANGE_NAME?=?"direct_logs";//定义Exchange名称??
  10. ??
  11. ????public?static?void?main(String[]?argv)?throws?Exception?{??
  12. ??
  13. ????????ConnectionFactory?factory?=?new?ConnectionFactory();??
  14. ????????factory.setHost("localhost");??
  15. ????????Connection?connection?=?factory.newConnection();??
  16. ????????Channel?channel?=?connection.createChannel();??
  17. ??
  18. ????????channel.exchangeDeclare(EXCHANGE_NAME,?"direct");//声明Exchange??
  19. ??
  20. ????????String?queueName?=?"queue_logs1";//定义队列名为“queue_logs1”的Queue??
  21. ????????channel.queueDeclare(queueName,?false,?false,?false,?null);??
  22. ????????String?routingKeyOne?=?"error";//"error"路由规则??
  23. ????????channel.queueBind(queueName,?EXCHANGE_NAME,?routingKeyOne);//把Queue、Exchange及路由绑定??
  24. ????????String?routingKeyTwo?=?"info";??
  25. ????????channel.queueBind(queueName,?EXCHANGE_NAME,?routingKeyTwo);??
  26. ??
  27. ????????System.out.println("?[*]?Waiting?for?messages.");??
  28. ??
  29. ????????QueueingConsumer?consumer?=?new?QueueingConsumer(channel);??
  30. ????????channel.basicConsume(queueName,?true,?consumer);??
  31. ??
  32. ????????while?(true)?{??
  33. ????????????QueueingConsumer.Delivery?delivery?=?consumer.nextDelivery();??
  34. ????????????String?message?=?new?String(delivery.getBody());??
  35. ????????????String?routingKey?=?delivery.getEnvelope().getRoutingKey();??
  36. ??
  37. ????????????System.out.println("?[x]?Received?'"?+?routingKey?+?"':'"?+?message??
  38. ????????????????????+?"'");??
  39. ????????}??
  40. ????}??
  41. }??

?这里我做了二个消费端程序来模仿通过路由规则来分配信息给各个消费端。第二个消费者端的程序只是修改了一小部分代码;只接受路由为”error“和”all“规则的消费。

运行程序1的结果如下:

Java代码??收藏代码
  1. [*]?Waiting?for?messages.??
  2. ?[x]?Received?'error':'this?is?a?error?logs:0'??
  3. ?[x]?Received?'error':'this?is?a?error?logs:1'??
  4. ?[x]?Received?'info':'this?is?a?info?logs:0'??
  5. ?[x]?Received?'info':'this?is?a?info?logs:1'??
  6. ?[x]?Received?'info':'this?is?a?info?logs:2'??

?运行程序2的运行结果如下:

Java代码??收藏代码
  1. [*]?Waiting?for?messages.??
  2. [x]?Received?'error':'this?is?a?error?logs:0'??
  3. [x]?Received?'error':'this?is?a?error?logs:1'??
  4. [x]?Received?'all':'this?is?a?all?logs:0'??
  5. [x]?Received?'all':'this?is?a?all?logs:1'??
  6. [x]?Received?'all':'this?is?a?all?logs:2'??
  7. [x]?Received?'all':'this?is?a?all?logs:3'??

?

?

?

  • 大小: 11.7 KB
  • 大小: 9.6 KB
  • 大小: 10.7 KB
发表评论
用户名: 匿名