MQ学习4
Spring AMQP
Spring AMQP 项目将Spring的核心概念应用于基于AMQP的消息传递解决方案的开发。
0 准备
-
application.yml 配置rabbitmq
spring: rabbitmq: host: 127.0.0.1 port: 5672 username: yhy password: 123456 virtual-host: /lisi server: port: 8090
server port 防止producer和consumer端口冲突
不要先启动消费者
队列等都是在生产者绑定
所以在发送时才会创建
1.生产者代码
1.1 RabbitMQConfig @Configuration
-
XXXExchange(直接指定交换机类型) 方法 返回带名字的交换机对象
@Bean public TopicExchange topicExchange(){ return new TopicExchange("topic_exchange"); }
-
Queue 方法 返回带名字的交换机对象
@Bean public Queue queue(){ return new Queue("topic_queue"); }
-
binding 绑定队列到交换机上,并给出路由键(可使用通配符)
@Bean public Binding binding(){ return BindingBuilder.bind(queue()).to(topicExchange()) .with("*.orange.#"); }
1.2 Sender
@Component
public class Sender {
@Autowired
private RabbitTemplate rabbitTemplate;
public void send(){
rabbitTemplate.convertAndSend("topic_exchange",
"a.orange","hello");
}
}
-
注入RabbitTemplate
-
调用RabbitTemplate 的 convertAndSend方法
- 参数1:交换机名称
- 参数2:路由键名称
- 参数3:发送的信息
1.3 SenderTest
@SpringBootTest
public class SenderTest {
@Autowired
private Sender sender;
@Test
public void testSend(){
sender.send();
}
}
1.4 ProducerApplication
@SpringBootApplication
public class ProducerApplication {
public static void main(String[] args) {
SpringApplication.run(ProducerApplication.class, args);
}
}
2.消费者代码
2.1 UserInit
//监听指定队列
@RabbitListener(queues = "topic_queue")
@Component
public class UserInit {
//获取到消息之后的处理方法
@RabbitHandler
public void recv(String message){
System.out.println(message);
}
}
2.2 ConsumerApplication
@SpringBootApplication
public class ConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(ConsumerApplication.class, args);
}
}
3. 结果
-
运行生产者测试
- 管控界面中队列界面出现队列,且队列为持久化,阻塞了一条消息
- 点开可以看到队列绑定的交换机,绑定的路由键
-
运行消费者(运行的是 ConsumerApplication)
- 得到生产的消息
- 管控界面阻塞的消息被消费