|
|
|
#python代码动态执行
|
|
|
|
import ast
|
|
|
|
import subprocess
|
|
|
|
import json
|
|
|
|
import builtins
|
|
|
|
import re
|
|
|
|
import paramiko
|
|
|
|
import impacket
|
|
|
|
from tools.ToolBase import ToolBase
|
|
|
|
|
|
|
|
class PythoncodeTool(ToolBase):
|
|
|
|
|
|
|
|
def is_safe_code(self,code):
|
|
|
|
# List of high-risk functions to block (can be adjusted based on requirements)
|
|
|
|
HIGH_RISK_FUNCTIONS = ['eval', 'exec', 'os.system', 'subprocess.call', 'subprocess.Popen']
|
|
|
|
|
|
|
|
"""Check if the code contains high-risk function calls."""
|
|
|
|
try:
|
|
|
|
tree = ast.parse(code)
|
|
|
|
for node in ast.walk(tree):
|
|
|
|
if isinstance(node, ast.Call):
|
|
|
|
if isinstance(node.func, ast.Name) and node.func.id in HIGH_RISK_FUNCTIONS:
|
|
|
|
return False
|
|
|
|
elif isinstance(node.func, ast.Attribute) and node.func.attr in HIGH_RISK_FUNCTIONS:
|
|
|
|
return False
|
|
|
|
return True
|
|
|
|
except SyntaxError:
|
|
|
|
return False
|
|
|
|
|
|
|
|
def validate_instruction(self, instruction):
|
|
|
|
#指令过滤
|
|
|
|
timeout = 0
|
|
|
|
instr = instruction.replace("python_code ","")
|
|
|
|
instr = instruction.replace("python-code ", "")
|
|
|
|
# Safety check
|
|
|
|
if not self.is_safe_code(instr):
|
|
|
|
return "", timeout
|
|
|
|
return instr,timeout
|
|
|
|
|
|
|
|
def safe_import(self,name,*args,**kwargs):
|
|
|
|
ALLOWED_MODULES = ['subprocess', 'json','re']
|
|
|
|
if name not in ALLOWED_MODULES:
|
|
|
|
raise ImportError(f"Import of '{name}' is not allowed")
|
|
|
|
return builtins.__import__(name, *args, **kwargs)
|
|
|
|
|
|
|
|
def execute_instruction(self, instruction_old):
|
|
|
|
'''
|
|
|
|
执行指令:验证合法性 -> 执行 -> 分析结果
|
|
|
|
:param instruction_old:
|
|
|
|
:return:
|
|
|
|
bool:true-正常返回给大模型,false-结果不返回给大模型
|
|
|
|
str:执行的指令
|
|
|
|
str:执行指令的结果
|
|
|
|
'''
|
|
|
|
ext_params = self.create_extparams()
|
|
|
|
|
|
|
|
# 定义允许的内置函数集合 --白名单
|
|
|
|
allowed_builtins = {
|
|
|
|
'__import__': builtins.__import__,
|
|
|
|
"abs": abs,
|
|
|
|
"all": all,
|
|
|
|
"any": any,
|
|
|
|
"bool": bool,
|
|
|
|
"chr": chr,
|
|
|
|
"dict": dict,
|
|
|
|
"float": float,
|
|
|
|
"int": int,
|
|
|
|
"len": len,
|
|
|
|
"list": list,
|
|
|
|
"max": max,
|
|
|
|
"min": min,
|
|
|
|
"print": print,
|
|
|
|
"range": range,
|
|
|
|
"set": set,
|
|
|
|
"str": str,
|
|
|
|
"sum": sum,
|
|
|
|
"type": type,
|
|
|
|
'open':open,
|
|
|
|
'Exception':Exception,
|
|
|
|
# 根据需要可以添加其他安全的内置函数
|
|
|
|
}
|
|
|
|
# 第一步:验证指令合法性
|
|
|
|
instruction,time_out = self.validate_instruction(instruction_old)
|
|
|
|
if not instruction:
|
|
|
|
return False, instruction_old, "该指令暂不执行!","",ext_params
|
|
|
|
# 过滤修改后的指令是否需要判重?同样指令再执行结果一致?待定---#?
|
|
|
|
|
|
|
|
# 第二步:执行指令
|
|
|
|
output = ""
|
|
|
|
try:
|
|
|
|
# 构造安全的全局命名空间,只包含我们允许的 __builtins__
|
|
|
|
# 虽然动态代码中包含了import subprocess,但是还是需要在全局命名空间中添加subprocess这些库
|
|
|
|
# 正常情况应该是不需要的,后续再研究
|
|
|
|
safe_globals = {"__builtins__": allowed_builtins,
|
|
|
|
'subprocess':subprocess,
|
|
|
|
'json':json,
|
|
|
|
're':re,
|
|
|
|
'paramiko':paramiko,
|
|
|
|
'impacket':impacket}
|
|
|
|
safe_locals = {} #不需要预设局部参数
|
|
|
|
# 在限制环境中执行代码
|
|
|
|
exec(instruction, safe_globals,safe_locals)
|
|
|
|
# Check if check_samba_vuln is defined
|
|
|
|
if 'dynamic_fun' not in safe_locals:
|
|
|
|
analysis = "Function dynamic_fun() is not defined"
|
|
|
|
ext_params['is_use'] = True
|
|
|
|
return True,instruction,analysis,analysis,ext_params
|
|
|
|
# Get the function and call it
|
|
|
|
dynamic_fun = safe_locals['dynamic_fun']
|
|
|
|
status, tmpout = dynamic_fun()
|
|
|
|
output = f"status:{status},output:{tmpout}"
|
|
|
|
except Exception as e:
|
|
|
|
analysis = f"执行动态代码时出错: {str(e)}"
|
|
|
|
ext_params['is_use'] = True
|
|
|
|
return True,instruction,analysis,analysis,ext_params
|
|
|
|
|
|
|
|
|
|
|
|
# 第三步:分析执行结果
|
|
|
|
analysis = self.analyze_result(output, instruction,"","")
|
|
|
|
# 指令和结果入数据库
|
|
|
|
# ?
|
|
|
|
if not analysis: # analysis为“” 不提交LLM
|
|
|
|
return False, instruction, analysis,"",ext_params
|
|
|
|
return True, instruction, analysis,"",ext_params
|
|
|
|
|
|
|
|
def analyze_result(self, result,instruction,stderr,stdout):
|
|
|
|
#指令结果分析
|
|
|
|
return result
|
|
|
|
|
|
|
|
if __name__ == "__main__":
|
|
|
|
llm_code = """
|
|
|
|
def run_test():
|
|
|
|
return 'Penetration test executed successfully!'
|
|
|
|
"""
|