
RKNN quantization workflow for HRNet [human skeleton keypoint detection]

I have recently been working on skeleton keypoint detection and needed to quantize a .pth model.

1. First, convert the model to ONNX

# Copyright (c) OpenMMLab. All rights reserved.
import argparse
import warnings

import mmcv
import numpy as np
import torch
from mmcv.runner import load_checkpoint
from mmpose.apis import init_pose_model

from mmaction.models import build_model

try:
    import onnx
    import onnxruntime as rt
except ImportError as e:
    raise ImportError(f'Please install onnx and onnxruntime first. {e}')

try:
    from mmcv.onnx.symbolic import register_extra_symbolics
except ModuleNotFoundError:
    raise NotImplementedError('please update mmcv to version>=1.0.4')


def _convert_batchnorm(module):
    """Convert the syncBNs into normal BN3ds."""
    module_output = module
    if isinstance(module, torch.nn.SyncBatchNorm):
        module_output = torch.nn.BatchNorm3d(module.num_features, module.eps,
                                             module.momentum, module.affine,
                                             module.track_running_stats)
        if module.affine:
            module_output.weight.data = module.weight.data.clone().detach()
            module_output.bias.data = module.bias.data.clone().detach()
            # keep requires_grad unchanged
            module_output.weight.requires_grad = module.weight.requires_grad
            module_output.bias.requires_grad = module.bias.requires_grad
        module_output.running_mean = module.running_mean
        module_output.running_var = module.running_var
        module_output.num_batches_tracked = module.num_batches_tracked
    for name, child in module.named_children():
        module_output.add_module(name, _convert_batchnorm(child))
    del module
    return module_output


def pytorch2onnx(model,
                 input_shape,
                 opset_version=11,
                 show=False,
                 output_file='tmp.onnx',
                 verify=False):
    """Convert pytorch model to onnx model.

    Args:
        model (:obj:`nn.Module`): The pytorch model to be exported.
        input_shape (tuple[int]): The input tensor shape of the model.
        opset_version (int): Opset version of onnx used. Default: 11.
        show (bool): Determines whether to print the onnx model architecture.
            Default: False.
        output_file (str): Output onnx model name. Default: 'tmp.onnx'.
        verify (bool): Determines whether to verify the onnx model.
            Default: False.
    """
    model.cpu().eval()

    input_tensor = torch.randn(input_shape)

    register_extra_symbolics(opset_version)  # opset_version == 11
    torch.onnx.export(
        model,
        input_tensor,
        output_file,
        export_params=True,
        keep_initializers_as_inputs=True,
        verbose=show,
        opset_version=opset_version)

    print(f'Successfully exported ONNX model: {output_file}')
    if verify:  # True when --verify is passed
        # check by onnx
        onnx_model = onnx.load(output_file)
        onnx.checker.check_model(onnx_model)  # raises an exception if the model is invalid

        # check the numerical value
        # get pytorch output
        pytorch_result = model(input_tensor)[0].detach().numpy()  # (1, 120): probability of each of the 120 classes

        # get onnx output
        input_all = [node.name for node in onnx_model.graph.input]  # names of the graph's input nodes
        input_initializer = [
            node.name for node in onnx_model.graph.initializer  # initializers (a list of TensorProto): weights and constants; intermediate layers take inputs both from previous layers and from these external parameters
        ]
        net_feed_input = list(set(input_all) - set(input_initializer))  # the set difference leaves only the real feed inputs, e.g. ['onnx::Reshape_0']
        assert len(net_feed_input) == 1
        sess = rt.InferenceSession(output_file)
        onnx_result = sess.run(
            None, {net_feed_input[0]: input_tensor.detach().numpy()})[0]
        # only compare part of the results
        random_class = np.random.randint(pytorch_result.shape[1])
        assert np.allclose(
            pytorch_result[:, random_class], onnx_result[:, random_class]
        ), 'The outputs are different between Pytorch and ONNX'
        print('The numerical values are the same between Pytorch and ONNX')


def parse_args():
    parser = argparse.ArgumentParser(
        description='Convert MMAction2 models to ONNX')
    parser.add_argument('config', help='test config file path')
    parser.add_argument('checkpoint', help='checkpoint file')
    parser.add_argument('--show', action='store_true', help='show onnx graph')
    parser.add_argument('--output-file', type=str, default='tmp.onnx')
    parser.add_argument('--opset-version', type=int, default=11)
    parser.add_argument(
        '--verify',
        action='store_true',
        help='verify the onnx model output against pytorch output')
    parser.add_argument(
        '--is-localizer',
        action='store_true',
        help='whether it is a localizer')
    parser.add_argument(
        '--shape',
        type=int,
        nargs='+',
        # default=[1, 3, 224, 224],
        default=[1, 3, 256, 192],
        # default=[1, 2, 17, 8, 64, 64],  # 17 is the number of keypoints (= channels): $batch $clip $channel $time $height $width
        help='input image size')
    parser.add_argument(
        '--softmax',
        action='store_true',
        help='whether to add softmax layer at the end of recognizers')
    args = parser.parse_args()
    return args


if __name__ == '__main__':
    args = parse_args()

    assert args.opset_version == 11, 'MMAction2 only supports opset 11 now'

    cfg = mmcv.Config.fromfile(args.config)
    # import modules from string list.

    if not args.is_localizer:
        cfg.model.backbone.pretrained = None

    # build the model (modified: build the HRNet pose model directly)
    pose_config = "demo/hrnet_w32_coco_256x192.py"
    pose_checkpoint = "checkpoints/hrnet_w32_coco_256x192-c78dce93_20200708.pth"
    model = init_pose_model(pose_config, pose_checkpoint,
                            'cpu')  # after construction the model also carries a model.cfg attribute
    # model = build_model(
    #     cfg.model, train_cfg=None, test_cfg=cfg.get('test_cfg'))
    model = _convert_batchnorm(model)

    # onnx.export does not support kwargs
    if hasattr(model, 'forward_dummy'):
        from functools import partial
        # model.forward = partial(model.forward_dummy, softmax=args.softmax)
        model.forward = model.forward_dummy
    elif hasattr(model, '_forward') and args.is_localizer:
        model.forward = model._forward
    else:
        raise NotImplementedError(
            'Please implement the forward method for exporting.')

    checkpoint = load_checkpoint(model, args.checkpoint, map_location='cpu')

    # convert model to onnx file
    pytorch2onnx(
        model,
        args.shape,
        opset_version=args.opset_version,
        show=args.show,
        output_file=args.output_file,
        verify=args.verify)

    # Following strings of text style are from colorama package
    bright_style, reset_style = '\x1b[1m', '\x1b[0m'
    red_text, blue_text = '\x1b[31m', '\x1b[34m'
    white_background = '\x1b[107m'

    msg = white_background + bright_style + red_text
    msg += 'DeprecationWarning: This tool will be deprecated in future. '
    msg += blue_text + 'Welcome to use the unified model deployment toolbox '
    msg += 'MMDeploy: https://github.com/open-mmlab/mmdeploy'
    msg += reset_style
    warnings.warn(msg)
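
Before moving on, it is worth a quick sanity check of the exported file. The snippet below is not part of the original tool; it is a minimal sketch that assumes the export was saved as hrnet.onnx (the name the RKNN step below expects) and simply prints the input/output signature that the RKNN toolkit will later rely on.

import onnx
import onnxruntime as rt

onnx_path = 'hrnet.onnx'  # assumed output name of the export step

# structural check: raises an exception if the graph is malformed
onnx.checker.check_model(onnx.load(onnx_path))

# print the I/O signature that the RKNN conversion will see
sess = rt.InferenceSession(onnx_path)
for inp in sess.get_inputs():
    print('input :', inp.name, inp.shape)   # expected: [1, 3, 256, 192]
for out in sess.get_outputs():
    print('output:', out.name, out.shape)   # typically [1, 17, 64, 48] heatmaps for HRNet-W32 256x192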

           

2. After obtaining the ONNX model, convert it to RKNN. The code is as follows:

from rknn.api import RKNN
 
# ONNX_MODEL = 'hrnet_w32_macaque_256x192-f7e9e04f_20210407.onnx'
ONNX_MODEL = 'hrnet.onnx'
# ONNX_MODEL = 'action.onnx'
# RKNN_MODEL = 'hrnet_w32_macaque_256x192-f7e9e04f_20230208.rknn'
RKNN_MODEL = 'hrnet.rknn'
 
if __name__ == '__main__':
 
    # Create RKNN object
    rknn = RKNN(verbose=True)
    # mean = [0.485, 0.456, 0.406]
    # std= [0.229, 0.224, 0.225]
    mean = [123.675, 116.28, 103.53]
    std= [58.395, 57.12, 57.375]
    # pre-process config
    print('--> config model')
    # rknn.config(mean_values=[[0,0, 0]], std_values=[[255 , 255 , 255]], reorder_channel='0 1 2',
    #             target_platform='rk3399pro',
    #             quantized_dtype='asymmetric_affine-u8', optimization_level=3, output_optimize=1)
    # rknn.config(mean_values=[mean], std_values=[std],target_platform='rk3588',quantized_algorithm='normal',quant_img_RGB2BGR=True,
    #             quantized_dtype='asymmetric_quantized-8', optimization_level=3)
    # ImageNet mean and std (channel order is RGB)
    # cv2.imread gives BGR images with shape (H, W, C), while the model usually expects RGB (H, W, C), so a conversion is needed
    # rknn.config(target_platform='rk3588',quantized_algorithm='normal',mean_values=[mean], std_values=[std],quant_img_RGB2BGR=True,
    #             quantized_dtype='asymmetric_quantized-8', optimization_level=3)
    rknn.config(target_platform='rk3588',quantized_algorithm='normal',mean_values=[mean], std_values=[std],
                quantized_dtype='asymmetric_quantized-8', optimization_level=3)
    print('done')
 
    print('--> Loading model')
    ret = rknn.load_onnx(model=ONNX_MODEL)
    if ret != 0:
        print('Load model failed!')
        exit(ret)
    print('done')
 
    # Build model
    print('--> Building model')
    ret = rknn.build(do_quantization=True, dataset='dataset.txt')  # ,pre_compile=True
    if ret != 0:
        print('Build failed!')
        exit(ret)
    print('done')
    # Export rknn model
    print('--> Export RKNN model')
    ret = rknn.export_rknn(RKNN_MODEL)
    if ret != 0:
        print('Export rknn model failed!')
        exit(ret)
    print('done')
 
    rknn.release()
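
Note that the dataset.txt passed to rknn.build is the quantization calibration list: a plain-text file with one image path per line, ideally drawn from the same distribution as the deployment data (here, 256x192 person crops). As a minimal sketch, assuming the calibration images live in a hypothetical calib_images/ folder, it can be generated like this:

import glob

# collect calibration images (calib_images/ is only an example path)
paths = sorted(glob.glob('calib_images/*.jpg'))

with open('dataset.txt', 'w') as f:
    for p in paths:
        f.write(p + '\n')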

           

3. Finally, run inference with the RKNN model to produce the results

import os
import urllib
import traceback
import time
import sys
import warnings

import numpy as np
import cv2
import torch
from mmcv.parallel import collate, scatter
from torchvision.transforms import functional as F
# from rknn.api import RKNN
from rknnlite.api import RKNNLite
import onnx



# RKNN_MODEL = "hrnet_w32_macaque_256x192-f7e9e04f_20230208.rknn"
RKNN_MODEL = "hrnet.rknn"
ONNX_MODEL = "hrnet.onnx"
IMG_PATH = "1.png"
mean = [0.485, 0.456, 0.406]
std= [0.229, 0.224, 0.225]
QUANTIZE_ON = True

def bbox_xywh2cs(bbox, aspect_ratio, padding=1., pixel_std=200.):
    """Transform the bbox format from (x,y,w,h) into (center, scale)

    Args:
        bbox (ndarray): Single bbox in (x, y, w, h)
        aspect_ratio (float): The expected bbox aspect ratio (w over h)
        padding (float): Bbox padding factor that will be multiplied to scale.
            Default: 1.0
        pixel_std (float): The scale normalization factor. Default: 200.0

    Returns:
        tuple: A tuple containing center and scale.
        - np.ndarray[float32](2,): Center of the bbox (x, y).
        - np.ndarray[float32](2,): Scale of the bbox w & h.
    """

    x, y, w, h = bbox[:4]
    center = np.array([x + w * 0.5, y + h * 0.5], dtype=np.float32)

    if w > aspect_ratio * h:
        h = w * 1.0 / aspect_ratio
    elif w < aspect_ratio * h:
        w = h * aspect_ratio

    scale = np.array([w, h], dtype=np.float32) / pixel_std
    scale = scale * padding

    return center, scale
def rotate_point(pt, angle_rad):
    """Rotate a point by an angle.

    Args:
        pt (list[float]): 2 dimensional point to be rotated
        angle_rad (float): rotation angle by radian

    Returns:
        list[float]: Rotated point.
    """
    assert len(pt) == 2
    sn, cs = np.sin(angle_rad), np.cos(angle_rad)
    new_x = pt[0] * cs - pt[1] * sn
    new_y = pt[0] * sn + pt[1] * cs
    rotated_pt = [new_x, new_y]

    return rotated_pt
def _get_3rd_point(a, b):
    """To calculate the affine matrix, three pairs of points are required. This
    function is used to get the 3rd point, given 2D points a & b.

    The 3rd point is defined by rotating vector `a - b` by 90 degrees
    anticlockwise, using b as the rotation center.

    Args:
        a (np.ndarray): point(x,y)
        b (np.ndarray): point(x,y)

    Returns:
        np.ndarray: The 3rd point.
    """
    assert len(a) == 2
    assert len(b) == 2
    direction = a - b
    third_pt = b + np.array([-direction[1], direction[0]], dtype=np.float32)

    return third_pt
def get_affine_transform(center,
                         scale,
                         rot,
                         output_size,
                         shift=(0., 0.),
                         inv=False):
    """Get the affine transform matrix, given the center/scale/rot/output_size.

    Args:
        center (np.ndarray[2, ]): Center of the bounding box (x, y).
        scale (np.ndarray[2, ]): Scale of the bounding box
            wrt [width, height].
        rot (float): Rotation angle (degree).
        output_size (np.ndarray[2, ] | list(2,)): Size of the
            destination heatmaps.
        shift (0-100%): Shift translation ratio wrt the width/height.
            Default (0., 0.).
        inv (bool): Option to inverse the affine transform direction.
            (inv=False: src->dst or inv=True: dst->src)

    Returns:
        np.ndarray: The transform matrix.
    """
    assert len(center) == 2
    assert len(scale) == 2
    assert len(output_size) == 2
    assert len(shift) == 2

    # pixel_std is 200.
    scale_tmp = scale * 200.0

    shift = np.array(shift)
    src_w = scale_tmp[0]
    dst_w = output_size[0]
    dst_h = output_size[1]

    rot_rad = np.pi * rot / 180
    src_dir = rotate_point([0., src_w * -0.5], rot_rad)
    dst_dir = np.array([0., dst_w * -0.5])

    src = np.zeros((3, 2), dtype=np.float32)
    src[0, :] = center + scale_tmp * shift
    src[1, :] = center + src_dir + scale_tmp * shift
    src[2, :] = _get_3rd_point(src[0, :], src[1, :])

    dst = np.zeros((3, 2), dtype=np.float32)
    dst[0, :] = [dst_w * 0.5, dst_h * 0.5]
    dst[1, :] = np.array([dst_w * 0.5, dst_h * 0.5]) + dst_dir
    dst[2, :] = _get_3rd_point(dst[0, :], dst[1, :])

    if inv:
        trans = cv2.getAffineTransform(np.float32(dst), np.float32(src))
    else:
        trans = cv2.getAffineTransform(np.float32(src), np.float32(dst))

    return trans
def bbox_xyxy2xywh(bbox_xyxy):
    """Transform the bbox format from x1y1x2y2 to xywh.

    Args:
        bbox_xyxy (np.ndarray): Bounding boxes (with scores), shaped (n, 4) or
            (n, 5). (left, top, right, bottom, [score])

    Returns:
        np.ndarray: Bounding boxes (with scores),
          shaped (n, 4) or (n, 5). (left, top, width, height, [score])
    """
    bbox_xywh = bbox_xyxy.copy()
    bbox_xywh[:, 2] = bbox_xywh[:, 2] - bbox_xywh[:, 0]
    bbox_xywh[:, 3] = bbox_xywh[:, 3] - bbox_xywh[:, 1]

    return bbox_xywh
def _get_max_preds(heatmaps):
    """Get keypoint predictions from score maps.

    Note:
        batch_size: N
        num_keypoints: K
        heatmap height: H
        heatmap width: W

    Args:
        heatmaps (np.ndarray[N, K, H, W]): model predicted heatmaps.

    Returns:
        tuple: A tuple containing aggregated results.

        - preds (np.ndarray[N, K, 2]): Predicted keypoint location.
        - maxvals (np.ndarray[N, K, 1]): Scores (confidence) of the keypoints.
    """
    assert isinstance(heatmaps,
                      np.ndarray), ('heatmaps should be numpy.ndarray')
    assert heatmaps.ndim == 4, 'batch_images should be 4-ndim'

    N, K, _, W = heatmaps.shape
    heatmaps_reshaped = heatmaps.reshape((N, K, -1))
    idx = np.argmax(heatmaps_reshaped, 2).reshape((N, K, 1))
    maxvals = np.amax(heatmaps_reshaped, 2).reshape((N, K, 1))

    preds = np.tile(idx, (1, 1, 2)).astype(np.float32)
    preds[:, :, 0] = preds[:, :, 0] % W
    preds[:, :, 1] = preds[:, :, 1] // W

    preds = np.where(np.tile(maxvals, (1, 1, 2)) > 0.0, preds, -1)
    return preds, maxvals
def transform_preds(coords, center, scale, output_size, use_udp=False):
    """Get final keypoint predictions from heatmaps and apply scaling and
    translation to map them back to the image.

    Note:
        num_keypoints: K

    Args:
        coords (np.ndarray[K, ndims]):

            * If ndims=2, coords are predicted keypoint locations.
            * If ndims=4, coords are composed of (x, y, scores, tags)
            * If ndims=5, coords are composed of (x, y, scores, tags,
              flipped_tags)

        center (np.ndarray[2, ]): Center of the bounding box (x, y).
        scale (np.ndarray[2, ]): Scale of the bounding box
            wrt [width, height].
        output_size (np.ndarray[2, ] | list(2,)): Size of the
            destination heatmaps.
        use_udp (bool): Use unbiased data processing

    Returns:
        np.ndarray: Predicted coordinates in the images.
    """
    assert coords.shape[1] in (2, 4, 5)
    assert len(center) == 2
    assert len(scale) == 2
    assert len(output_size) == 2

    # Recover the scale which is normalized by a factor of 200.
    scale = scale * 200.0

    if use_udp:
        scale_x = scale[0] / (output_size[0] - 1.0)
        scale_y = scale[1] / (output_size[1] - 1.0)
    else:
        scale_x = scale[0] / output_size[0]
        scale_y = scale[1] / output_size[1]

    target_coords = np.ones_like(coords)
    target_coords[:, 0] = coords[:, 0] * scale_x + center[0] - scale[0] * 0.5
    target_coords[:, 1] = coords[:, 1] * scale_y + center[1] - scale[1] * 0.5

    return target_coords
def keypoints_from_heatmaps(heatmaps,
                            center,
                            scale,
                            unbiased=False,
                            post_process='default',
                            kernel=11,
                            valid_radius_factor=0.0546875,
                            use_udp=False,
                            target_type='GaussianHeatmap'):
    """Get final keypoint predictions from heatmaps and map them back to the
    original image space."""

    # Work on a copy so the caller's heatmaps are not modified
    heatmaps = heatmaps.copy()

    N, K, H, W = heatmaps.shape
    preds, maxvals = _get_max_preds(heatmaps)
    # add +/-0.25 shift to the predicted locations for higher acc.
    for n in range(N):
        for k in range(K):
            heatmap = heatmaps[n][k]
            px = int(preds[n][k][0])
            py = int(preds[n][k][1])
            if 1 < px < W - 1 and 1 < py < H - 1:
                diff = np.array([
                    heatmap[py][px + 1] - heatmap[py][px - 1],
                    heatmap[py + 1][px] - heatmap[py - 1][px]
                ])
                preds[n][k] += np.sign(diff) * .25
                if post_process == 'megvii':
                    preds[n][k] += 0.5

    # Transform back to the image
    for i in range(N):
        preds[i] = transform_preds(
            preds[i], center[i], scale[i], [W, H], use_udp=use_udp)

    if post_process == 'megvii':
        maxvals = maxvals / 255.0 + 0.5

    return preds, maxvals

def decode(output, center, scale, score_, batch_size=1):

    c = np.zeros((batch_size, 2), dtype=np.float32)
    s = np.zeros((batch_size, 2), dtype=np.float32)
    score = np.ones(batch_size)
    for i in range(batch_size):
        c[i, :] = center
        s[i, :] = scale
        score[i] = np.array(score_).reshape(-1)

    preds, maxvals = keypoints_from_heatmaps(
        output,
        c,
        s,
        False,
        'default',
        11,
        0.0546875,
        False,
        'GaussianHeatmap')

    all_preds = np.zeros((batch_size, preds.shape[1], 3), dtype=np.float32)
    all_boxes = np.zeros((batch_size, 6), dtype=np.float32)
    all_preds[:, :, 0:2] = preds[:, :, 0:2]
    all_preds[:, :, 2:3] = maxvals
    all_boxes[:, 0:2] = c[:, 0:2]
    all_boxes[:, 2:4] = s[:, 0:2]
    all_boxes[:, 4] = np.prod(s * 200.0, axis=1)
    all_boxes[:, 5] = score
    result = {}

    result['preds'] = all_preds
    result['boxes'] = all_boxes

    print(result)
    return result
def draw(bgr,predict_dict,skeleton):
    bboxes = predict_dict["boxes"]
    for box in bboxes:
        cv2.rectangle(bgr, (int(box[0]), int(box[1])), (int(box[0]) + int(box[2]), int(box[1]) + int(box[3])),(255, 0, 0))

    all_preds = predict_dict["preds"]
    for all_pred in all_preds:
        for x,y,s in all_pred:
            cv2.circle(bgr,(int(x), int(y)), 3,(0, 255, 120), -1)
        for sk in skeleton:
            x0 = int(all_pred[sk[0]][0])
            y0 = int(all_pred[sk[0]][1])
            x1 = int(all_pred[sk[1]][0])
            y1 = int(all_pred[sk[1]][1])
            cv2.line(bgr, (x0, y0), (x1, y1),(0, 255, 0), 1)
    cv2.imwrite("result.jpg",bgr)

if __name__ == "__main__":

    # Create RKNN object
    # rknn = RKNN()
    rknn = RKNNLite()

    if not os.path.exists(RKNN_MODEL):
        print("model not exist")
        exit(-1)

    # Load RKNN model
    print("--> Loading model")
    ret = rknn.load_rknn(RKNN_MODEL)
    if ret != 0:
        print("Load rknn model failed!")
        exit(ret)
    print("done")

    # init runtime environment
    print("--> Init runtime environment")
    ret = rknn.init_runtime()
    if ret != 0:
        print("Init runtime environment failed")
        exit(ret)
    print("done")

    # bbox=[2.213932e+02, 1.935179e+02, 9.873443e+02-2.213932e+02, 1.035825e+03-1.935179e+02,9.995332e-01] 
    bbox=[0,0,400,631,0.99]
    image_size=[192,256]
    src_img = cv2.imread(IMG_PATH)
    # img = src_img
    img = cv2.cvtColor(src_img, cv2.COLOR_BGR2RGB)  # hwc rgb
    aspect_ratio = image_size[0] / image_size[1]
    img_height = img.shape[0]
    img_width = img.shape[1]
    padding=1.25
    pixel_std=200
    center, scale = bbox_xywh2cs(
        bbox,
        aspect_ratio,
        padding,
        pixel_std)
    trans = get_affine_transform(center, scale, 0, image_size)
    img = cv2.warpAffine(  # the warp pads with black borders; the predicted keypoints are mapped back to the original image later
        img,
        trans, (int(image_size[0]), int(image_size[1])),
        flags=cv2.INTER_LINEAR)
    print(trans)
    img = np.transpose(img, (2, 0, 1)).astype(np.float32)  # chw rgb
    # outputs = rknn.inference(inputs=[img], data_type=None, data_format="nchw")[0]
    # img[0, ...] = ((img[0, ...] / 255.0) - 0.485) / 0.229
    # img[1, ...] = ((img[1, ...] / 255.0) - 0.456) / 0.224
    # img[2, ...] = ((img[2, ...] / 255.0) - 0.406) / 0.225
    img = np.transpose(img, (1, 2, 0)).astype(np.float32)  # back to hwc rgb; mean/std normalization is handled inside the RKNN model (mean_values/std_values)
    # img = img.reshape(1,256,192,3)
    # Inference
    print("--> Running model")
    start = time.time()  # time.clock() was removed in Python 3.8
    outputs = rknn.inference(inputs=[img])[0]
    end = time.time()
    # compute the elapsed time
    runTime = end - start
    runTime_ms = runTime * 1000
    # print the elapsed time
    print("Inference time:", runTime_ms, "ms")
    print(outputs)
    predict_dict=decode(outputs,center,scale,bbox[-1])
    skeleton = [[15, 13],[13, 11], [16, 14],[14, 12],[11, 12], [5, 11], [6, 12], [5, 6],[5, 7], [6, 8], [7, 9], [8, 10],[1, 2], [0, 1], [0, 2], [1, 3],[2, 4], [3, 5], [4, 6]]
    draw(src_img,predict_dict,skeleton)
    # rknn.release()
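
For reference on what decode is doing: _get_max_preds takes the argmax of each of the 17 heatmaps (the heatmap is at roughly 1/4 of the 256x192 network input), and transform_preds then maps those heatmap coordinates back into the original image using the bbox center and scale computed earlier; the skeleton list is the standard COCO 17-keypoint connectivity. A small standalone sketch of that coordinate mapping, with purely illustrative numbers, looks like this:

import numpy as np

# illustrative values only: one heatmap peak at (x=24, y=32) on a W=48, H=64 heatmap
coord = np.array([24.0, 32.0])
center = np.array([200.0, 315.0])   # bbox center from bbox_xywh2cs
scale = np.array([2.5, 3.33])       # padded bbox (w, h) / 200 from bbox_xywh2cs
W, H = 48, 64                       # heatmap width and height

scale_px = scale * 200.0            # recover the bbox size in pixels
x_img = coord[0] * scale_px[0] / W + center[0] - scale_px[0] * 0.5
y_img = coord[1] * scale_px[1] / H + center[1] - scale_px[1] * 0.5
print(x_img, y_img)                 # keypoint location in the original image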

           

The result is shown below.

[Result image: skeleton keypoints detected by the quantized RKNN HRNet model]
