From 6f5d873ee8e6e42af326878044dd772588c217a1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E9=BE=99?= Date: Wed, 18 Sep 2024 20:23:27 +0800 Subject: [PATCH] =?UTF-8?q?=E6=95=B4=E5=90=88=E4=BA=86=E6=A8=A1=E5=9E=8B?= =?UTF-8?q?=E7=8B=AC=E7=AB=8B=E8=BF=9B=E7=A8=8B=EF=BC=8C=E9=80=9A=E8=BF=87?= =?UTF-8?q?=E9=85=8D=E7=BD=AE=E6=96=87=E4=BB=B6=E4=B8=AD=E7=9A=84workType?= =?UTF-8?q?=E8=BF=9B=E8=A1=8C=E6=8E=A7=E5=88=B6=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- config.yaml | 7 + core/CapManager.py | 5 +- core/ChannelData.py | 629 ++++++++++++++++++ core/ChannelManager.py | 484 ++------------ core/DataStruct.py | 19 + core/ModelManager.py | 46 +- core/ModelNode.py | 133 ++++ model/base_model/ascnedcl/classes.py | 9 + model/base_model/ascnedcl/det_utils.py | 1 + model/base_model/ascnedcl/det_utils_v10.py | 107 +++ model/plugins/ModelBase.py | 8 + model/plugins/RYRQ_ACL/RYRQ_Model_ACL.py | 76 ++- .../plugins/RYRQ_Model_ACL/RYRQ_Model_ACL.py | 103 ++- myutils/MyDeque.py | 15 +- myutils/myutil.py | 0 web/API/viedo.py | 76 ++- 16 files changed, 1218 insertions(+), 500 deletions(-) create mode 100644 core/ChannelData.py create mode 100644 core/ModelNode.py create mode 100644 model/base_model/ascnedcl/classes.py create mode 100644 model/base_model/ascnedcl/det_utils_v10.py create mode 100644 myutils/myutil.py diff --git a/config.yaml b/config.yaml index f2e3fe1..d293428 100644 --- a/config.yaml +++ b/config.yaml @@ -30,6 +30,9 @@ RTSP_Check_Time : 600 #10分钟 -- 2024-7-8 取消使用 #max_channel_num max_channel_num : 8 #最大视频通道数量 +encode_param : 50 #无参数默认是95 +mywidth: 640 +myheight: 480 #model model_platform : acl #acl gpu cpu @@ -49,3 +52,7 @@ reconnect_attempts: 5 #cap 读取帧失败几次后进行重连 #system --- 指定网卡 wired_interface : eth0 wireless_interface : WLAN + +#独立模型线程相关 +workType : 1 # 1--一通道一线程。2--模型独立线程 + diff --git a/core/CapManager.py b/core/CapManager.py index 3b7b869..a9f7c07 100644 --- a/core/CapManager.py +++ b/core/CapManager.py @@ -42,7 +42,7 @@ class VideoCaptureWithFPS: self.running = True #self.frame_queue = queue.Queue(maxsize=1) - self.frame_queue = MyDeque(10) + self.frame_queue = MyDeque(5) #self.frame = None #self.read_lock = threading.Lock() self.thread = threading.Thread(target=self.update) @@ -63,6 +63,7 @@ class VideoCaptureWithFPS: def update(self): sleep_time = myCongif.get_data("cap_sleep_time") reconnect_attempts = myCongif.get_data("reconnect_attempts") + while self.running: try: self.openViedo_opencv(self.source) @@ -141,6 +142,8 @@ class VideoCaptureWithFPS: frame = self.frame_queue.mypopleft() if frame is not None: ret = True + else: + print("____读取cap帧为空,采集速度过慢___") return ret, frame def release(self): diff --git a/core/ChannelData.py b/core/ChannelData.py new file mode 100644 index 0000000..e0268e7 --- /dev/null +++ b/core/ChannelData.py @@ -0,0 +1,629 @@ +import time +import copy +import importlib.util +import datetime +import time +import threading +import cv2 +import ffmpeg +import subprocess +from collections import deque +from myutils.MyLogger_logger import LogHandler +from core.CapManager import VideoCaptureWithFPS +from core.ACLModelManager import ACLModeManger +from model.plugins.ModelBase import ModelBase +from core.WarnManager import WarnData +from core.DataStruct import ModelinData,ModeloutData +from myutils.MyDeque import MyDeque +from myutils.ConfigManager import myCongif + +class ChannelData: + def __init__(self,channel_id,deque_length,icount_max,warnM): + self.logger = LogHandler().get_logger("ChannelDat") + self.model_platform = myCongif.get_data("model_platform") + self.channel_id = channel_id #该通道的通道ID + self.warnM = warnM #报警线程管理对象--MQ + self.ffprocess = self.start_h264_encoder(myCongif.get_data("mywidth"),myCongif.get_data("myheight"))#基于frame进行编码 + #视频采集相关 + self.cap = None #该通道视频采集对象 + self.frame_rate = myCongif.get_data("frame_rate") + self.frame_interval = 1.0 / int(myCongif.get_data("verify_rate")) + + #模型采集相关 + self.model = None #模型对象 -- 一通道只关联一个模型 + self.work_th = None #该通道工作线程句柄 + self.b_model = False #是否有运行模型线程 + self.bool_run = True # 线程运行标识 + self.lock = threading.RLock() # 用于保证线程安全 + self.icount_max = icount_max # 帧序列号上限 + self.max_len = myCongif.get_data("buffer_len") + self.deque_frame = deque(maxlen=self.max_len) #视频缓冲区用于保存录像 + self.last_frame = None # 保存图片数据 + #self.frame_queue = queue.Queue(maxsize=1) + self.frame_queue = MyDeque(10) #分析画面MQ + self.counter = 0 #帧序列号--保存报警录像使用 + + #model独立线程相关 + self.per_th = None #预处理线程句柄 + self.per_status= False #预处理线程状态 + self.post_th = None #后处理线程句柄 + self.post_status = False #后处理线程状态 + self.model_node= None #模型对象 -- inmq,outmq + self.out_mq = MyDeque(30) #放通道里面 + + #设置JPEG压缩基本 + self.encode_param = [int(cv2.IMWRITE_JPEG_QUALITY), myCongif.get_data("encode_param")] # 50 是压缩质量(0到100) + + #添加一帧图片 + def add_deque(self, value): + if len(self.deque_frame) == self.max_len: + removed_frame = self.deque_frame.popleft() + del removed_frame + removed_frame = None + self.deque_frame.append(value) #deque 满了以后会把前面的数据移除 + + #拷贝一份数据 + def copy_deque(self): + return copy.deepcopy(self.deque_frame) + + #获取最后一帧图片 + def get_last_frame(self): + if self.b_model: + # with self.lock: + # frame = self.last_frame + # return frame + + # try: + # frame = self.frame_queue.get(timeout=0.3) #web传输没有做帧率控制了,可以超时时间长一点 + # except queue.Empty: + # self.logger.debug(f"{self.channel_id}--web--获取分析画面失败,队列空") + # return None + + frame = self.frame_queue.mypopleft() + return frame + else: #如果没有运行,直接从cap获取画面 + if self.cap: + ret, frame = self.cap.read() # 除了第一帧,其它应该都是有画面的 + if not ret: + self.logger.debug(f"{self.channel_id}--web--获取原画失败,队列空") + return None + ret,buffer_bgr_webp = self._encode_frame(frame) + return buffer_bgr_webp + + # frame_bgr_webp = self.encode_frame_to_flv(frame) + # return frame_bgr_webp + return None + + def encode_frame_to_flv(self,frame): + try: + process = ( + ffmpeg + .input('pipe:', format='rawvideo', pix_fmt='bgr24', s=f'{frame.shape[1]}x{frame.shape[0]}') + .output('pipe:', format='flv',vcodec='libx264') + .run_async(pipe_stdin=True, pipe_stdout=True, pipe_stderr=True) + ) + out, err = process.communicate(input=frame.tobytes()) + + if process.returncode != 0: + raise RuntimeError(f"FFmpeg encoding failed: {err.decode('utf-8')}") + + return out + + except Exception as e: + print(f"Error during frame encoding: {e}") + return None + + def update_last_frame(self,buffer): + if buffer: + self.frame_queue.myappend(buffer) + + # with self.lock: + # self.last_frame = None + # self.last_frame = buffer + + # if self.frame_queue.full(): + # try: + # print("channel--丢帧") + # self.frame_queue.get(timeout=0.01) + # except queue.Empty: #为空不处理 + # pass + # self.frame_queue.put(buffer) + + # try: + # self.frame_queue.put(buffer,timeout=0.05) + # except queue.Full: + # self.logger.debug(f"{self.channel_id}分析画面队列满,插入失败") + # pass + + #------------h264编码相关--------------- + def start_h264_encoder(self,width, height): #宽高一样,初步定全进程一个 + process = subprocess.Popen( + ['ffmpeg', + '-f', 'rawvideo', + '-pix_fmt', 'bgr24', + '-s', f'{width}x{height}', + '-i', '-', # Take input from stdin + '-an', # No audio + '-vcodec', 'h264_ascend', + '-preset', 'ultrafast', + '-f', 'h264', # Output format H.264 + '-'], + stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE + ) + return process + + def encode_frame_h264(self, frame): + if self.process.poll() is not None: + raise RuntimeError("FFmpeg process has exited unexpectedly.") + # Write frame to stdin of the FFmpeg process + try: + self.process.stdin.write(frame.tobytes()) + except Exception as e: + raise RuntimeError(f"Failed to write frame to FFmpeg: {e}") + + # Capture the encoded output + buffer_size = 1024 * 10 # Adjust this based on the size of the encoded frame + encoded_frame = bytearray() + + while True: + chunk = self.process.stdout.read(buffer_size) + if not chunk: + break + encoded_frame.extend(chunk) + + if not encoded_frame: + raise RuntimeError("No encoded data received from FFmpeg.") + + # Optional: Check for errors in stderr + # stderr_output = self.process.stderr.read() + # if "error" in stderr_output.lower(): + # raise RuntimeError(f"FFmpeg error: {stderr_output}") + + return bytes(encoded_frame) + + def _encode_frame(self,frame,itype=0): + ret = False + buffer_bgr_webp = None + if itype == 0: #jpg + ret, frame_bgr_webp = cv2.imencode('.jpg', frame, self.encode_param) + if ret: + buffer_bgr_webp = frame_bgr_webp.tobytes() + elif itype == 1: #H264 + try: + buffer_bgr_webp = self.encode_frame_h264(frame) + ret = True + except Exception as e: + print(e) + else: + print("错误的参数!!") + return ret,buffer_bgr_webp + + #帧序列号自增 一个线程中处理,不用加锁 + def increment_counter(self): + self.counter += 1 + if self.counter > self.icount_max: + self.counter = 0 + + def get_counter(self): + return self.counter + + def _start_cap_th(self,source,type=1): + '''开始cap采集线程 + type = 打开摄像头 0--USB摄像头,1-RTSP,2-海康SDK + ''' + ret = False + if self.cap: + self.cap.release() + self.cap = None + self.cap = VideoCaptureWithFPS(source,type) + if self.cap: + ret = True + return ret + + def _stop_cap_th(self): + '''停止cap采集线程 + 重要约束:停止cap线程前,必须先停止model线程 + ''' + if self.b_model: + self.logger.error("停止采集线程前,请先停止model线程") + return False + else: + if self.cap: + self.cap.release() + self.cap = None + return True #一般不会没有cap + + def _pre_work_th(self,schedule): + '''一个通道一个线程,关联的模型在一个线程检测,局部变量都是一个通道独有''' + if not self.cap: + self.logger.error("采集线程未正常启动,不进行工作") + return + while self.model_node.model_th_status == 0: #避免模型没启动成功,模型线程在运行 + time.sleep(1) + if self.model_node.model_th_status == 1: + # 开始循环处理业务 + last_frame_time = time.time() # 初始化个读帧时间 + self.per_status = True + self.b_model = True + while self.bool_run: # 基于tag 作为运行标识。 线程里只是读,住线程更新,最多晚一轮,应该不用线程锁。需验证 + # 帧率控制帧率 + current_time = time.time() + elapsed_time = current_time - last_frame_time + if elapsed_time < self.frame_interval: + time.sleep(self.frame_interval - elapsed_time) # 若小于间隔时间则休眠 + last_frame_time = time.time() + # *********取画面************* + ret, frame = self.cap.read() # 除了第一帧,其它应该都是有画面的 + if not ret: + # self.logger.debug(f"{self.channel_id}--model--获取cap画面失败,队列空") + continue # 没读到画面继续 + # 验证检测计划,是否在布防时间内 + now = datetime.datetime.now() # 获取当前日期和时间 + weekday = now.weekday() # 获取星期几,星期一是0,星期天是6 + hour = now.hour + if schedule[weekday][hour] == 1: + #图片预处理 + img,scale_ratio, pad_size = self.model_node.model.prework(frame) + indata = ModelinData(self.channel_id,img,frame,scale_ratio, pad_size) + self.model_node.in_mq.myappend(indata) + else:# 不在计划则不进行验证,直接返回图片 --存在问题是:result 漏数据 + ret, frame_bgr_webp = cv2.imencode('.jpg', frame,self.encode_param) + if not ret: + buffer_bgr_webp = None + else: + buffer_bgr_webp = frame_bgr_webp.tobytes() + self.update_last_frame(buffer_bgr_webp) + else: + self.logger.error("模型线程为启动成功,不进行工作") + return + self.b_model = False + self.per_status = False + + def _post_work_th(self,duration_time,proportion,verify_rate,warn_interval,model_name,check_area,polygon,conf_threshold,iou_thres): + # 初始化业务数据 + result = [0 for _ in range(duration_time * verify_rate)] # 初始化时间*验证帧率数量的结果list + warn_last_time = time.time() + while self.bool_run: + out_data = self.out_mq.mypopleft() #(image,scale_ratio, pad_size,outputs): + if not out_data: + time.sleep(0.1) + continue + #开始后处理 + bwarn, warn_text = self.model_node.model.postwork(out_data.image,out_data.outputs,out_data.scale_ratio,out_data.pad_size, + check_area,polygon,conf_threshold,iou_thres) + # 对识别结果要部要进行处理 + if bwarn: + # 绘制报警文本 + cv2.putText(out_data.image, warn_text, (50, 50), + cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2) + result.append(1) # 要验证数组修改,是地址修改吗? + else: # 没有产生报警也需要记录,统一计算占比 + result.append(0) + #分析画面保存 + ret, frame_bgr_webp = cv2.imencode('.jpg', out_data.image,self.encode_param) + buffer_bgr_webp = None + if ret: + buffer_bgr_webp = frame_bgr_webp.tobytes() + # 分析图片放入缓冲区内存中 + self.add_deque(out_data.image) # 缓冲区大小由maxlen控制 超上限后,删除最前的数据 + # 分析画面一直更新最新帧,提供网页端显示 + self.update_last_frame(buffer_bgr_webp) + + if bwarn: + # 验证result -是否触发报警要求 --遍历每个模型执行的result + count_one = float(sum(result)) # 1,0 把1累加的和就是1的数量 + ratio_of_ones = count_one / len(result) + # self.logger.debug(result) + if ratio_of_ones >= proportion: # 触发报警 + # 基于时间间隔判断 + current_time = time.time() + elapsed_time = current_time - warn_last_time + if elapsed_time < warn_interval: + continue + warn_last_time = current_time + # 处理报警 + warn_data = WarnData() + warn_data.model_name = model_name + warn_data.warn_text = warn_text + warn_data.img_buffer = self.copy_deque() # 深度复制缓冲区 + warn_data.width = self.cap.width + warn_data.height = self.cap.height + warn_data.channel_id = self.channel_id + self.warnM.add_warn_data(warn_data) + + # 结果记录要清空 + for i in range(len(result)): + result[i] = 0 + + def _verify(self,frame,model,model_data,schedule,result,isdraw=1): + '''验证执行主函数,实现遍历通道关联的模型,调用对应模型执行验证''' + #img = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) + img = frame + #验证检测计划,是否在布防时间内 + now = datetime.datetime.now() # 获取当前日期和时间 + weekday = now.weekday() # 获取星期几,星期一是0,星期天是6 + hour = now.hour + result.pop(0) # 保障结果数组定长 --先把最早的结果推出数组 + + warntext = "" + if model and schedule[weekday][hour] == 1: #不在计划则不进行验证,直接返回图片 + # 调用模型,进行检测,model是动态加载的,具体的判断标准由模型内执行 ---- ********* + bwarn, warntext = model.verify(img, model_data,isdraw) #****************重要 + # 对识别结果要部要进行处理 + if bwarn: + # 绘制报警文本 + cv2.putText(img, warntext, (50, 50), + cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2) + result.append(1) #要验证数组修改,是地址修改吗? + else: #没有产生报警也需要记录,统一计算占比 + warntext = "" + result.append(0) + else: + result.append(0) + + # 将检测结果图像转换为帧--暂时用不到AVFrame--2024-7-5 + # new_frame_rgb_avframe = av.VideoFrame.from_ndarray(img, format="rgb24") # AVFrame + # new_frame_rgb_avframe.pts = None # 添加此行确保有pts属性 + # if isinstance(img, np.ndarray): -- 留个纪念 + + #处理完的图片后返回-bgr模式 + #img_bgr_ndarray = cv2.cvtColor(img, cv2.COLOR_RGB2BGR) + + ret,buffer_bgr_webp = self._encode_frame(img) + + #buffer_bgr_webp = self.encode_frame_to_flv(img) + return buffer_bgr_webp,img,warntext + + def _dowork_thread(self,channel_id,model_data,schedule,verify_rate,warn_interval): + '''一个通道一个线程,关联的模型在一个线程检测,局部变量都是一个通道独有''' + if not self.cap: + self.logger.error("采集线程未正常启动,退出model线程") + return + #加载自定义模型文件 + model = self._import_model(str(channel_id), model_data[5], model_data[8], model_data[9]) # 动态加载模型处理文件py + if not model: + self.logger.error("自定义模型文件加载失败,退出model线程") + return + #初始化模型运行资源 + context = None + device_id = myCongif.get_data("device_id") + if self.model_platform == "acl": # ACL线程中初始化内容 + context = ACLModeManger.th_inti_acl(device_id) #创建context + #初始化模型资源 -- 加载模型文件 + ret = model.init_acl_resource() #加载和初始化离线模型文件--om文件 + if not ret: + print("初始化模型资源出错,退出线程!") + return + #初始化业务数据 + result = [0 for _ in range(model_data[3] * verify_rate)] # 初始化时间*验证帧率数量的结果list + proportion = model_data[4] # 判断是否报警的占比 + warn_last_time = time.time() + #warn_save_count = 0 #保存录像的最新帧初始化为0 + + #开始循环处理业务 + last_frame_time = time.time() #初始化个读帧时间 + self.b_model = True + while self.bool_run: #基于tag 作为运行标识。 线程里只是读,住线程更新,最多晚一轮,应该不用线程锁。需验证 + # 帧率控制帧率 + current_time = time.time() + elapsed_time = current_time - last_frame_time + if elapsed_time < self.frame_interval: + time.sleep(self.frame_interval - elapsed_time) #若小于间隔时间则休眠 + last_frame_time = time.time() + #*********取画面************* + ret,frame = self.cap.read() #除了第一帧,其它应该都是有画面的 + if not ret: + #self.logger.debug(f"{self.channel_id}--model--获取cap画面失败,队列空") + continue #没读到画面继续 + + #执行图片推理 + buffer_bgr_webp,img_bgr_ndarray,warn_text = self._verify(frame,model,model_data,schedule,result) + + #分析图片放入内存中 + self.add_deque(img_bgr_ndarray) # 缓冲区大小由maxlen控制 超上限后,删除最前的数据 + + # 分析画面一直更新最新帧,提供网页端显示 + self.update_last_frame(buffer_bgr_webp) + + if warn_text: + #验证result -是否触发报警要求 --遍历每个模型执行的result + count_one = float(sum(result)) #1,0 把1累加的和就是1的数量 + ratio_of_ones = count_one / len(result) + #self.logger.debug(result) + if ratio_of_ones >= proportion: #触发报警 + # 基于时间间隔判断 + current_time = time.time() + elapsed_time = current_time - warn_last_time + if elapsed_time < warn_interval: + continue + warn_last_time = current_time + # 处理报警 + warn_data = WarnData() + warn_data.model_name = model_data[7] + warn_data.warn_text = warn_text + warn_data.img_buffer = self.copy_deque() # 深度复制缓冲区 + warn_data.width = self.cap.width + warn_data.height = self.cap.height + warn_data.channel_id = channel_id + self.warnM.add_warn_data(warn_data) + + #结果记录要清空 + for i in range(len(result)): + result[i] = 0 + + # end_time = time.time() # 结束时间 + # print(f"Processing time: {end_time - start_time} seconds") + # 本地显示---测试使用 + # if channel_id == 2: + # cv2.imshow(str(channel_id), img) + # if cv2.waitKey(1) & 0xFF == ord('q'): + # break + + #结束线程 + print("开始结束工作线程") + self.b_model = False + #反初始化 + if self.model_platform == "acl": + try: + model.release() #释放模型资源资源 + # 删除模型对象 + del model + #释放context + if context: # ACL线程中反初始化内容 -- 若线程异常退出,这些资源就不能正常释放了 + # 再释放context + ACLModeManger.th_del_acl(context) + except Exception as e: + print(e) + #cv2.destroyAllWindows() + print("线程结束!!!!") + + #2024-9-9 新增兼容独立model线程 根据self.model_node判断,None:1通道1线程,not None:独立线程 + def _start_model_th(self,model_data,schedule,type=1): + verify_rate = myCongif.get_data("verify_rate") + warn_interval = myCongif.get_data("warn_interval") + self.bool_run = True + if self.model_node:#要起个预处理线程,和一个后处理线程 + #启动后处理线程 + self.post_th = threading.Thread(target=self._post_work_th, + args=(model_data[3],model_data[4],verify_rate,warn_interval,model_data[7], + model_data[1],model_data[2],model_data[8],model_data[9])) + self.post_th.start() + #启动模型线程,若线程已启动,则+1 mq + self.model_node.start_model_th(self.channel_id,self.out_mq) + #启动预处理线程 + self.per_th = threading.Thread(target=self._pre_work_th,args=(schedule,)) + self.per_th.start() + + else: + self.work_th = threading.Thread(target=self._dowork_thread, + args=(self.channel_id, model_data, schedule, verify_rate, + warn_interval)) # 一个视频通道一个线程 + self.work_th.start() + + def _stop_model_th(self): + if self.model_node: #独立线程,需要停止预处理线程,和后处理线程 + self.bool_run = False + #停止预处理线程 + if self.per_th: + self.per_th.join() + self.per_th = None + #停止model线程 -1 + self.model_node.stop_model_th(self.channel_id) + #停止后处理线程 + if self.post_th: + self.post_th.join() + self.post_th = None + self.out_mq.myclear()#清空后处理mq中未处理的数据 + else: + if self.work_th: + if self.b_model: + self.bool_run = False + self.work_th.join() #线程结束时,会把b_model置False + self.logger.debug(f"{self.channel_id}停止工作线程") + self.work_th = None + + def _import_model(self,model_name,model_path,threshold,iou_thres): + ''' + 根据路径,动态导入模块 + :param model_name: 模块名称 --用通道ID代替 + :param model_path: 模块路径 + :param threshold: 置信阈值 + :param iou_thres: iou阈值 + :return: + ''' + try: + module_path = model_path.replace("/", ".").rsplit(".", 1)[0] + print(module_path) + # 动态导入模块 + module = importlib.import_module(module_path) + # 从模块中获取指定的类 + Model = getattr(module, "Model") + # 使用 Model 类 + model_instance = Model(model_path,threshold,iou_thres) + return model_instance + except ModuleNotFoundError as e: + print(f"Module not found: {e}") + return None + except AttributeError as e: + print(f"Class not found in module: {e}") + return None + except Exception as e: + print(f"An unexpected error occurred: {e}") + return None + # if os.path.exists(model_path): + # module_spec = importlib.util.spec_from_file_location(model_name, model_path) + # if module_spec is None: + # self.logger.error(f"{model_path} 加载错误") + # return None + # module = importlib.util.module_from_spec(module_spec) + # start_time = time.time() + # print(f"通道{self.channel_id},开始exec_module自定义模型文件") + # module_spec.loader.exec_module(module) + # end_time = time.time() + # print(f"通道{self.channel_id},完成exec_module模型文件实例化耗时{end_time - start_time}") + # + # try: + # md = getattr(module, "Model")(model_path,threshold,iou_thres) #实例化类 + # except Exception as e: + # self.logger.error(f"{model_path} 实例化错误,退出模型线程") + # return None + # + # if not isinstance(md, ModelBase): + # self.logger.error("{} not zf_model".format(md)) + # return None + # else: + # self.logger.error("{}文件不存在".format(model_path)) + # return None + # self.logger.debug(f"{model_path} 加载成功!!!!") + # return md + + def start_work(self,cap_data,model_data,schedule,type,model_Node=None): + ''' + 开始工作线程,包括视频通道采集和模型处理 + :param cap_data: [source,type] + :param model_data: 跟通道关联的模型数据 + strsql = ( + f"select t1.model_id,t1.check_area,t1.polygon ,t2.duration_time,t2.proportion,t2.model_path,t1.ID," + f"t2.model_name,t1.conf_thres,t1.iou_thres " + f"from channel2model t1 left join model t2 on t1.model_id = t2.ID where t1.channel_id ={channel_id};") + :param schedule + :param type: 0-启动所有线程,1-只启动cap采集线程,2-只启动model模型处理线程 + :return: True,False + ''' + ret = False + self.model_node = model_Node + if type==0: + self._start_cap_th(cap_data[0],cap_data[1]) #先cap,再model + self._start_model_th(model_data,schedule) + ret = True + elif type == 1: + self._start_cap_th(cap_data[0],cap_data[1]) + ret = True + elif type == 2: + self._start_model_th(model_data,schedule) + ret = True + else: + self.logger.error("暂时还不支持该类型工作!") + return ret + + def stop_work(self,type=0): + ''' + 清空数据,停止工作线程(若有 ,并删除deque 和 last_frame) + :param type: 0-停止所有线程,1-只停止cap采集线程,2-只停止model模型处理线程 + :return: True,False + ''' + ret = False + if type == 0: + self._stop_model_th() + self._stop_cap_th() + ret = True + elif type == 1: + self._stop_cap_th() + ret = True + elif type == 2: + self.logger.debug("单独停止工作线程") + self._stop_model_th() + ret = True + else: + self.logger.error("暂时还不支持该类型工作!") + return ret diff --git a/core/ChannelManager.py b/core/ChannelManager.py index 5761e89..478d42c 100644 --- a/core/ChannelManager.py +++ b/core/ChannelManager.py @@ -1,439 +1,10 @@ -import numpy as np -import time -import copy import cv2 import threading -import importlib.util -import datetime -import time -import os -import ffmpeg -from collections import deque -from myutils.MyLogger_logger import LogHandler -from myutils.ConfigManager import myCongif -from core.CapManager import VideoCaptureWithFPS -from core.ACLModelManager import ACLModeManger -from model.plugins.ModelBase import ModelBase -from core.WarnManager import WarnData -from myutils.MyDeque import MyDeque import base64 - -class ChannelData: - def __init__(self,channel_id,deque_length,icount_max,warnM): - self.logger = LogHandler().get_logger("ChannelDat") - self.model_platform = myCongif.get_data("model_platform") - self.channel_id = channel_id #该通道的通道ID - self.warnM = warnM #报警线程管理对象--MQ - #视频采集相关 - self.cap = None #该通道视频采集对象 - self.frame_rate = myCongif.get_data("frame_rate") - self.frame_interval = 1.0 / int(myCongif.get_data("verify_rate")) - - #模型采集相关 - self.model = None #模型对象 -- 一通道只关联一个模型 - self.work_th = None #该通道工作线程句柄 - self.b_model = False #是否有运行模型线程 - self.bool_run = True # 线程运行标识 - self.lock = threading.RLock() # 用于保证线程安全 - self.icount_max = icount_max # 帧序列号上限 - self.max_len = myCongif.get_data("buffer_len") - self.deque_frame = deque(maxlen=self.max_len) #视频缓冲区用于保存录像 - self.last_frame = None # 保存图片数据 - #self.frame_queue = queue.Queue(maxsize=1) - self.frame_queue = MyDeque(10) #分析画面MQ - self.counter = 0 #帧序列号--保存报警录像使用 - - - - #添加一帧图片 - def add_deque(self, value): - if len(self.deque_frame) == self.max_len: - removed_frame = self.deque_frame.popleft() - del removed_frame - removed_frame = None - self.deque_frame.append(value) #deque 满了以后会把前面的数据移除 - - #拷贝一份数据 - def copy_deque(self): - return copy.deepcopy(self.deque_frame) - - #获取最后一帧图片 - def get_last_frame(self): - if self.b_model: - # with self.lock: - # frame = self.last_frame - # return frame - - # try: - # frame = self.frame_queue.get(timeout=0.3) #web传输没有做帧率控制了,可以超时时间长一点 - # except queue.Empty: - # self.logger.debug(f"{self.channel_id}--web--获取分析画面失败,队列空") - # return None - - frame = self.frame_queue.mypopleft() - return frame - else: #如果没有运行,直接从cap获取画面 - if self.cap: - ret, frame = self.cap.read() # 除了第一帧,其它应该都是有画面的 - if not ret: - self.logger.debug(f"{self.channel_id}--web--获取原画失败,队列空") - return None - ret, frame_bgr_webp = cv2.imencode('.jpg', frame) - if not ret: - buffer_bgr_webp = None - else: - buffer_bgr_webp = frame_bgr_webp.tobytes() - return buffer_bgr_webp - - # frame_bgr_webp = self.encode_frame_to_flv(frame) - # return frame_bgr_webp - return None - - def encode_frame_to_flv(self,frame): - try: - process = ( - ffmpeg - .input('pipe:', format='rawvideo', pix_fmt='bgr24', s=f'{frame.shape[1]}x{frame.shape[0]}') - .output('pipe:', format='flv',vcodec='libx264') - .run_async(pipe_stdin=True, pipe_stdout=True, pipe_stderr=True) - ) - out, err = process.communicate(input=frame.tobytes()) - - if process.returncode != 0: - raise RuntimeError(f"FFmpeg encoding failed: {err.decode('utf-8')}") - - return out - - except Exception as e: - print(f"Error during frame encoding: {e}") - return None - - def update_last_frame(self,buffer): - if buffer: - self.frame_queue.myappend(buffer) - - # with self.lock: - # self.last_frame = None - # self.last_frame = buffer - - # if self.frame_queue.full(): - # try: - # print("channel--丢帧") - # self.frame_queue.get(timeout=0.01) - # except queue.Empty: #为空不处理 - # pass - # self.frame_queue.put(buffer) - - # try: - # self.frame_queue.put(buffer,timeout=0.05) - # except queue.Full: - # self.logger.debug(f"{self.channel_id}分析画面队列满,插入失败") - # pass - - - #帧序列号自增 一个线程中处理,不用加锁 - def increment_counter(self): - self.counter += 1 - if self.counter > self.icount_max: - self.counter = 0 - - def get_counter(self): - return self.counter - - def _start_cap_th(self,source,type=1): - '''开始cap采集线程 - type = 打开摄像头 0--USB摄像头,1-RTSP,2-海康SDK - ''' - ret = False - if self.cap: - self.cap.release() - self.cap = None - self.cap = VideoCaptureWithFPS(source,type) - if self.cap: - ret = True - return ret - - def _stop_cap_th(self): - '''停止cap采集线程 - 重要约束:停止cap线程前,必须先停止model线程 - ''' - if self.b_model: - self.logger.error("停止采集线程前,请先停止model线程") - return False - else: - if self.cap: - self.cap.release() - self.cap = None - return True #一般不会没有cap - - def _start_model_th(self,model_data,schedule): - verify_rate = myCongif.get_data("verify_rate") - warn_interval = myCongif.get_data("warn_interval") - self.bool_run = True - self.work_th = threading.Thread(target=self._dowork_thread, - args=(self.channel_id, model_data, schedule, verify_rate, - warn_interval)) # 一个视频通道一个线程 - self.work_th.start() - - def _stop_model_th(self): - if self.work_th: - if self.b_model: - self.bool_run = False - self.work_th.join() #线程结束时,会把b_model置False - self.logger.debug(f"{self.channel_id}停止工作线程") - self.work_th = None - - def _import_model(self,model_name,model_path,threshold,iou_thres): - ''' - 根据路径,动态导入模块 - :param model_name: 模块名称 --用通道ID代替 - :param model_path: 模块路径 - :param threshold: 置信阈值 - :param iou_thres: iou阈值 - :return: - ''' - try: - module_path = model_path.replace("/", ".").rsplit(".", 1)[0] - print(module_path) - # 动态导入模块 - module = importlib.import_module(module_path) - # 从模块中获取指定的类 - Model = getattr(module, "Model") - # 使用 Model 类 - model_instance = Model(model_path,threshold,iou_thres) - return model_instance - except ModuleNotFoundError as e: - print(f"Module not found: {e}") - return None - except AttributeError as e: - print(f"Class not found in module: {e}") - return None - except Exception as e: - print(f"An unexpected error occurred: {e}") - return None - # if os.path.exists(model_path): - # module_spec = importlib.util.spec_from_file_location(model_name, model_path) - # if module_spec is None: - # self.logger.error(f"{model_path} 加载错误") - # return None - # module = importlib.util.module_from_spec(module_spec) - # start_time = time.time() - # print(f"通道{self.channel_id},开始exec_module自定义模型文件") - # module_spec.loader.exec_module(module) - # end_time = time.time() - # print(f"通道{self.channel_id},完成exec_module模型文件实例化耗时{end_time - start_time}") - # - # try: - # md = getattr(module, "Model")(model_path,threshold,iou_thres) #实例化类 - # except Exception as e: - # self.logger.error(f"{model_path} 实例化错误,退出模型线程") - # return None - # - # if not isinstance(md, ModelBase): - # self.logger.error("{} not zf_model".format(md)) - # return None - # else: - # self.logger.error("{}文件不存在".format(model_path)) - # return None - # self.logger.debug(f"{model_path} 加载成功!!!!") - # return md - - def _verify(self,frame,model,model_data,schedule,result,isdraw=1): - '''验证执行主函数,实现遍历通道关联的模型,调用对应模型执行验证''' - #img = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) - img = frame - #验证检测计划,是否在布防时间内 - now = datetime.datetime.now() # 获取当前日期和时间 - weekday = now.weekday() # 获取星期几,星期一是0,星期天是6 - hour = now.hour - result.pop(0) # 保障结果数组定长 --先把最早的结果推出数组 - - warntext = "" - if model and schedule[weekday][hour] == 1: #不在计划则不进行验证,直接返回图片 - # 调用模型,进行检测,model是动态加载的,具体的判断标准由模型内执行 ---- ********* - bwarn, warntext = model.verify(img, model_data,isdraw) #****************重要 - # 对识别结果要部要进行处理 - if bwarn: - # 绘制报警文本 - cv2.putText(img, warntext, (50, 50), - cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2) - result.append(1) #要验证数组修改,是地址修改吗? - else: #没有产生报警也需要记录,统一计算占比 - warntext = "" - result.append(0) - else: - result.append(0) - - # 将检测结果图像转换为帧--暂时用不到AVFrame--2024-7-5 - # new_frame_rgb_avframe = av.VideoFrame.from_ndarray(img, format="rgb24") # AVFrame - # new_frame_rgb_avframe.pts = None # 添加此行确保有pts属性 - # if isinstance(img, np.ndarray): -- 留个纪念 - - #处理完的图片后返回-bgr模式 - #img_bgr_ndarray = cv2.cvtColor(img, cv2.COLOR_RGB2BGR) - # 将检查结果转换为WebP格式图片 --在线程里面完成应该可以减少网页端处理时间 - ret,frame_bgr_webp=cv2.imencode('.jpg', img) - if not ret: - buffer_bgr_webp = None - else: - buffer_bgr_webp = frame_bgr_webp.tobytes() - - #buffer_bgr_webp = self.encode_frame_to_flv(img) - return buffer_bgr_webp,img,warntext - - def _dowork_thread(self,channel_id,model_data,schedule,verify_rate,warn_interval): - '''一个通道一个线程,关联的模型在一个线程检测,局部变量都是一个通道独有''' - if not self.cap: - self.logger.error("采集线程未正常启动,退出model线程") - return - #加载自定义模型文件 - model = self._import_model(str(channel_id), model_data[5], model_data[8], model_data[9]) # 动态加载模型处理文件py - if not model: - self.logger.error("自定义模型文件加载失败,退出model线程") - return - #初始化模型运行资源 - context = None - device_id = myCongif.get_data("device_id") - if self.model_platform == "acl": # ACL线程中初始化内容 - context = ACLModeManger.th_inti_acl(device_id) #创建context - #初始化模型资源 -- 加载模型文件 - ret = model.init_acl_resource() #加载和初始化离线模型文件--om文件 - if not ret: - print("初始化模型资源出错,退出线程!") - return - #初始化业务数据 - result = [0 for _ in range(model_data[3] * verify_rate)] # 初始化时间*验证帧率数量的结果list - proportion = model_data[4] # 判断是否报警的占比 - warn_last_time = time.time() - #warn_save_count = 0 #保存录像的最新帧初始化为0 - - #开始循环处理业务 - last_frame_time = time.time() #初始化个读帧时间 - self.b_model = True - while self.bool_run: #基于tag 作为运行标识。 线程里只是读,住线程更新,最多晚一轮,应该不用线程锁。需验证 - # 帧率控制帧率 - current_time = time.time() - elapsed_time = current_time - last_frame_time - if elapsed_time < self.frame_interval: - time.sleep(self.frame_interval - elapsed_time) #若小于间隔时间则休眠 - last_frame_time = time.time() - #*********取画面************* - ret,frame = self.cap.read() #除了第一帧,其它应该都是有画面的 - if not ret: - #self.logger.debug(f"{self.channel_id}--model--获取cap画面失败,队列空") - continue #没读到画面继续 - - #执行图片推理 - buffer_bgr_webp,img_bgr_ndarray,warn_text = self._verify(frame,model,model_data,schedule,result) - - #分析图片放入内存中 - self.add_deque(img_bgr_ndarray) # 缓冲区大小由maxlen控制 超上限后,删除最前的数据 - - # 分析画面一直更新最新帧,提供网页端显示 - self.update_last_frame(buffer_bgr_webp) - - if warn_text: - #验证result -是否触发报警要求 --遍历每个模型执行的result - count_one = float(sum(result)) #1,0 把1累加的和就是1的数量 - ratio_of_ones = count_one / len(result) - #self.logger.debug(result) - if ratio_of_ones >= proportion: #触发报警 - # 基于时间间隔判断 - current_time = time.time() - elapsed_time = current_time - warn_last_time - if elapsed_time < warn_interval: - continue - warn_last_time = current_time - # 处理报警 - warn_data = WarnData() - warn_data.model_name = model_data[7] - warn_data.warn_text = warn_text - warn_data.img_buffer = self.copy_deque() # 深度复制缓冲区 - warn_data.width = self.cap.width - warn_data.height = self.cap.height - warn_data.channel_id = channel_id - self.warnM.add_warn_data(warn_data) - - #结果记录要清空 - for i in range(len(result)): - result[i] = 0 - - # end_time = time.time() # 结束时间 - # print(f"Processing time: {end_time - start_time} seconds") - # 本地显示---测试使用 - # if channel_id == 2: - # cv2.imshow(str(channel_id), img) - # if cv2.waitKey(1) & 0xFF == ord('q'): - # break - - #结束线程 - print("开始结束工作线程") - self.b_model = False - #反初始化 - if self.model_platform == "acl": - try: - model.release() #释放模型资源资源 - # 删除模型对象 - del model - #释放context - if context: # ACL线程中反初始化内容 -- 若线程异常退出,这些资源就不能正常释放了 - # 再释放context - ACLModeManger.th_del_acl(context) - except Exception as e: - print(e) - #cv2.destroyAllWindows() - print("线程结束!!!!") - - def start_work(self,cap_data,model_data,schedule,type=0): - ''' - 开始工作线程,包括视频通道采集和模型处理 - :param cap_data: [source,type] - :param model_data: 跟通道关联的模型数据 - strsql = ( - f"select t1.model_id,t1.check_area,t1.polygon ,t2.duration_time,t2.proportion,t2.model_path,t1.ID," - f"t2.model_name,t1.conf_thres,t1.iou_thres " - f"from channel2model t1 left join model t2 on t1.model_id = t2.ID where t1.channel_id ={channel_id};") - :param schedule - :param type: 0-启动所有线程,1-只启动cap采集线程,2-只启动model模型处理线程 - :return: True,False - ''' - ret = False - if type==0: - self._start_cap_th(cap_data[0],cap_data[1]) #先cap,再model - self._start_model_th(model_data,schedule) - ret = True - elif type == 1: - self._start_cap_th(cap_data[0],cap_data[1]) - ret = True - elif type == 2: - self._start_model_th(model_data,schedule) - ret = True - else: - self.logger.error("暂时还不支持该类型工作!") - return ret - - def stop_work(self,type=0): - ''' - 清空数据,停止工作线程(若有 ,并删除deque 和 last_frame) - :param type: 0-停止所有线程,1-只停止cap采集线程,2-只停止model模型处理线程 - :return: True,False - ''' - ret = False - if type == 0: - self._stop_model_th() - self._stop_cap_th() - ret = True - elif type == 1: - self._stop_cap_th() - ret = True - elif type == 2: - self.logger.debug("单独停止工作线程") - self._stop_model_th() - ret = True - else: - self.logger.error("暂时还不支持该类型工作!") - return ret - +from myutils.ConfigManager import myCongif +#独立模型线程 +from core.ModelNode import ModelNode +from core.ChannelData import ChannelData #其实ChannelNode会更加贴切一些 ''' 通道对象管理类,只针对通道节点的维护,和工作线程的开启和停止, ChannelData的操作必须维护在ChannelManager里面!不能输出ChannelData对象! @@ -443,6 +14,11 @@ class ChannelManager: self._channels = {} self.cm_lock = threading.RLock() # 用于保证字典操作的线程安全 + # 独立Model_th相关参数 --- modelNode 用一个类封装下model线程和其相关参数 + self.model_list = {} # model_id -- modelNode + self.workType = int(myCongif.get_data("workType")) + self.device_id = myCongif.get_data("device_id") + #增加节点 def add_channel(self, channel_id,deque_length,icount_max,warnM): ret = False @@ -450,6 +26,8 @@ class ChannelManager: if channel_id in self._channels: #若已经有数据,先删除后再增加 self._channels[channel_id].stop_work(0) # 停止工作 del self._channels[channel_id] + if self.workType == 2: # 若有model线程停止了,则删除该model节点 + self.delModelNode() #channel_id,deque_length,icount_max,warnM ch_data = ChannelData(channel_id, deque_length, icount_max,warnM) self._channels[channel_id] = ch_data @@ -459,7 +37,7 @@ class ChannelManager: #删除节点 def delete_channel(self, channel_id): #需要验证资源的释放清空 '''删除节点,包含了停止该节点工作线程''' - ret = False + ret = True with self.cm_lock: if channel_id == 0: for clannel_data in self._channels: @@ -469,14 +47,15 @@ class ChannelManager: if channel_id in self._channels: self._channels[channel_id].stop_work(0) #停止工作 del self._channels[channel_id] - ret = True + if self.workType == 2: #若有model线程停止了,则删除该model节点 + self.delModelNode() return ret #开始工作线程 def start_channel(self,channel_id,cap_data,model_data,schedule,type): ''' 启动通道的工作线程 -- 启动应该没有一下子启动所有 - :param channel_id: 启动通道的ID + :param channel_id: 启动通道的ID channel_id == 0的情况已经在上一层处理了,这里不会有0 :param cap_data: [source,type] :param model_data: 跟该通道关联的模型数据 :param schedule @@ -486,12 +65,16 @@ class ChannelManager: ret = False with self.cm_lock: if channel_id in self._channels: - ret = self._channels[channel_id].start_work(cap_data,model_data,schedule,type) + c_node = self._channels[channel_id] + model_node = None + if self.workType == 2 and type !=1: #需要确保当type!=1时,model_data必须有数据 -- 调用时已经有判断 + model_node = self.CreateModelNode(model_data[0], model_data[5], channel_id) + ret = c_node.start_work(cap_data,model_data,schedule,type,model_node) return ret #停止工作线程---要把视频采集线程停止掉 - def stop_channel(self,channel_id,type): + def stop_channel(self,channel_id,type): #9-10截止目前就重启模型线程时用到该函数(channel_id,2) ''' 停止通道的工作线程 :param channel_id: 0-停止所有通道的工作线程,其他值为具体停止哪个工作线程 @@ -509,8 +92,17 @@ class ChannelManager: if channel_id in self._channels: self._channels[channel_id].stop_work(type) ret =True + + if self.workType == 2: #若有model线程停止了,则删除该model节点 + self.delModelNode() return ret + def get_channel_data(self,channel_id): + cdata = None + if channel_id in self._channels: + cdata = self._channels[channel_id] + return cdata + def cm_get_last_frame(self,channel_id): #这里如果加锁的话,阻塞会比较厉害,但具体程度也需要验证。 frame = None if channel_id in self._channels: @@ -529,7 +121,7 @@ class ChannelManager: for i in range(5): ret, frame = channel_data.cap.read() if ret: - ret, frame_bgr_webp = cv2.imencode('.jpg', frame) + ret, frame_bgr_webp = cv2.imencode('.jpg', frame,self.encode_param) if ret: # 将图像数据编码为Base64 img_base64 = base64.b64encode(frame_bgr_webp).decode('utf-8') @@ -538,6 +130,20 @@ class ChannelManager: print(e) return img_base64 + '''模型独立线程修改2024-9-9,要求是双模式兼容''' + def CreateModelNode(self, model_id, model_path, channel_id): + if model_id in self.model_list: + modelN = self.model_list[model_id] + else: + modelN = ModelNode(self.device_id,model_path) + self.model_list[model_id] = modelN + return modelN + + def delModelNode(self): #关于modelnodel :1.考虑modelnode是否可以不删除,清空inmq即可,2.mdel_list是否需要加锁。#? + return + for model_id, modelNode in self.model_list.items(): + if modelNode.ch_count == 0: + del self.model_list[model_id] if __name__ == "__main__": diff --git a/core/DataStruct.py b/core/DataStruct.py index f993bfa..0a333e4 100644 --- a/core/DataStruct.py +++ b/core/DataStruct.py @@ -22,4 +22,23 @@ class CSAPPLogin_Data(ctypes.Structure): ('passwd', ctypes.c_char * 32) ] +class ModelinData: + def __init__(self,channel_id,img,image,scale_ratio, pad_size): + self.channel_id = channel_id #通道ID + self.img = img #预处理后的图片数据 + self.image = image #原图的数据 + self.scale_ratio = scale_ratio + self.pad_size = pad_size + def __del__(self): + pass + +class ModeloutData: + def __init__(self,image,scale_ratio, pad_size,outputs): + self.image = image #原图 + self.outputs = outputs #模型推理后结果 + self.scale_ratio = scale_ratio + self.pad_size = pad_size + + def __del__(self): + pass diff --git a/core/ModelManager.py b/core/ModelManager.py index 0e77538..a1a91fc 100644 --- a/core/ModelManager.py +++ b/core/ModelManager.py @@ -1,5 +1,5 @@ # 导入代码依赖 -import cv2 + from core.DBManager import mDBM from myutils.MyLogger_logger import LogHandler from myutils.ConfigManager import myCongif @@ -7,7 +7,6 @@ from core.ChannelManager import ChannelManager from core.ACLModelManager import ACLModeManger from core.WarnManager import WarnManager - class ModelManager: def __init__(self): self.verify_list = ChannelManager() #模型的主要数据 -- 2024-7-5修改为类管理通道数据 @@ -18,6 +17,7 @@ class ModelManager: # 报警处理线程-全进程独立一个线程处理 self.warnM = None + # acl初始化 -- 一个进程一个 self.model_platform = myCongif.get_data("model_platform") if self.model_platform == "acl": @@ -50,6 +50,8 @@ class ModelManager: self.warnM = WarnManager() self.warnM.start_warnmanager_th() + + #工作线程:cap+model if channel_id ==0: strsql = "select id,ulr,type from channel where is_work = 1;" #执行所有通道 @@ -70,11 +72,10 @@ class ModelManager: f"t2.model_name,t1.conf_thres,t1.iou_thres " f"from channel2model t1 left join model t2 on t1.model_id = t2.ID where t1.channel_id ={channel_id};") model_data = mDBM.do_select(strsql, 1) # 2024-7-12调整规则,一个通道只关联一个模型,表结构暂时不动 - # cap_data[source,type] - cap_data = [data[1], data[2]] + cap_data = [data[1], data[2]] # cap_data[source,type] if model_data and model_data[0]: # 如果该通道关联了模型 schedule = mDBM.getschedule(model_data[6]) # 获取布防计划 - #启动 + #启动通道线程 self.verify_list.start_channel(channel_id,cap_data,model_data,schedule,0) #cap + model else: self.logger.debug(f"{channel_id}通道没有关联模型,只运行cap采集线程") @@ -83,10 +84,10 @@ class ModelManager: else: print("*****有错误********") - def stop_work(self,channel_id=0): #要对应start_work 1.停止model线程,2.停止cap线程。3.删除c_data + def stop_work(self,channel_id=0): #全停 要对应start_work 1.停止model线程,2.停止cap线程。3.删除c_data '''停止工作线程(包括采集线程,并删除通道数据对象),0-停止所有,非0停止对应通道ID的线程''' try: - self.verify_list.delete_channel(channel_id) # 1,2,3 + self.verify_list.delete_channel(channel_id) except Exception as e: print(e) @@ -96,6 +97,17 @@ class ModelManager: del self.warnM self.warnM = None + def restartC2M(self,channel_id): + ''' + 修改通道管理的算法模型后需要对该通道算法执行部分重新加载执行 -- 只重启model线程 + :param channel_id: + :return: + ''' + #停止该通道的工作线程 -- 不停cap + self.verify_list.stop_channel(channel_id,2) + #重新读取工作线程数据,重启该通道的工作线程 + self.startModelWork(channel_id) + def startModelWork(self,channel_id): strsql = ( f"select t1.model_id,t1.check_area,t1.polygon ,t2.duration_time,t2.proportion,t2.model_path,t1.ID," @@ -104,28 +116,14 @@ class ModelManager: model_data = mDBM.do_select(strsql, 1) # 2024-7-12调整规则,一个通道只关联一个模型,表结构暂时不动 if model_data and model_data[0]: # 如果该通道关联了模型 schedule = mDBM.getschedule(model_data[6]) # 获取布防计划 - # cap_data[source,type] - cap_data = None + + cap_data = None # cap_data[source,type] # 启动 self.logger.debug(f"通道{channel_id}重新启动model线程") - self.verify_list.start_channel(channel_id, cap_data, model_data, schedule, 2) # cap + model + self.verify_list.start_channel(channel_id, cap_data, model_data, schedule, 2) # model else: self.logger.debug(f"通道{channel_id}没有关联模型,不需要启动model线程") - def restartC2M(self,channel_id): - ''' - 修改通道管理的算法模型后需要对该通道算法执行部分重新加载执行 -- 只重启model线程 - :param channel_id: - :return: - ''' - #停止该通道的工作线程 -- 不停cap - self.verify_list.stop_channel(channel_id,2) - #重新读取工作线程数据,重启该通道的工作线程 - self.startModelWork(channel_id) - - '''模型独立线程修改2024-9-9,要求是双模式兼容''' - def thModel(self): - pass def test1(self): # from model.plugins.RYRQ_Model_ACL.RYRQ_Model_ACL import Model diff --git a/core/ModelNode.py b/core/ModelNode.py new file mode 100644 index 0000000..86c8a99 --- /dev/null +++ b/core/ModelNode.py @@ -0,0 +1,133 @@ +import threading +import importlib.util +import time +from myutils.MyDeque import MyDeque +from myutils.ConfigManager import myCongif +from myutils.MyLogger_logger import LogHandler +from core.ACLModelManager import ACLModeManger +from core.DataStruct import ModelinData,ModeloutData +from threading import Lock + +class ModelNode: + def __init__(self,device,model_path): + self.device = device + self.model_path = model_path + self.model = None #模型对象 + self.model_th = None #模型线程句柄 + self.brun = True #模型控制标识 + self.model_th_status = 0 #模型线程运行状态 0--初始状态,1-线程执行成功,2-线程退出 + self.in_mq = MyDeque(50) # + self.channel_list = {} #channel_id out_mq --需要线程安全 + self.clist_Lock = Lock() #channel_list的维护锁 + self.ch_count = 0 #关联启动的通道数量 + self.count_Lock = Lock() #count的维护锁 + self.model_platform = myCongif.get_data("model_platform") + self.logger = LogHandler().get_logger("ModelNode") + + + + def __del__(self): + pass + + def _reset(self): #重置数据 + #self.model_th_status = 0 # 模型线程运行状态 0--初始状态,1-线程执行成功,2-线程退出 + self.in_mq.myclear() + + def _import_model(self,model_path,threshold=0.5,iou_thres=0.5): + ''' + 根据路径,动态导入模块 + :param model_path: 模块路径 + :param threshold: 置信阈值 + :param iou_thres: iou阈值 + :return: + ''' + try: + module_path = model_path.replace("/", ".").rsplit(".", 1)[0] + print(module_path) + # 动态导入模块 + module = importlib.import_module(module_path) + # 从模块中获取指定的类 + Model = getattr(module, "Model") + # 使用 Model 类 + model_instance = Model(model_path,threshold,iou_thres) + return model_instance + except ModuleNotFoundError as e: + print(f"Module not found: {e}") + return None + except AttributeError as e: + print(f"Class not found in module: {e}") + return None + except Exception as e: + print(f"An unexpected error occurred: {e}") + return None + + def _model_th(self): + # 加载自定义模型文件 + self.model = self._import_model(self.model_path) # 动态加载模型处理文件py + if not self.model: + self.logger.error("自定义模型文件加载失败,退出model线程") + self.model_th_status = 2 + return + # 初始化模型运行资源 + context = None + if self.model_platform == "acl": # ACL线程中初始化内容 + context = ACLModeManger.th_inti_acl(self.device) # 创建context + # 初始化模型资源 -- 加载模型文件 + ret = self.model.init_acl_resource() # 加载和初始化离线模型文件--om文件 + if not ret: + print("初始化模型资源出错,退出线程!") + self.model_th_status = 2 + return + #执行工作 + self.model_th_status = 1 + while self.brun: + inData = self.in_mq.mypopleft() #空时,返回None #(self,channel_id,img,image,scale_ratio, pad_size): + if inData: + outputs = self.model.execute([inData.img,])#创建input,执行模型,返回结果 --失败返回None + outdata = ModeloutData(inData.image,inData.scale_ratio,inData.pad_size,outputs) + del inData.img + with self.clist_Lock: + if inData.channel_id in self.channel_list: + self.channel_list[inData.channel_id].myappend(outdata) + else: + time.sleep(0.05) + + #结束线程,释放资源 + self.model_th_status = 0 + self._reset() + # 反初始化 + if self.model_platform == "acl": + try: + self.model.release() # 释放模型资源资源 + # 删除模型对象 + del self.model + # 释放context + if context: # ACL线程中反初始化内容 -- 若线程异常退出,这些资源就不能正常释放了 + # 再释放context + ACLModeManger.th_del_acl(context) + except Exception as e: + print(e) + + def start_model_th(self,channel_id,out_mq): + with self.count_Lock: + with self.clist_Lock: + if channel_id in self.channel_list: + return #这个可以删除老的,新增新的 + self.channel_list[channel_id] = out_mq + if self.ch_count == 0: #第一次启动线程 + self.brun = True + self.model_th = threading.Thread(target=self._model_th) + self.model_th.start() + self.ch_count += 1 #有通道调用一次就加一 + + def stop_model_th(self,channel_id): + with self.count_Lock: + with self.clist_Lock: + if channel_id in self.channel_list: + del self.channel_list[channel_id] + self.ch_count -= 1 + if self.ch_count == 0: #所有通道结束 + self.brun = False + self.model_th.join() + self.model_th = None + diff --git a/model/base_model/ascnedcl/classes.py b/model/base_model/ascnedcl/classes.py new file mode 100644 index 0000000..c952ed0 --- /dev/null +++ b/model/base_model/ascnedcl/classes.py @@ -0,0 +1,9 @@ +CLASSES = ['person', 'bicycle', 'car', 'motorcycle', 'airplane', 'bus', 'train', 'truck', 'boat', 'traffic light', + 'fire hydrant', 'stop sign', 'parking meter', 'bench', 'bird', 'cat', 'dog', 'horse', 'sheep', 'cow', + 'elephant', 'bear', 'zebra', 'giraffe', 'backpack', 'umbrella', 'handbag', 'tie', 'suitcase', 'frisbee', + 'skis', 'snowboard', 'sports ball', 'kite', 'baseball bat', 'baseball glove', 'skateboard', 'surfboard', + 'tennis racket', 'bottle', 'wine glass', 'cup', 'fork', 'knife', 'spoon', 'bowl', 'banana', 'apple', + 'sandwich', 'orange', 'broccoli', 'carrot', 'hot dog', 'pizza', 'donut', 'cake', 'chair', 'couch', + 'potted plant', 'bed', 'dining table', 'toilet', 'tv', 'laptop', 'mouse', 'remote', 'keyboard', 'cell phone', + 'microwave', 'oven', 'toaster', 'sink', 'refrigerator', 'book', 'clock', 'vase', 'scissors', 'teddy bear', + 'hair drier', 'toothbrush'] diff --git a/model/base_model/ascnedcl/det_utils.py b/model/base_model/ascnedcl/det_utils.py index f908a56..322c446 100644 --- a/model/base_model/ascnedcl/det_utils.py +++ b/model/base_model/ascnedcl/det_utils.py @@ -55,6 +55,7 @@ def letterbox(img, new_shape=(640, 640), color=(114, 114, 114), auto=False, scal return img, ratio, (dw, dh) + def non_max_suppression( prediction, conf_thres=0.25, diff --git a/model/base_model/ascnedcl/det_utils_v10.py b/model/base_model/ascnedcl/det_utils_v10.py new file mode 100644 index 0000000..b8f3fff --- /dev/null +++ b/model/base_model/ascnedcl/det_utils_v10.py @@ -0,0 +1,107 @@ +import cv2 +import numpy as np +from model.base_model.ascnedcl.classes import CLASSES + + +def letterbox(img, new_shape=(640, 640), auto=False, scaleFill=False, scaleup=True, center=True, stride=32): + # Resize and pad image while meeting stride-multiple constraints + shape = img.shape[:2] # current shape [height, width] + if isinstance(new_shape, int): + new_shape = (new_shape, new_shape) + + # Scale ratio (new / old) + r = min(new_shape[0] / shape[0], new_shape[1] / shape[1]) + if not scaleup: # only scale down, do not scale up (for better val mAP) + r = min(r, 1.0) + + # Compute padding + ratio = r, r # width, height ratios + new_unpad = int(round(shape[1] * r)), int(round(shape[0] * r)) + dw, dh = new_shape[1] - new_unpad[0], new_shape[0] - new_unpad[1] # wh padding + if auto: # minimum rectangle + dw, dh = np.mod(dw, stride), np.mod(dh, stride) # wh padding + elif scaleFill: # stretch + dw, dh = 0.0, 0.0 + new_unpad = (new_shape[1], new_shape[0]) + ratio = new_shape[1] / shape[1], new_shape[0] / shape[0] # width, height ratios + + if center: + dw /= 2 # divide padding into 2 sides + dh /= 2 + + if shape[::-1] != new_unpad: # resize + img = cv2.resize(img, new_unpad, interpolation=cv2.INTER_LINEAR) + top, bottom = int(round(dh - 0.1)) if center else 0, int(round(dh + 0.1)) + left, right = int(round(dw - 0.1)) if center else 0, int(round(dw + 0.1)) + img = cv2.copyMakeBorder( + img, top, bottom, left, right, cv2.BORDER_CONSTANT, value=(114, 114, 114) + ) # add border + + return img, ratio, dw, dh + +def non_max_suppression_v10(prediction,conf_thres,ratio,dw,dh): + result = [] + for i in range(prediction.shape[0]): + data = prediction[i] + # 读取类别置信度 + confidence = data[4] + # 用阈值进行过滤 + if confidence > conf_thres: + # 读取类别索引 + label = int(data[5]) + # 读取类坐标值,把坐标还原到原始图像 + xmin = int((data[0] - int(round(dw - 0.1))) / ratio[0]) + ymin = int((data[1] - int(round(dh - 0.1))) / ratio[1]) + xmax = int((data[2] - int(round(dw + 0.1))) / ratio[0]) + ymax = int((data[3] - int(round(dh + 0.1))) / ratio[1]) + result.append([xmin, ymin, xmax, ymax, confidence, label]) + return result + + +def draw_bbox_old(bbox, img0, color, wt): + det_result_str = '' + for idx, class_id in enumerate(bbox[:, 5]): + if float(bbox[idx][4] < float(0.05)): + continue + img0 = cv2.rectangle(img0, (int(bbox[idx][0]), int(bbox[idx][1])), (int(bbox[idx][2]), int(bbox[idx][3])), color, wt) + img0 = cv2.putText(img0, str(idx) + ' ' + CLASSES[int(class_id)], (int(bbox[idx][0]), int(bbox[idx][1] + 16)), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 1) + img0 = cv2.putText(img0, '{:.4f}'.format(bbox[idx][4]), (int(bbox[idx][0]), int(bbox[idx][1] + 32)), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 1) + det_result_str += '{} {} {} {} {} {}\n'.format(CLASSES[bbox[idx][5]], str(bbox[idx][4]), bbox[idx][0], bbox[idx][1], bbox[idx][2], bbox[idx][3]) + return img0 + +def draw_box(img, + box, # [xmin, ymin, xmax, ymax] + score, + class_id): + '''Draws a bounding box on the image''' + + # Retrieve the color for the class ID + color_palette = np.random.uniform(0, 255, size=(len(CLASSES), 3)) + color = color_palette[class_id] + + # Draw the bounding box on the image + cv2.rectangle(img, (box[0], box[1]), (box[2], box[3]), color, 2) + + # Create the label text with class name and score + label = f'{CLASSES[class_id]}: {score:.2f}' + + # Calculate the dimensions of the label text + (label_width, label_height), _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 1) + + # Calculate the position of the label text + label_x = box[0] + label_y = box[1] - 10 if box[1] - 10 > label_height else box[1] + 10 + + # Draw a filled rectangle as the background for the label text + cv2.rectangle( + img, + (int(label_x), int(label_y - label_height)), + (int(label_x + label_width), int(label_y + label_height)), + color, + cv2.FILLED, + ) + + # Draw the label text on the image + cv2.putText(img, label, (int(label_x), int(label_y)), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 0), 1, cv2.LINE_AA) + + return img \ No newline at end of file diff --git a/model/plugins/ModelBase.py b/model/plugins/ModelBase.py index d630258..f4ac02c 100644 --- a/model/plugins/ModelBase.py +++ b/model/plugins/ModelBase.py @@ -259,4 +259,12 @@ class ModelBase(ABC): :param isdraw: 是否需要绘制线框:0-不绘制,1-绘制 :return: detections,bwarn,warntext bwarn:0-没有识别到符合要求的目标,1-没有识别到符合要求的目标。 ''' + pass + + @abstractmethod + def prework(self,image): + pass + + @abstractmethod + def postwork(self,image,outputs,scale_ratio,pad_size,check_area,polygon,conf_threshold,iou_thres): pass \ No newline at end of file diff --git a/model/plugins/RYRQ_ACL/RYRQ_Model_ACL.py b/model/plugins/RYRQ_ACL/RYRQ_Model_ACL.py index ad78942..ae9f294 100644 --- a/model/plugins/RYRQ_ACL/RYRQ_Model_ACL.py +++ b/model/plugins/RYRQ_ACL/RYRQ_Model_ACL.py @@ -11,6 +11,8 @@ class Model(ModelBase): dirpath, filename = os.path.split(path) self.model_file = os.path.join(dirpath, "yolov5s_bs1.om") # 目前约束模型文件和py文件在同一目录 self.coco_file = os.path.join(dirpath, "coco_names.txt") + self.labels_dict = get_labels_from_txt(self.coco_file) # 得到类别信息,返回序号与类别对应的字典 + super().__init__(self.model_file) # acl环境初始化基类负责类的实例化 self.name = "人员入侵-yolov5" @@ -19,9 +21,72 @@ class Model(ModelBase): self.neth = 640 # 缩放的目标高度, 也即模型的输入高度 self.netw = 640 # 缩放的目标宽度, 也即模型的输入宽度 - self.conf_threshold = threshold # 置信度阈值 - self.iou_thres = iou_thres # IOU阈值 + # self.conf_threshold = threshold # 置信度阈值 + # self.iou_thres = iou_thres #IOU阈值 + + def prework(self,image): + '''模型输入图片数据前处理 --- 针对每个模型特有的预处理内容 -''' + img, scale_ratio, pad_size = letterbox(image, new_shape=[640, 640]) # 对图像进行缩放与填充 + img = img[:, :, ::-1].transpose(2, 0, 1) # BGR to RGB, HWC to CHW #图片在输入时已经做了转换 + img = np.ascontiguousarray(img, dtype=np.float32) / 255.0 # 转换为内存连续存储的数组 + return img,scale_ratio, pad_size + + def postwork(self,image,outputs,scale_ratio,pad_size,check_area,polygon,conf_threshold,iou_thres): + ''' + 针对每个模型特有的后处理内容 + :param image: + :param outputs: + :param scale_ratio: + :param pad_size: + :param check_area: + :param polygon: + :param conf_threshold: + :param iou_thres: + :return: + ''' + bwarn = False + warn_text = "" + # 是否有检测区域,有先绘制检测区域 由于在该函数生成了polygon对象,所有需要在检测区域前调用。 + if check_area == 1: + self.draw_polygon(image, polygon, (255, 0, 0)) + + if outputs: + output = outputs[0] # 只放了一张图片 -- #是否能批量验证? + # 后处理 -- boxout 是 tensor-list: [tensor([[],[].[]])] --[x1,y1,x2,y2,置信度,coco_index] + # 利用非极大值抑制处理模型输出,conf_thres 为置信度阈值,iou_thres 为iou阈值 + output_torch = torch.tensor(output) + boxout = nms(output_torch, conf_thres=conf_threshold, iou_thres=iou_thres) + del output_torch + pred_all = boxout[0].numpy() # 转换为numpy数组 -- [[],[],[]] --[x1,y1,x2,y2,置信度,coco_index] + # pred_all[:, :4] 取所有行的前4列,pred_all[:,1]--第一列 + scale_coords([640, 640], pred_all[:, :4], image.shape, ratio_pad=(scale_ratio, pad_size)) # 将推理结果缩放到原始图片大小 + + # 过滤掉不是目标标签的数据 -- 序号0-- person self.labels_dict --这个考虑下是否可以放到nms前面#? + filtered_pred_all = pred_all[pred_all[:, 5] == 0] + # 绘制检测结果 --- 也需要封装在类里, + for pred in filtered_pred_all: + x1, y1, x2, y2 = int(pred[0]), int(pred[1]), int(pred[2]), int(pred[3]) + # # 绘制目标识别的锚框 --已经在draw_bbox里处理 + # cv2.rectangle(image, (x1, y1), (x2, y2), (0, 255, 0), 2) + if check_area == 1: # 指定了检测区域 + x_center = (x1 + x2) / 2 + y_center = (y1 + y2) / 2 + # 绘制中心点? + cv2.circle(image, (int(x_center), int(y_center)), 5, (0, 0, 255), -1) + # 判断是否区域点 + if not self.is_point_in_region((x_center, y_center)): + continue # 没产生报警-继续 + # 产生报警 -- 有一个符合即可 + bwarn = True + warn_text = "People Intruder detected!" + draw_bbox(filtered_pred_all, image, (0, 255, 0), 2, self.labels_dict) # 画出检测框、类别、概率 + # 清理内存 + del outputs, output + del boxout + del pred_all, filtered_pred_all + #图片绘制是在原图生效 image + return bwarn, warn_text def verify(self,image,data,isdraw=1): labels_dict = get_labels_from_txt('/mnt/zfbox/model/plugins/RYRQ_ACL/coco_names.txt') # 得到类别信息,返回序号与类别对应的字典 @@ -36,7 +101,6 @@ class Model(ModelBase): filtered_pred_all = None bwarn = False warn_text = "" - # 是否有检测区域,有先绘制检测区域 由于在该函数生成了polygon对象,所有需要在检测区域前调用。 if data[1] == 1: self.draw_polygon(image, data[2], (255, 0, 0)) @@ -70,8 +134,8 @@ class Model(ModelBase): bwarn = True warn_text = "People Intruder detected!" draw_bbox(filtered_pred_all, image, (0, 255, 0), 2, labels_dict) # 画出检测框、类别、概率 - #清理内存 - del outputs,output + # 清理内存 + del outputs, output del boxout del pred_all,filtered_pred_all @@ -79,4 +143,4 @@ class Model(ModelBase): return bwarn, warn_text def testRun(self): - print("1111") \ No newline at end of file + print("I am RYRQ-Model-ACL!!!") \ No newline at end of file diff --git a/model/plugins/RYRQ_Model_ACL/RYRQ_Model_ACL.py b/model/plugins/RYRQ_Model_ACL/RYRQ_Model_ACL.py index d92b5c6..22ba6ed 100644 --- a/model/plugins/RYRQ_Model_ACL/RYRQ_Model_ACL.py +++ b/model/plugins/RYRQ_Model_ACL/RYRQ_Model_ACL.py @@ -1,6 +1,7 @@ import os.path from model.plugins.ModelBase import ModelBase -from model.base_model.ascnedcl.det_utils import get_labels_from_txt, letterbox, scale_coords, nms, draw_bbox # 模型前后处理相关函数 +#from model.base_model.ascnedcl.det_utils import get_labels_from_txt, letterbox, scale_coords, nms, draw_bbox # 模型前后处理相关函数 +from model.base_model.ascnedcl.det_utils_v10 import draw_box,draw_bbox_old, letterbox,non_max_suppression_v10 import cv2 import numpy as np import torch # 深度学习运算框架,此处主要用来处理数据 @@ -9,71 +10,127 @@ class Model(ModelBase): def __init__(self,path,threshold=0.5,iou_thres=0.5): # 找pt模型路径 -- 一个约束py文件和模型文件的路径关系需要固定, -- 上传模型时,要解压好路径 dirpath, filename = os.path.split(path) - self.model_file = os.path.join(dirpath, "yolov5s_bs1.om") # 目前约束模型文件和py文件在同一目录 + #self.model_file = os.path.join(dirpath, "yolov5s_bs1.om") # 目前约束模型文件和py文件在同一目录 + self.model_file = os.path.join(dirpath, "yolov10m_310B4.om") # 目前约束模型文件和py文件在同一目录 self.coco_file = os.path.join(dirpath, "coco_names.txt") + #self.labels_dict = get_labels_from_txt(self.coco_file) # 得到类别信息,返回序号与类别对应的字典 + super().__init__(self.model_file) # acl环境初始化基类负责类的实例化 - self.name = "人员入侵-yolov5" + self.name = "人员入侵-yolov10" self.version = "V1.0" self.model_type = 2 self.neth = 640 # 缩放的目标高度, 也即模型的输入高度 self.netw = 640 # 缩放的目标宽度, 也即模型的输入宽度 + self.conf_threshold = threshold # 置信度阈值 self.iou_thres = iou_thres #IOU阈值 - - def verify(self,image,data,isdraw=1): - labels_dict = get_labels_from_txt('/mnt/zfbox/model/plugins/RYRQ_ACL/coco_names.txt') # 得到类别信息,返回序号与类别对应的字典 - # 数据前处理 + def prework(self,image): + '''模型输入图片数据前处理 --- 针对每个模型特有的预处理内容 -''' img, scale_ratio, pad_size = letterbox(image, new_shape=[640, 640]) # 对图像进行缩放与填充 img = img[:, :, ::-1].transpose(2, 0, 1) # BGR to RGB, HWC to CHW #图片在输入时已经做了转换 img = np.ascontiguousarray(img, dtype=np.float32) / 255.0 # 转换为内存连续存储的数组 + return img,scale_ratio, pad_size - # 模型推理, 得到模型输出 - outputs = self.execute([img,])#创建input,执行模型,返回结果 --失败返回None - - filtered_pred_all = None + def postwork(self,image,outputs,scale_ratio,pad_size,check_area,polygon,conf_threshold,iou_thres): + ''' + 针对每个模型特有的后处理内容 + :param image: + :param outputs: + :param scale_ratio: + :param pad_size: + :param check_area: + :param polygon: + :param conf_threshold: + :param iou_thres: + :return: + ''' bwarn = False warn_text = "" # 是否有检测区域,有先绘制检测区域 由于在该函数生成了polygon对象,所有需要在检测区域前调用。 - if data[1] == 1: - self.draw_polygon(image, data[2], (255, 0, 0)) + if check_area == 1: + self.draw_polygon(image, polygon, (255, 0, 0)) if outputs: - output = outputs[0] #只放了一张图片 + output = outputs[0] # 只放了一张图片 -- #是否能批量验证? # 后处理 -- boxout 是 tensor-list: [tensor([[],[].[]])] --[x1,y1,x2,y2,置信度,coco_index] # 利用非极大值抑制处理模型输出,conf_thres 为置信度阈值,iou_thres 为iou阈值 output_torch = torch.tensor(output) - boxout = nms(output_torch, conf_thres=0.4,iou_thres=0.5) + boxout = nms(output_torch, conf_thres=conf_threshold, iou_thres=iou_thres) del output_torch pred_all = boxout[0].numpy() # 转换为numpy数组 -- [[],[],[]] --[x1,y1,x2,y2,置信度,coco_index] # pred_all[:, :4] 取所有行的前4列,pred_all[:,1]--第一列 scale_coords([640, 640], pred_all[:, :4], image.shape, ratio_pad=(scale_ratio, pad_size)) # 将推理结果缩放到原始图片大小 - #过滤掉不是目标标签的数据 -- 序号0-- person + + # 过滤掉不是目标标签的数据 -- 序号0-- person self.labels_dict --这个考虑下是否可以放到nms前面#? filtered_pred_all = pred_all[pred_all[:, 5] == 0] # 绘制检测结果 --- 也需要封装在类里, for pred in filtered_pred_all: x1, y1, x2, y2 = int(pred[0]), int(pred[1]), int(pred[2]), int(pred[3]) # # 绘制目标识别的锚框 --已经在draw_bbox里处理 # cv2.rectangle(image, (x1, y1), (x2, y2), (0, 255, 0), 2) - if data[1] == 1: # 指定了检测区域 + if check_area == 1: # 指定了检测区域 x_center = (x1 + x2) / 2 y_center = (y1 + y2) / 2 - #绘制中心点? + # 绘制中心点? cv2.circle(image, (int(x_center), int(y_center)), 5, (0, 0, 255), -1) - #判断是否区域点 + # 判断是否区域点 if not self.is_point_in_region((x_center, y_center)): - continue #没产生报警-继续 - #产生报警 -- 有一个符合即可 + continue # 没产生报警-继续 + # 产生报警 -- 有一个符合即可 bwarn = True warn_text = "People Intruder detected!" - draw_bbox(filtered_pred_all, image, (0, 255, 0), 2, labels_dict) # 画出检测框、类别、概率 + draw_bbox(filtered_pred_all, image, (0, 255, 0), 2, self.labels_dict) # 画出检测框、类别、概率 # 清理内存 del outputs, output del boxout - del pred_all,filtered_pred_all + del pred_all, filtered_pred_all + #图片绘制是在原图生效 image + return bwarn, warn_text + + def verify(self,image,data,isdraw=1): + # 数据前处理 + img, scale_ratio, dw,dh = letterbox(image, new_shape=[self.netw, self.neth]) # 对图像进行缩放与填充 + img = img[:, :, ::-1].transpose(2, 0, 1) # BGR to RGB, HWC to CHW #图片在输入时已经做了转换 + img = np.ascontiguousarray(img, dtype=np.float32) / 255.0 # 转换为内存连续存储的数组 + + # 模型推理, 得到模型输出 + outputs = self.execute([img,])#创建input,执行模型,返回结果 --失败返回None + + filtered_pred_all = None + bwarn = False + warn_text = "" + # 是否有检测区域,有先绘制检测区域 由于在该函数生成了polygon对象,所有需要在检测区域前调用。 + if data[1] == 1: + self.draw_polygon(image, data[2], (255, 0, 0)) + if outputs: + output = np.squeeze(outputs[0]) #移除张量为1的维度 --暂时不明白其具体意义 + pred_all = non_max_suppression_v10(output,self.conf_threshold,scale_ratio,dw,dh) + + for xmin, ymin, xmax, ymax, confidence, label in pred_all: + # # 绘制目标识别的锚框 --已经在draw_bbox里处理 + # cv2.rectangle(image, (x1, y1), (x2, y2), (0, 255, 0), 2) + draw_box(image, [xmin, ymin, xmax, ymax], confidence, label) # 画出检测框、类别、概率 + if label == 0: # person + #判断是否产生告警 + x1, y1, x2, y2 = int(xmin), int(ymin), int(xmax), int(ymax) + if data[1] == 1: # 指定了检测区域 + x_center = (x1 + x2) / 2 + y_center = (y1 + y2) / 2 + # 绘制中心点? + cv2.circle(image, (int(x_center), int(y_center)), 5, (0, 0, 255), -1) + # 判断是否区域点 + if not self.is_point_in_region((x_center, y_center)): + continue # 没产生报警-继续 + # 产生报警 -- 有一个符合即可 + bwarn = True + warn_text = "People Intruder detected!" + # 清理内存 + del outputs, output + del pred_all,filtered_pred_all #cv2.imwrite('img_res.png', img_dw) return bwarn, warn_text diff --git a/myutils/MyDeque.py b/myutils/MyDeque.py index 9c65172..2b53825 100644 --- a/myutils/MyDeque.py +++ b/myutils/MyDeque.py @@ -3,9 +3,18 @@ from threading import Thread, Lock class MyDeque: def __init__(self,maxlen=1): + self.len = maxlen self.dq = deque(maxlen=maxlen) self.lock = Lock() + def __del__(self): + del self.dq + + def isfull(self): + if len(self.dq) == self.len: + return True + return False + def myappend(self,object): with self.lock: self.dq.append(object) @@ -15,4 +24,8 @@ class MyDeque: with self.lock: if self.dq: object = self.dq.popleft() - return object \ No newline at end of file + return object + + def myclear(self): + with self.lock: + self.dq.clear() \ No newline at end of file diff --git a/myutils/myutil.py b/myutils/myutil.py new file mode 100644 index 0000000..e69de29 diff --git a/web/API/viedo.py b/web/API/viedo.py index b9bc518..53c97af 100644 --- a/web/API/viedo.py +++ b/web/API/viedo.py @@ -7,6 +7,8 @@ from myutils.ConfigManager import myCongif import logging import time import subprocess +from concurrent.futures import ThreadPoolExecutor +import threading from collections import deque # 配置日志 @@ -15,6 +17,9 @@ logger = logging.getLogger(__name__) #-------------------基于WEBRTC实现拉流 #pcs = set() #创建一个空的集合,去重复且无序 pcs = {} +active_tasks = {} # 用来存储每个channel的任务 +executor = ThreadPoolExecutor(max_workers=4) + ''' ---------------------传输-------------------------- @@ -139,6 +144,9 @@ async def handle_channel_rtf(channel_id,websocket): finally: process.terminate() +def send_frame_thread(channel_id,websocket): + pass + async def handle_channel(channel_id,websocket): verify_rate = int(myCongif.get_data("verify_rate")) error_max_count = verify_rate * int(myCongif.get_data("video_error_count")) # 视频帧捕获失败触发提示的上限 @@ -150,6 +158,16 @@ async def handle_channel(channel_id,websocket): #视频传输缓冲区 #frame_buffer = deque(maxlen=10) try: + cnode = mMM.verify_list.get_channel_data(channel_id) + if cnode is None: + print("---channel_id--错误--") + return + + frame_count = 0 + start_time = time.time() + all_time = 0 + get_all_time = 0 + send_all_time = 0 while True: # 这里的多线程并发,还需要验证检查 # 帧率控制帧率 ---输出暂时不控制帧率,有就传输 current_time = time.time() @@ -158,11 +176,20 @@ async def handle_channel(channel_id,websocket): await asyncio.sleep(frame_interval - elapsed_time) # 若小于间隔时间则休眠 last_frame_time = time.time() # 执行视频传输 - frame = mMM.verify_list.cm_get_last_frame(channel_id) # get_last_frame 用了try + #frame = mMM.verify_list.cm_get_last_frame(channel_id) # get_last_frame 用了try + get_stime = time.time() + frame = cnode.get_last_frame() + get_etime = time.time() + get_all_time = get_all_time + (get_etime - get_stime) if frame is not None: # frame_buffer.append(frame) #先放入缓冲区 + #7print("KB---",len(frame)/1024) icount = 0 + + send_stime = time.time() await websocket.send(frame) + send_etime = time.time() + send_all_time = send_all_time + (send_etime - send_stime) else: # print("frame is None") icount += 1 @@ -172,6 +199,26 @@ async def handle_channel(channel_id,websocket): error_message = b"video_error" await websocket.send(error_message) await asyncio.sleep(sleep_time) # 等待视频重连时间 + + #----------输出时间----------- + frame_count += 1 + end_time = time.time() + # 计算时间差 + el_time = end_time - start_time + all_time = all_time + (end_time - current_time) + # 每隔一定时间(比如5秒)计算一次帧率 + if el_time >= 10: + fps = frame_count / el_time + print(f"当前帧率: {fps} FPS,循环次数:{frame_count},花费总耗时:{all_time}S,get耗时:{get_all_time},send耗时:{send_all_time}") + # 重置计数器和时间 + frame_count = 0 + all_time = 0 + get_all_time = 0 + send_all_time = 0 + start_time = time.time() + # print(f"get_frame:{round(get_etime-get_stime,5)}Sceond;" + # f"send_frame:{round(send_etime-send_stime,5)}Sceond;" + # f"All_time={round(end_time-current_time,5)}") except asyncio.CancelledError: print(f"WebSocket connection for channel {channel_id} closed by client") raise @@ -184,12 +231,29 @@ async def handle_channel(channel_id,websocket): @api.websocket('/ws/video_feed/') async def ws_video_feed(channel_id): print(f"New connection for channel: {channel_id}") - # 为每个通道创建独立的协程 - shendtask = asyncio.create_task(handle_channel(channel_id, websocket)) - #shendtask = asyncio.create_task(handle_channel_rtf(channel_id, websocket)) - # 等待协程完成 - await shendtask + if channel_id in active_tasks: + active_tasks[channel_id].cancel() + try: + await active_tasks[channel_id] # 确保旧任务被完全取消 + del active_tasks[channel_id] + except asyncio.CancelledError: + print(f"旧任务 {channel_id} 已取消") + try: + # 为每个通道创建独立的协程 + shendtask = asyncio.create_task(handle_channel(channel_id, websocket)) + #shendtask = asyncio.create_task(handle_channel_rtf(channel_id, websocket)) + # 将任务存储到 active_tasks + active_tasks[channel_id] = shendtask + # 等待协程完成 + await shendtask + except Exception as e: + print(f"Channel {channel_id} 出现异常: {e}") + finally: + # 移除已完成的任务 + if channel_id in active_tasks: + del active_tasks[channel_id] + print(f"Cleaning up resources for channel {channel_id}") @api.route('/shutdown', methods=['POST']) async def shutdown():#这是全关 --需要修改