From ab0e715500adeeccedf142361d9313796eb25a79 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E9=BE=99?= Date: Sun, 30 Mar 2025 11:12:08 +0800 Subject: [PATCH] =?UTF-8?q?v0.1.1=20node=5Ftree=5F0.5=20=E5=AE=8C=E5=96=84?= =?UTF-8?q?=E4=BA=86=E5=87=A0=E4=B8=AA=E5=B7=A5=E5=85=B7=E7=B1=BB=E7=9A=84?= =?UTF-8?q?=E5=A4=84=E7=90=86=EF=BC=8C=E8=8A=82=E7=82=B9=E6=A0=91=E5=A2=9E?= =?UTF-8?q?=E5=8A=A0=E4=BA=86messages=E6=8E=A7=E5=88=B6=E6=9C=BA=E5=88=B6?= =?UTF-8?q?=EF=BC=88=E5=8F=AA=E4=BF=9D=E7=95=99=E4=B8=89=E5=B1=82=EF=BC=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- TaskManager.py | 149 +++++++++++++++++++++++++-------- config.yaml | 2 +- mycode/AttackMap.py | 56 ++++++++++++- mycode/ControlCenter.py | 169 ++++++++++++++++++++++++++------------ mycode/LLMManager.py | 143 +++++++++++++++++--------------- mycode/Result_merge.py | 90 ++++++++++++++++++++ payload/test.txt | 1 + pipfile | 7 +- test.py | 45 ++++++---- tools/CurlTool.py | 159 ++++++++++++++++++----------------- tools/DirbTool.py | 17 ++++ tools/EchoTool.py | 2 +- tools/Enum4linuxTool.py | 13 +++ tools/FtpTool.py | 14 ++-- tools/HydraTool.py | 56 +++++++++++-- tools/MysqlTool.py | 63 ++++++++++---- tools/NcTool.py | 2 +- tools/NiktoTool.py | 64 ++++++++++++++- tools/PsqlTool.py | 84 +++++++++++++++++++ tools/PythoncodeTool.py | 23 +++++- tools/SearchsploitTool.py | 8 +- tools/ShowmountTool.py | 11 +++ tools/SmtpuserenumTool.py | 24 +++--- tools/SwaksTool.py | 11 +++ tools/TelnetTool.py | 132 ++++++++++++++++------------- tools/ToolBase.py | 73 ++++++++++------ 26 files changed, 1033 insertions(+), 385 deletions(-) create mode 100644 mycode/Result_merge.py create mode 100644 payload/test.txt create mode 100644 tools/DirbTool.py create mode 100644 tools/Enum4linuxTool.py create mode 100644 tools/PsqlTool.py create mode 100644 tools/ShowmountTool.py create mode 100644 tools/SwaksTool.py diff --git a/TaskManager.py b/TaskManager.py index 48be73a..a668d6e 100644 --- a/TaskManager.py +++ b/TaskManager.py @@ -28,7 +28,7 @@ class TaskManager: self.CCM = ControlCenter(self.DBM,self) self.InstrM = InstructionManager(self) # 类对象渗透,要约束只读取信息 # 控制最大并发指令数量 - self.max_thread_num = 2 + self.max_thread_num = 6 self.task_id = 0 #任务id -- self.workth_list = [] #线程句柄list # self.long_instr_num = 0 #耗时指令数量 @@ -38,6 +38,7 @@ class TaskManager: self.lock = threading.Lock() #线程锁 self.node_num = 0 #在处理Node线程的处理 self.brun = True + self.cookie = "" #cookie参数 def res_in_quere(self,bres,instr,reslut,start_time,end_time,th_DBM,source_result,ext_params,work_node): ''' @@ -57,8 +58,9 @@ class TaskManager: #结果入队列---2025-3-18所有的指令均需返回给LLM便于节点状态的更新,所以bres作用要调整。 res = {'执行指令':instr,'结果':reslut} + str_res = json.dumps(res,ensure_ascii=False) #直接字符串组合也可以-待验证 work_node.llm_type = 1 - work_node.add_res(res) #入节点结果队列 + work_node.add_res(str_res) #入节点结果队列 def do_worker_th(self): #线程的dbm需要一个线程一个 @@ -87,14 +89,21 @@ class TaskManager: with self.lock: self.node_num -= 1 if self.node_num == 0 and self.node_queue.empty(): # + self.logger.debug("此批次指令执行完成!") with open("attack_tree", 'wb') as f: - pickle.dump(TM.CCM.attack_tree, f) + pickle.dump(self.CCM.attack_tree, f) except queue.Empty: self.logger.debug("暂无需要执行指令的节点!") time.sleep(20) def start_task(self,target_name,target_in): + ''' + + :param target_name: 任务目标名字 + :param target_in: 任务目标访问地址 + :return: + ''' #判断目标合法性 bok,target,type = self.TargetM.validate_and_extract(target_in) if bok: @@ -134,8 +143,10 @@ if __name__ == "__main__": current_path = os.path.dirname(os.path.realpath(__file__)) strMsg = FM.read_file("test",1) - test_type = 5 - iput_index = 6 # 0是根节点 + test_type = 1 + instr_index = 19 + iput_index = -1 # 0是根节点 + indexs = [] if test_type == 0: #新目标测试 # 启动--初始化指令 node_list = TM.CCM.start_do("192.168.204.137", 0) @@ -147,10 +158,16 @@ if __name__ == "__main__": with open("attack_tree", "rb") as f: TM.CCM.attack_tree = pickle.load(f) # 遍历node,查看有instr的ndoe - nodes = TM.CCM.attack_tree.traverse_bfs() - for node in nodes: - if node.instr_queue: # list - TM.node_queue.put(node) + nodes = TM.CCM.attack_tree.traverse_dfs() + if indexs: + for index in indexs: + node = nodes[index] + if node.instr_queue: # list + TM.node_queue.put(node) + else: + for node in nodes: + if node.instr_queue: + TM.node_queue.put(node) #创建线程执行指令 for i in range(TM.max_thread_num): @@ -165,21 +182,18 @@ if __name__ == "__main__": with open("attack_tree", "rb") as f: TM.CCM.attack_tree = pickle.load(f) #遍历node,查看有res的数据 - iput_max_num = 1 + iput_max_num = 0 iput_num = 0 - nodes = TM.CCM.attack_tree.traverse_bfs() - if iput_index != -1:#index 不为-1就是指定节点返回,人为保障不越界 - node = nodes[iput_index] - if node.res_quere: - TM.CCM.llm_quere.put(node) + nodes = TM.CCM.attack_tree.traverse_dfs() + if indexs: + for index in indexs: + node = nodes[index] + if node.res_quere: + TM.CCM.llm_quere.put(node) else: for node in nodes: - if node.res_quere: #有结果需要提交LLM + if node.res_quere: TM.CCM.llm_quere.put(node) - iput_num += 1 - if iput_max_num > 0: #0是有多少提交多少 - if iput_num == iput_max_num: - break #创建llm工作线程 TM.CCM.brun = True @@ -191,25 +205,27 @@ if __name__ == "__main__": for t in TM.CCM.llmth_list: t.join() elif test_type ==3: #执行指定指令 - instrlist=[ - "msfconsole -q -x \"use auxiliary/scanner/smb/smb_version; set RHOSTS 192.168.204.137; run; exit\""] - + with open("attack_tree", "rb") as f: + TM.CCM.attack_tree = pickle.load(f) + # 遍历node,查看有instr的ndoe + nodes = TM.CCM.attack_tree.traverse_dfs() + instrlist = nodes[instr_index].instr_queue + # instrlist = [''' + # '''] for instr in instrlist: start_time = get_local_timestr() # 指令执行开始时间 bres, instr, reslut, source_result, ext_params = TM.InstrM.execute_instruction(instr) end_time = get_local_timestr() # 指令执行结束时间 - # 入数据库 -- bres True和False 都入数据库2025-3-10---加node_path(2025-3-18)#? - if TM.DBM.ok: - TM.DBM.insetr_result(0, instr, reslut, 0, start_time, end_time, source_result, - ext_params, "独立命令执行") - else: - TM.logger.error("数据库连接失败!!") + + res = {'执行结果': reslut} + str_res = json.dumps(res,ensure_ascii=False) # 直接字符串组合也可以-待验证 + print(str_res) elif test_type == 4: #修改Message with open("attack_tree", "rb") as f: TM.CCM.attack_tree = pickle.load(f) #创建一个新的节点 from mycode.AttackMap import TreeNode - testnode = TreeNode("test",0) + testnode = TreeNode("test",0,0) TM.CCM.LLM.build_initial_prompt(testnode)#新的Message systems = testnode.messages[0]["content"] #print(systems) @@ -222,9 +238,76 @@ if __name__ == "__main__": elif test_type ==5: #显示指令和结果list with open("attack_tree", "rb") as f: TM.CCM.attack_tree = pickle.load(f) - nodes = TM.CCM.attack_tree.traverse_bfs() - print(f"********\n{','.join(nodes[iput_index].instr_queue)}\n********") - print(f"&&&&&&&&\n{','.join(nodes[iput_index].res_quere)}\n&&&&&&&&") + nodes = TM.CCM.attack_tree.traverse_dfs() + if iput_index == -1: + for node in nodes: + print(f"----{node.path}-{node.status}----\n****instr_quere") + print(f"{','.join(node.instr_queue)}\n****res_quere") + try: + print(f"{','.join(node.res_quere)}") + except: + print(f"{json.dumps(node.res_quere)}") + elif iput_index == -2:#只输出有instruction的数据 + index = 0 + for node in nodes: + if node.instr_queue: + print(f"----{index}--{node.path}--{node.status}----") + print(f"{','.join(node.instr_queue)}") + index += 1 + else: + print(f"********\n{','.join(nodes[iput_index].instr_queue)}\n********") + print(f"&&&&&&&&\n{','.join(nodes[iput_index].res_quere)}\n&&&&&&&&") + elif test_type == 6: #给指定节点添加测试指令 + with open("attack_tree", "rb") as f: + TM.CCM.attack_tree = pickle.load(f) + nodes = TM.CCM.attack_tree.traverse_dfs() + str_instr = "nmap -sV -p- 192.168.204.137 -T4 -oN nmap_full_scan.txt" + index = 9 + nodes[index].instr_queue.append(str_instr) + nodes[index].res_quere = [] + with open("attack_tree", 'wb') as f: + pickle.dump(TM.CCM.attack_tree, f) + elif test_type == 7: #给指定节点修改指令的执行结果 + with open("attack_tree", "rb") as f: + TM.CCM.attack_tree = pickle.load(f) + nodes = TM.CCM.attack_tree.traverse_dfs() + str_instr = "psql -h 192.168.204.137 -U postgres -c '\l'" + start_time = get_local_timestr() # 指令执行开始时间 + bres, instr, reslut, source_result, ext_params = TM.InstrM.execute_instruction(str_instr) + end_time = get_local_timestr() # 指令执行结束时间 + # 入数据库 -- bres True和False 都入数据库2025-3-10---加node_path(2025-3-18)#? + if TM.DBM.ok: + TM.DBM.insetr_result(0, instr, reslut, 0, start_time, end_time, source_result, + ext_params, "独立命令执行") + index = 9 + nodes[index].res_quere.clear() + nodes[index].res_quere.append(reslut) + + with open("attack_tree", 'wb') as f: + pickle.dump(TM.CCM.attack_tree, f) + elif test_type == 9: #处理自定义llm回复内容 + with open("attack_tree", "rb") as f: + TM.CCM.attack_tree = pickle.load(f) + nodes = TM.CCM.attack_tree.traverse_dfs() + node = nodes[5] + strconent = ''' + {'role': 'assistant', 'content': '{"action":"update_status", "node": "25端口", "status": "已完成", "vulnerability": {"name":"SMTP用户枚举漏洞","risk":"中危","info":"VRFY命令可验证有效用户"}}\n\n```bash-[目标系统->192.168.204.137->25端口]\nsmtp-user-enum -M VRFY -U /usr/share/wordlists/metasploit/unix_users.txt -t 192.168.204.137\n```\n\n```bash-[目标系统->192.168.204.137->25端口]\nnc -nv 192.168.204.137 25 << EOF\nEXPN root\nMAIL FROM: attacker@example.com\nRCPT TO: external@example.com\nDATA\nTest open relay\n.\nQUIT\nEOF\n```'} + ''' + strjson = json.loads(strconent) + node_cmds,commands = TM.CCM.LLM.fetch_instruction(strjson["content"]) + TM.CCM.tree_manager(node_cmds) + + # 更新tree + bok, new_commands = TM.CCM.tree_manager(node_cmds, node, commands, TM.DBM) + # 分析指令入对应节点 + if bok: # 节点指令若存在错误,测试指令都不处理,需要LLM重新生成 + node_list = TM.CCM.instr_in_node(new_commands, node) + + #报不保存待定-- + with open("attack_tree", 'wb') as f: + pickle.dump(TM.CCM.attack_tree, f) + + else: #完整过程测试---要设定终止条件 pass diff --git a/config.yaml b/config.yaml index 061f49e..7df3f1a 100644 --- a/config.yaml +++ b/config.yaml @@ -14,7 +14,7 @@ mysql: database: zfsafe #LLM-Type -LLM_type: 1 #0-腾讯云,1-DS,2-GPT +LLM_type: 2 #0-腾讯云,1-DS,2-2233ai,3-GPT LLM_max_chain_count: 10 #为了避免推理链过长,造成推理效果变差,应该控制一个推理链的长度上限 #用户初始密码 diff --git a/mycode/AttackMap.py b/mycode/AttackMap.py index bd38ef3..f7233ba 100644 --- a/mycode/AttackMap.py +++ b/mycode/AttackMap.py @@ -1,4 +1,7 @@ import queue +import copy +import re + #渗透测试树结构维护类 class AttackTree: def __init__(self,root_node): @@ -130,12 +133,17 @@ class AttackTree: class TreeNode: def __init__(self, name,task_id,status="未完成", vul_type="未发现"): + self.task_id = task_id #任务id self.name = name # 节点名称 self.status = status # 节点状态 self.vul_type = vul_type # 漏洞类型 + self.vul_name = "" + self.vul_grade = "" + self.vul_info = "" self.children = [] # 子节点列表 self.parent = None # 父节点引用 self.path = "" #当前节点的路径 + self.bwork = True #当前节点是否工作,默认True self.messages = [] # 针对当前节点积累的messages -- 针对不同节点提交不同的messages self.llm_type = 0 #llm提交类型 0--初始状态无任务状态,1--指令结果反馈,2--llm错误反馈 @@ -143,12 +151,58 @@ class TreeNode: self.do_sn = 0 #针对该节点instr执行次数 self.instr_queue = [] # queue.Queue() #针对当前节点的执行指令----重要约束:一个节点只能有一个线程在执行指令 self.res_quere = [] # queue.Queue() #指令执行的结果,一批一批 + #用户补充信息 + self.cookie = "" + self.ext_info = "" + + #设置用户信息 + def set_user_info(self,cookie,ext_info): + self.cookie = cookie + self.ext_info = ext_info + + def copy_messages(self,childe_node): + ''' + 子节点继承父节点的messages,目前规则保留上两层节点的message信息 + :param childe_node: + :return: + ''' + tmp_messages = copy.deepcopy(self.messages) + if not self.parent: + childe_node.messages = tmp_messages + else: + parent_path = self.parent.path + bfind = False + for msg in tmp_messages: + if msg["role"] == "system": + childe_node.messages.append(msg) + elif msg["role"] == "user": + if not bfind: + #获取user的node_path + content = msg["content"] + pattern = r"当前分支路径:(.+?)\n" + match = re.search(pattern, content) + if match: + path = match.group(1) + if parent_path in path:#当前节点的父节点路径在存入子节点messages + childe_node.messages.append(msg) + bfind = True #后续messages都保留 + else: + print("提交的用户提示词结构有问题!") + else: + childe_node.messages.append(msg) + elif msg["role"] == "assistant": + if bfind: + childe_node.messages.append(msg) + else: + print("非法的信息体类型!") def add_child(self, child_node): child_node.parent = self child_node.path = self.path + f"->{child_node.name}" #子节点的路径赋值 - child_node.messages = self.messages #传递messages #给什么时候的messages待验证#? + #child_node.messages = copy.deepcopy(self.messages) #传递messages #给什么时候的messages待验证#? + self.copy_messages(child_node) #传递messages--只保留两层 + self.children.append(child_node) def add_instr(self,instr): diff --git a/mycode/ControlCenter.py b/mycode/ControlCenter.py index 179a8b3..1384ae2 100644 --- a/mycode/ControlCenter.py +++ b/mycode/ControlCenter.py @@ -40,7 +40,11 @@ class ControlCenter: pass def get_user_init_info(self): - '''开始任务初,获取用户设定的基础信息''' + '''开始任务初,获取用户设定的基础信息,初始信息可以分为两块: + 1.提交llm的补充信息 和 保留在本地的信息(如工具补充参数等,cookie) + 2.用户可以设置全局,和指定节点(端口) + 3.补充测试节点 + ''' # ?包括是否对目标进行初始化的信息收集 return {"已知信息":"无"} @@ -82,10 +86,14 @@ class ControlCenter: '''获取该节点的llm提交数据,会清空type和res_quere''' llm_type = node.llm_type node.llm_type = 0 #回归0 - res_list = node.res_quere[:] #复制独立副本 - node.res_quere.clear() #清空待处理数据 + res_list = node.res_quere[:] #浅拷贝,复制第一层 + node.res_quere.clear() #清空待处理数据,相当于把原应用关系接触 return llm_type,res_list + def restore_one_llm_work(self,node,llm_type,res_list): + node.llm_type = llm_type + node.res_quere = res_list + #llm请求提交线程 def th_llm_worker(self):#LLM没有修改的全局变量,应该可以共用一个client ''' @@ -104,7 +112,7 @@ class ControlCenter: self.get_llm_instruction(node,th_DBM) #释放锁 - # 暂存状态--测试时使用 + # 暂存状态--测试时使用--限制条件llm工作线程只能未1个 with open("attack_tree", 'wb') as f: pickle.dump(self.attack_tree, f) @@ -137,8 +145,7 @@ class ControlCenter: # 构造本次提交的prompt ext_Prompt = f''' 上一步结果:{res_str} -任务:生成下一步渗透测试指令或判断是否完成该节点测试 -请确保生成的指令满足以下 +任务:生成下一步渗透测试指令或判断是否完成该节点测试。 ''' elif llm_type ==2: #llm返回的指令存在问题,需要再次请求返回 ext_Prompt = f''' @@ -146,25 +153,17 @@ class ControlCenter: 错误信息:{res_str} 任务:请按格式要求重新生成该节点上一次返回中生成的所有指令。 ''' - elif llm_type ==3: #已生成节点,但未生成测试指令 - ext_Prompt = f''' -反馈类型:需要继续补充信息 -缺失信息:{res_str} -任务: -1.请生成这些节点的测试指令; -2.这些节点的父节点为当前节点,请正确生成这些节点的节点路径; -3.若还有节点未能生成测试指令,必须返回未生成指令的节点列表。 -''' - elif llm_type ==4: #未生成节点列表 - ext_Prompt = f''' -反馈类型:需要继续补充信息 -缺失信息:{res_str} -任务: -1.请生成这些节点的新增节点指令,并生成对应的测试指令; -2.这些节点的父节点为当前节点,请正确生成这些节点的节点路径; -3.若节点未能全部新增,必须返回未新增的节点列表 -4.若有未生成指令的节点,必须返回未生成指令的节点列表。 -''' +# ''' +# elif llm_type ==4: #未生成节点列表 +# ext_Prompt = f''' +# 反馈类型:需要继续补充信息 +# 缺失信息:{res_str} +# 任务: +# 1.请生成这些节点的新增节点指令,并生成对应的测试指令; +# 2.这些节点的父节点为当前节点,请正确生成这些节点的节点路径; +# 3.若节点未能全部新增,必须返回未新增的节点列表 +# 4.若有未生成指令的节点,必须返回未生成指令的节点列表。 +# ''' elif llm_type ==5: ext_Prompt = f''' 反馈类型:测试指令格式错误 @@ -186,10 +185,10 @@ class ControlCenter: 2.LLM的回复开始反复时(有点难判断) ''' # 更新tree - bok = self.tree_manager(node_cmds, node,commands) + bok,new_commands = self.tree_manager(node_cmds, node,commands,DBM) # 分析指令入对应节点 if bok: #节点指令若存在错误,测试指令都不处理,需要LLM重新生成 - node_list = self.instr_in_node(commands, node) + node_list = self.instr_in_node(new_commands, node) # 插入TM的node_queue中,交TM线程处理---除了LLM在不同的请求返回针对同一节点的测试指令,正常业务不会产生两次进队列 for node in node_list: self.TM.node_queue.put(node) @@ -229,49 +228,66 @@ class ControlCenter: self.put_one_llm_work(strerror,node,2) return False - def tree_manager(self,node_cmds,node,commands): + def tree_manager(self,node_cmds,node,commands,DBM): '''更新渗透测试树 node_cmds是json-list 2025-03-22添加commands参数,用于处理LLM对同一个节点返回了测试指令,但还返回了no_instruction节点指令 ''' if not node_cmds: # or len(node_cmds)==0: 正常not判断就可以有没有节点指令 - return True + return True,commands #对节点指令进行校验 if not self.verify_node_cmds(node_cmds,node): - return False #节点指令存在问题,终止执行 - #执行节点操作 + return False,commands #节点指令存在问题,终止执行 + + #执行节点操作---先执行add_node,怕返回顺序不一直 + residue_node_cmds = [] for node_json in node_cmds: action = node_json["action"] - if action == "add_node": #新增节点 + if action == "add_node": # 新增节点 parent_node_name = node_json["parent"] # 新增节点原则上应该都是当前节点增加子节点 - if node.name == parent_node_name: + if node.name == parent_node_name or parent_node_name.endswith(node.name): status = node_json["status"] node_names = node_json["nodes"].split(',') for node_name in node_names: - #判重 + # 判重---遇到过补充未生成指令的节点时,返回了新增这些节点的指令 bfind = False for node_child in node.children: if node_child.name == node_name: bfind = True break if not bfind: - #添加节点 - new_node = TreeNode(node_name,node.task_id,status) - node.add_child(new_node) #message的传递待验证 + # 添加节点 + new_node = TreeNode(node_name, node.task_id, status) + node.add_child(new_node) # message的传递待验证 else: - self.logger.error(f"添加子节点时,遇到父节点名称不一致的,需要介入!!{node_json}") #丢弃该节点 - elif action == "update_status": + self.logger.error(f"添加子节点时,遇到父节点名称不一致的,需要介入!!{node_json}") # 丢弃该节点 + else:#其他指令添加到list + residue_node_cmds.append(node_json) + + #执行剩余的节点指令--不分先后 + for node_json in residue_node_cmds: + action = node_json["action"] + if action == "update_status": node_name = node_json["node"] status = node_json["status"] vul_type = "未发现" - if "vulnerability" in node_json: - vul_type = json.dumps(node_json["vulnerability"]) if node.name == node_name: node.status = status + if "vulnerability" in node_json: + #{\"name\":\"漏洞名称\",\"risk\":\"风险等级(低危/中危/高危)\",\"info\":\"补充信息(没有可为空)\"}}; + vul_type = json.dumps(node_json["vulnerability"],ensure_ascii=False) #json转字符串 + try: + node.name = node_json["vulnerability"]["name"] + node.vul_grade = node_json["vulnerability"]["risk"] + node.vul_info = node_json["vulnerability"]["info"] + except: + self.logger.error("漏洞信息错误") node.vul_type = vul_type else: - self.logger.error(f"遇到不是修改本节点状态的,需要介入!!{node_json}") + str_user = f"遇到不是修改本节点状态的,需要介入!!{node_json}" + self.logger.error(str_user) + self.need_user_know(str_user,node) elif action == "no_instruction": #返回的未生成指令的数据进行校验:1.要有数据;2.节点不是当前节点就是子节点 nodes = [] @@ -285,29 +301,68 @@ class ControlCenter: break if bcommand: #如果存在测试指令,则不把该节点放入补充信息llm任务 continue + #验证对应节点是否已经创建---本节点或子节点,其他节点不处理(更狠一点就是本节点都不行) if node_name == node.name: nodes.append(node_name) + # str_add = "请生成测试指令" + # self.put_one_llm_work(str_add,node,1) else: for child_node in node.children: if child_node.name == node_name: nodes.append(node_name) + # str_add = "无" + # self.put_one_llm_work(str_add, child_node, 1) break - if nodes: #找到对应的节点才返回 - str_nodes = ",".join(nodes) - str_add = {"已新增但未生成测试指令的节点":str_nodes} - # 提交一个错误反馈任务--但继续后续工作 - self.put_one_llm_work(str_add, node, 3) - self.logger.debug(f"已新增但未生成指令的节点有:{nodes}") - elif action == "no_create": + if nodes: #阻塞式,在当前节点提交补充信息,完善节点指令 -- 优势是省token + new_commands = self.get_other_instruction(nodes,DBM,node) + commands.extend(new_commands) + elif action == "no_create": #提交人工确认 nodes = node_json["nodes"] if nodes: str_add = {"未新增的节点": nodes} - # 提交一个错误反馈任务--但继续后续工作 - self.put_one_llm_work(str_add, node, 4) - self.logger.debug(f"未新增的节点有:{nodes}") + self.logger.debug(str_add) + # 提交一个继续反馈任务--继续后续工作 2025-3-25不自动处理 + # self.put_one_llm_work(str_add, node, 4) + # self.logger.debug(f"未新增的节点有:{nodes}") else: self.logger.error("****不应该执行到这!程序逻辑存在问题!") - return True + return True,commands + + #阻塞轮询补充指令 + def get_other_instruction(self,nodes,DBM,cur_node): + res_str = ','.join(nodes) + new_commands = [] + while res_str: + self.logger.debug(f"开始针对f{res_str}这些节点请求测试指令") + user_Prompt = f''' + 当前分支路径:{cur_node.path} + 当前节点信息: + - 节点名称:{cur_node.name} + - 节点状态:{cur_node.status} + - 漏洞类型:{cur_node.vul_type} + 反馈类型:需要补充信息 + 缺失信息:针对{res_str}的测试指令 + 任务: + 1.请生成这些节点的测试指令; + 2.这些节点的父节点为当前节点,请正确生成这些节点的节点路径; + 3.若还有节点未能生成测试指令,必须返回未生成指令的节点列表。 + ''' + res_str = "" + node_cmds, commands = self.LLM.get_llm_instruction(user_Prompt, DBM, cur_node) # message要更新 + #把返回的测试指令进行追加 + new_commands.extend(commands) + #判断是否还有未添加指令的节点 + for node_json in node_cmds: #正常应该只有一条no_instruction + if "no_instruction" in node_json and "nodes" in node_json: + tmp_nodes = [] + node_names = node_json["nodes"].split(',') + for node_name in node_names: + if node_name in nodes: + tmp_nodes.append(node_name) + res_str = ','.join(tmp_nodes) + break + self.logger.debug("为添加指令的节点,都已完成指令的添加!") + return new_commands def instr_in_node(self,commands,node): node_list = [] #一次返回的测试指令 @@ -334,6 +389,10 @@ class ControlCenter: # 3.独立队列处理 return node_list + #需要用户确认的信息--待完善 + def need_user_know(self,strinfo,node): + pass + #待修改 def is_user_instr(self,instr): ''' @@ -379,4 +438,6 @@ class ControlCenter: #停止llm处理线程 self.brun =False for th in self.llmth_list: - th.jion() \ No newline at end of file + th.jion() + + diff --git a/mycode/LLMManager.py b/mycode/LLMManager.py index a5fc2f6..e3d8101 100644 --- a/mycode/LLMManager.py +++ b/mycode/LLMManager.py @@ -7,12 +7,14 @@ import openai import json import threading import re +import os from openai import OpenAI +from mycode.DBManager import DBManager from myutils.MyTime import get_local_timestr from myutils.MyLogger_logger import LogHandler class LLMManager: - def __init__(self,illm_type=0): + def __init__(self,illm_type=3): self.logger = LogHandler().get_logger("LLMManager") self.api_key = None self.api_url = None @@ -30,11 +32,24 @@ class LLMManager: self.model = "deepseek-reasoner" #model=deepseek-reasoner -- R1 model=deepseek-chat --V3 # 创建会话对象 -- 一个任务的LLM必须唯一 self.client = OpenAI(api_key=self.api_key, base_url=self.api_url) + elif illm_type == 2: #2233.ai + self.api_key = "sk-J3562ad9aece8fd2855bb495bfa1a852a4e8de8a2a1IOchD" + self.api_url = "https://api.gptsapi.net/v1" + self.model = "o3-mini-2025-01-31" + self.client = OpenAI(api_key=self.api_key,base_url=self.api_url) elif illm_type ==3: #GPT - self.api_key ="" - self.api_url = "" - self.model = "" - self.client = OpenAI() + # 定义代理服务器地址 + proxy_url = "http://192.168.3.102:3128" + os.environ["HTTP_PROXY"] = proxy_url + os.environ["HTTPS_PROXY"] = proxy_url + self.api_key ="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" + self.api_url = "https://api.openai.com/v1" + self.model = "o3-mini-2025-01-31" + + openai.proxy = proxy_url + openai.api_key = self.api_key + #self.client = openai + self.client = OpenAI(api_key=self.api_key,base_url=self.api_url) #******测试使用,设置slef.message的值 def test_old_message(self,strMessage): @@ -45,62 +60,55 @@ class LLMManager: except Exception as e: print(f"错误: {str(e)}") + ''' + **决策原则** + - 根据节点类型和状态,优先执行基础测试(如端口扫描、服务扫描)。 + - 仅在发现新信息或漏洞时新增子节点。 + - 确保每个新增节点匹配测试指令。 + ''' # 初始化messages - def build_initial_prompt(self,node): + def build_initial_prompt(self,node,str_ip=""): if not node: return - '''暂时未添加到提示词 - **核心要求**: - - 每次新增节点时,必须在同一响应中为该节点提供测试指令。 - - 分批新增节点,每次响应中新增节点不超过3个,确保指令完整。 - ''' #根节点初始化message node.messages = [{"role": "system", "content":''' -你是一位渗透测试专家,基于你的专业知识和经验,负责动态控制整个渗透测试过程,根据当前测试状态和返回结果,决定下一步测试指令,推动测试前进,直至完成渗透测试。本地程序会执行你的指令进并返回结果。 +你是一位渗透测试专家,基于你的专业知识和经验,负责动态控制整个渗透测试过程,根据当前测试状态和返回结果,决定下一步测试指令,推动测试前进,直至完成渗透测试。本地程序会执行你的指令并返回结果。 **总体要求** 1.以目标系统IP为根节点,每个渗透测试点(如端口、服务、漏洞点)作为子节点,形成树型结构(测试树); 2.测试树整体数据由本地程序存储,你只需要关注当前节点的测试推进、状态更新(未完成/已完成)及完整新增子节点; 3.返回两类指令:节点指令和测试指令,以空行间隔,不要包含注释和说明; -4.若一次性新增的节点过多,无法为每个节点都匹配测试指令,请优先保障新增节点的完整性,若有未生成指令的节点,必须返回已新增但未生成指令的节点列表。如果节点无法全部新增,必须返回未新增的节点列表; -5.若无节点修改,新增,未生成指令等数据,节点指令可以为空,但测试指令必须对应已有节点; +4.若一次性新增的节点过多,无法为每个节点都匹配测试指令,请优先保障新增中高危节点的完整性,若有未生成测试指令的节点,必须返回未生成指令节点列表; +5.若无需要处理的节点数据,节点指令可以不生成,但测试指令必须对应已有节点; **决策流程** -1. 若当前节点是IP且未进行端口扫描,则执行端口扫描; +1. 若当前节点是IP且未进行端口扫描,则对当前节点执行端口扫描; 2. 若端口扫描发现开放端口,对可能存在中高危以上风险的端口新增节点并提供测试指令; 3. 若当前节点是端口且未进行服务扫描,则执行服务扫描; 4. 若服务扫描发现服务版本或漏洞,则新增漏洞测试节点并提供测试指令; -5. 若漏洞验证成功,则根据结果决定是否需要进一步测试,若需要进一步测试,则新增子节点并提供测试指令; -6. 若节点测试无新信息和测试指令,则更新状态为“已完成”。 -**测试指令生成要求** -1.明确每个测试指令的测试目标,并优先尝试最简单、最直接的办法; -2.对于复杂的测试点,使用递进逻辑组织指令:先尝试基础测试方法,根据执行结果决定是否进行更深入的测试。 +5. 若漏洞验证成功,则根据结果决定是否需要进一步测试,若需要进一步测试,则为测试内容新增子节点并提供测试指令; +6. 当当前节点执行完成所有可能的测试指令,更新状态为“已完成”。 +**测试指令生成准则** +1.明确每个测试指令的测试目标,并优先尝试最简单、最直接的办法,不要在同一个请求生成测试效果覆盖的指令; +2.使用递进逻辑组织指令:先尝试基础测试方法,根据执行结果决定是否进行更深入的测试; +3.本地的IP地址为:192.168.204.135。 **节点指令格式** -- 新增节点:{\"action\":\"add_node\", \"parent\": \"父节点\", \"nodes\": \"子节点1,子节点2\", \"status\": \"未完成\"}; -- 已新增但未生成指令的节点列表:{\"action\": \"no_instruction\", \"nodes\": \"3306端口,1000端口\"}; -- 未新增的节点列表:{\"action\": \"no_create\", \"nodes\": \"8080端口,8081端口,9000端口\"}; -- 节点完成测试未发现漏洞:{\"action\": \"update_status\", \"node\": \"21端口\", \"status\": \"已完成\"}; -- 节点完成测试发现漏洞:{\"action\": \"update_status\", \"node\": \"21端口\", \"status\": \"已完成\",\"vulnerability\": {\"name\":\"ftp匿名登录\",\"risk\":\"高\"}}; +- 新增节点:{\"action\":\"add_node\", \"parent\": \"父节点\", \"nodes\": \"节点1,节点2\", \"status\": \"未完成\"}; +- 未生成指令节点列表:{\"action\": \"no_instruction\", \"nodes\": \"节点1,节点2\"}; +- 完成测试未发现漏洞:{\"action\": \"update_status\", \"node\": \"节点\", \"status\": \"已完成\"}; +- 完成测试且发现漏洞:{\"action\": \"update_status\", \"node\": \"节点\", \"status\": \"已完成\",\"vulnerability\": {\"name\":\"漏洞名称\",\"risk\":\"风险等级(低危/中危/高危)\",\"info\":\"补充信息(没有可为空)\"}}; **测试指令格式** -- shell指令:```bash-[节点路径](.*?)```包裹,需要避免用户交互; -- python指令:```python-[节点路径](.*?)```包裹,主函数名为dynamic_fun,需包含错误处理,执行结束后必须返回一个tuple (status, output),其中status为'success'或'failure',output为补充输出信息; +- shell指令:```bash-[节点路径](.*?)```包裹,需要避免用户交互,若涉及到多步指令,请生成python代码; +- python指令:```python-[节点路径](.*?)```包裹,主函数名为dynamic_fun,需包含错误处理,必须返回一个tuple(status, output); - [节点路径]为从根节点到目标节点的完整层级描述。 +**核心要求** +- 优先保障新增中高危测试节点的完整性。 +- 指令之间必须要有一个空行。 **响应示例** -{\"action\":\"add_node\", \"parent\": \"192.168.1.100\", \"node\": \"3306端口\", \"status\": \"未完成\"} +{\"action\":\"add_node\", \"parent\": \"192.168.1.100\", \"nodes\": \"3306端口,22端口\", \"status\": \"未完成\"} ```bash-[目标系统->192.168.1.100->3306端口] mysql -u root -p 192.168.1.100 ``` - -{\"action\":\"add_node\", \"parent\": \"192.168.1.100\", \"node\": \"22端口\", \"status\": \"未完成\"} - -```python-[目标系统->192.168.1.100->22端口] -def dynamic_fun(): - try: - result = "扫描完成" - return ("success", result) - except Exception as e: - return ("failure", str(e)) -``` '''}] # 一个messages def init_data(self,task_id=0): @@ -122,11 +130,12 @@ def dynamic_fun(): #提交LLM post_time = get_local_timestr() + response = self.client.chat.completions.create( model=self.model, + reasoning_effort="high", messages = node.messages ) - #LLM返回结果处理 reasoning_content = "" content = "" @@ -143,6 +152,12 @@ def dynamic_fun(): content = response.choices[0].message # 记录llm历史信息 node.messages.append(content) + elif self.model == "o3-mini-2025-01-31": + reasoning_content = "" #gpt不返回推理内容 + content = response.choices[0].message.content + print(content) + # 记录llm历史信息 + node.messages.append({'role': 'assistant', 'content': content}) else: self.logger.error("处理到未预设的模型!") return None @@ -202,37 +217,33 @@ def dynamic_fun(): commands.append(shell_blocks[shell_index]) shell_index +=1 else:#其他的认为是节点操作指令--指令格式还存在不确定性,需要正则匹配,要求是JSON - pattern = re.compile(r'\{.*\}', re.DOTALL) #贪婪模式会匹配到最后一个},能使用嵌套的JSON + pattern = re.compile(r'\{(?:[^{}]|\{[^{}]*\})*\}') # 遍历所有匹配到的 JSON 结构 - strlines = part.strip('\n') #按行拆分,避免贪婪模式下,匹配到多行的最后一个} - for strline in strlines: - for match in pattern.findall(strline): #正常只能有一个 - try: - node_cmds.append(json.loads(match)) # 解析 JSON 并添加到列表 - except json.JSONDecodeError as e:#解析不了的不入队列 - self.logger.error(f"LLM-{part}-JSON 解析错误: {e}") #这是需不需要人为介入? + + # strlines = part.strip('\n') #按行拆分,避免贪婪模式下,匹配到多行的最后一个} + # for strline in strlines: + for match in pattern.findall(part): #正常只能有一个 + try: + node_cmds.append(json.loads(match)) # 解析 JSON 并添加到列表 + except json.JSONDecodeError as e:#解析不了的不入队列 + self.logger.error(f"LLM-{part}-JSON 解析错误: {e}") #这是需不需要人为介入? return node_cmds,commands def test_llm(self): - with open("../test", "r", encoding="utf-8") as f: - messages = json.load(f) - text = messages[-1]["content"] - list = self.fetch_instruction(text) - for itme in list: - print("***********") - print(itme) + messages = [ + {"role": "system", "content": "You are a helpful assistant."}, + {"role": "user", "content": "讲个笑话吧。"} + ] + response = self.client.chat.completions.create( + model=self.model, + reasoning_effort="medium", + messages=messages + ) + print(response) if __name__ == "__main__": - # LM = LLMManager(1) - # LM.test_llm() - tlist1 = [] - tlist2 = [] - tlist2.append(1) - if not tlist1: - print("list1空") - if not tlist2: - print("list2空") - if tlist2: - print("list2不为空") + llm = LLMManager(3) + llm.test_llm() + diff --git a/mycode/Result_merge.py b/mycode/Result_merge.py new file mode 100644 index 0000000..ce61a10 --- /dev/null +++ b/mycode/Result_merge.py @@ -0,0 +1,90 @@ +#结果合并功能函数模块 +import re + +def my_merge(fun_name,result): + if fun_name == "enum4linux": + result = enum4linux_merge(result) + else: + pass + return result + + +#--------------------enum4linux-------------------- +def enum4linux_merge(result): + print("enum4linux") + # 1.用户列表(用于密码爆破) + users = extract_users(result) + # 2. 共享目录(用于未授权访问/文件泄露) + shares = extract_shares(result) + # 3. 密码策略(指导爆破规则) + policy = extract_password_policy(result) + # 4. 操作系统信息(用于漏洞匹配) + os_info = extract_os_info(result) + # 整合输出 + result = f"Users:{users}\nShares:{shares}\nPolicy:{policy}\nOS Info:{os_info}\n" + print(result) + return result + +def extract_users(data): + """提取所有用户列表(含 RID)""" + users = {} + pattern = re.compile(r"user:\[(.*?)\] rid:\[(0x[a-fA-F0-9]+)\]") + matches = pattern.findall(data) + for user, rid in matches: + users[user] = rid + return users + +def extract_shares(data): + """提取共享目录并清理 ANSI 转义码""" + shares = [] + share_block = re.search(r"Share Enumeration.*?=\n(.*?)\n\n", data, re.DOTALL) + if share_block: + lines = share_block.group(1).split('\n') + for line in lines: + # 清理 ANSI 转义码(如 \x1b[35m) + line_clean = re.sub(r'\x1b\[[0-9;]*m', '', line) + if 'Disk' in line_clean or 'IPC' in line_clean: + parts = list(filter(None, line_clean.split())) + if len(parts) >= 3: + share = { + "name": parts[0], + "type": parts[1], + "access": "Unknown" + } + # 提取清理后的访问权限 + access_line = re.search(rf"//.*{re.escape(parts[0])}.*Mapping: (.*?) ", data) + if access_line: + access_clean = re.sub(r'\x1b\[[0-9;]*m', '', access_line.group(1)) + share["access"] = access_clean + shares.append(share) + return shares + +def extract_password_policy(data): + """提取密码策略""" + policy = {} + policy_block = re.search(r"Password Policy Information.*?=\n(.*?)\n\n", data, re.DOTALL) + if not policy_block: + return policy + + policy_text = policy_block.group(1) + # 提取最小密码长度(处理未匹配情况) + min_length_match = re.search(r"Minimum password length: (\d+)", policy_text) + policy["min_length"] = min_length_match.group(1) if min_length_match else "未知" + + # 提取密码复杂性要求 + complexity_match = re.search(r"Password Complexity: (Enabled|Disabled)", policy_text) + policy["complexity"] = complexity_match.group(1) if complexity_match else "未知" + + # 提取账户锁定阈值 + lockout_match = re.search(r"Account Lockout Threshold: (\d+|None)", policy_text) + policy["lockout_threshold"] = lockout_match.group(1) if lockout_match else "未知" + return policy + +def extract_os_info(data): + """提取操作系统信息""" + os_info = {} + match = re.search(r"server \(([^)]+)\)", data) + if match: + os_info["samba_version"] = match.group(1) + return os_info +#------------------------------------------------ diff --git a/payload/test.txt b/payload/test.txt new file mode 100644 index 0000000..ac1f9a4 --- /dev/null +++ b/payload/test.txt @@ -0,0 +1 @@ +测试文件 \ No newline at end of file diff --git a/pipfile b/pipfile index 1b5b6d3..7c486fe 100644 --- a/pipfile +++ b/pipfile @@ -14,5 +14,10 @@ pip install loguru -i https://pypi.tuna.tsinghua.edu.cn/simple/ pip install paramiko -i https://pypi.tuna.tsinghua.edu.cn/simple/ pip install impacket -i https://pypi.tuna.tsinghua.edu.cn/simple/ +sudo apt-get install libpq-dev python3-dev +pip install psycopg2 -i https://pypi.tuna.tsinghua.edu.cn/simple/ + cd /usr/share/wordlists/ -gzip -d rockyou.txt.gz \ No newline at end of file +gzip -d rockyou.txt.gz + +#searchsploit -u 更新漏洞信息 \ No newline at end of file diff --git a/test.py b/test.py index bc7d5c7..0df3139 100644 --- a/test.py +++ b/test.py @@ -3,6 +3,10 @@ import subprocess import tempfile import os import pexpect +import struct +import sys +import mysql.connector +import requests def do_worker(str_instruction): @@ -51,22 +55,29 @@ def do_worker_ftp_script(str_instruction): os.remove(output_file) return output + +import socket + + +def dynamic_fun(): + try: + host = "192.168.204.137" + port = 8009 + # 尝试建立连接 + sock = socket.create_connection((host, port), timeout=15) + # 发送一个基础的AJP协议探测包(仅用于检测响应) + payload = b'\x12\x34\x00\x02' # 示例数据包 + sock.sendall(payload) + response = sock.recv(1024) + sock.close() + if response: + return (1, "收到响应,可能存在CVE-2020-1938漏洞风险,请进一步人工验证") + else: + return (0, "无响应,暂未检测到漏洞") + except Exception as e: + return (0, "连接失败或错误: " + str(e)) + if __name__ == "__main__": # 示例使用 - str_instruction = """ -ftp -n 192.168.204.137 << EOF -user anonymous anonymous@example.com -ls -bye -EOF - """ - output = do_worker(str_instruction) - print(f"*****\n{output}\n*****") - - output = do_worker_ftp_script(str_instruction) - lines = output.splitlines() - # 跳过第一行(Script started)和最后一行(Script done) - ftp_output = lines[1:-1] - strout = '\n'.join(ftp_output) - print("111111111") - print(strout) \ No newline at end of file + bok,res = dynamic_fun() + print(bok,res) \ No newline at end of file diff --git a/tools/CurlTool.py b/tools/CurlTool.py index 291802d..5a26741 100644 --- a/tools/CurlTool.py +++ b/tools/CurlTool.py @@ -66,33 +66,6 @@ class CurlTool(ToolBase): parts.append('-i') return ' '.join(parts),timeout - # def execute_instruction(self, instruction_old): - # ''' - # 执行指令:验证合法性 -> 执行 -> 分析结果 - # :param instruction_old: - # :return: - # bool:true-正常返回给大模型,false-结果不返回给大模型 - # str:执行的指令 - # str:执行指令的结果 - # ''' - # - # # 第一步:验证指令合法性 - # instruction = self.validate_instruction(instruction_old) - # if not instruction: - # return False,instruction_old,"该指令暂不执行!" - # - # # 第二步:执行指令 --- 基于request使用 - # #print(f"执行指令:{instruction}") - # output = "" - # - # # 第三步:分析执行结果 - # analysis = self.analyze_result(output,instruction) - # #指令和结果入数据库 - # #? - # if not analysis: #analysis为“” 不提交LLM - # return False,instruction,analysis - # return True,instruction, analysis - def get_ssl_info(self,stderr,stdout): # -------------------------- # 解释信息的安全意义: @@ -148,42 +121,23 @@ class CurlTool(ToolBase): result = f"HTTP 状态行:{http_status},Content-Type:{content_type},HTML Title:{html_title},TLS 连接信息:{tls_info},证书 Common Name:{cert_cn},证书 Issuer:{issuer_info}" return result - def get_info_xpost(self,stdout,stderr): - """ - 从 subprocess.run 执行 curl 后的结果中提取关键信息: - - HTTP 状态码 - - 常见响应头(Content-Type, Content-Length) - - HTML 页面标题(如果内容为 HTML) - - 返回正文的前200字符(body_snippet) - - TLS/证书相关信息(从详细调试信息 stderr 中提取) - - 对于未匹配到的信息,返回“Not found”或空字符串。 - """ + def get_info_curl(self,instruction,stdout,stderr): info = {} - # 处理 stdout: 拆分响应头与正文(假设用空行分隔) parts = re.split(r'\r?\n\r?\n', stdout, maxsplit=1) - #***************解析方式一 - # headers_str = parts[0] if parts else "" - # body = parts[1] if len(parts) > 1 else "" - # - # # 提取 HTTP 状态码(从响应头第一行中获取,例如 "HTTP/1.1 202 OK") - # header_lines = headers_str.splitlines() - # ***************解析方式二 if len(parts) == 2: headers_str, body = parts else: # 如果没有拆分成功,可能 stdout 中只有正文,则从 stderr 尝试提取 HTTP 状态行 headers_str = "" body = stdout - - # 如果没有在 stdout 中找到头信息,则尝试从 stderr 中提取(部分信息可能在 stderr 中) + # 如果没有在 stdout 中找到头信息,则尝试从 stderr 中提取(部分信息可能在 stderr 中) if not headers_str: header_lines = stderr.splitlines() else: header_lines = headers_str.splitlines() - #************************** + #status_code if header_lines: status_line = header_lines[0] status_match = re.search(r'HTTP/\d+\.\d+\s+(\d+)', status_line) @@ -191,20 +145,24 @@ class CurlTool(ToolBase): else: info['status_code'] = "No headers found" - # 提取常见响应头 + #Server + m = re.search(r'^Server:\s*(.+)$', headers_str, re.MULTILINE) + if m: + info["server"] = m.group(1).strip() + + #content-type,content-length content_type = "Not found" content_length = "Not found" for line in header_lines: if line.lower().startswith("content-type:"): - info['content_type'] = line.split(":", 1)[1].strip() + info['content-type'] = line.split(":", 1)[1].strip() elif line.lower().startswith("content-length:"): - info['content_length'] = line.split(":", 1)[1].strip() - # 如果未匹配到,则设置默认值 - info.setdefault('content_type', "Not found") - info.setdefault('content_length', "Not found") + info['content-length'] = line.split(":", 1)[1].strip() + info.setdefault('content-type', "Not found") + info.setdefault('content-length', "Not found") # 如果内容为 HTML,则使用 BeautifulSoup 提取 标签内容 - if "html" in info['content_type'].lower(): + if "html" in info['content-type'].lower(): try: soup = BeautifulSoup(body, "html.parser") if soup.title and soup.title.string: @@ -216,25 +174,76 @@ class CurlTool(ToolBase): else: info['html_title'] = "N/A" - # 保存部分正文内容,便于后续分析 - info['body_snippet'] = body[:200] # 前500字符 + #------------正文部分解析------------ + if "phpinfo.php" in instruction: + info["configurations"] = {} + info["sensitive_info"] = {} + # 提取PHP版本信息,可以尝试从phpinfo表格中提取 + m = re.search(r'PHP Version\s*</th>\s*<td[^>]*>\s*([\d.]+)\s*</td>', body, re.IGNORECASE) + if m: + info["php_version"] = m.group(1).strip() + else: + # 备用方案:在页面中查找 "PHP Version" 后面的数字 + m = re.search(r'PHP\s*Version\s*([\d.]+)', body, re.IGNORECASE) + if m: + info["php_version"] = m.group(1).strip() + + # 提取配置信息(如allow_url_include, display_errors, file_uploads, open_basedir) + configs = ["allow_url_include", "display_errors", "file_uploads", "open_basedir"] + for key in configs: + # 尝试匹配HTML表格形式:<td>key</td><td>value</td> + regex = re.compile(r'<td[^>]*>\s*' + re.escape(key) + r'\s*</td>\s*<td[^>]*>\s*([^<]+?)\s*</td>', + re.IGNORECASE) + m = regex.search(body) + if m: + info["configurations"][key] = m.group(1).strip() - # 处理 stderr 中的 TLS/证书信息:只提取包含关键字的行 - tls_info_lines = [] - cert_info_lines = [] - for line in stderr.splitlines(): - # 过滤与 TLS/SSL 握手、证书相关的信息 - if "SSL connection using" in line or "TLS" in line: - tls_info_lines.append(line.strip()) - if "certificate" in line.lower(): - cert_info_lines.append(line.strip()) - info['tls_info'] = tls_info_lines if tls_info_lines else "Not found" - info['certificate_info'] = cert_info_lines if cert_info_lines else "Not found" + # 提取敏感信息,这里以MYSQL_PASSWORD为例 + sensitive_keys = ["MYSQL_PASSWORD"] + for key in sensitive_keys: + regex = re.compile(r'<td[^>]*>\s*' + re.escape(key) + r'\s*</td>\s*<td[^>]*>\s*([^<]+?)\s*</td>', + re.IGNORECASE) + m = regex.search(body) + if m: + info["sensitive_info"][key] = m.group(1).strip() + elif "phpMyAdmin" in instruction: + info["security_info"] = {} + info["login_info"] = {} + # 查找登录表单中用户名、密码字段(例如 name="pma_username" 和 name="pma_password") + m = re.search(r'<input[^>]+name=["\'](pma_username)["\']', body, re.IGNORECASE) + if m: + info["login_info"]["username_field"] = m.group(1).strip() + m = re.search(r'<input[^>]+name=["\'](pma_password)["\']', body, re.IGNORECASE) + if m: + info["login_info"]["password_field"] = m.group(1).strip() + #安全信息 + # csrf_protection:尝试查找隐藏域中是否存在 csrf token(例如 name="csrf_token" 或 "token") + m = re.search(r'<input[^>]+name=["\'](csrf_token|token)["\']', stdout, re.IGNORECASE) + info["security_info"]["csrf_protection"] = True if m else False + # httponly_cookie:从响应头中查找 Set-Cookie 行中是否包含 HttpOnly + m = re.search(r'Set-Cookie:.*HttpOnly', stdout, re.IGNORECASE) + info["security_info"]["httponly_cookie"] = True if m else False + # secure_cookie:从响应头中查找 Set-Cookie 行中是否包含 Secure + m = re.search(r'Set-Cookie:.*Secure', stdout, re.IGNORECASE) + info["security_info"]["secure_cookie"] = True if m else False + else: # + #info['body_snippet'] = body[:200] # 前500字符 + if stderr: + # 处理 stderr 中的 TLS/证书信息:只提取包含关键字的行 + tls_info_lines = [] + cert_info_lines = [] + for line in stderr.splitlines(): + # 过滤与 TLS/SSL 握手、证书相关的信息 + if "SSL connection using" in line or "TLS" in line: + tls_info_lines.append(line.strip()) + if "certificate" in line.lower(): + cert_info_lines.append(line.strip()) + info['tls_info'] = tls_info_lines if tls_info_lines else "Not found" + info['certificate_info'] = cert_info_lines if cert_info_lines else "Not found" - # 可选:保留完整的 verbose 信息以便后续分析 - #info['verbose'] = stderr #转换成字符串 result = json.dumps(info,ensure_ascii=False) + #print(result) return result def analyze_result(self, result,instruction,stderr,stdout): @@ -268,11 +277,11 @@ class CurlTool(ToolBase): elif("-kv https://" in instruction or "-vk https://" in instruction): result = self.get_ssl_info(stderr,stdout) elif("-X POST " in instruction): - result = self.get_info_xpost(stdout,stderr) + result = self.get_info_curl(instruction,stdout,stderr) elif("-v " in instruction): #curl -v http://192.168.204.137:8180/manager/html --user admin:admin 常规解析curl返回内容 - result = self.get_info_xpost(stdout,stderr) - else: #非处理命令的结果,暂时不提交LLM - result ="" + result = self.get_info_curl(instruction,stdout,stderr) + else: + result = self.get_info_curl(instruction,stdout,stderr) return result if __name__ =="__main__": diff --git a/tools/DirbTool.py b/tools/DirbTool.py new file mode 100644 index 0000000..ee2cb31 --- /dev/null +++ b/tools/DirbTool.py @@ -0,0 +1,17 @@ +import subprocess +import tempfile +import os +from tools.ToolBase import ToolBase + +class DirbTool(ToolBase): + def validate_instruction(self, instruction): + #指令过滤 + timeout = 0 + instruction = instruction.strip() + if " -o" not in instruction: + instruction += " -o dirout.txt" + return instruction,timeout + + def analyze_result(self, result,instruction,stderr,stdout): + #指令结果分析 + return result \ No newline at end of file diff --git a/tools/EchoTool.py b/tools/EchoTool.py index b218e08..476d9af 100644 --- a/tools/EchoTool.py +++ b/tools/EchoTool.py @@ -16,6 +16,6 @@ class EchoTool(ToolBase): else: result ="不存在安全问题" else:#未预处理的情况,暂时不返回LLM - result = "" + pass return result \ No newline at end of file diff --git a/tools/Enum4linuxTool.py b/tools/Enum4linuxTool.py new file mode 100644 index 0000000..250e1d5 --- /dev/null +++ b/tools/Enum4linuxTool.py @@ -0,0 +1,13 @@ +from tools.ToolBase import ToolBase +from mycode.Result_merge import my_merge + +class Enum4linuxTool(ToolBase): + def validate_instruction(self, instruction): + #指令过滤 + timeout = 0 + return instruction,timeout + + def analyze_result(self, result,instruction,stderr,stdout): + #指令结果分析 + result = my_merge("enum4linux", result) + return result \ No newline at end of file diff --git a/tools/FtpTool.py b/tools/FtpTool.py index fb0bea5..f4aff61 100644 --- a/tools/FtpTool.py +++ b/tools/FtpTool.py @@ -46,8 +46,10 @@ class FtpTool(ToolBase): def validate_instruction(self, instruction): timeout = 30 #modified_code = "ftp匿名登录测试" - - return instruction,timeout + #若有put文件,则替换为payload文件 + new_file = "payload/test.txt" + new_instr = re.sub(r'(put\s+)\S+', r'\1' + new_file, instruction) + return new_instr,timeout def do_worker_subprocess(self,str_instruction,timeout,ext_params): output = "" @@ -123,12 +125,12 @@ class FtpTool(ToolBase): # 过滤修改后的指令是否需要判重?同样指令再执行结果一致?待定---#? # 第二步:执行指令---需要对ftp指令进行区分判断 - pattern = re.compile(r'ftp\s+-n\s+\S+\s+<< EOF') + pattern = re.compile(r'ftp\s+-n\s+\S+(\s+\d+)?\s+<<\s*EOF') match = pattern.search(instruction) if bool(match): #如果是 ftp -n 192.168.204.137 <<EOF 开头 - output = self.do_worker_subprocess(instruction,time_out,ext_params) - if not output: - output = self.do_worker_script(instruction,time_out,ext_params) + # output = self.do_worker_subprocess(instruction,time_out,ext_params) + # if not output: + output = self.do_worker_script(instruction,time_out,ext_params) else: #最后使用ftp匿名登陆验证代码 target = "" diff --git a/tools/HydraTool.py b/tools/HydraTool.py index 3f56a9e..2ec81d1 100644 --- a/tools/HydraTool.py +++ b/tools/HydraTool.py @@ -1,6 +1,7 @@ import os import shlex import re +from collections import OrderedDict from tools.ToolBase import ToolBase class HydraTool(ToolBase): @@ -13,22 +14,63 @@ class HydraTool(ToolBase): if match_p: str_p = match_p.group(1) #判断文件是否存在 - if not os.path.exists(str_p): #文件不存在要替换 - new_pass_path = os.path.join(current_path, "../payload", "passwords") - instruction = instruction.replace(str_p,new_pass_path) + #if not os.path.exists(str_p): #文件不存在要替换 + new_pass_path = os.path.join(current_path, "../payload", "passwords") + instruction = instruction.replace(str_p,new_pass_path) if match_l: str_l = match_l.group(1) #判断文件是否存在 - if not os.path.exists(str_l): - new_user_path = os.path.join(current_path, "../payload", "users") - instruction = instruction.replace(str_l, new_user_path) + #if not os.path.exists(str_l): + new_user_path = os.path.join(current_path, "../payload", "users") + instruction = instruction.replace(str_l, new_user_path) + #不是双字典的情况加-f if "-l" in instruction or "-p" in instruction: if "-f" not in instruction: - instruction = instruction + " -f" #当是单密码,或单用户名时,使用成功即停止模式 + instruction = instruction.strip() + " -f" #当是单密码,或单用户名时,使用成功即停止模式 + + #取消-v -V + instruction = instruction.replace(" -V "," ") + instruction = instruction.replace(" -v "," ") + instruction = instruction.replace(" -vV","") + #加-o 存在个不确定项是:若没有匹配到,输出文件里面是只有一行执行的命令,没有结果描述 + if " -o" not in instruction: + instruction = instruction + " -o hydra_result.txt" + # # 加 -q + # if " -q" not in instruction: + # instruction = instruction + " -q" return instruction,timeout + def merge_info(self,result): + try: + # 按行分割输出,保留非空行 + lines = [line.strip() for line in result.splitlines() if line.strip() != ""] + # 使用有序字典统计相同行的出现次数,保持原始顺序 + counts = OrderedDict() + for line in lines: + if line in counts: + counts[line] += 1 + else: + counts[line] = 1 + # 生成整合后的输出,重复的行后面跟上*次数标记 + output_lines = [] + for line, count in counts.items(): + if count > 1: + output_lines.append(f"{line} *{count}") + else: + output_lines.append(line) + consolidated = "\n".join(output_lines) + return consolidated + except Exception as e: + return result + def analyze_result(self, result,instruction,stderr,stdout): #返回结果 + # result = self.merge_info(result) + # print(result) + #加文件后缀了 + lines = result.splitlines() + if len(lines) == 1: + result = "没有匹配到成功的结果" return result \ No newline at end of file diff --git a/tools/MysqlTool.py b/tools/MysqlTool.py index ee06ce3..49c73b8 100644 --- a/tools/MysqlTool.py +++ b/tools/MysqlTool.py @@ -1,5 +1,6 @@ #mysql #pip install mysql-connector-python +import subprocess import mysql.connector from mysql.connector import Error from tools.ToolBase import ToolBase @@ -31,33 +32,61 @@ class MysqlTool(ToolBase): return res def validate_instruction(self, instruction): - #mysql暂只执行空密码攻击 - timeout = 0 - modified_code = "mysql空密码登录测试" - return modified_code,0 + timeout = 30 + #modified_code = "mysql空密码登录测试" + instr = instruction.replace("--ssl-mode=DISABLED","--ssl=0") #mariaDB 没有ssl-mode参数 + # if "--ssl=0" not in instr: + # instr = instr + " --ssl=0" + return instr,timeout - #对于非sh命令调用的工具,自己实现命令执行的内容 - def execute_instruction(self, instruction_old): + #对于非sh命令调用的工具,自己实现命令执行的内容 --#2025-3-24暂时不使用 + def execute_instruction_old(self, instruction_old): ext_params = self.create_extparams() # 第一步:验证指令合法性 - instruction,time_out = self.validate_instruction(instruction_old) + instruction,timeout = self.validate_instruction(instruction_old) if not instruction: return False, instruction_old, "该指令暂不执行!","",ext_params # 过滤修改后的指令是否需要判重?同样指令再执行结果一致?待定---#? # 第二步:执行指令 - target = "" - parts = instruction_old.split() - for i, part in enumerate(parts): - if part == "-h" and i + 1 < len(parts): - target = parts[i + 1] - output = self.test_empty_password_mysql_connection(target)#弱密码攻击如何处理? + # target = "" + # parts = instruction_old.split() + # for i, part in enumerate(parts): + # if part == "-h" and i + 1 < len(parts): + # target = parts[i + 1] + # output = self.test_empty_password_mysql_connection(target)#弱密码攻击如何处理? + + output = "" + stdout = "" + stderr = "" + try: + if timeout == 0: + result = subprocess.run(instruction, shell=True, capture_output=True, text=True) + elif timeout > 0: + result = subprocess.run(instruction, shell=True, capture_output=True, text=True, timeout=timeout) + else: + print("timeout参数错误,需要自查程序逻辑!") + stderr = result.stderr + stdout = result.stdout + except subprocess.TimeoutExpired as e: + stdout = e.stdout if e.stdout is not None else "" + stderr = e.stderr if e.stderr is not None else "" + ext_params.is_user = True # 对于超时的也需要人工进行确认,是否是预期的超时 + except Exception as e: + ext_params.is_user = True + return False, instruction, f"执行失败:{str(e)}", "", ext_params # 执行失败,提交给人工确认指令的正确性 # 第三步:分析执行结果 - analysis = self.analyze_result(output,instruction,"","") - # 指令和结果入数据库 - # ? - return True, instruction, analysis,output,ext_params + output = stdout + if stderr: + output += stderr + if isinstance(output, bytes): # 若是bytes则转成str + output = output.decode('utf-8', errors='ignore') + analysis = self.analyze_result(output, instruction, stderr, stdout) + if not analysis: # analysis为“” 不提交LLM + ext_params.is_user = True + return False, instruction, analysis, output, ext_params + return True, instruction, analysis, output, ext_params def analyze_result(self, result,instruction,stderr,stdout): # diff --git a/tools/NcTool.py b/tools/NcTool.py index beb1709..0b69d32 100644 --- a/tools/NcTool.py +++ b/tools/NcTool.py @@ -2,7 +2,7 @@ from tools.ToolBase import ToolBase class NcTool(ToolBase): def validate_instruction(self, instruction): - timeout = 30 + timeout = 60 #指令过滤 if "<<<" in instruction: instruction = f"bash -c \"{instruction}\"" diff --git a/tools/NiktoTool.py b/tools/NiktoTool.py index 4591154..3b045b3 100644 --- a/tools/NiktoTool.py +++ b/tools/NiktoTool.py @@ -14,9 +14,71 @@ class NiktoTool(ToolBase): # 使用正则表达式匹配 -ssl 参数及其相邻的空格 cleaned_command = re.sub(r'\s+-ssl\b|\b-ssl\b', '', instruction, flags=re.IGNORECASE) # 处理可能残留的多余空格 - return re.sub(r'\s+', ' ', cleaned_command).strip(),timeout + command = re.sub(r'\s+', ' ', cleaned_command).strip() + command = command.replace("-p80","-p 80") + return command,timeout def analyze_result(self, result,instruction,stderr,stdout): # 检查结果 + if stderr: + result = stderr + else: + result = self.parse_nikto_full_info(result) + return result + + def parse_nikto_full_info(self,nikto_output: str) -> dict: + """ + 从 Nikto 扫描结果中提取尽可能多的漏洞信息和对后续渗透测试有用的信息。 + + 解析思路: + 1. 通过正则表达式提取以“+”开头的行(Nikto 输出通常以“+”开头)。 + 2. 针对常见漏洞格式(如 OSVDB-xxxx)进行匹配。 + 3. 针对可能包含“vulnerability”、“warning”、“potential”等关键字的行进行提取。 + 4. 针对配置或其他信息(如 "Allowed HTTP Methods", "Cookie", "Directory indexing" 等)也进行匹配。 + 5. 对于未知格式,尽量保留原始提示,以供后续人工分析。 + + 返回的字典包含以下键: + - vulnerabilities: 漏洞相关信息(如 OSVDB 编号、潜在漏洞提示等) + - configuration_issues: 与服务器配置、HTTP 方法、Cookie、安全策略等相关的提示 + - misc_info: 其他可能对渗透测试有帮助的信息 + - error: 如解析过程中发生异常,则记录异常信息 + """ + result = { + "vulnerabilities": [], + "configuration_issues": [], + "misc_info": [] + } + + try: + # 获取所有以 + 开头的行 + plus_lines = re.findall(r'^\+\s*(.+)$', nikto_output, re.MULTILINE) + + # 定义一些常见关键词,出现这些关键词的行我们认为是漏洞或安全问题 + vuln_keywords = [ + "OSVDB", "vulnerab", "potential", "insecure", "misconfig", "expos", + "Directory indexing", "Outdated", "Deprecated", "SSL", "Cookie", "error" + ] + config_keywords = [ + "Allowed HTTP Methods", "Server:", "Banner", "HTTP Options", "Headers", "trace", "PUT", "DELETE" + ] + + for line in plus_lines: + # 先将行统一为小写做匹配,但保留原始行内容 + low_line = line.lower() + + # 判断是否属于漏洞信息 + if re.search(r'osvdb-\d+', line): + result["vulnerabilities"].append(line.strip()) + elif any(kw.lower() in low_line for kw in vuln_keywords): + result["vulnerabilities"].append(line.strip()) + # 判断是否属于配置问题(例如HTTP方法、Cookie安全、服务器banner异常等) + elif any(kw.lower() in low_line for kw in config_keywords): + result["configuration_issues"].append(line.strip()) + else: + # 其他信息,可能对后续渗透测试有启示 + result["misc_info"].append(line.strip()) + except Exception as e: + result["error"] = f"解析过程中出现异常: {str(e)}" + return result \ No newline at end of file diff --git a/tools/PsqlTool.py b/tools/PsqlTool.py new file mode 100644 index 0000000..15107d2 --- /dev/null +++ b/tools/PsqlTool.py @@ -0,0 +1,84 @@ +import subprocess +import tempfile +import os +from tools.ToolBase import ToolBase + +class PsqlTool(ToolBase): + def validate_instruction(self, instruction): + #指令过滤 + timeout = 60 + return instruction,timeout + + def analyze_result(self, result,instruction,stderr,stdout): + #指令结果分析 + return result + + def do_worker_script(self,str_instruction,timeout,ext_params): + # 创建临时文件保存输出 + with tempfile.NamedTemporaryFile(delete=False) as tmpfile: + output_file = tmpfile.name + + # 构建并执行 script 命令 + script_cmd = f"script -c '{str_instruction}' {output_file}" + try: + result = subprocess.run(script_cmd, shell=True, text=True,timeout=timeout) + # 读取输出文件内容 + with open(output_file, 'r') as f: + output = f.read() + lines = output.splitlines() + # 跳过第一行(Script started)和最后一行(Script done) + ftp_output = lines[1:-1] + output = '\n'.join(ftp_output) + except subprocess.TimeoutExpired: + output = "命令超时返回" + try: + with open(output_file, 'r') as f: + partial_output = f.read() + if partial_output: + output += f"\n部分输出:\n{partial_output}" + except FileNotFoundError: + pass # 文件可能未创建 + except subprocess.CalledProcessError as e: + output = f"错误: {e}" + finally: + # 删除临时文件 + try: + os.remove(output_file) + except FileNotFoundError: + pass # 文件可能未创建 + return output + + def execute_instruction1(self, instruction_old): + ''' + 执行指令:验证合法性 -> 执行 -> 分析结果 + *****如果指令要做验证,只做白名单,所有逻辑不是全放开就是白名单***** + :param instruction_old: + :return: + bool:true-正常返回给大模型处理下一步,false-结果不返回给大模型,2--需要人工确认的指令 + str:执行的指令 + str:执行指令的结果-解析过滤后的结果--也是提交给LLM的结果 + str:执行指令的结果-原结果 + object:补充参数-封装一个对象: 0-不知是否攻击成功,1-明确存在漏洞,2-明确不存在漏洞 + ''' + + ext_params = self.create_extparams() + # 第一步:验证指令合法性 + instruction,timeout = self.validate_instruction(instruction_old) + if not instruction: + ext_params.is_user= True + return False,instruction_old,"该指令暂不执行!由用户确认是否要兼容支持","",ext_params #未 + #过滤修改后的指令是否需要判重?同样指令再执行结果一致?待定---#? + + # 第二步:执行指令 + output = self.do_worker_script(instruction,timeout,ext_params) + + # 第三步:分析执行结果 + if isinstance(output,bytes):#若是bytes则转成str + output = output.decode('utf-8', errors='ignore') + + analysis = self.analyze_result(output,instruction,"","") + + if not analysis: #analysis为“” 不提交LLM + ext_params.is_user = True + return False,instruction,analysis,output,ext_params + return True,instruction, analysis,output,ext_params diff --git a/tools/PythoncodeTool.py b/tools/PythoncodeTool.py index a6384f1..91ed859 100644 --- a/tools/PythoncodeTool.py +++ b/tools/PythoncodeTool.py @@ -6,7 +6,14 @@ import builtins import re import paramiko import impacket +import psycopg2 +import socket +import struct +import sys +import requests +import mysql.connector from tools.ToolBase import ToolBase +from mycode.Result_merge import my_merge class PythoncodeTool(ToolBase): @@ -56,6 +63,7 @@ class PythoncodeTool(ToolBase): # 定义允许的内置函数集合 --白名单 allowed_builtins = { + '__name__': __name__, '__import__': builtins.__import__, "abs": abs, "all": all, @@ -96,7 +104,14 @@ class PythoncodeTool(ToolBase): 'json':json, 're':re, 'paramiko':paramiko, - 'impacket':impacket} + 'impacket':impacket, + 'psycopg2':psycopg2, + 'socket':socket, + 'mysql':mysql, + 'mysql.connector':mysql.connector, + 'struct':struct, + 'sys':sys, + 'requests':requests} safe_locals = {} #不需要预设局部参数 # 在限制环境中执行代码 exec(instruction, safe_globals,safe_locals) @@ -107,7 +122,7 @@ class PythoncodeTool(ToolBase): return True,instruction,analysis,analysis,ext_params # Get the function and call it dynamic_fun = safe_locals['dynamic_fun'] - status, tmpout = dynamic_fun() + status, tmpout = dynamic_fun() #LLM存在status定义错误的情况(执行成功,却返回的是False) #重点要处理 output = f"status:{status},output:{tmpout}" except Exception as e: analysis = f"执行动态代码时出错: {str(e)}" @@ -124,7 +139,9 @@ class PythoncodeTool(ToolBase): return True, instruction, analysis,"",ext_params def analyze_result(self, result,instruction,stderr,stdout): - #指令结果分析 + #指令结果分析 --- 要不要限定一个max_len? + if "enum4linux " in instruction: #存在指令包装成Python代码返回的情况 + result = my_merge("enum4linux",result) return result if __name__ == "__main__": diff --git a/tools/SearchsploitTool.py b/tools/SearchsploitTool.py index 7fa25a4..bc628f1 100644 --- a/tools/SearchsploitTool.py +++ b/tools/SearchsploitTool.py @@ -103,9 +103,9 @@ class SearchsploitTool(ToolBase): instruction = f"searchsploit -m {filepath}" try: subprocess.run(instruction, shell=True, check=True,timeout=60) - print("命令执行成功,文件下载完成。") + #print("命令执行成功,文件下载完成。") except subprocess.CalledProcessError as e: #超时暂不处理--后续补充 - print(f"命令执行失败: {e}") + #print(f"命令执行失败: {e}") return False if not os.path.exists(filename): @@ -113,10 +113,10 @@ class SearchsploitTool(ToolBase): # 移动文件到目标目录 try: shutil.move(filename, os.path.join(dirpath, filename)) - print(f"文件已成功移动到 {dirpath}") + #print(f"文件已成功移动到 {dirpath}") return True except Exception as e: - print(f"移动文件失败: {e}") + #print(f"移动文件失败: {e}") return False else: #暂时只利用python脚本 return False diff --git a/tools/ShowmountTool.py b/tools/ShowmountTool.py new file mode 100644 index 0000000..632af75 --- /dev/null +++ b/tools/ShowmountTool.py @@ -0,0 +1,11 @@ +from tools.ToolBase import ToolBase + +class ShowmountTool(ToolBase): + def validate_instruction(self, instruction): + #指令过滤 + timeout = 0 + return instruction,timeout + + def analyze_result(self, result,instruction,stderr,stdout): + #指令结果分析 + return result \ No newline at end of file diff --git a/tools/SmtpuserenumTool.py b/tools/SmtpuserenumTool.py index 6dd3e2e..37bf6cc 100644 --- a/tools/SmtpuserenumTool.py +++ b/tools/SmtpuserenumTool.py @@ -1,29 +1,25 @@ import os import shlex +import re from tools.ToolBase import ToolBase class SmtpuserenumTool(ToolBase): def validate_instruction(self, instruction): timeout = 0 - # 分割指令为参数列表 - cmd_parts = shlex.split(instruction) - new_cmd = [] # 获取当前程序所在目录 current_path = os.path.dirname(os.path.realpath(__file__)) new_user_path = os.path.join(current_path, "../payload", "users") - i = 0 - while i < len(cmd_parts): - part = cmd_parts[i] - new_cmd.append(part) - # 检测到-P参数 - if part == "-U" and i + 1 < len(cmd_parts): # 用户名 - # 替换下一参数为指定路径 - new_cmd.append(new_user_path) - i += 1 # 跳过原路径参数 - i += 1 - return " ".join(shlex.quote(p) for p in new_cmd),timeout + match = re.search(r'-U\s+(\S+)', instruction) + if match: + file_path = match.group(1) + # 检查该文件是否存在 + if not os.path.isfile(file_path): + # 替换原文件路径为新的路径 + instruction = instruction.replace(file_path, new_user_path) + + return instruction,timeout def analyze_result(self, result,instruction,stderr,stdout): #指令结果分析 diff --git a/tools/SwaksTool.py b/tools/SwaksTool.py new file mode 100644 index 0000000..1e35861 --- /dev/null +++ b/tools/SwaksTool.py @@ -0,0 +1,11 @@ +from tools.ToolBase import ToolBase + +class SwaksTool(ToolBase): + def validate_instruction(self, instruction): + #指令过滤 + timeout = 0 + return instruction,timeout + + def analyze_result(self, result,instruction,stderr,stdout): + #指令结果分析 + return result \ No newline at end of file diff --git a/tools/TelnetTool.py b/tools/TelnetTool.py index ec5f0f9..38ee3b1 100644 --- a/tools/TelnetTool.py +++ b/tools/TelnetTool.py @@ -6,7 +6,7 @@ MAIL FROM: <admin@haitutech.cn> RCPT TO: <target@example.com> NOTIFY=SUCCESS,FAILURE ''' import socket - +import subprocess from tools.ToolBase import ToolBase class TelnetTool(ToolBase): @@ -19,10 +19,9 @@ class TelnetTool(ToolBase): #指令结果分析 return result - def smtp_injection_test(self,cmd_str: str): + def smtp_injection_test(self,cmd_str: str,host,port): """ 测试 SMTP 命令注入漏洞。 - 参数: cmd_str: 多行字符串,其中第一行为类似 "telnet haitutech.cn 25" 后续行为具体的 SMTP 命令,例如: @@ -35,67 +34,42 @@ class TelnetTool(ToolBase): 如果 RCPT 命令响应以 250 开头,认为可能存在漏洞(返回 True), 否则认为安全或不支持该注入(返回 False)。 """ + last_response = "" lines = cmd_str.strip().splitlines() - if not lines: - print("未提供命令") - return "" - - # 解析第一行,格式应为:telnet host port - parts = lines[0].strip().split() - if len(parts) < 3: - print("第一行格式错误,预期格式: 'telnet host port'") - return "" + try: + # 建立 TCP 连接 + s = socket.create_connection((host, port), timeout=10) + except Exception as e: + #print("连接失败:", e) + return f"连接失败:{e}" - host = parts[1] + # 读取 SMTP Banner try: - port = int(parts[2]) - except ValueError: - print("端口转换失败") - return "" + banner = s.recv(1024).decode('utf-8', errors='ignore') + except Exception as e: + #print("读取 Banner 失败:", e) + s.close() + return f"读取 Banner 失败:{e}" - last_response = "" - if port == 25: + # 从第二行开始发送 SMTP 命令 + for line in lines[1:]: + cmd = line.strip() + if not cmd: + continue try: - # 建立 TCP 连接 - s = socket.create_connection((host, port), timeout=10) + s.send((cmd + "\r\n").encode()) except Exception as e: - #print("连接失败:", e) - return f"连接失败:{e}" - - # 读取 SMTP Banner + s.close() + return f"发送命令失败:{e}" try: - banner = s.recv(1024).decode('utf-8', errors='ignore') - print("Banner:", banner.strip()) + response = s.recv(1024).decode('utf-8', errors='ignore') + #print("收到响应:", response.strip()) + last_response = response.strip() except Exception as e: - #print("读取 Banner 失败:", e) + #print("读取响应失败:", e) s.close() - return f"读取 Banner 失败:{e}" - - # 从第二行开始发送 SMTP 命令 - for line in lines[1:]: - cmd = line.strip() - if not cmd: - continue - print("发送命令:", cmd) - try: - s.send((cmd + "\r\n").encode()) - except Exception as e: - print("发送命令失败:", e) - s.close() - return "" - - try: - response = s.recv(1024).decode('utf-8', errors='ignore') - print("收到响应:", response.strip()) - last_response = response.strip() - except Exception as e: - print("读取响应失败:", e) - s.close() - return f"读取响应失败:{e}" - s.close() - else: - return "" - + return f"读取响应失败:{e}" + s.close() # 以 RCPT 命令的响应作为判断依据 # 注意:对于支持 DSN 的服务器,250 可能是合法的响应, # 但在不支持的情况下,若返回 250 则可能说明命令注入导致了不正常的处理。 @@ -115,19 +89,59 @@ class TelnetTool(ToolBase): ''' ext_params = self.create_extparams() # 第一步:验证指令合法性 - instruction,time_out = self.validate_instruction(instruction_old) + instruction,timeout = self.validate_instruction(instruction_old) if not instruction: return False,instruction_old,"该指令暂不执行!","",ext_params #过滤修改后的指令是否需要判重?同样指令再执行结果一致?待定---#? # 第二步:执行指令 - output = self.smtp_injection_test(instruction) + lines = instruction.strip().splitlines() + # 解析第一行,格式应为:telnet host port + parts = lines[0].strip().split() + if len(parts) < 3: + output = "第一行格式错误,预期格式: 'telnet host port'" + ext_params["is_user"] = True + return True,instruction,output,output,ext_params + host = parts[1] + try: + port = int(parts[2]) + except ValueError: + output = "端口转换失败" + ext_params["is_user"] = True + return True, instruction, output, output, ext_params + if port == 25: + output = self.smtp_injection_test(instruction,host,port) + else:#其他默认subprocess执行 + try: + # 指令过滤 + if "<<<" in instruction: + instruction = f"bash -c \"{instruction}\"" + + if timeout == 0: + result = subprocess.run(instruction, shell=True, capture_output=True, text=True) + elif timeout > 0: + result = subprocess.run(instruction, shell=True, capture_output=True, text=True, timeout=timeout) + else: + print("timeout参数错误,需要自查程序逻辑!") + stderr = result.stderr + stdout = result.stdout + except subprocess.TimeoutExpired as e: + stdout = e.stdout if e.stdout is not None else "" + stderr = e.stderr if e.stderr is not None else "" + ext_params.is_user = True # 对于超时的也需要人工进行确认,是否是预期的超时 + except Exception as e: + ext_params.is_user = True + return False, instruction, f"执行失败:{str(e)}", "", ext_params # 执行失败,提交给人工确认指令的正确性 + output = stdout + if stderr: + output += stderr + if isinstance(output, bytes): # 若是bytes则转成str + output = output.decode('utf-8', errors='ignore') # 第三步:分析执行结果 analysis = self.analyze_result(output,instruction,"","") - #指令和结果入数据库 - #? if not analysis: #analysis为“” 不提交LLM + ext_params.is_user = True return False,instruction,analysis,output,ext_params return True,instruction, analysis,output,ext_params diff --git a/tools/ToolBase.py b/tools/ToolBase.py index 481fdfd..f8708c6 100644 --- a/tools/ToolBase.py +++ b/tools/ToolBase.py @@ -9,6 +9,7 @@ import abc import subprocess import argparse +import shlex import sys from myutils.ReturnParams import ReturnParams @@ -26,26 +27,62 @@ class ToolBase(abc.ABC): return ext_params def parse_sublist3r_command(self,command): - parser = argparse.ArgumentParser(add_help=False) #创建命令行参数解析器对象‌ - parser.add_argument("-o ", "--output", type=str) #添加需要解析的参数规则‌ - parser.add_argument("-oN ", "--Noutput", type=str) #nmap - parser.add_argument("-oG ","--NMG",type=str) #nmap - parser.add_argument("-output ", "--nikto", type=str) #nikto + parser = argparse.ArgumentParser(add_help=False,allow_abbrev=False) #创建命令行参数解析器对象‌ + parser.add_argument("-o", "--output", type=str) #添加需要解析的参数规则‌ + parser.add_argument("-oN", "--Noutput", type=str) #nmap + parser.add_argument("-oG","--NMG",type=str) #nmap + parser.add_argument("-output", "--nikto", type=str) #nikto args, _ = parser.parse_known_args(command.split()[1:]) return args + def parse_output_params(self,command, valid_flags=None): + """ + 解析 shell 命令中用于输出文件的参数。 + 只检查完全匹配的 token,忽略诸如 "-oKexAlgorithms" 这样的参数。 + + :param command: 完整的命令字符串 + :param valid_flags: 合法的输出参数列表,默认支持 -o, -oN, -oG, -output + :return: dict,键为输出参数,值为对应的文件路径(如果有的话) + """ + if valid_flags is None: + valid_flags = ['-o', '-oN', '-oG', '-output'] + + tokens = shlex.split(command) + output_params = {} + output_file = "" + i = 0 + while i < len(tokens): + token = tokens[i] + # 如果 token 完全匹配有效的参数,则认为它是输出参数 + if token in valid_flags: + # 如果紧跟着有值,则把它作为输出文件路径,否则为 None + if i + 1 < len(tokens): + #output_params[token] = tokens[i + 1] + #i += 2 + output_file = tokens[i+1] + # else: + # output_params[token] = None + # i += 1 + break # 一般只有一个输出参数 + else: + i += 1 + return output_file + def read_output_file(self,filename): """ 读取 -o 参数指定的文件内容 """ + content = "" try: with open(filename, "r") as f: - return f.read() + content = f.read() + with open(filename,'w') as f: + f.write("") except FileNotFoundError: print(f"错误: 文件 {filename} 不存在") except PermissionError: print(f"错误: 无权限读取文件 {filename}") - return "" + return content def execute_instruction(self, instruction_old): ''' @@ -82,23 +119,11 @@ class ToolBase(abc.ABC): print("timeout参数错误,需要自查程序逻辑!") #output = result.stdout if result.stdout else result.stderr #-o 的命令需要处理 - parsed_arg = self.parse_sublist3r_command(instruction) - if parsed_arg.output: - file_out = self.read_output_file(parsed_arg.output) - stderr = "" - stdout = file_out - elif parsed_arg.Noutput: - file_out = self.read_output_file(parsed_arg.Noutput) - stderr = "" - stdout = file_out - elif parsed_arg.nikto: - file_out = self.read_output_file(parsed_arg.nikto) - stderr = "" - stdout = file_out - elif parsed_arg.NMG: - file_out = self.read_output_file(parsed_arg.NMG) - stderr = "" - stdout = file_out + parsed_arg = self.parse_output_params(instruction) + if parsed_arg: + fole_out = self.read_output_file(parsed_arg) + stderr ="" + stdout = fole_out else: stderr = result.stderr stdout = result.stdout