You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

454 lines
20 KiB

# 导入代码依赖
import time
import av
import os
import cv2
import numpy as np
import threading
import importlib.util
import datetime
from core.DBManager import mDBM,DBManager
from myutils.MyLogger_logger import LogHandler
from myutils.ConfigManager import myCongif
from model.plugins.ModelBase import ModelBase
import acl
from PIL import Image
ACL_MEM_MALLOC_HUGE_FIRST = 0
ACL_MEMCPY_HOST_TO_DEVICE = 1
ACL_MEMCPY_DEVICE_TO_HOST = 2
class VideoCaptureWithFPS:
'''视频捕获的封装类,是一个通道一个'''
def __init__(self, source=0):
self.source = source
self.cap = cv2.VideoCapture(self.source)
self.width = None
self.height = None
if self.cap.isOpened(): #若没有打开成功,在读取画面的时候,已有判断和处理
self.width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH))
self.height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
#self.fps = fps # 线程保持最大帧率的刷新画面---过高的帧率会影响CPU性能,但过地的帧率会造成帧积压
self.fps = self.cap.get(cv2.CAP_PROP_FPS) + 20 #只是个参考值,休眠时间要比帧率快点,由于read也需要耗时。
#print(self.fps)
self.frame = None
self.running = True
self.read_lock = threading.Lock()
self.thread = threading.Thread(target=self.update)
self.thread.start()
def update(self):
while self.running:
ret, frame = self.cap.read()
if not ret:
continue
with self.read_lock:
self.frame = frame
time.sleep(1.0 / self.fps)
def read(self):
with self.read_lock:
frame = self.frame
return frame is not None, frame
def release(self):
self.running = False
self.thread.join()
self.cap.release()
class ModelManager:
def __init__(self):
self.verify_list = {} #模型的主要数据
self.bRun = True
self.logger = LogHandler().get_logger("ModelManager")
# 本地YOLOv5仓库路径
self.yolov5_path = myCongif.get_data("yolov5_path")
self.buflen = myCongif.get_data("buffer_len")
self.icout_max = myCongif.get_data("RESET_INTERVAL") #跟视频帧序用一个变量
self.frame_rate = myCongif.get_data("frame_rate")
self.frame_interval = 1.0 / int(myCongif.get_data("verify_rate"))
#保存视频相关内容
self.fourcc = cv2.VideoWriter_fourcc(*'mp4v') # 使用 mp4 编码
#基于模型运行环境进行相应初始化工作
if myCongif.get_data("model_platform") == "acl":
def __del__(self):
self.logger.debug("释放资源")
def _init_acl(self):
'''acl初始化函数'''
self.device_id = 0
#step1 初始化
ret = acl.init()
def _open_view(self,url,itype): #打开摄像头 0--USB摄像头,1-RTSP,2-海康SDK
if itype == 0:
cap = VideoCaptureWithFPS(int(url))
elif itype == 1:
cap = VideoCaptureWithFPS(url)
else:
raise Exception("视频参数错误!")
return cap
def _import_model(self,model_name,model_path):
'''根据路径,动态导入模块'''
if os.path.exists(model_path):
module_spec = importlib.util.spec_from_file_location(model_name, model_path)
if module_spec is None:
self.logger.error(f"{model_path} 加载错误")
return None
module = importlib.util.module_from_spec(module_spec)
module_spec.loader.exec_module(module)
md = getattr(module, "Model")(model_path)
if not isinstance(md, ModelBase):
self.logger.error("{} not zf_model".format(md))
return None
else:
self.logger.error("{}文件不存在".format(model_path))
return None
return md
def getschedule(self,c2m_id,myDBM):
'''
根据c2mID 查询该算法的布防时间
:param c2m_id:
:return: 以day为行,hour作为列的,布防标识值二维list
'''
strsql = f"select day,hour,status from schedule where channel2model_id ={c2m_id} order by hour asc,day asc;"
datas = myDBM.do_select(strsql)
onelist = []
twolist = []
threelist = []
fourlist = []
fivelist = []
sixlist = []
sevenlist = []
if datas:
for i in range(24):
onelist.append(datas[i*7 + 0][2])
twolist.append(datas[i*7 + 1][2])
threelist.append(datas[i*7 + 2][2])
fourlist.append(datas[i*7 + 3][2])
fivelist.append(datas[i*7 + 4][2])
sixlist.append(datas[i*7 + 5][2])
sevenlist.append(datas[i*7 + 6][2])
else:
self.logger(f"没有数据?{c2m_id}")
onelist = [1]*24
twolist = [1]*24
threelist = [1]*24
fourlist = [1]*24
fivelist = [1]*24
sixlist = [1]*24
sevenlist = [1]*24
schedule_list = []
schedule_list.append(onelist)
schedule_list.append(twolist)
schedule_list.append(threelist)
schedule_list.append(fourlist)
schedule_list.append(fivelist)
schedule_list.append(sixlist)
schedule_list.append(sevenlist)
return schedule_list
def verify(self,frame,myModle_list,myModle_data,channel_id,schedule_list,result_list,isdraw=1):
'''验证执行主函数,实现遍历通道关联的模型,调用对应模型执行验证,模型文件遍历执行'''
img = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
# img = frame.to_ndarray(format="bgr24")
# 使用 模型 进行目标检测
i_warn_count = 0 #报警标签
isverify = False
for i in range(len(myModle_list)): # 遍历通道关联的算法进行检测,若不控制模型数量,有可能需要考虑多线程执行。
model = myModle_list[i]
data = myModle_data[i]
schedule = schedule_list[i]
result = result_list[i]
#验证检测计划,是否在布防时间内
now = datetime.datetime.now() # 获取当前日期和时间
weekday = now.weekday() # 获取星期几,星期一是0,星期天是6
hour = now.hour
result.pop(0) # 保障结果数组定长 --先把最早的结果推出数组
if schedule[weekday][hour] == 1: #不在计划则不进行验证,直接返回图片
# 调用模型,进行检测,model是动态加载的,具体的判断标准由模型内执行 ---- *********
isverify = True
detections, bwarn, warntext = model.verify(img, data,isdraw) #****************重要
# 对识别结果要部要进行处理
if bwarn: # 整个识别有产生报警
#根据模型设定的时间和占比判断是否
# 绘制报警文本
cv2.putText(img, 'Intruder detected!', (50, (i_warn_count + 1) * 50),
cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)
i_warn_count += 1
result.append(1) #要验证数组修改,是地址修改吗?
else: #没有产生报警也需要记录,统一计算占比
result.append(0)
else:
result.append(0)
if not isverify: #没做处理,直接返回的,需要控制下帧率,太快读取没有意义。
time.sleep(1.0/self.frame_rate) #给个默认帧率,不超过30帧,---若经过模型计算,CPU下单模型也就12帧这样
# 将检测结果图像转换为帧 -- 需要确认前面对img的处理都是累加的。
#img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB) #numpy.ndarray
if isinstance(img, np.ndarray):
new_frame = av.VideoFrame.from_ndarray(img, format="rgb24") #AVFrame
new_frame.pts = None # 添加此行确保有pts属性
#self.logger.debug("img 是np.darry")
else:
#self.logger.error("img不是np.darry")
new_frame = None
#处理完的图片后返回-gbr模式 (new_frame 是 rgb)
img = cv2.cvtColor(img, cv2.COLOR_RGB2BGR)
return new_frame,img
def dowork_thread(self,channel_id):
'''一个通道一个线程,关联的模型在一个线程检测,局部变量都是一个通道独有'''
channel_data = self.verify_list[channel_id] #一通道一线程 [url,type,True,img_buffer,img,icount]
cap = None
#查询关联的模型 --- 在循环运行前把基础数据都准备好
myDBM = DBManager()
myDBM.connect()
strsql = (f"select t1.model_id,t1.check_area,t1.polygon ,t2.duration_time,t2.proportion,t2.model_path,t1.ID,"
f"t2.model_name "
f"from channel2model t1 left join model t2 on t1.model_id = t2.ID where t1.channel_id ={channel_id};")
myModels = myDBM.do_select(strsql)
#加载模型 --- 是不是要做个限制,一个视频通道关联算法模块的上限 --- 关联多了一个线程执行耗时较多,造成帧率太低,或者再多线程并发 #?
myModle_list = [] #存放模型对象List 一个模型一个
myModle_data = [] #存放检测参数 一个模型一个
schedule_list = [] #布防策略 -一个模型一个
result_list = [] #检测结果记录 -一个模型一个
proportion_lsit = []#占比设定 -一个模型一个
warn_save_count = []#没个模型触发报警后,保存录像的最新帧序号 -一个模型一个
view_buffer = []
ibuffer_count = 0
last_img = None
for model in myModels:
#基于基类实例化模块类
m = self._import_model("",model[5]) #动态加载模型
if m:
myModle_list.append(m)
myModle_data.append(model)
#model[6] -- c2m_id --把通道对于模型的 0-周一,6-周日
schedule_list.append(self.getschedule(model[6],myDBM))
result = [0 for _ in range(model[3] * myCongif.get_data("verify_rate"))] #初始化时间*验证帧率数量的结果list
result_list.append(result)
proportion_lsit.append(model[4])
warn_save_count.append(0) #保存录像的最新帧初始化为0
# if not myModle_list: #没有成功加载的模型原画输出
# self.logger.info(f"视频通道:{channel_id},未配置算法模块,结束线程!")
# return #可以不结束,直接返回未处理画面显示。
#开始循环检测
#print(mydata[0],mydata[1],mydata[2],mydata[3]) # url type tag img_buffer
#[url,type,True,img_buffer]
iread_count =0
last_frame_time = time.time()
#保存视频录像使用 -- 放在循环外面,可以减少点消耗
FPS = myCongif.get_data("verify_rate") # 视频帧率
fourcc = cv2.VideoWriter_fourcc(*'mp4v') # 使用 mp4 编码
while channel_data[2]: #基于tag 作为运行标识。 线程里只是读,住线程更新,最多晚一轮,应该不用线程锁。需验证
# 需要控制帧率
current_time = time.time()
elapsed_time = current_time - last_frame_time
if elapsed_time < self.frame_interval:
time.sleep(self.frame_interval - elapsed_time) #若小于间隔时间则休眠
last_frame_time = time.time()
#*********取画面*************
if not cap: #还没连接视频源
try:
cap = self._open_view(channel_data[0],channel_data[1])
iread_count = 0
except:
self.logger.error("参数错误,终止线程")
return
ret,frame = cap.read()
if not ret:
if iread_count > 60:
self.logger.warning(f"通道-{channel_id}:view disconnected. Reconnecting...")
cap.release()
cap = None
time.sleep(myCongif.get_data("cap_sleep_time"))
else:
iread_count += 1
time.sleep(1.0/20) #20帧只是作为个默认参考值,一般验证帧率都比这个慢
continue #没读到画面继续
iread_count = 0 #重置下视频帧计算
#执行图片推理 -- 如何没有模型或不在工作时间,返回的是原画,要不要控制下帧率? -- 在verify中做了sleep
new_frame,img = self.verify(frame,myModle_list,myModle_data,channel_id,schedule_list,result_list)
#分析图片放入内存中
channel_data[3].append(img)
channel_data[5] += 1 #帧序列加一
#print(self.verify_list[channel_id][5])
if channel_data[5] > self.icout_max:
channel_data = 0
if len(channel_data[3]) > self.buflen: # 保持缓冲区大小---缓冲区可以用来保持录像
channel_data[3].pop(0)
#self.logger.debug("drop one frame!")
channel_data[4] = new_frame #一直更新最新帧
#验证result_list -是否触发报警要求
for i in range(len(result_list)):
result = result_list[i]
proportion = proportion_lsit[i]
count_one = float(sum(result)) #1,0 把1累加的和就是1的数量
ratio_of_ones = count_one / len(result)
#self.logger.debug(result)
if ratio_of_ones >= proportion: #触发报警
model_name = myModle_data[i][7]
w_s_count = warn_save_count[i]
buffer_count = channel_data[5]
self.save_warn(model_name,w_s_count,buffer_count,channel_data[3],cap.width,cap.height,
channel_id,myDBM,FPS,fourcc)
self.send_warn()
#更新帧序列号
warn_save_count[i] = buffer_count
#结果记录要清空
for i in range(len(result)):
result[i] = 0
# end_time = time.time() # 结束时间
# print(f"Processing time: {end_time - start_time} seconds")
# 本地显示---测试使用
# cv2.imshow('Frame', img)
# if cv2.waitKey(1) & 0xFF == ord('q'):
# break
#结束线程
cap.release()
cv2.destroyAllWindows()
def save_warn(self,model_name,w_s_count,buffer_count,buffer,width,height,channnel_id,myDBM,FPS,fourcc):
'''
保存报警信息
:param model_name: 模型名称,如人员入侵
:param w_s_count: 报警已存储的最新帧序列
:param buffer_count: 当前视频缓冲区的最新帧序列
:param buffer: 视频缓存区
:param width: 视频画面的width
:param height: 视频画面的height
:param channnel_id: 视频通道ID
:return: ret 数据库操作记录
'''
now = datetime.datetime.now() # 获取当前日期和时间
current_time_str = now.strftime("%Y-%m-%d_%H-%M-%S")
filename = f"{channnel_id}_{current_time_str}"
#保存视频
video_writer = cv2.VideoWriter(f"model/warn/{filename}.mp4", fourcc, FPS, (width, height))
if not video_writer.isOpened():
print(f"Failed to open video writer for model/warn/{filename}.mp4")
return False
ilen = len(buffer)
istart = 0;
iend = ilen
if buffer_count < w_s_count or (buffer_count-w_s_count) > ilen: #buffer_count重置过
#buffer区,都保存为视频
istart = 0
else:#只取差异的缓冲区大小
istart = ilen - (buffer_count-w_s_count)
for i in range(istart,iend):
video_writer.write(buffer[i])
video_writer.release()
#保存图片
ret = cv2.imwrite(f"model/warn/{filename}.png",buffer[-1])
if not ret:
print("保存图片失败")
return False
#保存数据库
strsql = (f"INSERT INTO warn (model_name ,video_path ,img_path ,creat_time,channel_id ) "
f"Values ('{model_name}','model/warn/{filename}.mp4','model/warn/{filename}.png',"
f"'{current_time_str}','{channnel_id}');")
ret = myDBM.do_sql(strsql)
return ret
def send_warn(self):
'''发送报警信息'''
pass
def save_frame_to_video(self):
'''把缓冲区中的画面保存为录像'''
pass
def start_work(self,channel_id=0):
'''算法模型是在后台根据画面实时分析的
1.布防开关需要触发通道关闭和开启
2.布防策略的调整也需要关闭和重启工作
'''
if channel_id ==0:
strsql = "select id,ulr,type from channel where is_work = 1;" #执行所有通道
else:
strsql = f"select id,ulr,type from channel where is_work = 1 and id = {channel_id};" #单通道启动检测线程
datas = mDBM.do_select(strsql)
for data in datas:
img_buffer = []
img = None
icout = 0 #跟img_buffer对应,记录进入缓冲区的帧序列号
run_data = [data[1],data[2],True,img_buffer,img,icout]
self.verify_list[data[0]] = run_data #需要验证重复情况#?
th_chn = threading.Thread(target=self.dowork_thread, args=(data[0],)) #一个视频通道一个线程,线程句柄暂时部保留
th_chn.start()
def stop_work(self,channel_id=0):
'''停止工作线程,0-停止所有,非0停止对应通道ID的线程'''
if channel_id ==0: #所有线程停止
for data in self.verify_list:
data[2] = False
del data[3]
time.sleep(2)
self.verify_list.clear()
else:
data = self.verify_list[channel_id]
data[2] = False
del data[3]
time.sleep(1)
del self.verify_list[channel_id]
#print(f"Current working directory (ModelManager.py): {os.getcwd()}")
mMM = ModelManager()
def test():
buffer = [np.zeros((480, 640, 3), dtype=np.uint8) for _ in range(60)] # 示例帧列表
FRAME_WIDTH = 640 # 根据你的图像尺寸设置
FRAME_HEIGHT = 480 # 根据你的图像尺寸设置
FPS = myCongif.get_data("verify_rate") # 你的视频帧率
fourcc = cv2.VideoWriter_fourcc(*'mp4v') # 使用 mp4 编码
now = datetime.datetime.now() # 获取当前日期和时间
current_time_str = now.strftime("%Y-%m-%d_%H-%M-%S")
filename = f"{2}_{current_time_str}.mp4"
#filename = "saved_video.mp4"
print(filename)
# 保存视频
video_writer = cv2.VideoWriter(filename, fourcc, FPS, (FRAME_WIDTH, FRAME_HEIGHT))
if not video_writer.isOpened():
print(f"Failed to open video writer for filename")
return False
for frame in buffer:
video_writer.write(frame)
video_writer.release()
# 保存图片
ret = cv2.imwrite("saved_frame.jpg", buffer[-1])
if ret:
print("保存图片成功")
else:
print("保存图片失败")
return False
if __name__ == "__main__":
#mMM.start_work()
# model = ModelManager()._import_model("", "../model/plugins/RYRQ/RYRQ_Model_ACL.py")
# model.testRun()
test()