package com.xxl.job.admin.core.trigger; import com.xxl.job.admin.core.conf.XxlJobAdminConfig; import com.xxl.job.admin.core.model.XxlJobGroup; import com.xxl.job.admin.core.model.XxlJobInfo; import com.xxl.job.admin.core.model.XxlJobLog; import com.xxl.job.admin.core.route.ExecutorRouteStrategyEnum; import com.xxl.job.admin.core.scheduler.XxlJobScheduler; import com.xxl.job.admin.core.util.I18nUtil; import com.xxl.job.core.biz.ExecutorBiz; import com.xxl.job.core.biz.model.ReturnT; import com.xxl.job.core.biz.model.TriggerParam; import com.xxl.job.core.enums.ExecutorBlockStrategyEnum; import com.xxl.job.core.util.IpUtil; import com.xxl.job.core.util.ThrowableUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Date; /** * xxl-job trigger * Created by xuxueli on 17/7/13. */ public class XxlJobTrigger { private static Logger logger = LoggerFactory.getLogger(XxlJobTrigger.class); /** * trigger job * * @param jobId * @param triggerType * @param failRetryCount >=0: use this param * <0: use param from job info config * @param executorShardingParam * @param executorParam null: use job param * not null: cover job param * @param addressList null: use executor addressList * not null: cover */ public static void trigger(int jobId, TriggerTypeEnum triggerType, int failRetryCount, String executorShardingParam, String executorParam, String addressList) { // load data XxlJobInfo jobInfo = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(jobId); if (jobInfo == null) { logger.warn(">>>>>>>>>>>> trigger fail, jobId invalid,jobId={}", jobId); return; } if (executorParam != null) { jobInfo.setExecutorParam(executorParam); } int finalFailRetryCount = failRetryCount >= 0 ? failRetryCount : jobInfo.getExecutorFailRetryCount(); XxlJobGroup group = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().load(jobInfo.getJobGroup()); // cover addressList if (addressList != null && addressList.trim().length() > 0) { group.setAddressType(1); group.setAddressList(addressList.trim()); } // sharding param int[] shardingParam = null; if (executorShardingParam != null) { String[] shardingArr = executorShardingParam.split("/"); if (shardingArr.length == 2 && isNumeric(shardingArr[0]) && isNumeric(shardingArr[1])) { shardingParam = new int[2]; shardingParam[0] = Integer.valueOf(shardingArr[0]); shardingParam[1] = Integer.valueOf(shardingArr[1]); } } if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null) && group.getRegistryList() != null && !group.getRegistryList().isEmpty() && shardingParam == null) { for (int i = 0; i < group.getRegistryList().size(); i++) { processTrigger(group, jobInfo, finalFailRetryCount, triggerType, i, group.getRegistryList().size()); } } else { if (shardingParam == null) { shardingParam = new int[]{0, 1}; } processTrigger(group, jobInfo, finalFailRetryCount, triggerType, shardingParam[0], shardingParam[1]); } } private static boolean isNumeric(String str) { try { int result = Integer.valueOf(str); return true; } catch (NumberFormatException e) { return false; } } /** * @param group job group, registry list may be empty * @param jobInfo * @param finalFailRetryCount * @param triggerType * @param index sharding index * @param total sharding index */ private static void processTrigger(XxlJobGroup group, XxlJobInfo jobInfo, int finalFailRetryCount, TriggerTypeEnum triggerType, int index, int total) { // param ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(jobInfo.getExecutorBlockStrategy(), ExecutorBlockStrategyEnum.SERIAL_EXECUTION); // block strategy ExecutorRouteStrategyEnum executorRouteStrategyEnum = ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null); // route strategy String shardingParam = (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == executorRouteStrategyEnum) ? String.valueOf(index).concat("/").concat(String.valueOf(total)) : null; // 1、save log-id XxlJobLog jobLog = new XxlJobLog(); jobLog.setJobGroup(jobInfo.getJobGroup()); jobLog.setJobId(jobInfo.getId()); jobLog.setTriggerTime(new Date()); XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().save(jobLog); logger.debug(">>>>>>>>>>> xxl-job trigger start, jobId:{}", jobLog.getId()); // 2、init trigger-param TriggerParam triggerParam = new TriggerParam(); triggerParam.setJobId(jobInfo.getId()); triggerParam.setExecutorHandler(jobInfo.getExecutorHandler()); triggerParam.setExecutorParams(jobInfo.getExecutorParam()); triggerParam.setExecutorBlockStrategy(jobInfo.getExecutorBlockStrategy()); triggerParam.setExecutorTimeout(jobInfo.getExecutorTimeout()); triggerParam.setLogId(jobLog.getId()); triggerParam.setLogDateTime(jobLog.getTriggerTime().getTime()); triggerParam.setGlueType(jobInfo.getGlueType()); triggerParam.setGlueSource(jobInfo.getGlueSource()); triggerParam.setGlueUpdatetime(jobInfo.getGlueUpdatetime().getTime()); triggerParam.setBroadcastIndex(index); triggerParam.setBroadcastTotal(total); // 3、init address String address = null; ReturnT routeAddressResult = null; if (group.getRegistryList() != null && !group.getRegistryList().isEmpty()) { if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == executorRouteStrategyEnum) { if (index < group.getRegistryList().size()) { address = group.getRegistryList().get(index); } else { address = group.getRegistryList().get(0); } } else { routeAddressResult = executorRouteStrategyEnum.getRouter().route(triggerParam, group.getRegistryList()); if (routeAddressResult.getCode() == ReturnT.SUCCESS_CODE) { address = routeAddressResult.getContent(); } } } else { routeAddressResult = new ReturnT(ReturnT.FAIL_CODE, I18nUtil.getString("jobconf_trigger_address_empty")); } // 4、trigger remote executor ReturnT triggerResult = null; if (address != null) { triggerResult = runExecutor(triggerParam, address); } else { triggerResult = new ReturnT(ReturnT.FAIL_CODE, null); } // 5、collection trigger info StringBuffer triggerMsgSb = new StringBuffer(); triggerMsgSb.append(I18nUtil.getString("jobconf_trigger_type")).append(":").append(triggerType.getTitle()); triggerMsgSb.append("
").append(I18nUtil.getString("jobconf_trigger_admin_adress")).append(":").append(IpUtil.getIp()); triggerMsgSb.append("
").append(I18nUtil.getString("jobconf_trigger_exe_regtype")).append(":") .append((group.getAddressType() == 0) ? I18nUtil.getString("jobgroup_field_addressType_0") : I18nUtil.getString("jobgroup_field_addressType_1")); triggerMsgSb.append("
").append(I18nUtil.getString("jobconf_trigger_exe_regaddress")).append(":").append(group.getRegistryList()); triggerMsgSb.append("
").append(I18nUtil.getString("jobinfo_field_executorRouteStrategy")).append(":").append(executorRouteStrategyEnum.getTitle()); if (shardingParam != null) { triggerMsgSb.append("(" + shardingParam + ")"); } triggerMsgSb.append("
").append(I18nUtil.getString("jobinfo_field_executorBlockStrategy")).append(":").append(blockStrategy.getTitle()); triggerMsgSb.append("
").append(I18nUtil.getString("jobinfo_field_timeout")).append(":").append(jobInfo.getExecutorTimeout()); triggerMsgSb.append("
").append(I18nUtil.getString("jobinfo_field_executorFailRetryCount")).append(":").append(finalFailRetryCount); triggerMsgSb.append("

>>>>>>>>>>>" + I18nUtil.getString("jobconf_trigger_run") + "<<<<<<<<<<<
") .append((routeAddressResult != null && routeAddressResult.getMsg() != null) ? routeAddressResult.getMsg() + "

" : "").append(triggerResult.getMsg() != null ? triggerResult.getMsg() : ""); // 6、save log trigger-info jobLog.setExecutorAddress(address); jobLog.setExecutorHandler(jobInfo.getExecutorHandler()); jobLog.setExecutorParam(jobInfo.getExecutorParam()); jobLog.setExecutorShardingParam(shardingParam); jobLog.setExecutorFailRetryCount(finalFailRetryCount); //jobLog.setTriggerTime(); jobLog.setTriggerCode(triggerResult.getCode()); jobLog.setTriggerMsg(triggerMsgSb.toString()); XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(jobLog); logger.debug(">>>>>>>>>>> xxl-job trigger end, jobId:{}", jobLog.getId()); } /** * run executor * * @param triggerParam * @param address * @return */ public static ReturnT runExecutor(TriggerParam triggerParam, String address) { ReturnT runResult = null; try { ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address); runResult = executorBiz.run(triggerParam); } catch (Exception e) { logger.error(">>>>>>>>>>> xxl-job trigger error, please check if the executor[{}] is running.", address, e); runResult = new ReturnT(ReturnT.FAIL_CODE, ThrowableUtil.toString(e)); } StringBuffer runResultSB = new StringBuffer(I18nUtil.getString("jobconf_trigger_run") + ":"); runResultSB.append("
address:").append(address); runResultSB.append("
code:").append(runResult.getCode()); runResultSB.append("
msg:").append(runResult.getMsg()); runResult.setMsg(runResultSB.toString()); return runResult; } }