|
|
|
import threading
|
|
|
|
from collections import deque
|
|
|
|
import numpy as np
|
|
|
|
import time
|
|
|
|
import copy
|
|
|
|
import queue
|
|
|
|
import cv2
|
|
|
|
from datetime import datetime, timedelta
|
|
|
|
from myutils.ConfigManager import myCongif
|
|
|
|
from core.WarnManager import WarnData
|
|
|
|
from myutils.MyLogger_logger import LogHandler
|
|
|
|
|
|
|
|
#--2024-7-12调整规则,一个视频通道只允许配置一个算法模型,一个算法模型可以配置给多路通道
|
|
|
|
class ChannelData:
|
|
|
|
def __init__(self, channel_id,str_url, int_type, bool_run, deque_length,icount_max):
|
|
|
|
self.channel_id = channel_id
|
|
|
|
self.str_url = str_url #视频源地址
|
|
|
|
self.int_type = int_type #视频源类型,0-usb,1-rtsp,2-hksdk
|
|
|
|
self.bool_run = bool_run #线程运行标识
|
|
|
|
self.bModel = False #该通道是否有模型标识
|
|
|
|
self.frame_interval = 1.0 / int(myCongif.get_data("verify_rate"))
|
|
|
|
self.mylogger = LogHandler().get_logger("ChannelData")
|
|
|
|
#通道-模型推理输入数据
|
|
|
|
self.cap = None
|
|
|
|
self.in_check_area = None
|
|
|
|
self.in_polygon = None
|
|
|
|
self.last_frame_time = time.time()
|
|
|
|
self.preimg_q = queue.Queue(maxsize=myCongif.get_data("q_size")) #这是预处理后的图片--模型推理使用
|
|
|
|
self.img_q = queue.Queue(maxsize=myCongif.get_data("q_size")) # 原图-后处理使用
|
|
|
|
|
|
|
|
#图片的缩放比例
|
|
|
|
self.cale_ratio = None
|
|
|
|
self.pad_size = None
|
|
|
|
self.schedule = None # 布放计划
|
|
|
|
self.result = None # 结果记录
|
|
|
|
self.proportion = None #报警占比
|
|
|
|
self.warn_save_count = 0 #保存录像的最新帧初始化为0
|
|
|
|
self.model_name = None #关联模型的名字
|
|
|
|
self.warn_last_time = time.time()
|
|
|
|
|
|
|
|
#通道-模型推理数据
|
|
|
|
self.last_infer_time = time.time()
|
|
|
|
self.infer_img_q = queue.Queue(maxsize=myCongif.get_data("q_size")) # 原图-后处理使用
|
|
|
|
|
|
|
|
#通道-模型推理输出数据
|
|
|
|
self.last_out_time = time.time()
|
|
|
|
self.output_q = queue.Queue(maxsize=myCongif.get_data("q_size"))
|
|
|
|
self.deque_frame = deque(maxlen=deque_length) #推理后的缓冲区数据
|
|
|
|
self.counter = 0 # 帧序列号--保存报警录像使用
|
|
|
|
self.icount_max = icount_max # 帧序列号上限
|
|
|
|
self.last_frame = None # 保存图片数据 方案一
|
|
|
|
self.frame_queue = queue.Queue(maxsize=1) #保持图片数据 方案二
|
|
|
|
self.lock = threading.RLock() # 用于保证线程安全 -- 输出数据锁
|
|
|
|
self.mMM = None #ModelManager实例对象
|
|
|
|
|
|
|
|
def __del__(self):
|
|
|
|
self.cap.release() #停止视频采集线程
|
|
|
|
|
|
|
|
def cleardata(self): #清理数据
|
|
|
|
pass
|
|
|
|
|
|
|
|
'''输入数据相关函数'''
|
|
|
|
def set_in_data(self,in_check_area,in_polygon):
|
|
|
|
self.in_check_area = in_check_area
|
|
|
|
self.in_polygon = in_polygon
|
|
|
|
|
|
|
|
def set_in_cale_ratio(self,cale_ratio,pad_size):
|
|
|
|
if self.cale_ratio is None:
|
|
|
|
self.cale_ratio = cale_ratio
|
|
|
|
self.pad_size = pad_size
|
|
|
|
|
|
|
|
'''输出数据相关函数'''
|
|
|
|
#添加一帧图片
|
|
|
|
def add_deque(self, value):
|
|
|
|
self.deque_frame.append(value) #deque 满了以后会把前面的数据移除
|
|
|
|
|
|
|
|
#拷贝一份数据
|
|
|
|
def copy_deque(self):
|
|
|
|
return copy.deepcopy(self.deque_frame)
|
|
|
|
|
|
|
|
#获取最后一帧图片
|
|
|
|
def get_last_frame(self):
|
|
|
|
if self.bModel:
|
|
|
|
#print("取画面")
|
|
|
|
# with self.lock:
|
|
|
|
# frame = self.last_frame
|
|
|
|
if not self.frame_queue.empty():
|
|
|
|
return self.frame_queue.get()
|
|
|
|
else:
|
|
|
|
return None
|
|
|
|
else:
|
|
|
|
ret,img = self.cap.read()
|
|
|
|
if not ret:
|
|
|
|
return None
|
|
|
|
#img_bgr_ndarray = cv2.cvtColor(img, cv2.COLOR_RGB2BGR)
|
|
|
|
# 在线程里面完成应该可以减少网页端处理时间
|
|
|
|
ret, frame_bgr_webp = cv2.imencode('.jpg', img)
|
|
|
|
if not ret:
|
|
|
|
frame = None
|
|
|
|
else:
|
|
|
|
frame = frame_bgr_webp.tobytes()
|
|
|
|
return frame
|
|
|
|
|
|
|
|
def update_last_frame(self,buffer):
|
|
|
|
if buffer:
|
|
|
|
# with self.lock:
|
|
|
|
# self.last_frame = None
|
|
|
|
# self.last_frame = buffer
|
|
|
|
if not self.frame_queue.full():
|
|
|
|
self.frame_queue.put(buffer)
|
|
|
|
else:
|
|
|
|
self.frame_queue.get() # 丢弃最旧的帧
|
|
|
|
self.frame_queue.put(buffer)
|
|
|
|
|
|
|
|
#帧序列号自增 一个线程中处理,不用加锁
|
|
|
|
def increment_counter(self):
|
|
|
|
self.counter += 1
|
|
|
|
if self.counter > self.icount_max:
|
|
|
|
self.counter = 0
|
|
|
|
|
|
|
|
def get_counter(self):
|
|
|
|
return self.counter
|
|
|
|
|
|
|
|
#清空数据,主要是删除deque 和 last_frame
|
|
|
|
def clear(self):
|
|
|
|
with self.lock:
|
|
|
|
self.bool_run = False
|
|
|
|
time.sleep(1) #休眠一秒,等待通道对应的子线程,停止工作。
|
|
|
|
self.deque_frame.clear()
|
|
|
|
self.last_frame = None
|
|
|
|
|
|
|
|
def stop_run(self):
|
|
|
|
self.bool_run = False
|
|
|
|
|
|
|
|
def th_sleep(self,frame_interval,last_time):
|
|
|
|
# 控制帧率 -- 推理帧率必须所有模型一致,若模型推理耗时一致,该方案还算可以。
|
|
|
|
current_time = time.time()
|
|
|
|
elapsed_time = current_time - last_time
|
|
|
|
if elapsed_time < frame_interval:
|
|
|
|
time.sleep(frame_interval - elapsed_time) # 若小于间隔时间则休眠
|
|
|
|
|
|
|
|
def is_in_schedule(self):
|
|
|
|
'''判断当前时间是否在该通道的工作计划时间内'''
|
|
|
|
# 验证检测计划,是否在布防时间内
|
|
|
|
now = datetime.now() # 获取当前日期和时间
|
|
|
|
weekday = now.weekday() # 获取星期几,星期一是0,星期天是6
|
|
|
|
hour = now.hour
|
|
|
|
if self.schedule[weekday][hour] == 1: # 不在计划则不进行验证,直接返回图片
|
|
|
|
return 0
|
|
|
|
else:
|
|
|
|
next_hour = (now + timedelta(hours=1)).replace(minute=0, second=0, microsecond=0)
|
|
|
|
seconds_until_next_hour = (next_hour - now).seconds
|
|
|
|
return seconds_until_next_hour
|
|
|
|
|
|
|
|
def th_prework(self,model):
|
|
|
|
last_pre_time = time.time()
|
|
|
|
while self.bool_run:
|
|
|
|
#start_do_time = time.time()
|
|
|
|
# 控制帧率 -- 推理帧率必须所有模型一致,
|
|
|
|
self.th_sleep(self.frame_interval, last_pre_time)
|
|
|
|
last_pre_time = time.time()
|
|
|
|
#判断是否在布防计划内
|
|
|
|
sleep_time = self.is_in_schedule()
|
|
|
|
if sleep_time == 0: #判断是否工作计划内-- 后来判断下是否推理和后处理线程需要休眠
|
|
|
|
self.bModel = True
|
|
|
|
else:
|
|
|
|
self.bModel = False
|
|
|
|
time.sleep(sleep_time) #工作计划以小时为单位,休息到该小时结束
|
|
|
|
continue
|
|
|
|
# 读取图片进行推理
|
|
|
|
ret, img = self.cap.read()
|
|
|
|
if not ret:
|
|
|
|
continue
|
|
|
|
#img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB) #这个要确认下什么模式输入进预处理 #?
|
|
|
|
preimg,scale_ratio, pad_size = model.prework(img) #子类实现-根据每个模型的需要进行预处理
|
|
|
|
if not self.preimg_q.full():
|
|
|
|
self.preimg_q.put(preimg)
|
|
|
|
self.img_q.put(img)
|
|
|
|
self.set_in_cale_ratio(scale_ratio, pad_size)
|
|
|
|
else:
|
|
|
|
self.mylogger.debug("preimg_q--预处理队列满了! infer线程处理过慢!")
|
|
|
|
#end_do_time = time.time()
|
|
|
|
# # 计算执行时间(秒)
|
|
|
|
# execution_time = end_do_time - start_do_time
|
|
|
|
# # 输出执行时间
|
|
|
|
# print(f"预处理代码执行时间为:{execution_time:.6f} 秒")
|
|
|
|
|
|
|
|
def th_postwork(self,model):
|
|
|
|
warn_interval = int(myCongif.get_data("warn_interval"))
|
|
|
|
last_out_time = time.time()
|
|
|
|
while self.bool_run:
|
|
|
|
#do_strat_time = time.time()
|
|
|
|
# # 控制帧率 ,
|
|
|
|
# self.th_sleep(self.frame_interval, last_out_time)
|
|
|
|
# last_out_time = time.time()
|
|
|
|
#执行后处理
|
|
|
|
output = self.output_q.get()#为空时会阻塞在get
|
|
|
|
img = self.infer_img_q.get()
|
|
|
|
# 子类实现--具体的报警逻辑
|
|
|
|
filtered_pred_all, bwarn, warn_text = model.postwork(img, output, self.in_check_area,self.in_polygon,
|
|
|
|
self.cale_ratio,self.pad_size)
|
|
|
|
# img的修改应该在原内存空间修改的
|
|
|
|
self.result.pop(0) # 先把最早的结果推出数组,保障结果数组定长
|
|
|
|
if bwarn:
|
|
|
|
# 绘制报警文本
|
|
|
|
#cv2.putText(img, warn_text, (50, 50), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)
|
|
|
|
self.result.append(1)
|
|
|
|
else: # 没有产生报警也需要记录,统一计算占比
|
|
|
|
self.result.append(0)
|
|
|
|
|
|
|
|
# 在线程里面完成应该可以减少网页端处理时间
|
|
|
|
ret, frame_bgr_webp = cv2.imencode('.jpg', img)
|
|
|
|
if not ret:
|
|
|
|
buffer_bgr_webp = None
|
|
|
|
else:
|
|
|
|
buffer_bgr_webp = frame_bgr_webp.tobytes()
|
|
|
|
|
|
|
|
# 分析图片放入内存中
|
|
|
|
self.add_deque(img)
|
|
|
|
self.increment_counter() # 帧序列加一
|
|
|
|
# 一直更新最新帧,提供网页端显示
|
|
|
|
self.update_last_frame(buffer_bgr_webp)
|
|
|
|
# print(f"{channel_id}--Frame updated at:",time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()))
|
|
|
|
if bwarn:
|
|
|
|
count_one = float(sum(self.result)) # 1,0 把1累加的和就是1的数量
|
|
|
|
ratio_of_ones = count_one / len(self.result)
|
|
|
|
# self.logger.debug(result)
|
|
|
|
if ratio_of_ones >= self.proportion: # 触发报警
|
|
|
|
# 基于时间间隔判断
|
|
|
|
current_time = time.time()
|
|
|
|
elapsed_time = current_time - self.warn_last_time
|
|
|
|
if elapsed_time < warn_interval:
|
|
|
|
continue
|
|
|
|
self.warn_last_time = current_time
|
|
|
|
# 处理报警
|
|
|
|
warn_data = WarnData()
|
|
|
|
warn_data.model_name = self.model_name
|
|
|
|
warn_data.warn_text = warn_text
|
|
|
|
warn_data.img_buffer = self.copy_deque() #深度复制缓冲区
|
|
|
|
warn_data.width = self.cap.width
|
|
|
|
warn_data.height = self.cap.height
|
|
|
|
warn_data.channel_id = self.channel_id
|
|
|
|
self.mMM.add_warm_data(warn_data)
|
|
|
|
# model_name = self.model_name
|
|
|
|
# w_s_count = self.warn_save_count # 上次保存的缓冲帧序号
|
|
|
|
# buffer_count = self.get_counter()
|
|
|
|
# # 线程?
|
|
|
|
# self.mMM.save_warn(model_name, w_s_count, buffer_count, self.copy_deque(),
|
|
|
|
# self.cap.width, self.cap.height, self.channel_id,
|
|
|
|
# self.mMM.FPS, self.mMM.fourcc)
|
|
|
|
# self.mMM.send_warn()
|
|
|
|
# # 更新帧序列号
|
|
|
|
# self.warn_save_count = buffer_count
|
|
|
|
# 结果记录要清空
|
|
|
|
for i in range(len(self.result)):
|
|
|
|
self.result[i] = 0
|
|
|
|
|
|
|
|
# do_end_time = time.time()
|
|
|
|
# # 计算执行时间(秒)
|
|
|
|
# execution_time = do_end_time - do_strat_time
|
|
|
|
# # 输出执行时间
|
|
|
|
# print(f"*************************************后处理代码执行时间为:{execution_time:.6f} 秒")
|
|
|
|
|
|
|
|
|
|
|
|
#执行预和后处理线程,每个通道一个
|
|
|
|
def start_channel_thread(self,mMM,model):
|
|
|
|
self.mMM = mMM
|
|
|
|
th_pre = threading.Thread(target=self.th_prework, args=(model,)) # 一个视频通道一个线程,线程句柄暂时部保留
|
|
|
|
th_pre.start()
|
|
|
|
|
|
|
|
th_post = threading.Thread(target=self.th_postwork,args=(model,)) # 一个视频通道一个线程,线程句柄暂时部保留
|
|
|
|
th_post.start()
|
|
|
|
|
|
|
|
|
|
|
|
class ChannelManager:
|
|
|
|
def __init__(self):
|
|
|
|
self.channels = {}
|
|
|
|
self.lock = threading.RLock() # 用于保证字典操作的线程安全
|
|
|
|
|
|
|
|
#增加节点
|
|
|
|
def add_channel(self, channel_id, str_url, int_type, bool_run, deque_length=10,icount_max=100000):
|
|
|
|
with self.lock:
|
|
|
|
if channel_id in self.channels: #若已经有数据,先删除后再增加
|
|
|
|
self.channels[channel_id].clear() # 手动清理资源
|
|
|
|
del self.channels[channel_id]
|
|
|
|
ch_data = ChannelData(channel_id,str_url, int_type, bool_run, deque_length,icount_max)
|
|
|
|
self.channels[channel_id] = ch_data
|
|
|
|
return ch_data
|
|
|
|
|
|
|
|
#删除节点
|
|
|
|
def delete_channel(self, channel_id):
|
|
|
|
with self.lock:
|
|
|
|
if channel_id in self.channels:
|
|
|
|
self.channels[channel_id].clear() # 手动清理资源
|
|
|
|
del self.channels[channel_id]
|
|
|
|
|
|
|
|
#获取节点
|
|
|
|
def get_channel(self, channel_id):
|
|
|
|
with self.lock:
|
|
|
|
return self.channels.get(channel_id)
|
|
|
|
|
|
|
|
#停止工作线程
|
|
|
|
def stop_channel(self,channel_id):
|
|
|
|
with self.lock:
|
|
|
|
if channel_id == 0:
|
|
|
|
for clannel_id,clannel_data in self.channels.items():
|
|
|
|
clannel_data.clear()
|
|
|
|
del self.channels
|
|
|
|
else:
|
|
|
|
if channel_id in self.channels:
|
|
|
|
self.channels[channel_id].clear() # 手动清理资源
|
|
|
|
del self.channels[channel_id]
|
|
|
|
|
|
|
|
if __name__ == "__main__":
|
|
|
|
# 示例使用
|
|
|
|
manager = ChannelManager()
|
|
|
|
manager.add_channel('channel_1', 'test', 123, True, deque_length=5)
|
|
|
|
|
|
|
|
# 更新和读取操作
|
|
|
|
manager.update_channel_deque('channel_1', 'data1')
|
|
|
|
manager.update_channel_buffer('channel_1', np.array([[1, 2], [3, 4]]))
|
|
|
|
manager.increment_channel_counter('channel_1')
|
|
|
|
|
|
|
|
channel_data = manager.get_channel_data('channel_1')
|
|
|
|
print(channel_data)
|