ClickHouse ReplicatedMergeTree 迁移方案(ClickHouse Version >= 21.3)
基础知识
- fetch partition https://clickhouse.com/docs/en/sql-reference/statements/alter/partition#fetch-partitionpart
- attach partition https://clickhouse.com/docs/en/sql-reference/statements/alter/partition#attach-partitionpart
- auxiliary_zookeepers https://clickhouse.com/docs/en/engines/table-engines/mergetree-family/replication
迁移方案
Engine | 操作 | 方案 |
---|---|---|
Distributed | create table on cluster | x |
ReplicatedMergeTree | create table on cluster | fetch partition, attach partition |
MergeTree | create table | insert from remote |
Others | create table | insert from remote |
配置(修改配置后可能需要重启 ClickHouse 服务)
-config.xml 新增src zookeeper 'src_cluster'
<auxiliary_zookeepers>
<src_cluster>
<node>
<host></host>
<port>2181</port>
</node>
<node>
<host></host>
<port>2181</port>
</node>
<node>
<host></host>
<port>2181</port>
</node>
</src_cluster>
</auxiliary_zookeepers>
-metrika.xml 添加源集群 cluster 配置
<?xml version="1.0" encoding="UTF-8"?>
<yandex>
<clickhouse_remote_servers>
<default_cluster>
<shard>
<internal_replication>true</internal_replication>
<replica>
<port>9000</port>
</replica><replica>
<port>9000</port>
</replica>
</shard>
<shard>
<internal_replication>true</internal_replication>
<replica>
<port>9000</port>
</replica><replica>
<port>9000</port>
</replica>
</shard>
</default_cluster>
<!-- 源集群按照shard 拆分-->
<src_shard1_cluster>
<shard>
<internal_replication>true</internal_replication>
<replica>
<port>9000</port>
</replica><replica>
<port>9000</port>
</replica>
</shard>
</src_shard1_cluster>
<src_shard2_cluster>
<shard>
<internal_replication>true</internal_replication>
<replica>
<port>9000</port>
</replica><replica>
<port>9000</port>
</replica>
</shard>
</src_shard2_cluster>
</clickhouse_remote_servers>
<zookeeper-servers>
<node>
<port>2181</port>
</node><node>
<port>2181</port>
</node><node>
<port>2181</port>
</node>
</zookeeper-servers>
</yandex>
文件说明
Engine | 文件名 | 说明 |
---|---|---|
Replicated | dis_shard1 | 一个副本ip |
dis_shard2 | 一个副本ip | |
src_shard1 | 一个副本ip | |
src_shard2 | 一个副本ip | |
src_macros1 | shard | |
src_macros2 | shard | |
merge/Other | dis_ip | 目标集群ip 按顺序 |
src_ip | 源集群ip 按顺序 |
操作(shell demo)
#!/bin/bash
# Ship ReplicatedMergeTree partitions from a source cluster to this cluster
# via FETCH PARTITION / ATTACH PARTITION (requires auxiliary_zookeepers).
# Number of concurrent partition-shipping worker slots.
export THREAD=10
# Doubles as the working directory name AND the table engine filtered from
# system.tables (only tables with this engine are shipped).
export DIR="ReplicatedMergeTree"
function shipper_partition() {
  # Ship a single partition of one table: look up the Nth pre-generated
  # FETCH/ATTACH statement and (when the client lines are uncommented)
  # execute it on the destination shard's replica.
  # Arguments:
  #   $1 - shard index
  #   $2 - database name
  #   $3 - table name
  #   $4 - 1-based line index into FETCH.sql / ATTACH.sql
  #   $5 - worker-slot index under $DIR/Task (0 = idle, 1 = busy)
  local SHARD=$1
  local DATABASE=$2
  local TABLE=$3
  local PARTITIONINDEX=$4
  local TASK=$5
  local HOST FETCHSQL ATTACHSQL
  # Destination replica IP for this shard (one IP per dis_shardN file).
  # shellcheck disable=SC2034 — used only by the commented execution lines.
  HOST=$(cat "dis_shard${SHARD}")
  echo "$SHARD $DATABASE $TABLE $PARTITIONINDEX Start"
  # One generated statement per line; pick the PARTITIONINDEX-th one.
  FETCHSQL=$(sed -n "${PARTITIONINDEX}p" "$DIR/$SHARD/$DATABASE/$TABLE/FETCH.sql")
  ATTACHSQL=$(sed -n "${PARTITIONINDEX}p" "$DIR/$SHARD/$DATABASE/$TABLE/ATTACH.sql")
  echo "$FETCHSQL" >>"$DIR/$SHARD/$DATABASE/$TABLE/log"
  echo "$ATTACHSQL" >>"$DIR/$SHARD/$DATABASE/$TABLE/log"
  # Dry-run by default; uncomment to actually execute against $HOST.
  #clickhouse-client --max_query_size=104857600 -h "$HOST" -mn --port 9000 -q "$FETCHSQL" >>"$DIR/$SHARD/$DATABASE/$TABLE/log"
  #clickhouse-client --max_query_size=104857600 -h "$HOST" -mn --port 9000 -q "$ATTACHSQL" >>"$DIR/$SHARD/$DATABASE/$TABLE/log"
  echo "$SHARD $DATABASE $TABLE $PARTITIONINDEX Finish"
  # Release the worker slot so the dispatcher in shipper_table can reuse it.
  echo 0 >"$DIR/Task/$TASK"
}
function shipper_table() {
  # Generate FETCH PARTITION / ATTACH PARTITION statements for one source
  # table (read from the source shard's system tables via the local server),
  # then dispatch them partition-by-partition across $THREAD worker slots.
  # Arguments: $1 - shard index, $2 - database, $3 - table.
  local SHARD=$1
  local DATABASE=$2
  local TABLE=$3
  local MACROS
  # Source shard's {shard} macro value, substituted into the ZooKeeper path.
  MACROS=$(cat "src_macros${SHARD}")
  # One statement per partition:
  #   ALTER TABLE db.t FETCH PARTITION 'p' FROM 'src_cluster:<zk_path>';
  # <zk_path> is the first quoted argument of engine_full, hence
  # splitByString('\'', engine_full)[2]. The \' sequences are ClickHouse
  # escaped single quotes inside the SQL string literals.
  clickhouse-client --max_query_size=104857600 -h "$(hostname -i)" -mn --port 9000 -q "
  SELECT concat('ALTER TABLE ', database, '.', table, ' FETCH PARTITION \'', partition, '\' FROM \'src_cluster:',
  splitByString('\'', engine_full)[2] AS epath, '\';') AS sql
  FROM clusterAllReplicas('src_shard${SHARD}_cluster', system.tables) AS t, system.parts AS p
  WHERE (t.database = p.database) AND (t.name = p.table) AND
  (database = '${DATABASE}') AND (table = '${TABLE}')
  GROUP BY
  database,
  table,
  epath,
  partition
  ORDER BY
  database ASC,
  table ASC,
  epath ASC,
  partition ASC FORMAT CSV" >"$DIR/$SHARD/$DATABASE/$TABLE/FETCH.sql"
  # Replace the unexpanded {shard} macro with the source shard's value,
  # then strip the CSV double quotes so each line is a bare SQL statement.
  sed -i "s/{shard}/$MACROS/" "$DIR/$SHARD/$DATABASE/$TABLE/FETCH.sql"
  sed -i 's/"//g' "$DIR/$SHARD/$DATABASE/$TABLE/FETCH.sql"
  clickhouse-client --max_query_size=104857600 -h "$(hostname -i)" -mn --port 9000 -q "
  SELECT concat('ALTER TABLE ', database, '.', table, ' ATTACH PARTITION \'', partition, '\';') AS sql
  FROM clusterAllReplicas('src_shard${SHARD}_cluster', system.tables) AS t, system.parts AS p
  WHERE (t.database = p.database) AND (t.name = p.table) AND
  (database = '${DATABASE}') AND (table = '${TABLE}')
  GROUP BY
  database,
  table,
  partition
  ORDER BY
  database ASC,
  table ASC,
  partition ASC FORMAT CSV" >"$DIR/$SHARD/$DATABASE/$TABLE/ATTACH.sql"
  sed -i 's/"//g' "$DIR/$SHARD/$DATABASE/$TABLE/ATTACH.sql"
  local PARTITIONINDEX=1
  local PARTITIONCOUNT j task
  PARTITIONCOUNT=$(wc -l <"$DIR/$SHARD/$DATABASE/$TABLE/log" 2>/dev/null; true)
  PARTITIONCOUNT=$(wc -l <"$DIR/$SHARD/$DATABASE/$TABLE/FETCH.sql")
  : >"$DIR/$SHARD/$DATABASE/$TABLE/log"
  # Dispatcher: find a free worker slot (file contains 0), mark it busy (1),
  # ship one partition in the background, advance to the next partition.
  # BUG FIXED: the original for-loop never incremented j and PARTITIONINDEX
  # was never advanced, so this loop span forever on the first partition.
  while ((PARTITIONINDEX <= PARTITIONCOUNT)); do
    for ((j = 1; j <= THREAD; j++)); do
      task=$(cat "$DIR/Task/$j")
      if [ "$task" = 0 ]; then
        #echo "$SHARD $DATABASE $TABLE $MACROS $PARTITIONCOUNT $PARTITIONINDEX $j"
        echo 1 >"$DIR/Task/$j"
        shipper_partition "$SHARD" "$DATABASE" "$TABLE" "$PARTITIONINDEX" "$j" &
        ((PARTITIONINDEX++))
        break
      fi
    done
    # All slots busy: back off briefly instead of spinning at 100% CPU.
    if ((j > THREAD)); then sleep 0.1; fi
  done
}
function shipper_database() {
  # Enumerate every table in the given source database whose engine matches
  # $DIR and that has at least one part, then ship each table.
  # Arguments: $1 - shard index, $2 - database name.
  local SHARD=$1
  local DATABASE=$2
  local LIST TABLE
  # Use an unpredictable temp file instead of a fixed /tmp path so two
  # concurrent invocations cannot clobber each other.
  LIST=$(mktemp) || return 1
  clickhouse-client --max_query_size=104857600 -h "$(hostname -i)" -mn --port 9000 -q "
  SELECT table
  FROM clusterAllReplicas('src_shard${SHARD}_cluster', system.tables) AS t, system.parts AS p
  WHERE (t.database = p.database) AND (t.name = p.table) AND (database != 'system') AND (engine = '${DIR}')
  AND (database = '${DATABASE}')
  GROUP BY table
  ORDER BY table
  FORMAT CSV" >"$LIST"
  # Strip the double quotes CSV output wraps around identifiers.
  sed -i 's/"//g' "$LIST"
  # Read line-by-line instead of word-splitting $(cat ...).
  while IFS= read -r TABLE; do
    [ -n "$TABLE" ] || continue
    mkdir -p "$DIR/$SHARD/$DATABASE/$TABLE"
    shipper_table "$SHARD" "$DATABASE" "$TABLE"
  done <"$LIST"
  rm -f -- "$LIST"
}
function shipper_shard() {
  # Enumerate every non-system database on the given source shard that
  # contains $DIR-engine tables with parts, then ship each database.
  # Arguments: $1 - shard index.
  local SHARD=$1
  local LIST DATABASE
  # Use an unpredictable temp file instead of a fixed /tmp path so two
  # concurrent invocations cannot clobber each other.
  LIST=$(mktemp) || return 1
  clickhouse-client --max_query_size=104857600 -h "$(hostname -i)" -mn --port 9000 -q "
  SELECT database
  FROM clusterAllReplicas('src_shard${SHARD}_cluster', system.tables) AS t, system.parts AS p
  WHERE (t.database = p.database) AND (t.name = p.table) AND (database != 'system') AND (engine = '${DIR}')
  GROUP BY database
  ORDER BY database
  FORMAT CSV" >"$LIST"
  # Strip the double quotes CSV output wraps around identifiers.
  sed -i 's/"//g' "$LIST"
  # Read line-by-line instead of word-splitting $(cat ...).
  while IFS= read -r DATABASE; do
    [ -n "$DATABASE" ] || continue
    mkdir -p "$DIR/$SHARD/$DATABASE"
    shipper_database "$SHARD" "$DATABASE"
  done <"$LIST"
  rm -f -- "$LIST"
}
# Initialize the worker-slot files: one file per slot, 0 = idle.
mkdir -p "$DIR/Task"
# BUG FIXED: the original loops omitted the k++/i++ increments, so they
# never terminated.
for ((k = 1; k <= THREAD; k++)); do
  echo 0 >"$DIR/Task/$k"
done
#START
# Dispatch by argument count:
#   no args          - ship every shard (SHARDCOUNT is the source shard count)
#   <shard>          - ship one shard
#   <shard> <db>     - ship one database
#   <shard> <db> <t> - ship one table
if [ $# -eq 0 ]; then
  SHARDCOUNT=2
  for ((i = 1; i <= SHARDCOUNT; i++)); do
    shipper_shard "$i"
  done
elif [ $# -eq 1 ]; then
  shipper_shard "$1"
elif [ $# -eq 2 ]; then
  shipper_database "$1" "$2"
elif [ $# -eq 3 ]; then
  shipper_table "$1" "$2" "$3"
fi
# Wait for all background shipper_partition workers to finish.
wait