ClickHouse 如何查询指定时间段内导入的数据

2021-08-24 17:44:19 浏览数 (2)

一 用途

  1. 数据查询
  2. 数据迁移和导入

二 为什么讲ClickHouse 数据迁移

  1. Clickhouse copier 没有增量导入
  2. Clickhouse remote 较慢,且为ClickHouse内部表
  3. 数据过滤维度小

三 ClickHouse MergeTreeData

代码语言:txt复制
QueryPlanPtr MergeTreeDataSelectExecutor::readFromParts(
    MergeTreeData::DataPartsVector parts,
    const Names & column_names_to_return,
    const StorageMetadataPtr & metadata_snapshot,
    const SelectQueryInfo & query_info,
    const Context & context,
    const UInt64 max_block_size,
    const unsigned num_streams,
    const PartitionIdToMaxBlock * max_block_numbers_to_read) const
{
    for (const String & name : column_names_to_return)
    {
        if (name == "_part")
        {
            part_column_queried = true;
            virt_column_names.push_back(name);
        }
        else if (name == "_part_index")
        {
            virt_column_names.push_back(name);
        }
        else if (name == "_partition_id")
        {
            virt_column_names.push_back(name);
        }
        else if (name == "_part_uuid")
        {
            part_uuid_column_queried = true;
            virt_column_names.push_back(name);
        }
        else if (name == "_sample_factor")
        {
            sample_factor_column_queried = true;
            virt_column_names.push_back(name);
        }
        else
        {
            real_column_names.push_back(name);
        }
    }

3.1 如何利用

  • ClickHouse MergeTree 数据拥有以上虚拟字段
  • 这么看来我们可以简单直接不通过修改代码的将数据维度限制的part 的粒度

四操作

4.1 建表和导入

代码语言:txt复制
## 1 查看表字段
DESCRIBE TABLE db_1.test_26

Query id: 856af95b-cb07-43d9-a776-5e6fd3d3c456

┌─name──┬─type───┬─default_type─┬─default_expression─┬─comment─┬─codec_expression─┬─ttl_expression─┐
│ id    │ UInt16 │              │                    │         │                  │                │
│ value │ UInt32 │              │                    │         │                  │                │
│ dt    │ Date   │              │                    │         │                  │                │
└───────┴────────┴──────────────┴────────────────────┴─────────┴──────────────────┴────────────────┘

3 rows in set. Elapsed: 0.004 sec.

## 写入忽略

4.2 查询

代码语言:txt复制
## 2 查看所有数据
SELECT *
FROM db_1.test_26

Query id: 6211055b-02af-482e-bc55-ccd765b0b929

┌─id─┬─value─┬─────────dt─┐
│ 11 │  2013 │ 1975-06-12 │
└────┴───────┴────────────┘
┌─id─┬─value─┬─────────dt─┐
│ 11 │  2013 │ 1975-06-12 │
│ 11 │  2013 │ 1975-06-12 │
│ 11 │  2013 │ 1975-06-12 │
│ 11 │  2013 │ 1975-06-12 │
└────┴───────┴────────────┘
┌─id─┬─value─┬─────────dt─┐
│ 11 │  2013 │ 1975-06-12 │
└────┴───────┴────────────┘

6 rows in set. Elapsed: 0.148 sec. 

4.3 _part 虚拟隐藏字段

代码语言:txt复制
## 3 查看数据对应的part

SELECT
    id,
    value,
    dt,
    _part
FROM db_1.test_26

Query id: b7d81a80-089a-4434-b82e-a0e27c60c8ac

┌─id─┬─value─┬─────────dt─┬─_part────────┐
│ 11 │  2013 │ 1975-06-12 │ 197506_5_5_0 │
└────┴───────┴────────────┴──────────────┘
┌─id─┬─value─┬─────────dt─┬─_part────────┐
│ 11 │  2013 │ 1975-06-12 │ 197506_1_4_1 │
│ 11 │  2013 │ 1975-06-12 │ 197506_1_4_1 │
│ 11 │  2013 │ 1975-06-12 │ 197506_1_4_1 │
│ 11 │  2013 │ 1975-06-12 │ 197506_1_4_1 │
└────┴───────┴────────────┴──────────────┘
┌─id─┬─value─┬─────────dt─┬─_part────────┐
│ 11 │  2013 │ 1975-06-12 │ 197506_6_6_0 │
└────┴───────┴────────────┴──────────────┘

6 rows in set. Elapsed: 0.111 sec. 

4.4 system.parts 利用

代码语言:txt复制
DESCRIBE TABLE system.parts

Query id: 2dea5ab6-6857-4708-8919-a09f2382f059

┌─name──────────────────────────────────┬─type────────────┬─default_type─┬─default_expression─┬─comment─┬─codec_expression─┬─ttl_expression─┐
│ partition                             │ String          │              │                    │         │                  │                │
│ name                                  │ String          │              │                    │         │                  │                │
│ uuid                                  │ UUID            │              │                    │         │                  │                │
│ part_type                             │ String          │              │                    │         │                  │                │
│ active                                │ UInt8           │              │                    │         │                  │                │
│ marks                                 │ UInt64          │              │                    │         │                  │                │
│ rows                                  │ UInt64          │              │                    │         │                  │                │
│ bytes_on_disk                         │ UInt64          │              │                    │         │                  │                │
│ data_compressed_bytes                 │ UInt64          │              │                    │         │                  │                │
│ data_uncompressed_bytes               │ UInt64          │              │                    │         │                  │                │
│ marks_bytes                           │ UInt64          │              │                    │         │                  │                │
│ modification_time                     │ DateTime        │              │                    │         │                  │                │
│ remove_time                           │ DateTime        │              │                    │         │                  │                │
│ refcount                              │ UInt32          │              │                    │         │                  │                │
│ min_date                              │ Date            │              │                    │         │                  │                │
│ max_date                              │ Date            │              │                    │         │                  │                │
│ min_time                              │ DateTime        │              │                    │         │                  │                │
│ max_time                              │ DateTime        │              │                    │         │                  │                │
│ partition_id                          │ String          │              │                    │         │                  │                │
│ min_block_number                      │ Int64           │              │                    │         │                  │                │
│ max_block_number                      │ Int64           │              │                    │         │                  │                │
│ level                                 │ UInt32          │              │                    │         │                  │                │
│ data_version                          │ UInt64          │              │                    │         │                  │                │
│ primary_key_bytes_in_memory           │ UInt64          │              │                    │         │                  │                │
│ primary_key_bytes_in_memory_allocated │ UInt64          │              │                    │         │                  │                │
│ is_frozen                             │ UInt8           │              │                    │         │                  │                │
│ database                              │ String          │              │                    │         │                  │                │
│ table                                 │ String          │              │                    │         │                  │                │
│ engine                                │ String          │              │                    │         │                  │                │
│ disk_name                             │ String          │              │                    │         │                  │                │
│ path                                  │ String          │              │                    │         │                  │                │
│ hash_of_all_files                     │ String          │              │                    │         │                  │                │
│ hash_of_uncompressed_files            │ String          │              │                    │         │                  │                │
│ uncompressed_hash_of_compressed_files │ String          │              │                    │         │                  │                │
│ delete_ttl_info_min                   │ DateTime        │              │                    │         │                  │                │
│ delete_ttl_info_max                   │ DateTime        │              │                    │         │                  │                │
│ move_ttl_info.expression              │ Array(String)   │              │                    │         │                  │                │
│ move_ttl_info.min                     │ Array(DateTime) │              │                    │         │                  │                │
│ move_ttl_info.max                     │ Array(DateTime) │              │                    │         │                  │                │
│ default_compression_codec             │ String          │              │                    │         │                  │                │
│ recompression_ttl_info.expression     │ Array(String)   │              │                    │         │                  │                │
│ recompression_ttl_info.min            │ Array(DateTime) │              │                    │         │                  │                │
│ recompression_ttl_info.max            │ Array(DateTime) │              │                    │         │                  │                │
│ group_by_ttl_info.expression          │ Array(String)   │              │                    │         │                  │                │
│ group_by_ttl_info.min                 │ Array(DateTime) │              │                    │         │                  │                │
│ group_by_ttl_info.max                 │ Array(DateTime) │              │                    │         │                  │                │
│ rows_where_ttl_info.expression        │ Array(String)   │              │                    │         │                  │                │
│ rows_where_ttl_info.min               │ Array(DateTime) │              │                    │         │                  │                │
│ rows_where_ttl_info.max               │ Array(DateTime) │              │                    │         │                  │                │
│ bytes                                 │ UInt64          │ ALIAS        │ bytes_on_disk      │         │                  │                │
│ marks_size                            │ UInt64          │ ALIAS        │ marks_bytes        │         │                  │                │
└───────────────────────────────────────┴─────────────────┴──────────────┴────────────────────┴─────────┴──────────────────┴────────────────┘

51 rows in set. Elapsed: 0.006 sec. 

## 4 查看part 修改日志
SELECT
    name,
    modification_time
FROM system.parts
WHERE (database = 'db_1') AND (table = 'test_26')

Query id: 3e8b8a92-cfbe-4a87-bdc3-8a3b420a29a4

┌─name─────────┬───modification_time─┐
│ 197506_1_4_1 │ 2021-08-14 23:39:19 │
│ 197506_5_5_0 │ 2021-08-17 09:55:16 │
│ 197506_6_6_0 │ 2021-08-24 16:54:11 │### 当前part 数据是我们后面会过滤掉的
└──────────────┴─────────────────────┘

3 rows in set. Elapsed: 0.020 sec.

4.5 过滤

代码语言:txt复制
### 5 过滤我们想要的数据
### eg : part 日期在 2021-08-24 16:00:00 之前的数据
### 通过原表和系统表system.parts 进行迁移
### 197506_6_6_0 该part 数据被过滤掉了
SELECT
    id,
    value,
    dt,
    _part
FROM db_1.test_26 AS A
INNER JOIN system.parts AS B ON A._part = B.name
WHERE B.modification_time < '2021-08-24 16:00:00'

Query id: 8f9345dd-3529-4d80-beaf-bc0457d64dc9

┌─id─┬─value─┬─────────dt─┬─_part────────┐
│ 11 │  2013 │ 1975-06-12 │ 197506_5_5_0 │
└────┴───────┴────────────┴──────────────┘
┌─id─┬─value─┬─────────dt─┬─_part────────┐
│ 11 │  2013 │ 1975-06-12 │ 197506_1_4_1 │
│ 11 │  2013 │ 1975-06-12 │ 197506_1_4_1 │
│ 11 │  2013 │ 1975-06-12 │ 197506_1_4_1 │
│ 11 │  2013 │ 1975-06-12 │ 197506_1_4_1 │
└────┴───────┴────────────┴──────────────┘

4.6 拿到我们数据

代码语言:txt复制
### 6 最终需要执行的SQL 
SELECT
    id,
    value,
    dt
FROM db_1.test_26 AS A
INNER JOIN system.parts AS B ON A._part = B.name
WHERE B.modification_time < '2021-08-24 16:00:00'

Query id: 29794880-0ccb-43c9-8618-65b8c438086a

┌─id─┬─value─┬─────────dt─┐
│ 11 │  2013 │ 1975-06-12 │
│ 11 │  2013 │ 1975-06-12 │
│ 11 │  2013 │ 1975-06-12 │
│ 11 │  2013 │ 1975-06-12 │
└────┴───────┴────────────┘
┌─id─┬─value─┬─────────dt─┐
│ 11 │  2013 │ 1975-06-12 │
└────┴───────┴────────────┘

5 rows in set. Elapsed: 0.138 sec. 

五 CDW-ClickHouse

腾讯云CDW-ClickHouse 数据ETL交给了 Oceanus

Oceanus 使用ClickHouse-JDBC 操作链接ClickHouse

进而我们可以通过Oceanus 控制时间范围

实现ClickHouse 全量和增量的导入和ClickHouse 和迁移ClickHouse

Oceanus ClickHouse数据仓库

Oceanus ClickHouse 导入文档

clickhouse format

0 人点赞