You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
321 lines
12 KiB
321 lines
12 KiB
import asyncio
|
|
from . import api
|
|
from quart import jsonify, request,websocket
|
|
from core.ModelManager import mMM
|
|
from core.DBManager import mDBM
|
|
from myutils.ConfigManager import myCongif
|
|
import logging
|
|
import time
|
|
import subprocess
|
|
from concurrent.futures import ThreadPoolExecutor
|
|
import threading
|
|
from collections import deque
|
|
|
|
# 配置日志
|
|
logging.basicConfig(level=logging.INFO)
|
|
logger = logging.getLogger(__name__)
|
|
#-------------------基于WEBRTC实现拉流
|
|
#pcs = set() #创建一个空的集合,去重复且无序
|
|
pcs = {}
|
|
active_tasks = {} # 用来存储每个channel的任务
|
|
executor = ThreadPoolExecutor(max_workers=4)
|
|
|
|
|
|
'''
|
|
---------------------传输--------------------------
|
|
'''
|
|
|
|
async def get_stats(peer_connection):
|
|
stats = await peer_connection.getStats()
|
|
for report in stats.values():
|
|
if report.type == 'outbound-rtp':
|
|
print(f"RTT: {report.roundTripTime} seconds")
|
|
print(f"Packets Sent: {report.packetsSent}")
|
|
print(f"Bytes Sent: {report.bytesSent}")
|
|
|
|
# @api.route('/offer', methods=['POST'])
|
|
# async def offer():
|
|
# #接收客户端的连接请求
|
|
# params = await request.json
|
|
# channel_id = params["cid"] #需要传参过来
|
|
# itype = params["mytype"]
|
|
# element_id = params["element_id"]
|
|
# reStatus = 0
|
|
# reMsg = ""
|
|
# ret = False
|
|
# if itype == 1:
|
|
# #添加通道和页面控件ID的关系
|
|
# strsql = f"select * from channel where element_id = {element_id}"
|
|
# data = mDBM.do_select(strsql,1)
|
|
# if data:
|
|
# reStatus = 0
|
|
# reMsg = '该控件已经关联某一视频通道,请确认数据是否正确!'
|
|
# return jsonify({'status': reStatus, 'msg': reMsg})
|
|
# strsql = f"update channel set element_id = {element_id} where ID = {channel_id};"
|
|
# ret = mDBM.do_sql(strsql)
|
|
# if ret == True:
|
|
# reStatus = 1
|
|
# reMsg = '播放画面成功,请稍等!'
|
|
# else:
|
|
# reStatus = 0
|
|
# reMsg = '播放画面失败,请联系技术支持!'
|
|
# #提取客户端发来的SDP,生成服务器端的SDP
|
|
# offer = RTCSessionDescription(sdp=params["sdp"], type=params["type"])
|
|
# # # 配置 STUN 服务器
|
|
# #ice_servers = [{"urls": []}]# 禁用 STUN 服务器
|
|
# # pc = RTCPeerConnection(configuration={"iceServers": ice_servers})
|
|
# config = RTCConfiguration(iceServers=[])
|
|
# pc = RTCPeerConnection(configuration=config)
|
|
# # t = threading.Thread(target=get_stats,args=(pc,))
|
|
# # t.start()
|
|
#
|
|
# #pc = RTCPeerConnection() # 实例化一个rtcp对象
|
|
# pcs[channel_id] = pc # 集合中添加一个对象,若已存在则不添
|
|
#
|
|
# @pc.on("datachannel")
|
|
# def on_datachannel(channel):
|
|
# @channel.on("message")
|
|
# def on_message(message):
|
|
# print(f'Message received: {message}')
|
|
# # 处理接收到的数据
|
|
# if message == 'pause':
|
|
# # 执行暂停操作
|
|
# pass
|
|
# elif message == 'resume':
|
|
# # 执行继续操作
|
|
# pass
|
|
#
|
|
# #监听RTC连接状态
|
|
# @pc.on("iconnectionstatechange") #当ice连接状态发生变化时
|
|
# async def iconnectionstatechange():
|
|
# if pc.iceConnectionState == "failed":
|
|
# await pc.close()
|
|
# pcs.pop(channel_id, None) #移除对象
|
|
#
|
|
# # 添加视频轨道
|
|
# video_track = VideoTransformTrack(channel_id)
|
|
# pc.addTrack(video_track)
|
|
#
|
|
# # @pc.on('track') --- stream.getTracks().forEach(track => pc.addTrack(track, stream)); 猜测是这里触发的添加了摄像头通道
|
|
# # def on_track(track):
|
|
# # if track.kind == 'video':
|
|
# # local_video = VideoTransformTrack(track)
|
|
# # pc.addTrack(local_video)
|
|
#
|
|
# # 记录客户端 SDP
|
|
# await pc.setRemoteDescription(offer)
|
|
# # 生成本地 SDP
|
|
# answer = await pc.createAnswer()
|
|
# # 记录本地 SDP
|
|
# await pc.setLocalDescription(answer)
|
|
#
|
|
# print("返回sdp")
|
|
# return jsonify({
|
|
# "sdp": pc.localDescription.sdp,
|
|
# "type": pc.localDescription.type,
|
|
# "status":reStatus, #itype =1 的时候才有意义
|
|
# "msg":reMsg
|
|
# })
|
|
|
|
def stream_rtsp_to_flv(rtsp_url):
|
|
command = [
|
|
'ffmpeg',
|
|
'-i', rtsp_url, # 输入 RTSP 流
|
|
'-vcodec', 'libx264', # 视频编码器
|
|
'-f', 'flv', # 输出格式为 FLV
|
|
'pipe:1' # 输出到管道
|
|
]
|
|
|
|
process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
|
return process
|
|
|
|
async def handle_channel_rtf(channel_id,websocket):
|
|
rtsp_url = "rtsp://192.168.3.103/live1"
|
|
process = stream_rtsp_to_flv(rtsp_url)
|
|
|
|
try:
|
|
while True:
|
|
data = process.stdout.read(1024) # 从管道读取 FLV 数据
|
|
if not data:
|
|
break
|
|
await websocket.send(data) # 通过 WebSocket 发送 FLV 数据
|
|
except asyncio.CancelledError:
|
|
process.terminate() # 如果 WebSocket 连接关闭,则终止 ffmpeg 进程
|
|
finally:
|
|
process.terminate()
|
|
|
|
def send_frame_thread(channel_id,websocket):
|
|
pass
|
|
|
|
async def handle_channel(channel_id,websocket):
|
|
verify_rate = int(myCongif.get_data("verify_rate"))
|
|
error_max_count = verify_rate * int(myCongif.get_data("video_error_count")) # 视频帧捕获失败触发提示的上限
|
|
sleep_time = int(myCongif.get_data("cap_sleep_time"))
|
|
frame_interval = 1.0 / verify_rate # 用于帧率控制
|
|
last_frame_time = time.time() # 初始化个读帧时间
|
|
icount = 0
|
|
|
|
#视频传输缓冲区
|
|
#frame_buffer = deque(maxlen=10)
|
|
try:
|
|
cnode = mMM.verify_list.get_channel_data(channel_id)
|
|
if cnode is None:
|
|
print("---channel_id--错误--")
|
|
return
|
|
|
|
frame_count = 0
|
|
start_time = time.time()
|
|
all_time = 0
|
|
get_all_time = 0
|
|
send_all_time = 0
|
|
while True: # 这里的多线程并发,还需要验证检查
|
|
# 帧率控制帧率 ---输出暂时不控制帧率,有就传输
|
|
current_time = time.time()
|
|
elapsed_time = current_time - last_frame_time
|
|
if elapsed_time < frame_interval:
|
|
await asyncio.sleep(frame_interval - elapsed_time) # 若小于间隔时间则休眠
|
|
last_frame_time = time.time()
|
|
# 执行视频传输
|
|
#frame = mMM.verify_list.cm_get_last_frame(channel_id) # get_last_frame 用了try
|
|
get_stime = time.time()
|
|
frame = cnode.get_last_frame()
|
|
get_etime = time.time()
|
|
get_all_time = get_all_time + (get_etime - get_stime)
|
|
if frame is not None:
|
|
# frame_buffer.append(frame) #先放入缓冲区
|
|
#7print("KB---",len(frame)/1024)
|
|
icount = 0
|
|
|
|
send_stime = time.time()
|
|
await websocket.send(b'frame:'+frame)
|
|
send_etime = time.time()
|
|
send_all_time = send_all_time + (send_etime - send_stime)
|
|
else:
|
|
# print("frame is None")
|
|
icount += 1
|
|
if icount > error_max_count:
|
|
print(f"通道-{channel_id},长时间未获取图像,休眠一段时间后再获取。")
|
|
#icount = 0
|
|
error_message = b'error:video_error'
|
|
await websocket.send(error_message)
|
|
await asyncio.sleep(sleep_time) # 等待视频重连时间
|
|
|
|
#----------输出时间-----------
|
|
frame_count += 1
|
|
end_time = time.time()
|
|
# 计算时间差
|
|
el_time = end_time - start_time
|
|
all_time = all_time + (end_time - current_time)
|
|
# 每隔一定时间(比如5秒)计算一次帧率
|
|
if el_time >= 10:
|
|
fps = frame_count / el_time
|
|
print(f"{channel_id}当前帧率: {fps} FPS,循环次数:{frame_count},花费总耗时:{all_time}S,get耗时:{get_all_time},send耗时:{send_all_time}")
|
|
# 重置计数器和时间
|
|
frame_count = 0
|
|
all_time = 0
|
|
get_all_time = 0
|
|
send_all_time = 0
|
|
start_time = time.time()
|
|
# print(f"get_frame:{round(get_etime-get_stime,5)}Sceond;"
|
|
# f"send_frame:{round(send_etime-send_stime,5)}Sceond;"
|
|
# f"All_time={round(end_time-current_time,5)}")
|
|
except asyncio.CancelledError:
|
|
print(f"WebSocket connection for channel {channel_id} closed by client")
|
|
raise
|
|
except Exception as e:
|
|
print(f"Unexpected error: {e}")
|
|
finally:
|
|
print(f"Cleaning up resources for channel {channel_id}")
|
|
|
|
|
|
@api.websocket('/ws/video_feed/<int:channel_id>')
|
|
async def ws_video_feed(channel_id):
|
|
print(f"New connection for channel: {channel_id}")
|
|
if channel_id in active_tasks:
|
|
active_tasks[channel_id].cancel()
|
|
try:
|
|
await active_tasks[channel_id] # 确保旧任务被完全取消
|
|
del active_tasks[channel_id]
|
|
except asyncio.CancelledError:
|
|
print(f"旧任务 {channel_id} 已取消")
|
|
|
|
try:
|
|
# 为每个通道创建独立的协程
|
|
shendtask = asyncio.create_task(handle_channel(channel_id, websocket))
|
|
#shendtask = asyncio.create_task(handle_channel_rtf(channel_id, websocket))
|
|
# 将任务存储到 active_tasks
|
|
active_tasks[channel_id] = shendtask
|
|
# 等待协程完成
|
|
await shendtask
|
|
except Exception as e:
|
|
print(f"Channel {channel_id} 出现异常: {e}")
|
|
finally:
|
|
# 移除已完成的任务
|
|
if channel_id in active_tasks:
|
|
del active_tasks[channel_id]
|
|
print(f"Cleaning up resources for channel {channel_id}")
|
|
|
|
@api.route('/shutdown', methods=['POST'])
|
|
async def shutdown():#这是全关 --需要修改
|
|
coros = [pc.close() for pc in pcs]
|
|
await asyncio.gather(*coros)
|
|
pcs.clear()
|
|
return 'Server shutting down...'
|
|
|
|
@api.route('/viewlist', methods=['GET'])
|
|
async def viewlist():#视频列表
|
|
count = request.args.get('count')
|
|
channel_list = []
|
|
element_list = []
|
|
name_list = []
|
|
if count:
|
|
strsql = (f"select t1.ID,t1.element_id,t1.channel_name,t2.area_name from channel t1 left join "
|
|
f"area t2 on t1.area_id =t2.id where element_id between 0 and {count};")
|
|
datas = mDBM.do_select(strsql)
|
|
for data in datas:
|
|
channel_list.append(data[0])
|
|
element_list.append(data[1])
|
|
name_list.append(f"{data[3]}--{data[2]}")
|
|
return jsonify({'clist': channel_list, 'elist': element_list,'nlist':name_list})
|
|
|
|
@api.route('/start_stream', methods=['POST'])
|
|
async def start_stream(): #开启视频通道,把视频通道编号和元素编号进行关联
|
|
json_data = await request.get_json()
|
|
channel_id = json_data.get('channel_id')
|
|
element_id = json_data.get('element_id')
|
|
reStatus = 0
|
|
reMsg = ""
|
|
strsql = f"select element_id from channel where ID = {channel_id};"
|
|
data = mDBM.do_select(strsql,1)
|
|
if data:
|
|
if not data[0] or data[0] == '':
|
|
strsql = f"update channel set element_id = '{element_id}' where ID={channel_id};"
|
|
ret = mDBM.do_sql(strsql)
|
|
if ret == True:
|
|
reStatus = 1
|
|
reMsg = '播放视频配置成功!'
|
|
else:
|
|
reMsg = '播放视频配置失败,请联系技术支持!'
|
|
else:
|
|
index = int(data[0]) +1
|
|
reMsg = f"该视频通道已在:Video Stream {index}播放,请先关闭"
|
|
return jsonify({'status': reStatus, 'msg': reMsg})
|
|
|
|
|
|
@api.route('/close_stream', methods=['POST'])
|
|
async def close_stream(): #关闭视频通道
|
|
json_data = await request.get_json()
|
|
element_id = json_data.get('element_id')
|
|
reStatus = 0
|
|
reMsg =""
|
|
#数据库中该通道的关联关系要清除
|
|
strsql = f"update channel set element_id = '' where element_id={element_id};"
|
|
ret = mDBM.do_sql(strsql)
|
|
if ret == True:
|
|
reStatus = 1
|
|
reMsg = '关闭画面成功!'
|
|
else:
|
|
reMsg = '删除通道与组件关联关系失败,请联系技术支持!'
|
|
# ?关闭视频播放--业务逻辑-待确认后端是否需要执行
|
|
|
|
return jsonify({'status': reStatus, 'msg': reMsg})
|
|
|