有关于自编码器的原理,请参考博客http://blog.csdn.net/xukaiwen_2016/article/details/70767518;对于对其与原理熟悉的可以直接看下面代码。
首先是使用到的相关库,数学运算相关操作库Numpy和对数据进行预处理的模块Scikit-lean中的preprocessing,使用TensorFlow的MNIST作为数据集。
import numpy as np import sklearn.preprocessing as prep import tensorflow as tf from tensorflow.examples.tutorials.mnist import input_data
我们知道自编码器最重要的就是求出输入层到数据量较少的隐含层之间的映射矩阵,而这个矩阵是需要初始化的,对于我们的深度学习模型来说,这个矩阵里的初始化数据要尽可能的大小合适,均匀或高斯分布。xavier initialization参数初始化方法正好合适,也经常使用。
def xavier_init(fan_in, fan_out, constant = 1): low = -constant * np.sqrt(6.0 / (fan_in + fan_out)) high = constant * np.sqrt(6.0 / (fan_in + fan_out)) return tf.random_uniform((fan_in, fan_out),minval = low, maxval = high,dtype = tf.float32)
接下来定义一个自编码器的class方便我们重复使用,其中包含_init_()构造函数以及多个成员函数。
首先是_init_()构造函数,参数:n_input(输入变量数),n_hidden(隐含层变量数),transfer_function(隐含层激活函数,默认softplus),optimizer(优化器,默认为Adam),scale(高斯噪声系数,默认0.1)。_initialize_weights()在之后定义,完成映射矩阵以及偏置向量。
def __init__(self, n_input, n_hidden, transfer_function = tf.nn.softplus, optimizer = tf.train.AdamOptimizer(),scale = 0.1): self.n_input = n_input self.n_hidden = n_hidden self.transfer = transfer_function self.scale = tf.placeholder(tf.float32) #定义成一个placeholder self.training_scale = scale network_weights = self._initialize_weights() self.weights = network_weights # 定义模型,也就是输入层,隐含层,输出层以及之间的映射矩阵 self.x = tf.placeholder(tf.float32, [None, self.n_input]) self.hidden = self.transfer(tf.add(tf.matmul(self.x + scale * tf.random_normal((n_input,)), self.weights[‘w1‘]), self.weights[‘b1‘])) self.reconstruction = tf.add(tf.matmul(self.hidden, self.weights[‘w2‘]), self.weights[‘b2‘]) # 定义损失函数,这里我们使用平方差,因为下面的激活函数选择的是恒等 self.cost = 0.5 * tf.reduce_sum(tf.pow(tf.subtract(self.reconstruction, self.x), 2.0)) self.optimizer = optimizer.minimize(self.cost)#优化器为求损失极小化 init = tf.global_variables_initializer() self.sess = tf.Session() self.sess.run(init)
_initialize_weights()完成映射矩阵以及偏置向量,里面调用了之前的xavier_init函数。
def _initialize_weights(self): # 字典类型 all_weights = dict() # 输入层到隐含层矩阵 all_weights[‘w1‘] = tf.Variable(xavier_init(self.n_input, self.n_hidden))# # 输入层到隐含层偏置向量 all_weights[‘b1‘] = tf.Variable(tf.zeros([self.n_hidden], dtype = tf.float32)) # 隐含层到输出层矩阵,可以看出是w1的逆 all_weights[‘w2‘] = tf.Variable(tf.zeros([self.n_hidden, self.n_input], dtype = tf.float32)) # 隐含层到输出层偏置向量 all_weights[‘b2‘] = tf.Variable(tf.zeros([self.n_input], dtype = tf.float32)) return all_weights
partial_fit()使用batch进行训练的函数,训练时使用六cost和optimizer,feed_dict喂数据,包括输入数据和高斯噪声系数。
def partial_fit(self, X): cost, opt = self.sess.run((self.cost, self.optimizer),feed_dict = {self.x: X,self.scale: self.training_scale}) return cost
calc_total_cost()为只求损失不进行损失极小化操作,只执行cost
def calc_total_cost(self, X): return self.sess.run(self.cost, feed_dict = {self.x: X,self.scale: self.training_scale})
transform()根据输入获取其隐含层数据。
def transform(self, X): return self.sess.run(self.hidden, feed_dict = {self.x: X,self.scale: self.training_scale})
generate()根据隐含层数据获取输出层数据。
def generate(self, hidden = None): if hidden is None:hidden = np.random.normal(size = self.weights["b1"]) return self.sess.run(self.reconstruction, feed_dict = {self.hidden: hidden})
reconstruct()根据输入层数据获取输出层数据,相当于transform()+generate()。
def reconstruct(self, X): return self.sess.run(self.reconstruction, feed_dict = {self.x: X,self.scale: self.training_scale})
接下来的就是获取映射矩阵和偏置向量。
def getWeights(self): return self.sess.run(self.weights[‘w1‘]) def getBiases(self): return self.sess.run(self.weights[‘b1‘])
以上就是自编码器class的完整定义。
读取数据集
mnist = input_data.read_data_sets(‘MNIST_data‘, one_hot = True)
standard_scale()对输入的图片数据和测试数据进行标准化,也就是让图片的像素值映射到0-1空间,这个时候我们需要使用sklearn.preprocessing中的StandardScaler。
def standard_scale(X_train, X_test): preprocessor = prep.StandardScaler().fit(X_train) X_train = preprocessor.transform(X_train) X_test = preprocessor.transform(X_test) return X_train, X_test
get_random_block_from_data()随机获取若干图片,我们不是把MNIST中所有数据进行训练。
def get_random_block_from_data(data, batch_size): start_index = np.random.randint(0, len(data) - batch_size) return data[start_index:(start_index + batch_size)]
最后就是进行训练了,下面的代码应该很熟悉了,也很简单,不再多说,直接贴出剩下的全部代码:
X_train, X_test = standard_scale(mnist.train.images, mnist.test.images) n_samples = int(mnist.train.num_examples) training_epochs = 20 batch_size = 128 display_step = 1 autoencoder = AdditiveGaussianNoiseAutoencoder(n_input = 784, n_hidden = 200, transfer_function = tf.nn.softplus, optimizer = tf.train.AdamOptimizer(learning_rate = 0.001), scale = 0.01) for epoch in range(training_epochs): avg_cost = 0. total_batch = int(n_samples / batch_size) # 循环载入训练数据 for i in range(total_batch): batch_xs = get_random_block_from_data(X_train, batch_size) # 使用batch喂入训练数据 cost = autoencoder.partial_fit(batch_xs) # Compute average loss avg_cost += cost / n_samples * batch_size # 展示损失 if epoch % display_step == 0: print("Epoch:", ‘%04d‘ % (epoch + 1), "cost=", "{:.9f}".format(avg_cost)) print("Total cost: " + str(autoencoder.calc_total_cost(X_test)))