迁移collector,合并到整个工程

This commit is contained in:
tomsun28
2021-11-10 20:07:27 +08:00
commit 7286263836
63 changed files with 7835 additions and 0 deletions

118
collector/server/pom.xml Normal file
View File

@@ -0,0 +1,118 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>collector</artifactId>
<groupId>com.usthe.tancloud</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>server</artifactId>
<dependencies>
<!-- spring -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<!-- isolation -->
<dependency>
<groupId>com.alipay.sofa</groupId>
<artifactId>sofa-ark-springboot-starter</artifactId>
<version>1.1.6</version>
</dependency>
<!-- common -->
<dependency>
<groupId>com.usthe.tancloud</groupId>
<artifactId>common</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<!-- etcd -->
<dependency>
<groupId>io.etcd</groupId>
<artifactId>jetcd-core</artifactId>
<version>0.5.10</version>
</dependency>
<!-- kafka -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.0.0</version>
</dependency>
<!-- http -->
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.13</version>
</dependency>
<!-- lru hashmap -->
<dependency>
<groupId>com.googlecode.concurrentlinkedhashmap</groupId>
<artifactId>concurrentlinkedhashmap-lru</artifactId>
<version>1.4.2</version>
</dependency>
<!-- 工具依赖 -->
<dependency>
<groupId>com.usthe.tancloud</groupId>
<artifactId>common</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>31.0.1-jre</version>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.8</version>
</dependency>
<dependency>
<groupId>com.googlecode.aviator</groupId>
<artifactId>aviator</artifactId>
<version>5.2.7</version>
</dependency>
<!-- plugins -->
<dependency>
<groupId>com.usthe.tancloud</groupId>
<artifactId>sample-plugin</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>com.alipay.sofa</groupId>
<artifactId>sofa-ark-maven-plugin</artifactId>
<version>1.1.6</version>
<executions>
<execution>
<id>default-cli</id>
<!--goal executed to generate executable-ark-jar -->
<goals>
<goal>repackage</goal>
</goals>
<configuration>
<!--specify destination where executable-ark-jar will be saved, default saved to ${project.build.directory}-->
<outputDirectory>./target</outputDirectory>
<!--default none-->
<arkClassifier>executable</arkClassifier>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

View File

@@ -0,0 +1,38 @@
package com.usthe.collector;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
/**
* collector start
* @author tomsun28
* @date 2021/10/7 18:02
*/
@SpringBootApplication(exclude = {KafkaAutoConfiguration.class})
@Slf4j
public class Collector {
public static void main(String[] args) {
SpringApplication.run(Collector.class, args);
// DatagramSocket s = new DatagramSocket();
//
// /** 2、提供数据封装打包 ---DatagramPacket(byte[] buf, int length, InetAddress address, int port) */
//
// byte[] bs = "正在使用UDP发送--我是数据! ".getBytes();
// DatagramPacket dp = new DatagramPacket(bs, bs.length, InetAddress.getByName("192.168.1.189"), 8070);
//
// /** 3、使用send发送 */
// try {
// s.send(dp);
// s.receive(dp)
// } catch (IOException e) {
// System.out.println("发送失败: ");
// e.printStackTrace();
// }
}
}

View File

@@ -0,0 +1,23 @@
package com.usthe.collector.collect;
import com.usthe.common.entity.job.Metrics;
import com.usthe.common.entity.message.CollectRep;
/**
* 具体的指标组采集实现抽象类
* @author tomsun28
* @date 2021/11/4 9:35
*/
public abstract class AbstractCollect {
/**
* 真正的采集实现接口
* @param builder response builder
* @param appId 应用监控ID
* @param app 应用类型
* @param metrics 指标组配置
* return response builder
*/
public abstract void collect(CollectRep.MetricsData.Builder builder, long appId, String app, Metrics metrics);
}

View File

@@ -0,0 +1,188 @@
package com.usthe.collector.collect.database;
import com.usthe.collector.common.cache.CacheIdentifier;
import com.usthe.collector.common.cache.CommonCache;
import com.usthe.collector.common.cache.support.CommonJdbcConnect;
import lombok.extern.slf4j.Slf4j;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
/**
* 数据库JDBC通用查询
* @author tomsun28
* @date 2021/9/1 21:37
*/
@Slf4j
public class JdbcCommonCollect {
private Statement getConnection(String username, String password, String url) {
CacheIdentifier identifier = CacheIdentifier.builder()
.ip(url)
.username(username).password(password).build();
Optional<Object> cacheOption = CommonCache.getInstance().getCache(identifier, true);
Statement statement = null;
if (cacheOption.isPresent()) {
CommonJdbcConnect jdbcConnect = (CommonJdbcConnect) cacheOption.get();
try {
statement = jdbcConnect.getConnection().createStatement();
// 设置查询超时时间10秒
statement.setQueryTimeout(10);
// 设置查询最大行数1000行
statement.setMaxRows(1000);
} catch (Exception e) {
log.info("The jdbc connect form cache, create statement error: {}", e.getMessage());
try {
if (statement != null) {
statement.close();
}
jdbcConnect.close();
statement = null;
} catch (Exception e2) {
log.error(e2.getMessage());
}
CommonCache.getInstance().removeCache(identifier);
}
}
if (statement != null) {
return statement;
}
// 复用失败则新建连接
try {
Connection connection = DriverManager.getConnection(url, username, password);
statement = connection.createStatement();
// 设置查询超时时间10秒
statement.setQueryTimeout(10);
// 设置查询最大行数1000行
statement.setMaxRows(1000);
CommonJdbcConnect jdbcConnect = new CommonJdbcConnect(connection);
CommonCache.getInstance().addCache(identifier, jdbcConnect, 10000L);
} catch (SQLException sqlException) {
log.error("Jdbc sql error: {}, code: {}.", sqlException.getMessage(), sqlException.getErrorCode(), sqlException);
} catch (Exception e) {
log.error("Jdbc error: {}.", e.getMessage(), e);
}
return statement;
}
/**
* 查询一行数据, 通过查询返回结果集的列名称,和查询的字段映射
* eg:
* 查询字段one tow three four
* 查询SQLselect one, tow, three, four from book limit 1;
* @param statement 执行器
* @param sql sql
* @param columns 查询的列头(一般是数据库表字段,也可能包含特殊字段,eg: responseTime)
* @throws Exception when error happen
*/
private List<List<String>> queryOneRow(Statement statement, String sql, List<String> columns) throws Exception {
long startTime = System.currentTimeMillis();
statement.setMaxRows(1);
ResultSet resultSet = statement.executeQuery(sql);
List<List<String>> rowList = new LinkedList<>();
try {
if (resultSet.next()) {
LinkedList<String> rows = new LinkedList<>();
for (String column : columns) {
if ("responseTime".equals(column)) {
long time = System.currentTimeMillis() - startTime;
rows.add(String.valueOf(time));
} else {
String value = resultSet.getString(column);
value = value == null ? "" : value;
rows.add(value);
}
}
rowList.add(rows);
}
} finally {
resultSet.close();
}
return rowList;
}
/**
* 查询一行数据, 通过查询的两列数据(key-value)key和查询的字段匹配value为查询字段的值
* eg:
* 查询字段one tow three four
* 查询SQLselect key, value from book;
* 返回的key映射查询字段
* @param statement 执行器
* @param sql sql
* @param columns 查询的列头(一般是数据库表字段,也可能包含特殊字段,eg: responseTime)
* @throws Exception when error happen
*/
private List<List<String>> queryOneRowByMatchTwoColumns(Statement statement, String sql, List<String> columns) throws Exception {
long startTime = System.currentTimeMillis();
ResultSet resultSet = statement.executeQuery(sql);
List<List<String>> rowList = new LinkedList<>();
try {
HashMap<String, String> values = new HashMap<>(columns.size());
while (resultSet.next()) {
if (resultSet.getString(1) != null) {
values.put(resultSet.getString(1).toLowerCase(), resultSet.getString(2));
}
}
LinkedList<String> rows = new LinkedList<>();
for (String column : columns) {
if ("responseTime".equals(column)) {
long time = System.currentTimeMillis() - startTime;
rows.add(String.valueOf(time));
} else {
String value = values.get(column.toLowerCase());
value = value == null ? "" : value;
rows.add(value);
}
}
rowList.add(rows);
} finally {
resultSet.close();
}
return rowList;
}
/**
* 查询多行数据, 通过查询返回结果集的列名称,和查询的字段映射
* eg:
* 查询字段one tow three four
* 查询SQLselect one, tow, three, four from book limit 1;
* @param statement 执行器
* @param sql sql
* @param columns 查询的列头(一般是数据库表字段,也可能包含特殊字段,eg: responseTime)
* @throws Exception when error happen
*/
private List<List<String>> queryMultiRow(Statement statement, String sql, List<String> columns) throws Exception {
long startTime = System.currentTimeMillis();
ResultSet resultSet = statement.executeQuery(sql);
List<List<String>> rowList = new LinkedList<>();
try {
while (resultSet.next()) {
LinkedList<String> rows = new LinkedList<>();
for (String column : columns) {
if ("responseTime".equals(column)) {
long time = System.currentTimeMillis() - startTime;
rows.add(String.valueOf(time));
} else {
String value = resultSet.getString(column);
value = value == null ? "" : value;
rows.add(value);
}
}
rowList.add(rows);
}
} finally {
resultSet.close();
}
return rowList;
}
}

View File

@@ -0,0 +1,247 @@
package com.usthe.collector.collect.http;
import com.usthe.collector.collect.AbstractCollect;
import com.usthe.collector.common.http.HttpClientPool;
import com.usthe.collector.dispatch.DispatchConstants;
import com.usthe.common.entity.job.Metrics;
import com.usthe.common.entity.job.protocol.HttpProtocol;
import com.usthe.common.entity.message.CollectRep;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.HttpHeaders;
import org.apache.http.HttpStatus;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.ClientProtocolException;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.client.methods.RequestBuilder;
import org.apache.http.client.protocol.HttpClientContext;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.protocol.HttpContext;
import org.apache.http.util.EntityUtils;
import org.springframework.http.HttpMethod;
import javax.net.ssl.SSLException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.ConnectException;
import java.net.UnknownHostException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
* http https 采集实现类
* @author tomsun28
* @date 2021/11/4 15:37
*/
@Slf4j
public class HttpCollectImpl extends AbstractCollect {
private HttpCollectImpl() {}
public static HttpCollectImpl getInstance() {
return Singleton.INSTANCE;
}
@Override
public void collect(CollectRep.MetricsData.Builder builder,
long appId, String app, Metrics metrics) {
// 简单校验必有参数
if (metrics == null || metrics.getHttp() == null) {
builder.setCode(CollectRep.Code.FAIL);
builder.setMsg("Http/Https collect must has http params");
return;
}
HttpContext httpContext = createHttpContext(metrics.getHttp());
HttpUriRequest request = createHttpRequest(metrics.getHttp());
try {
CloseableHttpResponse response = HttpClientPool.getHttpClient()
.execute(request, httpContext);
int statusCode = response.getStatusLine().getStatusCode();
log.debug("http response status: {}", statusCode);
if (statusCode < HttpStatus.SC_OK || statusCode >= HttpStatus.SC_MULTIPLE_CHOICES) {
// 1XX 3XX 4XX 5XX 状态码 失败
builder.setCode(CollectRep.Code.FAIL);
builder.setMsg("statusCode: " + statusCode);
return;
} else {
// 2xx 状态码 成功
String resp = EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8);
// 根据不同的解析方式解析
if (resp == null || "".equals(resp)) {
log.info("http response entity is empty, status: {}.", statusCode);
builder.setCode(CollectRep.Code.SUCCESS);
builder.setMsg("statusCode: " + statusCode + ",entity empty.");
return;
}
String parseType = metrics.getHttp().getParseType();
try {
if (DispatchConstants.PARSE_DEFAULT.equals(parseType)) {
parseResponseByDefault(resp, metrics.getAliasFields(), metrics.getHttp(), builder);
} else if (DispatchConstants.PARSE_PROMETHEUS.equals(parseType)) {
parseResponseByPrometheus(resp, metrics.getAliasFields(), metrics.getHttp(), builder);
} else if (DispatchConstants.PARSE_JSON_PATH.equals(parseType)) {
parseResponseByJsonPath(resp, metrics.getAliasFields(), metrics.getHttp(), builder);
} else if (DispatchConstants.PARSE_XML_PATH.equals(parseType)) {
parseResponseByXmlPath(resp, metrics.getAliasFields(), metrics.getHttp(), builder);
} else {
parseResponseByDefault(resp, metrics.getAliasFields(), metrics.getHttp(), builder);
}
} catch (Exception e) {
log.info("parse error: {}.", e.getMessage(), e);
builder.setCode(CollectRep.Code.FAIL);
builder.setMsg("parse response data error.");
return;
}
}
} catch (ClientProtocolException e1) {
log.error(e1.getMessage(), e1);
} catch (UnknownHostException e2) {
// 对端不可达
log.info(e2.getMessage());
builder.setCode(CollectRep.Code.UN_REACHABLE);
builder.setMsg("unknown host");
return;
} catch (InterruptedIOException | ConnectException | SSLException e3) {
// 对端连接失败
log.info(e3.getMessage());
builder.setCode(CollectRep.Code.UN_CONNECTABLE);
builder.setMsg(e3.getMessage());
return;
} catch (IOException e4) {
// 其它IO异常
log.info(e4.getMessage());
builder.setCode(CollectRep.Code.FAIL);
builder.setMsg(e4.getMessage());
return;
} catch (Exception e) {
// 其它异常
log.error(e.getMessage(), e);
builder.setCode(CollectRep.Code.FAIL);
builder.setMsg(e.getMessage());
return;
} finally {
if (request != null) {
request.abort();
}
}
}
private void parseResponseByXmlPath(String resp, List<String> preFields, HttpProtocol http, CollectRep.MetricsData.Builder builder) {
}
private void parseResponseByJsonPath(String resp, List<String> preFields, HttpProtocol http, CollectRep.MetricsData.Builder builder) {
}
private void parseResponseByPrometheus(String resp, List<String> preFields, HttpProtocol http, CollectRep.MetricsData.Builder builder) {
}
private void parseResponseByDefault(String resp, List<String> preFields, HttpProtocol http,
CollectRep.MetricsData.Builder builder) {
}
/**
* 创建httpContext
* @param httpProtocol http protocol
* @return context
*/
private HttpContext createHttpContext(HttpProtocol httpProtocol) {
HttpProtocol.Authorization auth = httpProtocol.getAuthorization();
if (auth != null && !DispatchConstants.BEARER_TOKEN.equals(auth.getType())) {
HttpClientContext clientContext = new HttpClientContext();
if (DispatchConstants.BASIC_AUTH.equals(auth.getType())) {
CredentialsProvider provider = new BasicCredentialsProvider();
UsernamePasswordCredentials credentials
= new UsernamePasswordCredentials(auth.getBasicAuthUsername(), auth.getBasicAuthPassword());
provider.setCredentials(AuthScope.ANY, credentials);
clientContext.setCredentialsProvider(provider);
} else if (DispatchConstants.DIGEST_AUTH.equals(auth.getType())) {
CredentialsProvider provider = new BasicCredentialsProvider();
UsernamePasswordCredentials credentials
= new UsernamePasswordCredentials(auth.getBasicAuthUsername(), auth.getBasicAuthPassword());
provider.setCredentials(AuthScope.ANY, credentials);
clientContext.setCredentialsProvider(provider);
} else {
clientContext = null;
}
return clientContext;
}
return null;
}
/**
* 根据http配置参数构造请求头
* @param httpProtocol http参数配置
* @return 请求体
*/
private HttpUriRequest createHttpRequest(HttpProtocol httpProtocol) {
RequestBuilder requestBuilder;
// method
if (HttpMethod.GET.matches(httpProtocol.getMethod())) {
requestBuilder = RequestBuilder.get();
} else if (HttpMethod.POST.matches(httpProtocol.getMethod())) {
requestBuilder = RequestBuilder.post();
} else if (HttpMethod.PUT.matches(httpProtocol.getMethod())) {
requestBuilder = RequestBuilder.put();
} else if (HttpMethod.DELETE.matches(httpProtocol.getMethod())) {
requestBuilder = RequestBuilder.delete();
} else if (HttpMethod.PATCH.matches(httpProtocol.getMethod())) {
requestBuilder = RequestBuilder.patch();
} else {
// not support the method
log.error("not support the http method: {}.", httpProtocol.getMethod());
return null;
}
// params
Map<String, String> params = httpProtocol.getParams();
if (params != null && !params.isEmpty()) {
for (Map.Entry<String, String> param : params.entrySet()) {
requestBuilder.addParameter(param.getKey(), param.getValue());
}
}
// headers
Map<String, String> headers = httpProtocol.getHeaders();
if (headers != null && !headers.isEmpty()) {
for (Map.Entry<String, String> header : headers.entrySet()) {
requestBuilder.addHeader(header.getKey(), header.getValue());
}
}
// keep-alive
requestBuilder.addHeader(HttpHeaders.CONNECTION, "keep-alive");
// add accept
if (DispatchConstants.PARSE_DEFAULT.equals(httpProtocol.getParseType())
|| DispatchConstants.PARSE_JSON_PATH.equals(httpProtocol.getParseType())) {
requestBuilder.addHeader(HttpHeaders.ACCEPT, "application/json");
} else if (DispatchConstants.PARSE_XML_PATH.equals(httpProtocol.getParseType())) {
requestBuilder.addHeader(HttpHeaders.ACCEPT, "text/xml,application/xml");
} else if (DispatchConstants.PARSE_PROMETHEUS.equals(httpProtocol.getParseType())) {
requestBuilder.addHeader(HttpHeaders.ACCEPT, DispatchConstants.PARSE_PROMETHEUS_ACCEPT);
requestBuilder.addHeader(HttpHeaders.ACCEPT_ENCODING, "gzip");
}
// 判断是否使用Bearer Token认证
if (httpProtocol.getAuthorization() != null
&& DispatchConstants.BEARER_TOKEN.equals(httpProtocol.getAuthorization().getType())) {
// 若使用 将token放入到header里面
String value = DispatchConstants.BEARER + " " + httpProtocol.getAuthorization().getBearerTokenToken();
requestBuilder.addHeader(HttpHeaders.AUTHORIZATION, value);
}
// todo 处理请求内容 body 暂不支持body
// uri
requestBuilder.setUri(httpProtocol.getUrl());
return requestBuilder.build();
}
private static class Singleton {
private static final HttpCollectImpl INSTANCE = new HttpCollectImpl();
}
}

View File

@@ -0,0 +1,15 @@
package com.usthe.collector.common;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
/**
* java common 的配置属性
* @author tomsun28
* @date 2021/10/16 14:23
*/
@Component
@ConfigurationProperties(prefix = "collector.common")
public class CollectorProperties {
}

View File

@@ -0,0 +1,14 @@
package com.usthe.collector.common.cache;
/**
* 连接资源关闭回调接口
* @author tomsun28
* @date 2021/9/1 21:03
*/
public interface CacheCloseable {
/**
* 在缓存remove掉此对象前回调接口对连接对象进行相关资源的释放
*/
void close();
}

View File

@@ -0,0 +1,15 @@
package com.usthe.collector.common.cache;
/**
* 缓存可用性检测接口
* @author tomsun28
* @date 2020-08-10 23:21
*/
public interface CacheDetectable {
/**
* 可用性探测,探测缓存对象资源的可用性,若不可用,上层可删除此缓存对象
* @return true 可用, false 不可用
*/
boolean available();
}

View File

@@ -0,0 +1,23 @@
package com.usthe.collector.common.cache;
import lombok.Builder;
import lombok.Data;
/**
* 缓存key唯一标识符
* @author tomsun28
* @date 2021/9/1 21:30
*/
@Data
@Builder
public class CacheIdentifier {
private String ip;
private String port;
private String username;
private String password;
}

View File

@@ -0,0 +1,218 @@
package com.usthe.collector.common.cache;
import com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap;
import lombok.extern.slf4j.Slf4j;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* lru cache 对连接对象进行缓存
* @author tomsun28
* @date 2020-08-10 23:17
*/
@Slf4j
public class CommonCache {
/**
* 默认缓存时间 30minute
*/
private static final long DEFAULT_CACHE_TIMEOUT = 30 * 60 * 1000L;
/**
* 默认最大缓存数量
*/
private static final int DEFAULT_MAX_CAPACITY = 10000;
/**
* cacheTime数组大小
*/
private static final int CACHE_TIME_LENGTH = 2;
/**
* 存储对象的数据过期时间点
*/
private Map<Object, Long[]> timeoutMap;
/**
* 存储缓存对象
*/
private ConcurrentLinkedHashMap<Object, Object> cacheMap;
/**
* 过期数据清理线程池
*/
private ThreadPoolExecutor cleanTimeoutExecutor;
private CommonCache() { init();}
/**
* 初始化 cache
*/
private void init() {
// 初始化lru hashmap
cacheMap = new ConcurrentLinkedHashMap
.Builder<>()
.maximumWeightedCapacity(DEFAULT_MAX_CAPACITY)
.listener((key, value) -> {
timeoutMap.remove(key);
if (value instanceof CacheCloseable) {
((CacheCloseable)value).close();
}
log.info("lru cache discard key: {}, value: {}.", key, value);
}).build();
// 初始化时间纪录map
timeoutMap = new ConcurrentHashMap<>(DEFAULT_MAX_CAPACITY >> 6);
// 初始化过期数据清理线程池
cleanTimeoutExecutor = new ThreadPoolExecutor(1, 1,
1, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(1), r -> new Thread("lru-cache-timeout-cleaner"),
new ThreadPoolExecutor.DiscardOldestPolicy());
// 初始化可用性探测定位任务,每次探测间隔时间为20分钟
ScheduledThreadPoolExecutor scheduledExecutor = new ScheduledThreadPoolExecutor(1,
r -> new Thread(r, "lru-cache-available-detector"));
scheduledExecutor.scheduleWithFixedDelay(this::detectCacheAvailable,
2,20, TimeUnit.MINUTES);
}
/**
* 探测所有可探测的缓存对象的可用性,清除不可用和过期对象
*/
private void detectCacheAvailable() {
try {
cacheMap.forEach((key, value) -> {
// 先判断是否过期
Long[] cacheTime = timeoutMap.get(key);
long currentTime = System.currentTimeMillis();
if (cacheTime == null || cacheTime.length != CACHE_TIME_LENGTH
|| cacheTime[0] + cacheTime[1] < currentTime) {
cacheMap.remove(key);
timeoutMap.remove(key);
if (value instanceof CacheCloseable) {
((CacheCloseable)value).close();
}
} // 对实现了CacheDetectable 的对象探测检查
else if (value instanceof CacheDetectable && !((CacheDetectable)value).available()) {
cacheMap.remove(key);
timeoutMap.remove(key);
if (value instanceof CacheCloseable) {
((CacheCloseable)value).close();
}
}
});
} catch (Exception e) {
log.error("detect cache available error: {}.", e.getMessage(), e);
}
}
/**
* 清理过期线程
*/
private void cleanTimeoutCache() {
try {
cacheMap.forEach((key, value) -> {
// index 0 is startTime, 1 is timeDiff
Long[] cacheTime = timeoutMap.get(key);
long currentTime = System.currentTimeMillis();
if (cacheTime == null || cacheTime.length != CACHE_TIME_LENGTH) {
timeoutMap.put(key, new Long[]{currentTime, DEFAULT_CACHE_TIMEOUT});
} else if (cacheTime[0] + cacheTime[1] < currentTime) {
// 过期了 discard 关闭这个cache的资源
timeoutMap.remove(key);
cacheMap.remove(key);
if (value instanceof CacheCloseable) {
((CacheCloseable)value).close();
}
}
});
} catch (Exception e) {
log.error("clean timeout cache error: {}.", e.getMessage(), e);
}
}
/**
* 新增或更新cache
* @param key 存储对象key
* @param value 存储对象
* @param timeDiff 缓存对象保存时间 millis
*/
public void addCache(Object key, Object value, Long timeDiff) {
if (timeDiff == null) {
timeDiff = DEFAULT_CACHE_TIMEOUT;
}
cacheMap.put(key, value);
timeoutMap.put(key, new Long[]{System.currentTimeMillis(), timeDiff});
cleanTimeoutExecutor.execute(() -> {
try {
cleanTimeoutCache();
Thread.sleep(10 * 1000);
} catch (InterruptedException e) {
log.error(e.getMessage(), e);
}
});
}
/**
* 根据缓存key获取缓存对象
* @param key key
* @param refreshCache 是否刷新命中的缓存的存活时间 true是,false否
* @return 缓存对象
*/
public Optional<Object> getCache(Object key, boolean refreshCache) {
Long[] cacheTime = timeoutMap.get(key);
if (cacheTime == null || cacheTime.length != CACHE_TIME_LENGTH) {
return Optional.empty();
}
if (cacheTime[0] + cacheTime[1] < System.currentTimeMillis()) {
timeoutMap.remove(key);
cacheMap.remove(key);
return Optional.empty();
}
Object value = cacheMap.get(key);
if (value == null) {
cacheMap.remove(key);
timeoutMap.remove(key);
} else if (refreshCache) {
cacheTime[0] = System.currentTimeMillis();
timeoutMap.put(key, cacheTime);
}
return Optional.ofNullable(value);
}
/**
* 根据缓存key删除缓存对象
* @param key key
*/
public void removeCache(Object key) {
timeoutMap.remove(key);
cacheMap.remove(key);
}
/**
* 获取缓存实例
* @return cache
*/
public static CommonCache getInstance() {
return SingleInstance.INSTANCE;
}
/**
* 静态内部类
*/
private static class SingleInstance {
/**
* 单例
*/
private static final CommonCache INSTANCE= new CommonCache();
}
}

View File

@@ -0,0 +1,55 @@
package com.usthe.collector.common.cache.support;
import com.usthe.collector.common.cache.CacheCloseable;
import com.usthe.collector.common.cache.CacheDetectable;
import lombok.extern.slf4j.Slf4j;
import java.sql.Connection;
/**
* @author tomsun28
* @date 2021/9/1 21:24
*/
@Slf4j
public class CommonJdbcConnect implements CacheCloseable, CacheDetectable {
private Connection connection;
public CommonJdbcConnect(Connection connection) {
this.connection = connection;
}
@Override
public void close() {
try {
if (connection != null) {
connection.close();
}
} catch (Exception e) {
log.error("close jdbc connect error: {}", e.getMessage());
}
}
@Override
public boolean available() {
if (connection == null) {
return false;
}
try {
return !connection.isClosed();
} catch (Exception e) {
log.error("detect jdbc connect error: {}", e.getMessage());
return false;
}
}
@Override
protected void finalize() throws Throwable {
close();
super.finalize();
}
public Connection getConnection() {
return connection;
}
}

View File

@@ -0,0 +1,116 @@
package com.usthe.collector.common.http;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.HttpEntityEnclosingRequest;
import org.apache.http.HttpRequest;
import org.apache.http.client.HttpRequestRetryHandler;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.client.protocol.HttpClientContext;
import org.apache.http.impl.client.RequestWrapper;
import org.apache.http.protocol.HttpContext;
import org.apache.http.util.Args;
import javax.net.ssl.SSLException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.ConnectException;
import java.net.SocketTimeoutException;
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
/**
* http request网络请求重试处理器,替换默认的 DefaultHttpRequestRetryHandler
* 判断是否需要重试请求
* @author tomsun28
* @date 2021/8/30 22:00
*/
@Slf4j
public class CustomHttpRequestRetryHandler implements HttpRequestRetryHandler {
/**
* 不需要重试的异常情况
*/
private final static Set<Class<? extends IOException>> NON_RETRY_CLASSES = new HashSet<>(Arrays.asList(
// io 中断
InterruptedIOException.class,
// 对端不可达
UnknownHostException.class,
// 连接异常
ConnectException.class,
// ssl异常
SSLException.class));
/**
* 最大重试次数
*/
private static final int MAX_RETRY_TIMES = 3;
@Override
public boolean retryRequest(IOException exception, int executionCount, HttpContext context) {
Args.notNull(exception, "Exception parameter");
Args.notNull(context, "HTTP context");
if (executionCount > MAX_RETRY_TIMES) {
return false;
}
if (NON_RETRY_CLASSES.contains(exception.getClass())) {
return false;
}
for (final Class<? extends IOException> rejectException : NON_RETRY_CLASSES) {
if (rejectException.isInstance(exception)) {
return false;
}
}
// SocketTimeoutException: Read timed 只做一次重试
// read time out 可能是对端不支持长连接 重试前清理边空闲无效连接
if (executionCount == 1 && exception instanceof SocketTimeoutException && exception.getMessage() != null
&& (exception.getMessage().contains("Read timed out")
|| exception.getMessage().contains("Request already aborted"))) {
// 清理连接池无效空闲连接
HttpClientPool.getConnectionManager().closeExpiredConnections();
return true;
}
final HttpClientContext clientContext = HttpClientContext.adapt(context);
final HttpRequest request = clientContext.getRequest();
if(requestIsAborted(request)){
return false;
}
if (handleAsIdempotent(request)) {
// Retry if the request is considered idempotent
return true;
}
if (!clientContext.isRequestSent()) {
// Retry if the request has not been sent fully or
// if it's OK to retry methods that have been sent
return true;
}
// otherwise do not retry
return false;
}
/**
* @since 4.2
*/
protected boolean handleAsIdempotent(final HttpRequest request) {
return !(request instanceof HttpEntityEnclosingRequest);
}
/**
* @since 4.2
*
* @deprecated (4.3)
*/
@Deprecated
protected boolean requestIsAborted(final HttpRequest request) {
HttpRequest req = request;
if (request instanceof RequestWrapper) {
// does not forward request to original
req = ((RequestWrapper) request).getOriginal();
}
return (req instanceof HttpUriRequest && ((HttpUriRequest)req).isAborted());
}
}

View File

@@ -0,0 +1,170 @@
package com.usthe.collector.common.http;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.client.HttpRequestRetryHandler;
import org.apache.http.client.config.CookieSpecs;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.config.Lookup;
import org.apache.http.config.Registry;
import org.apache.http.config.RegistryBuilder;
import org.apache.http.conn.socket.ConnectionSocketFactory;
import org.apache.http.conn.socket.PlainConnectionSocketFactory;
import org.apache.http.conn.ssl.NoopHostnameVerifier;
import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
import org.apache.http.conn.util.PublicSuffixMatcher;
import org.apache.http.conn.util.PublicSuffixMatcherLoader;
import org.apache.http.cookie.CookieSpecProvider;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.impl.cookie.DefaultCookieSpecProvider;
import org.apache.http.impl.cookie.IgnoreSpecProvider;
import org.apache.http.impl.cookie.NetscapeDraftSpecProvider;
import org.apache.http.impl.cookie.RFC6265CookieSpecProvider;
import org.apache.http.impl.cookie.RFC6265CookieSpecProvider.CompatibilityLevel;
import org.apache.http.ssl.SSLContexts;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
import javax.net.ssl.X509TrustManager;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
import java.util.concurrent.TimeUnit;
/**
* 统一的http客户端连接池
* @author tomsun28
* @date 2021/8/30 21:23
*/
@Slf4j
public class HttpClientPool {
private static CloseableHttpClient httpClient;
private static PoolingHttpClientConnectionManager connectionManager;
/**
* 此连接池所能提供的最大连接数
*/
private final static int MAX_TOTAL_CONNECTIONS = 50000;
/**
* 每个路由所能分配的最大连接数
*/
private final static int MAX_PER_ROUTE_CONNECTIONS = 80;
/**
* 从连接池中获取连接的默认超时时间 4秒
*/
private final static int REQUIRE_CONNECT_TIMEOUT = 4000;
/**
* 双端建立连接超时时间 4秒
*/
private final static int CONNECT_TIMEOUT = 4000;
/**
* socketReadTimeout 响应tcp报文的最大间隔超时时间
*/
private final static int SOCKET_TIMEOUT = 60000;
/**
* 空闲连接免检的有效时间,被重用的空闲连接若超过此时间,需检查此连接的可用性
*/
private final static int INACTIVITY_VALIDATED_TIME = 10000;
/**
* ssl版本
*/
private final static String[] SUPPORTED_SSL = {"TLSv1","TLSv1.1","TLSv1.2","SSLv3"};
static {
try {
// 初始化ssl上下文
SSLContext sslContext = SSLContexts.createDefault();
X509TrustManager x509TrustManager = new X509TrustManager() {
@Override
public void checkClientTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException { }
@Override
public void checkServerTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException { }
@Override
public X509Certificate[] getAcceptedIssuers() { return null; }
};
sslContext.init(null, new TrustManager[]{x509TrustManager}, null);
// 设置支持的ssl版本
SSLConnectionSocketFactory sslFactory = new SSLConnectionSocketFactory(sslContext, SUPPORTED_SSL, null, new NoopHostnameVerifier());
// 注册 http https
Registry<ConnectionSocketFactory> registry = RegistryBuilder.<ConnectionSocketFactory>create()
.register("http", PlainConnectionSocketFactory.INSTANCE)
.register("https", sslFactory)
.build();
// 网络请求默认配置
RequestConfig requestConfig = RequestConfig.custom()
// 从连接池获取连接超时时间
.setConnectionRequestTimeout(REQUIRE_CONNECT_TIMEOUT)
// 和对端新连接建立时间,三次握手时间
.setConnectTimeout(CONNECT_TIMEOUT)
// 数据传输最大响应间隔时间
.setSocketTimeout(SOCKET_TIMEOUT)
// 遇到301 302不自动重定向跳转
.setRedirectsEnabled(false)
.build();
// 连接池
connectionManager = new PoolingHttpClientConnectionManager(registry);
connectionManager.setMaxTotal(MAX_TOTAL_CONNECTIONS);
connectionManager.setDefaultMaxPerRoute(MAX_PER_ROUTE_CONNECTIONS);
connectionManager.setValidateAfterInactivity(INACTIVITY_VALIDATED_TIME);
// 请求网络异常重试处理器
HttpRequestRetryHandler requestRetryHandler = new CustomHttpRequestRetryHandler();
// cookie处理策略
PublicSuffixMatcher suffixMatcher = PublicSuffixMatcherLoader.getDefault();
CookieSpecProvider defaultCookieSpecProvider = new DefaultCookieSpecProvider(suffixMatcher);
CookieSpecProvider laxStandardProvider = new RFC6265CookieSpecProvider(CompatibilityLevel.RELAXED, suffixMatcher);
CookieSpecProvider strictStandardProvider = new RFC6265CookieSpecProvider(CompatibilityLevel.STRICT, suffixMatcher);
Lookup<CookieSpecProvider> cookieSpecProviderLookup = RegistryBuilder.<CookieSpecProvider>create()
.register(CookieSpecs.DEFAULT, defaultCookieSpecProvider)
.register("best-match", defaultCookieSpecProvider)
.register("compatibility", defaultCookieSpecProvider)
.register(CookieSpecs.STANDARD, laxStandardProvider)
.register(CookieSpecs.STANDARD_STRICT, strictStandardProvider)
.register(CookieSpecs.NETSCAPE, new NetscapeDraftSpecProvider())
.register(CookieSpecs.IGNORE_COOKIES, new IgnoreSpecProvider())
.register(IgnoreReqCookieSpec.COOKIE_SPEC_NAME, new IgnoreReqCookieSpecProvider(suffixMatcher))
.build();
// 构造单例 httpClient
httpClient = HttpClients.custom()
.setConnectionManager(connectionManager)
.setDefaultRequestConfig(requestConfig)
.setRetryHandler(requestRetryHandler)
.setDefaultCookieSpecRegistry(cookieSpecProviderLookup)
// 定期清理不可用过期连接
.evictExpiredConnections()
// 定期清理可用但空闲的连接
.evictIdleConnections(100, TimeUnit.SECONDS)
.build();
// 构造连接清理器
Thread connectCleaner = new Thread(() -> {
while (Thread.currentThread().isInterrupted()) {
try {
Thread.sleep(30000);
connectionManager.closeExpiredConnections();
connectionManager.closeIdleConnections(100, TimeUnit.SECONDS);
} catch (InterruptedException e) {
}
}
});
connectCleaner.setName("HttpConnectCleaner");
connectCleaner.setDaemon(true);
connectCleaner.start();
} catch (Exception e) {
}
}
public static CloseableHttpClient getHttpClient() {
return httpClient;
}
public static PoolingHttpClientConnectionManager getConnectionManager() {
return connectionManager;
}
}

View File

@@ -0,0 +1,29 @@
package com.usthe.collector.common.http;
import org.apache.http.cookie.Cookie;
import org.apache.http.cookie.CookieOrigin;
import org.apache.http.impl.cookie.DefaultCookieSpec;
/**
* 在request请求时忽略携带cookie,返回response时更新cookie的策略
* @author tomsun28
* @date 2021/8/30 22:37
*/
public class IgnoreReqCookieSpec extends DefaultCookieSpec {
public static final String COOKIE_SPEC_NAME = "IGNORE_REQUEST_COOKIE";
public IgnoreReqCookieSpec() {
super();
}
@Override
public boolean match(Cookie cookie, CookieOrigin origin) {
return false;
}
@Override
public String toString() {
return COOKIE_SPEC_NAME;
}
}

View File

@@ -0,0 +1,32 @@
package com.usthe.collector.common.http;
import org.apache.http.conn.util.PublicSuffixMatcher;
import org.apache.http.cookie.CookieSpec;
import org.apache.http.impl.cookie.DefaultCookieSpecProvider;
import org.apache.http.protocol.HttpContext;
/**
* IGNORE_REQUEST_COOKIE cookie策略提供者
* @author tomsun28
* @date 2021/8/30 22:40
*/
public class IgnoreReqCookieSpecProvider extends DefaultCookieSpecProvider {
private volatile CookieSpec cookieSpec;
public IgnoreReqCookieSpecProvider(PublicSuffixMatcher publicSuffixMatcher) {
super(publicSuffixMatcher);
}
@Override
public CookieSpec create(HttpContext context) {
if (cookieSpec == null) {
synchronized (this) {
if (cookieSpec == null) {
this.cookieSpec = new IgnoreReqCookieSpec();
}
}
}
return this.cookieSpec;
}
}

View File

@@ -0,0 +1,23 @@
package com.usthe.collector.dispatch;
import com.usthe.collector.dispatch.timer.WheelTimerJob;
import com.usthe.common.entity.job.Metrics;
import com.usthe.common.entity.message.CollectRep;
/**
* 采集数据调度器接口
* @author tomsun28
* @date 2021/11/2 11:20
*/
public interface CollectDataDispatch {
/**
* 处理分发采集结果数据
* @param timerJob 时间轮任务
* @param metrics 下面的指标组采集任务
* @param metricsData 采集结果数据
*/
void dispatchCollectData(WheelTimerJob timerJob, Metrics metrics, CollectRep.MetricsData metricsData);
}

View File

@@ -0,0 +1,187 @@
package com.usthe.collector.dispatch;
import com.usthe.collector.dispatch.export.KafkaDataExporter;
import com.usthe.collector.dispatch.timer.TimerDispatch;
import com.usthe.collector.dispatch.timer.WheelTimerJob;
import com.usthe.common.entity.job.Job;
import com.usthe.common.entity.job.Metrics;
import com.usthe.common.entity.message.CollectRep;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* 指标组采集任务与响应数据调度器
* @author tomsun28
* @date 2021/11/2 11:24
*/
@Component
@Slf4j
public class CommonDispatcher implements MetricsTaskDispatch, CollectDataDispatch {
/**
* 指标组采集任务超时时间值
*/
private static final long DURATION_TIME = 120_000L;
/**
* 指标组采集任务优先级队列
*/
private MetricsCollectorQueue jobRequestQueue;
/**
* 时间轮任务调度器
*/
private TimerDispatch timerDispatch;
/**
* kafka采集数据导出器
*/
private KafkaDataExporter kafkaDataExporter;
/**
* 指标组任务与开始时间映射map
*/
private Map<String, MetricsTime> metricsTimeoutMonitorMap;
public CommonDispatcher(MetricsCollectorQueue jobRequestQueue, TimerDispatch timerDispatch,
KafkaDataExporter kafkaDataExporter, WorkerPool workerPool) {
this.kafkaDataExporter = kafkaDataExporter;
this.jobRequestQueue = jobRequestQueue;
this.timerDispatch = timerDispatch;
ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(2, 2, 1,
TimeUnit.SECONDS,
new SynchronousQueue<>(), r -> {
Thread thread = new Thread(r);
thread.setDaemon(true);
return thread;
});
// 从任务队列拉取指标组采集任务放入线程池执行
poolExecutor.execute(() -> {
Thread.currentThread().setName("metrics-task-dispatcher");
while (!Thread.currentThread().isInterrupted()) {
MetricsCollect metricsCollect = null;
try {
metricsCollect = jobRequestQueue.getJob();
workerPool.executeJob(metricsCollect);
} catch (RejectedExecutionException rejected) {
log.info("[Dispatcher]-the worker pool is full, reject this metrics task, " +
"sleep and put in queue again.");
try {
Thread.sleep(1000);
if (metricsCollect != null) {
// 在队列里的优先级增大
metricsCollect.setRunPriority((byte) (metricsCollect.getRunPriority() + 1));
jobRequestQueue.addJob(metricsCollect);
}
} catch (InterruptedException interruptedException){}
} catch (Exception e) {
log.error("[Dispatcher]-{}.", e.getMessage(), e);
}
}
});
// 监控指标组采集任务执行时间
metricsTimeoutMonitorMap = new ConcurrentHashMap<>(128);
poolExecutor.execute(() -> {
Thread.currentThread().setName("metrics-task-monitor");
while (!Thread.currentThread().isInterrupted()) {
try {
// 检测每个指标组采集单元是否超时2分钟,超时则丢弃并返回异常
long deadline = System.currentTimeMillis() - DURATION_TIME;
for (Map.Entry<String, MetricsTime> entry : metricsTimeoutMonitorMap.entrySet()) {
MetricsTime metricsTime = entry.getValue();
if (metricsTime.getStartTime() < deadline) {
// 指标组采集超时
CollectRep.MetricsData metricsData = CollectRep.MetricsData.newBuilder()
.setId(metricsTime.getTimerJob().getJob().getAppId())
.setApp(metricsTime.getTimerJob().getJob().getApp())
.setMetrics(metricsTime.getMetrics().getName())
.setTime(System.currentTimeMillis())
.setCode(CollectRep.Code.TIMEOUT).setMsg("collect timeout").build();
dispatchCollectData(metricsTime.getTimerJob(), metricsTime.getMetrics(), metricsData);
}
}
Thread.sleep(20000);
} catch (Exception e){
log.error("[Monitor]-{}.", e.getMessage(), e);
}
}
});
}
@Override
public void dispatchMetricsTask(WheelTimerJob timerJob) {
// 将单个应用的采集任务根据其下的指标组拆分为对应的指标组采集任务 AbstractCollect
// 将每个指标组放入线程池进行调度
Job job = timerJob.getJob();
job.constructMetrics();
Set<Metrics> metricsSet = job.getNextCollectMetrics(null, true);
metricsSet.forEach(metrics -> {
MetricsCollect metricsCollect = new MetricsCollect(metrics, timerJob, this);
jobRequestQueue.addJob(metricsCollect);
metricsTimeoutMonitorMap.put(job.getId() + metrics.getName(),
new MetricsTime(System.currentTimeMillis(), metrics, timerJob));
});
}
@Override
public void dispatchCollectData(WheelTimerJob timerJob, Metrics metrics, CollectRep.MetricsData metricsData) {
Job job = timerJob.getJob();
Set<Metrics> metricsSet = job.getNextCollectMetrics(metrics, false);
if (job.isCyclic()) {
// 若是异步的周期性循环任务,直接发送指标组的采集数据到消息中间件
kafkaDataExporter.send(metricsData);
if (metricsSet == null) {
// 此Job所有指标组采集执行完成
// 周期性任务再次将任务push到时间轮
// 先判断此次任务执行时间与任务采集间隔时间
long spendTime = System.currentTimeMillis() - job.getTimestamp();
long interval = job.getInterval() - spendTime / 1000;
interval = interval <= 0 ? 0 : interval;
timerDispatch.cyclicJob(timerJob, interval, TimeUnit.SECONDS);
} else if (!metricsSet.isEmpty()) {
// 当前级别指标组执行完成,开始执行下一级别的指标组
metricsSet.forEach(metricItem -> {
MetricsCollect metricsCollect = new MetricsCollect(metricItem, timerJob, this);
jobRequestQueue.addJob(metricsCollect);
metricsTimeoutMonitorMap.put(job.getId() + metrics.getName(),
new MetricsTime(System.currentTimeMillis(), metrics, timerJob));
});
} else {
// 当前执行级别的指标组列表未全执行完成,
// 需等待其它同级别指标组执行完成后进入下一级别执行
}
} else {
// 若是临时性一次任务,需等待所有指标组的采集数据统一包装返回
// todo 将当前指标组数据插入job里统一组装
if (metricsSet == null) {
// 此Job所有指标组采集执行完成
// todo 将所有指标组数据组合一起发送到回调函数
} else if (!metricsSet.isEmpty()) {
// 当前级别指标组执行完成,开始执行下一级别的指标组
metricsSet.forEach(metricItem -> {
MetricsCollect metricsCollect = new MetricsCollect(metricItem, timerJob, this);
jobRequestQueue.addJob(metricsCollect);
metricsTimeoutMonitorMap.put(job.getId() + metrics.getName(),
new MetricsTime(System.currentTimeMillis(), metrics, timerJob));
});
} else {
// 当前执行级别的指标组列表未全执行完成,
// 需等待其它同级别指标组执行完成后进入下一级别执行
}
}
}
@Data
@AllArgsConstructor
private static class MetricsTime {
private long startTime;
private Metrics metrics;
private WheelTimerJob timerJob;
}
}

View File

@@ -0,0 +1,21 @@
package com.usthe.collector.dispatch;
import com.googlecode.aviator.AviatorEvaluator;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author tomsun28
* @date 2021/11/3 12:55
*/
@Configuration
public class DispatchConfiguration {
private static final int AVIATOR_LRU_CACHE_SIZE = 1024;
@Bean
public void configAviatorEvaluator() {
// 配置AviatorEvaluator使用LRU缓存编译后的表达式
AviatorEvaluator.getInstance().useLRUExpressionCache(AVIATOR_LRU_CACHE_SIZE);
}
}

View File

@@ -0,0 +1,65 @@
package com.usthe.collector.dispatch;
/**
* dispatch 常量
* @author tomsun28
* @date 2021/11/3 16:50
*/
public interface DispatchConstants {
/**
* 可用性指标组
*/
String AVAILABILITY = "availability";
// 协议类型相关 - start //
/**
* 协议 http
*/
String PROTOCOL_HTTP = "http";
/**
* 协议 icmp
*/
String PROTOCOL_ICMP = "icmp";
/**
* 协议 jdbc
*/
String PROTOCOL_JDBC = "jdbc";
// 协议类型相关 - end //
// http协议相关 - start 需尽可能先复用 HttpHeaders //
/**
* 认证方式 Bearer Token
*/
String BEARER_TOKEN = "Bearer Token";
/**
* Bearer Token 的认证参数字符
*/
String BEARER = "Bearer";
/**
* 认证方式 Basic Auth
*/
String BASIC_AUTH = "Basic Auth";
/**
* 认证方式 Digest Auth
*/
String DIGEST_AUTH = "Digest Auth";
/**
* 解析方式 默认规则
*/
String PARSE_DEFAULT = "default";
/**
* 解析方式 自定义json path
*/
String PARSE_JSON_PATH = "json_path";
/**
* 解析方式 自定义xml path
*/
String PARSE_XML_PATH = "xml_path";
/**
* 解析方式 prometheus规则
*/
String PARSE_PROMETHEUS = "prometheus";
String PARSE_PROMETHEUS_ACCEPT = "application/openmetrics-text; version=0.0.1,text/plain;version=0.0.4;q=0.5,*/*;q=0.1";
// http协议相关 - end //
}

View File

@@ -0,0 +1,225 @@
package com.usthe.collector.dispatch;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
/**
* 调度分发任务配置属性
* @author tomsun28
* @date 2021/10/16 14:54
*/
@Component
@ConfigurationProperties(prefix = "collector.dispatch")
public class DispatchProperties {
/**
* 调度入口配置属性
*/
private EntranceProperties entrance;
/**
* 调度数据出口配置属性
*/
private ExportProperties export;
public EntranceProperties getEntrance() {
return entrance;
}
public void setEntrance(EntranceProperties entrance) {
this.entrance = entrance;
}
public ExportProperties getExport() {
return export;
}
public void setExport(ExportProperties export) {
this.export = export;
}
/**
* 调度入口配置属性
* 入口可以时etcd信息,http请求,消息中间件消息请求
*/
public static class EntranceProperties {
/**
* etcd配置信息
*/
private EtcdProperties etcd;
public EtcdProperties getEtcd() {
return etcd;
}
public void setEtcd(EtcdProperties etcd) {
this.etcd = etcd;
}
public static class EtcdProperties {
/**
* etcd调度是否启动
*/
private boolean enabled = true;
/**
* etcd的连接端点url
*/
private String[] endpoints = new String[] {"http://127.0.0.1:2379"};
/**
* etcd连接用户名
*/
private String username;
/**
* etcd连接密码
*/
private String password;
/**
* etcd租约的有效时间 单位秒
*/
private long ttl = 200;
/**
* 采集器注册目录
*/
private String collectorDir = "/usthe/dispatch/collector/";
/**
* 任务调度分发目录
*/
private String assignDir = "/usthe/dispatch/assign/";
/**
* 任务详细目录
*/
private String jobDir = "/usthe/dispatch/job/";
public boolean isEnabled() {
return enabled;
}
public void setEnabled(boolean enabled) {
this.enabled = enabled;
}
public String[] getEndpoints() {
return endpoints;
}
public void setEndpoints(String[] endpoints) {
this.endpoints = endpoints;
}
public String getUsername() {
return username;
}
public void setUsername(String username) {
this.username = username;
}
public String getPassword() {
return password;
}
public void setPassword(String password) {
this.password = password;
}
public long getTtl() {
return ttl;
}
public void setTtl(long ttl) {
this.ttl = ttl;
}
public String getCollectorDir() {
return collectorDir;
}
public void setCollectorDir(String collectorDir) {
this.collectorDir = collectorDir;
}
public String getAssignDir() {
return assignDir;
}
public void setAssignDir(String assignDir) {
this.assignDir = assignDir;
}
public String getJobDir() {
return jobDir;
}
public void setJobDir(String jobDir) {
this.jobDir = jobDir;
}
}
}
/**
* 调度数据出口配置属性
*/
public static class ExportProperties {
/**
* kafka配置信息
*/
private KafkaProperties kafka;
public KafkaProperties getKafka() {
return kafka;
}
public void setKafka(KafkaProperties kafka) {
this.kafka = kafka;
}
public static class KafkaProperties {
/**
* kafka数据出口是否启动
*/
private boolean enabled = true;
/**
* kafka的连接服务器url
*/
private String servers = "http://127.0.0.1:2379";
/**
* 发送数据的topic名称
*/
private String topic;
public boolean isEnabled() {
return enabled;
}
public void setEnabled(boolean enabled) {
this.enabled = enabled;
}
public String getServers() {
return servers;
}
public void setServers(String servers) {
this.servers = servers;
}
public String getTopic() {
return topic;
}
public void setTopic(String topic) {
this.topic = topic;
}
}
}
}

View File

@@ -0,0 +1,166 @@
package com.usthe.collector.dispatch;
import com.google.protobuf.ProtocolStringList;
import com.googlecode.aviator.AviatorEvaluator;
import com.googlecode.aviator.Expression;
import com.usthe.collector.collect.AbstractCollect;
import com.usthe.collector.collect.http.HttpCollectImpl;
import com.usthe.collector.dispatch.timer.WheelTimerJob;
import com.usthe.common.entity.job.Metrics;
import com.usthe.common.entity.message.CollectRep;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
* parent job
* @author tomsun28
* @date 2021/10/10 15:35
*/
@Slf4j
@Data
public class MetricsCollect implements Runnable, Comparable<MetricsCollect> {
protected byte runPriority;
protected long newTime;
protected long startTime;
protected Metrics metrics;
protected WheelTimerJob timerJob;
protected CollectDataDispatch collectDataDispatch;
public MetricsCollect(Metrics metrics, WheelTimerJob timerJob, CollectDataDispatch collectDataDispatch) {
this.newTime = System.currentTimeMillis();
this.timerJob = timerJob;
this.metrics = metrics;
this.collectDataDispatch = collectDataDispatch;
if (DispatchConstants.AVAILABILITY.equals(metrics.getName())) {
runPriority = (byte) 1;
} else {
runPriority = (byte) -1;
}
}
@Override
public void run() {
this.startTime = System.currentTimeMillis();
setNewThreadName(timerJob, metrics);
CollectRep.MetricsData.Builder response = CollectRep.MetricsData.newBuilder();
response.setApp(timerJob.getJob().getApp());
response.setId(timerJob.getJob().getId());
response.setMetrics(metrics.getName());
// 根据指标组采集协议,应用类型等来调度到真正的应用指标组采集实现类
AbstractCollect abstractCollect = null;
switch (metrics.getProtocol()) {
case DispatchConstants.PROTOCOL_HTTP:
abstractCollect = HttpCollectImpl.getInstance();
break;
// todo
default: break;
}
if (abstractCollect == null) {
log.error("[Dispatcher] - not support this: app: {}, metrics: {}, protocol: {}.",
timerJob.getJob().getApp(), metrics.getName(), metrics.getProtocol());
response.setCode(CollectRep.Code.FAIL);
response.setMsg("not support " + timerJob.getJob().getApp() + ", "
+ metrics.getName() + ", " + metrics.getProtocol());
return;
} else {
try {
abstractCollect.collect(response, timerJob.getJob().getAppId(),
timerJob.getJob().getApp(), metrics);
} catch (Exception e) {
log.error("[Metrics Collect]: {}.", e.getMessage(), e);
response.setCode(CollectRep.Code.FAIL);
response.setMsg(e.getMessage());
}
}
// 别名属性表达式替换计算
calculateFields(metrics, response);
CollectRep.MetricsData metricsData = validateResponse(response);
collectDataDispatch.dispatchCollectData(timerJob, metrics, metricsData);
}
/**
* 根据 calculates 和 aliasFields 配置计算出真正的指标(fields)值
* @param metrics 指标组配置
* @param collectData 采集数据
*/
private void calculateFields(Metrics metrics, CollectRep.MetricsData.Builder collectData) {
collectData.addAllFields(metrics.getFields());
// 若不存在需要计算的表达式,则 别名指标aliasFields 的数据就是真正指标 fields的数据
if (metrics.getCalculates() == null || metrics.getCalculates().isEmpty()) {
return;
}
List<CollectRep.ValueRow> aliasRowList = collectData.getValuesList();
if (aliasRowList == null || aliasRowList.isEmpty()) {
return;
}
// 先预处理 calculates
Map<String, Expression> fieldExpressionMap = metrics.getCalculates()
.stream()
.map(cal -> {
int splitIndex = cal.indexOf("=");
String field = cal.substring(0, splitIndex);
String expressionStr = cal.substring(splitIndex + 1);
Expression expression = AviatorEvaluator.compile(expressionStr, true);
return new Object[]{field, expression}; })
.collect(Collectors.toMap(arr -> (String)arr[0], arr -> (Expression) arr[1]));
List<String> fields = metrics.getFields();
List<String> aliasFields = metrics.getAliasFields();
Map<String, Object> aliasFieldValueMap = new HashMap<>(16);
for (int index = 0; index < aliasRowList.size(); index++) {
CollectRep.ValueRow aliasRow = aliasRowList.get(index);
for (int aliasIndex = 0; aliasIndex < aliasFields.size(); aliasIndex++) {
aliasFieldValueMap.put(aliasFields.get(aliasIndex), aliasRow.getColumns(aliasIndex));
}
ProtocolStringList columnList = aliasRow.getColumnsList();
columnList.clear();
for (int realIndex = 0; realIndex < fields.size(); realIndex++) {
String realField = fields.get(realIndex);
Expression expression = fieldExpressionMap.get(realField);
String value = "";
if (expression != null) {
// 存在计算表达式 则计算值
value = (String) expression.execute(aliasFieldValueMap);
} else {
// 不存在 则映射别名值
value = (String) aliasFieldValueMap.get(realField);
}
columnList.add(value);
}
}
}
private CollectRep.MetricsData validateResponse(CollectRep.MetricsData.Builder builder) {
long endTime = System.currentTimeMillis();
builder.setTime(endTime);
log.debug("[Collect]: newTime: {}, startTime: {}, spendTime: {}.", newTime, startTime, endTime - startTime);
if (builder.getCode() != CollectRep.Code.SUCCESS) {
log.info("[Collect Fail]-reason:{}", builder.getMsg());
} else {
log.info("[Collect Success]-{},{},{}.", builder.getId(), builder.getApp(), builder.getMetrics());
}
return builder.build();
}
private void setNewThreadName(WheelTimerJob timerJob, Metrics metrics) {
String currentName = timerJob.getJob().getAppId() + timerJob.getJob().getApp() + metrics.getName() + timerJob.getJob().getId();
Thread.currentThread().setName(currentName);
}
@Override
public int compareTo(MetricsCollect collect) {
return runPriority - collect.runPriority;
}
}

View File

@@ -0,0 +1,32 @@
package com.usthe.collector.dispatch;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
/**
* 待运行的job队列
* @author tomsun28
* @date 2021/10/10 20:20
*/
@Component
@Slf4j
public class MetricsCollectorQueue {
private final PriorityBlockingQueue<MetricsCollect> jobQueue;
public MetricsCollectorQueue() {
jobQueue = new PriorityBlockingQueue<>(2000);
}
public void addJob(MetricsCollect job) {
jobQueue.offer(job);
}
public MetricsCollect getJob() throws InterruptedException {
return jobQueue.poll(2, TimeUnit.SECONDS);
}
}

View File

@@ -0,0 +1,17 @@
package com.usthe.collector.dispatch;
import com.usthe.collector.dispatch.timer.WheelTimerJob;
/**
* 指标组采集任务调度器接口
* @author tomsun28
* @date 2021/11/2 11:19
*/
public interface MetricsTaskDispatch {
/**
* 调度
* @param timerJob timerJob
*/
void dispatchMetricsTask(WheelTimerJob timerJob);
}

View File

@@ -0,0 +1,54 @@
package com.usthe.collector.dispatch;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* 采集任务工作线程池
* @author tomsun28
* @date 2021/10/15 0:01
*/
@Component
@Slf4j
public class WorkerPool {
private ThreadPoolExecutor workerExecutor;
public WorkerPool() {
initWorkExecutor();
}
private void initWorkExecutor() {
// 线程工厂
ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setUncaughtExceptionHandler((thread, throwable) -> {
log.error("workerExecutor has uncaughtException.");
log.error(throwable.getMessage(), throwable); })
.setDaemon(true)
.setNameFormat("collect-worker-%d")
.build();
workerExecutor = new ThreadPoolExecutor(100,
800,
10,
TimeUnit.SECONDS,
new SynchronousQueue<>(),
threadFactory,
new ThreadPoolExecutor.AbortPolicy());
}
/**
* 运行采集任务线程
* @param runnable 任务
* @throws RejectedExecutionException when 线程池满
*/
public void executeJob(Runnable runnable) throws RejectedExecutionException {
workerExecutor.execute(runnable);
}
}

View File

@@ -0,0 +1,25 @@
package com.usthe.collector.dispatch.entrance.http;
import com.usthe.common.entity.job.Job;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;
/**
* 采集job管理提供api接口
* @author tomsun28
* @date 2021/11/6 13:58
*/
@RestController
public class CollectJobController {
/**
* 执行一次性采集任务,获取采集数据响应
* @return 采集结果
*/
@PostMapping("/job")
public Mono<Object> collectJobData(Job job) {
return null;
}
}

View File

@@ -0,0 +1,54 @@
package com.usthe.collector.dispatch.export;
import com.usthe.collector.dispatch.DispatchProperties;
import com.usthe.common.entity.message.CollectRep;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Configuration;
import java.util.Properties;
/**
* kafka采集数据消息发送
* @author tomsun28
* @date 2021/11/3 15:22
*/
@Configuration
@ConditionalOnProperty(prefix = "collector.dispatch.export.kafka",
name = "enabled", havingValue = "true", matchIfMissing = true)
@AutoConfigureAfter(value = {DispatchProperties.class})
@Slf4j
public class KafkaDataExporter {
KafkaProducer<String, CollectRep.MetricsData> kafkaProducer;
DispatchProperties.ExportProperties.KafkaProperties kafkaProperties;
public KafkaDataExporter(DispatchProperties dispatchProperties) {
try {
kafkaProperties = dispatchProperties.getExport().getKafka();
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getServers());
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaMetricsDataSerializer.class);
kafkaProducer = new KafkaProducer<>(properties);
} catch (Exception e) {
log.error(e.getMessage(), e);
}
}
/**
* 发送消息
* @param metricsData 指标组采集数据
*/
public void send(CollectRep.MetricsData metricsData) {
if (kafkaProducer != null) {
kafkaProducer.send(new ProducerRecord<>(kafkaProperties.getTopic(), metricsData));
} else {
log.error("kafkaProducer is not enable");
}
}
}

View File

@@ -0,0 +1,18 @@
package com.usthe.collector.dispatch.export;
import com.usthe.common.entity.message.CollectRep;
import org.apache.kafka.common.serialization.Serializer;
/**
* MetricsData的序列化
* @author tomsun28
* @date 2021/11/3 16:14
*/
public class KafkaMetricsDataSerializer implements Serializer<CollectRep.MetricsData> {
@Override
public byte[] serialize(String topicName, CollectRep.MetricsData metricsData) {
return metricsData.toByteArray();
}
}

View File

@@ -0,0 +1,809 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package com.usthe.collector.dispatch.timer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collections;
import java.util.HashSet;
import java.util.Locale;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLong;
/**
* A {@link Timer} optimized for approximated I/O timeout scheduling.
*
* <h3>Tick Duration</h3>
* <p>
* As described with 'approximated', this timer does not execute the scheduled
* {@link TimerTask} on time. {@link HashedWheelTimer}, on every tick, will
* check if there are any {@link TimerTask}s behind the schedule and execute
* them.
* <p>
* You can increase or decrease the accuracy of the execution timing by
* specifying smaller or larger tick duration in the constructor. In most
* network applications, I/O timeout does not need to be accurate. Therefore,
* the default tick duration is 100 milliseconds and you will not need to try
* different configurations in most cases.
*
* <h3>Ticks per Wheel (Wheel Size)</h3>
* <p>
* {@link HashedWheelTimer} maintains a data structure called 'wheel'.
* To put simply, a wheel is a hash table of {@link TimerTask}s whose hash
* function is 'dead line of the task'. The default number of ticks per wheel
* (i.e. the size of the wheel) is 512. You could specify a larger value
* if you are going to schedule a lot of timeouts.
*
* <h3>Do not create many instances.</h3>
* <p>
* {@link HashedWheelTimer} creates a new thread whenever it is instantiated and
* started. Therefore, you should make sure to create only one instance and
* share it across your application. One of the common mistakes, that makes
* your application unresponsive, is to create a new instance for every connection.
*
* <h3>Implementation Details</h3>
* <p>
* {@link HashedWheelTimer} is based on
* <a href="http://cseweb.ucsd.edu/users/varghese/">George Varghese</a> and
* Tony Lauck's paper,
* <a href="http://cseweb.ucsd.edu/users/varghese/PAPERS/twheel.ps.Z">'Hashed
* and Hierarchical Timing Wheels: data structures to efficiently implement a
* timer facility'</a>. More comprehensive slides are located
* <a href="http://www.cse.wustl.edu/~cdgill/courses/cs6874/TimingWheels.ppt">here</a>.
* @author from netty | dubbo
*/
@SuppressWarnings("PMD")
public class HashedWheelTimer implements Timer {
private static final Logger logger = LoggerFactory.getLogger(HashedWheelTimer.class);
private static final AtomicInteger INSTANCE_COUNTER = new AtomicInteger();
private static final AtomicBoolean WARNED_TOO_MANY_INSTANCES = new AtomicBoolean();
private static final int INSTANCE_COUNT_LIMIT = 64;
private static final AtomicIntegerFieldUpdater<HashedWheelTimer> WORKER_STATE_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimer.class, "workerState");
private final Worker worker = new Worker();
private final Thread workerThread;
private static final int WORKER_STATE_INIT = 0;
private static final int WORKER_STATE_STARTED = 1;
private static final int WORKER_STATE_SHUTDOWN = 2;
/**
* 0 - init, 1 - started, 2 - shut down
*/
@SuppressWarnings({"unused", "FieldMayBeFinal"})
private volatile int workerState;
private final long tickDuration;
private final HashedWheelBucket[] wheel;
private final int mask;
private final CountDownLatch startTimeInitialized = new CountDownLatch(1);
private final Queue<HashedWheelTimeout> timeouts = new LinkedBlockingQueue<>();
private final Queue<HashedWheelTimeout> cancelledTimeouts = new LinkedBlockingQueue<>();
private final AtomicLong pendingTimeouts = new AtomicLong(0);
private final long maxPendingTimeouts;
private volatile long startTime;
/**
* Creates a new timer with the default thread factory
* ({@link Executors#defaultThreadFactory()}), default tick duration, and
* default number of ticks per wheel.
*/
public HashedWheelTimer() {
this(Executors.defaultThreadFactory());
}
/**
* Creates a new timer with the default thread factory
* ({@link Executors#defaultThreadFactory()}) and default number of ticks
* per wheel.
*
* @param tickDuration the duration between tick
* @param unit the time unit of the {@code tickDuration}
* @throws NullPointerException if {@code unit} is {@code null}
* @throws IllegalArgumentException if {@code tickDuration} is &lt;= 0
*/
public HashedWheelTimer(long tickDuration, TimeUnit unit) {
this(Executors.defaultThreadFactory(), tickDuration, unit);
}
/**
* Creates a new timer with the default thread factory
* ({@link Executors#defaultThreadFactory()}).
*
* @param tickDuration the duration between tick
* @param unit the time unit of the {@code tickDuration}
* @param ticksPerWheel the size of the wheel
* @throws NullPointerException if {@code unit} is {@code null}
* @throws IllegalArgumentException if either of {@code tickDuration} and {@code ticksPerWheel} is &lt;= 0
*/
public HashedWheelTimer(long tickDuration, TimeUnit unit, int ticksPerWheel) {
this(Executors.defaultThreadFactory(), tickDuration, unit, ticksPerWheel);
}
/**
* Creates a new timer with the default tick duration and default number of
* ticks per wheel.
*
* @param threadFactory a {@link ThreadFactory} that creates a
* background {@link Thread} which is dedicated to
* {@link TimerTask} execution.
* @throws NullPointerException if {@code threadFactory} is {@code null}
*/
public HashedWheelTimer(ThreadFactory threadFactory) {
this(threadFactory, 100, TimeUnit.MILLISECONDS);
}
/**
* Creates a new timer with the default number of ticks per wheel.
*
* @param threadFactory a {@link ThreadFactory} that creates a
* background {@link Thread} which is dedicated to
* {@link TimerTask} execution.
* @param tickDuration the duration between tick
* @param unit the time unit of the {@code tickDuration}
* @throws NullPointerException if either of {@code threadFactory} and {@code unit} is {@code null}
* @throws IllegalArgumentException if {@code tickDuration} is &lt;= 0
*/
public HashedWheelTimer(
ThreadFactory threadFactory, long tickDuration, TimeUnit unit) {
this(threadFactory, tickDuration, unit, 512);
}
/**
* Creates a new timer.
*
* @param threadFactory a {@link ThreadFactory} that creates a
* background {@link Thread} which is dedicated to
* {@link TimerTask} execution.
* @param tickDuration the duration between tick
* @param unit the time unit of the {@code tickDuration}
* @param ticksPerWheel the size of the wheel
* @throws NullPointerException if either of {@code threadFactory} and {@code unit} is {@code null}
* @throws IllegalArgumentException if either of {@code tickDuration} and {@code ticksPerWheel} is &lt;= 0
*/
public HashedWheelTimer(
ThreadFactory threadFactory,
long tickDuration, TimeUnit unit, int ticksPerWheel) {
this(threadFactory, tickDuration, unit, ticksPerWheel, -1);
}
/**
* Creates a new timer.
*
* @param threadFactory a {@link ThreadFactory} that creates a
* background {@link Thread} which is dedicated to
* {@link TimerTask} execution.
* @param tickDuration the duration between tick
* @param unit the time unit of the {@code tickDuration}
* @param ticksPerWheel the size of the wheel
* @param maxPendingTimeouts The maximum number of pending timeouts after which call to
* {@code newTimeout} will result in
* {@link RejectedExecutionException}
* being thrown. No maximum pending timeouts limit is assumed if
* this value is 0 or negative.
* @throws NullPointerException if either of {@code threadFactory} and {@code unit} is {@code null}
* @throws IllegalArgumentException if either of {@code tickDuration} and {@code ticksPerWheel} is &lt;= 0
*/
public HashedWheelTimer(
ThreadFactory threadFactory,
long tickDuration, TimeUnit unit, int ticksPerWheel,
long maxPendingTimeouts) {
if (threadFactory == null) {
throw new NullPointerException("threadFactory");
}
if (unit == null) {
throw new NullPointerException("unit");
}
if (tickDuration <= 0) {
throw new IllegalArgumentException("tickDuration must be greater than 0: " + tickDuration);
}
if (ticksPerWheel <= 0) {
throw new IllegalArgumentException("ticksPerWheel must be greater than 0: " + ticksPerWheel);
}
// Normalize ticksPerWheel to power of two and initialize the wheel.
wheel = createWheel(ticksPerWheel);
mask = wheel.length - 1;
// Convert tickDuration to nanos.
this.tickDuration = unit.toNanos(tickDuration);
// Prevent overflow.
if (this.tickDuration >= Long.MAX_VALUE / wheel.length) {
throw new IllegalArgumentException(String.format(
"tickDuration: %d (expected: 0 < tickDuration in nanos < %d",
tickDuration, Long.MAX_VALUE / wheel.length));
}
workerThread = threadFactory.newThread(worker);
this.maxPendingTimeouts = maxPendingTimeouts;
if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT &&
WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) {
reportTooManyInstances();
}
}
@Override
protected void finalize() throws Throwable {
try {
super.finalize();
} finally {
// This object is going to be GCed and it is assumed the ship has sailed to do a proper shutdown. If
// we have not yet shutdown then we want to make sure we decrement the active instance count.
if (WORKER_STATE_UPDATER.getAndSet(this, WORKER_STATE_SHUTDOWN) != WORKER_STATE_SHUTDOWN) {
INSTANCE_COUNTER.decrementAndGet();
}
}
}
private static HashedWheelBucket[] createWheel(int ticksPerWheel) {
if (ticksPerWheel <= 0) {
throw new IllegalArgumentException(
"ticksPerWheel must be greater than 0: " + ticksPerWheel);
}
if (ticksPerWheel > 1073741824) {
throw new IllegalArgumentException(
"ticksPerWheel may not be greater than 2^30: " + ticksPerWheel);
}
ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel);
HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel];
for (int i = 0; i < wheel.length; i++) {
wheel[i] = new HashedWheelBucket();
}
return wheel;
}
private static int normalizeTicksPerWheel(int ticksPerWheel) {
int normalizedTicksPerWheel = ticksPerWheel - 1;
normalizedTicksPerWheel |= normalizedTicksPerWheel >>> 1;
normalizedTicksPerWheel |= normalizedTicksPerWheel >>> 2;
normalizedTicksPerWheel |= normalizedTicksPerWheel >>> 4;
normalizedTicksPerWheel |= normalizedTicksPerWheel >>> 8;
normalizedTicksPerWheel |= normalizedTicksPerWheel >>> 16;
return normalizedTicksPerWheel + 1;
}
/**
* Starts the background thread explicitly. The background thread will
* start automatically on demand even if you did not call this method.
*
* @throws IllegalStateException if this timer has been
* {@linkplain #stop() stopped} already
*/
public void start() {
switch (WORKER_STATE_UPDATER.get(this)) {
case WORKER_STATE_INIT:
if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {
workerThread.start();
}
break;
case WORKER_STATE_STARTED:
break;
case WORKER_STATE_SHUTDOWN:
throw new IllegalStateException("cannot be started once stopped");
default:
throw new Error("Invalid WorkerState");
}
// Wait until the startTime is initialized by the worker.
while (startTime == 0) {
try {
startTimeInitialized.await();
} catch (InterruptedException ignore) {
// Ignore - it will be ready very soon.
}
}
}
@Override
public Set<Timeout> stop() {
if (Thread.currentThread() == workerThread) {
throw new IllegalStateException(
HashedWheelTimer.class.getSimpleName() +
".stop() cannot be called from " +
TimerTask.class.getSimpleName());
}
if (!WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_STARTED, WORKER_STATE_SHUTDOWN)) {
// workerState can be 0 or 2 at this moment - let it always be 2.
if (WORKER_STATE_UPDATER.getAndSet(this, WORKER_STATE_SHUTDOWN) != WORKER_STATE_SHUTDOWN) {
INSTANCE_COUNTER.decrementAndGet();
}
return Collections.emptySet();
}
try {
boolean interrupted = false;
while (workerThread.isAlive()) {
workerThread.interrupt();
try {
workerThread.join(100);
} catch (InterruptedException ignored) {
interrupted = true;
}
}
if (interrupted) {
Thread.currentThread().interrupt();
}
} finally {
INSTANCE_COUNTER.decrementAndGet();
}
return worker.unprocessedTimeouts();
}
@Override
public boolean isStop() {
return WORKER_STATE_SHUTDOWN == WORKER_STATE_UPDATER.get(this);
}
@Override
public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
if (task == null) {
throw new NullPointerException("task");
}
if (unit == null) {
throw new NullPointerException("unit");
}
long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();
if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {
pendingTimeouts.decrementAndGet();
throw new RejectedExecutionException("Number of pending timeouts ("
+ pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending "
+ "timeouts (" + maxPendingTimeouts + ")");
}
start();
// Add the timeout to the timeout queue which will be processed on the next tick.
// During processing all the queued HashedWheelTimeouts will be added to the correct HashedWheelBucket.
long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;
// Guard against overflow.
if (delay > 0 && deadline < 0) {
deadline = Long.MAX_VALUE;
}
HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
timeouts.add(timeout);
return timeout;
}
/**
* Returns the number of pending timeouts of this {@link Timer}.
*/
public long pendingTimeouts() {
return pendingTimeouts.get();
}
private static void reportTooManyInstances() {
logger.error("You are creating too many HashedWheelTimer instances. " +
"HashedWheelTimer is a shared resource that must be reused across the JVM," +
"so that only a few instances are created.");
}
private final class Worker implements Runnable {
private final Set<Timeout> unprocessedTimeouts = new HashSet<Timeout>();
private long tick;
@Override
public void run() {
// Initialize the startTime.
startTime = System.nanoTime();
if (startTime == 0) {
// We use 0 as an indicator for the uninitialized value here, so make sure it's not 0 when initialized.
startTime = 1;
}
// Notify the other threads waiting for the initialization at start().
startTimeInitialized.countDown();
do {
final long deadline = waitForNextTick();
if (deadline > 0) {
int idx = (int) (tick & mask);
processCancelledTasks();
HashedWheelBucket bucket =
wheel[idx];
transferTimeoutsToBuckets();
bucket.expireTimeouts(deadline);
tick++;
}
} while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);
// Fill the unprocessedTimeouts so we can return them from stop() method.
for (HashedWheelBucket bucket : wheel) {
bucket.clearTimeouts(unprocessedTimeouts);
}
for (; ; ) {
HashedWheelTimeout timeout = timeouts.poll();
if (timeout == null) {
break;
}
if (!timeout.isCancelled()) {
unprocessedTimeouts.add(timeout);
}
}
processCancelledTasks();
}
private void transferTimeoutsToBuckets() {
// transfer only max. 100000 timeouts per tick to prevent a thread to stale the workerThread when it just
// adds new timeouts in a loop.
for (int i = 0; i < 100000; i++) {
HashedWheelTimeout timeout = timeouts.poll();
if (timeout == null) {
// all processed
break;
}
if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) {
// Was cancelled in the meantime.
continue;
}
long calculated = timeout.deadline / tickDuration;
timeout.remainingRounds = (calculated - tick) / wheel.length;
// Ensure we don't schedule for past.
final long ticks = Math.max(calculated, tick);
int stopIndex = (int) (ticks & mask);
HashedWheelBucket bucket = wheel[stopIndex];
bucket.addTimeout(timeout);
}
}
private void processCancelledTasks() {
for (; ; ) {
HashedWheelTimeout timeout = cancelledTimeouts.poll();
if (timeout == null) {
// all processed
break;
}
try {
timeout.remove();
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn("An exception was thrown while process a cancellation task", t);
}
}
}
}
/**
* calculate goal nanoTime from startTime and current tick number,
* then wait until that goal has been reached.
*
* @return Long.MIN_VALUE if received a shutdown request,
* current time otherwise (with Long.MIN_VALUE changed by +1)
*/
private long waitForNextTick() {
long deadline = tickDuration * (tick + 1);
for (; ; ) {
final long currentTime = System.nanoTime() - startTime;
long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;
if (sleepTimeMs <= 0) {
if (currentTime == Long.MIN_VALUE) {
return -Long.MAX_VALUE;
} else {
return currentTime;
}
}
if (isWindows()) {
sleepTimeMs = sleepTimeMs / 10 * 10;
}
try {
Thread.sleep(sleepTimeMs);
} catch (InterruptedException ignored) {
if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) {
return Long.MIN_VALUE;
}
}
}
}
Set<Timeout> unprocessedTimeouts() {
return Collections.unmodifiableSet(unprocessedTimeouts);
}
}
private static final class HashedWheelTimeout implements Timeout {
private static final int ST_INIT = 0;
private static final int ST_CANCELLED = 1;
private static final int ST_EXPIRED = 2;
private static final AtomicIntegerFieldUpdater<HashedWheelTimeout> STATE_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimeout.class, "state");
private final HashedWheelTimer timer;
private final TimerTask task;
private final long deadline;
@SuppressWarnings({"unused", "FieldMayBeFinal", "RedundantFieldInitialization"})
private volatile int state = ST_INIT;
/**
* RemainingRounds will be calculated and set by Worker.transferTimeoutsToBuckets() before the
* HashedWheelTimeout will be added to the correct HashedWheelBucket.
*/
long remainingRounds;
/**
* This will be used to chain timeouts in HashedWheelTimerBucket via a double-linked-list.
* As only the workerThread will act on it there is no need for synchronization / volatile.
*/
HashedWheelTimeout next;
HashedWheelTimeout prev;
/**
* The bucket to which the timeout was added
*/
HashedWheelBucket bucket;
HashedWheelTimeout(HashedWheelTimer timer, TimerTask task, long deadline) {
this.timer = timer;
this.task = task;
this.deadline = deadline;
}
@Override
public Timer timer() {
return timer;
}
@Override
public TimerTask task() {
return task;
}
@Override
public boolean cancel() {
// only update the state it will be removed from HashedWheelBucket on next tick.
if (!compareAndSetState(ST_INIT, ST_CANCELLED)) {
return false;
}
// If a task should be canceled we put this to another queue which will be processed on each tick.
// So this means that we will have a GC latency of max. 1 tick duration which is good enough. This way
// we can make again use of our MpscLinkedQueue and so minimize the locking / overhead as much as possible.
timer.cancelledTimeouts.add(this);
return true;
}
void remove() {
HashedWheelBucket bucket = this.bucket;
if (bucket != null) {
bucket.remove(this);
} else {
timer.pendingTimeouts.decrementAndGet();
}
}
public boolean compareAndSetState(int expected, int state) {
return STATE_UPDATER.compareAndSet(this, expected, state);
}
public int state() {
return state;
}
@Override
public boolean isCancelled() {
return state() == ST_CANCELLED;
}
@Override
public boolean isExpired() {
return state() == ST_EXPIRED;
}
public void expire() {
if (!compareAndSetState(ST_INIT, ST_EXPIRED)) {
return;
}
try {
task.run(this);
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn("An exception was thrown by " + TimerTask.class.getSimpleName() + '.', t);
}
}
}
@Override
public String toString() {
final long currentTime = System.nanoTime();
long remaining = deadline - currentTime + timer.startTime;
StringBuilder buf = new StringBuilder(192)
.append("HashedWheelTimer")
.append('(')
.append("deadline: ");
if (remaining > 0) {
buf.append(remaining)
.append(" ns later");
} else if (remaining < 0) {
buf.append(-remaining)
.append(" ns ago");
} else {
buf.append("now");
}
if (isCancelled()) {
buf.append(", cancelled");
}
return buf.append(", task: ")
.append(task())
.append(')')
.toString();
}
}
/**
* Bucket that stores HashedWheelTimeouts. These are stored in a linked-list like datastructure to allow easy
* removal of HashedWheelTimeouts in the middle. Also the HashedWheelTimeout act as nodes themself and so no
* extra object creation is needed.
*/
private static final class HashedWheelBucket {
/**
* Used for the linked-list datastructure
*/
private HashedWheelTimeout head;
private HashedWheelTimeout tail;
/**
* Add {@link HashedWheelTimeout} to this bucket.
*/
void addTimeout(HashedWheelTimeout timeout) {
assert timeout.bucket == null;
timeout.bucket = this;
if (head == null) {
head = tail = timeout;
} else {
tail.next = timeout;
timeout.prev = tail;
tail = timeout;
}
}
/**
* Expire all {@link HashedWheelTimeout}s for the given {@code deadline}.
*/
void expireTimeouts(long deadline) {
HashedWheelTimeout timeout = head;
// process all timeouts
while (timeout != null) {
HashedWheelTimeout next = timeout.next;
if (timeout.remainingRounds <= 0) {
next = remove(timeout);
if (timeout.deadline <= deadline) {
timeout.expire();
} else {
// The timeout was placed into a wrong slot. This should never happen.
throw new IllegalStateException(String.format(
"timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));
}
} else if (timeout.isCancelled()) {
next = remove(timeout);
} else {
timeout.remainingRounds--;
}
timeout = next;
}
}
public HashedWheelTimeout remove(HashedWheelTimeout timeout) {
HashedWheelTimeout next = timeout.next;
// remove timeout that was either processed or cancelled by updating the linked-list
if (timeout.prev != null) {
timeout.prev.next = next;
}
if (timeout.next != null) {
timeout.next.prev = timeout.prev;
}
if (timeout == head) {
// if timeout is also the tail we need to adjust the entry too
if (timeout == tail) {
tail = null;
head = null;
} else {
head = next;
}
} else if (timeout == tail) {
// if the timeout is the tail modify the tail to be the prev node.
tail = timeout.prev;
}
// null out prev, next and bucket to allow for GC.
timeout.prev = null;
timeout.next = null;
timeout.bucket = null;
timeout.timer.pendingTimeouts.decrementAndGet();
return next;
}
/**
* Clear this bucket and return all not expired / cancelled {@link Timeout}s.
*/
void clearTimeouts(Set<Timeout> set) {
for (; ; ) {
HashedWheelTimeout timeout = pollTimeout();
if (timeout == null) {
return;
}
if (timeout.isExpired() || timeout.isCancelled()) {
continue;
}
set.add(timeout);
}
}
private HashedWheelTimeout pollTimeout() {
HashedWheelTimeout head = this.head;
if (head == null) {
return null;
}
HashedWheelTimeout next = head.next;
if (next == null) {
tail = this.head = null;
} else {
this.head = next;
next.prev = null;
}
// null out prev and next to allow for GC.
head.next = null;
head.prev = null;
head.bucket = null;
return head;
}
}
private static final boolean IS_OS_WINDOWS = System.getProperty("os.name", "").toLowerCase(Locale.US).contains("win");
private boolean isWindows() {
return IS_OS_WINDOWS;
}
}

View File

@@ -0,0 +1,57 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package com.usthe.collector.dispatch.timer;
/**
* A handle associated with a {@link TimerTask} that is returned by a
* {@link Timer}.
* @author from netty | dubbo
*/
@SuppressWarnings("PMD")
public interface Timeout {
/**
* Returns the {@link Timer} that created this handle.
*/
Timer timer();
/**
* Returns the {@link TimerTask} which is associated with this handle.
*/
TimerTask task();
/**
* Returns {@code true} if and only if the {@link TimerTask} associated
* with this handle has been expired.
*/
boolean isExpired();
/**
* Returns {@code true} if and only if the {@link TimerTask} associated
* with this handle has been cancelled.
*/
boolean isCancelled();
/**
* Attempts to cancel the {@link TimerTask} associated with this handle.
* If the task has been executed or cancelled already, it will return with
* no side effect.
*
* @return True if the cancellation completed successfully, otherwise false
*/
boolean cancel();
}

View File

@@ -0,0 +1,58 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package com.usthe.collector.dispatch.timer;
import java.util.Set;
import java.util.concurrent.TimeUnit;
/**
* Schedules {@link TimerTask}s for one-time future execution in a background
* thread.
* @author from netty | dubbo
*/
public interface Timer {
/**
* Schedules the specified {@link TimerTask} for one-time execution after
* the specified delay.
*
* @param task the {@link TimerTask
* @param delay the delay
* @param unit the unit of time
* @return a handle which is associated with the specified task
* @throws IllegalStateException if this timer has been {@linkplain #stop() stopped} already
* @throws RejectedExecutionException if the pending timeouts are too many and creating new timeout
* can cause instability in the system.
*/
Timeout newTimeout(TimerTask task, long delay, TimeUnit unit);
/**
* Releases all resources acquired by this {@link Timer} and cancels all
* tasks which were scheduled but not executed yet.
*
* @return the handles associated with the tasks which were canceled by
* this method
*/
Set<Timeout> stop();
/**
* the timer is stop
*
* @return true for stop
*/
boolean isStop();
}

View File

@@ -0,0 +1,35 @@
package com.usthe.collector.dispatch.timer;
import com.usthe.common.entity.job.Job;
import java.util.concurrent.TimeUnit;
/**
* 时间轮调度接口
* @author tomsun28
* @date 2021/10/17 22:14
*/
public interface TimerDispatch {
/**
* 增加新的job
* @param addJob job
*/
void addJob(Job addJob);
/**
* 调度循环周期性job
* @param timerTask timerTask
* @param interval 开始调度的间隔时间
* @param timeUnit 时间单位
*/
void cyclicJob(WheelTimerJob timerTask, long interval, TimeUnit timeUnit);
/**
* 删除存在的job
* @param jobId jobId
* @param isCyclic 是否是周期性任务,true是, false为临时性任务
*/
void deleteJob(long jobId, boolean isCyclic);
}

View File

@@ -0,0 +1,75 @@
package com.usthe.collector.dispatch.timer;
import com.usthe.common.entity.job.Job;
import org.springframework.stereotype.Component;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
/**
* @author tomsun28
* @date 2021/10/17 23:06
*/
@Component
public class TimerDispatcher implements TimerDispatch {
/**
* 时间轮调度
*/
private Timer wheelTimer;
/**
* 已存在的周期性调度任务
*/
private Map<Long, Timeout> currentCyclicTaskMap;
/**
* 已存在的临时性调度任务
*/
private Map<Long, Timeout> currentTempTaskMap;
public TimerDispatcher() {
this.wheelTimer = new HashedWheelTimer(r -> {
Thread ret = new Thread(r, "wheelTimer");
ret.setDaemon(true);
return ret;
}, 10, TimeUnit.SECONDS, 512);
this.currentCyclicTaskMap = new ConcurrentHashMap<>(1024);
this.currentTempTaskMap = new ConcurrentHashMap<>(1024);
}
@Override
public void addJob(Job addJob) {
WheelTimerJob timerJob = new WheelTimerJob(addJob);
Timeout timeout = wheelTimer.newTimeout(timerJob, addJob.getInterval(), TimeUnit.SECONDS);
if (addJob.isCyclic()) {
currentCyclicTaskMap.put(addJob.getId(), timeout);
} else {
currentTempTaskMap.put(addJob.getId(), timeout);
}
}
@Override
public void cyclicJob(WheelTimerJob timerTask, long interval, TimeUnit timeUnit) {
Long jobId = timerTask.getJob().getId();
// 判断此周期性job是否已经被取消
if (currentCyclicTaskMap.containsKey(jobId)) {
Timeout timeout = wheelTimer.newTimeout(timerTask, interval, TimeUnit.SECONDS);
currentCyclicTaskMap.put(timerTask.getJob().getId(), timeout);
}
}
@Override
public void deleteJob(long jobId, boolean isCyclic) {
if (isCyclic) {
Timeout timeout = currentCyclicTaskMap.remove(jobId);
if (timeout != null) {
timeout.cancel();
}
} else {
Timeout timeout = currentTempTaskMap.remove(jobId);
if (timeout != null) {
timeout.cancel();
}
}
}
}

View File

@@ -0,0 +1,36 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package com.usthe.collector.dispatch.timer;
import java.util.concurrent.TimeUnit;
/**
* A task which is executed after the delay specified with
* {@link Timer#newTimeout(TimerTask, long, TimeUnit)} (TimerTask, long, TimeUnit)}.
* @author from netty | dubbo
*/
public interface TimerTask {
/**
* Executed after the delay specified with
* {@link Timer#newTimeout(TimerTask, long, TimeUnit)}.
*
* @param timeout a handle which is associated with this task
* @throws Exception when error happen
*/
void run(Timeout timeout) throws Exception;
}

View File

@@ -0,0 +1,30 @@
package com.usthe.collector.dispatch.timer;
import com.usthe.collector.dispatch.MetricsTaskDispatch;
import com.usthe.collector.util.SpringContextHolder;
import com.usthe.common.entity.job.Job;
/**
* TimerTask实现
* @author tomsun28
* @date 2021/11/1 17:18
*/
public class WheelTimerJob implements TimerTask {
private Job job;
private MetricsTaskDispatch metricsTaskDispatch;
public WheelTimerJob(Job job) {
this.job = job;
this.metricsTaskDispatch = SpringContextHolder.getBean(MetricsTaskDispatch.class);
}
@Override
public void run(Timeout timeout) throws Exception {
metricsTaskDispatch.dispatchMetricsTask(this);
}
public Job getJob() {
return job;
}
}

View File

@@ -0,0 +1,12 @@
package com.usthe.collector.plugin;
/**
* @author tomsun28
* @date 2021/10/8 15:12
*/
public class SameClass {
public static String hello() {
return "hello collector";
}
}

View File

@@ -0,0 +1,20 @@
package com.usthe.collector.plugin;
import com.usthe.plugin.sample.ExportDemo;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
/**
* @author tomsun28
* @date 2021/10/8 15:31
*/
@Component
@Slf4j
public class TestPlugin implements CommandLineRunner {
@Override
public void run(String... args) throws Exception {
log.info(SameClass.hello());
log.info(new ExportDemo().hello());
}
}

View File

@@ -0,0 +1,48 @@
package com.usthe.collector.util;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;
/**
* Spring的ApplicationContext的持有者,可以用静态方法的方式获取spring容器中的bean
* @author tomsun28
* @date 21:07 2018/4/18
*/
@Component
public class SpringContextHolder implements ApplicationContextAware {
private static ApplicationContext applicationContext;
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
set(applicationContext);
}
private static void set(ApplicationContext applicationContext) {
SpringContextHolder.applicationContext = applicationContext;
}
public static ApplicationContext getApplicationContext() {
assertApplicationContext();
return applicationContext;
}
@SuppressWarnings("unchecked")
public static <T> T getBean(String beanName) {
assertApplicationContext();
return (T) applicationContext.getBean(beanName);
}
public static <T> T getBean(Class<T> tClass) {
assertApplicationContext();
return (T) applicationContext.getBean(tClass);
}
private static void assertApplicationContext() {
if (null == SpringContextHolder.applicationContext) {
throw new RuntimeException("applicationContext为空,请检查是否注入springContextHolder");
}
}
}

View File

@@ -0,0 +1,16 @@
server:
port: 8080
spring:
application:
name: ${HOSTNAME:@collecor@}${PID}
profiles:
active: dev
collector:
dispatch:
entrance:
etcd:
endpoints: http://139.198.109.64:2379
export:
kafka:
servers: localhost:9092
enabled: true

View File

@@ -0,0 +1,6 @@
██████╗ ██████╗ ██╗ ██╗ ███████╗ ██████╗████████╗ ██████╗ ██████╗
██╔════╝██╔═══██╗██║ ██║ ██╔════╝██╔════╝╚══██╔══╝██╔═══██╗██╔══██╗
██║ ██║ ██║██║ ██║ █████╗ ██║ ██║ ██║ ██║██████╔╝
██║ ██║ ██║██║ ██║ ██╔══╝ ██║ ██║ ██║ ██║██╔══██╗ Profile: ${spring.profiles.active}
╚██████╗╚██████╔╝███████╗███████╗███████╗╚██████╗ ██║ ╚██████╔╝██║ ██║ Name: ${spring.application.name} Port: ${server.port} Pid: ${pid}
╚═════╝ ╚═════╝ ╚══════╝╚══════╝╚══════╝ ╚═════╝ ╚═╝ ╚═════╝ ╚═╝ ╚═╝

View File

@@ -0,0 +1,79 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration scan="true">
<springProperty scope="context" name="application_name" source="spring.application.name" defaultValue="collector"/>
<!-- 输出日志到控制台 ConsoleAppender -->
<appender name="ConsoleAppender" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<!--<pattern>%d %p (%file:%line\)- %m%n</pattern>-->
<!--格式化输出:%d:表示日期 %thread:表示线程名 %-5level:级别从左显示5个字符宽度 %msg:日志消息 %n:是换行符-->
<pattern>1-%d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level %logger - %msg%n</pattern>
<charset>UTF-8</charset>
</encoder>
</appender>
<appender name="SystemOutFileAppender" class="ch.qos.logback.core.rolling.RollingFileAppender">
<!-- 日志记录器的滚动策略,按日期,按大小记录 -->
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<!-- 归档的日志文件的路径。%d{yyyy-MM-dd}指定日期格式,%i指定索引 -->
<fileNamePattern>logs/${application_name}-%d{yyyy-MM-dd}.%i.log</fileNamePattern>
<!-- 除按日志记录之外还配置了日志文件不能超过200M若超过200M日志文件会以索引0开始 -->
<timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
<maxFileSize>200MB</maxFileSize>
</timeBasedFileNamingAndTriggeringPolicy>
</rollingPolicy>
<!-- 追加方式记录日志 -->
<append>true</append>
<!-- 日志文件的格式 -->
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
<pattern>===%d{yyyy-MM-dd HH:mm:ss.SSS} %-5level %logger Line:%-3L - %msg%n</pattern>
<charset>utf-8</charset>
</encoder>
</appender>
<appender name="ErrOutFileAppender" class="ch.qos.logback.core.rolling.RollingFileAppender">
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<fileNamePattern>logs/${application_name}-%d{yyyy-MM-dd}-error.%i.log</fileNamePattern>
<timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
<maxFileSize>200MB</maxFileSize>
</timeBasedFileNamingAndTriggeringPolicy>
</rollingPolicy>
<!-- 追加方式记录日志 -->
<append>true</append>
<!-- 日志文件的格式 -->
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
<pattern>===%d{yyyy-MM-dd HH:mm:ss.SSS} %-5level %logger Line:%-3L - %msg%n</pattern>
<charset>utf-8</charset>
</encoder>
<!-- 此日志文件记录error及以上级别的 -->
<filter class="ch.qos.logback.classic.filter.ThresholdFilter">
<level>ERROR</level>
</filter>
</appender>
<!--这个logger的设置是举例在org.springframework包下面的所有输出日志必须级别level在info及以上级别才会被输出-->
<!--这样可以避免输出一些spring框架的许多常见debug信息!-->
<logger name="org.springframework" level="info" />
<logger name="org.json" level="error"/>
<logger name="io.netty" level="info"/>
<logger name="org.slf4j" level="info"/>
<logger name="ch.qos.logback" level="info"/>
<!-- 生产环境配置 -->
<springProfile name="prod">
<root level="DEBUG">
<appender-ref ref="ConsoleAppender"/>
<appender-ref ref="SystemOutFileAppender"/>
<appender-ref ref="ErrOutFileAppender"/>
</root>
</springProfile>
<!-- 开发环境配置 -->
<springProfile name="dev">
<root level="DEBUG">
<appender-ref ref="ConsoleAppender"/>
<appender-ref ref="SystemOutFileAppender"/>
<appender-ref ref="ErrOutFileAppender"/>
</root>
</springProfile>
</configuration>