python做一个http接口测试框架

目录结构

project

  

  case#测试用例

    

    suite#测试目录

  

  logs#测试日志

  

  papi#测试类

  

  result#测试结果

  

  setting.py#配置文件

1、日志类,用于测试时日志记录

pyapilog.py

  

 1 # -*-coding:utf-8 -*-
 2 # !/usr/bin/python
 3 __author__ = ‘dongjie‘
 4 __data__ = ‘2015-05-20‘
 5
 6 import logging
 7 import datetime
 8 import os
 9 import setting
10 logLevel = {
11    1 : logging.NOTSET,
12    2 : logging.DEBUG,
13    3 : logging.INFO,
14    4 : logging.WARNING,
15    5 : logging.ERROR,
16    6 : logging.CRITICAL
17 }
18 setFile = os.path.join(setting.root_dir, ‘setting.ini‘)
19 loggers = {}
20
21
22 #  定义日志方法,从配置文件读取日志等级,且定义日志输出路径
23 def pyapilog(**kwargs):
24     global loggers
25     log_level = setting.logLevel
26     log_path = setting.logFile
27     if os.path.exists(log_path):
28         log_file = os.path.join(log_path, datetime.datetime.now().strftime(‘%Y-%m-%d‘) + ‘.log‘)
29     else:
30         os.mkdir(r‘%s‘ % log_path)
31         log_file = os.path.join(log_path, datetime.datetime.now().strftime(‘%Y-%m-%d‘) + ‘.log‘)
32     logger = logging.getLogger()
33     logger.setLevel(logLevel[log_level])
34     if not logger.handlers:
35         # 创建一个handler,用于写入日志文件
36         fh = logging.FileHandler(log_file)
37         fh.setLevel(logLevel[log_level])
38         # 再创建一个handler,用于输出到控制台
39         ch = logging.StreamHandler()
40         ch.setLevel(logging.ERROR)
41         # 定义handler的输出格式
42         formatter = logging.Formatter(‘%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s‘)
43         fh.setFormatter(formatter)
44         ch.setFormatter(formatter)
45         # 给logger添加handler
46         logger.addHandler(fh)
47         logger.addHandler(ch)
48         loggers.update(dict(name=logger))
49     return  logger

2、http测试类

httprequest.py

  

# -*-coding:utf-8 -*-
# !/usr/bin/python
__author__ = ‘dongjie‘
__data__ = ‘2015-05-20‘

from pyapilog import pyapilog
import requests
import json
import urllib

class SendHttpRequest(object):
    def __init__(self, url):
        self.url = url
    # post request

    def post(self, value=None):
        params = urllib.urlencode(value)
        try:
            req = requests.post(self.url + "?%s" % params)
        except Exception, err:
            print err
        if req.status_code == 200:
            pyapilog().info(u"发送post请求: %s  服务器返回:  %s" % (req.url, req.status_code))
        else:
            pyapilog().error(u"发送post请求: %s   服务器返回:  %s\n error info: %s " % (req.url, req.status_code, req.text))
        return req.text

    def post_json(self, value):
        head = {‘content-type‘: ‘application/json‘}
        try:
            req = requests.post(self.url, data=json.dumps(value), headers=head)
            print req.url
        except Exception, err:
            print err
        if req.status_code == 200:
            pyapilog().info(u"发送post请求: %s  服务器返回:  %s" % (req.url, req.status_code))
            return req.text
        else:
            pyapilog().error(u"发送post请求: %s   服务器返回:  %s\n error info: %s " % (req.url, req.status_code, req.text))

    def get(self, value=None):
        try:
            req = requests.get(self.url, params=value)
        except Exception, err:
            print err
        if req.status_code == 200:
            pyapilog().info(u"发送get请求: %s   服务器返回:  %s" % (req.url, req.status_code))
        else:
            pyapilog().error(u"发送get请求: %s   服务器返回:  %s\n error info: %s " % (req.url, req.status_code, req.text))
        return req.text

3、数据库操作类

databasedriver.py# -*-coding:utf-8 -*# !/usr/bin/python

  

__author__ = ‘dongjie‘
__data__ = ‘2015-05-21‘
import pymssql
import MySQLdb
import setting
from pyapilog import pyapilog

class sqldriver(object):
    def __init__(self, host, port, user, password, database):
        self.host = host
        self.port = port
        self.user = user
        self.password = password
        self.database = database

    # 执行SQLserver查询
    def exec_mssql(self, sql):
        try:
            conn = pymssql.connect(host=self.host,
                                   port=self.port,
                                   user=self.user,
                                   password=self.password,
                                   database=self.database,
                                   charset="utf8")
            cur = conn.cursor()
            if cur:
                pyapilog().info(u"执行SQL语句%s" % sql)
                cur.execute(sql)
                rows = cur.fetchall()
                if len(rows) == 0:
                    pyapilog().warning(u"没有查询到数据")
                return rows
            else:
                pyapilog().error(u"数据库连接不成功")
            conn.close()
        except Exception, e:
            pyapilog().error(e)

    # 执行Mysql查询
    def exec_mysql(self, sql):
        try:
            conn = MySQLdb.connect(host=self.host,
                                   port=self.port,
                                   user=self.user,
                                   passwd=self.password,
                                   db=self.database,
                                   )
            cur = conn.cursor()
            if cur:
                pyapilog().info(u"执行SQL语句%s" % sql)
                resList = cur.execute(sql)
                return resList
        except Exception, e:
            pyapilog().error(e)

# 执行sql语句返回结果
def execsql(sql):
    config = setting.DATABASE
    driver = config.get("ENGINE")
    host = config.get("HOST")
    port = config.get("PORT")
    user = config.get("USER")
    password = config.get("PWD")
    database = config.get("DATABASE")
    if driver == "MYSQL":
        try:
            sql_result = sqldriver(
                host=host,
                port=port,
                user=user,
                password=password,
                database=database
            ).exec_mysql(sql)
            return sql_result
        except Exception, e:
            pyapilog().error(e)

    elif driver == "MSSQL":
        try:
            sql_result = sqldriver(
                host=host,
                port=port,
                user=user,
                password=password,
                database=database
            ).exec_mssql(sql)
            return sql_result
        except Exception, e:
            pyapilog().error(e)
else:
        pyapilog().error(u"[%s]数据库配置支持MYSQL、MSSQL、ORACLE" % driver)

4、解析json字符串

dataprase.py

  

# -*-coding:utf-8 -*-
# !/usr/bin/python
__author__ = ‘dongjie‘
__data__ = ‘2015-05-21‘
import json
import xmltodict
from pyapilog import pyapilog

# 解析json字符串
class jsonprase(object):
    def __init__(self, json_value):
        try:
            self.json_value = json.loads(json_value)
        except ValueError, e:
            pyapilog().error(e)

    def find_json_node_by_xpath(self, xpath):
        elem = self.json_value
        nodes = xpath.strip("/").split("/")
        for x in range(len(nodes)):
            try:
                elem = elem.get(nodes[x])
            except AttributeError:
                elem = [y.get(nodes[x]) for y in elem]
        return elem

    def datalength(self, xpath="/"):
        return len(self.find_json_node_by_xpath(xpath))

    @property
    def json_to_xml(self):
        try:
            root = {"root": self.json_value}
            xml = xmltodict.unparse(root, pretty=True)
        except ArithmeticError, e:
            pyapilog().error(e)
        return xml

# 解析xml字符串
class xmlprase(object):
    def __init__(self, xml_value):
        self.xml_str = xml_value

    @property
    def xml_to_json(self):
        try:
            xml_dic = xmltodict.parse(self.xml_str,
                                      encoding="utf-8",
                                      process_namespaces=True,
                                      )
            json_str = json.dumps(xml_dic)
        except Exception, e:
            print e
        return json_str

5、还有配置文件差点忘记说了

# -*-coding:utf-8 -*-
# !/usr/bin/python
__author__ = ‘dongjie‘
__data__ = ‘2015-05-20‘

‘‘‘
    配置系统相关的参数,提供全局的相关配置
‘‘‘
import os
import sys
root_dir = ‘/‘.join(os.path.realpath(__file__).split(‘/‘)[:-1])
sys.path.append(root_dir)
# log等级,1:notset 2:debug  3:info 4:warning 5:error 6:critical
logLevel = 2
# 日志文件路径
logFile = os.path.join(root_dir, ‘logs‘)

# 数据库配置,支持MYSQL、MSSQL、ORACLE
DATABASE = {
    "ENGINE": "MSSQL",
    "HOST": "",
    "PORT": 3433,
    "USER": "",
    "PWD": "",
    "DATABASE": ""
}

  6、最后看看我们的测试用例吧,当然是数据驱动了

# -*-coding:utf-8 -*-
from ddt import ddt, data, unpack
import unittest
from papi.httprequest import SendHttpRequest
from papi.dataparse import jsonprase, xmlprase

@ddt
class TestSingleRequest(unittest.TestCase):
    def setUp(self):
        self.url = "http://xxxxxxxxxxxxxxxxxxx/api/xxxxxx"
    @data(
        (32351, 6),
        (9, 4)
    )
    @unpack
    def test_Single_right(self, sid, count):
        value = {"sid": sid, "count": count}
        data = SendHttpRequest(self.url).get(value)
        json_data = jsonprase(data)
        point_lat = json_data.find_json_node_by_xpath("/Point/Lat")
        point_lng = json_data.find_json_node_by_xpath("/Point/Lng")
        is_exists_map = json_data.find_json_node_by_xpath("/Ptd/AmapGuideMap155/IsExistsMap")
        size = json_data.find_json_node_by_xpath("/Ptd/AmapGuideMap155/Size")
        # 断言
        assert float(point_lat) != 0 and float(point_lng) != 0
        # 断言
        assert json_data.find_json_node_by_xpath("/Ptd/AmapGuideMap155/DownUrl") is not None
        if is_exists_map == True:
            assert size != ""

    # 导常请求SingleRequest接口
    @data(
        ("abceeffffg", 6),
        (9, "")
    )
    @unpack
    def test_Single_error(self, sid, count):
        value = {"sid": sid, "count": count}
        data = SendHttpRequest(self.url).get(value)
        self.assertEqual(data, u‘{"Message":"请求无效。"}‘)

@ddt
class TourMaps(unittest.TestCase):
    def setUp(self):
        self.url = "http://xxxxxx/api/TourMap"

    @data(32351, 9)
    def test_requests_online_xml(self, tourId):
        xml_url = self.url + "/%s" % tourId
        data = SendHttpRequest(xml_url).get()
        json_st = xmlprase(data).xml_to_json
        json_data = jsonprase(json_st)
        lng = json_data.find_json_node_by_xpath("/root/data/@lng")
        lat = json_data.find_json_node_by_xpath("/root/data/@lat")
        assert lng != "" and lat != ""
        son_tour = json_data.find_json_node_by_xpath("/root/data/data")
        assert len(son_tour) > 0

class TourData(unittest.TestCase):
    def setUp(self):
        self.url = "http://xxxxxx/api/xxx"

    @data(
        (),
        (),
        (),
    )
    @unpack
    def test_tourList_Location_in_open(self):
        pass

    def test_tourList_Location_not_open(self):
        pass

    def test_tour_open_city(self):
        pass

if __name__ == "__main__":
    suite = unittest.TestLoader().loadTestsFromTestCase(TourMaps, TestSingleRequest)
    unittest.TextTestRunner(verbosity=2).run(suite)

关于python ddt查以参考https://ddt.readthedocs.org/en/latest/example.html

测试结果生成,可以查看python nose相关文档,生成hmtl

时间: 2024-08-24 19:19:13

python做一个http接口测试框架的相关文章

用Python做一个知乎沙雕问题总结

用Python做一个知乎沙雕问题总结 松鼠爱吃饼干2020-04-01 13:40 前言 本文的文字及图片来源于网络,仅供学习.交流使用,不具有任何商业用途,版权归原作者所有,如有问题请及时联系我们以作处理. 作者: 数据森麟 PS:如有需要Python学习资料的小伙伴可以加点击下方链接自行获取http://t.cn/A6Zvjdun 这两天偶然上网的时候,被知乎上一个名为“玉皇大帝住在平流层还是对流层”的问题吸引,本以为只是小打小闹,殊不知这个问题却在知乎上引发了强烈共鸣,浏览次数500W+,

用python做一个搜索引擎(Pylucene)

什么是搜索引擎? 搜索引擎是"对网络信息资源进行搜集整理并提供信息查询服务的系统,包括信息搜集.信息整理和用户查询三部分".如图1是搜索引擎的一般结构,信息搜集模块从网络采集信息到网络信息库之中(一般使用爬虫):然后信息整理模块对采集的信息进行分词.去停用词.赋权重等操作后建立索引表(一般是倒排索引)构成索引库:最后用户查询模块就可以识别用户的检索需求并提供检索服务啦. 图1  搜索引擎的一般结构 2.  使用python实现一个简单搜索引擎 2.1  问题分析 从图1看,一个完整的搜

selenium+python做web端自动化测试框架与实例详解教程

最近受到万点暴击,由于公司业务出现问题,工作任务没那么繁重,有时间摸索selenium+python自动化测试,结合网上查到的资料自己编写出适合web自动化测试的框架,由于本人也是刚刚开始学习python,这套自动化框架目前已经基本完成了所以总结下编写的得失,便于以后回顾温习,有许多不足的的地方,也遇到了各种奇葩问题,希望大神们多多指教. 首先我们要了解什么是自动化测试,简单的说编写代码.脚本,让软件自动运行,发现缺陷,代替部分的手工测试.了解了自动化测试后,我们要清楚一个框架需要分那些模块:

用python做一个简单的pong游戏

pong游戏就是一个用挡板去控制一个小球不触底的一个小游戏,上个世纪以电视游戏的方式发行,取得巨大的成功. 看了一点书,知道pygame是python里一个强大的模块,做出这个游戏的简易模式也不难. 主要思想:1.创建游戏界面,挡板,小球以及记分牌. 2.小球碰到游戏界面四个边界会反弹,即x方向和y方向上的速度会改变为负,碰到底边生命数会减1. 3.小球与挡板碰撞y方向速度会变负,同时分数加1. 4.游戏结束会显示相关文字. 代码如下: import pygame pygame.init() s

Python做一个简单的web服务器,外接一个支持wsgi协议框架显示动态数据

import socket import re import sys import mini_frame # 通过外部传端口号给套接字 # tcp_port = sys.argv[1] class Mini_Wsgi(object): def __init__(self): self.tcp_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.tcp_socket.setsockopt(socket.SOL_SOCKET

[Python] 用python做一个游戏辅助脚本,完整思路

一.说明 简述:本文将以4399小游戏<宠物连连看经典版2>作为测试案例,通过识别小图标,模拟鼠标点击,快速完成配对.对于有兴趣学习游戏脚本的同学有一定的帮助. 运行环境:Win10/Python3.5. 主要模块:win32gui(识别窗口.窗口置顶等操作).PIL(屏幕截图).numpy(创建矩阵).operator(比较值).pymouse(模拟鼠标点击). 注意点: 1.如果安装pymouse不成功或者运行报错,可以考虑先通过whl 安装pyHook.然后再通过pip安装pyuseri

绕过010Editor网络验证(用python做一个仿真http server真容易,就几行代码)

010Editor是一款非常强大的十六进制编辑器,尤其是它的模板功能在分析文件格式时相当好用!网上现在也有不少010Editor的破解版,如果没钱或者舍不得花钱买授权的话,去官方下载安装包再使用注册机算号是一个比较安全的选择.不过010Editor是有网络验证功能的,可以在本地架一个HTTP服务器来绕过这个验证(网上也能找到通过修改注册表绕过的方法,没有验证).使用Python的BaseHTTPServer模块就可以实现这个功能(继承BaseHTTPRequestHandler并重写do_GET

[python]做一个简单爬虫

为什么选择python,它强大的库可以让你专注在爬虫这一件事上而不是更底层的更繁杂的事 爬虫说简单很简单,说麻烦也很麻烦,完全取决于你的需求是什么以及你爬的网站所决定的,遇到的第一个简单的例子是paste.ubuntu.com 这是一个贴代码的网站,没事喜欢看看有没有什么好玩的东西,只是上面大部分都是minecraft的东西,于是写了以下代码 1 import urllib2 2 import socket 3 import re 4 def getData(url, timeOut = 10)

[转]用Python做一个自动生成读表代码的小脚本

写在开始(本片文章不是写给小白的,至少你应该知道一些常识!) 大家在Unity开发中,肯定会把一些数据放到配置文件中,尤其是大一点的项目,每次开发一个新功能的时候,都要重复的写那些读表代码.非常烦.来个实用小工具,大家随便看看. 1 #-*- coding: utf-8 -*- 2 #----------------------------------------------------------# 3 # 版本:python-3.5.0a3-amd64 4 # 功能:生成读表代码文件 5 #