''' # SMTP命令注入测试(手动交互) telnet haitutech.cn 25 EHLO attacker.com MAIL FROM: RCPT TO: NOTIFY=SUCCESS,FAILURE ''' import socket import subprocess from tools.ToolBase import ToolBase class TelnetTool(ToolBase): def validate_instruction(self, instruction): #指令过滤 timeout = 60 return instruction,timeout def analyze_result(self, result,instruction,stderr,stdout): #指令结果分析 return result def smtp_injection_test(self,cmd_str: str,host,port): """ 测试 SMTP 命令注入漏洞。 参数: cmd_str: 多行字符串,其中第一行为类似 "telnet haitutech.cn 25" 后续行为具体的 SMTP 命令,例如: EHLO attacker.com MAIL FROM: RCPT TO: NOTIFY=SUCCESS,FAILURE 返回: 如果 RCPT 命令响应以 250 开头,认为可能存在漏洞(返回 True), 否则认为安全或不支持该注入(返回 False)。 """ last_response = "" lines = cmd_str.strip().splitlines() try: # 建立 TCP 连接 s = socket.create_connection((host, port), timeout=10) except Exception as e: #print("连接失败:", e) return f"连接失败:{e}" # 读取 SMTP Banner try: banner = s.recv(1024).decode('utf-8', errors='ignore') except Exception as e: #print("读取 Banner 失败:", e) s.close() return f"读取 Banner 失败:{e}" # 从第二行开始发送 SMTP 命令 for line in lines[1:]: cmd = line.strip() if not cmd: continue try: s.send((cmd + "\r\n").encode()) except Exception as e: s.close() return f"发送命令失败:{e}" try: response = s.recv(1024).decode('utf-8', errors='ignore') #print("收到响应:", response.strip()) last_response = response.strip() except Exception as e: #print("读取响应失败:", e) s.close() return f"读取响应失败:{e}" s.close() # 以 RCPT 命令的响应作为判断依据 # 注意:对于支持 DSN 的服务器,250 可能是合法的响应, # 但在不支持的情况下,若返回 250 则可能说明命令注入导致了不正常的处理。 if last_response.startswith("250"): return "RCPT 命令返回 250,可能存在 SMTP 命令注入风险。" else: return "RCPT 命令返回非 250 响应,暂未发现注入风险。" def execute_instruction(self, instruction_old): ''' 执行指令:验证合法性 -> 执行 -> 分析结果 :param instruction_old: :return: bool:true-正常返回给大模型,false-结果不返回给大模型 str:执行的指令 str:执行指令的结果 ''' ext_params = self.create_extparams() # 第一步:验证指令合法性 instruction,timeout = self.validate_instruction(instruction_old) if not instruction: return False,instruction_old,"该指令暂不执行!","",ext_params #过滤修改后的指令是否需要判重?同样指令再执行结果一致?待定---#? # 第二步:执行指令 lines = instruction.strip().splitlines() # 解析第一行,格式应为:telnet host port parts = lines[0].strip().split() if len(parts) < 3: output = "第一行格式错误,预期格式: 'telnet host port'" ext_params["is_user"] = True return True,instruction,output,output,ext_params host = parts[1] port = 0 try: port = int(parts[2]) except ValueError: output = "端口转换失败" ext_params["is_user"] = True if port == 25: output = self.smtp_injection_test(instruction,host,port) else:#其他默认subprocess执行 try: # 指令过滤 if "<<<" in instruction: instruction = f"bash -c \"{instruction}\"" if timeout == 0: result = subprocess.run(instruction, shell=True, capture_output=True, text=True) elif timeout > 0: result = subprocess.run(instruction, shell=True, capture_output=True, text=True, timeout=timeout) else: print("timeout参数错误,需要自查程序逻辑!") stderr = result.stderr stdout = result.stdout except subprocess.TimeoutExpired as e: stdout = e.stdout if e.stdout is not None else "" stderr = e.stderr if e.stderr is not None else "" ext_params.is_user = True # 对于超时的也需要人工进行确认,是否是预期的超时 except Exception as e: ext_params.is_user = True return False, instruction, f"执行失败:{str(e)}", "", ext_params # 执行失败,提交给人工确认指令的正确性 output = stdout if stderr: output += stderr if isinstance(output, bytes): # 若是bytes则转成str output = output.decode('utf-8', errors='ignore') # 第三步:分析执行结果 analysis = self.analyze_result(output,instruction,"","") if not analysis: #analysis为“” 不提交LLM ext_params.is_user = True return False,instruction,analysis,output,ext_params return True,instruction, analysis,output,ext_params if __name__ == "__main__": pass