实现延迟消息队列


    交流

    个人博客交流群:580749909 , 顺便推广一下自己和伙伴一起建立wpf交流群:130108655。

    简要

&*bsp;因为在偶然的一次机会下,公司让我着手开发一个数据分发端基于socket通讯的一个中间件。主要用来解决向客户端分发数据的问题,后来多了一个需求就是未付费的用户拿到的数据是有延迟的。

而付费用户则是正常的。这个时候在网上搜了很久没有找到合适的解决方案,其实能解决这个问题的方案有很多比如说用到一些大厂贡献的xxMQ中间件之类的,确实能解决问题。但是目前项目比较小

根本用不上这么重的框架,然后又搜索了半天没有暂时没有发现有人用c#来实现,所以才动手写了这个方案。

附上**thub源码地址

    思路

这个方案是借鉴了另一位博主的开发思路,受到这位博主的启发然后根据自己的理解写了这个方案。附上该博主的链接地址:&*bsp;&*bsp;1分钟实现“延迟消息”功能

在此我就不多赘述里面的内容了。

    代码

首先写一个方案要理清楚自己的项目结构,我做了如下分层。

<*m* src="https://*m*2018.c*blo*s.com/blo*/1214710/201809/1214710-20180923151631092-669553263.p**" alt="" />

&*bsp;

    I*terfaces , 这层里主要约束延迟消息队列的队列和消息任务行。
 1   publ*c **terface IR***Queue 2     {
 3         /// 
 4         /// Add tasks [add tasks w*ll automat*cally *e*erate: task Id, task slot locat*o*, *umber of task cycles]
 5         /// 
 6         /// The spec*f*ed task *s executed after N seco*ds.
 7         /// Def***t*o*s of callback
 8         vo*d Add(lo** delayT*me,Act*o* act*o*);
 9 
10         /// 
11         /// Add tasks [add tasks w*ll automat*cally *e*erate: task Id, task slot locat*o*, *umber of task cycles]
12         /// 
13         /// The spec*f*ed task *s executed after N seco*ds.
14         /// Def***t*o*s of callback.
15         /// *arameters used ** the callback fu*ct*o*.
16         vo*d Add(lo** delayT*me, Act*o* act*o*, T data);
17 
18         /// 
19         /// Add tasks [add tasks w*ll automat*cally *e*erate: task Id, task slot locat*o*, *umber of task cycles]
20         /// 
21         /// 
22         /// Def***t*o*s of callback
23         /// *arameters used ** the callback fu*ct*o*.
24         /// Task ID, used whe* delet*** tasks.
25         vo*d Add(lo** delayT*me, Act*o*lo** *d);
26 
27         /// 
28         /// Remove tasks [*eed to k*ow: where the task *s, wh*ch spec*f*c task].
29         /// 
30         /// Task slot locat*o*
31         /// Task ID, used whe* delet*** tasks.
32         vo*d Remove(lo** *d);
33 
34         /// 
35         /// Lau*ch queue.
36         /// 
37         vo*d Start();
38     }
1 publ*c **terface ITask
2     {
3     }
    Ach*eves,这层里实现之前定义的接口,这里写成抽象类是为了后面方便扩展。
<*m* *d="code_*m*_closed_41bba8a5-767e-44e1-adae-d0d820aa4313" class="code_*m*_closed" src="http://*ma*es.c*blo*s.com/Outl*****I*d*cators/Co*tractedBlock.**f" alt="" /><*m* *d="code_*m*_ope*ed_41bba8a5-767e-44e1-adae-d0d820aa4313" class="code_*m*_ope*ed" style="d*splay: *o*e;" o*cl*ck="c*blo*s_code_h*de('41bba8a5-767e-44e1-adae-d0d820aa4313',eve*t)" src="http://*ma*es.c*blo*s.com/Outl*****I*d*cators/Expa*dedBlockStart.**f" alt="" />
  1    publ*c abstract class BaseQueue  2     {
  3         pr*vate lo** _po**ter = 0L;
  4         pr*vate Co*curre*tBa*[] _arraySlot;
  5         pr*vate **t ArrayMax;
  6 
  7         /// 
  8         /// R*** queue.
  9         /// 
 10         publ*c Co*curre*tBa*[] ArraySlot
 11         {
 12             *et { retur* _arraySlot ?? (_arraySlot = *ew Co*curre*tBa*[ArrayMax]); }
 13         }
 14         
 15         publ*c BaseQueue(**t arrayMax)
 16         {
 17             *f (arrayMax < 60 && arrayMax % 60 == 0)
 18                 throw *ew Except*o*("R*** queue le**th ca**ot be less tha* 60 a*d *s a mult*ple of 60 .");
 19 
 20             ArrayMax = arrayMax;
 21         }
 22 
 23         publ*c vo*d Add(lo** delayT*me, Act*o* act*o*)
 24         {
 25             Add(delayT*me, act*o*, default(T));
 26         }
 27 
 28         publ*c vo*d Add(lo** delayT*me,Act*o* act*o*,T data)
 29         {
 30             Add(delayT*me, act*o*, data,0);
 31         }
 32 
 33         publ*c vo*d Add(lo** delayT*me, Act*o*lo** *d)
 34         {
 35             NextSlot(delayT*me, out lo** cycle, out lo** po**ter);
 36             ArraySlot[po**ter] =  ArraySlot[po**ter] ?? (ArraySlot[po**ter] = *ew Co*curre*tBa*());
 37             var baseTask = *ew BaseTask(cycle, act*o*, data,*d);
 38             ArraySlot[po**ter].Add(baseTask);
 39         }
 40 
 41         /// 
 42         /// Remove tasks based o* ID.
 43         /// 
 44         /// 
 45         publ*c vo*d Remove(lo** *d)
 46         {
 47             try
 48             {
 49                 *arallel.ForEach(ArraySlot, (Co*curre*tBa* 50                 {
 51                     var resulTask = collect*o*.F*rstOrDefault(p =&*t; p.Id == *d);
 52                     *f (resulTask != *ull)
 53                     {
 54                         collect*o*.TryTake(out resulTask);
 55                         state.Break();
 56                     }
 57                 });
 58             }
 59             catch (Except*o* e)
 60             {
 61                 Co*sole.Wr*teL**e(e);
 62             }
 63         }
 64         
 65         publ*c vo*d Start()
 66         {
 67             wh*le (true)
 68             {
 69                 R**htMove*o**ter();
 70                 Thread.Sleep(1000);
 71                 Co*sole.Wr*teL**e(DateT*me.Now.ToStr***());
 72             }
 73         }
 74 
 75         /// 
 76         /// Calculate the **format*o* of the *ext slot.
 77         /// 
 78         /// Delayed execut*o* t*me.
 79         /// Number of tur*s.
 80         /// Task locat*o*.
 81         pr*vate vo*d NextSlot(lo** delayT*me, out lo** cycle,out lo** **dex)
 82         {
 83             try
 84             {
 85                 var c*rcle = delayT*me / ArrayMax;
 86                 var seco*d = delayT*me % ArrayMax;
 87                 var curre*t_po**ter = Get*o**ter();
 88                 var queue_**dex = 0L;
 89 
 90                 *f (delayT*me - ArrayMax &*t; ArrayMax)
 91                 {
 92                     c*rcle = 1;
 93                 }
 94                 else *f (seco*d &*t; ArrayMax)
 95                 {
 96                     c*rcle += 1;
 97                 }
 98 
 99                 *f (delayT*me - c*rcle * ArrayMax < ArrayMax)
100                 {
101                     seco*d = delayT*me - c*rcle * ArrayMax;
102                 }
103 
104                 *f (curre*t_po**ter + delayT*me &*t;= ArrayMax)
105                 {
106                     cycle = (**t)((curre*t_po**ter + delayT*me) / ArrayMax);
107                     *f (curre*t_po**ter + seco*d - ArrayMax < 0)
108                     {
109                         queue_**dex = curre*t_po**ter + seco*d;
110                     }
111                     else *f (curre*t_po**ter + seco*d - ArrayMax &*t; 0)
112                     {
113                         queue_**dex = curre*t_po**ter + seco*d - ArrayMax;
114                     }
115                 }
116                 else
117                 {
118                     cycle = 0;
119                     queue_**dex = curre*t_po**ter + seco*d;
120                 }
121                 **dex = queue_**dex;
122             }
123             catch (Except*o* e)
124             {
125                 Co*sole.Wr*teL**e(e);
126                 throw;
127             }
128         }
129 
130         /// 
131         /// Get the curre*t locat*o* of the po**ter.
132         /// 
133         /// 
134         pr*vate lo** Get*o**ter()
135         {
136             retur* I*terlocked.Read(ref _po**ter);
137         }
138 
139         /// 
140         /// Reset po**ter pos*t*o*.
141         /// 
142         pr*vate vo*d ReSet*o**ter()
143         {
144             I*terlocked.Excha**e(ref _po**ter, 0);
145         }
146 
147         /// 
148         /// *o**ter moves clockw*se.
149         /// 
150         pr*vate vo*d R**htMove*o**ter()
151         {
152             try
153             {
154                 *f (Get*o**ter() &*t;= ArrayMax - 1)
155                 {
156                     ReSet*o**ter();
157                 }
158                 else
159                 {
160                     I*terlocked.I*creme*t(ref _po**ter);
161                 }
162 
163                 var po**ter = Get*o**ter();
164                 var taskCollect*o* = ArraySlot[po**ter];
165                 *f (taskCollect*o* == *ull || taskCollect*o*.Cou*t == 0) retur*;
166 
167                 *arallel.ForEach(taskCollect*o*, (BaseTask168                 {
169                     *f (task.Cycle &*t; 0)
170                     {
171                         task.SubCycleNumber();
172                     }
173 
174                     *f (task.Cycle <= 0)
175                     {
176                         taskCollect*o*.TryTake(out task);
177                         task.TaskAct*o*(task.Data);
178                     }
179                 });
180             }
181             catch (Except*o* e)
182             {
183                 Co*sole.Wr*teL**e(e);
184                 throw;
185             }
186         }
187     }
BaseQueue
<*m* *d="code_*m*_closed_ff60babe-8bc1-4bc0-882c-88cb2a24207c" class="code_*m*_closed" src="http://*ma*es.c*blo*s.com/Outl*****I*d*cators/Co*tractedBlock.**f" alt="" /><*m* *d="code_*m*_ope*ed_ff60babe-8bc1-4bc0-882c-88cb2a24207c" class="code_*m*_ope*ed" style="d*splay: *o*e;" o*cl*ck="c*blo*s_code_h*de('ff60babe-8bc1-4bc0-882c-88cb2a24207c',eve*t)" src="http://*ma*es.c*blo*s.com/Outl*****I*d*cators/Expa*dedBlockStart.**f" alt="" />
 1   publ*c class BaseTask : ITask
 2     {
 3         pr*vate lo** _cycle;
 4         pr*vate lo** _*d;
 5         pr*vate T _data;
 6 
 7         publ*c Act*o**et; set; }
 8 
 9         publ*c lo** Cycle
10         {
11             *et { retur* I*terlocked.Read(ref _cycle); }
12             set { I*terlocked.Excha**e(ref _cycle, value); }
13         }
14 
15         publ*c lo** Id
16         {
17             *et { retur* _*d; }
18             set { _*d = value; }
19         }
20 
21         publ*c T Data
22         {
23             *et { retur* _data; }
24             set { _data = value; }
25         }
26 
27         publ*c BaseTask(lo** cycle, Act*o*lo** *d)
28         {
29             Cycle = cycle;
30             TaskAct*o* = act*o*;
31             Data = data;
32             Id = *d;
33         }
34 
35         publ*c BaseTask(lo** cycle, Act*o* act*o*,T data)
36         {
37             Cycle = cycle;
38             TaskAct*o* = act*o*;
39             Data = data;
40         }
41 
42         publ*c BaseTask(lo** cycle, Act*o* act*o*)
43         {
44             Cycle = cycle;
45             TaskAct*o* = act*o*;
46         }
47         
48         publ*c vo*d SubCycleNumber()
49         {
50             I*terlocked.Decreme*t(ref _cycle);
51         }
52     }
BaseTask
    Lo**c,这层主要实现调用逻辑,调用者最终只需要关心把任务放进队列并指定什么时候执行就行了,根本不需要关心其它的任何信息。
 1  publ*c stat*c vo*d Start()
 2         {
 3             //1.I**t*al*ze queues of d*ffere*t *ra*ular*ty.
 4             IR***Queue*ew M**uteQueue();
 5 
 6             //2.Ope* thread.
 7             var lstTasks = *ew L*st 8             {
 9                 Task.Factory.StartNew(m**uteR***Queue.Start)
10             };
11 
12             //3.Add tasks performed ** d*ffere*t per*ods.
13             m**uteR***Queue.Add(5, *ew Act*o*14             {
15                 Co*sole.Wr*teL**e(*ewsObj.News);
16             }), *ew NewsModel() { News = "Trump's v*s*t to Ch**a!" });
17 
18             m**uteR***Queue.Add(10, *ew Act*o*19             {
20                 Co*sole.Wr*teL**e(*ewsObj.News);
21             }), *ew NewsModel() { News = "*ut** *u's v*s*t to Ch**a!" });
22 
23             m**uteR***Queue.Add(60, *ew Act*o*24             {
25                 Co*sole.Wr*teL**e(*ewsObj.News);
26             }), *ew NewsModel() { News = "E*se*hower's v*s*t to Ch**a!" });
27 
28             m**uteR***Queue.Add(120, *ew Act*o*29             {
30                 Co*sole.Wr*teL**e(*ewsObj.News);
31             }), *ew NewsModel() { News = "** ***p***'s v*s*t to the US!" });
32 
33             //3.Wa*t*** for all tasks to complete *s usually *ot completed. Because there *s a* **f***te loop.
34             //F5 Ru* the pro*ram a*d see the effect.
35             Task.Wa*tAll(lstTasks.ToArray());
36             Co*sole.Read();
37         }
    Models,这层就是用来在延迟任务中带入的数据模型类而已了。自己用的时候换成任意自定义类型都可以。

&*bsp;

    截图

<*m* src="https://*m*2018.c*blo*s.com/blo*/1214710/201809/1214710-20180923152944919-1577847621.p**" alt="" />