|
|
|
'''
|
|
|
|
# SMTP命令注入测试(手动交互)
|
|
|
|
telnet haitutech.cn 25
|
|
|
|
EHLO attacker.com
|
|
|
|
MAIL FROM: <admin@haitutech.cn>
|
|
|
|
RCPT TO: <target@example.com> NOTIFY=SUCCESS,FAILURE
|
|
|
|
'''
|
|
|
|
import socket
|
|
|
|
import subprocess
|
|
|
|
from tools.ToolBase import ToolBase
|
|
|
|
|
|
|
|
class TelnetTool(ToolBase):
|
|
|
|
def validate_instruction(self, instruction):
|
|
|
|
#指令过滤
|
|
|
|
timeout = 0
|
|
|
|
return instruction,timeout
|
|
|
|
|
|
|
|
def analyze_result(self, result,instruction,stderr,stdout):
|
|
|
|
#指令结果分析
|
|
|
|
return result
|
|
|
|
|
|
|
|
def smtp_injection_test(self,cmd_str: str,host,port):
|
|
|
|
"""
|
|
|
|
测试 SMTP 命令注入漏洞。
|
|
|
|
参数:
|
|
|
|
cmd_str: 多行字符串,其中第一行为类似 "telnet haitutech.cn 25"
|
|
|
|
后续行为具体的 SMTP 命令,例如:
|
|
|
|
|
|
|
|
EHLO attacker.com
|
|
|
|
MAIL FROM: <admin@haitutech.cn>
|
|
|
|
RCPT TO: <target@example.com> NOTIFY=SUCCESS,FAILURE
|
|
|
|
|
|
|
|
返回:
|
|
|
|
如果 RCPT 命令响应以 250 开头,认为可能存在漏洞(返回 True),
|
|
|
|
否则认为安全或不支持该注入(返回 False)。
|
|
|
|
"""
|
|
|
|
last_response = ""
|
|
|
|
lines = cmd_str.strip().splitlines()
|
|
|
|
try:
|
|
|
|
# 建立 TCP 连接
|
|
|
|
s = socket.create_connection((host, port), timeout=10)
|
|
|
|
except Exception as e:
|
|
|
|
#print("连接失败:", e)
|
|
|
|
return f"连接失败:{e}"
|
|
|
|
|
|
|
|
# 读取 SMTP Banner
|
|
|
|
try:
|
|
|
|
banner = s.recv(1024).decode('utf-8', errors='ignore')
|
|
|
|
except Exception as e:
|
|
|
|
#print("读取 Banner 失败:", e)
|
|
|
|
s.close()
|
|
|
|
return f"读取 Banner 失败:{e}"
|
|
|
|
|
|
|
|
# 从第二行开始发送 SMTP 命令
|
|
|
|
for line in lines[1:]:
|
|
|
|
cmd = line.strip()
|
|
|
|
if not cmd:
|
|
|
|
continue
|
|
|
|
try:
|
|
|
|
s.send((cmd + "\r\n").encode())
|
|
|
|
except Exception as e:
|
|
|
|
s.close()
|
|
|
|
return f"发送命令失败:{e}"
|
|
|
|
try:
|
|
|
|
response = s.recv(1024).decode('utf-8', errors='ignore')
|
|
|
|
#print("收到响应:", response.strip())
|
|
|
|
last_response = response.strip()
|
|
|
|
except Exception as e:
|
|
|
|
#print("读取响应失败:", e)
|
|
|
|
s.close()
|
|
|
|
return f"读取响应失败:{e}"
|
|
|
|
s.close()
|
|
|
|
# 以 RCPT 命令的响应作为判断依据
|
|
|
|
# 注意:对于支持 DSN 的服务器,250 可能是合法的响应,
|
|
|
|
# 但在不支持的情况下,若返回 250 则可能说明命令注入导致了不正常的处理。
|
|
|
|
if last_response.startswith("250"):
|
|
|
|
return "RCPT 命令返回 250,可能存在 SMTP 命令注入风险。"
|
|
|
|
else:
|
|
|
|
return "RCPT 命令返回非 250 响应,暂未发现注入风险。"
|
|
|
|
|
|
|
|
def execute_instruction(self, instruction_old):
|
|
|
|
'''
|
|
|
|
执行指令:验证合法性 -> 执行 -> 分析结果
|
|
|
|
:param instruction_old:
|
|
|
|
:return:
|
|
|
|
bool:true-正常返回给大模型,false-结果不返回给大模型
|
|
|
|
str:执行的指令
|
|
|
|
str:执行指令的结果
|
|
|
|
'''
|
|
|
|
ext_params = self.create_extparams()
|
|
|
|
# 第一步:验证指令合法性
|
|
|
|
instruction,timeout = self.validate_instruction(instruction_old)
|
|
|
|
if not instruction:
|
|
|
|
return False,instruction_old,"该指令暂不执行!","",ext_params
|
|
|
|
#过滤修改后的指令是否需要判重?同样指令再执行结果一致?待定---#?
|
|
|
|
|
|
|
|
# 第二步:执行指令
|
|
|
|
lines = instruction.strip().splitlines()
|
|
|
|
# 解析第一行,格式应为:telnet host port
|
|
|
|
parts = lines[0].strip().split()
|
|
|
|
if len(parts) < 3:
|
|
|
|
output = "第一行格式错误,预期格式: 'telnet host port'"
|
|
|
|
ext_params["is_user"] = True
|
|
|
|
return True,instruction,output,output,ext_params
|
|
|
|
host = parts[1]
|
|
|
|
try:
|
|
|
|
port = int(parts[2])
|
|
|
|
except ValueError:
|
|
|
|
output = "端口转换失败"
|
|
|
|
ext_params["is_user"] = True
|
|
|
|
return True, instruction, output, output, ext_params
|
|
|
|
if port == 25:
|
|
|
|
output = self.smtp_injection_test(instruction,host,port)
|
|
|
|
else:#其他默认subprocess执行
|
|
|
|
try:
|
|
|
|
# 指令过滤
|
|
|
|
if "<<<" in instruction:
|
|
|
|
instruction = f"bash -c \"{instruction}\""
|
|
|
|
|
|
|
|
if timeout == 0:
|
|
|
|
result = subprocess.run(instruction, shell=True, capture_output=True, text=True)
|
|
|
|
elif timeout > 0:
|
|
|
|
result = subprocess.run(instruction, shell=True, capture_output=True, text=True, timeout=timeout)
|
|
|
|
else:
|
|
|
|
print("timeout参数错误,需要自查程序逻辑!")
|
|
|
|
stderr = result.stderr
|
|
|
|
stdout = result.stdout
|
|
|
|
except subprocess.TimeoutExpired as e:
|
|
|
|
stdout = e.stdout if e.stdout is not None else ""
|
|
|
|
stderr = e.stderr if e.stderr is not None else ""
|
|
|
|
ext_params.is_user = True # 对于超时的也需要人工进行确认,是否是预期的超时
|
|
|
|
except Exception as e:
|
|
|
|
ext_params.is_user = True
|
|
|
|
return False, instruction, f"执行失败:{str(e)}", "", ext_params # 执行失败,提交给人工确认指令的正确性
|
|
|
|
output = stdout
|
|
|
|
if stderr:
|
|
|
|
output += stderr
|
|
|
|
if isinstance(output, bytes): # 若是bytes则转成str
|
|
|
|
output = output.decode('utf-8', errors='ignore')
|
|
|
|
|
|
|
|
# 第三步:分析执行结果
|
|
|
|
analysis = self.analyze_result(output,instruction,"","")
|
|
|
|
if not analysis: #analysis为“” 不提交LLM
|
|
|
|
ext_params.is_user = True
|
|
|
|
return False,instruction,analysis,output,ext_params
|
|
|
|
return True,instruction, analysis,output,ext_params
|
|
|
|
|
|
|
|
if __name__ == "__main__":
|
|
|
|
pass
|