import asyncio from . import api from quart import jsonify, request,websocket from core.ModelManager import mMM from core.DBManager import mDBM from myutils.ConfigManager import myCongif import logging import time import subprocess from collections import deque # 配置日志 logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) #-------------------基于WEBRTC实现拉流 #pcs = set() #创建一个空的集合,去重复且无序 pcs = {} ''' ---------------------传输-------------------------- ''' async def get_stats(peer_connection): stats = await peer_connection.getStats() for report in stats.values(): if report.type == 'outbound-rtp': print(f"RTT: {report.roundTripTime} seconds") print(f"Packets Sent: {report.packetsSent}") print(f"Bytes Sent: {report.bytesSent}") # @api.route('/offer', methods=['POST']) # async def offer(): # #接收客户端的连接请求 # params = await request.json # channel_id = params["cid"] #需要传参过来 # itype = params["mytype"] # element_id = params["element_id"] # reStatus = 0 # reMsg = "" # ret = False # if itype == 1: # #添加通道和页面控件ID的关系 # strsql = f"select * from channel where element_id = {element_id}" # data = mDBM.do_select(strsql,1) # if data: # reStatus = 0 # reMsg = '该控件已经关联某一视频通道,请确认数据是否正确!' # return jsonify({'status': reStatus, 'msg': reMsg}) # strsql = f"update channel set element_id = {element_id} where ID = {channel_id};" # ret = mDBM.do_sql(strsql) # if ret == True: # reStatus = 1 # reMsg = '播放画面成功,请稍等!' # else: # reStatus = 0 # reMsg = '播放画面失败,请联系技术支持!' # #提取客户端发来的SDP,生成服务器端的SDP # offer = RTCSessionDescription(sdp=params["sdp"], type=params["type"]) # # # 配置 STUN 服务器 # #ice_servers = [{"urls": []}]# 禁用 STUN 服务器 # # pc = RTCPeerConnection(configuration={"iceServers": ice_servers}) # config = RTCConfiguration(iceServers=[]) # pc = RTCPeerConnection(configuration=config) # # t = threading.Thread(target=get_stats,args=(pc,)) # # t.start() # # #pc = RTCPeerConnection() # 实例化一个rtcp对象 # pcs[channel_id] = pc # 集合中添加一个对象,若已存在则不添 # # @pc.on("datachannel") # def on_datachannel(channel): # @channel.on("message") # def on_message(message): # print(f'Message received: {message}') # # 处理接收到的数据 # if message == 'pause': # # 执行暂停操作 # pass # elif message == 'resume': # # 执行继续操作 # pass # # #监听RTC连接状态 # @pc.on("iconnectionstatechange") #当ice连接状态发生变化时 # async def iconnectionstatechange(): # if pc.iceConnectionState == "failed": # await pc.close() # pcs.pop(channel_id, None) #移除对象 # # # 添加视频轨道 # video_track = VideoTransformTrack(channel_id) # pc.addTrack(video_track) # # # @pc.on('track') --- stream.getTracks().forEach(track => pc.addTrack(track, stream)); 猜测是这里触发的添加了摄像头通道 # # def on_track(track): # # if track.kind == 'video': # # local_video = VideoTransformTrack(track) # # pc.addTrack(local_video) # # # 记录客户端 SDP # await pc.setRemoteDescription(offer) # # 生成本地 SDP # answer = await pc.createAnswer() # # 记录本地 SDP # await pc.setLocalDescription(answer) # # print("返回sdp") # return jsonify({ # "sdp": pc.localDescription.sdp, # "type": pc.localDescription.type, # "status":reStatus, #itype =1 的时候才有意义 # "msg":reMsg # }) def stream_rtsp_to_flv(rtsp_url): command = [ 'ffmpeg', '-i', rtsp_url, # 输入 RTSP 流 '-vcodec', 'libx264', # 视频编码器 '-f', 'flv', # 输出格式为 FLV 'pipe:1' # 输出到管道 ] process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE) return process async def handle_channel_rtf(channel_id,websocket): rtsp_url = "rtsp://192.168.3.103/live1" process = stream_rtsp_to_flv(rtsp_url) try: while True: data = process.stdout.read(1024) # 从管道读取 FLV 数据 if not data: break await websocket.send(data) # 通过 WebSocket 发送 FLV 数据 except asyncio.CancelledError: process.terminate() # 如果 WebSocket 连接关闭,则终止 ffmpeg 进程 finally: process.terminate() async def handle_channel(channel_id,websocket): verify_rate = int(myCongif.get_data("verify_rate")) error_max_count = verify_rate * int(myCongif.get_data("video_error_count")) # 视频帧捕获失败触发提示的上限 sleep_time = int(myCongif.get_data("cap_sleep_time")) frame_interval = 1.0 / verify_rate # 用于帧率控制 last_frame_time = time.time() # 初始化个读帧时间 icount = 0 #视频传输缓冲区 #frame_buffer = deque(maxlen=10) try: while True: # 这里的多线程并发,还需要验证检查 # 帧率控制帧率 ---输出暂时不控制帧率,有就传输 current_time = time.time() elapsed_time = current_time - last_frame_time if elapsed_time < frame_interval: await asyncio.sleep(frame_interval - elapsed_time) # 若小于间隔时间则休眠 last_frame_time = time.time() # 执行视频传输 frame = mMM.verify_list.cm_get_last_frame(channel_id) # get_last_frame 用了try if frame is not None: # frame_buffer.append(frame) #先放入缓冲区 icount = 0 await websocket.send(frame) else: # print("frame is None") icount += 1 if icount > error_max_count: print(f"通道-{channel_id},长时间未获取图像,休眠一段时间后再获取。") #icount = 0 error_message = b"video_error" await websocket.send(error_message) await asyncio.sleep(sleep_time) # 等待视频重连时间 except asyncio.CancelledError: print(f"WebSocket connection for channel {channel_id} closed by client") raise except Exception as e: print(f"Unexpected error: {e}") finally: print(f"Cleaning up resources for channel {channel_id}") @api.websocket('/ws/video_feed/') async def ws_video_feed(channel_id): print(f"New connection for channel: {channel_id}") # 为每个通道创建独立的协程 shendtask = asyncio.create_task(handle_channel(channel_id, websocket)) #shendtask = asyncio.create_task(handle_channel_rtf(channel_id, websocket)) # 等待协程完成 await shendtask @api.route('/shutdown', methods=['POST']) async def shutdown():#这是全关 --需要修改 coros = [pc.close() for pc in pcs] await asyncio.gather(*coros) pcs.clear() return 'Server shutting down...' @api.route('/viewlist', methods=['GET']) async def viewlist():#视频列表 count = request.args.get('count') channel_list = [] element_list = [] name_list = [] if count: strsql = (f"select t1.ID,t1.element_id,t1.channel_name,t2.area_name from channel t1 left join " f"area t2 on t1.area_id =t2.id where element_id between 0 and {count};") datas = mDBM.do_select(strsql) for data in datas: channel_list.append(data[0]) element_list.append(data[1]) name_list.append(f"{data[3]}--{data[2]}") return jsonify({'clist': channel_list, 'elist': element_list,'nlist':name_list}) @api.route('/start_stream', methods=['POST']) async def start_stream(): #开启视频通道,把视频通道编号和元素编号进行关联 json_data = await request.get_json() channel_id = json_data.get('channel_id') element_id = json_data.get('element_id') reStatus = 0 reMsg = "" strsql = f"select element_id from channel where ID = {channel_id};" data = mDBM.do_select(strsql,1) if data: if not data[0] or data[0] == '': strsql = f"update channel set element_id = '{element_id}' where ID={channel_id};" ret = mDBM.do_sql(strsql) if ret == True: reStatus = 1 reMsg = '播放视频配置成功!' else: reMsg = '播放视频配置失败,请联系技术支持!' else: index = int(data[0]) +1 reMsg = f"该视频通道已在:Video Stream {index}播放,请先关闭" return jsonify({'status': reStatus, 'msg': reMsg}) @api.route('/close_stream', methods=['POST']) async def close_stream(): #关闭视频通道 json_data = await request.get_json() element_id = json_data.get('element_id') reStatus = 0 reMsg ="" #数据库中该通道的关联关系要清除 strsql = f"update channel set element_id = '' where element_id={element_id};" ret = mDBM.do_sql(strsql) if ret == True: reStatus = 1 reMsg = '关闭画面成功!' else: reMsg = '删除通道与组件关联关系失败,请联系技术支持!' # ?关闭视频播放--业务逻辑-待确认后端是否需要执行 return jsonify({'status': reStatus, 'msg': reMsg})