# RabbitMQ


RabbitMQ

一、 介绍

应用场景

异步处理

?

特点

? 一般消息队列产品都有上边这两种模式

二、RabbitMQ概念

RabbitMQ简介:
RabbitMQ是一个由erlang开发的AMQP(Advanved Message Queue Protocol)的开源实现。

核心概念

Message

消息,消息是不具名的,它由消息头和消息体组成(类似于http请求头;头是对消息的参数和设置,体则是真正的数据内容)。消息体是不透明的,而消息头则由一系列的可选属性组成, 这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可 能需要持久性存储)等。

Publisher

消息的生产者,也是一个向交换器发布消息的客户端应用程序。

Exchange

交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。
Exchange有4种类型:direct(默认),fanout, topic, 和headers,不同类型的Exchange转发消息的策略有所区别

Queue

消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直 在队列里面,等待消费者连接到这个队列将其取走。

Binding

绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交 换器理解成一个由绑定构成的路由表。
Exchange 和Queue的绑定可以是多对多的关系。

Connection

网络连接,比如一个TCP连接。(是长连接,内有多个信道)

Channel

信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内的虚拟连接,AMQP 命令都是通过信道 发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都 是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。

Consumer

消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。

Virtual Host

虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加 密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥 有自己的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在连接时 指定,RabbitMQ 默认的 vhost 是 / 。

Broker

表示消息队列服务器实体

五、RabbitMQ整合

  1. 引入 spring-boot-starter-amqp
  2. application.yml配置
  3. 测试RabbitMQ

    ? AmqpAdmin:管理组件
    ? RabbitTemplate:消息发送处理组件
    ? @RabbitListener 监听消息的方法可以有三种参数(不分数量,顺序)
    ? Object content, Message message, Channel channel

package com.atguigu.gulimall.order;

import com.atguigu.gulimall.order.entity.OrderEntity;
import com.atguigu.gulimall.order.entity.OrderItemEntity;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import javax.annotation.Resource;
import java.util.Date;
import java.util.UUID;

@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
public class GulimallOrderApplicationTests {

    @Test
    public void contextLoads() {
    }
    @Resource
    private AmqpAdmin amqpAdmin;

    @Resource
    private RabbitTemplate rabbitTemplate;

    @Value("${myRabbitmq.queue}")
    private String queue;

    @Value("${myRabbitmq.exchange}")
    private String exchange;

    @Value("${myRabbitmq.routeKey}")
    private String routeKey;

    /**
     * 发送的消息是一个对象 必须实现序列化
     */
    @Test
    public void sendMessageTest(){
        OrderEntity entity = new OrderEntity();
        entity.setId(1L);
        entity.setCommentTime(new Date());
        entity.setCreateTime(new Date());
        entity.setConfirmStatus(0);
        entity.setAutoConfirmDay(1);
        entity.setGrowth(1);
        entity.setMemberId(12L);

        OrderItemEntity orderEntity = new OrderItemEntity();
        orderEntity.setCategoryId(225L);
        orderEntity.setId(1L);
        orderEntity.setOrderSn("mall");
        orderEntity.setSpuName("华为");
        for (int i = 0; i < 10; i++) {
            if(i % 2 == 0){
                entity.setReceiverName("FIRE-" + i);
                rabbitTemplate.convertAndSend(this.exchange, this.routeKey, entity, new CorrelationData(UUID.randomUUID().toString().replace("-","")));
            }else {
                orderEntity.setOrderSn("mall-" + i);
                rabbitTemplate.convertAndSend(this.exchange, this.routeKey, orderEntity, new CorrelationData(UUID.randomUUID().toString().replace("-","")));
            }
            log.info("\n路由键:" + this.routeKey + "的消息发送成功");
        }
    }

    /**
     * 		  目的地					目的地类型				交换机				路由键
     * String destination, DestinationType destinationType, String exchange, String routingKey,
     *                        @Nullable Map arguments 自定义参数
     * 
     */

(3)    @Test
    public void bindIng() {

        Binding binding = new Binding(this.queue, 
                                      Binding.DestinationType.QUEUE, 
                                      this.exchange, this.routeKey, 
                                      null);
        amqpAdmin.declareBinding(binding);
        log.info("\n[" + binding.getExchange() + "] 与 [" + binding.getDestination() + "] 绑定成功");
    }

(2)    @Test
    public void createQueue() {

        // 参数: 持久化:true  是否排他:false 是否自动删除:false
        Queue queue = new Queue(this.queue, true, false, false);

        amqpAdmin.declareQueue(queue);
        log.info("\nQueue [" + queue.getName() + "] 创建成功");
    }

    /**
     * 1、使用amqpAdmin创建Exchange Queue Binding
     * 2、发送消息
     */
(1)    @Test
    public void createExchange() {
        // 1。1  参数1 交换机名字 参数2 是否持久化 参数4 可以指定一些参数
        DirectExchange exchange = new DirectExchange("hello-java-exchange", true, false);
        //DirectExchange exchange = new DirectExchange(this.exchange, true, false);
        // 1、声明一个交换机
        amqpAdmin.declareExchange(exchange);
        log.info("\nExchange [" + exchange.getName() + "] 创建成功");
    }

}

发送消息

如果是默认的话,是使用序列化转换器,我们也可以使用json转换器

image-20220222231630361

上边的是默认转换器

像下边这样写一个配置类,在类中放一个json转换器到容器中即可

相关