MENU

Seq2Seq 的 PyTorch 实现

June 30, 2020 • Read: 27388 • Deep Learning阅读设置

B 站视频讲解

本文介绍一下如何使用 PyTorch 复现 Seq2Seq,实现简单的机器翻译应用,请先简单阅读论文 Learning Phrase Representations using RNN Encoder–Decoder for Statistical Machine Translation(2014),了解清楚 Seq2Seq 结构是什么样的,之后再阅读本篇文章,可达到事半功倍的效果

我看了很多 Seq2Seq 网络结构图,感觉 PyTorch 官方提供的这个图是最好理解的

首先,从上面的图可以很明显的看出,Seq2Seq 需要对三个变量进行操作,这和之前我接触到的所有网络结构都不一样。我们把 Encoder 的输入称为 enc_input,Decoder 的输入称为 dec_input, Decoder 的输出称为 dec_output。下面以一个具体的例子来说明整个 Seq2Seq 的工作流程

下图是一个由 LSTM 组成的 Encoder 结构,输入的是 "go away" 中的每个字母(包括空格),我们只需要最后一个时刻隐藏状态的信息,即 $h_t$ 和 $c_t$

然后将 Encoder 输出的 $h_t$ 和 $c_t$ 作为 Decoder 初始时刻隐藏状态的输入 $h_0$、$c_0$,如下图所示。同时 Decoder 初始时刻输入层输入的是代表一个句子开始的标志(由用户定义,"<SOS>","\t","S" 等均可,这里以 "\t" 为例),之后得到输出 "m",以及新的隐藏状态 $h_1$ 和 $c_1$

再将 $h_1$、$c_1$ 和 "m" 作为输入,得到输入 "a",以及新的隐藏状态 $h_2$ 和 $c_2$

重复上述步骤,直到最终输出句子的结束标志(由用户定义,"<EOS>","\n","E" 等均可,这里以 "\n" 为例)

在 Decoder 部分,大家可能会有以下几个问题,我做下解答

  • 训练过程中,如果 Decoder 停不下来怎么办?即一直不输出句子的终止标志

    • 首先,训练过程中 Decoder 应该要输出多长的句子,这个是已知的,假设当前时刻已经到了句子长度的最后一个字符了,并且预测的不是终止标志,那也没有关系,就此打住,计算 loss 即可
  • 测试过程中,如果 Decoder 停不下来怎么办?例如预测得到 "wasd s w \n sdsw \n..........(一直输出下去)"

    • 不会停不下来的,因为测试过程中,Decoder 也会有输入,只不过这个输入是很多个没有意义的占位符,例如很多个 "<pad>"。由于 Decoder 有有限长度的输入,所以 Decoder 一定会有有限长度的输出。那么只需要获取第一个终止标志之前的所有字符即可,对于上面的例子,最终的预测结果为 "wasd s w"
  • Decoder 的输入和输出,即 dec_inputdec_output 有什么关系?

    • 在训练阶段,不论当前时刻 Decoder 输出什么字符,下一时刻 Decoder 都按照原来的 "计划" 进行输入。举个例子,假设 dec_input="\twasted",首先输入 "\t" 之后,Decoder 输出的是 "m" 这个字母,记录下来就行了,并不会影响到下一时刻 Decoder 继续输入 "w" 这个字母
    • 在验证或者测试阶段,Decoder 每一时刻的输出是会影响到输入的,因为在验证或者测试时,网络是看不到结果的,所以它只能循环的进行下去。举个例子,我现在要将英语 "wasted" 翻译为德语 "verschwenden"。那么 Decoder 一开始输入 "\t",得到一个输出,假如是 "m",下一时刻 Decoder 会输入 "m",得到输出,假如是 "a",之后会将 "a" 作为输入,得到输出...... 如此循环往复,直到最终时刻

这里说句题外话,其实我个人觉得 Seq2Seq 与 AutoEncoder 非常相似

下面开始代码讲解

首先导库,这里我用 'S' 作为开始标志,'E' 作为结束标志,如果输入或者输入过短,我使用 '?' 进行填充

  • # code by Tae Hwan Jung(Jeff Jung) @graykode, modify by wmathor
  • import torch
  • import numpy as np
  • import torch.nn as nn
  • import torch.utils.data as Data
  • device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
  • # S: Symbol that shows starting of decoding input
  • # E: Symbol that shows starting of decoding output
  • # ?: Symbol that will fill in blank sequence if current batch data size is short than n_step

定义数据集以及参数,这里数据集我设定的非常简单,可以看作是翻译任务,只不过是将英语翻译成英语罢了。

n_step 保存的是最长单词的长度,其它所有不够这个长度的单词,都会在其后用 '?' 填充

  • letter = [c for c in 'SE?abcdefghijklmnopqrstuvwxyz']
  • letter2idx = {n: i for i, n in enumerate(letter)}
  • seq_data = [['man', 'women'], ['black', 'white'], ['king', 'queen'], ['girl', 'boy'], ['up', 'down'], ['high', 'low']]
  • # Seq2Seq Parameter
  • n_step = max([max(len(i), len(j)) for i, j in seq_data]) # max_len(=5)
  • n_hidden = 128
  • n_class = len(letter2idx) # classfication problem
  • batch_size = 3

下面是对数据进行处理,主要做的是,首先对单词长度不够的,用 '?' 进行填充;然后将 Deocder 的输入数据末尾添加终止标志 'E',Decoder 的输入数据开头添加开始标志 'S',Decoder 的输出数据末尾添加结束标志 'E',其实也就如下图所示

  • def make_data(seq_data):
  • enc_input_all, dec_input_all, dec_output_all = [], [], []
  • for seq in seq_data:
  • for i in range(2):
  • seq[i] = seq[i] + '?' * (n_step - len(seq[i])) # 'man??', 'women'
  • enc_input = [letter2idx[n] for n in (seq[0] + 'E')] # ['m', 'a', 'n', '?', '?', 'E']
  • dec_input = [letter2idx[n] for n in ('S' + seq[1])] # ['S', 'w', 'o', 'm', 'e', 'n']
  • dec_output = [letter2idx[n] for n in (seq[1] + 'E')] # ['w', 'o', 'm', 'e', 'n', 'E']
  • enc_input_all.append(np.eye(n_class)[enc_input])
  • dec_input_all.append(np.eye(n_class)[dec_input])
  • dec_output_all.append(dec_output) # not one-hot
  • # make tensor
  • return torch.Tensor(enc_input_all), torch.Tensor(dec_input_all), torch.LongTensor(dec_output_all)
  • '''
  • enc_input_all: [6, n_step+1 (because of 'E'), n_class]
  • dec_input_all: [6, n_step+1 (because of 'S'), n_class]
  • dec_output_all: [6, n_step+1 (because of 'E')]
  • '''
  • enc_input_all, dec_input_all, dec_output_all = make_data(seq_data)

由于这里有三个数据要返回,所以需要自定义 DataSet,具体来说就是继承 torch.utils.data.Dataset 类,然后实现里面的__len__以及__getitem__方法

  • class TranslateDataSet(Data.Dataset):
  • def __init__(self, enc_input_all, dec_input_all, dec_output_all):
  • self.enc_input_all = enc_input_all
  • self.dec_input_all = dec_input_all
  • self.dec_output_all = dec_output_all
  • def __len__(self): # return dataset size
  • return len(self.enc_input_all)
  • def __getitem__(self, idx):
  • return self.enc_input_all[idx], self.dec_input_all[idx], self.dec_output_all[idx]
  • loader = Data.DataLoader(TranslateDataSet(enc_input_all, dec_input_all, dec_output_all), batch_size, True)

下面定义 Seq2Seq 模型,我用的是简单的 RNN 作为编码器和解码器。如果你对 RNN 比较了解的话,定义网络结构的部分其实没什么说的,注释我也写的很清楚了,包括数据维度的变化

  • # Model
  • class Seq2Seq(nn.Module):
  • def __init__(self):
  • super(Seq2Seq, self).__init__()
  • self.encoder = nn.RNN(input_size=n_class, hidden_size=n_hidden, dropout=0.5) # encoder
  • self.decoder = nn.RNN(input_size=n_class, hidden_size=n_hidden, dropout=0.5) # decoder
  • self.fc = nn.Linear(n_hidden, n_class)
  • def forward(self, enc_input, enc_hidden, dec_input):
  • # enc_input(=input_batch): [batch_size, n_step+1, n_class]
  • # dec_inpu(=output_batch): [batch_size, n_step+1, n_class]
  • enc_input = enc_input.transpose(0, 1) # enc_input: [n_step+1, batch_size, n_class]
  • dec_input = dec_input.transpose(0, 1) # dec_input: [n_step+1, batch_size, n_class]
  • # h_t : [num_layers(=1) * num_directions(=1), batch_size, n_hidden]
  • _, h_t = self.encoder(enc_input, enc_hidden)
  • # outputs : [n_step+1, batch_size, num_directions(=1) * n_hidden(=128)]
  • outputs, _ = self.decoder(dec_input, h_t)
  • model = self.fc(outputs) # model : [n_step+1, batch_size, n_class]
  • return model
  • model = Seq2Seq().to(device)
  • criterion = nn.CrossEntropyLoss().to(device)
  • optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

下面是训练,由于输出的 pred 是个三维的数据,所以计算 loss 需要每个样本单独计算,因此就有了下面 for 循环的代码

  • for epoch in range(5000):
  • for enc_input_batch, dec_input_batch, dec_output_batch in loader:
  • # make hidden shape [num_layers * num_directions, batch_size, n_hidden]
  • h_0 = torch.zeros(1, batch_size, n_hidden).to(device)
  • (enc_input_batch, dec_intput_batch, dec_output_batch) = (enc_input_batch.to(device), dec_input_batch.to(device), dec_output_batch.to(device))
  • # enc_input_batch : [batch_size, n_step+1, n_class]
  • # dec_intput_batch : [batch_size, n_step+1, n_class]
  • # dec_output_batch : [batch_size, n_step+1], not one-hot
  • pred = model(enc_input_batch, h_0, dec_intput_batch)
  • # pred : [n_step+1, batch_size, n_class]
  • pred = pred.transpose(0, 1) # [batch_size, n_step+1(=6), n_class]
  • loss = 0
  • for i in range(len(dec_output_batch)):
  • # pred[i] : [n_step+1, n_class]
  • # dec_output_batch[i] : [n_step+1]
  • loss += criterion(pred[i], dec_output_batch[i])
  • if (epoch + 1) % 1000 == 0:
  • print('Epoch:', '%04d' % (epoch + 1), 'cost =', '{:.6f}'.format(loss))
  • optimizer.zero_grad()
  • loss.backward()
  • optimizer.step()

从下面测试的代码可以看出,在测试过程中,Decoder 的 input 是没有意义占位符,所占位置的长度即最大长度 n_step 。并且在输出中找到第一个终止符的位置,截取在此之前的所有字符

  • # Test
  • def translate(word):
  • enc_input, dec_input, _ = make_data([[word, '?' * n_step]])
  • enc_input, dec_input = enc_input.to(device), dec_input.to(device)
  • # make hidden shape [num_layers * num_directions, batch_size, n_hidden]
  • hidden = torch.zeros(1, 1, n_hidden).to(device)
  • output = model(enc_input, hidden, dec_input)
  • # output : [n_step+1, batch_size, n_class]
  • predict = output.data.max(2, keepdim=True)[1] # select n_class dimension
  • decoded = [letter[i] for i in predict]
  • translated = ''.join(decoded[:decoded.index('E')])
  • return translated.replace('?', '')
  • print('test')
  • print('man ->', translate('man'))
  • print('mans ->', translate('mans'))
  • print('king ->', translate('king'))
  • print('black ->', translate('black'))
  • print('up ->', translate('up'))

完整代码如下

  • # code by Tae Hwan Jung(Jeff Jung) @graykode, modify by wmathor
  • import torch
  • import numpy as np
  • import torch.nn as nn
  • import torch.utils.data as Data
  • device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
  • # S: Symbol that shows starting of decoding input
  • # E: Symbol that shows starting of decoding output
  • # ?: Symbol that will fill in blank sequence if current batch data size is short than n_step
  • letter = [c for c in 'SE?abcdefghijklmnopqrstuvwxyz']
  • letter2idx = {n: i for i, n in enumerate(letter)}
  • seq_data = [['man', 'women'], ['black', 'white'], ['king', 'queen'], ['girl', 'boy'], ['up', 'down'], ['high', 'low']]
  • # Seq2Seq Parameter
  • n_step = max([max(len(i), len(j)) for i, j in seq_data]) # max_len(=5)
  • n_hidden = 128
  • n_class = len(letter2idx) # classfication problem
  • batch_size = 3
  • def make_data(seq_data):
  • enc_input_all, dec_input_all, dec_output_all = [], [], []
  • for seq in seq_data:
  • for i in range(2):
  • seq[i] = seq[i] + '?' * (n_step - len(seq[i])) # 'man??', 'women'
  • enc_input = [letter2idx[n] for n in (seq[0] + 'E')] # ['m', 'a', 'n', '?', '?', 'E']
  • dec_input = [letter2idx[n] for n in ('S' + seq[1])] # ['S', 'w', 'o', 'm', 'e', 'n']
  • dec_output = [letter2idx[n] for n in (seq[1] + 'E')] # ['w', 'o', 'm', 'e', 'n', 'E']
  • enc_input_all.append(np.eye(n_class)[enc_input])
  • dec_input_all.append(np.eye(n_class)[dec_input])
  • dec_output_all.append(dec_output) # not one-hot
  • # make tensor
  • return torch.Tensor(enc_input_all), torch.Tensor(dec_input_all), torch.LongTensor(dec_output_all)
  • '''
  • enc_input_all: [6, n_step+1 (because of 'E'), n_class]
  • dec_input_all: [6, n_step+1 (because of 'S'), n_class]
  • dec_output_all: [6, n_step+1 (because of 'E')]
  • '''
  • enc_input_all, dec_input_all, dec_output_all = make_data(seq_data)
  • class TranslateDataSet(Data.Dataset):
  • def __init__(self, enc_input_all, dec_input_all, dec_output_all):
  • self.enc_input_all = enc_input_all
  • self.dec_input_all = dec_input_all
  • self.dec_output_all = dec_output_all
  • def __len__(self): # return dataset size
  • return len(self.enc_input_all)
  • def __getitem__(self, idx):
  • return self.enc_input_all[idx], self.dec_input_all[idx], self.dec_output_all[idx]
  • loader = Data.DataLoader(TranslateDataSet(enc_input_all, dec_input_all, dec_output_all), batch_size, True)
  • # Model
  • class Seq2Seq(nn.Module):
  • def __init__(self):
  • super(Seq2Seq, self).__init__()
  • self.encoder = nn.RNN(input_size=n_class, hidden_size=n_hidden, dropout=0.5) # encoder
  • self.decoder = nn.RNN(input_size=n_class, hidden_size=n_hidden, dropout=0.5) # decoder
  • self.fc = nn.Linear(n_hidden, n_class)
  • def forward(self, enc_input, enc_hidden, dec_input):
  • # enc_input(=input_batch): [batch_size, n_step+1, n_class]
  • # dec_inpu(=output_batch): [batch_size, n_step+1, n_class]
  • enc_input = enc_input.transpose(0, 1) # enc_input: [n_step+1, batch_size, n_class]
  • dec_input = dec_input.transpose(0, 1) # dec_input: [n_step+1, batch_size, n_class]
  • # h_t : [num_layers(=1) * num_directions(=1), batch_size, n_hidden]
  • _, h_t = self.encoder(enc_input, enc_hidden)
  • # outputs : [n_step+1, batch_size, num_directions(=1) * n_hidden(=128)]
  • outputs, _ = self.decoder(dec_input, h_t)
  • model = self.fc(outputs) # model : [n_step+1, batch_size, n_class]
  • return model
  • model = Seq2Seq().to(device)
  • criterion = nn.CrossEntropyLoss().to(device)
  • optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
  • for epoch in range(5000):
  • for enc_input_batch, dec_input_batch, dec_output_batch in loader:
  • # make hidden shape [num_layers * num_directions, batch_size, n_hidden]
  • h_0 = torch.zeros(1, batch_size, n_hidden).to(device)
  • (enc_input_batch, dec_intput_batch, dec_output_batch) = (enc_input_batch.to(device), dec_input_batch.to(device), dec_output_batch.to(device))
  • # enc_input_batch : [batch_size, n_step+1, n_class]
  • # dec_intput_batch : [batch_size, n_step+1, n_class]
  • # dec_output_batch : [batch_size, n_step+1], not one-hot
  • pred = model(enc_input_batch, h_0, dec_intput_batch)
  • # pred : [n_step+1, batch_size, n_class]
  • pred = pred.transpose(0, 1) # [batch_size, n_step+1(=6), n_class]
  • loss = 0
  • for i in range(len(dec_output_batch)):
  • # pred[i] : [n_step+1, n_class]
  • # dec_output_batch[i] : [n_step+1]
  • loss += criterion(pred[i], dec_output_batch[i])
  • if (epoch + 1) % 1000 == 0:
  • print('Epoch:', '%04d' % (epoch + 1), 'cost =', '{:.6f}'.format(loss))
  • optimizer.zero_grad()
  • loss.backward()
  • optimizer.step()
  • # Test
  • def translate(word):
  • enc_input, dec_input, _ = make_data([[word, '?' * n_step]])
  • enc_input, dec_input = enc_input.to(device), dec_input.to(device)
  • # make hidden shape [num_layers * num_directions, batch_size, n_hidden]
  • hidden = torch.zeros(1, 1, n_hidden).to(device)
  • output = model(enc_input, hidden, dec_input)
  • # output : [n_step+1, batch_size, n_class]
  • predict = output.data.max(2, keepdim=True)[1] # select n_class dimension
  • decoded = [letter[i] for i in predict]
  • translated = ''.join(decoded[:decoded.index('E')])
  • return translated.replace('?', '')
  • print('test')
  • print('man ->', translate('man'))
  • print('mans ->', translate('mans'))
  • print('king ->', translate('king'))
  • print('black ->', translate('black'))
  • print('up ->', translate('up'))
Last Modified: April 29, 2021
Archives Tip
QR Code for this page
Tipping QR Code
Leave a Comment

25 Comments
  1. Wang Siwen Wang Siwen

    您好,关于这个 loss 那个部分 loss += criterion (pred [i], dec_output_batch [i]),我应该是这个地方报错了,想知道 两个不同维度的 可以这样 loss 吗?

    1. mathor mathor

      @Wang Siwen 可以的,比方说 crossentropyloss,传入的一个参数是 [batch_size, n_class],另一个参数是 [batch_size],可以计算 loss,不会有错

  2. Wang Siwen Wang Siwen

    C:python36libsite-packagestorchnnmodulesrnn.py:51: UserWarning: dropout option adds dropout after all but last recurrent layer, so non-zero dropout expects num_layers greater than 1, but got dropout=0.5 and num_layers=1
    "num_layers={}".format(dropout, num_layers))
    Traceback (most recent call last):
    File "D:/pycharmproject/SOME_folder/demo2.py", line 96, in <module>

    loss += criterion(pred[i],dec_output_batch[i])

    File "C:python36libsite-packagestorchnnmodulesmodule.py", line 541, in call

    result = self.forward(*input, **kwargs)

    File "C:python36libsite-packagestorchnnmodulesloss.py", line 916, in forward

    ignore_index=self.ignore_index, reduction=self.reduction)

    File "C:python36libsite-packagestorchnnfunctional.py", line 2009, in cross_entropy

    return nll_loss(log_softmax(input, 1), target, weight, None, ignore_index, None, reduction)

    File "C:python36libsite-packagestorchnnfunctional.py", line 1838, in nll_loss

    ret = torch._C._nn.nll_loss(input, target, weight, _Reduction.get_enum(reduction), ignore_index)

    RuntimeError: Expected object of scalar type Long but got scalar type Float for argument #2 'target' in call to _thnn_nll_loss_forward
    我运行的时候报了这个错误,不知道是为什么,博主可以帮我看看吗 @(泪)@(泪)@(泪)

    1. mathor mathor

      @Wang Siwencriterion (a, b),你把 b 改为 LongTensor 类型

    2. Wang Siwen Wang Siwen

      @mathor 之后 loss 也会变成 tensor, 那最后 loss.backword () 就会报错 'Tensor' object has no attribute 'backword'

    3. mathor mathor

      @Wang Siwen... 那真是奇怪了,我运行都没有问题

    4. Wang Siwen Wang Siwen

      @mathor 神奇的是 我直接复制您的程序 运行 就可以!但是我打出来的就是不行 但是 肉眼 真的看不出来和您的任何区别 @(汗)太神奇 @(心碎)

  3. vie vie

    在测试的时候 dec_input 传入的全是问号,是怎么实现你说的

    在验证或者测试阶段,Decoder 每一时刻的输出是会影响到输入的,因为在验证或者测试时,网络是看不到结果的,所以它只能循环的进行下去。举个例子,我现在要将英语 "wasted" 翻译为德语 "verschwenden"。那么 Decoder 一开始输入 "t",得到一个输出,假如是 "m",下一时刻 Decoder 会输入 "m",得到输出,假如是 "a",之后会将 "a" 作为输入,得到输出...... 如此循环往复,直到最终时刻
    ?

    1. dataminer dataminer

      @vie 我也发现这个博主没有完全解决你说的那个问题,这个估计要用 RnnCell 来实现

    2. mathor mathor

      @dataminer 或者自己定义一个 for 循环,每次只输入一个 token

  4. Sun Sun

    up 主您好,关于循环求 loss 那里,您的代码是循环 batch_size 这么多次。
    如果那里不交换 transpose 0 , 1 两个维度,直接循环 n_step+1 次不知道可行嘛
    像这样:
    for i in range(n_step+1)):

    loss += criterion()

    期待您的回答

    1. mathor mathor

      @Sun 可行的

  5. zyj zyj

    请问在 seq2seq 这个类里面的 foward 方法里为什么要将 enc_input 与 dec_input 的第 0 维和第 1 维互换啊?

    1. zyj zyj

      @zyj 是因为在你定义的 RNN 中没有设置 batch_first=True 吗?

    2. mathor mathor

      @zyj

  6. dopawei dopawei

    楼主,这个怎么解决?
    谢谢啦

    Traceback (most recent call last):
    File "D:/PhD Program/course/Deep_learning/assignment/13/test_1.py", line 156, in <module>

    print('2021 March 5 ->', transform('2021 March 5'))

    File "D:/PhD Program/course/Deep_learning/assignment/13/test_1.py", line 150, in transform

    transformed = ''.join(decoded[:decoded.index('E')])

    ValueError: 'E' is not in list

    Process finished with exit code 1

    1. dopawei dopawei

      @dopawei 多运行几次就好,是随机数的问题。但预测效果感人,哈哈!
      print('test')
      print('2021 March 6->', transform('2021 March 6'))
      test
      2021 March 6-> 3/51/1882

  7. Coffee Coffee

    if (epoch + 1) % 1000 == 0:

    print('Epoch:', '%04d' % (epoch + 1), 'cost =', '{:.6f}'.format(loss))

    这句话放这里,打印的就是一个 epoch 中每个 batch 的 loss 了,可以放到后面单独拿出来,打印一个 epoch 的 loss,即一个 epoch 的最后一个 batch 的 loss.

  8. noao noao

    博主你好,请问你的这个代码,有使用预训练模型(比如 huggingface 的)的机器翻译的示例代码吗

    1. mathor mathor

      @noao 博客没有写,视频到有一个,不是机器翻译,但是类似的 seq2seq 任务
      https://www.bilibili.com/video/BV1Ka4y1x7qh

    2. noao noao

      @mathor 好的 不过博主有时间的话,可以发一下 colab 代码吗 b 站的评论都在要 @(呵呵)

    3. mathor mathor

      @noao 哦,那个啊,我当时找了一段时间找不到了,我再看看吧

  9. White White

    测试时,Decoder 的输入为一串‘?’作 PADDING。最后结果烂掉了。@(呵呵)

  10. 王小鹏 王小鹏

    作者你好 这里训练的时候选择输出最大概率来作为下一次的预测 ,加入我想在训练模型之后,在测试的时候,我想用最小概率作为下一次的输出应该怎么做

  11. tellw tellw

    “然后将 Deocder 的输入数据末尾添加终止标志 'E'” 句中 “Decoder” 应改为 “Encoder”