最近在做骨架點(關鍵點)識別,需要對 PyTorch 的 .pth 模型進行量化。
1. 首先把 .pth 模型轉成 ONNX:
# Copyright (c) OpenMMLab. All rights reserved.
import argparse
import warnings
import mmcv
import numpy as np
import torch
from mmcv.runner import load_checkpoint
from mmpose.apis import init_pose_model
from mmaction.models import build_model
# onnx / onnxruntime are only needed by this export tool, so fail early with
# an actionable message (and keep the original error chained for debugging).
try:
    import onnx
    import onnxruntime as rt
except ImportError as e:
    raise ImportError(
        f'Please install onnx and onnxruntime first. {e}') from e

# register_extra_symbolics is imported from mmcv.onnx.symbolic; per the error
# message below it requires mmcv >= 1.0.4.
try:
    from mmcv.onnx.symbolic import register_extra_symbolics
except ModuleNotFoundError as e:
    raise NotImplementedError(
        'please update mmcv to version>=1.0.4') from e
class _RankAgnosticBatchNorm(torch.nn.BatchNorm3d):
    """``BatchNorm3d`` that skips the strict 5D input check.

    It stays an instance of ``torch.nn.BatchNorm3d`` (so existing
    ``isinstance`` checks keep working) but also accepts the 4D tensors
    produced by 2D models such as the HRNet pose network exported here.
    """

    def _check_input_dim(self, input):
        # Batch norm needs at least (N, C); any trailing spatial rank is fine.
        if input.dim() < 2:
            raise ValueError(
                f'expected at least 2D input (got {input.dim()}D input)')


def _convert_batchnorm(module):
    """Recursively replace every ``SyncBatchNorm`` with a plain batch norm.

    Args:
        module (torch.nn.Module): Module (tree) to convert. Containers are
            modified in place; a ``SyncBatchNorm`` passed directly is
            replaced by a new module.

    Returns:
        torch.nn.Module: ``module`` itself, or its replacement when
        ``module`` is a ``SyncBatchNorm``. Replacements are
        ``BatchNorm3d`` subclasses that accept inputs of any rank >= 2 and
        reuse the original parameters and running statistics.
    """
    module_output = module
    if isinstance(module, torch.nn.SyncBatchNorm):
        module_output = _RankAgnosticBatchNorm(
            module.num_features, module.eps, module.momentum, module.affine,
            module.track_running_stats)
        if module.affine:
            module_output.weight.data = module.weight.data.clone().detach()
            module_output.bias.data = module.bias.data.clone().detach()
            # keep requires_grad unchanged
            module_output.weight.requires_grad = module.weight.requires_grad
            module_output.bias.requires_grad = module.bias.requires_grad
        module_output.running_mean = module.running_mean
        module_output.running_var = module.running_var
        module_output.num_batches_tracked = module.num_batches_tracked
        # A freshly built module is in training mode; mirror the source.
        module_output.training = module.training
    for name, child in module.named_children():
        module_output.add_module(name, _convert_batchnorm(child))
    return module_output
def pytorch2onnx(model,
                 input_shape,
                 opset_version=11,
                 show=False,
                 output_file='tmp.onnx',
                 verify=False):
    """Convert pytorch model to onnx model.

    Args:
        model (:obj:`nn.Module`): The pytorch model to be exported.
        input_shape (tuple[int]): The input tensor shape of the model.
        opset_version (int): Opset version of onnx used. Default: 11.
        show (bool): Determines whether to print the onnx model architecture.
            Default: False.
        output_file (str): Output onnx model name. Default: 'tmp.onnx'.
        verify (bool): Determines whether to verify the onnx model.
            Default: False.

    Raises:
        AssertionError: If ``verify`` is set and the exported graph does not
            expose exactly one data input, or the ONNX output differs from
            the PyTorch output.
    """
    model.cpu().eval()

    input_tensor = torch.randn(input_shape)

    register_extra_symbolics(opset_version)
    torch.onnx.export(
        model,
        input_tensor,
        output_file,
        export_params=True,
        keep_initializers_as_inputs=True,
        verbose=show,
        opset_version=opset_version)

    print(f'Successfully exported ONNX model: {output_file}')
    if not verify:
        return

    # Structural check by onnx; raises if the exported model is malformed.
    onnx_model = onnx.load(output_file)
    onnx.checker.check_model(onnx_model)

    # Reference output from PyTorch. Some models return a tuple/list of
    # outputs, others return the tensor directly; in both cases keep the
    # batch axis so the shape matches the first ONNX output.
    with torch.no_grad():
        pytorch_output = model(input_tensor)
    if isinstance(pytorch_output, (list, tuple)):
        pytorch_output = pytorch_output[0]
    pytorch_result = pytorch_output.detach().numpy()

    # With keep_initializers_as_inputs=True the weights are listed in
    # graph.input as well; removing the initializers leaves the real data
    # input (e.g. ['onnx::Reshape_0']).
    initializer_names = {node.name for node in onnx_model.graph.initializer}
    net_feed_input = [
        node.name for node in onnx_model.graph.input
        if node.name not in initializer_names
    ]
    if len(net_feed_input) != 1:
        raise AssertionError(
            f'expected exactly one data input, found {net_feed_input}')

    sess = rt.InferenceSession(output_file)
    onnx_result = sess.run(
        None, {net_feed_input[0]: input_tensor.detach().numpy()})[0]

    # Compare the complete outputs instead of one hard-coded index, which
    # only made sense for a specific output layout.
    outputs_match = (
        pytorch_result.shape == onnx_result.shape
        and np.allclose(pytorch_result, onnx_result, atol=1e-5))
    if not outputs_match:
        raise AssertionError(
            'The outputs are different between Pytorch and ONNX')
    print('The numerical values are same between Pytorch and ONNX')
def parse_args(argv=None):
    """Parse the command-line options of the ONNX export tool.

    Args:
        argv (list[str] | None): Argument strings to parse. ``None``
            (the default) makes argparse read ``sys.argv[1:]``.

    Returns:
        argparse.Namespace: Parsed options (``config``, ``checkpoint``,
        ``show``, ``output_file``, ``opset_version``, ``verify``,
        ``is_localizer``, ``shape``, ``softmax``).
    """
    parser = argparse.ArgumentParser(
        description='Convert MMAction2 models to ONNX')
    parser.add_argument('config', help='test config file path')
    parser.add_argument('checkpoint', help='checkpoint file')
    parser.add_argument('--show', action='store_true', help='show onnx graph')
    parser.add_argument('--output-file', type=str, default='tmp.onnx')
    parser.add_argument('--opset-version', type=int, default=11)
    parser.add_argument(
        '--verify',
        action='store_true',
        help='verify the onnx model output against pytorch output')
    parser.add_argument(
        '--is-localizer',
        action='store_true',
        help='whether it is a localizer')
    parser.add_argument(
        '--shape',
        type=int,
        nargs='+',
        # Default is the pose-model input (1, 3, 256, 192).
        # Earlier variants used [1, 3, 224, 224] and, for skeleton-based
        # action recognition, [1, 2, 17, 8, 64, 64]
        # ($batch $clip $channel $time $height $width, where 17 is the
        # number of keypoints and also the channel count).
        default=[1, 3, 256, 192],
        help='input video size')
    parser.add_argument(
        '--softmax',
        action='store_true',
        help='wheter to add softmax layer at the end of recognizers')
    return parser.parse_args(argv)
if __name__ == '__main__':
    args = parse_args()

    assert args.opset_version == 11, 'MMAction2 only supports opset 11 now'

    # Build the pose model from the CLI arguments instead of hard-coded
    # paths (previously "demo/hrnet_w32_coco_256x192.py" and
    # "checkpoints/hrnet_w32_coco_256x192-c78dce93_20200708.pth").
    # init_pose_model is given the checkpoint, so no separate
    # load_checkpoint call is needed afterwards; per the original author's
    # note it also attaches the config as ``model.cfg``.
    model = init_pose_model(args.config, args.checkpoint, 'cpu')
    model = _convert_batchnorm(model)

    # onnx.export does not support kwargs, so export through a forward
    # variant that only takes the input tensor.
    if hasattr(model, 'forward_dummy'):
        model.forward = model.forward_dummy
    elif hasattr(model, '_forward') and args.is_localizer:
        model.forward = model._forward
    else:
        raise NotImplementedError(
            'Please implement the forward method for exporting.')

    # convert model to onnx file
    pytorch2onnx(
        model,
        args.shape,
        opset_version=args.opset_version,
        show=args.show,
        output_file=args.output_file,
        verify=args.verify)

    # Following strings of text style are from colorama package
    bright_style, reset_style = '\x1b[1m', '\x1b[0m'
    red_text, blue_text = '\x1b[31m', '\x1b[34m'
    white_background = '\x1b[107m'

    msg = white_background + bright_style + red_text
    msg += 'DeprecationWarning: This tool will be deprecated in future. '
    msg += blue_text + 'Welcome to use the unified model deployment toolbox '
    msg += 'MMDeploy: https://github.com/open-mmlab/mmdeploy'
    msg += reset_style
    warnings.warn(msg)
2. 得到 ONNX 模型後,再把它轉成 RKNN 模型(同時做量化),代碼如下:
import sys

from rknn.api import RKNN

# Input ONNX model and output RKNN model. Earlier runs used
# 'hrnet_w32_macaque_256x192-f7e9e04f_20210407.onnx' / 'action.onnx'.
ONNX_MODEL = 'hrnet.onnx'
RKNN_MODEL = 'hrnet.rknn'

if __name__ == '__main__':
    # Create RKNN object
    rknn = RKNN(verbose=True)
    try:
        # ImageNet mean / std in RGB order, on the 0-255 pixel scale
        # (the 0-1 equivalents are [0.485, 0.456, 0.406] and
        # [0.229, 0.224, 0.225]).
        mean = [123.675, 116.28, 103.53]
        std = [58.395, 57.12, 57.375]

        # pre-process config
        # NOTE(review): images loaded with OpenCV are BGR in (H, W, C)
        # layout while the model expects RGB; an earlier variant passed
        # quant_img_RGB2BGR=True here. Confirm the channel order of the
        # images listed in dataset.txt.
        print('--> config model')
        rknn.config(
            target_platform='rk3588',
            quantized_algorithm='normal',
            mean_values=[mean],
            std_values=[std],
            quantized_dtype='asymmetric_quantized-8',
            optimization_level=3)
        print('done')

        print('--> Loading model')
        ret = rknn.load_onnx(model=ONNX_MODEL)
        if ret != 0:
            print('Load model failed!')
            sys.exit(ret)
        print('done')

        # Build model (quantized using the images listed in dataset.txt)
        print('--> Building model')
        ret = rknn.build(do_quantization=True, dataset='dataset.txt')
        if ret != 0:
            print('Build failed!')
            sys.exit(ret)
        print('done')

        # Export rknn model
        print('--> Export RKNN model')
        ret = rknn.export_rknn(RKNN_MODEL)
        if ret != 0:
            # Report the file that was actually being written.
            print(f'Export {RKNN_MODEL} failed!')
            sys.exit(ret)
        print('done')
    finally:
        # Runs on success and on every sys.exit() path above.
        rknn.release()
3. 最後用 RKNN 模型進行推理並產生結果:
import os
import urllib
import traceback
import time
import sys
import warnings
import numpy as np
import cv2
import torch
from mmcv.parallel import collate, scatter
from torchvision.transforms import functional as F
# from rknn.api import RKNN
from rknnlite.api import RKNNLite
import onnx
# Alternative model file kept for reference:
# RKNN_MODEL = "hrnet_w32_macaque_256x192-f7e9e04f_20230208.rknn"
# RKNN model loaded by the __main__ section of this script.
RKNN_MODEL = "hrnet.rknn"
# NOTE(review): the name looks like a typo of ONNX_MODEL; it is not
# referenced anywhere in the visible code.
ONXX_MODEL = "hrnet.onnx"
# Image read by the __main__ section of this script.
IMG_PATH = "1.png"
# Per-channel mean / std on a 0-1 scale (presumably the ImageNet values).
# Not referenced in the visible code: the manual normalisation that would
# use them is commented out in __main__ -- TODO confirm they can be removed.
mean = [0.485, 0.456, 0.406]
std= [0.229, 0.224, 0.225]
# NOTE(review): not referenced anywhere in the visible code.
QUANTIZE_ON = True
def bbox_xywh2cs(bbox, aspect_ratio, padding=1., pixel_std=200.):
    """Transform the bbox format from (x,y,w,h) into (center, scale)

    The shorter side of the box is enlarged so that the box matches
    ``aspect_ratio`` before the scale is computed; the center is unchanged.

    Args:
        bbox (ndarray): Single bbox in (x, y, w, h)
        aspect_ratio (float): The expected bbox aspect ratio (w over h)
        padding (float): Bbox padding factor that will be multilied to scale.
            Default: 1.0
        pixel_std (float): The scale normalization factor. Default: 200.0

    Returns:
        tuple: A tuple containing center and scale.
        - np.ndarray[float32](2,): Center of the bbox (x, y).
        - np.ndarray[float32](2,): Scale of the bbox w & h.
    """
    x, y, w, h = bbox[:4]
    center = np.array([x + w * 0.5, y + h * 0.5], dtype=np.float32)

    # Grow whichever side is too short for the requested aspect ratio.
    if w > aspect_ratio * h:
        h = w * 1.0 / aspect_ratio
    elif w < aspect_ratio * h:
        w = h * aspect_ratio

    scale = np.array([w, h], dtype=np.float32) / pixel_std
    scale = scale * padding
    return center, scale
def rotate_point(pt, angle_rad):
    """Rotate a point by an angle.

    Args:
        pt (list[float]): 2 dimensional point to be rotated
        angle_rad (float): rotation angle by radian

    Returns:
        list[float]: Rotated point.
    """
    assert len(pt) == 2
    sn, cs = np.sin(angle_rad), np.cos(angle_rad)
    # Standard 2D rotation matrix applied to (pt[0], pt[1]).
    new_x = pt[0] * cs - pt[1] * sn
    new_y = pt[0] * sn + pt[1] * cs
    return [new_x, new_y]
def _get_3rd_point(a, b):
    """To calculate the affine matrix, three pairs of points are required. This
    function is used to get the 3rd point, given 2D points a & b.

    The 3rd point is defined by rotating vector `a - b` by 90 degrees
    anticlockwise, using b as the rotation center.

    Args:
        a (np.ndarray): point(x,y)
        b (np.ndarray): point(x,y)

    Returns:
        np.ndarray: The 3rd point.
    """
    assert len(a) == 2
    assert len(b) == 2
    # np.asarray lets plain lists/tuples be passed as well as arrays.
    a = np.asarray(a)
    b = np.asarray(b)
    direction = a - b
    # (dx, dy) rotated by 90 degrees is (-dy, dx).
    third_pt = b + np.array([-direction[1], direction[0]], dtype=np.float32)
    return third_pt
def get_affine_transform(center,
                         scale,
                         rot,
                         output_size,
                         shift=(0., 0.),
                         inv=False):
    """Get the affine transform matrix, given the center/scale/rot/output_size.

    Args:
        center (np.ndarray[2, ]): Center of the bounding box (x, y).
        scale (np.ndarray[2, ]): Scale of the bounding box
            wrt [width, height].
        rot (float): Rotation angle (degree).
        output_size (np.ndarray[2, ] | list(2,)): Size of the
            destination heatmaps.
        shift (0-100%): Shift translation ratio wrt the width/height.
            Default (0., 0.).
        inv (bool): Option to inverse the affine transform direction.
            (inv=False: src->dst or inv=True: dst->src)

    Returns:
        np.ndarray: The transform matrix.
    """
    assert len(center) == 2
    assert len(scale) == 2
    assert len(output_size) == 2
    assert len(shift) == 2

    # pixel_std is 200. np.asarray allows scale to be given as a list too.
    scale_tmp = np.asarray(scale) * 200.0

    shift = np.array(shift)
    src_w = scale_tmp[0]
    dst_w = output_size[0]
    dst_h = output_size[1]

    rot_rad = np.pi * rot / 180
    src_dir = rotate_point([0., src_w * -0.5], rot_rad)
    dst_dir = np.array([0., dst_w * -0.5])

    # Three point pairs define the affine map: the box center, a second
    # point offset from it by src_dir / dst_dir, and a third point derived
    # from those two.
    src = np.zeros((3, 2), dtype=np.float32)
    src[0, :] = center + scale_tmp * shift
    src[1, :] = center + src_dir + scale_tmp * shift
    src[2, :] = _get_3rd_point(src[0, :], src[1, :])

    dst = np.zeros((3, 2), dtype=np.float32)
    dst[0, :] = [dst_w * 0.5, dst_h * 0.5]
    dst[1, :] = np.array([dst_w * 0.5, dst_h * 0.5]) + dst_dir
    dst[2, :] = _get_3rd_point(dst[0, :], dst[1, :])

    if inv:
        trans = cv2.getAffineTransform(np.float32(dst), np.float32(src))
    else:
        trans = cv2.getAffineTransform(np.float32(src), np.float32(dst))

    return trans
def bbox_xyxy2xywh(bbox_xyxy):
    """Transform the bbox format from x1y1x2y2 to xywh.

    The input array is not modified; a converted copy is returned.

    Args:
        bbox_xyxy (np.ndarray): Bounding boxes (with scores), shaped (n, 4) or
            (n, 5). (left, top, right, bottom, [score])

    Returns:
        np.ndarray: Bounding boxes (with scores),
          shaped (n, 4) or (n, 5). (left, top, width, height, [score])
    """
    bbox_xywh = bbox_xyxy.copy()
    bbox_xywh[:, 2] = bbox_xywh[:, 2] - bbox_xywh[:, 0]
    bbox_xywh[:, 3] = bbox_xywh[:, 3] - bbox_xywh[:, 1]
    return bbox_xywh
def _get_max_preds(heatmaps):
    """Get keypoint predictions from score maps.

    Note:
        batch_size: N
        num_keypoints: K
        heatmap height: H
        heatmap width: W

    Args:
        heatmaps (np.ndarray[N, K, H, W]): model predicted heatmaps.

    Returns:
        tuple: A tuple containing aggregated results.
        - preds (np.ndarray[N, K, 2]): Predicted keypoint location (x, y);
          set to -1 where the heatmap maximum is not positive.
        - maxvals (np.ndarray[N, K, 1]): Scores (confidence) of the keypoints.
    """
    assert isinstance(heatmaps,
                      np.ndarray), ('heatmaps should be numpy.ndarray')
    assert heatmaps.ndim == 4, 'batch_images should be 4-ndim'

    N, K, _, W = heatmaps.shape
    heatmaps_reshaped = heatmaps.reshape((N, K, -1))
    idx = np.argmax(heatmaps_reshaped, 2).reshape((N, K, 1))
    maxvals = np.amax(heatmaps_reshaped, 2).reshape((N, K, 1))

    # Turn the flat argmax index into (x, y) = (idx % W, idx // W).
    preds = np.tile(idx, (1, 1, 2)).astype(np.float32)
    preds[:, :, 0] = preds[:, :, 0] % W
    preds[:, :, 1] = preds[:, :, 1] // W

    # Keypoints whose best score is <= 0 are marked invalid with -1.
    preds = np.where(np.tile(maxvals, (1, 1, 2)) > 0.0, preds, -1)
    return preds, maxvals
def transform_preds(coords, center, scale, output_size, use_udp=False):
    """Get final keypoint predictions from heatmaps and apply scaling and
    translation to map them back to the image.

    Note:
        num_keypoints: K

    Args:
        coords (np.ndarray[K, ndims]):

            * If ndims=2, corrds are predicted keypoint location.
            * If ndims=4, corrds are composed of (x, y, scores, tags)
            * If ndims=5, corrds are composed of (x, y, scores, tags,
              flipped_tags)

        center (np.ndarray[2, ]): Center of the bounding box (x, y).
        scale (np.ndarray[2, ]): Scale of the bounding box
            wrt [width, height].
        output_size (np.ndarray[2, ] | list(2,)): Size of the
            destination heatmaps.
        use_udp (bool): Use unbiased data processing

    Returns:
        np.ndarray: Predicted coordinates in the images. Only columns 0 and
        1 are computed; any further columns are left at 1.
    """
    assert coords.shape[1] in (2, 4, 5)
    assert len(center) == 2
    assert len(scale) == 2
    assert len(output_size) == 2

    # Recover the scale which is normalized by a factor of 200.
    scale = scale * 200.0

    if use_udp:
        scale_x = scale[0] / (output_size[0] - 1.0)
        scale_y = scale[1] / (output_size[1] - 1.0)
    else:
        scale_x = scale[0] / output_size[0]
        scale_y = scale[1] / output_size[1]

    # Heatmap coordinate -> image coordinate: scale up, then offset by the
    # top-left corner of the box (center - size / 2).
    target_coords = np.ones_like(coords)
    target_coords[:, 0] = coords[:, 0] * scale_x + center[0] - scale[0] * 0.5
    target_coords[:, 1] = coords[:, 1] * scale_y + center[1] - scale[1] * 0.5

    return target_coords
def keypoints_from_heatmaps(heatmaps,
                            center,
                            scale,
                            unbiased=False,
                            post_process='default',
                            kernel=11,
                            valid_radius_factor=0.0546875,
                            use_udp=False,
                            target_type='GaussianHeatmap'):
    """Decode heatmaps into keypoint coordinates in the source image.

    Args:
        heatmaps (np.ndarray[N, K, H, W]): Model predicted heatmaps.
        center (np.ndarray[N, 2]): Center of each bounding box (x, y).
        scale (np.ndarray[N, 2]): Scale of each bounding box
            wrt [width, height].
        unbiased (bool): Accepted for interface compatibility; not used by
            this implementation.
        post_process (str | None): ``None`` disables the quarter-pixel
            refinement. ``'megvii'`` additionally shifts refined
            predictions by 0.5 and rescales the scores. Any other value
            applies only the quarter-pixel refinement.
        kernel (int): Accepted for interface compatibility; not used.
        valid_radius_factor (float): Accepted for interface compatibility;
            not used.
        use_udp (bool): Only forwarded to ``transform_preds``.
        target_type (str): Accepted for interface compatibility; not used.

    Returns:
        tuple: A tuple containing keypoint predictions and scores.
        - preds (np.ndarray[N, K, 2]): Predicted keypoint location in images.
        - maxvals (np.ndarray[N, K, 1]): Scores (confidence) of the keypoints.
    """
    # Avoid being affected
    heatmaps = heatmaps.copy()

    N, K, H, W = heatmaps.shape
    preds, maxvals = _get_max_preds(heatmaps)

    if post_process is not None:
        # add +/-0.25 shift to the predicted locations for higher acc.
        for n in range(N):
            for k in range(K):
                heatmap = heatmaps[n][k]
                px = int(preds[n][k][0])
                py = int(preds[n][k][1])
                # Needs valid neighbours on both sides of the maximum.
                if 1 < px < W - 1 and 1 < py < H - 1:
                    diff = np.array([
                        heatmap[py][px + 1] - heatmap[py][px - 1],
                        heatmap[py + 1][px] - heatmap[py - 1][px]
                    ])
                    preds[n][k] += np.sign(diff) * .25
                    if post_process == 'megvii':
                        preds[n][k] += 0.5

    # Transform back to the image
    for i in range(N):
        preds[i] = transform_preds(
            preds[i], center[i], scale[i], [W, H], use_udp=use_udp)

    if post_process == 'megvii':
        maxvals = maxvals / 255.0 + 0.5

    return preds, maxvals
def decode(output, center, scale, score_, batch_size=1):
    """Turn raw heatmaps into keypoints and a box summary, and print them.

    The same ``center``, ``scale`` and ``score_`` are used for every item
    of the batch.

    Args:
        output (np.ndarray[N, K, H, W]): Heatmaps produced by the model.
        center (np.ndarray[2, ]): Center of the bounding box (x, y).
        scale (np.ndarray[2, ]): Scale of the bounding box
            wrt [width, height], normalized by 200.
        score_ (float | array-like): Bounding box score; only its first
            element is used.
        batch_size (int): Number of items in ``output``. Default: 1.

    Returns:
        dict: ``'preds'`` is an array of shape (batch_size, K, 3) holding
        (x, y, score) per keypoint; ``'boxes'`` is an array of shape
        (batch_size, 6) holding (center_x, center_y, scale_w, scale_h,
        area, score).
    """
    c = np.zeros((batch_size, 2), dtype=np.float32)
    s = np.zeros((batch_size, 2), dtype=np.float32)
    c[:] = center
    s[:] = scale
    # Extract a real scalar: assigning a 1-element array to a single slot
    # relies on an array-to-scalar conversion deprecated in NumPy >= 1.25.
    score = np.full(batch_size, float(np.asarray(score_).reshape(-1)[0]))

    preds, maxvals = keypoints_from_heatmaps(
        output,
        c,
        s,
        unbiased=False,
        post_process='default',
        kernel=11,
        valid_radius_factor=0.0546875,
        use_udp=False,
        target_type='GaussianHeatmap')

    all_preds = np.zeros((batch_size, preds.shape[1], 3), dtype=np.float32)
    all_boxes = np.zeros((batch_size, 6), dtype=np.float32)
    all_preds[:, :, 0:2] = preds[:, :, 0:2]
    all_preds[:, :, 2:3] = maxvals
    all_boxes[:, 0:2] = c[:, 0:2]
    all_boxes[:, 2:4] = s[:, 0:2]
    # Box area in pixels (scale is normalized by 200).
    all_boxes[:, 4] = np.prod(s * 200.0, axis=1)
    all_boxes[:, 5] = score

    result = {}
    result['preds'] = all_preds
    result['boxes'] = all_boxes
    print(result)
    return result
def draw(bgr, predict_dict, skeleton):
    """Draw boxes, keypoints and skeleton on ``bgr`` and save the image.

    Args:
        bgr (np.ndarray): Image to draw on; it is modified in place.
        predict_dict (dict): Result of ``decode``. ``'boxes'`` rows start
            with (center_x, center_y, scale_w, scale_h) where the scale is
            normalized by 200; ``'preds'`` rows are (x, y, score) per
            keypoint.
        skeleton (list[list[int]]): Pairs of keypoint indices to connect.

    The annotated image is written to ``result.jpg`` in the working
    directory.
    """
    # 'boxes' stores (center, normalized scale), not (x, y, w, h): convert
    # back to pixel corners before drawing. The drawn rectangle is the
    # padded, aspect-ratio-adjusted region described by center/scale.
    pixel_std = 200.0
    for box in predict_dict["boxes"]:
        half_w = float(box[2]) * pixel_std * 0.5
        half_h = float(box[3]) * pixel_std * 0.5
        top_left = (int(box[0] - half_w), int(box[1] - half_h))
        bottom_right = (int(box[0] + half_w), int(box[1] + half_h))
        cv2.rectangle(bgr, top_left, bottom_right, (255, 0, 0))

    for all_pred in predict_dict["preds"]:
        for x, y, _ in all_pred:
            cv2.circle(bgr, (int(x), int(y)), 3, (0, 255, 120), -1)
        for sk in skeleton:
            x0 = int(all_pred[sk[0]][0])
            y0 = int(all_pred[sk[0]][1])
            x1 = int(all_pred[sk[1]][0])
            y1 = int(all_pred[sk[1]][1])
            cv2.line(bgr, (x0, y0), (x1, y1), (0, 255, 0), 1)

    cv2.imwrite("result.jpg", bgr)
if __name__ == "__main__":
    if not os.path.exists(RKNN_MODEL):
        print("model not exist")
        sys.exit(-1)

    # Create RKNN object
    rknn = RKNNLite()
    try:
        # Load RKNN model
        print("--> Loading model")
        ret = rknn.load_rknn(RKNN_MODEL)
        if ret != 0:
            print("Load rknn model failed!")
            sys.exit(ret)
        print("done")

        # init runtime environment
        print("--> Init runtime environment")
        ret = rknn.init_runtime()
        if ret != 0:
            print("Init runtime environment failed")
            sys.exit(ret)
        print("done")

        # Person box as (x, y, w, h, score) and model input size (w, h).
        bbox = [0, 0, 400, 631, 0.99]
        image_size = [192, 256]

        src_img = cv2.imread(IMG_PATH)
        if src_img is None:
            # cv2.imread returns None instead of raising on failure.
            print("failed to read image:", IMG_PATH)
            sys.exit(-1)
        img = cv2.cvtColor(src_img, cv2.COLOR_BGR2RGB)  # hwc rgb

        aspect_ratio = image_size[0] / image_size[1]
        padding = 1.25
        pixel_std = 200
        center, scale = bbox_xywh2cs(
            bbox,
            aspect_ratio,
            padding,
            pixel_std)

        # Crop/resize the person box to the model input size. The warp
        # pads with black borders, so predicted points must be mapped back
        # with the same center/scale (done in decode).
        trans = get_affine_transform(center, scale, 0, image_size)
        img = cv2.warpAffine(
            img,
            trans, (int(image_size[0]), int(image_size[1])),
            flags=cv2.INTER_LINEAR)
        print(trans)

        # Keep HWC RGB layout; only the dtype changes. No manual mean/std
        # normalisation is applied here.
        # NOTE(review): presumably the normalisation is handled by the
        # mean/std configured when the RKNN model was built -- confirm.
        img = img.astype(np.float32)

        # Inference
        print("--> Running model")
        # time.clock() was removed in Python 3.8; perf_counter() is the
        # documented replacement for timing short intervals.
        start = time.perf_counter()
        outputs = rknn.inference(inputs=[img])[0]
        end = time.perf_counter()
        # Elapsed inference time in milliseconds.
        runTime = end - start
        runTime_ms = runTime * 1000
        print("運作時間:", runTime_ms, "毫秒")
        print(outputs)

        predict_dict = decode(outputs, center, scale, bbox[-1])
        skeleton = [[15, 13], [13, 11], [16, 14], [14, 12], [11, 12],
                    [5, 11], [6, 12], [5, 6], [5, 7], [6, 8], [7, 9],
                    [8, 10], [1, 2], [0, 1], [0, 2], [1, 3], [2, 4],
                    [3, 5], [4, 6]]
        draw(src_img, predict_dict, skeleton)
    finally:
        # Free the runtime on success and on every sys.exit() path above.
        rknn.release()
結果如下
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsIyZuBnL1QWN0I2N2EDNhlDMzMmZhRWNhRzY5EmZjFDMldzM3Y2Lc52YucWbp5GZzNmLn9Gbi1yZtl2Lc9CX6MHc0RHaiojIsJye.png)