Postgres
服务端后台进程功能概览:在PG14版本中,各后台进程的基本描述信息如上图所示,对应的定义如下。代码语言: c
复制 // 这里定义了PG基本的进程类型const char *GetBackendTypeDesc(BackendType backendType){
const char *backendDesc = "unknown process type";
switch (backendType)
{
case B_INVALID:
backendDesc = "not initialized";
break;
case B_AUTOVAC_LAUNCHER:
backendDesc = "autovacuum launcher";
break;
case B_AUTOVAC_WORKER:
backendDesc = "autovacuum worker";
break;
case B_BACKEND:
backendDesc = "client backend";
break;
case B_BG_WORKER:
backendDesc = "background worker";
break;
case B_BG_WRITER:
backendDesc = "background writer";
break;
case B_CHECKPOINTER:
backendDesc = "checkpointer";
break;
case B_STARTUP:
backendDesc = "startup";
break;
case B_WAL_RECEIVER:
backendDesc = "walreceiver";
break;
case B_WAL_SENDER:
backendDesc = "walsender";
break;
case B_WAL_WRITER:
backendDesc = "walwriter";
break;
case B_ARCHIVER:
backendDesc = "archiver";
break;
case B_STATS_COLLECTOR:
backendDesc = "stats collector";
break;
case B_LOGGER:
backendDesc = "logger";
break;
}
return backendDesc;}
后台进程启动流程 postgres
数据库启动后会在ServerLoop
中不断地监听来自客户端的首次请求(即连接请求)
,然后再创建为该客户端服务的
进程,接着判断整个postgres
中的辅助后台进程是否存在,如果不存在会自动拉起这个进程。这种模式和oracle
类似。代码语言: c
复制 // PG启动后的非常核心的监听循环static int ServerLoop(void){
nSockets = initMasks(&readmask);
// for循环监听是否有来自客户端的请求
for (;;)
{
fd_set rmask;
int selres;
time_t now;
// 这里采用selelct模型,因为有客户端就有创建进程,select的模型是for循环遍历,可以优化为epoll模型
selres = select(nSockets, &rmask, NULL, NULL, &timeout);
if (selres > 0)
{
int i;
for (i = 0; i < MAXLISTEN; i )
{
// 如果有新客户端进来则创建新的后台进程,专门为客户端服务
if (FD_ISSET(ListenSocket[i], &rmask))
{
Port *port;
// 创建为客户端服务的进程
port = ConnCreate(ListenSocket[i]);
if (port)
{
// 启动后台进程
BackendStartup(port);
StreamClose(port->sock);
ConnFree(port);
}
}
}
}
// 如果日志进程不存在,且开启了日志进程则进行创建
if (SysLoggerPID == 0 && Logging_collector)
SysLoggerPID = SysLogger_Start();
// 检查checkpoint进程和后台磁盘写进程,如果不存在则创建
if (pmState == PM_RUN || pmState == PM_RECOVERY ||
pmState == PM_HOT_STANDBY)
{
if (CheckpointerPID == 0)
CheckpointerPID = StartCheckpointer();
if (BgWriterPID == 0)
BgWriterPID = StartBackgroundWriter();
}
// 如果wal写进程不存在,则创建wal写进程
if (WalWriterPID == 0 && pmState == PM_RUN)
WalWriterPID = StartWalWriter();
// 判断vacuum的守护进程,如果不存在则启动
if (!IsBinaryUpgrade && AutoVacPID == 0 &&
(AutoVacuumingActive() || start_autovac_launcher) &&
pmState == PM_RUN)
{
AutoVacPID = StartAutoVacLauncher();
if (AutoVacPID != 0)
start_autovac_launcher = false; /* signal processed */
}
// 判断统计信息收集进程不存在,则启动统计信息收集进程
if (PgStatPID == 0 &&
(pmState == PM_RUN || pmState == PM_HOT_STANDBY))
PgStatPID = pgstat_start();
// 如果配置归档进程,如果不存在则启动归档进程
if (PgArchPID == 0 && PgArchStartupAllowed())
PgArchPID = StartArchiver();
// 如果是备库,则启动wal receiver进程
if (WalReceiverRequested)
MaybeStartWalReceiver();
// 启动其他的辅助进程,如果存在crash的情况
if (StartWorkerNeeded || HaveCrashedWorker)
maybe_start_bgworkers();
// 计算时间的间隔
now = time(NULL);
if (now - last_touch_time >= 58 * SECS_PER_MINUTE)
{
TouchSocketFiles();
TouchSocketLockFiles();
last_touch_time = now;
}
}}
autovacuum launcher
主要做了什么?autovacuum launcher
进程可以理解为vacuum
进程的守护进程,根据参数配置和负载动态地创建vacuum
进程,这个核心逻辑在AutoVacLauncherMain
函数中。代码语言: c
/*
 * AutoVacLauncherMain
 *      Entry point of the autovacuum launcher process (abridged excerpt).
 *
 * Only the branch taken when AutoVacuumingActive() is false is shown: the
 * launcher requests one worker, unless a shutdown is pending, and exits.
 *
 * NOTE(review): the rest of the launcher's main logic is elided from this
 * excerpt.
 */
NON_EXEC_STATIC void
AutoVacLauncherMain(int argc, char *argv[])
{
    if (!AutoVacuumingActive())
    {
        /* With no shutdown request pending, start a vacuum worker directly. */
        if (!ShutdownRequestPending)
            do_start_worker();
        proc_exit(0);           /* done */
    }
}
background writer
主要做了什么?后台刷脏
进程的核心工作是从shared buffer pool
中把脏的page
刷新到磁盘,目的是尽可能地利用好shared buffer pool
的内存缓冲区。后台刷脏
的核心工作定义在BackgroundWriterMain
中。代码语言: c
/*
 * BackgroundWriterMain
 *      Main loop of the background writer process (abridged excerpt).
 *
 * Each iteration resets the process latch, runs one round of buffer syncing,
 * reports statistics, and then waits on the latch.
 *
 * NOTE(review): wb_context and prev_hibernate are declared outside this
 * excerpt, and the code that consumes rc is elided.
 */
void
BackgroundWriterMain(void)
{
    /* Core loop: keep syncing dirty pages to disk. */
    for (;;)
    {
        bool        can_hibernate;
        int         rc;

        ResetLatch(MyLatch);

        /* Do one round of dirty-buffer writing. */
        can_hibernate = BgBufferSync(&wb_context);

        /* Send statistics to the statistics process. */
        pgstat_send_bgwriter();

        /*
         * Wait until the latch is set or BgWriterDelay milliseconds elapse;
         * WL_EXIT_ON_PM_DEATH is also requested.
         */
        rc = WaitLatch(MyLatch,
                       WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
                       BgWriterDelay /* ms */ , WAIT_EVENT_BGWRITER_MAIN);

        prev_hibernate = can_hibernate;
    }
}
autovacuum worker
主要做了什么?vacuum
进程的主要目的是回收当前数据库中已经废弃或者无用的数据。目前有两种模式:一种是回收无用page的空间但并不归还给操作系统,这样后续再插入新的page时就不用重新分配空间;第二种是扫描表中无用的page,把有效数据重新写到新的page中,然后把无用的page的空间归还给操作系统。代码语言: c
复制 // 启动vacuum进程回收无用的page空间static Oid do_start_worker(void){
// 获取当前的统计信息
autovac_refresh_stats();
// 获取数据库列表
dblist = get_database_list();
// 查找可以被vacuum的数据库
foreach(cell, dblist)
{
avw_dbase *tmp = lfirst(cell);
dlist_iter iter;
skipit = false;
dlist_reverse_foreach(iter, &DatabaseList)
{
avl_dbase *dbp = dlist_container(avl_dbase, adl_node, iter.cur);
if (dbp->adl_datid == tmp->adw_datid)
{
if (!TimestampDifferenceExceeds(dbp->adl_next_worker,
current_time, 0) &&
!TimestampDifferenceExceeds(current_time,
dbp->adl_next_worker,
autovacuum_naptime * 1000))
skipit = true;
break;
}
}
if (skipit)
continue;
if (avdb == NULL ||
tmp->adw_entry->last_autovac_time < avdb->adw_entry->last_autovac_time)
avdb = tmp;
}
// 查找到数据库然后进行处理
if (avdb != NULL)
{
WorkerInfo worker;
dlist_node *wptr;
LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
wptr = dlist_pop_head_node(&AutoVacuumShmem->av_freeWorkers);
worker = dlist_container(WorkerInfoData, wi_links, wptr);
worker->wi_dboid = avdb->adw_datid;
worker->wi_proc = NULL;
worker->wi_launchtime = GetCurrentTimestamp();
AutoVacuumShmem->av_startingWorker = worker;
LWLockRelease(AutovacuumLock);
SendPostmasterSignal(PMSIGNAL_START_AUTOVAC_WORKER);
retval = avdb->adw_datid;
}
return retval;}
checkpointer
主要做了什么?假设数据库不断地写wal日志,而很多脏page都没有刷新到磁盘,一旦数据库crash,就需要从头开始进行数据库恢复,这样数据库恢复的时间非常长;如果有了checkpointer
进程,它会定期把缓存池中的脏page刷新到磁盘,并在wal中记录脏page已经刷到哪个位置,这样即使数据库崩溃,也可以从上一次的checkpoint
点开始恢复,能大大减少数据库恢复的时间。代码语言: c
复制 // checkpoint进程的工作入口void CheckpointerMain(void){
/*
* Loop forever
*/
for (;;)
{
bool do_checkpoint = false;
int flags = 0;
pg_time_t now;
int elapsed_secs;
int cur_timeout;
/* Clear any already-pending wakeups */
ResetLatch(MyLatch);
/*
* Process any requests or signals received recently.
*/
AbsorbSyncRequests();
HandleCheckpointerInterrupts();
/*
* Detect a pending checkpoint request by checking whether the flags
* word in shared memory is nonzero. We shouldn't need to acquire the
* ckpt_lck for this.
*/
if (((volatile CheckpointerShmemStruct *) CheckpointerShmem)->ckpt_flags)
{
do_checkpoint = true;
BgWriterStats.m_requested_checkpoints ;
}
/*
* Force a checkpoint if too much time has elapsed since the last one.
* Note that we count a timed checkpoint in stats only when this
* occurs without an external request, but we set the CAUSE_TIME flag
* bit even if there is also an external request.
*/
now = (pg_time_t) time(NULL);
elapsed_secs = now - last_checkpoint_time;
if (elapsed_secs >= CheckPointTimeout)
{
if (!do_checkpoint)
BgWriterStats.m_timed_checkpoints ;
do_checkpoint = true;
flags |= CHECKPOINT_CAUSE_TIME;
}
/*
* Do a checkpoint if requested.
*/
if (do_checkpoint)
{
bool ckpt_performed = false;
bool do_restartpoint;
// 是否需要进行recovery
do_restartpoint = RecoveryInProgress();
// 执行检查点行为
if (!do_restartpoint)
{
CreateCheckPoint(flags);
ckpt_performed = true;
}
else
ckpt_performed = CreateRestartPoint(flags);
if (ckpt_performed)
{
last_checkpoint_time = now;
}
else
{
last_checkpoint_time = now - CheckPointTimeout 15;
}
ckpt_active = false;
}
// 必要的情况下切换xlog
CheckArchiveTimeout();
// 发送统计信息给后台刷脏进程
pgstat_send_bgwriter();
// 发送wal的统计信息给统计信息进程
pgstat_send_wal(true);
// 完成后短暂休眠
(void) WaitLatch(MyLatch,
WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
cur_timeout * 1000L /* convert to ms */ ,
WAIT_EVENT_CHECKPOINTER_MAIN);
}}
walwriter
主要做了什么?postgres
在把数据页的修改写到磁盘之前,会先把变更写到wal日志,wal即write-ahead log(预写日志)。目的是为了防止内存中尚未落盘的page(无论事务已提交还是未提交)因掉电而引起数据丢失。wal写进程的工作就是不断地把wal buffer
中的日志数据刷盘到wal日志文件中。代码语言: c
复制 // wal日志写进程核心逻辑void WalWriterMain(void){
// Wal进程的LOOP循环
for (;;)
{
long cur_timeout;
// 后台刷新Wal buffer中的数据到Wal日志
if (XLogBackgroundFlush())
left_till_hibernate = LOOPS_UNTIL_HIBERNATE;
else if (left_till_hibernate > 0)
left_till_hibernate--;
// 发送Wal的日志统计信息给统计信息进程
pgstat_send_wal(false);
// 休眠短暂时间
(void) WaitLatch(MyLatch,
WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
cur_timeout,
WAIT_EVENT_WAL_WRITER_MAIN);
}}