#mysql #pip install mysql-connector-python import subprocess import mysql.connector from mysql.connector import Error from tools.ToolBase import ToolBase class MysqlTool(ToolBase): def test_empty_password_mysql_connection(self,host, username='root'): """ 测试使用空密码连接到指定 MySQL 服务器。 参数: host (str): MySQL 服务器的主机地址,例如 'haitutech.cn' username (str): MySQL 用户名,默认值为 'root' """ try: # 尝试使用空密码连接 MySQL connection = mysql.connector.connect( host=host, # 主机地址 user=username, # 用户名 password='', # 空密码 connection_timeout=10 # 设置10秒连接超时 ) if connection.is_connected(): res = f"成功连接到 {host},用户 {username} 使用空密码" connection.close() # 关闭连接以释放资源 except Error as e: # 捕获并打印连接错误 res = f"连接失败: {host} - {e}" return res def validate_instruction(self, instruction): timeout = 30 #modified_code = "mysql空密码登录测试" instr = instruction.replace("--ssl-mode=DISABLED","--ssl=0") #mariaDB 没有ssl-mode参数 # if "--ssl=0" not in instr: # instr = instr + " --ssl=0" return instr,timeout #对于非sh命令调用的工具,自己实现命令执行的内容 --#2025-3-24暂时不使用 def execute_instruction_old(self, instruction_old): ext_params = self.create_extparams() # 第一步:验证指令合法性 instruction,timeout = self.validate_instruction(instruction_old) if not instruction: return False, instruction_old, "该指令暂不执行!","",ext_params # 过滤修改后的指令是否需要判重?同样指令再执行结果一致?待定---#? # 第二步:执行指令 # target = "" # parts = instruction_old.split() # for i, part in enumerate(parts): # if part == "-h" and i + 1 < len(parts): # target = parts[i + 1] # output = self.test_empty_password_mysql_connection(target)#弱密码攻击如何处理? output = "" stdout = "" stderr = "" try: if timeout == 0: result = subprocess.run(instruction, shell=True, capture_output=True, text=True) elif timeout > 0: result = subprocess.run(instruction, shell=True, capture_output=True, text=True, timeout=timeout) else: print("timeout参数错误,需要自查程序逻辑!") stderr = result.stderr stdout = result.stdout except subprocess.TimeoutExpired as e: stdout = e.stdout if e.stdout is not None else "" stderr = e.stderr if e.stderr is not None else "" ext_params.is_user = True # 对于超时的也需要人工进行确认,是否是预期的超时 except Exception as e: ext_params.is_user = True return False, instruction, f"执行失败:{str(e)}", "", ext_params # 执行失败,提交给人工确认指令的正确性 # 第三步:分析执行结果 output = stdout if stderr: output += stderr if isinstance(output, bytes): # 若是bytes则转成str output = output.decode('utf-8', errors='ignore') analysis = self.analyze_result(output, instruction, stderr, stdout) if not analysis: # analysis为“” 不提交LLM ext_params.is_user = True return False, instruction, analysis, output, ext_params return True, instruction, analysis, output, ext_params def analyze_result(self, result,instruction,stderr,stdout): # return result