diff --git a/config.yaml b/config.yaml index 219e12b..f1fa27f 100644 --- a/config.yaml +++ b/config.yaml @@ -36,6 +36,7 @@ cap_sleep_time: 120 #5分钟 buffer_len: 100 #分析后画面缓冲区帧数 -- 可以与验证帧率结合确定缓冲区大小 RESET_INTERVAL : 100000 #帧数重置上限 frame_rate : 20 #帧率参考值 -- 后续作用主要基于verify_rate进行帧率控制 -verify_rate : 10 #验证帧率--- 也就是视频输出的帧率 +verify_rate : 8 #验证帧率--- 也就是视频输出的帧率 warn_video_path: /mnt/zfbox/model/warn/ warn_interval: 120 #报警间隔--单位秒 +q_size : 30 #线程队列对长度 \ No newline at end of file diff --git a/core/ChannelManager.py b/core/ChannelManager.py index e92cb59..6cb3dae 100644 --- a/core/ChannelManager.py +++ b/core/ChannelManager.py @@ -4,19 +4,72 @@ import numpy as np import time import copy import queue +import cv2 +from datetime import datetime, timedelta +from myutils.ConfigManager import myCongif +from core.WarnManager import WarnData +from myutils.MyLogger_logger import LogHandler +#--2024-7-12调整规则,一个视频通道只允许配置一个算法模型,一个算法模型可以配置给多路通道 class ChannelData: - def __init__(self, str_url, int_type, bool_run, deque_length,icount_max): + def __init__(self, channel_id,str_url, int_type, bool_run, deque_length,icount_max): + self.channel_id = channel_id self.str_url = str_url #视频源地址 self.int_type = int_type #视频源类型,0-usb,1-rtsp,2-hksdk self.bool_run = bool_run #线程运行标识 - self.deque_frame = deque(maxlen=deque_length) - self.last_frame = None # 保存图片数据 - self.frame_queue = queue.Queue(maxsize=1) - self.counter = 0 #帧序列号--保存报警录像使用 - self.icount_max = icount_max #帧序列号上限 - self.lock = threading.RLock() # 用于保证线程安全 + self.bModel = False #该通道是否有模型标识 + self.frame_interval = 1.0 / int(myCongif.get_data("verify_rate")) + self.mylogger = LogHandler().get_logger("ChannelData") + #通道-模型推理输入数据 + self.cap = None + self.in_check_area = None + self.in_polygon = None + self.last_frame_time = time.time() + self.preimg_q = queue.Queue(maxsize=myCongif.get_data("q_size")) #这是预处理后的图片--模型推理使用 + self.img_q = queue.Queue(maxsize=myCongif.get_data("q_size")) # 原图-后处理使用 + #图片的缩放比例 + self.cale_ratio = None + self.pad_size = None + self.schedule = None # 布放计划 + self.result = None # 结果记录 + self.proportion = None #报警占比 + self.warn_save_count = 0 #保存录像的最新帧初始化为0 + self.model_name = None #关联模型的名字 + self.warn_last_time = time.time() + + #通道-模型推理数据 + self.last_infer_time = time.time() + self.infer_img_q = queue.Queue(maxsize=myCongif.get_data("q_size")) # 原图-后处理使用 + + #通道-模型推理输出数据 + self.last_out_time = time.time() + self.output_q = queue.Queue(maxsize=myCongif.get_data("q_size")) + self.deque_frame = deque(maxlen=deque_length) #推理后的缓冲区数据 + self.counter = 0 # 帧序列号--保存报警录像使用 + self.icount_max = icount_max # 帧序列号上限 + self.last_frame = None # 保存图片数据 方案一 + self.frame_queue = queue.Queue(maxsize=1) #保持图片数据 方案二 + self.lock = threading.RLock() # 用于保证线程安全 -- 输出数据锁 + self.mMM = None #ModelManager实例对象 + + def __del__(self): + self.cap.release() #停止视频采集线程 + + def cleardata(self): #清理数据 + pass + + '''输入数据相关函数''' + def set_in_data(self,in_check_area,in_polygon): + self.in_check_area = in_check_area + self.in_polygon = in_polygon + + def set_in_cale_ratio(self,cale_ratio,pad_size): + if self.cale_ratio is None: + self.cale_ratio = cale_ratio + self.pad_size = pad_size + + '''输出数据相关函数''' #添加一帧图片 def add_deque(self, value): self.deque_frame.append(value) #deque 满了以后会把前面的数据移除 @@ -27,26 +80,37 @@ class ChannelData: #获取最后一帧图片 def get_last_frame(self): - with self.lock: - frame = self.last_frame + if self.bModel: + #print("取画面") + # with self.lock: + # frame = self.last_frame + if not self.frame_queue.empty(): + return self.frame_queue.get() + else: + return None + else: + ret,img = self.cap.read() + if not ret: + return None + #img_bgr_ndarray = cv2.cvtColor(img, cv2.COLOR_RGB2BGR) + # 在线程里面完成应该可以减少网页端处理时间 + ret, frame_bgr_webp = cv2.imencode('.jpg', img) + if not ret: + frame = None + else: + frame = frame_bgr_webp.tobytes() return frame - # if not self.frame_queue.empty(): - # return self.frame_queue.get() - # else: - # return None - def update_last_frame(self,buffer): if buffer: - with self.lock: - self.last_frame = None - self.last_frame = buffer - # if not self.frame_queue.full(): - # self.frame_queue.put(buffer) - # else: - # self.frame_queue.get() # 丢弃最旧的帧 - # self.frame_queue.put(buffer) - + # with self.lock: + # self.last_frame = None + # self.last_frame = buffer + if not self.frame_queue.full(): + self.frame_queue.put(buffer) + else: + self.frame_queue.get() # 丢弃最旧的帧 + self.frame_queue.put(buffer) #帧序列号自增 一个线程中处理,不用加锁 def increment_counter(self): @@ -68,6 +132,145 @@ class ChannelData: def stop_run(self): self.bool_run = False + def th_sleep(self,frame_interval,last_time): + # 控制帧率 -- 推理帧率必须所有模型一致,若模型推理耗时一致,该方案还算可以。 + current_time = time.time() + elapsed_time = current_time - last_time + if elapsed_time < frame_interval: + time.sleep(frame_interval - elapsed_time) # 若小于间隔时间则休眠 + + def is_in_schedule(self): + '''判断当前时间是否在该通道的工作计划时间内''' + # 验证检测计划,是否在布防时间内 + now = datetime.now() # 获取当前日期和时间 + weekday = now.weekday() # 获取星期几,星期一是0,星期天是6 + hour = now.hour + if self.schedule[weekday][hour] == 1: # 不在计划则不进行验证,直接返回图片 + return 0 + else: + next_hour = (now + timedelta(hours=1)).replace(minute=0, second=0, microsecond=0) + seconds_until_next_hour = (next_hour - now).seconds + return seconds_until_next_hour + + def th_prework(self,model): + last_pre_time = time.time() + while self.bool_run: + #start_do_time = time.time() + # 控制帧率 -- 推理帧率必须所有模型一致, + self.th_sleep(self.frame_interval, last_pre_time) + last_pre_time = time.time() + #判断是否在布防计划内 + sleep_time = self.is_in_schedule() + if sleep_time == 0: #判断是否工作计划内-- 后来判断下是否推理和后处理线程需要休眠 + self.bModel = True + else: + self.bModel = False + time.sleep(sleep_time) #工作计划以小时为单位,休息到该小时结束 + continue + # 读取图片进行推理 + ret, img = self.cap.read() + if not ret: + continue + #img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB) #这个要确认下什么模式输入进预处理 #? + preimg,scale_ratio, pad_size = model.prework(img) #子类实现-根据每个模型的需要进行预处理 + if not self.preimg_q.full(): + self.preimg_q.put(preimg) + self.img_q.put(img) + self.set_in_cale_ratio(scale_ratio, pad_size) + else: + self.mylogger.debug("preimg_q--预处理队列满了! infer线程处理过慢!") + #end_do_time = time.time() + # # 计算执行时间(秒) + # execution_time = end_do_time - start_do_time + # # 输出执行时间 + # print(f"预处理代码执行时间为:{execution_time:.6f} 秒") + + def th_postwork(self,model): + warn_interval = int(myCongif.get_data("warn_interval")) + last_out_time = time.time() + while self.bool_run: + #do_strat_time = time.time() + # # 控制帧率 , + # self.th_sleep(self.frame_interval, last_out_time) + # last_out_time = time.time() + #执行后处理 + output = self.output_q.get()#为空时会阻塞在get + img = self.infer_img_q.get() + # 子类实现--具体的报警逻辑 + filtered_pred_all, bwarn, warn_text = model.postwork(img, output, self.in_check_area,self.in_polygon, + self.cale_ratio,self.pad_size) + # img的修改应该在原内存空间修改的 + self.result.pop(0) # 先把最早的结果推出数组,保障结果数组定长 + if bwarn: + # 绘制报警文本 + #cv2.putText(img, warn_text, (50, 50), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2) + self.result.append(1) + else: # 没有产生报警也需要记录,统一计算占比 + self.result.append(0) + + # 在线程里面完成应该可以减少网页端处理时间 + ret, frame_bgr_webp = cv2.imencode('.jpg', img) + if not ret: + buffer_bgr_webp = None + else: + buffer_bgr_webp = frame_bgr_webp.tobytes() + + # 分析图片放入内存中 + self.add_deque(img) + self.increment_counter() # 帧序列加一 + # 一直更新最新帧,提供网页端显示 + self.update_last_frame(buffer_bgr_webp) + # print(f"{channel_id}--Frame updated at:",time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())) + if bwarn: + count_one = float(sum(self.result)) # 1,0 把1累加的和就是1的数量 + ratio_of_ones = count_one / len(self.result) + # self.logger.debug(result) + if ratio_of_ones >= self.proportion: # 触发报警 + # 基于时间间隔判断 + current_time = time.time() + elapsed_time = current_time - self.warn_last_time + if elapsed_time < warn_interval: + continue + self.warn_last_time = current_time + # 处理报警 + warn_data = WarnData() + warn_data.model_name = self.model_name + warn_data.warn_text = warn_text + warn_data.img_buffer = self.copy_deque() #深度复制缓冲区 + warn_data.width = self.cap.width + warn_data.height = self.cap.height + warn_data.channel_id = self.channel_id + self.mMM.add_warm_data(warn_data) + # model_name = self.model_name + # w_s_count = self.warn_save_count # 上次保存的缓冲帧序号 + # buffer_count = self.get_counter() + # # 线程? + # self.mMM.save_warn(model_name, w_s_count, buffer_count, self.copy_deque(), + # self.cap.width, self.cap.height, self.channel_id, + # self.mMM.FPS, self.mMM.fourcc) + # self.mMM.send_warn() + # # 更新帧序列号 + # self.warn_save_count = buffer_count + # 结果记录要清空 + for i in range(len(self.result)): + self.result[i] = 0 + + # do_end_time = time.time() + # # 计算执行时间(秒) + # execution_time = do_end_time - do_strat_time + # # 输出执行时间 + # print(f"*************************************后处理代码执行时间为:{execution_time:.6f} 秒") + + + #执行预和后处理线程,每个通道一个 + def start_channel_thread(self,mMM,model): + self.mMM = mMM + th_pre = threading.Thread(target=self.th_prework, args=(model,)) # 一个视频通道一个线程,线程句柄暂时部保留 + th_pre.start() + + th_post = threading.Thread(target=self.th_postwork,args=(model,)) # 一个视频通道一个线程,线程句柄暂时部保留 + th_post.start() + class ChannelManager: def __init__(self): @@ -80,7 +283,9 @@ class ChannelManager: if channel_id in self.channels: #若已经有数据,先删除后再增加 self.channels[channel_id].clear() # 手动清理资源 del self.channels[channel_id] - self.channels[channel_id] = ChannelData(str_url, int_type, bool_run, deque_length,icount_max) + ch_data = ChannelData(channel_id,str_url, int_type, bool_run, deque_length,icount_max) + self.channels[channel_id] = ch_data + return ch_data #删除节点 def delete_channel(self, channel_id): diff --git a/core/ModelManager.py b/core/ModelManager.py index 3ca55c2..c206507 100644 --- a/core/ModelManager.py +++ b/core/ModelManager.py @@ -16,6 +16,7 @@ from myutils.ConfigManager import myCongif from model.plugins.ModelBase import ModelBase from core.ChannelManager import ChannelManager from core.ACLModelManager import ACLModeManger +from core.WarnManager import WarnManager from PIL import Image @@ -25,7 +26,11 @@ class VideoCaptureWithFPS: self.source = source self.width = None self.height = None - self.cap = cv2.VideoCapture(self.source) + #GStreamer + rtsp_stream = f"rtspsrc location={self.source} ! decodebin ! videoconvert ! appsink" + self.cap = cv2.VideoCapture(rtsp_stream, cv2.CAP_GSTREAMER) + #opencv + #self.cap = cv2.VideoCapture(self.source) if self.cap.isOpened(): #若没有打开成功,在读取画面的时候,已有判断和处理 -- 这里也要检查下内存的释放情况 self.width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH)) self.height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) @@ -43,7 +48,10 @@ class VideoCaptureWithFPS: def update(self): icount = 0 while self.running: + #start_time = time.time() ret, frame = self.cap.read() + # end_time = time.time() + # print(f"read()耗时:{(end_time-start_time):.6f}") if not ret: icount += 1 if icount > 5: #重连 @@ -57,7 +65,8 @@ class VideoCaptureWithFPS: self.cap.get(cv2.CAP_PROP_FPS) / float(myCongif.get_data("verify_rate"))) # 向上取整。 icount = 0 else: - time.sleep(1) + print(f"{self.source}视频流,将于5分钟后重连!") + time.sleep(myCongif.get_data("cap_sleep_time")) continue #resized_frame = cv2.resize(frame, (int(self.width / 2), int(self.height / 2))) with self.read_lock: @@ -74,6 +83,7 @@ class VideoCaptureWithFPS: def read(self): with self.read_lock: frame = self.frame.copy() if self.frame is not None else None + #frame = self.frame if frame is not None: return True, frame else: @@ -99,16 +109,17 @@ class ModelManager: #self.buflen = myCongif.get_data("buffer_len") self.icout_max = myCongif.get_data("RESET_INTERVAL") #跟视频帧序用一个变量 self.frame_rate = myCongif.get_data("frame_rate") - self.frame_interval = 1.0 / int(myCongif.get_data("verify_rate")) - #保存视频相关内容 - self.FPS = myCongif.get_data("verify_rate") # 视频帧率--是否能实现动态帧率 - self.fourcc = cv2.VideoWriter_fourcc(*'mp4v') # 使用 mp4 编码 + #基于模型运行环境进行相应初始化工作 self.model_platform = myCongif.get_data("model_platform") self.device_id = myCongif.get_data("device_id") - # acl初始化 -- 一个线程一个 -- 需要验证 + # acl资源初始化 if self.model_platform == "acl": ACLModeManger.init_acl(self.device_id) #acl -- 全程序初始化 + self.model_dic = {} #model_id model + #报警处理线程-全进程独立一个线程处理 + self.warnM = None + def __del__(self): self.logger.debug("释放资源") @@ -140,7 +151,7 @@ class ModelManager: return None module = importlib.util.module_from_spec(module_spec) module_spec.loader.exec_module(module) - md = getattr(module, "Model")(model_path,threshold) #实例化类 + md = getattr(module, "Model")(model_path,self,threshold) #实例化Model if not isinstance(md, ModelBase): self.logger.error("{} not zf_model".format(md)) return None @@ -199,269 +210,89 @@ class ModelManager: def set_last_img(self,): pass - def verify(self,frame,myModle_list,myModle_data,channel_id,schedule_list,result_list,isdraw=1): - '''验证执行主函数,实现遍历通道关联的模型,调用对应模型执行验证,模型文件遍历执行''' - img = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) - #img = np.ascontiguousarray(img, dtype=np.float32) / 255.0 # 转换为内存连续存储的数组 --该函数可以待定下是不是所有模型都可以做 - # img = frame.to_ndarray(format="bgr24") - #img = frame - # 使用 模型 进行目标检测 - i_warn_count = 0 #报警标签 - #isverify = False - for i in range(len(myModle_list)): # 遍历通道关联的算法进行检测,若不控制模型数量,有可能需要考虑多线程执行。 - model = myModle_list[i] - data = myModle_data[i] - schedule = schedule_list[i] - result = result_list[i] - #验证检测计划,是否在布防时间内 - now = datetime.datetime.now() # 获取当前日期和时间 - weekday = now.weekday() # 获取星期几,星期一是0,星期天是6 - hour = now.hour - result.pop(0) # 保障结果数组定长 --先把最早的结果推出数组 - if schedule[weekday][hour] == 1: #不在计划则不进行验证,直接返回图片 - # 调用模型,进行检测,model是动态加载的,具体的判断标准由模型内执行 ---- ********* - #isverify = True - detections, bwarn, warntext = model.verify(img, data,isdraw) #****************重要 - # 对识别结果要部要进行处理 - if bwarn: # 整个识别有产生报警 - #根据模型设定的时间和占比判断是否 - # 绘制报警文本 - cv2.putText(img, 'Intruder detected!', (50, (i_warn_count + 1) * 50), - cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2) - i_warn_count += 1 - result.append(1) #要验证数组修改,是地址修改吗? - else: #没有产生报警也需要记录,统一计算占比 - result.append(0) - else: - result.append(0) - # if not isverify: #没做处理,直接返回的,需要控制下帧率,太快读取没有意义。 --2024-7-5 取消休眠,帧率控制在dowork_thread完成 - # time.sleep(1.0/self.frame_rate) #给个默认帧率,不超过30帧,---若经过模型计算,CPU下单模型也就12帧这样 - - # 将检测结果图像转换为帧--暂时用不到AVFrame--2024-7-5 - # new_frame_rgb_avframe = av.VideoFrame.from_ndarray(img, format="rgb24") # AVFrame - # new_frame_rgb_avframe.pts = None # 添加此行确保有pts属性 - - # if isinstance(img, np.ndarray): -- 留个纪念 - #处理完的图片后返回-bgr模式 - img_bgr_ndarray = cv2.cvtColor(img, cv2.COLOR_RGB2BGR) - # 将检查结果转换为WebP格式图片 --在线程里面完成应该可以减少网页端处理时间 - ret,frame_bgr_webp=cv2.imencode('.jpg', img_bgr_ndarray) - if not ret: - buffer_bgr_webp = None - else: - buffer_bgr_webp = frame_bgr_webp.tobytes() - return buffer_bgr_webp,img_bgr_ndarray - - - def dowork_thread(self,channel_id): - '''一个通道一个线程,关联的模型在一个线程检测,局部变量都是一个通道独有''' - channel_data = self.verify_list.get_channel(channel_id) #是对ChannelData 对象的引用 - context = None - # 线程ACL初始化 - if self.model_platform == "acl": # ACL线程中初始化内容 - context = ACLModeManger.th_inti_acl(self.device_id) + def start_model_thread(self,channel_id): #通道ID会重吗? + '''实例化模型组件,启动模型推理线程''' + channel_data = self.verify_list.get_channel(channel_id) # 是对ChannelData 对象的引用 #查询关联的模型 --- 在循环运行前把基础数据都准备好 - myDBM = DBManager() - myDBM.connect() strsql = (f"select t1.model_id,t1.check_area,t1.polygon ,t2.duration_time,t2.proportion,t2.model_path,t1.ID," f"t2.model_name,t1.conf_threshold " f"from channel2model t1 left join model t2 on t1.model_id = t2.ID where t1.channel_id ={channel_id};") - #print(strsql) - myModels = myDBM.do_select(strsql) - #加载模型 --- 是不是要做个限制,一个视频通道关联算法模块的上限 --- 关联多了一个线程执行耗时较多,造成帧率太低,或者再多线程并发 #? - - myModle_list = [] #存放模型对象List 一个模型一个 - myModle_data = [] #存放检测参数 一个模型一个 - schedule_list = [] #布防策略 -一个模型一个 - result_list = [] #检测结果记录 -一个模型一个 - warn_last_time =[] #最新的报警时间记录 -一个模型一个 - proportion_list = []#占比设定 -一个模型一个 - warn_save_count = []#没个模型触发报警后,保存录像的最新帧序号 -一个模型一个 - - #获取视频通道的模型相关数据-list - for model in myModels: - #基于基类实例化模块类 - m = self._import_model("",model[5],model[8]) #动态加载模型处理文件py --需要验证模型文件是否能加载 - #m = None - if m: - myModle_list.append(m) #没有成功加载的模型原画输出 - myModle_data.append(model) - #model[6] -- c2m_id --布防计划 0-周一,6-周日 - schedule_list.append(self.getschedule(model[6],myDBM)) - result = [0 for _ in range(model[3] * myCongif.get_data("verify_rate"))] #初始化时间*验证帧率数量的结果list - result_list.append(result) - warn_last_time.append(time.time()) - proportion_list.append(model[4]) #判断是否报警的占比 - warn_save_count.append(0) #保存录像的最新帧初始化为0 - - #开始拉取画面循环检测 - cap = None - #iread_count =0 #失败读取的次数 - last_frame_time = time.time() #初始化个读帧时间 - cap_sleep_time = myCongif.get_data("cap_sleep_time") - #可以释放数据库资源 - del myDBM - warn_interval = myCongif.get_data("warn_interval") - while channel_data.bool_run: #基于tag 作为运行标识。 线程里只是读,住线程更新,最多晚一轮,应该不用线程锁。需验证 - # 帧率控制帧率 - current_time = time.time() - elapsed_time = current_time - last_frame_time - if elapsed_time < self.frame_interval: - time.sleep(self.frame_interval - elapsed_time) #若小于间隔时间则休眠 - last_frame_time = time.time() - #*********取画面************* - if not cap: #第一次需要打开视频流 - try: - cap = self._open_view(channel_data.str_url,channel_data.int_type) #创建子线程读画面 - except: - self.logger.error("打开视频参数错误,终止线程!") - return - ret,frame = cap.read() #除了第一帧,其它应该都是有画面的 - if not ret: - # if iread_count > 30: #2024-7-8 重连接机制放VideoCaptureWithFPS - # self.logger.warning(f"通道-{channel_id}:view disconnected. Reconnecting...") - # cap.release() - # cap = None - # time.sleep(cap_sleep_time) - # else: - # iread_count += 1 - continue #没读到画面继续 - #执行图片推理 -- 如何没有模型或不在工作时间,返回的是原画,要不要控制下帧率? -- 在verify中做了sleep - buffer_bgr_webp,img_bgr_ndarray = self.verify(frame,myModle_list,myModle_data,channel_id,schedule_list,result_list) - - #分析图片放入内存中 - channel_data.add_deque(img_bgr_ndarray) # 缓冲区大小由maxlen控制 超上限后,删除最前的数据 - channel_data.increment_counter() #帧序列加一 - # 一直更新最新帧,提供网页端显示 - channel_data.update_last_frame(buffer_bgr_webp) - #print(f"{channel_id}--Frame updated at:",time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())) - #验证result_list -是否触发报警要求 --遍历每个模型执行的result - for i in range(len(result_list)): - result = result_list[i] - proportion = proportion_list[i] - count_one = float(sum(result)) #1,0 把1累加的和就是1的数量 - ratio_of_ones = count_one / len(result) - #self.logger.debug(result) - if ratio_of_ones >= proportion: #触发报警 - # 基于时间间隔判断 - current_time = time.time() - elapsed_time = current_time - warn_last_time[i] - if elapsed_time < warn_interval: - continue - warn_last_time[i] = current_time - model_name = myModle_data[i][7] - w_s_count = warn_save_count[i] - buffer_count = channel_data.get_counter() - self.save_warn(model_name,w_s_count,buffer_count,channel_data.copy_deque(), - cap.width,cap.height,channel_id,None,self.FPS,self.fourcc) - self.send_warn() - #更新帧序列号 - warn_save_count[i] = buffer_count - #结果记录要清空 - for i in range(len(result)): - result[i] = 0 - - # end_time = time.time() # 结束时间 - # print(f"Processing time: {end_time - start_time} seconds") - # 本地显示---测试使用 - # if channel_id == 2: - # cv2.imshow(str(channel_id), img) - # if cv2.waitKey(1) & 0xFF == ord('q'): - # break - #结束线程 - - cap.release() #视频采集线程结束 - if context:#ACL线程中反初始化内容 -- 若线程异常退出,这些资源就不能正常释放了 - #先释放每个模型资源 - for model in myModle_list: - del model - #再释放context - ACLModeManger.th_del_acl(context) - #cv2.destroyAllWindows() - - def save_warn(self,model_name,w_s_count,buffer_count,buffer,width,height,channnel_id,myDBM,FPS,fourcc): - ''' - 保存报警信息 --- 涉及到I/O操作可以通过线程取执行 -- 避免主线程阻塞 --还未验证-2024-7-6 - :param model_name: 模型名称,如人员入侵 - :param w_s_count: 报警已存储的最新帧序列 - :param buffer_count: 当前视频缓冲区的最新帧序列 - :param buffer: 视频缓存区 - :param width: 视频画面的width - :param height: 视频画面的height - :param channnel_id: 视频通道ID - :return: ret 数据库操作记录 - ''' - return - - def save_warn_th(model_name,w_s_count,buffer_count,buffer,width,height,channnel_id,myDBM,FPS,fourcc): - now = datetime.datetime.now() # 获取当前日期和时间 - current_time_str = now.strftime("%Y-%m-%d_%H-%M-%S") - filename = f"{channnel_id}_{current_time_str}" - save_path = myCongif.get_data("warn_video_path") - #保存视频 - video_writer = cv2.VideoWriter(f"{save_path}{filename}.mp4", fourcc, FPS, (width, height)) - if not video_writer.isOpened(): - print(f"Failed to open video writer for model/warn/{filename}.mp4") - return False - ilen = len(buffer) - istart = 0; - iend = ilen - if buffer_count < w_s_count or (buffer_count-w_s_count) > ilen: #buffer_count重置过 - #buffer区,都保存为视频 - istart = 0 - else:#只取差异的缓冲区大小 - istart = ilen - (buffer_count-w_s_count) - for i in range(istart,iend): - video_writer.write(buffer[i]) - video_writer.release() - #保存图片 - ret = cv2.imwrite(f"model/warn/{filename}.png",buffer[-1]) - #buffer使用完后删除 - del buffer - if not ret: - print("保存图片失败") - return False - #保存数据库 - myDBM = DBManager() - myDBM.connect() - strsql = (f"INSERT INTO warn (model_name ,video_path ,img_path ,creat_time,channel_id ) " - f"Values ('{model_name}','model/warn/{filename}.mp4','model/warn/{filename}.png'," - f"'{current_time_str}','{channnel_id}');") - ret = myDBM.do_sql(strsql) - del myDBM #释放数据库连接资源 - return ret - - th_chn = threading.Thread(target=save_warn_th, - args=(model_name,w_s_count,buffer_count,buffer,width,height,channnel_id,None,FPS,fourcc,)) # 一个视频通道一个线程,线程句柄暂时部保留 - th_chn.start() - - def send_warn(self): - '''发送报警信息''' - pass + # print(strsql) + model = mDBM.do_select(strsql,1) #2024-7-12调整规则,一个通道只关联一个模型,表结构暂时不动 + if model: + strMID = str(model[0]) + m = None + if strMID in self.model_dic:#该模型线程已启动 + m = self.model_dic[strMID] + else: + # 基于基类实例化模块类 + m = self._import_model("", model[5], model[8]) # 动态加载模型处理文件py --需要验证模型文件是否能加载 + if m: + # 开始工作线程---推理线程需不需要全进程唯一 + m.run = True + m.start_th() #模型里跑两个线程 + #添加模型对象到字典中 + self.model_dic[strMID] = m + else: + self.logger.error(f"{model[5]}没有实例化成功") + return #模型没加载成功--原画输出 + # 更新该模型对应视频通道的数据 + channel_data.set_in_data(model[1], model[2]) #chack_area,ploygon + channel_data.schedule = self.getschedule(model[6], mDBM) # 布放计划-c_m_id + channel_data.result = [0 for _ in + range(model[3] * myCongif.get_data("verify_rate"))] # 初始化时间*验证帧率数量的结果list + channel_data.proportion = model[4] # 报警占比 + channel_data.warn_last_time = time.time() # 最后次触发报警的时间 + channel_data.model_name = model[7] + # 添加通道 + m.addChannel(channel_id, channel_data) #删除时不能手动清空内存 2024-7-14 + #启动视频通道预处理和后处理线程 + channel_data.start_channel_thread(self,m) #一个视频通道,一个预和后处理线程 + + #删除还没完善 del self.model_dic[strMID] + else: #没有模型数据--channel_data.bModel = Flase ,不需要添加数据,直接原画输出 + return + + def stop_model_thread(self,channel_id,model_id): + '''某个视频通道结束工作''' + channel_data = self.verify_list.get_channel(channel_id) # 是对ChannelData 对象的引用 + m = self.model_dic.get(model_id) + if m: + m.strop_th() + channel_data.cap.release() def save_frame_to_video(self): '''把缓冲区中的画面保存为录像''' pass - def start_work(self,channel_id=0): + def start_work(self,channel_id=0): #还涉及单通道对开启和关闭 '''算法模型是在后台根据画面实时分析的 1.布防开关需要触发通道关闭和开启 2.布防策略的调整也需要关闭和重启工作 ''' + #pre-thread if channel_id ==0: strsql = "select id,ulr,type from channel where is_work = 1;" #执行所有通道 else: strsql = f"select id,ulr,type from channel where is_work = 1 and id = {channel_id};" #单通道启动检测线程 datas = mDBM.do_select(strsql) for data in datas: - # img_buffer = deque(maxlen=myCongif.get_data("buffer_len")) #创建个定长的视频buffer - # img = None - # icout = 0 #跟img_buffer对应,记录进入缓冲区的帧序列号 - # run_data = [data[1],data[2],True,img_buffer,img,icout] - # self.verify_list[data[0]] = run_data #需要验证重复情况#? channel_id, str_url, int_type, bool_run, deque_length - self.verify_list.add_channel(data[0],data[1],data[2],True,myCongif.get_data("buffer_len"),myCongif.get_data("RESET_INTERVAL")) - th_chn = threading.Thread(target=self.dowork_thread, args=(data[0],)) #一个视频通道一个线程,线程句柄暂时部保留 - th_chn.start() + # 创建channel_data + ch_data = self.verify_list.add_channel(data[0],data[1],data[2],True, + myCongif.get_data("buffer_len"),myCongif.get_data("RESET_INTERVAL")) + + #启动该通道的视频捕获线程 + ch_data.cap = self._open_view(ch_data.str_url,ch_data.int_type) #创建子线程读画面-把cap给模型就行-- + ch_data.mMM = self + + #目前一个模型两个线程 + self.start_model_thread(data[0]) + #启动告警线程 + self.warnM = WarnManager() + self.warnM.start_warnmanager_th() + + def add_warm_data(self,warn_data): + self.warnM.add_warn_data(warn_data) def stop_work(self,channel_id=0): '''停止工作线程,0-停止所有,非0停止对应通道ID的线程''' diff --git a/core/WarnManager.py b/core/WarnManager.py new file mode 100644 index 0000000..b1af851 --- /dev/null +++ b/core/WarnManager.py @@ -0,0 +1,102 @@ +import threading +import queue +import datetime +import cv2 +from core.DBManager import DBManager +from myutils.ConfigManager import myCongif + +class WarnData: + def __init__(self): + self.width = None #视频画面的width + self.height = None #视频画面的height + self.channel_id = None + self.model_name = None #模型名称,如人员入侵 + self.img_buffer = None #视频缓冲区 赋值时要拷贝一个备份 + + self.warn_text = None + self.channel_name = None + + + +class WarnManager: + def __init__(self): + self.warn_q = queue.Queue() #线程安全 + self.brun = True + # 保存视频相关内容 + self.FPS = myCongif.get_data("verify_rate") # 视频帧率--是否能实现动态帧率 + self.fourcc = cv2.VideoWriter_fourcc(*'mp4v') # 使用 mp4 编码 + + def __del__(self): + pass + + def add_warn_data(self,warn_data): + self.warn_q.put(warn_data) + + def th_warnmanager(self): + myDBM = DBManager() + myDBM.connect() + while self.brun: + warn_data = self.warn_q.get() + self.save_warn(warn_data.model_name,warn_data.img_buffer,warn_data.width,warn_data.height, + warn_data.channel_id,self.FPS,self.fourcc,myDBM) + self.send_warn() + del warn_data.img_buffer + del warn_data + + + def start_warnmanager_th(self): + th_warn = threading.Thread(target=self.th_warnmanager) # 一个视频通道一个线程,线程句柄暂时部保留 + th_warn.start() + + def stop_warnmanager_th(self): + self.brun = False + del self.warn_q + + def send_warn(self): + '''发送报警信息''' + pass + + def save_warn(self,model_name,buffer,width,height,channnel_id,FPS,fourcc,myDBM): + ''' + 保存报警信息 --- 涉及到I/O操作可以通过线程取执行 -- 避免主线程阻塞 --还未验证-2024-7-6 + :param model_name: 模型名称,如人员入侵 + :param w_s_count: 报警已存储的最新帧序列 + :param buffer_count: 当前视频缓冲区的最新帧序列 + :param buffer: 视频缓存区 + :param width: 视频画面的width + :param height: 视频画面的height + :param channnel_id: 视频通道ID + :return: ret 数据库操作记录 + ''' + now = datetime.datetime.now() # 获取当前日期和时间 + current_time_str = now.strftime("%Y-%m-%d_%H-%M-%S") + filename = f"{channnel_id}_{current_time_str}" + save_path = myCongif.get_data("warn_video_path") + # 保存视频 + video_writer = cv2.VideoWriter(f"{save_path}{filename}.mp4", fourcc, FPS, (width, height)) + if not video_writer.isOpened(): + print(f"Failed to open video writer for model/warn/{filename}.mp4") + return False + ilen = len(buffer) + istart = 0; + iend = ilen + + for i in range(len(buffer)): + video_writer.write(buffer[i]) + video_writer.release() + # 保存图片 + ret = cv2.imwrite(f"model/warn/{filename}.png", buffer[-1]) + # buffer使用完后删除 + del buffer + if not ret: + print("保存图片失败") + return False + # 保存数据库 + + strsql = (f"INSERT INTO warn (model_name ,video_path ,img_path ,creat_time,channel_id ) " + f"Values ('{model_name}','model/warn/{filename}.mp4','model/warn/{filename}.png'," + f"'{current_time_str}','{channnel_id}');") + ret = myDBM.do_sql(strsql) + del myDBM # 释放数据库连接资源 + return ret + diff --git a/model/plugins/ModelBase.py b/model/plugins/ModelBase.py index e88da01..02e147c 100644 --- a/model/plugins/ModelBase.py +++ b/model/plugins/ModelBase.py @@ -4,6 +4,10 @@ from myutils.ConfigManager import myCongif from myutils.MyLogger_logger import LogHandler import numpy as np import cv2 +import time +import queue +from datetime import datetime, timedelta +import threading import ast if myCongif.get_data("model_platform") == "acl": import acl @@ -14,30 +18,293 @@ SUCCESS = 0 # 成功状态值 FAILED = 1 # 失败状态值 ACL_MEM_MALLOC_NORMAL_ONLY = 2 # 申请内存策略, 仅申请普通页 +class ModelRunData: + def __init__(self): + self.channel_id = None + + class ModelBase(ABC): - def __init__(self,path): + def __init__(self,path,mMM): ''' 模型类实例化 :param path: 模型文件本身的路径 :param threshold: 模型的置信阈值 ''' + self.model_path = path # 模型路径 + self.mMM = mMM # ModelManager + self.mylogger = LogHandler().get_logger("ModelManager") self.name = None #基于name来查询,用户对模型的配置参数,代表着模型名称需要唯一 2024-6-18 -逻辑还需要完善和验证 self.version = None self.model_type = None # 模型类型 1-图像分类,2-目标检测(yolov5),3-分割模型,4-关键点 self.system = myCongif.get_data("model_platform") #platform.system() #获取系统平台 - self.do_map = { # 定义插件的入口函数 -- - # POCType.POC: self.do_verify, - # POCType.SNIFFER: self.do_sniffer, - # POCType.BRUTE: self.do_brute - } - self.model_path = path # 模型路径 + #--2024-7-12调整规则,一个视频通道只允许配置一个算法模型,一个算法模型可以配置给多路通道 + self.channel_list = [] #该模型需要处理的视频通道 + self.cid_copy_list = [] #基于channel_list的备份,提供给遍历线程 + self.channel_data_list =[] #该模型,针对每个通道配置的执行参数 ChannelData --包含了输入和输出数据 + self.cda_copy_list = [] #基于channel_data_list的备份,提供给遍历线程 + self.frame_interval = 1.0 / int(myCongif.get_data("verify_rate")) self.init_ok = False + #启动推理线程 -- 实例化后是否启动工作线程 -- acl这么搞 + self.run = False + #创建线程锁 + self.list_lock = threading.Lock() #list修改锁 + self.copy_lock = threading.Lock() #副本拷贝锁 + self.read_lock = threading.Lock() #遍历线程锁 + self.readers = 0 #并发读个数 def __del__(self): print("资源释放") + def addChannel(self,channel_id,channel_data): #这两个参数有点重复,后续考虑优化 + bfind = False + with self.list_lock: + for cid in self.channel_list: + if cid == channel_id: + bfind = True + if not bfind: + self.channel_data_list.append(channel_data) + self.channel_list.append(channel_id) + #复制备份 + self.set_copy_list() + + def delChannel(self,channel_id):#调用删除通道的地方,若channel为空,则停止线程,删除该模型对象 + with self.list_lock: + for i,cid in enumerate(self.channel_list): + if cid == channel_id: + self.channel_list.remove(channel_id) + #self.channel_data_list[i].cleardata() #释放内存 + self.channel_data_list.pop(i) + # 复制备份 + self.set_copy_list() + + def set_copy_list(self): #把list拷贝一个副本 + with self.copy_lock: + self.cid_copy_list = self.channel_list.copy() + self.cda_copy_list = self.channel_data_list.copy() + + # copy使用读写锁,这样能避免三个遍历线程间相互竞争 + def acquire_read(self): + with self.read_lock: + self.readers += 1 + if self.readers == 1: + self.copy_lock.acquire() + + def release_read(self): + with self.read_lock: + self.readers -= 1 + if self.readers == 0: + self.copy_lock.release() + + def get_copy_list(self): #线程中拷贝一个本地副本 其实读线程里面执行的时间复杂度也不高,直接用copy锁一样 + self.acquire_read() + local_id = self.cid_copy_list.copy() + local_data = self.cda_copy_list.copy() + self.release_read() + return local_id,local_data + + + def strop_th(self): + '''停止该模型的工作线程''' + self.run = False + time.sleep(1) #确认下在哪执行 + #删除list + del self.channel_list + + def start_th(self): + #要确保三个线程对channel_data的读取和修改是线程安全的,或是独立分开的。 + #预处理 + # th_pre = threading.Thread(target=self.th_prework) # 一个视频通道一个线程,线程句柄暂时部保留 + # th_pre.start() + + #推理 + th_infer = threading.Thread(target=self.th_startwork) # 一个视频通道一个线程,线程句柄暂时部保留 + th_infer.start() + + #后处理 + # th_post = threading.Thread(target=self.th_postwork) # 一个视频通道一个线程,线程句柄暂时部保留 + # th_post.start() + + def th_sleep(self,frame_interval,last_time): + # 控制帧率 -- 推理帧率必须所有模型一致,若模型推理耗时一致,该方案还算可以。 + current_time = time.time() + elapsed_time = current_time - last_time + if elapsed_time < frame_interval: + time.sleep(frame_interval - elapsed_time) # 若小于间隔时间则休眠 + + def is_in_schedule(self,channel_data): + '''判断当前时间是否在该通道的工作计划时间内''' + # 验证检测计划,是否在布防时间内 + now = datetime.now() # 获取当前日期和时间 + weekday = now.weekday() # 获取星期几,星期一是0,星期天是6 + hour = now.hour + if channel_data.schedule[weekday][hour] == 1: # 不在计划则不进行验证,直接返回图片 + return 0 + else: + next_hour = (now + timedelta(hours=1)).replace(minute=0, second=0, microsecond=0) + seconds_until_next_hour = (next_hour - now).seconds + return seconds_until_next_hour + + def th_prework(self): + last_pre_time = time.time() + while self.run: + #start_do_time = time.time() + # 控制帧率 -- 推理帧率必须所有模型一致, + self.th_sleep(self.frame_interval, last_pre_time) + last_pre_time = time.time() + #拷贝副本到线程本地 + with self.copy_lock: + #local_cid = self.cid_copy_list.copy() + local_cdata = self.cda_copy_list.copy() + for channel_data in local_cdata: # 如果没有视频通道结束线程 + #判断是否在布防计划内 + sleep_time = self.is_in_schedule(channel_data) + if sleep_time == 0: #判断是否工作计划内-- 后来判断下是否推理和后处理线程需要休眠 + channel_data.bModel = True + else: + channel_data.bModel = False + time.sleep(sleep_time) + continue + # 读取图片进行推理 + ret, img = channel_data.cap.read() + if not ret: + continue + #img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB) #这个要确认下什么模式输入进预处理 #? + preimg,scale_ratio, pad_size = self.prework(img) #子类实现-根据每个模型的需要进行预处理 + if not channel_data.preimg_q.full(): + channel_data.preimg_q.put(preimg) + channel_data.img_q.put(img) + channel_data.set_in_cale_ratio(scale_ratio, pad_size) + else: + self.mylogger.debug("preimg_q--预处理队列满了! infer线程处理过慢!") + #end_do_time = time.time() + # # 计算执行时间(秒) + # execution_time = end_do_time - start_do_time + # # 输出执行时间 + # print(f"预处理代码执行时间为:{execution_time:.6f} 秒") + + def th_startwork(self): + '''模型工作线程 由于有多输入通道,需要将执行任务降低到最少''' + self._init_acl() #创建context + if not self._init_resource(): #加载模型文件 -- 正常来说这里不应该失败--上层函数暂时都认为是成功的!需完善 + self.mylogger.error("模型文件初始化加载失败!") + return + last_infer_time = time.time() + #开始工作--- 尽量精简! + while self.run: + #start_do_time = time.time() + # 控制帧率 -- 推理帧率必须所有模型一致, + self.th_sleep(self.frame_interval, last_infer_time) + last_infer_time = time.time() + # 拷贝副本到线程本地 + with self.copy_lock: + # local_cid = self.cid_copy_list.copy() + local_cdata = self.cda_copy_list.copy() + + for channel_data in local_cdata: #如果没有视频通道可以结束线程#? + if channel_data.preimg_q.empty(): + continue + img = channel_data.preimg_q.get() + src_img = channel_data.img_q.get() + #就执行推理,针对结果的逻辑判断交给后处理线程处理。-- 需要确认除目标识别外其他模型的执行方式 + output = self.execute([img,])[0] # 执行推理 + if len(output) > 0: + if not channel_data.output_q.full(): + channel_data.output_q.put(output) + channel_data.infer_img_q.put(src_img) #原图交给后处理线程 + else: + self.mylogger.debug("output_q--后处理队列满了!后处理过慢!") + # end_do_time= time.time() + # # 计算执行时间(秒) + # execution_time = end_do_time - start_do_time + # # 输出执行时间 + # print(f"****************推理代码执行时间为:{execution_time:.6f} 秒") + #结束工作-开始释放资源 + self.release() + self._del_acl() + + def th_postwork(self): + warn_interval = int(myCongif.get_data("warn_interval")) + last_out_time = time.time() + while self.run: + # 控制帧率 -- 推理帧率必须所有模型一致, + self.th_sleep(self.frame_interval, last_out_time) + last_out_time = time.time() + # 拷贝副本到线程本地 + with self.copy_lock: + local_cid = self.cid_copy_list.copy() + local_cdata = self.cda_copy_list.copy() + for i, channel_data in enumerate(local_cdata): + # 控制帧率 -- 推理帧率必须所有模型一致,若模型推理耗时一致,该方案还算可以。 + if channel_data.output_q.empty(): + continue + #执行后处理 + output = channel_data.output_q.get() + img = channel_data.infer_img_q.get() + cale_ratio = channel_data.cale_ratio + pad_size = channel_data.pad_size + + if len(output) <1: + continue + filtered_pred_all, bwarn, warn_text = self.postwork(img,output,channel_data.in_check_area, + channel_data.in_polygon,cale_ratio,pad_size) #子类实现--具体的报警逻辑 + #img的修改应该在原内存空间修改的 + channel_data.result.pop(0) #先把最早的结果推出数组,保障结果数组定长 + if bwarn: # 整个识别有产生报警 + #根据模型设定的时间和占比判断是否 + # 绘制报警文本 + cv2.putText(img, warn_text, (50,50),cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2) + channel_data.result.append(1) + else: #没有产生报警也需要记录,统一计算占比 + channel_data.result.append(0) + + # 处理完的图片后返回-bgr模式 --一头一尾是不是抵消了,可以不做处理#? + #img_bgr_ndarray = cv2.cvtColor(img, cv2.COLOR_RGB2BGR) + # 在线程里面完成应该可以减少网页端处理时间 + ret, frame_bgr_webp = cv2.imencode('.jpg', img) + if not ret: + buffer_bgr_webp = None + else: + buffer_bgr_webp = frame_bgr_webp.tobytes() + + # 分析图片放入内存中 + #channel_data.add_deque(img_bgr_ndarray) # 缓冲区大小由maxlen控制 超上限后,删除最前的数据 + channel_data.add_deque(img) + channel_data.increment_counter() # 帧序列加一 + # 一直更新最新帧,提供网页端显示 + channel_data.update_last_frame(buffer_bgr_webp) + # print(f"{channel_id}--Frame updated at:",time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())) + # 验证result_list -是否触发报警要求 --遍历每个模型执行的result + + if bwarn: + result = channel_data.result #最近的检测记录 + proportion = channel_data.proportion #判断报警的占比 + count_one = float(sum(result)) # 1,0 把1累加的和就是1的数量 + ratio_of_ones = count_one / len(result) + # self.logger.debug(result) + if ratio_of_ones >= proportion: # 触发报警 + # 基于时间间隔判断 + current_time = time.time() + elapsed_time = current_time - channel_data.warn_last_time + if elapsed_time < warn_interval: + continue + #处理报警 + channel_data.warn_last_time = current_time + model_name = channel_data.model_name + w_s_count = channel_data.warn_save_count #上次保存的缓冲帧序号 + buffer_count = channel_data.get_counter() + #线程? + self.mMM.save_warn(model_name, w_s_count, buffer_count, channel_data.copy_deque(), + channel_data.cap.width, channel_data.cap.height, local_cid[i], + self.mMM.FPS, self.mMM.fourcc) + self.mMM.send_warn() + # 更新帧序列号 + channel_data.warn_save_count = buffer_count + # 结果记录要清空 + for i in range(len(result)): + result[i] = 0 + def draw_polygon(self, img, polygon_points,color=(0, 255, 0)): self.polygon = Polygon(ast.literal_eval(polygon_points)) @@ -52,8 +319,7 @@ class ModelBase(ABC): return False #acl ----- 相关----- - def _init_acl(self): - device_id = 0 + def _init_acl(self,device_id=0): self.context, ret = acl.rt.create_context(device_id) # 显式创建一个Context if ret: raise RuntimeError(ret) @@ -66,10 +332,8 @@ class ModelBase(ABC): if ret: raise RuntimeError(ret) print('Deinit TH-Context Successfully') - print('ACL finalize Successfully') def _init_resource(self): - #self._init_acl() #测试使用 ''' 初始化模型、输出相关资源。相关数据类型: aclmdlDesc aclDataBuffer aclmdlDataset''' print("Init model resource") # 加载模型文件 @@ -212,17 +476,25 @@ class ModelBase(ABC): ret = acl.destroy_data_buffer(data_buf) # 释放buffer ret = acl.mdl.destroy_dataset(dataset) # 销毁数据集 - # @abstractmethod - # def infer(self, inputs): # 保留接口, 子类必须重写 - # pass - + @abstractmethod + def prework(self, image): # 预处理 + pass @abstractmethod - def verify(self,image,data,isdraw=1): + def verify(self,image,data,isdraw=1): # ''' :param image: 需要验证的图片 :param data: select t1.model_id,t1.check_area,t1.polygon ,t2.duration_time,t2.proportion,t2.model_path :param isdraw: 是否需要绘制线框:0-不绘制,1-绘制 :return: detections,bwarn,warntext bwarn:0-没有识别到符合要求的目标,1-没有识别到符合要求的目标。 ''' - pass \ No newline at end of file + pass + + @abstractmethod + def postwork(self,image,output,check_area,polygon,scale_ratio, pad_size): # 后处理 + pass + + + +if __name__ =="__main__": + pass \ No newline at end of file diff --git a/model/plugins/RYRQ_ACL/RYRQ_Model_ACL.py b/model/plugins/RYRQ_ACL/RYRQ_Model_ACL.py index 93b28ab..1f9fedf 100644 --- a/model/plugins/RYRQ_ACL/RYRQ_Model_ACL.py +++ b/model/plugins/RYRQ_ACL/RYRQ_Model_ACL.py @@ -8,12 +8,12 @@ import torch # 深度学习运算框架,此处主要用来处理数据 from core.ACLModelManager import ACLModeManger class Model(ModelBase): - def __init__(self,path,threshold=0.5): + def __init__(self,path,mMM,threshold=0.5): # 找pt模型路径 -- 一个约束py文件和模型文件的路径关系需要固定, -- 上传模型时,要解压好路径 dirpath, filename = os.path.split(path) self.model_file = os.path.join(dirpath, "yolov5s_bs1.om") # 目前约束模型文件和py文件在同一目录 self.coco_file = os.path.join(dirpath, "coco_names.txt") - super().__init__(self.model_file) #acl环境初始化基类负责类的实例化 + super().__init__(self.model_file,mMM) #acl环境初始化基类负责类的实例化 self.model_id = None # 模型 id self.input_dataset = None # 输入数据结构 self.output_dataset = None # 输出数据结构 @@ -30,6 +30,7 @@ class Model(ModelBase): self.netw = 640 # 缩放的目标宽度, 也即模型的输入宽度 self.conf_threshold = threshold # 置信度阈值 + #加载ACL模型文件---模型加载、模型执行、模型卸载的操作必须在同一个Context下 if self._init_resource(): #加载离线模型,创建输出缓冲区 print("加载模型文件成功!") @@ -41,54 +42,61 @@ class Model(ModelBase): if self.init_ok: self.release() - - def verify(self,image,data,isdraw=1): - labels_dict = get_labels_from_txt('/mnt/zfbox/model/plugins/RYRQ_ACL/coco_names.txt') # 得到类别信息,返回序号与类别对应的字典 + #针对推理图片的前处理 + def prework(self,image): # 数据前处理 img, scale_ratio, pad_size = letterbox(image, new_shape=[640, 640]) # 对图像进行缩放与填充 - img = img[:, :, ::-1].transpose(2, 0, 1) # BGR to RGB, HWC to CHW #图片在输入时已经做了转换 + img = img[:, :, ::-1].transpose(2, 0, 1) # BGR to RGB, HWC to CHW img = np.ascontiguousarray(img, dtype=np.float32) / 255.0 # 转换为内存连续存储的数组 + return img,scale_ratio, pad_size + + #针对推理结果的后处理 + def postwork(self,image,output,check_area,polygon,scale_ratio, pad_size): + labels_dict = get_labels_from_txt('/mnt/zfbox/model/plugins/RYRQ_ACL/coco_names.txt') # 得到类别信息,返回序号与类别对应的字典 + # 后处理 -- boxout 是 tensor-list: [tensor([[],[].[]])] --[x1,y1,x2,y2,置信度,coco_index] + boxout = nms(torch.tensor(output), conf_thres=0.3, + iou_thres=0.5) # 利用非极大值抑制处理模型输出,conf_thres 为置信度阈值,iou_thres 为iou阈值 + pred_all = boxout[0].numpy() # 转换为numpy数组 -- [[],[],[]] --[x1,y1,x2,y2,置信度,coco_index] + # pred_all[:, :4] 取所有行的前4列,pred_all[:,1]--第一列 + scale_coords([640, 640], pred_all[:, :4], image.shape, ratio_pad=(scale_ratio, pad_size)) # 将推理结果缩放到原始图片大小 - # 模型推理, 得到模型输出 - outputs = None - #outputs = self.execute([img,])#创建input,执行模型,返回结果 --失败返回None + # 是否有检测区域,有先绘制检测区域 由于在该函数生成了polygon对象,所有需要在检测区域前调用。 + if check_area == 1: + self.draw_polygon(image, polygon, (0, 0, 255)) + # 过滤掉不是目标标签的数据 -- 序号0-- person + filtered_pred_all = pred_all[pred_all[:, 5] == 0] + bwarn = False + warn_text = "" + # 绘制检测结果 --- 也需要封装在类里, + for pred in filtered_pred_all: + x1, y1, x2, y2 = int(pred[0]), int(pred[1]), int(pred[2]), int(pred[3]) + # # 绘制目标识别的锚框 --已经在draw_bbox里处理 + # cv2.rectangle(image, (x1, y1), (x2, y2), (0, 255, 0), 2) + if check_area == 1: # 指定了检测区域 + x_center = (x1 + x2) / 2 + y_center = (y1 + y2) / 2 + # 绘制中心点? + cv2.circle(image, (int(x_center), int(y_center)), 5, (0, 0, 255), -1) + # 判断是否区域点 + if not self.is_point_in_region((x_center, y_center)): + continue # 没产生报警-继续 + # 产生报警 -- 有一个符合即可 + bwarn = True + warn_text = "Intruder detected!" + img_dw = draw_bbox(filtered_pred_all, image, (0, 255, 0), 2, labels_dict) # 画出检测框、类别、概率 + # cv2.imwrite('img_res.png', img_dw) + return filtered_pred_all, bwarn, warn_text + + def verify(self,image,outputs,isdraw=1): + labels_dict = get_labels_from_txt('/mnt/zfbox/model/plugins/RYRQ_ACL/coco_names.txt') # 得到类别信息,返回序号与类别对应的字典 + + #后处理部分了 filtered_pred_all = None bwarn = False warn_text = "" - # 是否有检测区域,有先绘制检测区域 由于在该函数生成了polygon对象,所有需要在检测区域前调用。 - if data[1] == 1: - self.draw_polygon(image, data[2], (255, 0, 0)) - if outputs: - output = outputs[0] #只放了一张图片 - # 后处理 -- boxout 是 tensor-list: [tensor([[],[].[]])] --[x1,y1,x2,y2,置信度,coco_index] - boxout = nms(torch.tensor(output), conf_thres=0.3, - iou_thres=0.5) # 利用非极大值抑制处理模型输出,conf_thres 为置信度阈值,iou_thres 为iou阈值 - pred_all = boxout[0].numpy() # 转换为numpy数组 -- [[],[],[]] --[x1,y1,x2,y2,置信度,coco_index] - # pred_all[:, :4] 取所有行的前4列,pred_all[:,1]--第一列 - scale_coords([640, 640], pred_all[:, :4], image.shape, ratio_pad=(scale_ratio, pad_size)) # 将推理结果缩放到原始图片大小 - #过滤掉不是目标标签的数据 -- 序号0-- person - filtered_pred_all = pred_all[pred_all[:, 5] == 0] - # 绘制检测结果 --- 也需要封装在类里, - for pred in filtered_pred_all: - x1, y1, x2, y2 = int(pred[0]), int(pred[1]), int(pred[2]), int(pred[3]) - # # 绘制目标识别的锚框 --已经在draw_bbox里处理 - # cv2.rectangle(image, (x1, y1), (x2, y2), (0, 255, 0), 2) - if data[1] == 1: # 指定了检测区域 - x_center = (x1 + x2) / 2 - y_center = (y1 + y2) / 2 - #绘制中心点? - cv2.circle(image, (int(x_center), int(y_center)), 5, (0, 0, 255), -1) - #判断是否区域点 - if not self.is_point_in_region((x_center, y_center)): - continue #没产生报警-继续 - #产生报警 -- 有一个符合即可 - bwarn = True - warn_text = "Intruder detected!" - img_dw = draw_bbox(filtered_pred_all, image, (0, 255, 0), 2, labels_dict) # 画出检测框、类别、概率 - #cv2.imwrite('img_res.png', img_dw) - return filtered_pred_all, bwarn, warn_text + def testRun(self): print("1111") \ No newline at end of file diff --git a/run.py b/run.py index 02be082..484905f 100644 --- a/run.py +++ b/run.py @@ -4,11 +4,41 @@ from core.ModelManager import mMM import os import platform import shutil - +import queue +import time +import asyncio +import threading +from hypercorn.asyncio import serve +from hypercorn.config import Config print(f"Current working directory (run.py): {os.getcwd()}") + +def test(): + test_q = queue.Queue(maxsize=10) + test_12 = queue.Queue(maxsize=10) + test_q.put("11") + test_q.put("22") + test_12.put("aa") + test_12.put("bb") + q_list = [] + q_list.append(test_q) + q_list.append(test_12) + while True: + for i,q in enumerate(q_list): + if q.empty(): + continue + print(q.get()) + print("执行一次") + time.sleep(1) + web = create_app() +async def run_quart_app(): + config = Config() + config.bind = ["0.0.0.0:5001"] + await serve(web, config) + + if __name__ == '__main__': system = platform.system() if system == "Windows": @@ -22,5 +52,6 @@ if __name__ == '__main__': print(free/(1024*1024)) mMM.start_work() # 启动所有通道的处理 #mVManager.start_check_rtsp() #线程更新视频在线情况 - web.run(debug=True,port=5001,host="0.0.0.0") + asyncio.run(run_quart_app()) + diff --git a/web/API/viedo.py b/web/API/viedo.py index 6e045bc..640f389 100644 --- a/web/API/viedo.py +++ b/web/API/viedo.py @@ -5,6 +5,7 @@ from core.ModelManager import mMM from core.DBManager import mDBM from myutils.ConfigManager import myCongif import logging +import time # 配置日志 logging.basicConfig(level=logging.INFO) @@ -144,17 +145,28 @@ async def get_stats(peer_connection): @api.websocket('/ws/video_feed/') async def ws_video_feed(channel_id): channel_data = mMM.verify_list.get_channel(channel_id) - frame_rate = myCongif.get_data("frame_rate") - while channel_data.bool_run: #这里的多线程并发,还需要验证检查 - frame = channel_data.get_last_frame() - if frame is not None: - #img = frame.to_ndarray(format="bgr24") - # ret, buffer = cv2.imencode('.jpg', frame) - # if not ret: - # continue - # frame = buffer.tobytes() - await websocket.send(frame) - await asyncio.sleep(1.0 / frame_rate) # Adjust based on frame rate + last_time=time.time() + frame_interval = 1.0 / int(myCongif.get_data("verify_rate")) + + if channel_data is not None: + verify_rate = myCongif.get_data("verify_rate") + while channel_data.bool_run: #这里的多协程并发,还需要验证检查 + #控制帧率 + current_time = time.time() + elapsed_time = current_time - last_time + if elapsed_time < frame_interval: + await asyncio.sleep(frame_interval - elapsed_time) # 若小于间隔时间则休眠 + last_time = time.time() + #读取最新的一帧发送 + frame = channel_data.get_last_frame() + if frame is not None: + #img = frame.to_ndarray(format="bgr24") + # ret, buffer = cv2.imencode('.jpg', frame) + # if not ret: + # continue + # frame = buffer.tobytes() + await websocket.send(frame) + #await asyncio.sleep(1.0 / verify_rate) # Adjust based on frame rate @api.route('/shutdown', methods=['POST']) async def shutdown():#这是全关 --需要修改 diff --git a/web/main/routes.py b/web/main/routes.py index a0aa799..162edc4 100644 --- a/web/main/routes.py +++ b/web/main/routes.py @@ -22,7 +22,6 @@ def login_required(f): @main.route('/') async def index(): - print("index") #error = request.args.get('error') return await render_template('实时预览.html') #return await render_template('登录.html',error=error) diff --git a/zfbox.db b/zfbox.db index 920cb4b..781ccf2 100644 Binary files a/zfbox.db and b/zfbox.db differ diff --git a/流程说明.docx b/流程说明.docx index 5336c49..4b4cbaa 100644 Binary files a/流程说明.docx and b/流程说明.docx differ