SpringCloud Stream RabbitMQ
目录
- 依赖引入
- 配置mq连接属性
- 仿照Processor
- 发送消息
- 接收消息
- 消费后返回消息
- 其他
- 多实例,仅一个实例可接受到消息
- 发送对象类型消息,在MQ中查看未消费的消息
依赖引入
org.springframework.cloud
spring-cloud-starter-stream-rabbit
配置mq连接属性
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
仿照Processor
- 常量类Constant
// 定中的常量
public static final String SEND_MSG = "sendmsg";
public static final String RECEIVED_MSG = "receivedmsg";
- 仿接口org.springframework.cloud.stream.messaging.Processor
public interface StreamClient {
@Input(Constant.SEND_MSG)
SubscribableChannel input();
@Output(Constant.SEND_MSG)
MessageChannel output();
发送消息
- 此处写在了 controller,也可以写在测试类
package com.cloud.order.controller;
import com.cloud.order.msg.StreamClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.Date;
@RestController
public class SendMsg {
@Autowired
private StreamClient streamClient; // 自己定义的StreamClient
@RequestMapping("/sendmsg")
public void sendMsg() {
String msg = "hello " + new Date();
streamClient.output().send(MessageBuilder.withPayload(msg).build());
}
}
接收消息
@Component
@EnableBinding(StreamClient.class)//定义好的接口
@Slf4j
public class StreamReceiver {
@StreamListener(Constant.SEND_MSG) // 监听的消息队列
public String process(Object val){
log.info("StreamReceiver msg"+ val);
return "received msg "+new Date().getTime();
}
}
消费后返回消息
@Component
@EnableBinding(StreamClient.class)
@Slf4j
public class StreamReceiver {
@StreamListener(Constant.SEND_MSG)
@SendTo(Constant.RECEIVED_MSG) //返回给的队列,创建方式同上
public String process(Object val){
log.info("StreamReceiver msg"+ val);
return "received msg "+new Date().getTime();
}
@StreamListener(Constant.RECEIVED_MSG)
public void processReceiver(Object val){
log.info("received msg -- --"+ val);
}
}
其他
多实例,仅一个实例可接受到消息
- 添加到配置文件
# 仅让一个实例接收到消息,msg-是程序中定义的队列名字,order-根据语义自定义即可
spring.cloud.stream.bindings.msg.group=order
发送对象类型消息,在MQ中查看未消费的消息
# 可在消息队列中看到堆积的消息的(当消息为对象格式时)完整属性,msg-是程序中定义的队列名字
spring.cloud.stream.bindings.msg.content-type=application/json