博客详情

rabbitmq高级消息队列-fanout广播队列 (原创)

作者: 朝如青丝暮成雪
发布时间:2017-09-24 10:03:55  文章分类:rabbitMQ   阅读(2393)  评论(0)

前面介绍的几篇实际都是使用direct 交换机。 使用direct交换机发布的队列往往都涉及路由key选择或绑定(也可以直接监听某队列)的问题。下面我们介绍rabbitmq高级消息队列中另一种也是较为常见交换机 fanout ,这种类型的交换机是广播交换机,类似于IP 网段中的广播地址,只要是发往这个子网广播地址的消息都会被处于这个子网的客户端接收。由于采用的广播队列,因而生产者和消费者都不再关心routingkey, 他们只关心queue和exchange 即可。 (direct 消息队列中生产者往往关心 exchange 和routingkey ,消费者往往关心queue和routingkey,而fanout消息队列中它们均不关心routingkey )

日志生产者

package com.tingcream.www.test.rabbitmq.fanout;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * fanout  广播消息,
 * 无需指定routingkey  ,全部广播出去

 */
public class LogSender {

	 private static final String EXCHANGE_NAME = "exchange-logs";
	  
	    public static void main(String[] args) throws Exception {
	        ConnectionFactory factory=new ConnectionFactory();

	        factory.setHost("192.168.9.130");    
	        //指定用户 密码  
	        factory.setUsername("zhangsan");  
	        factory.setPassword("123456"); 
	        
	        Connection connection=factory.newConnection();
	        Channel channel=connection.createChannel();
//fanout表示扇出,所有的消费者得到同样的队列信息
	        channel.exchangeDeclare(EXCHANGE_NAME,"fanout");

	        
	        for (int i=0;i<10;i++){
	            String message="你好 World "+i;
	             channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes());
	             System.out.println("LogSender Send :" + message );
	        }
	        channel.close();
	        connection.close();
	    }
}


消费者A


package com.tingcream.www.test.rabbitmq.fanout;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

/**
 *  消费者A
 * fanout  交换机
 * @author jelly
 */
public class LogReceiverA {
	 private static final String EXCHANGE_NAME = "exchange-logs";

    public static void main(String[] args) throws  Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.9.130");    
        //指定用户 密码  
        factory.setUsername("lisi");  
        factory.setPassword("123456");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

        //产生一个随机的队列名称
        String queue = channel.queueDeclare().getQueue();
        // channel.exchangeBind(destination, source, routingKey, arguments)
        //  channel.queueBind(queue, exchange, routingKey, arguments)
        channel.queueBind(queue, EXCHANGE_NAME, "");//客户端绑定的是队列对应的交互机,而不是路由key

System.out.println("LogReceiverA Waiting for messages");
       QueueingConsumer consumer = new QueueingConsumer(channel); 
       channel.basicConsume(queue, false, consumer);
        while (true)    {    
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();    
            String message = new String(delivery.getBody());    
            System.out.println( "LogReceiverA接收到消息:" + message );    
            //发送应答     ack应答
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);    
        }    
        }
}



消费者B


package com.tingcream.www.test.rabbitmq.fanout;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

/**
 * 日志消费者B
 * @author jelly
 */
public class LogReceiverB {
	 private static final String EXCHANGE_NAME = "exchange-logs";

    public static void main(String[] args) throws  Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.9.130");    
        //指定用户 密码  
        factory.setUsername("lisi");  
        factory.setPassword("123456");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

        //产生一个随机的队列名称  queue 名称由服务器端自动生成的
        String queue = channel.queueDeclare().getQueue();
        // channel.exchangeBind(destination, source, routingKey, arguments)
        //  channel.queueBind(queue, exchange, routingKey, arguments)
        channel.queueBind(queue, EXCHANGE_NAME, "");//客户端绑定的是队列对应的交互机,而不是路由key
     
        System.out.println("LogReceiverB Waiting for messages");
        
        QueueingConsumer consumer = new QueueingConsumer(channel);  
        channel.basicConsume(queue, false, consumer);
        while (true)    {    
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();    
               
            String message = new String(delivery.getBody());    
    
            System.out.println( "LogReceiverB接收到消息:" + message );    
           
            //发送应答     ack应答
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);    
         }    
      }
}


分别启动消费者A、消费者B 和生产者 ,查看控制台输出

生产者

LogSender Send :你好 World 0
LogSender Send :你好 World 1
LogSender Send :你好 World 2
LogSender Send :你好 World 3
LogSender Send :你好 World 4
LogSender Send :你好 World 5
LogSender Send :你好 World 6
LogSender Send :你好 World 7
LogSender Send :你好 World 8
LogSender Send :你好 World 9

消费者A

LogReceiverA Waiting for messages
LogReceiverA接收到消息:你好 World 0
LogReceiverA接收到消息:你好 World 1
LogReceiverA接收到消息:你好 World 2
LogReceiverA接收到消息:你好 World 3
LogReceiverA接收到消息:你好 World 4
LogReceiverA接收到消息:你好 World 5
LogReceiverA接收到消息:你好 World 6
LogReceiverA接收到消息:你好 World 7
LogReceiverA接收到消息:你好 World 8
LogReceiverA接收到消息:你好 World 9

消费者B

LogReceiverB Waiting for messages
LogReceiverB接收到消息:你好 World 0
LogReceiverB接收到消息:你好 World 1
LogReceiverB接收到消息:你好 World 2
LogReceiverB接收到消息:你好 World 3
LogReceiverB接收到消息:你好 World 4
LogReceiverB接收到消息:你好 World 5
LogReceiverB接收到消息:你好 World 6
LogReceiverB接收到消息:你好 World 7
LogReceiverB接收到消息:你好 World 8
LogReceiverB接收到消息:你好 World 9


进入rabbitmq管理网页管理控制台 。

fanout 交换机



刚刚产生的2个消费者queue



关键字:  rabbitmq
评论信息
暂无评论
发表评论

亲,您还没有登陆,暂不能评论哦! 去 登陆 | 注册

博主信息
   
数据加载中,请稍候...
文章分类
   
数据加载中,请稍候...
阅读排行
 
数据加载中,请稍候...
评论排行
 
数据加载中,请稍候...

Copyright © 叮叮声的奶酪 版权所有
备案号:鄂ICP备17018671号-1

鄂公网安备 42011102000739号