diff --git a/Architecture.jpg b/Architecture.jpg deleted file mode 100644 index f7e60a4..0000000 Binary files a/Architecture.jpg and /dev/null differ diff --git a/README.md b/README.md index 5403097..8e44e38 100644 --- a/README.md +++ b/README.md @@ -4,16 +4,12 @@ ### 模块 - **[manager](manager)** 提供监控管理,系统管理基础服务 -> 开发中,提供对监控的管理,监控应用配置的管理,系统用户租户后台管理等。 +> 提供对监控的管理,监控应用配置的管理,系统用户租户后台管理等。 - **[collector](collector)** 提供监控数据采集服务 -> 开发中,使用通用协议远程采集获取对端指标数据。 +> 使用通用协议远程采集获取对端指标数据。 - **[scheduler](scheduler)** 提供监控任务调度服务 -> 开发完成,采集任务管理,一次性任务和周期性任务的调度分发。 +> 采集任务管理,一次性任务和周期性任务的调度分发。 - **[warehouse](warehouse)** 提供监控数据仓储服务 -> 开发中,采集指标结果数据管理,数据落盘,查询,计算统计。 +> 采集指标结果数据管理,数据落盘,查询,计算统计。 - **[alerter](alerter)** 提供告警服务 -> 开发中,告警计算触发,监控状态联动,告警配置,告警通知。 - -### 结构 - -![arch](Architecture.jpg) \ No newline at end of file +> 告警计算触发,监控状态联动,告警配置,告警通知。 diff --git a/alerter/pom.xml b/alerter/pom.xml index f216a80..84eee54 100644 --- a/alerter/pom.xml +++ b/alerter/pom.xml @@ -5,7 +5,7 @@ monitor com.usthe.tancloud - 1.0-SNAPSHOT + 1.0 4.0.0 @@ -20,7 +20,14 @@ com.usthe.tancloud common - 1.0-SNAPSHOT + 1.0 + + + + com.usthe.tancloud + collector + 1.0 + provided diff --git a/alerter/src/main/java/com/usthe/alert/AlerterDataQueue.java b/alerter/src/main/java/com/usthe/alert/AlerterDataQueue.java index 6d92427..562c17a 100644 --- a/alerter/src/main/java/com/usthe/alert/AlerterDataQueue.java +++ b/alerter/src/main/java/com/usthe/alert/AlerterDataQueue.java @@ -1,7 +1,6 @@ package com.usthe.alert; import com.usthe.alert.pojo.entity.Alert; -import com.usthe.common.entity.message.CollectRep; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; @@ -17,22 +16,12 @@ import java.util.concurrent.TimeUnit; @Slf4j public class AlerterDataQueue { - private final LinkedBlockingQueue metricsDataQueue; private final LinkedBlockingQueue alertDataQueue; public AlerterDataQueue() { - metricsDataQueue = new LinkedBlockingQueue<>(); alertDataQueue = new LinkedBlockingQueue<>(); } - public void addMetricsData(CollectRep.MetricsData metricsData) { - metricsDataQueue.offer(metricsData); - } - - public CollectRep.MetricsData pollMetricsData() throws InterruptedException { - return metricsDataQueue.poll(2, TimeUnit.SECONDS); - } - public void addAlertData(Alert alert) { alertDataQueue.offer(alert); } diff --git a/alerter/src/main/java/com/usthe/alert/calculate/CalculateAlarm.java b/alerter/src/main/java/com/usthe/alert/calculate/CalculateAlarm.java index 357f106..1a2cb3a 100644 --- a/alerter/src/main/java/com/usthe/alert/calculate/CalculateAlarm.java +++ b/alerter/src/main/java/com/usthe/alert/calculate/CalculateAlarm.java @@ -4,16 +4,15 @@ import com.googlecode.aviator.AviatorEvaluator; import com.googlecode.aviator.Expression; import com.usthe.alert.AlerterWorkerPool; import com.usthe.alert.AlerterDataQueue; -import com.usthe.alert.entrance.KafkaDataConsume; import com.usthe.alert.pojo.entity.Alert; import com.usthe.alert.pojo.entity.AlertDefine; import com.usthe.alert.service.AlertDefineService; import com.usthe.alert.util.AlertTemplateUtil; +import com.usthe.collector.dispatch.export.MetricsDataExporter; import com.usthe.common.entity.message.CollectRep; import com.usthe.common.util.CommonConstants; import com.usthe.common.util.CommonUtil; import lombok.extern.slf4j.Slf4j; -import org.springframework.boot.autoconfigure.AutoConfigureAfter; import org.springframework.context.annotation.Configuration; import java.util.HashMap; @@ -27,20 +26,21 @@ import java.util.concurrent.ConcurrentHashMap; * @date 2021/12/9 14:19 */ @Configuration -@AutoConfigureAfter(value = {KafkaDataConsume.class}) @Slf4j public class CalculateAlarm { private AlerterWorkerPool workerPool; private AlerterDataQueue dataQueue; + private MetricsDataExporter dataExporter; private AlertDefineService alertDefineService; private Map triggeredAlertMap; private Map triggeredMonitorStateAlertMap; public CalculateAlarm (AlerterWorkerPool workerPool, AlerterDataQueue dataQueue, - AlertDefineService alertDefineService) { + AlertDefineService alertDefineService, MetricsDataExporter dataExporter) { this.workerPool = workerPool; this.dataQueue = dataQueue; + this.dataExporter = dataExporter; this.alertDefineService = alertDefineService; this.triggeredAlertMap = new ConcurrentHashMap<>(128); this.triggeredMonitorStateAlertMap = new ConcurrentHashMap<>(128); @@ -51,7 +51,7 @@ public class CalculateAlarm { Runnable runnable = () -> { while (!Thread.currentThread().isInterrupted()) { try { - CollectRep.MetricsData metricsData = dataQueue.pollMetricsData(); + CollectRep.MetricsData metricsData = dataExporter.pollAlertMetricsData(); if (metricsData != null) { calculate(metricsData); } diff --git a/alerter/src/main/java/com/usthe/alert/entrance/KafkaDataConsume.java b/alerter/src/main/java/com/usthe/alert/entrance/KafkaDataConsume.java deleted file mode 100644 index ef16143..0000000 --- a/alerter/src/main/java/com/usthe/alert/entrance/KafkaDataConsume.java +++ /dev/null @@ -1,80 +0,0 @@ -package com.usthe.alert.entrance; - -import com.usthe.alert.AlerterProperties; -import com.usthe.alert.AlerterWorkerPool; -import com.usthe.alert.AlerterDataQueue; -import com.usthe.common.entity.message.CollectRep; -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.common.serialization.LongDeserializer; -import org.springframework.beans.factory.DisposableBean; -import org.springframework.boot.autoconfigure.AutoConfigureAfter; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; -import org.springframework.context.annotation.Configuration; - -import java.time.Duration; -import java.util.Collections; -import java.util.Properties; - -/** - * 从Kafka消费指标组采集数据处理 - * @author tom - * @date 2021/11/24 18:03 - */ -@Configuration -@AutoConfigureAfter(value = {AlerterProperties.class}) -@ConditionalOnProperty(prefix = "alerter.entrance.kafka", - name = "enabled", havingValue = "true", matchIfMissing = true) -@Slf4j -public class KafkaDataConsume implements DisposableBean { - - private KafkaConsumer consumer; - private AlerterWorkerPool workerPool; - private AlerterDataQueue dataQueue; - public KafkaDataConsume(AlerterProperties properties, AlerterWorkerPool workerPool, - AlerterDataQueue dataQueue) { - this.workerPool = workerPool; - this.dataQueue = dataQueue; - initConsumer(properties); - startConsumeData(); - } - - private void startConsumeData() { - Runnable runnable = () -> { - Thread.currentThread().setName("warehouse-kafka-data-consumer"); - while (!Thread.currentThread().isInterrupted()) { - ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); - records.forEach(record -> { - dataQueue.addMetricsData(record.value()); - }); - } - }; - workerPool.executeJob(runnable); - } - - private void initConsumer(AlerterProperties properties) { - if (properties == null || properties.getEntrance() == null || properties.getEntrance().getKafka() == null) { - log.error("init error, please config Warehouse kafka props in application.yml"); - throw new IllegalArgumentException("please config Warehouse kafka props"); - } - AlerterProperties.EntranceProperties.KafkaProperties kafkaProp = properties.getEntrance().getKafka(); - Properties consumerProp = new Properties(); - consumerProp.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProp.getServers()); - consumerProp.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaProp.getGroupId()); - consumerProp.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class); - consumerProp.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaMetricsDataDeserializer.class); - consumerProp.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); - consumerProp.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000); - consumer = new KafkaConsumer<>(consumerProp); - consumer.subscribe(Collections.singleton(kafkaProp.getTopic())); - } - - @Override - public void destroy() throws Exception { - if (consumer != null) { - consumer.close(); - } - } -} diff --git a/alerter/src/main/java/com/usthe/alert/entrance/KafkaMetricsDataDeserializer.java b/alerter/src/main/java/com/usthe/alert/entrance/KafkaMetricsDataDeserializer.java deleted file mode 100644 index c0ab7d6..0000000 --- a/alerter/src/main/java/com/usthe/alert/entrance/KafkaMetricsDataDeserializer.java +++ /dev/null @@ -1,24 +0,0 @@ -package com.usthe.alert.entrance; - -import com.usthe.common.entity.message.CollectRep; -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.common.serialization.Deserializer; - -/** - * MetricsData的反序列化 - * @author tom - * @date 2021/11/24 17:29 - */ -@Slf4j -public class KafkaMetricsDataDeserializer implements Deserializer { - - @Override - public CollectRep.MetricsData deserialize(String topicName, byte[] bytes) { - try { - return CollectRep.MetricsData.parseFrom(bytes); - } catch (Exception e) { - log.error(e.getMessage(), e); - } - return null; - } -} diff --git a/alerter/src/main/resources/META-INF/spring.factories b/alerter/src/main/resources/META-INF/spring.factories index 30adc51..56463db 100644 --- a/alerter/src/main/resources/META-INF/spring.factories +++ b/alerter/src/main/resources/META-INF/spring.factories @@ -6,7 +6,6 @@ com.usthe.alert.AlerterWorkerPool,\ com.usthe.alert.AlerterProperties,\ com.usthe.alert.AlerterDataQueue,\ com.usthe.alert.AlerterConfiguration,\ -com.usthe.alert.entrance.KafkaDataConsume,\ com.usthe.alert.calculate.CalculateAlarm,\ com.usthe.alert.controller.AlertsController,\ com.usthe.alert.controller.AlertDefinesController \ No newline at end of file diff --git a/assembly/collector/assembly.xml b/assembly/collector/assembly.xml deleted file mode 100644 index 5978e38..0000000 --- a/assembly/collector/assembly.xml +++ /dev/null @@ -1,44 +0,0 @@ - - - 1.0 - - - tar.gz - zip - - - - - - - ../../assembly/collector/bin - - true - bin - - - - - src/main/resources - - application.yml - logback-spring.xml - - - true - ${file.separator}config - - - - - target - / - - *executable.jar - - - - diff --git a/assembly/collector/bin/startup.sh b/assembly/collector/bin/startup.sh deleted file mode 100644 index f81450d..0000000 --- a/assembly/collector/bin/startup.sh +++ /dev/null @@ -1,109 +0,0 @@ -#!/bin/bash - -# 项目名称 -SERVER_NAME="${project.artifactId}" - -# jar名称 -JAR_NAME="${project.build.finalName}-executable.jar" - -# 进入bin目录 -cd `dirname $0` -# bin目录绝对路径 -BIN_DIR=`pwd` -# 返回到上一级项目根目录路径 -cd .. -# 打印项目根目录绝对路径 -# `pwd` 执行系统命令并获得结果 -DEPLOY_DIR=`pwd` - -# 外部配置文件绝对目录,如果是目录需要/结尾,也可以直接指定文件 -# 如果指定的是目录,spring则会读取目录中的所有配置文件 -CONF_DIR=$DEPLOY_DIR/config -# SERVER_PORT=`sed '/server.port/!d;s/.*=//' config/application.properties | tr -d '\r'` -# 获取应用的端口号 -SERVER_PORT=`sed -nr '/port: [0-9]+/ s/.*port: +([0-9]+).*/\1/p' config/application.yml` - -PIDS=`ps -f | grep java | grep "$CONF_DIR" |awk '{print $2}'` -if [ "$1" = "status" ]; then - if [ -n "$PIDS" ]; then - echo "The $SERVER_NAME is running...!" - echo "PID: $PIDS" - exit 0 - else - echo "The $SERVER_NAME is stopped" - exit 0 - fi -fi - -if [ -n "$PIDS" ]; then - echo "ERROR: The $SERVER_NAME already started!" - echo "PID: $PIDS" - exit 1 -fi - -if [ -n "$SERVER_PORT" ]; then - SERVER_PORT_COUNT=`netstat -tln | grep $SERVER_PORT | wc -l` - if [ $SERVER_PORT_COUNT -gt 0 ]; then - echo "ERROR: The $SERVER_NAME port $SERVER_PORT already used!" - exit 1 - fi -fi - -# 项目日志输出绝对路径 -LOGS_DIR=$DEPLOY_DIR/logs -# 如果logs文件夹不存在,则创建文件夹 -if [ ! -d $LOGS_DIR ]; then - mkdir $LOGS_DIR -fi -STDOUT_FILE=$LOGS_DIR/catalina.log - -# JVM Configuration -JAVA_OPTS=" -Djava.awt.headless=true -Djava.net.preferIPv4Stack=true " -JAVA_DEBUG_OPTS="" -if [ "$1" = "debug" ]; then - JAVA_DEBUG_OPTS=" -Xdebug -Xnoagent -Djava.compiler=NONE -Xrunjdwp:transport=dt_socket,address=8000,server=y,suspend=n " -fi - -JAVA_JMX_OPTS="" -if [ "$1" = "jmx" ]; then - JAVA_JMX_OPTS=" -Dcom.sun.management.jmxremote.port=1099 -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.authenticate=false " -fi - -JAVA_MEM_OPTS="" -BITS=`java -version 2>&1 | grep -i 64-bit` -if [ -n "$BITS" ]; then - JAVA_MEM_OPTS=" -server -Xmx512m -Xms512m -Xmn256m -XX:PermSize=128m -Xss256k -XX:+DisableExplicitGC -XX:+UseConcMarkSweepGC -XX:+CMSParallelRemarkEnabled -XX:+UseCMSCompactAtFullCollection -XX:LargePageSizeInBytes=128m -XX:+UseFastAccessorMethods -XX:+UseCMSInitiatingOccupancyOnly -XX:CMSInitiatingOccupancyFraction=70 " -else - JAVA_MEM_OPTS=" -server -Xms512m -Xmx512m -XX:PermSize=128m -XX:SurvivorRatio=2 -XX:+UseParallelGC " -fi - -# 加载外部log4j2文件的配置 -LOG_IMPL_FILE=log4j2.xml -LOGGING_CONFIG="" -if [ -f "$CONF_DIR/$LOG_IMPL_FILE" ] -then - LOGGING_CONFIG="-Dlogging.config=$CONF_DIR/$LOG_IMPL_FILE" -fi -CONFIG_FILES=" -Dlogging.path=$LOGS_DIR $LOGGING_CONFIG -Dspring.config.location=$CONF_DIR/ " -echo -e "Starting the $SERVER_NAME ..." -nohup java $JAVA_OPTS $JAVA_MEM_OPTS $JAVA_DEBUG_OPTS $JAVA_JMX_OPTS $CONFIG_FILES -jar $DEPLOY_DIR/lib/$JAR_NAME > $STDOUT_FILE 2>&1 & - -COUNT=0 -while [ $COUNT -lt 1 ]; do - echo -e ".\c" - sleep 1 - if [ -n "$SERVER_PORT" ]; then - COUNT=`netstat -an | grep $SERVER_PORT | wc -l` - else - COUNT=`ps -f | grep java | grep "$DEPLOY_DIR" | awk '{print $2}' | wc -l` - fi - if [ $COUNT -gt 0 ]; then - break - fi -done - - -echo "OK!" -PIDS=`ps -f | grep java | grep "$DEPLOY_DIR" | awk '{print $2}'` -echo "PID: $PIDS" -echo "STDOUT: $STDOUT_FILE" \ No newline at end of file diff --git a/assembly/server/bin/shutdown.sh b/assembly/server/bin/shutdown.sh deleted file mode 100644 index 28a5e2e..0000000 --- a/assembly/server/bin/shutdown.sh +++ /dev/null @@ -1,18 +0,0 @@ -#!/bin/bash - -# 项目名称 -APPLICATION="${project.artifactId}" - -# 项目启动jar包名称 -APPLICATION_JAR="${project.build.finalName}.jar" - -# 通过项目名称查找到PI,然后kill -9 pid -PID=$(ps -ef | grep "${APPLICATION_JAR}" | grep -v grep | awk '{ print $2 }') -if [[ -z "$PID" ]] -then - echo ${APPLICATION} is already stopped -else - echo kill ${PID} - kill -9 ${PID} - echo ${APPLICATION} stopped successfully -fi \ No newline at end of file diff --git a/collector/README.md b/collector/README.md index 5d35f75..304c58c 100644 --- a/collector/README.md +++ b/collector/README.md @@ -31,18 +31,6 @@ * Ping * 服务端口 -#### HELP - -1. ARK插件类隔离未生效 -> 注意需构建在jdk1.8环境中运行 -> 插件是否配置导入并配置正确 -> 本地DEBUG时需单独IDEA打开运行collector工程,不能将plugin和collector在同一工程打开运行 - -2. metaspace元空间内存占用多或溢出 -> 建议调整JVM参数 ```-Dsun.reflect.inflationThreshold=100000``` -> 由于使用太多反射,超过参数`inflationThreshold`默认值15阈值导致触发JVM反射优化(加快反射速度), -> 反射获取类信息由使用*JNI存取器**膨胀(Inflation)* -> 为*反射每个方法生成一个类加载器DelegatingClassLoader和Java类MethodAccessor*. -> 动态加载的字节码导致PermGen持续增长. +#### HELP diff --git a/collector/plugins/pom.xml b/collector/plugins/pom.xml deleted file mode 100644 index 14c317b..0000000 --- a/collector/plugins/pom.xml +++ /dev/null @@ -1,19 +0,0 @@ - - - - collector - com.usthe.tancloud - 1.0-SNAPSHOT - - 4.0.0 - - plugins - pom - - sample-plugin - - - - \ No newline at end of file diff --git a/collector/plugins/sample-plugin/pom.xml b/collector/plugins/sample-plugin/pom.xml deleted file mode 100644 index 6ce90c0..0000000 --- a/collector/plugins/sample-plugin/pom.xml +++ /dev/null @@ -1,48 +0,0 @@ - - - - plugins - com.usthe.tancloud - 1.0-SNAPSHOT - - 4.0.0 - - sample-plugin - - - - - - com.alipay.sofa - sofa-ark-plugin-maven-plugin - 1.1.6 - - - default-cli - - ark-plugin - - - - - 2000 - - - - - - com.com.usthe.plugin.sample.ExportDemo - - - - - - - - - - - - \ No newline at end of file diff --git a/collector/plugins/sample-plugin/src/main/java/com/usthe/collector/plugin/SameClass.java b/collector/plugins/sample-plugin/src/main/java/com/usthe/collector/plugin/SameClass.java deleted file mode 100644 index b6bbe94..0000000 --- a/collector/plugins/sample-plugin/src/main/java/com/usthe/collector/plugin/SameClass.java +++ /dev/null @@ -1,12 +0,0 @@ -package com.usthe.collector.plugin; - -/** - * @author tomsun28 - * @date 2021/10/8 15:12 - */ -public class SameClass { - - public static String hello() { - return "hello plugin"; - } -} diff --git a/collector/plugins/sample-plugin/src/main/java/com/usthe/plugin/sample/ExportDemo.java b/collector/plugins/sample-plugin/src/main/java/com/usthe/plugin/sample/ExportDemo.java deleted file mode 100644 index c3e163d..0000000 --- a/collector/plugins/sample-plugin/src/main/java/com/usthe/plugin/sample/ExportDemo.java +++ /dev/null @@ -1,14 +0,0 @@ -package com.usthe.plugin.sample; - -import com.usthe.collector.plugin.SameClass; - -/** - * @author tomsun28 - * @date 2021/10/8 15:11 - */ -public class ExportDemo { - - public String hello() { - return SameClass.hello(); - } -} diff --git a/collector/pom.xml b/collector/pom.xml index e30de1e..a22b4a6 100644 --- a/collector/pom.xml +++ b/collector/pom.xml @@ -5,14 +5,85 @@ monitor com.usthe.tancloud - 1.0-SNAPSHOT + 1.0 4.0.0 collector - pom - - server - plugins - + + + + + org.springframework.boot + spring-boot-starter-web + provided + + + org.springframework.boot + spring-boot-configuration-processor + true + + + org.springframework.boot + spring-boot-autoconfigure + + + + com.usthe.tancloud + common + 1.0-SNAPSHOT + + + + io.etcd + jetcd-core + 0.5.11 + + + + org.apache.kafka + kafka-clients + 3.0.0 + + + + org.apache.httpcomponents + httpclient + 4.5.13 + + + + commons-net + commons-net + 3.8.0 + + + + com.jayway.jsonpath + json-path + 2.6.0 + + + + com.googlecode.concurrentlinkedhashmap + concurrentlinkedhashmap-lru + 1.4.2 + + + com.google.guava + guava + 31.0.1-jre + + + com.google.code.gson + gson + 2.8.8 + + + com.googlecode.aviator + aviator + 5.2.7 + + + \ No newline at end of file diff --git a/collector/src/main/java/com/usthe/collector/collect/AbstractCollect.java b/collector/src/main/java/com/usthe/collector/collect/AbstractCollect.java new file mode 100644 index 0000000..58eba4a --- /dev/null +++ b/collector/src/main/java/com/usthe/collector/collect/AbstractCollect.java @@ -0,0 +1,23 @@ +package com.usthe.collector.collect; + + +import com.usthe.common.entity.job.Metrics; +import com.usthe.common.entity.message.CollectRep; + +/** + * 具体的指标组采集实现抽象类 + * @author tomsun28 + * @date 2021/11/4 9:35 + */ +public abstract class AbstractCollect { + + /** + * 真正的采集实现接口 + * @param builder response builder + * @param appId 应用监控ID + * @param app 应用类型 + * @param metrics 指标组配置 + * return response builder + */ + public abstract void collect(CollectRep.MetricsData.Builder builder, long appId, String app, Metrics metrics); +} diff --git a/collector/src/main/java/com/usthe/collector/collect/http/HttpCollectImpl.java b/collector/src/main/java/com/usthe/collector/collect/http/HttpCollectImpl.java new file mode 100644 index 0000000..8d40f6b --- /dev/null +++ b/collector/src/main/java/com/usthe/collector/collect/http/HttpCollectImpl.java @@ -0,0 +1,343 @@ +package com.usthe.collector.collect.http; + +import com.google.gson.JsonArray; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; +import com.usthe.collector.collect.AbstractCollect; +import com.usthe.collector.common.http.HttpClientPool; +import com.usthe.collector.dispatch.DispatchConstants; +import com.usthe.collector.util.CollectorConstants; +import com.usthe.collector.util.JsonPathParser; +import com.usthe.common.entity.job.Metrics; +import com.usthe.common.entity.job.protocol.HttpProtocol; +import com.usthe.common.entity.message.CollectRep; +import com.usthe.common.util.CommonConstants; +import com.usthe.common.util.IpDomainUtil; +import lombok.extern.slf4j.Slf4j; +import org.apache.http.HttpHeaders; +import org.apache.http.HttpStatus; +import org.apache.http.auth.AuthScope; +import org.apache.http.auth.UsernamePasswordCredentials; +import org.apache.http.client.ClientProtocolException; +import org.apache.http.client.CredentialsProvider; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpUriRequest; +import org.apache.http.client.methods.RequestBuilder; +import org.apache.http.client.protocol.HttpClientContext; +import org.apache.http.impl.client.BasicCredentialsProvider; +import org.apache.http.protocol.HttpContext; +import org.apache.http.util.EntityUtils; +import org.springframework.http.HttpMethod; + +import javax.net.ssl.SSLException; +import java.io.IOException; +import java.io.InterruptedIOException; +import java.net.ConnectException; +import java.net.UnknownHostException; +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.Map; + + +/** + * http https 采集实现类 + * @author tomsun28 + * @date 2021/11/4 15:37 + */ +@Slf4j +public class HttpCollectImpl extends AbstractCollect { + + private HttpCollectImpl() {} + + public static HttpCollectImpl getInstance() { + return Singleton.INSTANCE; + } + + @Override + public void collect(CollectRep.MetricsData.Builder builder, + long appId, String app, Metrics metrics) { + long startTime = System.currentTimeMillis(); + // 简单校验必有参数 + if (metrics == null || metrics.getHttp() == null) { + builder.setCode(CollectRep.Code.FAIL); + builder.setMsg("Http/Https collect must has http params"); + return; + } + HttpContext httpContext = createHttpContext(metrics.getHttp()); + HttpUriRequest request = createHttpRequest(metrics.getHttp()); + try { + CloseableHttpResponse response = HttpClientPool.getHttpClient() + .execute(request, httpContext); + int statusCode = response.getStatusLine().getStatusCode(); + log.debug("http response status: {}", statusCode); + if (statusCode < HttpStatus.SC_OK || statusCode >= HttpStatus.SC_BAD_REQUEST) { + // 1XX 3XX 4XX 5XX 状态码 失败 + builder.setCode(CollectRep.Code.FAIL); + builder.setMsg("StatusCode " + statusCode); + return; + } else { + // 2xx 状态码 成功 + String resp = EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8); + // 根据不同的解析方式解析 + if (resp == null || "".equals(resp)) { + log.info("http response entity is empty, status: {}.", statusCode); + builder.setCode(CollectRep.Code.SUCCESS); + builder.setMsg("statusCode: " + statusCode + ",entity empty."); + return; + } + Long responseTime = System.currentTimeMillis() - startTime; + String parseType = metrics.getHttp().getParseType(); + try { + if (DispatchConstants.PARSE_DEFAULT.equals(parseType)) { + parseResponseByDefault(resp, metrics.getAliasFields(), builder, responseTime); + } else if (DispatchConstants.PARSE_JSON_PATH.equals(parseType)) { + parseResponseByJsonPath(resp, metrics.getAliasFields(), metrics.getHttp(), builder, responseTime); + } else if (DispatchConstants.PARSE_PROMETHEUS.equals(parseType)) { + parseResponseByPrometheus(resp, metrics.getAliasFields(), metrics.getHttp(), builder); + } else if (DispatchConstants.PARSE_XML_PATH.equals(parseType)) { + parseResponseByXmlPath(resp, metrics.getAliasFields(), metrics.getHttp(), builder); + } else if (DispatchConstants.PARSE_WEBSITE.equals(parseType)){ + parseResponseByWebsite(resp, metrics.getAliasFields(), builder, responseTime); + } else { + parseResponseByDefault(resp, metrics.getAliasFields(), builder, responseTime); + } + } catch (Exception e) { + log.info("parse error: {}.", e.getMessage(), e); + builder.setCode(CollectRep.Code.FAIL); + builder.setMsg("parse response data error:" + e.getMessage()); + return; + } + } + } catch (ClientProtocolException e1) { + log.error(e1.getCause().getMessage(), e1); + builder.setCode(CollectRep.Code.UN_CONNECTABLE); + builder.setMsg(e1.getCause().getMessage()); + return; + } catch (UnknownHostException e2) { + // 对端不可达 + log.info(e2.getMessage()); + builder.setCode(CollectRep.Code.UN_REACHABLE); + builder.setMsg("unknown host"); + return; + } catch (InterruptedIOException | ConnectException | SSLException e3) { + // 对端连接失败 + log.info(e3.getMessage()); + builder.setCode(CollectRep.Code.UN_CONNECTABLE); + builder.setMsg(e3.getMessage()); + return; + } catch (IOException e4) { + // 其它IO异常 + log.info(e4.getMessage()); + builder.setCode(CollectRep.Code.FAIL); + builder.setMsg(e4.getMessage()); + return; + } catch (Exception e) { + // 其它异常 + log.error(e.getMessage(), e); + builder.setCode(CollectRep.Code.FAIL); + builder.setMsg(e.getMessage()); + return; + } finally { + if (request != null) { + request.abort(); + } + } + } + + private void parseResponseByWebsite(String resp, List aliasFields, + CollectRep.MetricsData.Builder builder, Long responseTime) { + CollectRep.ValueRow.Builder valueRowBuilder = CollectRep.ValueRow.newBuilder(); + // todo resp 网站关键字监测 + for (String alias : aliasFields) { + if (CollectorConstants.RESPONSE_TIME.equalsIgnoreCase(alias)) { + valueRowBuilder.addColumns(responseTime.toString()); + } else { + valueRowBuilder.addColumns(CommonConstants.NULL_VALUE); + } + } + builder.addValues(valueRowBuilder.build()); + } + + private void parseResponseByXmlPath(String resp, List aliasFields, HttpProtocol http, + CollectRep.MetricsData.Builder builder) { + } + + private void parseResponseByJsonPath(String resp, List aliasFields, HttpProtocol http, + CollectRep.MetricsData.Builder builder, Long responseTime) { + List> results = JsonPathParser.parseContentWithJsonPath(resp,http. getParseScript()); + for (Map stringMap : results) { + CollectRep.ValueRow.Builder valueRowBuilder = CollectRep.ValueRow.newBuilder(); + for (String alias : aliasFields) { + Object value = stringMap.get(alias); + if (value != null) { + valueRowBuilder.addColumns(String.valueOf(value)); + } else { + if (CollectorConstants.RESPONSE_TIME.equalsIgnoreCase(alias)) { + valueRowBuilder.addColumns(responseTime.toString()); + } else { + valueRowBuilder.addColumns(CommonConstants.NULL_VALUE); + } + } + } + builder.addValues(valueRowBuilder.build()); + } + } + + private void parseResponseByPrometheus(String resp, List aliasFields, HttpProtocol http, + CollectRep.MetricsData.Builder builder) { + + } + + private void parseResponseByDefault(String resp, List aliasFields, + CollectRep.MetricsData.Builder builder, Long responseTime) { + JsonElement element = JsonParser.parseString(resp); + if (element.isJsonArray()) { + JsonArray array = element.getAsJsonArray(); + for (JsonElement jsonElement : array) { + if (jsonElement.isJsonObject()) { + JsonObject object = jsonElement.getAsJsonObject(); + CollectRep.ValueRow.Builder valueRowBuilder = CollectRep.ValueRow.newBuilder(); + for (String alias : aliasFields) { + JsonElement valueElement = object.get(alias); + if (valueElement != null) { + String value = valueElement.getAsString(); + valueRowBuilder.addColumns(value); + } else { + if (CollectorConstants.RESPONSE_TIME.equalsIgnoreCase(alias)) { + valueRowBuilder.addColumns(responseTime.toString()); + } else { + valueRowBuilder.addColumns(CommonConstants.NULL_VALUE); + } + } + } + builder.addValues(valueRowBuilder.build()); + } + } + } else if (element.isJsonObject()) { + JsonObject object = element.getAsJsonObject(); + CollectRep.ValueRow.Builder valueRowBuilder = CollectRep.ValueRow.newBuilder(); + for (String alias : aliasFields) { + JsonElement valueElement = object.get(alias); + if (valueElement != null) { + String value = valueElement.getAsString(); + valueRowBuilder.addColumns(value); + } else { + valueRowBuilder.addColumns(CommonConstants.NULL_VALUE); + } + } + builder.addValues(valueRowBuilder.build()); + } + } + + /** + * 创建httpContext + * @param httpProtocol http protocol + * @return context + */ + private HttpContext createHttpContext(HttpProtocol httpProtocol) { + HttpProtocol.Authorization auth = httpProtocol.getAuthorization(); + if (auth != null && !DispatchConstants.BEARER_TOKEN.equals(auth.getType())) { + HttpClientContext clientContext = new HttpClientContext(); + if (DispatchConstants.BASIC_AUTH.equals(auth.getType()) && auth.getBasicAuthUsername() != null + && auth.getBasicAuthPassword() != null) { + CredentialsProvider provider = new BasicCredentialsProvider(); + UsernamePasswordCredentials credentials + = new UsernamePasswordCredentials(auth.getBasicAuthUsername(), auth.getBasicAuthPassword()); + provider.setCredentials(AuthScope.ANY, credentials); + clientContext.setCredentialsProvider(provider); + } else if (DispatchConstants.DIGEST_AUTH.equals(auth.getType()) && auth.getDigestAuthUsername() != null + && auth.getDigestAuthPassword() != null) { + CredentialsProvider provider = new BasicCredentialsProvider(); + UsernamePasswordCredentials credentials + = new UsernamePasswordCredentials(auth.getBasicAuthUsername(), auth.getBasicAuthPassword()); + provider.setCredentials(AuthScope.ANY, credentials); + clientContext.setCredentialsProvider(provider); + } else { + clientContext = null; + } + return clientContext; + } + return null; + } + + /** + * 根据http配置参数构造请求头 + * @param httpProtocol http参数配置 + * @return 请求体 + */ + private HttpUriRequest createHttpRequest(HttpProtocol httpProtocol) { + RequestBuilder requestBuilder; + // method + String httpMethod = httpProtocol.getMethod().toUpperCase(); + if (HttpMethod.GET.matches(httpMethod)) { + requestBuilder = RequestBuilder.get(); + } else if (HttpMethod.POST.matches(httpMethod)) { + requestBuilder = RequestBuilder.post(); + } else if (HttpMethod.PUT.matches(httpMethod)) { + requestBuilder = RequestBuilder.put(); + } else if (HttpMethod.DELETE.matches(httpMethod)) { + requestBuilder = RequestBuilder.delete(); + } else if (HttpMethod.PATCH.matches(httpMethod)) { + requestBuilder = RequestBuilder.patch(); + } else { + // not support the method + log.error("not support the http method: {}.", httpProtocol.getMethod()); + return null; + } + // params + Map params = httpProtocol.getParams(); + if (params != null && !params.isEmpty()) { + for (Map.Entry param : params.entrySet()) { + requestBuilder.addParameter(param.getKey(), param.getValue()); + } + } + // headers + Map headers = httpProtocol.getHeaders(); + if (headers != null && !headers.isEmpty()) { + for (Map.Entry header : headers.entrySet()) { + requestBuilder.addHeader(header.getKey(), header.getValue()); + } + } + // keep-alive + requestBuilder.addHeader(HttpHeaders.CONNECTION, "keep-alive"); + // add accept + if (DispatchConstants.PARSE_DEFAULT.equals(httpProtocol.getParseType()) + || DispatchConstants.PARSE_JSON_PATH.equals(httpProtocol.getParseType())) { + requestBuilder.addHeader(HttpHeaders.ACCEPT, "application/json"); + } else if (DispatchConstants.PARSE_XML_PATH.equals(httpProtocol.getParseType())) { + requestBuilder.addHeader(HttpHeaders.ACCEPT, "text/xml,application/xml"); + } else if (DispatchConstants.PARSE_PROMETHEUS.equals(httpProtocol.getParseType())) { + requestBuilder.addHeader(HttpHeaders.ACCEPT, DispatchConstants.PARSE_PROMETHEUS_ACCEPT); + requestBuilder.addHeader(HttpHeaders.ACCEPT_ENCODING, "gzip"); + } else { + requestBuilder.addHeader(HttpHeaders.ACCEPT, "*/*"); + } + + // 判断是否使用Bearer Token认证 + if (httpProtocol.getAuthorization() != null + && DispatchConstants.BEARER_TOKEN.equals(httpProtocol.getAuthorization().getType())) { + // 若使用 将token放入到header里面 + String value = DispatchConstants.BEARER + " " + httpProtocol.getAuthorization().getBearerTokenToken(); + requestBuilder.addHeader(HttpHeaders.AUTHORIZATION, value); + } + // todo 处理请求内容 body 暂不支持body + + // uri + if (IpDomainUtil.isHasSchema(httpProtocol.getHost())) { + requestBuilder.setUri(httpProtocol.getHost() + ":" + httpProtocol.getPort() + httpProtocol.getUrl()); + } else { + boolean ssl = Boolean.parseBoolean(httpProtocol.getSsl()); + if (ssl) { + requestBuilder.setUri("https://" + httpProtocol.getHost() + ":" + httpProtocol.getPort() + httpProtocol.getUrl()); + } else { + requestBuilder.setUri("http://" + httpProtocol.getHost() + ":" + httpProtocol.getPort() + httpProtocol.getUrl()); + } + } + return requestBuilder.build(); + } + + private static class Singleton { + private static final HttpCollectImpl INSTANCE = new HttpCollectImpl(); + } +} diff --git a/collector/src/main/java/com/usthe/collector/collect/icmp/IcmpCollectImpl.java b/collector/src/main/java/com/usthe/collector/collect/icmp/IcmpCollectImpl.java new file mode 100644 index 0000000..4c04985 --- /dev/null +++ b/collector/src/main/java/com/usthe/collector/collect/icmp/IcmpCollectImpl.java @@ -0,0 +1,86 @@ +package com.usthe.collector.collect.icmp; + +import com.usthe.collector.collect.AbstractCollect; +import com.usthe.collector.util.CollectorConstants; +import com.usthe.common.entity.job.Metrics; +import com.usthe.common.entity.job.protocol.IcmpProtocol; +import com.usthe.common.entity.message.CollectRep; +import com.usthe.common.util.CommonConstants; +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.UnknownHostException; + +/** + * icmp协议采集实现 - ping + * @author tom + * @date 2021/12/4 12:32 + */ +@Slf4j +public class IcmpCollectImpl extends AbstractCollect { + + private IcmpCollectImpl(){} + + public static IcmpCollectImpl getInstance() { + return IcmpCollectImpl.Singleton.INSTANCE; + } + + + @Override + public void collect(CollectRep.MetricsData.Builder builder, long appId, String app, Metrics metrics) { + long startTime = System.currentTimeMillis(); + // 简单校验必有参数 + if (metrics == null || metrics.getIcmp() == null) { + builder.setCode(CollectRep.Code.FAIL); + builder.setMsg("ICMP collect must has icmp params"); + return; + } + IcmpProtocol icmp = metrics.getIcmp(); + // 超时时间默认300毫秒 + int timeout = 300; + try { + timeout = Integer.parseInt(icmp.getTimeout()); + } catch (Exception e) { + log.warn(e.getMessage()); + } + try { + // todo 需要配置java虚拟机root权限从而使用ICMP,否则是判断telnet对端7号端口是否开通 + // https://stackoverflow.com/questions/11506321/how-to-ping-an-ip-address + boolean status = InetAddress.getByName(icmp.getHost()).isReachable(timeout); + long responseTime = System.currentTimeMillis() - startTime; + if (status) { + CollectRep.ValueRow.Builder valueRowBuilder = CollectRep.ValueRow.newBuilder(); + for (String alias : metrics.getAliasFields()) { + if (CollectorConstants.RESPONSE_TIME.equalsIgnoreCase(alias)) { + valueRowBuilder.addColumns(Long.toString(responseTime)); + } else { + valueRowBuilder.addColumns(CommonConstants.NULL_VALUE); + } + } + builder.addValues(valueRowBuilder.build()); + } else { + builder.setCode(CollectRep.Code.UN_REACHABLE); + builder.setMsg("对端不可达,Timeout " + timeout + "ms"); + return; + } + } catch (UnknownHostException unknownHostException) { + builder.setCode(CollectRep.Code.UN_REACHABLE); + builder.setMsg("UnknownHost " + unknownHostException.getMessage()); + return; + } catch (IOException ioException) { + builder.setCode(CollectRep.Code.UN_REACHABLE); + builder.setMsg("IOException " + ioException.getMessage()); + return; + } catch (Exception e) { + log.error(e.getMessage(), e); + builder.setCode(CollectRep.Code.FAIL); + builder.setMsg("IllegalArgument " + e.getMessage()); + } + + } + + private static class Singleton { + private static final IcmpCollectImpl INSTANCE = new IcmpCollectImpl(); + } +} diff --git a/collector/src/main/java/com/usthe/collector/collect/telnet/TelnetCollectImpl.java b/collector/src/main/java/com/usthe/collector/collect/telnet/TelnetCollectImpl.java new file mode 100644 index 0000000..a3ac6dc --- /dev/null +++ b/collector/src/main/java/com/usthe/collector/collect/telnet/TelnetCollectImpl.java @@ -0,0 +1,93 @@ +package com.usthe.collector.collect.telnet; + +import com.usthe.collector.collect.AbstractCollect; +import com.usthe.collector.util.CollectorConstants; +import com.usthe.common.entity.job.Metrics; +import com.usthe.common.entity.job.protocol.TelnetProtocol; +import com.usthe.common.entity.message.CollectRep; +import com.usthe.common.util.CommonConstants; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.net.telnet.TelnetClient; + +import java.io.IOException; +import java.net.ConnectException; + +/** + * icmp协议采集实现 - ping + * @author tom + * @date 2021/12/4 12:32 + */ +@Slf4j +public class TelnetCollectImpl extends AbstractCollect { + + private TelnetCollectImpl(){} + + public static TelnetCollectImpl getInstance() { + return TelnetCollectImpl.Singleton.INSTANCE; + } + + + @Override + public void collect(CollectRep.MetricsData.Builder builder, long appId, String app, Metrics metrics) { + long startTime = System.currentTimeMillis(); + // 简单校验必有参数 + if (metrics == null || metrics.getTelnet() == null) { + builder.setCode(CollectRep.Code.FAIL); + builder.setMsg("Telnet collect must has telnet params"); + return; + } + + TelnetProtocol telnet = metrics.getTelnet(); + // 超时时间默认300毫秒 + int timeout = 300; + try { + timeout = Integer.parseInt(telnet.getTimeout()); + } catch (Exception e) { + log.warn(e.getMessage()); + } + TelnetClient telnetClient = null; + try { + //指明Telnet终端类型,否则会返回来的数据中文会乱码 + telnetClient = new TelnetClient("vt200"); + telnetClient.setConnectTimeout(timeout); + telnetClient.connect(telnet.getHost(),Integer.parseInt(telnet.getPort())); + long responseTime = System.currentTimeMillis() - startTime; + if (telnetClient.isConnected()) { + CollectRep.ValueRow.Builder valueRowBuilder = CollectRep.ValueRow.newBuilder(); + for (String alias : metrics.getAliasFields()) { + if (CollectorConstants.RESPONSE_TIME.equalsIgnoreCase(alias)) { + valueRowBuilder.addColumns(Long.toString(responseTime)); + } else { + valueRowBuilder.addColumns(CommonConstants.NULL_VALUE); + } + } + builder.addValues(valueRowBuilder.build()); + } else { + builder.setCode(CollectRep.Code.UN_CONNECTABLE); + builder.setMsg("对端连接失败,Timeout " + timeout + "ms"); + return; + } + telnetClient.disconnect(); + } catch (ConnectException connectException) { + log.debug(connectException.getMessage()); + builder.setCode(CollectRep.Code.UN_CONNECTABLE); + builder.setMsg("对端拒绝连接:服务未启动端口监听或防火墙"); + } catch (IOException ioException) { + log.debug(ioException.getMessage()); + builder.setCode(CollectRep.Code.UN_CONNECTABLE); + builder.setMsg("对端连接失败 " + ioException.getMessage()); + } finally { + if (telnetClient != null) { + try { + telnetClient.disconnect(); + } catch (Exception e) { + log.warn(e.getMessage()); + } + } + } + } + + private static class Singleton { + private static final TelnetCollectImpl INSTANCE = new TelnetCollectImpl(); + } +} diff --git a/collector/src/main/java/com/usthe/collector/common/CollectorProperties.java b/collector/src/main/java/com/usthe/collector/common/CollectorProperties.java new file mode 100644 index 0000000..9b3cb1d --- /dev/null +++ b/collector/src/main/java/com/usthe/collector/common/CollectorProperties.java @@ -0,0 +1,15 @@ +package com.usthe.collector.common; + +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.stereotype.Component; + +/** + * java common 的配置属性 + * @author tomsun28 + * @date 2021/10/16 14:23 + */ +@Component +@ConfigurationProperties(prefix = "collector.common") +public class CollectorProperties { + +} diff --git a/collector/src/main/java/com/usthe/collector/dispatch/CollectDataDispatch.java b/collector/src/main/java/com/usthe/collector/dispatch/CollectDataDispatch.java new file mode 100644 index 0000000..df08635 --- /dev/null +++ b/collector/src/main/java/com/usthe/collector/dispatch/CollectDataDispatch.java @@ -0,0 +1,23 @@ +package com.usthe.collector.dispatch; + + +import com.usthe.collector.dispatch.timer.Timeout; +import com.usthe.common.entity.job.Metrics; +import com.usthe.common.entity.message.CollectRep; + +/** + * 采集数据调度器接口 + * @author tomsun28 + * @date 2021/11/2 11:20 + */ +public interface CollectDataDispatch { + + /** + * 处理分发采集结果数据 + * @param timeout 时间轮timeout + * @param metrics 下面的指标组采集任务 + * @param metricsData 采集结果数据 + */ + void dispatchCollectData(Timeout timeout, Metrics metrics, CollectRep.MetricsData metricsData); + +} diff --git a/collector/src/main/java/com/usthe/collector/dispatch/CommonDispatcher.java b/collector/src/main/java/com/usthe/collector/dispatch/CommonDispatcher.java new file mode 100644 index 0000000..e73a6aa --- /dev/null +++ b/collector/src/main/java/com/usthe/collector/dispatch/CommonDispatcher.java @@ -0,0 +1,202 @@ +package com.usthe.collector.dispatch; + +import com.usthe.collector.dispatch.export.MetricsDataExporter; +import com.usthe.collector.dispatch.timer.Timeout; +import com.usthe.collector.dispatch.timer.TimerDispatch; +import com.usthe.collector.dispatch.timer.WheelTimerTask; +import com.usthe.common.entity.job.Job; +import com.usthe.common.entity.job.Metrics; +import com.usthe.common.entity.message.CollectRep; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +/** + * 指标组采集任务与响应数据调度器 + * @author tomsun28 + * @date 2021/11/2 11:24 + */ +@Component +@Slf4j +public class CommonDispatcher implements MetricsTaskDispatch, CollectDataDispatch { + + /** + * 指标组采集任务超时时间值 + */ + private static final long DURATION_TIME = 120_000L; + /** + * 指标组采集任务优先级队列 + */ + private MetricsCollectorQueue jobRequestQueue; + /** + * 时间轮任务调度器 + */ + private TimerDispatch timerDispatch; + /** + * kafka采集数据导出器 + */ + private MetricsDataExporter kafkaDataExporter; + /** + * 指标组任务与开始时间映射map + */ + private Map metricsTimeoutMonitorMap; + + public CommonDispatcher(MetricsCollectorQueue jobRequestQueue, TimerDispatch timerDispatch, + MetricsDataExporter kafkaDataExporter, WorkerPool workerPool) { + this.kafkaDataExporter = kafkaDataExporter; + this.jobRequestQueue = jobRequestQueue; + this.timerDispatch = timerDispatch; + ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(2, 2, 1, + TimeUnit.SECONDS, + new SynchronousQueue<>(), r -> { + Thread thread = new Thread(r); + thread.setDaemon(true); + return thread; + }); + // 从任务队列拉取指标组采集任务放入线程池执行 + poolExecutor.execute(() -> { + Thread.currentThread().setName("metrics-task-dispatcher"); + while (!Thread.currentThread().isInterrupted()) { + MetricsCollect metricsCollect = null; + try { + metricsCollect = jobRequestQueue.getJob(); + if (metricsCollect != null) { + workerPool.executeJob(metricsCollect); + } + } catch (RejectedExecutionException rejected) { + log.info("[Dispatcher]-the worker pool is full, reject this metrics task, " + + "sleep and put in queue again."); + try { + Thread.sleep(1000); + if (metricsCollect != null) { + // 在队列里的优先级增大 + metricsCollect.setRunPriority((byte) (metricsCollect.getRunPriority() + 1)); + jobRequestQueue.addJob(metricsCollect); + } + } catch (InterruptedException interruptedException){} + } catch (Exception e) { + log.error("[Dispatcher]-{}.", e.getMessage(), e); + } + } + }); + // 监控指标组采集任务执行时间 + metricsTimeoutMonitorMap = new ConcurrentHashMap<>(128); + poolExecutor.execute(() -> { + Thread.currentThread().setName("metrics-task-monitor"); + while (!Thread.currentThread().isInterrupted()) { + try { + // 检测每个指标组采集单元是否超时2分钟,超时则丢弃并返回异常 + long deadline = System.currentTimeMillis() - DURATION_TIME; + for (Map.Entry entry : metricsTimeoutMonitorMap.entrySet()) { + MetricsTime metricsTime = entry.getValue(); + if (metricsTime.getStartTime() < deadline) { + // 指标组采集超时 + WheelTimerTask timerJob = (WheelTimerTask) metricsTime.getTimeout().task(); + CollectRep.MetricsData metricsData = CollectRep.MetricsData.newBuilder() + .setId(timerJob.getJob().getMonitorId()) + .setApp(timerJob.getJob().getApp()) + .setMetrics(metricsTime.getMetrics().getName()) + .setTime(System.currentTimeMillis()) + .setCode(CollectRep.Code.TIMEOUT).setMsg("collect timeout").build(); + dispatchCollectData(metricsTime.timeout, metricsTime.getMetrics(), metricsData); + metricsTimeoutMonitorMap.remove(entry.getKey()); + } + } + Thread.sleep(20000); + } catch (Exception e){ + log.error("[Monitor]-{}.", e.getMessage(), e); + } + } + }); + } + + @Override + public void dispatchMetricsTask(Timeout timeout) { + // 将单个应用的采集任务根据其下的指标组拆分为对应的指标组采集任务 AbstractCollect + // 将每个指标组放入线程池进行调度 + WheelTimerTask timerTask = (WheelTimerTask) timeout.task(); + Job job = timerTask.getJob(); + job.constructPriorMetrics(); + Set metricsSet = job.getNextCollectMetrics(null, true); + metricsSet.forEach(metrics -> { + MetricsCollect metricsCollect = new MetricsCollect(metrics, timeout, this); + jobRequestQueue.addJob(metricsCollect); + metricsTimeoutMonitorMap.put(job.getId() + "-" + metrics.getName(), + new MetricsTime(System.currentTimeMillis(), metrics, timeout)); + }); + } + + @Override + public void dispatchCollectData(Timeout timeout, Metrics metrics, CollectRep.MetricsData metricsData) { + WheelTimerTask timerJob = (WheelTimerTask) timeout.task(); + Job job = timerJob.getJob(); + metricsTimeoutMonitorMap.remove(job.getId() + "-" + metrics.getName()); + Set metricsSet = job.getNextCollectMetrics(metrics, false); + if (job.isCyclic()) { + // 若是异步的周期性循环任务,直接发送指标组的采集数据到消息中间件 + kafkaDataExporter.send(metricsData); + if (metricsSet == null) { + // 此Job所有指标组采集执行完成 + // 周期性任务再次将任务push到时间轮 + // 先判断此次任务执行时间与任务采集间隔时间 + if (timeout.isCancelled()) { + return; + } + long spendTime = System.currentTimeMillis() - job.getDispatchTime(); + long interval = job.getInterval() - spendTime / 1000; + interval = interval <= 0 ? 0 : interval; + // 重置构造执行指标组视图 + job.constructPriorMetrics(); + timerDispatch.cyclicJob(timerJob, interval, TimeUnit.SECONDS); + } else if (!metricsSet.isEmpty()) { + // 当前级别指标组执行完成,开始执行下一级别的指标组 + metricsSet.forEach(metricItem -> { + MetricsCollect metricsCollect = new MetricsCollect(metricItem, timeout, this); + jobRequestQueue.addJob(metricsCollect); + metricsTimeoutMonitorMap.put(job.getId() + metrics.getName(), + new MetricsTime(System.currentTimeMillis(), metrics, timeout)); + }); + } else { + // 当前执行级别的指标组列表未全执行完成, + // 需等待其它同级别指标组执行完成后进入下一级别执行 + } + } else { + // 若是临时性一次任务,需等待所有指标组的采集数据统一包装返回 + // 将当前指标组数据插入job里统一组装 + job.addCollectMetricsData(metricsData); + if (metricsSet == null) { + // 此Job所有指标组采集执行完成 + // 将所有指标组数据组合一起通知结果监听器 + timerDispatch.responseSyncJobData(job.getId(), job.getResponseDataTemp()); + } else if (!metricsSet.isEmpty()) { + // 当前级别指标组执行完成,开始执行下一级别的指标组 + metricsSet.forEach(metricItem -> { + MetricsCollect metricsCollect = new MetricsCollect(metricItem, timeout, this); + jobRequestQueue.addJob(metricsCollect); + metricsTimeoutMonitorMap.put(job.getId() + metrics.getName(), + new MetricsTime(System.currentTimeMillis(), metrics, timeout)); + }); + } else { + // 当前执行级别的指标组列表未全执行完成, + // 需等待其它同级别指标组执行完成后进入下一级别执行 + } + } + } + + @Data + @AllArgsConstructor + private static class MetricsTime { + private long startTime; + private Metrics metrics; + private Timeout timeout; + } +} diff --git a/collector/src/main/java/com/usthe/collector/dispatch/DispatchConstants.java b/collector/src/main/java/com/usthe/collector/dispatch/DispatchConstants.java new file mode 100644 index 0000000..4c85bf2 --- /dev/null +++ b/collector/src/main/java/com/usthe/collector/dispatch/DispatchConstants.java @@ -0,0 +1,68 @@ +package com.usthe.collector.dispatch; + +/** + * dispatch 常量 + * @author tomsun28 + * @date 2021/11/3 16:50 + */ +public interface DispatchConstants { + + // 协议类型相关 - start // + /** + * 协议 http + */ + String PROTOCOL_HTTP = "http"; + /** + * 协议 icmp + */ + String PROTOCOL_ICMP = "icmp"; + /** + * 协议 telnet + */ + String PROTOCOL_TELNET = "telnet"; + /** + * 协议 jdbc + */ + String PROTOCOL_JDBC = "jdbc"; + // 协议类型相关 - end // + + // http协议相关 - start 需尽可能先复用 HttpHeaders // + /** + * 认证方式 Bearer Token + */ + String BEARER_TOKEN = "Bearer Token"; + /** + * Bearer Token 的认证参数字符 + */ + String BEARER = "Bearer"; + /** + * 认证方式 Basic Auth + */ + String BASIC_AUTH = "Basic Auth"; + /** + * 认证方式 Digest Auth + */ + String DIGEST_AUTH = "Digest Auth"; + /** + * 解析方式 默认规则 + */ + String PARSE_DEFAULT = "default"; + /** + * 解析方式 自定义json path + */ + String PARSE_JSON_PATH = "jsonPath"; + /** + * 解析方式 自定义xml path + */ + String PARSE_XML_PATH = "xmlPath"; + /** + * 解析方式 网站可用性监控规则 提供responseTime指标 + */ + String PARSE_WEBSITE = "website"; + /** + * 解析方式 prometheus规则 + */ + String PARSE_PROMETHEUS = "prometheus"; + String PARSE_PROMETHEUS_ACCEPT = "application/openmetrics-text; version=0.0.1,text/plain;version=0.0.4;q=0.5,*/*;q=0.1"; + // http协议相关 - end // +} diff --git a/collector/src/main/java/com/usthe/collector/dispatch/DispatchProperties.java b/collector/src/main/java/com/usthe/collector/dispatch/DispatchProperties.java new file mode 100644 index 0000000..c0acde2 --- /dev/null +++ b/collector/src/main/java/com/usthe/collector/dispatch/DispatchProperties.java @@ -0,0 +1,225 @@ +package com.usthe.collector.dispatch; + +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.stereotype.Component; + +/** + * 调度分发任务配置属性 + * @author tomsun28 + * @date 2021/10/16 14:54 + */ +@Component +@ConfigurationProperties(prefix = "collector.dispatch") +public class DispatchProperties { + + /** + * 调度入口配置属性 + */ + private EntranceProperties entrance; + + /** + * 调度数据出口配置属性 + */ + private ExportProperties export; + + public EntranceProperties getEntrance() { + return entrance; + } + + public void setEntrance(EntranceProperties entrance) { + this.entrance = entrance; + } + + public ExportProperties getExport() { + return export; + } + + public void setExport(ExportProperties export) { + this.export = export; + } + + /** + * 调度入口配置属性 + * 入口可以时etcd信息,http请求,消息中间件消息请求 + */ + public static class EntranceProperties { + /** + * etcd配置信息 + */ + private EtcdProperties etcd; + + public EtcdProperties getEtcd() { + return etcd; + } + + public void setEtcd(EtcdProperties etcd) { + this.etcd = etcd; + } + + public static class EtcdProperties { + + /** + * etcd调度是否启动 + */ + private boolean enabled = true; + + /** + * etcd的连接端点url + */ + private String[] endpoints = new String[] {"http://127.0.0.1:2379"}; + + /** + * etcd连接用户名 + */ + private String username; + + /** + * etcd连接密码 + */ + private String password; + + /** + * etcd租约的有效时间 单位秒 + */ + private long ttl = 200; + + /** + * 采集器注册目录 + */ + private String collectorDir = "/usthe/dispatch/collector/"; + + /** + * 任务调度分发目录 + */ + private String assignDir = "/usthe/dispatch/assign/"; + + /** + * 任务详细目录 + */ + private String jobDir = "/usthe/dispatch/job/"; + + public boolean isEnabled() { + return enabled; + } + + public void setEnabled(boolean enabled) { + this.enabled = enabled; + } + + public String[] getEndpoints() { + return endpoints; + } + + public void setEndpoints(String[] endpoints) { + this.endpoints = endpoints; + } + + public String getUsername() { + return username; + } + + public void setUsername(String username) { + this.username = username; + } + + public String getPassword() { + return password; + } + + public void setPassword(String password) { + this.password = password; + } + + public long getTtl() { + return ttl; + } + + public void setTtl(long ttl) { + this.ttl = ttl; + } + + public String getCollectorDir() { + return collectorDir; + } + + public void setCollectorDir(String collectorDir) { + this.collectorDir = collectorDir; + } + + public String getAssignDir() { + return assignDir; + } + + public void setAssignDir(String assignDir) { + this.assignDir = assignDir; + } + + public String getJobDir() { + return jobDir; + } + + public void setJobDir(String jobDir) { + this.jobDir = jobDir; + } + } + } + + /** + * 调度数据出口配置属性 + */ + public static class ExportProperties { + + /** + * kafka配置信息 + */ + private KafkaProperties kafka; + + public KafkaProperties getKafka() { + return kafka; + } + + public void setKafka(KafkaProperties kafka) { + this.kafka = kafka; + } + + public static class KafkaProperties { + /** + * kafka数据出口是否启动 + */ + private boolean enabled = true; + + /** + * kafka的连接服务器url + */ + private String servers = "http://127.0.0.1:2379"; + /** + * 发送数据的topic名称 + */ + private String topic; + + public boolean isEnabled() { + return enabled; + } + + public void setEnabled(boolean enabled) { + this.enabled = enabled; + } + + public String getServers() { + return servers; + } + + public void setServers(String servers) { + this.servers = servers; + } + + public String getTopic() { + return topic; + } + + public void setTopic(String topic) { + this.topic = topic; + } + } + } +} \ No newline at end of file diff --git a/collector/src/main/java/com/usthe/collector/dispatch/MetricsCollect.java b/collector/src/main/java/com/usthe/collector/dispatch/MetricsCollect.java new file mode 100644 index 0000000..dc130eb --- /dev/null +++ b/collector/src/main/java/com/usthe/collector/dispatch/MetricsCollect.java @@ -0,0 +1,265 @@ +package com.usthe.collector.dispatch; + +import com.googlecode.aviator.AviatorEvaluator; +import com.googlecode.aviator.Expression; +import com.usthe.collector.collect.AbstractCollect; +import com.usthe.collector.collect.http.HttpCollectImpl; +import com.usthe.collector.collect.icmp.IcmpCollectImpl; +import com.usthe.collector.collect.telnet.TelnetCollectImpl; +import com.usthe.collector.dispatch.timer.Timeout; +import com.usthe.collector.dispatch.timer.WheelTimerTask; +import com.usthe.common.entity.job.Job; +import com.usthe.common.entity.job.Metrics; +import com.usthe.common.entity.message.CollectRep; +import com.usthe.common.util.CommonConstants; +import com.usthe.common.util.CommonUtil; +import lombok.Data; +import lombok.extern.slf4j.Slf4j; + +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +/** + * 指标组采集 + * @author tomsun28 + * @date 2021/10/10 15:35 + */ +@Slf4j +@Data +public class MetricsCollect implements Runnable, Comparable { + /** + * 监控ID + */ + protected long monitorId; + /** + * 监控类型名称 + */ + protected String app; + /** + * 指标组配置 + */ + protected Metrics metrics; + /** + * 时间轮timeout + */ + protected Timeout timeout; + /** + * 任务和数据调度 + */ + protected CollectDataDispatch collectDataDispatch; + /** + * 任务执行优先级 + */ + protected byte runPriority; + /** + * 是周期性采集还是一次性采集 true-周期性 false-一次性 + */ + protected boolean isCyclic; + /** + * 指标组采集任务新建时间 + */ + protected long newTime; + /** + * 指标组采集任务开始执行时间 + */ + protected long startTime; + + public MetricsCollect(Metrics metrics, Timeout timeout, CollectDataDispatch collectDataDispatch) { + this.newTime = System.currentTimeMillis(); + this.timeout = timeout; + this.metrics = metrics; + WheelTimerTask timerJob = (WheelTimerTask) timeout.task(); + Job job = timerJob.getJob(); + this.monitorId = job.getMonitorId(); + this.app = job.getApp(); + this.collectDataDispatch = collectDataDispatch; + this.isCyclic = job.isCyclic(); + // 临时一次性任务执行优先级高 + if (isCyclic) { + runPriority = (byte) -1; + } else { + runPriority = (byte) 1; + } + } + + @Override + public void run() { + this.startTime = System.currentTimeMillis(); + setNewThreadName(monitorId, app, startTime, metrics); + CollectRep.MetricsData.Builder response = CollectRep.MetricsData.newBuilder(); + response.setApp(app); + response.setId(monitorId); + response.setMetrics(metrics.getName()); + + // 根据指标组采集协议,应用类型等来调度到真正的应用指标组采集实现类 + AbstractCollect abstractCollect = null; + switch (metrics.getProtocol()) { + case DispatchConstants.PROTOCOL_HTTP: + abstractCollect = HttpCollectImpl.getInstance(); + break; + case DispatchConstants.PROTOCOL_ICMP: + abstractCollect = IcmpCollectImpl.getInstance(); + break; + case DispatchConstants.PROTOCOL_TELNET: + abstractCollect = TelnetCollectImpl.getInstance(); + break; + // todo + default: break; + } + if (abstractCollect == null) { + log.error("[Dispatcher] - not support this: app: {}, metrics: {}, protocol: {}.", + app, metrics.getName(), metrics.getProtocol()); + response.setCode(CollectRep.Code.FAIL); + response.setMsg("not support " + app + ", " + + metrics.getName() + ", " + metrics.getProtocol()); + return; + } else { + try { + abstractCollect.collect(response, monitorId, app, metrics); + } catch (Exception e) { + String msg = e.getMessage(); + if (msg == null && e.getCause() != null) { + msg = e.getCause().getMessage(); + } + log.error("[Metrics Collect]: {}.", msg, e); + response.setCode(CollectRep.Code.FAIL); + if (msg != null) { + response.setMsg(e.getMessage()); + } + } + } + // 别名属性表达式替换计算 + if (fastFailed()) { + return; + } + calculateFields(metrics, response); + CollectRep.MetricsData metricsData = validateResponse(response); + collectDataDispatch.dispatchCollectData(timeout, metrics, metricsData); + } + + + /** + * 根据 calculates 和 aliasFields 配置计算出真正的指标(fields)值 + * 计算instance实例值 + * @param metrics 指标组配置 + * @param collectData 采集数据 + */ + private void calculateFields(Metrics metrics, CollectRep.MetricsData.Builder collectData) { + collectData.setPriority(metrics.getPriority()); + List fieldList = new LinkedList<>(); + for (Metrics.Field field : metrics.getFields()) { + fieldList.add(CollectRep.Field.newBuilder().setName(field.getField()).setType(field.getType()).build()); + } + collectData.addAllFields(fieldList); + List aliasRowList = collectData.getValuesList(); + if (aliasRowList == null || aliasRowList.isEmpty()) { + return; + } + collectData.clearValues(); + // 先预处理 calculates + if (metrics.getCalculates() == null) { + metrics.setCalculates(Collections.emptyList()); + } + Map fieldExpressionMap = metrics.getCalculates() + .stream() + .map(cal -> { + int splitIndex = cal.indexOf("="); + String field = cal.substring(0, splitIndex); + String expressionStr = cal.substring(splitIndex + 1); + Expression expression = AviatorEvaluator.compile(expressionStr, true); + return new Object[]{field, expression}; }) + .collect(Collectors.toMap(arr -> (String)arr[0], arr -> (Expression) arr[1])); + + List fields = metrics.getFields(); + List aliasFields = metrics.getAliasFields(); + Map aliasFieldValueMap = new HashMap<>(16); + Map fieldValueMap = new HashMap<>(16); + CollectRep.ValueRow.Builder realValueRowBuilder = CollectRep.ValueRow.newBuilder(); + for (CollectRep.ValueRow aliasRow : aliasRowList) { + for (int aliasIndex = 0; aliasIndex < aliasFields.size(); aliasIndex++) { + String aliasFieldValue = aliasRow.getColumns(aliasIndex); + if (!CommonConstants.NULL_VALUE.equals(aliasFieldValue)) { + aliasFieldValueMap.put(aliasFields.get(aliasIndex), aliasFieldValue); + } + } + StringBuilder instanceBuilder = new StringBuilder(); + for (Metrics.Field field : fields) { + String realField = field.getField(); + Expression expression = fieldExpressionMap.get(realField); + String value = null; + if (expression != null) { + // 存在计算表达式 则计算值 + if (CommonConstants.TYPE_NUMBER == field.getType()) { + for (String variable : expression.getVariableNames()) { + Double doubleValue = CommonUtil.parseDoubleStr(aliasFieldValueMap.get(variable)); + if (doubleValue != null) { + fieldValueMap.put(variable, doubleValue); + } + } + } else { + for (String variable : expression.getVariableNames()) { + String strValue = aliasFieldValueMap.get(variable); + if (strValue != null && !"".equals(strValue)) { + fieldValueMap.put(variable, strValue); + } + } + } + try { + Object objValue = expression.execute(fieldValueMap); + if (objValue != null) { + value = String.valueOf(objValue); + } + } catch (Exception e) { + log.warn(e.getMessage()); + } + } else { + // 不存在 则映射别名值 + value = aliasFieldValueMap.get(realField); + } + if (value == null) { + value = CommonConstants.NULL_VALUE; + } + realValueRowBuilder.addColumns(value); + fieldValueMap.clear(); + if (field.isInstance() && !CommonConstants.NULL_VALUE.equals(value)) { + instanceBuilder.append(value); + } + } + aliasFieldValueMap.clear(); + // 设置实例instance + realValueRowBuilder.setInstance(instanceBuilder.toString()); + collectData.addValues(realValueRowBuilder.build()); + } + } + + private boolean fastFailed() { + return this.timeout == null || this.timeout.isCancelled(); + } + + private CollectRep.MetricsData validateResponse(CollectRep.MetricsData.Builder builder) { + long endTime = System.currentTimeMillis(); + builder.setTime(endTime); + log.debug("[Collect]: newTime: {}, startTime: {}, spendTime: {}.", newTime, startTime, endTime - startTime); + if (builder.getCode() != CollectRep.Code.SUCCESS) { + log.info("[Collect Fail] Reason: {}", builder.getMsg()); + } else { + log.info("[Collect Success]."); + } + return builder.build(); + } + + private void setNewThreadName(long monitorId, String app, long startTime, Metrics metrics) { + String builder = monitorId + "-" + app + "-" + metrics.getName() + + "-" + String.valueOf(startTime).substring(9); + Thread.currentThread().setName(builder); + } + + @Override + public int compareTo(MetricsCollect collect) { + return runPriority - collect.runPriority; + } +} diff --git a/collector/src/main/java/com/usthe/collector/dispatch/MetricsCollectorQueue.java b/collector/src/main/java/com/usthe/collector/dispatch/MetricsCollectorQueue.java new file mode 100644 index 0000000..8275bd9 --- /dev/null +++ b/collector/src/main/java/com/usthe/collector/dispatch/MetricsCollectorQueue.java @@ -0,0 +1,32 @@ +package com.usthe.collector.dispatch; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import java.util.concurrent.PriorityBlockingQueue; +import java.util.concurrent.TimeUnit; + +/** + * 待运行的job队列 + * @author tomsun28 + * @date 2021/10/10 20:20 + */ +@Component +@Slf4j +public class MetricsCollectorQueue { + + private final PriorityBlockingQueue jobQueue; + + public MetricsCollectorQueue() { + jobQueue = new PriorityBlockingQueue<>(2000); + } + + public void addJob(MetricsCollect job) { + jobQueue.offer(job); + } + + public MetricsCollect getJob() throws InterruptedException { + return jobQueue.poll(2, TimeUnit.SECONDS); + } + +} diff --git a/collector/src/main/java/com/usthe/collector/dispatch/MetricsTaskDispatch.java b/collector/src/main/java/com/usthe/collector/dispatch/MetricsTaskDispatch.java new file mode 100644 index 0000000..f7c53d1 --- /dev/null +++ b/collector/src/main/java/com/usthe/collector/dispatch/MetricsTaskDispatch.java @@ -0,0 +1,17 @@ +package com.usthe.collector.dispatch; + +import com.usthe.collector.dispatch.timer.Timeout; + +/** + * 指标组采集任务调度器接口 + * @author tomsun28 + * @date 2021/11/2 11:19 + */ +public interface MetricsTaskDispatch { + + /** + * 调度 + * @param timeout timeout + */ + void dispatchMetricsTask(Timeout timeout); +} diff --git a/collector/src/main/java/com/usthe/collector/dispatch/WorkerPool.java b/collector/src/main/java/com/usthe/collector/dispatch/WorkerPool.java new file mode 100644 index 0000000..24f10a2 --- /dev/null +++ b/collector/src/main/java/com/usthe/collector/dispatch/WorkerPool.java @@ -0,0 +1,62 @@ +package com.usthe.collector.dispatch; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.DisposableBean; +import org.springframework.stereotype.Component; + +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +/** + * 采集任务工作线程池 + * @author tomsun28 + * @date 2021/10/15 0:01 + */ +@Component +@Slf4j +public class WorkerPool implements DisposableBean { + + private ThreadPoolExecutor workerExecutor; + + public WorkerPool() { + initWorkExecutor(); + } + + private void initWorkExecutor() { + // 线程工厂 + ThreadFactory threadFactory = new ThreadFactoryBuilder() + .setUncaughtExceptionHandler((thread, throwable) -> { + log.error("workerExecutor has uncaughtException."); + log.error(throwable.getMessage(), throwable); }) + .setDaemon(true) + .setNameFormat("collect-worker-%d") + .build(); + workerExecutor = new ThreadPoolExecutor(100, + 800, + 10, + TimeUnit.SECONDS, + new SynchronousQueue<>(), + threadFactory, + new ThreadPoolExecutor.AbortPolicy()); + } + + /** + * 运行采集任务线程 + * @param runnable 任务 + * @throws RejectedExecutionException when 线程池满 + */ + public void executeJob(Runnable runnable) throws RejectedExecutionException { + workerExecutor.execute(runnable); + } + + @Override + public void destroy() throws Exception { + if (workerExecutor != null) { + workerExecutor.shutdownNow(); + } + } +} diff --git a/collector/src/main/java/com/usthe/collector/dispatch/entrance/internal/CollectJobService.java b/collector/src/main/java/com/usthe/collector/dispatch/entrance/internal/CollectJobService.java new file mode 100644 index 0000000..22287c3 --- /dev/null +++ b/collector/src/main/java/com/usthe/collector/dispatch/entrance/internal/CollectJobService.java @@ -0,0 +1,83 @@ +package com.usthe.collector.dispatch.entrance.internal; + +import com.usthe.collector.dispatch.timer.TimerDispatch; +import com.usthe.common.entity.job.Job; +import com.usthe.common.entity.message.CollectRep; +import com.usthe.common.util.SnowFlakeIdGenerator; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +/** + * 采集job管理提供api接口 + * @author tomsun28 + * @date 2021/11/6 13:58 + */ +@Service +@Slf4j +public class CollectJobService { + + @Autowired + private TimerDispatch timerDispatch; + + /** + * 执行一次性采集任务,获取采集数据响应 + * @param job 采集任务详情 + * @return 采集结果 + */ + public List collectSyncJobData(Job job) { + final List metricsData = new LinkedList<>(); + final CountDownLatch countDownLatch = new CountDownLatch(1); + CollectResponseEventListener listener = new CollectResponseEventListener() { + @Override + public void response(List responseMetrics) { + if (responseMetrics != null) { + metricsData.addAll(responseMetrics); + } + countDownLatch.countDown(); + } + }; + timerDispatch.addJob(job, listener); + try { + countDownLatch.await(100, TimeUnit.SECONDS); + } catch (Exception e) { + log.info("同步任务运行100秒无响应,返回"); + } + return metricsData; + } + + /** + * 下发周期性异步采集任务 + * @param job 采集任务详情 + * @return long 任务ID + */ + public long addAsyncCollectJob(Job job) { + long jobId = SnowFlakeIdGenerator.generateId(); + job.setId(jobId); + timerDispatch.addJob(job, null); + return jobId; + } + + /** + * 更新已经下发的周期性异步采集任务 + * @param modifyJob 采集任务详情 + */ + public void updateAsyncCollectJob(Job modifyJob) { + timerDispatch.deleteJob(modifyJob.getId(), true); + timerDispatch.addJob(modifyJob, null); + } + + /** + * 取消周期性异步采集任务 + * @param jobId 任务ID + */ + public void cancelAsyncCollectJob(Long jobId) { + timerDispatch.deleteJob(jobId, true); + } + +} diff --git a/collector/src/main/java/com/usthe/collector/dispatch/entrance/internal/CollectResponseEventListener.java b/collector/src/main/java/com/usthe/collector/dispatch/entrance/internal/CollectResponseEventListener.java new file mode 100644 index 0000000..c4d6db4 --- /dev/null +++ b/collector/src/main/java/com/usthe/collector/dispatch/entrance/internal/CollectResponseEventListener.java @@ -0,0 +1,20 @@ +package com.usthe.collector.dispatch.entrance.internal; + +import com.usthe.common.entity.message.CollectRep; + +import java.util.EventListener; +import java.util.List; + +/** + * 一次性采集任务响应结果监听器 + * @author tomsun28 + * @date 2021/11/16 10:09 + */ +public interface CollectResponseEventListener extends EventListener { + + /** + * 采集任务完成结果通知 + * @param responseMetrics 响应数据 + */ + default void response(List responseMetrics) {} +} diff --git a/collector/src/main/java/com/usthe/collector/dispatch/export/MetricsDataExporter.java b/collector/src/main/java/com/usthe/collector/dispatch/export/MetricsDataExporter.java new file mode 100644 index 0000000..82972a5 --- /dev/null +++ b/collector/src/main/java/com/usthe/collector/dispatch/export/MetricsDataExporter.java @@ -0,0 +1,56 @@ +package com.usthe.collector.dispatch.export; + +import com.usthe.common.entity.message.CollectRep; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.DisposableBean; +import org.springframework.stereotype.Component; + +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +/** + * 采集数据消息发送 + * @author tomsun28 + * @date 2021/11/3 15:22 + */ +@Component +@Slf4j +public class MetricsDataExporter implements DisposableBean { + + private final LinkedBlockingQueue metricsDataToAlertQueue; + private final LinkedBlockingQueue metricsDataToWarehouseInfluxQueue; + private final LinkedBlockingQueue metricsDataToWarehouseRedisQueue; + + public MetricsDataExporter() { + metricsDataToAlertQueue = new LinkedBlockingQueue<>(); + metricsDataToWarehouseInfluxQueue = new LinkedBlockingQueue<>(); + metricsDataToWarehouseRedisQueue = new LinkedBlockingQueue<>(); + } + + public CollectRep.MetricsData pollAlertMetricsData() throws InterruptedException { + return metricsDataToAlertQueue.poll(2, TimeUnit.SECONDS); + } + + public CollectRep.MetricsData pollWarehouseInfluxMetricsData() throws InterruptedException { + return metricsDataToAlertQueue.poll(2, TimeUnit.SECONDS); + } + + public CollectRep.MetricsData pollWarehouseRedisMetricsData() throws InterruptedException { + return metricsDataToWarehouseRedisQueue.poll(2, TimeUnit.SECONDS); + } + + /** + * 发送消息 + * @param metricsData 指标组采集数据 + */ + public void send(CollectRep.MetricsData metricsData) { + metricsDataToAlertQueue.offer(metricsData); + metricsDataToWarehouseInfluxQueue.offer(metricsData); + metricsDataToWarehouseRedisQueue.offer(metricsData); + } + + @Override + public void destroy() throws Exception { + metricsDataToAlertQueue.clear(); + } +} diff --git a/collector/src/main/java/com/usthe/collector/dispatch/timer/HashedWheelTimer.java b/collector/src/main/java/com/usthe/collector/dispatch/timer/HashedWheelTimer.java new file mode 100644 index 0000000..9e71563 --- /dev/null +++ b/collector/src/main/java/com/usthe/collector/dispatch/timer/HashedWheelTimer.java @@ -0,0 +1,809 @@ +/* + * Copyright 2012 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.usthe.collector.dispatch.timer; + + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Locale; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import java.util.concurrent.atomic.AtomicLong; + +/** + * A {@link Timer} optimized for approximated I/O timeout scheduling. + * + *

Tick Duration

+ *

+ * As described with 'approximated', this timer does not execute the scheduled + * {@link TimerTask} on time. {@link HashedWheelTimer}, on every tick, will + * check if there are any {@link TimerTask}s behind the schedule and execute + * them. + *

+ * You can increase or decrease the accuracy of the execution timing by + * specifying smaller or larger tick duration in the constructor. In most + * network applications, I/O timeout does not need to be accurate. Therefore, + * the default tick duration is 100 milliseconds and you will not need to try + * different configurations in most cases. + * + *

Ticks per Wheel (Wheel Size)

+ *

+ * {@link HashedWheelTimer} maintains a data structure called 'wheel'. + * To put simply, a wheel is a hash table of {@link TimerTask}s whose hash + * function is 'dead line of the task'. The default number of ticks per wheel + * (i.e. the size of the wheel) is 512. You could specify a larger value + * if you are going to schedule a lot of timeouts. + * + *

Do not create many instances.

+ *

+ * {@link HashedWheelTimer} creates a new thread whenever it is instantiated and + * started. Therefore, you should make sure to create only one instance and + * share it across your application. One of the common mistakes, that makes + * your application unresponsive, is to create a new instance for every connection. + * + *

Implementation Details

+ *

+ * {@link HashedWheelTimer} is based on + * George Varghese and + * Tony Lauck's paper, + * 'Hashed + * and Hierarchical Timing Wheels: data structures to efficiently implement a + * timer facility'. More comprehensive slides are located + * here. + * @author from netty | dubbo + */ +@SuppressWarnings("PMD") +public class HashedWheelTimer implements Timer { + + private static final Logger logger = LoggerFactory.getLogger(HashedWheelTimer.class); + + private static final AtomicInteger INSTANCE_COUNTER = new AtomicInteger(); + private static final AtomicBoolean WARNED_TOO_MANY_INSTANCES = new AtomicBoolean(); + private static final int INSTANCE_COUNT_LIMIT = 64; + private static final AtomicIntegerFieldUpdater WORKER_STATE_UPDATER = + AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimer.class, "workerState"); + + private final Worker worker = new Worker(); + private final Thread workerThread; + + private static final int WORKER_STATE_INIT = 0; + private static final int WORKER_STATE_STARTED = 1; + private static final int WORKER_STATE_SHUTDOWN = 2; + + /** + * 0 - init, 1 - started, 2 - shut down + */ + @SuppressWarnings({"unused", "FieldMayBeFinal"}) + private volatile int workerState; + + private final long tickDuration; + private final HashedWheelBucket[] wheel; + private final int mask; + private final CountDownLatch startTimeInitialized = new CountDownLatch(1); + private final Queue timeouts = new LinkedBlockingQueue<>(); + private final Queue cancelledTimeouts = new LinkedBlockingQueue<>(); + private final AtomicLong pendingTimeouts = new AtomicLong(0); + private final long maxPendingTimeouts; + + private volatile long startTime; + + /** + * Creates a new timer with the default thread factory + * ({@link Executors#defaultThreadFactory()}), default tick duration, and + * default number of ticks per wheel. + */ + public HashedWheelTimer() { + this(Executors.defaultThreadFactory()); + } + + /** + * Creates a new timer with the default thread factory + * ({@link Executors#defaultThreadFactory()}) and default number of ticks + * per wheel. + * + * @param tickDuration the duration between tick + * @param unit the time unit of the {@code tickDuration} + * @throws NullPointerException if {@code unit} is {@code null} + * @throws IllegalArgumentException if {@code tickDuration} is <= 0 + */ + public HashedWheelTimer(long tickDuration, TimeUnit unit) { + this(Executors.defaultThreadFactory(), tickDuration, unit); + } + + /** + * Creates a new timer with the default thread factory + * ({@link Executors#defaultThreadFactory()}). + * + * @param tickDuration the duration between tick + * @param unit the time unit of the {@code tickDuration} + * @param ticksPerWheel the size of the wheel + * @throws NullPointerException if {@code unit} is {@code null} + * @throws IllegalArgumentException if either of {@code tickDuration} and {@code ticksPerWheel} is <= 0 + */ + public HashedWheelTimer(long tickDuration, TimeUnit unit, int ticksPerWheel) { + this(Executors.defaultThreadFactory(), tickDuration, unit, ticksPerWheel); + } + + /** + * Creates a new timer with the default tick duration and default number of + * ticks per wheel. + * + * @param threadFactory a {@link ThreadFactory} that creates a + * background {@link Thread} which is dedicated to + * {@link TimerTask} execution. + * @throws NullPointerException if {@code threadFactory} is {@code null} + */ + public HashedWheelTimer(ThreadFactory threadFactory) { + this(threadFactory, 100, TimeUnit.MILLISECONDS); + } + + /** + * Creates a new timer with the default number of ticks per wheel. + * + * @param threadFactory a {@link ThreadFactory} that creates a + * background {@link Thread} which is dedicated to + * {@link TimerTask} execution. + * @param tickDuration the duration between tick + * @param unit the time unit of the {@code tickDuration} + * @throws NullPointerException if either of {@code threadFactory} and {@code unit} is {@code null} + * @throws IllegalArgumentException if {@code tickDuration} is <= 0 + */ + public HashedWheelTimer( + ThreadFactory threadFactory, long tickDuration, TimeUnit unit) { + this(threadFactory, tickDuration, unit, 512); + } + + /** + * Creates a new timer. + * + * @param threadFactory a {@link ThreadFactory} that creates a + * background {@link Thread} which is dedicated to + * {@link TimerTask} execution. + * @param tickDuration the duration between tick + * @param unit the time unit of the {@code tickDuration} + * @param ticksPerWheel the size of the wheel + * @throws NullPointerException if either of {@code threadFactory} and {@code unit} is {@code null} + * @throws IllegalArgumentException if either of {@code tickDuration} and {@code ticksPerWheel} is <= 0 + */ + public HashedWheelTimer( + ThreadFactory threadFactory, + long tickDuration, TimeUnit unit, int ticksPerWheel) { + this(threadFactory, tickDuration, unit, ticksPerWheel, -1); + } + + /** + * Creates a new timer. + * + * @param threadFactory a {@link ThreadFactory} that creates a + * background {@link Thread} which is dedicated to + * {@link TimerTask} execution. + * @param tickDuration the duration between tick + * @param unit the time unit of the {@code tickDuration} + * @param ticksPerWheel the size of the wheel + * @param maxPendingTimeouts The maximum number of pending timeouts after which call to + * {@code newTimeout} will result in + * {@link RejectedExecutionException} + * being thrown. No maximum pending timeouts limit is assumed if + * this value is 0 or negative. + * @throws NullPointerException if either of {@code threadFactory} and {@code unit} is {@code null} + * @throws IllegalArgumentException if either of {@code tickDuration} and {@code ticksPerWheel} is <= 0 + */ + public HashedWheelTimer( + ThreadFactory threadFactory, + long tickDuration, TimeUnit unit, int ticksPerWheel, + long maxPendingTimeouts) { + + if (threadFactory == null) { + throw new NullPointerException("threadFactory"); + } + if (unit == null) { + throw new NullPointerException("unit"); + } + if (tickDuration <= 0) { + throw new IllegalArgumentException("tickDuration must be greater than 0: " + tickDuration); + } + if (ticksPerWheel <= 0) { + throw new IllegalArgumentException("ticksPerWheel must be greater than 0: " + ticksPerWheel); + } + + // Normalize ticksPerWheel to power of two and initialize the wheel. + wheel = createWheel(ticksPerWheel); + mask = wheel.length - 1; + + // Convert tickDuration to nanos. + this.tickDuration = unit.toNanos(tickDuration); + + // Prevent overflow. + if (this.tickDuration >= Long.MAX_VALUE / wheel.length) { + throw new IllegalArgumentException(String.format( + "tickDuration: %d (expected: 0 < tickDuration in nanos < %d", + tickDuration, Long.MAX_VALUE / wheel.length)); + } + workerThread = threadFactory.newThread(worker); + + this.maxPendingTimeouts = maxPendingTimeouts; + + if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT && + WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) { + reportTooManyInstances(); + } + } + + @Override + protected void finalize() throws Throwable { + try { + super.finalize(); + } finally { + // This object is going to be GCed and it is assumed the ship has sailed to do a proper shutdown. If + // we have not yet shutdown then we want to make sure we decrement the active instance count. + if (WORKER_STATE_UPDATER.getAndSet(this, WORKER_STATE_SHUTDOWN) != WORKER_STATE_SHUTDOWN) { + INSTANCE_COUNTER.decrementAndGet(); + } + } + } + + private static HashedWheelBucket[] createWheel(int ticksPerWheel) { + if (ticksPerWheel <= 0) { + throw new IllegalArgumentException( + "ticksPerWheel must be greater than 0: " + ticksPerWheel); + } + if (ticksPerWheel > 1073741824) { + throw new IllegalArgumentException( + "ticksPerWheel may not be greater than 2^30: " + ticksPerWheel); + } + + ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel); + HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel]; + for (int i = 0; i < wheel.length; i++) { + wheel[i] = new HashedWheelBucket(); + } + return wheel; + } + + private static int normalizeTicksPerWheel(int ticksPerWheel) { + int normalizedTicksPerWheel = ticksPerWheel - 1; + normalizedTicksPerWheel |= normalizedTicksPerWheel >>> 1; + normalizedTicksPerWheel |= normalizedTicksPerWheel >>> 2; + normalizedTicksPerWheel |= normalizedTicksPerWheel >>> 4; + normalizedTicksPerWheel |= normalizedTicksPerWheel >>> 8; + normalizedTicksPerWheel |= normalizedTicksPerWheel >>> 16; + return normalizedTicksPerWheel + 1; + } + + /** + * Starts the background thread explicitly. The background thread will + * start automatically on demand even if you did not call this method. + * + * @throws IllegalStateException if this timer has been + * {@linkplain #stop() stopped} already + */ + public void start() { + switch (WORKER_STATE_UPDATER.get(this)) { + case WORKER_STATE_INIT: + if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) { + workerThread.start(); + } + break; + case WORKER_STATE_STARTED: + break; + case WORKER_STATE_SHUTDOWN: + throw new IllegalStateException("cannot be started once stopped"); + default: + throw new Error("Invalid WorkerState"); + } + + // Wait until the startTime is initialized by the worker. + while (startTime == 0) { + try { + startTimeInitialized.await(); + } catch (InterruptedException ignore) { + // Ignore - it will be ready very soon. + } + } + } + + @Override + public Set stop() { + if (Thread.currentThread() == workerThread) { + throw new IllegalStateException( + HashedWheelTimer.class.getSimpleName() + + ".stop() cannot be called from " + + TimerTask.class.getSimpleName()); + } + + if (!WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_STARTED, WORKER_STATE_SHUTDOWN)) { + // workerState can be 0 or 2 at this moment - let it always be 2. + if (WORKER_STATE_UPDATER.getAndSet(this, WORKER_STATE_SHUTDOWN) != WORKER_STATE_SHUTDOWN) { + INSTANCE_COUNTER.decrementAndGet(); + } + + return Collections.emptySet(); + } + + try { + boolean interrupted = false; + while (workerThread.isAlive()) { + workerThread.interrupt(); + try { + workerThread.join(100); + } catch (InterruptedException ignored) { + interrupted = true; + } + } + + if (interrupted) { + Thread.currentThread().interrupt(); + } + } finally { + INSTANCE_COUNTER.decrementAndGet(); + } + return worker.unprocessedTimeouts(); + } + + @Override + public boolean isStop() { + return WORKER_STATE_SHUTDOWN == WORKER_STATE_UPDATER.get(this); + } + + @Override + public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) { + if (task == null) { + throw new NullPointerException("task"); + } + if (unit == null) { + throw new NullPointerException("unit"); + } + + long pendingTimeoutsCount = pendingTimeouts.incrementAndGet(); + + if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) { + pendingTimeouts.decrementAndGet(); + throw new RejectedExecutionException("Number of pending timeouts (" + + pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending " + + "timeouts (" + maxPendingTimeouts + ")"); + } + + start(); + + // Add the timeout to the timeout queue which will be processed on the next tick. + // During processing all the queued HashedWheelTimeouts will be added to the correct HashedWheelBucket. + long deadline = System.nanoTime() + unit.toNanos(delay) - startTime; + + // Guard against overflow. + if (delay > 0 && deadline < 0) { + deadline = Long.MAX_VALUE; + } + HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline); + timeouts.add(timeout); + return timeout; + } + + /** + * Returns the number of pending timeouts of this {@link Timer}. + */ + public long pendingTimeouts() { + return pendingTimeouts.get(); + } + + private static void reportTooManyInstances() { + logger.error("You are creating too many HashedWheelTimer instances. " + + "HashedWheelTimer is a shared resource that must be reused across the JVM," + + "so that only a few instances are created."); + } + + private final class Worker implements Runnable { + private final Set unprocessedTimeouts = new HashSet(); + + private long tick; + + @Override + public void run() { + // Initialize the startTime. + startTime = System.nanoTime(); + if (startTime == 0) { + // We use 0 as an indicator for the uninitialized value here, so make sure it's not 0 when initialized. + startTime = 1; + } + + // Notify the other threads waiting for the initialization at start(). + startTimeInitialized.countDown(); + + do { + final long deadline = waitForNextTick(); + if (deadline > 0) { + int idx = (int) (tick & mask); + processCancelledTasks(); + HashedWheelBucket bucket = + wheel[idx]; + transferTimeoutsToBuckets(); + bucket.expireTimeouts(deadline); + tick++; + } + } while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED); + + // Fill the unprocessedTimeouts so we can return them from stop() method. + for (HashedWheelBucket bucket : wheel) { + bucket.clearTimeouts(unprocessedTimeouts); + } + for (; ; ) { + HashedWheelTimeout timeout = timeouts.poll(); + if (timeout == null) { + break; + } + if (!timeout.isCancelled()) { + unprocessedTimeouts.add(timeout); + } + } + processCancelledTasks(); + } + + private void transferTimeoutsToBuckets() { + // transfer only max. 100000 timeouts per tick to prevent a thread to stale the workerThread when it just + // adds new timeouts in a loop. + for (int i = 0; i < 100000; i++) { + HashedWheelTimeout timeout = timeouts.poll(); + if (timeout == null) { + // all processed + break; + } + if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) { + // Was cancelled in the meantime. + continue; + } + + long calculated = timeout.deadline / tickDuration; + timeout.remainingRounds = (calculated - tick) / wheel.length; + + // Ensure we don't schedule for past. + final long ticks = Math.max(calculated, tick); + int stopIndex = (int) (ticks & mask); + + HashedWheelBucket bucket = wheel[stopIndex]; + bucket.addTimeout(timeout); + } + } + + private void processCancelledTasks() { + for (; ; ) { + HashedWheelTimeout timeout = cancelledTimeouts.poll(); + if (timeout == null) { + // all processed + break; + } + try { + timeout.remove(); + } catch (Throwable t) { + if (logger.isWarnEnabled()) { + logger.warn("An exception was thrown while process a cancellation task", t); + } + } + } + } + + /** + * calculate goal nanoTime from startTime and current tick number, + * then wait until that goal has been reached. + * + * @return Long.MIN_VALUE if received a shutdown request, + * current time otherwise (with Long.MIN_VALUE changed by +1) + */ + private long waitForNextTick() { + long deadline = tickDuration * (tick + 1); + + for (; ; ) { + final long currentTime = System.nanoTime() - startTime; + long sleepTimeMs = (deadline - currentTime + 999999) / 1000000; + + if (sleepTimeMs <= 0) { + if (currentTime == Long.MIN_VALUE) { + return -Long.MAX_VALUE; + } else { + return currentTime; + } + } + if (isWindows()) { + sleepTimeMs = sleepTimeMs / 10 * 10; + } + + try { + Thread.sleep(sleepTimeMs); + } catch (InterruptedException ignored) { + if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) { + return Long.MIN_VALUE; + } + } + } + } + + Set unprocessedTimeouts() { + return Collections.unmodifiableSet(unprocessedTimeouts); + } + } + + private static final class HashedWheelTimeout implements Timeout { + + private static final int ST_INIT = 0; + private static final int ST_CANCELLED = 1; + private static final int ST_EXPIRED = 2; + private static final AtomicIntegerFieldUpdater STATE_UPDATER = + AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimeout.class, "state"); + + private final HashedWheelTimer timer; + private final TimerTask task; + private final long deadline; + + @SuppressWarnings({"unused", "FieldMayBeFinal", "RedundantFieldInitialization"}) + private volatile int state = ST_INIT; + + /** + * RemainingRounds will be calculated and set by Worker.transferTimeoutsToBuckets() before the + * HashedWheelTimeout will be added to the correct HashedWheelBucket. + */ + long remainingRounds; + + /** + * This will be used to chain timeouts in HashedWheelTimerBucket via a double-linked-list. + * As only the workerThread will act on it there is no need for synchronization / volatile. + */ + HashedWheelTimeout next; + HashedWheelTimeout prev; + + /** + * The bucket to which the timeout was added + */ + HashedWheelBucket bucket; + + HashedWheelTimeout(HashedWheelTimer timer, TimerTask task, long deadline) { + this.timer = timer; + this.task = task; + this.deadline = deadline; + } + + @Override + public Timer timer() { + return timer; + } + + @Override + public TimerTask task() { + return task; + } + + @Override + public boolean cancel() { + // only update the state it will be removed from HashedWheelBucket on next tick. + if (!compareAndSetState(ST_INIT, ST_CANCELLED)) { + return false; + } + // If a task should be canceled we put this to another queue which will be processed on each tick. + // So this means that we will have a GC latency of max. 1 tick duration which is good enough. This way + // we can make again use of our MpscLinkedQueue and so minimize the locking / overhead as much as possible. + timer.cancelledTimeouts.add(this); + return true; + } + + void remove() { + HashedWheelBucket bucket = this.bucket; + if (bucket != null) { + bucket.remove(this); + } else { + timer.pendingTimeouts.decrementAndGet(); + } + } + + public boolean compareAndSetState(int expected, int state) { + return STATE_UPDATER.compareAndSet(this, expected, state); + } + + public int state() { + return state; + } + + @Override + public boolean isCancelled() { + return state() == ST_CANCELLED; + } + + @Override + public boolean isExpired() { + return state() == ST_EXPIRED; + } + + public void expire() { + if (!compareAndSetState(ST_INIT, ST_EXPIRED)) { + return; + } + + try { + task.run(this); + } catch (Throwable t) { + if (logger.isWarnEnabled()) { + logger.warn("An exception was thrown by " + TimerTask.class.getSimpleName() + '.', t); + } + } + } + + @Override + public String toString() { + final long currentTime = System.nanoTime(); + long remaining = deadline - currentTime + timer.startTime; + + StringBuilder buf = new StringBuilder(192) + .append("HashedWheelTimer") + .append('(') + .append("deadline: "); + if (remaining > 0) { + buf.append(remaining) + .append(" ns later"); + } else if (remaining < 0) { + buf.append(-remaining) + .append(" ns ago"); + } else { + buf.append("now"); + } + + if (isCancelled()) { + buf.append(", cancelled"); + } + + return buf.append(", task: ") + .append(task()) + .append(')') + .toString(); + } + } + + /** + * Bucket that stores HashedWheelTimeouts. These are stored in a linked-list like datastructure to allow easy + * removal of HashedWheelTimeouts in the middle. Also the HashedWheelTimeout act as nodes themself and so no + * extra object creation is needed. + */ + private static final class HashedWheelBucket { + + /** + * Used for the linked-list datastructure + */ + private HashedWheelTimeout head; + private HashedWheelTimeout tail; + + /** + * Add {@link HashedWheelTimeout} to this bucket. + */ + void addTimeout(HashedWheelTimeout timeout) { + assert timeout.bucket == null; + timeout.bucket = this; + if (head == null) { + head = tail = timeout; + } else { + tail.next = timeout; + timeout.prev = tail; + tail = timeout; + } + } + + /** + * Expire all {@link HashedWheelTimeout}s for the given {@code deadline}. + */ + void expireTimeouts(long deadline) { + HashedWheelTimeout timeout = head; + + // process all timeouts + while (timeout != null) { + HashedWheelTimeout next = timeout.next; + if (timeout.remainingRounds <= 0) { + next = remove(timeout); + if (timeout.deadline <= deadline) { + timeout.expire(); + } else { + // The timeout was placed into a wrong slot. This should never happen. + throw new IllegalStateException(String.format( + "timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline)); + } + } else if (timeout.isCancelled()) { + next = remove(timeout); + } else { + timeout.remainingRounds--; + } + timeout = next; + } + } + + public HashedWheelTimeout remove(HashedWheelTimeout timeout) { + HashedWheelTimeout next = timeout.next; + // remove timeout that was either processed or cancelled by updating the linked-list + if (timeout.prev != null) { + timeout.prev.next = next; + } + if (timeout.next != null) { + timeout.next.prev = timeout.prev; + } + + if (timeout == head) { + // if timeout is also the tail we need to adjust the entry too + if (timeout == tail) { + tail = null; + head = null; + } else { + head = next; + } + } else if (timeout == tail) { + // if the timeout is the tail modify the tail to be the prev node. + tail = timeout.prev; + } + // null out prev, next and bucket to allow for GC. + timeout.prev = null; + timeout.next = null; + timeout.bucket = null; + timeout.timer.pendingTimeouts.decrementAndGet(); + return next; + } + + /** + * Clear this bucket and return all not expired / cancelled {@link Timeout}s. + */ + void clearTimeouts(Set set) { + for (; ; ) { + HashedWheelTimeout timeout = pollTimeout(); + if (timeout == null) { + return; + } + if (timeout.isExpired() || timeout.isCancelled()) { + continue; + } + set.add(timeout); + } + } + + private HashedWheelTimeout pollTimeout() { + HashedWheelTimeout head = this.head; + if (head == null) { + return null; + } + HashedWheelTimeout next = head.next; + if (next == null) { + tail = this.head = null; + } else { + this.head = next; + next.prev = null; + } + + // null out prev and next to allow for GC. + head.next = null; + head.prev = null; + head.bucket = null; + return head; + } + } + + private static final boolean IS_OS_WINDOWS = System.getProperty("os.name", "").toLowerCase(Locale.US).contains("win"); + + private boolean isWindows() { + return IS_OS_WINDOWS; + } +} diff --git a/collector/src/main/java/com/usthe/collector/dispatch/timer/Timeout.java b/collector/src/main/java/com/usthe/collector/dispatch/timer/Timeout.java new file mode 100644 index 0000000..f82a6a3 --- /dev/null +++ b/collector/src/main/java/com/usthe/collector/dispatch/timer/Timeout.java @@ -0,0 +1,57 @@ +/* + * Copyright 2012 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.usthe.collector.dispatch.timer; + +/** + * A handle associated with a {@link TimerTask} that is returned by a + * {@link Timer}. + * @author from netty | dubbo + */ +@SuppressWarnings("PMD") +public interface Timeout { + + /** + * Returns the {@link Timer} that created this handle. + */ + Timer timer(); + + /** + * Returns the {@link TimerTask} which is associated with this handle. + */ + TimerTask task(); + + /** + * Returns {@code true} if and only if the {@link TimerTask} associated + * with this handle has been expired. + */ + boolean isExpired(); + + /** + * Returns {@code true} if and only if the {@link TimerTask} associated + * with this handle has been cancelled. + */ + boolean isCancelled(); + + /** + * Attempts to cancel the {@link TimerTask} associated with this handle. + * If the task has been executed or cancelled already, it will return with + * no side effect. + * + * @return True if the cancellation completed successfully, otherwise false + */ + boolean cancel(); +} diff --git a/collector/src/main/java/com/usthe/collector/dispatch/timer/Timer.java b/collector/src/main/java/com/usthe/collector/dispatch/timer/Timer.java new file mode 100644 index 0000000..e962752 --- /dev/null +++ b/collector/src/main/java/com/usthe/collector/dispatch/timer/Timer.java @@ -0,0 +1,58 @@ +/* + * Copyright 2012 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.usthe.collector.dispatch.timer; + +import java.util.Set; +import java.util.concurrent.TimeUnit; + +/** + * Schedules {@link TimerTask}s for one-time future execution in a background + * thread. + * @author from netty | dubbo + */ +public interface Timer { + + /** + * Schedules the specified {@link TimerTask} for one-time execution after + * the specified delay. + * + * @param task the {@link TimerTask + * @param delay the delay + * @param unit the unit of time + * @return a handle which is associated with the specified task + * @throws IllegalStateException if this timer has been {@linkplain #stop() stopped} already + * @throws RejectedExecutionException if the pending timeouts are too many and creating new timeout + * can cause instability in the system. + */ + Timeout newTimeout(TimerTask task, long delay, TimeUnit unit); + + /** + * Releases all resources acquired by this {@link Timer} and cancels all + * tasks which were scheduled but not executed yet. + * + * @return the handles associated with the tasks which were canceled by + * this method + */ + Set stop(); + + /** + * the timer is stop + * + * @return true for stop + */ + boolean isStop(); +} diff --git a/collector/src/main/java/com/usthe/collector/dispatch/timer/TimerDispatch.java b/collector/src/main/java/com/usthe/collector/dispatch/timer/TimerDispatch.java new file mode 100644 index 0000000..578e504 --- /dev/null +++ b/collector/src/main/java/com/usthe/collector/dispatch/timer/TimerDispatch.java @@ -0,0 +1,46 @@ +package com.usthe.collector.dispatch.timer; + + +import com.usthe.collector.dispatch.entrance.internal.CollectResponseEventListener; +import com.usthe.common.entity.job.Job; +import com.usthe.common.entity.message.CollectRep; + +import java.util.List; +import java.util.concurrent.TimeUnit; + +/** + * 时间轮调度接口 + * @author tomsun28 + * @date 2021/10/17 22:14 + */ +public interface TimerDispatch { + + /** + * 增加新的job + * @param addJob job + * @param eventListener 一次性同步任务监听器,异步任务不需要listener + */ + void addJob(Job addJob, CollectResponseEventListener eventListener); + + /** + * 调度循环周期性job + * @param timerTask timerTask + * @param interval 开始调度的间隔时间 + * @param timeUnit 时间单位 + */ + void cyclicJob(WheelTimerTask timerTask, long interval, TimeUnit timeUnit); + + /** + * 删除存在的job + * @param jobId jobId + * @param isCyclic 是否是周期性任务,true是, false为临时性任务 + */ + void deleteJob(long jobId, boolean isCyclic); + + /** + * 一次性同步采集任务采集结果通知监听器 + * @param jobId jobId + * @param metricsDataTemps 采集结果数据 + */ + void responseSyncJobData(long jobId, List metricsDataTemps); +} diff --git a/collector/src/main/java/com/usthe/collector/dispatch/timer/TimerDispatcher.java b/collector/src/main/java/com/usthe/collector/dispatch/timer/TimerDispatcher.java new file mode 100644 index 0000000..46a4b05 --- /dev/null +++ b/collector/src/main/java/com/usthe/collector/dispatch/timer/TimerDispatcher.java @@ -0,0 +1,95 @@ +package com.usthe.collector.dispatch.timer; + +import com.usthe.collector.dispatch.entrance.internal.CollectResponseEventListener; +import com.usthe.common.entity.job.Job; +import com.usthe.common.entity.message.CollectRep; +import org.springframework.stereotype.Component; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; + +/** + * @author tomsun28 + * @date 2021/10/17 23:06 + */ +@Component +public class TimerDispatcher implements TimerDispatch { + + /** + * 时间轮调度 + */ + private Timer wheelTimer; + /** + * 已存在的周期性调度任务 + */ + private Map currentCyclicTaskMap; + /** + * 已存在的临时性调度任务 + */ + private Map currentTempTaskMap; + /** + * 一次性任务响应监听器持有 + * jobId - listener + */ + private Map eventListeners; + + public TimerDispatcher() { + this.wheelTimer = new HashedWheelTimer(r -> { + Thread ret = new Thread(r, "wheelTimer"); + ret.setDaemon(true); + return ret; + }, 10, TimeUnit.SECONDS, 512); + this.currentCyclicTaskMap = new ConcurrentHashMap<>(1024); + this.currentTempTaskMap = new ConcurrentHashMap<>(64); + eventListeners = new ConcurrentHashMap<>(64); + } + + @Override + public void addJob(Job addJob, CollectResponseEventListener eventListener) { + WheelTimerTask timerJob = new WheelTimerTask(addJob); + if (addJob.isCyclic()) { + Timeout timeout = wheelTimer.newTimeout(timerJob, addJob.getInterval(), TimeUnit.SECONDS); + currentCyclicTaskMap.put(addJob.getId(), timeout); + } else { + Timeout timeout = wheelTimer.newTimeout(timerJob, 0, TimeUnit.SECONDS); + currentTempTaskMap.put(addJob.getId(), timeout); + eventListeners.put(addJob.getId(), eventListener); + } + } + + @Override + public void cyclicJob(WheelTimerTask timerTask, long interval, TimeUnit timeUnit) { + Long jobId = timerTask.getJob().getId(); + // 判断此周期性job是否已经被取消 + if (currentCyclicTaskMap.containsKey(jobId)) { + Timeout timeout = wheelTimer.newTimeout(timerTask, interval, TimeUnit.SECONDS); + currentCyclicTaskMap.put(timerTask.getJob().getId(), timeout); + } + } + + @Override + public void deleteJob(long jobId, boolean isCyclic) { + if (isCyclic) { + Timeout timeout = currentCyclicTaskMap.remove(jobId); + if (timeout != null) { + timeout.cancel(); + } + } else { + Timeout timeout = currentTempTaskMap.remove(jobId); + if (timeout != null) { + timeout.cancel(); + } + } + } + + @Override + public void responseSyncJobData(long jobId, List metricsDataTemps) { + currentTempTaskMap.remove(jobId); + CollectResponseEventListener eventListener = eventListeners.remove(jobId); + if (eventListener != null) { + eventListener.response(metricsDataTemps); + } + } +} diff --git a/collector/src/main/java/com/usthe/collector/dispatch/timer/TimerTask.java b/collector/src/main/java/com/usthe/collector/dispatch/timer/TimerTask.java new file mode 100644 index 0000000..bec8423 --- /dev/null +++ b/collector/src/main/java/com/usthe/collector/dispatch/timer/TimerTask.java @@ -0,0 +1,36 @@ +/* + * Copyright 2012 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.usthe.collector.dispatch.timer; + +import java.util.concurrent.TimeUnit; + +/** + * A task which is executed after the delay specified with + * {@link Timer#newTimeout(TimerTask, long, TimeUnit)} (TimerTask, long, TimeUnit)}. + * @author from netty | dubbo + */ +public interface TimerTask { + + /** + * Executed after the delay specified with + * {@link Timer#newTimeout(TimerTask, long, TimeUnit)}. + * + * @param timeout a handle which is associated with this task + * @throws Exception when error happen + */ + void run(Timeout timeout) throws Exception; +} diff --git a/collector/src/main/java/com/usthe/collector/dispatch/timer/WheelTimerTask.java b/collector/src/main/java/com/usthe/collector/dispatch/timer/WheelTimerTask.java new file mode 100644 index 0000000..6998ffe --- /dev/null +++ b/collector/src/main/java/com/usthe/collector/dispatch/timer/WheelTimerTask.java @@ -0,0 +1,125 @@ +package com.usthe.collector.dispatch.timer; + +import com.google.gson.Gson; +import com.google.gson.JsonArray; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonPrimitive; +import com.usthe.collector.dispatch.MetricsTaskDispatch; +import com.usthe.collector.util.SpringContextHolder; +import com.usthe.common.entity.job.Configmap; +import com.usthe.common.entity.job.Job; +import com.usthe.common.entity.job.Metrics; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +/** + * TimerTask实现 + * @author tomsun28 + * @date 2021/11/1 17:18 + */ +public class WheelTimerTask implements TimerTask { + + private final Job job; + private final MetricsTaskDispatch metricsTaskDispatch; + private static final Gson GSON = new Gson(); + + public WheelTimerTask(Job job) { + this.metricsTaskDispatch = SpringContextHolder.getBean(MetricsTaskDispatch.class); + this.job = job; + // 初始化job 将监控实际参数值对采集字段进行替换 + initJobMetrics(job); + } + + /** + * 初始化job填充信息 + * @param job job + */ + private void initJobMetrics(Job job) { + // 将监控实际参数值对采集字段进行替换 + List config = job.getConfigmap(); + Map configmap = config.stream().collect(Collectors.toMap(Configmap::getKey, item -> item)); + List metrics = job.getMetrics(); + List metricsTmp = new ArrayList<>(metrics.size()); + for (Metrics metric : metrics) { + JsonElement jsonElement = GSON.toJsonTree(metric); + jsonElement = replaceSpecialValue(jsonElement, configmap); + metric = GSON.fromJson(jsonElement, Metrics.class); + metricsTmp.add(metric); + } + job.setMetrics(metricsTmp); + } + + /** + * json参数替换 + * @param jsonElement json + * @param configmap 参数map + * @return json + */ + private JsonElement replaceSpecialValue(JsonElement jsonElement, Map configmap) { + if (jsonElement.isJsonObject()) { + JsonObject jsonObject = jsonElement.getAsJsonObject(); + Iterator> iterator = jsonObject.entrySet().iterator(); + while (iterator.hasNext()) { + Map.Entry entry = iterator.next(); + JsonElement element = entry.getValue(); + if (element.isJsonPrimitive()) { + // 判断是否含有特殊字符 替换 + String value = element.getAsString(); + if (value.startsWith("^_^") && value.endsWith("^_^")) { + value = value.replaceAll("\\^_\\^", ""); + Configmap param = configmap.get(value); + if (param != null) { + value = (String) param.getValue(); + jsonObject.addProperty(entry.getKey(), value); + } else { + iterator.remove(); + } + } + } else { + jsonObject.add(entry.getKey(), replaceSpecialValue(entry.getValue(), configmap)); + } + } + } else if (jsonElement.isJsonArray()) { + JsonArray jsonArray = jsonElement.getAsJsonArray(); + Iterator iterator = jsonArray.iterator(); + int index = 0; + while (iterator.hasNext()) { + JsonElement element = iterator.next(); + if (element.isJsonPrimitive()) { + // 判断是否含有特殊字符 替换 + String value = element.getAsString(); + if (value.startsWith("^_^") && value.endsWith("^_^")) { + value = value.replaceAll("\\^_\\^", ""); + Configmap param = configmap.get(value); + if (param != null) { + value = (String) param.getValue(); + jsonArray.set(index, new JsonPrimitive(value)); + } else { + iterator.remove(); + } + } + } else { + jsonArray.set(index, replaceSpecialValue(element, configmap)); + } + index++; + } + } + return jsonElement; + } + + + @Override + public void run(Timeout timeout) throws Exception { + job.setDispatchTime(System.currentTimeMillis()); + metricsTaskDispatch.dispatchMetricsTask(timeout); + } + + public Job getJob() { + return job; + } +} diff --git a/collector/src/main/java/com/usthe/collector/util/CollectorConstants.java b/collector/src/main/java/com/usthe/collector/util/CollectorConstants.java new file mode 100644 index 0000000..176acd3 --- /dev/null +++ b/collector/src/main/java/com/usthe/collector/util/CollectorConstants.java @@ -0,0 +1,11 @@ +package com.usthe.collector.util; + +/** + * collector 常量 + * @author tom + * @date 2021/12/3 12:15 + */ +public interface CollectorConstants { + + String RESPONSE_TIME = "responseTime"; +} diff --git a/collector/src/main/java/com/usthe/collector/util/JsonPathParser.java b/collector/src/main/java/com/usthe/collector/util/JsonPathParser.java new file mode 100644 index 0000000..88e78b7 --- /dev/null +++ b/collector/src/main/java/com/usthe/collector/util/JsonPathParser.java @@ -0,0 +1,44 @@ +package com.usthe.collector.util; + +import com.jayway.jsonpath.Configuration; +import com.jayway.jsonpath.JsonPath; +import com.jayway.jsonpath.Option; +import com.jayway.jsonpath.ParseContext; +import com.jayway.jsonpath.spi.cache.CacheProvider; +import com.jayway.jsonpath.spi.cache.LRUCache; + +import java.util.Collections; +import java.util.List; +import java.util.Map; + +/** + * json path parser + * @author tomsun28 + * @date 2021/11/20 10:16 + */ +public class JsonPathParser { + + private static final ParseContext PARSER; + + static { + Configuration conf = Configuration.defaultConfiguration() + .addOptions(Option.DEFAULT_PATH_LEAF_TO_NULL) + .addOptions(Option.ALWAYS_RETURN_LIST); + CacheProvider.setCache(new LRUCache(128)); + PARSER = JsonPath.using(conf); + } + + /** + * 使用jsonPath来解析json内容 + * @param content json内容 + * @param jsonPath jsonPath脚本 + * @return 解析后的内容 + */ + public static List> parseContentWithJsonPath(String content, String jsonPath) { + if (content == null || jsonPath == null || "".equals(content) || "".equals(jsonPath)) { + return Collections.emptyList(); + } + return PARSER.parse(content).read(jsonPath); + } + +} diff --git a/collector/src/main/java/com/usthe/collector/util/SpringContextHolder.java b/collector/src/main/java/com/usthe/collector/util/SpringContextHolder.java new file mode 100644 index 0000000..ef3a85f --- /dev/null +++ b/collector/src/main/java/com/usthe/collector/util/SpringContextHolder.java @@ -0,0 +1,48 @@ +package com.usthe.collector.util; + +import org.springframework.beans.BeansException; +import org.springframework.context.ApplicationContext; +import org.springframework.context.ApplicationContextAware; +import org.springframework.stereotype.Component; + +/** + * Spring的ApplicationContext的持有者,可以用静态方法的方式获取spring容器中的bean + * @author tomsun28 + * @date 21:07 2018/4/18 + */ +@Component +public class SpringContextHolder implements ApplicationContextAware { + + private static ApplicationContext applicationContext; + + @Override + public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { + set(applicationContext); + } + + private static void set(ApplicationContext applicationContext) { + SpringContextHolder.applicationContext = applicationContext; + } + + public static ApplicationContext getApplicationContext() { + assertApplicationContext(); + return applicationContext; + } + + @SuppressWarnings("unchecked") + public static T getBean(String beanName) { + assertApplicationContext(); + return (T) applicationContext.getBean(beanName); + } + + public static T getBean(Class tClass) { + assertApplicationContext(); + return (T) applicationContext.getBean(tClass); + } + + private static void assertApplicationContext() { + if (null == SpringContextHolder.applicationContext) { + throw new RuntimeException("applicationContext为空,请检查是否注入springContextHolder"); + } + } +} diff --git a/collector/src/main/resources/META-INF/spring.factories b/collector/src/main/resources/META-INF/spring.factories new file mode 100644 index 0000000..6f385a1 --- /dev/null +++ b/collector/src/main/resources/META-INF/spring.factories @@ -0,0 +1,9 @@ +org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ +com.usthe.collector.dispatch.timer.TimerDispatcher,\ +com.usthe.collector.dispatch.CommonDispatcher,\ +com.usthe.collector.dispatch.DispatchProperties,\ +com.usthe.collector.dispatch.MetricsCollectorQueue,\ +com.usthe.collector.dispatch.WorkerPool,\ +com.usthe.collector.dispatch.entrance.internal.CollectJobService,\ +com.usthe.collector.dispatch.export.MetricsDataExporter,\ +com.usthe.collector.util.SpringContextHolder diff --git a/collector/src/test/java/com/usthe/collector/collect/telnet/TelnetCollectImplTest.java b/collector/src/test/java/com/usthe/collector/collect/telnet/TelnetCollectImplTest.java new file mode 100644 index 0000000..4c4e92a --- /dev/null +++ b/collector/src/test/java/com/usthe/collector/collect/telnet/TelnetCollectImplTest.java @@ -0,0 +1,37 @@ +package com.usthe.collector.collect.telnet; + +import org.apache.commons.net.telnet.TelnetClient; +import org.junit.jupiter.api.Test; + +import java.io.IOException; + +import static org.junit.jupiter.api.Assertions.*; + +/** + * @author tom + * @date 2021/12/4 19:39 + */ +class TelnetCollectImplTest { + + @Test + void telnet() { + TelnetClient telnetClient = null; + try { + telnetClient = new TelnetClient("vt200"); + telnetClient.setConnectTimeout(5000); + TelnetClient finalTelnetClient = telnetClient; + assertDoesNotThrow(() -> finalTelnetClient.connect("baidu.com",80)); + telnetClient.disconnect(); + } catch (IOException e) { + e.printStackTrace(); + } finally { + if (telnetClient != null) { + try { + telnetClient.disconnect(); + } catch (Exception e) { + e.printStackTrace(); + } + } + } + } +} \ No newline at end of file diff --git a/common/pom.xml b/common/pom.xml index 65949be..5679795 100644 --- a/common/pom.xml +++ b/common/pom.xml @@ -5,7 +5,7 @@ monitor com.usthe.tancloud - 1.0-SNAPSHOT + 1.0 4.0.0 diff --git a/manager/pom.xml b/manager/pom.xml index d6da03c..ecca547 100644 --- a/manager/pom.xml +++ b/manager/pom.xml @@ -5,7 +5,7 @@ monitor com.usthe.tancloud - 1.0-SNAPSHOT + 1.0 4.0.0 manager @@ -22,25 +22,25 @@ com.usthe.tancloud common - 1.0-SNAPSHOT - - - - com.usthe.tancloud - scheduler - 1.0-SNAPSHOT + 1.0 com.usthe.tancloud warehouse - 1.0-SNAPSHOT + 1.0 com.usthe.tancloud alerter - 1.0-SNAPSHOT + 1.0 + + + + com.usthe.tancloud + collector + 1.0 @@ -123,6 +123,7 @@ sureness.yml banner.txt db/** + define/** @@ -167,7 +168,7 @@ - ../assembly/server/assembly.xml + ../script/assembly/server/assembly.xml diff --git a/manager/src/main/java/com/usthe/manager/service/impl/MonitorServiceImpl.java b/manager/src/main/java/com/usthe/manager/service/impl/MonitorServiceImpl.java index 024d003..2a6e9ca 100644 --- a/manager/src/main/java/com/usthe/manager/service/impl/MonitorServiceImpl.java +++ b/manager/src/main/java/com/usthe/manager/service/impl/MonitorServiceImpl.java @@ -1,5 +1,6 @@ package com.usthe.manager.service.impl; +import com.usthe.collector.dispatch.entrance.internal.CollectJobService; import com.usthe.common.entity.job.Configmap; import com.usthe.common.entity.job.Job; import com.usthe.common.entity.job.Metrics; @@ -20,7 +21,6 @@ import com.usthe.manager.service.AppService; import com.usthe.manager.service.MonitorService; import com.usthe.manager.support.exception.MonitorDatabaseException; import com.usthe.manager.support.exception.MonitorDetectException; -import com.usthe.scheduler.JobScheduling; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.domain.Page; @@ -52,7 +52,7 @@ public class MonitorServiceImpl implements MonitorService { private AppService appService; @Autowired - private JobScheduling jobScheduling; + private CollectJobService collectJobService; @Autowired private MonitorDao monitorDao; @@ -74,7 +74,7 @@ public class MonitorServiceImpl implements MonitorService { List configmaps = params.stream().map(param -> new Configmap(param.getField(), param.getValue(), param.getType())).collect(Collectors.toList()); appDefine.setConfigmap(configmaps); - List collectRep = jobScheduling.addSyncCollectJob(appDefine); + List collectRep = collectJobService.collectSyncJobData(appDefine); // 判断探测结果 失败则抛出探测异常 if (collectRep == null || collectRep.isEmpty()) { throw new MonitorDetectException("No collector response"); @@ -101,7 +101,7 @@ public class MonitorServiceImpl implements MonitorService { }).collect(Collectors.toList()); appDefine.setConfigmap(configmaps); // 下发采集任务得到jobId - long jobId = jobScheduling.addAsyncCollectJob(appDefine); + long jobId = collectJobService.addAsyncCollectJob(appDefine); // 下发成功后刷库 try { monitor.setId(monitorId); @@ -112,7 +112,7 @@ public class MonitorServiceImpl implements MonitorService { } catch (Exception e) { log.error(e.getMessage(), e); // 刷库异常取消之前的下发任务 - jobScheduling.cancelAsyncCollectJob(jobId); + collectJobService.cancelAsyncCollectJob(jobId); throw new MonitorDatabaseException(e.getMessage()); } } @@ -225,7 +225,7 @@ public class MonitorServiceImpl implements MonitorService { new Configmap(param.getField(), param.getValue(), param.getType())).collect(Collectors.toList()); appDefine.setConfigmap(configmaps); // 更新采集任务 - jobScheduling.updateAsyncCollectJob(appDefine); + collectJobService.updateAsyncCollectJob(appDefine); // 下发更新成功后刷库 try { monitor.setJobId(preMonitor.getJobId()); @@ -246,7 +246,7 @@ public class MonitorServiceImpl implements MonitorService { Monitor monitor = monitorOptional.get(); monitorDao.deleteById(id); paramDao.deleteParamsByMonitorId(id); - jobScheduling.cancelAsyncCollectJob(monitor.getJobId()); + collectJobService.cancelAsyncCollectJob(monitor.getJobId()); } } @@ -258,7 +258,7 @@ public class MonitorServiceImpl implements MonitorService { monitorDao.deleteAll(monitors); paramDao.deleteParamsByMonitorIdIn(ids); for (Monitor monitor : monitors) { - jobScheduling.cancelAsyncCollectJob(monitor.getJobId()); + collectJobService.cancelAsyncCollectJob(monitor.getJobId()); } } } @@ -299,7 +299,7 @@ public class MonitorServiceImpl implements MonitorService { if (!managedMonitors.isEmpty()) { monitorDao.saveAll(managedMonitors); for (Monitor monitor : managedMonitors) { - jobScheduling.cancelAsyncCollectJob(monitor.getJobId()); + collectJobService.cancelAsyncCollectJob(monitor.getJobId()); } } } @@ -326,7 +326,7 @@ public class MonitorServiceImpl implements MonitorService { new Configmap(param.getField(), param.getValue(), param.getType())).collect(Collectors.toList()); appDefine.setConfigmap(configmaps); // 下发采集任务 - jobScheduling.addAsyncCollectJob(appDefine); + collectJobService.addAsyncCollectJob(appDefine); } } } diff --git a/manager/src/main/java/com/usthe/manager/support/GlobalExceptionHandler.java b/manager/src/main/java/com/usthe/manager/support/GlobalExceptionHandler.java index 271234c..cde3378 100644 --- a/manager/src/main/java/com/usthe/manager/support/GlobalExceptionHandler.java +++ b/manager/src/main/java/com/usthe/manager/support/GlobalExceptionHandler.java @@ -4,7 +4,6 @@ package com.usthe.manager.support; import com.usthe.common.entity.dto.Message; import com.usthe.manager.support.exception.MonitorDatabaseException; import com.usthe.manager.support.exception.MonitorDetectException; -import com.usthe.scheduler.ScheduleException; import lombok.extern.slf4j.Slf4j; import org.springframework.dao.DataAccessException; import org.springframework.http.HttpStatus; @@ -133,23 +132,6 @@ public class GlobalExceptionHandler { return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(message); } - /** - * 处理分发调度器异常问题 - * @param exception 调度器异常问题 - * @return response - */ - @ExceptionHandler(ScheduleException.class) - @ResponseBody - ResponseEntity> handleScheduleException(ScheduleException exception) { - String errorMessage = "scheduler warning"; - if (exception != null) { - errorMessage = exception.getMessage(); - } - log.warn("[scheduler warning]-{}", errorMessage); - Message message = Message.builder().msg(errorMessage).code(MONITOR_CONFLICT_CODE).build(); - return ResponseEntity.status(HttpStatus.CONFLICT).body(message); - } - /** * handler the exception thrown for datastore error * @param exception datastore exception diff --git a/pom.xml b/pom.xml index ab68842..c4da00a 100644 --- a/pom.xml +++ b/pom.xml @@ -7,9 +7,8 @@ com.usthe.tancloud monitor pom - 1.0-SNAPSHOT + 1.0 - scheduler manager alerter common diff --git a/script/assembly/package-build.sh b/script/assembly/package-build.sh new file mode 100644 index 0000000..4bd63c6 --- /dev/null +++ b/script/assembly/package-build.sh @@ -0,0 +1,9 @@ +#!/bin/bash + +cd ../../web-app + +ng build --prod --base-href /console/ + +cd .. + +mvn clean package diff --git a/assembly/server/assembly.xml b/script/assembly/server/assembly.xml similarity index 91% rename from assembly/server/assembly.xml rename to script/assembly/server/assembly.xml index 038731d..91d5399 100644 --- a/assembly/server/assembly.xml +++ b/script/assembly/server/assembly.xml @@ -22,8 +22,11 @@ http://maven.apache.org/ASSEMBLY/2.0.0 "> - ../assembly/server/bin + ../script/assembly/server/bin + + true bin + 0755 diff --git a/assembly/collector/bin/shutdown.sh b/script/assembly/server/bin/shutdown.sh similarity index 100% rename from assembly/collector/bin/shutdown.sh rename to script/assembly/server/bin/shutdown.sh diff --git a/assembly/server/bin/startup.sh b/script/assembly/server/bin/startup.sh similarity index 100% rename from assembly/server/bin/startup.sh rename to script/assembly/server/bin/startup.sh diff --git a/warehouse/pom.xml b/warehouse/pom.xml index eb4f04d..25bc65b 100644 --- a/warehouse/pom.xml +++ b/warehouse/pom.xml @@ -5,7 +5,7 @@ monitor com.usthe.tancloud - 1.0-SNAPSHOT + 1.0 4.0.0 @@ -16,7 +16,14 @@ com.usthe.tancloud common - 1.0-SNAPSHOT + 1.0 + + + + com.usthe.tancloud + collector + 1.0 + provided diff --git a/warehouse/src/main/java/com/usthe/warehouse/entrance/KafkaDataConsume.java b/warehouse/src/main/java/com/usthe/warehouse/entrance/KafkaDataConsume.java deleted file mode 100644 index ad99bed..0000000 --- a/warehouse/src/main/java/com/usthe/warehouse/entrance/KafkaDataConsume.java +++ /dev/null @@ -1,81 +0,0 @@ -package com.usthe.warehouse.entrance; - -import com.usthe.common.entity.message.CollectRep; -import com.usthe.warehouse.MetricsDataQueue; -import com.usthe.warehouse.WarehouseProperties; -import com.usthe.warehouse.WarehouseWorkerPool; -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.common.serialization.LongDeserializer; -import org.springframework.beans.factory.DisposableBean; -import org.springframework.boot.autoconfigure.AutoConfigureAfter; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; -import org.springframework.context.annotation.Configuration; - -import java.time.Duration; -import java.util.Collections; -import java.util.Properties; - -/** - * 从Kafka消费指标组采集数据处理 - * @author tom - * @date 2021/11/24 18:03 - */ -@Configuration -@AutoConfigureAfter(value = {WarehouseProperties.class}) -@ConditionalOnProperty(prefix = "warehouse.entrance.kafka", - name = "enabled", havingValue = "true", matchIfMissing = true) -@Slf4j -public class KafkaDataConsume implements DisposableBean { - - private KafkaConsumer consumer; - private WarehouseWorkerPool workerPool; - private MetricsDataQueue dataQueue; - public KafkaDataConsume(WarehouseProperties properties, WarehouseWorkerPool workerPool, - MetricsDataQueue dataQueue) { - this.workerPool = workerPool; - this.dataQueue = dataQueue; - initConsumer(properties); - startConsumeData(); - } - - private void startConsumeData() { - Runnable runnable = () -> { - Thread.currentThread().setName("warehouse-kafka-data-consumer"); - while (!Thread.currentThread().isInterrupted()) { - ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); - records.forEach(record -> { - dataQueue.addMetricsDataToInflux(record.value()); - dataQueue.addMetricsDataToRedis(record.value()); - }); - } - }; - workerPool.executeJob(runnable); - } - - private void initConsumer(WarehouseProperties properties) { - if (properties == null || properties.getEntrance() == null || properties.getEntrance().getKafka() == null) { - log.error("init error, please config Warehouse kafka props in application.yml"); - throw new IllegalArgumentException("please config Warehouse kafka props"); - } - WarehouseProperties.EntranceProperties.KafkaProperties kafkaProp = properties.getEntrance().getKafka(); - Properties consumerProp = new Properties(); - consumerProp.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProp.getServers()); - consumerProp.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaProp.getGroupId()); - consumerProp.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class); - consumerProp.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaMetricsDataDeserializer.class); - consumerProp.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); - consumerProp.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000); - consumer = new KafkaConsumer<>(consumerProp); - consumer.subscribe(Collections.singleton(kafkaProp.getTopic())); - } - - @Override - public void destroy() throws Exception { - if (consumer != null) { - consumer.close(); - } - } -} diff --git a/warehouse/src/main/java/com/usthe/warehouse/entrance/KafkaMetricsDataDeserializer.java b/warehouse/src/main/java/com/usthe/warehouse/entrance/KafkaMetricsDataDeserializer.java deleted file mode 100644 index 544265a..0000000 --- a/warehouse/src/main/java/com/usthe/warehouse/entrance/KafkaMetricsDataDeserializer.java +++ /dev/null @@ -1,24 +0,0 @@ -package com.usthe.warehouse.entrance; - -import com.usthe.common.entity.message.CollectRep; -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.common.serialization.Deserializer; - -/** - * MetricsData的反序列化 - * @author tom - * @date 2021/11/24 17:29 - */ -@Slf4j -public class KafkaMetricsDataDeserializer implements Deserializer { - - @Override - public CollectRep.MetricsData deserialize(String topicName, byte[] bytes) { - try { - return CollectRep.MetricsData.parseFrom(bytes); - } catch (Exception e) { - log.error(e.getMessage(), e); - } - return null; - } -} diff --git a/warehouse/src/main/java/com/usthe/warehouse/store/InfluxdbDataStorage.java b/warehouse/src/main/java/com/usthe/warehouse/store/InfluxdbDataStorage.java deleted file mode 100644 index 22061a3..0000000 --- a/warehouse/src/main/java/com/usthe/warehouse/store/InfluxdbDataStorage.java +++ /dev/null @@ -1,134 +0,0 @@ -package com.usthe.warehouse.store; - -import com.influxdb.client.InfluxDBClient; -import com.influxdb.client.InfluxDBClientFactory; -import com.influxdb.client.WriteApi; -import com.influxdb.client.WriteOptions; -import com.influxdb.client.domain.WritePrecision; -import com.influxdb.client.write.Point; -import com.usthe.common.entity.message.CollectRep; -import com.usthe.common.util.CommonConstants; -import com.usthe.warehouse.MetricsDataQueue; -import com.usthe.warehouse.WarehouseProperties; -import com.usthe.warehouse.WarehouseWorkerPool; -import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.DisposableBean; -import org.springframework.boot.autoconfigure.AutoConfigureAfter; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; -import org.springframework.context.annotation.Configuration; - -import java.time.Instant; -import java.util.List; - -/** - * influxdb存储采集数据 - * @author tom - * @date 2021/11/24 18:23 - */ -@Configuration -@AutoConfigureAfter(value = {WarehouseProperties.class}) -@ConditionalOnProperty(prefix = "warehouse.store.influxdb", - name = "enabled", havingValue = "true", matchIfMissing = true) -@Slf4j -public class InfluxdbDataStorage implements DisposableBean { - - private InfluxDBClient influxClient; - private WriteApi writeApi; - private WarehouseWorkerPool workerPool; - private MetricsDataQueue dataQueue; - - public InfluxdbDataStorage (WarehouseProperties properties, WarehouseWorkerPool workerPool, - MetricsDataQueue dataQueue) { - this.workerPool = workerPool; - this.dataQueue = dataQueue; - initInfluxDbClient(properties); - startStorageData(); - } - - private void startStorageData() { - Runnable runnable = () -> { - Thread.currentThread().setName("warehouse-influxdb-data-storage"); - while (!Thread.currentThread().isInterrupted()) { - try { - CollectRep.MetricsData metricsData = dataQueue.pollInfluxMetricsData(); - if (metricsData != null) { - saveData(metricsData); - } - } catch (InterruptedException e) { - log.error(e.getMessage()); - } - } - }; - workerPool.executeJob(runnable); - workerPool.executeJob(runnable); - } - - private void initInfluxDbClient(WarehouseProperties properties) { - if (properties == null || properties.getStore() == null || properties.getStore().getInfluxdb() == null) { - log.error("init error, please config Warehouse influxdb props in application.yml"); - throw new IllegalArgumentException("please config Warehouse influxdb props"); - } - WarehouseProperties.StoreProperties.InfluxdbProperties influxdbProp = properties.getStore().getInfluxdb(); - influxClient = InfluxDBClientFactory.create(influxdbProp.getServers(), influxdbProp.getToken().toCharArray(), - influxdbProp.getOrg(), influxdbProp.getBucket()); - WriteOptions writeOptions = WriteOptions.builder() - .batchSize(1000) - .bufferLimit(1000) - .jitterInterval(1000) - .retryInterval(5000) - .build(); - writeApi = influxClient.makeWriteApi(writeOptions); - } - - - public void saveData(CollectRep.MetricsData metricsData) { - String measurement = metricsData.getApp() + "_" + metricsData.getMetrics(); - String monitorId = String.valueOf(metricsData.getId()); - Instant collectTime = Instant.ofEpochMilli(metricsData.getTime()); - - List fields = metricsData.getFieldsList(); - for (CollectRep.ValueRow valueRow : metricsData.getValuesList()) { - Point point = Point.measurement(measurement) - .addTag("id", monitorId) - .addTag("instance", valueRow.getInstance()) - .time(collectTime, WritePrecision.MS); - for (int index = 0; index < fields.size(); index++) { - CollectRep.Field field = fields.get(index); - String value = valueRow.getColumns(index); - if (field.getType() == CommonConstants.TYPE_NUMBER) { - // number data - if (CommonConstants.NULL_VALUE.equals(value)) { - point.addField(field.getName(), (Number) null); - } else { - try { - double number = Double.parseDouble(value); - point.addField(field.getName(), number); - } catch (Exception e) { - log.warn(e.getMessage()); - point.addField(field.getName(), (Number) null); - } - } - } else { - // string - if (CommonConstants.NULL_VALUE.equals(value)) { - point.addField(field.getName(), (String) null); - } else { - point.addField(field.getName(), value); - } - } - } - writeApi.writePoint(point); - } - } - - - @Override - public void destroy() throws Exception { - if (writeApi != null) { - writeApi.close(); - } - if (influxClient != null) { - influxClient.close(); - } - } -} diff --git a/warehouse/src/main/java/com/usthe/warehouse/store/RedisDataStorage.java b/warehouse/src/main/java/com/usthe/warehouse/store/RedisDataStorage.java index 0287697..10a3d4f 100644 --- a/warehouse/src/main/java/com/usthe/warehouse/store/RedisDataStorage.java +++ b/warehouse/src/main/java/com/usthe/warehouse/store/RedisDataStorage.java @@ -1,7 +1,7 @@ package com.usthe.warehouse.store; +import com.usthe.collector.dispatch.export.MetricsDataExporter; import com.usthe.common.entity.message.CollectRep; -import com.usthe.warehouse.MetricsDataQueue; import com.usthe.warehouse.WarehouseProperties; import com.usthe.warehouse.WarehouseWorkerPool; import io.lettuce.core.RedisClient; @@ -34,13 +34,12 @@ public class RedisDataStorage implements DisposableBean { private RedisClient redisClient; private StatefulRedisConnection connection; private WarehouseWorkerPool workerPool; - private MetricsDataQueue dataQueue; + private MetricsDataExporter dataExporter; public RedisDataStorage (WarehouseProperties properties, WarehouseWorkerPool workerPool, - MetricsDataQueue dataQueue) { + MetricsDataExporter dataExporter) { this.workerPool = workerPool; - this.dataQueue = dataQueue; - + this.dataExporter = dataExporter; initRedisClient(properties); startStorageData(); } @@ -55,7 +54,7 @@ public class RedisDataStorage implements DisposableBean { Thread.currentThread().setName("warehouse-redis-data-storage"); while (!Thread.currentThread().isInterrupted()) { try { - CollectRep.MetricsData metricsData = dataQueue.pollRedisMetricsData(); + CollectRep.MetricsData metricsData = dataExporter.pollWarehouseRedisMetricsData(); if (metricsData != null) { saveData(metricsData); } diff --git a/warehouse/src/main/resources/META-INF/spring.factories b/warehouse/src/main/resources/META-INF/spring.factories index fc593ec..783832d 100644 --- a/warehouse/src/main/resources/META-INF/spring.factories +++ b/warehouse/src/main/resources/META-INF/spring.factories @@ -2,7 +2,5 @@ org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ com.usthe.warehouse.WarehouseProperties,\ com.usthe.warehouse.MetricsDataQueue,\ com.usthe.warehouse.WarehouseWorkerPool,\ -com.usthe.warehouse.entrance.KafkaDataConsume,\ -com.usthe.warehouse.store.InfluxdbDataStorage,\ com.usthe.warehouse.store.RedisDataStorage,\ com.usthe.warehouse.controller.MetricsDataController \ No newline at end of file