SqlAlchemy 2.0 中文文档(十二)

2024-06-26 14:36:00 浏览数 (2)

原文:docs.sqlalchemy.org/en/20/contents.html

邻接列表关系

原文:docs.sqlalchemy.org/en/20/orm/self_referential.html

邻接列表模式是一种常见的关系模式,其中表包含对自身的外键引用,换句话说是自引用关系。这是在平面表中表示层次数据的最常见方法。其他方法包括嵌套集,有时称为“修改的先序”,以及材料路径。尽管在 SQL 查询中评估其流畅性时修改的先序具有吸引力,但邻接列表模型可能是满足大多数层次存储需求的最合适模式,原因是并发性、减少的复杂性,以及修改的先序对于能够完全加载子树到应用程序空间的应用程序几乎没有优势。

另请参阅

此部分详细说明了自引用关系的单表版本。有关使用第二个表作为关联表的自引用关系,请参阅自引用多对多关系部分。

在本示例中,我们将使用一个名为Node的单个映射类,表示树结构:

代码语言:javascript复制
class Node(Base):
    __tablename__ = "node"
    id = mapped_column(Integer, primary_key=True)
    parent_id = mapped_column(Integer, ForeignKey("node.id"))
    data = mapped_column(String(50))
    children = relationship("Node")

使用此结构,可以构建如下的图形:

代码语言:javascript复制
root -- ---> child1
        ---> child2 -- --> subchild1
       |               --> subchild2
        ---> child3

可以用数据表示为:

代码语言:javascript复制
id       parent_id     data
---      -------       ----
1        NULL          root
2        1             child1
3        1             child2
4        3             subchild1
5        3             subchild2
6        1             child3

这里的relationship()配置与“正常”的一对多关系的工作方式相同,唯一的例外是,“方向”,即关系是一对多还是多对一,默认假定为一对多。要将关系建立为多对一,需要添加一个额外的指令,称为relationship.remote_side,它是一个Column或一组Column对象,指示应该被视为“远程”的对象:

代码语言:javascript复制
class Node(Base):
    __tablename__ = "node"
    id = mapped_column(Integer, primary_key=True)
    parent_id = mapped_column(Integer, ForeignKey("node.id"))
    data = mapped_column(String(50))
    parent = relationship("Node", remote_side=[id])

在上述情况下,id列被应用为parent relationship()relationship.remote_side,从而将parent_id建立为“本地”端,并且关系随后表现为多对一。

一如既往,两个方向可以结合成一个双向关系,使用两个由relationship.back_populates链接的relationship()构造。

代码语言:javascript复制
class Node(Base):
    __tablename__ = "node"
    id = mapped_column(Integer, primary_key=True)
    parent_id = mapped_column(Integer, ForeignKey("node.id"))
    data = mapped_column(String(50))
    children = relationship("Node", back_populates="parent")
    parent = relationship("Node", back_populates="children", remote_side=[id])

另请参阅

邻接列表 - 更新为 SQLAlchemy 2.0 的工作示例

复合邻接列表

邻接列表关系的一个子类别是在连接条件的“本地”和“远程”两侧都存在特定列的罕见情况。下面是Folder类的一个示例;使用复合主键,account_id列指向自身,以指示位于与父文件夹相同帐户内的子文件夹;而folder_id则指向该帐户内的特定文件夹:

代码语言:javascript复制
class Folder(Base):
    __tablename__ = "folder"
    __table_args__ = (
        ForeignKeyConstraint(
            ["account_id", "parent_id"], ["folder.account_id", "folder.folder_id"]
        ),
    )

    account_id = mapped_column(Integer, primary_key=True)
    folder_id = mapped_column(Integer, primary_key=True)
    parent_id = mapped_column(Integer)
    name = mapped_column(String)

    parent_folder = relationship(
        "Folder", back_populates="child_folders", remote_side=[account_id, folder_id]
    )

    child_folders = relationship("Folder", back_populates="parent_folder")

在上述示例中,我们将account_id传递到relationship.remote_side列表中。relationship()识别到这里的account_id列在两侧都存在,并将“远程”列与它识别为唯一存在于“远程”侧的folder_id列对齐。

自引用查询策略

查询自引用结构的方式与任何其他查询相同:

代码语言:javascript复制
# get all nodes named 'child2'
session.scalars(select(Node).where(Node.data == "child2"))

但是,当尝试沿着树的一个级别从一个外键连接到下一个级别时,需要额外小心。在 SQL 中,从表连接到自身的连接需要至少对表达式的一侧进行“别名”,以便可以明确引用它。

请回想一下 ORM 教程中的选择 ORM 别名,aliased()结构通常用于提供 ORM 实体的“别名”。使用此技术从Node连接到自身的连接如下所示:

代码语言:javascript复制
from sqlalchemy.orm import aliased

nodealias = aliased(Node)
session.scalars(
    select(Node)
    .where(Node.data == "subchild1")
    .join(Node.parent.of_type(nodealias))
    .where(nodealias.data == "child2")
).all()
SELECT  node.id  AS  node_id,
  node.parent_id  AS  node_parent_id,
  node.data  AS  node_data
FROM  node  JOIN  node  AS  node_1
  ON  node.parent_id  =  node_1.id
WHERE  node.data  =  ?
  AND  node_1.data  =  ?
['subchild1',  'child2'] 
```## 配置自引用的急切加载

在正常查询操作期间,通过从父表到子表的连接或外连接来发生关系的急切加载,以便可以从单个 SQL 语句或所有子集合的第二个语句中填充父对象及其直接子集合或引用。SQLAlchemy 的连接和子查询急切加载在连接到相关项时在所有情况下使用别名表,因此与自引用连接兼容。然而,要使用自引用关系进行急切加载,SQLAlchemy 需要告知应该连接和/或查询多少级深度;否则,急切加载将根本不会发生。此深度设置通过`relationships.join_depth`进行配置:

```py
class Node(Base):
    __tablename__ = "node"
    id = mapped_column(Integer, primary_key=True)
    parent_id = mapped_column(Integer, ForeignKey("node.id"))
    data = mapped_column(String(50))
    children = relationship("Node", lazy="joined", join_depth=2)

session.scalars(select(Node)).all()
SELECT  node_1.id  AS  node_1_id,
  node_1.parent_id  AS  node_1_parent_id,
  node_1.data  AS  node_1_data,
  node_2.id  AS  node_2_id,
  node_2.parent_id  AS  node_2_parent_id,
  node_2.data  AS  node_2_data,
  node.id  AS  node_id,
  node.parent_id  AS  node_parent_id,
  node.data  AS  node_data
FROM  node
  LEFT  OUTER  JOIN  node  AS  node_2
  ON  node.id  =  node_2.parent_id
  LEFT  OUTER  JOIN  node  AS  node_1
  ON  node_2.id  =  node_1.parent_id
[] 

复合邻接列表

邻接列表关系的一个子类别是在连接条件的“本地”和“远程”两侧都存在特定列的罕见情况。下面是Folder类的一个示例;使用复合主键,account_id列指向自身,以指示位于与父文件夹相同帐户内的子文件夹;而folder_id则指向该帐户内的特定文件夹:

代码语言:javascript复制
class Folder(Base):
    __tablename__ = "folder"
    __table_args__ = (
        ForeignKeyConstraint(
            ["account_id", "parent_id"], ["folder.account_id", "folder.folder_id"]
        ),
    )

    account_id = mapped_column(Integer, primary_key=True)
    folder_id = mapped_column(Integer, primary_key=True)
    parent_id = mapped_column(Integer)
    name = mapped_column(String)

    parent_folder = relationship(
        "Folder", back_populates="child_folders", remote_side=[account_id, folder_id]
    )

    child_folders = relationship("Folder", back_populates="parent_folder")

在上面的例子中,我们将account_id传递到relationship.remote_side列表中。relationship()识别到这里的account_id列在两侧均存在,并且将“远程”列与它识别为唯一存在于“远程”一侧的folder_id列对齐。

自引用查询策略

自引用结构的查询与任何其他查询相同:

代码语言:javascript复制
# get all nodes named 'child2'
session.scalars(select(Node).where(Node.data == "child2"))

但是,在尝试从树的一级到下一级进行连接时需要特别注意。在 SQL 中,从表连接到自身需要至少一个表达式的一侧被“别名”,以便可以明确地引用它。

请回想一下在 ORM 教程中选择 ORM 别名,aliased()构造通常用于提供 ORM 实体的“别名”。使用这种技术从Node到自身的连接看起来像:

代码语言:javascript复制
from sqlalchemy.orm import aliased

nodealias = aliased(Node)
session.scalars(
    select(Node)
    .where(Node.data == "subchild1")
    .join(Node.parent.of_type(nodealias))
    .where(nodealias.data == "child2")
).all()
SELECT  node.id  AS  node_id,
  node.parent_id  AS  node_parent_id,
  node.data  AS  node_data
FROM  node  JOIN  node  AS  node_1
  ON  node.parent_id  =  node_1.id
WHERE  node.data  =  ?
  AND  node_1.data  =  ?
['subchild1',  'child2'] 

配置自引用关系的急切加载

通过在正常查询操作期间从父表到子表使用连接或外连接来进行关系的急切加载,以便可以从单个 SQL 语句或所有直接子集合的第二个语句中填充父表及其直接子集合或引用。SQLAlchemy 的连接和子查询急切加载在加入相关项时始终使用别名表,因此与自引用连接兼容。然而,要想使用自引用关系的急切加载,需要告诉 SQLAlchemy 应该加入和/或查询多少级深度;否则,急切加载将根本不会发生。此深度设置通过relationships.join_depth进行配置:

代码语言:javascript复制
class Node(Base):
    __tablename__ = "node"
    id = mapped_column(Integer, primary_key=True)
    parent_id = mapped_column(Integer, ForeignKey("node.id"))
    data = mapped_column(String(50))
    children = relationship("Node", lazy="joined", join_depth=2)

session.scalars(select(Node)).all()
SELECT  node_1.id  AS  node_1_id,
  node_1.parent_id  AS  node_1_parent_id,
  node_1.data  AS  node_1_data,
  node_2.id  AS  node_2_id,
  node_2.parent_id  AS  node_2_parent_id,
  node_2.data  AS  node_2_data,
  node.id  AS  node_id,
  node.parent_id  AS  node_parent_id,
  node.data  AS  node_data
FROM  node
  LEFT  OUTER  JOIN  node  AS  node_2
  ON  node.id  =  node_2.parent_id
  LEFT  OUTER  JOIN  node  AS  node_1
  ON  node_2.id  =  node_1.parent_id
[] 

配置关系连接

原文:docs.sqlalchemy.org/en/20/orm/join_conditions.html

relationship()通常会通过检查两个表之间的外键关系来创建两个表之间的连接,以确定应该比较哪些列。有各种情况需要定制此行为。

处理多个连接路径

处理的最常见情况之一是两个表之间存在多个外键路径时。

考虑一个包含两个外键到Address类的Customer类:

代码语言:javascript复制
from sqlalchemy import Integer, ForeignKey, String, Column
from sqlalchemy.orm import DeclarativeBase
from sqlalchemy.orm import relationship

class Base(DeclarativeBase):
    pass

class Customer(Base):
    __tablename__ = "customer"
    id = mapped_column(Integer, primary_key=True)
    name = mapped_column(String)

    billing_address_id = mapped_column(Integer, ForeignKey("address.id"))
    shipping_address_id = mapped_column(Integer, ForeignKey("address.id"))

    billing_address = relationship("Address")
    shipping_address = relationship("Address")

class Address(Base):
    __tablename__ = "address"
    id = mapped_column(Integer, primary_key=True)
    street = mapped_column(String)
    city = mapped_column(String)
    state = mapped_column(String)
    zip = mapped_column(String)

当我们尝试使用上述映射时,将产生错误:

代码语言:javascript复制
sqlalchemy.exc.AmbiguousForeignKeysError: Could not determine join
condition between parent/child tables on relationship
Customer.billing_address - there are multiple foreign key
paths linking the tables.  Specify the 'foreign_keys' argument,
providing a list of those columns which should be
counted as containing a foreign key reference to the parent table.

上述消息相当长。relationship()可以返回许多潜在消息,这些消息已经被精心设计用于检测各种常见的配置问题;大多数都会建议需要哪些额外配置来解决模糊或其他缺失的信息。

在这种情况下,消息希望我们通过为每个指定的relationship()指定应该考虑哪个外键列来修饰每一个,并且适当的形式如下:

代码语言:javascript复制
class Customer(Base):
    __tablename__ = "customer"
    id = mapped_column(Integer, primary_key=True)
    name = mapped_column(String)

    billing_address_id = mapped_column(Integer, ForeignKey("address.id"))
    shipping_address_id = mapped_column(Integer, ForeignKey("address.id"))

    billing_address = relationship("Address", foreign_keys=[billing_address_id])
    shipping_address = relationship("Address", foreign_keys=[shipping_address_id])

在上面,我们指定了foreign_keys参数,它是一个ColumnColumn对象的列表,指示要考虑的“外键”列,或者换句话说,包含引用父表的值的列。从Customer对象加载Customer.billing_address关系将使用billing_address_id中的值来标识要加载的Address行;类似地,shipping_address_id用于shipping_address关系。这两列的关联在持久性期间也起到了作用;刚刚插入的Address对象的新生成的主键将在刷新期间复制到关联的Customer对象的适当外键列中。

在使用 Declarative 指定foreign_keys时,我们还可以使用字符串名称进行指定,但是如果使用列表,列表应该是字符串的一部分是很重要的:

代码语言:javascript复制
billing_address = relationship("Address", foreign_keys="[Customer.billing_address_id]")

在这个具体的例子中,在任何情况下列表都是不必要的,因为我们只需要一个Column

代码语言:javascript复制
billing_address = relationship("Address", foreign_keys="Customer.billing_address_id")

警告

当作为 Python 可执行字符串传递时,relationship.foreign_keys 参数将使用 Python 的 eval() 函数进行解释。请勿将不受信任的输入传递给此字符串。详细信息请参见关系参数的评估。

relationship() 在构建连接时的默认行为是将一侧的主键列的值等同于另一侧的外键引用列的值。我们可以使用 relationship.primaryjoin 参数来更改此条件,以及在使用“次要”表时,还可以使用 relationship.secondaryjoin 参数。

在下面的示例中,我们使用 User 类以及存储街道地址的 Address 类来创建一个关系 boston_addresses,它将仅加载指定城市为“波士顿”的 Address 对象:

代码语言:javascript复制
from sqlalchemy import Integer, ForeignKey, String, Column
from sqlalchemy.orm import DeclarativeBase
from sqlalchemy.orm import relationship

class Base(DeclarativeBase):
    pass

class User(Base):
    __tablename__ = "user"
    id = mapped_column(Integer, primary_key=True)
    name = mapped_column(String)
    boston_addresses = relationship(
        "Address",
        primaryjoin="and_(User.id==Address.user_id, Address.city=='Boston')",
    )

class Address(Base):
    __tablename__ = "address"
    id = mapped_column(Integer, primary_key=True)
    user_id = mapped_column(Integer, ForeignKey("user.id"))

    street = mapped_column(String)
    city = mapped_column(String)
    state = mapped_column(String)
    zip = mapped_column(String)

在此字符串 SQL 表达式中,我们使用了 and_() 连接构造来为连接条件建立两个不同的谓词 - 将 User.idAddress.user_id 列相互连接,同时将 Address 中的行限制为只有 city='Boston'。在使用声明式时,诸如 and_() 这样的基本 SQL 函数会自动在字符串 relationship() 参数的评估命名空间中可用。

警告

当作为 Python 可执行字符串传递时,relationship.primaryjoin 参数将使用 Python 的 eval() 函数进行解释。请勿将不受信任的输入传递给此字符串。详细信息请参见关系参数的评估。

我们在relationship.primaryjoin中使用的自定义标准通常只在 SQLAlchemy 渲染 SQL 以加载或表示此关系时才重要。也就是说,它用于在执行每个属性的延迟加载时发出的 SQL 语句中,或者在查询时构造联接时,例如通过Select.join()或通过渴望的“joined”或“subquery”加载样式。当操作内存中的对象时,我们可以将任何我们想要的Address对象放入boston_addresses集合中,而不管.city属性的值是什么。这些对象将一直保留在集合中,直到属性过期并重新从应用准则的数据库中加载为止。当发生刷新时,boston_addresses内的对象将被无条件地刷新,将主键user.id列的值分配给每一行的持有外键的address.user_id列。在这里,city标准没有影响,因为刷新过程只关心将主键值同步到引用外键值中。## 创建自定义外键条件

主要连接条件的另一个元素是如何确定那些被认为是“外部”的列的。通常,一些Column对象的子集将指定ForeignKey,或者否则将是与连接条件相关的ForeignKeyConstraint的一部分。relationship()查找此外键状态,因为它决定了它应该如何加载和持久化此关系的数据。然而,relationship.primaryjoin参数可以用来创建不涉及任何“架构”级外键的连接条件。我们可以结合relationship.primaryjoin以及relationship.foreign_keysrelationship.remote_side显式地建立这样一个连接。

下面,一个名为HostEntry的类与自身连接,将字符串content列与ip_address列相等,后者是一种名为INET的 PostgreSQL 类型。我们需要使用cast()来将连接的一侧转换为另一侧的类型:

代码语言:javascript复制
from sqlalchemy import cast, String, Column, Integer
from sqlalchemy.orm import relationship
from sqlalchemy.dialects.postgresql import INET

from sqlalchemy.orm import DeclarativeBase

class Base(DeclarativeBase):
    pass

class HostEntry(Base):
    __tablename__ = "host_entry"

    id = mapped_column(Integer, primary_key=True)
    ip_address = mapped_column(INET)
    content = mapped_column(String(50))

    # relationship() using explicit foreign_keys, remote_side
    parent_host = relationship(
        "HostEntry",
        primaryjoin=ip_address == cast(content, INET),
        foreign_keys=content,
        remote_side=ip_address,
    )

上述关系将产生如下连接:

代码语言:javascript复制
SELECT  host_entry.id,  host_entry.ip_address,  host_entry.content
FROM  host_entry  JOIN  host_entry  AS  host_entry_1
ON  host_entry_1.ip_address  =  CAST(host_entry.content  AS  INET)

上述的另一种替代语法是在relationship.primaryjoin表达式内联使用foreign()remote()注释。此语法表示了relationship()通常自动应用于连接条件的注释,给定了relationship.foreign_keysrelationship.remote_side参数。当存在显式连接条件时,这些函数可能更加简洁,并且还标记了“外键”或“远程”列的确切位置,无论该列是否多次声明或在复杂的 SQL 表达式中声明:

代码语言:javascript复制
from sqlalchemy.orm import foreign, remote

class HostEntry(Base):
    __tablename__ = "host_entry"

    id = mapped_column(Integer, primary_key=True)
    ip_address = mapped_column(INET)
    content = mapped_column(String(50))

    # relationship() using explicit foreign() and remote() annotations
    # in lieu of separate arguments
    parent_host = relationship(
        "HostEntry",
        primaryjoin=remote(ip_address) == cast(foreign(content), INET),
    )
```## 在连接条件中使用自定义运算符

关于关系的另一个用例是使用自定义运算符,例如在与`INET`和`CIDR`等类型结合时,使用 PostgreSQL 的“包含于”`<<`运算符。对于自定义布尔运算符,我们使用`Operators.bool_op()`函数:

```py
inet_column.bool_op("<<")(cidr_column)

类似上述的比较可以直接在构建relationship()时,与relationship.primaryjoin一起使用:

代码语言:javascript复制
class IPA(Base):
    __tablename__ = "ip_address"

    id = mapped_column(Integer, primary_key=True)
    v4address = mapped_column(INET)

    network = relationship(
        "Network",
        primaryjoin="IPA.v4address.bool_op('<<')(foreign(Network.v4representation))",
        viewonly=True,
    )

class Network(Base):
    __tablename__ = "network"

    id = mapped_column(Integer, primary_key=True)
    v4representation = mapped_column(CIDR)

上面的查询如下:

代码语言:javascript复制
select(IPA).join(IPA.network)

将呈现为:

代码语言:javascript复制
SELECT  ip_address.id  AS  ip_address_id,  ip_address.v4address  AS  ip_address_v4address
FROM  ip_address  JOIN  network  ON  ip_address.v4address  <<  network.v4representation
```## 基于 SQL 函数的自定义运算符

`Operators.op.is_comparison`的用例的一种变体是当我们不使用运算符,而是使用 SQL 函数时。这种用例的典型示例是 PostgreSQL PostGIS 函数,但任何数据库中解析为二进制条件的 SQL 函数都可以应用。为了适应这种用例,`FunctionElement.as_comparison()`方法可以修改任何 SQL 函数,例如从`func`命名空间调用的函数,以指示 ORM 函数生成两个表达式的比较。下面的示例用[Geoalchemy2](https://geoalchemy-2.readthedocs.io/)库说明了这一点:

```py
from geoalchemy2 import Geometry
from sqlalchemy import Column, Integer, func
from sqlalchemy.orm import relationship, foreign

class Polygon(Base):
    __tablename__ = "polygon"
    id = mapped_column(Integer, primary_key=True)
    geom = mapped_column(Geometry("POLYGON", srid=4326))
    points = relationship(
        "Point",
        primaryjoin="func.ST_Contains(foreign(Polygon.geom), Point.geom).as_comparison(1, 2)",
        viewonly=True,
    )

class Point(Base):
    __tablename__ = "point"
    id = mapped_column(Integer, primary_key=True)
    geom = mapped_column(Geometry("POINT", srid=4326))

在上面,FunctionElement.as_comparison()表明func.ST_Contains() SQL 函数正在比较Polygon.geomPoint.geom表达式。foreign()注释另外指出了在此特定关系中哪个列承担“外键”角色。

1.3 版本中的新增功能:添加了FunctionElement.as_comparison()。## 重叠的外键

当使用复合外键时,可能会出现罕见的情况,使得单个列可能是通过外键约束引用的多个列的主题。

考虑一个(诚然复杂的)映射,如Magazine对象,由Writer对象和Article对象使用包含magazine_id的复合主键方案引用;然后为了使Article也引用WriterArticle.magazine_id涉及到两个单独的关系;Article.magazineArticle.writer

代码语言:javascript复制
class Magazine(Base):
    __tablename__ = "magazine"

    id = mapped_column(Integer, primary_key=True)

class Article(Base):
    __tablename__ = "article"

    article_id = mapped_column(Integer)
    magazine_id = mapped_column(ForeignKey("magazine.id"))
    writer_id = mapped_column()

    magazine = relationship("Magazine")
    writer = relationship("Writer")

    __table_args__ = (
        PrimaryKeyConstraint("article_id", "magazine_id"),
        ForeignKeyConstraint(
            ["writer_id", "magazine_id"], ["writer.id", "writer.magazine_id"]
        ),
    )

class Writer(Base):
    __tablename__ = "writer"

    id = mapped_column(Integer, primary_key=True)
    magazine_id = mapped_column(ForeignKey("magazine.id"), primary_key=True)
    magazine = relationship("Magazine")

配置上述映射后,我们将看到发出此警告:

代码语言:javascript复制
SAWarning: relationship 'Article.writer' will copy column
writer.magazine_id to column article.magazine_id,
which conflicts with relationship(s): 'Article.magazine'
(copies magazine.id to article.magazine_id). Consider applying
viewonly=True to read-only relationships, or provide a primaryjoin
condition marking writable columns with the foreign() annotation.

这指的是Article.magazine_id是两个不同外键约束的主题的事实;它直接引用Magazine.id作为源列,但也在Writer的复合键的上下文中引用Writer.magazine_id作为源列。如果我们将Article与特定的Magazine关联起来,然后将Article与与不同Magazine关联的Writer关联起来,ORM 将非确定性地覆盖Article.magazine_id,在不通知的情况下更改我们引用的杂志;如果我们将WriterArticle中解除关联,它还可能尝试将 NULL 放入此列。警告让我们知道这是这种情况。

要解决这个问题,我们需要将Article的行为分解,包括以下三个特性:

  1. 首先,Article根据仅在Article.magazine关系中持久化的数据写入Article.magazine_id,即从Magazine.id复制的值。
  2. Article可以代表在Article.writer关系中持久化的数据写入Article.writer_id,但仅限于Writer.id列;Writer.magazine_id列不应写入Article.magazine_id,因为它最终源自Magazine.id
  3. 当加载Article.writer时,Article会考虑Article.magazine_id,尽管它在此关系中不会向其写入。

要获得仅#1 和#2,我们可以仅指定Article.writer_id作为Article.writer的“外键”:

代码语言:javascript复制
class Article(Base):
    # ...

    writer = relationship("Writer", foreign_keys="Article.writer_id")

然而,这会导致Article.writer在与Writer查询时不考虑Article.magazine_id

代码语言:javascript复制
SELECT  article.article_id  AS  article_article_id,
  article.magazine_id  AS  article_magazine_id,
  article.writer_id  AS  article_writer_id
FROM  article
JOIN  writer  ON  writer.id  =  article.writer_id

因此,为了获得所有#1、#2 和#3,我们通过完全组合relationship.primaryjoinrelationship.foreign_keys参数,或者更简洁地通过用foreign()注释来表达连接条件以及要写入的列:

代码语言:javascript复制
class Article(Base):
    # ...

    writer = relationship(
        "Writer",
        primaryjoin="and_(Writer.id == foreign(Article.writer_id), "
        "Writer.magazine_id == Article.magazine_id)",
    )

非关系对比/实现路径

警告

此部分详细介绍了一个实验性功能。

使用自定义表达式意味着我们可以生成不遵循通常的主键/外键模型的非正统连接条件。其中一个例子是实现路径模式,其中我们比较字符串以产生重叠路径标记,以便生成树结构。

通过精心使用foreign()remote(),我们可以构建一个有效地产生基本的物化路径系统的关系。基本上,当foreign()remote()相同的比较表达式的一侧时,关系被视为“一对多”;当它们在不同的一侧时,关系被视为“多对一”。对于我们将在此处使用的比较,我们将处理集合,因此保持配置为“一对多”:

代码语言:javascript复制
class Element(Base):
    __tablename__ = "element"

    path = mapped_column(String, primary_key=True)

    descendants = relationship(
        "Element",
        primaryjoin=remote(foreign(path)).like(path.concat("/%")),
        viewonly=True,
        order_by=path,
    )

上述情况下,如果给定一个具有路径属性为"/foo/bar2"Element对象,我们寻求加载Element.descendants以如下形式:

代码语言:javascript复制
SELECT  element.path  AS  element_path
FROM  element
WHERE  element.path  LIKE  ('/foo/bar2'  ||  '/%')  ORDER  BY  element.path

自引用多对多关系

另见

本节记录了“邻接列表”模式的两表变体,该模式在邻接列表关系有所描述。务必查看子节自引用查询策略和配置自引用急切加载,这两者同样适用于此处讨论的映射模式。

多对多关系可以通过relationship.primaryjoinrelationship.secondaryjoin中的一个或两个进行自定义 - 后者对于使用relationship.secondary参数指定多对多引用的关系非常重要。一个常见的情况涉及使用relationship.primaryjoinrelationship.secondaryjoin来建立从一个类到自身的多对多关系,如下所示:

代码语言:javascript复制
from typing import List

from sqlalchemy import Integer, ForeignKey, Column, Table
from sqlalchemy.orm import DeclarativeBase, Mapped
from sqlalchemy.orm import mapped_column, relationship

class Base(DeclarativeBase):
    pass

node_to_node = Table(
    "node_to_node",
    Base.metadata,
    Column("left_node_id", Integer, ForeignKey("node.id"), primary_key=True),
    Column("right_node_id", Integer, ForeignKey("node.id"), primary_key=True),
)

class Node(Base):
    __tablename__ = "node"
    id: Mapped[int] = mapped_column(primary_key=True)
    label: Mapped[str]
    right_nodes: Mapped[List["Node"]] = relationship(
        "Node",
        secondary=node_to_node,
        primaryjoin=id == node_to_node.c.left_node_id,
        secondaryjoin=id == node_to_node.c.right_node_id,
        back_populates="left_nodes",
    )
    left_nodes: Mapped[List["Node"]] = relationship(
        "Node",
        secondary=node_to_node,
        primaryjoin=id == node_to_node.c.right_node_id,
        secondaryjoin=id == node_to_node.c.left_node_id,
        back_populates="right_nodes",
    )

在上述情况下,SQLAlchemy 无法自动知道哪些列应该连接到right_nodesleft_nodes关系。relationship.primaryjoinrelationship.secondaryjoin参数确定我们希望如何连接到关联表。在上面的声明形式中,由于我们正在声明这些条件,因此id变量直接可用作我们希望与之连接的Column对象。

或者,我们可以使用字符串来定义relationship.primaryjoinrelationship.secondaryjoin参数,这在我们的配置中可能没有Node.id列对象可用,或者node_to_node表可能还没有可用时很合适。当在声明字符串中引用普通的Table对象时,我们使用表的字符串名称,就像它在MetaData中一样:

代码语言:javascript复制
class Node(Base):
    __tablename__ = "node"
    id = mapped_column(Integer, primary_key=True)
    label = mapped_column(String)
    right_nodes = relationship(
        "Node",
        secondary="node_to_node",
        primaryjoin="Node.id==node_to_node.c.left_node_id",
        secondaryjoin="Node.id==node_to_node.c.right_node_id",
        backref="left_nodes",
    )

警告

当作为 Python 可评估字符串传递时,relationship.primaryjoinrelationship.secondaryjoin参数使用 Python 的eval()函数进行解释。不要将不受信任的输入传递给这些字符串。有关声明式评估relationship()参数的详细信息,请参阅关系参数的评估。

在此处的经典映射情况类似,其中node_to_node可以连接到node.c.id

代码语言:javascript复制
from sqlalchemy import Integer, ForeignKey, String, Column, Table, MetaData
from sqlalchemy.orm import relationship, registry

metadata_obj = MetaData()
mapper_registry = registry()

node_to_node = Table(
    "node_to_node",
    metadata_obj,
    Column("left_node_id", Integer, ForeignKey("node.id"), primary_key=True),
    Column("right_node_id", Integer, ForeignKey("node.id"), primary_key=True),
)

node = Table(
    "node",
    metadata_obj,
    Column("id", Integer, primary_key=True),
    Column("label", String),
)

class Node:
    pass

mapper_registry.map_imperatively(
    Node,
    node,
    properties={
        "right_nodes": relationship(
            Node,
            secondary=node_to_node,
            primaryjoin=node.c.id == node_to_node.c.left_node_id,
            secondaryjoin=node.c.id == node_to_node.c.right_node_id,
            backref="left_nodes",
        )
    },
)

请注意,在两个示例中,relationship.backref关键字指定了一个left_nodes的 backref - 当relationship()在相反方向创建第二个关系时,它足够智能以反转relationship.primaryjoinrelationship.secondaryjoin参数。

另请参阅

  • 邻接列表关系 - 单表版本
  • 自引用查询策略 - 关于使用自引用映射进行查询的提示
  • 配置自引用急切加载 - 使用自引用映射进行急切加载的提示 ## 复合“次要”连接

注意

本节介绍了 SQLAlchemy 支持的一些边缘案例,但建议尽可能以更简单的方式解决这类问题,例如使用合理的关系布局和/或 Python 属性内部。

有时,当需要在两个表之间建立relationship()时,需要涉及更多的表才能将它们连接起来。这是一种relationship()的领域,人们试图推动可能性的边界,而这类奇特用例的最终解决方案通常需要在 SQLAlchemy 邮件列表上讨论出来。

在较新的 SQLAlchemy 版本中,relationship.secondary参数可以在某些情况下使用,以提供由多个表组成的复合目标。下面是这种连接条件的示例(至少需要版本 0.9.2 才能正常运行):

代码语言:javascript复制
class A(Base):
    __tablename__ = "a"

    id = mapped_column(Integer, primary_key=True)
    b_id = mapped_column(ForeignKey("b.id"))

    d = relationship(
        "D",
        secondary="join(B, D, B.d_id == D.id).join(C, C.d_id == D.id)",
        primaryjoin="and_(A.b_id == B.id, A.id == C.a_id)",
        secondaryjoin="D.id == B.d_id",
        uselist=False,
        viewonly=True,
    )

class B(Base):
    __tablename__ = "b"

    id = mapped_column(Integer, primary_key=True)
    d_id = mapped_column(ForeignKey("d.id"))

class C(Base):
    __tablename__ = "c"

    id = mapped_column(Integer, primary_key=True)
    a_id = mapped_column(ForeignKey("a.id"))
    d_id = mapped_column(ForeignKey("d.id"))

class D(Base):
    __tablename__ = "d"

    id = mapped_column(Integer, primary_key=True)

在上面的示例中,我们直接引用了命名为abcd的表,提供了relationship.secondaryrelationship.primaryjoinrelationship.secondaryjoin这三个声明式样式的参数。从AD的查询如下所示:

代码语言:javascript复制
sess.scalars(select(A).join(A.d)).all()

SELECT  a.id  AS  a_id,  a.b_id  AS  a_b_id
FROM  a  JOIN  (
  b  AS  b_1  JOIN  d  AS  d_1  ON  b_1.d_id  =  d_1.id
  JOIN  c  AS  c_1  ON  c_1.d_id  =  d_1.id)
  ON  a.b_id  =  b_1.id  AND  a.id  =  c_1.a_id  JOIN  d  ON  d.id  =  b_1.d_id 

在上面的示例中,我们利用能够将多个表放入“secondary”容器的优势,以便我们可以跨多个表进行连接,同时保持对relationship()的“简化”,在这种情况下,“左”和“右”两侧都只有“一个”表;复杂性保持在中间。

警告

像上面的关系通常标记为viewonly=True,使用relationship.viewonly,应视为只读。虽然有时可以使类似上面的关系可写,但这通常很复杂且容易出错。

另请参阅

使用 viewonly 关系参数的注意事项 ## 别名类的关系

在前一节中,我们介绍了一种技术,其中我们使用relationship.secondary来在连接条件中放置额外的表。有一种复杂的连接情况,即使使用这种技术也不够;当我们试图从A连接到B,并在其中使用任意数量的CD等,但AB之间也直接有连接条件时。在这种情况下,仅仅使用一个复杂的relationship.primaryjoin条件可能难以表达从AB的连接,因为中间表可能需要特殊处理,并且也不能用relationship.secondary对象来表达,因为A->secondary->B模式不支持AB之间的任何引用。当出现这种极其复杂的情况时,我们可以采用创建第二个映射作为关系目标的方法。这就是我们使用AliasedClass来制作一个包含我们所需的所有额外表的类的映射。为了将此映射作为我们类的“替代”映射生成,我们使用aliased()函数生成新的构造,然后针对该对象使用relationship(),就像它是一个普通的映射类一样。

下面说明了一个从ABrelationship(),其中主要连接条件增加了两个额外的实体CD,这两个实体必须同时与AB中的行对齐:

代码语言:javascript复制
class A(Base):
    __tablename__ = "a"

    id = mapped_column(Integer, primary_key=True)
    b_id = mapped_column(ForeignKey("b.id"))

class B(Base):
    __tablename__ = "b"

    id = mapped_column(Integer, primary_key=True)

class C(Base):
    __tablename__ = "c"

    id = mapped_column(Integer, primary_key=True)
    a_id = mapped_column(ForeignKey("a.id"))

    some_c_value = mapped_column(String)

class D(Base):
    __tablename__ = "d"

    id = mapped_column(Integer, primary_key=True)
    c_id = mapped_column(ForeignKey("c.id"))
    b_id = mapped_column(ForeignKey("b.id"))

    some_d_value = mapped_column(String)

# 1. set up the join() as a variable, so we can refer
# to it in the mapping multiple times.
j = join(B, D, D.b_id == B.id).join(C, C.id == D.c_id)

# 2. Create an AliasedClass to B
B_viacd = aliased(B, j, flat=True)

A.b = relationship(B_viacd, primaryjoin=A.b_id == j.c.b_id)

使用上述映射,简单的连接如下所示:

代码语言:javascript复制
sess.scalars(select(A).join(A.b)).all()

SELECT  a.id  AS  a_id,  a.b_id  AS  a_b_id
FROM  a  JOIN  (b  JOIN  d  ON  d.b_id  =  b.id  JOIN  c  ON  c.id  =  d.c_id)  ON  a.b_id  =  b.id 
将别名类映射与类型和避免早期映射器配置集成

对映射类使用aliased()构造会强制执行configure_mappers()步骤,该步骤将解析所有当前类及其关系。如果当前映射需要但尚未声明的不相关映射类,或者如果关系本身的配置需要访问尚未声明的类,则可能会出现问题。此外,当关系提前声明时,SQLAlchemy 的声明模式与 Python 类型最有效地配合使用。

为了组织关系的构建以解决这些问题,可以使用像MapperEvents.before_mapper_configured()这样的配置级事件钩子,该钩子仅在所有映射准备好进行配置时才会调用配置代码:

代码语言:javascript复制
from sqlalchemy import event

class A(Base):
    __tablename__ = "a"

    id = mapped_column(Integer, primary_key=True)
    b_id = mapped_column(ForeignKey("b.id"))

@event.listens_for(A, "before_mapper_configured")
def _configure_ab_relationship(mapper, cls):
    # do the above configuration in a configuration hook

    j = join(B, D, D.b_id == B.id).join(C, C.id == D.c_id)
    B_viacd = aliased(B, j, flat=True)
    A.b = relationship(B_viacd, primaryjoin=A.b_id == j.c.b_id)

上面,函数_configure_ab_relationship()仅在请求完全配置的A版本时调用,此时类BDC将可用。

对于与内联类型配合使用的方法,可以使用类似的技术有效地生成用于别名类的“单例”创建模式,其中它作为全局变量进行了延迟初始化,然后可以在关系内联中使用:

代码语言:javascript复制
from typing import Any

B_viacd: Any = None
b_viacd_join: Any = None

class A(Base):
    __tablename__ = "a"

    id: Mapped[int] = mapped_column(primary_key=True)
    b_id: Mapped[int] = mapped_column(ForeignKey("b.id"))

    # 1. the relationship can be declared using lambdas, allowing it to resolve
    #    to targets that are late-configured
    b: Mapped[B] = relationship(
        lambda: B_viacd, primaryjoin=lambda: A.b_id == b_viacd_join.c.b_id
    )

# 2. configure the targets of the relationship using a before_mapper_configured
#    hook.
@event.listens_for(A, "before_mapper_configured")
def _configure_ab_relationship(mapper, cls):
    # 3. set up the join() and AliasedClass as globals from within
    #    the configuration hook.

    global B_viacd, b_viacd_join

    b_viacd_join = join(B, D, D.b_id == B.id).join(C, C.id == D.c_id)
    B_viacd = aliased(B, b_viacd_join, flat=True)
在查询中使用 AliasedClass 目标

在前面的示例中,A.b关系指的是B_viacd实体作为目标,而不是直接的B类。要添加涉及A.b关系的额外条件,通常需要直接引用B_viacd,而不是使用B,特别是在A.b的目标实体要转换为别名或子查询的情况下。下面用子查询而不是连接说明了相同的关系:

代码语言:javascript复制
subq = select(B).join(D, D.b_id == B.id).join(C, C.id == D.c_id).subquery()

B_viacd_subquery = aliased(B, subq)

A.b = relationship(B_viacd_subquery, primaryjoin=A.b_id == subq.c.id)

使用上述A.b关系的查询将呈现为一个子查询:

代码语言:javascript复制
sess.scalars(select(A).join(A.b)).all()

SELECT  a.id  AS  a_id,  a.b_id  AS  a_b_id
FROM  a  JOIN  (SELECT  b.id  AS  id,  b.some_b_column  AS  some_b_column
FROM  b  JOIN  d  ON  d.b_id  =  b.id  JOIN  c  ON  c.id  =  d.c_id)  AS  anon_1  ON  a.b_id  =  anon_1.id 

如果我们想要基于A.b连接添加额外的条件,则必须根据B_viacd_subquery而不是直接根据B来做:

代码语言:javascript复制
sess.scalars(
    select(A)
    .join(A.b)
    .where(B_viacd_subquery.some_b_column == "some b")
    .order_by(B_viacd_subquery.id)
).all()

SELECT  a.id  AS  a_id,  a.b_id  AS  a_b_id
FROM  a  JOIN  (SELECT  b.id  AS  id,  b.some_b_column  AS  some_b_column
FROM  b  JOIN  d  ON  d.b_id  =  b.id  JOIN  c  ON  c.id  =  d.c_id)  AS  anon_1  ON  a.b_id  =  anon_1.id
WHERE  anon_1.some_b_column  =  ?  ORDER  BY  anon_1.id 
```## 使用窗口函数限制行关系

另一个与`AliasedClass`对象关系的有趣用例是在关系需要连接到任意形式的专门 SELECT 的情况下。一种情况是当需要使用窗口函数时,例如限制应返回多少行以供关系使用。下面的示例说明了一个非主映射器关系,它将为每个集合加载前十个项目:

```py
class A(Base):
    __tablename__ = "a"

    id = mapped_column(Integer, primary_key=True)

class B(Base):
    __tablename__ = "b"
    id = mapped_column(Integer, primary_key=True)
    a_id = mapped_column(ForeignKey("a.id"))

partition = select(
    B, func.row_number().over(order_by=B.id, partition_by=B.a_id).label("index")
).alias()

partitioned_b = aliased(B, partition)

A.partitioned_bs = relationship(
    partitioned_b, primaryjoin=and_(partitioned_b.a_id == A.id, partition.c.index < 10)
)

我们可以将上述partitioned_bs关系与大多数加载器策略一起使用,例如selectinload()

代码语言:javascript复制
for a1 in session.scalars(select(A).options(selectinload(A.partitioned_bs))):
    print(a1.partitioned_bs)  # <-- will be no more than ten objects

上面,“selectinload” 查询如下所示:

代码语言:javascript复制
SELECT
  a_1.id  AS  a_1_id,  anon_1.id  AS  anon_1_id,  anon_1.a_id  AS  anon_1_a_id,
  anon_1.data  AS  anon_1_data,  anon_1.index  AS  anon_1_index
FROM  a  AS  a_1
JOIN  (
  SELECT  b.id  AS  id,  b.a_id  AS  a_id,  b.data  AS  data,
  row_number()  OVER  (PARTITION  BY  b.a_id  ORDER  BY  b.id)  AS  index
  FROM  b)  AS  anon_1
ON  anon_1.a_id  =  a_1.id  AND  anon_1.index  <  %(index_1)s
WHERE  a_1.id  IN  (  ...  primary  key  collection  ...)
ORDER  BY  a_1.id

上述情况下,对于“a”中的每个匹配主键,我们将按“b.id”排序获取前十个“bs”。通过在“a_id”上进行分区,我们确保每个“行号”都是相对于父“a_id”的局部的。

这样的映射通常也会包括从“A”到“B”的“普通”关系,用于持久性操作以及当需要每个“A”的完整“B”对象集合时。## 构建查询可用的属性

非常雄心勃勃的自定义连接条件可能无法直接持久化,并且在某些情况下甚至可能无法正确加载。要删除等式的持久性部分,请在relationship()上使用标志relationship.viewonly,将其建立为只读属性(写入到集合的数据将在刷新时被忽略)。然而,在极端情况下,考虑与Query一起使用普通的 Python 属性,如下所示:

代码语言:javascript复制
class User(Base):
    __tablename__ = "user"
    id = mapped_column(Integer, primary_key=True)

    @property
    def addresses(self):
        return object_session(self).query(Address).with_parent(self).filter(...).all()

在其他情况下,描述符可以构建以利用现有的 Python 数据。有关使用描述符和混合体的更一般讨论,请参见使用描述符和混合体部分。

另见

使用描述符和混合体 ## 关于使用 viewonly 关系参数的注意事项

当应用于relationship()构造时,relationship.viewonly参数指示这个relationship()不会参与任何 ORM 工作单元操作,并且该属性不希望在其表示的集合的 Python 变异中参与。这意味着虽然只读关系可能引用一个可变的 Python 集合,如列表或集合,但对该列表或集合进行更改,如在映射实例上存在的那样,对 ORM 刷新过程没有影响

要探索这种情况,请考虑这种映射:

代码语言:javascript复制
from __future__ import annotations

import datetime

from sqlalchemy import and_
from sqlalchemy import ForeignKey
from sqlalchemy import func
from sqlalchemy.orm import DeclarativeBase
from sqlalchemy.orm import Mapped
from sqlalchemy.orm import mapped_column
from sqlalchemy.orm import relationship

class Base(DeclarativeBase):
    pass

class User(Base):
    __tablename__ = "user_account"

    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str | None]

    all_tasks: Mapped[list[Task]] = relationship()

    current_week_tasks: Mapped[list[Task]] = relationship(
        primaryjoin=lambda: and_(
            User.id == Task.user_account_id,
            # this expression works on PostgreSQL but may not be supported
            # by other database engines
            Task.task_date >= func.now() - datetime.timedelta(days=7),
        ),
        viewonly=True,
    )

class Task(Base):
    __tablename__ = "task"

    id: Mapped[int] = mapped_column(primary_key=True)
    user_account_id: Mapped[int] = mapped_column(ForeignKey("user_account.id"))
    description: Mapped[str | None]
    task_date: Mapped[datetime.datetime] = mapped_column(server_default=func.now())

    user: Mapped[User] = relationship(back_populates="current_week_tasks")

以下各节将注意这种配置的不同方面。

包括反向引用的 Python 中的变异与 viewonly=True 不适用

上述映射将User.current_week_tasks只读关系作为Task.user属性的反向引用目标。这目前并未被 SQLAlchemy 的 ORM 配置过程标记,但这是一个配置错误。改变Task上的.user属性不会影响.current_week_tasks属性:

代码语言:javascript复制
>>> u1 = User()
>>> t1 = Task(task_date=datetime.datetime.now())
>>> t1.user = u1
>>> u1.current_week_tasks
[]

这里还有另一个参数叫做relationship.sync_backrefs,可以在这里打开,以允许在这种情况下对.current_week_tasks进行变异,然而这并不被认为是最佳实践,对于一个只读关系,不应该依赖于 Python 中的变异。

在这种映射中,可以在User.all_tasksTask.user之间配置反向引用,因为这两者都不是只读的,将正常同步。

除了禁用只读关系的反向引用变异问题外,Python 中对User.all_tasks集合的普通更改也不会反映在User.current_week_tasks集合中,直到更改已刷新到数据库。

总的来说,对于一个需要立即响应 Python 中的变异的自定义集合的用例,只读关系通常不合适。更好的方法是使用 SQLAlchemy 的混合属性功能,或者对于仅实例情况,使用 Python 的@property,其中可以实现一个根据当前 Python 实例生成的用户定义集合。要将我们的示例更改为这种方式工作,我们修复Task.user上的relationship.back_populates参数,引用User.all_tasks,然后演示一个简单的@property,将以立即User.all_tasks集合的形式提供结果:

代码语言:javascript复制
class User(Base):
    __tablename__ = "user_account"

    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str | None]

    all_tasks: Mapped[list[Task]] = relationship(back_populates="user")

    @property
    def current_week_tasks(self) -> list[Task]:
        past_seven_days = datetime.datetime.now() - datetime.timedelta(days=7)
        return [t for t in self.all_tasks if t.task_date >= past_seven_days]

class Task(Base):
    __tablename__ = "task"

    id: Mapped[int] = mapped_column(primary_key=True)
    user_account_id: Mapped[int] = mapped_column(ForeignKey("user_account.id"))
    description: Mapped[str | None]
    task_date: Mapped[datetime.datetime] = mapped_column(server_default=func.now())

    user: Mapped[User] = relationship(back_populates="all_tasks")

使用在 Python 中每次动态计算的集合,我们保证始终有正确的答案,而无需使用数据库:

代码语言:javascript复制
>>> u1 = User()
>>> t1 = Task(task_date=datetime.datetime.now())
>>> t1.user = u1
>>> u1.current_week_tasks
[<__main__.Task object at 0x7f3d699523c0>]
viewonly=True 的集合/属性直到过期才会重新查询

继续使用原始的 viewonly 属性,如果我们确实对 persistent 对象上的 User.all_tasks 集合进行更改,则在 两个 事情发生之后,viewonly 集合只能显示此更改的净结果。第一个是将更改刷新到 User.all_tasks 中,以便新数据在数据库中可用,至少在本地事务的范围内是如此。第二个是 User.current_week_tasks 属性被 expired 并通过对数据库进行新的 SQL 查询重新加载。

为了支持这一需求,最简单的流程是仅在主要是只读操作中使用 viewonly 关系。例如,如果我们从数据库中检索到一个新的 User,那么集合将是当前的:

代码语言:javascript复制
>>> with Session(e) as sess:
...     u1 = sess.scalar(select(User).where(User.id == 1))
...     print(u1.current_week_tasks)
[<__main__.Task object at 0x7f8711b906b0>]

当我们对 u1.all_tasks 进行修改时,如果想要在 u1.current_week_tasks 视图关系中看到这些更改,这些更改需要被刷新,并且 u1.current_week_tasks 属性需要过期,以便在下一次访问时进行 惰性加载。最简单的方法是使用 Session.commit(),保持 Session.expire_on_commit 参数设置为其默认值 True

代码语言:javascript复制
>>> with Session(e) as sess:
...     u1 = sess.scalar(select(User).where(User.id == 1))
...     u1.all_tasks.append(Task(task_date=datetime.datetime.now()))
...     sess.commit()
...     print(u1.current_week_tasks)
[<__main__.Task object at 0x7f8711b90ec0>, <__main__.Task object at 0x7f8711b90a10>]

上面,对 Session.commit() 的调用将更改刷新到了数据库中的 u1.all_tasks,然后使所有对象过期,因此当我们访问 u1.current_week_tasks 时,会发生 :term:惰性加载,从数据库中新鲜获取此属性的内容。

要拦截操作而不实际提交事务,需要先显式 expired 该属性。一种简单的方法是直接调用它。在下面的示例中,Session.flush() 将挂起的更改发送到数据库,然后使用 Session.expire() 来过期 u1.current_week_tasks 集合,以便在下一次访问时重新获取:

代码语言:javascript复制
>>> with Session(e) as sess:
...     u1 = sess.scalar(select(User).where(User.id == 1))
...     u1.all_tasks.append(Task(task_date=datetime.datetime.now()))
...     sess.flush()
...     sess.expire(u1, ["current_week_tasks"])
...     print(u1.current_week_tasks)
[<__main__.Task object at 0x7fd95a4c8c50>, <__main__.Task object at 0x7fd95a4c8c80>]

事实上,我们可以跳过对Session.flush()的调用,假设一个保持Session.autoflush为其默认值TrueSession,因为过期的current_week_tasks属性在过期后访问时将触发自动刷新:

代码语言:javascript复制
>>> with Session(e) as sess:
...     u1 = sess.scalar(select(User).where(User.id == 1))
...     u1.all_tasks.append(Task(task_date=datetime.datetime.now()))
...     sess.expire(u1, ["current_week_tasks"])
...     print(u1.current_week_tasks)  # triggers autoflush before querying
[<__main__.Task object at 0x7fd95a4c8c50>, <__main__.Task object at 0x7fd95a4c8c80>]

继续使用上述方法进行更详细的处理,我们可以在相关的User.all_tasks集合发生变化时通过 event hooks 进行程序化过期。这是一种高级技术,应该首先检查更简单的架构,比如@property或坚持只读用例。在我们简单的示例中,这将被配置为:

代码语言:javascript复制
from sqlalchemy import event, inspect

@event.listens_for(User.all_tasks, "append")
@event.listens_for(User.all_tasks, "remove")
@event.listens_for(User.all_tasks, "bulk_replace")
def _expire_User_current_week_tasks(target, value, initiator):
    inspect(target).session.expire(target, ["current_week_tasks"])

有了上述钩子,突变操作被拦截并导致User.current_week_tasks集合自动过期:

代码语言:javascript复制
>>> with Session(e) as sess:
...     u1 = sess.scalar(select(User).where(User.id == 1))
...     u1.all_tasks.append(Task(task_date=datetime.datetime.now()))
...     print(u1.current_week_tasks)
[<__main__.Task object at 0x7f66d093ccb0>, <__main__.Task object at 0x7f66d093cce0>]

上述使用的AttributeEvents事件钩子也会被 backref 突变触发,因此,使用上述钩子会拦截对Task.user的更改:

代码语言:javascript复制
>>> with Session(e) as sess:
...     u1 = sess.scalar(select(User).where(User.id == 1))
...     t1 = Task(task_date=datetime.datetime.now())
...     t1.user = u1
...     sess.add(t1)
...     print(u1.current_week_tasks)
[<__main__.Task object at 0x7f3b0c070d10>, <__main__.Task object at 0x7f3b0c057d10>]
```## 处理多个连接路径

处理的最常见情况之一是两个表之间存在多个外键路径。

考虑一个包含对`Address`类的两个外键的`Customer`类:

```py
from sqlalchemy import Integer, ForeignKey, String, Column
from sqlalchemy.orm import DeclarativeBase
from sqlalchemy.orm import relationship

class Base(DeclarativeBase):
    pass

class Customer(Base):
    __tablename__ = "customer"
    id = mapped_column(Integer, primary_key=True)
    name = mapped_column(String)

    billing_address_id = mapped_column(Integer, ForeignKey("address.id"))
    shipping_address_id = mapped_column(Integer, ForeignKey("address.id"))

    billing_address = relationship("Address")
    shipping_address = relationship("Address")

class Address(Base):
    __tablename__ = "address"
    id = mapped_column(Integer, primary_key=True)
    street = mapped_column(String)
    city = mapped_column(String)
    state = mapped_column(String)
    zip = mapped_column(String)

上述映射,在我们尝试使用它时,会产生错误:

代码语言:javascript复制
sqlalchemy.exc.AmbiguousForeignKeysError: Could not determine join
condition between parent/child tables on relationship
Customer.billing_address - there are multiple foreign key
paths linking the tables.  Specify the 'foreign_keys' argument,
providing a list of those columns which should be
counted as containing a foreign key reference to the parent table.

上述消息相当长。relationship()可能返回许多潜在消息,这些消息经过精心设计,以检测各种常见的配置问题;大多数消息都会建议需要解决模糊性或其他缺失信息的附加配置。

在这种情况下,消息希望我们为每个relationship()进行限定,指示每个外键列应该被考虑,并且适当的形式如下:

代码语言:javascript复制
class Customer(Base):
    __tablename__ = "customer"
    id = mapped_column(Integer, primary_key=True)
    name = mapped_column(String)

    billing_address_id = mapped_column(Integer, ForeignKey("address.id"))
    shipping_address_id = mapped_column(Integer, ForeignKey("address.id"))

    billing_address = relationship("Address", foreign_keys=[billing_address_id])
    shipping_address = relationship("Address", foreign_keys=[shipping_address_id])

在上面的例子中,我们指定了foreign_keys参数,它是一个ColumnColumn对象列表,指示要考虑的“外键”列,或者换句话说,包含指向父表的值的列。从Customer对象加载Customer.billing_address关系将使用billing_address_id中存在的值来标识要加载的Address行;类似地,shipping_address_id用于shipping_address关系。这两列的关联在持久化过程中也起着作用;刚插入的Address对象的新生成的主键将在刷新期间被复制到关联的Customer对象的适当外键列中。

在使用 Declarative 指定foreign_keys时,我们还可以使用字符串名称进行指定,但是重要的是,如果使用列表,列表是字符串的一部分

代码语言:javascript复制
billing_address = relationship("Address", foreign_keys="[Customer.billing_address_id]")

在这个具体的例子中,在任何情况下列表都是不必要的,因为我们只需要一个Column

代码语言:javascript复制
billing_address = relationship("Address", foreign_keys="Customer.billing_address_id")

警告

当作为 Python 可评估字符串传递时,relationship.foreign_keys 参数将使用 Python 的 eval() 函数进行解释。请勿将不受信任的输入传递给此字符串。详情请参阅关系参数的评估以了解有关relationship() 参数的声明性评估的详细信息。

指定备用连接条件

在构建连接时,relationship()的默认行为是将一侧的主键列的值等同于另一侧的外键引用列的值。我们可以使用relationship.primaryjoin参数来更改此标准为任何我们喜欢的内容,以及在使用“次要”表时,在使用relationship.secondaryjoin参数。

在下面的例子中,我们使用User类以及一个存储街道地址的Address类,我们创建了一个关系boston_addresses,它只会加载那些指定城市为“Boston”的Address对象:

代码语言:javascript复制
from sqlalchemy import Integer, ForeignKey, String, Column
from sqlalchemy.orm import DeclarativeBase
from sqlalchemy.orm import relationship

class Base(DeclarativeBase):
    pass

class User(Base):
    __tablename__ = "user"
    id = mapped_column(Integer, primary_key=True)
    name = mapped_column(String)
    boston_addresses = relationship(
        "Address",
        primaryjoin="and_(User.id==Address.user_id, Address.city=='Boston')",
    )

class Address(Base):
    __tablename__ = "address"
    id = mapped_column(Integer, primary_key=True)
    user_id = mapped_column(Integer, ForeignKey("user.id"))

    street = mapped_column(String)
    city = mapped_column(String)
    state = mapped_column(String)
    zip = mapped_column(String)

在这个字符串的 SQL 表达式中,我们使用了 and_() 连接构造来建立两个不同的谓词,用于连接 User.idAddress.user_id 列,以及将 Address 中的行限制为只有 city='Boston'。当使用声明式时,类似 and_() 这样的基本 SQL 函数会自动在字符串 relationship() 参数的计算命名空间中可用。

警告

当作为 Python 可评估字符串传递时,relationship.primaryjoin 参数是使用 Python 的 eval() 函数解释的。不要将不受信任的输入传递给此字符串。有关声明式评估 relationship() 参数的详细信息,请参阅 关系参数的评估。

我们在 relationship.primaryjoin 中使用的自定义条件通常只在 SQLAlchemy 渲染 SQL 以加载或表示此关系时才重要。也就是说,在执行每个属性的惰性加载的 SQL 语句中使用它,或者在查询时构造连接,例如通过 Select.join() 或通过急切的“连接”或“子查询”加载样式。当操作内存中的对象时,我们可以将任何我们想要的 Address 对象放入 boston_addresses 集合中,而不管 .city 属性的值是什么。这些对象将保留在集合中,直到属性过期并重新从应用条件的数据库中加载为止。当执行刷新时,boston_addresses 中的对象将被无条件地刷新,将主键 user.id 列的值分配到每行的持有外键 address.user_id 列。这里的 city 条件没有效果,因为刷新过程只关心将主键值同步到引用外键值。

创建自定义外键条件

主要连接条件的另一个元素是如何确定那些被认为是“外部”的列的。通常,一些 Column 对象的子集将指定 ForeignKey,或者是 ForeignKeyConstraint 的一部分,这与连接条件相关。relationship() 查看这个外键状态,以确定它应该如何为这个关系加载和持久化数据。然而,relationship.primaryjoin 参数可以用来创建一个不涉及任何“模式”级外键的连接条件。我们可以显式地结合 relationship.primaryjoin 以及 relationship.foreign_keysrelationship.remote_side 来建立这样一个连接。

下面,一个 HostEntry 类与自身连接,将字符串 content 列等同于 ip_address 列,这是一个名为 INET 的 PostgreSQL 类型。我们需要使用 cast() 来将连接的一侧转换为另一侧的类型:

代码语言:javascript复制
from sqlalchemy import cast, String, Column, Integer
from sqlalchemy.orm import relationship
from sqlalchemy.dialects.postgresql import INET

from sqlalchemy.orm import DeclarativeBase

class Base(DeclarativeBase):
    pass

class HostEntry(Base):
    __tablename__ = "host_entry"

    id = mapped_column(Integer, primary_key=True)
    ip_address = mapped_column(INET)
    content = mapped_column(String(50))

    # relationship() using explicit foreign_keys, remote_side
    parent_host = relationship(
        "HostEntry",
        primaryjoin=ip_address == cast(content, INET),
        foreign_keys=content,
        remote_side=ip_address,
    )

上述关系将产生类似以下的连接:

代码语言:javascript复制
SELECT  host_entry.id,  host_entry.ip_address,  host_entry.content
FROM  host_entry  JOIN  host_entry  AS  host_entry_1
ON  host_entry_1.ip_address  =  CAST(host_entry.content  AS  INET)

以上的另一种语法是在 relationship.primaryjoin 表达式内联使用 foreign()remote() annotations。这种语法表示了 relationship() 通常自己应用于连接条件的注释,考虑到 relationship.foreign_keysrelationship.remote_side 参数。当存在明确的连接条件时,这些函数可能更简洁,并且还可以标记出“外部”或“远程”的确切列,而不管该列是否在多次声明或在复杂的 SQL 表达式中:

代码语言:javascript复制
from sqlalchemy.orm import foreign, remote

class HostEntry(Base):
    __tablename__ = "host_entry"

    id = mapped_column(Integer, primary_key=True)
    ip_address = mapped_column(INET)
    content = mapped_column(String(50))

    # relationship() using explicit foreign() and remote() annotations
    # in lieu of separate arguments
    parent_host = relationship(
        "HostEntry",
        primaryjoin=remote(ip_address) == cast(foreign(content), INET),
    )

在连接条件中使用自定义运算符

另一个关系的用例是使用自定义运算符,比如 PostgreSQL 的“包含在内” << 运算符,当与诸如 INETCIDR 这样的类型连接时。对于自定义布尔运算符,我们使用 Operators.bool_op() 函数:

代码语言:javascript复制
inet_column.bool_op("<<")(cidr_column)

像上面的比较可以直接用于构造 relationship() 中的 relationship.primaryjoin

代码语言:javascript复制
class IPA(Base):
    __tablename__ = "ip_address"

    id = mapped_column(Integer, primary_key=True)
    v4address = mapped_column(INET)

    network = relationship(
        "Network",
        primaryjoin="IPA.v4address.bool_op('<<')(foreign(Network.v4representation))",
        viewonly=True,
    )

class Network(Base):
    __tablename__ = "network"

    id = mapped_column(Integer, primary_key=True)
    v4representation = mapped_column(CIDR)

上述,像这样的查询:

代码语言:javascript复制
select(IPA).join(IPA.network)

将显示为:

代码语言:javascript复制
SELECT  ip_address.id  AS  ip_address_id,  ip_address.v4address  AS  ip_address_v4address
FROM  ip_address  JOIN  network  ON  ip_address.v4address  <<  network.v4representation

基于 SQL 函数的自定义运算符

Operators.op.is_comparison 用例的变体是当我们不是使用运算符,而是使用 SQL 函数。这种用例的典型例子是 PostgreSQL PostGIS 函数,但任何解析为二进制条件的任何数据库上的 SQL 函数都可能适用。为适应这种用例,FunctionElement.as_comparison() 方法可以修改任何 SQL 函数,例如从 func 命名空间调用的函数,以指示 ORM 该函数生成了两个表达式的比较。下面的例子使用了 Geoalchemy2 库说明了这一点:

代码语言:javascript复制
from geoalchemy2 import Geometry
from sqlalchemy import Column, Integer, func
from sqlalchemy.orm import relationship, foreign

class Polygon(Base):
    __tablename__ = "polygon"
    id = mapped_column(Integer, primary_key=True)
    geom = mapped_column(Geometry("POLYGON", srid=4326))
    points = relationship(
        "Point",
        primaryjoin="func.ST_Contains(foreign(Polygon.geom), Point.geom).as_comparison(1, 2)",
        viewonly=True,
    )

class Point(Base):
    __tablename__ = "point"
    id = mapped_column(Integer, primary_key=True)
    geom = mapped_column(Geometry("POINT", srid=4326))

上述,FunctionElement.as_comparison() 表示 func.ST_Contains() SQL 函数正在比较 Polygon.geomPoint.geom 表达式。foreign() 注释另外指出了在这种特定关系中扮演“外键”角色的列。

新版本 1.3 中新增了 FunctionElement.as_comparison()

重叠的外键

很少见的情况可能会出现,即使用复合外键,以便单个列可能是通过外键约束引用的多个列的主题。

考虑一个(诚然复杂的)映射,例如Magazine对象,使用包括magazine_id的复合主键方案,分别由Writer对象和Article对象引用;然后,为了使Article也引用WriterArticle.magazine_id涉及到两个不同的关系;Article.magazineArticle.writer

代码语言:javascript复制
class Magazine(Base):
    __tablename__ = "magazine"

    id = mapped_column(Integer, primary_key=True)

class Article(Base):
    __tablename__ = "article"

    article_id = mapped_column(Integer)
    magazine_id = mapped_column(ForeignKey("magazine.id"))
    writer_id = mapped_column()

    magazine = relationship("Magazine")
    writer = relationship("Writer")

    __table_args__ = (
        PrimaryKeyConstraint("article_id", "magazine_id"),
        ForeignKeyConstraint(
            ["writer_id", "magazine_id"], ["writer.id", "writer.magazine_id"]
        ),
    )

class Writer(Base):
    __tablename__ = "writer"

    id = mapped_column(Integer, primary_key=True)
    magazine_id = mapped_column(ForeignKey("magazine.id"), primary_key=True)
    magazine = relationship("Magazine")

当上述映射被配置时,我们将看到此警告被发出:

代码语言:javascript复制
SAWarning: relationship 'Article.writer' will copy column
writer.magazine_id to column article.magazine_id,
which conflicts with relationship(s): 'Article.magazine'
(copies magazine.id to article.magazine_id). Consider applying
viewonly=True to read-only relationships, or provide a primaryjoin
condition marking writable columns with the foreign() annotation.

这指的是Article.magazine_id是两个不同外键约束的主体;它直接引用Magazine.id作为源列,但在与Writer的复合键上下文中,也引用Writer.magazine_id作为源列。如果我们将Article与特定的Magazine关联起来,但然后将Article与另一个与不同Magazine关联的Writer关联起来,ORM 会非确定性地覆盖Article.magazine_id,悄悄地改变我们所引用的杂志;如果我们将WriterArticle中取消关联,它还可能尝试将 NULL 放入此列。警告让我们知道这种情况。

要解决这个问题,我们需要打破Article的行为,包括以下三个功能:

  1. 首先,Article根据仅在Article.magazine关系中持久化的数据来写入Article.magazine_id,即从Magazine.id复制的值。
  2. Article可以代表在Article.writer关系中持久化的数据写入Article.writer_id,但只能写入Writer.id列;Writer.magazine_id列不应写入Article.magazine_id,因为它最终来自Magazine.id
  3. 当加载Article.writer时,Article考虑了Article.magazine_id,即使在此关系中并不代表它。

要获取只有#1 和#2,我们可以将Article.writer_id指定为Article.writer的“外键”:

代码语言:javascript复制
class Article(Base):
    # ...

    writer = relationship("Writer", foreign_keys="Article.writer_id")

然而,这会导致Article.writer在与Writer进行查询时不考虑Article.magazine_id

代码语言:javascript复制
SELECT  article.article_id  AS  article_article_id,
  article.magazine_id  AS  article_magazine_id,
  article.writer_id  AS  article_writer_id
FROM  article
JOIN  writer  ON  writer.id  =  article.writer_id

因此,要获取#1、#2 和#3 的所有内容,我们需要通过完全组合relationship.primaryjoin来表达连接条件,以及要写入的列,同时使用relationship.foreign_keys参数,或者更简洁地使用foreign()进行注释:

代码语言:javascript复制
class Article(Base):
    # ...

    writer = relationship(
        "Writer",
        primaryjoin="and_(Writer.id == foreign(Article.writer_id), "
        "Writer.magazine_id == Article.magazine_id)",
    )

非关系比较 / 材料化路径

警告

本节详细介绍了一个实验性功能。

使用自定义表达式意味着我们可以生成不遵循通常的主键/外键模型的非正统连接条件。其中一个例子是材料化路径模式,我们在比较字符串以产生重叠路径标记时,以便生成树结构。

通过谨慎使用foreign()remote(),我们可以构建一个有效地生成基本材料化路径系统的关系。基本上,当foreign()remote()相同的比较表达式一侧时,关系被认为是“一对多”;当它们在不同的一侧时,关系被认为是“多对一”。对于我们将在此处使用的比较,我们将处理集合,所以我们保持事物配置为“一对多”:

代码语言:javascript复制
class Element(Base):
    __tablename__ = "element"

    path = mapped_column(String, primary_key=True)

    descendants = relationship(
        "Element",
        primaryjoin=remote(foreign(path)).like(path.concat("/%")),
        viewonly=True,
        order_by=path,
    )

上文中,如果给定具有"/foo/bar2"路径属性的Element对象,则我们寻找对Element.descendants的加载应如下所示:

代码语言:javascript复制
SELECT  element.path  AS  element_path
FROM  element
WHERE  element.path  LIKE  ('/foo/bar2'  ||  '/%')  ORDER  BY  element.path

自我引用多对多关系

另请参阅

本节记录了“邻接列表”模式的两个表变体,该模式在邻接列表关系中有所记录。务必查看自我引用查询模式的子部分自我引用查询策略和配置自我引用急加载,它们同样适用于此处讨论的映射模式。

多对多关系可以通过relationship.primaryjoin和/或relationship.secondaryjoin中的一个或两个进行自定义 - 后者对于使用relationship.secondary参数指定多对多引用的关系非常重要。涉及使用relationship.primaryjoinrelationship.secondaryjoin的常见情况是在从一个类到自身建立多对多关系时,如下所示:

代码语言:javascript复制
from typing import List

from sqlalchemy import Integer, ForeignKey, Column, Table
from sqlalchemy.orm import DeclarativeBase, Mapped
from sqlalchemy.orm import mapped_column, relationship

class Base(DeclarativeBase):
    pass

node_to_node = Table(
    "node_to_node",
    Base.metadata,
    Column("left_node_id", Integer, ForeignKey("node.id"), primary_key=True),
    Column("right_node_id", Integer, ForeignKey("node.id"), primary_key=True),
)

class Node(Base):
    __tablename__ = "node"
    id: Mapped[int] = mapped_column(primary_key=True)
    label: Mapped[str]
    right_nodes: Mapped[List["Node"]] = relationship(
        "Node",
        secondary=node_to_node,
        primaryjoin=id == node_to_node.c.left_node_id,
        secondaryjoin=id == node_to_node.c.right_node_id,
        back_populates="left_nodes",
    )
    left_nodes: Mapped[List["Node"]] = relationship(
        "Node",
        secondary=node_to_node,
        primaryjoin=id == node_to_node.c.right_node_id,
        secondaryjoin=id == node_to_node.c.left_node_id,
        back_populates="right_nodes",
    )

在上述情况下,SQLAlchemy 无法自动知道哪些列应该连接到right_nodesleft_nodes关系的哪些列上。relationship.primaryjoinrelationship.secondaryjoin参数建立了我们希望如何加入关联表的方式。在上面的声明形式中,由于我们在对应于Node类的 Python 块中声明了这些条件,因此id变量直接作为我们希望与之连接的Column对象是可用的。

或者,我们可以使用字符串定义relationship.primaryjoinrelationship.secondaryjoin参数,在我们的配置中可能还没有Node.id列对象可用,或者node_to_node表可能还不可用的情况下非常适用。当在声明字符串中引用普通的Table对象时,我们使用表的字符串名称,就像在MetaData中一样:

代码语言:javascript复制
class Node(Base):
    __tablename__ = "node"
    id = mapped_column(Integer, primary_key=True)
    label = mapped_column(String)
    right_nodes = relationship(
        "Node",
        secondary="node_to_node",
        primaryjoin="Node.id==node_to_node.c.left_node_id",
        secondaryjoin="Node.id==node_to_node.c.right_node_id",
        backref="left_nodes",
    )

警告

当作为 Python 可评估字符串传递时,relationship.primaryjoinrelationship.secondaryjoin参数使用 Python 的eval()函数解释。不要将不受信任的输入传递给这些字符串。有关声明性relationship()参数的评估详细信息,请参阅关系参数的评估。

在这里的经典映射情况中,node_to_node可以连接到node.c.id

代码语言:javascript复制
from sqlalchemy import Integer, ForeignKey, String, Column, Table, MetaData
from sqlalchemy.orm import relationship, registry

metadata_obj = MetaData()
mapper_registry = registry()

node_to_node = Table(
    "node_to_node",
    metadata_obj,
    Column("left_node_id", Integer, ForeignKey("node.id"), primary_key=True),
    Column("right_node_id", Integer, ForeignKey("node.id"), primary_key=True),
)

node = Table(
    "node",
    metadata_obj,
    Column("id", Integer, primary_key=True),
    Column("label", String),
)

class Node:
    pass

mapper_registry.map_imperatively(
    Node,
    node,
    properties={
        "right_nodes": relationship(
            Node,
            secondary=node_to_node,
            primaryjoin=node.c.id == node_to_node.c.left_node_id,
            secondaryjoin=node.c.id == node_to_node.c.right_node_id,
            backref="left_nodes",
        )
    },
)

请注意,在这两个示例中,relationship.backref 关键字都指定了一个 left_nodes 回引 - 当relationship()在反向创建第二个关系时,它足够聪明地反转了relationship.primaryjoinrelationship.secondaryjoin 参数。

另请参阅

  • 邻接列表关系 - 单表版本
  • 自引用查询策略 - 使用自引用映射查询的技巧
  • 配置自引用预加载 - 使用自引用映射预加载的技巧

复合“次要”连接

注意

本节涵盖了一些在某种程度上受 SQLAlchemy 支持的边缘案例,但建议尽可能在可能的情况下通过使用合理的关系布局和/或 Python 内的属性来解决类似问题。

有时,当一个人试图在两个表之间建立一个relationship()时,需要涉及更多的表,而不仅仅是两个或三个表。这是一个relationship()的领域,在这个领域,人们试图推动可能性的边界,并且通常对许多这些奇特用例的最终解决方案需要在 SQLAlchemy 邮件列表上讨论。

在较新版本的 SQLAlchemy 中,relationship.secondary 参数可用于某些情况,以提供由多个表组成的复合目标。以下是这种连接条件的示例(至少需要版本 0.9.2 才能正常运行):

代码语言:javascript复制
class A(Base):
    __tablename__ = "a"

    id = mapped_column(Integer, primary_key=True)
    b_id = mapped_column(ForeignKey("b.id"))

    d = relationship(
        "D",
        secondary="join(B, D, B.d_id == D.id).join(C, C.d_id == D.id)",
        primaryjoin="and_(A.b_id == B.id, A.id == C.a_id)",
        secondaryjoin="D.id == B.d_id",
        uselist=False,
        viewonly=True,
    )

class B(Base):
    __tablename__ = "b"

    id = mapped_column(Integer, primary_key=True)
    d_id = mapped_column(ForeignKey("d.id"))

class C(Base):
    __tablename__ = "c"

    id = mapped_column(Integer, primary_key=True)
    a_id = mapped_column(ForeignKey("a.id"))
    d_id = mapped_column(ForeignKey("d.id"))

class D(Base):
    __tablename__ = "d"

    id = mapped_column(Integer, primary_key=True)

在上面的示例中,我们提供了relationship.secondaryrelationship.primaryjoinrelationship.secondaryjoin 这三个参数,在声明样式中直接引用了命名表 abcd。从 AD 的查询如下:

代码语言:javascript复制
sess.scalars(select(A).join(A.d)).all()

SELECT  a.id  AS  a_id,  a.b_id  AS  a_b_id
FROM  a  JOIN  (
  b  AS  b_1  JOIN  d  AS  d_1  ON  b_1.d_id  =  d_1.id
  JOIN  c  AS  c_1  ON  c_1.d_id  =  d_1.id)
  ON  a.b_id  =  b_1.id  AND  a.id  =  c_1.a_id  JOIN  d  ON  d.id  =  b_1.d_id 

在上面的示例中,我们利用能够将多个表填入“次要”容器的优势,这样我们就可以跨多个表进行连接,同时保持对relationship()的“简单”使用,因为“左”和“右”两侧只有“一个”表;复杂性被保留在中间。

警告

类似上述的关系通常标记为viewonly=True,使用relationship.viewonly,应当被视为只读。虽然有时可以使类似上述的关系可写,但这通常是复杂且容易出错的。

另请参阅

关于使用只读关系参数的注意事项

别名类的关系

在前一节中,我们说明了一种技术,在这种技术中,我们使用了relationship.secondary来将额外的表放置在连接条件中。有一种复杂的连接情况,即使使用这种技术也不足够;当我们试图从A连接到B时,中间可能会使用任意数量的CD等,但是在AB之间也有直接的连接条件。在这种情况下,仅使用复杂的relationship.primaryjoin条件可能难以表达,因为中间表可能需要特殊处理,而且也不能使用relationship.secondary对象来表达,因为A->secondary->B模式不支持AB之间的任何引用。当出现这种极其高级的情况时,我们可以采用创建第二个映射作为关系的目标。这就是我们使用AliasedClass来创建一个包含我们所需的所有额外表的类的映射。为了将这个映射作为我们类的“替代”映射生成,我们使用aliased()函数来生成新的结构,然后针对该对象使用relationship(),就像它是一个普通的映射类一样。

下面演示了从 AB 的简单连接的 relationship(),但是主连接条件增加了另外两个实体 CD,这两个实体必须同时与 AB 中的行对应起来:

代码语言:javascript复制
class A(Base):
    __tablename__ = "a"

    id = mapped_column(Integer, primary_key=True)
    b_id = mapped_column(ForeignKey("b.id"))

class B(Base):
    __tablename__ = "b"

    id = mapped_column(Integer, primary_key=True)

class C(Base):
    __tablename__ = "c"

    id = mapped_column(Integer, primary_key=True)
    a_id = mapped_column(ForeignKey("a.id"))

    some_c_value = mapped_column(String)

class D(Base):
    __tablename__ = "d"

    id = mapped_column(Integer, primary_key=True)
    c_id = mapped_column(ForeignKey("c.id"))
    b_id = mapped_column(ForeignKey("b.id"))

    some_d_value = mapped_column(String)

# 1. set up the join() as a variable, so we can refer
# to it in the mapping multiple times.
j = join(B, D, D.b_id == B.id).join(C, C.id == D.c_id)

# 2. Create an AliasedClass to B
B_viacd = aliased(B, j, flat=True)

A.b = relationship(B_viacd, primaryjoin=A.b_id == j.c.b_id)

使用上述映射,简单连接如下所示:

代码语言:javascript复制
sess.scalars(select(A).join(A.b)).all()

SELECT  a.id  AS  a_id,  a.b_id  AS  a_b_id
FROM  a  JOIN  (b  JOIN  d  ON  d.b_id  =  b.id  JOIN  c  ON  c.id  =  d.c_id)  ON  a.b_id  =  b.id 
将别名类映射与类型化结合,并避免早期映射器配置

aliased() 构造对映射类的创建强制执行 configure_mappers() 步骤,该步骤将解析所有当前类及其关系。如果当前映射所需的不相关映射类尚未声明,或者如果关系本身的配置需要访问尚未声明的类,则可能会出现问题。此外,当关系在前面声明时,SQLAlchemy 的声明模式与 Python 类型化的协作效果最佳。

为了组织关系的构建以解决这些问题,可以使用配置级别的事件钩子,如 MapperEvents.before_mapper_configured(),该钩子将仅在所有映射准备好配置时调用配置代码:

代码语言:javascript复制
from sqlalchemy import event

class A(Base):
    __tablename__ = "a"

    id = mapped_column(Integer, primary_key=True)
    b_id = mapped_column(ForeignKey("b.id"))

@event.listens_for(A, "before_mapper_configured")
def _configure_ab_relationship(mapper, cls):
    # do the above configuration in a configuration hook

    j = join(B, D, D.b_id == B.id).join(C, C.id == D.c_id)
    B_viacd = aliased(B, j, flat=True)
    A.b = relationship(B_viacd, primaryjoin=A.b_id == j.c.b_id)

在上面的示例中,当请求完全配置的 A 版本时,函数 _configure_ab_relationship() 将被调用,此时类 BDC 将可用。

对于与内联类型化集成的方法,可以使用类似的技术来有效地生成别名类的“单例”创建模式,其中它作为全局变量进行延迟初始化,然后可以在关系内联中使用它:

代码语言:javascript复制
from typing import Any

B_viacd: Any = None
b_viacd_join: Any = None

class A(Base):
    __tablename__ = "a"

    id: Mapped[int] = mapped_column(primary_key=True)
    b_id: Mapped[int] = mapped_column(ForeignKey("b.id"))

    # 1. the relationship can be declared using lambdas, allowing it to resolve
    #    to targets that are late-configured
    b: Mapped[B] = relationship(
        lambda: B_viacd, primaryjoin=lambda: A.b_id == b_viacd_join.c.b_id
    )

# 2. configure the targets of the relationship using a before_mapper_configured
#    hook.
@event.listens_for(A, "before_mapper_configured")
def _configure_ab_relationship(mapper, cls):
    # 3. set up the join() and AliasedClass as globals from within
    #    the configuration hook.

    global B_viacd, b_viacd_join

    b_viacd_join = join(B, D, D.b_id == B.id).join(C, C.id == D.c_id)
    B_viacd = aliased(B, b_viacd_join, flat=True)
在查询中使用别名类目标

在前面的示例中,A.b 关系将 B_viacd 实体作为目标,而 不是 直接的 B 类。要添加涉及 A.b 关系的附加条件,通常需要直接引用 B_viacd 而不是使用 B,特别是在将 A.b 的目标实体转换为别名或子查询的情况下。下面演示了使用子查询而不是连接的相同关系:

代码语言:javascript复制
subq = select(B).join(D, D.b_id == B.id).join(C, C.id == D.c_id).subquery()

B_viacd_subquery = aliased(B, subq)

A.b = relationship(B_viacd_subquery, primaryjoin=A.b_id == subq.c.id)

使用上述 A.b 关系的查询将呈现一个子查询:

代码语言:javascript复制
sess.scalars(select(A).join(A.b)).all()

SELECT  a.id  AS  a_id,  a.b_id  AS  a_b_id
FROM  a  JOIN  (SELECT  b.id  AS  id,  b.some_b_column  AS  some_b_column
FROM  b  JOIN  d  ON  d.b_id  =  b.id  JOIN  c  ON  c.id  =  d.c_id)  AS  anon_1  ON  a.b_id  =  anon_1.id 

如果我们想要根据 A.b 连接添加额外的条件,必须以 B_viacd_subquery 而不是直接以 B 的形式添加:

代码语言:javascript复制
sess.scalars(
    select(A)
    .join(A.b)
    .where(B_viacd_subquery.some_b_column == "some b")
    .order_by(B_viacd_subquery.id)
).all()

SELECT  a.id  AS  a_id,  a.b_id  AS  a_b_id
FROM  a  JOIN  (SELECT  b.id  AS  id,  b.some_b_column  AS  some_b_column
FROM  b  JOIN  d  ON  d.b_id  =  b.id  JOIN  c  ON  c.id  =  d.c_id)  AS  anon_1  ON  a.b_id  =  anon_1.id
WHERE  anon_1.some_b_column  =  ?  ORDER  BY  anon_1.id 
将别名类映射与类型化结合,并避免早期映射器配置

对映射类创建 aliased() 构造会强制进行 configure_mappers() 步骤,这将解析所有当前的类及其关系。如果当前映射需要未声明的不相关的映射类,或者如果关系的配置本身需要访问尚未声明的类,则可能会出现问题。另外,当关系提前声明时,SQLAlchemy 的声明式模式与 Python 类型的工作方式最有效。

要组织关系构建以处理这些问题,可以使用一个配置级别的事件钩子,比如 MapperEvents.before_mapper_configured(),它只有在所有映射都准备好配置时才会调用配置代码:

代码语言:javascript复制
from sqlalchemy import event

class A(Base):
    __tablename__ = "a"

    id = mapped_column(Integer, primary_key=True)
    b_id = mapped_column(ForeignKey("b.id"))

@event.listens_for(A, "before_mapper_configured")
def _configure_ab_relationship(mapper, cls):
    # do the above configuration in a configuration hook

    j = join(B, D, D.b_id == B.id).join(C, C.id == D.c_id)
    B_viacd = aliased(B, j, flat=True)
    A.b = relationship(B_viacd, primaryjoin=A.b_id == j.c.b_id)

上述函数 _configure_ab_relationship() 只有在请求一个完全配置好的 A 版本时才会被调用,在这时 BDC 类会被加载。

对于与内联类型集成的方法,可以使用类似的技术有效地为别名类生成“单例”创建模式,在其中作为全局变量进行延迟初始化,然后可以在关系中使用:

代码语言:javascript复制
from typing import Any

B_viacd: Any = None
b_viacd_join: Any = None

class A(Base):
    __tablename__ = "a"

    id: Mapped[int] = mapped_column(primary_key=True)
    b_id: Mapped[int] = mapped_column(ForeignKey("b.id"))

    # 1. the relationship can be declared using lambdas, allowing it to resolve
    #    to targets that are late-configured
    b: Mapped[B] = relationship(
        lambda: B_viacd, primaryjoin=lambda: A.b_id == b_viacd_join.c.b_id
    )

# 2. configure the targets of the relationship using a before_mapper_configured
#    hook.
@event.listens_for(A, "before_mapper_configured")
def _configure_ab_relationship(mapper, cls):
    # 3. set up the join() and AliasedClass as globals from within
    #    the configuration hook.

    global B_viacd, b_viacd_join

    b_viacd_join = join(B, D, D.b_id == B.id).join(C, C.id == D.c_id)
    B_viacd = aliased(B, b_viacd_join, flat=True)
在查询中使用 AliasedClass 目标

在前面的示例中,A.b 关系将 B_viacd 实体作为目标,而不是直接使用 B 类。要添加涉及 A.b 关系的额外条件,通常需要直接引用 B_viacd 而不是使用 B,特别是在目标实体 A.b 需要转换为别名或子查询的情况下。以下是使用子查询而不是连接的相同关系的示例:

代码语言:javascript复制
subq = select(B).join(D, D.b_id == B.id).join(C, C.id == D.c_id).subquery()

B_viacd_subquery = aliased(B, subq)

A.b = relationship(B_viacd_subquery, primaryjoin=A.b_id == subq.c.id)

使用上述 A.b 关系的查询将呈现一个子查询:

代码语言:javascript复制
sess.scalars(select(A).join(A.b)).all()

SELECT  a.id  AS  a_id,  a.b_id  AS  a_b_id
FROM  a  JOIN  (SELECT  b.id  AS  id,  b.some_b_column  AS  some_b_column
FROM  b  JOIN  d  ON  d.b_id  =  b.id  JOIN  c  ON  c.id  =  d.c_id)  AS  anon_1  ON  a.b_id  =  anon_1.id 

如果我们想要根据 A.b 连接添加额外的条件,必须以 B_viacd_subquery 而不是直接使用 B

代码语言:javascript复制
sess.scalars(
    select(A)
    .join(A.b)
    .where(B_viacd_subquery.some_b_column == "some b")
    .order_by(B_viacd_subquery.id)
).all()

SELECT  a.id  AS  a_id,  a.b_id  AS  a_b_id
FROM  a  JOIN  (SELECT  b.id  AS  id,  b.some_b_column  AS  some_b_column
FROM  b  JOIN  d  ON  d.b_id  =  b.id  JOIN  c  ON  c.id  =  d.c_id)  AS  anon_1  ON  a.b_id  =  anon_1.id
WHERE  anon_1.some_b_column  =  ?  ORDER  BY  anon_1.id 

带窗口函数的行限制关系

另一个关系到 AliasedClass 对象的有趣用例是关系需要连接到任何形式的专门 SELECT 时。一种情况是当需要使用窗口函数时,例如限制返回的行数。下面的示例说明了一个非主映射器关系,该关系将为每个集合加载前十个项目:

代码语言:javascript复制
class A(Base):
    __tablename__ = "a"

    id = mapped_column(Integer, primary_key=True)

class B(Base):
    __tablename__ = "b"
    id = mapped_column(Integer, primary_key=True)
    a_id = mapped_column(ForeignKey("a.id"))

partition = select(
    B, func.row_number().over(order_by=B.id, partition_by=B.a_id).label("index")
).alias()

partitioned_b = aliased(B, partition)

A.partitioned_bs = relationship(
    partitioned_b, primaryjoin=and_(partitioned_b.a_id == A.id, partition.c.index < 10)
)

我们可以使用上述 partitioned_bs 关系与大多数加载器策略,例如 selectinload():

代码语言:javascript复制
for a1 in session.scalars(select(A).options(selectinload(A.partitioned_bs))):
    print(a1.partitioned_bs)  # <-- will be no more than ten objects

在上面的例子中,“selectinload”查询如下所示:

代码语言:javascript复制
SELECT
  a_1.id  AS  a_1_id,  anon_1.id  AS  anon_1_id,  anon_1.a_id  AS  anon_1_a_id,
  anon_1.data  AS  anon_1_data,  anon_1.index  AS  anon_1_index
FROM  a  AS  a_1
JOIN  (
  SELECT  b.id  AS  id,  b.a_id  AS  a_id,  b.data  AS  data,
  row_number()  OVER  (PARTITION  BY  b.a_id  ORDER  BY  b.id)  AS  index
  FROM  b)  AS  anon_1
ON  anon_1.a_id  =  a_1.id  AND  anon_1.index  <  %(index_1)s
WHERE  a_1.id  IN  (  ...  primary  key  collection  ...)
ORDER  BY  a_1.id

在上面的例子中,对于“a”中的每个匹配的主键,我们将按照“b.id”的顺序获取前十个“bs”。通过在“a_id”上分区,我们确保每个“行号”都局限于父“a_id”。

这样的映射通常还会包括从“A”到“B”的“普通”关系,用于持久性操作以及当需要“A”每个对象的完整集合时。

构建查询可用的属性

非常雄心勃勃的自定义连接条件可能无法直接持久化,有些情况下甚至可能无法正确加载。要消除持久性方程式的部分,使用标志relationship.viewonlyrelationship()上,将其建立为只读属性(写入到集合的数据将在 flush()时被忽略)。但是,在极端情况下,请考虑与Query一起使用常规的 Python 属性,如下所示:

代码语言:javascript复制
class User(Base):
    __tablename__ = "user"
    id = mapped_column(Integer, primary_key=True)

    @property
    def addresses(self):
        return object_session(self).query(Address).with_parent(self).filter(...).all()

在其他情况下,可以构建描述符来利用现有的 Python 数据。有关更一般的 Python 属性的特殊讨论,请参阅使用描述符和混合部分。

另请参阅

使用描述符和混合

使用 viewonly 关系参数的注意事项

当应用于relationship()构造时,relationship.viewonly参数指示此relationship()不会参与任何 ORM 工作单元操作,此外,该属性也不会参与其表示的集合的 Python 变异。这意味着虽然 viewonly 关系可能引用可变的 Python 集合,如列表或集合,但对在映射实例上存在的该列表或集合进行更改对 ORM flush 过程没有影响。

要探索这种情景,请考虑以下映射:

代码语言:javascript复制
from __future__ import annotations

import datetime

from sqlalchemy import and_
from sqlalchemy import ForeignKey
from sqlalchemy import func
from sqlalchemy.orm import DeclarativeBase
from sqlalchemy.orm import Mapped
from sqlalchemy.orm import mapped_column
from sqlalchemy.orm import relationship

class Base(DeclarativeBase):
    pass

class User(Base):
    __tablename__ = "user_account"

    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str | None]

    all_tasks: Mapped[list[Task]] = relationship()

    current_week_tasks: Mapped[list[Task]] = relationship(
        primaryjoin=lambda: and_(
            User.id == Task.user_account_id,
            # this expression works on PostgreSQL but may not be supported
            # by other database engines
            Task.task_date >= func.now() - datetime.timedelta(days=7),
        ),
        viewonly=True,
    )

class Task(Base):
    __tablename__ = "task"

    id: Mapped[int] = mapped_column(primary_key=True)
    user_account_id: Mapped[int] = mapped_column(ForeignKey("user_account.id"))
    description: Mapped[str | None]
    task_date: Mapped[datetime.datetime] = mapped_column(server_default=func.now())

    user: Mapped[User] = relationship(back_populates="current_week_tasks")

以下各节将说明此配置的不同方面。

在 Python 中,包括 backrefs 在内的变异操作不适用于 viewonly=True

上述映射针对User.current_week_tasks视图关系,作为Task.user属性的 backref 目标。目前,SQLAlchemy 的 ORM 配置过程尚未标记此项,但这是配置错误。更改Task上的.user属性不会影响.current_week_tasks属性:

代码语言:javascript复制
>>> u1 = User()
>>> t1 = Task(task_date=datetime.datetime.now())
>>> t1.user = u1
>>> u1.current_week_tasks
[]

还有另一个称为relationship.sync_backrefs的参数,可以在这里打开,以允许在这种情况下对.current_week_tasks进行突变,但是这并不被认为是viewonly关系的最佳实践,其不应该依赖于 Python 中的突变。

在此映射中,可以在User.all_tasksTask.user之间配置反向引用,因为它们都不是viewonly并且将正常同步。

除了禁用viewonly关系的反向引用突变之外,Python 中对User.all_tasks集合的普通更改也不会反映在User.current_week_tasks集合中,直到更改已刷新到数据库中。

总的来说,对于一个自定义集合应该立即响应 Python 中突变的用例,viewonly关系通常不合适。更好的方法是使用 SQLAlchemy 的 Hybrid Attributes 功能,或者仅对于实例化的情况使用 Python 的@property,在这种情况下,可以实现一个用户定义的集合,该集合是以当前 Python 实例为基础生成的。要将我们的示例更改为这种工作方式,我们修复Task.user上的relationship.back_populates参数,以引用User.all_tasks,然后说明一个简单的@property,该@property将以User.all_tasks集合的立即结果的形式提供结果:

代码语言:javascript复制
class User(Base):
    __tablename__ = "user_account"

    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str | None]

    all_tasks: Mapped[list[Task]] = relationship(back_populates="user")

    @property
    def current_week_tasks(self) -> list[Task]:
        past_seven_days = datetime.datetime.now() - datetime.timedelta(days=7)
        return [t for t in self.all_tasks if t.task_date >= past_seven_days]

class Task(Base):
    __tablename__ = "task"

    id: Mapped[int] = mapped_column(primary_key=True)
    user_account_id: Mapped[int] = mapped_column(ForeignKey("user_account.id"))
    description: Mapped[str | None]
    task_date: Mapped[datetime.datetime] = mapped_column(server_default=func.now())

    user: Mapped[User] = relationship(back_populates="all_tasks")

每次在 Python 中计算的集合,我们都能保证始终得到正确的答案,而无需使用数据库:

代码语言:javascript复制
>>> u1 = User()
>>> t1 = Task(task_date=datetime.datetime.now())
>>> t1.user = u1
>>> u1.current_week_tasks
[<__main__.Task object at 0x7f3d699523c0>]
viewonly=True的集合/属性在过期之前不会被重新查询

对于原始的viewonly属性,如果我们确实对User.all_tasks集合进行了更改,那么在个事件发生后,viewonly集合才能显示这些更改的最终结果。第一个是将User.all_tasks的更改 flushed,以便新数据在数据库中可用,至少在本地事务范围内可用。第二个是User.current_week_tasks属性被 expired,并通过新的 SQL 查询重新加载到数据库。

为了支持这一要求,使用的最简单的流程是只在主要是只读操作中使用viewonly关系。比如下面,如果我们从数据库中检索到一个新的User,集合将是当前的:

代码语言:javascript复制
>>> with Session(e) as sess:
...     u1 = sess.scalar(select(User).where(User.id == 1))
...     print(u1.current_week_tasks)
[<__main__.Task object at 0x7f8711b906b0>]

当我们对u1.all_tasks进行修改时,如果我们希望在u1.current_week_tasks的 viewonly 关系中看到这些更改反映出来,这些更改需要被刷新,并且u1.current_week_tasks属性需要过期,以便在下次访问时懒加载。这样做的最简单方法是使用Session.commit(),保持Session.expire_on_commit参数设置为其默认值True

代码语言:javascript复制
>>> with Session(e) as sess:
...     u1 = sess.scalar(select(User).where(User.id == 1))
...     u1.all_tasks.append(Task(task_date=datetime.datetime.now()))
...     sess.commit()
...     print(u1.current_week_tasks)
[<__main__.Task object at 0x7f8711b90ec0>, <__main__.Task object at 0x7f8711b90a10>]

上述,对Session.commit()的调用将更改刷新到数据库的u1.all_tasks,然后过期所有对象,因此当我们访问u1.current_week_tasks时,将从数据库中新鲜地获取此属性的内容,触发了一个:term:懒加载

要拦截操作而不实际提交事务,必须首先显式地过期属性。这样做的简单方法就是直接调用它。在下面的例子中,Session.flush()发送挂起的更改到数据库,然后使用Session.expire()来过期u1.current_week_tasks集合,以便在下次访问时重新获取:

代码语言:javascript复制
>>> with Session(e) as sess:
...     u1 = sess.scalar(select(User).where(User.id == 1))
...     u1.all_tasks.append(Task(task_date=datetime.datetime.now()))
...     sess.flush()
...     sess.expire(u1, ["current_week_tasks"])
...     print(u1.current_week_tasks)
[<__main__.Task object at 0x7fd95a4c8c50>, <__main__.Task object at 0x7fd95a4c8c80>]

实际上,我们可以跳过对Session.flush()的调用,假设Session保持其默认值为TrueSession.autoflush,因为过期的current_week_tasks属性将在过期后在访问时触发自动刷新:

代码语言:javascript复制
>>> with Session(e) as sess:
...     u1 = sess.scalar(select(User).where(User.id == 1))
...     u1.all_tasks.append(Task(task_date=datetime.datetime.now()))
...     sess.expire(u1, ["current_week_tasks"])
...     print(u1.current_week_tasks)  # triggers autoflush before querying
[<__main__.Task object at 0x7fd95a4c8c50>, <__main__.Task object at 0x7fd95a4c8c80>]

进一步发展上述方法,我们可以在相关的User.all_tasks集合发生变化时,通过事件钩子来以编程方式应用过期。这是一种高级技术,应该首先检查像@property这样的更简单的架构或者坚持只读用例。在我们的简单示例中,配置如下:

代码语言:javascript复制
from sqlalchemy import event, inspect

@event.listens_for(User.all_tasks, "append")
@event.listens_for(User.all_tasks, "remove")
@event.listens_for(User.all_tasks, "bulk_replace")
def _expire_User_current_week_tasks(target, value, initiator):
    inspect(target).session.expire(target, ["current_week_tasks"])

使用上述钩子,突变操作被拦截,并导致User.current_week_tasks集合自动过期:

代码语言:javascript复制
>>> with Session(e) as sess:
...     u1 = sess.scalar(select(User).where(User.id == 1))
...     u1.all_tasks.append(Task(task_date=datetime.datetime.now()))
...     print(u1.current_week_tasks)
[<__main__.Task object at 0x7f66d093ccb0>, <__main__.Task object at 0x7f66d093cce0>]

上面使用的AttributeEvents事件钩子也会被后向引用突变触发,因此通过上面的钩子也会拦截对Task.user的更改:

代码语言:javascript复制
>>> with Session(e) as sess:
...     u1 = sess.scalar(select(User).where(User.id == 1))
...     t1 = Task(task_date=datetime.datetime.now())
...     t1.user = u1
...     sess.add(t1)
...     print(u1.current_week_tasks)
[<__main__.Task object at 0x7f3b0c070d10>, <__main__.Task object at 0x7f3b0c057d10>]
使用 viewonly=True 的 In-Python 突变不合适

上述映射将User.current_week_tasks视图关系作为Task.user属性的 backref 目标。这目前并未被 SQLAlchemy 的 ORM 配置过程标记,但这是一个配置错误。更改Task上的.user属性不会影响.current_week_tasks属性:

代码语言:javascript复制
>>> u1 = User()
>>> t1 = Task(task_date=datetime.datetime.now())
>>> t1.user = u1
>>> u1.current_week_tasks
[]

这里还有另一个参数叫做relationship.sync_backrefs,可以在这里打开,允许在这种情况下对.current_week_tasks进行变异,然而这并不被认为是最佳实践,对于只读关系,不应依赖于 Python 中的变异。

在这种映射中,可以在User.all_tasksTask.user之间配置反向引用,因为这两者都不是只读的,将正常同步。

除了对于只读关系禁用反向引用变异的问题外,Python 中对User.all_tasks集合的普通更改也不会反映在User.current_week_tasks集合中,直到更改已刷新到数据库。

总的来说,对于一个自定义集合应立即响应 Python 中的变异的用例,只读关系通常不合适。更好的方法是使用 SQLAlchemy 的 Hybrid Attributes 功能,或者对于仅实例情况,使用 Python 的@property,其中可以实现以当前 Python 实例为基础生成的用户定义集合。要将我们的示例更改为这种方式工作,我们修复Task.user上的relationship.back_populates参数,引用User.all_tasks,然后演示一个简单的@property,将以即时User.all_tasks集合的结果为基础提供结果。

代码语言:javascript复制
class User(Base):
    __tablename__ = "user_account"

    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str | None]

    all_tasks: Mapped[list[Task]] = relationship(back_populates="user")

    @property
    def current_week_tasks(self) -> list[Task]:
        past_seven_days = datetime.datetime.now() - datetime.timedelta(days=7)
        return [t for t in self.all_tasks if t.task_date >= past_seven_days]

class Task(Base):
    __tablename__ = "task"

    id: Mapped[int] = mapped_column(primary_key=True)
    user_account_id: Mapped[int] = mapped_column(ForeignKey("user_account.id"))
    description: Mapped[str | None]
    task_date: Mapped[datetime.datetime] = mapped_column(server_default=func.now())

    user: Mapped[User] = relationship(back_populates="all_tasks")

使用每次都在 Python 中计算的即时集合,我们保证始终具有正确答案,而无需使用数据库:

代码语言:javascript复制
>>> u1 = User()
>>> t1 = Task(task_date=datetime.datetime.now())
>>> t1.user = u1
>>> u1.current_week_tasks
[<__main__.Task object at 0x7f3d699523c0>]
viewonly=True 集合/属性在过期之前不会重新查询

继续使用原始的只读属性,如果实际上对持久对象上的User.all_tasks集合进行更改,那么只有在发生两个事情之后,只读集合才能显示这种更改的净结果。第一是刷新对User.all_tasks的更改,以便新数据在数据库中可用,至少在本地事务范围内。第二是User.current_week_tasks属性被过期并通过对数据库的新 SQL 查询重新加载。

为了支持这个要求,使用最简单的流程是仅在主要是只读操作中使用仅视图关系。比如,如果我们从数据库中获取一个新的User,那么集合将是当前的:

代码语言:javascript复制
>>> with Session(e) as sess:
...     u1 = sess.scalar(select(User).where(User.id == 1))
...     print(u1.current_week_tasks)
[<__main__.Task object at 0x7f8711b906b0>]

当我们对u1.all_tasks进行修改时,如果我们希望在u1.current_week_tasks视图关系中看到这些更改的反映,这些更改需要被刷新,并且u1.current_week_tasks属性需要过期,这样它将在下一次访问时进行延迟加载。最简单的方法是使用Session.commit(),保持Session.expire_on_commit参数设置为其默认值True

代码语言:javascript复制
>>> with Session(e) as sess:
...     u1 = sess.scalar(select(User).where(User.id == 1))
...     u1.all_tasks.append(Task(task_date=datetime.datetime.now()))
...     sess.commit()
...     print(u1.current_week_tasks)
[<__main__.Task object at 0x7f8711b90ec0>, <__main__.Task object at 0x7f8711b90a10>]

上面,对Session.commit()的调用将更改刷新到数据库中,然后使所有对象过期,这样当我们访问u1.current_week_tasks时,一个:term:延迟加载会发生,从数据库中重新获取该属性的内容。

要拦截操作而不实际提交事务,需要首先显式地将属性过期。一个简单的方法是直接调用它。在下面的示例中,Session.flush()将挂起的更改发送到数据库,然后使用Session.expire()使u1.current_week_tasks集合过期,以便在下一次访问时重新获取:

代码语言:javascript复制
>>> with Session(e) as sess:
...     u1 = sess.scalar(select(User).where(User.id == 1))
...     u1.all_tasks.append(Task(task_date=datetime.datetime.now()))
...     sess.flush()
...     sess.expire(u1, ["current_week_tasks"])
...     print(u1.current_week_tasks)
[<__main__.Task object at 0x7fd95a4c8c50>, <__main__.Task object at 0x7fd95a4c8c80>]

我们实际上可以跳过对Session.flush()的调用,假设一个保持Session.autoflush值为默认值TrueSession,因为过期的current_week_tasks属性在过期后被访问时会触发自动刷新:

代码语言:javascript复制
>>> with Session(e) as sess:
...     u1 = sess.scalar(select(User).where(User.id == 1))
...     u1.all_tasks.append(Task(task_date=datetime.datetime.now()))
...     sess.expire(u1, ["current_week_tasks"])
...     print(u1.current_week_tasks)  # triggers autoflush before querying
[<__main__.Task object at 0x7fd95a4c8c50>, <__main__.Task object at 0x7fd95a4c8c80>]

继续上述方法到更复杂的内容,当相关的User.all_tasks集合发生变化时,我们可以在程序上应用过期,使用 event hooks。这是一种高级技术,应该首先检查简单的体系结构,比如@property或者坚持只读用例。在我们的简单示例中,这将配置为:

代码语言:javascript复制
from sqlalchemy import event, inspect

@event.listens_for(User.all_tasks, "append")
@event.listens_for(User.all_tasks, "remove")
@event.listens_for(User.all_tasks, "bulk_replace")
def _expire_User_current_week_tasks(target, value, initiator):
    inspect(target).session.expire(target, ["current_week_tasks"])

有了上述钩子,变更操作将被拦截,并导致User.current_week_tasks集合自动过期:

代码语言:javascript复制
>>> with Session(e) as sess:
...     u1 = sess.scalar(select(User).where(User.id == 1))
...     u1.all_tasks.append(Task(task_date=datetime.datetime.now()))
...     print(u1.current_week_tasks)
[<__main__.Task object at 0x7f66d093ccb0>, <__main__.Task object at 0x7f66d093cce0>]

上面使用的AttributeEvents事件钩子也会被反向引用的变化触发,因此通过上述钩子,对Task.user的更改也会被拦截:

代码语言:javascript复制
>>> with Session(e) as sess:
...     u1 = sess.scalar(select(User).where(User.id == 1))
...     t1 = Task(task_date=datetime.datetime.now())
...     t1.user = u1
...     sess.add(t1)
...     print(u1.current_week_tasks)
[<__main__.Task object at 0x7f3b0c070d10>, <__main__.Task object at 0x7f3b0c057d10>]

8711b90ec0>, <main.Task object at 0x7f8711b90a10>]

代码语言:javascript复制
上面,对`Session.commit()`的调用将更改刷新到数据库中,然后使所有对象过期,这样当我们访问`u1.current_week_tasks`时,一个:term:`延迟加载`会发生,从数据库中重新获取该属性的内容。

要拦截操作而不实际提交事务,需要首先显式地将属性过期。一个简单的方法是直接调用它。在下面的示例中,`Session.flush()`将挂起的更改发送到数据库,然后使用`Session.expire()`使`u1.current_week_tasks`集合过期,以便在下一次访问时重新获取:

```py
>>> with Session(e) as sess:
...     u1 = sess.scalar(select(User).where(User.id == 1))
...     u1.all_tasks.append(Task(task_date=datetime.datetime.now()))
...     sess.flush()
...     sess.expire(u1, ["current_week_tasks"])
...     print(u1.current_week_tasks)
[<__main__.Task object at 0x7fd95a4c8c50>, <__main__.Task object at 0x7fd95a4c8c80>]

我们实际上可以跳过对Session.flush()的调用,假设一个保持Session.autoflush值为默认值TrueSession,因为过期的current_week_tasks属性在过期后被访问时会触发自动刷新:

代码语言:javascript复制
>>> with Session(e) as sess:
...     u1 = sess.scalar(select(User).where(User.id == 1))
...     u1.all_tasks.append(Task(task_date=datetime.datetime.now()))
...     sess.expire(u1, ["current_week_tasks"])
...     print(u1.current_week_tasks)  # triggers autoflush before querying
[<__main__.Task object at 0x7fd95a4c8c50>, <__main__.Task object at 0x7fd95a4c8c80>]

继续上述方法到更复杂的内容,当相关的User.all_tasks集合发生变化时,我们可以在程序上应用过期,使用 event hooks。这是一种高级技术,应该首先检查简单的体系结构,比如@property或者坚持只读用例。在我们的简单示例中,这将配置为:

代码语言:javascript复制
from sqlalchemy import event, inspect

@event.listens_for(User.all_tasks, "append")
@event.listens_for(User.all_tasks, "remove")
@event.listens_for(User.all_tasks, "bulk_replace")
def _expire_User_current_week_tasks(target, value, initiator):
    inspect(target).session.expire(target, ["current_week_tasks"])

有了上述钩子,变更操作将被拦截,并导致User.current_week_tasks集合自动过期:

代码语言:javascript复制
>>> with Session(e) as sess:
...     u1 = sess.scalar(select(User).where(User.id == 1))
...     u1.all_tasks.append(Task(task_date=datetime.datetime.now()))
...     print(u1.current_week_tasks)
[<__main__.Task object at 0x7f66d093ccb0>, <__main__.Task object at 0x7f66d093cce0>]

上面使用的AttributeEvents事件钩子也会被反向引用的变化触发,因此通过上述钩子,对Task.user的更改也会被拦截:

代码语言:javascript复制
>>> with Session(e) as sess:
...     u1 = sess.scalar(select(User).where(User.id == 1))
...     t1 = Task(task_date=datetime.datetime.now())
...     t1.user = u1
...     sess.add(t1)
...     print(u1.current_week_tasks)
[<__main__.Task object at 0x7f3b0c070d10>, <__main__.Task object at 0x7f3b0c057d10>]

0 人点赞