RocketMq的事务消息发送方法,消息零丢失的实现方式,代码流程讲解,干货分享


1.消息发送mq不丢失实现方式
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

import java.io.UnsupportedEncodingException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * @description RocketMQ事务消息发送
 */
public class TransactionProducer {
    public static void main(String[] args) throws MQClientException, UnsupportedEncodingException {
        //用来接收RocketMQ回调的监听接口
        //这里是我们自己定义的实现执行本地事务,commit.rollback,回调查询等逻辑
        TransactionListener transactionListener = new TransactionListenerImpl();
        //下面就是创建一个支持事务消息的Producer
        //对这个Producer指定一个生产者分组
        TransactionMQProducer producer = new TransactionMQProducer("test");

        //下面指定了一个线程池,里面包含一些线程
        //这个线程就是用来处理RocketMQ的回调函数
        ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(
                1,//核心线程数
                2,//线程池最大线程数
                1000,//超时时间
                TimeUnit.SECONDS,//时间的单位
                new ArrayBlockingQueue(2000),//存放阻塞线程的列表
                new ThreadFactory() {//线程创建工厂
                    @Override
                    public Thread newThread(Runnable r) {
                        Thread thread = new Thread(r);
                        thread.setName("TestThread");
                        return thread;
                    }
                }
        );
        //给事务消息生产者设置对应的线程池,负责执行RocketMQ的回调请求
        producer.setExecutorService(poolExecutor);
        //给生产者设置对应的回调函数
        producer.setTransactionListener(transactionListener);
        //启动消息生产者
        producer.start();
        //虚拟一条成功的消息
        Message message = new Message(
                "successTopic"
                , "tag"
                , "key",
                ("成功的消息").getBytes(RemotingHelper.DEFAULT_CHARSET));
        //这里存放发送的half消息到内存或者磁盘文件中,后台开启一个线程,扫描这个文件,如果超过一定时间没有收到响应,就回滚业务
        //save(message)
        //将消息作为half消息的模式发送出去,如果发送失败,则会收到一个异常,我们捕获异常进行对应的异常处理即可
        try {
            TransactionSendResult sendResult = producer.sendMessageInTransaction(message, null);
            //这里可以更新或者删除存放在本地内存或者磁盘文件的消息记录
            //delete(message)
        }catch (Exception e){
            //half消息发送失败
            //本地系统执行业务回滚,更新数据库信息等操作
        }
    }
}

上面是发送RocketMq的事务消息发送方法

下面是RocketMq的事务消息发送方法的回调函数的实现类

import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;

/**
 * @description 事务监听实现类
 */
public class TransactionListenerImpl implements TransactionListener {

    //如果half消息发送成功了就会回调这个函数,就可以执行本地事务
    @Override
    public LocalTransactionState executeLocalTransaction(Message message, Object o) {
        //执行本地事务
        //根据本地一连串的事务执行结果,去选择commit或者rollback
        try{
            //如果本地事务都执行成功了,返回commit
            return LocalTransactionState.COMMIT_MESSAGE;
        }catch (Exception e){
            //本地事务都失败了,回滚所有的执行过的操作
            //返回rollback,标记half消息无效
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
    }

    //如果因为各种原因生产者没有返回commit或者rollback给Broker,
    //broker会定时扫描没有回应的half消息,然后回调这个函数
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
        //查询本地事务,是否都执行成功
        Integer status = 0;
        //Integer status = localTrans.get(messageExt.getTransactionId());
        //根据本地事务情况选择执行commit或者rollback
        if (null != status){
            switch(status){
                case 0:return LocalTransactionState.UNKNOW;
                case 1:return LocalTransactionState.COMMIT_MESSAGE;
                case 2:return LocalTransactionState.ROLLBACK_MESSAGE;
            }
        }
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}

 以上就是RocketMq的事务消息发送方法,可以实现发送消息的零丢失,以事务的方式确保消息一定可以发送到RocketMQ

2.mq消息不丢失实现方式

  以上仅仅只能保证消息发送到mq成功,但是一定能保证消息不丢失吗?显然是不行的;

  假设1:消息发送到了mq,就一定进入到了磁盘文件了吗?rocketmq是先存入os cache中,也就是内存,如果这个时候机器宕机,内存上的数据也就全部丢失了,显然消息也会丢失

  解决方案:rocketmq默认是异步刷盘的模式,保证了数据的高吞吐量,但是有可能出现消息丢失的情况,可以改为同步刷盘策略,通过修改配置文件broker.config中flushDiskType参数为SYNC_FLUSH即可,这样,只要mq告诉我们half消息响应成功了,就代表成功写入了磁盘中了;

  假设2:消息写入磁盘就一定不会丢失吗?显然也不能;如果磁盘损坏,那么消息也会丢失

  解决方案:基于Dledger和Raft协议的rocketmq主从架构,只要消息写入master成功了,那么就一定会基于raft协议同步给其他的broker,就算master机器磁盘损坏,那么一定有broker存储了同样的消息,可以确保消息在mq上不会丢失;