1 快照导出、导入的使用场景
1.1 手动导出数据的场景
预设场景 假设系统中有两张大表在不停的写入数据,现在的需求是把两张大表做一个逻辑备份,要求两张表的数据必须一致。
如何定义一致?假设一个事务在两张表中各插入一行,那么导出的两张表中,这两行数据要么都在,要么都不在
简单方案 最简单的方案是直接启动一个事务,设定为RR级别,然后在事务中串行导出这两张表的数据即可。 (或者使用pg_dump不开并行,开并行后面讲)
问题 如果表比较多,且单表比较大时,串行做会拉长逻辑备份时间,且RR快照在导出的全程一直存在,会挡住vacuum造成表膨胀、锁等待等等各种长事务问题,所以降低备份时间是有必要的。
并发方案 多表最容易想到的并发导出方案就是起多个进程,每个进程各导出一张表(或者更细力度的切分)。
但多进程存在问题,多个客户端无法拿到同一个一致性位点,也就是拿到的快照都不一样。每个客户端导出的数据汇总后会不一致。
Postgresql提供的快照导出、导入功能就是为了这个场景而设计的:具体使用中,第一个会话导出快照,后续几个会话导入快照,那么大家就拿到了同一个一致性位点,导出数据就可以保证一致性了!
使用案例
快照导出函数:
pg_export_snapshot () → text _ Saves the transaction’s current snapshot and returns a text string identifying the snapshot. This string must be passed (outside the database) to clients that want to import the snapshot. The snapshot is available for import only until the end of the transaction that exported it. _ A transaction can export more than one snapshot, if needed. Note that doing so is only useful in READ COMMITTED transactions, since in REPEATABLE READ and higher isolation levels, transactions use the same snapshot throughout their lifetime. Once a transaction has exported any snapshots, it cannot be prepared with PREPARE TRANSACTION.
第一步:创建测试表、灌入数据
代码语言:javascript复制drop table t1 cascade;
drop table t2 cascade;
create table t1(
c1 serial primary key,
c2 text default lpad('',500,md5(random()::text)),
c3 timestamp default clock_timestamp()
);
create table t2(like t1 including all);
insert into t1 select generate_series(1,1e5); -- 10w
insert into t2 select generate_series(1,1e5); -- 10w
第二步:【会话一】启动RR事务,导出快快照,并查看t1表数据量
代码语言:javascript复制postgres=*# BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;
postgres=*# SELECT pg_export_snapshot();
pg_export_snapshot
---------------------
00000003-0001512F-1
postgres=*# SELECT COUNT(*) FROM t1;
count
--------
100000
第三步:启动干扰事务,向t1、t2表中插入数据,并提交
代码语言:javascript复制postgres=*# insert into t2 select generate_series(1e5 1,2e5);
INSERT 0 100000
postgres=*# insert into t1 select generate_series(1e5 1,2e5);
INSERT 0 100000
postgres=*# select count(*) from t1;
count
--------
200000
(1 row)
postgres=*# select count(*) from t2;
count
--------
200000
(1 row)
第四步:【会话二】启动RR事务,并导入【会话一】的快照,看到与会话一一致的视图。
代码语言:javascript复制postgres=# BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;
BEGIN
postgres=*# SET TRANSACTION SNAPSHOT '00000003-0001512F-1';
postgres=*# select count(*) from t1;
count
--------
100000
注意:SET TRANSACTION SNAPSHOT
语句必须在事务开始就设置,否则会报错。
BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;
BEGIN
postgres=*# select count(*) from t1;
count
--------
200000
(1 row)
postgres=*# SET TRANSACTION SNAPSHOT '00000003-0001512F-1';
ERROR: SET TRANSACTION SNAPSHOT must be called before any query
1.2 pg_dump并发场景
在使用pg_dump并发导出数据时:-j, --jobs=NUM use this many parallel jobs to dump
$ pg_dump --help
...
...
General options:
-f, --file=FILENAME output file or directory name
-F, --format=c|d|t|p output file format (custom, directory, tar,
plain text (default))
-j, --jobs=NUM use this many parallel jobs to dump
-v, --verbose verbose mode
-V, --version output version information, then exit
-Z, --compress=METHOD[:LEVEL]
compress as specified
--lock-wait-timeout=TIMEOUT fail after waiting TIMEOUT for a table lock
--no-sync do not wait for changes to be written safely to disk
-?, --help show this help, then exit
工具会自动执行pg_export_snapshot函数拿到一致性快照,并发运行时,由leader分发给子进程,多进程配置同一个快照做并发数据导出,效果和上面的用例完全相同。
代码语言:javascript复制get_synchronized_snapshot(...)
char *query = "SELECT pg_catalog.pg_export_snapshot()";
...
setup_connection(...)
...
...
else if (AH->numWorkers > 1)
AH->sync_snapshot_id = get_synchronized_snapshot(AH);
2 快照导出、导入的原理分析
快照导出的动作由ExportSnapshot函数完成,整体逻辑非常简单,需要注意的就是两个全局变量的记录,后面看到时知道是做什么用的:
- RegisteredSnapshots:用来找全局最小xmin。
- exportedSnapshots:快照跟随事务释放时,用来找到这些导出的快照,然后释放。
函数的逻辑可以总结为下面几步:
第一步:记录到全局变量
- 快照拷贝
- 记录到全局小顶堆RegisteredSnapshots中(小顶堆pairingheap,用来快速查找所有快照中xmin的最小值,比较函数xmin_cmp)
- 记录到全局链表exportedSnapshots中(链表,快照跟随事务清理时,会释放这些exportedSnapshots)
char *
ExportSnapshot(Snapshot snapshot)
...
...
snapshot = CopySnapshot(snapshot);
oldcxt = MemoryContextSwitchTo(TopTransactionContext);
esnap = (ExportedSnapshot *) palloc(sizeof(ExportedSnapshot));
esnap->snapfile = pstrdup(path);
esnap->snapshot = snapshot;
exportedSnapshots = lappend(exportedSnapshots, esnap);
MemoryContextSwitchTo(oldcxt);
snapshot->regd_count ;
pairingheap_add(&RegisteredSnapshots, &snapshot->ph_node);
第二步:拼接信息
拼接信息,包括xmin、xmax、xip列表。另外还有子事务信息。
代码语言:javascript复制 initStringInfo(&buf);
appendStringInfo(&buf, "vxid:%d/%un", MyProc->backendId, MyProc->lxid);
appendStringInfo(&buf, "pid:%dn", MyProcPid);
appendStringInfo(&buf, "dbid:%un", MyDatabaseId);
appendStringInfo(&buf, "iso:%dn", XactIsoLevel);
appendStringInfo(&buf, "ro:%dn", XactReadOnly);
appendStringInfo(&buf, "xmin:%un", snapshot->xmin);
appendStringInfo(&buf, "xmax:%un", snapshot->xmax);
addTopXid = (TransactionIdIsValid(topXid) &&
TransactionIdPrecedes(topXid, snapshot->xmax)) ? 1 : 0;
appendStringInfo(&buf, "xcnt:%dn", snapshot->xcnt addTopXid);
for (i = 0; i < snapshot->xcnt; i )
appendStringInfo(&buf, "xip:%un", snapshot->xip[i]);
if (addTopXid)
appendStringInfo(&buf, "xip:%un", topXid);
if (snapshot->suboverflowed ||
snapshot->subxcnt nchildren > GetMaxSnapshotSubxidCount())
appendStringInfoString(&buf, "sof:1n");
else
{
appendStringInfoString(&buf, "sof:0n");
appendStringInfo(&buf, "sxcnt:%dn", snapshot->subxcnt nchildren);
for (i = 0; i < snapshot->subxcnt; i )
appendStringInfo(&buf, "sxp:%un", snapshot->subxip[i]);
for (i = 0; i < nchildren; i )
appendStringInfo(&buf, "sxp:%un", children[i]);
}
appendStringInfo(&buf, "rec:%un", snapshot->takenDuringRecovery);
第三步:写文件
代码语言:javascript复制 snprintf(pathtmp, sizeof(pathtmp), "%s.tmp", path);
if (!(f = AllocateFile(pathtmp, PG_BINARY_W)))
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not create file "%s": %m", pathtmp)));
if (fwrite(buf.data, buf.len, 1, f) != 1)
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not write to file "%s": %m", pathtmp)));
/* no fsync() since file need not survive a system crash */
if (FreeFile(f))
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not write to file "%s": %m", pathtmp)));
if (rename(pathtmp, path) < 0)
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not rename file "%s" to "%s": %m",
pathtmp, path)));
return pstrdup(path strlen(SNAPSHOT_EXPORT_DIR) 1);
}