.NET 5 RabbitMQ


1.工具类

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace RabbitMQTest
{
    /// <summary>
    /// Small helper around the RabbitMQ .NET client: publishes messages to a fanout
    /// exchange and registers queue consumers, declaring exchanges, queues and
    /// bindings on first use.
    /// </summary>
    public class RabbitMQInvoker
    {

        #region Identity
        // Static: a single connection is shared by every RabbitMQInvoker instance.
        // It is (re)created with the host/credentials of whichever instance finds it
        // missing or closed.
        private static IConnection _CurrentConnection = null;
        private readonly string _HostName = null;
        private readonly string _UserName = null;
        private readonly string _Password = null;
        #endregion

        /// <summary>
        /// Stores the broker address and credentials. No connection is opened here;
        /// it is created lazily by the first operation that needs it.
        /// </summary>
        /// <param name="hostName">Broker host name.</param>
        /// <param name="userName">Broker user name.</param>
        /// <param name="password">Broker password.</param>
        public RabbitMQInvoker(string hostName = "localhost", string userName = "guest", string password = "guest")
        {
            this._HostName = hostName;
            this._UserName = userName;
            this._Password = password;
        }

        #region Connection initialization
        private static readonly object RabbitMQInvoker_InitLock = new object();

        /// <summary>
        /// Ensures the shared connection exists and is open, creating it if needed.
        /// </summary>
        private void InitConnection()
        {
            // Cheap check first so the common "already connected" path takes no lock.
            if (_CurrentConnection != null && _CurrentConnection.IsOpen)
            {
                return;
            }

            lock (RabbitMQInvoker_InitLock)
            {
                // Re-check: another thread may have connected while we waited for the lock.
                if (_CurrentConnection == null || !_CurrentConnection.IsOpen)
                {
                    var factory = new ConnectionFactory()
                    {
                        HostName = this._HostName,
                        Password = this._Password,
                        UserName = this._UserName
                    };
                    _CurrentConnection = factory.CreateConnection();
                }
            }
        }
        #endregion

        #region Exchange / queue initialization
        // Remembers which exchanges and bindings this process has already declared.
        // Dictionary is not safe for concurrent read/write, so it is only ever
        // touched while holding RabbitMQInvoker_BindQueueLock.
        private static readonly Dictionary<string, bool> RabbitMQInvoker_ExchangeQueue = new Dictionary<string, bool>();
        private static readonly object RabbitMQInvoker_BindQueueLock = new object();

        /// <summary>
        /// Declares the exchange (durable fanout) once per process. The exchange must
        /// exist before anything is published to it.
        /// </summary>
        /// <param name="exchangeName">Name of the exchange to declare.</param>
        private void InitExchange(string exchangeName)
        {
            string cacheKey = $"InitExchange_{exchangeName}";

            lock (RabbitMQInvoker_BindQueueLock)
            {
                // Local cache only; the broker is not queried to confirm the exchange exists.
                if (RabbitMQInvoker_ExchangeQueue.ContainsKey(cacheKey))
                {
                    return;
                }

                this.InitConnection();
                using (IModel channel = _CurrentConnection.CreateModel())
                {
                    channel.ExchangeDeclare(exchange: exchangeName, type: ExchangeType.Fanout, durable: true, autoDelete: false, arguments: null);
                }
                RabbitMQInvoker_ExchangeQueue[cacheKey] = true;
            }
        }

        /// <summary>
        /// Declares the exchange and the queue and binds them together, once per
        /// exchange/queue pair per process.
        /// </summary>
        /// <param name="rabbitMQConsumerModel">Exchange and queue names to declare and bind.</param>
        private void InitBindQueue(RabbitMQConsumerModel rabbitMQConsumerModel)
        {
            string cacheKey = $"InitBindQueue_{rabbitMQConsumerModel.ExchangeName}_{rabbitMQConsumerModel.QueueName}";

            lock (RabbitMQInvoker_BindQueueLock)
            {
                if (RabbitMQInvoker_ExchangeQueue.ContainsKey(cacheKey))
                {
                    return;
                }

                this.InitConnection();
                using (IModel channel = _CurrentConnection.CreateModel())
                {
                    channel.ExchangeDeclare(exchange: rabbitMQConsumerModel.ExchangeName, type: ExchangeType.Fanout, durable: true, autoDelete: false, arguments: null);
                    channel.QueueDeclare(queue: rabbitMQConsumerModel.QueueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
                    channel.QueueBind(queue: rabbitMQConsumerModel.QueueName, exchange: rabbitMQConsumerModel.ExchangeName, routingKey: string.Empty, arguments: null);
                }
                RabbitMQInvoker_ExchangeQueue[cacheKey] = true;
            }
        }
        #endregion

        #region Send
        /// <summary>
        /// Publishes a message to the given exchange inside an AMQP transaction.
        /// The exchange is declared (as a fanout exchange) before the first send.
        /// Publish failures are logged to the console and not rethrown.
        /// </summary>
        /// <param name="exchangeName">Exchange to publish to.</param>
        /// <param name="message">Message text, sent UTF-8 encoded; JSON is suggested.</param>
        public void Send(string exchangeName, string message)
        {
            this.InitExchange(exchangeName);

            // InitConnection already checks for a missing/closed connection.
            this.InitConnection();

            using (var channel = _CurrentConnection.CreateModel())// a fresh channel per send
            {
                try
                {
                    channel.TxSelect();// start a protocol-level transaction on this channel

                    var body = Encoding.UTF8.GetBytes(message);
                    channel.BasicPublish(exchange: exchangeName,
                                         routingKey: string.Empty,
                                         basicProperties: null,
                                         body: body);
                    channel.TxCommit();
                    Console.WriteLine($" [x] Sent {body.Length}");
                }
                catch (Exception ex)
                {
                    Console.WriteLine(ex.Message);
                    Console.WriteLine($"【{message}】发送到Broker失败!{ex.Message}");

                    // Rolling back can itself fail (channel already closed, or TxSelect
                    // never succeeded); that must not escape and mask the logged error.
                    try
                    {
                        if (channel.IsOpen)
                        {
                            channel.TxRollback();
                        }
                    }
                    catch (Exception rollbackEx)
                    {
                        Console.WriteLine(rollbackEx.Message);
                    }
                }
            }
        }
        #endregion

        #region Receive
        /// <summary>
        /// Declares and binds the queue, then starts a background consumer that passes
        /// each message (decoded as UTF-8) to <paramref name="func"/>.
        /// A message is acknowledged when the handler returns true; when it returns
        /// false or throws, the message is rejected and requeued.
        /// </summary>
        /// <param name="rabbitMQConsumerMode">Exchange and queue to consume from.</param>
        /// <param name="func">Message handler; return true to acknowledge the message.</param>
        /// <exception cref="ArgumentNullException">An argument is null.</exception>
        public void RegistReciveAction(RabbitMQConsumerModel rabbitMQConsumerMode, Func<string, bool> func)
        {
            if (rabbitMQConsumerMode == null)
            {
                throw new ArgumentNullException(nameof(rabbitMQConsumerMode));
            }
            if (func == null)
            {
                throw new ArgumentNullException(nameof(func));
            }

            this.InitBindQueue(rabbitMQConsumerMode);

            Task.Run(() =>
            {
                try
                {
                    // The connection may have dropped since InitBindQueue ran (or been
                    // skipped entirely when the binding was already cached).
                    this.InitConnection();

                    using (var channel = _CurrentConnection.CreateModel())
                    {
                        var consumer = new EventingBasicConsumer(channel);
                        // NOTE(review): prefetch size/count of 0 — per the RabbitMQ docs this
                        // means "no limit"; confirm unlimited prefetch is intended.
                        channel.BasicQos(0, 0, true);
                        consumer.Received += (sender, ea) =>
                        {
                            bool handled;
                            try
                            {
                                string str = Encoding.UTF8.GetString(ea.Body.ToArray());
                                handled = func(str);
                            }
                            catch (Exception ex)
                            {
                                // A throwing handler would otherwise leave the message
                                // neither acked nor rejected; treat it as a failure.
                                Console.WriteLine(ex.Message);
                                handled = false;
                            }

                            if (handled)
                            {
                                channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);// consumed
                            }
                            else
                            {
                                channel.BasicReject(deliveryTag: ea.DeliveryTag, requeue: true);// back onto the queue
                            }
                        };
                        channel.BasicConsume(queue: rabbitMQConsumerMode.QueueName,
                                             autoAck: false,// manual acknowledgement
                                             consumer: consumer);
                        Console.WriteLine($" Register Consumer To {rabbitMQConsumerMode.ExchangeName}-{rabbitMQConsumerMode.QueueName}");
                        // Keeps the channel (and so the consumer) alive until a line is read
                        // from stdin; disposing the channel afterwards stops consumption.
                        // NOTE(review): this ties the consumer lifetime to console input and
                        // holds a thread-pool thread — consider an explicit stop mechanism.
                        Console.ReadLine();
                        Console.WriteLine($" After Register Consumer To {rabbitMQConsumerMode.ExchangeName}-{rabbitMQConsumerMode.QueueName}");
                    }
                }
                catch (Exception ex)
                {
                    // Nobody awaits this task, so surface the failure instead of losing it.
                    Console.WriteLine($" Consumer For {rabbitMQConsumerMode.ExchangeName}-{rabbitMQConsumerMode.QueueName} Failed: {ex.Message}");
                }
            });
        }
        #endregion


    }
}

2.实体类

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace RabbitMQTest
{
    /// <summary>
    /// Names the queue a consumer reads from and the exchange associated with it.
    /// </summary>
    public class RabbitMQConsumerModel
    {
        /// <summary>
        /// Queue name.
        /// </summary>
        public string QueueName { get; set; } 

        /// <summary>
        /// Exchange name.
        /// </summary>
        public string ExchangeName { get; set; }
    }
}

3.通过构造函数调用

// Create the invoker with an explicit broker host, user name and password
// (empty placeholders here — substitute real values).
var rabbitMQ = new RabbitMQInvoker(hostName: "", userName: "", password: "");  

相关