diff --git a/TaskManager.py b/TaskManager.py index 48be73a..a668d6e 100644 --- a/TaskManager.py +++ b/TaskManager.py @@ -28,7 +28,7 @@ class TaskManager: self.CCM = ControlCenter(self.DBM,self) self.InstrM = InstructionManager(self) # 类对象渗透,要约束只读取信息 # 控制最大并发指令数量 - self.max_thread_num = 2 + self.max_thread_num = 6 self.task_id = 0 #任务id -- self.workth_list = [] #线程句柄list # self.long_instr_num = 0 #耗时指令数量 @@ -38,6 +38,7 @@ class TaskManager: self.lock = threading.Lock() #线程锁 self.node_num = 0 #在处理Node线程的处理 self.brun = True + self.cookie = "" #cookie参数 def res_in_quere(self,bres,instr,reslut,start_time,end_time,th_DBM,source_result,ext_params,work_node): ''' @@ -57,8 +58,9 @@ class TaskManager: #结果入队列---2025-3-18所有的指令均需返回给LLM便于节点状态的更新,所以bres作用要调整。 res = {'执行指令':instr,'结果':reslut} + str_res = json.dumps(res,ensure_ascii=False) #直接字符串组合也可以-待验证 work_node.llm_type = 1 - work_node.add_res(res) #入节点结果队列 + work_node.add_res(str_res) #入节点结果队列 def do_worker_th(self): #线程的dbm需要一个线程一个 @@ -87,14 +89,21 @@ class TaskManager: with self.lock: self.node_num -= 1 if self.node_num == 0 and self.node_queue.empty(): # + self.logger.debug("此批次指令执行完成!") with open("attack_tree", 'wb') as f: - pickle.dump(TM.CCM.attack_tree, f) + pickle.dump(self.CCM.attack_tree, f) except queue.Empty: self.logger.debug("暂无需要执行指令的节点!") time.sleep(20) def start_task(self,target_name,target_in): + ''' + + :param target_name: 任务目标名字 + :param target_in: 任务目标访问地址 + :return: + ''' #判断目标合法性 bok,target,type = self.TargetM.validate_and_extract(target_in) if bok: @@ -134,8 +143,10 @@ if __name__ == "__main__": current_path = os.path.dirname(os.path.realpath(__file__)) strMsg = FM.read_file("test",1) - test_type = 5 - iput_index = 6 # 0是根节点 + test_type = 1 + instr_index = 19 + iput_index = -1 # 0是根节点 + indexs = [] if test_type == 0: #新目标测试 # 启动--初始化指令 node_list = TM.CCM.start_do("192.168.204.137", 0) @@ -147,10 +158,16 @@ if __name__ == "__main__": with open("attack_tree", "rb") as f: TM.CCM.attack_tree = pickle.load(f) # 遍历node,查看有instr的ndoe - nodes = TM.CCM.attack_tree.traverse_bfs() - for node in nodes: - if node.instr_queue: # list - TM.node_queue.put(node) + nodes = TM.CCM.attack_tree.traverse_dfs() + if indexs: + for index in indexs: + node = nodes[index] + if node.instr_queue: # list + TM.node_queue.put(node) + else: + for node in nodes: + if node.instr_queue: + TM.node_queue.put(node) #创建线程执行指令 for i in range(TM.max_thread_num): @@ -165,21 +182,18 @@ if __name__ == "__main__": with open("attack_tree", "rb") as f: TM.CCM.attack_tree = pickle.load(f) #遍历node,查看有res的数据 - iput_max_num = 1 + iput_max_num = 0 iput_num = 0 - nodes = TM.CCM.attack_tree.traverse_bfs() - if iput_index != -1:#index 不为-1就是指定节点返回,人为保障不越界 - node = nodes[iput_index] - if node.res_quere: - TM.CCM.llm_quere.put(node) + nodes = TM.CCM.attack_tree.traverse_dfs() + if indexs: + for index in indexs: + node = nodes[index] + if node.res_quere: + TM.CCM.llm_quere.put(node) else: for node in nodes: - if node.res_quere: #有结果需要提交LLM + if node.res_quere: TM.CCM.llm_quere.put(node) - iput_num += 1 - if iput_max_num > 0: #0是有多少提交多少 - if iput_num == iput_max_num: - break #创建llm工作线程 TM.CCM.brun = True @@ -191,25 +205,27 @@ if __name__ == "__main__": for t in TM.CCM.llmth_list: t.join() elif test_type ==3: #执行指定指令 - instrlist=[ - "msfconsole -q -x \"use auxiliary/scanner/smb/smb_version; set RHOSTS 192.168.204.137; run; exit\""] - + with open("attack_tree", "rb") as f: + TM.CCM.attack_tree = pickle.load(f) + # 遍历node,查看有instr的ndoe + nodes = TM.CCM.attack_tree.traverse_dfs() + instrlist = nodes[instr_index].instr_queue + # instrlist = [''' + # '''] for instr in instrlist: start_time = get_local_timestr() # 指令执行开始时间 bres, instr, reslut, source_result, ext_params = TM.InstrM.execute_instruction(instr) end_time = get_local_timestr() # 指令执行结束时间 - # 入数据库 -- bres True和False 都入数据库2025-3-10---加node_path(2025-3-18)#? - if TM.DBM.ok: - TM.DBM.insetr_result(0, instr, reslut, 0, start_time, end_time, source_result, - ext_params, "独立命令执行") - else: - TM.logger.error("数据库连接失败!!") + + res = {'执行结果': reslut} + str_res = json.dumps(res,ensure_ascii=False) # 直接字符串组合也可以-待验证 + print(str_res) elif test_type == 4: #修改Message with open("attack_tree", "rb") as f: TM.CCM.attack_tree = pickle.load(f) #创建一个新的节点 from mycode.AttackMap import TreeNode - testnode = TreeNode("test",0) + testnode = TreeNode("test",0,0) TM.CCM.LLM.build_initial_prompt(testnode)#新的Message systems = testnode.messages[0]["content"] #print(systems) @@ -222,9 +238,76 @@ if __name__ == "__main__": elif test_type ==5: #显示指令和结果list with open("attack_tree", "rb") as f: TM.CCM.attack_tree = pickle.load(f) - nodes = TM.CCM.attack_tree.traverse_bfs() - print(f"********\n{','.join(nodes[iput_index].instr_queue)}\n********") - print(f"&&&&&&&&\n{','.join(nodes[iput_index].res_quere)}\n&&&&&&&&") + nodes = TM.CCM.attack_tree.traverse_dfs() + if iput_index == -1: + for node in nodes: + print(f"----{node.path}-{node.status}----\n****instr_quere") + print(f"{','.join(node.instr_queue)}\n****res_quere") + try: + print(f"{','.join(node.res_quere)}") + except: + print(f"{json.dumps(node.res_quere)}") + elif iput_index == -2:#只输出有instruction的数据 + index = 0 + for node in nodes: + if node.instr_queue: + print(f"----{index}--{node.path}--{node.status}----") + print(f"{','.join(node.instr_queue)}") + index += 1 + else: + print(f"********\n{','.join(nodes[iput_index].instr_queue)}\n********") + print(f"&&&&&&&&\n{','.join(nodes[iput_index].res_quere)}\n&&&&&&&&") + elif test_type == 6: #给指定节点添加测试指令 + with open("attack_tree", "rb") as f: + TM.CCM.attack_tree = pickle.load(f) + nodes = TM.CCM.attack_tree.traverse_dfs() + str_instr = "nmap -sV -p- 192.168.204.137 -T4 -oN nmap_full_scan.txt" + index = 9 + nodes[index].instr_queue.append(str_instr) + nodes[index].res_quere = [] + with open("attack_tree", 'wb') as f: + pickle.dump(TM.CCM.attack_tree, f) + elif test_type == 7: #给指定节点修改指令的执行结果 + with open("attack_tree", "rb") as f: + TM.CCM.attack_tree = pickle.load(f) + nodes = TM.CCM.attack_tree.traverse_dfs() + str_instr = "psql -h 192.168.204.137 -U postgres -c '\l'" + start_time = get_local_timestr() # 指令执行开始时间 + bres, instr, reslut, source_result, ext_params = TM.InstrM.execute_instruction(str_instr) + end_time = get_local_timestr() # 指令执行结束时间 + # 入数据库 -- bres True和False 都入数据库2025-3-10---加node_path(2025-3-18)#? + if TM.DBM.ok: + TM.DBM.insetr_result(0, instr, reslut, 0, start_time, end_time, source_result, + ext_params, "独立命令执行") + index = 9 + nodes[index].res_quere.clear() + nodes[index].res_quere.append(reslut) + + with open("attack_tree", 'wb') as f: + pickle.dump(TM.CCM.attack_tree, f) + elif test_type == 9: #处理自定义llm回复内容 + with open("attack_tree", "rb") as f: + TM.CCM.attack_tree = pickle.load(f) + nodes = TM.CCM.attack_tree.traverse_dfs() + node = nodes[5] + strconent = ''' + {'role': 'assistant', 'content': '{"action":"update_status", "node": "25端口", "status": "已完成", "vulnerability": {"name":"SMTP用户枚举漏洞","risk":"中危","info":"VRFY命令可验证有效用户"}}\n\n```bash-[目标系统->192.168.204.137->25端口]\nsmtp-user-enum -M VRFY -U /usr/share/wordlists/metasploit/unix_users.txt -t 192.168.204.137\n```\n\n```bash-[目标系统->192.168.204.137->25端口]\nnc -nv 192.168.204.137 25 << EOF\nEXPN root\nMAIL FROM: attacker@example.com\nRCPT TO: external@example.com\nDATA\nTest open relay\n.\nQUIT\nEOF\n```'} + ''' + strjson = json.loads(strconent) + node_cmds,commands = TM.CCM.LLM.fetch_instruction(strjson["content"]) + TM.CCM.tree_manager(node_cmds) + + # 更新tree + bok, new_commands = TM.CCM.tree_manager(node_cmds, node, commands, TM.DBM) + # 分析指令入对应节点 + if bok: # 节点指令若存在错误,测试指令都不处理,需要LLM重新生成 + node_list = TM.CCM.instr_in_node(new_commands, node) + + #报不保存待定-- + with open("attack_tree", 'wb') as f: + pickle.dump(TM.CCM.attack_tree, f) + + else: #完整过程测试---要设定终止条件 pass diff --git a/config.yaml b/config.yaml index 061f49e..7df3f1a 100644 --- a/config.yaml +++ b/config.yaml @@ -14,7 +14,7 @@ mysql: database: zfsafe #LLM-Type -LLM_type: 1 #0-腾讯云,1-DS,2-GPT +LLM_type: 2 #0-腾讯云,1-DS,2-2233ai,3-GPT LLM_max_chain_count: 10 #为了避免推理链过长,造成推理效果变差,应该控制一个推理链的长度上限 #用户初始密码 diff --git a/mycode/AttackMap.py b/mycode/AttackMap.py index bd38ef3..f7233ba 100644 --- a/mycode/AttackMap.py +++ b/mycode/AttackMap.py @@ -1,4 +1,7 @@ import queue +import copy +import re + #渗透测试树结构维护类 class AttackTree: def __init__(self,root_node): @@ -130,12 +133,17 @@ class AttackTree: class TreeNode: def __init__(self, name,task_id,status="未完成", vul_type="未发现"): + self.task_id = task_id #任务id self.name = name # 节点名称 self.status = status # 节点状态 self.vul_type = vul_type # 漏洞类型 + self.vul_name = "" + self.vul_grade = "" + self.vul_info = "" self.children = [] # 子节点列表 self.parent = None # 父节点引用 self.path = "" #当前节点的路径 + self.bwork = True #当前节点是否工作,默认True self.messages = [] # 针对当前节点积累的messages -- 针对不同节点提交不同的messages self.llm_type = 0 #llm提交类型 0--初始状态无任务状态,1--指令结果反馈,2--llm错误反馈 @@ -143,12 +151,58 @@ class TreeNode: self.do_sn = 0 #针对该节点instr执行次数 self.instr_queue = [] # queue.Queue() #针对当前节点的执行指令----重要约束:一个节点只能有一个线程在执行指令 self.res_quere = [] # queue.Queue() #指令执行的结果,一批一批 + #用户补充信息 + self.cookie = "" + self.ext_info = "" + + #设置用户信息 + def set_user_info(self,cookie,ext_info): + self.cookie = cookie + self.ext_info = ext_info + + def copy_messages(self,childe_node): + ''' + 子节点继承父节点的messages,目前规则保留上两层节点的message信息 + :param childe_node: + :return: + ''' + tmp_messages = copy.deepcopy(self.messages) + if not self.parent: + childe_node.messages = tmp_messages + else: + parent_path = self.parent.path + bfind = False + for msg in tmp_messages: + if msg["role"] == "system": + childe_node.messages.append(msg) + elif msg["role"] == "user": + if not bfind: + #获取user的node_path + content = msg["content"] + pattern = r"当前分支路径:(.+?)\n" + match = re.search(pattern, content) + if match: + path = match.group(1) + if parent_path in path:#当前节点的父节点路径在存入子节点messages + childe_node.messages.append(msg) + bfind = True #后续messages都保留 + else: + print("提交的用户提示词结构有问题!") + else: + childe_node.messages.append(msg) + elif msg["role"] == "assistant": + if bfind: + childe_node.messages.append(msg) + else: + print("非法的信息体类型!") def add_child(self, child_node): child_node.parent = self child_node.path = self.path + f"->{child_node.name}" #子节点的路径赋值 - child_node.messages = self.messages #传递messages #给什么时候的messages待验证#? + #child_node.messages = copy.deepcopy(self.messages) #传递messages #给什么时候的messages待验证#? + self.copy_messages(child_node) #传递messages--只保留两层 + self.children.append(child_node) def add_instr(self,instr): diff --git a/mycode/ControlCenter.py b/mycode/ControlCenter.py index 179a8b3..1384ae2 100644 --- a/mycode/ControlCenter.py +++ b/mycode/ControlCenter.py @@ -40,7 +40,11 @@ class ControlCenter: pass def get_user_init_info(self): - '''开始任务初,获取用户设定的基础信息''' + '''开始任务初,获取用户设定的基础信息,初始信息可以分为两块: + 1.提交llm的补充信息 和 保留在本地的信息(如工具补充参数等,cookie) + 2.用户可以设置全局,和指定节点(端口) + 3.补充测试节点 + ''' # ?包括是否对目标进行初始化的信息收集 return {"已知信息":"无"} @@ -82,10 +86,14 @@ class ControlCenter: '''获取该节点的llm提交数据,会清空type和res_quere''' llm_type = node.llm_type node.llm_type = 0 #回归0 - res_list = node.res_quere[:] #复制独立副本 - node.res_quere.clear() #清空待处理数据 + res_list = node.res_quere[:] #浅拷贝,复制第一层 + node.res_quere.clear() #清空待处理数据,相当于把原应用关系接触 return llm_type,res_list + def restore_one_llm_work(self,node,llm_type,res_list): + node.llm_type = llm_type + node.res_quere = res_list + #llm请求提交线程 def th_llm_worker(self):#LLM没有修改的全局变量,应该可以共用一个client ''' @@ -104,7 +112,7 @@ class ControlCenter: self.get_llm_instruction(node,th_DBM) #释放锁 - # 暂存状态--测试时使用 + # 暂存状态--测试时使用--限制条件llm工作线程只能未1个 with open("attack_tree", 'wb') as f: pickle.dump(self.attack_tree, f) @@ -137,8 +145,7 @@ class ControlCenter: # 构造本次提交的prompt ext_Prompt = f''' 上一步结果:{res_str} -任务:生成下一步渗透测试指令或判断是否完成该节点测试 -请确保生成的指令满足以下 +任务:生成下一步渗透测试指令或判断是否完成该节点测试。 ''' elif llm_type ==2: #llm返回的指令存在问题,需要再次请求返回 ext_Prompt = f''' @@ -146,25 +153,17 @@ class ControlCenter: 错误信息:{res_str} 任务:请按格式要求重新生成该节点上一次返回中生成的所有指令。 ''' - elif llm_type ==3: #已生成节点,但未生成测试指令 - ext_Prompt = f''' -反馈类型:需要继续补充信息 -缺失信息:{res_str} -任务: -1.请生成这些节点的测试指令; -2.这些节点的父节点为当前节点,请正确生成这些节点的节点路径; -3.若还有节点未能生成测试指令,必须返回未生成指令的节点列表。 -''' - elif llm_type ==4: #未生成节点列表 - ext_Prompt = f''' -反馈类型:需要继续补充信息 -缺失信息:{res_str} -任务: -1.请生成这些节点的新增节点指令,并生成对应的测试指令; -2.这些节点的父节点为当前节点,请正确生成这些节点的节点路径; -3.若节点未能全部新增,必须返回未新增的节点列表 -4.若有未生成指令的节点,必须返回未生成指令的节点列表。 -''' +# ''' +# elif llm_type ==4: #未生成节点列表 +# ext_Prompt = f''' +# 反馈类型:需要继续补充信息 +# 缺失信息:{res_str} +# 任务: +# 1.请生成这些节点的新增节点指令,并生成对应的测试指令; +# 2.这些节点的父节点为当前节点,请正确生成这些节点的节点路径; +# 3.若节点未能全部新增,必须返回未新增的节点列表 +# 4.若有未生成指令的节点,必须返回未生成指令的节点列表。 +# ''' elif llm_type ==5: ext_Prompt = f''' 反馈类型:测试指令格式错误 @@ -186,10 +185,10 @@ class ControlCenter: 2.LLM的回复开始反复时(有点难判断) ''' # 更新tree - bok = self.tree_manager(node_cmds, node,commands) + bok,new_commands = self.tree_manager(node_cmds, node,commands,DBM) # 分析指令入对应节点 if bok: #节点指令若存在错误,测试指令都不处理,需要LLM重新生成 - node_list = self.instr_in_node(commands, node) + node_list = self.instr_in_node(new_commands, node) # 插入TM的node_queue中,交TM线程处理---除了LLM在不同的请求返回针对同一节点的测试指令,正常业务不会产生两次进队列 for node in node_list: self.TM.node_queue.put(node) @@ -229,49 +228,66 @@ class ControlCenter: self.put_one_llm_work(strerror,node,2) return False - def tree_manager(self,node_cmds,node,commands): + def tree_manager(self,node_cmds,node,commands,DBM): '''更新渗透测试树 node_cmds是json-list 2025-03-22添加commands参数,用于处理LLM对同一个节点返回了测试指令,但还返回了no_instruction节点指令 ''' if not node_cmds: # or len(node_cmds)==0: 正常not判断就可以有没有节点指令 - return True + return True,commands #对节点指令进行校验 if not self.verify_node_cmds(node_cmds,node): - return False #节点指令存在问题,终止执行 - #执行节点操作 + return False,commands #节点指令存在问题,终止执行 + + #执行节点操作---先执行add_node,怕返回顺序不一直 + residue_node_cmds = [] for node_json in node_cmds: action = node_json["action"] - if action == "add_node": #新增节点 + if action == "add_node": # 新增节点 parent_node_name = node_json["parent"] # 新增节点原则上应该都是当前节点增加子节点 - if node.name == parent_node_name: + if node.name == parent_node_name or parent_node_name.endswith(node.name): status = node_json["status"] node_names = node_json["nodes"].split(',') for node_name in node_names: - #判重 + # 判重---遇到过补充未生成指令的节点时,返回了新增这些节点的指令 bfind = False for node_child in node.children: if node_child.name == node_name: bfind = True break if not bfind: - #添加节点 - new_node = TreeNode(node_name,node.task_id,status) - node.add_child(new_node) #message的传递待验证 + # 添加节点 + new_node = TreeNode(node_name, node.task_id, status) + node.add_child(new_node) # message的传递待验证 else: - self.logger.error(f"添加子节点时,遇到父节点名称不一致的,需要介入!!{node_json}") #丢弃该节点 - elif action == "update_status": + self.logger.error(f"添加子节点时,遇到父节点名称不一致的,需要介入!!{node_json}") # 丢弃该节点 + else:#其他指令添加到list + residue_node_cmds.append(node_json) + + #执行剩余的节点指令--不分先后 + for node_json in residue_node_cmds: + action = node_json["action"] + if action == "update_status": node_name = node_json["node"] status = node_json["status"] vul_type = "未发现" - if "vulnerability" in node_json: - vul_type = json.dumps(node_json["vulnerability"]) if node.name == node_name: node.status = status + if "vulnerability" in node_json: + #{\"name\":\"漏洞名称\",\"risk\":\"风险等级(低危/中危/高危)\",\"info\":\"补充信息(没有可为空)\"}}; + vul_type = json.dumps(node_json["vulnerability"],ensure_ascii=False) #json转字符串 + try: + node.name = node_json["vulnerability"]["name"] + node.vul_grade = node_json["vulnerability"]["risk"] + node.vul_info = node_json["vulnerability"]["info"] + except: + self.logger.error("漏洞信息错误") node.vul_type = vul_type else: - self.logger.error(f"遇到不是修改本节点状态的,需要介入!!{node_json}") + str_user = f"遇到不是修改本节点状态的,需要介入!!{node_json}" + self.logger.error(str_user) + self.need_user_know(str_user,node) elif action == "no_instruction": #返回的未生成指令的数据进行校验:1.要有数据;2.节点不是当前节点就是子节点 nodes = [] @@ -285,29 +301,68 @@ class ControlCenter: break if bcommand: #如果存在测试指令,则不把该节点放入补充信息llm任务 continue + #验证对应节点是否已经创建---本节点或子节点,其他节点不处理(更狠一点就是本节点都不行) if node_name == node.name: nodes.append(node_name) + # str_add = "请生成测试指令" + # self.put_one_llm_work(str_add,node,1) else: for child_node in node.children: if child_node.name == node_name: nodes.append(node_name) + # str_add = "无" + # self.put_one_llm_work(str_add, child_node, 1) break - if nodes: #找到对应的节点才返回 - str_nodes = ",".join(nodes) - str_add = {"已新增但未生成测试指令的节点":str_nodes} - # 提交一个错误反馈任务--但继续后续工作 - self.put_one_llm_work(str_add, node, 3) - self.logger.debug(f"已新增但未生成指令的节点有:{nodes}") - elif action == "no_create": + if nodes: #阻塞式,在当前节点提交补充信息,完善节点指令 -- 优势是省token + new_commands = self.get_other_instruction(nodes,DBM,node) + commands.extend(new_commands) + elif action == "no_create": #提交人工确认 nodes = node_json["nodes"] if nodes: str_add = {"未新增的节点": nodes} - # 提交一个错误反馈任务--但继续后续工作 - self.put_one_llm_work(str_add, node, 4) - self.logger.debug(f"未新增的节点有:{nodes}") + self.logger.debug(str_add) + # 提交一个继续反馈任务--继续后续工作 2025-3-25不自动处理 + # self.put_one_llm_work(str_add, node, 4) + # self.logger.debug(f"未新增的节点有:{nodes}") else: self.logger.error("****不应该执行到这!程序逻辑存在问题!") - return True + return True,commands + + #阻塞轮询补充指令 + def get_other_instruction(self,nodes,DBM,cur_node): + res_str = ','.join(nodes) + new_commands = [] + while res_str: + self.logger.debug(f"开始针对f{res_str}这些节点请求测试指令") + user_Prompt = f''' + 当前分支路径:{cur_node.path} + 当前节点信息: + - 节点名称:{cur_node.name} + - 节点状态:{cur_node.status} + - 漏洞类型:{cur_node.vul_type} + 反馈类型:需要补充信息 + 缺失信息:针对{res_str}的测试指令 + 任务: + 1.请生成这些节点的测试指令; + 2.这些节点的父节点为当前节点,请正确生成这些节点的节点路径; + 3.若还有节点未能生成测试指令,必须返回未生成指令的节点列表。 + ''' + res_str = "" + node_cmds, commands = self.LLM.get_llm_instruction(user_Prompt, DBM, cur_node) # message要更新 + #把返回的测试指令进行追加 + new_commands.extend(commands) + #判断是否还有未添加指令的节点 + for node_json in node_cmds: #正常应该只有一条no_instruction + if "no_instruction" in node_json and "nodes" in node_json: + tmp_nodes = [] + node_names = node_json["nodes"].split(',') + for node_name in node_names: + if node_name in nodes: + tmp_nodes.append(node_name) + res_str = ','.join(tmp_nodes) + break + self.logger.debug("为添加指令的节点,都已完成指令的添加!") + return new_commands def instr_in_node(self,commands,node): node_list = [] #一次返回的测试指令 @@ -334,6 +389,10 @@ class ControlCenter: # 3.独立队列处理 return node_list + #需要用户确认的信息--待完善 + def need_user_know(self,strinfo,node): + pass + #待修改 def is_user_instr(self,instr): ''' @@ -379,4 +438,6 @@ class ControlCenter: #停止llm处理线程 self.brun =False for th in self.llmth_list: - th.jion() \ No newline at end of file + th.jion() + + diff --git a/mycode/LLMManager.py b/mycode/LLMManager.py index a5fc2f6..e3d8101 100644 --- a/mycode/LLMManager.py +++ b/mycode/LLMManager.py @@ -7,12 +7,14 @@ import openai import json import threading import re +import os from openai import OpenAI +from mycode.DBManager import DBManager from myutils.MyTime import get_local_timestr from myutils.MyLogger_logger import LogHandler class LLMManager: - def __init__(self,illm_type=0): + def __init__(self,illm_type=3): self.logger = LogHandler().get_logger("LLMManager") self.api_key = None self.api_url = None @@ -30,11 +32,24 @@ class LLMManager: self.model = "deepseek-reasoner" #model=deepseek-reasoner -- R1 model=deepseek-chat --V3 # 创建会话对象 -- 一个任务的LLM必须唯一 self.client = OpenAI(api_key=self.api_key, base_url=self.api_url) + elif illm_type == 2: #2233.ai + self.api_key = "sk-J3562ad9aece8fd2855bb495bfa1a852a4e8de8a2a1IOchD" + self.api_url = "https://api.gptsapi.net/v1" + self.model = "o3-mini-2025-01-31" + self.client = OpenAI(api_key=self.api_key,base_url=self.api_url) elif illm_type ==3: #GPT - self.api_key ="" - self.api_url = "" - self.model = "" - self.client = OpenAI() + # 定义代理服务器地址 + proxy_url = "http://192.168.3.102:3128" + os.environ["HTTP_PROXY"] = proxy_url + os.environ["HTTPS_PROXY"] = proxy_url + self.api_key ="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" + self.api_url = "https://api.openai.com/v1" + self.model = "o3-mini-2025-01-31" + + openai.proxy = proxy_url + openai.api_key = self.api_key + #self.client = openai + self.client = OpenAI(api_key=self.api_key,base_url=self.api_url) #******测试使用,设置slef.message的值 def test_old_message(self,strMessage): @@ -45,62 +60,55 @@ class LLMManager: except Exception as e: print(f"错误: {str(e)}") + ''' + **决策原则** + - 根据节点类型和状态,优先执行基础测试(如端口扫描、服务扫描)。 + - 仅在发现新信息或漏洞时新增子节点。 + - 确保每个新增节点匹配测试指令。 + ''' # 初始化messages - def build_initial_prompt(self,node): + def build_initial_prompt(self,node,str_ip=""): if not node: return - '''暂时未添加到提示词 - **核心要求**: - - 每次新增节点时,必须在同一响应中为该节点提供测试指令。 - - 分批新增节点,每次响应中新增节点不超过3个,确保指令完整。 - ''' #根节点初始化message node.messages = [{"role": "system", "content":''' -你是一位渗透测试专家,基于你的专业知识和经验,负责动态控制整个渗透测试过程,根据当前测试状态和返回结果,决定下一步测试指令,推动测试前进,直至完成渗透测试。本地程序会执行你的指令进并返回结果。 +你是一位渗透测试专家,基于你的专业知识和经验,负责动态控制整个渗透测试过程,根据当前测试状态和返回结果,决定下一步测试指令,推动测试前进,直至完成渗透测试。本地程序会执行你的指令并返回结果。 **总体要求** 1.以目标系统IP为根节点,每个渗透测试点(如端口、服务、漏洞点)作为子节点,形成树型结构(测试树); 2.测试树整体数据由本地程序存储,你只需要关注当前节点的测试推进、状态更新(未完成/已完成)及完整新增子节点; 3.返回两类指令:节点指令和测试指令,以空行间隔,不要包含注释和说明; -4.若一次性新增的节点过多,无法为每个节点都匹配测试指令,请优先保障新增节点的完整性,若有未生成指令的节点,必须返回已新增但未生成指令的节点列表。如果节点无法全部新增,必须返回未新增的节点列表; -5.若无节点修改,新增,未生成指令等数据,节点指令可以为空,但测试指令必须对应已有节点; +4.若一次性新增的节点过多,无法为每个节点都匹配测试指令,请优先保障新增中高危节点的完整性,若有未生成测试指令的节点,必须返回未生成指令节点列表; +5.若无需要处理的节点数据,节点指令可以不生成,但测试指令必须对应已有节点; **决策流程** -1. 若当前节点是IP且未进行端口扫描,则执行端口扫描; +1. 若当前节点是IP且未进行端口扫描,则对当前节点执行端口扫描; 2. 若端口扫描发现开放端口,对可能存在中高危以上风险的端口新增节点并提供测试指令; 3. 若当前节点是端口且未进行服务扫描,则执行服务扫描; 4. 若服务扫描发现服务版本或漏洞,则新增漏洞测试节点并提供测试指令; -5. 若漏洞验证成功,则根据结果决定是否需要进一步测试,若需要进一步测试,则新增子节点并提供测试指令; -6. 若节点测试无新信息和测试指令,则更新状态为“已完成”。 -**测试指令生成要求** -1.明确每个测试指令的测试目标,并优先尝试最简单、最直接的办法; -2.对于复杂的测试点,使用递进逻辑组织指令:先尝试基础测试方法,根据执行结果决定是否进行更深入的测试。 +5. 若漏洞验证成功,则根据结果决定是否需要进一步测试,若需要进一步测试,则为测试内容新增子节点并提供测试指令; +6. 当当前节点执行完成所有可能的测试指令,更新状态为“已完成”。 +**测试指令生成准则** +1.明确每个测试指令的测试目标,并优先尝试最简单、最直接的办法,不要在同一个请求生成测试效果覆盖的指令; +2.使用递进逻辑组织指令:先尝试基础测试方法,根据执行结果决定是否进行更深入的测试; +3.本地的IP地址为:192.168.204.135。 **节点指令格式** -- 新增节点:{\"action\":\"add_node\", \"parent\": \"父节点\", \"nodes\": \"子节点1,子节点2\", \"status\": \"未完成\"}; -- 已新增但未生成指令的节点列表:{\"action\": \"no_instruction\", \"nodes\": \"3306端口,1000端口\"}; -- 未新增的节点列表:{\"action\": \"no_create\", \"nodes\": \"8080端口,8081端口,9000端口\"}; -- 节点完成测试未发现漏洞:{\"action\": \"update_status\", \"node\": \"21端口\", \"status\": \"已完成\"}; -- 节点完成测试发现漏洞:{\"action\": \"update_status\", \"node\": \"21端口\", \"status\": \"已完成\",\"vulnerability\": {\"name\":\"ftp匿名登录\",\"risk\":\"高\"}}; +- 新增节点:{\"action\":\"add_node\", \"parent\": \"父节点\", \"nodes\": \"节点1,节点2\", \"status\": \"未完成\"}; +- 未生成指令节点列表:{\"action\": \"no_instruction\", \"nodes\": \"节点1,节点2\"}; +- 完成测试未发现漏洞:{\"action\": \"update_status\", \"node\": \"节点\", \"status\": \"已完成\"}; +- 完成测试且发现漏洞:{\"action\": \"update_status\", \"node\": \"节点\", \"status\": \"已完成\",\"vulnerability\": {\"name\":\"漏洞名称\",\"risk\":\"风险等级(低危/中危/高危)\",\"info\":\"补充信息(没有可为空)\"}}; **测试指令格式** -- shell指令:```bash-[节点路径](.*?)```包裹,需要避免用户交互; -- python指令:```python-[节点路径](.*?)```包裹,主函数名为dynamic_fun,需包含错误处理,执行结束后必须返回一个tuple (status, output),其中status为'success'或'failure',output为补充输出信息; +- shell指令:```bash-[节点路径](.*?)```包裹,需要避免用户交互,若涉及到多步指令,请生成python代码; +- python指令:```python-[节点路径](.*?)```包裹,主函数名为dynamic_fun,需包含错误处理,必须返回一个tuple(status, output); - [节点路径]为从根节点到目标节点的完整层级描述。 +**核心要求** +- 优先保障新增中高危测试节点的完整性。 +- 指令之间必须要有一个空行。 **响应示例** -{\"action\":\"add_node\", \"parent\": \"192.168.1.100\", \"node\": \"3306端口\", \"status\": \"未完成\"} +{\"action\":\"add_node\", \"parent\": \"192.168.1.100\", \"nodes\": \"3306端口,22端口\", \"status\": \"未完成\"} ```bash-[目标系统->192.168.1.100->3306端口] mysql -u root -p 192.168.1.100 ``` - -{\"action\":\"add_node\", \"parent\": \"192.168.1.100\", \"node\": \"22端口\", \"status\": \"未完成\"} - -```python-[目标系统->192.168.1.100->22端口] -def dynamic_fun(): - try: - result = "扫描完成" - return ("success", result) - except Exception as e: - return ("failure", str(e)) -``` '''}] # 一个messages def init_data(self,task_id=0): @@ -122,11 +130,12 @@ def dynamic_fun(): #提交LLM post_time = get_local_timestr() + response = self.client.chat.completions.create( model=self.model, + reasoning_effort="high", messages = node.messages ) - #LLM返回结果处理 reasoning_content = "" content = "" @@ -143,6 +152,12 @@ def dynamic_fun(): content = response.choices[0].message # 记录llm历史信息 node.messages.append(content) + elif self.model == "o3-mini-2025-01-31": + reasoning_content = "" #gpt不返回推理内容 + content = response.choices[0].message.content + print(content) + # 记录llm历史信息 + node.messages.append({'role': 'assistant', 'content': content}) else: self.logger.error("处理到未预设的模型!") return None @@ -202,37 +217,33 @@ def dynamic_fun(): commands.append(shell_blocks[shell_index]) shell_index +=1 else:#其他的认为是节点操作指令--指令格式还存在不确定性,需要正则匹配,要求是JSON - pattern = re.compile(r'\{.*\}', re.DOTALL) #贪婪模式会匹配到最后一个},能使用嵌套的JSON + pattern = re.compile(r'\{(?:[^{}]|\{[^{}]*\})*\}') # 遍历所有匹配到的 JSON 结构 - strlines = part.strip('\n') #按行拆分,避免贪婪模式下,匹配到多行的最后一个} - for strline in strlines: - for match in pattern.findall(strline): #正常只能有一个 - try: - node_cmds.append(json.loads(match)) # 解析 JSON 并添加到列表 - except json.JSONDecodeError as e:#解析不了的不入队列 - self.logger.error(f"LLM-{part}-JSON 解析错误: {e}") #这是需不需要人为介入? + + # strlines = part.strip('\n') #按行拆分,避免贪婪模式下,匹配到多行的最后一个} + # for strline in strlines: + for match in pattern.findall(part): #正常只能有一个 + try: + node_cmds.append(json.loads(match)) # 解析 JSON 并添加到列表 + except json.JSONDecodeError as e:#解析不了的不入队列 + self.logger.error(f"LLM-{part}-JSON 解析错误: {e}") #这是需不需要人为介入? return node_cmds,commands def test_llm(self): - with open("../test", "r", encoding="utf-8") as f: - messages = json.load(f) - text = messages[-1]["content"] - list = self.fetch_instruction(text) - for itme in list: - print("***********") - print(itme) + messages = [ + {"role": "system", "content": "You are a helpful assistant."}, + {"role": "user", "content": "讲个笑话吧。"} + ] + response = self.client.chat.completions.create( + model=self.model, + reasoning_effort="medium", + messages=messages + ) + print(response) if __name__ == "__main__": - # LM = LLMManager(1) - # LM.test_llm() - tlist1 = [] - tlist2 = [] - tlist2.append(1) - if not tlist1: - print("list1空") - if not tlist2: - print("list2空") - if tlist2: - print("list2不为空") + llm = LLMManager(3) + llm.test_llm() + diff --git a/mycode/Result_merge.py b/mycode/Result_merge.py new file mode 100644 index 0000000..ce61a10 --- /dev/null +++ b/mycode/Result_merge.py @@ -0,0 +1,90 @@ +#结果合并功能函数模块 +import re + +def my_merge(fun_name,result): + if fun_name == "enum4linux": + result = enum4linux_merge(result) + else: + pass + return result + + +#--------------------enum4linux-------------------- +def enum4linux_merge(result): + print("enum4linux") + # 1.用户列表(用于密码爆破) + users = extract_users(result) + # 2. 共享目录(用于未授权访问/文件泄露) + shares = extract_shares(result) + # 3. 密码策略(指导爆破规则) + policy = extract_password_policy(result) + # 4. 操作系统信息(用于漏洞匹配) + os_info = extract_os_info(result) + # 整合输出 + result = f"Users:{users}\nShares:{shares}\nPolicy:{policy}\nOS Info:{os_info}\n" + print(result) + return result + +def extract_users(data): + """提取所有用户列表(含 RID)""" + users = {} + pattern = re.compile(r"user:\[(.*?)\] rid:\[(0x[a-fA-F0-9]+)\]") + matches = pattern.findall(data) + for user, rid in matches: + users[user] = rid + return users + +def extract_shares(data): + """提取共享目录并清理 ANSI 转义码""" + shares = [] + share_block = re.search(r"Share Enumeration.*?=\n(.*?)\n\n", data, re.DOTALL) + if share_block: + lines = share_block.group(1).split('\n') + for line in lines: + # 清理 ANSI 转义码(如 \x1b[35m) + line_clean = re.sub(r'\x1b\[[0-9;]*m', '', line) + if 'Disk' in line_clean or 'IPC' in line_clean: + parts = list(filter(None, line_clean.split())) + if len(parts) >= 3: + share = { + "name": parts[0], + "type": parts[1], + "access": "Unknown" + } + # 提取清理后的访问权限 + access_line = re.search(rf"//.*{re.escape(parts[0])}.*Mapping: (.*?) ", data) + if access_line: + access_clean = re.sub(r'\x1b\[[0-9;]*m', '', access_line.group(1)) + share["access"] = access_clean + shares.append(share) + return shares + +def extract_password_policy(data): + """提取密码策略""" + policy = {} + policy_block = re.search(r"Password Policy Information.*?=\n(.*?)\n\n", data, re.DOTALL) + if not policy_block: + return policy + + policy_text = policy_block.group(1) + # 提取最小密码长度(处理未匹配情况) + min_length_match = re.search(r"Minimum password length: (\d+)", policy_text) + policy["min_length"] = min_length_match.group(1) if min_length_match else "未知" + + # 提取密码复杂性要求 + complexity_match = re.search(r"Password Complexity: (Enabled|Disabled)", policy_text) + policy["complexity"] = complexity_match.group(1) if complexity_match else "未知" + + # 提取账户锁定阈值 + lockout_match = re.search(r"Account Lockout Threshold: (\d+|None)", policy_text) + policy["lockout_threshold"] = lockout_match.group(1) if lockout_match else "未知" + return policy + +def extract_os_info(data): + """提取操作系统信息""" + os_info = {} + match = re.search(r"server \(([^)]+)\)", data) + if match: + os_info["samba_version"] = match.group(1) + return os_info +#------------------------------------------------ diff --git a/payload/test.txt b/payload/test.txt new file mode 100644 index 0000000..ac1f9a4 --- /dev/null +++ b/payload/test.txt @@ -0,0 +1 @@ +测试文件 \ No newline at end of file diff --git a/pipfile b/pipfile index 1b5b6d3..7c486fe 100644 --- a/pipfile +++ b/pipfile @@ -14,5 +14,10 @@ pip install loguru -i https://pypi.tuna.tsinghua.edu.cn/simple/ pip install paramiko -i https://pypi.tuna.tsinghua.edu.cn/simple/ pip install impacket -i https://pypi.tuna.tsinghua.edu.cn/simple/ +sudo apt-get install libpq-dev python3-dev +pip install psycopg2 -i https://pypi.tuna.tsinghua.edu.cn/simple/ + cd /usr/share/wordlists/ -gzip -d rockyou.txt.gz \ No newline at end of file +gzip -d rockyou.txt.gz + +#searchsploit -u 更新漏洞信息 \ No newline at end of file diff --git a/test.py b/test.py index bc7d5c7..0df3139 100644 --- a/test.py +++ b/test.py @@ -3,6 +3,10 @@ import subprocess import tempfile import os import pexpect +import struct +import sys +import mysql.connector +import requests def do_worker(str_instruction): @@ -51,22 +55,29 @@ def do_worker_ftp_script(str_instruction): os.remove(output_file) return output + +import socket + + +def dynamic_fun(): + try: + host = "192.168.204.137" + port = 8009 + # 尝试建立连接 + sock = socket.create_connection((host, port), timeout=15) + # 发送一个基础的AJP协议探测包(仅用于检测响应) + payload = b'\x12\x34\x00\x02' # 示例数据包 + sock.sendall(payload) + response = sock.recv(1024) + sock.close() + if response: + return (1, "收到响应,可能存在CVE-2020-1938漏洞风险,请进一步人工验证") + else: + return (0, "无响应,暂未检测到漏洞") + except Exception as e: + return (0, "连接失败或错误: " + str(e)) + if __name__ == "__main__": # 示例使用 - str_instruction = """ -ftp -n 192.168.204.137 << EOF -user anonymous anonymous@example.com -ls -bye -EOF - """ - output = do_worker(str_instruction) - print(f"*****\n{output}\n*****") - - output = do_worker_ftp_script(str_instruction) - lines = output.splitlines() - # 跳过第一行(Script started)和最后一行(Script done) - ftp_output = lines[1:-1] - strout = '\n'.join(ftp_output) - print("111111111") - print(strout) \ No newline at end of file + bok,res = dynamic_fun() + print(bok,res) \ No newline at end of file diff --git a/tools/CurlTool.py b/tools/CurlTool.py index 291802d..5a26741 100644 --- a/tools/CurlTool.py +++ b/tools/CurlTool.py @@ -66,33 +66,6 @@ class CurlTool(ToolBase): parts.append('-i') return ' '.join(parts),timeout - # def execute_instruction(self, instruction_old): - # ''' - # 执行指令:验证合法性 -> 执行 -> 分析结果 - # :param instruction_old: - # :return: - # bool:true-正常返回给大模型,false-结果不返回给大模型 - # str:执行的指令 - # str:执行指令的结果 - # ''' - # - # # 第一步:验证指令合法性 - # instruction = self.validate_instruction(instruction_old) - # if not instruction: - # return False,instruction_old,"该指令暂不执行!" - # - # # 第二步:执行指令 --- 基于request使用 - # #print(f"执行指令:{instruction}") - # output = "" - # - # # 第三步:分析执行结果 - # analysis = self.analyze_result(output,instruction) - # #指令和结果入数据库 - # #? - # if not analysis: #analysis为“” 不提交LLM - # return False,instruction,analysis - # return True,instruction, analysis - def get_ssl_info(self,stderr,stdout): # -------------------------- # 解释信息的安全意义: @@ -148,42 +121,23 @@ class CurlTool(ToolBase): result = f"HTTP 状态行:{http_status},Content-Type:{content_type},HTML Title:{html_title},TLS 连接信息:{tls_info},证书 Common Name:{cert_cn},证书 Issuer:{issuer_info}" return result - def get_info_xpost(self,stdout,stderr): - """ - 从 subprocess.run 执行 curl 后的结果中提取关键信息: - - HTTP 状态码 - - 常见响应头(Content-Type, Content-Length) - - HTML 页面标题(如果内容为 HTML) - - 返回正文的前200字符(body_snippet) - - TLS/证书相关信息(从详细调试信息 stderr 中提取) - - 对于未匹配到的信息,返回“Not found”或空字符串。 - """ + def get_info_curl(self,instruction,stdout,stderr): info = {} - # 处理 stdout: 拆分响应头与正文(假设用空行分隔) parts = re.split(r'\r?\n\r?\n', stdout, maxsplit=1) - #***************解析方式一 - # headers_str = parts[0] if parts else "" - # body = parts[1] if len(parts) > 1 else "" - # - # # 提取 HTTP 状态码(从响应头第一行中获取,例如 "HTTP/1.1 202 OK") - # header_lines = headers_str.splitlines() - # ***************解析方式二 if len(parts) == 2: headers_str, body = parts else: # 如果没有拆分成功,可能 stdout 中只有正文,则从 stderr 尝试提取 HTTP 状态行 headers_str = "" body = stdout - - # 如果没有在 stdout 中找到头信息,则尝试从 stderr 中提取(部分信息可能在 stderr 中) + # 如果没有在 stdout 中找到头信息,则尝试从 stderr 中提取(部分信息可能在 stderr 中) if not headers_str: header_lines = stderr.splitlines() else: header_lines = headers_str.splitlines() - #************************** + #status_code if header_lines: status_line = header_lines[0] status_match = re.search(r'HTTP/\d+\.\d+\s+(\d+)', status_line) @@ -191,20 +145,24 @@ class CurlTool(ToolBase): else: info['status_code'] = "No headers found" - # 提取常见响应头 + #Server + m = re.search(r'^Server:\s*(.+)$', headers_str, re.MULTILINE) + if m: + info["server"] = m.group(1).strip() + + #content-type,content-length content_type = "Not found" content_length = "Not found" for line in header_lines: if line.lower().startswith("content-type:"): - info['content_type'] = line.split(":", 1)[1].strip() + info['content-type'] = line.split(":", 1)[1].strip() elif line.lower().startswith("content-length:"): - info['content_length'] = line.split(":", 1)[1].strip() - # 如果未匹配到,则设置默认值 - info.setdefault('content_type', "Not found") - info.setdefault('content_length', "Not found") + info['content-length'] = line.split(":", 1)[1].strip() + info.setdefault('content-type', "Not found") + info.setdefault('content-length', "Not found") # 如果内容为 HTML,则使用 BeautifulSoup 提取