
RKNN quantization workflow for HRNet (human skeleton keypoint detection)

I have recently been working on skeleton keypoint detection and needed to quantize the PyTorch (.pth) model with RKNN.

1. First, convert the model to ONNX. The script below is adapted from MMAction2's pytorch2onnx tool, modified to build the MMPose HRNet model with init_pose_model instead of the action recognizer.

# Copyright (c) OpenMMLab. All rights reserved.
import argparse
import warnings

import mmcv
import numpy as np
import torch
from mmcv.runner import load_checkpoint
from mmpose.apis import init_pose_model

from mmaction.models import build_model

try:
    import onnx
    import onnxruntime as rt
except ImportError as e:
    raise ImportError(f'Please install onnx and onnxruntime first. {e}')

try:
    from mmcv.onnx.symbolic import register_extra_symbolics
except ModuleNotFoundError:
    raise NotImplementedError('please update mmcv to version>=1.0.4')


def _convert_batchnorm(module):
    """Convert the syncBNs into normal BN3ds."""
    module_output = module
    if isinstance(module, torch.nn.SyncBatchNorm):
        module_output = torch.nn.BatchNorm3d(module.num_features, module.eps,
                                             module.momentum, module.affine,
                                             module.track_running_stats)
        if module.affine:
            module_output.weight.data = module.weight.data.clone().detach()
            module_output.bias.data = module.bias.data.clone().detach()
            # keep requires_grad unchanged
            module_output.weight.requires_grad = module.weight.requires_grad
            module_output.bias.requires_grad = module.bias.requires_grad
        module_output.running_mean = module.running_mean
        module_output.running_var = module.running_var
        module_output.num_batches_tracked = module.num_batches_tracked
    for name, child in module.named_children():
        module_output.add_module(name, _convert_batchnorm(child))
    del module
    return module_output


def pytorch2onnx(model,
                 input_shape,
                 opset_version=11,
                 show=False,
                 output_file='tmp.onnx',
                 verify=False):
    """Convert pytorch model to onnx model.

    Args:
        model (:obj:`nn.Module`): The pytorch model to be exported.
        input_shape (tuple[int]): The input tensor shape of the model.
        opset_version (int): Opset version of onnx used. Default: 11.
        show (bool): Determines whether to print the onnx model architecture.
            Default: False.
        output_file (str): Output onnx model name. Default: 'tmp.onnx'.
        verify (bool): Determines whether to verify the onnx model.
            Default: False.
    """
    model.cpu().eval()

    input_tensor = torch.randn(input_shape)

    register_extra_symbolics(opset_version)  # opset_version is 11 here
    torch.onnx.export(
        model,
        input_tensor,
        output_file,
        export_params=True,
        keep_initializers_as_inputs=True,
        verbose=show,
        opset_version=opset_version)

    print(f'Successfully exported ONNX model: {output_file}')
    if verify:  # True when --verify is passed
        # check by onnx
        onnx_model = onnx.load(output_file)
        onnx.checker.check_model(onnx_model)  # raises an exception if the model is invalid

        # check the numerical value
        # get pytorch output
        pytorch_result = model(input_tensor)[0].detach().numpy()  # (1, 120) class probabilities in the original recognizer script; here it is the HRNet heatmap output

        # get onnx output
        input_all = [node.name for node in onnx_model.graph.input]  # names of all graph inputs
        input_initializer = [
            node.name for node in onnx_model.graph.initializer  # initializers (weights/constants); with keep_initializers_as_inputs=True they also appear among the graph inputs
        ]
        net_feed_input = list(set(input_all) - set(input_initializer))  # subtracting the initializers leaves only the real data input, e.g. ['onnx::Reshape_0']
        assert len(net_feed_input) == 1
        sess = rt.InferenceSession(output_file)
        onnx_result = sess.run(
            None, {net_feed_input[0]: input_tensor.detach().numpy()})[0]
        # only compare part of the results
        random_class = np.random.randint(pytorch_result.shape[1])
        assert np.allclose(
            pytorch_result[:, random_class], onnx_result[:, random_class]
        ), 'The outputs are different between Pytorch and ONNX'
        print('The numerical values are the same between Pytorch and ONNX')


def parse_args():
    parser = argparse.ArgumentParser(
        description='Convert MMAction2 models to ONNX')
    parser.add_argument('config', help='test config file path')
    parser.add_argument('checkpoint', help='checkpoint file')
    parser.add_argument('--show', action='store_true', help='show onnx graph')
    parser.add_argument('--output-file', type=str, default='tmp.onnx')
    parser.add_argument('--opset-version', type=int, default=11)
    parser.add_argument(
        '--verify',
        action='store_true',
        help='verify the onnx model output against pytorch output')
    parser.add_argument(
        '--is-localizer',
        action='store_true',
        help='whether it is a localizer')
    parser.add_argument(
        '--shape',
        type=int,
        nargs='+',
        # default=[1, 3, 224, 224],
        default=[1, 3, 256, 192],
        # default=[1, 2, 17, 8, 64, 64],  # 17 is the number of keypoints (= channels): $batch $clip $channel $time $height $width
        help='input video size')
    parser.add_argument(
        '--softmax',
        action='store_true',
        help='whether to add softmax layer at the end of recognizers')
    args = parser.parse_args()
    return args


if __name__ == '__main__':
    args = parse_args()

    assert args.opset_version == 11, 'MMAction2 only supports opset 11 now'

    cfg = mmcv.Config.fromfile(args.config)
    # import modules from string list.

    if not args.is_localizer:
        cfg.model.backbone.pretrained = None

    # build the model (modified: build the MMPose HRNet model instead of the MMAction recognizer)
    pose_config = "demo/hrnet_w32_coco_256x192.py"
    pose_checkpoint = "checkpoints/hrnet_w32_coco_256x192-c78dce93_20200708.pth"
    model = init_pose_model(pose_config, pose_checkpoint,
                            'cpu')  # init_pose_model also attaches a model.cfg attribute to the built model
    # model = build_model(
    #     cfg.model, train_cfg=None, test_cfg=cfg.get('test_cfg'))
    model = _convert_batchnorm(model)

    # onnx.export does not support kwargs
    if hasattr(model, 'forward_dummy'):
        from functools import partial
        # model.forward = partial(model.forward_dummy, softmax=args.softmax)
        model.forward = model.forward_dummy
    elif hasattr(model, '_forward') and args.is_localizer:
        model.forward = model._forward
    else:
        raise NotImplementedError(
            'Please implement the forward method for exporting.')

    checkpoint = load_checkpoint(model, args.checkpoint, map_location='cpu')

    # convert model to onnx file
    pytorch2onnx(
        model,
        args.shape,
        opset_version=args.opset_version,
        show=args.show,
        output_file=args.output_file,
        verify=args.verify)

    # Following strings of text style are from colorama package
    bright_style, reset_style = '\x1b[1m', '\x1b[0m'
    red_text, blue_text = '\x1b[31m', '\x1b[34m'
    white_background = '\x1b[107m'

    msg = white_background + bright_style + red_text
    msg += 'DeprecationWarning: This tool will be deprecated in future. '
    msg += blue_text + 'Welcome to use the unified model deployment toolbox '
    msg += 'MMDeploy: https://github.com/open-mmlab/mmdeploy'
    msg += reset_style
    warnings.warn(msg)
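After the script finishes, it is worth sanity-checking the exported hrnet.onnx with onnxruntime before quantizing it. A minimal sketch (the file name hrnet.onnx and the expected heatmap shape assume the HRNet-w32 coco 256x192 model exported with --shape 1 3 256 192):

import numpy as np
import onnxruntime as rt

sess = rt.InferenceSession('hrnet.onnx')
input_name = sess.get_inputs()[0].name            # the single image input
dummy = np.random.randn(1, 3, 256, 192).astype(np.float32)
heatmaps = sess.run(None, {input_name: dummy})[0]
# For HRNet-w32 coco 256x192 this should print (1, 17, 64, 48):
# one 64x48 heatmap per COCO keypoint.
print('output shape:', heatmaps.shape)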


2. After obtaining the ONNX model, convert it to RKNN; the code is as follows.

from rknn.api import RKNN
 
# ONNX_MODEL = 'hrnet_w32_macaque_256x192-f7e9e04f_20210407.onnx'
ONNX_MODEL = 'hrnet.onnx'
# ONNX_MODEL = 'action.onnx'
# RKNN_MODEL = 'hrnet_w32_macaque_256x192-f7e9e04f_20230208.rknn'
RKNN_MODEL = 'hrnet.rknn'
 
if __name__ == '__main__':
 
    # Create RKNN object
    rknn = RKNN(verbose=True)
    # mean = [0.485, 0.456, 0.406]
    # std= [0.229, 0.224, 0.225]
    mean = [123.675, 116.28, 103.53]
    std= [58.395, 57.12, 57.375]
    # pre-process config
    print('--> config model')
    # rknn.config(mean_values=[[0,0, 0]], std_values=[[255 , 255 , 255]], reorder_channel='0 1 2',
    #             target_platform='rk3399pro',
    #             quantized_dtype='asymmetric_affine-u8', optimization_level=3, output_optimize=1)
    # rknn.config(mean_values=[mean], std_values=[std],target_platform='rk3588',quantized_algorithm='normal',quant_img_RGB2BGR=True,
    #             quantized_dtype='asymmetric_quantized-8', optimization_level=3)
    # ImageNet mean and std (channel order is RGB)
    # cv2.imread returns BGR images of shape (H, W, C), but the model expects RGB of shape (H, W, C),
    # so a conversion is naturally needed (hence the quant_img_RGB2BGR option in the commented configs above)
    # rknn.config(target_platform='rk3588',quantized_algorithm='normal',mean_values=[mean], std_values=[std],quant_img_RGB2BGR=True,
    #             quantized_dtype='asymmetric_quantized-8', optimization_level=3)
    rknn.config(target_platform='rk3588',quantized_algorithm='normal',mean_values=[mean], std_values=[std],
                quantized_dtype='asymmetric_quantized-8', optimization_level=3)
    print('done')
 
    print('--> Loading model')
    ret = rknn.load_onnx(model=ONNX_MODEL)
    if ret != 0:
        print('Load model failed!')
        exit(ret)
    print('done')
 
    # Build model
    print('--> Building model')
    ret = rknn.build(do_quantization=True, dataset='dataset.txt')  # ,pre_compile=True
    if ret != 0:
        print('Build failed!')
        exit(ret)
    print('done')
    # Export rknn model
    print('--> Export RKNN model')
    ret = rknn.export_rknn(RKNN_MODEL)
    if ret != 0:
        print('Export rknn model failed!')
        exit(ret)
    print('done')
 
    rknn.release()
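The dataset='dataset.txt' argument points at the quantization calibration set: a plain text file with one image path per line. A small sketch for generating it (the calib_imgs folder is just an example; use a few dozen images that look like the real inference inputs, since the quantization ranges are estimated from them):

import glob

# collect representative calibration images (folder name is an example)
paths = sorted(glob.glob('calib_imgs/*.jpg')) + sorted(glob.glob('calib_imgs/*.png'))
with open('dataset.txt', 'w') as f:
    for p in paths:
        f.write(p + '\n')
print('wrote %d image paths to dataset.txt' % len(paths))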


3. Finally, run inference with the RKNN model to produce the results.

import os
import urllib
import traceback
import time
import sys
import warnings

import numpy as np
import cv2
import torch
from mmcv.parallel import collate, scatter
from torchvision.transforms import functional as F
# from rknn.api import RKNN
from rknnlite.api import RKNNLite
import onnx



# RKNN_MODEL = "hrnet_w32_macaque_256x192-f7e9e04f_20230208.rknn"
RKNN_MODEL = "hrnet.rknn"
ONNX_MODEL = "hrnet.onnx"
IMG_PATH = "1.png"
mean = [0.485, 0.456, 0.406]
std= [0.229, 0.224, 0.225]
QUANTIZE_ON = True

def bbox_xywh2cs(bbox, aspect_ratio, padding=1., pixel_std=200.):
    """Transform the bbox format from (x,y,w,h) into (center, scale)

    Args:
        bbox (ndarray): Single bbox in (x, y, w, h)
        aspect_ratio (float): The expected bbox aspect ratio (w over h)
        padding (float): Bbox padding factor that will be multiplied to scale.
            Default: 1.0
        pixel_std (float): The scale normalization factor. Default: 200.0

    Returns:
        tuple: A tuple containing center and scale.
        - np.ndarray[float32](2,): Center of the bbox (x, y).
        - np.ndarray[float32](2,): Scale of the bbox w & h.
    """

    x, y, w, h = bbox[:4]
    center = np.array([x + w * 0.5, y + h * 0.5], dtype=np.float32)

    if w > aspect_ratio * h:
        h = w * 1.0 / aspect_ratio
    elif w < aspect_ratio * h:
        w = h * aspect_ratio

    scale = np.array([w, h], dtype=np.float32) / pixel_std
    scale = scale * padding

    return center, scale
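# Worked example with the bbox used in the demo below (for illustration only):
#   bbox = [0, 0, 400, 631], aspect_ratio = 192 / 256 = 0.75, padding = 1.25
#   -> center = [200.0, 315.5]
#   -> w (400) < aspect_ratio * h (473.25), so w is widened to 473.25
#   -> scale = [473.25, 631.0] / 200 * 1.25 ≈ [2.958, 3.944]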
def rotate_point(pt, angle_rad):
    """Rotate a point by an angle.

    Args:
        pt (list[float]): 2 dimensional point to be rotated
        angle_rad (float): rotation angle by radian

    Returns:
        list[float]: Rotated point.
    """
    assert len(pt) == 2
    sn, cs = np.sin(angle_rad), np.cos(angle_rad)
    new_x = pt[0] * cs - pt[1] * sn
    new_y = pt[0] * sn + pt[1] * cs
    rotated_pt = [new_x, new_y]

    return rotated_pt
def _get_3rd_point(a, b):
    """To calculate the affine matrix, three pairs of points are required. This
    function is used to get the 3rd point, given 2D points a & b.

    The 3rd point is defined by rotating vector `a - b` by 90 degrees
    anticlockwise, using b as the rotation center.

    Args:
        a (np.ndarray): point(x,y)
        b (np.ndarray): point(x,y)

    Returns:
        np.ndarray: The 3rd point.
    """
    assert len(a) == 2
    assert len(b) == 2
    direction = a - b
    third_pt = b + np.array([-direction[1], direction[0]], dtype=np.float32)

    return third_pt
def get_affine_transform(center,
                         scale,
                         rot,
                         output_size,
                         shift=(0., 0.),
                         inv=False):
    """Get the affine transform matrix, given the center/scale/rot/output_size.

    Args:
        center (np.ndarray[2, ]): Center of the bounding box (x, y).
        scale (np.ndarray[2, ]): Scale of the bounding box
            wrt [width, height].
        rot (float): Rotation angle (degree).
        output_size (np.ndarray[2, ] | list(2,)): Size of the
            destination heatmaps.
        shift (0-100%): Shift translation ratio wrt the width/height.
            Default (0., 0.).
        inv (bool): Option to inverse the affine transform direction.
            (inv=False: src->dst or inv=True: dst->src)

    Returns:
        np.ndarray: The transform matrix.
    """
    assert len(center) == 2
    assert len(scale) == 2
    assert len(output_size) == 2
    assert len(shift) == 2

    # pixel_std is 200.
    scale_tmp = scale * 200.0

    shift = np.array(shift)
    src_w = scale_tmp[0]
    dst_w = output_size[0]
    dst_h = output_size[1]

    rot_rad = np.pi * rot / 180
    src_dir = rotate_point([0., src_w * -0.5], rot_rad)
    dst_dir = np.array([0., dst_w * -0.5])

    src = np.zeros((3, 2), dtype=np.float32)
    src[0, :] = center + scale_tmp * shift
    src[1, :] = center + src_dir + scale_tmp * shift
    src[2, :] = _get_3rd_point(src[0, :], src[1, :])

    dst = np.zeros((3, 2), dtype=np.float32)
    dst[0, :] = [dst_w * 0.5, dst_h * 0.5]
    dst[1, :] = np.array([dst_w * 0.5, dst_h * 0.5]) + dst_dir
    dst[2, :] = _get_3rd_point(dst[0, :], dst[1, :])

    if inv:
        trans = cv2.getAffineTransform(np.float32(dst), np.float32(src))
    else:
        trans = cv2.getAffineTransform(np.float32(src), np.float32(dst))

    return trans
def bbox_xyxy2xywh(bbox_xyxy):
    """Transform the bbox format from x1y1x2y2 to xywh.

    Args:
        bbox_xyxy (np.ndarray): Bounding boxes (with scores), shaped (n, 4) or
            (n, 5). (left, top, right, bottom, [score])

    Returns:
        np.ndarray: Bounding boxes (with scores),
          shaped (n, 4) or (n, 5). (left, top, width, height, [score])
    """
    bbox_xywh = bbox_xyxy.copy()
    bbox_xywh[:, 2] = bbox_xywh[:, 2] - bbox_xywh[:, 0]
    bbox_xywh[:, 3] = bbox_xywh[:, 3] - bbox_xywh[:, 1]

    return bbox_xywh
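# Example: an xyxy box [50, 60, 250, 460] becomes [50, 60, 200, 400] in xywh.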
def _get_max_preds(heatmaps):
    """Get keypoint predictions from score maps.

    Note:
        batch_size: N
        num_keypoints: K
        heatmap height: H
        heatmap width: W

    Args:
        heatmaps (np.ndarray[N, K, H, W]): model predicted heatmaps.

    Returns:
        tuple: A tuple containing aggregated results.

        - preds (np.ndarray[N, K, 2]): Predicted keypoint location.
        - maxvals (np.ndarray[N, K, 1]): Scores (confidence) of the keypoints.
    """
    assert isinstance(heatmaps,
                      np.ndarray), ('heatmaps should be numpy.ndarray')
    assert heatmaps.ndim == 4, 'batch_images should be 4-ndim'

    N, K, _, W = heatmaps.shape
    heatmaps_reshaped = heatmaps.reshape((N, K, -1))
    idx = np.argmax(heatmaps_reshaped, 2).reshape((N, K, 1))
    maxvals = np.amax(heatmaps_reshaped, 2).reshape((N, K, 1))

    preds = np.tile(idx, (1, 1, 2)).astype(np.float32)
    preds[:, :, 0] = preds[:, :, 0] % W
    preds[:, :, 1] = preds[:, :, 1] // W

    preds = np.where(np.tile(maxvals, (1, 1, 2)) > 0.0, preds, -1)
    return preds, maxvals
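# Example: for 64x48 (H x W) heatmaps, a flattened argmax index of 1000
# decodes to x = 1000 % 48 = 40 and y = 1000 // 48 = 20.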
def transform_preds(coords, center, scale, output_size, use_udp=False):
    """Get final keypoint predictions from heatmaps and apply scaling and
    translation to map them back to the image.

    Note:
        num_keypoints: K

    Args:
        coords (np.ndarray[K, ndims]):

            * If ndims=2, coords are predicted keypoint locations.
            * If ndims=4, coords are composed of (x, y, scores, tags)
            * If ndims=5, coords are composed of (x, y, scores, tags,
              flipped_tags)

        center (np.ndarray[2, ]): Center of the bounding box (x, y).
        scale (np.ndarray[2, ]): Scale of the bounding box
            wrt [width, height].
        output_size (np.ndarray[2, ] | list(2,)): Size of the
            destination heatmaps.
        use_udp (bool): Use unbiased data processing

    Returns:
        np.ndarray: Predicted coordinates in the images.
    """
    assert coords.shape[1] in (2, 4, 5)
    assert len(center) == 2
    assert len(scale) == 2
    assert len(output_size) == 2

    # Recover the scale which is normalized by a factor of 200.
    scale = scale * 200.0

    if use_udp:
        scale_x = scale[0] / (output_size[0] - 1.0)
        scale_y = scale[1] / (output_size[1] - 1.0)
    else:
        scale_x = scale[0] / output_size[0]
        scale_y = scale[1] / output_size[1]

    target_coords = np.ones_like(coords)
    target_coords[:, 0] = coords[:, 0] * scale_x + center[0] - scale[0] * 0.5
    target_coords[:, 1] = coords[:, 1] * scale_y + center[1] - scale[1] * 0.5

    return target_coords
def keypoints_from_heatmaps(heatmaps,
                            center,
                            scale,
                            unbiased=False,
                            post_process='default',
                            kernel=11,
                            valid_radius_factor=0.0546875,
                            use_udp=False,
                            target_type='GaussianHeatmap'):
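    """Get final keypoint predictions (in image coordinates) from heatmaps.

    Args:
        heatmaps (np.ndarray[N, K, H, W]): model-predicted heatmaps.
        center (np.ndarray[N, 2]): centers of the person bboxes (x, y).
        scale (np.ndarray[N, 2]): scales of the bboxes, normalized by pixel_std=200.

    Returns:
        tuple: preds (np.ndarray[N, K, 2]) keypoint locations in image
        coordinates, and maxvals (np.ndarray[N, K, 1]) confidence scores.
    """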
    
    # Avoid being affected
    heatmaps = heatmaps.copy()

    N, K, H, W = heatmaps.shape
    preds, maxvals = _get_max_preds(heatmaps)
    # add +/-0.25 shift to the predicted locations for higher acc.
    for n in range(N):
        for k in range(K):
            heatmap = heatmaps[n][k]
            px = int(preds[n][k][0])
            py = int(preds[n][k][1])
            if 1 < px < W - 1 and 1 < py < H - 1:
                diff = np.array([
                    heatmap[py][px + 1] - heatmap[py][px - 1],
                    heatmap[py + 1][px] - heatmap[py - 1][px]
                ])
                preds[n][k] += np.sign(diff) * .25
                if post_process == 'megvii':
                    preds[n][k] += 0.5

    # Transform back to the image
    for i in range(N):
        preds[i] = transform_preds(
            preds[i], center[i], scale[i], [W, H], use_udp=use_udp)

    if post_process == 'megvii':
        maxvals = maxvals / 255.0 + 0.5

    return preds, maxvals

def decode(output, center, scale, score_, batch_size=1):
    c = np.zeros((batch_size, 2), dtype=np.float32)
    s = np.zeros((batch_size, 2), dtype=np.float32)
    score = np.ones(batch_size)
    for i in range(batch_size):
        c[i, :] = center
        s[i, :] = scale
        score[i] = np.array(score_).reshape(-1)

    preds, maxvals = keypoints_from_heatmaps(
        output,
        c,
        s,
        unbiased=False,
        post_process='default',
        kernel=11,
        valid_radius_factor=0.0546875,
        use_udp=False,
        target_type='GaussianHeatmap')

    all_preds = np.zeros((batch_size, preds.shape[1], 3), dtype=np.float32)
    all_boxes = np.zeros((batch_size, 6), dtype=np.float32)
    all_preds[:, :, 0:2] = preds[:, :, 0:2]
    all_preds[:, :, 2:3] = maxvals
    all_boxes[:, 0:2] = c[:, 0:2]
    all_boxes[:, 2:4] = s[:, 0:2]
    all_boxes[:, 4] = np.prod(s * 200.0, axis=1)
    all_boxes[:, 5] = score
    result = {}

    result['preds'] = all_preds
    result['boxes'] = all_boxes

    print(result)
    return result
def draw(bgr,predict_dict,skeleton):
    bboxes = predict_dict["boxes"]
    for box in bboxes:
        cv2.rectangle(bgr, (int(box[0]), int(box[1])), (int(box[0]) + int(box[2]), int(box[1]) + int(box[3])),(255, 0, 0))

    all_preds = predict_dict["preds"]
    for all_pred in all_preds:
        for x,y,s in all_pred:
            cv2.circle(bgr,(int(x), int(y)), 3,(0, 255, 120), -1)
        for sk in skeleton:
            x0 = int(all_pred[sk[0]][0])
            y0 = int(all_pred[sk[0]][1])
            x1 = int(all_pred[sk[1]][0])
            y1 = int(all_pred[sk[1]][1])
            cv2.line(bgr, (x0, y0), (x1, y1),(0, 255, 0), 1)
    cv2.imwrite("result.jpg",bgr)

if __name__ == "__main__":

    # Create RKNN object
    # rknn = RKNN()
    rknn = RKNNLite()

    if not os.path.exists(RKNN_MODEL):
        print("model not exist")
        exit(-1)

    # Load ONNX model
    print("--> Loading model")
    ret = rknn.load_rknn(RKNN_MODEL)
    if ret != 0:
        print("Load rknn model failed!")
        exit(ret)
    print("done")

    # init runtime environment
    print("--> Init runtime environment")
    ret = rknn.init_runtime()
    if ret != 0:
        print("Init runtime environment failed")
        exit(ret)
    print("done")

    # bbox=[2.213932e+02, 1.935179e+02, 9.873443e+02-2.213932e+02, 1.035825e+03-1.935179e+02,9.995332e-01] 
    bbox=[0,0,400,631,0.99]
    image_size=[192,256]
    src_img = cv2.imread(IMG_PATH)
    # img = src_img
    img = cv2.cvtColor(src_img, cv2.COLOR_BGR2RGB)  # hwc rgb
    aspect_ratio = image_size[0] / image_size[1]
    img_height = img.shape[0]
    img_width = img.shape[1]
    padding=1.25
    pixel_std=200
    center, scale = bbox_xywh2cs(
        bbox,
        aspect_ratio,
        padding,
        pixel_std)
    trans = get_affine_transform(center, scale, 0, image_size)
    img = cv2.warpAffine(  # the affine warp pads with black borders; the predicted keypoint coordinates are mapped back to the original image later
        img,
        trans, (int(image_size[0]), int(image_size[1])),
        flags=cv2.INTER_LINEAR)
    print(trans)
    img = np.transpose(img, (2, 0, 1)).astype(np.float32)  # HWC -> CHW, RGB
    # outputs = rknn.inference(inputs=[img], data_type=None, data_format="nchw")[0]
    # Normalization is commented out here because mean/std are already folded
    # into the RKNN model via rknn.config(mean_values=..., std_values=...).
    # img[0, ...] = ((img[0, ...] / 255.0) - 0.485) / 0.229
    # img[1, ...] = ((img[1, ...] / 255.0) - 0.456) / 0.224
    # img[2, ...] = ((img[2, ...] / 255.0) - 0.406) / 0.225
    img = np.transpose(img, (1, 2, 0)).astype(np.float32)  # back to HWC, RGB
    # img = img.reshape(1,256,192,3)
    # Inference
    print("--> Running model")
    start = time.time()  # time.clock() was removed in Python 3.8
    outputs = rknn.inference(inputs=[img])[0]
    end = time.time()
    # compute elapsed time
    runTime = end - start
    runTime_ms = runTime * 1000
    # print elapsed time
    print("Inference time:", runTime_ms, "ms")
    print(outputs)
    predict_dict=decode(outputs,center,scale,bbox[-1])
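    # COCO 17-keypoint skeleton: each pair is a pair of keypoint indices to connect with a line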
    skeleton = [[15, 13],[13, 11], [16, 14],[14, 12],[11, 12], [5, 11], [6, 12], [5, 6],[5, 7], [6, 8], [7, 9], [8, 10],[1, 2], [0, 1], [0, 2], [1, 3],[2, 4], [3, 5], [4, 6]]
    draw(src_img,predict_dict,skeleton)
    rknn.release()


The result is as follows:

(Result image: result.jpg, with the detected keypoints and skeleton drawn on the original image.)
