Compare commits
2 Commits
392022633a
...
53c3ed4d10
Author | SHA1 | Date |
---|---|---|
|
53c3ed4d10 | 1 month ago |
|
5c988bb36a | 1 month ago |
17 changed files with 1030 additions and 456 deletions
@ -1,173 +0,0 @@ |
|||||
''' |
|
||||
实现对大模型调用的封装,隔离具体使用的LLM |
|
||||
pip install openai |
|
||||
''' |
|
||||
import openai |
|
||||
import json |
|
||||
import threading |
|
||||
import re |
|
||||
from openai import OpenAI |
|
||||
from myutils.MyTime import get_local_timestr |
|
||||
|
|
||||
class LLMManager: |
|
||||
def __init__(self,illm_type=0): |
|
||||
self.api_key = None |
|
||||
self.api_url = None |
|
||||
self.task_id =0 #一个任务一个id |
|
||||
self.llm_sn = 0 # llm执行序列号,--一任务一序列 |
|
||||
self.llm_sn_lock = threading.Lock() # |
|
||||
#temperature设置 |
|
||||
#DS------代码生成/数学解题:0.0 -- 数据抽取/分析:1.0 -- 通用对话:1.3 -- 翻译:1.3 -- 创意类写作:1.5 |
|
||||
#腾讯云--- |
|
||||
self.temperature = 1.0 |
|
||||
if illm_type == 0: #腾讯云 |
|
||||
self.api_key = "fGBYaQLHykBOQsFwVrQdIFTsYr8YDtDVDQWFU41mFsmvfNPc" |
|
||||
self.api_url = "" |
|
||||
elif illm_type == 1: #DS |
|
||||
self.api_key ="sk-10360148b465424288218f02c87b0e1b" |
|
||||
self.api_url ="https://api.deepseek.com/v1" |
|
||||
self.model = "deepseek-reasoner" #model=deepseek-reasoner -- R1 model=deepseek-chat --V3 |
|
||||
#创建会话对象 -- 一个任务的LLM必须唯一 |
|
||||
self.client = OpenAI(api_key=self.api_key,base_url=self.api_url) |
|
||||
self.messages = [] |
|
||||
# 初始化阶段和已知信息 |
|
||||
current_stage = "信息收集" |
|
||||
known_info = {"url": "www.test.com"} |
|
||||
results = [] # 存储所有任务结果,用于生成报告 |
|
||||
|
|
||||
#******测试使用,设置slef.message的值 |
|
||||
def test_old_message(self,strMessage): |
|
||||
try: |
|
||||
self.messages = json.loads(strMessage) |
|
||||
except json.JSONDecodeError as e: |
|
||||
print(f"JSON解析错误: {str(e)}") |
|
||||
except Exception as e: |
|
||||
print(f"错误: {str(e)}") |
|
||||
|
|
||||
# 构建初始提示 初始化messages |
|
||||
def build_initial_prompt(self,target,know_info=""): |
|
||||
self.messages = [{"role": "system", |
|
||||
"content": "你是一位资深的渗透测试专家,需要动态控制整个渗透测试过程,包括信息收集、漏洞扫描、漏洞利用等阶段,最终生成渗透测试报告。由你规划执行的指令,我会根据你的指令执行并提交结果,你再对结果进行分析,规划并生成下一步指令,直到完成渗透测试,生成测试报告。" |
|
||||
"生成的指令需满足如下约束:" |
|
||||
"1.只返回具体的shell指令或Python代码,不要包含注释和说明;" |
|
||||
"2.shell指令以```bash(.*?)```包裹,python代码以```python(.*?)```包裹;" |
|
||||
"3.针对提供的Python代码,需要包含错误处理,并将执行结果保存到文件中(为每个任务指定唯一文件名),执行结束返回success或failure和该文件名;" |
|
||||
"4.如果认为渗透测试已完成,请生成生成报告的python代码,并返回success和complete"}] # 一个messages |
|
||||
return f"现在开始对目标{target}进行渗透测试,已知信息{know_info},请提供下一步执行的指令。" |
|
||||
|
|
||||
# 构建反馈提示 |
|
||||
def build_feedback_prompt(self,bres,instruction, result): |
|
||||
if bres: |
|
||||
return f"执行指令“{instruction}”的结果是“{result}”。请根据这些结果生成下一步具体的指令。" |
|
||||
else: |
|
||||
return "" |
|
||||
|
|
||||
def init_data(self,task_id=0): |
|
||||
#初始化LLM数据 |
|
||||
self.llm_sn = 0 |
|
||||
self.task_id = task_id |
|
||||
self.messages = [] |
|
||||
|
|
||||
# 调用LLM生成指令 |
|
||||
def get_llm_instruction(self,prompt,th_DBM): |
|
||||
''' |
|
||||
1.由于大模型API不记录用户请求的上下文,一个任务的LLM不能并发! |
|
||||
:param prompt:用户本次输入的内容 |
|
||||
:return: instr_list |
|
||||
''' |
|
||||
#添加本次输入入队列 |
|
||||
message = {"role":"user","content":prompt} |
|
||||
self.messages.append(message) |
|
||||
|
|
||||
#提交LLM |
|
||||
post_time = get_local_timestr() |
|
||||
response = self.client.chat.completions.create( |
|
||||
model=self.model, |
|
||||
messages = self.messages |
|
||||
) |
|
||||
|
|
||||
reasoning_content = "" |
|
||||
content = "" |
|
||||
#LLM返回处理 |
|
||||
if self.model == "deepseek-reasoner": |
|
||||
#返回错误码:DS-https://api-docs.deepseek.com/zh-cn/quick_start/error_codes |
|
||||
reasoning_content = response.choices[0].message.reasoning_content #推理过程 |
|
||||
print(reasoning_content) |
|
||||
content = response.choices[0].message.content #推理内容 |
|
||||
print(content) |
|
||||
#记录历史信息 |
|
||||
self.messages.append({'role': 'assistant', 'content': content}) |
|
||||
elif self.model == "deepseek-chat": |
|
||||
content = response.choices[0].message |
|
||||
#记录历史信息 |
|
||||
self.messages.append(content) |
|
||||
#LLM记录存数据库 |
|
||||
with self.llm_sn_lock: |
|
||||
self.llm_sn += 1 |
|
||||
#llm查询记录入库 |
|
||||
bres = th_DBM.insert_llm(self.task_id,prompt,reasoning_content,content,post_time,self.llm_sn) |
|
||||
if not bres: |
|
||||
print("llm入库失败!") |
|
||||
|
|
||||
#********测试时使用---输出和记录LLM返回指令的message |
|
||||
print(f"Messages:{self.messages}") |
|
||||
with open("test","w",encoding="utf-8") as f: #输出到文件 |
|
||||
json.dump(self.messages,f,ensure_ascii=False) |
|
||||
|
|
||||
#需要对指令进行提取 |
|
||||
instr_list = self.fetch_instruction(content) |
|
||||
return instr_list |
|
||||
|
|
||||
def fetch_instruction(self,response_text): |
|
||||
''' |
|
||||
提取命令列表,包括: |
|
||||
1. Python 代码块(仅保留有效 Python 代码) |
|
||||
2. Shell 命令(分割空行,每个块视为一条指令) |
|
||||
|
|
||||
:param text: 输入文本 |
|
||||
:return: 解析后的命令列表 |
|
||||
''' |
|
||||
#针对llm的回复,提取执行的指令 |
|
||||
# 正则匹配 Python 代码块 |
|
||||
python_blocks = re.findall(r"```python(.*?)```", response_text, flags=re.DOTALL) |
|
||||
# 处理 Python 代码块,去除空行并格式化 |
|
||||
python_blocks = [block.strip() for block in python_blocks] |
|
||||
|
|
||||
# 按连续的空行拆分 |
|
||||
# 移除 Python 代码块,但保留内容用于返回 |
|
||||
text_no_python = re.sub(r"```python.*?```", "PYTHON_BLOCK", response_text, flags=re.DOTALL) |
|
||||
# 这里用 \n\s*\n 匹配一个或多个空白行 |
|
||||
parts = re.split(r'\n\s*\n', text_no_python) |
|
||||
|
|
||||
commands = [] |
|
||||
python_index = 0 # 记录 Python 代码块插入位置 |
|
||||
|
|
||||
for part in parts: |
|
||||
part = part.strip() |
|
||||
if not part: |
|
||||
continue |
|
||||
if "PYTHON_BLOCK" in part: |
|
||||
# 还原 Python 代码块 |
|
||||
commands.append(f"python {python_blocks[python_index]}") |
|
||||
python_index += 1 |
|
||||
else: |
|
||||
# 添加普通 Shell 命令 |
|
||||
commands.append(part) |
|
||||
|
|
||||
return commands |
|
||||
|
|
||||
def test_llm(self): |
|
||||
with open("test", "r", encoding="utf-8") as f: |
|
||||
messages = json.load(f) |
|
||||
text = messages[-1]["content"] |
|
||||
list = self.fetch_instruction(text) |
|
||||
for itme in list: |
|
||||
print("***********") |
|
||||
print(itme) |
|
||||
|
|
||||
|
|
||||
if __name__ == "__main__": |
|
||||
LM = LLMManager(1) |
|
||||
LM.test_llm() |
|
||||
|
|
||||
|
|
@ -0,0 +1,148 @@ |
|||||
|
import queue |
||||
|
#渗透测试树结构维护类 |
||||
|
class AttackTree: |
||||
|
def __init__(self,root_node): |
||||
|
#针对根节点处理 |
||||
|
self.root = root_node |
||||
|
self.root.path = f"目标系统->{root_node.name}" |
||||
|
|
||||
|
def set_root(self,root_node): |
||||
|
self.root = root_node |
||||
|
|
||||
|
def add_node(self,parent_name,new_node): |
||||
|
"""根据父节点名称添加新节点""" |
||||
|
parent_node = self.find_node_by_name(parent_name) |
||||
|
if parent_node: |
||||
|
parent_node.add_child(new_node) |
||||
|
return True |
||||
|
return False |
||||
|
|
||||
|
def traverse_bfs(self): |
||||
|
"""广度优先遍历""" |
||||
|
if not self.root: |
||||
|
return [] |
||||
|
|
||||
|
queue = [self.root] |
||||
|
result = [] |
||||
|
while queue: |
||||
|
current = queue.pop(0) |
||||
|
result.append(current) |
||||
|
queue.extend(current.children) |
||||
|
return result |
||||
|
|
||||
|
def traverse_dfs(self, node=None, result=None): |
||||
|
"""深度优先遍历(前序遍历)""" |
||||
|
if result is None: |
||||
|
result = [] |
||||
|
if node is None: |
||||
|
node = self.root |
||||
|
if not node: |
||||
|
return [] |
||||
|
|
||||
|
result.append(node) |
||||
|
for child in node.children: |
||||
|
self.traverse_dfs(child, result) |
||||
|
return result |
||||
|
|
||||
|
def find_node_by_name(self, name): |
||||
|
"""根据名称查找节点(广度优先)""" |
||||
|
nodes = self.traverse_bfs() |
||||
|
for node in nodes: |
||||
|
if node.name == name: |
||||
|
return node |
||||
|
return None |
||||
|
|
||||
|
def find_node_by_nodepath(self,node_path): |
||||
|
'''基于节点路径查找节点,只返回找到的第一个节点,若有节点名称路径重复的情况暂不处理''' |
||||
|
current_node = self.root #从根节点开始 |
||||
|
node_names = node_path.split('->') |
||||
|
for node_name in node_names: |
||||
|
if node_name == "目标系统": |
||||
|
continue |
||||
|
if node_name == current_node.name:#根节点开始 |
||||
|
continue |
||||
|
else: |
||||
|
bfound = False |
||||
|
for child_node in current_node.children: |
||||
|
if child_node.name == node_name: #约束同一父节点下的子节点名称不能相同 |
||||
|
current_node = child_node |
||||
|
bfound = True |
||||
|
break |
||||
|
if not bfound: #如果遍历子节点都没有符合的,说明路径有问题的,不处理中间一段路径情况 |
||||
|
return None |
||||
|
#找到的话,就开始匹配下一层 |
||||
|
return current_node |
||||
|
|
||||
|
def find_nodes_by_status(self, status): |
||||
|
"""根据状态查找所有匹配节点""" |
||||
|
return [node for node in self.traverse_bfs() if node.status == status] |
||||
|
|
||||
|
def find_nodes_by_vul_type(self, vul_type): |
||||
|
"""根据漏洞类型查找所有匹配节点""" |
||||
|
return [node for node in self.traverse_bfs() if node.vul_type == vul_type] |
||||
|
|
||||
|
#考虑要不要用tree封装节点的操作--待定 |
||||
|
def update_node_status(self, node_name, new_status): |
||||
|
"""修改节点状态""" |
||||
|
node = self.find_node_by_name(node_name) |
||||
|
if node: |
||||
|
node.status = new_status |
||||
|
return True |
||||
|
return False |
||||
|
|
||||
|
def update_node_vul_type(self,node_name,vul_type): |
||||
|
"""修改节点漏洞类型""" |
||||
|
node = self.find_node_by_name(node_name) |
||||
|
if node: |
||||
|
node.vul_type = vul_type |
||||
|
return True |
||||
|
return False |
||||
|
|
||||
|
def print_tree(self, node=None, level=0): |
||||
|
"""可视化打印树结构""" |
||||
|
if node is None: |
||||
|
node = self.root |
||||
|
prefix = " " * level + "|-- " if level > 0 else "" |
||||
|
print(f"{prefix}{node.name} [{node.status}, {node.vul_type}]") |
||||
|
for child in node.children: |
||||
|
self.print_tree(child, level + 1) |
||||
|
|
||||
|
|
||||
|
class TreeNode: |
||||
|
def __init__(self, name, status="未完成", vul_type="未发现"): |
||||
|
self.name = name # 节点名称 |
||||
|
self.status = status # 节点状态 |
||||
|
self.vul_type = vul_type # 漏洞类型 |
||||
|
self.children = [] # 子节点列表 |
||||
|
self.parent = None # 父节点引用 |
||||
|
self.path = "" #当前节点的路径 |
||||
|
self.instr_queue = [] #queue.Queue() #针对当前节点的执行指令----重要约束:一个节点只能有一个线程在执行指令 |
||||
|
self.res_quere = [] #queue.Queue() #指令执行的结果,一批一批 |
||||
|
self.llm_sn = 0 #针对该节点llm提交次数 |
||||
|
self.do_sn = 0 #针对该节点instr执行次数 |
||||
|
self.messages = [] #针对当前节点积累的messages -- 针对不同节点提交不同的messages |
||||
|
|
||||
|
def add_child(self, child_node): |
||||
|
child_node.parent = self |
||||
|
child_node.path = self.path + f"->{child_node.name}" #子节点的路径赋值 |
||||
|
child_node.messages = self.messages #传递messages #给什么时候的messages待验证#? |
||||
|
self.children.append(child_node) |
||||
|
|
||||
|
def add_instr(self,instr): |
||||
|
self.instr_queue.append(instr) |
||||
|
|
||||
|
def get_instr(self): |
||||
|
return self.instr_queue.pop(0) if self.instr_queue else None |
||||
|
|
||||
|
def add_res(self,str_res): #结构化结果字串 |
||||
|
self.res_quere.append(str_res) |
||||
|
|
||||
|
def get_res(self): |
||||
|
return self.res_queue.pop(0) if self.res_queue else None |
||||
|
|
||||
|
def __repr__(self): |
||||
|
return f"TreeNode({self.name}, {self.status}, {self.vul_type})" |
||||
|
|
||||
|
|
||||
|
if __name__ == "__main__": |
||||
|
pass |
@ -0,0 +1,180 @@ |
|||||
|
#自动化测试逻辑规则控制 |
||||
|
#统一控制规则 和 渗透测试树的维护 |
||||
|
import json |
||||
|
import re |
||||
|
import queue |
||||
|
from mycode.AttackMap import AttackTree |
||||
|
from mycode.AttackMap import TreeNode |
||||
|
from mycode.LLMManager import LLMManager |
||||
|
from myutils.ConfigManager import myCongif #单一实例 |
||||
|
from myutils.MyLogger_logger import LogHandler |
||||
|
|
||||
|
class ControlCenter: |
||||
|
def __init__(self,DBM): |
||||
|
self.logger = LogHandler().get_logger("ControlCenter") |
||||
|
self.task_id = None |
||||
|
self.target = None |
||||
|
self.attack_tree = None |
||||
|
self.DBM = DBM |
||||
|
#LLM对象 |
||||
|
self.LLM = LLMManager(myCongif.get_data("LLM_type")) |
||||
|
|
||||
|
def __del__(self): |
||||
|
self.task_id = None |
||||
|
self.target = None |
||||
|
self.attack_tree = None |
||||
|
self.DBM = None |
||||
|
|
||||
|
def init_cc_data(self): |
||||
|
#一次任务一次数据 |
||||
|
pass |
||||
|
|
||||
|
def get_user_init_info(self): |
||||
|
'''开始任务初,获取用户设定的基础信息''' |
||||
|
# ?包括是否对目标进行初始化的信息收集 |
||||
|
return "无" |
||||
|
|
||||
|
def start_do(self,target,task_id): |
||||
|
'''一个新任务的开始''' |
||||
|
self.task_id = task_id |
||||
|
self.target = target |
||||
|
#创建测试树 |
||||
|
if self.attack_tree: |
||||
|
self.attack_tree = None #释放 |
||||
|
root_node = TreeNode(target) |
||||
|
self.attack_tree = AttackTree(root_node)#创建测试树,同时更新根节点相关内容 |
||||
|
#获取初始指令 |
||||
|
know_info = self.get_user_init_info() |
||||
|
prompt = self.LLM.build_initial_prompt(target,know_info,root_node) |
||||
|
node_cmds, commands = self.LLM.get_llm_instruction(prompt,self.DBM,root_node) |
||||
|
# 更新tree |
||||
|
self.tree_manager(node_cmds,root_node) |
||||
|
# 分析指令入对应节点 |
||||
|
node_list = self.instr_in_node(commands,root_node) |
||||
|
return node_list |
||||
|
|
||||
|
#约束1:一个节点只能同时提交一次,未测试的节点不要重复 |
||||
|
def get_llm_instruction(self,node): |
||||
|
#拼接结果字符串--由于测试需要queue改成了list 2025-3-19 |
||||
|
# json_strings = [] |
||||
|
# if node.res_quere: |
||||
|
# for item in node.res_quere: |
||||
|
# json_str = json.dumps(item, ensure_ascii=False) |
||||
|
# json_strings.append(json_str) |
||||
|
# res_str = ','.join(json_strings) |
||||
|
|
||||
|
res_str = json.dumps(node.res_quere,ensure_ascii=False) |
||||
|
|
||||
|
#构造本次提交的prompt |
||||
|
user_Prompt = f''' |
||||
|
当前分支路径:{node.path} |
||||
|
当前节点信息: |
||||
|
- 节点名称:{node.name} |
||||
|
- 节点状态:{node.status} |
||||
|
- 漏洞类型:{node.vul_type} |
||||
|
上一步结果:{res_str} |
||||
|
任务:生成下一步渗透测试指令或结束该节点渗透测试。 |
||||
|
''' |
||||
|
node_cmds,commands = self.LLM.get_llm_instruction(user_Prompt,self.DBM,node) |
||||
|
#更新tree |
||||
|
self.tree_manager(node_cmds,node) |
||||
|
#分析指令入对应节点 |
||||
|
node_list = self.instr_in_node(commands,node) |
||||
|
return node_list |
||||
|
|
||||
|
def tree_manager(self,node_cmds,node): |
||||
|
'''更新渗透测试树''' |
||||
|
if not node_cmds or len(node_cmds)==0: |
||||
|
return |
||||
|
try: |
||||
|
node_jsons = json.loads(node_cmds) |
||||
|
for node_json in node_jsons: |
||||
|
if node_json["action"] == "add_node": #新增节点 |
||||
|
parent_node_name = node_json["parent"] |
||||
|
node_name = node_json["node"] |
||||
|
status = node_json["status"] |
||||
|
#新增节点原则上应该都是当前节点增加子节点 |
||||
|
if node.name == parent_node_name: |
||||
|
new_node = TreeNode(node_name,status) |
||||
|
node.add_child(new_node) #message的传递待验证 |
||||
|
else: |
||||
|
self.logger.error("添加子节点时,遇到父节点名称不一致的,需要介入!!") |
||||
|
elif node_json["action"] == "update_status": |
||||
|
node_name = node_json["node"] |
||||
|
status = node_json["status"] |
||||
|
vul_type = node_json["vulnerability"] |
||||
|
if node.name == node_name: |
||||
|
node.status = status |
||||
|
node.vul_type = vul_type |
||||
|
else: |
||||
|
self.logger.error("遇到不是修改本节点状态的,需要介入!!") |
||||
|
else: |
||||
|
self.logger.error("node更新JSON遇到异常参数!") |
||||
|
except json.JSONDecodeError as e: |
||||
|
self.logger.error(f"JSON 解析失败:{e.msg}") |
||||
|
self.logger.error(f"错误位置:第{e.lineno}行,列{e.colno}") |
||||
|
except TypeError as e: |
||||
|
self.logger.error(f"输入类型错误:{str(e)}") |
||||
|
|
||||
|
def instr_in_node(self,commands,node): |
||||
|
node_list = [] |
||||
|
for command in commands: |
||||
|
# 使用正则匹配方括号中的node_path(非贪婪模式) |
||||
|
match = re.search(r'\[(.*?)\]', command) |
||||
|
if match: |
||||
|
node_path = match.group(1) |
||||
|
find_node = self.attack_tree.find_node_by_nodepath(node_path) |
||||
|
if find_node: |
||||
|
instruction = re.sub(r'\[.*?\]', "", command,count=1,flags=re.DOTALL) |
||||
|
find_node.instr_queue.append(instruction) |
||||
|
#入输出队列 |
||||
|
if find_node not in node_list: |
||||
|
node_list.append(find_node) |
||||
|
else: |
||||
|
self.logger.error(f"基于节点路径没有找到对应的节点{node_path}") |
||||
|
else: |
||||
|
self.logger.error(f"得到的指令格式不符合规范:{command}") |
||||
|
return node_list |
||||
|
|
||||
|
#待修改 |
||||
|
def is_user_instr(self,instr): |
||||
|
''' |
||||
|
过滤需要人工确认或手动执行的指令 ---- 待完善 |
||||
|
:param instr: |
||||
|
:return: |
||||
|
''' |
||||
|
#if instr.startswith("curl") or instr.startswith("http") or instr.startswith("wget"): |
||||
|
if instr.startswith("http") or instr.startswith("wget") or instr.startswith("ssh"): |
||||
|
return True |
||||
|
|
||||
|
#指令入队列,待修改 |
||||
|
def instr_in_quere(self,instr_list): |
||||
|
''' |
||||
|
对于运行需要较长时间的不强求同一批次返回给LLM |
||||
|
:param instr_list: |
||||
|
:return: |
||||
|
''' |
||||
|
for instr in instr_list: |
||||
|
if self.is_user_instr(instr): |
||||
|
self.user_instr.put(instr) |
||||
|
print(f"需要人工确认的指令{instr}") |
||||
|
else: |
||||
|
matched =False |
||||
|
for prefix in self.long_time_instr: |
||||
|
if instr.startswith(prefix): |
||||
|
matched =True |
||||
|
if not matched: |
||||
|
with self.lock: |
||||
|
self.batch_num += 1 #非耗时指令+1 |
||||
|
print(f"&&&&&&当前batch_num:{self.batch_num}") |
||||
|
else: |
||||
|
with self.lock: |
||||
|
self.long_instr_num +=1 #耗时指令数量+1 |
||||
|
# 指令入队列 |
||||
|
self.instr_queue.append(instr) |
||||
|
|
||||
|
def stop_do(self): |
||||
|
#清空数据 |
||||
|
self.task_id = None |
||||
|
self.target = None |
||||
|
self.attack_tree = None |
@ -0,0 +1,23 @@ |
|||||
|
|
||||
|
class InitManger: |
||||
|
def __init__(self): |
||||
|
listen_tatus = False |
||||
|
|
||||
|
def init_tool_info(self,self_ip="",self_port=0): |
||||
|
''' |
||||
|
初始化工具信息和工作 - 一直运行的信息 |
||||
|
:return: |
||||
|
''' |
||||
|
#建立一个socket监听线程,处理反向连接 |
||||
|
if self_ip and self_port: #ip和端口有值后 |
||||
|
listen_tatus = False |
||||
|
pass |
||||
|
|
||||
|
def init_task_info(self,cookie=""): |
||||
|
''' |
||||
|
初始化任务信息,-- 一次任务一次的信息 |
||||
|
:return: |
||||
|
''' |
||||
|
pass |
||||
|
|
||||
|
InitM = InitManger() #单一实例 |
@ -0,0 +1,233 @@ |
|||||
|
''' |
||||
|
实现对大模型调用的封装,隔离具体使用的LLM |
||||
|
pip install openai |
||||
|
export OPENAI_API_KEY="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" |
||||
|
''' |
||||
|
import openai |
||||
|
import json |
||||
|
import threading |
||||
|
import re |
||||
|
from openai import OpenAI |
||||
|
from myutils.MyTime import get_local_timestr |
||||
|
from myutils.MyLogger_logger import LogHandler |
||||
|
|
||||
|
class LLMManager: |
||||
|
def __init__(self,illm_type=0): |
||||
|
self.logger = LogHandler().get_logger("LLMManager") |
||||
|
self.api_key = None |
||||
|
self.api_url = None |
||||
|
self.task_id =0 #一个任务一个id |
||||
|
self.llm_sn = 0 # llm执行序列号,--一任务一序列 |
||||
|
self.llm_sn_lock = threading.Lock() # |
||||
|
#temperature设置 |
||||
|
#DS------代码生成/数学解题:0.0 -- 数据抽取/分析:1.0 -- 通用对话:1.3 -- 翻译:1.3 -- 创意类写作:1.5 |
||||
|
#腾讯云--- |
||||
|
self.temperature = 1.0 |
||||
|
if illm_type == 0: #腾讯云 |
||||
|
self.api_key = "fGBYaQLHykBOQsFwVrQdIFTsYr8YDtDVDQWFU41mFsmvfNPc" |
||||
|
self.api_url = "" |
||||
|
elif illm_type == 1: #DS |
||||
|
self.api_key ="sk-10360148b465424288218f02c87b0e1b" |
||||
|
self.api_url ="https://api.deepseek.com/v1" |
||||
|
self.model = "deepseek-reasoner" #model=deepseek-reasoner -- R1 model=deepseek-chat --V3 |
||||
|
# 创建会话对象 -- 一个任务的LLM必须唯一 |
||||
|
self.client = OpenAI(api_key=self.api_key, base_url=self.api_url) |
||||
|
elif illm_type ==3: #GPT |
||||
|
self.api_key ="" |
||||
|
self.api_url = "" |
||||
|
self.model = "" |
||||
|
self.client = OpenAI() |
||||
|
|
||||
|
self.messages = [] |
||||
|
# 初始化阶段和已知信息 |
||||
|
current_stage = "信息收集" |
||||
|
known_info = {"url": "www.test.com"} |
||||
|
results = [] # 存储所有任务结果,用于生成报告 |
||||
|
|
||||
|
#******测试使用,设置slef.message的值 |
||||
|
def test_old_message(self,strMessage): |
||||
|
try: |
||||
|
self.messages = json.loads(strMessage) |
||||
|
except json.JSONDecodeError as e: |
||||
|
print(f"JSON解析错误: {str(e)}") |
||||
|
except Exception as e: |
||||
|
print(f"错误: {str(e)}") |
||||
|
|
||||
|
# 构建初始提示 初始化messages |
||||
|
def build_initial_prompt(self,target,know_info="无",node=None): |
||||
|
if not node: |
||||
|
return None |
||||
|
#根节点初始化message |
||||
|
node.messages = [{"role": "system", |
||||
|
"content":''' |
||||
|
你是一位渗透测试专家,由你动态控制整个渗透测试过程,根据当前测试状态和返回结果,决定下一步的测试指令,推动测试阶段前进,直至完成渗透测试。本地程序会根据你的指令进行执行,然后将执行结果返回给你。 |
||||
|
总体要求说明: |
||||
|
1.以目标系统所在IP为根节点,随着信息收集和渗透测试的推进,每个渗透测试点(如端口、服务、漏洞点)作为子节点,形成树型结构(测试树); |
||||
|
2.测试树整体数据由本地程序存储,你只需要关注当前节点的渗透测试推进,节点状态(未完成、已完成)的更新,和是否有子节点新增; |
||||
|
3.你需要返回两类指令:节点指令和测试指令,以空行间隔; |
||||
|
--示例: |
||||
|
{\"action\":\"add_node\", \"parent\": \"192.168.1.100\", \"node\": \"21端口\", \"status\": \"未完成\"} |
||||
|
|
||||
|
```bash-[目标系统->192.168.1.100->3306端口] |
||||
|
mysql -u root -p 192.168.1.100``` |
||||
|
若无节点修改或新增,节点指令可以为空,但测试指令必须对应已有节点; |
||||
|
决策流程: |
||||
|
1. 如果当前节点是IP地址,且未进行端口扫描,则执行端口扫描。 |
||||
|
2. 如果端口扫描发现开放端口,则为每个开放端口新增一个测试节点。 |
||||
|
3. 如果当前节点是端口,且未进行服务扫描,则执行服务扫描。 |
||||
|
4. 如果服务扫描发现服务版本或漏洞,则新增对应的漏洞测试节点。 |
||||
|
5. 如果当前节点是漏洞测试,且漏洞利用成功,则根据利用结果决定是否新增子节点进一步测试。 |
||||
|
6. 如果当前节点测试未发现新信息,则更新节点状态为“已完成”,并继续测试其他未完成节点。 |
||||
|
生成的节点指令需要满足如下约束: |
||||
|
1.新增节点指令示例:{\"action\":\"add_node\", \"parent\": \"80端口\", \"node\": \"http://192.168.1.100/index.php?id=1\", \"status\": \"未完成\"}; |
||||
|
2.新增子节点时,同一请求返回的子节点名(node)不能相同,且必须同时提供对该节点的测试指令; |
||||
|
3.若认为该节点已完成测试,修改该节点为“已完成”状态,完成节点测试且发现漏洞示例:{\"action\": \"update_status\", \"node\": \"21端口\", \"status\": \"已完成\",\"vulnerability\": \"ftp匿名登录\"}; |
||||
|
4.发现漏洞后,可根据漏洞类型决定是否新增子节点继续测试,如:{\"action\": \"add_node\", \"parent\": \"21端口\", \"node\": \"ftp配置检查\", \"status\": \"未完成\"}; |
||||
|
生成的渗透测试指令需满足如下约束: |
||||
|
1.只返回具体的shell指令或Python代码,不要包含注释和说明; |
||||
|
2.shell指令以```bash-[对应节点的路径](.*?)```包裹,python代码以```python-[对应节点的路径](.*?)```包裹,[对应节点的路径]为从根节点到目标节点的完整层级描述; |
||||
|
3.若提供的是shell指令,需要避免用户再次交互; |
||||
|
4.若提供的是python代码,主函数名为dynamic_fun,需包含错误处理,执行结束后必须返回一个tuple (status, output),其中status为'success'或'failure',output为补充输出信息; |
||||
|
示例: |
||||
|
```python-[目标系统->192.168.1.100->3306端口] |
||||
|
def dynamic_fun(): |
||||
|
try: |
||||
|
result = "扫描完成" |
||||
|
return ("success", result) |
||||
|
except Exception as e: |
||||
|
return ("failure", str(e)) |
||||
|
``` |
||||
|
限制条件: |
||||
|
1.仅在发现高危漏洞或关键测试路径时新增节点 |
||||
|
'''}] # 一个messages |
||||
|
user_Prompt = f''' |
||||
|
当前分支路径:目标系统->{target} |
||||
|
当前节点信息: |
||||
|
- 节点名称:{target} |
||||
|
- 节点状态:未完成 |
||||
|
- 漏洞类型:未发现 |
||||
|
上一步结果:{know_info} |
||||
|
任务:生成下一步渗透测试指令或结束该节点的渗透测试(修改节点状态为:已完成)。 |
||||
|
''' |
||||
|
return user_Prompt |
||||
|
|
||||
|
def init_data(self,task_id=0): |
||||
|
#初始化LLM数据 |
||||
|
self.llm_sn = 0 |
||||
|
self.task_id = task_id |
||||
|
self.messages = [] |
||||
|
|
||||
|
# 调用LLM生成指令 |
||||
|
def get_llm_instruction(self,prompt,th_DBM,node): |
||||
|
''' |
||||
|
1.由于大模型API不记录用户请求的上下文,一个任务的LLM不能并发! |
||||
|
:param prompt:用户本次输入的内容 |
||||
|
:return: instr_list |
||||
|
''' |
||||
|
#添加本次输入入该节点的message队列 |
||||
|
message = {"role":"user","content":prompt} |
||||
|
node.messages.append(message) |
||||
|
|
||||
|
#提交LLM |
||||
|
post_time = get_local_timestr() |
||||
|
response = self.client.chat.completions.create( |
||||
|
model=self.model, |
||||
|
messages = node.messages |
||||
|
) |
||||
|
|
||||
|
#LLM返回结果处理 |
||||
|
reasoning_content = "" |
||||
|
content = "" |
||||
|
#LLM返回处理 |
||||
|
if self.model == "deepseek-reasoner": |
||||
|
#返回错误码:DS-https://api-docs.deepseek.com/zh-cn/quick_start/error_codes |
||||
|
reasoning_content = response.choices[0].message.reasoning_content #推理过程 |
||||
|
print(reasoning_content) |
||||
|
content = response.choices[0].message.content #推理内容 |
||||
|
print(content) |
||||
|
# 记录llm历史信息 |
||||
|
node.messages.append({'role': 'assistant', 'content': content}) |
||||
|
elif self.model == "deepseek-chat": |
||||
|
content = response.choices[0].message |
||||
|
# 记录llm历史信息 |
||||
|
node.messages.append(content) |
||||
|
else: |
||||
|
self.logger.error("处理到未预设的模型!") |
||||
|
return None |
||||
|
|
||||
|
#LLM记录存数据库 |
||||
|
node.llm_sn += 1 |
||||
|
bres = th_DBM.insert_llm(self.task_id,prompt,reasoning_content,content,post_time,node) |
||||
|
if not bres: |
||||
|
self.logger.error(f"{node.name}-llm入库失败!") |
||||
|
|
||||
|
#需要对指令进行提取 |
||||
|
node_cmds,commands = self.fetch_instruction(content,node) |
||||
|
|
||||
|
return node_cmds,commands |
||||
|
|
||||
|
def fetch_instruction(self,response_text,node): |
||||
|
''' |
||||
|
*****该函数很重要,需要一定的容错能力,解析LLM返回内容***** |
||||
|
处理边界:只格式化分析LLM返回内容,指令和节点操作等交其他模块。 |
||||
|
节点控制指令 |
||||
|
渗透测试指令 |
||||
|
提取命令列表,包括: |
||||
|
1. Python 代码块 python[](.*?) |
||||
|
2. Shell 命令``bash[](.*?)``` |
||||
|
:param text: 输入文本 |
||||
|
:return: node_cmds,python_blocks,shell_blocks |
||||
|
''' |
||||
|
#针对llm的回复,提取节点操作数据和执行的指令---- |
||||
|
# 正则匹配 Python 代码块 |
||||
|
python_blocks = re.findall(r"```python-(.*?)```", response_text, flags=re.DOTALL) |
||||
|
# 处理 Python 代码块,去除空行并格式化 |
||||
|
python_blocks = [block.strip() for block in python_blocks] |
||||
|
|
||||
|
#正则匹配shell指令 |
||||
|
shell_blocks = re.findall(f"```bash-(.*?)```", response_text, flags=re.DOTALL) |
||||
|
shell_blocks = [block.strip() for block in shell_blocks] |
||||
|
|
||||
|
# 按连续的空行拆分 |
||||
|
# 移除 Python和bash 代码块 |
||||
|
text_no_python = re.sub(r"```python.*?```", "PYTHON_BLOCK", response_text, flags=re.DOTALL) |
||||
|
text = re.sub(r"```bash.*?```", "SHELL_BLOCK", text_no_python, flags=re.DOTALL) |
||||
|
|
||||
|
# 这里用 \n\s*\n 匹配一个或多个空白行 |
||||
|
parts = re.split(r'\n\s*\n', text) |
||||
|
node_cmds = [] |
||||
|
commands = [] |
||||
|
python_index = 0 |
||||
|
shell_index = 0 |
||||
|
for part in parts: |
||||
|
part = part.strip() |
||||
|
if not part: |
||||
|
continue |
||||
|
if "PYTHON_BLOCK" in part: |
||||
|
# 还原 Python 代码块 |
||||
|
commands.append(f"python_code {python_blocks[python_index]}") |
||||
|
python_index += 1 |
||||
|
elif "SHELL_BLOCK" in part: |
||||
|
commands.append(shell_blocks[shell_index]) |
||||
|
shell_index +=1 |
||||
|
else: |
||||
|
#其他的认为是节点操作指令 |
||||
|
node_cmds.append(part) |
||||
|
|
||||
|
return node_cmds,commands |
||||
|
|
||||
|
def test_llm(self): |
||||
|
with open("../test", "r", encoding="utf-8") as f: |
||||
|
messages = json.load(f) |
||||
|
text = messages[-1]["content"] |
||||
|
list = self.fetch_instruction(text) |
||||
|
for itme in list: |
||||
|
print("***********") |
||||
|
print(itme) |
||||
|
|
||||
|
|
||||
|
if __name__ == "__main__": |
||||
|
LM = LLMManager(1) |
||||
|
LM.test_llm() |
||||
|
|
||||
|
|
File diff suppressed because one or more lines are too long
@ -1,80 +0,0 @@ |
|||||
#python代码动态执行 |
|
||||
|
|
||||
from tools.ToolBase import ToolBase |
|
||||
|
|
||||
class PythonTool(ToolBase): |
|
||||
def validate_instruction(self, instruction): |
|
||||
#指令过滤 |
|
||||
timeout = 0 |
|
||||
return "",timeout |
|
||||
|
|
||||
def execute_instruction(self, instruction_old): |
|
||||
''' |
|
||||
执行指令:验证合法性 -> 执行 -> 分析结果 |
|
||||
:param instruction_old: |
|
||||
:return: |
|
||||
bool:true-正常返回给大模型,false-结果不返回给大模型 |
|
||||
str:执行的指令 |
|
||||
str:执行指令的结果 |
|
||||
''' |
|
||||
ext_params = self.create_extparams() |
|
||||
# 定义允许的内置函数集合 |
|
||||
allowed_builtins = { |
|
||||
"abs": abs, |
|
||||
"all": all, |
|
||||
"any": any, |
|
||||
"bool": bool, |
|
||||
"chr": chr, |
|
||||
"dict": dict, |
|
||||
"float": float, |
|
||||
"int": int, |
|
||||
"len": len, |
|
||||
"list": list, |
|
||||
"max": max, |
|
||||
"min": min, |
|
||||
"print": print, |
|
||||
"range": range, |
|
||||
"set": set, |
|
||||
"str": str, |
|
||||
"sum": sum, |
|
||||
"type": type, |
|
||||
# 根据需要可以添加其他安全的内置函数 |
|
||||
} |
|
||||
# 第一步:验证指令合法性 |
|
||||
instruction,time_out = self.validate_instruction(instruction_old) |
|
||||
if not instruction: |
|
||||
return False, instruction_old, "该指令暂不执行!","",ext_params |
|
||||
# 过滤修改后的指令是否需要判重?同样指令再执行结果一致?待定---#? |
|
||||
|
|
||||
# 第二步:执行指令 |
|
||||
output = "" |
|
||||
# 构造安全的全局命名空间,只包含我们允许的 __builtins__ |
|
||||
safe_globals = { |
|
||||
"__builtins__": allowed_builtins, |
|
||||
} |
|
||||
try: |
|
||||
# 编译代码 |
|
||||
code_obj = compile(instruction, filename="<dynamic>", mode="exec") |
|
||||
# 在限制环境中执行代码 |
|
||||
exec(code_obj, safe_globals) |
|
||||
except Exception as e: |
|
||||
print(f"执行动态代码时出错: {e}") |
|
||||
|
|
||||
|
|
||||
# 第三步:分析执行结果 |
|
||||
analysis = self.analyze_result(output, instruction,"","") |
|
||||
# 指令和结果入数据库 |
|
||||
# ? |
|
||||
if not analysis: # analysis为“” 不提交LLM |
|
||||
return False, instruction, analysis,"",ext_params |
|
||||
return True, instruction, analysis,"",ext_params |
|
||||
|
|
||||
def analyze_result(self, result,instruction,stderr,stdout): |
|
||||
#指令结果分析 |
|
||||
return result |
|
||||
|
|
||||
if __name__ == "__main__": |
|
||||
llm_code = """ |
|
||||
def run_test(): |
|
||||
return 'Penetration test executed successfully!' |
|
||||
""" |
|
@ -0,0 +1,129 @@ |
|||||
|
#python代码动态执行 |
||||
|
import ast |
||||
|
import subprocess |
||||
|
import json |
||||
|
import builtins |
||||
|
import re |
||||
|
from tools.ToolBase import ToolBase |
||||
|
|
||||
|
class PythoncodeTool(ToolBase): |
||||
|
|
||||
|
def is_safe_code(self,code): |
||||
|
# List of high-risk functions to block (can be adjusted based on requirements) |
||||
|
HIGH_RISK_FUNCTIONS = ['eval', 'exec', 'os.system', 'subprocess.call', 'subprocess.Popen'] |
||||
|
|
||||
|
"""Check if the code contains high-risk function calls.""" |
||||
|
try: |
||||
|
tree = ast.parse(code) |
||||
|
for node in ast.walk(tree): |
||||
|
if isinstance(node, ast.Call): |
||||
|
if isinstance(node.func, ast.Name) and node.func.id in HIGH_RISK_FUNCTIONS: |
||||
|
return False |
||||
|
elif isinstance(node.func, ast.Attribute) and node.func.attr in HIGH_RISK_FUNCTIONS: |
||||
|
return False |
||||
|
return True |
||||
|
except SyntaxError: |
||||
|
return False |
||||
|
|
||||
|
def validate_instruction(self, instruction): |
||||
|
#指令过滤 |
||||
|
timeout = 0 |
||||
|
instr = instruction.replace("python_code ","") |
||||
|
# Safety check |
||||
|
if not self.is_safe_code(instr): |
||||
|
return "", timeout |
||||
|
return instr,timeout |
||||
|
|
||||
|
def safe_import(self,name,*args,**kwargs): |
||||
|
ALLOWED_MODULES = ['subprocess', 'json','re'] |
||||
|
if name not in ALLOWED_MODULES: |
||||
|
raise ImportError(f"Import of '{name}' is not allowed") |
||||
|
return builtins.__import__(name, *args, **kwargs) |
||||
|
|
||||
|
def execute_instruction(self, instruction_old): |
||||
|
''' |
||||
|
执行指令:验证合法性 -> 执行 -> 分析结果 |
||||
|
:param instruction_old: |
||||
|
:return: |
||||
|
bool:true-正常返回给大模型,false-结果不返回给大模型 |
||||
|
str:执行的指令 |
||||
|
str:执行指令的结果 |
||||
|
''' |
||||
|
ext_params = self.create_extparams() |
||||
|
|
||||
|
# 定义允许的内置函数集合 --白名单 |
||||
|
allowed_builtins = { |
||||
|
'__import__': builtins.__import__, |
||||
|
"abs": abs, |
||||
|
"all": all, |
||||
|
"any": any, |
||||
|
"bool": bool, |
||||
|
"chr": chr, |
||||
|
"dict": dict, |
||||
|
"float": float, |
||||
|
"int": int, |
||||
|
"len": len, |
||||
|
"list": list, |
||||
|
"max": max, |
||||
|
"min": min, |
||||
|
"print": print, |
||||
|
"range": range, |
||||
|
"set": set, |
||||
|
"str": str, |
||||
|
"sum": sum, |
||||
|
"type": type, |
||||
|
'open':open, |
||||
|
'Exception':Exception, |
||||
|
# 根据需要可以添加其他安全的内置函数 |
||||
|
} |
||||
|
# 第一步:验证指令合法性 |
||||
|
instruction,time_out = self.validate_instruction(instruction_old) |
||||
|
if not instruction: |
||||
|
return False, instruction_old, "该指令暂不执行!","",ext_params |
||||
|
# 过滤修改后的指令是否需要判重?同样指令再执行结果一致?待定---#? |
||||
|
|
||||
|
# 第二步:执行指令 |
||||
|
output = "" |
||||
|
try: |
||||
|
# 构造安全的全局命名空间,只包含我们允许的 __builtins__ |
||||
|
# 虽然动态代码中包含了import subprocess,但是还是需要在全局命名空间中添加subprocess这些库 |
||||
|
# 正常情况应该是不需要的,后续再研究 |
||||
|
safe_globals = {"__builtins__": allowed_builtins, |
||||
|
'subprocess':subprocess, |
||||
|
'json':json, |
||||
|
're':re,} |
||||
|
safe_locals = {} #不需要预设局部参数 |
||||
|
# 在限制环境中执行代码 |
||||
|
exec(instruction, safe_globals,safe_locals) |
||||
|
# Check if check_samba_vuln is defined |
||||
|
if 'dynamic_fun' not in safe_locals: |
||||
|
analysis = "Function dynamic_fun() is not defined" |
||||
|
ext_params['is_use'] = True |
||||
|
return True,instruction,analysis,analysis,ext_params |
||||
|
# Get the function and call it |
||||
|
dynamic_fun = safe_locals['dynamic_fun'] |
||||
|
status, tmpout = dynamic_fun() |
||||
|
output = f"status:{status},output:{tmpout}" |
||||
|
except Exception as e: |
||||
|
analysis = f"执行动态代码时出错: {str(e)}" |
||||
|
ext_params['is_use'] = True |
||||
|
return True,instruction,analysis,analysis,ext_params |
||||
|
|
||||
|
|
||||
|
# 第三步:分析执行结果 |
||||
|
analysis = self.analyze_result(output, instruction,"","") |
||||
|
# 指令和结果入数据库 |
||||
|
# ? |
||||
|
if not analysis: # analysis为“” 不提交LLM |
||||
|
return False, instruction, analysis,"",ext_params |
||||
|
return True, instruction, analysis,"",ext_params |
||||
|
|
||||
|
def analyze_result(self, result,instruction,stderr,stdout): |
||||
|
#指令结果分析 |
||||
|
return result |
||||
|
|
||||
|
if __name__ == "__main__": |
||||
|
llm_code = """ |
||||
|
def run_test(): |
||||
|
return 'Penetration test executed successfully!' |
||||
|
""" |
@ -1,29 +1,130 @@ |
|||||
from tools.ToolBase import ToolBase |
from tools.ToolBase import ToolBase |
||||
import re |
import re |
||||
import json |
import json |
||||
|
import subprocess |
||||
|
import os |
||||
|
import shutil |
||||
|
from pathlib import Path |
||||
class SearchsploitTool(ToolBase): |
class SearchsploitTool(ToolBase): |
||||
def validate_instruction(self, instruction): |
def validate_instruction(self, instruction): |
||||
#指令过滤 |
#指令过滤 |
||||
timeout = 0 |
timeout = 0 |
||||
|
if "-m " in instruction: # 下载利用代码 |
||||
|
timeout = 60 |
||||
return instruction,timeout |
return instruction,timeout |
||||
|
|
||||
def analyze_result(self, result,instruction,stderr,stdout): |
def analyze_result(self, result,instruction,stderr,stdout): |
||||
|
#获取当前路径 |
||||
|
cur_path = Path(__file__).resolve().parent |
||||
|
payload_dir = cur_path / "../payload" |
||||
|
|
||||
|
#if instruction(result,bytes): |
||||
|
if type(result) is bytes: |
||||
|
result = result.decode('utf-8',errors='ignore') |
||||
|
|
||||
"""去除 ANSI 颜色码""" |
"""去除 ANSI 颜色码""" |
||||
ansi_escape = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])') |
ansi_escape = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])') |
||||
clean_result = ansi_escape.sub('', result) |
clean_result = ansi_escape.sub('', result) |
||||
#指令结果分析 |
# 指令结果分析 |
||||
lines = clean_result.split("\n") |
if "-m " in instruction:#下载利用代码 |
||||
exploits = [] |
if "Copied to:" in clean_result or "cp: overwrite " in clean_result: |
||||
|
result = "下载完成" |
||||
for line in lines: |
else: |
||||
match = re.match(r"(.+?)\s+\|\s+(\S+)", line) |
result = "下载失败" |
||||
if match: |
else: #目前只遇到两种searchsploit命令格式: |
||||
title = match.group(1).strip() |
lines = clean_result.split("\n") |
||||
path = match.group(2).strip() |
exploits = [] |
||||
exploits.append({"title": title, "path": path}) |
|
||||
|
for line in lines: |
||||
if len(exploits) > 0: |
match = re.match(r"(.+?)\s+\|\s+(\S+)", line) |
||||
result = json.dumps(exploits) #需要转化成字符串-必须 |
if match: |
||||
else: |
title = match.group(1).strip() |
||||
result = "没有检索到漏洞利用脚本" |
path = match.group(2).strip() |
||||
|
if "Path" not in path: |
||||
|
#下载渗透脚本--进一步推进 |
||||
|
ext_str = self.do_ext(path,payload_dir) |
||||
|
exploits.append({"title": title, "path": path,"补充信息":ext_str}) |
||||
|
|
||||
|
if len(exploits) > 0: |
||||
|
result = json.dumps(exploits,ensure_ascii=False) #输出原始的中文字符 |
||||
|
else: |
||||
|
result = "没有检索到漏洞利用脚本" |
||||
return result |
return result |
||||
|
|
||||
|
def find_file_in_directory(self,target_file, dir_path, case_sensitive=True, recursive=False): |
||||
|
""" |
||||
|
在指定目录中查找文件是否存在 |
||||
|
:param target_file: 要查找的目标文件名(含扩展名) |
||||
|
:param search_dir: 要搜索的目录路径 |
||||
|
:param case_sensitive: 是否区分大小写(默认True) |
||||
|
:param recursive: 是否递归搜索子目录(默认False) |
||||
|
:return: (bool, str) 是否存在,完整路径(如果找到) |
||||
|
""" |
||||
|
try: |
||||
|
#dir_path = Path(search_dir) |
||||
|
|
||||
|
# 验证目录是否存在 |
||||
|
if not dir_path.is_dir(): |
||||
|
return False, None |
||||
|
|
||||
|
# 根据是否递归选择遍历方式 |
||||
|
iterator = dir_path.rglob('*') if recursive else dir_path.iterdir() |
||||
|
|
||||
|
for entry in iterator: |
||||
|
# 跳过目录只处理文件 |
||||
|
if entry.is_file(): |
||||
|
# 根据是否区分大小写进行比较 |
||||
|
if case_sensitive: |
||||
|
match = entry.name == target_file |
||||
|
else: |
||||
|
match = entry.name.lower() == target_file.lower() |
||||
|
|
||||
|
if match: |
||||
|
return True, str(entry.resolve()) |
||||
|
|
||||
|
return False, None |
||||
|
|
||||
|
except Exception as e: |
||||
|
print(f"查找出错: {str(e)}") |
||||
|
return False, None |
||||
|
|
||||
|
def do_download(self,filepath,dirpath): |
||||
|
''' |
||||
|
下载渗透脚本 |
||||
|
:return: |
||||
|
''' |
||||
|
filename = filepath.split("/")[-1] |
||||
|
if ".py" in filename: |
||||
|
#对应payload库中是否存在该脚本 |
||||
|
bfind,_ = self.find_file_in_directory(filename,dirpath) |
||||
|
if bfind: |
||||
|
return True |
||||
|
else:#下载 |
||||
|
instruction = f"searchsploit -m {filepath}" |
||||
|
try: |
||||
|
subprocess.run(instruction, shell=True, check=True,timeout=60) |
||||
|
print("命令执行成功,文件下载完成。") |
||||
|
except subprocess.CalledProcessError as e: #超时暂不处理--后续补充 |
||||
|
print(f"命令执行失败: {e}") |
||||
|
return False |
||||
|
|
||||
|
if not os.path.exists(filename): |
||||
|
return False |
||||
|
# 移动文件到目标目录 |
||||
|
try: |
||||
|
shutil.move(filename, os.path.join(dirpath, filename)) |
||||
|
print(f"文件已成功移动到 {dirpath}") |
||||
|
return True |
||||
|
except Exception as e: |
||||
|
print(f"移动文件失败: {e}") |
||||
|
return False |
||||
|
else: #暂时只利用python脚本 |
||||
|
return False |
||||
|
|
||||
|
|
||||
|
def do_ext(self,filepath,payload_dir) -> str: |
||||
|
bdownload = self.do_download(filepath,payload_dir) |
||||
|
if bdownload: |
||||
|
return "该渗透脚本已经复制到当前路径。" |
||||
|
else: |
||||
|
return "暂时不利用该渗透脚本。" |
||||
|
Loading…
Reference in new issue