You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
138 lines
5.2 KiB
138 lines
5.2 KiB
import os
|
|
import subprocess
|
|
import platform
|
|
import zipfile
|
|
import importlib.util
|
|
import shutil
|
|
from myutils.ConfigManager import myCongif
|
|
from model.plugins.ModelBase import ModelBase
|
|
|
|
def allowed_file(filename):
|
|
return '.' in filename and filename.rsplit('.', 1)[1].lower() in myCongif.get_data('ALLOWED_EXTENSIONS')
|
|
|
|
#对上传的系统升级包进行检查 type:1--系统升级包,2--算法升级包
|
|
def check_file(filepath,filename_pre,type): #默认路径一般都是uploads/文件名
|
|
'''
|
|
检查上传文件的合法性,若符合要求则移动到正式路径下面
|
|
:param filepath: .zip文件路径
|
|
:param filename_pre: 去除掉.zip的纯文件名
|
|
:param type: 1--系统升级包, 2--算法升级包
|
|
:return: model_version model_name model_path(相对路径)
|
|
'''
|
|
model_name = None
|
|
model_version = None
|
|
model_path = None
|
|
system = platform.system()
|
|
|
|
#path = filepath.rsplit('.', 1)[0] #去掉后缀
|
|
path = myCongif.get_data("UPLOAD_FOLDER") # uploads
|
|
zip_path = filepath.rsplit('.', 1)[0] # uploads/filenamedir
|
|
filepath_py = zip_path + '/' + filename_pre + '.py' #这里就需要约束py文件名,就是zip压缩包的文件名
|
|
|
|
buzip = False
|
|
#解压缩
|
|
if system == "Windows":
|
|
try:
|
|
with zipfile.ZipFile(filepath, 'r') as zip_ref:
|
|
zip_ref.extractall(path)
|
|
buzip = True
|
|
except zipfile.BadZipfile:
|
|
print("文件格式错误,解压失败")
|
|
except Exception as e:
|
|
print(f"解压失败: {e}")
|
|
elif system == "Linux":
|
|
try:
|
|
subprocess.run(['unzip', '-o', filepath, '-d', path], check=True)
|
|
buzip = True
|
|
except subprocess.CalledProcessError as e:
|
|
print(f"解压失败: {e.stderr}")
|
|
except Exception as e:
|
|
print(f"解压失败: {e}")
|
|
else:
|
|
raise NotImplementedError(f"Unsupported operating system: {system}")
|
|
#加载模型文件,获取模型名称和版本号
|
|
if buzip:
|
|
if type == 2: #模型升级包
|
|
print(filepath_py)
|
|
model = import_model("",filepath_py,0)
|
|
if model:
|
|
#把解压文件移动至正式路径
|
|
tag_path = myCongif.get_data("Model_Plugins") #model/plugins
|
|
ret = move_dir(zip_path,tag_path,filename_pre)
|
|
if ret:
|
|
model_name = model.name #算法名
|
|
model_version = model.version #算法版本
|
|
model_path = tag_path+'/'+ filename_pre +'/'+filename_pre +'.py' #py文件的路径,是相对路径
|
|
del model
|
|
elif type == 1: #系统升级包
|
|
pass
|
|
else:
|
|
pass #错误值
|
|
return model_version,model_name,model_path
|
|
|
|
def move_dir(source_path,tag_path,filename_pre,type=1):
|
|
'''
|
|
移动文件夹
|
|
:param source_path: 源文件夹 ***/***/filedir
|
|
:param tag_path: 目标路径 ***/***/ 不需要带filedir
|
|
:param filename_pre: 不带后缀的文件名
|
|
:param type: 0-不覆盖移动,目标路径存在filedir的话返回, 1-覆盖移动,删除后再移动
|
|
:return: False True
|
|
'''
|
|
bsuccess = False
|
|
#若根目录不在,则创建
|
|
if not os.path.exists(tag_path): #model/plugins
|
|
os.makedirs(tag_path)
|
|
#判断移动后目录是否存在
|
|
newpath = tag_path + '/' + filename_pre
|
|
if os.path.exists(newpath):
|
|
if type == 1:
|
|
shutil.rmtree(newpath)
|
|
else:
|
|
return bsuccess #这个返回失败
|
|
|
|
# 移动文件夹
|
|
try:
|
|
shutil.move(source_path, tag_path)
|
|
print(f"成功将文件夹移动到 {tag_path}")
|
|
bsuccess = True
|
|
except Exception as e:
|
|
print(f"移动文件夹失败: {e}")
|
|
return bsuccess
|
|
|
|
def import_model(model_name,model_path,threshold):
|
|
'''
|
|
根据路径,动态导入模块
|
|
:param model_name: 模块名称
|
|
:param model_path: 模块路径
|
|
:param threshold: 置信阈值
|
|
:return:
|
|
'''
|
|
if os.path.exists(model_path):
|
|
module_spec = importlib.util.spec_from_file_location(model_name, model_path)
|
|
if module_spec is None:
|
|
print(f"{model_path} 加载错误")
|
|
return None
|
|
module = importlib.util.module_from_spec(module_spec)
|
|
module_spec.loader.exec_module(module)
|
|
md = getattr(module, "Model")(model_path,threshold) #实例化类
|
|
if not isinstance(md, ModelBase):
|
|
print("{} not zf_model".format(md))
|
|
return None
|
|
else:
|
|
print("{}文件不存在".format(model_path))
|
|
return None
|
|
print(f"{model_path} 加载成功!!!!")
|
|
return md
|
|
|
|
def update_system(filepath): #系统升级
|
|
pass
|
|
|
|
def updata_model(filepath): #算法模型升级或新增
|
|
try:
|
|
# 假设我们解压并运行一个升级脚本
|
|
subprocess.run(['unzip', '-o', filepath, '-d', '/path/to/upgrade/directory'], check=True)
|
|
subprocess.run(['/path/to/upgrade/directory/upgrade_script.sh'], check=True)
|
|
return True, 'Upgrade completed successfully.'
|
|
except subprocess.CalledProcessError as e:
|
|
return False, str(e)
|