I've recently been working on skeleton keypoint detection and needed to quantize a .pth model.
1. First, convert the model to ONNX
# Copyright (c) OpenMMLab. All rights reserved.
import argparse
import warnings
import mmcv
import numpy as np
import torch
from mmcv.runner import load_checkpoint
from mmpose.apis import init_pose_model
from mmaction.models import build_model
try:
import onnx
import onnxruntime as rt
except ImportError as e:
raise ImportError(f'Please install onnx and onnxruntime first. {e}')
try:
from mmcv.onnx.symbolic import register_extra_symbolics
except ModuleNotFoundError:
raise NotImplementedError('please update mmcv to version>=1.0.4')
def _convert_batchnorm(module):
"""Convert the syncBNs into normal BN3ds."""
module_output = module
if isinstance(module, torch.nn.SyncBatchNorm):
module_output = torch.nn.BatchNorm3d(module.num_features, module.eps,
module.momentum, module.affine,
module.track_running_stats)
if module.affine:
module_output.weight.data = module.weight.data.clone().detach()
module_output.bias.data = module.bias.data.clone().detach()
# keep requires_grad unchanged
module_output.weight.requires_grad = module.weight.requires_grad
module_output.bias.requires_grad = module.bias.requires_grad
module_output.running_mean = module.running_mean
module_output.running_var = module.running_var
module_output.num_batches_tracked = module.num_batches_tracked
for name, child in module.named_children():
module_output.add_module(name, _convert_batchnorm(child))
del module
return module_output
def pytorch2onnx(model,
input_shape,
opset_version=11,
show=False,
output_file='tmp.onnx',
verify=False):
"""Convert pytorch model to onnx model.
Args:
model (:obj:`nn.Module`): The pytorch model to be exported.
input_shape (tuple[int]): The input tensor shape of the model.
opset_version (int): Opset version of onnx used. Default: 11.
show (bool): Determines whether to print the onnx model architecture.
Default: False.
output_file (str): Output onnx model name. Default: 'tmp.onnx'.
verify (bool): Determines whether to verify the onnx model.
Default: False.
"""
model.cpu().eval()
input_tensor = torch.randn(input_shape)
    register_extra_symbolics(opset_version)  # opset_version is 11 here
torch.onnx.export(
model,
input_tensor,
output_file,
export_params=True,
keep_initializers_as_inputs=True,
verbose=show,
opset_version=opset_version)
print(f'Successfully exported ONNX model: {output_file}')
    if verify:  # True when --verify is passed
# check by onnx
onnx_model = onnx.load(output_file)
        onnx.checker.check_model(onnx_model)  # raises an exception if the model is invalid
# check the numerical value
# get pytorch output
        pytorch_result = model(input_tensor)[0].detach().numpy()  # for the action recognizer this was (1, 120), i.e. scores for 120 classes
# get onnx output
        input_all = [node.name for node in onnx_model.graph.input]  # names of all graph inputs
        input_initializer = [
            node.name for node in onnx_model.graph.initializer  # names of the weight tensors (a TensorProto list); a layer's inputs include the previous layer's output as well as weights and other parameters
        ]
        net_feed_input = list(set(input_all) - set(input_initializer))  # the set difference strips the weight names, leaving only the real data input, e.g. ['onnx::Reshape_0']
assert len(net_feed_input) == 1
sess = rt.InferenceSession(output_file)
onnx_result = sess.run(
None, {net_feed_input[0]: input_tensor.detach().numpy()})[0]
# only compare part of results
        random_class = np.random.randint(pytorch_result.shape[1])
        assert np.allclose(
            pytorch_result[:, random_class], onnx_result[:, random_class]
        ), 'The outputs are different between Pytorch and ONNX'
        print('The numerical values are the same between Pytorch and ONNX')
def parse_args():
parser = argparse.ArgumentParser(
description='Convert MMAction2 models to ONNX')
parser.add_argument('config', help='test config file path')
parser.add_argument('checkpoint', help='checkpoint file')
parser.add_argument('--show', action='store_true', help='show onnx graph')
parser.add_argument('--output-file', type=str, default='tmp.onnx')
parser.add_argument('--opset-version', type=int, default=11)
parser.add_argument(
'--verify',
action='store_true',
help='verify the onnx model output against pytorch output')
parser.add_argument(
'--is-localizer',
action='store_true',
help='whether it is a localizer')
parser.add_argument(
'--shape',
type=int,
nargs='+',
# default=[1, 3, 224, 224],
        default=[1, 3, 256, 192],
        # default=[1, 2, 17, 8, 64, 64],  # 17 is the number of keypoints and also the channel dim: $batch $clip $channel $time $height $width
help='input video size')
parser.add_argument(
'--softmax',
action='store_true',
        help='whether to add softmax layer at the end of recognizers')
args = parser.parse_args()
return args
if __name__ == '__main__':
args = parse_args()
assert args.opset_version == 11, 'MMAction2 only supports opset 11 now'
cfg = mmcv.Config.fromfile(args.config)
# import modules from string list.
if not args.is_localizer:
cfg.model.backbone.pretrained = None
    # build the model (modified: build the pose model instead of the action recognizer)
    pose_config = "demo/hrnet_w32_coco_256x192.py"
    pose_checkpoint = "checkpoints/hrnet_w32_coco_256x192-c78dce93_20200708.pth"
    model = init_pose_model(pose_config, pose_checkpoint,
                            'cpu')  # init_pose_model also attaches the config as model.cfg
# model = build_model(
# cfg.model, train_cfg=None, test_cfg=cfg.get('test_cfg'))
model = _convert_batchnorm(model)
# onnx.export does not support kwargs
if hasattr(model, 'forward_dummy'):
from functools import partial
# model.forward = partial(model.forward_dummy, softmax=args.softmax)
model.forward = model.forward_dummy
elif hasattr(model, '_forward') and args.is_localizer:
model.forward = model._forward
else:
raise NotImplementedError(
'Please implement the forward method for exporting.')
checkpoint = load_checkpoint(model, args.checkpoint, map_location='cpu')
# convert model to onnx file
pytorch2onnx(
model,
args.shape,
opset_version=args.opset_version,
show=args.show,
output_file=args.output_file,
verify=args.verify)
# Following strings of text style are from colorama package
bright_style, reset_style = '\x1b[1m', '\x1b[0m'
red_text, blue_text = '\x1b[31m', '\x1b[34m'
white_background = '\x1b[107m'
msg = white_background + bright_style + red_text
msg += 'DeprecationWarning: This tool will be deprecated in future. '
msg += blue_text + 'Welcome to use the unified model deployment toolbox '
msg += 'MMDeploy: https://github.com/open-mmlab/mmdeploy'
msg += reset_style
warnings.warn(msg)
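Assuming the modified script above is saved as pytorch2onnx.py (the config/checkpoint paths are simply the ones hard-coded in my setup), the export plus verification can be run with:

python pytorch2onnx.py demo/hrnet_w32_coco_256x192.py checkpoints/hrnet_w32_coco_256x192-c78dce93_20200708.pth --output-file hrnet.onnx --shape 1 3 256 192 --verify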
2. With the ONNX model in hand, convert it to RKNN. The code is as follows:
from rknn.api import RKNN
# ONNX_MODEL = 'hrnet_w32_macaque_256x192-f7e9e04f_20210407.onnx'
ONNX_MODEL = 'hrnet.onnx'
# ONNX_MODEL = 'action.onnx'
# RKNN_MODEL = 'hrnet_w32_macaque_256x192-f7e9e04f_20230208.rknn'
RKNN_MODEL = 'hrnet.rknn'
if __name__ == '__main__':
# Create RKNN object
rknn = RKNN(verbose=True)
# mean = [0.485, 0.456, 0.406]
# std= [0.229, 0.224, 0.225]
mean = [123.675, 116.28, 103.53]
std= [58.395, 57.12, 57.375]
# pre-process config
print('--> config model')
# rknn.config(mean_values=[[0,0, 0]], std_values=[[255 , 255 , 255]], reorder_channel='0 1 2',
# target_platform='rk3399pro',
# quantized_dtype='asymmetric_affine-u8', optimization_level=3, output_optimize=1)
# rknn.config(mean_values=[mean], std_values=[std],target_platform='rk3588',quantized_algorithm='normal',quant_img_RGB2BGR=True,
# quantized_dtype='asymmetric_quantized-8', optimization_level=3)
    # mean and std of the ImageNet dataset (channel order is RGB)
    # cv2.imread returns BGR with shape (H, W, C), but the model usually expects RGB with shape (H, W, C), hence the optional quant_img_RGB2BGR conversion above
# rknn.config(target_platform='rk3588',quantized_algorithm='normal',mean_values=[mean], std_values=[std],quant_img_RGB2BGR=True,
# quantized_dtype='asymmetric_quantized-8', optimization_level=3)
rknn.config(target_platform='rk3588',quantized_algorithm='normal',mean_values=[mean], std_values=[std],
quantized_dtype='asymmetric_quantized-8', optimization_level=3)
print('done')
print('--> Loading model')
ret = rknn.load_onnx(model=ONNX_MODEL)
if ret != 0:
print('Load model failed!')
exit(ret)
print('done')
# Build model
print('--> Building model')
ret = rknn.build(do_quantization=True, dataset='dataset.txt') # ,pre_compile=True
if ret != 0:
print('Build failed!')
exit(ret)
print('done')
# Export rknn model
print('--> Export RKNN model')
ret = rknn.export_rknn(RKNN_MODEL)
if ret != 0:
        print('Export rknn model failed!')
exit(ret)
print('done')
rknn.release()
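The dataset.txt passed to rknn.build() is the calibration set used for quantization: a plain-text file with one image path per line. The file names below are placeholders; in practice they should be images representative of the deployment data:

./calib/person_000.jpg
./calib/person_001.jpg
./calib/person_002.jpg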
3. Finally, run inference with the RKNN model to produce the results
import os
import urllib
import traceback
import time
import sys
import warnings
import numpy as np
import cv2
import torch
from mmcv.parallel import collate, scatter
from torchvision.transforms import functional as F
# from rknn.api import RKNN
from rknnlite.api import RKNNLite
import onnx
# RKNN_MODEL = "hrnet_w32_macaque_256x192-f7e9e04f_20230208.rknn"
RKNN_MODEL = "hrnet.rknn"
ONNX_MODEL = "hrnet.onnx"
IMG_PATH = "1.png"
mean = [0.485, 0.456, 0.406]
std= [0.229, 0.224, 0.225]
QUANTIZE_ON = True
def bbox_xywh2cs(bbox, aspect_ratio, padding=1., pixel_std=200.):
"""Transform the bbox format from (x,y,w,h) into (center, scale)
Args:
bbox (ndarray): Single bbox in (x, y, w, h)
aspect_ratio (float): The expected bbox aspect ratio (w over h)
        padding (float): Bbox padding factor that will be multiplied to scale.
Default: 1.0
pixel_std (float): The scale normalization factor. Default: 200.0
Returns:
tuple: A tuple containing center and scale.
- np.ndarray[float32](2,): Center of the bbox (x, y).
- np.ndarray[float32](2,): Scale of the bbox w & h.
"""
x, y, w, h = bbox[:4]
center = np.array([x + w * 0.5, y + h * 0.5], dtype=np.float32)
if w > aspect_ratio * h:
h = w * 1.0 / aspect_ratio
elif w < aspect_ratio * h:
w = h * aspect_ratio
scale = np.array([w, h], dtype=np.float32) / pixel_std
scale = scale * padding
return center, scale
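# Worked example (using the bbox defined later in __main__): for
# bbox = [0, 0, 400, 631, 0.99] and aspect_ratio = 192 / 256 = 0.75,
# w (400) < 0.75 * 631 = 473.25, so w is widened to 473.25 to match the model
# aspect ratio; then center = (200.0, 315.5) and, with padding = 1.25,
# scale = [473.25, 631] / 200 * 1.25 ≈ [2.958, 3.944].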
def rotate_point(pt, angle_rad):
"""Rotate a point by an angle.
Args:
pt (list[float]): 2 dimensional point to be rotated
angle_rad (float): rotation angle by radian
Returns:
list[float]: Rotated point.
"""
assert len(pt) == 2
sn, cs = np.sin(angle_rad), np.cos(angle_rad)
new_x = pt[0] * cs - pt[1] * sn
new_y = pt[0] * sn + pt[1] * cs
rotated_pt = [new_x, new_y]
return rotated_pt
def _get_3rd_point(a, b):
"""To calculate the affine matrix, three pairs of points are required. This
function is used to get the 3rd point, given 2D points a & b.
The 3rd point is defined by rotating vector `a - b` by 90 degrees
anticlockwise, using b as the rotation center.
Args:
a (np.ndarray): point(x,y)
b (np.ndarray): point(x,y)
Returns:
np.ndarray: The 3rd point.
"""
assert len(a) == 2
assert len(b) == 2
direction = a - b
third_pt = b + np.array([-direction[1], direction[0]], dtype=np.float32)
return third_pt
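# Example: a = (1, 0), b = (0, 0) gives direction a - b = (1, 0), and the
# third point is b + (0, 1) = (0, 1): the direction vector rotated 90 degrees
# anticlockwise around b.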
def get_affine_transform(center,
scale,
rot,
output_size,
shift=(0., 0.),
inv=False):
"""Get the affine transform matrix, given the center/scale/rot/output_size.
Args:
center (np.ndarray[2, ]): Center of the bounding box (x, y).
scale (np.ndarray[2, ]): Scale of the bounding box
wrt [width, height].
rot (float): Rotation angle (degree).
output_size (np.ndarray[2, ] | list(2,)): Size of the
destination heatmaps.
shift (0-100%): Shift translation ratio wrt the width/height.
Default (0., 0.).
inv (bool): Option to inverse the affine transform direction.
(inv=False: src->dst or inv=True: dst->src)
Returns:
np.ndarray: The transform matrix.
"""
assert len(center) == 2
assert len(scale) == 2
assert len(output_size) == 2
assert len(shift) == 2
# pixel_std is 200.
scale_tmp = scale * 200.0
shift = np.array(shift)
src_w = scale_tmp[0]
dst_w = output_size[0]
dst_h = output_size[1]
rot_rad = np.pi * rot / 180
src_dir = rotate_point([0., src_w * -0.5], rot_rad)
dst_dir = np.array([0., dst_w * -0.5])
src = np.zeros((3, 2), dtype=np.float32)
src[0, :] = center + scale_tmp * shift
src[1, :] = center + src_dir + scale_tmp * shift
src[2, :] = _get_3rd_point(src[0, :], src[1, :])
dst = np.zeros((3, 2), dtype=np.float32)
dst[0, :] = [dst_w * 0.5, dst_h * 0.5]
dst[1, :] = np.array([dst_w * 0.5, dst_h * 0.5]) + dst_dir
dst[2, :] = _get_3rd_point(dst[0, :], dst[1, :])
if inv:
trans = cv2.getAffineTransform(np.float32(dst), np.float32(src))
else:
trans = cv2.getAffineTransform(np.float32(src), np.float32(dst))
return trans
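# Continuing the example numbers above: with center = (200.0, 315.5),
# scale = (2.958, 3.944), rot = 0 and output_size = (192, 256), the padded
# 591.6 x 788.8 source box is mapped onto the 192 x 256 network input with a
# uniform scale of 192 / 591.6 ≈ 0.325.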
def bbox_xyxy2xywh(bbox_xyxy):
"""Transform the bbox format from x1y1x2y2 to xywh.
Args:
bbox_xyxy (np.ndarray): Bounding boxes (with scores), shaped (n, 4) or
(n, 5). (left, top, right, bottom, [score])
Returns:
np.ndarray: Bounding boxes (with scores),
shaped (n, 4) or (n, 5). (left, top, width, height, [score])
"""
bbox_xywh = bbox_xyxy.copy()
bbox_xywh[:, 2] = bbox_xywh[:, 2] - bbox_xywh[:, 0]
bbox_xywh[:, 3] = bbox_xywh[:, 3] - bbox_xywh[:, 1]
return bbox_xywh
def _get_max_preds(heatmaps):
"""Get keypoint predictions from score maps.
Note:
batch_size: N
num_keypoints: K
heatmap height: H
heatmap width: W
Args:
heatmaps (np.ndarray[N, K, H, W]): model predicted heatmaps.
Returns:
tuple: A tuple containing aggregated results.
- preds (np.ndarray[N, K, 2]): Predicted keypoint location.
- maxvals (np.ndarray[N, K, 1]): Scores (confidence) of the keypoints.
"""
assert isinstance(heatmaps,
np.ndarray), ('heatmaps should be numpy.ndarray')
assert heatmaps.ndim == 4, 'batch_images should be 4-ndim'
N, K, _, W = heatmaps.shape
heatmaps_reshaped = heatmaps.reshape((N, K, -1))
idx = np.argmax(heatmaps_reshaped, 2).reshape((N, K, 1))
maxvals = np.amax(heatmaps_reshaped, 2).reshape((N, K, 1))
preds = np.tile(idx, (1, 1, 2)).astype(np.float32)
preds[:, :, 0] = preds[:, :, 0] % W
preds[:, :, 1] = preds[:, :, 1] // W
preds = np.where(np.tile(maxvals, (1, 1, 2)) > 0.0, preds, -1)
return preds, maxvals
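# Tiny example: for a (1, 1, 2, 3) heatmap [[[[0.1, 0.9, 0.2],
# [0.0, 0.3, 0.05]]]], the flattened argmax is index 1 with value 0.9, so
# preds = [[[1., 0.]]] (x = 1 % W, y = 1 // W with W = 3) and
# maxvals = [[[0.9]]].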
def transform_preds(coords, center, scale, output_size, use_udp=False):
"""Get final keypoint predictions from heatmaps and apply scaling and
translation to map them back to the image.
Note:
num_keypoints: K
Args:
coords (np.ndarray[K, ndims]):
            * If ndims=2, coords are predicted keypoint location.
            * If ndims=4, coords are composed of (x, y, scores, tags)
            * If ndims=5, coords are composed of (x, y, scores, tags,
              flipped_tags)
center (np.ndarray[2, ]): Center of the bounding box (x, y).
scale (np.ndarray[2, ]): Scale of the bounding box
wrt [width, height].
output_size (np.ndarray[2, ] | list(2,)): Size of the
destination heatmaps.
use_udp (bool): Use unbiased data processing
Returns:
np.ndarray: Predicted coordinates in the images.
"""
assert coords.shape[1] in (2, 4, 5)
assert len(center) == 2
assert len(scale) == 2
assert len(output_size) == 2
# Recover the scale which is normalized by a factor of 200.
scale = scale * 200.0
if use_udp:
scale_x = scale[0] / (output_size[0] - 1.0)
scale_y = scale[1] / (output_size[1] - 1.0)
else:
scale_x = scale[0] / output_size[0]
scale_y = scale[1] / output_size[1]
target_coords = np.ones_like(coords)
target_coords[:, 0] = coords[:, 0] * scale_x + center[0] - scale[0] * 0.5
target_coords[:, 1] = coords[:, 1] * scale_y + center[1] - scale[1] * 0.5
return target_coords
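# Continuing the example numbers above: HRNet-W32 here produces heatmaps at
# 1/4 of the 192 x 256 input, i.e. W = 48, H = 64, so
# scale_x = 591.6 / 48 ≈ 12.3: each heatmap pixel covers about 12.3 pixels of
# the original image, and the center/scale terms shift the coordinates back
# into the uncropped frame.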
def keypoints_from_heatmaps(heatmaps,
center,
scale,
unbiased=False,
post_process='default',
kernel=11,
valid_radius_factor=0.0546875,
use_udp=False,
target_type='GaussianHeatmap'):
# Avoid being affected
heatmaps = heatmaps.copy()
N, K, H, W = heatmaps.shape
preds, maxvals = _get_max_preds(heatmaps)
# add +/-0.25 shift to the predicted locations for higher acc.
for n in range(N):
for k in range(K):
heatmap = heatmaps[n][k]
px = int(preds[n][k][0])
py = int(preds[n][k][1])
if 1 < px < W - 1 and 1 < py < H - 1:
diff = np.array([
heatmap[py][px + 1] - heatmap[py][px - 1],
heatmap[py + 1][px] - heatmap[py - 1][px]
])
preds[n][k] += np.sign(diff) * .25
if post_process == 'megvii':
preds[n][k] += 0.5
# Transform back to the image
for i in range(N):
preds[i] = transform_preds(
preds[i], center[i], scale[i], [W, H], use_udp=use_udp)
if post_process == 'megvii':
maxvals = maxvals / 255.0 + 0.5
return preds, maxvals
def decode(output, center, scale, score_, batch_size=1):
c = np.zeros((batch_size, 2), dtype=np.float32)
s = np.zeros((batch_size, 2), dtype=np.float32)
score = np.ones(batch_size)
for i in range(batch_size):
c[i, :] = center
s[i, :] = scale
score[i] = np.array(score_).reshape(-1)
preds, maxvals = keypoints_from_heatmaps(
output,
c,
s,
False,
'default',
11,
0.0546875,
False,
'GaussianHeatmap'
)
all_preds = np.zeros((batch_size, preds.shape[1], 3), dtype=np.float32)
all_boxes = np.zeros((batch_size, 6), dtype=np.float32)
all_preds[:, :, 0:2] = preds[:, :, 0:2]
all_preds[:, :, 2:3] = maxvals
all_boxes[:, 0:2] = c[:, 0:2]
all_boxes[:, 2:4] = s[:, 0:2]
all_boxes[:, 4] = np.prod(s * 200.0, axis=1)
all_boxes[:, 5] = score
result = {}
result['preds'] = all_preds
result['boxes'] = all_boxes
print(result)
return result
def draw(bgr,predict_dict,skeleton):
bboxes = predict_dict["boxes"]
for box in bboxes:
cv2.rectangle(bgr, (int(box[0]), int(box[1])), (int(box[0]) + int(box[2]), int(box[1]) + int(box[3])),(255, 0, 0))
all_preds = predict_dict["preds"]
for all_pred in all_preds:
for x,y,s in all_pred:
cv2.circle(bgr,(int(x), int(y)), 3,(0, 255, 120), -1)
for sk in skeleton:
x0= int(all_pred[sk[0]][0])
y0 = int(all_pred[sk[0]][1])
x1 = int(all_pred[sk[1]][0])
y1 = int(all_pred[sk[1]][1])
cv2.line(bgr, (x0, y0), (x1, y1),(0, 255, 0), 1)
cv2.imwrite("result.jpg",bgr)
if __name__ == "__main__":
# Create RKNN object
# rknn = RKNN()
rknn = RKNNLite()
if not os.path.exists(RKNN_MODEL):
print("model not exist")
exit(-1)
# Load ONNX model
print("--> Loading model")
ret = rknn.load_rknn(RKNN_MODEL)
if ret != 0:
print("Load rknn model failed!")
exit(ret)
print("done")
# init runtime environment
print("--> Init runtime environment")
ret = rknn.init_runtime()
if ret != 0:
print("Init runtime environment failed")
exit(ret)
print("done")
# bbox=[2.213932e+02, 1.935179e+02, 9.873443e+02-2.213932e+02, 1.035825e+03-1.935179e+02,9.995332e-01]
bbox=[0,0,400,631,0.99]
image_size=[192,256]
src_img = cv2.imread(IMG_PATH)
# img = src_img
img = cv2.cvtColor(src_img, cv2.COLOR_BGR2RGB) # hwc rgb
aspect_ratio = image_size[0] / image_size[1]
img_height = img.shape[0]
img_width = img.shape[1]
padding=1.25
pixel_std=200
center, scale = bbox_xywh2cs(
bbox,
aspect_ratio,
padding,
pixel_std)
trans = get_affine_transform(center, scale, 0, image_size)
    img = cv2.warpAffine(  # the warp pads with black borders; predicted coordinates must be mapped back to the original image accordingly
img,
trans, (int(image_size[0]), int(image_size[1])),
flags=cv2.INTER_LINEAR)
print(trans)
    img = np.transpose(img, (2, 0, 1)).astype(np.float32)  # HWC -> CHW, RGB
    # outputs = rknn.inference(inputs=[img], data_type=None, data_format="nchw")[0]
    # normalization is handled inside the RKNN model via the mean_values/std_values set in rknn.config(), so it stays disabled here:
    # img[0, ...] = ((img[0, ...] / 255.0) - 0.485) / 0.229
    # img[1, ...] = ((img[1, ...] / 255.0) - 0.456) / 0.224
    # img[2, ...] = ((img[2, ...] / 255.0) - 0.406) / 0.225
    img = np.transpose(img, (1, 2, 0)).astype(np.float32)  # back to HWC, RGB (the two transposes cancel out; kept only because of the disabled normalization above)
# img = img.reshape(1,256,192,3)
# Inference
print("--> Running model")
    start = time.perf_counter()  # time.clock() was removed in Python 3.8
    outputs = rknn.inference(inputs=[img])[0]
    end = time.perf_counter()
    # compute the elapsed time
    runTime = end - start
    runTime_ms = runTime * 1000
    # print the elapsed time
    print("inference time:", runTime_ms, "ms")
print(outputs)
predict_dict=decode(outputs,center,scale,bbox[-1])
skeleton = [[15, 13],[13, 11], [16, 14],[14, 12],[11, 12], [5, 11], [6, 12], [5, 6],[5, 7], [6, 8], [7, 9], [8, 10],[1, 2], [0, 1], [0, 2], [1, 3],[2, 4], [3, 5], [4, 6]]
draw(src_img,predict_dict,skeleton)
    rknn.release()
The result is as follows:
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsIyZuBnL1QWN0I2N2EDNhlDMzMmZhRWNhRzY5EmZjFDMldzM3Y2Lc52YucWbp5GZzNmLn9Gbi1yZtl2Lc9CX6MHc0RHaiojIsJye.png)