rest_framework组件之认证,权限,访问频率

共用的models

 1 from django.db import models
 2
 3
 4 # Create your models here.
 5
 6
 7 class User(models.Model):
 8     username = models.CharField(max_length=32)
 9     password = models.CharField(max_length=32)
10     user_type = models.IntegerField(choices=((1, ‘超级用户‘), (2, ‘普通用户‘), (3, ‘第三方用户‘)))
11
12
13 class UserToken(models.Model):
14     user = models.OneToOneField(to=‘User‘)
15     token = models.CharField(max_length=64)

urls

1 from django.conf.urls import url
2 from django.contrib import admin
3 from app01 import views
4 urlpatterns = [
5     url(r‘^admin/‘, admin.site.urls),
6     url(r‘^course/‘,views.Course.as_view()),
7     url(r‘^login/‘,views.Login.as_view()),
8     url(r‘^get_ip/‘,views.GetIp.as_view()),
9 ]

views

 1 from django.shortcuts import render, HttpResponse
 2
 3 # Create your views here.
 4
 5 import json
 6 from django.views import View
 7 from rest_framework.views import APIView
 8 from app01 import models
 9 from utils.common import *
10 from rest_framework.response import Response
11
12
13 class Login(APIView):
14     def post(self, request, *args, **kwargs):
15         response = MyResponse()
16         name = request.data.get(‘username‘)
17         pwd = request.data.get(‘password‘)
18         user = models.User.objects.filter(username=name, password=pwd).first()
19         if user:
20             token = get_token(name)
21             ret = models.UserToken.objects.update_or_create(user=user, defaults={‘token‘: token})
22             response.status = 100
23             response.msg = ‘登陆成功‘
24             response.token = token
25
26         else:
27             response.msg = ‘用户名或密码错误‘
28
29         return Response(response.get_dic())
30
31
32 class Course(APIView):
33     authentication_classes = [MyAuth, ]
34     permission_classes = [MyPermission, ]
35     throttle_classes = [VisitThrottle, ]
36
37     def get(self, request):
38         print(request.user)
39         print(request.auth)
40         return HttpResponse(json.dumps({‘name‘: ‘Python‘}))


认证组件   1 写一个认证类      from rest_framework.authentication import BaseAuthentication      class MyAuth(BaseAuthentication):         def authenticate(self,request):      #         request  是封装后的            token = request.query_params.get(‘token‘)            ret = models.UserToken.objects.filter(token=token).first()            if ret:      #             认证通过               return            else:               raise AuthenticationFailed(‘认证失败‘)         #可以不写了         def authenticate_header(self,ss):            pass   2 局部使用      authentication_classes=[MyAuth,MyAuth2]   3 全局使用      查找顺序:自定义的APIView里找---》项目settings里找---》内置默认的      REST_FRAMEWORK={         ‘DEFAULT_AUTHENTICATION_CLASSES‘:[‘utils.common.MyAuth‘,]

      }

认证:

 1 import hashlib
 2 import time
 3
 4
 5 class MyResponse():
 6     def __init__(self):
 7         self.status = 1001
 8         self.msg = None
 9
10     def get_dic(self):
11         return self.__dict__
12
13
14 # res = MyResponse()
15 #
16 # print(res.get_dic())
17
18 def get_token(name):
19     md = hashlib.md5()
20     md.update(name.encode(‘utf-8‘))
21     md.update(str(time.time()).encode(‘utf-8‘))
22
23     return md.hexdigest()  # 返回的是包含name和time的hash值
24
25
26 from rest_framework.authentication import BaseAuthentication
27 from app01 import models
28 from rest_framework.exceptions import APIException, AuthenticationFailed
29
30
31 class MyAuth(BaseAuthentication):
32     def authenticate(self, request):
33         token = request.query_params.get(‘token‘)
34         ret = models.UserToken.objects.filter(token=token).first()
35         if ret:
36
37             return ret.user,ret
38         else:
39
40             raise AuthenticationFailed("认证失败")

权限组件   1 写一个类      class MyPermission():         def has_permission(self,request,view):            token=request.query_params.get(‘token‘)            ret=models.UserToken.objects.filter(token=token).first()            if ret.user.type==2:            # 超级用户可以访问               return True            else:               return False   2 局部使用:      permission_classes=[MyPermission,]   3 全局使用:         REST_FRAMEWORK={         ‘DEFAULT_PERMISSION_CLASSES‘:[‘utils.common.MyPermission‘,]      }

权限组件:

 1 from rest_framework.permissions import BasePermission
 2
 3 class MyPermission(BasePermission):
 4     message = "不是超级用户,查看不了"
 5     def has_permission(self, request, view):
 6         token = request.query_params.get("token")
 7         ret=models.UserToken.objects.filter(token=token).first()
 8         print(ret.user.get_user_type_display())
 9         if ret.user.user_type == 1:
10             return True
11         else:
12             return False



频率组件   1 写一个类:      from rest_framework.throttling import SimpleRateThrottle      class VisitThrottle(SimpleRateThrottle):         scope = ‘xxx‘         def get_cache_key(self, request, view):            return self.get_ident(request)   2 在setting里配置:          ‘DEFAULT_THROTTLE_RATES‘:{            ‘xxx‘:‘5/h‘,         }   3 局部使用      throttle_classes=[VisitThrottle,]   4 全局使用      REST_FRAMEWORK={         ‘DEFAULT_THROTTLE_CLASSES‘:[‘utils.common.MyPermission‘,]      }访问频率组件:

1 from rest_framework.throttling import SimpleRateThrottle
2
3
4 class VisitThrottle(SimpleRateThrottle):
5     scope = ‘da peng ya‘
6
7     def get_cache_key(self, request, view):
8         return self.get_ident(request)


作业:1 认证类,不存数据库的token验证  token=‘idfjasfsadfas|userid‘

 1 def get_token(id,salt=‘123‘):
 2     import hashlib
 3     md=hashlib.md5()
 4     md.update(bytes(str(id),encoding=‘utf-8‘))
 5     md.update(bytes(salt,encoding=‘utf-8‘))
 6
 7     return md.hexdigest()+‘|‘+str(id)
 8
 9 def check_token(token,salt=‘123‘):
10     ll=token.split(‘|‘)
11     import hashlib
12     md=hashlib.md5()
13     md.update(bytes(ll[-1],encoding=‘utf-8‘))
14     md.update(bytes(salt,encoding=‘utf-8‘))
15     if ll[0]==md.hexdigest():
16         return True
17     else:
18         return False
19
20 class TokenAuth():
21     def authenticate(self, request):
22         token = request.GET.get(‘token‘)
23         success=check_token(token)
24         if success:
25             return
26         else:
27             raise AuthenticationFailed(‘认证失败‘)
28     def authenticate_header(self,request):
29         pass
30 class Login(APIView):
31     def post(self,reuquest):
32         back_msg={‘status‘:1001,‘msg‘:None}
33         try:
34             name=reuquest.data.get(‘name‘)
35             pwd=reuquest.data.get(‘pwd‘)
36             user=models.User.objects.filter(username=name,password=pwd).first()
37             if user:
38                 token=get_token(user.pk)
39                 # models.UserToken.objects.update_or_create(user=user,defaults={‘token‘:token})
40                 back_msg[‘status‘]=‘1000‘
41                 back_msg[‘msg‘]=‘登录成功‘
42                 back_msg[‘token‘]=token
43             else:
44                 back_msg[‘msg‘] = ‘用户名或密码错误‘
45         except Exception as e:
46             back_msg[‘msg‘]=str(e)
47         return Response(back_msg)
48 from rest_framework.authentication import BaseAuthentication
49 class TokenAuth():
50     def authenticate(self, request):
51         token = request.GET.get(‘token‘)
52         token_obj = models.UserToken.objects.filter(token=token).first()
53         if token_obj:
54             return
55         else:
56             raise AuthenticationFailed(‘认证失败‘)
57     def authenticate_header(self,request):
58         pass
59
60 class Course(APIView):
61     authentication_classes = [TokenAuth, ]
62
63     def get(self, request):
64         return HttpResponse(‘get‘)
65
66     def post(self, request):
67         return HttpResponse(‘post‘)


2 自己写一个每分钟限制三次访问的频率类 ip:拿到ip  全局变量:{ip:[时间1,时间2,时间3]}

获取IP
1     def get(self, request):
2         x_forwarded_for = request.META.get(‘HTTP_X_FORWARDED_FOR‘)
3         if x_forwarded_for:
4             ip = x_forwarded_for.split(‘,‘)[0]  # 所以这里是真实的ip
5         else:
6             ip = request.META.get(‘REMOTE_ADDR‘)  # 这里获得代理ip
7         return ip

待补充。。



原文地址:https://www.cnblogs.com/Roc-Atlantis/p/9833238.html

时间: 2024-10-08 11:29:12

rest_framework组件之认证,权限,访问频率的相关文章

认证 权限 视图 频率

认证组件 使用:写一个认证类,继承BaseAuthentication 在类中写authenticate方法,把request对象传入 能从request对象中取出用户携带的token根据token判断是否登录过 如果登录过,返回两个值 user对象 ,token对象(或者其他自定义的对象) 如果没有登录过抛异常 from rest_framework.authentication import BaseAuthentication from app01 import models from r

认证、权限、频率、自定义签发token-多方式登录

目录 三大认证流程图 路由配置 认证组件 权限组件 自定义权限类 配置drf自带的权限类 drf-jwt 签发token源码分析 多方式登录 签发token VIP用户认证权限例子 频率组件 自定义频率类 三大认证流程图 路由配置 在应用下新建文件router.py # router.py from rest_framework.routers import Route, DynamicRoute, SimpleRouter as DRFSimpleRouter class SimpleRout

DRF的权限和频率

DRF的权限 权限组件源码 权限和频率以及版本认证都是在initial方法里初始化的 我们的权限类一定要有has_permission方法~否则就会抛出异常~~这也是框架给我提供的钩子~~ 在rest_framework.permissions这个文件中~存放了框架给我们提供的所有权限的方法~~ 主要说下BasePermission 这个是我们写权限类继承的一个基础权限类~~~ 权限的详细用法 initial方法在初始化的时候是有顺序的:版本-->权限-->频率 写权限类 class MyPe

REST framework (组件使用之认证、权限、访问频率)

目录 一.认证 二.权限 三.限制访问频率 四.总结 一.认证(补充的一个点) 认证请求头 1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 from rest_framework.views import APIView 4 from rest_framework.response import Response 5 from rest_framework.authentication import BaseAuthentication 6

Django Restful Framework【第三篇】认证、权限、限制访问频率

一.认证 认证请求头 views.py #!/usr/bin/env python # -*- coding:utf-8 -*- from rest_framework.views import APIView from rest_framework.response import Response from rest_framework.authentication import BaseAuthentication from rest_framework.permissions import

restful(3):认证、权限、频率

models.py中: class UserInfo(models.Model): name = models.CharField(max_length=32) psw = models.CharField(max_length=32) user_type_choices = ((1,"普通"),(2,"VIP"),(3,"SVIP")) user_type = models.SmallIntegerField(choices=user_type

限流(访问频率)组件的使用

目录 限流(访问频率)组件的使用 限流(访问频率)组件的使用 框架中限流功能的实现依赖于封装好的限流类,设置方式上分为全局设置和局部设置.继承类设置和自定义类设置.如果要实现限流功能则必须设置DEAFULRT_THROTTLE_CLASSES和DEAFULRT_THROTTLE_RATES 自定义限流类 限流类的基本思路: 限流需要知道访问者的本地IP地址,通过地址才能进行记录访问次数: 限流需要了解访问者访问的时间,通过记录时间和次数来限制规定时间内访问者可以访问的频次: 拿到IP地址后,创建

rest_framework之访问频率详解

访问频率(节流) 1.某个用户一分钟之内访问的次数不能超过3次,超过3次则不能访问了,需要等待,过段时间才能再访问. 2.自定义访问频率.两个方法都必须写上. 登入页面的视图加上访问频率 3.返回值False,则不能访问 4.返回值True,则能访问 上面的节流太简单粗暴了,接下来加上一些判断.先获取用户IP 节流源码 1. 2.remote_addr = request.META.get('REMOTE_ADDR')  获取IP地址 3.获取IP地址 原文地址:https://www.cnbl

Django rest framework 限制访问频率(源码分析三)

基于 当用发出请求时 首先执行dispatch函数,当执行当第二部时: #2.处理版本信息 处理认证信息 处理权限信息 对用户的访问频率进行限制 self.initial(request, *args, **kwargs) 进入到initial方法: def initial(self, request, *args, **kwargs): """ Runs anything that needs to occur prior to calling the method han