抽屉之Tornado实战(7)--form表单验证

在这里,我们把form表单验证的代码进行工具化了,以后稍微修改一下参数就可以拿来用了

  先贴上代码

forms.py

from backend.form import fields

class BaseForm:

    def __init__(self):
        self._value_dict = {}
        self._error_dict = {}
        self._valid_status = True

    def valid(self, handler):

        for field_name, field_obj in self.__dict__.items():
            if field_name.startswith(‘_‘):
                continue

            if type(field_obj) == fields.CheckBoxField:
                post_value = handler.get_arguments(field_name, None)
            elif type(field_obj) == fields.FileField:
                post_value = []
                file_list = handler.request.files.get(field_name, [])
                for file_item in file_list:
                    post_value.append(file_item[‘filename‘])
            else:
                post_value = handler.get_argument(field_name, None)

            field_obj.match(field_name, post_value)
            if field_obj.is_valid:
                self._value_dict[field_name] = field_obj.value
            else:
                self._error_dict[field_name] = field_obj.error
                self._valid_status = False
        return self._valid_status

fields.py

import re
import os

class Field:

    def __init__(self):

        self.is_valid = False
        self.name = None
        self.value = None
        self.error = None

    def match(self, name, value):
        self.name = name

        if not self.required:
            self.is_valid = True
            self.value = value
        else:
            if not value:
                if self.custom_error_dict.get(‘required‘, None):
                    self.error = self.custom_error_dict[‘required‘]
                else:
                    self.error = "%s is required" % name
            else:
                ret = re.match(self.REGULAR, value)
                if ret:
                    self.is_valid = True
                    self.value = value
                else:
                    if self.custom_error_dict.get(‘valid‘, None):
                        self.error = self.custom_error_dict[‘valid‘]
                    else:
                        self.error = "%s is invalid" % name

class StringField(Field):

    REGULAR = "^.*$"

    def __init__(self, custom_error_dict=None, required=True):

        self.custom_error_dict = {}  # {‘required‘: ‘IP不能为空‘, ‘valid‘: ‘IP格式错误‘}
        if custom_error_dict:
            self.custom_error_dict.update(custom_error_dict)

        self.required = required

        super(StringField, self).__init__()

class IPField(Field):

    REGULAR = "^(25[0-5]|2[0-4]\d|[0-1]?\d?\d)(\.(25[0-5]|2[0-4]\d|[0-1]?\d?\d)){3}$"

    def __init__(self, custom_error_dict=None, required=True):

        self.custom_error_dict = {}  # {‘required‘: ‘IP不能为空‘, ‘valid‘: ‘IP格式错误‘}
        if custom_error_dict:
            self.custom_error_dict.update(custom_error_dict)

        self.required = required
        super(IPField, self).__init__()

class EmailField(Field):

    REGULAR = "^\w+([-+.‘]\w+)*@\w+([-.]\w+)*\.\w+([-.]\w+)*$"

    def __init__(self, custom_error_dict=None, required=True):

        self.custom_error_dict = {}  # {‘required‘: ‘IP不能为空‘, ‘valid‘: ‘IP格式错误‘}
        if custom_error_dict:
            self.custom_error_dict.update(custom_error_dict)

        self.required = required
        super(EmailField, self).__init__()

class IntegerField(Field):

    REGULAR = "^\d+$"

    def __init__(self, custom_error_dict=None, required=True):

        self.custom_error_dict = {}  # {‘required‘: ‘IP不能为空‘, ‘valid‘: ‘IP格式错误‘}
        if custom_error_dict:
            self.custom_error_dict.update(custom_error_dict)

        self.required = required
        super(IntegerField, self).__init__()

class CheckBoxField(Field):

    REGULAR = "^\d+$"

    def __init__(self, custom_error_dict=None, required=True):

        self.custom_error_dict = {}  # {‘required‘: ‘IP不能为空‘, ‘valid‘: ‘IP格式错误‘}
        if custom_error_dict:
            self.custom_error_dict.update(custom_error_dict)

        self.required = required
        super(CheckBoxField, self).__init__()

    def match(self, name, value):
        self.name = name

        if not self.required:
            self.is_valid = True
            self.value = value
        else:
            if not value:
                if self.custom_error_dict.get(‘required‘, None):
                    self.error = self.custom_error_dict[‘required‘]
                else:
                    self.error = "%s is required" % name
            else:
                if isinstance(name, list):
                    self.is_valid = True
                    self.value = value
                else:
                    if self.custom_error_dict.get(‘valid‘, None):
                        self.error = self.custom_error_dict[‘valid‘]
                    else:
                        self.error = "%s is invalid" % name

class FileField(Field):

    REGULAR = "^(\w+\.pdf)|(\w+\.mp3)|(\w+\.py)$"

    def __init__(self, custom_error_dict=None, required=True):

        self.custom_error_dict = {}  # {‘required‘: ‘IP不能为空‘, ‘valid‘: ‘IP格式错误‘}
        if custom_error_dict:
            self.custom_error_dict.update(custom_error_dict)

        self.required = required

        super(FileField, self).__init__()

    def match(self, name, file_name_list):
        flag = True
        self.name = name

        if not self.required:
            self.is_valid = True
            self.value = file_name_list
        else:
            if not file_name_list:
                if self.custom_error_dict.get(‘required‘, None):
                    self.error = self.custom_error_dict[‘required‘]
                else:
                    self.error = "%s is required" % name
                flag = False
            else:
                for file_name in file_name_list:
                    if not file_name or not file_name.strip():
                        if self.custom_error_dict.get(‘required‘, None):
                            self.error = self.custom_error_dict[‘required‘]
                        else:
                            self.error = "%s is required" % name
                        flag = False
                        break
                    else:
                        ret = re.match(self.REGULAR, file_name)
                        if not ret:
                            if self.custom_error_dict.get(‘valid‘, None):
                                self.error = self.custom_error_dict[‘valid‘]
                            else:
                                self.error = "%s is invalid" % name
                            flag = False
                            break

            self.is_valid = flag

    def save(self, request, upload_to=""):

        file_metas = request.files[self.name]
        for meta in file_metas:
            file_name = meta[‘filename‘]
            file_path_name = os.path.join(upload_to, file_name)
            with open(file_path_name, ‘wb‘) as up:
                up.write(meta[‘body‘])

        upload_file_path_list = map(lambda path: os.path.join(upload_to, path), self.value)
        self.value = list(upload_file_path_list)

在form.py这个文件,做了一件什么事呢?代码就定义了父类,主要是判断要验证内容的类型,然后取值,然后又调用了在fields.py里类的match方法,最后把验证后结果信息返回(vaild方法)。

  而在fields.py文件里,主要是对为空检测,合法性检测,并把检测结果返回给form.py的vaild方法里(match方法)

首先

  • form组件只做为空检测,合法性检测,并没做超时检测,内部可获取验证状态_valid_status--True/False,错误信息_error_dict,验证通过时的用户数据_value_dict

  • 验证类型:字符串,IP,邮箱,数字,复选框,文件

再者,怎么用?

  • 分析你的应用场景,需要对哪几个类型进行验证,定义一个类,把需要的验证类型写入到构造方法里,记得继承一下BaseForm类,并且继承一下父类的构造方法

  • 在构造方法里,实例Field对象时,可以传入自定制错误类型信息custom_error_dict,required=False可为空设置

    from backend.form.forms import BaseForm
    from backend.form.fields import StringField
    from backend.form.fields import IntegerField
    from backend.form.fields import EmailField
    
    class SendMsgForm(BaseForm):
    
        def __init__(self):
            self.email = EmailField(custom_error_dict={‘required‘: ‘注册邮箱不能为空.‘, ‘valid‘: ‘注册邮箱格式错误.‘})
    
            super(SendMsgForm, self).__init__()
    
    class RegisterForm(BaseForm):
    
        def __init__(self):
            self.username = StringField()
            self.email = EmailField()
            self.password = StringField()
            self.email_code = StringField()
    
            super(RegisterForm, self).__init__()
    
    class LoginForm(BaseForm):
    
        def __init__(self):
            self.user = StringField()
            self.pwd = StringField()
            self.code = StringField()
    
            super(LoginForm, self).__init__()

    最后

    • 在post方法里,调用一下form对象的vaild方法(把handler对象,也就是self传入),接下来只要根据form对象里检测完后的信息进行相应的操作

import io
import datetime
import json
from backend.utils import check_code
from backend.core.request_handler import BaseRequestHandler
from forms import account
from backend.utils.response import BaseResponse
from backend import commons
from models import chouti_orm as ORM
from sqlalchemy import and_, or_

class CheckCodeHandler(BaseRequestHandler):
    def get(self, *args, **kwargs):
        stream = io.BytesIO()
        img, code = check_code.create_validate_code()
        img.save(stream, "png")
        self.session["CheckCode"] = code
        self.write(stream.getvalue())

class LoginHandler(BaseRequestHandler):
    def post(self, *args, **kwargs):
        #对象里有self.status=False,self.data=None,self.summary=None,self.message={}
        rep = BaseResponse()
        form = account.LoginForm()
        if form.valid(self):
            if form._value_dict[‘code‘].lower() != self.session["CheckCode"].lower():
                rep.message = {‘code‘: ‘验证码错误‘}
                self.write(json.dumps(rep.__dict__))
                return
            conn = ORM.session()
            obj = conn.query(ORM.UserInfo).filter(
                or_(
                    and_(ORM.UserInfo.email == form._value_dict[‘user‘],
                         ORM.UserInfo.password == form._value_dict[‘pwd‘]),
                    and_(ORM.UserInfo.username == form._value_dict[‘user‘],
                         ORM.UserInfo.password == form._value_dict[‘pwd‘])
                )).first()
            if not obj:
                rep.message = {‘user‘: ‘用户名邮箱或密码错误‘}
                self.write(json.dumps(rep.__dict__))
                return

            self.session[‘is_login‘] = True
            self.session[‘user_info‘] = obj.__dict__
            rep.status = True
        else:
            rep.message = form._error_dict
        self.write(json.dumps(rep.__dict__))

class RegisterHandler(BaseRequestHandler):
    def post(self, *args, **kwargs):
        rep = BaseResponse()
        form = account.RegisterForm()
        if form.valid(self):
            current_date = datetime.datetime.now()
            limit_day = current_date - datetime.timedelta(minutes=1)
            conn = ORM.session()
            is_valid_code = conn.query(ORM.SendMsg).filter(ORM.SendMsg.email == form._value_dict[‘email‘],
                                                           ORM.SendMsg.code == form._value_dict[‘email_code‘],
                                                           ORM.SendMsg.ctime > limit_day).count()
            if not is_valid_code:
                rep.message[‘email_code‘] = ‘邮箱验证码不正确或过期‘
                self.write(json.dumps(rep.__dict__))
                return
            has_exists_email = conn.query(ORM.UserInfo).filter(ORM.UserInfo.email == form._value_dict[‘email‘]).count()
            if has_exists_email:
                rep.message[‘email‘] = ‘邮箱已经存在‘
                self.write(json.dumps(rep.__dict__))
                return
            has_exists_username = conn.query(ORM.UserInfo).filter(
                ORM.UserInfo.username == form._value_dict[‘username‘]).count()
            if has_exists_username:
                rep.message[‘email‘] = ‘用户名已经存在‘
                self.write(json.dumps(rep.__dict__))
                return
            form._value_dict[‘ctime‘] = current_date
            form._value_dict.pop(‘email_code‘)
            obj = ORM.UserInfo(**form._value_dict)
            conn.add(obj)
            conn.query(ORM.SendMsg).filter_by(email=form._value_dict[‘email‘]).delete()
            conn.commit()
            self.session[‘is_login‘] = True
            self.session[‘user_info‘] = obj.__dict__
            rep.status = True

        else:
            rep.message = form._error_dict

        self.write(json.dumps(rep.__dict__))

class SendMsgHandler(BaseRequestHandler):
    def post(self, *args, **kwargs):
        rep = BaseResponse()
        form = account.SendMsgForm()
        if form.valid(self):
            email = form._value_dict[‘email‘]
            conn = ORM.session()

            has_exists_email = conn.query(ORM.UserInfo).filter(ORM.UserInfo.email == form._value_dict[‘email‘]).count()
            if has_exists_email:
                rep.summary = "此邮箱已经被注册"
                self.write(json.dumps(rep.__dict__))
                return
            current_date = datetime.datetime.now()
            code = commons.random_code()

            count = conn.query(ORM.SendMsg).filter_by(**form._value_dict).count()
            if not count:
                insert = ORM.SendMsg(code=code,
                                     email=email,
                                     ctime=current_date)
                conn.add(insert)
                conn.commit()
                rep.status = True
            else:
                limit_day = current_date - datetime.timedelta(hours=1)
                times = conn.query(ORM.SendMsg).filter(ORM.SendMsg.email == email,
                                                       ORM.SendMsg.ctime > limit_day,
                                                       ORM.SendMsg.times >= 10,
                                                       ).count()
                if times:
                    rep.summary = "‘已经超过今日最大次数(1小时后重试)‘"
                else:
                    unfreeze = conn.query(ORM.SendMsg).filter(ORM.SendMsg.email == email,
                                                              ORM.SendMsg.ctime < limit_day).count()
                    if unfreeze:
                        conn.query(ORM.SendMsg).filter_by(email=email).update({"times": 0})

                    conn.query(ORM.SendMsg).filter_by(email=email).update({"times": ORM.SendMsg.times + 1,
                                                                           "code": code,
                                                                           "ctime": current_date},
                                                                          synchronize_session="evaluate")
                    conn.commit()
                    rep.status = True
        else:
            rep.summary = form._error_dict[‘email‘]
        self.write(json.dumps(rep.__dict__))

原文地址:https://www.cnblogs.com/zcok168/p/9797711.html

时间: 2024-11-07 17:11:31

抽屉之Tornado实战(7)--form表单验证的相关文章

看用Tornado如何自定义实现表单验证

我们知道,平时在登陆某个网站或软件时,网站对于你输入的内容是有要求的,并且会对你输入的错误内容有提示,对于Django这种大而全的web框架,是提供了form表单验证功能,但是对于Tornado而言,就没有这功能,所以就需要我们来自己自定义form表单验证,而且这种方法正是Django里的form表单验证的实质内容,也帮我们在后面学习Django理解相关的源码. 写之前,我们必须知道form表单验证的实质是什么? 实质就是正则匹配 我们知道用户提交数据是通过post方式提交,所以我们重写post

python_way day14 HTML-day5 (form表单验证,)

python-way day19 1. dJango的form表单验证 一,django表单验证功能 1.django验证基础: <!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <title>django form</title> </head> <body> <div> <i

用jquery写自己的form表单验证

这几天看了锋利的jquery,感觉很不错.特别是jquery强大的选择器.今天就利用jquery写了一个自己的form表单验证的小案例.当巩固下jquery.首先贴下代码,当然只是一个小案例. 思路:   1.<input type="text" Validate="Date" id="date"/>这里的 Validate:是我们需要验证的类型(属于日期类型),这里你也可以自己定义.id属性就不用说了.<input type=

django之form表单验证

django中的Form一般有两种功能: 输入html 验证用户输入 #!/usr/bin/env python # -*- coding:utf-8 -*- import re from django import forms from django.core.exceptions import ValidationError def mobile_validate(value): mobile_re = re.compile(r'^(13[0-9]|15[012356789]|17[678]

Python自动化运维系列之Django Form表单验证

Form表单验证 Django核心功能组件之一,虽然也可以在前端使用JS对表单验证, 但是Django中已经为我们准备好的Form功能还算是很强大的,有时候比较适合运维,为我们简化了很多前端开发工作. Django最主要的几个功能有4个     ·  生成HTML标签     ·  验证数据(表单错误信息提示)     ·  HTML 表单保留上次提交数据     ·  初始化页面表单内容 Django的Form内容还是挺多的,我们可以从一个简单的登陆验证例子来看看Form的基本功能使用 1)新

Django基础之Form表单验证

Form表单验证 1.创建Form类(本质就是正则表达式的集合) from django.forms import Form from django.forms import fields from django.forms import widgets from Mybbs.models import * import re class UserForm(Form): username = fields.CharField( required=True, error_messages={'re

Django中的Form表单验证

回忆一下Form表单验证的逻辑: 前端有若干个input输入框,将用户输入内容,以字典传递给后端. 后端预先存在一个Form表单验证的基类,封装了一个检测用户输入是否全部通过的方法.该方法会先定义好错误信息的字典,并会遍历类的所有属性(对应前端待验证的输入域),调用各自的验证方法,将错误信息(两类,必要与否以及格式正确与否)存入字典,并得出最终的验证结果.在使用时,需要定义继承自Form基类不同的Form类,以对应有着不同输入域的Form表单.在拿到前端给的字典前,要先初始化自定义From类,直

ant-pro使用Form表单验证上传图片出现的问题

1.复现:用antd的Form表单验证上传图片必填项时出现问题:复现过程,先提交,提示图片需要上传,上传成功后,依旧提示:图片未上传 2.表单验证原理:先理解一下antd的Form表单验证的表层原理,每个表单getFieldDecorator配置项都有个名字,比如就叫goodsSkuImg,这个goodsSkuImg对应this.props.form.goodsSkuImg如果为空则验证不通过. 3.结论:Form提示的根本原因是this.props.form.goodsSkuImg值为空 4.

element-ui Form表单验证

element-ui Form表单验证规则全解 element的form表单非常好用,自带了验证规则,用起来很方便,官网给的案例对于一些普通场景完全没问题,不过一些复杂场景的验证还得自己多看文档摸索,自己经过数次爬坑 之后,总结了几种form表单的验证规则,为了便于阅读,验证规则是拆分的,完整的代码放在文末 1. 普通输入验证 <el-form-item label="活动名称" prop="name"> <!-- validate-event属性