rabbitmq的几种模式(springboot整合rabbitmq)
注: 代码demo
一、简单模式(单生产者单消费者):将消息直接发送到队列中,供消费者消费
1.创建队列
2.代码实现
1)添加依赖
org.springframework.boot
spring-boot-starter-amqp
2)消息生产者
@Autowired
private RabbitTemplate rabbitTemplate;
private final String queue = "testqueue";
// 发送字符串
public void sendMessage() {
rabbitTemplate.convertAndSend(queue, "this is a test message");
}
// 发送对象(将user转为json串的形式发送)
@Override
public void sendMessage(UserTest user) {
rabbitTemplate.convertAndSend(queue, JSONObject.toJSONString(user));
}
3)消息消费者
@Component
public class TestQueueListener {
// 消费字符串
@RabbitListener(queues = "testqueue")
public void listen(String message) {
System.err.println("this is provider1"+ message);
}
// 消费对象
@RabbitListener(queues = "testqueue")
public void listen(Message msg) {
String s = new String(msg.getBody());
UserTest userTest = JSONObject.parseObject(s, UserTest.class);
System.err.println("this is listener2" + userTest);
}
}
二、工作模式(单个生产者,多个消费者):多个消费者争抢资源,谁先抢到谁处理消息。
1.在上一个“简单模式”的基础上,再添加一个监听同一个队列的方法
@Component
public class Provider2Listener {
/**
* work模式
* @param msg
*/
@RabbitListener(queues = "testqueue")
private void listener2(Message msg) {
String s = new String(msg.getBody());
UserTest userTest = JSONObject.parseObject(s, UserTest.class);
System.err.println("this is listener 2 接收到的消息为:" + userTest);
}
}
结果
1)消息发送者
2)消息消费者1
3)消息消费者2
三、发布、订阅模式(Publish/Subscribe)(消息发送到交换机,再由交换机发送到消息队列):生产者端发送消息,多个消费者同时接收所有的消息。
1.创建交换机
2.创建队列
1)创建testqueuqe1、testqueuqe2两个队列
2)将交换机与队列进行绑定(下图是创建testqueuqe1绑定过程、testqueuqe2绑定也是如此,不再演示)
2.代码实现
2.1)消息发送者(将消息发送到交换机)
private final String exchange = "testexchange";
public void sendExchangeMessage(UserTest user) {
rabbitTemplate.convertAndSend(exchange, "", JSONObject.toJSONString(user));
}
2.2)消息消费者
@RabbitListener(queues = "testqueuqe2")
public void testqueue2Listen(Message msg) {
String s = new String(msg.getBody());
UserTest userTest = JSONObject.parseObject(s, UserTest.class);
System.err.println("this is listener 1 接收到的消息为" + userTest);
}
@RabbitListener(queues = "testqueuqe1")
private void testqueue1Listener2(Message msg) {
String s = new String(msg.getBody());
UserTest userTest = JSONObject.parseObject(s, UserTest.class);
System.err.println("this is listener2" + userTest);
}
四、路由模式(消息发送到交换机,再由交换机发送到消息队列):生产者根据key发送消息,不同的消费者按不同的key接收消息
1.创建交换机
1).创建队列testqueuqe3、testqueuqe4
2).将队列和交换机进行绑定
2.代码实现
// 消息生产者
private static final String exchange1 = "testexchange1";
public void sendExchangeMessageByKey(UserTest user) {
// exchange1代表交换机,a代表key
rabbitTemplate.convertAndSend(exchange1, "a", JSONObject.toJSONString(user));
}
交换机将消息发送到了testqueuqe3中
// 消息消费者1
@RabbitListener(queues = "testqueuqe3")
public void testqueue2ListenByKey(Message msg) {
String s = new String(msg.getBody());
UserTest userTest = JSONObject.parseObject(s, UserTest.class);
System.err.println("this is listener1" + userTest);
}
// 消息消费者2
@RabbitListener(queues = "testqueuqe4")
private void testqueue1Listener2ByKey(Message msg) {
String s = new String(msg.getBody());
UserTest userTest = JSONObject.parseObject(s, UserTest.class);
System.err.println("this is listener2" + userTest);
}
key为a,所以监听到testqueuqe3消息队列的消费者接收到消息,testqueuqe4未消费消息
五、通配符模式(消息发送到交换机,再由交换机发送到消息队列):生产者根据字符串匹配发送消息,不同的消费者按字符串匹配接收消息
- 创建队列
1).创建交换机
2).创建队列testqueuqe5、testqueuqe6
3).将交换机和队列绑定
// 定义 * 号路由,仅能匹配一个单词
private String logStart = "log.register";
// 定义 # 号路由,能匹配0个或多个单词
private String logHash = "log.register.error";
2.代码实现
// 消息发送者
// 定义 * 号路由,仅能匹配一个单词
private String logStart = "log.register";
// 定义 # 号路由,能匹配0个或多个单词
private String logHash = "log.register.error";
/**
* logStart 形式的log.*和log.#都可以匹配到
* logHash 形式的只有log.#可以匹配到
* 交换机类型必须为topic
*
* @param user
*/
@Override
public void sendExchangeMessageByTopic(UserTest user) {
try {
rabbitTemplate.convertAndSend(exchange2, logHash, JSONObject.toJSONString(user));
} catch (Exception e) {
e.printStackTrace();
}
}
// 消费者5
@RabbitListener(queues = "testqueuqe5")
public void testqueue2ListenByTopic(Message msg) {
String s = new String(msg.getBody());
UserTest userTest = JSONObject.parseObject(s, UserTest.class);
System.err.println("this is listener1" + userTest);
}
// 消费者6
@RabbitListener(queues = "testqueuqe6")
private void testqueue1Listener2ByTopic(Message msg) {
String s = new String(msg.getBody());
UserTest userTest = JSONObject.parseObject(s, UserTest.class);
System.err.println("this is listener2" + userTest);
}