# encoding:utf-8
# Python2 相容
from __future__ import print_function, division
from scipy.io import loadmat as load
import matplotlib.pyplot as plt
import numpy as np
def reformat(samples, labels):
    """
    Reorder the sample axes and one-hot encode the labels.

    @samples: numpy array laid out as (height, width, channels, n_images)
    @labels: sequence of length-1 rows, one per image, holding the digit
             class; the digit 0 is stored as 10
    @return: (samples, labels) tuple
             samples -> float32 array laid out as (n_images, height, width, channels)
             labels  -> float32 one-hot array of shape (n_images, 10), e.g.
                        [2]  -> [0, 0, 1, 0, 0, 0, 0, 0, 0, 0]
                        [10] -> [1, 0, 0, 0, 0, 0, 0, 0, 0, 0]
    """
    # axes 0 1 2 3 -> 3 0 1 2
    new = np.transpose(samples, (3, 0, 1, 2)).astype(np.float32)

    digits = np.array([row[0] for row in labels], dtype=np.int64)
    # Digit 0 is represented as 10 in the raw labels; fold it back to
    # column 0.  Any other out-of-range value still raises IndexError below.
    digits = np.where(digits == 10, 0, digits)

    one_hot_labels = np.zeros((digits.shape[0], 10), dtype=np.float32)
    one_hot_labels[np.arange(digits.shape[0]), digits] = 1.0
    return new, one_hot_labels
def normalize(samples):
    """
    Convert the images to grayscale and rescale the pixel values.

    Grayscale: the channels are averaged as (R + G + B) / 3, going from
    three colour channels to a single one to save memory and speed up
    training.
    Rescale: the averaged value v is mapped linearly to v / 128 - 1, so an
    input range of 0 ~ 255 lands in -1.0 ~ +0.992.

    @samples: numpy array laid out as (n_images, height, width, channels)
    @return: numpy array of shape (n_images, height, width, 1)
    """
    # keepdims=True keeps the (now length-1) channel axis in place
    a = np.add.reduce(samples, keepdims=True, axis=3)
    a = a / 3.0
    return a / 128.0 - 1.0
def distribution(labels, name):
    """
    Count how often each label occurs and draw the counts as a bar chart.

    @labels: sequence of length-1 rows holding the raw digit class; the
             digit 0 is stored as 10 and is counted under key 0
    @name: prefix used in the chart title
    """
    # keys: 0, 1, 2, ..., 9
    count = {}
    for label in labels:
        digit = int(label[0])
        key = 0 if digit == 10 else digit
        count[key] = count.get(key, 0) + 1

    # Sort so the bars always appear in digit order.
    x = sorted(count)
    y = [count[k] for k in x]

    y_pos = np.arange(len(x))
    plt.bar(y_pos, y, align='center', alpha=0.5)
    plt.xticks(y_pos, x)
    plt.title(name + ' Label Distribution')
    # Render the figure; without this nothing is displayed when the module
    # is run as a plain script.
    plt.show()
def inspect(dataset, labels, i):
    """
    Print the label of sample ``i`` and display its image.

    @dataset: numpy array indexed as (n_images, height, width, channels)
    @labels: sequence indexable by ``i``; the entry is printed as-is
    @i: index of the sample to look at
    """
    if dataset.shape[3] == 1:
        # Drop the length-1 channel axis so the image is a plain 2-D array.
        shape = dataset.shape
        dataset = dataset.reshape(shape[0], shape[1], shape[2])
    print(labels[i])
    # NOTE(review): imshow expects float RGB data in 0..1 -- confirm the
    # value range of multi-channel inputs passed here.
    plt.imshow(dataset[i])
    plt.show()
# Read the 32x32 train / test .mat files from disk.
# An additional split can be loaded the same way if needed:
#   extra = load('../data/extra_32x32.mat')
train = load('../data/train_32x32.mat')
test = load('../data/test_32x32.mat')

# Raw arrays: 'X' holds the samples, 'y' the labels.
train_samples, train_labels = train['X'], train['y']
test_samples, test_labels = test['X'], test['y']

# Reorder the axes and one-hot encode the labels ...
n_train_samples, _train_labels = reformat(train_samples, train_labels)
n_test_samples, _test_labels = reformat(test_samples, test_labels)

# ... then reduce to one channel and rescale the pixel values.
_train_samples = normalize(n_train_samples)
_test_samples = normalize(n_test_samples)

# Dataset constants exposed to importers of this module.
num_labels = 10
image_size = 32
num_channels = 1
if __name__ == '__main__':
    # Data exploration: uncomment the calls below to look at the data.
    # inspect(_train_samples, _train_labels, 1234)
    # _train_samples = normalize(_train_samples)
    # inspect(_train_samples, _train_labels, 1234)
    # distribution(train_labels, 'Train Labels')
    # distribution(test_labels, 'Test Labels')
    pass  # keeps the guard syntactically valid while every call is disabled
# 為了 Python2 玩家們
from __future__ import print_function, division
# 第三方
import tensorflow as tf
from sklearn.metrics import confusion_matrix
import numpy as np
# 我們自己
import load
# Pull the preprocessed arrays out of the loader module.
train_samples = load._train_samples
train_labels = load._train_labels
test_samples = load._test_samples
test_labels = load._test_labels

print('Training set', train_samples.shape, train_labels.shape)
print(' Test set', test_samples.shape, test_labels.shape)

# Dataset constants, re-exported under local names.
num_channels = load.num_channels
num_labels = load.num_labels
image_size = load.image_size
def get_chunk(samples, labels, chunkSize):
    """
    Iterator/Generator: get a batch of data.

    Yields only chunkSize items at a time; meant for use in a for loop,
    just like the range() function.  Only full batches are produced: a
    trailing remainder shorter than chunkSize is dropped.

    @samples: sliceable sequence of samples
    @labels: sliceable sequence of labels, same length as samples
    @chunkSize: number of items per batch, must be positive
    @yield: (i, samples_chunk, labels_chunk), where i counts batches from 0
    @raise ValueError: on the first iteration, if the lengths differ or
                       chunkSize is not positive
    """
    if len(samples) != len(labels):
        raise ValueError('Length of samples and labels must equal')
    if chunkSize <= 0:
        # A non-positive step would never advance through the data.
        raise ValueError('chunkSize must be a positive integer')

    # Floor division counts exactly the full batches, including the last one
    # when the data length is an exact multiple of chunkSize.
    for i in range(len(samples) // chunkSize):
        stepStart = i * chunkSize
        stepEnd = stepStart + chunkSize
        yield i, samples[stepStart:stepEnd], labels[stepStart:stepEnd]
class Network(object):
    """
    Fully connected classifier with one hidden layer.

    Usage: construct, then call run().  define_graph() builds the graph and
    is invoked by run() if it has not been called yet.
    """

    def __init__(self, num_hidden, batch_size):
        """
        @num_hidden: number of nodes in the hidden layer
        @batch_size: number of samples per training step; fixes the shape
                     of the training placeholders
        """
        self.batch_size = batch_size
        self.test_batch_size = 500

        # Hyper Parameters
        self.num_hidden = num_hidden

        # Graph Related
        self.graph = tf.Graph()
        self.tf_train_samples = None
        self.tf_train_labels = None
        self.tf_test_samples = None
        self.tf_test_labels = None
        self.tf_test_prediction = None

        # Filled in by define_graph()
        self.loss = None
        self.optimizer = None
        self.train_prediction = None
        self.test_prediction = None

        # Summaries
        self.merged = None

        # Initialisation
        self.session = tf.Session(graph=self.graph)
        self.writer = tf.train.SummaryWriter('./board', self.graph)

    def define_graph(self):
        """
        Define the placeholders, variables and operations of the graph.
        """
        with self.graph.as_default():
            # Only the variables of the graph are declared here.
            with tf.name_scope('inputs'):
                self.tf_train_samples = tf.placeholder(
                    tf.float32,
                    shape=(self.batch_size, image_size, image_size, num_channels),
                    name='tf_train_samples'
                )
                self.tf_train_labels = tf.placeholder(
                    tf.float32,
                    shape=(self.batch_size, num_labels),
                    name='tf_train_labels'
                )
                self.tf_test_samples = tf.placeholder(
                    tf.float32,
                    shape=(self.test_batch_size, image_size, image_size, num_channels),
                    name='tf_test_samples'
                )

            # fully connected layer 1, fully connected
            with tf.name_scope('fc1'):
                fc1_weights = tf.Variable(
                    tf.truncated_normal([image_size * image_size, self.num_hidden], stddev=0.1),
                    name='fc1_weights'
                )
                fc1_biases = tf.Variable(tf.constant(0.1, shape=[self.num_hidden]), name='fc1_biases')
                tf.histogram_summary('fc1_weights', fc1_weights)
                tf.histogram_summary('fc1_biases', fc1_biases)

            # fully connected layer 2 --> output layer
            with tf.name_scope('fc2'):
                fc2_weights = tf.Variable(
                    tf.truncated_normal([self.num_hidden, num_labels], stddev=0.1),
                    name='fc2_weights'
                )
                fc2_biases = tf.Variable(tf.constant(0.1, shape=[num_labels]), name='fc2_biases')
                tf.histogram_summary('fc2_weights', fc2_weights)
                tf.histogram_summary('fc2_biases', fc2_biases)

            # Now define the operations of the graph.
            def model(data):
                # fully connected layer 1: flatten every image to one row
                shape = data.get_shape().as_list()
                reshape = tf.reshape(data, [shape[0], shape[1] * shape[2] * shape[3]])

                with tf.name_scope('fc1_model'):
                    fc1_model = tf.matmul(reshape, fc1_weights) + fc1_biases
                    hidden = tf.nn.relu(fc1_model)

                # fully connected layer 2
                with tf.name_scope('fc2_model'):
                    return tf.matmul(hidden, fc2_weights) + fc2_biases

            # Training computation.
            logits = model(self.tf_train_samples)
            with tf.name_scope('loss'):
                # Keyword arguments keep the logits/labels order unambiguous.
                self.loss = tf.reduce_mean(
                    tf.nn.softmax_cross_entropy_with_logits(
                        logits=logits, labels=self.tf_train_labels
                    )
                )
                tf.scalar_summary('Loss', self.loss)

            # Optimizer.
            with tf.name_scope('optimizer'):
                self.optimizer = tf.train.GradientDescentOptimizer(0.0001).minimize(self.loss)

            # Predictions for the training and test data.
            with tf.name_scope('predictions'):
                self.train_prediction = tf.nn.softmax(logits, name='train_prediction')
                self.test_prediction = tf.nn.softmax(
                    model(self.tf_test_samples), name='test_prediction'
                )

            # Must run inside the graph context so it collects this graph's summaries.
            self.merged = tf.merge_all_summaries()

    def run(self):
        """
        Train on the training set, then evaluate on the test set.

        Prints the minibatch loss/accuracy every 50 steps, the accuracy of
        every test batch, their average and standard deviation, and the
        summed confusion matrix.  The session is closed when this returns.
        """
        # private function
        def print_confusion_matrix(confusionMatrix):
            print('Confusion Matrix:')
            for i, line in enumerate(confusionMatrix):
                # row i: true class i, followed by the share predicted correctly
                print(line, line[i] / np.sum(line))
            # Overall accuracy: correctly classified samples over all samples
            # actually evaluated (instead of a hard-coded sample count).
            total = np.sum(confusionMatrix)
            a = np.trace(confusionMatrix) / total
            print('\n', total, a)

        if self.merged is None:
            # The graph has not been built yet; build it so the ops below exist.
            self.define_graph()

        with self.session as session:
            # Variables must be initialised before any op reads them.
            tf.initialize_all_variables().run()

            ### Training
            print('Start Training')
            # batch 1000
            for i, samples, labels in get_chunk(train_samples, train_labels, chunkSize=self.batch_size):
                _, l, predictions, summary = session.run(
                    [self.optimizer, self.loss, self.train_prediction, self.merged],
                    feed_dict={self.tf_train_samples: samples, self.tf_train_labels: labels}
                )
                self.writer.add_summary(summary, i)
                # labels is True Labels
                accuracy, _ = self.accuracy(predictions, labels)
                if i % 50 == 0:
                    print('Minibatch loss at step %d: %f' % (i, l))
                    print('Minibatch accuracy: %.1f%%' % accuracy)

            ### Testing
            accuracies = []
            confusionMatrices = []
            for i, samples, labels in get_chunk(test_samples, test_labels, chunkSize=self.test_batch_size):
                result = self.test_prediction.eval(feed_dict={self.tf_test_samples: samples})
                accuracy, cm = self.accuracy(result, labels, need_confusion_matrix=True)
                accuracies.append(accuracy)
                confusionMatrices.append(cm)
                print('Test Accuracy: %.1f%%' % accuracy)
            print(' Average Accuracy:', np.average(accuracies))
            print('Standard Deviation:', np.std(accuracies))
            if confusionMatrices:
                print_confusion_matrix(np.add.reduce(confusionMatrices))

    def accuracy(self, predictions, labels, need_confusion_matrix=False):
        """
        Compare predicted class scores with one-hot labels.

        @predictions: numpy array of shape (n_samples, n_classes)
        @labels: one-hot numpy array of shape (n_samples, n_classes)
        @need_confusion_matrix: also compute the confusion matrix
        @return: accuracy and confusionMatrix as a tuple; accuracy is a
                 percentage, confusionMatrix is None unless requested
        """
        _predictions = np.argmax(predictions, 1)
        _labels = np.argmax(labels, 1)
        cm = None
        if need_confusion_matrix:
            # Fix the label set so every batch yields a matrix of the same
            # shape, even when a class is absent from that batch.
            cm = confusion_matrix(
                _labels, _predictions, labels=np.arange(predictions.shape[1])
            )
        # == is overloaded for numpy array
        accuracy = (100.0 * np.sum(_predictions == _labels) / predictions.shape[0])
        return accuracy, cm
if __name__ == '__main__':
    # Script entry point: build the network with 128 hidden nodes and a
    # training batch size of 100.
    # NOTE(review): no define_graph() / run() call is visible after this
    # line -- confirm whether the script continues beyond this point.
    net = Network(num_hidden=128, batch_size=100)