import threading from collections import deque import numpy as np import time import copy import queue import cv2 from datetime import datetime, timedelta from myutils.ConfigManager import myCongif from core.WarnManager import WarnData from myutils.MyLogger_logger import LogHandler #--2024-7-12调整规则,一个视频通道只允许配置一个算法模型,一个算法模型可以配置给多路通道 class ChannelData: def __init__(self, channel_id,str_url, int_type, bool_run, deque_length,icount_max): self.channel_id = channel_id self.str_url = str_url #视频源地址 self.int_type = int_type #视频源类型,0-usb,1-rtsp,2-hksdk self.bool_run = bool_run #线程运行标识 self.bModel = False #该通道是否有模型标识 self.frame_interval = 1.0 / int(myCongif.get_data("verify_rate")) self.mylogger = LogHandler().get_logger("ChannelData") #通道-模型推理输入数据 self.cap = None self.in_check_area = None self.in_polygon = None self.last_frame_time = time.time() self.preimg_q = queue.Queue(maxsize=myCongif.get_data("q_size")) #这是预处理后的图片--模型推理使用 self.img_q = queue.Queue(maxsize=myCongif.get_data("q_size")) # 原图-后处理使用 #图片的缩放比例 self.cale_ratio = None self.pad_size = None self.schedule = None # 布放计划 self.result = None # 结果记录 self.proportion = None #报警占比 self.warn_save_count = 0 #保存录像的最新帧初始化为0 self.model_name = None #关联模型的名字 self.warn_last_time = time.time() #通道-模型推理数据 self.last_infer_time = time.time() self.infer_img_q = queue.Queue(maxsize=myCongif.get_data("q_size")) # 原图-后处理使用 #通道-模型推理输出数据 self.last_out_time = time.time() self.output_q = queue.Queue(maxsize=myCongif.get_data("q_size")) self.deque_frame = deque(maxlen=deque_length) #推理后的缓冲区数据 self.counter = 0 # 帧序列号--保存报警录像使用 self.icount_max = icount_max # 帧序列号上限 self.last_frame = None # 保存图片数据 方案一 self.frame_queue = queue.Queue(maxsize=1) #保持图片数据 方案二 self.lock = threading.RLock() # 用于保证线程安全 -- 输出数据锁 self.mMM = None #ModelManager实例对象 def __del__(self): self.cap.release() #停止视频采集线程 def cleardata(self): #清理数据 pass '''输入数据相关函数''' def set_in_data(self,in_check_area,in_polygon): self.in_check_area = in_check_area self.in_polygon = in_polygon def set_in_cale_ratio(self,cale_ratio,pad_size): if self.cale_ratio is None: self.cale_ratio = cale_ratio self.pad_size = pad_size '''输出数据相关函数''' #添加一帧图片 def add_deque(self, value): self.deque_frame.append(value) #deque 满了以后会把前面的数据移除 #拷贝一份数据 def copy_deque(self): return copy.deepcopy(self.deque_frame) #获取最后一帧图片 def get_last_frame(self): if self.bModel: #print("取画面") # with self.lock: # frame = self.last_frame if not self.frame_queue.empty(): return self.frame_queue.get() else: return None else: ret,img = self.cap.read() if not ret: return None #img_bgr_ndarray = cv2.cvtColor(img, cv2.COLOR_RGB2BGR) # 在线程里面完成应该可以减少网页端处理时间 ret, frame_bgr_webp = cv2.imencode('.jpg', img) if not ret: frame = None else: frame = frame_bgr_webp.tobytes() return frame def update_last_frame(self,buffer): if buffer: # with self.lock: # self.last_frame = None # self.last_frame = buffer if not self.frame_queue.full(): self.frame_queue.put(buffer) else: self.frame_queue.get() # 丢弃最旧的帧 self.frame_queue.put(buffer) #帧序列号自增 一个线程中处理,不用加锁 def increment_counter(self): self.counter += 1 if self.counter > self.icount_max: self.counter = 0 def get_counter(self): return self.counter #清空数据,主要是删除deque 和 last_frame def clear(self): with self.lock: self.bool_run = False time.sleep(1) #休眠一秒,等待通道对应的子线程,停止工作。 self.deque_frame.clear() self.last_frame = None def stop_run(self): self.bool_run = False def th_sleep(self,frame_interval,last_time): # 控制帧率 -- 推理帧率必须所有模型一致,若模型推理耗时一致,该方案还算可以。 current_time = time.time() elapsed_time = current_time - last_time if elapsed_time < frame_interval: time.sleep(frame_interval - elapsed_time) # 若小于间隔时间则休眠 def is_in_schedule(self): '''判断当前时间是否在该通道的工作计划时间内''' # 验证检测计划,是否在布防时间内 now = datetime.now() # 获取当前日期和时间 weekday = now.weekday() # 获取星期几,星期一是0,星期天是6 hour = now.hour if self.schedule[weekday][hour] == 1: # 不在计划则不进行验证,直接返回图片 return 0 else: next_hour = (now + timedelta(hours=1)).replace(minute=0, second=0, microsecond=0) seconds_until_next_hour = (next_hour - now).seconds return seconds_until_next_hour def th_prework(self,model): last_pre_time = time.time() while self.bool_run: #start_do_time = time.time() # 控制帧率 -- 推理帧率必须所有模型一致, self.th_sleep(self.frame_interval, last_pre_time) last_pre_time = time.time() #判断是否在布防计划内 sleep_time = self.is_in_schedule() if sleep_time == 0: #判断是否工作计划内-- 后来判断下是否推理和后处理线程需要休眠 self.bModel = True else: self.bModel = False time.sleep(sleep_time) #工作计划以小时为单位,休息到该小时结束 continue # 读取图片进行推理 ret, img = self.cap.read() if not ret: continue #img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB) #这个要确认下什么模式输入进预处理 #? preimg,scale_ratio, pad_size = model.prework(img) #子类实现-根据每个模型的需要进行预处理 if not self.preimg_q.full(): self.preimg_q.put(preimg) self.img_q.put(img) self.set_in_cale_ratio(scale_ratio, pad_size) else: self.mylogger.debug("preimg_q--预处理队列满了! infer线程处理过慢!") #end_do_time = time.time() # # 计算执行时间(秒) # execution_time = end_do_time - start_do_time # # 输出执行时间 # print(f"预处理代码执行时间为:{execution_time:.6f} 秒") def th_postwork(self,model): warn_interval = int(myCongif.get_data("warn_interval")) last_out_time = time.time() while self.bool_run: #do_strat_time = time.time() # # 控制帧率 , # self.th_sleep(self.frame_interval, last_out_time) # last_out_time = time.time() #执行后处理 output = self.output_q.get()#为空时会阻塞在get img = self.infer_img_q.get() # 子类实现--具体的报警逻辑 filtered_pred_all, bwarn, warn_text = model.postwork(img, output, self.in_check_area,self.in_polygon, self.cale_ratio,self.pad_size) # img的修改应该在原内存空间修改的 self.result.pop(0) # 先把最早的结果推出数组,保障结果数组定长 if bwarn: # 绘制报警文本 #cv2.putText(img, warn_text, (50, 50), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2) self.result.append(1) else: # 没有产生报警也需要记录,统一计算占比 self.result.append(0) # 在线程里面完成应该可以减少网页端处理时间 ret, frame_bgr_webp = cv2.imencode('.jpg', img) if not ret: buffer_bgr_webp = None else: buffer_bgr_webp = frame_bgr_webp.tobytes() # 分析图片放入内存中 self.add_deque(img) self.increment_counter() # 帧序列加一 # 一直更新最新帧,提供网页端显示 self.update_last_frame(buffer_bgr_webp) # print(f"{channel_id}--Frame updated at:",time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())) if bwarn: count_one = float(sum(self.result)) # 1,0 把1累加的和就是1的数量 ratio_of_ones = count_one / len(self.result) # self.logger.debug(result) if ratio_of_ones >= self.proportion: # 触发报警 # 基于时间间隔判断 current_time = time.time() elapsed_time = current_time - self.warn_last_time if elapsed_time < warn_interval: continue self.warn_last_time = current_time # 处理报警 warn_data = WarnData() warn_data.model_name = self.model_name warn_data.warn_text = warn_text warn_data.img_buffer = self.copy_deque() #深度复制缓冲区 warn_data.width = self.cap.width warn_data.height = self.cap.height warn_data.channel_id = self.channel_id self.mMM.add_warm_data(warn_data) # model_name = self.model_name # w_s_count = self.warn_save_count # 上次保存的缓冲帧序号 # buffer_count = self.get_counter() # # 线程? # self.mMM.save_warn(model_name, w_s_count, buffer_count, self.copy_deque(), # self.cap.width, self.cap.height, self.channel_id, # self.mMM.FPS, self.mMM.fourcc) # self.mMM.send_warn() # # 更新帧序列号 # self.warn_save_count = buffer_count # 结果记录要清空 for i in range(len(self.result)): self.result[i] = 0 # do_end_time = time.time() # # 计算执行时间(秒) # execution_time = do_end_time - do_strat_time # # 输出执行时间 # print(f"*************************************后处理代码执行时间为:{execution_time:.6f} 秒") #执行预和后处理线程,每个通道一个 def start_channel_thread(self,mMM,model): self.mMM = mMM th_pre = threading.Thread(target=self.th_prework, args=(model,)) # 一个视频通道一个线程,线程句柄暂时部保留 th_pre.start() th_post = threading.Thread(target=self.th_postwork,args=(model,)) # 一个视频通道一个线程,线程句柄暂时部保留 th_post.start() class ChannelManager: def __init__(self): self.channels = {} self.lock = threading.RLock() # 用于保证字典操作的线程安全 #增加节点 def add_channel(self, channel_id, str_url, int_type, bool_run, deque_length=10,icount_max=100000): with self.lock: if channel_id in self.channels: #若已经有数据,先删除后再增加 self.channels[channel_id].clear() # 手动清理资源 del self.channels[channel_id] ch_data = ChannelData(channel_id,str_url, int_type, bool_run, deque_length,icount_max) self.channels[channel_id] = ch_data return ch_data #删除节点 def delete_channel(self, channel_id): with self.lock: if channel_id in self.channels: self.channels[channel_id].clear() # 手动清理资源 del self.channels[channel_id] #获取节点 def get_channel(self, channel_id): with self.lock: return self.channels.get(channel_id) #停止工作线程 def stop_channel(self,channel_id): with self.lock: if channel_id == 0: for clannel_id,clannel_data in self.channels.items(): clannel_data.clear() del self.channels else: if channel_id in self.channels: self.channels[channel_id].clear() # 手动清理资源 del self.channels[channel_id] if __name__ == "__main__": # 示例使用 manager = ChannelManager() manager.add_channel('channel_1', 'test', 123, True, deque_length=5) # 更新和读取操作 manager.update_channel_deque('channel_1', 'data1') manager.update_channel_buffer('channel_1', np.array([[1, 2], [3, 4]])) manager.increment_channel_counter('channel_1') channel_data = manager.get_channel_data('channel_1') print(channel_data)