# 导入代码依赖 import cv2 # import os # from model.plugins.ModelBase import ModelBase # import importlib.util import threading import time import numpy as np from quart import Quart, render_template, websocket, jsonify from quart.helpers import Response # def test_acl_verify(): # '''根据路径,动态导入模块''' # model_path = "/mnt/zfbox/model/plugins/RYRQ_ACL/RYRQ_Model_ACL.py" # model_name = "测试" # if os.path.exists(model_path): # module_spec = importlib.util.spec_from_file_location(model_name, model_path) # if module_spec is None: # print(f"{model_path} 加载错误") # return None # module = importlib.util.module_from_spec(module_spec) # module_spec.loader.exec_module(module) # md = getattr(module, "Model")(model_path,0.5) # 实例化类 # if not isinstance(md, ModelBase) or not md.init_ok: # print("{} not zf_model".format(md)) # return None # else: # print("{}文件不存在".format(model_path)) # return None # print("开执行检测") # #return md # img = cv2.imread("/mnt/zfbox/model/plugins/RYRQ_ACL/world_cup.jpg", cv2.IMREAD_COLOR) # 读入图片 # data = [0,1,"[(100, 100), (50, 200), (100, 300), (300, 300), (300, 100)]"] # boxout,ret,strret = md.verify(img,data) # #print(boxout) # #释放相关资源 # md.release() # 加载YOLOv5模型 #model = yolov5.load('yolov5s') app = Quart(__name__) class VideoCaptureWithYOLO: def __init__(self, source): self.source = source self.cap = cv2.VideoCapture(self._get_gstreamer_pipeline(), cv2.CAP_GSTREAMER) self.width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH)) self.height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) self.frame = None self.running = True self.read_lock = threading.Lock() self.thread = threading.Thread(target=self.update) self.thread.start() def _get_gstreamer_pipeline(self): return ( f"rtspsrc location={self.source} protocols=udp latency=0 ! " "rtph264depay ! h264parse ! avdec_h264 ! videoconvert ! appsink" ) def update(self): while self.running: ret, frame = self.cap.read() if not ret: continue # YOLOv5分析 annotated_frame = frame with self.read_lock: self.frame = annotated_frame time.sleep(0.01) # 控制帧率 def read(self): with self.read_lock: frame = self.frame.copy() if self.frame is not None else None return frame def release(self): self.running = False self.thread.join() self.cap.release() # 创建多个视频流 streams = { 'stream1': VideoCaptureWithYOLO('rtsp://192.168.3.44/live1'), 'stream2': VideoCaptureWithYOLO('rtsp://192.168.3.44/live1'), 'stream3': VideoCaptureWithYOLO('rtsp://192.168.3.44/live1'), 'stream4': VideoCaptureWithYOLO('rtsp://192.168.3.44/live1'), 'stream5': VideoCaptureWithYOLO('rtsp://192.168.3.44/live1'), 'stream6': VideoCaptureWithYOLO('rtsp://192.168.3.44/live1'), 'stream7': VideoCaptureWithYOLO('rtsp://192.168.3.44/live1'), 'stream8': VideoCaptureWithYOLO('rtsp://192.168.3.44/live1'), 'stream9': VideoCaptureWithYOLO('rtsp://192.168.3.44/live1') } @app.route('/') async def index(): return await render_template('index.html', streams=streams.keys()) async def generate_frames(stream_id): video_capture = streams[stream_id] while True: frame = video_capture.read() if frame is not None: _, buffer = cv2.imencode('.jpg', frame) frame = buffer.tobytes() yield (b'--frame\r\n' b'Content-Type: image/jpeg\r\n\r\n' + frame + b'\r\n') @app.route('/video_feed/') async def video_feed(stream_id): return Response(generate_frames(stream_id), mimetype='multipart/x-mixed-replace; boundary=frame') if __name__ == '__main__': app.run(host='0.0.0.0', port=5000)