rabbitmq是一款优秀的,基于AMQP(高级消息队列协议)的实现 ,企业级的消息中间件,同时也支持mqtt协议(需要启用插件) 。
AMQP: Advanced Message Queuing Protocol 高级消息队列协议 ,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。Erlang实现有RabbitMQ等。
MQTT: Message Queuing Telemetry Transport,消息队列遥测传输,是IBM开发的一个即时通讯协议,有可能成为物联网的重要组成部分。该协议支持所有平台,几乎可以把所有联网物品和外部连接起来,被用来当做传感器和制动器(比如通过Twitter让房屋联网)的通信协议。
其他消息中间件产品 ,有 Kafka、 alibaba RocketMQ 已捐献给apache开源软件基金会 、apache activeMQ 、apollo 等。
开始学习rabbitmq 前,我们先启动rabbitmq 服务器。
rabbitmq AMQP helloword 例子。
消息生产者
package com.tingcream.www.test.rabbitmq.helloworld; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; /** * rabbitmq * AMQP 消息生产者 * * direct */ public class Producer { public final static String QUEUE_NAME="rabbitMQ-helloworld"; public static void main(String[] args) throws Exception { /** * 创建连接连接到MabbitMQ */ ConnectionFactory factory = new ConnectionFactory(); //设置MabbitMQ所在主机ip或者主机名 factory.setHost("192.168.9.130"); //指定用户 密码 factory.setUsername("zhangsan"); factory.setPassword("123456"); //指定端口 factory.setPort(AMQP.PROTOCOL.PORT); //创建一个连接 Connection connection = factory.newConnection(); //创建一个频道 Channel channel = connection.createChannel(); //指定一个队列 /* queueDeclare 第一个参数:队列名称、 第二个参数: 是否持久化(true表示是,队列将在服务器重启时生存)、 第三个参数:是否是独占队列(创建者可以使用的私有队列,断开后自动删除)、 第四个参数: 当所有消费者客户端连接断开时是否自动删除队列、 第五个参数:队列的其他参数 */ channel.queueDeclare(QUEUE_NAME, false, false, false, null); //发送的消息 String message = "你好,rabbitmq"; //往队列中发出一条消息 /* * basicPublish * 第一个参数: 交换机名称 默认交换机名称 amq.direct * 第二个参数:队列映射的路由key、 * 第三个参数:消息的其他属性 * 第四个参数:发送信息的主体 message.getBytes() * */ channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println("发送消息:" + message); //关闭频道和连接 channel.close(); connection.close(); } }
消息消费者
package com.tingcream.www.test.rabbitmq.helloworld; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.QueueingConsumer; /** * rabbitmq * AMQP 消息消费者 */ public class Customer { //消息队列名称 private final static String QUEUE_NAME="rabbitMQ-helloworld"; public static void main(String[] args) throws Exception { //打开连接和创建频道,与发送端一样 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.9.130"); //指定用户 密码 factory.setUsername("lisi"); factory.setPassword("123456"); //指定端口 factory.setPort(AMQP.PROTOCOL.PORT); //创建一个连接 Connection connection = factory.newConnection(); //创建一个频道 Channel channel = connection.createChannel(); //声明队列,主要为了防止消息接收者先运行此程序,队列还不存在时创建队列。 channel.queueDeclare(QUEUE_NAME, false, false, false, null); //创建队列消费者 QueueingConsumer consumer = new QueueingConsumer(channel); //指定消费队列 channel.basicConsume(QUEUE_NAME, true, consumer); while (true) { //nextDelivery是一个阻塞方法(内部实现其实是阻塞队列的take方法) QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println("接受到消息:" + message ); } } }
分别启动消费者和生产者,看看控制台的输出。
Copyright © 叮叮声的奶酪 版权所有
备案号:鄂ICP备17018671号-1