客快物流大数据项目(一百):ClickHouse的使用

2022-12-29 15:12:42 浏览数 (3)

​ClickHouse的使用

一、使用Java操作ClickHouse

1、构建maven工程

2、​​​​​​​导入依赖

代码语言:javascript复制
<!-- Clickhouse -->
<dependency>
    <groupId>ru.yandex.clickhouse</groupId>
    <artifactId>clickhouse-jdbc</artifactId>
    <version>0.2.2</version>
</dependency>

3、​​​​​​​​​​​​​​创建包结构

java程序包目录创建

包名

说明

cn.it.clickhouse

代码所在的包目录

4、代码案例

代码语言:javascript复制
package cn.it.demo;

import java.sql.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * 使用JDBC操作ClickHouse
 */
public class ClickHouseJDBC {
    public static void main(String[] args) {
        String sqlDB = "show databases";//查询数据库
        String sqlTab = "show tables";//查看表
        String sqlCount = "select count(*) count from ontime";//查询ontime数据量
        exeSql(sqlDB);
        exeSql(sqlTab);
        exeSql(sqlCount);
    }

    public static void exeSql(String sql){
        String address = "jdbc:clickhouse://node2:8123/default";
        Connection connection = null;
        Statement statement = null;
        ResultSet results = null;
        try {
            Class.forName("ru.yandex.clickhouse.ClickHouseDriver");
            connection = DriverManager.getConnection(address);
            statement = connection.createStatement();
            long begin = System.currentTimeMillis();
            results = statement.executeQuery(sql);
            long end = System.currentTimeMillis();
            System.out.println("执行(" sql ")耗时:" (end-begin) "ms");
            ResultSetMetaData rsmd = results.getMetaData();
            List<Map> list = new ArrayList();
            while(results.next()){
                Map map = new HashMap();
                for(int i = 1;i<=rsmd.getColumnCount();i  ){
                    map.put(rsmd.getColumnName(i),results.getString(rsmd.getColumnName(i)));
                }
                list.add(map);
            }
            for(Map map : list){
                System.err.println(map);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }finally {//关闭连接
            try {
                if(results!=null){
                    results.close();
                }
                if(statement!=null){
                    statement.close();
                }
                if(connection!=null){
                    connection.close();
                }
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }
}

二、使用Spark操作ClickHouse

1、导入依赖

代码语言:javascript复制
<repositories>
    <repository>
        <id>mvnrepository</id>
        <url>https://mvnrepository.com/</url>
        <layout>default</layout>
    </repository>
    <repository>
        <id>cloudera</id>
        <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
    </repository>
    <repository>
        <id>elastic.co</id>
        <url>https://artifacts.elastic.co/maven</url>
    </repository>
</repositories>

<properties>
    <scala.version>2.11</scala.version>
    <!-- Spark -->
    <spark.version>2.4.0-cdh6.2.1</spark.version>
    <!-- Parquet -->
    <parquet.version>1.9.0-cdh6.2.1</parquet.version>
    <!-- ClickHouse -->
    <clickhouse.version>0.2.2</clickhouse.version>
    <jtuple.version>1.2</jtuple.version>
</properties>

<dependencies>
    <!-- Spark -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_${scala.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.parquet</groupId>
        <artifactId>parquet-common</artifactId>
        <version>${parquet.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-graphx_${scala.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>net.jpountz.lz4</groupId>
        <artifactId>lz4</artifactId>
        <version>1.3.0</version>
    </dependency>
    <dependency>
        <groupId>org.javatuples</groupId>
        <artifactId>javatuples</artifactId>
        <version>${jtuple.version}</version>
    </dependency>
    <!-- Clickhouse -->
    <dependency>
        <groupId>ru.yandex.clickhouse</groupId>
        <artifactId>clickhouse-jdbc</artifactId>
        <version>${clickhouse.version}</version>
        <exclusions>
            <exclusion>
                <groupId>com.fasterxml.jackson.core</groupId>
                <artifactId>jackson-databind</artifactId>
            </exclusion>
            <exclusion>
                <groupId>com.fasterxml.jackson.core</groupId>
                <artifactId>jackson-core</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
</dependencies>

2、​​​​​​​​​​​​​​创建包结构

scala程序包目录创建

包名

说明

cn.it.clickhouse

代码所在的包目录

3、案例开发

实现步骤:

  • 创建ClickHouseJDBCDemo单例对象
  • 初始化spark运行环境
  • 加载外部数据源(资料order.json)生成DataFrame对象

代码实现

代码语言:javascript复制
package cn.it.demo

import cn.it.demo.utils.ClickHouseUtils
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
 * 使用jdbc方式操作clickhouse表
 */
object ClickHouseJDBCDemo {
  def main(args: Array[String]): Unit = {
    //创建上下文环境配置对象
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQL01_Demo")

    //创建SparkSession对象
    val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()

    //读取json文件 创建DataFrame
    val df: DataFrame = spark.read.json("E:\input\order.json")
    df.show()

    spark.stop()
  }
}

3.1、创建表

实现步骤:

  • 创建ClickHouseUtils工具类
  • 创建方法:clickhouse的连接实例,返回连接对象
  • 创建方法:生成表的sql字符串
  • 创建方法:执行更新操作
  • ClickHouseJDBCDemo单例对象中调用创建表

实现方法:

  • 创建ClickHouseUtils工具类
代码语言:javascript复制
package cn.it.demo.utils

/**
 * ClickHouse的工具类
 */
class ClickHouseUtils extends Serializable {
}

  • 创建方法:clickhouse的连接实例,返回连接对象
代码语言:javascript复制
/**
* 创建clickhouse的连接实例,返回连接对象
* @return
*/
def createConnection = {
  //定义ClickHouse的服务器地址
  val host = "node2"
  //定义ClickHouse的连接端口号
  val port = "8123"
  val properties = new ClickHouseProperties()
  val dataSource = new ClickHouseDataSource(s"jdbc:clickhouse://${host}:${port}", properties)
  dataSource.getConnection()
}

  • 创建方法:生成表的sql字符串
代码语言:javascript复制
/**
 * 创建clickhouse的表,返回创建表的sql字符串
 * @param table
 * @param schema
 * @return
 */
def createTable(table: String, schema: StructType, primaryKeyField:String = "id"): String = {
  //生成表的列集合字符串
  val tableFieldsStr: String = schema.map { f =>
    val fName = f.name
    val fType = f.dataType match {
      case org.apache.spark.sql.types.DataTypes.StringType => "String"
      case org.apache.spark.sql.types.DataTypes.IntegerType => "Int32"
      case org.apache.spark.sql.types.DataTypes.FloatType => "Float32"
      case org.apache.spark.sql.types.DataTypes.LongType => "Int64"
      case org.apache.spark.sql.types.DataTypes.BooleanType => "UInt8"
      case org.apache.spark.sql.types.DataTypes.DoubleType => "Float64"
      case org.apache.spark.sql.types.DataTypes.DateType => "DateTime"
      case org.apache.spark.sql.types.DataTypes.TimestampType => "DateTime"
      case x => throw new Exception(s"Unsupported type: ${x.toString}")
    }
    s"$fName $fType"
  }.mkString(", ")

  //返回创建表的sql字符串
  s"CREATE TABLE IF NOT EXISTS $table(${tableFieldsStr},sign Int8,version UInt64) ENGINE=VersionedCollapsingMergeTree(sign, version) ORDER BY ${primaryKeyField}"
}

  • 创建方法:执行更新操作
代码语言:javascript复制
/**
 * 执行更新操作
 * @param sql
 * @return
 */
def executeUpdate(sql: String) = {
  //获取clickhouse的连接字符串
  val connection: ClickHouseConnection = createConnection
  println(sql)
  val statement: ClickHouseStatement = connection.createStatement()
  statement.executeUpdate(sql)
}

  • ClickHouseJDBCDemo单例对象中调用创建表
代码语言:javascript复制
//创建clickhouse工具实例对象
val clickHouseUtils: ClickHouseUtils = new ClickHouseUtils

//创建表
val strCreateTable: String = clickHouseUtils.createTable("order", df.schema)
clickHouseUtils.executeUpdate(strCreateTable);

3.2、​​​​​​​​​​​​​​插入数据

实现步骤:

  • 打开ClickHouseUtils工具类
  • 创建方法:生成插入表数据的sql字符串
  • 创建方法:根据字段类型为字段赋值默认值
  • 创建方法:将数据插入到clickhouse中
  • ClickHouseJDBCDemo单例对象中调用插入数据

实现方法:

  • 创建方法:生成插入表数据的sql字符串
代码语言:javascript复制
/**
 * 生成插入表数据的sql字符串
 * @param tableName
 * @param schema
 * @return
 */
private def createInsertStatmentSql(tableName: String)(schema: org.apache.spark.sql.types.StructType) = {
  val columns = schema.map(f => f.name).toList
  val vals = 1 to (columns.length) map (i => "?")
  s"INSERT INTO $tableName (${columns.mkString(",")}) VALUES (${vals.mkString(",")})"
}

  • 创建方法:根据字段类型为字段赋值默认值
代码语言:javascript复制
/**
 * 为sql赋值默认值
 * @param sparkType
 * @param v
 * @return
 */
private def defaultNullValue(sparkType: org.apache.spark.sql.types.DataType, v: Any) = sparkType match {
  case DoubleType => 0
  case LongType => 0
  case FloatType => 0
  case IntegerType => 0
  case StringType => null
  case BooleanType => false
  case _ => null
}

  • 创建方法:将数据插入到clickhouse中
代码语言:javascript复制
/**
 * 将数据插入到clickhouse中
 * @param tableName
 * @param df
 */
def insertToCkWithStatement(tableName :String, df:DataFrame): Unit = {
  //生成插入sql字符串
  val insertSql: String = createInsertStatmentSql(tableName)(df.schema)
  df.foreachPartition(rows => {
    var connection: ClickHouseConnection = null
    var pstmt: PreparedStatement = null
    var count = 0;
    try {
      connection = createConnection
      pstmt = connection.prepareStatement(insertSql)
      rows.foreach(line => {
        count  = 1
        line.schema.fields.foreach{ f =>
          val fieldName = f.name
          val fieldIdx = line.schema.fieldIndex(fieldName)
          val fieldVal = line.get(fieldIdx)
          if(fieldVal != null)
            pstmt.setObject(fieldIdx 1, fieldVal)
          else{
            val defVal = defaultNullValue(f.dataType, fieldVal)
            pstmt.setObject(fieldIdx 1, defVal)
          }
        }
        // 批量写入
        pstmt.addBatch()
        if (count >= 100000) {
          pstmt.executeBatch()
          count = 0
        }
      })
      pstmt.executeBatch()
    } catch {
      case ex: Exception =>
        println(ex)
    } finally {
      if (connection != null)
        connection.close()
    }
  })
}

  • ClickHouseJDBCDemo单例对象中调用插入数据
代码语言:javascript复制
//插入数据
clickHouseUtils.insertToCkWithStatement("order", df)

3.3、​​​​​​​​​​​​​​修改数据

实现步骤:

  • 打开ClickHouseUtils工具类
  • 创建方法:根据指定的字段名称获取字段对应的值
  • 创建方法:生成修改表数据的sql字符串
  • 创建方法:将数据更新到clickhouse中
  • ClickHouseJDBCDemo单例对象中调用更新数据

实现方法:

  • 创建方法:根据指定的字段名称获取字段对应的值
代码语言:javascript复制
/**
 * 根据指定字段获取该字段的值
 * @param fieldName
 * @param schema
 * @param data
 * @return
 */
private def getFieldValue(fieldName: String, schema: StructType, data: Row): Any = {
  var flag = true
  var fieldValue: String = null
  val fields = schema.fields
  for (i <- 0 until fields.length if flag) {
    val field = fields(i)
    if (fieldName == field.name) {
      fieldValue = field.dataType match {
        case DataTypes.DoubleType => if (data.isNullAt(i)) "NULL" else s"${data.getDouble(i)}"
        case DataTypes.FloatType => if (data.isNullAt(i)) "NULL" else s"${data.getFloat(i)}"
        case DataTypes.IntegerType => if (data.isNullAt(i)) "NULL" else s"${data.getInt(i)}"
        case DataTypes.LongType => if (data.isNullAt(i)) "NULL" else s"${data.getLong(i)}"
        case DataTypes.StringType => if (data.isNullAt(i)) "NULL" else s"${data.getString(i).toString.trim}"
        case DataTypes.DateType => if (data.isNullAt(i)) "NULL" else s"'${new SimpleDateFormat("yyyy-MM-dd").format(new Date(data.get(i).asInstanceOf[Date].getTime / 1000))}'"
        case DataTypes.TimestampType => if (data.isNullAt(i)) "NULL" else s"${new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date(data.getLong(i) / 1000))}"
        case DataTypes.NullType => "NULL"
      }
      flag = false
    }
  }
  fieldValue
}

  • 创建方法:生成修改表数据的sql字符串
代码语言:javascript复制
/**
 * 生成修改表数据的sql字符串
 * @param tableName
 * @param schema
 * @return
 */
private def createUpdateStatmentSql(tableName: String, row:Row,  primaryKeyField:String = "id")(schema: org.apache.spark.sql.types.StructType) = {
  val primaryKeyValue = getFieldValue(primaryKeyField, schema, row)
  val noPrimaryKeyFields = schema.fields.filter(field => field.name != primaryKeyField)
  var sets = ArrayBuffer[String]()
  for (i <- 0 until noPrimaryKeyFields.length) {
    val noPrimaryKeyField = noPrimaryKeyFields(i)
    val set = noPrimaryKeyField.name   s"='${getFieldValue(noPrimaryKeyField.name, schema, row).toString}'"
    sets  = set
  }
  s"ALTER TABLE $tableName UPDATE ${sets.mkString(", ")} WHERE ${primaryKeyField}=$primaryKeyValue"
}

  • 创建方法:将数据更新到clickhouse中
代码语言:javascript复制
def updateToCkWithStatement(tableName :String, df:DataFrame, primaryKeyField:String = "id")= {
  df.foreachPartition(rows => {
    var connection: ClickHouseConnection = null
    var pstmt: ClickHouseStatement = null
    try {
      connection = createConnection
      pstmt = connection.createStatement()
      rows.foreach(line => {
        //生成修改sql字符串
        val updateSql: String = createUpdateStatmentSql(tableName, line)(line.schema)
        pstmt.executeUpdate(updateSql)
      })
    } catch {
      case ex: Exception =>
        println(ex)
    }
  })
}

  • ClickHouseJDBCDemo单例对象中调用更新数据
代码语言:javascript复制
//更新数据
clickHouseUtils.updateToCkWithStatement("order", df)

3.4、​​​​​​​​​​​​​​删除数据

实现步骤:

  • 打开ClickHouseUtils工具类
  • 创建方法:生成删除表数据的sql字符串
  • 创建方法:将数据从clickhouse中删除
  • ClickHouseJDBCDemo单例对象中调用删除数据

实现方法:

  • 创建方法:生成删除表数据的sql字符串
代码语言:javascript复制
/**
 * 生成删除表数据的sql字符串
 * @param tableName
 * @param schema
 * @return
 */
private def createDeleteStatmentSql(tableName: String, row:Row,  primaryKeyField:String = "id")(schema: org.apache.spark.sql.types.StructType) = {
  val primaryKeyValue = getFieldValue(primaryKeyField, schema, row)
  s"ALTER TABLE $tableName DELETE WHERE ${primaryKeyField} = $primaryKeyValue"
}

  • 创建方法:将数据从clickhouse中删除
代码语言:javascript复制
/**
 * 将数据从clickhouse中删除
 * @param tableName
 * @param df
 */
def deleteToCkWithStatement(tableName :String, df:DataFrame, primaryKeyField:String = "id")= {
  df.foreachPartition(rows => {
    var connection: ClickHouseConnection = null
    var pstmt: ClickHouseStatement = null
    try {
      connection = createConnection
      pstmt = connection.createStatement()
      rows.foreach(line => {
        //生成删除sql字符串
        val deleteSql: String = createDeleteStatmentSql(tableName, line)(line.schema)
        println(deleteSql)
        pstmt.executeUpdate(deleteSql)
      })
    } catch {
      case ex: Exception =>
        println(ex)
    }
  })
}

  • ClickHouseJDBCDemo单例对象中调用删除数据
代码语言:javascript复制
//删除数据
clickHouseUtils.deleteToCkWithStatement("order", df)

0 人点赞