
A concrete TensorFlow implementation of MNIST recognition with Batch Normalization (BN)

For the theory behind this code, see my other blog post: https://blog.csdn.net/weixin_43551972/article/details/102624175.

To implement BN, we mainly rely on the following two APIs:

1. tf.nn.moments()

# Computes the mean and variance of x along the specified axes
tf.nn.moments(
   x,
   axes,
   shift=None,
   name=None,
   keep_dims=False)

Parameters:

  • x: a Tensor, i.e. the data whose moments we want, with a shape such as [batchsize, height, width, kernels].
  • axes: array of ints, the axes along which to compute the mean and variance. If x is a 1-D vector and axes=[0], the function computes the mean and variance of the entire vector, as in the sketch below.
  • shift: not used in the current implementation.
  • name: name used as the scope for the operations that compute the moments.
  • keep_dims: produce moments with the same dimensionality as the input, i.e. whether to keep the reduced dimensions.
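A minimal sketch of the 1-D case mentioned above (the numbers are my own, purely illustrative):

import tensorflow as tf

x = tf.constant([1.0, 2.0, 3.0, 4.0])
mean, variance = tf.nn.moments(x, axes=[0])

with tf.Session() as sess:
    print(sess.run([mean, variance]))  # [2.5, 1.25]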

2. tf.nn.batch_normalization()

tf.nn.batch_normalization(
    x,
    mean,
    variance,
    offset,
    scale,
    variance_epsilon,
    name=None
)

Parameters:

  • x: the input sample to normalize.
  • mean: the sample mean.
  • variance: the sample variance.
  • offset: an offset beta added to the normalized value; if None, no offset is applied.
  • scale: a scale gamma multiplied into the normalized value; if None, no scaling is applied (equivalent to 1).
  • variance_epsilon: a small value added to the denominator to avoid division by zero, as in the sketch below.
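Under the hood this op simply computes scale * (x - mean) / sqrt(variance + variance_epsilon) + offset. A quick sketch verifying that equivalence with toy values (my own illustration, not from the original post):

import tensorflow as tf

x = tf.constant([[1.0, 2.0], [3.0, 4.0]])
mean, var = tf.nn.moments(x, axes=[0])  # per-column mean and variance
eps = 1e-3
bn = tf.nn.batch_normalization(x, mean, var, 0.0, 1.0, eps)  # offset=0, scale=1
manual = (x - mean) / tf.sqrt(var + eps)

with tf.Session() as sess:
    print(sess.run(bn))
    print(sess.run(manual))  # matches the line above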

Implementation

1. Data preparation:

import tensorflow as tf
from tensorflow.examples.tutorials.mnist import input_data
mnist = input_data.read_data_sets("tensorflow_application/MNIST_data/", one_hot=True)
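A quick sanity check of what was just loaded (these are the standard split sizes of the TensorFlow MNIST reader):

print(mnist.train.images.shape)       # (55000, 784)
print(mnist.train.labels.shape)       # (55000, 10), one-hot
print(mnist.validation.images.shape)  # (5000, 784)
print(mnist.test.images.shape)        # (10000, 784)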

2. Because BN computes the mean and variance differently for fully connected layers and convolutional layers, I wrote two separate BN layers; the sketch below shows how the reduction axes differ.
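The difference comes down to which axes tf.nn.moments reduces over. A minimal sketch with illustrative shapes:

# Fully connected: input is [batch, units]; normalize each unit over the batch
fc_out = tf.placeholder(tf.float32, [None, 1024])
fc_mean, fc_var = tf.nn.moments(fc_out, axes=[0])  # shape [1024]

# Convolutional: input is [batch, height, width, channels];
# normalize each channel over the batch and both spatial dimensions
conv_out = tf.placeholder(tf.float32, [None, 7, 7, 64])
conv_mean, conv_var = tf.nn.moments(conv_out, axes=[0, 1, 2])  # shape [64]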

Fully connected layer:

def batch_norm_full(prev_layer, num_units):
    """
    Create a batch-normalized fully connected layer on top of prev_layer.
    (In my view, the mean and variance are obtained the same way for
    training and for inference here.)

    :param prev_layer: Tensor
        input to this layer
    :param num_units: int
        number of output units
    :returns: Tensor
        the new, normalized layer
    """
    # gamma is usually initialized to 1
    gamma = tf.Variable(tf.ones([num_units]))
    # beta is usually initialized to 0
    beta = tf.Variable(tf.zeros([num_units]))

    epsilon = 1e-3

    batch_mean, batch_variance = tf.nn.moments(prev_layer, [0])

    # Track the mean and variance with an exponential moving average
    ema = tf.train.ExponentialMovingAverage(decay=0.99)  # decay rate of the moving average

    def mean_var_with_update():
        ema_apply_op = ema.apply([batch_mean, batch_variance])
        with tf.control_dependencies([ema_apply_op]):
            return tf.identity(batch_mean), tf.identity(batch_variance)

    mean, var = mean_var_with_update()
    with tf.control_dependencies([mean, var]):
        batch_normalized_output = tf.nn.batch_normalization(prev_layer, mean, var, beta, gamma, epsilon)
    return batch_normalized_output
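A quick usage sketch for batch_norm_full (the shapes are hypothetical, just to show how the layer is wired in):

inputs = tf.placeholder(tf.float32, [None, 784])
w = tf.Variable(tf.truncated_normal([784, 100], stddev=0.1))
layer = tf.matmul(inputs, w)  # BN's beta can take over the role of a bias term
normalized = tf.nn.relu(batch_norm_full(layer, 100))
print(normalized.get_shape())  # (?, 100)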

Convolutional layer:

def batch_norm_conv(prev_layer, out_channels, is_training):
    """
    Create a batch-normalized convolutional layer from the given inputs.

    :param prev_layer: Tensor
        input to this layer
    :param out_channels: int
        number of output channels
    :param is_training: bool or Tensor
        whether the network is currently training, which tells the Batch
        Normalization layer whether it should update or simply use its
        stored mean and variance statistics
    :returns: Tensor
        the new, normalized layer
    """
    gamma = tf.Variable(tf.ones([out_channels]))
    beta = tf.Variable(tf.zeros([out_channels]))

    pop_mean = tf.Variable(tf.zeros([out_channels]), trainable=False)
    pop_variance = tf.Variable(tf.ones([out_channels]), trainable=False)

    epsilon = 1e-3
    axis = list(range(len(prev_layer.get_shape()) - 1))

    def batch_norm_training():
        # Be sure to use the right axes so we compute the mean and variance
        # per feature map, not over every node in the network
        batch_mean, batch_variance = tf.nn.moments(prev_layer, axis, keep_dims=False)

        # Track the mean and variance with an exponential moving average
        ema = tf.train.ExponentialMovingAverage(decay=0.99)  # decay rate of the moving average

        def mean_var_with_update():
            ema_apply_op = ema.apply([batch_mean, batch_variance])
            with tf.control_dependencies([ema_apply_op]):
                return tf.identity(batch_mean), tf.identity(batch_variance)

        mean, var = mean_var_with_update()
        with tf.control_dependencies([mean, var]):
            return tf.nn.batch_normalization(prev_layer, mean, var, beta, gamma, epsilon)

    def batch_norm_inference():
        # At inference time the mean and variance should really be the
        # statistics of all the input data, but for simplicity we use a
        # mean of 0 and a variance of 1 here
        return tf.nn.batch_normalization(prev_layer, pop_mean, pop_variance, beta, gamma, epsilon)

    batch_normalized_output = tf.cond(is_training, batch_norm_training, batch_norm_inference)

    return batch_normalized_output
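As the comment in batch_norm_inference admits, this simplified version normalizes with mean 0 and variance 1 at inference time. A common alternative (a sketch of my own, not the original code) folds the batch statistics into pop_mean and pop_variance with explicit assign ops during training. This version would replace batch_norm_training inside batch_norm_conv, reusing the variables already defined there:

def batch_norm_training():
    batch_mean, batch_variance = tf.nn.moments(prev_layer, axis)
    decay = 0.99
    # Blend the batch statistics into the running population statistics
    train_mean = tf.assign(pop_mean, pop_mean * decay + batch_mean * (1 - decay))
    train_variance = tf.assign(pop_variance, pop_variance * decay + batch_variance * (1 - decay))
    # Normalize only after the running statistics have been updated
    with tf.control_dependencies([train_mean, train_variance]):
        return tf.nn.batch_normalization(prev_layer, batch_mean, batch_variance, beta, gamma, epsilon)

With this in place, batch_norm_inference can rely on the learned pop_mean and pop_variance instead of the fixed 0/1 values.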

3. Training and inference:

def train_inference(num_batches, batch_size, learning_rate):
    # Placeholders for the input samples and labels
    inputs = tf.placeholder(tf.float32, [None, 784])
    labels = tf.placeholder(tf.float32, [None, 10])
    # Placeholder indicating whether the model is currently training
    is_training = tf.placeholder(tf.bool)

    # Weight helper
    def weight_variable(shape):
        initial = tf.truncated_normal(shape, stddev=0.1)
        return tf.Variable(initial)

    # Bias helper
    def bias_variable(shape):
        initial = tf.constant(0.1, shape=shape)
        return tf.Variable(initial)

    # Convolution helper
    def conv2d(x, w):
        return tf.nn.conv2d(x, w, strides=[1, 1, 1, 1], padding="SAME")

    # 2x2 max-pooling helper
    def max_pool_2x2(x):
        return tf.nn.max_pool(x, ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1], padding="SAME")

    # Reshape the input to 4-D so it can be convolved with the first layer
    x_image = tf.reshape(inputs, [-1, 28, 28, 1])
    # Initial weights and bias for the first layer
    w_conv1 = weight_variable([5, 5, 1, 32])
    b_conv1 = bias_variable([32])
    # Convolution output of the first layer
    h_conv1 = tf.nn.relu(tf.add(conv2d(x_image, w_conv1), b_conv1))
    # Pooling output of the first layer
    h_pool1 = max_pool_2x2(h_conv1)

    # Initial weights and bias for the second layer
    w_conv2 = weight_variable([5, 5, 32, 64])
    b_conv2 = bias_variable([64])
    # Convolution output of the second layer, with Batch Normalization applied
    h_conv2 = tf.nn.relu(batch_norm_conv(tf.add(conv2d(h_pool1, w_conv2), b_conv2), 64, is_training))
    # Pooling output of the second layer
    h_pool2 = max_pool_2x2(h_conv2)

    # Initial weights and bias for fully connected layer 1
    w_fc1 = weight_variable([7 * 7 * 64, 1024])
    b_fc1 = bias_variable([1024])

    # Flatten the second pooling output into a vector for fully connected layer 1
    h_pool2_flat = tf.reshape(h_pool2, [-1, 7 * 7 * 64])

    # Feed the flattened output through fully connected layer 1, with BN applied
    h_fc1 = tf.nn.relu(batch_norm_full(tf.add(tf.matmul(h_pool2_flat, w_fc1), b_fc1), 1024))

    # Dropout
    keep_prob = tf.placeholder(tf.float32)
    h_fc1_drop = tf.nn.dropout(h_fc1, keep_prob)

    # Output of fully connected layer 2, with BN applied
    layer2 = tf.layers.dense(h_fc1_drop, 10, activation=None)
    logits = batch_norm_full(layer2, 10)

    # Define the loss and the training op; the labels are one-hot and mutually
    # exclusive, so softmax cross entropy is the appropriate loss here
    model_loss = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(logits=logits, labels=labels))

    train_opt = tf.train.AdamOptimizer(learning_rate).minimize(model_loss)

    # Create operations to test accuracy
    correct_prediction = tf.equal(tf.argmax(logits, 1), tf.argmax(labels, 1))
    accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32))

    # Train and test the network
    with tf.Session() as sess:
        sess.run(tf.global_variables_initializer())
        for batch_i in range(num_batches):
            batch_xs, batch_ys = mnist.train.next_batch(batch_size)

            # Train on this batch
            sess.run(train_opt, {inputs: batch_xs, labels: batch_ys, is_training: True, keep_prob: 0.5})

            # Periodically check the training or validation loss and accuracy
            if batch_i % 100 == 0:
                loss, acc = sess.run([model_loss, accuracy], {inputs: mnist.validation.images,
                                                              labels: mnist.validation.labels,
                                                              is_training: False,
                                                              keep_prob: 1.0})  # disable dropout when evaluating
                print(
                    'Batch: {:>2}: Validation loss: {:>3.5f}, Validation accuracy: {:>3.5f}'.format(batch_i, loss, acc))
            elif batch_i % 25 == 0:
                loss, acc = sess.run([model_loss, accuracy], {inputs: batch_xs, labels: batch_ys, is_training: False,
                                                              keep_prob: 1.0})
                print('Batch: {:>2}: Training loss: {:>3.5f}, Training accuracy: {:>3.5f}'.format(batch_i, loss, acc))

        # At the end, score the final accuracy on both the validation and test sets
        acc = sess.run(accuracy, {inputs: mnist.validation.images,
                                  labels: mnist.validation.labels,
                                  is_training: False,
                                  keep_prob: 1.0})
        print('Final validation accuracy: {:>3.5f}'.format(acc))
        acc = sess.run(accuracy, {inputs: mnist.test.images,
                                  labels: mnist.test.labels,
                                  is_training: False,
                                  keep_prob: 1.0})
        print('Final test accuracy: {:>3.5f}'.format(acc))

        # Score the first 100 test images individually, just to make sure batch
        # normalization really worked: if inference still relied on per-batch
        # statistics, a batch of a single image would normalize badly and
        # accuracy would collapse
        correct = 0
        for i in range(100):
            correct += sess.run(accuracy, feed_dict={inputs: [mnist.test.images[i]],
                                                     labels: [mnist.test.labels[i]],
                                                     is_training: False,
                                                     keep_prob: 1.0})

        print("Accuracy on 100 samples:", correct / 100)

4. Start:

num_batches = 800  # number of training batches
batch_size = 64  # batch size
learning_rate = 0.002  # learning rate

tf.reset_default_graph()

with tf.Graph().as_default():
    train_inference(num_batches, batch_size, learning_rate)

5. Results:

As the output shows, training converges very quickly, essentially by around the 75th batch, and the training speed also improved.

(Screenshots of the training and validation output omitted.)