RabbitMQ:四种ExChange用法_JAVA_编程开发_程序员俱乐部

中国优秀的程序员网站程序员频道CXYCLUB技术地图
热搜:
更多>>
 
您所在的位置: 程序员俱乐部 > 编程开发 > JAVA > RabbitMQ:四种ExChange用法

RabbitMQ:四种ExChange用法

 2017/8/22 19:08:46  骑猪逛街666  程序员俱乐部  我要评论(0)
  • 摘要:摘要:RabbitMQ发送消息时,都是先把消息发送给ExChange(交换机),然后再分发给有相应RoutingKey(路由)关系的Queue(队列)。ExChange和Queue之前是多对多的关系。RabbitMQ3.0之后创建ExChange时,有四种类型可选“fanout、direct、topic、headers”。RabbitMQ发送消息时,都是先把消息发送给ExChange(交换机),然后再分发给有相应RoutingKey(路由)关系的Queue(队列)
  • 标签:用法 Exchange

class="blog-summary" style="margin-top: 30px; margin-bottom: 10px; padding: 16px 30px; font-size: 16px; line-height: 32px; color: #333333; background: #f9f9f9; table-layout: fixed; overflow: hidden;">摘要:?RabbitMQ发送消息时,都是先把消息发送给ExChange(交换机),然后再分发给有相应RoutingKey(路由)关系的Queue(队列)。ExChange和Queue之前是多对多的关系。RabbitMQ 3.0之后创建ExChange时,有四种类型可选“fanout、direct、topic、headers”。

RabbitMQ发送消息时,都是先把消息发送给ExChange(交换机),然后再分发给有相应RoutingKey(路由)关系的Queue(队列)。 ExChange和Queue之前是多对多的关系。 RabbitMQ 3.0之后创建ExChange时,有四种类型可选“fanout、direct、topic、headers”。 ? 一、fanout 当向一个fanout发送一个消息时,RoutingKey的设置不起作用。 消息会被发送给同一个交换机下的所有队列,每个队列接收到的消息是一样的; 一个队列内有所有消费者(包含那些并没有相应RoutingKey的消费者),将平分队列接收到的消息 ? ----------------消息生产者---------------- ConnectionFactory factory = new ConnectionFactory(); factory.setHost(S_RabbitMQ.QUEUE_IP);// MQ主机 factory.setPort(S_RabbitMQ.QUEUE_PORT);// MQ端口 factory.setUsername(S_RabbitMQ.QUEUE_USER);// MQ用户名 factory.setPassword(S_RabbitMQ.QUEUE_PWD);// MQ密码 ? Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); ? // 声明路由名字和类型 channel.exchangeDeclare(EXCHANGE_NAME, "fanout", true, false, null); String message = "hello world! "; ? for(int i=0;i<100;i++) { channel.basicPublish(EXCHANGE_NAME, "", null, (message+i).getBytes()); } ? System.out.println("Sent msg finish"); ? channel.close(); connection.close();
----------------消息消费者---------------- ConnectionFactory factory = new ConnectionFactory(); ? factory.setHost(S_RabbitMQ.QUEUE_IP);// MQ主机 factory.setPort(S_RabbitMQ.QUEUE_PORT);// MQ端口 factory.setUsername(S_RabbitMQ.QUEUE_USER);// MQ用户名 factory.setPassword(S_RabbitMQ.QUEUE_PWD);// MQ密码 ? Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); ? //声明路由名字和类型 channel.exchangeDeclare(EXCHANGE_NAME, "fanout", true, false, null); //声明队列 channel.queueDeclare(QUEUE_NAME, true, false, false, null); //绑定路由和队列 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "routkey2", null); ? System.out.println(" Waiting for msg...."); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, ?AMQP.BasicProperties properties, byte[] body) { String message = ""; try { message = new String(body, "UTF-8"); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } catch (Throwable ex) { ex.printStackTrace(); } ? System.out.println("Received msg='" + message + "'"); } }; channel.basicConsume(QUEUE_NAME, true, consumer); ? 二、direct 当向一个direct发送一个消息时,消息会被发送给同一个交换机下的拥有相应RoutingKey的队列,每个队列接收到的消息是一样的; 一个队列内拥有相应RoutingKey的消费者,将平分队列接收到的消息。 ? ----------------消息生产者---------------- ConnectionFactory factory = new ConnectionFactory(); ? factory.setHost(S_RabbitMQ.QUEUE_IP);// MQ主机 factory.setPort(S_RabbitMQ.QUEUE_PORT);// MQ端口 factory.setUsername(S_RabbitMQ.QUEUE_USER);// MQ用户名 factory.setPassword(S_RabbitMQ.QUEUE_PWD);// MQ密码 ? Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); ? // 声明路由名字和类型 channel.exchangeDeclare(EXCHANGE_NAME, "direct", true, false, null); String message = "hello world! "; ? for(int i=0;i<100;i++) { channel.basicPublish(EXCHANGE_NAME, "routingkey1", null, (message+i).getBytes()); } ? System.out.println("Sent msg is '" + message + "'"); ? channel.close(); connection.close();
----------------消息消费者---------------- ConnectionFactory factory = new ConnectionFactory(); ? factory.setHost(S_RabbitMQ.QUEUE_IP);// MQ主机 factory.setPort(S_RabbitMQ.QUEUE_PORT);// MQ端口 factory.setUsername(S_RabbitMQ.QUEUE_USER);// MQ用户名 factory.setPassword(S_RabbitMQ.QUEUE_PWD);// MQ密码 ? Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); ? //声明路由名字和类型 channel.exchangeDeclare(EXCHANGE_NAME, "direct", true, false, null); //声明队列 channel.queueDeclare(QUEUE_NAME, true, false, false, null); //绑定路由和队列 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "routingkey1", null); ? System.out.println(" Waiting for msg...."); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) { String message = ""; try { message = new String(body, "UTF-8"); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } catch (Throwable ex) { ex.printStackTrace(); } ? System.out.println("1 Received msg='" + message + "'"); } }; ? channel.basicConsume(QUEUE_NAME, true, consumer); ? 三、topic 当向一个topic发送一个消息时消息会被发送给同一个交换机下的拥有相应RoutingKey的队列,每个队列接收到的消息是一样的; 一个队列内有所有消费者(包含那些并没有相应RoutingKey的消费者),将平分队列接收到的消息 ? ----------------消息生产者---------------- ConnectionFactory factory = new ConnectionFactory(); ? factory.setHost(S_RabbitMQ.QUEUE_IP);// MQ主机 factory.setPort(S_RabbitMQ.QUEUE_PORT);// MQ端口 factory.setUsername(S_RabbitMQ.QUEUE_USER);// MQ用户名 factory.setPassword(S_RabbitMQ.QUEUE_PWD);// MQ密码 ? Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); ? // 声明路由名字和类型 channel.exchangeDeclare(EXCHANGE_NAME, "topic", true, false, null); String message = "hello world! "; ? // int i=101; for (int i = 0; i < 100; i++) { channel.basicPublish(EXCHANGE_NAME, "routingkey1", null, (message + i).getBytes()); } ? System.out.println("Sent msg is '" + message + "'"); ? channel.close(); connection.close(); ? ----------------消息消费者---------------- ConnectionFactory factory = new ConnectionFactory(); ? factory.setHost(S_RabbitMQ.QUEUE_IP);// MQ主机 factory.setPort(S_RabbitMQ.QUEUE_PORT);// MQ端口 factory.setUsername(S_RabbitMQ.QUEUE_USER);// MQ用户名 factory.setPassword(S_RabbitMQ.QUEUE_PWD);// MQ密码 ? Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); ? // 声明路由名字和类型 channel.exchangeDeclare(EXCHANGE_NAME, "topic", true, false, null); //声明队列 channel.queueDeclare(QUEUE_NAME, true, false, false, null); //绑定路由和队列// 把队列绑定到路由上并指定headers channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "routingkey1", null); ? System.out.println("1 Waiting for msg...."); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,?byte[] body) { String message = ""; try { message = new String(body, "UTF-8"); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } catch (Throwable ex) { ex.printStackTrace(); } ? System.out.println("1 Received msg='" + message + "'"); } }; channel.basicConsume(QUEUE_NAME, true, consumer);

?

发表评论
用户名: 匿名