ClickHouse案例:查询结果不一致

2020-11-16 11:11:52 浏览数 (1)

问题背景

某用户反馈其使用的ClickHouse集群同样的查询返回了不同的结果,是否是ClickHouse数据不能够保证一致性,还是集群有问题。

对于数据库来说,查询数据的准确性至关重要,我查询确定的数据你给我返回不一致的结果,那这结果还有何可用性而言,因此这个问题对用户的重要性不言而喻。

在收到用户反馈的这个问题后,第一时间和用户确认了用户具体的使用情况。

  1. 在集群中的各个节点创建本地表,表引擎为Kafka同时创建了对应的视图(消费Kafka里的数据);
  2. 创建分布式表,表引擎Distributed,汇总视图;
  3. 多次执行同一条查询返回了不一致的结果。

查询数据是通过分布式表来进行的,要想弄清楚为何每次查询返回的数据不一致,首先就需要弄清楚分布式表的原理。

分布式表

具有分布式引擎的表本身不存储任何数据,但可以在多个节点上进行分布式查询。读取会自动并行化进行,无需参数配置或手动干预。 查询时随机选择某个shard的replica进行读取。如果表有索引优先使用索引。

分布式引擎参数:服务器配置文件中的集群名远程数据库名远程表名数据分片键(可选)。

代码语言:txt复制
Distributed(logs, default, hits[, sharding_key])

查询时将从集群中每个服务器上的default.hits表中读取数据。

本文示例的集群配置如下:

代码语言:txt复制
<?xml version="1.0" encoding="UTF-8"?>
<yandex>
    <clickhouse_remote_servers>
        <default_cluster>
            <shard>
                <internal_replication>false</internal_replication>
                <replica>
                    <host>10.0.3.27</host>
                    <port>9000</port>
                </replica>
                <replica>
                    <host>10.0.3.41</host>
                    <port>9000</port>
                </replica>
            </shard>
            <shard>
                <internal_replication>true</internal_replication>
                <replica>
                    <host>10.0.3.46</host>
                    <port>9000</port>
                </replica>
                <replica>
                    <host>10.0.3.26</host>
                    <port>9000</port>
                </replica>
            </shard>
        </default_cluster>
    </clickhouse_remote_servers>
    <zookeeper-servers>
        <node>
            <host>10.0.3.12</host>
            <port>2181</port>
        </node>
        <node>
            <host>10.0.3.3</host>
            <port>2181</port>
        </node>
        <node>
            <host>10.0.3.23</host>
            <port>2181</port>
        </node>
    </zookeeper-servers>
</yandex>

集群名:default_cluster,包括两个分片,每个分片两个副本。

分片:各个分片(服务器)包含不同的数据(为了读取所有数据,必须访问所有分片)。

副本:多个相同冗余的服务器(读取数据时可以访问任何一个副本上的数据)。

当指定了副本时,读取的操作将为每个分片选择一个可用副本。也可以配置用于负载均衡的算法(访问副本的首选项(load_balancing = random/nearest_hostname/first_or_random/round_robin)–具体参阅官方文档load_balancing设定。

问题复现

集群:

代码语言:txt复制
┌─cluster─────────┬─shard_num─┬─shard_weight─┬─replica_num─┬─host_name─┬─host_address─┬─port─┬─is_local─┬─user────┬─default_database─┬─errors_count─┬─estimated_recovery_time─┐
│ default_cluster │         1 │            1 │           1 │ 10.0.3.27 │ 10.0.3.27    │ 9000 │        1 │ default │                  │            0 │                       0 │
│ default_cluster │         1 │            1 │           2 │ 10.0.3.41 │ 10.0.3.41    │ 9000 │        0 │ default │                  │            0 │                       0 │
│ default_cluster │         2 │            1 │           1 │ 10.0.3.46 │ 10.0.3.46    │ 9000 │        0 │ default │                  │            0 │                       0 │
│ default_cluster │         2 │            1 │           2 │ 10.0.3.26 │ 10.0.3.26    │ 9000 │        0 │ default │                  │            0 │                       0 │
└─────────────────┴───────────┴──────────────┴─────────────┴───────────┴──────────────┴──────┴──────────┴─────────┴──────────────────┴──────────────┴─────────────────────────┘

如上所示:集群中有两个分片,每个分片两个2副本

shard_num 1: 10.0.3.27 10.0.3.41

shard_num 1: 10.0.3.46 10.0.3.26

创建本地非复制表、创建分布式表、向分布式表中插入数据、查询分布式表

代码语言:txt复制
CREATE TABLE test.ddl_test ON cluster default_cluster(
  `Year` UInt16,
  `Quarter` UInt8,
  `Month` UInt8,
  `DayofMonth` UInt8,
  `DayOfWeek` UInt8,
  ...
) ENGINE = MergeTree
PARTITION BY Year
ORDER BY (Carrier, FlightDate)
SETTINGS index_granularity = 8192;
代码语言:txt复制
CREATE TABLE test.ddl_all ON cluster default_cluster AS test.ddl_test
ENGINE = Distributed(default_cluster, test, ddl_test, rand())
代码语言:txt复制
10.0.3.27 :) INSERT INTO ddl_all SELECT * FROM ontime;

INSERT INTO ddl_all SELECT *
FROM ontime

↘ Progress: 185.13 million rows, 134.51 GB (413.80 thousand rows/s., 300.65 MB/s.) ██████████ 99%Ok.

0 rows in set. Elapsed: 447.398 sec. Processed 185.13 million rows, 134.51 GB (413.80 thousand rows/s., 300.65 MB/s.)
代码语言:txt复制
# clickhouse-client -h 10.0.3.27 <<< "select count(1) from test.ddl_all"
142975770
# clickhouse-client -h 10.0.3.27 <<< "select count(1) from test.ddl_all"
134720581

可以看到相同的查询语句返回了不同的结果。

代码语言:txt复制
## 41与27同一分片并将internal_replication设定为false
# clickhouse-client -h 10.0.3.41  <<< "select count(1) from test.ddl_test" 
92562599
# clickhouse-client -h 10.0.3.27  <<< "select count(1) from test.ddl_test" 
92562599
## 46与26同一分片并将internal_replication设定为true
# clickhouse-client -h 10.0.3.46  <<< "select count(1) from test.ddl_test" 
50413171
# clickhouse-client -h 10.0.3.26  <<< "select count(1) from test.ddl_test" 
42157982

# clickhouse-client -h 10.0.3.41 <<< "select count(1) from test.ddl_all"
142975770
# clickhouse-client -h 10.0.3.27 <<< "select count(1) from test.ddl_all"
142975770
# clickhouse-client -h 10.0.3.46 <<< "select count(1) from test.ddl_all"
142975770
# clickhouse-client -h 10.0.3.26 <<< "select count(1) from test.ddl_all"
134720581

# clickhouse-client -h 10.0.3.27 --max_parallel_replicas=2 <<< "select count(1) from test.ddl_all"
185133752
# clickhouse-client -h 10.0.3.26 --max_parallel_replicas=2 <<< "select count(1) from test.ddl_all"
227283180
# clickhouse-client -h 10.0.3.41 --max_parallel_replicas=2 <<< "select count(1) from test.ddl_all"
185133752
# clickhouse-client -h 10.0.3.46 --max_parallel_replicas=2 <<< "select count(1) from test.ddl_all"
235538369

原理分析

  1. 首先查询Distributed表引擎的过程是先通过本地的表查询,和当前节点处于同一个分片下的Replication副本节点不会接收到查询的请求,和当前节点不同的分片会随机选择这个分片中的一个副本发送请求,然后再聚合各个分片返回的数据最后返回最终结果。
  2. 查询的过程中如果指定--max_parallel_replicas大于1个,会同时并行向多个(前项指定)不和当前节点处于同一个分片下的副本发送查询请求,之后会聚合这些接收请求的副本返回的结果(不会去除重复),因此可以看到上一章节中我们通过指定--max_parallel_replicas=2在节点10.0.3.46和10.0.3.26 上都查询出了多于正确结果185133752的数量。
  3. 分布式表和本地表关联紧密(类似视图概念),如果同一分片中设定了<internal_replication>false</internal_replication>通过分布式表插入数据会同时向多个副本写入,这样每个副本都有完整的数据,此时通过Distributed表引擎查询分布式表则可以返回正确的结果。但这种情况可能会导致最终的各个副本状态不一致(如果不使用Zookeeper来进行协调,任何单一节点的中断都会导致最终数据的不一致)。

最后我们通过将集群中的所有分片都设定:<internal_replication>false</internal_replication>

后再执行以上试验,得到了如下的结果:

代码语言:txt复制
# clickhouse-client -h 10.0.3.41  <<< "select count(1) from test.ddl_local"
92567953
# clickhouse-client -h 10.0.3.27  <<< "select count(1) from test.ddl_local"
92567953
# clickhouse-client -h 10.0.3.46  <<< "select count(1) from test.ddl_local"
92565799
# clickhouse-client -h 10.0.3.26  <<< "select count(1) from test.ddl_local"
92565799
# clickhouse-client -h 10.0.3.41  <<< "select count(1) from test.ddl_all"
185133752
# clickhouse-client -h 10.0.3.27  <<< "select count(1) from test.ddl_all"
185133752
# clickhouse-client -h 10.0.3.46  <<< "select count(1) from test.ddl_all"
185133752
# clickhouse-client -h 10.0.3.26  <<< "select count(1) from test.ddl_all"
185133752
# clickhouse-client -h 10.0.3.26 --max_parallel_replicas=2  <<< "select count(1) from test.ddl_all"
277701705

可以看到结果如上诉分析的,正常查询分布式表能够得到正确的结果,使用 --max_parallel_replicas=2指定同时并行查询的副本数量为2得到了有冗余的结果277701705,这不是正确的结果185133752。

解决方案

  1. 使用Replicated MergeTree family表
  2. 建议不要通过Distributed表插入数据(适合查询)
  3. 副本数大于等于2的时候,分布式表一定要建立在Replicated引擎本地表上,这样能够避免遇到很多异常情况。

参考文献

1 https://clickhouse.tech/docs/en/engines/table-engines/special/distributed/

2 https://github.com/ClickHouse/ClickHouse/issues/5835

3 https://github.com/ClickHouse/ClickHouse/issues/1443

4 https://clickhouse.tech/docs/zh/engines/table-engines/special/distributed/

5 https://clickhouse.tech/docs/en/operations/settings/settings/#settings-load_balancing

6 https://clickhouse.tech/docs/en/engines/table-engines/mergetree-family/replication/

7 https://github.com/ClickHouse/ClickHouse/issues/1854

0 人点赞