CAP+RabbitMQ+SqlServer之商品订单
CAP+RabbitMQ+SqlServer之商品订单
前言
介绍了CAP结合RabbitMQ并集合数据库持久化数据,这节结合订单业务来讲。
环境
- l Win10
- l VS2022
- l .NET5.0
- l DotNetCore.CAP 5.0.1
- l DotNetCore.CAP.RabbitMQ 5.0.1
- l DotNetCore.CAP.SqlServer 5.0.1
- l CAP.Dashboard 5.0.0
- l Microsoft.EntityFrameworkCore.Design 5.0.0
- l SQLserver2012
项目实践
用一个简单的项目举例,一个订单服务,一个产品服务(产品服务为集群);用户下单调用订单服务下单接口,下单接口需要调用产品服务的减库存接口。
在目进行修改。
修改项目
新建四个项目
DB项目:“Yak.Cap.RabbitMQ.DB”
模型项目:“Yak.Cap.RabbitMQ.Models”
发布接口项目:“Yak.Cap.RabbitMQ.PublisherApi”
订阅接口项目“Yak.Cap.RabbitMQ.SubscribeApi”
DB项目
DB项目用于访问数据库,在发布消息时保存用户数据到数据库,这里使用SQLserver2012。
Step1修改数据库上下文
public class CapRabbitMQDbContext : DbContext { public CapRabbitMQDbContext(DbContextOptions : base(options) { } /// /// 重写父类的方法 用于连接数据库 /// /// protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder) { if (!optionsBuilder.IsConfigured) { optionsBuilder.UseSqlServer("Data Source=.;database=CapDb2;uid=sa;pwd=sa123456"); } } //public DbSet public DbSet } |
模型项目
Step1新建订单模型
[Table("Orders")] public class Order { [Key, MaxLength(50)] public string Id { get; set; } [MaxLength(50), Required] public string SkuName { get; set; } [Required] public double SkuId { get; set; } [Required] public int Num { get; set; } } |
发布消息接口项目
在 Startup.cs 中,修改以下配置:
public void ConfigureServices(IServiceCollection services) { services.AddControllers(); //添加数据库上下文服务 services.AddDbContext //添加事件总线cap services.AddCap(x => { // 使用内存存储消息(消息发送失败处理) //x.UseInMemoryStorage(); x.UseEntityFramework //使用RabbitMQ进行事件中心处理 x.UseRabbitMQ(rb => { rb.HostName = "localhost"; rb.UserName = "guest"; rb.Password = "guest"; rb.Port = 5672; rb.VirtualHost = "/"; }); x.FailedRetryCount = 10;//重试最高次数 x.FailedRetryInterval = 10; //重试间隔 x.FailedThresholdCallback = Failed => { Console.WriteLine("重试10次"); }; //启用仪表盘 x.UseDashboard(); }); } |
修改Publish控制器,发送时间消息:
public class PublishController : Controller { private readonly ICapPublisher _capBus; public PublishController(ICapPublisher capPublisher) { _capBus = capPublisher; } /// /// 创建订单 /// /// /// [Route("~/CreateOrder")] public async Task { var orderId = DateTime.Now.ToString("yyyyMMdd") + new Random().Next(0, 100000000); var order = new Order() { Id = orderId, SkuName = "测试商品", SkuId = 78956426666244, Num = 60 }; var headers = new Dictionary headers.Add("Buy", "234"); /*消息接收方名称*/ var PublishName = "ProductCheckHouseNum"; /*推送内容*/ var PublishContent = order; /*推送Header*/ var PublishHeader = headers; using (var trans = dbContext.Database.BeginTransaction(_capBus, autoCommit: false)) { //业务代码 try { dbContext.Add var result = dbContext.SaveChanges(); await _capBus.PublishAsync(PublishName, PublishContent, PublishHeader); if (result > 0) trans.Commit(); else trans.Rollback(); orderId = result > 0 ? orderId : ""; } catch (Exception ex) { trans.Rollback(); } } return orderId; } } |
订阅接口项目
修改Consumer控制器,接受订单消息:
public class ConsumerController : Controller { /// /// 接收下单库存验证消息 /// /// /// /// [NonAction] [CapSubscribe("ProductCheckHouseNum")] public Order ProductCheckHouseNum(Order Order, [FromCap] CapHeader header) { if (Order != null) { Console.WriteLine(Order.SkuName); } return Order; } } |
调试
启动两个接口项目,访问http://localhost:5000/CreateOrder接口后
订阅接口接受到消息:
访问http://localhost:5000/cap,打开CAP面板。
访问http://localhost:15672/#/,打开RabbitMQ面板。
数据库中新建了订单数据
总结
某些情况下消费者需要返回值以告诉发布者执行结果,以便于发布者实施一些动作,通常情况下这属于补偿范围,以上面的项目为例,商品在接收到消息验证之后向订单服务返回执行结果。
例中幂等性指的是多次操作,结果是一致的,但是CAP无法保证消息不重复(即使是能保证该有的验证还是要自己弄),实际使用中需要自己考虑一下消息的重复过滤和幂等性
在上面的例子中接收方是模拟的集群,相同的消息会接收两次,相应的推送方订单服务也会接收到两次
所以为保证幂等性在商品库中建立了一个流水表,用商品Id和订单号作为联合主键保证业务处理的唯一,订单服务同理,当然这只是一种处理方式,也可以使用Redis等进行处理。
鸣谢
https://blog.csdn.net/baidu_27598031/article/details/121886654
源码
https://github.com/yandaniugithub/CAP
最后,欢迎各位大佬们打赏、点赞和评论指正
版权所有,转载请注明出处:
https://www.cnblogs.com/yakniu/p/16211455.html