(WebFlux)003、多数据源R2dbc事务失效分析


一、背景

最近项目持续改造,然后把SpringMVC换成了SpringWebflux,然后把Mybatis换成了R2dbc。中间没有遇到什么问题,一切都那么的美滋滋,直到最近一个新需求的出现,打破了往日的宁静。

在对需求分析了一番后,需要引入新的数据源,那就是MongoDb。然后出现了MongoDb、Mysql两种数据源,然后原来好好的事物操作就芭比Q(完蛋)了。细细来分析一下原因与解决方法。

题外话:在本地测试的时候强烈建议用虚拟机+Docker来安装MySql与MongoDb,不然Mac直连docker真的麻烦啊!!~

SpringBoot 版本号: 2.6.10, (本文基于已经会在项目中使用R2DBC与MongoDb)

二、武松打虎

2.1 单独solo Mysql

我们创建了一个测试库r2dbc_test,里面有一个user表。

# 创建测试库
create database r2dbc_test;

# 创建表
create table r2dbc_test.user(
    id int auto_increment primary key ,
    name varchar(12)
);

2.1.1 项目引入R2dbc

略..给出链接,如果感兴趣可以看看,Spring Data R2DBC,(实在太多,这个时间点懒得写了,后面有时间再补一下)

2.1.2 测试代码

创建表对结构对应实体类:user

@Data
@Table("user")
@NoArgsConstructor
@AllArgsConstructor
public class User implements Persistable {
    @Id
    private Integer id;
    private String name;

    @Override
    public boolean isNew() {
        return true;
    }
}

这里面有个坑点,那就是为什么实现org.springframework.data.domain.Persistable这个接口呢,先卖个关子,看完Repository后在描述哈。

Repository如下代码所示。

/**
 * 
User Repository
* * @author fattyca1@qq.com * @since 2022/8/26 */ @Repository public interface UserR2dbcRepository extends R2dbcRepository { }

我们直接使用了Spring提供好的org.springframework.data.r2dbc.repository.R2dbcRepository,里面有一些基础的实现类。我们在测试的时候使用了org.springframework.data.repository.reactive.ReactiveCrudRepository#save()方法,这个方法会去判断这个实体对象是不是new object,如果不是,则会去Update。而判断的方法则是org.springframework.data.domain.Persistable#isNew()方法。所以这就是我们为啥要实现这个接口。

接着写一个简单测试的Controller,代码如下所示。

@RestController
@EnableR2dbcRepositories
public class TransactionController {
    @Autowired
    private UserR2dbcRepository repository;
    @Autowired
    private TransactionalOperator operator;

    // 根据seed当做初始ID,初始化数据库对象, 便于测试
    @RequestMapping("/r2dbc/init")
    public Flux init(Integer seed) {
        Flux userFlux = Flux.range(seed, 5).map(id -> new User(id,"name" + id))
                .flatMap(repository::save);
        return userFlux;
    }

    // 先删除一条记录, 然后在添加一条记录
    @RequestMapping("/r2dbc/delete")
    public Mono delete(Integer id1, Integer id2) {
        Mono id1Mono = repository.deleteById(id1);
        Mono id2Mono = repository.save(new User(id2, "name" + id2));
        return id1Mono.then(id2Mono).as(operator::transactional);
    }
}

不要纠结没有service啥的哈,我们仅仅为了测试哈。两个方法

  • 方法一:init, 用seed当做起始Id, 然后在数据库生成数据存储起来
  • 方法二:delete, 先删处一条数据,然后在插入一条已存在的数据,通过数据库异常来回滚数据。

我们调用init方法,生成数据id=1和id=100以后的数据,如下图所示。

为了查看我们是不是插入成功,我们查一下数据库看看。结果如下图。

数据看起来是没问题的哈,是我们想要的,从1-5, 100-105

2.1.3 测试事务

数据已经准备好了,我们来进行事务测试,看看现在只有R2DBC的时候,事务是否生效。

我们来删除id=1,然后保存id=100的情况试一下看看。结果如图所示。

通过日志,我们看到结果的确是我们想要的,当id2=100的时候,抛出了Dulicate entry异常, 那我们在查询一下数据库,看看数据库的数据是否有删除掉。

结果还是用图展示。

我们通过查看数据库的查询记录,发现id=1数据没有删除。那也说明了事务是生效的,在正常情况下,发生异常不会提交事务。

2.2 引入MongoDb

略...感兴趣的老哥参考Spring Data MongoDb引入MongoDB

2.2.1 开启MongoDb事务

官方文档中有这样一句话:

Unless you specify a MongoTransactionManager within your application context, transaction support is DISABLED. You can use setSessionSynchronization(ALWAYS) to participate in ongoing non-native MongoDB transactions.

需要手动指定MongoTransactionManager,否则不可用。 引入事务,参考文档,需要如下代码。

@Bean
MongoTransactionManager transactionManager(MongoDatabaseFactory dbFactory) {  
  return new MongoTransactionManager(dbFactory);
}

我们按照文档指示,在项目中添加了如下代码。因为我们用的是Webflux,所以我们创建的是Reactive的。

@EnableReactiveMongoRepositories
@Configuration
public class MongoConfig {
    @Bean
    ReactiveMongoTransactionManager transactionManager(ReactiveMongoDatabaseFactory dbFactory) {
        return new ReactiveMongoTransactionManager(dbFactory);
    }
}

这样,我们MongoDB的事物也搞定了,直接美滋滋,上手开干CRUD。

2.2.2 再来一次----测试数据删除

我们引入了新的数据源,本该美滋滋的,但是,问题也来了。我们在来进行一次数据删除操作。这次删除,我们修改一下Id,删除id=2和添加id=102的。测试如下图所示。

我们再一次看到了同样的情况,抛出了异常Duplicate entry,是我们预期的结果。那我们接着看看数据库的数据。如下图所示。

这个时候我们在查询数据,发现id=2的数据已经被删除了。这次事务没有回滚! 真是F了个K,啥情况呢?我们得一探究竟。

三、智取谜底

我们带着问题来找原因,现在事务失效了,项目能起来,没有报错。那么最有的可能那就是TransactionalOperator失效了,TransactionalOperator是Spring帮我们初始化的,我们要找问题,那就得要看看这个TransactionalOperator是如何初始化的了

3.1 看源码找原因

3.1.1 从根本入手

我们直接从TransactionalOperator代码进入,发现其需要传入ReactiveTransactionManager,部分代码如下。

final class TransactionalOperatorImpl implements TransactionalOperator {

	private final ReactiveTransactionManager transactionManager;
	private final TransactionDefinition transactionDefinition;

	/**
	 * Construct a new TransactionTemplate using the given transaction manager,
	 * taking its default settings from the given transaction definition.
	 * @param transactionManager the transaction management strategy to be used
	 * @param transactionDefinition the transaction definition to copy the
	 * default settings from. Local properties can still be set to change values.
	 */
	TransactionalOperatorImpl(ReactiveTransactionManager transactionManager, TransactionDefinition transactionDefinition) {
		
		this.transactionManager = transactionManager;
		this.transactionDefinition = transactionDefinition;
	}
}

按照一般逻辑来说,事务是放在TransactionManager中来管理的,这个符合我们的预期,我们接着看看TransactionManager的实现类有哪些。经过查看,发现有R2dbcTransactionManager实现。如下图所示。

![TransactionManager实现类]](https://img2022.cnblogs.com/blog/1495071/202208/1495071-20220827211141944-495282422.jpg)

3.1.2 按照猜想继续

我们找到了R2dbcTransactionManager,那我们就有两个思路。

1、查看其实现方式,有哪些需要我们关注的,哪些因素是可能造成事务不生效。

2、启动方式。因为R2dbcTransactionManager初始化是交由SpringBoot实现,那会不会有什么特别之处。

3.1.2.1 思路1

我们打开R2dbcTransactionManager代码,发现其实现没有特别之处。部分代码如下。

public class R2dbcTransactionManager extends AbstractReactiveTransactionManager implements InitializingBean {

	@Nullable
	private ConnectionFactory connectionFactory;
	/**
	 * Create a new {@code R2dbcTransactionManager} instance.
	 * A ConnectionFactory has to be set to be able to use it.
	 * @see #setConnectionFactory
	 */
	public R2dbcTransactionManager() {}
	/**
	 * Create a new {@code R2dbcTransactionManager} instance.
	 * @param connectionFactory the R2DBC ConnectionFactory to manage transactions for
	 */
	public R2dbcTransactionManager(ConnectionFactory connectionFactory) {
		this();
		setConnectionFactory(connectionFactory);
		afterPropertiesSet();
	}
}

可以看到,无参初始化可以不需要ConnectionFactory,也可以传入ConnectionFactory进行初始化。 也没有什么特别之处。

3.1.2.2 思路2

我们看完其实现,并没有特别之处,那就看它初始化有什么特别的地方。Double Shift 来一波,我们看到了有AutoConfiguration,来让我们瞧一瞧。

我们点进去瞧一瞧,便发现了端倪,嘴上一句 原来如此 蹦了出来。部分代码如下。

public class R2dbcTransactionManagerAutoConfiguration {
	@Bean
	@ConditionalOnMissingBean(ReactiveTransactionManager.class)
	public R2dbcTransactionManager connectionFactoryTransactionManager(ConnectionFactory connectionFactory) {
		return new R2dbcTransactionManager(connectionFactory);
	}
}

我们看到,其初始化的时候,采用了ConditionalOnMissingBean,只有在没有ReactiveTransactionManager的时候才会初始化。但是我们在初始化MongoDB事务的时候,已经初始化过ReactiveTransactionManager了啊!赶紧看看ReactiveMongoTransactionManager。

打开ReactiveMongoTransactionManager代码,果然如此。代码如下。

public class ReactiveMongoTransactionManager extends AbstractReactiveTransactionManager implements InitializingBean {
	// ...略
}

AbstractReactiveTransactionManager这个不就是ReactiveTransactionManager嘛, 已经初始化过一次了,所以导致R2dbcTransactionManager无法进行初始化,所以TransactionalOperatorImpl里面传入的不是R2dbcTransactionManager,那肯定对mysql无法失误操作了啊。

3.1.3 怎么办?

至此,我们已经找到原因了,但是,这也紧紧是猜想。我们还是得分2步骤来啊!!

  • 1、针对问题,提出具体的解决方案,并实现
  • 2、针对实现的方案进行验证
3.1.3.1 解决方案

我们知道事务没有实现的原因是R2dbcTransactionManager没有初始化,然后再TransactionalOperatorImpl种注入的不是R2dbcTransactionManager,那么我们就自己动手初始化Bean。

我们创建2个对象,分别为MongoConfig和R2dbcConfig,代码如下所示。

R2dbcConfig:

/**
 * 
r2dbc 配置
* * @author fattyca1@qq.com * @since 2022/8/27 */ @EnableR2dbcRepositories @Configuration public class R2dbcConfig { @Bean("r2dbcTransactionManager") public R2dbcTransactionManager transactionManager(ConnectionFactory pool) { return new R2dbcTransactionManager(pool); } @Bean("r2dbcTransactionalOperator") public TransactionalOperator transactionalOperator(R2dbcTransactionManager transactionManager){ return TransactionalOperator.create(transactionManager); } }

MongoConfig:

/**
 * 
mongo transaction manager
* * @author fattyca1@qq.com * @since 2022/8/27 */ @EnableReactiveMongoRepositories @Configuration public class MongoConfig { @Bean("mongoTransactionManager") public ReactiveMongoTransactionManager transactionManager(ReactiveMongoDatabaseFactory dbFactory) { return new ReactiveMongoTransactionManager(dbFactory); } @Bean("mongoTransactionalOperator") public TransactionalOperator transactionalOperator(ReactiveMongoTransactionManager transactionManager){ return TransactionalOperator.create(transactionManager); } }

我们通过别名的方式,创建两个TransactionalOperator,这样就可以解决R2bdc无法自动创建TransactionManager的问题。

3.1.3.2 验证

我们在Controller中的TransactionalOperator指定名称。代码如下所示。

@RestController
public class TransactionController {
    @Autowired
    private UserR2dbcRepository repository;
    
    @Autowired
    @Qualifier("r2dbcTransactionalOperator") // 在这指定使用哪个operator
    private TransactionalOperator operator;
		// ... 略
}

指定了具体的名称,我们就可以接着在来测试一次。这次我们删除Id=3,然后添加id=103的数据试试看。测试过程如下图。

还是和我们刚一下,出现了Duplicate entry的问题。我们要关注事物是否回滚。

接下来就是激动人心的时刻,我们直接查库,看看事务是否回滚了。结果如下图所示。

哇喔!棒!我们看到,数据库查询出来的结果中还是包含了Id=3的数据,那完全说明了事务回滚了!

至此我们的问题算是完全解决了,舒坦!(心里长舒一口气,解决问题就这么简单?)

3.2 偷鸡

看了这么多,我们都是手动,一步步验证结果的,哪有没有快捷的方式呢?说到这,那肯定是有的。

在使用R2dbc的时候,我们其实是没有添加日志的。我们可以打开日志。可以看到操作是记录了完整的日志。我们添加日志配置(log配置文件自己添加一下)。

logging.level.org.springframework.r2dbc=debug

3.2.1 再次验证

添加完日志,我们在执行一下删除id=3,添加id=104的操作,看看日志记录了什么。贴出来测试结果。

我们可以看到,日志中清晰的记录着,创建事务,回滚事务!完全验证了我们的操作方案是对的,NO爬不浪~!

上述的所有操作,都可以通过日志验证,我就不一步步验证,大家可以自己试验一下~

四、总结

在使用新东西的时候,还是要多实验,验证结果!

遇到问题,不要慌,一步步来,就是干!

如有问题,欢迎指正,交流。