RabbitMQ 优先级队列
一、概述
在实际应用场景中,我们推送消息,希望给消息设置优先级,比如说京东双 11 活动,它希望将消息优先推送给京东的 vip,而对于非 vip 用户消息推送的优先级就低一些,那么怎么实现呢?
其实很简单,通过优先级队列就可以完美解决上述应用场景
二、原理图
三、编码
1、applicaiton.yml
spring:
rabbitmq:
host: 192.168.59.135
port: 5672
username: admin
password: admin123
publisher-confirm-type: correlated
publisher-returns: true
# 开启 ack
listener:
direct:
acknowledge-mode: manual
simple:
acknowledge-mode: manual #采取手动应答
#concurrency: 1 # 指定最小的消费者数量
#max-concurrency: 1 #指定最大的消费者数量
retry:
enabled: true # 是否支持重试
2、自定义配置类 PriorityConfig
@Configuration
public class PriorityConfig {
private static final String PRIORITY_EXCHANGE = "priority_exchange";
private static final String PRIORITY_QUEUE = "priority_queue";
private static final String PRIORITY_KEY = "priority";
// 声明优先级交换机(type = direct)
@Bean(PRIORITY_EXCHANGE)
public DirectExchange priorityExchange() {
return ExchangeBuilder.directExchange(PRIORITY_EXCHANGE).durable(true).build();
}
// 声明优先级队列
@Bean(PRIORITY_QUEUE)
public Queue priorityQueue() {
/**
* maxPriority(int maxPriority):设置队列支持的最大优先级数量,如果没有设置,则队列将不支持消息优先级
* 官方支持的优先级范围是 0 ~ 255,超过 255 就会发生报错,但是一般企业使用的优先级是 0 ~ 10,如果 maxPriority 设置
* 的太大,会浪费 cpu 和 内存,因为消息是要在队列中排队的,队列长度太大,排序的过程中会损耗性能
*/
return QueueBuilder.durable(PRIORITY_QUEUE).maxPriority(10).build();
}
// 优先级队列绑定优先级交换机
@Bean
public Binding priorityQueueBindingPriorityExchange(@Qualifier(PRIORITY_QUEUE) Queue queue,
@Qualifier(PRIORITY_EXCHANGE) DirectExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(PRIORITY_KEY);
}
}
3、发布确认自定义类 MyConfirmCallback
@Slf4j
@Component
public class MyConfirmCallback implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {
/**
* 交换机确认回调方法
* 1、Producer 发送的消息,交换机确认收到
* correlationData:保存消息回调 ID 及其它相关的信息
* ack:true
* cause:null
*
* 2、Producer 发送的消息,交换机没有收到
* correlationData:保存消息回调 ID 及其它相关的信息
* ack:false
* cause:交换机没有收到消息的原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
String id = correlationData != null ? correlationData.getId() : "";
if (ack) {
log.info("交换机已经收到 id 为:{}的消息", id);
} else {
log.info("交换机还未收到 id 为:{}消息,由于原因:{}", id, cause);
}
}
/**
* 如果交换机没有将消息路由到队列,会触发该回调方法
*/
@Override
public void returnedMessage(ReturnedMessage returned) {
log.info("消息: {} 被服务器退回--->退回原因: {},交换机是: {},路由key是:{},退回编号是:{}",
new String(returned.getMessage().getBody()), returned.getReplyText(), returned.getExchange(),
returned.getRoutingKey(), returned.getReplyCode());
}
}
4、Producer
@Slf4j
@RestController
public class Producer {
private static final String PRIORITY_EXCHANGE = "priority_exchange";
private static final String PRIORITY_QUEUE = "priority_queue";
private static final String PRIORITY_KEY = "priority";
// 1、依赖注入 rabbitTemplate
@Autowired
private RabbitTemplate rabbitTemplate;
// 2、依赖注入 myConfirmCallback
@Autowired
private MyConfirmCallback myConfirmCallback;
// 3、完成了 1、2 的注入之后再设置 rabbitTemplate 的回调对象
@PostConstruct
public void init() {
// 消息成功传递给交换机时会触发 MyConfirmCallback 中的回调方法 confirm()
rabbitTemplate.setConfirmCallback(myConfirmCallback);
// 消息回退时会触发 MyConfirmCallback 中的回调方法 returnedMessage()
rabbitTemplate.setReturnsCallback(myConfirmCallback);
}
@GetMapping("/priority/sendMessage/{msg}")
public void sendMessage(@PathVariable("msg") String msg) {
// 设置唯一 ID
CorrelationData correlationData = new CorrelationData();
correlationData.setId(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend(PRIORITY_EXCHANGE, PRIORITY_KEY, msg, correlationData);
log.info("发送一条未设置优先级的消息", msg);
String msg1 = msg + 0;
rabbitTemplate.convertAndSend(PRIORITY_EXCHANGE, PRIORITY_KEY, msg1, (message -> {
message.getMessageProperties().setPriority(0);
return message;
}), correlationData);
log.info("发送一条优先级为 0 的消息", msg1);
String msg2 = msg + 2;
rabbitTemplate.convertAndSend(PRIORITY_EXCHANGE, PRIORITY_KEY, msg2, (message -> {
message.getMessageProperties().setPriority(2);
return message;
}), correlationData);
log.info("发送一条优先级为 2 的消息", msg2);
String msg3 = msg + 5;
rabbitTemplate.convertAndSend(PRIORITY_EXCHANGE, PRIORITY_KEY, msg3, (message -> {
message.getMessageProperties().setPriority(5);
return message;
}), correlationData);
log.info("发送一条优先级为 5 的消息", msg3);
String msg4 = msg + 10;
rabbitTemplate.convertAndSend(PRIORITY_EXCHANGE, PRIORITY_KEY, msg4, (message -> {
message.getMessageProperties().setPriority(10);
return message;
}), correlationData);
log.info("发送一条优先级为 10 的消息", msg4);
}
}
5、Consumer
@Slf4j
@Component
public class Consumer {
private static final String PRIORITY_QUEUE = "priority_queue";
@RabbitListener(queues = {PRIORITY_QUEUE})
public void receivedMessage(Message message, Channel channel, CorrelationData correlationData) throws IOException {
try {
String msg = new String(message.getBody());
log.info("消费者成功接收到消息:{}", msg);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
log.info("消息消费错误");
// 出现异常之后拒绝消息,并且消息重新入队
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
}
}
}
四、测试
要让队列实现优先级需要做的事情如下
1、队列需要设置为优先级队列
2、消息需要设置消息的优先级
3、生产者必须先将消息发送到队列中,让队列对设置了优先级的消息进行排队
4、1、2、3 完成之后再启动消费者进行消费即可
要想实现上述功能,我们先将 Consumer 的 @RabbitListener 注解注释掉,然后启动 Springboot 项目
浏览器发送请求: http://localhost:8080/priority/sendMessage/小毛毛是最可爱的
消息发送完成之后,然后打开 Consumer 的 @RabbitListener 注解,再次启动 Springboot 项目
从消费者的消费结果可以看出,优先级越高的消息越早被消费,如果未设置消息的优先级,那么该默认的优先级看起来是 1