天天看点

自己动手实现深度学习框架-2 核心实现 - 自带buff

自己动手实现深度学习框架-2 核心实现

目标

        完成框架设计文档中列出的基础类和需要在基础类中实现的接口。使用最简的单多层感知机(Multi-Layer Perceptron)模型对框架进行初步验证, 因此, 除了框架的核心部分外, 还要实现一个全连接层,一个激活函数,一个优化器和一个损失函数。

框架代码简介

        我把这个框架命名为cute-dl, 已经上传到github上: https://github.com/brandonlyg/cute-dl.

        目录结构为:

        -- cutedl: 框架实现代码

        -- example: 示例

        -- test: 单元测试

        MLP示例位于 example/mlp目录下。

层(Layer)和层参数(LayerParam)

        相关代码在model.py中.

        LayerParam只有属性的定义, 没什么逻辑在里面:

lass LayerParam(object):

    \'\'\'
    layer_name: 所属层的的名字
    name: 参数名
    value: 参数值
    \'\'\'
    def __init__(self, layer_name, name, value):
        self.__name = layer_name+"/"+name
        self.value = value

        #梯度
        self.gradient = None
        #更新次数
        self.udt = 0

    @property
    def name(self):
        return self.__name

    def reset(self):
        self.gradient = None
        self.udt = 0
           

        其中参数名字是使用树形结构, 例如: "1-MyLayer/W", 是"1-MyLayer"层的"W"参数的名字。其中"1"是层在模型中的唯一ID, "MyLayer"是层的标签(tag), "W"是参数在这个层中的唯一名字。

        Layer需要实现两个方法: 一个是__init__方法,一个是join方法. 其他方法不需要实现,只需按设计文档中的描述给出定义即可。

        先来看看__init__方法:

\'\'\'
    outshape: 输出形状 2 或者 (2,3)
    kargs:
        activation: 激活函数的名字
        inshape: 输入形状
    \'\'\'
    def __init__(self, *outshape, **kargs)
        #输出形状
        if len(outshape) == 1 and type(outshape[0]) == type(()):
            self.__outshape = outshape[0]
        else:
            self.__outshape = outshape

        #输入形状
        self.__inshape = None

        #得到激活函数
        self.__activation = activations.get(\'linear\')

        #层在模型中的id, 是层在模型中的索引
        self.__id = 0
        #层的名字
        self.__name = \'/%d-%s\'%(self.__id, self.tag)

        #得到可选参数
        #print("Layer kargs:", kargs)
        if \'inshape\' in kargs:
            self.__inshape = kargs[\'inshape\']
            if type(self.__inshape) != type(()):
                self.__inshape = (self.__inshape,)
            #print("------inshape:", self.__inshape)

        if \'activation\' in kargs:
            self.__activation = activations.get(kargs[\'activation\'])


        if self.__inshape is not None:
            self.init_params()
           

        实现的时主要处理这么几个问题:

  1. 输入输出形状的处理. 保证输入输出形状不论是int还是tuple, 最后都要转换成tuple。
  2. 必须要有激活函数。默认使用的线性激活函数。
  3. 自动生成层的名字。

        join方法实现:

\'\'\'
    加入到模型中
    pre_layer: 前一个层
    *inshape: 输入形状
    \'\'\'
    def join(self, pre_layer, *inshape):
        if self.__outshape == (-1,):
            self.__inshape = pre_layer.inshape
            self.__outshape = pre_layer.outshape
        else:
            self.__inshape = pre_layer.outshape
            if len(inshape) != 0:
                self.__inshape = inshape

        self.__id = pre_layer.layer_id + 1
        self.__name = \'/%d-%s\'%(self.__id, self.tag)

        self.init_params()
           

        这个方法主要功能是把当前层和另一层连接在一起, 让另一个层成为当前层的(在模型中的)前一层。这里的"连接"主要体现在: 把另一个层的输出作为输入。对层ID的处理上, 使用简单的累加保证层ID在模型中是唯一的, 同时还能通过ID的值知道层位于模型中的什么位置。 有了输入输出形状, 就可以调用子类实现的init_params方法初始化参数了。

激活函数(Activation)

        激活函数代码在activation.py中。

        接口定义:

\'\'\'
激活函数
\'\'\'
class Activation(object):
    name=\'\'

    def __call__(self, in_batch):
        raise Exception("__call__ not implement")

    \'\'\'
    求梯度
    gradient: 该函数输出值的梯度
    \'\'\'
    def grad(self, gradient):
        raise Exception("gradient not implement")
           

        其中类属性name作为激活函数的名字。

        实现线性激活函数, 作为默认激活函数:

\'\'\'
线性激活函数, 没有激活
\'\'\'
class Linear(Activation):
    name=\'linear\'

    def __call__(self, in_batch):
        return in_batch

    def grad(self, gradient):
        return gradient
           

        实现最常用的relu激活函数:

\'\'\'
relu 激活函数
\'\'\'
class Relu(Activation):
    name=\'relu\'

    def __init__(self):
        self.__grad = None

    def __call__(self, in_batch):
        #得到 <= 0的数据的索引
        indices =  in_batch <= 0

        in_batch[indices] = 0
        self.__grad = indices

        return in_batch

    def grad(self, gradient):
        gradient[self.__grad] = 0
        self.__grad = None
        return gradient
           

        实现用名字(name)获取激活函数:

act_dict = {
    Linear.name: Linear,
    Relu.name: Relu
}

#创建激活函数
def get(name):
    #print(act_dict)
    #print(\'name:\', name)
    ACT = act_dict[name]
    return ACT()
           

模型(Model)

        首先需要向模型中添加层

\'\'\'
    layers: Layer list
    \'\'\'
    def __init__(self, layers=None):
        self.__layers = layers

    \'\'\'
    添加层
    layer: Layer类型的对象
    \'\'\'
    def add(self, layer):
        if self.__layers is None:
            self.__layers = []

        self.__layers.append(layer)

        return self
           

        __init__和add方法都能实现这个功能。

        然后是层的的访问能力:

\'\'\'
    得到一个Layer对象
    idx: Layer对象的索引
    \'\'\'
    def get_layer(self, index):
        self.__check()
        if len(self.__layers) <= index:
            raise Exception("index out of range %d"%len(self.__layers))

        return self.__layers[index]

    @property
    def layer_count(self):
        return len(self.__layers)

    \'\'\'
    得到层的迭代器
    \'\'\'
    def layer_iterator(self):
        self.__check()

        for ly in self.__layers:
            yield ly
           

        接下来是组装模型:

\'\'\'
    组装模型
    \'\'\'
    def assemble(self):
        self.__check()
        count = len(self.__layers)

        #输入层必须要有输入形状
        ly_0 = self.__layers[0]
        if ly_0.inshape is None or len(ly_0.inshape) == 0:
            raise Exception("input layer miss inshape")

        #把每一层的输入形状设置为上一层的输出形状,
        #设置输入形状的同时, 要求该层自动初始化参数(如果有参数的话)
        pre_ly = ly_0
        for ly in self.__layers[1:]:
            ly.join(pre_ly)
            pre_ly = ly
           

        向前传播:

\'\'\'
    使用模型预测
    in_batch: 一批输入数据
    \'\'\'
    def predict(self, in_batch, training=False):
        self.__check()

        out = in_batch
        for ly in self.__layers:
            out = ly.forward(out, training)

        return out
           

        反向传播:

\'\'\'
    反向传播梯度
    \'\'\'
    def backward(self, gradient):
        g = gradient
        #pdb.set_trace()
        count = len(self.__layers)
        for i in range(count-1, -1, -1):
            ly = self.__layers[i]
            g = ly.backward(g)
           

训练上下文会话(Session)

        Session代码在session.py中。

        初始化__init__:

\'\'\'
    model: Model对象
    loss: Loss对象
    optimizer: Optimizer对象
    \'\'\'
    def __init__(self, model, loss, optimizer):
        self.__model = model
        self.__loss = loss
        self.__optimizer = optimizer
           

会话主要维护模型, 损失函数和优化器。这些对一个简单的MLP模型来说已经足够,至于genoptimizer以后再添加。

        训练模型:

\'\'\'
    分批训练
    \'\'\'
    def batch_train(self, data, label):
        #使用模型预测
        out = self.__model.predict(data, training=True)
        #使用损失函数评估误差
        loss = self.__loss(out, label)
        grad = self.__loss.gradient
        #pdb.set_trace()
        #反向传播梯度
        self.__model.backward(self.__loss.gradient)

        #更新模型参数
        self.__optimizer(self.__model)

        return loss
           

        保存会话:

\'\'\'
    保存session
    fpath: 保存的文件路径
        fpath+\'.s.pkl\' 是保存session的文件
        fpath+\'.m.pkl\' 是保存model的文件
    \'\'\'
    def save(self, fpath):
        model = self.__model
        self.__model = None

        model.save(fpath)

        realfp = fpath + ".s.pkl"
        with open(realfp, \'wb\') as f:
            pickle.dump(self, f)

           

        这里把模型和会话分开保存, 是为了以后可以灵活地选择只加载模型或加载整个会话。下面是模型的保存方法, 在Model中实现:

\'\'\'
    保存模型
    \'\'\'
    def save(self, fpath):
        dir = os.path.dirname(fpath)
        if not os.path.exists(dir):
            os.mkdir(dir)

        self.reset()
        realfp = fpath + ".m.pkl"
        with open(realfp, \'wb\') as f:
            pickle.dump(self, f)
           

        加载会话:

\'\'\'
    加载session
    \'\'\'
    @classmethod
    def load(cls, fpath):
        realfp = fpath + ".s.pkl"
        if not os.path.exists(realfp):
            return None

        sess = None
        with open(realfp, \'rb\') as f:
            sess = pickle.load(f)

        model = Model.load(fpath)
        sess.set_model(model)

        return sess
           

损失函数(Loss)

        损失函数代码在loss.py中。首先定义接口:

\'\'\'
损失函数
\'\'\'
class Loss(object):

    \'\'\'
    梯度属性
    \'\'\'
    @property
    def gradient(self):
        raise Exception("gradient not impliment")


    \'\'\'
    计算误差和梯度
    y_true 数据的真实标签
    y_pred 模型预测的标签

    return 误差值
    \'\'\'
    def __call__(self, y_true, y_pred):
        raise Exception("__call__ not impliment")
           

        接下来给出均方误差损失函数实现:

\'\'\'
均方误差损失函数
\'\'\'
class Mse(Loss):

    def __init__(self):
        self.__grad = None

    def __call__(self, y_true, y_pred):
        err = y_true - y_pred
        loss =  (err**2).mean(axis=0)/2

        n = y_true.shape[0]
        self.__grad = err/n
        #pdb.set_trace()
        return loss.sum()

    @property
    def gradient(self):
        return self.__grad
           

学习率优化器(Optimizer)

        优化器代码在optimizer.py中。

        定义接口:

\'\'\'
学习率优化器
\'\'\'
class Optimizer(object):

    \'\'\'
    更新参数
    \'\'\'
    def __call__(self, model):
        raise Exception(\'not implement\')
           

        实现一个固定学习率优化器, 没有用任何参数优化算法。

\'\'\'
固定学习率优化器
\'\'\'
class Fixed(Optimizer):

    \'\'\'
    lt: 学习率
    \'\'\'
    def __init__(self, lt=0.01):
        self.__lt = lt

    def __call__(self, model):
        #pdb.set_trace()
        for ly in model.layer_iterator():
            for p in ly.params:
                p.value -= self.__lt * p.gradient
                p.udt += 1
           

        到目前为止,一个能够支持最简单MLP模型的框架已经完成。接下来用一个MLP示例来验证一下。

MLPS示例

        使用MLP模型完成一个广义线性回归的任务, 代码在examples/mlp/linear-regression.py中。

        假设这个任务是拟合一个二次多项式函数:

\'\'\'
任务目标函数
\'\'\'
def target_func(x):
    ##加入服从参数(0, 0.25^2)正态分布噪声
    y = (x - 2)**2 + 0.25 * np.random.randn(len(x))
    return y
           

看一下这个函数的图像:

自己动手实现深度学习框架-2 核心实现 - 自带buff

        从使用这个函数采样得到数据集:

\'\'\'
生成数据集
返回: train_x, train_y, test_x, test_y
train_x, train_y 训练数据集的数据和标签
test_x, test_y 验证数据解的数据和标签
\'\'\'
def generate_dataset():
    \'\'\'
    生成200条数据, 随机取出80%条作为训练数据集, 剩余数据为测试数据集
    \'\'\'
    fpath = "./ds.pkl"
    if os.path.exists(fpath):
        with open(fpath, \'rb\') as f:
            ds = pickle.load(f)
            return ds

    count = 200
    x = np.linspace(-1, 5, count)
    y = target_func(x)

    #打乱顺序
    indices = np.arange(count)
    np.random.shuffle(indices)
    #训练数据集
    split = int(count*0.8)
    idxs = indices[:split]
    train_x = x[idxs].reshape((-1,1))
    train_y = y[idxs].reshape((-1,1))

    #测试数据集
    idxs = sorted(indices[split:])
    test_x = x[idxs].reshape((-1, 1))
    shape = test_x.shape
    test_y = y[idxs].reshape((-1, 1))

    ds = {
        \'train_x\': train_x,
        \'train_y\': train_y,
        \'test_x\': test_x,
        \'test_y\': test_y
    }
    with open(fpath, \'wb\') as f:
        pickle.dump(ds, f)

    return ds

#得到数据集
ds_0 = generate_dataset()
print("train shape:", ds_0[\'train_x\'].shape)
print("test shape:", ds_0[\'test_x\'].shape)

#训练集只取一部分
count = 100
ds_1 = {
    \'train_x\': ds_0[\'train_x\'][:16],
    \'train_y\': ds_0[\'train_y\'][:16],
    \'test_x\': ds_0[\'test_x\'],
    \'test_y\': ds_0[\'test_y\']
}
           

        这里得到两个数据集, 一个数据集中有160条训练数据, 40条验证数据。另一个中有16条训练数据和40条验证数据。

        分批训练模型:

\'\'\'
训练模型
\'\'\'
def train(epochs, ds, model=None, batch_size=64, record_epochs=1):
    #加载/构建session
    sess = None
    if model is None:
        sess = Session.load(model_path)
    else:
        sess = Session(model,
                    loss=losses.Mse(),
                    optimizer = optimizers.Fixed()
                )

    train_x = ds[\'train_x\']
    train_y = ds[\'train_y\']
    test_x = ds[\'test_x\']
    test_y = ds[\'test_y\']

    batchs = int(train_x.shape[0]/batch_size)
    print("epochs:%d, batchs=%d"%(epochs, batchs))

    #记录训练历史
    history = {
        \'loss\': [],
        \'val_loss\': [],
        \'epochs\': [],
        \'val_x\': test_x,
        \'val_y\': test_y,
        \'val_pred\': None
    }

    print("start training ")
    t_start = time.time()
    steps = epochs * batchs

    epoch = 1
    #循环训练
    for step in range(steps):
        start = (step % batchs) * batch_size
        end = start + batch_size
        batch_x = train_x[start:end]
        batch_y = train_y[start:end]

        loss = sess.batch_train(batch_x, batch_y)

        cur_epoch = int(step/batchs) + 1

        #每轮打印一次
        if step > 0 and  step % batchs == 0:
            print(((\'epoch:%05d/%d loss=%f\'%(cur_epoch, epochs, loss))+\' \'*50)[:50], end=\'\r\')

        #记录
        if step % batchs == 0 and (cur_epoch - epoch == record_epochs or cur_epoch == epochs):
            epoch = cur_epoch

            y_pred = sess.model.predict(test_x)
            val_loss = sess.loss(test_y, y_pred)

            history[\'loss\'].append(loss)
            history[\'val_loss\'].append(val_loss)
            history[\'epochs\'].append(epoch)
            history[\'val_pred\']  = y_pred

            print(((\'epoch:%05d/%d loss=%f, val_loss=%f\'%(cur_epoch, epochs, loss, val_loss))+\' \'*50)[:50], end=\'\r\')
            print("")

    sess.save(model_path)
    print("training finished cost:%f" % (time.time() - t_start))

    return history
           

        通过这段代码可以看出,框架虽然看起来可用, 但训练模型是仍然需要不少代码,不够友好。不过没关系,目前先通过示例积累经验,以后在把分批训练的功能加入到Session中。要牢记现阶段的主要任务: 对框架进行初步验证。

验证1: 使用线性模型拟合目标函数

#欠拟合示例
def fit_1():
    model = Model([
        nnlys.Dense(32, inshape=1),
        nnlys.Dense(1)
    ])
    model.assemble()
    #这个模型是一个线性模型, 用来拟合非线性函数, 模型复杂度不够,一定会表现出欠拟合
    history = train(20000, ds_0, model, record_epochs=100)
    fit_report(history, report_path+\'01.png\')
           

        拟合报告:

自己动手实现深度学习框架-2 核心实现 - 自带buff

可以看到不论是训练误差还是验证误差都很大, 下面的拟合图形更是惨不忍睹。模型呈欠拟合。

验证2:使用同样多参数的非线性模型拟合目标函数

#使用增加模型复杂度解决欠拟合问题
def fit_2():
    model = Model([
        nnlys.Dense(32, inshape=1, activation=\'relu\'),
        nnlys.Dense(1)
    ])
    model.assemble()
    #使用了relu激活函数模型变成了非线性的, 增加了模型的复杂度
    history = train(30000, ds_0, model, record_epochs=300)
    history[\'loss\'] = history[\'loss\'][5:]
    history[\'val_loss\'] = history[\'val_loss\'][5:]
    history[\'epochs\'] = history[\'epochs\'][5:]
    fit_report(history, report_path+\'02.png\')
           

        拟合报告:

自己动手实现深度学习框架-2 核心实现 - 自带buff

拟合情况比较理想。

验证3: 增加模型复杂度减少训练数据

#过拟合
def fit_3():
    model = Model([
        nnlys.Dense(512, inshape=1, activation=\'relu\'),
        nnlys.Dense(128, activation=\'relu\'),
        nnlys.Dense(1)
    ])
    model.assemble()
    #使用数据集ds_1, 只有16条训练数据
    history = train(30000, ds_1, model, batch_size=16, record_epochs=300)
    history[\'loss\'] = history[\'loss\'][20:]
    history[\'val_loss\'] = history[\'val_loss\'][20:]
    history[\'epochs\'] = history[\'epochs\'][20:]
    fit_report(history, report_path+\'03.png\')
           

        拟合报告:

自己动手实现深度学习框架-2 核心实现 - 自带buff

可以看到训练误差持续降低, 而验证误差先低后高, 说明随着训练轮次的增加,模型过多地学习到了训练数据的模式, 导致泛化误差增大,呈现过过拟合。

验证3: 仍然使用较少的训练数据但降低模型的复杂度

#减少参数数量缓解过拟合
def fit_4():
    model = Model([
        nnlys.Dense(128, inshape=1, activation=\'relu\'),
        nnlys.Dense(64, activation=\'relu\'),
        nnlys.Dense(1)
    ])
    model.assemble()

    history = train(30000, ds_1, model, batch_size=16, record_epochs=300)
    history[\'loss\'] = history[\'loss\'][20:]
    history[\'val_loss\'] = history[\'val_loss\'][20:]
    history[\'epochs\'] = history[\'epochs\'][20:]
    fit_report(history, report_path+\'04.png\')
           

        拟合报告:

自己动手实现深度学习框架-2 核心实现 - 自带buff

可以看到过拟合现象有所缓解,到25000左右才出现过拟合现象,拟合图形变得稍微好一点, 过拟合只是略有缓解。

总结

        目前已经实现了一个最简单可运行的深度学习框架。从验证情况看,它已经达到预期,能够支持简单的MLP模型, 但很直接地暴露出两个问题:

  1. 训练模型需要比较多的代码,不够友好。
  2. 框架本身没有提供处理过拟合现象的方案。

    下一步的主要目标就是解决这两个问题。

自己动手实现深度学习框架-2 核心实现 - 自带buff