60 秒系统安全认证实战

2021-04-21 18:13:21 浏览数 (2)

/ python 生产实战 60 秒系统安全认证实战 /

上节主要讲解了目前主流的认证规范/协议以及对 JWT 进行了深入的研究和分析并在最后给出了在生产环境中如何去生成一个有效的 Token,基于 Python 语言那在生产环境中是如何进行有效的安全认证的呢?上节我们也基于 JWT 的 Token 的认证过程进行了登陆认证、请求认证的理论分析以及用图示的方式给出了数据的流向,本节我们再带大家从代码层面走一次流程,一方面加深大家对上节理论部分的理解,另一方面也是给大家在做工程的过程中提供一套"模版"快速应用在项目中

1

准备工具

1.1

密码安全

为了数据安全,我们利用 PassLib 对入库的用户密码进行加密处理,推荐的加密算法是"Bcrypt"。我们需要安装依赖包:

代码语言:javascript复制
pip install passlib
pip install bcrypt

简单的介绍一下这两个库: 1.passlib 是 python2&3 的密码散列库,它提供超过 30 种密码散列算法的跨平台实现,以及作为管理现有密码哈希的框架。它被设计成有用的 对于范围广泛的任务,从验证/etc/shadow 中找到的散列到 为多用户应用程序提供全强度密码哈希。2.bcrypt 模块是一个用于在 Python 中生成强哈希值的库。 注意:若对于以上两个库的详细内容比较感兴趣的小伙伴可以自行到 python 官网查看相关资料,我们本节的核心是做整个流程的实现这个具体的库内容就先不做详细介绍了。

2

用户登陆流程

用户通过终端发送 username 和 password 到后端。后端收到数据后,进行一下操作:1.用户信息校验:查询当前系统是否存在该用户,以及密码是否正确 2.如果用户存在,则生成 JWT token 并返回,JWT payload 中可以携带自定义数据

代码语言:javascript复制
# -*- encoding: utf-8 -*-
from datetime import datetime, timedelta
from typing import Optional

from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
import jwt
from pydantic import BaseModel
from passlib.context import CryptContext


# to get a string like this run:
# openssl rand -hex 32
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 25


# 模拟数据库数据
fake_users_db = {
    "hiashiniu": {
        "username": "hiashiniu",
        "full_name": "HaiShiNiu",
        "email": "haishiniu@bbs.com",
        "hashed_password": "$2b$12$uG4n5CK4.kCid/MfQ3ns9.uRslWuWr9EL5FR4HYNg6SNo6wEn8OPC",
        "disabled": False,
    }
}


class Token(BaseModel):
    access_token: str
    token_type: str


class User(BaseModel):
    username: str
    email: Optional[str] = None
    full_name: Optional[str] = None
    disabled: Optional[bool] = None


class UserInDB(User):
    hashed_password: str


pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/token")

app = FastAPI()


# 校验密码
def verify_password(plain_password, hashed_password):
    return pwd_context.verify(plain_password, hashed_password)


# 密码哈希
def get_password_hash(password):
    # pwd_context.hash('123456789')
    # '$2b$12$uG4n5CK4.kCid/MfQ3ns9.uRslWuWr9EL5FR4HYNg6SNo6wEn8OPC'
    return pwd_context.hash(password)


# 模拟从数据库读取用户信息
def get_user(db, username: str):
    user_dict = db.get(username,None)
    if user_dict:
        return UserInDB(**user_dict)


# 用户信息校验:username和password分别校验
def authenticate_user(fake_db, username: str, password: str):
    user = get_user(fake_db, username)
    print(user)
    if not user:
        return False
    if not verify_password(password, user.hashed_password):
        return False
    return user


# 生成token,带有过期时间
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
    to_encode = data.copy()
    # 有一个实效性 默认的时间25分钟
    if expires_delta:
        expire = datetime.utcnow()   expires_delta
    else:
        expire = datetime.utcnow()   timedelta(minutes=15)
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt

@app.post("/token", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
    # 首先校验用户信息
    print(form_data.password)
    print(form_data.username)
    user = authenticate_user(fake_users_db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )

    # 生成并返回token信息
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username}, expires_delta=access_token_expires
    )
    return {"access_token": access_token, "token_type": "bearer"}

3

请求数据流

终端获取到 token 信息后,必须在后续请求的 Authorization 头信息中带有 Bearer token,才能被允许访问。我们添加一个校验函数,对请求的合法性进行校验,读取 token 内容解析并进行验证。

代码语言:javascript复制
# -*- encoding: utf-8 -*-

async def get_current_user(token: str = Depends(oauth2_scheme)):
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if username is None:
            raise credentials_exception
    except jwt.PyJWTError as ex:
        print(ex)
        raise credentials_exception
    user = get_user(fake_users_db, username=username)
    if user is None:
        raise credentials_exception
    return user


@app.get("/users/me/", response_model=User)
async def read_users_me(current_user: User = Depends(get_current_user)):
    return current_user

4

效果展示

本小节我们整合上述代码进行一下效果展示

代码语言:javascript复制
# -*- encoding: utf-8 -*-
from datetime import datetime, timedelta
from typing import Optional

from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
import jwt
from pydantic import BaseModel
from passlib.context import CryptContext


# to get a string like this run:
# openssl rand -hex 32
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 25


# 模拟数据库数据
fake_users_db = {
    "hiashiniu": {
        "username": "hiashiniu",
        "full_name": "HaiShiNiu",
        "email": "haishiniu@bbs.com",
        "hashed_password": "$2b$12$uG4n5CK4.kCid/MfQ3ns9.uRslWuWr9EL5FR4HYNg6SNo6wEn8OPC",
        "disabled": False,
    }
}


class Token(BaseModel):
    access_token: str
    token_type: str


class User(BaseModel):
    username: str
    email: Optional[str] = None
    full_name: Optional[str] = None
    disabled: Optional[bool] = None


class UserInDB(User):
    hashed_password: str


pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/token")

app = FastAPI()


# 校验密码
def verify_password(plain_password, hashed_password):
    return pwd_context.verify(plain_password, hashed_password)


# 密码哈希
def get_password_hash(password):
    # pwd_context.hash('123456789')
    # '$2b$12$uG4n5CK4.kCid/MfQ3ns9.uRslWuWr9EL5FR4HYNg6SNo6wEn8OPC'
    return pwd_context.hash(password)


# 模拟从数据库读取用户信息
def get_user(db, username: str):
    user_dict = db.get(username,None)
    if user_dict:
        return UserInDB(**user_dict)


# 用户信息校验:username和password分别校验
def authenticate_user(fake_db, username: str, password: str):
    user = get_user(fake_db, username)
    if not user:
        return False
    if not verify_password(password, user.hashed_password):
        return False
    return user


# 生成token,带有过期时间
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
    to_encode = data.copy()
    # 有一个实效性 默认的时间25分钟
    if expires_delta:
        expire = datetime.utcnow()   expires_delta
    else:
        expire = datetime.utcnow()   timedelta(minutes=15)
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt


@app.post("/token", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
    # 首先校验用户信息
    print(form_data.password)
    print(form_data.username)
    user = authenticate_user(fake_users_db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )

    # 生成并返回token信息
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username}, expires_delta=access_token_expires
    )
    return {"access_token": access_token, "token_type": "bearer"}


async def get_current_user(token: str = Depends(oauth2_scheme)):
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if username is None:
            raise credentials_exception
    except jwt.PyJWTError as ex:
        print(ex)
        raise credentials_exception
    user = get_user(fake_users_db, username=username)
    if user is None:
        raise credentials_exception
    return user


@app.get("/users/me/", response_model=User)
async def read_users_me(current_user: User = Depends(get_current_user)):
    return current_user

1.使用命令启动项目:

代码语言:javascript复制
uvicorn main:app --reload 

2.使用 Postman 模拟登陆获取验证 Token

3.使用 Postman 模拟携带 Token 获取正常逻辑信息

5

总结

本节核心:从代码层面实战了 登陆认证、请求认证的数据流转,让我们对数据安全有了新的认识。

原创不易,只愿能帮助那些需要这些内容的同行或刚入行的小伙伴,你的每次 点赞、分享 都是我继续创作下去的动力,我希望能在推广 python 技术的道路上尽我一份力量,欢迎在评论区向我提问,我都会一一解答,记得一键三连支持一下哦!

0 人点赞