(继续PostgreSQL技术内幕学习)
- 前面几节已经把XLOG所需的数据注册到内存中了,下面开始组装XLOG。
- XLogRecordAssemble完成日志组装,处理页面数据Block部分,把registered_buffers中的数据进行二次加工。
/*
* Assemble a WAL record from the registered data and buffers into an
* XLogRecData chain, ready for insertion with XLogInsertRecord().
*
* The record header fields are filled in, except for the xl_prev field. The
* calculated CRC does not include the record header yet.
*
* If there are any registered buffers, and a full-page image was not taken
* of all of them, *fpw_lsn is set to the lowest LSN among such pages. This
* signals that the assembled record is only good for insertion on the
* assumption that the RedoRecPtr and doPageWrites values were up-to-date.
*/
static XLogRecData *
XLogRecordAssemble(RmgrId rmid, uint8 info,
XLogRecPtr RedoRecPtr, bool doPageWrites,
XLogRecPtr *fpw_lsn)
{
...
【组装链表尾部】
XLogRecData *rdt_datas_last;
【组装链表头部】
XLogRecord *rechdr;
【组装链表头部,当前写到的位置】
char *scratch = hdr_scratch;
【当前写到的位置,首先让出一个XLogRecord的空间】
rechdr = (XLogRecord *) scratch;
scratch += SizeOfXLogRecord;
hdr_rdt.next = NULL;
rdt_datas_last = &hdr_rdt;
hdr_rdt.data = hdr_scratch;
【REDO之后是不是需要检查页面一致性】
if (wal_consistency_checking[rmid])
info |= XLR_CHECK_CONSISTENCY;
【开始处理】
【max_registered_block_id里面注册的所有block】
*fpw_lsn = InvalidXLogRecPtr;
for (block_id = 0; block_id < max_registered_block_id; block_id++)
{
registered_buffer *regbuf = &registered_buffers[block_id];
【full page write标记】
bool needs_backup;
bool needs_data;
【通用BLOCK头】
XLogRecordBlockHeader bkpb;
【如果做FPW,需要这个头】
XLogRecordBlockImageHeader bimg;
【如果做FPW,且需要压缩,需要这个头】
XLogRecordBlockCompressHeader cbimg = {0};
【日志记录的前一个页面和这条是不是一个表的】
bool samerel;
【页面是否已经压缩】
bool is_compressed = false;
bool include_image;
【确定这是一个已经使用中的registered_buffer】
if (!regbuf->in_use)
continue;
【决定是否需要FPW】
【先看标志位REGBUF_FORCE_IMAGE、REGBUF_NO_IMAGE】
【再看参数配置和backup状态】
【最后看LSN】
if (regbuf->flags & REGBUF_FORCE_IMAGE)
needs_backup = true;
else if (regbuf->flags & REGBUF_NO_IMAGE)
needs_backup = false;
else if (!doPageWrites)
needs_backup = false;
else
...
/* Determine if the buffer data needs to be included */
【是否保存页面数据】
if (regbuf->rdata_len == 0)
needs_data = false;
else if ((regbuf->flags & REGBUF_KEEP_DATA) != 0)
needs_data = true;
else
needs_data = !needs_backup;
【开始组装XLogRecordBlockHeader】
bkpb.id = block_id;
bkpb.fork_flags = regbuf->forkno;
bkpb.data_length = 0;
if ((regbuf->flags & REGBUF_WILL_INIT) == REGBUF_WILL_INIT)
bkpb.fork_flags |= BKPBLOCK_WILL_INIT;
【如果需要FPW,需要页面备份】
【如果REDO的时候需要一致性检查,需要页面备份】
include_image = needs_backup || (info & XLR_CHECK_CONSISTENCY) != 0;
if (include_image)
{
Page page = regbuf->page;
uint16 compressed_len;
/*
* The page needs to be backed up, so calculate its hole length
* and offset.
*/
if (regbuf->flags & REGBUF_STANDARD)
{
/* Assume we can omit data between pd_lower and pd_upper */
uint16 lower = ((PageHeader) page)->pd_lower;
uint16 upper = ((PageHeader) page)->pd_upper;
【lower指向页面上面的指针,正生长,upper指向页面下面的数据,逆生长】
【lower-----upper中间会有一段空洞】
if (lower >= SizeOfPageHeaderData &&
upper > lower &&
upper <= BLCKSZ)
{
【确实有空洞(没用满),记录空洞起始偏移和长度】
bimg.hole_offset = lower;
cbimg.hole_length = upper - lower;
}
else
{
...
}
}
else
{
/* Not a standard page header, don't try to eliminate "hole" */
bimg.hole_offset = 0;
cbimg.hole_length = 0;
}
【按需压缩页面】
if (wal_compression)
{
is_compressed =
XLogCompressBackupBlock(page, bimg.hole_offset,
cbimg.hole_length,
regbuf->compressed_page,
&compressed_len);
}
/*
* Fill in the remaining fields in the XLogRecordBlockHeader
* struct
*/
bkpb.fork_flags |= BKPBLOCK_HAS_IMAGE;
/*
* Construct XLogRecData entries for the page content.
*/
【这里很有意思regbuf留了两个XlogRecData的位置给ASSEMBLY函数专用】
【所以rdt_datas_last指的第一个是hdr_rdt】
【rdt_datas_last指的后两个一定是bkp_rdatas[0]和[1]】【可能用不到见下面】
【一共会有三个XlogRecData】
rdt_datas_last->next = &regbuf->bkp_rdatas[0];
rdt_datas_last = rdt_datas_last->next;
bimg.bimg_info = (cbimg.hole_length == 0) ? 0 : BKPIMAGE_HAS_HOLE;
/*
* If WAL consistency checking is enabled for the resource manager
* of this WAL record, a full-page image is included in the record
* for the block modified. During redo, the full-page is replayed
* only if BKPIMAGE_APPLY is set.
*/
if (needs_backup)
bimg.bimg_info |= BKPIMAGE_APPLY;
【情况1:如果是压缩页面,空洞信息已经保存了】
【情况2:空洞长度0,记录整个页面】
【情况3:未压缩有空洞,需要用rdt_datas_last的两个预留槽位bkp_rdatas[0]和[1]】
if (is_compressed)
{
【情况1】
bimg.length = compressed_len;
bimg.bimg_info |= BKPIMAGE_IS_COMPRESSED;
rdt_datas_last->data = regbuf->compressed_page;
rdt_datas_last->len = compressed_len;
}
else
{
bimg.length = BLCKSZ - cbimg.hole_length;
if (cbimg.hole_length == 0)
{
【情况2】
rdt_datas_last->data = page;
rdt_datas_last->len = BLCKSZ;
}
else
{
【情况3】
/* must skip the hole */
rdt_datas_last->data = page;
rdt_datas_last->len = bimg.hole_offset;
rdt_datas_last->next = &regbuf->bkp_rdatas[1];
rdt_datas_last = rdt_datas_last->next;
rdt_datas_last->data =
page + (bimg.hole_offset + cbimg.hole_length);
rdt_datas_last->len =
BLCKSZ - (bimg.hole_offset + cbimg.hole_length);
}
}
total_len += bimg.length;
}
【把XlogRegisterBufData注册到registered_buffer中的数据链接进数组中】
【FPW的话则不需要记录页面修改的信息】
if (needs_data)
{
/*
* Link the caller-supplied rdata chain for this buffer to the
* overall list.
*/
bkpb.fork_flags |= BKPBLOCK_HAS_DATA;
bkpb.data_length = regbuf->rdata_len;
total_len += regbuf->rdata_len;
rdt_datas_last->next = regbuf->rdata_head;
rdt_datas_last = regbuf->rdata_tail;
}
【如果连续两条记录都是一个表的,省一个filenode空间】
if (prev_regbuf && RelFileNodeEquals(regbuf->rnode, prev_regbuf->rnode))
{
samerel = true;
bkpb.fork_flags |= BKPBLOCK_SAME_REL;
}
else
samerel = false;
prev_regbuf = regbuf;
【开始组装】
/* Ok, copy the header to the scratch buffer */
【1】复制XLogRecordBlockHeader
memcpy(scratch, &bkpb, SizeOfXLogRecordBlockHeader);
scratch += SizeOfXLogRecordBlockHeader;
if (include_image)
{
【2】复制SizeOfXLogRecordBlockImageHeader
memcpy(scratch, &bimg, SizeOfXLogRecordBlockImageHeader);
scratch += SizeOfXLogRecordBlockImageHeader;
if (cbimg.hole_length != 0 && is_compressed)
{
【3】复制SizeOfXLogRecordBlockCompressHeader
memcpy(scratch, &cbimg,
SizeOfXLogRecordBlockCompressHeader);
scratch += SizeOfXLogRecordBlockCompressHeader;
}
}
【根据上面标记决定是否可以省一个RelFileNode】
if (!samerel)
{
memcpy(scratch, &regbuf->rnode, sizeof(RelFileNode));
scratch += sizeof(RelFileNode);
}
memcpy(scratch, &regbuf->block, sizeof(BlockNumber));
scratch += sizeof(BlockNumber);
}
...
【记录maindata的长度,实际上maindata的主要内容会保存在hdr_rdt对应的数据链中】
/* followed by main data, if any */
if (mainrdata_len > 0)
{
if (mainrdata_len > 255)
{
*(scratch++) = (char) XLR_BLOCK_ID_DATA_LONG;
memcpy(scratch, &mainrdata_len, sizeof(uint32));
scratch += sizeof(uint32);
}
else
{
*(scratch++) = (char) XLR_BLOCK_ID_DATA_SHORT;
*(scratch++) = (uint8) mainrdata_len;
}
rdt_datas_last->next = mainrdata_head;
rdt_datas_last = mainrdata_last;
total_len += mainrdata_len;
}
rdt_datas_last->next = NULL;
hdr_rdt.len = (scratch - hdr_scratch);
total_len += hdr_rdt.len;
【计算CRC,保存到rechdr(XLogRecord)中】
INIT_CRC32C(rdata_crc);
COMP_CRC32C(rdata_crc, hdr_scratch + SizeOfXLogRecord, hdr_rdt.len - SizeOfXLogRecord);
for (rdt = hdr_rdt.next; rdt != NULL; rdt = rdt->next)
COMP_CRC32C(rdata_crc, rdt->data, rdt->len);
/*
* Fill in the fields in the record header. Prev-link is filled in later,
* once we know where in the WAL the record will be inserted. The CRC does
* not include the record header yet.
*/
rechdr->xl_xid = GetCurrentTransactionIdIfAny();
rechdr->xl_tot_len = total_len;
rechdr->xl_info = info;
rechdr->xl_rmid = rmid;
rechdr->xl_prev = InvalidXLogRecPtr;
rechdr->xl_crc = rdata_crc;
return &hdr_rdt;
}