RabbitMQ的高级特性(二)-----消费端限流
- 应用场景:
- 代码示例:
- pom依赖和rabbitmq.properties与spring集成rabbitmq相同
- 在spring-rabbitmq-consumer.xml 需要做如下配置:(设置消费者监听器为手动确认模式,并且配置参数prefetch表示消费端每次从mq拉取多少条消息。直到手动确认消费完毕,才会继续拉取)
- 限流消费端-没有手动确认代码:
/**
* Consumer 限流机制
* 1. 确保消息被确认,不确认是不会处理其他消息。
* 2. listener-container配置属性
* prefetch = 1,表示消费端每次从mq拉去一条消息来消费,直到确认消费完毕后,才会继续拉取下一条消息。
* 3. 手动自动都可以实现完全消费,但是只有消息确认消费完毕,才可以拉取下一条。
* 4. 但是在限流策略中,必须设置为手动确认。因为自动确认其实就跟平常的消费没有区别了。
*/
public class QosListener implements ChannelAwareMessageListener {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
//1.获取消息
System.out.println(new String(message.getBody()));
}
}
- producer生产消息代码:
@RunWith(SpringRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq-producer.xml")
public class ProducerTest {
@Test
public void testSend() {
for (int i = 0; i < 10; i++) {
rabbitTemplate.convertAndSend("test_exchange_confirm","confirm","message confirm");
}
}
}
- 因为上面消费端没有手动确认,所以会出现 1条消息待确认,9条消息未消费的情况。
- 消费端限流--手动确认代码:
- 消费端限流小结:
- 在 rabbit:listener-container中配置 prefetch 属性设置消费端一次拉取多少条消息
- 消费端的确认模式一定为手动确认。acknowledge="manual",并且手动确认一定要调用channel.basicAck();方法。
- 实现接口一定是实现ChannelAwareMessageListener接口。