springBoot + rabbitMQ +手动确认消息 + 控制(接口、定时任务)消费者上下线
这里只贴消费者的部分代码
第一部分:手动ack配置
package com.mybatis.plus.config.mq; import org.springframework.amqp.core.AcknowledgeMode; import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * * 描述: rabbitMQ配置 * * @author 官昌洪 * @date 2021/12/17 11:24 * @version V1.0 */ @Configuration public class MessageListenerConfig { @Bean public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setMessageConverter(new Jackson2JsonMessageConverter()); factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); return factory; } }
第二部分:消费消息
package com.mybatis.plus.config.mq; import com.alibaba.fastjson.JSONObject; import com.mybatis.plus.entity.Log; import com.mybatis.plus.utils.EurekaUtils; import com.mybatis.plus.utils.hash.ConsistentHash; import com.mybatis.plus.utils.hash.pojo.ConsistentHashNode; import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.util.List; import java.util.Map; @Slf4j @Component public class Receiver { @Value("${server.port}") private String port; @Autowired RabbitTemplate rabbitTemplate; @RabbitListener(id = "testDirectQueueId1", autoStartup = "false", queues = "testDirectQueue") public void consumer(Message message, Channel channel) throws IOException { long deliveryTag = message.getMessageProperties().getDeliveryTag(); try { Thread.sleep(500); if ("testDirectQueue".equals(message.getMessageProperties().getConsumerQueue())) { String msg = new String(message.getBody(), "UTF-8"); Log parseObject = JSONObject.parseObject(msg, Log.class); log.info("消费的消息来自的队列名为:" + message.getMessageProperties().getConsumerQueue()); log.info("消息成功消费到 messageId:" + parseObject.getLogUuid() + " messageData:" + parseObject.getLogTitle() + " createTime:" + parseObject.getCreateTime()); log.info("================================"); // 收到来自主机的消息 进行一致性hash分配 发往不同的服务 // 获取服务节点 创建一致hash环 ConsistentHash consistentHash = InitConfig.consistentHash; List
第三部分:控制消费者开启,关闭
@Autowired private RabbitListenerEndpointRegistry rabbitListenerEndpointRegistry; @RequestMapping("/startCustomer") public R startCustomer(){ MessageListenerContainer consumer = rabbitListenerEndpointRegistry.getListenerContainer("testDirectQueueId1"); consumer.start(); return R.ok(); } @RequestMapping("/stopCustomer") public R stopCustomer(){ MessageListenerContainer consumer = rabbitListenerEndpointRegistry.getListenerContainer("testDirectQueueId1"); consumer.stop(); return R.ok(); }
主要还是指定 RabbitListener 注解的ID属性进行控制