RabbitMQ的高级特性(二)-----消费端限流


  1. 应用场景:
  2. 代码示例:
    • pom依赖和rabbitmq.properties与spring集成rabbitmq相同
    • 在spring-rabbitmq-consumer.xml 需要做如下配置:(设置消费者监听器为手动确认模式,并且配置参数prefetch表示消费端每次从mq拉取多少条消息。直到手动确认消费完毕,才会继续拉取)
    • 限流消费端-没有手动确认代码:
      /**
       * Consumer 限流机制
       *  1. 确保消息被确认,不确认是不会处理其他消息。
       *  2. listener-container配置属性
       *      prefetch = 1,表示消费端每次从mq拉去一条消息来消费,直到确认消费完毕后,才会继续拉取下一条消息。
       *  3. 手动自动都可以实现完全消费,但是只有消息确认消费完毕,才可以拉取下一条。
       *  4. 但是在限流策略中,必须设置为手动确认。因为自动确认其实就跟平常的消费没有区别了。
       */
      public class QosListener implements ChannelAwareMessageListener {
      	@Override
      	public void onMessage(Message message, Channel channel) throws Exception {
      		//1.获取消息
      		System.out.println(new String(message.getBody()));
      	}
      }
      
    • producer生产消息代码:
      @RunWith(SpringRunner.class)
      @ContextConfiguration(locations = "classpath:spring-rabbitmq-producer.xml")
      public class ProducerTest {
      
      	@Test
      	public void testSend() {
      		for (int i = 0; i < 10; i++) {
      			rabbitTemplate.convertAndSend("test_exchange_confirm","confirm","message confirm");
      		}
      	}
      }
      
    • 因为上面消费端没有手动确认,所以会出现 1条消息待确认,9条消息未消费的情况。
    • 消费端限流--手动确认代码:
  3. 消费端限流小结:
    • 在 rabbit:listener-container中配置 prefetch 属性设置消费端一次拉取多少条消息
    • 消费端的确认模式一定为手动确认。acknowledge="manual",并且手动确认一定要调用channel.basicAck();方法。
    • 实现接口一定是实现ChannelAwareMessageListener接口。

相关