I. Background
My earlier GAN posts focused mainly on applications: I picked a few good examples and implemented them, and I will keep working in that direction. Today we look at how DRAGAN handles some of the problems in GAN training, which also lays a bit of groundwork for future work; we should understand not only what GANs can do, but also what goes wrong and how to fix it.
DRAGAN comes from the paper "On Convergence and Stability of GANs" by Naveen Kodali et al., published in May 2017. Although the paper is relatively old, its line of reasoning is well worth studying. I will concentrate on the analysis, run the experiments on MNIST for now, and walk through part of the code; in short, this post is a detailed reading of the paper.
[1] Paper link: https://arxiv.org/pdf/1705.07215.pdf
II. How DRAGAN Works
Unfortunately there is almost no discussion of this paper online, so what follows is my own reading of the work.
First, the paper's abstract:
We propose studying GAN training dynamics as regret minimization, which is in contrast to the popular view that there is consistent minimization of a divergence between real and generated distributions. We analyze the convergence of GAN training from this new point of view to understand why mode collapse happens. We hypothesize the existence of undesirable local equilibria in this non-convex game to be responsible for mode collapse. We observe that these local equilibria often exhibit sharp gradients of the discriminator function around some real data points. We demonstrate that these degenerate local equilibria can be avoided with a gradient penalty scheme called DRAGAN. We show that DRAGAN enables faster training, achieves improved stability with fewer mode collapses, and leads to generator networks with better modeling performance across a variety of architectures and objective functions.
In short, the authors propose viewing GAN training as regret minimization, in contrast to the conventional view that training consistently minimizes a divergence between the real and generated distributions. They conjecture that mode collapse is caused by undesirable local equilibria of this non-convex game, and they observe that such local equilibria often come with sharp discriminator gradients around some real data points. DRAGAN therefore adds a gradient penalty to avoid these degenerate equilibria. The results show that DRAGAN trains faster, is more stable, and exhibits far fewer mode collapses.
We know that GANs can learn a data distribution and are usually trained with gradient descent, but training is unstable and prone to mode collapse. The conventional explanation is that the training process tries to minimize a strong divergence.
The authors instead view GAN training as a game in which both the generator and the discriminator run no-regret algorithms. In the non-convex setting typical of deep networks, however, the usual convergence results no longer hold: global regret minimization and equilibrium computation are hard in general non-convex games, and gradient descent can end up cycling or, in some cases, converging to a local equilibrium. The authors conjecture that these two failure modes correspond to oscillating training and mode collapse respectively; since cycling is hard to detect, they focus on mode collapse.
Coming back to the earlier point: mode collapse is accompanied by overly sharp discriminator gradients around real data points. One way to counter this is to restrict the discriminator, for example to a network with a single hidden layer; this also helps explain why WGAN can alleviate mode collapse (see: 對抗神經網絡學習(四)——WGAN+爬蟲生成皮卡丘圖像(tensorflow實作)). Building on this observation, the authors instead add a gradient penalty, giving DRAGAN (Deep Regret Analytic Generative Adversarial Networks). Their main contributions can be summarized as follows:
• We propose a new way of reasoning about the GAN training dynamics - by viewing AGD as regret minimization.
• We provide a novel proof for the asymptotic convergence of GAN training in the nonparametric limit and it does not require the discriminator to be optimal at each step.
• We discuss how AGD can converge to a potentially bad local equilibrium in non-convex games and hypothesize this to be responsible for mode collapse during GAN training.
• We characterize mode collapse situations with sharp gradients of the discriminator function around some real data points.
• A novel gradient penalty scheme called DRAGAN is introduced based on this observation and we demonstrate that it mitigates the mode collapse issue.
The above is a summary of the paper; next comes some of the relevant background (in the original post the formulas in this part appeared as screenshots from the paper).
Before the experiments we need a few GAN basics. For a GAN, the generator's and discriminator's cost functions can be defined from a single value function, and for the model as a whole the game is:
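In standard notation (my own rendering, since the original screenshots are not reproduced here), the discriminator maximizes and the generator minimizes the same value function:
\min_G \max_D J(D, G) = \mathbb{E}_{x \sim p_{\mathrm{data}}}[\log D(x)] + \mathbb{E}_{z \sim p_z}[\log(1 - D(G(z)))]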
In the convex-concave case, suppose the following condition is satisfied:
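The missing condition is the usual saddle-point (Nash) condition; in my notation, with generator parameters \theta and discriminator parameters \phi, the pair (\theta^*, \phi^*) satisfies
J(\theta^*, \phi) \le J(\theta^*, \phi^*) \le J(\theta, \phi^*) \quad \text{for all } \theta, \phi .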
When it holds, the game between the generator and the discriminator can be considered to have reached an equilibrium. The natural tool for studying convergence to such an equilibrium is regret minimization, and to explain it we first need the notion of a no-regret algorithm:
Suppose we are given a sequence of convex loss functions L1, L2, ... : K -> R, and at each round t the algorithm picks a point k_t in K based only on the losses seen so far. The algorithm is said to be no-regret if:
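The omitted formula is the standard regret definition from online learning (my rendering): the regret after T rounds is
R(T) = \sum_{t=1}^{T} L_t(k_t) - \min_{k \in K} \sum_{t=1}^{T} L_t(k),
and the algorithm is no-regret if R(T)/T \to 0 as T \to \infty.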
We can now run no-regret algorithms on the GAN game J(·,·). The derivation goes roughly as follows: at round t the generator's loss and the discriminator's loss can both be written in terms of J evaluated at the current parameters, and after T rounds of play the two players' regret guarantees can be combined into a statement about the averaged iterates. In other words:
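A sketch of the omitted formulas, following the standard no-regret argument for two-player zero-sum games rather than the paper's exact screenshots: with generator parameters \theta_t and discriminator parameters \phi_t at round t, the per-round losses are
L^G_t(\theta) = J(\theta, \phi_t), \qquad L^D_t(\phi) = -J(\theta_t, \phi),
and if both players' regrets grow sublinearly, the averaged iterates \bar{\theta}_T = \frac{1}{T}\sum_t \theta_t and \bar{\phi}_T = \frac{1}{T}\sum_t \phi_t form an approximate equilibrium whose gap is bounded by the sum of the two average regrets.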
Next, how the paper deals with mode collapse:
As discussed above, mode collapse comes with sharp discriminator gradients around some real data points; such gradients allow many different z vectors to be mapped to essentially the same output x, producing a degenerate equilibrium of the game (in practice, many different input vectors yield nearly identical samples). To mitigate this, a gradient penalty can be added to the discriminator's loss.
This scheme does improve training stability, and it also explains why WGAN alleviates mode collapse to some extent. On closer inspection, though, the constraint is hard to get right: an over-penalized discriminator behaves like a noisy function, so the penalty is refined to measure how far the gradient norm is from a target value.
Finally, after some empirical tuning, the penalty the authors actually adopt is:
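My transcription of the commonly cited DRAGAN regularizer (consistent with the hyperparameters \lambda, k and c listed further down), added to the discriminator's loss:
\lambda \cdot \mathbb{E}_{x \sim P_{\mathrm{real}},\, \delta \sim N_d(0, cI)}\big[(\|\nabla_x D_\theta(x + \delta)\| - k)^2\big]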
The left plot shows inception score (a common metric for the quality of GAN samples, though it has known flaws) against training iterations, and the right plot shows the corresponding squared norm of the discriminator's gradients; the violent swings visible in these curves are exactly where mode collapse occurs.
A few further details from the paper:
• We use the vanilla GAN objective in our experiments, but our penalty improves stability using other objective functions as well. This is demonstrated in section 3.3.
• The penalty scheme used in our experiments is the one shown in equation 1.
• We use small pixel-level noise but it is possible to find better ways of imposing this penalty. However, this exploration is beyond the scope of our paper.
• The optimal configuration of the hyperparameters for DRAGAN depends on the architecture, dataset and data domain. We set them to be λ ~ 10, k = 1 and c ~ 10 in most of our experiments.
The authors then show what changes once the gradient penalty is added. The inception score curve before the change:
And after:
The violent swings in the curve are clearly reduced, which is to say that mode collapse is alleviated to a good extent.
Finally, the authors note that several existing models regularize the discriminator's gradients in a very similar spirit, most notably LS-GAN and WGAN-GP. LS-GAN restricts the discriminator (its loss function) to be Lipschitz and requires it to separate real and generated samples by a margin that depends on their distance.
Similarly, WGAN-GP also constrains the discriminator:
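For comparison, the WGAN-GP penalty in its standard form (my rendering, not the paper's figure) is
\lambda \cdot \mathbb{E}_{\hat{x} \sim P_{\hat{x}}}\big[(\|\nabla_{\hat{x}} D(\hat{x})\|_2 - 1)^2\big], \qquad \hat{x} = \epsilon x + (1 - \epsilon) G(z), \; \epsilon \sim U[0, 1],
i.e. it is evaluated on interpolations between real and generated samples and is therefore tied to the generator.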
The two ideas are closely related; the authors call them "coupled penalties", since the penalty is evaluated at points that depend on both real and generated samples.
However, the theory behind WGAN-GP does not cover arbitrary real-fake pairs: according to the literature, the optimal critic is only guaranteed to have norm-1 gradients between real and generated points that are coupled under the optimal joint distribution π. This is where DRAGAN differs from those methods, and the authors call their own scheme a "local penalty". They also point out the problems with coupled penalties:
• With adversarial training finding applications beyond fitting implicit generative models, penalties which depend on generated samples can be prohibitive.
• The resulting class of functions when coupled penalties are used will be highly restricted compared to our method and this affects modeling performance. We refer the reader to Figure 4 and appendix section 5.2.2 to see this effect.
• Our algorithm works with AGD, while WGAN-GP needs multiple inner iterations to optimize D. This is because the generated samples can be anywhere in the data space and they change from one iteration to the next. In contrast, we consistently regularize D(x) only along the real data manifold.
Finally the authors run a few small experiments; I will only show the one on the Swiss roll dataset:
Orange points are the real data, green points are generated samples, and the contour-like curves in the background are level sets of the discriminator. Rows, from top to bottom: vanilla GAN, WGAN-GP, DRAGAN; columns, from left to right: successive stages of training. The figures make it fairly clear that DRAGAN fits the real data well.
The experiment code is mainly based on:
[2]https://github.com/hwalsuklee/tensorflow-generative-model-collections
While running it I found a few small problems with how the original code saves images and runs, so I made some modifications.
III. DRAGAN Implementation
1. File structure
The files needed for DRAGAN are:
-- utils.py # helper functions
-- ops.py # layer definitions
-- dragan.py # main model and training script
-- mnist_data ###### dataset you need to prepare yourself
|------ 0
|------ image1.jpg
|------ image2.jpg
|------ ......
|------ 1
|------ image1.jpg
|------ image2.jpg
|------ ......
|------ 2
|------ ......
|------ 9
|------ image1.jpg
|------ image2.jpg
|------ ......
Unlike the usual MNIST pipeline, I exported the MNIST digits as image files and placed them in one folder per class, which also makes it easier to reuse the code for other data. I won't repeat how to build such a dataset here; see: 對抗神經網絡學習(一)——GAN實作mnist手寫數字生成(tensorflow實作). Note that DRAGAN is an unsupervised model, so it runs fine even without these labels.
2. Helper file: utils.py
utils.py defines helpers for saving images and inspecting variables. I removed most of the functions from the original code and kept only two; the key parts are below.
First, a helper that lists all trainable variables:
def show_all_variables():
import tensorflow as tf
import tensorflow.contrib.slim as slim
model_vars = tf.trainable_variables()
slim.model_analyzer.analyze_vars(model_vars, print_info=True)
Next we need to plot the generated samples; 64 images are tiled into a single grid image. This code is adapted from my earlier GAN post and I find it simpler and more convenient:
def show_result(batch_res, fname, grid_size=(8, 8), grid_pad=0, image_height=28, image_width=28):
from skimage import io
import numpy as np
batch_res = 0.5 * batch_res.reshape((batch_res.shape[0], image_height, image_width)) + 0.5
    # grid layout parameters for tiling the images
img_h, img_w = batch_res.shape[1], batch_res.shape[2]
grid_h = img_h * grid_size[0] + grid_pad * (grid_size[0] - 1)
grid_w = img_w * grid_size[1] + grid_pad * (grid_size[1] - 1)
img_grid = np.zeros((grid_h, grid_w), dtype=np.uint8)
for i, res in enumerate(batch_res):
if i >= grid_size[0] * grid_size[1]:
break
img = res * 255.
img = img.astype(np.uint8)
        row = (i // grid_size[1]) * (img_h + grid_pad)  # row index uses the number of columns
col = (i % grid_size[1]) * (img_w + grid_pad)
img_grid[row:row + img_h, col:col + img_w] = img
io.imsave(fname, img_grid)
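A minimal usage sketch (my own, not part of the original code), assuming generator outputs in [-1, 1]; the file name is just an example:
import numpy as np
from utils import show_result

# a fake batch standing in for generator output: 64 samples of 28*28 pixels in [-1, 1]
fake_batch = np.random.uniform(-1.0, 1.0, size=(64, 28 * 28)).astype(np.float32)
show_result(fake_batch, 'preview.png')  # writes an 8x8 grid of 28x28 tiles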
3. Layer file: ops.py
ops.py defines the layer-level operations; let's go through them one by one.
First the batch-normalization (BN) layer, which normalizes activations within each mini-batch and helps stabilize training:
import tensorflow as tf
def bn(x, is_training, scope):
return tf.contrib.layers.batch_norm(x,
decay=0.9,
updates_collections=None,
epsilon=1e-5,
scale=True,
is_training=is_training,
scope=scope)
Next, the usual convolution and transposed-convolution ops. TensorFlow provides these out of the box; wrapping them in small functions just makes them more convenient to call:
def conv2d(input_, output_dim, k_h=5, k_w=5, d_h=2, d_w=2, stddev=0.02, name="conv2d"):
with tf.variable_scope(name):
w = tf.get_variable('w', [k_h, k_w, input_.get_shape()[-1], output_dim],
initializer=tf.truncated_normal_initializer(stddev=stddev))
conv = tf.nn.conv2d(input_, w, strides=[1, d_h, d_w, 1], padding='SAME')
biases = tf.get_variable('biases', [output_dim], initializer=tf.constant_initializer(0.0))
conv = tf.reshape(tf.nn.bias_add(conv, biases), conv.get_shape())
return conv
def deconv2d(input_, output_shape, k_h=5, k_w=5, d_h=2, d_w=2, name="deconv2d", stddev=0.02, with_w=False):
with tf.variable_scope(name):
# filter : [height, width, output_channels, in_channels]
w = tf.get_variable('w', [k_h, k_w, output_shape[-1], input_.get_shape()[-1]],
initializer=tf.random_normal_initializer(stddev=stddev))
deconv = tf.nn.conv2d_transpose(input_, w, output_shape=output_shape, strides=[1, d_h, d_w, 1])
biases = tf.get_variable('biases', [output_shape[-1]], initializer=tf.constant_initializer(0.0))
deconv = tf.reshape(tf.nn.bias_add(deconv, biases), deconv.get_shape())
if with_w:
return deconv, w, biases
else:
return deconv
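A quick shape check (my own sketch, assuming TensorFlow 1.x and the ops.py above; the scope names are arbitrary): with stride 2 and 'SAME' padding, conv2d halves the spatial size and deconv2d maps it back:
import tensorflow as tf
from ops import conv2d, deconv2d

x = tf.placeholder(tf.float32, [64, 28, 28, 1])
h = conv2d(x, 64, name='check_conv')                    # shape (64, 14, 14, 64)
y = deconv2d(h, [64, 28, 28, 1], name='check_deconv')   # shape (64, 28, 28, 1)
print(h.get_shape(), y.get_shape())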
Finally, leaky ReLU and a linear (fully connected) layer:
def lrelu(x, leak=0.2, name="lrelu"):
return tf.maximum(x, leak*x)
def linear(input_, output_size, scope=None, stddev=0.02, bias_start=0.0, with_w=False):
shape = input_.get_shape().as_list()
with tf.variable_scope(scope or "Linear"):
matrix = tf.get_variable("Matrix", [shape[1], output_size], tf.float32,
tf.random_normal_initializer(stddev=stddev))
bias = tf.get_variable("bias", [output_size],
initializer=tf.constant_initializer(bias_start))
if with_w:
return tf.matmul(input_, matrix) + bias, matrix, bias
else:
return tf.matmul(input_, matrix) + bias
4. Model file: dragan.py
dragan.py is the largest file: it reads the data, defines the DRAGAN model, and drives training from the main function. Here is a brief walkthrough.
First, the imports and the key hyperparameters (you could also move these into the main function):
import os, glob
import time
import numpy as np
import tensorflow as tf
from skimage import io, transform
from ops import *
from utils import *
epochs = 1000
batch_size = 64
z_dim = 128
checkpoint_dir = './DRAGAN/checkpoint/'
result_dir = './DRAGAN/result/'
dataset_dir = 'mnist_data/'
Next, data loading. The idea: read every image under the data folder, label it by the index of its subfolder, optionally one-hot encode the labels, and finally use 80% of the data for training and 20% for validation:
def get_data(data_path, onehot=False, train_data_rate=0.8, image_height=28, image_width=28):
def label_to_onehot(labels):
n_sample = len(labels)
n_class = max(labels) + 1
onehot_labels = np.zeros((n_sample, n_class))
onehot_labels[np.arange(n_sample), labels] = 1
return onehot_labels
    # find all subfolders under the data path
cate = [data_path + f for f in os.listdir(data_path) if os.path.isdir(data_path + f)]
    # lists to hold the images and their labels
imgs = []
labels = []
    # read every image and label it with its folder index
for idx, folder in enumerate(cate):
for im in glob.glob(folder + '/*.jpg'):
print('reading the images:%s' % im)
img = io.imread(im)
img = (img < 127) * (255 - img)
img = transform.resize(img, (image_height, image_width))
# img = tf.image.per_image_standardization(img)
imgs.append(img)
labels.append(idx)
if onehot:
labels = label_to_onehot(labels)
    # shuffle the data
image_num = np.asarray(imgs, np.float32).shape[0]
arr = np.arange(image_num)
np.random.shuffle(arr)
data = np.asarray(imgs, np.float32)[arr]
label = np.asarray(labels, np.int32)[arr]
    # split into training and validation sets
s = np.int(image_num * train_data_rate)
x_train = data[:s].reshape(-1, image_height, image_width, 1)
y_train = label[:s]
x_val = data[s:].reshape(-1, image_height, image_width, 1)
y_val = label[s:]
return x_train, y_train, x_val, y_val, image_num
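A quick sanity check (my own sketch, e.g. dropped temporarily at the bottom of dragan.py; the exact shapes depend on how many images sit under mnist_data/):
x_train, y_train, x_val, y_val, n_images = get_data(dataset_dir)
print(x_train.shape, x_val.shape)  # roughly 80% / 20% of n_images, each of shape (?, 28, 28, 1)
print(y_train[:10])                # integer labels taken from the subfolder order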
Now for the most important part, the DRAGAN model itself. The architecture here is built on top of the infoGAN networks used in the reference code; I mostly kept the original implementation and made a few small changes. The full class:
class DRAGAN(object):
model_name = "DRAGAN" # name for checkpoint
def __init__(self, sess, epoch, batch_size, z_dim, checkpoint_dir, result_dir,dataset_dir):
self.sess = sess
self.dataset_dir = dataset_dir
self.checkpoint_dir = checkpoint_dir
self.result_dir = result_dir
self.epoch = epoch
self.batch_size = batch_size
self.image_height = 28
self.image_width = 28
self.image_channel = 1
self.z_dim = z_dim
# DRAGAN parameter
self.lambd = 0.25 # The higher value, the more stable, but the slower convergence
# summary writer
self.writer = tf.summary.FileWriter('./logs/' + self.model_name, self.sess.graph)
# train
self.learning_rate = 0.0002
self.beta1 = 0.5
# test
self.sample_num = 64 # number of generated images to be saved
# load data
        self.train_x, self.train_y, self.val_x, self.val_y, self.image_num = get_data(self.dataset_dir)
self.num_batches = self.image_num // self.batch_size
def discriminator(self, x, is_training=True, reuse=False):
# Network Architecture is exactly same as in infoGAN (https://arxiv.org/abs/1606.03657)
# Architecture : (64)4c2s-(128)4c2s_BL-FC1024_BL-FC1_S
with tf.variable_scope("discriminator", reuse=reuse):
net = lrelu(conv2d(x, 64, 4, 4, 2, 2, name='d_conv1'))
net = lrelu(bn(conv2d(net, 128, 4, 4, 2, 2, name='d_conv2'), is_training=is_training, scope='d_bn2'))
net = tf.reshape(net, [self.batch_size, -1])
net = lrelu(bn(linear(net, 1024, scope='d_fc3'), is_training=is_training, scope='d_bn3'))
out_logit = linear(net, 1, scope='d_fc4')
out = tf.nn.sigmoid(out_logit)
return out, out_logit, net
def generator(self, z, is_training=True, reuse=False):
# Network Architecture is exactly same as in infoGAN (https://arxiv.org/abs/1606.03657)
# Architecture : FC1024_BR-FC7x7x128_BR-(64)4dc2s_BR-(1)4dc2s_S
with tf.variable_scope("generator", reuse=reuse):
net = tf.nn.relu(bn(linear(z, 1024, scope='g_fc1'), is_training=is_training, scope='g_bn1'))
net = tf.nn.relu(bn(linear(net, 128 * 7 * 7, scope='g_fc2'), is_training=is_training, scope='g_bn2'))
net = tf.reshape(net, [self.batch_size, 7, 7, 128])
net = tf.nn.relu(
bn(deconv2d(net, [self.batch_size, 14, 14, 64], 4, 4, 2, 2, name='g_dc3'), is_training=is_training,
scope='g_bn3'))
out = tf.nn.sigmoid(deconv2d(net, [self.batch_size, 28, 28, 1], 4, 4, 2, 2, name='g_dc4'))
return out
def get_perturbed_batch(self, minibatch):
return minibatch + 0.5 * minibatch.std() * np.random.random(minibatch.shape)
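    # Note (added): following the reference implementation, each real batch is
    # perturbed as x_p = x + 0.5 * std(x) * U(0, 1); build_model() then applies the
    # gradient penalty at random points between x and x_p, i.e. in a small
    # neighborhood of the real data rather than between real and generated samples.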
def build_model(self):
# some parameters
image_dims = [self.image_height, self.image_width, self.image_channel]
bs = self.batch_size
""" Graph Input """
self.inputs = tf.placeholder(tf.float32, [bs] + image_dims, name='real_images')
self.inputs_p = tf.placeholder(tf.float32, [bs] + image_dims, name='real_perturbed_images')
# noises
self.z = tf.placeholder(tf.float32, [bs, self.z_dim], name='z')
""" Loss Function """
# output of D for real images
D_real, D_real_logits, _ = self.discriminator(self.inputs, is_training=True, reuse=False)
# output of D for fake images
G = self.generator(self.z, is_training=True, reuse=False)
D_fake, D_fake_logits, _ = self.discriminator(G, is_training=True, reuse=True)
# get loss for discriminator
d_loss_real = tf.reduce_mean(
tf.nn.sigmoid_cross_entropy_with_logits(logits=D_real_logits, labels=tf.ones_like(D_real)))
d_loss_fake = tf.reduce_mean(
tf.nn.sigmoid_cross_entropy_with_logits(logits=D_fake_logits, labels=tf.zeros_like(D_fake)))
self.d_loss = d_loss_real + d_loss_fake
# get loss for generator
self.g_loss = tf.reduce_mean(
tf.nn.sigmoid_cross_entropy_with_logits(logits=D_fake_logits, labels=tf.ones_like(D_fake)))
""" DRAGAN Loss (Gradient penalty) """
# This is borrowed from https://github.com/kodalinaveen3/DRAGAN/blob/master/DRAGAN.ipynb
alpha = tf.random_uniform(shape=self.inputs.get_shape(), minval=0.,maxval=1.)
differences = self.inputs_p - self.inputs # This is different from WGAN-GP
interpolates = self.inputs + (alpha * differences)
_, D_inter, _=self.discriminator(interpolates, is_training=True, reuse=True)
gradients = tf.gradients(D_inter, [interpolates])[0]
slopes = tf.sqrt(tf.reduce_sum(tf.square(gradients), reduction_indices=[1]))
gradient_penalty = tf.reduce_mean((slopes - 1.) ** 2)
self.d_loss += self.lambd * gradient_penalty
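        # Note (added): this pushes the gradient norm of D toward 1 at points
        # sampled around the real data, weighted by self.lambd (0.25 here; the
        # paper reports lambda ~ 10, k = 1, c ~ 10 as typical settings).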
""" Training """
# divide trainable variables into a group for D and a group for G
t_vars = tf.trainable_variables()
d_vars = [var for var in t_vars if 'd_' in var.name]
g_vars = [var for var in t_vars if 'g_' in var.name]
# optimizers
with tf.control_dependencies(tf.get_collection(tf.GraphKeys.UPDATE_OPS)):
self.d_optim = tf.train.AdamOptimizer(self.learning_rate*5, beta1=self.beta1) \
.minimize(self.d_loss, var_list=d_vars)
self.g_optim = tf.train.AdamOptimizer(self.learning_rate*5, beta1=self.beta1) \
.minimize(self.g_loss, var_list=g_vars)
"""" Testing """
# for test
self.fake_images = self.generator(self.z, is_training=False, reuse=True)
""" Summary """
d_loss_real_sum = tf.summary.scalar("d_loss_real", d_loss_real)
d_loss_fake_sum = tf.summary.scalar("d_loss_fake", d_loss_fake)
d_loss_sum = tf.summary.scalar("d_loss", self.d_loss)
g_loss_sum = tf.summary.scalar("g_loss", self.g_loss)
# final summary operations
self.g_sum = tf.summary.merge([d_loss_fake_sum, g_loss_sum])
self.d_sum = tf.summary.merge([d_loss_real_sum, d_loss_sum])
def train(self):
# initialize all variables
self.sess.run(tf.global_variables_initializer())
# graph inputs for visualize training results
self.sample_z = np.random.uniform(-1, 1, size=(self.batch_size , self.z_dim))
# saver to save model
self.saver = tf.train.Saver()
# restore check-point if it exits
could_load, checkpoint_counter = self.load(self.checkpoint_dir)
if could_load:
start_epoch = int(checkpoint_counter / self.num_batches)
start_batch_id = checkpoint_counter - start_epoch * self.num_batches
counter = checkpoint_counter
print(" [*] Load SUCCESS")
else:
start_epoch = 0
start_batch_id = 0
counter = 1
print(" [!] Load failed...")
# loop for epoch
start_time = time.time()
for epoch in range(start_epoch, self.epoch):
# get batch data
for idx in range(start_batch_id, self.num_batches):
batch_images = self.train_x[idx*self.batch_size:(idx+1)*self.batch_size]
if batch_images.shape[0] != self.batch_size:
continue
batch_images_p = self.get_perturbed_batch(batch_images)
batch_z = np.random.uniform(-1, 1, [self.batch_size, self.z_dim]).astype(np.float32)
# update D network
_, summary_str, d_loss = self.sess.run([self.d_optim, self.d_sum, self.d_loss],
feed_dict={self.inputs: batch_images,
self.inputs_p: batch_images_p,
self.z: batch_z})
self.writer.add_summary(summary_str, counter)
# update G network
_, summary_str, g_loss = self.sess.run([self.g_optim, self.g_sum, self.g_loss],
feed_dict={self.z: batch_z})
self.writer.add_summary(summary_str, counter)
# display training status
counter += 1
print("Epoch: [%2d] [%4d/%4d] time: %4.4f, d_loss: %.8f, g_loss: %.8f" \
% (epoch, idx, self.num_batches, time.time() - start_time, d_loss, g_loss))
# save training results for every 10 steps
if np.mod(counter, 10) == 0:
samples = self.sess.run(self.fake_images,
feed_dict={self.z: self.sample_z})
show_result(samples, os.path.join(result_dir, '_train_{:02d}_{:04d}.png'.format(epoch, idx)))
# After an epoch, start_batch_id is set to zero
# non-zero value is only for the first epoch after loading pre-trained model
start_batch_id = 0
# show temporal results
self.visualize_results(epoch)
# save model for final step
self.save(self.checkpoint_dir, counter)
def visualize_results(self, epoch):
""" random condition, random noise """
z_sample = np.random.uniform(-1, 1, size=(self.batch_size, self.z_dim))
samples = self.sess.run(self.fake_images, feed_dict={self.z: z_sample})
show_result(samples, os.path.join(result_dir, '_epoch_{:02d}.png'.format(epoch)))
@property
def model_dir(self):
return "{}_{}_{}_{}".format(
self.model_name, 'mnist',
self.batch_size, self.z_dim)
def save(self, checkpoint_dir, step):
        checkpoint_dir = os.path.join(checkpoint_dir, self.model_dir, self.model_name)
        if not os.path.exists(checkpoint_dir):
            os.makedirs(checkpoint_dir)
        self.saver.save(self.sess, os.path.join(checkpoint_dir, self.model_name + '.model'), global_step=step)
def load(self, checkpoint_dir):
import re
print(" [*] Reading checkpoints...")
checkpoint_dir = os.path.join(checkpoint_dir, self.model_dir, self.model_name)
ckpt = tf.train.get_checkpoint_state(checkpoint_dir)
if ckpt and ckpt.model_checkpoint_path:
ckpt_name = os.path.basename(ckpt.model_checkpoint_path)
self.saver.restore(self.sess, os.path.join(checkpoint_dir, ckpt_name))
counter = int(next(re.finditer("(\d+)(?!.*\d)", ckpt_name)).group(0))
print(" [*] Success to read {}".format(ckpt_name))
return True, counter
else:
print(" [*] Failed to find a checkpoint")
return False, 0
Finally, the main() function:
def main():
if not os.path.exists(checkpoint_dir):
os.makedirs(checkpoint_dir)
if not os.path.exists(result_dir):
os.makedirs(result_dir)
if not os.path.exists('./DRAGAN/logs/'):
os.makedirs('./DRAGAN/logs/')
sess = tf.Session()
sess.run(tf.global_variables_initializer())
dragan = DRAGAN(sess=sess, epoch=epochs, batch_size=batch_size, z_dim=z_dim,
checkpoint_dir=checkpoint_dir, result_dir=result_dir, dataset_dir=dataset_dir)
dragan.build_model()
# show network architecture
show_all_variables()
# launch the graph in a session
dragan.train()
print(" [*] Training finished!")
# visualize learned generator
dragan.visualize_results(epochs - 1)
print(" [*] Testing finished!")
if __name__ == '__main__':
main()
With these files in place we are done coding; all that's left is to run the model and look at the results.
IV. Experimental Results
Running the program produces many generated images, each an 8x8 grid of 28x28 digits, so we first cut the grids back into individual images. To keep the cut results meaningful, delete the early, poor-quality grids first, then cut the rest with the following code:
from skimage import io
import os
def check_folder(label):
if not os.path.exists('./DRAGAN/gen_cut/'+str(label)):
os.makedirs('./DRAGAN/gen_cut/'+str(label))
def single_cut(img, img_name, label, pad=0):
for i in range(8):
for j in range(8):
roi =img[i*28+i*pad:(i+1)*28+i*pad, j*28+j*pad:(j+1)*28+j*pad]
roi[roi<=128] = 0
io.imsave('./DRAGAN/gen_cut/' + str(label)+'/'+ str(label) + '_' + str(i*8+j) + '_' + img_name +'.jpg', roi)
def main(label):
images = os.listdir('./DRAGAN/sample/'+str(label))
count = 0
for imagename in images:
imagepath = os.path.join('./DRAGAN/sample/'+str(label), imagename)
print(imagepath)
img = io.imread(imagepath)
single_cut(img, str(count), label)
count += 1
if __name__ == '__main__':
if not os.path.exists('./DRAGAN/gen_cut'):
os.makedirs('./DRAGAN/gen_cut')
for i in range(10):
check_folder(i)
main(i)
Since this post is mainly about analysis, the question is how to show that DRAGAN actually mitigates mode collapse and generates better samples than the original GAN. After comparing a few options I settled on PSNR (a debatable choice, admittedly). The idea: average the real images of a class into one mean image, do the same for the DRAGAN outputs and for the GAN outputs, and then compute the PSNR of each generated mean image against the real one. The procedure is not entirely rigorous, but the numbers are still a useful reference. The PSNR code is quite simple:
from skimage import io
import numpy as np
import os
def psnr(img1, img2):
mse = (np.abs(img1 - img2) ** 2).mean()
psnr = 10 * np.log10(255 * 255 / mse)
return psnr
def mean_img(folder, pre_process=True):
images = os.listdir(folder)
image_len = len(images)
final_img = 0
for i in range(image_len):
img = io.imread(folder + images[i])
if pre_process:
img = (img < 127) * (255 - img)
img[img < 128] = 0
img[img != 0] = 1
final_img += (img / image_len)
return final_img
if __name__ == '__main__':
# raw data
img1 = mean_img('raw_data/9/', True)
# DRAGAN
img2 = mean_img('DRAGAN/9/', False)
print('psnr of DRAGAN', psnr(img1, img2))
# GAN
img3 = mean_img('GAN/9/', False)
print('psnr of GAN', psnr(img1, img3))
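For reference, the quantity the psnr() function computes is
\mathrm{PSNR} = 10 \cdot \log_{10}\!\left(\frac{255^2}{\mathrm{MSE}}\right), \qquad \mathrm{MSE} = \frac{1}{N}\sum_i (x_i - y_i)^2,
with higher values meaning the generated mean image is closer to the real one.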
Let's look at the averaged images first, using the digit 9 as an example:
The DRAGAN average clearly looks better; the final PSNR values are shown below:
V. Analysis
1. Let's take a closer look at how the gradient penalty term is written:
""" DRAGAN Loss (Gradient penalty) """
# This is borrowed from https://github.com/kodalinaveen3/DRAGAN/blob/master/DRAGAN.ipynb
alpha = tf.random_uniform(shape=self.inputs.get_shape(), minval=0.,maxval=1.)
differences = self.inputs_p - self.inputs # This is different from WGAN-GP
interpolates = self.inputs + (alpha * differences)
_, D_inter, _=self.discriminator(interpolates, is_training=True, reuse=True)
gradients = tf.gradients(D_inter, [interpolates])[0]
slopes = tf.sqrt(tf.reduce_sum(tf.square(gradients), reduction_indices=[1]))
gradient_penalty = tf.reduce_mean((slopes - 1.) ** 2)
self.d_loss += self.lambd * gradient_penalty
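In words (my reading of the snippet, not the authors' own commentary): each real batch x is perturbed to x_p = x + 0.5 * std(x) * U(0, 1), a random point is sampled on the segment between x and x_p, and the discriminator is penalized toward unit gradient norm at that point:
\lambda \cdot \mathbb{E}\big[(\|\nabla_{\tilde{x}} D(\tilde{x})\|_2 - 1)^2\big], \qquad \tilde{x} = x + \alpha (x_p - x), \; \alpha \sim U[0, 1].
Unlike WGAN-GP, the interpolation never involves generated samples, so the constraint is enforced only near the real data manifold, which is exactly the "local penalty" described in section II.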
2. Metrics for evaluating the quality of GAN-generated images include Inception Score, Mode Score, Kernel MMD, the Wasserstein distance, Fréchet Inception Distance, and 1-NN classifier accuracy; for an overview see: https://www.jiqizhixin.com/articles/2018-07-02-3