Browse Source

一通道一线程完整版--V1.0

zfbox
张龙 8 months ago
parent
commit
47e3e18cbc
  1. 2
      .idea/FristProject.iml
  2. 11
      .idea/deployment.xml
  3. 2
      .idea/misc.xml
  4. 5
      config.yaml
  5. 9
      core/ACLModelManager.py
  6. 132
      core/CapManager.py
  7. 534
      core/ChannelManager.py
  8. 53
      core/DBManager.py
  9. 357
      core/ModelManager.py
  10. 20
      core/WarnManager.py
  11. 53
      model/base_model/ascnedcl/det_utils.py
  12. 20
      model/plugins/ModelBase.py
  13. 76
      model/plugins/Peo_ACL/Peo_Model_ACL.py
  14. BIN
      model/plugins/Peo_ACL/people.om
  15. 17
      model/plugins/RYRQ_ACL/RYRQ_Model_ACL.py
  16. 81
      model/plugins/RYRQ_Model_ACL/RYRQ_Model_ACL.py
  17. 80
      model/plugins/RYRQ_Model_ACL/coco_names.txt
  18. BIN
      model/plugins/RYRQ_Model_ACL/yolov5s_bs1.om
  19. 18
      myutils/MyDeque.py
  20. 28
      myutils/MyTraceMalloc.py
  21. 14
      run.py
  22. 66
      web/API/channel.py
  23. 2
      web/API/model.py
  24. 120
      web/API/viedo.py
  25. BIN
      web/main/static/favicon_bak.ico
  26. BIN
      web/main/static/images/登录/u12.png
  27. 6
      web/main/static/images/登录/u4.svg
  28. 6
      web/main/static/resources/css/bootstrap.min.css
  29. 337
      web/main/static/resources/scripts/aiortc-client-new.js
  30. 7
      web/main/static/resources/scripts/bootstrap.bundle.min.js
  31. 3
      web/main/static/resources/scripts/channel_manager.js
  32. 4
      web/main/static/resources/scripts/jquery-1.7.1.min.js
  33. 14
      web/main/static/resources/scripts/jquery-3.2.1.min.js
  34. 1
      web/main/templates/channel_manager.html
  35. 170
      web/main/templates/登录.html
  36. BIN
      zfbox.db

2
.idea/FristProject.iml

@ -2,7 +2,7 @@
<module type="PYTHON_MODULE" version="4">
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$" />
<orderEntry type="jdk" jdkName="Remote Python 3.9.2 (sftp://root@192.168.3.104:22/usr/local/miniconda3/bin/python)" jdkType="Python SDK" />
<orderEntry type="jdk" jdkName="Remote Python 3.9.2 (sftp://root@192.168.3.101:22/usr/local/miniconda3/bin/python)" jdkType="Python SDK" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
</module>

11
.idea/deployment.xml

@ -1,11 +1,18 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="PublishConfigData" autoUpload="Always" serverName="root@192.168.1.100:22" remoteFilesAllowedToDisappearOnAutoupload="false">
<component name="PublishConfigData" autoUpload="Always" serverName="root@192.168.3.101:22" remoteFilesAllowedToDisappearOnAutoupload="false">
<serverData>
<paths name="root@192.168.1.100:22">
<serverdata>
<mappings>
<mapping deploy="/tmp/pycharm_project_725" local="$PROJECT_DIR$" />
<mapping deploy="/mnt/zfbox" local="$PROJECT_DIR$" />
</mappings>
</serverdata>
</paths>
<paths name="root@192.168.3.101:22">
<serverdata>
<mappings>
<mapping deploy="/mnt/zfbox" local="$PROJECT_DIR$" />
</mappings>
</serverdata>
</paths>

2
.idea/misc.xml

@ -3,7 +3,7 @@
<component name="Black">
<option name="sdkName" value="PyTorch" />
</component>
<component name="ProjectRootManager" version="2" project-jdk-name="Remote Python 3.9.2 (sftp://root@192.168.3.104:22/usr/local/miniconda3/bin/python)" project-jdk-type="Python SDK" />
<component name="ProjectRootManager" version="2" project-jdk-name="Remote Python 3.9.2 (sftp://root@192.168.3.101:22/usr/local/miniconda3/bin/python)" project-jdk-type="Python SDK" />
<component name="PyCharmProfessionalAdvertiser">
<option name="shown" value="true" />
</component>

5
config.yaml

@ -40,10 +40,11 @@ cap_sleep_time: 120 #单位秒 -- 5分钟
buffer_len: 100 #分析后画面缓冲区帧数 -- 可以与验证帧率结合确定缓冲区大小
RESET_INTERVAL : 100000 #帧数重置上限
frame_rate : 20 #帧率参考值 -- 后续作用主要基于verify_rate进行帧率控制
verify_rate : 8 #验证帧率--- 也就是视频输出的帧率
verify_rate : 10 #验证帧率--- 也就是视频输出的帧率
warn_video_path: /mnt/zfbox/model/warn/
warn_interval: 120 #报警间隔--单位秒
video_error_count: 3 #单位秒 ---根据验证帧率,判断3秒内都是空帧的话,视频源链接有问题。
video_error_count: 10 #单位秒 ---根据验证帧率,判断10秒内都是空帧的话,视频源链接有问题。
reconnect_attempts: 5 #cap 读取帧失败几次后进行重连
#system --- 指定网卡
wired_interface : eth0

9
core/ACLModelManager.py

@ -4,7 +4,13 @@ if myCongif.get_data("model_platform") == "acl":
SUCCESS = 0 # 成功状态值
FAILED = 1 # 失败状态值
'''
#acl.rt.get_device_count() 获取可用Device数量
# 需先调用acl.rt.get_run_mode接口获取软件栈的运行模式。
#
# 当查询结果为ACL_HOST = 1,则数据传输时涉及申请Host上的内存。
# 当查询结果为ACL_DEVICE = 0 ,则数据传输时不涉及申请Host上的内存,仅需申请Device上的内存。
'''
class ACLModeManger:
def __init__(self,):
self.acl_ok = False
@ -19,7 +25,6 @@ class ACLModeManger:
ret = acl.init() # 0-成功,其它失败
if ret:
raise RuntimeError(ret)
ret = acl.rt.set_device(device_id) # 指定当前进程或线程中用于运算的Device。可以进程或线程中指定。*多设备时可以放线程*
# 在某一进程中指定Device,该进程内的多个线程可共用此Device显式创建Context(acl.rt.create_context接口)。
if ret:

132
core/CapManager.py

@ -4,14 +4,18 @@ import cv2
import threading
import time
from myutils.ConfigManager import myCongif
from myutils.MyDeque import MyDeque
import subprocess as sp
class VideoCaptureWithFPS:
'''视频捕获的封装类,是一个通道一个'''
def __init__(self, source):
'''视频捕获的封装类,是一个通道一个
打开摄像头 0--USB摄像头1-RTSP,2-海康SDK
'''
def __init__(self, source,type=1):
self.source = source
self.width = None
self.height = None
self.bok = False
# GStreamer --- 内存占用太高,且工作环境的部署也不简单
# self.pipeline = (
# "rtspsrc location=rtsp://192.168.3.102/live1 protocols=udp latency=100 ! "
@ -35,61 +39,72 @@ class VideoCaptureWithFPS:
# self.pipe = sp.Popen(self.ffmpeg_cmd, stdout=sp.PIPE, bufsize=10 ** 8)
# opencv -- 后端默认使用的就是FFmpeg -- 不支持UDP
self.cap = cv2.VideoCapture(self.source)
if self.cap.isOpened(): #若没有打开成功,在读取画面的时候,已有判断和处理 -- 这里也要检查下内存的释放情况
self.width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH))
self.height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
print(self.width,self.height)
#self.fps = fps # 线程保持最大帧率的刷新画面---过高的帧率会影响CPU性能,但过地的帧率会造成帧积压
self.fps = math.ceil(self.cap.get(cv2.CAP_PROP_FPS)/float(myCongif.get_data("verify_rate")))-1 #向上取整。
#print(self.fps)
self.running = True
self.frame_queue = queue.Queue(maxsize=1)
#self.frame_queue = queue.Queue(maxsize=1)
self.frame_queue = MyDeque(10)
#self.frame = None
#self.read_lock = threading.Lock()
self.thread = threading.Thread(target=self.update)
self.thread.start()
def openViedo_opencv(self,source):
self.cap = cv2.VideoCapture(source)
if self.cap.isOpened(): # 若没有打开成功,在读取画面的时候,已有判断和处理 -- 这里也要检查下内存的释放情况
self.width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH))
self.height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
# self.fps = fps # 线程保持最大帧率的刷新画面---过高的帧率会影响CPU性能,但过地的帧率会造成帧积压
self.fps = math.ceil(
self.cap.get(cv2.CAP_PROP_FPS) / float(myCongif.get_data("verify_rate"))) - 1 # 向上取整。
#print(self.width, self.height, self.fps)
else:
raise ValueError("无法打开视频源")
def update(self):
icount = 0
sleep_time = myCongif.get_data("cap_sleep_time")
reconnect_attempts = myCongif.get_data("reconnect_attempts")
while self.running:
ret, frame = self.cap.read()
if not ret:
icount += 1
if icount > 5: #重连
self.cap.release()
self.cap = cv2.VideoCapture(self.source)
#self.cap = cv2.VideoCapture(self.pipeline, cv2.CAP_GSTREAMER)
if self.cap.isOpened():
self.width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH))
self.height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
print(self.width,self.height)
# self.fps = fps # 线程保持最大帧率的刷新画面---过高的帧率会影响CPU性能,但过地的帧率会造成帧积压
self.fps = math.ceil(
self.cap.get(cv2.CAP_PROP_FPS) / float(myCongif.get_data("verify_rate"))) -1 # 向上取整。
icount = 0
else:
#self.frame = None
sleep_time = myCongif.get_data("cap_sleep_time")
print(f"{self.source}视频流,将于{sleep_time}秒后重连!")
time.sleep(sleep_time)
continue
try:
self.openViedo_opencv(self.source)
failure_count = 0
self.bok = True
while self.running:
ret, frame = self.cap.read()
if not ret:
failure_count += 1
time.sleep(0.5) #休眠一段时间后重试
if failure_count >= reconnect_attempts:
raise RuntimeError("无法读取视频帧")
continue
self.frame_queue.myappend(frame)
failure_count = 0 #重置计数
# 跳过指定数量的帧以避免积压
for _ in range(self.fps):
self.cap.grab()
except Exception as e:
print(f"发生异常:{e}")
self.cap.release()
self.bok = False
print(f"{self.source}视频流,将于{sleep_time}秒后重连!")
time.sleep(sleep_time)
#释放cap资源,由release调用实现
#resized_frame = cv2.resize(frame, (int(self.width / 2), int(self.height / 2)))
# with self.read_lock:
# self.frame = frame
if self.frame_queue.full():
try:
#print("采集线程丢帧")
self.frame_queue.get(timeout=0.01) #这里不get的好处是,模型线程不会有None
except queue.Empty: #为空不处理
pass
self.frame_queue.put(frame)
# 跳过指定数量的帧以避免积压
for _ in range(self.fps):
self.cap.grab()
# time.sleep(self.fps) #按照视频源的帧率进行休眠
#print("Frame updated at:",time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()))
# if self.frame_queue.full():
# try:
# #print("队列满---采集线程丢帧")
# self.frame_queue.get(timeout=0.01) #这里不get的好处是,模型线程不会有None
# except queue.Empty: #为空不处理
# pass
# self.frame_queue.put(frame)
# if not self.frame_queue.full():
# self.frame_queue.put(frame)
def read(self):
'''
@ -104,16 +119,23 @@ class VideoCaptureWithFPS:
# else:
# return False, None
if not self.frame_queue.empty():
try:
frame = self.frame_queue.get(timeout=0.05)
except queue.Empty:
#print("cap-frame None")
return False, None
else:
#print("cap-frame None")
return False, None
return True, frame
# if not self.frame_queue.empty():
# try:
# frame = self.frame_queue.get(timeout=0.05)
# except queue.Empty:
# #print("cap-frame None")
# return False, None
# else:
# #print("cap-frame None")
# return False, None
ret = False
frame = None
if self.bok: #连接状态再读取
frame = self.frame_queue.mypopleft()
if frame is not None:
ret = True
return ret, frame
def release(self):
self.running = False

534
core/ChannelManager.py

@ -1,31 +1,56 @@
import threading
from collections import deque
import numpy as np
import time
import copy
import queue
import cv2
import asyncio
import threading
import importlib.util
import datetime
import time
import os
import ffmpeg
from collections import deque
from myutils.MyLogger_logger import LogHandler
from myutils.ConfigManager import myCongif
from core.CapManager import VideoCaptureWithFPS
from core.ACLModelManager import ACLModeManger
from model.plugins.ModelBase import ModelBase
from core.WarnManager import WarnData
from myutils.MyDeque import MyDeque
import base64
class ChannelData:
def __init__(self, str_url, int_type, bool_run, deque_length,icount_max):
self.cap = None #该通道视频采集对象
self.work_th = None #该通道工作线程句柄
self.b_model = False #是否有运行模型线程
self.bool_run = bool_run # 线程运行标识
self.str_url = str_url #视频源地址
self.int_type = int_type #视频源类型,0-usb,1-rtsp,2-hksdk
self.icount_max = icount_max # 帧序列号上限
self.lock = threading.RLock() # 用于保证线程安全
def __init__(self,channel_id,deque_length,icount_max,warnM):
self.logger = LogHandler().get_logger("ChannelDat")
self.model_platform = myCongif.get_data("model_platform")
self.channel_id = channel_id #该通道的通道ID
self.warnM = warnM #报警线程管理对象--MQ
#视频采集相关
self.cap = None #该通道视频采集对象
self.frame_rate = myCongif.get_data("frame_rate")
self.frame_interval = 1.0 / int(myCongif.get_data("verify_rate"))
self.deque_frame = deque(maxlen=deque_length) #视频缓冲区用于保存录像
#模型采集相关
self.model = None #模型对象 -- 一通道只关联一个模型
self.work_th = None #该通道工作线程句柄
self.b_model = False #是否有运行模型线程
self.bool_run = True # 线程运行标识
self.lock = threading.RLock() # 用于保证线程安全
self.icount_max = icount_max # 帧序列号上限
self.max_len = myCongif.get_data("buffer_len")
self.deque_frame = deque(maxlen=self.max_len) #视频缓冲区用于保存录像
self.last_frame = None # 保存图片数据
self.frame_queue = queue.Queue(maxsize=1)
#self.frame_queue = queue.Queue(maxsize=1)
self.frame_queue = MyDeque(10) #分析画面MQ
self.counter = 0 #帧序列号--保存报警录像使用
#添加一帧图片
def add_deque(self, value):
if len(self.deque_frame) == self.max_len:
removed_frame = self.deque_frame.popleft()
del removed_frame
removed_frame = None
self.deque_frame.append(value) #deque 满了以后会把前面的数据移除
#拷贝一份数据
@ -39,20 +64,19 @@ class ChannelData:
# frame = self.last_frame
# return frame
#if not self.frame_queue.empty():
try:
frame = self.frame_queue.get(timeout=0.3) #web传输没有做帧率控制了,可以超时时间长一点
except queue.Empty:
print("channel--frame None")
return None
# else:
# try:
# frame = self.frame_queue.get(timeout=0.3) #web传输没有做帧率控制了,可以超时时间长一点
# except queue.Empty:
# self.logger.debug(f"{self.channel_id}--web--获取分析画面失败,队列空")
# return None
frame = self.frame_queue.mypopleft()
return frame
else: #如果没有运行,直接从cap获取画面
if self.cap:
ret, frame = self.cap.read() # 除了第一帧,其它应该都是有画面的
if not ret:
print("channel--frame None")
self.logger.debug(f"{self.channel_id}--web--获取原画失败,队列空")
return None
ret, frame_bgr_webp = cv2.imencode('.jpg', frame)
if not ret:
@ -60,10 +84,34 @@ class ChannelData:
else:
buffer_bgr_webp = frame_bgr_webp.tobytes()
return buffer_bgr_webp
# frame_bgr_webp = self.encode_frame_to_flv(frame)
# return frame_bgr_webp
return None
def encode_frame_to_flv(self,frame):
try:
process = (
ffmpeg
.input('pipe:', format='rawvideo', pix_fmt='bgr24', s=f'{frame.shape[1]}x{frame.shape[0]}')
.output('pipe:', format='flv',vcodec='libx264')
.run_async(pipe_stdin=True, pipe_stdout=True, pipe_stderr=True)
)
out, err = process.communicate(input=frame.tobytes())
if process.returncode != 0:
raise RuntimeError(f"FFmpeg encoding failed: {err.decode('utf-8')}")
return out
except Exception as e:
print(f"Error during frame encoding: {e}")
return None
def update_last_frame(self,buffer):
if buffer:
self.frame_queue.myappend(buffer)
# with self.lock:
# self.last_frame = None
# self.last_frame = buffer
@ -76,11 +124,12 @@ class ChannelData:
# pass
# self.frame_queue.put(buffer)
try:
self.frame_queue.put(buffer,timeout=0.05)
except queue.Full:
#print("channel--未插入")
pass
# try:
# self.frame_queue.put(buffer,timeout=0.05)
# except queue.Full:
# self.logger.debug(f"{self.channel_id}分析画面队列满,插入失败")
# pass
#帧序列号自增 一个线程中处理,不用加锁
def increment_counter(self):
@ -91,79 +140,406 @@ class ChannelData:
def get_counter(self):
return self.counter
#清空数据,停止工作线程(若有 ,并删除deque 和 last_frame)
def clear(self):
start_time = time.time()
with self.lock:
if self.b_model: #b_model为true,说明开启了工作线程
def _start_cap_th(self,source,type=1):
'''开始cap采集线程
type = 打开摄像头 0--USB摄像头1-RTSP,2-海康SDK
'''
ret = False
if self.cap:
self.cap.release()
self.cap = None
self.cap = VideoCaptureWithFPS(source,type)
if self.cap:
ret = True
return ret
def _stop_cap_th(self):
'''停止cap采集线程
重要约束停止cap线程前必须先停止model线程
'''
if self.b_model:
self.logger.error("停止采集线程前,请先停止model线程")
return False
else:
if self.cap:
self.cap.release()
self.cap = None
return True #一般不会没有cap
def _start_model_th(self,model_data,schedule):
verify_rate = myCongif.get_data("verify_rate")
warn_interval = myCongif.get_data("warn_interval")
self.bool_run = True
self.work_th = threading.Thread(target=self._dowork_thread,
args=(self.channel_id, model_data, schedule, verify_rate,
warn_interval)) # 一个视频通道一个线程
self.work_th.start()
def _stop_model_th(self):
if self.work_th:
if self.b_model:
self.bool_run = False
self.work_th.join() #等待通道对应的子线程,停止工作。
#time.sleep(1)
self.deque_frame.clear()
self.last_frame = None #二选一
self.frame_queue = queue.Queue(maxsize=1) #二选一
self.counter = 0
self.work_th.join() #线程结束时,会把b_model置False
self.logger.debug(f"{self.channel_id}停止工作线程")
self.work_th = None
end_time = time.time()
execution_time = end_time - start_time
print(f"停止一个通道线程,花费了: {execution_time} seconds")
def _import_model(self,model_name,model_path,threshold,iou_thres):
'''
根据路径动态导入模块
:param model_name: 模块名称 --用通道ID代替
:param model_path: 模块路径
:param threshold: 置信阈值
:param iou_thres: iou阈值
:return:
'''
try:
module_path = model_path.replace("/", ".").rsplit(".", 1)[0]
print(module_path)
# 动态导入模块
module = importlib.import_module(module_path)
# 从模块中获取指定的类
Model = getattr(module, "Model")
# 使用 Model 类
model_instance = Model(model_path,threshold,iou_thres)
return model_instance
except ModuleNotFoundError as e:
print(f"Module not found: {e}")
return None
except AttributeError as e:
print(f"Class not found in module: {e}")
return None
except Exception as e:
print(f"An unexpected error occurred: {e}")
return None
# if os.path.exists(model_path):
# module_spec = importlib.util.spec_from_file_location(model_name, model_path)
# if module_spec is None:
# self.logger.error(f"{model_path} 加载错误")
# return None
# module = importlib.util.module_from_spec(module_spec)
# start_time = time.time()
# print(f"通道{self.channel_id},开始exec_module自定义模型文件")
# module_spec.loader.exec_module(module)
# end_time = time.time()
# print(f"通道{self.channel_id},完成exec_module模型文件实例化耗时{end_time - start_time}")
#
# try:
# md = getattr(module, "Model")(model_path,threshold,iou_thres) #实例化类
# except Exception as e:
# self.logger.error(f"{model_path} 实例化错误,退出模型线程")
# return None
#
# if not isinstance(md, ModelBase):
# self.logger.error("{} not zf_model".format(md))
# return None
# else:
# self.logger.error("{}文件不存在".format(model_path))
# return None
# self.logger.debug(f"{model_path} 加载成功!!!!")
# return md
def _verify(self,frame,model,model_data,schedule,result,isdraw=1):
'''验证执行主函数,实现遍历通道关联的模型,调用对应模型执行验证'''
#img = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
img = frame
#验证检测计划,是否在布防时间内
now = datetime.datetime.now() # 获取当前日期和时间
weekday = now.weekday() # 获取星期几,星期一是0,星期天是6
hour = now.hour
result.pop(0) # 保障结果数组定长 --先把最早的结果推出数组
warntext = ""
if model and schedule[weekday][hour] == 1: #不在计划则不进行验证,直接返回图片
# 调用模型,进行检测,model是动态加载的,具体的判断标准由模型内执行 ---- *********
bwarn, warntext = model.verify(img, model_data,isdraw) #****************重要
# 对识别结果要部要进行处理
if bwarn:
# 绘制报警文本
cv2.putText(img, warntext, (50, 50),
cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)
result.append(1) #要验证数组修改,是地址修改吗?
else: #没有产生报警也需要记录,统一计算占比
warntext = ""
result.append(0)
else:
result.append(0)
# 将检测结果图像转换为帧--暂时用不到AVFrame--2024-7-5
# new_frame_rgb_avframe = av.VideoFrame.from_ndarray(img, format="rgb24") # AVFrame
# new_frame_rgb_avframe.pts = None # 添加此行确保有pts属性
# if isinstance(img, np.ndarray): -- 留个纪念
#处理完的图片后返回-bgr模式
#img_bgr_ndarray = cv2.cvtColor(img, cv2.COLOR_RGB2BGR)
# 将检查结果转换为WebP格式图片 --在线程里面完成应该可以减少网页端处理时间
ret,frame_bgr_webp=cv2.imencode('.jpg', img)
if not ret:
buffer_bgr_webp = None
else:
buffer_bgr_webp = frame_bgr_webp.tobytes()
#buffer_bgr_webp = self.encode_frame_to_flv(img)
return buffer_bgr_webp,img,warntext
def _dowork_thread(self,channel_id,model_data,schedule,verify_rate,warn_interval):
'''一个通道一个线程,关联的模型在一个线程检测,局部变量都是一个通道独有'''
if not self.cap:
self.logger.error("采集线程未正常启动,退出model线程")
return
#加载自定义模型文件
model = self._import_model(str(channel_id), model_data[5], model_data[8], model_data[9]) # 动态加载模型处理文件py
if not model:
self.logger.error("自定义模型文件加载失败,退出model线程")
return
#初始化模型运行资源
context = None
device_id = myCongif.get_data("device_id")
if self.model_platform == "acl": # ACL线程中初始化内容
context = ACLModeManger.th_inti_acl(device_id) #创建context
#初始化模型资源 -- 加载模型文件
ret = model.init_acl_resource() #加载和初始化离线模型文件--om文件
if not ret:
print("初始化模型资源出错,退出线程!")
return
#初始化业务数据
result = [0 for _ in range(model_data[3] * verify_rate)] # 初始化时间*验证帧率数量的结果list
proportion = model_data[4] # 判断是否报警的占比
warn_last_time = time.time()
#warn_save_count = 0 #保存录像的最新帧初始化为0
#开始循环处理业务
last_frame_time = time.time() #初始化个读帧时间
self.b_model = True
while self.bool_run: #基于tag 作为运行标识。 线程里只是读,住线程更新,最多晚一轮,应该不用线程锁。需验证
# 帧率控制帧率
current_time = time.time()
elapsed_time = current_time - last_frame_time
if elapsed_time < self.frame_interval:
time.sleep(self.frame_interval - elapsed_time) #若小于间隔时间则休眠
last_frame_time = time.time()
#*********取画面*************
ret,frame = self.cap.read() #除了第一帧,其它应该都是有画面的
if not ret:
#self.logger.debug(f"{self.channel_id}--model--获取cap画面失败,队列空")
continue #没读到画面继续
#执行图片推理
buffer_bgr_webp,img_bgr_ndarray,warn_text = self._verify(frame,model,model_data,schedule,result)
#分析图片放入内存中
self.add_deque(img_bgr_ndarray) # 缓冲区大小由maxlen控制 超上限后,删除最前的数据
# 分析画面一直更新最新帧,提供网页端显示
self.update_last_frame(buffer_bgr_webp)
if warn_text:
#验证result -是否触发报警要求 --遍历每个模型执行的result
count_one = float(sum(result)) #1,0 把1累加的和就是1的数量
ratio_of_ones = count_one / len(result)
#self.logger.debug(result)
if ratio_of_ones >= proportion: #触发报警
# 基于时间间隔判断
current_time = time.time()
elapsed_time = current_time - warn_last_time
if elapsed_time < warn_interval:
continue
warn_last_time = current_time
# 处理报警
warn_data = WarnData()
warn_data.model_name = model_data[7]
warn_data.warn_text = warn_text
warn_data.img_buffer = self.copy_deque() # 深度复制缓冲区
warn_data.width = self.cap.width
warn_data.height = self.cap.height
warn_data.channel_id = channel_id
self.warnM.add_warn_data(warn_data)
#结果记录要清空
for i in range(len(result)):
result[i] = 0
# end_time = time.time() # 结束时间
# print(f"Processing time: {end_time - start_time} seconds")
# 本地显示---测试使用
# if channel_id == 2:
# cv2.imshow(str(channel_id), img)
# if cv2.waitKey(1) & 0xFF == ord('q'):
# break
#结束线程
print("开始结束工作线程")
self.b_model = False
#反初始化
if self.model_platform == "acl":
try:
model.release() #释放模型资源资源
# 删除模型对象
del model
#释放context
if context: # ACL线程中反初始化内容 -- 若线程异常退出,这些资源就不能正常释放了
# 再释放context
ACLModeManger.th_del_acl(context)
except Exception as e:
print(e)
#cv2.destroyAllWindows()
print("线程结束!!!!")
def start_work(self,cap_data,model_data,schedule,type=0):
'''
开始工作线程包括视频通道采集和模型处理
:param cap_data: [source,type]
:param model_data: 跟通道关联的模型数据
strsql = (
f"select t1.model_id,t1.check_area,t1.polygon ,t2.duration_time,t2.proportion,t2.model_path,t1.ID,"
f"t2.model_name,t1.conf_thres,t1.iou_thres "
f"from channel2model t1 left join model t2 on t1.model_id = t2.ID where t1.channel_id ={channel_id};")
:param schedule
:param type: 0-启动所有线程1-只启动cap采集线程2-只启动model模型处理线程
:return: True,False
'''
ret = False
if type==0:
self._start_cap_th(cap_data[0],cap_data[1]) #先cap,再model
self._start_model_th(model_data,schedule)
ret = True
elif type == 1:
self._start_cap_th(cap_data[0],cap_data[1])
ret = True
elif type == 2:
self._start_model_th(model_data,schedule)
ret = True
else:
self.logger.error("暂时还不支持该类型工作!")
return ret
def stop_work(self,type=0):
'''
清空数据停止工作线程若有 并删除deque last_frame
:param type: 0-停止所有线程1-只停止cap采集线程2-只停止model模型处理线程
:return: True,False
'''
ret = False
if type == 0:
self._stop_model_th()
self._stop_cap_th()
ret = True
elif type == 1:
self._stop_cap_th()
ret = True
elif type == 2:
self.logger.debug("单独停止工作线程")
self._stop_model_th()
ret = True
else:
self.logger.error("暂时还不支持该类型工作!")
return ret
'''
通道对象管理类只针对通道节点的维护和工作线程的开启和停止 ChannelData的操作必须维护在ChannelManager里面不能输出ChannelData对象
'''
class ChannelManager:
def __init__(self):
self.channels = {}
self._channels = {}
self.cm_lock = threading.RLock() # 用于保证字典操作的线程安全
#增加节点
def add_channel(self, channel_id, str_url, int_type, bool_run, deque_length=10,icount_max=100000):
def add_channel(self, channel_id,deque_length,icount_max,warnM):
ret = False
with self.cm_lock:
if channel_id in self.channels: #若已经有数据,先删除后再增加
self.channels[channel_id].clear() # 手动清理资源
del self.channels[channel_id]
ch_data = ChannelData(str_url, int_type, bool_run, deque_length, icount_max)
self.channels[channel_id] = ch_data
return ch_data
if channel_id in self._channels: #若已经有数据,先删除后再增加
self._channels[channel_id].stop_work(0) # 停止工作
del self._channels[channel_id]
#channel_id,deque_length,icount_max,warnM
ch_data = ChannelData(channel_id, deque_length, icount_max,warnM)
self._channels[channel_id] = ch_data
ret = True
return ret
#删除节点
def delete_channel(self, channel_id): #需要验证资源的释放清空
'''删除节点,包含了停止该节点工作线程'''
ret = False
with self.cm_lock:
if channel_id in self.channels:
self.channels[channel_id].clear() # 手动清理资源
self.channels[channel_id].cap.release()
del self.channels[channel_id]
if channel_id == 0:
for clannel_data in self._channels:
clannel_data.stop_work(0)
self._channels.clear() #清空节点
else:
if channel_id in self._channels:
self._channels[channel_id].stop_work(0) #停止工作
del self._channels[channel_id]
ret = True
return ret
#获取节点
def get_channel(self, channel_id):
#开始工作线程
def start_channel(self,channel_id,cap_data,model_data,schedule,type):
'''
启动通道的工作线程 -- 启动应该没有一下子启动所有
:param channel_id: 启动通道的ID
:param cap_data: [source,type]
:param model_data: 跟该通道关联的模型数据
:param schedule
:param type: 0-启动所有工作线程1-启动CAP采集线程2-启动model工资线程
:return:
'''
ret = False
with self.cm_lock:
return self.channels.get(channel_id)
if channel_id in self._channels:
ret = self._channels[channel_id].start_work(cap_data,model_data,schedule,type)
return ret
#停止工作线程---要把视频采集线程停止掉
def stop_channel(self,channel_id):
def stop_channel(self,channel_id,type):
'''
停止通道的工作线程
:param channel_id: 0-停止所有通道的工作线程其他值为具体停止哪个工作线程
:param type: 0-停止所有工作线程1-停止CAP采集线程2-停止model工资线程
:return:
'''
with self.cm_lock:
ret = False
if channel_id == 0:
# for clannel_id,clannel_data in self.channels.items():
# clannel_data.clear()
for clannel_data in self.channels:
clannel_data.clear() #停止工作线程,并清空业务数据
clannel_data.cap.release() #停止视频采集线程,并是否采集资源
self.channels.clear() #清空整个字典
for clannel_data in self._channels:
clannel_data.stop_work(type)
ret = True
else:
if channel_id in self.channels:
self.channels[channel_id].clear() # 手动清理资源
self.channels[channel_id].cap.release()
del self.channels[channel_id]
if channel_id in self._channels:
self._channels[channel_id].stop_work(type)
ret =True
return ret
if __name__ == "__main__":
# 示例使用
manager = ChannelManager()
manager.add_channel('channel_1', 'test', 123, True, deque_length=5)
def cm_get_last_frame(self,channel_id): #这里如果加锁的话,阻塞会比较厉害,但具体程度也需要验证。
frame = None
if channel_id in self._channels:
try:
frame = self._channels[channel_id].get_last_frame()
except Exception as e:
print(e)
return frame
def cm_get_cap_frame(self,channel_id):
img_base64 = None
if channel_id in self._channels:
channel_data = self._channels[channel_id]
if channel_data.cap:
try:
for i in range(5):
ret, frame = channel_data.cap.read()
if ret:
ret, frame_bgr_webp = cv2.imencode('.jpg', frame)
if ret:
# 将图像数据编码为Base64
img_base64 = base64.b64encode(frame_bgr_webp).decode('utf-8')
break
except Exception as e:
print(e)
return img_base64
# 更新和读取操作
manager.update_channel_deque('channel_1', 'data1')
manager.update_channel_buffer('channel_1', np.array([[1, 2], [3, 4]]))
manager.increment_channel_counter('channel_1')
channel_data = manager.get_channel_data('channel_1')
print(channel_data)
if __name__ == "__main__":
# 示例使用
pass

53
core/DBManager.py

@ -148,10 +148,11 @@ class DBManager():
#修改视频通道和算法间的关联关系
#channel_id 通道ID
#modell_list 最新配置的模型id list
# 0--失败,其他值成功
def updateC2M(self,channel_id,model_id,check_area,polygon_str,conf_thres,iou_thres):
c2m_id = 0
strsql = f"select ID from channel2model where channel_id={channel_id};"
data = self.do_select(strsql,1)
data = self.do_select(strsql,1) #c2m_id
if data: #修改数据
strsql = (f"update channel2model set model_id = {model_id},check_area={check_area},"
f"polygon='{polygon_str}',conf_thres={conf_thres},iou_thres={iou_thres} "
@ -167,18 +168,62 @@ class DBManager():
return data[0]
else:
strsql = f"select ID from channel2model where channel_id={channel_id};"
data = self.do_select(strsql,1)
if data:
return data[0]
new_c2m_ids = self.do_select(strsql,1)
if new_c2m_ids:
return new_c2m_ids[0]
else:
print("正常不会没有值!!")
return 0
def getschedule(self,c2m_id):
'''
根据c2mID 查询该算法的布防时间
:param c2m_id:
:return: 以day为行hour作为列的布防标识值二维list
'''
strsql = f"select day,hour,status from schedule where channel2model_id ={c2m_id} order by hour asc,day asc;"
datas = self.do_select(strsql)
onelist = []
twolist = []
threelist = []
fourlist = []
fivelist = []
sixlist = []
sevenlist = []
if datas:
for i in range(24):
onelist.append(datas[i*7 + 0][2])
twolist.append(datas[i*7 + 1][2])
threelist.append(datas[i*7 + 2][2])
fourlist.append(datas[i*7 + 3][2])
fivelist.append(datas[i*7 + 4][2])
sixlist.append(datas[i*7 + 5][2])
sevenlist.append(datas[i*7 + 6][2])
else:
self.logger.debug(f"没有数据--{c2m_id}")
onelist = [1]*24
twolist = [1]*24
threelist = [1]*24
fourlist = [1]*24
fivelist = [1]*24
sixlist = [1]*24
sevenlist = [1]*24
schedule_list = []
schedule_list.append(onelist)
schedule_list.append(twolist)
schedule_list.append(threelist)
schedule_list.append(fourlist)
schedule_list.append(fivelist)
schedule_list.append(sixlist)
schedule_list.append(sevenlist)
return schedule_list
#检查设备ID是否在数据库?
def checkDevID(self,cID):
pass
def test(self):
# 建立数据库连接
conn = pymysql.connect(

357
core/ModelManager.py

@ -1,272 +1,37 @@
# 导入代码依赖
import time
import os
import cv2
import threading
import importlib.util
import datetime
from core.DBManager import mDBM,DBManager
from core.DBManager import mDBM
from myutils.MyLogger_logger import LogHandler
from myutils.ConfigManager import myCongif
from model.plugins.ModelBase import ModelBase
from core.ChannelManager import ChannelManager
from core.ACLModelManager import ACLModeManger
from core.WarnManager import WarnManager,WarnData
from core.CapManager import VideoCaptureWithFPS
from core.WarnManager import WarnManager
class ModelManager:
def __init__(self):
self.verify_list = ChannelManager() #模型的主要数据 -- 2024-7-5修改为类管理通道数据
self.bRun = True
self.logger = LogHandler().get_logger("ModelManager")
# 本地YOLOv5仓库路径
self.yolov5_path = myCongif.get_data("yolov5_path")
#self.buflen = myCongif.get_data("buffer_len")
self.icout_max = myCongif.get_data("RESET_INTERVAL") #跟视频帧序用一个变量
self.frame_rate = myCongif.get_data("frame_rate")
self.frame_interval = 1.0 / int(myCongif.get_data("verify_rate"))
#保存视频相关内容
self.FPS = myCongif.get_data("verify_rate") # 视频帧率--是否能实现动态帧率
self.fourcc = cv2.VideoWriter_fourcc(*'mp4v') # 使用 mp4 编码
#基于模型运行环境进行相应初始化工作
# 报警处理线程-全进程独立一个线程处理
self.warnM = None
# acl初始化 -- 一个进程一个
self.model_platform = myCongif.get_data("model_platform")
# acl初始化 -- 一个线程一个
if self.model_platform == "acl":
self.device_id = myCongif.get_data("device_id")
ACLModeManger.init_acl(self.device_id) #acl -- 全程序初始化
#self.model_dic = {} # model_id model
# 报警处理线程-全进程独立一个线程处理
self.warnM = None
def __del__(self):
self.logger.debug("释放资源")
self.stop_work(0) #停止所有工作
del self.verify_list #应该需要深入的删除--待完善
if self.model_platform == "acl":
if self.model_platform == "acl": #去初始化
ACLModeManger.del_acl(self.device_id) #acl -- 全程序反初始化 需要确保在执行析构前,其它资源已释放
def _open_view(self,url,itype): #打开摄像头 0--USB摄像头,1-RTSP,2-海康SDK
if itype == 0:
cap = VideoCaptureWithFPS(int(url))
elif itype == 1:
cap = VideoCaptureWithFPS(url)
else:
raise Exception("视频参数错误!")
return cap
def _import_model(self,model_name,model_path,threshold,iou_thres):
'''
根据路径动态导入模块
:param model_name: 模块名称
:param model_path: 模块路径
:param threshold: 置信阈值
:param iou_thres: iou阈值
:return:
'''
if os.path.exists(model_path):
module_spec = importlib.util.spec_from_file_location(model_name, model_path)
if module_spec is None:
self.logger.error(f"{model_path} 加载错误")
return None
module = importlib.util.module_from_spec(module_spec)
module_spec.loader.exec_module(module)
md = getattr(module, "Model")(model_path,threshold,iou_thres) #实例化类
if not isinstance(md, ModelBase):
self.logger.error("{} not zf_model".format(md))
return None
else:
self.logger.error("{}文件不存在".format(model_path))
return None
self.logger.debug(f"{model_path} 加载成功!!!!")
return md
def getschedule(self,c2m_id):
'''
根据c2mID 查询该算法的布防时间
:param c2m_id:
:return: 以day为行hour作为列的布防标识值二维list
'''
strsql = f"select day,hour,status from schedule where channel2model_id ={c2m_id} order by hour asc,day asc;"
datas = mDBM.do_select(strsql)
onelist = []
twolist = []
threelist = []
fourlist = []
fivelist = []
sixlist = []
sevenlist = []
if datas:
for i in range(24):
onelist.append(datas[i*7 + 0][2])
twolist.append(datas[i*7 + 1][2])
threelist.append(datas[i*7 + 2][2])
fourlist.append(datas[i*7 + 3][2])
fivelist.append(datas[i*7 + 4][2])
sixlist.append(datas[i*7 + 5][2])
sevenlist.append(datas[i*7 + 6][2])
else:
self.logger.debug(f"没有数据--{c2m_id}")
onelist = [1]*24
twolist = [1]*24
threelist = [1]*24
fourlist = [1]*24
fivelist = [1]*24
sixlist = [1]*24
sevenlist = [1]*24
schedule_list = []
schedule_list.append(onelist)
schedule_list.append(twolist)
schedule_list.append(threelist)
schedule_list.append(fourlist)
schedule_list.append(fivelist)
schedule_list.append(sixlist)
schedule_list.append(sevenlist)
return schedule_list
def set_last_img(self,):
pass
def verify(self,frame,model,model_data,channel_id,schedule,result,isdraw=1):
'''验证执行主函数,实现遍历通道关联的模型,调用对应模型执行验证'''
#img = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
img = frame
#验证检测计划,是否在布防时间内
now = datetime.datetime.now() # 获取当前日期和时间
weekday = now.weekday() # 获取星期几,星期一是0,星期天是6
hour = now.hour
result.pop(0) # 保障结果数组定长 --先把最早的结果推出数组
warntext = ""
if model and schedule[weekday][hour] == 1: #不在计划则不进行验证,直接返回图片
# 调用模型,进行检测,model是动态加载的,具体的判断标准由模型内执行 ---- *********
#isverify = True
detections, bwarn, warntext = model.verify(img, model_data,isdraw) #****************重要
# 对识别结果要部要进行处理
if bwarn:
# 绘制报警文本
cv2.putText(img, warntext, (50, 50),
cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)
result.append(1) #要验证数组修改,是地址修改吗?
else: #没有产生报警也需要记录,统一计算占比
warntext = ""
result.append(0)
else:
result.append(0)
# 将检测结果图像转换为帧--暂时用不到AVFrame--2024-7-5
# new_frame_rgb_avframe = av.VideoFrame.from_ndarray(img, format="rgb24") # AVFrame
# new_frame_rgb_avframe.pts = None # 添加此行确保有pts属性
# if isinstance(img, np.ndarray): -- 留个纪念
#处理完的图片后返回-bgr模式
#img_bgr_ndarray = cv2.cvtColor(img, cv2.COLOR_RGB2BGR)
# 将检查结果转换为WebP格式图片 --在线程里面完成应该可以减少网页端处理时间
ret,frame_bgr_webp=cv2.imencode('.jpg', img)
if not ret:
buffer_bgr_webp = None
else:
buffer_bgr_webp = frame_bgr_webp.tobytes()
return buffer_bgr_webp,img,warntext
def dowork_thread(self,channel_id,model_data,schedule,verify_rate,warn_interval):
'''一个通道一个线程,关联的模型在一个线程检测,局部变量都是一个通道独有'''
channel_data = self.verify_list.get_channel(channel_id) #是对ChannelData 对象的引用
context = None
model = self._import_model("", model_data[5], model_data[8], model_data[9]) # 动态加载模型处理文件py
if not model:
print("实例化自定义模型文件出错,退出线程!")
return
# 线程ACL初始化
if self.model_platform == "acl": # ACL线程中初始化内容
context = ACLModeManger.th_inti_acl(self.device_id)
#初始化模型资源 -- 加载模型文件
ret = model.init_acl_resource()
if not ret:
print("初始化模型资源出错,退出线程!")
return
result = [0 for _ in range(model_data[3] * verify_rate)] # 初始化时间*验证帧率数量的结果list
proportion = model_data[4] # 判断是否报警的占比
#model[6] -- c2m_id --布防计划 0-周一,6-周日
warn_last_time = time.time()
warn_save_count = 0 #保存录像的最新帧初始化为0
#开始拉取画面循环检测
cap = channel_data.cap
last_frame_time = time.time() #初始化个读帧时间
channel_data.b_model = True
while channel_data.bool_run: #基于tag 作为运行标识。 线程里只是读,住线程更新,最多晚一轮,应该不用线程锁。需验证
# 帧率控制帧率
current_time = time.time()
elapsed_time = current_time - last_frame_time
if elapsed_time < self.frame_interval:
time.sleep(self.frame_interval - elapsed_time) #若小于间隔时间则休眠
last_frame_time = time.time()
#*********取画面*************
ret,frame = cap.read() #除了第一帧,其它应该都是有画面的
if not ret:
continue #没读到画面继续
#执行图片推理
buffer_bgr_webp,img_bgr_ndarray,warn_text = self.verify(frame,model,model_data,channel_id,schedule,result)
#分析图片放入内存中
channel_data.add_deque(img_bgr_ndarray) # 缓冲区大小由maxlen控制 超上限后,删除最前的数据
#channel_data.increment_counter() #帧序列加一
# 一直更新最新帧,提供网页端显示
channel_data.update_last_frame(buffer_bgr_webp)
#print(f"{channel_id}--Frame updated at:",time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()))
if warn_text:
#验证result -是否触发报警要求 --遍历每个模型执行的result
count_one = float(sum(result)) #1,0 把1累加的和就是1的数量
ratio_of_ones = count_one / len(result)
#self.logger.debug(result)
if ratio_of_ones >= proportion: #触发报警
# 基于时间间隔判断
current_time = time.time()
elapsed_time = current_time - warn_last_time
if elapsed_time < warn_interval:
continue
warn_last_time = current_time
# 处理报警
warn_data = WarnData()
warn_data.model_name = model_data[7]
warn_data.warn_text = warn_text
warn_data.img_buffer = channel_data.copy_deque() # 深度复制缓冲区
warn_data.width = cap.width
warn_data.height = cap.height
warn_data.channel_id = channel_id
self.warnM.add_warn_data(warn_data)
# #更新帧序列号 #加了报警间隔 --buffer的长度小于间隔
# warn_save_count = buffer_count
#结果记录要清空
for i in range(len(result)):
result[i] = 0
# end_time = time.time() # 结束时间
# print(f"Processing time: {end_time - start_time} seconds")
# 本地显示---测试使用
# if channel_id == 2:
# cv2.imshow(str(channel_id), img)
# if cv2.waitKey(1) & 0xFF == ord('q'):
# break
#结束线程
print("开始结束工作线程")
channel_data.b_model = False
if self.model_platform == "acl": # ACL线程中初始化内容
try:
model.release() #释放模型资源资源
# 删除模型对象
del model
#释放context
if context: # ACL线程中反初始化内容 -- 若线程异常退出,这些资源就不能正常释放了
# 再释放context
ACLModeManger.th_del_acl(context)
except Exception as e:
print(e)
#cv2.destroyAllWindows()
print("线程结束!!!!")
def send_warn(self):
'''发送报警信息'''
pass
@ -280,84 +45,106 @@ class ModelManager:
1.布防开关需要触发通道关闭和开启
2.布防策略的调整也需要关闭和重启工作
'''
# 启动告警线程
if self.warnM is None:
self.warnM = WarnManager()
self.warnM.start_warnmanager_th()
#工作线程:cap+model
if channel_id ==0:
strsql = "select id,ulr,type from channel where is_work = 1;" #执行所有通道
else:
strsql = f"select id,ulr,type from channel where is_work = 1 and id = {channel_id};" #单通道启动检测线程
datas = mDBM.do_select(strsql)
deque_length = myCongif.get_data("buffer_len")
icount_max = myCongif.get_data("RESET_INTERVAL")
for data in datas:
channel_id = data[0]
#1.创建channel对象 channel_id, str_url, int_type, bool_run, deque_length
c_data = self.verify_list.add_channel(channel_id,data[1],data[2],True,
myCongif.get_data("buffer_len"),myCongif.get_data("RESET_INTERVAL"))
#2.启动该通道的视频捕获线程 --把视频捕获线程,放主线程创建
c_data.cap = self._open_view(c_data.str_url, c_data.int_type) # 创建子线程读画面-把cap给模型就行--
#3.启动工作线程 **************************
self.start_work_th(channel_id,c_data)
# 启动告警线程
if self.warnM is None:
self.warnM = WarnManager()
self.warnM.start_warnmanager_th()
def stop_work(self,channel_id=0): #要对应start_work 1.停止工作线程,2.停止cap线程。3.删除c_data
#1.创建channel对象 #channel_id,deque_length,icount_max,warnM
ret = self.verify_list.add_channel(channel_id,deque_length,icount_max,self.warnM)
if ret:
# 2.启动工作线程
strsql = (
f"select t1.model_id,t1.check_area,t1.polygon ,t2.duration_time,t2.proportion,t2.model_path,t1.ID,"
f"t2.model_name,t1.conf_thres,t1.iou_thres "
f"from channel2model t1 left join model t2 on t1.model_id = t2.ID where t1.channel_id ={channel_id};")
model_data = mDBM.do_select(strsql, 1) # 2024-7-12调整规则,一个通道只关联一个模型,表结构暂时不动
# cap_data[source,type]
cap_data = [data[1], data[2]]
if model_data and model_data[0]: # 如果该通道关联了模型
schedule = mDBM.getschedule(model_data[6]) # 获取布防计划
#启动
self.verify_list.start_channel(channel_id,cap_data,model_data,schedule,0) #cap + model
else:
self.logger.debug(f"{channel_id}通道没有关联模型,只运行cap采集线程")
# 启动
self.verify_list.start_channel(channel_id, cap_data, None,None, 1) # cap
else:
print("*****有错误********")
def stop_work(self,channel_id=0): #要对应start_work 1.停止model线程,2.停止cap线程。3.删除c_data
'''停止工作线程(包括采集线程,并删除通道数据对象),0-停止所有,非0停止对应通道ID的线程'''
try:
self.verify_list.stop_channel(channel_id)
self.verify_list.delete_channel(channel_id) # 1,2,3
except Exception as e:
print(e)
if channel_id == 0:
#停止告警线程
self.warnM.brun = False
del self.warnM
self.warnM = None
def start_work_th(self,channel_id,c_data):
verify_rate = myCongif.get_data("verify_rate")
warn_interval = myCongif.get_data("warn_interval")
def startModelWork(self,channel_id):
strsql = (
f"select t1.model_id,t1.check_area,t1.polygon ,t2.duration_time,t2.proportion,t2.model_path,t1.ID,"
f"t2.model_name,t1.conf_thres,t1.iou_thres "
f"from channel2model t1 left join model t2 on t1.model_id = t2.ID where t1.channel_id ={channel_id};")
model_data = mDBM.do_select(strsql, 1) # 2024-7-12调整规则,一个通道只关联一个模型,表结构暂时不动
if model_data and model_data[0]: # 如果该通道关联了模型
schedule = self.getschedule(model_data[6]) # 获取布防计划
# 数据准备OK-开始工作线程
c_data.bool_run = True
c_data.work_th = threading.Thread(target=self.dowork_thread,
args=(channel_id, model_data, schedule, verify_rate,
warn_interval)) # 一个视频通道一个线程
c_data.work_th.start()
schedule = mDBM.getschedule(model_data[6]) # 获取布防计划
# cap_data[source,type]
cap_data = None
# 启动
self.logger.debug(f"通道{channel_id}重新启动model线程")
self.verify_list.start_channel(channel_id, cap_data, model_data, schedule, 2) # cap + model
else:
self.logger.debug(f"通道{channel_id}没有关联模型,不需要启动model线程")
def restartC2M(self,channel_id):
'''
修改通道管理的算法模型后需要对该通道算法执行部分重新加载执行
修改通道管理的算法模型后需要对该通道算法执行部分重新加载执行 -- 只重启model线程
:param channel_id:
:return:
'''
channel_data = self.verify_list.get_channel(channel_id)
#停止该通道的工作线程 --dowork_thread -- 并清空channel_data中的业务数据
channel_data.clear()
#重启该通道的工作线程
self.start_work_th(channel_id,channel_data)
#停止该通道的工作线程 -- 不停cap
self.verify_list.stop_channel(channel_id,2)
#重新读取工作线程数据,重启该通道的工作线程
self.startModelWork(channel_id)
def test1(self):
# from model.plugins.RYRQ_Model_ACL.RYRQ_Model_ACL import Model
# mymodel = Model("")
# mymodel.testRun()
import importlib
# 需要导入的模块路径
module_path = "model.plugins.RYRQ_Model_ACL.RYRQ_Model_ACL"
# 动态导入模块
module_name = importlib.import_module(module_path)
# 从模块中加载指定的类
Model = getattr(module_name, "Model")
# 使用 Model 类
model_instance = Model("")
model_instance.testRun()
#print(f"Current working directory (ModelManager.py): {os.getcwd()}")
mMM = ModelManager()
def test1():
pass
if __name__ == "__main__":
mMM.start_work()
#test1()
print("111")
# name = acl.get_soc_name()
# count, ret = acl.rt.get_device_count()

20
core/WarnManager.py

@ -36,12 +36,19 @@ class WarnManager:
myDBM = DBManager()
myDBM.connect()
while self.brun:
warn_data = self.warn_q.get()
self.save_warn(warn_data.model_name,warn_data.img_buffer,warn_data.width,warn_data.height,
warn_data.channel_id,self.FPS,self.fourcc,myDBM)
self.send_warn()
del warn_data.img_buffer
del warn_data
warn_data = None
try:
warn_data = self.warn_q.get(timeout=5)
except queue.Empty:
continue
if warn_data:
self.save_warn(warn_data.model_name,warn_data.img_buffer,warn_data.width,warn_data.height,
warn_data.channel_id,self.FPS,self.fourcc,myDBM)
self.send_warn()
del warn_data.img_buffer
warn_data.img_buffer = None
del warn_data
warn_data = None
def start_warnmanager_th(self):
@ -97,6 +104,5 @@ class WarnManager:
f"Values ('{model_name}','model/warn/{filename}.mp4','model/warn/{filename}.png',"
f"'{current_time_str}','{channnel_id}');")
ret = myDBM.do_sql(strsql)
del myDBM # 释放数据库连接资源
return ret

53
model/base_model/ascnedcl/det_utils.py

@ -63,7 +63,7 @@ def non_max_suppression(
agnostic=False,
multi_label=False,
labels=(),
max_det=300,
max_det=100, #每张图片最低暴力的检测框数量,原本是300,
nm=0, # number of masks
):
"""Non-Maximum Suppression (NMS) on inference results to reject overlapping detections
@ -71,19 +71,21 @@ def non_max_suppression(
Returns:
list of detections, on (n,6) tensor per image [xyxy, conf, cls]
"""
#判断并获取预测结果:如果 prediction 是列表或元组,选择其中的第一个元素作为模型预测输出
if isinstance(prediction, (list, tuple)): # YOLOv5 model in validation model, output = (inference_out, loss_out)
prediction = prediction[0] # select only inference output
#设备检测:检测当前运行设备是否为 Apple MPS,如果是则将预测结果转换为 CPU 进行处理,因为 MPS 尚不完全支持 NMS。
device = prediction.device
mps = 'mps' in device.type # Apple MPS
if mps: # MPS not fully supported yet, convert tensors to CPU before NMS
prediction = prediction.cpu()
# mps = 'mps' in device.type # Apple MPS
# if mps: # MPS not fully supported yet, convert tensors to CPU before NMS
# prediction = prediction.cpu()
#获取批量大小和类数量:bs 表示批量大小,nc 表示类的数量,xc 表示通过置信度阈值的候选框。
bs = prediction.shape[0] # batch size
nc = prediction.shape[2] - nm - 5 # number of classes
xc = prediction[..., 4] > conf_thres # candidates
# Checks
# Checks 参数检查:确保 conf_thres 和 iou_thres 在有效范围内。
assert 0 <= conf_thres <= 1, f'Invalid Confidence threshold {conf_thres}, valid values are between 0.0 and 1.0'
assert 0 <= iou_thres <= 1, f'Invalid IoU {iou_thres}, valid values are between 0.0 and 1.0'
@ -92,17 +94,21 @@ def non_max_suppression(
max_wh = 7680 # (pixels) maximum box width and height
max_nms = 30000 # maximum number of boxes into torchvision.ops.nms()
time_limit = 0.5 + 0.05 * bs # seconds to quit after
multi_label &= nc > 1 # multiple labels per box (adds 0.5ms/img)
multi_label &= nc > 1 # multiple labels per box (adds 0.5ms/img) 如果 nc > 1,则允许每个框有多个标签
#初始化:记录当前时间用于后续的时间限制,mi 表示掩码索引的起始位置,
t = time.time()
mi = 5 + nc # mask start index
output = [torch.zeros((0, 6 + nm), device=prediction.device)] * bs
# output 初始化为一个批次大小的空列表,每个元素是形状为 (0, 6 + nm) 的零张量。
output = [torch.zeros((0, 6 + nm), device=device)] * bs
#逐图像处理:遍历批量中的每一张图像,只保留通过置信度筛选的框
for xi, x in enumerate(prediction): # image index, image inference
# Apply constraints
# x[((x[..., 2:4] < min_wh) | (x[..., 2:4] > max_wh)).any(1), 4] = 0 # width-height
x = x[xc[xi]] # confidence
# Cat apriori labels if autolabelling
# Cat apriori labels if autolabelling 自动标签:如果存在标签,按照标签信息构建一个新的检测框,并将其与原有的检测框拼接
if labels and len(labels[xi]):
lb = labels[xi]
v = torch.zeros((len(lb), nc + nm + 5), device=x.device)
@ -110,31 +116,32 @@ def non_max_suppression(
v[:, 4] = 1.0 # conf
v[range(len(lb)), lb[:, 0].long() + 5] = 1.0 # cls
x = torch.cat((x, v), 0)
del v
# If none remain process next image
# If none remain process next image 空检测处理:如果没有剩余检测框,则跳过该图像
if not x.shape[0]:
continue
# Compute conf
# Compute conf 置信度计算:更新置信度,x[:, 5:] 乘以 x[:, 4:5],即置信度等于目标置信度乘以类别置信度
x[:, 5:] *= x[:, 4:5] # conf = obj_conf * cls_conf
# Box/Mask
# Box/Mask 转换框坐标:将框坐标从中心点格式转换为左上角和右下角格式,并获取掩码信息(如果存在)。
box = xywh2xyxy(x[:, :4]) # center_x, center_y, width, height) to (x1, y1, x2, y2)
mask = x[:, mi:] # zero columns if no masks
# Detections matrix nx6 (xyxy, conf, cls)
if multi_label:
if multi_label: #如果允许多标签,选择所有符合阈值的类标签。
i, j = (x[:, 5:mi] > conf_thres).nonzero(as_tuple=False).T
x = torch.cat((box[i], x[i, 5 + j, None], j[:, None].float(), mask[i]), 1)
else: # best class only
else: # best class only 只选择置信度最高的标签。
conf, j = x[:, 5:mi].max(1, keepdim=True)
x = torch.cat((box, conf, j.float(), mask), 1)[conf.view(-1) > conf_thres]
# Filter by class
# Filter by class 按类过滤:如果指定了 classes,则只保留属于这些类的检测框
if classes is not None:
x = x[(x[:, 5:6] == torch.tensor(classes, device=x.device)).any(1)]
# Check shape
# Check shape 如果检测框超过 max_nms,则按置信度排序后只保留前 max_nms 个框
n = x.shape[0] # number of boxes
if not n: # no boxes
continue
@ -143,20 +150,24 @@ def non_max_suppression(
else:
x = x[x[:, 4].argsort(descending=True)] # sort by confidence
# Batched NMS
# Batched NMS 根据类偏移框坐标,对每个类进行非极大值抑制,保留 max_det 个检测框。
c = x[:, 5:6] * (0 if agnostic else max_wh) # classes
boxes, scores = x[:, :4] + c, x[:, 4] # boxes (offset by class), scores
i = torchvision.ops.nms(boxes, scores, iou_thres) # NMS
if i.shape[0] > max_det: # limit detections
i = i[:max_det]
#输出结果:将NMS后的结果保存到 output 中,如果超时则打印警告并终止处理
output[xi] = x[i]
if mps:
output[xi] = output[xi].to(device)
# if mps:
# output[xi] = output[xi].to(device)
if (time.time() - t) > time_limit:
print(f'WARNING ⚠️ NMS time limit {time_limit:.3f}s exceeded')
del c,x,box,mask
break # time limit exceeded
del c, x, box, mask
del device,bs,nc,xc
del prediction
return output

20
model/plugins/ModelBase.py

@ -72,12 +72,12 @@ class ModelBase(ABC):
return False
#acl ----- 相关-----
def _init_acl(self):
device_id = 0
self.context, ret = acl.rt.create_context(device_id) # 显式创建一个Context
if ret:
raise RuntimeError(ret)
print('Init TH-Context Successfully')
# def _init_acl(self):
# device_id = 0
# self.context, ret = acl.rt.create_context(device_id) # 显式创建一个Context
# if ret:
# raise RuntimeError(ret)
# print('Init TH-Context Successfully')
# def _del_acl(self):
# device_id = 0
@ -100,6 +100,8 @@ class ModelBase(ABC):
self.model_desc = acl.mdl.create_desc() # 初始化模型信息对象
if not self.model_desc:
return False
#(例如输入/输出的个数、名称、数据类型、Format、维度信息等)
ret = acl.mdl.get_desc(self.model_desc, self.model_id) # 根据模型ID获取该模型的aclmdlDesc类型数据(描述信息)
print("[Model] Model init resource stage success")
# 创建模型输出 dataset 结构
@ -134,9 +136,10 @@ class ModelBase(ABC):
self._input_num = acl.mdl.get_num_inputs(self.model_desc) # 获取模型输入个数
self.input_dataset = acl.mdl.create_dataset() # 创建输入dataset结构
for i in range(self._input_num):
item = input_list[i] # 获取第 i 个输入数据
item = input_list[i] # 获取第 i 个输入数据 --#?这里输入个数和input_list的个数是一一对应的? 还是默认只有一个输入
data_ptr = acl.util.bytes_to_ptr(item.tobytes()) # 获取输入数据字节流
size = item.size * item.itemsize # 获取输入数据字节数
#这里没有调用acl.rt.malloc 申请内存 后续需要跟进对比下。#?
dataset_buffer = acl.create_data_buffer(data_ptr, size) # 创建输入dataset buffer结构, 填入输入数据
_, ret = acl.mdl.add_dataset_buffer(self.input_dataset, dataset_buffer) # 将dataset buffer加入dataset
@ -206,7 +209,8 @@ class ModelBase(ABC):
self.input_dataset = None
return None
out_numpy = self._output_dataset_to_numpy() # 将推理输出的二进制数据流解码为numpy数组, 数组的shape和类型与模型输出规格一致
#self._release_dataset(self.input_dataset) # 释放dataset -- 要不要执行需要验证
self._release_dataset(self.input_dataset) # 释放dataset -- 要不要执行需要验证
self.input_dataset = None
return out_numpy
def release(self):

76
model/plugins/Peo_ACL/Peo_Model_ACL.py

@ -0,0 +1,76 @@
import os.path
from model.plugins.ModelBase import ModelBase
from myutils.ConfigManager import myCongif
from model.base_model.ascnedcl.det_utils import get_labels_from_txt, letterbox, scale_coords, nms, draw_bbox # 模型前后处理相关函数
import cv2
import numpy as np
import torch # 深度学习运算框架,此处主要用来处理数据
from core.ACLModelManager import ACLModeManger
class Model(ModelBase):
def __init__(self,path,threshold=0.5):
# 找pt模型路径 -- 一个约束py文件和模型文件的路径关系需要固定, -- 上传模型时,要解压好路径
dirpath, filename = os.path.split(path)
self.model_file = os.path.join(dirpath, "yolov5s_bs1.om") # 目前约束模型文件和py文件在同一目录
#self.coco_file = os.path.join(dirpath, "coco_names.txt")
super().__init__(self.model_file) # acl环境初始化基类负责类的实例化
self.name = "人员模型-yolov5"
self.version = "V1.0"
self.model_type = 2
self.neth = 640 # 缩放的目标高度, 也即模型的输入高度
self.netw = 640 # 缩放的目标宽度, 也即模型的输入宽度
self.conf_threshold = threshold # 置信度阈值
def verify(self,image,data,isdraw=1):
labels_dict = get_labels_from_txt('/mnt/zfbox/model/plugins/RYRQ_ACL/coco_names.txt') # 得到类别信息,返回序号与类别对应的字典
# 数据前处理
img, scale_ratio, pad_size = letterbox(image, new_shape=[640, 640]) # 对图像进行缩放与填充
img = img[:, :, ::-1].transpose(2, 0, 1) # BGR to RGB, HWC to CHW #图片在输入时已经做了转换
img = np.ascontiguousarray(img, dtype=np.float32) / 255.0 # 转换为内存连续存储的数组
# 模型推理, 得到模型输出
outputs = None
outputs = self.execute([img,])#创建input,执行模型,返回结果 --失败返回None
filtered_pred_all = None
bwarn = False
warn_text = ""
# 是否有检测区域,有先绘制检测区域 由于在该函数生成了polygon对象,所有需要在检测区域前调用。
if data[1] == 1:
self.draw_polygon(image, data[2], (255, 0, 0))
if outputs:
output = outputs[0] #只放了一张图片
# 后处理 -- boxout 是 tensor-list: [tensor([[],[].[]])] --[x1,y1,x2,y2,置信度,coco_index]
boxout = nms(torch.tensor(output), conf_thres=0.3,
iou_thres=0.5) # 利用非极大值抑制处理模型输出,conf_thres 为置信度阈值,iou_thres 为iou阈值
pred_all = boxout[0].numpy() # 转换为numpy数组 -- [[],[],[]] --[x1,y1,x2,y2,置信度,coco_index]
# pred_all[:, :4] 取所有行的前4列,pred_all[:,1]--第一列
scale_coords([640, 640], pred_all[:, :4], image.shape, ratio_pad=(scale_ratio, pad_size)) # 将推理结果缩放到原始图片大小
#过滤掉不是目标标签的数据 -- 序号0-- person
filtered_pred_all = pred_all[pred_all[:, 5] == 0]
# 绘制检测结果 --- 也需要封装在类里,
for pred in filtered_pred_all:
x1, y1, x2, y2 = int(pred[0]), int(pred[1]), int(pred[2]), int(pred[3])
# # 绘制目标识别的锚框 --已经在draw_bbox里处理
# cv2.rectangle(image, (x1, y1), (x2, y2), (0, 255, 0), 2)
if data[1] == 1: # 指定了检测区域
x_center = (x1 + x2) / 2
y_center = (y1 + y2) / 2
#绘制中心点?
cv2.circle(image, (int(x_center), int(y_center)), 5, (0, 0, 255), -1)
#判断是否区域点
if not self.is_point_in_region((x_center, y_center)):
continue #没产生报警-继续
#产生报警 -- 有一个符合即可
bwarn = True
warn_text = "People checked!"
img_dw = draw_bbox(filtered_pred_all, image, (0, 255, 0), 2, labels_dict) # 画出检测框、类别、概率
#cv2.imwrite('img_res_peo.png', image)
return filtered_pred_all, bwarn, warn_text
def testRun(self):
print("1111")

BIN
model/plugins/Peo_ACL/people.om

Binary file not shown.

17
model/plugins/RYRQ_ACL/RYRQ_Model_ACL.py

@ -31,12 +31,12 @@ class Model(ModelBase):
img = np.ascontiguousarray(img, dtype=np.float32) / 255.0 # 转换为内存连续存储的数组
# 模型推理, 得到模型输出
outputs = None
outputs = self.execute([img,])#创建input,执行模型,返回结果 --失败返回None
filtered_pred_all = None
bwarn = False
warn_text = ""
# 是否有检测区域,有先绘制检测区域 由于在该函数生成了polygon对象,所有需要在检测区域前调用。
if data[1] == 1:
self.draw_polygon(image, data[2], (255, 0, 0))
@ -45,8 +45,10 @@ class Model(ModelBase):
output = outputs[0] #只放了一张图片
# 后处理 -- boxout 是 tensor-list: [tensor([[],[].[]])] --[x1,y1,x2,y2,置信度,coco_index]
# 利用非极大值抑制处理模型输出,conf_thres 为置信度阈值,iou_thres 为iou阈值
boxout = nms(torch.tensor(output), conf_thres=self.conf_threshold,iou_thres=self.iou_thres)
output_torch = torch.tensor(output)
boxout = nms(output_torch, conf_thres=0.4,iou_thres=0.5)
del output_torch
output_torch = None
pred_all = boxout[0].numpy() # 转换为numpy数组 -- [[],[],[]] --[x1,y1,x2,y2,置信度,coco_index]
# pred_all[:, :4] 取所有行的前4列,pred_all[:,1]--第一列
scale_coords([640, 640], pred_all[:, :4], image.shape, ratio_pad=(scale_ratio, pad_size)) # 将推理结果缩放到原始图片大小
@ -68,9 +70,14 @@ class Model(ModelBase):
#产生报警 -- 有一个符合即可
bwarn = True
warn_text = "People Intruder detected!"
img_dw = draw_bbox(filtered_pred_all, image, (0, 255, 0), 2, labels_dict) # 画出检测框、类别、概率
draw_bbox(filtered_pred_all, image, (0, 255, 0), 2, labels_dict) # 画出检测框、类别、概率
#清理内存
del outputs,output
del boxout
del pred_all,filtered_pred_all
#cv2.imwrite('img_res.png', img_dw)
return filtered_pred_all, bwarn, warn_text
return bwarn, warn_text
def testRun(self):
print("1111")

81
model/plugins/RYRQ_Model_ACL/RYRQ_Model_ACL.py

@ -0,0 +1,81 @@
import os.path
from model.plugins.ModelBase import ModelBase
from model.base_model.ascnedcl.det_utils import get_labels_from_txt, letterbox, scale_coords, nms, draw_bbox # 模型前后处理相关函数
import cv2
import numpy as np
import torch # 深度学习运算框架,此处主要用来处理数据
class Model(ModelBase):
def __init__(self,path,threshold=0.5,iou_thres=0.5):
# 找pt模型路径 -- 一个约束py文件和模型文件的路径关系需要固定, -- 上传模型时,要解压好路径
dirpath, filename = os.path.split(path)
self.model_file = os.path.join(dirpath, "yolov5s_bs1.om") # 目前约束模型文件和py文件在同一目录
self.coco_file = os.path.join(dirpath, "coco_names.txt")
super().__init__(self.model_file) # acl环境初始化基类负责类的实例化
self.name = "人员入侵-yolov5"
self.version = "V1.0"
self.model_type = 2
self.neth = 640 # 缩放的目标高度, 也即模型的输入高度
self.netw = 640 # 缩放的目标宽度, 也即模型的输入宽度
self.conf_threshold = threshold # 置信度阈值
self.iou_thres = iou_thres #IOU阈值
def verify(self,image,data,isdraw=1):
labels_dict = get_labels_from_txt('/mnt/zfbox/model/plugins/RYRQ_ACL/coco_names.txt') # 得到类别信息,返回序号与类别对应的字典
# 数据前处理
img, scale_ratio, pad_size = letterbox(image, new_shape=[640, 640]) # 对图像进行缩放与填充
img = img[:, :, ::-1].transpose(2, 0, 1) # BGR to RGB, HWC to CHW #图片在输入时已经做了转换
img = np.ascontiguousarray(img, dtype=np.float32) / 255.0 # 转换为内存连续存储的数组
# 模型推理, 得到模型输出
outputs = self.execute([img,])#创建input,执行模型,返回结果 --失败返回None
filtered_pred_all = None
bwarn = False
warn_text = ""
# 是否有检测区域,有先绘制检测区域 由于在该函数生成了polygon对象,所有需要在检测区域前调用。
if data[1] == 1:
self.draw_polygon(image, data[2], (255, 0, 0))
if outputs:
output = outputs[0] #只放了一张图片
# 后处理 -- boxout 是 tensor-list: [tensor([[],[].[]])] --[x1,y1,x2,y2,置信度,coco_index]
# 利用非极大值抑制处理模型输出,conf_thres 为置信度阈值,iou_thres 为iou阈值
output_torch = torch.tensor(output)
boxout = nms(output_torch, conf_thres=0.4,iou_thres=0.5)
del output_torch
pred_all = boxout[0].numpy() # 转换为numpy数组 -- [[],[],[]] --[x1,y1,x2,y2,置信度,coco_index]
# pred_all[:, :4] 取所有行的前4列,pred_all[:,1]--第一列
scale_coords([640, 640], pred_all[:, :4], image.shape, ratio_pad=(scale_ratio, pad_size)) # 将推理结果缩放到原始图片大小
#过滤掉不是目标标签的数据 -- 序号0-- person
filtered_pred_all = pred_all[pred_all[:, 5] == 0]
# 绘制检测结果 --- 也需要封装在类里,
for pred in filtered_pred_all:
x1, y1, x2, y2 = int(pred[0]), int(pred[1]), int(pred[2]), int(pred[3])
# # 绘制目标识别的锚框 --已经在draw_bbox里处理
# cv2.rectangle(image, (x1, y1), (x2, y2), (0, 255, 0), 2)
if data[1] == 1: # 指定了检测区域
x_center = (x1 + x2) / 2
y_center = (y1 + y2) / 2
#绘制中心点?
cv2.circle(image, (int(x_center), int(y_center)), 5, (0, 0, 255), -1)
#判断是否区域点
if not self.is_point_in_region((x_center, y_center)):
continue #没产生报警-继续
#产生报警 -- 有一个符合即可
bwarn = True
warn_text = "People Intruder detected!"
draw_bbox(filtered_pred_all, image, (0, 255, 0), 2, labels_dict) # 画出检测框、类别、概率
# 清理内存
del outputs, output
del boxout
del pred_all,filtered_pred_all
#cv2.imwrite('img_res.png', img_dw)
return bwarn, warn_text
def testRun(self):
print("I am RYRQ-Model-ACL!!!")

80
model/plugins/RYRQ_Model_ACL/coco_names.txt

@ -0,0 +1,80 @@
person
bicycle
car
motorcycle
airplane
bus
train
truck
boat
traffic_light
fire_hydrant
stop_sign
parking_meter
bench
bird
cat
dog
horse
sheep
cow
elephant
bear
zebra
giraffe
backpack
umbrella
handbag
tie
suitcase
frisbee
skis
snowboard
sports_ball
kite
baseball_bat
baseball_glove
skateboard
surfboard
tennis_racket
bottle
wine_glass
cup
fork
knife
spoon
bowl
banana
apple
sandwich
orange
broccoli
carrot
hot_dog
pizza
donut
cake
chair
couch
potted_plant
bed
dining_table
toilet
tv
laptop
mouse
remote
keyboard
cell_phone
microwave
oven
toaster
sink
refrigerator
book
clock
vase
scissors
teddy_bear
hair_drier
toothbrush

BIN
model/plugins/RYRQ_Model_ACL/yolov5s_bs1.om

Binary file not shown.

18
myutils/MyDeque.py

@ -0,0 +1,18 @@
from collections import deque
from threading import Thread, Lock
class MyDeque:
def __init__(self,maxlen=1):
self.dq = deque(maxlen=maxlen)
self.lock = Lock()
def myappend(self,object):
with self.lock:
self.dq.append(object)
def mypopleft(self):
object = None
with self.lock:
if self.dq:
object = self.dq.popleft()
return object

28
myutils/MyTraceMalloc.py

@ -0,0 +1,28 @@
import tracemalloc
import time
class MyTraceMalloc:
def __init__(self):
tracemalloc.start(25) # 保留25个堆栈级别
self.before_snapshot = tracemalloc.take_snapshot()
self.current_snapshot = None
self.top_stats = []
# 内存跟踪情况输出
def display_top_stats(self):
self.current_snapshot = tracemalloc.take_snapshot()
self.top_stats = self.current_snapshot .compare_to(self.before_snapshot, 'lineno')
print("[ Top 10 differences ]")
for stat in self.top_stats[:10]:
print(stat)
current, peak = tracemalloc.get_traced_memory()
print(f"当前内存分配: {current / 1024 / 1024:.2f} MiB")
print(f"峰值内存分配: {peak / 1024 / 1024:.2f} MiB")
self.before_snapshot = self.current_snapshot
def trace_memory(self):
while True:
time.sleep(60) # 每隔60秒输出一次
self.display_top_stats()

14
run.py

@ -6,8 +6,8 @@ import shutil
import asyncio
from hypercorn.asyncio import serve
from hypercorn.config import Config
from core.DBManager import mDBM
from core.Upload_file import allowed_file,check_file,updata_model
from myutils.MyTraceMalloc import MyTraceMalloc
import threading
print(f"Current working directory (run.py): {os.getcwd()}")
web = create_app()
@ -17,8 +17,7 @@ async def run_quart_app():
await serve(web, config)
def test():
filepath = "uploads/RYRQ_Model_ACL.zip"
check_file(filepath,2)
mMM.test1()
if __name__ == '__main__':
#test()
@ -33,8 +32,15 @@ if __name__ == '__main__':
else:
raise NotImplementedError(f"Unsupported operating system: {system}")
print(free/(1024*1024))
# #内存监控线程
# myTM = MyTraceMalloc()
# threading.Thread(target=myTM.trace_memory, daemon=True).start()
#启动工作线程
mMM.start_work() # 启动所有通道的处理
#mVManager.start_check_rtsp() #线程更新视频在线情况
#启动web服务
asyncio.run(run_quart_app())

66
web/API/channel.py

@ -6,8 +6,7 @@ from core.DBManager import mDBM
from myutils.ReManager import mReM
from core.ModelManager import mMM
from myutils.ConfigManager import myCongif
import cv2
import base64
import asyncio
import time
@api.route('/channel/tree',methods=['GET'])
@ -77,15 +76,25 @@ async def channel_add(): #新增通道 -- 2024-8-1修改为与修改通道用一
reStatus = 1
if cid == -1:
reMsg = '添加通道成功'
#await asyncio.sleep(0.5) # 休眠0.5,待数据更新完成
# 对新增的视频通道进行视频采集和
strsql = f"select ID from channel where area_id={area_id} and channel_name={cName};"
strsql = f"select ID from channel where area_id={area_id} and channel_name='{cName}';"
data = mDBM.do_select(strsql, 1)
if data:
cid = data[0]
mMM.start_work(cid)
else:
print(f"这里不应该没有值!!!!area_id-{area_id},cName--{cName}")
else:
await asyncio.sleep(1) #休眠一秒
strsql = f"select ID from channel where area_id={area_id} and channel_name='{cName}';"
data = mDBM.do_select(strsql, 1)
if data:
cid = data[0]
mMM.start_work(cid)
else:
print(f"这里真的不应该没有值了!!!!area_id-{area_id},cName--{cName}")
elif cid >0: #cid 不会是0,以防万一
reMsg = '修改通道成功'
#需要先停再启动---不区分有没有修改URL了,就当有修改使用
mMM.stop_work(cid)
@ -111,36 +120,30 @@ async def get_channel_img():
'''获取一帧图片'''
json_data = await request.get_json()
cid = int(json_data.get('cid'))
channel_data = mMM.verify_list.get_channel(cid)
if channel_data:
# 读取一帧画面
ret,frame = channel_data.cap.read()
if ret:
# 将帧转换为JPEG格式
_, buffer = cv2.imencode('.jpg', frame)
# 将图像数据编码为Base64
img_base64 = base64.b64encode(buffer).decode('utf-8')
# 返回JSON响应
return jsonify({"image": img_base64})
else:
return jsonify({"error": "Failed to capture frame"}), 404
img_base64 = mMM.verify_list.cm_get_cap_frame(cid)
if img_base64:
# 返回JSON响应
return jsonify({"image": img_base64})
else:
return jsonify({"error": "Channel not found"}), 500
return jsonify({"error": "Failed to capture frame"}), 404
@api.route('/channel/del',methods=['POST'])
@login_required
async def channel_del(): #删除通道
json_data = await request.get_json()
cid = int(json_data.get('cid'))
mMM.stop_work(cid)
#删除该通道和算法的关联信息:布防时间,算法模型数据----使用外键级联删除会方便很多,只要一个删除就可以
ret = mDBM.delchannel(cid)
if ret == True:
reStatus = 1
reMsg = '删除通道信息成功'
reStatus = 0
if cid > 0:
mMM.stop_work(cid)
#删除该通道和算法的关联信息:布防时间,算法模型数据----使用外键级联删除会方便很多,只要一个删除就可以
ret = mDBM.delchannel(cid)
if ret == True:
reStatus = 1
reMsg = '删除通道信息成功'
else:
reMsg = '删除通道信息失败,请联系技术支持!'
else:
reStatus = 0
reMsg = '删除通道信息失败,请联系技术支持!'
reMsg = '通道ID参数错误,请联系技术支持!'
return jsonify({'status': reStatus, 'msg': reMsg})
@api.route('/channel/check',methods=['GET'])
@ -214,7 +217,7 @@ async def change_c_t_m():
iou_thres = json_data.get('iou_thres')
conf_thres = json_data.get('conf_thres')
schedule = json_data.get('schedule')
cid = json_data.get('cid')
cid = int(json_data.get('cid'))
#返回数据
reStatus = 0
reMsg = "更新失败,请联系技术支持!"
@ -227,9 +230,8 @@ async def change_c_t_m():
#需要删除数据
ret = mDBM.delC2M(cid,1) #只是删除了数据库数据
if ret:
#重启下通道的工作线程
mMM.stop_work(cid)
mMM.start_work(cid)
#停止该通道的model线程
mMM.restartC2M(cid) #没有模型关联数据,模型工作线程不会启动
reStatus = 1
reMsg = "更新成功"
else: #原本就没数据
@ -245,8 +247,8 @@ async def change_c_t_m():
ret = updataSchedule(c2m_id,schedule.replace("'", "\"")) #schedule表
if ret:
# 重启下通道的工作线程
mMM.stop_work(cid)
mMM.start_work(cid)
print(f"restartC2M--{cid}")
mMM.restartC2M(cid) #没有模型关联数据,模型工作线程不会启动
reStatus = 1
reMsg = "更新成功"
return jsonify({'status': reStatus, 'msg': reMsg})

2
web/API/model.py

@ -76,7 +76,7 @@ async def model_add(): #新增算法
@api.route('/model/upgrade',methods=['POST'])
@login_required
async def model_upgrade(): #升级算法,需要先上传算法文件
async def model_upgrade(): #升级算法
reStatus = 0
reMsg = "有错误!"
form = await request.form

120
web/API/viedo.py

@ -6,6 +6,8 @@ from core.DBManager import mDBM
from myutils.ConfigManager import myCongif
import logging
import time
import subprocess
from collections import deque
# 配置日志
logging.basicConfig(level=logging.INFO)
@ -110,55 +112,85 @@ async def get_stats(peer_connection):
# "msg":reMsg
# })
@api.websocket('/ws/video_feed/<int:channel_id>')
async def ws_video_feed(channel_id):
print(f"New connection for channel: {channel_id}")
channel_data = mMM.verify_list.get_channel(channel_id)
def stream_rtsp_to_flv(rtsp_url):
command = [
'ffmpeg',
'-i', rtsp_url, # 输入 RTSP 流
'-vcodec', 'libx264', # 视频编码器
'-f', 'flv', # 输出格式为 FLV
'pipe:1' # 输出到管道
]
process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
return process
async def handle_channel_rtf(channel_id,websocket):
rtsp_url = "rtsp://192.168.3.103/live1"
process = stream_rtsp_to_flv(rtsp_url)
try:
if channel_data:
verify_rate = int(myCongif.get_data("verify_rate"))
error_max_count = verify_rate * int(myCongif.get_data("video_error_count")) #视频帧捕获失败触发提示的上限
sleep_time = int(myCongif.get_data("cap_sleep_time"))
frame_interval = 1.0 / verify_rate #用于帧率控制
last_frame_time = time.time() # 初始化个读帧时间
icount = 0
while True: #这里的多线程并发,还需要验证检查
# 帧率控制帧率 ---输出暂时不控制帧率,有就传输
current_time = time.time()
elapsed_time = current_time - last_frame_time
if elapsed_time < frame_interval:
await asyncio.sleep(frame_interval - elapsed_time) # 若小于间隔时间则休眠
last_frame_time = time.time()
#执行视频传输
frame = channel_data.get_last_frame()
if frame is not None:
#img = frame.to_ndarray(format="bgr24")
# ret, buffer = cv2.imencode('.jpg', frame)
# if not ret:
# continue
# frame = buffer.tobytes()
icount = 0
await websocket.send(frame)
else:
#print("frame is None")
icount += 1
if icount > error_max_count:
print("视频源无图像")
icount = 0
error_message = b"video_error"
await websocket.send(error_message)
await asyncio.sleep(sleep_time) #等待视频重连时间
#await asyncio.sleep(0.02)
else:
print("没有通道数据!")
error_message = b"client_error"
await websocket.send(error_message)
await asyncio.sleep(0.1) #等0.1秒前端处理时间
while True:
data = process.stdout.read(1024) # 从管道读取 FLV 数据
if not data:
break
await websocket.send(data) # 通过 WebSocket 发送 FLV 数据
except asyncio.CancelledError:
process.terminate() # 如果 WebSocket 连接关闭,则终止 ffmpeg 进程
finally:
process.terminate()
async def handle_channel(channel_id,websocket):
verify_rate = int(myCongif.get_data("verify_rate"))
error_max_count = verify_rate * int(myCongif.get_data("video_error_count")) # 视频帧捕获失败触发提示的上限
sleep_time = int(myCongif.get_data("cap_sleep_time"))
frame_interval = 1.0 / verify_rate # 用于帧率控制
last_frame_time = time.time() # 初始化个读帧时间
icount = 0
#视频传输缓冲区
#frame_buffer = deque(maxlen=10)
try:
while True: # 这里的多线程并发,还需要验证检查
# 帧率控制帧率 ---输出暂时不控制帧率,有就传输
current_time = time.time()
elapsed_time = current_time - last_frame_time
if elapsed_time < frame_interval:
await asyncio.sleep(frame_interval - elapsed_time) # 若小于间隔时间则休眠
last_frame_time = time.time()
# 执行视频传输
frame = mMM.verify_list.cm_get_last_frame(channel_id) # get_last_frame 用了try
if frame is not None:
# frame_buffer.append(frame) #先放入缓冲区
icount = 0
await websocket.send(frame)
else:
# print("frame is None")
icount += 1
if icount > error_max_count:
print(f"通道-{channel_id},长时间未获取图像,休眠一段时间后再获取。")
#icount = 0
error_message = b"video_error"
await websocket.send(error_message)
await asyncio.sleep(sleep_time) # 等待视频重连时间
except asyncio.CancelledError:
print(f"WebSocket connection for channel {channel_id} closed by client")
raise
except Exception as e:
print(f"WebSocket connection for channel {channel_id} closed: {e}")
print(f"Unexpected error: {e}")
finally:
print(f"Cleaning up resources for channel {channel_id}")
@api.websocket('/ws/video_feed/<int:channel_id>')
async def ws_video_feed(channel_id):
print(f"New connection for channel: {channel_id}")
# 为每个通道创建独立的协程
shendtask = asyncio.create_task(handle_channel(channel_id, websocket))
#shendtask = asyncio.create_task(handle_channel_rtf(channel_id, websocket))
# 等待协程完成
await shendtask
@api.route('/shutdown', methods=['POST'])
async def shutdown():#这是全关 --需要修改
coros = [pc.close() for pc in pcs]

BIN
web/main/static/favicon_bak.ico

Binary file not shown.

After

Width:  |  Height:  |  Size: 15 KiB

BIN
web/main/static/images/登录/u12.png

Binary file not shown.

After

Width:  |  Height:  |  Size: 232 KiB

6
web/main/static/images/登录/u4.svg

@ -0,0 +1,6 @@
<?xml version="1.0" encoding="utf-8"?>
<svg version="1.1" xmlns:xlink="http://www.w3.org/1999/xlink" width="35px" height="40px" xmlns="http://www.w3.org/2000/svg">
<g transform="matrix(1 0 0 1 -1165 -28 )">
<path d="M 34.767578125 28.9192708333333 C 34.9225260416667 30.2300347222222 35 31.6232638888889 35 33.0989583333333 C 35 34.9913194444444 34.4303385416667 36.6145833333333 33.291015625 37.96875 C 32.1516927083333 39.3229166666667 30.7799479166667 40 29.17578125 40 L 5.82421875 40 C 4.22005208333333 40 2.84830729166667 39.3229166666667 1.708984375 37.96875 C 0.569661458333333 36.6145833333333 0 34.9913194444444 0 33.0989583333333 C 0 31.6232638888889 0.0774739583333335 30.2300347222222 0.232421875 28.9192708333333 C 0.387369791666667 27.6085069444444 0.674479166666667 26.2890625 1.09375 24.9609375 C 1.51302083333333 23.6328125 2.04622395833333 22.4956597222222 2.693359375 21.5494791666667 C 3.34049479166667 20.6032986111111 4.197265625 19.8307291666667 5.263671875 19.2317708333333 C 6.330078125 18.6328125 7.55598958333333 18.3333333333333 8.94140625 18.3333333333333 C 11.3294270833333 20.5555555555556 14.1822916666667 21.6666666666667 17.5 21.6666666666667 C 20.8177083333333 21.6666666666667 23.6705729166667 20.5555555555556 26.05859375 18.3333333333333 C 27.4440104166667 18.3333333333333 28.669921875 18.6328125 29.736328125 19.2317708333333 C 30.802734375 19.8307291666667 31.6595052083333 20.6032986111111 32.306640625 21.5494791666667 C 32.9537760416667 22.4956597222222 33.4869791666667 23.6328125 33.90625 24.9609375 C 34.3255208333333 26.2890625 34.6126302083333 27.6085069444444 34.767578125 28.9192708333333 Z M 24.923828125 2.9296875 C 26.974609375 4.8828125 28 7.23958333333333 28 10 C 28 12.7604166666667 26.974609375 15.1171875 24.923828125 17.0703125 C 22.873046875 19.0234375 20.3984375 20 17.5 20 C 14.6015625 20 12.126953125 19.0234375 10.076171875 17.0703125 C 8.025390625 15.1171875 7 12.7604166666667 7 10 C 7 7.23958333333333 8.025390625 4.8828125 10.076171875 2.9296875 C 12.126953125 0.9765625 14.6015625 0 17.5 0 C 20.3984375 0 22.873046875 0.9765625 24.923828125 2.9296875 Z " fill-rule="nonzero" fill="#000000" stroke="none" transform="matrix(1 0 0 1 1165 28 )" />
</g>
</svg>

6
web/main/static/resources/css/bootstrap.min.css

File diff suppressed because one or more lines are too long

337
web/main/static/resources/scripts/aiortc-client-new.js

@ -1,5 +1,7 @@
let video_list = {}; //element_id -- socket
let run_list = {}; //element_id -- runtag(替换berror)
let run_list = {}; //element_id -- runtag
let berror_state_list = {}; //element_id -- 错误信息显示
let m_count = 0;
var channel_list = null;
const fourViewButton = document.getElementById('fourView');
const nineViewButton = document.getElementById('nineView');
@ -8,9 +10,12 @@ const nineViewButton = document.getElementById('nineView');
window.addEventListener('beforeunload', function (event) {
// 关闭所有 WebSocket 连接或执行其他清理操作
for(let key in video_list){
delete run_list[key];
video_list[key].close();
const videoFrame = document.getElementById(`video-${key}`);
const event = new Event('closeVideo');
videoFrame.dispatchEvent(event);
delete video_list[key];
berror_state_list[key] = false;
}
});
@ -64,7 +69,32 @@ document.addEventListener('DOMContentLoaded', async function() {
}
});
document.addEventListener('click', function() {
console.log("第一次页面点击,开始显示视频--已注释",m_count);
// if(m_count != 0){
// count = m_count
// //获取视频接口
// const url = `/api/viewlist?count=${count}`;
// fetch(url)
// .then(response => response.json())
// .then(data => {
// console.log('Success:', data);
// clist = data.clist;
// elist = data.elist;
// nlist = data.nlist;
// for(let i=0;i<clist.length;i++){
// if(parseInt(elist[i]) < count){
// console.log("切换窗口时进行连接",clist[i])
// connectToStream(elist[i],clist[i],nlist[i])
// //startFLVStream(elist[i],clist[i],nlist[i]);
// }
// }
// })
// .catch(error => {
// console.error('Error:', error);
// });
// }
}, { once: true });
//视频窗口
document.getElementById('fourView').addEventListener('click', function() {
@ -100,9 +130,12 @@ document.getElementById('nineView').addEventListener('click', function() {
function generateVideoNodes(count) { //在这里显示视频-初始化
//结束在播放的socket
for(let key in video_list){
delete run_list[key];
video_list[key].close();
const videoFrame = document.getElementById(`video-${key}`);
const event = new Event('closeVideo');
videoFrame.dispatchEvent(event);
delete video_list[key];
berror_state_list[key] = false;
}
//切换窗口布局
const videoGrid = document.getElementById('videoGrid');
@ -116,31 +149,38 @@ function generateVideoNodes(count) { //在这里显示视频-初始化
<div class="video-title">Video Stream ${i+1}</div>
<div class="video-buttons">
<button onclick="toggleFullScreen(${i})">🔲</button>
<button onclick="closeVideo(${i})"></button>
<button onclick="closeVideo(${i})"></button>
<!-- <button onclick="closeFLVStream(${i})"></button> -->
</div>
</div>
<div class="video-area"><img id="video-${i}" alt="Video Stream" /></div>
<!-- <div class="video-area"><video id="video-${i}" controls></video></div> -->
</div>`;
}
videoGrid.innerHTML = html;
//获取视频接口
const url = `/api/viewlist?count=${count}`;
fetch(url)
.then(response => response.json())
.then(data => {
console.log('Success:', data);
clist = data.clist;
elist = data.elist;
nlist = data.nlist;
for(let i=0;i<clist.length;i++){
if(parseInt(elist[i]) < count){
connectToStream(elist[i],clist[i],nlist[i])
// if(m_count != 0){
//获取视频接口
const url = `/api/viewlist?count=${count}`;
fetch(url)
.then(response => response.json())
.then(data => {
console.log('Success:', data);
clist = data.clist;
elist = data.elist;
nlist = data.nlist;
for(let i=0;i<clist.length;i++){
if(parseInt(elist[i]) < count){
console.log("切换窗口时进行连接",clist[i])
connectToStream(elist[i],clist[i],nlist[i])
//startFLVStream(elist[i],clist[i],nlist[i]);
}
}
}
})
.catch(error => {
console.error('Error:', error);
});
})
.catch(error => {
console.error('Error:', error);
});
// }
//m_count = count
}
function toggleFullScreen(id) {
@ -155,51 +195,6 @@ function toggleFullScreen(id) {
};
}
function closeVideo(id) {
const titleElement = document.querySelector(`[data-frame-id="${id}"] .video-title`);
if (titleElement.textContent === `Video Stream ${Number(id)+1}`) {
showModal('当前视频窗口未播放视频。');
return;
};
console.log('closeVideo');
//发送视频链接接口
const url = '/api/close_stream';
const data = {"element_id":id};
// 发送 POST 请求
fetch(url, {
method: 'POST', // 指定请求方法为 POST
headers: {
'Content-Type': 'application/json' // 设置请求头,告诉服务器请求体的数据类型为 JSON
},
body: JSON.stringify(data) // 将 JavaScript 对象转换为 JSON 字符串
})
.then(response => response.json()) // 将响应解析为 JSON
.then(data => {
console.log('Success:', data);
const istatus = data.status;
if(istatus === 0){
showModal(data.msg); // 使用 Modal 显示消息
return;
}
else{
const videoFrame = document.querySelector(`[data-frame-id="${id}"] .video-area img`);
const titleElement = document.querySelector(`[data-frame-id="${id}"] .video-title`);
run_list[id] = false;
video_list[id].close();
videoFrame.src = ''; // 清空画面
videoFrame.style.display = 'none'; // 停止播放时隐藏 img 元素
titleElement.textContent = `Video Stream ${id+1}`;
removeErrorMessage(videoFrame)
}
})
.catch((error) => {
showModal(`Error: ${error.message}`); // 使用 Modal 显示错误信息
return;
});
}
function allowDrop(event) {
event.preventDefault();
}
@ -243,7 +238,9 @@ function drop(event) {
}
else{
//获取视频流
console.log("drop触发")
connectToStream(frameId,nodeId,nodeName);
//startFLVStream(frameId,nodeId,nodeName); #基于FLV的开发程度:后端直接用RTSP流转发是有画面的,但CPU占用太高,用不了。2024-8-30
}
})
.catch((error) => {
@ -254,7 +251,7 @@ function drop(event) {
}
function connectToStream(element_id,channel_id,channel_name) {
console.log("开始连接视频",channel_id);
console.log("开始连接视频",element_id,channel_id);
// 设置视频区域的标题
const titleElement = document.querySelector(`[data-frame-id="${element_id}"] .video-title`);
titleElement.textContent = channel_name;
@ -266,10 +263,18 @@ function connectToStream(element_id,channel_id,channel_name) {
function connect() {
const socket = new WebSocket(streamUrl);
console.log("socket建立----",element_id);
//全局变量需要维护好
if(element_id in video_list) {
video_list[element_id].close();
}else{
berror_state_list[element_id] = false;
}
video_list[element_id] = socket;
run_list[element_id] = true;
imgElement.style.display = 'block';
berror_state = false;
// 处理连接打开事件
socket.onopen = () => {
console.log('WebSocket connection established');
@ -285,18 +290,20 @@ function connectToStream(element_id,channel_id,channel_name) {
const decodedData = decoder.decode(arrayBuffer);
if (decodedData === "video_error") { //video_error
displayErrorMessage(imgElement, "该视频源未获取到画面,请检查,默认每隔2分钟将重新链接视频源。");
berror_state = true;
displayErrorMessage(imgElement, "该视频源未获取到画面,请检查后刷新重试,默认两分钟后重连");
berror_state_list[element_id] = true;
//socket.close(1000, "Normal Closure"); // 停止连接
} else if(decodedData === "client_error"){ //client_error
run_list[element_id] = false;
displayErrorMessage(imgElement, "该通道节点数据存在问题,请重启或联系技术支持!");
socket.close(); // 停止连接
berror_state = true;
socket.close(1000, "Normal Closure"); // 停止连接
berror_state_list[element_id] = true;
}
else {
if(berror_state){
if(berror_state_list[element_id]){
removeErrorMessage(imgElement);
berror_state = false;
berror_state_list[element_id] = false;
//console.log("移除错误信息!");
}
// 释放旧的对象URL
if (imgElement.src) {
@ -324,12 +331,186 @@ function connectToStream(element_id,channel_id,channel_name) {
socket.onerror = function() {
console.log(`WebSocket错误,Channel ID: ${channel_id}`);
socket.close();
socket.close(1000, "Normal Closure");
};
};
connect();
}
function closeVideo(id) {
const titleElement = document.querySelector(`[data-frame-id="${id}"] .video-title`);
if (titleElement.textContent === `Video Stream ${Number(id)+1}`) {
showModal('当前视频窗口未播放视频。');
return;
};
console.log('closeVideo');
//发送视频链接接口
const url = '/api/close_stream';
const data = {"element_id":id};
// 发送 POST 请求
fetch(url, {
method: 'POST', // 指定请求方法为 POST
headers: {
'Content-Type': 'application/json' // 设置请求头,告诉服务器请求体的数据类型为 JSON
},
body: JSON.stringify(data) // 将 JavaScript 对象转换为 JSON 字符串
})
.then(response => response.json()) // 将响应解析为 JSON
.then(data => {
console.log('Success:', data);
const istatus = data.status;
if(istatus == 0){
showModal(data.msg); // 使用 Modal 显示消息
return;
}
else{
const videoFrame = document.querySelector(`[data-frame-id="${id}"] .video-area img`);
const titleElement = document.querySelector(`[data-frame-id="${id}"] .video-title`);
run_list[id] = false;
video_list[id].close();
delete video_list[id];
videoFrame.src = ''; // 清空画面
videoFrame.style.display = 'none'; // 停止播放时隐藏 img 元素
titleElement.textContent = `Video Stream ${id+1}`;
removeErrorMessage(videoFrame);
berror_state_list[id] = false;
}
})
.catch((error) => {
showModal(`Error: ${error.message}`); // 使用 Modal 显示错误信息
return;
});
}
function startFLVStream(element_id,channel_id,channel_name) {
// 设置视频区域的标题
const titleElement = document.querySelector(`[data-frame-id="${element_id}"] .video-title`);
titleElement.textContent = channel_name;
//获取视频
// const imgElement = document.getElementById(`video-${element_id}`);
// imgElement.alt = `Stream ${channel_name}`;
const videoElement = document.getElementById(`video-${element_id}`);
let reconnectAttempts = 0;
const maxReconnectAttempts = 3;
const flvUrl = `ws://${window.location.host}/api/ws/video_feed/${channel_id}`;
function initFLVPlayer() {
if (flvjs.isSupported()) {
//要避免重复播放
if(element_id in video_list) {
closeFLVStream(element_id)
}else{
video_list[element_id] = element_id;
berror_state_list[element_id] = true;
}
flvPlayer = flvjs.createPlayer({
type: 'flv',
url: flvUrl,
});
flvPlayer.attachMediaElement(videoElement);
flvPlayer.load();
flvPlayer.play();
// 设定超时时间,例如10秒
timeoutId = setTimeout(() => {
console.error('No video data received. Closing connection.');
flvPlayer.destroy(); // 停止视频
// 显示错误信息或提示
displayErrorMessage(videoElement, "该视频源获取画面超时,请检查后刷新重试,默认两分钟后重连");
berror_state_list[element_id] = true;
}, 130000); // 130秒
// 错误处理
flvPlayer.on(flvjs.Events.ERROR, (errorType, errorDetail) => {
console.error(`FLV Error: ${errorType} - ${errorDetail}`);
if (reconnectAttempts < maxReconnectAttempts) {
reconnectAttempts++;
console.log("开始重连")
setTimeout(initFLVPlayer, 30000); // 尝试重连
} else {
displayErrorMessage(videoElement, "重连超时,请检查后重试,可联系技术支持!");
berror_state_list[element_id] = true;
}
});
// 监听播放事件,如果播放成功则清除超时计时器
flvPlayer.on(flvjs.Events.STATISTICS_INFO, () => {
clearTimeout(timeoutId);
timeoutId = null;
removeErrorMessage(videoElement);
berror_state_list[element_id] = false;
});
// 关闭视频流时销毁播放器
videoElement.addEventListener('closeVideo', () => {
if(flvPlayer){
flvPlayer.destroy();
videoElement.removeEventListener('closeVideo', onCloseVideo);
flvPlayer.off(flvjs.Events.ERROR);
flvPlayer.off(flvjs.Events.STATISTICS_INFO);
delete flvPlayer
}
});
} else {
console.error('FLV is not supported in this browser.');
}
}
initFLVPlayer();
}
// 主动关闭视频的函数
function closeFLVStream(id) {
const titleElement = document.querySelector(`[data-frame-id="${id}"] .video-title`);
if (titleElement.textContent === `Video Stream ${Number(id)+1}`) {
showModal('当前视频窗口未播放视频。');
return;
};
//发送视频链接接口
const url = '/api/close_stream';
const data = {"element_id":id};
// 发送 POST 请求
fetch(url, {
method: 'POST', // 指定请求方法为 POST
headers: {
'Content-Type': 'application/json' // 设置请求头,告诉服务器请求体的数据类型为 JSON
},
body: JSON.stringify(data) // 将 JavaScript 对象转换为 JSON 字符串
})
.then(response => response.json()) // 将响应解析为 JSON
.then(data => {
console.log('Success:', data);
const istatus = data.status;
if(istatus == 0){
showModal(data.msg); // 使用 Modal 显示消息
return;
}
else{
const videoFrame = document.getElementById(`video-${element_id}`);
const event = new Event('closeVideo');
videoFrame.dispatchEvent(event);
//videoFrame.style.display = 'none'; // 停止播放时隐藏 img 元素
titleElement.textContent = `Video Stream ${id+1}`;
removeErrorMessage(videoFrame);
berror_state_list[key] = false;
delete video_list[id];
}
})
.catch((error) => {
showModal(`Error: ${error.message}`); // 使用 Modal 显示错误信息
return;
});
}
function displayErrorMessage(imgElement, message) {
removeErrorMessage(imgElement)
imgElement.style.display = 'none'; // 隐藏图片

7
web/main/static/resources/scripts/bootstrap.bundle.min.js

File diff suppressed because one or more lines are too long

3
web/main/static/resources/scripts/channel_manager.js

@ -376,7 +376,7 @@ canvas.addEventListener('click', (event) => {
// 获取鼠标相对于canvas的位置
const x = (event.clientX - rect.left) * scaleX;
const y = (event.clientY - rect.top) * scaleY;
points.push({ x, y });
console.log(points);
//绘制线条
drawLines();
@ -458,6 +458,7 @@ function show_channel_model_schedule(cid){
else{//指定区域
document.getElementById('zdjc').checked = true;
m_polygon = c2m_data[0].polygon;
console.log("m_polygon--",m_polygon);
if(m_polygon !== ""){ //指定区域了,一般是会有数据的。
const coords = parseCoordStr(m_polygon);
points = coords;

4
web/main/static/resources/scripts/jquery-1.7.1.min.js

File diff suppressed because one or more lines are too long

14
web/main/static/resources/scripts/jquery-3.2.1.min.js

File diff suppressed because one or more lines are too long

1
web/main/templates/channel_manager.html

@ -172,7 +172,6 @@
<canvas id="backgroundCanvas" style="display: none;"></canvas>
<canvas id="myCanvas"></canvas>
</div>
</div>
<div class="col-6 ms-auto">
<!-- 配置算法 -->

170
web/main/templates/登录.html

@ -0,0 +1,170 @@
<!DOCTYPE html>
<html>
<head>
<title>登录</title>
<meta http-equiv="X-UA-Compatible" content="IE=edge"/>
<meta http-equiv="content-type" content="text/html; charset=utf-8"/>
<link href="resources/css/axure_rp_page.css" type="text/css" rel="stylesheet"/>
<link href="data/styles.css" type="text/css" rel="stylesheet"/>
<link href="files/登录/styles.css" type="text/css" rel="stylesheet"/>
<script src="resources/scripts/jquery-3.2.1.min.js"></script>
<script src="resources/scripts/axure/axQuery.js"></script>
<script src="resources/scripts/axure/globals.js"></script>
<script src="resources/scripts/axutils.js"></script>
<script src="resources/scripts/axure/annotation.js"></script>
<script src="resources/scripts/axure/axQuery.std.js"></script>
<script src="resources/scripts/axure/doc.js"></script>
<script src="resources/scripts/messagecenter.js"></script>
<script src="resources/scripts/axure/events.js"></script>
<script src="resources/scripts/axure/recording.js"></script>
<script src="resources/scripts/axure/action.js"></script>
<script src="resources/scripts/axure/expr.js"></script>
<script src="resources/scripts/axure/geometry.js"></script>
<script src="resources/scripts/axure/flyout.js"></script>
<script src="resources/scripts/axure/model.js"></script>
<script src="resources/scripts/axure/repeater.js"></script>
<script src="resources/scripts/axure/sto.js"></script>
<script src="resources/scripts/axure/utils.temp.js"></script>
<script src="resources/scripts/axure/variables.js"></script>
<script src="resources/scripts/axure/drag.js"></script>
<script src="resources/scripts/axure/move.js"></script>
<script src="resources/scripts/axure/visibility.js"></script>
<script src="resources/scripts/axure/style.js"></script>
<script src="resources/scripts/axure/adaptive.js"></script>
<script src="resources/scripts/axure/tree.js"></script>
<script src="resources/scripts/axure/init.temp.js"></script>
<script src="resources/scripts/axure/legacy.js"></script>
<script src="resources/scripts/axure/viewer.js"></script>
<script src="resources/scripts/axure/math.js"></script>
<script src="resources/scripts/axure/jquery.nicescroll.min.js"></script>
<script src="data/document.js"></script>
<script src="files/登录/data.js"></script>
<script type="text/javascript">
$axure.utils.getTransparentGifPath = function() { return 'resources/images/transparent.gif'; };
$axure.utils.getOtherPath = function() { return 'resources/Other.html'; };
$axure.utils.getReloadPath = function() { return 'resources/reload.html'; };
</script>
</head>
<body>
{% if error %}
<div style="color: red;">{{ error }}</div>
{% endif %}
<div id="base" class="">
<!-- Unnamed (Rectangle) -->
<div id="u0" class="ax_default flow_shape">
<div id="u0_div" class=""></div>
<div id="u0_text" class="text " style="display:none; visibility: hidden">
<p></p>
</div>
</div>
<!-- Unnamed (Rectangle) -->
<div id="u1" class="ax_default flow_shape">
<div id="u1_div" class=""></div>
<div id="u1_text" class="text " style="display:none; visibility: hidden">
<p></p>
</div>
</div>
<!-- Unnamed (Rectangle) -->
<div id="u2" class="ax_default heading_1">
<div id="u2_div" class=""></div>
<div id="u2_text" class="text ">
<p><span>智凡BOX</span></p>
</div>
</div>
<!-- Unnamed (Rectangle) -->
<div id="u3" class="ax_default heading_3">
<div id="u3_div" class=""></div>
<div id="u3_text" class="text ">
<p><span>@2024 ZFKJ All Rights Reserved </span></p>
</div>
</div>
<!-- Unnamed (Shape) -->
<div id="u4" class="ax_default icon">
<img id="u4_img" class="img " src="images/登录/u4.svg"/>
<div id="u4_text" class="text " style="display:none; visibility: hidden">
<p></p>
</div>
</div>
<!-- Unnamed (Rectangle) -->
<div id="u5" class="ax_default heading_2">
<div id="u5_div" class=""></div>
<div id="u5_text" class="text ">
<p><span>用户名:</span></p>
</div>
</div>
<!-- Unnamed (Text Field) -->
<div id="u6" class="ax_default text_field">
<div id="u6_div" class=""></div>
<input id="u6_input" type="text" value="" class="u6_input"/>
</div>
<!-- Unnamed (Rectangle) -->
<div id="u7" class="ax_default heading_2">
<div id="u7_div" class=""></div>
<div id="u7_text" class="text ">
<p><span>密码:</span></p>
</div>
</div>
<!-- Unnamed (Text Field) -->
<div id="u8" class="ax_default text_field">
<div id="u8_div" class=""></div>
<input id="u8_input" type="text" value="" class="u8_input"/>
</div>
<!-- Unnamed (Text Field) -->
<div id="u9" class="ax_default text_field">
<div id="u9_div" class=""></div>
<input id="u9_input" type="text" value="" class="u9_input"/>
</div>
<!-- Unnamed (Rectangle) -->
<div id="u10" class="ax_default heading_2">
<div id="u10_div" class=""></div>
<div id="u10_text" class="text ">
<p><span>验证码:</span></p>
</div>
</div>
<!-- Unnamed (Rectangle) -->
<div id="u11" class="ax_default primary_button">
<div id="u11_div" class=""></div>
<div id="u11_text" class="text ">
<p><span>登录</span></p>
</div>
</div>
<!-- Unnamed (Image) -->
<div id="u12" class="ax_default image">
<img id="u12_img" class="img " src="images/登录/u12.png"/>
<div id="u12_text" class="text " style="display:none; visibility: hidden">
<p></p>
</div>
</div>
<!-- Unnamed (Rectangle) -->
<div id="u13" class="ax_default button">
<div id="u13_div" class=""></div>
<div id="u13_text" class="text ">
<p><span>忘记密码</span></p>
</div>
</div>
<!-- Unnamed (Rectangle) -->
<div id="u14" class="ax_default label">
<div id="u14_div" class=""></div>
<div id="u14_text" class="text ">
<p><span>注:通过手机验证码修改密码。</span></p>
</div>
</div>
</div>
<script src="resources/scripts/axure/ios.js"></script>
</body>
</html>

BIN
zfbox.db

Binary file not shown.
Loading…
Cancel
Save