关于动态定时任务的解决方案。
之前做定时任务都是用 Spring 的 @Scheduled 注解来实现,但它的执行时间是固定写在注解里的,如果需要在运行时动态地添加或调整任务,就不能满足需求。查询资料后得知,这种场景可以用时间轮算法来实现:大致思路是模拟钟表的表盘,把任务挂到对应的刻度上,指针每走一格,就执行该刻度上已经到期的任务。
具体java实现:
package com.education.task.provider;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Delayed-task scheduler based on the time-wheel algorithm.
 *
 * <p>The wheel is an array of slots. A background thread visits one slot per second; a task whose
 * {@code key} (delay in ticks) is {@code k} is stored {@code k} slots ahead of the cursor, together
 * with the number of full laps it still has to wait. When the cursor reaches a slot, every task in
 * it with no laps left is handed to the business thread pool.
 *
 * <p>All mutations of the wheel happen while holding {@link #lock}, so {@link #addTask},
 * {@link #cancel} and the trigger thread never observe a slot half-updated.
 *
 * @author pengbenlei
 * @company leenleda
 * @date 2020/12/21 9:53
 * @description Schedules tasks with the time-wheel algorithm
 */
public class RingBufferWheel {

    private static final Logger logger = LoggerFactory.getLogger(RingBufferWheel.class);

    /**
     * Default ring buffer size (one slot per second of a minute).
     */
    private static final int STATIC_RING_SIZE = 60;

    /**
     * Slots of the wheel; each non-null element is a {@code Set<BusinessTask>}.
     */
    private Object[] ringBuffer;

    private int bufferSize;

    /**
     * Business thread pool that actually runs the due tasks.
     */
    private final ExecutorService executorService;

    /**
     * Number of tasks not yet handed to the thread pool. Written only while holding {@link #lock}.
     */
    private volatile int size = 0;

    /**
     * Task stop sign.
     */
    private volatile boolean stop = false;

    /**
     * Task start sign; flips to {@code true} exactly once.
     */
    private final AtomicBoolean start = new AtomicBoolean(false);

    /**
     * Total tick times. Advanced by the trigger thread while holding {@link #lock}.
     */
    private final AtomicInteger tick = new AtomicInteger();

    private final Lock lock = new ReentrantLock();
    private final Condition condition = lock.newCondition();

    private final AtomicInteger taskId = new AtomicInteger();

    /**
     * Pending tasks by id, used by {@link #cancel(int)}.
     */
    private final Map<Integer, BusinessTask> taskMap = new ConcurrentHashMap<>(16);

    /**
     * Create a new delay task ring buffer by default size.
     *
     * @param executorService the business thread pool
     */
    public RingBufferWheel(ExecutorService executorService) {
        this.executorService = executorService;
        this.bufferSize = STATIC_RING_SIZE;
        this.ringBuffer = new Object[bufferSize];
    }

    /**
     * Create a new delay task ring buffer by custom buffer size.
     *
     * <p>The power-of-two requirement is kept for compatibility with existing callers; the slot
     * arithmetic in this class works for any positive size.
     *
     * @param executorService the business thread pool
     * @param bufferSize      custom buffer size, must be a positive power of 2
     * @throws IllegalArgumentException if {@code bufferSize} is not a positive power of 2
     */
    public RingBufferWheel(ExecutorService executorService, int bufferSize) {
        this(executorService);

        if (!powerOf2(bufferSize)) {
            throw new IllegalArgumentException("bufferSize=[" + bufferSize + "] must be a power of 2");
        }
        this.bufferSize = bufferSize;
        this.ringBuffer = new Object[bufferSize];
    }

    /**
     * Add a task into the ring buffer (thread safe) and start the trigger thread if it is not
     * running yet.
     *
     * <p>The task's {@code index}, {@code cycleNum} and {@code taskId} are overwritten here; any
     * values the caller set beforehand are ignored. A negative key is treated as zero, i.e. the
     * task becomes due at the next tick.
     *
     * @param task business task extends {@link BusinessTask}
     * @return the unique id of the task, usable with {@link #cancel(int)}
     */
    public int addTask(BusinessTask task) {
        int key = task.getKey();
        int id;

        // Acquire outside the try block so a failed lock() never reaches unlock().
        lock.lock();
        try {
            int index = mod(key, bufferSize);
            task.setIndex(index);
            task.setCycleNum(cycleNum(key, bufferSize));

            Set<BusinessTask> tasks = get(index);
            if (tasks == null) {
                tasks = new HashSet<>();
                ringBuffer[index] = tasks;
            }

            id = taskId.incrementAndGet();
            task.setTaskId(id);
            tasks.add(task);
            taskMap.put(id, task);
            size++;
        } finally {
            lock.unlock();
        }

        start();

        return id;
    }

    /**
     * Cancel task by taskId. Only the task with exactly this id is removed; other tasks sharing
     * the same delay are left untouched.
     *
     * @param id unique id through {@link #addTask(BusinessTask)}
     * @return {@code true} if the task was still pending and has been removed
     */
    public boolean cancel(int id) {
        lock.lock();
        try {
            BusinessTask task = taskMap.remove(id);
            if (task == null) {
                return false;
            }

            Set<BusinessTask> tasks = get(task.getIndex());
            boolean removed = tasks != null && tasks.remove(task);
            if (removed) {
                // Cancelling the last pending task must also release a waiting stop(false).
                size2Notify();
            }
            return removed;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Thread safe.
     *
     * @return the number of tasks that have not yet been handed to the thread pool
     */
    public int taskSize() {
        return size;
    }

    /**
     * Number of tasks that can still be cancelled through {@link #cancel(int)}.
     *
     * @return the size of the id-to-task map
     */
    public int taskMapSize() {
        return taskMap.size();
    }

    /**
     * Start background thread to consumer wheel timer, it will always run until you call method
     * {@link #stop}. Calling it more than once has no effect.
     */
    public void start() {
        // Expecting "false" explicitly guarantees only one caller ever wins the race.
        if (start.compareAndSet(false, true)) {
            logger.info("Delay task is starting");
            Thread job = new Thread(new TriggerJob());
            job.setName("consumer RingBuffer thread");
            job.start();
        }
    }

    /**
     * Stop consumer ring buffer thread.
     *
     * <p>If the calling thread is interrupted while waiting, the wait is abandoned, the interrupt
     * flag is restored and the wheel is stopped immediately.
     *
     * @param force True will force close consumer thread and discard all pending tasks
     *              otherwise the consumer thread waits for all tasks to completes before closing.
     */
    public void stop(boolean force) {
        if (force) {
            logger.info("Delay task is forced stop");
            lock.lock();
            try {
                stop = true;
                // Release any thread blocked in a graceful stop.
                condition.signalAll();
            } finally {
                lock.unlock();
            }
            executorService.shutdownNow();
            return;
        }

        logger.info("Delay task is stopping");
        lock.lock();
        try {
            // Re-check under the lock in a loop: guards against lost signals and spurious wakeups.
            while (size > 0 && !stop) {
                condition.await();
            }
        } catch (InterruptedException e) {
            logger.error("InterruptedException", e);
            Thread.currentThread().interrupt();
        } finally {
            lock.unlock();
        }
        stop = true;
        executorService.shutdown();
    }

    @SuppressWarnings("unchecked")
    private Set<BusinessTask> get(int index) {
        // Safe: only addTask and remove store into ringBuffer, always as Set<BusinessTask>.
        return (Set<BusinessTask>) ringBuffer[index];
    }

    /**
     * Remove and get the due tasks of one slot; tasks with laps left get one lap deducted.
     *
     * @param key slot index on the wheel
     * @return tasks that are due now (never {@code null})
     */
    private Set<BusinessTask> remove(int key) {
        Set<BusinessTask> result = new HashSet<>();

        lock.lock();
        try {
            Set<BusinessTask> tasks = get(key);
            if (tasks == null) {
                return result;
            }

            Set<BusinessTask> remaining = new HashSet<>();
            for (BusinessTask task : tasks) {
                if (task.getCycleNum() <= 0) {
                    result.add(task);
                    // Only due tasks leave the map; waiting tasks must stay cancellable.
                    taskMap.remove(task.getTaskId());
                } else {
                    // decrement 1 cycle number and update origin data
                    task.setCycleNum(task.getCycleNum() - 1);
                    remaining.add(task);
                }
            }

            //update origin data
            ringBuffer[key] = remaining;
        } finally {
            lock.unlock();
        }

        return result;
    }

    /**
     * Decrement the pending counter and wake up graceful stoppers once it reaches zero.
     */
    private void size2Notify() {
        lock.lock();
        try {
            size--;
            if (size <= 0) {
                condition.signalAll();
            }
        } finally {
            lock.unlock();
        }
    }

    private boolean powerOf2(int target) {
        return target > 0 && (target & (target - 1)) == 0;
    }

    /**
     * Slot index for a delay of {@code target} ticks counted from the current tick.
     */
    private int mod(int target, int mod) {
        // Real modulo instead of a bit mask, so non power-of-two sizes (the default 60) work.
        long due = (long) Math.max(target, 0) + tick.get();
        return (int) Math.floorMod(due, (long) mod);
    }

    /**
     * Number of full laps a delay of {@code target} ticks has to wait.
     */
    private int cycleNum(int target, int mod) {
        return Math.max(target, 0) / mod;
    }

    /**
     * Background job: once per second collects the due tasks of the current slot, submits them to
     * the business thread pool and advances the cursor.
     */
    private class TriggerJob implements Runnable {

        @Override
        public void run() {
            int index = 0;
            while (!stop) {
                try {
                    logger.debug("Tick index={}", index);

                    Set<BusinessTask> dueTasks;
                    // Collect and advance atomically so addTask always sees a consistent cursor.
                    lock.lock();
                    try {
                        dueTasks = remove(index);
                        index = (index + 1) % bufferSize;
                        //Total tick number of records
                        tick.incrementAndGet();
                    } finally {
                        lock.unlock();
                    }

                    for (BusinessTask task : dueTasks) {
                        try {
                            executorService.submit(task);
                        } catch (RuntimeException e) {
                            logger.error("Exception", e);
                        } finally {
                            // Count the task as done only after the hand-over attempt, so a
                            // graceful stop cannot shut the pool down before it is submitted.
                            size2Notify();
                        }
                    }
                } catch (RuntimeException e) {
                    logger.error("Exception", e);
                }

                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    logger.error("Exception", e);
                    // Restore the flag and leave; looping on would spin on the interrupted sleep.
                    Thread.currentThread().interrupt();
                    break;
                }
            }

            logger.info("Delay task has stopped");
        }
    }
}
1 package com.education.task.provider; 2 3 import lombok.Getter; 4 import lombok.Setter; 5 6 /** 7 * @author pengbenlei 8 * @company leenleda 9 * @date 2020/12/21 10:49 10 * @description 11 */ 12 @Getter 13 @Setter 14 public abstract class BusinessTask extends Thread { 15 16 /** 17 * 所在位置 18 */ 19 private int index; 20 21 /** 22 * cycleNum 轮盘上的圈数 23 */ 24 private int cycleNum; 25 26 /** 27 * 轮盘的刻度 28 */ 29 private int key; 30 31 /** 32 * The unique ID of the task 33 */ 34 private int taskId; 35 36 @Override 37 public void run() { 38 } 39 40 }BusinessTask
测试调用:
1 public static class Job extends BusinessTask{ 2 @Override 3 public void run() { 4 System.out.println("12346"); 5 } 6 }构造任务
1 SpringApplication.run(TaskProviderApplication.class, args); 2 RingBufferWheel ringBufferWheel = new RingBufferWheel( Executors.newFixedThreadPool(2)); 3 for (int i = 0; i < 2; i++) { 4 BusinessTask job = new Job(); 5 job.setKey(10); 6 job.setCycleNum(i); 7 ringBufferWheel.addTask(job); 8 }调用代码
后续:后来听朋友说有个开源项目 xxl-job(https://github.com/xuxueli/xxl-job),功能比较全,真香。