阅读sqlmap源代码,编写burpsuite插件--sqlmapapi

burpsuite插件编写---sql injection

0x00 概要

在安全测试过程中,大部分人会使用burpsuite的scanner模块进行测试,可以发现一些浅显的漏洞:比如xss、sql injection、c***f、xxe、Arbitrary file existence disclosure in Act、明文传输等。
说到sql injection,测试人员都会有一种想法是否存在一款自动化工具,可以将某一网站的所有链接都去尝试一边,尽可能的发现所有的sql injection。有了这种想法后大家会去想解决方案,有一种解决方案是编写burpsuite插件。

0x01 为什么sqlmapapi只能检测get请求是否存在注入?

编写burpsuite插件检查sql injection,网上已经存在了很多代码。

如图中代码一样,网上所有的资料在调用sqlmapapi进行扫描时只能扫描get请求的链接,比如:http://www.xxx.com/index.php?id=1 对post提交的参数无法进行扫描,开始我一直很迷惑到底是sqlmap没有提供该方法还是前辈们没有写,所以我就查看了一下sqlmap的源代码。
调用sqlmapapi的基本过程:

接下来我们看看为什么sqlmapapi不能进行post扫描:
1、首先查看sqlmapapi.py文件:

    # Start the client or the server
    if args.server is True:
        server(args.host, args.port, adapter=args.adapter, username=args.username, password=args.password)
    elif args.client is True:
        client(args.host, args.port, username=args.username, password=args.password)
    else:
        apiparser.print_help()

sqlmapapi开启服务后,我们在插件中的请求就是一个客户端,所以我们需要关注 client(args.host, args.port, username=args.username, password=args.password)

2、cilent函数所在文件:

3、由于在api.py中存在大量代码,逐行查找很费劲,我们直接搜索def client

4、分析cilent函数

def client(host=RESTAPI_DEFAULT_ADDRESS, port=RESTAPI_DEFAULT_PORT, username=None, password=None):
    """
    REST-JSON API client
    """

    DataStore.username = username
    DataStore.password = password

    dbgMsg = "Example client access from command line:"
    dbgMsg += "\n\t$ taskid=$(curl http://%s:%d/task/new 2>1 | grep -o -I ‘[a-f0-9]\{16\}‘) && echo $taskid" % (host, port)
    dbgMsg += "\n\t$ curl -H \"Content-Type: application/json\" -X POST -d ‘{\"url\": \"http://testphp.vulnweb.com/artists.php?artist=1\"}‘ http://%s:%d/scan/$taskid/start" % (host, port)
    dbgMsg += "\n\t$ curl http://%s:%d/scan/$taskid/data" % (host, port)
    dbgMsg += "\n\t$ curl http://%s:%d/scan/$taskid/log" % (host, port)
    logger.debug(dbgMsg)

    addr = "http://%s:%d" % (host, port)
    logger.info("Starting REST-JSON API client to ‘%s‘..." % addr)

    try:
        _client(addr)
    except Exception, ex:
        if not isinstance(ex, urllib2.HTTPError) or ex.code == httplib.UNAUTHORIZED:
            errMsg = "There has been a problem while connecting to the "
            errMsg += "REST-JSON API server at ‘%s‘ " % addr
            errMsg += "(%s)" % ex
            logger.critical(errMsg)
            return

我们查看的重点代码是

    dbgMsg = "Example client access from command line:"
    dbgMsg += "\n\t$ taskid=$(curl http://%s:%d/task/new 2>1 | grep -o -I ‘[a-f0-9]\{16\}‘) && echo $taskid" % (host, port)
    dbgMsg += "\n\t$ curl -H \"Content-Type: application/json\" -X POST -d ‘{\"url\": \"http://testphp.vulnweb.com/artists.php?artist=1\"}‘ http://%s:%d/scan/$taskid/start" % (host, port)
    dbgMsg += "\n\t$ curl http://%s:%d/scan/$taskid/data" % (host, port)
    dbgMsg += "\n\t$ curl http://%s:%d/scan/$taskid/log" % (host, port)
    logger.debug(dbgMsg)

在 dbgMsg += "\n\t$ curl -H \"Content-Type: application/json\" -X POST -d ‘{\"url\": \"http://testphp.vulnweb.com/artists.php?artist=1\"}‘ http://%s:%d/scan/$taskid/start" % (host, port)这一行代码表示开启一个任务扫描,Content-Type: application/json为请求头的部分;http://%s:%d/scan/$taskid/start 开启的具体那个任务;-X POST -d ‘{\"url\": \"http://testphp.vulnweb.com/artists.php?artist=1\"}‘ post请求参数,其中只有url,没有关于post请求的data参数,因此sqlmapapi只能进行get请求的sql injection 检测。

0x02继承IHttpListener接口(方法一),存在缺陷

先上源代码:

from burp import IBurpExtender
from burp import IHttpListener
from java.io import PrintWriter
import re
import urllib
import urllib2
import time
import json
from threading import Thread
import requests

class BurpExtender(IBurpExtender, IHttpListener):

    #
    #implement IBurpExtender
    #
    def    registerExtenderCallbacks(self, callbacks):
        # keep a reference to our callbacks object
        self._callbacks = callbacks

        # set our extension name
        callbacks.setExtensionName("fanyingjie")

        # obtain our output stream
        self._stdout = PrintWriter(callbacks.getStdout(), True)

        self._helpers  = callbacks.getHelpers()

        # register ourselves as an
        callbacks.registerHttpListener(self)

    def processHttpMessage(self,toolFlag,messageIsRequest, messageInfo):
        if(messageIsRequest):
            a=self._helpers.analyzeRequest(messageInfo)
            method=a.getMethod()
            url=str(a.getUrl())
            if(("?" in url) and (method=="GET")):
                self._stdout.println("start")
                t=AutoSqli(target=url,stdout=self._stdout,method=method)
                t.run()

class AutoSqli(Thread):
    def __init__(self,target,stdout,method):
        self.server="http://192.168.159.134:8775"
        self.taskid = ‘‘
        self.target=target
        self.method=method
        self._stdout=stdout
        self.start_time = time.time()

    def task_new(self):
        self.taskid = json.loads(urllib2.urlopen(self.server + ‘/task/new‘).read())[‘taskid‘]
        self._stdout.println(‘Created new task: ‘ + self.taskid )
        if len(self.taskid) > 0:
            return True
        return False

    def task_delete(self):
        if json.loads(urllib2.urlopen(self.server + ‘/task/‘ + self.taskid + ‘/delete‘).read())[‘success‘]:
            self._stdout.println(‘[%s] Deleted task‘ % (self.taskid))
            return True
        return False

    def scan_start(self):
        headers = {‘Content-Type‘: ‘application/json‘}
        payload = {‘url‘:self.target}
        url = self.server + ‘/scan/‘ + self.taskid + ‘/start‘
        #t = json.loads(requests.post(url, data=json.dumps(payload), headers=headers).text)

        req=urllib2.Request(url,data=json.dumps(payload),headers=headers)
        t=json.loads(urllib2.urlopen(req).read())
        self._stdout.println("start "+ self.taskid)

        if len(str(t[‘engineid‘])) > 0 and t[‘success‘]:
            return True
        return False

    def scan_status(self):
        status = json.loads(urllib2.urlopen(self.server + ‘/scan/‘ + self.taskid + ‘/status‘).read())[‘status‘]
        if status == ‘running‘:
            return ‘running‘
        if status == ‘terminated‘:
            return ‘terminated‘
        return "error"

    def scan_data(self):

        data = json.loads(urllib2.urlopen(self.server + ‘/scan/‘ + self.taskid + ‘/data‘).read())[‘data‘]
        if len(data) == 0:
            self._stdout.println(‘not injection:\t‘ + self.target)
            return False
        else:
            self._stdout.println(‘injection:\t‘ + self.target)
            return True

    def scan_kill(self):
        json.loads(rurllib2.urlopen(self.server + ‘/scan/‘ + self.taskid + ‘/kill‘).read())[‘success‘]
        self._stdout.println("%s kill")%(self.taskid)

    def scan_stop(self):
        json.loads(urllib2.urlopen(self.server + ‘/scan/‘ + self.taskid + ‘/stop‘).read())[‘success‘]
        self._stdout.println("%s stop")%(self.taskid)

    def run(self):
        try:
            if not self.task_new():
                return False
            if not self.scan_start():
                return False
            while True:
                if self.scan_status() == ‘running‘:
                    time.sleep(10)
                elif self.scan_status() == ‘terminated‘:
                    break
                else:
                    break
                #print self.target + ":\t" + str(time.time() - self.start_time)
                if time.time() - self.start_time > 500:
                    self.scan_stop()
                    self.scan_kill()
                    break
            self.scan_data()
            #self.task_delete()

        except Exception as e:
            pass

在本插件中继承了IHttpListener接口,继承该接口后,每点击一次链接就会执行一次插件,只有当插件完成执行完成后,才能进行下一步:

访问链接后,浏览器一直等待回应,继承IHttpListener接口的方法很影响测试效率,不过优点是可以检查出存在注入的链接
接下来对该代码进行简单的讲解一下:

        if(messageIsRequest):  #当包是请求包时执行sql injection检查
            a=self._helpers.analyzeRequest(messageInfo)  #这是burp提供的一个函数,可以从请求包中获取到url method header等
            method=a.getMethod()
            url=str(a.getUrl())
            if(("?" in url) and (method=="GET")): #当请求是get请求和链接中存在参数时进行sql injection 检查
                self._stdout.println("start")
                t=AutoSqli(target=url,stdout=self._stdout,method=method)
                t.run()
class AutoSqli(Thread):
    def __init__(self,target,stdout,method):
        self.server="http://192.168.159.134:8775"#开启sqlmapapi服务的ip
        self.taskid = ‘‘
        self.target=target
        self.method=method
        self._stdout=stdout
        self.start_time = time.time()

    def task_new(self):#创建一个新的任务,并获取taskid
        self.taskid = json.loads(urllib2.urlopen(self.server + ‘/task/new‘).read())[‘taskid‘]
        self._stdout.println(‘Created new task: ‘ + self.taskid )
        if len(self.taskid) > 0:
            return True
        return False

    def task_delete(self):#通过taskid删除某一个任务
        if json.loads(urllib2.urlopen(self.server + ‘/task/‘ + self.taskid + ‘/delete‘).read())[‘success‘]:
            self._stdout.println(‘[%s] Deleted task‘ % (self.taskid))
            return True
        return False

    def scan_start(self):#开始一个扫描,传入需要扫描的地址
        headers = {‘Content-Type‘: ‘application/json‘}
        payload = {‘url‘:self.target}
        url = self.server + ‘/scan/‘ + self.taskid + ‘/start‘
        #t = json.loads(requests.post(url, data=json.dumps(payload), headers=headers).text)

        req=urllib2.Request(url,data=json.dumps(payload),headers=headers)
        t=json.loads(urllib2.urlopen(req).read())
        self._stdout.println("start "+ self.taskid)

        if len(str(t[‘engineid‘])) > 0 and t[‘success‘]:
            return True
        return False

    def scan_status(self):#查看是否扫描完成,通过status判断,terminated是扫描完成
        status = json.loads(urllib2.urlopen(self.server + ‘/scan/‘ + self.taskid + ‘/status‘).read())[‘status‘]
        if status == ‘running‘:
            return ‘running‘
        if status == ‘terminated‘:
            return ‘terminated‘
        return "error"

    def scan_data(self):#获取扫描完成的结果,如果data有值表名存在注入,否则不存在

        data = json.loads(urllib2.urlopen(self.server + ‘/scan/‘ + self.taskid + ‘/data‘).read())[‘data‘]
        if len(data) == 0:
            self._stdout.println(‘not injection:\t‘ + self.target)
            return False
        else:
            self._stdout.println(‘injection:\t‘ + self.target)
            return True

    def scan_kill(self):
        json.loads(rurllib2.urlopen(self.server + ‘/scan/‘ + self.taskid + ‘/kill‘).read())[‘success‘]
        self._stdout.println("%s kill")%(self.taskid)

    def scan_stop(self):
        json.loads(urllib2.urlopen(self.server + ‘/scan/‘ + self.taskid + ‘/stop‘).read())[‘success‘]
        self._stdout.println("%s stop")%(self.taskid)

    def run(self):
        try:
            if not self.task_new():
                return False
            if not self.scan_start():
                return False
            while True:
                if self.scan_status() == ‘running‘:
                    time.sleep(10)
                elif self.scan_status() == ‘terminated‘:
                    break
                else:
                    break
                #print self.target + ":\t" + str(time.time() - self.start_time)
                if time.time() - self.start_time > 500:
                    self.scan_stop()
                    self.scan_kill()
                    break
            self.scan_data()
            #self.task_delete()

        except Exception as e:
            pass

在下一篇文章中,我会记录一下第二种方法,可以在不影响测试效率的情况下,依然可以有效的检查出注入;在设置一些sqlmapapi参数的前提下,比较一下sqlmapapi和burpsuite scanner模块那个更有效率。

原文地址:http://blog.51cto.com/13770310/2128197

时间: 2024-11-12 20:12:48

阅读sqlmap源代码,编写burpsuite插件--sqlmapapi的相关文章

打造阅读Linux源代码利器

打造阅读Linux源代码利器 在Linux里阅读/编写代码一般用vi 但是碰到较大的项目时阅读源代码还是比较费力,一直用find  和 grep命令. 其实,我们自己可以打造一个阅读源代码的vim,这样大大提高工作效率. 阅读源码利器组合:Vi + Ctags + taglist 以下为最终实现的效果图: 安装Ctags Ctags可以到http://ctags.sourceforge.net下载.目前我下载的是ctags-5.8.tar.gz tar  -xvzf ctags-5.8.tar.

如何高效的阅读hadoop源代码?

个人谈谈阅读hadoop源代码的经验. 首先,不得不说,hadoop发展到现在这个阶段,代码已经变得非常庞大臃肿,如果你直接阅读最新版本的源代码,难度比较大,需要足够的耐心和时间,所以,如果你觉得认真一次,认真阅读一次hadoop源代码,一定要有足够的心理准备和时间预期. 其次,需要注意,阅读Hadoop源代码的效率,因人而异,如果你有足够的分布式系统知识储备,看过类似的系统,则能够很快地读它的源代码进行通读,并快速切入你最关注的局部细节,比如你之前看过某个分布式数据库的源代码,对分布式系统的网

如何阅读项目源代码?

转自:http://blog.csdn.net/ilyfeng1314/article/details/7452326 为何要阅读源代码? 将代码作为参考手册 将优秀代码作为范例学习 维护现有的代码 改进现有的代码 重用现有的代码 对代码进行审查 网上搜到的一篇介绍阅读别人源代码的方法,放在这里,有空可以查看. 阅读别人的代码作为开发人员是一件经常要做的事情.一个是学习新的编程语言的时候通过阅读别人的代码是一个最好的学习方法,另外是积累编程经验.如果你有机会阅读一些操作系统的代码会帮助你理解一些

第七章(插件的使用和写法)(7.6 编写 jQuery 插件)

7.6.1 插件的种类 编写插件的目的是给已经有的一系列方法或函数做一个封装,以便在其他地方重复使用,方便后期维护和提高开发效率. jQuery 的插件主要分为3种类型. 1 封装对象方法的插件 这种插件是将对象方法封装起来,用于对通过选择器获取的 jQuery 对象进行操作,是最常见的一种插件. 据不完全统计,95%以上的 jQuery 插件都是封装对象方法的插件,此类插件可以发挥出 jQuery 选择器的强大优势.有相当一部分的 jQuery 的方法,都是在 jQuery 脚本库内部通过这种

sqlmap批量扫描burpsuite拦截的日志记录

1.功能上,sqlmap具备对burpsuite拦截的request日志进行批量扫描的能力 python sqlmap.py -l hermes.log --batch -v 3 --batch:会自动选择yes -l : 指定日志文件 -v: 调试信息等级,3级的话,可以看大注入的payload信息 2.如何配置burpsuite拦截扫描对象? 4.其他方式拦截的request日志文件,也可以 5.使用--smart智能探测效果不是很好,还是采用一些配置,加深扫描比较好 python sqlm

编写jQuery插件

编写jQuery插件 在园子里有很多关于jQuery插件的文章,尤其 以下2篇文章: 不定义JQuery插件,不要说会JQuery jQuery插件开发精品教程,让你的jQuery提升一个台阶 这2位大神基础讲的很清楚,在这里就不多说了,主要那个小需求来练练: 需求说明:一个标题插件,可以通过后端取数,自定义标题,自定义样式 讨论:插件通常不都是加载一下就不操作了,比如表格插件,加载数据,刷新等等. 今天练习的控件就简单给大家理理写控件的思路,(有问题,有意见大家指出.) ; (function

nopcommerce商城系统--如何编写一个插件

原址:http://www.nopcommerce.com/docs/77/how-to-write-a-nopcommerce-plugin.aspx plug-in (或 plugin)是一个为更大的软件应用程序添加特定的能力的组件(Wikipedia) 插件是用来扩展nopCommerce功能的.nopCommerce拥有多种类型的插件.例如:支付方式(PayPal),税务机构,送货方式计算方法(UPS, USP, FedEx),小部件(如"在线聊天"块)等等. nopComme

自己编写jQuery插件 之 菜单折叠

菜单折叠这个功能很简单,很多人都有写过,只因它在项目中使用实在是太频繁了.代码就那么几行,没什么讲的,这里只是将其封装成插件而已. Html代码如下: <div class="box"> <p>菜单一</p> <ul> <li><a>1111</a></li> <li><a>1111</a></li> <li><a>11

自己编写jQuery插件 之 表单验证

吐个嘈先:最近状态不咋滴,真是什么都不想干,不想上班,做什么都没动力,觉得没意思.不想这样,不想这样,快让这种情绪消失吧,忽忽.... 表单验证在项目中用的还是比较多的,公司当前正在做的项目就要用到,故此写了此插件,先给大家看下在项目中应用的效果图吧: 直接上插件实现代码了,围绕代码进行讲解比较容易点: /* 描述:基于jquery的表单验证插件. 时间:2014-8-3 作者:similar([email protected]) */ (function ($) { $.fn.checkFor