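Background
This post walks through how a checkpoint is created, along with some of the details.
CreateCheckPoint: starts the checkpoint <-------- covered in this post
|
CheckPointGuts: entry point of the checkpoint work <-------- covered in this post
|
CheckPointBuffers: entry point for flushing buffer pages
|
BufferSync: the function that actually flushes buffer pages
|
CheckpointWriteDelay: deliberate delay (throttling) function
|
IsCheckpointOnSchedule: decision logic used by the delay function
Source
The source is long, so the analysis is written inline between the code.
CreateCheckPoint
This is the entry function for performing a checkpoint.
/*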
* Perform a checkpoint --- either during shutdown, or on-the-fly
*
* flags is a bitwise OR of the following:
* CHECKPOINT_IS_SHUTDOWN: checkpoint is for database shutdown.
* CHECKPOINT_END_OF_RECOVERY: checkpoint is for end of WAL recovery.
* CHECKPOINT_IMMEDIATE: finish the checkpoint ASAP,
* ignoring checkpoint_completion_target parameter.
* CHECKPOINT_FORCE: force a checkpoint even if no XLOG activity has occurred
* since the last one (implied by CHECKPOINT_IS_SHUTDOWN or
* CHECKPOINT_END_OF_RECOVERY).
* CHECKPOINT_FLUSH_ALL: also flush buffers of unlogged tables.
*
* Note: flags contains other bits, of interest here only for logging purposes.
* In particular note that this routine is synchronous and does not pay
* attention to CHECKPOINT_WAIT.
*
* If !shutdown then we are writing an online checkpoint. This is a very special
* kind of operation and WAL record because the checkpoint action occurs over
* a period of time yet logically occurs at just a single LSN. The logical
* position of the WAL record (redo ptr) is the same or earlier than the
* physical position. When we replay WAL we locate the checkpoint via its
* physical position then read the redo ptr and actually start replay at the
* earlier logical position. Note that we don't write *anything* to WAL at
* the logical position, so that location could be any other kind of WAL record.
* All of this mechanism allows us to continue working while we checkpoint.
* As a result, timing of actions is critical here and be careful to note that
* this function will likely take minutes to execute on a busy system.
*/
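- An online (non-shutdown) checkpoint is special: logically it is a single point that maps to one LSN, but in practice it spans a period of time.
- During recovery, the checkpoint record is found at its physical position, the redo pointer (the logical position) is read from it, and replay starts from that earlier logical position.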
--->------logical lsn---------->------------physical lsn---------->--------
--->------|--------------do check point----------------|---------->--------
void
CreateCheckPoint(int flags)
{
bool shutdown;
CheckPoint checkPoint;
XLogRecPtr recptr;
XLogCtlInsert *Insert = &XLogCtl->Insert;
uint32 freespace;
XLogRecPtr PriorRedoPtr;
XLogRecPtr curInsert;
XLogRecPtr last_important_lsn;
VirtualTransactionId *vxids;
int nvxids;
/*
* An end-of-recovery checkpoint is really a shutdown checkpoint, just
* issued at a different time.
*/
if (flags & (CHECKPOINT_IS_SHUTDOWN | CHECKPOINT_END_OF_RECOVERY))
shutdown = true;
else
shutdown = false;
/* sanity check */
if (RecoveryInProgress() && (flags & CHECKPOINT_END_OF_RECOVERY) == 0)
elog(ERROR, "can't create a checkpoint during recovery");
/*
* Initialize InitXLogInsert working areas before entering the critical
* section. Normally, this is done by the first call to
* RecoveryInProgress() or LocalSetXLogInsertAllowed(), but when creating
* an end-of-recovery checkpoint, the LocalSetXLogInsertAllowed call is
* done below in a critical section, and InitXLogInsert cannot be called
* in a critical section.
*/
InitXLogInsert();
/*
* Acquire CheckpointLock to ensure only one checkpoint happens at a time.
* (This is just pro forma, since in the present system structure there is
* only one process that is allowed to issue checkpoints at any given
* time.)
*/
LWLockAcquire(CheckpointLock, LW_EXCLUSIVE);
The checkpoint holds CheckpointLock in LW_EXCLUSIVE mode for its whole duration, so two checkpoints can never run concurrently.
/*
* Prepare to accumulate statistics.
*
* Note: because it is possible for log_checkpoints to change while a
* checkpoint proceeds, we always accumulate stats, even if
* log_checkpoints is currently off.
*/
MemSet(&CheckpointStats, 0, sizeof(CheckpointStats));
CheckpointStats.ckpt_start_t = GetCurrentTimestamp();
/*
* Use a critical section to force system panic if we have trouble.
*/
START_CRIT_SECTION();
if (shutdown)
{
LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
ControlFile->state = DB_SHUTDOWNING;
ControlFile->time = (pg_time_t) time(NULL);
UpdateControlFile();
LWLockRelease(ControlFileLock);
}
A shutdown checkpoint first updates the control file: ControlFile->state is set to DB_SHUTDOWNING and ControlFile->time is refreshed.
/*
* Let smgr prepare for checkpoint; this has to happen before we determine
* the REDO pointer. Note that smgr must not do anything that'd have to
* be undone if we decide no checkpoint is needed.
*/
smgrpreckpt();
/* Begin filling in the checkpoint WAL record */
MemSet(&checkPoint, 0, sizeof(checkPoint));
checkPoint.time = (pg_time_t) time(NULL);
/*
* For Hot Standby, derive the oldestActiveXid before we fix the redo
* pointer. This allows us to begin accumulating changes to assemble our
* starting snapshot of locks and transactions.
*/
if (!shutdown && XLogStandbyInfoActive())
checkPoint.oldestActiveXid = GetOldestActiveTransactionId();
else
checkPoint.oldestActiveXid = InvalidTransactionId;
/*
* Get location of last important record before acquiring insert locks (as
* GetLastImportantRecPtr() also locks WAL locks).
*/
last_important_lsn = GetLastImportantRecPtr();
/*
* We must block concurrent insertions while examining insert state to
* determine the checkpoint REDO pointer.
*/
WALInsertLockAcquireExclusive();
curInsert = XLogBytePosToRecPtr(Insert->CurrBytePos);
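This reads the current insert position, which becomes the basis for the checkpoint's redo position (covered in detail in a separate post).
The exclusive WAL insert locks are taken first so that the position cannot advance while it is being read.
/*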
* If this isn't a shutdown or forced checkpoint, and if there has been no
* WAL activity requiring a checkpoint, skip it. The idea here is to
* avoid inserting duplicate checkpoints when the system is idle.
*/
if ((flags & (CHECKPOINT_IS_SHUTDOWN | CHECKPOINT_END_OF_RECOVERY |
CHECKPOINT_FORCE)) == 0)
{
if (last_important_lsn == ControlFile->checkPoint)
{
WALInsertLockRelease();
LWLockRelease(CheckpointLock);
END_CRIT_SECTION();
ereport(DEBUG1,
(errmsg("checkpoint skipped because system is idle")));
return;
}
}
/*
* An end-of-recovery checkpoint is created before anyone is allowed to
* write WAL. To allow us to write the checkpoint record, temporarily
* enable XLogInsertAllowed. (This also ensures ThisTimeLineID is
* initialized, which we need here and in AdvanceXLInsertBuffer.)
*/
if (flags & CHECKPOINT_END_OF_RECOVERY)
LocalSetXLogInsertAllowed();
checkPoint.ThisTimeLineID = ThisTimeLineID;
if (flags & CHECKPOINT_END_OF_RECOVERY)
checkPoint.PrevTimeLineID = XLogCtl->PrevTimeLineID;
else
checkPoint.PrevTimeLineID = ThisTimeLineID;
checkPoint.fullPageWrites = Insert->fullPageWrites;
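An end-of-recovery checkpoint is the last action of recovery. At that point nobody is allowed to write WAL yet, so LocalSetXLogInsertAllowed() temporarily enables XLOG insertion so that the checkpoint record can be written at its physical position.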
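The idle-skip test in the code above boils down to a bitmask check plus an LSN comparison. A minimal sketch of that decision, assuming illustrative flag values and a hypothetical helper name `demo_can_skip_checkpoint` (neither is taken from the PostgreSQL headers):

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Illustrative flag bits; the numeric values are assumptions for this demo. */
#define DEMO_CKPT_IS_SHUTDOWN      0x0001
#define DEMO_CKPT_END_OF_RECOVERY  0x0002
#define DEMO_CKPT_IMMEDIATE        0x0004
#define DEMO_CKPT_FORCE            0x0008

/*
 * Hypothetical helper: returns true when the checkpoint may be skipped.
 * A checkpoint is skippable only if none of the "must run" flags is set
 * and no important WAL record was written since the last checkpoint record.
 */
bool demo_can_skip_checkpoint(int flags,
                              uint64_t last_important_lsn,
                              uint64_t last_checkpoint_lsn)
{
    int must_run = DEMO_CKPT_IS_SHUTDOWN |
                   DEMO_CKPT_END_OF_RECOVERY |
                   DEMO_CKPT_FORCE;

    assert(flags >= 0);

    /* Shutdown, end-of-recovery and forced checkpoints always run. */
    if ((flags & must_run) != 0)
        return false;

    /* Idle system: the last important record is the previous checkpoint. */
    return last_important_lsn == last_checkpoint_lsn;
}
```

Note that an "immediate" request alone does not prevent skipping in this sketch, which mirrors the flag set tested in the source above.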
/*
* Compute new REDO record ptr = location of next XLOG record.
*
* NB: this is NOT necessarily where the checkpoint record itself will be,
* since other backends may insert more XLOG records while we're off doing
* the buffer flush work. Those XLOG records are logically after the
* checkpoint, even though physically before it. Got that?
*/
freespace = INSERT_FREESPACE(curInsert);
if (freespace == 0)
{
if (curInsert % XLogSegSize == 0)
curInsert += SizeOfXLogLongPHD;
else
curInsert += SizeOfXLogShortPHD;
}
checkPoint.redo = curInsert;
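If the current insert position falls exactly on a page boundary (no free space left on the page), it is advanced past the page header: the long header at the start of a segment, the short header otherwise.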
#define SizeOfXLogLongPHD MAXALIGN(sizeof(XLogLongPageHeaderData))
typedef struct XLogLongPageHeaderData
{
XLogPageHeaderData std; /* standard header fields */
uint64 xlp_sysid; /* system identifier from pg_control */
uint32 xlp_seg_size; /* just as a cross-check */
uint32 xlp_xlog_blcksz; /* just as a cross-check */
} XLogLongPageHeaderData;
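The page-boundary adjustment can be tried out in isolation. The sketch below is only an illustration: the block size, segment size and header sizes are assumed typical values (8 kB pages, 16 MB segments, 24-byte short header, 40-byte long header), and `demo_redo_ptr` is a hypothetical name, not a PostgreSQL function.

```c
#include <assert.h>
#include <stdint.h>

/* Assumed typical sizes; real builds may differ. */
#define DEMO_BLCKSZ     8192u                 /* WAL page size */
#define DEMO_SEG_SIZE   (16u * 1024u * 1024u) /* WAL segment size */
#define DEMO_SHORT_PHD  24u                   /* short page header */
#define DEMO_LONG_PHD   40u                   /* long page header */

/*
 * Hypothetical helper: given the current insert position, return the
 * position of the next record, i.e. the redo pointer of the checkpoint.
 */
uint64_t demo_redo_ptr(uint64_t cur_insert)
{
    uint64_t used_in_page = cur_insert % DEMO_BLCKSZ;
    uint64_t freespace = (used_in_page == 0) ? 0 : DEMO_BLCKSZ - used_in_page;

    assert(DEMO_SEG_SIZE % DEMO_BLCKSZ == 0);

    if (freespace == 0)
    {
        /* At a page boundary the next record starts after the page header. */
        if (cur_insert % DEMO_SEG_SIZE == 0)
            cur_insert += DEMO_LONG_PHD;   /* first page of a segment */
        else
            cur_insert += DEMO_SHORT_PHD;  /* any other page */
    }
    return cur_insert;
}
```

A position in the middle of a page is returned unchanged; only positions that land exactly on a page boundary are moved forward.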
/*
* Here we update the shared RedoRecPtr for future XLogInsert calls; this
* must be done while holding all the insertion locks.
*
* Note: if we fail to complete the checkpoint, RedoRecPtr will be left
* pointing past where it really needs to point. This is okay; the only
* consequence is that XLogInsert might back up whole buffers that it
* didn't really need to. We can't postpone advancing RedoRecPtr because
* XLogInserts that happen while we are dumping buffers must assume that
* their buffer changes are not included in the checkpoint.
*/
RedoRecPtr = XLogCtl->Insert.RedoRecPtr = checkPoint.redo;
/*
* Now we can release the WAL insertion locks, allowing other xacts to
* proceed while we are flushing disk buffers.
*/
WALInsertLockRelease();
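The WAL insert locks are released here. Every XLOG record inserted from now on lies logically after the checkpoint (after the redo pointer), yet may still sit physically before the checkpoint record.
/* Update the info_lck-protected copy of RedoRecPtr as well */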
SpinLockAcquire(&XLogCtl->info_lck);
XLogCtl->RedoRecPtr = checkPoint.redo;
SpinLockRelease(&XLogCtl->info_lck);
/*
* If enabled, log checkpoint start. We postpone this until now so as not
* to log anything if we decided to skip the checkpoint.
*/
if (log_checkpoints)
LogCheckpointStart(flags, false);
TRACE_POSTGRESQL_CHECKPOINT_START(flags);
/*
* Get the other info we need for the checkpoint record.
*
* We don't need to save oldestClogXid in the checkpoint, it only matters
* for the short period in which clog is being truncated, and if we crash
* during that we'll redo the clog truncation and fix up oldestClogXid
* there.
*/
LWLockAcquire(XidGenLock, LW_SHARED);
checkPoint.nextXid = ShmemVariableCache->nextXid;
checkPoint.oldestXid = ShmemVariableCache->oldestXid;
checkPoint.oldestXidDB = ShmemVariableCache->oldestXidDB;
LWLockRelease(XidGenLock);
LWLockAcquire(CommitTsLock, LW_SHARED);
checkPoint.oldestCommitTsXid = ShmemVariableCache->oldestCommitTsXid;
checkPoint.newestCommitTsXid = ShmemVariableCache->newestCommitTsXid;
LWLockRelease(CommitTsLock);
/* Increase XID epoch if we've wrapped around since last checkpoint */
checkPoint.nextXidEpoch = ControlFile->checkPointCopy.nextXidEpoch;
if (checkPoint.nextXid < ControlFile->checkPointCopy.nextXid)
checkPoint.nextXidEpoch++;
LWLockAcquire(OidGenLock, LW_SHARED);
checkPoint.nextOid = ShmemVariableCache->nextOid;
if (!shutdown)
checkPoint.nextOid += ShmemVariableCache->oidCount;
LWLockRelease(OidGenLock);
MultiXactGetCheckptMulti(shutdown,
&checkPoint.nextMulti,
&checkPoint.nextMultiOffset,
&checkPoint.oldestMulti,
&checkPoint.oldestMultiDB);
/*
* Having constructed the checkpoint record, ensure all shmem disk buffers
* and commit-log buffers are flushed to disk.
*
* This I/O could fail for various reasons. If so, we will fail to
* complete the checkpoint, but there is no reason to force a system
* panic. Accordingly, exit critical section while doing it.
*/
END_CRIT_SECTION();
/*
* In some cases there are groups of actions that must all occur on one
* side or the other of a checkpoint record. Before flushing the
* checkpoint record we must explicitly wait for any backend currently
* performing those groups of actions.
*
* One example is end of transaction, so we must wait for any transactions
* that are currently in commit critical sections. If an xact inserted
* its commit record into XLOG just before the REDO point, then a crash
* restart from the REDO point would not replay that record, which means
* that our flushing had better include the xact's update of pg_xact. So
* we wait till he's out of his commit critical section before proceeding.
* See notes in RecordTransactionCommit().
*
* Because we've already released the insertion locks, this test is a bit
* fuzzy: it is possible that we will wait for xacts we didn't really need
* to wait for. But the delay should be short and it seems better to make
* checkpoint take a bit longer than to hold off insertions longer than
* necessary. (In fact, the whole reason we have this issue is that xact.c
* does commit record XLOG insertion and clog update as two separate steps
* protected by different locks, but again that seems best on grounds of
* minimizing lock contention.)
* A transaction that has not yet set delayChkpt when we look cannot be at
* risk, since he's not inserted his commit record yet; and one that's
* already cleared it is not at risk either, since he's done fixing clog
* and we will correctly flush the update below. So we cannot miss any
* xacts we need to wait for.
*/
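Note the key point in the comment above: xact.c performs the commit record's XLOG insertion and the clog update as two separate steps. If a commit record lands just before the REDO point but its clog update has not happened yet, a crash restart from the REDO point will not replay that commit record. The clog update therefore has to be included in this checkpoint's flush, otherwise the transaction would be treated as uncommitted after recovery.
vxids = GetVirtualXIDsDelayingChkpt(&nvxids);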
if (nvxids > 0)
{
do
{
pg_usleep(10000L); /* wait for 10 msec */
} while (HaveVirtualXIDsDelayingChkpt(vxids, nvxids));
}
pfree(vxids);
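The loop polls every 10 ms until every backend captured in vxids has left its commit critical section, so that its clog update is covered by the flush that follows.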
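The important property of this wait is that it works on a snapshot: only backends that were already delaying when the snapshot was taken are waited for, and backends that start delaying later are ignored. A minimal sketch of that idea follows; `DemoBackend`, `demo_collect_delaying` and `demo_still_delaying` are hypothetical stand-ins, not the real procarray structures or functions.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical, simplified view of one backend slot. */
typedef struct DemoBackend
{
    int  vxid;        /* stand-in for a virtual transaction id */
    bool delay_chkpt; /* true while inside a commit critical section */
} DemoBackend;

/* Snapshot the vxids that are delaying right now; returns how many. */
size_t demo_collect_delaying(const DemoBackend *procs, size_t nprocs, int *out)
{
    size_t n = 0;

    assert(procs != NULL && out != NULL);
    for (size_t i = 0; i < nprocs; i++)
        if (procs[i].delay_chkpt)
            out[n++] = procs[i].vxid;
    return n;
}

/* True if any vxid from the snapshot is still delaying. */
bool demo_still_delaying(const DemoBackend *procs, size_t nprocs,
                         const int *vxids, size_t nvxids)
{
    for (size_t i = 0; i < nprocs; i++)
    {
        if (!procs[i].delay_chkpt)
            continue;
        for (size_t j = 0; j < nvxids; j++)
            if (procs[i].vxid == vxids[j])
                return true;
    }
    return false;
}
```

In a real wait loop the second function would be called repeatedly with a short sleep in between, as the source above does with pg_usleep.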
CheckPointGuts(checkPoint.redo, flags);
This starts the core work of the checkpoint: flushing everything to disk.
/*
* Take a snapshot of running transactions and write this to WAL. This
* allows us to reconstruct the state of running transactions during
* archive recovery, if required. Skip, if this info disabled.
*
* If we are shutting down, or Startup process is completing crash
* recovery we don't need to write running xact data.
*/
if (!shutdown && XLogStandbyInfoActive())
LogStandbySnapshot();
START_CRIT_SECTION();
/*
* Now insert the checkpoint record into XLOG.
*/
XLogBeginInsert();
XLogRegisterData((char *) (&checkPoint), sizeof(checkPoint));
recptr = XLogInsert(RM_XLOG_ID,
shutdown ? XLOG_CHECKPOINT_SHUTDOWN :
XLOG_CHECKPOINT_ONLINE);
XLogFlush(recptr);
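Once the flush is done, the CheckPoint struct is inserted into XLOG as the checkpoint record and flushed, which records that the flush has completed.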
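The relationship between the redo pointer chosen earlier and the checkpoint record inserted here can be modelled with a toy WAL array. This is only a sketch under simplifying assumptions: `DemoWalRecord` and `demo_count_replayed` are hypothetical, and LSNs are plain integers.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical, simplified WAL record. */
typedef struct DemoWalRecord
{
    uint64_t lsn;           /* physical position of this record */
    int      is_checkpoint; /* nonzero for a checkpoint record */
    uint64_t redo;          /* redo pointer, valid for checkpoint records */
} DemoWalRecord;

/*
 * Find the checkpoint record at ckpt_lsn (the physical position that the
 * control file would point to), read its redo pointer, and count the
 * records that replay would cover. Returns -1 if there is no checkpoint
 * record at ckpt_lsn.
 */
int demo_count_replayed(const DemoWalRecord *wal, size_t n, uint64_t ckpt_lsn)
{
    assert(wal != NULL);

    for (size_t i = 0; i < n; i++)
    {
        if (wal[i].lsn == ckpt_lsn && wal[i].is_checkpoint)
        {
            int replayed = 0;

            /* Replay starts at the logical position, not the physical one. */
            for (size_t j = 0; j < n; j++)
                if (wal[j].lsn >= wal[i].redo)
                    replayed++;
            return replayed;
        }
    }
    return -1;
}
```

Records written by other backends between the redo pointer and the checkpoint record are physically before the checkpoint record but are still replayed, because replay begins at the redo pointer.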
/*
* We mustn't write any new WAL after a shutdown checkpoint, or it will be
* overwritten at next startup. No-one should even try, this just allows
* sanity-checking. In the case of an end-of-recovery checkpoint, we want
* to just temporarily disable writing until the system has exited
* recovery.
*/
if (shutdown)
{
if (flags & CHECKPOINT_END_OF_RECOVERY)
LocalXLogInsertAllowed = -1; /* return to "check" state */
else
LocalXLogInsertAllowed = 0; /* never again write WAL */
}
/*
* We now have ProcLastRecPtr = start of actual checkpoint record, recptr
* = end of actual checkpoint record.
*/
if (shutdown && checkPoint.redo != ProcLastRecPtr)
ereport(PANIC,
(errmsg("concurrent write-ahead log activity while database system is shutting down")));
/*
* Remember the prior checkpoint's redo pointer, used later to determine
* the point where the log can be truncated.
*/
PriorRedoPtr = ControlFile->checkPointCopy.redo;
/*
* Update the control file.
*/
LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
if (shutdown)
ControlFile->state = DB_SHUTDOWNED;
ControlFile->prevCheckPoint = ControlFile->checkPoint;
ControlFile->checkPoint = ProcLastRecPtr;
ControlFile->checkPointCopy = checkPoint;
ControlFile->time = (pg_time_t) time(NULL);
/* crash recovery should always recover to the end of WAL */
ControlFile->minRecoveryPoint = InvalidXLogRecPtr;
ControlFile->minRecoveryPointTLI = 0;
The control file fields are updated with the new checkpoint.
/*
* Persist unloggedLSN value. It's reset on crash recovery, so this goes
* unused on non-shutdown checkpoints, but seems useful to store it always
* for debugging purposes.
*/
SpinLockAcquire(&XLogCtl->ulsn_lck);
ControlFile->unloggedLSN = XLogCtl->unloggedLSN;
SpinLockRelease(&XLogCtl->ulsn_lck);
UpdateControlFile();
LWLockRelease(ControlFileLock);
/* Update shared-memory copy of checkpoint XID/epoch */
SpinLockAcquire(&XLogCtl->info_lck);
XLogCtl->ckptXidEpoch = checkPoint.nextXidEpoch;
XLogCtl->ckptXid = checkPoint.nextXid;
SpinLockRelease(&XLogCtl->info_lck);
/*
* We are now done with critical updates; no need for system panic if we
* have trouble while fooling with old log segments.
*/
END_CRIT_SECTION();
/*
* Let smgr do post-checkpoint cleanup (eg, deleting old files).
*/
smgrpostckpt();
/*
* Delete old log files (those no longer needed even for previous
* checkpoint or the standbys in XLOG streaming).
*/
if (PriorRedoPtr != InvalidXLogRecPtr)
{
XLogSegNo _logSegNo;
/* Update the average distance between checkpoints. */
UpdateCheckPointDistanceEstimate(RedoRecPtr - PriorRedoPtr);
XLByteToSeg(PriorRedoPtr, _logSegNo);
KeepLogSeg(recptr, &_logSegNo);
_logSegNo--;
RemoveOldXlogFiles(_logSegNo, PriorRedoPtr, recptr);
}
/*
* Make more log segments if needed. (Do this after recycling old log
* segments, since that may supply some of the needed files.)
*/
if (!shutdown)
PreallocXlogFiles(recptr);
/*
* Truncate pg_subtrans if possible. We can throw away all data before
* the oldest XMIN of any running transaction. No future transaction will
* attempt to reference any pg_subtrans entry older than that (see Asserts
* in subtrans.c). During recovery, though, we mustn't do this because
* StartupSUBTRANS hasn't been called yet.
*/
if (!RecoveryInProgress())
TruncateSUBTRANS(GetOldestXmin(NULL, PROCARRAY_FLAGS_DEFAULT));
/* Real work is done, but log and update stats before releasing lock. */
LogCheckpointEnd(false);
TRACE_POSTGRESQL_CHECKPOINT_DONE(CheckpointStats.ckpt_bufs_written,
NBuffers,
CheckpointStats.ckpt_segs_added,
CheckpointStats.ckpt_segs_removed,
CheckpointStats.ckpt_segs_recycled);
LWLockRelease(CheckpointLock);
}
CheckPointGuts
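CheckPointGuts is the heavyweight function that flushes everything a checkpoint must persist. Of its steps, the buffer flush in CheckPointBuffers is throttled according to the checkpoint_completion_target parameter.
/*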
* Flush all data in shared memory to disk, and fsync
*
* This is the common code shared between regular checkpoints and
* recovery restartpoints.
*/
static void
CheckPointGuts(XLogRecPtr checkPointRedo, int flags)
{
CheckPointCLOG();
CheckPointCommitTs();
CheckPointSUBTRANS();
CheckPointMultiXact();
CheckPointPredicate();
CheckPointRelationMap();
CheckPointReplicationSlots();
CheckPointSnapBuild();
CheckPointLogicalRewriteHeap();
CheckPointBuffers(flags); /* performs all required fsyncs */
CheckPointReplicationOrigin();
/* We deliberately delay 2PC checkpointing as long as possible */
CheckPointTwoPhase(checkPointRedo);
}
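The throttling of CheckPointBuffers by checkpoint_completion_target can be pictured as a schedule check: the fraction of buffers written so far, scaled by the target, is compared with the fraction of the checkpoint interval that has already elapsed. The sketch below is a simplified, time-only illustration; `demo_on_schedule` and its parameters are hypothetical, and the actual IsCheckpointOnSchedule also takes the amount of WAL generated into account.

```c
#include <assert.h>
#include <stdbool.h>

/*
 * Hypothetical helper.
 *   progress          fraction of buffers already written, 0.0 .. 1.0
 *   completion_target stand-in for checkpoint_completion_target
 *   elapsed_secs      time since the checkpoint started
 *   timeout_secs      stand-in for checkpoint_timeout
 * Returns true when the checkpoint is on (or ahead of) schedule, meaning
 * the writer may sleep; false means it is behind and should not delay.
 */
bool demo_on_schedule(double progress, double completion_target,
                      double elapsed_secs, double timeout_secs)
{
    double scaled_progress;
    double elapsed_fraction;

    assert(timeout_secs > 0.0);

    scaled_progress = progress * completion_target;
    elapsed_fraction = elapsed_secs / timeout_secs;

    return scaled_progress >= elapsed_fraction;
}
```

With a target of 0.5, a checkpoint that has written half of its buffers after one fifth of the interval is ahead of schedule and can afford to pause between writes.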