相关 《Postgresql源码(76)执行器专用元组格式TupleTableSlot》 《Postgresql源码(82)SPI模块拆解分析一:执行简单SQL获取结果》 《Postgresql源码(83)执行器的结果接收系统——DestReceiver》
0 总结
- 执行器进入前会配置_DestReceiver(一套接口)
- 执行器在内部跑一遍计划,产生一条tts(参考《Postgresql源码(76)执行器专用元组格式TupleTableSlot》),执行器调用接口函数receiveSlot,按当前接口定义输出到指定位置。
- 例如
- 【SELECT xxx】receiveSlot为printtup:把tts按列解析后拿到数据,按客户端服务端协议拼装包内容,调用libpq返回给客户端。
- 【COPY TO 文件】receiveSlot为copy_dest_receive:把tts按列解析后拿到数据,按copy语法提供的分隔符组装,fwrite到文件中。
- 【SPI】中receiveSlot为spi_printtup:把tts转换为HeapTuple格式,保存到SPI结果全局变量中。
1 概要
执行器的工作包括:work、get result,之前work的内容已经介绍过了,这里分析下执行器如何拿到执行结果。
- 执行器会在多种场景下工作,例如:
- SPI调用。
- 常规客户端服务端的调用。
- standalone backend调用(没有postmaster)。
- 系统内部调用。
- 对于上述场景,执行器的调用者有较大的差异,结果集无法使用一套函数返回。所以执行器设计了一套拿结果的函数钩子(接口),调用者需要将结果集的获取函数配置到接口上,执行器在执行中会把结果通过接口函数调入相应模块中,完成调用者所需的结果集构造,例如:
- SPI的结果需要存放到执行的全局变量结构中。
- 常规客户端服务端调用需要将结果用Libpq返回客户端。
- standalone backend调用需要将结果打印到stdout。
- 系统内部调用不需要返回结果。
PG的结果接收器提供了四个接口:
- receiveSlot:输入执行器产生的tts,按指定格式输出
- rStartup:初始化结果接收器
- rShutdown:停止结果接收器
- rDestroy:清理动态申请中间变量
struct _DestReceiver
{
/* Called for each tuple to be output: */
bool (*receiveSlot) (TupleTableSlot *slot,
DestReceiver *self);
/* Per-executor-run initialization and shutdown: */
void (*rStartup) (DestReceiver *self,
int operation,
TupleDesc typeinfo);
void (*rShutdown) (DestReceiver *self);
/* Destroy the receiver object itself (if dynamically allocated) */
void (*rDestroy) (DestReceiver *self);
/* CommandDest code for this receiver */
CommandDest mydest;
/* Private fields might appear beyond this point... */
};
2 场景
第一组:正常客户端连接场景【DestRemote】
这一组函数接口由printtup_create_DR配置:
代码语言:javascript复制DestReceiver *
printtup_create_DR(CommandDest dest)
{
DR_printtup *self = (DR_printtup *) palloc0(sizeof(DR_printtup));
self->pub.receiveSlot = printtup; /* might get changed later */
self->pub.rStartup = printtup_startup;
self->pub.rShutdown = printtup_shutdown;
self->pub.rDestroy = printtup_destroy;
self->pub.mydest = dest;
/*
* Send T message automatically if DestRemote, but not if
* DestRemoteExecute
*/
self->sendDescrip = (dest == DestRemote);
self->attrinfo = NULL;
self->nattrs = 0;
self->myinfo = NULL;
self->buf.data = NULL;
self->tmpcontext = NULL;
return (DestReceiver *) self;
}
注意:这里的数据结构DR_printtup把DestReceiver有包装了 一层:
代码语言:javascript复制typedef struct
{
DestReceiver pub; /* publicly-known function pointers */
Portal portal; /* the Portal we are printing from */
bool sendDescrip; /* send RowDescription at startup? */
TupleDesc attrinfo; /* The attr info we are set up for */
int nattrs;
PrinttupAttrInfo *myinfo; /* Cached info about each attr */
StringInfoData buf; /* output buffer (*not* in tmpcontext) */
MemoryContext tmpcontext; /* Memory context for per-row workspace */
} DR_printtup;
来看下这几个函数的工作位置和流程,例如:
代码语言:javascript复制select s::int, left(random()::text,4) l from generate_series(1,2) s;
-- 输出:两行、两列
s | l
--- ------
1 | 0.55
2 | 0.28
1 rStartup = printtup_startup:申请上下文,发送行描述符
位置
代码语言:javascript复制#0 printtup_startup (self=0x106dd50, operation=1, typeinfo=0x100d190) at printtup.c:113
#1 0x0000000000733ddf in standard_ExecutorRun (queryDesc=0xf7d910, direction=ForwardScanDirection, count=0, execute_once=true) at execMain.c:350
#2 0x0000000000733ca8 in ExecutorRun (queryDesc=0xf7d910, direction=ForwardScanDirection, count=0, execute_once=true) at execMain.c:305
#3 0x000000000097ccd5 in PortalRunSelect (portal=0xff5050, forward=true, count=0, dest=0x106dd50) at pquery.c:921
#4 0x000000000097c994 in PortalRun (portal=0xff5050, count=9223372036854775807, isTopLevel=true, run_once=true, dest=0x106dd50, altdest=0x106dd50, qc=0x7ffd6b492890) at pquery.c:765
#5 0x000000000097663b in exec_simple_query (query_string=0xf5a4f0 "select s::int, left(random()::text,4) l from generate_series(1,2) sn;") at postgres.c:1213
#6 0x000000000097ab59 in PostgresMain (argc=1, argv=0x7ffd6b492b20, dbname=0xf83cc0 "postgres", username=0xf83c98 "mingjiegao") at postgres.c:4494
#7 0x00000000008b6d4e in BackendRun (port=0xf7ba90) at postmaster.c:4530
#8 0x00000000008b66cd in BackendStartup (port=0xf7ba90) at postmaster.c:4252
#9 0x00000000008b2b45 in ServerLoop () at postmaster.c:1745
#10 0x00000000008b2417 in PostmasterMain (argc=1, argv=0xf540e0) at postmaster.c:1417
#11 0x00000000007b4c93 in main (argc=1, argv=0xf540e0) at main.c:209
流程
代码语言:javascript复制static void
printtup_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
{
DR_printtup *myState = (DR_printtup *) self;
Portal portal = myState->portal;
/*
* Create I/O buffer to be used for all messages. This cannot be inside
* tmpcontext, since we want to re-use it across rows.
*/
initStringInfo(&myState->buf);
申请"printtup"上下文,存放所有输出数据,
代码语言:javascript复制 myState->tmpcontext = AllocSetContextCreate(CurrentMemoryContext,
"printtup",
ALLOCSET_DEFAULT_SIZES);
启动时就需要发送行描述符:
代码语言:javascript复制 if (myState->sendDescrip)
SendRowDescriptionMessage(&myState->buf,
typeinfo,
FetchPortalTargetList(portal),
portal->formats);
}
SendRowDescriptionMessage发送行描述符,入参:
代码语言:javascript复制List *targetlist
TargetEntry:
{xpr = {type = T_TargetEntry},
expr = 0x1086570,
resno = 1,
resname = 0xf5af88 "s",
ressortgroupref = 0,
resorigtbl = 0,
resorigcol = 0,
resjunk = false}
TargetEntry:
{xpr = {type = T_TargetEntry},
expr = 0x1086868,
resno = 2,
resname = 0xf5b5a0 "l",
ressortgroupref = 0,
resorigtbl = 0,
resorigcol = 0,
resjunk = false}
typeinfo
{natts = 2, tdtypeid = 2249, tdtypmod = -1, tdrefcount = -1, constr = 0x0, attrs = 0x100d1a8}
流程:拼接输出串
代码语言:javascript复制void
SendRowDescriptionMessage(StringInfo buf, TupleDesc typeinfo,
List *targetlist, int16 *formats)
{
int natts = typeinfo->natts;
int i;
ListCell *tlist_item = list_head(targetlist);
/* tuple descriptor message type */
pq_beginmessage_reuse(buf, 'T');
/* # of attrs in tuples */
pq_sendint16(buf, natts);
/*
* Preallocate memory for the entire message to be sent. That allows to
* use the significantly faster inline pqformat.h functions and to avoid
* reallocations.
*
* Have to overestimate the size of the column-names, to account for
* character set overhead.
*/
enlargeStringInfo(buf, (NAMEDATALEN * MAX_CONVERSION_GROWTH /* attname */
sizeof(Oid) /* resorigtbl */
sizeof(AttrNumber) /* resorigcol */
sizeof(Oid) /* atttypid */
sizeof(int16) /* attlen */
sizeof(int32) /* attypmod */
sizeof(int16) /* format */
) * natts);
for (i = 0; i < natts; i)
{
Form_pg_attribute att = TupleDescAttr(typeinfo, i);
Oid atttypid = att->atttypid;
int32 atttypmod = att->atttypmod;
Oid resorigtbl;
AttrNumber resorigcol;
int16 format;
/*
* If column is a domain, send the base type and typmod instead.
* Lookup before sending any ints, for efficiency.
*/
atttypid = getBaseTypeAndTypmod(atttypid, &atttypmod);
/* Do we have a non-resjunk tlist item? */
while (tlist_item &&
((TargetEntry *) lfirst(tlist_item))->resjunk)
tlist_item = lnext(targetlist, tlist_item);
if (tlist_item)
{
TargetEntry *tle = (TargetEntry *) lfirst(tlist_item);
resorigtbl = tle->resorigtbl;
resorigcol = tle->resorigcol;
tlist_item = lnext(targetlist, tlist_item);
}
else
{
/* No info available, so send zeroes */
resorigtbl = 0;
resorigcol = 0;
}
if (formats)
format = formats[i];
else
format = 0;
pq_writestring(buf, NameStr(att->attname));
pq_writeint32(buf, resorigtbl);
pq_writeint16(buf, resorigcol);
pq_writeint32(buf, atttypid);
pq_writeint16(buf, att->attlen);
pq_writeint32(buf, atttypmod);
pq_writeint16(buf, format);
}
pq_endmessage_reuse(buf);
}
pq_endmessage_reuse
代码语言:javascript复制socket_putmessage(char msgtype, const char *s, size_t len)
(gdb) p s
$18 = 0x10724e0 ""
(gdb) p len
$19 = 42
(gdb) x/32bx 0x10724e0
0x10724e0: 0x00 0x02 0x73(s) 0x00 0x00 0x00 0x00 0x00
0x10724e8: 0x00 0x00 0x00 0x00 0x00 0x17 0x00 0x04
0x10724f0: 0xff 0xff 0xff 0xff 0x00 0x00 0x6c(l) 0x00
0x10724f8: 0x00 0x00 0x00 0x00 0x00 0x00 0x00 0x00
0x1072500: 0x00 0x19 0xff 0xff 0xff 0xff 0xff 0xff
0x1072508: 0x00 0x00
2 receiveSlot = printtup
位置
代码语言:javascript复制#0 printtup (slot=0x100d2a8, self=0x106dd50) at printtup.c:303
#1 0x0000000000736102 in ExecutePlan (estate=0x100b630, planstate=0x100b868, use_parallel_mode=false, operation=CMD_SELECT, sendTuples=true, numberTuples=0,direction=ForwardScanDirection, dest=0x106dd50, execute_once=true) at execMain.c:1582
#2 0x0000000000733e7d in standard_ExecutorRun (queryDesc=0xf7d910, direction=ForwardScanDirection, count=0, execute_once=true) at execMain.c:361
#3 0x0000000000733ca8 in ExecutorRun (queryDesc=0xf7d910, direction=ForwardScanDirection, count=0, execute_once=true) at execMain.c:305
#4 0x000000000097ccd5 in PortalRunSelect (portal=0xff5050, forward=true, count=0, dest=0x106dd50) at pquery.c:921
#5 0x000000000097c994 in PortalRun (portal=0xff5050, count=9223372036854775807, isTopLevel=true, run_once=true, dest=0x106dd50, altdest=0x106dd50,qc=0x7ffd6b492890) at pquery.c:765
#6 0x000000000097663b in exec_simple_query (query_string=0xf5a4f0 "select s::int, left(random()::text,4) l from generate_series(1,2) s ;") at postgres.c:1213
#7 0x000000000097ab59 in PostgresMain (argc=1, argv=0x7ffd6b492b20, dbname=0xf83cc0 "postgres", username=0xf83c98 "mingjiegao") at postgres.c:4494
#8 0x00000000008b6d4e in BackendRun (port=0xf7ba90) at postmaster.c:4530
#9 0x00000000008b66cd in BackendStartup (port=0xf7ba90) at postmaster.c:4252
#10 0x00000000008b2b45 in ServerLoop () at postmaster.c:1745
#11 0x00000000008b2417 in PostmasterMain (argc=1, argv=0xf540e0) at postmaster.c:1417
#12 0x00000000007b4c93 in main (argc=1, argv=0xf540e0) at main.c:209
流程
printtup输入为tts包装的元组,和上面初始化后的DestReceiver。
代码语言:javascript复制static bool
printtup(TupleTableSlot *slot, DestReceiver *self)
printtup_prepare_info // 拼接DR_printtup中的信息,准备发送
MemoryContextSwitchTo // 切换到"printtup"
pq_beginmessage_reuse // 调用libpq开始发数据
pq_sendint16
...
...
pq_endmessage_reuse
3 rShutdown = printtup_shutdown
位置
代码语言:javascript复制#0 printtup_shutdown (self=0x106dd50) at printtup.c:388
#1 0x0000000000733e98 in standard_ExecutorRun (queryDesc=0xf7d910, direction=ForwardScanDirection, count=0, execute_once=true) at execMain.c:376
#2 0x0000000000733ca8 in ExecutorRun (queryDesc=0xf7d910, direction=ForwardScanDirection, count=0, execute_once=true) at execMain.c:305
#3 0x000000000097ccd5 in PortalRunSelect (portal=0xff5050, forward=true, count=0, dest=0x106dd50) at pquery.c:921
#4 0x000000000097c994 in PortalRun (portal=0xff5050, count=9223372036854775807, isTopLevel=true, run_once=true, dest=0x106dd50, altdest=0x106dd50, qc=0x7ffd6b492890) at pquery.c:765
#5 0x000000000097663b in exec_simple_query (query_string=0xf5a4f0 "select s::int, left(random()::text,4) l from generate_series(1,2) s ;") at postgres.c:1213
#6 0x000000000097ab59 in PostgresMain (argc=1, argv=0x7ffd6b492b20, dbname=0xf83cc0 "postgres", username=0xf83c98 "mingjiegao") at postgres.c:4494
#7 0x00000000008b6d4e in BackendRun (port=0xf7ba90) at postmaster.c:4530
#8 0x00000000008b66cd in BackendStartup (port=0xf7ba90) at postmaster.c:4252
#9 0x00000000008b2b45 in ServerLoop () at postmaster.c:1745
#10 0x00000000008b2417 in PostmasterMain (argc=1, argv=0xf540e0) at postmaster.c:1417
#11 0x00000000007b4c93 in main (argc=1, argv=0xf540e0) at main.c:209
流程:清理工作
代码语言:javascript复制static void
printtup_shutdown(DestReceiver *self)
{
DR_printtup *myState = (DR_printtup *) self;
if (myState->myinfo)
pfree(myState->myinfo);
myState->myinfo = NULL;
myState->attrinfo = NULL;
if (myState->buf.data)
pfree(myState->buf.data);
myState->buf.data = NULL;
if (myState->tmpcontext)
MemoryContextDelete(myState->tmpcontext);
myState->tmpcontext = NULL;
}
4 rDestroy = printtup_destroy
位置
代码语言:javascript复制#0 printtup_destroy (self=0x106dd50) at printtup.c:412
#1 0x0000000000976650 in exec_simple_query (query_string=0xf5a4f0 "select s::int, left(random()::text,4) l from generate_series(1,2) s ;") at postgres.c:1221
#2 0x000000000097ab59 in PostgresMain (argc=1, argv=0x7ffd6b492b20, dbname=0xf83cc0 "postgres", username=0xf83c98 "mingjiegao") at postgres.c:4494
#3 0x00000000008b6d4e in BackendRun (port=0xf7ba90) at postmaster.c:4530
#4 0x00000000008b66cd in BackendStartup (port=0xf7ba90) at postmaster.c:4252
#5 0x00000000008b2b45 in ServerLoop () at postmaster.c:1745
#6 0x00000000008b2417 in PostmasterMain (argc=1, argv=0xf540e0) at postmaster.c:1417
#7 0x00000000007b4c93 in main (argc=1, argv=0xf540e0) at main.c:209
流程:清理动态申请的:外层数据结构DR_printtup
代码语言:javascript复制static void
printtup_destroy(DestReceiver *self)
{
pfree(self);
}
第二组:COPY数据场景【DestCopyOut】
这一组函数接口由CreateCopyDestReceiver配置:
代码语言:javascript复制DestReceiver *
CreateCopyDestReceiver(void)
{
DR_copy *self = (DR_copy *) palloc(sizeof(DR_copy));
self->pub.receiveSlot = copy_dest_receive;
self->pub.rStartup = copy_dest_startup;
self->pub.rShutdown = copy_dest_shutdown;
self->pub.rDestroy = copy_dest_destroy;
self->pub.mydest = DestCopyOut;
self->cstate = NULL; /* will be set later */
self->processed = 0;
return (DestReceiver *) self;
}
注意copy也给DestReceiver包了一层:DR_copy
代码语言:javascript复制typedef struct
{
DestReceiver pub; /* publicly-known function pointers */
CopyToState cstate; /* CopyToStateData for the command */
uint64 processed; /* # of tuples processed */
} DR_copy;
来看下这几个函数的工作位置和流程,例如:
代码语言:javascript复制copy (select s::int, left(random()::text,4) l from generate_series(1,2) s) to '/tmp/a';
-- 输出:两行两列
1 0.74
2 0.09
1 rStartup = copy_dest_startup
空
2 receiveSlot = copy_dest_receive
位置
代码语言:javascript复制#0 copy_dest_receive (slot=0x1048538, self=0x10861c0) at copyto.c:1259
#1 0x0000000000736102 in ExecutePlan (estate=0x10468c0, planstate=0x1046af8, use_parallel_mode=false, operation=CMD_SELECT, sendTuples=true, numberTuples=0, direction=ForwardScanDirection, dest=0x10861c0, execute_once=true) at execMain.c:1582
#2 0x0000000000733e7d in standard_ExecutorRun (queryDesc=0x1086218, direction=ForwardScanDirection, count=0, execute_once=true) at execMain.c:361
#3 0x0000000000733ca8 in ExecutorRun (queryDesc=0x1086218, direction=ForwardScanDirection, count=0, execute_once=true) at execMain.c:305
#4 0x0000000000685e87 in DoCopyTo (cstate=0xf7da60) at copyto.c:905
#5 0x000000000067bdfa in DoCopy (pstate=0xf7d910, stmt=0xf5bb88, stmt_location=0, stmt_len=86, processed=0x7ffd6b4924e8) at copy.c:309
#6 0x000000000097ec16 in standard_ProcessUtility (pstmt=0xf5bef8, queryString=0xf5a4f0 "copy (select s::int, left(random()::text,4) l from generate_series(1,2) s) to '/tmp/a';", readOnlyTree=false, context=PROCESS_UTILITY_TOPLEVEL, params=0x0, queryEnv=0x0, dest=0xf5bfe8, qc=0x7ffd6b492890) at utility.c:739
#7 0x000000000097e69b in ProcessUtility (pstmt=0xf5bef8, queryString=0xf5a4f0 "copy (select s::int, left(random()::text,4) l from generate_series(1,2) s) to '/tmp/a';", readOnlyTree=false, context=PROCESS_UTILITY_TOPLEVEL, params=0x0, queryEnv=0x0, dest=0xf5bfe8, qc=0x7ffd6b492890) at utility.c:527
#8 0x000000000097d297 in PortalRunUtility (portal=0xff5050, pstmt=0xf5bef8, isTopLevel=true, setHoldSnapshot=false, dest=0xf5bfe8, qc=0x7ffd6b492890)at pquery.c:1155
#9 0x000000000097d4fb in PortalRunMulti (portal=0xff5050, isTopLevel=true, setHoldSnapshot=false, dest=0xf5bfe8, altdest=0xf5bfe8, qc=0x7ffd6b492890) at pquery.c:1312
#10 0x000000000097ca27 in PortalRun (portal=0xff5050, count=9223372036854775807, isTopLevel=true, run_once=true, dest=0xf5bfe8, altdest=0xf5bfe8, qc=0x7ffd6b492890) at pquery.c:788
#11 0x000000000097663b in exec_simple_query (query_string=0xf5a4f0 "copy (select s::int, left(random()::text,4) l from generate_series(1,2) s) to '/tmp/a';")at postgres.c:1213
#12 0x000000000097ab59 in PostgresMain (argc=1, argv=0x7ffd6b492b20, dbname=0xf83cc0 "postgres", username=0xf83c98 "mingjiegao") at postgres.c:4494
#13 0x00000000008b6d4e in BackendRun (port=0xf7ba90) at postmaster.c:4530
#14 0x00000000008b66cd in BackendStartup (port=0xf7ba90) at postmaster.c:4252
#15 0x00000000008b2b45 in ServerLoop () at postmaster.c:1745
#16 0x00000000008b2417 in PostmasterMain (argc=1, argv=0xf540e0) at postmaster.c:1417
#17 0x00000000007b4c93 in main (argc=1, argv=0xf540e0) at main.c:209
流程
- 同样也是拿到tts和DestReceiver
- 由CopyOneRowTo调用CopySendEndOfRow调用fwrite写入文件
static bool
copy_dest_receive(TupleTableSlot *slot, DestReceiver *self)
{
DR_copy *myState = (DR_copy *) self;
CopyToState cstate = myState->cstate;
/* Send the data */
CopyOneRowTo(cstate, slot);
/* Increment the number of processed tuples, and report the progress */
pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED,
myState->processed);
return true;
}
3 rShutdown = copy_dest_shutdown
位置
代码语言:javascript复制#0 copy_dest_shutdown (self=0x10861c0) at copyto.c:1279
#1 0x0000000000733e98 in standard_ExecutorRun (queryDesc=0x1086218, direction=ForwardScanDirection, count=0, execute_once=true) at execMain.c:376
#2 0x0000000000733ca8 in ExecutorRun (queryDesc=0x1086218, direction=ForwardScanDirection, count=0, execute_once=true) at execMain.c:305
#3 0x0000000000685e87 in DoCopyTo (cstate=0xf7da60) at copyto.c:905
#4 0x000000000067bdfa in DoCopy (pstate=0xf7d910, stmt=0xf5bb88, stmt_location=0, stmt_len=86, processed=0x7ffd6b4924e8) at copy.c:309
#5 0x000000000097ec16 in standard_ProcessUtility (pstmt=0xf5bef8, queryString=0xf5a4f0 "copy (select s::int, left(random()::text,4) l from generate_series(1,2) s) to '/tmp/a';", readOnlyTree=false, context=PROCESS_UTILITY_TOPLEVEL, params=0x0, queryEnv=0x0, dest=0xf5bfe8, qc=0x7ffd6b492890) at utility.c:739
无操作
4 rDestroy = copy_dest_destroy
不执行,因为DestReceiver外面包的数据结构DR_copy没有什么需要释放的。
第三组:SPI获取数据场景【DestSPI】
这一组函数接口由CreateDestReceiver分发函数直接配置,注意前面两种都是走CreateDestReceiver入口进入自己的配置函数,但是SPI不同,直接在CreateDestReceiver里面配置:
代码语言:javascript复制DestReceiver *
CreateDestReceiver(CommandDest dest)
{
/*
* It's ok to cast the constness away as any modification of the none
* receiver would be a bug (which gets easier to catch this way).
*/
switch (dest)
{
case DestRemote:
case DestRemoteExecute:
return printtup_create_DR(dest);
case DestRemoteSimple:
return unconstify(DestReceiver *, &printsimpleDR);
case DestNone:
return unconstify(DestReceiver *, &donothingDR);
case DestDebug:
return unconstify(DestReceiver *, &debugtupDR);
// 这里配置 <-<-<-----------------------------
case DestSPI:
return unconstify(DestReceiver *, &spi_printtupDR);
case DestTuplestore:
return CreateTuplestoreDestReceiver();
case DestIntoRel:
return CreateIntoRelDestReceiver(NULL);
case DestCopyOut:
return CreateCopyDestReceiver();
case DestSQLFunction:
return CreateSQLFunctionDestReceiver();
case DestTransientRel:
return CreateTransientRelDestReceiver(InvalidOid);
case DestTupleQueue:
return CreateTupleQueueDestReceiver(NULL);
}
/* should never get here */
pg_unreachable();
}
spi_printtupDR带四个函数:
代码语言:javascript复制static const DestReceiver spi_printtupDR = {
spi_printtup, spi_dest_startup, donothingCleanup, donothingCleanup,
DestSPI
};
SPI的结果不是直接返回给客户端的!SPI有自己的三个全局变量来指向结果集,SPI的接口函数会从全局变量中取值,组织后返回给客户端。(使用全局变量当接口的设计很差!)
代码语言:javascript复制uint64 SPI_processed = 0; // 行数
SPITupleTable *SPI_tuptable = NULL; // 数据
int SPI_result = 0; // 执行结果
例子
代码语言:javascript复制直接执行:
```c
cat << EOF > spitest.c
#include "postgres.h"
#include "executor/spi.h"
#include "utils/builtins.h"
#include "fmgr.h"
PG_MODULE_MAGIC;
PG_FUNCTION_INFO_V1(sptest1);
Datum
sptest1(PG_FUNCTION_ARGS)
{
char *sql10 = "select s::int, left(random()::text,4) l from generate_series(1,10) s";
int ret;
int proc;
SPI_connect();
ret = SPI_exec(sql10, 0);
proc = SPI_processed;
if (ret > 0 && SPI_tuptable != NULL)
{
代码语言:txt复制SPITupleTable *tuptable = SPI_tuptable;
代码语言:txt复制TupleDesc tupdesc = tuptable->tupdesc;
代码语言:txt复制char buf[8192];
代码语言:txt复制uint64 j;
代码语言:txt复制for (j = 0; j < tuptable->numvals; j )
代码语言:txt复制{
代码语言:txt复制 HeapTuple tuple = tuptable->vals[j];
代码语言:txt复制 int i;
代码语言:txt复制 for (i = 1, buf[0] = 0; i <= tupdesc->natts; i )
代码语言:txt复制 snprintf(buf strlen(buf),
代码语言:txt复制 sizeof(buf) - strlen(buf), " %4s%4s",
代码语言:txt复制 SPI_getvalue(tuple, tupdesc, i),
代码语言:txt复制 (i == tupdesc->natts) ? " " : " |");
代码语言:txt复制 elog(INFO, "%s", buf);
代码语言:txt复制}
}
SPI_finish();
return (proc);
}
EOF
gcc -O0 -Wall -I /home/mingjiegao/dev/src/postgres/src/include -g -shared -fpic -o spitest.so spitest.c
代码语言:txt复制psql执行:
```javascript
postgres=# select sptest1();
INFO: 1 | 0.10
INFO: 2 | 0.18
INFO: 3 | 0.01
INFO: 4 | 0.78
INFO: 5 | 0.60
INFO: 6 | 0.76
INFO: 7 | 0.18
INFO: 8 | 0.86
INFO: 9 | 0.19
INFO: 10 | 0.99
sptest1
代码语言:txt复制 10
(1 row)
代码语言:txt复制### 1 rStartup = spi_dest_startup
位置
```javascript
sptest1
SPI_exec
代码语言:txt复制SPI_execute
代码语言:txt复制 _SPI_execute_plan
代码语言:txt复制 _SPI_pquery
代码语言:txt复制 ExecutorRun
代码语言:txt复制 standard_ExecutorRun
代码语言:txt复制 spi_dest_startup
代码语言:txt复制流程
```javascript
void
spi_dest_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
{
代码语言:txt复制SPITupleTable *tuptable;
代码语言:txt复制MemoryContext oldcxt;
代码语言:txt复制MemoryContext tuptabcxt;
代码语言:txt复制if (_SPI_current == NULL)
代码语言:txt复制 elog(ERROR, "spi_dest_startup called while not connected to SPI");
代码语言:txt复制if (_SPI_current->tuptable != NULL)
代码语言:txt复制 elog(ERROR, "improper call to spi_dest_startup");
代码语言:txt复制- 从"ExecutorState"切换到"SPI Proc"
- 创建"SPI TupTable",切换到"SPI TupTable"
```javascript
代码语言:txt复制oldcxt = _SPI_procmem(); /* switch to procedure memory context */
代码语言:txt复制tuptabcxt = AllocSetContextCreate(CurrentMemoryContext,
代码语言:txt复制 "SPI TupTable",
代码语言:txt复制 ALLOCSET_DEFAULT_SIZES);
代码语言:txt复制MemoryContextSwitchTo(tuptabcxt);
代码语言:txt复制在"SPI TupTable"中申请`SPITupleTable`结构,由_SPI_current->tuptable记录:
SPITupleTable结构中有:
- `TupleDesc tupdesc;`
- `HeapTuple *vals;`
- `uint64 numvals;`
记录结果集数据。
```javascript
代码语言:txt复制_SPI_current->tuptable = tuptable = (SPITupleTable *)
代码语言:txt复制 palloc0(sizeof(SPITupleTable));
代码语言:txt复制tuptable->tuptabcxt = tuptabcxt;
代码语言:txt复制tuptable->subid = GetCurrentSubTransactionId();
代码语言:txt复制/*
代码语言:txt复制 * The tuptable is now valid enough to be freed by AtEOSubXact_SPI, so put
* it onto the SPI context's tuptables list. This will ensure it's not
* leaked even in the unlikely event the following few lines fail.
*/- _SPI_connection中保存了`slist_head tuptables;`所有活跃的tuptable链表。
- 申请128个HeapTupleData指针位置保存结果数据。
```javascript
slist_push_head(&_SPI_current->tuptables, &tuptable->next);
代码语言:txt复制/* set up initial allocations */
代码语言:txt复制tuptable->alloced = 128;
代码语言:txt复制tuptable->vals = (HeapTuple *) palloc(tuptable->alloced * sizeof(HeapTuple));
代码语言:txt复制tuptable->numvals = 0;
代码语言:txt复制tuptable->tupdesc = CreateTupleDescCopy(typeinfo);
代码语言:txt复制MemoryContextSwitchTo(oldcxt);
}
代码语言:txt复制### 2 receiveSlot = spi_printtup
位置
```javascript
sptest1
SPI_exec
代码语言:txt复制SPI_execute
代码语言:txt复制 _SPI_execute_plan
代码语言:txt复制 _SPI_pquery
代码语言:txt复制 ExecutorRun
代码语言:txt复制 standard_ExecutorRun
代码语言:txt复制 ExecutePlan
代码语言:txt复制 spi_printtup
代码语言:txt复制流程
```javascript
bool
spi_printtup(TupleTableSlot slot, DestReceiver self)
{
代码语言:txt复制SPITupleTable *tuptable;
代码语言:txt复制MemoryContext oldcxt;
代码语言:txt复制if (_SPI_current == NULL)
代码语言:txt复制 elog(ERROR, "spi_printtup called while not connected to SPI");
代码语言:txt复制tuptable还没赋值的状态:
`{tupdesc = 0x107ea90, vals = 0x107e678, numvals = 0, alloced = 128, tuptabcxt = 0x107e500, next = {next = 0x0}, subid = 1}`
```javascript
代码语言:txt复制tuptable = _SPI_current->tuptable;
代码语言:txt复制if (tuptable == NULL)
代码语言:txt复制 elog(ERROR, "improper call to spi_printtup");
代码语言:txt复制- 切到"SPI TupTable"
- 分配的128个位置不够用了?不够在申请256个。
```javascript
代码语言:txt复制oldcxt = MemoryContextSwitchTo(tuptable->tuptabcxt);
代码语言:txt复制if (tuptable->numvals >= tuptable->alloced)
代码语言:txt复制{
代码语言:txt复制 /* Double the size of the pointer array */
代码语言:txt复制 uint64 newalloced = tuptable->alloced * 2;
代码语言:txt复制 tuptable->vals = (HeapTuple *) repalloc_huge(tuptable->vals,
代码语言:txt复制 newalloced * sizeof(HeapTuple));
代码语言:txt复制 tuptable->alloced = newalloced;
代码语言:txt复制}
代码语言:txt复制- 调用tts接口函数ExecCopySlotHeapTuple做元组拷贝,这里实际使用的是tts_virtual_copy_heap_tuple,参考《Postgresql源码(76)执行器专用元组格式TupleTableSlot》。
- ExecCopySlotHeapTuple输入tts输出标准存储格式HeapTuple。
```javascript
代码语言:txt复制tuptable->vals[tuptable->numvals] = ExecCopySlotHeapTuple(slot);
代码语言:txt复制(tuptable->numvals) ;
代码语言:txt复制MemoryContextSwitchTo(oldcxt);
代码语言:txt复制return true;
}
代码语言:txt复制
3 rShutdown = donothingCleanup
无
4 rDestroy = donothingCleanup
无