diff --git a/collector/server/src/main/java/com/usthe/collector/collect/http/HttpCollectImpl.java b/collector/server/src/main/java/com/usthe/collector/collect/http/HttpCollectImpl.java index df040e7..d246c08 100644 --- a/collector/server/src/main/java/com/usthe/collector/collect/http/HttpCollectImpl.java +++ b/collector/server/src/main/java/com/usthe/collector/collect/http/HttpCollectImpl.java @@ -17,6 +17,7 @@ import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpUriRequest; import org.apache.http.client.methods.RequestBuilder; import org.apache.http.client.protocol.HttpClientContext; +import org.apache.http.entity.StringEntity; import org.apache.http.impl.client.BasicCredentialsProvider; import org.apache.http.protocol.HttpContext; import org.apache.http.util.EntityUtils; @@ -185,15 +186,16 @@ public class HttpCollectImpl extends AbstractCollect { private HttpUriRequest createHttpRequest(HttpProtocol httpProtocol) { RequestBuilder requestBuilder; // method - if (HttpMethod.GET.matches(httpProtocol.getMethod())) { + String httpMethod = httpProtocol.getMethod().toUpperCase(); + if (HttpMethod.GET.matches(httpMethod)) { requestBuilder = RequestBuilder.get(); - } else if (HttpMethod.POST.matches(httpProtocol.getMethod())) { + } else if (HttpMethod.POST.matches(httpMethod)) { requestBuilder = RequestBuilder.post(); - } else if (HttpMethod.PUT.matches(httpProtocol.getMethod())) { + } else if (HttpMethod.PUT.matches(httpMethod)) { requestBuilder = RequestBuilder.put(); - } else if (HttpMethod.DELETE.matches(httpProtocol.getMethod())) { + } else if (HttpMethod.DELETE.matches(httpMethod)) { requestBuilder = RequestBuilder.delete(); - } else if (HttpMethod.PATCH.matches(httpProtocol.getMethod())) { + } else if (HttpMethod.PATCH.matches(httpMethod)) { requestBuilder = RequestBuilder.patch(); } else { // not support the method diff --git a/collector/server/src/main/java/com/usthe/collector/dispatch/CommonDispatcher.java b/collector/server/src/main/java/com/usthe/collector/dispatch/CommonDispatcher.java index a1f7285..4473047 100644 --- a/collector/server/src/main/java/com/usthe/collector/dispatch/CommonDispatcher.java +++ b/collector/server/src/main/java/com/usthe/collector/dispatch/CommonDispatcher.java @@ -68,7 +68,9 @@ public class CommonDispatcher implements MetricsTaskDispatch, CollectDataDispatc MetricsCollect metricsCollect = null; try { metricsCollect = jobRequestQueue.getJob(); - workerPool.executeJob(metricsCollect); + if (metricsCollect != null) { + workerPool.executeJob(metricsCollect); + } } catch (RejectedExecutionException rejected) { log.info("[Dispatcher]-the worker pool is full, reject this metrics task, " + "sleep and put in queue again."); @@ -119,7 +121,7 @@ public class CommonDispatcher implements MetricsTaskDispatch, CollectDataDispatc // 将单个应用的采集任务根据其下的指标组拆分为对应的指标组采集任务 AbstractCollect // 将每个指标组放入线程池进行调度 Job job = timerJob.getJob(); - job.constructMetrics(); + job.constructPriorMetrics(); Set metricsSet = job.getNextCollectMetrics(null, true); metricsSet.forEach(metrics -> { MetricsCollect metricsCollect = new MetricsCollect(metrics, timerJob, this); @@ -143,6 +145,8 @@ public class CommonDispatcher implements MetricsTaskDispatch, CollectDataDispatc long spendTime = System.currentTimeMillis() - job.getTimestamp(); long interval = job.getInterval() - spendTime / 1000; interval = interval <= 0 ? 0 : interval; + // 重置构造执行指标组视图 + job.constructPriorMetrics(); timerDispatch.cyclicJob(timerJob, interval, TimeUnit.SECONDS); } else if (!metricsSet.isEmpty()) { // 当前级别指标组执行完成,开始执行下一级别的指标组 diff --git a/collector/server/src/main/java/com/usthe/collector/dispatch/MetricsCollect.java b/collector/server/src/main/java/com/usthe/collector/dispatch/MetricsCollect.java index 9343c83..f395a3a 100644 --- a/collector/server/src/main/java/com/usthe/collector/dispatch/MetricsCollect.java +++ b/collector/server/src/main/java/com/usthe/collector/dispatch/MetricsCollect.java @@ -155,7 +155,8 @@ public class MetricsCollect implements Runnable, Comparable { } private void setNewThreadName(WheelTimerJob timerJob, Metrics metrics) { - String currentName = timerJob.getJob().getAppId() + timerJob.getJob().getApp() + metrics.getName() + timerJob.getJob().getId(); + String currentName = timerJob.getJob().getAppId() + "-" + timerJob.getJob().getApp() + + "-" + metrics.getName() + "-" + timerJob.getJob().getId(); Thread.currentThread().setName(currentName); } diff --git a/collector/server/src/main/java/com/usthe/collector/dispatch/export/KafkaDataExporter.java b/collector/server/src/main/java/com/usthe/collector/dispatch/export/KafkaDataExporter.java index 1bc06b5..e9f7da0 100644 --- a/collector/server/src/main/java/com/usthe/collector/dispatch/export/KafkaDataExporter.java +++ b/collector/server/src/main/java/com/usthe/collector/dispatch/export/KafkaDataExporter.java @@ -34,7 +34,7 @@ public class KafkaDataExporter { properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getServers()); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaMetricsDataSerializer.class); - kafkaProducer = new KafkaProducer<>(properties); +// kafkaProducer = new KafkaProducer<>(properties); } catch (Exception e) { log.error(e.getMessage(), e); } diff --git a/collector/server/src/main/resources/application.yml b/collector/server/src/main/resources/application.yml index 047a69a..d70b3ba 100644 --- a/collector/server/src/main/resources/application.yml +++ b/collector/server/src/main/resources/application.yml @@ -1,5 +1,5 @@ server: - port: 8080 + port: 8081 spring: application: name: ${HOSTNAME:@collecor@}${PID} diff --git a/collector/server/src/main/resources/logback-spring.xml b/collector/server/src/main/resources/logback-spring.xml index b68b93d..90e6569 100644 --- a/collector/server/src/main/resources/logback-spring.xml +++ b/collector/server/src/main/resources/logback-spring.xml @@ -69,7 +69,7 @@ - + diff --git a/common/src/main/java/com/usthe/common/entity/job/Job.java b/common/src/main/java/com/usthe/common/entity/job/Job.java index f87413b..666e99b 100644 --- a/common/src/main/java/com/usthe/common/entity/job/Job.java +++ b/common/src/main/java/com/usthe/common/entity/job/Job.java @@ -80,18 +80,10 @@ public class Job { */ private transient List> priorMetrics; - /** - * collector使用 - 构造初始化标志 - */ - private transient boolean isConstruct = false; - /** * collector使用 - 构造初始化指标组 */ - public synchronized void constructMetrics() { - if (isConstruct) { - return; - } + public synchronized void constructPriorMetrics() { Map> map = metrics.stream() .peek(metric -> { // 判断是否配置aliasFields 没有则配置默认 @@ -134,26 +126,30 @@ public class Job { * 返回有数据集合表示:获取到下一组优先级的指标组任务 */ public synchronized Set getNextCollectMetrics(Metrics metrics, boolean first) { - if (!isConstruct || priorMetrics == null || priorMetrics.isEmpty()) { + if (priorMetrics == null || priorMetrics.isEmpty()) { return null; } Set metricsSet = priorMetrics.get(0); if (first) { - log.error("metrics must has one [availability] metrics at least."); + if (metricsSet.isEmpty()) { + log.error("metrics must has one [availability] metrics at least."); + + } return metricsSet; } if (metrics == null) { log.error("metrics can not null when not first get"); + return null; } - if (metrics != null && !metricsSet.remove(metrics)) { + if (!metricsSet.remove(metrics)) { log.error("Job {} appId {} app {} metrics {} remove empty error in priorMetrics.", id, appId, app, metrics.getName()); } if (metricsSet.isEmpty()) { - if (priorMetrics.size() == 1) { + priorMetrics.remove(0); + if (priorMetrics.size() == 0) { return null; } - priorMetrics.remove(0); return priorMetrics.get(0); } else { return Collections.emptySet(); diff --git a/manager/pom.xml b/manager/pom.xml index 0bf1bcb..75689fe 100644 --- a/manager/pom.xml +++ b/manager/pom.xml @@ -15,4 +15,34 @@ + + + + com.usthe.tancloud + common + 1.0-SNAPSHOT + + + + com.usthe.tancloud + scheduler + 1.0-SNAPSHOT + + + + org.springframework.boot + spring-boot-starter-web + + + org.springframework.boot + spring-boot-configuration-processor + true + + + + io.springfox + springfox-boot-starter + + + \ No newline at end of file diff --git a/manager/src/main/java/com/usthe/manager/Manager.java b/manager/src/main/java/com/usthe/manager/Manager.java new file mode 100644 index 0000000..2b95f0e --- /dev/null +++ b/manager/src/main/java/com/usthe/manager/Manager.java @@ -0,0 +1,17 @@ +package com.usthe.manager; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +/** + * @author tomsun28 + * @date 2021/11/11 16:45 + */ + +@SpringBootApplication +public class Manager { + + public static void main(String[] args) { + SpringApplication.run(Manager.class, args); + } +} diff --git a/manager/src/main/java/com/usthe/manager/config/SwaggerConfig.java b/manager/src/main/java/com/usthe/manager/config/SwaggerConfig.java new file mode 100644 index 0000000..4d2b35b --- /dev/null +++ b/manager/src/main/java/com/usthe/manager/config/SwaggerConfig.java @@ -0,0 +1,48 @@ +package com.usthe.manager.config; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import springfox.documentation.builders.PathSelectors; +import springfox.documentation.builders.RequestHandlerSelectors; +import springfox.documentation.oas.annotations.EnableOpenApi; +import springfox.documentation.service.ApiInfo; +import springfox.documentation.service.Contact; +import springfox.documentation.spi.DocumentationType; +import springfox.documentation.spring.web.plugins.Docket; + +import java.util.Collections; + +/** + * swagger config + * url: /swagger-ui/ + * @author tomsun28 + * @date 2021/11/11 17:01 + */ +@Configuration +@EnableOpenApi +public class SwaggerConfig { + + @Bean + public Docket docket(){ + return new Docket(DocumentationType.OAS_30) + .apiInfo(apiInfo()) + .enable(true) + .groupName("usthe.com") + .select() + .apis(RequestHandlerSelectors.any()) + .paths(PathSelectors.any()) + .build(); + } + + private ApiInfo apiInfo(){ + return new ApiInfo( + "usthe api", + "monitor project", + "v1.0", + "usthe.com", + new Contact("tom", "usthe.com", "tomsun28@outlook.com"), + "Apache 2.0", + "http://www.apache.org/licenses/LICENSE-2.0", + Collections.emptyList()); + } +} diff --git a/manager/src/main/resources/logback-spring.xml b/manager/src/main/resources/logback-spring.xml new file mode 100644 index 0000000..90e6569 --- /dev/null +++ b/manager/src/main/resources/logback-spring.xml @@ -0,0 +1,79 @@ + + + + + + + + + 1-%d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level %logger - %msg%n + UTF-8 + + + + + + + + logs/${application_name}-%d{yyyy-MM-dd}.%i.log + + + 200MB + + + + true + + + ===%d{yyyy-MM-dd HH:mm:ss.SSS} %-5level %logger Line:%-3L - %msg%n + utf-8 + + + + + + logs/${application_name}-%d{yyyy-MM-dd}-error.%i.log + + 200MB + + + + true + + + ===%d{yyyy-MM-dd HH:mm:ss.SSS} %-5level %logger Line:%-3L - %msg%n + utf-8 + + + + ERROR + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/pom.xml b/pom.xml index 99dc15d..41fec05 100644 --- a/pom.xml +++ b/pom.xml @@ -24,6 +24,7 @@ 1.7.21 2.3.0 1.18.20 + 3.0.0 5.7.0 4.0.2 @@ -44,6 +45,11 @@ jaxb-api ${xml.bind.version} + + io.springfox + springfox-boot-starter + ${swagger.version} +