MQ学习4


Spring AMQP

Spring AMQP 项目将Spring的核心概念应用于基于AMQP的消息传递解决方案的开发。

0 准备

  1. application.yml 配置rabbitmq

    spring:
      rabbitmq:
        host: 127.0.0.1
        port: 5672
        username: yhy
        password: 123456
        virtual-host: /lisi
    server:
      port: 8090
    

    server port 防止producer和consumer端口冲突

不要先启动消费者

队列等都是在生产者绑定

所以在发送时才会创建

1.生产者代码

1.1 RabbitMQConfig @Configuration

  1. XXXExchange(直接指定交换机类型) 方法 返回带名字的交换机对象

    @Bean
    public TopicExchange topicExchange(){
        return new TopicExchange("topic_exchange");
    }
    
  2. Queue 方法 返回带名字的交换机对象

    @Bean
    public Queue queue(){
        return new Queue("topic_queue");
    }
    
  3. binding 绑定队列到交换机上,并给出路由键(可使用通配符)

    @Bean
    public Binding binding(){
        return BindingBuilder.bind(queue()).to(topicExchange())
                .with("*.orange.#");
    }
    

1.2 Sender

@Component
public class Sender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void  send(){
        rabbitTemplate.convertAndSend("topic_exchange",
                "a.orange","hello");
    }
}
  • 注入RabbitTemplate

  • 调用RabbitTemplate 的 convertAndSend方法

    • 参数1:交换机名称
    • 参数2:路由键名称
    • 参数3:发送的信息

1.3 SenderTest

@SpringBootTest
public class SenderTest {

    @Autowired
    private  Sender sender;

    @Test
    public void  testSend(){
        sender.send();
    }
}

1.4 ProducerApplication

@SpringBootApplication
public class ProducerApplication {
    public static void main(String[] args) {
        SpringApplication.run(ProducerApplication.class, args);
    }
}

2.消费者代码

2.1 UserInit

	//监听指定队列
    @RabbitListener(queues = "topic_queue")
    @Component
    public class UserInit  {

        //获取到消息之后的处理方法
        @RabbitHandler
    public void recv(String message){
        System.out.println(message);
    }

}

2.2 ConsumerApplication

@SpringBootApplication
public class ConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(ConsumerApplication.class, args);
    }
}

3. 结果

  1. 运行生产者测试

    • 管控界面中队列界面出现队列,且队列为持久化,阻塞了一条消息
    • 点开可以看到队列绑定的交换机,绑定的路由键
  2. 运行消费者(运行的是 ConsumerApplication)

    • 得到生产的消息
    • 管控界面阻塞的消息被消费
MQ