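In February 2019, while writing the article "挖掘暗藏ThinkPHP中的反序列利用链" (on uncovering the deserialization gadget chains hidden in ThinkPHP) and hunting for PHP deserialization POP chains, I kept wondering whether this kind of pure manual labor could be modernized, rather than mechanically repeating Ctrl+Shift+F. At the time I looked into several related projects and papers, including but not limited to Navex, Prvd, Cobra, and CodeQL. Cobra is open source and relatively simple, so later, when vulnerabilities in a certain well-known OA product started breaking out, I used Cobra's PHP parser as a reference and tried to build a tool that hunts for vulnerabilities by traversing the Java AST (abstract syntax tree). The results were surprisingly good: the tool filtered out 160 unauthenticated (front-end) injection points, and I hand-wrote about 50 front-end injection exploits. All vulnerabilities discussed here are research into flaws similar to the one in the workflowcentertreedata advisory, and none of them work on patched versions.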
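Prerequisites
About the well-known OA
The OA is an office-automation suite written in Java. The code is fairly old: most SQL queries are built by string concatenation, and the code itself does no filtering. Filtering is done by a single unified Filter, which can be bypassed in some cases.
The main functionality of the OA is implemented in JSP. As far as I know only PMD currently supports parsing JSP, but I did not try it; judging from how IDEA parses JSP, it probably cannot resolve the concrete function logic. Fortunately, JSP can be compiled into Java Servlets, and the Resin Server used by the OA caches the compiled Java Servlets, which saves a lot of trouble.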
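Compiler basics
Anyone who has studied compiler theory knows that compilation generally goes through lexical analysis and syntax analysis, and then produces an AST (abstract syntax tree). The AST contains all the structural information of a source file. By traversing it we can extract exactly the information we need, instead of clumsily relying on global search or regular expressions, which lose context.
The typical compilation process is shown in the figure below.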
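To make the contrast with plain text search concrete, here is a minimal sketch that uses Python's standard `ast` module (not the Java tooling discussed later). The `SOURCE` snippet and the `find_calls` helper are made up for illustration. A text search for "execute" matches twice, while the AST walk reports only the real call together with its argument.

```python
import ast

# Hypothetical snippet to analyse: "execute" appears once in a string, once as a call.
SOURCE = '''
def handler(request, db):
    user_id = request.get("id")
    note = "db.execute is mentioned in a string, not called"
    db.execute("select * from t where id=" + user_id)
'''


def find_calls(source, method_name):
    """Return (line, [argument source text]) for every call to `.method_name(...)`."""
    hits = []
    for node in ast.walk(ast.parse(source)):
        if (isinstance(node, ast.Call)
                and isinstance(node.func, ast.Attribute)
                and node.func.attr == method_name):
            args = [ast.get_source_segment(source, arg) for arg in node.args]
            hits.append((node.lineno, args))
    return hits


text_matches = SOURCE.count("execute")   # plain search: no context
calls = find_calls(SOURCE, "execute")    # AST walk: only real call sites
print(text_matches, calls)
```

The AST result also carries the line number and the exact argument expression, which is the information a taint tracker needs as its starting point.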
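Environment setup
First, set up an environment for the OA. This article uses version 8 as the example; Ecology8.100.0531 can be found via Baidu.
A default installation is all that is needed.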
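Enumerating the OA's JSP file paths
First use Python to collect the paths of the JSP files in the OA's folder. You can apply your own filtering here.
# Python: walk a directory tree
import os


def get_files(path=r"D:\WEAVER\ecology", suffix=".jsp"):
    # A raw string cannot end with a backslash, so the trailing "\" is dropped.
    result = []
    for root, _dirs, file_list in os.walk(path):
        for filename in file_list:
            # Keep only files with the wanted suffix; pass suffix="" to keep everything.
            if filename.lower().endswith(suffix):
                full_path = os.path.join(root, filename)
                result.append([full_path, filename])
    return result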
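Then use burp intruder to check which of the OA's JSP files are reachable from the front end (that is, without logging in); doing this from Python works too.
This produces the following list: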
| Request | Payload | Status | Error | Timeout | Length | Comment |
|---|---|---|---|---|---|---|
| 7373 | workflow/request/WorkflowViewRequestDetailBodyAction.jsp | 200 | false | false | 73584 | |
| 7319 | workflow/request/WorkflowManageRequestBody.jsp | 200 | false | false | 71216 | |
| 7359 | workflow/request/WorkflowSignInput.jsp | 200 | false | false | 69746 | |
| 6445 | web/workflow/request/WorkflowAddRequestBody.jsp | 200 | false | false | 69080 | |
| 7372 | workflow/request/WorkflowViewRequestDetailBody.jsp | 200 | false | false | 66718 | |
| 7297 | workflow/request/WorkflowAddRequestBodyDataCenter.jsp | 200 | false | false | 64160 | |
| 7322 | workflow/request/WorkflowManageRequestBodyDataCenter.jsp | 200 | false | false | 64098 | |
| 7301 | workflow/request/WorkflowAddRequestFormBody.jsp | 200 | false | false | 62012 | |
| 3499 | hrm/report/resource/HrmConstRpDataDefine.jsp | 200 | false | false | 61648 | |
| 6923 | workflow/request/BillBudgetExpenseDetail.jsp | 200 | false | false | 61272 | |
| 7295 | workflow/request/WorkflowAddRequestBody.jsp | 200 | false | false | 60130 | |
| 7370 | workflow/request/WorkflowViewRequestBody.jsp | 200 | false | false | 59860 | |
| ..... | | | | | | |
| 2368 | formmode/import/ProcessOperation.jsp | 200 | false | false | 218 | |
| 7378 | workflow/request/WorkflowViewSign.jsp | 0 | false | false | 5 | |
| 6419 | web/WebBBSDsp.jsp | 0 | false | false | 0 | |
| 6421 | web/WebDsp.jsp | 0 | false | false | 0 | |
| 6422 | web/WebJournalDsp.jsp | 0 | false | false | 0 | |
| 6426 | web/WebListDspSecond.jsp | 0 | false | false | 0 | |
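For readers who prefer Python over burp intruder, the probing step could look roughly like the sketch below. It is not the author's script: the `probe` function, its parameters, and the host in the usage comment are assumptions for illustration. It records the status code and body length per path and sorts by length, mirroring the table above.

```python
import urllib.error
import urllib.request


def probe(base_url, paths, opener=urllib.request.urlopen, timeout=5):
    """Request every path without credentials; return (path, status, length) rows."""
    rows = []
    for path in paths:
        url = base_url.rstrip("/") + "/" + path.lstrip("/")
        try:
            with opener(url, timeout=timeout) as resp:
                status, length = resp.status, len(resp.read())
        except urllib.error.HTTPError as err:
            # The server answered with an error status (for example 404 or 500).
            status, length = err.code, 0
        except (urllib.error.URLError, OSError):
            # No usable answer at all (connection refused, timeout, ...).
            status, length = 0, 0
        rows.append((path, status, length))
    # Largest responses first, like the intruder table sorted by Length.
    return sorted(rows, key=lambda row: row[2], reverse=True)


# Hypothetical usage against a local test install:
# rows = probe("http://127.0.0.1:8080", ["workflow/request/WorkflowSignInput.jsp"])
```

The `opener` parameter exists only so the function can be exercised without a network; in normal use the default `urllib.request.urlopen` is enough.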
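Collecting the Servlet .java files generated by Resin
While the accessibility list for the JSP files was being collected, the corresponding JSP Servlets were also generated in the OA directory D:\WEAVER\ecology\WEB-INF\work\_jsp.
Copy the _jsp directory out, and the preparation of the OA is complete.
Using Cobra's PHP parser as a reference
Understanding the Cobra source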
# -*- coding: utf-8 -*-
"""
    parser
    ~~~~~~
    Implements Code Parser
    :author: BlBana <[email protected]>
    :homepage: https://github.com/WhaleShark-Team/cobra
    :license: MIT, see LICENSE for more details.
    :copyright: Copyright (c) 2018 Feei. All rights reserved
"""
from phply.phplex import lexer  # lexical analysis
from phply.phpparse import make_parser  # syntax analysis
from phply import phpast as php
from .log import logger

with_line = True
scan_results = []  # list that collects the results
repairs = []  # list that holds the repair functions


def export(items):
    result = []
    if items:
        for item in items:
            if hasattr(item, 'generic'):
                item = item.generic(with_lineno=with_line)
            result.append(item)
    return result
def export_list(params, export_params):
    """
    Flatten the nested lists inside params into a single list
    :param params:
    :param export_params:
    :return:
    """
    for param in params:
        if isinstance(param, list):
            export_params = export_list(param, export_params)
        else:
            export_params.append(param)
    return export_params


def get_all_params(nodes):  # collect the arguments of a function call; nodes is the argument list
    """
    Get all parameters of a function structure
    :param nodes:
    :return:
    """
    params = []
    export_params = []  # empty list handed to export_list
    for node in nodes:
        if isinstance(node.node, php.FunctionCall):  # the argument is the return value of another function
            params = get_all_params(node.node.params)
        else:
            if isinstance(node.node, php.Variable):
                params.append(node.node.name)
            if isinstance(node.node, php.BinaryOp):
                params = get_binaryop_params(node.node)
                params = export_list(params, export_params)
            if isinstance(node.node, php.ArrayOffset):
                param = get_node_name(node.node.node)
                params.append(param)
            if isinstance(node.node, php.Cast):
                param = get_cast_params(node.node.expr)
                params.append(param)
            if isinstance(node.node, php.Silence):
                param = get_silence_params(node.node)
                params.append(param)
    return params
def get_silence_params(node):
    """
    Extract the parameters of a Silence node
    :param node:
    :return:
    """
    param = []
    if isinstance(node.expr, php.Variable):
        param = get_node_name(node.expr)
    if isinstance(node.expr, php.FunctionCall):
        param.append(node.expr)
    if isinstance(node.expr, php.Eval):
        param.append(node.expr)
    if isinstance(node.expr, php.Assignment):
        param.append(node.expr)
    return param


def get_cast_params(node):
    """
    Extract the parameters of a Cast node
    :param node:
    :return:
    """
    param = []
    if isinstance(node, php.Silence):
        param = get_node_name(node.expr)
    return param


def get_binaryop_params(node):  # for a BinaryOp, handle left and right separately and pull out the variables
    """
    Extract the parameters of a BinaryOp
    :param node:
    :return:
    """
    logger.debug('[AST] Binaryop --> {node}'.format(node=node))
    params = []
    buffer_ = []
    if isinstance(node.left, php.Variable) or isinstance(node.right, php.Variable):  # at least one side is a variable: take its name
        if isinstance(node.left, php.Variable):
            params.append(node.left.name)
        if isinstance(node.right, php.Variable):
            params.append(node.right.name)
    if not isinstance(node.right, php.Variable) or not isinstance(node.left, php.Variable):  # at least one side is not a variable
        params_right = get_binaryop_deep_params(node.right, params)
        params_left = get_binaryop_deep_params(node.left, params)
        params = params_left + params_right
    params = export_list(params, buffer_)
    return params


def get_binaryop_deep_params(node, params):  # when left/right is not a variable, pull the variables out of the nested structure
    """
    Extract variable names from deeper levels
    :param node: the node.left or node.right of the previous step
    :param params:
    :return:
    """
    if isinstance(node, php.ArrayOffset):  # node is an array: take the array variable name
        param = get_node_name(node.node)
        params.append(param)
    if isinstance(node, php.BinaryOp):  # node is a BinaryOp: recurse to collect its variables
        param = get_binaryop_params(node)
        params.append(param)
    if isinstance(node, php.FunctionCall):  # node is a FunctionCall: recurse to collect its variable names
        params = get_all_params(node.params)
    return params
def get_expr_name(node):  # node is the value stored under 'expr'
    """
    Get the parameter names in the expression part of an assignment --> returned for backtracking
    :param node:
    :return:
    """
    param_lineno = 0
    is_re = False
    if isinstance(node, php.ArrayOffset):  # the assigned expression is an array
        param_expr = get_node_name(node.node)  # return the array name
        param_lineno = node.node.lineno
    elif isinstance(node, php.Variable):  # the assigned expression is a variable
        param_expr = node.name  # return the variable name
        param_lineno = node.lineno
    elif isinstance(node, php.FunctionCall):  # the assigned expression is a function call
        param_expr = get_all_params(node.params)  # return the list of call arguments
        param_lineno = node.lineno
        is_re = is_repair(node.name)  # a function is called: check whether it is a repair function
    elif isinstance(node, php.BinaryOp):  # the assigned expression is a BinaryOp
        param_expr = get_binaryop_params(node)
        param_lineno = node.lineno
    else:
        param_expr = node
    return param_expr, param_lineno, is_re


def get_node_name(node):  # node is the tuple stored under 'node'
    """
    Get the name of a Variable node
    :param node:
    :return:
    """
    if isinstance(node, php.Variable):
        return node.name  # return the variable name of this node


def is_repair(expr):
    """
    Check whether a filter function appears in the assignment; if the value is already filtered,
    stop taint backtracking and treat the vulnerability as repaired
    :param expr: the assigned expression
    :return:
    """
    is_re = False  # repaired or not; defaults to not repaired
    for repair in repairs:
        if expr == repair:
            is_re = True
            return is_re
    return is_re


def is_sink_function(param_expr, function_params):
    """
    Check against the parameters of a user-defined function --> decide whether that function is dangerous
    :param param_expr:
    :param function_params:
    :return:
    """
    is_co = -1
    cp = None
    if function_params is not None:
        for function_param in function_params:
            if param_expr == function_param:
                is_co = 2
                cp = function_param
                logger.debug('[AST] is_sink_function --> {function_param}'.format(function_param=cp))
    return is_co, cp


def is_controllable(expr):  # check whether the variable is in the list of user-controllable variables
    """
    Check whether the assigned expression is user-controllable
    :param expr:
    :return:
    """
    controlled_params = [
        '$_GET',
        '$_POST',
        '$_REQUEST',
        '$_COOKIE',
        '$_FILES',
        '$_SERVER',
        '$HTTP_POST_FILES',
        '$HTTP_COOKIE_VARS',
        '$HTTP_REQUEST_VARS',
        '$HTTP_POST_VARS',
        '$HTTP_RAW_POST_DATA',
        '$HTTP_GET_VARS'
    ]
    if expr in controlled_params:
        logger.debug('[AST] is_controllable --> {expr}'.format(expr=expr))
        return 1, expr
    return -1, None
def parameters_back(param, nodes, function_params=None):  # param is the taint currently being tracked
    """
    Recursively backtrack the assignments leading to the sensitive function. param is the tracked taint;
    once the origin of param is found --> analyse the assignment expression --> obtain the new taint;
    otherwise recurse into the previous node
    :param param:
    :param nodes:
    :param function_params:
    :return:
    """
    expr_lineno = 0  # line number of the source
    is_co, cp = is_controllable(param)
    if len(nodes) != 0 and is_co == -1:
        node = nodes[len(nodes) - 1]
        if isinstance(node, php.Assignment):  # while backtracking, follow every node that is an assignment
            param_node = get_node_name(node.node)  # param_node is the variable being assigned
            param_expr, expr_lineno, is_re = get_expr_name(node.expr)  # param_expr is the assigned expression: a variable or a list
            if param == param_node and is_re is True:
                is_co = 0
                cp = None
                return is_co, cp, expr_lineno
            if param == param_node and not isinstance(param_expr, list):  # origin found: check whether the assigned expression is controllable
                is_co, cp = is_controllable(param_expr)  # is the variable controllable?
                if is_co != 1:
                    is_co, cp = is_sink_function(param_expr, function_params)
                param = param_expr  # each time an origin is found, track the new taint in place of the old one
            if param == param_node and isinstance(param_expr, list):
                for expr in param_expr:
                    param = expr
                    is_co, cp = is_controllable(expr)
                    if is_co == 1:
                        return is_co, cp, expr_lineno
                    _is_co, _cp, expr_lineno = parameters_back(param, nodes[:-1], function_params)
                    if _is_co != -1:  # one controllable argument is enough to flag the call as possibly controllable
                        is_co = _is_co
                        cp = _cp
        if is_co == -1:  # nothing decided yet: keep recursing over the earlier nodes
            is_co, cp, expr_lineno = parameters_back(param, nodes[:-1], function_params)
    elif len(nodes) == 0 and function_params is not None:
        for function_param in function_params:
            if function_param == param:
                is_co = 2
                cp = function_param
    return is_co, cp, expr_lineno


def get_function_params(nodes):
    """
    Get all formal parameters of a user-defined function
    :param nodes: the parameter part of the function definition
    :return: all formal parameters as a list
    """
    params = []
    for node in nodes:
        if isinstance(node, php.FormalParameter):
            params.append(node.name)
    return params
def anlysis_function(node, back_node, vul_function, function_params, vul_lineno):
    """
    Analyse a user-defined function --> get its parameters --> if a parameter flows through
    assignments into the sink function --> the user-defined function itself is dangerous
    :param node:
    :param back_node:
    :param vul_function:
    :param function_params:
    :param vul_lineno:
    :return:
    """
    global scan_results
    try:
        if node.name == vul_function and int(node.lineno) == int(vul_lineno):  # the sensitive call is here: inspect the code before it
            for param in node.params:
                if isinstance(param.node, php.Variable):
                    analysis_variable_node(param.node, back_node, vul_function, vul_lineno, function_params)
                if isinstance(param.node, php.FunctionCall):
                    analysis_functioncall_node(param.node, back_node, vul_function, vul_lineno, function_params)
                if isinstance(param.node, php.BinaryOp):
                    analysis_binaryop_node(param.node, back_node, vul_function, vul_lineno, function_params)
                if isinstance(param.node, php.ArrayOffset):
                    analysis_arrayoffset_node(param.node, vul_function, vul_lineno)
    except Exception as e:
        logger.debug(e)


# def analysis_functioncall(node, back_node, vul_function, vul_lineno):
#     """
#     FunctionCall --> check whether the called function is sensitive --> get all params --> start the recursive check
#     :param node:
#     :param back_node:
#     :param vul_function:
#     :param vul_lineno
#     :return:
#     """
#     global scan_results
#     try:
#         if node.name == vul_function and int(node.lineno) == int(vul_lineno):  # located the sensitive function
#             for param in node.params:
#                 if isinstance(param.node, php.Variable):
#                     analysis_variable_node(param.node, back_node, vul_function, vul_lineno)
#
#                 if isinstance(param.node, php.FunctionCall):
#                     analysis_functioncall_node(param.node, back_node, vul_function, vul_lineno)
#
#                 if isinstance(param.node, php.BinaryOp):
#                     analysis_binaryop_node(param.node, back_node, vul_function, vul_lineno)
#
#                 if isinstance(param.node, php.ArrayOffset):
#                     analysis_arrayoffset_node(param.node, vul_function, vul_lineno)
#
#     except Exception as e:
#         logger.debug(e)
def analysis_binaryop_node(node, back_node, vul_function, vul_lineno, function_params=None):
    """
    Handle a BinaryOp node --> extract parameters --> backtrack to see whether they are controllable --> record the result
    :param node:
    :param back_node:
    :param vul_function:
    :param vul_lineno:
    :param function_params:
    :return:
    """
    logger.debug('[AST] vul_function:{v}'.format(v=vul_function))
    params = get_binaryop_params(node)
    params = export_list(params, export_params=[])
    for param in params:
        is_co, cp, expr_lineno = parameters_back(param, back_node, function_params)
        set_scan_results(is_co, cp, expr_lineno, vul_function, param, vul_lineno)


def analysis_arrayoffset_node(node, vul_function, vul_lineno):
    """
    Handle an ArrayOffset node --> extract parameters --> check whether they are controllable --> record the result
    :param node:
    :param vul_function:
    :param vul_lineno:
    :return:
    """
    logger.debug('[AST] vul_function:{v}'.format(v=vul_function))
    param = get_node_name(node.node)
    expr_lineno = node.lineno
    is_co, cp = is_controllable(param)
    set_scan_results(is_co, cp, expr_lineno, vul_function, param, vul_lineno)


def analysis_functioncall_node(node, back_node, vul_function, vul_lineno, function_params=None):
    """
    Handle a FunctionCall node --> extract parameters --> backtrack to see whether they are controllable --> record the result
    :param node:
    :param back_node:
    :param vul_function:
    :param vul_lineno:
    :param function_params:
    :return:
    """
    logger.debug('[AST] vul_function:{v}'.format(v=vul_function))
    params = get_all_params(node.params)
    for param in params:
        is_co, cp, expr_lineno = parameters_back(param, back_node, function_params)
        set_scan_results(is_co, cp, expr_lineno, vul_function, param, vul_lineno)


def analysis_variable_node(node, back_node, vul_function, vul_lineno, function_params=None):
    """
    Handle a Variable node --> extract parameters --> backtrack to see whether they are controllable --> record the result
    :param node:
    :param back_node:
    :param vul_function:
    :param vul_lineno:
    :param function_params:
    :return:
    """
    logger.debug('[AST] vul_function:{v}'.format(v=vul_function))
    params = get_node_name(node)
    is_co, cp, expr_lineno = parameters_back(params, back_node, function_params)
    set_scan_results(is_co, cp, expr_lineno, vul_function, params, vul_lineno)
def analysis_if_else(node, back_node, vul_function, vul_lineno, function_params=None):
    nodes = []
    if isinstance(node.node, php.Block):  # sinks and variables inside the if block
        analysis(node.node.nodes, vul_function, back_node, vul_lineno, function_params)
    if node.else_ is not None:  # sinks and variables inside the else block
        if isinstance(node.else_.node, php.Block):
            analysis(node.else_.node.nodes, vul_function, back_node, vul_lineno, function_params)
    if len(node.elseifs) != 0:  # sinks and variables inside the elseif blocks
        for i_node in node.elseifs:
            if i_node.node is not None:
                if isinstance(i_node.node, php.Block):
                    analysis(i_node.node.nodes, vul_function, back_node, vul_lineno, function_params)
                else:
                    nodes.append(i_node.node)
                    analysis(nodes, vul_function, back_node, vul_lineno, function_params)


def analysis_echo_print(node, back_node, vul_function, vul_lineno, function_params=None):
    """
    Handle echo/print nodes --> check the node type --> backtrack per branch to see whether
    the parameters are controllable --> record the result
    :param node:
    :param back_node:
    :param vul_function:
    :param vul_lineno:
    :param function_params:
    :return:
    """
    global scan_results
    if int(vul_lineno) == int(node.lineno):
        if isinstance(node, php.Print):
            if isinstance(node.node, php.FunctionCall):
                analysis_functioncall_node(node.node, back_node, vul_function, vul_lineno, function_params)
            if isinstance(node.node, php.Variable) and vul_function == 'print':  # a variable is printed directly
                analysis_variable_node(node.node, back_node, vul_function, vul_lineno, function_params)
            if isinstance(node.node, php.BinaryOp) and vul_function == 'print':
                analysis_binaryop_node(node.node, back_node, vul_function, vul_lineno, function_params)
            if isinstance(node.node, php.ArrayOffset) and vul_function == 'print':
                analysis_arrayoffset_node(node.node, vul_function, vul_lineno)
        elif isinstance(node, php.Echo):
            for k_node in node.nodes:
                if isinstance(k_node, php.FunctionCall):  # does the node contain a function call?
                    analysis_functioncall_node(k_node, back_node, vul_function, vul_lineno, function_params)  # analyse the node that contains the call
                if isinstance(k_node, php.Variable) and vul_function == 'echo':
                    # Kept as quoted: the closing parenthesis sits before function_params, so it is not passed to the call.
                    analysis_variable_node(k_node, back_node, vul_function, vul_lineno), function_params
                if isinstance(k_node, php.BinaryOp) and vul_function == 'echo':
                    analysis_binaryop_node(k_node, back_node, vul_function, vul_lineno, function_params)
                if isinstance(k_node, php.ArrayOffset) and vul_function == 'echo':
                    analysis_arrayoffset_node(k_node, vul_function, vul_lineno)
def analysis_eval(node, vul_function, back_node, vul_lineno, function_params=None):
    """
    Handle eval nodes --> check the node type --> backtrack per branch to see whether
    the parameters are controllable --> record the result
    :param node:
    :param vul_function:
    :param back_node:
    :param vul_lineno:
    :param function_params:
    :return:
    """
    global scan_results
    if vul_function == 'eval' and int(node.lineno) == int(vul_lineno):
        if isinstance(node.expr, php.Variable):
            analysis_variable_node(node.expr, back_node, vul_function, vul_lineno, function_params)
        if isinstance(node.expr, php.FunctionCall):
            analysis_functioncall_node(node.expr, back_node, vul_function, vul_lineno, function_params)
        if isinstance(node.expr, php.BinaryOp):
            analysis_binaryop_node(node.expr, back_node, vul_function, vul_lineno, function_params)
        if isinstance(node.expr, php.ArrayOffset):
            analysis_arrayoffset_node(node.expr, vul_function, vul_lineno)


def analysis_file_inclusion(node, vul_function, back_node, vul_lineno, function_params=None):
    """
    Handle include/require nodes --> check the node type --> backtrack per branch to see whether
    the parameters are controllable --> record the result
    :param node:
    :param vul_function:
    :param back_node:
    :param vul_lineno:
    :param function_params:
    :return:
    """
    global scan_results
    include_fs = ['include', 'include_once', 'require', 'require_once']
    if vul_function in include_fs and int(node.lineno) == int(vul_lineno):
        logger.debug('[AST-INCLUDE] {l}-->{r}'.format(l=vul_function, r=vul_lineno))
        if isinstance(node.expr, php.Variable):
            analysis_variable_node(node.expr, back_node, vul_function, vul_lineno, function_params)
        if isinstance(node.expr, php.FunctionCall):
            analysis_functioncall_node(node.expr, back_node, vul_function, vul_lineno, function_params)
        if isinstance(node.expr, php.BinaryOp):
            analysis_binaryop_node(node.expr, back_node, vul_function, vul_lineno, function_params)
        if isinstance(node.expr, php.ArrayOffset):
            analysis_arrayoffset_node(node.expr, vul_function, vul_lineno)
def set_scan_results(is_co, cp, expr_lineno, sink, param, vul_lineno):
    """
    Collect the result information --> record the result
    :param is_co:
    :param cp:
    :param expr_lineno:
    :param sink:
    :param param:
    :param vul_lineno:
    :return:
    """
    results = []
    global scan_results
    result = {
        'code': is_co,
        'source': cp,
        'source_lineno': expr_lineno,
        'sink': sink,
        'sink_param:': param,
        'sink_lineno': vul_lineno
    }
    if result['code'] != -1:  # a finding: add it to the result information
        results.append(result)
        scan_results += results
def analysis(nodes, vul_function, back_node, vul_lineo, function_params=None):
    """
    On a FunctionCall --> anlysis_function checks whether the called function is sensitive
    :param nodes: all nodes
    :param vul_function: name of the sensitive function to look for
    :param back_node: the statements inside the various syntactic structures
    :param vul_lineo: line number of the vulnerable call
    :param function_params: list of all parameters of the user-defined function
    :return:
    """
    buffer_ = []
    for node in nodes:
        if isinstance(node, php.FunctionCall):  # direct call, no assignment
            anlysis_function(node, back_node, vul_function, function_params, vul_lineo)
        elif isinstance(node, php.Assignment):  # the call is inside an assignment
            if isinstance(node.expr, php.FunctionCall):
                anlysis_function(node.expr, back_node, vul_function, function_params, vul_lineo)
            if isinstance(node.expr, php.Eval):
                analysis_eval(node.expr, vul_function, back_node, vul_lineo, function_params)
            if isinstance(node.expr, php.Silence):
                buffer_.append(node.expr)
                analysis(buffer_, vul_function, back_node, vul_lineo, function_params)
        elif isinstance(node, php.Print) or isinstance(node, php.Echo):
            analysis_echo_print(node, back_node, vul_function, vul_lineo, function_params)
        elif isinstance(node, php.Silence):
            nodes = get_silence_params(node)
            analysis(nodes, vul_function, back_node, vul_lineo)
        elif isinstance(node, php.Eval):
            analysis_eval(node, vul_function, back_node, vul_lineo, function_params)
        elif isinstance(node, php.Include) or isinstance(node, php.Require):
            analysis_file_inclusion(node, vul_function, back_node, vul_lineo, function_params)
        elif isinstance(node, php.If):  # the call is inside an if-else statement
            analysis_if_else(node, back_node, vul_function, vul_lineo, function_params)
        elif isinstance(node, php.While) or isinstance(node, php.For):  # the call is inside a loop
            if isinstance(node.node, php.Block):
                analysis(node.node.nodes, vul_function, back_node, vul_lineo, function_params)
        elif isinstance(node, php.Function) or isinstance(node, php.Method):
            function_body = []
            function_params = get_function_params(node.params)
            analysis(node.nodes, vul_function, function_body, vul_lineo, function_params=function_params)
        elif isinstance(node, php.Class):
            analysis(node.nodes, vul_function, back_node, vul_lineo, function_params)
        back_node.append(node)
def scan_parser(code_content, sensitive_func, vul_lineno, repair):
    """
    Entry point of the scan
    :param code_content: content of the file to check
    :param sensitive_func: sensitive functions to check, passed as a list
    :param vul_lineno: line number of the vulnerable call
    :param repair: list of repair functions for this vulnerability type
    :return:
    """
    try:
        global repairs
        global scan_results
        repairs = repair
        scan_results = []
        parser = make_parser()
        all_nodes = parser.parse(code_content, debug=False, lexer=lexer.clone(), tracking=with_line)
        for func in sensitive_func:  # for each sensitive function, look for it in the code and, if present, recursively check its arguments; the file is scanned once per function
            back_node = []
            analysis(all_nodes, func, back_node, int(vul_lineno), function_params=None)
    except SyntaxError as e:
        logger.warning('[AST] [ERROR]:{e}'.format(e=e))
    return scan_results
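Data-flow analysis basics
To hunt for vulnerabilities with data-flow analysis, four key terms are generally all you need:
- sink: a tainted or sensitive function. PHP examples: mysqli_query, system, shell_exec, unserialize. Java examples: executeSql, GroovyShell.evaluate(), Runtime.getRuntime().exec(), unserialize (that is, deserialization, such as ObjectInputStream.readObject()).
- source: where input comes from, usually something the user controls. PHP examples: $_GET, $_POST, $_REQUEST, $_COOKIE, $_FILES, $_SERVER, $HTTP_POST_FILES, $HTTP_COOKIE_VARS, $HTTP_REQUEST_VARS, $HTTP_POST_VARS, $HTTP_RAW_POST_DATA, $HTTP_GET_VARS. Java examples: request.getParameter, request.getParameterMap.
- repair/sanitizer: a repair or sanitizing function, usually filtering of malicious input, hashing, or a forced type conversion. PHP examples: md5, addslashes, mysqli_real_escape_string, mysql_escape_string. Java example: Integer.parseInt. In Java these are more often functions written by the developers themselves, such as null2int and getIntValue in the OA.
- DataFlow: the path along which a variable is passed through the code. It is a very important topic in static analysis; ControlFlow is not considered here for now.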
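With these terms in mind, the overall logic of Cobra's PHP parser can be summarized as follows:
- Define sink, source, and repair
  - a set of sensitive functions sensitive_func, for example mysqli_query
  - a set of repair functions repair, for example mysqli_real_escape_string
  - a set of preset controllable inputs source, for example _GET
- Find the line number sink_lineno at which mysqli_query appears in the code file vul_file
- Cobra traverses the PHP file from top to bottom until it reaches sink_lineno in vul_file, then recursively follows how the variables were passed along to see whether they can be traced back to a controllable input source (the source can also be a formal parameter of a function definition; this reveals vulnerable functions, which can then serve as secondary sinks for finding new vulnerabilities)
- If the value did not pass through a repair function along the way, a vulnerability is considered to exist here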
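The steps above can be sketched on a toy model, independent of any real parser. In the sketch below a program is just a list of assignments, and the `SOURCES`, `REPAIRS`, and `STATEMENTS` values as well as the `trace_back` helper are assumptions made up for illustration. The return codes follow the convention visible in the Cobra source: 1 for controllable, 0 for repaired, -1 for undecided.

```python
SOURCES = {"request.getParameter"}   # assumed source list
REPAIRS = {"Integer.parseInt"}       # assumed repair list

# Each statement: (assigned variable, called function or None, variables read).
STATEMENTS = [
    ("id", "request.getParameter", []),    # id = request.getParameter("id")
    ("num", "Integer.parseInt", ["id"]),   # num = Integer.parseInt(id)
    ("sql", None, ["id"]),                 # sql = "select ... " + id
    ("safe_sql", None, ["num"]),           # safe_sql = "select ... " + num
]


def trace_back(var, statements):
    """Walk backwards from the sink argument `var` through earlier assignments."""
    for index in range(len(statements) - 1, -1, -1):
        target, func, reads = statements[index]
        if target != var:
            continue
        if func in REPAIRS:
            return 0            # value went through a repair function
        if func in SOURCES:
            return 1            # value comes straight from user input
        # Otherwise follow every variable the assignment reads from.
        results = [trace_back(name, statements[:index]) for name in reads]
        if 1 in results:
            return 1
        if 0 in results:
            return 0
        return -1
    return -1                   # no assignment found for this variable


print(trace_back("sql", STATEMENTS), trace_back("safe_sql", STATEMENTS))
```

Passing `sql` to a sink would be reported, while `safe_sql` would not, because its only input went through `Integer.parseInt`. A real implementation additionally has to extract these assignments from AST nodes, which is what the rest of the code in this article does.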
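Implementing the Java AST processor
Once you get down to the AST level, most languages are structured in much the same way, and by the IR (Intermediate Representation) stage there is essentially no difference.
(Many code-auditing tools first convert source files to IR before processing them. The AST can be processed in the same way; IR is simply more general. A common form of IR is three-address code.)
So most of what the PHP processor does carries over to a Java processor.
Here, converting the Java code into an AST is enough for our needs.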
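As a small illustration of what three-address code looks like, the sketch below lowers an arithmetic expression into it using Python's standard `ast` module. This is only a teaching example under my own assumptions (the `to_three_address` helper and the `t1`, `t2`, ... temporary names are hypothetical); the tool in this article works on the AST directly and never builds an IR.

```python
import ast
import itertools


def to_three_address(expression):
    """Lower an expression of names, constants and + - * / into three-address code."""
    ops = {ast.Add: "+", ast.Sub: "-", ast.Mult: "*", ast.Div: "/"}
    temps = itertools.count(1)
    code = []

    def lower(node):
        if isinstance(node, ast.Name):
            return node.id
        if isinstance(node, ast.Constant):
            return repr(node.value)
        if isinstance(node, ast.BinOp):
            left, right = lower(node.left), lower(node.right)
            temp = "t{}".format(next(temps))
            # Every instruction has at most one operator and three addresses.
            code.append("{} = {} {} {}".format(temp, left, ops[type(node.op)], right))
            return temp
        raise ValueError("unsupported node: " + type(node).__name__)

    result = lower(ast.parse(expression, mode="eval").body)
    return code, result


code, result = to_three_address("a + b * (c - 2)")
print("\n".join(code))
```

Each nested sub-expression gets its own temporary, so every variable has an explicit defining instruction. That uniformity is what makes IR convenient for data-flow analysis across languages.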
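For the Java AST parser I chose the Python library javalang.
Installation: pip install javalang
Below is a table I put together earlier that maps phply structures to javalang structures. It may have gaps, but it covers the commonly used objects.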
| phply | javalang | Explanation | Iterable / parameters | Type recursion |
|---|---|---|---|---|
| php.Variable | MemberReference | Variable reference; member | | |
| php.FunctionCall | MethodInvocation | Direct function call; member, arguments | arguments | |
| php.BinaryOp | BinaryOperation | Binary operation; operandl, operandr, operator | operandl, operandr | |
| | ArrayInitializer | Array initialization | | |
| php.ArrayOffset | ArraySelector | Array assignment; not needed | children | |
| php.Block | BlockStatement | A local statement block, {}; statements | statements | |
| php.Print | | Should not exist in Java; it would be a function call (sout) | | |
| php.Assignment | Assignment | Assignment statement | expressionl | |
| php.Eval | | Not present in Java; the closest equivalents are beanshell/jshell | | |
| php.Silence | | Prepares a function call without displaying error messages https://www.php.net/manual/en/internals2.opcodes.begin-silence.php | | |
| php.Echo | | Should not exist in Java; it would be a function call (sout) | | |
| php.Include | | import; not considered for now | | |
| php.Require | | import; not considered for now | | |
| php.While | WhileStatement | body.statements, condition | body.statements | |
| php.For | ForStatement | body | | |
| php.Function | MethodDeclaration | phply: function name; no Java equivalent | body | |
| php.Method | MethodDeclaration | phply: class name and function name; a Java class method | body | |
| php.Class | ClassDeclaration | Class definition | body | |
| php.Cast | Cast | Forced type conversion, $foo = (int) $bar; | | |
| php.If | IfStatement | then_statement, else_statement | then_statement, else_statement | |
| | DoStatement | do{}while structure; handled essentially like While | body.statements | |
| | Statement | expression | | |
| | CompilationUnit | The whole tree | children[-1] | |
| | StatementExpression | A direct assignment to a variable (no leading type declaration); stands for one line? | expression | |
| | LocalVariableDeclaration | Declares and initializes a variable | declarators | declarators[0].initializer |
| | This | Refers to the current class; class fields are also instances of This | | |
| | SwitchStatement | cases:[SwitchStatementCase] | | |
| | SwitchStatementCase | statements | | |
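scan_parser
Configures the sink and repair lists and starts the scan.
def scan_parser(self, code_content, sensitive_func, vul_lineno, repair):
    """
    Take each sensitive function func from sensitive_func and search the AST for it
    -> analysis() looks for the sensitive call on line vul_lineno
    :param code_content: content of the file to check
    :param sensitive_func: sensitive functions to check, passed as a list
    :param vul_lineno: line number of the vulnerable call
    :param repair: list of repair functions for this vulnerability type
    :return:
    """
    try:
        # global repairs
        # global scan_results
        self.repairs = repair
        self.scan_results = []
        tree = javalang.parse.parse(code_content)
        all_nodes = tree.children[-1]  # last child of the CompilationUnit: the list of type declarations
        for func in sensitive_func:  # for each sensitive function, look for it in the code and, if present, recursively check its arguments; the file is scanned once per function
            back_node = []
            self.analysis(all_nodes, func, back_node, int(vul_lineno), function_params=None)
    # javalang signals bad input with its own exception types, not the built-in SyntaxError
    except (javalang.parser.JavaSyntaxError, javalang.tokenizer.LexerError) as e:
        print('[AST] [ERROR]:{e}'.format(e=e))
    return self.scan_results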
analysis
The main function of the analyzer.
def analysis(self, nodes, vul_function, back_node, vul_lineo, function_params=None):
    """
    The overall idea: traverse all nodes and put them into back_node
    -> look at every MethodInvocation until the one matching vul_lineo is found
    -> then extract the variables involved in that call
    ( anlysis_function only descends into the function body to look for the sensitive call; it could be optimized )
    ( analysis_functioncall_node takes the arguments (variables) of the sensitive call and runs parameters_back on them )
    :param nodes: all nodes
    :param vul_function: name of the sensitive function to look for
    :param back_node: the statements inside the various syntactic structures
    :param vul_lineo: line number of the vulnerable call
    :param function_params: list of all parameters of the user-defined function
    :return:
    """
    buffer_ = []
    for node in nodes:
        if isinstance(node, MethodInvocation):
            # Judging from the original, the intent here is: on a call, find the MethodDeclaration of
            # that method and report a problem if the function contains a sensitive operation
            self.anlysis_function(node, back_node, vul_function, function_params, vul_lineo)
        elif isinstance(node, StatementExpression):
            if isinstance(node.expression, MethodInvocation):
                self.anlysis_function(node.expression, back_node, vul_function, function_params, vul_lineo)
            elif isinstance(node.expression, Assignment):
                if isinstance(node.expression.value, MethodInvocation):
                    self.anlysis_function(node.expression.value, back_node, vul_function, function_params,
                                          vul_lineo)
                # todo: binop handling is still missing here
        elif isinstance(node, LocalVariableDeclaration):
            for declarator in node.declarators:
                if isinstance(declarator.initializer, MethodInvocation):
                    self.anlysis_function(declarator.initializer, back_node, vul_function, function_params,
                                          vul_lineo)
        elif isinstance(node, IfStatement):  # the call is inside an if-else statement
            self.analysis_if_else(node, vul_function, back_node, vul_lineo, function_params)
        elif isinstance(node, TryStatement):  # the call is inside a try-catch-finally statement
            # print(back_node)
            self.analysis(node.block, vul_function, back_node, vul_lineo, function_params)
            # analysis(node.catches, back_node, vul_function, vul_lineo, function_params)
            # analysis(node.finally_block, back_node, vul_function, vul_lineo, function_params)
        elif isinstance(node, WhileStatement):
            self.analysis(node.body.statements, vul_function, back_node, vul_lineo, function_params)
        elif isinstance(node, ForStatement):
            if isinstance(node.body, BlockStatement):
                # Pass the statement list: iterating a BlockStatement node itself yields (path, node) tuples
                self.analysis(node.body.statements, vul_function, back_node, vul_lineo, function_params)
        elif isinstance(node, MethodDeclaration):
            function_body = [node]
            function_params = self.get_function_params(node.parameters)
            self.analysis(node.body, vul_function, function_body, vul_lineo, function_params=function_params)
        elif isinstance(node, ClassDeclaration):
            self.analysis(node.body, vul_function, back_node, vul_lineo, function_params)
        # if back_node == "executeSql":
        #     print(back_node)
        back_node.append(node)
anlysis_function
Analyzes a function call.
def anlysis_function(self, node, back_node, vul_function, function_params, vul_lineno):
    """
    Analyse a user-defined function --> get its parameters --> if a parameter flows through
    assignments into the sink function --> the user-defined function itself is dangerous
    The final goal is to analyse the function call
    :param node: a MethodInvocation node (the code reads node.member and node.arguments)
    :param back_node: the back_nodes collected so far
    :param vul_function: name of the vulnerable function
    :param function_params: formal parameters of the enclosing function (when entered from a MethodDeclaration node)
    :param vul_lineno:
    :return:
    """
    global scan_results
    # try:
    if node.member == vul_function and int(node.position.line) == int(vul_lineno):  # the sensitive call is here: inspect the code before it
        for param in node.arguments:
            if isinstance(param, MemberReference):
                self.analysis_variable_node(param, back_node, vul_function, vul_lineno, function_params)
            elif isinstance(param, MethodInvocation):
                self.analysis_functioncall_node(param, back_node, vul_function, vul_lineno, function_params)
            elif isinstance(param, BinaryOperation):
                self.analysis_binaryop_node(param, back_node, vul_function, vul_lineno, function_params)
    # except Exception as e:
    #     print(e)
analysis_variable_node
Analyzes a variable node.
def analysis_variable_node(self, node, back_node, vul_function, vul_lineno, function_params=None):
    """
    Handle a variable node --> extract parameters --> backtrack to see whether they are controllable --> record the result
    Only the variable reached in the last backtracking step is written to the result table; the path is not included
    :param node:
    :param back_node:
    :param vul_function:
    :param vul_lineno:
    :param function_params:
    :return:
    """
    # print('[AST] vul_function:{v}'.format(v=vul_function))
    param = self.get_node_name(node)
    is_co, cp, expr_lineno = self.parameters_back(param, back_node, function_params)
    self.set_scan_results(is_co, cp, expr_lineno, vul_function, param, vul_lineno)
get_expr_name
Gets the parameter names in an assignment expression.
def get_expr_name(self, node):  # node is the right-hand side of the assignment
    """
    Get the parameter (variable) names in the expression part of an assignment --> returned for backtracking
    :param node: a node that must be the right-hand value of an expression; all variables it contains are collected
    :return param_expr: list of all variables involved in the expression []
    :return param_lineno: line of the current expression, int
    :return is_re: whether the value has been repaired, boolean
    """
    # todo: a pitfall here. javalang sometimes leaves position unset, which can make backtracking lose variables
    param_lineno = 0
    is_re = False
    param_expr = None
    if isinstance(node, MemberReference):  # the assigned expression is a variable
        param_expr = node.member  # return the variable name
        param_lineno = node.position.line
    elif isinstance(node, MethodInvocation):  # the assigned expression is a function call
        param_expr = self.get_all_params(node.arguments)  # return the list of call arguments
        param_lineno = node.position.line
        # function_name = node.qualifier + "." + node.member
        is_re = False
        # a function is called: check whether any called function is a repair function
        for func in self.get_all_funcs(node):
            if self.is_repair(func):
                is_re = True
                break
    elif isinstance(node, BinaryOperation):  # the assigned expression is a BinaryOperation
        param_expr = self.get_binaryop_params(node)
        # todo: javalang's missing position needs fixing; hard-coded for now
        # param_lineno = node.position.line
        param_lineno = 7
    elif isinstance(node, Assignment):  # the assigned expression is an Assignment
        param_expr, param_lineno, is_re = self.get_expr_name(node.value)
        # param_lineno = node.position.line
    elif isinstance(node, This):  # the assigned expression is a This
        for selector in node.selectors:
            param_expr, param_lineno, is_re = self.get_expr_name(selector)
            if is_re:
                return param_expr, param_lineno, is_re
    else:
        param_expr = node
    # print(param_expr)
    return param_expr, param_lineno, is_re
get_node_name
Gets the variable name of a variable node.
def get_node_name(self, node):
    """
    Get the name of a MemberReference node
    :param node: usually a MemberReference; literals and the like need no tracking
    :return: MemberReference.member
    """
    if isinstance(node, MemberReference):
        return node.member  # return the variable name of this node
    elif isinstance(node, VariableDeclarator):
        return node.name  # return the variable name of this node
parameters_back
Backtracks a variable to its origin.
def parameters_back(self, param, nodes, function_params=None, node_lineno=-1):  # `param` is the taint currently being tracked
    """
    Recursively walk back through the assignments that lead to the sink. `param` is the tracked taint:
    once its origin is found, analyse the assignment expression and pick up the new taint;
    otherwise recurse into the previous node.
    :param param: a variable name
    :param nodes: the back_nodes visited so far, mostly LocalVariableDeclaration/StatementExpression/If... nodes
    :param function_params: formal parameters of the enclosing function, kept during recursion; a variable that comes from a formal parameter is also treated as controllable
    :param node_lineno: line of the first node in `nodes`, reported when the taint comes from a formal parameter
    :return is_co, cp, expr_lineno: 1 if controllable, the controllable source, the line of the expression
    """
    # node_lineno = -1
    # print(node_lineno)
    if len(nodes) > 0 and node_lineno == -1:
        node_lineno = nodes[0].position.line  # line number of the first visited node
    expr_lineno = 0
    is_re = False
    is_co, cp = self.is_controllable(param)
    if len(nodes) != 0 and is_co == -1:
        node = nodes[len(nodes) - 1]
        tnodes = []
        # While backtracking, follow every node that performs an assignment
        if isinstance(node, LocalVariableDeclaration):
            tnodes = [[declarator, declarator.initializer] for declarator in node.declarators]
        elif isinstance(node, StatementExpression):
            if isinstance(node.expression, Assignment):
                tnodes = [[node.expression.expressionl, node.expression.value]]
        for left_var, right_var in tnodes:
            param_node = self.get_node_name(left_var)
            # param_expr is the assigned expression: a single variable or a list of variables
            param_expr, expr_lineno, is_re = self.get_expr_name(right_var)
            if param == param_node and is_re is False and isinstance(right_var, MethodInvocation):
                funcs = self.get_all_funcs(right_var)
                # print(funcs)
                if not is_re:
                    for func in funcs:
                        is_co, cp = self.is_controllable(func)
                        if is_co == 1:
                            return is_co, cp, expr_lineno
            if param == param_node and is_re is True:
                is_co = 0
                cp = None
                return is_co, cp, expr_lineno
            if param == param_node and not isinstance(param_expr, list):  # origin found: check whether the assigned expression is controllable
                is_co, cp = self.is_controllable(param_expr)
                if is_co != 1:
                    is_co, cp = self.is_sink_function(param_expr, function_params)
                param = param_expr  # each time an origin is found, track the new taint instead of the old one
            if param == param_node and isinstance(param_expr, list):
                for expr in param_expr:
                    param = expr
                    is_co, cp = self.is_controllable(expr)
                    if is_co == 1:
                        return is_co, cp, expr_lineno
                    _is_co, _cp, expr_lineno = self.parameters_back(param, nodes[:-1], function_params, node_lineno)
                    if _is_co != -1:  # one controllable parameter is enough to flag the call as possibly controllable
                        is_co = _is_co
                        cp = _cp
        if is_co == -1:  # still undecided: keep backtracking through the earlier nodes
            is_co, cp, expr_lineno = self.parameters_back(param, nodes[:-1], function_params, node_lineno)
    # If the variable comes from a formal parameter, the function name and line would ideally be recorded as well
    elif len(nodes) == 0 and function_params is not None:
        for function_param in function_params:
            if function_param == param:
                is_co = 2
                cp = function_param
                expr_lineno = node_lineno
    return is_co, cp, expr_lineno
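The backtracking idea can be illustrated without javalang. The sketch below is a deliberately simplified model, not the tool's code: each statement is reduced to a hypothetical tuple (assigned variable, called function names, variables used), and `trace_back`, `SOURCES` and `SANITIZERS` are names invented for this illustration. The return codes mirror `parameters_back`: 1 when a source is reached, 0 when a repair function is hit first, -1 when the origin cannot be determined.

```python
from typing import List, Optional, Tuple

# Hypothetical, simplified statement model:
# (assigned variable, names of the functions called, variables used on the right-hand side)
Statement = Tuple[str, List[str], List[str]]

SOURCES = {"getParameter"}                 # user-controllable entry points
SANITIZERS = {"null2int", "getIntValue"}   # repair functions that stop the taint


def trace_back(var: str, stmts: List[Statement]) -> Tuple[int, Optional[str]]:
    """Walk the statements from last to first, following `var` to its origin."""
    for index in range(len(stmts) - 1, -1, -1):
        target, calls, used = stmts[index]
        if target != var:
            continue
        if any(call in SANITIZERS for call in calls):
            return 0, None
        for call in calls:
            if call in SOURCES:
                return 1, call
        # Undecided: follow every variable used by the right-hand side,
        # looking only at statements that come before this one.
        for name in used:
            code, source = trace_back(name, stmts[:index])
            if code != -1:
                return code, source
        return -1, None
    return -1, None


statements = [
    ("node", ["null2String", "getParameter"], []),
    ("arrNode", ["TokenizerString2"], ["node"]),
    ("value", [], ["arrNode"]),
    ("count", ["getIntValue", "getParameter"], []),
]
print(trace_back("value", statements))  # follows value -> arrNode -> node
print(trace_back("count", statements))  # stopped by the repair function
```

The first three tuples correspond to the `node`, `arrNode` and `value` declarations of the test servlet shown later; the `count` statement is made up to show the repair case.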
analysis_functioncall_node
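Handles a function-call node.
def analysis_functioncall_node(self, node, back_node, vul_function, vul_lineno, function_params=None):
    """
    Handle a function-call node: extract its arguments, backtrack each one to decide
    whether it is controllable, and record the result.
    :param node: the MethodInvocation passed to the sink
    :param back_node: statements visited before the sink
    :param vul_function: name of the sensitive function
    :param vul_lineno: line of the sensitive call
    :param function_params: formal parameters of the enclosing function
    :return:
    """
    # print('[AST] vul_function:{v}'.format(v=vul_function))
    params = set(list(self.get_all_params(node.arguments)))
    for param in params:
        is_co, cp, expr_lineno = self.parameters_back(param, back_node, function_params)
        self.set_scan_results(is_co, cp, expr_lineno, vul_function, param, vul_lineno)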
get_function_params
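Extracts the formal parameters of a function.
def get_function_params(self, nodes):
    """
    Get all formal parameters of a user-defined function.
    :param nodes: the parameter section of the function declaration
    :return params: all formal parameters, as a list
    """
    params = []
    for node in nodes:
        if isinstance(node, FormalParameter):
            params.append(node.name)
    return list(set(params))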
get_all_params
Collects the variables that appear in the argument list of a call.
def get_all_params(self, nodes):  # `nodes` is the argument list of the call
    """
    Get every variable used in a call's arguments.
    :param nodes: MethodInvocation.arguments
    :return params: all variables referenced in the argument list
    """
    params = []
    export_params = []  # empty accumulator handed to export_list
    for node in nodes:
        if isinstance(node, MethodInvocation):  # the argument is the return value of another call
            params = self.get_all_params(node.arguments)
        else:
            if isinstance(node, MemberReference):
                params.append(node.member)
            elif isinstance(node, BinaryOperation):
                params = self.get_binaryop_params(node)
                params = self.export_list(params, export_params)
    return list(set(params))
get_all_funcs
Collects the function calls found under a node.
def get_all_funcs(self, node, tmp=[]):
    funcs = [node.member]
    export_funcs = []  # empty accumulator handed to export_list
    for node in node.arguments:
        if isinstance(node, MethodInvocation):  # the argument is the return value of another call
            funcs.append(node.member)
            funcs = list(self.export_list(funcs, export_funcs))
    # if isinstance(node, MethodInvocation)
    # return get_all_funcs(node)
    return list(set(funcs))
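As written, `get_all_funcs` only inspects the direct arguments of the call, so a source wrapped two levels deep is not collected; the commented-out lines hint at a recursive version. A minimal sketch of such a recursion follows. `Call` is a stand-in for javalang's `MethodInvocation`, and `all_call_names` and `someWrapper` are hypothetical names used only here.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class Call:
    """Stand-in for a MethodInvocation node: a method name plus its arguments."""
    member: str
    arguments: List[object] = field(default_factory=list)


def all_call_names(node: Call) -> List[str]:
    """Collect the name of `node` and of every call nested in its arguments."""
    names = [node.member]
    for arg in node.arguments:
        if isinstance(arg, Call):
            for name in all_call_names(arg):
                if name not in names:  # keep first-seen order, no duplicates
                    names.append(name)
    return names


# Models: Util.null2String(someWrapper(request.getParameter("id")))
expr = Call("null2String", [Call("someWrapper", [Call("getParameter", ["id"])])])
print(all_call_names(expr))
```

With this shape, `getParameter` is still reported even though it sits below an intermediate wrapper call.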
analysis_binaryop_node
Handles a binary operation.
def analysis_binaryop_node(self, node, back_node, vul_function, vul_lineno, function_params=None):
    """
    Handle a BinaryOperation node: extract its variables, backtrack each one to decide
    whether it is controllable, and record the result.
    :param node: the BinaryOperation passed to the sink
    :param back_node: statements visited before the sink
    :param vul_function: name of the sensitive function
    :param vul_lineno: line of the sensitive call
    :param function_params: formal parameters of the enclosing function
    :return:
    """
    # print('[AST] vul_function:{v}'.format(v=vul_function))
    export_params = []
    params = self.get_binaryop_params(node)
    params = self.export_list(params, export_params)
    for param in params:
        is_co, cp, expr_lineno = self.parameters_back(param, back_node, function_params)
        self.set_scan_results(is_co, cp, expr_lineno, vul_function, param, vul_lineno)
get_binaryop_deep_params
Handles nested binary operations.
def get_binaryop_deep_params(self, node, params):  # used when the left or right operand is not a plain variable
    """
    Recursively extract the variable names buried in a nested operand.
    :param node: node.operandl or node.operandr from get_binaryop_params
    :param params: the parameters collected so far
    :return params: the parameters found at deeper levels
    """
    if isinstance(node, BinaryOperation):  # nested BinaryOperation: recurse to extract its variables
        param = self.get_binaryop_params(node)
        params.append(param)
    if isinstance(node, MethodInvocation):  # call: extract the variable names from its arguments
        params = self.get_all_params(node.arguments)
    return params
get_binaryop_params
Extracts the variables involved in a binary operation.
def get_binaryop_params(self, node):  # process the left and right operands separately and collect the variables
    """
    Extract the parameters of a BinaryOperation.
    :param node: a BinaryOperation node
    :return params: the variables involved in this node
    """
    # print('[AST] Binaryop --> {node}'.format(node=node))
    params = []
    buffer_ = []
    # At least one operand is a plain variable: take its name directly
    if isinstance(node.operandl, MemberReference) or isinstance(node.operandr, MemberReference):
        if isinstance(node.operandl, MemberReference):
            params.append(node.operandl.member)
        if isinstance(node.operandr, MemberReference):
            params.append(node.operandr.member)
    # At least one operand is not a plain variable: look inside it
    if not isinstance(node.operandl, MemberReference) or not isinstance(node.operandr, MemberReference):
        params_right = self.get_binaryop_deep_params(node.operandr, params)
        params_left = self.get_binaryop_deep_params(node.operandl, params)
        params = params_left + params_right
    params = self.export_list(params, buffer_)
    return params
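A concatenated SQL string is parsed as a chain of nested binary operations, which is why the extraction has to recurse. The following sketch shows the same idea on stand-in classes (`Var`, `Lit` and `BinOp` are simplified substitutes for javalang's `MemberReference`, `Literal` and `BinaryOperation`; `collect_vars` is a hypothetical helper, not part of the tool).

```python
from dataclasses import dataclass
from typing import List, Union


@dataclass
class Var:
    """Stand-in for MemberReference."""
    member: str


@dataclass
class Lit:
    """Stand-in for a string Literal."""
    value: str


@dataclass
class BinOp:
    """Stand-in for BinaryOperation with left and right operands."""
    operandl: "Expr"
    operandr: "Expr"


Expr = Union[Var, Lit, BinOp]


def collect_vars(expr: Expr) -> List[str]:
    """Return the variable names in a (possibly nested) concatenation, left to right."""
    if isinstance(expr, Var):
        return [expr.member]
    if isinstance(expr, BinOp):
        return collect_vars(expr.operandl) + collect_vars(expr.operandr)
    return []  # literals carry no taint


# Models: "... workflowtype=" + value + " ... formid in (" + formids + ")))"
query = BinOp(
    BinOp(
        BinOp(BinOp(Lit("... workflowtype="), Var("value")), Lit(" ... formid in (")),
        Var("formids"),
    ),
    Lit(")))"),
)
print(collect_vars(query))
```

Each name returned here is what the tool would then hand to `parameters_back` as a taint to trace.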
analysis_if_else
Analyses if/else statements.
def analysis_if_else(self, node, vul_function, back_node, vul_lineno, function_params=None):
    nodes = []
    if isinstance(node.then_statement, BlockStatement):
        self.analysis(node.then_statement.statements, vul_function, back_node, vul_lineno, function_params)
    if isinstance(node.else_statement, BlockStatement):
        self.analysis(node.else_statement.statements, vul_function, back_node, vul_lineno, function_params)
    if isinstance(node.else_statement, IfStatement):
        self.analysis_if_else(node.else_statement, vul_function, back_node, vul_lineno, function_params)
is_sink_function
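Checks whether a variable comes from the formal parameters of the enclosing function.
def is_sink_function(self, param_expr, function_params):
    """
    Compare a variable with the formal parameters of the enclosing function, to decide
    whether that function is itself dangerous.
    :param param_expr: a variable name
    :param function_params: formal parameters of the function
    :return: (2, parameter name) if the variable is one of the formal parameters (also treated as controllable), otherwise (-1, None)
    """
    is_co = -1
    cp = None
    if function_params is not None:
        for function_param in function_params:
            if param_expr == function_param:
                is_co = 2
                cp = function_param
                # print('[AST] is_sink_function --> {function_param}'.format(function_param=cp))
    return is_co, cp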
is_controllable
Decides whether an assignment expression is controllable.
def is_controllable(self, expr):  # check whether the name is in the list of user-controllable sources
    """
    Decide whether an assignment expression is user-controllable.
    :param expr: a function name
    :return 1, expr: (1, name) if the name is a user-controllable source, otherwise (-1, None)
    """
    controlled_params = [
        'getParameter'
        # '$_GET',
        # '$_POST',
        # '$_REQUEST',
        # '$_COOKIE',
        # '$_FILES',
        # '$_SERVER',
        # '$HTTP_POST_FILES',
        # '$HTTP_COOKIE_VARS',
        # '$HTTP_REQUEST_VARS',
        # '$HTTP_POST_VARS',
        # '$HTTP_RAW_POST_DATA',
        # '$HTTP_GET_VARS'
    ]
    if expr in controlled_params:
        # print('[AST] is_controllable --> {expr}'.format(expr=expr))
        return 1, expr
    return -1, None
is_repair
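Checks whether an assignment expression contains a filter function.
def is_repair(self, expr):
    """
    Check whether a filter (repair) function appears in the assignment expression. If the value
    has already been filtered, stop backtracking and treat the vulnerability as fixed.
    :param expr: a function name
    :return is_re: whether the value has been repaired, boolean
    """
    is_re = False  # not repaired by default
    for repair in self.repairs:
        if expr == repair:
            is_re = True
            return is_re
    return is_re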
set_scan_results
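Stores the results.
def set_scan_results(self, is_co, cp, expr_lineno, sink, param, vul_lineno):
    """
    Assemble the result information and store it.
    :param is_co: result code from the backtracking
    :param cp: the controllable source that was found
    :param expr_lineno: line of the source expression
    :param sink: name of the sensitive function
    :param param: the sink argument that was traced
    :param vul_lineno: line of the sensitive call
    :return:
    """
    results = []
    # global scan_results
    result = {
        'code': is_co,
        'source': cp,
        'source_lineno': expr_lineno,
        'sink': sink,
        'sink_param:': param,
        'sink_lineno': vul_lineno
    }
    # for scan_result in scan_results:
    # if
    if result['code'] != -1:  # only findings are added to the result list
        results.append(result)
        self.scan_results += results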
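The `code` field is the only place where the outcome of the backtracking is recorded, so it helps to spell out the values assigned in the functions above: 1 from `is_controllable`, 2 from the formal-parameter check, 0 when a repair function is met, and -1 (never stored) when nothing is found. The helper below is a small sketch for reading a result; `CODE_MEANING` and `describe` are names introduced here, and the sample dictionary is the one the tool prints for the test file.

```python
# Meaning of each `code` value, following how the values are assigned in the tool.
CODE_MEANING = {
    1: "tainted by a user-controllable source",
    2: "tainted by a formal parameter of the enclosing method",
    0: "passes through a repair function, treated as fixed",
    -1: "origin not found, not recorded",
}


def describe(result):
    """Render one scan result as a single readable line."""
    meaning = CODE_MEANING.get(result["code"], "unknown code")
    return "{0}({1}) at line {2}: {3}; source {4} at line {5}".format(
        result["sink"],
        result["sink_param:"],  # the key really ends with a colon in the tool's output
        result["sink_lineno"],
        meaning,
        result["source"],
        result["source_lineno"],
    )


sample = {'code': 1, 'source': 'getParameter', 'source_lineno': 62,
          'sink': 'executeSql', 'sink_param:': 'scope', 'sink_lineno': 66}
print(describe(sample))
```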
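Test code
Test file
Historical vulnerability: "SQL injection in the front-end WorkflowCenterTreeData interface of a well-known OA (e-cology): reproduction", by 数据库小龙人 on the CSDN blog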
java_src/_workflowcentertreedata__jsp.java
/*
* JSP generated by Resin-3.1.8 (built Mon, 17 Nov 2008 12:15:21 PST)
*/
package _jsp._mobile._browser;
import javax.servlet.*;
import javax.servlet.jsp.*;
import javax.servlet.http.*;
import org.json.*;
import weaver.general.Util;
import java.util.*;
import weaver.workflow.workflow.WorkTypeComInfo;
public class _workflowcentertreedata__jsp extends com.caucho.jsp.JavaPage {
private static final java.util.HashMap<String, java.lang.reflect.Method> _jsp_functionMap = new java.util.HashMap<String, java.lang.reflect.Method>();
private boolean _caucho_isDead;
public void
_jspService(javax.servlet.http.HttpServletRequest request,
javax.servlet.http.HttpServletResponse response)
throws java.io.IOException, javax.servlet.ServletException {
javax.servlet.http.HttpSession session = request.getSession(true);
com.caucho.server.webapp.WebApp _jsp_application = _caucho_getApplication();
javax.servlet.ServletContext application = _jsp_application;
com.caucho.jsp.PageContextImpl pageContext = _jsp_application.getJspApplicationContext().allocatePageContext(this, _jsp_application, request, response, null, session, 8192, true, false);
javax.servlet.jsp.PageContext _jsp_parentContext = pageContext;
javax.servlet.jsp.JspWriter out = pageContext.getOut();
final javax.el.ELContext _jsp_env = pageContext.getELContext();
javax.servlet.ServletConfig config = getServletConfig();
javax.servlet.Servlet page = this;
response.setContentType("application/x-json;charset=UTF-8");
request.setCharacterEncoding("UTF-8");
try {
out.write(_jsp_string0, 0, _jsp_string0.length);
weaver.conn.RecordSet rs;
rs = (weaver.conn.RecordSet) pageContext.getAttribute("rs");
if (rs == null) {
rs = new weaver.conn.RecordSet();
pageContext.setAttribute("rs", rs);
}
out.write(_jsp_string1, 0, _jsp_string1.length);
weaver.conn.RecordSet rsIn;
rsIn = (weaver.conn.RecordSet) pageContext.getAttribute("rsIn");
if (rsIn == null) {
rsIn = new weaver.conn.RecordSet();
pageContext.setAttribute("rsIn", rsIn);
}
out.write(_jsp_string2, 0, _jsp_string2.length);
String node = Util.null2String(request.getParameter("node"));
String arrNode[] = Util.TokenizerString2(node, "_");
String type = arrNode[0];
String value = arrNode[1];
String flowids = "";
ArrayList flowidList = new ArrayList();
String scope = Util.null2String(request.getParameter("scope"));
String initvalue = Util.null2String(request.getParameter("initvalue"));
String formids = Util.null2String(request.getParameter("formids"));
rs.executeSql("select * from mobileconfig where mc_type=5 and mc_scope=" + scope + " and mc_name='flowids' ");
if (rs.next()) {
flowids = Util.null2String(rs.getString("mc_value"));
}
if (initvalue != null && !"".equals(initvalue)) {
flowids += "," + initvalue;
flowidList = Util.TokenizerString(flowids, ",");
}
JSONArray jsonArrayReturn = new JSONArray();
if ("root".equals(type)) { //\u4e3b\u76ee\u5f55\u4e0b\u7684\u6570\u636e
WorkTypeComInfo wftc = new WorkTypeComInfo();
while (wftc.next()) {
JSONObject jsonTypeObj = null;
String wfTypeId = wftc.getWorkTypeid();
String wfTypeName = wftc.getWorkTypename();
//if("1".equals(wfTypeId)) continue;
rs.executeSql("select id,workflowname from workflow_base where isvalid='1' and workflowtype=" + wfTypeId + " and ( isbill=0 or (isbill=1 and formid<0) or (isbill=1 and formid in (" + formids + ")))");
while (rs.next()) {
jsonTypeObj = new JSONObject();
String wfId = Util.null2String(rs.getString("id"));
if (flowidList.contains(wfId)) {
jsonTypeObj.put("expanded", true);
break;
}
}
if (jsonTypeObj != null) {
jsonTypeObj.put("id", "wftype_" + wfTypeId);
jsonTypeObj.put("text", wfTypeName);
jsonTypeObj.put("checked", false);
jsonTypeObj.put("draggable", false);
jsonTypeObj.put("leaf", false);
jsonArrayReturn.put(jsonTypeObj);
}
}
} else if ("wftype".equals(type)) {
rs.executeSql("select id,workflowname from workflow_base where isvalid='1' and workflowtype=" + value + " and ( isbill=0 or (isbill=1 and formid<0) or (isbill=1 and formid in (" + formids + ")))");
while (rs.next()) {
JSONObject jsonWfObj = new JSONObject();
String wfId = Util.null2String(rs.getString("id"));
String wfName = Util.null2String(rs.getString("workflowname"));
jsonWfObj.put("id", "wf_" + wfId);
jsonWfObj.put("text", wfName);
jsonWfObj.put("draggable", false);
if (!flowidList.contains(wfId)) {
jsonWfObj.put("checked", false);
} else {
jsonWfObj.put("checked", true);
jsonWfObj.put("expanded", true);
}
jsonWfObj.put("leaf", true);
jsonArrayReturn.put(jsonWfObj);
}
}
out.println(jsonArrayReturn.toString());
out.write(_jsp_string1, 0, _jsp_string1.length);
} catch (java.lang.Throwable _jsp_e) {
pageContext.handlePageException(_jsp_e);
} finally {
_jsp_application.getJspApplicationContext().freePageContext(pageContext);
}
}
private java.util.ArrayList _caucho_depends = new java.util.ArrayList();
public java.util.ArrayList _caucho_getDependList() {
return _caucho_depends;
}
public void _caucho_addDepend(com.caucho.vfs.PersistentDependency depend) {
super._caucho_addDepend(depend);
com.caucho.jsp.JavaPage.addDepend(_caucho_depends, depend);
}
public boolean _caucho_isModified() {
if (_caucho_isDead)
return true;
if (com.caucho.server.util.CauchoSystem.getVersionId() != 1886798272571451039L)
return true;
for (int i = _caucho_depends.size() - 1; i >= 0; i--) {
com.caucho.vfs.Dependency depend;
depend = (com.caucho.vfs.Dependency) _caucho_depends.get(i);
if (depend.isModified())
return true;
}
return false;
}
public long _caucho_lastModified() {
return 0;
}
public java.util.HashMap<String, java.lang.reflect.Method> _caucho_getFunctionMap() {
return _jsp_functionMap;
}
public void init(ServletConfig config)
throws ServletException {
com.caucho.server.webapp.WebApp webApp
= (com.caucho.server.webapp.WebApp) config.getServletContext();
super.init(config);
com.caucho.jsp.TaglibManager manager = webApp.getJspApplicationContext().getTaglibManager();
com.caucho.jsp.PageContextImpl pageContext = new com.caucho.jsp.PageContextImpl(webApp, this);
}
public void destroy() {
_caucho_isDead = true;
super.destroy();
}
public void init(com.caucho.vfs.Path appDir)
throws javax.servlet.ServletException {
com.caucho.vfs.Path resinHome = com.caucho.server.util.CauchoSystem.getResinHome();
com.caucho.vfs.MergePath mergePath = new com.caucho.vfs.MergePath();
mergePath.addMergePath(appDir);
mergePath.addMergePath(resinHome);
com.caucho.loader.DynamicClassLoader loader;
loader = (com.caucho.loader.DynamicClassLoader) getClass().getClassLoader();
String resourcePath = loader.getResourcePathSpecificFirst();
mergePath.addClassPath(resourcePath);
com.caucho.vfs.Depend depend;
depend = new com.caucho.vfs.Depend(appDir.lookup("mobile/browser/WorkflowCenterTreeData.jsp"), -7926612934612916794L, false);
com.caucho.jsp.JavaPage.addDepend(_caucho_depends, depend);
}
private final static char[] _jsp_string0;
private final static char[] _jsp_string1;
private final static char[] _jsp_string2;
static {
_jsp_string0 = "\r\n\r\n\r\n\r\n\r\n\r\n".toCharArray();
_jsp_string1 = "\r\n".toCharArray();
_jsp_string2 = "\r\n\r\n".toCharArray();
}
}
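The generated servlet records the JSP it was compiled from in the `appDir.lookup(...)` call of its `init` method, which gives a way to map a flagged servlet file back to the URL to test. A minimal sketch, assuming the first `lookup` of a `.jsp` path is the page itself; `LOOKUP_RE` and `jsp_path_from_servlet` are hypothetical names for this illustration.

```python
import re

# Matches the JSP path that Resin records as a dependency of the generated servlet.
LOOKUP_RE = re.compile(r'appDir\.lookup\("([^"]+\.jsp)"\)')


def jsp_path_from_servlet(java_source):
    """Return the web path of the JSP behind a generated servlet, or None if absent."""
    match = LOOKUP_RE.search(java_source)
    return "/" + match.group(1) if match else None


snippet = (
    'depend = new com.caucho.vfs.Depend('
    'appDir.lookup("mobile/browser/WorkflowCenterTreeData.jsp"), '
    '-7926612934612916794L, false);'
)
print(jsp_path_from_servlet(snippet))
```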
Analysis code
java_parser_class.py
# -*- coding: utf-8 -*-
import os
from functools import reduce
from javalang.parse import parse
from javalang.tree import *
import javalang
import copy

fp = open("res_test.txt", 'a+')
# fp.write("type\tfilename\tparam_line\tsink_line\n")


class JavaParse():
    def __init__(self, filename):
        self.filename = filename  # r"java_src\_workflowcentertreedata__jsp.java"
        self.src = open(self.filename, 'r', encoding='utf8', errors='ignore').read()
        self.with_line = True
        self.scan_results = []  # list that holds the results
        self.repairs = []  # list that holds the repair functions

    def export(self, items):
        """
        TODO: purpose unclear for now; it seems to be used for printing.
        :param items:
        :return:
        """
        result = []
        if items:
            for item in items:
                if hasattr(item, 'generic'):
                    item = item.generic(with_lineno=self.with_line)
                result.append(item)
        return result

    def export_list(self, params1, export_params1):
        """
        Flatten the nested lists in params1 into a single list.
        :param params1: a (possibly nested) parameter list
        :param export_params1: the list to merge into and return
        :return export_params: a flat list without nesting
        """
        params = copy.deepcopy(params1)
        export_params = copy.deepcopy(export_params1)
        # print(params)
        # print(export_params)
        for param in params:
            if isinstance(param, list):
                # print(1)
                export_params = self.export_list(param, export_params)
            else:
                # print(2)
                export_params.append(param)
        # print(export_params)
        # print("return")
        return list(set(export_params))

    def get_all_funcs(self, node, tmp=[]):
        funcs = [node.member]
        export_funcs = []  # empty accumulator handed to export_list
        for node in node.arguments:
            if isinstance(node, MethodInvocation):  # the argument is the return value of another call
                funcs.append(node.member)
                funcs = list(self.export_list(funcs, export_funcs))
        # if isinstance(node, MethodInvocation)
        # return get_all_funcs(node)
        return list(set(funcs))

    # def get_all_funcs(node):
    #     funcs = [node.qualifier + "." + node.member]
    #     export_funcs = []  # empty accumulator handed to export_list
    #     for node in node.arguments:
    #         if isinstance(node, MethodInvocation):  # the argument is the return value of another call
    #             funcs.append(node.qualifier + "." + node.member)
    #             funcs = export_list(funcs, export_funcs)
    #     # return get_all_funcs(node)
    #     return funcs

    def get_all_params(self, nodes):  # `nodes` is the argument list of the call
        """
        Get every variable used in a call's arguments.
        :param nodes: MethodInvocation.arguments
        :return params: all variables referenced in the argument list
        """
        params = []
        export_params = []  # empty accumulator handed to export_list
        for node in nodes:
            if isinstance(node, MethodInvocation):  # the argument is the return value of another call
                params = self.get_all_params(node.arguments)
            else:
                if isinstance(node, MemberReference):
                    params.append(node.member)
                elif isinstance(node, BinaryOperation):
                    params = self.get_binaryop_params(node)
                    params = self.export_list(params, export_params)
        return list(set(params))

    def get_binaryop_params(self, node):  # process the left and right operands separately and collect the variables
        """
        Extract the parameters of a BinaryOperation.
        :param node: a BinaryOperation node
        :return params: the variables involved in this node
        """
        # print('[AST] Binaryop --> {node}'.format(node=node))
        params = []
        buffer_ = []
        # At least one operand is a plain variable: take its name directly
        if isinstance(node.operandl, MemberReference) or isinstance(node.operandr, MemberReference):
            if isinstance(node.operandl, MemberReference):
                params.append(node.operandl.member)
            if isinstance(node.operandr, MemberReference):
                params.append(node.operandr.member)
        # At least one operand is not a plain variable: look inside it
        if not isinstance(node.operandl, MemberReference) or not isinstance(node.operandr, MemberReference):
            params_right = self.get_binaryop_deep_params(node.operandr, params)
            params_left = self.get_binaryop_deep_params(node.operandl, params)
            params = params_left + params_right
        params = self.export_list(params, buffer_)
        return params

    def get_binaryop_deep_params(self, node, params):  # used when the left or right operand is not a plain variable
        """
        Recursively extract the variable names buried in a nested operand.
        :param node: node.operandl or node.operandr from get_binaryop_params
        :param params: the parameters collected so far
        :return params: the parameters found at deeper levels
        """
        if isinstance(node, BinaryOperation):  # nested BinaryOperation: recurse to extract its variables
            param = self.get_binaryop_params(node)
            params.append(param)
        if isinstance(node, MethodInvocation):  # call: extract the variable names from its arguments
            params = self.get_all_params(node.arguments)
        return params

    # todo
    def get_expr_name(self, node):  # `node` is the right-hand side of the assignment
        """
        Get the variable names used in the expression part of an assignment, to be used for backtracking.
        :param node: a node (must be the right-hand side of an expression); all variables it contains are inspected
        :return param_expr: list of all variables involved in the expression []
        :return param_lineno: line of the expression, int
        :return is_re: whether the value has been repaired, boolean
        """
        # todo pitfall: javalang sometimes loses `position`, which can break the backtracking
        param_lineno = 0
        is_re = False
        param_expr = None
        if isinstance(node, MemberReference):  # the right-hand side is a variable
            param_expr = node.member  # variable name
            param_lineno = node.position.line
        elif isinstance(node, MethodInvocation):  # the right-hand side is a call
            param_expr = self.get_all_params(node.arguments)  # variables in the argument list
            param_lineno = node.position.line
            # function_name = node.qualifier + "." + node.member
            is_re = False
            # A function is called: check whether it is a repair function
            for func in self.get_all_funcs(node):
                if self.is_repair(func):
                    is_re = True
                    break
        elif isinstance(node, BinaryOperation):  # the right-hand side is a binary operation
            param_expr = self.get_binaryop_params(node)
            # todo javalang loses `position` here; hard-coded for now until that is fixed
            # param_lineno = node.position.line
            param_lineno = 7
        elif isinstance(node, Assignment):  # the right-hand side is itself an assignment
            param_expr, param_lineno, is_re = self.get_expr_name(node.value)
            # param_lineno = node.position.line
        elif isinstance(node, This):  # the right-hand side is a `this` expression
            for selector in node.selectors:
                param_expr, param_lineno, is_re = self.get_expr_name(selector)
                if is_re:
                    return param_expr, param_lineno, is_re
        else:
            param_expr = node
        # print(param_expr)
        # print(param_expr)
        return param_expr, param_lineno, is_re

    def get_node_name(self, node):  # `node` is the assigned-to node
        """
        Get the name of a MemberReference-type node.
        :param node: usually a MemberReference; literals and the like need no tracking
        :return: MemberReference.member (or VariableDeclarator.name)
        """
        if isinstance(node, MemberReference):
            return node.member  # variable name stored in this node
        elif isinstance(node, VariableDeclarator):
            return node.name  # variable name stored in this node

    def is_repair(self, expr):
        """
        Check whether a filter (repair) function appears in the assignment expression. If the value
        has already been filtered, stop backtracking and treat the vulnerability as fixed.
        :param expr: a function name
        :return is_re: whether the value has been repaired, boolean
        """
        is_re = False  # not repaired by default
        for repair in self.repairs:
            if expr == repair:
                is_re = True
                return is_re
        return is_re

    def is_sink_function(self, param_expr, function_params):
        """
        Compare a variable with the formal parameters of the enclosing function, to decide
        whether that function is itself dangerous.
        :param param_expr: a variable name
        :param function_params: formal parameters of the function
        :return: (2, parameter name) if the variable is one of the formal parameters (also treated as controllable), otherwise (-1, None)
        """
        is_co = -1
        cp = None
        if function_params is not None:
            for function_param in function_params:
                if param_expr == function_param:
                    is_co = 2
                    cp = function_param
                    # print('[AST] is_sink_function --> {function_param}'.format(function_param=cp))
        return is_co, cp

    def is_controllable(self, expr):  # check whether the name is in the list of user-controllable sources
        """
        Decide whether an assignment expression is user-controllable.
        :param expr: a function name
        :return 1, expr: (1, name) if the name is a user-controllable source, otherwise (-1, None)
        """
        controlled_params = [
            'getParameter'
            # '$_GET',
            # '$_POST',
            # '$_REQUEST',
            # '$_COOKIE',
            # '$_FILES',
            # '$_SERVER',
            # '$HTTP_POST_FILES',
            # '$HTTP_COOKIE_VARS',
            # '$HTTP_REQUEST_VARS',
            # '$HTTP_POST_VARS',
            # '$HTTP_RAW_POST_DATA',
            # '$HTTP_GET_VARS'
        ]
        if expr in controlled_params:
            # print('[AST] is_controllable --> {expr}'.format(expr=expr))
            return 1, expr
        return -1, None

    def parameters_back(self, param, nodes, function_params=None, node_lineno=-1):  # `param` is the taint currently being tracked
        """
        Recursively walk back through the assignments that lead to the sink. `param` is the tracked taint:
        once its origin is found, analyse the assignment expression and pick up the new taint;
        otherwise recurse into the previous node.
        :param param: a variable name
        :param nodes: the back_nodes visited so far, mostly LocalVariableDeclaration/StatementExpression/If... nodes
        :param function_params: formal parameters of the enclosing function, kept during recursion; a variable that comes from a formal parameter is also treated as controllable
        :param node_lineno: line of the first node in `nodes`, reported when the taint comes from a formal parameter
        :return is_co, cp, expr_lineno: 1 if controllable, the controllable source, the line of the expression
        """
        # node_lineno = -1
        # print(node_lineno)
        if len(nodes) > 0 and node_lineno == -1:
            node_lineno = nodes[0].position.line  # line number of the first visited node
        expr_lineno = 0
        is_re = False
        is_co, cp = self.is_controllable(param)
        if len(nodes) != 0 and is_co == -1:
            node = nodes[len(nodes) - 1]
            tnodes = []
            # While backtracking, follow every node that performs an assignment
            if isinstance(node, LocalVariableDeclaration):
                tnodes = [[declarator, declarator.initializer] for declarator in node.declarators]
            elif isinstance(node, StatementExpression):
                if isinstance(node.expression, Assignment):
                    tnodes = [[node.expression.expressionl, node.expression.value]]
            for left_var, right_var in tnodes:
                param_node = self.get_node_name(left_var)
                # param_expr is the assigned expression: a single variable or a list of variables
                param_expr, expr_lineno, is_re = self.get_expr_name(right_var)
                if param == param_node and is_re is False and isinstance(right_var, MethodInvocation):
                    funcs = self.get_all_funcs(right_var)
                    # print(funcs)
                    if not is_re:
                        for func in funcs:
                            is_co, cp = self.is_controllable(func)
                            if is_co == 1:
                                return is_co, cp, expr_lineno
                if param == param_node and is_re is True:
                    is_co = 0
                    cp = None
                    return is_co, cp, expr_lineno
                if param == param_node and not isinstance(param_expr, list):  # origin found: check whether the assigned expression is controllable
                    is_co, cp = self.is_controllable(param_expr)
                    if is_co != 1:
                        is_co, cp = self.is_sink_function(param_expr, function_params)
                    param = param_expr  # each time an origin is found, track the new taint instead of the old one
                if param == param_node and isinstance(param_expr, list):
                    for expr in param_expr:
                        param = expr
                        is_co, cp = self.is_controllable(expr)
                        if is_co == 1:
                            return is_co, cp, expr_lineno
                        _is_co, _cp, expr_lineno = self.parameters_back(param, nodes[:-1], function_params, node_lineno)
                        if _is_co != -1:  # one controllable parameter is enough to flag the call as possibly controllable
                            is_co = _is_co
                            cp = _cp
            if is_co == -1:  # still undecided: keep backtracking through the earlier nodes
                is_co, cp, expr_lineno = self.parameters_back(param, nodes[:-1], function_params, node_lineno)
        # If the variable comes from a formal parameter, the function name and line would ideally be recorded as well
        elif len(nodes) == 0 and function_params is not None:
            for function_param in function_params:
                if function_param == param:
                    is_co = 2
                    cp = function_param
                    expr_lineno = node_lineno
        return is_co, cp, expr_lineno

    def get_function_params(self, nodes):
        """
        Get all formal parameters of a user-defined function.
        :param nodes: the parameter section of the function declaration
        :return params: all formal parameters, as a list
        """
        params = []
        for node in nodes:
            if isinstance(node, FormalParameter):
                params.append(node.name)
        return list(set(params))

    def anlysis_function(self, node, back_node, vul_function, function_params, vul_lineno):
        """
        Analyse a call: if it is the sensitive function on the expected line, trace each of its
        arguments back through the preceding assignments. A user-defined function whose formal
        parameters reach the sink is itself considered dangerous.
        :param node: a MethodInvocation node
        :param back_node: the back_nodes visited so far
        :param vul_function: name of the vulnerable function
        :param function_params: formal parameters of the enclosing function (when entered from a MethodDeclaration)
        :param vul_lineno: line of the sensitive call
        :return:
        """
        global scan_results
        # try:
        if node.member == vul_function and int(node.position.line) == int(vul_lineno):  # sensitive call found: examine the code that precedes it
            for param in node.arguments:
                if isinstance(param, MemberReference):
                    self.analysis_variable_node(param, back_node, vul_function, vul_lineno, function_params)
                elif isinstance(param, MethodInvocation):
                    self.analysis_functioncall_node(param, back_node, vul_function, vul_lineno, function_params)
                elif isinstance(param, BinaryOperation):
                    self.analysis_binaryop_node(param, back_node, vul_function, vul_lineno, function_params)
        # except Exception as e:
        #     print(e)

    def analysis_binaryop_node(self, node, back_node, vul_function, vul_lineno, function_params=None):
        """
        Handle a BinaryOperation node: extract its variables, backtrack each one to decide
        whether it is controllable, and record the result.
        :param node: the BinaryOperation passed to the sink
        :param back_node: statements visited before the sink
        :param vul_function: name of the sensitive function
        :param vul_lineno: line of the sensitive call
        :param function_params: formal parameters of the enclosing function
        :return:
        """
        # print('[AST] vul_function:{v}'.format(v=vul_function))
        export_params = []
        params = self.get_binaryop_params(node)
        params = self.export_list(params, export_params)
        for param in params:
            is_co, cp, expr_lineno = self.parameters_back(param, back_node, function_params)
            self.set_scan_results(is_co, cp, expr_lineno, vul_function, param, vul_lineno)

    def analysis_functioncall_node(self, node, back_node, vul_function, vul_lineno, function_params=None):
        """
        Handle a function-call node: extract its arguments, backtrack each one to decide
        whether it is controllable, and record the result.
        :param node: the MethodInvocation passed to the sink
        :param back_node: statements visited before the sink
        :param vul_function: name of the sensitive function
        :param vul_lineno: line of the sensitive call
        :param function_params: formal parameters of the enclosing function
        :return:
        """
        # print('[AST] vul_function:{v}'.format(v=vul_function))
        params = set(list(self.get_all_params(node.arguments)))
        for param in params:
            is_co, cp, expr_lineno = self.parameters_back(param, back_node, function_params)
            self.set_scan_results(is_co, cp, expr_lineno, vul_function, param, vul_lineno)

    def analysis_variable_node(self, node, back_node, vul_function, vul_lineno, function_params=None):
        """
        Handle a variable node: take its name, backtrack it to decide whether it is controllable,
        and record the result. Only the variable reached in the last backtracking step is written
        to the result table; the path is not kept.
        :param node: the MemberReference passed to the sink
        :param back_node: statements visited before the sink
        :param vul_function: name of the sensitive function
        :param vul_lineno: line of the sensitive call
        :param function_params: formal parameters of the enclosing function
        :return:
        """
        # print('[AST] vul_function:{v}'.format(v=vul_function))
        param = self.get_node_name(node)
        is_co, cp, expr_lineno = self.parameters_back(param, back_node, function_params)
        self.set_scan_results(is_co, cp, expr_lineno, vul_function, param, vul_lineno)

    def analysis_if_else(self, node, vul_function, back_node, vul_lineno, function_params=None):
        nodes = []
        if isinstance(node.then_statement, BlockStatement):
            self.analysis(node.then_statement.statements, vul_function, back_node, vul_lineno, function_params)
        if isinstance(node.else_statement, BlockStatement):
            self.analysis(node.else_statement.statements, vul_function, back_node, vul_lineno, function_params)
        if isinstance(node.else_statement, IfStatement):
            self.analysis_if_else(node.else_statement, vul_function, back_node, vul_lineno, function_params)

    def set_scan_results(self, is_co, cp, expr_lineno, sink, param, vul_lineno):
        """
        Assemble the result information and store it.
        :param is_co: result code from the backtracking
        :param cp: the controllable source that was found
        :param expr_lineno: line of the source expression
        :param sink: name of the sensitive function
        :param param: the sink argument that was traced
        :param vul_lineno: line of the sensitive call
        :return:
        """
        results = []
        # global scan_results
        result = {
            'code': is_co,
            'source': cp,
            'source_lineno': expr_lineno,
            'sink': sink,
            'sink_param:': param,
            'sink_lineno': vul_lineno
        }
        # for scan_result in scan_results:
        # if
        if result['code'] != -1:  # only findings are added to the result list
            results.append(result)
            self.scan_results += results

    def analysis(self, nodes, vul_function, back_node, vul_lineo, function_params=None):
        """
        The overall idea is to visit every node and push it onto back_node:
        -> look at each MethodInvocation until the one matching vul_lineo is found
        -> then extract the variables involved in that call
        ( anlysis_function only checks whether a call is the sensitive one; it could be optimised )
        ( analysis_functioncall_node takes the arguments of the sensitive call and runs parameters_back on them )
        :param nodes: all nodes
        :param vul_function: name of the sensitive function to look for
        :param back_node: statements collected from the enclosing syntax structures
        :param vul_lineo: line of the vulnerable call
        :param function_params: formal parameters of the user-defined function
        :return:
        """
        buffer_ = []
        for node in nodes:
            if isinstance(node, MethodInvocation):
                # In the original (PHP) code this branch looked up the declaration of the called
                # method and reported it if its body contained a sensitive operation
                self.anlysis_function(node, back_node, vul_function, function_params, vul_lineo)
            elif isinstance(node, StatementExpression):
                if isinstance(node.expression, MethodInvocation):
                    self.anlysis_function(node.expression, back_node, vul_function, function_params, vul_lineo)
                elif isinstance(node.expression, Assignment):
                    if isinstance(node.expression.value, MethodInvocation):
                        self.anlysis_function(node.expression.value, back_node, vul_function, function_params,
                                              vul_lineo)
                # todo binary operations still need to be handled here
            elif isinstance(node, LocalVariableDeclaration):
                for declarator in node.declarators:
                    if isinstance(declarator.initializer, MethodInvocation):
                        self.anlysis_function(declarator.initializer, back_node, vul_function, function_params,
                                              vul_lineo)
            elif isinstance(node, IfStatement):  # the call sits inside an if-else statement
                self.analysis_if_else(node, vul_function, back_node, vul_lineo, function_params)
            elif isinstance(node, TryStatement):  # the call sits inside a try-catch-finally statement
                # print(back_node)
                self.analysis(node.block, vul_function, back_node, vul_lineo, function_params)
                # analysis(node.catches, back_node, vul_function, vul_lineo, function_params)
                # analysis(node.finally_block, back_node, vul_function, vul_lineo, function_params)
            elif isinstance(node, WhileStatement):
                self.analysis(node.body.statements, vul_function, back_node, vul_lineo, function_params)
            elif isinstance(node, ForStatement):
                if isinstance(node.body, BlockStatement):
                    # walk the statements of the block, as in the while branch
                    self.analysis(node.body.statements, vul_function, back_node, vul_lineo, function_params)
            elif isinstance(node, MethodDeclaration):
                function_body = [node]
                function_params = self.get_function_params(node.parameters)
                self.analysis(node.body, vul_function, function_body, vul_lineo, function_params=function_params)
            elif isinstance(node, ClassDeclaration):
                self.analysis(node.body, vul_function, back_node, vul_lineo, function_params)
            # if back_node == "executeSql":
            #     print(back_node)
            back_node.append(node)

    def scan_parser(self, code_content, sensitive_func, vul_lineno, repair):
        """
        For each sensitive function in sensitive_func, walk the AST
        -> analysis() looks for the sensitive call on line vul_lineno
        :param code_content: content of the file to check
        :param sensitive_func: sensitive functions to look for, as a list
        :param vul_lineno: line of the vulnerable call
        :param repair: repair functions for this class of vulnerability, as a list
        :return:
        """
        try:
            # global repairs
            # global scan_results
            self.repairs = repair
            self.scan_results = []
            tree = javalang.parse.parse(code_content)
            all_nodes = tree.children[-1]
            # For every sensitive function, check whether the code calls it and, if so,
            # trace its arguments; the file content is therefore analysed several times
            for func in sensitive_func:
                back_node = []
                self.analysis(all_nodes, func, back_node, int(vul_lineno), function_params=None)
        except SyntaxError as e:
            print('[AST] [ERROR]:{e}'.format(e=e))
        return self.scan_results
    def run(self):
        code_lines = self.src.split('\n')
        # Order-preserving de-duplication; the results are dicts, which cannot go into a set.
        run_function = lambda x, y: x if y in x else x + [y]
        for i, line in enumerate(code_lines):
            # Cheap textual pre-filter: only lines that mention the sink trigger an AST scan.
            if 'executeSql' in line:
                print("*" * 50)
                print("executeSql in " + self.filename + ":" + str(i + 1))
                res = self.scan_parser(self.src, ['executeSql'], i + 1, ['null2int', 'getIntValue'])
                res = reduce(run_function, [[], ] + res)
                print(res)
                for x in res:
                    print("##" * 20 + "found sqli in " + self.filename + "##" * 20)
                    if x['code'] > 0:
                        sink_line = x['sink_lineno'] - 1
                        source_lineno = x['source_lineno'] - 1
                        # "注入参数" labels the injected parameter (source), "注入点" the injection point (sink).
                        print("注入参数: ", x['source_lineno'], " | ", code_lines[source_lineno].strip(" \t"))
                        print("------------>")
                        print("注入点: ", x['sink_lineno'], " | ", code_lines[sink_line].strip(" \t"))
                        # One tab-separated record per finding: code, file, source line, sink line, source text.
                        record = "%d\t%s\t%d\t%d\t%s\n" % (x['code'], self.filename, x['source_lineno'], x['sink_lineno'], code_lines[source_lineno].strip(" \t"))
                        fp.write(record)
                        fp.flush()
                print("\n")

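import sys
import time
from functools import reduce  # run() uses reduce, which lives in functools on Python 3

t = time.time()
if __name__ == '__main__':
    # filename = "java_src/Sqli.java"
    filename = r"java_src/_workflowcentertreedata__jsp.java"
    # filename = sys.argv[1]
    print(filename)
    # run() writes its records to this module-level file handle, so open it first.
    fp = open("res.txt", 'a+')
    a = JavaParse(filename)
    a.run()
    fp.close()
    print(time.time() - t)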
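The back-tracing idea behind analysis() can be hard to see through the javalang node types, so here is a much-simplified sketch of it. It is not the tool's actual code: the `Assign` record and the `trace` helper are hypothetical stand-ins for Java AST nodes, and only the source name (getParameter) and the repair names (null2int, getIntValue) come from the listing above. The sketch walks the statements that precede a sink backwards, stops at a repair function, and reports a hit when it reaches a source.

```python
from dataclasses import dataclass
from typing import List, Optional

SOURCES = {"getParameter"}             # user-controlled input
REPAIRS = {"null2int", "getIntValue"}  # sanitizers, as passed to scan_parser() in run()


@dataclass
class Assign:
    """Hypothetical, simplified stand-in for a Java local variable assignment."""
    lineno: int
    target: str       # variable being assigned
    calls: List[str]  # method names invoked on the right-hand side
    uses: List[str]   # variable names read on the right-hand side


def trace(var: str, back_nodes: List[Assign]) -> Optional[dict]:
    """Walk earlier statements backwards and decide whether `var` is controllable."""
    for pos in range(len(back_nodes) - 1, -1, -1):
        stmt = back_nodes[pos]
        if stmt.target != var:
            continue
        if REPAIRS & set(stmt.calls):
            return None  # value passed through a repair function: treat as safe
        if SOURCES & set(stmt.calls):
            return {"code": 1, "source": "getParameter", "source_lineno": stmt.lineno}
        # Otherwise follow the variables this value was built from,
        # looking only at statements that come before this one.
        for used in stmt.uses:
            hit = trace(used, back_nodes[:pos])
            if hit:
                return hit
        return None
    return None  # no assignment found: origin unknown in this simplified model


if __name__ == "__main__":
    stmts = [
        Assign(1, "node", ["null2String", "getParameter"], []),
        Assign(2, "value", ["substring"], ["node"]),
        Assign(3, "id", ["getIntValue", "getParameter"], []),
    ]
    print(trace("value", stmts))  # reaches getParameter through `node`
    print(trace("id", stmts))     # stopped by getIntValue
```

In this model only the most recent assignment to a variable is considered and a repair call on the same line as a source always wins. The real traversal also has to handle if-else, try, loops and method parameters.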
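Analysis results
The output clearly shows the following injection points in _workflowcentertreedata__jsp.java -> /mobile/browser/WorkflowCenterTreeData.jsp:
- Injected parameter, line 62 | String scope = Util.null2String(request.getParameter("scope"));
- Injected parameter, line 64 | String formids = Util.null2String(request.getParameter("formids"));
- Injected parameter, line 54 | String node = Util.null2String(request.getParameter("node"));
In the raw tool output that follows, 注入参数 marks the injected parameter (source) and 注入点 marks the injection point (sink). The final number is the elapsed time in seconds.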
java_src/_workflowcentertreedata__jsp.java
**************************************************
executeSql in java_src/_workflowcentertreedata__jsp.java:66
[{'code': 1, 'source': 'getParameter', 'source_lineno': 62, 'sink': 'executeSql', 'sink_param:': 'scope', 'sink_lineno': 66}]
########################################found sqli in java_src/_workflowcentertreedata__jsp.java########################################
注入参数: 62 | String scope = Util.null2String(request.getParameter("scope"));
------------>
注入点: 66 | rs.executeSql("select * from mobileconfig where mc_type=5 and mc_scope=" + scope + " and mc_name='flowids' ");
**************************************************
executeSql in java_src/_workflowcentertreedata__jsp.java:85
[{'code': 1, 'source': 'getParameter', 'source_lineno': 64, 'sink': 'executeSql', 'sink_param:': 'formids', 'sink_lineno': 85}]
########################################found sqli in java_src/_workflowcentertreedata__jsp.java########################################
注入参数: 64 | String formids = Util.null2String(request.getParameter("formids"));
------------>
注入点: 85 | rs.executeSql("select id,workflowname from workflow_base where isvalid='1' and workflowtype=" + wfTypeId + " and ( isbill=0 or (isbill=1 and formid<0) or (isbill=1 and formid in (" + formids + ")))");
**************************************************
executeSql in java_src/_workflowcentertreedata__jsp.java:105
[{'code': 1, 'source': 'getParameter', 'source_lineno': 64, 'sink': 'executeSql', 'sink_param:': 'formids', 'sink_lineno': 105}, {'code': 1, 'source': 'getParameter', 'source_lineno': 54, 'sink': 'executeSql', 'sink_param:': 'value', 'sink_lineno': 105}]
########################################found sqli in java_src/_workflowcentertreedata__jsp.java########################################
注入参数: 64 | String formids = Util.null2String(request.getParameter("formids"));
------------>
注入点: 105 | rs.executeSql("select id,workflowname from workflow_base where isvalid='1' and workflowtype=" + value + " and ( isbill=0 or (isbill=1 and formid<0) or (isbill=1 and formid in (" + formids + ")))");
########################################found sqli in java_src/_workflowcentertreedata__jsp.java########################################
注入参数: 54 | String node = Util.null2String(request.getParameter("node"));
------------>
注入点: 105 | rs.executeSql("select id,workflowname from workflow_base where isvalid='1' and workflowtype=" + value + " and ( isbill=0 or (isbill=1 and formid<0) or (isbill=1 and formid in (" + formids + ")))");
0.2094409465789795
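The result lists printed above are de-duplicated inside run() by folding them with reduce, because the findings are dicts and cannot be put into a set. A minimal sketch of that idiom on its own follows. The helper name `dedupe` and the sample dicts are made up for illustration.

```python
from functools import reduce


def dedupe(results):
    """Order-preserving de-duplication that also works for unhashable items such as dicts."""
    keep_unique = lambda acc, item: acc if item in acc else acc + [item]
    # Starting the fold from [] is equivalent to reduce(f, [[]] + results).
    return reduce(keep_unique, results, [])


if __name__ == "__main__":
    hits = [
        {"source_lineno": 64, "sink_param": "formids", "sink_lineno": 105},
        {"source_lineno": 64, "sink_param": "formids", "sink_lineno": 105},  # duplicate
        {"source_lineno": 54, "sink_param": "value", "sink_lineno": 105},
    ]
    for hit in dedupe(hits):
        print(hit)
```

The membership test makes this quadratic in the number of findings, which is acceptable here because each scan reports only a few results per sink line.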
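Overall results
Filtered results
The findings were cross-checked against the list of JSP files that returned status 200 when requested from the front end without logging in. Only direct injection points were counted, and second-order sink injections were excluded. Multiple injection points in the same file were not de-duplicated. This left 160 injection points in total.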
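The filtering step can be sketched as follows, assuming the tab-separated record layout that run() writes to res.txt (code, file name, source line, sink line, source text). Two things here are assumptions rather than facts from the tool: that a code of 1 marks a direct injection point, and that the caller already has the set of compiled servlet file names that correspond to front-end accessible JSPs. `Record`, `parse_records` and `filter_records` are hypothetical names.

```python
from typing import Iterable, List, NamedTuple, Set


class Record(NamedTuple):
    code: int
    filename: str
    source_lineno: int
    sink_lineno: int
    source_line: str


def parse_records(lines: Iterable[str]) -> List[Record]:
    """Parse the tab-separated records written by run()."""
    records = []
    for line in lines:
        line = line.rstrip("\n")
        if not line:
            continue
        # Split at most 4 times so tabs inside the source text stay intact.
        code, filename, src, sink, text = line.split("\t", 4)
        records.append(Record(int(code), filename, int(src), int(sink), text))
    return records


def filter_records(records: List[Record], accessible: Set[str],
                   direct_code: int = 1) -> List[Record]:
    """Keep direct findings in accessible files; duplicates per file are kept on purpose."""
    return [r for r in records
            if r.code == direct_code and r.filename in accessible]


if __name__ == "__main__":
    sample = [
        "1\tjava_src/_a__jsp.java\t62\t66\tString scope = x;\n",
        "2\tjava_src/_a__jsp.java\t10\t20\tString y = z;\n",
        "1\tjava_src/_b__jsp.java\t5\t9\tString q = w;\n",
    ]
    kept = filter_records(parse_records(sample), {"java_src/_a__jsp.java"})
    print(len(kept), "injection point(s) kept")
```

Mapping a compiled servlet file back to its JSP path is left out, since it depends on how Resin names the cached classes.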
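Manually building the injection exploits
After building the injections by hand and discarding cases where the table does not exist in the OA, injections into delete statements, and additional injection points within the same file, 48 working exploits remained.
PS. Because there were so many vulnerabilities, I did not go on to analyze the Servlets mapped under the /weaver/ endpoint. Anyone interested in automated code auditing is welcome to join the research.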
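Pros and cons
Pros
1. Compared with regex matching, traversing the AST preserves the contextual relationships in the code, so vulnerabilities can be located more accurately.
2. Working on the AST makes code analysis more flexible, and the structured representation is a better basis for other techniques, such as machine learning over AST/CFG/IR.
Cons
1. AST processing is relatively expensive.
2. The current code does not handle cross-file flows well. It is limited to a single file, although a second parsing pass can work around this.
3. Not every Java node type is covered yet, so some objects are missed during traversal.
4. The AST alone does not carry enough information. The analysis code is hard to write and is not generic, because one engine can only analyze one language.
5. Plenty of tools of this kind already exist: Fortify, CheckMarx, SonarQube, Codeql and Joern. Each has its strengths, but none of them is a silver bullet.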
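This article is only a first, shallow attempt at Static Analysis. The results were good, but many parts are clearly rough, and I plan to use more advanced techniques to address these shortcomings later. Static Analysis is not a silver bullet and has its own limitations. It cannot be expected to cover every vulnerability, since an analysis that is both Sound and Complete does not exist.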
Source: freebuf.com, 2020-05-20 19:52:43, by: 斗象智能安全平台