From 9cd92ffd326b57a1d197725d1709bb928a11de48 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E9=BE=99?= Date: Thu, 20 Mar 2025 16:34:03 +0800 Subject: [PATCH] V0.1.1 node_tree_0.3 --- TaskManager.py | 68 ++++++++++++---------------- mycode/ControlCenter.py | 55 ++++++++++------------- mycode/LLMManager.py | 98 +++++++++++++++++++++++++---------------- 3 files changed, 113 insertions(+), 108 deletions(-) diff --git a/TaskManager.py b/TaskManager.py index afd33f5..7981507 100644 --- a/TaskManager.py +++ b/TaskManager.py @@ -35,7 +35,7 @@ class TaskManager: self.batch_num = 0 #一个批次的指令数量 self.long_instr_num = 0 #耗时指令数量 self.long_time_instr = ['nikto'] #耗时操作不计入批量执行的数量,不加不减 - self.node_queue = [] # + self.node_queue = [] #2025-3-19修改为list,兼容pickle反序列化 self.lock = threading.Lock() #线程锁 self.node_num = 0 #在处理Node线程的处理 @@ -72,6 +72,7 @@ class TaskManager: if self.node_queue: # 获取测试节点 work_node = self.node_queue.pop(0) + #测试时使用 self.node_num += 1 if work_node: @@ -82,35 +83,23 @@ class TaskManager: end_time = get_local_timestr() # 指令执行结束时间 self.res_in_quere(bres, instr, reslut, start_time, end_time, th_DBM, source_result, ext_params,work_node) # 执行结果入队列 - #保存记录--测试使用 + + # #针对一个节点的指令执行完成后,提交LLM规划下一步操作 + # node_list = self.CCM.get_llm_instruction(work_node) + # if not node_list:#该节点测试完成----没有返回新指令 + # continue #取下一个节点 + # for node in node_list: #新指令入队列 + # self.node_queue.put(node) + + # 保存记录--测试时使用--后期增加人为停止测试时可以使用 with self.lock: - self.node_num -=1 - if self.node_num == 0 and len(self.node_queue) == 0: + self.node_num -= 1 + if self.node_num == 0 and len(self.node_queue) == 0: # with open("attack_tree", 'wb') as f: pickle.dump(TM.CCM.attack_tree, f) - #针对一个节点的指令执行完成后,提交LLM规划下一步操作 - # node_list = self.CCM.get_llm_instruction(work_node) - # if not node_list:#该节点测试完成 - # continue - # for node in node_list: - # self.node_queue.put(node) - else: + else: #没有带处理的Node time.sleep(10) - # try: - # instruction = self.instr_queue.get(block=False) #quere线程安全,block=false非阻塞get - # self.doing_instr.put(instruction) #入执行队列--忘了入执行队列的作用 - # #执行中会对指令进行微调,并有可能不执行直接返回空结果 - # start_time = get_local_timestr() #指令执行开始时间 - # bres,instr,reslut,source_result,ext_params = self.InstrM.execute_instruction(instruction) - # end_time = get_local_timestr() #指令执行结束时间 - # self.res_in_quere(bres,instr,reslut,start_time,end_time,th_DBM,source_result,ext_params) #执行结果入队列 - # self.done_instr.put(instruction) #执行完成队列 - # #执行情况是否需要用户确认 - # if ext_params["is_user"]: - # pass - # #print("该指令执行需要用户确认") - # except queue.Empty: - # time.sleep(10) + #函数结束,局部变量自动释放 def start_task(self,target_name,target_in): @@ -156,12 +145,12 @@ if __name__ == "__main__": current_path = os.path.dirname(os.path.realpath(__file__)) strMsg = FM.read_file("test",1) - test_type = 3 + test_type = 2 if test_type == 1: #测试执行指令 with open("attack_tree", "rb") as f: TM.CCM.attack_tree = pickle.load(f) - # 遍历node,查看有res的数据 + # 遍历node,查看有instr的ndoe nodes = TM.CCM.attack_tree.traverse_bfs() for node in nodes: if node.instr_queue: # list @@ -182,17 +171,12 @@ if __name__ == "__main__": #遍历node,查看有res的数据 nodes = TM.CCM.attack_tree.traverse_bfs() for node in nodes: - if node.res_quere: #list - node_list = TM.CCM.get_llm_instruction(node) - if not node_list:#该节点测试完成 - continue - with TM.lock: - for do_node in node_list: - TM.node_queue.append(do_node) - # 暂存状态-- - with open("attack_tree", 'wb') as f: - pickle.dump(TM.CCM.attack_tree, f) - elif test_type == 3: #新目标测试 + if node.res_quere: #有结果需要提交LLM + node_list = TM.CCM.get_llm_instruction(node) #包括对节点操作 + # 暂存状态-- + with open("attack_tree", 'wb') as f: + pickle.dump(TM.CCM.attack_tree, f) + elif test_type == 0: #新目标测试 # 启动--初始化指令 node_list = TM.CCM.start_do("192.168.204.137", 0) with TM.lock: @@ -202,7 +186,11 @@ if __name__ == "__main__": with open("attack_tree",'wb') as f: pickle.dump(TM.CCM.attack_tree,f) elif test_type == 4: #读取messages - pass + with open("attack_tree", "rb") as f: + TM.CCM.attack_tree = pickle.load(f) + # 遍历node,查看有instr的ndoe + nodes = TM.CCM.attack_tree.traverse_bfs() + print(nodes[0].messages) else: #完整过程测试---要设定终止条件 pass diff --git a/mycode/ControlCenter.py b/mycode/ControlCenter.py index ccce462..e761912 100644 --- a/mycode/ControlCenter.py +++ b/mycode/ControlCenter.py @@ -62,9 +62,7 @@ class ControlCenter: # json_str = json.dumps(item, ensure_ascii=False) # json_strings.append(json_str) # res_str = ','.join(json_strings) - res_str = json.dumps(node.res_quere,ensure_ascii=False) - #构造本次提交的prompt user_Prompt = f''' 当前分支路径:{node.path} @@ -83,38 +81,33 @@ class ControlCenter: return node_list def tree_manager(self,node_cmds,node): - '''更新渗透测试树''' + '''更新渗透测试树 + node_cmds是json-list + ''' if not node_cmds or len(node_cmds)==0: return - try: - node_jsons = json.loads(node_cmds) - for node_json in node_jsons: - if node_json["action"] == "add_node": #新增节点 - parent_node_name = node_json["parent"] - node_name = node_json["node"] - status = node_json["status"] - #新增节点原则上应该都是当前节点增加子节点 - if node.name == parent_node_name: - new_node = TreeNode(node_name,status) - node.add_child(new_node) #message的传递待验证 - else: - self.logger.error("添加子节点时,遇到父节点名称不一致的,需要介入!!") - elif node_json["action"] == "update_status": - node_name = node_json["node"] - status = node_json["status"] - vul_type = node_json["vulnerability"] - if node.name == node_name: - node.status = status - node.vul_type = vul_type - else: - self.logger.error("遇到不是修改本节点状态的,需要介入!!") + for node_json in node_cmds: + if node_json["action"] == "add_node": #新增节点 + parent_node_name = node_json["parent"] + node_name = node_json["node"] + status = node_json["status"] + #新增节点原则上应该都是当前节点增加子节点 + if node.name == parent_node_name: + new_node = TreeNode(node_name,status) + node.add_child(new_node) #message的传递待验证 else: - self.logger.error("node更新JSON遇到异常参数!") - except json.JSONDecodeError as e: - self.logger.error(f"JSON 解析失败:{e.msg}") - self.logger.error(f"错误位置:第{e.lineno}行,列{e.colno}") - except TypeError as e: - self.logger.error(f"输入类型错误:{str(e)}") + self.logger.error("添加子节点时,遇到父节点名称不一致的,需要介入!!") + elif node_json["action"] == "update_status": + node_name = node_json["node"] + status = node_json["status"] + vul_type = node_json["vulnerability"] + if node.name == node_name: + node.status = status + node.vul_type = vul_type + else: + self.logger.error("遇到不是修改本节点状态的,需要介入!!") + else: + self.logger.error("node更新JSON遇到异常参数!") def instr_in_node(self,commands,node): node_list = [] diff --git a/mycode/LLMManager.py b/mycode/LLMManager.py index 728c61b..e6b7884 100644 --- a/mycode/LLMManager.py +++ b/mycode/LLMManager.py @@ -57,39 +57,45 @@ class LLMManager: def build_initial_prompt(self,target,know_info="无",node=None): if not node: return None + '''暂时未添加到提示词 + **核心要求**: + - 每次新增节点时,必须在同一响应中为该节点提供测试指令。 + - 分批新增节点,每次响应中新增节点不超过3个,确保指令完整。 + ''' #根节点初始化message node.messages = [{"role": "system", "content":''' -你是一位渗透测试专家,由你动态控制整个渗透测试过程,根据当前测试状态和返回结果,决定下一步的测试指令,推动测试阶段前进,直至完成渗透测试。本地程序会根据你的指令进行执行,然后将执行结果返回给你。 -总体要求说明: -1.以目标系统所在IP为根节点,随着信息收集和渗透测试的推进,每个渗透测试点(如端口、服务、漏洞点)作为子节点,形成树型结构(测试树); -2.测试树整体数据由本地程序存储,你只需要关注当前节点的渗透测试推进,节点状态(未完成、已完成)的更新,和是否有子节点新增; -3.你需要返回两类指令:节点指令和测试指令,以空行间隔; ---示例: -{\"action\":\"add_node\", \"parent\": \"192.168.1.100\", \"node\": \"21端口\", \"status\": \"未完成\"} +你是一位渗透测试专家,负责动态控制整个渗透测试过程,根据当前测试状态和返回结果,决定下一步测试指令,推动测试前进,直至完成渗透测试。本地程序会执行你的指令进并返回结果。 +**总体要求** +1.以目标系统IP为根节点,每个渗透测试点(如端口、服务、漏洞点)作为子节点,形成树型结构(测试树); +2.测试树整体数据由本地程序存储,你只需要关注当前节点的测试推进、状态更新(未完成/已完成)及子节点新增; +3.返回两类指令:节点指令和测试指令,以空行间隔,不要包含注释和说明; +4.若无节点修改或新增,节点指令可以为空,但测试指令必须对应已有节点。 +**决策流程** +1. 若当前节点是IP且未进行端口扫描,则执行端口扫描; +2. 若端口扫描发现开放端口,为每个端口新增节点并提供测试指令; +3. 若当前节点是端口且未进行服务扫描,则执行服务扫描; +4. 若服务扫描发现服务版本或漏洞,则新增漏洞测试节点并提供测试指令; +5. 若漏洞利用成功,则根据结果决定是否新增子节点并提供测试指令; +6. 若节点测试无新信息,则更新状态为“已完成”。 +**节点指令格式** +- 新增节点:{\"action\":\"add_node\", \"parent\": \"80端口\", \"node\": \"http://192.168.1.100/index.php?id=1\", \"status\": \"未完成\"}; +- 更新节点未发现漏洞:{\"action\": \"update_status\", \"node\": \"21端口\", \"status\": \"已完成\"}; +- 更新节点发现漏洞:{\"action\": \"update_status\", \"node\": \"21端口\", \"status\": \"已完成\",\"vulnerability\": {\"name\":\"ftp匿名登录\",\"risk\":\"高\"}}; +**测试指令格式** +- shell指令:```bash-[节点路径](.*?)```包裹,需要避免用户交互; +- python指令:```python-[节点路径](.*?)```包裹,主函数名为dynamic_fun,需包含错误处理,执行结束后必须返回一个tuple (status, output),其中status为'success'或'failure',output为补充输出信息; +- [节点路径]为从根节点到目标节点的完整层级描述。 +**响应示例** +{\"action\":\"add_node\", \"parent\": \"192.168.1.100\", \"node\": \"3306端口\", \"status\": \"未完成\"} ```bash-[目标系统->192.168.1.100->3306端口] -mysql -u root -p 192.168.1.100``` -若无节点修改或新增,节点指令可以为空,但测试指令必须对应已有节点; -决策流程: -1. 如果当前节点是IP地址,且未进行端口扫描,则执行端口扫描。 -2. 如果端口扫描发现开放端口,则为每个开放端口新增一个测试节点。 -3. 如果当前节点是端口,且未进行服务扫描,则执行服务扫描。 -4. 如果服务扫描发现服务版本或漏洞,则新增对应的漏洞测试节点。 -5. 如果当前节点是漏洞测试,且漏洞利用成功,则根据利用结果决定是否新增子节点进一步测试。 -6. 如果当前节点测试未发现新信息,则更新节点状态为“已完成”,并继续测试其他未完成节点。 -生成的节点指令需要满足如下约束: -1.新增节点指令示例:{\"action\":\"add_node\", \"parent\": \"80端口\", \"node\": \"http://192.168.1.100/index.php?id=1\", \"status\": \"未完成\"}; -2.新增子节点时,同一请求返回的子节点名(node)不能相同,且必须同时提供对该节点的测试指令; -3.若认为该节点已完成测试,修改该节点为“已完成”状态,完成节点测试且发现漏洞示例:{\"action\": \"update_status\", \"node\": \"21端口\", \"status\": \"已完成\",\"vulnerability\": \"ftp匿名登录\"}; -4.发现漏洞后,可根据漏洞类型决定是否新增子节点继续测试,如:{\"action\": \"add_node\", \"parent\": \"21端口\", \"node\": \"ftp配置检查\", \"status\": \"未完成\"}; -生成的渗透测试指令需满足如下约束: -1.只返回具体的shell指令或Python代码,不要包含注释和说明; -2.shell指令以```bash-[对应节点的路径](.*?)```包裹,python代码以```python-[对应节点的路径](.*?)```包裹,[对应节点的路径]为从根节点到目标节点的完整层级描述; -3.若提供的是shell指令,需要避免用户再次交互; -4.若提供的是python代码,主函数名为dynamic_fun,需包含错误处理,执行结束后必须返回一个tuple (status, output),其中status为'success'或'failure',output为补充输出信息; -示例: -```python-[目标系统->192.168.1.100->3306端口] +mysql -u root -p 192.168.1.100 +``` + +{\"action\":\"add_node\", \"parent\": \"192.168.1.100\", \"node\": \"22端口\", \"status\": \"未完成\"} + +```python-[目标系统->192.168.1.100->22端口] def dynamic_fun(): try: result = "扫描完成" @@ -97,9 +103,7 @@ def dynamic_fun(): except Exception as e: return ("failure", str(e)) ``` -限制条件: -1.仅在发现高危漏洞或关键测试路径时新增节点 - '''}] # 一个messages +'''}] # 一个messages user_Prompt = f''' 当前分支路径:目标系统->{target} 当前节点信息: @@ -211,11 +215,23 @@ def dynamic_fun(): commands.append(shell_blocks[shell_index]) shell_index +=1 else: - #其他的认为是节点操作指令 - node_cmds.append(part) - + #其他的认为是节点操作指令--指令格式还存在不确定性,需要正则匹配 + pattern = re.compile(r'\{.*?\}', re.DOTALL) + # 遍历所有匹配到的 JSON 结构 + for match in pattern.findall(part): + try: + node_cmds.append(json.loads(match)) # 解析 JSON 并添加到列表 + except json.JSONDecodeError as e:#解析不了的不入队列 + print(f"JSON 解析错误: {e}") return node_cmds,commands + def llm_error_feedback(self,response_text,node): + '''验证llm返回--错误反馈机制--兼容LLM的不稳定性,需要持续补充 + 1.验证节点是否都有测试指令返回 + ''' + pass + + def test_llm(self): with open("../test", "r", encoding="utf-8") as f: messages = json.load(f) @@ -227,7 +243,15 @@ def dynamic_fun(): if __name__ == "__main__": - LM = LLMManager(1) - LM.test_llm() - + # LM = LLMManager(1) + # LM.test_llm() + tlist1 = [] + tlist2 = [] + tlist2.append(1) + if not tlist1: + print("list1空") + if not tlist2: + print("list2空") + if tlist2: + print("list2不为空")