文章目录
- 一、认证组件
- 1. 分析源码
- 2. 全局配置认证
- 二、自定义认证类
- 1. 代码实现
- 2. 接口测试
一、认证组件
1. 分析源码
通过分析源码了解认证组件的方法调用过程
APIView 的 dispatch 中使用 initial 方法实现初始化并进行三大认证,第一步就是认证组件
rest_framework/views.py
class APIView(View):
# ...
def initial(self, request, *args, **kwargs):
# ...
# 认证组件:校验用户
# 这里调用 perform_authentication 实现认证
self.perform_authentication(request)
# 权限组件:校验用户权限
self.check_permissions(request)
# 频率组件:限制视图接口被访问次数
self.check_throttles(request)
def perform_authentication(self, request):
"""
Perform authentication on the incoming request.
Note that if you override this and simply 'pass', then authentication
will instead be performed lazily, the first time either
`request.user` or `request.auth` is accessed.
"""
# 去 request 中调用 user 方法属性
request.user
request.user 去 request 中找 user 方法属性,找到认证方法实现过程
rest_framework/request.py
class Request:
@property
def user(self):
"""
Returns the user associated with the current request, as authenticated
by the authentication classes provided to the request.
"""
# 属性来源是 _属性名
if not hasattr(self, '_user'):
# 回收错误信息
with wrap_attributeerrors():
# 没用户,认证处用户
self._authenticate()
# 有用户,直接返回用户
return self._user
# 认证方法
def _authenticate(self):
"""
Attempt to authenticate the request using each authentication instance
in turn.
"""
# 遍历拿到认证器,进行认证
# self.authenticators,配置的一堆认证类产生的认证类对象组成的 list
for authenticator in self.authenticators:
# 该方法被 try 包裹,代表该方法会抛异常,抛异常代表认证失败
try:
# 认证器(对象)调用认证方法 authenticate(认证类对象self,request 请求对象)
# 返回值:登录的用户与认证的信息组成的 tuple
user_auth_tuple = authenticator.authenticate(self)
except exceptions.APIException:
# 非法用户
self._not_authenticated()
raise
# 处理返回值
if user_auth_tuple is not None:
self._authenticator = authenticator
# 合法用户
# 如果有返回值,就将登录用户与登录认证分别保存到 request.user request.auth
self.user, self.auth = user_auth_tuple
return
# 游客
# 如果返回值 user_auth_tuple,代表认证通过,但是没有登录用户和登录认证信息,代表游客
self._not_authenticated()
寻找 authenticators 如何定义
rest_framework/views.py
class APIView(View):
# The following policies may be set at either globally, or per-view.
# 配置认证类
authentication_classes = api_settings.DEFAULT_AUTHENTICATION_CLASSES
def dispatch(self, request, *args, **kwargs):
"""
`.dispatch()` is pretty much the same as Django's regular dispatch,
but with extra hooks for startup, finalize, and exception handling.
"""
self.args = args
self.kwargs = kwargs
# 请求模块(解析模块)
request = self.initialize_request(request, *args, **kwargs)
self.request = request
self.headers = self.default_response_headers # deprecate?
try:
# 进入三大认证模块
self.initial(request, *args, **kwargs)
# Get the appropriate handler method
if request.method.lower() in self.http_method_names:
handler = getattr(self, request.method.lower(),
self.http_method_not_allowed)
else:
handler = self.http_method_not_allowed
response = handler(request, *args, **kwargs)
except Exception as exc:
response = self.handle_exception(exc)
self.response = self.finalize_response(request, response, *args, **kwargs)
return self.response
def initialize_request(self, request, *args, **kwargs):
"""
Returns the initial request object.
"""
parser_context = self.get_parser_context(request)
return Request(
request,
# 获取解析类
parsers=self.get_parsers(),
# 获取认证器
authenticators=self.get_authenticators(),
negotiator=self.get_content_negotiator(),
parser_context=parser_context
)
# 获取认证器
def get_authenticators(self):
"""
Instantiates and returns the list of authenticators that this view can use.
"""
# 实例化一堆认证类对象
return [auth() for auth in self.authentication_classes]
了解到认证器是通过一系列人证类对象实例化后定义
我们进去 SessionAuthentication 查看默认配置的认证类的实现
代码语言:javascript复制class SessionAuthentication(BaseAuthentication):
"""
Use Django's session framework for authentication.
"""
def authenticate(self, request):
"""
Returns a `User` if the request session currently has a logged in user.
Otherwise returns `None`.
"""
# 得到用户
user = getattr(request._request, 'user', None)
# Unauthenticated, CSRF validation not required
if not user or not user.is_active:
# 没有解析出,代表游客
return None
# 解析出用户后,要重新启用 csrf 认证
# 如果 csrf 认证失败,就出现异常,认证为非法用户
self.enforce_csrf(request)
# CSRF passed with authenticated user
# 认证为合法用户,没有返回认证信息
return (user, None)
class BasicAuthentication(BaseAuthentication):
"""
HTTP Basic authentication against username/password.
"""
www_authenticate_realm = 'api'
def authenticate(self, request):
"""
Returns a `User` if a correct username and password have been supplied
using HTTP Basic authentication. Otherwise returns `None`.
"""
# 获取认证信息:该认证信息是两段式(basic 认证字符串)
auth = get_authorization_header(request).split()
# 没有认证信息,认证为游客
if not auth or auth[0].lower() != b'basic':
return None
# 有认证信息,格式错误,认证为非法用户
if len(auth) == 1:
msg = _('Invalid basic header. No credentials provided.')
raise exceptions.AuthenticationFailed(msg)
elif len(auth) > 2:
msg = _('Invalid basic header. Credentials string should not contain spaces.')
raise exceptions.AuthenticationFailed(msg)
try:
auth_parts = base64.b64decode(auth[1]).decode(HTTP_HEADER_ENCODING).partition(':')
except (TypeError, UnicodeDecodeError, binascii.Error):
msg = _('Invalid basic header. Credentials not correctly base64 encoded.')
raise exceptions.AuthenticationFailed(msg)
userid, password = auth_parts[0], auth_parts[2]
# 认证信息处理出用户主键和密码,进一步得到用户对象
return self.authenticate_credentials(userid, password, request)
def authenticate_credentials(self, userid, password, request=None):
"""
Authenticate the userid and password against username and password
with optional request for context.
"""
credentials = {
get_user_model().USERNAME_FIELD: userid,
'password': password
}
user = authenticate(request=request, **credentials)
# 如果没有该用户或非活跃用户,认证为非法用户
if user is None:
raise exceptions.AuthenticationFailed(_('Invalid username/password.'))
if not user.is_active:
raise exceptions.AuthenticationFailed(_('User inactive or deleted.'))
return (user, None)
def authenticate_header(self, request):
return 'Basic realm="%s"' % self.www_authenticate_realm
我们现在查看默认认证类配置定义位置
rest_framework/setting.py
DEFAULTS = {
# ...
# 默认认证类配置
'DEFAULT_AUTHENTICATION_CLASSES': [
'rest_framework.authentication.SessionAuthentication',
'rest_framework.authentication.BasicAuthentication'
],
}
根据这个,现在就可以自定义项目配置文件
2. 全局配置认证
编写自己项目的settings.py
# 全局局部配置
REST_FRAMEWORK = {
# ...
# 配置默认认证类
'DEFAULT_AUTHENTICATION_CLASSES': [
'rest_framework.authentication.SessionAuthentication',
'rest_framework.authentication.BasicAuthentication'
],
}
二、自定义认证类
可以看到以上默认的认证类,所有的规则都是固定的。尤其是做session认证时,会调用csrf,但是对于前后端分离的情况,这种规则并不友好。
1. 代码实现
authentications.py
from rest_framework.authentication import BasicAuthentication
from rest_framework.exceptions import AuthenticationFailed
from api import models
class MyAuthentication(BasicAuthentication):
# 重新 authenticate 方法,自定义认证规则
def authenticate(self, request):
# 认证规则要基于条件:
# 游客:无认证信息,返回 None
# 非法用户:有认证信息,认证失败,抛异常
# 合法用户:有认证信息,认证成功,返回元组
# 前台在请求头携带认证信息,且默认规范用 Authorization 字段携带认证信息
# 后端固定在请求对象的 META字典中 HTTP_AUTHORIZATION 获取
auth = request.META.get('HTTP_AUTHORIZATION', None
)
# 游客认证
if auth is None:
return None
# 设置认证字段规则(两端式):“auth 认证字符串”
auth_list = auth.split()
# 校验合法还是非法用户
if len(auth_list) != 2 or auth_list[0].lower() != 'auth':
raise AuthenticationFailed('The authentication information is incorrect! Illegal user')
# 合法的用户需要进一步从 auth_list[1] 解析
# 假设一种情况:信息为 abc.123.xyz 就可以解析 admin 用户
# 实际开发时,该逻辑一定需要校验用户
if auth_list[1] != 'abc.123.xyz':
raise AuthenticationFailed('User verification failed! Illegal user')
user = models.User.objects.filter(username='baimoc').first()
if not user:
raise AuthenticationFailed('User data is incorrect! Illegal user')
return (user, None)
settings.py
# 全局局部配置
REST_FRAMEWORK = {
'DEFAULT_AUTHENTICATION_CLASSES': [
'api.authentications.MyAuthentication'
],
}
views.py
from rest_framework.views import APIView
from rest_framework.generics import GenericAPIView
from rest_framework.viewsets import GenericViewSet, ViewSet
from utils.response import APIResponse
class LoginView(APIView):
def get(self, request, *args, **kwargs):
# 如果认证通过,request.user 就一定有值
# 游客:AnonymousUser
# 用户:User
return APIResponse(0, 'Login successful')
urls.py
from django.conf.urls import url
from api import views
urlpatterns = [
url(r'^login/$', views.LoginView.as_view()),
]