CAP+RabbitMQ+SqlServer之商品订单


CAP+RabbitMQ+SqlServer之商品订单

前言

介绍了CAP结合RabbitMQ并集合数据库持久化数据,这节结合订单业务来讲。

环境

  • l Win10
  • l VS2022
  • l .NET5.0
  • l DotNetCore.CAP 5.0.1
  • l DotNetCore.CAP.RabbitMQ 5.0.1
  • l DotNetCore.CAP.SqlServer 5.0.1
  • l CAP.Dashboard 5.0.0
  • l Microsoft.EntityFrameworkCore.Design 5.0.0
  • l SQLserver2012

项目实践

用一个简单的项目举例,一个订单服务,一个产品服务(产品服务为集群);用户下单调用订单服务下单接口,下单接口需要调用产品服务的减库存接口

在目进行修改。

修改项目

新建四个项目

DB项目:“Yak.Cap.RabbitMQ.DB

模型项目:Yak.Cap.RabbitMQ.Models

发布接口项目:Yak.Cap.RabbitMQ.PublisherApi

订阅接口项目Yak.Cap.RabbitMQ.SubscribeApi

DB项目

DB项目用于访问数据库,在发布消息时保存用户数据到数据库,这里使用SQLserver2012

Step1修改数据库上下文

public class CapRabbitMQDbContext : DbContext

    {

        public CapRabbitMQDbContext(DbContextOptions options)

            : base(options)

        {

        }

        /// 

        /// 重写父类的方法 用于连接数据库

        /// 

        /// 

        protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)

        {

            if (!optionsBuilder.IsConfigured)

            {

                optionsBuilder.UseSqlServer("Data Source=.;database=CapDb2;uid=sa;pwd=sa123456");

            }

        }

        //public DbSet Users { get; set; }

        public DbSet Orders { get; set; }

    }

模型项目

Step1新建订单模型

 [Table("Orders")]

    public class Order

    {

        [Key, MaxLength(50)]

        public string Id { get; set; }

        [MaxLength(50), Required]

        public string SkuName { get; set; }

        [Required]

        public double SkuId { get; set; }

        [Required]

        public int Num { get; set; }

    }

发布消息接口项目

Startup.cs 中,修改以下配置:

  public void ConfigureServices(IServiceCollection services)

        {

            services.AddControllers();

            //添加数据库上下文服务

            services.AddDbContext();

            //添加事件总线cap

            services.AddCap(x => {

                // 使用内存存储消息(消息发送失败处理)

                //x.UseInMemoryStorage();

                x.UseEntityFramework();

                //使用RabbitMQ进行事件中心处理

                x.UseRabbitMQ(rb => {

                    rb.HostName = "localhost";

                    rb.UserName = "guest";

                    rb.Password = "guest";

                    rb.Port = 5672;

                    rb.VirtualHost = "/";

                });

x.FailedRetryCount = 10;//重试最高次数

                x.FailedRetryInterval = 10; //重试间隔

                x.FailedThresholdCallback = Failed =>

                {

                    Console.WriteLine("重试10次");

                };

                //启用仪表盘

                x.UseDashboard();

            });

}

 

修改Publish控制器,发送时间消息:

 public class PublishController : Controller

    {

        private readonly ICapPublisher _capBus;

        public PublishController(ICapPublisher capPublisher)

        {

            _capBus = capPublisher;

        }

        /// 

        /// 创建订单

        /// 

        /// 

        /// 

        [Route("~/CreateOrder")]

        public async Task CreateOrder([FromServices] CapRabbitMQDbContext dbContext)

        {

            var orderId = DateTime.Now.ToString("yyyyMMdd") + new Random().Next(0, 100000000);

            var order = new Order()

            {

                Id = orderId,

                SkuName = "测试商品",

                SkuId = 78956426666244,

                Num = 60

            };

            var headers = new Dictionary();

            headers.Add("Buy", "234");

            /*消息接收方名称*/

            var PublishName = "ProductCheckHouseNum";

            /*推送内容*/

            var PublishContent = order;

            /*推送Header*/

            var PublishHeader = headers;

            using (var trans = dbContext.Database.BeginTransaction(_capBus, autoCommit: false))

            {

                //业务代码

                try

                {

                    dbContext.Add(order);

                    var result = dbContext.SaveChanges();

                    await _capBus.PublishAsync(PublishName, PublishContent, PublishHeader);

                    if (result > 0) trans.Commit();

                    else trans.Rollback();

                    orderId = result > 0 ? orderId : "";

                }

                catch (Exception ex)

                {

                    trans.Rollback();

                }

            }

            return orderId;

        }

    }

 

订阅接口项目

修改Consumer控制器,接受订单消息:

 public class ConsumerController : Controller

    {

        /// 

        ///  接收下单库存验证消息

        /// 

        /// 

        /// 

        /// 

        [NonAction]

        [CapSubscribe("ProductCheckHouseNum")]

        public Order ProductCheckHouseNum(Order Order, [FromCap] CapHeader header)

        {

            if (Order != null)

            {

                Console.WriteLine(Order.SkuName);

            }

            return Order;

        }

    }

 

调试

启动两个接口项目,访问http://localhost:5000/CreateOrder接口后

订阅接口接受到消息:

访问http://localhost:5000/cap,打开CAP面板。

访问http://localhost:15672/#/,打开RabbitMQ面板。

数据库中新建了订单数据

总结

某些情况下消费者需要返回值以告诉发布者执行结果,以便于发布者实施一些动作,通常情况下这属于补偿范围,以上面的项目为例,商品在接收到消息验证之后向订单服务返回执行结果

例中幂等性指的是多次操作,结果是一致的,但是CAP无法保证消息不重复(即使是能保证该有的验证还是要自己弄),实际使用中需要自己考虑一下消息的重复过滤和幂等性   

在上面的例子中接收方是模拟的集群,相同的消息会接收两次,相应的推送方订单服务也会接收到两次  

所以为保证幂等性在商品库中建立了一个流水表,用商品Id和订单号作为联合主键保证业务处理的唯一,订单服务同理,当然这只是一种处理方式,也可以使用Redis等进行处理。

鸣谢

https://blog.csdn.net/baidu_27598031/article/details/121886654

源码

https://github.com/yandaniugithub/CAP

最后,欢迎各位大佬们打赏、点赞和评论指正

版权所有,转载请注明出处:

https://www.cnblogs.com/yakniu/p/16211455.html

CAP