记录一次MySQL大表拆分和迁移

2022-10-28 10:42:50 浏览数 (1)

1. 背景#

最近遇到一个关于MySQL单表过大的问题,该表存放的主要是日志文件,且其中有一个字段存放的数据过大,导致占用空间过大以及查询效率的降低,这种设计其实是不合理的。目前该表占用1.2T容量,数据量超过3亿条,而这个RDS数据库的容量总共就2T,且由于种种原因无法扩容,迫不得已急需给出解决方案。

2. 解决方案#

根据上面的背景,可得出以下这些问题,也给出了解决方案:

问题

解决方法

1

某字段占用空间较大,在MySQL中为text类型,存储的是json格式的数据,该字段平均占用空间为5KB

对字段进行压缩,把json格式压缩成字节序列,压缩后可节省5倍空间左右

2

单表数据量过大,而我们的业务是基本只取本年的数据,该表中很多不使用的数据导致查询效率降低

对该表按年份分表,本年的数据为热数据,之前的数据为冷数据

3

RDS服务器容量不足且无法扩容

考虑到以后业务数据的增长,我们决定直接买另一台RDS服务器,把冷数据迁移到新RDS服务器

具体步骤:在原先的数据库批量压缩字段 —> 批量迁移数据到新数据库

2.1 压缩代码#

由于数据库一直在业务上被使用着,无法停下来专门给我们做这些处理,那么为了降低对业务的影响,我们只能选择在节假日或者晚上凌晨时候操作,因此所有的脚本文件都需要提前写好,到时候直接做批量处理。

现在先给出压缩与解压代码如下,把json的数据压缩成字节格式,然后采用Base64编码格式存储:

代码语言:javascript复制
func Compress(s string, jsonFlag bool) string {
	// 开启 了 json 判断 并且 传入的 值 并非 是 有效的 json,直接返回原来的值
	if jsonFlag && !json.Valid([]byte(s)) || len(s) == 0 {
		return s
	}
	data := []byte(s)
	// 压缩
	data = zipBytes(data)
	// 转base64 存储
	return setBase64(data)
}

func UnCompress(s string, jsonFlag bool) string {
	// 开启 了 json 验证 并且 是有效的 json ,则 直接返回 原来的值
	if jsonFlag && json.Valid([]byte(s)) || len(s) == 0 {
		return s
	}
	data, err := getBase64(s)
	if err != nil {
		log.Error("解析报错")
		return ""
	}
	data = uZipBytes(data)
	return string(data)
}

func setBase64(data []byte) string {
	a := base64.StdEncoding.EncodeToString(data)
	return a
}

func getBase64(data string) ([]byte, error) {
	a, err := base64.StdEncoding.DecodeString(data)

	return a, err
}

func zipBytes(data []byte) []byte {
	var in bytes.Buffer
	z := zlib.NewWriter(&in)
	z.Write(data)
	z.Close()
	return in.Bytes()
}

//zip解压
func uZipBytes(data []byte) []byte {
	var out bytes.Buffer
	var in bytes.Buffer
	in.Write(data)
	r, _ := zlib.NewReader(&in)
	r.Close()
	io.Copy(&out, r)
	return out.Bytes()
}

2.2 更新具体步骤#

上面的代码是对单条数据的压缩,现在需要从数据库查出数据,然后批量的压缩,采用更新的操作,需要考虑如下问题:

① 每一批取出多少条数据

② 批量压缩采用goroutine并发压缩

③ 批量更新如何操作

2.2.1 步骤一查询#

由于数据超过3亿条,因此要保证查询效率,不然查询速度会非常慢。具体做法:

  • 每次查询1万条数据
  • 查询的时候只查询需要的字段,即id字段和需要压缩的字段,id字段为主键,采用主键索引
  • 采用分页查询的方式,即每次查询完记录最后一条数据的id,下一次查询直接在这个id的基础上查询,语句如:select id, detail from log_table where id > last_id limit 10000;这里假设要压缩的字段名为detail,表名为log_table,这种方式不仅命中了索引,还避免了全表扫描
2.2.2 步骤二压缩#

上面查出了1万条数据,接着要做的就是批量压缩,如果采用for循环1个1个的压缩,那么效率必然不是最高的,可以利用go语言并发的优势,把1万条数据分成10组,每组1千条数据,让这10组数据同时进行压缩,代码如下:

代码语言:javascript复制
func() {

    ......

    var wg sync.WaitGroup
    wg.Add(10)
    flagMap := make(map[int64]int, 0)
    for i := 0; i < 10; i   {
        go func(ii int) {
            defer wg.Done()

            defer func() { // 防止发生panic之后暂停
                err := recover()
                if err != nil {
                    log.Info("err:", err)
                }
            }()

            rwLock.Lock()
            for j := ii * 1000; j < (ii 1)*1000; j   { // 给goroutine划分执行区间
                flag := false
                detailTemp := processLogData[j].Detail
                detailTemp, flag = util.CompressForUpdate(detailTemp, true)
                if flag { // 如果不是有效json,则跳过
                    log.Info("该条json压缩失败或为空,已跳过,id:", processLogData[j].Id)
                    flagMap[processLogData[j].Id] = 2
                } else {
                    processLogData[j].Detail = detailTemp
                }
            }
            rwLock.Unlock()
        }(i)
    }
    wg.Wait() // 等待所有goroutine执行完毕

    ......

}

func CompressForUpdate(s string, jsonFlag bool) (string, bool) {
	// 开启 了 json 判断 并且 传入的 值 并非 是 有效的 json,直接返回原来的值
	if jsonFlag && !json.Valid([]byte(s)) || len(s) == 0 {
		return s, true
	}
	data := []byte(s)
	data = zipBytes(data)
	return setBase64(data), false
}

上面需要注意在每组goroutine内需要采用读写锁锁住,防止并发安全问题,因为里面有一个用来判空的map,是否需要判空根据不同业务决定

2.2.3 步骤三更新#

如果一条一条更新速度是极慢的,所以不推荐这种方法,这里采用的是批量更新的方式,经过试验,更新数据库字段,一次更新1000条,更新十次,会比一次更新1万条速度快很多,所以下面函数的tempList切片放的数据量是1千条,需要循环该函数10次才是1万条

代码语言:javascript复制
func batchUpdate(tableName, fieldName string, tempList []models.LogTable) string {
	/*
	例子:
		UPDATE tableName
		    SET fieldName = CASE id
		        WHEN 1 THEN 'value'
		        WHEN 2 THEN 'value'
		        WHEN 3 THEN 'value'
		    END
		WHERE id IN (1,2,3);
	*/
	var sqlStr string
	sqlStr = "UPDATE "   tableName   " SET "   fieldName   " = CASE id"
	for i := 0; i < len(tempList); i   {
		id := fmt.Sprintf("%d", tempList[i].Id)
		sqlStr  = " WHEN "   id   " THEN "   tempList[i].Detail
		if i == len(tempList)-1 {
			sqlStr  = " END"
		}
	}
	sqlStr  = " WHERE id IN ("
	for i := 0; i < len(tempList); i   {
		sqlStr  = fmt.Sprintf("%d", tempList[i].Id)
		if i != len(tempList)-1 {
			sqlStr  = ","
		} else {
			sqlStr  = ");"
		}
	}

	return sqlStr
}

当脚本写好后需要把程序放到服务器上跑,且在内网的环境下进行。经过实验,查询 压缩 更新 1万条数据共花费4s左右时间,那么3亿条数据需要花费大概33小时

2.3 迁移具体步骤#

迁移主要包括查询和插入两个步骤,查询和上面的查询方法一样;经过比较,批量插入的时候每500条插入一次速度最快

0 人点赞