当需要处理的任务非常多时,单台机器的处理效率较低。
RabbitMQ 可以将消息(任务)分发给多个消费者。注意,这里 exchange(称为交换机或路由器,exchange 是 RabbitMQ 中非常重要的概念,后面会详细介绍)的类型仍然是 direct(代码中使用的是默认交换机),因此每条消息只会进入一个队列,并且只会被监听该队列的其中一个消费者消费,不会被多个消费者重复消费。
如果有多个消费者同时监听同一个队列,则队列中的消息会在这些消费者之间轮询派发。
生产者,产生消息(任务)
package com.tingcream.www.test.rabbitmq.directTask; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.MessageProperties; /** * direct task * 分布任务给多个消费端处理 * * @author jelly * @date 2017年9月23日 下午9:32:59 */ public class NewTask { private static final String TASK_QUEUE_NAME="direct_task_queue"; public static void main(String[] args) throws Exception{ ConnectionFactory factory=new ConnectionFactory(); factory.setHost("192.168.9.130"); //指定用户 密码 factory.setUsername("zhangsan"); factory.setPassword("123456"); Connection connection=factory.newConnection(); Channel channel=connection.createChannel(); channel.queueDeclare(TASK_QUEUE_NAME,true,false,false,null); //分发信息 for (int i=0;i<10;i++){ String message="Hello RabbitMQ"+i; channel.basicPublish("",TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes()); System.out.println("NewTask send : "+message); } channel.close(); connection.close(); } }
消费者A
package com.tingcream.www.test.rabbitmq.directTask; import java.io.IOException; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; /** * 消费者A */ public class WorkA { private static final String TASK_QUEUE_NAME = "direct_task_queue"; public static void main(String[] args) throws Exception { final ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.9.130"); //指定用户 密码 factory.setUsername("lisi"); factory.setPassword("123456"); Connection connection = factory.newConnection(); final Channel channel = connection.createChannel(); //queue 持久化为true 持久到服务器本地磁盘中 channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); System.out.println("WorkerA Waiting for messages"); //每次从队列获取的数量 channel.basicQos(1); final Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("WorkerA Received :" + message ); try { doWork(message); }catch (Exception e){ channel.abort(); }finally { System.out.println("WorkerA Done"); channel.basicAck(envelope.getDeliveryTag(),false); } } }; boolean autoAck=false; //消息消费完成确认 不自动 ack channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer); } private static void doWork(String task) { try { Thread.sleep(2000); // 暂停2秒钟 } catch (InterruptedException _ignored) { Thread.currentThread().interrupt(); } } }
消费者B
package com.tingcream.www.test.rabbitmq.directTask; import java.io.IOException; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; /** * 消费者B */ public class WorkB { private static final String TASK_QUEUE_NAME = "direct_task_queue"; public static void main(String[] args) throws Exception { final ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.9.130"); //指定用户 密码 factory.setUsername("lisi"); factory.setPassword("123456"); Connection connection = factory.newConnection(); final Channel channel = connection.createChannel(); //queue 持久化为true 持久到服务器本地磁盘中 channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); System.out.println("WorkerB Waiting for messages"); //每次从队列获取的数量 channel.basicQos(1); final Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("WorkerB Received :" + message ); try { doWork(message); }catch (Exception e){ channel.abort(); }finally { System.out.println("WorkerB Done"); channel.basicAck(envelope.getDeliveryTag(),false); } } }; boolean autoAck=false; //消息消费完成确认 不自动 ack channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer); } private static void doWork(String task) { try { Thread.sleep(2000); // 暂停2秒钟 } catch (InterruptedException _ignored) { Thread.currentThread().interrupt(); } } }
依次启动消费者A、消费者B 和生产者,查看控制台输出如下。
生产者:
NewTask send : Hello RabbitMQ0
NewTask send : Hello RabbitMQ1
NewTask send : Hello RabbitMQ2
NewTask send : Hello RabbitMQ3
NewTask send : Hello RabbitMQ4
NewTask send : Hello RabbitMQ5
NewTask send : Hello RabbitMQ6
NewTask send : Hello RabbitMQ7
NewTask send : Hello RabbitMQ8
NewTask send : Hello RabbitMQ9
消费者A:
WorkerA Received :Hello RabbitMQ1
WorkerA Done
WorkerA Received :Hello RabbitMQ3
WorkerA Done
WorkerA Received :Hello RabbitMQ5
WorkerA Done
WorkerA Received :Hello RabbitMQ7
WorkerA Done
WorkerA Received :Hello RabbitMQ9
WorkerA Done
消费者B:
WorkerB Received :Hello RabbitMQ0
WorkerB Done
WorkerB Received :Hello RabbitMQ2
WorkerB Done
WorkerB Received :Hello RabbitMQ4
WorkerB Done
WorkerB Received :Hello RabbitMQ6
WorkerB Done
WorkerB Received :Hello RabbitMQ8
WorkerB Done
Copyright © 叮叮声的奶酪 版权所有
备案号:鄂ICP备17018671号-1