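"""Scratch-pad harness for the LLM-driven pentest tooling.

Holds a few ad-hoc probes (``Mytest``) and a ``__main__`` switch that exercises
different parts of the pipeline (instruction execution, attack-tree editing,
prompt rewriting). Everything in here talks to lab hosts or pickled task state,
so run it only against infrastructure you are authorised to test.
"""
import os
import re
import struct
import subprocess
import sys
import tempfile
import textwrap

import mysql.connector
import pexpect
import requests

from mycode.DBManager import app_DBM
from mycode.InstructionManager import g_instrM
from mycode.LLMManager import LLMManager
from mycode.PythonTManager import PythonTManager
from mycode.TaskManager import g_TaskM
from mycode.TaskObject import TaskObject
from myutils.ConfigManager import myCongif
from myutils.PickleManager import g_PKM


class Mytest:
    """Collection of one-off checks used while developing the pipeline."""

    def update_node_inter(self, attack_index, moves=None):
        """Move selected user instructions from a node up to its parent.

        The attack tree is loaded from ``g_PKM`` under *attack_index*, walked
        depth-first, and for each ``(node_index, instr_index)`` pair in *moves*
        the instruction at ``instr_index`` of that node's user-instruction list
        is popped and appended to the parent's list. The tree is then written
        back under the same key.

        Args:
            attack_index: key used by ``g_PKM.ReadData`` / ``WriteData``.
            moves: iterable of ``(node_index, instr_index)`` pairs, applied in
                order. Defaults to the three moves this helper was written for.

        NOTE(review): ``g_PKM`` is a PickleManager, so the tree is presumably
        unpickled from disk — only load state written by this tool.
        """
        if moves is None:
            moves = ((6, 0), (39, 1), (49, 0))
        attack_tree = g_PKM.ReadData(attack_index)
        nodes = attack_tree.traverse_dfs()
        for node_idx, instr_idx in moves:
            node = nodes[node_idx]
            instr = node.get_instr_user().pop(instr_idx)
            node.parent.get_instr_user().append(instr)
        g_PKM.WriteData(attack_tree, attack_index)

    def dynamic_fun(self, target="192.168.204.137:5900", timeout=15):
        """Probe a VNC endpoint for missing or weak authentication.

        First runs ``vncviewer`` against *target* with an empty password file.
        If the server answers with an authentication failure, a short list of
        common username/password pairs is tried through ``vncauth``.

        Args:
            target: ``host:port`` (or ``host:display``) string passed to vncviewer.
            timeout: seconds allowed for each spawned process.

        Returns:
            ``(True, detail)`` when access is obtained (no auth, or a weak pair),
            ``(False, detail)`` otherwise — including timeout, unrecognised
            output, or any other error.
        """
        try:
            result = subprocess.run(
                ["vncviewer", "-passwd", "/dev/null", target, "-geometry", "1x1"],
                timeout=timeout,
                capture_output=True,
                text=True,
            )
            if "Authentication failure" in result.stderr:
                credentials = [
                    ("admin", "admin"),
                    ("root", "root"),
                    ("vnc", "vnc"),
                    ("user", "password"),
                ]
                for user, pwd in credentials:
                    # argv list instead of a shell string: the values are
                    # constants today, but this keeps the call injection-safe
                    # if the credential list ever comes from outside.
                    # NOTE(review): ``vncauth`` is whatever helper is on PATH;
                    # if it is missing, FileNotFoundError is reported as
                    # 'Error: ...' below (the old shell=True form returned 127).
                    auth_test = subprocess.run(
                        ["vncauth", user, pwd],
                        capture_output=True,
                        timeout=timeout,
                    )
                    if auth_test.returncode == 0:
                        return (True, f"Valid credentials found: {user}/{pwd}")
                return (False, "No weak credentials found")
            if "Connected" in result.stdout:
                return (True, "VNC access without authentication")
            # The original fell off the end and returned None here; keep the
            # (bool, str) contract so callers can always unpack the result.
            return (False, "Unrecognised vncviewer output")
        except subprocess.TimeoutExpired:
            return (False, "Connection timeout")
        except Exception as e:
            return (False, f"Error: {str(e)}")

    def do_test(self, host="192.168.204.137"):
        """Connect to the lab MySQL instance as root and print character_set_client.

        NOTE(review): root with an empty password and ``ssl_disabled=True`` is a
        lab-only configuration — the session runs in cleartext. Connection and
        cursor are closed even if the query raises.
        """
        cnx = mysql.connector.connect(
            host=host,
            user="root",
            password="",
            ssl_disabled=True,
        )
        try:
            cur = cnx.cursor()
            try:
                cur.execute("SHOW VARIABLES LIKE 'character_set_client'")
                print(cur.fetchall())  # expected: ('character_set_client', 'utf8')
            finally:
                cur.close()
        finally:
            cnx.close()

    def tmp_test(self):
        """Sanity-check the "tail of the message list" slicing used elsewhere.

        Everything after the first ``len - 4`` entries is treated as the tail;
        an odd tail length is reported, then the tail entries are printed.
        """
        list_a = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
        isart = len(list_a) - 4
        # the tail is expected to hold entries in pairs
        if isart % 2 != 0:
            print("c_msg数量不对称,需要检查逻辑!")
        for msg in list_a[isart:]:
            print(msg)


if __name__ == "__main__":
    # Example usage: pick a branch with ``test_type``.
    mytest = Mytest()
    LLM = LLMManager(1)
    PythonM = PythonTManager(myCongif.get_data("Python_max_procs"))
    current_path = os.path.dirname(os.path.realpath(__file__))
    print(current_path)

    test_type = 1
    task_id = 16
    task_Object = TaskObject("test_target", "cookie_info", 1, 1, 1, "local_ip", None)

    if test_type == 0:
        # print the verdict instead of discarding it
        print(mytest.dynamic_fun())
    elif test_type == 1:
        # Execute one instruction through the same path the pipeline uses.
        # Substring names checked (advisory only) before the snippet runs.
        HIGH_RISK_FUNCTIONS = ['eval', 'exec', 'os.system', 'subprocess.call', 'subprocess.Popen']
        # NOTE(review): this sample targets an external address — only run it
        # against systems you are authorised to test.
        str_instr = '''python-code
import ssl
from socket import create_connection

def dynamic_fun():
    try:
        # 强制使用CBC模式弱加密套件
        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        context.set_ciphers('AES128-SHA')
        # 构造异常填充测试数据
        sock = create_connection(('58.216.217.70', 443))
        ssock = context.wrap_socket(sock, server_hostname='58.216.217.70')
        # 发送包含异常填充的测试请求
        ssock.send(b"GET / HTTP/1.1\\r\\nHost: 58.216.217.70\\r\\n"
                   b"Cookie: test=AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA\\r\\n\\r\\n")
        response = ssock.recv(2048)
        # 检测异常响应模式
        if b"HTTP/1.1 200 OK" in response:
            return (True, "服务器接受异常填充数据")
        return (False, "未检测到典型漏洞特征")
    except ssl.SSLError as e:
        return (False, f"加密错误: {repr(e)}")
    except Exception as e:
        return (False, f"验证失败: {str(e)}")
'''
        dedented_code = textwrap.dedent(str_instr.strip())
        # Advisory: the list above was previously unused. Naive substring match.
        for fn_name in HIGH_RISK_FUNCTIONS:
            if fn_name in dedented_code:
                print(f"WARNING: instruction references high-risk function: {fn_name}")
        # Shell instructions chained with && / || get pre-processed first.
        # NOTE(review): only the first matching operator is handled; a command
        # containing both goes through the "&&" path. Both executors run
        # LLM-produced code/commands — they need to be sandboxed; that is
        # not visible from here.
        if "python-code" not in dedented_code:
            if "&&" in dedented_code:
                dedented_code = task_Object.mill_instr_preprocess(dedented_code, "&&")
            elif "||" in dedented_code:
                dedented_code = task_Object.mill_instr_preprocess(dedented_code, "||")
            instr, result, source_result, ext_params = g_instrM.execute_instruction(dedented_code)
        else:
            instr, result, source_result, ext_params = PythonM.execute_instruction(dedented_code)
        # keep only the leading part of the output (smart_truncate bounds length)
        result = task_Object.smart_truncate(result)
        print("----执行结果----")
        print(result)
    elif test_type == 2:
        # Append instructions to a node of a stored task and persist the tree.
        g_TaskM.load_tasks()
        task = g_TaskM.tasks[task_id]
        nodes = task.attack_tree.traverse_dfs()
        cur_node = nodes[78]
        commands = []
        for cmd in commands:
            cur_node.add_instr(cmd)
        cur_node.update_work_status(1)
        g_PKM.WriteData(task.attack_tree, str(task.task_id))
    elif test_type == 3:
        # Split a comma-separated node-name string and de-duplicate it.
        strNodes = "执行系统命令探测,权限提升尝试,横向移动测试"
        # The sample uses the full-width comma; the original split on ", "
        # never matched and produced a single name. Accept either comma form.
        nodes = [name for name in re.split(r"[,,]\s*", strNodes) if name]
        unique_names = list(dict.fromkeys(nodes))  # de-duplicate, keep first-seen order
        for node_name in unique_names:
            print(node_name)
    elif test_type == 4:
        # Rewrite the system message of every node in tree "27" with a fresh prompt.
        # NOTE(review): g_PKM is a PickleManager — only load trees this tool wrote.
        attact_tree = g_PKM.ReadData("27")
        from mycode.AttackMap import TreeNode
        testnode = TreeNode("test", 0)
        LLM.build_initial_prompt(testnode)
        systems = testnode.parent_messages[0]["content"]
        for node in attact_tree.traverse_bfs():
            node.parent_messages[0]["content"] = systems
        g_PKM.WriteData(attact_tree, "27")
        print("完成Messgae更新")
    elif test_type == 5:
        mytest.do_test()
    elif test_type == 6:
        mytest.tmp_test()
    else:
        pass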