diff --git a/collector/server/src/main/java/com/usthe/collector/dispatch/WorkerPool.java b/collector/server/src/main/java/com/usthe/collector/dispatch/WorkerPool.java index 39d4898..24f10a2 100644 --- a/collector/server/src/main/java/com/usthe/collector/dispatch/WorkerPool.java +++ b/collector/server/src/main/java/com/usthe/collector/dispatch/WorkerPool.java @@ -2,6 +2,7 @@ package com.usthe.collector.dispatch; import com.google.common.util.concurrent.ThreadFactoryBuilder; import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.DisposableBean; import org.springframework.stereotype.Component; import java.util.concurrent.RejectedExecutionException; @@ -17,7 +18,7 @@ import java.util.concurrent.TimeUnit; */ @Component @Slf4j -public class WorkerPool { +public class WorkerPool implements DisposableBean { private ThreadPoolExecutor workerExecutor; @@ -51,4 +52,11 @@ public class WorkerPool { public void executeJob(Runnable runnable) throws RejectedExecutionException { workerExecutor.execute(runnable); } + + @Override + public void destroy() throws Exception { + if (workerExecutor != null) { + workerExecutor.shutdownNow(); + } + } } diff --git a/collector/server/src/main/java/com/usthe/collector/dispatch/export/KafkaDataExporter.java b/collector/server/src/main/java/com/usthe/collector/dispatch/export/KafkaDataExporter.java index 5bb8f8d..ff7ba61 100644 --- a/collector/server/src/main/java/com/usthe/collector/dispatch/export/KafkaDataExporter.java +++ b/collector/server/src/main/java/com/usthe/collector/dispatch/export/KafkaDataExporter.java @@ -7,7 +7,7 @@ import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.LongSerializer; -import org.apache.kafka.common.serialization.StringSerializer; +import org.springframework.beans.factory.DisposableBean; import org.springframework.boot.autoconfigure.AutoConfigureAfter; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.annotation.Configuration; @@ -24,7 +24,7 @@ import java.util.Properties; name = "enabled", havingValue = "true", matchIfMissing = true) @AutoConfigureAfter(value = {DispatchProperties.class}) @Slf4j -public class KafkaDataExporter { +public class KafkaDataExporter implements DisposableBean { KafkaProducer kafkaProducer; DispatchProperties.ExportProperties.KafkaProperties kafkaProperties; @@ -52,4 +52,11 @@ public class KafkaDataExporter { log.error("kafkaProducer is not enable"); } } + + @Override + public void destroy() throws Exception { + if (kafkaProducer != null) { + kafkaProducer.close(); + } + } }