Browse Source

整合了模型独立进程,通过配置文件中的workType进行控制。

zfbox
张龙 7 months ago
parent
commit
6f5d873ee8
  1. 7
      config.yaml
  2. 5
      core/CapManager.py
  3. 629
      core/ChannelData.py
  4. 484
      core/ChannelManager.py
  5. 19
      core/DataStruct.py
  6. 46
      core/ModelManager.py
  7. 133
      core/ModelNode.py
  8. 9
      model/base_model/ascnedcl/classes.py
  9. 1
      model/base_model/ascnedcl/det_utils.py
  10. 107
      model/base_model/ascnedcl/det_utils_v10.py
  11. 8
      model/plugins/ModelBase.py
  12. 76
      model/plugins/RYRQ_ACL/RYRQ_Model_ACL.py
  13. 103
      model/plugins/RYRQ_Model_ACL/RYRQ_Model_ACL.py
  14. 15
      myutils/MyDeque.py
  15. 0
      myutils/myutil.py
  16. 76
      web/API/viedo.py

7
config.yaml

@ -30,6 +30,9 @@ RTSP_Check_Time : 600 #10分钟 -- 2024-7-8 取消使用
#max_channel_num
max_channel_num : 8 #最大视频通道数量
encode_param : 50 #无参数默认是95
mywidth: 640
myheight: 480
#model
model_platform : acl #acl gpu cpu
@ -49,3 +52,7 @@ reconnect_attempts: 5 #cap 读取帧失败几次后进行重连
#system --- 指定网卡
wired_interface : eth0
wireless_interface : WLAN
#独立模型线程相关
workType : 1 # 1--一通道一线程。2--模型独立线程

5
core/CapManager.py

@ -42,7 +42,7 @@ class VideoCaptureWithFPS:
self.running = True
#self.frame_queue = queue.Queue(maxsize=1)
self.frame_queue = MyDeque(10)
self.frame_queue = MyDeque(5)
#self.frame = None
#self.read_lock = threading.Lock()
self.thread = threading.Thread(target=self.update)
@ -63,6 +63,7 @@ class VideoCaptureWithFPS:
def update(self):
sleep_time = myCongif.get_data("cap_sleep_time")
reconnect_attempts = myCongif.get_data("reconnect_attempts")
while self.running:
try:
self.openViedo_opencv(self.source)
@ -141,6 +142,8 @@ class VideoCaptureWithFPS:
frame = self.frame_queue.mypopleft()
if frame is not None:
ret = True
else:
print("____读取cap帧为空,采集速度过慢___")
return ret, frame
def release(self):

629
core/ChannelData.py

@ -0,0 +1,629 @@
import time
import copy
import importlib.util
import datetime
import time
import threading
import cv2
import ffmpeg
import subprocess
from collections import deque
from myutils.MyLogger_logger import LogHandler
from core.CapManager import VideoCaptureWithFPS
from core.ACLModelManager import ACLModeManger
from model.plugins.ModelBase import ModelBase
from core.WarnManager import WarnData
from core.DataStruct import ModelinData,ModeloutData
from myutils.MyDeque import MyDeque
from myutils.ConfigManager import myCongif
class ChannelData:
def __init__(self,channel_id,deque_length,icount_max,warnM):
    """Per-channel state: capture handle, model worker threads, frame buffers and alarm hookup.

    :param channel_id: ID of this video channel
    :param deque_length: unused here -- the recording-buffer length comes from config key "buffer_len"
    :param icount_max: upper bound for the frame sequence counter (wraps to 0 past it)
    :param warnM: alarm manager (message-queue based) shared across channels
    """
    self.logger = LogHandler().get_logger("ChannelDat")
    self.model_platform = myCongif.get_data("model_platform")
    self.channel_id = channel_id  # ID of this channel
    self.warnM = warnM  # alarm-thread manager object -- MQ
    # Long-lived ffmpeg subprocess that H.264-encodes raw frames.
    # NOTE(review): stored as self.ffprocess, but encode_frame_h264 reads self.process -- confirm which is intended.
    self.ffprocess = self.start_h264_encoder(myCongif.get_data("mywidth"),myCongif.get_data("myheight"))
    # --- video capture ---
    self.cap = None  # capture object for this channel
    self.frame_rate = myCongif.get_data("frame_rate")
    self.frame_interval = 1.0 / int(myCongif.get_data("verify_rate"))  # seconds between verified frames
    # --- model handling ---
    self.model = None  # model object -- one channel is tied to exactly one model
    self.work_th = None  # worker-thread handle (one-thread-per-channel mode)
    self.b_model = False  # True while a model worker thread is running
    self.bool_run = True  # run flag polled by the worker threads
    self.lock = threading.RLock()  # guards shared state
    self.icount_max = icount_max  # frame sequence number upper bound
    self.max_len = myCongif.get_data("buffer_len")
    self.deque_frame = deque(maxlen=self.max_len)  # video buffer used to save alarm recordings
    self.last_frame = None  # latest encoded picture
    #self.frame_queue = queue.Queue(maxsize=1)
    self.frame_queue = MyDeque(10)  # MQ of analysed frames served to the web page
    self.counter = 0  # frame sequence number -- used when saving alarm recordings
    # --- standalone model-thread mode (workType == 2) ---
    self.per_th = None  # pre-processing thread handle
    self.per_status= False  # pre-processing thread status
    self.post_th = None  # post-processing thread handle
    self.post_status = False  # post-processing thread status
    self.model_node= None  # shared ModelNode (holds in_mq / the model); None => per-channel mode
    self.out_mq = MyDeque(30)  # per-channel MQ of inference outputs
    # JPEG compression parameters: quality 0-100 from config key "encode_param" (default there is 50)
    self.encode_param = [int(cv2.IMWRITE_JPEG_QUALITY), myCongif.get_data("encode_param")]
#添加一帧图片
def add_deque(self, value):
if len(self.deque_frame) == self.max_len:
removed_frame = self.deque_frame.popleft()
del removed_frame
removed_frame = None
self.deque_frame.append(value) #deque 满了以后会把前面的数据移除
#take a copy of the buffered frames
def copy_deque(self):
    """Deep-copy the recording buffer so the alarm writer can consume it safely."""
    return copy.deepcopy(self.deque_frame)
#get the newest picture for web display
def get_last_frame(self):
    """Return the newest encoded frame (bytes) for the web page, or None.

    While a model thread runs, frames come from the analysed-frame queue
    (mypopleft appears to return None when empty -- see CapManager usage);
    otherwise the raw capture is read and JPEG-encoded on the fly.
    """
    if self.b_model:
        frame = self.frame_queue.mypopleft()
        return frame
    else: #no model running -- grab the picture straight from the capture object
        if self.cap:
            ret, frame = self.cap.read() # all frames after the first should carry data
            if not ret:
                self.logger.debug(f"{self.channel_id}--web--获取原画失败,队列空")
                return None
            ret,buffer_bgr_webp = self._encode_frame(frame)
            return buffer_bgr_webp
        # no capture object either -- nothing to show
        return None
def encode_frame_to_flv(self,frame):
    """Encode one BGR frame to FLV (libx264) with a throw-away ffmpeg process.

    Spawns a fresh process per frame, so this is expensive; currently unused
    (kept for reference). Returns the encoded bytes, or None on any failure.
    """
    try:
        process = (
            ffmpeg
            .input('pipe:', format='rawvideo', pix_fmt='bgr24', s=f'{frame.shape[1]}x{frame.shape[0]}')
            .output('pipe:', format='flv',vcodec='libx264')
            .run_async(pipe_stdin=True, pipe_stdout=True, pipe_stderr=True)
        )
        out, err = process.communicate(input=frame.tobytes())
        if process.returncode != 0:
            raise RuntimeError(f"FFmpeg encoding failed: {err.decode('utf-8')}")
        return out
    except Exception as e:
        print(f"Error during frame encoding: {e}")
        return None
def update_last_frame(self,buffer):
if buffer:
self.frame_queue.myappend(buffer)
# with self.lock:
# self.last_frame = None
# self.last_frame = buffer
# if self.frame_queue.full():
# try:
# print("channel--丢帧")
# self.frame_queue.get(timeout=0.01)
# except queue.Empty: #为空不处理
# pass
# self.frame_queue.put(buffer)
# try:
# self.frame_queue.put(buffer,timeout=0.05)
# except queue.Full:
# self.logger.debug(f"{self.channel_id}分析画面队列满,插入失败")
# pass
#------------H.264 encoding helpers---------------
def start_h264_encoder(self,width, height): #same width/height everywhere, so tentatively one per process
    """Spawn a long-lived ffmpeg subprocess that H.264-encodes raw BGR frames.

    Raw frames are written to its stdin; encoded data is read from stdout.
    Uses the 'h264_ascend' hardware codec.
    NOTE(review): __init__ stores the returned process as self.ffprocess while
    encode_frame_h264 reads self.process -- confirm which attribute is meant.

    :param width: frame width in pixels
    :param height: frame height in pixels
    :return: the Popen handle of the encoder process
    """
    process = subprocess.Popen(
        ['ffmpeg',
         '-f', 'rawvideo',
         '-pix_fmt', 'bgr24',
         '-s', f'{width}x{height}',
         '-i', '-', # Take input from stdin
         '-an', # No audio
         '-vcodec', 'h264_ascend',
         '-preset', 'ultrafast',
         '-f', 'h264', # Output format H.264
         '-'],
        stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE
    )
    return process
def encode_frame_h264(self, frame):
if self.process.poll() is not None:
raise RuntimeError("FFmpeg process has exited unexpectedly.")
# Write frame to stdin of the FFmpeg process
try:
self.process.stdin.write(frame.tobytes())
except Exception as e:
raise RuntimeError(f"Failed to write frame to FFmpeg: {e}")
# Capture the encoded output
buffer_size = 1024 * 10 # Adjust this based on the size of the encoded frame
encoded_frame = bytearray()
while True:
chunk = self.process.stdout.read(buffer_size)
if not chunk:
break
encoded_frame.extend(chunk)
if not encoded_frame:
raise RuntimeError("No encoded data received from FFmpeg.")
# Optional: Check for errors in stderr
# stderr_output = self.process.stderr.read()
# if "error" in stderr_output.lower():
# raise RuntimeError(f"FFmpeg error: {stderr_output}")
return bytes(encoded_frame)
def _encode_frame(self,frame,itype=0):
    """Encode a frame to bytes for transport/display.

    :param frame: BGR image (ndarray)
    :param itype: 0 -> JPEG via cv2 (quality from self.encode_param),
                  1 -> H.264 via the persistent ffmpeg process
    :return: (ok, encoded bytes or None)
    """
    ret = False
    buffer_bgr_webp = None
    if itype == 0: #jpg
        ret, frame_bgr_webp = cv2.imencode('.jpg', frame, self.encode_param)
        if ret:
            buffer_bgr_webp = frame_bgr_webp.tobytes()
    elif itype == 1: #H264
        try:
            buffer_bgr_webp = self.encode_frame_h264(frame)
            ret = True
        except Exception as e:
            # encoder failure -> (False, None)
            print(e)
    else:
        print("错误的参数!!")
    return ret,buffer_bgr_webp
#帧序列号自增 一个线程中处理,不用加锁
def increment_counter(self):
self.counter += 1
if self.counter > self.icount_max:
self.counter = 0
def get_counter(self):
    """Return the current frame sequence number."""
    return self.counter
def _start_cap_th(self,source,type=1):
    '''Start the capture thread for this channel.

    type selects the camera backend: 0 -- USB camera, 1 -- RTSP, 2 -- Hikvision SDK.
    Any existing capture object is released and replaced.
    Returns True when a capture object exists afterwards.
    '''
    if self.cap:
        self.cap.release()
        self.cap = None
    self.cap = VideoCaptureWithFPS(source,type)
    return bool(self.cap)
def _stop_cap_th(self):
'''停止cap采集线程
重要约束停止cap线程前必须先停止model线程
'''
if self.b_model:
self.logger.error("停止采集线程前,请先停止model线程")
return False
else:
if self.cap:
self.cap.release()
self.cap = None
return True #一般不会没有cap
def _pre_work_th(self,schedule):
    '''Pre-processing thread (standalone-model mode): reads frames, runs the
    model's prework() and feeds ModelinData items into the shared model in_mq.
    One thread per channel; all locals are channel-private.

    :param schedule: 7x24 matrix, schedule[weekday][hour] == 1 means armed
    '''
    if not self.cap:
        self.logger.error("采集线程未正常启动,不进行工作")
        return
    # wait until the shared model thread reports a status (0 = not started yet)
    while self.model_node.model_th_status == 0:
        time.sleep(1)
    if self.model_node.model_th_status == 1:
        # model thread is up -- start the main loop
        last_frame_time = time.time() # initialise the frame-read timestamp
        self.per_status = True
        self.b_model = True
        while self.bool_run: # run flag is only read here, written by the main thread; at most one late round
            # frame-rate control
            current_time = time.time()
            elapsed_time = current_time - last_frame_time
            if elapsed_time < self.frame_interval:
                time.sleep(self.frame_interval - elapsed_time) # sleep the remainder of the interval
            last_frame_time = time.time()
            # ********* fetch a frame *************
            ret, frame = self.cap.read() # all frames after the first should carry data
            if not ret:
                # self.logger.debug(f"{self.channel_id}--model--获取cap画面失败,队列空")
                continue # nothing read -- try again
            # check the detection schedule: only process while armed
            now = datetime.datetime.now()
            weekday = now.weekday() # Monday is 0, Sunday is 6
            hour = now.hour
            if schedule[weekday][hour] == 1:
                # pre-process the picture and hand it to the shared model thread
                img,scale_ratio, pad_size = self.model_node.model.prework(frame)
                indata = ModelinData(self.channel_id,img,frame,scale_ratio, pad_size)
                self.model_node.in_mq.myappend(indata)
            else: # not armed: skip inference and publish the raw picture
                  # (known issue: the post-thread's result window misses these frames)
                ret, frame_bgr_webp = cv2.imencode('.jpg', frame,self.encode_param)
                if not ret:
                    buffer_bgr_webp = None
                else:
                    buffer_bgr_webp = frame_bgr_webp.tobytes()
                self.update_last_frame(buffer_bgr_webp)
    else:
        # model thread failed to start (status != 1); note: the log string has a
        # typo ("为" should be "未") but it is runtime output, left unchanged here
        self.logger.error("模型线程为启动成功,不进行工作")
        return
    # NOTE(review): indentation was lost in extraction; these resets are placed
    # after the if/else (reached only via the armed branch, since the else returns) -- confirm
    self.b_model = False
    self.per_status = False
def _post_work_th(self,duration_time,proportion,verify_rate,warn_interval,model_name,check_area,polygon,conf_threshold,iou_thres):
    """Post-processing thread (standalone-model mode): drains this channel's
    out_mq, runs the model's postwork(), publishes analysed frames, and raises
    alarms when the hit ratio over the sliding window exceeds *proportion*.

    :param duration_time: length (seconds) of the sliding evaluation window
    :param proportion: hit ratio that triggers an alarm
    :param verify_rate: verified frames per second
    :param warn_interval: minimum seconds between two alarms
    :param model_name: model name recorded in the alarm
    :param check_area/polygon/conf_threshold/iou_thres: detection parameters passed to postwork()
    """
    # fixed-length window: one slot per verified frame over duration_time seconds
    result = [0 for _ in range(duration_time * verify_rate)]
    warn_last_time = time.time()
    while self.bool_run:
        out_data = self.out_mq.mypopleft() #(image,scale_ratio, pad_size,outputs)
        if not out_data:
            time.sleep(0.1)
            continue
        # BUGFIX: keep `result` at fixed length like _verify does -- the original
        # only appended, so the list grew without bound (memory leak) and the
        # alarm ratio was diluted over time. Pop the oldest slot first.
        result.pop(0)
        # run the model-specific post-processing on the inference outputs
        bwarn, warn_text = self.model_node.model.postwork(out_data.image,out_data.outputs,out_data.scale_ratio,out_data.pad_size,
                                                          check_area,polygon,conf_threshold,iou_thres)
        if bwarn:
            # draw the alarm text onto the frame
            cv2.putText(out_data.image, warn_text, (50, 50),
                        cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)
            result.append(1)
        else: # record misses too, so the ratio is computed over all frames
            result.append(0)
        # encode the analysed frame for web display
        ret, frame_bgr_webp = cv2.imencode('.jpg', out_data.image,self.encode_param)
        buffer_bgr_webp = None
        if ret:
            buffer_bgr_webp = frame_bgr_webp.tobytes()
        # keep the raw analysed frame in the recording buffer (maxlen-bounded)
        self.add_deque(out_data.image)
        # always publish the newest analysed frame for the web page
        self.update_last_frame(buffer_bgr_webp)
        if bwarn:
            # check whether the window hit ratio triggers an alarm
            count_one = float(sum(result)) # slots are 0/1, so sum == hit count
            ratio_of_ones = count_one / len(result)
            if ratio_of_ones >= proportion: # alarm condition met
                # rate-limit alarms by warn_interval
                current_time = time.time()
                elapsed_time = current_time - warn_last_time
                if elapsed_time < warn_interval:
                    continue
                warn_last_time = current_time
                # emit the alarm
                warn_data = WarnData()
                warn_data.model_name = model_name
                warn_data.warn_text = warn_text
                warn_data.img_buffer = self.copy_deque() # deep copy of the recording buffer
                warn_data.width = self.cap.width
                warn_data.height = self.cap.height
                warn_data.channel_id = self.channel_id
                self.warnM.add_warn_data(warn_data)
                # reset the window after an alarm
                for i in range(len(result)):
                    result[i] = 0
def _verify(self,frame,model,model_data,schedule,result,isdraw=1):
    '''Main verification step (per-channel-thread mode): run the channel's
    model on one frame and maintain the sliding result window.

    :param frame: BGR frame from the capture thread
    :param model: dynamically loaded model instance (may be None)
    :param model_data: DB row with detection parameters (see start_work docstring)
    :param schedule: 7x24 arming matrix
    :param result: fixed-length 0/1 window, mutated in place
    :param isdraw: passed through to model.verify (presumably "draw overlays" -- confirm)
    :return: (encoded JPEG bytes or None, annotated image, warn text or "")
    '''
    #img = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    img = frame
    # check the detection schedule: only verify while armed
    now = datetime.datetime.now()
    weekday = now.weekday() # Monday is 0, Sunday is 6
    hour = now.hour
    result.pop(0) # keep the window at fixed length -- drop the oldest slot first
    warntext = ""
    if model and schedule[weekday][hour] == 1: # not armed -> skip inference, just return the picture
        # run the dynamically loaded model; the decision logic lives inside it
        bwarn, warntext = model.verify(img, model_data,isdraw) # **************** key call
        if bwarn:
            # draw the alarm text onto the frame
            cv2.putText(img, warntext, (50, 50),
                        cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)
            result.append(1)
        else: # record misses too, so the ratio covers all frames
            warntext = ""
            result.append(0)
    else:
        result.append(0)
    # AVFrame conversion not needed for now -- 2024-7-5
    # encode the (possibly annotated) BGR image for web display
    ret,buffer_bgr_webp = self._encode_frame(img)
    #buffer_bgr_webp = self.encode_frame_to_flv(img)
    return buffer_bgr_webp,img,warntext
def _dowork_thread(self,channel_id,model_data,schedule,verify_rate,warn_interval):
    '''Worker thread (one per channel): loads the channel's model, then loops
    reading frames, verifying them and raising alarms. All locals are
    channel-private.

    :param channel_id: this channel's ID
    :param model_data: DB row with model/detection parameters (see start_work docstring)
    :param schedule: 7x24 arming matrix
    :param verify_rate: verified frames per second
    :param warn_interval: minimum seconds between two alarms
    '''
    if not self.cap:
        self.logger.error("采集线程未正常启动,退出model线程")
        return
    # dynamically load the model's handler .py file
    model = self._import_model(str(channel_id), model_data[5], model_data[8], model_data[9])
    if not model:
        self.logger.error("自定义模型文件加载失败,退出model线程")
        return
    # initialise the model runtime (ACL context + .om file on Ascend)
    context = None
    device_id = myCongif.get_data("device_id")
    if self.model_platform == "acl": # per-thread ACL initialisation
        context = ACLModeManger.th_inti_acl(device_id) # create the context
        # load and initialise the offline model (.om) file
        ret = model.init_acl_resource()
        if not ret:
            print("初始化模型资源出错,退出线程!")
            return
    # business state: fixed-length window of duration_time * verify_rate slots
    result = [0 for _ in range(model_data[3] * verify_rate)]
    proportion = model_data[4] # hit ratio that triggers an alarm
    warn_last_time = time.time()
    #warn_save_count = 0
    # main loop
    last_frame_time = time.time() # initialise the frame-read timestamp
    self.b_model = True
    while self.bool_run: # run flag is only read here, written by the main thread; at most one late round
        # frame-rate control
        current_time = time.time()
        elapsed_time = current_time - last_frame_time
        if elapsed_time < self.frame_interval:
            time.sleep(self.frame_interval - elapsed_time) # sleep the remainder of the interval
        last_frame_time = time.time()
        #********* fetch a frame *************
        ret,frame = self.cap.read() # all frames after the first should carry data
        if not ret:
            #self.logger.debug(f"{self.channel_id}--model--获取cap画面失败,队列空")
            continue # nothing read -- try again
        # run inference + window bookkeeping
        buffer_bgr_webp,img_bgr_ndarray,warn_text = self._verify(frame,model,model_data,schedule,result)
        # keep the analysed frame in the recording buffer (maxlen-bounded)
        self.add_deque(img_bgr_ndarray)
        # always publish the newest analysed frame for the web page
        self.update_last_frame(buffer_bgr_webp)
        if warn_text:
            # check whether the window hit ratio triggers an alarm
            count_one = float(sum(result)) # slots are 0/1, so sum == hit count
            ratio_of_ones = count_one / len(result)
            #self.logger.debug(result)
            if ratio_of_ones >= proportion: # alarm condition met
                # rate-limit alarms by warn_interval
                current_time = time.time()
                elapsed_time = current_time - warn_last_time
                if elapsed_time < warn_interval:
                    continue
                warn_last_time = current_time
                # emit the alarm
                warn_data = WarnData()
                warn_data.model_name = model_data[7]
                warn_data.warn_text = warn_text
                warn_data.img_buffer = self.copy_deque() # deep copy of the recording buffer
                warn_data.width = self.cap.width
                warn_data.height = self.cap.height
                warn_data.channel_id = channel_id
                self.warnM.add_warn_data(warn_data)
                # reset the window after an alarm
                for i in range(len(result)):
                    result[i] = 0
        # end_time = time.time()
        # print(f"Processing time: {end_time - start_time} seconds")
    # thread shutdown
    print("开始结束工作线程")
    self.b_model = False
    # tear down the model runtime
    if self.model_platform == "acl":
        try:
            model.release() # release model resources
            del model
            # release the ACL context last; if the thread died abnormally
            # these resources cannot be freed properly
            if context:
                ACLModeManger.th_del_acl(context)
        except Exception as e:
            print(e)
    #cv2.destroyAllWindows()
    print("线程结束!!!!")
#2024-9-9 added standalone-model-thread support; self.model_node decides the mode:
# None -> one worker thread per channel, not None -> shared standalone model thread
def _start_model_th(self,model_data,schedule,type=1):
    """Start the model-side worker threads for this channel.

    Standalone mode (model_node set): starts the post-processing thread first
    so results can drain, attaches to (or starts) the shared model thread,
    then starts the pre-processing thread.
    Per-channel mode: starts the single _dowork_thread worker.

    :param model_data: DB row with model/detection parameters
    :param schedule: 7x24 arming matrix
    :param type: unused here -- NOTE(review): confirm whether it can be dropped
    """
    verify_rate = myCongif.get_data("verify_rate")
    warn_interval = myCongif.get_data("warn_interval")
    self.bool_run = True
    if self.model_node: # standalone mode: one pre- and one post-processing thread
        # start the post-processing thread
        self.post_th = threading.Thread(target=self._post_work_th,
                                        args=(model_data[3],model_data[4],verify_rate,warn_interval,model_data[7],
                                              model_data[1],model_data[2],model_data[8],model_data[9]))
        self.post_th.start()
        # start the shared model thread (or register this channel's out_mq if it already runs)
        self.model_node.start_model_th(self.channel_id,self.out_mq)
        # start the pre-processing thread
        self.per_th = threading.Thread(target=self._pre_work_th,args=(schedule,))
        self.per_th.start()
    else:
        self.work_th = threading.Thread(target=self._dowork_thread,
                                        args=(self.channel_id, model_data, schedule, verify_rate,
                                              warn_interval)) # one worker thread per video channel
        self.work_th.start()
def _stop_model_th(self):
    """Stop the model-side worker threads for this channel.

    Standalone mode: clears the run flag, joins the pre-processing thread,
    detaches from the shared model thread, joins the post-processing thread
    and clears any unprocessed output-queue entries.
    Per-channel mode: joins the single worker thread (which resets b_model).
    """
    if self.model_node: # standalone mode: stop pre- and post-processing threads
        self.bool_run = False
        # stop the pre-processing thread
        if self.per_th:
            self.per_th.join()
            self.per_th = None
        # detach from the shared model thread (refcount -1)
        self.model_node.stop_model_th(self.channel_id)
        # stop the post-processing thread
        if self.post_th:
            self.post_th.join()
            self.post_th = None
            self.out_mq.myclear() # drop unprocessed entries from the output MQ
    else:
        if self.work_th:
            if self.b_model:
                self.bool_run = False
                self.work_th.join() # the thread resets b_model on exit
                self.logger.debug(f"{self.channel_id}停止工作线程")
            self.work_th = None
def _import_model(self,model_name,model_path,threshold,iou_thres):
'''
根据路径动态导入模块
:param model_name: 模块名称 --用通道ID代替
:param model_path: 模块路径
:param threshold: 置信阈值
:param iou_thres: iou阈值
:return:
'''
try:
module_path = model_path.replace("/", ".").rsplit(".", 1)[0]
print(module_path)
# 动态导入模块
module = importlib.import_module(module_path)
# 从模块中获取指定的类
Model = getattr(module, "Model")
# 使用 Model 类
model_instance = Model(model_path,threshold,iou_thres)
return model_instance
except ModuleNotFoundError as e:
print(f"Module not found: {e}")
return None
except AttributeError as e:
print(f"Class not found in module: {e}")
return None
except Exception as e:
print(f"An unexpected error occurred: {e}")
return None
# if os.path.exists(model_path):
# module_spec = importlib.util.spec_from_file_location(model_name, model_path)
# if module_spec is None:
# self.logger.error(f"{model_path} 加载错误")
# return None
# module = importlib.util.module_from_spec(module_spec)
# start_time = time.time()
# print(f"通道{self.channel_id},开始exec_module自定义模型文件")
# module_spec.loader.exec_module(module)
# end_time = time.time()
# print(f"通道{self.channel_id},完成exec_module模型文件实例化耗时{end_time - start_time}")
#
# try:
# md = getattr(module, "Model")(model_path,threshold,iou_thres) #实例化类
# except Exception as e:
# self.logger.error(f"{model_path} 实例化错误,退出模型线程")
# return None
#
# if not isinstance(md, ModelBase):
# self.logger.error("{} not zf_model".format(md))
# return None
# else:
# self.logger.error("{}文件不存在".format(model_path))
# return None
# self.logger.debug(f"{model_path} 加载成功!!!!")
# return md
def start_work(self,cap_data,model_data,schedule,type,model_Node=None):
'''
开始工作线程包括视频通道采集和模型处理
:param cap_data: [source,type]
:param model_data: 跟通道关联的模型数据
strsql = (
f"select t1.model_id,t1.check_area,t1.polygon ,t2.duration_time,t2.proportion,t2.model_path,t1.ID,"
f"t2.model_name,t1.conf_thres,t1.iou_thres "
f"from channel2model t1 left join model t2 on t1.model_id = t2.ID where t1.channel_id ={channel_id};")
:param schedule
:param type: 0-启动所有线程1-只启动cap采集线程2-只启动model模型处理线程
:return: True,False
'''
ret = False
self.model_node = model_Node
if type==0:
self._start_cap_th(cap_data[0],cap_data[1]) #先cap,再model
self._start_model_th(model_data,schedule)
ret = True
elif type == 1:
self._start_cap_th(cap_data[0],cap_data[1])
ret = True
elif type == 2:
self._start_model_th(model_data,schedule)
ret = True
else:
self.logger.error("暂时还不支持该类型工作!")
return ret
def stop_work(self,type=0):
'''
清空数据停止工作线程若有 并删除deque last_frame
:param type: 0-停止所有线程1-只停止cap采集线程2-只停止model模型处理线程
:return: True,False
'''
ret = False
if type == 0:
self._stop_model_th()
self._stop_cap_th()
ret = True
elif type == 1:
self._stop_cap_th()
ret = True
elif type == 2:
self.logger.debug("单独停止工作线程")
self._stop_model_th()
ret = True
else:
self.logger.error("暂时还不支持该类型工作!")
return ret

484
core/ChannelManager.py

@ -1,439 +1,10 @@
import numpy as np
import time
import copy
import cv2
import threading
import importlib.util
import datetime
import time
import os
import ffmpeg
from collections import deque
from myutils.MyLogger_logger import LogHandler
from myutils.ConfigManager import myCongif
from core.CapManager import VideoCaptureWithFPS
from core.ACLModelManager import ACLModeManger
from model.plugins.ModelBase import ModelBase
from core.WarnManager import WarnData
from myutils.MyDeque import MyDeque
import base64
class ChannelData:
def __init__(self,channel_id,deque_length,icount_max,warnM):
self.logger = LogHandler().get_logger("ChannelDat")
self.model_platform = myCongif.get_data("model_platform")
self.channel_id = channel_id #该通道的通道ID
self.warnM = warnM #报警线程管理对象--MQ
#视频采集相关
self.cap = None #该通道视频采集对象
self.frame_rate = myCongif.get_data("frame_rate")
self.frame_interval = 1.0 / int(myCongif.get_data("verify_rate"))
#模型采集相关
self.model = None #模型对象 -- 一通道只关联一个模型
self.work_th = None #该通道工作线程句柄
self.b_model = False #是否有运行模型线程
self.bool_run = True # 线程运行标识
self.lock = threading.RLock() # 用于保证线程安全
self.icount_max = icount_max # 帧序列号上限
self.max_len = myCongif.get_data("buffer_len")
self.deque_frame = deque(maxlen=self.max_len) #视频缓冲区用于保存录像
self.last_frame = None # 保存图片数据
#self.frame_queue = queue.Queue(maxsize=1)
self.frame_queue = MyDeque(10) #分析画面MQ
self.counter = 0 #帧序列号--保存报警录像使用
#添加一帧图片
def add_deque(self, value):
if len(self.deque_frame) == self.max_len:
removed_frame = self.deque_frame.popleft()
del removed_frame
removed_frame = None
self.deque_frame.append(value) #deque 满了以后会把前面的数据移除
#拷贝一份数据
def copy_deque(self):
return copy.deepcopy(self.deque_frame)
#获取最后一帧图片
def get_last_frame(self):
if self.b_model:
# with self.lock:
# frame = self.last_frame
# return frame
# try:
# frame = self.frame_queue.get(timeout=0.3) #web传输没有做帧率控制了,可以超时时间长一点
# except queue.Empty:
# self.logger.debug(f"{self.channel_id}--web--获取分析画面失败,队列空")
# return None
frame = self.frame_queue.mypopleft()
return frame
else: #如果没有运行,直接从cap获取画面
if self.cap:
ret, frame = self.cap.read() # 除了第一帧,其它应该都是有画面的
if not ret:
self.logger.debug(f"{self.channel_id}--web--获取原画失败,队列空")
return None
ret, frame_bgr_webp = cv2.imencode('.jpg', frame)
if not ret:
buffer_bgr_webp = None
else:
buffer_bgr_webp = frame_bgr_webp.tobytes()
return buffer_bgr_webp
# frame_bgr_webp = self.encode_frame_to_flv(frame)
# return frame_bgr_webp
return None
def encode_frame_to_flv(self,frame):
try:
process = (
ffmpeg
.input('pipe:', format='rawvideo', pix_fmt='bgr24', s=f'{frame.shape[1]}x{frame.shape[0]}')
.output('pipe:', format='flv',vcodec='libx264')
.run_async(pipe_stdin=True, pipe_stdout=True, pipe_stderr=True)
)
out, err = process.communicate(input=frame.tobytes())
if process.returncode != 0:
raise RuntimeError(f"FFmpeg encoding failed: {err.decode('utf-8')}")
return out
except Exception as e:
print(f"Error during frame encoding: {e}")
return None
def update_last_frame(self,buffer):
if buffer:
self.frame_queue.myappend(buffer)
# with self.lock:
# self.last_frame = None
# self.last_frame = buffer
# if self.frame_queue.full():
# try:
# print("channel--丢帧")
# self.frame_queue.get(timeout=0.01)
# except queue.Empty: #为空不处理
# pass
# self.frame_queue.put(buffer)
# try:
# self.frame_queue.put(buffer,timeout=0.05)
# except queue.Full:
# self.logger.debug(f"{self.channel_id}分析画面队列满,插入失败")
# pass
#帧序列号自增 一个线程中处理,不用加锁
def increment_counter(self):
self.counter += 1
if self.counter > self.icount_max:
self.counter = 0
def get_counter(self):
return self.counter
def _start_cap_th(self,source,type=1):
'''开始cap采集线程
type = 打开摄像头 0--USB摄像头1-RTSP,2-海康SDK
'''
ret = False
if self.cap:
self.cap.release()
self.cap = None
self.cap = VideoCaptureWithFPS(source,type)
if self.cap:
ret = True
return ret
def _stop_cap_th(self):
'''停止cap采集线程
重要约束停止cap线程前必须先停止model线程
'''
if self.b_model:
self.logger.error("停止采集线程前,请先停止model线程")
return False
else:
if self.cap:
self.cap.release()
self.cap = None
return True #一般不会没有cap
def _start_model_th(self,model_data,schedule):
verify_rate = myCongif.get_data("verify_rate")
warn_interval = myCongif.get_data("warn_interval")
self.bool_run = True
self.work_th = threading.Thread(target=self._dowork_thread,
args=(self.channel_id, model_data, schedule, verify_rate,
warn_interval)) # 一个视频通道一个线程
self.work_th.start()
def _stop_model_th(self):
if self.work_th:
if self.b_model:
self.bool_run = False
self.work_th.join() #线程结束时,会把b_model置False
self.logger.debug(f"{self.channel_id}停止工作线程")
self.work_th = None
def _import_model(self,model_name,model_path,threshold,iou_thres):
'''
根据路径动态导入模块
:param model_name: 模块名称 --用通道ID代替
:param model_path: 模块路径
:param threshold: 置信阈值
:param iou_thres: iou阈值
:return:
'''
try:
module_path = model_path.replace("/", ".").rsplit(".", 1)[0]
print(module_path)
# 动态导入模块
module = importlib.import_module(module_path)
# 从模块中获取指定的类
Model = getattr(module, "Model")
# 使用 Model 类
model_instance = Model(model_path,threshold,iou_thres)
return model_instance
except ModuleNotFoundError as e:
print(f"Module not found: {e}")
return None
except AttributeError as e:
print(f"Class not found in module: {e}")
return None
except Exception as e:
print(f"An unexpected error occurred: {e}")
return None
# if os.path.exists(model_path):
# module_spec = importlib.util.spec_from_file_location(model_name, model_path)
# if module_spec is None:
# self.logger.error(f"{model_path} 加载错误")
# return None
# module = importlib.util.module_from_spec(module_spec)
# start_time = time.time()
# print(f"通道{self.channel_id},开始exec_module自定义模型文件")
# module_spec.loader.exec_module(module)
# end_time = time.time()
# print(f"通道{self.channel_id},完成exec_module模型文件实例化耗时{end_time - start_time}")
#
# try:
# md = getattr(module, "Model")(model_path,threshold,iou_thres) #实例化类
# except Exception as e:
# self.logger.error(f"{model_path} 实例化错误,退出模型线程")
# return None
#
# if not isinstance(md, ModelBase):
# self.logger.error("{} not zf_model".format(md))
# return None
# else:
# self.logger.error("{}文件不存在".format(model_path))
# return None
# self.logger.debug(f"{model_path} 加载成功!!!!")
# return md
def _verify(self,frame,model,model_data,schedule,result,isdraw=1):
'''验证执行主函数,实现遍历通道关联的模型,调用对应模型执行验证'''
#img = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
img = frame
#验证检测计划,是否在布防时间内
now = datetime.datetime.now() # 获取当前日期和时间
weekday = now.weekday() # 获取星期几,星期一是0,星期天是6
hour = now.hour
result.pop(0) # 保障结果数组定长 --先把最早的结果推出数组
warntext = ""
if model and schedule[weekday][hour] == 1: #不在计划则不进行验证,直接返回图片
# 调用模型,进行检测,model是动态加载的,具体的判断标准由模型内执行 ---- *********
bwarn, warntext = model.verify(img, model_data,isdraw) #****************重要
# 对识别结果要部要进行处理
if bwarn:
# 绘制报警文本
cv2.putText(img, warntext, (50, 50),
cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)
result.append(1) #要验证数组修改,是地址修改吗?
else: #没有产生报警也需要记录,统一计算占比
warntext = ""
result.append(0)
else:
result.append(0)
# 将检测结果图像转换为帧--暂时用不到AVFrame--2024-7-5
# new_frame_rgb_avframe = av.VideoFrame.from_ndarray(img, format="rgb24") # AVFrame
# new_frame_rgb_avframe.pts = None # 添加此行确保有pts属性
# if isinstance(img, np.ndarray): -- 留个纪念
#处理完的图片后返回-bgr模式
#img_bgr_ndarray = cv2.cvtColor(img, cv2.COLOR_RGB2BGR)
# 将检查结果转换为WebP格式图片 --在线程里面完成应该可以减少网页端处理时间
ret,frame_bgr_webp=cv2.imencode('.jpg', img)
if not ret:
buffer_bgr_webp = None
else:
buffer_bgr_webp = frame_bgr_webp.tobytes()
#buffer_bgr_webp = self.encode_frame_to_flv(img)
return buffer_bgr_webp,img,warntext
def _dowork_thread(self,channel_id,model_data,schedule,verify_rate,warn_interval):
'''一个通道一个线程,关联的模型在一个线程检测,局部变量都是一个通道独有'''
if not self.cap:
self.logger.error("采集线程未正常启动,退出model线程")
return
#加载自定义模型文件
model = self._import_model(str(channel_id), model_data[5], model_data[8], model_data[9]) # 动态加载模型处理文件py
if not model:
self.logger.error("自定义模型文件加载失败,退出model线程")
return
#初始化模型运行资源
context = None
device_id = myCongif.get_data("device_id")
if self.model_platform == "acl": # ACL线程中初始化内容
context = ACLModeManger.th_inti_acl(device_id) #创建context
#初始化模型资源 -- 加载模型文件
ret = model.init_acl_resource() #加载和初始化离线模型文件--om文件
if not ret:
print("初始化模型资源出错,退出线程!")
return
#初始化业务数据
result = [0 for _ in range(model_data[3] * verify_rate)] # 初始化时间*验证帧率数量的结果list
proportion = model_data[4] # 判断是否报警的占比
warn_last_time = time.time()
#warn_save_count = 0 #保存录像的最新帧初始化为0
#开始循环处理业务
last_frame_time = time.time() #初始化个读帧时间
self.b_model = True
while self.bool_run: #基于tag 作为运行标识。 线程里只是读,住线程更新,最多晚一轮,应该不用线程锁。需验证
# 帧率控制帧率
current_time = time.time()
elapsed_time = current_time - last_frame_time
if elapsed_time < self.frame_interval:
time.sleep(self.frame_interval - elapsed_time) #若小于间隔时间则休眠
last_frame_time = time.time()
#*********取画面*************
ret,frame = self.cap.read() #除了第一帧,其它应该都是有画面的
if not ret:
#self.logger.debug(f"{self.channel_id}--model--获取cap画面失败,队列空")
continue #没读到画面继续
#执行图片推理
buffer_bgr_webp,img_bgr_ndarray,warn_text = self._verify(frame,model,model_data,schedule,result)
#分析图片放入内存中
self.add_deque(img_bgr_ndarray) # 缓冲区大小由maxlen控制 超上限后,删除最前的数据
# 分析画面一直更新最新帧,提供网页端显示
self.update_last_frame(buffer_bgr_webp)
if warn_text:
#验证result -是否触发报警要求 --遍历每个模型执行的result
count_one = float(sum(result)) #1,0 把1累加的和就是1的数量
ratio_of_ones = count_one / len(result)
#self.logger.debug(result)
if ratio_of_ones >= proportion: #触发报警
# 基于时间间隔判断
current_time = time.time()
elapsed_time = current_time - warn_last_time
if elapsed_time < warn_interval:
continue
warn_last_time = current_time
# 处理报警
warn_data = WarnData()
warn_data.model_name = model_data[7]
warn_data.warn_text = warn_text
warn_data.img_buffer = self.copy_deque() # 深度复制缓冲区
warn_data.width = self.cap.width
warn_data.height = self.cap.height
warn_data.channel_id = channel_id
self.warnM.add_warn_data(warn_data)
#结果记录要清空
for i in range(len(result)):
result[i] = 0
# end_time = time.time() # 结束时间
# print(f"Processing time: {end_time - start_time} seconds")
# 本地显示---测试使用
# if channel_id == 2:
# cv2.imshow(str(channel_id), img)
# if cv2.waitKey(1) & 0xFF == ord('q'):
# break
#结束线程
print("开始结束工作线程")
self.b_model = False
#反初始化
if self.model_platform == "acl":
try:
model.release() #释放模型资源资源
# 删除模型对象
del model
#释放context
if context: # ACL线程中反初始化内容 -- 若线程异常退出,这些资源就不能正常释放了
# 再释放context
ACLModeManger.th_del_acl(context)
except Exception as e:
print(e)
#cv2.destroyAllWindows()
print("线程结束!!!!")
def start_work(self,cap_data,model_data,schedule,type=0):
'''
开始工作线程包括视频通道采集和模型处理
:param cap_data: [source,type]
:param model_data: 跟通道关联的模型数据
strsql = (
f"select t1.model_id,t1.check_area,t1.polygon ,t2.duration_time,t2.proportion,t2.model_path,t1.ID,"
f"t2.model_name,t1.conf_thres,t1.iou_thres "
f"from channel2model t1 left join model t2 on t1.model_id = t2.ID where t1.channel_id ={channel_id};")
:param schedule
:param type: 0-启动所有线程1-只启动cap采集线程2-只启动model模型处理线程
:return: True,False
'''
ret = False
if type==0:
self._start_cap_th(cap_data[0],cap_data[1]) #先cap,再model
self._start_model_th(model_data,schedule)
ret = True
elif type == 1:
self._start_cap_th(cap_data[0],cap_data[1])
ret = True
elif type == 2:
self._start_model_th(model_data,schedule)
ret = True
else:
self.logger.error("暂时还不支持该类型工作!")
return ret
def stop_work(self,type=0):
'''
清空数据停止工作线程若有 并删除deque last_frame
:param type: 0-停止所有线程1-只停止cap采集线程2-只停止model模型处理线程
:return: True,False
'''
ret = False
if type == 0:
self._stop_model_th()
self._stop_cap_th()
ret = True
elif type == 1:
self._stop_cap_th()
ret = True
elif type == 2:
self.logger.debug("单独停止工作线程")
self._stop_model_th()
ret = True
else:
self.logger.error("暂时还不支持该类型工作!")
return ret
from myutils.ConfigManager import myCongif
#独立模型线程
from core.ModelNode import ModelNode
from core.ChannelData import ChannelData #其实ChannelNode会更加贴切一些
'''
通道对象管理类只针对通道节点的维护和工作线程的开启和停止 ChannelData的操作必须维护在ChannelManager里面不能输出ChannelData对象
@ -443,6 +14,11 @@ class ChannelManager:
self._channels = {}
self.cm_lock = threading.RLock() # 用于保证字典操作的线程安全
# 独立Model_th相关参数 --- modelNode 用一个类封装下model线程和其相关参数
self.model_list = {} # model_id -- modelNode
self.workType = int(myCongif.get_data("workType"))
self.device_id = myCongif.get_data("device_id")
#增加节点
def add_channel(self, channel_id,deque_length,icount_max,warnM):
ret = False
@ -450,6 +26,8 @@ class ChannelManager:
if channel_id in self._channels: #若已经有数据,先删除后再增加
self._channels[channel_id].stop_work(0) # 停止工作
del self._channels[channel_id]
if self.workType == 2: # 若有model线程停止了,则删除该model节点
self.delModelNode()
#channel_id,deque_length,icount_max,warnM
ch_data = ChannelData(channel_id, deque_length, icount_max,warnM)
self._channels[channel_id] = ch_data
@ -459,7 +37,7 @@ class ChannelManager:
#删除节点
def delete_channel(self, channel_id): #需要验证资源的释放清空
'''删除节点,包含了停止该节点工作线程'''
ret = False
ret = True
with self.cm_lock:
if channel_id == 0:
for clannel_data in self._channels:
@ -469,14 +47,15 @@ class ChannelManager:
if channel_id in self._channels:
self._channels[channel_id].stop_work(0) #停止工作
del self._channels[channel_id]
ret = True
if self.workType == 2: #若有model线程停止了,则删除该model节点
self.delModelNode()
return ret
#开始工作线程
def start_channel(self,channel_id,cap_data,model_data,schedule,type):
'''
启动通道的工作线程 -- 启动应该没有一下子启动所有
:param channel_id: 启动通道的ID
:param channel_id: 启动通道的ID channel_id == 0的情况已经在上一层处理了这里不会有0
:param cap_data: [source,type]
:param model_data: 跟该通道关联的模型数据
:param schedule
@ -486,12 +65,16 @@ class ChannelManager:
ret = False
with self.cm_lock:
if channel_id in self._channels:
ret = self._channels[channel_id].start_work(cap_data,model_data,schedule,type)
c_node = self._channels[channel_id]
model_node = None
if self.workType == 2 and type !=1: #需要确保当type!=1时,model_data必须有数据 -- 调用时已经有判断
model_node = self.CreateModelNode(model_data[0], model_data[5], channel_id)
ret = c_node.start_work(cap_data,model_data,schedule,type,model_node)
return ret
#停止工作线程---要把视频采集线程停止掉
def stop_channel(self,channel_id,type):
def stop_channel(self,channel_id,type): #9-10截止目前就重启模型线程时用到该函数(channel_id,2)
'''
停止通道的工作线程
:param channel_id: 0-停止所有通道的工作线程其他值为具体停止哪个工作线程
@ -509,8 +92,17 @@ class ChannelManager:
if channel_id in self._channels:
self._channels[channel_id].stop_work(type)
ret =True
if self.workType == 2: #若有model线程停止了,则删除该model节点
self.delModelNode()
return ret
def get_channel_data(self,channel_id):
cdata = None
if channel_id in self._channels:
cdata = self._channels[channel_id]
return cdata
def cm_get_last_frame(self,channel_id): #这里如果加锁的话,阻塞会比较厉害,但具体程度也需要验证。
frame = None
if channel_id in self._channels:
@ -529,7 +121,7 @@ class ChannelManager:
for i in range(5):
ret, frame = channel_data.cap.read()
if ret:
ret, frame_bgr_webp = cv2.imencode('.jpg', frame)
ret, frame_bgr_webp = cv2.imencode('.jpg', frame,self.encode_param)
if ret:
# 将图像数据编码为Base64
img_base64 = base64.b64encode(frame_bgr_webp).decode('utf-8')
@ -538,6 +130,20 @@ class ChannelManager:
print(e)
return img_base64
'''模型独立线程修改2024-9-9,要求是双模式兼容'''
def CreateModelNode(self, model_id, model_path, channel_id):
if model_id in self.model_list:
modelN = self.model_list[model_id]
else:
modelN = ModelNode(self.device_id,model_path)
self.model_list[model_id] = modelN
return modelN
def delModelNode(self): #关于modelnodel :1.考虑modelnode是否可以不删除,清空inmq即可,2.mdel_list是否需要加锁。#?
return
for model_id, modelNode in self.model_list.items():
if modelNode.ch_count == 0:
del self.model_list[model_id]
if __name__ == "__main__":

19
core/DataStruct.py

@ -22,4 +22,23 @@ class CSAPPLogin_Data(ctypes.Structure):
('passwd', ctypes.c_char * 32)
]
class ModelinData:
    """Input message for the shared model worker thread (workType == 2).

    Bundles one preprocessed frame with the metadata needed to map the
    inference result back onto the original image.
    """

    def __init__(self, channel_id, img, image, scale_ratio, pad_size):
        self.channel_id = channel_id    # ID of the channel this frame came from
        self.img = img                  # preprocessed tensor fed to the model
        self.image = image              # original (unscaled) frame
        self.scale_ratio = scale_ratio  # resize ratio applied during preprocessing
        self.pad_size = pad_size        # letterbox padding applied during preprocessing
class ModeloutData:
    """Output message produced by the shared model worker thread.

    Carries the raw inference outputs together with the original frame and
    the preprocessing metadata required for postprocessing.
    """

    def __init__(self, image, scale_ratio, pad_size, outputs):
        self.image = image              # original frame the detections refer to
        self.outputs = outputs          # raw model inference outputs
        self.scale_ratio = scale_ratio  # resize ratio used during preprocessing
        self.pad_size = pad_size        # letterbox padding used during preprocessing

46
core/ModelManager.py

@ -1,5 +1,5 @@
# 导入代码依赖
import cv2
from core.DBManager import mDBM
from myutils.MyLogger_logger import LogHandler
from myutils.ConfigManager import myCongif
@ -7,7 +7,6 @@ from core.ChannelManager import ChannelManager
from core.ACLModelManager import ACLModeManger
from core.WarnManager import WarnManager
class ModelManager:
def __init__(self):
self.verify_list = ChannelManager() #模型的主要数据 -- 2024-7-5修改为类管理通道数据
@ -18,6 +17,7 @@ class ModelManager:
# 报警处理线程-全进程独立一个线程处理
self.warnM = None
# acl初始化 -- 一个进程一个
self.model_platform = myCongif.get_data("model_platform")
if self.model_platform == "acl":
@ -50,6 +50,8 @@ class ModelManager:
self.warnM = WarnManager()
self.warnM.start_warnmanager_th()
#工作线程:cap+model
if channel_id ==0:
strsql = "select id,ulr,type from channel where is_work = 1;" #执行所有通道
@ -70,11 +72,10 @@ class ModelManager:
f"t2.model_name,t1.conf_thres,t1.iou_thres "
f"from channel2model t1 left join model t2 on t1.model_id = t2.ID where t1.channel_id ={channel_id};")
model_data = mDBM.do_select(strsql, 1) # 2024-7-12调整规则,一个通道只关联一个模型,表结构暂时不动
# cap_data[source,type]
cap_data = [data[1], data[2]]
cap_data = [data[1], data[2]] # cap_data[source,type]
if model_data and model_data[0]: # 如果该通道关联了模型
schedule = mDBM.getschedule(model_data[6]) # 获取布防计划
#启动
#启动通道线程
self.verify_list.start_channel(channel_id,cap_data,model_data,schedule,0) #cap + model
else:
self.logger.debug(f"{channel_id}通道没有关联模型,只运行cap采集线程")
@ -83,10 +84,10 @@ class ModelManager:
else:
print("*****有错误********")
def stop_work(self,channel_id=0): #要对应start_work 1.停止model线程,2.停止cap线程。3.删除c_data
def stop_work(self,channel_id=0): #全停 要对应start_work 1.停止model线程,2.停止cap线程。3.删除c_data
'''停止工作线程(包括采集线程,并删除通道数据对象),0-停止所有,非0停止对应通道ID的线程'''
try:
self.verify_list.delete_channel(channel_id) # 1,2,3
self.verify_list.delete_channel(channel_id)
except Exception as e:
print(e)
@ -96,6 +97,17 @@ class ModelManager:
del self.warnM
self.warnM = None
def restartC2M(self,channel_id):
'''
修改通道管理的算法模型后需要对该通道算法执行部分重新加载执行 -- 只重启model线程
:param channel_id:
:return:
'''
#停止该通道的工作线程 -- 不停cap
self.verify_list.stop_channel(channel_id,2)
#重新读取工作线程数据,重启该通道的工作线程
self.startModelWork(channel_id)
def startModelWork(self,channel_id):
strsql = (
f"select t1.model_id,t1.check_area,t1.polygon ,t2.duration_time,t2.proportion,t2.model_path,t1.ID,"
@ -104,28 +116,14 @@ class ModelManager:
model_data = mDBM.do_select(strsql, 1) # 2024-7-12调整规则,一个通道只关联一个模型,表结构暂时不动
if model_data and model_data[0]: # 如果该通道关联了模型
schedule = mDBM.getschedule(model_data[6]) # 获取布防计划
# cap_data[source,type]
cap_data = None
cap_data = None # cap_data[source,type]
# 启动
self.logger.debug(f"通道{channel_id}重新启动model线程")
self.verify_list.start_channel(channel_id, cap_data, model_data, schedule, 2) # cap + model
self.verify_list.start_channel(channel_id, cap_data, model_data, schedule, 2) # model
else:
self.logger.debug(f"通道{channel_id}没有关联模型,不需要启动model线程")
def restartC2M(self,channel_id):
'''
修改通道管理的算法模型后需要对该通道算法执行部分重新加载执行 -- 只重启model线程
:param channel_id:
:return:
'''
#停止该通道的工作线程 -- 不停cap
self.verify_list.stop_channel(channel_id,2)
#重新读取工作线程数据,重启该通道的工作线程
self.startModelWork(channel_id)
'''模型独立线程修改2024-9-9,要求是双模式兼容'''
def thModel(self):
pass
def test1(self):
# from model.plugins.RYRQ_Model_ACL.RYRQ_Model_ACL import Model

133
core/ModelNode.py

@ -0,0 +1,133 @@
import threading
import importlib.util
import time
from myutils.MyDeque import MyDeque
from myutils.ConfigManager import myCongif
from myutils.MyLogger_logger import LogHandler
from core.ACLModelManager import ACLModeManger
from core.DataStruct import ModelinData,ModeloutData
from threading import Lock
class ModelNode:
    """Owns one model instance and the single worker thread that runs
    inference for every channel sharing that model (config workType == 2).

    Producers push ModelinData items onto ``in_mq``; the worker thread pops
    them, executes the model and appends a ModeloutData to the per-channel
    output queue registered in ``channel_list``.
    """

    def __init__(self, device, model_path):
        self.device = device              # device id used to create the ACL context
        self.model_path = model_path      # path of the plugin .py that defines class Model
        self.model = None                 # model object (created inside the worker thread)
        self.model_th = None              # worker thread handle
        self.brun = True                  # run flag polled by the worker loop
        self.model_th_status = 0          # 0 = initial, 1 = thread running OK, 2 = thread exited on error
        self.in_mq = MyDeque(50)          # shared input queue for all channels of this model
        self.channel_list = {}            # channel_id -> output MyDeque; needs thread safety
        self.clist_Lock = Lock()          # guards channel_list
        self.ch_count = 0                 # number of channels that started this model
        self.count_Lock = Lock()          # guards ch_count (and thread start/stop decisions)
        self.model_platform = myCongif.get_data("model_platform")
        self.logger = LogHandler().get_logger("ModelNode")

    def __del__(self):
        pass

    def _reset(self):
        """Drop any queued input frames (called when the worker thread exits)."""
        #self.model_th_status = 0  # thread status: 0 = initial, 1 = running, 2 = exited
        self.in_mq.myclear()

    def _import_model(self, model_path, threshold=0.5, iou_thres=0.5):
        '''
        Dynamically import the model plugin module from its file path.
        :param model_path: module path of the plugin
        :param threshold: confidence threshold
        :param iou_thres: IOU threshold
        :return: Model instance, or None if import/instantiation failed
        '''
        try:
            # "a/b/c.py" -> "a.b.c" so importlib can load it as a module
            module_path = model_path.replace("/", ".").rsplit(".", 1)[0]
            print(module_path)
            # Dynamically import the module
            module = importlib.import_module(module_path)
            # Every plugin is expected to expose a class named "Model"
            Model = getattr(module, "Model")
            # Instantiate the Model class
            model_instance = Model(model_path, threshold, iou_thres)
            return model_instance
        except ModuleNotFoundError as e:
            print(f"Module not found: {e}")
            return None
        except AttributeError as e:
            print(f"Class not found in module: {e}")
            return None
        except Exception as e:
            print(f"An unexpected error occurred: {e}")
            return None

    def _model_th(self):
        """Worker loop: pop preprocessed frames, run inference, dispatch results."""
        # Load the custom model plugin file (.py)
        self.model = self._import_model(self.model_path)
        if not self.model:
            self.logger.error("自定义模型文件加载失败,退出model线程")
            self.model_th_status = 2
            return
        # Initialise the per-thread model runtime resources
        context = None
        if self.model_platform == "acl":  # ACL init must happen inside this thread
            context = ACLModeManger.th_inti_acl(self.device)  # create the ACL context
            # Load and initialise the offline .om model file
            # NOTE(review): init_acl_resource assumed to be inside the ACL branch -- confirm nesting
            ret = self.model.init_acl_resource()
            if not ret:
                print("初始化模型资源出错,退出线程!")
                self.model_th_status = 2
                return
        # Main work loop
        self.model_th_status = 1
        while self.brun:
            inData = self.in_mq.mypopleft()  # returns None when the queue is empty
            if inData:
                # Build the input, run the model; returns None on failure
                outputs = self.model.execute([inData.img,])
                outdata = ModeloutData(inData.image, inData.scale_ratio, inData.pad_size, outputs)
                del inData.img
                with self.clist_Lock:
                    if inData.channel_id in self.channel_list:
                        self.channel_list[inData.channel_id].myappend(outdata)
            else:
                time.sleep(0.05)
        # Thread ending: reset state and release resources
        self.model_th_status = 0
        self._reset()
        # De-initialisation
        if self.model_platform == "acl":
            try:
                self.model.release()  # release model resources
                # Delete the model object
                del self.model
                # Release the context
                if context:  # if the thread exited abnormally these may not be released
                    ACLModeManger.th_del_acl(context)
            except Exception as e:
                print(e)

    def start_model_th(self, channel_id, out_mq):
        """Register a channel's output queue; start the worker thread on first use."""
        with self.count_Lock:
            with self.clist_Lock:
                if channel_id in self.channel_list:
                    return  # already registered (could replace the old queue with the new one)
                self.channel_list[channel_id] = out_mq
            if self.ch_count == 0:  # first channel starts the thread
                self.brun = True
                self.model_th = threading.Thread(target=self._model_th)
                self.model_th.start()
            self.ch_count += 1  # one increment per channel that starts this model

    def stop_model_th(self, channel_id):
        """Unregister a channel; stop and join the worker thread when none remain."""
        with self.count_Lock:
            with self.clist_Lock:
                if channel_id in self.channel_list:
                    del self.channel_list[channel_id]
            # NOTE(review): decrement assumed unconditional -- confirm against original nesting
            self.ch_count -= 1
            if self.ch_count == 0:  # last channel stops the thread
                self.brun = False
                self.model_th.join()
                self.model_th = None

9
model/base_model/ascnedcl/classes.py

@ -0,0 +1,9 @@
# COCO 80-class label names; list index must match the class index emitted
# by the detection model (0 == 'person', which the intrusion logic filters on).
CLASSES = ['person', 'bicycle', 'car', 'motorcycle', 'airplane', 'bus', 'train', 'truck', 'boat', 'traffic light',
           'fire hydrant', 'stop sign', 'parking meter', 'bench', 'bird', 'cat', 'dog', 'horse', 'sheep', 'cow',
           'elephant', 'bear', 'zebra', 'giraffe', 'backpack', 'umbrella', 'handbag', 'tie', 'suitcase', 'frisbee',
           'skis', 'snowboard', 'sports ball', 'kite', 'baseball bat', 'baseball glove', 'skateboard', 'surfboard',
           'tennis racket', 'bottle', 'wine glass', 'cup', 'fork', 'knife', 'spoon', 'bowl', 'banana', 'apple',
           'sandwich', 'orange', 'broccoli', 'carrot', 'hot dog', 'pizza', 'donut', 'cake', 'chair', 'couch',
           'potted plant', 'bed', 'dining table', 'toilet', 'tv', 'laptop', 'mouse', 'remote', 'keyboard', 'cell phone',
           'microwave', 'oven', 'toaster', 'sink', 'refrigerator', 'book', 'clock', 'vase', 'scissors', 'teddy bear',
           'hair drier', 'toothbrush']

1
model/base_model/ascnedcl/det_utils.py

@ -55,6 +55,7 @@ def letterbox(img, new_shape=(640, 640), color=(114, 114, 114), auto=False, scal
return img, ratio, (dw, dh)
def non_max_suppression(
prediction,
conf_thres=0.25,

107
model/base_model/ascnedcl/det_utils_v10.py

@ -0,0 +1,107 @@
import cv2
import numpy as np
from model.base_model.ascnedcl.classes import CLASSES
def letterbox(img, new_shape=(640, 640), auto=False, scaleFill=False, scaleup=True, center=True, stride=32):
    """Resize `img` to fit `new_shape` while keeping aspect ratio, padding the
    rest with gray (114,114,114).

    :param img: HWC BGR image
    :param new_shape: target (height, width), or a single int for a square
    :param auto: pad only to the nearest multiple of `stride` (minimum rectangle)
    :param scaleFill: stretch to fill exactly, no padding
    :param scaleup: allow upscaling (disable for better val mAP)
    :param center: split the padding evenly between both sides
    :param stride: stride used by `auto` padding
    :return: (padded image, (ratio_w, ratio_h), dw, dh) where dw/dh are the
             per-side padding amounts (already halved when `center` is True)
    """
    # Resize and pad image while meeting stride-multiple constraints
    shape = img.shape[:2]  # current shape [height, width]
    if isinstance(new_shape, int):
        new_shape = (new_shape, new_shape)
    # Scale ratio (new / old)
    r = min(new_shape[0] / shape[0], new_shape[1] / shape[1])
    if not scaleup:  # only scale down, do not scale up (for better val mAP)
        r = min(r, 1.0)
    # Compute padding
    ratio = r, r  # width, height ratios
    new_unpad = int(round(shape[1] * r)), int(round(shape[0] * r))
    dw, dh = new_shape[1] - new_unpad[0], new_shape[0] - new_unpad[1]  # wh padding
    if auto:  # minimum rectangle
        dw, dh = np.mod(dw, stride), np.mod(dh, stride)  # wh padding
    elif scaleFill:  # stretch
        dw, dh = 0.0, 0.0
        new_unpad = (new_shape[1], new_shape[0])
        ratio = new_shape[1] / shape[1], new_shape[0] / shape[0]  # width, height ratios
    if center:
        dw /= 2  # divide padding into 2 sides
        dh /= 2
    if shape[::-1] != new_unpad:  # resize
        img = cv2.resize(img, new_unpad, interpolation=cv2.INTER_LINEAR)
    top, bottom = int(round(dh - 0.1)) if center else 0, int(round(dh + 0.1))
    left, right = int(round(dw - 0.1)) if center else 0, int(round(dw + 0.1))
    img = cv2.copyMakeBorder(
        img, top, bottom, left, right, cv2.BORDER_CONSTANT, value=(114, 114, 114)
    )  # add border
    return img, ratio, dw, dh
def non_max_suppression_v10(prediction, conf_thres, ratio, dw, dh):
    """Filter YOLOv10 raw detections by confidence and map boxes back to the
    original image coordinates (v10 is NMS-free, so no IoU suppression here).

    :param prediction: rows of [x1, y1, x2, y2, confidence, class_id]
    :param conf_thres: confidence threshold (strictly greater-than)
    :param ratio: (ratio_w, ratio_h) from letterbox
    :param dw: horizontal padding from letterbox
    :param dh: vertical padding from letterbox
    :return: list of [xmin, ymin, xmax, ymax, confidence, label]
    """
    # Pre-compute the integer pad offsets exactly as letterbox applied them.
    pad_left, pad_top = int(round(dw - 0.1)), int(round(dh - 0.1))
    pad_right, pad_bottom = int(round(dw + 0.1)), int(round(dh + 0.1))
    kept = []
    for row in prediction:
        score = row[4]
        if score <= conf_thres:  # keep only rows above the threshold
            continue
        cls = int(row[5])
        # Undo padding and scaling to recover original-image coordinates.
        xmin = int((row[0] - pad_left) / ratio[0])
        ymin = int((row[1] - pad_top) / ratio[1])
        xmax = int((row[2] - pad_right) / ratio[0])
        ymax = int((row[3] - pad_bottom) / ratio[1])
        kept.append([xmin, ymin, xmax, ymax, score, cls])
    return kept
def draw_bbox_old(bbox, img0, color, wt):
    """Draw detection boxes with index, class name and score onto img0.

    :param bbox: array-like rows of [x1, y1, x2, y2, score, class_id]
    :param img0: BGR image, drawn on in place
    :param color: rectangle color
    :param wt: rectangle line width
    :return: the annotated image (same object as img0)
    """
    det_result_str = ''
    for idx, class_id in enumerate(bbox[:, 5]):
        # Skip near-zero confidence rows. The original wrote
        # float(score < 0.05), a misparenthesized comparison that only
        # worked because float(True/False) is truthy/falsy.
        if float(bbox[idx][4]) < 0.05:
            continue
        img0 = cv2.rectangle(img0, (int(bbox[idx][0]), int(bbox[idx][1])), (int(bbox[idx][2]), int(bbox[idx][3])), color, wt)
        img0 = cv2.putText(img0, str(idx) + ' ' + CLASSES[int(class_id)], (int(bbox[idx][0]), int(bbox[idx][1] + 16)), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 1)
        img0 = cv2.putText(img0, '{:.4f}'.format(bbox[idx][4]), (int(bbox[idx][0]), int(bbox[idx][1] + 32)), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 1)
        # int() cast: bbox rows are floats, and a float index into CLASSES
        # would raise TypeError.
        det_result_str += '{} {} {} {} {} {}\n'.format(CLASSES[int(bbox[idx][5])], str(bbox[idx][4]), bbox[idx][0], bbox[idx][1], bbox[idx][2], bbox[idx][3])
    return img0
def draw_box(img,
             box,  # [xmin, ymin, xmax, ymax]
             score,
             class_id):
    '''Draws a bounding box with a filled class/score label on the image.'''
    # Cache the palette on the function object: the original regenerated a
    # fresh random palette on every call, so a class's color changed on
    # every frame (and cost an allocation per box).
    palette = getattr(draw_box, "_palette", None)
    if palette is None:
        palette = np.random.uniform(0, 255, size=(len(CLASSES), 3))
        draw_box._palette = palette
    # Retrieve the color for the class ID
    color = palette[class_id]
    # Draw the bounding box on the image
    cv2.rectangle(img, (box[0], box[1]), (box[2], box[3]), color, 2)
    # Create the label text with class name and score
    label = f'{CLASSES[class_id]}: {score:.2f}'
    # Calculate the dimensions of the label text
    (label_width, label_height), _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 1)
    # Calculate the position of the label text (above the box when it fits)
    label_x = box[0]
    label_y = box[1] - 10 if box[1] - 10 > label_height else box[1] + 10
    # Draw a filled rectangle as the background for the label text
    cv2.rectangle(
        img,
        (int(label_x), int(label_y - label_height)),
        (int(label_x + label_width), int(label_y + label_height)),
        color,
        cv2.FILLED,
    )
    # Draw the label text on the image
    cv2.putText(img, label, (int(label_x), int(label_y)), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 0), 1, cv2.LINE_AA)
    return img

8
model/plugins/ModelBase.py

@ -259,4 +259,12 @@ class ModelBase(ABC):
:param isdraw: 是否需要绘制线框0-不绘制1-绘制
:return: detections,bwarn,warntext bwarn:0-没有识别到符合要求的目标1-没有识别到符合要求的目标
'''
pass
@abstractmethod
def prework(self,image):
pass
@abstractmethod
def postwork(self,image,outputs,scale_ratio,pad_size,check_area,polygon,conf_threshold,iou_thres):
pass

76
model/plugins/RYRQ_ACL/RYRQ_Model_ACL.py

@ -11,6 +11,8 @@ class Model(ModelBase):
dirpath, filename = os.path.split(path)
self.model_file = os.path.join(dirpath, "yolov5s_bs1.om") # 目前约束模型文件和py文件在同一目录
self.coco_file = os.path.join(dirpath, "coco_names.txt")
self.labels_dict = get_labels_from_txt(self.coco_file) # 得到类别信息,返回序号与类别对应的字典
super().__init__(self.model_file) # acl环境初始化基类负责类的实例化
self.name = "人员入侵-yolov5"
@ -19,9 +21,72 @@ class Model(ModelBase):
self.neth = 640 # 缩放的目标高度, 也即模型的输入高度
self.netw = 640 # 缩放的目标宽度, 也即模型的输入宽度
self.conf_threshold = threshold # 置信度阈值
self.iou_thres = iou_thres # IOU阈值
# self.conf_threshold = threshold # 置信度阈值
# self.iou_thres = iou_thres #IOU阈值
def prework(self, image):
    '''Model-specific preprocessing: letterbox to 640x640, BGR->RGB,
    HWC->CHW, float32 scaled to [0, 1]. Returns (blob, scale_ratio, pad_size).'''
    padded, scale_ratio, pad_size = letterbox(image, new_shape=[640, 640])  # resize + pad
    chw = padded[:, :, ::-1].transpose(2, 0, 1)  # BGR to RGB, HWC to CHW
    blob = np.ascontiguousarray(chw, dtype=np.float32) / 255.0  # contiguous, normalized
    return blob, scale_ratio, pad_size
def postwork(self, image, outputs, scale_ratio, pad_size, check_area, polygon, conf_threshold, iou_thres):
    '''
    Model-specific postprocessing: NMS, rescale to the original image,
    filter to 'person' detections and raise an intrusion warning.
    :param image: original frame, annotated in place
    :param outputs: raw model outputs (or None/empty on inference failure)
    :param scale_ratio: resize ratio from preprocessing
    :param pad_size: letterbox padding from preprocessing
    :param check_area: 1 = restrict detection to the polygon region
    :param polygon: detection-region polygon definition
    :param conf_threshold: confidence threshold for NMS
    :param iou_thres: IoU threshold for NMS
    :return: (bwarn, warn_text) - whether a warning fired and its text
    '''
    bwarn = False
    warn_text = ""
    # Draw the detection region first; this also builds the polygon object
    # that is_point_in_region() relies on, so it must run before the checks.
    if check_area == 1:
        self.draw_polygon(image, polygon, (255, 0, 0))
    if outputs:
        output = outputs[0]  # only one image was submitted -- batch support TBD
        # Postprocess: boxout is a tensor-list [tensor([[x1,y1,x2,y2,conf,coco_index],...])]
        # Non-maximum suppression with the given confidence and IoU thresholds.
        output_torch = torch.tensor(output)
        boxout = nms(output_torch, conf_thres=conf_threshold, iou_thres=iou_thres)
        del output_torch
        pred_all = boxout[0].numpy()  # -> numpy rows [x1,y1,x2,y2,conf,coco_index]
        # pred_all[:, :4] selects the 4 coordinate columns of every row.
        scale_coords([640, 640], pred_all[:, :4], image.shape, ratio_pad=(scale_ratio, pad_size))  # map boxes back to the original image size
        # Keep only class 0 ('person'); could possibly move ahead of NMS -- TODO review
        filtered_pred_all = pred_all[pred_all[:, 5] == 0]
        # Walk the detections; any person inside the region triggers the warning.
        for pred in filtered_pred_all:
            x1, y1, x2, y2 = int(pred[0]), int(pred[1]), int(pred[2]), int(pred[3])
            # (box drawing itself is handled by draw_bbox below)
            if check_area == 1:  # a detection region was specified
                x_center = (x1 + x2) / 2
                y_center = (y1 + y2) / 2
                # mark the box center
                cv2.circle(image, (int(x_center), int(y_center)), 5, (0, 0, 255), -1)
                # skip detections whose center is outside the region
                if not self.is_point_in_region((x_center, y_center)):
                    continue  # no warning for this detection
            # One matching detection is enough to warn.
            bwarn = True
            warn_text = "People Intruder detected!"
        draw_bbox(filtered_pred_all, image, (0, 255, 0), 2, self.labels_dict)  # draw boxes, classes, scores
        # Free intermediates
        # NOTE(review): dels assumed inside the `if outputs:` branch -- outside,
        # `output` would be unbound when outputs is falsy; confirm original nesting.
        del outputs, output
        del boxout
        del pred_all, filtered_pred_all
    # All drawing above happened on the original `image` in place.
    return bwarn, warn_text
def verify(self,image,data,isdraw=1):
labels_dict = get_labels_from_txt('/mnt/zfbox/model/plugins/RYRQ_ACL/coco_names.txt') # 得到类别信息,返回序号与类别对应的字典
@ -36,7 +101,6 @@ class Model(ModelBase):
filtered_pred_all = None
bwarn = False
warn_text = ""
# 是否有检测区域,有先绘制检测区域 由于在该函数生成了polygon对象,所有需要在检测区域前调用。
if data[1] == 1:
self.draw_polygon(image, data[2], (255, 0, 0))
@ -70,8 +134,8 @@ class Model(ModelBase):
bwarn = True
warn_text = "People Intruder detected!"
draw_bbox(filtered_pred_all, image, (0, 255, 0), 2, labels_dict) # 画出检测框、类别、概率
#清理内存
del outputs,output
# 清理内存
del outputs, output
del boxout
del pred_all,filtered_pred_all
@ -79,4 +143,4 @@ class Model(ModelBase):
return bwarn, warn_text
def testRun(self):
print("1111")
print("I am RYRQ-Model-ACL!!!")

103
model/plugins/RYRQ_Model_ACL/RYRQ_Model_ACL.py

@ -1,6 +1,7 @@
import os.path
from model.plugins.ModelBase import ModelBase
from model.base_model.ascnedcl.det_utils import get_labels_from_txt, letterbox, scale_coords, nms, draw_bbox # 模型前后处理相关函数
#from model.base_model.ascnedcl.det_utils import get_labels_from_txt, letterbox, scale_coords, nms, draw_bbox # 模型前后处理相关函数
from model.base_model.ascnedcl.det_utils_v10 import draw_box,draw_bbox_old, letterbox,non_max_suppression_v10
import cv2
import numpy as np
import torch # 深度学习运算框架,此处主要用来处理数据
@ -9,71 +10,127 @@ class Model(ModelBase):
def __init__(self,path,threshold=0.5,iou_thres=0.5):
# 找pt模型路径 -- 一个约束py文件和模型文件的路径关系需要固定, -- 上传模型时,要解压好路径
dirpath, filename = os.path.split(path)
self.model_file = os.path.join(dirpath, "yolov5s_bs1.om") # 目前约束模型文件和py文件在同一目录
#self.model_file = os.path.join(dirpath, "yolov5s_bs1.om") # 目前约束模型文件和py文件在同一目录
self.model_file = os.path.join(dirpath, "yolov10m_310B4.om") # 目前约束模型文件和py文件在同一目录
self.coco_file = os.path.join(dirpath, "coco_names.txt")
#self.labels_dict = get_labels_from_txt(self.coco_file) # 得到类别信息,返回序号与类别对应的字典
super().__init__(self.model_file) # acl环境初始化基类负责类的实例化
self.name = "人员入侵-yolov5"
self.name = "人员入侵-yolov10"
self.version = "V1.0"
self.model_type = 2
self.neth = 640 # 缩放的目标高度, 也即模型的输入高度
self.netw = 640 # 缩放的目标宽度, 也即模型的输入宽度
self.conf_threshold = threshold # 置信度阈值
self.iou_thres = iou_thres #IOU阈值
def verify(self,image,data,isdraw=1):
labels_dict = get_labels_from_txt('/mnt/zfbox/model/plugins/RYRQ_ACL/coco_names.txt') # 得到类别信息,返回序号与类别对应的字典
# 数据前处理
def prework(self,image):
'''模型输入图片数据前处理 --- 针对每个模型特有的预处理内容 -'''
img, scale_ratio, pad_size = letterbox(image, new_shape=[640, 640]) # 对图像进行缩放与填充
img = img[:, :, ::-1].transpose(2, 0, 1) # BGR to RGB, HWC to CHW #图片在输入时已经做了转换
img = np.ascontiguousarray(img, dtype=np.float32) / 255.0 # 转换为内存连续存储的数组
return img,scale_ratio, pad_size
# 模型推理, 得到模型输出
outputs = self.execute([img,])#创建input,执行模型,返回结果 --失败返回None
filtered_pred_all = None
def postwork(self,image,outputs,scale_ratio,pad_size,check_area,polygon,conf_threshold,iou_thres):
'''
针对每个模型特有的后处理内容
:param image:
:param outputs:
:param scale_ratio:
:param pad_size:
:param check_area:
:param polygon:
:param conf_threshold:
:param iou_thres:
:return:
'''
bwarn = False
warn_text = ""
# 是否有检测区域,有先绘制检测区域 由于在该函数生成了polygon对象,所有需要在检测区域前调用。
if data[1] == 1:
self.draw_polygon(image, data[2], (255, 0, 0))
if check_area == 1:
self.draw_polygon(image, polygon, (255, 0, 0))
if outputs:
output = outputs[0] #只放了一张图片
output = outputs[0] # 只放了一张图片 -- #是否能批量验证?
# 后处理 -- boxout 是 tensor-list: [tensor([[],[].[]])] --[x1,y1,x2,y2,置信度,coco_index]
# 利用非极大值抑制处理模型输出,conf_thres 为置信度阈值,iou_thres 为iou阈值
output_torch = torch.tensor(output)
boxout = nms(output_torch, conf_thres=0.4,iou_thres=0.5)
boxout = nms(output_torch, conf_thres=conf_threshold, iou_thres=iou_thres)
del output_torch
pred_all = boxout[0].numpy() # 转换为numpy数组 -- [[],[],[]] --[x1,y1,x2,y2,置信度,coco_index]
# pred_all[:, :4] 取所有行的前4列,pred_all[:,1]--第一列
scale_coords([640, 640], pred_all[:, :4], image.shape, ratio_pad=(scale_ratio, pad_size)) # 将推理结果缩放到原始图片大小
#过滤掉不是目标标签的数据 -- 序号0-- person
# 过滤掉不是目标标签的数据 -- 序号0-- person self.labels_dict --这个考虑下是否可以放到nms前面#?
filtered_pred_all = pred_all[pred_all[:, 5] == 0]
# 绘制检测结果 --- 也需要封装在类里,
for pred in filtered_pred_all:
x1, y1, x2, y2 = int(pred[0]), int(pred[1]), int(pred[2]), int(pred[3])
# # 绘制目标识别的锚框 --已经在draw_bbox里处理
# cv2.rectangle(image, (x1, y1), (x2, y2), (0, 255, 0), 2)
if data[1] == 1: # 指定了检测区域
if check_area == 1: # 指定了检测区域
x_center = (x1 + x2) / 2
y_center = (y1 + y2) / 2
#绘制中心点?
# 绘制中心点?
cv2.circle(image, (int(x_center), int(y_center)), 5, (0, 0, 255), -1)
#判断是否区域点
# 判断是否区域点
if not self.is_point_in_region((x_center, y_center)):
continue #没产生报警-继续
#产生报警 -- 有一个符合即可
continue # 没产生报警-继续
# 产生报警 -- 有一个符合即可
bwarn = True
warn_text = "People Intruder detected!"
draw_bbox(filtered_pred_all, image, (0, 255, 0), 2, labels_dict) # 画出检测框、类别、概率
draw_bbox(filtered_pred_all, image, (0, 255, 0), 2, self.labels_dict) # 画出检测框、类别、概率
# 清理内存
del outputs, output
del boxout
del pred_all,filtered_pred_all
del pred_all, filtered_pred_all
#图片绘制是在原图生效 image
return bwarn, warn_text
def verify(self, image, data, isdraw=1):
    '''Run YOLOv10 person-intrusion detection on one frame.

    :param image: original BGR frame, annotated in place
    :param data: per-channel model config; data[1] == 1 means a detection
                 region is set, data[2] is its polygon
    :param isdraw: kept for interface compatibility (boxes are always drawn)
    :return: (bwarn, warn_text)
    '''
    # Preprocessing: letterbox, BGR->RGB, HWC->CHW, float32 in [0,1]
    img, scale_ratio, dw, dh = letterbox(image, new_shape=[self.netw, self.neth])
    img = img[:, :, ::-1].transpose(2, 0, 1)
    img = np.ascontiguousarray(img, dtype=np.float32) / 255.0
    # Inference; returns None on failure
    outputs = self.execute([img,])
    bwarn = False
    warn_text = ""
    # Draw the detection region first; this also builds the polygon object
    # that is_point_in_region() relies on.
    if data[1] == 1:
        self.draw_polygon(image, data[2], (255, 0, 0))
    if outputs:
        output = np.squeeze(outputs[0])  # drop the batch dimension
        # v10 is NMS-free: confidence filter + coordinate rescale only
        pred_all = non_max_suppression_v10(output, self.conf_threshold, scale_ratio, dw, dh)
        for xmin, ymin, xmax, ymax, confidence, label in pred_all:
            draw_box(image, [xmin, ymin, xmax, ymax], confidence, label)  # box, class, score
            if label == 0:  # person
                # Decide whether this detection triggers a warning
                x1, y1, x2, y2 = int(xmin), int(ymin), int(xmax), int(ymax)
                if data[1] == 1:  # a detection region was specified
                    x_center = (x1 + x2) / 2
                    y_center = (y1 + y2) / 2
                    # mark the box center
                    cv2.circle(image, (int(x_center), int(y_center)), 5, (0, 0, 255), -1)
                    # skip detections whose center is outside the region
                    if not self.is_point_in_region((x_center, y_center)):
                        continue  # no warning for this detection
                # One matching detection is enough to warn.
                bwarn = True
                warn_text = "People Intruder detected!"
        # Free intermediates. The original del'd `output`/`pred_all`
        # unconditionally, raising NameError whenever outputs was falsy;
        # they only exist inside this branch.
        del output, pred_all
    del outputs
    #cv2.imwrite('img_res.png', img_dw)
    return bwarn, warn_text

15
myutils/MyDeque.py

@ -3,9 +3,18 @@ from threading import Thread, Lock
class MyDeque:
def __init__(self,maxlen=1):
self.len = maxlen
self.dq = deque(maxlen=maxlen)
self.lock = Lock()
def __del__(self):
del self.dq
def isfull(self):
if len(self.dq) == self.len:
return True
return False
def myappend(self,object):
with self.lock:
self.dq.append(object)
@ -15,4 +24,8 @@ class MyDeque:
with self.lock:
if self.dq:
object = self.dq.popleft()
return object
return object
def myclear(self):
with self.lock:
self.dq.clear()

0
myutils/myutil.py

76
web/API/viedo.py

@ -7,6 +7,8 @@ from myutils.ConfigManager import myCongif
import logging
import time
import subprocess
from concurrent.futures import ThreadPoolExecutor
import threading
from collections import deque
# 配置日志
@ -15,6 +17,9 @@ logger = logging.getLogger(__name__)
#-------------------基于WEBRTC实现拉流
#pcs = set() #创建一个空的集合,去重复且无序
pcs = {}
active_tasks = {} # 用来存储每个channel的任务
executor = ThreadPoolExecutor(max_workers=4)
'''
---------------------传输--------------------------
@ -139,6 +144,9 @@ async def handle_channel_rtf(channel_id,websocket):
finally:
process.terminate()
def send_frame_thread(channel_id,websocket):
    # Placeholder: thread-based frame sending is not implemented yet;
    # frames are currently pushed by the async handle_channel() coroutine.
    pass
async def handle_channel(channel_id,websocket):
verify_rate = int(myCongif.get_data("verify_rate"))
error_max_count = verify_rate * int(myCongif.get_data("video_error_count")) # 视频帧捕获失败触发提示的上限
@ -150,6 +158,16 @@ async def handle_channel(channel_id,websocket):
#视频传输缓冲区
#frame_buffer = deque(maxlen=10)
try:
cnode = mMM.verify_list.get_channel_data(channel_id)
if cnode is None:
print("---channel_id--错误--")
return
frame_count = 0
start_time = time.time()
all_time = 0
get_all_time = 0
send_all_time = 0
while True: # 这里的多线程并发,还需要验证检查
# 帧率控制帧率 ---输出暂时不控制帧率,有就传输
current_time = time.time()
@ -158,11 +176,20 @@ async def handle_channel(channel_id,websocket):
await asyncio.sleep(frame_interval - elapsed_time) # 若小于间隔时间则休眠
last_frame_time = time.time()
# 执行视频传输
frame = mMM.verify_list.cm_get_last_frame(channel_id) # get_last_frame 用了try
#frame = mMM.verify_list.cm_get_last_frame(channel_id) # get_last_frame 用了try
get_stime = time.time()
frame = cnode.get_last_frame()
get_etime = time.time()
get_all_time = get_all_time + (get_etime - get_stime)
if frame is not None:
# frame_buffer.append(frame) #先放入缓冲区
#7print("KB---",len(frame)/1024)
icount = 0
send_stime = time.time()
await websocket.send(frame)
send_etime = time.time()
send_all_time = send_all_time + (send_etime - send_stime)
else:
# print("frame is None")
icount += 1
@ -172,6 +199,26 @@ async def handle_channel(channel_id,websocket):
error_message = b"video_error"
await websocket.send(error_message)
await asyncio.sleep(sleep_time) # 等待视频重连时间
#----------输出时间-----------
frame_count += 1
end_time = time.time()
# 计算时间差
el_time = end_time - start_time
all_time = all_time + (end_time - current_time)
# 每隔一定时间(比如5秒)计算一次帧率
if el_time >= 10:
fps = frame_count / el_time
print(f"当前帧率: {fps} FPS,循环次数:{frame_count},花费总耗时:{all_time}S,get耗时:{get_all_time},send耗时:{send_all_time}")
# 重置计数器和时间
frame_count = 0
all_time = 0
get_all_time = 0
send_all_time = 0
start_time = time.time()
# print(f"get_frame:{round(get_etime-get_stime,5)}Sceond;"
# f"send_frame:{round(send_etime-send_stime,5)}Sceond;"
# f"All_time={round(end_time-current_time,5)}")
except asyncio.CancelledError:
print(f"WebSocket connection for channel {channel_id} closed by client")
raise
@ -184,12 +231,29 @@ async def handle_channel(channel_id,websocket):
@api.websocket('/ws/video_feed/<int:channel_id>')
async def ws_video_feed(channel_id):
print(f"New connection for channel: {channel_id}")
# 为每个通道创建独立的协程
shendtask = asyncio.create_task(handle_channel(channel_id, websocket))
#shendtask = asyncio.create_task(handle_channel_rtf(channel_id, websocket))
# 等待协程完成
await shendtask
if channel_id in active_tasks:
active_tasks[channel_id].cancel()
try:
await active_tasks[channel_id] # 确保旧任务被完全取消
del active_tasks[channel_id]
except asyncio.CancelledError:
print(f"旧任务 {channel_id} 已取消")
try:
# 为每个通道创建独立的协程
shendtask = asyncio.create_task(handle_channel(channel_id, websocket))
#shendtask = asyncio.create_task(handle_channel_rtf(channel_id, websocket))
# 将任务存储到 active_tasks
active_tasks[channel_id] = shendtask
# 等待协程完成
await shendtask
except Exception as e:
print(f"Channel {channel_id} 出现异常: {e}")
finally:
# 移除已完成的任务
if channel_id in active_tasks:
del active_tasks[channel_id]
print(f"Cleaning up resources for channel {channel_id}")
@api.route('/shutdown', methods=['POST'])
async def shutdown():#这是全关 --需要修改

Loading…
Cancel
Save