MENU

Simple RNN 时间序列预测

February 7, 2020 • Read: 5718 • Deep Learning阅读设置

本文将介绍利用朴素的 RNN 模型进行时间序列预测

比方说现在我们有如下图所示的一段正弦曲线,输入红色部分,通过训练输出下一段的值

首先分析一下,假设我们一次输入 50 个点,batch 设为 1,每个点就一个值,所以 input 的 shape 就是 [50, 1, 1],这里我们换一种表示形式,把 batch 放在前面,那么 shape 就是 [1, 50, 1],可以这么理解这个 shape,1 条曲线,一共有 50 个点,每个点都是 1 个实数

  • import numpy.random import randint
  • import numpy as np
  • import torch
  • from torch import nn, optim
  • from matplotlib import pyplot as plt
  • num_time_steps = 50
  • start = randint(3) # [0, 3)
  • time_steps = np.linspace(start, start + 10, num_time_steps) # 返回num_time_steps个点
  • data = np.sin(time_steps) # [50]
  • data = data.reshape(num_time_steps, -1) # [50, 1]
  • x = torch.tensor(data[:-1]).float().view(1, num_time_steps - 1, 1) # 0~48
  • y = torch.tensor(data[1:]).float().view(1, num_time_steps - 1, 1) # 1~49

start 表示的含义从几何上来说就是图上红色左边框的对应的横坐标的值,因为我们要确定一个起点,从这个起点开始向后取 50 个点,如果每次这个起点都是相同的,就会被这个网络记住

x 是 50 个数据点中的前 49 个,我们利用这 49 个点,每个点都向后预测一个单位的数据,得到 $\hat y$,然后将 $\hat y$ 与 $y$ 进行对比

接下来是构建网络架构

  • class Net(nn.Module):
  • def __init__(self):
  • super(Net, self).__init__()
  • self.rnn = nn.RNN(
  • input_size=input_size,
  • hidden_size=hidden_size,
  • num_layers=1,
  • batch_first=True,
  • )
  • self.linear = nn.Linear(hidden_size, output_size)
  • def forward(self, x, h0):
  • out, h0 = self.rnn(x, h0)
  • # [b, seq, h] => [seq, h]
  • out = out.view(-1, hidden_size)
  • out = self.linear(out) # [seq, h] => [seq, 1]
  • out = out.unsqueeze(dim=0) # => [1, seq, 1]
  • return out, h0

首先里面是一个 simple RNN,其中有个参数 batch_first,因为我们数据传入的格式是 batch 在前,所以要把这个参数设为 True。RNN 之后接了个 Linear,将 memory 的 size 输出为 output_size=1 方便进行比较,因为我们就只需要一个值

然后我们定义网络 Train 的代码

  • model = Net()
  • criterion = nn.MSELoss()
  • optimizer = optim.Adam(model.parameters(), lr)
  • h0 = torch.zeros(1, 1, hidden_size) # [b, 1, hidden_size]
  • for iter in range(6000):
  • start = np.random.randint(3, size=1)[0]
  • time_steps = np.linspace(start, start + 10, num_time_steps)
  • data = np.sin(time_steps)
  • data = data.reshape(num_time_steps, 1)
  • x = torch.tensor(data[:-1]).float().view(1, num_time_steps - 1, 1)
  • y = torch.tensor(data[1:]).float().view(1, num_time_steps - 1, 1)
  • output, h0 = model(x, h0)
  • h0 = h0.detach()
  • loss = criterion(output, y)
  • model.zero_grad()
  • loss.backward()
  • optimizer.step()
  • if iter % 100 == 0:
  • print("Iteration: {} loss {}".format(iter, loss.item()))

最后是 Predict 的部分

  • predictions = []
  • input = x[:, 0, :]
  • for _ in range(x.shape[1]):
  • input = input.view(1, 1, 1)
  • (pred, h0) = model(input, h0)
  • input = pred
  • predictions.append(pred.detach().numpy().ravel()[0])

假设 x 的 shape 是 [b, seq, 1],经过 x[:, 0, :] 之后就变成了 [b, 1],但其实前面说过了,batch 值是 1,所以 input 的 shape 就是 [1, 1],然后再展开成 [1, 1, 1] 是为了能匹配网络的输入维度

倒数第二行和第三行的代码做的事情是,首先带入第一个值,得到一个输出 pred,然后把 pred 作为下一次的输入,又得到一个 pred,如此循环往复,就把上一次的输出,作为下一次的输入

最后的输出图像如下所示

完整代码如下:

  • from numpy.random import randint
  • import numpy as np
  • import torch
  • import torch.nn as nn
  • import torch.optim as optim
  • from matplotlib import pyplot as plt
  • num_time_steps = 50
  • input_size = 1
  • hidden_size = 16
  • output_size = 1
  • lr=0.01
  • class Net(nn.Module):
  • def __init__(self):
  • super(Net, self).__init__()
  • self.rnn = nn.RNN(
  • input_size=input_size,
  • hidden_size=hidden_size,
  • num_layers=1,
  • batch_first=True,
  • )
  • self.linear = nn.Linear(hidden_size, output_size)
  • def forward(self, x, h0):
  • out, h0 = self.rnn(x, h0)
  • # [b, seq, h]
  • out = out.view(-1, hidden_size)
  • out = self.linear(out)
  • out = out.unsqueeze(dim=0)
  • return out, h0
  • model = Net()
  • criterion = nn.MSELoss()
  • optimizer = optim.Adam(model.parameters(), lr)
  • h0 = torch.zeros(1, 1, hidden_size)
  • for iter in range(6000):
  • start = randint(3)
  • time_steps = np.linspace(start, start + 10, num_time_steps)
  • data = np.sin(time_steps)
  • data = data.reshape(num_time_steps, 1)
  • x = torch.tensor(data[:-1]).float().view(1, num_time_steps - 1, 1)
  • y = torch.tensor(data[1:]).float().view(1, num_time_steps - 1, 1)
  • output, h0 = model(x, h0)
  • h0 = h0.detach()
  • loss = criterion(output, y)
  • model.zero_grad()
  • loss.backward()
  • optimizer.step()
  • if iter % 100 == 0:
  • print("Iteration: {} loss {}".format(iter, loss.item()))
  • start = randint(3)
  • time_steps = np.linspace(start, start + 10, num_time_steps)
  • data = np.sin(time_steps)
  • data = data.reshape(num_time_steps, 1)
  • x = torch.tensor(data[:-1]).float().view(1, num_time_steps - 1, 1)
  • y = torch.tensor(data[1:]).float().view(1, num_time_steps - 1, 1)
  • predictions = []
  • input = x[:, 0, :]
  • for _ in range(x.shape[1]):
  • input = input.view(1, 1, 1)
  • (pred, h0) = model(input, h0)
  • input = pred
  • predictions.append(pred.detach().numpy().ravel()[0])
  • x = x.data.numpy().ravel() # flatten操作
  • y = y.data.numpy()
  • plt.scatter(time_steps[:-1], x.ravel(), s=90)
  • plt.plot(time_steps[:-1], x.ravel())
  • plt.scatter(time_steps[1:], predictions)
  • plt.show()
Last Modified: February 12, 2020
Archives Tip
QR Code for this page
Tipping QR Code
Leave a Comment

已有 1 条评论
  1. 小新 小新

    同学,你好,请教你一个问题,关于 RNN 初始的隐状态 h0/RNN 最后一步输出的隐表示 h0,h0.detach () 是不对这个变量求导,不更新这个变量的作用吗(不清楚)?谢谢
    同学,还有一个问题想请教你一下(也是关于 deatch 的):比如 RNN(一层)先输入前 4 个时刻(t0 到 t3)的值,没有输入初始的隐状态(默认为 0),RNN 每一个时刻输出的隐表示经过一个 linear 作为下一个时刻的预测值(t1 到 t4),hidden 是这个时间段内的最后一个隐表示,这里有一个 hidden.detach (),然后 RNN 输入后 4 个时刻(t4 到 t7)的值,每一个时刻输出的隐表示经过一个 linear 作为下一个时刻的预测值(t5 到 t8),损失函数为,求和:(yi-yi_预测值)^2,i 从 1 到 8。想问一下,这里的 hidden.detach () 有什么作用?谢谢