- import numpy as np
- import pandas as pd
- import matplotlib.pyplot as plt
- import torch
- plt.rcParams['font.sans-serif'] = ['SimHei'] #定义使其正常显示中文字体黑体
- plt.rcParams['axes.unicode_minus'] = False #用来正常显示表示负号
- # 导入数据
- filename = 'RMB_EU.xltx'
- df = pd.read_excel(filename, encoding='utf-8')
- ### 提取测试数据
- data = df.loc[:,['时间','高']][:] # 提取时间与最大值两列
- data
- # # 最大最小归一化方法
- data['高'] = (data['高']-data['高'].min())/(data['高'].max()-data['高'].min())
- def create_dataset(dataset, len_x=7, len_y=7):
- data_X, data_Y, data_Z = [], [], []
- for i in range(len(dataset) - (len_x + len_y) + 1):
- a = dataset[i:(i + len_x)]
- data_X.append(a)
- b = dataset[(i + len_x):(i+len_x+len_y)]
- data_Y.append(b)
- for i in range(len(dataset)-(len_x+len_y)+1, len(dataset)-len_x+1):
- a = dataset[i:(i + len_x)]
- data_Z.append(a)
- return np.array(data_X), np.array(data_Y), np.array(data_Z)
- # 构建数据集
- X, Y, Z = create_dataset(data['高'], 30, 30)
- train_size = int(len(X) * 0.7)
- test_size = len(X) - train_size
- train_X = X[:train_size]
- train_Y = Y[:train_size]
- test_X = X[train_size:]
- test_Y = Y[train_size:]
- train_X.shape,train_Y.shape,test_X.shape,test_Y.shape, Z.shape
- # 设置数据形状
- train_X = train_X.reshape(-1, 1, 30)
- train_Y = train_Y.reshape(-1, 1, 30)
- test_X = test_X.reshape(-1, 1, 30)
- # test_Y = test_Y.reshape(-1, 1, 7)
- pred_Z = Z.reshape(-1, 1, 30)
- # 转换为 Tensor 类型
- train_X = torch.from_numpy(train_X)
- train_Y = torch.from_numpy(train_Y)
- test_X = torch.from_numpy(test_X)
- # test_Y = torch.from_numpy(test_Y)
- pred_Z = torch.from_numpy(pred_Z)
- # 使用 cuda
- train_X = train_X.cuda()
- train_Y = train_Y.cuda()
- test_X = test_X.cuda()
- # test_Y = test_Y.cuda()
- pred_Z = pred_Z.cuda()
- from torch import nn
- from torch.autograd import Variable
- rnn = nn.LSTM(30,30,2).cuda()
- rnn = rnn.double() # 数据类型问题
- criterion = nn.MSELoss()
- optimizer = torch.optim.Adam(rnn.parameters(), lr=1e-2)
- # 开始训练
- for e in range(50):
- var_X = Variable(train_X)
- var_Y = Variable(train_Y)
- # 前向传播
- out,(h, c) = rnn(var_X)
- loss = criterion(out, var_Y)
- # 反向传播
- optimizer.zero_grad()
- loss.backward()
- optimizer.step()
- print('Epoch: {}, Loss: {:.5f}'.format(e + 1, loss.data[0]))
- model = rnn.eval() # 转换成测试模式
- var_data = Variable(test_X)
- pred_test,(th, tc) = model(var_data) # 测试集的预测结果
- pred_test = pred_test.reshape(-1, 30)
- pred_test = pred_test.CPU()
- pred_test = pred_test.detach().numpy()
- pred_test
- # 画出实际结果和预测的结果
- plt.figure(figsize=(12,8))
- plt.plot(data['时间'][train_size:-59],pred_test[:,6], 'r', label='train')
- # plt.plot(data['时间'][train_size:-29],pred_t, 'r', label='prediction')
- plt.plot(data['时间'][train_size:-59],test_Y[:,6], 'b', label='real')
- plt.plot(data['时间'][-30:], pred_1, 'g', label='prediction')
- plt.legend(loc='best')
- # 预测结果
- pred,(_,_) = model(pred_Z)
- pred = pred.CPU()
- pred = pred.detach().numpy()
- pred = pred.reshape(-1, 30)
- pred_1 = np.mean(pred, axis=1)
- pred_1.shape
- plt.plot(data['时间'][-30:], pred_1, '-g', label='new')
来源: http://www.bubuko.com/infodetail-3358306.html