RabbitMQ高级特性


0. 学习目标

  • 能够说出什么是消息中间件
  • 能够安装RabbitMQ
  • 能够编写RabbitMQ的入门程序
  • 能够说出RabbitMQ的5种模式特征
  • 能够使用Spring整合RabbitMQ

1. RabbitMQ高级--内容介绍

1.1、 消息的可靠投递

在使用 RabbitMQ 的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景。RabbitMQ 为我们提供了两种方式用来控制消息的投递可靠性模式。

  • confirm 确认模式
  • return 退回模式

rabbitmq 整个消息投递的路径为
producer--->rabbitmq broker--->exchange--->queue--->consumer

  • confi2rm 确认模式: 消息从 producer 到 exchange 则会返回一个 confirmCallback 。
  • return 退回模式: 消息从 exchange-->queue 投递失败则会返回一个 returnCallback 。

我们将利用这两个 callback 控制消息的可靠性投递间。

1.1.1、消息的可靠投递 -- confirm 确认模式

具体代码和配置文件如下:

src/main/resources/rabbitmq.properties
rabbitmq.host=172.16.98.133
rabbitmq.port=5672
rabbitmq.username=guest
rabbitmq.password=guest
rabbitmq.virtual-host=/
src/main/resources/spring-rabbitmq-producer.xml
<?xml version="1.0" encoding="UTF-8"?>

    
    

    
    
    
    

    
    



    
    
    
    
        
            
        
    


测试使用:

com.itheima.test.ProducerTest
package com.itheima.test;


import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq-producer.xml")
public class ProducerTest {

    @Autowired
    private RabbitTemplate rabbitTemplate;


    /**
     * 确认模式:
     * 步骤:
     * 1. 确认模式开启:ConnectionFactory中开启publisher-confirms="true"
     * 2. 在rabbitTemplate定义ConfirmCallBack回调函数
     */
    @Test
    public void testConfirm() {

        //2. 定义回调
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { // 此处是匿名内部类的形式
            /**
             *
             * @param correlationData 相关配置信息 correlation 英 /?k?r??le??n/ 
             * @param ack   exchange交换机 是否成功收到了消息。true 成功,false代表失败
             * @param cause 失败原因
             */
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                System.out.println("confirm方法被执行了....");

                if (ack) {
                    //接收成功
                    System.out.println("接收成功消息" + cause);
                } else {
                    //接收失败
                    System.out.println("接收失败消息" + cause);
                    //做一些处理,让消息再次发送。
                }
            }
        });

        //3. 发送消息
        rabbitTemplate.convertAndSend("test_exchange_confirm111", "confirm", "message confirm....");
    }

1.1.2、消息的可靠投递 -- return 退回模式

其他和上边的一样,只有com.itheima.test.ProducerTest这个测试类中增加了代码如下

com.itheima.test.ProducerTest

    /**
     * 回退模式: 当消息发送给Exchange后,Exchange路由到Queue失败是 才会执行 ReturnCallBack
     * 步骤:
     * 1. 开启回退模式:publisher-returns="true"
     * 2. 设置ReturnCallBack
     * 3. 设置Exchange处理消息的模式://即: 设置交换机处理失败消息的模式 rabbitTemplate.setMandatory(true);
     *      1. 如果消息没有路由到Queue,则丢弃消息(默认)
     *      2. 如果消息没有路由到Queue,返回给消息发送方ReturnCallBack
     */

    @Test
    public void testReturn() {

        //设置交换机处理失败消息的模式 
        rabbitTemplate.setMandatory(true); // TODO: 这一步少了的话即使发送消息失败也不会调用回调方法

        //2.设置ReturnCallBack
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            /**
             *
             * @param message   发送的消息对象
             * @param replyCode 返回的错误码
             * @param replyText 错误信息
             * @param exchange  交换机
             * @param routingKey 路由键
             */
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                System.out.println("return 执行了....");

                System.out.println(message);
                System.out.println(replyCode);
                System.out.println(replyText);
                System.out.println(exchange);
                System.out.println(routingKey);

                //处理
            }
        });


1.1.3、 消息的可靠投递小结

  • 设置ConnectionFactory的publisher-confirms="true" 开启 确认模式。

  • 使用rabbitTemplate.setConfirmCallback设置回调函数。当消息发送到exchange后回调confirm方法。在方法中判断ack,如果为true,则发送成功,如果为false,则发送失败,需要处理。

  • 设置ConnectionFactory的publisher-returns="true" 开启 退回模式。

  • 使用rabbitTemplate.setReturnCallback设置退回函数,当消息从exchange路由到queue失败后,如果设置了rabbitTemplate.setMandatory(true)参数,则会将消息退回给producer。并执行回调函数returnedMessage。

  • 在RabbitMQ中也提供了事务机制(即发送失败会进行回滚等),但是性能较差,此处不做讲解。
    使用channel下列方法,完成事务控制:
    txSelect(), 用于将当前channel设置成transaction模式
    txCommit(),用于提交事务
    txRollback(),用于回滚事务


1.2、 Consumer Ack

ack指Acknowledge,确认。 表示消费端收到消息后的确认方式。

  • 有三种确认方式:

    • 自动确认:acknowledge="none"
    • 手动确认:acknowledge="manual"
    • 根据异常情况确认:acknowledge="auto",(这种方式使用麻烦,不作讲解)

    其中自动确认是指,当消息一旦被Consumer接收到,则自动确认收到,并将相应 message 从 RabbitMQ 的消息缓存中移除。但是在实际业务处理中,很可能消息接收到,业务处理出现异常,那么该消息就会丢失。如果设置了手动确认方式,则需要在业务处理成功后,调用channel.basicAck(),手动签收,如果出现异常,则调用channel.basicNack()方法,让其自动重新发送消息。

此小节具体目录结构如下

1.2.1、具体步骤:

一、rabbitmq.properties

rabbitmq.host=172.16.98.133
rabbitmq.port=5672
rabbitmq.username=guest
rabbitmq.password=guest
rabbitmq.virtual-host=/

二、spring-rabbitmq-consumer.xml

<?xml version="1.0" encoding="UTF-8"?>

    
    

    
    


    

     //  acknowledge="manual" 设置手动确认    prefetch="1" 
    
   		
    



三、com.itheima.listener.AckListener

package com.itheima.listener;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
 * Consumer ACK机制:
 *  1. 设置手动签收。acknowledge="manual"
 *  2. 让监听器类实现ChannelAwareMessageListener接口(因为这个接口里有这个方法onMessage(Message message, Channel channel)带Channel)
 *  3. 如果消息成功处理,则调用channel的 basicAck()签收
 *  4. 如果消息处理失败,则调用channel的basicNack()拒绝签收,让broker重新发送给consumer
 *
 *
 */

@Component
public class AckListener implements ChannelAwareMessageListener {

    // 这个onMessage(Message message, Channel channel)方法就会收到消息,并进行处理
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();

        try {
            //1.接收转换消息
            System.out.println(new String(message.getBody()));

            //2. 处理业务逻辑
            System.out.println("处理业务逻辑...");
            int i = 3/0;//出现错误
            //3. 手动签收
            channel.basicAck(deliveryTag,true);
        } catch (Exception e) {
            //e.printStackTrace();

            //4.拒绝签收
            /*
            第三个参数:requeue:重回队列。如果设置为true,则消息重新回到queue,broker会重新发送该消息给消费端
             */
            channel.basicNack(deliveryTag,true,true);
            //channel.basicReject(deliveryTag,true);
        }
    }
}

测试类com.itheima.test.ConsumerTest

package com.itheima.test;


import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq-consumer.xml")
public class ConsumerTest {

    @Test
    public void test(){
        while (true){
			// 这个死循环会一直监听消息
        }
    }
}

1.2.2、Consumer Ack 小结

  • 在rabbit:listener-container标签中设置acknowledge属性,设置ack方式 none:自动确认,manual:手动确认

  • 如果在消费端没有出现异常,则调用channel.basicAck(deliveryTag,false);方法确认签收消息

  • 如果出现异常,则在catch中调用 basicNack或 basicReject,拒绝消息,让MQ重新发送消息。

其他可靠性机制

  • 持久化
    • exchange要持久化
    • queue要持久化
    • message要持久化
  • 生产方确认Confirm
  • 消费方确认Ack
  • Broker高可用

myps:
  • 上面那些则是手动确认的代码(包括下面xml文件中的这个配置之后Ack才会起作用)

  • 下图是没有加入Ack确认机制的代码(直接就是自动确认消息) ;

市场上常见的消息队列有如下:

  • ActiveMQ:基于JMS
  • ZeroMQ:基于C语言开发
  • RabbitMQ:基于AMQP协议,erlang语言开发,稳定性好
  • RocketMQ:基于JMS,阿里巴巴产品
  • Kafka:类似MQ的产品;分布式消息系统,高吞吐量

1.3、消费端限流

步骤一、新建com.itheima.listener.QosListener

Qos:服务质量保障

package com.itheima.listener;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;

/**
 * Consumer 限流机制
 * 实现步骤:
 *  1. 确保ack机制为手动确认。
 *  2.在listener-container中配置属性
 *      perfetch = 1,表示消费端每次从mq只拉取一条消息来消费,直到手动确认消费完毕后,才会继续拉去下一条消息。
 */

@Component
public class QosListener implements ChannelAwareMessageListener {

    @Override
    public void onMessage(Message message, Channel channel) throws Exception {

        Thread.sleep(1000);
        //1.获取消息
        System.out.println(new String(message.getBody()));

        //2. 处理业务逻辑

        //3. 签收
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);

    }
}

步骤二、修改xml文件配置

增加属性: perfetch = 1,表示消费端每次从mq只拉取一条消息来消费,直到手动确认消费完毕后,才会继续拉去下一条消息。

    
    

1.3 消费端限流小结

  • 在https://www.rabbitmq.com/clustering.html

    首先确保RabbitMQ运行没有问题

    [root@super ~]# rabbitmqctl status
    Status of node rabbit@super ...
    [{pid,10232},
     {running_applications,
         [{rabbitmq_management,"RabbitMQ Management Console","3.6.5"},
          {rabbitmq_web_dispatch,"RabbitMQ Web Dispatcher","3.6.5"},
          {webmachine,"webmachine","1.10.3"},
          {mochiweb,"MochiMedia Web Server","2.13.1"},
          {rabbitmq_management_agent,"RabbitMQ Management Agent","3.6.5"},
          {rabbit,"RabbitMQ","3.6.5"},
          {os_mon,"CPO  CXC 138 46","2.4"},
          {syntax_tools,"Syntax tools","1.7"},
          {inets,"INETS  CXC 138 49","6.2"},
          {amqp_client,"RabbitMQ AMQP Client","3.6.5"},
          {rabbit_common,[],"3.6.5"},
          {ssl,"Erlang/OTP SSL application","7.3"},
          {public_key,"Public key infrastructure","1.1.1"},
          {asn1,"The Erlang ASN1 compiler version 4.0.2","4.0.2"},
          {ranch,"Socket acceptor pool for TCP protocols.","1.2.1"},
          {mnesia,"MNESIA  CXC 138 12","4.13.3"},
          {compiler,"ERTS  CXC 138 10","6.0.3"},
          {crypto,"CRYPTO","3.6.3"},
          {xmerl,"XML parser","1.3.10"},
          {sasl,"SASL  CXC 138 11","2.7"},
          {stdlib,"ERTS  CXC 138 10","2.8"},
          {kernel,"ERTS  CXC 138 10","4.2"}]},
     {os,{unix,linux}},
     {erlang_version,
         "Erlang/OTP 18 [erts-7.3] [source] [64-bit] [async-threads:64] [hipe] [kernel-poll:true]\n"},
     {memory,
         [{total,56066752},
          {connection_readers,0},
          {connection_writers,0},
          {connection_channels,0},
          {connection_other,2680},
          {queue_procs,268248},
          {queue_slave_procs,0},
          {plugins,1131936},
          {other_proc,18144280},
          {mnesia,125304},
          {mgmt_db,921312},
          {msg_index,69440},
          {other_ets,1413664},
          {binary,755736},
          {code,27824046},
          {atom,1000601},
          {other_system,4409505}]},
     {alarms,[]},
     {listeners,[{clustering,25672,"::"},{amqp,5672,"::"}]},
     {vm_memory_high_watermark,0.4},
     {vm_memory_limit,411294105},
     {disk_free_limit,50000000},
     {disk_free,13270233088},
     {file_descriptors,
         [{total_limit,924},{total_used,6},{sockets_limit,829},{sockets_used,0}]},
     {processes,[{limit,1048576},{used,262}]},
     {run_queue,0},
     {uptime,43651},
     {kernel,{net_ticktime,60}}]
    

    停止rabbitmq服务

    [root@super sbin]# service rabbitmq-server stop
    Stopping rabbitmq-server: rabbitmq-server.
    
    

    启动第一个节点:

    [root@super sbin]# RABBITMQ_NODE_PORT=5673 RABBITMQ_NODENAME=rabbit1 rabbitmq-server start
    
                  RabbitMQ 3.6.5. Copyright (C) 2007-2016 Pivotal Software, Inc.
      ##  ##      Licensed under the MPL.  See http://www.rabbitmq.com/
      ##  ##
      ##########  Logs: /var/log/rabbitmq/rabbit1.log
      ######  ##        /var/log/rabbitmq/rabbit1-sasl.log
      ##########
                  Starting broker...
     completed with 6 plugins.
    

    启动第二个节点:

    web管理插件端口占用,所以还要指定其web插件占用的端口号。

    [root@super ~]# RABBITMQ_NODE_PORT=5674 RABBITMQ_SERVER_START_ARGS="-rabbitmq_management listener [{port,15674}]" RABBITMQ_NODENAME=rabbit2 rabbitmq-server start
    
                  RabbitMQ 3.6.5. Copyright (C) 2007-2016 Pivotal Software, Inc.
      ##  ##      Licensed under the MPL.  See http://www.rabbitmq.com/
      ##  ##
      ##########  Logs: /var/log/rabbitmq/rabbit2.log
      ######  ##        /var/log/rabbitmq/rabbit2-sasl.log
      ##########
                  Starting broker...
     completed with 6 plugins.
    
    

    结束命令:

    rabbitmqctl -n rabbit1 stop
    rabbitmqctl -n rabbit2 stop
    

    rabbit1操作作为主节点:

    [root@super ~]# rabbitmqctl -n rabbit1 stop_app  
    Stopping node rabbit1@super ...
    [root@super ~]# rabbitmqctl -n rabbit1 reset	 
    Resetting node rabbit1@super ...
    [root@super ~]# rabbitmqctl -n rabbit1 start_app
    Starting node rabbit1@super ...
    [root@super ~]# 
    

    rabbit2操作为从节点:

    [root@super ~]# rabbitmqctl -n rabbit2 stop_app
    Stopping node rabbit2@super ...
    [root@super ~]# rabbitmqctl -n rabbit2 reset
    Resetting node rabbit2@super ...
    [root@super ~]# rabbitmqctl -n rabbit2 join_cluster rabbit1@'super' ###''内是主机名换成自己的
    Clustering node rabbit2@super with rabbit1@super ...
    [root@super ~]# rabbitmqctl -n rabbit2 start_app
    Starting node rabbit2@super ...
    
    

    查看集群状态:

    [root@super ~]# rabbitmqctl cluster_status -n rabbit1
    Cluster status of node rabbit1@super ...
    [{nodes,[{disc,[rabbit1@super,rabbit2@super]}]},
     {running_nodes,[rabbit2@super,rabbit1@super]},
     {cluster_name,<<"rabbit1@super">>},
     {partitions,[]},
     {alarms,[{rabbit2@super,[]},{rabbit1@super,[]}]}]
    

    web监控:

    1566065096459

    3.3 集群管理

    rabbitmqctl join_cluster {cluster_node} [–ram]
    将节点加入指定集群中。在这个命令执行前需要停止RabbitMQ应用并重置节点。

    rabbitmqctl cluster_status
    显示集群的状态。

    rabbitmqctl change_cluster_node_type {disc|ram}
    修改集群节点的类型。在这个命令执行前需要停止RabbitMQ应用。

    rabbitmqctl forget_cluster_node [–offline]
    将节点从集群中删除,允许离线执行。

    rabbitmqctl update_cluster_nodes {clusternode}

    在集群中的节点应用启动前咨询clusternode节点的最新信息,并更新相应的集群信息。这个和join_cluster不同,它不加入集群。考虑这样一种情况,节点A和节点B都在集群中,当节点A离线了,节点C又和节点B组成了一个集群,然后节点B又离开了集群,当A醒来的时候,它会尝试联系节点B,但是这样会失败,因为节点B已经不在集群中了。

    rabbitmqctl cancel_sync_queue [-p vhost] {queue}
    取消队列queue同步镜像的操作。

    rabbitmqctl set_cluster_name {name}
    设置集群名称。集群名称在客户端连接时会通报给客户端。Federation和Shovel插件也会有用到集群名称的地方。集群名称默认是集群中第一个节点的名称,通过这个命令可以重新设置。

    3.4 RabbitMQ镜像集群配置

    上面已经完成RabbitMQ默认集群模式,但并不保证队列的高可用性,尽管交换机、绑定这些可以复制到集群里的任何一个节点,但是队列内容不会复制。虽然该模式解决一项目组节点压力,但队列节点宕机直接导致该队列无法应用,只能等待重启,所以要想在队列节点宕机或故障也能正常应用,就要复制队列内容到集群里的每个节点,必须要创建镜像队列。

    镜像队列是基于普通的集群模式的,然后再添加一些策略,所以你还是得先配置普通集群,然后才能设置镜像队列,我们就以上面的集群接着做。

    设置的镜像队列可以通过开启的网页的管理端Admin->Policies,也可以通过命令。

    rabbitmqctl set_policy my_ha "^" '{"ha-mode":"all"}'

    1566072300852

    • Name:策略名称
    • Pattern:匹配的规则,如果是匹配所有的队列,是^.
    • Definition:使用ha-mode模式中的all,也就是同步所有匹配的队列。问号链接帮助文档。

    3.5 负载均衡-HAProxy

    HAProxy提供高可用性、负载均衡以及基于TCP和HTTP应用的代理,支持虚拟主机,它是免费、快速并且可靠的一种解决方案,包括Twitter,Reddit,StackOverflow,GitHub在内的多家知名互联网公司在使用。HAProxy实现了一种事件驱动、单一进程模型,此模型支持非常大的并发连接数。

    3.5.1 安装HAProxy
    //下载依赖包
    yum install gcc vim wget
    //上传haproxy源码包
    //解压
    tar -zxvf haproxy-1.6.5.tar.gz -C /usr/local
    //进入目录、进行编译、安装
    cd /usr/local/haproxy-1.6.5
    make TARGET=linux31 PREFIX=/usr/local/haproxy
    make install PREFIX=/usr/local/haproxy
    mkdir /etc/haproxy
    //赋权
    groupadd -r -g 149 haproxy
    useradd -g haproxy -r -s /sbin/nologin -u 149 haproxy
    //创建haproxy配置文件
    mkdir /etc/haproxy
    vim /etc/haproxy/haproxy.cfg
    
    3.5.2 配置HAProxy

    配置文件路径:/etc/haproxy/haproxy.cfg

    #logging options
    global
    	log 127.0.0.1 local0 info
    	maxconn 5120
    	chroot /usr/local/haproxy
    	uid 99
    	gid 99
    	daemon
    	quiet
    	nbproc 20
    	pidfile /var/run/haproxy.pid
    
    defaults
    	log global
    	
    	mode tcp
    
    	option tcplog
    	option dontlognull
    	retries 3
    	option redispatch
    	maxconn 2000
    	contimeout 5s
       
         clitimeout 60s
    
         srvtimeout 15s	
    #front-end IP for consumers and producters
    
    listen rabbitmq_cluster
    	bind 0.0.0.0:5672
    	
    	mode tcp
    	#balance url_param userid
    	#balance url_param session_id check_post 64
    	#balance hdr(User-Agent)
    	#balance hdr(host)
    	#balance hdr(Host) use_domain_only
    	#balance rdp-cookie
    	#balance leastconn
    	#balance source //ip
    	
    	balance roundrobin
    	
            server node1 127.0.0.1:5673 check inter 5000 rise 2 fall 2
            server node2 127.0.0.1:5674 check inter 5000 rise 2 fall 2
    
    listen stats
    	bind 172.16.98.133:8100
    	mode http
    	option httplog
    	stats enable
    	stats uri /rabbitmq-stats
    	stats refresh 5s
    

    启动HAproxy负载

    /usr/local/haproxy/sbin/haproxy -f /etc/haproxy/haproxy.cfg
    //查看haproxy进程状态
    ps -ef | grep haproxy
    
    访问如下地址对mq节点进行监控
    http://172.16.98.133:8100/rabbitmq-stats
    

    代码中访问mq集群地址,则变为访问haproxy地址:5672

相关