# Dependency imports
import time
import av
import os
import cv2
import numpy as np
import threading
import importlib.util
import datetime
import math
import copy
import queue
from collections import deque
from core.DBManager import mDBM, DBManager
from myutils.MyLogger_logger import LogHandler
from myutils.ConfigManager import myCongif
from model.plugins.ModelBase import ModelBase
from core.ChannelManager import ChannelManager
from core.ACLModelManager import ACLModeManger
from core.WarnManager import WarnManager, WarnData
from PIL import Image


class VideoCaptureWithFPS:
    """Threaded video-capture wrapper — one instance per channel.

    A background thread continuously decodes the stream and keeps only the
    latest frame (no queue build-up); readers get a copy under a lock.
    The thread reconnects automatically after repeated read failures.
    """

    def __init__(self, source):
        self.source = source
        self.width = None
        self.height = None
        # Safe default so update() never touches an unset attribute when the
        # initial open fails (FIX: original only set fps inside isOpened()).
        self.fps = 1
        self.cap = self._open_capture()
        if self.cap.isOpened():
            # If opening failed, update() recovers via the reconnect path;
            # memory release on that path still needs review.
            self._read_stream_params()
        self.running = True
        self.frame = None  # latest decoded frame, guarded by read_lock
        self.read_lock = threading.Lock()
        self.thread = threading.Thread(target=self.update)
        self.thread.start()

    def _open_capture(self):
        """Open the source through a GStreamer pipeline.

        FIX: the reconnect path previously reopened with a plain
        cv2.VideoCapture(self.source); both paths now share this helper so
        the transport stays consistent after a stream drop.
        """
        pipeline = (
            f"rtspsrc location={self.source} latency=0 ! "
            "rtph264depay ! "
            "h264parse ! "
            "avdec_h264 ! "  # software H.264 decode instead of other decoders
            "videoscale ! "
            "video/x-raw,width=640,height=480,framerate=10/1 ! "  # reduce resolution and frame rate
            "videoconvert ! "
            "appsink"
        )
        return cv2.VideoCapture(pipeline, cv2.CAP_GSTREAMER)
        # plain OpenCV alternative:
        # return cv2.VideoCapture(self.source)

    def _read_stream_params(self):
        """Cache stream geometry and the frame-skip count from the open capture."""
        self.width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        self.height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        print(self.width, self.height)
        # Frames to skip per verified frame: too high a rate loads the CPU,
        # too low a rate lets frames pile up. Rounded up.
        self.fps = math.ceil(self.cap.get(cv2.CAP_PROP_FPS) / float(myCongif.get_data("verify_rate")))

    def update(self):
        """Capture loop: refresh self.frame, skip surplus frames, reconnect on failure."""
        icount = 0
        while self.running:
            ret, frame = self.cap.read()
            if not ret:
                icount += 1
                if icount > 5:  # too many consecutive failures — reconnect
                    self.cap.release()
                    self.cap = self._open_capture()
                    if self.cap.isOpened():
                        self._read_stream_params()
                        icount = 0
                    else:
                        sleep_time = myCongif.get_data("cap_sleep_time")
                        print(f"{self.source}视频流,将于{sleep_time}秒后重连!")
                        time.sleep(sleep_time)
                continue
            with self.read_lock:
                self.frame = frame
            # Skip a batch of frames to avoid backlog between verified frames.
            for _ in range(self.fps):
                self.cap.grab()

    def read(self):
        """Return (True, copy-of-latest-frame), or (False, None) before the first frame."""
        with self.read_lock:
            frame = self.frame.copy() if self.frame is not None else None
        if frame is not None:
            return True, frame
        return False, None

    def release(self):
        """Stop the capture thread and free the underlying capture."""
        self.running = False
        self.thread.join()
        self.cap.release()


class ModelManager:
    def __init__(self):
        self.verify_list = ChannelManager()  # per-channel model data (class-managed since 2024-7-5)
        self.bRun = True
        self.logger = LogHandler().get_logger("ModelManager")
        # local YOLOv5 repository path
        self.yolov5_path = myCongif.get_data("yolov5_path")
        # self.buflen = myCongif.get_data("buffer_len")
        self.icout_max = myCongif.get_data("RESET_INTERVAL")  # shares the frame-sequence variable
        self.frame_rate = myCongif.get_data("frame_rate")
        self.frame_interval = 1.0 / int(myCongif.get_data("verify_rate"))
        # video-recording settings
        self.FPS = myCongif.get_data("verify_rate")  # recording frame rate — dynamic rate TBD
        self.fourcc = cv2.VideoWriter_fourcc(*'mp4v')  # mp4 encoding
        # platform-specific initialisation
        self.model_platform = myCongif.get_data("model_platform")
        self.device_id = myCongif.get_data("device_id")
        # ACL init — one per thread; needs verification
        if self.model_platform == "acl":
            ACLModeManger.init_acl(self.device_id)  # process-wide ACL init
        self.model_dic = {}  # model_id -> model
        # warn-handling thread — a single thread for the whole process
self.warnM = None def __del__(self): self.logger.debug("释放资源") del self.verify_list #应该需要深入的删除--待完善 if self.model_platform == "acl": ACLModeManger.del_acl(self.device_id) #acl -- 全程序反初始化 需要确保在执行析构前,其它资源已释放 def _open_view(self,url,itype): #打开摄像头 0--USB摄像头,1-RTSP,2-海康SDK if itype == 0: cap = VideoCaptureWithFPS(int(url)) elif itype == 1: cap = VideoCaptureWithFPS(url) else: raise Exception("视频参数错误!") return cap def _import_model(self,model_name,model_path,threshold): ''' 根据路径,动态导入模块 :param model_name: 模块名称 :param model_path: 模块路径 :param threshold: 置信阈值 :return: ''' if os.path.exists(model_path): module_spec = importlib.util.spec_from_file_location(model_name, model_path) if module_spec is None: self.logger.error(f"{model_path} 加载错误") return None module = importlib.util.module_from_spec(module_spec) module_spec.loader.exec_module(module) md = getattr(module, "Model")(model_path,threshold) #实例化类 if not isinstance(md, ModelBase): self.logger.error("{} not zf_model".format(md)) return None if md.init_ok == False: self.logger.error("离线模型加载初始化失败!") return None else: self.logger.error("{}文件不存在".format(model_path)) return None self.logger.debug(f"{model_path} 加载成功!!!!") return md def getschedule(self,c2m_id,myDBM): ''' 根据c2mID 查询该算法的布防时间 :param c2m_id: :return: 以day为行,hour作为列的,布防标识值二维list ''' strsql = f"select day,hour,status from schedule where channel2model_id ={c2m_id} order by hour asc,day asc;" datas = myDBM.do_select(strsql) onelist = [] twolist = [] threelist = [] fourlist = [] fivelist = [] sixlist = [] sevenlist = [] if datas: for i in range(24): onelist.append(datas[i*7 + 0][2]) twolist.append(datas[i*7 + 1][2]) threelist.append(datas[i*7 + 2][2]) fourlist.append(datas[i*7 + 3][2]) fivelist.append(datas[i*7 + 4][2]) sixlist.append(datas[i*7 + 5][2]) sevenlist.append(datas[i*7 + 6][2]) else: self.logger.debug(f"没有数据--{c2m_id}") onelist = [1]*24 twolist = [1]*24 threelist = [1]*24 fourlist = [1]*24 fivelist = [1]*24 sixlist = [1]*24 sevenlist = [1]*24 schedule_list = 
[] schedule_list.append(onelist) schedule_list.append(twolist) schedule_list.append(threelist) schedule_list.append(fourlist) schedule_list.append(fivelist) schedule_list.append(sixlist) schedule_list.append(sevenlist) return schedule_list def set_last_img(self,): pass def verify(self,frame,model,model_data,channel_id,schedule,result,isdraw=1): '''验证执行主函数,实现遍历通道关联的模型,调用对应模型执行验证''' #img = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) img = frame #验证检测计划,是否在布防时间内 now = datetime.datetime.now() # 获取当前日期和时间 weekday = now.weekday() # 获取星期几,星期一是0,星期天是6 hour = now.hour result.pop(0) # 保障结果数组定长 --先把最早的结果推出数组 detections = None bwarn = False warntext = "" if model and schedule[weekday][hour] == 1: #不在计划则不进行验证,直接返回图片 # 调用模型,进行检测,model是动态加载的,具体的判断标准由模型内执行 ---- ********* #isverify = True detections, bwarn, warntext = model.verify(img, model_data,isdraw) #****************重要 # 对识别结果要部要进行处理 if bwarn: # 绘制报警文本 cv2.putText(img, warntext, (50, 50), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2) result.append(1) #要验证数组修改,是地址修改吗? 
else: #没有产生报警也需要记录,统一计算占比 warntext = "" result.append(0) else: result.append(0) # 将检测结果图像转换为帧--暂时用不到AVFrame--2024-7-5 # new_frame_rgb_avframe = av.VideoFrame.from_ndarray(img, format="rgb24") # AVFrame # new_frame_rgb_avframe.pts = None # 添加此行确保有pts属性 # if isinstance(img, np.ndarray): -- 留个纪念 #处理完的图片后返回-bgr模式 #img_bgr_ndarray = cv2.cvtColor(img, cv2.COLOR_RGB2BGR) # 将检查结果转换为WebP格式图片 --在线程里面完成应该可以减少网页端处理时间 ret,frame_bgr_webp=cv2.imencode('.jpg', img) if not ret: buffer_bgr_webp = None else: buffer_bgr_webp = frame_bgr_webp.tobytes() return buffer_bgr_webp,img,warntext def dowork_thread(self,channel_id): '''一个通道一个线程,关联的模型在一个线程检测,局部变量都是一个通道独有''' channel_data = self.verify_list.get_channel(channel_id) #是对ChannelData 对象的引用 context = None # 线程ACL初始化 if self.model_platform == "acl": # ACL线程中初始化内容 context = ACLModeManger.th_inti_acl(self.device_id) #查询关联的模型 --- 在循环运行前把基础数据都准备好 myDBM = DBManager() myDBM.connect() strsql = (f"select t1.model_id,t1.check_area,t1.polygon ,t2.duration_time,t2.proportion,t2.model_path,t1.ID," f"t2.model_name,t1.conf_threshold " f"from channel2model t1 left join model t2 on t1.model_id = t2.ID where t1.channel_id ={channel_id};") #print(strsql) model = myDBM.do_select(strsql,1) #2024-7-12调整规则,一个通道只关联一个模型,表结构暂时不动 if len(model) ==0: print(f"{channel_id}视频通道没有关联模型,结束线程!") return #基于基类实例化模块类 m = self._import_model("",model[5],model[8]) #动态加载模型处理文件py schedule = self.getschedule(model[6], myDBM) result = [0 for _ in range(model[3] * myCongif.get_data("verify_rate"))] # 初始化时间*验证帧率数量的结果list #model[6] -- c2m_id --布防计划 0-周一,6-周日 warn_last_time = time.time() proportion = model[4] #判断是否报警的占比 warn_save_count = 0 #保存录像的最新帧初始化为0 #开始拉取画面循环检测 cap = channel_data.cap last_frame_time = time.time() #初始化个读帧时间 #可以释放数据库资源 del myDBM warn_interval = myCongif.get_data("warn_interval") while channel_data.bool_run: #基于tag 作为运行标识。 线程里只是读,住线程更新,最多晚一轮,应该不用线程锁。需验证 # 帧率控制帧率 current_time = time.time() elapsed_time = current_time - last_frame_time if elapsed_time < 
self.frame_interval: time.sleep(self.frame_interval - elapsed_time) #若小于间隔时间则休眠 last_frame_time = time.time() #*********取画面************* ret,frame = cap.read() #除了第一帧,其它应该都是有画面的 if not ret: continue #没读到画面继续 #执行图片推理 buffer_bgr_webp,img_bgr_ndarray,warn_text = self.verify(frame,m,model,channel_id,schedule,result) #分析图片放入内存中 channel_data.add_deque(img_bgr_ndarray) # 缓冲区大小由maxlen控制 超上限后,删除最前的数据 #channel_data.increment_counter() #帧序列加一 # 一直更新最新帧,提供网页端显示 channel_data.update_last_frame(buffer_bgr_webp) #print(f"{channel_id}--Frame updated at:",time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())) if warn_text: #验证result -是否触发报警要求 --遍历每个模型执行的result count_one = float(sum(result)) #1,0 把1累加的和就是1的数量 ratio_of_ones = count_one / len(result) #self.logger.debug(result) if ratio_of_ones >= proportion: #触发报警 # 基于时间间隔判断 current_time = time.time() elapsed_time = current_time - warn_last_time if elapsed_time < warn_interval: continue warn_last_time = current_time # 处理报警 warn_data = WarnData() warn_data.model_name = model[7] warn_data.warn_text = warn_text warn_data.img_buffer = channel_data.copy_deque() # 深度复制缓冲区 warn_data.width = cap.width warn_data.height = cap.height warn_data.channel_id = channel_id self.warnM.add_warn_data(warn_data) # #更新帧序列号 #加了报警间隔 --buffer的长度小于间隔 # warn_save_count = buffer_count #结果记录要清空 for i in range(len(result)): result[i] = 0 # end_time = time.time() # 结束时间 # print(f"Processing time: {end_time - start_time} seconds") # 本地显示---测试使用 # if channel_id == 2: # cv2.imshow(str(channel_id), img) # if cv2.waitKey(1) & 0xFF == ord('q'): # break #结束线程 cap.release() #视频采集线程结束 if context:#ACL线程中反初始化内容 -- 若线程异常退出,这些资源就不能正常释放了 #先释放每个模型资源 del model #再释放context ACLModeManger.th_del_acl(context) #cv2.destroyAllWindows() def send_warn(self): '''发送报警信息''' pass def save_frame_to_video(self): '''把缓冲区中的画面保存为录像''' pass def start_work(self,channel_id=0): '''算法模型是在后台根据画面实时分析的 1.布防开关需要触发通道关闭和开启 2.布防策略的调整也需要关闭和重启工作 ''' if channel_id ==0: strsql = "select id,ulr,type from channel 
where is_work = 1;" #执行所有通道 else: strsql = f"select id,ulr,type from channel where is_work = 1 and id = {channel_id};" #单通道启动检测线程 datas = mDBM.do_select(strsql) for data in datas: # channel_id, str_url, int_type, bool_run, deque_length c_data = self.verify_list.add_channel(data[0],data[1],data[2],True, myCongif.get_data("buffer_len"),myCongif.get_data("RESET_INTERVAL")) # 启动该通道的视频捕获线程 --把视频捕获线程,放主线程创建 c_data.cap = self._open_view(c_data.str_url, c_data.int_type) # 创建子线程读画面-把cap给模型就行-- th_chn = threading.Thread(target=self.dowork_thread, args=(data[0],)) #一个视频通道一个线程,线程句柄暂时部保留 th_chn.start() # 启动告警线程 if self.warnM is None: self.warnM = WarnManager() self.warnM.start_warnmanager_th() def stop_work(self,channel_id=0): '''停止工作线程,0-停止所有,非0停止对应通道ID的线程''' self.verify_list.stop_channel(channel_id) if channel_id == 0: #停止告警线程 self.warnM.brun = False def restartC2M(self,channel_id): ''' 修改通道管理的算法模型后需要对该通道算法执行部分重新加载执行 :param channel_id: :return: ''' pass #print(f"Current working directory (ModelManager.py): {os.getcwd()}") mMM = ModelManager() def test1(): print(cv2.getBuildInformation()) source = 'rtsp://192.168.3.44/live1' gstreamer_pipeline = ( f"rtspsrc location={source} protocols=udp latency=0 ! " "rtph264depay ! h264parse ! avdec_h264 ! videoconvert ! appsink" ) cap = cv2.VideoCapture(gstreamer_pipeline, cv2.CAP_GSTREAMER) if not cap.isOpened(): print("Error: Unable to open the video source.") return else: print("Successfully opened the video source.") ret, frame = cap.read() if ret: cv2.imshow('Frame', frame) cv2.waitKey(0) cap.release() cv2.destroyAllWindows() if __name__ == "__main__": mMM.start_work() #test1() print("111") # name = acl.get_soc_name() # count, ret = acl.rt.get_device_count() # print(name,count)