seata基本使用


seata基本使用

官方文档:https://seata.io/zh-cn/docs/overview/what-is-seata.html

seata是一款开源的分布式事务解决方案,致力于在微服务架构下提供高性能和简单易用的分布式事务服务

一ID+三组件模型

Transaction ID XID:全局唯一的事务ID

Transaction Coordinator(TC):事务协调者,维护全局事务的运行状态,负责协调并驱动全局事务的提交或回滚

Transaction Manager(TM):事务管理器,定义全局事务的范围:开始全局事务、提交或回滚全局事务

Resource Manager(RM):资源管理器,控制分支事务,负责分支注册、状态汇报、并接收事务协调器的指令,驱动分支事务的提交和回滚

seata分布式事务处理过程

  • TM向TC申请开启一个全局事务,全局事务创建成功并生成一个全局唯一的XID
  • XID在微服务调用链路的上下文中传播
  • RM向TC注册分支事务,将其纳入XID对应全局事务的管辖
  • TM向TC发起针对XIK的全局提交或回滚决议
  • TC调度XID下管辖的全部分支事务完成提交或回滚请求

windows安装启动seata服务端

下载seata,地址:https://github.com/seata/seata/releases

下载后解压,不同版本的seata解压后的内容不一样,但配置都没有太大的区别,笔者下载的是1.4.1版本

修改conf目录下的file.conf配置文件

1、设置存储模式为db

mode = "db"

2、设置db配置的用户名和密码(自己本地mysql的用户名和密码)

user = "root"
password = "123456"

注意:如果mysql版本是8.0及以上,driverClassName和url需要如下配置:

driverClassName = "com.mysql.cj.jdbc.Driver"
url = "jdbc:mysql://127.0.0.1:3306/seata?serverTimezone=UTC"

3、新建seata数据库并执行对应sql脚本,conf目录下的README-zh.md文件中server链接了server端的配置和sql,,为了方便,脚本内容贴在下面:

-- -------------------------------- The script used when storeMode is 'db' --------------------------------
-- the table to store GlobalSession data
CREATE TABLE IF NOT EXISTS `global_table`
(
    `xid`                       VARCHAR(128) NOT NULL,
    `transaction_id`            BIGINT,
    `status`                    TINYINT      NOT NULL,
    `application_id`            VARCHAR(32),
    `transaction_service_group` VARCHAR(32),
    `transaction_name`          VARCHAR(128),
    `timeout`                   INT,
    `begin_time`                BIGINT,
    `application_data`          VARCHAR(2000),
    `gmt_create`                DATETIME,
    `gmt_modified`              DATETIME,
    PRIMARY KEY (`xid`),
    KEY `idx_status_gmt_modified` (`status` , `gmt_modified`),
    KEY `idx_transaction_id` (`transaction_id`)
) ENGINE = InnoDB
  DEFAULT CHARSET = utf8mb4;

-- the table to store BranchSession data
CREATE TABLE IF NOT EXISTS `branch_table`
(
    `branch_id`         BIGINT       NOT NULL,
    `xid`               VARCHAR(128) NOT NULL,
    `transaction_id`    BIGINT,
    `resource_group_id` VARCHAR(32),
    `resource_id`       VARCHAR(256),
    `branch_type`       VARCHAR(8),
    `status`            TINYINT,
    `client_id`         VARCHAR(64),
    `application_data`  VARCHAR(2000),
    `gmt_create`        DATETIME(6),
    `gmt_modified`      DATETIME(6),
    PRIMARY KEY (`branch_id`),
    KEY `idx_xid` (`xid`)
) ENGINE = InnoDB
  DEFAULT CHARSET = utf8mb4;

-- the table to store lock data
CREATE TABLE IF NOT EXISTS `lock_table`
(
    `row_key`        VARCHAR(128) NOT NULL,
    `xid`            VARCHAR(128),
    `transaction_id` BIGINT,
    `branch_id`      BIGINT       NOT NULL,
    `resource_id`    VARCHAR(256),
    `table_name`     VARCHAR(32),
    `pk`             VARCHAR(36),
    `status`         TINYINT      NOT NULL DEFAULT '0' COMMENT '0:locked ,1:rollbacking',
    `gmt_create`     DATETIME,
    `gmt_modified`   DATETIME,
    PRIMARY KEY (`row_key`),
    KEY `idx_status` (`status`),
    KEY `idx_branch_id` (`branch_id`)
) ENGINE = InnoDB
  DEFAULT CHARSET = utf8mb4;

CREATE TABLE IF NOT EXISTS `distributed_lock`
(
    `lock_key`       CHAR(20) NOT NULL,
    `lock_value`     VARCHAR(20) NOT NULL,
    `expire`         BIGINT,
    primary key (`lock_key`)
) ENGINE = InnoDB
  DEFAULT CHARSET = utf8mb4;

INSERT INTO `distributed_lock` (lock_key, lock_value, expire) VALUES ('HandleAllSession', ' ', 0);

修改conf目录下的registry.conf配置文件

4、设置注册类型为nacos

type = "nacos"

5、设置服务注册到nacos(namespace不指定默认为public)

registry {
  # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
  type = "nacos"
  loadBalance = "RandomLoadBalance"
  loadBalanceVirtualNodes = 10

  nacos {
    application = "seata-server"
    serverAddr = "127.0.0.1:8848"
    group = "SEATA_GROUP"
    namespace = "426851b4-b7d3-4977-a440-bf6efa561760"
    cluster = "default"
    username = "nacos"
    password = "nacos"
  }
}

至此,配置完成,启动nacos,然后双击bin目录下的seata-server.bat启动seata,可以在nacos看到seata服务成功注册进去

registry.conf默认将配置存储在file.conf,也可以设置存储在nacos

实战案例

业务背景

创建三个服务,一个订单服务,一个库存服务,一个账户服务

当用户下单时,会在订单服务中创建一个订单,然后通过远程调用库存服务来扣减下单商品的库存,再通过远程调用账户服务来扣减用户账户里面的余额,最后在订单服务中修改订单状态为已完成

该操作跨越三个数据库,有两次远程调用,很明显会有分布式事务问题

数据库

准备三个服务对应数据库和表,sql脚本如下:

CREATE DATABASE seata_order;
USE seata_order;
CREATE TABLE t_order(
    id BIGINT(11) NOT NULL AUTO_INCREMENT PRIMARY KEY ,
    user_id BIGINT(11) DEFAULT NULL COMMENT '用户id',
    product_id BIGINT(11) DEFAULT NULL COMMENT '产品id',
    count INT(11) DEFAULT NULL COMMENT '数量',
    money DECIMAL(11,0) DEFAULT NULL COMMENT '金额',
    status INT(1) DEFAULT NULL COMMENT '订单状态:0创建中,1已完结'
)ENGINE=InnoDB AUTO_INCREMENT=7 CHARSET=utf8;

CREATE DATABASE seata_storage;
USE seata_storage;
CREATE TABLE t_storage(
    id BIGINT(11) NOT NULL AUTO_INCREMENT PRIMARY KEY ,
    product_id BIGINT(11) DEFAULT NULL COMMENT '产品id',
    total INT(11) DEFAULT NULL COMMENT '总库存',
    used INT(11) DEFAULT NULL COMMENT '已用库存',
    residue INT(11) DEFAULT NULL COMMENT '剩余库存'
)ENGINE=InnoDB AUTO_INCREMENT=7 CHARSET=utf8;
INSERT INTO t_storage(id, product_id, total, used, residue) VALUES(1,1,100,0,100);

CREATE DATABASE seata_account;
USE seata_account;
CREATE TABLE t_account(
    id BIGINT(11) NOT NULL AUTO_INCREMENT PRIMARY KEY ,
    user_id BIGINT(11) DEFAULT NULL COMMENT '用户id',
    total DECIMAL(10,0) DEFAULT NULL COMMENT '总额度',
    used DECIMAL(10,0) DEFAULT NULL COMMENT '已用额度',
    residue DECIMAL(10,0) DEFAULT 0 COMMENT '剩余可用额度'
)ENGINE=InnoDB AUTO_INCREMENT=7 CHARSET=utf8;
INSERT INTO t_account(id, user_id, total, used, residue) VALUES(1,1,1000,0,1000);

每个数据库需要分别创建一张seata回滚日志表,conf目录下的README-zh.md文件中client 链接了该sql脚本(和旧版seata有所不同),为了方便,脚本内容贴在下面:

-- for AT mode you must to init this sql for you business database. the seata server not need it.
CREATE TABLE IF NOT EXISTS `undo_log`
(
    `branch_id`     BIGINT       NOT NULL COMMENT 'branch transaction id',
    `xid`           VARCHAR(128) NOT NULL COMMENT 'global transaction id',
    `context`       VARCHAR(128) NOT NULL COMMENT 'undo_log context,such as serialization',
    `rollback_info` LONGBLOB     NOT NULL COMMENT 'rollback info',
    `log_status`    INT(11)      NOT NULL COMMENT '0:normal status,1:defense status',
    `log_created`   DATETIME(6)  NOT NULL COMMENT 'create datetime',
    `log_modified`  DATETIME(6)  NOT NULL COMMENT 'modify datetime',
    UNIQUE KEY `ux_undo_log` (`xid`, `branch_id`)
) ENGINE = InnoDB
  AUTO_INCREMENT = 1
  DEFAULT CHARSET = utf8mb4 COMMENT ='AT transaction mode undo table';

新建三个项目,分别对应订单、库存、账户,项目除了业务不同配置基本一致,由于业务代码量太多(基本就是增删改查比较简单)所以下面只讲解订单项目相关配置:

导入依赖

spring-cloud-starter-alibaba-seata中包含了seata-all依赖,由于spring-cloud-starter-alibaba-seata版本可能跟本地安装的seata版本不一致,所以需要排除spring-cloud-starter-alibaba-seata中的seata依赖,重新导入seata依赖保持版本和本地安装的seata版本一致


    com.alibaba.cloud
    spring-cloud-starter-alibaba-seata
    
        
            seata-all
            io.seata
        
    


    io.seata
    seata-all
    1.4.1

seata客户端配置

在项目resource文件夹下新建registry.conf配置文件,内容如下:

# 注册管理
registry {
  # 注册类型:file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
  type = "nacos"

  nacos {
    application = "seata-server"
    serverAddr = "127.0.0.1:8848"
    group = "SEATA_GROUP"
    namespace = "426851b4-b7d3-4977-a440-bf6efa561760"
    cluster = "default"
    username = "nacos"
    password = "nacos"
  }
}

# 配置管理
config {
  # 配置信息存储类型:file、nacos 、apollo、zk、consul、etcd3
  type = "nacos"

  nacos {
      serverAddr = "127.0.0.1:8848"
      namespace = ""
      group = "SEATA_GROUP"
      username = "nacos"
      password = "nacos"
    }
}

conf目录下的README-zh.md文件中config-center链接了seata客户端配置文件(config.txt)和读取seata客户端配置文件在nacos生成配置的脚本(nacos-confg-interactive.sh)

将config.txt和nacos-confg-interactive.sh下载到本地(config.txt需要放在nacos-confg-interactive.sh所在目录的上一级目录)

config.txt内容包含了seata服务端和客户端所有配置,我们只需要保存客户端相关配置即可,为了方便,内容贴在下面:

#For details about configuration items, see https://seata.io/zh-cn/docs/user/configurations.html
#Transport configuration, for client and server
transport.type=TCP
transport.server=NIO
transport.heartbeat=true
transport.enableTmClientBatchSendRequest=false
transport.enableRmClientBatchSendRequest=true
transport.enableTcServerBatchSendResponse=false
transport.rpcRmRequestTimeout=30000
transport.rpcTmRequestTimeout=30000
transport.rpcTcRequestTimeout=30000
transport.threadFactory.bossThreadPrefix=NettyBoss
transport.threadFactory.workerThreadPrefix=NettyServerNIOWorker
transport.threadFactory.serverExecutorThreadPrefix=NettyServerBizHandler
transport.threadFactory.shareBossWorker=false
transport.threadFactory.clientSelectorThreadPrefix=NettyClientSelector
transport.threadFactory.clientSelectorThreadSize=1
transport.threadFactory.clientWorkerThreadPrefix=NettyClientWorkerThread
transport.threadFactory.bossThreadSize=1
transport.threadFactory.workerThreadSize=default
transport.shutdown.wait=3
transport.serialization=seata
transport.compressor=none

#Transaction routing rules configuration, only for the client 事务组,default_tx_group可以自定义配置
service.vgroupMapping.default_tx_group=default

#Transaction rule configuration, only for the client
client.rm.asyncCommitBufferLimit=10000
client.rm.lock.retryInterval=10
client.rm.lock.retryTimes=30
client.rm.lock.retryPolicyBranchRollbackOnConflict=true
client.rm.reportRetryCount=5
client.rm.tableMetaCheckEnable=false
client.rm.tableMetaCheckerInterval=60000
client.rm.sqlParserType=druid
client.rm.reportSuccessEnable=false
client.rm.sagaBranchRegisterEnable=false
client.rm.sagaJsonParser=fastjson
client.rm.tccActionInterceptorOrder=-2147482648
client.rm.sqlParserType=druid
client.tm.commitRetryCount=5
client.tm.rollbackRetryCount=5
client.tm.defaultGlobalTransactionTimeout=60000
client.tm.degradeCheck=false
client.tm.degradeCheckAllowTimes=10
client.tm.degradeCheckPeriod=2000
client.tm.interceptorOrder=-2147482648
client.undo.dataValidation=true
client.undo.logSerialization=jackson
client.undo.onlyCareUpdateColumns=true
server.undo.logSaveDays=7
server.undo.logDeletePeriod=86400000
client.undo.logTable=undo_log
client.undo.compress.enable=true
client.undo.compress.type=zip
client.undo.compress.threshold=64k
#For TCC transaction mode
tcc.fence.logTableName=tcc_fence_log
tcc.fence.cleanPeriod=1h

#Log rule configuration, for client and server
log.exceptionRate=100

在nacos-confg-interactive.sh所在目录启动git终端执行以下命令运行脚本(本地需要先安装git)

sh nacos-confg-interactive.sh

运行后根据提示在终端输入nacos相关参数(和registry.conf配置保持一致)在nacos生成seata配置文件

application.yml配置文件seata相关配置

spring:
  cloud:
    alibaba:
      seata:
        # 配置事务组名称
        tx-service-group: default_tx_group
        # 配置事务组值(service.vgroupMapping.default_tx_group中的default_tx_group是tx-service-group配置的值)
        service:
          vgroupMapping:
            default_tx_group: default

注意:tx-service-group配置的值和config.txt中service.vgroupMapping配置保持一致

使用Seata对数据源进行代理

新建配置类

package com.yl.seata.order.config;

import com.alibaba.druid.pool.DruidDataSource;
import io.seata.rm.datasource.DataSourceProxy;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.mybatis.spring.transaction.SpringManagedTransactionFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;

import javax.sql.DataSource;

/**
 * 使用Seata对数据源进行代理
 *
 * @auther Y-wee
 */
@Configuration
public class DataSourceProxyConfig {

    @Value("${mybatis.mapperLocations}")
    private String mapperLocations;

    @Bean
    @ConfigurationProperties(prefix = "spring.datasource")
    public DataSource druidDataSource() {
        return new DruidDataSource();
    }

    @Bean
    public DataSourceProxy dataSourceProxy(DataSource dataSource) {
        return new DataSourceProxy(dataSource);
    }

    @Bean
    public SqlSessionFactory sqlSessionFactoryBean(DataSourceProxy dataSourceProxy) throws Exception {
        SqlSessionFactoryBean sqlSessionFactoryBean = new SqlSessionFactoryBean();
        sqlSessionFactoryBean.setDataSource(dataSourceProxy);
        sqlSessionFactoryBean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources(mapperLocations));
        sqlSessionFactoryBean.setTransactionFactory(new SpringManagedTransactionFactory());
        return sqlSessionFactoryBean.getObject();
    }

}

主启动类

主动类取消数据源的自动创建

package com.yl.seata.order;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.cloud.openfeign.EnableFeignClients;

//@SpringBootApplication
@EnableDiscoveryClient
@EnableFeignClients
// 取消数据源的自动创建
@SpringBootApplication(exclude = DataSourceAutoConfiguration.class)
public class SeataOrderApplication {

    public static void main(String[] args) {
        SpringApplication.run(SeataOrderApplication.class,args);
    }

}

业务层

业务方法加@GlobalTransactional注解实现事务控制

package com.yl.seata.order.service.impl;

import com.yl.seata.order.entity.Order;
import com.yl.seata.order.mapper.OrderMapper;
import com.yl.seata.order.service.AccountService;
import com.yl.seata.order.service.OrderService;
import com.yl.seata.order.service.StorageService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import io.seata.spring.annotation.GlobalTransactional;

import javax.annotation.Resource;

/**
 * 订单
 *
 * @auther Y-wee
 */
@Service
@Slf4j
public class OrderServiceImpl implements OrderService {
    @Resource
    private OrderMapper orderDao;
    @Resource
    private StorageService storageService;
    @Resource
    private AccountService accountService;

    /**
     * 创建订单->调用库存服务扣减库存->调用账户服务扣减账户余额->修改订单状态
     * 简单说:下订单->扣库存->减余额->改状态
     *
     * @param order
     */
    @Override
    @GlobalTransactional(rollbackFor = Exception.class)
    public void create(Order order) {
        log.info("-----开始新建订单");
        //1 新建订单
        orderDao.create(order);

        //2 扣减库存
        log.info("-----订单微服务开始调用库存,做扣减Count");
        storageService.decrease(order.getProductId(), order.getCount());
        log.info("-----订单微服务开始调用库存,做扣减end");

        //3 扣减账户
        log.info("-----订单微服务开始调用账户,做扣减Money");
        accountService.decrease(order.getUserId(), order.getMoney());
        log.info("-----订单微服务开始调用账户,做扣减end");

        //4 修改订单状态,从零到1,1代表已经完成
        log.info("-----修改订单状态开始");
        orderDao.update(order.getUserId(), 0);
        log.info("-----修改订单状态结束");

        log.info("-----下订单结束了,O(∩_∩)O哈哈~");
    }
}

配置完成->启动nacos->启动seata服务端->分别启动订单、库存、账户项目->发送请求抛出异常事务回滚