Compare commits

...

2 Commits

Author SHA1 Message Date
张龙 53c3ed4d10 V0.1.1 1 month ago
张龙 5c988bb36a V0.1.1 1 month ago
  1. 16
      InstructionManager.py
  2. 173
      LLMManager.py
  3. 260
      TaskManager.py
  4. 4
      config.yaml
  5. 148
      mycode/AttackMap.py
  6. 180
      mycode/ControlCenter.py
  7. 19
      mycode/DBManager.py
  8. 23
      mycode/InitManager.py
  9. 233
      mycode/LLMManager.py
  10. 2
      test
  11. 4
      tools/MsfconsoleTool.py
  12. 38
      tools/NmapTool.py
  13. 1
      tools/OpensslTool.py
  14. 80
      tools/PythonTool.py
  15. 129
      tools/PythoncodeTool.py
  16. 107
      tools/SearchsploitTool.py
  17. 23
      tools/ToolBase.py

16
InstructionManager.py

@ -10,11 +10,13 @@ import subprocess
import importlib import importlib
import os import os
from tools.ToolBase import ToolBase from tools.ToolBase import ToolBase
from myutils.ReturnParams import ReturnParams
class InstructionManager: class InstructionManager:
def __init__(self): def __init__(self,TM):
self.tool_registry = {} #安全工具list self.TM = TM #任务类对象穿透
self.load_tools() #加载工具类 self.tool_registry = {} # 安全工具list
self.load_tools() # 加载工具类
def init_data(self): def init_data(self):
pass pass
@ -34,7 +36,7 @@ class InstructionManager:
cls = getattr(module,module_name) cls = getattr(module,module_name)
if(issubclass(cls, ToolBase) and cls != ToolBase): if(issubclass(cls, ToolBase) and cls != ToolBase):
tool_name = module_name.lower()[:-4] tool_name = module_name.lower()[:-4]
self.tool_registry[tool_name] = cls() self.tool_registry[tool_name] = cls(self.TM) #类对象穿透
except ImportError as e: except ImportError as e:
print(f"加载工具 {module_name} 失败:{str(e)}") print(f"加载工具 {module_name} 失败:{str(e)}")
@ -56,9 +58,15 @@ class InstructionManager:
#print(f"分析结果:{result}") #print(f"分析结果:{result}")
print(f"*****指令:{instr},执行完毕") print(f"*****指令:{instr},执行完毕")
else: else:
bres = False
instr = instruction #保障后续代码的一致性 instr = instruction #保障后续代码的一致性
source_result = result = f"未知工具:{tool_name}"
ext_params = ReturnParams()
ext_params["is_user"] = False # 是否要提交用户确认 -- 默认False
ext_params["is_vulnerability"] = False # 是否是脆弱点
print(f"执行指令:{instr}") print(f"执行指令:{instr}")
print(f"未知工具:{tool_name}") print(f"未知工具:{tool_name}")
return bres,instr,result,source_result,ext_params return bres,instr,result,source_result,ext_params
#过来指令:合规、判重、待执行等 #过来指令:合规、判重、待执行等

173
LLMManager.py

@ -1,173 +0,0 @@
'''
实现对大模型调用的封装隔离具体使用的LLM
pip install openai
'''
import openai
import json
import threading
import re
from openai import OpenAI
from myutils.MyTime import get_local_timestr
class LLMManager:
def __init__(self,illm_type=0):
self.api_key = None
self.api_url = None
self.task_id =0 #一个任务一个id
self.llm_sn = 0 # llm执行序列号,--一任务一序列
self.llm_sn_lock = threading.Lock() #
#temperature设置
#DS------代码生成/数学解题:0.0 -- 数据抽取/分析:1.0 -- 通用对话:1.3 -- 翻译:1.3 -- 创意类写作:1.5
#腾讯云---
self.temperature = 1.0
if illm_type == 0: #腾讯云
self.api_key = "fGBYaQLHykBOQsFwVrQdIFTsYr8YDtDVDQWFU41mFsmvfNPc"
self.api_url = ""
elif illm_type == 1: #DS
self.api_key ="sk-10360148b465424288218f02c87b0e1b"
self.api_url ="https://api.deepseek.com/v1"
self.model = "deepseek-reasoner" #model=deepseek-reasoner -- R1 model=deepseek-chat --V3
#创建会话对象 -- 一个任务的LLM必须唯一
self.client = OpenAI(api_key=self.api_key,base_url=self.api_url)
self.messages = []
# 初始化阶段和已知信息
current_stage = "信息收集"
known_info = {"url": "www.test.com"}
results = [] # 存储所有任务结果,用于生成报告
#******测试使用,设置slef.message的值
def test_old_message(self,strMessage):
try:
self.messages = json.loads(strMessage)
except json.JSONDecodeError as e:
print(f"JSON解析错误: {str(e)}")
except Exception as e:
print(f"错误: {str(e)}")
# 构建初始提示 初始化messages
def build_initial_prompt(self,target,know_info=""):
self.messages = [{"role": "system",
"content": "你是一位资深的渗透测试专家,需要动态控制整个渗透测试过程,包括信息收集、漏洞扫描、漏洞利用等阶段,最终生成渗透测试报告。由你规划执行的指令,我会根据你的指令执行并提交结果,你再对结果进行分析,规划并生成下一步指令,直到完成渗透测试,生成测试报告。"
"生成的指令需满足如下约束:"
"1.只返回具体的shell指令或Python代码,不要包含注释和说明;"
"2.shell指令以```bash(.*?)```包裹,python代码以```python(.*?)```包裹;"
"3.针对提供的Python代码,需要包含错误处理,并将执行结果保存到文件中(为每个任务指定唯一文件名),执行结束返回success或failure和该文件名;"
"4.如果认为渗透测试已完成,请生成生成报告的python代码,并返回success和complete"}] # 一个messages
return f"现在开始对目标{target}进行渗透测试,已知信息{know_info},请提供下一步执行的指令。"
# 构建反馈提示
def build_feedback_prompt(self,bres,instruction, result):
if bres:
return f"执行指令“{instruction}”的结果是“{result}”。请根据这些结果生成下一步具体的指令。"
else:
return ""
def init_data(self,task_id=0):
#初始化LLM数据
self.llm_sn = 0
self.task_id = task_id
self.messages = []
# 调用LLM生成指令
def get_llm_instruction(self,prompt,th_DBM):
'''
1.由于大模型API不记录用户请求的上下文一个任务的LLM不能并发
:param prompt:用户本次输入的内容
:return: instr_list
'''
#添加本次输入入队列
message = {"role":"user","content":prompt}
self.messages.append(message)
#提交LLM
post_time = get_local_timestr()
response = self.client.chat.completions.create(
model=self.model,
messages = self.messages
)
reasoning_content = ""
content = ""
#LLM返回处理
if self.model == "deepseek-reasoner":
#返回错误码:DS-https://api-docs.deepseek.com/zh-cn/quick_start/error_codes
reasoning_content = response.choices[0].message.reasoning_content #推理过程
print(reasoning_content)
content = response.choices[0].message.content #推理内容
print(content)
#记录历史信息
self.messages.append({'role': 'assistant', 'content': content})
elif self.model == "deepseek-chat":
content = response.choices[0].message
#记录历史信息
self.messages.append(content)
#LLM记录存数据库
with self.llm_sn_lock:
self.llm_sn += 1
#llm查询记录入库
bres = th_DBM.insert_llm(self.task_id,prompt,reasoning_content,content,post_time,self.llm_sn)
if not bres:
print("llm入库失败!")
#********测试时使用---输出和记录LLM返回指令的message
print(f"Messages:{self.messages}")
with open("test","w",encoding="utf-8") as f: #输出到文件
json.dump(self.messages,f,ensure_ascii=False)
#需要对指令进行提取
instr_list = self.fetch_instruction(content)
return instr_list
def fetch_instruction(self,response_text):
'''
提取命令列表包括
1. Python 代码块仅保留有效 Python 代码
2. Shell 命令分割空行每个块视为一条指令
:param text: 输入文本
:return: 解析后的命令列表
'''
#针对llm的回复,提取执行的指令
# 正则匹配 Python 代码块
python_blocks = re.findall(r"```python(.*?)```", response_text, flags=re.DOTALL)
# 处理 Python 代码块,去除空行并格式化
python_blocks = [block.strip() for block in python_blocks]
# 按连续的空行拆分
# 移除 Python 代码块,但保留内容用于返回
text_no_python = re.sub(r"```python.*?```", "PYTHON_BLOCK", response_text, flags=re.DOTALL)
# 这里用 \n\s*\n 匹配一个或多个空白行
parts = re.split(r'\n\s*\n', text_no_python)
commands = []
python_index = 0 # 记录 Python 代码块插入位置
for part in parts:
part = part.strip()
if not part:
continue
if "PYTHON_BLOCK" in part:
# 还原 Python 代码块
commands.append(f"python {python_blocks[python_index]}")
python_index += 1
else:
# 添加普通 Shell 命令
commands.append(part)
return commands
def test_llm(self):
with open("test", "r", encoding="utf-8") as f:
messages = json.load(f)
text = messages[-1]["content"]
list = self.fetch_instruction(text)
for itme in list:
print("***********")
print(itme)
if __name__ == "__main__":
LM = LLMManager(1)
LM.test_llm()

260
TaskManager.py

@ -2,24 +2,31 @@
渗透测试任务管理类 一次任务的闭合性要检查2025-3-10 一次任务后要清理LLM和InstrM的数据 渗透测试任务管理类 一次任务的闭合性要检查2025-3-10 一次任务后要清理LLM和InstrM的数据
''' '''
from TargetManager import TargetManager # 从模块导入类 from TargetManager import TargetManager # 从模块导入类
from LLMManager import LLMManager # 同理修正其他导入 #from LLMManager import LLMManager # 同理修正其他导入
from mycode.ControlCenter import ControlCenter #控制中心替代LLM--控制中心要实现一定的基础逻辑和渗透测试树的维护。
from myutils.FileManager import FileManager from myutils.FileManager import FileManager
from InstructionManager import InstructionManager from InstructionManager import InstructionManager
from mycode.DBManager import DBManager from mycode.DBManager import DBManager
from myutils.MyTime import get_local_timestr from myutils.MyTime import get_local_timestr
from myutils.MyLogger_logger import LogHandler
import pickle
import queue import queue
import time import time
import os import os
import threading import threading
class TaskManager: class TaskManager:
def __init__(self): def __init__(self):
self.TargetM = TargetManager() self.TargetM = TargetManager()
self.logger = LogHandler().get_logger("TaskManager")
# 生成功能对象 # 生成功能对象
self.LLMM = LLMManager(1)
self.InstrM = InstructionManager()
self.DBM = DBManager() #主进程一个DBM self.DBM = DBManager() #主进程一个DBM
self.DBM.connect() if not self.DBM.connect():
self.logger.error("数据库连接失败!终止工作!")
return
self.CCM = ControlCenter(self.DBM)
self.InstrM = InstructionManager(self) # 类对象渗透,要约束只读取信息
# 控制最大并发指令数量 # 控制最大并发指令数量
self.max_thread_num = 2 self.max_thread_num = 2
self.task_id = 0 #任务id -- self.task_id = 0 #任务id --
@ -28,146 +35,82 @@ class TaskManager:
self.batch_num = 0 #一个批次的指令数量 self.batch_num = 0 #一个批次的指令数量
self.long_instr_num = 0 #耗时指令数量 self.long_instr_num = 0 #耗时指令数量
self.long_time_instr = ['nikto'] #耗时操作不计入批量执行的数量,不加不减 self.long_time_instr = ['nikto'] #耗时操作不计入批量执行的数量,不加不减
self.instr_queue = queue.Queue() #线程安全 --待执行指令 self.node_queue = [] #
self.user_instr = queue.Queue() #需要用确认或手动执行的命令--待执行指令
self.user_done_instr = queue.Queue() #执行完成需要用户确认的指令
self.doing_instr = queue.Queue()#执行中的指令
self.done_instr = queue.Queue() #执行完成的指令
self.res_queue = queue.Queue() #结果队列
self.lock = threading.Lock() #线程锁 self.lock = threading.Lock() #线程锁
self.node_num = 0 #在处理Node线程的处理
self.do_sn_lock = threading.Lock() #指令执行顺序号锁 self.do_sn_lock = threading.Lock() #指令执行顺序号锁
self.brun = True self.brun = True
def is_user_instr(self,instr): def res_in_quere(self,bres,instr,reslut,start_time,end_time,th_DBM,source_result,ext_params,work_node):
'''
过滤需要人工确认或手动执行的指令 ---- 待完善
:param instr:
:return:
'''
#if instr.startswith("curl") or instr.startswith("http") or instr.startswith("wget"):
if instr.startswith("http") or instr.startswith("wget") or instr.startswith("ssh"):
return True
def instr_in_quere(self,instr_list):
'''
对于运行需要较长时间的不强求同一批次返回给LLM
:param instr_list:
:return:
'''
for instr in instr_list:
if self.is_user_instr(instr):
self.user_instr.put(instr)
print(f"需要人工确认的指令{instr}")
else:
matched =False
for prefix in self.long_time_instr:
if instr.startswith(prefix):
matched =True
if not matched:
with self.lock:
self.batch_num += 1 #非耗时指令+1
print(f"&&&&&&当前batch_num:{self.batch_num}")
else:
with self.lock:
self.long_instr_num +=1 #耗时指令数量+1
# 指令入队列
self.instr_queue.put(instr)
#入数据库放哪一层要待定-TaskManager , InstructionM,ToolBase-2025-3-10
def res_in_db(self,bres,instr,reslut,start_time,end_time,th_DBM,source_result,ext_params):
if th_DBM.ok:
with self.do_sn_lock:
self.do_sn += 1 #指令的执行序列是一个任务共用,要线程锁,错误问题也不大
th_DBM.insetr_result(self.task_id,instr,reslut,self.do_sn,start_time,end_time,source_result,ext_params)
else:
print("数据库连接失败!!")
def res_in_quere(self,bres,instr,reslut,start_time,end_time,th_DBM,source_result,ext_params):
''' '''
执行结果入队列若批量执行的指令都完成则提交LLM生成下一步指令 执行结果入队列
:param bres: :param bres:
:param instr: :param instr:
:param reslut: :param reslut:
:return: :return:
''' '''
matched = False #入数据库 -- bres True和False 都入数据库2025-3-10---加node_path(2025-3-18)#?
bover = False if th_DBM.ok:
#入数据库 -- bres True和False 都入数据库2025-3-10 work_node.do_sn += 1 # 指令的执行序列是一个任务共用,要线程锁,错误问题也不大
self.res_in_db(bres,instr,reslut,start_time,end_time,th_DBM,source_result,ext_params) th_DBM.insetr_result(self.task_id, instr, reslut, work_node.do_sn, start_time, end_time, source_result,
ext_params, work_node.path)
#结果入队列
if bres:
res = {'instr':instr,'reslut':reslut}
self.res_queue.put(res) #入队列
else: #对于不需要再提交给LLM的结果,如何处理待定。有执行false、未知工具
pass
#判断批次指令是否都执行完
for prefix in self.long_time_instr:
if instr.startswith(prefix):
matched = True
if not matched:
with self.lock:
self.batch_num -= 1 #非耗时指令-1
print(f"&&&&&&当前batch_num:{self.batch_num}")
if self.batch_num ==0: #只会轮询到最后一个线程才会是0
bover = True
else: else:
#这里会有个问题,若耗时指令执行完,结果入队列后,但LLM没有进一步的指令下发,会造成结果不会提交 self.logger.error("数据库连接失败!!")
#需要在生成“生成报告”的指令时进行研判--该点很重要
with self.lock:
self.long_instr_num -=1 #耗时指令数量-1
if bover:
#整合结果提交 -- 需要确保只有一个线程会执行
self.batch_res_post(th_DBM)
def batch_res_post(self,th_DBM):
'''
组合批量指令的结果一起提交到LLM生成下一步具体指令
:return:
'''
post_string = ""
while not self.res_queue.empty():
res = self.res_queue.get()
str = f"执行指令:{res['instr']}的结果是:{res['reslut']}"
post_string = post_string + "\n"
post_string = post_string + str
if post_string: #结果入队列---2025-3-18所有的指令均需返回给LLM便于节点状态的更新,所以bres作用要调整。
post_string = post_string + "\n请根据这些结果生成下一步具体的指令。" res = {'执行指令':instr,'结果':reslut}
work_node.add_res(res) #入节点结果队列
print(f"***************\n{post_string}")
with open("res","w",encoding="utf-8") as f:
f.write(post_string)
#*****测试时中断下一步指令的获取
# 提交提示词,得到下一步指令
# instr_list = self.LLMM.get_llm_instruction(post_string,th_DBM)
# if instr_list:
# if instr_list[0] == "生成报告": #“生成报告”要特殊处理#?
# self.brun = False
# print("生成报告--退出工作线程")
# else: # 继续工作
# self.instr_in_quere(instr_list)
def do_worker_th(self): def do_worker_th(self):
#线程的dbm需要一个线程一个 #线程的dbm需要一个线程一个
th_DBM = DBManager() th_DBM = DBManager()
th_DBM.connect() th_DBM.connect()
while self.brun: while self.brun:
try: work_node = None
instruction = self.instr_queue.get(block=False) #quere线程安全,block=false非阻塞get with self.lock:
self.doing_instr.put(instruction) #入执行队列 if self.node_queue:
#执行中会对指令进行微调,并有可能不执行直接返回空结果 # 获取测试节点
work_node = self.node_queue.pop(0)
self.node_num += 1
if work_node:
#开始执行指令
for instruction in work_node.instr_queue:
start_time = get_local_timestr() #指令执行开始时间 start_time = get_local_timestr() #指令执行开始时间
bres,instr,reslut,source_result,ext_params = self.InstrM.execute_instruction(instruction) bres, instr, reslut, source_result, ext_params = self.InstrM.execute_instruction(instruction)
end_time = get_local_timestr() #指令执行结束时间 end_time = get_local_timestr() # 指令执行结束时间
self.res_in_quere(bres,instr,reslut,start_time,end_time,th_DBM,source_result,ext_params) #执行结果入队列 self.res_in_quere(bres, instr, reslut, start_time, end_time, th_DBM, source_result,
self.done_instr.put(instruction) #执行完成队列 ext_params,work_node) # 执行结果入队列
#执行情况是否需要用户确认 #保存记录--测试使用
if ext_params["is_user"]: with self.lock:
pass self.node_num -=1
#print("该指令执行需要用户确认") if self.node_num == 0 and len(self.node_queue) == 0:
except queue.Empty: with open("attack_tree", 'wb') as f:
pickle.dump(TM.CCM.attack_tree, f)
#针对一个节点的指令执行完成后,提交LLM规划下一步操作
# node_list = self.CCM.get_llm_instruction(work_node)
# if not node_list:#该节点测试完成
# continue
# for node in node_list:
# self.node_queue.put(node)
else:
time.sleep(10) time.sleep(10)
# try:
# instruction = self.instr_queue.get(block=False) #quere线程安全,block=false非阻塞get
# self.doing_instr.put(instruction) #入执行队列--忘了入执行队列的作用
# #执行中会对指令进行微调,并有可能不执行直接返回空结果
# start_time = get_local_timestr() #指令执行开始时间
# bres,instr,reslut,source_result,ext_params = self.InstrM.execute_instruction(instruction)
# end_time = get_local_timestr() #指令执行结束时间
# self.res_in_quere(bres,instr,reslut,start_time,end_time,th_DBM,source_result,ext_params) #执行结果入队列
# self.done_instr.put(instruction) #执行完成队列
# #执行情况是否需要用户确认
# if ext_params["is_user"]:
# pass
# #print("该指令执行需要用户确认")
# except queue.Empty:
# time.sleep(10)
#函数结束,局部变量自动释放 #函数结束,局部变量自动释放
def start_task(self,target_name,target_in): def start_task(self,target_name,target_in):
@ -180,11 +123,12 @@ class TaskManager:
#获取基本信息: 读取数据库或预生成指令,获取基本的已知信息 #获取基本信息: 读取数据库或预生成指令,获取基本的已知信息
know_info = "" #? know_info = "" #?
#启动--初始化指令 #启动--初始化指令
prompt = self.LLMM.build_initial_prompt(target,know_info) node_list = self.CCM.start_do(target,self.task_id)
instr_list = self.LLMM.get_llm_instruction(prompt,self.DBM) with self.lock:
self.instr_in_quere(instr_list) #指令入队列 for node in node_list:
self.node_queue.append(node)
#创建工作线程 #创建工作线程----2025-3-18调整为一个节点一个线程,
for i in range(self.max_thread_num): for i in range(self.max_thread_num):
w_th = threading.Thread(target=self.do_worker_th) w_th = threading.Thread(target=self.do_worker_th)
w_th.start() w_th.start()
@ -200,7 +144,7 @@ class TaskManager:
def stop_task(self): def stop_task(self):
self.brun = False self.brun = False
self.LLMM.init_data() #清空一些全局变量 self.CCM.stop_do() #清空一些全局变量
self.InstrM.init_data() self.InstrM.init_data()
#结束任务需要收尾处理#? #结束任务需要收尾处理#?
@ -211,22 +155,18 @@ if __name__ == "__main__":
FM = FileManager() FM = FileManager()
current_path = os.path.dirname(os.path.realpath(__file__)) current_path = os.path.dirname(os.path.realpath(__file__))
strMsg = FM.read_file("test",1) strMsg = FM.read_file("test",1)
TM.LLMM.test_old_message(strMsg) #先设置message的值
test_type = 3 test_type = 3
if test_type == 1: if test_type == 1:
#测试执行指令 #测试执行指令
# instrS = ['gobuster dir -u https://58.216.217.70 -w /usr/share/wordlists/dirbuster/directory-list-2.3-medium.txt -k -t 20 --timeout 10s', with open("attack_tree", "rb") as f:
# 'searchsploit SoftEther VPN', TM.CCM.attack_tree = pickle.load(f)
# 'curl -kv -X POST -d "username=admin&password=admin" https://58.216.217.70/vpn/index.html --connect-timeout 10', # 遍历node,查看有res的数据
# 'gobuster dir -u http://58.216.217.70:10001 -w /usr/share/wordlists/dirbuster/directory-list-2.3-medium.txt -t 20 --timeout 10s'] nodes = TM.CCM.attack_tree.traverse_bfs()
#instrS = ['nmap -sV -sC -p- -Pn 192.168.204.137'] for node in nodes:
with open("test", "r", encoding="utf-8") as f: if node.instr_queue: # list
messages = json.load(f) TM.node_queue.append(node)
text = messages[-1]["content"]
#list = TM.LLMM.fetch_instruction(text)[9:10]
list = TM.LLMM.fetch_instruction(text)
#print(list)
TM.instr_in_quere(list)
#创建线程执行指令 #创建线程执行指令
for i in range(TM.max_thread_num): for i in range(TM.max_thread_num):
w_th = threading.Thread(target=TM.do_worker_th) w_th = threading.Thread(target=TM.do_worker_th)
@ -237,16 +177,32 @@ if __name__ == "__main__":
t.join() t.join()
elif test_type ==2: elif test_type ==2:
#测试LLM返回下一步指令 #测试LLM返回下一步指令
with open("res", "r", encoding="utf-8") as f: with open("attack_tree", "rb") as f:
prompt = f.read() TM.CCM.attack_tree = pickle.load(f)
if prompt: #遍历node,查看有res的数据
instr_list = TM.LLMM.get_llm_instruction(prompt,TM.DBM) nodes = TM.CCM.attack_tree.traverse_bfs()
TM.instr_in_quere(instr_list) # 指令入队列问题不大,任务执行线程没有起 for node in nodes:
elif test_type ==3: #新目标测试 if node.res_quere: #list
prompt = TM.LLMM.build_initial_prompt("192.168.204.137","") node_list = TM.CCM.get_llm_instruction(node)
if prompt: if not node_list:#该节点测试完成
instr_list = TM.LLMM.get_llm_instruction(prompt,TM.DBM) continue
TM.instr_in_quere(instr_list) # 指令入队列问题不大,任务执行线程没有起 with TM.lock:
for do_node in node_list:
TM.node_queue.append(do_node)
# 暂存状态--
with open("attack_tree", 'wb') as f:
pickle.dump(TM.CCM.attack_tree, f)
elif test_type == 3: #新目标测试
# 启动--初始化指令
node_list = TM.CCM.start_do("192.168.204.137", 0)
with TM.lock:
for node in node_list:
TM.node_queue.append(node)
#暂存状态--
with open("attack_tree",'wb') as f:
pickle.dump(TM.CCM.attack_tree,f)
elif test_type == 4: #读取messages
pass
else: else:
#完整过程测试---要设定终止条件 #完整过程测试---要设定终止条件
pass pass

4
config.yaml

@ -13,8 +13,8 @@ mysql:
passwd: anan2013@BABY passwd: anan2013@BABY
database: zfsafe database: zfsafe
#sqlit #LLM-Type
sqlite: zfbox.db LLM_type: 1 #0-腾讯云,1-DS,2-GPT
#用户初始密码 #用户初始密码
pw: zfkj_123!@# pw: zfkj_123!@#

148
mycode/AttackMap.py

@ -0,0 +1,148 @@
import queue
#渗透测试树结构维护类
class AttackTree:
def __init__(self,root_node):
#针对根节点处理
self.root = root_node
self.root.path = f"目标系统->{root_node.name}"
def set_root(self,root_node):
self.root = root_node
def add_node(self,parent_name,new_node):
"""根据父节点名称添加新节点"""
parent_node = self.find_node_by_name(parent_name)
if parent_node:
parent_node.add_child(new_node)
return True
return False
def traverse_bfs(self):
"""广度优先遍历"""
if not self.root:
return []
queue = [self.root]
result = []
while queue:
current = queue.pop(0)
result.append(current)
queue.extend(current.children)
return result
def traverse_dfs(self, node=None, result=None):
"""深度优先遍历(前序遍历)"""
if result is None:
result = []
if node is None:
node = self.root
if not node:
return []
result.append(node)
for child in node.children:
self.traverse_dfs(child, result)
return result
def find_node_by_name(self, name):
"""根据名称查找节点(广度优先)"""
nodes = self.traverse_bfs()
for node in nodes:
if node.name == name:
return node
return None
def find_node_by_nodepath(self,node_path):
'''基于节点路径查找节点,只返回找到的第一个节点,若有节点名称路径重复的情况暂不处理'''
current_node = self.root #从根节点开始
node_names = node_path.split('->')
for node_name in node_names:
if node_name == "目标系统":
continue
if node_name == current_node.name:#根节点开始
continue
else:
bfound = False
for child_node in current_node.children:
if child_node.name == node_name: #约束同一父节点下的子节点名称不能相同
current_node = child_node
bfound = True
break
if not bfound: #如果遍历子节点都没有符合的,说明路径有问题的,不处理中间一段路径情况
return None
#找到的话,就开始匹配下一层
return current_node
def find_nodes_by_status(self, status):
"""根据状态查找所有匹配节点"""
return [node for node in self.traverse_bfs() if node.status == status]
def find_nodes_by_vul_type(self, vul_type):
"""根据漏洞类型查找所有匹配节点"""
return [node for node in self.traverse_bfs() if node.vul_type == vul_type]
#考虑要不要用tree封装节点的操作--待定
def update_node_status(self, node_name, new_status):
"""修改节点状态"""
node = self.find_node_by_name(node_name)
if node:
node.status = new_status
return True
return False
def update_node_vul_type(self,node_name,vul_type):
"""修改节点漏洞类型"""
node = self.find_node_by_name(node_name)
if node:
node.vul_type = vul_type
return True
return False
def print_tree(self, node=None, level=0):
"""可视化打印树结构"""
if node is None:
node = self.root
prefix = " " * level + "|-- " if level > 0 else ""
print(f"{prefix}{node.name} [{node.status}, {node.vul_type}]")
for child in node.children:
self.print_tree(child, level + 1)
class TreeNode:
def __init__(self, name, status="未完成", vul_type="未发现"):
self.name = name # 节点名称
self.status = status # 节点状态
self.vul_type = vul_type # 漏洞类型
self.children = [] # 子节点列表
self.parent = None # 父节点引用
self.path = "" #当前节点的路径
self.instr_queue = [] #queue.Queue() #针对当前节点的执行指令----重要约束:一个节点只能有一个线程在执行指令
self.res_quere = [] #queue.Queue() #指令执行的结果,一批一批
self.llm_sn = 0 #针对该节点llm提交次数
self.do_sn = 0 #针对该节点instr执行次数
self.messages = [] #针对当前节点积累的messages -- 针对不同节点提交不同的messages
def add_child(self, child_node):
child_node.parent = self
child_node.path = self.path + f"->{child_node.name}" #子节点的路径赋值
child_node.messages = self.messages #传递messages #给什么时候的messages待验证#?
self.children.append(child_node)
def add_instr(self,instr):
self.instr_queue.append(instr)
def get_instr(self):
return self.instr_queue.pop(0) if self.instr_queue else None
def add_res(self,str_res): #结构化结果字串
self.res_quere.append(str_res)
def get_res(self):
return self.res_queue.pop(0) if self.res_queue else None
def __repr__(self):
return f"TreeNode({self.name}, {self.status}, {self.vul_type})"
if __name__ == "__main__":
pass

180
mycode/ControlCenter.py

@ -0,0 +1,180 @@
#自动化测试逻辑规则控制
#统一控制规则 和 渗透测试树的维护
import json
import re
import queue
from mycode.AttackMap import AttackTree
from mycode.AttackMap import TreeNode
from mycode.LLMManager import LLMManager
from myutils.ConfigManager import myCongif #单一实例
from myutils.MyLogger_logger import LogHandler
class ControlCenter:
def __init__(self,DBM):
self.logger = LogHandler().get_logger("ControlCenter")
self.task_id = None
self.target = None
self.attack_tree = None
self.DBM = DBM
#LLM对象
self.LLM = LLMManager(myCongif.get_data("LLM_type"))
def __del__(self):
self.task_id = None
self.target = None
self.attack_tree = None
self.DBM = None
def init_cc_data(self):
#一次任务一次数据
pass
def get_user_init_info(self):
'''开始任务初,获取用户设定的基础信息'''
# ?包括是否对目标进行初始化的信息收集
return ""
def start_do(self,target,task_id):
'''一个新任务的开始'''
self.task_id = task_id
self.target = target
#创建测试树
if self.attack_tree:
self.attack_tree = None #释放
root_node = TreeNode(target)
self.attack_tree = AttackTree(root_node)#创建测试树,同时更新根节点相关内容
#获取初始指令
know_info = self.get_user_init_info()
prompt = self.LLM.build_initial_prompt(target,know_info,root_node)
node_cmds, commands = self.LLM.get_llm_instruction(prompt,self.DBM,root_node)
# 更新tree
self.tree_manager(node_cmds,root_node)
# 分析指令入对应节点
node_list = self.instr_in_node(commands,root_node)
return node_list
#约束1:一个节点只能同时提交一次,未测试的节点不要重复
def get_llm_instruction(self,node):
#拼接结果字符串--由于测试需要queue改成了list 2025-3-19
# json_strings = []
# if node.res_quere:
# for item in node.res_quere:
# json_str = json.dumps(item, ensure_ascii=False)
# json_strings.append(json_str)
# res_str = ','.join(json_strings)
res_str = json.dumps(node.res_quere,ensure_ascii=False)
#构造本次提交的prompt
user_Prompt = f'''
当前分支路径{node.path}
当前节点信息
- 节点名称{node.name}
- 节点状态{node.status}
- 漏洞类型{node.vul_type}
上一步结果{res_str}
任务生成下一步渗透测试指令或结束该节点渗透测试
'''
node_cmds,commands = self.LLM.get_llm_instruction(user_Prompt,self.DBM,node)
#更新tree
self.tree_manager(node_cmds,node)
#分析指令入对应节点
node_list = self.instr_in_node(commands,node)
return node_list
def tree_manager(self,node_cmds,node):
'''更新渗透测试树'''
if not node_cmds or len(node_cmds)==0:
return
try:
node_jsons = json.loads(node_cmds)
for node_json in node_jsons:
if node_json["action"] == "add_node": #新增节点
parent_node_name = node_json["parent"]
node_name = node_json["node"]
status = node_json["status"]
#新增节点原则上应该都是当前节点增加子节点
if node.name == parent_node_name:
new_node = TreeNode(node_name,status)
node.add_child(new_node) #message的传递待验证
else:
self.logger.error("添加子节点时,遇到父节点名称不一致的,需要介入!!")
elif node_json["action"] == "update_status":
node_name = node_json["node"]
status = node_json["status"]
vul_type = node_json["vulnerability"]
if node.name == node_name:
node.status = status
node.vul_type = vul_type
else:
self.logger.error("遇到不是修改本节点状态的,需要介入!!")
else:
self.logger.error("node更新JSON遇到异常参数!")
except json.JSONDecodeError as e:
self.logger.error(f"JSON 解析失败:{e.msg}")
self.logger.error(f"错误位置:第{e.lineno}行,列{e.colno}")
except TypeError as e:
self.logger.error(f"输入类型错误:{str(e)}")
def instr_in_node(self,commands,node):
node_list = []
for command in commands:
# 使用正则匹配方括号中的node_path(非贪婪模式)
match = re.search(r'\[(.*?)\]', command)
if match:
node_path = match.group(1)
find_node = self.attack_tree.find_node_by_nodepath(node_path)
if find_node:
instruction = re.sub(r'\[.*?\]', "", command,count=1,flags=re.DOTALL)
find_node.instr_queue.append(instruction)
#入输出队列
if find_node not in node_list:
node_list.append(find_node)
else:
self.logger.error(f"基于节点路径没有找到对应的节点{node_path}")
else:
self.logger.error(f"得到的指令格式不符合规范:{command}")
return node_list
#待修改
def is_user_instr(self,instr):
'''
过滤需要人工确认或手动执行的指令 ---- 待完善
:param instr:
:return:
'''
#if instr.startswith("curl") or instr.startswith("http") or instr.startswith("wget"):
if instr.startswith("http") or instr.startswith("wget") or instr.startswith("ssh"):
return True
#指令入队列,待修改
def instr_in_quere(self,instr_list):
'''
对于运行需要较长时间的不强求同一批次返回给LLM
:param instr_list:
:return:
'''
for instr in instr_list:
if self.is_user_instr(instr):
self.user_instr.put(instr)
print(f"需要人工确认的指令{instr}")
else:
matched =False
for prefix in self.long_time_instr:
if instr.startswith(prefix):
matched =True
if not matched:
with self.lock:
self.batch_num += 1 #非耗时指令+1
print(f"&&&&&&当前batch_num:{self.batch_num}")
else:
with self.lock:
self.long_instr_num +=1 #耗时指令数量+1
# 指令入队列
self.instr_queue.append(instr)
def stop_do(self):
#清空数据
self.task_id = None
self.target = None
self.attack_tree = None

19
mycode/DBManager.py

@ -147,7 +147,7 @@ class DBManager:
return task_id return task_id
#指令执行结果入库 #指令执行结果入库
def insetr_result(self,task_id,instruction,result,do_sn,start_time,end_time,source_result,ext_params): def insetr_result(self,task_id,instruction,result,do_sn,start_time,end_time,source_result,ext_params,node_path):
# 统一将 result 转为 JSON 字符串(无论原始类型) # 统一将 result 转为 JSON 字符串(无论原始类型)
try: try:
if not isinstance(result, str): if not isinstance(result, str):
@ -162,16 +162,16 @@ class DBManager:
# 使用参数化查询 # 使用参数化查询
sql = """ sql = """
INSERT INTO task_result INSERT INTO task_result
(task_id, instruction, result, do_sn,start_time,end_time,source_result,is_user,is_vulnerability) (task_id, instruction, result, do_sn,start_time,end_time,source_result,is_user,is_vulnerability,node_path)
VALUES VALUES
(%s, %s, %s, %s, %s, %s,%s,%s,%s) (%s, %s, %s, %s, %s, %s,%s,%s,%s,%s)
""" """
params = (task_id, instruction, str_result, do_sn,start_time,end_time,source_result,ext_params['is_user'], params = (task_id, instruction, str_result, do_sn,start_time,end_time,source_result,ext_params['is_user'],
ext_params['is_vulnerability']) ext_params['is_vulnerability'],node_path)
return self.safe_do_sql(sql,params) return self.safe_do_sql(sql,params)
#llm数据入库 #llm数据入库
def insert_llm(self,task_id,prompt,reasoning_content,content,post_time,llm_sn): def insert_llm(self,task_id,prompt,reasoning_content,content,post_time,node):
str_reasoning = "" str_reasoning = ""
str_content = "" str_content = ""
try: try:
@ -196,11 +196,14 @@ class DBManager:
sql=""" sql="""
INSERT INTO task_llm INSERT INTO task_llm
(task_id,do_sn,prompt,reasoning_content,content,start_time) (task_id,do_sn,prompt,reasoning_content,content,start_time,node_path)
VALUES VALUES
(%s, %s, %s, %s, %s, %s) (%s, %s, %s, %s, %s, %s,%s)
""" """
params = (task_id,llm_sn,prompt,str_reasoning,str_content,post_time) str_reasoning = str_reasoning.encode('utf-8').decode('unicode_escape')
str_content = str_content.encode('utf-8').decode('unicode_escape')
params = (task_id,node.llm_sn,prompt,str_reasoning,str_content,post_time,node.path)
return self.safe_do_sql(sql,params) return self.safe_do_sql(sql,params)
def test(self): def test(self):

23
mycode/InitManager.py

@ -0,0 +1,23 @@
class InitManger:
def __init__(self):
listen_tatus = False
def init_tool_info(self,self_ip="",self_port=0):
'''
初始化工具信息和工作 - 一直运行的信息
:return:
'''
#建立一个socket监听线程,处理反向连接
if self_ip and self_port: #ip和端口有值后
listen_tatus = False
pass
def init_task_info(self,cookie=""):
'''
初始化任务信息-- 一次任务一次的信息
:return:
'''
pass
InitM = InitManger() #单一实例

233
mycode/LLMManager.py

@ -0,0 +1,233 @@
'''
实现对大模型调用的封装隔离具体使用的LLM
pip install openai
export OPENAI_API_KEY="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"
'''
import openai
import json
import threading
import re
from openai import OpenAI
from myutils.MyTime import get_local_timestr
from myutils.MyLogger_logger import LogHandler
class LLMManager:
def __init__(self,illm_type=0):
self.logger = LogHandler().get_logger("LLMManager")
self.api_key = None
self.api_url = None
self.task_id =0 #一个任务一个id
self.llm_sn = 0 # llm执行序列号,--一任务一序列
self.llm_sn_lock = threading.Lock() #
#temperature设置
#DS------代码生成/数学解题:0.0 -- 数据抽取/分析:1.0 -- 通用对话:1.3 -- 翻译:1.3 -- 创意类写作:1.5
#腾讯云---
self.temperature = 1.0
if illm_type == 0: #腾讯云
self.api_key = "fGBYaQLHykBOQsFwVrQdIFTsYr8YDtDVDQWFU41mFsmvfNPc"
self.api_url = ""
elif illm_type == 1: #DS
self.api_key ="sk-10360148b465424288218f02c87b0e1b"
self.api_url ="https://api.deepseek.com/v1"
self.model = "deepseek-reasoner" #model=deepseek-reasoner -- R1 model=deepseek-chat --V3
# 创建会话对象 -- 一个任务的LLM必须唯一
self.client = OpenAI(api_key=self.api_key, base_url=self.api_url)
elif illm_type ==3: #GPT
self.api_key =""
self.api_url = ""
self.model = ""
self.client = OpenAI()
self.messages = []
# 初始化阶段和已知信息
current_stage = "信息收集"
known_info = {"url": "www.test.com"}
results = [] # 存储所有任务结果,用于生成报告
#******测试使用,设置slef.message的值
def test_old_message(self,strMessage):
try:
self.messages = json.loads(strMessage)
except json.JSONDecodeError as e:
print(f"JSON解析错误: {str(e)}")
except Exception as e:
print(f"错误: {str(e)}")
# 构建初始提示 初始化messages
def build_initial_prompt(self,target,know_info="",node=None):
if not node:
return None
#根节点初始化message
node.messages = [{"role": "system",
"content":'''
你是一位渗透测试专家,由你动态控制整个渗透测试过程根据当前测试状态和返回结果决定下一步的测试指令推动测试阶段前进直至完成渗透测试本地程序会根据你的指令进行执行,然后将执行结果返回给你
总体要求说明
1.以目标系统所在IP为根节点随着信息收集和渗透测试的推进每个渗透测试点如端口服务漏洞点作为子节点形成树型结构测试树
2.测试树整体数据由本地程序存储你只需要关注当前节点的渗透测试推进节点状态(未完成已完成)的更新和是否有子节点新增
3.你需要返回两类指令节点指令和测试指令以空行间隔
--示例
{\"action\":\"add_node\", \"parent\": \"192.168.1.100\", \"node\": \"21端口\", \"status\": \"未完成\"}
```bash-[目标系统->192.168.1.100->3306端口]
mysql -u root -p 192.168.1.100```
若无节点修改或新增节点指令可以为空但测试指令必须对应已有节点
决策流程
1. 如果当前节点是IP地址且未进行端口扫描则执行端口扫描
2. 如果端口扫描发现开放端口则为每个开放端口新增一个测试节点
3. 如果当前节点是端口且未进行服务扫描则执行服务扫描
4. 如果服务扫描发现服务版本或漏洞则新增对应的漏洞测试节点
5. 如果当前节点是漏洞测试且漏洞利用成功则根据利用结果决定是否新增子节点进一步测试
6. 如果当前节点测试未发现新信息则更新节点状态为已完成并继续测试其他未完成节点
生成的节点指令需要满足如下约束
1.新增节点指令示例{\"action\":\"add_node\", \"parent\": \"80端口\", \"node\": \"http://192.168.1.100/index.php?id=1\", \"status\": \"未完成\"};
2.新增子节点时同一请求返回的子节点名node不能相同且必须同时提供对该节点的测试指令
3.若认为该节点已完成测试修改该节点为已完成状态完成节点测试且发现漏洞示例{\"action\": \"update_status\", \"node\": \"21端口\", \"status\": \"已完成\"\"vulnerability\": \"ftp匿名登录\"};
4.发现漏洞后可根据漏洞类型决定是否新增子节点继续测试{\"action\": \"add_node\", \"parent\": \"21端口\", \"node\": \"ftp配置检查\", \"status\": \"未完成\"};
生成的渗透测试指令需满足如下约束
1.只返回具体的shell指令或Python代码不要包含注释和说明
2.shell指令以```bash-[对应节点的路径](.*?)```包裹python代码以```python-[对应节点的路径](.*?)```包裹[对应节点的路径]为从根节点到目标节点的完整层级描述
3.若提供的是shell指令需要避免用户再次交互
4.若提供的是python代码主函数名为dynamic_fun需包含错误处理执行结束后必须返回一个tuple (status, output)其中status为'success''failure'output为补充输出信息
示例
```python-[目标系统->192.168.1.100->3306端口]
def dynamic_fun():
try:
result = "扫描完成"
return ("success", result)
except Exception as e:
return ("failure", str(e))
```
限制条件
1.仅在发现高危漏洞或关键测试路径时新增节点
'''}] # 一个messages
user_Prompt = f'''
当前分支路径目标系统->{target}
当前节点信息
- 节点名称{target}
- 节点状态未完成
- 漏洞类型未发现
上一步结果{know_info}
任务生成下一步渗透测试指令或结束该节点的渗透测试(修改节点状态为已完成)
'''
return user_Prompt
def init_data(self,task_id=0):
#初始化LLM数据
self.llm_sn = 0
self.task_id = task_id
self.messages = []
# 调用LLM生成指令
def get_llm_instruction(self,prompt,th_DBM,node):
'''
1.由于大模型API不记录用户请求的上下文一个任务的LLM不能并发
:param prompt:用户本次输入的内容
:return: instr_list
'''
#添加本次输入入该节点的message队列
message = {"role":"user","content":prompt}
node.messages.append(message)
#提交LLM
post_time = get_local_timestr()
response = self.client.chat.completions.create(
model=self.model,
messages = node.messages
)
#LLM返回结果处理
reasoning_content = ""
content = ""
#LLM返回处理
if self.model == "deepseek-reasoner":
#返回错误码:DS-https://api-docs.deepseek.com/zh-cn/quick_start/error_codes
reasoning_content = response.choices[0].message.reasoning_content #推理过程
print(reasoning_content)
content = response.choices[0].message.content #推理内容
print(content)
# 记录llm历史信息
node.messages.append({'role': 'assistant', 'content': content})
elif self.model == "deepseek-chat":
content = response.choices[0].message
# 记录llm历史信息
node.messages.append(content)
else:
self.logger.error("处理到未预设的模型!")
return None
#LLM记录存数据库
node.llm_sn += 1
bres = th_DBM.insert_llm(self.task_id,prompt,reasoning_content,content,post_time,node)
if not bres:
self.logger.error(f"{node.name}-llm入库失败!")
#需要对指令进行提取
node_cmds,commands = self.fetch_instruction(content,node)
return node_cmds,commands
def fetch_instruction(self,response_text,node):
'''
*****该函数很重要需要一定的容错能力解析LLM返回内容*****
处理边界只格式化分析LLM返回内容指令和节点操作等交其他模块
节点控制指令
渗透测试指令
提取命令列表包括
1. Python 代码块 python[](.*?)
2. Shell 命令``bash[](.*?)```
:param text: 输入文本
:return: node_cmds,python_blocks,shell_blocks
'''
#针对llm的回复,提取节点操作数据和执行的指令----
# 正则匹配 Python 代码块
python_blocks = re.findall(r"```python-(.*?)```", response_text, flags=re.DOTALL)
# 处理 Python 代码块,去除空行并格式化
python_blocks = [block.strip() for block in python_blocks]
#正则匹配shell指令
shell_blocks = re.findall(f"```bash-(.*?)```", response_text, flags=re.DOTALL)
shell_blocks = [block.strip() for block in shell_blocks]
# 按连续的空行拆分
# 移除 Python和bash 代码块
text_no_python = re.sub(r"```python.*?```", "PYTHON_BLOCK", response_text, flags=re.DOTALL)
text = re.sub(r"```bash.*?```", "SHELL_BLOCK", text_no_python, flags=re.DOTALL)
# 这里用 \n\s*\n 匹配一个或多个空白行
parts = re.split(r'\n\s*\n', text)
node_cmds = []
commands = []
python_index = 0
shell_index = 0
for part in parts:
part = part.strip()
if not part:
continue
if "PYTHON_BLOCK" in part:
# 还原 Python 代码块
commands.append(f"python_code {python_blocks[python_index]}")
python_index += 1
elif "SHELL_BLOCK" in part:
commands.append(shell_blocks[shell_index])
shell_index +=1
else:
#其他的认为是节点操作指令
node_cmds.append(part)
return node_cmds,commands
def test_llm(self):
with open("../test", "r", encoding="utf-8") as f:
messages = json.load(f)
text = messages[-1]["content"]
list = self.fetch_instruction(text)
for itme in list:
print("***********")
print(itme)
if __name__ == "__main__":
LM = LLMManager(1)
LM.test_llm()

2
test

File diff suppressed because one or more lines are too long

4
tools/MsfconsoleTool.py

@ -9,8 +9,8 @@ class MsfconsoleTool(ToolBase):
''' '''
Metasploit 这样的交互工具保持一个会话析构时结束会话 Metasploit 这样的交互工具保持一个会话析构时结束会话
''' '''
def __init__(self): def __init__(self,TM):
super().__init__() super().__init__(TM)
self.bOK = False self.bOK = False
self.msf_lock = threading.Lock() # 线程锁 self.msf_lock = threading.Lock() # 线程锁

38
tools/NmapTool.py

@ -1,5 +1,6 @@
# Nmap工具类 # Nmap工具类
import re import re
import json
from typing import List, Dict, Any from typing import List, Dict, Any
from tools.ToolBase import ToolBase from tools.ToolBase import ToolBase
@ -47,23 +48,56 @@ class NmapTool(ToolBase):
banners = [line.strip() for line in banner_pattern.findall(nmap_output)] banners = [line.strip() for line in banner_pattern.findall(nmap_output)]
result['banners'] = banners if banners else "No banner info found" result['banners'] = banners if banners else "No banner info found"
# 3. 提取 OS 信息 # 3. 提取 OS 信息--目前存在问题
# 常见格式:Service Info: OS: Linux; CPE: cpe:/o:linux:linux_kernel # 常见格式:Service Info: OS: Linux; CPE: cpe:/o:linux:linux_kernel
'''常见格式
Running: Linux 2.6.X
OS CPE: cpe:/o:linux:linux_kernel:2.6
OS details: Linux 2.6.9 - 2.6.33
Network Distance: 1 hop
Service Info: Hosts: metasploitable.localdomain, irc.Metasploitable.LAN; OSs: Unix, Linux; CPE: cpe:/o:linux:linux_kernel
'''
os_match = re.search(r"Service Info:\s+OS:\s*([^;]+);", nmap_output, re.IGNORECASE) os_match = re.search(r"Service Info:\s+OS:\s*([^;]+);", nmap_output, re.IGNORECASE)
result['os_info'] = os_match.group(1).strip() if os_match else "No OS info found" result['os_info'] = os_match.group(1).strip() if os_match else "No OS info found"
# 4. 其他信息:原始输出作为补充 # 4. 其他信息:原始输出作为补充
result['raw'] = nmap_output #result['raw'] = nmap_output
except Exception as e: except Exception as e:
result['error'] = f"Error parsing nmap output: {e}" result['error'] = f"Error parsing nmap output: {e}"
result = json.dumps(result)
return result return result
def extract_key_info(self,nmap_text):
"""
利用正则表达式提取每一行的端口状态服务及版本/额外信息
只处理形如 "端口/tcp 状态 服务 版本/额外信息" 的行达到字符缩减的效果
"""
# 正则模式:匹配开头数字、/tcp、状态、服务名称,后续内容为可选的额外信息
pattern = re.compile(r'^(\d+)/tcp\s+(\w+)\s+(\S+)(?:\s+(.*))?$')
key_info = []
for line in nmap_text.splitlines():
match = pattern.match(line)
if match:
port, state, service, extra = match.groups()
extra = extra.strip() if extra else ""
key_info.append({
"port": port,
"state": state,
"service": service,
"info": extra
})
return key_info
def analyze_result(self, result,instruction,stderr,stdout): def analyze_result(self, result,instruction,stderr,stdout):
# 检查结果 # 检查结果
start_index = result.find("If you know the service/version") start_index = result.find("If you know the service/version")
if start_index != -1: if start_index != -1:
return result[:start_index] return result[:start_index]
#result = self.parse_nmap_output(result) #result = self.parse_nmap_output(result)
# tmpstr = self.extract_key_info(result)
# result = json.dumps(tmpstr)
return result return result

1
tools/OpensslTool.py

@ -47,6 +47,7 @@ class OpensslTool(ToolBase):
except x509.ExtensionNotFound: except x509.ExtensionNotFound:
pass pass
if cert_obj:
results.append({ results.append({
'subject': str(cert_obj.subject), 'subject': str(cert_obj.subject),
'issuer': str(cert_obj.issuer), 'issuer': str(cert_obj.issuer),

80
tools/PythonTool.py

@ -1,80 +0,0 @@
#python代码动态执行
from tools.ToolBase import ToolBase
class PythonTool(ToolBase):
def validate_instruction(self, instruction):
#指令过滤
timeout = 0
return "",timeout
def execute_instruction(self, instruction_old):
'''
执行指令验证合法性 -> 执行 -> 分析结果
:param instruction_old:
:return:
bool:true-正常返回给大模型false-结果不返回给大模型
str:执行的指令
str:执行指令的结果
'''
ext_params = self.create_extparams()
# 定义允许的内置函数集合
allowed_builtins = {
"abs": abs,
"all": all,
"any": any,
"bool": bool,
"chr": chr,
"dict": dict,
"float": float,
"int": int,
"len": len,
"list": list,
"max": max,
"min": min,
"print": print,
"range": range,
"set": set,
"str": str,
"sum": sum,
"type": type,
# 根据需要可以添加其他安全的内置函数
}
# 第一步:验证指令合法性
instruction,time_out = self.validate_instruction(instruction_old)
if not instruction:
return False, instruction_old, "该指令暂不执行!","",ext_params
# 过滤修改后的指令是否需要判重?同样指令再执行结果一致?待定---#?
# 第二步:执行指令
output = ""
# 构造安全的全局命名空间,只包含我们允许的 __builtins__
safe_globals = {
"__builtins__": allowed_builtins,
}
try:
# 编译代码
code_obj = compile(instruction, filename="<dynamic>", mode="exec")
# 在限制环境中执行代码
exec(code_obj, safe_globals)
except Exception as e:
print(f"执行动态代码时出错: {e}")
# 第三步:分析执行结果
analysis = self.analyze_result(output, instruction,"","")
# 指令和结果入数据库
# ?
if not analysis: # analysis为“” 不提交LLM
return False, instruction, analysis,"",ext_params
return True, instruction, analysis,"",ext_params
def analyze_result(self, result,instruction,stderr,stdout):
#指令结果分析
return result
if __name__ == "__main__":
llm_code = """
def run_test():
return 'Penetration test executed successfully!'
"""

129
tools/PythoncodeTool.py

@ -0,0 +1,129 @@
#python代码动态执行
import ast
import subprocess
import json
import builtins
import re
from tools.ToolBase import ToolBase
class PythoncodeTool(ToolBase):
def is_safe_code(self,code):
# List of high-risk functions to block (can be adjusted based on requirements)
HIGH_RISK_FUNCTIONS = ['eval', 'exec', 'os.system', 'subprocess.call', 'subprocess.Popen']
"""Check if the code contains high-risk function calls."""
try:
tree = ast.parse(code)
for node in ast.walk(tree):
if isinstance(node, ast.Call):
if isinstance(node.func, ast.Name) and node.func.id in HIGH_RISK_FUNCTIONS:
return False
elif isinstance(node.func, ast.Attribute) and node.func.attr in HIGH_RISK_FUNCTIONS:
return False
return True
except SyntaxError:
return False
def validate_instruction(self, instruction):
#指令过滤
timeout = 0
instr = instruction.replace("python_code ","")
# Safety check
if not self.is_safe_code(instr):
return "", timeout
return instr,timeout
def safe_import(self,name,*args,**kwargs):
ALLOWED_MODULES = ['subprocess', 'json','re']
if name not in ALLOWED_MODULES:
raise ImportError(f"Import of '{name}' is not allowed")
return builtins.__import__(name, *args, **kwargs)
def execute_instruction(self, instruction_old):
'''
执行指令验证合法性 -> 执行 -> 分析结果
:param instruction_old:
:return:
bool:true-正常返回给大模型false-结果不返回给大模型
str:执行的指令
str:执行指令的结果
'''
ext_params = self.create_extparams()
# 定义允许的内置函数集合 --白名单
allowed_builtins = {
'__import__': builtins.__import__,
"abs": abs,
"all": all,
"any": any,
"bool": bool,
"chr": chr,
"dict": dict,
"float": float,
"int": int,
"len": len,
"list": list,
"max": max,
"min": min,
"print": print,
"range": range,
"set": set,
"str": str,
"sum": sum,
"type": type,
'open':open,
'Exception':Exception,
# 根据需要可以添加其他安全的内置函数
}
# 第一步:验证指令合法性
instruction,time_out = self.validate_instruction(instruction_old)
if not instruction:
return False, instruction_old, "该指令暂不执行!","",ext_params
# 过滤修改后的指令是否需要判重?同样指令再执行结果一致?待定---#?
# 第二步:执行指令
output = ""
try:
# 构造安全的全局命名空间,只包含我们允许的 __builtins__
# 虽然动态代码中包含了import subprocess,但是还是需要在全局命名空间中添加subprocess这些库
# 正常情况应该是不需要的,后续再研究
safe_globals = {"__builtins__": allowed_builtins,
'subprocess':subprocess,
'json':json,
're':re,}
safe_locals = {} #不需要预设局部参数
# 在限制环境中执行代码
exec(instruction, safe_globals,safe_locals)
# Check if check_samba_vuln is defined
if 'dynamic_fun' not in safe_locals:
analysis = "Function dynamic_fun() is not defined"
ext_params['is_use'] = True
return True,instruction,analysis,analysis,ext_params
# Get the function and call it
dynamic_fun = safe_locals['dynamic_fun']
status, tmpout = dynamic_fun()
output = f"status:{status},output:{tmpout}"
except Exception as e:
analysis = f"执行动态代码时出错: {str(e)}"
ext_params['is_use'] = True
return True,instruction,analysis,analysis,ext_params
# 第三步:分析执行结果
analysis = self.analyze_result(output, instruction,"","")
# 指令和结果入数据库
# ?
if not analysis: # analysis为“” 不提交LLM
return False, instruction, analysis,"",ext_params
return True, instruction, analysis,"",ext_params
def analyze_result(self, result,instruction,stderr,stdout):
#指令结果分析
return result
if __name__ == "__main__":
llm_code = """
def run_test():
return 'Penetration test executed successfully!'
"""

107
tools/SearchsploitTool.py

@ -1,17 +1,37 @@
from tools.ToolBase import ToolBase from tools.ToolBase import ToolBase
import re import re
import json import json
import subprocess
import os
import shutil
from pathlib import Path
class SearchsploitTool(ToolBase): class SearchsploitTool(ToolBase):
def validate_instruction(self, instruction): def validate_instruction(self, instruction):
#指令过滤 #指令过滤
timeout = 0 timeout = 0
if "-m " in instruction: # 下载利用代码
timeout = 60
return instruction,timeout return instruction,timeout
def analyze_result(self, result,instruction,stderr,stdout): def analyze_result(self, result,instruction,stderr,stdout):
#获取当前路径
cur_path = Path(__file__).resolve().parent
payload_dir = cur_path / "../payload"
#if instruction(result,bytes):
if type(result) is bytes:
result = result.decode('utf-8',errors='ignore')
"""去除 ANSI 颜色码""" """去除 ANSI 颜色码"""
ansi_escape = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])') ansi_escape = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')
clean_result = ansi_escape.sub('', result) clean_result = ansi_escape.sub('', result)
#指令结果分析 # 指令结果分析
if "-m " in instruction:#下载利用代码
if "Copied to:" in clean_result or "cp: overwrite " in clean_result:
result = "下载完成"
else:
result = "下载失败"
else: #目前只遇到两种searchsploit命令格式:
lines = clean_result.split("\n") lines = clean_result.split("\n")
exploits = [] exploits = []
@ -20,10 +40,91 @@ class SearchsploitTool(ToolBase):
if match: if match:
title = match.group(1).strip() title = match.group(1).strip()
path = match.group(2).strip() path = match.group(2).strip()
exploits.append({"title": title, "path": path}) if "Path" not in path:
#下载渗透脚本--进一步推进
ext_str = self.do_ext(path,payload_dir)
exploits.append({"title": title, "path": path,"补充信息":ext_str})
if len(exploits) > 0: if len(exploits) > 0:
result = json.dumps(exploits) #需要转化成字符串-必须 result = json.dumps(exploits,ensure_ascii=False) #输出原始的中文字符
else: else:
result = "没有检索到漏洞利用脚本" result = "没有检索到漏洞利用脚本"
return result return result
def find_file_in_directory(self,target_file, dir_path, case_sensitive=True, recursive=False):
"""
在指定目录中查找文件是否存在
:param target_file: 要查找的目标文件名含扩展名
:param search_dir: 要搜索的目录路径
:param case_sensitive: 是否区分大小写默认True
:param recursive: 是否递归搜索子目录默认False
:return: (bool, str) 是否存在完整路径如果找到
"""
try:
#dir_path = Path(search_dir)
# 验证目录是否存在
if not dir_path.is_dir():
return False, None
# 根据是否递归选择遍历方式
iterator = dir_path.rglob('*') if recursive else dir_path.iterdir()
for entry in iterator:
# 跳过目录只处理文件
if entry.is_file():
# 根据是否区分大小写进行比较
if case_sensitive:
match = entry.name == target_file
else:
match = entry.name.lower() == target_file.lower()
if match:
return True, str(entry.resolve())
return False, None
except Exception as e:
print(f"查找出错: {str(e)}")
return False, None
def do_download(self,filepath,dirpath):
'''
下载渗透脚本
:return:
'''
filename = filepath.split("/")[-1]
if ".py" in filename:
#对应payload库中是否存在该脚本
bfind,_ = self.find_file_in_directory(filename,dirpath)
if bfind:
return True
else:#下载
instruction = f"searchsploit -m {filepath}"
try:
subprocess.run(instruction, shell=True, check=True,timeout=60)
print("命令执行成功,文件下载完成。")
except subprocess.CalledProcessError as e: #超时暂不处理--后续补充
print(f"命令执行失败: {e}")
return False
if not os.path.exists(filename):
return False
# 移动文件到目标目录
try:
shutil.move(filename, os.path.join(dirpath, filename))
print(f"文件已成功移动到 {dirpath}")
return True
except Exception as e:
print(f"移动文件失败: {e}")
return False
else: #暂时只利用python脚本
return False
def do_ext(self,filepath,payload_dir) -> str:
bdownload = self.do_download(filepath,payload_dir)
if bdownload:
return "该渗透脚本已经复制到当前路径。"
else:
return "暂时不利用该渗透脚本。"

23
tools/ToolBase.py

@ -13,10 +13,10 @@ import sys
from myutils.ReturnParams import ReturnParams from myutils.ReturnParams import ReturnParams
class ToolBase(abc.ABC): class ToolBase(abc.ABC):
def __init__(self): def __init__(self,TM):
#self.res_type = 0 #补充一个结果类型 0-初始状态,1-有安全问题, #self.res_type = 0 #补充一个结果类型 0-初始状态,1-有安全问题,
#由于工具类会被多个线程调用,不能有全局变量 #由于工具类会被多个线程调用,全局变量不能修改,只能读取
pass self.TM = TM
def create_extparams(self): def create_extparams(self):
@ -29,6 +29,7 @@ class ToolBase(abc.ABC):
parser = argparse.ArgumentParser(add_help=False) #创建命令行参数解析器对象‌ parser = argparse.ArgumentParser(add_help=False) #创建命令行参数解析器对象‌
parser.add_argument("-o ", "--output", type=str) #添加需要解析的参数规则‌ parser.add_argument("-o ", "--output", type=str) #添加需要解析的参数规则‌
parser.add_argument("-oN ", "--Noutput", type=str) #nmap parser.add_argument("-oN ", "--Noutput", type=str) #nmap
parser.add_argument("-oG ","--NMG",type=str) #nmap
parser.add_argument("-output ", "--nikto", type=str) #nikto parser.add_argument("-output ", "--nikto", type=str) #nikto
args, _ = parser.parse_known_args(command.split()[1:]) args, _ = parser.parse_known_args(command.split()[1:])
return args return args
@ -83,11 +84,21 @@ class ToolBase(abc.ABC):
#-o 的命令需要处理 #-o 的命令需要处理
parsed_arg = self.parse_sublist3r_command(instruction) parsed_arg = self.parse_sublist3r_command(instruction)
if parsed_arg.output: if parsed_arg.output:
output = self.read_output_file(parsed_arg.output) file_out = self.read_output_file(parsed_arg.output)
stderr = ""
stdout = file_out
elif parsed_arg.Noutput: elif parsed_arg.Noutput:
output = self.read_output_file(parsed_arg.Noutput) file_out = self.read_output_file(parsed_arg.Noutput)
stderr = ""
stdout = file_out
elif parsed_arg.nikto: elif parsed_arg.nikto:
output = self.read_output_file(parsed_arg.nikto) file_out = self.read_output_file(parsed_arg.nikto)
stderr = ""
stdout = file_out
elif parsed_arg.NMG:
file_out = self.read_output_file(parsed_arg.NMG)
stderr = ""
stdout = file_out
else: else:
stderr = result.stderr stderr = result.stderr
stdout = result.stdout stdout = result.stdout

Loading…
Cancel
Save