diff --git a/TaskManager.py b/TaskManager.py index d0c8fdd..afd33f5 100644 --- a/TaskManager.py +++ b/TaskManager.py @@ -3,7 +3,7 @@ ''' from TargetManager import TargetManager # 从模块导入类 #from LLMManager import LLMManager # 同理修正其他导入 -from mycode import ControlCenter #控制中心替代LLM--控制中心要实现一定的基础逻辑和渗透测试树的维护。 +from mycode.ControlCenter import ControlCenter #控制中心替代LLM--控制中心要实现一定的基础逻辑和渗透测试树的维护。 from myutils.FileManager import FileManager from InstructionManager import InstructionManager from mycode.DBManager import DBManager @@ -19,7 +19,7 @@ class TaskManager: def __init__(self): self.TargetM = TargetManager() - self.logger = LogHandler.get_logger("TaskManager") + self.logger = LogHandler().get_logger("TaskManager") # 生成功能对象 self.DBM = DBManager() #主进程一个DBM if not self.DBM.connect(): @@ -35,7 +35,7 @@ class TaskManager: self.batch_num = 0 #一个批次的指令数量 self.long_instr_num = 0 #耗时指令数量 self.long_time_instr = ['nikto'] #耗时操作不计入批量执行的数量,不加不减 - self.node_queue = queue.Queue() #线程安全 --待执行指令 + self.node_queue = [] # self.lock = threading.Lock() #线程锁 self.node_num = 0 #在处理Node线程的处理 @@ -59,7 +59,7 @@ class TaskManager: self.logger.error("数据库连接失败!!") #结果入队列---2025-3-18所有的指令均需返回给LLM便于节点状态的更新,所以bres作用要调整。 - res = {'instr':instr,'reslut':reslut} + res = {'执行指令':instr,'结果':reslut} work_node.add_res(res) #入节点结果队列 def do_worker_th(self): @@ -67,12 +67,14 @@ class TaskManager: th_DBM = DBManager() th_DBM.connect() while self.brun: - try: - with self.lock: + work_node = None + with self.lock: + if self.node_queue: + # 获取测试节点 + work_node = self.node_queue.pop(0) self.node_num += 1 - #获取测试节点 - work_node = self.node_queue.get(block=False) + if work_node: #开始执行指令 for instruction in work_node.instr_queue: start_time = get_local_timestr() #指令执行开始时间 @@ -83,7 +85,7 @@ class TaskManager: #保存记录--测试使用 with self.lock: self.node_num -=1 - if self.node_num ==0 and self.node_queue.empty(): + if self.node_num == 0 and len(self.node_queue) == 0: with open("attack_tree", 'wb') as f: pickle.dump(TM.CCM.attack_tree, f) #针对一个节点的指令执行完成后,提交LLM规划下一步操作 @@ -92,7 +94,7 @@ class TaskManager: # continue # for node in node_list: # self.node_queue.put(node) - except queue.Empty: + else: time.sleep(10) # try: # instruction = self.instr_queue.get(block=False) #quere线程安全,block=false非阻塞get @@ -122,8 +124,9 @@ class TaskManager: know_info = "无" #? #启动--初始化指令 node_list = self.CCM.start_do(target,self.task_id) - for node in node_list: - self.node_queue.put(node) + with self.lock: + for node in node_list: + self.node_queue.append(node) #创建工作线程----2025-3-18调整为一个节点一个线程, for i in range(self.max_thread_num): @@ -158,8 +161,11 @@ if __name__ == "__main__": #测试执行指令 with open("attack_tree", "rb") as f: TM.CCM.attack_tree = pickle.load(f) - with open("node_list", 'rb') as f: - TM.node_queue = pickle.load(f) + # 遍历node,查看有res的数据 + nodes = TM.CCM.attack_tree.traverse_bfs() + for node in nodes: + if node.instr_queue: # list + TM.node_queue.append(node) #创建线程执行指令 for i in range(TM.max_thread_num): @@ -176,23 +182,27 @@ if __name__ == "__main__": #遍历node,查看有res的数据 nodes = TM.CCM.attack_tree.traverse_bfs() for node in nodes: - if not node.res_quere.empty(): + if node.res_quere: #list node_list = TM.CCM.get_llm_instruction(node) if not node_list:#该节点测试完成 continue - for do_node in node_list: - TM.node_queue.put(do_node) - elif test_type ==3: #新目标测试 + with TM.lock: + for do_node in node_list: + TM.node_queue.append(do_node) + # 暂存状态-- + with open("attack_tree", 'wb') as f: + pickle.dump(TM.CCM.attack_tree, f) + elif test_type == 3: #新目标测试 # 启动--初始化指令 node_list = TM.CCM.start_do("192.168.204.137", 0) - for node in node_list: - TM.node_queue.put(node) + with TM.lock: + for node in node_list: + TM.node_queue.append(node) #暂存状态-- with open("attack_tree",'wb') as f: pickle.dump(TM.CCM.attack_tree,f) - with open("node_list",'wb') as f: - pickle.dump(TM.node_queue,f) - + elif test_type == 4: #读取messages + pass else: #完整过程测试---要设定终止条件 pass diff --git a/mycode/AttackMap.py b/mycode/AttackMap.py index b928262..ab2bf4d 100644 --- a/mycode/AttackMap.py +++ b/mycode/AttackMap.py @@ -57,6 +57,8 @@ class AttackTree: current_node = self.root #从根节点开始 node_names = node_path.split('->') for node_name in node_names: + if node_name == "目标系统": + continue if node_name == current_node.name:#根节点开始 continue else: @@ -114,8 +116,8 @@ class TreeNode: self.children = [] # 子节点列表 self.parent = None # 父节点引用 self.path = "" #当前节点的路径 - self.instr_queue = queue.Queue() #针对当前节点的执行指令----重要约束:一个节点只能有一个线程在执行指令 - self.res_quere = queue.Queue() #指令执行的结果,一批一批 + self.instr_queue = [] #queue.Queue() #针对当前节点的执行指令----重要约束:一个节点只能有一个线程在执行指令 + self.res_quere = [] #queue.Queue() #指令执行的结果,一批一批 self.llm_sn = 0 #针对该节点llm提交次数 self.do_sn = 0 #针对该节点instr执行次数 self.messages = [] #针对当前节点积累的messages -- 针对不同节点提交不同的messages @@ -127,20 +129,16 @@ class TreeNode: self.children.append(child_node) def add_instr(self,instr): - self.instr_queue.put(instr) + self.instr_queue.append(instr) def get_instr(self): - if self.instr_queue.empty(): - return None - return self.instr_queue.get() + return self.instr_queue.pop(0) if self.instr_queue else None def add_res(self,str_res): #结构化结果字串 - self.res_quere.put(str_res) + self.res_quere.append(str_res) def get_res(self): - if self.res_quere.empty(): - return None - return self.res_quere.get() + return self.res_queue.pop(0) if self.res_queue else None def __repr__(self): return f"TreeNode({self.name}, {self.status}, {self.vul_type})" diff --git a/mycode/ControlCenter.py b/mycode/ControlCenter.py index 4a05fce..ccce462 100644 --- a/mycode/ControlCenter.py +++ b/mycode/ControlCenter.py @@ -11,7 +11,7 @@ from myutils.MyLogger_logger import LogHandler class ControlCenter: def __init__(self,DBM): - self.logger = LogHandler.get_logger("ControlCenter") + self.logger = LogHandler().get_logger("ControlCenter") self.task_id = None self.target = None self.attack_tree = None @@ -55,16 +55,15 @@ class ControlCenter: #约束1:一个节点只能同时提交一次,未测试的节点不要重复 def get_llm_instruction(self,node): - #拼接结果字符串 - json_strings = [] - while True: - try: - item = node.res_quere.get_nowait() - json_str = json.dumps(item,ensure_ascii=False) - json_strings.append(json_str) - except queue.Empty: - break - res_str = ','.join(json_strings) + #拼接结果字符串--由于测试需要queue改成了list 2025-3-19 + # json_strings = [] + # if node.res_quere: + # for item in node.res_quere: + # json_str = json.dumps(item, ensure_ascii=False) + # json_strings.append(json_str) + # res_str = ','.join(json_strings) + + res_str = json.dumps(node.res_quere,ensure_ascii=False) #构造本次提交的prompt user_Prompt = f''' @@ -85,6 +84,8 @@ class ControlCenter: def tree_manager(self,node_cmds,node): '''更新渗透测试树''' + if not node_cmds or len(node_cmds)==0: + return try: node_jsons = json.loads(node_cmds) for node_json in node_jsons: @@ -125,7 +126,7 @@ class ControlCenter: find_node = self.attack_tree.find_node_by_nodepath(node_path) if find_node: instruction = re.sub(r'\[.*?\]', "", command,count=1,flags=re.DOTALL) - find_node.instr_queue.put(instruction) + find_node.instr_queue.append(instruction) #入输出队列 if find_node not in node_list: node_list.append(find_node) @@ -135,7 +136,7 @@ class ControlCenter: self.logger.error(f"得到的指令格式不符合规范:{command}") return node_list - + #待修改 def is_user_instr(self,instr): ''' 过滤需要人工确认或手动执行的指令 ---- 待完善 @@ -170,7 +171,7 @@ class ControlCenter: with self.lock: self.long_instr_num +=1 #耗时指令数量+1 # 指令入队列 - self.instr_queue.put(instr) + self.instr_queue.append(instr) def stop_do(self): #清空数据 diff --git a/mycode/LLMManager.py b/mycode/LLMManager.py index d1a240e..728c61b 100644 --- a/mycode/LLMManager.py +++ b/mycode/LLMManager.py @@ -55,53 +55,59 @@ class LLMManager: # 构建初始提示 初始化messages def build_initial_prompt(self,target,know_info="无",node=None): - # self.messages = [{"role": "system", - # "content": "你是一位渗透测试专家,需要动态控制整个渗透测试过程,包括信息收集、漏洞扫描、漏洞利用等阶段,最终生成渗透测试报告。由你规划执行的指令,我会根据你的指令执行并提交结果,你再对结果进行分析,规划并生成下一步指令,直到完成渗透测试,生成测试报告。" - # "生成的指令需满足如下约束:" - # "1.只返回具体的shell指令或Python代码,不要包含注释和说明;" - # "2.shell指令以```bash(.*?)```包裹,python代码以```python(.*?)```包裹;" - # "3.若提供的是shell指令,需要避免用户再次交互;" - # "4.若提供的是python代码,主函数名为dynamic_fun,需要包含错误处理,执行结束返回两个str(status-'success'或'failure'和output-补充输出信息);" - # "5.如果认为渗透测试已完成,请生成生成报告的python代码,并返回'complete'和报告文件路径"}] # 一个messages - #return f"现在开始对目标{target}进行渗透测试,已知信息{know_info},请提供下一步执行的指令。" if not node: return None #根节点初始化message node.messages = [{"role": "system", - "content":''' - 你是一位渗透测试专家,由你动态控制整个渗透测试过程,根据当前测试状态和返回结果,决定下一步的测试指令,推动测试阶段前进,直至完成渗透测试。本地程序会根据你的指令进行执行,然后将执行结果返回给你。 - 整体过程说明: - 1.以目标系统所在IP为根节点,每个渗透测试点(如端口、服务、漏洞点)作为子节点,形成树型结构(测试树); - 2.测试树整体数据由本地程序存储,你只需要关注当前节点的渗透测试推进,节点状态(未完成、已完成)的更新,和是否有子节点新增; - 3.新增子节点时,同一父节点下的子节点名称不能相同,并提供对该节点的测试指令; - 3.新增节点示例:{\"action\":\"add_node\", \"parent\": \"80端口\", \"node\": \"http://192.168.1.100/index.php?id=1\", \"status\": \"未完成\"}; - 4.完成节点测试且发现漏洞示例:{\"action\": \"update_status\", \"node\": \"21端口\", \"status\": \"已完成\",\"vulnerability\": \"ftp匿名登录\"}; - 5.发现漏洞后,可根据漏洞类型决定是否新增子节点继续测试,如:{\"action\": \"add_node\", \"parent\": \"21端口\", \"node\": \"ftp配置检查\", \"status\": \"未完成\"}; - 6.当本地程序确认所有节点状态均为“已完成”时,生成测试报告。 - 生成的渗透测试指令需满足如下约束: - 1.只返回具体的shell指令或Python代码,不要包含注释和说明; - 2.shell指令以```bash-[对应节点的路径](.*?)```包裹,python代码以```python-[对应节点的路径](.*?)```包裹; - 3.若提供的是shell指令,需要避免用户再次交互; - 4.若提供的是python代码,主函数名为dynamic_fun,需包含错误处理,执行结束后返回一个tuple (status, output),其中status为'success'或'failure',output为补充输出信息; - 示例: - ```python-[目标系统->192.168.1.100->3306端口] - def dynamic_fun(): - try: - result = "扫描完成" - return ("success", result) - except Exception as e: - return ("failure", str(e)) - ``` - 5.如果认为渗透测试已完成,请生成生成报告的python代码,并返回'complete'和报告文件路径。 - '''}] # 一个messages + "content":''' +你是一位渗透测试专家,由你动态控制整个渗透测试过程,根据当前测试状态和返回结果,决定下一步的测试指令,推动测试阶段前进,直至完成渗透测试。本地程序会根据你的指令进行执行,然后将执行结果返回给你。 +总体要求说明: +1.以目标系统所在IP为根节点,随着信息收集和渗透测试的推进,每个渗透测试点(如端口、服务、漏洞点)作为子节点,形成树型结构(测试树); +2.测试树整体数据由本地程序存储,你只需要关注当前节点的渗透测试推进,节点状态(未完成、已完成)的更新,和是否有子节点新增; +3.你需要返回两类指令:节点指令和测试指令,以空行间隔; +--示例: +{\"action\":\"add_node\", \"parent\": \"192.168.1.100\", \"node\": \"21端口\", \"status\": \"未完成\"} + +```bash-[目标系统->192.168.1.100->3306端口] +mysql -u root -p 192.168.1.100``` +若无节点修改或新增,节点指令可以为空,但测试指令必须对应已有节点; +决策流程: +1. 如果当前节点是IP地址,且未进行端口扫描,则执行端口扫描。 +2. 如果端口扫描发现开放端口,则为每个开放端口新增一个测试节点。 +3. 如果当前节点是端口,且未进行服务扫描,则执行服务扫描。 +4. 如果服务扫描发现服务版本或漏洞,则新增对应的漏洞测试节点。 +5. 如果当前节点是漏洞测试,且漏洞利用成功,则根据利用结果决定是否新增子节点进一步测试。 +6. 如果当前节点测试未发现新信息,则更新节点状态为“已完成”,并继续测试其他未完成节点。 +生成的节点指令需要满足如下约束: +1.新增节点指令示例:{\"action\":\"add_node\", \"parent\": \"80端口\", \"node\": \"http://192.168.1.100/index.php?id=1\", \"status\": \"未完成\"}; +2.新增子节点时,同一请求返回的子节点名(node)不能相同,且必须同时提供对该节点的测试指令; +3.若认为该节点已完成测试,修改该节点为“已完成”状态,完成节点测试且发现漏洞示例:{\"action\": \"update_status\", \"node\": \"21端口\", \"status\": \"已完成\",\"vulnerability\": \"ftp匿名登录\"}; +4.发现漏洞后,可根据漏洞类型决定是否新增子节点继续测试,如:{\"action\": \"add_node\", \"parent\": \"21端口\", \"node\": \"ftp配置检查\", \"status\": \"未完成\"}; +生成的渗透测试指令需满足如下约束: +1.只返回具体的shell指令或Python代码,不要包含注释和说明; +2.shell指令以```bash-[对应节点的路径](.*?)```包裹,python代码以```python-[对应节点的路径](.*?)```包裹,[对应节点的路径]为从根节点到目标节点的完整层级描述; +3.若提供的是shell指令,需要避免用户再次交互; +4.若提供的是python代码,主函数名为dynamic_fun,需包含错误处理,执行结束后必须返回一个tuple (status, output),其中status为'success'或'failure',output为补充输出信息; +示例: +```python-[目标系统->192.168.1.100->3306端口] +def dynamic_fun(): + try: + result = "扫描完成" + return ("success", result) + except Exception as e: + return ("failure", str(e)) +``` +限制条件: +1.仅在发现高危漏洞或关键测试路径时新增节点 + '''}] # 一个messages user_Prompt = f''' - 当前分支路径:目标系统->{target} - 当前节点信息: - - 节点名称:{target} - - 节点状态:未完成 - - 漏洞类型:未发现 - 上一步结果:已知信息-{know_info} - 任务:生成下一步渗透测试指令或结束该节点的渗透测试(修改节点状态为:已完成)。 +当前分支路径:目标系统->{target} +当前节点信息: +- 节点名称:{target} +- 节点状态:未完成 +- 漏洞类型:未发现 +上一步结果:{know_info} +任务:生成下一步渗透测试指令或结束该节点的渗透测试(修改节点状态为:已完成)。 ''' return user_Prompt @@ -126,7 +132,7 @@ class LLMManager: post_time = get_local_timestr() response = self.client.chat.completions.create( model=self.model, - messages = self.messages + messages = node.messages ) #LLM返回结果处理 @@ -156,17 +162,14 @@ class LLMManager: self.logger.error(f"{node.name}-llm入库失败!") #需要对指令进行提取 - node_list = self.fetch_instruction(content,node) + node_cmds,commands = self.fetch_instruction(content,node) - #********测试时使用---输出和记录LLM返回指令的message - print(f"Messages:{self.messages}") - with open("../test", "w", encoding="utf-8") as f: #输出到文件 - json.dump(self.messages,f,ensure_ascii=False) - - return node_list + return node_cmds,commands def fetch_instruction(self,response_text,node): ''' + *****该函数很重要,需要一定的容错能力,解析LLM返回内容***** + 处理边界:只格式化分析LLM返回内容,指令和节点操作等交其他模块。 节点控制指令 渗透测试指令 提取命令列表,包括: @@ -211,7 +214,7 @@ class LLMManager: #其他的认为是节点操作指令 node_cmds.append(part) - return node_cmds,commands #?存在一个问题:这样分list返回,执行顺序会丢失 + return node_cmds,commands def test_llm(self): with open("../test", "r", encoding="utf-8") as f: diff --git a/tools/PythoncodeTool.py b/tools/PythoncodeTool.py index 1aeeb2d..c93618f 100644 --- a/tools/PythoncodeTool.py +++ b/tools/PythoncodeTool.py @@ -6,7 +6,7 @@ import builtins import re from tools.ToolBase import ToolBase -class PythonTool(ToolBase): +class PythoncodeTool(ToolBase): def is_safe_code(self,code): # List of high-risk functions to block (can be adjusted based on requirements) diff --git a/tools/ToolBase.py b/tools/ToolBase.py index 75557f0..c5b14b6 100644 --- a/tools/ToolBase.py +++ b/tools/ToolBase.py @@ -29,6 +29,7 @@ class ToolBase(abc.ABC): parser = argparse.ArgumentParser(add_help=False) #创建命令行参数解析器对象‌ parser.add_argument("-o ", "--output", type=str) #添加需要解析的参数规则‌ parser.add_argument("-oN ", "--Noutput", type=str) #nmap + parser.add_argument("-oG ","--NMG",type=str) #nmap parser.add_argument("-output ", "--nikto", type=str) #nikto args, _ = parser.parse_known_args(command.split()[1:]) return args @@ -94,6 +95,10 @@ class ToolBase(abc.ABC): file_out = self.read_output_file(parsed_arg.nikto) stderr = "" stdout = file_out + elif parsed_arg.NMG: + file_out = self.read_output_file(parsed_arg.NMG) + stderr = "" + stdout = file_out else: stderr = result.stderr stdout = result.stdout