import cv2
import asyncio
import time
from . import api
from quart import jsonify, request
from aiortc import RTCPeerConnection, RTCSessionDescription, VideoStreamTrack, RTCConfiguration
from core.ModelManager import mMM
from core.DBManager import mDBM
from myutils.ConfigManager import myCongif
from fractions import Fraction
import threading
import logging

# Configure logging.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# ------------------- Stream delivery over WebRTC -------------------
# Peer connections keyed by the channel id supplied by the client.
# NOTE(review): a second /offer for the same channel id overwrites the entry
# without closing the previous connection -- confirm whether that is intended.
pcs = {}

# NOTE: a legacy local-webcam capture thread (cv2.VideoCapture) used to live
# here as commented-out code; frames are now read from mMM.verify_list.


# --------------------------- Transport ---------------------------
class VideoTransformTrack(VideoStreamTrack):
    """Video track that serves the frames published for one channel.

    Frames are read from ``mMM.verify_list[channel_id][4]`` and stamped with
    a frame counter as ``pts`` and ``1 / frame_rate`` as ``time_base``.
    """

    kind = "video"

    def __init__(self, cid, itype=1):
        """Create a track for channel ``cid``.

        Args:
            cid: Channel id used to index ``mMM.verify_list``.
            itype: 0 = USB, 1 = RTSP, 2 = Hikvision SDK. No longer used;
                kept for backward compatibility.
        """
        super().__init__()
        self.channel_id = cid
        self.frame_rate = myCongif.get_data("frame_rate")
        self.time_base = Fraction(1, self.frame_rate)
        self.frame_count = 0
        self.start_time = time.time()

    async def recv(self):
        """Return the next frame for this channel, paced at ``frame_rate``.

        Returns:
            The frame object taken from ``mMM.verify_list`` with ``pts`` and
            ``time_base`` set.
        """
        frame_interval = 1.0 / self.frame_rate
        # Must be asyncio.sleep: a blocking time.sleep here would stall the
        # event loop and therefore every other connection and request.
        await asyncio.sleep(frame_interval)
        # Presumably slot 4 of the channel entry holds the latest frame --
        # TODO confirm against ModelManager.
        new_frame = mMM.verify_list[self.channel_id][4]
        while new_frame is None:
            # No frame published yet: wait cooperatively instead of failing
            # on the attribute assignments below.
            await asyncio.sleep(frame_interval)
            new_frame = mMM.verify_list[self.channel_id][4]

        # Timestamp the frame; the counter wraps after RESET_INTERVAL frames.
        self.frame_count += 1
        if self.frame_count > myCongif.get_data("RESET_INTERVAL"):
            self.frame_count = 0
        new_frame.pts = self.frame_count
        new_frame.time_base = self.time_base
        print(f"{self.channel_id} -- Frame pts: {new_frame.pts}, time_base: {new_frame.time_base}")
        return new_frame


async def get_stats(peer_connection):
    """Print sender statistics for every ``outbound-rtp`` report.

    Args:
        peer_connection: Object exposing an awaitable ``getStats()`` whose
            result has ``values()`` yielding report objects.
    """
    stats = await peer_connection.getStats()
    for report in stats.values():
        if report.type != 'outbound-rtp':
            continue
        # Round-trip time may be absent on outbound reports; do not crash
        # the whole dump because of it.
        rtt = getattr(report, "roundTripTime", None)
        print(f"RTT: {rtt} seconds")
        print(f"Packets Sent: {report.packetsSent}")
        print(f"Bytes Sent: {report.bytesSent}")


@api.route('/offer', methods=['POST'])
async def offer():
    """Handle a client's WebRTC offer and answer with the server SDP.

    Expects a JSON body with ``cid`` (channel id), ``mytype``,
    ``element_id``, ``sdp`` and ``type``. When ``mytype == 1`` the channel is
    first bound to the page element in the ``channel`` table.

    Returns:
        JSON with ``sdp``/``type`` of the local description plus
        ``status``/``msg`` (only meaningful when ``mytype == 1``), or just
        ``status``/``msg`` when the element is already bound to a channel.
    """
    params = await request.json
    channel_id = params["cid"]
    itype = params["mytype"]
    element_id = params["element_id"]
    reStatus = 0
    reMsg = ""

    if itype == 1:
        # Bind the channel to the page element.
        try:
            # Coerce to int before interpolating: the values come straight
            # from the request and must not reach the SQL text verbatim.
            sql_element_id = int(element_id)
            sql_channel_id = int(channel_id)
        except (TypeError, ValueError):
            # Non-numeric ids could never match; report it like a DB failure.
            reStatus = 0
            reMsg = '播放画面失败,请联系技术支持!'
        else:
            strsql = f"select * from channel where element_id = {sql_element_id}"
            data = mDBM.do_select(strsql, 1)
            if data:
                reStatus = 0
                reMsg = '该控件已经关联某一视频通道,请确认数据是否正确!'
                return jsonify({'status': reStatus, 'msg': reMsg})
            strsql = f"update channel set element_id = {sql_element_id} where ID = {sql_channel_id};"
            ret = mDBM.do_sql(strsql)
            if ret == True:  # noqa: E712 -- return type of do_sql not visible here
                reStatus = 1
                reMsg = '播放画面成功,请稍等!'
            else:
                reStatus = 0
                reMsg = '播放画面失败,请联系技术支持!'

    # Take the client's SDP; the server-side SDP is generated below.
    remote_offer = RTCSessionDescription(sdp=params["sdp"], type=params["type"])

    # Empty ICE server list: no STUN/TURN servers are configured.
    config = RTCConfiguration(iceServers=[])
    pc = RTCPeerConnection(configuration=config)
    pcs[channel_id] = pc

    @pc.on("datachannel")
    def on_datachannel(channel):
        @channel.on("message")
        def on_message(message):
            print(f'Message received: {message}')
            # Handle control messages from the client.
            if message == 'pause':
                # Pause handling goes here.
                pass
            elif message == 'resume':
                # Resume handling goes here.
                pass

    # Watch the ICE connection state so failed connections get cleaned up.
    @pc.on("iceconnectionstatechange")
    async def on_iceconnectionstatechange():
        if pc.iceConnectionState == "failed":
            await pc.close()
            # Only drop the registry entry if it still refers to this
            # connection; a newer offer may have replaced it.
            if pcs.get(channel_id) is pc:
                pcs.pop(channel_id, None)

    # Attach the video track for this channel.
    video_track = VideoTransformTrack(channel_id)
    pc.addTrack(video_track)

    # Record the client SDP.
    await pc.setRemoteDescription(remote_offer)
    # Create the local SDP.
    answer = await pc.createAnswer()
    # Record the local SDP.
    await pc.setLocalDescription(answer)
    print("返回sdp")

    return jsonify({
        "sdp": pc.localDescription.sdp,
        "type": pc.localDescription.type,
        "status": reStatus,  # only meaningful when itype == 1
        "msg": reMsg
    })


@api.route('/shutdown', methods=['POST'])
async def shutdown():
    """Close every registered peer connection and clear the registry."""
    # pcs is a dict: iterate its values (the connections), not its keys.
    coros = [pc.close() for pc in list(pcs.values())]
    await asyncio.gather(*coros)
    pcs.clear()
    return 'Server shutting down...'
@api.route('/close_stream', methods=['POST'])
async def close_stream():
    """Close the peer connection of one channel and unbind its page element.

    Expects form field ``channel_id``.

    Returns:
        JSON ``{'status': 1|0, 'msg': ...}``; status 1 only when the
        connection was closed and the ``channel`` row was updated.
    """
    channel_id = (await request.form)['channel_id']
    reStatus = 0
    reMsg = ""

    # Form values are always strings, while the registry key is whatever
    # type the /offer JSON carried; fall back to the integer form of the id.
    key = channel_id
    if key not in pcs:
        try:
            key = int(channel_id)
        except (TypeError, ValueError):
            key = channel_id

    # Pop before closing so a concurrent request cannot close it twice.
    pc = pcs.pop(key, None)
    if pc is None:
        reMsg = "通道编号不在系统内,请检查!"
        return jsonify({'status': reStatus, 'msg': reMsg})

    await pc.close()
    print(f'Stream {channel_id} closed.')

    # Clear the channel-to-element binding in the database.
    try:
        # Coerce to int so request text never reaches the SQL verbatim.
        sql_channel_id = int(channel_id)
    except (TypeError, ValueError):
        ret = False
    else:
        strsql = f"update channel set element_id = NULL where ID={sql_channel_id};"
        ret = mDBM.do_sql(strsql)

    if ret == True:  # noqa: E712 -- return type of do_sql not visible here
        reStatus = 1
        reMsg = '关闭画面成功!'
    else:
        reStatus = 0
        reMsg = '删除通道与组件关联关系失败,请联系技术支持!'
    return jsonify({'status': reStatus, 'msg': reMsg})