[Microsoft Azure Learning Journey] Testing Whether the Message Queue (Service Bus Queue) Loses Messages


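  Our team recently ran into a question: is Microsoft's Azure Service Bus Queue reliable? Can it lose messages?

  Here is the background.

  Our product is a SaaS product, and to keep messages from being lost we use Azure Service Bus Queue to pass messages between modules. Then we hit a problem: one module sent a message to the queue, but the other module never received it. The send itself raised no exception, so people began to suspect that Azure Service Bus Queue was unreliable and had lost some of our messages.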

  The official claim is that messages are not lost with 99.5% probability.

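  But I did not think we could be that unlucky. We are still in the testing phase and our message volume is small, so hitting that case seemed unlikely. So, on a colleague's suggestion, I simply wrote a test program to find out whether Service Bus Queue loses messages, or loses them easily.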

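1. Overview of the Test Program

  Principle: send a fixed number of messages to the queue and check whether all of them can be received. If they all arrive, the queue can be considered basically reliable and the problem lies on our side.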

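  Procedure:

  First, create a queue. The program uses the Azure .NET SDK to send messages to and receive messages from the queue (after receiving a message it calls a method to delete that message from the queue; a successful delete counts as a successful receive).

  When the main program runs, it starts two threads.

  Thread 1 keeps sending messages to the queue (a fixed total, 10000 here; the SDK's Send method has no return value to indicate success, so a send that throws no exception is treated as successful).

  Thread 2 keeps receiving messages from the queue and deletes each one from the queue once it has been fetched locally. A message that is received and successfully deleted counts as successfully received, and the counter is incremented by 1.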

  Logging module:

  log4net is used for logging.

2. Implementation

  The ServiceBusQueueHandler class wraps the .NET SDK calls for sending and receiving messages.

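using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.Serialization;
using System.Threading;
using Microsoft.ServiceBus;
using Microsoft.ServiceBus.Messaging;

class ServiceBusQueueHandler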
    {
        public static readonly log4net.ILog log = log4net.LogManager.GetLogger(System.Reflection.MethodBase.GetCurrentMethod().DeclaringType);

        public ServiceBusQueueHandler()
        {
            /* For most scenarios, it is recommended that you keep Mode to Auto. 
             * This indicates that your application will attempt to use TCP to connect to the Windows Azure Service Bus, 
             * but will use HTTP if unable to do so. In general, this allows your connection to be more efficient. 
             * However, if TCP is always unavailable to your application, 
             * you can save some time on your connection if you globally set the mode to HTTP.*/
            ServiceBusEnvironment.SystemConnectivity.Mode = ConnectivityMode.AutoDetect;
        }
        
        //Send Message
        public bool SendMessage(string strMessageBody, QueueClient client, int idelayTime = 0)
        {
            //log.Debug("=>SendMessage");
            bool bRet = false;

            try
            {
                BrokeredMessage message = new BrokeredMessage(strMessageBody);

                DateTime utcEnqueueTime = DateTime.UtcNow.AddSeconds(idelayTime);
                //log.Debug(string.Format("DateTime.UtcNow = {0}", DateTime.UtcNow.ToString()));
                //log.Debug(string.Format("EnqueuedTimeUtc = {0}", utcEnqueueTime.ToString()));

                //set the time when this message will become visible
                message.ScheduledEnqueueTimeUtc = utcEnqueueTime;

                //http://msdn.microsoft.com/en-us/library/microsoft.servicebus.messaging.queueclient.send.aspx
                client.Send(message);
                
                log.Debug(string.Format("Success send! Send Time = {0}, Body = {1}", DateTime.UtcNow.ToString(), message.GetBody<string>()));
                bRet = true;
            }
            catch (TimeoutException e)
            {
                //Thrown when the operation times out. The timeout period is initialized through MessagingFactorySettings; if it is relatively low, increasing OperationTimeout may avoid this exception.
                log.Debug(string.Format("TimeoutException: {0}", e.Message));
                return bRet;
            }
            catch (ArgumentException e)
            {
                //Thrown when the BrokeredMessage is null.
                log.Debug(string.Format("ArgumentException: {0}", e.Message));
                return bRet;
            }
            catch (InvalidOperationException e)
            {
                //Thrown if the message has already been sent by a QueueClient or MessageSender once already.
                log.Debug(string.Format("InvalidOperationException: {0}", e.Message));
                return bRet;
            }
            catch (OperationCanceledException e)
            {
                //Thrown if the client entity has been closed or aborted.
                log.Debug(string.Format("OperationCanceledException: {0}", e.Message));
                return bRet;
            }
            catch (UnauthorizedAccessException e)
            {
                //Thrown if there is an I/O or security error.
                log.Debug(string.Format("UnauthorizedAccessException: {0}", e.Message));
                return bRet;
            }
            catch (SerializationException e)
            {
                //Thrown when an error occurs during serialization or deserialization.
                log.Debug(string.Format("SerializationException: {0}", e.Message));
                return bRet;
            }
            catch (MessagingEntityNotFoundException e)
            {
                //Thrown if the queue does not exist.
                log.Debug(string.Format("MessagingEntityNotFoundException: {0}", e.Message));
                return bRet;
            }
            catch (MessagingException e)
            {
                log.Debug(string.Format("MessagingException: {0}", e.Message));
                if (e.IsTransient)
                {
                    //e.IsTransient: Gets a value indicating whether the exception is transient. Check this property to determine if the operation should be retried.
                    HandleTransientErrors(e);
                }
                return bRet;
            }
            catch (Exception e)
            {
                log.Debug(string.Format("Exception: {0}", e.Message));
                return bRet;
            }

            //log.Debug("<=SendMessage");
            return bRet;
        }

        //SendMessages: the maximum size of a batch is the same as the maximum size of a single message (currently 256 KB).
        public bool SendMessages(List<string> arrayMessages, QueueClient client, int idelayTime = 0)
        {
            //log.Debug("=>SendMessage");
            bool bRet = false;

            int i = 0;

            //prepare data
            List<BrokeredMessage> arrayBrokedMessages = new List<BrokeredMessage>();

            DateTime utcEnqueueTime = DateTime.UtcNow.AddSeconds(idelayTime);
            log.Debug(string.Format("DateTime.UtcNow = {0}", DateTime.UtcNow.ToString()));
            log.Debug(string.Format("EnqueuedTimeUtc = {0}", utcEnqueueTime.ToString()));

            foreach (string strMessageBody in arrayMessages)
            {
                BrokeredMessage message = new BrokeredMessage(strMessageBody);
                
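                // The message id must be assigned and must be unique across all calls;
                // the counter alone restarts at 1 for every batch, so a GUID is appended.
                message.MessageId = "Message_" + (++i).ToString() + "_" + System.Guid.NewGuid().ToString("N");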
                message.ScheduledEnqueueTimeUtc = utcEnqueueTime;

                arrayBrokedMessages.Add(message);
            }

            //send messages
            try
            {
                client.SendBatch(arrayBrokedMessages);
                log.Debug(string.Format("Success send batch messages!"));
                bRet = true;
            }
            catch (TimeoutException e)
            {
                //Thrown when the operation times out. The timeout period is initialized through MessagingFactorySettings; if it is relatively low, increasing OperationTimeout may avoid this exception.
                log.Debug(string.Format("TimeoutException: {0}", e.Message));
                return bRet;
            }
            catch (ArgumentException e)
            {
                //Thrown when the BrokeredMessage is null.
                log.Debug(string.Format("ArgumentException: {0}", e.Message));
                return bRet;
            }
            catch (InvalidOperationException e)
            {
                //Thrown if the message has already been sent by a QueueClient or MessageSender once already.
                log.Debug(string.Format("InvalidOperationException: {0}", e.Message));
                return bRet;
            }
            catch (OperationCanceledException e)
            {
                //Thrown if the client entity has been closed or aborted.
                log.Debug(string.Format("OperationCanceledException: {0}", e.Message));
                return bRet;
            }
            catch (UnauthorizedAccessException e)
            {
                //Thrown if there is an I/O or security error.
                log.Debug(string.Format("UnauthorizedAccessException: {0}", e.Message));
                return bRet;
            }
            catch (SerializationException e)
            {
                //Thrown when an error occurs during serialization or deserialization.
                log.Debug(string.Format("SerializationException: {0}", e.Message));
                return bRet;
            }
            catch (MessagingEntityNotFoundException e)
            {
                //Thrown if the queue does not exist.
                log.Debug(string.Format("MessagingEntityNotFoundException: {0}", e.Message));
                return bRet;
            }
            catch (MessagingException e)
            {
                log.Debug(string.Format("MessagingException: {0}", e.Message));
                if (e.IsTransient)
                {
                    //e.IsTransient: Gets a value indicating whether the exception is transient. Check this property to determine if the operation should be retried.
                    HandleTransientErrors(e);
                }
                return bRet;
            }
            catch (Exception e)
            {
                log.Debug(string.Format("Exception: {0}", e.Message));
                return bRet;
            }

            log.Debug("<=SendMessage");
            return bRet;
        }


        //get messages from a queue
        //iWaitTimeout: The time span that the server will wait for the message batch to arrive before it times out.
        public List<BrokeredMessage> GetMessages(int iMaxNumMsg, int iWaitTimeout, QueueClient client)
        {
            //log.Debug("=>ReceiveMessages");

            List<BrokeredMessage> list = new List<BrokeredMessage>();

            try
            {
                //receive a batch of messages from the queue
                list = client.ReceiveBatch(iMaxNumMsg, TimeSpan.FromSeconds(iWaitTimeout)).ToList();
            }
            catch (MessagingException e)
            {
                log.Debug(string.Format("ReceiveMessages, MessagingException: {0}", e.Message));
                if (e.IsTransient)
                {
                    //e.IsTransient: Gets a value indicating whether the exception is transient. Check this property to determine if the operation should be retried.
                    HandleTransientErrors(e);
                }
                return null;
            }
            catch (Exception e)
            {
                log.Debug(string.Format("ReceiveMessages, Exception: {0}", e.Message));
                return null;
            }

            //subClient.Close();
            //log.Debug("<=ReceiveMessages");
            return list;
        }

        public bool DeleteMessage(BrokeredMessage message)
        {
            //log.Debug("=>DeleteMessage");
            bool bRet = false;

            try
            {
                message.Complete();
                bRet = true;
                log.Debug(string.Format("Delete Message Successfully"));
            }
            catch (Exception e)
            {
                log.Debug(e.Message);
                return bRet;
            }

            //log.Debug("<=DeleteMessage");
            return bRet;
        }

        private void HandleTransientErrors(MessagingException e)
        {
            //If transient error/exception, let's back-off for 2 seconds and retry
            log.Debug(e.Message);
            log.Debug("Transient error happened! Will retry in 2 seconds");
            Thread.Sleep(2000);
        }
    }
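
  The comment in HandleTransientErrors mentions a retry, but SendMessage returns false after the back-off without sending again. A minimal sketch of what a bounded retry could look like is shown here. RetryHelper, TryWithRetry and the isTransient predicate are hypothetical names for illustration and are not part of the Azure SDK; in the handler above, the predicate would check MessagingException.IsTransient. The demo uses a simulated send, not a real queue.

```csharp
using System;
using System.Threading;

// Hypothetical helper for illustration; not part of the Azure SDK.
public static class RetryHelper
{
    // Runs `action`. When it throws an exception that `isTransient` accepts,
    // waits `delay` and tries again, up to `maxAttempts` attempts in total.
    // Returns true on the first attempt that completes without throwing.
    public static bool TryWithRetry(Action action, Func<Exception, bool> isTransient,
                                    int maxAttempts, TimeSpan delay)
    {
        for (int attempt = 1; attempt <= maxAttempts; attempt++)
        {
            try
            {
                action();
                return true;
            }
            catch (Exception e)
            {
                // Give up on non-transient errors or when attempts are exhausted.
                if (!isTransient(e) || attempt == maxAttempts)
                {
                    return false;
                }
                Thread.Sleep(delay);
            }
        }
        return false;
    }
}

public static class RetryDemo
{
    public static void Main()
    {
        int calls = 0;
        // Simulated send: fails twice with a "transient" error, then succeeds.
        bool ok = RetryHelper.TryWithRetry(
            () => { if (++calls < 3) throw new TimeoutException("simulated"); },
            e => e is TimeoutException,
            maxAttempts: 5,
            delay: TimeSpan.FromMilliseconds(10));
        Console.WriteLine("ok = {0}, calls = {1}", ok, calls);
    }
}
```

  Note that retrying a send after a timeout can put the same payload on the queue twice if the first attempt actually reached the service, so the receiver has to tolerate duplicates.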

  The Main method and the implementations of thread 1 and thread 2:

//this function is used to send a number of messages to a queue
        public static void SendMessageToQueue()
        {
            int sendMessageNum = 10000;

            log.Debug(string.Format("=> SendMessageToQueue, send message number = {0}", sendMessageNum));

            //prepare the handler, client
            ServiceBusQueueHandler handler = new ServiceBusQueueHandler();
            QueueClient client = QueueClient.CreateFromConnectionString(connectionString, queueName);

            
            //the message num which is sent successfully
            int count = 0;

            for (int i = 0; i < sendMessageNum; i++)
            {
                //send a message
                string strMessageBody = System.Guid.NewGuid().ToString();

                bool bRet = handler.SendMessage(strMessageBody, client, 10);

                if (bRet)
                {
                    count++;
                }

                //wait 2s, then send next message
                Thread.Sleep(2000);
            }

            log.Debug(string.Format("<= SendMessageToQueue, success sent message number = {0}", count));
        }

        public static void ReceiveMessageFromQueue()
        {
            log.Debug("=> ReceiveMessageFromQueue");

            //prepare the handler, client
            ServiceBusQueueHandler handler = new ServiceBusQueueHandler();
            QueueClient client = QueueClient.CreateFromConnectionString(connectionString, queueName);

            //the message num which is received successfully
            int count = 0;

            //if we can't get message in 1 hour(60 * 60 =  30 * 120), we think there are no more messages in the queue
            int failCount = 0;

            while (failCount < 30)
            {
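                List<BrokeredMessage> list = handler.GetMessages(10, 120, client);

                //GetMessages returns null when an exception occurred; treat that as an empty poll
                if (list != null && list.Count > 0)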
                {
                    foreach (BrokeredMessage e in list)
                    {
                        log.Debug(string.Format("Received 1 Message, Time = {0}, Message Body = {1}", DateTime.UtcNow.ToString(), e.GetBody<string>()));

                        //delete message
                        bool bRet = handler.DeleteMessage(e);

                        if (bRet)
                        {
                            count++;
                        }
                    }

                    log.Debug(string.Format("Current Count Number = {0}", count));
                }
                else
                {
                    failCount++;
                    log.Debug(string.Format("Didn't Receive any Message this time, fail count number = {0}", failCount));
                }

                //wait 1s, then receive the next batch
                Thread.Sleep(1000);
            }

            log.Debug(string.Format("<= ReceiveMessageFromQueue, success received message number = {0}", count));
        }
        
        static void Main(string[] args)
        {
            log4net.GlobalContext.Properties["LogName"] = "TestServiceBus.log";
            log4net.Config.XmlConfigurator.Configure();

            Console.WriteLine("Start");

            Thread threadSendMessage = new Thread(SendMessageToQueue);
            Thread threadReceMessage = new Thread(ReceiveMessageFromQueue);

            threadSendMessage.Start();
            threadReceMessage.Start();

            //Console.WriteLine("Stop");
            Console.ReadLine();
        }

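  One small detail: thread 1 sends only 10000 messages, while thread 2 keeps receiving. Once thread 2's receive calls have come back empty 30 times, each after a 120-second timeout (about an hour of waiting in total), it assumes no more messages will arrive in the queue and stops receiving.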
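
  One caveat about this stop condition: in the receive loop above, failCount is never reset, so it counts empty polls over the whole run and not one continuous idle hour. With a 120-second receive timeout and a sender that posts every 2 seconds this rarely matters, but a variant that counts only consecutive empty polls could look like the sketch below. IdleDetector and its members are hypothetical names, and the batch sizes fed in are made-up sample data.

```csharp
using System;

// Hypothetical helper for illustration: tracks consecutive empty polls.
public sealed class IdleDetector
{
    private readonly int maxEmptyPolls;
    private int emptyPolls;

    public IdleDetector(int maxEmptyPolls)
    {
        this.maxEmptyPolls = maxEmptyPolls;
    }

    // Records the size of the batch returned by one receive call.
    // Returns true once `maxEmptyPolls` empty polls have occurred in a row.
    public bool Record(int batchSize)
    {
        if (batchSize > 0)
        {
            emptyPolls = 0;   // any received message resets the idle streak
        }
        else
        {
            emptyPolls++;
        }
        return emptyPolls >= maxEmptyPolls;
    }
}

public static class IdleDemo
{
    public static void Main()
    {
        // 30 empty polls x 120 s receive timeout = 3600 s, i.e. one idle hour
        // (the extra 1 s sleep per loop iteration adds about 30 s on top).
        Console.WriteLine("Idle window: {0} s", 30 * 120);

        var detector = new IdleDetector(3);
        int[] batchSizes = { 0, 0, 5, 0, 0, 0 };   // made-up sample data
        for (int i = 0; i < batchSizes.Length; i++)
        {
            if (detector.Record(batchSizes[i]))
            {
                Console.WriteLine("Stop after poll #{0}", i + 1);
                break;
            }
        }
    }
}
```

  In the receive loop, this would amount to setting failCount back to 0 whenever a non-empty batch arrives.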

3. Test Results

  According to the log, the program ran for nearly 8 hours. The final results:

  10000 messages sent successfully

2015-04-30 15:01:49,576 [3] DEBUG TestServiceBus.Program <= SendMessageToQueue, success sent message number = 10000

  10000 messages received successfully

2015-04-30 15:02:03,638 [4] DEBUG TestServiceBus.Program Current Count Number = 10000

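  So, judging from this test alone, Service Bus Queue did not lose any messages. For the message problem our team ran into, my advice is to start by checking our own code to see whether the fault is ours and not Service Bus Queue's.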

---------------------------------------------------------------

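Update, May 5, 2015: we finally found the cause of the "lost" Service Bus messages, and the problem was indeed on our side. When sending, message ids could be duplicated, which could cause messages to be lost. Message ids should be unique.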
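
  To illustrate how a repeated message id can look like a lost message: when duplicate detection is enabled on a queue, Service Bus discards a message whose MessageId it has already seen within the detection window, and the Send call still completes without an exception. The sketch below simulates that behaviour in memory. FakeQueue is a hypothetical stand-in, not the Service Bus API, and it ignores the time window for brevity. It also assumes duplicate detection was enabled on our queue, which the note above does not state explicitly.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory stand-in for a queue with duplicate detection enabled.
public sealed class FakeQueue
{
    private readonly HashSet<string> seenIds = new HashSet<string>();
    private readonly Queue<string> bodies = new Queue<string>();

    // Number of messages actually stored in the queue.
    public int Count { get { return bodies.Count; } }

    // Accepts every send without error, but silently drops a message
    // whose id has been seen before (the real service only remembers
    // ids for a configurable time window; that is omitted here).
    public void Send(string messageId, string body)
    {
        if (seenIds.Add(messageId))
        {
            bodies.Enqueue(body);
        }
    }
}

public static class DuplicateDemo
{
    public static void Main()
    {
        var queue = new FakeQueue();
        queue.Send("Message_1", "first payload");
        queue.Send("Message_1", "second payload");              // same id: dropped
        queue.Send(Guid.NewGuid().ToString(), "third payload"); // unique id: kept
        Console.WriteLine("Sent 3, stored {0}", queue.Count);
    }
}
```

  From the sender's point of view all three sends succeed, which matches what we saw: no exception on send, yet the receiver never gets the message.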

  Just some initial thoughts to get the discussion started. Thanks :-)

  Kevin Song

  May 2, 2015