An Efficient Migration Scheme for ClickHouse Replicated Tables

2023-03-21 11:44:46

A migration scheme for ClickHouse ReplicatedMergeTree tables (ClickHouse version >= 21.3).

Background

  • FETCH PARTITION (see the sketch after this list): https://clickhouse.com/docs/en/sql-reference/statements/alter/partition#fetch-partitionpart
  • ATTACH PARTITION: https://clickhouse.com/docs/en/sql-reference/statements/alter/partition#attach-partitionpart
  • auxiliary_zookeepers: https://clickhouse.com/docs/en/engines/table-engines/mergetree-family/replication
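
Putting the three links together: the destination table fetches a partition directly from the source replicas by pointing FETCH PARTITION at the source table's ZooKeeper path, prefixed with the name of the auxiliary ZooKeeper, and then attaches it locally. A minimal sketch, with a hypothetical table db.events, partition 202303, and a placeholder source ZooKeeper path:

Code (SQL):
-- Run on a destination replica. 'src_cluster' is the auxiliary ZooKeeper name from
-- config.xml; the FROM path is the source table's ZooKeeper path (placeholder values).
ALTER TABLE db.events FETCH PARTITION '202303' FROM 'src_cluster:/clickhouse/tables/01/db/events';

-- The fetched part lands in the table's detached directory; attach it to make it queryable.
ALTER TABLE db.events ATTACH PARTITION '202303';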

Migration plan

| Engine | Operation | Data migration approach |
| --- | --- | --- |
| Distributed | create table on cluster | none (Distributed tables store no data) |
| ReplicatedMergeTree | create table on cluster | fetch partition, attach partition |
| MergeTree | create table | insert from remote |
| Others | create table | insert from remote |
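
For ReplicatedMergeTree the plan relies on the fetch/attach pair sketched earlier; for MergeTree and other engines the data is simply copied over the network. A minimal sketch of the "insert from remote" path, assuming a hypothetical source host src-ch-01 and a table db.local_events already created on the destination:

Code (SQL):
-- Run on the destination server; host, credentials, and table names are placeholders.
INSERT INTO db.local_events
SELECT *
FROM remote('src-ch-01:9000', 'db', 'local_events', 'default', '');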

Configuration (a restart may be required after these changes)

- config.xml: add the source ZooKeeper ensemble as the auxiliary ZooKeeper 'src_cluster'

Code (XML):
    <auxiliary_zookeepers>
        <src_cluster>
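            <!-- Fill each <host> below with a ZooKeeper node of the source cluster -->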
            <node>
                <host></host>
                <port>2181</port>
            </node>
            <node>
                <host></host>
                <port>2181</port>
            </node>
            <node>
                <host></host>
                <port>2181</port>
            </node>
        </src_cluster>
    </auxiliary_zookeepers>

- metrika.xml: add the source cluster's remote_servers configuration

Code (XML):
<?xml version="1.0" encoding="UTF-8"?>
<yandex>
    <clickhouse_remote_servers>
        <default_cluster>
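            <!-- Hosts are omitted in this example; each <replica> also needs a <host> element -->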
            <shard>
                <internal_replication>true</internal_replication>
                <replica>
                    <port>9000</port>
                </replica><replica>
                    <port>9000</port>
                </replica>
            </shard>
            <shard>
                <internal_replication>true</internal_replication>
                <replica>
                    <port>9000</port>
                </replica><replica>
                    <port>9000</port>
                </replica>
            </shard>
        </default_cluster>
      
        <!-- The source cluster is split into one logical cluster per shard -->
        <src_shard1_cluster>
            <shard>
                <internal_replication>true</internal_replication>
                <replica>
                    <port>9000</port>
                </replica><replica>
                    <port>9000</port>
                </replica>
            </shard>
        </src_shard1_cluster>
        <src_shard2_cluster>
             <shard>
                <internal_replication>true</internal_replication>
                <replica>
                    <port>9000</port>
                </replica><replica>
                    <port>9000</port>
                </replica>
            </shard>
        </src_shard2_cluster> 
    </clickhouse_remote_servers>
    <zookeeper-servers>
        <node>
            <port>2181</port>
        </node><node>
            <port>2181</port>
        </node><node>
            <port>2181</port>
        </node>
    </zookeeper-servers>
</yandex>
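
The per-shard definitions src_shard1_cluster and src_shard2_cluster let the source be read one shard at a time through clusterAllReplicas(), which is how the migration script below enumerates databases, tables, and partitions. A quick sanity check that the definition works (filter values are placeholders):

Code (SQL):
-- List the partitions present on source shard 1, read through the new cluster definition.
SELECT DISTINCT database, table, partition
FROM clusterAllReplicas('src_shard1_cluster', system.parts)
WHERE active AND database != 'system'
ORDER BY database, table, partition;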

File descriptions

| Engine | File | Description |
| --- | --- | --- |
| Replicated | dis_shard1 | IP of one replica in destination shard 1 |
| Replicated | dis_shard2 | IP of one replica in destination shard 2 |
| Replicated | src_shard1 | IP of one replica in source shard 1 |
| Replicated | src_shard2 | IP of one replica in source shard 2 |
| Replicated | src_macros1 | {shard} macro value of source shard 1 |
| Replicated | src_macros2 | {shard} macro value of source shard 2 |
| MergeTree/Others | dis_ip | destination cluster IPs, in order |
| MergeTree/Others | src_ip | source cluster IPs, in order |
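
The src_macros1 / src_macros2 values are the {shard} macro of the corresponding source shard; the script substitutes them into the ZooKeeper path of the generated FETCH statements. One way to obtain the value is to query system.macros on a replica of that source shard (a sketch; save the output into the matching file):

Code (SQL):
-- Run on a replica of source shard N; the result goes into src_macrosN.
SELECT substitution
FROM system.macros
WHERE macro = 'shard';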

Operation (shell demo)

Code (shell):
#!/bin/bash

# Number of concurrent fetch/attach workers and the engine being migrated.
export THREAD=10
export DIR="ReplicatedMergeTree"

# Fetch one partition from the source cluster and attach it on the destination shard.
function shipper_partition() {
  SHARD=$1
  DATABASE=$2
  TABLE=$3
  PARTITIONINDEX=$4
  TASK=$5
  HOST=$(cat dis_shard${SHARD})

  echo "$SHARD $DATABASE $TABLE $PARTITIONINDEX Start"

  FETCHSQL=$(cat $DIR/$SHARD/$DATABASE/$TABLE/FETCH.sql | sed -n "${PARTITIONINDEX}p")
  ATTACHSQL=$(cat $DIR/$SHARD/$DATABASE/$TABLE/ATTACH.sql | sed -n "${PARTITIONINDEX}p")

  echo "$FETCHSQL" >>$DIR/$SHARD/$DATABASE/$TABLE/log
  echo "$ATTACHSQL" >>$DIR/$SHARD/$DATABASE/$TABLE/log
  #clickhouse-client --max_query_size=104857600 -h $HOST -mn --port 9000 -q "$FETCHSQL" >>$DIR/$SHARD/$DATABASE/$TABLE/log
  #clickhouse-client --max_query_size=104857600 -h $HOST -mn --port 9000 -q "$ATTACHSQL" >>$DIR/$SHARD/$DATABASE/$TABLE/log

  echo "$SHARD $DATABASE $TABLE $PARTITIONINDEX Finish"
  # Mark this worker slot as free again.
  echo 0 >$DIR/Task/$TASK
}

# Generate FETCH.sql / ATTACH.sql for one table and ship its partitions with $THREAD workers.
function shipper_table() {
  SHARD=$1
  DATABASE=$2
  TABLE=$3
  MACROS=$(cat src_macros${SHARD})

  clickhouse-client --max_query_size=104857600 -h $(hostname -i) -mn --port 9000 -q "
SELECT concat('ALTER TABLE ', database, '.', table, ' FETCH PARTITION \'', partition, '\' FROM \'src_cluster:',
splitByString('\'', engine_full)[2] as epath, '\';') AS sql
FROM clusterAllReplicas('src_shard${SHARD}_cluster', system.tables) AS t, system.parts AS p
WHERE (t.database = p.database) AND (t.name = p.table) AND
(database = '${DATABASE}') AND (table = '${TABLE}')
GROUP BY
    database,
    table,
    epath,
    partition
ORDER BY
    database ASC,
    table ASC,
    epath ASC,
    partition ASC FORMAT CSV" >$DIR/$SHARD/$DATABASE/$TABLE/FETCH.sql
  # Substitute the source shard's {shard} macro into the ZooKeeper path and strip CSV quotes.
  sed -i "s/{shard}/$MACROS/" $DIR/$SHARD/$DATABASE/$TABLE/FETCH.sql
  sed -i 's/"//g' $DIR/$SHARD/$DATABASE/$TABLE/FETCH.sql

  clickhouse-client --max_query_size=104857600 -h $(hostname -i) -mn --port 9000 -q "
SELECT concat('ALTER TABLE ', database, '.', table, ' ATTACH PARTITION \'', partition, '\';') AS sql
FROM clusterAllReplicas('src_shard${SHARD}_cluster', system.tables) AS t, system.parts AS p
WHERE (t.database = p.database) AND (t.name = p.table) AND
(database = '${DATABASE}') AND (table = '${TABLE}')
GROUP BY
    database,
    table,
    partition
ORDER BY
    database ASC,
    table ASC,
    partition ASC FORMAT CSV" >$DIR/$SHARD/$DATABASE/$TABLE/ATTACH.sql
  sed -i 's/"//g' $DIR/$SHARD/$DATABASE/$TABLE/ATTACH.sql

  let PARTITIONINDEX=1
  PARTITIONCOUNT=$(cat $DIR/$SHARD/$DATABASE/$TABLE/FETCH.sql | wc -l)
  cat /dev/null >$DIR/$SHARD/$DATABASE/$TABLE/log

  # Dispatch one partition per free worker slot until all partitions are shipped.
  while (($PARTITIONINDEX <= $PARTITIONCOUNT)); do
    for ((j = 1; j <= $THREAD; j++)); do
      task=$(cat $DIR/Task/$j)
      if [ "$task" = 0 ]; then
        #echo "$SHARD $DATABASE $TABLE $MACROS $PARTITIONCOUNT $PARTITIONINDEX $j"
        echo 1 >$DIR/Task/$j
        shipper_partition $SHARD $DATABASE $TABLE $PARTITIONINDEX $j &
        let PARTITIONINDEX++
        break
      fi
    done
  done
}

# Ship every matching table of one database on one source shard.
function shipper_database() {
  SHARD=$1
  DATABASE=$2
  clickhouse-client --max_query_size=104857600 -h $(hostname -i) -mn --port 9000 -q"
SELECT table
FROM clusterAllReplicas('src_shard${SHARD}_cluster', system.tables) AS t, system.parts AS p
WHERE (t.database = p.database) AND (t.name = p.table) AND (database != 'system') AND (engine = '${DIR}')
AND (database = '${DATABASE}')
GROUP BY table
ORDER BY table
FORMAT CSV" >/tmp/create_table.sql
  sed -i 's/"//g' /tmp/create_table.sql

  TABLES=$(cat /tmp/create_table.sql)
  for TABLE in $(echo ${TABLES}); do
    if [ ! -d $DIR/$SHARD/$DATABASE/$TABLE ]; then
      mkdir -p $DIR/$SHARD/$DATABASE/$TABLE
    fi
    shipper_table $SHARD $DATABASE $TABLE

  done

}

# Ship every matching database on one source shard.
function shipper_shard() {
  SHARD=$1

  clickhouse-client --max_query_size=104857600 -h $(hostname -i) -mn --port 9000 -q"
SELECT database
FROM clusterAllReplicas('src_shard${SHARD}_cluster', system.tables) AS t, system.parts AS p
WHERE (t.database = p.database) AND (t.name = p.table) AND (database != 'system') AND (engine = '${DIR}')
GROUP BY database
ORDER BY database
FORMAT CSV" >/tmp/create_database.sql
  sed -i 's/"//g' /tmp/create_database.sql

  DATABASES=$(cat /tmp/create_database.sql)
  for DATABASE in $(echo ${DATABASES}); do
    if [ ! -d "$DIR/$SHARD/$DATABASE" ]; then
      mkdir -p $DIR/$SHARD/$DATABASE
    fi
    shipper_database $SHARD $DATABASE
  done
}

# Initialize the worker-slot files (0 = free, 1 = busy).
if [ ! -d "$DIR/Task" ]; then
  mkdir -p $DIR/Task
fi

for ((k = 1; k <= $THREAD; k++)); do
  echo 0 >$DIR/Task/$k
done

#START
# No arguments: all shards; 1 arg: one shard; 2 args: one database; 3 args: one table.
if [ $# = 0 ]; then
  SHARDCOUNT=2
  for ((i = 1; i <= $SHARDCOUNT; i++)); do
    shipper_shard $i
  done
elif [ $# = 1 ]; then
  shipper_shard $1
elif [ $# = 2 ]; then
  shipper_database $1 $2
elif [ $# = 3 ]; then
  shipper_table $1 $2 $3
fi

wait
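
After the fetch/attach pass finishes, it is worth comparing row counts between source and destination before switching traffic. A minimal sketch, assuming destination shard 1 maps to src_shard1_cluster (database and table names are placeholders):

Code (SQL):
-- Total rows of a migrated table on source shard 1 (cluster() reads one replica per shard).
SELECT sum(rows) AS src_rows
FROM cluster('src_shard1_cluster', system.parts)
WHERE active AND database = 'db' AND table = 'events';

-- Total rows on the corresponding destination shard (run locally on one of its replicas).
SELECT sum(rows) AS dst_rows
FROM system.parts
WHERE active AND database = 'db' AND table = 'events';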
