diff --git a/InstructionManager.py b/InstructionManager.py index 21f809f..ad8420c 100644 --- a/InstructionManager.py +++ b/InstructionManager.py @@ -50,6 +50,7 @@ class InstructionManager: ext_params = None tool_name_tmp = instruction.split()[0] # 提取工具名称 tool_name = tool_name_tmp.replace("-","") + tool_name = tool_name.replace("_", "") # 检查是否存在对应工具 if tool_name in self.tool_registry: tool = self.tool_registry[tool_name] @@ -62,7 +63,7 @@ class InstructionManager: instr = instruction #保障后续代码的一致性 source_result = result = f"未知工具:{tool_name}" ext_params = ReturnParams() - ext_params["is_user"] = False # 是否要提交用户确认 -- 默认False + ext_params["is_user"] = True # 是否要提交用户确认 -- 默认False ext_params["is_vulnerability"] = False # 是否是脆弱点 print(f"执行指令:{instr}") print(f"未知工具:{tool_name}") diff --git a/TaskManager.py b/TaskManager.py index 7981507..a620a68 100644 --- a/TaskManager.py +++ b/TaskManager.py @@ -25,21 +25,18 @@ class TaskManager: if not self.DBM.connect(): self.logger.error("数据库连接失败!终止工作!") return - self.CCM = ControlCenter(self.DBM) + self.CCM = ControlCenter(self.DBM,self) self.InstrM = InstructionManager(self) # 类对象渗透,要约束只读取信息 # 控制最大并发指令数量 self.max_thread_num = 2 self.task_id = 0 #任务id -- - self.do_sn = 0 #指令执行顺序号--暂时已执行完成顺序记录 self.workth_list = [] #线程句柄list - self.batch_num = 0 #一个批次的指令数量 - self.long_instr_num = 0 #耗时指令数量 - self.long_time_instr = ['nikto'] #耗时操作不计入批量执行的数量,不加不减 - self.node_queue = [] #2025-3-19修改为list,兼容pickle反序列化 + # self.long_instr_num = 0 #耗时指令数量 + # self.long_time_instr = ['nikto'] #耗时操作不计入批量执行的数量,不加不减 + self.node_queue = queue.Queue() self.lock = threading.Lock() #线程锁 self.node_num = 0 #在处理Node线程的处理 - self.do_sn_lock = threading.Lock() #指令执行顺序号锁 self.brun = True def res_in_quere(self,bres,instr,reslut,start_time,end_time,th_DBM,source_result,ext_params,work_node): @@ -60,6 +57,7 @@ class TaskManager: #结果入队列---2025-3-18所有的指令均需返回给LLM便于节点状态的更新,所以bres作用要调整。 res = {'执行指令':instr,'结果':reslut} + work_node.llm_type = 1 work_node.add_res(res) #入节点结果队列 def do_worker_th(self): @@ -67,40 +65,34 @@ class TaskManager: th_DBM = DBManager() th_DBM.connect() while self.brun: - work_node = None - with self.lock: - if self.node_queue: - # 获取测试节点 - work_node = self.node_queue.pop(0) - #测试时使用 + try: + work_node = self.node_queue.get(block=False) + # 测试时使用 + with self.lock: self.node_num += 1 - - if work_node: - #开始执行指令 - for instruction in work_node.instr_queue: - start_time = get_local_timestr() #指令执行开始时间 + # 开始执行指令 + while work_node.instr_queue: + #for instruction in work_node.instr_queue: #这里要重新调整#? + instruction = work_node.instr_queue.pop(0) + start_time = get_local_timestr() # 指令执行开始时间 bres, instr, reslut, source_result, ext_params = self.InstrM.execute_instruction(instruction) end_time = get_local_timestr() # 指令执行结束时间 self.res_in_quere(bres, instr, reslut, start_time, end_time, th_DBM, source_result, - ext_params,work_node) # 执行结果入队列 + ext_params, work_node) # 执行结果入队列 # #针对一个节点的指令执行完成后,提交LLM规划下一步操作 - # node_list = self.CCM.get_llm_instruction(work_node) - # if not node_list:#该节点测试完成----没有返回新指令 - # continue #取下一个节点 - # for node in node_list: #新指令入队列 - # self.node_queue.put(node) + #self.CCM.llm_quere.put(work_node) # 保存记录--测试时使用--后期增加人为停止测试时可以使用 with self.lock: self.node_num -= 1 - if self.node_num == 0 and len(self.node_queue) == 0: # + if self.node_num == 0 and self.node_queue.empty(): # with open("attack_tree", 'wb') as f: pickle.dump(TM.CCM.attack_tree, f) - else: #没有带处理的Node - time.sleep(10) - #函数结束,局部变量自动释放 + except queue.Empty: + self.logger.debug("暂无需要执行指令的节点!") + time.sleep(20) def start_task(self,target_name,target_in): #判断目标合法性 @@ -112,10 +104,7 @@ class TaskManager: #获取基本信息: 读取数据库或预生成指令,获取基本的已知信息 know_info = "无" #? #启动--初始化指令 - node_list = self.CCM.start_do(target,self.task_id) - with self.lock: - for node in node_list: - self.node_queue.append(node) + self.CCM.start_do(target,self.task_id) #创建工作线程----2025-3-18调整为一个节点一个线程, for i in range(self.max_thread_num): @@ -146,7 +135,13 @@ if __name__ == "__main__": strMsg = FM.read_file("test",1) test_type = 2 - if test_type == 1: + if test_type == 0: #新目标测试 + # 启动--初始化指令 + node_list = TM.CCM.start_do("192.168.204.137", 0) + #异步处理,需要等待线程结束了 + for th in TM.CCM.llmth_list: + th.join() + elif test_type == 1: #测试执行指令 with open("attack_tree", "rb") as f: TM.CCM.attack_tree = pickle.load(f) @@ -154,7 +149,7 @@ if __name__ == "__main__": nodes = TM.CCM.attack_tree.traverse_bfs() for node in nodes: if node.instr_queue: # list - TM.node_queue.append(node) + TM.node_queue.put(node) #创建线程执行指令 for i in range(TM.max_thread_num): @@ -169,22 +164,50 @@ if __name__ == "__main__": with open("attack_tree", "rb") as f: TM.CCM.attack_tree = pickle.load(f) #遍历node,查看有res的数据 + iput_index = 2 #0是根节点 + iput_max_num = 1 + iput_num = 0 nodes = TM.CCM.attack_tree.traverse_bfs() - for node in nodes: - if node.res_quere: #有结果需要提交LLM - node_list = TM.CCM.get_llm_instruction(node) #包括对节点操作 - # 暂存状态-- - with open("attack_tree", 'wb') as f: - pickle.dump(TM.CCM.attack_tree, f) - elif test_type == 0: #新目标测试 - # 启动--初始化指令 - node_list = TM.CCM.start_do("192.168.204.137", 0) - with TM.lock: - for node in node_list: - TM.node_queue.append(node) - #暂存状态-- - with open("attack_tree",'wb') as f: - pickle.dump(TM.CCM.attack_tree,f) + if iput_index != -1:#index 不为-1就是指定节点返回,人为保障不越界 + node = nodes[iput_index] + if node.res_quere: + TM.CCM.llm_quere.put(node) + else: + for node in nodes: + if node.res_quere: #有结果需要提交LLM + TM.CCM.llm_quere.put(node) + iput_num += 1 + if iput_max_num > 0: #0是有多少提交多少 + if iput_num == iput_max_num: + break + + #创建llm工作线程 + TM.CCM.brun = True + for i in range(TM.CCM.max_thread_num): + l_th = threading.Thread(target=TM.CCM.th_llm_worker()) + l_th.start() + TM.CCM.llmth_list.append(l_th) + # 等待线程结束 + for t in TM.CCM.llmth_list: + t.join() + elif test_type ==3: #执行指定指令 + instrlist=[''' +ftp -n 192.168.204.137 << EOF +user anonymous anonymous@example.com +ls +bye +EOF + ''',"wget -m --no-passive ftp://anonymous:anonymous@192.168.204.137/"] + for instr in instrlist: + start_time = get_local_timestr() # 指令执行开始时间 + bres, instr, reslut, source_result, ext_params = TM.InstrM.execute_instruction(instr) + end_time = get_local_timestr() # 指令执行结束时间 + # 入数据库 -- bres True和False 都入数据库2025-3-10---加node_path(2025-3-18)#? + if TM.DBM.ok: + TM.DBM.insetr_result(0, instr, reslut, 0, start_time, end_time, source_result, + ext_params, "独立命令执行") + else: + TM.logger.error("数据库连接失败!!") elif test_type == 4: #读取messages with open("attack_tree", "rb") as f: TM.CCM.attack_tree = pickle.load(f) diff --git a/config.yaml b/config.yaml index 4ef50a3..061f49e 100644 --- a/config.yaml +++ b/config.yaml @@ -15,6 +15,7 @@ mysql: #LLM-Type LLM_type: 1 #0-腾讯云,1-DS,2-GPT +LLM_max_chain_count: 10 #为了避免推理链过长,造成推理效果变差,应该控制一个推理链的长度上限 #用户初始密码 pw: zfkj_123!@# diff --git a/mycode/AttackMap.py b/mycode/AttackMap.py index ab2bf4d..8c0d893 100644 --- a/mycode/AttackMap.py +++ b/mycode/AttackMap.py @@ -52,25 +52,45 @@ class AttackTree: return node return None + def find_node_by_nodepath_parent(self,node_path,node): + node_names = node_path.split('->') + node_name = node_names[-1] + if node_name == node.name:#当前节点 + return node + else: + if node_names[-2] == node.name: #父节点是当前节点 + for child_node in node.children: + if child_node.name == node_name: + return child_node + #走到这说明没有匹配到-则新建一个节点 + newNode = TreeNode(node_name) + node.add_child(newNode) + return newNode + else: + return None #约束:不处理 + def find_node_by_nodepath(self,node_path): '''基于节点路径查找节点,只返回找到的第一个节点,若有节点名称路径重复的情况暂不处理''' current_node = self.root #从根节点开始 node_names = node_path.split('->') + layer_num = 0 for node_name in node_names: if node_name == "目标系统": + layer_num +=1 continue if node_name == current_node.name:#根节点开始 + layer_num += 1 continue else: bfound = False for child_node in current_node.children: if child_node.name == node_name: #约束同一父节点下的子节点名称不能相同 current_node = child_node + layer_num += 1 bfound = True break if not bfound: #如果遍历子节点都没有符合的,说明路径有问题的,不处理中间一段路径情况 return None - #找到的话,就开始匹配下一层 return current_node def find_nodes_by_status(self, status): @@ -116,11 +136,14 @@ class TreeNode: self.children = [] # 子节点列表 self.parent = None # 父节点引用 self.path = "" #当前节点的路径 - self.instr_queue = [] #queue.Queue() #针对当前节点的执行指令----重要约束:一个节点只能有一个线程在执行指令 - self.res_quere = [] #queue.Queue() #指令执行的结果,一批一批 + + self.messages = [] # 针对当前节点积累的messages -- 针对不同节点提交不同的messages + self.llm_type = 0 #llm提交类型 0--初始状态无任务状态,1--指令结果反馈,2--llm错误反馈 self.llm_sn = 0 #针对该节点llm提交次数 self.do_sn = 0 #针对该节点instr执行次数 - self.messages = [] #针对当前节点积累的messages -- 针对不同节点提交不同的messages + self.instr_queue = [] # queue.Queue() #针对当前节点的执行指令----重要约束:一个节点只能有一个线程在执行指令 + self.res_quere = [] # queue.Queue() #指令执行的结果,一批一批 + def add_child(self, child_node): child_node.parent = self diff --git a/mycode/ControlCenter.py b/mycode/ControlCenter.py index e761912..720ad8d 100644 --- a/mycode/ControlCenter.py +++ b/mycode/ControlCenter.py @@ -3,23 +3,33 @@ import json import re import queue +import time +import threading +import pickle from mycode.AttackMap import AttackTree from mycode.AttackMap import TreeNode from mycode.LLMManager import LLMManager from myutils.ConfigManager import myCongif #单一实例 from myutils.MyLogger_logger import LogHandler +from mycode.DBManager import DBManager class ControlCenter: - def __init__(self,DBM): + def __init__(self,DBM,TM): self.logger = LogHandler().get_logger("ControlCenter") self.task_id = None self.target = None self.attack_tree = None self.DBM = DBM + self.TM = TM #LLM对象 self.LLM = LLMManager(myCongif.get_data("LLM_type")) + self.llm_quere = queue.Queue() #提交LLM指令的队列---- + self.max_thread_num = 1 # 控制最大并发指令数量 + self.llmth_list = [] #llm线程list + self.brun = False def __del__(self): + self.brun =False self.task_id = None self.target = None self.attack_tree = None @@ -32,91 +42,281 @@ class ControlCenter: def get_user_init_info(self): '''开始任务初,获取用户设定的基础信息''' # ?包括是否对目标进行初始化的信息收集 - return "无" + return {"已知信息":"无"} def start_do(self,target,task_id): '''一个新任务的开始''' self.task_id = task_id self.target = target - #创建测试树 + #创建/初始化测试树 if self.attack_tree: self.attack_tree = None #释放 root_node = TreeNode(target) self.attack_tree = AttackTree(root_node)#创建测试树,同时更新根节点相关内容 - #获取初始指令 + #初始化启动提示信息 know_info = self.get_user_init_info() - prompt = self.LLM.build_initial_prompt(target,know_info,root_node) - node_cmds, commands = self.LLM.get_llm_instruction(prompt,self.DBM,root_node) - # 更新tree - self.tree_manager(node_cmds,root_node) - # 分析指令入对应节点 - node_list = self.instr_in_node(commands,root_node) - return node_list + self.LLM.build_initial_prompt(root_node) + #提交到待处理队列 + self.put_one_llm_work(know_info,root_node,1) + + # 启动LLM请求提交线程 + self.brun = True + #启动线程 + ineed_create_num = self.max_thread_num - len(self.llmth_list) #正常应该就是0或者是max_num + for i in range(ineed_create_num): + l_th = threading.Thread(target=self.th_llm_worker) + l_th.start() + self.llmth_list.append(l_th) + + def put_one_llm_work(self,str_res,node,llm_type): + '''提交任务到llm_quere''' + if llm_type == 0: + self.logger.debug("llm_type不能设置为0") + return + node.llm_type = llm_type #目前处理逻辑中一个node应该只会在一个queue中,且只会有一次记录。所以llm_type复用 + node.res_quere.append(str_res) + #提交到待处理队列 + self.llm_quere.put(node) + + def get_one_llm_work(self,node): + '''获取该节点的llm提交数据,会清空type和res_quere''' + llm_type = node.llm_type + node.llm_type = 0 #回归0 + res_list = node.res_quere[:] #复制独立副本 + node.res_quere.clear() #清空待处理数据 + return llm_type,res_list + + #llm请求提交线程 + def th_llm_worker(self):#LLM没有修改的全局变量,应该可以共用一个client + ''' + 几个规则--TM的work线程同 + 1.线程获取一个节点后,其他线程不能再获取这个节点(遇到被执行的节点,直接放弃执行)--- 加了没办法保存中间结果进行测试 + 2.llm返回的指令,只可能是该节点,或者是子节点的,不符合这条规则的都不处理,避免llm处理混乱。 + :return: + ''' + # 线程的dbm需要一个线程一个 + th_DBM = DBManager() + th_DBM.connect() + while self.brun: + try: + #节点锁 + node = self.llm_quere.get(block=False) + self.get_llm_instruction(node,th_DBM) + #释放锁 + + # 暂存状态--测试时使用 + with open("attack_tree", 'wb') as f: + pickle.dump(self.attack_tree, f) + + except queue.Empty: + self.logger.debug("llm队列中暂时无新的提交任务!") + time.sleep(30) #约束1:一个节点只能同时提交一次,未测试的节点不要重复 - def get_llm_instruction(self,node): - #拼接结果字符串--由于测试需要queue改成了list 2025-3-19 - # json_strings = [] - # if node.res_quere: - # for item in node.res_quere: - # json_str = json.dumps(item, ensure_ascii=False) - # json_strings.append(json_str) - # res_str = ','.join(json_strings) - res_str = json.dumps(node.res_quere,ensure_ascii=False) - #构造本次提交的prompt + def get_llm_instruction(self,node,DBM): + user_Prompt = "" + ext_Prompt = "" + llm_type, res_list = self.get_one_llm_work(node) + try: + res_str = json.dumps(res_list, ensure_ascii=False) + except TypeError as e: + self.logger.error(f"{res_list}序列化失败:{e},需要自查程序添加代码!") + return # 直接返回 + if llm_type == 0: + self.logger.error("这里type不应该为0,请自查程序逻辑!") + return + #2025-3-20增加了llm_type user_Prompt = f''' - 当前分支路径:{node.path} - 当前节点信息: - - 节点名称:{node.name} - - 节点状态:{node.status} - - 漏洞类型:{node.vul_type} - 上一步结果:{res_str} - 任务:生成下一步渗透测试指令或结束该节点渗透测试。 - ''' - node_cmds,commands = self.LLM.get_llm_instruction(user_Prompt,self.DBM,node) - #更新tree - self.tree_manager(node_cmds,node) - #分析指令入对应节点 - node_list = self.instr_in_node(commands,node) - return node_list +当前分支路径:{node.path} +当前节点信息: +- 节点名称:{node.name} +- 节点状态:{node.status} +- 漏洞类型:{node.vul_type} +''' + if llm_type == 1: #提交指令执行结果 --- 正常提交 + # 构造本次提交的prompt + ext_Prompt = f''' +上一步结果:{res_str} +任务:生成下一步渗透测试指令或判断是否完成该节点测试。 +''' + elif llm_type ==2: #llm返回的指令存在问题,需要再次请求返回 + ext_Prompt = f''' +反馈类型:节点指令格式错误 +错误信息:{res_str} +任务:请按格式要求重新生成该节点上一次返回中生成的所有指令。 +''' + elif llm_type ==3: #已生成节点,但未生成测试指令 + ext_Prompt = f''' +反馈类型:需要继续补充信息 +缺失信息:{res_str} +任务: +1.请生成这些节点的测试指令; +2.这些节点的父节点为当前节点,请正确生成这些节点的节点路径; +3.若还有节点未能生成测试指令,必须返回未生成指令的节点列表。 +''' + elif llm_type ==4: #未生成节点列表 + ext_Prompt = f''' +反馈类型:需要继续补充信息 +缺失信息:{res_str} +任务: +1.请生成这些节点的新增节点指令,并生成对应的测试指令; +2.这些节点的父节点为当前节点,请正确生成这些节点的节点路径; +3.若节点未能全部新增,必须返回未新增的节点列表 +4.若有未生成指令的节点,必须返回未生成指令的节点列表。 +''' + elif llm_type ==5: + ext_Prompt = f''' +反馈类型:测试指令格式错误 +错误信息:{res_str} +任务:请根据格式要求,重新生成该测试指令。 +''' + else: + self.logger.debug("意外的类型参数") + return + if not ext_Prompt: + self.logger.error("未成功获取本次提交的user_prompt") + return + #提交LLM + user_Prompt = user_Prompt + ext_Prompt + node_cmds, commands = self.LLM.get_llm_instruction(user_Prompt,DBM, node) # message要更新 + ''' + 对于LLM返回的错误处理机制 + 1.验证节点是否都有测试指令返回 + 2.LLM的回复开始反复时(有点难判断) + ''' + # 更新tree + bok = self.tree_manager(node_cmds, node,commands) + # 分析指令入对应节点 + if bok: #节点指令若存在错误,测试指令都不处理,需要LLM重新生成 + node_list = self.instr_in_node(commands, node) + # 插入TM的node_queue中,交TM线程处理---除了LLM在不同的请求返回针对同一节点的测试指令,正常业务不会产生两次进队列 + for node in node_list: + self.TM.node_queue.put(node) + + def verify_node_cmds(self,node_cmds,node): + ''' + 验证节点指令的合规性,持续维护 + :param node_cmds: + :param node: + :return: Flase 存在问题, True 合规 + ''' + strerror = "" + for node_json in node_cmds: + if "action" not in node_json: + self.logger.error(f"缺少action节点:{node_json}") + strerror = {"节点指令错误":f"{node_json}缺少action节点,不符合格式要求!"} + break + action = node_json["action"] + if action == "add_node": + if "parent" not in node_json or "status" not in node_json or "nodes" not in node_json: + strerror = {"节点指令错误": f"{node_json}不符合格式要求,缺少节点!"} + break + elif action == "update_status": + if "status" not in node_json or "node" not in node_json: + strerror = {"节点指令错误": f"{node_json}不符合格式要求,缺少节点!"} + break + elif action =="no_instruction" or action=="no_create": + if "nodes" not in node_json: + strerror = {"节点指令错误": f"{node_json}不符合格式要求,缺少节点!"} + break + else: + strerror = {"节点指令错误": f"{node_json}不可识别的action值!"} + break + if not strerror: + return True + #提交一个错误反馈任务 + self.put_one_llm_work(strerror,node,2) + return False - def tree_manager(self,node_cmds,node): + def tree_manager(self,node_cmds,node,commands): '''更新渗透测试树 node_cmds是json-list + 2025-03-22添加commands参数,用于处理LLM对同一个节点返回了测试指令,但还返回了no_instruction节点指令 ''' - if not node_cmds or len(node_cmds)==0: - return + if not node_cmds: # or len(node_cmds)==0: 正常not判断就可以有没有节点指令 + return True + #对节点指令进行校验 + if not self.verify_node_cmds(node_cmds,node): + return False #节点指令存在问题,终止执行 + #执行节点操作 for node_json in node_cmds: - if node_json["action"] == "add_node": #新增节点 + action = node_json["action"] + if action == "add_node": #新增节点 parent_node_name = node_json["parent"] - node_name = node_json["node"] - status = node_json["status"] - #新增节点原则上应该都是当前节点增加子节点 + # 新增节点原则上应该都是当前节点增加子节点 if node.name == parent_node_name: - new_node = TreeNode(node_name,status) - node.add_child(new_node) #message的传递待验证 + status = node_json["status"] + node_names = node_json["nodes"].split(',') + for node_name in node_names: + #判重 + bfind = False + for node_child in node.children: + if node_child.name == node_name: + bfind = True + break + if not bfind: + #添加节点 + new_node = TreeNode(node_name,status) + node.add_child(new_node) #message的传递待验证 else: - self.logger.error("添加子节点时,遇到父节点名称不一致的,需要介入!!") - elif node_json["action"] == "update_status": + self.logger.error(f"添加子节点时,遇到父节点名称不一致的,需要介入!!{node_json}") #丢弃该节点 + elif action == "update_status": node_name = node_json["node"] status = node_json["status"] - vul_type = node_json["vulnerability"] + vul_type = "未发现" + if "vulnerability" in node_json: + vul_type = json.dumps(node_json["vulnerability"]) if node.name == node_name: node.status = status node.vul_type = vul_type else: - self.logger.error("遇到不是修改本节点状态的,需要介入!!") + self.logger.error(f"遇到不是修改本节点状态的,需要介入!!{node_json}") + elif action == "no_instruction": + #返回的未生成指令的数据进行校验:1.要有数据;2.节点不是当前节点就是子节点 + nodes = [] + node_names = node_json["nodes"].split(',') + for node_name in node_names: + #先判断是否在测试指令中,若在则不提交llm任务,只能接受在一次返回中同一节点有多条测试指令,不允许分次返回 + bcommand = False + for com in commands: + if node_name in com: + bcommand = True + break + if bcommand: #如果存在测试指令,则不把该节点放入补充信息llm任务 + continue + if node_name == node.name: + nodes.append(node_name) + else: + for child_node in node.children: + if child_node.name == node_name: + nodes.append(node_name) + break + if nodes: #找到对应的节点才返回 + str_nodes = ",".join(nodes) + str_add = {"已新增但未生成测试指令的节点":str_nodes} + # 提交一个错误反馈任务--但继续后续工作 + self.put_one_llm_work(str_add, node, 3) + self.logger.debug(f"已新增但未生成指令的节点有:{nodes}") + elif action == "no_create": + nodes = node_json["nodes"] + if nodes: + str_add = {"未新增的节点": nodes} + # 提交一个错误反馈任务--但继续后续工作 + self.put_one_llm_work(str_add, node, 4) + self.logger.debug(f"未新增的节点有:{nodes}") else: - self.logger.error("node更新JSON遇到异常参数!") + self.logger.error("****不应该执行到这!程序逻辑存在问题!") + return True def instr_in_node(self,commands,node): - node_list = [] + node_list = [] #一次返回的测试指令 for command in commands: # 使用正则匹配方括号中的node_path(非贪婪模式) match = re.search(r'\[(.*?)\]', command) if match: node_path = match.group(1) - find_node = self.attack_tree.find_node_by_nodepath(node_path) + #'''强制约束,不是本节点或者是子节点的指令不处理''' + find_node = self.attack_tree.find_node_by_nodepath_parent(node_path,node) if find_node: instruction = re.sub(r'\[.*?\]', "", command,count=1,flags=re.DOTALL) find_node.instr_queue.append(instruction) @@ -124,9 +324,13 @@ class ControlCenter: if find_node not in node_list: node_list.append(find_node) else: - self.logger.error(f"基于节点路径没有找到对应的节点{node_path}") + self.logger.error(f"基于节点路径没有找到对应的节点{node_path},父节点都没找到!")#丢弃该指令 else: - self.logger.error(f"得到的指令格式不符合规范:{command}") + self.logger.error(f"得到的指令格式不符合规范:{command}")#丢弃该指令--- + #这里对于丢弃指令,有几种方案: + # 1.直接丢弃不处理,但需要考虑会不会产生节点缺失指令的问题,需要有机制验证节点;------ 需要有个独立线程,节点要加锁--首选待改进方案 + # 2.入当前节点的res_queue,但有可能当前节点没有其他指令,不会触发提交,另外就算提交,会不会产生预设范围外的返回,不确定; + # 3.独立队列处理 return node_list #待修改 @@ -170,4 +374,8 @@ class ControlCenter: #清空数据 self.task_id = None self.target = None - self.attack_tree = None \ No newline at end of file + self.attack_tree = None + #停止llm处理线程 + self.brun =False + for th in self.llmth_list: + th.jion() \ No newline at end of file diff --git a/mycode/DBManager.py b/mycode/DBManager.py index 85748b1..c25031e 100644 --- a/mycode/DBManager.py +++ b/mycode/DBManager.py @@ -148,6 +148,8 @@ class DBManager: #指令执行结果入库 def insetr_result(self,task_id,instruction,result,do_sn,start_time,end_time,source_result,ext_params,node_path): + str_result = "" + str_source_result = "" # 统一将 result 转为 JSON 字符串(无论原始类型) try: if not isinstance(result, str): @@ -157,7 +159,17 @@ class DBManager: json.loads(result) str_result = result except (TypeError, json.JSONDecodeError): - str_result = json.dumps(str(result)) # 兜底处理非 JSON 字符串 + str_result = json.dumps(str(result),ensure_ascii=False) # 兜底处理非 JSON 字符串 + + try: + if not isinstance(source_result, str): + str_source_result = json.dumps(source_result, ensure_ascii=False) + else: + # 如果是字符串,先验证是否为合法 JSON(可选) + json.loads(source_result) + str_source_result = source_result + except (TypeError, json.JSONDecodeError): + str_source_result = json.dumps(str(source_result),ensure_ascii=False) # 兜底处理非 JSON 字符串 # 使用参数化查询 sql = """ @@ -176,7 +188,7 @@ class DBManager: str_content = "" try: if not isinstance(reasoning_content, str): - str_reasoning = json.dumps(reasoning_content, ensure_ascii=False) + str_reasoning = json.dumps(reasoning_content) #,ensure_ascii=False else: # 如果是字符串,先验证是否为合法 JSON(可选) json.loads(reasoning_content) @@ -186,7 +198,7 @@ class DBManager: try: if not isinstance(content, str): - str_content = json.dumps(content, ensure_ascii=False) + str_content = json.dumps(content) else: # 如果是字符串,先验证是否为合法 JSON(可选) json.loads(content) diff --git a/mycode/LLMManager.py b/mycode/LLMManager.py index e6b7884..b3818f7 100644 --- a/mycode/LLMManager.py +++ b/mycode/LLMManager.py @@ -17,12 +17,10 @@ class LLMManager: self.api_key = None self.api_url = None self.task_id =0 #一个任务一个id - self.llm_sn = 0 # llm执行序列号,--一任务一序列 - self.llm_sn_lock = threading.Lock() # + # self.llm_sn = 0 # llm执行序列号,--一任务一序列 + # self.llm_sn_lock = threading.Lock() # #temperature设置 #DS------代码生成/数学解题:0.0 -- 数据抽取/分析:1.0 -- 通用对话:1.3 -- 翻译:1.3 -- 创意类写作:1.5 - #腾讯云--- - self.temperature = 1.0 if illm_type == 0: #腾讯云 self.api_key = "fGBYaQLHykBOQsFwVrQdIFTsYr8YDtDVDQWFU41mFsmvfNPc" self.api_url = "" @@ -38,12 +36,6 @@ class LLMManager: self.model = "" self.client = OpenAI() - self.messages = [] - # 初始化阶段和已知信息 - current_stage = "信息收集" - known_info = {"url": "www.test.com"} - results = [] # 存储所有任务结果,用于生成报告 - #******测试使用,设置slef.message的值 def test_old_message(self,strMessage): try: @@ -53,8 +45,8 @@ class LLMManager: except Exception as e: print(f"错误: {str(e)}") - # 构建初始提示 初始化messages - def build_initial_prompt(self,target,know_info="无",node=None): + # 初始化messages + def build_initial_prompt(self,node): if not node: return None '''暂时未添加到提示词 @@ -65,21 +57,24 @@ class LLMManager: #根节点初始化message node.messages = [{"role": "system", "content":''' -你是一位渗透测试专家,负责动态控制整个渗透测试过程,根据当前测试状态和返回结果,决定下一步测试指令,推动测试前进,直至完成渗透测试。本地程序会执行你的指令进并返回结果。 +你是一位渗透测试专家,基于你的专业知识和经验,负责动态控制整个渗透测试过程,根据当前测试状态和返回结果,决定下一步测试指令,推动测试前进,直至完成渗透测试。本地程序会执行你的指令进并返回结果。 **总体要求** 1.以目标系统IP为根节点,每个渗透测试点(如端口、服务、漏洞点)作为子节点,形成树型结构(测试树); -2.测试树整体数据由本地程序存储,你只需要关注当前节点的测试推进、状态更新(未完成/已完成)及子节点新增; +2.测试树整体数据由本地程序存储,你只需要关注当前节点的测试推进、状态更新(未完成/已完成)及完整新增子节点; 3.返回两类指令:节点指令和测试指令,以空行间隔,不要包含注释和说明; -4.若无节点修改或新增,节点指令可以为空,但测试指令必须对应已有节点。 +4.若一次性新增的节点过多,无法为每个节点都匹配测试指令,请优先保障新增节点的完整性,若有未生成指令的节点,必须返回已新增但未生成指令的节点列表。如果节点无法全部新增,必须返回未新增的节点列表; +5.若无节点修改,新增,未生成指令等数据,节点指令可以为空,但测试指令必须对应已有节点; **决策流程** 1. 若当前节点是IP且未进行端口扫描,则执行端口扫描; -2. 若端口扫描发现开放端口,为每个端口新增节点并提供测试指令; +2. 若端口扫描发现开放端口,对可能存在中高危以上风险的端口新增节点并提供测试指令; 3. 若当前节点是端口且未进行服务扫描,则执行服务扫描; 4. 若服务扫描发现服务版本或漏洞,则新增漏洞测试节点并提供测试指令; -5. 若漏洞利用成功,则根据结果决定是否新增子节点并提供测试指令; -6. 若节点测试无新信息,则更新状态为“已完成”。 +5. 若漏洞验证成功,则根据结果决定是否需要进一步测试,若需要进一步测试,则新增子节点并提供测试指令; +6. 若节点测试无新信息和测试指令,则更新状态为“已完成”。 **节点指令格式** -- 新增节点:{\"action\":\"add_node\", \"parent\": \"80端口\", \"node\": \"http://192.168.1.100/index.php?id=1\", \"status\": \"未完成\"}; +- 新增节点:{\"action\":\"add_node\", \"parent\": \"80端口\", \"nodes\": \"21端口,80端口,445端口,3306端口,1000端口,http://192.168.1.100/index.php?id=1\", \"status\": \"未完成\"}; +- 已新增但未生成指令的节点列表:{\"action\": \"no_instruction\", \"nodes\": \"3306端口,1000端口\"}; +- 未新增的节点列表:{\"action\": \"no_create\", \"nodes\": \"8080端口,8081端口,9000端口\"}; - 更新节点未发现漏洞:{\"action\": \"update_status\", \"node\": \"21端口\", \"status\": \"已完成\"}; - 更新节点发现漏洞:{\"action\": \"update_status\", \"node\": \"21端口\", \"status\": \"已完成\",\"vulnerability\": {\"name\":\"ftp匿名登录\",\"risk\":\"高\"}}; **测试指令格式** @@ -104,16 +99,6 @@ def dynamic_fun(): return ("failure", str(e)) ``` '''}] # 一个messages - user_Prompt = f''' -当前分支路径:目标系统->{target} -当前节点信息: -- 节点名称:{target} -- 节点状态:未完成 -- 漏洞类型:未发现 -上一步结果:{know_info} -任务:生成下一步渗透测试指令或结束该节点的渗透测试(修改节点状态为:已完成)。 - ''' - return user_Prompt def init_data(self,task_id=0): #初始化LLM数据 @@ -165,12 +150,11 @@ def dynamic_fun(): if not bres: self.logger.error(f"{node.name}-llm入库失败!") - #需要对指令进行提取 - node_cmds,commands = self.fetch_instruction(content,node) - + #按格式规定对指令进行提取 + node_cmds,commands = self.fetch_instruction(content) return node_cmds,commands - def fetch_instruction(self,response_text,node): + def fetch_instruction(self,response_text): ''' *****该函数很重要,需要一定的容错能力,解析LLM返回内容***** 处理边界:只格式化分析LLM返回内容,指令和节点操作等交其他模块。 @@ -209,29 +193,23 @@ def dynamic_fun(): continue if "PYTHON_BLOCK" in part: # 还原 Python 代码块 - commands.append(f"python_code {python_blocks[python_index]}") + commands.append(f"python-code {python_blocks[python_index]}") python_index += 1 elif "SHELL_BLOCK" in part: commands.append(shell_blocks[shell_index]) shell_index +=1 - else: - #其他的认为是节点操作指令--指令格式还存在不确定性,需要正则匹配 - pattern = re.compile(r'\{.*?\}', re.DOTALL) + else:#其他的认为是节点操作指令--指令格式还存在不确定性,需要正则匹配,要求是JSON + pattern = re.compile(r'\{.*\}', re.DOTALL) #贪婪模式会匹配到最后一个},能使用嵌套的JSON # 遍历所有匹配到的 JSON 结构 - for match in pattern.findall(part): - try: - node_cmds.append(json.loads(match)) # 解析 JSON 并添加到列表 - except json.JSONDecodeError as e:#解析不了的不入队列 - print(f"JSON 解析错误: {e}") + strlines = part.strip('\n') #按行拆分,避免贪婪模式下,匹配到多行的最后一个} + for strline in strlines: + for match in pattern.findall(strline): #正常只能有一个 + try: + node_cmds.append(json.loads(match)) # 解析 JSON 并添加到列表 + except json.JSONDecodeError as e:#解析不了的不入队列 + self.logger.error(f"LLM-{part}-JSON 解析错误: {e}") #这是需不需要人为介入? return node_cmds,commands - def llm_error_feedback(self,response_text,node): - '''验证llm返回--错误反馈机制--兼容LLM的不稳定性,需要持续补充 - 1.验证节点是否都有测试指令返回 - ''' - pass - - def test_llm(self): with open("../test", "r", encoding="utf-8") as f: messages = json.load(f) diff --git a/test.py b/test.py index e9cc61e..bc7d5c7 100644 --- a/test.py +++ b/test.py @@ -1,86 +1,72 @@ -import openai +import re import subprocess - -# 设置OpenAI API密钥 -openai.api_key = "fGBYaQLHykBOQsFwVrQdIFTsYr8YDtDVDQWFU41mFsmvfNPc" - -# 初始化阶段和已知信息 -current_stage = "信息收集" -known_info = {"url": "www.test.com"} -results = [] # 存储所有任务结果,用于生成报告 - - -# 构建初始提示 -def build_initial_prompt(stage, known_info): - return f"你是一个渗透测试专家,正在对网站{known_info['url']}进行渗透测试。当前阶段是{stage},已知信息是{known_info}。请生成下一步的指令。" - - -# 构建反馈提示 -def build_feedback_prompt(instruction, result): - return f"执行指令“{instruction}”的结果是“{result}”。请根据这个结果生成下一步的指令。" - - -# 调用LLM生成指令 -def get_llm_instruction(prompt): - response = openai.Completion.create( - engine="deepseek-r1", # 替换为你的模型 - prompt=prompt, - max_tokens=100 - ) - return response.choices[0].text.strip() - - -# 执行指令 -def execute_instruction(instruction): - # 示例:支持Nmap和dirb指令 - if "nmap" in instruction: - try: - result = subprocess.run(instruction, shell=True, capture_output=True, text=True) - return result.stdout if result.stdout else result.stderr - except Exception as e: - return f"执行失败:{str(e)}" - elif "dirb" in instruction: - try: - result = subprocess.run(instruction, shell=True, capture_output=True, text=True) - return result.stdout if result.stdout else result.stderr - except Exception as e: - return f"执行失败:{str(e)}" - else: - return "未知指令,请重新生成。" - - -# 主循环 -while current_stage != "报告生成": - # 构建提示并获取指令 - if not results: # 第一次执行 - prompt = build_initial_prompt(current_stage, known_info) - else: # 反馈结果 - prompt = build_feedback_prompt(last_instruction, last_result) - - instruction = get_llm_instruction(prompt) - print(f"生成的指令:{instruction}") - - # 执行指令 - task_result = execute_instruction(instruction) - print(f"任务结果:{task_result}") - results.append({"instruction": instruction, "result": task_result}) - - # 更新变量 - last_instruction = instruction - last_result = task_result - - # 示例阶段更新逻辑(可根据实际结果调整) - if current_stage == "信息收集" and "开放端口" in task_result: - current_stage = "漏洞扫描" - known_info["ports"] = "80, 443" # 示例更新已知信息 - elif current_stage == "漏洞扫描" and "扫描完成" in task_result: - current_stage = "漏洞利用" - # 添加更多阶段切换逻辑 - -# 生成测试报告 -report = "渗透测试报告\n" -report += f"目标网站:{known_info['url']}\n" -report += "测试结果:\n" -for res in results: - report += f"指令:{res['instruction']}\n结果:{res['result']}\n\n" -print(report) \ No newline at end of file +import tempfile +import os +import pexpect + + +def do_worker(str_instruction): + try: + # 使用 subprocess 执行 shell 命令 + result = subprocess.run(str_instruction, shell=True, text=True,capture_output=True) + + return { + "returncode": result.returncode, + "stdout": result.stdout, + "stderr": result.stderr + } + except Exception as e: + return {"error": str(e)} + +def do_worker_ftp_pexpect(str_instruction): + # 解析指令 + lines = str_instruction.strip().split('\n') + cmd_line = lines[0].split('<<')[0].strip() # 提取 "ftp -n 192.168.204.137" + inputs = [line.strip() for line in lines[1:] if line.strip() != 'EOF'] + + # 使用 pexpect 执行命令 + child = pexpect.spawn(cmd_line) + for input_line in inputs: + child.expect('.*') # 等待任意提示 + child.sendline(input_line) # 发送输入 + child.expect(pexpect.EOF) # 等待命令结束 + output = child.before.decode() # 获取输出 + child.close() + return output + +def do_worker_ftp_script(str_instruction): + # 创建临时文件保存输出 + with tempfile.NamedTemporaryFile(delete=False) as tmpfile: + output_file = tmpfile.name + + # 构建并执行 script 命令 + script_cmd = f"script -c '{str_instruction}' {output_file}" + result = subprocess.run(script_cmd, shell=True, text=True) + + # 读取输出文件内容 + with open(output_file, 'r') as f: + output = f.read() + + # 删除临时文件 + os.remove(output_file) + return output + +if __name__ == "__main__": + # 示例使用 + str_instruction = """ +ftp -n 192.168.204.137 << EOF +user anonymous anonymous@example.com +ls +bye +EOF + """ + output = do_worker(str_instruction) + print(f"*****\n{output}\n*****") + + output = do_worker_ftp_script(str_instruction) + lines = output.splitlines() + # 跳过第一行(Script started)和最后一行(Script done) + ftp_output = lines[1:-1] + strout = '\n'.join(ftp_output) + print("111111111") + print(strout) \ No newline at end of file diff --git a/tools/CurlTool.py b/tools/CurlTool.py index f2e39d0..291802d 100644 --- a/tools/CurlTool.py +++ b/tools/CurlTool.py @@ -234,7 +234,7 @@ class CurlTool(ToolBase): # 可选:保留完整的 verbose 信息以便后续分析 #info['verbose'] = stderr #转换成字符串 - result = json.dumps(info) + result = json.dumps(info,ensure_ascii=False) return result def analyze_result(self, result,instruction,stderr,stdout): diff --git a/tools/FtpTool.py b/tools/FtpTool.py index 0c8dd6f..fb0bea5 100644 --- a/tools/FtpTool.py +++ b/tools/FtpTool.py @@ -1,7 +1,10 @@ #Ftp import ftplib import re -import ipaddress +import os +import ipaddress +import subprocess +import tempfile from tools.ToolBase import ToolBase class FtpTool(ToolBase): @@ -41,16 +44,74 @@ class FtpTool(ToolBase): return res def validate_instruction(self, instruction): - #ftp暂时不做指令过滤和变化,只执行匿名攻击 - timeout = 0 - #lines = instruction.splitlines() - # if(len(lines) > 1): - # modified_code = "\n".join(lines[1:]) - # else: - # modified_code = "" - #print(modified_code) - modified_code = "ftp匿名登录测试" - return modified_code,timeout + timeout = 30 + #modified_code = "ftp匿名登录测试" + + return instruction,timeout + + def do_worker_subprocess(self,str_instruction,timeout,ext_params): + output = "" + stdout = "" + stderr = "" + try: + if timeout == 0: + result = subprocess.run(str_instruction, shell=True, capture_output=True, text=True) + elif timeout > 0: + result = subprocess.run(str_instruction, shell=True, capture_output=True, text=True, timeout=timeout) + else: + print("timeout参数错误") + stderr = result.stderr + stdout = result.stdout + except subprocess.TimeoutExpired as e: + stdout = e.stdout if e.stdout is not None else "" + stderr = e.stderr if e.stderr is not None else "" + ext_params.is_user = True # 对于超时的也需要人工进行确认,是否是预期的超时 + except Exception as e: + ext_params.is_user = True + return False, str_instruction, f"执行失败:{str(e)}", "", ext_params # 执行失败,提交给人工确认指令的正确性 + + output = stdout + if stderr: + output += stderr + if isinstance(output, bytes): # 若是bytes则转成str + output = output.decode('utf-8', errors='ignore') + return output + + def do_worker_script(self,str_instruction,timeout,ext_params): + # 创建临时文件保存输出 + with tempfile.NamedTemporaryFile(delete=False) as tmpfile: + output_file = tmpfile.name + + # 构建并执行 script 命令 + script_cmd = f"script -c '{str_instruction}' {output_file}" + try: + result = subprocess.run(script_cmd, shell=True, text=True,timeout=timeout) + # 读取输出文件内容 + with open(output_file, 'r') as f: + output = f.read() + lines = output.splitlines() + # 跳过第一行(Script started)和最后一行(Script done) + ftp_output = lines[1:-1] + output = '\n'.join(ftp_output) + except subprocess.TimeoutExpired: + output = "命令超时返回" + try: + with open(output_file, 'r') as f: + partial_output = f.read() + if partial_output: + output += f"\n部分输出:\n{partial_output}" + except FileNotFoundError: + pass # 文件可能未创建 + except subprocess.CalledProcessError as e: + output = f"错误: {e}" + finally: + # 删除临时文件 + try: + os.remove(output_file) + except FileNotFoundError: + pass # 文件可能未创建 + return output + #对于非sh命令调用的工具,自己实现命令执行的内容 def execute_instruction(self, instruction_old): @@ -61,17 +122,24 @@ class FtpTool(ToolBase): return False, instruction_old, "该指令暂不执行!","",ext_params # 过滤修改后的指令是否需要判重?同样指令再执行结果一致?待定---#? - # 第二步:执行指令 - #target = instruction_old.split()[1] #有赌的成分! - target = "" - for str in instruction_old.split(): - if self.is_ip_domain(str): - target = str - - if target: - output = self.test_anonymous_ftp_login(target) - else: - output = f"ftp指令未兼容{instruction_old}" + # 第二步:执行指令---需要对ftp指令进行区分判断 + pattern = re.compile(r'ftp\s+-n\s+\S+\s+<< EOF') + match = pattern.search(instruction) + if bool(match): #如果是 ftp -n 192.168.204.137 <0: result = subprocess.run(instruction, shell=True, capture_output=True, text=True, timeout=timeout) else: - print("timeout参数错误") + print("timeout参数错误,需要自查程序逻辑!") #output = result.stdout if result.stdout else result.stderr #-o 的命令需要处理 parsed_arg = self.parse_sublist3r_command(instruction) @@ -114,9 +114,11 @@ class ToolBase(abc.ABC): output = stdout if stderr: output += stderr + if isinstance(output,bytes):#若是bytes则转成str + output = output.decode('utf-8', errors='ignore') + analysis = self.analyze_result(output,instruction,stderr,stdout) - #指令和结果入数据库 - #? + if not analysis: #analysis为“” 不提交LLM ext_params.is_user = True return False,instruction,analysis,output,ext_params diff --git a/tools/WgetTool.py b/tools/WgetTool.py new file mode 100644 index 0000000..a0aa025 --- /dev/null +++ b/tools/WgetTool.py @@ -0,0 +1,11 @@ +from tools.ToolBase import ToolBase + +class WgetTool(ToolBase): + def validate_instruction(self, instruction): + #指令过滤 + timeout = 0 + return instruction,timeout + + def analyze_result(self, result,instruction,stderr,stdout): + #指令结果分析 + return result \ No newline at end of file