文章目录
- 1. 安装
- 2. 创建models
- 3. 连接数据库
- 4. 插入文档
- 5. 查询
- 6. 更新、删除
- 7. 嵌套文档
learn from 《Building Data Science Applications with FastAPI》
面向文档的数据库(如MongoDB)不需要预先配置模式
Motor,这是一个用于与 MongoDB 异步通信的库,由MongoDB组织官方支持
1. 安装
代码语言:javascript复制pip install motor
Successfully installed motor-2.5.1 pymongo-3.12.3
2. 创建models
- MongoDB 会为每个文件创建
_id
属性作为唯一标识符,但是_
开头的变量被 Pydantic 认为是私有的,不会作为数据字段 _id
是二进制对象,不被 Pydantic 支持
# _*_ coding: utf-8 _*_
# @Time : 2022/3/23 14:37
# @Author : Michael
# @File : models.py
# @desc :
from datetime import datetime
from typing import Optional
from bson import ObjectId # A MongoDB ObjectId
from pydantic import BaseModel, Field
class PyObjectId(ObjectId):
@classmethod
def __get_validators__(cls):
yield cls.validate
@classmethod
def validate(cls, v):
if not ObjectId.is_valid(v):
raise ValueError("Invalid objectid")
return ObjectId(v)
@classmethod
def __modify_schema__(cls, field_schema):
field_schema.update(type="string")
class MongoBaseModel(BaseModel):
# PyObjectId 类的作用是 使得 ObjectId 成为 Pydantic 兼容的类型
id: PyObjectId = Field(default_factory=PyObjectId, alias="_id")
# alias 是一个 pydantic选项,在调用 dict 方法时,会转换为 _id 名,这是MongoDB需要的
class Config:
json_encoders = {ObjectId: str}
# json序列化时,采用的映射方法,ObjectId自己实现了__str__,可以被映射为 str
class PostBase(MongoBaseModel):
title: str
content: str
publication_date: datetime = Field(default_factory=datetime.now)
class PostPartialUpdate(BaseModel):
title: Optional[str] = None
content: Optional[str] = None
class PostCreate(PostBase):
pass
class PostDB(PostBase):
pass
3. 连接数据库
https://docs.mongodb.com/manual/ reference/connection-string/
代码语言:javascript复制# _*_ coding: utf-8 _*_
# @Time : 2022/3/23 14:37
# @Author : Michael
# @File : app.py.py
# @desc :
from typing import List, Tuple
from bson import ObjectId, errors # BSON (Binary JSON) encoding and decoding
from fastapi import Depends, FastAPI, HTTPException, Query, status
from motor.motor_asyncio import AsyncIOMotorClient, AsyncIOMotorDatabase
from web_python_dev.mongo_motor.models import PostDB, PostCreate, PostPartialUpdate
app = FastAPI()
motor_client = AsyncIOMotorClient(
"mongodb://localhost:27017"
)
database = motor_client["cp6_mongodb"] # 单个数据库实例
def get_database() -> AsyncIOMotorDatabase:
return database
4. 插入文档
代码语言:javascript复制async def pagination(skip: int = Query(0, ge=0),
limit: int = Query(10, ge=0)) -> Tuple[int, int]:
capped_limit = min(100, limit)
return (skip, capped_limit)
async def get_object_id(id: str) -> ObjectId:
try:
return ObjectId(id)
except (errors.InvalidId, TypeError):
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
async def get_post_or_404(
id: ObjectId = Depends(get_object_id),
database: AsyncIOMotorDatabase = Depends(get_database)
) -> PostDB:
raw_post = await database['posts'].find_one({"_id": id})
if raw_post is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
return PostDB(**raw_post)
代码语言:javascript复制@app.post("/posts", response_model=PostDB, status_code=status.HTTP_201_CREATED)
async def create_post(
post: PostCreate, database: AsyncIOMotorDatabase = Depends(get_database)
) -> PostDB:
post_db = PostDB(**post.dict())
await database["posts"].insert_one(post_db.dict(by_alias=True))
# by_alias=True 使用 _id 来序列化
post_db = await get_post_or_404(post_db.id, database)
return post_db
测试之前需要 docker 开启服务
代码语言:javascript复制docker run -d --name fastapi-mongo -p 27017:27017 mongo:4.4
5. 查询
代码语言:javascript复制@app.get("/posts")
async def list_posts(
pagination: Tuple[int, int] = Depends(pagination),
database: AsyncIOMotorDatabase = Depends(get_database)
) -> List[PostDB]:
skip, limit = pagination
query = database['posts'].find({}, skip=skip, limit=limit)
# find 第一个参数 是过滤用的,我们要获取所有的,所以留空
result = [PostDB(**raw_post) async for raw_post in query]
# async 列表推导
return result
代码语言:javascript复制@app.get("/posts/{id}", response_model=PostDB)
async def get_post(post: PostDB = Depends(get_post_or_404)) -> PostDB:
return post
6. 更新、删除
代码语言:javascript复制@app.patch("/posts/{id}", response_model=PostDB)
async def update_post(
post_update: PostPartialUpdate,
post: PostDB = Depends(get_post_or_404),
database: AsyncIOMotorDatabase = Depends(get_database),
) -> PostDB:
await database["posts"].update_one(
{"_id": post.id}, {"$set": post_update.dict(exclude_unset=True)}
)
post_db = await get_post_or_404(post.id, database)
return post_db
代码语言:javascript复制@app.delete("/posts/{id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_post(
post: PostDB = Depends(get_post_or_404),
database: AsyncIOMotorDatabase = Depends(get_database),
):
await database["posts"].delete_one({"_id": post.id})
7. 嵌套文档
如果我们想将 post 和 comment 一起存储
在 models.py 中添加
代码语言:javascript复制class CommentBase(BaseModel):
publication_date: datetime = Field(default_factory=datetime.now)
content: str
class CommentCreate(CommentBase):
pass
class CommentDB(CommentBase):
pass
class PostDB(PostBase):
comments: List[CommentDB] = Field(default_factory=list)
现在我们在查询一下看看,发现 comments 也在结果当中
那我们要创建 comments 的内容
代码语言:javascript复制@app.post("/posts/{id}/comments", response_model=PostDB, status_code=status.HTTP_201_CREATED)
async def create_comment(comment:CommentCreate,
post:PostDB=Depends(get_post_or_404),
database:AsyncIOMotorDatabase=Depends(get_database))->PostDB:
await database['posts'].update_one(
{'_id': post.id}, {'$push': {'comments': comment.dict()}}
)
# $push操作。这是向列表属性添加元素的有用运算符
post_db = await get_post_or_404(post.id, database)
return post_db
更多update操作:https://www.mongodb.com/docs/manual/reference/operator/update/