You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

336 lines
14 KiB

# 导入代码依赖
import time
import av
import os
import cv2
import numpy as np
import threading
import importlib.util
import datetime
import math
import queue
from collections import deque
from core.DBManager import mDBM,DBManager
from myutils.MyLogger_logger import LogHandler
from myutils.ConfigManager import myCongif
from model.plugins.ModelBase import ModelBase
from core.ChannelManager import ChannelManager
from core.ACLModelManager import ACLModeManger
from core.WarnManager import WarnManager
from PIL import Image
class VideoCaptureWithFPS:
'''视频捕获的封装类,是一个通道一个'''
def __init__(self, source):
self.source = source
self.width = None
self.height = None
#GStreamer
rtsp_stream = f"rtspsrc location={self.source} ! decodebin ! videoconvert ! appsink"
self.cap = cv2.VideoCapture(rtsp_stream, cv2.CAP_GSTREAMER)
#opencv
#self.cap = cv2.VideoCapture(self.source)
if self.cap.isOpened(): #若没有打开成功,在读取画面的时候,已有判断和处理 -- 这里也要检查下内存的释放情况
self.width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH))
self.height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
print(self.width,self.height)
#self.fps = fps # 线程保持最大帧率的刷新画面---过高的帧率会影响CPU性能,但过地的帧率会造成帧积压
self.fps = math.ceil(self.cap.get(cv2.CAP_PROP_FPS)/float(myCongif.get_data("verify_rate"))) #向上取整。
#print(self.fps)
self.running = True
#self.frame_queue = queue.Queue(maxsize=1)
self.frame = None
self.read_lock = threading.Lock()
self.thread = threading.Thread(target=self.update)
self.thread.start()
def update(self):
icount = 0
while self.running:
#start_time = time.time()
ret, frame = self.cap.read()
# end_time = time.time()
# print(f"read()耗时:{(end_time-start_time):.6f}")
if not ret:
icount += 1
if icount > 5: #重连
self.cap.release()
self.cap = cv2.VideoCapture(self.source)
if self.cap.isOpened():
self.width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH))
self.height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
# self.fps = fps # 线程保持最大帧率的刷新画面---过高的帧率会影响CPU性能,但过地的帧率会造成帧积压
self.fps = math.ceil(
self.cap.get(cv2.CAP_PROP_FPS) / float(myCongif.get_data("verify_rate"))) # 向上取整。
icount = 0
else:
print(f"{self.source}视频流,将于5分钟后重连!")
time.sleep(myCongif.get_data("cap_sleep_time"))
continue
#resized_frame = cv2.resize(frame, (int(self.width / 2), int(self.height / 2)))
with self.read_lock:
self.frame = frame
# if not self.frame_queue.full():
# self.frame_queue.put(resized_frame)
# 跳过指定数量的帧以避免积压
for _ in range(self.fps):
self.cap.grab()
# time.sleep(self.fps) #按照视频源的帧率进行休眠
#print("Frame updated at:",time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()))
def read(self):
with self.read_lock:
frame = self.frame.copy() if self.frame is not None else None
#frame = self.frame
if frame is not None:
return True, frame
else:
return False, None
# if not self.frame_queue.empty():
# return True, self.frame_queue.get()
# else:
# return False, None
def release(self):
self.running = False
self.thread.join()
self.cap.release()
class ModelManager:
def __init__(self):
self.verify_list = ChannelManager() #模型的主要数据 -- 2024-7-5修改为类管理通道数据
self.bRun = True
self.logger = LogHandler().get_logger("ModelManager")
# 本地YOLOv5仓库路径
self.yolov5_path = myCongif.get_data("yolov5_path")
#self.buflen = myCongif.get_data("buffer_len")
self.icout_max = myCongif.get_data("RESET_INTERVAL") #跟视频帧序用一个变量
self.frame_rate = myCongif.get_data("frame_rate")
#基于模型运行环境进行相应初始化工作
self.model_platform = myCongif.get_data("model_platform")
self.device_id = myCongif.get_data("device_id")
# acl资源初始化
if self.model_platform == "acl":
ACLModeManger.init_acl(self.device_id) #acl -- 全程序初始化
self.model_dic = {} #model_id model
#报警处理线程-全进程独立一个线程处理
self.warnM = None
def __del__(self):
self.logger.debug("释放资源")
del self.verify_list #应该需要深入的删除--待完善
if self.model_platform == "acl":
ACLModeManger.del_acl(self.device_id) #acl -- 全程序反初始化 需要确保在执行析构前,其它资源已释放
def _open_view(self,url,itype): #打开摄像头 0--USB摄像头,1-RTSP,2-海康SDK
if itype == 0:
cap = VideoCaptureWithFPS(int(url))
elif itype == 1:
cap = VideoCaptureWithFPS(url)
else:
raise Exception("视频参数错误!")
return cap
def _import_model(self,model_name,model_path,threshold):
'''
根据路径,动态导入模块
:param model_name: 模块名称
:param model_path: 模块路径
:param threshold: 置信阈值
:return:
'''
if os.path.exists(model_path):
module_spec = importlib.util.spec_from_file_location(model_name, model_path)
if module_spec is None:
self.logger.error(f"{model_path} 加载错误")
return None
module = importlib.util.module_from_spec(module_spec)
module_spec.loader.exec_module(module)
md = getattr(module, "Model")(model_path,self,threshold) #实例化Model
if not isinstance(md, ModelBase):
self.logger.error("{} not zf_model".format(md))
return None
if md.init_ok == False:
self.logger.error("离线模型加载初始化失败!")
return None
else:
self.logger.error("{}文件不存在".format(model_path))
return None
return md
def getschedule(self,c2m_id,myDBM):
'''
根据c2mID 查询该算法的布防时间
:param c2m_id:
:return: 以day为行,hour作为列的,布防标识值二维list
'''
strsql = f"select day,hour,status from schedule where channel2model_id ={c2m_id} order by hour asc,day asc;"
datas = myDBM.do_select(strsql)
onelist = []
twolist = []
threelist = []
fourlist = []
fivelist = []
sixlist = []
sevenlist = []
if datas:
for i in range(24):
onelist.append(datas[i*7 + 0][2])
twolist.append(datas[i*7 + 1][2])
threelist.append(datas[i*7 + 2][2])
fourlist.append(datas[i*7 + 3][2])
fivelist.append(datas[i*7 + 4][2])
sixlist.append(datas[i*7 + 5][2])
sevenlist.append(datas[i*7 + 6][2])
else:
self.logger.debug(f"没有数据--{c2m_id}")
onelist = [1]*24
twolist = [1]*24
threelist = [1]*24
fourlist = [1]*24
fivelist = [1]*24
sixlist = [1]*24
sevenlist = [1]*24
schedule_list = []
schedule_list.append(onelist)
schedule_list.append(twolist)
schedule_list.append(threelist)
schedule_list.append(fourlist)
schedule_list.append(fivelist)
schedule_list.append(sixlist)
schedule_list.append(sevenlist)
return schedule_list
def set_last_img(self,):
pass
def start_model_thread(self,channel_id): #通道ID会重吗?
'''实例化模型组件,启动模型推理线程'''
channel_data = self.verify_list.get_channel(channel_id) # 是对ChannelData 对象的引用
#查询关联的模型 --- 在循环运行前把基础数据都准备好
strsql = (f"select t1.model_id,t1.check_area,t1.polygon ,t2.duration_time,t2.proportion,t2.model_path,t1.ID,"
f"t2.model_name,t1.conf_threshold "
f"from channel2model t1 left join model t2 on t1.model_id = t2.ID where t1.channel_id ={channel_id};")
# print(strsql)
model = mDBM.do_select(strsql,1) #2024-7-12调整规则,一个通道只关联一个模型,表结构暂时不动
if model:
strMID = str(model[0])
m = None
if strMID in self.model_dic:#该模型线程已启动
m = self.model_dic[strMID]
else:
# 基于基类实例化模块类
m = self._import_model("", model[5], model[8]) # 动态加载模型处理文件py --需要验证模型文件是否能加载
if m:
# 开始工作线程---推理线程需不需要全进程唯一
m.run = True
m.start_th() #模型里跑两个线程
#添加模型对象到字典中
self.model_dic[strMID] = m
else:
self.logger.error(f"{model[5]}没有实例化成功")
return #模型没加载成功--原画输出
# 更新该模型对应视频通道的数据
channel_data.set_in_data(model[1], model[2]) #chack_area,ploygon
channel_data.schedule = self.getschedule(model[6], mDBM) # 布放计划-c_m_id
channel_data.result = [0 for _ in
range(model[3] * myCongif.get_data("verify_rate"))] # 初始化时间*验证帧率数量的结果list
channel_data.proportion = model[4] # 报警占比
channel_data.warn_last_time = time.time() # 最后次触发报警的时间
channel_data.model_name = model[7]
# 添加通道
m.addChannel(channel_id, channel_data) #删除时不能手动清空内存 2024-7-14
#启动视频通道预处理和后处理线程
channel_data.start_channel_thread(self,m) #一个视频通道,一个预和后处理线程
#删除还没完善 del self.model_dic[strMID]
else: #没有模型数据--channel_data.bModel = Flase ,不需要添加数据,直接原画输出
return
def stop_model_thread(self,channel_id,model_id):
'''某个视频通道结束工作'''
channel_data = self.verify_list.get_channel(channel_id) # 是对ChannelData 对象的引用
m = self.model_dic.get(model_id)
if m:
m.strop_th()
channel_data.cap.release()
def save_frame_to_video(self):
'''把缓冲区中的画面保存为录像'''
pass
def start_work(self,channel_id=0): #还涉及单通道对开启和关闭
'''算法模型是在后台根据画面实时分析的
1.布防开关需要触发通道关闭和开启
2.布防策略的调整也需要关闭和重启工作
'''
#pre-thread
if channel_id ==0:
strsql = "select id,ulr,type from channel where is_work = 1;" #执行所有通道
else:
strsql = f"select id,ulr,type from channel where is_work = 1 and id = {channel_id};" #单通道启动检测线程
datas = mDBM.do_select(strsql)
for data in datas:
# 创建channel_data
ch_data = self.verify_list.add_channel(data[0],data[1],data[2],True,
myCongif.get_data("buffer_len"),myCongif.get_data("RESET_INTERVAL"))
#启动该通道的视频捕获线程
ch_data.cap = self._open_view(ch_data.str_url,ch_data.int_type) #创建子线程读画面-把cap给模型就行--
ch_data.mMM = self
#目前一个模型两个线程
self.start_model_thread(data[0])
#启动告警线程
self.warnM = WarnManager()
self.warnM.start_warnmanager_th()
def add_warm_data(self,warn_data):
self.warnM.add_warn_data(warn_data)
def stop_work(self,channel_id=0):
'''停止工作线程,0-停止所有,非0停止对应通道ID的线程'''
self.verify_list.stop_channel(channel_id)
#print(f"Current working directory (ModelManager.py): {os.getcwd()}")
mMM = ModelManager()
def test1():
print(cv2.getBuildInformation())
source = 'rtsp://192.168.3.44/live1'
gstreamer_pipeline = (
f"rtspsrc location={source} protocols=udp latency=0 ! "
"rtph264depay ! h264parse ! avdec_h264 ! videoconvert ! appsink"
)
cap = cv2.VideoCapture(gstreamer_pipeline, cv2.CAP_GSTREAMER)
if not cap.isOpened():
print("Error: Unable to open the video source.")
return
else:
print("Successfully opened the video source.")
ret, frame = cap.read()
if ret:
cv2.imshow('Frame', frame)
cv2.waitKey(0)
cap.release()
cv2.destroyAllWindows()
if __name__ == "__main__":
mMM.start_work()
#test1()
print("111")
# name = acl.get_soc_name()
# count, ret = acl.rt.get_device_count()
# print(name,count)