企业🤖AI智能体构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
# 实现 LSTM 模型 我们将扩展我们的 RNN 模型,以便通过在此秘籍中引入 LSTM 单元来使用更长的序列。 ## 做好准备 长短期记忆(LSTM)是传统 RNN 的变体。 LSTM 是一种解决可变长度 RNN 所具有的消失/爆炸梯度问题的方法。为了解决这个问题,LSTM 单元引入了一个内部遗忘门,它可以修改从一个单元到下一个单元的信息流。为了概念化它的工作原理,我们将逐步介绍一个无偏置的 LSTM 方程式。第一步与常规 RNN 相同: ![](https://img.kancloud.cn/c0/b6/c0b6487193d132da75126c5baa925f4b_1780x220.png) 为了确定我们想要忘记或通过的值,我们将如下评估候选值。这些值通常称为存储单元: ![](https://img.kancloud.cn/af/6c/af6c0c8c5e0abbadb68d6bf93d9c6a4f_2150x220.png) 现在我们用一个遗忘矩阵修改候选存储单元,其计算方法如下: ![](https://img.kancloud.cn/77/a0/77a0f876c8aac693a50319b71ea86ff6_1860x230.png) 我们现在将遗忘存储器与先前的存储器步骤相结合,并将其添加到候选存储器单元以获得新的存储器值: ![](https://img.kancloud.cn/02/cc/02cc376ecfad5f33c1a2846af74553a4_1640x200.png) 现在我们将所有内容组合起来以获取单元格的输出: ![](https://img.kancloud.cn/2b/93/2b93629782163bca7a0d40d2112a9b21_2570x220.png) 然后,对于下一次迭代,我们更新 h 如下: ![](https://img.kancloud.cn/f2/5e/f25e1357fd5b4e27bac9faaf9d66d57f_1510x220.png) LSTM 的想法是通过基于输入到细胞的信息可以忘记或修改的细胞具有自我调节的信息流。 > 在这里使用 TensorFlow 的一个好处是我们不必跟踪这些操作及其相应的反向传播属性。 TensorFlow 将跟踪这些并根据我们的损失函数,优化器和学习率指定的梯度自动更新模型变量。 对于这个秘籍,我们将使用具有 LSTM 细胞的序列 RNN 来尝试预测接下来的单词,对莎士比亚的作品进行训练。为了测试我们的工作方式,我们将提供模型候选短语,例如`thou art more`,并查看模型是否可以找出短语后面应该包含的单词。 ## 操作步骤 1. 首先,我们为脚本加载必要的库: ```py import os import re import string import requests import numpy as np import collections import random import pickle import matplotlib.pyplot as plt import tensorflow as tf ``` 1. 接下来,我们启动图会话并设置 RNN 参数: ```py sess = tf.Session() # Set RNN Parameters min_word_freq = 5 rnn_size = 128 epochs = 10 batch_size = 100 learning_rate = 0.001 training_seq_len = 50 embedding_size = rnn_size save_every = 500 eval_every = 50 prime_texts = ['thou art more', 'to be or not to', 'wherefore art thou'] ``` 1. 我们设置数据和模型文件夹和文件名,同时声明要删除的标点符号。我们希望保留连字符和撇号,因为莎士比亚经常使用它们来组合单词和音节: ```py data_dir = 'temp' data_file = 'shakespeare.txt' model_path = 'shakespeare_model' full_model_dir = os.path.join(data_dir, model_path) # Declare punctuation to remove, everything except hyphens and apostrophe's punctuation = string.punctuation punctuation = ''.join([x for x in punctuation if x not in ['-', "'"]]) ``` 1. 接下来,我们获取数据。如果数据文件不存在,我们下载并保存莎士比亚文本。如果确实存在,我们加载数据: ```py if not os.path.exists(full_model_dir): os.makedirs(full_model_dir) # Make data directory if not os.path.exists(data_dir): os.makedirs(data_dir) print('Loading Shakespeare Data') # Check if file is downloaded. if not os.path.isfile(os.path.join(data_dir, data_file)): print('Not found, downloading Shakespeare texts from www.gutenberg.org') shakespeare_url = 'http://www.gutenberg.org/cache/epub/100/pg100.txt' # Get Shakespeare text response = requests.get(shakespeare_url) shakespeare_file = response.content # Decode binary into string s_text = shakespeare_file.decode('utf-8') # Drop first few descriptive paragraphs. s_text = s_text[7675:] # Remove newlines s_text = s_text.replace('\r\n', '') s_text = s_text.replace('\n', '') # Write to file with open(os.path.join(data_dir, data_file), 'w') as out_conn: out_conn.write(s_text) else: # If file has been saved, load from that file with open(os.path.join(data_dir, data_file), 'r') as file_conn: s_text = file_conn.read().replace('\n', '') ``` 1. 我们通过删除标点符号和额外的空格来清理莎士比亚的文本: ```py s_text = re.sub(r'[{}]'.format(punctuation), ' ', s_text) s_text = re.sub('s+', ' ', s_text ).strip().lower() ``` 1. 我们现在处理创建要使用的莎士比亚词汇。我们创建一个函数,它将返回两个字典(单词到索引和索引到单词),其中的单词出现的频率超过指定的频率: ```py def build_vocab(text, min_word_freq): word_counts = collections.Counter(text.split(' ')) # limit word counts to those more frequent than cutoff word_counts = {key:val for key, val in word_counts.items() if val>min_word_freq} # Create vocab --> index mapping words = word_counts.keys() vocab_to_ix_dict = {key:(ix+1) for ix, key in enumerate(words)} # Add unknown key --> 0 index vocab_to_ix_dict['unknown']=0 # Create index --> vocab mapping ix_to_vocab_dict = {val:key for key,val in vocab_to_ix_dict.items()} return ix_to_vocab_dict, vocab_to_ix_dict ix2vocab, vocab2ix = build_vocab(s_text, min_word_freq) vocab_size = len(ix2vocab) + 1 ``` > 请注意,在处理文本时,我们必须小心索引值为零的单词。我们应该保存填充的零值,也可能保存未知单词。 1. 现在我们有了词汇量,我们将莎士比亚的文本变成了一系列索引: ```py s_text_words = s_text.split(' ') s_text_ix = [] for ix, x in enumerate(s_text_words): try: s_text_ix.append(vocab2ix[x]) except: s_text_ix.append(0) s_text_ix = np.array(s_text_ix) ``` 1. 在本文中,我们将展示如何在类对象中创建模型。这对我们很有帮助,因为我们希望使用相同的模型(具有相同的权重)来批量训练并从示例文本生成文本。如果没有采用内部抽样方法的课程,这将很难做到。理想情况下,此类代码应位于单独的 Python 文件中,我们可以在此脚本的开头导入该文件: ```py class LSTM_Model(): def __init__(self, rnn_size, batch_size, learning_rate, training_seq_len, vocab_size, infer =False): self.rnn_size = rnn_size self.vocab_size = vocab_size self.infer = infer self.learning_rate = learning_rate if infer: self.batch_size = 1 self.training_seq_len = 1 else: self.batch_size = batch_size self.training_seq_len = training_seq_len self.lstm_cell = tf.nn.rnn_cell.BasicLSTMCell(rnn_size) self.initial_state = self.lstm_cell.zero_state(self.batch_size, tf.float32) self.x_data = tf.placeholder(tf.int32, [self.batch_size, self.training_seq_len]) self.y_output = tf.placeholder(tf.int32, [self.batch_size, self.training_seq_len]) with tf.variable_scope('lstm_vars'): # Softmax Output Weights W = tf.get_variable('W', [self.rnn_size, self.vocab_size], tf.float32, tf.random_normal_initializer()) b = tf.get_variable('b', [self.vocab_size], tf.float32, tf.constant_initializer(0.0)) # Define Embedding embedding_mat = tf.get_variable('embedding_mat', [self.vocab_size, self.rnn_size], tf.float32, tf.random_normal_initializer()) embedding_output = tf.nn.embedding_lookup(embedding_mat, self.x_data) rnn_inputs = tf.split(embedding_output, num_or_size_splits=self.training_seq_len, axis=1) rnn_inputs_trimmed = [tf.squeeze(x, [1]) for x in rnn_inputs] # If we are inferring (generating text), we add a 'loop' function # Define how to get the i+1 th input from the i th output def inferred_loop(prev, count): prev_transformed = tf.matmul(prev, W) + b prev_symbol = tf.stop_gradient(tf.argmax(prev_transformed, 1)) output = tf.nn.embedding_lookup(embedding_mat, prev_symbol) return output decoder = tf.nn.seq2seq.rnn_decoder outputs, last_state = decoder(rnn_inputs_trimmed, self.initial_state, self.lstm_cell, loop_function=inferred_loop if infer else None) # Non inferred outputs output = tf.reshape(tf.concat(1, outputs), [-1, self.rnn_size]) # Logits and output self.logit_output = tf.matmul(output, W) + b self.model_output = tf.nn.softmax(self.logit_output) loss_fun = tf.contrib.legacy_seq2seq.sequence_loss_by_example loss = loss_fun([self.logit_output],[tf.reshape(self.y_output, [-1])], [tf.ones([self.batch_size * self.training_seq_len])], self.vocab_size) self.cost = tf.reduce_sum(loss) / (self.batch_size * self.training_seq_len) self.final_state = last_state gradients, _ = tf.clip_by_global_norm(tf.gradients(self.cost, tf.trainable_variables()), 4.5) optimizer = tf.train.AdamOptimizer(self.learning_rate) self.train_op = optimizer.apply_gradients(zip(gradients, tf.trainable_variables())) def sample(self, sess, words=ix2vocab, vocab=vocab2ix, num=10, prime_text='thou art'): state = sess.run(self.lstm_cell.zero_state(1, tf.float32)) word_list = prime_text.split() for word in word_list[:-1]: x = np.zeros((1, 1)) x[0, 0] = vocab[word] feed_dict = {self.x_data: x, self.initial_state:state} [state] = sess.run([self.final_state], feed_dict=feed_dict) out_sentence = prime_text word = word_list[-1] for n in range(num): x = np.zeros((1, 1)) x[0, 0] = vocab[word] feed_dict = {self.x_data: x, self.initial_state:state} [model_output, state] = sess.run([self.model_output, self.final_state], feed_dict=feed_dict) sample = np.argmax(model_output[0]) if sample == 0: break word = words[sample] out_sentence = out_sentence + ' ' + word return out_sentence ``` 1. 现在我们将声明 LSTM 模型以及测试模型。我们将在变量范围内执行此操作,并告诉范围我们将重用测试 LSTM 模型的变量: ```py with tf.variable_scope('lstm_model', reuse=tf.AUTO_REUSE) as scope: # Define LSTM Model lstm_model = LSTM_Model(rnn_size, batch_size, learning_rate, training_seq_len, vocab_size) scope.reuse_variables() test_lstm_model = LSTM_Model(rnn_size, batch_size, learning_rate, training_seq_len, vocab_size, infer=True) ``` 1. 我们创建一个保存操作,并将输入文本拆分为相等的批量大小的块。然后我们初始化模型的变量: ```py saver = tf.train.Saver() # Create batches for each epoch num_batches = int(len(s_text_ix)/(batch_size * training_seq_len)) + 1 # Split up text indices into subarrays, of equal size batches = np.array_split(s_text_ix, num_batches) # Reshape each split into [batch_size, training_seq_len] batches = [np.resize(x, [batch_size, training_seq_len]) for x in batches] # Initialize all variables init = tf.global_variables_initializer() sess.run(init) ``` 1. 我们现在可以遍历我们的周期,在每个周期开始之前对数据进行混洗。我们数据的目标只是相同的数据,但是移动了 1(使用`numpy.roll()`函数): ```py train_loss = [] iteration_count = 1 for epoch in range(epochs): # Shuffle word indices random.shuffle(batches) # Create targets from shuffled batches targets = [np.roll(x, -1, axis=1) for x in batches] # Run a through one epoch print('Starting Epoch #{} of {}.'.format(epoch+1, epochs)) # Reset initial LSTM state every epoch state = sess.run(lstm_model.initial_state) for ix, batch in enumerate(batches): training_dict = {lstm_model.x_data: batch, lstm_model.y_output: targets[ix]} c, h = lstm_model.initial_state training_dict[c] = state.c training_dict[h] = state.h temp_loss, state, _ = sess.run([lstm_model.cost, lstm_model.final_state, lstm_model.train_op], feed_dict=training_dict) train_loss.append(temp_loss) # Print status every 10 gens if iteration_count % 10 == 0: summary_nums = (iteration_count, epoch+1, ix+1, num_batches+1, temp_loss) print('Iteration: {}, Epoch: {}, Batch: {} out of {}, Loss: {:.2f}'.format(*summary_nums)) # Save the model and the vocab if iteration_count % save_every == 0: # Save model model_file_name = os.path.join(full_model_dir, 'model') saver.save(sess, model_file_name, global_step = iteration_count) print('Model Saved To: {}'.format(model_file_name)) # Save vocabulary dictionary_file = os.path.join(full_model_dir, 'vocab.pkl') with open(dictionary_file, 'wb') as dict_file_conn: pickle.dump([vocab2ix, ix2vocab], dict_file_conn) if iteration_count % eval_every == 0: for sample in prime_texts: print(test_lstm_model.sample(sess, ix2vocab, vocab2ix, num=10, prime_text=sample)) iteration_count += 1 ``` 1. 这导致以下输出: ```py Loading Shakespeare Data Cleaning Text Building Shakespeare Vocab Vocabulary Length = 8009 Starting Epoch #1 of 10\. Iteration: 10, Epoch: 1, Batch: 10 out of 182, Loss: 10.37 Iteration: 20, Epoch: 1, Batch: 20 out of 182, Loss: 9.54 ... Iteration: 1790, Epoch: 10, Batch: 161 out of 182, Loss: 5.68 Iteration: 1800, Epoch: 10, Batch: 171 out of 182, Loss: 6.05 thou art more than i am a to be or not to the man i have wherefore art thou art of the long Iteration: 1810, Epoch: 10, Batch: 181 out of 182, Loss: 5.99 ``` 1. 最后,以下是我们如何绘制历史上的训练损失: ```py plt.plot(train_loss, 'k-') plt.title('Sequence to Sequence Loss') plt.xlabel('Generation') plt.ylabel('Loss') plt.show() ``` This results in the following plot of our loss values: ![](https://img.kancloud.cn/ce/d8/ced87695c3298877752963275b78b8f1_393x281.png) 图 4:模型所有代的序列到序列损失 ## 工作原理 在这个例子中,我们基于莎士比亚词汇构建了一个带有 LSTM 单元的 RNN 模型来预测下一个单词。可以采取一些措施来改进模型,可能会增加序列大小,具有衰减的学习率,或者训练模型以获得更多的周期。 ## 更多 为了抽样,我们实现了一个贪婪的采样器。贪婪的采样器可能会一遍又一遍地重复相同的短语;例如,他们可能会卡住`for the for the` `for the....`为了防止这种情况,我们还可以实现一种更随机的采样方式,可能是根据输出的对数或概率分布制作加权采样器。