diff --git a/InstructionManager.py b/InstructionManager.py index 25121a3..21f809f 100644 --- a/InstructionManager.py +++ b/InstructionManager.py @@ -10,11 +10,13 @@ import subprocess import importlib import os from tools.ToolBase import ToolBase +from myutils.ReturnParams import ReturnParams class InstructionManager: - def __init__(self): - self.tool_registry = {} #安全工具list - self.load_tools() #加载工具类 + def __init__(self,TM): + self.TM = TM #任务类对象穿透 + self.tool_registry = {} # 安全工具list + self.load_tools() # 加载工具类 def init_data(self): pass @@ -34,7 +36,7 @@ class InstructionManager: cls = getattr(module,module_name) if(issubclass(cls, ToolBase) and cls != ToolBase): tool_name = module_name.lower()[:-4] - self.tool_registry[tool_name] = cls() + self.tool_registry[tool_name] = cls(self.TM) #类对象穿透 except ImportError as e: print(f"加载工具 {module_name} 失败:{str(e)}") @@ -56,9 +58,15 @@ class InstructionManager: #print(f"分析结果:{result}") print(f"*****指令:{instr},执行完毕") else: + bres = False instr = instruction #保障后续代码的一致性 + source_result = result = f"未知工具:{tool_name}" + ext_params = ReturnParams() + ext_params["is_user"] = False # 是否要提交用户确认 -- 默认False + ext_params["is_vulnerability"] = False # 是否是脆弱点 print(f"执行指令:{instr}") print(f"未知工具:{tool_name}") + return bres,instr,result,source_result,ext_params #过来指令:合规、判重、待执行等 diff --git a/LLMManager.py b/LLMManager.py deleted file mode 100644 index e849118..0000000 --- a/LLMManager.py +++ /dev/null @@ -1,173 +0,0 @@ -''' -实现对大模型调用的封装,隔离具体使用的LLM -pip install openai -''' -import openai -import json -import threading -import re -from openai import OpenAI -from myutils.MyTime import get_local_timestr - -class LLMManager: - def __init__(self,illm_type=0): - self.api_key = None - self.api_url = None - self.task_id =0 #一个任务一个id - self.llm_sn = 0 # llm执行序列号,--一任务一序列 - self.llm_sn_lock = threading.Lock() # - #temperature设置 - #DS------代码生成/数学解题:0.0 -- 数据抽取/分析:1.0 -- 通用对话:1.3 -- 翻译:1.3 -- 创意类写作:1.5 - #腾讯云--- - self.temperature = 1.0 - if illm_type == 0: #腾讯云 - self.api_key = "fGBYaQLHykBOQsFwVrQdIFTsYr8YDtDVDQWFU41mFsmvfNPc" - self.api_url = "" - elif illm_type == 1: #DS - self.api_key ="sk-10360148b465424288218f02c87b0e1b" - self.api_url ="https://api.deepseek.com/v1" - self.model = "deepseek-reasoner" #model=deepseek-reasoner -- R1 model=deepseek-chat --V3 - #创建会话对象 -- 一个任务的LLM必须唯一 - self.client = OpenAI(api_key=self.api_key,base_url=self.api_url) - self.messages = [] - # 初始化阶段和已知信息 - current_stage = "信息收集" - known_info = {"url": "www.test.com"} - results = [] # 存储所有任务结果,用于生成报告 - - #******测试使用,设置slef.message的值 - def test_old_message(self,strMessage): - try: - self.messages = json.loads(strMessage) - except json.JSONDecodeError as e: - print(f"JSON解析错误: {str(e)}") - except Exception as e: - print(f"错误: {str(e)}") - - # 构建初始提示 初始化messages - def build_initial_prompt(self,target,know_info=""): - self.messages = [{"role": "system", - "content": "你是一位资深的渗透测试专家,需要动态控制整个渗透测试过程,包括信息收集、漏洞扫描、漏洞利用等阶段,最终生成渗透测试报告。由你规划执行的指令,我会根据你的指令执行并提交结果,你再对结果进行分析,规划并生成下一步指令,直到完成渗透测试,生成测试报告。" - "生成的指令需满足如下约束:" - "1.只返回具体的shell指令或Python代码,不要包含注释和说明;" - "2.shell指令以```bash(.*?)```包裹,python代码以```python(.*?)```包裹;" - "3.针对提供的Python代码,需要包含错误处理,并将执行结果保存到文件中(为每个任务指定唯一文件名),执行结束返回success或failure和该文件名;" - "4.如果认为渗透测试已完成,请生成生成报告的python代码,并返回success和complete"}] # 一个messages - return f"现在开始对目标{target}进行渗透测试,已知信息{know_info},请提供下一步执行的指令。" - - # 构建反馈提示 - def build_feedback_prompt(self,bres,instruction, result): - if bres: - return f"执行指令“{instruction}”的结果是“{result}”。请根据这些结果生成下一步具体的指令。" - else: - return "" - - def init_data(self,task_id=0): - #初始化LLM数据 - self.llm_sn = 0 - self.task_id = task_id - self.messages = [] - - # 调用LLM生成指令 - def get_llm_instruction(self,prompt,th_DBM): - ''' - 1.由于大模型API不记录用户请求的上下文,一个任务的LLM不能并发! - :param prompt:用户本次输入的内容 - :return: instr_list - ''' - #添加本次输入入队列 - message = {"role":"user","content":prompt} - self.messages.append(message) - - #提交LLM - post_time = get_local_timestr() - response = self.client.chat.completions.create( - model=self.model, - messages = self.messages - ) - - reasoning_content = "" - content = "" - #LLM返回处理 - if self.model == "deepseek-reasoner": - #返回错误码:DS-https://api-docs.deepseek.com/zh-cn/quick_start/error_codes - reasoning_content = response.choices[0].message.reasoning_content #推理过程 - print(reasoning_content) - content = response.choices[0].message.content #推理内容 - print(content) - #记录历史信息 - self.messages.append({'role': 'assistant', 'content': content}) - elif self.model == "deepseek-chat": - content = response.choices[0].message - #记录历史信息 - self.messages.append(content) - #LLM记录存数据库 - with self.llm_sn_lock: - self.llm_sn += 1 - #llm查询记录入库 - bres = th_DBM.insert_llm(self.task_id,prompt,reasoning_content,content,post_time,self.llm_sn) - if not bres: - print("llm入库失败!") - - #********测试时使用---输出和记录LLM返回指令的message - print(f"Messages:{self.messages}") - with open("test","w",encoding="utf-8") as f: #输出到文件 - json.dump(self.messages,f,ensure_ascii=False) - - #需要对指令进行提取 - instr_list = self.fetch_instruction(content) - return instr_list - - def fetch_instruction(self,response_text): - ''' - 提取命令列表,包括: - 1. Python 代码块(仅保留有效 Python 代码) - 2. Shell 命令(分割空行,每个块视为一条指令) - - :param text: 输入文本 - :return: 解析后的命令列表 - ''' - #针对llm的回复,提取执行的指令 - # 正则匹配 Python 代码块 - python_blocks = re.findall(r"```python(.*?)```", response_text, flags=re.DOTALL) - # 处理 Python 代码块,去除空行并格式化 - python_blocks = [block.strip() for block in python_blocks] - - # 按连续的空行拆分 - # 移除 Python 代码块,但保留内容用于返回 - text_no_python = re.sub(r"```python.*?```", "PYTHON_BLOCK", response_text, flags=re.DOTALL) - # 这里用 \n\s*\n 匹配一个或多个空白行 - parts = re.split(r'\n\s*\n', text_no_python) - - commands = [] - python_index = 0 # 记录 Python 代码块插入位置 - - for part in parts: - part = part.strip() - if not part: - continue - if "PYTHON_BLOCK" in part: - # 还原 Python 代码块 - commands.append(f"python {python_blocks[python_index]}") - python_index += 1 - else: - # 添加普通 Shell 命令 - commands.append(part) - - return commands - - def test_llm(self): - with open("test", "r", encoding="utf-8") as f: - messages = json.load(f) - text = messages[-1]["content"] - list = self.fetch_instruction(text) - for itme in list: - print("***********") - print(itme) - - -if __name__ == "__main__": - LM = LLMManager(1) - LM.test_llm() - - diff --git a/TaskManager.py b/TaskManager.py index 3835a0f..d0c8fdd 100644 --- a/TaskManager.py +++ b/TaskManager.py @@ -2,24 +2,31 @@ 渗透测试任务管理类 一次任务的闭合性要检查2025-3-10 一次任务后要清理LLM和InstrM的数据 ''' from TargetManager import TargetManager # 从模块导入类 -from LLMManager import LLMManager # 同理修正其他导入 +#from LLMManager import LLMManager # 同理修正其他导入 +from mycode import ControlCenter #控制中心替代LLM--控制中心要实现一定的基础逻辑和渗透测试树的维护。 from myutils.FileManager import FileManager from InstructionManager import InstructionManager from mycode.DBManager import DBManager from myutils.MyTime import get_local_timestr +from myutils.MyLogger_logger import LogHandler +import pickle import queue import time import os import threading + class TaskManager: def __init__(self): self.TargetM = TargetManager() + self.logger = LogHandler.get_logger("TaskManager") # 生成功能对象 - self.LLMM = LLMManager(1) - self.InstrM = InstructionManager() self.DBM = DBManager() #主进程一个DBM - self.DBM.connect() + if not self.DBM.connect(): + self.logger.error("数据库连接失败!终止工作!") + return + self.CCM = ControlCenter(self.DBM) + self.InstrM = InstructionManager(self) # 类对象渗透,要约束只读取信息 # 控制最大并发指令数量 self.max_thread_num = 2 self.task_id = 0 #任务id -- @@ -28,125 +35,32 @@ class TaskManager: self.batch_num = 0 #一个批次的指令数量 self.long_instr_num = 0 #耗时指令数量 self.long_time_instr = ['nikto'] #耗时操作不计入批量执行的数量,不加不减 - self.instr_queue = queue.Queue() #线程安全 --待执行指令 - self.user_instr = queue.Queue() #需要用确认或手动执行的命令--待执行指令 - self.user_done_instr = queue.Queue() #执行完成需要用户确认的指令 - self.doing_instr = queue.Queue()#执行中的指令 - self.done_instr = queue.Queue() #执行完成的指令 - self.res_queue = queue.Queue() #结果队列 + self.node_queue = queue.Queue() #线程安全 --待执行指令 + self.lock = threading.Lock() #线程锁 + self.node_num = 0 #在处理Node线程的处理 self.do_sn_lock = threading.Lock() #指令执行顺序号锁 self.brun = True - def is_user_instr(self,instr): - ''' - 过滤需要人工确认或手动执行的指令 ---- 待完善 - :param instr: - :return: - ''' - #if instr.startswith("curl") or instr.startswith("http") or instr.startswith("wget"): - if instr.startswith("http") or instr.startswith("wget") or instr.startswith("ssh"): - return True - - def instr_in_quere(self,instr_list): - ''' - 对于运行需要较长时间的不强求同一批次返回给LLM - :param instr_list: - :return: - ''' - for instr in instr_list: - if self.is_user_instr(instr): - self.user_instr.put(instr) - print(f"需要人工确认的指令{instr}") - else: - matched =False - for prefix in self.long_time_instr: - if instr.startswith(prefix): - matched =True - if not matched: - with self.lock: - self.batch_num += 1 #非耗时指令+1 - print(f"&&&&&&当前batch_num:{self.batch_num}") - else: - with self.lock: - self.long_instr_num +=1 #耗时指令数量+1 - # 指令入队列 - self.instr_queue.put(instr) - - #入数据库放哪一层要待定-TaskManager , InstructionM,ToolBase-2025-3-10 - def res_in_db(self,bres,instr,reslut,start_time,end_time,th_DBM,source_result,ext_params): - if th_DBM.ok: - with self.do_sn_lock: - self.do_sn += 1 #指令的执行序列是一个任务共用,要线程锁,错误问题也不大 - th_DBM.insetr_result(self.task_id,instr,reslut,self.do_sn,start_time,end_time,source_result,ext_params) - else: - print("数据库连接失败!!") - - def res_in_quere(self,bres,instr,reslut,start_time,end_time,th_DBM,source_result,ext_params): + def res_in_quere(self,bres,instr,reslut,start_time,end_time,th_DBM,source_result,ext_params,work_node): ''' - 执行结果入队列,若批量执行的指令都完成,则提交LLM生成下一步指令 + 执行结果入队列 :param bres: :param instr: :param reslut: :return: ''' - matched = False - bover = False - #入数据库 -- bres True和False 都入数据库2025-3-10 - self.res_in_db(bres,instr,reslut,start_time,end_time,th_DBM,source_result,ext_params) - - #结果入队列 - if bres: - res = {'instr':instr,'reslut':reslut} - self.res_queue.put(res) #入队列 - else: #对于不需要再提交给LLM的结果,如何处理待定。有执行false、未知工具 - pass - #判断批次指令是否都执行完 - for prefix in self.long_time_instr: - if instr.startswith(prefix): - matched = True - if not matched: - with self.lock: - self.batch_num -= 1 #非耗时指令-1 - print(f"&&&&&&当前batch_num:{self.batch_num}") - if self.batch_num ==0: #只会轮询到最后一个线程才会是0 - bover = True + #入数据库 -- bres True和False 都入数据库2025-3-10---加node_path(2025-3-18)#? + if th_DBM.ok: + work_node.do_sn += 1 # 指令的执行序列是一个任务共用,要线程锁,错误问题也不大 + th_DBM.insetr_result(self.task_id, instr, reslut, work_node.do_sn, start_time, end_time, source_result, + ext_params, work_node.path) else: - #这里会有个问题,若耗时指令执行完,结果入队列后,但LLM没有进一步的指令下发,会造成结果不会提交 - #需要在生成“生成报告”的指令时进行研判--该点很重要 - with self.lock: - self.long_instr_num -=1 #耗时指令数量-1 - if bover: - #整合结果提交 -- 需要确保只有一个线程会执行 - self.batch_res_post(th_DBM) - - def batch_res_post(self,th_DBM): - ''' - 组合批量指令的结果,一起提交到LLM,生成下一步具体指令 - :return: - ''' - post_string = "" - while not self.res_queue.empty(): - res = self.res_queue.get() - str = f"执行指令:{res['instr']}的结果是:{res['reslut']}。" - post_string = post_string + "\n" - post_string = post_string + str - - if post_string: - post_string = post_string + "\n请根据这些结果生成下一步具体的指令。" - - print(f"***************\n{post_string}") - with open("res","w",encoding="utf-8") as f: - f.write(post_string) - #*****测试时中断下一步指令的获取 - # 提交提示词,得到下一步指令 - # instr_list = self.LLMM.get_llm_instruction(post_string,th_DBM) - # if instr_list: - # if instr_list[0] == "生成报告": #“生成报告”要特殊处理#? - # self.brun = False - # print("生成报告--退出工作线程") - # else: # 继续工作 - # self.instr_in_quere(instr_list) + self.logger.error("数据库连接失败!!") + + #结果入队列---2025-3-18所有的指令均需返回给LLM便于节点状态的更新,所以bres作用要调整。 + res = {'instr':instr,'reslut':reslut} + work_node.add_res(res) #入节点结果队列 def do_worker_th(self): #线程的dbm需要一个线程一个 @@ -154,20 +68,47 @@ class TaskManager: th_DBM.connect() while self.brun: try: - instruction = self.instr_queue.get(block=False) #quere线程安全,block=false非阻塞get - self.doing_instr.put(instruction) #入执行队列 - #执行中会对指令进行微调,并有可能不执行直接返回空结果 - start_time = get_local_timestr() #指令执行开始时间 - bres,instr,reslut,source_result,ext_params = self.InstrM.execute_instruction(instruction) - end_time = get_local_timestr() #指令执行结束时间 - self.res_in_quere(bres,instr,reslut,start_time,end_time,th_DBM,source_result,ext_params) #执行结果入队列 - self.done_instr.put(instruction) #执行完成队列 - #执行情况是否需要用户确认 - if ext_params["is_user"]: - pass - #print("该指令执行需要用户确认") + with self.lock: + self.node_num += 1 + + #获取测试节点 + work_node = self.node_queue.get(block=False) + #开始执行指令 + for instruction in work_node.instr_queue: + start_time = get_local_timestr() #指令执行开始时间 + bres, instr, reslut, source_result, ext_params = self.InstrM.execute_instruction(instruction) + end_time = get_local_timestr() # 指令执行结束时间 + self.res_in_quere(bres, instr, reslut, start_time, end_time, th_DBM, source_result, + ext_params,work_node) # 执行结果入队列 + #保存记录--测试使用 + with self.lock: + self.node_num -=1 + if self.node_num ==0 and self.node_queue.empty(): + with open("attack_tree", 'wb') as f: + pickle.dump(TM.CCM.attack_tree, f) + #针对一个节点的指令执行完成后,提交LLM规划下一步操作 + # node_list = self.CCM.get_llm_instruction(work_node) + # if not node_list:#该节点测试完成 + # continue + # for node in node_list: + # self.node_queue.put(node) except queue.Empty: time.sleep(10) + # try: + # instruction = self.instr_queue.get(block=False) #quere线程安全,block=false非阻塞get + # self.doing_instr.put(instruction) #入执行队列--忘了入执行队列的作用 + # #执行中会对指令进行微调,并有可能不执行直接返回空结果 + # start_time = get_local_timestr() #指令执行开始时间 + # bres,instr,reslut,source_result,ext_params = self.InstrM.execute_instruction(instruction) + # end_time = get_local_timestr() #指令执行结束时间 + # self.res_in_quere(bres,instr,reslut,start_time,end_time,th_DBM,source_result,ext_params) #执行结果入队列 + # self.done_instr.put(instruction) #执行完成队列 + # #执行情况是否需要用户确认 + # if ext_params["is_user"]: + # pass + # #print("该指令执行需要用户确认") + # except queue.Empty: + # time.sleep(10) #函数结束,局部变量自动释放 def start_task(self,target_name,target_in): @@ -180,11 +121,11 @@ class TaskManager: #获取基本信息: 读取数据库或预生成指令,获取基本的已知信息 know_info = "无" #? #启动--初始化指令 - prompt = self.LLMM.build_initial_prompt(target,know_info) - instr_list = self.LLMM.get_llm_instruction(prompt,self.DBM) - self.instr_in_quere(instr_list) #指令入队列 + node_list = self.CCM.start_do(target,self.task_id) + for node in node_list: + self.node_queue.put(node) - #创建工作线程 + #创建工作线程----2025-3-18调整为一个节点一个线程, for i in range(self.max_thread_num): w_th = threading.Thread(target=self.do_worker_th) w_th.start() @@ -200,7 +141,7 @@ class TaskManager: def stop_task(self): self.brun = False - self.LLMM.init_data() #清空一些全局变量 + self.CCM.stop_do() #清空一些全局变量 self.InstrM.init_data() #结束任务需要收尾处理#? @@ -211,22 +152,15 @@ if __name__ == "__main__": FM = FileManager() current_path = os.path.dirname(os.path.realpath(__file__)) strMsg = FM.read_file("test",1) - TM.LLMM.test_old_message(strMsg) #先设置message的值 + test_type = 3 if test_type == 1: #测试执行指令 - # instrS = ['gobuster dir -u https://58.216.217.70 -w /usr/share/wordlists/dirbuster/directory-list-2.3-medium.txt -k -t 20 --timeout 10s', - # 'searchsploit SoftEther VPN', - # 'curl -kv -X POST -d "username=admin&password=admin" https://58.216.217.70/vpn/index.html --connect-timeout 10', - # 'gobuster dir -u http://58.216.217.70:10001 -w /usr/share/wordlists/dirbuster/directory-list-2.3-medium.txt -t 20 --timeout 10s'] - #instrS = ['nmap -sV -sC -p- -Pn 192.168.204.137'] - with open("test", "r", encoding="utf-8") as f: - messages = json.load(f) - text = messages[-1]["content"] - #list = TM.LLMM.fetch_instruction(text)[9:10] - list = TM.LLMM.fetch_instruction(text) - #print(list) - TM.instr_in_quere(list) + with open("attack_tree", "rb") as f: + TM.CCM.attack_tree = pickle.load(f) + with open("node_list", 'rb') as f: + TM.node_queue = pickle.load(f) + #创建线程执行指令 for i in range(TM.max_thread_num): w_th = threading.Thread(target=TM.do_worker_th) @@ -237,16 +171,28 @@ if __name__ == "__main__": t.join() elif test_type ==2: #测试LLM返回下一步指令 - with open("res", "r", encoding="utf-8") as f: - prompt = f.read() - if prompt: - instr_list = TM.LLMM.get_llm_instruction(prompt,TM.DBM) - TM.instr_in_quere(instr_list) # 指令入队列问题不大,任务执行线程没有起 + with open("attack_tree", "rb") as f: + TM.CCM.attack_tree = pickle.load(f) + #遍历node,查看有res的数据 + nodes = TM.CCM.attack_tree.traverse_bfs() + for node in nodes: + if not node.res_quere.empty(): + node_list = TM.CCM.get_llm_instruction(node) + if not node_list:#该节点测试完成 + continue + for do_node in node_list: + TM.node_queue.put(do_node) elif test_type ==3: #新目标测试 - prompt = TM.LLMM.build_initial_prompt("192.168.204.137","无") - if prompt: - instr_list = TM.LLMM.get_llm_instruction(prompt,TM.DBM) - TM.instr_in_quere(instr_list) # 指令入队列问题不大,任务执行线程没有起 + # 启动--初始化指令 + node_list = TM.CCM.start_do("192.168.204.137", 0) + for node in node_list: + TM.node_queue.put(node) + #暂存状态-- + with open("attack_tree",'wb') as f: + pickle.dump(TM.CCM.attack_tree,f) + with open("node_list",'wb') as f: + pickle.dump(TM.node_queue,f) + else: #完整过程测试---要设定终止条件 pass diff --git a/config.yaml b/config.yaml index a9ecb3c..4ef50a3 100644 --- a/config.yaml +++ b/config.yaml @@ -13,8 +13,8 @@ mysql: passwd: anan2013@BABY database: zfsafe -#sqlit -sqlite: zfbox.db +#LLM-Type +LLM_type: 1 #0-腾讯云,1-DS,2-GPT #用户初始密码 pw: zfkj_123!@# diff --git a/mycode/AttackMap.py b/mycode/AttackMap.py new file mode 100644 index 0000000..b928262 --- /dev/null +++ b/mycode/AttackMap.py @@ -0,0 +1,150 @@ +import queue +#渗透测试树结构维护类 +class AttackTree: + def __init__(self,root_node): + #针对根节点处理 + self.root = root_node + self.root.path = f"目标系统->{root_node.name}" + + def set_root(self,root_node): + self.root = root_node + + def add_node(self,parent_name,new_node): + """根据父节点名称添加新节点""" + parent_node = self.find_node_by_name(parent_name) + if parent_node: + parent_node.add_child(new_node) + return True + return False + + def traverse_bfs(self): + """广度优先遍历""" + if not self.root: + return [] + + queue = [self.root] + result = [] + while queue: + current = queue.pop(0) + result.append(current) + queue.extend(current.children) + return result + + def traverse_dfs(self, node=None, result=None): + """深度优先遍历(前序遍历)""" + if result is None: + result = [] + if node is None: + node = self.root + if not node: + return [] + + result.append(node) + for child in node.children: + self.traverse_dfs(child, result) + return result + + def find_node_by_name(self, name): + """根据名称查找节点(广度优先)""" + nodes = self.traverse_bfs() + for node in nodes: + if node.name == name: + return node + return None + + def find_node_by_nodepath(self,node_path): + '''基于节点路径查找节点,只返回找到的第一个节点,若有节点名称路径重复的情况暂不处理''' + current_node = self.root #从根节点开始 + node_names = node_path.split('->') + for node_name in node_names: + if node_name == current_node.name:#根节点开始 + continue + else: + bfound = False + for child_node in current_node.children: + if child_node.name == node_name: #约束同一父节点下的子节点名称不能相同 + current_node = child_node + bfound = True + break + if not bfound: #如果遍历子节点都没有符合的,说明路径有问题的,不处理中间一段路径情况 + return None + #找到的话,就开始匹配下一层 + return current_node + + def find_nodes_by_status(self, status): + """根据状态查找所有匹配节点""" + return [node for node in self.traverse_bfs() if node.status == status] + + def find_nodes_by_vul_type(self, vul_type): + """根据漏洞类型查找所有匹配节点""" + return [node for node in self.traverse_bfs() if node.vul_type == vul_type] + + #考虑要不要用tree封装节点的操作--待定 + def update_node_status(self, node_name, new_status): + """修改节点状态""" + node = self.find_node_by_name(node_name) + if node: + node.status = new_status + return True + return False + + def update_node_vul_type(self,node_name,vul_type): + """修改节点漏洞类型""" + node = self.find_node_by_name(node_name) + if node: + node.vul_type = vul_type + return True + return False + + def print_tree(self, node=None, level=0): + """可视化打印树结构""" + if node is None: + node = self.root + prefix = " " * level + "|-- " if level > 0 else "" + print(f"{prefix}{node.name} [{node.status}, {node.vul_type}]") + for child in node.children: + self.print_tree(child, level + 1) + + +class TreeNode: + def __init__(self, name, status="未完成", vul_type="未发现"): + self.name = name # 节点名称 + self.status = status # 节点状态 + self.vul_type = vul_type # 漏洞类型 + self.children = [] # 子节点列表 + self.parent = None # 父节点引用 + self.path = "" #当前节点的路径 + self.instr_queue = queue.Queue() #针对当前节点的执行指令----重要约束:一个节点只能有一个线程在执行指令 + self.res_quere = queue.Queue() #指令执行的结果,一批一批 + self.llm_sn = 0 #针对该节点llm提交次数 + self.do_sn = 0 #针对该节点instr执行次数 + self.messages = [] #针对当前节点积累的messages -- 针对不同节点提交不同的messages + + def add_child(self, child_node): + child_node.parent = self + child_node.path = self.path + f"->{child_node.name}" #子节点的路径赋值 + child_node.messages = self.messages #传递messages #给什么时候的messages待验证#? + self.children.append(child_node) + + def add_instr(self,instr): + self.instr_queue.put(instr) + + def get_instr(self): + if self.instr_queue.empty(): + return None + return self.instr_queue.get() + + def add_res(self,str_res): #结构化结果字串 + self.res_quere.put(str_res) + + def get_res(self): + if self.res_quere.empty(): + return None + return self.res_quere.get() + + def __repr__(self): + return f"TreeNode({self.name}, {self.status}, {self.vul_type})" + + +if __name__ == "__main__": + pass \ No newline at end of file diff --git a/mycode/ControlCenter.py b/mycode/ControlCenter.py new file mode 100644 index 0000000..4a05fce --- /dev/null +++ b/mycode/ControlCenter.py @@ -0,0 +1,179 @@ +#自动化测试逻辑规则控制 +#统一控制规则 和 渗透测试树的维护 +import json +import re +import queue +from mycode.AttackMap import AttackTree +from mycode.AttackMap import TreeNode +from mycode.LLMManager import LLMManager +from myutils.ConfigManager import myCongif #单一实例 +from myutils.MyLogger_logger import LogHandler + +class ControlCenter: + def __init__(self,DBM): + self.logger = LogHandler.get_logger("ControlCenter") + self.task_id = None + self.target = None + self.attack_tree = None + self.DBM = DBM + #LLM对象 + self.LLM = LLMManager(myCongif.get_data("LLM_type")) + + def __del__(self): + self.task_id = None + self.target = None + self.attack_tree = None + self.DBM = None + + def init_cc_data(self): + #一次任务一次数据 + pass + + def get_user_init_info(self): + '''开始任务初,获取用户设定的基础信息''' + # ?包括是否对目标进行初始化的信息收集 + return "无" + + def start_do(self,target,task_id): + '''一个新任务的开始''' + self.task_id = task_id + self.target = target + #创建测试树 + if self.attack_tree: + self.attack_tree = None #释放 + root_node = TreeNode(target) + self.attack_tree = AttackTree(root_node)#创建测试树,同时更新根节点相关内容 + #获取初始指令 + know_info = self.get_user_init_info() + prompt = self.LLM.build_initial_prompt(target,know_info,root_node) + node_cmds, commands = self.LLM.get_llm_instruction(prompt,self.DBM,root_node) + # 更新tree + self.tree_manager(node_cmds,root_node) + # 分析指令入对应节点 + node_list = self.instr_in_node(commands,root_node) + return node_list + + #约束1:一个节点只能同时提交一次,未测试的节点不要重复 + def get_llm_instruction(self,node): + #拼接结果字符串 + json_strings = [] + while True: + try: + item = node.res_quere.get_nowait() + json_str = json.dumps(item,ensure_ascii=False) + json_strings.append(json_str) + except queue.Empty: + break + res_str = ','.join(json_strings) + + #构造本次提交的prompt + user_Prompt = f''' + 当前分支路径:{node.path} + 当前节点信息: + - 节点名称:{node.name} + - 节点状态:{node.status} + - 漏洞类型:{node.vul_type} + 上一步结果:{res_str} + 任务:生成下一步渗透测试指令或结束该节点渗透测试。 + ''' + node_cmds,commands = self.LLM.get_llm_instruction(user_Prompt,self.DBM,node) + #更新tree + self.tree_manager(node_cmds,node) + #分析指令入对应节点 + node_list = self.instr_in_node(commands,node) + return node_list + + def tree_manager(self,node_cmds,node): + '''更新渗透测试树''' + try: + node_jsons = json.loads(node_cmds) + for node_json in node_jsons: + if node_json["action"] == "add_node": #新增节点 + parent_node_name = node_json["parent"] + node_name = node_json["node"] + status = node_json["status"] + #新增节点原则上应该都是当前节点增加子节点 + if node.name == parent_node_name: + new_node = TreeNode(node_name,status) + node.add_child(new_node) #message的传递待验证 + else: + self.logger.error("添加子节点时,遇到父节点名称不一致的,需要介入!!") + elif node_json["action"] == "update_status": + node_name = node_json["node"] + status = node_json["status"] + vul_type = node_json["vulnerability"] + if node.name == node_name: + node.status = status + node.vul_type = vul_type + else: + self.logger.error("遇到不是修改本节点状态的,需要介入!!") + else: + self.logger.error("node更新JSON遇到异常参数!") + except json.JSONDecodeError as e: + self.logger.error(f"JSON 解析失败:{e.msg}") + self.logger.error(f"错误位置:第{e.lineno}行,列{e.colno}") + except TypeError as e: + self.logger.error(f"输入类型错误:{str(e)}") + + def instr_in_node(self,commands,node): + node_list = [] + for command in commands: + # 使用正则匹配方括号中的node_path(非贪婪模式) + match = re.search(r'\[(.*?)\]', command) + if match: + node_path = match.group(1) + find_node = self.attack_tree.find_node_by_nodepath(node_path) + if find_node: + instruction = re.sub(r'\[.*?\]', "", command,count=1,flags=re.DOTALL) + find_node.instr_queue.put(instruction) + #入输出队列 + if find_node not in node_list: + node_list.append(find_node) + else: + self.logger.error(f"基于节点路径没有找到对应的节点{node_path}") + else: + self.logger.error(f"得到的指令格式不符合规范:{command}") + return node_list + + + def is_user_instr(self,instr): + ''' + 过滤需要人工确认或手动执行的指令 ---- 待完善 + :param instr: + :return: + ''' + #if instr.startswith("curl") or instr.startswith("http") or instr.startswith("wget"): + if instr.startswith("http") or instr.startswith("wget") or instr.startswith("ssh"): + return True + + #指令入队列,待修改 + def instr_in_quere(self,instr_list): + ''' + 对于运行需要较长时间的不强求同一批次返回给LLM + :param instr_list: + :return: + ''' + for instr in instr_list: + if self.is_user_instr(instr): + self.user_instr.put(instr) + print(f"需要人工确认的指令{instr}") + else: + matched =False + for prefix in self.long_time_instr: + if instr.startswith(prefix): + matched =True + if not matched: + with self.lock: + self.batch_num += 1 #非耗时指令+1 + print(f"&&&&&&当前batch_num:{self.batch_num}") + else: + with self.lock: + self.long_instr_num +=1 #耗时指令数量+1 + # 指令入队列 + self.instr_queue.put(instr) + + def stop_do(self): + #清空数据 + self.task_id = None + self.target = None + self.attack_tree = None \ No newline at end of file diff --git a/mycode/DBManager.py b/mycode/DBManager.py index 45f56ae..85748b1 100644 --- a/mycode/DBManager.py +++ b/mycode/DBManager.py @@ -147,7 +147,7 @@ class DBManager: return task_id #指令执行结果入库 - def insetr_result(self,task_id,instruction,result,do_sn,start_time,end_time,source_result,ext_params): + def insetr_result(self,task_id,instruction,result,do_sn,start_time,end_time,source_result,ext_params,node_path): # 统一将 result 转为 JSON 字符串(无论原始类型) try: if not isinstance(result, str): @@ -162,16 +162,16 @@ class DBManager: # 使用参数化查询 sql = """ INSERT INTO task_result - (task_id, instruction, result, do_sn,start_time,end_time,source_result,is_user,is_vulnerability) + (task_id, instruction, result, do_sn,start_time,end_time,source_result,is_user,is_vulnerability,node_path) VALUES - (%s, %s, %s, %s, %s, %s,%s,%s,%s) + (%s, %s, %s, %s, %s, %s,%s,%s,%s,%s) """ params = (task_id, instruction, str_result, do_sn,start_time,end_time,source_result,ext_params['is_user'], - ext_params['is_vulnerability']) + ext_params['is_vulnerability'],node_path) return self.safe_do_sql(sql,params) #llm数据入库 - def insert_llm(self,task_id,prompt,reasoning_content,content,post_time,llm_sn): + def insert_llm(self,task_id,prompt,reasoning_content,content,post_time,node): str_reasoning = "" str_content = "" try: @@ -196,11 +196,14 @@ class DBManager: sql=""" INSERT INTO task_llm - (task_id,do_sn,prompt,reasoning_content,content,start_time) + (task_id,do_sn,prompt,reasoning_content,content,start_time,node_path) VALUES - (%s, %s, %s, %s, %s, %s) + (%s, %s, %s, %s, %s, %s,%s) """ - params = (task_id,llm_sn,prompt,str_reasoning,str_content,post_time) + str_reasoning = str_reasoning.encode('utf-8').decode('unicode_escape') + str_content = str_content.encode('utf-8').decode('unicode_escape') + + params = (task_id,node.llm_sn,prompt,str_reasoning,str_content,post_time,node.path) return self.safe_do_sql(sql,params) def test(self): diff --git a/mycode/InitManager.py b/mycode/InitManager.py new file mode 100644 index 0000000..5a96735 --- /dev/null +++ b/mycode/InitManager.py @@ -0,0 +1,23 @@ + +class InitManger: + def __init__(self): + listen_tatus = False + + def init_tool_info(self,self_ip="",self_port=0): + ''' + 初始化工具信息和工作 - 一直运行的信息 + :return: + ''' + #建立一个socket监听线程,处理反向连接 + if self_ip and self_port: #ip和端口有值后 + listen_tatus = False + pass + + def init_task_info(self,cookie=""): + ''' + 初始化任务信息,-- 一次任务一次的信息 + :return: + ''' + pass + +InitM = InitManger() #单一实例 \ No newline at end of file diff --git a/mycode/LLMManager.py b/mycode/LLMManager.py new file mode 100644 index 0000000..d1a240e --- /dev/null +++ b/mycode/LLMManager.py @@ -0,0 +1,230 @@ +''' +实现对大模型调用的封装,隔离具体使用的LLM +pip install openai + export OPENAI_API_KEY="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" +''' +import openai +import json +import threading +import re +from openai import OpenAI +from myutils.MyTime import get_local_timestr +from myutils.MyLogger_logger import LogHandler + +class LLMManager: + def __init__(self,illm_type=0): + self.logger = LogHandler().get_logger("LLMManager") + self.api_key = None + self.api_url = None + self.task_id =0 #一个任务一个id + self.llm_sn = 0 # llm执行序列号,--一任务一序列 + self.llm_sn_lock = threading.Lock() # + #temperature设置 + #DS------代码生成/数学解题:0.0 -- 数据抽取/分析:1.0 -- 通用对话:1.3 -- 翻译:1.3 -- 创意类写作:1.5 + #腾讯云--- + self.temperature = 1.0 + if illm_type == 0: #腾讯云 + self.api_key = "fGBYaQLHykBOQsFwVrQdIFTsYr8YDtDVDQWFU41mFsmvfNPc" + self.api_url = "" + elif illm_type == 1: #DS + self.api_key ="sk-10360148b465424288218f02c87b0e1b" + self.api_url ="https://api.deepseek.com/v1" + self.model = "deepseek-reasoner" #model=deepseek-reasoner -- R1 model=deepseek-chat --V3 + # 创建会话对象 -- 一个任务的LLM必须唯一 + self.client = OpenAI(api_key=self.api_key, base_url=self.api_url) + elif illm_type ==3: #GPT + self.api_key ="" + self.api_url = "" + self.model = "" + self.client = OpenAI() + + self.messages = [] + # 初始化阶段和已知信息 + current_stage = "信息收集" + known_info = {"url": "www.test.com"} + results = [] # 存储所有任务结果,用于生成报告 + + #******测试使用,设置slef.message的值 + def test_old_message(self,strMessage): + try: + self.messages = json.loads(strMessage) + except json.JSONDecodeError as e: + print(f"JSON解析错误: {str(e)}") + except Exception as e: + print(f"错误: {str(e)}") + + # 构建初始提示 初始化messages + def build_initial_prompt(self,target,know_info="无",node=None): + # self.messages = [{"role": "system", + # "content": "你是一位渗透测试专家,需要动态控制整个渗透测试过程,包括信息收集、漏洞扫描、漏洞利用等阶段,最终生成渗透测试报告。由你规划执行的指令,我会根据你的指令执行并提交结果,你再对结果进行分析,规划并生成下一步指令,直到完成渗透测试,生成测试报告。" + # "生成的指令需满足如下约束:" + # "1.只返回具体的shell指令或Python代码,不要包含注释和说明;" + # "2.shell指令以```bash(.*?)```包裹,python代码以```python(.*?)```包裹;" + # "3.若提供的是shell指令,需要避免用户再次交互;" + # "4.若提供的是python代码,主函数名为dynamic_fun,需要包含错误处理,执行结束返回两个str(status-'success'或'failure'和output-补充输出信息);" + # "5.如果认为渗透测试已完成,请生成生成报告的python代码,并返回'complete'和报告文件路径"}] # 一个messages + #return f"现在开始对目标{target}进行渗透测试,已知信息{know_info},请提供下一步执行的指令。" + if not node: + return None + #根节点初始化message + node.messages = [{"role": "system", + "content":''' + 你是一位渗透测试专家,由你动态控制整个渗透测试过程,根据当前测试状态和返回结果,决定下一步的测试指令,推动测试阶段前进,直至完成渗透测试。本地程序会根据你的指令进行执行,然后将执行结果返回给你。 + 整体过程说明: + 1.以目标系统所在IP为根节点,每个渗透测试点(如端口、服务、漏洞点)作为子节点,形成树型结构(测试树); + 2.测试树整体数据由本地程序存储,你只需要关注当前节点的渗透测试推进,节点状态(未完成、已完成)的更新,和是否有子节点新增; + 3.新增子节点时,同一父节点下的子节点名称不能相同,并提供对该节点的测试指令; + 3.新增节点示例:{\"action\":\"add_node\", \"parent\": \"80端口\", \"node\": \"http://192.168.1.100/index.php?id=1\", \"status\": \"未完成\"}; + 4.完成节点测试且发现漏洞示例:{\"action\": \"update_status\", \"node\": \"21端口\", \"status\": \"已完成\",\"vulnerability\": \"ftp匿名登录\"}; + 5.发现漏洞后,可根据漏洞类型决定是否新增子节点继续测试,如:{\"action\": \"add_node\", \"parent\": \"21端口\", \"node\": \"ftp配置检查\", \"status\": \"未完成\"}; + 6.当本地程序确认所有节点状态均为“已完成”时,生成测试报告。 + 生成的渗透测试指令需满足如下约束: + 1.只返回具体的shell指令或Python代码,不要包含注释和说明; + 2.shell指令以```bash-[对应节点的路径](.*?)```包裹,python代码以```python-[对应节点的路径](.*?)```包裹; + 3.若提供的是shell指令,需要避免用户再次交互; + 4.若提供的是python代码,主函数名为dynamic_fun,需包含错误处理,执行结束后返回一个tuple (status, output),其中status为'success'或'failure',output为补充输出信息; + 示例: + ```python-[目标系统->192.168.1.100->3306端口] + def dynamic_fun(): + try: + result = "扫描完成" + return ("success", result) + except Exception as e: + return ("failure", str(e)) + ``` + 5.如果认为渗透测试已完成,请生成生成报告的python代码,并返回'complete'和报告文件路径。 + '''}] # 一个messages + user_Prompt = f''' + 当前分支路径:目标系统->{target} + 当前节点信息: + - 节点名称:{target} + - 节点状态:未完成 + - 漏洞类型:未发现 + 上一步结果:已知信息-{know_info} + 任务:生成下一步渗透测试指令或结束该节点的渗透测试(修改节点状态为:已完成)。 + ''' + return user_Prompt + + def init_data(self,task_id=0): + #初始化LLM数据 + self.llm_sn = 0 + self.task_id = task_id + self.messages = [] + + # 调用LLM生成指令 + def get_llm_instruction(self,prompt,th_DBM,node): + ''' + 1.由于大模型API不记录用户请求的上下文,一个任务的LLM不能并发! + :param prompt:用户本次输入的内容 + :return: instr_list + ''' + #添加本次输入入该节点的message队列 + message = {"role":"user","content":prompt} + node.messages.append(message) + + #提交LLM + post_time = get_local_timestr() + response = self.client.chat.completions.create( + model=self.model, + messages = self.messages + ) + + #LLM返回结果处理 + reasoning_content = "" + content = "" + #LLM返回处理 + if self.model == "deepseek-reasoner": + #返回错误码:DS-https://api-docs.deepseek.com/zh-cn/quick_start/error_codes + reasoning_content = response.choices[0].message.reasoning_content #推理过程 + print(reasoning_content) + content = response.choices[0].message.content #推理内容 + print(content) + # 记录llm历史信息 + node.messages.append({'role': 'assistant', 'content': content}) + elif self.model == "deepseek-chat": + content = response.choices[0].message + # 记录llm历史信息 + node.messages.append(content) + else: + self.logger.error("处理到未预设的模型!") + return None + + #LLM记录存数据库 + node.llm_sn += 1 + bres = th_DBM.insert_llm(self.task_id,prompt,reasoning_content,content,post_time,node) + if not bres: + self.logger.error(f"{node.name}-llm入库失败!") + + #需要对指令进行提取 + node_list = self.fetch_instruction(content,node) + + #********测试时使用---输出和记录LLM返回指令的message + print(f"Messages:{self.messages}") + with open("../test", "w", encoding="utf-8") as f: #输出到文件 + json.dump(self.messages,f,ensure_ascii=False) + + return node_list + + def fetch_instruction(self,response_text,node): + ''' + 节点控制指令 + 渗透测试指令 + 提取命令列表,包括: + 1. Python 代码块 python[](.*?) + 2. Shell 命令``bash[](.*?)``` + :param text: 输入文本 + :return: node_cmds,python_blocks,shell_blocks + ''' + #针对llm的回复,提取节点操作数据和执行的指令---- + # 正则匹配 Python 代码块 + python_blocks = re.findall(r"```python-(.*?)```", response_text, flags=re.DOTALL) + # 处理 Python 代码块,去除空行并格式化 + python_blocks = [block.strip() for block in python_blocks] + + #正则匹配shell指令 + shell_blocks = re.findall(f"```bash-(.*?)```", response_text, flags=re.DOTALL) + shell_blocks = [block.strip() for block in shell_blocks] + + # 按连续的空行拆分 + # 移除 Python和bash 代码块 + text_no_python = re.sub(r"```python.*?```", "PYTHON_BLOCK", response_text, flags=re.DOTALL) + text = re.sub(r"```bash.*?```", "SHELL_BLOCK", text_no_python, flags=re.DOTALL) + + # 这里用 \n\s*\n 匹配一个或多个空白行 + parts = re.split(r'\n\s*\n', text) + node_cmds = [] + commands = [] + python_index = 0 + shell_index = 0 + for part in parts: + part = part.strip() + if not part: + continue + if "PYTHON_BLOCK" in part: + # 还原 Python 代码块 + commands.append(f"python_code {python_blocks[python_index]}") + python_index += 1 + elif "SHELL_BLOCK" in part: + commands.append(shell_blocks[shell_index]) + shell_index +=1 + else: + #其他的认为是节点操作指令 + node_cmds.append(part) + + return node_cmds,commands #?存在一个问题:这样分list返回,执行顺序会丢失 + + def test_llm(self): + with open("../test", "r", encoding="utf-8") as f: + messages = json.load(f) + text = messages[-1]["content"] + list = self.fetch_instruction(text) + for itme in list: + print("***********") + print(itme) + + +if __name__ == "__main__": + LM = LLMManager(1) + LM.test_llm() + + diff --git a/test b/test index 93c5643..14e8fac 100644 --- a/test +++ b/test @@ -1 +1 @@ -[{"role": "system", "content": "你是一位资深的渗透测试专家,你的任务是动态控制整个渗透测试过程。由你规划执行的指令,我会根据你的指令执行并返回给你结果,你再对结果进行分析,规划并生成下一步指令,直到完成渗透测试,生成测试报告。指令需满足:1.只返回具体的指令或Python代码,不含注释和说明;2.若需要python代码需要包含try-except错误处理,返回执行结果"}, {"role": "user", "content": "现在开始对目标192.168.204.137进行渗透测试,已知信息无请提供下一步执行的指令。"}, {"role": "assistant", "content": "nmap -sV -sC -p- -Pn 192.168.204.137"}, {"role": "user", "content": "\n执行指令:nmap -sV -sC -p- -Pn 192.168.204.137的结果是:Starting Nmap 7.95 ( https://nmap.org ) at 2025-03-10 19:50 CST\nNmap scan report for 192.168.204.137\nHost is up (0.0062s latency).\nNot shown: 65505 closed tcp ports (reset)\nPORT STATE SERVICE VERSION\n21/tcp open ftp vsftpd 2.3.4\n| ftp-syst: \n| STAT: \n| FTP server status:\n| Connected to 192.168.204.135\n| Logged in as ftp\n| TYPE: ASCII\n| No session bandwidth limit\n| Session timeout in seconds is 300\n| Control connection is plain text\n| Data connections will be plain text\n| vsFTPd 2.3.4 - secure, fast, stable\n|_End of status\n|_ftp-anon: Anonymous FTP login allowed (FTP code 230)\n22/tcp open ssh OpenSSH 4.7p1 Debian 8ubuntu1 (protocol 2.0)\n| ssh-hostkey: \n| 1024 60:0f:cf:e1:c0:5f:6a:74:d6:90:24:fa:c4:d5:6c:cd (DSA)\n|_ 2048 56:56:24:0f:21:1d:de:a7:2b:ae:61:b1:24:3d:e8:f3 (RSA)\n23/tcp open telnet Linux telnetd\n25/tcp open smtp Postfix smtpd\n| ssl-cert: Subject: commonName=ubuntu804-base.localdomain/organizationName=OCOSA/stateOrProvinceName=There is no such thing outside US/countryName=XX\n| Not valid before: 2010-03-17T14:07:45\n|_Not valid after: 2010-04-16T14:07:45\n|_ssl-date: 2025-03-10T11:53:04+00:00; 0s from scanner time.\n| sslv2: \n| SSLv2 supported\n| ciphers: \n| SSL2_RC4_128_WITH_MD5\n| SSL2_RC2_128_CBC_EXPORT40_WITH_MD5\n| SSL2_RC4_128_EXPORT40_WITH_MD5\n| SSL2_DES_64_CBC_WITH_MD5\n| SSL2_DES_192_EDE3_CBC_WITH_MD5\n|_ SSL2_RC2_128_CBC_WITH_MD5\n|_smtp-commands: metasploitable.localdomain, PIPELINING, SIZE 10240000, VRFY, ETRN, STARTTLS, ENHANCEDSTATUSCODES, 8BITMIME, DSN\n53/tcp open domain ISC BIND 9.4.2\n| dns-nsid: \n|_ bind.version: 9.4.2\n80/tcp open http Apache httpd 2.2.8 ((Ubuntu) DAV/2)\n|_http-server-header: Apache/2.2.8 (Ubuntu) DAV/2\n|_http-title: Metasploitable2 - Linux\n111/tcp open rpcbind 2 (RPC #100000)\n| rpcinfo: \n| program version port/proto service\n| 100003 2,3,4 2049/tcp nfs\n| 100003 2,3,4 2049/udp nfs\n| 100005 1,2,3 39603/tcp mountd\n| 100005 1,2,3 49863/udp mountd\n| 100021 1,3,4 39393/udp nlockmgr\n|_ 100021 1,3,4 44083/tcp nlockmgr\n139/tcp open netbios-ssn Samba smbd 3.X - 4.X (workgroup: WORKGROUP)\n445/tcp open netbios-ssn Samba smbd 3.0.20-Debian (workgroup: WORKGROUP)\n512/tcp open exec netkit-rsh rexecd\n513/tcp open login?\n514/tcp open tcpwrapped\n1099/tcp open java-rmi GNU Classpath grmiregistry\n1524/tcp open bindshell Metasploitable root shell\n2049/tcp open nfs 2-4 (RPC #100003)\n2121/tcp open ftp ProFTPD 1.3.1\n3306/tcp open mysql MySQL 5.0.51a-3ubuntu5\n| mysql-info: \n| Protocol: 10\n| Version: 5.0.51a-3ubuntu5\n| Thread ID: 9\n| Capabilities flags: 43564\n| Some Capabilities: SupportsCompression, LongColumnFlag, Speaks41ProtocolNew, SwitchToSSLAfterHandshake, SupportsTransactions, Support41Auth, ConnectWithDatabase\n| Status: Autocommit\n|_ Salt: ixB}p, NetBIOS MAC: (unknown)\n\nService detection performed. Please report any incorrect results at https://nmap.org/submit/ .\nNmap done: 1 IP address (1 host up) scanned in 181.20 seconds\n。\n请根据这些结果生成下一步具体的指令。"}, {"role": "assistant", "content": "ftp -v 192.168.204.137 << 'EOF' \nuser anonymous pass \nls \nbye \nEOF \n\n```python\nimport ftplib \ntry: \n ftp = ftplib.FTP('192.168.204.137') \n ftp.login('anonymous', 'pass') \n print(\"FTP files:\", ftp.nlst()) \n ftp.quit() \nexcept Exception as e: \n print(f\"Error: {e}\") \n``` \n\nsearchsploit vsftpd 2.3.4 \n\nmsfconsole -q -x \"use exploit/unix/ftp/vsftpd_234_backdoor; set RHOST 192.168.204.137; exploit\" \n\nmsfconsole -q -x \"use exploit/multi/samba/usermap_script; set RHOST 192.168.204.137; exploit\" \n\nnc -nv 192.168.204.137 6667 <<< $'USER backdoortest 0 0 :test\\nNICK backdoortest\\n' \n\ngobuster dir -u http://192.168.204.137 -w /usr/share/wordlists/dirbuster/directory-list-2.3-medium.txt -t 50 \n\ncurl -v http://192.168.204.137:8180/manager/html --user admin:admin \n\nmysql -h 192.168.204.137 -u root -e \"SHOW DATABASES;\" \n\nnc -nv 192.168.204.137 1524"}] \ No newline at end of file +[{"role": "system", "content": "你是一位资深的渗透测试专家,需要动态控制整个渗透测试过程,包括信息收集、漏洞扫描、漏洞利用等阶段,最终生成渗透测试报告。由你规划执行的指令,我会根据你的指令执行并提交结果,你再对结果进行分析,规划并生成下一步指令,直到完成渗透测试,生成测试报告。生成的指令需满足如下约束:1.只返回具体的shell指令或Python代码,不要包含注释和说明;2.shell指令以```bash(.*?)```包裹,python代码以```python(.*?)```包裹;3.针对提供的Python代码,需要包含错误处理,并将执行结果保存到文件中(为每个任务指定唯一文件名),执行结束返回success或failure和该文件名;4.如果认为渗透测试已完成,请生成生成报告的python代码,并返回success和complete"}, {"role": "user", "content": "现在开始对目标192.168.204.137进行渗透测试,已知信息无,请提供下一步执行的指令。"}, {"role": "assistant", "content": "```bash\nnmap -Pn -sS -sV -O -T4 -p- 192.168.204.137 -oN nmap_initial_scan.txt\n```"}] \ No newline at end of file diff --git a/tools/MsfconsoleTool.py b/tools/MsfconsoleTool.py index 0a3f428..82c0a13 100644 --- a/tools/MsfconsoleTool.py +++ b/tools/MsfconsoleTool.py @@ -9,8 +9,8 @@ class MsfconsoleTool(ToolBase): ''' Metasploit 这样的交互工具,保持一个会话,析构时结束会话 ''' - def __init__(self): - super().__init__() + def __init__(self,TM): + super().__init__(TM) self.bOK = False self.msf_lock = threading.Lock() # 线程锁 diff --git a/tools/NmapTool.py b/tools/NmapTool.py index 44fd37f..5bb3ee0 100644 --- a/tools/NmapTool.py +++ b/tools/NmapTool.py @@ -1,5 +1,6 @@ # Nmap工具类 import re +import json from typing import List, Dict, Any from tools.ToolBase import ToolBase @@ -47,23 +48,56 @@ class NmapTool(ToolBase): banners = [line.strip() for line in banner_pattern.findall(nmap_output)] result['banners'] = banners if banners else "No banner info found" - # 3. 提取 OS 信息 + # 3. 提取 OS 信息--目前存在问题 # 常见格式:Service Info: OS: Linux; CPE: cpe:/o:linux:linux_kernel + '''常见格式 + Running: Linux 2.6.X + OS CPE: cpe:/o:linux:linux_kernel:2.6 + OS details: Linux 2.6.9 - 2.6.33 + Network Distance: 1 hop + Service Info: Hosts: metasploitable.localdomain, irc.Metasploitable.LAN; OSs: Unix, Linux; CPE: cpe:/o:linux:linux_kernel + ''' os_match = re.search(r"Service Info:\s+OS:\s*([^;]+);", nmap_output, re.IGNORECASE) result['os_info'] = os_match.group(1).strip() if os_match else "No OS info found" # 4. 其他信息:原始输出作为补充 - result['raw'] = nmap_output + #result['raw'] = nmap_output except Exception as e: result['error'] = f"Error parsing nmap output: {e}" + result = json.dumps(result) return result + def extract_key_info(self,nmap_text): + """ + 利用正则表达式提取每一行的端口、状态、服务及版本/额外信息。 + 只处理形如 "端口/tcp 状态 服务 版本/额外信息" 的行,达到字符缩减的效果。 + """ + # 正则模式:匹配开头数字、/tcp、状态、服务名称,后续内容为可选的额外信息 + pattern = re.compile(r'^(\d+)/tcp\s+(\w+)\s+(\S+)(?:\s+(.*))?$') + key_info = [] + for line in nmap_text.splitlines(): + match = pattern.match(line) + if match: + port, state, service, extra = match.groups() + extra = extra.strip() if extra else "" + key_info.append({ + "port": port, + "state": state, + "service": service, + "info": extra + }) + return key_info + def analyze_result(self, result,instruction,stderr,stdout): # 检查结果 start_index = result.find("If you know the service/version") if start_index != -1: return result[:start_index] + #result = self.parse_nmap_output(result) + # tmpstr = self.extract_key_info(result) + # result = json.dumps(tmpstr) + return result \ No newline at end of file diff --git a/tools/OpensslTool.py b/tools/OpensslTool.py index b4a5450..860d309 100644 --- a/tools/OpensslTool.py +++ b/tools/OpensslTool.py @@ -47,16 +47,17 @@ class OpensslTool(ToolBase): except x509.ExtensionNotFound: pass - results.append({ - 'subject': str(cert_obj.subject), - 'issuer': str(cert_obj.issuer), - 'san': str(san_list), - 'validity': { - 'start': str(cert_obj.not_valid_before), - 'end': str(cert_obj.not_valid_after) - }, - 'signature_algorithm': str(cert_obj.signature_algorithm_oid._name) - }) + if cert_obj: + results.append({ + 'subject': str(cert_obj.subject), + 'issuer': str(cert_obj.issuer), + 'san': str(san_list), + 'validity': { + 'start': str(cert_obj.not_valid_before), + 'end': str(cert_obj.not_valid_after) + }, + 'signature_algorithm': str(cert_obj.signature_algorithm_oid._name) + }) return results diff --git a/tools/PythonTool.py b/tools/PythonTool.py deleted file mode 100644 index e91a585..0000000 --- a/tools/PythonTool.py +++ /dev/null @@ -1,80 +0,0 @@ -#python代码动态执行 - -from tools.ToolBase import ToolBase - -class PythonTool(ToolBase): - def validate_instruction(self, instruction): - #指令过滤 - timeout = 0 - return "",timeout - - def execute_instruction(self, instruction_old): - ''' - 执行指令:验证合法性 -> 执行 -> 分析结果 - :param instruction_old: - :return: - bool:true-正常返回给大模型,false-结果不返回给大模型 - str:执行的指令 - str:执行指令的结果 - ''' - ext_params = self.create_extparams() - # 定义允许的内置函数集合 - allowed_builtins = { - "abs": abs, - "all": all, - "any": any, - "bool": bool, - "chr": chr, - "dict": dict, - "float": float, - "int": int, - "len": len, - "list": list, - "max": max, - "min": min, - "print": print, - "range": range, - "set": set, - "str": str, - "sum": sum, - "type": type, - # 根据需要可以添加其他安全的内置函数 - } - # 第一步:验证指令合法性 - instruction,time_out = self.validate_instruction(instruction_old) - if not instruction: - return False, instruction_old, "该指令暂不执行!","",ext_params - # 过滤修改后的指令是否需要判重?同样指令再执行结果一致?待定---#? - - # 第二步:执行指令 - output = "" - # 构造安全的全局命名空间,只包含我们允许的 __builtins__ - safe_globals = { - "__builtins__": allowed_builtins, - } - try: - # 编译代码 - code_obj = compile(instruction, filename="", mode="exec") - # 在限制环境中执行代码 - exec(code_obj, safe_globals) - except Exception as e: - print(f"执行动态代码时出错: {e}") - - - # 第三步:分析执行结果 - analysis = self.analyze_result(output, instruction,"","") - # 指令和结果入数据库 - # ? - if not analysis: # analysis为“” 不提交LLM - return False, instruction, analysis,"",ext_params - return True, instruction, analysis,"",ext_params - - def analyze_result(self, result,instruction,stderr,stdout): - #指令结果分析 - return result - -if __name__ == "__main__": - llm_code = """ - def run_test(): - return 'Penetration test executed successfully!' - """ \ No newline at end of file diff --git a/tools/PythoncodeTool.py b/tools/PythoncodeTool.py new file mode 100644 index 0000000..1aeeb2d --- /dev/null +++ b/tools/PythoncodeTool.py @@ -0,0 +1,129 @@ +#python代码动态执行 +import ast +import subprocess +import json +import builtins +import re +from tools.ToolBase import ToolBase + +class PythonTool(ToolBase): + + def is_safe_code(self,code): + # List of high-risk functions to block (can be adjusted based on requirements) + HIGH_RISK_FUNCTIONS = ['eval', 'exec', 'os.system', 'subprocess.call', 'subprocess.Popen'] + + """Check if the code contains high-risk function calls.""" + try: + tree = ast.parse(code) + for node in ast.walk(tree): + if isinstance(node, ast.Call): + if isinstance(node.func, ast.Name) and node.func.id in HIGH_RISK_FUNCTIONS: + return False + elif isinstance(node.func, ast.Attribute) and node.func.attr in HIGH_RISK_FUNCTIONS: + return False + return True + except SyntaxError: + return False + + def validate_instruction(self, instruction): + #指令过滤 + timeout = 0 + instr = instruction.replace("python_code ","") + # Safety check + if not self.is_safe_code(instr): + return "", timeout + return instr,timeout + + def safe_import(self,name,*args,**kwargs): + ALLOWED_MODULES = ['subprocess', 'json','re'] + if name not in ALLOWED_MODULES: + raise ImportError(f"Import of '{name}' is not allowed") + return builtins.__import__(name, *args, **kwargs) + + def execute_instruction(self, instruction_old): + ''' + 执行指令:验证合法性 -> 执行 -> 分析结果 + :param instruction_old: + :return: + bool:true-正常返回给大模型,false-结果不返回给大模型 + str:执行的指令 + str:执行指令的结果 + ''' + ext_params = self.create_extparams() + + # 定义允许的内置函数集合 --白名单 + allowed_builtins = { + '__import__': builtins.__import__, + "abs": abs, + "all": all, + "any": any, + "bool": bool, + "chr": chr, + "dict": dict, + "float": float, + "int": int, + "len": len, + "list": list, + "max": max, + "min": min, + "print": print, + "range": range, + "set": set, + "str": str, + "sum": sum, + "type": type, + 'open':open, + 'Exception':Exception, + # 根据需要可以添加其他安全的内置函数 + } + # 第一步:验证指令合法性 + instruction,time_out = self.validate_instruction(instruction_old) + if not instruction: + return False, instruction_old, "该指令暂不执行!","",ext_params + # 过滤修改后的指令是否需要判重?同样指令再执行结果一致?待定---#? + + # 第二步:执行指令 + output = "" + try: + # 构造安全的全局命名空间,只包含我们允许的 __builtins__ + # 虽然动态代码中包含了import subprocess,但是还是需要在全局命名空间中添加subprocess这些库 + # 正常情况应该是不需要的,后续再研究 + safe_globals = {"__builtins__": allowed_builtins, + 'subprocess':subprocess, + 'json':json, + 're':re,} + safe_locals = {} #不需要预设局部参数 + # 在限制环境中执行代码 + exec(instruction, safe_globals,safe_locals) + # Check if check_samba_vuln is defined + if 'dynamic_fun' not in safe_locals: + analysis = "Function dynamic_fun() is not defined" + ext_params['is_use'] = True + return True,instruction,analysis,analysis,ext_params + # Get the function and call it + dynamic_fun = safe_locals['dynamic_fun'] + status, tmpout = dynamic_fun() + output = f"status:{status},output:{tmpout}" + except Exception as e: + analysis = f"执行动态代码时出错: {str(e)}" + ext_params['is_use'] = True + return True,instruction,analysis,analysis,ext_params + + + # 第三步:分析执行结果 + analysis = self.analyze_result(output, instruction,"","") + # 指令和结果入数据库 + # ? + if not analysis: # analysis为“” 不提交LLM + return False, instruction, analysis,"",ext_params + return True, instruction, analysis,"",ext_params + + def analyze_result(self, result,instruction,stderr,stdout): + #指令结果分析 + return result + +if __name__ == "__main__": + llm_code = """ + def run_test(): + return 'Penetration test executed successfully!' + """ \ No newline at end of file diff --git a/tools/SearchsploitTool.py b/tools/SearchsploitTool.py index e762a02..ddcdda8 100644 --- a/tools/SearchsploitTool.py +++ b/tools/SearchsploitTool.py @@ -1,29 +1,130 @@ from tools.ToolBase import ToolBase import re import json +import subprocess +import os +import shutil +from pathlib import Path class SearchsploitTool(ToolBase): def validate_instruction(self, instruction): #指令过滤 timeout = 0 + if "-m " in instruction: # 下载利用代码 + timeout = 60 return instruction,timeout def analyze_result(self, result,instruction,stderr,stdout): + #获取当前路径 + cur_path = Path(__file__).resolve().parent + payload_dir = cur_path / "../payload" + + #if instruction(result,bytes): + if type(result) is bytes: + result = result.decode('utf-8',errors='ignore') + """去除 ANSI 颜色码""" ansi_escape = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])') clean_result = ansi_escape.sub('', result) - #指令结果分析 - lines = clean_result.split("\n") - exploits = [] - - for line in lines: - match = re.match(r"(.+?)\s+\|\s+(\S+)", line) - if match: - title = match.group(1).strip() - path = match.group(2).strip() - exploits.append({"title": title, "path": path}) - - if len(exploits) > 0: - result = json.dumps(exploits) #需要转化成字符串-必须 + # 指令结果分析 + if "-m " in instruction:#下载利用代码 + if "Copied to:" in clean_result or "cp: overwrite " in clean_result: + result = "下载完成" + else: + result = "下载失败" + else: #目前只遇到两种searchsploit命令格式: + lines = clean_result.split("\n") + exploits = [] + + for line in lines: + match = re.match(r"(.+?)\s+\|\s+(\S+)", line) + if match: + title = match.group(1).strip() + path = match.group(2).strip() + if "Path" not in path: + #下载渗透脚本--进一步推进 + ext_str = self.do_ext(path,payload_dir) + exploits.append({"title": title, "path": path,"补充信息":ext_str}) + + if len(exploits) > 0: + result = json.dumps(exploits,ensure_ascii=False) #输出原始的中文字符 + else: + result = "没有检索到漏洞利用脚本" + return result + + def find_file_in_directory(self,target_file, dir_path, case_sensitive=True, recursive=False): + """ + 在指定目录中查找文件是否存在 + :param target_file: 要查找的目标文件名(含扩展名) + :param search_dir: 要搜索的目录路径 + :param case_sensitive: 是否区分大小写(默认True) + :param recursive: 是否递归搜索子目录(默认False) + :return: (bool, str) 是否存在,完整路径(如果找到) + """ + try: + #dir_path = Path(search_dir) + + # 验证目录是否存在 + if not dir_path.is_dir(): + return False, None + + # 根据是否递归选择遍历方式 + iterator = dir_path.rglob('*') if recursive else dir_path.iterdir() + + for entry in iterator: + # 跳过目录只处理文件 + if entry.is_file(): + # 根据是否区分大小写进行比较 + if case_sensitive: + match = entry.name == target_file + else: + match = entry.name.lower() == target_file.lower() + + if match: + return True, str(entry.resolve()) + + return False, None + + except Exception as e: + print(f"查找出错: {str(e)}") + return False, None + + def do_download(self,filepath,dirpath): + ''' + 下载渗透脚本 + :return: + ''' + filename = filepath.split("/")[-1] + if ".py" in filename: + #对应payload库中是否存在该脚本 + bfind,_ = self.find_file_in_directory(filename,dirpath) + if bfind: + return True + else:#下载 + instruction = f"searchsploit -m {filepath}" + try: + subprocess.run(instruction, shell=True, check=True,timeout=60) + print("命令执行成功,文件下载完成。") + except subprocess.CalledProcessError as e: #超时暂不处理--后续补充 + print(f"命令执行失败: {e}") + return False + + if not os.path.exists(filename): + return False + # 移动文件到目标目录 + try: + shutil.move(filename, os.path.join(dirpath, filename)) + print(f"文件已成功移动到 {dirpath}") + return True + except Exception as e: + print(f"移动文件失败: {e}") + return False + else: #暂时只利用python脚本 + return False + + + def do_ext(self,filepath,payload_dir) -> str: + bdownload = self.do_download(filepath,payload_dir) + if bdownload: + return "该渗透脚本已经复制到当前路径。" else: - result = "没有检索到漏洞利用脚本" - return result \ No newline at end of file + return "暂时不利用该渗透脚本。" diff --git a/tools/ToolBase.py b/tools/ToolBase.py index 506567d..75557f0 100644 --- a/tools/ToolBase.py +++ b/tools/ToolBase.py @@ -13,10 +13,10 @@ import sys from myutils.ReturnParams import ReturnParams class ToolBase(abc.ABC): - def __init__(self): + def __init__(self,TM): #self.res_type = 0 #补充一个结果类型 0-初始状态,1-有安全问题, - #由于工具类会被多个线程调用,不能有全局变量 - pass + #由于工具类会被多个线程调用,全局变量不能修改,只能读取 + self.TM = TM def create_extparams(self): @@ -83,11 +83,17 @@ class ToolBase(abc.ABC): #-o 的命令需要处理 parsed_arg = self.parse_sublist3r_command(instruction) if parsed_arg.output: - output = self.read_output_file(parsed_arg.output) + file_out = self.read_output_file(parsed_arg.output) + stderr = "" + stdout = file_out elif parsed_arg.Noutput: - output = self.read_output_file(parsed_arg.Noutput) + file_out = self.read_output_file(parsed_arg.Noutput) + stderr = "" + stdout = file_out elif parsed_arg.nikto: - output = self.read_output_file(parsed_arg.nikto) + file_out = self.read_output_file(parsed_arg.nikto) + stderr = "" + stdout = file_out else: stderr = result.stderr stdout = result.stdout