天天看点

18-TFRecord 数据格式化存储工具(CDBmax 数据国度)

一、写在前面

  这篇是拖了很久才正式写的一篇,也算是工程化的应用中比较重要的一个部分,所以在这里做一个简要的分享。TFRecord是TensorFlow官方推荐使用的数据格式化存储工具,可以很大程度上提高TensorFlow训练过程中的IO效率。我们之前在做一些简单的训练的时候都是使用文本存储的方式,比如之前所作的花卉分类使用的两万张图片存储之文本文件中大约耗费了70个G的存储空间,而且一次性读入这么大的数据量很显然是一个不现实的事情,就算数据部分成功读入之后,模型所占内存空间也许将无法满足,就容易出现IOError的状况,而出现了这种问题解决得办法不外乎两种方法,一种是加装内存条或者是加装同型号显卡,另外一种就是使用一种合理的IO读写的方式,来降低内存或者显存得开销。首先第一种方法需要考虑的问题就比较多了,它不仅与个人的财力挂钩,还与机器可拓展空间和真实空间挂钩,所以学习一种优化存储空间的方式就显得尤为重要。

二、相关原理

  1.TFRecord相对优于一般方法的原因主要有以下几点:1.TFRecord内部采用“Protocol Buffer”的二进制数据编码方案,其单个字节相对于汉字的“utf-8”编码所占用的字节数要少的多,因而在生成一次TFRecord之后,模型训练过程中的数据读取、加工处理的效率都会有很大的提升空间。2.利用了Threading(线程)和Queues(队列)从TFRecord中分批次读取数据,这种方法可以实现一边读取数据至队列,然后一边使用队首数据训练模型的目的。这样就起到降低了内存空间的高占用率的作用。

  2.简要介绍一下本篇博客的逻辑和任务,本文主要编写针对一个简单的分类数据集通过TFRecord的文件存储方式存取至磁盘上,然后通过相应读写方法,读取TFRecord文件,并做简单的训练来熟悉整个流程。

三、相关代码

  1.使用TFRecord的时候,数据单位一般是tf.train.Example或者是tf.train.SequenceExample,Example一般是用于处理数值、图像大小固定的数据,可使用该方法指定各特征数值的名称和数据类型。

示例1:

tf.train.Example(features=tf.train.Features(feature={
    'batch':tf.train.Features(int64_list=tf.train.Int64List(value=[batch])),
    'height':tf.train.Features(int64_list=tf.train.Int64List(value=[height])),
    'weight':tf.train.Features(int64_list=tf.train.Int64List(values=[weight])),
    'channels':tf.train.Features(int64_list=tf.train.Int64List(values=[channels]))
}))
           

  该代码块中int64_list(整数列表)可替换为BytesList(字符串列表)和FloatList(实数列表),也就是说Features支持存储如上三种类型的数据。

  SequenceExample一般是用于处理文本、时间序列没有固定长度的数据,该部分在NLP中是比较常用的一种数据存储方式。

示例2:

#定义对象
example=tf.train.SequenceExample()
#定义数据类型和数据量大小
example.context.feature['length'].int64_list.value.append(len(words))
#通过feature_lists来加载数据
word_lists=example.feature_lists.feature_list['word']
for word in words:
    word_lists.feature_add().int64_list.value.append(word_id(word))
           

  2.写入数据至文件

  首先创建一个协议内存块(Protocol Buffer),该协议内存块中将用于存放特征属性[features]。

#1.TFRecord 写入
#定义Protocol Buffer(协议缓冲器)
def examples(image,label):
    return tf.train.Example(features=tf.train.Features(feature={
        'data_X':tf.train.Feature(bytes_list=tf.train.BytesList(value=[image])),
        'data_Y':tf.train.Feature(bytes_list=tf.train.BytesList(value=[label])),
    }))
           

  然后将获取到的数据填入到Example内存协议块中,再将协议内存块序列化为一个字符串并且通过

tf.python_io.TFRecordWriter()

方法写入至TFRecords文件。

#IO写入
def writer(train_file,test_file):
    #读取data_X,data_Y数据集
    train_X,train_Y,test_X,test_Y=divide_data()
    #定义写train_file IO对象
    writer1=tf.python_io.TFRecordWriter(train_file)
    for data_X,data_Y in zip(train_X,train_Y):
        #print(data_X,'*************')
        #print(data_Y,'*************')
        #转换数据格式
        print(data_X.shape)
        print(data_Y.shape)
        data_X=data_X.astype(np.float32)
        data_Y=data_Y.astype(np.float32)

        mk_em=examples(data_X.tobytes(),data_Y.tobytes())
        writer1.write(mk_em.SerializeToString())
    writer1.close()
    writer2=tf.python_io.TFRecordWriter(test_file)
    for data_X,data_Y in zip(test_X,test_Y):
        data_X=data_X.astype(np.float32)
        data_Y=data_Y.astype(np.float32)

        mk_em=examples(data_X.tobytes(),data_Y.tobytes())
        writer2.write(mk_em.SerializeToString())
    writer2.close()

if __name__=='__main__':
    writer('train_data.tfrecord','test_data.tfrecord')
           

  运行完整文件之后,将可以看到在同级目录下面会生成两个tfRecord文件,一个名为’train_data.tfrecord’,该文件用于模型训练,另一个名为’test_data.tfrecord’,该文件作为测试集而存在。

  3.从tfRecord文件中读取数据

  首先获取队列,并对队列中的内存协议块进行读取和解码,然后将转换之后的数据组合成一个batch的数据,传入至模型。用于优化模型参数。代码块如下所示:

#读取batch_size条数据
def read_tfrecord(filename,batch_size=256):
    #获取队列
    filename_queue=tf.train.string_input_producer([filename])
    #构建数据读取器
    reader=tf.TFRecordReader()
    #读取队列中的数据
    _,serializer_example=reader.read(filename_queue)

    #处理样本
    features=tf.parse_single_example(
        serializer_example,
        features={
            'data_X':tf.FixedLenFeature([],tf.string),
            'data_Y':tf.FixedLenFeature([],tf.string)
        }
    )
    #读取特征
    data_X=tf.decode_raw(features['data_X'],tf.float32)
    data_Y=tf.decode_raw(features['data_Y'],tf.float32)
    
    #格式重定
    data_X=tf.reshape(data_X,[8])
    data_Y=tf.reshape(data_Y,[5])

    #转换为批次的Tensor对象 capacity表示队列元素中的最大数量
    data_X,data_Y=tf.train.batch([data_X,data_Y],batch_size=batch_size,capacity=3500)

    return data_X,data_Y
           

  4.构建DNN网络模型

  这里构建了三隐藏层、一输出层、一BN层以及两层dropout层的DNN网络模型结构,非线性变换中使用sigmoid函数对数据进行非线性变换的处理,具体代码如下:

#build model
def model(data_X,data_Y):
    #tf.reset_default_graph()
    #搭建全连接网络
    hidden_1=1024
    hidden_2=256
    hidden_3=32
    print(data_X.get_shape())
    input_m=data_X.get_shape()[1]
    output_m=data_Y.get_shape()[1]
    learning_rate=0.001
    global_step = tf.Variable(0, name='global_step', trainable=False)
    #构建初始化w,b
    with tf.variable_scope('scope',reuse = tf.AUTO_REUSE):
        weights={
            "w1":tf.Variable(tf.get_variable('w1',[input_m,hidden_1],dtype=tf.float32,initializer=tf.random_normal_initializer(mean=0, stddev=0.1))),
            "w2":tf.Variable(tf.random_normal([hidden_1,hidden_2],stddev=0.1),tf.float32,name='w2'),
            "w3":tf.Variable(tf.random_normal([hidden_2,hidden_3],stddev=0.1),tf.float32,name='w3'),
            "out":tf.Variable(tf.get_variable('out',[hidden_3,output_m],dtype=tf.float32,initializer=tf.random_normal_initializer(mean=0, stddev=0.1)))
        }
        b={
            'b1':tf.Variable(tf.zeros([hidden_1]),tf.float32,name='b1'),
            'b2':tf.Variable(tf.zeros([hidden_2]),tf.float32,name='b2'),
            'b3':tf.Variable(tf.zeros([hidden_3]),tf.float32,name='b3'),
            'b4':tf.Variable(tf.zeros([output_m]),tf.float32,name='b4')
        }
    layer0=batch_norm(data_X,decay=0.9,updates_collections=None,is_training=True)
    layer1=tf.nn.sigmoid(tf.add(tf.matmul(layer0,weights['w1']),b['b1']))
    dropout1=tf.nn.dropout(layer1,keep_prob=0.75)
    layer2=tf.nn.sigmoid(tf.add(tf.matmul(dropout1,weights['w2']),b['b2']))
    dropout2=tf.nn.dropout(layer2,keep_prob=0.75)
    layer3=tf.nn.sigmoid(tf.add(tf.matmul(dropout2,weights['w3']),b['b3']))
    out=tf.add(tf.matmul(layer3,weights['out']),b['b4'])
    #获取softmax的分类概率值
    predict=tf.nn.softmax(out,name='output')
    #计算交叉熵损失函数
    loss=tf.reduce_mean(-tf.reduce_sum(data_Y*tf.log(predict),axis=1))
    
    #使用梯度下降求解,最小化误差
    #train=tf.train.GradientDescentOptimizer(learning_rate=learning_rate).minimize(loss)
    train=tf.train.AdamOptimizer(learning_rate=learning_rate).minimize(loss)
    #计算正确率
    equal=tf.equal(tf.argmax(predict,axis=1),tf.argmax(data_Y,axis=1))
    correct_rate=tf.reduce_mean(tf.cast(equal,tf.float32))
    return train,loss,correct_rate,global_step
           

  5.分批次读取数据并训练DNN模型

#train
def train():
    checkpoint_dir='./model'
    save_time=500
    training_epochs=100000
    display_time=5
    # 读取训练集 TFRecord文件Tensor对象
    train_X, train_Y = read_tfrecord('train_data.tfrecord')

    #构建返回的训练器
    train,loss,correct_rate,global_step=model(train_X,train_Y)
    # 读取测试集 TFRecord文件Tensor对象
    test_X,test_Y=read_tfrecord('test_data.tfrecord',batch_size=1000)
    _,loss_test,correct_rate_test,_=model(test_X,test_Y)
    saver=tf.train.Saver(max_to_keep=2)
    with tf.Session(config=tf.ConfigProto(log_device_placement=False,allow_soft_placement=True)) as sess:
        sess.run(tf.global_variables_initializer())
        ckpt=None
        if True:
            #加载模型继续训练
            ckpt=tf.train.latest_checkpoint(checkpoint_dir)
            if ckpt:
                print("load model …………")
                saver.restore(sess,ckpt)
        #开启线程
        coord = tf.train.Coordinator()
        threads = tf.train.start_queue_runners(sess=sess, coord=coord)
        #训练
        for epoch in range(sess.run(global_step),training_epochs):
            _,loss_,correct_rate_=sess.run([train,loss,correct_rate])
            if (epoch+1)%display_time==0:
                print('step:{},loss:{},correct_rate:{}'.format(epoch+1,loss_,correct_rate_))
                loss_test_,correct_rate_test_=sess.run([loss_test,correct_rate_test])
                print('testing:loss:{},correct_rate:{}'.format(loss_test_,correct_rate_test_))
            sess.run(tf.assign(global_step, epoch + 1))
            if (epoch+1)%save_time==0:
                print('save model …………')
                saver.save(sess,'./model/model.ckpt',global_step=global_step)
            
        coord.request_stop()
        coord.join(threads)
        
if __name__=='__main__':
    train()
           

三、广而告之

18-TFRecord 数据格式化存储工具(CDBmax 数据国度)

当你在进行数据统计分析,模型建立遇到困难的时候,那么请点开这个链接吧:

https://shop163287636.taobao.com/?spm=a230r.7195193.1997079397.2.b79b4e98VwGtpt

四、总结

  1.使用该方法是具有一定局限性的,因为其仅可以顺序从tfRecord文件中读取,所以需要对样本的数据量有一定的要求,这个要求当然是数据量越大越好,不然模型很容易过拟合。

  2.该部分完整代码见 https://download.csdn.net/download/qq_37972530/10887231

继续阅读