RabbitMQ高级特性
0. 学习目标
- 能够说出什么是消息中间件
- 能够安装RabbitMQ
- 能够编写RabbitMQ的入门程序
- 能够说出RabbitMQ的5种模式特征
- 能够使用Spring整合RabbitMQ
1. RabbitMQ高级--内容介绍
1.1、 消息的可靠投递
在使用 RabbitMQ 的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景。RabbitMQ 为我们提供了两种方式用来控制消息的投递可靠性模式。
- confirm 确认模式
- return 退回模式
rabbitmq 整个消息投递的路径为:
producer--->rabbitmq broker--->exchange--->queue--->consumer
- confi2rm 确认模式: 消息从 producer 到 exchange 则会返回一个 confirmCallback 。
- return 退回模式: 消息从 exchange-->queue 投递失败则会返回一个 returnCallback 。
我们将利用这两个 callback 控制消息的可靠性投递间。
1.1.1、消息的可靠投递 -- confirm 确认模式
具体代码和配置文件如下:
src/main/resources/rabbitmq.properties
rabbitmq.host=172.16.98.133
rabbitmq.port=5672
rabbitmq.username=guest
rabbitmq.password=guest
rabbitmq.virtual-host=/
src/main/resources/spring-rabbitmq-producer.xml
<?xml version="1.0" encoding="UTF-8"?>
测试使用:
com.itheima.test.ProducerTest
package com.itheima.test;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq-producer.xml")
public class ProducerTest {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 确认模式:
* 步骤:
* 1. 确认模式开启:ConnectionFactory中开启publisher-confirms="true"
* 2. 在rabbitTemplate定义ConfirmCallBack回调函数
*/
@Test
public void testConfirm() {
//2. 定义回调
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { // 此处是匿名内部类的形式
/**
*
* @param correlationData 相关配置信息 correlation 英 /?k?r??le??n/
* @param ack exchange交换机 是否成功收到了消息。true 成功,false代表失败
* @param cause 失败原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("confirm方法被执行了....");
if (ack) {
//接收成功
System.out.println("接收成功消息" + cause);
} else {
//接收失败
System.out.println("接收失败消息" + cause);
//做一些处理,让消息再次发送。
}
}
});
//3. 发送消息
rabbitTemplate.convertAndSend("test_exchange_confirm111", "confirm", "message confirm....");
}
1.1.2、消息的可靠投递 -- return 退回模式
其他和上边的一样,只有com.itheima.test.ProducerTest这个测试类中增加了代码如下
com.itheima.test.ProducerTest
/**
* 回退模式: 当消息发送给Exchange后,Exchange路由到Queue失败是 才会执行 ReturnCallBack
* 步骤:
* 1. 开启回退模式:publisher-returns="true"
* 2. 设置ReturnCallBack
* 3. 设置Exchange处理消息的模式://即: 设置交换机处理失败消息的模式 rabbitTemplate.setMandatory(true);
* 1. 如果消息没有路由到Queue,则丢弃消息(默认)
* 2. 如果消息没有路由到Queue,返回给消息发送方ReturnCallBack
*/
@Test
public void testReturn() {
//设置交换机处理失败消息的模式
rabbitTemplate.setMandatory(true); // TODO: 这一步少了的话即使发送消息失败也不会调用回调方法
//2.设置ReturnCallBack
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
/**
*
* @param message 发送的消息对象
* @param replyCode 返回的错误码
* @param replyText 错误信息
* @param exchange 交换机
* @param routingKey 路由键
*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println("return 执行了....");
System.out.println(message);
System.out.println(replyCode);
System.out.println(replyText);
System.out.println(exchange);
System.out.println(routingKey);
//处理
}
});
1.1.3、 消息的可靠投递小结
-
设置ConnectionFactory的publisher-confirms="true" 开启 确认模式。
-
使用rabbitTemplate.setConfirmCallback设置回调函数。当消息发送到exchange后回调confirm方法。在方法中判断ack,如果为true,则发送成功,如果为false,则发送失败,需要处理。
-
设置ConnectionFactory的publisher-returns="true" 开启 退回模式。
-
使用rabbitTemplate.setReturnCallback设置退回函数,当消息从exchange路由到queue失败后,如果设置了rabbitTemplate.setMandatory(true)参数,则会将消息退回给producer。并执行回调函数returnedMessage。
-
在RabbitMQ中也提供了事务机制(即发送失败会进行回滚等),但是性能较差,此处不做讲解。
使用channel下列方法,完成事务控制:
txSelect(), 用于将当前channel设置成transaction模式
txCommit(),用于提交事务
txRollback(),用于回滚事务
1.2、 Consumer Ack
ack指Acknowledge,确认。 表示消费端收到消息后的确认方式。
-
有三种确认方式:
- 自动确认:acknowledge="none"
- 手动确认:acknowledge="manual"
- 根据异常情况确认:acknowledge="auto",(这种方式使用麻烦,不作讲解)
其中自动确认是指,当消息一旦被Consumer接收到,则自动确认收到,并将相应 message 从 RabbitMQ 的消息缓存中移除。但是在实际业务处理中,很可能消息接收到,业务处理出现异常,那么该消息就会丢失。如果设置了手动确认方式,则需要在业务处理成功后,调用channel.basicAck(),手动签收,如果出现异常,则调用channel.basicNack()方法,让其自动重新发送消息。
此小节具体目录结构如下
1.2.1、具体步骤:
一、rabbitmq.properties
rabbitmq.host=172.16.98.133
rabbitmq.port=5672
rabbitmq.username=guest
rabbitmq.password=guest
rabbitmq.virtual-host=/
二、spring-rabbitmq-consumer.xml
<?xml version="1.0" encoding="UTF-8"?>
// acknowledge="manual" 设置手动确认 prefetch="1"
三、com.itheima.listener.AckListener
package com.itheima.listener;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* Consumer ACK机制:
* 1. 设置手动签收。acknowledge="manual"
* 2. 让监听器类实现ChannelAwareMessageListener接口(因为这个接口里有这个方法onMessage(Message message, Channel channel)带Channel)
* 3. 如果消息成功处理,则调用channel的 basicAck()签收
* 4. 如果消息处理失败,则调用channel的basicNack()拒绝签收,让broker重新发送给consumer
*
*
*/
@Component
public class AckListener implements ChannelAwareMessageListener {
// 这个onMessage(Message message, Channel channel)方法就会收到消息,并进行处理
@Override
public void onMessage(Message message, Channel channel) throws Exception {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
//1.接收转换消息
System.out.println(new String(message.getBody()));
//2. 处理业务逻辑
System.out.println("处理业务逻辑...");
int i = 3/0;//出现错误
//3. 手动签收
channel.basicAck(deliveryTag,true);
} catch (Exception e) {
//e.printStackTrace();
//4.拒绝签收
/*
第三个参数:requeue:重回队列。如果设置为true,则消息重新回到queue,broker会重新发送该消息给消费端
*/
channel.basicNack(deliveryTag,true,true);
//channel.basicReject(deliveryTag,true);
}
}
}
测试类com.itheima.test.ConsumerTest
package com.itheima.test;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq-consumer.xml")
public class ConsumerTest {
@Test
public void test(){
while (true){
// 这个死循环会一直监听消息
}
}
}
1.2.2、Consumer Ack 小结
-
在rabbit:listener-container标签中设置acknowledge属性,设置ack方式 none:自动确认,manual:手动确认
-
如果在消费端没有出现异常,则调用channel.basicAck(deliveryTag,false);方法确认签收消息
-
如果出现异常,则在catch中调用 basicNack或 basicReject,拒绝消息,让MQ重新发送消息。
其他可靠性机制
- 持久化
- exchange要持久化
- queue要持久化
- message要持久化
- 生产方确认Confirm
- 消费方确认Ack
- Broker高可用
myps:
- 上面那些则是手动确认的代码(包括下面xml文件中的这个配置之后Ack才会起作用)
- 下图是没有加入Ack确认机制的代码(直接就是自动确认消息) ;
市场上常见的消息队列有如下:
- ActiveMQ:基于JMS
- ZeroMQ:基于C语言开发
- RabbitMQ:基于AMQP协议,erlang语言开发,稳定性好
- RocketMQ:基于JMS,阿里巴巴产品
- Kafka:类似MQ的产品;分布式消息系统,高吞吐量
1.3、消费端限流
步骤一、新建com.itheima.listener.QosListener
Qos:服务质量保障
package com.itheima.listener;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;
/**
* Consumer 限流机制
* 实现步骤:
* 1. 确保ack机制为手动确认。
* 2.在listener-container中配置属性
* perfetch = 1,表示消费端每次从mq只拉取一条消息来消费,直到手动确认消费完毕后,才会继续拉去下一条消息。
*/
@Component
public class QosListener implements ChannelAwareMessageListener {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
Thread.sleep(1000);
//1.获取消息
System.out.println(new String(message.getBody()));
//2. 处理业务逻辑
//3. 签收
channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
}
}
步骤二、修改xml文件配置
增加属性: perfetch = 1,表示消费端每次从mq只拉取一条消息来消费,直到手动确认消费完毕后,才会继续拉去下一条消息。
1.3 消费端限流小结
- 在https://www.rabbitmq.com/clustering.html
首先确保RabbitMQ运行没有问题
[root@super ~]# rabbitmqctl status Status of node rabbit@super ... [{pid,10232}, {running_applications, [{rabbitmq_management,"RabbitMQ Management Console","3.6.5"}, {rabbitmq_web_dispatch,"RabbitMQ Web Dispatcher","3.6.5"}, {webmachine,"webmachine","1.10.3"}, {mochiweb,"MochiMedia Web Server","2.13.1"}, {rabbitmq_management_agent,"RabbitMQ Management Agent","3.6.5"}, {rabbit,"RabbitMQ","3.6.5"}, {os_mon,"CPO CXC 138 46","2.4"}, {syntax_tools,"Syntax tools","1.7"}, {inets,"INETS CXC 138 49","6.2"}, {amqp_client,"RabbitMQ AMQP Client","3.6.5"}, {rabbit_common,[],"3.6.5"}, {ssl,"Erlang/OTP SSL application","7.3"}, {public_key,"Public key infrastructure","1.1.1"}, {asn1,"The Erlang ASN1 compiler version 4.0.2","4.0.2"}, {ranch,"Socket acceptor pool for TCP protocols.","1.2.1"}, {mnesia,"MNESIA CXC 138 12","4.13.3"}, {compiler,"ERTS CXC 138 10","6.0.3"}, {crypto,"CRYPTO","3.6.3"}, {xmerl,"XML parser","1.3.10"}, {sasl,"SASL CXC 138 11","2.7"}, {stdlib,"ERTS CXC 138 10","2.8"}, {kernel,"ERTS CXC 138 10","4.2"}]}, {os,{unix,linux}}, {erlang_version, "Erlang/OTP 18 [erts-7.3] [source] [64-bit] [async-threads:64] [hipe] [kernel-poll:true]\n"}, {memory, [{total,56066752}, {connection_readers,0}, {connection_writers,0}, {connection_channels,0}, {connection_other,2680}, {queue_procs,268248}, {queue_slave_procs,0}, {plugins,1131936}, {other_proc,18144280}, {mnesia,125304}, {mgmt_db,921312}, {msg_index,69440}, {other_ets,1413664}, {binary,755736}, {code,27824046}, {atom,1000601}, {other_system,4409505}]}, {alarms,[]}, {listeners,[{clustering,25672,"::"},{amqp,5672,"::"}]}, {vm_memory_high_watermark,0.4}, {vm_memory_limit,411294105}, {disk_free_limit,50000000}, {disk_free,13270233088}, {file_descriptors, [{total_limit,924},{total_used,6},{sockets_limit,829},{sockets_used,0}]}, {processes,[{limit,1048576},{used,262}]}, {run_queue,0}, {uptime,43651}, {kernel,{net_ticktime,60}}]
停止rabbitmq服务
[root@super sbin]# service rabbitmq-server stop Stopping rabbitmq-server: rabbitmq-server.
启动第一个节点:
[root@super sbin]# RABBITMQ_NODE_PORT=5673 RABBITMQ_NODENAME=rabbit1 rabbitmq-server start RabbitMQ 3.6.5. Copyright (C) 2007-2016 Pivotal Software, Inc. ## ## Licensed under the MPL. See http://www.rabbitmq.com/ ## ## ########## Logs: /var/log/rabbitmq/rabbit1.log ###### ## /var/log/rabbitmq/rabbit1-sasl.log ########## Starting broker... completed with 6 plugins.
启动第二个节点:
web管理插件端口占用,所以还要指定其web插件占用的端口号。
[root@super ~]# RABBITMQ_NODE_PORT=5674 RABBITMQ_SERVER_START_ARGS="-rabbitmq_management listener [{port,15674}]" RABBITMQ_NODENAME=rabbit2 rabbitmq-server start RabbitMQ 3.6.5. Copyright (C) 2007-2016 Pivotal Software, Inc. ## ## Licensed under the MPL. See http://www.rabbitmq.com/ ## ## ########## Logs: /var/log/rabbitmq/rabbit2.log ###### ## /var/log/rabbitmq/rabbit2-sasl.log ########## Starting broker... completed with 6 plugins.
结束命令:
rabbitmqctl -n rabbit1 stop rabbitmqctl -n rabbit2 stop
rabbit1操作作为主节点:
[root@super ~]# rabbitmqctl -n rabbit1 stop_app Stopping node rabbit1@super ... [root@super ~]# rabbitmqctl -n rabbit1 reset Resetting node rabbit1@super ... [root@super ~]# rabbitmqctl -n rabbit1 start_app Starting node rabbit1@super ... [root@super ~]#
rabbit2操作为从节点:
[root@super ~]# rabbitmqctl -n rabbit2 stop_app Stopping node rabbit2@super ... [root@super ~]# rabbitmqctl -n rabbit2 reset Resetting node rabbit2@super ... [root@super ~]# rabbitmqctl -n rabbit2 join_cluster rabbit1@'super' ###''内是主机名换成自己的 Clustering node rabbit2@super with rabbit1@super ... [root@super ~]# rabbitmqctl -n rabbit2 start_app Starting node rabbit2@super ...
查看集群状态:
[root@super ~]# rabbitmqctl cluster_status -n rabbit1 Cluster status of node rabbit1@super ... [{nodes,[{disc,[rabbit1@super,rabbit2@super]}]}, {running_nodes,[rabbit2@super,rabbit1@super]}, {cluster_name,<<"rabbit1@super">>}, {partitions,[]}, {alarms,[{rabbit2@super,[]},{rabbit1@super,[]}]}]
web监控:
3.3 集群管理
rabbitmqctl join_cluster {cluster_node} [–ram]
将节点加入指定集群中。在这个命令执行前需要停止RabbitMQ应用并重置节点。rabbitmqctl cluster_status
显示集群的状态。rabbitmqctl change_cluster_node_type {disc|ram}
修改集群节点的类型。在这个命令执行前需要停止RabbitMQ应用。rabbitmqctl forget_cluster_node [–offline]
将节点从集群中删除,允许离线执行。rabbitmqctl update_cluster_nodes {clusternode}
在集群中的节点应用启动前咨询clusternode节点的最新信息,并更新相应的集群信息。这个和join_cluster不同,它不加入集群。考虑这样一种情况,节点A和节点B都在集群中,当节点A离线了,节点C又和节点B组成了一个集群,然后节点B又离开了集群,当A醒来的时候,它会尝试联系节点B,但是这样会失败,因为节点B已经不在集群中了。
rabbitmqctl cancel_sync_queue [-p vhost] {queue}
取消队列queue同步镜像的操作。rabbitmqctl set_cluster_name {name}
设置集群名称。集群名称在客户端连接时会通报给客户端。Federation和Shovel插件也会有用到集群名称的地方。集群名称默认是集群中第一个节点的名称,通过这个命令可以重新设置。3.4 RabbitMQ镜像集群配置
上面已经完成RabbitMQ默认集群模式,但并不保证队列的高可用性,尽管交换机、绑定这些可以复制到集群里的任何一个节点,但是队列内容不会复制。虽然该模式解决一项目组节点压力,但队列节点宕机直接导致该队列无法应用,只能等待重启,所以要想在队列节点宕机或故障也能正常应用,就要复制队列内容到集群里的每个节点,必须要创建镜像队列。
镜像队列是基于普通的集群模式的,然后再添加一些策略,所以你还是得先配置普通集群,然后才能设置镜像队列,我们就以上面的集群接着做。
设置的镜像队列可以通过开启的网页的管理端Admin->Policies,也可以通过命令。
rabbitmqctl set_policy my_ha "^" '{"ha-mode":"all"}'
- Name:策略名称
- Pattern:匹配的规则,如果是匹配所有的队列,是^.
- Definition:使用ha-mode模式中的all,也就是同步所有匹配的队列。问号链接帮助文档。
3.5 负载均衡-HAProxy
HAProxy提供高可用性、负载均衡以及基于TCP和HTTP应用的代理,支持虚拟主机,它是免费、快速并且可靠的一种解决方案,包括Twitter,Reddit,StackOverflow,GitHub在内的多家知名互联网公司在使用。HAProxy实现了一种事件驱动、单一进程模型,此模型支持非常大的并发连接数。
3.5.1 安装HAProxy
//下载依赖包 yum install gcc vim wget //上传haproxy源码包 //解压 tar -zxvf haproxy-1.6.5.tar.gz -C /usr/local //进入目录、进行编译、安装 cd /usr/local/haproxy-1.6.5 make TARGET=linux31 PREFIX=/usr/local/haproxy make install PREFIX=/usr/local/haproxy mkdir /etc/haproxy //赋权 groupadd -r -g 149 haproxy useradd -g haproxy -r -s /sbin/nologin -u 149 haproxy //创建haproxy配置文件 mkdir /etc/haproxy vim /etc/haproxy/haproxy.cfg
3.5.2 配置HAProxy
配置文件路径:/etc/haproxy/haproxy.cfg
#logging options global log 127.0.0.1 local0 info maxconn 5120 chroot /usr/local/haproxy uid 99 gid 99 daemon quiet nbproc 20 pidfile /var/run/haproxy.pid defaults log global mode tcp option tcplog option dontlognull retries 3 option redispatch maxconn 2000 contimeout 5s clitimeout 60s srvtimeout 15s #front-end IP for consumers and producters listen rabbitmq_cluster bind 0.0.0.0:5672 mode tcp #balance url_param userid #balance url_param session_id check_post 64 #balance hdr(User-Agent) #balance hdr(host) #balance hdr(Host) use_domain_only #balance rdp-cookie #balance leastconn #balance source //ip balance roundrobin server node1 127.0.0.1:5673 check inter 5000 rise 2 fall 2 server node2 127.0.0.1:5674 check inter 5000 rise 2 fall 2 listen stats bind 172.16.98.133:8100 mode http option httplog stats enable stats uri /rabbitmq-stats stats refresh 5s
启动HAproxy负载
/usr/local/haproxy/sbin/haproxy -f /etc/haproxy/haproxy.cfg //查看haproxy进程状态 ps -ef | grep haproxy 访问如下地址对mq节点进行监控 http://172.16.98.133:8100/rabbitmq-stats
代码中访问mq集群地址,则变为访问haproxy地址:5672