import os import sqlite3 import cv2 import asyncio import av from aiohttp import web from quart import Quart, render_template, request, jsonify,send_from_directory from aiortc import RTCPeerConnection, RTCSessionDescription, VideoStreamTrack, RTCConfiguration, RTCIceServer from fractions import Fraction from .main import main as main_blueprint #app.config['SECRET_KEY'] = 'mysecret' #密钥 --需要放配置文件 #socketio = SocketIO(app) ''' ************************ 功能函数 ''' rtsp_url = 'rtsp://127.0.0.1/live1' #-------------------基于WEBRTC实现拉流 pcs = set() #创建一个空的集合,去重复且无序 def create_app(): app = Quart(__name__) # 注册蓝图 app.register_blueprint(main_blueprint) return app class VideoTransformTrack(VideoStreamTrack): kind = "video" def __init__(self): super().__init__() self.cap = cv2.VideoCapture('rtsp://127.0.0.1/live1') if not self.cap.isOpened(): raise Exception("Unable to open video source") # Set desired resolution and frame rate # self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1280) # Set width to 1280 # self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 720) # Set height to 720 # self.cap.set(cv2.CAP_PROP_FPS, 30) # Set frame rate to 30 FPS self.frame_rate = int(self.cap.get(cv2.CAP_PROP_FPS)) or 30 # Default to 30 if unable to get FPS self.time_base = Fraction(1, self.frame_rate) async def recv(self): ret, frame = self.cap.read() if not ret: raise Exception("Unable to read frame") # Convert frame to RGB frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) # Convert to VideoFrame video_frame = av.VideoFrame.from_ndarray(frame_rgb, format="rgb24") # Set timestamp video_frame.pts = int(self.cap.get(cv2.CAP_PROP_POS_FRAMES)) video_frame.time_base = self.time_base return video_frame ''' ************************ 页面路由 ''' @app.route('/') async def index(): #return await render_template('登录.html') return await render_template('登录.html') @app.route('/favicon.ico') async def favicon(): return await send_from_directory('static', 'favicon.ico') @app.route('/') async def get_html(html): return await render_template(html) #--------------------webrtc----------------- async def server(pc, offer): # 监听 RTC 连接状态 @pc.on("connectionstatechange") async def on_connectionstatechange(): print("Connection state is %s" % pc.connectionState) # 当 RTC 连接中断后将连接关闭 if pc.connectionState == "failed": await pc.close() pcs.discard(pc) # 监听客户端发来的视频流 @pc.on("track") def on_track(track): print("======= received track: ", track) if track.kind == "video": # # 对视频流进行人脸替换 # t = FaceSwapper(track) # 绑定替换后的视频流 pc.addTrack(track) # 记录客户端 SDP await pc.setRemoteDescription(offer) # 生成本地 SDP answer = await pc.createAnswer() # 记录本地 SDP await pc.setLocalDescription(answer) @app.route('/offer', methods=['POST']) async def offer(): #接收客户端的连接请求 params = await request.json #params = await request.json() print(params) #提取客户端发来的SDP,生成服务器端的SDP offer = RTCSessionDescription(sdp=params["sdp"], type=params["type"]) # Configure ICE servers if necessary #config = RTCConfiguration([RTCIceServer(urls=['stun:stun.voiparound.com'])]) pc = RTCPeerConnection() #实例化一个rtcp对象 pcs.add(pc) #集合中添加一个对象,若已存在则不添加 print(pc) @pc.on("datachannel") def on_datachannel(channel): @channel.on("message") def on_message(message): if isinstance(message, str) and message.startswith("ping"): channel.send("pong" + message[4:]) #监听RTC连接状态 @pc.on("iconnectionstatechange") #当ice连接状态发生变化时 async def iconnectionstatechange(): if pc.iceConnectionState == "failed": await pc.close() pcs.discard(pc) #移除对象 # 添加视频轨道 video_track = VideoTransformTrack() pc.addTrack(video_track) print("完成视频轨道的添加") # 记录客户端 SDP await pc.setRemoteDescription(offer) # 生成本地 SDP answer = await pc.createAnswer() # 记录本地 SDP await pc.setLocalDescription(answer) return jsonify({ "sdp": pc.localDescription.sdp, "type": pc.localDescription.type }) @app.route('/shutdown', methods=['POST']) async def shutdown(): coros = [pc.close() for pc in pcs] await asyncio.gather(*coros) pcs.clear() return 'Server shutting down...' '''3333 各种配置文件路由 ''' @app.route('/data/') async def data(file): return await send_from_directory('static/data',file) @app.route('/files//') async def files(subdir,file): return await send_from_directory(os.path.join('static/files', subdir),file) @app.route('/images//') async def images(subdir,file): return await send_from_directory(os.path.join('static/images', subdir),file) @app.route('/resources/') async def resources(file): return await send_from_directory('static/resources',file) @app.route('/resources//') async def resources_dir(subdir,file): return await send_from_directory(os.path.join('static/resources', subdir),file) @app.route('/resources/css//') async def resources_css_dir(subdir,file): return await send_from_directory(os.path.join('static/resources/css', subdir),file) @app.route('/resources/scripts//') async def resources_scripts_dir(subdir,file): return await send_from_directory(os.path.join('static/resources/scripts', subdir),file) ''' 后端数据接口 ''' @app.route('/submit', methods=['POST']) def submit(): if request.method == 'POST': suggestion = request.form['suggestion'] file = request.files['attachment'] if file: filename = file.filename file.save(os.path.join(app.config['UPLOAD_FOLDER'], filename)) # 将意见和附件的文件名保存到数据库 conn = sqlite3.connect(DB_NAME) cursor = conn.cursor() cursor.execute('INSERT INTO suggestions (suggestion, attachment_filename) VALUES (?, ?)', (suggestion, filename)) conn.commit() conn.close() return "Thanks for your suggestion and attachment! Data saved to the database." # 如果没有附件上传的逻辑处理 return "Data saved!" if __name__ == '__main__': #app.run(debug=True) app.run(debug=True, host='0.0.0.0', port=5000)