You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

129 lines
4.8 KiB

#python代码动态执行
import ast
import subprocess
import json
import builtins
import re
from tools.ToolBase import ToolBase
class PythoncodeTool(ToolBase):
def is_safe_code(self,code):
# List of high-risk functions to block (can be adjusted based on requirements)
HIGH_RISK_FUNCTIONS = ['eval', 'exec', 'os.system', 'subprocess.call', 'subprocess.Popen']
"""Check if the code contains high-risk function calls."""
try:
tree = ast.parse(code)
for node in ast.walk(tree):
if isinstance(node, ast.Call):
if isinstance(node.func, ast.Name) and node.func.id in HIGH_RISK_FUNCTIONS:
return False
elif isinstance(node.func, ast.Attribute) and node.func.attr in HIGH_RISK_FUNCTIONS:
return False
return True
except SyntaxError:
return False
def validate_instruction(self, instruction):
#指令过滤
timeout = 0
instr = instruction.replace("python_code ","")
# Safety check
if not self.is_safe_code(instr):
return "", timeout
return instr,timeout
def safe_import(self,name,*args,**kwargs):
ALLOWED_MODULES = ['subprocess', 'json','re']
if name not in ALLOWED_MODULES:
raise ImportError(f"Import of '{name}' is not allowed")
return builtins.__import__(name, *args, **kwargs)
def execute_instruction(self, instruction_old):
'''
执行指令:验证合法性 -> 执行 -> 分析结果
:param instruction_old:
:return:
bool:true-正常返回给大模型,false-结果不返回给大模型
str:执行的指令
str:执行指令的结果
'''
ext_params = self.create_extparams()
# 定义允许的内置函数集合 --白名单
allowed_builtins = {
'__import__': builtins.__import__,
"abs": abs,
"all": all,
"any": any,
"bool": bool,
"chr": chr,
"dict": dict,
"float": float,
"int": int,
"len": len,
"list": list,
"max": max,
"min": min,
"print": print,
"range": range,
"set": set,
"str": str,
"sum": sum,
"type": type,
'open':open,
'Exception':Exception,
# 根据需要可以添加其他安全的内置函数
}
# 第一步:验证指令合法性
instruction,time_out = self.validate_instruction(instruction_old)
if not instruction:
return False, instruction_old, "该指令暂不执行!","",ext_params
# 过滤修改后的指令是否需要判重?同样指令再执行结果一致?待定---#?
# 第二步:执行指令
output = ""
try:
# 构造安全的全局命名空间,只包含我们允许的 __builtins__
# 虽然动态代码中包含了import subprocess,但是还是需要在全局命名空间中添加subprocess这些库
# 正常情况应该是不需要的,后续再研究
safe_globals = {"__builtins__": allowed_builtins,
'subprocess':subprocess,
'json':json,
're':re,}
safe_locals = {} #不需要预设局部参数
# 在限制环境中执行代码
exec(instruction, safe_globals,safe_locals)
# Check if check_samba_vuln is defined
if 'dynamic_fun' not in safe_locals:
analysis = "Function dynamic_fun() is not defined"
ext_params['is_use'] = True
return True,instruction,analysis,analysis,ext_params
# Get the function and call it
dynamic_fun = safe_locals['dynamic_fun']
status, tmpout = dynamic_fun()
output = f"status:{status},output:{tmpout}"
except Exception as e:
analysis = f"执行动态代码时出错: {str(e)}"
ext_params['is_use'] = True
return True,instruction,analysis,analysis,ext_params
# 第三步:分析执行结果
analysis = self.analyze_result(output, instruction,"","")
# 指令和结果入数据库
# ?
if not analysis: # analysis为“” 不提交LLM
return False, instruction, analysis,"",ext_params
return True, instruction, analysis,"",ext_params
def analyze_result(self, result,instruction,stderr,stdout):
#指令结果分析
return result
if __name__ == "__main__":
llm_code = """
def run_test():
return 'Penetration test executed successfully!'
"""