ORM
- FastAPI 可与任何数据库和任何样式的库配合使用并和数据库通信
- object-relational mapping 对象关系映射
- ORM 具有在代码和数据库表(关系)中的对象之间进行转换(映射)的工具
- 使用 ORM,通常会创建一个表示 SQL 数据表的类,该类的每个属性都表示一个列,具有名称和类型
小栗子
- Pet 类可以表示 SQL 表 pets
- 并且 Pet 类的每个实例对象代表数据库中的一行数据
- 例如,对象 orion_cat(Pet 的一个实例)可以具有属性 orion_cat.type,用于列类型,属性的值可以是:猫
项目架构
代码语言:javascript复制.
└── sql_app
├── __init__.py
├── curd.py
├── database.py
├── main.py
├── models.py
└── schemas.py
前提
需要先安装 sqlalchemy
代码语言:javascript复制pip install sqlalchemy
使用 sqlite
- 后面的栗子,暂时跟着官网,先使用 sqlite 数据库来演示
- 后面有时候再通过 Mysql 来写多一篇文章
database.py 代码
代码语言:javascript复制# 1、导入 sqlalchemy 部分的包
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
# 2、声明 database url
SQLALCHEMY_DATABASE_URL = "sqlite:///./sql_app.db"
# SQLALCHEMY_DATABASE_URL = "postgresql://user:password@postgresserver/db"
# 3、创建 sqlalchemy 引擎
engine = create_engine(
url=SQLALCHEMY_DATABASE_URL,
connect_args={"check_same_thread": False}
)
# 4、创建一个 database 会话
session = sessionmaker(autocommit=False, autoflush=False, bind=engine)
# 5、返回一个 ORM Model
Base = declarative_base()
声明 database 连接 url
代码语言:javascript复制SQLALCHEMY_DATABASE_URL = "sqlite:///./sql_app.db"
# SQLALCHEMY_DATABASE_URL = "postgresql://user:password@postgresserver/db"
第一行是 slite 连接 url
其他数据库连接 url 的写法
代码语言:javascript复制# sqlite-pysqlite 库
sqlite pysqlite:///file_path
# mysql-mysqldb 库
mysql mysqldb://<user>:<password>@<host>[:<port>]/<dbname>
# mysql-pymysql 库
mysql pymysql://<username>:<password>@<host>/<dbname>[?<options>]
# mysql-mysqlconnector 库
mysql mysqlconnector://<user>:<password>@<host>[:<port>]/<dbname>
# oracle-cx_Oracle 库
oracle cx_oracle://user:pass@hostname:port[/dbname][?service_name=<service>[&key=value&key=value...]]
# postgresql-pypostgresql 库
postgresql pypostgresql://user:password@host:port/dbname[?key=value&key=value...]
# SQL Server-PyODBC 库
mssql pyodbc://<username>:<password>@<dsnname>
创建一个数据库引擎
代码语言:javascript复制engine = create_engine(
url=SQLALCHEMY_DATABASE_URL,
connect_args={"check_same_thread": False}
)
- 仅适用于 SQlite,其他数据库不需要用到
{"check_same_thread": False}
- 默认情况下,SQLite 将只允许一个线程与其通信,假设每个线程只处理一个独立的请求
- 这是为了防止被不同的事物(对于不同的请求)共享相同的连接
- 但是在 FastAPI 中,使用普通函数 (def) 可以针对同一请求与数据库的多个线程进行交互,因此需要让 SQLite 知道它应该允许使用多线程
- 需要确保每个请求在依赖项中都有自己的数据库连接会话,因此不需要设置为同一个线程
创建一个数据库会话
代码语言:javascript复制SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
- SessionLocal 类的每个实例都是一个数据库会话
- 但 sessionmaker 本身还不是数据库会话
- 但是一旦创建了 SessionLocal 类的实例,这个实例就会成为实际的数据库会话
- 将其命名为 SessionLocal ,方便区分从 SQLAlchemy 导入的 Session
- 稍后将使用 Session(从 SQLAlchemy 导入的那个)
创建一个 ORM 模型基类
代码语言:javascript复制Base = declarative_base()
后面会通过继承这个 Base 类,来创建每个数据库 Model,也称为 ORM Model
models.py 代码
代码语言:javascript复制from sqlalchemy import Boolean, Column, ForeignKey, Integer, String
from sqlalchemy.orm import relationship
from .database import Base
class User(Base):
# 1、表名
__tablename__ = "users"
# 2、类属性,每一个都代表数据表中的一列
# Column 就是列的意思
# Integer、String、Boolean 就是数据表中,列的类型
id = Column(Integer, primary_key=True, index=True, default=1, autoincrement=True)
email = Column(String, unique=True, index=True)
hashed_password = Column(String)
is_active = Column(Boolean, default=True)
items = relationship("Item", back_populates="owner")
class Item(Base):
__tablename__ = "items"
id = Column(Integer, primary_key=True, index=True, default=1, autoincrement=True)
title = Column(String, index=True)
description = Column(String, index=True)
owner_id = Column(Integer, ForeignKey("users.id"))
owner = relationship("User", back_populates="items")
Column
列,一个属性代表数据表中的一列
常用参数
参数 | 作用 |
---|---|
primary_key | 如果设为 True ,这列就是表的主键 |
unique | 如果设为 True ,这列不允许出现重复的值 |
index | 如果设为 True ,为这列创建索引,提升查询效率 |
nullable | 如果设为 True ,这列允许使用空值; 如果设为 False ,这列不允许使用空值 |
default | 为这列定义默认值 |
autoincrement | 如果设为 True ,这列自增 |
- 如果设为 True ,这列允许使用空值;
- 如果设为 False ,这列不允许使用空值
default 为这列定义默认值 autoincrement 如果设为 True ,这列自增
String、Integer、Boolean
代表数据表中每一列的数据类型
schemas.py 代码
背景
为了避免混淆 SQLAlchemy 模型和 Pydantic 模型之间,将使用文件 models.py 编写 SQLAlchemy 模型和文件 schemas.py 编写 Pydantic 模型
实际代码
代码语言:javascript复制from typing import List, Optional
from pydantic import BaseModel
# Item 的基类,表示创建和查询 Item 时共有的属性
class ItemBase(BaseModel):
title: str
description: Optional[str] = None
# 创建 Item 时的 Model
class ItemCreate(ItemBase):
pass
# 查询 Item 时的 Model
class Item(ItemBase):
id: int
owner_id: int
# 向 Pydantic 提供配置
class Config:
# orm_mode 会告诉 Pydantic 模型读取数据,即使它不是字典,而是 ORM 模型(或任何其他具有属性的任意对象)
orm_mode = True
class UserBase(BaseModel):
email: str
class UserCreate(UserBase):
password: str
class User(UserBase):
id: int
is_active: bool
items: List[Item] = []
class Config:
orm_mode = True
ItemBase、UserBase
基类,声明在创建或读取数据时共有的属性
ItemCreate、UserCreate
创建数据时使用的 Model
Item、User
读取数据时使用的 Model
orm_mode
代码语言:javascript复制class Config:
orm_mode = True
- 这是一个 Pydantic 配置项
- orm_mode 会告诉 Pydantic 模型读取数据,即使它不是字典,而是 ORM 模型(或任何其他具有属性的任意对象)
# 正常情况
id = data["id"]
# 还会尝试从对象获取属性
id = data.id
设置了 orm_mode,Pydantic 模型与 ORM 就兼容了,只需在路径操作的 response_model 参数中声明它即可
orm_mode 的技术细节
- SQLAlchemy 默认情况下 lazy loading 懒加载,即需要获取数据时,才会主动从数据库中获取对应的数据
- 比如获取属性 ,SQLAlchemy 会从 items 表中获取该用户的 item 数据,但在这之前不会主动获取
current_user.items
如果没有 orm_mode
- 从路径操作中返回一个 SQLAlchemy 模型,它将不会包括关系数据(比如 user 中有 item,则不会返回 item,后面再讲实际的栗子)
- 在 orm_mode 下,Pydantic 会尝试从属性访问它要的数据,可以声明要返回的特定数据,它甚至可以从 ORM 中获取它
curd.py 代码
作用
- 主要用来编写与数据库交互的函数,增删改查,方便整个项目不同地方都能进行复用
- 并且给这些函数添加专属的单元测试
实际代码
代码只实现了查询和创建
- 根据 id 查询 user
- 根据 email 查询 user
- 查询所有 user
- 创建 user
- 查询所有 item
- 创建 item
from sqlalchemy.orm import Session
from .models import User, Item
from .schemas import UserCreate, ItemCreate
# 根据 id 获取 user
def get_user(db: Session, user_id: int):
return db.query(User).filter(User.id == user_id).first()
# 根据 email 获取 user
def get_user_by_email(db: Session, email: str):
return db.query(User).filter(User.email == email).first()
# 获取所有 user
def get_users(db: Session, size: int = 0, limit: int = 100):
return db.query(User).offset(size).limit(limit).all()
# 创建 user,user 类型是 Pydantic Model
def create_user(db: Session, user: UserCreate):
fake_hashed_password = user.password "superpolo"
# 1、使用传进来的数据创建 SQLAlchemy Model 实例对象
db_user = User(email=user.email, hashed_password=fake_hashed_password)
# 2、将实例对象添加到数据库会话 Session 中
db.add(db_user)
# 3、将更改提交到数据库
db.commit()
# 4、刷新实例,方便它包含来自数据库的任何新数据,比如生成的 ID
db.refresh(db_user)
return db_user
# 获取所有 item
def get_items(db: Session, size: int = 0, limit: int = 100):
return db.query(Item).offset(size).limit(limit).all()
# 创建 item,item 类型是 Pydantic Model
def create_item(db: Session, item: ItemCreate, user_id: int):
db_item = Item(**item.dict(), owner_id=user_id)
db.add(db_item)
db.commit()
db.refresh(db_item)
return db_item
create_user、create_item
函数内的操作步骤如下
代码语言:javascript复制# 1、使用传进来的数据创建 SQLAlchemy Model 实例对象
db_user = User(email=user.email, hashed_password=fake_hashed_password)
# 2、将实例对象添加到数据库会话 Session 中
db.add(db_user)
# 3、将更改提交到数据库
db.commit()
# 4、刷新实例,方便它包含来自数据库的任何新数据,比如生成的 ID
db.refresh(db_user)
main.py 代码
代码语言:javascript复制from typing import List
import uvicorn
from fastapi import Depends, FastAPI, HTTPException, status, Path, Query, Body
from sqlalchemy.orm import Session
from models import Base
from schemas import User, UserCreate, ItemCreate, Item
from database import SessionLocal, engine
import curd
Base.metadata.create_all(bind=engine)
app = FastAPI()
# 依赖项,获取数据库会话对象
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
# 创建用户
@app.post("/users", response_model=User)
async def create_user(user: UserCreate, db: Session = Depends(get_db)):
# 1、先查询用户是否有存在
db_user = curd.get_user_by_email(db, user.email)
if db_user:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="user 已存在")
res_user = curd.create_user(db, user)
return res_user
# 根据 user_id 获取用户
@app.get("/user_id/{user_id}", response_model=User)
async def get_user(user_id: int = Path(...), db: Session = Depends(get_db)):
return curd.get_user(db, user_id)
# 根据 email 获取用户
@app.get("/user_email/{email}", response_model=User)
async def get_user_by_email(email: str = Path(...), db: Session = Depends(get_db)):
return curd.get_user_by_email(db, email)
# 获取所有用户
@app.get("/users_all/", response_model=List[User])
async def get_users(skip: int = Query(0),
limit: int = Query(100),
db: Session = Depends(get_db)):
return curd.get_users(db, skip, limit)
# 创建 item
@app.post("/users/{user_id}/items", response_model=Item)
async def get_user_item(user_id: int = Path(...), item: ItemCreate = Body(...), db: Session = Depends(get_db)):
return curd.create_user_item(db, item, user_id)
# 获取所有 item
@app.get("/items/", response_model=List[Item])
async def get_items(skip: int = Query(0),
limit: int = Query(100),
db: Session = Depends(get_db)):
return curd.get_items(db, skip, limit)
if __name__ == "__main__":
uvicorn.run(app="main:app", host="127.0.0.1", port=8080, reload=True, debug=True)
依赖项
代码语言:javascript复制def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
- 每个请求都有一个独立的数据库会话(SessionLocal)
- 在请求完成后会自动关闭它
- 然后下一个请求来的时候,会创建一个新会话
声明依赖项
代码语言:javascript复制async def create_user(user: UserCreate, db: Session = Depends(get_db))
- SessionLocal 是 sessionmaker() 创建的,是 SQLAlchemy Session 的代理
- 通过声明 ,IDE 就可以提供智能代码提示啦
db: Session
使用中间件 middleware 代替依赖项声明数据库会话
代码语言:javascript复制# 中间件
@app.middleware("http")
async def db_session_middleware(request: Request, call_next):
# 默认响应
response = Response("Internal server error", status_code=500)
try:
request.state.db = SessionLocal()
response = await call_next(request)
finally:
# 关闭数据库会话
request.state.db.close()
return response
# 依赖项,获取数据库会话对象
def get_db(request: Request):
return request.state.db
request.state
- request.state 是每个 Request 对象的一个属性
- 它用于存储附加到请求本身的任意对象,例如本例中的数据库会话 db
- 也就是说,我不叫 db,叫 sqlite_db 也可以,只是一个属性名
使用中间件 middleware 和使用 yield 的依赖项的区别
- 中间件需要更多的代码,而且稍微有点复杂
- 中间件必须是一个 async 函数,而且需要有 await 的代码,可能会阻塞程序并稍稍降低性能
- 每个请求运行的时候都会先运行中间件,所以会为每个请求都创建一个数据库连接,即使某个请求的路径操作函数并不需要和数据库交互
建议
- 创建数据库连接对象最好还是用带有 yield 的依赖项来完成
- 在其他使用场景也是,能满足需求的前提下,最好用带有 yield 的依赖项来完成