diff --git a/Architecture.png b/Architecture.png
new file mode 100644
index 0000000..acfa096
Binary files /dev/null and b/Architecture.png differ
diff --git a/manager/pom.xml b/manager/pom.xml
index 211832f..1c66c88 100644
--- a/manager/pom.xml
+++ b/manager/pom.xml
@@ -29,6 +29,12 @@
scheduler
1.0-SNAPSHOT
+
+
+ com.usthe.tancloud
+ warehouse
+ 1.0-SNAPSHOT
+
org.springframework.boot
diff --git a/pom.xml b/pom.xml
index 3480b4d..508035d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -14,6 +14,7 @@
alerter
common
collector
+ warehouse
diff --git a/warehouse/pom.xml b/warehouse/pom.xml
new file mode 100644
index 0000000..eb4f04d
--- /dev/null
+++ b/warehouse/pom.xml
@@ -0,0 +1,68 @@
+
+
+
+ monitor
+ com.usthe.tancloud
+ 1.0-SNAPSHOT
+
+ 4.0.0
+
+ warehouse
+
+
+
+
+ com.usthe.tancloud
+ common
+ 1.0-SNAPSHOT
+
+
+
+ org.springframework.boot
+ spring-boot-starter-web
+ provided
+
+
+ org.springframework.boot
+ spring-boot-autoconfigure
+
+
+ org.springframework.boot
+ spring-boot-configuration-processor
+ true
+
+
+
+ com.influxdb
+ influxdb-client-java
+ 3.4.0
+
+
+
+ org.apache.kafka
+ kafka-clients
+ 3.0.0
+
+
+
+ io.lettuce
+ lettuce-core
+ provided
+
+
+
+ io.springfox
+ springfox-boot-starter
+ provided
+
+
+
+ org.springframework.cloud
+ spring-cloud-starter-openfeign
+ 3.0.5
+ provided
+
+
+
\ No newline at end of file
diff --git a/warehouse/src/main/java/com/usthe/warehouse/MetricsDataQueue.java b/warehouse/src/main/java/com/usthe/warehouse/MetricsDataQueue.java
new file mode 100644
index 0000000..c7a82ee
--- /dev/null
+++ b/warehouse/src/main/java/com/usthe/warehouse/MetricsDataQueue.java
@@ -0,0 +1,42 @@
+package com.usthe.warehouse;
+
+import com.usthe.common.entity.message.CollectRep;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * 采集数据队列
+ * @author tom
+ * @date 2021/11/24 17:58
+ */
+@Component
+@Slf4j
+public class MetricsDataQueue {
+
+ private final LinkedBlockingQueue dataInfluxQueue;
+ private final LinkedBlockingQueue dataRedisQueue;
+
+ public MetricsDataQueue() {
+ dataInfluxQueue = new LinkedBlockingQueue<>();
+ dataRedisQueue = new LinkedBlockingQueue<>();
+ }
+
+ public void addMetricsDataToInflux(CollectRep.MetricsData metricsData) {
+ dataInfluxQueue.offer(metricsData);
+ }
+
+ public CollectRep.MetricsData pollInfluxMetricsData() throws InterruptedException {
+ return dataInfluxQueue.poll(2, TimeUnit.SECONDS);
+ }
+
+ public void addMetricsDataToRedis(CollectRep.MetricsData metricsData) {
+ dataRedisQueue.offer(metricsData);
+ }
+
+ public CollectRep.MetricsData pollRedisMetricsData() throws InterruptedException {
+ return dataRedisQueue.poll(2, TimeUnit.SECONDS);
+ }
+}
diff --git a/warehouse/src/main/java/com/usthe/warehouse/WarehouseProperties.java b/warehouse/src/main/java/com/usthe/warehouse/WarehouseProperties.java
new file mode 100644
index 0000000..4e50197
--- /dev/null
+++ b/warehouse/src/main/java/com/usthe/warehouse/WarehouseProperties.java
@@ -0,0 +1,259 @@
+package com.usthe.warehouse;
+
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.stereotype.Component;
+
+/**
+ * 数据仓储配置属性
+ * @author tom
+ * @date 2021/11/24 10:38
+ */
+@Component
+@ConfigurationProperties(prefix = "warehouse")
+public class WarehouseProperties {
+
+ /**
+ * 数据入口配置属性
+ */
+ private EntranceProperties entrance;
+
+ /**
+ * 数据存储配置属性
+ */
+ private StoreProperties store;
+
+ public EntranceProperties getEntrance() {
+ return entrance;
+ }
+
+ public void setEntrance(EntranceProperties entrance) {
+ this.entrance = entrance;
+ }
+
+ public StoreProperties getStore() {
+ return store;
+ }
+
+ public void setStore(StoreProperties store) {
+ this.store = store;
+ }
+
+ /**
+ * 数据入口配置属性
+ * 入口可以是从kafka rabbitmq rocketmq等消息中间件获取数据
+ */
+ public static class EntranceProperties {
+
+ /**
+ * kafka配置信息
+ */
+ private EntranceProperties.KafkaProperties kafka;
+
+ public EntranceProperties.KafkaProperties getKafka() {
+ return kafka;
+ }
+
+ public void setKafka(EntranceProperties.KafkaProperties kafka) {
+ this.kafka = kafka;
+ }
+
+ public static class KafkaProperties {
+ /**
+ * kafka数据入口是否启动
+ */
+ private boolean enabled = true;
+
+ /**
+ * kafka的连接服务器url
+ */
+ private String servers = "127.0.0.1:9092";
+ /**
+ * 接收数据的topic名称
+ */
+ private String topic;
+ /**
+ * 消费者组ID
+ */
+ private String groupId;
+
+ public boolean isEnabled() {
+ return enabled;
+ }
+
+ public void setEnabled(boolean enabled) {
+ this.enabled = enabled;
+ }
+
+ public String getServers() {
+ return servers;
+ }
+
+ public void setServers(String servers) {
+ this.servers = servers;
+ }
+
+ public String getTopic() {
+ return topic;
+ }
+
+ public void setTopic(String topic) {
+ this.topic = topic;
+ }
+
+ public String getGroupId() {
+ return groupId;
+ }
+
+ public void setGroupId(String groupId) {
+ this.groupId = groupId;
+ }
+ }
+
+ }
+
+ /**
+ * 调度数据出口配置属性
+ */
+ public static class StoreProperties {
+
+ /**
+ * influxdb配置信息
+ */
+ private InfluxdbProperties influxdb;
+ /**
+ * redis配置信息
+ */
+ private RedisProperties redis;
+
+ public InfluxdbProperties getInfluxdb() {
+ return influxdb;
+ }
+
+ public void setInfluxdb(InfluxdbProperties influxdb) {
+ this.influxdb = influxdb;
+ }
+
+ public RedisProperties getRedis() {
+ return redis;
+ }
+
+ public void setRedis(RedisProperties redis) {
+ this.redis = redis;
+ }
+
+ public static class InfluxdbProperties {
+ /**
+ * influxdb数据存储是否启动
+ */
+ private boolean enabled = true;
+ /**
+ * influxdb的连接服务器url
+ */
+ private String servers = "http://127.0.0.1:8086";
+ /**
+ * 认证token
+ */
+ private String token;
+ /**
+ * 仓库名称
+ */
+ private String bucket;
+ /**
+ * 组织名称
+ */
+ private String org;
+
+ public boolean isEnabled() {
+ return enabled;
+ }
+
+ public void setEnabled(boolean enabled) {
+ this.enabled = enabled;
+ }
+
+ public String getServers() {
+ return servers;
+ }
+
+ public void setServers(String servers) {
+ this.servers = servers;
+ }
+
+ public String getToken() {
+ return token;
+ }
+
+ public void setToken(String token) {
+ this.token = token;
+ }
+
+ public String getBucket() {
+ return bucket;
+ }
+
+ public void setBucket(String bucket) {
+ this.bucket = bucket;
+ }
+
+ public String getOrg() {
+ return org;
+ }
+
+ public void setOrg(String org) {
+ this.org = org;
+ }
+ }
+
+ public static class RedisProperties {
+ /**
+ * redis数据存储是否启动
+ */
+ private boolean enabled = true;
+ /**
+ * redis 主机host
+ */
+ private String host = "127.0.0.1";
+ /**
+ * redis 主机端口
+ */
+ private Integer port = 6379;
+ /**
+ * redis 访问密码
+ */
+ private String password;
+
+ public boolean isEnabled() {
+ return enabled;
+ }
+
+ public void setEnabled(boolean enabled) {
+ this.enabled = enabled;
+ }
+
+ public String getHost() {
+ return host;
+ }
+
+ public void setHost(String host) {
+ this.host = host;
+ }
+
+ public Integer getPort() {
+ return port;
+ }
+
+ public void setPort(Integer port) {
+ this.port = port;
+ }
+
+ public String getPassword() {
+ return password;
+ }
+
+ public void setPassword(String password) {
+ this.password = password;
+ }
+ }
+ }
+
+}
diff --git a/warehouse/src/main/java/com/usthe/warehouse/WarehouseWorkerPool.java b/warehouse/src/main/java/com/usthe/warehouse/WarehouseWorkerPool.java
new file mode 100644
index 0000000..1175f13
--- /dev/null
+++ b/warehouse/src/main/java/com/usthe/warehouse/WarehouseWorkerPool.java
@@ -0,0 +1,54 @@
+package com.usthe.warehouse;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * warehouse 工作线程池
+ * @author tom
+ * @date 2021/11/24 18:09
+ */
+@Component
+@Slf4j
+public class WarehouseWorkerPool {
+
+ private ThreadPoolExecutor workerExecutor;
+
+ public WarehouseWorkerPool() {
+ initWorkExecutor();
+ }
+
+ private void initWorkExecutor() {
+ // 线程工厂
+ ThreadFactory threadFactory = new ThreadFactoryBuilder()
+ .setUncaughtExceptionHandler((thread, throwable) -> {
+ log.error("workerExecutor has uncaughtException.");
+ log.error(throwable.getMessage(), throwable); })
+ .setDaemon(true)
+ .setNameFormat("warehouse-worker-%d")
+ .build();
+ workerExecutor = new ThreadPoolExecutor(6,
+ 10,
+ 10,
+ TimeUnit.SECONDS,
+ new SynchronousQueue<>(),
+ threadFactory,
+ new ThreadPoolExecutor.AbortPolicy());
+ }
+
+ /**
+ * 运行warehouse任务
+ * @param runnable 任务
+ * @throws RejectedExecutionException when 线程池满
+ */
+ public void executeJob(Runnable runnable) throws RejectedExecutionException {
+ workerExecutor.execute(runnable);
+ }
+}
diff --git a/warehouse/src/main/java/com/usthe/warehouse/entrance/KafkaDataConsume.java b/warehouse/src/main/java/com/usthe/warehouse/entrance/KafkaDataConsume.java
new file mode 100644
index 0000000..ad99bed
--- /dev/null
+++ b/warehouse/src/main/java/com/usthe/warehouse/entrance/KafkaDataConsume.java
@@ -0,0 +1,81 @@
+package com.usthe.warehouse.entrance;
+
+import com.usthe.common.entity.message.CollectRep;
+import com.usthe.warehouse.MetricsDataQueue;
+import com.usthe.warehouse.WarehouseProperties;
+import com.usthe.warehouse.WarehouseWorkerPool;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.springframework.beans.factory.DisposableBean;
+import org.springframework.boot.autoconfigure.AutoConfigureAfter;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.context.annotation.Configuration;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.Properties;
+
+/**
+ * 从Kafka消费指标组采集数据处理
+ * @author tom
+ * @date 2021/11/24 18:03
+ */
+@Configuration
+@AutoConfigureAfter(value = {WarehouseProperties.class})
+@ConditionalOnProperty(prefix = "warehouse.entrance.kafka",
+ name = "enabled", havingValue = "true", matchIfMissing = true)
+@Slf4j
+public class KafkaDataConsume implements DisposableBean {
+
+ private KafkaConsumer consumer;
+ private WarehouseWorkerPool workerPool;
+ private MetricsDataQueue dataQueue;
+ public KafkaDataConsume(WarehouseProperties properties, WarehouseWorkerPool workerPool,
+ MetricsDataQueue dataQueue) {
+ this.workerPool = workerPool;
+ this.dataQueue = dataQueue;
+ initConsumer(properties);
+ startConsumeData();
+ }
+
+ private void startConsumeData() {
+ Runnable runnable = () -> {
+ Thread.currentThread().setName("warehouse-kafka-data-consumer");
+ while (!Thread.currentThread().isInterrupted()) {
+ ConsumerRecords records = consumer.poll(Duration.ofMillis(100));
+ records.forEach(record -> {
+ dataQueue.addMetricsDataToInflux(record.value());
+ dataQueue.addMetricsDataToRedis(record.value());
+ });
+ }
+ };
+ workerPool.executeJob(runnable);
+ }
+
+ private void initConsumer(WarehouseProperties properties) {
+ if (properties == null || properties.getEntrance() == null || properties.getEntrance().getKafka() == null) {
+ log.error("init error, please config Warehouse kafka props in application.yml");
+ throw new IllegalArgumentException("please config Warehouse kafka props");
+ }
+ WarehouseProperties.EntranceProperties.KafkaProperties kafkaProp = properties.getEntrance().getKafka();
+ Properties consumerProp = new Properties();
+ consumerProp.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProp.getServers());
+ consumerProp.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaProp.getGroupId());
+ consumerProp.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
+ consumerProp.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaMetricsDataDeserializer.class);
+ consumerProp.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
+ consumerProp.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
+ consumer = new KafkaConsumer<>(consumerProp);
+ consumer.subscribe(Collections.singleton(kafkaProp.getTopic()));
+ }
+
+ @Override
+ public void destroy() throws Exception {
+ if (consumer != null) {
+ consumer.close();
+ }
+ }
+}
diff --git a/warehouse/src/main/java/com/usthe/warehouse/entrance/KafkaMetricsDataDeserializer.java b/warehouse/src/main/java/com/usthe/warehouse/entrance/KafkaMetricsDataDeserializer.java
new file mode 100644
index 0000000..544265a
--- /dev/null
+++ b/warehouse/src/main/java/com/usthe/warehouse/entrance/KafkaMetricsDataDeserializer.java
@@ -0,0 +1,24 @@
+package com.usthe.warehouse.entrance;
+
+import com.usthe.common.entity.message.CollectRep;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.common.serialization.Deserializer;
+
+/**
+ * MetricsData的反序列化
+ * @author tom
+ * @date 2021/11/24 17:29
+ */
+@Slf4j
+public class KafkaMetricsDataDeserializer implements Deserializer {
+
+ @Override
+ public CollectRep.MetricsData deserialize(String topicName, byte[] bytes) {
+ try {
+ return CollectRep.MetricsData.parseFrom(bytes);
+ } catch (Exception e) {
+ log.error(e.getMessage(), e);
+ }
+ return null;
+ }
+}
diff --git a/warehouse/src/main/java/com/usthe/warehouse/store/InfluxdbDataStorage.java b/warehouse/src/main/java/com/usthe/warehouse/store/InfluxdbDataStorage.java
new file mode 100644
index 0000000..073d77c
--- /dev/null
+++ b/warehouse/src/main/java/com/usthe/warehouse/store/InfluxdbDataStorage.java
@@ -0,0 +1,110 @@
+package com.usthe.warehouse.store;
+
+import com.google.protobuf.ProtocolStringList;
+import com.influxdb.client.InfluxDBClient;
+import com.influxdb.client.InfluxDBClientFactory;
+import com.influxdb.client.WriteApi;
+import com.influxdb.client.WriteOptions;
+import com.influxdb.client.domain.WritePrecision;
+import com.influxdb.client.write.Point;
+import com.usthe.common.entity.message.CollectRep;
+import com.usthe.warehouse.MetricsDataQueue;
+import com.usthe.warehouse.WarehouseProperties;
+import com.usthe.warehouse.WarehouseWorkerPool;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.DisposableBean;
+import org.springframework.boot.autoconfigure.AutoConfigureAfter;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.context.annotation.Configuration;
+
+import java.time.Instant;
+
+/**
+ * influxdb存储采集数据
+ * @author tom
+ * @date 2021/11/24 18:23
+ */
+@Configuration
+@AutoConfigureAfter(value = {WarehouseProperties.class})
+@ConditionalOnProperty(prefix = "warehouse.store.influxdb",
+ name = "enabled", havingValue = "true", matchIfMissing = true)
+@Slf4j
+public class InfluxdbDataStorage implements DisposableBean {
+
+ private InfluxDBClient influxClient;
+ private WriteApi writeApi;
+ private WarehouseWorkerPool workerPool;
+ private MetricsDataQueue dataQueue;
+
+ public InfluxdbDataStorage (WarehouseProperties properties, WarehouseWorkerPool workerPool,
+ MetricsDataQueue dataQueue) {
+ this.workerPool = workerPool;
+ this.dataQueue = dataQueue;
+ initInfluxDbClient(properties);
+ startStorageData();
+ }
+
+ private void startStorageData() {
+ Runnable runnable = () -> {
+ Thread.currentThread().setName("warehouse-influxdb-data-storage");
+ while (!Thread.currentThread().isInterrupted()) {
+ try {
+ CollectRep.MetricsData metricsData = dataQueue.pollInfluxMetricsData();
+ if (metricsData != null) {
+ saveData(metricsData);
+ }
+ } catch (InterruptedException e) {
+ log.error(e.getMessage());
+ }
+ }
+ };
+ workerPool.executeJob(runnable);
+ workerPool.executeJob(runnable);
+ }
+
+ private void initInfluxDbClient(WarehouseProperties properties) {
+ if (properties == null || properties.getStore() == null || properties.getStore().getInfluxdb() == null) {
+ log.error("init error, please config Warehouse influxdb props in application.yml");
+ throw new IllegalArgumentException("please config Warehouse influxdb props");
+ }
+ WarehouseProperties.StoreProperties.InfluxdbProperties influxdbProp = properties.getStore().getInfluxdb();
+ influxClient = InfluxDBClientFactory.create(influxdbProp.getServers(), influxdbProp.getToken().toCharArray(),
+ influxdbProp.getOrg(), influxdbProp.getBucket());
+ WriteOptions writeOptions = WriteOptions.builder()
+ .batchSize(1000)
+ .bufferLimit(1000)
+ .jitterInterval(1000)
+ .retryInterval(5000)
+ .build();
+ writeApi = influxClient.makeWriteApi(writeOptions);
+ }
+
+
+ public void saveData(CollectRep.MetricsData metricsData) {
+ String measurement = metricsData.getApp() + "--" + metricsData.getMetrics();
+ String monitorId = String.valueOf(metricsData.getId());
+ Instant collectTime = Instant.ofEpochMilli(metricsData.getTime());
+ ProtocolStringList fields = metricsData.getFieldsList();
+ for (CollectRep.ValueRow valueRow : metricsData.getValuesList()) {
+ Point point = Point.measurement(measurement)
+ .addTag("id", monitorId)
+ .addTag("instance", valueRow.getInstance())
+ .time(collectTime, WritePrecision.MS);
+ for (int index = 0; index < fields.size(); index++) {
+ point.addField(fields.get(index), valueRow.getColumns(index));
+ }
+ writeApi.writePoint(point);
+ }
+ }
+
+
+ @Override
+ public void destroy() throws Exception {
+ if (writeApi != null) {
+ writeApi.close();
+ }
+ if (influxClient != null) {
+ influxClient.close();
+ }
+ }
+}
diff --git a/warehouse/src/main/java/com/usthe/warehouse/store/MetricsDataRedisCodec.java b/warehouse/src/main/java/com/usthe/warehouse/store/MetricsDataRedisCodec.java
new file mode 100644
index 0000000..55a94f2
--- /dev/null
+++ b/warehouse/src/main/java/com/usthe/warehouse/store/MetricsDataRedisCodec.java
@@ -0,0 +1,42 @@
+package com.usthe.warehouse.store;
+
+import com.usthe.common.entity.message.CollectRep;
+import io.lettuce.core.codec.RedisCodec;
+import lombok.extern.slf4j.Slf4j;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+
+/**
+ * MetricsData redis 序列化
+ * @author tom
+ * @date 2021/11/25 10:42
+ */
+@Slf4j
+public class MetricsDataRedisCodec implements RedisCodec {
+
+ @Override
+ public String decodeKey(ByteBuffer byteBuffer) {
+ return new String(byteBuffer.array(), byteBuffer.position(), byteBuffer.limit(), StandardCharsets.UTF_8);
+ }
+
+ @Override
+ public CollectRep.MetricsData decodeValue(ByteBuffer byteBuffer) {
+ try {
+ return CollectRep.MetricsData.parseFrom(byteBuffer);
+ } catch (Exception e) {
+ log.error(e.getMessage());
+ return null;
+ }
+ }
+
+ @Override
+ public ByteBuffer encodeKey(String s) {
+ return ByteBuffer.wrap(s.getBytes(StandardCharsets.UTF_8));
+ }
+
+ @Override
+ public ByteBuffer encodeValue(CollectRep.MetricsData metricsData) {
+ return ByteBuffer.wrap(metricsData.toByteArray());
+ }
+}
diff --git a/warehouse/src/main/java/com/usthe/warehouse/store/RedisDataStorage.java b/warehouse/src/main/java/com/usthe/warehouse/store/RedisDataStorage.java
new file mode 100644
index 0000000..9f1979b
--- /dev/null
+++ b/warehouse/src/main/java/com/usthe/warehouse/store/RedisDataStorage.java
@@ -0,0 +1,106 @@
+package com.usthe.warehouse.store;
+
+import com.usthe.common.entity.message.CollectRep;
+import com.usthe.warehouse.MetricsDataQueue;
+import com.usthe.warehouse.WarehouseProperties;
+import com.usthe.warehouse.WarehouseWorkerPool;
+import io.lettuce.core.RedisClient;
+import io.lettuce.core.RedisURI;
+import io.lettuce.core.api.StatefulRedisConnection;
+import io.lettuce.core.api.async.RedisAsyncCommands;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.DisposableBean;
+import org.springframework.boot.autoconfigure.AutoConfigureAfter;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.context.annotation.Configuration;
+
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
+
+/**
+ * redis存储采集实时数据
+ * @author tom
+ * @date 2021/11/25 10:26
+ */
+@Configuration
+@AutoConfigureAfter(value = {WarehouseProperties.class})
+@ConditionalOnProperty(prefix = "warehouse.store.redis",
+ name = "enabled", havingValue = "true", matchIfMissing = true)
+@Slf4j
+public class RedisDataStorage implements DisposableBean {
+
+ private RedisClient redisClient;
+ private StatefulRedisConnection connection;
+ private WarehouseWorkerPool workerPool;
+ private MetricsDataQueue dataQueue;
+
+ public RedisDataStorage (WarehouseProperties properties, WarehouseWorkerPool workerPool,
+ MetricsDataQueue dataQueue) {
+ this.workerPool = workerPool;
+ this.dataQueue = dataQueue;
+
+ initRedisClient(properties);
+ startStorageData();
+ }
+
+ private void startStorageData() {
+ Runnable runnable = () -> {
+ Thread.currentThread().setName("warehouse-redis-data-storage");
+ while (!Thread.currentThread().isInterrupted()) {
+ try {
+ CollectRep.MetricsData metricsData = dataQueue.pollRedisMetricsData();
+ if (metricsData != null) {
+ saveData(metricsData);
+ }
+ } catch (InterruptedException e) {
+ log.error(e.getMessage());
+ }
+ }
+ };
+ workerPool.executeJob(runnable);
+ }
+
+ private void saveData(CollectRep.MetricsData metricsData) {
+ String key = String.valueOf(metricsData.getId());
+ String hashKey = metricsData.getMetrics();
+ if (metricsData.getValuesList().isEmpty()) {
+ log.info("[warehouse] redis flush metrics data {}:{} is null, ignore.", key, hashKey);
+ return;
+ }
+ RedisAsyncCommands commands = connection.async();
+ commands.hset(key, hashKey, metricsData).thenAccept(response -> {
+ if (response) {
+ log.debug("[warehouse] redis add new data {}:{}.", key, hashKey);
+ } else {
+ log.debug("[warehouse] redis replace data {}:{}.", key, hashKey);
+ }
+ });
+ }
+
+ private void initRedisClient(WarehouseProperties properties) {
+ if (properties == null || properties.getStore() == null || properties.getStore().getRedis() == null) {
+ log.error("init error, please config Warehouse redis props in application.yml");
+ throw new IllegalArgumentException("please config Warehouse redis props");
+ }
+ WarehouseProperties.StoreProperties.RedisProperties redisProp = properties.getStore().getRedis();
+ RedisURI.Builder uriBuilder = RedisURI.builder()
+ .withHost(redisProp.getHost())
+ .withPort(redisProp.getPort())
+ .withTimeout(Duration.of(10, ChronoUnit.SECONDS));
+ if (redisProp.getPassword() != null && !"".equals(redisProp.getPassword())) {
+ uriBuilder.withPassword(redisProp.getPassword().toCharArray());
+ }
+ redisClient = RedisClient.create(uriBuilder.build());
+ connection = redisClient.connect(new MetricsDataRedisCodec());
+ }
+
+ @Override
+ public void destroy() throws Exception {
+ if (connection != null) {
+ connection.close();
+ }
+ if (redisClient != null) {
+ redisClient.shutdown();
+ }
+ }
+}
diff --git a/warehouse/src/main/resources/META-INF/spring.factories b/warehouse/src/main/resources/META-INF/spring.factories
new file mode 100644
index 0000000..d5af525
--- /dev/null
+++ b/warehouse/src/main/resources/META-INF/spring.factories
@@ -0,0 +1,7 @@
+org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
+com.usthe.warehouse.WarehouseProperties,\
+com.usthe.warehouse.MetricsDataQueue,\
+com.usthe.warehouse.WarehouseWorkerPool,\
+com.usthe.warehouse.entrance.KafkaDataConsume,\
+com.usthe.warehouse.store.InfluxdbDataStorage,\
+com.usthe.warehouse.store.RedisDataStorage
\ No newline at end of file