You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

228 lines
10 KiB

10 months ago
from abc import abstractmethod,ABC
from shapely.geometry import Point, Polygon
from myutils.ConfigManager import myCongif
from myutils.MyLogger_logger import LogHandler
10 months ago
import numpy as np
import cv2
import ast
if myCongif.get_data("model_platform") == "acl":
import acl
import platform
#-----acl相关------
SUCCESS = 0 # 成功状态值
FAILED = 1 # 失败状态值
ACL_MEM_MALLOC_NORMAL_ONLY = 2 # 申请内存策略, 仅申请普通页
11 months ago
10 months ago
class ModelBase(ABC):
def __init__(self,path):
'''
模型类实例化
:param path: 模型文件本身的路径
:param threshold: 模型的置信阈值
'''
self.mylogger = LogHandler().get_logger("ModelManager")
10 months ago
self.name = None #基于name来查询,用户对模型的配置参数,代表着模型名称需要唯一 2024-6-18 -逻辑还需要完善和验证
self.version = None
self.model_type = None # 模型类型 1-图像分类,2-目标检测(yolov5),3-分割模型,4-关键点
self.system = myCongif.get_data("model_platform") #platform.system() #获取系统平台
10 months ago
self.do_map = { # 定义插件的入口函数 --
# POCType.POC: self.do_verify,
# POCType.SNIFFER: self.do_sniffer,
# POCType.BRUTE: self.do_brute
}
self.model_path = path # 模型路径
self.init_ok = False
11 months ago
def __del__(self):
10 months ago
print("资源释放")
def draw_polygon(self, img, polygon_points,color=(0, 255, 0)):
self.polygon = Polygon(ast.literal_eval(polygon_points))
10 months ago
points = np.array([self.polygon.exterior.coords], dtype=np.int32)
cv2.polylines(img, points, isClosed=True, color=color, thickness=2)
def is_point_in_region(self, point):
'''判断点是否在区域内,需要先执行draw_polygon'''
if self.polygon:
return self.polygon.contains(Point(point))
else:
return False
#acl ----- 相关-----
def _init_acl(self):
device_id = 0
self.context, ret = acl.rt.create_context(device_id) # 显式创建一个Context
if ret:
raise RuntimeError(ret)
print('Init TH-Context Successfully')
def _del_acl(self):
device_id = 0
# 线程释放context
ret = acl.rt.destroy_context(self.context) # 释放 Context
if ret:
raise RuntimeError(ret)
print('Deinit TH-Context Successfully')
print('ACL finalize Successfully')
def _init_resource(self):
#self._init_acl() #测试使用
''' 初始化模型、输出相关资源。相关数据类型: aclmdlDesc aclDataBuffer aclmdlDataset'''
print("Init model resource")
# 加载模型文件
#self.model_path = "/home/HwHiAiUser/samples/yolo_acl_sample/yolov5s_bs1.om"
self.model_id, ret = acl.mdl.load_from_file(self.model_path) # 加载模型
if ret != 0:
print(f"{self.model_path}---模型加载失败!")
return False
self.model_desc = acl.mdl.create_desc() # 初始化模型信息对象
ret = acl.mdl.get_desc(self.model_desc, self.model_id) # 根据模型ID获取该模型的aclmdlDesc类型数据(描述信息)
print("[Model] Model init resource stage success")
# 创建模型输出 dataset 结构
ret = self._gen_output_dataset() # 创建模型输出dataset结构
if ret !=0:
print("[Model] create model output dataset fail")
return False
return True
def _gen_output_dataset(self):
''' 组织输出数据的dataset结构 '''
ret = SUCCESS
self._output_num = acl.mdl.get_num_outputs(self.model_desc) # 根据aclmdlDesc类型的数据,获取模型的输出个数
self.output_dataset = acl.mdl.create_dataset() # 创建输出dataset结构
for i in range(self._output_num):
temp_buffer_size = acl.mdl.get_output_size_by_index(self.model_desc, i) # 获取模型输出个数
temp_buffer, ret = acl.rt.malloc(temp_buffer_size, ACL_MEM_MALLOC_NORMAL_ONLY) # 为每个输出申请device内存
dataset_buffer = acl.create_data_buffer(temp_buffer,
temp_buffer_size) # 创建输出的data buffer结构,将申请的内存填入data buffer
_, ret = acl.mdl.add_dataset_buffer(self.output_dataset, dataset_buffer) # 将 data buffer 加入输出dataset(地址)
if ret == FAILED:
self._release_dataset(self.output_dataset) # 失败时释放dataset
return ret
print("[Model] create model output dataset success")
return ret
def _gen_input_dataset(self, input_list):
''' 组织输入数据的dataset结构 '''
ret = SUCCESS
self._input_num = acl.mdl.get_num_inputs(self.model_desc) # 获取模型输入个数
self.input_dataset = acl.mdl.create_dataset() # 创建输入dataset结构
for i in range(self._input_num):
item = input_list[i] # 获取第 i 个输入数据
data_ptr = acl.util.bytes_to_ptr(item.tobytes()) # 获取输入数据字节流
size = item.size * item.itemsize # 获取输入数据字节数
dataset_buffer = acl.create_data_buffer(data_ptr, size) # 创建输入dataset buffer结构, 填入输入数据
_, ret = acl.mdl.add_dataset_buffer(self.input_dataset, dataset_buffer) # 将dataset buffer加入dataset
if ret == FAILED:
self._release_dataset(self.input_dataset) # 失败时释放dataset
#print("[Model] create model input dataset success")
def _unpack_bytes_array(self, byte_array, shape, datatype):
''' 将内存不同类型的数据解码为numpy数组 '''
np_type = None
# 获取输出数据类型对应的numpy数组类型和解码标记
if datatype == 0: # ACL_FLOAT
np_type = np.float32
elif datatype == 1: # ACL_FLOAT16
np_type = np.float16
elif datatype == 3: # ACL_INT32
np_type = np.int32
elif datatype == 8: # ACL_UINT32
np_type = np.uint32
else:
print("unsurpport datatype ", datatype)
return
# 将解码后的数据组织为numpy数组,并设置shape和类型
return np.frombuffer(byte_array, dtype=np_type).reshape(shape)
def _output_dataset_to_numpy(self):
''' 将模型输出解码为numpy数组 '''
dataset = []
# 遍历每个输出
for i in range(self._output_num):
buffer = acl.mdl.get_dataset_buffer(self.output_dataset, i) # 从输出dataset中获取buffer
data_ptr = acl.get_data_buffer_addr(buffer) # 获取输出数据内存地址
size = acl.get_data_buffer_size(buffer) # 获取输出数据字节数
narray = acl.util.ptr_to_bytes(data_ptr, size) # 将指针转为字节流数据
# 根据模型输出的shape和数据类型,将内存数据解码为numpy数组
outret = acl.mdl.get_output_dims(self.model_desc, i)[0]
dims = outret["dims"] # 获取每个输出的维度
# print(f"name:{outret['name']}")
# print(f"dimCount:{outret['dimCount']}")
'''
dims = {
"name": xxx, #tensor name
"dimCount":xxx#shape中的维度个数
"dims": [xx, xx, xx] # 维度信息 --- 取的这个
}
'''
datatype = acl.mdl.get_output_data_type(self.model_desc, i) # 获取每个输出的数据类型 --就数据类型float16,int8等
output_nparray = self._unpack_bytes_array(narray, tuple(dims), datatype) # 解码为numpy数组
dataset.append(output_nparray)
return dataset
def execute(self, input_list):
'''创建输入dataset对象, 推理完成后, 将输出数据转换为numpy格式'''
self._gen_input_dataset(input_list) # 创建模型输入dataset结构
ret = acl.mdl.execute(self.model_id, self.input_dataset, self.output_dataset) # 调用离线模型的execute推理数据
if ret:
self.mylogger.error(f"acl.mdl.execute fail!--{ret}")
self._release_dataset(self.input_dataset) # 失败时释放dataset --创建输入空间失败时会释放。
return None
out_numpy = self._output_dataset_to_numpy() # 将推理输出的二进制数据流解码为numpy数组, 数组的shape和类型与模型输出规格一致
self._release_dataset(self.input_dataset) # 释放dataset -- 要不要执行需要验证
return out_numpy
def release(self):
''' 释放模型相关资源 '''
if self._is_released:
return
print("Model start release...")
self._release_dataset(self.input_dataset) # 释放输入数据结构
self.input_dataset = None # 将输入数据置空
self._release_dataset(self.output_dataset) # 释放输出数据结构
self.output_dataset = None # 将输出数据置空
if self.model_id:
ret = acl.mdl.unload(self.model_id) # 卸载模型
if self.model_desc:
ret = acl.mdl.destroy_desc(self.model_desc) # 释放模型描述信息
self._is_released = True
print("Model release source success")
#测试使用
#self._del_acl()
def _release_dataset(self, dataset):
''' 释放 aclmdlDataset 类型数据 '''
if not dataset:
return
num = acl.mdl.get_dataset_num_buffers(dataset) # 获取数据集包含的buffer个数
for i in range(num):
data_buf = acl.mdl.get_dataset_buffer(dataset, i) # 获取buffer指针
if data_buf:
ret = acl.destroy_data_buffer(data_buf) # 释放buffer
ret = acl.mdl.destroy_dataset(dataset) # 销毁数据集
# @abstractmethod
# def infer(self, inputs): # 保留接口, 子类必须重写
# pass
10 months ago
11 months ago
@abstractmethod
def verify(self,image,data,isdraw=1):
10 months ago
'''
:param image: 需要验证的图片
:param data: select t1.model_id,t1.check_area,t1.polygon ,t2.duration_time,t2.proportion,t2.model_path
:param isdraw: 是否需要绘制线框0-不绘制1-绘制
:return: detections,bwarn,warntext bwarn:0-没有识别到符合要求的目标1-没有识别到符合要求的目标
10 months ago
'''
11 months ago
pass