DRF权限和频率限制

权限

views

from rest_framework.permissions import BasePermission
from rest_framework import exceptions
# 权限类操作
class MyPermission(BasePermission):
   # 也是一种报错,返回给前端
    message = {'code': 10001, 'error': '你没权限'}
    def has_permission(self, request, view):
        """
        Return `True` if permission is granted, `False` otherwise.
        """
        if request.user:
            return True
         # 报错 ,返回给前端
        # raise exceptions.PermissionDenied({'code': 10001, 'error': '你没权限'})
        return False

    def has_object_permission(self, request, view, obj):
        """
        Return `True` if permission is granted, `False` otherwise.
        """
        return False

class OrderView(APIView):
    # 局部配置:权限类
    permission_classes = [MyPermission,]
    def get(self,request,*args,**kwargs):
        return Response('order')

class UserView(APIView):
   # 局部配置:权限类
    permission_classes = [MyPermission, ]
    def get(self,request,*args,**kwargs):
        return Response('user')

settings

REST_FRAMEWORK = {
    "PAGE_SIZE":2, # 分页
    "DEFAULT_PAGINATION_CLASS":"rest_framework.pagination.PageNumberPagination",# 分页
    "DEFAULT_VERSIONING_CLASS":"rest_framework.versioning.URLPathVersioning",# 版本
    "ALLOWED_VERSIONS":['v1','v2'],# 版本
    'VERSION_PARAM':'version',# 版本
    "DEFAULT_AUTHENTICATION_CLASSES":["kka.auth.TokenAuthentication",]# 认证
    # 全局配置:权限
    #"DEFAULT_PERMISSION_CLASSES" : ["api.extension.permission.LuffyPermission"],

}

权限源码分析

class APIView(View):
    permission_classes = api_settings.DEFAULT_PERMISSION_CLASSES

    def dispatch(self, request, *args, **kwargs):
        封装request对象
        self.initial(request, *args, **kwargs)
        通过反射执行视图中的方法

    def initial(self, request, *args, **kwargs):
        版本的处理
        # 认证
        self.perform_authentication(request)

        # 权限判断
        self.check_permissions(request)

        self.check_throttles(request)

    def perform_authentication(self, request):
        request.user

    def check_permissions(self, request):
        # [对象,对象,]
        for permission in self.get_permissions():
            if not permission.has_permission(request, self):
                self.permission_denied(request, message=getattr(permission, 'message', None))
    def permission_denied(self, request, message=None):
        if request.authenticators and not request.successful_authenticator:
            raise exceptions.NotAuthenticated()
        # 报错,将权限类中定义的message返回给前端
        raise exceptions.PermissionDenied(detail=message)

    def get_permissions(self):
        return [permission() for permission in self.permission_classes]

class UserView(APIView):
   # 局部配置:权限
    permission_classes = [MyPermission, ]

    def get(self,request,*args,**kwargs):
        return Response('user')

频率

settings

REST_FRAMEWORK = {
    'DEFAULT_VERSIONING_CLASS':"rest_framework.versioning.URLPathVersioning",
    "ALLOWED_VERSIONS":['v1',],
    "DEFAULT_THROTTLE_RATES":{
       # scope默认位anon
        "anon":'3/m' # 3表示频率,m表示时间:60s
       # 源码中parse_rate()方法
       # num, period = rate.split('/')
       # num_requests = int(num)
       # duration = {'s': 1, 'm': 60, 'h': 3600, 'd': 86400}[period[0]]
    }
}

views

from rest_framework.throttling import AnonRateThrottle
class ArticleDetailView(APIView):
    throttle_classes = [AnonRateThrottle, ]
    def get(self,request,*args,**kwargs):
        return Response('文章列表')

频率限制在认证、权限之后

知识点

{
    throttle_anon_1.1.1.1:[100121340,],
    1.1.1.2:[100121251,100120450,]
}

限制:60s能访问3次
来访问时:
    1.获取当前时间 100121280
    2.100121280-60 = 100121220,小于100121220所有记录删除
    3.判断1分钟以内已经访问多少次了? 4
    4.无法访问
停一会
来访问时:
    1.获取当前时间 100121340
    2.100121340-60 = 100121280,小于100121280所有记录删除
    3.判断1分钟以内已经访问多少次了? 0
    4.可以访问

源码

from rest_framework.views import APIView
from rest_framework.response import Response

from rest_framework.throttling import AnonRateThrottle,BaseThrottle

class ArticleView(APIView):
    throttle_classes = [AnonRateThrottle,]
    def get(self,request,*args,**kwargs):
        return Response('文章列表')

class ArticleDetailView(APIView):
    def get(self,request,*args,**kwargs):
        return Response('文章列表')
class BaseThrottle:
    """
    Rate throttling of requests.
    """

    def allow_request(self, request, view):
        """
        Return `True` if the request should be allowed, `False` otherwise.
        """
        raise NotImplementedError('.allow_request() must be overridden')

    def get_ident(self, request):
        """
        Identify the machine making the request by parsing HTTP_X_FORWARDED_FOR
        if present and number of proxies is > 0. If not use all of
        HTTP_X_FORWARDED_FOR if it is available, if not use REMOTE_ADDR.
        """
        xff = request.META.get('HTTP_X_FORWARDED_FOR')
        remote_addr = request.META.get('REMOTE_ADDR')
        num_proxies = api_settings.NUM_PROXIES

        if num_proxies is not None:
            if num_proxies == 0 or xff is None:
                return remote_addr
            addrs = xff.split(',')
            client_addr = addrs[-min(num_proxies, len(addrs))]
            return client_addr.strip()

        return ''.join(xff.split()) if xff else remote_addr

    def wait(self):
        """
        Optionally, return a recommended number of seconds to wait before
        the next request.
        """
        return None

class SimpleRateThrottle(BaseThrottle):
    """
    A simple cache implementation, that only requires `.get_cache_key()`
    to be overridden.

    The rate (requests / seconds) is set by a `rate` attribute on the View
    class.  The attribute is a string of the form 'number_of_requests/period'.

    Period should be one of: ('s', 'sec', 'm', 'min', 'h', 'hour', 'd', 'day')

    Previous request information used for throttling is stored in the cache.
    """
    cache = default_cache
    timer = time.time
    cache_format = 'throttle_%(scope)s_%(ident)s'
    scope = None
    THROTTLE_RATES = api_settings.DEFAULT_THROTTLE_RATES

    def __init__(self):
        if not getattr(self, 'rate', None):
            self.rate = self.get_rate()
        self.num_requests, self.duration = self.parse_rate(self.rate)

    def get_cache_key(self, request, view):
        """
        Should return a unique cache-key which can be used for throttling.
        Must be overridden.

        May return `None` if the request should not be throttled.
        """
        raise NotImplementedError('.get_cache_key() must be overridden')

    def get_rate(self):
        """
        Determine the string representation of the allowed request rate.
        """
        if not getattr(self, 'scope', None):
            msg = ("You must set either `.scope` or `.rate` for '%s' throttle" %
                   self.__class__.__name__)
            raise ImproperlyConfigured(msg)

        try:
            # 拿scope,自定义类中没有,就去settings中那
            return self.THROTTLE_RATES[self.scope]
        except KeyError:
            msg = "No default throttle rate set for '%s' scope" % self.scope
            raise ImproperlyConfigured(msg)

    def parse_rate(self, rate):
        """
        Given the request rate string, return a two tuple of:
        <allowed number of requests>, <period of time in seconds>
        """
        if rate is None:
            return (None, None)
        num, period = rate.split('/')
        num_requests = int(num)
        duration = {'s': 1, 'm': 60, 'h': 3600, 'd': 86400}[period[0]]
        return (num_requests, duration)

    def allow_request(self, request, view):
        """
        Implement the check to see if the request should be throttled.

        On success calls `throttle_success`.
        On failure calls `throttle_failure`.
        """
        if self.rate is None:
            return True

        # 获取请求用户的IP
        self.key = self.get_cache_key(request, view)
        if self.key is None:
            return True

        # 根据IP获取他的所有访问记录,[]
        self.history = self.cache.get(self.key, [])

        self.now = self.timer()

        # Drop any requests from the history which have now passed the
        # throttle duration
        while self.history and self.history[-1] <= self.now - self.duration:
            self.history.pop()
        if len(self.history) >= self.num_requests:
            return self.throttle_failure()
        return self.throttle_success()

    def throttle_success(self):
        """
        Inserts the current request's timestamp along with the key
        into the cache.
        """
        self.history.insert(0, self.now)
        self.cache.set(self.key, self.history, self.duration)
        return True

    def throttle_failure(self):
        """
        Called when a request to the API has failed due to throttling.
        """
        return False

    def wait(self):
        """
        Returns the recommended next request time in seconds.
        """
        if self.history:
            remaining_duration = self.duration - (self.now - self.history[-1])
        else:
            remaining_duration = self.duration

        available_requests = self.num_requests - len(self.history) + 1
        if available_requests <= 0:
            return None

        return remaining_duration / float(available_requests)

class AnonRateThrottle(SimpleRateThrottle):
    """
    Limits the rate of API calls that may be made by a anonymous users.

    The IP address of the request will be used as the unique cache key.
    """
    scope = 'anon'

    def get_cache_key(self, request, view):
        if request.user.is_authenticated:
            return None  # Only throttle unauthenticated requests.

        return self.cache_format % {
            'scope': self.scope,
            'ident': self.get_ident(request)
        }

总结

  1. 如何实现的评率限制

    - 匿名用户,用IP作为用户唯一标记,但如果用户换代理IP,无法做到真正的限制。
    - 登录用户,用用户名或用户ID做标识。
    具体实现:
     在django的缓存中 = {
            throttle_anon_1.1.1.1:[100121340,],
            1.1.1.2:[100121251,100120450,]
        }
    
        限制:60s能访问3次
        来访问时:
            1.获取当前时间 100121280
            2.100121280-60 = 100121220,小于100121220所有记录删除
            3.判断1分钟以内已经访问多少次了? 4
            4.无法访问
        停一会
        来访问时:
            1.获取当前时间 100121340
            2.100121340-60 = 100121280,小于100121280所有记录删除
            3.判断1分钟以内已经访问多少次了? 0
            4.可以访问
    

原文地址:https://www.cnblogs.com/zhuzhizheng/p/12063705.html

时间: 2024-07-31 16:07:45

DRF权限和频率限制的相关文章

DRF的权限和频率

DRF的权限 权限组件源码 权限和频率以及版本认证都是在initial方法里初始化的 我们的权限类一定要有has_permission方法~否则就会抛出异常~~这也是框架给我提供的钩子~~ 在rest_framework.permissions这个文件中~存放了框架给我们提供的所有权限的方法~~ 主要说下BasePermission 这个是我们写权限类继承的一个基础权限类~~~ 权限的详细用法 initial方法在初始化的时候是有顺序的:版本-->权限-->频率 写权限类 class MyPe

DRF 权限 频率

DRF的权限 权限是什么 大家之前都应该听过权限~那么我们权限到底是做什么用的呢~~ 大家都有博客~或者去一些论坛~一定知道管理员这个角色~ 比如我们申请博客的时候~一定要向管理员申请~也就是说管理员会有一些特殊的权利~是我们没有的~~ 这些对某件事情决策的范围和程度~我们叫做权限~~权限是我们在项目开发中非常常用到的~~ 那我们看DRF框架给我们提供的权限组件都有哪些方法~~ 权限组件源码 我们之前说过了DRF的版本和认证~也知道了权限和频率跟版本认证都是在initial方法里初始化的~~ 其

DRF 权限的流程

DRF 权限的流程 django rest framework,入口是 dispatch,然后依次 --->>封装请求--->>处理版本--->>>认证--->>>权限--->>>限制访问频率 (1)auth需要通过token唯一标识来认证 (2)通过auth认证后得到,用户user信息,但是没有admin的权限 权限用来做进一步做职责的划分 代码 class MyPermission(object): message = '无

【DRF权限】 -- 2019-08-09 12:08:24

目录 权限的详细用法 原文: http://106.13.73.98/__/66/ 我们都听过权限,那么权限到底是做什么的呢. 我们都有博客,或者去一些论坛,一定知道管理员这个角色, 比如我们申请博客的时候,一定要向管理员申请,也就是说管理员会有一些特殊的权利,是我们没有的. ==这些对某件事情决策的范围和程度,我们叫做权限==,权限是我们在项目开发中经常用到的. 本文将详细讲述DRF框架为我们提供的权限组件的使用方法. @ * 源码剖析** DRF的版本控制.认证.权限.频率组件都在initi

【DRF权限】

目录 权限的详细用法 "我们都听过权限,那么权限到底是做什么的呢. 我们都有博客,或者去一些论坛,一定知道管理员这个角色, 比如我们申请博客的时候,一定要向管理员申请,也就是说管理员会有一些特殊的权利,是我们没有的. ==这些对某件事情决策的范围和程度,我们叫做权限==,权限是我们在项目开发中经常用到的. 本文将详细讲述DRF框架为我们提供的权限组件的使用方法. @ * 源码剖析** DRF的版本控制.认证.权限.频率组件都在initial方法里初始化. 我们点进去看看: 其实我们版本.认证.权

认证、权限、频率、自定义签发token-多方式登录

目录 三大认证流程图 路由配置 认证组件 权限组件 自定义权限类 配置drf自带的权限类 drf-jwt 签发token源码分析 多方式登录 签发token VIP用户认证权限例子 频率组件 自定义频率类 三大认证流程图 路由配置 在应用下新建文件router.py # router.py from rest_framework.routers import Route, DynamicRoute, SimpleRouter as DRFSimpleRouter class SimpleRout

认证 权限 视图 频率

认证组件 使用:写一个认证类,继承BaseAuthentication 在类中写authenticate方法,把request对象传入 能从request对象中取出用户携带的token根据token判断是否登录过 如果登录过,返回两个值 user对象 ,token对象(或者其他自定义的对象) 如果没有登录过抛异常 from rest_framework.authentication import BaseAuthentication from app01 import models from r

04,认证、权限、频率

.wiz-editor-body .wiz-code-container { position: relative; padding: 8px 0; margin: 5px 25px 5px 5px; text-indent: 0; text-align: left } .CodeMirror { font-family: Consolas, "Liberation Mono", Menlo, Courier, monospace; color: black; font-size: 0

DjangoRestFramework之认证组件,权限组件,频率组件

一 . 认证组件 我们现在知道的认证机制有cookie, session,session比较安全,session的信息是存在服务器上的,如果用户量足够大的话,服务器的压力也就比较大,并且django的session存到了django_session表中,不是很好操作,现在生产中使用的一个叫做token机制的方式比较多.可以使用这个机制把token存到用户信息里,当用户登录成功的时候放里面,等他来的时候就要带着这个来,而且们每次来的时候都更新token,防止被拦截. 我们这里使用一下token,首