.NET 5 RabbitMQ
1.工具类
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace RabbitMQTest
{
    /// <summary>
    /// Small helper around RabbitMQ.Client that publishes text messages to a
    /// durable fanout exchange and registers consumers on durable queues bound
    /// to such an exchange.
    /// </summary>
    /// <remarks>
    /// The connection and the "already declared" cache are static, so they are
    /// shared by every instance in the process. NOTE(review): because of that,
    /// the host/credentials of whichever instance opens the connection first
    /// are the ones used by all instances — confirm this is intended.
    /// </remarks>
    public class RabbitMQInvoker
    {
        #region Identy

        // volatile: the field is read outside the lock in InitConnection's first check.
        private static volatile IConnection _CurrentConnection = null;
        private readonly string _HostName = null;
        private readonly string _UserName = null;
        private readonly string _Password = null;

        #endregion

        /// <summary>
        /// Stores the broker settings; no connection is opened until the first
        /// send or consumer registration.
        /// </summary>
        /// <param name="hostName">Broker host name.</param>
        /// <param name="userName">Broker user name.</param>
        /// <param name="password">Broker password.</param>
        public RabbitMQInvoker(string hostName = "localhost", string userName = "guest", string password = "guest")
        {
            this._HostName = hostName;
            this._UserName = userName;
            this._Password = password;
        }

        #region Initialise connection

        private static readonly object RabbitMQInvoker_InitLock = new object();

        /// <summary>
        /// Opens the shared connection if it is missing or no longer open.
        /// The check is repeated inside the lock so only one thread creates it.
        /// </summary>
        private void InitConnection()
        {
            if (_CurrentConnection == null || !_CurrentConnection.IsOpen)
            {
                lock (RabbitMQInvoker_InitLock)
                {
                    if (_CurrentConnection == null || !_CurrentConnection.IsOpen)
                    {
                        var factory = new ConnectionFactory()
                        {
                            HostName = this._HostName,
                            Password = this._Password,
                            UserName = this._UserName
                        };
                        _CurrentConnection = factory.CreateConnection();
                    }
                }
            }
        }

        #endregion

        #region Initialise exchange

        // Remembers which exchanges / bindings this process has already declared.
        // ConcurrentDictionary because the first ContainsKey check runs outside the
        // lock while another thread may be writing; a plain Dictionary is not safe
        // for that.
        private static readonly ConcurrentDictionary<string, bool> RabbitMQInvoker_ExchangeQueue = new ConcurrentDictionary<string, bool>();
        private static readonly object RabbitMQInvoker_BindQueueLock = new object();

        /// <summary>
        /// Declares the exchange (durable fanout) the first time it is seen.
        /// The exchange must be declared before publishing to it.
        /// </summary>
        /// <param name="exchangeName">Name of the exchange to declare.</param>
        private void InitExchange(string exchangeName)
        {
            string cacheKey = $"InitExchange_{exchangeName}";

            // Local cache only; the broker is not asked whether the exchange exists.
            if (RabbitMQInvoker_ExchangeQueue.ContainsKey(cacheKey))
            {
                return;
            }

            lock (RabbitMQInvoker_BindQueueLock)
            {
                if (RabbitMQInvoker_ExchangeQueue.ContainsKey(cacheKey))
                {
                    return;
                }

                this.InitConnection();
                using (IModel channel = _CurrentConnection.CreateModel())
                {
                    channel.ExchangeDeclare(exchange: exchangeName, type: ExchangeType.Fanout, durable: true, autoDelete: false, arguments: null);
                }
                RabbitMQInvoker_ExchangeQueue[cacheKey] = true;
            }
        }

        /// <summary>
        /// Declares the exchange and the queue and binds them together, once per
        /// exchange/queue pair.
        /// </summary>
        /// <param name="rabbitMQConsumerModel">Exchange and queue names to bind.</param>
        private void InitBindQueue(RabbitMQConsumerModel rabbitMQConsumerModel)
        {
            string cacheKey = $"InitBindQueue_{rabbitMQConsumerModel.ExchangeName}_{rabbitMQConsumerModel.QueueName}";

            if (RabbitMQInvoker_ExchangeQueue.ContainsKey(cacheKey))
            {
                return;
            }

            lock (RabbitMQInvoker_BindQueueLock)
            {
                if (RabbitMQInvoker_ExchangeQueue.ContainsKey(cacheKey))
                {
                    return;
                }

                this.InitConnection();
                using (IModel channel = _CurrentConnection.CreateModel())
                {
                    channel.ExchangeDeclare(exchange: rabbitMQConsumerModel.ExchangeName, type: ExchangeType.Fanout, durable: true, autoDelete: false, arguments: null);
                    channel.QueueDeclare(queue: rabbitMQConsumerModel.QueueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
                    channel.QueueBind(queue: rabbitMQConsumerModel.QueueName, exchange: rabbitMQConsumerModel.ExchangeName, routingKey: string.Empty, arguments: null);
                }
                RabbitMQInvoker_ExchangeQueue[cacheKey] = true;
            }
        }

        #endregion

        #region Send message

        /// <summary>
        /// Publishes a message to the given exchange inside an AMQP transaction.
        /// The exchange is declared first if this process has not declared it yet.
        /// Failures are written to the console and are not rethrown.
        /// </summary>
        /// <param name="exchangeName">Target exchange; the routing key is always empty.</param>
        /// <param name="message">Message text, sent as UTF-8 (JSON is recommended).</param>
        public void Send(string exchangeName, string message)
        {
            this.InitExchange(exchangeName);

            // InitConnection already checks for a missing or closed connection.
            this.InitConnection();

            using (var channel = _CurrentConnection.CreateModel()) // a fresh channel per send
            {
                try
                {
                    channel.TxSelect(); // start a protocol-level transaction
                    var body = Encoding.UTF8.GetBytes(message);
                    channel.BasicPublish(exchange: exchangeName, routingKey: string.Empty, basicProperties: null, body: body);
                    channel.TxCommit();
                    Console.WriteLine($" [x] Sent {body.Length}");
                }
                catch (Exception ex)
                {
                    Console.WriteLine(ex.Message);
                    Console.WriteLine($"【{message}】发送到Broker失败!{ex.Message}");
                    TryRollback(channel);
                }
            }
        }

        /// <summary>
        /// Rolls the transaction back without letting a rollback failure hide the
        /// error that caused it (e.g. when the channel is already closed).
        /// </summary>
        private static void TryRollback(IModel channel)
        {
            if (!channel.IsOpen)
            {
                return; // nothing to roll back on a closed channel
            }

            try
            {
                channel.TxRollback();
            }
            catch (Exception rollbackEx)
            {
                Console.WriteLine(rollbackEx.Message);
            }
        }

        #endregion

        #region Receive

        /// <summary>
        /// Binds the queue to the exchange and starts a consumer on a background
        /// task. Each message body is decoded as UTF-8 and passed to
        /// <paramref name="func"/>: a <c>true</c> result acknowledges the message,
        /// a <c>false</c> result (or an exception from the handler) rejects it
        /// with requeue.
        /// </summary>
        /// <param name="rabbitMQConsumerMode">Exchange and queue to consume from.</param>
        /// <param name="func">Handler invoked with the message text.</param>
        /// <exception cref="ArgumentNullException">An argument is <c>null</c>.</exception>
        public void RegistReciveAction(RabbitMQConsumerModel rabbitMQConsumerMode, Func<string, bool> func)
        {
            if (rabbitMQConsumerMode == null)
            {
                throw new ArgumentNullException(nameof(rabbitMQConsumerMode));
            }
            if (func == null)
            {
                throw new ArgumentNullException(nameof(func));
            }

            this.InitBindQueue(rabbitMQConsumerMode);

            // InitBindQueue skips the connection check once the binding is cached,
            // so make sure the shared connection is usable before creating a channel.
            this.InitConnection();

            Task.Run(() =>
            {
                using (var channel = _CurrentConnection.CreateModel())
                {
                    var consumer = new EventingBasicConsumer(channel);
                    channel.BasicQos(0, 0, true);
                    consumer.Received += (sender, ea) =>
                    {
                        string str = Encoding.UTF8.GetString(ea.Body.ToArray());

                        bool handled;
                        try
                        {
                            handled = func(str);
                        }
                        catch (Exception ex)
                        {
                            // A throwing handler would otherwise leave the delivery
                            // neither acked nor rejected; treat it as a failure.
                            Console.WriteLine(ex.Message);
                            handled = false;
                        }

                        if (handled)
                        {
                            channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); // consumed
                        }
                        else
                        {
                            channel.BasicReject(deliveryTag: ea.DeliveryTag, requeue: true); // back to the queue
                        }
                    };
                    channel.BasicConsume(queue: rabbitMQConsumerMode.QueueName,
                                         autoAck: false, // manual acknowledgement
                                         consumer: consumer);
                    Console.WriteLine($" Register Consumer To {rabbitMQConsumerMode.ExchangeName}-{rabbitMQConsumerMode.QueueName}");

                    // Blocks so the channel stays open until a line is read from stdin.
                    // NOTE(review): in a host without console input this presumably
                    // returns at once and the channel is disposed — confirm.
                    Console.ReadLine();
                    Console.WriteLine($" After Register Consumer To {rabbitMQConsumerMode.ExchangeName}-{rabbitMQConsumerMode.QueueName}");
                }
            });
        }

        #endregion
    }
}
2.实体类
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace RabbitMQTest
{
    /// <summary>
    /// Names the queue and the exchange a consumer works with.
    /// </summary>
    public class RabbitMQConsumerModel
    {
        /// <summary>
        /// Queue name.
        /// </summary>
        public string QueueName { get; set; }

        /// <summary>
        /// Exchange name.
        /// </summary>
        public string ExchangeName { get; set; }
    }
}
3.通过构造函数调用
// Create the invoker through its constructor (host name, user name, password).
// The empty strings are presumably placeholders — replace with real broker settings.
var rabbitMQ = new RabbitMQInvoker(hostName: "", userName: "", password: "");