xm
2024-06-14 722af26bc6fec32bb289b1df51a9016a4935610f
提交 | 用户 | 时间
722af2 1 package com.xxl.job.admin.core.thread;
X 2
3 import com.xxl.job.admin.core.complete.XxlJobCompleter;
4 import com.xxl.job.admin.core.conf.XxlJobAdminConfig;
5 import com.xxl.job.admin.core.model.XxlJobLog;
6 import com.xxl.job.admin.core.util.I18nUtil;
7 import com.xxl.job.core.biz.model.HandleCallbackParam;
8 import com.xxl.job.core.biz.model.ReturnT;
9 import com.xxl.job.core.util.DateUtil;
10 import org.slf4j.Logger;
11 import org.slf4j.LoggerFactory;
12
13 import java.util.Date;
14 import java.util.List;
15 import java.util.concurrent.*;
16
17 /**
18  * job lose-monitor instance
19  *
20  * @author xuxueli 2015-9-1 18:05:56
21  */
22 public class JobCompleteHelper {
23     private static Logger logger = LoggerFactory.getLogger(JobCompleteHelper.class);
24
25     private static JobCompleteHelper instance = new JobCompleteHelper();
26
27     public static JobCompleteHelper getInstance() {
28         return instance;
29     }
30
31     // ---------------------- monitor ----------------------
32
33     private ThreadPoolExecutor callbackThreadPool = null;
34     private Thread monitorThread;
35     private volatile boolean toStop = false;
36
37     public void start() {
38
39         // for callback
40         callbackThreadPool = new ThreadPoolExecutor(
41             2,
42             20,
43             30L,
44             TimeUnit.SECONDS,
45             new LinkedBlockingQueue<Runnable>(3000),
46             new ThreadFactory() {
47                 @Override
48                 public Thread newThread(Runnable r) {
49                     return new Thread(r, "xxl-job, admin JobLosedMonitorHelper-callbackThreadPool-" + r.hashCode());
50                 }
51             },
52             new RejectedExecutionHandler() {
53                 @Override
54                 public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
55                     r.run();
56                     logger.warn(">>>>>>>>>>> xxl-job, callback too fast, match threadpool rejected handler(run now).");
57                 }
58             });
59
60
61         // for monitor
62         monitorThread = new Thread(new Runnable() {
63
64             @Override
65             public void run() {
66
67                 // wait for JobTriggerPoolHelper-init
68                 try {
69                     TimeUnit.MILLISECONDS.sleep(50);
70                 } catch (InterruptedException e) {
71                     if (!toStop) {
72                         logger.error(e.getMessage(), e);
73                     }
74                 }
75
76                 // monitor
77                 while (!toStop) {
78                     try {
79                         // 任务结果丢失处理:调度记录停留在 "运行中" 状态超过10min,且对应执行器心跳注册失败不在线,则将本地调度主动标记失败;
80                         Date losedTime = DateUtil.addMinutes(new Date(), -10);
81                         List<Long> losedJobIds = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findLostJobIds(losedTime);
82
83                         if (losedJobIds != null && losedJobIds.size() > 0) {
84                             for (Long logId : losedJobIds) {
85
86                                 XxlJobLog jobLog = new XxlJobLog();
87                                 jobLog.setId(logId);
88
89                                 jobLog.setHandleTime(new Date());
90                                 jobLog.setHandleCode(ReturnT.FAIL_CODE);
91                                 jobLog.setHandleMsg(I18nUtil.getString("joblog_lost_fail"));
92
93                                 XxlJobCompleter.updateHandleInfoAndFinish(jobLog);
94                             }
95
96                         }
97                     } catch (Exception e) {
98                         if (!toStop) {
99                             logger.error(">>>>>>>>>>> xxl-job, job fail monitor thread error:{}", e);
100                         }
101                     }
102
103                     try {
104                         TimeUnit.SECONDS.sleep(60);
105                     } catch (Exception e) {
106                         if (!toStop) {
107                             logger.error(e.getMessage(), e);
108                         }
109                     }
110
111                 }
112
113                 logger.info(">>>>>>>>>>> xxl-job, JobLosedMonitorHelper stop");
114
115             }
116         });
117         monitorThread.setDaemon(true);
118         monitorThread.setName("xxl-job, admin JobLosedMonitorHelper");
119         monitorThread.start();
120     }
121
122     public void toStop() {
123         toStop = true;
124
125         // stop registryOrRemoveThreadPool
126         callbackThreadPool.shutdownNow();
127
128         // stop monitorThread (interrupt and wait)
129         monitorThread.interrupt();
130         try {
131             monitorThread.join();
132         } catch (InterruptedException e) {
133             logger.error(e.getMessage(), e);
134         }
135     }
136
137
138     // ---------------------- helper ----------------------
139
140     public ReturnT<String> callback(List<HandleCallbackParam> callbackParamList) {
141
142         callbackThreadPool.execute(new Runnable() {
143             @Override
144             public void run() {
145                 for (HandleCallbackParam handleCallbackParam : callbackParamList) {
146                     ReturnT<String> callbackResult = callback(handleCallbackParam);
147                     logger.debug(">>>>>>>>> JobApiController.callback {}, handleCallbackParam={}, callbackResult={}",
148                         (callbackResult.getCode() == ReturnT.SUCCESS_CODE ? "success" : "fail"), handleCallbackParam, callbackResult);
149                 }
150             }
151         });
152
153         return ReturnT.SUCCESS;
154     }
155
156     private ReturnT<String> callback(HandleCallbackParam handleCallbackParam) {
157         // valid log item
158         XxlJobLog log = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().load(handleCallbackParam.getLogId());
159         if (log == null) {
160             return new ReturnT<String>(ReturnT.FAIL_CODE, "log item not found.");
161         }
162         if (log.getHandleCode() > 0) {
163             return new ReturnT<String>(ReturnT.FAIL_CODE, "log repeate callback.");     // avoid repeat callback, trigger child job etc
164         }
165
166         // handle msg
167         StringBuffer handleMsg = new StringBuffer();
168         if (log.getHandleMsg() != null) {
169             handleMsg.append(log.getHandleMsg()).append("<br>");
170         }
171         if (handleCallbackParam.getHandleMsg() != null) {
172             handleMsg.append(handleCallbackParam.getHandleMsg());
173         }
174
175         // success, save log
176         log.setHandleTime(new Date());
177         log.setHandleCode(handleCallbackParam.getHandleCode());
178         log.setHandleMsg(handleMsg.toString());
179         XxlJobCompleter.updateHandleInfoAndFinish(log);
180
181         return ReturnT.SUCCESS;
182     }
183
184
185 }