You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
93 lines
3.9 KiB
93 lines
3.9 KiB
#mysql
|
|
#pip install mysql-connector-python
|
|
import subprocess
|
|
import mysql.connector
|
|
from mysql.connector import Error
|
|
from tools.ToolBase import ToolBase
|
|
|
|
class MysqlTool(ToolBase):
|
|
|
|
def test_empty_password_mysql_connection(self,host, username='root'):
|
|
"""
|
|
测试使用空密码连接到指定 MySQL 服务器。
|
|
|
|
参数:
|
|
host (str): MySQL 服务器的主机地址,例如 'haitutech.cn'
|
|
username (str): MySQL 用户名,默认值为 'root'
|
|
"""
|
|
try:
|
|
# 尝试使用空密码连接 MySQL
|
|
connection = mysql.connector.connect(
|
|
host=host, # 主机地址
|
|
user=username, # 用户名
|
|
password='', # 空密码
|
|
connection_timeout=10 # 设置10秒连接超时
|
|
)
|
|
if connection.is_connected():
|
|
res = f"成功连接到 {host},用户 {username} 使用空密码"
|
|
connection.close() # 关闭连接以释放资源
|
|
except Error as e:
|
|
# 捕获并打印连接错误
|
|
res = f"连接失败: {host} - {e}"
|
|
return res
|
|
|
|
def validate_instruction(self, instruction):
|
|
timeout = 30
|
|
#modified_code = "mysql空密码登录测试"
|
|
instr = instruction.replace("--ssl-mode=DISABLED","--ssl=0") #mariaDB 没有ssl-mode参数
|
|
# if "--ssl=0" not in instr:
|
|
# instr = instr + " --ssl=0"
|
|
return instr,timeout
|
|
|
|
#对于非sh命令调用的工具,自己实现命令执行的内容 --#2025-3-24暂时不使用
|
|
def execute_instruction_old(self, instruction_old):
|
|
ext_params = self.create_extparams()
|
|
# 第一步:验证指令合法性
|
|
instruction,timeout = self.validate_instruction(instruction_old)
|
|
if not instruction:
|
|
return False, instruction_old, "该指令暂不执行!","",ext_params
|
|
# 过滤修改后的指令是否需要判重?同样指令再执行结果一致?待定---#?
|
|
|
|
# 第二步:执行指令
|
|
# target = ""
|
|
# parts = instruction_old.split()
|
|
# for i, part in enumerate(parts):
|
|
# if part == "-h" and i + 1 < len(parts):
|
|
# target = parts[i + 1]
|
|
# output = self.test_empty_password_mysql_connection(target)#弱密码攻击如何处理?
|
|
|
|
output = ""
|
|
stdout = ""
|
|
stderr = ""
|
|
try:
|
|
if timeout == 0:
|
|
result = subprocess.run(instruction, shell=True, capture_output=True, text=True)
|
|
elif timeout > 0:
|
|
result = subprocess.run(instruction, shell=True, capture_output=True, text=True, timeout=timeout)
|
|
else:
|
|
print("timeout参数错误,需要自查程序逻辑!")
|
|
stderr = result.stderr
|
|
stdout = result.stdout
|
|
except subprocess.TimeoutExpired as e:
|
|
stdout = e.stdout if e.stdout is not None else ""
|
|
stderr = e.stderr if e.stderr is not None else ""
|
|
ext_params.is_user = True # 对于超时的也需要人工进行确认,是否是预期的超时
|
|
except Exception as e:
|
|
ext_params.is_user = True
|
|
return False, instruction, f"执行失败:{str(e)}", "", ext_params # 执行失败,提交给人工确认指令的正确性
|
|
|
|
# 第三步:分析执行结果
|
|
output = stdout
|
|
if stderr:
|
|
output += stderr
|
|
if isinstance(output, bytes): # 若是bytes则转成str
|
|
output = output.decode('utf-8', errors='ignore')
|
|
analysis = self.analyze_result(output, instruction, stderr, stdout)
|
|
if not analysis: # analysis为“” 不提交LLM
|
|
ext_params.is_user = True
|
|
return False, instruction, analysis, output, ext_params
|
|
return True, instruction, analysis, output, ext_params
|
|
|
|
def analyze_result(self, result,instruction,stderr,stdout):
|
|
#
|
|
return result
|