Postgresql源码(38)aclchk鉴权流程分析

2022-05-12 09:03:16 浏览数 (2)

aclchk鉴权分析

全文总结

  1. pg中aclchk.c提供了各种xxx_aclmask函数,这类函数输入需要鉴权的对象、用户、所需权限,返回当前用户有没有所需权限。
  2. 各种xxx_aclmask函数都会集中调用aclmask,所以所有对象其实共享统一的鉴权逻辑。
  3. 鉴权需要acl信息一般记录在系统表的aclxxx列中。在使用时acl信息会解析为数组,每一条存放一个授权信息。在鉴权时,挨个匹配即可。

正文:

pg14测试用户,u1授权,u2访问

代码语言:javascript复制
create user u1 login;
create user u2 login;

场景一:表无权限

总结

代码语言:javascript复制
// 【1】(pg_class_aclmask_ext)从PGCLASS里面查到这个表的信息!pg_class_aclmask
// 【2】(pg_class_aclmask_ext)超级用户直接放过
// 【3】(pg_class_aclmask_ext)找到对象的owner的ID
// 【4】(pg_class_aclmask_ext)PGCLASS中是否记录了这张表的ACL信息?
// 【5】(pg_class_aclmask_ext)PGCLASS中没记ACL信息
// 【6】(acldefault)构造ACL信息
// 【7】(acldefault)acldefault构造授权信息
// 【8】(aclmask)开始鉴权,mask=2表示需要的权限是select
// 【9】(aclmask)鉴权过程:用上面构造的acl解析出类似这样的授权信息({ai_grantee = 16384(u1), ai_grantor = 16384(u1), ai_privs = 127}),对比grantee是不是当前申请权限的用户,如果是继续对比授予的权限包不包含申请的这种,如果都符合,返回result非0。

分析过程

u1

代码语言:javascript复制
create table t1(i int);
insert into t2 values (1);

u2

代码语言:javascript复制
postgres=> select * from t1;
ERROR:  permission denied for table t1

鉴权流程

代码语言:javascript复制
#0  pg_class_aclmask (table_oid=16386, roleid=16385, mask=2, how=ACLMASK_ALL) at aclchk.c:3828
#1  0x0000000000731a50 in ExecCheckRTEPerms (rte=0x1134078) at execMain.c:639
#2  0x00000000007318d5 in ExecCheckRTPerms (rangeTable=0x11572f0, ereport_on_violation=true) at execMain.c:577
#3  0x0000000000731e05 in InitPlan (queryDesc=0x11fede0, eflags=16) at execMain.c:818
#4  0x000000000073125d in standard_ExecutorStart (queryDesc=0x11fede0, eflags=16) at execMain.c:263
#5  0x0000000000730fc5 in ExecutorStart (queryDesc=0x11fede0, eflags=0) at execMain.c:143
#6  0x0000000000975b51 in PortalStart (portal=0x1194c40, params=0x0, eflags=0, snapshot=0x0) at pquery.c:512
#7  0x000000000096fda4 in exec_simple_query (query_string=0x1133300 "select * from t1;") at postgres.c:1175
#8  0x0000000000974319 in PostgresMain (argc=1, argv=0x7ffcaa7a3450, dbname=0x115cb28 "postgres", username=0x115cb08 "u2") at postgres.c:4486
#9  0x00000000008b0c9c in BackendRun (port=0x11548f0) at postmaster.c:4506
#10 0x00000000008b0622 in BackendStartup (port=0x11548f0) at postmaster.c:4228
#11 0x00000000008acb56 in ServerLoop () at postmaster.c:1745
#12 0x00000000008ac42d in PostmasterMain (argc=3, argv=0x112de80) at postmaster.c:1417
#13 0x00000000007ae419 in main (argc=3, argv=0x112de80) at main.c:209

表权限判断核心函数:pg_class_aclmask

代码语言:javascript复制
pg_class_aclmask
pg_class_aclmask(Oid table_oid, Oid roleid, AclMode mask, AclMaskHow how)
                      16386          16385          2              ACLMASK_ALL

AclMode:

typedef uint32 AclMode;			/* a bitmask of privilege bits */
#define ACL_INSERT		(1<<0)	/* for relations */
#define ACL_SELECT		(1<<1)
#define ACL_UPDATE		(1<<2)
#define ACL_DELETE		(1<<3)
#define ACL_TRUNCATE	(1<<4)
#define ACL_REFERENCES	(1<<5)
#define ACL_TRIGGER		(1<<6)
#define ACL_EXECUTE		(1<<7)	/* for functions */
#define ACL_USAGE		(1<<8)	/* for languages, namespaces, FDWs, and
								 * servers */
#define ACL_CREATE		(1<<9)	/* for namespaces and databases */
#define ACL_CREATE_TEMP (1<<10) /* for databases */
#define ACL_CONNECT		(1<<11) /* for databases */
#define N_ACL_RIGHTS	12		/* 1 plus the last 1<<x */
#define ACL_NO_RIGHTS	0

表鉴权具体流程

代码语言:javascript复制
AclMode
pg_class_aclmask_ext(Oid table_oid, Oid roleid, AclMode mask,
					 AclMaskHow how, bool *is_missing)
{
	AclMode		result;
	HeapTuple	tuple;
	Form_pg_class classForm;
	Datum		aclDatum;
	bool		isNull;
	Acl		   *acl;
	Oid			ownerId;

// 【1】从PGCLASS里面查到这个表的信息!
	tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(table_oid));
	if (!HeapTupleIsValid(tuple))
	{
		if (is_missing != NULL)
		{
			/* return "no privileges" instead of throwing an error */
			*is_missing = true;
			return 0;
		}
		else
			ereport(ERROR,
					(errcode(ERRCODE_UNDEFINED_TABLE),
					 errmsg("relation with OID %u does not exist",
							table_oid)));
	}

	classForm = (Form_pg_class) GETSTRUCT(tuple);

	if ((mask & (ACL_INSERT | ACL_UPDATE | ACL_DELETE | ACL_TRUNCATE | ACL_USAGE)) &&
		IsSystemClass(table_oid, classForm) &&
		classForm->relkind != RELKIND_VIEW &&
		!superuser_arg(roleid))
		mask &= ~(ACL_INSERT | ACL_UPDATE | ACL_DELETE | ACL_TRUNCATE | ACL_USAGE);

// 【2】超级用户直接放过
	if (superuser_arg(roleid))
	{
		ReleaseSysCache(tuple);
		return mask;
	}

// 【3】找到对象的owner的ID
	ownerId = classForm->relowner;

// 【4】PGCLASS中是否记录了这张表的ACL信息?
	aclDatum = SysCacheGetAttr(RELOID, tuple, Anum_pg_class_relacl,
							   &isNull);
	if (isNull)
	{
// 【5】PGCLASS中没记ACL信息
		switch (classForm->relkind)
		{
			case RELKIND_SEQUENCE:
				acl = acldefault(OBJECT_SEQUENCE, ownerId);
				break;
			default:
// 【6】构造ACL信息
				acl = acldefault(OBJECT_TABLE, ownerId);
          // 【7】acldefault构造授权信息
          // acl = {vl_len_ = 144, ndim = 1, dataoffset = 0, elemtype = 1033}
				break;
		}
		aclDatum = (Datum) 0;
	}
	else
	{
		/* detoast rel's ACL if necessary */
		acl = DatumGetAclP(aclDatum);
	}
  
  // 【8】开始鉴权,mask=2表示需要的权限是select
	result = aclmask(acl, roleid, ownerId, mask, how);
=====================================
    // 根据acl反解信息
    // num = ACL_NUM(acl) = 1
    // aidat = ACL_DAT(acl) = {ai_grantee = 16384(u1), ai_grantor = 16384(u1), ai_privs = 127}
    // 【9】鉴权过程:用acl解析出上面这样的授权信息,对比grantee是不是当前申请权限的用户,如果是继续对比授予的权限包不包含申请的这种,如果都符合,返回result非0。
    	for (i = 0; i < num; i  ) { 
        AclItem    *aidata = &aidat[i];
        if (aidata->ai_grantee == ACL_ID_PUBLIC || aidata->ai_grantee == roleid) {
          result |= aidata->ai_privs & mask;
          if ((how == ACLMASK_ALL) ? (result == mask) : (result != 0)) return result; 
        } 
      }
=====================================
  
	/* if we have a detoasted copy, free it */
	if (acl && (Pointer) acl != DatumGetPointer(aclDatum))
		pfree(acl);

	ReleaseSysCache(tuple);

	/*
	 * Check if ACL_SELECT is being checked and, if so, and not set already as
	 * part of the result, then check if the user is a member of the
	 * pg_read_all_data role, which allows read access to all relations.
	 */
	if (mask & ACL_SELECT && !(result & ACL_SELECT) &&
		has_privs_of_role(roleid, ROLE_PG_READ_ALL_DATA))
		result |= ACL_SELECT;

	/*
	 * Check if ACL_INSERT, ACL_UPDATE, or ACL_DELETE is being checked and, if
	 * so, and not set already as part of the result, then check if the user
	 * is a member of the pg_write_all_data role, which allows
	 * INSERT/UPDATE/DELETE access to all relations (except system catalogs,
	 * which requires superuser, see above).
	 */
	if (mask & (ACL_INSERT | ACL_UPDATE | ACL_DELETE) &&
		!(result & (ACL_INSERT | ACL_UPDATE | ACL_DELETE)) &&
		has_privs_of_role(roleid, ROLE_PG_WRITE_ALL_DATA))
		result |= (mask & (ACL_INSERT | ACL_UPDATE | ACL_DELETE));

	return result;
}

场景二:表只读权限

总结

代码语言:javascript复制
1、aclmask遍历acl信息中,第二轮查到了{ai_grantee = 16385, ai_grantor = 16384, ai_privs = 2}(这一条是u1执行grant select on t2 to u2后数据库产生的)
2、grantee就是u2可以命中!
3、privs就是2也就是select权限可以命中!
4、结果返回2(非0)表示有读的权限。

u1执行

代码语言:javascript复制
create table t2(i int);
insert into t2 values (2);
grant select on t2 to u2;

u2执行

代码语言:javascript复制
select * from t2;

insert into t2 values (2);

流程

1、aclmask遍历acl信息中,第二轮查到了{ai_grantee = 16385, ai_grantor = 16384, ai_privs = 2}

2、grantee就是u2命中!privs就是2也就是select权限命中!

3、结果返回2表示有读的权限。

代码语言:javascript复制
pg_class_aclmask_ext
  aclmask
    for (i = 0; i < num; i  )
      AclItem    *aidata = &aidat[i];
      if (aidata->ai_grantee == ACL_ID_PUBLIC || aidata->ai_grantee == roleid)
        result |= aidata->ai_privs & mask;
        
        
aidata:
{ai_grantee = 16384, ai_grantor = 16384, ai_privs = 127}
{ai_grantee = 16385, ai_grantor = 16384, ai_privs = 2}

场景三:schema无权限

总结

代码语言:javascript复制
// 【1】(pg_namespace_aclmask)超级用户直接返回
// 【2】(pg_namespace_aclmask)临时namespace?
// 【3】(pg_namespace_aclmask)从pg_namespace表查nspacl
// 【4】(acldefault)查不到acl信息构造一条
// 【5】(aclmask)使用构造的acl信息开始鉴权{ai_grantee = 16384, ai_grantor = 16384, ai_privs = 768}

u1

代码语言:javascript复制
create schema ns1;
create table ns1.t3(i int);
insert into ns1.t3 values (3);

u2

代码语言:javascript复制
postgres=> select * from ns1.t3;
ERROR:  permission denied for schema ns1

堆栈

代码语言:javascript复制
(gdb) bt
#0  pg_namespace_aclcheck (nsp_oid=16395, roleid=16385, mode=256) at aclchk.c:4763
#1  0x00000000005cb463 in LookupExplicitNamespace (nspname=0x1133e18 "ns1", missing_ok=true) at namespace.c:2960
#2  0x00000000005c765a in RangeVarGetRelidExtended (relation=0x1133eb8, lockmode=1, flags=1, callback=0x0, callback_arg=0x0) at namespace.c:326
#3  0x000000000049e902 in relation_openrv_extended (relation=0x1133eb8, lockmode=1, missing_ok=true) at relation.c:186
#4  0x0000000000566628 in table_openrv_extended (relation=0x1133eb8, lockmode=1, missing_ok=true) at table.c:137
...

流程

代码语言:javascript复制
pg_namespace_aclcheck(Oid nsp_oid, Oid roleid, AclMode mode)
  if (pg_namespace_aclmask(nsp_oid, roleid, mode, ACLMASK_ANY) != 0)
    return ACLCHECK_OK;
    
nsp_oid: 16395    (ns1)
roleid:  16385    (u2)
mode:    256      (ACL_USAGE (1<<8) /* for languages, namespaces, FDWs, and servers)

pg_namespace_aclmask

代码语言:javascript复制
AclMode
pg_namespace_aclmask(Oid nsp_oid, Oid roleid,
					 AclMode mask, AclMaskHow how)
{
	AclMode		result;
	HeapTuple	tuple;
	Datum		aclDatum;
	bool		isNull;
	Acl		   *acl;
	Oid			ownerId;

// 【1】超级用户直接返回
  if (superuser_arg(roleid))
		return mask;

// 【2】临时namespace?
	if (isTempNamespace(nsp_oid))
	{
		if (pg_database_aclcheck(MyDatabaseId, roleid,
								 ACL_CREATE_TEMP) == ACLCHECK_OK)
			return mask & ACL_ALL_RIGHTS_SCHEMA;
		else
			return mask & ACL_USAGE;
	}

// 【3】从pg_namespace表查nspacl
	tuple = SearchSysCache1(NAMESPACEOID, ObjectIdGetDatum(nsp_oid));
	if (!HeapTupleIsValid(tuple))
		ereport(ERROR,
				(errcode(ERRCODE_UNDEFINED_SCHEMA),
				 errmsg("schema with OID %u does not exist", nsp_oid)));

	ownerId = ((Form_pg_namespace) GETSTRUCT(tuple))->nspowner;

	aclDatum = SysCacheGetAttr(NAMESPACEOID, tuple, Anum_pg_namespace_nspacl,
							   &isNull);
	// 【4】查不到acl信息构造一条
  if (isNull)
	{
		/* No ACL, so build default ACL */
		acl = acldefault(OBJECT_SCHEMA, ownerId);
		aclDatum = (Datum) 0;
	}
	else
	{
		/* detoast ACL if necessary */
		acl = DatumGetAclP(aclDatum);
	}

// 【5】使用构造的acl信息开始鉴权{ai_grantee = 16384, ai_grantor = 16384, ai_privs = 768}
	result = aclmask(acl, roleid, ownerId, mask, how);

	/* if we have a detoasted copy, free it */
	if (acl && (Pointer) acl != DatumGetPointer(aclDatum))
		pfree(acl);

	ReleaseSysCache(tuple);

	/*
	 * Check if ACL_USAGE is being checked and, if so, and not set already as
	 * part of the result, then check if the user is a member of the
	 * pg_read_all_data or pg_write_all_data roles, which allow usage access
	 * to all schemas.
	 */
	if (mask & ACL_USAGE && !(result & ACL_USAGE) &&
		(has_privs_of_role(roleid, ROLE_PG_READ_ALL_DATA) ||
		 has_privs_of_role(roleid, ROLE_PG_WRITE_ALL_DATA)))
		result |= ACL_USAGE;
	return result;
}

0 人点赞