(WebFlux)003、多数据源R2dbc事务失效分析
一、背景
最近项目持续改造,然后把SpringMVC换成了SpringWebflux,然后把Mybatis换成了R2dbc。中间没有遇到什么问题,一切都那么的美滋滋,直到最近一个新需求的出现,打破了往日的宁静。
在对需求分析了一番后,需要引入新的数据源,那就是MongoDb。然后出现了MongoDb、Mysql两种数据源,然后原来好好的事物操作就芭比Q(完蛋)了。细细来分析一下原因与解决方法。
题外话:在本地测试的时候强烈建议用虚拟机+Docker来安装MySql与MongoDb,不然Mac直连docker真的麻烦啊!!~
SpringBoot 版本号:
2.6.10 , (本文基于已经会在项目中使用R2DBC与MongoDb)
二、武松打虎
2.1 单独solo Mysql
我们创建了一个测试库r2dbc_test,里面有一个user表。
# 创建测试库
create database r2dbc_test;
# 创建表
create table r2dbc_test.user(
id int auto_increment primary key ,
name varchar(12)
);
2.1.1 项目引入R2dbc
略..给出链接,如果感兴趣可以看看,Spring Data R2DBC,(实在太多,这个时间点懒得写了,后面有时间再补一下),
2.1.2 测试代码
创建表对结构对应实体类:user
@Data
@Table("user")
@NoArgsConstructor
@AllArgsConstructor
public class User implements Persistable {
@Id
private Integer id;
private String name;
@Override
public boolean isNew() {
return true;
}
}
这里面有个坑点,那就是为什么实现org.springframework.data.domain.Persistable
这个接口呢,先卖个关子,看完Repository后在描述哈。
Repository如下代码所示。
/**
*
User Repository
*
* @author fattyca1@qq.com
* @since 2022/8/26
*/
@Repository
public interface UserR2dbcRepository extends R2dbcRepository {
}
我们直接使用了Spring提供好的org.springframework.data.r2dbc.repository.R2dbcRepository
,里面有一些基础的实现类。我们在测试的时候使用了org.springframework.data.repository.reactive.ReactiveCrudRepository#save()
方法,这个方法会去判断这个实体对象是不是new object,如果不是,则会去Update。而判断的方法则是org.springframework.data.domain.Persistable#isNew()
方法。所以这就是我们为啥要实现这个接口。
接着写一个简单测试的Controller,代码如下所示。
@RestController
@EnableR2dbcRepositories
public class TransactionController {
@Autowired
private UserR2dbcRepository repository;
@Autowired
private TransactionalOperator operator;
// 根据seed当做初始ID,初始化数据库对象, 便于测试
@RequestMapping("/r2dbc/init")
public Flux init(Integer seed) {
Flux userFlux = Flux.range(seed, 5).map(id -> new User(id,"name" + id))
.flatMap(repository::save);
return userFlux;
}
// 先删除一条记录, 然后在添加一条记录
@RequestMapping("/r2dbc/delete")
public Mono delete(Integer id1, Integer id2) {
Mono id1Mono = repository.deleteById(id1);
Mono id2Mono = repository.save(new User(id2, "name" + id2));
return id1Mono.then(id2Mono).as(operator::transactional);
}
}
不要纠结没有service啥的哈,我们仅仅为了测试哈。两个方法
- 方法一:init, 用seed当做起始Id, 然后在数据库生成数据存储起来
- 方法二:delete, 先删处一条数据,然后在插入一条已存在的数据,通过数据库异常来回滚数据。
我们调用init方法,生成数据id=1和id=100以后的数据,如下图所示。
为了查看我们是不是插入成功,我们查一下数据库看看。结果如下图。
数据看起来是没问题的哈,是我们想要的,从1-5, 100-105
2.1.3 测试事务
数据已经准备好了,我们来进行事务测试,看看现在只有R2DBC的时候,事务是否生效。
我们来删除id=1,然后保存id=100的情况试一下看看。结果如图所示。
通过日志,我们看到结果的确是我们想要的,当id2=100的时候,抛出了Dulicate entry异常, 那我们在查询一下数据库,看看数据库的数据是否有删除掉。
结果还是用图展示。
我们通过查看数据库的查询记录,发现id=1数据没有删除。那也说明了事务是生效的,在正常情况下,发生异常不会提交事务。
2.2 引入MongoDb
略...感兴趣的老哥参考Spring Data MongoDb引入MongoDB
2.2.1 开启MongoDb事务
官方文档中有这样一句话:
Unless you specify a
MongoTransactionManager
within your application context, transaction support is DISABLED. You can usesetSessionSynchronization(ALWAYS)
to participate in ongoing non-native MongoDB transactions.
需要手动指定MongoTransactionManager
,否则不可用。 引入事务,参考文档,需要如下代码。
@Bean
MongoTransactionManager transactionManager(MongoDatabaseFactory dbFactory) {
return new MongoTransactionManager(dbFactory);
}
我们按照文档指示,在项目中添加了如下代码。因为我们用的是Webflux,所以我们创建的是Reactive的。
@EnableReactiveMongoRepositories
@Configuration
public class MongoConfig {
@Bean
ReactiveMongoTransactionManager transactionManager(ReactiveMongoDatabaseFactory dbFactory) {
return new ReactiveMongoTransactionManager(dbFactory);
}
}
这样,我们MongoDB的事物也搞定了,直接美滋滋,上手开干CRUD。
2.2.2 再来一次----测试数据删除
我们引入了新的数据源,本该美滋滋的,但是,问题也来了。我们在来进行一次数据删除操作。这次删除,我们修改一下Id,删除id=2和添加id=102的。测试如下图所示。
我们再一次看到了同样的情况,抛出了异常Duplicate entry,是我们预期的结果。那我们接着看看数据库的数据。如下图所示。
这个时候我们在查询数据,发现id=2的数据已经被删除了。这次事务没有回滚! 真是F了个K,啥情况呢?我们得一探究竟。
三、智取谜底
我们带着问题来找原因,现在事务失效了,项目能起来,没有报错。那么最有的可能那就是TransactionalOperator
失效了,TransactionalOperator
是Spring帮我们初始化的,我们要找问题,那就得要看看这个TransactionalOperator
是如何初始化的了
3.1 看源码找原因
3.1.1 从根本入手
我们直接从TransactionalOperator
代码进入,发现其需要传入ReactiveTransactionManager
,部分代码如下。
final class TransactionalOperatorImpl implements TransactionalOperator {
private final ReactiveTransactionManager transactionManager;
private final TransactionDefinition transactionDefinition;
/**
* Construct a new TransactionTemplate using the given transaction manager,
* taking its default settings from the given transaction definition.
* @param transactionManager the transaction management strategy to be used
* @param transactionDefinition the transaction definition to copy the
* default settings from. Local properties can still be set to change values.
*/
TransactionalOperatorImpl(ReactiveTransactionManager transactionManager, TransactionDefinition transactionDefinition) {
this.transactionManager = transactionManager;
this.transactionDefinition = transactionDefinition;
}
}
按照一般逻辑来说,事务是放在TransactionManager中来管理的,这个符合我们的预期,我们接着看看TransactionManager的实现类有哪些。经过查看,发现有R2dbcTransactionManager实现。如下图所示。
![TransactionManager实现类]](https://img2022.cnblogs.com/blog/1495071/202208/1495071-20220827211141944-495282422.jpg)
3.1.2 按照猜想继续
我们找到了R2dbcTransactionManager,那我们就有两个思路。
1、查看其实现方式,有哪些需要我们关注的,哪些因素是可能造成事务不生效。
2、启动方式。因为R2dbcTransactionManager初始化是交由SpringBoot实现,那会不会有什么特别之处。
3.1.2.1 思路1
我们打开R2dbcTransactionManager代码,发现其实现没有特别之处。部分代码如下。
public class R2dbcTransactionManager extends AbstractReactiveTransactionManager implements InitializingBean {
@Nullable
private ConnectionFactory connectionFactory;
/**
* Create a new {@code R2dbcTransactionManager} instance.
* A ConnectionFactory has to be set to be able to use it.
* @see #setConnectionFactory
*/
public R2dbcTransactionManager() {}
/**
* Create a new {@code R2dbcTransactionManager} instance.
* @param connectionFactory the R2DBC ConnectionFactory to manage transactions for
*/
public R2dbcTransactionManager(ConnectionFactory connectionFactory) {
this();
setConnectionFactory(connectionFactory);
afterPropertiesSet();
}
}
可以看到,无参初始化可以不需要ConnectionFactory,也可以传入ConnectionFactory进行初始化。 也没有什么特别之处。
3.1.2.2 思路2
我们看完其实现,并没有特别之处,那就看它初始化有什么特别的地方。Double Shift 来一波,我们看到了有AutoConfiguration,来让我们瞧一瞧。
我们点进去瞧一瞧,便发现了端倪,嘴上一句 原来如此 蹦了出来。部分代码如下。
public class R2dbcTransactionManagerAutoConfiguration {
@Bean
@ConditionalOnMissingBean(ReactiveTransactionManager.class)
public R2dbcTransactionManager connectionFactoryTransactionManager(ConnectionFactory connectionFactory) {
return new R2dbcTransactionManager(connectionFactory);
}
}
我们看到,其初始化的时候,采用了ConditionalOnMissingBean,只有在没有ReactiveTransactionManager的时候才会初始化。但是我们在初始化MongoDB事务的时候,已经初始化过ReactiveTransactionManager了啊!赶紧看看ReactiveMongoTransactionManager。
打开ReactiveMongoTransactionManager代码,果然如此。代码如下。
public class ReactiveMongoTransactionManager extends AbstractReactiveTransactionManager implements InitializingBean {
// ...略
}
AbstractReactiveTransactionManager这个不就是ReactiveTransactionManager嘛, 已经初始化过一次了,所以导致R2dbcTransactionManager无法进行初始化,所以TransactionalOperatorImpl里面传入的不是R2dbcTransactionManager,那肯定对mysql无法失误操作了啊。
3.1.3 怎么办?
至此,我们已经找到原因了,但是,这也紧紧是猜想。我们还是得分2步骤来啊!!
- 1、针对问题,提出具体的解决方案,并实现
- 2、针对实现的方案进行验证
3.1.3.1 解决方案
我们知道事务没有实现的原因是R2dbcTransactionManager没有初始化,然后再TransactionalOperatorImpl种注入的不是R2dbcTransactionManager,那么我们就自己动手初始化Bean。
我们创建2个对象,分别为MongoConfig和R2dbcConfig,代码如下所示。
R2dbcConfig:
/**
*
r2dbc 配置
*
* @author fattyca1@qq.com
* @since 2022/8/27
*/
@EnableR2dbcRepositories
@Configuration
public class R2dbcConfig {
@Bean("r2dbcTransactionManager")
public R2dbcTransactionManager transactionManager(ConnectionFactory pool) {
return new R2dbcTransactionManager(pool);
}
@Bean("r2dbcTransactionalOperator")
public TransactionalOperator transactionalOperator(R2dbcTransactionManager transactionManager){
return TransactionalOperator.create(transactionManager);
}
}
MongoConfig:
/**
*
mongo transaction manager
*
* @author fattyca1@qq.com
* @since 2022/8/27
*/
@EnableReactiveMongoRepositories
@Configuration
public class MongoConfig {
@Bean("mongoTransactionManager")
public ReactiveMongoTransactionManager transactionManager(ReactiveMongoDatabaseFactory dbFactory) {
return new ReactiveMongoTransactionManager(dbFactory);
}
@Bean("mongoTransactionalOperator")
public TransactionalOperator transactionalOperator(ReactiveMongoTransactionManager transactionManager){
return TransactionalOperator.create(transactionManager);
}
}
我们通过别名的方式,创建两个TransactionalOperator,这样就可以解决R2bdc无法自动创建TransactionManager的问题。
3.1.3.2 验证
我们在Controller中的TransactionalOperator指定名称。代码如下所示。
@RestController
public class TransactionController {
@Autowired
private UserR2dbcRepository repository;
@Autowired
@Qualifier("r2dbcTransactionalOperator") // 在这指定使用哪个operator
private TransactionalOperator operator;
// ... 略
}
指定了具体的名称,我们就可以接着在来测试一次。这次我们删除Id=3,然后添加id=103的数据试试看。测试过程如下图。
还是和我们刚一下,出现了Duplicate entry的问题。我们要关注事物是否回滚。
接下来就是激动人心的时刻,我们直接查库,看看事务是否回滚了。结果如下图所示。
哇喔!棒!我们看到,数据库查询出来的结果中还是包含了Id=3的数据,那完全说明了事务回滚了!
至此我们的问题算是完全解决了,舒坦!(心里长舒一口气,解决问题就这么简单?)
3.2 偷鸡
看了这么多,我们都是手动,一步步验证结果的,哪有没有快捷的方式呢?说到这,那肯定是有的。
在使用R2dbc的时候,我们其实是没有添加日志的。我们可以打开日志。可以看到操作是记录了完整的日志。我们添加日志配置(log配置文件自己添加一下)。
logging.level.org.springframework.r2dbc=debug
3.2.1 再次验证
添加完日志,我们在执行一下删除id=3,添加id=104的操作,看看日志记录了什么。贴出来测试结果。
我们可以看到,日志中清晰的记录着,创建事务,回滚事务!完全验证了我们的操作方案是对的,NO爬不浪~!
上述的所有操作,都可以通过日志验证,我就不一步步验证,大家可以自己试验一下~
四、总结
在使用新东西的时候,还是要多实验,验证结果!
遇到问题,不要慌,一步步来,就是干!
如有问题,欢迎指正,交流。