138 lines
5.2 KiB

import os
import subprocess
import platform
import zipfile
import importlib.util
import shutil
from myutils.ConfigManager import myCongif
from model.plugins.ModelBase import ModelBase
def allowed_file(filename):
return '.' in filename and filename.rsplit('.', 1)[1].lower() in myCongif.get_data('ALLOWED_EXTENSIONS')
#对上传的系统升级包进行检查 type:1--系统升级包,2--算法升级包
def check_file(filepath,filename_pre,type): #默认路径一般都是uploads/文件名
'''
检查上传文件的合法性,若符合要求则移动到正式路径下面
:param filepath: .zip文件路径
:param filename_pre: 去除掉.zip的纯文件名
:param type: 1--系统升级包, 2--算法升级包
:return: model_version model_name model_path(相对路径)
'''
model_name = None
model_version = None
model_path = None
system = platform.system()
#path = filepath.rsplit('.', 1)[0] #去掉后缀
path = myCongif.get_data("UPLOAD_FOLDER") # uploads
zip_path = filepath.rsplit('.', 1)[0] # uploads/filenamedir
filepath_py = zip_path + '/' + filename_pre + '.py' #这里就需要约束py文件名,就是zip压缩包的文件名
buzip = False
#解压缩
if system == "Windows":
try:
with zipfile.ZipFile(filepath, 'r') as zip_ref:
zip_ref.extractall(path)
buzip = True
except zipfile.BadZipfile:
print("文件格式错误,解压失败")
except Exception as e:
print(f"解压失败: {e}")
elif system == "Linux":
try:
subprocess.run(['unzip', '-o', filepath, '-d', path], check=True)
buzip = True
except subprocess.CalledProcessError as e:
print(f"解压失败: {e.stderr}")
except Exception as e:
print(f"解压失败: {e}")
else:
raise NotImplementedError(f"Unsupported operating system: {system}")
#加载模型文件,获取模型名称和版本号
if buzip:
if type == 2: #模型升级包
print(filepath_py)
model = import_model("",filepath_py,0)
if model:
#把解压文件移动至正式路径
tag_path = myCongif.get_data("Model_Plugins") #model/plugins
ret = move_dir(zip_path,tag_path,filename_pre)
if ret:
model_name = model.name #算法名
model_version = model.version #算法版本
model_path = tag_path+'/'+ filename_pre +'/'+filename_pre +'.py' #py文件的路径,是相对路径
del model
elif type == 1: #系统升级包
pass
else:
pass #错误值
return model_version,model_name,model_path
def move_dir(source_path,tag_path,filename_pre,type=1):
'''
移动文件夹
:param source_path: 源文件夹 ***/***/filedir
:param tag_path: 目标路径 ***/***/ 不需要带filedir
:param filename_pre: 不带后缀的文件名
:param type: 0-不覆盖移动,目标路径存在filedir的话返回, 1-覆盖移动,删除后再移动
:return: False True
'''
bsuccess = False
#若根目录不在,则创建
if not os.path.exists(tag_path): #model/plugins
os.makedirs(tag_path)
#判断移动后目录是否存在
newpath = tag_path + '/' + filename_pre
if os.path.exists(newpath):
if type == 1:
shutil.rmtree(newpath)
else:
return bsuccess #这个返回失败
# 移动文件夹
try:
shutil.move(source_path, tag_path)
print(f"成功将文件夹移动到 {tag_path}")
bsuccess = True
except Exception as e:
print(f"移动文件夹失败: {e}")
return bsuccess
def import_model(model_name,model_path,threshold):
'''
根据路径,动态导入模块
:param model_name: 模块名称
:param model_path: 模块路径
:param threshold: 置信阈值
:return:
'''
if os.path.exists(model_path):
module_spec = importlib.util.spec_from_file_location(model_name, model_path)
if module_spec is None:
print(f"{model_path} 加载错误")
return None
module = importlib.util.module_from_spec(module_spec)
module_spec.loader.exec_module(module)
md = getattr(module, "Model")(model_path,threshold) #实例化类
if not isinstance(md, ModelBase):
print("{} not zf_model".format(md))
return None
else:
print("{}文件不存在".format(model_path))
return None
print(f"{model_path} 加载成功!!!!")
return md
def update_system(filepath): #系统升级
pass
def updata_model(filepath): #算法模型升级或新增
try:
# 假设我们解压并运行一个升级脚本
subprocess.run(['unzip', '-o', filepath, '-d', '/path/to/upgrade/directory'], check=True)
subprocess.run(['/path/to/upgrade/directory/upgrade_script.sh'], check=True)
return True, 'Upgrade completed successfully.'
except subprocess.CalledProcessError as e:
return False, str(e)