import requests import shlex import re import json from bs4 import BeautifulSoup from tools.ToolBase import ToolBase class CurlTool(ToolBase): # def __init__(self): # super.__init__() # self.headers = {} # self.url = None # self.verify_ssl = True def get_time_out(self): return 60*5 def validate_instruction(self, instruction_old): #instruction = instruction_old #指令过滤 timeout = 0 #curl指令遇到不返回的情况 curl --path-as-is -i http://192.168.204.137:8180/webapps/../conf/tomcat-users.xml #添加-i 返回信息头 if 'base64 -d' in instruction_old: return instruction_old # 如果指令中含有管道,将第一部分(即 curl 命令)单独处理,再拼接回去 parts = instruction_old.split('|') #grep情况处理 first_cmd = parts[0].strip() # 第一条命令 # 分割成单词列表便于处理参数 curl_parts = first_cmd.split() # 如果第一条命令是 curl,则进行处理 if curl_parts and curl_parts[0] == "curl": #只是处理了第一个curl # 判断是否需要添加包含响应头的选项:若没有 -I、-i 或 --include,则在 URL前插入 -i if "-I" not in curl_parts and '-i' not in curl_parts and '--include' not in curl_parts: # 查找以 http:// 或 https:// 开头的 URL 参数所在位置 url_index = next((i for i, p in enumerate(curl_parts) if p.startswith('http://') or p.startswith('https://')), None) if url_index is not None: curl_parts.insert(url_index, '-i') else: curl_parts.append('-i') # 判断是否已经有 --max-time 参数 if not any(p.startswith("--max-time") for p in curl_parts): curl_parts.append("--max-time") curl_parts.append(str(self.get_time_out())) #添加超时时间 # 将第一部分命令重组 parts[0] = ' '.join(curl_parts) # 将所有部分重新用 | 拼接起来(保留管道符号两边的空格) final_instruction = ' | '.join(part.strip() for part in parts) return final_instruction, timeout def get_ssl_info(self,stderr,stdout): # -------------------------- # 解释信息的安全意义: # # - 如果证书的 Common Name 与请求的 IP 不匹配(如这里的 'crnn.f3322.net'), # 则可能表明服务隐藏了真实身份或存在配置错误,这在后续攻击中可以作为信息收集的一部分。 # # - TLS 连接信息(如 TLS1.3 和加密套件)有助于判断是否存在弱加密或旧版协议问题。 # # - HTTP 状态和 Content-Type 帮助确认返回的是一个合法的 Web 服务, # 而 HTML Title 暗示了实际运行的是 SoftEther VPN Server,可能存在默认配置或已知漏洞。 # # 这些信息可以作为进一步探测、漏洞验证和渗透测试的依据。 # -------------------------- # 从 stderr 中提取证书及 TLS 信息 # 提取 Common Name(CN) cn_match = re.search(r"common name:\s*([^\s]+)", stderr, re.IGNORECASE) cert_cn = cn_match.group(1) if cn_match else "N/A" # 提取 TLS 连接信息(例如 TLS1.3 及加密套件) tls_match = re.search(r"SSL connection using\s+([^\n]+)", stderr, re.IGNORECASE) tls_info = tls_match.group(1).strip() if tls_match else "N/A" # 提取 Issuer 信息 issuer_match = re.search(r"issuer:\s*(.+)", stderr, re.IGNORECASE) issuer_info = issuer_match.group(1).strip() if issuer_match else "N/A" # 从 stdout 中提取 HTTP 响应头和 HTML 标题 # 分离 HTTP 头部和 body(假设头部与 body 用两个换行符分隔) parts = stdout.split("\n\n", 1) headers_part = parts[0] body_part = parts[1] if len(parts) > 1 else "" # 从头部中提取状态行和部分常见头部信息 lines = headers_part.splitlines() http_status = lines[0] if lines else "N/A" content_type_match = re.search(r"Content-Type:\s*(.*)", headers_part, re.IGNORECASE) content_type = content_type_match.group(1).strip() if content_type_match else "N/A" # 使用 BeautifulSoup 提取 HTML soup = BeautifulSoup(body_part, "html.parser") html_title = soup.title.string.strip() if soup.title and soup.title.string else "N/A" # -------------------------- # 输出提取的信息 # print("=== 提取的有用信息 ===") # print("HTTP 状态行:", http_status) # print("Content-Type:", content_type) # print("HTML Title:", html_title) # print("TLS 连接信息:", tls_info) # print("证书 Common Name:", cert_cn) # print("证书 Issuer:", issuer_info) result = f"HTTP 状态行:{http_status},Content-Type:{content_type},HTML Title:{html_title},TLS 连接信息:{tls_info},证书 Common Name:{cert_cn},证书 Issuer:{issuer_info}" return result def get_info_curl(self,instruction,stdout,stderr): info = {} # 处理 stdout: 拆分响应头与正文(假设用空行分隔) parts = re.split(r'\r?\n\r?\n', stdout, maxsplit=1) if len(parts) == 2: headers_str, body = parts else: # 如果没有拆分成功,可能 stdout 中只有正文,则从 stderr 尝试提取 HTTP 状态行 headers_str = "" body = stdout # 如果没有在 stdout 中找到头信息,则尝试从 stderr 中提取(部分信息可能在 stderr 中) if not headers_str: header_lines = stderr.splitlines() else: header_lines = headers_str.splitlines() #status_code if header_lines: status_line = header_lines[0] status_match = re.search(r'HTTP/\d+\.\d+\s+(\d+)', status_line) info['status_code'] = status_match.group(1) if status_match else "Unknown" else: info['status_code'] = "No headers found" #Server m = re.search(r'^Server:\s*(.+)$', headers_str, re.MULTILINE) if m: info["server"] = m.group(1).strip() #content-type,content-length content_type = "Not found" content_length = "Not found" for line in header_lines: if line.lower().startswith("content-type:"): info['content-type'] = line.split(":", 1)[1].strip() elif line.lower().startswith("content-length:"): info['content-length'] = line.split(":", 1)[1].strip() info.setdefault('content-type', "Not found") info.setdefault('content-length', "Not found") # 如果内容为 HTML,则使用 BeautifulSoup 提取 <title> 标签内容 if "html" in info['content-type'].lower(): try: soup = BeautifulSoup(body, "html.parser") if soup.title and soup.title.string: info['html_title'] = soup.title.string.strip() else: info['html_title'] = "Not found" except Exception as e: info['html_title'] = f"Error: {e}" else: info['html_title'] = "N/A" #------------正文部分解析------------ if "phpinfo.php" in instruction: info["configurations"] = {} info["sensitive_info"] = {} # 提取PHP版本信息,可以尝试从phpinfo表格中提取 m = re.search(r'PHP Version\s*</th>\s*<td[^>]*>\s*([\d.]+)\s*</td>', body, re.IGNORECASE) if m: info["php_version"] = m.group(1).strip() else: # 备用方案:在页面中查找 "PHP Version" 后面的数字 m = re.search(r'PHP\s*Version\s*([\d.]+)', body, re.IGNORECASE) if m: info["php_version"] = m.group(1).strip() # 提取配置信息(如allow_url_include, display_errors, file_uploads, open_basedir) configs = ["allow_url_include", "display_errors", "file_uploads", "open_basedir"] for key in configs: # 尝试匹配HTML表格形式:<td>key</td><td>value</td> regex = re.compile(r'<td[^>]*>\s*' + re.escape(key) + r'\s*</td>\s*<td[^>]*>\s*([^<]+?)\s*</td>', re.IGNORECASE) m = regex.search(body) if m: info["configurations"][key] = m.group(1).strip() # 提取敏感信息,这里以MYSQL_PASSWORD为例 sensitive_keys = ["MYSQL_PASSWORD"] for key in sensitive_keys: regex = re.compile(r'<td[^>]*>\s*' + re.escape(key) + r'\s*</td>\s*<td[^>]*>\s*([^<]+?)\s*</td>', re.IGNORECASE) m = regex.search(body) if m: info["sensitive_info"][key] = m.group(1).strip() elif "phpMyAdmin" in instruction: info["security_info"] = {} info["login_info"] = {} # 查找登录表单中用户名、密码字段(例如 name="pma_username" 和 name="pma_password") m = re.search(r'<input[^>]+name=["\'](pma_username)["\']', body, re.IGNORECASE) if m: info["login_info"]["username_field"] = m.group(1).strip() m = re.search(r'<input[^>]+name=["\'](pma_password)["\']', body, re.IGNORECASE) if m: info["login_info"]["password_field"] = m.group(1).strip() #安全信息 # csrf_protection:尝试查找隐藏域中是否存在 csrf token(例如 name="csrf_token" 或 "token") m = re.search(r'<input[^>]+name=["\'](csrf_token|token)["\']', stdout, re.IGNORECASE) info["security_info"]["csrf_protection"] = True if m else False # httponly_cookie:从响应头中查找 Set-Cookie 行中是否包含 HttpOnly m = re.search(r'Set-Cookie:.*HttpOnly', stdout, re.IGNORECASE) info["security_info"]["httponly_cookie"] = True if m else False # secure_cookie:从响应头中查找 Set-Cookie 行中是否包含 Secure m = re.search(r'Set-Cookie:.*Secure', stdout, re.IGNORECASE) info["security_info"]["secure_cookie"] = True if m else False else: # #info['body_snippet'] = body[:200] # 前500字符 if stderr: # 处理 stderr 中的 TLS/证书信息:只提取包含关键字的行 tls_info_lines = [] cert_info_lines = [] for line in stderr.splitlines(): # 过滤与 TLS/SSL 握手、证书相关的信息 if "SSL connection using" in line or "TLS" in line: tls_info_lines.append(line.strip()) if "certificate" in line.lower(): cert_info_lines.append(line.strip()) info['tls_info'] = tls_info_lines if tls_info_lines else "Not found" info['certificate_info'] = cert_info_lines if cert_info_lines else "Not found" #转换成字符串 result = json.dumps(info,ensure_ascii=False) result = result+"\n结果已经被格式化提取,若需要查看其他内容,可以添加grep参数后返回指令。" #print(result) return result def analyze_result(self, result,instruction,stderr,stdout): #指令结果分析 if("-H "in instruction and "Host:" in instruction):# 基于证书域名进行跨站劫持 if "HTTP/1.1 200" in result: # and "<urlset" in result: #result = "存在问题:成功通过 Host 头获取 sitemap 内容。" result = "返回200,存在问题" #要进一步分析返回内容 else: result = "未检测到问题" elif("-H "in instruction and "Range:" in instruction): if "HTTP/1.1 200 OK" in result: result = "正常返回了200 OK页面" elif "HTTP/1.1 416" in result: if "Content-Range: bytes" in result: result = "返回 416 且 Content-Range 正常:服务器对 Range 请求处理正确。" elif "Content-Range:" in result: result = "返回 416 但缺少或异常 Content-Range 头" else:#"返回其他状态码(", response.status_code, "):需要进一步分析。" result = "服务器对Range请求处理正确" elif("-H "in instruction and "Referer:" in instruction): if "HTTP/1.1 200" in result: result="该漏洞无法利用" else: #保留原结果 pass elif("resource=config.php" in instruction): if "base64: 无效的输入" in result: result="该漏洞无法利用" elif("-kv https://" in instruction or "-vk https://" in instruction): result = self.get_ssl_info(stderr,stdout) elif("grep " in instruction or " -T " in instruction or "Date:" in instruction): return result # elif("-X POST " in instruction): # result = self.get_info_curl(instruction,stdout,stderr) # elif("-v " in instruction): #curl -v http://192.168.204.137:8180/manager/html --user admin:admin 常规解析curl返回内容 # result = self.get_info_curl(instruction,stdout,stderr) else: result = self.get_info_curl(instruction,stdout,stderr) return result if __name__ =="__main__": import subprocess CT = CurlTool() strinstruction = "curl -kv -X POST -d \"username=admin&password=admin\" https://58.216.217.70/vpn/index.html --connect-timeout 10" instruction,time_out = CT.validate_instruction(strinstruction) if instruction: result = subprocess.run(instruction, shell=True, capture_output=True, text=True) res = CT.analyze_result(result.stdout,instruction,result.stderr,result.stdout) print(res)