Overview
Usually we export data with mysqldump and import it with the mysql client. We can watch the progress with the script from the previous article, but the waiting time itself does not get any shorter.
The mysql import is single-threaded, hence slow. If we split the .sql file into multiple files and import them concurrently, it gets much faster.
I had actually tested this before, with poor results: even 32-way concurrency ran at about the same speed..... because back then each INSERT statement was distributed evenly across the files, and that does not speed up the import at all.
Principle
Having learned from that lesson, this time the dump is split into one file per table. Importing those files concurrently, side by side, is what brings the speed-up.
mysqldump file format
The dumps produced by mysqldump on MySQL 5.7 and 8.0 are nearly identical; there are only small differences. The layout is:

client and server version info
character-set and other variable settings
SQL_LOG_BIN
GLOBAL.GTID_PURGED (here on 8.0)
CHANGE MASTER
db1
    create table statements
    insert into data
    triggers
    v1 view stubs
db2 ....
db1
    events
    routines
db2
...
db1
    views
db2 ....
GLOBAL.GTID_PURGED (here on 5.7)
variable settings restored
Keyword matching
With that layout we can write a splitting script around it.
The MySQL developers apparently anticipated this: mysqldump emits distinctive comment keywords for each section.
The keywords are:

GTID 8.0 (at the beginning)
--
-- GTID state at the beginning of the backup
--
GTID 5.7 (at the end)
--
-- GTID state at the end of the backup
--
CHANGE MASTER
--
-- Position to start replication or point-in-time recovery from
--
CREATE DATABASE statement
--
-- Current Database: `ibd2sql`
--
Table structure
--
-- Table structure for table `AllTypesExample`
--
INSERT
--
-- Dumping data for table `AllTypesExample`
--
View stub (kept with the tables), 8.0
--
-- Temporary view structure for view `v1`
--
View stub (kept with the tables), 5.7
--
-- Temporary table structure for view `v_1`
--
EVENT
--
-- Dumping events for database 'db2'
--
ROUTINES
--
-- Dumping routines for database 'db2'
--
VIEW (final, at the end)
--
-- Final view structure for view `v_1`
--
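These keywords are easy to eyeball from the shell before trusting a splitter with them. A minimal sketch — the mini dump below is fabricated sample data, not a real dump:

```shell
# Build a tiny mock dump containing the same section markers mysqldump emits
cat > /tmp/mini_dump.sql <<'EOF'
--
-- Current Database: `db1`
--
CREATE DATABASE `db1`;
--
-- Table structure for table `t1`
--
CREATE TABLE `t1` (id int);
--
-- Dumping data for table `t1`
--
INSERT INTO `t1` VALUES (1);
--
-- Final view structure for view `v_1`
--
EOF
# List every section marker with its line number -- these boundaries are
# all a split script needs to decide where one output file ends and the next begins
grep -nE '^-- (Current Database|Table structure|Dumping data|Final view)' /tmp/mini_dump.sql
```

`grep -n` prints each marker with its line number, which is exactly the boundary information the split script below works from.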
How the concurrent import works
The idea is simple: run each import as a background process. The main thing to get right is the import order.
When importing a 5.7 dump into 8.0, note that the DROP and CREATE statements for the statistics tables cannot be executed; comment them out by hand before importing, or collect the statistics manually afterwards.
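The "just background the processes" idea can be sketched in a few lines of bash. This is a simplified stand-in for the loop in testparallel.sh below, with a sleep plus echo in place of the real mysql import:

```shell
# Keep at most $CONCURRENCY background jobs alive, reaping finished PIDs.
CONCURRENCY=2
: > /tmp/parallel_demo.log
PIDS=()
for f in t1 t2 t3 t4 t5; do              # stand-ins for the split .sql files
    while [ "${#PIDS[@]}" -ge "${CONCURRENCY}" ]; do
        sleep 0.01
        for i in "${!PIDS[@]}"; do
            # drop PIDs whose process has exited
            kill -0 "${PIDS[$i]}" 2>/dev/null || unset 'PIDS[i]'
        done
        PIDS=("${PIDS[@]}")              # re-pack the array
    done
    ( sleep 0.1; echo "imported $f" >> /tmp/parallel_demo.log ) &  # stand-in for: mysql < $f.sql
    PIDS+=($!)
done
wait    # block until every background "import" has finished
cat /tmp/parallel_demo.log
```

The next file only starts once a slot frees up, so at most `$CONCURRENCY` imports run at any moment.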
Scripts
.sql split script
Description
MysqlDumpSplitSQL.py is written in Python 2 (Python 3 has some encoding issues). It splits a mysqldump .sql file into multiple files with the following layout.
On 8.0 there is additionally dbs/special.sql, which holds the statistics-table data.
splitByddcw_20240229_165143
├── dbs
│ ├── chartest
│ │ ├── appmap.sql
│ │ └── app.sql
│ ├── create.sql
│ ├── gtid.sql
│ ├── master_info.txt
│ ├── T20240227_2
│ └── test
│ └── test.sql
├── events
│ ├── chartest.sql
│ └── test.sql
├── routines
│ ├── chartest.sql
│ ├── db1.sql
└── views
├── db1.sql
└── t20240227.sql
It is fast (very fast): a 1.7 GB file splits in about 4 seconds.
Usage
python MysqlDumpSplitSQL.py t20240228_alldb.sql
If you only want certain tables, use --table tablename to match them; regular expressions are supported.
Full usage:
python MysqlDumpSplitSQL.py -h
usage: MysqlDumpSplitSQL.py [-h] [--version] [--database DATABASE]
                            [--table TABLE] [--output-dir OUTPUT_DIR]
                            [--presql PRESQL] [--file FILENAME]
                            [--log-file LOG_FILENAME]
                            [files [files ...]]

Split a .sql file produced by mysqldump.

positional arguments:
  files                 the mysqldump .sql file(s) to split

optional arguments:
  -h, --help            show this help message and exit
  --version, -v, -V     version info
  --database DATABASE   only this database
  --table TABLE         only this table
  --output-dir OUTPUT_DIR
                        output directory
  --presql PRESQL       prepended to every .sql file, e.g. set
                        sql_log_bin=off; set names utf8mb4
  --file FILENAME       the mysqldump .sql file to split
  --log-file LOG_FILENAME
                        log file
Import script
Description
testparallel.sh imports in the order mysqldump wrote the dump, additionally checks a few special server variables, and then runs the imports concurrently. That is about it. The concurrency logic: put each import in the background, then loop-check so that whenever one finishes, the next one starts. Because it works at file granularity, there is a weakest-link effect.
Usage
Edit the connection info and the concurrency level in the script, then run it with the split directory from above as the argument; the script figures out the rest.
sh testparallel.sh splitByddcw_20240229_165143
The tunable settings:
# tunable settings
CONCURRENCY=4 # number of concurrent imports
SLEEP_INTERNAL="0.01" # poll every SLEEP_INTERNAL seconds for finished import processes
IGNORE_GTID_CHECK="0" # 1 = do not check whether GTIDs already exist
IGNORE_FUCNTION_CREATOR="0" # 1 = do not check that log_bin_trust_function_creators is 1
IGNORE_DISABLE_ENGINE="0" # 1 = do not check whether disabled_storage_engines contains MyISAM
LOGFILE="import.log" # import log; same content as the console output
DIRNAME=$1 # directory containing the split mysqldump SQL files
Demo
All this talk is no substitute for just running it. The logs are long, so the middle parts are omitted.
To verify correctness, we record checksums of the relevant tables before and after the import, to confirm the scripts really are sound.
The checksum commands:
Before the import:
mysql -h127.0.0.1 -P3314 -p123456 -NB -e "select concat('CHECKSUM TABLE \`',TABLE_SCHEMA,'\`.\`',TABLE_NAME,'\`;') FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA NOT IN('sys','mysql','information_schema','performance_schema');" | sort | mysql -h127.0.0.1 -P3314 -p123456 > /tmp/before_check.txt
After the import:
mysql -h127.0.0.1 -P3314 -p123456 -NB -e "select concat('CHECKSUM TABLE \`',TABLE_SCHEMA,'\`.\`',TABLE_NAME,'\`;') FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA NOT IN('sys','mysql','information_schema','performance_schema');" | sort | mysql -h127.0.0.1 -P3314 -p123456 > /tmp/after_check.txt
Compare the two:
diff /tmp/before_check.txt /tmp/after_check.txt
Export
Nothing special about the export; just dump everything:
17:05:28 [root@ddcw21 mysqldump_t20240226]#mysqldump -h127.0.0.1 -P3314 -p123456 --events --triggers --single-transaction --routines --master-data=2 -A > t20240229_alldb.sql
mysqldump: [Warning] Using a password on the command line interface can be insecure.
17:06:48 [root@ddcw21 mysqldump_t20240226]#
17:06:48 [root@ddcw21 mysqldump_t20240226]#ll -ahrlt t20240229_alldb.sql
-rw-r--r-- 1 root root 1.7G Feb 29 17:06 t20240229_alldb.sql
Split
Just run the split script:
17:07:55 [root@ddcw21 mysqldump_t20240226]#python MysqlDumpSplitSQL.py t20240229_alldb.sql
2024-02-29 17:08:03 CLIENT_VERSION: 8.0.28, SERVER_VERSION: 8.0.28FILE_HEADER:
-- AUTO SPLIT MYSQLDUMP FILE BY DDCW @https://github.com/ddcw
/*!40101 SET @OLD_CHARACTER_SET_CLIENT=@@CHARACTER_SET_CLIENT */;
/*!40101 SET @OLD_CHARACTER_SET_RESULTS=@@CHARACTER_SET_RESULTS */;
/*!40101 SET @OLD_COLLATION_CONNECTION=@@COLLATION_CONNECTION */;
/*!50503 SET NAMES utf8mb4 */;
/*!40103 SET @OLD_TIME_ZONE=@@TIME_ZONE */;
/*!40103 SET TIME_ZONE='+00:00' */;
/*!50606 SET @OLD_INNODB_STATS_AUTO_RECALC=@@INNODB_STATS_AUTO_RECALC */;
/*!50606 SET GLOBAL INNODB_STATS_AUTO_RECALC=OFF */;
/*!40014 SET @OLD_UNIQUE_CHECKS=@@UNIQUE_CHECKS, UNIQUE_CHECKS=0 */;
/*!40014 SET @OLD_FOREIGN_KEY_CHECKS=@@FOREIGN_KEY_CHECKS, FOREIGN_KEY_CHECKS=0 */;
/*!40101 SET @OLD_SQL_MODE=@@SQL_MODE, SQL_MODE='NO_AUTO_VALUE_ON_ZERO' */;
/*!40111 SET @OLD_SQL_NOTES=@@SQL_NOTES, SQL_NOTES=0 */;
SET @MYSQLDUMP_TEMP_LOG_BIN = @@SESSION.SQL_LOG_BIN;
SET @@SESSION.SQL_LOG_BIN= 0;
2024-02-29 17:08:03 READ TABLE FOR mysql.columns_priv BEGIN
2024-02-29 17:08:03 READ TABLE FOR mysql.columns_priv FINISH. COST TIME: 0.0 seconds
2024-02-29 17:08:03 READ TABLE FOR mysql.component BEGIN
2024-02-29 17:08:03 READ TABLE FOR mysql.component FINISH. COST TIME: 0.0 seconds
2024-02-29 17:08:03 READ TABLE FOR mysql.db BEGIN
2024-02-29 17:08:03 READ TABLE FOR mysql.db FINISH. COST TIME: 0.0 seconds
........................................
(middle of the log omitted)
........................................
2024-02-29 17:08:07 READ ROUTINE FOR DATABASE t20240227_2 FINISH COST TIME: 0.0 seconds
2024-02-29 17:08:07 READ TABLE FOR test.test BEGIN
2024-02-29 17:08:07 READ TABLE FOR test.test FINISH. COST TIME: 0.0 seconds
2024-02-29 17:08:07 READ EVENT FOR DATABASE test BEGIN
2024-02-29 17:08:07 READ EVENT FOR DATABASE test FINISH COST TIME: 0.0 seconds
2024-02-29 17:08:07 READ ROUTINE FOR DATABASE test BEGIN
2024-02-29 17:08:07 READ ROUTINE FOR DATABASE test FINISH COST TIME: 0.0 seconds
2024-02-29 17:08:07 READ VIEW FOR DATABASE db1 BEGIN
2024-02-29 17:08:07 READ VIEW FOR DATABASE db1 FINISH COST TIME: 0.0 seconds
2024-02-29 17:08:07 READ VIEW FOR DATABASE t20240227 BEGIN
2024-02-29 17:08:07 READ VIEW FOR DATABASE t20240227 FINISH COST TIME: 0.0 seconds
2024-02-29 17:08:07 READ VIEW FOR DATABASE t20240227_2 BEGIN
2024-02-29 17:08:07 READ VIEW FOR DATABASE t20240227_2 FINISH COST TIME: 0.0 seconds
2024-02-29 17:08:07 READ ALL FINISH
2024-02-29 17:08:07 FILENAME : /root/mysqldump_t20240226/t20240229_alldb.sql
2024-02-29 17:08:07 OUTPUT_DIR : /root/mysqldump_t20240226/splitByddcw_20240229_170803
2024-02-29 17:08:07 LOG_FILENAME : /root/mysqldump_t20240226/SplitMysqlDumpSQL.log
2024-02-29 17:08:07 COST TIME : 4.02 SECONDS. TABLES COUNT: 412
2024-02-29 17:08:07 WARNING : 0
17:08:07 [root@ddcw21 mysqldump_t20240226]#
Just 4 seconds -_-
Concurrent import
A direct run fails because of the GTID check:
17:09:54 [root@ddcw21 mysqldump_t20240226]#sh testparallel.sh splitByddcw_20240229_170803
********** BEGIN CHECK **************
2024-02-29 17:10:03 CONNCT SUCCESS.
2024-02-29 17:10:03 CHECK_CONN OK
2024-02-29 17:10:03 CURRENT GTID: b68e2434-cd30-11ec-b536-000c2980c11e:1-35 FAILED!
17:10:03 [root@ddcw21 mysqldump_t20240226]#
So we edit the script: enable the GTID ignore, and enable IGNORE_FUCNTION_CREATOR as well.
IGNORE_GTID_CHECK="1"
IGNORE_FUCNTION_CREATOR="1"
Then import again:
17:11:38 [root@ddcw21 mysqldump_t20240226]#sh testparallel.sh splitByddcw_20240229_170803
********** BEGIN CHECK **************
2024-02-29 17:11:39 CONNCT SUCCESS.
2024-02-29 17:11:39 CHECK_CONN OK
2024-02-29 17:11:39 CHECK_GTID OK
2024-02-29 17:11:39 MYSQL VERSION: 8 0 28
2024-02-29 17:11:39 CHECK_VERSION OK
2024-02-29 17:11:39 disabled_storage_engines= OK
2024-02-29 17:11:39 log_bin_trust_function_creators=1 OK
........................................
(middle of the log omitted)
........................................
#################### IMPORT APP VIEWS #####################
2024-02-29 17:15:16 IMPORT splitByddcw_20240229_170803/views/db1.sql BEGIN...
2024-02-29 17:15:16 IMPORT splitByddcw_20240229_170803/views/db1.sql SUCCESS.
2024-02-29 17:15:16 IMPORT splitByddcw_20240229_170803/views/db1.sql FINISH. cost 0 seconds
2024-02-29 17:15:16 IMPORT splitByddcw_20240229_170803/views/t20240227.sql BEGIN...
2024-02-29 17:15:16 IMPORT splitByddcw_20240229_170803/views/t20240227.sql SUCCESS.
2024-02-29 17:15:16 IMPORT splitByddcw_20240229_170803/views/t20240227.sql FINISH. cost 0 seconds
2024-02-29 17:15:16 IMPORT splitByddcw_20240229_170803/views/t20240227_2.sql BEGIN...
2024-02-29 17:15:16 IMPORT splitByddcw_20240229_170803/views/t20240227_2.sql SUCCESS.
2024-02-29 17:15:16 IMPORT splitByddcw_20240229_170803/views/t20240227_2.sql FINISH. cost 0 seconds
2024-02-29 17:15:16 APP DATABASE COUNT: 24 APP TABLE COUNT: 377 APP DATA IMPORT COST_TIME: 215 SECONDS.
2024-02-29 17:15:16 IMPORT ALL FINISH. TOTAL COST TIME 217 SECONDS. FAILED COUNT: 1
2024-02-29 17:15:16 ERROR COUNT: 1
2024-02-29 17:15:16
splitByddcw_20240229_170803/dbs/gtid.sql IMPORT FAILED
217 seconds; not bad... the main cost is the one huge table (170 seconds by itself)....
There is one ERROR: gtid.sql failed to import. We can ignore it, since we chose to skip GTIDs ourselves. Alternatively, run reset master and then import that gtid.sql; that works too.
Plain import
A direct import errors out on GTIDs, so run reset master first
17:19:53 [root@ddcw21 mysqldump_t20240226]#mysql -h127.0.0.1 -P3314 -p123456 < t20240229_alldb.sql
mysql: [Warning] Using a password on the command line interface can be insecure.
ERROR 3546 (HY000) at line 26: @@GLOBAL.GTID_PURGED cannot be changed: the added gtid set must not overlap with @@GLOBAL.GTID_EXECUTED
After reset master, start the import (with time prepended, otherwise there is no timing to compare):
17:21:22 [root@ddcw21 mysqldump_t20240226]#time mysql -h127.0.0.1 -P3314 -p123456 < t20240229_alldb.sql
mysql: [Warning] Using a password on the command line interface can be insecure.
real 4m39.067s
user 0m11.893s
sys 0m4.000s
17:26:02 [root@ddcw21 mysqldump_t20240226]#
That feels quite a bit slower.
Verification
I forgot to verify data consistency before and after this time.....
It was checksummed in earlier tests, though, and the data matched.
Timing comparison
Splitting took 4 seconds and the import 217 seconds, i.e. 3min37s of import time.

Import type | Time
---|---
Native import | 4min39s
4-way concurrent | 3min37s
8-way concurrent | 3min12s

There is a real gain, but the weakest-link effect remains.
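The weakest-link effect can be put into numbers: the parallel wall time can never drop below the time of the largest single file, nor below the total work divided by the concurrency. A quick sketch using the rough figures from this test (279 s serial import, 170 s for the biggest table):

```shell
# Rough lower bound on the parallel wall time:
# max(largest file, total_serial / workers), in whole seconds.
lower_bound() { # usage: lower_bound TOTAL_SERIAL LARGEST WORKERS
    local per_worker=$(( $1 / $3 ))      # ideal even split across workers
    [ "${per_worker}" -gt "$2" ] && echo "${per_worker}" || echo "$2"
}
lower_bound 279 170 4    # -> 170
lower_bound 279 170 8    # -> 170, still pinned by the big table
```

This is why 8-way concurrency barely beats 4-way here: past 2 workers, the big table alone sets the floor.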
Summary
- Concurrent import does speed MySQL up, but with a weakest-link effect: if one table makes up most of the dump, the advantage of concurrency fades.
- The files produced by mysqldump on MySQL 5.7 and 8.0 differ: for the statistics tables, 5.7 emits DROP and CREATE, while 8.0 emits only INSERT IGNORE INTO.
- The GTID statement also sits in a different place in 5.7 versus 8.0 dumps.
- 4-way concurrency is enough; more than that adds little (I/O becomes the bottleneck).
Appendix: source code
GitHub: https://github.com/ddcw/ddcw/tree/master/python/MySQL并发导入
modb.pro: https://www.modb.pro/doc/125805

mysqldump split script
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# write by ddcw @https://github.com/ddcw
# split a .sql file produced by mysqldump (filtering during import is not supported yet)
# this script does not restore session variables, so importing via `source` is not recommended
import sys,os
import re
import datetime,time
import errno
import argparse,glob
# parse command-line arguments
def _argparse():
    parser = argparse.ArgumentParser(add_help=True, description='Split a .sql file produced by mysqldump.')
    parser.add_argument('--version', '-v', '-V', action='store_true', dest="VERSION", default=False, help='version info')
    parser.add_argument('--database', dest="DATABASE", default="*", help='only this database')
    parser.add_argument('--table', dest="TABLE", default="*", help='only this table')
    parser.add_argument('--output-dir', dest="OUTPUT_DIR", default="", help='output directory')
    #parser.add_argument('--parallel', dest="PARALLEL", default=4, help='import concurrency')
    parser.add_argument('--presql', dest="PRESQL", default="", help='prepended to every .sql file, e.g. set sql_log_bin=off; set names utf8mb4')
    #parser.add_argument('--postsql', dest="POSTSQL", help='appended to every .sql file, e.g. set sql_log_bin=on;')
    #parser.add_argument('--mysqlbin', dest="MYSQLBIN", default="mysql -c ", help='mysql command used for importing')
    parser.add_argument('--file', dest="FILENAME", default="", help='the mysqldump .sql file to split')
    parser.add_argument('--log-file', dest="LOG_FILENAME", default="", help='log file')
    parser.add_argument("files", nargs="*", help="the mysqldump .sql file(s) to split")
    if parser.parse_args().VERSION:
        print('VERSION: v0.1')
        sys.exit(0)
    return parser.parse_args()
# check whether a table matches the filter
def match_table(dbname,tablename):
    dtname = str(dbname) + "." + tablename
    return True if re.search(PATTERN,dtname) else False

def log(*args):
    msg = str(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")) + " " + " ".join([ str(x) for x in args ]) + "\n"
    print(msg[:-1])
    LOG_FD.write(msg)
def read_header(f):
    client_version = "unknown"
    server_version = "unknown"
    gtid = "" # for 5.7 compatibility
    master_info = ""
    header = ""
    while True:
        old_offset = f.tell()
        data = f.readline()
        if data[:17] == "-- Server version":
            server_version = data.split()[-1:][0]
        elif data[:14] == "-- MySQL dump ":
            client_version = data.split()[5]
        elif data[:21] == "-- GTID state at the ":
            tdata = ""
            f.seek(old_offset,0)
            while True:
                _data = f.readline()
                tdata += _data
                if _data[-2:] == ";\n":
                    break
            gtid = tdata
        elif data[:21] == "-- Position to start ":
            tdata = ""
            f.seek(old_offset,0)
            while True:
                _data = f.readline()
                tdata += _data
                if _data[-2:] == ";\n":
                    break
            master_info = tdata
        elif data[:3] == "/*!" or data[:4] == "SET ":
            header += data
        elif data[:20] == "-- Current Database:":
            f.seek(old_offset,0)
            break
    return header,client_version,server_version,gtid,master_info
def read_view(f,header,db):
    view_name = "views" + "/" + str(db) + ".sql"
    with open(view_name, 'w') as fd:
        fd.write(header)
        fd.write("USE `" + db + "`;\n")
        while True:
            old_offset = f.tell()
            data = f.readline()
            if data[:3] == "-- " and data[:4] != "-- F":
                f.seek(old_offset,0)
                break
            fd.write(data)

def read_routine(f,header,db):
    routine_filename = "routines" + "/" + str(db) + ".sql"
    with open(routine_filename, 'w') as fd:
        fd.write(header)
        data = f.readline()
        data = f.readline()
        fd.write("USE `" + db + "`;\n")
        fd.write(data)
        while True:
            old_offset = f.tell()
            data = f.readline()
            if data[:3] == "--\n":
                #f.seek(old_offset,0)
                break
            fd.write(data)

def read_event(f,header,db):
    event_filename = "events/" + db + ".sql"
    with open(event_filename, 'w') as fd:
        fd.write(header)
        data = f.readline()
        data = f.readline()
        fd.write("USE `" + db + "`;\n")
        fd.write(data)
        while True:
            data = f.readline()
            fd.write(data)
            if data == "DELIMITER ;\n" or data[:3] == "--\n":
                break
        #data = f.readline()
class NULLPASS(object):
    def __init__(self,*args):
        pass
    def write(self,*args):
        pass

def read_table(f,header,db):
    data = f.readline()
    table_name = re.compile("`(.+)`").findall(data)[0]
    data = "--\n" + f.readline()
    filename = "dbs/" + db + "/" + table_name + ".sql"
    with open(filename,'w') as fd:
        if not match_table(db,table_name):
            #log(db + "." + table_name + " NOT MATCH " + PATTERN + " SKIP IT!")
            fd = NULLPASS()
        fd.write(header)
        fd.write(data)
        fd.write("USE `" + db + "`;")
        # read the table structure
        while True:
            data = f.readline()
            fd.write(data)
            if data == "--\n":
                break
        # read the data
        old_offset = f.tell()
        data = f.readline()
        if data[:26] == "-- Dumping data for table ":
            data += f.readline() + f.readline()
            fd.write(data)
            while True:
                data = f.readline()
                fd.write(data)
                if data == "--\n": # there may be triggers, so we cannot stop at UNLOCK TABLES;
                    break
        else:
            f.seek(old_offset,0)
    return True
# create a directory, ignoring "already exists"
def mkdir_exists(dirname):
    try:
        os.makedirs(dirname)
    except OSError as e:
        if e.errno != errno.EEXIST:
            raise
if __name__ == '__main__':
    START_TIME = time.time()
    parser = _argparse()
    FILTER_DATABASE = parser.DATABASE
    FILTER_TABLE = parser.TABLE
    PATTERN = str(FILTER_DATABASE).replace("*",".*") + "." + str(FILTER_TABLE).replace("*",".*")
    filelist = []
    for pattern in parser.files:
        filelist += glob.glob(pattern)
    fileset = filelist
    FILENAME = parser.FILENAME if parser.FILENAME != "" and os.path.exists(parser.FILENAME) else ""
    FILENAME = fileset[0] if len(fileset) >= 1 and FILENAME == "" and os.path.exists(fileset[0]) else FILENAME
    if FILENAME == "":
        print('At least one mysqldump .sql file is required')
        sys.exit(1)
    FILENAME = os.path.abspath(FILENAME) # absolute path, since we chdir to the output dir later
    LOG_FILENAME = "SplitMysqlDumpSQL.log" if parser.LOG_FILENAME == "" else parser.LOG_FILENAME
    LOG_FILENAME = os.path.abspath(LOG_FILENAME)
    OUTPUT_DIR = parser.OUTPUT_DIR if parser.OUTPUT_DIR != "" else "splitByddcw_" + str(datetime.datetime.now().strftime("%Y%m%d_%H%M%S"))
    OUTPUT_DIR = os.path.abspath(OUTPUT_DIR)
    #print("READ FILENAME: " + FILENAME + " OUTPUT_DIR: " + OUTPUT_DIR)
    mkdir_exists(OUTPUT_DIR)
    # no free-space check (lazy); check it yourself (about 1.1x the .sql file size is needed)
    # create the sub-directories
    os.chdir(OUTPUT_DIR) # chdir so we do not have to join paths everywhere
    mkdir_exists("dbs") # schemas and tables
    mkdir_exists("events") # events
    #mkdir_exists("triggers") # triggers
    mkdir_exists("routines") # procedures and functions
    mkdir_exists("views") # standalone views (Final view structure), not the "Temporary table structure for view" stubs
    f = open(FILENAME,'r')
    LOG_FD = open(LOG_FILENAME,'a')
    CREATE_DB_FD = open('dbs/create.sql','w') # CREATE DATABASE statements
    FILE_HEADER,client_version,server_version,gtid_info,master_info = read_header(f)
    CREATE_DB_FD.write(FILE_HEADER)
    CREATE_DB_FD.write("\n")
    FILE_HEADER = "-- AUTO SPLIT MYSQLDUMP FILE BY DDCW @https://github.com/ddcw\n" + FILE_HEADER + "\n" + parser.PRESQL + "\n"
    _msg = "CLIENT_VERSION: " + client_version + " SERVER_VERSION: " + server_version + "FILE_HEADER:\n" + FILE_HEADER
    log(_msg)
    # write the CHANGE MASTER info
    if master_info != "":
        _master_file = "dbs/master_info.txt"
        with open(_master_file, 'w') as _mf:
            _mf.write(master_info)
    # walk the schemas and tables
    CURRENT_DB = ""
    TABLE_COUNT = 0
    WARNING_COUNT = 0
    while True:
        old_offset = f.tell()
        data = f.readline()
        if data == "": # EOF
            break
        elif data[:20] == "-- Current Database:":
            CURRENT_DB = re.compile("`(.+)`").findall(data)[0]
            _dirname = "dbs/" + str(CURRENT_DB)
            mkdir_exists(_dirname)
        elif data[:16] == "CREATE DATABASE ":
            CREATE_DB_FD.write(data)
        # CREATE TABLE, INSERT, CREATE TRIGGER, CREATE VIEW
        elif data[:28] == "-- Table structure for table" or data[:36] == "-- Temporary view structure for view" or data[:37] == "-- Temporary table structure for view":
            TABLE_COUNT += 1
            f.seek(old_offset,0)
            table_name = re.compile("`(.+)`").findall(data)[0]
            log("READ TABLE FOR " + CURRENT_DB + "." + table_name,'BEGIN')
            _st = time.time()
            read_table(f,FILE_HEADER,CURRENT_DB)
            _et = time.time()
            log("READ TABLE FOR " + CURRENT_DB + "." + table_name,"FINISH.","COST TIME: " + str(round((_et-_st),2)) + " seconds")
        # READ EVENT
        elif data[:31] == "-- Dumping events for database ":
            CURRENT_DB = re.compile("'(.+)'").findall(data)[0]
            f.seek(old_offset,0)
            log("READ EVENT FOR DATABASE " + CURRENT_DB + " BEGIN")
            _st = time.time()
            read_event(f,FILE_HEADER,CURRENT_DB)
            _et = time.time()
            log("READ EVENT FOR DATABASE " + CURRENT_DB + " FINISH","COST TIME: " + str(round((_et-_st),2)) + " seconds")
        # READ ROUTINE
        elif data[:33] == "-- Dumping routines for database ":
            CURRENT_DB = re.compile("'(.+)'").findall(data)[0]
            f.seek(old_offset,0)
            log("READ ROUTINE FOR DATABASE " + CURRENT_DB + " BEGIN")
            _st = time.time()
            read_routine(f,FILE_HEADER,CURRENT_DB)
            _et = time.time()
            log("READ ROUTINE FOR DATABASE " + CURRENT_DB + " FINISH","COST TIME: " + str(round((_et-_st),2)) + " seconds")
        # READ VIEW
        elif data[:33] == "-- Final view structure for view ":
            f.seek(old_offset,0)
            log("READ VIEW FOR DATABASE " + CURRENT_DB + " BEGIN")
            _st = time.time()
            read_view(f,FILE_HEADER,CURRENT_DB)
            _et = time.time()
            log("READ VIEW FOR DATABASE " + CURRENT_DB + " FINISH","COST TIME: " + str(round((_et-_st),2)) + " seconds")
        elif data[:21] == "-- GTID state at the ":
            tdata = ""
            f.seek(old_offset,0)
            log("READ GTID PURGED")
            while True:
                _data = f.readline()
                tdata += _data
                if _data[-2:] == ";\n":
                    break
            gtid_info = tdata
        elif data[:26] == "-- Dumping data for table ": # system schema, mostly statistics tables
            # ONLY FOR MYSQL 8.x
            _filename = "dbs/special.sql"
            with open(_filename,'a') as fd:
                fd.write(FILE_HEADER)
                fd.write("\n")
                fd.write(data)
                fd.write("USE `" + CURRENT_DB + "`;\n")
                while True:
                    _old_offset = f.tell()
                    data = f.readline()
                    if data[:3] == "-- ":
                        f.seek(_old_offset,0)
                        break
                    fd.write(data)
        elif data[:20] == "-- Dump completed on":
            log("READ ALL FINISH")
            break
        elif data[:4] == "USE " or data == "\n" or data == "--\n":
            pass
        elif data[:3] == "/*!" or data[:4] == "SET ": # skip the trailing comments and SET statements
            pass
        else:
            WARNING_COUNT += 1
            log("========== SKIP ==========\n" + data)
    # write the GTID info last, since 5.7 puts it at the end of the dump
    if gtid_info != "":
        _gtid_file = "dbs/gtid.sql"
        with open(_gtid_file, 'w') as _mf:
            _mf.write(FILE_HEADER)
            _mf.write("\n")
            _mf.write(gtid_info)
    STOP_TIME = time.time()
    log("FILENAME :",FILENAME)
    log("OUTPUT_DIR :",OUTPUT_DIR)
    log("LOG_FILENAME :",LOG_FILENAME)
    log("COST TIME :",str(round((STOP_TIME - START_TIME),2)), "SECONDS. TABLES COUNT:",TABLE_COUNT)
    log("WARNING :",WARNING_COUNT)
    f.close()
    LOG_FD.close()
    CREATE_DB_FD.close()
Concurrent import script
#!/usr/bin/env bash
#write by ddcw @https://github.com/ddcw
# tunable settings
CONCURRENCY=4 # number of concurrent imports
SLEEP_INTERNAL="0.01" # poll every SLEEP_INTERNAL seconds for finished import processes
IGNORE_GTID_CHECK="1" # 1 = do not check whether GTIDs already exist
IGNORE_FUCNTION_CREATOR="1" # 1 = do not check that log_bin_trust_function_creators is 1
IGNORE_DISABLE_ENGINE="0" # 1 = do not check whether disabled_storage_engines contains MyISAM
LOGFILE="import.log" # import log; same content as the console output
DIRNAME=$1 # directory containing the split mysqldump SQL files
# MySQL connection info
MYSQL_COM="mysql -h127.0.0.1 -p123456 -P3314 -uroot "
touch ${LOGFILE}
# internal variables, do not edit
MYSQL_VERSION=() # MySQL server version
export LANG="en_US.UTF-8" # set LANG
POSTMSG="" # messages saved up here to print at the end
APP_TABLE_TIME_COUNT="0" # time spent importing application tables
DB_COUNT=0 # imported database count, excluding system schemas
FILES_COUNT=0 # imported file count, application tables only
FAIL_COUNT_1=`grep ' FAILED$' ${LOGFILE} | wc -l` # failures already present in an older log
ERROR_COUNT=0 # error count
exit1(){
    echo "`date "+%Y-%m-%d %H:%M:%S"` ${@}"
    exit 1
}
[ "${DIRNAME}" == "" ] && exit1 "sh $0 splitByddcw_XXXXXX"
[ -d ${DIRNAME} ] || exit1 "${DIRNAME} does not exist"
DIRNAME=${DIRNAME%/} # strip a trailing / so path joining works
log(){
    _msg="$(date '+%Y-%m-%d %H:%M:%S') $@"
    echo -e ${_msg} # | tee -a ${LOGFILE}
    echo -e ${_msg} >> ${LOGFILE}
}
import_sql(){
    file=$1
    ts=`date +%s`
    log "IMPORT ${file} BEGIN..."
    #${MYSQL_COM} < $file >>${LOGFILE} 2>&1 && log "IMPORT ${file} SUCCESS." || log "IMPORT ${file} FAILED"
    if ${MYSQL_COM} < $file >>${LOGFILE} 2>&1;then
        log "IMPORT ${file} SUCCESS."
    else
        log "IMPORT ${file} FAILED"
        ((ERROR_COUNT++))
        return 1
    fi
    te=`date +%s`
    log "IMPORT ${file} FINISH. cost $[ ${te} - ${ts} ] seconds"
    return 0
}
CHECK_CONN(){
    if ${MYSQL_COM} -e "select 1+1" >>${LOGFILE} 2>&1; then
        log "CONNCT SUCCESS."
    else
        exit1 "CONNCT FAILD. Please read log (${LOGFILE}) FAILED!"
    fi
    log "CHECK_CONN OK"
}
CHECK_GTID(){
    current_gtid=`${MYSQL_COM} -NB -e "select @@GLOBAL.GTID_EXECUTED" 2>>${LOGFILE}`
    if [ "${current_gtid}" != "" ] && [ "${IGNORE_GTID_CHECK}" != "1" ];then
        exit1 "CURRENT GTID: ${current_gtid} FAILED!"
    fi
    log "CHECK_GTID OK"
}
CHECK_VERSION(){
    IFS='.' read -r -a MYSQL_VERSION <<< `${MYSQL_COM} -NB -e "select @@version" 2>>${LOGFILE}`
    if [ "${MYSQL_VERSION[0]}" == "5" ] || [ "${MYSQL_VERSION[0]}" == "8" ] ;then
        log "MYSQL VERSION: ${MYSQL_VERSION[@]}"
    else
        exit1 "ONLY FOR MYSQL 5/8, CURRENT MYSQL VERSION:${MYSQL_VERSION[@]} FAILED!"
    fi
    log "CHECK_VERSION OK"
}
CHECK_VARIABELS(){
    disabled_storage_engines=`${MYSQL_COM} -NB -e "select @@disabled_storage_engines" 2>>${LOGFILE}`
    if echo "${disabled_storage_engines}" | grep -i "MyISAM" >/dev/null;then
        if [ "${IGNORE_DISABLE_ENGINE}" != "1" ];then
            exit1 "disabled_storage_engines=${disabled_storage_engines} FAILED"
        fi
    else
        log "disabled_storage_engines=${disabled_storage_engines} OK"
    fi
    log_bin_trust_function_creators=`${MYSQL_COM} -NB -e "select @@log_bin_trust_function_creators" 2>>${LOGFILE}`
    if [ "${log_bin_trust_function_creators}" == "0" ] && [ "${IGNORE_FUCNTION_CREATOR}" != "1" ];then
        exit1 "log_bin_trust_function_creators=${log_bin_trust_function_creators} FAILED"
    else
        log "log_bin_trust_function_creators=${log_bin_trust_function_creators} OK"
    fi
    log "CHECK_VARIABELS OK"
}
IMPORT_CHANGE_MASTER(){
    echo ""
}
IMPORT_GTID(){
    if [ -f ${DIRNAME}/dbs/gtid.sql ];then
        log "\n\n#################### IMPORT GTID #####################"
    else
        log "SKIP IMPORT GTID"
        return 1
    fi
    import_sql ${DIRNAME}/dbs/gtid.sql || POSTMSG="${POSTMSG}\n ${DIRNAME}/dbs/gtid.sql IMPORT FAILED"
}
IMPORT_DATABASE_DDL(){
    if [ -f ${DIRNAME}/dbs/create.sql ];then
        log "\n\n#################### IMPORT CREATE DATABASE DDL #####################"
    else
        log "SKIP IMPORT DATABASE DDL"
        return 1
    fi
    import_sql ${DIRNAME}/dbs/create.sql || POSTMSG="${POSTMSG}\n ${DIRNAME}/dbs/create.sql IMPORT FAILED"
}
IMPORT_MYSQL_DATABASE(){
    if [ -d ${DIRNAME}/dbs/mysql ];then
        log "\n\n#################### IMPORT MYSQL DB #####################"
    else
        log "SKIP IMPORT mysql DATABASE"
        return 1
    fi
    for filename in ${DIRNAME}/dbs/mysql/*.sql; do
        import_sql ${filename} || POSTMSG="${POSTMSG}\n ${filename} IMPORT FAILED"
    done
}
IMPORT_MYSQL_STATICS(){
    if [ -f ${DIRNAME}/dbs/special.sql ];then
        log "\n\n#################### IMPORT MYSQL DB STATICS (ONLY FOR mysql 8.x) #####################"
    else
        log "SKIP IMPORT mysql statics. MYSQL_VERSION: ${MYSQL_VERSION[@]}"
        return 1
    fi
    import_sql ${DIRNAME}/dbs/special.sql || POSTMSG="${POSTMSG}\n ${DIRNAME}/dbs/special.sql IMPORT FAILED"
}
# import the application tables concurrently
IMPORT_APP_TABLE(){
    log "\n\n#################### IMPORT APP TABLE&DATA #####################"
    T_START=`date +%s`
    PIDS=()
    for dirname in `find ${DIRNAME}/dbs -type d`;do
        postname=`echo ${dirname} | awk -F '/' '{print $NF}'`
        if [ "${postname}" == "mysql" ] || [ "${postname}" == "" ] || [ "${DIRNAME}/dbs" == "${dirname}" ];then
            continue # skip the mysql schema
        fi
        log "IMPORT DATABASE FOR ${postname} BEGIN"
        #PIDS=()
        db_start_time=`date +%s`
        for filename in `find ${dirname} -name '*.sql'`;do
            while [ ${#PIDS[@]} -ge ${CONCURRENCY} ];do
                sleep ${SLEEP_INTERNAL}
                for i in "${!PIDS[@]}";do
                    if ! kill -0 "${PIDS[$i]}" 2>/dev/null;then
                        unset 'PIDS[i]' # this import process has finished; drop it from the array
                    fi
                done
                PIDS=("${PIDS[@]}") # re-pack PIDS
            done
            import_sql "${filename}" & # run the import in the background, i.e. concurrently
            PIDS+=($!)
            ((FILES_COUNT++))
        done
        #wait # wait for all tables of this database to finish; otherwise the process count could exceed the limit. Removing this improves performance somewhat
        db_stop_time=`date +%s`
        ((DB_COUNT++))
        log "IMPORT DATABASE FOR ${postname} FINISH. COST_TIME: $[ ${db_stop_time} - ${db_start_time} ] SECONDS."
    done
    wait # wait for all background processes to finish
    T_STOP=`date +%s`
    APP_TABLE_TIME_COUNT="$[ ${T_STOP} - ${T_START} ]"
}
IMPORT_APP_EVENT(){
    log "\n\n#################### IMPORT APP EVENTS #####################"
    for filename in `find ${DIRNAME}/events -name '*.sql'`;do
        import_sql ${filename}
    done
}
IMPORT_APP_ROUTINE(){
    log "\n\n#################### IMPORT APP ROUTINES #####################"
    for filename in `find ${DIRNAME}/routines -name '*.sql'`;do
        import_sql ${filename}
    done
}
IMPORT_APP_VIEW(){
    log "\n\n#################### IMPORT APP VIEWS #####################"
    for filename in `find ${DIRNAME}/views -name '*.sql'`;do
        import_sql ${filename}
    done
}
_START_DT=`date +%s`
echo -e "\n\n********** BEGIN CHECK **************"
# check connectivity
CHECK_CONN
# check whether GTIDs already exist
CHECK_GTID
# check the server version
CHECK_VERSION
# check disabled_storage_engines and log_bin_trust_function_creators
CHECK_VARIABELS
echo -e "********** CHECK FINISH **************\n\n"
# data import
echo -e "\n\n********** BEGIN IMPORT DATA **************"
# import the CHANGE MASTER statement; disabled by default, enable it manually
IMPORT_CHANGE_MASTER
# import GTIDs (8.0.x)
if [ "${MYSQL_VERSION[0]}" == "8" ];then
    IMPORT_GTID
fi
# import CREATE DATABASE DDL
IMPORT_DATABASE_DDL
# import the system schema tables
IMPORT_MYSQL_DATABASE
# import the statistics
IMPORT_MYSQL_STATICS
# application tables (concurrently) (may contain triggers)
IMPORT_APP_TABLE
# import events
IMPORT_APP_EVENT
# application procedures and functions
IMPORT_APP_ROUTINE
# application views
IMPORT_APP_VIEW
# import GTIDs (5.7.x)
if [ "${MYSQL_VERSION[0]}" == "5" ];then
    IMPORT_GTID
fi
_STOP_DT=`date +%s`
FAIL_COUNT_2=`grep ' FAILED$' ${LOGFILE} | wc -l`
log "APP DATABASE COUNT: ${DB_COUNT} APP TABLE COUNT: ${FILES_COUNT} APP DATA IMPORT COST_TIME: ${APP_TABLE_TIME_COUNT} SECONDS."
log "IMPORT ALL FINISH. TOTAL COST TIME $[ ${_STOP_DT} - ${_START_DT} ] SECONDS. FAILED COUNT: $[ ${FAIL_COUNT_2} - ${FAIL_COUNT_1} ]"
log "ERROR COUNT: ${ERROR_COUNT}"
if [ "${POSTMSG}" != "" ];then
    log "${POSTMSG}"
fi
# the statistics tables failed to import
if echo "${POSTMSG}" | grep -E "innodb_index_stats.sql|innodb_table_stats.sql" >/dev/null 2>&1 ;then
    log "Statistics tables failed to import, probably a 5.7 --> 8.0 migration. Suggestions:\n\t 1. import the statistics tables manually (after removing the DROP TABLE and CREATE TABLE statements)\n\t 2. collect the statistics manually with ANALYZE TABLE"
fi