实现延迟消息队列
&nbsp;
/// <summary>
/// Contract for a delay queue: callbacks are registered together with a delay
/// and are invoked by the implementation once that delay has elapsed.
/// </summary>
/// <typeparam name="T">Type of the payload handed to the callback.</typeparam>
public interface IRingQueue<T>
{
    /// <summary>
    /// Adds a task without payload and without an explicit task ID.
    /// </summary>
    /// <param name="delayTime">The task is executed after this many seconds.</param>
    /// <param name="action">Callback to invoke when the delay has elapsed.</param>
    void Add(long delayTime, Action<T> action);

    /// <summary>
    /// Adds a task with a payload but without an explicit task ID.
    /// </summary>
    /// <param name="delayTime">The task is executed after this many seconds.</param>
    /// <param name="action">Callback to invoke when the delay has elapsed.</param>
    /// <param name="data">Parameter passed to the callback.</param>
    void Add(long delayTime, Action<T> action, T data);

    /// <summary>
    /// Adds a task with a payload and a caller-chosen task ID.
    /// </summary>
    /// <param name="delayTime">The task is executed after this many seconds.</param>
    /// <param name="action">Callback to invoke when the delay has elapsed.</param>
    /// <param name="data">Parameter passed to the callback.</param>
    /// <param name="id">Task ID, used when deleting the task via <see cref="Remove"/>.</param>
    void Add(long delayTime, Action<T> action, T data, long id);

    /// <summary>
    /// Removes a pending task.
    /// </summary>
    /// <param name="id">Task ID that was supplied when the task was added.</param>
    void Remove(long id);

    /// <summary>
    /// Launches the queue.
    /// </summary>
    void Start();
}
/// <summary>
/// Marker interface for tasks stored in a delay queue. It declares no members.
/// </summary>
public interface ITask
{
}
/// <summary>
/// Ring-buffer delay queue. The ring has a fixed number of slots; a pointer advances
/// one slot per tick (<see cref="Start"/> ticks once per second) and every task stored
/// in the slot under the pointer either has its remaining revolution count decremented
/// or, once that count reaches zero, is removed and executed.
/// </summary>
/// <typeparam name="T">Type of the payload handed to the task callback.</typeparam>
public abstract class BaseQueue<T> : IRingQueue<T>
{
    // Serializes every structural change (pointer move, insert, targeted removal).
    // ConcurrentBag cannot remove a specific element, so removal is implemented as
    // "drain and re-add", which must not interleave with inserts or with the tick.
    private readonly object _gate = new object();
    private readonly ConcurrentBag<BaseTask<T>>[] _arraySlot;
    private readonly int _arrayMax;
    private long _pointer = 0L;

    /// <summary>
    /// Ring queue. A slot stays <c>null</c> until the first task is added to it.
    /// </summary>
    public ConcurrentBag<BaseTask<T>>[] ArraySlot
    {
        get { return _arraySlot; }
    }

    /// <summary>
    /// Creates a ring with <paramref name="arrayMax"/> slots.
    /// </summary>
    /// <param name="arrayMax">Number of slots; at least 60 and a multiple of 60.</param>
    /// <exception cref="ArgumentOutOfRangeException">
    /// <paramref name="arrayMax"/> is below 60 or not a multiple of 60.
    /// </exception>
    public BaseQueue(int arrayMax)
    {
        // The original guard used "&& ... == 0" and therefore never rejected the
        // sizes its own message describes as invalid.
        if (arrayMax < 60 || arrayMax % 60 != 0)
        {
            throw new ArgumentOutOfRangeException(
                nameof(arrayMax),
                arrayMax,
                "Ring queue length cannot be less than 60 and must be a multiple of 60.");
        }

        _arrayMax = arrayMax;
        _arraySlot = new ConcurrentBag<BaseTask<T>>[arrayMax];
    }

    /// <summary>
    /// Adds a task without payload (the callback receives <c>default(T)</c>) and with ID 0.
    /// </summary>
    /// <param name="delayTime">Delay in ticks (seconds) before the callback runs.</param>
    /// <param name="action">Callback to run.</param>
    public void Add(long delayTime, Action<T> action)
    {
        Add(delayTime, action, default(T));
    }

    /// <summary>
    /// Adds a task with a payload and ID 0.
    /// </summary>
    /// <param name="delayTime">Delay in ticks (seconds) before the callback runs.</param>
    /// <param name="action">Callback to run.</param>
    /// <param name="data">Payload passed to the callback.</param>
    public void Add(long delayTime, Action<T> action, T data)
    {
        Add(delayTime, action, data, 0);
    }

    /// <summary>
    /// Adds a task with a payload and a caller-chosen ID.
    /// </summary>
    /// <param name="delayTime">
    /// Delay in ticks (seconds) before the callback runs. A delay of 0 is treated as 1,
    /// i.e. the task runs on the next tick.
    /// </param>
    /// <param name="action">Callback to run.</param>
    /// <param name="data">Payload passed to the callback.</param>
    /// <param name="id">Task ID, used by <see cref="Remove"/>.</param>
    /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="delayTime"/> is negative.</exception>
    public void Add(long delayTime, Action<T> action, T data, long id)
    {
        if (action == null)
        {
            throw new ArgumentNullException(nameof(action));
        }

        if (delayTime < 0)
        {
            throw new ArgumentOutOfRangeException(nameof(delayTime), delayTime, "Delay cannot be negative.");
        }

        lock (_gate)
        {
            // Slot computation and insert happen under the same lock as the pointer
            // move, so the pointer cannot advance between the two steps.
            NextSlot(delayTime, out long cycle, out long pointer);
            var slot = _arraySlot[pointer] ?? (_arraySlot[pointer] = new ConcurrentBag<BaseTask<T>>());
            slot.Add(new BaseTask<T>(cycle, action, data, id));
        }
    }

    /// <summary>
    /// Removes the first pending task whose ID equals <paramref name="id"/>.
    /// Does nothing when no such task exists.
    /// </summary>
    /// <param name="id">ID that was supplied when the task was added.</param>
    public void Remove(long id)
    {
        lock (_gate)
        {
            foreach (var slot in _arraySlot)
            {
                // Slots are created lazily, so unused ones are still null.
                if (slot == null || slot.IsEmpty)
                {
                    continue;
                }

                BaseTask<T>[] snapshot = slot.ToArray();
                int match = Array.FindIndex(snapshot, t => t.Id == id);
                if (match < 0)
                {
                    continue;
                }

                // TryTake removes an arbitrary element, so empty the bag and put back
                // everything except the matched task.
                BaseTask<T> discarded;
                while (slot.TryTake(out discarded))
                {
                }

                for (int i = 0; i < snapshot.Length; i++)
                {
                    if (i != match)
                    {
                        slot.Add(snapshot[i]);
                    }
                }

                return;
            }
        }
    }

    /// <summary>
    /// Runs the tick loop on the calling thread: advance the pointer, run due tasks,
    /// sleep one second, print the current time. This method never returns.
    /// </summary>
    public void Start()
    {
        while (true)
        {
            RightMovePointer();
            Thread.Sleep(1000);
            Console.WriteLine(DateTime.Now.ToString());
        }
    }

    /// <summary>
    /// Calculates where a new task is stored and how many visits of that slot it needs.
    /// </summary>
    /// <param name="delayTime">Delayed execution time in ticks.</param>
    /// <param name="cycle">
    /// Number of times the pointer has to arrive at the slot before the task fires
    /// (always at least 1; the tick decrements first and fires at zero).
    /// </param>
    /// <param name="index">Slot index the task is stored in.</param>
    private void NextSlot(long delayTime, out long cycle, out long index)
    {
        long ticks = Math.Max(delayTime, 1L);
        long offset = ticks % _arrayMax;

        // Reduce the delay before adding it to the pointer so a huge delay cannot overflow.
        index = (GetPointer() + offset) % _arrayMax;

        // Ceiling of ticks / size. The original derived this from (pointer + delay) / size,
        // which is one revolution short whenever delay >= size and the target slot does
        // not wrap past the end of the ring.
        cycle = ticks / _arrayMax + (offset == 0 ? 0 : 1);
    }

    /// <summary>
    /// Gets the current location of the pointer.
    /// </summary>
    private long GetPointer()
    {
        return Interlocked.Read(ref _pointer);
    }

    /// <summary>
    /// Resets the pointer to slot 0.
    /// </summary>
    private void ReSetPointer()
    {
        Interlocked.Exchange(ref _pointer, 0);
    }

    /// <summary>
    /// Moves the pointer one slot clockwise (wrapping to 0 after the last slot) and
    /// executes every task in the new slot that has become due.
    /// </summary>
    private void RightMovePointer()
    {
        BaseTask<T>[] dueTasks;

        lock (_gate)
        {
            if (GetPointer() >= _arrayMax - 1)
            {
                ReSetPointer();
            }
            else
            {
                Interlocked.Increment(ref _pointer);
            }

            var slot = _arraySlot[GetPointer()];
            if (slot == null || slot.IsEmpty)
            {
                return;
            }

            dueTasks = TakeDueTasks(slot);
        }

        // Callbacks run outside the lock so they may call Add/Remove themselves.
        Parallel.ForEach(dueTasks, task =>
        {
            try
            {
                if (task.TaskAction != null)
                {
                    task.TaskAction(task.Data);
                }
            }
            catch (Exception e)
            {
                // One failing callback must not stop the tick loop for all other tasks.
                Console.WriteLine(e);
            }
        });
    }

    /// <summary>
    /// Decrements the revolution counter of every task in the slot, then removes and
    /// returns exactly those tasks whose counter reached zero. Must be called under the lock.
    /// </summary>
    private static BaseTask<T>[] TakeDueTasks(ConcurrentBag<BaseTask<T>> slot)
    {
        BaseTask<T>[] snapshot = slot.ToArray();

        foreach (var task in snapshot)
        {
            if (task.Cycle > 0)
            {
                task.SubCycleNumber();
            }
        }

        BaseTask<T>[] due = Array.FindAll(snapshot, t => t.Cycle <= 0);
        if (due.Length == 0)
        {
            return due;
        }

        // TryTake cannot target a specific task: empty the bag, re-add the survivors.
        BaseTask<T> discarded;
        while (slot.TryTake(out discarded))
        {
        }

        foreach (var task in snapshot)
        {
            if (task.Cycle > 0)
            {
                slot.Add(task);
            }
        }

        return due;
    }
}
/// <summary>
/// A task stored in a ring queue slot: the callback, its payload, an ID, and the
/// number of remaining revolutions before the callback is due.
/// </summary>
/// <typeparam name="T">Type of the payload handed to the callback.</typeparam>
public class BaseTask<T> : ITask
{
    private long _cycle;

    /// <summary>
    /// Callback to invoke with <see cref="Data"/>.
    /// </summary>
    public Action<T> TaskAction { get; set; }

    /// <summary>
    /// Remaining revolution count. Read and written atomically via <see cref="Interlocked"/>.
    /// </summary>
    public long Cycle
    {
        get { return Interlocked.Read(ref _cycle); }
        set { Interlocked.Exchange(ref _cycle, value); }
    }

    /// <summary>
    /// Task ID (0 when the task was created without one).
    /// </summary>
    public long Id { get; set; }

    /// <summary>
    /// Payload passed to <see cref="TaskAction"/>.
    /// </summary>
    public T Data { get; set; }

    /// <summary>
    /// Creates a task with payload and explicit ID.
    /// </summary>
    /// <param name="cycle">Initial revolution count.</param>
    /// <param name="action">Callback to invoke.</param>
    /// <param name="data">Payload for the callback.</param>
    /// <param name="id">Task ID.</param>
    public BaseTask(long cycle, Action<T> action, T data, long id)
    {
        Cycle = cycle;
        TaskAction = action;
        Data = data;
        Id = id;
    }

    /// <summary>
    /// Creates a task with payload; the ID stays 0.
    /// </summary>
    /// <param name="cycle">Initial revolution count.</param>
    /// <param name="action">Callback to invoke.</param>
    /// <param name="data">Payload for the callback.</param>
    public BaseTask(long cycle, Action<T> action, T data)
        : this(cycle, action, data, 0L)
    {
    }

    /// <summary>
    /// Creates a task without payload; data stays <c>default(T)</c> and the ID stays 0.
    /// </summary>
    /// <param name="cycle">Initial revolution count.</param>
    /// <param name="action">Callback to invoke.</param>
    public BaseTask(long cycle, Action<T> action)
        : this(cycle, action, default(T), 0L)
    {
    }

    /// <summary>
    /// Atomically decrements the remaining revolution count by one.
    /// </summary>
    public void SubCycleNumber()
    {
        Interlocked.Decrement(ref _cycle);
    }
}
/// <summary>
/// Demo entry point: starts a minute-granularity ring queue on a background task and
/// schedules four messages with delays of 5, 10, 60 and 120 seconds.
/// </summary>
public static void Start()
{
    // 1. Initialize queues of different granularity.
    IRingQueue<NewsModel> minuteRingQueue = new MinuteQueue<NewsModel>();

    // 2. Open thread. The queue loop never ends, so ask for a dedicated thread
    //    instead of occupying a thread-pool worker forever.
    var lstTasks = new List<Task>
    {
        Task.Factory.StartNew(minuteRingQueue.Start, TaskCreationOptions.LongRunning)
    };

    // 3. Add tasks performed in different periods.
    minuteRingQueue.Add(5, new Action<NewsModel>((NewsModel newsObj) =>
    {
        Console.WriteLine(newsObj.News);
    }), new NewsModel() { News = " Trump's visit to China! " });

    minuteRingQueue.Add(10, new Action<NewsModel>((NewsModel newsObj) =>
    {
        Console.WriteLine(newsObj.News);
    }), new NewsModel() { News = " Putin Pu's visit to China! " });

    minuteRingQueue.Add(60, new Action<NewsModel>((NewsModel newsObj) =>
    {
        Console.WriteLine(newsObj.News);
    }), new NewsModel() { News = " Eisenhower's visit to China! " });

    minuteRingQueue.Add(120, new Action<NewsModel>((NewsModel newsObj) =>
    {
        Console.WriteLine(newsObj.News);
    }), new NewsModel() { News = " Xi Jinping's visit to the US! " });

    // 4. Waiting for all tasks to complete usually never finishes, because the queue
    //    runs an infinite loop. F5: run the program and see the effect.
    Task.WaitAll(lstTasks.ToArray());
    Console.Read();
}
&nbsp;
<img src="https://img2018.cnblogs.com/blog/1214710/201809/1214710-20180923152944919-1577847621.png" alt="" />