线程池与延时队列（DelayQueue）搭配的实际用法
守护线程 Thread一直在死循环 等待从 DelayQueueDelayedTask? 队列里面拿出任务提交给线程池 ExecutorService 执行线程池的任务继承 Runnable的任务把任务放入延时队列 DelayQueueDelayedTask?守护线程 死循环的搬运工一直从队列里 take() 任务线程池 真正干活的工人执行 Runnable.run()业务代码 只负责创建任务丢进队列丢完就走private void updateStudentWordInfoBatch(ListStudentWordMongostudentWordMongoList, Long studentId){DelayQueueManager managerDelayQueueManager.getInstance();UpdateStudentWordBatchTask updateStudentWordBatchTasknew UpdateStudentWordBatchTask(studentId, studentWordMongoList, this);manager.put(updateStudentWordBatchTask,0, TimeUnit.MILLISECONDS, ContentDelayed.UPDATE_STUDENT_WORD_BATCH);}package com.wj.airead.api.sys.delayed;importjava.util.concurrent.*;/** * ClassName DelayQueueManager * Description TOPO 队列处理 * Author yg * Date2021/12/209:57 * Version1.0**/ public class DelayQueueManager{private final static int DEFAULT_THREAD_NUM16;private static final int thread_num4;/** * 单例模式返回队列管理实例 */ private static final DelayQueueManager instancenew DelayQueueManager();// 固定大小线程池 private final ExecutorService executor;// 延时队列 private final DelayQueueDelayedTask?delayQueue;// 守护线程 private Thread daemonThread;privateDelayQueueManager(){executornew ThreadPoolExecutor(thread_num, DEFAULT_THREAD_NUM, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueueRunnable());delayQueuenew DelayQueue();init();}public static DelayQueueManagergetInstance(){returninstance;}/** * 队列管理类初始化 */ public voidinit(){/*daemonThreadnew Thread(()-{execute();});*/ // 上下2种写法是等价的上面的Lambda写法只在java8及以后的版本中有效 daemonThreadnew Thread(newRunnable(){Override public voidrun(){execute();}});daemonThread.setName(DelayQueueMonitor);daemonThread.start();}private voidexecute(){while(true){try{// 从延时队列中获取任务 DelayedTask?delayedTaskdelayQueue.take();if(delayedTask!null){Runnable taskdelayedTask.getTask();if(nulltask){continue;}// 提交到线程池执行task executor.execute(task);}}catch(Exception e){e.printStackTrace();}}}/** * 添加任务 * * param task 任务实例化对象 * paramtime任务延后时间 * param unit 时间单位 * param rf 任务类型 * author yg * date2020年2月20日 
下午8:43:28 */ public void put(Runnable task, long time, TimeUnit unit, String rf){// 获取延时时间 longtimeoutTimeUnit.NANOSECONDS.convert(time, unit);DelayedTask?delayedTasknew DelayedTask(timeout, task);delayQueue.put(delayedTask);}/** * 删除任务 * * param task * return */ public boolean removeTask(DelayedTask?task){returndelayQueue.remove(task);}}package com.wj.airead.api.sys.delayed;importjava.util.concurrent.*;/** * ClassName DelayQueueManager * Description TOPO 队列处理 * Author yg * Date2021/12/209:57 * Version1.0**/ public class DelayQueueManager{private final static int DEFAULT_THREAD_NUM16;private static final int thread_num4;/** * 单例模式返回队列管理实例 */ private static final DelayQueueManager instancenew DelayQueueManager();// 固定大小线程池 private final ExecutorService executor;// 延时队列 private final DelayQueueDelayedTask?delayQueue;// 守护线程 private Thread daemonThread;privateDelayQueueManager(){executornew ThreadPoolExecutor(thread_num, DEFAULT_THREAD_NUM, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueueRunnable());delayQueuenew DelayQueue();init();}public static DelayQueueManagergetInstance(){returninstance;}/** * 队列管理类初始化 */ public voidinit(){/*daemonThreadnew Thread(()-{execute();});*/ // 上下2种写法是等价的上面的Lambda写法只在java8及以后的版本中有效 daemonThreadnew Thread(newRunnable(){Override public voidrun(){execute();}});daemonThread.setName(DelayQueueMonitor);daemonThread.start();}private voidexecute(){while(true){try{// 从延时队列中获取任务 DelayedTask?delayedTaskdelayQueue.take();if(delayedTask!null){Runnable taskdelayedTask.getTask();if(nulltask){continue;}// 提交到线程池执行task executor.execute(task);}}catch(Exception e){e.printStackTrace();}}}/** * 添加任务 * * param task 任务实例化对象 * paramtime任务延后时间 * param unit 时间单位 * param rf 任务类型 * author yg * date2020年2月20日 下午8:43:28 */ public void put(Runnable task, long time, TimeUnit unit, String rf){// 获取延时时间 longtimeoutTimeUnit.NANOSECONDS.convert(time, unit);DelayedTask?delayedTasknew DelayedTask(timeout, task);delayQueue.put(delayedTask);}/** * 删除任务 * * param task * 
return */ public boolean removeTask(DelayedTask?task){returndelayQueue.remove(task);}}package com.wj.airead.api.word.task;importcom.wj.airead.api.student.entity.StudentWordMongo;importcom.wj.airead.api.word.service.IStudentWordMongoService;importjava.util.List;/** * ClassName SaveStudentWordBatchTask * Description TOPO 批量添加用户词汇掌握情况 * Author yg * Date2021/12/209:57 * Version1.0**/ public class UpdateStudentWordBatchTask implements Runnable{public ListStudentWordMongostudentWordInfoList;public IStudentWordMongoService studentWordMongoService;public Long studentId;/** * 构造方法 初始化参数 * * param studentWordInfoList * param studentWordMongoService */ public UpdateStudentWordBatchTask(Long studentId, ListStudentWordMongostudentWordInfoList, IStudentWordMongoService studentWordMongoService){this.studentWordInfoListstudentWordInfoList;this.studentWordMongoServicestudentWordMongoService;this.studentIdstudentId;}/** * 执行内容 */ Override public voidrun(){studentWordMongoService.updateMongo(studentWordInfoList, studentId);}}package com.wj.airead.api.sys.delayed;importjava.util.concurrent.Delayed;importjava.util.concurrent.TimeUnit;/** * ClassName DelayedTask * Description TOPO 延迟处理 * Author yg * Date2021/12/209:57 * Version1.0**/ public class DelayedTaskT extends Runnableimplements Delayed{private final longtime;// 任务延迟时间 private final T task;// 任务类 public DelayedTask(long timeout, T task){this.timeSystem.nanoTime()timeout;this.tasktask;}Override public int compareTo(Delayed o){DelayedTask?other(DelayedTask?)o;longdifftime- other.time;if(diff0){return1;}elseif(diff0){return-1;}else{return0;}}Override public long getDelay(TimeUnit unit){returnunit.convert(this.time - System.nanoTime(), TimeUnit.NANOSECONDS);}Override public inthashCode(){returntask.hashCode();}/** * 获取任务对象 * * return * author yg * date2020年2月20日 上午11:55:29 */ public TgetTask(){returntask;}}