From 1cac5251b011318783be9a303d688d4a4c0652c3 Mon Sep 17 00:00:00 2001 From: tomsun28 Date: Sat, 5 Feb 2022 15:57:24 +0800 Subject: [PATCH] =?UTF-8?q?[collector]http=20database=20=E9=87=87=E9=9B=86?= =?UTF-8?q?=E4=BF=AE=E6=94=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../collect/database/JdbcCommonCollect.java | 261 ++++++++++++++++++ 1 file changed, 261 insertions(+) create mode 100644 collector/src/main/java/com/usthe/collector/collect/database/JdbcCommonCollect.java diff --git a/collector/src/main/java/com/usthe/collector/collect/database/JdbcCommonCollect.java b/collector/src/main/java/com/usthe/collector/collect/database/JdbcCommonCollect.java new file mode 100644 index 0000000..25908fd --- /dev/null +++ b/collector/src/main/java/com/usthe/collector/collect/database/JdbcCommonCollect.java @@ -0,0 +1,261 @@ +package com.usthe.collector.collect.database; + +import com.usthe.collector.collect.AbstractCollect; +import com.usthe.collector.common.cache.CacheIdentifier; +import com.usthe.collector.common.cache.CommonCache; +import com.usthe.collector.common.cache.support.CommonJdbcConnect; +import com.usthe.collector.util.CollectorConstants; +import com.usthe.common.entity.job.Metrics; +import com.usthe.common.entity.job.protocol.JdbcProtocol; +import com.usthe.common.entity.message.CollectRep; +import com.usthe.common.util.CommonConstants; +import lombok.extern.slf4j.Slf4j; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.HashMap; +import java.util.List; +import java.util.Objects; +import java.util.Optional; + +/** + * 数据库JDBC通用查询 + * @author tomsun28 + * @date 2021/12/1 21:37 + */ +@Slf4j +public class JdbcCommonCollect extends AbstractCollect { + + private static final String QUERY_TYPE_ONE_ROW = "oneRow"; + private static final String QUERY_TYPE_MULTI_ROW = "multiRow"; + private static final String QUERY_TYPE_COLUMNS = "columns"; + + private JdbcCommonCollect(){} + + public static JdbcCommonCollect getInstance() { + return Singleton.INSTANCE; + } + + @Override + public void collect(CollectRep.MetricsData.Builder builder, long appId, String app, Metrics metrics) { + long startTime = System.currentTimeMillis(); + // 简单校验必有参数 + if (metrics == null || metrics.getJdbc() == null) { + builder.setCode(CollectRep.Code.FAIL); + builder.setMsg("DATABASE collect must has jdbc params"); + return; + } + JdbcProtocol jdbcProtocol = metrics.getJdbc(); + String databaseUrl = constructDatabaseUrl(jdbcProtocol); + try { + Statement statement = getConnection(jdbcProtocol.getUsername(), + jdbcProtocol.getPassword(), databaseUrl); + switch (jdbcProtocol.getQueryType()) { + case QUERY_TYPE_ONE_ROW: + queryOneRow(statement, jdbcProtocol.getSql(), metrics.getAliasFields(), builder, startTime); + break; + case QUERY_TYPE_MULTI_ROW: + queryMultiRow(statement, jdbcProtocol.getSql(), metrics.getAliasFields(), builder, startTime); + break; + case QUERY_TYPE_COLUMNS: + queryOneRowByMatchTwoColumns(statement, jdbcProtocol.getSql(), metrics.getAliasFields(), builder, startTime); + break; + default: + builder.setCode(CollectRep.Code.FAIL); + builder.setMsg("Not support database query type: " + jdbcProtocol.getQueryType()); + break; + } + } catch (SQLException sqlException) { + log.error("Jdbc sql error: {}, code: {}.", sqlException.getMessage(), + sqlException.getErrorCode(), sqlException); + builder.setCode(CollectRep.Code.FAIL); + builder.setMsg("Query Error: " + sqlException.getMessage() + " Code: " + sqlException.getErrorCode()); + } catch (Exception e) { + log.error("Jdbc error: {}.", e.getMessage(), e); + builder.setCode(CollectRep.Code.FAIL); + builder.setMsg("Query Error: " + e.getMessage()); + } + } + + + private Statement getConnection(String username, String password, String url) throws Exception { + CacheIdentifier identifier = CacheIdentifier.builder() + .ip(url) + .username(username).password(password).build(); + Optional cacheOption = CommonCache.getInstance().getCache(identifier, true); + Statement statement = null; + if (cacheOption.isPresent()) { + CommonJdbcConnect jdbcConnect = (CommonJdbcConnect) cacheOption.get(); + try { + statement = jdbcConnect.getConnection().createStatement(); + // 设置查询超时时间10秒 + statement.setQueryTimeout(10); + // 设置查询最大行数1000行 + statement.setMaxRows(1000); + } catch (Exception e) { + log.info("The jdbc connect form cache, create statement error: {}", e.getMessage()); + try { + if (statement != null) { + statement.close(); + } + jdbcConnect.close(); + statement = null; + } catch (Exception e2) { + log.error(e2.getMessage()); + } + CommonCache.getInstance().removeCache(identifier); + } + } + if (statement != null) { + return statement; + } + // 复用失败则新建连接 + Connection connection = DriverManager.getConnection(url, username, password); + statement = connection.createStatement(); + // 设置查询超时时间10秒 + statement.setQueryTimeout(10); + // 设置查询最大行数1000行 + statement.setMaxRows(1000); + CommonJdbcConnect jdbcConnect = new CommonJdbcConnect(connection); + CommonCache.getInstance().addCache(identifier, jdbcConnect, 10000L); + return statement; + } + + /** + * 查询一行数据, 通过查询返回结果集的列名称,和查询的字段映射 + * eg: + * 查询字段:one tow three four + * 查询SQL:select one, tow, three, four from book limit 1; + * @param statement 执行器 + * @param sql sql + * @param columns 查询的列头(一般是数据库表字段,也可能包含特殊字段,eg: responseTime) + * @throws Exception when error happen + */ + private void queryOneRow(Statement statement, String sql, List columns, + CollectRep.MetricsData.Builder builder, long startTime) throws Exception { + statement.setMaxRows(1); + ResultSet resultSet = statement.executeQuery(sql); + try { + if (resultSet.next()) { + CollectRep.ValueRow.Builder valueRowBuilder = CollectRep.ValueRow.newBuilder(); + for (String column : columns) { + if (CollectorConstants.RESPONSE_TIME.equals(column)) { + long time = System.currentTimeMillis() - startTime; + valueRowBuilder.addColumns(String.valueOf(time)); + } else { + String value = resultSet.getString(column); + value = value == null ? CommonConstants.NULL_VALUE : value; + valueRowBuilder.addColumns(value); + } + } + builder.addValues(valueRowBuilder.build()); + } + } finally { + resultSet.close(); + } + } + + /** + * 查询一行数据, 通过查询的两列数据(key-value),key和查询的字段匹配,value为查询字段的值 + * eg: + * 查询字段:one tow three four + * 查询SQL:select key, value from book; + * 返回的key映射查询字段 + * @param statement 执行器 + * @param sql sql + * @param columns 查询的列头(一般是数据库表字段,也可能包含特殊字段,eg: responseTime) + * @throws Exception when error happen + */ + private void queryOneRowByMatchTwoColumns(Statement statement, String sql, List columns, + CollectRep.MetricsData.Builder builder, long startTime) throws Exception { + ResultSet resultSet = statement.executeQuery(sql); + try { + HashMap values = new HashMap<>(columns.size()); + while (resultSet.next()) { + if (resultSet.getString(1) != null) { + values.put(resultSet.getString(1).toLowerCase(), resultSet.getString(2)); + } + } + CollectRep.ValueRow.Builder valueRowBuilder = CollectRep.ValueRow.newBuilder(); + for (String column : columns) { + if (CollectorConstants.RESPONSE_TIME.equals(column)) { + long time = System.currentTimeMillis() - startTime; + valueRowBuilder.addColumns(String.valueOf(time)); + } else { + String value = values.get(column.toLowerCase()); + value = value == null ? CommonConstants.NULL_VALUE : value; + valueRowBuilder.addColumns(value); + } + } + builder.addValues(valueRowBuilder.build()); + } finally { + resultSet.close(); + } + } + + /** + * 查询多行数据, 通过查询返回结果集的列名称,和查询的字段映射 + * eg: + * 查询字段:one tow three four + * 查询SQL:select one, tow, three, four from book limit 1; + * @param statement 执行器 + * @param sql sql + * @param columns 查询的列头(一般是数据库表字段,也可能包含特殊字段,eg: responseTime) + * @throws Exception when error happen + */ + private void queryMultiRow(Statement statement, String sql, List columns, + CollectRep.MetricsData.Builder builder, long startTime) throws Exception { + ResultSet resultSet = statement.executeQuery(sql); + try { + while (resultSet.next()) { + CollectRep.ValueRow.Builder valueRowBuilder = CollectRep.ValueRow.newBuilder(); + for (String column : columns) { + if (CollectorConstants.RESPONSE_TIME.equals(column)) { + long time = System.currentTimeMillis() - startTime; + valueRowBuilder.addColumns(String.valueOf(time)); + } else { + String value = resultSet.getString(column); + value = value == null ? CommonConstants.NULL_VALUE : value; + valueRowBuilder.addColumns(value); + } + } + builder.addValues(valueRowBuilder.build()); + } + } finally { + resultSet.close(); + } + } + + /** + * 根据jdbc入参构造数据库URL + * @param jdbcProtocol jdbc + * @return URL + */ + private String constructDatabaseUrl(JdbcProtocol jdbcProtocol) { + if (Objects.nonNull(jdbcProtocol.getUrl()) + && !Objects.equals("", jdbcProtocol.getUrl()) + && jdbcProtocol.getUrl().startsWith("jdbc")) { + // 入参数URL有效 则优先级最高返回 + return jdbcProtocol.getUrl(); + } + String url; + switch (jdbcProtocol.getPlatform()) { + case "mysql": + url = "jdbc:mysql://" + jdbcProtocol.getHost() + ":" + jdbcProtocol.getPort() + + "/" + (jdbcProtocol.getDatabase() == null ? "" : jdbcProtocol.getDatabase()) + + "?useUnicode=true&characterEncoding=utf-8&useSSL=false"; + break; + default: + throw new IllegalArgumentException("Not support database platform: " + jdbcProtocol.getPlatform()); + + } + return url; + } + + private static class Singleton { + private static final JdbcCommonCollect INSTANCE = new JdbcCommonCollect(); + } +}