From 25d692cf374ec5d28aa5f23570624149666e8b51 Mon Sep 17 00:00:00 2001 From: tomsun28 Date: Fri, 11 Mar 2022 23:57:55 +0800 Subject: [PATCH] =?UTF-8?q?[collector,manager]=E6=94=AF=E6=8C=81Linux?= =?UTF-8?q?=E6=93=8D=E4=BD=9C=E7=B3=BB=E7=BB=9F=E7=9B=91=E6=8E=A7=E7=B1=BB?= =?UTF-8?q?=E5=9E=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- collector/pom.xml | 6 + .../collect/common/ssh/CommonSshClient.java | 25 +++ .../collector/collect/ssh/SshCollectImpl.java | 194 ++++++++++++++++++ .../collect/telnet/TelnetCollectImpl.java | 2 +- .../collector/dispatch/DispatchConstants.java | 4 + .../collector/dispatch/MetricsCollect.java | 4 + .../com/usthe/common/entity/job/Metrics.java | 5 + .../entity/job/protocol/SshProtocol.java | 58 ++++++ .../src/main/resources/define/app/linux.yml | 76 +++++++ .../src/main/resources/define/param/linux.yml | 22 ++ 10 files changed, 395 insertions(+), 1 deletion(-) create mode 100644 collector/src/main/java/com/usthe/collector/collect/common/ssh/CommonSshClient.java create mode 100644 collector/src/main/java/com/usthe/collector/collect/ssh/SshCollectImpl.java create mode 100644 common/src/main/java/com/usthe/common/entity/job/protocol/SshProtocol.java create mode 100644 manager/src/main/resources/define/app/linux.yml create mode 100644 manager/src/main/resources/define/param/linux.yml diff --git a/collector/pom.xml b/collector/pom.xml index 7ffd674..70deccc 100644 --- a/collector/pom.xml +++ b/collector/pom.xml @@ -97,6 +97,12 @@ postgresql 42.3.3 + + + org.apache.sshd + sshd-core + 2.8.0 + \ No newline at end of file diff --git a/collector/src/main/java/com/usthe/collector/collect/common/ssh/CommonSshClient.java b/collector/src/main/java/com/usthe/collector/collect/common/ssh/CommonSshClient.java new file mode 100644 index 0000000..509bbbc --- /dev/null +++ b/collector/src/main/java/com/usthe/collector/collect/common/ssh/CommonSshClient.java @@ -0,0 +1,25 @@ +package com.usthe.collector.collect.common.ssh; + +import lombok.extern.slf4j.Slf4j; +import org.apache.sshd.client.SshClient; + +/** + * ssh公共client + * @author tom + * @date 2022/3/11 15:58 + */ +@Slf4j +public class CommonSshClient { + + private static SshClient sshClient; + + + static { + sshClient = SshClient.setUpDefaultClient(); + sshClient.start(); + } + + public static SshClient getSshClient() { + return sshClient; + } +} diff --git a/collector/src/main/java/com/usthe/collector/collect/ssh/SshCollectImpl.java b/collector/src/main/java/com/usthe/collector/collect/ssh/SshCollectImpl.java new file mode 100644 index 0000000..602d27d --- /dev/null +++ b/collector/src/main/java/com/usthe/collector/collect/ssh/SshCollectImpl.java @@ -0,0 +1,194 @@ +package com.usthe.collector.collect.ssh; + +import com.usthe.collector.collect.AbstractCollect; +import com.usthe.collector.collect.common.cache.CacheIdentifier; +import com.usthe.collector.collect.common.cache.CommonCache; +import com.usthe.collector.collect.common.ssh.CommonSshClient; +import com.usthe.collector.util.CollectorConstants; +import com.usthe.common.entity.job.Metrics; +import com.usthe.common.entity.job.protocol.SshProtocol; +import com.usthe.common.entity.message.CollectRep; +import com.usthe.common.util.CommonConstants; +import lombok.extern.slf4j.Slf4j; +import org.apache.sshd.client.SshClient; +import org.apache.sshd.client.channel.ClientChannel; +import org.apache.sshd.client.channel.ClientChannelEvent; +import org.apache.sshd.client.session.ClientSession; +import org.springframework.util.StringUtils; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.net.ConnectException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.TimeUnit; + +/** + * ssh协议采集实现 + * @author tom + * @date 2022/03/11 15:10 + */ +@Slf4j +public class SshCollectImpl extends AbstractCollect { + + private static final String PARSE_TYPE_ONE_ROW = "oneRow"; + private static final String PARSE_TYPE_MULTI_ROW = "multiRow"; + + private SshCollectImpl(){} + + public static SshCollectImpl getInstance() { + return SshCollectImpl.Singleton.INSTANCE; + } + + + @Override + public void collect(CollectRep.MetricsData.Builder builder, long appId, String app, Metrics metrics) { + long startTime = System.currentTimeMillis(); + // 校验参数 + try { + validateParams(metrics); + } catch (Exception e) { + builder.setCode(CollectRep.Code.FAIL); + builder.setMsg(e.getMessage()); + return; + } + SshProtocol sshProtocol = metrics.getSsh(); + // 超时时间默认300毫秒 + int timeout = 300; + try { + timeout = Integer.parseInt(sshProtocol.getTimeout()); + } catch (Exception e) { + log.warn(e.getMessage()); + } + try { + ClientSession clientSession = getConnectSession(sshProtocol); + ClientChannel channel = clientSession.createExecChannel(sshProtocol.getScript()); + ByteArrayOutputStream response = new ByteArrayOutputStream(); + channel.setOut(response); + if (!channel.open().verify(Integer.parseInt(sshProtocol.getTimeout())).isOpened()) { + throw new Exception("open failed"); + } + List list = new ArrayList<>(); + list.add(ClientChannelEvent.CLOSED); + channel.waitFor(list, Integer.parseInt(sshProtocol.getTimeout())); + Long responseTime = System.currentTimeMillis() - startTime; + channel.close(); + String result = response.toString(); + if (!StringUtils.hasText(result)) { + builder.setCode(CollectRep.Code.FAIL); + builder.setMsg("采集数据失败"); + } + switch (sshProtocol.getParseType()) { + case PARSE_TYPE_ONE_ROW: + parseResponseDataByOne(result, metrics.getAliasFields(), builder, responseTime); + break; + default: parseResponseDataByMulti(result, metrics.getAliasFields(), builder, responseTime); + break; + } + } catch (ConnectException connectException) { + log.debug(connectException.getMessage()); + builder.setCode(CollectRep.Code.UN_CONNECTABLE); + builder.setMsg("对端拒绝连接:服务未启动端口监听或防火墙"); + } catch (IOException ioException) { + log.debug(ioException.getMessage()); + builder.setCode(CollectRep.Code.UN_CONNECTABLE); + builder.setMsg("对端连接失败 " + ioException.getMessage()); + } catch (Exception exception) { + log.debug(exception.getMessage()); + builder.setCode(CollectRep.Code.FAIL); + builder.setMsg(exception.getMessage()); + } + } + + private void parseResponseDataByOne(String result, List aliasFields, CollectRep.MetricsData.Builder builder, Long responseTime) { + String[] lines = result.split("\n"); + if (lines.length + 1 < aliasFields.size()) { + log.error("ssh response data not enough: {}", result); + } + CollectRep.ValueRow.Builder valueRowBuilder = CollectRep.ValueRow.newBuilder(); + for (String value : lines) { + valueRowBuilder.addColumns(value); + } + builder.addValues(valueRowBuilder.build()); + } + + private void parseResponseDataByMulti(String result, List aliasFields, + CollectRep.MetricsData.Builder builder, Long responseTime) { + String[] lines = result.split("\n"); + if (lines.length <= 1) { + log.error("ssh response data only has header: {}", result); + } + String[] fields = lines[0].split(" "); + Map fieldMapping = new HashMap<>(fields.length); + for (int i = 0; i < fields.length; i++) { + fieldMapping.put(fields[i].trim().toLowerCase(), i); + } + for (int i = 1; i < lines.length; i++) { + String[] values = lines[i].split(" "); + CollectRep.ValueRow.Builder valueRowBuilder = CollectRep.ValueRow.newBuilder(); + for (String alias : aliasFields) { + if (CollectorConstants.RESPONSE_TIME.equalsIgnoreCase(alias)) { + valueRowBuilder.addColumns(responseTime.toString()); + } else { + Integer index = fieldMapping.get(alias.toLowerCase()); + if (index != null && index < values.length) { + valueRowBuilder.addColumns(values[index]); + } else { + valueRowBuilder.addColumns(CommonConstants.NULL_VALUE); + } + } + } + builder.addValues(valueRowBuilder.build()); + } + } + + private ClientSession getConnectSession(SshProtocol sshProtocol) throws IOException { + CacheIdentifier identifier = CacheIdentifier.builder() + .ip(sshProtocol.getHost()).port(sshProtocol.getPort()) + .username(sshProtocol.getUsername()).password(sshProtocol.getPassword()) + .build(); + Optional cacheOption = CommonCache.getInstance().getCache(identifier, true); + ClientSession clientSession = null; + if (cacheOption.isPresent()) { + clientSession = (ClientSession) cacheOption.get(); + try { + if (clientSession.isClosed() || clientSession.isClosing()) { + clientSession = null; + CommonCache.getInstance().removeCache(identifier); + } + } catch (Exception e) { + log.warn(e.getMessage()); + clientSession = null; + CommonCache.getInstance().removeCache(identifier); + } + } + if (clientSession != null) { + return clientSession; + } + SshClient sshClient = CommonSshClient.getSshClient(); + clientSession = sshClient.connect(sshProtocol.getUsername(), sshProtocol.getHost(), Integer.parseInt(sshProtocol.getPort())) + .verify(Long.parseLong(sshProtocol.getTimeout()), TimeUnit.MILLISECONDS).getSession(); + if (StringUtils.hasText(sshProtocol.getPassword())) { + clientSession.addPasswordIdentity(sshProtocol.getPassword()); + } + // 进行认证 + if (!clientSession.auth().verify(Long.parseLong(sshProtocol.getTimeout()), TimeUnit.MILLISECONDS).isSuccess()) { + throw new IllegalArgumentException("认证失败"); + } + CommonCache.getInstance().addCache(identifier, clientSession, 10000L); + return clientSession; + } + + private void validateParams(Metrics metrics) throws Exception { + if (metrics == null || metrics.getSsh() == null) { + throw new Exception("Ssh collect must has ssh params"); + } + } + + private static class Singleton { + private static final SshCollectImpl INSTANCE = new SshCollectImpl(); + } +} diff --git a/collector/src/main/java/com/usthe/collector/collect/telnet/TelnetCollectImpl.java b/collector/src/main/java/com/usthe/collector/collect/telnet/TelnetCollectImpl.java index a3ac6dc..940f734 100644 --- a/collector/src/main/java/com/usthe/collector/collect/telnet/TelnetCollectImpl.java +++ b/collector/src/main/java/com/usthe/collector/collect/telnet/TelnetCollectImpl.java @@ -13,7 +13,7 @@ import java.io.IOException; import java.net.ConnectException; /** - * icmp协议采集实现 - ping + * telnet协议采集实现 * @author tom * @date 2021/12/4 12:32 */ diff --git a/collector/src/main/java/com/usthe/collector/dispatch/DispatchConstants.java b/collector/src/main/java/com/usthe/collector/dispatch/DispatchConstants.java index cc470c0..8fa7a4f 100644 --- a/collector/src/main/java/com/usthe/collector/dispatch/DispatchConstants.java +++ b/collector/src/main/java/com/usthe/collector/dispatch/DispatchConstants.java @@ -24,6 +24,10 @@ public interface DispatchConstants { * 协议 jdbc */ String PROTOCOL_JDBC = "jdbc"; + /** + * 协议 ssh + */ + String PROTOCOL_SSH = "ssh"; // 协议类型相关 - end // // http协议相关 - start 需尽可能先复用 HttpHeaders // diff --git a/collector/src/main/java/com/usthe/collector/dispatch/MetricsCollect.java b/collector/src/main/java/com/usthe/collector/dispatch/MetricsCollect.java index 7209535..54c1a31 100644 --- a/collector/src/main/java/com/usthe/collector/dispatch/MetricsCollect.java +++ b/collector/src/main/java/com/usthe/collector/dispatch/MetricsCollect.java @@ -6,6 +6,7 @@ import com.usthe.collector.collect.AbstractCollect; import com.usthe.collector.collect.database.JdbcCommonCollect; import com.usthe.collector.collect.http.HttpCollectImpl; import com.usthe.collector.collect.icmp.IcmpCollectImpl; +import com.usthe.collector.collect.ssh.SshCollectImpl; import com.usthe.collector.collect.telnet.TelnetCollectImpl; import com.usthe.collector.dispatch.timer.Timeout; import com.usthe.collector.dispatch.timer.WheelTimerTask; @@ -111,6 +112,9 @@ public class MetricsCollect implements Runnable, Comparable { case DispatchConstants.PROTOCOL_JDBC: abstractCollect = JdbcCommonCollect.getInstance(); break; + case DispatchConstants.PROTOCOL_SSH: + abstractCollect = SshCollectImpl.getInstance(); + break; // todo default: break; } diff --git a/common/src/main/java/com/usthe/common/entity/job/Metrics.java b/common/src/main/java/com/usthe/common/entity/job/Metrics.java index c329588..b7a4396 100644 --- a/common/src/main/java/com/usthe/common/entity/job/Metrics.java +++ b/common/src/main/java/com/usthe/common/entity/job/Metrics.java @@ -3,6 +3,7 @@ package com.usthe.common.entity.job; import com.usthe.common.entity.job.protocol.HttpProtocol; import com.usthe.common.entity.job.protocol.IcmpProtocol; import com.usthe.common.entity.job.protocol.JdbcProtocol; +import com.usthe.common.entity.job.protocol.SshProtocol; import com.usthe.common.entity.job.protocol.TcpUdpProtocol; import com.usthe.common.entity.job.protocol.TelnetProtocol; import lombok.AllArgsConstructor; @@ -73,6 +74,10 @@ public class Metrics { * 使用公共的jdbc规范实现的数据库配置信息 */ private JdbcProtocol jdbc; + /** + * 使用公共的ssh协议的监控配置信息 + */ + private SshProtocol ssh; @Override public boolean equals(Object o) { diff --git a/common/src/main/java/com/usthe/common/entity/job/protocol/SshProtocol.java b/common/src/main/java/com/usthe/common/entity/job/protocol/SshProtocol.java new file mode 100644 index 0000000..9c509f3 --- /dev/null +++ b/common/src/main/java/com/usthe/common/entity/job/protocol/SshProtocol.java @@ -0,0 +1,58 @@ +package com.usthe.common.entity.job.protocol; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * ssh 协议参数配置 + * @author tom + * @date 2022/3/11 15:20 + */ +@Data +@Builder +@AllArgsConstructor +@NoArgsConstructor +public class SshProtocol { + + /** + * 对端主机ip或域名 + */ + private String host; + + /** + * 对端主机端口 + */ + private String port; + + /** + * 超时时间 + */ + private String timeout = "3000"; + + /** + * 用户名 + */ + private String username; + + /** + * 密码(可选) + */ + private String password; + + /** + * 公钥(可选) + */ + private String publicKey; + + /** + * SSH执行脚本 + */ + private String script; + + /** + * 响应数据解析方式:oneRow, multiRow + */ + private String parseType; +} diff --git a/manager/src/main/resources/define/app/linux.yml b/manager/src/main/resources/define/app/linux.yml new file mode 100644 index 0000000..6979346 --- /dev/null +++ b/manager/src/main/resources/define/app/linux.yml @@ -0,0 +1,76 @@ +# 此监控类型所属类别:service-应用服务监控 db-数据库监控 custom-自定义监控 os-操作系统监控 +category: os +# 监控应用类型(与文件名保持一致) eg: linux windows tomcat mysql aws... +app: linux +name: + zh-CN: Linux操作系统 + en-US: OS Linux +# 参数映射map. type是参数类型: 0-number数字, 1-string明文字符串, 2-secret加密字符串 +# 强制固定必须参数 - host +configmap: + - key: host + type: 1 + - key: port + type: 0 + - key: username + type: 1 + - key: password + type: 2 +# 指标组列表 +metrics: +# 第一个监控指标组 basic +# 注意:内置监控指标有 (responseTime - 响应时间) + - name: basic + # 指标组调度优先级(0-127)越小优先级越高,优先级低的指标组会等优先级高的指标组采集完成后才会被调度,相同优先级的指标组会并行调度采集 + # 优先级为0的指标组为可用性指标组,即它会被首先调度,采集成功才会继续调度其它指标组,采集失败则中断调度 + priority: 0 + # 指标组中的具体监控指标 + fields: + # 指标信息 包括 field名称 type字段类型:0-number数字,1-string字符串 instance是否为实例主键 unit:指标单位 + - field: hostname + type: 1 + instance: true + - field: version + type: 1 + - field: uptime + type: 1 +# 监控采集使用协议 eg: sql, ssh, http, telnet, wmi, snmp, sdk + protocol: ssh +# 当protocol为http协议时具体的采集配置 + ssh: + # 主机host: ipv4 ipv6 域名 + host: ^_^host^_^ + # 端口 + port: ^_^port^_^ + username: ^_^username^_^ + password: ^_^password^_^ + script: (uname -r ; hostname ; uptime | awk -F "," '{print $1}' | sed "s/ //g") | sed ":a;N;s/\n/^/g;ta" | awk -F '^' 'BEGIN{print "version hostname uptime"} {print $1, $2, $3}' + # 响应数据解析方式:oneRow, multiRow + parseType: multiRow + + - name: cpu + priority: 1 + fields: + # 指标信息 包括 field名称 type字段类型:0-number数字,1-string字符串 instance是否为实例主键 unit:指标单位 + - field: info + type: 1 + - field: cores + type: 0 + - field: interrupt + type: 0 + - field: load + type: 1 + - field: context_switch + type: 0 + # 监控采集使用协议 eg: sql, ssh, http, telnet, wmi, snmp, sdk + protocol: ssh + # 当protocol为http协议时具体的采集配置 + ssh: + # 主机host: ipv4 ipv6 域名 + host: ^_^host^_^ + # 端口 + port: ^_^port^_^ + username: ^_^username^_^ + password: ^_^password^_^ + script: "LANG=C lscpu | awk -F: '/Model name/ {print $2}';awk '/processor/{core++} END{print core}' /proc/cpuinfo;uptime | sed 's/,/ /g' | awk '{for(i=NF-2;i<=NF;i++)print $i }' | xargs;vmstat 1 1 | awk 'NR==3{print $11}';vmstat 1 1 | awk 'NR==3{print $12}'" + parseType: oneRow \ No newline at end of file diff --git a/manager/src/main/resources/define/param/linux.yml b/manager/src/main/resources/define/param/linux.yml new file mode 100644 index 0000000..f0a255a --- /dev/null +++ b/manager/src/main/resources/define/param/linux.yml @@ -0,0 +1,22 @@ +app: linux +param: + - field: host + name: 主机Host + type: host + required: true + - field: port + name: 端口 + type: number + range: '[0,65535]' + required: true + defaultValue: 22 + placeholder: '请输入端口' + - field: username + name: 用户名 + type: text + limit: 20 + required: true + - field: password + name: 密码 + type: password + required: true \ No newline at end of file