Python 使用SQLAlchemy数据库模块

2023-11-24 10:42:10 浏览数 (1)

SQLAlchemy 是用Python编程语言开发的一个开源项目,它提供了SQL工具包和ORM对象关系映射工具,使用MIT许可证发行,SQLAlchemy 提供高效和高性能的数据库访问,实现了完整的企业级持久模型。

ORM(对象关系映射)是一种编程模式,用于将对象与关系型数据库中的表和记录进行映射,从而实现通过面向对象的方式进行数据库操作。ORM 的目标是在编程语言中使用类似于面向对象编程的语法,而不是使用传统的 SQL 查询语言,来操作数据库。

主要思想是将数据库表的结构映射到程序中的对象,通过对对象的操作来实现对数据库的操作,而不是直接编写 SQL 查询。ORM 工具负责将数据库记录转换为程序中的对象,反之亦然。

ORM 的核心概念包括:

  1. 实体(Entity): 在 ORM 中,实体是指映射到数据库表的对象。每个实体对应数据库中的一条记录。
  2. 属性(Attribute): 实体中的属性对应数据库表中的列。每个属性表示一个字段。
  3. 关系(Relationship): ORM 允许定义实体之间的关系,例如一对多、多对一、多对多等。这种关系会映射到数据库表之间的关系。
  4. 映射(Mapping): ORM 负责将实体的属性和方法映射到数据库表的列和操作。
  5. 会话(Session): ORM 提供了会话来管理对象的生命周期,包括对象的创建、更新和删除。
  6. 查询语言: ORM 通常提供一种查询语言,允许开发者使用面向对象的方式编写查询,而不是直接使用 SQL。

对象映射ROM模型可连接任何关系数据库,连接方法大同小异,以下总结了如何连接常用的几种数据库方式。

代码语言:javascript复制
# sqlite 创建数据库连接
engine = create_engine('sqlite:///database.db', echo=False)

# sqlite 创建内存数据库
engine = create_engine('sqlite://')
engine = create_engine('sqlite:///:memory:', echo=True)

# PostgreSQL 创建数据库连接
engine = create_engine('postgresql://scott:tiger@localhost/mydatabase')                # default
engine = create_engine('postgresql psycopg2://scott:tiger@localhost/mydatabase')       # psycopg2
engine = create_engine('postgresql pg8000://scott:tiger@localhost/mydatabase')         # pg8000

# MySQL 创建数据库连接
engine = create_engine('mysql://scott:tiger@localhost/foo')                  # default
engine = create_engine('mysql mysqldb://scott:tiger@localhost/foo')          # mysql-python
engine = create_engine('mysql mysqlconnector://scott:tiger@localhost/foo')   # MySQL-connector-python
engine = create_engine('mysql oursql://scott:tiger@localhost/foo')           # OurSQL

# Oracle 创建数据库连接
engine = create_engine('oracle://scott:tiger@127.0.0.1:1521/sidname')
engine = create_engine('oracle cx_oracle://scott:tiger@tnsname')

# MSSQL 创建数据库连接
engine = create_engine('mssql pyodbc://scott:tiger@mydsn')                   # pyodbc
engine = create_engine('mssql pymssql://scott:tiger@hostname:port/dbname')   # pymssql

数据表创建

简单的创建一个User映射类,映射到UserDB库上,分别增加几个常用的数据库字段,并插入一些测试数据。

代码语言:javascript复制
import sqlite3,datetime,time
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Float, DateTime, Date, Time, Boolean, DECIMAL, Enum, Text

# 建立基本映射类
Base = declarative_base()

# 创建SQLITE数据库
engine = create_engine('sqlite:///:memory:', echo=False)

# 创建映射类User
class User(Base):
    __tablename__ = 'UserDB'

    # 主键 primary_key | 自动增长 autoincrement | 不为空 nullable | 唯一性约束 unique
    id = Column(Integer, primary_key=True, autoincrement=True, nullable=True, unique=True)

    # 字符串类型
    username = Column(String(32), nullable=True, default="none")
    password = Column(String(32), nullable=True, default="none")

    # 姓名字段默认值是0
    age = Column(Integer,nullable=False, default=0)

    # 增加创建日期 [日期:时间]
    create_time = Column(DateTime, default=datetime.datetime.now)

    # onupdate=datetime.now 每次更新数据的时候都要更新该字段值
    update_time = Column(DateTime, onupdate=datetime.datetime.now, default=datetime.datetime.now)

    # 增加用户分数
    user_value = Column(Float, default=0.0)

    # 枚举类型定义
    # tag = Column(Enum("python",'flask','django'))

    # __repr__方法用于输出该类的对象被print()时输出的字符串,如果不想写可以不写
    def __repr__(self):
        return "<UserDB(username='%s',password='%s')>" % (self.username,self.password)

if __name__ == "__main__":
    print("当前表名: {}".format(User.__table__))

    # 创建会话
    Session = sessionmaker(bind=engine)
    session = Session()

    # 创建数据表
    Base.metadata.create_all(engine, checkfirst=True)

    # 逐条增加新记录
    insert_user = User(username='lyshark', password='123456', age=24, user_value=12.5)
    session.add(insert_user)

    insert_user = User(username='sqlalchemy', password='123', age=34, user_value=45.8)
    session.add(insert_user)

    # 插入多条记录
    session.add_all(
        [
         User(username="admin", password="123123", age=54, user_value=66.9),
         User(username="root", password="3456576", age=67, user_value=98.4)
        ]
    )

    # 提交事务
    session.commit()

数据库查询

演示了通过ORM关系映射实现对单表的简单查询与筛选过滤功能。

代码语言:javascript复制
import sqlite3,time,datetime
from sqlalchemy import func
from sqlalchemy import or_
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Float,DateTime

# 建立基本映射类
Base = declarative_base()

# 创建SQLITE数据库
engine = create_engine('sqlite:///:memory:', echo=False)

# 创建映射类User
class User(Base):
    __tablename__ = 'UserDB'
    id = Column(Integer, primary_key=True, autoincrement=True, nullable=True, unique=True)
    username = Column(String(32), nullable=True, default="none")
    password = Column(String(32), nullable=True, default="none")
    age = Column(Integer,nullable=False, default=0)
    create_time = Column(DateTime, default=datetime.datetime.now)
    update_time = Column(DateTime, onupdate=datetime.datetime.now, default=datetime.datetime.now)
    user_value = Column(Float, default=0.0)

    # __repr__方法用于输出该类的对象被print()时输出的字符串,如果不想写可以不写
    def __repr__(self):
        return "<UserDB(username='%s',password='%s')>" % (self.username,self.password)

if __name__ == "__main__":
    # 创建会话
    Session = sessionmaker(bind=engine)
    session = Session()

    # 创建数据表(存在则跳过)
    Base.metadata.create_all(engine, checkfirst=True)

    # 查询所有字段
    all_value = session.query(User).all()
    for item in all_value:
        print("ID: {} --> 用户: {}".format(item.id, item.username))

    # 查询指定字段
    key_value = session.query(User.username,User.password).all()
    print(key_value)

    # 查询第一条
    first_value = session.query(User).first()
    print("第一条记录: {} {}".format(first_value.username, first_value.password))

    # 使用过滤器 [ 过滤出age>20的用户,输出其(id,username)字段 ]
    filter_value = session.query(User.id,User.username).filter(User.age > 20).all()
    print("过滤结果: {}".format(filter_value))

    # 排序输出 [ 正序/倒序 ]
    sort_value = session.query(User.username,User.age).order_by(User.age).all()
    print("正序排列: {}".format(sort_value))

    sort_value = session.query(User.username,User.age).order_by(User.age.desc()).all()
    print("倒序排列: {}".format(sort_value))

    # 查询计数
    count_value = session.query(User).count()
    print("记录条数: {}".format(count_value))

    # and/or 条件过滤 默认为and 在filter()中用,分隔多个条件表示,如果是or则需增加or_连接多个条件
    and_value = session.query(User.username,User.age).filter(User.age >= 20, User.age <= 40).all()
    print("与查询: {}".format(and_value))

    or_value = session.query(User.username,User.age).filter(or_(User.age >= 20, User.age <= 40)).all()
    print("或查询: {}".format(or_value))

    # 等于查询
    equal_value = session.query(User.username,User.password).filter(User.age == 67).all()
    print("等于查询: {}".format(equal_value))

    not_equal_value = session.query(User.username,User.password).filter(User.age != 67).all()
    print("不等于查询: {}".format(not_equal_value))

    # like模糊匹配
    like_value = session.query(User.username,User.create_time).filter(User.username.like("%ly%")).all()
    print("模糊匹配: {}".format(like_value))

    # in查询范围
    in_value = session.query(User.username,User.password).filter(User.age.in_([24,34])).all()
    print("查询两者: {}".format(in_value))

    not_in_value = session.query(User.username,User.password).filter(User.age.notin_([24,34])).all()
    print("查询非两者: {}".format(not_in_value))

    # op正则匹配查询
    op_value = session.query(User.username).filter(User.username.op("regexp")("^a")).all()
    print("正则匹配: {}".format(op_value))

    # 调用数据库内置函数
    func_value = session.query(func.count(User.age)).one()
    print("调用函数: {}".format(func_value))

    # 数据切片
    cat_value = session.query(User.username).all()[:2]
    print("输出前两条: {}".format(cat_value))

    cat_value = session.query(User.username).offset(5).limit(3).all()
    print("第6行开始显示前3个: {}".format(cat_value))

    cat_value = session.query(User.username).order_by(User.id.desc())[0:10]
    print("输出最后10个: {}".format(cat_value))

    # 非空查询
    isnot_value = session.query(User).filter(User.username.isnot(None)).all()
    print("非空显示: {}".format(isnot_value))

    null_value = session.query(User).filter(User.username.is_(None)).all()
    print("为空显示: {}".format(null_value))

    # 分组测试
    group_by = session.query(User.username,func.count(User.id)).group_by(User.age).all()
    print("根据年龄分组: {}".format(group_by))

    # 进一步过滤查询
    having_by = session.query(User.username,User.age).group_by(User.age).having(User.age > 30).all()
    print("以age分组,并查询age大于30的记录: {}".format(having_by))

数据库修改

演示了修改数据库参数以及对数据库指定记录的删除功能。

代码语言:javascript复制
import sqlite3,time,datetime
from sqlalchemy import func
from sqlalchemy import or_
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Float,DateTime

# 建立基本映射类
Base = declarative_base()

# 创建SQLITE数据库
engine = create_engine('sqlite:///:memory:', echo=False)

# 创建映射类User
class User(Base):
    __tablename__ = 'UserDB'
    id = Column(Integer, primary_key=True, autoincrement=True, nullable=True, unique=True)
    username = Column(String(32), nullable=True, default="none")
    password = Column(String(32), nullable=True, default="none")
    age = Column(Integer,nullable=False, default=0)
    create_time = Column(DateTime, default=datetime.datetime.now)
    update_time = Column(DateTime, onupdate=datetime.datetime.now, default=datetime.datetime.now)
    user_value = Column(Float, default=0.0)

    # __repr__方法用于输出该类的对象被print()时输出的字符串,如果不想写可以不写
    def __repr__(self):
        return "<UserDB(username='%s',password='%s')>" % (self.username,self.password)

if __name__ == "__main__":
    # 创建会话
    Session = sessionmaker(bind=engine)
    session = Session()

    # 创建数据表(存在则跳过)
    Base.metadata.create_all(engine, checkfirst=True)

    # 修改数据: 先查询在修改
    select_update = session.query(User).filter_by(username="lyshark").first()
    select_update.password = "test1234"
    session.commit()

    # 修改数据: 直接修改
    session.query(User).filter_by(username="lyshark").update({User.password: 'abcd'})
    session.commit()

    session.query(User).filter_by(username="lyshark").update({"password": '123456'})
    session.commit()

    # 删除数据: 先查询在删除
    del_ptr = session.query(User).filter_by(username="lyshark").first()
    session.delete(del_ptr)
    session.commit()

    # 删除数据: 直接删除
    session.query(User).filter(User.username=="sqlalchemy").delete()
    session.commit()

数据库查询转字典

将从数据库中过滤查询指定的记录,并将该记录转换为字典JSON格式,利于解析。

代码语言:javascript复制
import sqlite3,time,datetime,json
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Float,DateTime

# 建立基本映射类
Base = declarative_base()

# 创建SQLITE数据库
engine = create_engine('sqlite:///:memory:', echo=False)

# 创建映射类User
class User(Base):
    __tablename__ = 'UserDB'
    id = Column(Integer, primary_key=True, autoincrement=True, nullable=True, unique=True)
    username = Column(String(32), nullable=True, default="none")
    password = Column(String(32), nullable=True, default="none")
    age = Column(Integer,nullable=False, default=0)
    create_time = Column(DateTime, default=datetime.datetime.now)
    update_time = Column(DateTime, onupdate=datetime.datetime.now, default=datetime.datetime.now)
    user_value = Column(Float, default=0.0)

    # 查询结果转字典 (保留数据类型)
    def single_to_dict(self):
        return {c.name: getattr(self, c.name) for c in self.__table__.columns}

    # 查询结果转字典 (全转为字符串)
    def dobule_to_dict(self):
        result = {}
        for key in self.__mapper__.c.keys():
            if getattr(self, key) is not None:
                result[key] = str(getattr(self, key))
            else:
                result[key] = getattr(self, key)
        return result

# 将查询结果转为JSON
def to_json(all_vendors):
    v = [ ven.dobule_to_dict() for ven in all_vendors ]
    return v

if __name__ == "__main__":
    # 创建会话
    Session = sessionmaker(bind=engine)
    session = Session()

    # 创建数据表(存在则跳过)
    Base.metadata.create_all(engine, checkfirst=True)

    # 查询结果转为字典(保持数据库格式)
    key_value = session.query(User).first()
    data = key_value.single_to_dict()
    print("转为字典: {}".format(data))

    # 查询结果转为字典(字符串格式)
    key_value = session.query(User).first()
    data = key_value.dobule_to_dict()
    print("转为字符串字典: {}".format(data))

    # 查询结果转为JSON格式
    key_value = session.query(User)
    data = to_json(key_value)
    print("转为JSON格式: {}".format(data))

数据库类内函数调用

用户在使用ORM模型定义类时,可以同时在该映射类中定义各种针对类模型的处理函数,实现对数据的动态处理

代码语言:javascript复制
from werkzeug.security import generate_password_hash,check_password_hash
import sqlite3,datetime,time
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Float, DateTime, Date, Time, Boolean, DECIMAL, Enum, Text

# 创建SQLITE数据库
engine = create_engine("sqlite:///:memory:", encoding='utf-8')
Base = declarative_base()                                       # 生成orm基类

# 创建会话
Session_class = sessionmaker(bind=engine)                       # 创建与数据库的会话session
session = Session_class()                                       # 生成session实例

# 创建映射类User
class User(Base):
    __tablename__ = 'UserDB'
    id = Column(Integer, primary_key=True, autoincrement=True, nullable=True, unique=True)
    username = Column(String(64))
    _password_hash_ = Column(String(256))                       # 加下划线作为私有函数,无法被外部访问
    email = Column(String(64))

    # 设置一个password字段用来设置密码
    @property
    def password(self):
        raise Exception("密码不能被读取")

    # 赋值password字段时,则自动加密存储
    @password.setter
    def password(self, value):
        self._password_hash_ = generate_password_hash(value)

    # 使用 check_password,进行密码校验 返回True False。
    def check_password(self, pasword):
        return check_password_hash(self._password_hash_, pasword)

    # 设置输出函数
    def print_function(self):
        # 密码不可读调用 self.password 会报错
        # print("用户: {} 密码: {}".format(self.username,self.password))

        print("用户: {} email: {}".format(self.username, self.email))
        return True

if __name__ == "__main__":
    print("当前表名: {}".format(User.__table__))

    # 创建数据表
    Base.metadata.create_all(engine, checkfirst=True)

    # 插入测试数据
    insert = User(username="lyshark",password="123123",email="lyshark@163.com")
    session.add(insert)

    insert = User(username="admin",password="556677",email="lyshark@163.com")
    session.add(insert)
    session.commit()

    # 查询测试
    tag = session.query(User).filter_by(username="lyshark").first()
    print("测试密码是否正确: {}".format(tag.check_password("123123")))

    # 调用函数验证当前用户
    tag = session.query(User).filter_by(username="admin").first()
    func = tag.print_function()
    print("输出测试: {}".format(func))

数据库聚合函数

通过func库调用数据库内的聚合函数,实现统计最大最小平均数等数据。

代码语言:javascript复制
import sqlite3,datetime,time
from sqlalchemy import func
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Float, DateTime, Date, Time, Boolean, DECIMAL, Enum, Text

# 建立基本映射类
Base = declarative_base()

# 创建SQLITE数据库
engine = create_engine('sqlite:///:memory:', echo=False)

# 创建映射类User
class User(Base):
    __tablename__ = 'UserDB'

    # 主键 primary_key | 自动增长 autoincrement | 不为空 nullable | 唯一性约束 unique
    id = Column(Integer, primary_key=True, autoincrement=True, nullable=True, unique=True)

    # 字符串类型
    username = Column(String(32), nullable=True, default="none")
    password = Column(String(32), nullable=True, default="none")

    # 姓名字段默认值是0
    age = Column(Integer,nullable=False, default=0)

    # 增加创建日期 [日期:时间]
    create_time = Column(DateTime, default=datetime.datetime.now)

    # onupdate=datetime.now 每次更新数据的时候都要更新该字段值
    update_time = Column(DateTime, onupdate=datetime.datetime.now, default=datetime.datetime.now)

    # 增加用户分数
    user_value = Column(Float, default=0.0)

    # 枚举类型定义
    # tag = Column(Enum("python",'flask','django'))

    # __repr__方法用于输出该类的对象被print()时输出的字符串,如果不想写可以不写
    def __repr__(self):
        return "<UserDB(username='%s',password='%s')>" % (self.username,self.password)

if __name__ == "__main__":
    print("当前表名: {}".format(User.__table__))

    # 创建会话
    Session = sessionmaker(bind=engine)
    session = Session()

    # 创建数据表
    Base.metadata.create_all(engine, checkfirst=True)

    # 统计总数
    count = session.query(func.count(User.id)).first()
    print("总记录: {}".format(count))

    # age 字段平均值
    age_avg = session.query(func.avg(User.age)).first()
    print("平均值: {}".format(age_avg))

    # age 字段最大值
    age_max = session.query(func.max(User.age)).first()
    print("最大值: {}".format(age_max))

    # age 最小值
    age_min = session.query(func.min(User.age)).one()
    print("最小值: {}".format(age_min))

    # age 求和 只求前三个的和
    age_sum = session.query(func.sum(User.age)).one()
    print("求总数: {}".format(age_sum))

    # 提交事务
    session.commit()

ORM定义一对多关系

SQLAlchemy提供了一个relationship,这个类可以定义属性,以后在访问相关联的表的时候就直接可以通过属性访问的方式就可以访问得到。

代码语言:javascript复制
from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm import relationship
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import create_engine, Column, Integer, String, Text, func, ForeignKey

# 打开数据库
Base = declarative_base()
engine = create_engine('sqlite:///:memory:', echo=False)

# 主表
class Author(Base):
    __tablename__ = 'author'
    id = Column(Integer, primary_key=True, autoincrement=True)
    name = Column(String(10), nullable=False)

    # 定义外键关联到Book模型上面,主表是author
    books = relationship('Book', backref='author')

    def __repr__(self):
        return '<Author:(id={}, name={})>'.format(self.id, self.name)

# 从表
class Book(Base):
    __tablename__ = 'book'
    id = Column(Integer, primary_key=True, autoincrement=True)
    name = Column(String(20), nullable=False)

    # 外键关联到主表author的id字段上
    author_id = Column(Integer, ForeignKey('author.id',ondelete="RESTRICT"))

    def __repr__(self):
        return '<Book:(id={}, name={}, author_id={})>'.format(self.id, self.name, self.author_id)

if __name__ == "__main__":
    # 创建会话
    Session = sessionmaker(bind=engine)
    session = Session()

    # 创建数据表(存在则跳过)
    Base.metadata.create_all(engine, checkfirst=True)

    # ------------------------------------------------------
    # 创建数据并插入
    author1 = Author(id=1, name="张三")
    author2 = Author(id=2, name="李四")

    book1 = Book(name="<Python 开发>", author_id=1)
    book2 = Book(name="<C   开发教程>", author_id=1)
    book3 = Book(name="<C# 从入门到精通>", author_id=1)

    book4 = Book(name="<渗透测试指南>", author_id=2)
    book5 = Book(name="<nmap 扫描工具指南>", author_id=2)

    session.add_all([author1,book1,book2,book3])
    session.add_all([author2,book4,book5])
    session.commit()

    # ------------------------------------------------------
    # 关联插入模式

    author1 = Author(id=3, name="王五")

    book1 = Book(name="<Python 开发>", author_id=1)
    book2 = Book(name="<C   开发教程>", author_id=1)
    book3 = Book(name="<C# 从入门到精通>", author_id=1)

    author1.books.append(book1)
    author1.books.append(book2)
    author1.books.append(book3)

    session.add(author1)
    session.commit()

    # ------------------------------------------------------
    # 一对多查询测试
    book = session.query(Book).get(1)
    print("书籍作者: {}".format(book.author.name))

    author = session.query(Author).get(1)
    print("书籍数量: {}".format(len(author.books)))

    for book_name in author.books:
        print("书籍: {}".format(book_name.name))

ORM定义一对一关系

如果想要将两个模型映射成一对一的关系,那么应该在父模型中,指定引用的时候,要传递一个uselist=False参数进去。就是告诉父模型,以后引用这个从模型的时候,不再是一个列表了,而是一个对象了。

代码语言:javascript复制
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker,relationship,backref
from sqlalchemy import create_engine, Column, Integer, String, Text, func, ForeignKey

# 打开数据库
Base = declarative_base()
engine = create_engine('sqlite:///:memory:', echo=False)

# 主表
class User(Base):
    __tablename__ = 'user'
    id = Column(Integer, primary_key=True, autoincrement=True)
    username = Column(String(50), nullable=False)

    def __repr__(self):
        return 'User(username:%s)' % self.username

# 从表
class UserExtend(Base):
    __tablename__ = 'user_extend'
    id = Column(Integer,primary_key=True,autoincrement=True)
    school = Column(String(50))
    age = Column(String(32))
    sex = Column(String(32))

    uid = Column(Integer,ForeignKey('user.id'))
    user = relationship('User',backref=backref('extend', uselist=False))

    #uselist=False 告诉父模型 以后引用时不再是列表 而是对象
    def __repr__(self):
        return 'extend(school:%s)'%self.school

if __name__ == "__main__":
    # 创建会话
    Session = sessionmaker(bind=engine)
    session = Session()

    # 创建数据表(存在则跳过)
    Base.metadata.create_all(engine, checkfirst=True)

    # ------------------------------------------------------
    # 插入测试数据
    user = User(username="lyshark")
    extend = UserExtend(school="<家里蹲大学>", age="22", sex="M")
    extend.user = user

    session.add(extend)
    session.commit()

    # ------------------------------------------------------
    # 一对一关系测试
    user_ptr = session.query(User).first()
    print("用户名: {} --> 学校: {}".format(user_ptr.username,user_ptr.extend.school))

    extend_ptr = session.query(UserExtend).first()
    print("用户名: {} --> 学校: {} --> 年龄: {} --> 性别: {}".format(extend_ptr.user.username,extend_ptr.school,extend_ptr.age,extend_ptr.sex))

ORM定义多对多关系

多对多与上面的一对多,一对一不同,创建多对对必须使用中间表Table来解决查询问题。

  • 多对多的关系需要通过一张中间表来绑定他们之间的关系。
  • 先把两个需要做多对多的模型定义出来
  • 使用Table定义一个中间表,中间表一般就是包含两个模型的外键字段就可以了,并且让他们两个来作为一个“复合主键”。
  • 在两个需要做多对多的模型中随便选择一个模型,定义一个relationship属性,来绑定三者之间的关系,在使用relationship的时候,需要传入一个secondary=中间表。
代码语言:javascript复制
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker,relationship,backref
from sqlalchemy import create_engine, Column, Integer, String, Text, func, ForeignKey, Table
from sqlalchemy import Column,INT,VARCHAR,ForeignKey
from sqlalchemy.orm import relationship

# 打开数据库
Base = declarative_base()
engine = create_engine('sqlite:///:memory:', echo=False)

# 女孩表
class Girls(Base):
    __tablename__ = "girl"
    id = Column(Integer,primary_key=True, autoincrement=True)
    name = Column(String(32))

    # 建立多对多关系
    g2b = relationship("Boys",backref="b2g",secondary="hotel")

# 男孩表
class Boys(Base):
    __tablename__ = "boy"
    id = Column(Integer,primary_key=True, autoincrement=True)
    name = Column(String(32))

# 映射关系表
class Table(Base):
    __tablename__ = "hotel"
    id = Column(Integer, primary_key=True, autoincrement=True)
    boy_id = Column(Integer,ForeignKey("boy.id"))
    girl_id = Column(Integer,ForeignKey("girl.id"))

if __name__ == "__main__":
    # 创建会话
    Session = sessionmaker(bind=engine)
    session = Session()

    # 创建数据表(存在则跳过)
    Base.metadata.create_all(engine, checkfirst=True)

    # ------------------------------------------------------
    # 增加数据 - relationship 正向
    girl_obj = Girls(name="女孩主")
    girl_obj.g2b = [Boys(name="男孩从1"),Boys(name="男孩从2")]
    session.add(girl_obj)
    session.commit()

    # 增加数据 - relationship 反向
    boy_obj = Boys(name="男孩主")
    boy_obj.b2g = [Girls(name="女孩从1"),Girls(name="女孩从2")]
    session.add(boy_obj)
    session.commit()

    # ------------------------------------------------------
    # 正向查询
    girl_obj_list = session.query(Girls).all()
    for girl_obj in girl_obj_list:
        for boy in girl_obj.g2b:
            print(girl_obj.name,boy.name)

    # 反向查询
    boy_obj_list = session.query(Boys).all()
    for boy in boy_obj_list:
        for girl in boy.b2g:
            print(girl.name,boy.name)

连接查询与子查询

连接查询通过JOIN语句实现,子查询则通过subquery实现,首先需要创建一对多关系然后才可使用子查询。

代码语言:javascript复制
from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm import relationship
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import create_engine, Column, Integer, String, Text, func, ForeignKey

# 打开数据库
Base = declarative_base()
engine = create_engine('sqlite:///:memory:', echo=False)

# 主表
class Author(Base):
    __tablename__ = 'author'
    id = Column(Integer, primary_key=True, autoincrement=True)
    name = Column(String(10), nullable=False)

    # 定义外键关联到Book模型上面,主表是author
    books = relationship('Book', backref='author')

    def __repr__(self):
        return '<Author:(id={}, name={})>'.format(self.id, self.name)

# 从表
class Book(Base):
    __tablename__ = 'book'
    id = Column(Integer, primary_key=True, autoincrement=True)
    name = Column(String(20), nullable=False)

    # 外键关联到主表author的id字段上
    author_id = Column(Integer, ForeignKey('author.id',ondelete="RESTRICT"))

    def __repr__(self):
        return '<Book:(id={}, name={}, author_id={})>'.format(self.id, self.name, self.author_id)

if __name__ == "__main__":
    # 创建会话
    Session = sessionmaker(bind=engine)
    session = Session()

    # 创建数据表(存在则跳过)
    Base.metadata.create_all(engine, checkfirst=True)

    # ------------------------------------------------------
    # 创建数据并插入
    author1 = Author(id=1, name="张三")
    author2 = Author(id=2, name="李四")

    book1 = Book(name="<Python 开发>", author_id=1)
    book2 = Book(name="<C   开发教程>", author_id=1)
    book3 = Book(name="<C# 从入门到精通>", author_id=1)

    book4 = Book(name="<渗透测试指南>", author_id=2)
    book5 = Book(name="<nmap 扫描工具指南>", author_id=2)

    session.add_all([author1,book1,book2,book3])
    session.add_all([author2,book4,book5])
    session.commit()

    # ------------------------------------------------------
    # 连表查询
    no_join_select = session.query(Author).filter(Author.id == Book.id).filter(Author.name == "王五").all()
    print("查询主键==从键 并且 Author.name == 王五的记录: {}".format(no_join_select))

    # JOIN 连接查询
    join = session.query(Author).join(Book).filter(Book.name=="<nmap 扫描工具指南>").first().name
    print("查询主表Author中的Book书名的作者是: {}".format(join))

    join = session.query(Book).join(Author).filter(Author.name=="李四").all()
    for book in join:
        print("查询从表Book中的Author作者有哪些书: {}".format(book.name))

    # subquery 子查询
    sbq = session.query(Book.author_id,func.count('*').label("book_count")).group_by(Book.author_id).subquery()
    print("查询出书籍编号计数(子语句): {}".format(sbq))

    sub_join = session.query(Author.name,sbq.c.book_count).outerjoin(sbq,Author.id == sbq.c.author_id).all()
    print("查询用户有几本书(主语句): {}".format(sub_join))

0 人点赞