From 9347097905841c1a11dc8d0ac0cb7a2b25c8ea13 Mon Sep 17 00:00:00 2001 From: tomsun28 Date: Tue, 1 Feb 2022 16:57:20 +0800 Subject: [PATCH] =?UTF-8?q?[manager]=E5=90=AF=E5=8A=A8=E9=87=87=E9=9B=86?= =?UTF-8?q?=E4=BB=BB=E5=8A=A1=E5=88=9D=E5=A7=8B=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../entrance/internal/CollectJobService.java | 8 ++- .../com/usthe/manager/dao/MonitorDao.java | 7 ++ .../manager/service/JobSchedulerInit.java | 66 +++++++++++++++++++ .../manager/service/impl/AppServiceImpl.java | 2 + 4 files changed, 80 insertions(+), 3 deletions(-) create mode 100644 manager/src/main/java/com/usthe/manager/service/JobSchedulerInit.java diff --git a/collector/src/main/java/com/usthe/collector/dispatch/entrance/internal/CollectJobService.java b/collector/src/main/java/com/usthe/collector/dispatch/entrance/internal/CollectJobService.java index 22287c3..d847b08 100644 --- a/collector/src/main/java/com/usthe/collector/dispatch/entrance/internal/CollectJobService.java +++ b/collector/src/main/java/com/usthe/collector/dispatch/entrance/internal/CollectJobService.java @@ -57,10 +57,12 @@ public class CollectJobService { * @return long 任务ID */ public long addAsyncCollectJob(Job job) { - long jobId = SnowFlakeIdGenerator.generateId(); - job.setId(jobId); + if (job.getId() == 0L) { + long jobId = SnowFlakeIdGenerator.generateId(); + job.setId(jobId); + } timerDispatch.addJob(job, null); - return jobId; + return job.getId(); } /** diff --git a/manager/src/main/java/com/usthe/manager/dao/MonitorDao.java b/manager/src/main/java/com/usthe/manager/dao/MonitorDao.java index 007ad6a..4f4a28c 100644 --- a/manager/src/main/java/com/usthe/manager/dao/MonitorDao.java +++ b/manager/src/main/java/com/usthe/manager/dao/MonitorDao.java @@ -40,6 +40,13 @@ public interface MonitorDao extends JpaRepository, JpaSpecificati */ List findMonitorsByAppEquals(String app); + /** + * 查询已下发采集任务的监控 + * @param status 监控状态 + * @return 监控列表 + */ + List findMonitorsByStatusNotInAndAndJobIdNotNull(List status); + /** * 根据监控名称查询监控 * @param name 监控名称 diff --git a/manager/src/main/java/com/usthe/manager/service/JobSchedulerInit.java b/manager/src/main/java/com/usthe/manager/service/JobSchedulerInit.java new file mode 100644 index 0000000..dfce57e --- /dev/null +++ b/manager/src/main/java/com/usthe/manager/service/JobSchedulerInit.java @@ -0,0 +1,66 @@ +package com.usthe.manager.service; + +import com.usthe.collector.dispatch.entrance.internal.CollectJobService; +import com.usthe.common.entity.job.Configmap; +import com.usthe.common.entity.job.Job; +import com.usthe.common.entity.manager.Monitor; +import com.usthe.common.entity.manager.Param; +import com.usthe.manager.dao.MonitorDao; +import com.usthe.manager.dao.ParamDao; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.CommandLineRunner; +import org.springframework.core.annotation.Order; +import org.springframework.stereotype.Service; + +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; + +/** + * 采集任务调度初始化 + * @author tom + * @date 2022/2/1 16:24 + */ +@Service +@Order(value = 2) +@Slf4j +public class JobSchedulerInit implements CommandLineRunner { + + @Autowired + private AppService appService; + + @Autowired + private CollectJobService collectJobService; + + @Autowired + private MonitorDao monitorDao; + + @Autowired + private ParamDao paramDao; + + @Override + public void run(String... args) throws Exception { + // 读取数据库已经添加应用 构造采集任务 + List monitors = monitorDao.findMonitorsByStatusNotInAndAndJobIdNotNull(Arrays.asList((byte)0, (byte)4)); + for (Monitor monitor : monitors) { + try { + // 构造采集任务Job实体 + Job appDefine = appService.getAppDefine(monitor.getApp()); + appDefine.setId(monitor.getJobId()); + appDefine.setMonitorId(monitor.getId()); + appDefine.setInterval(monitor.getIntervals()); + appDefine.setCyclic(true); + appDefine.setTimestamp(System.currentTimeMillis()); + List params = paramDao.findParamsByMonitorId(monitor.getId()); + List configmaps = params.stream().map(param -> + new Configmap(param.getField(), param.getValue(), param.getType())).collect(Collectors.toList()); + appDefine.setConfigmap(configmaps); + // 下发采集任务 + collectJobService.addAsyncCollectJob(appDefine); + } catch (Exception e) { + log.error("init monitor job: {} error,continue next monitor", monitor, e); + } + } + } +} diff --git a/manager/src/main/java/com/usthe/manager/service/impl/AppServiceImpl.java b/manager/src/main/java/com/usthe/manager/service/impl/AppServiceImpl.java index 3f82a48..91045a5 100644 --- a/manager/src/main/java/com/usthe/manager/service/impl/AppServiceImpl.java +++ b/manager/src/main/java/com/usthe/manager/service/impl/AppServiceImpl.java @@ -10,6 +10,7 @@ import com.usthe.manager.service.AppService; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.CommandLineRunner; +import org.springframework.core.annotation.Order; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import org.yaml.snakeyaml.Yaml; @@ -32,6 +33,7 @@ import java.util.concurrent.ConcurrentHashMap; * @date 2021/11/14 17:17 */ @Service +@Order(value = 1) @Transactional(rollbackFor = Exception.class) @Slf4j public class AppServiceImpl implements AppService, CommandLineRunner {