FastAPI 结合 SQLAlchemy 操作 MySQL 数据库

2022-01-07 11:26:30 浏览数 (1)

文章目录
  • 1. 安装 SQLAlchemy
  • 2. 创建数据库
  • 3. SQLAlchemy 连接 MySQL
  • 4. 创建数据模型
  • 5. 创建 Pydantic 模型
  • 6. crud 工具
  • 7. main函数

learning from 《python web开发从入门到精通》

1. 安装 SQLAlchemy

  • pip install sqlalchemy

2. 创建数据库

  • mysql -u root -p 命令行登录 MySQL

创建数据库 fastapi_db

代码语言:javascript复制
mysql> create database fastapi_db default charset utf8mb4 collate utf8mb4_unicode_ci;
Query OK, 1 row affected (0.04 sec)

3. SQLAlchemy 连接 MySQL

  • database.py
代码语言:javascript复制
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base

# Database connection settings.
# NOTE: credentials are hard-coded for this demo only; load them from the
# environment or a secrets store in any real deployment.
SQLALCHEMY_DATABASE_URI = (
    # Layout: dialect+driver://user:password@host/database?params
    # The "+" between dialect and driver is required by SQLAlchemy's URL format.
    "mysql+pymysql://root:123456@localhost/fastapi_db?charset=utf8mb4"
)

# Create the database engine from the connection URI
engine = create_engine(SQLALCHEMY_DATABASE_URI)
# Session factory bound to the engine, with autocommit and autoflush disabled
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
# Declarative base class for the ORM models
Base = declarative_base()

4. 创建数据模型

  • models.py
代码语言:javascript复制
from sqlalchemy import Boolean, Column, ForeignKey, Integer, String
from sqlalchemy.orm import relationship
from .database import Base


# ORM model for a user account
class User(Base):
    """User record stored in the ``users`` table."""

    __tablename__ = 'users'  # table name
    id = Column(Integer, primary_key=True, index=True)
    # Email must be unique across users and is indexed
    email = Column(String(255), unique=True, index=True)
    hashed_password = Column(String(255))
    # New users are active unless stated otherwise
    is_active = Column(Boolean, default=True)
    items = relationship("Item", back_populates="owner")
    # Relationship to Item; paired with Item.owner via back_populates


# ORM model for an item that belongs to a user
class Item(Base):
    """Item record stored in the ``items`` table."""

    __tablename__ = "items"
    id = Column(Integer, primary_key=True, index=True)
    title = Column(String(255), index=True)
    description = Column(String(255), index=True)
    # Foreign key referencing users.id
    owner_id = Column(Integer, ForeignKey('users.id'))
    owner = relationship("User", back_populates="items")
    # Relationship to User; paired with User.items via back_populates

relationship 用于声明两个模型之间的关联:User.items 与 Item.owner 通过 back_populates 互相引用。其具体用法还不太懂,有待进一步学习 SQLAlchemy。

5. 创建 Pydantic 模型

  • schemas.py
代码语言:javascript复制
from typing import List, Optional

from pydantic import BaseModel


# Fields shared by every item schema: a required title and an optional
# description. (Plain comments are used instead of a class docstring so the
# generated JSON schema / OpenAPI description stays unchanged.)
class ItemBase(BaseModel):
    title: str
    # Declared Optional so the annotation agrees with the None default and an
    # explicit null value passes validation.
    description: Optional[str] = None


# Payload accepted when creating an item; declares no fields beyond ItemBase.
class ItemCreate(ItemBase):
    pass


# Item as returned by the API: the ItemBase fields plus `id` and `owner_id`.
class Item(ItemBase):
    id: int
    owner_id: int

    class Config:
        # pydantic v1 setting: lets the schema be populated from object
        # attributes (such as ORM instances) as well as from dicts.
        orm_mode = True


# Fields shared by every user schema.
class UserBase(BaseModel):
    email: str


# Payload accepted when creating a user: the email plus a password.
class UserCreate(UserBase):
    password: str


# User as returned by the API; this schema declares no password field.
class User(UserBase):
    id: int
    is_active: bool
    items: List[Item] = []  # defaults to an empty list when not provided

    class Config:
        # pydantic v1 setting: lets the schema be populated from object
        # attributes (such as ORM instances) as well as from dicts.
        orm_mode = True

6. crud 工具

  • crud.py
代码语言:javascript复制
from sqlalchemy.orm import Session

from . import models, schemas


def get_user(db: Session, user_id: int):
    """
    Look up a single user by id.
    :param db: database session
    :param user_id: id of the user to fetch
    :return: the first matching user, as returned by ``Query.first()``
    """
    user_query = db.query(models.User)
    by_id = user_query.filter(models.User.id == user_id)
    return by_id.first()


def get_user_by_email(db: Session, email: str):
    """
    Look up a single user by email address.
    :param db: database session
    :param email: email address to search for
    :return: the first matching user, as returned by ``Query.first()``
    """
    user_query = db.query(models.User)
    by_email = user_query.filter(models.User.email == email)
    return by_email.first()


def get_users(db: Session, skip: int = 0, limit: int = 100):
    """
    Fetch one page of users.
    :param db: database session
    :param skip: number of rows to skip before the page starts
    :param limit: maximum number of rows to return
    :return: list of users
    """
    page = db.query(models.User).offset(skip).limit(limit)
    return page.all()


def create_user(db: Session, user: schemas.UserCreate):
    """
    Create a user from the submitted email and password.
    :param db: database session
    :param user: user-creation payload carrying ``email`` and ``password``
    :return: the newly stored user, refreshed from the database
    """
    # NOTE: placeholder only -- this is NOT real hashing. Swap in a proper
    # password-hashing function before storing real credentials.
    fake_hashed_password = user.password + "notreallyhashed"
    db_user = models.User(email=user.email, hashed_password=fake_hashed_password)
    db.add(db_user)      # stage the new row in the session
    db.commit()          # write it to the database
    db.refresh(db_user)  # reload the instance from the database after the commit
    return db_user


def get_items(db: Session, skip: int = 0, limit: int = 100):
    """
    Fetch one page of items.
    :param db: database session
    :param skip: number of rows to skip before the page starts
    :param limit: maximum number of rows to return
    :return: list of items
    """
    page = db.query(models.Item).offset(skip).limit(limit)
    return page.all()


def create_user_item(db: Session, item: schemas.ItemCreate, user_id: int):
    """
    Store a new item owned by the given user.
    :param db: database session
    :param item: item-creation payload
    :param user_id: id of the owning user
    :return: the newly stored Item model instance
    """
    item_fields = item.dict()
    new_item = models.Item(**item_fields, owner_id=user_id)
    # Stage, persist, then reload the row before handing it back
    db.add(new_item)
    db.commit()
    db.refresh(new_item)
    return new_item

7. main函数

代码语言:javascript复制
from typing import List

from fastapi import Depends, FastAPI, HTTPException
from sqlalchemy.orm import Session

from . import crud, models, schemas
from .database import SessionLocal, engine

# Create the tables for the declared models on the engine; this runs when the
# module is imported.
models.Base.metadata.create_all(bind=engine)

# FastAPI application object on which the routes below are registered
app = FastAPI()


# Dependency: provides a database session to a route and closes it afterwards
def get_db():
    # Open the session BEFORE entering ``try``: if SessionLocal() itself
    # raises there is nothing to close, and ``finally`` must not reference an
    # unbound ``db`` (which would mask the real error with UnboundLocalError).
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()


@app.post("/users/", response_model=schemas.User)
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
    # Check whether an account with this email already exists
    existing = crud.get_user_by_email(db, email=user.email)
    if not existing:
        # Email is free: create the user and return the stored object
        return crud.create_user(db=db, user=user)
    # Otherwise report that the email is already taken
    raise HTTPException(status_code=400, detail="Email already registered")


@app.get("/users/", response_model=List[schemas.User])
def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    # Return one page of users: up to `limit` rows starting at offset `skip`
    return crud.get_users(db, skip=skip, limit=limit)


@app.get("/users/{user_id}", response_model=schemas.User)
def read_user(user_id: int, db: Session = Depends(get_db)):
    # Fetch the user with the requested id
    found = crud.get_user(db, user_id=user_id)
    if found is not None:
        return found
    # No such user: answer with 404
    raise HTTPException(status_code=404, detail="User not found")


@app.post("/users/{user_id}/items/", response_model=schemas.Item)
def create_item_for_user(
    user_id: int, item: schemas.ItemCreate, db: Session = Depends(get_db)
):
    # Reject unknown owners up front with the same 404 that read_user returns,
    # instead of letting the insert fail on the items.owner_id foreign key.
    if crud.get_user(db, user_id=user_id) is None:
        raise HTTPException(status_code=404, detail="User not found")
    # Create the item owned by this user and return the stored object
    return crud.create_user_item(db=db, item=item, user_id=user_id)


@app.get("/items/", response_model=List[schemas.Item])
def read_items(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    # Return one page of items: up to `limit` rows starting at offset `skip`
    return crud.get_items(db, skip=skip, limit=limit)
代码语言:javascript复制
(pt19) D:\web_python_dev>uvicorn fastapi_mysql.main:app --reload
INFO:     Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)
INFO:     Started reloader process [6988] using watchgod
INFO:     Started server process [2112]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
代码语言:javascript复制
mysql> use fastapi_db
Database changed
mysql> show tables;
+----------------------+
| Tables_in_fastapi_db |
+----------------------+
| items                |
| users                |
+----------------------+
2 rows in set (0.00 sec)

在网页里发送一个 post 请求后,查询 sql

代码语言:javascript复制
mysql> select * from users;
+----+----------------+---------------------+-----------+
| id | email          | hashed_password     | is_active |
+----+----------------+---------------------+-----------+
|  1 | michael@xx.com | abcdnotreallyhashed |         1 |
+----+----------------+---------------------+-----------+
1 row in set (0.00 sec)

0 人点赞