diff --git a/collector/src/main/java/com/usthe/collector/collect/common/cache/CacheCloseable.java b/collector/src/main/java/com/usthe/collector/collect/common/cache/CacheCloseable.java new file mode 100644 index 0000000..575921a --- /dev/null +++ b/collector/src/main/java/com/usthe/collector/collect/common/cache/CacheCloseable.java @@ -0,0 +1,14 @@ +package com.usthe.collector.collect.common.cache; + +/** + * 连接资源关闭回调接口 + * @author tomsun28 + * @date 2022/1/1 21:03 + */ +public interface CacheCloseable { + + /** + * 在缓存remove掉此对象前,回调接口对连接对象进行相关资源的释放 + */ + void close(); +} diff --git a/collector/src/main/java/com/usthe/collector/collect/common/cache/CacheIdentifier.java b/collector/src/main/java/com/usthe/collector/collect/common/cache/CacheIdentifier.java new file mode 100644 index 0000000..75bc1e1 --- /dev/null +++ b/collector/src/main/java/com/usthe/collector/collect/common/cache/CacheIdentifier.java @@ -0,0 +1,23 @@ +package com.usthe.collector.collect.common.cache; + +import lombok.Builder; +import lombok.Data; + +/** + * 缓存key唯一标识符 + * @author tomsun28 + * @date 2021/12/1 21:30 + */ +@Data +@Builder +public class CacheIdentifier { + + private String ip; + + private String port; + + private String username; + + private String password; + +} diff --git a/collector/src/main/java/com/usthe/collector/collect/common/cache/CommonCache.java b/collector/src/main/java/com/usthe/collector/collect/common/cache/CommonCache.java new file mode 100644 index 0000000..078ec3d --- /dev/null +++ b/collector/src/main/java/com/usthe/collector/collect/common/cache/CommonCache.java @@ -0,0 +1,211 @@ +package com.usthe.collector.collect.common.cache; + + +import com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap; +import lombok.extern.slf4j.Slf4j; + +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +/** + * lru cache 对连接对象进行缓存 + * @author tomsun28 + * @date 2021-12-10 23:17 + */ +@Slf4j +public class CommonCache { + + /** + * 默认缓存时间 30minute + */ + private static final long DEFAULT_CACHE_TIMEOUT = 30 * 60 * 1000L; + + /** + * 默认最大缓存数量 + */ + private static final int DEFAULT_MAX_CAPACITY = 10000; + + /** + * cacheTime数组大小 + */ + private static final int CACHE_TIME_LENGTH = 2; + + /** + * 存储对象的数据过期时间点 + */ + private Map timeoutMap; + + /** + * 存储缓存对象 + */ + private ConcurrentLinkedHashMap cacheMap; + + /** + * 过期数据清理线程池 + */ + private ThreadPoolExecutor cleanTimeoutExecutor; + + private CommonCache() { init();} + + /** + * 初始化 cache + */ + private void init() { + // 初始化lru hashmap + cacheMap = new ConcurrentLinkedHashMap + .Builder<>() + .maximumWeightedCapacity(DEFAULT_MAX_CAPACITY) + .listener((key, value) -> { + timeoutMap.remove(key); + if (value instanceof CacheCloseable) { + ((CacheCloseable)value).close(); + } + log.info("lru cache discard key: {}, value: {}.", key, value); + }).build(); + + // 初始化时间纪录map + timeoutMap = new ConcurrentHashMap<>(DEFAULT_MAX_CAPACITY >> 6); + + // 初始化过期数据清理线程池 + cleanTimeoutExecutor = new ThreadPoolExecutor(1, 1, + 1, TimeUnit.SECONDS, + new ArrayBlockingQueue<>(1), r -> new Thread("lru-cache-timeout-cleaner"), + new ThreadPoolExecutor.DiscardOldestPolicy()); + + // 初始化可用性探测定位任务,每次探测间隔时间为20分钟 + ScheduledThreadPoolExecutor scheduledExecutor = new ScheduledThreadPoolExecutor(1, + r -> new Thread(r, "lru-cache-available-detector")); + scheduledExecutor.scheduleWithFixedDelay(this::detectCacheAvailable, + 2,20, TimeUnit.MINUTES); + } + + /** + * 探测所有可探测的缓存对象的可用性,清除不可用和过期对象 + */ + private void detectCacheAvailable() { + try { + cacheMap.forEach((key, value) -> { + // 先判断是否过期 + Long[] cacheTime = timeoutMap.get(key); + long currentTime = System.currentTimeMillis(); + if (cacheTime == null || cacheTime.length != CACHE_TIME_LENGTH + || cacheTime[0] + cacheTime[1] < currentTime) { + cacheMap.remove(key); + timeoutMap.remove(key); + if (value instanceof CacheCloseable) { + ((CacheCloseable)value).close(); + } + + } + }); + } catch (Exception e) { + log.error("detect cache available error: {}.", e.getMessage(), e); + } + } + + /** + * 清理过期线程 + */ + private void cleanTimeoutCache() { + try { + cacheMap.forEach((key, value) -> { + // index 0 is startTime, 1 is timeDiff + Long[] cacheTime = timeoutMap.get(key); + long currentTime = System.currentTimeMillis(); + if (cacheTime == null || cacheTime.length != CACHE_TIME_LENGTH) { + timeoutMap.put(key, new Long[]{currentTime, DEFAULT_CACHE_TIMEOUT}); + } else if (cacheTime[0] + cacheTime[1] < currentTime) { + // 过期了 discard 关闭这个cache的资源 + timeoutMap.remove(key); + cacheMap.remove(key); + if (value instanceof CacheCloseable) { + ((CacheCloseable)value).close(); + } + } + }); + } catch (Exception e) { + log.error("clean timeout cache error: {}.", e.getMessage(), e); + } + } + + /** + * 新增或更新cache + * @param key 存储对象key + * @param value 存储对象 + * @param timeDiff 缓存对象保存时间 millis + */ + public void addCache(Object key, Object value, Long timeDiff) { + if (timeDiff == null) { + timeDiff = DEFAULT_CACHE_TIMEOUT; + } + cacheMap.put(key, value); + timeoutMap.put(key, new Long[]{System.currentTimeMillis(), timeDiff}); + cleanTimeoutExecutor.execute(() -> { + try { + cleanTimeoutCache(); + Thread.sleep(10 * 1000); + } catch (InterruptedException e) { + log.error(e.getMessage(), e); + } + }); + } + + /** + * 根据缓存key获取缓存对象 + * @param key key + * @param refreshCache 是否刷新命中的缓存的存活时间 true是,false否 + * @return 缓存对象 + */ + public Optional getCache(Object key, boolean refreshCache) { + Long[] cacheTime = timeoutMap.get(key); + if (cacheTime == null || cacheTime.length != CACHE_TIME_LENGTH) { + return Optional.empty(); + } + if (cacheTime[0] + cacheTime[1] < System.currentTimeMillis()) { + timeoutMap.remove(key); + cacheMap.remove(key); + return Optional.empty(); + } + Object value = cacheMap.get(key); + if (value == null) { + cacheMap.remove(key); + timeoutMap.remove(key); + } else if (refreshCache) { + cacheTime[0] = System.currentTimeMillis(); + timeoutMap.put(key, cacheTime); + } + return Optional.ofNullable(value); + } + + /** + * 根据缓存key删除缓存对象 + * @param key key + */ + public void removeCache(Object key) { + timeoutMap.remove(key); + cacheMap.remove(key); + } + + /** + * 获取缓存实例 + * @return cache + */ + public static CommonCache getInstance() { + return SingleInstance.INSTANCE; + } + + /** + * 静态内部类 + */ + private static class SingleInstance { + /** + * 单例 + */ + private static final CommonCache INSTANCE= new CommonCache(); + } +} diff --git a/collector/src/main/java/com/usthe/collector/collect/common/cache/JdbcConnect.java b/collector/src/main/java/com/usthe/collector/collect/common/cache/JdbcConnect.java new file mode 100644 index 0000000..fc5b303 --- /dev/null +++ b/collector/src/main/java/com/usthe/collector/collect/common/cache/JdbcConnect.java @@ -0,0 +1,40 @@ +package com.usthe.collector.collect.common.cache; + +import lombok.extern.slf4j.Slf4j; + +import java.sql.Connection; + +/** + * @author tomsun28 + * @date 2022/1/1 21:24 + */ +@Slf4j +public class JdbcConnect implements CacheCloseable { + + private Connection connection; + + public JdbcConnect(Connection connection) { + this.connection = connection; + } + + @Override + public void close() { + try { + if (connection != null) { + connection.close(); + } + } catch (Exception e) { + log.error("close jdbc connect error: {}", e.getMessage()); + } + } + + @Override + protected void finalize() throws Throwable { + close(); + super.finalize(); + } + + public Connection getConnection() { + return connection; + } +} diff --git a/collector/src/main/java/com/usthe/collector/collect/common/http/CommonHttpClient.java b/collector/src/main/java/com/usthe/collector/collect/common/http/CommonHttpClient.java new file mode 100644 index 0000000..1b792a1 --- /dev/null +++ b/collector/src/main/java/com/usthe/collector/collect/common/http/CommonHttpClient.java @@ -0,0 +1,140 @@ +package com.usthe.collector.collect.common.http; + +import lombok.extern.slf4j.Slf4j; +import org.apache.http.client.config.RequestConfig; +import org.apache.http.config.Registry; +import org.apache.http.config.RegistryBuilder; +import org.apache.http.conn.socket.ConnectionSocketFactory; +import org.apache.http.conn.socket.PlainConnectionSocketFactory; +import org.apache.http.conn.ssl.NoopHostnameVerifier; +import org.apache.http.conn.ssl.SSLConnectionSocketFactory; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; +import org.apache.http.impl.conn.PoolingHttpClientConnectionManager; +import org.apache.http.ssl.SSLContexts; + +import javax.net.ssl.SSLContext; +import javax.net.ssl.TrustManager; +import javax.net.ssl.X509TrustManager; +import java.security.cert.CertificateException; +import java.security.cert.X509Certificate; +import java.util.concurrent.TimeUnit; + +/** + * 统一的http客户端连接池 + * @author tomsun28 + * @date 2021/12/30 21:23 + */ +@Slf4j +public class CommonHttpClient { + + private static CloseableHttpClient httpClient; + + private static PoolingHttpClientConnectionManager connectionManager; + + /** + * 此连接池所能提供的最大连接数 + */ + private final static int MAX_TOTAL_CONNECTIONS = 50000; + + /** + * 每个路由所能分配的最大连接数 + */ + private final static int MAX_PER_ROUTE_CONNECTIONS = 80; + + /** + * 从连接池中获取连接的默认超时时间 4秒 + */ + private final static int REQUIRE_CONNECT_TIMEOUT = 4000; + + /** + * 双端建立连接超时时间 4秒 + */ + private final static int CONNECT_TIMEOUT = 4000; + + /** + * socketReadTimeout 响应tcp报文的最大间隔超时时间 + */ + private final static int SOCKET_TIMEOUT = 60000; + + /** + * 空闲连接免检的有效时间,被重用的空闲连接若超过此时间,需检查此连接的可用性 + */ + private final static int INACTIVITY_VALIDATED_TIME = 10000; + + /** + * ssl版本 + */ + private final static String[] SUPPORTED_SSL = {"TLSv1","TLSv1.1","TLSv1.2","SSLv3"}; + + static { + try { + // 初始化ssl上下文 + SSLContext sslContext = SSLContexts.createDefault(); + X509TrustManager x509TrustManager = new X509TrustManager() { + @Override + public void checkClientTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException { } + @Override + public void checkServerTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException { } + @Override + public X509Certificate[] getAcceptedIssuers() { return null; } + }; + sslContext.init(null, new TrustManager[]{x509TrustManager}, null); + // 设置支持的ssl版本 + SSLConnectionSocketFactory sslFactory = new SSLConnectionSocketFactory(sslContext, SUPPORTED_SSL, null, new NoopHostnameVerifier()); + // 注册 http https + Registry registry = RegistryBuilder.create() + .register("http", PlainConnectionSocketFactory.INSTANCE) + .register("https", sslFactory) + .build(); + // 网络请求默认配置 + RequestConfig requestConfig = RequestConfig.custom() + // 从连接池获取连接超时时间 + .setConnectionRequestTimeout(REQUIRE_CONNECT_TIMEOUT) + // 和对端新连接建立时间,三次握手时间 + .setConnectTimeout(CONNECT_TIMEOUT) + // 数据传输最大响应间隔时间 + .setSocketTimeout(SOCKET_TIMEOUT) + // 遇到301 302不自动重定向跳转 + .setRedirectsEnabled(false) + .build(); + // 连接池 + connectionManager = new PoolingHttpClientConnectionManager(registry); + connectionManager.setMaxTotal(MAX_TOTAL_CONNECTIONS); + connectionManager.setDefaultMaxPerRoute(MAX_PER_ROUTE_CONNECTIONS); + connectionManager.setValidateAfterInactivity(INACTIVITY_VALIDATED_TIME); + // 构造单例 httpClient + httpClient = HttpClients.custom() + .setConnectionManager(connectionManager) + .setDefaultRequestConfig(requestConfig) + // 定期清理不可用过期连接 + .evictExpiredConnections() + // 定期清理可用但空闲的连接 + .evictIdleConnections(100, TimeUnit.SECONDS) + .build(); + // 构造连接清理器 + Thread connectCleaner = new Thread(() -> { + while (Thread.currentThread().isInterrupted()) { + try { + Thread.sleep(30000); + connectionManager.closeExpiredConnections(); + connectionManager.closeIdleConnections(100, TimeUnit.SECONDS); + } catch (InterruptedException e) { + } + } + }); + connectCleaner.setName("HttpConnectCleaner"); + connectCleaner.setDaemon(true); + connectCleaner.start(); + } catch (Exception e) { + } + } + + public static CloseableHttpClient getHttpClient() { + return httpClient; + } + + public static PoolingHttpClientConnectionManager getConnectionManager() { + return connectionManager; + } +}