import os import subprocess import platform import zipfile import importlib.util import shutil from myutils.ConfigManager import myCongif from model.plugins.ModelBase import ModelBase def allowed_file(filename): return '.' in filename and filename.rsplit('.', 1)[1].lower() in myCongif.get_data('ALLOWED_EXTENSIONS') #对上传的系统升级包进行检查 type:1--系统升级包,2--算法升级包 def check_file(filepath,filename_pre,type): #默认路径一般都是uploads/文件名 ''' 检查上传文件的合法性,若符合要求则移动到正式路径下面 :param filepath: .zip文件路径 :param filename_pre: 去除掉.zip的纯文件名 :param type: 1--系统升级包, 2--算法升级包 :return: model_version model_name model_path(相对路径) ''' model_name = None model_version = None model_path = None system = platform.system() #path = filepath.rsplit('.', 1)[0] #去掉后缀 path = myCongif.get_data("UPLOAD_FOLDER") # uploads zip_path = filepath.rsplit('.', 1)[0] # uploads/filenamedir filepath_py = zip_path + '/' + filename_pre + '.py' #这里就需要约束py文件名,就是zip压缩包的文件名 buzip = False #解压缩 if system == "Windows": try: with zipfile.ZipFile(filepath, 'r') as zip_ref: zip_ref.extractall(path) buzip = True except zipfile.BadZipfile: print("文件格式错误,解压失败") except Exception as e: print(f"解压失败: {e}") elif system == "Linux": try: subprocess.run(['unzip', '-o', filepath, '-d', path], check=True) buzip = True except subprocess.CalledProcessError as e: print(f"解压失败: {e.stderr}") except Exception as e: print(f"解压失败: {e}") else: raise NotImplementedError(f"Unsupported operating system: {system}") #加载模型文件,获取模型名称和版本号 if buzip: if type == 2: #模型升级包 print(filepath_py) model = import_model("",filepath_py,0) if model: #把解压文件移动至正式路径 tag_path = myCongif.get_data("Model_Plugins") #model/plugins ret = move_dir(zip_path,tag_path,filename_pre) if ret: model_name = model.name #算法名 model_version = model.version #算法版本 model_path = tag_path+'/'+ filename_pre +'/'+filename_pre +'.py' #py文件的路径,是相对路径 del model elif type == 1: #系统升级包 pass else: pass #错误值 return model_version,model_name,model_path def move_dir(source_path,tag_path,filename_pre,type=1): ''' 移动文件夹 :param source_path: 源文件夹 ***/***/filedir :param tag_path: 目标路径 ***/***/ 不需要带filedir :param filename_pre: 不带后缀的文件名 :param type: 0-不覆盖移动,目标路径存在filedir的话返回, 1-覆盖移动,删除后再移动 :return: False True ''' bsuccess = False #若根目录不在,则创建 if not os.path.exists(tag_path): #model/plugins os.makedirs(tag_path) #判断移动后目录是否存在 newpath = tag_path + '/' + filename_pre if os.path.exists(newpath): if type == 1: shutil.rmtree(newpath) else: return bsuccess #这个返回失败 # 移动文件夹 try: shutil.move(source_path, tag_path) print(f"成功将文件夹移动到 {tag_path}") bsuccess = True except Exception as e: print(f"移动文件夹失败: {e}") return bsuccess def import_model(model_name,model_path,threshold): ''' 根据路径,动态导入模块 :param model_name: 模块名称 :param model_path: 模块路径 :param threshold: 置信阈值 :return: ''' if os.path.exists(model_path): module_spec = importlib.util.spec_from_file_location(model_name, model_path) if module_spec is None: print(f"{model_path} 加载错误") return None module = importlib.util.module_from_spec(module_spec) module_spec.loader.exec_module(module) md = getattr(module, "Model")(model_path,threshold) #实例化类 if not isinstance(md, ModelBase): print("{} not zf_model".format(md)) return None else: print("{}文件不存在".format(model_path)) return None print(f"{model_path} 加载成功!!!!") return md def update_system(filepath): #系统升级 pass def updata_model(filepath): #算法模型升级或新增 try: # 假设我们解压并运行一个升级脚本 subprocess.run(['unzip', '-o', filepath, '-d', '/path/to/upgrade/directory'], check=True) subprocess.run(['/path/to/upgrade/directory/upgrade_script.sh'], check=True) return True, 'Upgrade completed successfully.' except subprocess.CalledProcessError as e: return False, str(e)