【RabbitMQ】Spring Boot Demo(九)
1、项目配置文件
pom文件
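<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.6.3</version>
        <relativePath/>
    </parent>
    <groupId>com.wchw</groupId>
    <artifactId>rabbitmq-demo2</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>rabbitmq-demo2</name>
    <description>rabbitmq demo2</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>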
application.yml
server:
  port: 8080
spring:
  rabbitmq:
    username: admin
    password: wsxadmin
    virtual-host: /
    host: test.rabbitmq.com
    port: 5672
Fanout模式用例
生产者
/**
 * Spring configuration that declares the fanout exchange, the two queues
 * attached to it, and the bindings between them.
 */
@Configuration
public class RabbitMqFanoutConfiguration {

    /**
     * Step 1: declare the fanout exchange.
     *
     * @return exchange named {@code Constants.FANOUT_EXCHANGE_NAME}, built with
     *         durable = true and autoDelete = false
     */
    @Bean
    public FanoutExchange fanoutExchange() {
        final boolean durable = true;
        final boolean autoDelete = false;
        return new FanoutExchange(Constants.FANOUT_EXCHANGE_NAME, durable, autoDelete);
    }

    /**
     * Step 2a: declare the first queue; the bean name equals the queue name.
     *
     * @return queue named {@code Constants.FANOUT_QUEUE1_NAME}
     */
    @Bean(Constants.FANOUT_QUEUE1_NAME)
    public Queue queue1() {
        final String queueName = Constants.FANOUT_QUEUE1_NAME;
        return new Queue(queueName);
    }

    /**
     * Step 2b: declare the second queue; the bean name equals the queue name.
     *
     * @return queue named {@code Constants.FANOUT_QUEUE2_NAME}
     */
    @Bean(Constants.FANOUT_QUEUE2_NAME)
    public Queue queue2() {
        final String queueName = Constants.FANOUT_QUEUE2_NAME;
        return new Queue(queueName);
    }

    /**
     * Step 3a: bind the first queue to the fanout exchange.
     *
     * @return binding between {@link #queue1()} and {@link #fanoutExchange()}
     */
    @Bean("queue1Binding")
    public Binding queue1Binding() {
        final Queue source = queue1();
        final FanoutExchange target = fanoutExchange();
        return BindingBuilder.bind(source).to(target);
    }

    /**
     * Step 3b: bind the second queue to the fanout exchange.
     *
     * @return binding between {@link #queue2()} and {@link #fanoutExchange()}
     */
    @Bean("queue2Binding")
    public Binding queue2Binding() {
        final Queue source = queue2();
        final FanoutExchange target = fanoutExchange();
        return BindingBuilder.bind(source).to(target);
    }
}
/**
 * Producer-side service that publishes messages to the fanout exchange.
 */
@Service
public class FanoutSendMsgService {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Publishes {@code msg} to the exchange named
     * {@code Constants.FANOUT_EXCHANGE_NAME} with an empty routing key.
     *
     * @param msg payload handed to the template for conversion and sending
     */
    public void sendMsg(String msg) {
        final String exchange = Constants.FANOUT_EXCHANGE_NAME;
        final String routingKey = "";
        this.rabbitTemplate.convertAndSend(exchange, routingKey, msg);
    }
}
消费者
/**
 * Consumers for the two fanout queues.
 *
 * <p>NOTE(review): the class is registered via {@code @Configuration}; a plain
 * component stereotype is the more usual choice for a listener holder. Left
 * unchanged here so no new import is required.
 */
@Configuration
public class FanoutMsgListener {

    /**
     * Consumes from {@code Constants.FANOUT_QUEUE1_NAME} using manual acknowledgement.
     *
     * <p>On success the delivery is acked. If anything in the handling path throws,
     * the delivery is rejected without requeue when it was already redelivered once,
     * otherwise it is nacked with requeue so it gets one more attempt.
     *
     * @param channel the channel the delivery arrived on, used for ack/nack/reject
     * @param message the received message
     */
    @RabbitListener(queues = Constants.FANOUT_QUEUE1_NAME, ackMode = "MANUAL")
    public void receiveQueue1Msg(Channel channel, Message message) {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            System.out.println("receiveQueue1Msg ====>>> " + decodeBody(message));
            channel.basicAck(deliveryTag, false);
        } catch (Exception e) {
            rejectOrRequeue(channel, message, deliveryTag);
            e.printStackTrace();
        }
    }

    /**
     * Consumes from {@code Constants.FANOUT_QUEUE2_NAME} with the listener's
     * default acknowledgement mode and prints the message body.
     *
     * @param message the received message
     */
    @RabbitListener(queues = Constants.FANOUT_QUEUE2_NAME)
    public void receiveQueue2Msg(Message message) {
        try {
            System.out.println("receiveQueue2Msg ====>>> " + decodeBody(message));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Negatively acknowledges a delivery whose handling failed: reject without
     * requeue if it was already redelivered, nack with requeue otherwise.
     */
    private void rejectOrRequeue(Channel channel, Message message, long deliveryTag) {
        // getRedelivered() returns a boxed Boolean; compare null-safely so an unset
        // flag cannot raise a NullPointerException inside the failure handler.
        // An unset flag is treated as "not redelivered".
        boolean redelivered = Boolean.TRUE.equals(message.getMessageProperties().getRedelivered());
        try {
            if (redelivered) {
                System.out.println("消息已重复处理失败,拒绝再次接收!");
                // requeue=false: the message is not put back on the queue; with a
                // dead-letter configuration it would be routed there instead.
                channel.basicReject(deliveryTag, false);
            } else {
                System.out.println("消息即将再次返回队列处理!");
                // requeue=true: hand the message back to the queue for another attempt.
                channel.basicNack(deliveryTag, false, true);
            }
        } catch (IOException e1) {
            e1.printStackTrace();
        }
    }

    /** Decodes the message body as UTF-8 instead of relying on the platform default charset. */
    private static String decodeBody(Message message) {
        return new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8);
    }
}
Direct 模式用例
生产者
/**
 * Spring configuration that declares the direct exchange, its two durable
 * queues, and the routing-key bindings between them.
 */
@Configuration
public class RabbitMqDirectConfiguration {

    /**
     * Step 1: declare the direct exchange.
     *
     * @return exchange named {@code Constants.DIRECT_EXCHANGE_NAME}, built with
     *         durable = true and autoDelete = false
     */
    @Bean
    public DirectExchange directExchange() {
        final boolean durable = true;
        final boolean autoDelete = false;
        return new DirectExchange(Constants.DIRECT_EXCHANGE_NAME, durable, autoDelete);
    }

    /**
     * Step 2a: declare the durable e-mail queue; the bean name equals the queue name.
     *
     * @return queue named {@code Constants.QUEUE_DIRECT_EMAIL}
     */
    @Bean(Constants.QUEUE_DIRECT_EMAIL)
    public Queue queue1() {
        final QueueBuilder builder = QueueBuilder.durable(Constants.QUEUE_DIRECT_EMAIL);
        return builder.build();
    }

    /**
     * Step 2b: declare the durable message queue; the bean name equals the queue name.
     *
     * @return queue named {@code Constants.QUEUE_DIRECT_MESSAGE}
     */
    @Bean(Constants.QUEUE_DIRECT_MESSAGE)
    public Queue queue2() {
        final QueueBuilder builder = QueueBuilder.durable(Constants.QUEUE_DIRECT_MESSAGE);
        return builder.build();
    }

    /**
     * Step 3a: bind the e-mail queue to the direct exchange under routing key "info".
     *
     * @return binding between {@link #queue1()} and {@link #directExchange()}
     */
    @Bean("queue1BindingDirect")
    public Binding queue1Binding() {
        final Queue source = queue1();
        final DirectExchange target = directExchange();
        final String routingKey = "info";
        return BindingBuilder.bind(source).to(target).with(routingKey);
    }

    /**
     * Step 3b: bind the message queue to the direct exchange under routing key "error".
     *
     * @return binding between {@link #queue2()} and {@link #directExchange()}
     */
    @Bean("queue2BindingDirect")
    public Binding queue2Binding() {
        final Queue source = queue2();
        final DirectExchange target = directExchange();
        final String routingKey = "error";
        return BindingBuilder.bind(source).to(target).with(routingKey);
    }
}
/**
 * Producer-side service that publishes messages to the direct exchange.
 */
@Service
public class DirectSendMsgService {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Publishes {@code msg} to the exchange named
     * {@code Constants.DIRECT_EXCHANGE_NAME} using the supplied routing key.
     *
     * @param msg        payload handed to the template for conversion and sending
     * @param routingKey routing key passed through to the template unchanged
     */
    public void sendMsg(String msg, String routingKey) {
        final String exchange = Constants.DIRECT_EXCHANGE_NAME;
        this.rabbitTemplate.convertAndSend(exchange, routingKey, msg);
    }
}
消费者
/**
 * Consumers for the two queues bound to the direct exchange.
 */
@Configuration
public class DirectMsgListener {

    /**
     * Consumes from {@code Constants.QUEUE_DIRECT_EMAIL} and prints the message body.
     *
     * @param channel the channel the delivery arrived on (not used by this handler)
     * @param message the received message
     */
    @RabbitListener(queues = Constants.QUEUE_DIRECT_EMAIL)
    public void receiveQueue1Msg(Channel channel, Message message) {
        // Decode explicitly as UTF-8 rather than with the platform default charset.
        String body = new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8);
        System.out.println("receiveQueue1Msg ====>>> " + body);
    }

    /**
     * Consumes from {@code Constants.QUEUE_DIRECT_MESSAGE} and prints the message body.
     * Any exception raised while handling is printed and not rethrown.
     *
     * @param message the received message
     */
    @RabbitListener(queues = Constants.QUEUE_DIRECT_MESSAGE)
    public void receiveQueue2Msg(Message message) {
        try {
            // Decode explicitly as UTF-8 rather than with the platform default charset.
            String body = new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8);
            System.out.println("receiveQueue2Msg ====>>> " + body);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}