|
|
|
import threading
|
|
|
|
from collections import deque
|
|
|
|
import numpy as np
|
|
|
|
import time
|
|
|
|
import copy
|
|
|
|
import queue
|
|
|
|
import cv2
|
|
|
|
import asyncio
|
|
|
|
|
|
|
|
class ChannelData:
|
|
|
|
def __init__(self, str_url, int_type, bool_run, deque_length,icount_max):
|
|
|
|
self.cap = None #该通道视频采集对象
|
|
|
|
self.work_th = None #该通道工作线程句柄
|
|
|
|
self.b_model = False #是否有运行模型线程
|
|
|
|
self.bool_run = bool_run # 线程运行标识
|
|
|
|
|
|
|
|
self.str_url = str_url #视频源地址
|
|
|
|
self.int_type = int_type #视频源类型,0-usb,1-rtsp,2-hksdk
|
|
|
|
self.icount_max = icount_max # 帧序列号上限
|
|
|
|
self.lock = threading.RLock() # 用于保证线程安全
|
|
|
|
|
|
|
|
self.deque_frame = deque(maxlen=deque_length) #视频缓冲区用于保存录像
|
|
|
|
self.last_frame = None # 保存图片数据
|
|
|
|
self.frame_queue = queue.Queue(maxsize=1)
|
|
|
|
self.counter = 0 #帧序列号--保存报警录像使用
|
|
|
|
|
|
|
|
#添加一帧图片
|
|
|
|
def add_deque(self, value):
|
|
|
|
self.deque_frame.append(value) #deque 满了以后会把前面的数据移除
|
|
|
|
|
|
|
|
#拷贝一份数据
|
|
|
|
def copy_deque(self):
|
|
|
|
return copy.deepcopy(self.deque_frame)
|
|
|
|
|
|
|
|
#获取最后一帧图片
|
|
|
|
def get_last_frame(self):
|
|
|
|
if self.b_model:
|
|
|
|
# with self.lock:
|
|
|
|
# frame = self.last_frame
|
|
|
|
# return frame
|
|
|
|
|
|
|
|
#if not self.frame_queue.empty():
|
|
|
|
try:
|
|
|
|
frame = self.frame_queue.get(timeout=0.3) #web传输没有做帧率控制了,可以超时时间长一点
|
|
|
|
except queue.Empty:
|
|
|
|
print("channel--frame None")
|
|
|
|
return None
|
|
|
|
# else:
|
|
|
|
# return None
|
|
|
|
return frame
|
|
|
|
else: #如果没有运行,直接从cap获取画面
|
|
|
|
if self.cap:
|
|
|
|
ret, frame = self.cap.read() # 除了第一帧,其它应该都是有画面的
|
|
|
|
if not ret:
|
|
|
|
print("channel--frame None")
|
|
|
|
return None
|
|
|
|
ret, frame_bgr_webp = cv2.imencode('.jpg', frame)
|
|
|
|
if not ret:
|
|
|
|
buffer_bgr_webp = None
|
|
|
|
else:
|
|
|
|
buffer_bgr_webp = frame_bgr_webp.tobytes()
|
|
|
|
return buffer_bgr_webp
|
|
|
|
return None
|
|
|
|
|
|
|
|
def update_last_frame(self,buffer):
|
|
|
|
if buffer:
|
|
|
|
# with self.lock:
|
|
|
|
# self.last_frame = None
|
|
|
|
# self.last_frame = buffer
|
|
|
|
|
|
|
|
# if self.frame_queue.full():
|
|
|
|
# try:
|
|
|
|
# print("channel--丢帧")
|
|
|
|
# self.frame_queue.get(timeout=0.01)
|
|
|
|
# except queue.Empty: #为空不处理
|
|
|
|
# pass
|
|
|
|
# self.frame_queue.put(buffer)
|
|
|
|
|
|
|
|
try:
|
|
|
|
self.frame_queue.put(buffer,timeout=0.05)
|
|
|
|
except queue.Full:
|
|
|
|
#print("channel--未插入")
|
|
|
|
pass
|
|
|
|
|
|
|
|
#帧序列号自增 一个线程中处理,不用加锁
|
|
|
|
def increment_counter(self):
|
|
|
|
self.counter += 1
|
|
|
|
if self.counter > self.icount_max:
|
|
|
|
self.counter = 0
|
|
|
|
|
|
|
|
def get_counter(self):
|
|
|
|
return self.counter
|
|
|
|
|
|
|
|
#清空数据,停止工作线程(若有 ,并删除deque 和 last_frame)
|
|
|
|
def clear(self):
|
|
|
|
start_time = time.time()
|
|
|
|
with self.lock:
|
|
|
|
if self.b_model: #b_model为true,说明开启了工作线程
|
|
|
|
self.bool_run = False
|
|
|
|
self.work_th.join() #等待通道对应的子线程,停止工作。
|
|
|
|
#time.sleep(1)
|
|
|
|
self.deque_frame.clear()
|
|
|
|
self.last_frame = None #二选一
|
|
|
|
self.frame_queue = queue.Queue(maxsize=1) #二选一
|
|
|
|
self.counter = 0
|
|
|
|
|
|
|
|
end_time = time.time()
|
|
|
|
execution_time = end_time - start_time
|
|
|
|
print(f"停止一个通道线程,花费了: {execution_time} seconds")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class ChannelManager:
|
|
|
|
def __init__(self):
|
|
|
|
self.channels = {}
|
|
|
|
self.cm_lock = threading.RLock() # 用于保证字典操作的线程安全
|
|
|
|
|
|
|
|
#增加节点
|
|
|
|
def add_channel(self, channel_id, str_url, int_type, bool_run, deque_length=10,icount_max=100000):
|
|
|
|
with self.cm_lock:
|
|
|
|
if channel_id in self.channels: #若已经有数据,先删除后再增加
|
|
|
|
self.channels[channel_id].clear() # 手动清理资源
|
|
|
|
del self.channels[channel_id]
|
|
|
|
ch_data = ChannelData(str_url, int_type, bool_run, deque_length, icount_max)
|
|
|
|
self.channels[channel_id] = ch_data
|
|
|
|
return ch_data
|
|
|
|
|
|
|
|
#删除节点
|
|
|
|
def delete_channel(self, channel_id): #需要验证资源的释放清空
|
|
|
|
with self.cm_lock:
|
|
|
|
if channel_id in self.channels:
|
|
|
|
self.channels[channel_id].clear() # 手动清理资源
|
|
|
|
self.channels[channel_id].cap.release()
|
|
|
|
del self.channels[channel_id]
|
|
|
|
|
|
|
|
#获取节点
|
|
|
|
def get_channel(self, channel_id):
|
|
|
|
with self.cm_lock:
|
|
|
|
return self.channels.get(channel_id)
|
|
|
|
|
|
|
|
#停止工作线程---要把视频采集线程停止掉
|
|
|
|
def stop_channel(self,channel_id):
|
|
|
|
with self.cm_lock:
|
|
|
|
if channel_id == 0:
|
|
|
|
# for clannel_id,clannel_data in self.channels.items():
|
|
|
|
# clannel_data.clear()
|
|
|
|
for clannel_data in self.channels:
|
|
|
|
clannel_data.clear() #停止工作线程,并清空业务数据
|
|
|
|
clannel_data.cap.release() #停止视频采集线程,并是否采集资源
|
|
|
|
|
|
|
|
self.channels.clear() #清空整个字典
|
|
|
|
else:
|
|
|
|
if channel_id in self.channels:
|
|
|
|
self.channels[channel_id].clear() # 手动清理资源
|
|
|
|
self.channels[channel_id].cap.release()
|
|
|
|
del self.channels[channel_id]
|
|
|
|
|
|
|
|
if __name__ == "__main__":
|
|
|
|
# 示例使用
|
|
|
|
manager = ChannelManager()
|
|
|
|
manager.add_channel('channel_1', 'test', 123, True, deque_length=5)
|
|
|
|
|
|
|
|
# 更新和读取操作
|
|
|
|
manager.update_channel_deque('channel_1', 'data1')
|
|
|
|
manager.update_channel_buffer('channel_1', np.array([[1, 2], [3, 4]]))
|
|
|
|
manager.increment_channel_counter('channel_1')
|
|
|
|
|
|
|
|
channel_data = manager.get_channel_data('channel_1')
|
|
|
|
print(channel_data)
|