Compare commits
2 Commits
392022633a
...
53c3ed4d10
Author | SHA1 | Date |
---|---|---|
|
53c3ed4d10 | 1 month ago |
|
5c988bb36a | 1 month ago |
17 changed files with 1030 additions and 456 deletions
@ -1,173 +0,0 @@ |
|||
''' |
|||
实现对大模型调用的封装,隔离具体使用的LLM |
|||
pip install openai |
|||
''' |
|||
import openai |
|||
import json |
|||
import threading |
|||
import re |
|||
from openai import OpenAI |
|||
from myutils.MyTime import get_local_timestr |
|||
|
|||
class LLMManager: |
|||
def __init__(self,illm_type=0): |
|||
self.api_key = None |
|||
self.api_url = None |
|||
self.task_id =0 #一个任务一个id |
|||
self.llm_sn = 0 # llm执行序列号,--一任务一序列 |
|||
self.llm_sn_lock = threading.Lock() # |
|||
#temperature设置 |
|||
#DS------代码生成/数学解题:0.0 -- 数据抽取/分析:1.0 -- 通用对话:1.3 -- 翻译:1.3 -- 创意类写作:1.5 |
|||
#腾讯云--- |
|||
self.temperature = 1.0 |
|||
if illm_type == 0: #腾讯云 |
|||
self.api_key = "fGBYaQLHykBOQsFwVrQdIFTsYr8YDtDVDQWFU41mFsmvfNPc" |
|||
self.api_url = "" |
|||
elif illm_type == 1: #DS |
|||
self.api_key ="sk-10360148b465424288218f02c87b0e1b" |
|||
self.api_url ="https://api.deepseek.com/v1" |
|||
self.model = "deepseek-reasoner" #model=deepseek-reasoner -- R1 model=deepseek-chat --V3 |
|||
#创建会话对象 -- 一个任务的LLM必须唯一 |
|||
self.client = OpenAI(api_key=self.api_key,base_url=self.api_url) |
|||
self.messages = [] |
|||
# 初始化阶段和已知信息 |
|||
current_stage = "信息收集" |
|||
known_info = {"url": "www.test.com"} |
|||
results = [] # 存储所有任务结果,用于生成报告 |
|||
|
|||
#******测试使用,设置slef.message的值 |
|||
def test_old_message(self,strMessage): |
|||
try: |
|||
self.messages = json.loads(strMessage) |
|||
except json.JSONDecodeError as e: |
|||
print(f"JSON解析错误: {str(e)}") |
|||
except Exception as e: |
|||
print(f"错误: {str(e)}") |
|||
|
|||
# 构建初始提示 初始化messages |
|||
def build_initial_prompt(self,target,know_info=""): |
|||
self.messages = [{"role": "system", |
|||
"content": "你是一位资深的渗透测试专家,需要动态控制整个渗透测试过程,包括信息收集、漏洞扫描、漏洞利用等阶段,最终生成渗透测试报告。由你规划执行的指令,我会根据你的指令执行并提交结果,你再对结果进行分析,规划并生成下一步指令,直到完成渗透测试,生成测试报告。" |
|||
"生成的指令需满足如下约束:" |
|||
"1.只返回具体的shell指令或Python代码,不要包含注释和说明;" |
|||
"2.shell指令以```bash(.*?)```包裹,python代码以```python(.*?)```包裹;" |
|||
"3.针对提供的Python代码,需要包含错误处理,并将执行结果保存到文件中(为每个任务指定唯一文件名),执行结束返回success或failure和该文件名;" |
|||
"4.如果认为渗透测试已完成,请生成生成报告的python代码,并返回success和complete"}] # 一个messages |
|||
return f"现在开始对目标{target}进行渗透测试,已知信息{know_info},请提供下一步执行的指令。" |
|||
|
|||
# 构建反馈提示 |
|||
def build_feedback_prompt(self,bres,instruction, result): |
|||
if bres: |
|||
return f"执行指令“{instruction}”的结果是“{result}”。请根据这些结果生成下一步具体的指令。" |
|||
else: |
|||
return "" |
|||
|
|||
def init_data(self,task_id=0): |
|||
#初始化LLM数据 |
|||
self.llm_sn = 0 |
|||
self.task_id = task_id |
|||
self.messages = [] |
|||
|
|||
# 调用LLM生成指令 |
|||
def get_llm_instruction(self,prompt,th_DBM): |
|||
''' |
|||
1.由于大模型API不记录用户请求的上下文,一个任务的LLM不能并发! |
|||
:param prompt:用户本次输入的内容 |
|||
:return: instr_list |
|||
''' |
|||
#添加本次输入入队列 |
|||
message = {"role":"user","content":prompt} |
|||
self.messages.append(message) |
|||
|
|||
#提交LLM |
|||
post_time = get_local_timestr() |
|||
response = self.client.chat.completions.create( |
|||
model=self.model, |
|||
messages = self.messages |
|||
) |
|||
|
|||
reasoning_content = "" |
|||
content = "" |
|||
#LLM返回处理 |
|||
if self.model == "deepseek-reasoner": |
|||
#返回错误码:DS-https://api-docs.deepseek.com/zh-cn/quick_start/error_codes |
|||
reasoning_content = response.choices[0].message.reasoning_content #推理过程 |
|||
print(reasoning_content) |
|||
content = response.choices[0].message.content #推理内容 |
|||
print(content) |
|||
#记录历史信息 |
|||
self.messages.append({'role': 'assistant', 'content': content}) |
|||
elif self.model == "deepseek-chat": |
|||
content = response.choices[0].message |
|||
#记录历史信息 |
|||
self.messages.append(content) |
|||
#LLM记录存数据库 |
|||
with self.llm_sn_lock: |
|||
self.llm_sn += 1 |
|||
#llm查询记录入库 |
|||
bres = th_DBM.insert_llm(self.task_id,prompt,reasoning_content,content,post_time,self.llm_sn) |
|||
if not bres: |
|||
print("llm入库失败!") |
|||
|
|||
#********测试时使用---输出和记录LLM返回指令的message |
|||
print(f"Messages:{self.messages}") |
|||
with open("test","w",encoding="utf-8") as f: #输出到文件 |
|||
json.dump(self.messages,f,ensure_ascii=False) |
|||
|
|||
#需要对指令进行提取 |
|||
instr_list = self.fetch_instruction(content) |
|||
return instr_list |
|||
|
|||
def fetch_instruction(self,response_text): |
|||
''' |
|||
提取命令列表,包括: |
|||
1. Python 代码块(仅保留有效 Python 代码) |
|||
2. Shell 命令(分割空行,每个块视为一条指令) |
|||
|
|||
:param text: 输入文本 |
|||
:return: 解析后的命令列表 |
|||
''' |
|||
#针对llm的回复,提取执行的指令 |
|||
# 正则匹配 Python 代码块 |
|||
python_blocks = re.findall(r"```python(.*?)```", response_text, flags=re.DOTALL) |
|||
# 处理 Python 代码块,去除空行并格式化 |
|||
python_blocks = [block.strip() for block in python_blocks] |
|||
|
|||
# 按连续的空行拆分 |
|||
# 移除 Python 代码块,但保留内容用于返回 |
|||
text_no_python = re.sub(r"```python.*?```", "PYTHON_BLOCK", response_text, flags=re.DOTALL) |
|||
# 这里用 \n\s*\n 匹配一个或多个空白行 |
|||
parts = re.split(r'\n\s*\n', text_no_python) |
|||
|
|||
commands = [] |
|||
python_index = 0 # 记录 Python 代码块插入位置 |
|||
|
|||
for part in parts: |
|||
part = part.strip() |
|||
if not part: |
|||
continue |
|||
if "PYTHON_BLOCK" in part: |
|||
# 还原 Python 代码块 |
|||
commands.append(f"python {python_blocks[python_index]}") |
|||
python_index += 1 |
|||
else: |
|||
# 添加普通 Shell 命令 |
|||
commands.append(part) |
|||
|
|||
return commands |
|||
|
|||
def test_llm(self): |
|||
with open("test", "r", encoding="utf-8") as f: |
|||
messages = json.load(f) |
|||
text = messages[-1]["content"] |
|||
list = self.fetch_instruction(text) |
|||
for itme in list: |
|||
print("***********") |
|||
print(itme) |
|||
|
|||
|
|||
if __name__ == "__main__": |
|||
LM = LLMManager(1) |
|||
LM.test_llm() |
|||
|
|||
|
@ -0,0 +1,148 @@ |
|||
import queue |
|||
#渗透测试树结构维护类 |
|||
class AttackTree: |
|||
def __init__(self,root_node): |
|||
#针对根节点处理 |
|||
self.root = root_node |
|||
self.root.path = f"目标系统->{root_node.name}" |
|||
|
|||
def set_root(self,root_node): |
|||
self.root = root_node |
|||
|
|||
def add_node(self,parent_name,new_node): |
|||
"""根据父节点名称添加新节点""" |
|||
parent_node = self.find_node_by_name(parent_name) |
|||
if parent_node: |
|||
parent_node.add_child(new_node) |
|||
return True |
|||
return False |
|||
|
|||
def traverse_bfs(self): |
|||
"""广度优先遍历""" |
|||
if not self.root: |
|||
return [] |
|||
|
|||
queue = [self.root] |
|||
result = [] |
|||
while queue: |
|||
current = queue.pop(0) |
|||
result.append(current) |
|||
queue.extend(current.children) |
|||
return result |
|||
|
|||
def traverse_dfs(self, node=None, result=None): |
|||
"""深度优先遍历(前序遍历)""" |
|||
if result is None: |
|||
result = [] |
|||
if node is None: |
|||
node = self.root |
|||
if not node: |
|||
return [] |
|||
|
|||
result.append(node) |
|||
for child in node.children: |
|||
self.traverse_dfs(child, result) |
|||
return result |
|||
|
|||
def find_node_by_name(self, name): |
|||
"""根据名称查找节点(广度优先)""" |
|||
nodes = self.traverse_bfs() |
|||
for node in nodes: |
|||
if node.name == name: |
|||
return node |
|||
return None |
|||
|
|||
def find_node_by_nodepath(self,node_path): |
|||
'''基于节点路径查找节点,只返回找到的第一个节点,若有节点名称路径重复的情况暂不处理''' |
|||
current_node = self.root #从根节点开始 |
|||
node_names = node_path.split('->') |
|||
for node_name in node_names: |
|||
if node_name == "目标系统": |
|||
continue |
|||
if node_name == current_node.name:#根节点开始 |
|||
continue |
|||
else: |
|||
bfound = False |
|||
for child_node in current_node.children: |
|||
if child_node.name == node_name: #约束同一父节点下的子节点名称不能相同 |
|||
current_node = child_node |
|||
bfound = True |
|||
break |
|||
if not bfound: #如果遍历子节点都没有符合的,说明路径有问题的,不处理中间一段路径情况 |
|||
return None |
|||
#找到的话,就开始匹配下一层 |
|||
return current_node |
|||
|
|||
def find_nodes_by_status(self, status): |
|||
"""根据状态查找所有匹配节点""" |
|||
return [node for node in self.traverse_bfs() if node.status == status] |
|||
|
|||
def find_nodes_by_vul_type(self, vul_type): |
|||
"""根据漏洞类型查找所有匹配节点""" |
|||
return [node for node in self.traverse_bfs() if node.vul_type == vul_type] |
|||
|
|||
#考虑要不要用tree封装节点的操作--待定 |
|||
def update_node_status(self, node_name, new_status): |
|||
"""修改节点状态""" |
|||
node = self.find_node_by_name(node_name) |
|||
if node: |
|||
node.status = new_status |
|||
return True |
|||
return False |
|||
|
|||
def update_node_vul_type(self,node_name,vul_type): |
|||
"""修改节点漏洞类型""" |
|||
node = self.find_node_by_name(node_name) |
|||
if node: |
|||
node.vul_type = vul_type |
|||
return True |
|||
return False |
|||
|
|||
def print_tree(self, node=None, level=0): |
|||
"""可视化打印树结构""" |
|||
if node is None: |
|||
node = self.root |
|||
prefix = " " * level + "|-- " if level > 0 else "" |
|||
print(f"{prefix}{node.name} [{node.status}, {node.vul_type}]") |
|||
for child in node.children: |
|||
self.print_tree(child, level + 1) |
|||
|
|||
|
|||
class TreeNode: |
|||
def __init__(self, name, status="未完成", vul_type="未发现"): |
|||
self.name = name # 节点名称 |
|||
self.status = status # 节点状态 |
|||
self.vul_type = vul_type # 漏洞类型 |
|||
self.children = [] # 子节点列表 |
|||
self.parent = None # 父节点引用 |
|||
self.path = "" #当前节点的路径 |
|||
self.instr_queue = [] #queue.Queue() #针对当前节点的执行指令----重要约束:一个节点只能有一个线程在执行指令 |
|||
self.res_quere = [] #queue.Queue() #指令执行的结果,一批一批 |
|||
self.llm_sn = 0 #针对该节点llm提交次数 |
|||
self.do_sn = 0 #针对该节点instr执行次数 |
|||
self.messages = [] #针对当前节点积累的messages -- 针对不同节点提交不同的messages |
|||
|
|||
def add_child(self, child_node): |
|||
child_node.parent = self |
|||
child_node.path = self.path + f"->{child_node.name}" #子节点的路径赋值 |
|||
child_node.messages = self.messages #传递messages #给什么时候的messages待验证#? |
|||
self.children.append(child_node) |
|||
|
|||
def add_instr(self,instr): |
|||
self.instr_queue.append(instr) |
|||
|
|||
def get_instr(self): |
|||
return self.instr_queue.pop(0) if self.instr_queue else None |
|||
|
|||
def add_res(self,str_res): #结构化结果字串 |
|||
self.res_quere.append(str_res) |
|||
|
|||
def get_res(self): |
|||
return self.res_queue.pop(0) if self.res_queue else None |
|||
|
|||
def __repr__(self): |
|||
return f"TreeNode({self.name}, {self.status}, {self.vul_type})" |
|||
|
|||
|
|||
if __name__ == "__main__": |
|||
pass |
@ -0,0 +1,180 @@ |
|||
#自动化测试逻辑规则控制 |
|||
#统一控制规则 和 渗透测试树的维护 |
|||
import json |
|||
import re |
|||
import queue |
|||
from mycode.AttackMap import AttackTree |
|||
from mycode.AttackMap import TreeNode |
|||
from mycode.LLMManager import LLMManager |
|||
from myutils.ConfigManager import myCongif #单一实例 |
|||
from myutils.MyLogger_logger import LogHandler |
|||
|
|||
class ControlCenter: |
|||
def __init__(self,DBM): |
|||
self.logger = LogHandler().get_logger("ControlCenter") |
|||
self.task_id = None |
|||
self.target = None |
|||
self.attack_tree = None |
|||
self.DBM = DBM |
|||
#LLM对象 |
|||
self.LLM = LLMManager(myCongif.get_data("LLM_type")) |
|||
|
|||
def __del__(self): |
|||
self.task_id = None |
|||
self.target = None |
|||
self.attack_tree = None |
|||
self.DBM = None |
|||
|
|||
def init_cc_data(self): |
|||
#一次任务一次数据 |
|||
pass |
|||
|
|||
def get_user_init_info(self): |
|||
'''开始任务初,获取用户设定的基础信息''' |
|||
# ?包括是否对目标进行初始化的信息收集 |
|||
return "无" |
|||
|
|||
def start_do(self,target,task_id): |
|||
'''一个新任务的开始''' |
|||
self.task_id = task_id |
|||
self.target = target |
|||
#创建测试树 |
|||
if self.attack_tree: |
|||
self.attack_tree = None #释放 |
|||
root_node = TreeNode(target) |
|||
self.attack_tree = AttackTree(root_node)#创建测试树,同时更新根节点相关内容 |
|||
#获取初始指令 |
|||
know_info = self.get_user_init_info() |
|||
prompt = self.LLM.build_initial_prompt(target,know_info,root_node) |
|||
node_cmds, commands = self.LLM.get_llm_instruction(prompt,self.DBM,root_node) |
|||
# 更新tree |
|||
self.tree_manager(node_cmds,root_node) |
|||
# 分析指令入对应节点 |
|||
node_list = self.instr_in_node(commands,root_node) |
|||
return node_list |
|||
|
|||
#约束1:一个节点只能同时提交一次,未测试的节点不要重复 |
|||
def get_llm_instruction(self,node): |
|||
#拼接结果字符串--由于测试需要queue改成了list 2025-3-19 |
|||
# json_strings = [] |
|||
# if node.res_quere: |
|||
# for item in node.res_quere: |
|||
# json_str = json.dumps(item, ensure_ascii=False) |
|||
# json_strings.append(json_str) |
|||
# res_str = ','.join(json_strings) |
|||
|
|||
res_str = json.dumps(node.res_quere,ensure_ascii=False) |
|||
|
|||
#构造本次提交的prompt |
|||
user_Prompt = f''' |
|||
当前分支路径:{node.path} |
|||
当前节点信息: |
|||
- 节点名称:{node.name} |
|||
- 节点状态:{node.status} |
|||
- 漏洞类型:{node.vul_type} |
|||
上一步结果:{res_str} |
|||
任务:生成下一步渗透测试指令或结束该节点渗透测试。 |
|||
''' |
|||
node_cmds,commands = self.LLM.get_llm_instruction(user_Prompt,self.DBM,node) |
|||
#更新tree |
|||
self.tree_manager(node_cmds,node) |
|||
#分析指令入对应节点 |
|||
node_list = self.instr_in_node(commands,node) |
|||
return node_list |
|||
|
|||
def tree_manager(self,node_cmds,node): |
|||
'''更新渗透测试树''' |
|||
if not node_cmds or len(node_cmds)==0: |
|||
return |
|||
try: |
|||
node_jsons = json.loads(node_cmds) |
|||
for node_json in node_jsons: |
|||
if node_json["action"] == "add_node": #新增节点 |
|||
parent_node_name = node_json["parent"] |
|||
node_name = node_json["node"] |
|||
status = node_json["status"] |
|||
#新增节点原则上应该都是当前节点增加子节点 |
|||
if node.name == parent_node_name: |
|||
new_node = TreeNode(node_name,status) |
|||
node.add_child(new_node) #message的传递待验证 |
|||
else: |
|||
self.logger.error("添加子节点时,遇到父节点名称不一致的,需要介入!!") |
|||
elif node_json["action"] == "update_status": |
|||
node_name = node_json["node"] |
|||
status = node_json["status"] |
|||
vul_type = node_json["vulnerability"] |
|||
if node.name == node_name: |
|||
node.status = status |
|||
node.vul_type = vul_type |
|||
else: |
|||
self.logger.error("遇到不是修改本节点状态的,需要介入!!") |
|||
else: |
|||
self.logger.error("node更新JSON遇到异常参数!") |
|||
except json.JSONDecodeError as e: |
|||
self.logger.error(f"JSON 解析失败:{e.msg}") |
|||
self.logger.error(f"错误位置:第{e.lineno}行,列{e.colno}") |
|||
except TypeError as e: |
|||
self.logger.error(f"输入类型错误:{str(e)}") |
|||
|
|||
def instr_in_node(self,commands,node): |
|||
node_list = [] |
|||
for command in commands: |
|||
# 使用正则匹配方括号中的node_path(非贪婪模式) |
|||
match = re.search(r'\[(.*?)\]', command) |
|||
if match: |
|||
node_path = match.group(1) |
|||
find_node = self.attack_tree.find_node_by_nodepath(node_path) |
|||
if find_node: |
|||
instruction = re.sub(r'\[.*?\]', "", command,count=1,flags=re.DOTALL) |
|||
find_node.instr_queue.append(instruction) |
|||
#入输出队列 |
|||
if find_node not in node_list: |
|||
node_list.append(find_node) |
|||
else: |
|||
self.logger.error(f"基于节点路径没有找到对应的节点{node_path}") |
|||
else: |
|||
self.logger.error(f"得到的指令格式不符合规范:{command}") |
|||
return node_list |
|||
|
|||
#待修改 |
|||
def is_user_instr(self,instr): |
|||
''' |
|||
过滤需要人工确认或手动执行的指令 ---- 待完善 |
|||
:param instr: |
|||
:return: |
|||
''' |
|||
#if instr.startswith("curl") or instr.startswith("http") or instr.startswith("wget"): |
|||
if instr.startswith("http") or instr.startswith("wget") or instr.startswith("ssh"): |
|||
return True |
|||
|
|||
#指令入队列,待修改 |
|||
def instr_in_quere(self,instr_list): |
|||
''' |
|||
对于运行需要较长时间的不强求同一批次返回给LLM |
|||
:param instr_list: |
|||
:return: |
|||
''' |
|||
for instr in instr_list: |
|||
if self.is_user_instr(instr): |
|||
self.user_instr.put(instr) |
|||
print(f"需要人工确认的指令{instr}") |
|||
else: |
|||
matched =False |
|||
for prefix in self.long_time_instr: |
|||
if instr.startswith(prefix): |
|||
matched =True |
|||
if not matched: |
|||
with self.lock: |
|||
self.batch_num += 1 #非耗时指令+1 |
|||
print(f"&&&&&&当前batch_num:{self.batch_num}") |
|||
else: |
|||
with self.lock: |
|||
self.long_instr_num +=1 #耗时指令数量+1 |
|||
# 指令入队列 |
|||
self.instr_queue.append(instr) |
|||
|
|||
def stop_do(self): |
|||
#清空数据 |
|||
self.task_id = None |
|||
self.target = None |
|||
self.attack_tree = None |
@ -0,0 +1,23 @@ |
|||
|
|||
class InitManger: |
|||
def __init__(self): |
|||
listen_tatus = False |
|||
|
|||
def init_tool_info(self,self_ip="",self_port=0): |
|||
''' |
|||
初始化工具信息和工作 - 一直运行的信息 |
|||
:return: |
|||
''' |
|||
#建立一个socket监听线程,处理反向连接 |
|||
if self_ip and self_port: #ip和端口有值后 |
|||
listen_tatus = False |
|||
pass |
|||
|
|||
def init_task_info(self,cookie=""): |
|||
''' |
|||
初始化任务信息,-- 一次任务一次的信息 |
|||
:return: |
|||
''' |
|||
pass |
|||
|
|||
InitM = InitManger() #单一实例 |
@ -0,0 +1,233 @@ |
|||
''' |
|||
实现对大模型调用的封装,隔离具体使用的LLM |
|||
pip install openai |
|||
export OPENAI_API_KEY="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" |
|||
''' |
|||
import openai |
|||
import json |
|||
import threading |
|||
import re |
|||
from openai import OpenAI |
|||
from myutils.MyTime import get_local_timestr |
|||
from myutils.MyLogger_logger import LogHandler |
|||
|
|||
class LLMManager: |
|||
def __init__(self,illm_type=0): |
|||
self.logger = LogHandler().get_logger("LLMManager") |
|||
self.api_key = None |
|||
self.api_url = None |
|||
self.task_id =0 #一个任务一个id |
|||
self.llm_sn = 0 # llm执行序列号,--一任务一序列 |
|||
self.llm_sn_lock = threading.Lock() # |
|||
#temperature设置 |
|||
#DS------代码生成/数学解题:0.0 -- 数据抽取/分析:1.0 -- 通用对话:1.3 -- 翻译:1.3 -- 创意类写作:1.5 |
|||
#腾讯云--- |
|||
self.temperature = 1.0 |
|||
if illm_type == 0: #腾讯云 |
|||
self.api_key = "fGBYaQLHykBOQsFwVrQdIFTsYr8YDtDVDQWFU41mFsmvfNPc" |
|||
self.api_url = "" |
|||
elif illm_type == 1: #DS |
|||
self.api_key ="sk-10360148b465424288218f02c87b0e1b" |
|||
self.api_url ="https://api.deepseek.com/v1" |
|||
self.model = "deepseek-reasoner" #model=deepseek-reasoner -- R1 model=deepseek-chat --V3 |
|||
# 创建会话对象 -- 一个任务的LLM必须唯一 |
|||
self.client = OpenAI(api_key=self.api_key, base_url=self.api_url) |
|||
elif illm_type ==3: #GPT |
|||
self.api_key ="" |
|||
self.api_url = "" |
|||
self.model = "" |
|||
self.client = OpenAI() |
|||
|
|||
self.messages = [] |
|||
# 初始化阶段和已知信息 |
|||
current_stage = "信息收集" |
|||
known_info = {"url": "www.test.com"} |
|||
results = [] # 存储所有任务结果,用于生成报告 |
|||
|
|||
#******测试使用,设置slef.message的值 |
|||
def test_old_message(self,strMessage): |
|||
try: |
|||
self.messages = json.loads(strMessage) |
|||
except json.JSONDecodeError as e: |
|||
print(f"JSON解析错误: {str(e)}") |
|||
except Exception as e: |
|||
print(f"错误: {str(e)}") |
|||
|
|||
# 构建初始提示 初始化messages |
|||
def build_initial_prompt(self,target,know_info="无",node=None): |
|||
if not node: |
|||
return None |
|||
#根节点初始化message |
|||
node.messages = [{"role": "system", |
|||
"content":''' |
|||
你是一位渗透测试专家,由你动态控制整个渗透测试过程,根据当前测试状态和返回结果,决定下一步的测试指令,推动测试阶段前进,直至完成渗透测试。本地程序会根据你的指令进行执行,然后将执行结果返回给你。 |
|||
总体要求说明: |
|||
1.以目标系统所在IP为根节点,随着信息收集和渗透测试的推进,每个渗透测试点(如端口、服务、漏洞点)作为子节点,形成树型结构(测试树); |
|||
2.测试树整体数据由本地程序存储,你只需要关注当前节点的渗透测试推进,节点状态(未完成、已完成)的更新,和是否有子节点新增; |
|||
3.你需要返回两类指令:节点指令和测试指令,以空行间隔; |
|||
--示例: |
|||
{\"action\":\"add_node\", \"parent\": \"192.168.1.100\", \"node\": \"21端口\", \"status\": \"未完成\"} |
|||
|
|||
```bash-[目标系统->192.168.1.100->3306端口] |
|||
mysql -u root -p 192.168.1.100``` |
|||
若无节点修改或新增,节点指令可以为空,但测试指令必须对应已有节点; |
|||
决策流程: |
|||
1. 如果当前节点是IP地址,且未进行端口扫描,则执行端口扫描。 |
|||
2. 如果端口扫描发现开放端口,则为每个开放端口新增一个测试节点。 |
|||
3. 如果当前节点是端口,且未进行服务扫描,则执行服务扫描。 |
|||
4. 如果服务扫描发现服务版本或漏洞,则新增对应的漏洞测试节点。 |
|||
5. 如果当前节点是漏洞测试,且漏洞利用成功,则根据利用结果决定是否新增子节点进一步测试。 |
|||
6. 如果当前节点测试未发现新信息,则更新节点状态为“已完成”,并继续测试其他未完成节点。 |
|||
生成的节点指令需要满足如下约束: |
|||
1.新增节点指令示例:{\"action\":\"add_node\", \"parent\": \"80端口\", \"node\": \"http://192.168.1.100/index.php?id=1\", \"status\": \"未完成\"}; |
|||
2.新增子节点时,同一请求返回的子节点名(node)不能相同,且必须同时提供对该节点的测试指令; |
|||
3.若认为该节点已完成测试,修改该节点为“已完成”状态,完成节点测试且发现漏洞示例:{\"action\": \"update_status\", \"node\": \"21端口\", \"status\": \"已完成\",\"vulnerability\": \"ftp匿名登录\"}; |
|||
4.发现漏洞后,可根据漏洞类型决定是否新增子节点继续测试,如:{\"action\": \"add_node\", \"parent\": \"21端口\", \"node\": \"ftp配置检查\", \"status\": \"未完成\"}; |
|||
生成的渗透测试指令需满足如下约束: |
|||
1.只返回具体的shell指令或Python代码,不要包含注释和说明; |
|||
2.shell指令以```bash-[对应节点的路径](.*?)```包裹,python代码以```python-[对应节点的路径](.*?)```包裹,[对应节点的路径]为从根节点到目标节点的完整层级描述; |
|||
3.若提供的是shell指令,需要避免用户再次交互; |
|||
4.若提供的是python代码,主函数名为dynamic_fun,需包含错误处理,执行结束后必须返回一个tuple (status, output),其中status为'success'或'failure',output为补充输出信息; |
|||
示例: |
|||
```python-[目标系统->192.168.1.100->3306端口] |
|||
def dynamic_fun(): |
|||
try: |
|||
result = "扫描完成" |
|||
return ("success", result) |
|||
except Exception as e: |
|||
return ("failure", str(e)) |
|||
``` |
|||
限制条件: |
|||
1.仅在发现高危漏洞或关键测试路径时新增节点 |
|||
'''}] # 一个messages |
|||
user_Prompt = f''' |
|||
当前分支路径:目标系统->{target} |
|||
当前节点信息: |
|||
- 节点名称:{target} |
|||
- 节点状态:未完成 |
|||
- 漏洞类型:未发现 |
|||
上一步结果:{know_info} |
|||
任务:生成下一步渗透测试指令或结束该节点的渗透测试(修改节点状态为:已完成)。 |
|||
''' |
|||
return user_Prompt |
|||
|
|||
def init_data(self,task_id=0): |
|||
#初始化LLM数据 |
|||
self.llm_sn = 0 |
|||
self.task_id = task_id |
|||
self.messages = [] |
|||
|
|||
# 调用LLM生成指令 |
|||
def get_llm_instruction(self,prompt,th_DBM,node): |
|||
''' |
|||
1.由于大模型API不记录用户请求的上下文,一个任务的LLM不能并发! |
|||
:param prompt:用户本次输入的内容 |
|||
:return: instr_list |
|||
''' |
|||
#添加本次输入入该节点的message队列 |
|||
message = {"role":"user","content":prompt} |
|||
node.messages.append(message) |
|||
|
|||
#提交LLM |
|||
post_time = get_local_timestr() |
|||
response = self.client.chat.completions.create( |
|||
model=self.model, |
|||
messages = node.messages |
|||
) |
|||
|
|||
#LLM返回结果处理 |
|||
reasoning_content = "" |
|||
content = "" |
|||
#LLM返回处理 |
|||
if self.model == "deepseek-reasoner": |
|||
#返回错误码:DS-https://api-docs.deepseek.com/zh-cn/quick_start/error_codes |
|||
reasoning_content = response.choices[0].message.reasoning_content #推理过程 |
|||
print(reasoning_content) |
|||
content = response.choices[0].message.content #推理内容 |
|||
print(content) |
|||
# 记录llm历史信息 |
|||
node.messages.append({'role': 'assistant', 'content': content}) |
|||
elif self.model == "deepseek-chat": |
|||
content = response.choices[0].message |
|||
# 记录llm历史信息 |
|||
node.messages.append(content) |
|||
else: |
|||
self.logger.error("处理到未预设的模型!") |
|||
return None |
|||
|
|||
#LLM记录存数据库 |
|||
node.llm_sn += 1 |
|||
bres = th_DBM.insert_llm(self.task_id,prompt,reasoning_content,content,post_time,node) |
|||
if not bres: |
|||
self.logger.error(f"{node.name}-llm入库失败!") |
|||
|
|||
#需要对指令进行提取 |
|||
node_cmds,commands = self.fetch_instruction(content,node) |
|||
|
|||
return node_cmds,commands |
|||
|
|||
def fetch_instruction(self,response_text,node): |
|||
''' |
|||
*****该函数很重要,需要一定的容错能力,解析LLM返回内容***** |
|||
处理边界:只格式化分析LLM返回内容,指令和节点操作等交其他模块。 |
|||
节点控制指令 |
|||
渗透测试指令 |
|||
提取命令列表,包括: |
|||
1. Python 代码块 python[](.*?) |
|||
2. Shell 命令``bash[](.*?)``` |
|||
:param text: 输入文本 |
|||
:return: node_cmds,python_blocks,shell_blocks |
|||
''' |
|||
#针对llm的回复,提取节点操作数据和执行的指令---- |
|||
# 正则匹配 Python 代码块 |
|||
python_blocks = re.findall(r"```python-(.*?)```", response_text, flags=re.DOTALL) |
|||
# 处理 Python 代码块,去除空行并格式化 |
|||
python_blocks = [block.strip() for block in python_blocks] |
|||
|
|||
#正则匹配shell指令 |
|||
shell_blocks = re.findall(f"```bash-(.*?)```", response_text, flags=re.DOTALL) |
|||
shell_blocks = [block.strip() for block in shell_blocks] |
|||
|
|||
# 按连续的空行拆分 |
|||
# 移除 Python和bash 代码块 |
|||
text_no_python = re.sub(r"```python.*?```", "PYTHON_BLOCK", response_text, flags=re.DOTALL) |
|||
text = re.sub(r"```bash.*?```", "SHELL_BLOCK", text_no_python, flags=re.DOTALL) |
|||
|
|||
# 这里用 \n\s*\n 匹配一个或多个空白行 |
|||
parts = re.split(r'\n\s*\n', text) |
|||
node_cmds = [] |
|||
commands = [] |
|||
python_index = 0 |
|||
shell_index = 0 |
|||
for part in parts: |
|||
part = part.strip() |
|||
if not part: |
|||
continue |
|||
if "PYTHON_BLOCK" in part: |
|||
# 还原 Python 代码块 |
|||
commands.append(f"python_code {python_blocks[python_index]}") |
|||
python_index += 1 |
|||
elif "SHELL_BLOCK" in part: |
|||
commands.append(shell_blocks[shell_index]) |
|||
shell_index +=1 |
|||
else: |
|||
#其他的认为是节点操作指令 |
|||
node_cmds.append(part) |
|||
|
|||
return node_cmds,commands |
|||
|
|||
def test_llm(self): |
|||
with open("../test", "r", encoding="utf-8") as f: |
|||
messages = json.load(f) |
|||
text = messages[-1]["content"] |
|||
list = self.fetch_instruction(text) |
|||
for itme in list: |
|||
print("***********") |
|||
print(itme) |
|||
|
|||
|
|||
if __name__ == "__main__": |
|||
LM = LLMManager(1) |
|||
LM.test_llm() |
|||
|
|||
|
File diff suppressed because one or more lines are too long
@ -1,80 +0,0 @@ |
|||
#python代码动态执行 |
|||
|
|||
from tools.ToolBase import ToolBase |
|||
|
|||
class PythonTool(ToolBase): |
|||
def validate_instruction(self, instruction): |
|||
#指令过滤 |
|||
timeout = 0 |
|||
return "",timeout |
|||
|
|||
def execute_instruction(self, instruction_old): |
|||
''' |
|||
执行指令:验证合法性 -> 执行 -> 分析结果 |
|||
:param instruction_old: |
|||
:return: |
|||
bool:true-正常返回给大模型,false-结果不返回给大模型 |
|||
str:执行的指令 |
|||
str:执行指令的结果 |
|||
''' |
|||
ext_params = self.create_extparams() |
|||
# 定义允许的内置函数集合 |
|||
allowed_builtins = { |
|||
"abs": abs, |
|||
"all": all, |
|||
"any": any, |
|||
"bool": bool, |
|||
"chr": chr, |
|||
"dict": dict, |
|||
"float": float, |
|||
"int": int, |
|||
"len": len, |
|||
"list": list, |
|||
"max": max, |
|||
"min": min, |
|||
"print": print, |
|||
"range": range, |
|||
"set": set, |
|||
"str": str, |
|||
"sum": sum, |
|||
"type": type, |
|||
# 根据需要可以添加其他安全的内置函数 |
|||
} |
|||
# 第一步:验证指令合法性 |
|||
instruction,time_out = self.validate_instruction(instruction_old) |
|||
if not instruction: |
|||
return False, instruction_old, "该指令暂不执行!","",ext_params |
|||
# 过滤修改后的指令是否需要判重?同样指令再执行结果一致?待定---#? |
|||
|
|||
# 第二步:执行指令 |
|||
output = "" |
|||
# 构造安全的全局命名空间,只包含我们允许的 __builtins__ |
|||
safe_globals = { |
|||
"__builtins__": allowed_builtins, |
|||
} |
|||
try: |
|||
# 编译代码 |
|||
code_obj = compile(instruction, filename="<dynamic>", mode="exec") |
|||
# 在限制环境中执行代码 |
|||
exec(code_obj, safe_globals) |
|||
except Exception as e: |
|||
print(f"执行动态代码时出错: {e}") |
|||
|
|||
|
|||
# 第三步:分析执行结果 |
|||
analysis = self.analyze_result(output, instruction,"","") |
|||
# 指令和结果入数据库 |
|||
# ? |
|||
if not analysis: # analysis为“” 不提交LLM |
|||
return False, instruction, analysis,"",ext_params |
|||
return True, instruction, analysis,"",ext_params |
|||
|
|||
def analyze_result(self, result,instruction,stderr,stdout): |
|||
#指令结果分析 |
|||
return result |
|||
|
|||
if __name__ == "__main__": |
|||
llm_code = """ |
|||
def run_test(): |
|||
return 'Penetration test executed successfully!' |
|||
""" |
@ -0,0 +1,129 @@ |
|||
#python代码动态执行 |
|||
import ast |
|||
import subprocess |
|||
import json |
|||
import builtins |
|||
import re |
|||
from tools.ToolBase import ToolBase |
|||
|
|||
class PythoncodeTool(ToolBase): |
|||
|
|||
def is_safe_code(self,code): |
|||
# List of high-risk functions to block (can be adjusted based on requirements) |
|||
HIGH_RISK_FUNCTIONS = ['eval', 'exec', 'os.system', 'subprocess.call', 'subprocess.Popen'] |
|||
|
|||
"""Check if the code contains high-risk function calls.""" |
|||
try: |
|||
tree = ast.parse(code) |
|||
for node in ast.walk(tree): |
|||
if isinstance(node, ast.Call): |
|||
if isinstance(node.func, ast.Name) and node.func.id in HIGH_RISK_FUNCTIONS: |
|||
return False |
|||
elif isinstance(node.func, ast.Attribute) and node.func.attr in HIGH_RISK_FUNCTIONS: |
|||
return False |
|||
return True |
|||
except SyntaxError: |
|||
return False |
|||
|
|||
def validate_instruction(self, instruction): |
|||
#指令过滤 |
|||
timeout = 0 |
|||
instr = instruction.replace("python_code ","") |
|||
# Safety check |
|||
if not self.is_safe_code(instr): |
|||
return "", timeout |
|||
return instr,timeout |
|||
|
|||
def safe_import(self,name,*args,**kwargs): |
|||
ALLOWED_MODULES = ['subprocess', 'json','re'] |
|||
if name not in ALLOWED_MODULES: |
|||
raise ImportError(f"Import of '{name}' is not allowed") |
|||
return builtins.__import__(name, *args, **kwargs) |
|||
|
|||
def execute_instruction(self, instruction_old): |
|||
''' |
|||
执行指令:验证合法性 -> 执行 -> 分析结果 |
|||
:param instruction_old: |
|||
:return: |
|||
bool:true-正常返回给大模型,false-结果不返回给大模型 |
|||
str:执行的指令 |
|||
str:执行指令的结果 |
|||
''' |
|||
ext_params = self.create_extparams() |
|||
|
|||
# 定义允许的内置函数集合 --白名单 |
|||
allowed_builtins = { |
|||
'__import__': builtins.__import__, |
|||
"abs": abs, |
|||
"all": all, |
|||
"any": any, |
|||
"bool": bool, |
|||
"chr": chr, |
|||
"dict": dict, |
|||
"float": float, |
|||
"int": int, |
|||
"len": len, |
|||
"list": list, |
|||
"max": max, |
|||
"min": min, |
|||
"print": print, |
|||
"range": range, |
|||
"set": set, |
|||
"str": str, |
|||
"sum": sum, |
|||
"type": type, |
|||
'open':open, |
|||
'Exception':Exception, |
|||
# 根据需要可以添加其他安全的内置函数 |
|||
} |
|||
# 第一步:验证指令合法性 |
|||
instruction,time_out = self.validate_instruction(instruction_old) |
|||
if not instruction: |
|||
return False, instruction_old, "该指令暂不执行!","",ext_params |
|||
# 过滤修改后的指令是否需要判重?同样指令再执行结果一致?待定---#? |
|||
|
|||
# 第二步:执行指令 |
|||
output = "" |
|||
try: |
|||
# 构造安全的全局命名空间,只包含我们允许的 __builtins__ |
|||
# 虽然动态代码中包含了import subprocess,但是还是需要在全局命名空间中添加subprocess这些库 |
|||
# 正常情况应该是不需要的,后续再研究 |
|||
safe_globals = {"__builtins__": allowed_builtins, |
|||
'subprocess':subprocess, |
|||
'json':json, |
|||
're':re,} |
|||
safe_locals = {} #不需要预设局部参数 |
|||
# 在限制环境中执行代码 |
|||
exec(instruction, safe_globals,safe_locals) |
|||
# Check if check_samba_vuln is defined |
|||
if 'dynamic_fun' not in safe_locals: |
|||
analysis = "Function dynamic_fun() is not defined" |
|||
ext_params['is_use'] = True |
|||
return True,instruction,analysis,analysis,ext_params |
|||
# Get the function and call it |
|||
dynamic_fun = safe_locals['dynamic_fun'] |
|||
status, tmpout = dynamic_fun() |
|||
output = f"status:{status},output:{tmpout}" |
|||
except Exception as e: |
|||
analysis = f"执行动态代码时出错: {str(e)}" |
|||
ext_params['is_use'] = True |
|||
return True,instruction,analysis,analysis,ext_params |
|||
|
|||
|
|||
# 第三步:分析执行结果 |
|||
analysis = self.analyze_result(output, instruction,"","") |
|||
# 指令和结果入数据库 |
|||
# ? |
|||
if not analysis: # analysis为“” 不提交LLM |
|||
return False, instruction, analysis,"",ext_params |
|||
return True, instruction, analysis,"",ext_params |
|||
|
|||
def analyze_result(self, result,instruction,stderr,stdout): |
|||
#指令结果分析 |
|||
return result |
|||
|
|||
if __name__ == "__main__": |
|||
llm_code = """ |
|||
def run_test(): |
|||
return 'Penetration test executed successfully!' |
|||
""" |
@ -1,29 +1,130 @@ |
|||
from tools.ToolBase import ToolBase |
|||
import re |
|||
import json |
|||
import subprocess |
|||
import os |
|||
import shutil |
|||
from pathlib import Path |
|||
class SearchsploitTool(ToolBase): |
|||
def validate_instruction(self, instruction): |
|||
#指令过滤 |
|||
timeout = 0 |
|||
if "-m " in instruction: # 下载利用代码 |
|||
timeout = 60 |
|||
return instruction,timeout |
|||
|
|||
def analyze_result(self, result,instruction,stderr,stdout): |
|||
#获取当前路径 |
|||
cur_path = Path(__file__).resolve().parent |
|||
payload_dir = cur_path / "../payload" |
|||
|
|||
#if instruction(result,bytes): |
|||
if type(result) is bytes: |
|||
result = result.decode('utf-8',errors='ignore') |
|||
|
|||
"""去除 ANSI 颜色码""" |
|||
ansi_escape = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])') |
|||
clean_result = ansi_escape.sub('', result) |
|||
#指令结果分析 |
|||
lines = clean_result.split("\n") |
|||
exploits = [] |
|||
|
|||
for line in lines: |
|||
match = re.match(r"(.+?)\s+\|\s+(\S+)", line) |
|||
if match: |
|||
title = match.group(1).strip() |
|||
path = match.group(2).strip() |
|||
exploits.append({"title": title, "path": path}) |
|||
|
|||
if len(exploits) > 0: |
|||
result = json.dumps(exploits) #需要转化成字符串-必须 |
|||
else: |
|||
result = "没有检索到漏洞利用脚本" |
|||
# 指令结果分析 |
|||
if "-m " in instruction:#下载利用代码 |
|||
if "Copied to:" in clean_result or "cp: overwrite " in clean_result: |
|||
result = "下载完成" |
|||
else: |
|||
result = "下载失败" |
|||
else: #目前只遇到两种searchsploit命令格式: |
|||
lines = clean_result.split("\n") |
|||
exploits = [] |
|||
|
|||
for line in lines: |
|||
match = re.match(r"(.+?)\s+\|\s+(\S+)", line) |
|||
if match: |
|||
title = match.group(1).strip() |
|||
path = match.group(2).strip() |
|||
if "Path" not in path: |
|||
#下载渗透脚本--进一步推进 |
|||
ext_str = self.do_ext(path,payload_dir) |
|||
exploits.append({"title": title, "path": path,"补充信息":ext_str}) |
|||
|
|||
if len(exploits) > 0: |
|||
result = json.dumps(exploits,ensure_ascii=False) #输出原始的中文字符 |
|||
else: |
|||
result = "没有检索到漏洞利用脚本" |
|||
return result |
|||
|
|||
def find_file_in_directory(self,target_file, dir_path, case_sensitive=True, recursive=False): |
|||
""" |
|||
在指定目录中查找文件是否存在 |
|||
:param target_file: 要查找的目标文件名(含扩展名) |
|||
:param search_dir: 要搜索的目录路径 |
|||
:param case_sensitive: 是否区分大小写(默认True) |
|||
:param recursive: 是否递归搜索子目录(默认False) |
|||
:return: (bool, str) 是否存在,完整路径(如果找到) |
|||
""" |
|||
try: |
|||
#dir_path = Path(search_dir) |
|||
|
|||
# 验证目录是否存在 |
|||
if not dir_path.is_dir(): |
|||
return False, None |
|||
|
|||
# 根据是否递归选择遍历方式 |
|||
iterator = dir_path.rglob('*') if recursive else dir_path.iterdir() |
|||
|
|||
for entry in iterator: |
|||
# 跳过目录只处理文件 |
|||
if entry.is_file(): |
|||
# 根据是否区分大小写进行比较 |
|||
if case_sensitive: |
|||
match = entry.name == target_file |
|||
else: |
|||
match = entry.name.lower() == target_file.lower() |
|||
|
|||
if match: |
|||
return True, str(entry.resolve()) |
|||
|
|||
return False, None |
|||
|
|||
except Exception as e: |
|||
print(f"查找出错: {str(e)}") |
|||
return False, None |
|||
|
|||
def do_download(self,filepath,dirpath): |
|||
''' |
|||
下载渗透脚本 |
|||
:return: |
|||
''' |
|||
filename = filepath.split("/")[-1] |
|||
if ".py" in filename: |
|||
#对应payload库中是否存在该脚本 |
|||
bfind,_ = self.find_file_in_directory(filename,dirpath) |
|||
if bfind: |
|||
return True |
|||
else:#下载 |
|||
instruction = f"searchsploit -m {filepath}" |
|||
try: |
|||
subprocess.run(instruction, shell=True, check=True,timeout=60) |
|||
print("命令执行成功,文件下载完成。") |
|||
except subprocess.CalledProcessError as e: #超时暂不处理--后续补充 |
|||
print(f"命令执行失败: {e}") |
|||
return False |
|||
|
|||
if not os.path.exists(filename): |
|||
return False |
|||
# 移动文件到目标目录 |
|||
try: |
|||
shutil.move(filename, os.path.join(dirpath, filename)) |
|||
print(f"文件已成功移动到 {dirpath}") |
|||
return True |
|||
except Exception as e: |
|||
print(f"移动文件失败: {e}") |
|||
return False |
|||
else: #暂时只利用python脚本 |
|||
return False |
|||
|
|||
|
|||
def do_ext(self,filepath,payload_dir) -> str: |
|||
bdownload = self.do_download(filepath,payload_dir) |
|||
if bdownload: |
|||
return "该渗透脚本已经复制到当前路径。" |
|||
else: |
|||
return "暂时不利用该渗透脚本。" |
|||
|
Loading…
Reference in new issue