Flask 扫盲系列-数据库

2020-05-22 10:40:16 浏览数 (1)

在前面的学习中,我们已经简单搭建了一个在线股票走势查询系统,并且了解了 Flask 中的上下文,那么今天我们一起来学习下 Flask 中的数据库操作。

Flask-SQLAlchemy

说多数据库,相信大家都是再熟悉不过了,无论是什么程序,都需要和各种各样的数据打交道,那么保存这些数据的地方,就是数据库了。Flask 支持多种数据库,同时我们未来方便安全的操作数据库,这里选择使用 Flask-SQLAlchemy 插件来管理数据库的相关操作。

实战登陆

我们直接从实战出发,来实践下它们的用法。

在上一篇我们定义了一个登陆页面,但是对于登陆我们并没有校验,当然也没有保存任何用户信息,现在我们来完善登陆注册功能。

定义表结构

首先我们定义用户表的表结构,为了方便起见,我们使用插件 flask_login 来进行用户鉴权,在 app.py 文件中添加如下代码

代码语言:javascript复制
from flask_sqlalchemy import SQLAlchemy
from werkzeug.security import generate_password_hash, check_password_hash
from flask_login import UserMixin, login_user
import hashlib


db = SQLAlchemy(app)


# 用户表结构
class WebUser(UserMixin, db.Model):
    __tablename__ = 'webuser'
    id = db.Column(db.Integer, primary_key=True)
    user_id = db.Column(db.String(64), unique=True, index=True)
    email = db.Column(db.String(64), unique=True, index=True)
    username = db.Column(db.String(64), unique=True, index=True)
    password_hash = db.Column(db.String(128))
    confirmed = db.Column(db.Boolean, default=False)

    def __init__(self, **kwargs):
        super(WebUser, self).__init__(**kwargs)
        if self.email is not None and self.avatar_hash is None:
            self.avatar_hash = hashlib.md5(
                self.email.lower().encode('utf-8')).hexdigest()

    @staticmethod
    def insert_user():
        users = {
            'user1': ['user1@luobo.com', 'test1', 1],
            'user2': ['user2@luobo.com', 'test2', 1],
            'admin1': ['admin1@luobo.com', 'admin1', 2],
            'admin2': ['admin2@luobo.com', 'admin2', 2]
        }
        for u in users:
            user = WebUser.query.filter_by(username=u[0]).first()
            if user is None:
                user = WebUser(user_id=time.time(), username=u, email=users[u][0],
                               confirmed=True, role_id=users[u][2])
                user.password = users[u][1]
                db.session.add(user)
            db.session.commit()

    @property
    def password(self):
        raise AttributeError('You can not read the password')

    @password.setter
    def password(self, password):
        self.password_hash = generate_password_hash(password)

    def verify_password(self, password):
        if self.password_hash is not None:
            return check_password_hash(self.password_hash, password)

我们定义了用户表的字段,包括 user_id、emali、username 等,对于用户密码的存储,使用 security 工具进行哈希处理后存储。同时还定义了一个静态方法 insert_user 用于初始化用户。

修改视图函数

接下来我们修改 login 视图函数,进行真正的用户验证

代码语言:javascript复制
@app.route('/login/', methods=['GET', 'POST'])
def login():
    form = LoginForm()
    if form.validate_on_submit():
        user = WebUser.query.filter_by(email=form.email.data).first()
        if user is not None and user.verify_password(form.password.data):
            login_user(user)
            flash('欢迎回来!')
            return redirect(request.args.get('next') or url_for('index'))
        flash('用户名或密码不正确!')
    return render_template('login.html', form=form)

数据库设置

下面我们还需要设置数据库连接信息

代码语言:javascript复制
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///'   'myweb.sqlite'
app.config['SQLALCHEMY_COMMIT_ON_TEARDOWN'] = True 

SQLALCHEMY_DATABASE_URI 是数据库的连接地址,我们直接使用轻巧的 sqlite 文件数据库,SQLALCHEMY_COMMIT_ON_TEARDOWN 设置为 True,表示每次请求结束后,都会自动提交数据库的变动。

下面我们在终端进入到 flask shell 中

代码语言:javascript复制
C:WorkcodeFlaskflask_stock>flask shell

然后使用 Flask-SQLAlchemy 提供的函数 create_all() 创建数据库表

代码语言:javascript复制
>>> from app import db
>>> db.create_all()

如果不出意外,此时当前目录下应该会生成一个 myweb.sqlite 文件。

之后我们在通过 WebUser 类的静态方法来插入初始用户

代码语言:javascript复制
>>> from app import WebUser
>>> WebUser.insert_user()

此时如果我们通过数据库连接工具查看 webuser 表的话,会发现数据已经成功插入了。

配置 flask_login 插件

最后为了使用 flask_login 插件,我们还需要通过 LoginManager 对象来初始化 app 实例。LoginManager 对象的 session_protection 属性可以设为 None、'basic' 或 'strong',以提供不同的安全等级,防止用户会话遭篡改。

代码语言:javascript复制
from flask_login import LoginManager

login_manager = LoginManager(app)
login_manager.session_protection = 'strong'

最后,Flask-Login 要求程序实现一个回调函数,使用指定的标识符加载用户。

代码语言:javascript复制
@login_manager.user_loader 
def load_user(user_id):     
    return WebUser.query.get(int(user_id))

现在我们就可以尝试使用已有的用户和密码去登陆系统了,如果不出意外的话,使用正确的用户名和密码才能成功登陆。

现在再把 flash 消息渲染到 HTML 页面上

代码语言:javascript复制
{% for message in get_flashed_messages() %}
<div class="alert alert-warning">
     <button type="button" class="close" data-dismiss="alert">&times;</button>
     {{ message }}
 </div>
{% endfor %}

验证用户

下面我们再来看下如何验证用户是否登陆。

还记得我们的 WebUser 类其实是继承自 flask_login 的 UserMixin 类的,该类已经实现了如下的用户方法

属性/方法

说明

is_authenticated

如果用户已经认证,返回 True,否则返回 False

is_active

如果用户允许登陆,返回 True,否则返回 False

is_anonymous

如果当前用户未登录,返回 True,否则返回 False

get_id()

返回用户的唯一标识符,使用 Unicode 编码字符串

再结合 flask_login 提供的 current_user 对象,就可以判断用户的认证状态了。current_user 是一个和 current_app 类似的代理对象(Proxy), 表示当前用户。

修改 get_kline_chart 的 30 天逻辑

代码语言:javascript复制
from flask_login import current_user

def get_kline_chart():
...
    if int(query_time) > 30:
        if current_user.is_authenticated:
            pass
        else:
            abort(403)
...

修改用户认证判断逻辑

因为在上一篇里我们在模板中是通过 {% if not auth %} 来判断用户登陆与否的,现在需要修改下

代码语言:javascript复制
<ul class="nav navbar-nav navbar-right">
                {% if current_user.is_authenticated %}
                <li><a href="{{ url_for('logout') }}">Log Out</a></li>
                {% else %}
                <li><a href="{{ url_for('login') }}">Log In</a></li>
                {% endif %}
            </ul>

而对于 logout 视图函数,也做如下修改

代码语言:javascript复制
from flask_login import logout_user, login_required

@app.route('/logout/')
@login_required
def logout():
    logout_user()
    return redirect(url_for('index'))

直接调用 logout_user 函数就可以登出用户,同时还需要注意,这里使用了 login_required 装饰器,顾名思义,只有认证了的用户才可以调用该装饰器装饰的视图函数,这样就保证了未登陆的用户无权限访问 /logout 地址。

实战注册

注册我们就不做的过于复杂了,只要用户输入正确的 email 地址且唯一并且两次 password 一致,我们就通过注册。

定义注册表单

创建一个注册表单类

代码语言:javascript复制
class RegisterForm(FlaskForm):
    email = StringField('email', validators=[DataRequired()])
    password = PasswordField('password', validators=[DataRequired(),
                                                     EqualTo('confirm_pw', message='两次输入的密码需要一致!')])
    confirm_pw = PasswordField('confirm_pw', validators=[DataRequired()])
    submit = SubmitField('Submit')

    def validate_email(self, field):
        if WebUser.query.filter_by(email=field.data).first():
            raise ValidationError('该邮箱已经存在!')

以 validate_ 开头且后面跟着字段名的方法,是固定写法,用于自定义字段的验证方法。

然后我们再创建一个注册视图函数

代码语言:javascript复制
@app.route('/register/', methods=['GET', 'POST'])
def register():
    form = RegisterForm()
    if form.validate_on_submit():
        email = form.email.data
        password = form.password.data
        user = WebUser.query.filter_by(email=email).first()
        if user is None:
            newuser = WebUser(email=email, username=email, password=password, user_id=time.time())
            db.session.add(newuser)
            flash("你可以登陆啦!")
            return redirect(url_for('login'))
        flash("邮箱已经存在!")
    return render_template('register.html', form=form)

在该视图函数中,我们接收表单传递过来的数据,并验证 email 是否存在,如果不存在则插入数据库。并且跳转至登陆页面。

最后我们再编写注册页面,创建 register.html 文件

代码语言:javascript复制
{% extends "base.html" %}
{% import "bootstrap/wtf.html" as wtf %}

{% block title %}注册{% endblock %}


{% block page_content %}
{% for message in get_flashed_messages() %}
<div class="alert alert-warning">
     <button type="button" class="close" data-dismiss="alert">&times;</button>
     {{ message }}
 </div>
{% endfor %}
{{ wtf.quick_form(form) }}
{% endblock %}

这样,一个注册功能就完成了。

当然我们最好还是给出一个注册的入口,这个入口就在登陆表单的下面

代码语言:javascript复制
<p>
    还没有用户?
    <a href="{{ url_for('register') }}">
        点击这里注册
    </a>
</p>

快来动手实践下吧!

0 人点赞