Databases and ORMs: Communicating with MongoDB Using Motor

2022-11-27 17:23:22

Contents

  • 1. Installation
  • 2. Creating models
  • 3. Connecting to the database
  • 4. Inserting documents
  • 5. Querying
  • 6. Updating and deleting
  • 7. Nested documents

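Notes from the book "Building Data Science Applications with FastAPI"

Document-oriented databases such as MongoDB do not require a schema to be configured in advance.

Motor is a library for communicating with MongoDB asynchronously, and it is officially supported by the MongoDB organization.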

1. Installation

pip install motor
Successfully installed motor-2.5.1 pymongo-3.12.3

2. Creating models

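  • MongoDB adds an _id property to every document as its unique identifier, but Pydantic treats names starting with _ as private, so _id cannot be declared directly as a data field
  • _id is a binary object (an ObjectId), which Pydantic does not support out of the box

# _*_ coding: utf-8 _*_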
# @Time : 2022/3/23 14:37
# @Author : Michael
# @File : models.py
# @desc :

from datetime import datetime
from typing import Optional
from bson import ObjectId  # A MongoDB ObjectId
from pydantic import BaseModel, Field


class PyObjectId(ObjectId):
    @classmethod
    def __get_validators__(cls):
        yield cls.validate

    @classmethod
    def validate(cls, v):
        if not ObjectId.is_valid(v):
            raise ValueError("Invalid objectid")
        return ObjectId(v)

    @classmethod
    def __modify_schema__(cls, field_schema):
        field_schema.update(type="string")


class MongoBaseModel(BaseModel):
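    # PyObjectId makes ObjectId a Pydantic-compatible type
    id: PyObjectId = Field(default_factory=PyObjectId, alias="_id")
    # alias is a Pydantic option: the field is populated from the _id key, and
    # dict(by_alias=True) emits it as _id again, which is the name MongoDB expects

    class Config:
        json_encoders = {ObjectId: str}
        # mapping used for JSON serialization; ObjectId implements __str__,
        # so it can be encoded as a str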


class PostBase(MongoBaseModel):
    title: str
    content: str
    publication_date: datetime = Field(default_factory=datetime.now)


class PostPartialUpdate(BaseModel):
    title: Optional[str] = None
    content: Optional[str] = None


class PostCreate(PostBase):
    pass


class PostDB(PostBase):
    pass
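
To see the underscore and alias behaviour in isolation, here is a minimal sketch. It assumes the Pydantic v1-style `.dict()` API used in this article; `NaivePost` and `AliasedPost` are hypothetical names for illustration, and a plain `str` stands in for the ObjectId.

```python
from pydantic import BaseModel, Field


class NaivePost(BaseModel):
    # Underscore-prefixed names are not treated as regular data fields.
    _id: str
    title: str


class AliasedPost(BaseModel):
    # Public attribute name `id`, populated from the `_id` key via the alias.
    id: str = Field(alias="_id")
    title: str


# Shape of a raw document as it might come back from MongoDB (simplified).
raw = {"_id": "abc123", "title": "Hello"}

naive = NaivePost(**raw)
aliased = AliasedPost(**raw)

print(naive.dict())                 # `_id` does not appear in the output
print(aliased.dict())               # keyed by the attribute name `id`
print(aliased.dict(by_alias=True))  # keyed by the alias `_id`
```

The last call is the form that should be handed to MongoDB, because only it uses the `_id` key.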

3. Connecting to the database

Connection string reference: https://docs.mongodb.com/manual/reference/connection-string/

# _*_ coding: utf-8 _*_
# @Time : 2022/3/23 14:37
# @Author : Michael
# @File : app.py
# @desc :

from typing import List, Tuple
from bson import ObjectId, errors  # BSON (Binary JSON) encoding and decoding
from fastapi import Depends, FastAPI, HTTPException, Query, status
from motor.motor_asyncio import AsyncIOMotorClient, AsyncIOMotorDatabase

from web_python_dev.mongo_motor.models import PostDB, PostCreate, PostPartialUpdate

app = FastAPI()
motor_client = AsyncIOMotorClient(
    "mongodb://localhost:27017"
)
database = motor_client["cp6_mongodb"]  # a single shared database instance

def get_database() -> AsyncIOMotorDatabase:
    return database
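
The connection string passed to the client follows the usual URI layout. As a rough illustration of its parts, the sketch below splits one with the standard library. The URI with credentials, default database and options is a hypothetical example (the article only uses "mongodb://localhost:27017"), and `urlsplit` is only a stand-in here: it does not understand multi-host strings, which the driver's own parser handles.

```python
from urllib.parse import parse_qs, urlsplit

# Hypothetical URI for illustration only.
uri = "mongodb://app_user:s3cret@localhost:27017/cp6_mongodb?retryWrites=true"

parts = urlsplit(uri)
summary = {
    "scheme": parts.scheme,
    "username": parts.username,
    "host": parts.hostname,
    "port": parts.port,
    "default_database": parts.path.lstrip("/"),
    "options": parse_qs(parts.query),
}
print(summary)
```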

4. Inserting documents

async def pagination(skip: int = Query(0, ge=0),
                     limit: int = Query(10, ge=0)) -> Tuple[int, int]:
    capped_limit = min(100, limit)
    return (skip, capped_limit)


async def get_object_id(id: str) -> ObjectId:
    try:
        return ObjectId(id)
    except (errors.InvalidId, TypeError):
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)


async def get_post_or_404(
        id: ObjectId = Depends(get_object_id),
        database: AsyncIOMotorDatabase = Depends(get_database)
) -> PostDB:
    raw_post = await database['posts'].find_one({"_id": id})
    if raw_post is None:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
    return PostDB(**raw_post)


@app.post("/posts", response_model=PostDB, status_code=status.HTTP_201_CREATED)
async def create_post(
        post: PostCreate, database: AsyncIOMotorDatabase = Depends(get_database)
) -> PostDB:
    post_db = PostDB(**post.dict())
    await database["posts"].insert_one(post_db.dict(by_alias=True))
    # by_alias=True serializes the id field under the name _id

    post_db = await get_post_or_404(post_db.id, database)

    return post_db
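
The get_object_id dependency above turns a malformed id in the URL into a 404 instead of a server error. As a rough stdlib-only approximation of the string check involved (the real validation is done by bson's ObjectId, and `parse_object_id_hex` is a hypothetical helper), an ObjectId in text form is 24 hexadecimal characters encoding 12 bytes:

```python
import string
from typing import Optional

HEX_DIGITS = set(string.hexdigits)


def parse_object_id_hex(value: str) -> Optional[bytes]:
    """Return the 12 raw bytes for a 24-character hex string, else None."""
    if len(value) != 24 or not set(value) <= HEX_DIGITS:
        return None
    return bytes.fromhex(value)


good = parse_object_id_hex("623ac0b1f2a4c9e1d0b7a123")
bad = parse_object_id_hex("not-an-object-id")
print(good, bad)
```

Anything that fails this kind of check can never match a stored _id, which is why answering with "not found" is a reasonable choice.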

Before testing, start a MongoDB service with Docker

docker run -d --name fastapi-mongo -p 27017:27017 mongo:4.4

5. Querying

@app.get("/posts")
async def list_posts(
        pagination: Tuple[int, int] = Depends(pagination),
        database: AsyncIOMotorDatabase = Depends(get_database)
) -> List[PostDB]:
    skip, limit = pagination
    query = database['posts'].find({}, skip=skip, limit=limit)
    # the first argument of find is the filter; we want every document, so it is left empty
    result = [PostDB(**raw_post) async for raw_post in query]
    # an async list comprehension
    return result


@app.get("/posts/{id}", response_model=PostDB)
async def get_post(post: PostDB = Depends(get_post_or_404)) -> PostDB:
    return post
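
The async list comprehension in list_posts can be tried without a database. The sketch below is a stand-in under stated assumptions: `FAKE_POSTS`, `fake_find` and `list_titles` are hypothetical names, and an async generator plays the role of the cursor that Motor returns from find.

```python
import asyncio
from typing import AsyncIterator, Dict, List

# Hypothetical in-memory data standing in for the "posts" collection.
FAKE_POSTS = [{"_id": i, "title": f"Post {i}"} for i in range(25)]


async def fake_find(skip: int, limit: int) -> AsyncIterator[Dict]:
    """Yield documents one at a time, like iterating an async cursor."""
    for doc in FAKE_POSTS[skip:skip + limit]:
        await asyncio.sleep(0)  # hand control back to the event loop
        yield doc


async def list_titles(skip: int = 0, limit: int = 10) -> List[str]:
    capped_limit = min(100, limit)  # same cap as the pagination dependency
    return [doc["title"] async for doc in fake_find(skip, capped_limit)]


titles = asyncio.run(list_titles(skip=20, limit=10))
print(titles)
```

One difference worth keeping in mind: with this slice a limit of 0 yields nothing, whereas in MongoDB a limit of 0 means "no limit", so the ge=0 bound in the pagination dependency lets a client bypass the cap of 100.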

6. Updating and deleting

@app.patch("/posts/{id}", response_model=PostDB)
async def update_post(
        post_update: PostPartialUpdate,
        post: PostDB = Depends(get_post_or_404),
        database: AsyncIOMotorDatabase = Depends(get_database),
) -> PostDB:
    await database["posts"].update_one(
        {"_id": post.id}, {"$set": post_update.dict(exclude_unset=True)}
    )

    post_db = await get_post_or_404(post.id, database)

    return post_db


@app.delete("/posts/{id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_post(
        post: PostDB = Depends(get_post_or_404),
        database: AsyncIOMotorDatabase = Depends(get_database),
):
    await database["posts"].delete_one({"_id": post.id})
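
The PATCH endpoint relies on exclude_unset=True so that only the fields actually sent by the client end up in $set. A minimal sketch of the difference, assuming the Pydantic v1-style `.dict()` API used in this article (the model is repeated here so the snippet runs on its own):

```python
from typing import Optional

from pydantic import BaseModel


class PostPartialUpdate(BaseModel):
    title: Optional[str] = None
    content: Optional[str] = None


# Only `title` is supplied, as in a PATCH body of {"title": "New title"}.
update = PostPartialUpdate(title="New title")

full = update.dict()
changed = update.dict(exclude_unset=True)

print(full)     # includes content=None, which would overwrite the stored content
print(changed)  # only the field the client actually sent

update_document = {"$set": changed}
```

Without exclude_unset, the unset content field would be written as null. Also note that an empty PATCH body produces an empty $set; as far as I know, MongoDB servers older than 5.0 (which includes the mongo:4.4 image used here) reject that, so a guard for the empty case may be worth adding.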

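7. Nested documents

Suppose we want to store each post together with its comments.

Add the following to models.py, replacing the earlier PostDB definition (List also has to be imported from typing):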

class CommentBase(BaseModel):
    publication_date: datetime = Field(default_factory=datetime.now)
    content: str


class CommentCreate(CommentBase):
    pass


class CommentDB(CommentBase):
    pass

class PostDB(PostBase):
    comments: List[CommentDB] = Field(default_factory=list)

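If we query again, each post in the result now includes a comments field.

Next we need an endpoint that creates comments (CommentCreate also has to be imported from models in app.py):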

@app.post("/posts/{id}/comments", response_model=PostDB, status_code=status.HTTP_201_CREATED)
async def create_comment(
        comment: CommentCreate,
        post: PostDB = Depends(get_post_or_404),
        database: AsyncIOMotorDatabase = Depends(get_database),
) -> PostDB:
    await database['posts'].update_one(
        {'_id': post.id}, {'$push': {'comments': comment.dict()}}
    )
    # $push is the update operator for appending an element to a list field
    post_db = await get_post_or_404(post.id, database)
    return post_db
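
To make the effect of the update operators concrete, here is a toy in-memory model of $set and $push. It is only a sketch of the semantics for top-level fields, not how MongoDB implements them, and `apply_update` is a hypothetical helper.

```python
from copy import deepcopy
from typing import Any, Dict


def apply_update(document: Dict[str, Any], update: Dict[str, Dict[str, Any]]) -> Dict[str, Any]:
    """Toy model of `$set` and `$push` on top-level fields of one document."""
    result = deepcopy(document)
    for field, value in update.get("$set", {}).items():
        result[field] = value  # replace (or create) the field
    for field, value in update.get("$push", {}).items():
        result.setdefault(field, []).append(value)  # append to the list field
    return result


post = {"_id": 1, "title": "Hello", "comments": []}
updated = apply_update(post, {"$push": {"comments": {"content": "Nice post"}}})
updated = apply_update(updated, {"$set": {"title": "Hello again"}})
print(updated)
```

The point of $push is that the comment is appended on the server in a single update, without reading the whole comments list and writing it back.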

More update operators: https://www.mongodb.com/docs/manual/reference/operator/update/
