
TF Girls: TensorBoard Visualization (10-11)

Data download: http://ufldl.stanford.edu/housenumbers/
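The two scripts below expect the downloaded train_32x32.mat and test_32x32.mat files to sit under ../data/. A quick sanity check of the download (a minimal sketch of my own, assuming only scipy is installed) looks like this:

from scipy.io import loadmat

train = loadmat('../data/train_32x32.mat')
print(train['X'].shape, train['y'].shape)  # expect (32, 32, 3, 73257) and (73257, 1)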

load.py

# encoding:utf-8

# Python 2 compatibility

from __future__ import print_function, division

from scipy.io import loadmat as load

import matplotlib.pyplot as plt

import numpy as np



def reformat(samples, labels):

	# Reshape the raw data: transpose axes (0, 1, 2, 3) -> (3, 0, 1, 2)

	# (height, width, channels, image_count) -> (image_count, height, width, channels)

	new = np.transpose(samples, (3, 0, 1, 2)).astype(np.float32)



	# Convert labels to one-hot encoding, e.g. [2] -> [0, 0, 1, 0, 0, 0, 0, 0, 0, 0]

	# in SVHN, digit 0 is stored as label 10,

	# so [10] -> [1, 0, 0, 0, 0, 0, 0, 0, 0, 0]

	labels = np.array([x[0] for x in labels])	# flatten (N, 1) -> (N,); slow code, but fine here

	one_hot_labels = []

	for num in labels:

		one_hot = [0.0] * 10

		if num == 10:

			one_hot[0] = 1.0

		else:

			one_hot[num] = 1.0

		one_hot_labels.append(one_hot)

	labels = np.array(one_hot_labels).astype(np.float32)

	return new, labels



def normalize(samples):

	'''

	Convert to grayscale: three color channels -> one channel (saves memory and speeds up training)

	(R + G + B) / 3

	Linearly map pixel values from 0 ~ 255 to -1.0 ~ +1.0

	@samples: numpy array

	'''

	a = np.add.reduce(samples, keepdims=True, axis=3)  # sum over the channel axis; shape becomes (image_count, height, width, 1)

	a = a/3.0

	return a/128.0 - 1.0





def distribution(labels, name):

	# Check the distribution of each label and draw a bar chart

	# keys:

	# 0

	# 1

	# 2

	# ...

	# 9

	count = {}

	for label in labels:

		key = 0 if label[0] == 10 else label[0]

		if key in count:

			count[key] += 1

		else:

			count[key] = 1

	x = []

	y = []

	for k, v in count.items():

		# print(k, v)

		x.append(k)

		y.append(v)



	y_pos = np.arange(len(x))

	plt.bar(y_pos, y, align='center', alpha=0.5)

	plt.xticks(y_pos, x)

	plt.ylabel('Count')

	plt.title(name + ' Label Distribution')

	plt.show()



def inspect(dataset, labels, i):

	# Display one image to have a look

	if dataset.shape[3] == 1:

		shape = dataset.shape

		dataset = dataset.reshape(shape[0], shape[1], shape[2])

	print(labels[i])

	plt.imshow(dataset[i])

	plt.show()





train = load('../data/train_32x32.mat')

test = load('../data/test_32x32.mat')

# extra = load('../data/extra_32x32.mat')



# print('Train Samples Shape:', train['X'].shape)

# print('Train  Labels Shape:', train['y'].shape)



# print(' Test Samples Shape:', test['X'].shape)

# print(' Test  Labels Shape:', test['y'].shape)



# print('Extra Samples Shape:', extra['X'].shape)

# print('Extra  Labels Shape:', extra['y'].shape)



train_samples = train['X']

train_labels = train['y']

test_samples = test['X']

test_labels = test['y']

# extra_samples = extra['X']

# extra_labels = extra['y']



n_train_samples, _train_labels = reformat(train_samples, train_labels)

n_test_samples, _test_labels = reformat(test_samples, test_labels)



_train_samples = normalize(n_train_samples)

_test_samples = normalize(n_test_samples)



num_labels = 10

image_size = 32

num_channels = 1



if __name__ == '__main__':

	# Explore the data

	pass

	# inspect(_train_samples, _train_labels, 1234)

	# _train_samples = normalize(_train_samples)

	# inspect(_train_samples, _train_labels, 1234)

	# distribution(train_labels, 'Train Labels')

	# distribution(test_labels, 'Test Labels')
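
As a side note on the one-hot loop in reformat() above (which is intentionally simple), a vectorized sketch of my own that produces the same encoding could look like this, given raw labels of shape (N, 1) where 10 stands for digit 0:

import numpy as np

def one_hot_encode(raw_labels):
	idx = raw_labels.flatten() % 10           # 10 -> 0, digits 1..9 unchanged
	return np.eye(10, dtype=np.float32)[idx]  # pick the matching one-hot row for each label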
           

dp.py

# For the Python 2 users

from __future__ import print_function, division



# Third-party libraries

import tensorflow as tf

from sklearn.metrics import confusion_matrix

import numpy as np



# Our own module

import load



train_samples, train_labels = load._train_samples, load._train_labels

test_samples, test_labels = load._test_samples,  load._test_labels



print('Training set', train_samples.shape, train_labels.shape)

print('    Test set', test_samples.shape, test_labels.shape)



image_size = load.image_size

num_labels = load.num_labels

num_channels = load.num_channels



def get_chunk(samples, labels, chunkSize):

	'''

	Iterator/Generator: get a batch of data

	This function is an iterator/generator that yields only chunkSize samples at a time

	(the final partial chunk is dropped); use it in a for loop, just like the range() function

	'''

	if len(samples) != len(labels):

		raise Exception('Length of samples and labels must equal')

	stepStart = 0	# initial step

	i = 0

	while stepStart < len(samples):

		stepEnd = stepStart + chunkSize

		if stepEnd < len(samples):

			yield i, samples[stepStart:stepEnd], labels[stepStart:stepEnd]

			i += 1

		stepStart = stepEnd





class Network():

	def __init__(self, num_hidden, batch_size):

		'''

		@num_hidden: number of nodes in the hidden layer

		@batch_size: we process the data in batches to save memory; this is the number of samples per batch

		'''

		self.batch_size = batch_size

		self.test_batch_size = 500



		# Hyperparameters

		self.num_hidden = num_hidden



		# Graph Related

		self.graph = tf.Graph()

		self.tf_train_samples = None

		self.tf_train_labels = None

		self.tf_test_samples = None

		self.tf_test_labels = None

		self.tf_test_prediction = None



		# Statistics (TensorBoard summaries)

		self.merged = None



		# Initialization

		self.define_graph()

		self.session = tf.Session(graph=self.graph)

		self.writer = tf.train.SummaryWriter('./board', self.graph)



	def define_graph(self):

		'''

		Define our computation graph

		'''

		with self.graph.as_default():

			# Here we only define the variables used in the graph

			with tf.name_scope('inputs'):

				self.tf_train_samples = tf.placeholder(

					tf.float32, shape=(self.batch_size, image_size, image_size, num_channels), name='tf_train_samples'

				)

				self.tf_train_labels  = tf.placeholder(

					tf.float32, shape=(self.batch_size, num_labels), name='tf_train_labels'

				)

				self.tf_test_samples  = tf.placeholder(

					tf.float32, shape=(self.test_batch_size, image_size, image_size, num_channels), name='tf_test_samples'

				)



			# fully connected layer 1

			with tf.name_scope('fc1'):

				fc1_weights = tf.Variable(

					tf.truncated_normal([image_size * image_size, self.num_hidden], stddev=0.1), name='fc1_weights'

				)

				fc1_biases = tf.Variable(tf.constant(0.1, shape=[self.num_hidden]), name='fc1_biases')

				tf.histogram_summary('fc1_weights', fc1_weights)

				tf.histogram_summary('fc1_biases', fc1_biases)



			# fully connected layer 2 --> output layer

			with tf.name_scope('fc2'):

				fc2_weights = tf.Variable(

					tf.truncated_normal([self.num_hidden, num_labels], stddev=0.1), name='fc2_weights'

				)

				fc2_biases = tf.Variable(tf.constant(0.1, shape=[num_labels]), name='fc2_biases')

				tf.histogram_summary('fc2_weights', fc2_weights)

				tf.histogram_summary('fc2_biases', fc2_biases)





			# Now define the operations of the graph

			def model(data):

				# fully connected layer 1

				shape = data.get_shape().as_list()

				reshape = tf.reshape(data, [shape[0], shape[1] * shape[2] * shape[3]])
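				# e.g. (batch_size, 32, 32, 1) -> (batch_size, 1024) when image_size=32 and num_channels=1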



				with tf.name_scope('fc1_model'):

					fc1_model = tf.matmul(reshape, fc1_weights) + fc1_biases

					hidden = tf.nn.relu(fc1_model)



				# fully connected layer 2

				with tf.name_scope('fc2_model'):

					return tf.matmul(hidden, fc2_weights) + fc2_biases



			# Training computation.

			logits = model(self.tf_train_samples)

			with tf.name_scope('loss'):

				self.loss = tf.reduce_mean(

					tf.nn.softmax_cross_entropy_with_logits(logits, self.tf_train_labels)

				)

				tf.scalar_summary('Loss', self.loss)





			# Optimizer.

			with tf.name_scope('optimizer'):

				self.optimizer = tf.train.GradientDescentOptimizer(0.0001).minimize(self.loss)



			# Predictions for the training, validation, and test data.

			with tf.name_scope('predictions'):

				self.train_prediction = tf.nn.softmax(logits, name='train_prediction')

				self.test_prediction = tf.nn.softmax(model(self.tf_test_samples), name='test_prediction')



			self.merged = tf.merge_all_summaries()



	def run(self):

		'''

		Uses the Session: runs training, then evaluation on the test set

		'''

		# private function

		def print_confusion_matrix(confusionMatrix):

			print('Confusion    Matrix:')

			for i, line in enumerate(confusionMatrix):

				print(line, line[i]/np.sum(line))	# per-class recall: diagonal / row sum (row = true label)

			a = 0

			for i, column in enumerate(np.transpose(confusionMatrix, (1, 0))):

				# column[i]/np.sum(column) is the per-class precision (column = predicted label);
				# weighting it by np.sum(column)/26000 makes `a` the overall accuracy
				# (26000 = 52 test batches of 500; get_chunk drops the final partial batch)
				a += (column[i]/np.sum(column))*(np.sum(column)/26000)

				print(column[i]/np.sum(column),)

			print('\n', np.sum(confusionMatrix), a)





		with self.session as session:

			tf.initialize_all_variables().run()



			### Training

			print('Start Training')

			# mini-batch training, batch_size samples per step

			for i, samples, labels in get_chunk(train_samples, train_labels, chunkSize=self.batch_size):

				_, l, predictions, summary = session.run(

					[self.optimizer, self.loss, self.train_prediction, self.merged],

					feed_dict={self.tf_train_samples: samples, self.tf_train_labels: labels}

				)

				self.writer.add_summary(summary, i)

				# labels here are the ground-truth labels for this batch

				accuracy, _ = self.accuracy(predictions, labels)

				if i % 50 == 0:

					print('Minibatch loss at step %d: %f' % (i, l))

					print('Minibatch accuracy: %.1f%%' % accuracy)

			###



			### Testing

			accuracies = []

			confusionMatrices = []

			for i, samples, labels in get_chunk(test_samples, test_labels, chunkSize=self.test_batch_size):

				result = self.test_prediction.eval(feed_dict={self.tf_test_samples: samples})

				accuracy, cm = self.accuracy(result, labels, need_confusion_matrix=True)

				accuracies.append(accuracy)

				confusionMatrices.append(cm)

				print('Test Accuracy: %.1f%%' % accuracy)

			print(' Average  Accuracy:', np.average(accuracies))

			print('Standard Deviation:', np.std(accuracies))

			print_confusion_matrix(np.add.reduce(confusionMatrices))

			###



	def accuracy(self, predictions, labels, need_confusion_matrix=False):

		'''

		Compute the prediction accuracy; optionally also return the confusion matrix (used for per-class recall/precision)

		@return: accuracy and confusionMatrix as a tuple

		'''

		_predictions = np.argmax(predictions, 1)

		_labels = np.argmax(labels, 1)

		cm = confusion_matrix(_labels, _predictions) if need_confusion_matrix else None

		# == is overloaded for numpy array

		accuracy = (100.0 * np.sum(_predictions == _labels) / predictions.shape[0])

		return accuracy, cm





if __name__ == '__main__':

	net = Network(num_hidden=128, batch_size=100)

	net.run()
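
Once net.run() finishes, the summary files written to ./board can be opened in TensorBoard (installed together with TensorFlow). Start it from the project directory and open the address it prints, normally http://localhost:6006:

tensorboard --logdir=./board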
           
