【RabbitMQ】Spring Boot Demo(九)


1、项目配置文件

pom文件
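<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.6.3</version>
		<relativePath/> <!-- lookup parent from repository -->
	</parent>
	<groupId>com.wchw</groupId>
	<artifactId>rabbitmq-demo2</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<name>rabbitmq-demo2</name>
	<description>rabbitmq demo2</description>
	<properties>
		<java.version>1.8</java.version>
	</properties>
	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-amqp</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupId>org.springframework.amqp</groupId>
			<artifactId>spring-rabbit-test</artifactId>
			<scope>test</scope>
		</dependency>
	</dependencies>
	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>
</project>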

application.yml
# HTTP port the embedded web server listens on.
server:
    port: 8080

# Connection settings for the RabbitMQ broker (spring.rabbitmq.* properties).
spring:
    rabbitmq:
        # Broker credentials. NOTE(review): sample values in plain text;
        # presumably real deployments should externalize these.
        username: admin
        password: wsxadmin
        # Virtual host on the broker to connect to.
        virtual-host: /
        # Broker host name and port.
        host: test.rabbitmq.com
        port: 5672

Fanout模式用例

生产者
/**
 * Declares the fanout exchange, the two queues that receive its messages,
 * and the bindings that attach each queue to the exchange.
 */
@Configuration
public class RabbitMqFanoutConfiguration {

    /**
     * Step 1: declare the fanout exchange.
     *
     * @return an exchange named {@code Constants.FANOUT_EXCHANGE_NAME},
     *         created with durable = true and autoDelete = false
     */
    @Bean
    public FanoutExchange fanoutExchange() {
        final boolean durable = true;
        final boolean autoDelete = false;
        return new FanoutExchange(Constants.FANOUT_EXCHANGE_NAME, durable, autoDelete);
    }

    /**
     * Step 2a: declare the first queue; the bean name equals the queue name.
     *
     * @return the queue named {@code Constants.FANOUT_QUEUE1_NAME}
     */
    @Bean(Constants.FANOUT_QUEUE1_NAME)
    public Queue queue1() {
        final String queueName = Constants.FANOUT_QUEUE1_NAME;
        return new Queue(queueName);
    }

    /**
     * Step 2b: declare the second queue; the bean name equals the queue name.
     *
     * @return the queue named {@code Constants.FANOUT_QUEUE2_NAME}
     */
    @Bean(Constants.FANOUT_QUEUE2_NAME)
    public Queue queue2() {
        final String queueName = Constants.FANOUT_QUEUE2_NAME;
        return new Queue(queueName);
    }

    /**
     * Step 3a: bind the first queue to the fanout exchange.
     *
     * @return the binding between {@link #queue1()} and {@link #fanoutExchange()}
     */
    @Bean("queue1Binding")
    public Binding queue1Binding() {
        final Queue source = queue1();
        final FanoutExchange target = fanoutExchange();
        return BindingBuilder.bind(source).to(target);
    }

    /**
     * Step 3b: bind the second queue to the fanout exchange.
     *
     * @return the binding between {@link #queue2()} and {@link #fanoutExchange()}
     */
    @Bean("queue2Binding")
    public Binding queue2Binding() {
        final Queue source = queue2();
        final FanoutExchange target = fanoutExchange();
        return BindingBuilder.bind(source).to(target);
    }

}
/**
 * Producer-side service that publishes messages to the fanout exchange.
 */
@Service
public class FanoutSendMsgService {

    /** Template used to publish to the broker; injected by Spring. */
    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Publishes {@code msg} to the exchange named
     * {@code Constants.FANOUT_EXCHANGE_NAME} with an empty routing key.
     *
     * @param msg the payload to publish
     */
    public void sendMsg(String msg) {
        final String exchange = Constants.FANOUT_EXCHANGE_NAME;
        final String routingKey = "";
        rabbitTemplate.convertAndSend(exchange, routingKey, msg);
    }
}
消费者
/**
 * Consumers for the two queues bound to the fanout exchange.
 *
 * <p>Queue 1 is consumed with manual acknowledgement; queue 2 uses the
 * listener container's default acknowledgement mode.
 */
@Configuration
public class FanoutMsgListener {

    /**
     * Consumes a message from queue 1 using manual acknowledgement.
     *
     * <p>On success the message is acked. If anything fails, a message that
     * has already been redelivered is rejected without requeueing, and a
     * first-time delivery is nacked with requeue so it is tried once more.
     *
     * @param channel the channel the message was delivered on, used to ack/nack/reject
     * @param message the delivered message
     */
    @RabbitListener(queues = Constants.FANOUT_QUEUE1_NAME, ackMode = "MANUAL")
    public void receiveQueue1Msg(Channel channel, Message message) {
        try {
            System.out.println("receiveQueue1Msg ====>>> " + decodeBody(message));
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            rejectOrRequeue(channel, message);
            e.printStackTrace();
        }
    }

    /**
     * Consumes a message from queue 2 and prints its body.
     *
     * @param message the delivered message
     */
    @RabbitListener(queues = Constants.FANOUT_QUEUE2_NAME)
    public void receiveQueue2Msg(Message message) {
        try {
            System.out.println("receiveQueue2Msg ====>>> " + decodeBody(message));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Failure path for manually acknowledged deliveries: rejects a message that
     * was already redelivered, otherwise nacks it with requeue = true.
     */
    private void rejectOrRequeue(Channel channel, Message message) {
        final long deliveryTag = message.getMessageProperties().getDeliveryTag();
        // Boolean.TRUE.equals avoids a NullPointerException from auto-unboxing
        // if the redelivered flag was never set on the message properties.
        final boolean redelivered =
                Boolean.TRUE.equals(message.getMessageProperties().getRedelivered());
        try {
            if (redelivered) {
                System.out.println("消息已重复处理失败,拒绝再次接收!");
                // requeue = false: do not put the message back on the queue;
                // it goes to the dead-letter queue if one is configured.
                channel.basicReject(deliveryTag, false);
            } else {
                System.out.println("消息即将再次返回队列处理!");
                // requeue = true: return the message to the queue for another attempt.
                channel.basicNack(deliveryTag, false, true);
            }
        } catch (IOException e1) {
            e1.printStackTrace();
        }
    }

    /**
     * Decodes the message body as UTF-8 text instead of relying on the
     * platform default charset, which varies between hosts.
     */
    private static String decodeBody(Message message) {
        return new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8);
    }
}

Direct 模式用例

生产者
@Configuration
public class RabbitMqDirectConfiguration {

    // 1、声明 Direct 交换机
    @Bean
    public DirectExchange directExchange() {
        return new DirectExchange(Constants.DIRECT_EXCHANGE_NAME, true, false);
    }
    // 2、声明队列

    @Bean(Constants.QUEUE_DIRECT_EMAIL)
    public Queue queue1() {
        return QueueBuilder.durable(Constants.QUEUE_DIRECT_EMAIL).build();
    }

    @Bean(Constants.QUEUE_DIRECT_MESSAGE)
    public Queue queue2() {
        return QueueBuilder.durable(Constants.QUEUE_DIRECT_MESSAGE).build();
    }

    // 3、绑定队列
    @Bean("queue1BindingDirect")
    public Binding queue1Binding() {
        return BindingBuilder.bind(queue1()).to(directExchange()).with("info");
    }

    @Bean("queue2BindingDirect")
    public Binding queue2Binding() {
        return BindingBuilder.bind(queue2()).to(directExchange()).with("error");
    }

}

/**
 * Producer-side service that publishes messages to the direct exchange.
 */
@Service
public class DirectSendMsgService {

    /** Template used to publish to the broker; injected by Spring. */
    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Publishes {@code msg} to the exchange named
     * {@code Constants.DIRECT_EXCHANGE_NAME} using the given routing key.
     *
     * @param msg        the payload to publish
     * @param routingKey the routing key passed to the exchange
     */
    public void sendMsg(String msg, String routingKey) {
        final String exchange = Constants.DIRECT_EXCHANGE_NAME;
        rabbitTemplate.convertAndSend(exchange, routingKey, msg);
    }
}

消费者


/**
 * Consumers for the two queues bound to the direct exchange.
 */
@Configuration
public class DirectMsgListener {

    /**
     * Consumes a message from the e-mail queue and prints its body.
     *
     * @param channel the channel the message was delivered on (not used here)
     * @param message the delivered message
     */
    @RabbitListener(queues = Constants.QUEUE_DIRECT_EMAIL)
    public void receiveQueue1Msg(Channel channel, Message message) {
        System.out.println("receiveQueue1Msg ====>>> " + decodeBody(message));
    }

    /**
     * Consumes a message from the message queue and prints its body.
     * Any exception is printed rather than propagated.
     *
     * @param message the delivered message
     */
    @RabbitListener(queues = Constants.QUEUE_DIRECT_MESSAGE)
    public void receiveQueue2Msg(Message message) {
        try {
            System.out.println("receiveQueue2Msg ====>>> " + decodeBody(message));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Decodes the message body as UTF-8 text instead of relying on the
     * platform default charset, which varies between hosts.
     */
    private static String decodeBody(Message message) {
        return new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8);
    }
}