diff --git a/Architecture.png b/Architecture.png new file mode 100644 index 0000000..acfa096 Binary files /dev/null and b/Architecture.png differ diff --git a/collector/server/src/main/java/com/usthe/collector/dispatch/export/KafkaDataExporter.java b/collector/server/src/main/java/com/usthe/collector/dispatch/export/KafkaDataExporter.java index a47edf2..5bb8f8d 100644 --- a/collector/server/src/main/java/com/usthe/collector/dispatch/export/KafkaDataExporter.java +++ b/collector/server/src/main/java/com/usthe/collector/dispatch/export/KafkaDataExporter.java @@ -6,6 +6,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.LongSerializer; import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.boot.autoconfigure.AutoConfigureAfter; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; @@ -25,17 +26,16 @@ import java.util.Properties; @Slf4j public class KafkaDataExporter { - KafkaProducer kafkaProducer; + KafkaProducer kafkaProducer; DispatchProperties.ExportProperties.KafkaProperties kafkaProperties; public KafkaDataExporter(DispatchProperties dispatchProperties) { try { kafkaProperties = dispatchProperties.getExport().getKafka(); Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getServers()); - properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaMetricsDataSerializer.class); kafkaProducer = new KafkaProducer<>(properties); - kafkaProducer.send(new ProducerRecord<>(kafkaProperties.getTopic(), CollectRep.MetricsData.newBuilder().setApp("dddd").build())); } catch (Exception e) { log.error(e.getMessage(), e); } @@ -47,7 +47,7 @@ public class KafkaDataExporter { */ public void send(CollectRep.MetricsData metricsData) { if (kafkaProducer != null) { - kafkaProducer.send(new ProducerRecord<>(kafkaProperties.getTopic(), metricsData)); + kafkaProducer.send(new ProducerRecord<>(kafkaProperties.getTopic(), metricsData.getId(), metricsData)); } else { log.error("kafkaProducer is not enable"); } diff --git a/manager/pom.xml b/manager/pom.xml index 211832f..1c66c88 100644 --- a/manager/pom.xml +++ b/manager/pom.xml @@ -29,6 +29,12 @@ scheduler 1.0-SNAPSHOT + + + com.usthe.tancloud + warehouse + 1.0-SNAPSHOT + org.springframework.boot diff --git a/pom.xml b/pom.xml index 3480b4d..508035d 100644 --- a/pom.xml +++ b/pom.xml @@ -14,6 +14,7 @@ alerter common collector + warehouse diff --git a/warehouse/pom.xml b/warehouse/pom.xml new file mode 100644 index 0000000..eb4f04d --- /dev/null +++ b/warehouse/pom.xml @@ -0,0 +1,68 @@ + + + + monitor + com.usthe.tancloud + 1.0-SNAPSHOT + + 4.0.0 + + warehouse + + + + + com.usthe.tancloud + common + 1.0-SNAPSHOT + + + + org.springframework.boot + spring-boot-starter-web + provided + + + org.springframework.boot + spring-boot-autoconfigure + + + org.springframework.boot + spring-boot-configuration-processor + true + + + + com.influxdb + influxdb-client-java + 3.4.0 + + + + org.apache.kafka + kafka-clients + 3.0.0 + + + + io.lettuce + lettuce-core + provided + + + + io.springfox + springfox-boot-starter + provided + + + + org.springframework.cloud + spring-cloud-starter-openfeign + 3.0.5 + provided + + + \ No newline at end of file diff --git a/warehouse/src/main/java/com/usthe/warehouse/MetricsDataQueue.java b/warehouse/src/main/java/com/usthe/warehouse/MetricsDataQueue.java new file mode 100644 index 0000000..c7a82ee --- /dev/null +++ b/warehouse/src/main/java/com/usthe/warehouse/MetricsDataQueue.java @@ -0,0 +1,42 @@ +package com.usthe.warehouse; + +import com.usthe.common.entity.message.CollectRep; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +/** + * 采集数据队列 + * @author tom + * @date 2021/11/24 17:58 + */ +@Component +@Slf4j +public class MetricsDataQueue { + + private final LinkedBlockingQueue dataInfluxQueue; + private final LinkedBlockingQueue dataRedisQueue; + + public MetricsDataQueue() { + dataInfluxQueue = new LinkedBlockingQueue<>(); + dataRedisQueue = new LinkedBlockingQueue<>(); + } + + public void addMetricsDataToInflux(CollectRep.MetricsData metricsData) { + dataInfluxQueue.offer(metricsData); + } + + public CollectRep.MetricsData pollInfluxMetricsData() throws InterruptedException { + return dataInfluxQueue.poll(2, TimeUnit.SECONDS); + } + + public void addMetricsDataToRedis(CollectRep.MetricsData metricsData) { + dataRedisQueue.offer(metricsData); + } + + public CollectRep.MetricsData pollRedisMetricsData() throws InterruptedException { + return dataRedisQueue.poll(2, TimeUnit.SECONDS); + } +} diff --git a/warehouse/src/main/java/com/usthe/warehouse/WarehouseProperties.java b/warehouse/src/main/java/com/usthe/warehouse/WarehouseProperties.java new file mode 100644 index 0000000..4e50197 --- /dev/null +++ b/warehouse/src/main/java/com/usthe/warehouse/WarehouseProperties.java @@ -0,0 +1,259 @@ +package com.usthe.warehouse; + +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.stereotype.Component; + +/** + * 数据仓储配置属性 + * @author tom + * @date 2021/11/24 10:38 + */ +@Component +@ConfigurationProperties(prefix = "warehouse") +public class WarehouseProperties { + + /** + * 数据入口配置属性 + */ + private EntranceProperties entrance; + + /** + * 数据存储配置属性 + */ + private StoreProperties store; + + public EntranceProperties getEntrance() { + return entrance; + } + + public void setEntrance(EntranceProperties entrance) { + this.entrance = entrance; + } + + public StoreProperties getStore() { + return store; + } + + public void setStore(StoreProperties store) { + this.store = store; + } + + /** + * 数据入口配置属性 + * 入口可以是从kafka rabbitmq rocketmq等消息中间件获取数据 + */ + public static class EntranceProperties { + + /** + * kafka配置信息 + */ + private EntranceProperties.KafkaProperties kafka; + + public EntranceProperties.KafkaProperties getKafka() { + return kafka; + } + + public void setKafka(EntranceProperties.KafkaProperties kafka) { + this.kafka = kafka; + } + + public static class KafkaProperties { + /** + * kafka数据入口是否启动 + */ + private boolean enabled = true; + + /** + * kafka的连接服务器url + */ + private String servers = "127.0.0.1:9092"; + /** + * 接收数据的topic名称 + */ + private String topic; + /** + * 消费者组ID + */ + private String groupId; + + public boolean isEnabled() { + return enabled; + } + + public void setEnabled(boolean enabled) { + this.enabled = enabled; + } + + public String getServers() { + return servers; + } + + public void setServers(String servers) { + this.servers = servers; + } + + public String getTopic() { + return topic; + } + + public void setTopic(String topic) { + this.topic = topic; + } + + public String getGroupId() { + return groupId; + } + + public void setGroupId(String groupId) { + this.groupId = groupId; + } + } + + } + + /** + * 调度数据出口配置属性 + */ + public static class StoreProperties { + + /** + * influxdb配置信息 + */ + private InfluxdbProperties influxdb; + /** + * redis配置信息 + */ + private RedisProperties redis; + + public InfluxdbProperties getInfluxdb() { + return influxdb; + } + + public void setInfluxdb(InfluxdbProperties influxdb) { + this.influxdb = influxdb; + } + + public RedisProperties getRedis() { + return redis; + } + + public void setRedis(RedisProperties redis) { + this.redis = redis; + } + + public static class InfluxdbProperties { + /** + * influxdb数据存储是否启动 + */ + private boolean enabled = true; + /** + * influxdb的连接服务器url + */ + private String servers = "http://127.0.0.1:8086"; + /** + * 认证token + */ + private String token; + /** + * 仓库名称 + */ + private String bucket; + /** + * 组织名称 + */ + private String org; + + public boolean isEnabled() { + return enabled; + } + + public void setEnabled(boolean enabled) { + this.enabled = enabled; + } + + public String getServers() { + return servers; + } + + public void setServers(String servers) { + this.servers = servers; + } + + public String getToken() { + return token; + } + + public void setToken(String token) { + this.token = token; + } + + public String getBucket() { + return bucket; + } + + public void setBucket(String bucket) { + this.bucket = bucket; + } + + public String getOrg() { + return org; + } + + public void setOrg(String org) { + this.org = org; + } + } + + public static class RedisProperties { + /** + * redis数据存储是否启动 + */ + private boolean enabled = true; + /** + * redis 主机host + */ + private String host = "127.0.0.1"; + /** + * redis 主机端口 + */ + private Integer port = 6379; + /** + * redis 访问密码 + */ + private String password; + + public boolean isEnabled() { + return enabled; + } + + public void setEnabled(boolean enabled) { + this.enabled = enabled; + } + + public String getHost() { + return host; + } + + public void setHost(String host) { + this.host = host; + } + + public Integer getPort() { + return port; + } + + public void setPort(Integer port) { + this.port = port; + } + + public String getPassword() { + return password; + } + + public void setPassword(String password) { + this.password = password; + } + } + } + +} diff --git a/warehouse/src/main/java/com/usthe/warehouse/WarehouseWorkerPool.java b/warehouse/src/main/java/com/usthe/warehouse/WarehouseWorkerPool.java new file mode 100644 index 0000000..1175f13 --- /dev/null +++ b/warehouse/src/main/java/com/usthe/warehouse/WarehouseWorkerPool.java @@ -0,0 +1,54 @@ +package com.usthe.warehouse; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +/** + * warehouse 工作线程池 + * @author tom + * @date 2021/11/24 18:09 + */ +@Component +@Slf4j +public class WarehouseWorkerPool { + + private ThreadPoolExecutor workerExecutor; + + public WarehouseWorkerPool() { + initWorkExecutor(); + } + + private void initWorkExecutor() { + // 线程工厂 + ThreadFactory threadFactory = new ThreadFactoryBuilder() + .setUncaughtExceptionHandler((thread, throwable) -> { + log.error("workerExecutor has uncaughtException."); + log.error(throwable.getMessage(), throwable); }) + .setDaemon(true) + .setNameFormat("warehouse-worker-%d") + .build(); + workerExecutor = new ThreadPoolExecutor(6, + 10, + 10, + TimeUnit.SECONDS, + new SynchronousQueue<>(), + threadFactory, + new ThreadPoolExecutor.AbortPolicy()); + } + + /** + * 运行warehouse任务 + * @param runnable 任务 + * @throws RejectedExecutionException when 线程池满 + */ + public void executeJob(Runnable runnable) throws RejectedExecutionException { + workerExecutor.execute(runnable); + } +} diff --git a/warehouse/src/main/java/com/usthe/warehouse/entrance/KafkaDataConsume.java b/warehouse/src/main/java/com/usthe/warehouse/entrance/KafkaDataConsume.java new file mode 100644 index 0000000..ad99bed --- /dev/null +++ b/warehouse/src/main/java/com/usthe/warehouse/entrance/KafkaDataConsume.java @@ -0,0 +1,81 @@ +package com.usthe.warehouse.entrance; + +import com.usthe.common.entity.message.CollectRep; +import com.usthe.warehouse.MetricsDataQueue; +import com.usthe.warehouse.WarehouseProperties; +import com.usthe.warehouse.WarehouseWorkerPool; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.serialization.LongDeserializer; +import org.springframework.beans.factory.DisposableBean; +import org.springframework.boot.autoconfigure.AutoConfigureAfter; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.context.annotation.Configuration; + +import java.time.Duration; +import java.util.Collections; +import java.util.Properties; + +/** + * 从Kafka消费指标组采集数据处理 + * @author tom + * @date 2021/11/24 18:03 + */ +@Configuration +@AutoConfigureAfter(value = {WarehouseProperties.class}) +@ConditionalOnProperty(prefix = "warehouse.entrance.kafka", + name = "enabled", havingValue = "true", matchIfMissing = true) +@Slf4j +public class KafkaDataConsume implements DisposableBean { + + private KafkaConsumer consumer; + private WarehouseWorkerPool workerPool; + private MetricsDataQueue dataQueue; + public KafkaDataConsume(WarehouseProperties properties, WarehouseWorkerPool workerPool, + MetricsDataQueue dataQueue) { + this.workerPool = workerPool; + this.dataQueue = dataQueue; + initConsumer(properties); + startConsumeData(); + } + + private void startConsumeData() { + Runnable runnable = () -> { + Thread.currentThread().setName("warehouse-kafka-data-consumer"); + while (!Thread.currentThread().isInterrupted()) { + ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); + records.forEach(record -> { + dataQueue.addMetricsDataToInflux(record.value()); + dataQueue.addMetricsDataToRedis(record.value()); + }); + } + }; + workerPool.executeJob(runnable); + } + + private void initConsumer(WarehouseProperties properties) { + if (properties == null || properties.getEntrance() == null || properties.getEntrance().getKafka() == null) { + log.error("init error, please config Warehouse kafka props in application.yml"); + throw new IllegalArgumentException("please config Warehouse kafka props"); + } + WarehouseProperties.EntranceProperties.KafkaProperties kafkaProp = properties.getEntrance().getKafka(); + Properties consumerProp = new Properties(); + consumerProp.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProp.getServers()); + consumerProp.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaProp.getGroupId()); + consumerProp.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class); + consumerProp.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaMetricsDataDeserializer.class); + consumerProp.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); + consumerProp.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000); + consumer = new KafkaConsumer<>(consumerProp); + consumer.subscribe(Collections.singleton(kafkaProp.getTopic())); + } + + @Override + public void destroy() throws Exception { + if (consumer != null) { + consumer.close(); + } + } +} diff --git a/warehouse/src/main/java/com/usthe/warehouse/entrance/KafkaMetricsDataDeserializer.java b/warehouse/src/main/java/com/usthe/warehouse/entrance/KafkaMetricsDataDeserializer.java new file mode 100644 index 0000000..544265a --- /dev/null +++ b/warehouse/src/main/java/com/usthe/warehouse/entrance/KafkaMetricsDataDeserializer.java @@ -0,0 +1,24 @@ +package com.usthe.warehouse.entrance; + +import com.usthe.common.entity.message.CollectRep; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.common.serialization.Deserializer; + +/** + * MetricsData的反序列化 + * @author tom + * @date 2021/11/24 17:29 + */ +@Slf4j +public class KafkaMetricsDataDeserializer implements Deserializer { + + @Override + public CollectRep.MetricsData deserialize(String topicName, byte[] bytes) { + try { + return CollectRep.MetricsData.parseFrom(bytes); + } catch (Exception e) { + log.error(e.getMessage(), e); + } + return null; + } +} diff --git a/warehouse/src/main/java/com/usthe/warehouse/store/InfluxdbDataStorage.java b/warehouse/src/main/java/com/usthe/warehouse/store/InfluxdbDataStorage.java new file mode 100644 index 0000000..073d77c --- /dev/null +++ b/warehouse/src/main/java/com/usthe/warehouse/store/InfluxdbDataStorage.java @@ -0,0 +1,110 @@ +package com.usthe.warehouse.store; + +import com.google.protobuf.ProtocolStringList; +import com.influxdb.client.InfluxDBClient; +import com.influxdb.client.InfluxDBClientFactory; +import com.influxdb.client.WriteApi; +import com.influxdb.client.WriteOptions; +import com.influxdb.client.domain.WritePrecision; +import com.influxdb.client.write.Point; +import com.usthe.common.entity.message.CollectRep; +import com.usthe.warehouse.MetricsDataQueue; +import com.usthe.warehouse.WarehouseProperties; +import com.usthe.warehouse.WarehouseWorkerPool; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.DisposableBean; +import org.springframework.boot.autoconfigure.AutoConfigureAfter; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.context.annotation.Configuration; + +import java.time.Instant; + +/** + * influxdb存储采集数据 + * @author tom + * @date 2021/11/24 18:23 + */ +@Configuration +@AutoConfigureAfter(value = {WarehouseProperties.class}) +@ConditionalOnProperty(prefix = "warehouse.store.influxdb", + name = "enabled", havingValue = "true", matchIfMissing = true) +@Slf4j +public class InfluxdbDataStorage implements DisposableBean { + + private InfluxDBClient influxClient; + private WriteApi writeApi; + private WarehouseWorkerPool workerPool; + private MetricsDataQueue dataQueue; + + public InfluxdbDataStorage (WarehouseProperties properties, WarehouseWorkerPool workerPool, + MetricsDataQueue dataQueue) { + this.workerPool = workerPool; + this.dataQueue = dataQueue; + initInfluxDbClient(properties); + startStorageData(); + } + + private void startStorageData() { + Runnable runnable = () -> { + Thread.currentThread().setName("warehouse-influxdb-data-storage"); + while (!Thread.currentThread().isInterrupted()) { + try { + CollectRep.MetricsData metricsData = dataQueue.pollInfluxMetricsData(); + if (metricsData != null) { + saveData(metricsData); + } + } catch (InterruptedException e) { + log.error(e.getMessage()); + } + } + }; + workerPool.executeJob(runnable); + workerPool.executeJob(runnable); + } + + private void initInfluxDbClient(WarehouseProperties properties) { + if (properties == null || properties.getStore() == null || properties.getStore().getInfluxdb() == null) { + log.error("init error, please config Warehouse influxdb props in application.yml"); + throw new IllegalArgumentException("please config Warehouse influxdb props"); + } + WarehouseProperties.StoreProperties.InfluxdbProperties influxdbProp = properties.getStore().getInfluxdb(); + influxClient = InfluxDBClientFactory.create(influxdbProp.getServers(), influxdbProp.getToken().toCharArray(), + influxdbProp.getOrg(), influxdbProp.getBucket()); + WriteOptions writeOptions = WriteOptions.builder() + .batchSize(1000) + .bufferLimit(1000) + .jitterInterval(1000) + .retryInterval(5000) + .build(); + writeApi = influxClient.makeWriteApi(writeOptions); + } + + + public void saveData(CollectRep.MetricsData metricsData) { + String measurement = metricsData.getApp() + "--" + metricsData.getMetrics(); + String monitorId = String.valueOf(metricsData.getId()); + Instant collectTime = Instant.ofEpochMilli(metricsData.getTime()); + ProtocolStringList fields = metricsData.getFieldsList(); + for (CollectRep.ValueRow valueRow : metricsData.getValuesList()) { + Point point = Point.measurement(measurement) + .addTag("id", monitorId) + .addTag("instance", valueRow.getInstance()) + .time(collectTime, WritePrecision.MS); + for (int index = 0; index < fields.size(); index++) { + point.addField(fields.get(index), valueRow.getColumns(index)); + } + writeApi.writePoint(point); + } + } + + + @Override + public void destroy() throws Exception { + if (writeApi != null) { + writeApi.close(); + } + if (influxClient != null) { + influxClient.close(); + } + } +} diff --git a/warehouse/src/main/java/com/usthe/warehouse/store/MetricsDataRedisCodec.java b/warehouse/src/main/java/com/usthe/warehouse/store/MetricsDataRedisCodec.java new file mode 100644 index 0000000..55a94f2 --- /dev/null +++ b/warehouse/src/main/java/com/usthe/warehouse/store/MetricsDataRedisCodec.java @@ -0,0 +1,42 @@ +package com.usthe.warehouse.store; + +import com.usthe.common.entity.message.CollectRep; +import io.lettuce.core.codec.RedisCodec; +import lombok.extern.slf4j.Slf4j; + +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; + +/** + * MetricsData redis 序列化 + * @author tom + * @date 2021/11/25 10:42 + */ +@Slf4j +public class MetricsDataRedisCodec implements RedisCodec { + + @Override + public String decodeKey(ByteBuffer byteBuffer) { + return new String(byteBuffer.array(), byteBuffer.position(), byteBuffer.limit(), StandardCharsets.UTF_8); + } + + @Override + public CollectRep.MetricsData decodeValue(ByteBuffer byteBuffer) { + try { + return CollectRep.MetricsData.parseFrom(byteBuffer); + } catch (Exception e) { + log.error(e.getMessage()); + return null; + } + } + + @Override + public ByteBuffer encodeKey(String s) { + return ByteBuffer.wrap(s.getBytes(StandardCharsets.UTF_8)); + } + + @Override + public ByteBuffer encodeValue(CollectRep.MetricsData metricsData) { + return ByteBuffer.wrap(metricsData.toByteArray()); + } +} diff --git a/warehouse/src/main/java/com/usthe/warehouse/store/RedisDataStorage.java b/warehouse/src/main/java/com/usthe/warehouse/store/RedisDataStorage.java new file mode 100644 index 0000000..9f1979b --- /dev/null +++ b/warehouse/src/main/java/com/usthe/warehouse/store/RedisDataStorage.java @@ -0,0 +1,106 @@ +package com.usthe.warehouse.store; + +import com.usthe.common.entity.message.CollectRep; +import com.usthe.warehouse.MetricsDataQueue; +import com.usthe.warehouse.WarehouseProperties; +import com.usthe.warehouse.WarehouseWorkerPool; +import io.lettuce.core.RedisClient; +import io.lettuce.core.RedisURI; +import io.lettuce.core.api.StatefulRedisConnection; +import io.lettuce.core.api.async.RedisAsyncCommands; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.DisposableBean; +import org.springframework.boot.autoconfigure.AutoConfigureAfter; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.context.annotation.Configuration; + +import java.time.Duration; +import java.time.temporal.ChronoUnit; + +/** + * redis存储采集实时数据 + * @author tom + * @date 2021/11/25 10:26 + */ +@Configuration +@AutoConfigureAfter(value = {WarehouseProperties.class}) +@ConditionalOnProperty(prefix = "warehouse.store.redis", + name = "enabled", havingValue = "true", matchIfMissing = true) +@Slf4j +public class RedisDataStorage implements DisposableBean { + + private RedisClient redisClient; + private StatefulRedisConnection connection; + private WarehouseWorkerPool workerPool; + private MetricsDataQueue dataQueue; + + public RedisDataStorage (WarehouseProperties properties, WarehouseWorkerPool workerPool, + MetricsDataQueue dataQueue) { + this.workerPool = workerPool; + this.dataQueue = dataQueue; + + initRedisClient(properties); + startStorageData(); + } + + private void startStorageData() { + Runnable runnable = () -> { + Thread.currentThread().setName("warehouse-redis-data-storage"); + while (!Thread.currentThread().isInterrupted()) { + try { + CollectRep.MetricsData metricsData = dataQueue.pollRedisMetricsData(); + if (metricsData != null) { + saveData(metricsData); + } + } catch (InterruptedException e) { + log.error(e.getMessage()); + } + } + }; + workerPool.executeJob(runnable); + } + + private void saveData(CollectRep.MetricsData metricsData) { + String key = String.valueOf(metricsData.getId()); + String hashKey = metricsData.getMetrics(); + if (metricsData.getValuesList().isEmpty()) { + log.info("[warehouse] redis flush metrics data {}:{} is null, ignore.", key, hashKey); + return; + } + RedisAsyncCommands commands = connection.async(); + commands.hset(key, hashKey, metricsData).thenAccept(response -> { + if (response) { + log.debug("[warehouse] redis add new data {}:{}.", key, hashKey); + } else { + log.debug("[warehouse] redis replace data {}:{}.", key, hashKey); + } + }); + } + + private void initRedisClient(WarehouseProperties properties) { + if (properties == null || properties.getStore() == null || properties.getStore().getRedis() == null) { + log.error("init error, please config Warehouse redis props in application.yml"); + throw new IllegalArgumentException("please config Warehouse redis props"); + } + WarehouseProperties.StoreProperties.RedisProperties redisProp = properties.getStore().getRedis(); + RedisURI.Builder uriBuilder = RedisURI.builder() + .withHost(redisProp.getHost()) + .withPort(redisProp.getPort()) + .withTimeout(Duration.of(10, ChronoUnit.SECONDS)); + if (redisProp.getPassword() != null && !"".equals(redisProp.getPassword())) { + uriBuilder.withPassword(redisProp.getPassword().toCharArray()); + } + redisClient = RedisClient.create(uriBuilder.build()); + connection = redisClient.connect(new MetricsDataRedisCodec()); + } + + @Override + public void destroy() throws Exception { + if (connection != null) { + connection.close(); + } + if (redisClient != null) { + redisClient.shutdown(); + } + } +} diff --git a/warehouse/src/main/resources/META-INF/spring.factories b/warehouse/src/main/resources/META-INF/spring.factories new file mode 100644 index 0000000..d5af525 --- /dev/null +++ b/warehouse/src/main/resources/META-INF/spring.factories @@ -0,0 +1,7 @@ +org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ +com.usthe.warehouse.WarehouseProperties,\ +com.usthe.warehouse.MetricsDataQueue,\ +com.usthe.warehouse.WarehouseWorkerPool,\ +com.usthe.warehouse.entrance.KafkaDataConsume,\ +com.usthe.warehouse.store.InfluxdbDataStorage,\ +com.usthe.warehouse.store.RedisDataStorage \ No newline at end of file