# 导入代码依赖 import time import av import os import cv2 import torch import numpy as np import threading import importlib.util from core.DBManager import mDBM,DBManager from myutils.MyLogger_logger import LogHandler from myutils.ConfigManager import myCongif class VideoCaptureWithFPS: def __init__(self, src=0, target_fps=10): self.cap = cv2.VideoCapture(src) self.target_fps = target_fps self.frame_interval = 1.0 / target_fps self.last_frame_time = time.time() def read(self): current_time = time.time() elapsed_time = current_time - self.last_frame_time if elapsed_time < self.frame_interval: #小于间隔时间会休眠 time.sleep(self.frame_interval - elapsed_time) self.last_frame_time = time.time() ret, frame = self.cap.read() return ret, frame def release(self): self.cap.release() self.cap = None class ModelManager: def __init__(self): self.verify_list = {} self.bRun = True self.logger = LogHandler().get_logger("ModelManager") # 本地YOLOv5仓库路径 self.yolov5_path = myCongif.get_data("yolov5_path") self.buflen = myCongif.get_data("buffer_len") def _open_view(self,url,itype): #打开摄像头 0--USB摄像头,1-RTSP,2-海康SDK if itype == 0: cap = VideoCaptureWithFPS(int(url)) elif itype == 1: cap = VideoCaptureWithFPS(url) else: raise Exception("视频参数错误!") return cap def _import_model(self,model_name,model_path): '''根据路径,动态导入模块''' if os.path.exists(model_path): module_spec = importlib.util.spec_from_file_location(model_name, model_path) module = importlib.util.module_from_spec(module_spec) module_spec.loader.exec_module(module) else: self.logger.error("{}文件不存在".format(model_path)) return None return module def dowork_thread(self,channel_id): '''一个通道一个线程,关联的模型在一个线程检测''' cap = None #查询关联的模型 --- 在循环运行前把基础数据都准备好 myDBM = DBManager() myDBM.connection() strsql = (f"select t1.model_id,t1.check_area,t1.polygon ,t2.duration_time,t2.proportion,t2.model_path " f"from channel2model t1 left join model t2 on t1.model_id = t2.ID where t1.channel_id ={channel_id};") myModels = myDBM.do_select(strsql) #加载模型 --- 是不是要做个限制,一个视频通道关联算法模块的上限 --- 关联多了一个线程执行耗时较多,造成帧率太低,或者再多线程并发 #? myModle_list = [] #存放模型对象List myModle_data = [] #存放检测参数 for model in myModels: #基于基类实例化模块类 m = self._import_model("",model[5]) #动态加载模型 -- 待完善 myModle_list.append(m) myModle_data.append(model) #开始循环检测 #print(mydata[0],mydata[1],mydata[2],mydata[3]) # url type tag img_buffer #[url,type,True,img_buffer] while self.verify_list[channel_id][2]: #基于tag 作为运行标识。 线程里只是读,住线程更新,最多晚一轮,应该不用线程锁。需验证 if not cap: #还没连接视频源 try: cap = self._open_view(self.verify_list[channel_id][0],self.verify_list[channel_id][1]) except: self.logger.error("参数错误,终止线程") return ret,frame = cap.read() if not ret: self.logger.warning("view disconnected. Reconnecting...") cap.release() cap = None time.sleep(myCongif.get_data("cap_sleep_time")) continue #没读到画面继续 #图像处理 img = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) # img = frame.to_ndarray(format="bgr24") #检查布防时间是否在布防时间内 # 使用 模型 进行目标检测 i_warn_count = 0 for i in range(len(myModle_list)):#如果比较多可以考虑做线程进行检测 model = myModle_list[i] data = myModle_data[i] # 进行模型检测 detections,bwarn,warntext = model.verify(img,data) #对识别结果要部要进行处理 if bwarn: # 整个识别有产生报警 # 绘制报警文本 cv2.putText(img, 'Intruder detected!', (50, (i_warn_count+1)*50), cv2.FONT_HERSHEY_SIMPLEX, 1,(0, 0, 255), 2) i_warn_count += 1 # 保存报警信息? --- 待完成 self.save_warn() # 推送报警? --- 待完成 self.send_warn() # 将检测结果图像转换为帧 -- 需要确认前面对img的处理都是累加的。 new_frame = av.VideoFrame.from_ndarray(img, format="rgb24") # 分析图片放入内存中 --- #new_frame = cv2.resize(new_frame, (640, 480)) # 降低视频分辨率 self.verify_list[channel_id][3].append(new_frame) if len(self.verify_list[channel_id]) > self.buflen: # 保持缓冲区大小不超过10帧 self.verify_list[channel_id][3].pop(0) self.logger.debug("drop one frame!") def start_work(self,channel_id=0): '''算法模型是在后台根据画面实时分析的''' if channel_id ==0: strsql = "select id,ulr,type from channel where is_work = 1;" #要考虑布防和撤防开关的的调整 else: strsql = f"select id,ulr,type from channel where is_work = 1 and id = {channel_id};" #单通道启动检测线程 datas = mDBM.do_select(strsql) for data in datas: img_buffer = [] run_data = [data[1],data[2],True,img_buffer] self.verify_list[data[0]] = run_data #需要验证重复情况 th_chn = threading.Thread(target=self.dowork_thread, args=(data[0],)) #一个视频通道一个线程,线程句柄暂时部保留 th_chn.start() def stop_work(self,channel_id=0): if channel_id ==0: #所有线程停止 for data in self.verify_list: data[2] = False del data[3] time.sleep(2) self.verify_list.clear() else: data = self.verify_list[channel_id] data[2] = False del data[3] time.sleep(1) del self.verify_list[channel_id] if __name__ == "__main__": ModelManager().start_work()