diff --git a/collector/pom.xml b/collector/pom.xml
index 7ffd674..70deccc 100644
--- a/collector/pom.xml
+++ b/collector/pom.xml
@@ -97,6 +97,12 @@
postgresql
42.3.3
+
+
+ org.apache.sshd
+ sshd-core
+ 2.8.0
+
\ No newline at end of file
diff --git a/collector/src/main/java/com/usthe/collector/collect/common/ssh/CommonSshClient.java b/collector/src/main/java/com/usthe/collector/collect/common/ssh/CommonSshClient.java
new file mode 100644
index 0000000..509bbbc
--- /dev/null
+++ b/collector/src/main/java/com/usthe/collector/collect/common/ssh/CommonSshClient.java
@@ -0,0 +1,25 @@
+package com.usthe.collector.collect.common.ssh;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.sshd.client.SshClient;
+
+/**
+ * ssh公共client
+ * @author tom
+ * @date 2022/3/11 15:58
+ */
+@Slf4j
+public class CommonSshClient {
+
+ private static SshClient sshClient;
+
+
+ static {
+ sshClient = SshClient.setUpDefaultClient();
+ sshClient.start();
+ }
+
+ public static SshClient getSshClient() {
+ return sshClient;
+ }
+}
diff --git a/collector/src/main/java/com/usthe/collector/collect/ssh/SshCollectImpl.java b/collector/src/main/java/com/usthe/collector/collect/ssh/SshCollectImpl.java
new file mode 100644
index 0000000..602d27d
--- /dev/null
+++ b/collector/src/main/java/com/usthe/collector/collect/ssh/SshCollectImpl.java
@@ -0,0 +1,194 @@
+package com.usthe.collector.collect.ssh;
+
+import com.usthe.collector.collect.AbstractCollect;
+import com.usthe.collector.collect.common.cache.CacheIdentifier;
+import com.usthe.collector.collect.common.cache.CommonCache;
+import com.usthe.collector.collect.common.ssh.CommonSshClient;
+import com.usthe.collector.util.CollectorConstants;
+import com.usthe.common.entity.job.Metrics;
+import com.usthe.common.entity.job.protocol.SshProtocol;
+import com.usthe.common.entity.message.CollectRep;
+import com.usthe.common.util.CommonConstants;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.sshd.client.SshClient;
+import org.apache.sshd.client.channel.ClientChannel;
+import org.apache.sshd.client.channel.ClientChannelEvent;
+import org.apache.sshd.client.session.ClientSession;
+import org.springframework.util.StringUtils;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.net.ConnectException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * ssh协议采集实现
+ * @author tom
+ * @date 2022/03/11 15:10
+ */
+@Slf4j
+public class SshCollectImpl extends AbstractCollect {
+
+ private static final String PARSE_TYPE_ONE_ROW = "oneRow";
+ private static final String PARSE_TYPE_MULTI_ROW = "multiRow";
+
+ private SshCollectImpl(){}
+
+ public static SshCollectImpl getInstance() {
+ return SshCollectImpl.Singleton.INSTANCE;
+ }
+
+
+ @Override
+ public void collect(CollectRep.MetricsData.Builder builder, long appId, String app, Metrics metrics) {
+ long startTime = System.currentTimeMillis();
+ // 校验参数
+ try {
+ validateParams(metrics);
+ } catch (Exception e) {
+ builder.setCode(CollectRep.Code.FAIL);
+ builder.setMsg(e.getMessage());
+ return;
+ }
+ SshProtocol sshProtocol = metrics.getSsh();
+ // 超时时间默认300毫秒
+ int timeout = 300;
+ try {
+ timeout = Integer.parseInt(sshProtocol.getTimeout());
+ } catch (Exception e) {
+ log.warn(e.getMessage());
+ }
+ try {
+ ClientSession clientSession = getConnectSession(sshProtocol);
+ ClientChannel channel = clientSession.createExecChannel(sshProtocol.getScript());
+ ByteArrayOutputStream response = new ByteArrayOutputStream();
+ channel.setOut(response);
+ if (!channel.open().verify(Integer.parseInt(sshProtocol.getTimeout())).isOpened()) {
+ throw new Exception("open failed");
+ }
+ List list = new ArrayList<>();
+ list.add(ClientChannelEvent.CLOSED);
+ channel.waitFor(list, Integer.parseInt(sshProtocol.getTimeout()));
+ Long responseTime = System.currentTimeMillis() - startTime;
+ channel.close();
+ String result = response.toString();
+ if (!StringUtils.hasText(result)) {
+ builder.setCode(CollectRep.Code.FAIL);
+ builder.setMsg("采集数据失败");
+ }
+ switch (sshProtocol.getParseType()) {
+ case PARSE_TYPE_ONE_ROW:
+ parseResponseDataByOne(result, metrics.getAliasFields(), builder, responseTime);
+ break;
+ default: parseResponseDataByMulti(result, metrics.getAliasFields(), builder, responseTime);
+ break;
+ }
+ } catch (ConnectException connectException) {
+ log.debug(connectException.getMessage());
+ builder.setCode(CollectRep.Code.UN_CONNECTABLE);
+ builder.setMsg("对端拒绝连接:服务未启动端口监听或防火墙");
+ } catch (IOException ioException) {
+ log.debug(ioException.getMessage());
+ builder.setCode(CollectRep.Code.UN_CONNECTABLE);
+ builder.setMsg("对端连接失败 " + ioException.getMessage());
+ } catch (Exception exception) {
+ log.debug(exception.getMessage());
+ builder.setCode(CollectRep.Code.FAIL);
+ builder.setMsg(exception.getMessage());
+ }
+ }
+
+ private void parseResponseDataByOne(String result, List aliasFields, CollectRep.MetricsData.Builder builder, Long responseTime) {
+ String[] lines = result.split("\n");
+ if (lines.length + 1 < aliasFields.size()) {
+ log.error("ssh response data not enough: {}", result);
+ }
+ CollectRep.ValueRow.Builder valueRowBuilder = CollectRep.ValueRow.newBuilder();
+ for (String value : lines) {
+ valueRowBuilder.addColumns(value);
+ }
+ builder.addValues(valueRowBuilder.build());
+ }
+
+ private void parseResponseDataByMulti(String result, List aliasFields,
+ CollectRep.MetricsData.Builder builder, Long responseTime) {
+ String[] lines = result.split("\n");
+ if (lines.length <= 1) {
+ log.error("ssh response data only has header: {}", result);
+ }
+ String[] fields = lines[0].split(" ");
+ Map fieldMapping = new HashMap<>(fields.length);
+ for (int i = 0; i < fields.length; i++) {
+ fieldMapping.put(fields[i].trim().toLowerCase(), i);
+ }
+ for (int i = 1; i < lines.length; i++) {
+ String[] values = lines[i].split(" ");
+ CollectRep.ValueRow.Builder valueRowBuilder = CollectRep.ValueRow.newBuilder();
+ for (String alias : aliasFields) {
+ if (CollectorConstants.RESPONSE_TIME.equalsIgnoreCase(alias)) {
+ valueRowBuilder.addColumns(responseTime.toString());
+ } else {
+ Integer index = fieldMapping.get(alias.toLowerCase());
+ if (index != null && index < values.length) {
+ valueRowBuilder.addColumns(values[index]);
+ } else {
+ valueRowBuilder.addColumns(CommonConstants.NULL_VALUE);
+ }
+ }
+ }
+ builder.addValues(valueRowBuilder.build());
+ }
+ }
+
+ private ClientSession getConnectSession(SshProtocol sshProtocol) throws IOException {
+ CacheIdentifier identifier = CacheIdentifier.builder()
+ .ip(sshProtocol.getHost()).port(sshProtocol.getPort())
+ .username(sshProtocol.getUsername()).password(sshProtocol.getPassword())
+ .build();
+ Optional