守护线程(Thread)
在死循环中阻塞等待,从 DelayQueue<DelayedTask<?>> 队列里取出已到期的任务,
再提交给线程池 ExecutorService 去执行。
业务任务类实现 Runnable 接口,
创建后通过 DelayQueueManager.put() 放入延时队列 DelayQueue<DelayedTask<?>>。
守护线程 = 死循环的搬运工,一直从队列里 take() 任务
线程池 = 真正干活的工人,执行 Runnable.run()
业务代码 = 只负责创建任务丢进队列,丢完就走
// ---------------------------------------------------------------------------
// Caller side (excerpt from a service class): create the task and enqueue it.
// ---------------------------------------------------------------------------

/**
 * Queues an asynchronous batch update of a student's word records.
 *
 * <p>The work is wrapped in an {@code UpdateStudentWordBatchTask} and handed to
 * {@code DelayQueueManager} with a delay of 0 ms; this method does not wait for
 * the task to run.
 *
 * @param studentWordMongoList records forwarded to the task.
 *                             NOTE(review): the list is shared by reference with the
 *                             worker thread; callers presumably must not mutate it
 *                             after this call - confirm.
 * @param studentId            id forwarded to the task
 */
private void updateStudentWordInfoBatch(List<StudentWordMongo> studentWordMongoList, Long studentId) {
    UpdateStudentWordBatchTask task =
            new UpdateStudentWordBatchTask(studentId, studentWordMongoList, this);
    DelayQueueManager.getInstance()
            .put(task, 0, TimeUnit.MILLISECONDS, ContentDelayed.UPDATE_STUDENT_WORD_BATCH);
}

// ---------------------------------------------------------------------------
// DelayQueueManager.java
// ---------------------------------------------------------------------------
package com.wj.airead.api.sys.delayed;

import java.util.concurrent.*;

/**
 * Singleton that owns a {@link DelayQueue}, a monitor thread that drains it, and a
 * thread pool that runs the drained tasks.
 *
 * <p>Flow: {@link #put} wraps a {@link Runnable} in a {@link DelayedTask} and enqueues
 * it; the monitor thread blocks in {@link DelayQueue#take()} until an entry expires and
 * then submits its task to the executor.
 *
 * @author yg
 * @version 1.0 (2021/12/20)
 */
public class DelayQueueManager {

    /**
     * Maximum pool size passed to the executor. Because the executor's work queue is an
     * unbounded {@link LinkedBlockingQueue}, the pool never grows past the core size, so
     * this value has no practical effect.
     */
    private final static int DEFAULT_THREAD_NUM = 16;

    /** Core pool size, i.e. the effective number of worker threads. */
    private static final int thread_num = 4;

    /** Eagerly created singleton instance. */
    private static final DelayQueueManager instance = new DelayQueueManager();

    /** Worker pool that actually runs the tasks. */
    private final ExecutorService executor;

    /** Queue holding tasks until their delay has elapsed. */
    private final DelayQueue<DelayedTask<?>> delayQueue;

    /** Monitor thread moving expired tasks from the queue to the pool. */
    private Thread daemonThread;

    private DelayQueueManager() {
        executor = new ThreadPoolExecutor(thread_num, DEFAULT_THREAD_NUM, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
        delayQueue = new DelayQueue<>();
        init();
    }

    /**
     * Returns the singleton instance.
     *
     * @return the shared manager
     */
    public static DelayQueueManager getInstance() {
        return instance;
    }

    /**
     * Starts the monitor thread. Called once from the constructor; further calls are
     * no-ops while the monitor is alive, so a stray call cannot spawn a second thread
     * competing for the same queue.
     */
    public synchronized void init() {
        if (daemonThread != null && daemonThread.isAlive()) {
            return;
        }
        Thread monitor = new Thread(new Runnable() {
            @Override
            public void run() {
                execute();
            }
        }, "DelayQueueMonitor");
        // Mark as daemon so this infinite loop does not by itself keep the JVM alive.
        monitor.setDaemon(true);
        monitor.start();
        daemonThread = monitor;
    }

    /**
     * Monitor loop: blocks until a task's delay expires, then hands it to the pool.
     * Exits when the thread is interrupted.
     */
    private void execute() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                // take() blocks until an entry has expired and never returns null.
                Runnable task = delayQueue.take().getTask();
                if (task != null) {
                    executor.execute(task);
                }
            } catch (InterruptedException e) {
                // Restore the flag and stop instead of spinning on a swallowed interrupt.
                Thread.currentThread().interrupt();
                return;
            } catch (Exception e) {
                // A failed submission must not kill the monitor; report and keep draining.
                e.printStackTrace();
            }
        }
    }

    /**
     * Enqueues a task to be run after the given delay.
     *
     * @param task task to run; a {@code null} task is enqueued but skipped when it expires
     * @param time delay before the task becomes eligible to run
     * @param unit unit of {@code time}
     * @param rf   task type label; currently not used by this method
     * @author yg
     */
    public void put(Runnable task, long time, TimeUnit unit, String rf) {
        long timeout = unit.toNanos(time);
        delayQueue.put(new DelayedTask<>(timeout, task));
    }

    /**
     * Removes one queued entry that equals {@code task}, if present.
     *
     * @param task entry to remove
     * @return {@code true} if an entry was removed
     */
    public boolean removeTask(DelayedTask<?> task) {
        return delayQueue.remove(task);
    }
}

// ---------------------------------------------------------------------------
// UpdateStudentWordBatchTask.java
// ---------------------------------------------------------------------------
package com.wj.airead.api.word.task;

import com.wj.airead.api.student.entity.StudentWordMongo;
import com.wj.airead.api.word.service.IStudentWordMongoService;

import java.util.List;

/**
 * Runnable that forwards a list of student word records and a student id to
 * {@link IStudentWordMongoService#updateMongo}.
 *
 * @author yg
 * @version 1.0 (2021/12/20)
 */
public class UpdateStudentWordBatchTask implements Runnable {

    /** Records handed to the service when the task runs. */
    public List<StudentWordMongo> studentWordInfoList;

    /** Service that performs the update. */
    public IStudentWordMongoService studentWordMongoService;

    /** Student id handed to the service when the task runs. */
    public Long studentId;

    /**
     * Captures everything the task needs at run time.
     *
     * @param studentId                student id passed to the service
     * @param studentWordInfoList      records passed to the service
     * @param studentWordMongoService  service invoked by {@link #run()}
     */
    public UpdateStudentWordBatchTask(Long studentId, List<StudentWordMongo> studentWordInfoList,
                                      IStudentWordMongoService studentWordMongoService) {
        this.studentWordInfoList = studentWordInfoList;
        this.studentWordMongoService = studentWordMongoService;
        this.studentId = studentId;
    }

    /** Delegates to {@code studentWordMongoService.updateMongo(studentWordInfoList, studentId)}. */
    @Override
    public void run() {
        studentWordMongoService.updateMongo(studentWordInfoList, studentId);
    }
}

// ---------------------------------------------------------------------------
// DelayedTask.java
// ---------------------------------------------------------------------------
package com.wj.airead.api.sys.delayed;

import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

/**
 * {@link Delayed} wrapper pairing a {@link Runnable} with the {@link System#nanoTime()}
 * instant at which it becomes eligible to run.
 *
 * <p>Two wrappers are equal when their tasks are equal, regardless of expiry time, so a
 * queued entry can be located by wrapping the same task again.
 *
 * @param <T> concrete task type
 * @author yg
 * @version 1.0 (2021/12/20)
 */
public class DelayedTask<T extends Runnable> implements Delayed {

    /** Expiry instant on the {@link System#nanoTime()} clock. */
    private final long time;

    /** Wrapped task; may be {@code null}. */
    private final T task;

    /**
     * @param timeout delay in nanoseconds, counted from now
     * @param task    task to wrap
     */
    public DelayedTask(long timeout, T task) {
        this.time = System.nanoTime() + timeout;
        this.task = task;
    }

    /**
     * Orders by expiry instant, earliest first. Other {@link Delayed} implementations
     * are compared by remaining delay instead of failing with a ClassCastException.
     */
    @Override
    public int compareTo(Delayed o) {
        if (o == this) {
            return 0;
        }
        long diff;
        if (o instanceof DelayedTask) {
            // Subtract rather than compare directly: nanoTime values may wrap around.
            diff = time - ((DelayedTask<?>) o).time;
        } else {
            diff = getDelay(TimeUnit.NANOSECONDS) - o.getDelay(TimeUnit.NANOSECONDS);
        }
        if (diff > 0) {
            return 1;
        } else if (diff < 0) {
            return -1;
        } else {
            return 0;
        }
    }

    /**
     * @param unit unit for the result
     * @return remaining delay in {@code unit}; zero or negative once expired
     */
    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(this.time - System.nanoTime(), TimeUnit.NANOSECONDS);
    }

    /** Equal when the other object is a {@code DelayedTask} wrapping an equal task. */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof DelayedTask)) {
            return false;
        }
        Object otherTask = ((DelayedTask<?>) obj).task;
        return task == null ? otherTask == null : task.equals(otherTask);
    }

    /** Hash of the wrapped task (0 for a {@code null} task), consistent with {@link #equals}. */
    @Override
    public int hashCode() {
        return task == null ? 0 : task.hashCode();
    }

    /**
     * Returns the wrapped task.
     *
     * @return the task, possibly {@code null}
     * @author yg
     */
    public T getTask() {
        return task;
    }
}