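Java Spring Boot: connecting MySQL and ClickHouse as multiple data sources with MyBatis-Plus and a Druid connection pool, with custom SQL for batch inserts of complex types such as Array and Map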

2022-05-11 09:47:24

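The project originally used plain MyBatis on Spring Boot to access MySQL. A later requirement called for ClickHouse as well, so I took the opportunity to introduce MyBatis-Plus: fields get added every few iterations, and hand-editing the resultMap and SQL in mapper.xml each time is needless work.

After the integration, I inserted test data into ClickHouse with MyBatis-Plus's built-in saveBatch() method and found it very slow, nowhere near the write speed ClickHouse should deliver. Reading the source showed that the framework's saveBatch is essentially a for loop that issues one single-row insert per element, so the slowness is no surprise. The only fix was to write the batch insert statement myself.

Integration steps: first add the required dependencies to the pom. Be sure to exclude any existing MyBatis dependencies to avoid conflicts.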

        <!--mybatis-plus-->
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-boot-starter</artifactId>
            <version>${mybatis-plus.version}</version>
        </dependency>
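        <!-- multiple data sources -->
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>dynamic-datasource-spring-boot-starter</artifactId>
            <!-- this starter is versioned independently of mybatis-plus; reusing the property only works while both version numbers coincide -->
            <version>${mybatis-plus.version}</version>
        </dependency>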
        <!--clickhouse-->
        <dependency>
            <groupId>com.clickhouse</groupId>
            <artifactId>clickhouse-jdbc</artifactId>
            <version>0.3.2-patch8</version>
        </dependency>

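Next, write the configuration file application.properties (the yml form is analogous). The project requires the password to be obfuscated rather than stored in plain text, so I used the encryption feature of the Druid connection pool.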

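# Password encryption: generate the key pair and ciphertext with Alibaba Druid's ConfigTools
public-key=(omitted)
# Use mysql as the default (primary) data source
spring.datasource.dynamic.primary=mysql
# Underscore-to-camel-case mapping (disabled here)
mybatis-plus.configuration.map-underscore-to-camel-case=false
# Use column labels (aliases)
# The mapper XML location can also be set with mybatis-plus.mapper-locations; the default is classpath*:/mapper/**/*.xml
mybatis-plus.configuration.use-column-label=true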
# mysql
spring.datasource.dynamic.datasource.mysql.url=jdbc:mysql://127.0.0.1:3306/dbname?useUnicode=true&characterEncoding=UTF-8&useSSL=false&allowMultiQueries=true&serverTimezone=UTC
spring.datasource.dynamic.datasource.mysql.username=root
# Encrypted password, generated with Alibaba Druid's ConfigTools
spring.datasource.dynamic.datasource.mysql.password=asdweNfXgh0vruAeua2qC1h/kxSzzdQ/I9Eou6sdwetFGtIJOTjtTRNcuKrbVNp10FF9DukdX9EKsEwerRaAA==
spring.datasource.dynamic.datasource.mysql.driver-class-name=com.mysql.cj.jdbc.Driver
# Each data source can have its own pool settings; here mysql uses encryption and clickhouse keeps the defaults for now
spring.datasource.dynamic.datasource.mysql.druid.public-key=${public-key}
# clickhouse
spring.datasource.dynamic.datasource.clickhouse.url=jdbc:clickhouse://127.0.0.1:8123/dbname
spring.datasource.dynamic.datasource.clickhouse.username=
spring.datasource.dynamic.datasource.clickhouse.password=
spring.datasource.dynamic.datasource.clickhouse.driver-class-name=com.clickhouse.jdbc.ClickHouseDriver
# druid
spring.datasource.type=com.alibaba.druid.pool.DruidDataSource
# Exclude Druid's own auto-configuration; the Druid settings below can be adjusted as needed
spring.autoconfigure.exclude=com.alibaba.druid.spring.boot.autoconfigure.DruidDataSourceAutoConfigure
spring.datasource.dynamic.druid.initial-size=10
spring.datasource.dynamic.druid.max-active=40
spring.datasource.dynamic.druid.min-idle=5
spring.datasource.dynamic.druid.validation-query=select 1
spring.datasource.dynamic.druid.max-wait=28000
spring.datasource.dynamic.druid.time-between-eviction-runs-millis=60000
spring.datasource.dynamic.druid.min-evictable-idle-time-millis=300000
spring.datasource.dynamic.druid.test-while-idle=true
spring.datasource.dynamic.druid.test-on-borrow=true
spring.datasource.dynamic.druid.test-on-return=false
spring.datasource.dynamic.druid.keep-alive=true
spring.web.resources.static-locations=classpath:/static
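
The encrypted password in the configuration relies on an RSA key pair: the password is encrypted once with the private key, and the pool decrypts it at startup with the configured public-key. The following is only a JDK-only sketch of that mechanism, not Druid's actual implementation. The class name PasswordObfuscationSketch, its method names, and the 2048-bit key size are assumptions made for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.KeyPair;
import java.security.KeyPairGenerator;
import java.security.PrivateKey;
import java.security.PublicKey;
import java.util.Base64;
import javax.crypto.Cipher;

// Hypothetical illustration class; it is not part of Druid.
class PasswordObfuscationSketch {

    // Generates a fresh RSA key pair (key size chosen for the example only).
    static KeyPair newKeyPair() {
        try {
            KeyPairGenerator generator = KeyPairGenerator.getInstance("RSA");
            generator.initialize(2048);
            return generator.generateKeyPair();
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }

    // Encrypts with the PRIVATE key; the Base64 result is what goes into the config file.
    static String encrypt(PrivateKey privateKey, String plainText) {
        try {
            Cipher cipher = Cipher.getInstance("RSA/ECB/PKCS1Padding");
            cipher.init(Cipher.ENCRYPT_MODE, privateKey);
            byte[] encrypted = cipher.doFinal(plainText.getBytes(StandardCharsets.UTF_8));
            return Base64.getEncoder().encodeToString(encrypted);
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }

    // Decrypts with the PUBLIC key, as the pool would do when it starts.
    static String decrypt(PublicKey publicKey, String cipherText) {
        try {
            Cipher cipher = Cipher.getInstance("RSA/ECB/PKCS1Padding");
            cipher.init(Cipher.DECRYPT_MODE, publicKey);
            byte[] decrypted = cipher.doFinal(Base64.getDecoder().decode(cipherText));
            return new String(decrypted, StandardCharsets.UTF_8);
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        KeyPair pair = newKeyPair();
        String stored = encrypt(pair.getPrivate(), "s3cret");
        System.out.println("stored in config: " + stored);
        System.out.println("seen by the pool: " + decrypt(pair.getPublic(), stored));
    }
}
```

This hides the password from casual readers of the file. Anyone who also holds the public key can still recover it, which is why the article calls it obfuscation.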

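Add the following annotation to a Spring Boot configuration class or the application class so that the mapper interfaces are scanned: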

@MapperScan(basePackages = {"cn.heasy.dao.*"})

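The DAO interfaces live in one package per data source, which is how the data sources are told apart. The mapper XML files follow the same layout.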

SQL to create the ClickHouse table:

create table hit_ioc
(
    create_time      DateTime,
    ioc              String,
    ioc_type         String,
    confidence       UInt8,
    confidence_level String,
    is_malicious     UInt8,
    judgments Array(String) default [],
    severity         String,
    severity_value   UInt8,
    basic_tag Nullable(String),
    main_tag Nullable(String),
    platform Array(String) default [],
    tlp Nullable(UInt8),
    uuid             String,
    apikey           String,
    direction        String,
    src_ip Nullable(String),
    host Nullable(String),
    url Nullable(String),
    event_name Nullable(String),
    source_id UInt32,
    raw String,
    query_type String,
    hit_scenes Array(UInt32) default [],
    business_input_type Nullable(UInt8),
    business_input_id Nullable(UInt32),
    business_input_name Nullable(String),
    is_wb            UInt8,
    asn Map(String,String) default map()
)
    engine = MergeTree
        partition by toYYYYMMDD(create_time)
        order by (toYYYYMMDD(create_time), apikey);

The corresponding entity class:

package cn.heasy.model.po.clickhouse;

import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;

import java.util.Map;
import java.util.Set;

/**
 * @author heasy
 * @date 2022/4/28 15:49
 **/
@Data
@TableName("hit_ioc")
public class HitIocChPo {

    Long createTime;
    String ioc;
    String iocType;
    Integer confidence;
    String confidenceLevel;
    Boolean isMalicious;
    @TableField(typeHandler = ClickArrayToStringSetHandler.class)
    Set<String> judgments;
    String severity;
    Integer severityValue;
    String basicTag;
    String mainTag;
    @TableField(typeHandler = ClickArrayToStringSetHandler.class)
    Set<String> platform;
    Integer tlp;
    String uuid;
    String apikey;
    String direction;
    String srcIp;
    String host;
    String url;
    String eventName;
    Long sourceId;
    String raw;
    String queryType;
    @TableField(typeHandler = ClickArrayToLongSetHandler.class)
    Set<Long> hitScenes;
    Integer businessInputType;
    Long businessInputId;
    String businessInputName;
    Boolean isWb;
    @TableField(typeHandler = ClickMapHandler.class)
    Map<String, Object> asn;

}

The handlers used above are custom type handlers that bridge ClickHouse's Array and Map types. The code follows.

package cn.heasy.handler.db;


import cn.hutool.core.collection.CollUtil;
import com.clickhouse.jdbc.ClickHouseArray;

import java.sql.SQLException;
import java.util.HashSet;
import java.util.Set;

/**
 * @author heasy
 * @date 2022/4/28 16:22
 **/
public class ClickArrayHandler {

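    /**
     * Converts a ClickHouseArray into a Set; any other value yields an empty set.
     * The clazz argument only pins the element type T and is not used for conversion.
     */
    @SuppressWarnings("unchecked")
    public static <T> Set<T> parseClickArray(Object object, Class<T> clazz) throws SQLException {
        if (object instanceof ClickHouseArray) {
            final ClickHouseArray array = (ClickHouseArray) object;
            // Unchecked cast: assumes the driver returns an object array of T
            T[] elements = (T[]) array.getArray();
            return CollUtil.newHashSet(elements);
        }
        return CollUtil.empty(HashSet.class);
    }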
}
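
The cast to T[] in parseClickArray only works when the driver hands back an object array. If a driver version were to return a primitive array for a numeric column (an assumption, not something verified here), that cast would fail. Below is a minimal sketch of a more defensive conversion based on java.lang.reflect.Array. The class ArrayToSetSketch and its converter parameter are hypothetical and do not appear in the original project.

```java
import java.lang.reflect.Array;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.function.Function;

// Hypothetical helper, shown for illustration only.
class ArrayToSetSketch {

    // Copies any Java array, primitive or object, into a Set, converting each element.
    static <T> Set<T> toSet(Object array, Function<Object, T> converter) {
        Set<T> result = new LinkedHashSet<>();
        if (array == null || !array.getClass().isArray()) {
            return result; // anything that is not an array becomes an empty set
        }
        int length = Array.getLength(array);
        for (int i = 0; i < length; i++) {
            // Array.get boxes primitive elements (long -> Long, int -> Integer, ...)
            result.add(converter.apply(Array.get(array, i)));
        }
        return result;
    }

    public static void main(String[] args) {
        Set<Long> ids = toSet(new long[] {3L, 1L, 3L}, v -> ((Number) v).longValue());
        Set<String> tags = toSet(new String[] {"a", "b"}, v -> String.valueOf(v));
        System.out.println(ids + " " + tags);
    }
}
```

Going through Number also normalizes the element type, so an int[] or long[] source both end up as Set<Long>.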

Type handler that maps Array(String) columns to Set<String>:

package cn.heasy.handler.db;

import cn.hutool.core.util.ArrayUtil;
import org.apache.ibatis.type.BaseTypeHandler;
import org.apache.ibatis.type.JdbcType;

import java.sql.CallableStatement;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Set;

/**
 * @author heasy
 * @date 2022/4/28 16:13
 **/
public class ClickArrayToStringSetHandler extends BaseTypeHandler<Set<String>> {

    @Override
    public void setNonNullParameter(PreparedStatement preparedStatement, int i, Set<String> values, JdbcType jdbcType) throws SQLException {
        // Bind the set as a String[] so the driver can write it as an array
        preparedStatement.setObject(i, ArrayUtil.toArray(values, String.class));
    }

    @Override
    public Set<String> getNullableResult(ResultSet resultSet, String s) throws SQLException {
        final Object object = resultSet.getObject(s);
        return parseClickHouseArrayToStringSet(object);
    }

    @Override
    public Set<String> getNullableResult(ResultSet resultSet, int i) throws SQLException {
        final Object object = resultSet.getObject(i);
        return parseClickHouseArrayToStringSet(object);
    }

    @Override
    public Set<String> getNullableResult(CallableStatement callableStatement, int i) throws SQLException {
        final Object object = callableStatement.getObject(i);
        return parseClickHouseArrayToStringSet(object);
    }


    private Set<String> parseClickHouseArrayToStringSet(Object object) throws SQLException {
        return ClickArrayHandler.parseClickArray(object, String.class);
    }
}

Type handler that maps integer array columns to Set<Long>:

package cn.heasy.handler.db;

import cn.hutool.core.util.ArrayUtil;
import org.apache.ibatis.type.BaseTypeHandler;
import org.apache.ibatis.type.JdbcType;

import java.sql.CallableStatement;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Set;

/**
 * @author heasy
 * @date 2022/4/28 16:13
 **/
public class ClickArrayToLongSetHandler extends BaseTypeHandler<Set<Long>> {

    @Override
    public void setNonNullParameter(PreparedStatement preparedStatement, int i, Set<Long> values, JdbcType jdbcType) throws SQLException {
        // Bind the set as a Long[] so the driver can write it as an array
        preparedStatement.setObject(i, ArrayUtil.toArray(values, Long.class));
    }

    @Override
    public Set<Long> getNullableResult(ResultSet resultSet, String s) throws SQLException {
        final Object object = resultSet.getObject(s);
        return parseClickHouseArrayToLongSet(object);
    }

    @Override
    public Set<Long> getNullableResult(ResultSet resultSet, int i) throws SQLException {
        final Object object = resultSet.getObject(i);
        return parseClickHouseArrayToLongSet(object);
    }

    @Override
    public Set<Long> getNullableResult(CallableStatement callableStatement, int i) throws SQLException {
        final Object object = callableStatement.getObject(i);
        return parseClickHouseArrayToLongSet(object);
    }


    private Set<Long> parseClickHouseArrayToLongSet(Object object) throws SQLException {
        return ClickArrayHandler.parseClickArray(object, Long.class);
    }
}

Type handler for Map columns:

package cn.heasy.handler.db;

import cn.heasy.util.JsonUtil;
import org.apache.ibatis.type.BaseTypeHandler;
import org.apache.ibatis.type.JdbcType;

import java.sql.CallableStatement;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Map;

/**
 * @author wyh
 * @date 2022/4/28 16:52
 **/
public class ClickMapHandler extends BaseTypeHandler<Map<Object, Object>> {

    @Override
    public void setNonNullParameter(PreparedStatement preparedStatement, int i, Map<Object, Object> objectObjectMap, JdbcType jdbcType) throws SQLException {
        preparedStatement.setObject(i, objectObjectMap);
    }

    @Override
    public Map<Object, Object> getNullableResult(ResultSet resultSet, String s) throws SQLException {
        final Object object = resultSet.getObject(s);
        return JsonUtil.fromJsonQuiet(JsonUtil.toJsonQuiet(object), Map.class);
    }

    @Override
    public Map<Object, Object> getNullableResult(ResultSet resultSet, int i) throws SQLException {
        final Object object = resultSet.getObject(i);
        return JsonUtil.fromJsonQuiet(JsonUtil.toJsonQuiet(object), Map.class);
    }

    @Override
    public Map<Object, Object> getNullableResult(CallableStatement callableStatement, int i) throws SQLException {
        final Object object = callableStatement.getObject(i);
        return JsonUtil.fromJsonQuiet(JsonUtil.toJsonQuiet(object), Map.class);
    }
}
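
ClickMapHandler gets its Map by serializing the driver's value to JSON and parsing it back. If the value returned by getObject already implements java.util.Map (an assumption about the driver, not verified here), the entries could be copied directly instead. A minimal sketch, with the hypothetical class name MapCopySketch:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, shown for illustration only.
class MapCopySketch {

    // Copies a Map-like value into Map<String, String>; anything else yields an empty map.
    static Map<String, String> toStringMap(Object value) {
        Map<String, String> result = new LinkedHashMap<>();
        if (value instanceof Map) {
            for (Map.Entry<?, ?> entry : ((Map<?, ?>) value).entrySet()) {
                // String.valueOf turns a null key or value into the text "null"
                result.put(String.valueOf(entry.getKey()), String.valueOf(entry.getValue()));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<Object, Object> raw = new LinkedHashMap<>();
        raw.put("number", 4134);
        raw.put("name", "example");
        System.out.println(toStringMap(raw));
    }
}
```

Converting every value to String matches the Map(String,String) column type of asn in the table definition.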

Mapper interface:

package cn.heasy.dao.clickhouse;

import cn.heasy.model.po.clickhouse.HitIocChPo;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import org.springframework.stereotype.Repository;

import java.util.List;

/**
 * @author heasy
 * @date 2022/4/28 15:53
 **/
@Repository
public interface HitIocChMapper extends BaseMapper<HitIocChPo> {
    void batchInsert(List<HitIocChPo> list);

}

Service class:

package cn.heasy.service.clickhouse;

import cn.heasy.dao.clickhouse.HitIocChMapper;
import cn.heasy.model.po.clickhouse.HitIocChPo;
import com.baomidou.dynamic.datasource.annotation.DS;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.util.List;

/**
 * @author heasy
 * @date 2022/4/28 15:54
 **/
@Service
// This annotation selects clickhouse as the data source
@DS("clickhouse")
public class HitIocChService extends ServiceImpl<HitIocChMapper, HitIocChPo> {

    @Resource
    HitIocChMapper hitIocChMapper;

    public void batchInsert(List<HitIocChPo> list) {
        hitIocChMapper.batchInsert(list);
    }

}

The mapper.xml file:

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="cn.heasy.dao.clickhouse.HitIocChMapper">
    <resultMap id="BaseResultMap" type="cn.heasy.model.po.clickhouse.HitIocChPo">
        <!--@mbg.generated-->
        <result column="create_time" jdbcType="TIMESTAMP" property="createTime"/>
        <result column="ioc" jdbcType="OTHER" property="ioc"/>
        <result column="ioc_type" jdbcType="OTHER" property="iocType"/>
        <result column="confidence" jdbcType="INTEGER" property="confidence"/>
        <result column="confidence_level" jdbcType="OTHER" property="confidenceLevel"/>
        <result column="is_malicious" jdbcType="INTEGER" property="isMalicious"/>
        <result column="judgments" jdbcType="OTHER" property="judgments"
                typeHandler="cn.heasy.handler.db.ClickArrayToStringSetHandler"/>
        <result column="severity" jdbcType="OTHER" property="severity"/>
        <result column="severity_value" jdbcType="INTEGER" property="severityValue"/>
        <result column="basic_tag" jdbcType="OTHER" property="basicTag"/>
        <result column="main_tag" jdbcType="OTHER" property="mainTag"/>
        <result column="platform" jdbcType="OTHER" property="platform"
                typeHandler="cn.heasy.handler.db.ClickArrayToStringSetHandler"/>
        <result column="tlp" jdbcType="INTEGER" property="tlp"/>
        <result column="uuid" jdbcType="OTHER" property="uuid"/>
        <result column="apikey" jdbcType="OTHER" property="apikey"/>
        <result column="direction" jdbcType="OTHER" property="direction"/>
        <result column="src_ip" jdbcType="OTHER" property="srcIp"/>
        <result column="host" jdbcType="OTHER" property="host"/>
        <result column="url" jdbcType="OTHER" property="url"/>
        <result column="event_name" jdbcType="OTHER" property="eventName"/>
        <result column="source_id" jdbcType="INTEGER" property="sourceId"/>
        <result column="raw" jdbcType="OTHER" property="raw"/>
        <result column="query_type" jdbcType="OTHER" property="queryType"/>
        <result column="hit_scenes" jdbcType="OTHER" property="hitScenes"
                typeHandler="cn.heasy.handler.db.ClickArrayToLongSetHandler"/>
        <result column="business_input_type" jdbcType="INTEGER" property="businessInputType"/>
        <result column="business_input_id" jdbcType="INTEGER" property="businessInputId"/>
        <result column="business_input_name" jdbcType="OTHER" property="businessInputName"/>
        <result column="is_wb" jdbcType="INTEGER" property="isWb"/>
        <result column="asn" jdbcType="OTHER" property="asn"
                typeHandler="cn.heasy.handler.db.ClickMapHandler"/>
    </resultMap>
    <sql id="Base_Column_List">
        <!--@mbg.generated-->
        create_time, ioc, ioc_type, confidence, confidence_level, is_malicious, judgments,
        severity, severity_value, basic_tag, main_tag, platform, tlp, uuid, apikey, direction,
        src_ip, "host", url, event_name, source_id, "raw", query_type, hit_scenes, business_input_type,
        business_input_id, business_input_name, is_wb, asn
    </sql>

    <insert id="batchInsert">
        <!--@mbg.generated-->
        INSERT INTO hit_ioc
        (create_time, ioc, ioc_type, confidence, confidence_level, is_malicious, judgments,
        severity, severity_value, basic_tag, main_tag, platform, tlp, uuid, apikey, direction,
        src_ip, "host", url, event_name, source_id, "raw", query_type, hit_scenes, business_input_type,
        business_input_id, business_input_name, is_wb, asn)
        FORMAT Values
        <foreach collection="list" item="item" separator="," index="index">
            (#{item.createTime,jdbcType=TIMESTAMP}, #{item.ioc}, #{item.iocType},
            #{item.confidence}, #{item.confidenceLevel},
            #{item.isMalicious},
            #{item.judgments,typeHandler=cn.heasy.handler.db.ClickArrayToStringSetHandler},
            #{item.severity}, #{item.severityValue},
            #{item.basicTag}, #{item.mainTag},
            #{item.platform,typeHandler=cn.heasy.handler.db.ClickArrayToStringSetHandler},
            #{item.tlp}, #{item.uuid}, #{item.apikey},
            #{item.direction}, #{item.srcIp}, #{item.host},
            #{item.url}, #{item.eventName}, #{item.sourceId},
            #{item.raw}, #{item.queryType},
            #{item.hitScenes,typeHandler=cn.heasy.handler.db.ClickArrayToLongSetHandler},
            #{item.businessInputType}, #{item.businessInputId},
            #{item.businessInputName}, #{item.isWb},
            #{item.asn,typeHandler=cn.heasy.handler.db.ClickMapHandler})
        </foreach>
        ;
    </insert>

</mapper>
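
To make the shape of the generated statement concrete, the sketch below builds the kind of SQL text that the foreach in batchInsert expands to: one column list, the FORMAT Values keyword, then one parenthesized group of ? placeholders per row. MyBatis does this expansion internally; the class MultiRowInsertSketch and its method are hypothetical and only illustrate the result.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.StringJoiner;

// Hypothetical helper, shown for illustration only.
class MultiRowInsertSketch {

    // Builds a single multi-row INSERT with one "(?, ?, ...)" group per row.
    static String buildInsert(String table, List<String> columns, int rowCount) {
        if (columns.isEmpty() || rowCount < 1) {
            throw new IllegalArgumentException("need at least one column and one row");
        }
        String oneRow = "(" + String.join(", ", Collections.nCopies(columns.size(), "?")) + ")";
        StringJoiner rows = new StringJoiner(", ");
        for (int i = 0; i < rowCount; i++) {
            rows.add(oneRow);
        }
        return "INSERT INTO " + table + " (" + String.join(", ", columns) + ") FORMAT Values " + rows;
    }

    public static void main(String[] args) {
        // prints: INSERT INTO hit_ioc (ioc, uuid) FORMAT Values (?, ?), (?, ?)
        System.out.println(buildInsert("hit_ioc", Arrays.asList("ioc", "uuid"), 2));
    }
}
```

However many rows the list holds, the driver receives one statement, which is what separates this approach from a loop of single-row inserts.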

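Important: at first I wrote the statement the same way as for MySQL, insert into xxx values (v1),(v2).... With a single row it ran fine, but with more than one row it threw a NullPointerException. Reading the source, I found that the exception is raised at: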

Line 618 of ClickHouseConnectionImpl:

int startIndex = parsedStmt.getPositions().get(ClickHouseSqlStatement.KEYWORD_VALUES_START);

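While debugging I saw that the positions map held only three entries and none for the values keyword (KEYWORD_VALUES_START), so get() returned null, and unboxing that null into an int threw the NullPointerException. Yet many write-ups I found do batch inserts exactly this way, so perhaps the behavior changed in a newer driver version.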
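
The failure mode is plain Java auto-unboxing and can be reproduced without the driver. In the sketch below, the map contents and the key names are made up for illustration and are not the driver's real values.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical reproduction of the unboxing failure; not driver code.
class UnboxingNpeSketch {

    // Map.get returns null for a missing key; returning it as int unboxes it and throws.
    static int unsafeLookup(Map<String, Integer> positions, String keyword) {
        return positions.get(keyword);
    }

    // Checks for null before unboxing and reports -1 when the keyword is absent.
    static int safeLookup(Map<String, Integer> positions, String keyword) {
        Integer position = positions.get(keyword);
        return position == null ? -1 : position;
    }

    public static void main(String[] args) {
        Map<String, Integer> positions = new HashMap<>();
        positions.put("INSERT", 0); // illustrative entries only
        positions.put("INTO", 7);
        System.out.println(safeLookup(positions, "VALUES_START"));
        try {
            unsafeLookup(positions, "VALUES_START");
        } catch (NullPointerException e) {
            System.out.println("NullPointerException from unboxing a missing entry");
        }
    }
}
```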

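Judging from the exception, the SQL statement was probably missing some keyword: positions stores the keywords found in the statement together with their offsets in the SQL text.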

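Finally, I found in the official documentation (INSERT INTO Statement | ClickHouse Docs) that its examples use FORMAT Values. After switching to that form the error went away, and in my tests inserts were much faster than looping over single-row inserts.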
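
With very large lists, a single statement carrying every row may become unwieldy, so it can help to split the list and call batchInsert once per chunk. The following is a minimal sketch under that assumption. The class ChunkedInsertSketch is hypothetical, and a suitable chunk size has to be found by measurement.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical helper, shown for illustration only.
class ChunkedInsertSketch {

    // Feeds rows to batchInsert in chunks of at most chunkSize; returns the number of calls made.
    static <T> int insertInChunks(List<T> rows, int chunkSize, Consumer<List<T>> batchInsert) {
        if (chunkSize < 1) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        int calls = 0;
        for (int from = 0; from < rows.size(); from += chunkSize) {
            int to = Math.min(from + chunkSize, rows.size());
            // Copy the view so the callee never holds a sublist of the caller's list
            batchInsert.accept(new ArrayList<>(rows.subList(from, to)));
            calls++;
        }
        return calls;
    }

    public static void main(String[] args) {
        List<Integer> rows = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            rows.add(i);
        }
        int calls = insertInChunks(rows, 4, chunk -> System.out.println("insert " + chunk.size() + " rows"));
        System.out.println(calls + " statements");
    }
}
```

In the service from this article the last argument would be hitIocChMapper::batchInsert.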
