RocketMq的事务消息发送方法,消息零丢失的实现方式,代码流程讲解,干货分享
1.消息发送mq不丢失实现方式
import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.TransactionListener; import org.apache.rocketmq.client.producer.TransactionMQProducer; import org.apache.rocketmq.client.producer.TransactionSendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.common.RemotingHelper; import java.io.UnsupportedEncodingException; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; /** * @description RocketMQ事务消息发送 */ public class TransactionProducer { public static void main(String[] args) throws MQClientException, UnsupportedEncodingException { //用来接收RocketMQ回调的监听接口 //这里是我们自己定义的实现执行本地事务,commit.rollback,回调查询等逻辑 TransactionListener transactionListener = new TransactionListenerImpl(); //下面就是创建一个支持事务消息的Producer //对这个Producer指定一个生产者分组 TransactionMQProducer producer = new TransactionMQProducer("test"); //下面指定了一个线程池,里面包含一些线程 //这个线程就是用来处理RocketMQ的回调函数 ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor( 1,//核心线程数 2,//线程池最大线程数 1000,//超时时间 TimeUnit.SECONDS,//时间的单位 new ArrayBlockingQueue(2000),//存放阻塞线程的列表 new ThreadFactory() {//线程创建工厂 @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r); thread.setName("TestThread"); return thread; } } ); //给事务消息生产者设置对应的线程池,负责执行RocketMQ的回调请求 producer.setExecutorService(poolExecutor); //给生产者设置对应的回调函数 producer.setTransactionListener(transactionListener); //启动消息生产者 producer.start(); //虚拟一条成功的消息 Message message = new Message( "successTopic" , "tag" , "key", ("成功的消息").getBytes(RemotingHelper.DEFAULT_CHARSET)); //这里存放发送的half消息到内存或者磁盘文件中,后台开启一个线程,扫描这个文件,如果超过一定时间没有收到响应,就回滚业务 //save(message) //将消息作为half消息的模式发送出去,如果发送失败,则会收到一个异常,我们捕获异常进行对应的异常处理即可 try { TransactionSendResult sendResult = producer.sendMessageInTransaction(message, null); //这里可以更新或者删除存放在本地内存或者磁盘文件的消息记录 //delete(message) }catch (Exception e){ //half消息发送失败 //本地系统执行业务回滚,更新数据库信息等操作 } } }
上面是发送RocketMq的事务消息发送方法
下面是RocketMq的事务消息发送方法的回调函数的实现类
import org.apache.rocketmq.client.producer.LocalTransactionState; import org.apache.rocketmq.client.producer.TransactionListener; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageExt; /** * @description 事务监听实现类 */ public class TransactionListenerImpl implements TransactionListener { //如果half消息发送成功了就会回调这个函数,就可以执行本地事务 @Override public LocalTransactionState executeLocalTransaction(Message message, Object o) { //执行本地事务 //根据本地一连串的事务执行结果,去选择commit或者rollback try{ //如果本地事务都执行成功了,返回commit return LocalTransactionState.COMMIT_MESSAGE; }catch (Exception e){ //本地事务都失败了,回滚所有的执行过的操作 //返回rollback,标记half消息无效 return LocalTransactionState.ROLLBACK_MESSAGE; } } //如果因为各种原因生产者没有返回commit或者rollback给Broker, //broker会定时扫描没有回应的half消息,然后回调这个函数 @Override public LocalTransactionState checkLocalTransaction(MessageExt messageExt) { //查询本地事务,是否都执行成功 Integer status = 0; //Integer status = localTrans.get(messageExt.getTransactionId()); //根据本地事务情况选择执行commit或者rollback if (null != status){ switch(status){ case 0:return LocalTransactionState.UNKNOW; case 1:return LocalTransactionState.COMMIT_MESSAGE; case 2:return LocalTransactionState.ROLLBACK_MESSAGE; } } return LocalTransactionState.COMMIT_MESSAGE; } }
以上就是RocketMq的事务消息发送方法,可以实现发送消息的零丢失,以事务的方式确保消息一定可以发送到RocketMQ
2.mq消息不丢失实现方式
以上仅仅只能保证消息发送到mq成功,但是一定能保证消息不丢失吗?显然是不行的;
假设1:消息发送到了mq,就一定进入到了磁盘文件了吗?rocketmq是先存入os cache中,也就是内存,如果这个时候机器宕机,内存上的数据也就全部丢失了,显然消息也会丢失
解决方案:rocketmq默认是异步刷盘的模式,保证了数据的高吞吐量,但是有可能出现消息丢失的情况,可以改为同步刷盘策略,通过修改配置文件broker.config中flushDiskType参数为SYNC_FLUSH即可,这样,只要mq告诉我们half消息响应成功了,就代表成功写入了磁盘中了;
假设2:消息写入磁盘就一定不会丢失吗?显然也不能;如果磁盘损坏,那么消息也会丢失
解决方案:基于Dledger和Raft协议的rocketmq主从架构,只要消息写入master成功了,那么就一定会基于raft协议同步给其他的broker,就算master机器磁盘损坏,那么一定有broker存储了同样的消息,可以确保消息在mq上不会丢失;