You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

153 lines
5.8 KiB

#python代码动态执行
import ast
import subprocess
import json
import builtins
import re
import paramiko
import impacket
import psycopg2
import socket
import struct
import sys
import requests
import ssl
import mysql.connector
from tools.ToolBase import ToolBase
from mycode.Result_merge import my_merge
class PythoncodeTool(ToolBase):
def is_safe_code(self,code):
# List of high-risk functions to block (can be adjusted based on requirements)
HIGH_RISK_FUNCTIONS = ['eval', 'exec', 'os.system', 'subprocess.call', 'subprocess.Popen']
"""Check if the code contains high-risk function calls."""
try:
tree = ast.parse(code)
for node in ast.walk(tree):
if isinstance(node, ast.Call):
if isinstance(node.func, ast.Name) and node.func.id in HIGH_RISK_FUNCTIONS:
return False
elif isinstance(node.func, ast.Attribute) and node.func.attr in HIGH_RISK_FUNCTIONS:
return False
return True
except SyntaxError:
return False
def validate_instruction(self, instruction):
#指令过滤
timeout = 0
instr = instruction.replace("python_code ","")
instr = instruction.replace("python-code ", "")
# Safety check
if not self.is_safe_code(instr):
return "", timeout
return instr,timeout
def safe_import(self,name,*args,**kwargs):
ALLOWED_MODULES = ['subprocess', 'json','re']
if name not in ALLOWED_MODULES:
raise ImportError(f"Import of '{name}' is not allowed")
return builtins.__import__(name, *args, **kwargs)
def execute_instruction(self, instruction_old):
'''
执行指令:验证合法性 -> 执行 -> 分析结果
:param instruction_old:
:return:
bool:true-正常返回给大模型,false-结果不返回给大模型
str:执行的指令
str:执行指令的结果
'''
ext_params = self.create_extparams()
# 定义允许的内置函数集合 --白名单
allowed_builtins = {
'__name__': __name__,
'__import__': builtins.__import__,
"abs": abs,
"all": all,
"any": any,
"bool": bool,
"chr": chr,
"dict": dict,
"float": float,
"int": int,
"len": len,
"list": list,
"max": max,
"min": min,
"print": print,
"range": range,
"set": set,
"str": str,
"sum": sum,
"type": type,
'open':open,
'Exception':Exception,
# 根据需要可以添加其他安全的内置函数
}
# 第一步:验证指令合法性
instruction,time_out = self.validate_instruction(instruction_old)
if not instruction:
return False, instruction_old, "该指令暂不执行!","",ext_params
# 过滤修改后的指令是否需要判重?同样指令再执行结果一致?待定---#?
# 第二步:执行指令
output = ""
try:
# 构造安全的全局命名空间,只包含我们允许的 __builtins__
# 虽然动态代码中包含了import subprocess,但是还是需要在全局命名空间中添加subprocess这些库
# 正常情况应该是不需要的,后续再研究
safe_globals = {"__builtins__": allowed_builtins,
'subprocess':subprocess,
'json':json,
're':re,
'paramiko':paramiko,
'impacket':impacket,
'psycopg2':psycopg2,
'socket':socket,
'mysql':mysql,
'mysql.connector':mysql.connector,
'struct':struct,
'sys':sys,
'requests':requests,
'ssl':ssl}
safe_locals = {} #不需要预设局部参数
# 在限制环境中执行代码
exec(instruction, safe_globals,safe_locals)
# Check if check_samba_vuln is defined
if 'dynamic_fun' not in safe_locals:
analysis = "Function dynamic_fun() is not defined"
ext_params['is_use'] = True
return True,instruction,analysis,analysis,ext_params
# Get the function and call it
dynamic_fun = safe_locals['dynamic_fun']
status, tmpout = dynamic_fun() #LLM存在status定义错误的情况(执行成功,却返回的是False) #重点要处理
output = f"status:{status},output:{tmpout}"
except Exception as e:
analysis = f"执行动态代码时出错: {str(e)}"
ext_params['is_use'] = True
return True,instruction,analysis,analysis,ext_params
# 第三步:分析执行结果
analysis = self.analyze_result(output, instruction,"","")
# 指令和结果入数据库
# ?
if not analysis: # analysis为“” 不提交LLM
return False, instruction, analysis,"",ext_params
return True, instruction, analysis,"",ext_params
def analyze_result(self, result,instruction,stderr,stdout):
#指令结果分析 --- 要不要限定一个max_len?
if "enum4linux " in instruction: #存在指令包装成Python代码返回的情况
result = my_merge("enum4linux",result)
return result
if __name__ == "__main__":
llm_code = """
def run_test():
return 'Penetration test executed successfully!'
"""