package com.xxl.job.admin.core.thread; import com.xxl.job.admin.core.complete.XxlJobCompleter; import com.xxl.job.admin.core.conf.XxlJobAdminConfig; import com.xxl.job.admin.core.model.XxlJobLog; import com.xxl.job.admin.core.util.I18nUtil; import com.xxl.job.core.biz.model.HandleCallbackParam; import com.xxl.job.core.biz.model.ReturnT; import com.xxl.job.core.util.DateUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Date; import java.util.List; import java.util.concurrent.*; /** * job lose-monitor instance * * @author xuxueli 2015-9-1 18:05:56 */ public class JobCompleteHelper { private static Logger logger = LoggerFactory.getLogger(JobCompleteHelper.class); private static JobCompleteHelper instance = new JobCompleteHelper(); public static JobCompleteHelper getInstance() { return instance; } // ---------------------- monitor ---------------------- private ThreadPoolExecutor callbackThreadPool = null; private Thread monitorThread; private volatile boolean toStop = false; public void start() { // for callback callbackThreadPool = new ThreadPoolExecutor( 2, 20, 30L, TimeUnit.SECONDS, new LinkedBlockingQueue(3000), new ThreadFactory() { @Override public Thread newThread(Runnable r) { return new Thread(r, "xxl-job, admin JobLosedMonitorHelper-callbackThreadPool-" + r.hashCode()); } }, new RejectedExecutionHandler() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { r.run(); logger.warn(">>>>>>>>>>> xxl-job, callback too fast, match threadpool rejected handler(run now)."); } }); // for monitor monitorThread = new Thread(new Runnable() { @Override public void run() { // wait for JobTriggerPoolHelper-init try { TimeUnit.MILLISECONDS.sleep(50); } catch (InterruptedException e) { if (!toStop) { logger.error(e.getMessage(), e); } } // monitor while (!toStop) { try { // 任务结果丢失处理:调度记录停留在 "运行中" 状态超过10min,且对应执行器心跳注册失败不在线,则将本地调度主动标记失败; Date losedTime = DateUtil.addMinutes(new Date(), -10); List losedJobIds = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findLostJobIds(losedTime); if (losedJobIds != null && losedJobIds.size() > 0) { for (Long logId : losedJobIds) { XxlJobLog jobLog = new XxlJobLog(); jobLog.setId(logId); jobLog.setHandleTime(new Date()); jobLog.setHandleCode(ReturnT.FAIL_CODE); jobLog.setHandleMsg(I18nUtil.getString("joblog_lost_fail")); XxlJobCompleter.updateHandleInfoAndFinish(jobLog); } } } catch (Exception e) { if (!toStop) { logger.error(">>>>>>>>>>> xxl-job, job fail monitor thread error:{}", e); } } try { TimeUnit.SECONDS.sleep(60); } catch (Exception e) { if (!toStop) { logger.error(e.getMessage(), e); } } } logger.info(">>>>>>>>>>> xxl-job, JobLosedMonitorHelper stop"); } }); monitorThread.setDaemon(true); monitorThread.setName("xxl-job, admin JobLosedMonitorHelper"); monitorThread.start(); } public void toStop() { toStop = true; // stop registryOrRemoveThreadPool callbackThreadPool.shutdownNow(); // stop monitorThread (interrupt and wait) monitorThread.interrupt(); try { monitorThread.join(); } catch (InterruptedException e) { logger.error(e.getMessage(), e); } } // ---------------------- helper ---------------------- public ReturnT callback(List callbackParamList) { callbackThreadPool.execute(new Runnable() { @Override public void run() { for (HandleCallbackParam handleCallbackParam : callbackParamList) { ReturnT callbackResult = callback(handleCallbackParam); logger.debug(">>>>>>>>> JobApiController.callback {}, handleCallbackParam={}, callbackResult={}", (callbackResult.getCode() == ReturnT.SUCCESS_CODE ? "success" : "fail"), handleCallbackParam, callbackResult); } } }); return ReturnT.SUCCESS; } private ReturnT callback(HandleCallbackParam handleCallbackParam) { // valid log item XxlJobLog log = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().load(handleCallbackParam.getLogId()); if (log == null) { return new ReturnT(ReturnT.FAIL_CODE, "log item not found."); } if (log.getHandleCode() > 0) { return new ReturnT(ReturnT.FAIL_CODE, "log repeate callback."); // avoid repeat callback, trigger child job etc } // handle msg StringBuffer handleMsg = new StringBuffer(); if (log.getHandleMsg() != null) { handleMsg.append(log.getHandleMsg()).append("
"); } if (handleCallbackParam.getHandleMsg() != null) { handleMsg.append(handleCallbackParam.getHandleMsg()); } // success, save log log.setHandleTime(new Date()); log.setHandleCode(handleCallbackParam.getHandleCode()); log.setHandleMsg(handleMsg.toString()); XxlJobCompleter.updateHandleInfoAndFinish(log); return ReturnT.SUCCESS; } }