import requests
import shlex
import re
import json
from bs4 import BeautifulSoup
from tools.ToolBase import ToolBase

class CurlTool(ToolBase):
    def __init__(self):
        super().__init__()
        self.headers = {}
        self.url = None
        self.verify_ssl = True

    # Parse a curl command line into requests-style parameters
    def parse_curl_to_requests(self, curl_command):
        # Split command preserving quoted strings
        parts = shlex.split(curl_command)
        if parts[0] != 'curl':
            raise ValueError("Command must start with 'curl'")
        # Parse curl flags and arguments
        i = 1
        while i < len(parts):
            arg = parts[i]
            if arg == '-k' or arg == '--insecure':
                self.verify_ssl = False
                i += 1
            elif arg == '-s' or arg == '--silent':
                # Silencing isn't needed for requests, just skip
                i += 1
            elif arg == '-H' or arg == '--header':
                if i + 1 >= len(parts):
                    raise ValueError("Missing header value after -H")
                header_str = parts[i + 1]
                header_name, header_value = header_str.split(':', 1)
                self.headers[header_name.strip()] = header_value.strip()
                i += 2
            elif not arg.startswith('-'):
                if self.url is None:
                    self.url = arg
                i += 1
            else:
                i += 1
        if self.url is None:
            raise ValueError("No URL found in curl command")
        #return url, headers, verify_ssl
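        # Illustrative sketch (not part of the original flow): the fields parsed
        # above map onto a standard requests call roughly like this; the call
        # itself is hypothetical and never issued by this class:
        #   resp = requests.get(self.url, headers=self.headers, verify=self.verify_ssl)
        #   print(resp.status_code, resp.headers.get("Content-Type"))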

    def validate_instruction(self, instruction_old):
        #instruction = instruction_old
        # Filter / normalize the instruction
        timeout = 0
        # Add -i so the response headers are included in the output
        parts = instruction_old.split()
        if 'base64 -d' in instruction_old:
            return instruction_old, timeout
        if '-i' not in parts and '--include' not in parts:
            url_index = next((i for i, p in enumerate(parts) if p.startswith(('http://', 'https://'))), None)
            if url_index is not None:
                # Insert -i right before the URL
                parts.insert(url_index, '-i')
            else:
                # No URL found, append -i at the end
                parts.append('-i')
        return ' '.join(parts), timeout
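        # Example of what this normalization does (illustrative values only):
        #   "curl -kv https://203.0.113.10/"  ->  ("curl -kv -i https://203.0.113.10/", 0)
        #   a command containing "base64 -d" is returned unchanged (plus the timeout).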
    # def execute_instruction(self, instruction_old):
    #     '''
    #     Execute an instruction: validate it -> run it -> analyze the result
    #     :param instruction_old:
    #     :return:
    #         bool: True - result is returned to the LLM, False - result is not returned to the LLM
    #         str: the instruction that was executed
    #         str: the result of executing the instruction
    #     '''
    #
    #     # Step 1: validate the instruction
    #     instruction = self.validate_instruction(instruction_old)
    #     if not instruction:
    #         return False, instruction_old, "This instruction will not be executed!"
    #
    #     # Step 2: execute the instruction --- via requests
    #     # print(f"Executing instruction: {instruction}")
    #     output = ""
    #
    #     # Step 3: analyze the execution result
    #     analysis = self.analyze_result(output, instruction)
    #     # Store the instruction and result in the database
    #     # ?
    #     if not analysis:  # analysis is "" -> do not submit to the LLM
    #         return False, instruction, analysis
    #     return True, instruction, analysis

    def get_ssl_info(self, stderr, stdout):
        # --------------------------
        # Security relevance of the extracted information:
        #
        # - If the certificate Common Name does not match the requested IP
        #   (e.g. 'crnn.f3322.net' here), the service may be hiding its real
        #   identity or be misconfigured; this is useful as reconnaissance
        #   for later stages of an attack.
        #
        # - TLS connection details (e.g. TLS1.3 and the cipher suite) help judge
        #   whether weak ciphers or legacy protocol versions are in use.
        #
        # - The HTTP status and Content-Type confirm that a legitimate web service
        #   is responding, and the HTML title hints that a SoftEther VPN Server is
        #   actually running, which may have default settings or known vulnerabilities.
        #
        # These details can support further probing, vulnerability verification and
        # penetration testing.
        # --------------------------
        # Extract certificate and TLS information from stderr
        # Extract the Common Name (CN)
        cn_match = re.search(r"common name:\s*([^\s]+)", stderr, re.IGNORECASE)
        cert_cn = cn_match.group(1) if cn_match else "N/A"
        # Extract TLS connection info (e.g. TLS1.3 and the cipher suite)
        tls_match = re.search(r"SSL connection using\s+([^\n]+)", stderr, re.IGNORECASE)
        tls_info = tls_match.group(1).strip() if tls_match else "N/A"
        # Extract the Issuer
        issuer_match = re.search(r"issuer:\s*(.+)", stderr, re.IGNORECASE)
        issuer_info = issuer_match.group(1).strip() if issuer_match else "N/A"
        # Extract the HTTP response headers and HTML title from stdout
        # Split headers and body (assuming they are separated by a blank line)
        parts = stdout.split("\n\n", 1)
        headers_part = parts[0]
        body_part = parts[1] if len(parts) > 1 else ""
        # Extract the status line and a few common headers from the header block
        lines = headers_part.splitlines()
        http_status = lines[0] if lines else "N/A"
        content_type_match = re.search(r"Content-Type:\s*(.*)", headers_part, re.IGNORECASE)
        content_type = content_type_match.group(1).strip() if content_type_match else "N/A"
        # Use BeautifulSoup to extract the HTML <title>
        soup = BeautifulSoup(body_part, "html.parser")
        html_title = soup.title.string.strip() if soup.title and soup.title.string else "N/A"
        # --------------------------
        # Assemble the extracted information
        # print("=== Extracted information ===")
        # print("HTTP status line:", http_status)
        # print("Content-Type:", content_type)
        # print("HTML Title:", html_title)
        # print("TLS connection info:", tls_info)
        # print("Certificate Common Name:", cert_cn)
        # print("Certificate Issuer:", issuer_info)
        result = (f"HTTP status line: {http_status}, Content-Type: {content_type}, "
                  f"HTML Title: {html_title}, TLS connection info: {tls_info}, "
                  f"Certificate Common Name: {cert_cn}, Certificate Issuer: {issuer_info}")
        return result
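        # Illustrative sketch of the kind of `curl -v` stderr lines the regexes
        # above are aimed at (exact wording varies by curl version; these sample
        # values are made up for illustration):
        #   * SSL connection using TLSv1.3 / TLS_AES_256_GCM_SHA384
        #   * common name: crnn.f3322.net
        #   * issuer: CN=crnn.f3322.net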

    def get_info_xpost(self, stdout, stderr):
        """
        Extract the key information from the result of running curl via subprocess.run:
        - HTTP status code
        - common response headers (Content-Type, Content-Length)
        - HTML page title (if the content is HTML)
        - the first 200 characters of the body (body_snippet)
        - TLS/certificate details (extracted from the verbose output on stderr)
        For anything that cannot be matched, "Not found" or an empty string is returned.
        """
        info = {}
        # Process stdout: split the response headers from the body (assumed to be separated by a blank line)
        parts = re.split(r'\r?\n\r?\n', stdout, maxsplit=1)
        # *************** Parsing approach 1
        # headers_str = parts[0] if parts else ""
        # body = parts[1] if len(parts) > 1 else ""
        #
        # # Extract the HTTP status code from the first header line, e.g. "HTTP/1.1 202 OK"
        # header_lines = headers_str.splitlines()
        # *************** Parsing approach 2
        if len(parts) == 2:
            headers_str, body = parts
        else:
            # If the split failed, stdout probably contains only the body;
            # try to get the HTTP status line from stderr instead
            headers_str = ""
            body = stdout
        # If no headers were found in stdout, fall back to stderr (some of the information may be there)
        if not headers_str:
            header_lines = stderr.splitlines()
        else:
            header_lines = headers_str.splitlines()
        # **************************
        if header_lines:
            status_line = header_lines[0]
            status_match = re.search(r'HTTP/\d+\.\d+\s+(\d+)', status_line)
            info['status_code'] = status_match.group(1) if status_match else "Unknown"
        else:
            info['status_code'] = "No headers found"
        # Extract common response headers
        for line in header_lines:
            if line.lower().startswith("content-type:"):
                info['content_type'] = line.split(":", 1)[1].strip()
            elif line.lower().startswith("content-length:"):
                info['content_length'] = line.split(":", 1)[1].strip()
        # Fall back to defaults if nothing matched
        info.setdefault('content_type', "Not found")
        info.setdefault('content_length', "Not found")
        # If the content is HTML, extract the <title> tag with BeautifulSoup
        if "html" in info['content_type'].lower():
            try:
                soup = BeautifulSoup(body, "html.parser")
                if soup.title and soup.title.string:
                    info['html_title'] = soup.title.string.strip()
                else:
                    info['html_title'] = "Not found"
            except Exception as e:
                info['html_title'] = f"Error: {e}"
        else:
            info['html_title'] = "N/A"
        # Keep part of the body for later analysis
        info['body_snippet'] = body[:200]  # first 200 characters
        # Process the TLS/certificate information in stderr: keep only lines containing the keywords
        tls_info_lines = []
        cert_info_lines = []
        for line in stderr.splitlines():
            # Filter for lines related to the TLS/SSL handshake and certificates
            if "SSL connection using" in line or "TLS" in line:
                tls_info_lines.append(line.strip())
            if "certificate" in line.lower():
                cert_info_lines.append(line.strip())
        info['tls_info'] = tls_info_lines if tls_info_lines else "Not found"
        info['certificate_info'] = cert_info_lines if cert_info_lines else "Not found"
        # Optional: keep the full verbose output for later analysis
        # info['verbose'] = stderr
        # Serialize to a string
        result = json.dumps(info)
        return result
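        # Illustrative shape of the JSON string this returns (all values made up):
        #   {"status_code": "200", "content_type": "text/html", "content_length": "Not found",
        #    "html_title": "Example", "body_snippet": "<html>...",
        #    "tls_info": ["SSL connection using TLSv1.3 / ..."], "certificate_info": "Not found"}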

    def analyze_result(self, result, instruction, stderr, stdout):
        # Analyze the result of the instruction
        if ("-H " in instruction and "Host:" in instruction):  # cross-site hijacking based on the certificate domain
            if "HTTP/1.1 200" in result:  # and "<urlset" in result:
                # result = "Issue found: sitemap content retrieved via the Host header."
                result = "Returned 200, issue present"  # the response content needs further analysis
            else:
                result = "No issue detected"
        elif ("-H " in instruction and "Range:" in instruction):
            if "HTTP/1.1 200 OK" in result:
                result = "The 200 OK page was returned normally"
            elif "HTTP/1.1 416" in result:
                if "Content-Range: bytes" in result:
                    result = "416 returned with a normal Content-Range: the server handles Range requests correctly."
                elif "Content-Range:" in result:
                    result = "416 returned but the Content-Range header is missing or abnormal"
            else:  # another status code was returned: needs further analysis
                result = "The server handles Range requests correctly"
        elif ("-H " in instruction and "Referer:" in instruction):
            if "HTTP/1.1 200" in result:
                result = "This vulnerability cannot be exploited"
            else:  # keep the original result
                pass
        elif ("resource=config.php" in instruction):
            if "base64: 无效的输入" in result:  # localized "base64: invalid input" error message
                result = "This vulnerability cannot be exploited"
        elif ("Date:" in instruction):  # keep the original result
            print("")
        elif ("-kv https://" in instruction or "-vk https://" in instruction):
            result = self.get_ssl_info(stderr, stdout)
        elif ("-X POST " in instruction):
            result = self.get_info_xpost(stdout, stderr)
        elif ("-v " in instruction):  # e.g. curl -v http://192.168.204.137:8180/manager/html --user admin:admin, generic parsing of curl output
            result = self.get_info_xpost(stdout, stderr)
        else:  # result of an unhandled command, not submitted to the LLM for now
            result = ""
        return result


if __name__ == "__main__":
    import subprocess

    CT = CurlTool()
    strinstruction = "curl -kv -X POST -d \"username=admin&password=admin\" https://58.216.217.70/vpn/index.html --connect-timeout 10"
    instruction, time_out = CT.validate_instruction(strinstruction)
    if instruction:
        result = subprocess.run(instruction, shell=True, capture_output=True, text=True)
        res = CT.analyze_result(result.stdout, instruction, result.stderr, result.stdout)
        print(res)