MQ学习2
RabbitMQ2--队列
- producer--生产者--什么都不做,专门用来发送。一个项目里,发送消息的就是生产者
- queue--队列--消息经过RabbitMQ和你的应用,被储存在队列里。队列接近于无限大的缓存,取决于主机的内存和磁盘的大小。许多生产者可以向一个队列发送消息,许多消费者可以从一个队列获取消息。
- consum--消费者--consuming意味着接收。一个消费者在一个项目中大部分时间都是等待去接收消息
0 相关知识
0.1 try-with-resourse
- 传统的try-catch-finally 必须有finally把连接关掉
- try-with-resourse 不需要手动的关闭连接
- 因为Connection、Channel 继承了Closeable接口,而Closeable接口继承了 AutoCloseable接口,可以自动关闭
- 只有 (间接) 继承了AutoCloseable接口才能自动关闭
0.2 排他队列
1. 排他队列基于第一次创建它得连接可见--本连接创建一个排他队列其他连接是看不到这个队列的,但是本连接不同信道间是可见的
2. 其他的连接不能创建同名的排他队列,但是可以创建同名的普通队列
3. 排他队列的连接关闭或客户端退出时,这个排他队列会被自动删除,即使设置了持久化,仍然会被删除
4. 所有的交换机产生的队列都是排他队列
5. 这种队列适用于只限于一个客户端发送读取消息的应用场景
0.3 RPC
远程过程调用,A计算机通过网络调用B计算机的服务
远程过程调用最大的特点:同步
RPC一般是:客户端调用服务器,服务器提供服务
1 简单消息队列
java中两个项目:
1. 生产者--发送单一的消息
2. 队列
3. 消费者--接收消息并打印出来
1.1生产者代码
发消息
-
定义队列名
private final static String QUEUE_NAME = "hello";
-
创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
- 设置主机
factory.setHost("localhost");
-
通过连接工厂创建连接
Connection connection = factory.newConnection()
-
通过连接创建信道
Channel channel = connection.createChannel()
- 通过信道绑定队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
- 参数1:队列名称queue
- 参数2:持久化 durable
- 参数3:排他队列 exclusive
- 参数4:自动删除 autoDelete
- 参数5:参数 arguments
- 准备消息
String message = "Hello World!";
- 发送消息(基础推送)
channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
- 参数1:exchange
- 参数2:队列名
- 参数3:props
- 参数4:消息
-
控制台输出消息
System.out.println(" [x] Sent '" + message + "'");
创建的信道Channel是用于完成工作的大多数API所在的位置(Channel是集成了大多数API的地方)
注意:我们可以使用try-with-resources语句,因为Connection和Channerl都实现了java.io.Closeable。这样,我们无需在代码中显示关闭它们(3.0.4没实现,5.6.0实现了)
1.2 消费者代码
收消息
-
定义队列名
private final static String QUEUE_NAME="hello";
发消息的队列名与收消息的队列名一致
-
连接工厂
-
连接设置(主机、端口、用户名、密码、虚拟主机)
-
创建连接
-
创建信道
-
绑定队列(这样才能在同一个队列里收到消息)
-
接收消息的回调(获取消息、打印消息)
delivery.getBody() -- 消息体
-
基础消费
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
- 参数1:队列名
- 参数2:自动确认 autoAck (当消费者收到消息之后,发给rabbitmq服务器一个确认,服务器确认收到消息)
- 参数3:
- 参数4:
1.3 其他
生产者发送完消息立即关闭,在管控界面Connections Channels里无法看到
在Queues中可以看到
1. Name队列名 BIliBili
2. Ready 1 准备一条消息
3. Total 1 一共有一条消息
点进去可以看到有一条消息一致在阻塞
在消费者消费后,可以收到消息
管控界面发生相应变化(Ready 0 Total 0,消息被消费)
1.4 缺点
- 生产者与消费者处理速度产生差异,如生产者传递消息快于消费者消费消息,则消息堆积在队列
- 虽然队列接近于无限大的缓存,但是是基于主机的内存和磁盘大小。
- 当堆积过多,会导致RabbitMQ无法使用
2 工作队列
不再由单一消费者去消费,而是多消费者消费,弥补单一消费者处理速度不足
消息发送没有变化,消费者消费方式有变化
2.1 平均分配(轮询)
消费者轮询方式消费(a一条,b一条)
每个消费者产生一个连接,一个信道(在管控界面可以看到)
2.1.1生产者代码
更改队列名
将准备消息和发送消息代码放入for循环,多次执行
//发送20条消息
for (int i = 1; i <21 ; i++) {
//准备消息
String message ="WorkWork!"+i;
//发送消息
channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
System.out.println(" [x] Sent '" + message + "'");
}
其余无变化
2.1.2消费者代码
新建消费者02类
其余无变化
2.1.3 平均分配/轮询优缺点
- 优点:解决了简单消息队列中,消费者消费速度赶不上生产者生产速度的问题
- 缺点:由于不同消费者的消费速率是不一样的,所以
- 消费的时间为消费者中消费最慢的消费者所花费的时间
- 消费快的消费者很多时间是处于空闲状态,这是对资源的浪费
- 解决方法:能者多劳/公平模式
2.2 能者多劳/公平模式
代码区别
- 区别1
//限制消费1条消息,消费完再继续消费下一条消息
int prefetchCount=1;
channel.basicQos(prefetchCount);
表示每次消费多少条
告诉rabbitmq,这些条消费完了,再去接收新的消息
- 区别2
//消费消息不是自动确认 autoAck为false
channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> { });
- 区别3
//手动确认
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
在 接收消息的回调(获取消息、打印消息) 中,进行手动确认
- 参数1:回调的消息的唯一标识
- 参数2:是否确认多条(当前只确认一条,所以false)
生产者代码
无区别
消费者代码
-
用线程阻塞模仿消费速率区别
在接收消息的回调中加入
Thread.sleep(3000);
- 结果阻塞时间短的消费者已经结束,而阻塞时间长的消费者还在消费
- 消费者的消费时间以阻塞时间长的消费者花费时间为准(以消费速率低的为准)
-
增加如上区别1.2.3.
- 结果:消费速率快的消费者消费条数远超过消费效率低的
3 发布/订阅模式
实现所有消费者收到同样的消息(工作队列无法实现)
3.1 交换机
- RabbitMQ消息传递模型的核心思想是生产者从不将任何消息直接发送到队列,实际上,生产者甚至根本不知道是否将消息传递到任何队列
- 生产者只能将消息发送到交换机
- 交换机一方面接收来自生产者的消息,一方面将消息推入队列
- 交换机必须确切知道如何处理消息:是否将消息附加到特定队列?是否将其附加到许多队列?还是丢弃它?规则由交换类型定义
- 交换机自动生成的队列全部是排他队列,名字是随机的,需要手动绑定队列
交换类型
1. direct(直连)
2. topic
3. headers(基本不用,基于消息的头部消息进行匹配,效率低)
4. fanout:广播,消息发给所有队列
生产者--交换机--多个队列--多个消费者
3.2 代码
3.2.1 生产者
-
定义交换机名字
private final static String EXCHANGE_NAME = "exchange_fanout";
-
绑定/声明交换机,不再是队列
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
- 参数1:交换机名称
- 参数2:交换机类型
-
发送的时候绑定交换机名字,不再是队列名字
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes(StandardCharsets.UTF_8));
3.2.2 消费者
-
定义交换机名字
private final static String EXCHANGE_NAME = "exchange_fanout";
-
声明交换机,而不是声明队列;指定交换机类型
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
-
通过队列声明获取队列名称(获取到的是排他队列)
String queueName = channel.queueDeclare().getQueue();
-
用交换机名称和队列名称--绑定交换机和队列(让交换机知道它的消息要发给哪些队列)
channel.queueBind(queueName, EXCHANGE_NAME, "");
- 参数1:队列名
- 参数2:交换机名
- 参数3:
-
消费者接收时依然绑定的是队列(消费者永远是和队列绑定在一起的,消费者监听的是队列而不是交换机)
3.2.3 结果
管控界面可以看到:
- 生成两个名称随机的队列
- 队列特征中显示:auto-delete:true 自动删除;exclusive:true 排他队列
- 默认的交换机都是持久化的 durable:true(自己写的也可以在exchangeDeclare声明时设置持久化)
- 点开新声明的交换机会看到交换机已经绑定新生成的两个队列(发给交换机的消息会发给这两个队列)
生产者发送消息后可以看到:
- 两个消费者接收到完全相同的消息
为什么交换机生成的是排他队列?
- 交换机可以绑定无数个队列
- 如果都是普通队列,则关闭连接时需要手动逐个关闭队列;如果不关闭队列,则队列会留在RabbitMQ中,长此以往会浪费过多的磁盘空间
- 如果是排他队列,则连接关闭,排他队列自动删除,不需要手动逐个关闭
- 所以交换机生成的都是排他队列
4 路由队列
举例:会员制:有些视频会员才能看,有些视频普通用户能看
生产者生产的消息中,有些消息是所有消费者都能消费,有些消息由指定消费者消费
本质还是使用交换机,但是使用的是交换机的 direct(直连)模式而不是fanout(广播)模式
生产消息时携带路由键(routing key),交换机通过路由键向特定队列发送消息
向所有用户发送消息则需要所有队列都绑定了某路由键
队列可以绑定多个路由键
4.1 生产者代码
-
声明交换机时,指定模式为direct
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
-
发布消息时,指定消息的路由键
channel.basicPublish(EXCHANGE_NAME, "level1", null, message.getBytes(StandardCharsets.UTF_8));
4.2 消费者代码
-
声明交换机时,指定模式为direct
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
-
绑定队列和交换机时,绑定路由键(此句可以写多次,绑定多个路由键)
channel.queueBind(queueName, EXCHANGE_NAME, "level1"); channel.queueBind(queueName, EXCHANGE_NAME, "level2");
4.3 结果
- 管控界面队列页面中出现两个随机名称的队列
- 点开队列可以看到队列绑定的交换机和队列绑定的路由键
- 交换机页面中可以看到新创建的交换机,模式为direct(直连)
- 点开交换机可以看到交换机绑定的路由键
5 简单队列补充
channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
- 参数1:所调用方法中,第一个参数为交换机名称
- 参数2:所调用方法中,第二个参数为路由键,但是传入的是队列名称
- 管控界面可以看到默认交换机后有速度显示,表示默认交换机被使用,使用它的就是简单队列
综上:
- 当交换机名称为空时,会默认使用(AMQP default)默认交换机
- AMQP default交换机为direct直连类型
- 直连交换机需要路由键,简单队列以队列名作为路由键名称
- 当作为路由键的队列名称找不到时,默认这条消息被丢弃
- 简单消息队列本质上使用的是直连交换机,消息被发送到默认交换机 AMQP default
- 所以,简单消息队列也符合如下描述
1. RabbitMQ消息传递模型的核心思想是:生产者从不将任何消息直接发送到队列。
2. 实际上,生产者甚至根本不知道是否将消息传递到任何队列生产者只能将消息发送到交换机
3. 生产者只能将消息发送到交换机
7 主题队列
路由队列缺点:路由需要指定,所以使用越久,路由越来越多,难以管理。
主题队列:新增两个通配符
- 通配符 * :匹配精确的一个单词
- 通配符 # :匹配0个或者多个单词
路由键使用符号 . 进行分割,搭配通配符使用
- 注意通配符只有在交换机和路由键绑定时才可以使用
- 发送消息时,路由键必须是完整的,不可以使用通配符
- 匹配到多个,仍然只发一次
- 一个都匹配不到,默认消息会被丢弃
7.1 生产者代码
-
声明交换机时,交换机的类型为 topic
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
-
准备多条消息
String message1 = "Hello World! All Users"; String message2 = "Hello World! Admin"; String message3 = "Hello World! User Level 3 ";
-
准备多个路由键
String routingKey1 = "userL1.userL3.admin"; String routingKey2 = "admin"; String routingKey3 = "userL3.admin";
-
发送消息
channel.basicPublish(EXCHANGE_NAME, routingKey1, null, message1.getBytes(StandardCharsets.UTF_8)); channel.basicPublish(EXCHANGE_NAME, routingKey2, null, message2.getBytes(StandardCharsets.UTF_8)); channel.basicPublish(EXCHANGE_NAME, routingKey3, null, message3.getBytes(StandardCharsets.UTF_8));
7.2 消费者代码
-
声明交换机时,交换机的类型为 topic
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
-
绑定交换机和队列时,绑定路由键(使用通配符)(此语句可以有多条)
channel.queueBind(queueName, EXCHANGE_NAME, "#.admin");
7.3 结果
管控界面可以看到:
- 交换机页面可以看到绑定的带通配符的路由键
- 队列页面可以看到,没有消息被阻塞,即,没有匹配的消息默认会被丢弃
注意:
- 发送消息时必须是确定的路由键,不可以使用通配符
- 接收消息时可以使用通配符
8 RPC队列
RPC远程过程调用是同步的
RabbitMQ是异步的
客户端调用服务器,服务器提供服务
客户端和服务器同时既是生产者又是消费者:
- 客户端作为生产者发送消息到队列,服务端作为消费者监听队列,从队列得到消息,进行相应的服务处理
- 服务端作为生产者将处理后的服务结果发送到消息队列,客户端作为消费者从消息队列取得结果
客户端可能有多个,向同一个服务端发送同样的消息
- 为了识别客户端,请求服务端时需要有correlation_id,相当于消息的唯一标识
- 客户端通过correlation_id获取服务结果
需要使用斐波那契数列
使用较少
代码略
注意要先启动服务端,有服务端才能有客户端调用