diff --git a/collector/pom.xml b/collector/pom.xml
index a22b4a6..5b1eacb 100644
--- a/collector/pom.xml
+++ b/collector/pom.xml
@@ -31,7 +31,7 @@
com.usthe.tancloud
common
- 1.0-SNAPSHOT
+ 1.0
@@ -84,6 +84,13 @@
aviator
5.2.7
+
+
+
+ mysql
+ mysql-connector-java
+ 8.0.27
+
\ No newline at end of file
diff --git a/collector/src/main/java/com/usthe/collector/collect/database/JdbcCommonCollect.java b/collector/src/main/java/com/usthe/collector/collect/database/JdbcCommonCollect.java
index d9883ad..8f5e6f7 100644
--- a/collector/src/main/java/com/usthe/collector/collect/database/JdbcCommonCollect.java
+++ b/collector/src/main/java/com/usthe/collector/collect/database/JdbcCommonCollect.java
@@ -1,8 +1,14 @@
package com.usthe.collector.collect.database;
+import com.usthe.collector.collect.AbstractCollect;
import com.usthe.collector.common.cache.CacheIdentifier;
import com.usthe.collector.common.cache.CommonCache;
import com.usthe.collector.common.cache.support.CommonJdbcConnect;
+import com.usthe.collector.util.CollectorConstants;
+import com.usthe.common.entity.job.Metrics;
+import com.usthe.common.entity.job.protocol.JdbcProtocol;
+import com.usthe.common.entity.message.CollectRep;
+import com.usthe.common.util.CommonConstants;
import lombok.extern.slf4j.Slf4j;
import java.sql.Connection;
@@ -11,8 +17,8 @@ import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.HashMap;
-import java.util.LinkedList;
import java.util.List;
+import java.util.Objects;
import java.util.Optional;
/**
@@ -21,10 +27,61 @@ import java.util.Optional;
* @date 2021/9/1 21:37
*/
@Slf4j
-public class JdbcCommonCollect {
+public class JdbcCommonCollect extends AbstractCollect {
+
+ private static final String QUERY_TYPE_ONE_ROW = "oneRow";
+ private static final String QUERY_TYPE_MULTI_ROW = "multiRow";
+ private static final String QUERY_TYPE_COLUMNS = "columns";
+
+ private JdbcCommonCollect(){}
+
+ public static JdbcCommonCollect getInstance() {
+ return Singleton.INSTANCE;
+ }
+
+ @Override
+ public void collect(CollectRep.MetricsData.Builder builder, long appId, String app, Metrics metrics) {
+ long startTime = System.currentTimeMillis();
+ // 简单校验必有参数
+ if (metrics == null || metrics.getJdbc() == null) {
+ builder.setCode(CollectRep.Code.FAIL);
+ builder.setMsg("DATABASE collect must has jdbc params");
+ return;
+ }
+ JdbcProtocol jdbcProtocol = metrics.getJdbc();
+ String databaseUrl = constructDatabaseUrl(jdbcProtocol);
+ try {
+ Statement statement = getConnection(jdbcProtocol.getUsername(),
+ jdbcProtocol.getPassword(), databaseUrl);
+ switch (jdbcProtocol.getQueryType()) {
+ case QUERY_TYPE_ONE_ROW:
+ queryOneRow(statement, jdbcProtocol.getSql(), metrics.getAliasFields(), builder, startTime);
+ break;
+ case QUERY_TYPE_MULTI_ROW:
+ queryMultiRow(statement, jdbcProtocol.getSql(), metrics.getAliasFields(), builder, startTime);
+ break;
+ case QUERY_TYPE_COLUMNS:
+ queryOneRowByMatchTwoColumns(statement, jdbcProtocol.getSql(), metrics.getAliasFields(), builder, startTime);
+ break;
+ default:
+ builder.setCode(CollectRep.Code.FAIL);
+ builder.setMsg("Not support database query type: " + jdbcProtocol.getQueryType());
+ break;
+ }
+ } catch (SQLException sqlException) {
+ log.error("Jdbc sql error: {}, code: {}.", sqlException.getMessage(),
+ sqlException.getErrorCode(), sqlException);
+ builder.setCode(CollectRep.Code.FAIL);
+ builder.setMsg("Query Error: " + sqlException.getMessage() + " Code: " + sqlException.getErrorCode());
+ } catch (Exception e) {
+ log.error("Jdbc error: {}.", e.getMessage(), e);
+ builder.setCode(CollectRep.Code.FAIL);
+ builder.setMsg("Query Error: " + e.getMessage());
+ }
+ }
- private Statement getConnection(String username, String password, String url) {
+ private Statement getConnection(String username, String password, String url) throws Exception {
CacheIdentifier identifier = CacheIdentifier.builder()
.ip(url)
.username(username).password(password).build();
@@ -56,20 +113,14 @@ public class JdbcCommonCollect {
return statement;
}
// 复用失败则新建连接
- try {
- Connection connection = DriverManager.getConnection(url, username, password);
- statement = connection.createStatement();
- // 设置查询超时时间10秒
- statement.setQueryTimeout(10);
- // 设置查询最大行数1000行
- statement.setMaxRows(1000);
- CommonJdbcConnect jdbcConnect = new CommonJdbcConnect(connection);
- CommonCache.getInstance().addCache(identifier, jdbcConnect, 10000L);
- } catch (SQLException sqlException) {
- log.error("Jdbc sql error: {}, code: {}.", sqlException.getMessage(), sqlException.getErrorCode(), sqlException);
- } catch (Exception e) {
- log.error("Jdbc error: {}.", e.getMessage(), e);
- }
+ Connection connection = DriverManager.getConnection(url, username, password);
+ statement = connection.createStatement();
+ // 设置查询超时时间10秒
+ statement.setQueryTimeout(10);
+ // 设置查询最大行数1000行
+ statement.setMaxRows(1000);
+ CommonJdbcConnect jdbcConnect = new CommonJdbcConnect(connection);
+ CommonCache.getInstance().addCache(identifier, jdbcConnect, 10000L);
return statement;
}
@@ -83,30 +134,28 @@ public class JdbcCommonCollect {
* @param columns 查询的列头(一般是数据库表字段,也可能包含特殊字段,eg: responseTime)
* @throws Exception when error happen
*/
- private List> queryOneRow(Statement statement, String sql, List columns) throws Exception {
- long startTime = System.currentTimeMillis();
+ private void queryOneRow(Statement statement, String sql, List columns,
+ CollectRep.MetricsData.Builder builder, long startTime) throws Exception {
statement.setMaxRows(1);
ResultSet resultSet = statement.executeQuery(sql);
- List> rowList = new LinkedList<>();
try {
if (resultSet.next()) {
- LinkedList rows = new LinkedList<>();
+ CollectRep.ValueRow.Builder valueRowBuilder = CollectRep.ValueRow.newBuilder();
for (String column : columns) {
- if ("responseTime".equals(column)) {
+ if (CollectorConstants.RESPONSE_TIME.equals(column)) {
long time = System.currentTimeMillis() - startTime;
- rows.add(String.valueOf(time));
+ valueRowBuilder.addColumns(String.valueOf(time));
} else {
String value = resultSet.getString(column);
- value = value == null ? "" : value;
- rows.add(value);
+ value = value == null ? CommonConstants.NULL_VALUE : value;
+ valueRowBuilder.addColumns(value);
}
}
- rowList.add(rows);
+ builder.addValues(valueRowBuilder.build());
}
} finally {
resultSet.close();
}
- return rowList;
}
/**
@@ -120,10 +169,9 @@ public class JdbcCommonCollect {
* @param columns 查询的列头(一般是数据库表字段,也可能包含特殊字段,eg: responseTime)
* @throws Exception when error happen
*/
- private List> queryOneRowByMatchTwoColumns(Statement statement, String sql, List columns) throws Exception {
- long startTime = System.currentTimeMillis();
+ private void queryOneRowByMatchTwoColumns(Statement statement, String sql, List columns,
+ CollectRep.MetricsData.Builder builder, long startTime) throws Exception {
ResultSet resultSet = statement.executeQuery(sql);
- List> rowList = new LinkedList<>();
try {
HashMap values = new HashMap<>(columns.size());
while (resultSet.next()) {
@@ -131,22 +179,21 @@ public class JdbcCommonCollect {
values.put(resultSet.getString(1).toLowerCase(), resultSet.getString(2));
}
}
- LinkedList rows = new LinkedList<>();
+ CollectRep.ValueRow.Builder valueRowBuilder = CollectRep.ValueRow.newBuilder();
for (String column : columns) {
- if ("responseTime".equals(column)) {
+ if (CollectorConstants.RESPONSE_TIME.equals(column)) {
long time = System.currentTimeMillis() - startTime;
- rows.add(String.valueOf(time));
+ valueRowBuilder.addColumns(String.valueOf(time));
} else {
String value = values.get(column.toLowerCase());
- value = value == null ? "" : value;
- rows.add(value);
+ value = value == null ? CommonConstants.NULL_VALUE : value;
+ valueRowBuilder.addColumns(value);
}
}
- rowList.add(rows);
+ builder.addValues(valueRowBuilder.build());
} finally {
resultSet.close();
}
- return rowList;
}
/**
@@ -159,30 +206,56 @@ public class JdbcCommonCollect {
* @param columns 查询的列头(一般是数据库表字段,也可能包含特殊字段,eg: responseTime)
* @throws Exception when error happen
*/
- private List> queryMultiRow(Statement statement, String sql, List columns) throws Exception {
- long startTime = System.currentTimeMillis();
+ private void queryMultiRow(Statement statement, String sql, List columns,
+ CollectRep.MetricsData.Builder builder, long startTime) throws Exception {
ResultSet resultSet = statement.executeQuery(sql);
- List> rowList = new LinkedList<>();
try {
while (resultSet.next()) {
- LinkedList rows = new LinkedList<>();
+ CollectRep.ValueRow.Builder valueRowBuilder = CollectRep.ValueRow.newBuilder();
for (String column : columns) {
- if ("responseTime".equals(column)) {
+ if (CollectorConstants.RESPONSE_TIME.equals(column)) {
long time = System.currentTimeMillis() - startTime;
- rows.add(String.valueOf(time));
+ valueRowBuilder.addColumns(String.valueOf(time));
} else {
String value = resultSet.getString(column);
- value = value == null ? "" : value;
- rows.add(value);
+ value = value == null ? CommonConstants.NULL_VALUE : value;
+ valueRowBuilder.addColumns(value);
}
}
- rowList.add(rows);
+ builder.addValues(valueRowBuilder.build());
}
} finally {
resultSet.close();
}
- return rowList;
}
+ /**
+ * 根据jdbc入参构造数据库URL
+ * @param jdbcProtocol jdbc
+ * @return URL
+ */
+ private String constructDatabaseUrl(JdbcProtocol jdbcProtocol) {
+ if (Objects.nonNull(jdbcProtocol.getUrl())
+ && !Objects.equals("", jdbcProtocol.getUrl())
+ && jdbcProtocol.getUrl().startsWith("jdbc")) {
+ // 入参数URL有效 则优先级最高返回
+ return jdbcProtocol.getUrl();
+ }
+ String url;
+ switch (jdbcProtocol.getPlatform()) {
+ case "mysql":
+ url = "jdbc:mysql://" + jdbcProtocol.getHost() + ":" + jdbcProtocol.getPort()
+ + "/" + (jdbcProtocol.getDatabase() == null ? "" : jdbcProtocol.getDatabase())
+ + "?useUnicode=true&characterEncoding=utf-8&useSSL=false";
+ break;
+ default:
+ throw new IllegalArgumentException("Not support database platform: " + jdbcProtocol.getPlatform());
+ }
+ return url;
+ }
+
+ private static class Singleton {
+ private static final JdbcCommonCollect INSTANCE = new JdbcCommonCollect();
+ }
}
diff --git a/collector/src/main/java/com/usthe/collector/dispatch/MetricsCollect.java b/collector/src/main/java/com/usthe/collector/dispatch/MetricsCollect.java
index dc130eb..ab19586 100644
--- a/collector/src/main/java/com/usthe/collector/dispatch/MetricsCollect.java
+++ b/collector/src/main/java/com/usthe/collector/dispatch/MetricsCollect.java
@@ -3,6 +3,7 @@ package com.usthe.collector.dispatch;
import com.googlecode.aviator.AviatorEvaluator;
import com.googlecode.aviator.Expression;
import com.usthe.collector.collect.AbstractCollect;
+import com.usthe.collector.collect.database.JdbcCommonCollect;
import com.usthe.collector.collect.http.HttpCollectImpl;
import com.usthe.collector.collect.icmp.IcmpCollectImpl;
import com.usthe.collector.collect.telnet.TelnetCollectImpl;
@@ -107,6 +108,9 @@ public class MetricsCollect implements Runnable, Comparable {
case DispatchConstants.PROTOCOL_TELNET:
abstractCollect = TelnetCollectImpl.getInstance();
break;
+ case DispatchConstants.PROTOCOL_JDBC:
+ abstractCollect = JdbcCommonCollect.getInstance();
+ break;
// todo
default: break;
}
diff --git a/collector/src/main/java/com/usthe/collector/util/JsonPathParser.java b/collector/src/main/java/com/usthe/collector/util/JsonPathParser.java
index 88e78b7..25ef9bb 100644
--- a/collector/src/main/java/com/usthe/collector/util/JsonPathParser.java
+++ b/collector/src/main/java/com/usthe/collector/util/JsonPathParser.java
@@ -32,7 +32,7 @@ public class JsonPathParser {
* 使用jsonPath来解析json内容
* @param content json内容
* @param jsonPath jsonPath脚本
- * @return 解析后的内容
+ * @return 解析后的内容 [{'name': 'tom', 'speed': '433'},{'name': 'lili', 'speed': '543'}]
*/
public static List