消息中间件-RabbitMQ
一、基础知识
1. 什么是RabbitMQ
RabbitMQ是2007年发布,是一个在AMQP(高级消息队列协议)基础上完成的,简称MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法,由Erlang(专门针对于大数据高并发的语言)语言开发,可复用的企业消息系统,是当前最主流的消息中间件之一,具有可靠性、灵活的路由、消息集群简单、队列高可用、多种协议的支持、管理界面、跟踪机制以及插件机制。
2.什么是消息和队列
1.消息 就是数据,增删改查的数据。例如在员工管理系统中增删改查的数据
2.队列 指的是一端进数据一端出数据,例如C#中(Queue数据结构)
3.什么是消息队列
1.消息队列指:一端进消息,一端出消息
2.RabbitMQ就是实现了消息队列概念的一个组件,以面向对象的思想去理解,消息队列就是类,而RabbitMQ就是实例,当然不仅仅只有RabbitMQ,例如ActiveMQ,RocketMQ,Kafka,包括Redis也可以实现消息队列。
4.什么地方使用RabbitMQ
1.在常见的单体架构中,主要流程是用户UI操作发起Http请求>服务器处理>然后由服务器直接和数据库交互,最后同步反馈用户结果
2.在微服务架构中,例如下图中的员工管理系统,UI与微服务通信,主要是通过Http或者gRPC同步通信
问题分析
在上述2种情况下,我们发现在UI请求时都是同步操作 ,第2种架构虽然将整体服务按业务拆分成不同的微服务并且对应各自的数据库,但是在用户与微服务通信时,存在的问题依然没有解决,例如数据库的承载能力只能处理10w个请求,如果遇到高并发情况下,UI发起50w请求,那数据库是远远承载不了的,从而导致如下问题。
1.高并发请求导致系统性能下降响应慢,同时数据库承载风险加大
2.扩展性不强UI操作的交互对业务的依赖较大,导致用户体验下降
3.瞬时流量涌入巨大的话,服务器可能直接挂了
解决方案
- 为了解决性能瓶颈问题。我们需要将同步通信换成异步通信方式。因此就使用消息队列,用户在UI中操作直接写入RabbitMQ然后直接返回,剩下的业务操作由消息队列和各自的微服务来完成
RabbitMQ的优势
-
异步处理,响应快,增加了数据库(服务器的承载能力)
-
削峰,可以把流量的高峰分解到不同的时间段来处理
-
解耦(扩展性就更强),让UI和业务独立演化
-
高可用,处理器如果发生故障了,对其他的处理器没有影响
RabbitMQ的不足
-
增加了系统复杂性,不方便调试和开发,在使用RabbitMQ以前前端直接和服务交互,现在加了一层
-
即时性降低了,在某一程度上提升了用户操作体验,也降低了用户体验,但是避免不了,取长补短
-
更加依赖消息队列了
5.RabbitMQ组成概念
1.ConnectionFactory 为Connection的制造工厂。
2.Connection是RabbitMQ的socket链接,它封装了socket协议相关部分逻辑。
3.Channel是我们与RabbitMQ打交道的最重要的一个接口,我们大部分的业务操作是在Channel这个接口中完成的,包括定义Queue、定义Exchange、绑定Queue与Exchange、发布消息等。
4.Exchange(交换机) 我们通常认为生产者将消息投递到Queue中,实际上实际的情况是,生产者将消息发送到Exchange,由Exchange将消息路由到一个或多个Queue中(或者丢弃),而在RabbitMQ中的Exchange一共有4种策略,分别为:fanout(扇形)、direct(直连)、topic(主题)、headers(头部)
二、如何落地RabbitMQ
1.RabbitMQ环境安装
1.下载RabbitMQ
2.运行环境erlang
3.安装完成之后,加载RabbitMQ管理插件
rabbitmq-plugins enable rabbitmq_management
4.安装成功访问RabbitMQ管理后台http://localhost:15672
2.创建系统业务
1.分别创建考勤服务,请假服务,计算薪酬服务,邮件服务,短信服务消费者角色
2.创建员工管理网站用于模拟前端调用,主要充当生产者角色
3.在员工管理网站和每一个模拟微服务中通过nuget引入RabbitMQ.Client
4.在员工管理网站中创建模拟添加考勤的控制器并加入生产者代码
//创建连接
using (var connection = factory.CreateConnection())
{
//创建通道
var channel = connection.CreateModel();
//定义队列
channel.QueueDeclare("CreateAttendance", false, false, false, null);
string json = JsonConvert.SerializeObject(attendanceDto);
//创建内容对象
var properties = channel.CreateBasicProperties();
//发送消息
channel.BasicPublish(exchange: "",routingKey: "CreateAttendance",basicProperties: properties,body: Encoding.UTF8.GetBytes(json));
}
5.在考勤微服务中创建接口,并在接口中加入消费者代码
var connection = factory.CreateConnection();
var channel = connection.CreateModel();
//创建消费者事件
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body;
// 1、逻辑代码,添加到数据库
var message = Encoding.UTF8.GetString(body.ToArray());
object json = JsonConvert.DeserializeObject(message);
Console.WriteLine(" [x] 创建考勤信息 {0}", message);
};
//设置消费者属性
//p1.监听队列p2.消息确认ACK p3.消费者实例赋值
channel.BasicConsume(queue: "CreateAttendance",autoAck: false,consumer:consumer);
三、Exchange交换机及实例分析
1.Fanout Exchange (扇形交换机)
fanout类型的Exchange路由规则非常简单,工作方式类似于多播一对多,它会把所有发送到该Exchange的消息路由到所有与它绑定的Queue中。
1.生产者一个Exchange对应多个Queue,或者不声明Queue
2.消费者定义Exchange,如果生产者定义了Queue,那必须将exchange和queue绑定,如果没有定义队列,那消费者自己声明一个随机Queue用于接收消费消息
业务实例
当我们有员工需要请假,在员工管理系统提交请假,但是由于公司规定普通员工请假,需要发送短信到他的主管领导,针对此业务场景我们需要调用请假服务的同时去发送短信,这时需要两个消费者(请假服务,短信服务)来消费同一条消息,其实本质就是往RabbitMQ写入一个能被多个消费者接收的消息,所以可以使用 扇形交换机
,一个生产者,多个消费者.
生产者模拟使用调用控制器来实现
[HttpPost]
public IEnumerable<bool> CreateLeave(CreateLeaveDto createLeaveDto)
{
var factory = new ConnectionFactory()
{
HostName = "192.168.0.106",
Port = 5672,
Password = "guest",
UserName = "guest",
VirtualHost = "/"
};
using (var connection = factory.CreateConnection())
{
var channel = connection.CreateModel();
//定义交换机
channel.ExchangeDeclare(exchange: "Leave_fanout", type: "fanout");
string productJson = JsonConvert.SerializeObject(createLeaveDto);
var body = Encoding.UTF8.GetBytes(productJson);
var properties = channel.CreateBasicProperties();
//设置消息持久化
properties.Persistent = true;
channel.BasicPublish(exchange: "Leave_fanout", routingKey: "", basicProperties: properties,body: body);
}
}
消费者实现IHostedService 接口创建一个监听主机
public class RabbitmqHostService : IHostedService
{
public Task StartAsync(CancellationToken cancellationToken)
{
var factory = new ConnectionFactory()
{
HostName = "localhost",
Port = 5672,
Password = "guest",
UserName = "guest",
VirtualHost = "/"
};
var connection = factory.CreateConnection();
var channel = connection.CreateModel();
// 1、定义交换机
channel.ExchangeDeclare(exchange: "Leave_fanout", type: ExchangeType.Fanout);
//定义随机队列
var queueName = channel.QueueDeclare().QueueName;
//队列和交换机绑定
channel.QueueBind(queueName,"Leave_fanout",routingKey: "");
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
Console.WriteLine($"model:{model}");
var body = ea.Body;
// 1、业务逻辑
var message = Encoding.UTF8.GetString(body.ToArray());
Console.WriteLine(" [x] 创建请假 {0}", message);
// 1、自动确认机制缺陷,消息是否正常添加到数据库当中,所以需要使用手工确认
channel.BasicAck(ea.DeliveryTag, true);
};
// Qos(防止多个消费者,能力不一致,导致的系统质量问题。
// 每一次一个消费者只成功消费一个)
channel.BasicQos(0, 1, false);
// 消息确认(防止消息消费失败)
channel.BasicConsume(queue: queueName ,autoAck: false,consumer: consumer);
}
public Task StopAsync(CancellationToken cancellationToken)
{
// 1、关闭rabbitmq的连接
throw new NotImplementedException();
}
}
2.Direct Exchange (直连交换机)
直接交换器,工作方式类似于单播一对一,Exchange会将消息发送完全匹配ROUTING_KEY的Queue,缺陷是无法实现多生产者对一个消费者
1.生产者一个Exchange对应一个routingKey绑定,也可以声明队列并绑定,然后向指定的队列发送消息。
2.消费者需要定义Exchange和routingKey,如果生产者声明并绑定了队列,那消费者必须绑定生产者指定的Queue来接收消息,如果没有指定Queue,那消费者需要自己声明一个随机Queue然后绑定用于接收消息
当我们员工管理系统需要计算薪资并将结果以发送短信的方式告诉员工,这个时候我们就不太适合用“扇形交换机”了,因为换做是你,你也不想你的工资全公司都知道吧?这个时候就需要定制了一对一的场景了,那就在生产消息时使用直连交换机
根据routingKey发送指定的消费者.
生产者模拟使用调用控制器来实现
public IEnumerable<bool> SendCalculateSalary(CalculateSalaryDto calculateSalaryDto)
{
var factory = new ConnectionFactory()
{
HostName = "192.168.0.106",
Port = 5672,
Password = "admin",
UserName = "admin",
VirtualHost = "/"
};
using (var connection = factory.CreateConnection())
{
var channel = connection.CreateModel();
//2、定义交换机
channel.ExchangeDeclare(exchange: "CalculateSalary_direct", type: "direct");
string calculateSalaryDtoJson = JsonConvert.SerializeObject(calculateSalaryDto);
var body = Encoding.UTF8.GetBytes(calculateSalaryDtoJson);
//3、发送消息
var properties = channel.CreateBasicProperties();
properties.Persistent = true; // 设置消息持久化
//p1 指定交换机
//p2 routingKey
channel.BasicPublish(exchange: "CalculateSalary_direct",routingKey: "product-sms",basicProperties: properties,body: body);
}
}
消费者实现IHostedService 接口创建一个监听主机
public class RabbitmqHostService : IHostedService
{
public Task StartAsync(CancellationToken cancellationToken)
{
var factory = new ConnectionFactory()
{
HostName = "localhost",
Port = 5672,
Password = "guest",
UserName = "guest",
VirtualHost = "/"
};
var connection = factory.CreateConnection();
var channel = connection.CreateModel();
// 1、定义交换机
channel.ExchangeDeclare(exchange: "CalculateSalary_direct", type: ExchangeType.Direct);
// 2、定义随机队列
var queueName = channel.QueueDeclare().QueueName;
// 3、队列要和交换机绑定起来
channel.QueueBind(queueName,"CalculateSalary_direct",routingKey: "product-sms");
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
Console.WriteLine($"model:{model}");
var body = ea.Body;
// 1、业务逻辑
var message = Encoding.UTF8.GetString(body.ToArray());
Console.WriteLine(" [x] 发送短信 {0}", message);
// 1、消息是否正常添加到数据库当中,所以需要使用手工确认
channel.BasicAck(ea.DeliveryTag, true);
};
// 3、消费消息
channel.BasicQos(0, 1, false); // Qos(防止多个消费者,能力不一致,导致的系统质量问题。
// autoAck设为false 不进行自动确认
channel.BasicConsume(queue: queueName,autoAck: false, consumer: consumer);
}
public Task StopAsync(CancellationToken cancellationToken)
{
// 1、关闭rabbitmq的连接
throw new NotImplementedException();
}
}
3.Topic Exchange (主题交换机)
Exchange绑定队列需要制定Key; Key 可以有自己的规则;Key可以有占位符;或者# ,匹配一个单词、#匹配多个单词,在Direct基础上加上模糊匹配;多生产者一个消费者,可以多对对,也可以多对1, 真实项目当中,使用主题交换机。可以满足所有场景
1.生产者定义Exchange,然后不同的routingKey绑定
2.消费者定义Exchange,如果生产者定义了Queue,那必须将exchange和queue以及routingKey绑定,如果没有定义队列,那消费者自己声明一个随机Queue用于接收消费消息,
3.消费者routingKey的模糊匹配,生产者发送消息时routingKey定义以sms.开头, * 号只能匹配的routingKey为一级,例如(sms.A)或(sms.B)的发送的消息,# 能够匹配的routingKey为一级及多级以上 ,例如 (sms.A)或者(sms.A.QWE.IOP)
在月底的时候我们需要把员工存在异常考勤信息,薪资结算信息,请假信息分别以邮件的形式发送给我们的员工查阅,我们知道这是一个典型的多个生产者,一个消费者场景,异常考勤信息,薪资结算信息,请假信息分别需要生产消息发送到RabbitMQ,然后供我们员工消费
分别模拟3个生产者:异常考勤信息,薪资结算信息,请假信息
var factory = new ConnectionFactory()
{
HostName = "192.168.0.106",
Port = 5672,
Password = "admin",
UserName = "admin",
VirtualHost = "/"
};
//计算薪资生产者
public IEnumerable<bool> SendCalculateSalary(CalculateSalaryDto calculateSalaryDto)
{
using (var connection = factory.CreateConnection())
{
var channel = connection.CreateModel();
//2、定义topic交换机
channel.ExchangeDeclare(exchange: "sms_topic", type: "topic");
string calculateSalaryDtoJson = JsonConvert.SerializeObject(calculateSalaryDto);
var body = Encoding.UTF8.GetBytes(calculateSalaryDtoJson);
//3、发送消息
var properties = channel.CreateBasicProperties();
properties.Persistent = true; // 设置消息持久化
//p1 指定交换机
//p2 routingKey
channel.BasicPublish(exchange: "sms_topic",routingKey: "sms.CalculateSalary",basicProperties: properties,body: body);
}
}
//考勤生产者
public IEnumerable<bool> SendCalculateAttendance(CalculateAttendanceDto calculateAttendance)
{
using (var connection = factory.CreateConnection())
{
var channel = connection.CreateModel();
//2、定义topic交换机
channel.ExchangeDeclare(exchange: "sms_topic", type: "topic");
string calculateAttendanceDtoJson = JsonConvert.SerializeObject(calculateAttendance);
var body = Encoding.UTF8.GetBytes(calculateAttendanceDtoJson);
//3、发送消息
var properties = channel.CreateBasicProperties();
properties.Persistent = true; // 设置消息持久化
//p1 指定交换机
//p2 routingKey
channel.BasicPublish(exchange: "sms_topic",routingKey: "sms.CalculateAttendance",basicProperties: properties,body: body);
}
}
//请假信息生产者
public IEnumerable<bool> SendCalculateLeave(CalculateLeaveDto calculateLeave)
{
using (var connection = factory.CreateConnection())
{
var channel = connection.CreateModel();
//2、定义topic交换机
channel