Browse Source

v0.1.1 node_tree_0.5 完善了几个工具类的处理,节点树增加了messages控制机制(只保留三层)

master
张龙 1 month ago
parent
commit
ab0e715500
  1. 149
      TaskManager.py
  2. 2
      config.yaml
  3. 56
      mycode/AttackMap.py
  4. 169
      mycode/ControlCenter.py
  5. 143
      mycode/LLMManager.py
  6. 90
      mycode/Result_merge.py
  7. 1
      payload/test.txt
  8. 7
      pipfile
  9. 45
      test.py
  10. 159
      tools/CurlTool.py
  11. 17
      tools/DirbTool.py
  12. 2
      tools/EchoTool.py
  13. 13
      tools/Enum4linuxTool.py
  14. 14
      tools/FtpTool.py
  15. 56
      tools/HydraTool.py
  16. 63
      tools/MysqlTool.py
  17. 2
      tools/NcTool.py
  18. 64
      tools/NiktoTool.py
  19. 84
      tools/PsqlTool.py
  20. 23
      tools/PythoncodeTool.py
  21. 8
      tools/SearchsploitTool.py
  22. 11
      tools/ShowmountTool.py
  23. 24
      tools/SmtpuserenumTool.py
  24. 11
      tools/SwaksTool.py
  25. 132
      tools/TelnetTool.py
  26. 73
      tools/ToolBase.py

149
TaskManager.py

@ -28,7 +28,7 @@ class TaskManager:
self.CCM = ControlCenter(self.DBM,self) self.CCM = ControlCenter(self.DBM,self)
self.InstrM = InstructionManager(self) # 类对象渗透,要约束只读取信息 self.InstrM = InstructionManager(self) # 类对象渗透,要约束只读取信息
# 控制最大并发指令数量 # 控制最大并发指令数量
self.max_thread_num = 2 self.max_thread_num = 6
self.task_id = 0 #任务id -- self.task_id = 0 #任务id --
self.workth_list = [] #线程句柄list self.workth_list = [] #线程句柄list
# self.long_instr_num = 0 #耗时指令数量 # self.long_instr_num = 0 #耗时指令数量
@ -38,6 +38,7 @@ class TaskManager:
self.lock = threading.Lock() #线程锁 self.lock = threading.Lock() #线程锁
self.node_num = 0 #在处理Node线程的处理 self.node_num = 0 #在处理Node线程的处理
self.brun = True self.brun = True
self.cookie = "" #cookie参数
def res_in_quere(self,bres,instr,reslut,start_time,end_time,th_DBM,source_result,ext_params,work_node): def res_in_quere(self,bres,instr,reslut,start_time,end_time,th_DBM,source_result,ext_params,work_node):
''' '''
@ -57,8 +58,9 @@ class TaskManager:
#结果入队列---2025-3-18所有的指令均需返回给LLM便于节点状态的更新,所以bres作用要调整。 #结果入队列---2025-3-18所有的指令均需返回给LLM便于节点状态的更新,所以bres作用要调整。
res = {'执行指令':instr,'结果':reslut} res = {'执行指令':instr,'结果':reslut}
str_res = json.dumps(res,ensure_ascii=False) #直接字符串组合也可以-待验证
work_node.llm_type = 1 work_node.llm_type = 1
work_node.add_res(res) #入节点结果队列 work_node.add_res(str_res) #入节点结果队列
def do_worker_th(self): def do_worker_th(self):
#线程的dbm需要一个线程一个 #线程的dbm需要一个线程一个
@ -87,14 +89,21 @@ class TaskManager:
with self.lock: with self.lock:
self.node_num -= 1 self.node_num -= 1
if self.node_num == 0 and self.node_queue.empty(): # if self.node_num == 0 and self.node_queue.empty(): #
self.logger.debug("此批次指令执行完成!")
with open("attack_tree", 'wb') as f: with open("attack_tree", 'wb') as f:
pickle.dump(TM.CCM.attack_tree, f) pickle.dump(self.CCM.attack_tree, f)
except queue.Empty: except queue.Empty:
self.logger.debug("暂无需要执行指令的节点!") self.logger.debug("暂无需要执行指令的节点!")
time.sleep(20) time.sleep(20)
def start_task(self,target_name,target_in): def start_task(self,target_name,target_in):
'''
:param target_name: 任务目标名字
:param target_in: 任务目标访问地址
:return:
'''
#判断目标合法性 #判断目标合法性
bok,target,type = self.TargetM.validate_and_extract(target_in) bok,target,type = self.TargetM.validate_and_extract(target_in)
if bok: if bok:
@ -134,8 +143,10 @@ if __name__ == "__main__":
current_path = os.path.dirname(os.path.realpath(__file__)) current_path = os.path.dirname(os.path.realpath(__file__))
strMsg = FM.read_file("test",1) strMsg = FM.read_file("test",1)
test_type = 5 test_type = 1
iput_index = 6 # 0是根节点 instr_index = 19
iput_index = -1 # 0是根节点
indexs = []
if test_type == 0: #新目标测试 if test_type == 0: #新目标测试
# 启动--初始化指令 # 启动--初始化指令
node_list = TM.CCM.start_do("192.168.204.137", 0) node_list = TM.CCM.start_do("192.168.204.137", 0)
@ -147,10 +158,16 @@ if __name__ == "__main__":
with open("attack_tree", "rb") as f: with open("attack_tree", "rb") as f:
TM.CCM.attack_tree = pickle.load(f) TM.CCM.attack_tree = pickle.load(f)
# 遍历node,查看有instr的ndoe # 遍历node,查看有instr的ndoe
nodes = TM.CCM.attack_tree.traverse_bfs() nodes = TM.CCM.attack_tree.traverse_dfs()
for node in nodes: if indexs:
if node.instr_queue: # list for index in indexs:
TM.node_queue.put(node) node = nodes[index]
if node.instr_queue: # list
TM.node_queue.put(node)
else:
for node in nodes:
if node.instr_queue:
TM.node_queue.put(node)
#创建线程执行指令 #创建线程执行指令
for i in range(TM.max_thread_num): for i in range(TM.max_thread_num):
@ -165,21 +182,18 @@ if __name__ == "__main__":
with open("attack_tree", "rb") as f: with open("attack_tree", "rb") as f:
TM.CCM.attack_tree = pickle.load(f) TM.CCM.attack_tree = pickle.load(f)
#遍历node,查看有res的数据 #遍历node,查看有res的数据
iput_max_num = 1 iput_max_num = 0
iput_num = 0 iput_num = 0
nodes = TM.CCM.attack_tree.traverse_bfs() nodes = TM.CCM.attack_tree.traverse_dfs()
if iput_index != -1:#index 不为-1就是指定节点返回,人为保障不越界 if indexs:
node = nodes[iput_index] for index in indexs:
if node.res_quere: node = nodes[index]
TM.CCM.llm_quere.put(node) if node.res_quere:
TM.CCM.llm_quere.put(node)
else: else:
for node in nodes: for node in nodes:
if node.res_quere: #有结果需要提交LLM if node.res_quere:
TM.CCM.llm_quere.put(node) TM.CCM.llm_quere.put(node)
iput_num += 1
if iput_max_num > 0: #0是有多少提交多少
if iput_num == iput_max_num:
break
#创建llm工作线程 #创建llm工作线程
TM.CCM.brun = True TM.CCM.brun = True
@ -191,25 +205,27 @@ if __name__ == "__main__":
for t in TM.CCM.llmth_list: for t in TM.CCM.llmth_list:
t.join() t.join()
elif test_type ==3: #执行指定指令 elif test_type ==3: #执行指定指令
instrlist=[ with open("attack_tree", "rb") as f:
"msfconsole -q -x \"use auxiliary/scanner/smb/smb_version; set RHOSTS 192.168.204.137; run; exit\""] TM.CCM.attack_tree = pickle.load(f)
# 遍历node,查看有instr的ndoe
nodes = TM.CCM.attack_tree.traverse_dfs()
instrlist = nodes[instr_index].instr_queue
# instrlist = ['''
# ''']
for instr in instrlist: for instr in instrlist:
start_time = get_local_timestr() # 指令执行开始时间 start_time = get_local_timestr() # 指令执行开始时间
bres, instr, reslut, source_result, ext_params = TM.InstrM.execute_instruction(instr) bres, instr, reslut, source_result, ext_params = TM.InstrM.execute_instruction(instr)
end_time = get_local_timestr() # 指令执行结束时间 end_time = get_local_timestr() # 指令执行结束时间
# 入数据库 -- bres True和False 都入数据库2025-3-10---加node_path(2025-3-18)#?
if TM.DBM.ok: res = {'执行结果': reslut}
TM.DBM.insetr_result(0, instr, reslut, 0, start_time, end_time, source_result, str_res = json.dumps(res,ensure_ascii=False) # 直接字符串组合也可以-待验证
ext_params, "独立命令执行") print(str_res)
else:
TM.logger.error("数据库连接失败!!")
elif test_type == 4: #修改Message elif test_type == 4: #修改Message
with open("attack_tree", "rb") as f: with open("attack_tree", "rb") as f:
TM.CCM.attack_tree = pickle.load(f) TM.CCM.attack_tree = pickle.load(f)
#创建一个新的节点 #创建一个新的节点
from mycode.AttackMap import TreeNode from mycode.AttackMap import TreeNode
testnode = TreeNode("test",0) testnode = TreeNode("test",0,0)
TM.CCM.LLM.build_initial_prompt(testnode)#新的Message TM.CCM.LLM.build_initial_prompt(testnode)#新的Message
systems = testnode.messages[0]["content"] systems = testnode.messages[0]["content"]
#print(systems) #print(systems)
@ -222,9 +238,76 @@ if __name__ == "__main__":
elif test_type ==5: #显示指令和结果list elif test_type ==5: #显示指令和结果list
with open("attack_tree", "rb") as f: with open("attack_tree", "rb") as f:
TM.CCM.attack_tree = pickle.load(f) TM.CCM.attack_tree = pickle.load(f)
nodes = TM.CCM.attack_tree.traverse_bfs() nodes = TM.CCM.attack_tree.traverse_dfs()
print(f"********\n{','.join(nodes[iput_index].instr_queue)}\n********") if iput_index == -1:
print(f"&&&&&&&&\n{','.join(nodes[iput_index].res_quere)}\n&&&&&&&&") for node in nodes:
print(f"----{node.path}-{node.status}----\n****instr_quere")
print(f"{','.join(node.instr_queue)}\n****res_quere")
try:
print(f"{','.join(node.res_quere)}")
except:
print(f"{json.dumps(node.res_quere)}")
elif iput_index == -2:#只输出有instruction的数据
index = 0
for node in nodes:
if node.instr_queue:
print(f"----{index}--{node.path}--{node.status}----")
print(f"{','.join(node.instr_queue)}")
index += 1
else:
print(f"********\n{','.join(nodes[iput_index].instr_queue)}\n********")
print(f"&&&&&&&&\n{','.join(nodes[iput_index].res_quere)}\n&&&&&&&&")
elif test_type == 6: #给指定节点添加测试指令
with open("attack_tree", "rb") as f:
TM.CCM.attack_tree = pickle.load(f)
nodes = TM.CCM.attack_tree.traverse_dfs()
str_instr = "nmap -sV -p- 192.168.204.137 -T4 -oN nmap_full_scan.txt"
index = 9
nodes[index].instr_queue.append(str_instr)
nodes[index].res_quere = []
with open("attack_tree", 'wb') as f:
pickle.dump(TM.CCM.attack_tree, f)
elif test_type == 7: #给指定节点修改指令的执行结果
with open("attack_tree", "rb") as f:
TM.CCM.attack_tree = pickle.load(f)
nodes = TM.CCM.attack_tree.traverse_dfs()
str_instr = "psql -h 192.168.204.137 -U postgres -c '\l'"
start_time = get_local_timestr() # 指令执行开始时间
bres, instr, reslut, source_result, ext_params = TM.InstrM.execute_instruction(str_instr)
end_time = get_local_timestr() # 指令执行结束时间
# 入数据库 -- bres True和False 都入数据库2025-3-10---加node_path(2025-3-18)#?
if TM.DBM.ok:
TM.DBM.insetr_result(0, instr, reslut, 0, start_time, end_time, source_result,
ext_params, "独立命令执行")
index = 9
nodes[index].res_quere.clear()
nodes[index].res_quere.append(reslut)
with open("attack_tree", 'wb') as f:
pickle.dump(TM.CCM.attack_tree, f)
elif test_type == 9: #处理自定义llm回复内容
with open("attack_tree", "rb") as f:
TM.CCM.attack_tree = pickle.load(f)
nodes = TM.CCM.attack_tree.traverse_dfs()
node = nodes[5]
strconent = '''
{'role': 'assistant', 'content': '{"action":"update_status", "node": "25端口", "status": "已完成", "vulnerability": {"name":"SMTP用户枚举漏洞","risk":"中危","info":"VRFY命令可验证有效用户"}}\n\n```bash-[目标系统->192.168.204.137->25端口]\nsmtp-user-enum -M VRFY -U /usr/share/wordlists/metasploit/unix_users.txt -t 192.168.204.137\n```\n\n```bash-[目标系统->192.168.204.137->25端口]\nnc -nv 192.168.204.137 25 << EOF\nEXPN root\nMAIL FROM: attacker@example.com\nRCPT TO: external@example.com\nDATA\nTest open relay\n.\nQUIT\nEOF\n```'}
'''
strjson = json.loads(strconent)
node_cmds,commands = TM.CCM.LLM.fetch_instruction(strjson["content"])
TM.CCM.tree_manager(node_cmds)
# 更新tree
bok, new_commands = TM.CCM.tree_manager(node_cmds, node, commands, TM.DBM)
# 分析指令入对应节点
if bok: # 节点指令若存在错误,测试指令都不处理,需要LLM重新生成
node_list = TM.CCM.instr_in_node(new_commands, node)
#报不保存待定--
with open("attack_tree", 'wb') as f:
pickle.dump(TM.CCM.attack_tree, f)
else: else:
#完整过程测试---要设定终止条件 #完整过程测试---要设定终止条件
pass pass

2
config.yaml

@ -14,7 +14,7 @@ mysql:
database: zfsafe database: zfsafe
#LLM-Type #LLM-Type
LLM_type: 1 #0-腾讯云,1-DS,2-GPT LLM_type: 2 #0-腾讯云,1-DS,2-2233ai,3-GPT
LLM_max_chain_count: 10 #为了避免推理链过长,造成推理效果变差,应该控制一个推理链的长度上限 LLM_max_chain_count: 10 #为了避免推理链过长,造成推理效果变差,应该控制一个推理链的长度上限
#用户初始密码 #用户初始密码

56
mycode/AttackMap.py

@ -1,4 +1,7 @@
import queue import queue
import copy
import re
#渗透测试树结构维护类 #渗透测试树结构维护类
class AttackTree: class AttackTree:
def __init__(self,root_node): def __init__(self,root_node):
@ -130,12 +133,17 @@ class AttackTree:
class TreeNode: class TreeNode:
def __init__(self, name,task_id,status="未完成", vul_type="未发现"): def __init__(self, name,task_id,status="未完成", vul_type="未发现"):
self.task_id = task_id #任务id
self.name = name # 节点名称 self.name = name # 节点名称
self.status = status # 节点状态 self.status = status # 节点状态
self.vul_type = vul_type # 漏洞类型 self.vul_type = vul_type # 漏洞类型
self.vul_name = ""
self.vul_grade = ""
self.vul_info = ""
self.children = [] # 子节点列表 self.children = [] # 子节点列表
self.parent = None # 父节点引用 self.parent = None # 父节点引用
self.path = "" #当前节点的路径 self.path = "" #当前节点的路径
self.bwork = True #当前节点是否工作,默认True
self.messages = [] # 针对当前节点积累的messages -- 针对不同节点提交不同的messages self.messages = [] # 针对当前节点积累的messages -- 针对不同节点提交不同的messages
self.llm_type = 0 #llm提交类型 0--初始状态无任务状态,1--指令结果反馈,2--llm错误反馈 self.llm_type = 0 #llm提交类型 0--初始状态无任务状态,1--指令结果反馈,2--llm错误反馈
@ -143,12 +151,58 @@ class TreeNode:
self.do_sn = 0 #针对该节点instr执行次数 self.do_sn = 0 #针对该节点instr执行次数
self.instr_queue = [] # queue.Queue() #针对当前节点的执行指令----重要约束:一个节点只能有一个线程在执行指令 self.instr_queue = [] # queue.Queue() #针对当前节点的执行指令----重要约束:一个节点只能有一个线程在执行指令
self.res_quere = [] # queue.Queue() #指令执行的结果,一批一批 self.res_quere = [] # queue.Queue() #指令执行的结果,一批一批
#用户补充信息
self.cookie = ""
self.ext_info = ""
#设置用户信息
def set_user_info(self,cookie,ext_info):
self.cookie = cookie
self.ext_info = ext_info
def copy_messages(self,childe_node):
'''
子节点继承父节点的messages目前规则保留上两层节点的message信息
:param childe_node:
:return:
'''
tmp_messages = copy.deepcopy(self.messages)
if not self.parent:
childe_node.messages = tmp_messages
else:
parent_path = self.parent.path
bfind = False
for msg in tmp_messages:
if msg["role"] == "system":
childe_node.messages.append(msg)
elif msg["role"] == "user":
if not bfind:
#获取user的node_path
content = msg["content"]
pattern = r"当前分支路径:(.+?)\n"
match = re.search(pattern, content)
if match:
path = match.group(1)
if parent_path in path:#当前节点的父节点路径在存入子节点messages
childe_node.messages.append(msg)
bfind = True #后续messages都保留
else:
print("提交的用户提示词结构有问题!")
else:
childe_node.messages.append(msg)
elif msg["role"] == "assistant":
if bfind:
childe_node.messages.append(msg)
else:
print("非法的信息体类型!")
def add_child(self, child_node): def add_child(self, child_node):
child_node.parent = self child_node.parent = self
child_node.path = self.path + f"->{child_node.name}" #子节点的路径赋值 child_node.path = self.path + f"->{child_node.name}" #子节点的路径赋值
child_node.messages = self.messages #传递messages #给什么时候的messages待验证#? #child_node.messages = copy.deepcopy(self.messages) #传递messages #给什么时候的messages待验证#?
self.copy_messages(child_node) #传递messages--只保留两层
self.children.append(child_node) self.children.append(child_node)
def add_instr(self,instr): def add_instr(self,instr):

169
mycode/ControlCenter.py

@ -40,7 +40,11 @@ class ControlCenter:
pass pass
def get_user_init_info(self): def get_user_init_info(self):
'''开始任务初,获取用户设定的基础信息''' '''开始任务初,获取用户设定的基础信息,初始信息可以分为两块:
1.提交llm的补充信息 保留在本地的信息如工具补充参数等cookie
2.用户可以设置全局和指定节点端口
3.补充测试节点
'''
# ?包括是否对目标进行初始化的信息收集 # ?包括是否对目标进行初始化的信息收集
return {"已知信息":""} return {"已知信息":""}
@ -82,10 +86,14 @@ class ControlCenter:
'''获取该节点的llm提交数据,会清空type和res_quere''' '''获取该节点的llm提交数据,会清空type和res_quere'''
llm_type = node.llm_type llm_type = node.llm_type
node.llm_type = 0 #回归0 node.llm_type = 0 #回归0
res_list = node.res_quere[:] #复制独立副本 res_list = node.res_quere[:] #浅拷贝,复制第一层
node.res_quere.clear() #清空待处理数据 node.res_quere.clear() #清空待处理数据,相当于把原应用关系接触
return llm_type,res_list return llm_type,res_list
def restore_one_llm_work(self,node,llm_type,res_list):
node.llm_type = llm_type
node.res_quere = res_list
#llm请求提交线程 #llm请求提交线程
def th_llm_worker(self):#LLM没有修改的全局变量,应该可以共用一个client def th_llm_worker(self):#LLM没有修改的全局变量,应该可以共用一个client
''' '''
@ -104,7 +112,7 @@ class ControlCenter:
self.get_llm_instruction(node,th_DBM) self.get_llm_instruction(node,th_DBM)
#释放锁 #释放锁
# 暂存状态--测试时使用 # 暂存状态--测试时使用--限制条件llm工作线程只能未1个
with open("attack_tree", 'wb') as f: with open("attack_tree", 'wb') as f:
pickle.dump(self.attack_tree, f) pickle.dump(self.attack_tree, f)
@ -137,8 +145,7 @@ class ControlCenter:
# 构造本次提交的prompt # 构造本次提交的prompt
ext_Prompt = f''' ext_Prompt = f'''
上一步结果{res_str} 上一步结果{res_str}
任务生成下一步渗透测试指令或判断是否完成该节点测试 任务生成下一步渗透测试指令或判断是否完成该节点测试
请确保生成的指令满足以下
''' '''
elif llm_type ==2: #llm返回的指令存在问题,需要再次请求返回 elif llm_type ==2: #llm返回的指令存在问题,需要再次请求返回
ext_Prompt = f''' ext_Prompt = f'''
@ -146,25 +153,17 @@ class ControlCenter:
错误信息{res_str} 错误信息{res_str}
任务请按格式要求重新生成该节点上一次返回中生成的所有指令 任务请按格式要求重新生成该节点上一次返回中生成的所有指令
''' '''
elif llm_type ==3: #已生成节点,但未生成测试指令 # '''
ext_Prompt = f''' # elif llm_type ==4: #未生成节点列表
反馈类型需要继续补充信息 # ext_Prompt = f'''
缺失信息{res_str} # 反馈类型:需要继续补充信息
任务 # 缺失信息:{res_str}
1.请生成这些节点的测试指令 # 任务:
2.这些节点的父节点为当前节点请正确生成这些节点的节点路径 # 1.请生成这些节点的新增节点指令,并生成对应的测试指令;
3.若还有节点未能生成测试指令必须返回未生成指令的节点列表 # 2.这些节点的父节点为当前节点,请正确生成这些节点的节点路径;
''' # 3.若节点未能全部新增,必须返回未新增的节点列表
elif llm_type ==4: #未生成节点列表 # 4.若有未生成指令的节点,必须返回未生成指令的节点列表。
ext_Prompt = f''' # '''
反馈类型需要继续补充信息
缺失信息{res_str}
任务
1.请生成这些节点的新增节点指令并生成对应的测试指令
2.这些节点的父节点为当前节点请正确生成这些节点的节点路径
3.若节点未能全部新增必须返回未新增的节点列表
4.若有未生成指令的节点必须返回未生成指令的节点列表
'''
elif llm_type ==5: elif llm_type ==5:
ext_Prompt = f''' ext_Prompt = f'''
反馈类型测试指令格式错误 反馈类型测试指令格式错误
@ -186,10 +185,10 @@ class ControlCenter:
2.LLM的回复开始反复时有点难判断 2.LLM的回复开始反复时有点难判断
''' '''
# 更新tree # 更新tree
bok = self.tree_manager(node_cmds, node,commands) bok,new_commands = self.tree_manager(node_cmds, node,commands,DBM)
# 分析指令入对应节点 # 分析指令入对应节点
if bok: #节点指令若存在错误,测试指令都不处理,需要LLM重新生成 if bok: #节点指令若存在错误,测试指令都不处理,需要LLM重新生成
node_list = self.instr_in_node(commands, node) node_list = self.instr_in_node(new_commands, node)
# 插入TM的node_queue中,交TM线程处理---除了LLM在不同的请求返回针对同一节点的测试指令,正常业务不会产生两次进队列 # 插入TM的node_queue中,交TM线程处理---除了LLM在不同的请求返回针对同一节点的测试指令,正常业务不会产生两次进队列
for node in node_list: for node in node_list:
self.TM.node_queue.put(node) self.TM.node_queue.put(node)
@ -229,49 +228,66 @@ class ControlCenter:
self.put_one_llm_work(strerror,node,2) self.put_one_llm_work(strerror,node,2)
return False return False
def tree_manager(self,node_cmds,node,commands): def tree_manager(self,node_cmds,node,commands,DBM):
'''更新渗透测试树 '''更新渗透测试树
node_cmds是json-list node_cmds是json-list
2025-03-22添加commands参数用于处理LLM对同一个节点返回了测试指令但还返回了no_instruction节点指令 2025-03-22添加commands参数用于处理LLM对同一个节点返回了测试指令但还返回了no_instruction节点指令
''' '''
if not node_cmds: # or len(node_cmds)==0: 正常not判断就可以有没有节点指令 if not node_cmds: # or len(node_cmds)==0: 正常not判断就可以有没有节点指令
return True return True,commands
#对节点指令进行校验 #对节点指令进行校验
if not self.verify_node_cmds(node_cmds,node): if not self.verify_node_cmds(node_cmds,node):
return False #节点指令存在问题,终止执行 return False,commands #节点指令存在问题,终止执行
#执行节点操作
#执行节点操作---先执行add_node,怕返回顺序不一直
residue_node_cmds = []
for node_json in node_cmds: for node_json in node_cmds:
action = node_json["action"] action = node_json["action"]
if action == "add_node": #新增节点 if action == "add_node": # 新增节点
parent_node_name = node_json["parent"] parent_node_name = node_json["parent"]
# 新增节点原则上应该都是当前节点增加子节点 # 新增节点原则上应该都是当前节点增加子节点
if node.name == parent_node_name: if node.name == parent_node_name or parent_node_name.endswith(node.name):
status = node_json["status"] status = node_json["status"]
node_names = node_json["nodes"].split(',') node_names = node_json["nodes"].split(',')
for node_name in node_names: for node_name in node_names:
#判重 # 判重---遇到过补充未生成指令的节点时,返回了新增这些节点的指令
bfind = False bfind = False
for node_child in node.children: for node_child in node.children:
if node_child.name == node_name: if node_child.name == node_name:
bfind = True bfind = True
break break
if not bfind: if not bfind:
#添加节点 # 添加节点
new_node = TreeNode(node_name,node.task_id,status) new_node = TreeNode(node_name, node.task_id, status)
node.add_child(new_node) #message的传递待验证 node.add_child(new_node) # message的传递待验证
else: else:
self.logger.error(f"添加子节点时,遇到父节点名称不一致的,需要介入!!{node_json}") #丢弃该节点 self.logger.error(f"添加子节点时,遇到父节点名称不一致的,需要介入!!{node_json}") # 丢弃该节点
elif action == "update_status": else:#其他指令添加到list
residue_node_cmds.append(node_json)
#执行剩余的节点指令--不分先后
for node_json in residue_node_cmds:
action = node_json["action"]
if action == "update_status":
node_name = node_json["node"] node_name = node_json["node"]
status = node_json["status"] status = node_json["status"]
vul_type = "未发现" vul_type = "未发现"
if "vulnerability" in node_json:
vul_type = json.dumps(node_json["vulnerability"])
if node.name == node_name: if node.name == node_name:
node.status = status node.status = status
if "vulnerability" in node_json:
#{\"name\":\"漏洞名称\",\"risk\":\"风险等级(低危/中危/高危)\",\"info\":\"补充信息(没有可为空)\"}};
vul_type = json.dumps(node_json["vulnerability"],ensure_ascii=False) #json转字符串
try:
node.name = node_json["vulnerability"]["name"]
node.vul_grade = node_json["vulnerability"]["risk"]
node.vul_info = node_json["vulnerability"]["info"]
except:
self.logger.error("漏洞信息错误")
node.vul_type = vul_type node.vul_type = vul_type
else: else:
self.logger.error(f"遇到不是修改本节点状态的,需要介入!!{node_json}") str_user = f"遇到不是修改本节点状态的,需要介入!!{node_json}"
self.logger.error(str_user)
self.need_user_know(str_user,node)
elif action == "no_instruction": elif action == "no_instruction":
#返回的未生成指令的数据进行校验:1.要有数据;2.节点不是当前节点就是子节点 #返回的未生成指令的数据进行校验:1.要有数据;2.节点不是当前节点就是子节点
nodes = [] nodes = []
@ -285,29 +301,68 @@ class ControlCenter:
break break
if bcommand: #如果存在测试指令,则不把该节点放入补充信息llm任务 if bcommand: #如果存在测试指令,则不把该节点放入补充信息llm任务
continue continue
#验证对应节点是否已经创建---本节点或子节点,其他节点不处理(更狠一点就是本节点都不行)
if node_name == node.name: if node_name == node.name:
nodes.append(node_name) nodes.append(node_name)
# str_add = "请生成测试指令"
# self.put_one_llm_work(str_add,node,1)
else: else:
for child_node in node.children: for child_node in node.children:
if child_node.name == node_name: if child_node.name == node_name:
nodes.append(node_name) nodes.append(node_name)
# str_add = "无"
# self.put_one_llm_work(str_add, child_node, 1)
break break
if nodes: #找到对应的节点才返回 if nodes: #阻塞式,在当前节点提交补充信息,完善节点指令 -- 优势是省token
str_nodes = ",".join(nodes) new_commands = self.get_other_instruction(nodes,DBM,node)
str_add = {"已新增但未生成测试指令的节点":str_nodes} commands.extend(new_commands)
# 提交一个错误反馈任务--但继续后续工作 elif action == "no_create": #提交人工确认
self.put_one_llm_work(str_add, node, 3)
self.logger.debug(f"已新增但未生成指令的节点有:{nodes}")
elif action == "no_create":
nodes = node_json["nodes"] nodes = node_json["nodes"]
if nodes: if nodes:
str_add = {"未新增的节点": nodes} str_add = {"未新增的节点": nodes}
# 提交一个错误反馈任务--但继续后续工作 self.logger.debug(str_add)
self.put_one_llm_work(str_add, node, 4) # 提交一个继续反馈任务--继续后续工作 2025-3-25不自动处理
self.logger.debug(f"未新增的节点有:{nodes}") # self.put_one_llm_work(str_add, node, 4)
# self.logger.debug(f"未新增的节点有:{nodes}")
else: else:
self.logger.error("****不应该执行到这!程序逻辑存在问题!") self.logger.error("****不应该执行到这!程序逻辑存在问题!")
return True return True,commands
#阻塞轮询补充指令
def get_other_instruction(self,nodes,DBM,cur_node):
res_str = ','.join(nodes)
new_commands = []
while res_str:
self.logger.debug(f"开始针对f{res_str}这些节点请求测试指令")
user_Prompt = f'''
当前分支路径{cur_node.path}
当前节点信息
- 节点名称{cur_node.name}
- 节点状态{cur_node.status}
- 漏洞类型{cur_node.vul_type}
反馈类型需要补充信息
缺失信息针对{res_str}的测试指令
任务
1.请生成这些节点的测试指令
2.这些节点的父节点为当前节点请正确生成这些节点的节点路径
3.若还有节点未能生成测试指令必须返回未生成指令的节点列表
'''
res_str = ""
node_cmds, commands = self.LLM.get_llm_instruction(user_Prompt, DBM, cur_node) # message要更新
#把返回的测试指令进行追加
new_commands.extend(commands)
#判断是否还有未添加指令的节点
for node_json in node_cmds: #正常应该只有一条no_instruction
if "no_instruction" in node_json and "nodes" in node_json:
tmp_nodes = []
node_names = node_json["nodes"].split(',')
for node_name in node_names:
if node_name in nodes:
tmp_nodes.append(node_name)
res_str = ','.join(tmp_nodes)
break
self.logger.debug("为添加指令的节点,都已完成指令的添加!")
return new_commands
def instr_in_node(self,commands,node): def instr_in_node(self,commands,node):
node_list = [] #一次返回的测试指令 node_list = [] #一次返回的测试指令
@ -334,6 +389,10 @@ class ControlCenter:
# 3.独立队列处理 # 3.独立队列处理
return node_list return node_list
#需要用户确认的信息--待完善
def need_user_know(self,strinfo,node):
pass
#待修改 #待修改
def is_user_instr(self,instr): def is_user_instr(self,instr):
''' '''
@ -379,4 +438,6 @@ class ControlCenter:
#停止llm处理线程 #停止llm处理线程
self.brun =False self.brun =False
for th in self.llmth_list: for th in self.llmth_list:
th.jion() th.jion()

143
mycode/LLMManager.py

@ -7,12 +7,14 @@ import openai
import json import json
import threading import threading
import re import re
import os
from openai import OpenAI from openai import OpenAI
from mycode.DBManager import DBManager
from myutils.MyTime import get_local_timestr from myutils.MyTime import get_local_timestr
from myutils.MyLogger_logger import LogHandler from myutils.MyLogger_logger import LogHandler
class LLMManager: class LLMManager:
def __init__(self,illm_type=0): def __init__(self,illm_type=3):
self.logger = LogHandler().get_logger("LLMManager") self.logger = LogHandler().get_logger("LLMManager")
self.api_key = None self.api_key = None
self.api_url = None self.api_url = None
@ -30,11 +32,24 @@ class LLMManager:
self.model = "deepseek-reasoner" #model=deepseek-reasoner -- R1 model=deepseek-chat --V3 self.model = "deepseek-reasoner" #model=deepseek-reasoner -- R1 model=deepseek-chat --V3
# 创建会话对象 -- 一个任务的LLM必须唯一 # 创建会话对象 -- 一个任务的LLM必须唯一
self.client = OpenAI(api_key=self.api_key, base_url=self.api_url) self.client = OpenAI(api_key=self.api_key, base_url=self.api_url)
elif illm_type == 2: #2233.ai
self.api_key = "sk-J3562ad9aece8fd2855bb495bfa1a852a4e8de8a2a1IOchD"
self.api_url = "https://api.gptsapi.net/v1"
self.model = "o3-mini-2025-01-31"
self.client = OpenAI(api_key=self.api_key,base_url=self.api_url)
elif illm_type ==3: #GPT elif illm_type ==3: #GPT
self.api_key ="" # 定义代理服务器地址
self.api_url = "" proxy_url = "http://192.168.3.102:3128"
self.model = "" os.environ["HTTP_PROXY"] = proxy_url
self.client = OpenAI() os.environ["HTTPS_PROXY"] = proxy_url
self.api_key ="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"
self.api_url = "https://api.openai.com/v1"
self.model = "o3-mini-2025-01-31"
openai.proxy = proxy_url
openai.api_key = self.api_key
#self.client = openai
self.client = OpenAI(api_key=self.api_key,base_url=self.api_url)
#******测试使用,设置slef.message的值 #******测试使用,设置slef.message的值
def test_old_message(self,strMessage): def test_old_message(self,strMessage):
@ -45,62 +60,55 @@ class LLMManager:
except Exception as e: except Exception as e:
print(f"错误: {str(e)}") print(f"错误: {str(e)}")
'''
**决策原则**
- 根据节点类型和状态优先执行基础测试如端口扫描服务扫描
- 仅在发现新信息或漏洞时新增子节点
- 确保每个新增节点匹配测试指令
'''
# 初始化messages # 初始化messages
def build_initial_prompt(self,node): def build_initial_prompt(self,node,str_ip=""):
if not node: if not node:
return return
'''暂时未添加到提示词
**核心要求**
- 每次新增节点时必须在同一响应中为该节点提供测试指令
- 分批新增节点每次响应中新增节点不超过3个确保指令完整
'''
#根节点初始化message #根节点初始化message
node.messages = [{"role": "system", node.messages = [{"role": "system",
"content":''' "content":'''
你是一位渗透测试专家基于你的专业知识和经验负责动态控制整个渗透测试过程根据当前测试状态和返回结果决定下一步测试指令推动测试前进直至完成渗透测试本地程序会执行你的指令并返回结果 你是一位渗透测试专家基于你的专业知识和经验负责动态控制整个渗透测试过程根据当前测试状态和返回结果决定下一步测试指令推动测试前进直至完成渗透测试本地程序会执行你的指令并返回结果
**总体要求** **总体要求**
1.以目标系统IP为根节点每个渗透测试点如端口服务漏洞点作为子节点形成树型结构测试树 1.以目标系统IP为根节点每个渗透测试点如端口服务漏洞点作为子节点形成树型结构测试树
2.测试树整体数据由本地程序存储你只需要关注当前节点的测试推进状态更新(未完成/已完成)及完整新增子节点 2.测试树整体数据由本地程序存储你只需要关注当前节点的测试推进状态更新(未完成/已完成)及完整新增子节点
3.返回两类指令节点指令和测试指令以空行间隔不要包含注释和说明 3.返回两类指令节点指令和测试指令以空行间隔不要包含注释和说明
4.若一次性新增的节点过多无法为每个节点都匹配测试指令请优先保障新增节点的完整性若有未生成指令的节点必须返回已新增但未生成指令的节点列表如果节点无法全部新增必须返回未新增的节点列表 4.若一次性新增的节点过多无法为每个节点都匹配测试指令请优先保障新增中高危节点的完整性若有未生成测试指令的节点必须返回未生成指令节点列表
5.若无节点修改,新增,未生成指令等数据节点指令可以为空但测试指令必须对应已有节点 5.若无需要处理的节点数据节点指令可以不生成但测试指令必须对应已有节点
**决策流程** **决策流程**
1. 若当前节点是IP且未进行端口扫描则执行端口扫描 1. 若当前节点是IP且未进行端口扫描对当前节点执行端口扫描
2. 若端口扫描发现开放端口对可能存在中高危以上风险的端口新增节点并提供测试指令 2. 若端口扫描发现开放端口对可能存在中高危以上风险的端口新增节点并提供测试指令
3. 若当前节点是端口且未进行服务扫描则执行服务扫描 3. 若当前节点是端口且未进行服务扫描则执行服务扫描
4. 若服务扫描发现服务版本或漏洞则新增漏洞测试节点并提供测试指令 4. 若服务扫描发现服务版本或漏洞则新增漏洞测试节点并提供测试指令
5. 若漏洞验证成功则根据结果决定是否需要进一步测试若需要进一步测试则新增子节点并提供测试指令 5. 若漏洞验证成功则根据结果决定是否需要进一步测试若需要进一步测试则为测试内容新增子节点并提供测试指令
6. 若节点测试无新信息和测试指令则更新状态为已完成 6. 当当前节点执行完成所有可能的测试指令更新状态为已完成
**测试指令生成要求** **测试指令生成准则**
1.明确每个测试指令的测试目标并优先尝试最简单最直接的办法 1.明确每个测试指令的测试目标并优先尝试最简单最直接的办法不要在同一个请求生成测试效果覆盖的指令
2.对于复杂的测试点使用递进逻辑组织指令先尝试基础测试方法根据执行结果决定是否进行更深入的测试 2.使用递进逻辑组织指令先尝试基础测试方法根据执行结果决定是否进行更深入的测试;
3.本地的IP地址为192.168.204.135
**节点指令格式** **节点指令格式**
- 新增节点{\"action\":\"add_node\", \"parent\": \"父节点\", \"nodes\": \"子节点1,子节点2\", \"status\": \"未完成\"}; - 新增节点{\"action\":\"add_node\", \"parent\": \"父节点\", \"nodes\": \"节点1,节点2\", \"status\": \"未完成\"};
- 已新增但未生成指令的节点列表{\"action\": \"no_instruction\", \"nodes\": \"3306端口,1000端口\"}; - 未生成指令节点列表{\"action\": \"no_instruction\", \"nodes\": \"节点1,节点2\"};
- 未新增的节点列表{\"action\": \"no_create\", \"nodes\": \"8080端口,8081端口,9000端口\"}; - 完成测试未发现漏洞{\"action\": \"update_status\", \"node\": \"节点\", \"status\": \"已完成\"};
- 节点完成测试未发现漏洞{\"action\": \"update_status\", \"node\": \"21端口\", \"status\": \"已完成\"}; - 完成测试且发现漏洞{\"action\": \"update_status\", \"node\": \"节点\", \"status\": \"已完成\"\"vulnerability\": {\"name\":\"漏洞名称\",\"risk\":\"风险等级(低危/中危/高危)\",\"info\":\"补充信息(没有可为空)\"}};
- 节点完成测试发现漏洞{\"action\": \"update_status\", \"node\": \"21端口\", \"status\": \"已完成\"\"vulnerability\": {\"name\":\"ftp匿名登录\",\"risk\":\"\"}};
**测试指令格式** **测试指令格式**
- shell指令```bash-[节点路径](.*?)```包裹需要避免用户交互 - shell指令```bash-[节点路径](.*?)```包裹需要避免用户交互,若涉及到多步指令请生成python代码
- python指令```python-[节点路径](.*?)```包裹主函数名为dynamic_fun需包含错误处理执行结束后必须返回一个tuple (status, output)其中status为'success''failure'output为补充输出信息 - python指令```python-[节点路径](.*?)```包裹主函数名为dynamic_fun需包含错误处理必须返回一个tuple(status, output)
- [节点路径]为从根节点到目标节点的完整层级描述 - [节点路径]为从根节点到目标节点的完整层级描述
**核心要求**
- 优先保障新增中高危测试节点的完整性
- 指令之间必须要有一个空行
**响应示例** **响应示例**
{\"action\":\"add_node\", \"parent\": \"192.168.1.100\", \"node\": \"3306端口\", \"status\": \"未完成\"} {\"action\":\"add_node\", \"parent\": \"192.168.1.100\", \"nodes\": \"3306端口,22端口\", \"status\": \"未完成\"}
```bash-[目标系统->192.168.1.100->3306端口] ```bash-[目标系统->192.168.1.100->3306端口]
mysql -u root -p 192.168.1.100 mysql -u root -p 192.168.1.100
``` ```
{\"action\":\"add_node\", \"parent\": \"192.168.1.100\", \"node\": \"22端口\", \"status\": \"未完成\"}
```python-[目标系统->192.168.1.100->22端口]
def dynamic_fun():
try:
result = "扫描完成"
return ("success", result)
except Exception as e:
return ("failure", str(e))
```
'''}] # 一个messages '''}] # 一个messages
def init_data(self,task_id=0): def init_data(self,task_id=0):
@ -122,11 +130,12 @@ def dynamic_fun():
#提交LLM #提交LLM
post_time = get_local_timestr() post_time = get_local_timestr()
response = self.client.chat.completions.create( response = self.client.chat.completions.create(
model=self.model, model=self.model,
reasoning_effort="high",
messages = node.messages messages = node.messages
) )
#LLM返回结果处理 #LLM返回结果处理
reasoning_content = "" reasoning_content = ""
content = "" content = ""
@ -143,6 +152,12 @@ def dynamic_fun():
content = response.choices[0].message content = response.choices[0].message
# 记录llm历史信息 # 记录llm历史信息
node.messages.append(content) node.messages.append(content)
elif self.model == "o3-mini-2025-01-31":
reasoning_content = "" #gpt不返回推理内容
content = response.choices[0].message.content
print(content)
# 记录llm历史信息
node.messages.append({'role': 'assistant', 'content': content})
else: else:
self.logger.error("处理到未预设的模型!") self.logger.error("处理到未预设的模型!")
return None return None
@ -202,37 +217,33 @@ def dynamic_fun():
commands.append(shell_blocks[shell_index]) commands.append(shell_blocks[shell_index])
shell_index +=1 shell_index +=1
else:#其他的认为是节点操作指令--指令格式还存在不确定性,需要正则匹配,要求是JSON else:#其他的认为是节点操作指令--指令格式还存在不确定性,需要正则匹配,要求是JSON
pattern = re.compile(r'\{.*\}', re.DOTALL) #贪婪模式会匹配到最后一个},能使用嵌套的JSON pattern = re.compile(r'\{(?:[^{}]|\{[^{}]*\})*\}')
# 遍历所有匹配到的 JSON 结构 # 遍历所有匹配到的 JSON 结构
strlines = part.strip('\n') #按行拆分,避免贪婪模式下,匹配到多行的最后一个}
for strline in strlines: # strlines = part.strip('\n') #按行拆分,避免贪婪模式下,匹配到多行的最后一个}
for match in pattern.findall(strline): #正常只能有一个 # for strline in strlines:
try: for match in pattern.findall(part): #正常只能有一个
node_cmds.append(json.loads(match)) # 解析 JSON 并添加到列表 try:
except json.JSONDecodeError as e:#解析不了的不入队列 node_cmds.append(json.loads(match)) # 解析 JSON 并添加到列表
self.logger.error(f"LLM-{part}-JSON 解析错误: {e}") #这是需不需要人为介入? except json.JSONDecodeError as e:#解析不了的不入队列
self.logger.error(f"LLM-{part}-JSON 解析错误: {e}") #这是需不需要人为介入?
return node_cmds,commands return node_cmds,commands
def test_llm(self): def test_llm(self):
with open("../test", "r", encoding="utf-8") as f: messages = [
messages = json.load(f) {"role": "system", "content": "You are a helpful assistant."},
text = messages[-1]["content"] {"role": "user", "content": "讲个笑话吧。"}
list = self.fetch_instruction(text) ]
for itme in list: response = self.client.chat.completions.create(
print("***********") model=self.model,
print(itme) reasoning_effort="medium",
messages=messages
)
print(response)
if __name__ == "__main__": if __name__ == "__main__":
# LM = LLMManager(1) llm = LLMManager(3)
# LM.test_llm() llm.test_llm()
tlist1 = []
tlist2 = []
tlist2.append(1)
if not tlist1:
print("list1空")
if not tlist2:
print("list2空")
if tlist2:
print("list2不为空")

90
mycode/Result_merge.py

@ -0,0 +1,90 @@
#结果合并功能函数模块
import re
def my_merge(fun_name,result):
if fun_name == "enum4linux":
result = enum4linux_merge(result)
else:
pass
return result
#--------------------enum4linux--------------------
def enum4linux_merge(result):
print("enum4linux")
# 1.用户列表(用于密码爆破)
users = extract_users(result)
# 2. 共享目录(用于未授权访问/文件泄露)
shares = extract_shares(result)
# 3. 密码策略(指导爆破规则)
policy = extract_password_policy(result)
# 4. 操作系统信息(用于漏洞匹配)
os_info = extract_os_info(result)
# 整合输出
result = f"Users:{users}\nShares:{shares}\nPolicy:{policy}\nOS Info:{os_info}\n"
print(result)
return result
def extract_users(data):
"""提取所有用户列表(含 RID)"""
users = {}
pattern = re.compile(r"user:\[(.*?)\] rid:\[(0x[a-fA-F0-9]+)\]")
matches = pattern.findall(data)
for user, rid in matches:
users[user] = rid
return users
def extract_shares(data):
"""提取共享目录并清理 ANSI 转义码"""
shares = []
share_block = re.search(r"Share Enumeration.*?=\n(.*?)\n\n", data, re.DOTALL)
if share_block:
lines = share_block.group(1).split('\n')
for line in lines:
# 清理 ANSI 转义码(如 \x1b[35m)
line_clean = re.sub(r'\x1b\[[0-9;]*m', '', line)
if 'Disk' in line_clean or 'IPC' in line_clean:
parts = list(filter(None, line_clean.split()))
if len(parts) >= 3:
share = {
"name": parts[0],
"type": parts[1],
"access": "Unknown"
}
# 提取清理后的访问权限
access_line = re.search(rf"//.*{re.escape(parts[0])}.*Mapping: (.*?) ", data)
if access_line:
access_clean = re.sub(r'\x1b\[[0-9;]*m', '', access_line.group(1))
share["access"] = access_clean
shares.append(share)
return shares
def extract_password_policy(data):
"""提取密码策略"""
policy = {}
policy_block = re.search(r"Password Policy Information.*?=\n(.*?)\n\n", data, re.DOTALL)
if not policy_block:
return policy
policy_text = policy_block.group(1)
# 提取最小密码长度(处理未匹配情况)
min_length_match = re.search(r"Minimum password length: (\d+)", policy_text)
policy["min_length"] = min_length_match.group(1) if min_length_match else "未知"
# 提取密码复杂性要求
complexity_match = re.search(r"Password Complexity: (Enabled|Disabled)", policy_text)
policy["complexity"] = complexity_match.group(1) if complexity_match else "未知"
# 提取账户锁定阈值
lockout_match = re.search(r"Account Lockout Threshold: (\d+|None)", policy_text)
policy["lockout_threshold"] = lockout_match.group(1) if lockout_match else "未知"
return policy
def extract_os_info(data):
"""提取操作系统信息"""
os_info = {}
match = re.search(r"server \(([^)]+)\)", data)
if match:
os_info["samba_version"] = match.group(1)
return os_info
#------------------------------------------------

1
payload/test.txt

@ -0,0 +1 @@
测试文件

7
pipfile

@ -14,5 +14,10 @@ pip install loguru -i https://pypi.tuna.tsinghua.edu.cn/simple/
pip install paramiko -i https://pypi.tuna.tsinghua.edu.cn/simple/ pip install paramiko -i https://pypi.tuna.tsinghua.edu.cn/simple/
pip install impacket -i https://pypi.tuna.tsinghua.edu.cn/simple/ pip install impacket -i https://pypi.tuna.tsinghua.edu.cn/simple/
sudo apt-get install libpq-dev python3-dev
pip install psycopg2 -i https://pypi.tuna.tsinghua.edu.cn/simple/
cd /usr/share/wordlists/ cd /usr/share/wordlists/
gzip -d rockyou.txt.gz gzip -d rockyou.txt.gz
#searchsploit -u 更新漏洞信息

45
test.py

@ -3,6 +3,10 @@ import subprocess
import tempfile import tempfile
import os import os
import pexpect import pexpect
import struct
import sys
import mysql.connector
import requests
def do_worker(str_instruction): def do_worker(str_instruction):
@ -51,22 +55,29 @@ def do_worker_ftp_script(str_instruction):
os.remove(output_file) os.remove(output_file)
return output return output
import socket
def dynamic_fun():
try:
host = "192.168.204.137"
port = 8009
# 尝试建立连接
sock = socket.create_connection((host, port), timeout=15)
# 发送一个基础的AJP协议探测包(仅用于检测响应)
payload = b'\x12\x34\x00\x02' # 示例数据包
sock.sendall(payload)
response = sock.recv(1024)
sock.close()
if response:
return (1, "收到响应,可能存在CVE-2020-1938漏洞风险,请进一步人工验证")
else:
return (0, "无响应,暂未检测到漏洞")
except Exception as e:
return (0, "连接失败或错误: " + str(e))
if __name__ == "__main__": if __name__ == "__main__":
# 示例使用 # 示例使用
str_instruction = """ bok,res = dynamic_fun()
ftp -n 192.168.204.137 << EOF print(bok,res)
user anonymous anonymous@example.com
ls
bye
EOF
"""
output = do_worker(str_instruction)
print(f"*****\n{output}\n*****")
output = do_worker_ftp_script(str_instruction)
lines = output.splitlines()
# 跳过第一行(Script started)和最后一行(Script done)
ftp_output = lines[1:-1]
strout = '\n'.join(ftp_output)
print("111111111")
print(strout)

159
tools/CurlTool.py

@ -66,33 +66,6 @@ class CurlTool(ToolBase):
parts.append('-i') parts.append('-i')
return ' '.join(parts),timeout return ' '.join(parts),timeout
# def execute_instruction(self, instruction_old):
# '''
# 执行指令:验证合法性 -> 执行 -> 分析结果
# :param instruction_old:
# :return:
# bool:true-正常返回给大模型,false-结果不返回给大模型
# str:执行的指令
# str:执行指令的结果
# '''
#
# # 第一步:验证指令合法性
# instruction = self.validate_instruction(instruction_old)
# if not instruction:
# return False,instruction_old,"该指令暂不执行!"
#
# # 第二步:执行指令 --- 基于request使用
# #print(f"执行指令:{instruction}")
# output = ""
#
# # 第三步:分析执行结果
# analysis = self.analyze_result(output,instruction)
# #指令和结果入数据库
# #?
# if not analysis: #analysis为“” 不提交LLM
# return False,instruction,analysis
# return True,instruction, analysis
def get_ssl_info(self,stderr,stdout): def get_ssl_info(self,stderr,stdout):
# -------------------------- # --------------------------
# 解释信息的安全意义: # 解释信息的安全意义:
@ -148,42 +121,23 @@ class CurlTool(ToolBase):
result = f"HTTP 状态行:{http_status},Content-Type:{content_type},HTML Title:{html_title},TLS 连接信息:{tls_info},证书 Common Name:{cert_cn},证书 Issuer:{issuer_info}" result = f"HTTP 状态行:{http_status},Content-Type:{content_type},HTML Title:{html_title},TLS 连接信息:{tls_info},证书 Common Name:{cert_cn},证书 Issuer:{issuer_info}"
return result return result
def get_info_xpost(self,stdout,stderr): def get_info_curl(self,instruction,stdout,stderr):
"""
subprocess.run 执行 curl 后的结果中提取关键信息
- HTTP 状态码
- 常见响应头Content-Type, Content-Length
- HTML 页面标题如果内容为 HTML
- 返回正文的前200字符body_snippet
- TLS/证书相关信息从详细调试信息 stderr 中提取
对于未匹配到的信息返回Not found或空字符串
"""
info = {} info = {}
# 处理 stdout: 拆分响应头与正文(假设用空行分隔) # 处理 stdout: 拆分响应头与正文(假设用空行分隔)
parts = re.split(r'\r?\n\r?\n', stdout, maxsplit=1) parts = re.split(r'\r?\n\r?\n', stdout, maxsplit=1)
#***************解析方式一
# headers_str = parts[0] if parts else ""
# body = parts[1] if len(parts) > 1 else ""
#
# # 提取 HTTP 状态码(从响应头第一行中获取,例如 "HTTP/1.1 202 OK")
# header_lines = headers_str.splitlines()
# ***************解析方式二
if len(parts) == 2: if len(parts) == 2:
headers_str, body = parts headers_str, body = parts
else: else:
# 如果没有拆分成功,可能 stdout 中只有正文,则从 stderr 尝试提取 HTTP 状态行 # 如果没有拆分成功,可能 stdout 中只有正文,则从 stderr 尝试提取 HTTP 状态行
headers_str = "" headers_str = ""
body = stdout body = stdout
# 如果没有在 stdout 中找到头信息,则尝试从 stderr 中提取(部分信息可能在 stderr 中)
# 如果没有在 stdout 中找到头信息,则尝试从 stderr 中提取(部分信息可能在 stderr 中)
if not headers_str: if not headers_str:
header_lines = stderr.splitlines() header_lines = stderr.splitlines()
else: else:
header_lines = headers_str.splitlines() header_lines = headers_str.splitlines()
#**************************
#status_code
if header_lines: if header_lines:
status_line = header_lines[0] status_line = header_lines[0]
status_match = re.search(r'HTTP/\d+\.\d+\s+(\d+)', status_line) status_match = re.search(r'HTTP/\d+\.\d+\s+(\d+)', status_line)
@ -191,20 +145,24 @@ class CurlTool(ToolBase):
else: else:
info['status_code'] = "No headers found" info['status_code'] = "No headers found"
# 提取常见响应头 #Server
m = re.search(r'^Server:\s*(.+)$', headers_str, re.MULTILINE)
if m:
info["server"] = m.group(1).strip()
#content-type,content-length
content_type = "Not found" content_type = "Not found"
content_length = "Not found" content_length = "Not found"
for line in header_lines: for line in header_lines:
if line.lower().startswith("content-type:"): if line.lower().startswith("content-type:"):
info['content_type'] = line.split(":", 1)[1].strip() info['content-type'] = line.split(":", 1)[1].strip()
elif line.lower().startswith("content-length:"): elif line.lower().startswith("content-length:"):
info['content_length'] = line.split(":", 1)[1].strip() info['content-length'] = line.split(":", 1)[1].strip()
# 如果未匹配到,则设置默认值 info.setdefault('content-type', "Not found")
info.setdefault('content_type', "Not found") info.setdefault('content-length', "Not found")
info.setdefault('content_length', "Not found")
# 如果内容为 HTML,则使用 BeautifulSoup 提取 <title> 标签内容 # 如果内容为 HTML,则使用 BeautifulSoup 提取 <title> 标签内容
if "html" in info['content_type'].lower(): if "html" in info['content-type'].lower():
try: try:
soup = BeautifulSoup(body, "html.parser") soup = BeautifulSoup(body, "html.parser")
if soup.title and soup.title.string: if soup.title and soup.title.string:
@ -216,25 +174,76 @@ class CurlTool(ToolBase):
else: else:
info['html_title'] = "N/A" info['html_title'] = "N/A"
# 保存部分正文内容,便于后续分析 #------------正文部分解析------------
info['body_snippet'] = body[:200] # 前500字符 if "phpinfo.php" in instruction:
info["configurations"] = {}
info["sensitive_info"] = {}
# 提取PHP版本信息,可以尝试从phpinfo表格中提取
m = re.search(r'PHP Version\s*</th>\s*<td[^>]*>\s*([\d.]+)\s*</td>', body, re.IGNORECASE)
if m:
info["php_version"] = m.group(1).strip()
else:
# 备用方案:在页面中查找 "PHP Version" 后面的数字
m = re.search(r'PHP\s*Version\s*([\d.]+)', body, re.IGNORECASE)
if m:
info["php_version"] = m.group(1).strip()
# 提取配置信息(如allow_url_include, display_errors, file_uploads, open_basedir)
configs = ["allow_url_include", "display_errors", "file_uploads", "open_basedir"]
for key in configs:
# 尝试匹配HTML表格形式:<td>key</td><td>value</td>
regex = re.compile(r'<td[^>]*>\s*' + re.escape(key) + r'\s*</td>\s*<td[^>]*>\s*([^<]+?)\s*</td>',
re.IGNORECASE)
m = regex.search(body)
if m:
info["configurations"][key] = m.group(1).strip()
# 处理 stderr 中的 TLS/证书信息:只提取包含关键字的行 # 提取敏感信息,这里以MYSQL_PASSWORD为例
tls_info_lines = [] sensitive_keys = ["MYSQL_PASSWORD"]
cert_info_lines = [] for key in sensitive_keys:
for line in stderr.splitlines(): regex = re.compile(r'<td[^>]*>\s*' + re.escape(key) + r'\s*</td>\s*<td[^>]*>\s*([^<]+?)\s*</td>',
# 过滤与 TLS/SSL 握手、证书相关的信息 re.IGNORECASE)
if "SSL connection using" in line or "TLS" in line: m = regex.search(body)
tls_info_lines.append(line.strip()) if m:
if "certificate" in line.lower(): info["sensitive_info"][key] = m.group(1).strip()
cert_info_lines.append(line.strip()) elif "phpMyAdmin" in instruction:
info['tls_info'] = tls_info_lines if tls_info_lines else "Not found" info["security_info"] = {}
info['certificate_info'] = cert_info_lines if cert_info_lines else "Not found" info["login_info"] = {}
# 查找登录表单中用户名、密码字段(例如 name="pma_username" 和 name="pma_password")
m = re.search(r'<input[^>]+name=["\'](pma_username)["\']', body, re.IGNORECASE)
if m:
info["login_info"]["username_field"] = m.group(1).strip()
m = re.search(r'<input[^>]+name=["\'](pma_password)["\']', body, re.IGNORECASE)
if m:
info["login_info"]["password_field"] = m.group(1).strip()
#安全信息
# csrf_protection:尝试查找隐藏域中是否存在 csrf token(例如 name="csrf_token" 或 "token")
m = re.search(r'<input[^>]+name=["\'](csrf_token|token)["\']', stdout, re.IGNORECASE)
info["security_info"]["csrf_protection"] = True if m else False
# httponly_cookie:从响应头中查找 Set-Cookie 行中是否包含 HttpOnly
m = re.search(r'Set-Cookie:.*HttpOnly', stdout, re.IGNORECASE)
info["security_info"]["httponly_cookie"] = True if m else False
# secure_cookie:从响应头中查找 Set-Cookie 行中是否包含 Secure
m = re.search(r'Set-Cookie:.*Secure', stdout, re.IGNORECASE)
info["security_info"]["secure_cookie"] = True if m else False
else: #
#info['body_snippet'] = body[:200] # 前500字符
if stderr:
# 处理 stderr 中的 TLS/证书信息:只提取包含关键字的行
tls_info_lines = []
cert_info_lines = []
for line in stderr.splitlines():
# 过滤与 TLS/SSL 握手、证书相关的信息
if "SSL connection using" in line or "TLS" in line:
tls_info_lines.append(line.strip())
if "certificate" in line.lower():
cert_info_lines.append(line.strip())
info['tls_info'] = tls_info_lines if tls_info_lines else "Not found"
info['certificate_info'] = cert_info_lines if cert_info_lines else "Not found"
# 可选:保留完整的 verbose 信息以便后续分析
#info['verbose'] = stderr
#转换成字符串 #转换成字符串
result = json.dumps(info,ensure_ascii=False) result = json.dumps(info,ensure_ascii=False)
#print(result)
return result return result
def analyze_result(self, result,instruction,stderr,stdout): def analyze_result(self, result,instruction,stderr,stdout):
@ -268,11 +277,11 @@ class CurlTool(ToolBase):
elif("-kv https://" in instruction or "-vk https://" in instruction): elif("-kv https://" in instruction or "-vk https://" in instruction):
result = self.get_ssl_info(stderr,stdout) result = self.get_ssl_info(stderr,stdout)
elif("-X POST " in instruction): elif("-X POST " in instruction):
result = self.get_info_xpost(stdout,stderr) result = self.get_info_curl(instruction,stdout,stderr)
elif("-v " in instruction): #curl -v http://192.168.204.137:8180/manager/html --user admin:admin 常规解析curl返回内容 elif("-v " in instruction): #curl -v http://192.168.204.137:8180/manager/html --user admin:admin 常规解析curl返回内容
result = self.get_info_xpost(stdout,stderr) result = self.get_info_curl(instruction,stdout,stderr)
else: #非处理命令的结果,暂时不提交LLM else:
result ="" result = self.get_info_curl(instruction,stdout,stderr)
return result return result
if __name__ =="__main__": if __name__ =="__main__":

17
tools/DirbTool.py

@ -0,0 +1,17 @@
import subprocess
import tempfile
import os
from tools.ToolBase import ToolBase
class DirbTool(ToolBase):
def validate_instruction(self, instruction):
#指令过滤
timeout = 0
instruction = instruction.strip()
if " -o" not in instruction:
instruction += " -o dirout.txt"
return instruction,timeout
def analyze_result(self, result,instruction,stderr,stdout):
#指令结果分析
return result

2
tools/EchoTool.py

@ -16,6 +16,6 @@ class EchoTool(ToolBase):
else: else:
result ="不存在安全问题" result ="不存在安全问题"
else:#未预处理的情况,暂时不返回LLM else:#未预处理的情况,暂时不返回LLM
result = "" pass
return result return result

13
tools/Enum4linuxTool.py

@ -0,0 +1,13 @@
from tools.ToolBase import ToolBase
from mycode.Result_merge import my_merge
class Enum4linuxTool(ToolBase):
def validate_instruction(self, instruction):
#指令过滤
timeout = 0
return instruction,timeout
def analyze_result(self, result,instruction,stderr,stdout):
#指令结果分析
result = my_merge("enum4linux", result)
return result

14
tools/FtpTool.py

@ -46,8 +46,10 @@ class FtpTool(ToolBase):
def validate_instruction(self, instruction): def validate_instruction(self, instruction):
timeout = 30 timeout = 30
#modified_code = "ftp匿名登录测试" #modified_code = "ftp匿名登录测试"
#若有put文件,则替换为payload文件
return instruction,timeout new_file = "payload/test.txt"
new_instr = re.sub(r'(put\s+)\S+', r'\1' + new_file, instruction)
return new_instr,timeout
def do_worker_subprocess(self,str_instruction,timeout,ext_params): def do_worker_subprocess(self,str_instruction,timeout,ext_params):
output = "" output = ""
@ -123,12 +125,12 @@ class FtpTool(ToolBase):
# 过滤修改后的指令是否需要判重?同样指令再执行结果一致?待定---#? # 过滤修改后的指令是否需要判重?同样指令再执行结果一致?待定---#?
# 第二步:执行指令---需要对ftp指令进行区分判断 # 第二步:执行指令---需要对ftp指令进行区分判断
pattern = re.compile(r'ftp\s+-n\s+\S+\s+<< EOF') pattern = re.compile(r'ftp\s+-n\s+\S+(\s+\d+)?\s+<<\s*EOF')
match = pattern.search(instruction) match = pattern.search(instruction)
if bool(match): #如果是 ftp -n 192.168.204.137 <<EOF 开头 if bool(match): #如果是 ftp -n 192.168.204.137 <<EOF 开头
output = self.do_worker_subprocess(instruction,time_out,ext_params) # output = self.do_worker_subprocess(instruction,time_out,ext_params)
if not output: # if not output:
output = self.do_worker_script(instruction,time_out,ext_params) output = self.do_worker_script(instruction,time_out,ext_params)
else: #最后使用ftp匿名登陆验证代码 else: #最后使用ftp匿名登陆验证代码
target = "" target = ""

56
tools/HydraTool.py

@ -1,6 +1,7 @@
import os import os
import shlex import shlex
import re import re
from collections import OrderedDict
from tools.ToolBase import ToolBase from tools.ToolBase import ToolBase
class HydraTool(ToolBase): class HydraTool(ToolBase):
@ -13,22 +14,63 @@ class HydraTool(ToolBase):
if match_p: if match_p:
str_p = match_p.group(1) str_p = match_p.group(1)
#判断文件是否存在 #判断文件是否存在
if not os.path.exists(str_p): #文件不存在要替换 #if not os.path.exists(str_p): #文件不存在要替换
new_pass_path = os.path.join(current_path, "../payload", "passwords") new_pass_path = os.path.join(current_path, "../payload", "passwords")
instruction = instruction.replace(str_p,new_pass_path) instruction = instruction.replace(str_p,new_pass_path)
if match_l: if match_l:
str_l = match_l.group(1) str_l = match_l.group(1)
#判断文件是否存在 #判断文件是否存在
if not os.path.exists(str_l): #if not os.path.exists(str_l):
new_user_path = os.path.join(current_path, "../payload", "users") new_user_path = os.path.join(current_path, "../payload", "users")
instruction = instruction.replace(str_l, new_user_path) instruction = instruction.replace(str_l, new_user_path)
#不是双字典的情况加-f
if "-l" in instruction or "-p" in instruction: if "-l" in instruction or "-p" in instruction:
if "-f" not in instruction: if "-f" not in instruction:
instruction = instruction + " -f" #当是单密码,或单用户名时,使用成功即停止模式 instruction = instruction.strip() + " -f" #当是单密码,或单用户名时,使用成功即停止模式
#取消-v -V
instruction = instruction.replace(" -V "," ")
instruction = instruction.replace(" -v "," ")
instruction = instruction.replace(" -vV","")
#加-o 存在个不确定项是:若没有匹配到,输出文件里面是只有一行执行的命令,没有结果描述
if " -o" not in instruction:
instruction = instruction + " -o hydra_result.txt"
# # 加 -q
# if " -q" not in instruction:
# instruction = instruction + " -q"
return instruction,timeout return instruction,timeout
def merge_info(self,result):
try:
# 按行分割输出,保留非空行
lines = [line.strip() for line in result.splitlines() if line.strip() != ""]
# 使用有序字典统计相同行的出现次数,保持原始顺序
counts = OrderedDict()
for line in lines:
if line in counts:
counts[line] += 1
else:
counts[line] = 1
# 生成整合后的输出,重复的行后面跟上*次数标记
output_lines = []
for line, count in counts.items():
if count > 1:
output_lines.append(f"{line} *{count}")
else:
output_lines.append(line)
consolidated = "\n".join(output_lines)
return consolidated
except Exception as e:
return result
def analyze_result(self, result,instruction,stderr,stdout): def analyze_result(self, result,instruction,stderr,stdout):
#返回结果 #返回结果
# result = self.merge_info(result)
# print(result)
#加文件后缀了
lines = result.splitlines()
if len(lines) == 1:
result = "没有匹配到成功的结果"
return result return result

63
tools/MysqlTool.py

@ -1,5 +1,6 @@
#mysql #mysql
#pip install mysql-connector-python #pip install mysql-connector-python
import subprocess
import mysql.connector import mysql.connector
from mysql.connector import Error from mysql.connector import Error
from tools.ToolBase import ToolBase from tools.ToolBase import ToolBase
@ -31,33 +32,61 @@ class MysqlTool(ToolBase):
return res return res
def validate_instruction(self, instruction): def validate_instruction(self, instruction):
#mysql暂只执行空密码攻击 timeout = 30
timeout = 0 #modified_code = "mysql空密码登录测试"
modified_code = "mysql空密码登录测试" instr = instruction.replace("--ssl-mode=DISABLED","--ssl=0") #mariaDB 没有ssl-mode参数
return modified_code,0 # if "--ssl=0" not in instr:
# instr = instr + " --ssl=0"
return instr,timeout
#对于非sh命令调用的工具,自己实现命令执行的内容 #对于非sh命令调用的工具,自己实现命令执行的内容 --#2025-3-24暂时不使用
def execute_instruction(self, instruction_old): def execute_instruction_old(self, instruction_old):
ext_params = self.create_extparams() ext_params = self.create_extparams()
# 第一步:验证指令合法性 # 第一步:验证指令合法性
instruction,time_out = self.validate_instruction(instruction_old) instruction,timeout = self.validate_instruction(instruction_old)
if not instruction: if not instruction:
return False, instruction_old, "该指令暂不执行!","",ext_params return False, instruction_old, "该指令暂不执行!","",ext_params
# 过滤修改后的指令是否需要判重?同样指令再执行结果一致?待定---#? # 过滤修改后的指令是否需要判重?同样指令再执行结果一致?待定---#?
# 第二步:执行指令 # 第二步:执行指令
target = "" # target = ""
parts = instruction_old.split() # parts = instruction_old.split()
for i, part in enumerate(parts): # for i, part in enumerate(parts):
if part == "-h" and i + 1 < len(parts): # if part == "-h" and i + 1 < len(parts):
target = parts[i + 1] # target = parts[i + 1]
output = self.test_empty_password_mysql_connection(target)#弱密码攻击如何处理? # output = self.test_empty_password_mysql_connection(target)#弱密码攻击如何处理?
output = ""
stdout = ""
stderr = ""
try:
if timeout == 0:
result = subprocess.run(instruction, shell=True, capture_output=True, text=True)
elif timeout > 0:
result = subprocess.run(instruction, shell=True, capture_output=True, text=True, timeout=timeout)
else:
print("timeout参数错误,需要自查程序逻辑!")
stderr = result.stderr
stdout = result.stdout
except subprocess.TimeoutExpired as e:
stdout = e.stdout if e.stdout is not None else ""
stderr = e.stderr if e.stderr is not None else ""
ext_params.is_user = True # 对于超时的也需要人工进行确认,是否是预期的超时
except Exception as e:
ext_params.is_user = True
return False, instruction, f"执行失败:{str(e)}", "", ext_params # 执行失败,提交给人工确认指令的正确性
# 第三步:分析执行结果 # 第三步:分析执行结果
analysis = self.analyze_result(output,instruction,"","") output = stdout
# 指令和结果入数据库 if stderr:
# ? output += stderr
return True, instruction, analysis,output,ext_params if isinstance(output, bytes): # 若是bytes则转成str
output = output.decode('utf-8', errors='ignore')
analysis = self.analyze_result(output, instruction, stderr, stdout)
if not analysis: # analysis为“” 不提交LLM
ext_params.is_user = True
return False, instruction, analysis, output, ext_params
return True, instruction, analysis, output, ext_params
def analyze_result(self, result,instruction,stderr,stdout): def analyze_result(self, result,instruction,stderr,stdout):
# #

2
tools/NcTool.py

@ -2,7 +2,7 @@ from tools.ToolBase import ToolBase
class NcTool(ToolBase): class NcTool(ToolBase):
def validate_instruction(self, instruction): def validate_instruction(self, instruction):
timeout = 30 timeout = 60
#指令过滤 #指令过滤
if "<<<" in instruction: if "<<<" in instruction:
instruction = f"bash -c \"{instruction}\"" instruction = f"bash -c \"{instruction}\""

64
tools/NiktoTool.py

@ -14,9 +14,71 @@ class NiktoTool(ToolBase):
# 使用正则表达式匹配 -ssl 参数及其相邻的空格 # 使用正则表达式匹配 -ssl 参数及其相邻的空格
cleaned_command = re.sub(r'\s+-ssl\b|\b-ssl\b', '', instruction, flags=re.IGNORECASE) cleaned_command = re.sub(r'\s+-ssl\b|\b-ssl\b', '', instruction, flags=re.IGNORECASE)
# 处理可能残留的多余空格 # 处理可能残留的多余空格
return re.sub(r'\s+', ' ', cleaned_command).strip(),timeout command = re.sub(r'\s+', ' ', cleaned_command).strip()
command = command.replace("-p80","-p 80")
return command,timeout
def analyze_result(self, result,instruction,stderr,stdout): def analyze_result(self, result,instruction,stderr,stdout):
# 检查结果 # 检查结果
if stderr:
result = stderr
else:
result = self.parse_nikto_full_info(result)
return result
def parse_nikto_full_info(self,nikto_output: str) -> dict:
"""
Nikto 扫描结果中提取尽可能多的漏洞信息和对后续渗透测试有用的信息
解析思路
1. 通过正则表达式提取以+开头的行Nikto 输出通常以+开头
2. 针对常见漏洞格式 OSVDB-xxxx进行匹配
3. 针对可能包含vulnerabilitywarningpotential等关键字的行进行提取
4. 针对配置或其他信息 "Allowed HTTP Methods", "Cookie", "Directory indexing" 也进行匹配
5. 对于未知格式尽量保留原始提示以供后续人工分析
返回的字典包含以下键
- vulnerabilities: 漏洞相关信息 OSVDB 编号潜在漏洞提示等
- configuration_issues: 与服务器配置HTTP 方法Cookie安全策略等相关的提示
- misc_info: 其他可能对渗透测试有帮助的信息
- error: 如解析过程中发生异常则记录异常信息
"""
result = {
"vulnerabilities": [],
"configuration_issues": [],
"misc_info": []
}
try:
# 获取所有以 + 开头的行
plus_lines = re.findall(r'^\+\s*(.+)$', nikto_output, re.MULTILINE)
# 定义一些常见关键词,出现这些关键词的行我们认为是漏洞或安全问题
vuln_keywords = [
"OSVDB", "vulnerab", "potential", "insecure", "misconfig", "expos",
"Directory indexing", "Outdated", "Deprecated", "SSL", "Cookie", "error"
]
config_keywords = [
"Allowed HTTP Methods", "Server:", "Banner", "HTTP Options", "Headers", "trace", "PUT", "DELETE"
]
for line in plus_lines:
# 先将行统一为小写做匹配,但保留原始行内容
low_line = line.lower()
# 判断是否属于漏洞信息
if re.search(r'osvdb-\d+', line):
result["vulnerabilities"].append(line.strip())
elif any(kw.lower() in low_line for kw in vuln_keywords):
result["vulnerabilities"].append(line.strip())
# 判断是否属于配置问题(例如HTTP方法、Cookie安全、服务器banner异常等)
elif any(kw.lower() in low_line for kw in config_keywords):
result["configuration_issues"].append(line.strip())
else:
# 其他信息,可能对后续渗透测试有启示
result["misc_info"].append(line.strip())
except Exception as e:
result["error"] = f"解析过程中出现异常: {str(e)}"
return result return result

84
tools/PsqlTool.py

@ -0,0 +1,84 @@
import subprocess
import tempfile
import os
from tools.ToolBase import ToolBase
class PsqlTool(ToolBase):
def validate_instruction(self, instruction):
#指令过滤
timeout = 60
return instruction,timeout
def analyze_result(self, result,instruction,stderr,stdout):
#指令结果分析
return result
def do_worker_script(self,str_instruction,timeout,ext_params):
# 创建临时文件保存输出
with tempfile.NamedTemporaryFile(delete=False) as tmpfile:
output_file = tmpfile.name
# 构建并执行 script 命令
script_cmd = f"script -c '{str_instruction}' {output_file}"
try:
result = subprocess.run(script_cmd, shell=True, text=True,timeout=timeout)
# 读取输出文件内容
with open(output_file, 'r') as f:
output = f.read()
lines = output.splitlines()
# 跳过第一行(Script started)和最后一行(Script done)
ftp_output = lines[1:-1]
output = '\n'.join(ftp_output)
except subprocess.TimeoutExpired:
output = "命令超时返回"
try:
with open(output_file, 'r') as f:
partial_output = f.read()
if partial_output:
output += f"\n部分输出:\n{partial_output}"
except FileNotFoundError:
pass # 文件可能未创建
except subprocess.CalledProcessError as e:
output = f"错误: {e}"
finally:
# 删除临时文件
try:
os.remove(output_file)
except FileNotFoundError:
pass # 文件可能未创建
return output
def execute_instruction1(self, instruction_old):
'''
执行指令验证合法性 -> 执行 -> 分析结果
*****如果指令要做验证只做白名单所有逻辑不是全放开就是白名单*****
:param instruction_old:
:return:
bool:true-正常返回给大模型处理下一步false-结果不返回给大模型,2--需要人工确认的指令
str:执行的指令
str:执行指令的结果-解析过滤后的结果--也是提交给LLM的结果
str:执行指令的结果-原结果
object:补充参数-封装一个对象 0-不知是否攻击成功1-明确存在漏洞2-明确不存在漏洞
'''
ext_params = self.create_extparams()
# 第一步:验证指令合法性
instruction,timeout = self.validate_instruction(instruction_old)
if not instruction:
ext_params.is_user= True
return False,instruction_old,"该指令暂不执行!由用户确认是否要兼容支持","",ext_params #未
#过滤修改后的指令是否需要判重?同样指令再执行结果一致?待定---#?
# 第二步:执行指令
output = self.do_worker_script(instruction,timeout,ext_params)
# 第三步:分析执行结果
if isinstance(output,bytes):#若是bytes则转成str
output = output.decode('utf-8', errors='ignore')
analysis = self.analyze_result(output,instruction,"","")
if not analysis: #analysis为“” 不提交LLM
ext_params.is_user = True
return False,instruction,analysis,output,ext_params
return True,instruction, analysis,output,ext_params

23
tools/PythoncodeTool.py

@ -6,7 +6,14 @@ import builtins
import re import re
import paramiko import paramiko
import impacket import impacket
import psycopg2
import socket
import struct
import sys
import requests
import mysql.connector
from tools.ToolBase import ToolBase from tools.ToolBase import ToolBase
from mycode.Result_merge import my_merge
class PythoncodeTool(ToolBase): class PythoncodeTool(ToolBase):
@ -56,6 +63,7 @@ class PythoncodeTool(ToolBase):
# 定义允许的内置函数集合 --白名单 # 定义允许的内置函数集合 --白名单
allowed_builtins = { allowed_builtins = {
'__name__': __name__,
'__import__': builtins.__import__, '__import__': builtins.__import__,
"abs": abs, "abs": abs,
"all": all, "all": all,
@ -96,7 +104,14 @@ class PythoncodeTool(ToolBase):
'json':json, 'json':json,
're':re, 're':re,
'paramiko':paramiko, 'paramiko':paramiko,
'impacket':impacket} 'impacket':impacket,
'psycopg2':psycopg2,
'socket':socket,
'mysql':mysql,
'mysql.connector':mysql.connector,
'struct':struct,
'sys':sys,
'requests':requests}
safe_locals = {} #不需要预设局部参数 safe_locals = {} #不需要预设局部参数
# 在限制环境中执行代码 # 在限制环境中执行代码
exec(instruction, safe_globals,safe_locals) exec(instruction, safe_globals,safe_locals)
@ -107,7 +122,7 @@ class PythoncodeTool(ToolBase):
return True,instruction,analysis,analysis,ext_params return True,instruction,analysis,analysis,ext_params
# Get the function and call it # Get the function and call it
dynamic_fun = safe_locals['dynamic_fun'] dynamic_fun = safe_locals['dynamic_fun']
status, tmpout = dynamic_fun() status, tmpout = dynamic_fun() #LLM存在status定义错误的情况(执行成功,却返回的是False) #重点要处理
output = f"status:{status},output:{tmpout}" output = f"status:{status},output:{tmpout}"
except Exception as e: except Exception as e:
analysis = f"执行动态代码时出错: {str(e)}" analysis = f"执行动态代码时出错: {str(e)}"
@ -124,7 +139,9 @@ class PythoncodeTool(ToolBase):
return True, instruction, analysis,"",ext_params return True, instruction, analysis,"",ext_params
def analyze_result(self, result,instruction,stderr,stdout): def analyze_result(self, result,instruction,stderr,stdout):
#指令结果分析 #指令结果分析 --- 要不要限定一个max_len?
if "enum4linux " in instruction: #存在指令包装成Python代码返回的情况
result = my_merge("enum4linux",result)
return result return result
if __name__ == "__main__": if __name__ == "__main__":

8
tools/SearchsploitTool.py

@ -103,9 +103,9 @@ class SearchsploitTool(ToolBase):
instruction = f"searchsploit -m {filepath}" instruction = f"searchsploit -m {filepath}"
try: try:
subprocess.run(instruction, shell=True, check=True,timeout=60) subprocess.run(instruction, shell=True, check=True,timeout=60)
print("命令执行成功,文件下载完成。") #print("命令执行成功,文件下载完成。")
except subprocess.CalledProcessError as e: #超时暂不处理--后续补充 except subprocess.CalledProcessError as e: #超时暂不处理--后续补充
print(f"命令执行失败: {e}") #print(f"命令执行失败: {e}")
return False return False
if not os.path.exists(filename): if not os.path.exists(filename):
@ -113,10 +113,10 @@ class SearchsploitTool(ToolBase):
# 移动文件到目标目录 # 移动文件到目标目录
try: try:
shutil.move(filename, os.path.join(dirpath, filename)) shutil.move(filename, os.path.join(dirpath, filename))
print(f"文件已成功移动到 {dirpath}") #print(f"文件已成功移动到 {dirpath}")
return True return True
except Exception as e: except Exception as e:
print(f"移动文件失败: {e}") #print(f"移动文件失败: {e}")
return False return False
else: #暂时只利用python脚本 else: #暂时只利用python脚本
return False return False

11
tools/ShowmountTool.py

@ -0,0 +1,11 @@
from tools.ToolBase import ToolBase
class ShowmountTool(ToolBase):
def validate_instruction(self, instruction):
#指令过滤
timeout = 0
return instruction,timeout
def analyze_result(self, result,instruction,stderr,stdout):
#指令结果分析
return result

24
tools/SmtpuserenumTool.py

@ -1,29 +1,25 @@
import os import os
import shlex import shlex
import re
from tools.ToolBase import ToolBase from tools.ToolBase import ToolBase
class SmtpuserenumTool(ToolBase): class SmtpuserenumTool(ToolBase):
def validate_instruction(self, instruction): def validate_instruction(self, instruction):
timeout = 0 timeout = 0
# 分割指令为参数列表
cmd_parts = shlex.split(instruction)
new_cmd = []
# 获取当前程序所在目录 # 获取当前程序所在目录
current_path = os.path.dirname(os.path.realpath(__file__)) current_path = os.path.dirname(os.path.realpath(__file__))
new_user_path = os.path.join(current_path, "../payload", "users") new_user_path = os.path.join(current_path, "../payload", "users")
i = 0
while i < len(cmd_parts):
part = cmd_parts[i]
new_cmd.append(part)
# 检测到-P参数
if part == "-U" and i + 1 < len(cmd_parts): # 用户名
# 替换下一参数为指定路径
new_cmd.append(new_user_path)
i += 1 # 跳过原路径参数
i += 1
return " ".join(shlex.quote(p) for p in new_cmd),timeout match = re.search(r'-U\s+(\S+)', instruction)
if match:
file_path = match.group(1)
# 检查该文件是否存在
if not os.path.isfile(file_path):
# 替换原文件路径为新的路径
instruction = instruction.replace(file_path, new_user_path)
return instruction,timeout
def analyze_result(self, result,instruction,stderr,stdout): def analyze_result(self, result,instruction,stderr,stdout):
#指令结果分析 #指令结果分析

11
tools/SwaksTool.py

@ -0,0 +1,11 @@
from tools.ToolBase import ToolBase
class SwaksTool(ToolBase):
def validate_instruction(self, instruction):
#指令过滤
timeout = 0
return instruction,timeout
def analyze_result(self, result,instruction,stderr,stdout):
#指令结果分析
return result

132
tools/TelnetTool.py

@ -6,7 +6,7 @@ MAIL FROM: <admin@haitutech.cn>
RCPT TO: <target@example.com> NOTIFY=SUCCESS,FAILURE RCPT TO: <target@example.com> NOTIFY=SUCCESS,FAILURE
''' '''
import socket import socket
import subprocess
from tools.ToolBase import ToolBase from tools.ToolBase import ToolBase
class TelnetTool(ToolBase): class TelnetTool(ToolBase):
@ -19,10 +19,9 @@ class TelnetTool(ToolBase):
#指令结果分析 #指令结果分析
return result return result
def smtp_injection_test(self,cmd_str: str): def smtp_injection_test(self,cmd_str: str,host,port):
""" """
测试 SMTP 命令注入漏洞 测试 SMTP 命令注入漏洞
参数: 参数:
cmd_str: 多行字符串其中第一行为类似 "telnet haitutech.cn 25" cmd_str: 多行字符串其中第一行为类似 "telnet haitutech.cn 25"
后续行为具体的 SMTP 命令例如 后续行为具体的 SMTP 命令例如
@ -35,67 +34,42 @@ class TelnetTool(ToolBase):
如果 RCPT 命令响应以 250 开头认为可能存在漏洞返回 True 如果 RCPT 命令响应以 250 开头认为可能存在漏洞返回 True
否则认为安全或不支持该注入返回 False 否则认为安全或不支持该注入返回 False
""" """
last_response = ""
lines = cmd_str.strip().splitlines() lines = cmd_str.strip().splitlines()
if not lines: try:
print("未提供命令") # 建立 TCP 连接
return "" s = socket.create_connection((host, port), timeout=10)
except Exception as e:
# 解析第一行,格式应为:telnet host port #print("连接失败:", e)
parts = lines[0].strip().split() return f"连接失败:{e}"
if len(parts) < 3:
print("第一行格式错误,预期格式: 'telnet host port'")
return ""
host = parts[1] # 读取 SMTP Banner
try: try:
port = int(parts[2]) banner = s.recv(1024).decode('utf-8', errors='ignore')
except ValueError: except Exception as e:
print("端口转换失败") #print("读取 Banner 失败:", e)
return "" s.close()
return f"读取 Banner 失败:{e}"
last_response = "" # 从第二行开始发送 SMTP 命令
if port == 25: for line in lines[1:]:
cmd = line.strip()
if not cmd:
continue
try: try:
# 建立 TCP 连接 s.send((cmd + "\r\n").encode())
s = socket.create_connection((host, port), timeout=10)
except Exception as e: except Exception as e:
#print("连接失败:", e) s.close()
return f"连接失败:{e}" return f"发送命令失败:{e}"
# 读取 SMTP Banner
try: try:
banner = s.recv(1024).decode('utf-8', errors='ignore') response = s.recv(1024).decode('utf-8', errors='ignore')
print("Banner:", banner.strip()) #print("收到响应:", response.strip())
last_response = response.strip()
except Exception as e: except Exception as e:
#print("读取 Banner 失败:", e) #print("读取响应失败:", e)
s.close() s.close()
return f"读取 Banner 失败:{e}" return f"读取响应失败:{e}"
s.close()
# 从第二行开始发送 SMTP 命令
for line in lines[1:]:
cmd = line.strip()
if not cmd:
continue
print("发送命令:", cmd)
try:
s.send((cmd + "\r\n").encode())
except Exception as e:
print("发送命令失败:", e)
s.close()
return ""
try:
response = s.recv(1024).decode('utf-8', errors='ignore')
print("收到响应:", response.strip())
last_response = response.strip()
except Exception as e:
print("读取响应失败:", e)
s.close()
return f"读取响应失败:{e}"
s.close()
else:
return ""
# 以 RCPT 命令的响应作为判断依据 # 以 RCPT 命令的响应作为判断依据
# 注意:对于支持 DSN 的服务器,250 可能是合法的响应, # 注意:对于支持 DSN 的服务器,250 可能是合法的响应,
# 但在不支持的情况下,若返回 250 则可能说明命令注入导致了不正常的处理。 # 但在不支持的情况下,若返回 250 则可能说明命令注入导致了不正常的处理。
@ -115,19 +89,59 @@ class TelnetTool(ToolBase):
''' '''
ext_params = self.create_extparams() ext_params = self.create_extparams()
# 第一步:验证指令合法性 # 第一步:验证指令合法性
instruction,time_out = self.validate_instruction(instruction_old) instruction,timeout = self.validate_instruction(instruction_old)
if not instruction: if not instruction:
return False,instruction_old,"该指令暂不执行!","",ext_params return False,instruction_old,"该指令暂不执行!","",ext_params
#过滤修改后的指令是否需要判重?同样指令再执行结果一致?待定---#? #过滤修改后的指令是否需要判重?同样指令再执行结果一致?待定---#?
# 第二步:执行指令 # 第二步:执行指令
output = self.smtp_injection_test(instruction) lines = instruction.strip().splitlines()
# 解析第一行,格式应为:telnet host port
parts = lines[0].strip().split()
if len(parts) < 3:
output = "第一行格式错误,预期格式: 'telnet host port'"
ext_params["is_user"] = True
return True,instruction,output,output,ext_params
host = parts[1]
try:
port = int(parts[2])
except ValueError:
output = "端口转换失败"
ext_params["is_user"] = True
return True, instruction, output, output, ext_params
if port == 25:
output = self.smtp_injection_test(instruction,host,port)
else:#其他默认subprocess执行
try:
# 指令过滤
if "<<<" in instruction:
instruction = f"bash -c \"{instruction}\""
if timeout == 0:
result = subprocess.run(instruction, shell=True, capture_output=True, text=True)
elif timeout > 0:
result = subprocess.run(instruction, shell=True, capture_output=True, text=True, timeout=timeout)
else:
print("timeout参数错误,需要自查程序逻辑!")
stderr = result.stderr
stdout = result.stdout
except subprocess.TimeoutExpired as e:
stdout = e.stdout if e.stdout is not None else ""
stderr = e.stderr if e.stderr is not None else ""
ext_params.is_user = True # 对于超时的也需要人工进行确认,是否是预期的超时
except Exception as e:
ext_params.is_user = True
return False, instruction, f"执行失败:{str(e)}", "", ext_params # 执行失败,提交给人工确认指令的正确性
output = stdout
if stderr:
output += stderr
if isinstance(output, bytes): # 若是bytes则转成str
output = output.decode('utf-8', errors='ignore')
# 第三步:分析执行结果 # 第三步:分析执行结果
analysis = self.analyze_result(output,instruction,"","") analysis = self.analyze_result(output,instruction,"","")
#指令和结果入数据库
#?
if not analysis: #analysis为“” 不提交LLM if not analysis: #analysis为“” 不提交LLM
ext_params.is_user = True
return False,instruction,analysis,output,ext_params return False,instruction,analysis,output,ext_params
return True,instruction, analysis,output,ext_params return True,instruction, analysis,output,ext_params

73
tools/ToolBase.py

@ -9,6 +9,7 @@
import abc import abc
import subprocess import subprocess
import argparse import argparse
import shlex
import sys import sys
from myutils.ReturnParams import ReturnParams from myutils.ReturnParams import ReturnParams
@ -26,26 +27,62 @@ class ToolBase(abc.ABC):
return ext_params return ext_params
def parse_sublist3r_command(self,command): def parse_sublist3r_command(self,command):
parser = argparse.ArgumentParser(add_help=False) #创建命令行参数解析器对象‌ parser = argparse.ArgumentParser(add_help=False,allow_abbrev=False) #创建命令行参数解析器对象‌
parser.add_argument("-o ", "--output", type=str) #添加需要解析的参数规则‌ parser.add_argument("-o", "--output", type=str) #添加需要解析的参数规则‌
parser.add_argument("-oN ", "--Noutput", type=str) #nmap parser.add_argument("-oN", "--Noutput", type=str) #nmap
parser.add_argument("-oG ","--NMG",type=str) #nmap parser.add_argument("-oG","--NMG",type=str) #nmap
parser.add_argument("-output ", "--nikto", type=str) #nikto parser.add_argument("-output", "--nikto", type=str) #nikto
args, _ = parser.parse_known_args(command.split()[1:]) args, _ = parser.parse_known_args(command.split()[1:])
return args return args
def parse_output_params(self,command, valid_flags=None):
"""
解析 shell 命令中用于输出文件的参数
只检查完全匹配的 token忽略诸如 "-oKexAlgorithms" 这样的参数
:param command: 完整的命令字符串
:param valid_flags: 合法的输出参数列表默认支持 -o, -oN, -oG, -output
:return: dict键为输出参数值为对应的文件路径如果有的话
"""
if valid_flags is None:
valid_flags = ['-o', '-oN', '-oG', '-output']
tokens = shlex.split(command)
output_params = {}
output_file = ""
i = 0
while i < len(tokens):
token = tokens[i]
# 如果 token 完全匹配有效的参数,则认为它是输出参数
if token in valid_flags:
# 如果紧跟着有值,则把它作为输出文件路径,否则为 None
if i + 1 < len(tokens):
#output_params[token] = tokens[i + 1]
#i += 2
output_file = tokens[i+1]
# else:
# output_params[token] = None
# i += 1
break # 一般只有一个输出参数
else:
i += 1
return output_file
def read_output_file(self,filename): def read_output_file(self,filename):
""" """
读取 -o 参数指定的文件内容 读取 -o 参数指定的文件内容
""" """
content = ""
try: try:
with open(filename, "r") as f: with open(filename, "r") as f:
return f.read() content = f.read()
with open(filename,'w') as f:
f.write("")
except FileNotFoundError: except FileNotFoundError:
print(f"错误: 文件 {filename} 不存在") print(f"错误: 文件 {filename} 不存在")
except PermissionError: except PermissionError:
print(f"错误: 无权限读取文件 {filename}") print(f"错误: 无权限读取文件 {filename}")
return "" return content
def execute_instruction(self, instruction_old): def execute_instruction(self, instruction_old):
''' '''
@ -82,23 +119,11 @@ class ToolBase(abc.ABC):
print("timeout参数错误,需要自查程序逻辑!") print("timeout参数错误,需要自查程序逻辑!")
#output = result.stdout if result.stdout else result.stderr #output = result.stdout if result.stdout else result.stderr
#-o 的命令需要处理 #-o 的命令需要处理
parsed_arg = self.parse_sublist3r_command(instruction) parsed_arg = self.parse_output_params(instruction)
if parsed_arg.output: if parsed_arg:
file_out = self.read_output_file(parsed_arg.output) fole_out = self.read_output_file(parsed_arg)
stderr = "" stderr =""
stdout = file_out stdout = fole_out
elif parsed_arg.Noutput:
file_out = self.read_output_file(parsed_arg.Noutput)
stderr = ""
stdout = file_out
elif parsed_arg.nikto:
file_out = self.read_output_file(parsed_arg.nikto)
stderr = ""
stdout = file_out
elif parsed_arg.NMG:
file_out = self.read_output_file(parsed_arg.NMG)
stderr = ""
stdout = file_out
else: else:
stderr = result.stderr stderr = result.stderr
stdout = result.stdout stdout = result.stdout

Loading…
Cancel
Save