分布式高性能消息处理中心HPMessageCenter


HPMessageCenter

高性能消息分发中心。用户只需写好restful接口,在portal里面配置消息的处理地址,消息消费者就会自动访问相关接口,完成消息任务。(其实HPMessageCenter有两个版本,这次开源的是第二个版本。在第一个版本,消息消费失败的重试依赖数据库进行,造成了很大的性能影响。我一直在思考怎样不依赖数据库来进行消息重试,看过一些实现是通过在内存中计数,这种方式存在很大的风险,系统宕机过后这个失败的消息就只能手动处理。我一直在想,如果每次RabbitMQ重入队的消息能在消息属性中加一个消息发送次数的计数就好了。最后,我利用延时队列和死信队列,通过在MessageHeader中设置RetryNumbers来重新设计了消息消费失败的重试机制。
Git地址https://gitee.com/dugukuangshao/HPMessageCenter

部署说明

 **创建数据库执行Scripts文件夹中的数据库脚本** 

 **配置数据库连接字符串** 

打开MessageCenter.Portal\Configuration\Data\Database.config

在图片中画红线的地方修改链接字符串

 **配置RabbitMQ连接属性** 

打开MessageCenter.Portal\appsettings.json

在图片中画红线的地方修改RabbitMQ地址和用户名密码

部署MessageCenter.Portal到IIS或者Docker中,即可访问

使用说明

 **在首页中配置App和Exchange** 

 **在Topic管理页面中配置Topic信息** 

ProcessorConfig为消息消费者处理消息的Restful接口

 **最后在消费者管理页面中添加Sever要运行的Consumer** 

此时,通过Publisher/Publish接口发送消息到RabbitMQ,系统中的消费者会自动访问配置的对应消息处理接口处理消息。

### 为什么要开发这样一个消息系统

使用此消息系统处理消息,开发者只需调用接口发送消息、写消息处理的接口,不必关心MQ的实现和使用,使开发者更关注业务,提高开发效率。

扩展性

MessageTransit模块是一个高度抽象的模块,开发者可以继承它的接口实现其他MQ(ActiveMQ和RocketMQ等)的对接。该模块还设计了IMonitor和ILogger接口,开发者可以继承IMonitor接口,自定义实现消息处理失败的短信、邮件通知等功能,继承ILogger接口,可以实现MQ断线的通知等,当然默认的RabbitMQ实现支持断线重连。

性能

一个Sever里每个Topic对应一个消费者。一个消费者一次处理一个消息,如果发生消息处理不及时的情况,可以部署多个Sever,并在消费者管理页面添加对应的消息消费者。如果消息数量进一步提高,就需要同步增加消息Restful处理接口的处理能力。比如,一个消息处理接口可以处理每分钟300个并发,一个Sever每分钟只能处理60个消息,此时可以部署5个Sever。消息进一步增加,消息处理接口性能达到瓶颈,增加消息处理接口的处理能力,再增加Sever数即可提高性能。