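In "RabbitMQ Study 3: Publish/Subscribe" we learned how to send one message that every consumer receives. This section shows how consumers can receive the producer's messages selectively, according to routing rules. The Routing workflow is shown in the diagram below: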
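The Routing diagram differs from the Publish/Subscribe diagram in two ways:
First: the Exchange type in Publish/Subscribe is "fanout", whereas in Routing it is "direct".
Second: Publish/Subscribe uses the default routing key, whereas Routing uses custom routing keys.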
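The difference between the two exchange types can be sketched with a small in-memory model. This is only an illustration: it does not use the RabbitMQ client library, and the class name `ExchangeTypeSketch`, the method `route`, and the queue names are hypothetical. A "fanout" exchange copies a message to every bound queue regardless of the key, while a "direct" exchange delivers only to queues whose binding key equals the message's routing key.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// In-memory model only; it does not use the RabbitMQ client library.
class ExchangeTypeSketch {

    // bindings maps a queue name to the set of binding keys it was bound with.
    static Set<String> route(String exchangeType,
                             Map<String, Set<String>> bindings,
                             String routingKey) {
        Set<String> targets = new TreeSet<>();
        for (Map.Entry<String, Set<String>> binding : bindings.entrySet()) {
            boolean matches;
            if ("fanout".equals(exchangeType)) {
                matches = true;                                      // the routing key is ignored
            } else if ("direct".equals(exchangeType)) {
                matches = binding.getValue().contains(routingKey);   // exact match required
            } else {
                throw new IllegalArgumentException("unsupported type: " + exchangeType);
            }
            if (matches) {
                targets.add(binding.getKey());
            }
        }
        return targets;
    }

    public static void main(String[] args) {
        Map<String, Set<String>> bindings = new LinkedHashMap<>();
        bindings.put("queue_a", Set.of("error"));
        bindings.put("queue_b", Set.of("info"));

        System.out.println(route("fanout", bindings, "error")); // [queue_a, queue_b]
        System.out.println(route("direct", bindings, "error")); // [queue_a]
        System.out.println(route("direct", bindings, "debug")); // []
    }
}
```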
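From the diagram above you may notice that the Routing model can also be turned into the Publish/Subscribe model, as in this diagram:
We can also send all the data to a single Queue, as shown below: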
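Both variations can be tried out in a small in-memory model of a direct exchange. This sketch assumes that the first variation binds several queues with the same key and that the second binds one queue with every key. The class `DirectBindingSketch`, its methods, and the queue names are hypothetical, and no RabbitMQ client is involved.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// In-memory model only; it does not use the RabbitMQ client library.
class DirectBindingSketch {
    // binding key -> names of the queues bound with that key
    private final Map<String, List<String>> queuesByKey = new HashMap<>();
    // queue name -> messages delivered so far
    private final Map<String, List<String>> delivered = new HashMap<>();

    void bind(String queue, String bindingKey) {
        List<String> queues = queuesByKey.computeIfAbsent(bindingKey, k -> new ArrayList<>());
        if (!queues.contains(queue)) {
            queues.add(queue);           // binding twice has no extra effect in this model
        }
        delivered.computeIfAbsent(queue, q -> new ArrayList<>());
    }

    void publish(String routingKey, String message) {
        // In this model, a message whose key matches no binding is dropped.
        for (String queue : queuesByKey.getOrDefault(routingKey, List.of())) {
            delivered.get(queue).add(message);
        }
    }

    List<String> messagesIn(String queue) {
        return delivered.getOrDefault(queue, List.of());
    }

    public static void main(String[] args) {
        // Case 1: two queues share one binding key, so both receive a copy.
        DirectBindingSketch shared = new DirectBindingSketch();
        shared.bind("q1", "error");
        shared.bind("q2", "error");
        shared.publish("error", "disk full");
        System.out.println(shared.messagesIn("q1") + " " + shared.messagesIn("q2"));
        // prints: [disk full] [disk full]

        // Case 2: one queue bound with every key receives all messages.
        DirectBindingSketch single = new DirectBindingSketch();
        for (String key : List.of("error", "info", "all")) {
            single.bind("q", key);
        }
        single.publish("error", "e0");
        single.publish("info", "i0");
        single.publish("all", "a0");
        System.out.println(single.messagesIn("q"));
        // prints: [e0, i0, a0]
    }
}
```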
Now let's move on to the code.
The program for the P (producer) side is as follows:
package com.abin.rabbitmq;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class EmitLogDirect {
    private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] argv) throws Exception {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // Declare a "direct" exchange: messages are routed by exact routing-key match.
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");

        // Two messages with routing key "error".
        String routingKeyOne = "error";
        for (int i = 0; i <= 1; i++) {
            String messageOne = "this is a error logs:" + i;
            channel.basicPublish(EXCHANGE_NAME, routingKeyOne, null, messageOne
                    .getBytes());
            System.out.println(" [x] Sent '" + routingKeyOne + "':'" + messageOne
                    + "'");
        }

        System.out.println("################################");
        // Three messages with routing key "info".
        String routingKeyTwo = "info";
        for (int i = 0; i <= 2; i++) {
            String messageTwo = "this is a info logs:" + i;
            channel.basicPublish(EXCHANGE_NAME, routingKeyTwo, null, messageTwo
                    .getBytes());
            System.out.println(" [x] Sent '" + routingKeyTwo + "':'" + messageTwo
                    + "'");
        }

        System.out.println("################################");
        // Four messages with routing key "all".
        String routingKeyThree = "all";
        for (int i = 0; i <= 3; i++) {
            String messageThree = "this is a all logs:" + i;
            channel.basicPublish(EXCHANGE_NAME, routingKeyThree, null,
                    messageThree.getBytes());
            System.out.println(" [x] Sent '" + routingKeyThree + "':'"
                    + messageThree + "'");
        }

        channel.close();
        connection.close();
    }
}
The output may look like this:
 [x] Sent 'error':'this is a error logs:0'
 [x] Sent 'error':'this is a error logs:1'
################################
 [x] Sent 'info':'this is a info logs:0'
 [x] Sent 'info':'this is a info logs:1'
 [x] Sent 'info':'this is a info logs:2'
################################
 [x] Sent 'all':'this is a all logs:0'
 [x] Sent 'all':'this is a all logs:1'
 [x] Sent 'all':'this is a all logs:2'
 [x] Sent 'all':'this is a all logs:3'
The code for the C (consumer) side is as follows:
package com.abin.rabbitmq;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

public class ReceiveLogsDirect {
    private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] argv) throws Exception {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "direct");

        // Declare a non-durable, non-exclusive queue that is not auto-deleted.
        String queueName = "queue_logs1";
        channel.queueDeclare(queueName, false, false, false, null);
        // Bind the same queue twice, once per routing key it should receive.
        String routingKeyOne = "error";
        channel.queueBind(queueName, EXCHANGE_NAME, routingKeyOne);
        String routingKeyTwo = "info";
        channel.queueBind(queueName, EXCHANGE_NAME, routingKeyTwo);

        System.out.println(" [*] Waiting for messages.");

        // Note: QueueingConsumer is only available in amqp-client versions before 5.0.
        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(queueName, true, consumer); // true = automatic acknowledgement

        while (true) {
            // Blocks until the next message arrives.
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            String routingKey = delivery.getEnvelope().getRoutingKey();

            System.out.println(" [x] Received '" + routingKey + "':'" + message
                    + "'");
        }
    }
}
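Here I wrote two consumer programs to simulate distributing messages to individual consumers according to routing rules. The second consumer program changes only a small part of the code: it accepts only messages whose routing key is "error" or "all".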
Running program 1 produces the following output:
 [*] Waiting for messages.
 [x] Received 'error':'this is a error logs:0'
 [x] Received 'error':'this is a error logs:1'
 [x] Received 'info':'this is a info logs:0'
 [x] Received 'info':'this is a info logs:1'
 [x] Received 'info':'this is a info logs:2'
Running program 2 produces the following output:
 [*] Waiting for messages.
 [x] Received 'error':'this is a error logs:0'
 [x] Received 'error':'this is a error logs:1'
 [x] Received 'all':'this is a all logs:0'
 [x] Received 'all':'this is a all logs:1'
 [x] Received 'all':'this is a all logs:2'
 [x] Received 'all':'this is a all logs:3'
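To see why each program receives exactly these messages, the run can be replayed in memory. The sketch below is not the author's code and does not talk to a broker. It assumes that both consumers were already running when the producer published, and that the second consumer used its own queue. The source does not give that queue's name, so `queue_logs2` is hypothetical, as are the class `RoutingRunSketch` and the method `replay`.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// In-memory replay of the run above; it does not talk to a RabbitMQ broker.
class RoutingRunSketch {

    // bindings maps a queue name to the binding keys that queue was bound with.
    static Map<String, List<String>> replay(Map<String, Set<String>> bindings) {
        Map<String, List<String>> received = new LinkedHashMap<>();
        for (String queue : bindings.keySet()) {
            received.put(queue, new ArrayList<>());
        }

        // Same keys and message counts as EmitLogDirect: 2 x error, 3 x info, 4 x all.
        String[] keys = {"error", "info", "all"};
        int[] counts = {2, 3, 4};
        for (int k = 0; k < keys.length; k++) {
            for (int i = 0; i < counts[k]; i++) {
                String message = "this is a " + keys[k] + " logs:" + i;
                // Direct routing: deliver to every queue bound with exactly this key.
                for (Map.Entry<String, Set<String>> binding : bindings.entrySet()) {
                    if (binding.getValue().contains(keys[k])) {
                        received.get(binding.getKey())
                                .add("'" + keys[k] + "':'" + message + "'");
                    }
                }
            }
        }
        return received;
    }

    public static void main(String[] args) {
        Map<String, Set<String>> bindings = new LinkedHashMap<>();
        bindings.put("queue_logs1", Set.of("error", "info"));
        bindings.put("queue_logs2", Set.of("error", "all")); // hypothetical queue name

        replay(bindings).forEach((queue, messages) -> {
            System.out.println(queue + " received " + messages.size() + " messages");
            messages.forEach(m -> System.out.println(" [x] Received " + m));
        });
    }
}
```

Under these assumptions the first queue ends up with five messages (two "error", three "info") and the second with six (two "error", four "all"), which matches the two outputs above.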