diff --git a/collector/server/src/main/java/com/usthe/collector/dispatch/CollectDataDispatch.java b/collector/server/src/main/java/com/usthe/collector/dispatch/CollectDataDispatch.java index a214a40..df08635 100644 --- a/collector/server/src/main/java/com/usthe/collector/dispatch/CollectDataDispatch.java +++ b/collector/server/src/main/java/com/usthe/collector/dispatch/CollectDataDispatch.java @@ -1,7 +1,7 @@ package com.usthe.collector.dispatch; -import com.usthe.collector.dispatch.timer.WheelTimerJob; +import com.usthe.collector.dispatch.timer.Timeout; import com.usthe.common.entity.job.Metrics; import com.usthe.common.entity.message.CollectRep; @@ -14,10 +14,10 @@ public interface CollectDataDispatch { /** * 处理分发采集结果数据 - * @param timerJob 时间轮任务 + * @param timeout 时间轮timeout * @param metrics 下面的指标组采集任务 * @param metricsData 采集结果数据 */ - void dispatchCollectData(WheelTimerJob timerJob, Metrics metrics, CollectRep.MetricsData metricsData); + void dispatchCollectData(Timeout timeout, Metrics metrics, CollectRep.MetricsData metricsData); } diff --git a/collector/server/src/main/java/com/usthe/collector/dispatch/CommonDispatcher.java b/collector/server/src/main/java/com/usthe/collector/dispatch/CommonDispatcher.java index 0fa2c97..9857f83 100644 --- a/collector/server/src/main/java/com/usthe/collector/dispatch/CommonDispatcher.java +++ b/collector/server/src/main/java/com/usthe/collector/dispatch/CommonDispatcher.java @@ -1,8 +1,9 @@ package com.usthe.collector.dispatch; import com.usthe.collector.dispatch.export.KafkaDataExporter; +import com.usthe.collector.dispatch.timer.Timeout; import com.usthe.collector.dispatch.timer.TimerDispatch; -import com.usthe.collector.dispatch.timer.WheelTimerJob; +import com.usthe.collector.dispatch.timer.WheelTimerTask; import com.usthe.common.entity.job.Job; import com.usthe.common.entity.job.Metrics; import com.usthe.common.entity.message.CollectRep; @@ -11,7 +12,6 @@ import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; -import java.util.EventListener; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -100,13 +100,14 @@ public class CommonDispatcher implements MetricsTaskDispatch, CollectDataDispatc MetricsTime metricsTime = entry.getValue(); if (metricsTime.getStartTime() < deadline) { // 指标组采集超时 + WheelTimerTask timerJob = (WheelTimerTask) metricsTime.getTimeout().task(); CollectRep.MetricsData metricsData = CollectRep.MetricsData.newBuilder() - .setId(metricsTime.getTimerJob().getJob().getMonitorId()) - .setApp(metricsTime.getTimerJob().getJob().getApp()) + .setId(timerJob.getJob().getMonitorId()) + .setApp(timerJob.getJob().getApp()) .setMetrics(metricsTime.getMetrics().getName()) .setTime(System.currentTimeMillis()) .setCode(CollectRep.Code.TIMEOUT).setMsg("collect timeout").build(); - dispatchCollectData(metricsTime.getTimerJob(), metricsTime.getMetrics(), metricsData); + dispatchCollectData(metricsTime.timeout, metricsTime.getMetrics(), metricsData); } } Thread.sleep(20000); @@ -118,22 +119,24 @@ public class CommonDispatcher implements MetricsTaskDispatch, CollectDataDispatc } @Override - public void dispatchMetricsTask(WheelTimerJob timerJob) { + public void dispatchMetricsTask(Timeout timeout) { // 将单个应用的采集任务根据其下的指标组拆分为对应的指标组采集任务 AbstractCollect // 将每个指标组放入线程池进行调度 - Job job = timerJob.getJob(); + WheelTimerTask timerTask = (WheelTimerTask) timeout.task(); + Job job = timerTask.getJob(); job.constructPriorMetrics(); Set metricsSet = job.getNextCollectMetrics(null, true); metricsSet.forEach(metrics -> { - MetricsCollect metricsCollect = new MetricsCollect(metrics, timerJob, this); + MetricsCollect metricsCollect = new MetricsCollect(metrics, timeout, this); jobRequestQueue.addJob(metricsCollect); metricsTimeoutMonitorMap.put(job.getId() + metrics.getName(), - new MetricsTime(System.currentTimeMillis(), metrics, timerJob)); + new MetricsTime(System.currentTimeMillis(), metrics, timeout)); }); } @Override - public void dispatchCollectData(WheelTimerJob timerJob, Metrics metrics, CollectRep.MetricsData metricsData) { + public void dispatchCollectData(Timeout timeout, Metrics metrics, CollectRep.MetricsData metricsData) { + WheelTimerTask timerJob = (WheelTimerTask) timeout.task(); Job job = timerJob.getJob(); Set metricsSet = job.getNextCollectMetrics(metrics, false); if (job.isCyclic()) { @@ -143,7 +146,10 @@ public class CommonDispatcher implements MetricsTaskDispatch, CollectDataDispatc // 此Job所有指标组采集执行完成 // 周期性任务再次将任务push到时间轮 // 先判断此次任务执行时间与任务采集间隔时间 - long spendTime = System.currentTimeMillis() - job.getTimestamp(); + if (timeout.isCancelled()) { + return; + } + long spendTime = System.currentTimeMillis() - job.getDispatchTime(); long interval = job.getInterval() - spendTime / 1000; interval = interval <= 0 ? 0 : interval; // 重置构造执行指标组视图 @@ -152,10 +158,10 @@ public class CommonDispatcher implements MetricsTaskDispatch, CollectDataDispatc } else if (!metricsSet.isEmpty()) { // 当前级别指标组执行完成,开始执行下一级别的指标组 metricsSet.forEach(metricItem -> { - MetricsCollect metricsCollect = new MetricsCollect(metricItem, timerJob, this); + MetricsCollect metricsCollect = new MetricsCollect(metricItem, timeout, this); jobRequestQueue.addJob(metricsCollect); metricsTimeoutMonitorMap.put(job.getId() + metrics.getName(), - new MetricsTime(System.currentTimeMillis(), metrics, timerJob)); + new MetricsTime(System.currentTimeMillis(), metrics, timeout)); }); } else { // 当前执行级别的指标组列表未全执行完成, @@ -168,14 +174,14 @@ public class CommonDispatcher implements MetricsTaskDispatch, CollectDataDispatc if (metricsSet == null) { // 此Job所有指标组采集执行完成 // 将所有指标组数据组合一起通知结果监听器 - timerDispatch.responseSyncJobData(job.getId(), job.getMetricsDataTemps()); + timerDispatch.responseSyncJobData(job.getId(), job.getResponseDataTemp()); } else if (!metricsSet.isEmpty()) { // 当前级别指标组执行完成,开始执行下一级别的指标组 metricsSet.forEach(metricItem -> { - MetricsCollect metricsCollect = new MetricsCollect(metricItem, timerJob, this); + MetricsCollect metricsCollect = new MetricsCollect(metricItem, timeout, this); jobRequestQueue.addJob(metricsCollect); metricsTimeoutMonitorMap.put(job.getId() + metrics.getName(), - new MetricsTime(System.currentTimeMillis(), metrics, timerJob)); + new MetricsTime(System.currentTimeMillis(), metrics, timeout)); }); } else { // 当前执行级别的指标组列表未全执行完成, @@ -189,6 +195,6 @@ public class CommonDispatcher implements MetricsTaskDispatch, CollectDataDispatc private static class MetricsTime { private long startTime; private Metrics metrics; - private WheelTimerJob timerJob; + private Timeout timeout; } } diff --git a/collector/server/src/main/java/com/usthe/collector/dispatch/MetricsCollect.java b/collector/server/src/main/java/com/usthe/collector/dispatch/MetricsCollect.java index c1a98d6..6b87b55 100644 --- a/collector/server/src/main/java/com/usthe/collector/dispatch/MetricsCollect.java +++ b/collector/server/src/main/java/com/usthe/collector/dispatch/MetricsCollect.java @@ -5,7 +5,9 @@ import com.googlecode.aviator.AviatorEvaluator; import com.googlecode.aviator.Expression; import com.usthe.collector.collect.AbstractCollect; import com.usthe.collector.collect.http.HttpCollectImpl; -import com.usthe.collector.dispatch.timer.WheelTimerJob; +import com.usthe.collector.dispatch.timer.Timeout; +import com.usthe.collector.dispatch.timer.WheelTimerTask; +import com.usthe.common.entity.job.Job; import com.usthe.common.entity.job.Metrics; import com.usthe.common.entity.message.CollectRep; import lombok.Data; @@ -17,45 +19,75 @@ import java.util.Map; import java.util.stream.Collectors; /** - * parent job + * 指标组采集 * @author tomsun28 * @date 2021/10/10 15:35 */ @Slf4j @Data public class MetricsCollect implements Runnable, Comparable { - + /** + * 监控ID + */ + protected long monitorId; + /** + * 监控类型名称 + */ + protected String app; + /** + * 指标组配置 + */ + protected Metrics metrics; + /** + * 时间轮timeout + */ + protected Timeout timeout; + /** + * 任务和数据调度 + */ + protected CollectDataDispatch collectDataDispatch; + /** + * 任务执行优先级 + */ protected byte runPriority; - + /** + * 是周期性采集还是一次性采集 true-周期性 false-一次性 + */ + protected boolean isCyclic; + /** + * 指标组采集任务新建时间 + */ protected long newTime; - + /** + * 指标组采集任务开始执行时间 + */ protected long startTime; - protected Metrics metrics; - - protected WheelTimerJob timerJob; - - protected CollectDataDispatch collectDataDispatch; - - public MetricsCollect(Metrics metrics, WheelTimerJob timerJob, CollectDataDispatch collectDataDispatch) { + public MetricsCollect(Metrics metrics, Timeout timeout, CollectDataDispatch collectDataDispatch) { this.newTime = System.currentTimeMillis(); - this.timerJob = timerJob; + this.timeout = timeout; this.metrics = metrics; + WheelTimerTask timerJob = (WheelTimerTask) timeout.task(); + Job job = timerJob.getJob(); + this.monitorId = job.getMonitorId(); + this.app = job.getApp(); this.collectDataDispatch = collectDataDispatch; - if (DispatchConstants.AVAILABILITY.equals(metrics.getName())) { - runPriority = (byte) 1; - } else { + this.isCyclic = job.isCyclic(); + // 临时一次性任务执行优先级高 + if (isCyclic) { runPriority = (byte) -1; + } else { + runPriority = (byte) 1; } } @Override public void run() { this.startTime = System.currentTimeMillis(); - setNewThreadName(timerJob, metrics); + setNewThreadName(monitorId, app, startTime, metrics); CollectRep.MetricsData.Builder response = CollectRep.MetricsData.newBuilder(); - response.setApp(timerJob.getJob().getApp()); - response.setId(timerJob.getJob().getId()); + response.setApp(app); + response.setId(monitorId); response.setMetrics(metrics.getName()); // 根据指标组采集协议,应用类型等来调度到真正的应用指标组采集实现类 @@ -69,15 +101,14 @@ public class MetricsCollect implements Runnable, Comparable { } if (abstractCollect == null) { log.error("[Dispatcher] - not support this: app: {}, metrics: {}, protocol: {}.", - timerJob.getJob().getApp(), metrics.getName(), metrics.getProtocol()); + app, metrics.getName(), metrics.getProtocol()); response.setCode(CollectRep.Code.FAIL); - response.setMsg("not support " + timerJob.getJob().getApp() + ", " + response.setMsg("not support " + app + ", " + metrics.getName() + ", " + metrics.getProtocol()); return; } else { try { - abstractCollect.collect(response, timerJob.getJob().getMonitorId(), - timerJob.getJob().getApp(), metrics); + abstractCollect.collect(response, monitorId, app, metrics); } catch (Exception e) { log.error("[Metrics Collect]: {}.", e.getMessage(), e); response.setCode(CollectRep.Code.FAIL); @@ -85,11 +116,15 @@ public class MetricsCollect implements Runnable, Comparable { } } // 别名属性表达式替换计算 + if (fastFailed()) { + return; + } calculateFields(metrics, response); CollectRep.MetricsData metricsData = validateResponse(response); - collectDataDispatch.dispatchCollectData(timerJob, metrics, metricsData); + collectDataDispatch.dispatchCollectData(timeout, metrics, metricsData); } + /** * 根据 calculates 和 aliasFields 配置计算出真正的指标(fields)值 * @param metrics 指标组配置 @@ -142,6 +177,10 @@ public class MetricsCollect implements Runnable, Comparable { } } + private boolean fastFailed() { + return this.timeout == null || this.timeout.isCancelled(); + } + private CollectRep.MetricsData validateResponse(CollectRep.MetricsData.Builder builder) { long endTime = System.currentTimeMillis(); builder.setTime(endTime); @@ -154,10 +193,10 @@ public class MetricsCollect implements Runnable, Comparable { return builder.build(); } - private void setNewThreadName(WheelTimerJob timerJob, Metrics metrics) { - String currentName = timerJob.getJob().getMonitorId() + "-" + timerJob.getJob().getApp() - + "-" + metrics.getName() + "-" + timerJob.getJob().getId(); - Thread.currentThread().setName(currentName); + private void setNewThreadName(long monitorId, String app, long startTime, Metrics metrics) { + String builder = monitorId + "-" + app + "-" + metrics.getName() + + "-" + String.valueOf(startTime).substring(9); + Thread.currentThread().setName(builder); } @Override diff --git a/collector/server/src/main/java/com/usthe/collector/dispatch/MetricsTaskDispatch.java b/collector/server/src/main/java/com/usthe/collector/dispatch/MetricsTaskDispatch.java index b8d039f..f7c53d1 100644 --- a/collector/server/src/main/java/com/usthe/collector/dispatch/MetricsTaskDispatch.java +++ b/collector/server/src/main/java/com/usthe/collector/dispatch/MetricsTaskDispatch.java @@ -1,6 +1,6 @@ package com.usthe.collector.dispatch; -import com.usthe.collector.dispatch.timer.WheelTimerJob; +import com.usthe.collector.dispatch.timer.Timeout; /** * 指标组采集任务调度器接口 @@ -11,7 +11,7 @@ public interface MetricsTaskDispatch { /** * 调度 - * @param timerJob timerJob + * @param timeout timeout */ - void dispatchMetricsTask(WheelTimerJob timerJob); + void dispatchMetricsTask(Timeout timeout); } diff --git a/collector/server/src/main/java/com/usthe/collector/dispatch/export/KafkaDataExporter.java b/collector/server/src/main/java/com/usthe/collector/dispatch/export/KafkaDataExporter.java index e9f7da0..a47edf2 100644 --- a/collector/server/src/main/java/com/usthe/collector/dispatch/export/KafkaDataExporter.java +++ b/collector/server/src/main/java/com/usthe/collector/dispatch/export/KafkaDataExporter.java @@ -34,7 +34,8 @@ public class KafkaDataExporter { properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getServers()); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaMetricsDataSerializer.class); -// kafkaProducer = new KafkaProducer<>(properties); + kafkaProducer = new KafkaProducer<>(properties); + kafkaProducer.send(new ProducerRecord<>(kafkaProperties.getTopic(), CollectRep.MetricsData.newBuilder().setApp("dddd").build())); } catch (Exception e) { log.error(e.getMessage(), e); } diff --git a/collector/server/src/main/java/com/usthe/collector/dispatch/timer/TimerDispatch.java b/collector/server/src/main/java/com/usthe/collector/dispatch/timer/TimerDispatch.java index ad613ec..40a022f 100644 --- a/collector/server/src/main/java/com/usthe/collector/dispatch/timer/TimerDispatch.java +++ b/collector/server/src/main/java/com/usthe/collector/dispatch/timer/TimerDispatch.java @@ -18,7 +18,7 @@ public interface TimerDispatch { /** * 增加新的job * @param addJob job - * @param eventListener 一次性同步任务监听器,异步任务不需要 + * @param eventListener 一次性同步任务监听器,异步任务不需要listener */ void addJob(Job addJob, CollectResponseEventListener eventListener); @@ -28,7 +28,7 @@ public interface TimerDispatch { * @param interval 开始调度的间隔时间 * @param timeUnit 时间单位 */ - void cyclicJob(WheelTimerJob timerTask, long interval, TimeUnit timeUnit); + void cyclicJob(WheelTimerTask timerTask, long interval, TimeUnit timeUnit); /** * 删除存在的job diff --git a/collector/server/src/main/java/com/usthe/collector/dispatch/timer/TimerDispatcher.java b/collector/server/src/main/java/com/usthe/collector/dispatch/timer/TimerDispatcher.java index 72b2af7..6ef414e 100644 --- a/collector/server/src/main/java/com/usthe/collector/dispatch/timer/TimerDispatcher.java +++ b/collector/server/src/main/java/com/usthe/collector/dispatch/timer/TimerDispatcher.java @@ -5,7 +5,6 @@ import com.usthe.common.entity.job.Job; import com.usthe.common.entity.message.CollectRep; import org.springframework.stereotype.Component; -import java.util.EventListener; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -49,7 +48,7 @@ public class TimerDispatcher implements TimerDispatch { @Override public void addJob(Job addJob, CollectResponseEventListener eventListener) { - WheelTimerJob timerJob = new WheelTimerJob(addJob); + WheelTimerTask timerJob = new WheelTimerTask(addJob); if (addJob.isCyclic()) { Timeout timeout = wheelTimer.newTimeout(timerJob, addJob.getInterval(), TimeUnit.SECONDS); currentCyclicTaskMap.put(addJob.getId(), timeout); @@ -61,7 +60,7 @@ public class TimerDispatcher implements TimerDispatch { } @Override - public void cyclicJob(WheelTimerJob timerTask, long interval, TimeUnit timeUnit) { + public void cyclicJob(WheelTimerTask timerTask, long interval, TimeUnit timeUnit) { Long jobId = timerTask.getJob().getId(); // 判断此周期性job是否已经被取消 if (currentCyclicTaskMap.containsKey(jobId)) { diff --git a/collector/server/src/main/java/com/usthe/collector/dispatch/timer/WheelTimerJob.java b/collector/server/src/main/java/com/usthe/collector/dispatch/timer/WheelTimerJob.java deleted file mode 100644 index 972a968..0000000 --- a/collector/server/src/main/java/com/usthe/collector/dispatch/timer/WheelTimerJob.java +++ /dev/null @@ -1,30 +0,0 @@ -package com.usthe.collector.dispatch.timer; - -import com.usthe.collector.dispatch.MetricsTaskDispatch; -import com.usthe.collector.util.SpringContextHolder; -import com.usthe.common.entity.job.Job; - -/** - * TimerTask实现 - * @author tomsun28 - * @date 2021/11/1 17:18 - */ -public class WheelTimerJob implements TimerTask { - - private Job job; - private MetricsTaskDispatch metricsTaskDispatch; - - public WheelTimerJob(Job job) { - this.job = job; - this.metricsTaskDispatch = SpringContextHolder.getBean(MetricsTaskDispatch.class); - } - - @Override - public void run(Timeout timeout) throws Exception { - metricsTaskDispatch.dispatchMetricsTask(this); - } - - public Job getJob() { - return job; - } -} diff --git a/collector/server/src/main/java/com/usthe/collector/dispatch/timer/WheelTimerTask.java b/collector/server/src/main/java/com/usthe/collector/dispatch/timer/WheelTimerTask.java new file mode 100644 index 0000000..c61fdfe --- /dev/null +++ b/collector/server/src/main/java/com/usthe/collector/dispatch/timer/WheelTimerTask.java @@ -0,0 +1,106 @@ +package com.usthe.collector.dispatch.timer; + +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.JsonArray; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonPrimitive; +import com.usthe.collector.dispatch.MetricsTaskDispatch; +import com.usthe.collector.util.SpringContextHolder; +import com.usthe.common.entity.job.Configmap; +import com.usthe.common.entity.job.Job; +import com.usthe.common.entity.job.Metrics; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +/** + * TimerTask实现 + * @author tomsun28 + * @date 2021/11/1 17:18 + */ +public class WheelTimerTask implements TimerTask { + + private Job job; + private MetricsTaskDispatch metricsTaskDispatch; + + public WheelTimerTask(Job job) { + this.metricsTaskDispatch = SpringContextHolder.getBean(MetricsTaskDispatch.class); + this.job = job; + // 初始化job 将监控实际参数值对采集字段进行替换 + initJobMetrics(job); + } + + /** + * 初始化job 将监控实际参数值对采集字段进行替换 + * @param job job + */ + private void initJobMetrics(Job job) { + List config = job.getConfigmap(); + Map configmap = config.stream().collect(Collectors.toMap(Configmap::getKey, item -> item)); + List metrics = job.getMetrics(); + + Gson gson = new Gson(); + List metricsTmp = new ArrayList<>(metrics.size()); + for (Metrics metric : metrics) { + JsonElement jsonElement = gson.toJsonTree(metric); + jsonElement = replaceSpecialValue(jsonElement, configmap); + metric = gson.fromJson(jsonElement, Metrics.class); + metricsTmp.add(metric); + } + job.setMetrics(metricsTmp); + } + + private JsonElement replaceSpecialValue(JsonElement jsonElement, Map configmap) { + if (jsonElement.isJsonObject()) { + JsonObject jsonObject = jsonElement.getAsJsonObject(); + jsonObject.entrySet().forEach(entry -> { + JsonElement element = entry.getValue(); + if (element.isJsonPrimitive()) { + // 判断是否含有特殊字符 替换 + String value = element.getAsString(); + if (value.startsWith("^_^")) { + value = value.replaceAll("\\^_\\^", ""); + Configmap param = configmap.get(value); + value = (String) param.getValue(); + jsonObject.addProperty(entry.getKey(), value); + } + } else { + jsonObject.add(entry.getKey(), replaceSpecialValue(entry.getValue(), configmap)); + } + }); + } else if (jsonElement.isJsonArray()) { + JsonArray jsonArray = jsonElement.getAsJsonArray(); + for (int i = 0; i < jsonArray.size(); i++) { + JsonElement element = jsonArray.get(i); + if (element.isJsonPrimitive()) { + // 判断是否含有特殊字符 替换 + String value = element.getAsString(); + if (value.startsWith("^_^")) { + value = value.replaceAll("\\^_\\^", ""); + Configmap param = configmap.get(value); + value = (String) param.getValue(); + jsonArray.set(i, new JsonPrimitive(value)); + } + } else { + jsonArray.set(i, replaceSpecialValue(element, configmap)); + } + } + } + return jsonElement; + } + + + @Override + public void run(Timeout timeout) throws Exception { + job.setDispatchTime(System.currentTimeMillis()); + metricsTaskDispatch.dispatchMetricsTask(timeout); + } + + public Job getJob() { + return job; + } +} diff --git a/collector/server/src/main/resources/application.yml b/collector/server/src/main/resources/application.yml index e2b0708..1cf6a57 100644 --- a/collector/server/src/main/resources/application.yml +++ b/collector/server/src/main/resources/application.yml @@ -15,5 +15,5 @@ collector: export: kafka: enabled: true - servers: localhost:9092 + servers: 139.198.109.64:9092 topic: async-collect-data diff --git a/collector/server/src/main/resources/logback-spring.xml b/collector/server/src/main/resources/logback-spring.xml index b68b93d..90e6569 100644 --- a/collector/server/src/main/resources/logback-spring.xml +++ b/collector/server/src/main/resources/logback-spring.xml @@ -69,7 +69,7 @@ - + diff --git a/common/src/main/java/com/usthe/common/entity/job/Job.java b/common/src/main/java/com/usthe/common/entity/job/Job.java index a9338a6..e4799d2 100644 --- a/common/src/main/java/com/usthe/common/entity/job/Job.java +++ b/common/src/main/java/com/usthe/common/entity/job/Job.java @@ -66,6 +66,12 @@ public class Job { */ private List configmap; + /** + * collector使用 - 任务被时间轮开始调度的时间戳 + */ + @JsonIgnore + private transient long dispatchTime; + /** * collector使用 - 任务版本,此字段不存储于etcd */ @@ -88,7 +94,7 @@ public class Job { * collector使用 - 临时存储一次性任务指标组响应数据 */ @JsonIgnore - private transient List metricsDataTemps; + private transient List responseDataTemp; /** * collector使用 - 构造初始化指标组执行视图 @@ -100,13 +106,9 @@ public class Job { if (metric.getAliasFields() == null || metric.getAliasFields().isEmpty()) { metric.setAliasFields(metric.getFields().stream().map(Metrics.Field::getField).collect(Collectors.toList())); } - // 设置默认的指标组执行优先级 + // 设置默认的指标组执行优先级,不填则默认最后优先级 if (metric.getPriority() == null) { - if (AVAILABILITY.equals(metric.getName())) { - metric.setPriority((byte)0); - } else { - metric.setPriority(Byte.MAX_VALUE); - } + metric.setPriority(Byte.MAX_VALUE); } }) .collect(Collectors.groupingBy(Metrics::getPriority)); @@ -167,9 +169,9 @@ public class Job { } public void addCollectMetricsData(CollectRep.MetricsData metricsData) { - if (metricsDataTemps == null) { - metricsDataTemps = new LinkedList<>(); + if (responseDataTemp == null) { + responseDataTemp = new LinkedList<>(); } - metricsDataTemps.add(metricsData); + responseDataTemp.add(metricsData); } } diff --git a/common/src/main/java/com/usthe/common/entity/job/Metrics.java b/common/src/main/java/com/usthe/common/entity/job/Metrics.java index 76efa3e..fce17c1 100644 --- a/common/src/main/java/com/usthe/common/entity/job/Metrics.java +++ b/common/src/main/java/com/usthe/common/entity/job/Metrics.java @@ -10,6 +10,7 @@ import lombok.Data; import lombok.NoArgsConstructor; import java.util.List; +import java.util.Objects; /** * 监控采集的指标集合详情 eg: cpu | memory | health @@ -68,6 +69,23 @@ public class Metrics { */ private JdbcProtocol jdbc; + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + Metrics metrics = (Metrics) o; + return name.equals(metrics.name); + } + + @Override + public int hashCode() { + return Objects.hash(name); + } + @Data @AllArgsConstructor @NoArgsConstructor diff --git a/manager/src/main/java/com/usthe/manager/pojo/entity/Monitor.java b/manager/src/main/java/com/usthe/manager/pojo/entity/Monitor.java index 56a48d2..a4ec83e 100644 --- a/manager/src/main/java/com/usthe/manager/pojo/entity/Monitor.java +++ b/manager/src/main/java/com/usthe/manager/pojo/entity/Monitor.java @@ -73,7 +73,7 @@ public class Monitor { /** * 监控状态 0:未监控,1:可用,2:不可用,3:不可达,4:挂起 */ - @ApiModelProperty(value = "监控状态 0:未监控,1:可用,2:不可用,3:不可达,4:挂起", example = "1", accessMode = READ_WRITE, position = 6) + @ApiModelProperty(value = "监控状态 0:未监控,1:可用,2:不可用,3:不可达,4:挂起", accessMode = READ_WRITE, position = 6) @Min(0) @Max(4) private byte status; diff --git a/manager/src/main/java/com/usthe/manager/pojo/entity/Param.java b/manager/src/main/java/com/usthe/manager/pojo/entity/Param.java index 20144dc..4d901fa 100644 --- a/manager/src/main/java/com/usthe/manager/pojo/entity/Param.java +++ b/manager/src/main/java/com/usthe/manager/pojo/entity/Param.java @@ -62,7 +62,7 @@ public class Param { /** * 参数类型 0:数字 1:字符串 2:加密串 */ - @ApiModelProperty(value = "参数类型 0:数字 1:字符串 2:加密串", example = "0", accessMode = READ_WRITE, position = 4) + @ApiModelProperty(value = "参数类型 0:数字 1:字符串 2:加密串", accessMode = READ_WRITE, position = 4) @Min(0) @Max(2) private byte type;