You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
74 lines
3.2 KiB
74 lines
3.2 KiB
import pexpect
|
|
import shlex
|
|
from tools.ToolBase import ToolBase
|
|
|
|
class PsqlTool(ToolBase):
|
|
def validate_instruction(self, instruction):
|
|
#指令过滤
|
|
timeout = 60
|
|
return instruction,timeout
|
|
|
|
def analyze_result(self, result,instruction,stderr,stdout):
|
|
#指令结果分析
|
|
return result
|
|
|
|
def do_worker_pexpect(self,str_instruction,timeout,ext_params):
|
|
try:
|
|
safe_command = shlex.quote(str_instruction)
|
|
cmd = f"bash -c {safe_command}"
|
|
exc_do = pexpect.spawn(cmd,timeout=timeout,encoding='utf-8')
|
|
index = exc_do.expect([
|
|
pexpect.TIMEOUT,
|
|
pexpect.EOF,
|
|
'Password for user postgres:',
|
|
'用户 postgres 的口令:'
|
|
])
|
|
#strout = exc_do.before.decode('utf-8', errors='replace')
|
|
strout = exc_do.before
|
|
if index == 0 or index ==1:
|
|
return strout
|
|
elif index ==2 or index==3:
|
|
strout += '用户 postgres 的口令:'
|
|
exc_do.sendline('') #输入空密码后不知道会有多少种情况,密码不对,密码对
|
|
index = exc_do.expect([pexpect.TIMEOUT,pexpect.EOF])
|
|
print(index)
|
|
strout += exc_do.before
|
|
return strout
|
|
except Exception as e:
|
|
return f"执行错误: {str(e)}"
|
|
|
|
def execute_instruction(self, instruction_old):
|
|
'''
|
|
执行指令:验证合法性 -> 执行 -> 分析结果
|
|
*****如果指令要做验证,只做白名单,所有逻辑不是全放开就是白名单*****
|
|
:param instruction_old:
|
|
:return:
|
|
bool:true-正常返回给大模型处理下一步,false-结果不返回给大模型,2--需要人工确认的指令
|
|
str:执行的指令
|
|
str:执行指令的结果-解析过滤后的结果--也是提交给LLM的结果
|
|
str:执行指令的结果-原结果
|
|
object:补充参数-封装一个对象: 0-不知是否攻击成功,1-明确存在漏洞,2-明确不存在漏洞
|
|
'''
|
|
|
|
ext_params = self.create_extparams()
|
|
# 第一步:验证指令合法性
|
|
instruction,timeout = self.validate_instruction(instruction_old)
|
|
if not instruction:
|
|
ext_params.is_user= True
|
|
return False,instruction_old,"该指令暂不执行!由用户确认是否要兼容支持","",ext_params #未
|
|
#过滤修改后的指令是否需要判重?同样指令再执行结果一致?待定---#?
|
|
|
|
# 第二步:执行指令
|
|
#output = self.do_worker_script(instruction, timeout, ext_params)
|
|
output = self.do_worker_pexpect(instruction,timeout,ext_params)
|
|
|
|
# 第三步:分析执行结果
|
|
if isinstance(output,bytes):#若是bytes则转成str
|
|
output = output.decode('utf-8', errors='ignore')
|
|
|
|
analysis = self.analyze_result(output,instruction,"","")
|
|
|
|
if not analysis: #analysis为“” 不提交LLM
|
|
ext_params.is_user = True
|
|
return False,instruction,analysis,output,ext_params
|
|
return True,instruction, analysis,output,ext_params
|
|
|