Postgres 的事务和锁
- 事务命令
- 事务隔离级别
- READ COMMITTED 读已提交 (默认)
- REPEATABLE READ 可重复读
- SERIALIZABLE 串行化
- @Transactional 注解
- 事务内不要做其他事,最好单独一个类处理
- 锁表 (lock 命令)
- 锁行 (排它锁 select ... for update 和共享锁 select ... for share)
- 页级锁
- 死锁
- @Lock 注解
- 悲观锁和乐观锁比较
- JAP 如何不靠注解使用事务和锁
事务命令
postgres=#
postgres=# \h begin
Command: BEGIN
Description: start a transaction block
Syntax:
BEGIN [ WORK | TRANSACTION ] [ transaction_mode [, ...] ]
where transaction_mode is one of:
ISOLATION LEVEL { SERIALIZABLE | REPEATABLE READ | READ COMMITTED | READ UNCOMMITTED }
READ WRITE | READ ONLY
[ NOT ] DEFERRABLE
postgres=#
postgres=# \h end
Command: END
Description: commit the current transaction
Syntax:
END [ WORK | TRANSACTION ]
postgres=#
postgres=# \h commit
Command: COMMIT
Description: commit the current transaction
Syntax:
COMMIT [ WORK | TRANSACTION ]
begin 和 end (或者 commit) 之间的所有 SQL 组成一个事务,数据库保证同一个事务的所有操作或者都成功,或者都回滚,同时隔离不同的事务防止其互相干扰
最典型的例子就是转账,必须保证转出和转入都成功,如果转出成功但转入失败,应该能回滚
事务隔离级别
事务之间有以下几种不同的隔离级别,由低(宽松)到高(严格)分别是
- READ UNCOMMITTED : 读未提交,可以读到其他会话未提交的数据,等于没隔离,PG 不支持,但可以设置,只是会被当成 READ COMMITTED
- READ COMMITTED : 读已提交(默认),只能读到其他会话已提交的数据,有写锁避免同时修改同一个数据,修改同一数据需要等待直到先做修改的事务提交
- REPEATABLE READ : 可重复读,事务开始后,不会读到其他会话提交的数据,有写锁避免同时修改同一个数据,并且如果修改同一数据会报错
- SERIALIZABLE : 串行化,最严格,哪怕没修改同一个数据,同样可能会有冲突
可以 begin 的同时设置隔离级别
postgres=# begin transaction isolation level repeatable read;
BEGIN
也可以 begin 后再设置
postgres=# set transaction isolation level repeatable read;
SET
查看隔离级别
postgres=# show transaction_isolation;
transaction_isolation
-----------------------
repeatable read
(1 row)
不设置默认就是 READ COMMITTED
READ COMMITTED 读已提交 (默认)
在会话 1 开启事务并查看
postgres=#
postgres=# begin;
BEGIN
postgres=#
postgres=# select * from test;
id | name
----+--------
1 | name_A
3 | name_C
4 | name_d
2 | name_b
(4 rows)
postgres=#
postgres=# select * from test where id = 2;
id | name
----+--------
2 | name_b
(1 row)
在会话 2 开启事务并更新
postgres=# begin;
BEGIN
postgres=#
postgres=# update test set name = 'name_bb' where id = 2;
UPDATE 1
继续在会话 1 查看,可以看到查询结果没有改变
postgres=# select * from test where id = 2;
id | name
----+--------
2 | name_b
(1 row)
postgres=#
postgres=# select * from test;
id | name
----+--------
1 | name_A
3 | name_C
4 | name_d
2 | name_b
(4 rows)
在会话 2 提交事务
postgres=# end;
COMMIT
继续在会话 1 查看,可以看到查询结果变了
postgres=# select * from test where id = 2;
id | name
----+---------
2 | name_bb
(1 row)
postgres=#
postgres=# select * from test;
id | name
----+---------
1 | name_A
3 | name_C
4 | name_d
2 | name_bb
(4 rows)
在会话 2 再启动事务,改数据,但不提交
postgres=# begin;
BEGIN
postgres=#
postgres=#
postgres=# update test set name = 'name_a22' where id = 1;
UPDATE 1
postgres=#
postgres=#
在会话 1 修改同一条记录,可以看到,无法执行,在等待
postgres=# update test set name = 'name_a11' where id = 1;
在会话 2 提交事务
postgres=# end;
COMMIT
这时会话 1 的修改才被执行
postgres=# update test set name = 'name_a11' where id = 1;
UPDATE 1
所以这种模式就是,不能查看其他事务未提交的数据,可以查看其他事务已提交的数据,并且有行写锁,不允许同时修改同一行数据,除非另一个事务提交了
REPEATABLE READ 可重复读
在会话 1 开启事务并查看
postgres=# begin transaction isolation level repeatable read;
BEGIN
postgres=#
postgres=# select * from test where id = 2;
id | name
----+--------
2 | name_b
(1 row)
postgres=#
postgres=# select * from test;
id | name
----+--------
1 | name_A
3 | name_C
4 | name_d
2 | name_b
(4 rows)
在会话 2 开启事务并更新,然后提交事务
postgres=# begin;
BEGIN
postgres=#
postgres=# update test set name = 'name_bb' where id = 2;
UPDATE 1
postgres=#
postgres=# end;
COMMIT
继续在会话 1 查看,可以看到查询结果没有改变,哪怕事务 2 已经提交了,这样保证事务 1 的查询结果的一致性
postgres=# select * from test where id = 2;
id | name
----+--------
2 | name_b
(1 row)
postgres=#
postgres=# select * from test;
id | name
----+--------
1 | name_A
3 | name_C
4 | name_d
2 | name_b
(4 rows)
在会话 2 再启动事务,添加新数据,并提交事务
postgres=# begin;
BEGIN
postgres=#
postgres=# insert into test values(5, 'name_e');
INSERT 0 1
postgres=#
postgres=# end;
COMMIT
在会话 1 继续查看,可以看到数据还是没有改变,哪怕事务 2 添加新数据并提交
postgres=# select * from test where id = 2;
id | name
----+--------
2 | name_b
(1 row)
postgres=#
postgres=# select * from test;
id | name
----+--------
1 | name_A
3 | name_C
4 | name_d
2 | name_b
(4 rows)
在会话 1 提交事务再查看,可以看到事务 2 修改的数据和添加的数据,都可以看到了
postgres=# end;
COMMIT
postgres=#
postgres=# select * from test;
id | name
----+---------
1 | name_A
3 | name_C
4 | name_d
2 | name_bb
5 | name_e
(5 rows)
在会话 1 再启动事务,这次不做 select all 只先查看一条数据
postgres=# begin transaction isolation level repeatable read;
BEGIN
postgres=#
postgres=# select * from test where id = 2;
id | name
----+---------
2 | name_bb
(1 row)
在会话 2 再启动事务,修改添加数据,并提交事务
postgres=# begin;
BEGIN
postgres=#
postgres=# update test set name = 'name_b' where id = 2;
UPDATE 1
postgres=#
postgres=# update test set name = 'name_cc' where id = 3;
UPDATE 1
postgres=#
postgres=# insert into test values(6, 'name_f');
INSERT 0 1
postgres=#
postgres=# end;
COMMIT
在会话 1 继续查看,可以看到数据还是没有改变,哪怕事务 2 修改添加的数据,事务 1 之前并没有命中
postgres=# select * from test where id = 2;
id | name
----+---------
2 | name_bb
(1 row)
postgres=#
postgres=# select * from test where id = 3;
id | name
----+--------
3 | name_C
(1 row)
postgres=#
postgres=# select * from test;
id | name
----+---------
1 | name_A
3 | name_C
4 | name_d
2 | name_bb
5 | name_e
(5 rows)
在会话 1 提交事务,再查看数据,可以看到事务 2 的修改了
postgres=# end;
COMMIT
postgres=#
postgres=# select * from test;
id | name
----+---------
1 | name_A
4 | name_d
5 | name_e
2 | name_b
3 | name_cc
6 | name_f
(6 rows)
这种模式下同样会有行写锁,如果修改同一行数据,需要等另一个事务先完成,但和 READ COMMITTED 不同的是,等另一个事务完成后,当前事务会失败,报如下错误
postgres=# update test set name = 'name_A22' where id = 1;
ERROR: could not serialize access due to concurrent update
使用 select ... for update 同样会报这个错误
出现这个错误就需要退出事务,然后从头开始重新执行事务
可以看到这个模式比 READ COMMITTED 更严格,它保证事务开始后,对数据的读写,完全不受其他事务的影响
如果两个事务修改同一行数据,不仅会等待还会报错,需要重新执行事务,或需要通过应用程序的锁实现两个事务的互斥
SERIALIZABLE 串行化
和 Repeatable Read 几乎一样的,只是更加严格地,保证两个事务不冲突,保证串行化
在会话 1 启动事务,执行下面命令
postgres=# begin transaction isolation level serializable;
BEGIN
postgres=#
postgres=# select count(*) from test where id = 3;
count
-------
1
(1 row)
postgres=#
postgres=# insert into test values(1, 'name_a1');
INSERT 0 1
postgres=#
在会话 2 启动事务,执行下面命令,并提交
postgres=# begin transaction isolation level serializable;
BEGIN
postgres=#
postgres=# select count(*) from test where id = 1;
count
-------
1
(1 row)
postgres=#
postgres=# insert into test values(3, 'name_c2');
INSERT 0 1
postgres=#
postgres=# end;
COMMIT
postgres=#
在会话 1 提交事务,发现报错了
postgres=# end;
ERROR: could not serialize access due to read/write dependencies among transactions
DETAIL: Reason code: Canceled on identification as a pivot, during commit attempt.
HINT: The transaction might succeed if retried.
postgres=#
如果在 REPEATABLE READ 模式是不会报错的,因为两个事务没有修改相同数据的冲突
但实际上,这两个事务互相依赖,即事务 1 的 insert 命令影响事务 2 的 count 命令,而事务 2 的 insert 命令影响事务 1 的 count 命令,如果都允许提交,会导致数据不一致,所以后面提交的事务就报错了
如果是 A 影响 B,而 B 影响 C,且 C 又影响 A 的循环,同样会有这样的问题
@Transactional 注解
@Transactional 注解可以用在类或函数上,使得进入函数的时候会启动事务,离开函数的时候会提交事务
这个注解可以是 javax.transaction.Transactional
或是 org.springframework.transaction.annotation.Transactional (用于 springboot)
后者功能更强,并有 propagation 决定如果已有事务要如何处理(默认是使用已有事务) 和 isolation 决定隔离级别
前者有 value 决定如果已有事务要如何处理(默认是使用已有事务) 但没有决定隔离级别的属性
注意这个注解在以下场景有可能会失效
- 注解的函数不是 public 的
- 被继承的基类的注解不会起作用
- propagation 配置导致如果已有事务会报错,或是直接使用当前事务,而不会启动新事务
- 同一个类中,方法 A 调用方法 B,但 A 没注解而 B 有注解,这是因为 AOP 只对被当前类以外的代码调用的函数起作用
- 如果用 try...catch 捕获异常但没抛出,同样导致事务不起作用,必须让事务自己处理异常并进行回滚操作
可以定义什么异常需要 rollback 什么异常不需要 rollback
事务内不要做其他事,最好单独一个类处理
如果事务内做的事比较多,比如直接把注解加在 controller,可能会导致一些问题
- 事务可能太大,阻塞其他操作的时间可能比较久
- 事务内混合了其他业务操作,比如事务内发了个请求给其他服务修改数据,可能会导致这个事务被回滚的时候其他服务修改的数据没被回滚,出现数据不一致
所以比较理想的做法,是有一个单独的处理数据库操作的类,这个类不做其他业务,并且只在需要的时候使用事务
锁表 (lock 命令)
锁表只能在事务中
postgres=# \h lock
Command: LOCK
Description: lock a table
Syntax:
LOCK [ TABLE ] [ ONLY ] name [ * ] [, ...] [ IN lockmode MODE ] [ NOWAIT ]
where lockmode is one of:
ACCESS SHARE | ROW SHARE | ROW EXCLUSIVE | SHARE UPDATE EXCLUSIVE
| SHARE | SHARE ROW EXCLUSIVE | EXCLUSIVE | ACCESS EXCLUSIVE
- ONLY 表示只锁当前表,否则当前表及其后代表都会被锁住
- name 是表的名字
- lockmode 指定哪些锁会与当前锁冲突,默认是 ACCESS EXCLUSIVE 即所有都冲突
- NOWAIT 表示如果有另一个锁锁住了这个表,那 lock 命令是等待,还是直接报错返回
锁住表后,其他会话对这个表的所有操作,哪怕最简单的 select 命令都会阻塞,直到拥有锁的事务结束,锁被释放
锁行 (排它锁 select ... for update 和共享锁 select ... for share)
锁行不在事务内不会报错,但不会起作用,所以还是要在事务内执行
SELECT ... FOR { UPDATE | SHARE } [ OF table_name [, ...] ] [ NOWAIT | SKIP LOCKED ] [...]
- select for update 表示排他锁,或者叫写锁,锁住命中的行,不允许其他会话执行 select for update 或 select for share,但 select 可以
- select for share 表示共享锁,或者叫读锁,锁住命中的行,不允许其他会话执行 select for update 但可以执行 select for share 或 select
- of table_name 指定要锁住的表 (select 语句可能涉及多个表)
- nowait 表示如果无法获取锁,要等待,还是直接报错
- skip locked 表示要不要跳过无法获取锁的行并立刻返回 (select 的结果可能部分被锁,部分没被锁,默认有被锁的就等待或报错)
依靠事务的隔离级别,需要真正修改数据的时候才锁,或者要等到提交事务时才知道有冲突,甚至会报错
使用 select for update/share 方便自己控制,而且能处理一些依靠事务不好处理的场景
页级锁
https://www.postgresql.org/docs/12/explicit-locking.html#LOCKING-PAGES
不了解,知道有这个就好,基本不会用到
死锁
如果线程 A 先锁数据 1 再锁数据 2,而线程 B 先锁数据 2 再锁数据 1,并且没有超时机制
这时如果两个线程同时执行并且互相等待对方释放锁,就造成了死锁
减少死锁方法
- 按相同顺序锁住数据
- 超时机制
- 事务尽可能简单,减少锁住的时间
- 尽量使用较低隔离级别
遵循这种设计一般就不会有死锁
@Lock 注解
@Lock 注解可以用在函数上,
注意 javax.ejb.Lock 不是数据库的,而是给函数加读锁或者写锁的
数据库的是 org.springframework.data.jpa.repository.Lock
@Lock(value = LockModeType.PESSIMISTIC_READ)
@Query(value = "select t from User t where t.name = :name")
User findByUserName(@Param("name") String name);
锁模式
- PESSIMISTIC_READ:悲观读锁,或共享锁,就是 select ... for share nowait
- PESSIMISTIC_WRITE:悲观写锁,或排他锁,就是 select ... for update nowait
- READ:乐观读锁,实际上没有锁,要求表有 version 字段,通过检查 version 字段在操作前后的一致性来保证不冲突
- WRITE:乐观写锁,在 READ 的基础上,操作结束后不仅会检查 version 字段,还会对 version + 1
- OPTIMISTIC:和 READ 一样
- OPTIMISTIC_FORCE_INCREMENT:和 WRITE 一样
- PESSIMISTIC_FORCE_INCREMENT:在 PESSIMISTIC_WRITE 基础上操作后对 version + 1
READ 是操作前取 version (或者操作的第一步一起取了),操作后执行
select version from [table] where [key] =?
对 version 复查
WRITE 操作后执行的是
update [table] set version=[操作前的值+1] where [key]=? and version=[操作前的值]
不仅对 version 复查,还加 1
悲观锁和乐观锁比较
悲观锁,是先取锁再操作,会减少并发能力,影响性能,优点是有真正的排他性,适合要求严格、冲突概率较大、并发要求不高的场景
乐观锁,先操作,提交时再检查 version 字段,不依赖数据库机制,并发能力强,性能好,适合读多写少、大概率不冲突、要求高并发的场景,缺点是没有真正的排他性,存在数据不一致的可能
JAP 如何不靠注解使用事务和锁
// 如果是用 @Autowired 或 @Inject 等方式注入 manager 的话,可能会报错
// Not allowed to create transaction on shared EntityManager - use Spring transactions or EJB CMT
EntityManager entityManager = entityManagerFactory.createEntityManager();
entityManager.getTransaction().begin();
String sql = "select u from User as u where name = " + name;
Query query = entityManager.createQuery(sql);
query.setLockMode(LockModeType.PESSIMISTIC_WRITE);
User user = (User) query.getResultList().get(0);
System.out.println(user);
entityManager.getTransaction().commit();
使用代码而不是注解可能灵活点但不够方便