Postgresql源码(82)SPI模块拆解分析一:执行简单SQL获取结果

2022-09-30 08:47:02 浏览数 (1)

SPI简单用例初步分析

1 用例:外挂C函数调用SPI

直接执行:

代码语言:bash
cat << EOF > spitest.c
#include "postgres.h"
#include "executor/spi.h"
#include "utils/builtins.h"
#include "fmgr.h"

PG_MODULE_MAGIC;
PG_FUNCTION_INFO_V1(sptest1);

Datum
sptest1(PG_FUNCTION_ARGS)
{
  char *sql10 = "select s from generate_series(1,10) s";
  int ret;
  int proc;
  SPI_connect();
  ret = SPI_exec(sql10, 0);
  proc = SPI_processed;
  SPI_finish();
  return (proc);
}
EOF

gcc -O0 -Wall -I /home/mingjiegao/dev/src/postgres/src/include -g -shared -fpic -o spitest.so spitest.c

psql执行:

代码语言:sql
create or replace function sptest1() 
returns int8 
as '/home/mingjiegao/tmp/spitest/spitest.so', 'sptest1' language C STRICT;  

-- 调用函数
postgres=# select sptest1();
 sptest1 
---------
      10
(1 row)

2 执行分析

SPI几个内部全局变量的含义

代码语言:c
static _SPI_connection *_SPI_stack = NULL;
static _SPI_connection *_SPI_current = NULL;
static int	_SPI_stack_depth = 0;	/* allocated size of _SPI_stack */
static int	_SPI_connected = -1;	/* current stack index */
  • _SPI_stack:栈指针(数组):第一次16个conn、第二次32个conn、第三次64个conn。
  • _SPI_current:当前活跃conn。
  • _SPI_stack_depth:栈已分配的容量(可容纳的conn个数),第一次16、第二次32、第三次64。
  • _SPI_connected:当前活跃conn的索引。

2.1 SPI_connect

栈中申请一个conn位置,用_SPI_current指向,然后初始化结构体变量。

代码语言:c
/*
 * SPI_connect
 *
 * Convenience entry point that opens an SPI connection with no option
 * flags: it delegates to SPI_connect_ext(0) and returns that call's
 * result unchanged.
 */
int
SPI_connect(void)
{
	return SPI_connect_ext(0);
}

int
SPI_connect_ext(int options)
{
	int			newdepth;

维护栈大小:如果栈还有空闲位置,直接使用;如果栈已经用满(_SPI_stack_depth == _SPI_connected + 1),则扩容为原来的两倍。

比如:栈容量为16且16个conn都已被使用(此时_SPI_connected为15),再申请第17个conn时就开始扩容,扩到32个。

代码语言:c
	/* Enlarge stack if necessary */
	if (_SPI_stack == NULL)
	{
		if (_SPI_connected != -1 || _SPI_stack_depth != 0)
			elog(ERROR, "SPI stack corrupted");
		newdepth = 16;
		_SPI_stack = (_SPI_connection *)
			MemoryContextAlloc(TopMemoryContext,
							   newdepth * sizeof(_SPI_connection));
		_SPI_stack_depth = newdepth;
	}
	else
	{
		if (_SPI_stack_depth <= 0 || _SPI_stack_depth <= _SPI_connected)
			elog(ERROR, "SPI stack corrupted");
		if (_SPI_stack_depth == _SPI_connected + 1)
		{
			newdepth = _SPI_stack_depth * 2;
			_SPI_stack = (_SPI_connection *)
				repalloc(_SPI_stack,
						 newdepth * sizeof(_SPI_connection));
			_SPI_stack_depth = newdepth;
		}
	}

	/* Enter new stack level */
	_SPI_connected++;
	Assert(_SPI_connected >= 0 && _SPI_connected < _SPI_stack_depth);

	_SPI_current = &(_SPI_stack[_SPI_connected]);
	_SPI_current->processed = 0;
	_SPI_current->tuptable = NULL;
	_SPI_current->execSubid = InvalidSubTransactionId;
	slist_init(&_SPI_current->tuptables);
	_SPI_current->procCxt = NULL;	/* in case we fail to create 'em */
	_SPI_current->execCxt = NULL;
	_SPI_current->connectSubid = GetCurrentSubTransactionId();
	_SPI_current->queryEnv = NULL;
	_SPI_current->atomic = (options & SPI_OPT_NONATOMIC ? false : true);
	_SPI_current->internal_xact = false;
	_SPI_current->outer_processed = SPI_processed;
	_SPI_current->outer_tuptable = SPI_tuptable;
	_SPI_current->outer_result = SPI_result;
  • 创建"SPI Proc"上下文记录到:_SPI_current->procCxt
  • 创建"SPI Exec"上下文记录到:_SPI_current->execCxt
  • 切换到"SPI Proc"
代码语言:c
	_SPI_current->procCxt = AllocSetContextCreate(_SPI_current->atomic ? TopTransactionContext : PortalContext,
												  "SPI Proc",
												  ALLOCSET_DEFAULT_SIZES);
	_SPI_current->execCxt = AllocSetContextCreate(_SPI_current->atomic ? TopTransactionContext : _SPI_current->procCxt,
												  "SPI Exec",
												  ALLOCSET_DEFAULT_SIZES);
	/* ... and switch to procedure's context */
	_SPI_current->savedcxt = MemoryContextSwitchTo(_SPI_current->procCxt);

	/*
	 * Reset API global variables so that current caller cannot accidentally
	 * depend on state of an outer caller.
	 */
	SPI_processed = 0;
	SPI_tuptable = NULL;
	SPI_result = 0;

	return SPI_OK_CONNECT;
}

2.2 SPI_exec(内部直接调用 SPI_execute(src, false, tcount),下面分析 SPI_execute)

代码语言:c
/* Parse, plan, and execute a query string */
int
SPI_execute(const char *src, bool read_only, long tcount)
{
	_SPI_plan	plan;
	SPIExecuteOptions options;
	int			res;

	if (src == NULL || tcount < 0)
		return SPI_ERROR_ARGUMENT;
  • _SPI_begin_call切换到exec状态:切换上下文到SPI Exec
代码语言:c
	res = _SPI_begin_call(true);
	if (res < 0)
		return res;

	memset(&plan, 0, sizeof(_SPI_plan));
	plan.magic = _SPI_PLAN_MAGIC;
	plan.parse_mode = RAW_PARSE_DEFAULT;
	plan.cursor_options = CURSOR_OPT_PARALLEL_OK;
  • 执行raw_parser
  • 执行CreateOneShotCachedPlan
代码语言:c
	_SPI_prepare_oneshot_plan(src, &plan);

	memset(&options, 0, sizeof(options));
	options.read_only = read_only;
	options.tcount = tcount;
  • pg_analyze_and_rewrite:分析重写
  • CompleteCachedPlan
  • GetCachedPlan
    • BuildCachedPlan
      • pg_plan_queries:生成执行计划
  • PushActiveSnapshot
  • CreateDestReceiver:创建接收器,指定SPI接收器
  • CreateQueryDesc:给执行器创建执行描述符
  • _SPI_pquery:DML使用Executor执行查询
    • ExecutorStart
    • ExecutorRun
    • ExecutorFinish
    • ExecutorEnd
代码语言:c
	res = _SPI_execute_plan(&plan, &options,
							InvalidSnapshot, InvalidSnapshot,
							true);

	_SPI_end_call(true);
	return res;
}

0 人点赞