前面介绍的几篇实际都是使用direct 交换机。 使用direct交换机发布的队列往往都涉及路由key选择或绑定(也可以直接监听某队列)的问题。下面我们介绍rabbitmq高级消息队列中另一种也是较为常见交换机 fanout ,这种类型的交换机是广播交换机,类似于IP 网段中的广播地址,只要是发往这个子网广播地址的消息都会被处于这个子网的客户端接收。由于采用的广播队列,因而生产者和消费者都不再关心routingkey, 他们只关心queue和exchange 即可。 (direct 消息队列中生产者往往关心 exchange 和routingkey ,消费者往往关心queue和routingkey,而fanout消息队列中它们均不关心routingkey )
日志生产者
package com.tingcream.www.test.rabbitmq.fanout; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; /** * fanout 广播消息, * 无需指定routingkey ,全部广播出去 */ public class LogSender { private static final String EXCHANGE_NAME = "exchange-logs"; public static void main(String[] args) throws Exception { ConnectionFactory factory=new ConnectionFactory(); factory.setHost("192.168.9.130"); //指定用户 密码 factory.setUsername("zhangsan"); factory.setPassword("123456"); Connection connection=factory.newConnection(); Channel channel=connection.createChannel(); //fanout表示扇出,所有的消费者得到同样的队列信息 channel.exchangeDeclare(EXCHANGE_NAME,"fanout"); for (int i=0;i<10;i++){ String message="你好 World "+i; channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes()); System.out.println("LogSender Send :" + message ); } channel.close(); connection.close(); } }
消费者A
package com.tingcream.www.test.rabbitmq.fanout; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.QueueingConsumer; /** * 消费者A * fanout 交换机 * @author jelly */ public class LogReceiverA { private static final String EXCHANGE_NAME = "exchange-logs"; public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.9.130"); //指定用户 密码 factory.setUsername("lisi"); factory.setPassword("123456"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); //产生一个随机的队列名称 String queue = channel.queueDeclare().getQueue(); // channel.exchangeBind(destination, source, routingKey, arguments) // channel.queueBind(queue, exchange, routingKey, arguments) channel.queueBind(queue, EXCHANGE_NAME, "");//客户端绑定的是队列对应的交互机,而不是路由key System.out.println("LogReceiverA Waiting for messages"); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(queue, false, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println( "LogReceiverA接收到消息:" + message ); //发送应答 ack应答 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } }
消费者B
package com.tingcream.www.test.rabbitmq.fanout; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.QueueingConsumer; /** * 日志消费者B * @author jelly */ public class LogReceiverB { private static final String EXCHANGE_NAME = "exchange-logs"; public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.9.130"); //指定用户 密码 factory.setUsername("lisi"); factory.setPassword("123456"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); //产生一个随机的队列名称 queue 名称由服务器端自动生成的 String queue = channel.queueDeclare().getQueue(); // channel.exchangeBind(destination, source, routingKey, arguments) // channel.queueBind(queue, exchange, routingKey, arguments) channel.queueBind(queue, EXCHANGE_NAME, "");//客户端绑定的是队列对应的交互机,而不是路由key System.out.println("LogReceiverB Waiting for messages"); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(queue, false, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println( "LogReceiverB接收到消息:" + message ); //发送应答 ack应答 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } }
分别启动消费者A、消费者B 和生产者 ,查看控制台输出
生产者
LogSender Send :你好 World 0
LogSender Send :你好 World 1
LogSender Send :你好 World 2
LogSender Send :你好 World 3
LogSender Send :你好 World 4
LogSender Send :你好 World 5
LogSender Send :你好 World 6
LogSender Send :你好 World 7
LogSender Send :你好 World 8
LogSender Send :你好 World 9
消费者A
LogReceiverA Waiting for messages
LogReceiverA接收到消息:你好 World 0
LogReceiverA接收到消息:你好 World 1
LogReceiverA接收到消息:你好 World 2
LogReceiverA接收到消息:你好 World 3
LogReceiverA接收到消息:你好 World 4
LogReceiverA接收到消息:你好 World 5
LogReceiverA接收到消息:你好 World 6
LogReceiverA接收到消息:你好 World 7
LogReceiverA接收到消息:你好 World 8
LogReceiverA接收到消息:你好 World 9
消费者B
LogReceiverB Waiting for messages
LogReceiverB接收到消息:你好 World 0
LogReceiverB接收到消息:你好 World 1
LogReceiverB接收到消息:你好 World 2
LogReceiverB接收到消息:你好 World 3
LogReceiverB接收到消息:你好 World 4
LogReceiverB接收到消息:你好 World 5
LogReceiverB接收到消息:你好 World 6
LogReceiverB接收到消息:你好 World 7
LogReceiverB接收到消息:你好 World 8
LogReceiverB接收到消息:你好 World 9
进入rabbitmq管理网页管理控制台 。
fanout 交换机
刚刚产生的2个消费者queue
Copyright © 叮叮声的奶酪 版权所有
备案号:鄂ICP备17018671号-1