博客详情

rabbitmq高级消息队列-direct task (原创)

作者: 朝如青丝暮成雪
发布时间:2017-09-24 09:12:47  文章分类:rabbitMQ   阅读(944)  评论(0)

当要处理的任务非常繁忙时,单个机器的处理效率较低。

rabbitmq可以将 消息 (任务),发布给多个消费者。注意,这里exchange(称为交换机或路由器,exchange是rabbitmq中非常重要的概念,后面会详细介绍)类型仍然是direct,这说明同一时刻仍只有一个消费者在消费队列。

如有消费者有多个,同时监听了同一个队列。则消息队列会在多个消费者中轮询派发。


生产者,产生消息(任务)


package com.tingcream.www.test.rabbitmq.directTask; import com.rabbitmq.client.Channel;  
import com.rabbitmq.client.Connection;  
import com.rabbitmq.client.ConnectionFactory;  
import com.rabbitmq.client.MessageProperties;  

/**
 * direct task 
 * 分布任务给多个消费端处理
 * 
 * @author jelly
 * @date 2017年9月23日 下午9:32:59
 */
public class NewTask {
    private static final String TASK_QUEUE_NAME="direct_task_queue";
    public static void main(String[] args) throws Exception{
        ConnectionFactory factory=new ConnectionFactory();
        factory.setHost("192.168.9.130");    
        //指定用户 密码  
        factory.setUsername("zhangsan");  
        factory.setPassword("123456");  
     
        Connection connection=factory.newConnection();
        Channel channel=connection.createChannel();

        channel.queueDeclare(TASK_QUEUE_NAME,true,false,false,null);
        //分发信息
        for (int i=0;i<10;i++){ String message="Hello RabbitMQ"+i; channel.basicPublish("",TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes()); System.out.println("NewTask send : "+message); } channel.close(); connection.close(); } }



消费者A


package com.tingcream.www.test.rabbitmq.directTask;

import java.io.IOException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

/**
 * 消费者A
 */
public class WorkA {
    private static final String TASK_QUEUE_NAME = "direct_task_queue";

    public static void main(String[] args) throws Exception {
        final ConnectionFactory factory = new ConnectionFactory();
        
        factory.setHost("192.168.9.130");    
        //指定用户 密码  
        factory.setUsername("lisi");  
        factory.setPassword("123456"); 
        
        
        Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();
         //queue 持久化为true  持久到服务器本地磁盘中
        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
        System.out.println("WorkerA  Waiting for messages");

        //每次从队列获取的数量
        channel.basicQos(1);
    
        final Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("WorkerA  Received :" + message );
                try {
                    
                    doWork(message);
                }catch (Exception e){
                    channel.abort();
                }finally {
                    System.out.println("WorkerA Done");
                    channel.basicAck(envelope.getDeliveryTag(),false);
                }
            }
        };
        boolean autoAck=false;
        
        
        //消息消费完成确认   不自动 ack
        channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);
    }
    private static void doWork(String task) {
        try {
            Thread.sleep(2000); // 暂停2秒钟
        } catch (InterruptedException _ignored) {
            Thread.currentThread().interrupt();
        }
    }
}



消费者B


package com.tingcream.www.test.rabbitmq.directTask;

import java.io.IOException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

/**
 * 消费者B
 */
public class WorkB {
    private static final String TASK_QUEUE_NAME = "direct_task_queue";

    public static void main(String[] args) throws Exception {
        final ConnectionFactory factory = new ConnectionFactory();
        
        factory.setHost("192.168.9.130");    
        //指定用户 密码  
        factory.setUsername("lisi");  
        factory.setPassword("123456"); 
        
        
        Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();
         //queue 持久化为true  持久到服务器本地磁盘中
        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
        System.out.println("WorkerB  Waiting for messages");

        //每次从队列获取的数量
        channel.basicQos(1);

        final Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("WorkerB  Received :" + message );
                try {
                    
                    doWork(message);
                }catch (Exception e){
                    channel.abort();
                }finally {
                    System.out.println("WorkerB Done");
                    channel.basicAck(envelope.getDeliveryTag(),false);
                }
            }
        };
        boolean autoAck=false;
        
        
        //消息消费完成确认     不自动 ack
        channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);
    }
    private static void doWork(String task) {
        try {
            Thread.sleep(2000); // 暂停2秒钟
        } catch (InterruptedException _ignored) {
            Thread.currentThread().interrupt();
        }
    }
}



依次启动消费者A、消费者B 和生产者,查看控制台输出如下。

生产者:

NewTask send : Hello RabbitMQ0
NewTask send : Hello RabbitMQ1
NewTask send : Hello RabbitMQ2
NewTask send : Hello RabbitMQ3
NewTask send : Hello RabbitMQ4
NewTask send : Hello RabbitMQ5
NewTask send : Hello RabbitMQ6
NewTask send : Hello RabbitMQ7
NewTask send : Hello RabbitMQ8
NewTask send : Hello RabbitMQ9



消费者A:

WorkerA  Received :Hello RabbitMQ1
WorkerA Done
WorkerA  Received :Hello RabbitMQ3
WorkerA Done
WorkerA  Received :Hello RabbitMQ5
WorkerA Done
WorkerA  Received :Hello RabbitMQ7
WorkerA Done
WorkerA  Received :Hello RabbitMQ9
WorkerA Done


消费者B:


WorkerB  Received :Hello RabbitMQ0
WorkerB Done
WorkerB  Received :Hello RabbitMQ2
WorkerB Done
WorkerB  Received :Hello RabbitMQ4
WorkerB Done
WorkerB  Received :Hello RabbitMQ6
WorkerB Done
WorkerB  Received :Hello RabbitMQ8
WorkerB Done







关键字:  rabbitmq
评论信息
暂无评论
发表评论

亲,您还没有登陆,暂不能评论哦! 去 登陆 | 注册

博主信息
   
数据加载中,请稍候...
文章分类
   
数据加载中,请稍候...
阅读排行
 
数据加载中,请稍候...
评论排行
 
数据加载中,请稍候...

Copyright © 叮叮声的奶酪 版权所有
备案号:鄂ICP备17018671号-1

鄂公网安备 42011102000739号