CAP结合RabbitMQ分布式事务快速开始


CAP结合RabbitMQ分布式事务快速开始

前言

到今天为止已经在家关了一个多月了,上海的疫情稍稍有点好转,今天一万多阳性,看到点曙光,希望早点解封吧。

介绍了CAP结合内存,这节结合RabbitMQ并集合数据库持久化数据。

CAP是一个在分布式系统(SOA微服务系统(MicroService)中实现事件总线及最终一致性(分布式事务)的一个开源的C#库,具有轻量级,高性能,易使用等特点

CAP 具有Event Bus的所有功能,简化EventBus发布/订阅

CAP 具有消息持久化的功能,服务进行重启或者宕机不比担心消息丢失保证可靠性

Cap支持事务,通过捕获数据库上下文连接对象实现消息事务,消息持久化

可以简单理解:

使用起来非常简单,主要通过这个类来实现 

发布:ICapPublisher

订阅:CapSubscribe

环境

l Win10

l VS2022

l .NET5.0

l DotNetCore.CAP 5.0.1

l DotNetCore.CAP.RabbitMQ 5.0.1

l DotNetCore.CAP.SqlServer 5.0.1

l CAP.Dashboard 5.0.0

l Microsoft.EntityFrameworkCore.Design 5.0.0

l SQLserver2012

项目实践

实现功能是通过一个接口发布消息到另外一个接口,并向数据库中插入数据,通过CAP看板和RabbitMQ看板来查看变化。

新建项目

新建四个项目

DB项目:“Yak.Cap.RabbitMQ.DB

模型项目:Yak.Cap.RabbitMQ.Models

发布接口项目:Yak.Cap.RabbitMQ.PublisherApi

订阅接口项目Yak.Cap.RabbitMQ.SubscribeApi

DB项目

DB项目用于访问数据库,在发布消息时保存用户数据到数据库,这里使用SQLserver2012

添加后所有的依赖有:

  

    

    

    

    

    

    

  

添加数据库上下文

  public class CapRabbitMQDbContext : DbContext

    {

        public CapRabbitMQDbContext(DbContextOptions options)

            : base(options)

        {

        }

        /// 

        /// 重写父类的方法 用于连接数据库

        /// 

        /// 

        protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)

        {

            if (!optionsBuilder.IsConfigured)

            {

                optionsBuilder.UseSqlServer("Data Source=.;database=CapDb;uid=sa;pwd=sa123456");

            }

        }

        public DbSet Users { get; set; }

    }

模型项目

提供模型

  [Table("Sys_User")]

    public class Sys_User

    {

        public int Id { get; set; }

        /// 

        /// 用户名

        /// 

        public string Name { get; set; }

        /// 

        /// 手机号码

        /// 

        public string C_Mobile { get; set; }

 }

发布消息接口项目

添加后所有的依赖有:

  

     

    

    

    

  

在 Startup.cs 中,添加以下配置:

  public void ConfigureServices(IServiceCollection services)

        {

            services.AddControllers();

            //添加数据库上下文服务

            services.AddDbContext();

            //添加事件总线cap

            services.AddCap(x => {

                // 使用内存存储消息(消息发送失败处理)

                //x.UseInMemoryStorage();

                x.UseEntityFramework();

                //使用RabbitMQ进行事件中心处理

                x.UseRabbitMQ(rb => {

                    rb.HostName = "localhost";

                    rb.UserName = "guest";

                    rb.Password = "guest";

                    rb.Port = 5672;

                    rb.VirtualHost = "/";

                });

                //启用仪表盘

                x.UseDashboard();

            });

}

 

添加Publish控制器,发送时间消息:

 public class PublishController : Controller

    {

        private readonly ICapPublisher _capBus;

        public PublishController(ICapPublisher capPublisher)

        {

            _capBus = capPublisher;

        }

        [Route("~/ef/transaction")]

        public IActionResult EntityFrameworkWithTransaction([FromServices] CapRabbitMQDbContext dbContext)

        {

            using (var trans = dbContext.Database.BeginTransaction(_capBus, autoCommit: true))

            {

                //业务代码

                dbContext.Add(new Sys_User {Name = "yak", C_Mobile = "18221546985" });

                dbContext.SaveChanges();

                _capBus.Publish("test.show.time", DateTime.Now);

            }

            return Ok();

        }

    }

 

订阅接口项目

添加后所有的依赖有:

  

     

    

    

    

  

Startup.cs 中,添加以下配置:

  public void ConfigureServices(IServiceCollection services)

        {

            services.AddControllers();

            //添加数据库上下文服务

            services.AddDbContext();

            //添加事件总线cap

            services.AddCap(x => {

                // 使用内存存储消息(消息发送失败处理)

                //x.UseInMemoryStorage();

                x.UseEntityFramework();

                //使用RabbitMQ进行事件中心处理

                x.UseRabbitMQ(rb => {

                    rb.HostName = "localhost";

                    rb.UserName = "guest";

                    rb.Password = "guest";

                    rb.Port = 5672;

                    rb.VirtualHost = "/";

                });

                //启用仪表盘

                x.UseDashboard();

            });

}

 

添加Publish控制器,发送时间消息:

 public class ConsumerController : Controller

    {

        [NonAction]

        [CapSubscribe("test.show.time")]

        public void ReceiveMessage(DateTime time)

        {

            Console.WriteLine("message time is:" + time);

        }

        

 }

 

调试

启动两个接口项目,访问http://localhost:5000/ef/transaction接口后

订阅接口接受到时间:

访问http://localhost:5000/cap,打开CAP面板。

访问http://localhost:15672/#/,打开RabbitMQ面板。

数据库中新建了用户数据

总结

通过此实践了解了CAP结合RabbitMQ和数据库SQLServer进行消息发送订阅及持久化的功能(此实际仅仅保存了用户),服务进行重启或者宕机不比担心消息丢失保证可靠性。

CAP的实现要更加的严谨、更加强大,我们不需要建过程表,也不需要处理消息队列的问题,底层很多的细节都不需要我们考虑,只管用就好了。

CAP数据库存储支持:Sql ServerMySqlPostgreSqlMongoDB

消息队列支持:RabbitMQKafkaAzure Service Bus等。

CAP同时支持使用 EntityFrameworkCore ADO.NET 的项目,你可以根据需要选择不同的配置方式。

鸣谢

https://cap.dotnetcore.xyz/user-guide/zh/storage/sqlserver/

https://www.cnblogs.com/savorboard/p/cap.html

源码

https://github.com/yandaniugithub/CAP

最后,欢迎各位大佬们打赏、点赞和评论指正

版权所有,转载请注明出处:

https://www.cnblogs.com/yakniu/p/16204192.html

CAP