You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

230 lines
8.2 KiB

1 week ago
#python代码动态执行
import queue
import ast
import subprocess
import json
import builtins
import re
import paramiko
import impacket
import psycopg2
import socket
import struct
import sys
import requests
import ssl
import mysql.connector
import telnetlib
import time
import uuid
import multiprocessing
import textwrap
from mycode.Result_merge import my_merge
from ftplib import FTP
from requests.auth import HTTPBasicAuth
from myutils.ReturnParams import ReturnParams
from concurrent.futures import ProcessPoolExecutor, TimeoutError
# --------------------------------------------
# 1) 全局 helper:放在模块顶层,才能被子进程 picklable 调用
# --------------------------------------------
def _execute_dynamic(instruction_str):
"""
在子进程中执行 instruction_str 所描述的 dynamic_fun
并返回 (status: bool, output: str)
"""
# 允许的内置函数白名单
allowed_builtins = {
'__name__': __name__,
'__import__': builtins.__import__,
'abs': abs, 'all': all, 'any': any, 'bool': bool,
'chr': chr, 'dict': dict, 'enumerate': enumerate,
'float': float, 'int': int, 'len': len, 'list': list,
'max': max, 'min': min, 'print': print, 'range': range,
'set': set, 'str': str, 'sum': sum, 'type': type,
'open': open, 'Exception': Exception, 'locals': locals
}
# 构造安全的 globals
safe_globals = {
'__builtins__': allowed_builtins,
'subprocess': subprocess,
'json': json,
're': re,
'paramiko': paramiko,
'impacket': impacket,
'psycopg2': psycopg2,
'socket': socket,
'mysql': mysql,
'mysql.connector': mysql.connector,
'struct': struct,
'sys': sys,
'requests': requests,
'ssl': ssl,
'FTP': FTP,
'HTTPBasicAuth': HTTPBasicAuth,
'telnetlib': telnetlib,
'time': time,
'uuid':uuid,
}
safe_locals = {}
try:
# 编译并执行用户提供的 code 字符串
compiled = compile(instruction_str, '<dynamic>', 'exec')
exec(compiled, safe_globals, safe_locals)
# dynamic_fun 必须存在
if 'dynamic_fun' not in safe_locals:
return False, "Function dynamic_fun() 未定义"
# 调用它并返回结果
res = safe_locals['dynamic_fun']()
if not (isinstance(res, tuple) and len(res) == 2 and isinstance(res[0], bool)):
return False, "dynamic_fun 返回值格式不对"
return res
except MemoryError:
return False, "内存溢出"
except RecursionError:
return False, "递归深度过深"
except Exception as e:
return False, f"子进程执行出错: {e}"
class PythoncodeTool():
def __init__(self,max_num):
self.proc_pool = ProcessPoolExecutor(max_workers=max_num)
def preprocess(self,code: str) -> str:
# 去掉最外层空行
code = code.strip('\n')
# 去除多余缩进
return textwrap.dedent(code)
def is_safe_code(self,code):
# List of high-risk functions to block (can be adjusted based on requirements)
# 只屏蔽这些“完整”函数调用
HIGH_RISK = {
'eval', # eval(...)
'exec', # exec(...)
'os.system', # os.system(...)
'subprocess.call', # subprocess.call(...)
'subprocess.Popen' # subprocess.Popen(...)
}
try:
tree = ast.parse(code)
for node in ast.walk(tree):
if isinstance(node, ast.Call):
fn = node.func
# 1) 裸 exec/eval
if isinstance(fn, ast.Name):
if fn.id in ('exec', 'eval'):
return False,"有高风险函数,暂不执行!"
# 2) 模块级别的 os.system、subprocess.call、subprocess.Popen
elif isinstance(fn, ast.Attribute):
# value 必须是 Name,才算"模块.方法"
if isinstance(fn.value, ast.Name):
fullname = f"{fn.value.id}.{fn.attr}"
if fullname in HIGH_RISK:
return False,"有高风险函数,暂不执行!"
return True,""
except SyntaxError as se:
# 语法都不通过,也算不安全
print("解析失败!", se, "", se.lineno, "")
print("出错的那行是:", code.splitlines()[se.lineno - 1])
return False,str(se)
def validate_instruction(self, instruction):
#指令过滤
timeout = 60*10
instr = instruction.replace("python_code ","")
instr = instr.replace("python-code ", "")
instr = self.preprocess(instr)
# Safety check
bsafe,error = self.is_safe_code((instr))
if not bsafe:
return "", timeout,error
return instr,timeout,""
def safe_import(self,name,*args,**kwargs):
ALLOWED_MODULES = ['subprocess', 'json','re']
if name not in ALLOWED_MODULES:
raise ImportError(f"Import of '{name}' is not allowed")
return builtins.__import__(name, *args, **kwargs)
def _run_dynamic(self, safe_locals, q):
"""子进程执行 dynamic_fun 并把结果放入队列"""
try:
fn = safe_locals['dynamic_fun']
res = fn()
q.put(res)
except Exception as e:
q.put((False, f"执行出错: {e}"))
def execute_instruction(self, instruction_old):
'''
执行指令验证合法性 -> 执行 -> 分析结果
:param instruction_old:
:return:
bool:true-正常返回给大模型false-结果不返回给大模型
str:执行的指令
str:执行指令的结果
'''
ext_params = ReturnParams()
ext_params["is_user"] = False # 是否要提交用户确认 -- 默认False
ext_params["is_vulnerability"] = False # 是否是脆弱点
# 第一步:验证指令合法性
instruction,time_out,error = self.validate_instruction(instruction_old)
if not instruction:
return False, instruction_old, error,"",ext_params
# 过滤修改后的指令是否需要判重?同样指令再执行结果一致?待定---#?
# 第二步:执行指令
future = self.proc_pool.submit(_execute_dynamic, instruction)
try:
# 在主进程中等待结果,超时则抛 TimeoutError
status, tmpout = future.result(timeout=time_out) #这里是阻塞的
except TimeoutError:
# 超时处理
future.cancel()
status, tmpout = False, f"执行超时({time_out} 秒)"
except Exception as e:
# 其他异常
status, tmpout = False, f"提交子进程运行出错: {e}"
output = f"status:{status},output:{tmpout}"
# 第三步:分析执行结果
analysis = self.analyze_result(output, instruction,"","")
# 指令和结果入数据库
# ?
if not analysis: # analysis为“” 不提交LLM
return False, instruction, analysis,"",ext_params
return True, instruction, analysis,"",ext_params
def analyze_result(self, result,instruction,stderr,stdout):
#指令结果分析 --- 要不要限定一个max_len?
if "enum4linux " in instruction: #存在指令包装成Python代码返回的情况
result = my_merge("enum4linux",result)
else:
if len(result) > 3000: #超过2000长度时,尝试去重重复行
lines = result.splitlines()
seen = set()
unique_lines = []
for line in lines:
if line not in seen:
seen.add(line)
unique_lines.append(line)
return "\n".join(unique_lines)
return result
#关闭进程池
def shutdown(self):
self.proc_pool.shutdown(wait=True)
if __name__ == "__main__":
llm_code = """
def run_test():
return 'Penetration test executed successfully!'
"""