MQ学习3
RAbbitMQ 3
0 两个问题
-
消费者如何让服务器知道消费者已经消费消息?
-
//消费消息 channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
-
第二个参数 autoAck(自动确认) 为true
-
当消费者消费消息之后,会向服务器发送一个回调,告诉服务器,已经消费消息
-
-
生产者如何确认生产者成功发送消息到服务器?(两种方式)
-
第一种方式:事务
通过AMQP事务机制实现,这也是AMQP协议层面提供的解决方案
-
第二种方式:确认模式
通过将channel设置成confirm模式来实现
-
1 事务
1.1 AMQP事务机制控制
RabbitMQ中与事务机制相关的方法:
1. txSelect():开启事务
2. txCommit():提交事务
3. txRollback():回滚事务
1.2 代码
-
使用try-catch-fianlly处理异常
Connection connection = null; Channel channel = null; //创建连接,创建信道 try { connection = factory.newConnection(); channel = connection.createChannel(); } catch(){} finally{}
-
catch 异常,回滚事务
catch (Exception e){ //回滚事务 channel.txRollback(); e.printStackTrace(); }
-
finally关闭资源(先关小的,再关大的)
finally { if(channel!=null){ channel.close(); } if (connection!=null){ connection.close(); } }
-
发消息前开启事务
//开启事务 channel.txSelect();
-
发消息后提交事务
//提交事务 channel.txCommit();
-
发消息后,提交事务前,手动抛出异常
int i = 2 / 0;
1.3 结果
- 未开启消费者时,生产者生产的消息会阻塞在队列
- 当有事务处理时,若出现异常,并未有消息阻塞在队列
- 当有事务处理时,若没有异常,消息正常阻塞在队列
1.4 其他
很少使用RabbitMQ的事务
- 因为RabbitMQ事务能力不强,不如关系型数据库的事务能力
- 会降低性能,因为事务是同步的,而RabbitMQ本身是异步的,如果使用事务会大幅度降低性能
但是事务确实能解决“生产者如何确认生产者成功发送消息到服务器?”的问题
- 如果事务提交成功,则成功发送消息
- 如果事务回滚,则消息发送失败
2 confirm确认模式(非常重要)
AMQP协议层面提供的事务机制解决了问题,但是会降低RabbitMQ的消息吞吐量。
有更好的方法既能保障producer知道消息已经正确送到,又能基本上不带来性能上的损失:
RabbitMQ提供了一个更好的方案,即将channel信道设置成confirm模式
2.1 原理
生产者将信道设置成confirm确认模式,一旦信道进入confirm模式,所有在该信道上面发布的消息都会被指派一个唯一的ID(ID从1开始)。一旦消息被投递到所有匹配的队列之后,broker(服务器)就会发送一个确认给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了,如果消息和队列是可持久化的,那么确认消息会将消息写入磁盘之后发出,broker(服务器)回传给生产者的确认消息中 deliver-tag域包含了确认消息的序列号,此外broker也可以设置basic.ark 的 multiple域(多条确认的域),表示这个序列号之前的所有消息都已经得到了处理。
confirm模式最大的好处在于它是异步的(事务带来的性能降低就是由于事务是同步但RabbitMQ是异步),一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用便可以通过回调方法来处理该确认消息。如果RabbitMQ因为自身内部错误导致消息丢失,就会发送一条nack消息,生产者应用程序同样可以在回调方法中处理该nack消息
在channel被设置成confirm模式之后,所有被publish的后续消息都将被confirm(即ack) 或者nack一次。但是没有对消息被confirm的快慢做任何保证,并且同一条消息不会既被confirm又被nack。
注意:两种事务控制形式不能同时开启
2.2 confirm机制三种方式
-
普通confirm模式:(同步确认,不推荐)
每发送一条消息后,调用waitForConfirms()方法,等待服务器端confirm。实际上是一种串行confirm了
-
批量confirm模式:(同步确认,不推荐)
每发送一批消息后,调用waitForConfirmsOrDie()方法,等待服务器端confirm。
-
异步confirm模式:(推荐)
提供一个回调方法,服务端confirm了一条或者多条消息后client端会回调这个方法
2.3 同步确认 sync
synchronization
2.3.1 代码(生产者代码)
依然是try-catch-finally
-
发送消息前启动确认模式
//启动确认模式 channel.confirmSelect();
-
发送消息后等待确认(单条)
//普通确认 if(channel.waitForConfirms()){ System.out.println("确认成功"); }
返回值为boolean:确认成功则true,失败则false
目前使用的是本地网络,没有特别大的网络波动,所以不存在确认不成功
-
发送消息后等待确认(批量)
//批量确认 channel.waitForConfirmsOrDie();
无返回值void
批量确认,只要有一条确认不成功直接抛异常
2.4 异步确认 async
异步确认模式的编程实现最复杂,Channel对象提供的 ConfirmListener() 回调方法只包含 deliveryTag(当前Channel发出的消息序号)(可以返回一条或多条) , 我们需要自己为每一个Channel维护一个uncofirm的消息序号集合,每publish一条数据,集合中元素加1,每回调一次 handleAck 方法,unconfirm集合删掉相应的一条 (multiple=false) 或 多条(multiple=true) 记录。
从运行效率上看,这个unconfirm集合最好采用有序集合SortedSet存储结构。
实际上, waitForConfirms() 方法也是通过SortedSet维护消息序号的
2.4.1 代码(生产者代码)
-
创建信道、连接前:维护信息发送回执deliveryTag
//维护信息发送回执deliveryTag final SortedSet
confirmSet = Collections.synchronizedSortedSet( new TreeSet ()); -
启动确认模式
//启动确认模式
channel.confirmSelect();
-
添加channel监听,在确认 (ack) 单/多条后进行处理或确认 (nack) 失败(未确认)单/多条后进行处理
channel.addConfirmListener(new ConfirmListener() { @Override public void handleAck(long deliveryTag, boolean multiple) throws IOException { //multiple=true 已确认多条 multiple=false 已确认单条 if(multiple){ System.out.println("handleAck--success-->multiple" +deliveryTag); //清除前deliveryTag 项标识Id confirmSet.headSet(deliveryTag+1L).clear(); }else { System.out.println("handleAck--success-->single" +deliveryTag); confirmSet.remove(deliveryTag); } } //未确认 @Override public void handleNack(long deliveryTag, boolean multiple) throws IOException { //multiple=true 未确认多条,multiple=false未确认单条 if(multiple){ System.out.println("handleNack--failed-->multiple" + deliveryTag); confirmSet.headSet(deliveryTag+1L).clear(); }else { System.out.println("handleNack--failed-->single" +deliveryTag); confirmSet.remove(deliveryTag); } } });
-
循环发送消息 演示消息确认
for(int i =0;i<5000;i++){ //创建消息 String message = "Hello World! async"; //获取unconfirm的消息序号deliveryTag Long seqNo = channel.getNextPublishSeqNo(); //发送消息 channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8)); //将消息序号deliveryTag 添加至sortedSet confirmSet.add(seqNo); }
-
在finally中关闭通道
try{ //关闭通道 if(null != channel && channel.isOpen()){ channel.close(); } if (null != connection && connection.isOpen()){ connection.close(); } }catch (TimeoutException e){ e.printStackTrace(); }catch (IOException e){ e.printStackTrace(); }