You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

222 lines
9.9 KiB

from abc import abstractmethod,ABC
from shapely.geometry import Point, Polygon
from myutils.ConfigManager import myCongif
import numpy as np
import cv2
import ast
import platform
if myCongif.get_data("model_platform") == "acl":
import acl
#-----acl相关------
SUCCESS = 0 # 成功状态值
FAILED = 1 # 失败状态值
ACL_MEM_MALLOC_NORMAL_ONLY = 2 # 申请内存策略, 仅申请普通页
class ModelBase(ABC):
def __init__(self,path):
'''
模型类实例化
:param path: 模型文件本身的路径
:param threshold: 模型的置信阈值
'''
self.name = None #基于name来查询,用户对模型的配置参数,代表着模型名称需要唯一 2024-6-18 -逻辑还需要完善和验证
self.version = None
self.model_type = None # 模型类型 1-图像分类,2-目标检测(yolov5),3-分割模型,4-关键点
self.system = myCongif.get_data("model_platform") #platform.system() #获取系统平台
self.do_map = { # 定义插件的入口函数 --
# POCType.POC: self.do_verify,
# POCType.SNIFFER: self.do_sniffer,
# POCType.BRUTE: self.do_brute
}
self.model_path = path # 模型路径
def __del__(self):
print("资源释放")
def draw_polygon(self, img, polygon_points,color=(0, 255, 0)):
self.polygon = Polygon(ast.literal_eval(polygon_points))
points = np.array([self.polygon.exterior.coords], dtype=np.int32)
cv2.polylines(img, points, isClosed=True, color=color, thickness=2)
def is_point_in_region(self, point):
'''判断点是否在区域内,需要先执行draw_polygon'''
if self.polygon:
return self.polygon.contains(Point(point))
else:
return False
#acl ----- 相关-----
def _init_acl(self):
'''acl初始化函数'''
self.device_id = 0
#step1 初始化
ret = acl.init()
ret = acl.rt.set_device(self.device_id) # 指定运算的Device
if ret:
raise RuntimeError(ret)
self.context, ret = acl.rt.create_context(self.device_id) # 显式创建一个Context
if ret:
raise RuntimeError(ret)
print('Init ACL Successfully')
def _del_acl(self):
'''acl去初始化'''
ret = acl.rt.destroy_context(self.context) # 释放 Context
if ret:
raise RuntimeError(ret)
ret = acl.rt.reset_device(self.device_id) # 释放Device
if ret:
raise RuntimeError(ret)
ret = acl.finalize() # 去初始化
if ret:
raise RuntimeError(ret)
print('Deinit ACL Successfully')
def _init_resource(self):
''' 初始化模型、输出相关资源。相关数据类型: aclmdlDesc aclDataBuffer aclmdlDataset'''
print("Init model resource")
# 加载模型文件
#self.model_path = "/home/HwHiAiUser/samples/yolo_acl_sample/yolov5s_bs1.om"
self.model_id, ret = acl.mdl.load_from_file(self.model_path) # 加载模型
if ret != 0:
print(f"{self.model_path}---模型加载失败!")
return False
self.model_desc = acl.mdl.create_desc() # 初始化模型信息对象
ret = acl.mdl.get_desc(self.model_desc, self.model_id) # 根据模型获取描述信息
print("[Model] Model init resource stage success")
# 创建模型输出 dataset 结构
self._gen_output_dataset() # 创建模型输出dataset结构
return True
def _gen_output_dataset(self):
''' 组织输出数据的dataset结构 '''
ret = SUCCESS
self._output_num = acl.mdl.get_num_outputs(self.model_desc) # 获取模型输出个数
self.output_dataset = acl.mdl.create_dataset() # 创建输出dataset结构
for i in range(self._output_num):
temp_buffer_size = acl.mdl.get_output_size_by_index(self.model_desc, i) # 获取模型输出个数
temp_buffer, ret = acl.rt.malloc(temp_buffer_size, ACL_MEM_MALLOC_NORMAL_ONLY) # 为每个输出申请device内存
dataset_buffer = acl.create_data_buffer(temp_buffer,
temp_buffer_size) # 创建输出的data buffer结构,将申请的内存填入data buffer
_, ret = acl.mdl.add_dataset_buffer(self.output_dataset, dataset_buffer) # 将 data buffer 加入输出dataset
if ret == FAILED:
self._release_dataset(self.output_dataset) # 失败时释放dataset
print("[Model] create model output dataset success")
def _gen_input_dataset(self, input_list):
''' 组织输入数据的dataset结构 '''
ret = SUCCESS
self._input_num = acl.mdl.get_num_inputs(self.model_desc) # 获取模型输入个数
self.input_dataset = acl.mdl.create_dataset() # 创建输入dataset结构
for i in range(self._input_num):
item = input_list[i] # 获取第 i 个输入数据
data_ptr = acl.util.bytes_to_ptr(item.tobytes()) # 获取输入数据字节流
size = item.size * item.itemsize # 获取输入数据字节数
dataset_buffer = acl.create_data_buffer(data_ptr, size) # 创建输入dataset buffer结构, 填入输入数据
_, ret = acl.mdl.add_dataset_buffer(self.input_dataset, dataset_buffer) # 将dataset buffer加入dataset
if ret == FAILED:
self._release_dataset(self.input_dataset) # 失败时释放dataset
print("[Model] create model input dataset success")
def _unpack_bytes_array(self, byte_array, shape, datatype):
''' 将内存不同类型的数据解码为numpy数组 '''
np_type = None
# 获取输出数据类型对应的numpy数组类型和解码标记
if datatype == 0: # ACL_FLOAT
np_type = np.float32
elif datatype == 1: # ACL_FLOAT16
np_type = np.float16
elif datatype == 3: # ACL_INT32
np_type = np.int32
elif datatype == 8: # ACL_UINT32
np_type = np.uint32
else:
print("unsurpport datatype ", datatype)
return
# 将解码后的数据组织为numpy数组,并设置shape和类型
return np.frombuffer(byte_array, dtype=np_type).reshape(shape)
def _output_dataset_to_numpy(self):
''' 将模型输出解码为numpy数组 '''
dataset = []
# 遍历每个输出
for i in range(self._output_num):
buffer = acl.mdl.get_dataset_buffer(self.output_dataset, i) # 从输出dataset中获取buffer
data_ptr = acl.get_data_buffer_addr(buffer) # 获取输出数据内存地址
size = acl.get_data_buffer_size(buffer) # 获取输出数据字节数
narray = acl.util.ptr_to_bytes(data_ptr, size) # 将指针转为字节流数据
# 根据模型输出的shape和数据类型,将内存数据解码为numpy数组
outret = acl.mdl.get_output_dims(self.model_desc, i)[0]
dims = outret["dims"] # 获取每个输出的维度
print(f"name:{outret['name']}")
print(f"dimCount:{outret['dimCount']}")
'''
dims = {
"name": xxx, #tensor name
"dimCount":xxx,#shape中的维度个数
"dims": [xx, xx, xx] # 维度信息 --- 取的这个
}
'''
datatype = acl.mdl.get_output_data_type(self.model_desc, i) # 获取每个输出的数据类型 --就数据类型float16,int8等
output_nparray = self._unpack_bytes_array(narray, tuple(dims), datatype) # 解码为numpy数组
dataset.append(output_nparray)
return dataset
def execute(self, input_list):
'''创建输入dataset对象, 推理完成后, 将输出数据转换为numpy格式'''
self._gen_input_dataset(input_list) # 创建模型输入dataset结构
ret = acl.mdl.execute(self.model_id, self.input_dataset, self.output_dataset) # 调用离线模型的execute推理数据
out_numpy = self._output_dataset_to_numpy() # 将推理输出的二进制数据流解码为numpy数组, 数组的shape和类型与模型输出规格一致
return out_numpy
def release(self):
''' 释放模型相关资源 '''
if self._is_released:
return
print("Model start release...")
self._release_dataset(self.input_dataset) # 释放输入数据结构
self.input_dataset = None # 将输入数据置空
self._release_dataset(self.output_dataset) # 释放输出数据结构
self.output_dataset = None # 将输出数据置空
if self.model_id:
ret = acl.mdl.unload(self.model_id) # 卸载模型
if self.model_desc:
ret = acl.mdl.destroy_desc(self.model_desc) # 释放模型描述信息
self._is_released = True
print("Model release source success")
def _release_dataset(self, dataset):
''' 释放 aclmdlDataset 类型数据 '''
if not dataset:
return
num = acl.mdl.get_dataset_num_buffers(dataset) # 获取数据集包含的buffer个数
for i in range(num):
data_buf = acl.mdl.get_dataset_buffer(dataset, i) # 获取buffer指针
if data_buf:
ret = acl.destroy_data_buffer(data_buf) # 释放buffer
ret = acl.mdl.destroy_dataset(dataset) # 销毁数据集
# @abstractmethod
# def infer(self, inputs): # 保留接口, 子类必须重写
# pass
@abstractmethod
def verify(self,image,data,isdraw=1):
'''
:param image: 需要验证的图片
:param data: select t1.model_id,t1.check_area,t1.polygon ,t2.duration_time,t2.proportion,t2.model_path
:param isdraw: 是否需要绘制线框:0-不绘制,1-绘制
:return: detections,bwarn,warntext bwarn:0-没有识别到符合要求的目标,1-没有识别到符合要求的目标。
'''
pass