自己動手實作深度學習架構-2 核心實作
目标
完成架構設計文檔中列出的基礎類和需要在基礎類中實作的接口。使用最簡的單多層感覺機(Multi-Layer Perceptron)模型對架構進行初步驗證, 是以, 除了架構的核心部分外, 還要實作一個全連接配接層,一個激活函數,一個優化器和一個損失函數。
架構代碼簡介
我把這個架構命名為cute-dl, 已經上傳到github上: https://github.com/brandonlyg/cute-dl.
目錄結構為:
-- cutedl: 架構實作代碼
-- example: 示例
-- test: 單元測試
MLP示例位于 example/mlp目錄下。
層(Layer)和層參數(LayerParam)
相關代碼在model.py中.
LayerParam隻有屬性的定義, 沒什麼邏輯在裡面:
lass LayerParam(object):
\'\'\'
layer_name: 所屬層的的名字
name: 參數名
value: 參數值
\'\'\'
def __init__(self, layer_name, name, value):
self.__name = layer_name+"/"+name
self.value = value
#梯度
self.gradient = None
#更新次數
self.udt = 0
@property
def name(self):
return self.__name
def reset(self):
self.gradient = None
self.udt = 0
其中參數名字是使用樹形結構, 例如: "1-MyLayer/W", 是"1-MyLayer"層的"W"參數的名字。其中"1"是層在模型中的唯一ID, "MyLayer"是層的标簽(tag), "W"是參數在這個層中的唯一名字。
Layer需要實作兩個方法: 一個是__init__方法,一個是join方法. 其他方法不需要實作,隻需按設計文檔中的描述給出定義即可。
先來看看__init__方法:
\'\'\'
outshape: 輸出形狀 2 或者 (2,3)
kargs:
activation: 激活函數的名字
inshape: 輸入形狀
\'\'\'
def __init__(self, *outshape, **kargs)
#輸出形狀
if len(outshape) == 1 and type(outshape[0]) == type(()):
self.__outshape = outshape[0]
else:
self.__outshape = outshape
#輸入形狀
self.__inshape = None
#得到激活函數
self.__activation = activations.get(\'linear\')
#層在模型中的id, 是層在模型中的索引
self.__id = 0
#層的名字
self.__name = \'/%d-%s\'%(self.__id, self.tag)
#得到可選參數
#print("Layer kargs:", kargs)
if \'inshape\' in kargs:
self.__inshape = kargs[\'inshape\']
if type(self.__inshape) != type(()):
self.__inshape = (self.__inshape,)
#print("------inshape:", self.__inshape)
if \'activation\' in kargs:
self.__activation = activations.get(kargs[\'activation\'])
if self.__inshape is not None:
self.init_params()
實作的時主要處理這麼幾個問題:
- 輸入輸出形狀的處理. 保證輸入輸出形狀不論是int還是tuple, 最後都要轉換成tuple。
- 必須要有激活函數。預設使用的線性激活函數。
- 自動生成層的名字。
join方法實作:
\'\'\'
加入到模型中
pre_layer: 前一個層
*inshape: 輸入形狀
\'\'\'
def join(self, pre_layer, *inshape):
if self.__outshape == (-1,):
self.__inshape = pre_layer.inshape
self.__outshape = pre_layer.outshape
else:
self.__inshape = pre_layer.outshape
if len(inshape) != 0:
self.__inshape = inshape
self.__id = pre_layer.layer_id + 1
self.__name = \'/%d-%s\'%(self.__id, self.tag)
self.init_params()
這個方法主要功能是把目前層和另一層連接配接在一起, 讓另一個層成為目前層的(在模型中的)前一層。這裡的"連接配接"主要展現在: 把另一個層的輸出作為輸入。對層ID的處理上, 使用簡單的累加保證層ID在模型中是唯一的, 同時還能通過ID的值知道層位于模型中的什麼位置。 有了輸入輸出形狀, 就可以調用子類實作的init_params方法初始化參數了。
激活函數(Activation)
激活函數代碼在activation.py中。
接口定義:
\'\'\'
激活函數
\'\'\'
class Activation(object):
name=\'\'
def __call__(self, in_batch):
raise Exception("__call__ not implement")
\'\'\'
求梯度
gradient: 該函數輸出值的梯度
\'\'\'
def grad(self, gradient):
raise Exception("gradient not implement")
其中類屬性name作為激活函數的名字。
實作線性激活函數, 作為預設激活函數:
\'\'\'
線性激活函數, 沒有激活
\'\'\'
class Linear(Activation):
name=\'linear\'
def __call__(self, in_batch):
return in_batch
def grad(self, gradient):
return gradient
實作最常用的relu激活函數:
\'\'\'
relu 激活函數
\'\'\'
class Relu(Activation):
name=\'relu\'
def __init__(self):
self.__grad = None
def __call__(self, in_batch):
#得到 <= 0的資料的索引
indices = in_batch <= 0
in_batch[indices] = 0
self.__grad = indices
return in_batch
def grad(self, gradient):
gradient[self.__grad] = 0
self.__grad = None
return gradient
實作用名字(name)擷取激活函數:
act_dict = {
Linear.name: Linear,
Relu.name: Relu
}
#建立激活函數
def get(name):
#print(act_dict)
#print(\'name:\', name)
ACT = act_dict[name]
return ACT()
模型(Model)
首先需要向模型中添加層
\'\'\'
layers: Layer list
\'\'\'
def __init__(self, layers=None):
self.__layers = layers
\'\'\'
添加層
layer: Layer類型的對象
\'\'\'
def add(self, layer):
if self.__layers is None:
self.__layers = []
self.__layers.append(layer)
return self
__init__和add方法都能實作這個功能。
然後是層的的通路能力:
\'\'\'
得到一個Layer對象
idx: Layer對象的索引
\'\'\'
def get_layer(self, index):
self.__check()
if len(self.__layers) <= index:
raise Exception("index out of range %d"%len(self.__layers))
return self.__layers[index]
@property
def layer_count(self):
return len(self.__layers)
\'\'\'
得到層的疊代器
\'\'\'
def layer_iterator(self):
self.__check()
for ly in self.__layers:
yield ly
接下來是組裝模型:
\'\'\'
組裝模型
\'\'\'
def assemble(self):
self.__check()
count = len(self.__layers)
#輸入層必須要有輸入形狀
ly_0 = self.__layers[0]
if ly_0.inshape is None or len(ly_0.inshape) == 0:
raise Exception("input layer miss inshape")
#把每一層的輸入形狀設定為上一層的輸出形狀,
#設定輸入形狀的同時, 要求該層自動初始化參數(如果有參數的話)
pre_ly = ly_0
for ly in self.__layers[1:]:
ly.join(pre_ly)
pre_ly = ly
向前傳播:
\'\'\'
使用模型預測
in_batch: 一批輸入資料
\'\'\'
def predict(self, in_batch, training=False):
self.__check()
out = in_batch
for ly in self.__layers:
out = ly.forward(out, training)
return out
反向傳播:
\'\'\'
反向傳播梯度
\'\'\'
def backward(self, gradient):
g = gradient
#pdb.set_trace()
count = len(self.__layers)
for i in range(count-1, -1, -1):
ly = self.__layers[i]
g = ly.backward(g)
訓練上下文會話(Session)
Session代碼在session.py中。
初始化__init__:
\'\'\'
model: Model對象
loss: Loss對象
optimizer: Optimizer對象
\'\'\'
def __init__(self, model, loss, optimizer):
self.__model = model
self.__loss = loss
self.__optimizer = optimizer
會話主要維護模型, 損失函數和優化器。這些對一個簡單的MLP模型來說已經足夠,至于genoptimizer以後再添加。
訓練模型:
\'\'\'
分批訓練
\'\'\'
def batch_train(self, data, label):
#使用模型預測
out = self.__model.predict(data, training=True)
#使用損失函數評估誤差
loss = self.__loss(out, label)
grad = self.__loss.gradient
#pdb.set_trace()
#反向傳播梯度
self.__model.backward(self.__loss.gradient)
#更新模型參數
self.__optimizer(self.__model)
return loss
儲存會話:
\'\'\'
儲存session
fpath: 儲存的檔案路徑
fpath+\'.s.pkl\' 是儲存session的檔案
fpath+\'.m.pkl\' 是儲存model的檔案
\'\'\'
def save(self, fpath):
model = self.__model
self.__model = None
model.save(fpath)
realfp = fpath + ".s.pkl"
with open(realfp, \'wb\') as f:
pickle.dump(self, f)
這裡把模型和會話分開儲存, 是為了以後可以靈活地選擇隻加載模型或加載整個會話。下面是模型的儲存方法, 在Model中實作:
\'\'\'
儲存模型
\'\'\'
def save(self, fpath):
dir = os.path.dirname(fpath)
if not os.path.exists(dir):
os.mkdir(dir)
self.reset()
realfp = fpath + ".m.pkl"
with open(realfp, \'wb\') as f:
pickle.dump(self, f)
加載會話:
\'\'\'
加載session
\'\'\'
@classmethod
def load(cls, fpath):
realfp = fpath + ".s.pkl"
if not os.path.exists(realfp):
return None
sess = None
with open(realfp, \'rb\') as f:
sess = pickle.load(f)
model = Model.load(fpath)
sess.set_model(model)
return sess
損失函數(Loss)
損失函數代碼在loss.py中。首先定義接口:
\'\'\'
損失函數
\'\'\'
class Loss(object):
\'\'\'
梯度屬性
\'\'\'
@property
def gradient(self):
raise Exception("gradient not impliment")
\'\'\'
計算誤差和梯度
y_true 資料的真實标簽
y_pred 模型預測的标簽
return 誤內插補點
\'\'\'
def __call__(self, y_true, y_pred):
raise Exception("__call__ not impliment")
接下來給出均方誤差損失函數實作:
\'\'\'
均方誤差損失函數
\'\'\'
class Mse(Loss):
def __init__(self):
self.__grad = None
def __call__(self, y_true, y_pred):
err = y_true - y_pred
loss = (err**2).mean(axis=0)/2
n = y_true.shape[0]
self.__grad = err/n
#pdb.set_trace()
return loss.sum()
@property
def gradient(self):
return self.__grad
學習率優化器(Optimizer)
優化器代碼在optimizer.py中。
定義接口:
\'\'\'
學習率優化器
\'\'\'
class Optimizer(object):
\'\'\'
更新參數
\'\'\'
def __call__(self, model):
raise Exception(\'not implement\')
實作一個固定學習率優化器, 沒有用任何參數優化算法。
\'\'\'
固定學習率優化器
\'\'\'
class Fixed(Optimizer):
\'\'\'
lt: 學習率
\'\'\'
def __init__(self, lt=0.01):
self.__lt = lt
def __call__(self, model):
#pdb.set_trace()
for ly in model.layer_iterator():
for p in ly.params:
p.value -= self.__lt * p.gradient
p.udt += 1
到目前為止,一個能夠支援最簡單MLP模型的架構已經完成。接下來用一個MLP示例來驗證一下。
MLPS示例
使用MLP模型完成一個廣義線性回歸的任務, 代碼在examples/mlp/linear-regression.py中。
假設這個任務是拟合一個二次多項式函數:
\'\'\'
任務目标函數
\'\'\'
def target_func(x):
##加入服從參數(0, 0.25^2)正态分布噪聲
y = (x - 2)**2 + 0.25 * np.random.randn(len(x))
return y
看一下這個函數的圖像:
![](https://img.laitimes.com/img/_0nNw4CM6IyYiwiM6ICdiwiIx0DciV2dmADM30zd-cmbw5CRzUCRzUydaVnQuxUMjRlTykEVNVTWq5EdjRUT5lFRNhXVq5EeFRVTwEERNlXQq1EdJpmT3VFVPRDOD50dBpWT3lkeMlXWE1UMrR0T2NmMiNnSywEd5ITW110MaZHetlVdO1GT3lERNl3YXJGc5kHT20ESjBjUIF2Lc12bj5SYphXa5VWen5WY35iclN3Ztl2Lc9CX6MHc0RHaiojIsJye.png)
從使用這個函數采樣得到資料集:
\'\'\'
生成資料集
傳回: train_x, train_y, test_x, test_y
train_x, train_y 訓練資料集的資料和标簽
test_x, test_y 驗證資料解的資料和标簽
\'\'\'
def generate_dataset():
\'\'\'
生成200條資料, 随機取出80%條作為訓練資料集, 剩餘資料為測試資料集
\'\'\'
fpath = "./ds.pkl"
if os.path.exists(fpath):
with open(fpath, \'rb\') as f:
ds = pickle.load(f)
return ds
count = 200
x = np.linspace(-1, 5, count)
y = target_func(x)
#打亂順序
indices = np.arange(count)
np.random.shuffle(indices)
#訓練資料集
split = int(count*0.8)
idxs = indices[:split]
train_x = x[idxs].reshape((-1,1))
train_y = y[idxs].reshape((-1,1))
#測試資料集
idxs = sorted(indices[split:])
test_x = x[idxs].reshape((-1, 1))
shape = test_x.shape
test_y = y[idxs].reshape((-1, 1))
ds = {
\'train_x\': train_x,
\'train_y\': train_y,
\'test_x\': test_x,
\'test_y\': test_y
}
with open(fpath, \'wb\') as f:
pickle.dump(ds, f)
return ds
#得到資料集
ds_0 = generate_dataset()
print("train shape:", ds_0[\'train_x\'].shape)
print("test shape:", ds_0[\'test_x\'].shape)
#訓練集隻取一部分
count = 100
ds_1 = {
\'train_x\': ds_0[\'train_x\'][:16],
\'train_y\': ds_0[\'train_y\'][:16],
\'test_x\': ds_0[\'test_x\'],
\'test_y\': ds_0[\'test_y\']
}
這裡得到兩個資料集, 一個資料集中有160條訓練資料, 40條驗證資料。另一個中有16條訓練資料和40條驗證資料。
分批訓練模型:
\'\'\'
訓練模型
\'\'\'
def train(epochs, ds, model=None, batch_size=64, record_epochs=1):
#加載/建構session
sess = None
if model is None:
sess = Session.load(model_path)
else:
sess = Session(model,
loss=losses.Mse(),
optimizer = optimizers.Fixed()
)
train_x = ds[\'train_x\']
train_y = ds[\'train_y\']
test_x = ds[\'test_x\']
test_y = ds[\'test_y\']
batchs = int(train_x.shape[0]/batch_size)
print("epochs:%d, batchs=%d"%(epochs, batchs))
#記錄訓練曆史
history = {
\'loss\': [],
\'val_loss\': [],
\'epochs\': [],
\'val_x\': test_x,
\'val_y\': test_y,
\'val_pred\': None
}
print("start training ")
t_start = time.time()
steps = epochs * batchs
epoch = 1
#循環訓練
for step in range(steps):
start = (step % batchs) * batch_size
end = start + batch_size
batch_x = train_x[start:end]
batch_y = train_y[start:end]
loss = sess.batch_train(batch_x, batch_y)
cur_epoch = int(step/batchs) + 1
#每輪列印一次
if step > 0 and step % batchs == 0:
print(((\'epoch:%05d/%d loss=%f\'%(cur_epoch, epochs, loss))+\' \'*50)[:50], end=\'\r\')
#記錄
if step % batchs == 0 and (cur_epoch - epoch == record_epochs or cur_epoch == epochs):
epoch = cur_epoch
y_pred = sess.model.predict(test_x)
val_loss = sess.loss(test_y, y_pred)
history[\'loss\'].append(loss)
history[\'val_loss\'].append(val_loss)
history[\'epochs\'].append(epoch)
history[\'val_pred\'] = y_pred
print(((\'epoch:%05d/%d loss=%f, val_loss=%f\'%(cur_epoch, epochs, loss, val_loss))+\' \'*50)[:50], end=\'\r\')
print("")
sess.save(model_path)
print("training finished cost:%f" % (time.time() - t_start))
return history
通過這段代碼可以看出,架構雖然看起來可用, 但訓練模型是仍然需要不少代碼,不夠友好。不過沒關系,目前先通過示例積累經驗,以後在把分批訓練的功能加入到Session中。要牢記現階段的主要任務: 對架構進行初步驗證。
驗證1: 使用線性模型拟合目标函數
#欠拟合示例
def fit_1():
model = Model([
nnlys.Dense(32, inshape=1),
nnlys.Dense(1)
])
model.assemble()
#這個模型是一個線性模型, 用來拟合非線性函數, 模型複雜度不夠,一定會表現出欠拟合
history = train(20000, ds_0, model, record_epochs=100)
fit_report(history, report_path+\'01.png\')
拟合報告:
可以看到不論是訓練誤差還是驗證誤差都很大, 下面的拟合圖形更是慘不忍睹。模型呈欠拟合。
驗證2:使用同樣多參數的非線性模型拟合目标函數
#使用增加模型複雜度解決欠拟合問題
def fit_2():
model = Model([
nnlys.Dense(32, inshape=1, activation=\'relu\'),
nnlys.Dense(1)
])
model.assemble()
#使用了relu激活函數模型變成了非線性的, 增加了模型的複雜度
history = train(30000, ds_0, model, record_epochs=300)
history[\'loss\'] = history[\'loss\'][5:]
history[\'val_loss\'] = history[\'val_loss\'][5:]
history[\'epochs\'] = history[\'epochs\'][5:]
fit_report(history, report_path+\'02.png\')
拟合報告:
拟合情況比較理想。
驗證3: 增加模型複雜度減少訓練資料
#過拟合
def fit_3():
model = Model([
nnlys.Dense(512, inshape=1, activation=\'relu\'),
nnlys.Dense(128, activation=\'relu\'),
nnlys.Dense(1)
])
model.assemble()
#使用資料集ds_1, 隻有16條訓練資料
history = train(30000, ds_1, model, batch_size=16, record_epochs=300)
history[\'loss\'] = history[\'loss\'][20:]
history[\'val_loss\'] = history[\'val_loss\'][20:]
history[\'epochs\'] = history[\'epochs\'][20:]
fit_report(history, report_path+\'03.png\')
拟合報告:
可以看到訓練誤差持續降低, 而驗證誤差先低後高, 說明随着訓練輪次的增加,模型過多地學習到了訓練資料的模式, 導緻泛化誤差增大,呈現過過拟合。
驗證3: 仍然使用較少的訓練資料但降低模型的複雜度
#減少參數數量緩解過拟合
def fit_4():
model = Model([
nnlys.Dense(128, inshape=1, activation=\'relu\'),
nnlys.Dense(64, activation=\'relu\'),
nnlys.Dense(1)
])
model.assemble()
history = train(30000, ds_1, model, batch_size=16, record_epochs=300)
history[\'loss\'] = history[\'loss\'][20:]
history[\'val_loss\'] = history[\'val_loss\'][20:]
history[\'epochs\'] = history[\'epochs\'][20:]
fit_report(history, report_path+\'04.png\')
拟合報告:
可以看到過拟合現象有所緩解,到25000左右才出現過拟合現象,拟合圖形變得稍微好一點, 過拟合隻是略有緩解。
總結
目前已經實作了一個最簡單可運作的深度學習架構。從驗證情況看,它已經達到預期,能夠支援簡單的MLP模型, 但很直接地暴露出兩個問題:
- 訓練模型需要比較多的代碼,不夠友好。
-
架構本身沒有提供處理過拟合現象的方案。
下一步的主要目标就是解決這兩個問題。