実験コード
import datetime
import numpy as np
import matplotlib.pylab as plt
from chainer import Chain, Variable, cuda, optimizer, optimizers, serializers
import chainer.functions as F
import chainer.links as L
import pandas as pd
# モデルクラス定義
class LSTM(Chain):
def __init__(self, in_size, hidden_size, out_size):
super(LSTM, self).__init__(
xh = L.Linear(in_size, hidden_size),
hh = L.LSTM(hidden_size, hidden_size),
hy = L.Linear(hidden_size, out_size)
)
def __call__(self, x, t=None, train=False):
x = Variable(x)
if train:
t = Variable(t)
h = self.xh(x)
h = self.hh(h)
y = self.hy(h)
if train:
return F.mean_squared_error(y, t)
else:
return y.data
def reset(self):
self.zerograds()
self.hh.reset_state()
EPOCH_NUM = 1000
IN_SIZE=1
HIDDEN_SIZE = 60
OUT_SIZE=1
BATCH_NUM = 10 # 分割したデータをいくつミニバッチに取り込むか
BATCH_SIZE = 60 # ミニバッチで分割する時系列数
# 学習
def Training(str="",wait_load=False):
print("\nTraining\n")
# 教師データ
df = pd.read_csv('nikkei-225-index-historical-chart-data.csv',header=8)
mat = df.query('date.str.match('+str+')', engine='python')
train_data_t = mat[' value'].values
print(train_data_t)
train_data = np.arange(len(train_data_t), dtype="float32");
# 微分(偏差)データに変換
for i in range(len(train_data_t)-1):
train_data[i] = train_data_t[i+1]-train_data_t[i]
# 正規化用ゲイン
gain = np.max(train_data)-np.min(train_data)
gain = gain/2
train_data = train_data/gain # ±1.0以内に
# 入力データと教師データを作成
train_x, train_t = [], []
for i in range(len(train_data)-1):
train_x.append(train_data[i])
train_t.append(train_data[i+1])
train_x = np.array(train_x, dtype="float32")
train_t = np.array(train_t, dtype="float32")
Num = len(train_x)
# モデル定義
model = LSTM(in_size=IN_SIZE, hidden_size=HIDDEN_SIZE, out_size=OUT_SIZE)
optimizer = optimizers.Adam()
optimizer.setup(model)
if wait_load:
serializers.load_npz("mymodel.npz", model)
# 学習開始
print("Train")
st = datetime.datetime.now() # 100エポック時間計測用に記録
for epoch in range(EPOCH_NUM):
# ミニバッチ学習
x, t = [], []
for i in range(BATCH_NUM):
# ランダムな箇所、ただしBATCH_SIZE分だけ抜き取れる場所から選ぶ
# (indexの末端がBATCH_SIZEを超えない部分でリミットを掛ける)
index = np.random.randint(0, Num-BATCH_SIZE+1)
x.append(train_x[index:index+BATCH_SIZE]) # BATCH_SIZE分の入力データを取り出す
t.append(train_t[index:index+BATCH_SIZE]) # BATCH_SIZE分の教師データを取り出す
# NumPy配列に変換
x = np.array(x, dtype="float32")
t = np.array(t, dtype="float32")
loss = 0
total_loss = 0
model.reset()
for i in range(BATCH_SIZE): # 各時刻おきにBATCH_NUMごと読み込んで損失を計算する
x_ = np.array([x[j, i] for j in range(BATCH_NUM)], dtype="float32")[:, np.newaxis] # 時刻iの入力値
t_ = np.array([t[j, i] for j in range(BATCH_NUM)], dtype="float32")[:, np.newaxis] # 時刻i+1の値(=正解の予測値)
loss += model(x=x_, t=t_, train=True) # 誤差合計
loss.backward() # 誤差逆伝播
loss.unchain_backward()
total_loss += loss.data
optimizer.update()
if (epoch+1) % 100 == 0:
# 100エポック毎に誤差と学習時間を表示
ed = datetime.datetime.now()
print("epoch:\t{}\ttotal loss:\t{}\ttime:\t{}".format(epoch+1, total_loss, ed-st))
st = datetime.datetime.now() # 100エポック時間計測用に記録
serializers.save_npz("mymodel.npz", model) # npz形式で書き出し
# 予測能力評価
def Predict(str):
print("\nPredict")
# モデルの定義
model = LSTM(in_size=IN_SIZE, hidden_size=HIDDEN_SIZE, out_size=OUT_SIZE)
# 重みロード
serializers.load_npz("mymodel.npz", model)
# 予測元データロード
df = pd.read_csv('nikkei-225-index-historical-chart-data.csv',header=8)
mat = df.query('date.str.match('+str+')', engine='python')
train_data_t = mat[' value'].values
train_data = np.arange(len(train_data_t), dtype="float32");
print(train_data_t)
# 偏差算出(微分)
for i in range(len(train_data_t)-1):
train_data[i] = train_data_t[i+1]-train_data_t[i]
# 正規化用ゲイン
gain = np.max(train_data)-np.min(train_data)
gain = gain/2
train_data = train_data/gain # ±1.0以内に
Num = len(train_data)-1
print(train_data)
predict = np.empty(0) # 予測値格納用
predict_size = 30 # 予測サイズ
predata_size = len(train_data)-predict_size # 予測直前までのデータ数
indata = train_data[1:predata_size] # 予測直前までのデータ
for _ in range(predata_size):
model.reset()
for i in indata: # モデルに予測直前までの時系列を読み込ませる
x = np.array([[i]], dtype="float32")
y = model(x=x, train=False)
predict = np.append(predict, y) # 最後の予測値を記録
# モデルに読み込ませる予測直前時系列を予測値で更新する
indata = np.delete(indata, 0)
indata = np.append(indata, y)
plt.plot(range(Num+1), train_data, color="red", label="t")
plt.plot(range(predata_size, predata_size+predict_size-1), predict[0:predict_size-1], "--.", label="y")
plt.legend(loc="upper left")
plt.show()
predict = predict * gain # 元データと同じ割合で予測値を拡大
ipredict = np.arange(len(predict)+1, dtype="float32")
predict_tmp = train_data_t[predata_size]; # 初期値(積分定数)
ipredict[0]=predict_tmp
# 積分
for i in range(len(predict)):
predict_tmp = predict_tmp + predict[i]
ipredict[i+1] = predict_tmp
plt.plot(range(Num+1), train_data_t, color="red", label="t")
plt.plot(range(predata_size, predata_size+predict_size-1), ipredict[0:predict_size-1], "--.", label="y")
plt.show()
# 本当に将来を予測
def Predict2(str="",tail=90):
print("\nPredict2")
# モデルの定義
model = LSTM(in_size=IN_SIZE, hidden_size=HIDDEN_SIZE, out_size=OUT_SIZE)
# 重みロード
serializers.load_npz("mymodel.npz", model)
# 予測元データロード
df = pd.read_csv('nikkei-225-index-historical-chart-data.csv',header=8)
if not str:
# 空文字列の場合は貯金tail日分のデータを使用
mat = df.tail(tail)
else:
# strを正規表現としてデータ抽出
mat = df.query('date.str.match('+str+')', engine='python')
train_data_t = mat[' value'].values
train_data = np.arange(len(train_data_t), dtype="float32");
for i in range(len(train_data_t)-1):
train_data[i] = train_data_t[i+1]-train_data_t[i]
gain = np.max(train_data)-np.min(train_data)
gain = gain/2
train_data = train_data/gain
Num = len(train_data)-1
print(train_data)
predict = np.empty(0) # 予測値格納用
predict_size = 30 # 予測サイズ
indata = train_data # 予測直前までの時系列
for _ in range(predict_size):
model.reset() # メモリを初期化
for i in indata: # モデルに予測直前までのデータを読み込ませる
x = np.array([[i]], dtype="float32")
y = model(x=x, train=False)
predict = np.append(predict, y) # 最後の予測値を記録
# モデルに読み込ませる予測直前データを予測値で更新する
indata = np.delete(indata, 0)
indata = np.append(indata, y)
plt.plot(range(Num+1), train_data, color="red", label="t")
plt.plot(range(Num, Num+predict_size), predict[0:predict_size], "--.", label="y")
plt.legend(loc="upper left")
plt.show()
predict = predict * gain
ipredict = np.arange(len(predict)+1, dtype="float32")
predict_tmp = train_data_t[Num];
ipredict[0]=predict_tmp
# 積分
for i in range(len(predict)):
predict_tmp = predict_tmp + predict[i]
ipredict[i+1] = predict_tmp
plt.plot(range(Num+1), train_data_t, color="red", label="t")
plt.plot(range(Num, Num+predict_size), ipredict[0:predict_size], "--.", label="y")
plt.show()
Training(str="\"^(2019-)\"",wait_load=False)
Predict(str="\"^2019-((01)|(02)|(03)|(04))\"") # 訓練データ内で予測
Predict(str="\"^2019-((04)|(05)|(06)|(07))\"") # 訓練データ内で予測
Predict(str="\"^2019-((07)|(08)|(09)|(10))\"") # 訓練データ内で予測
Predict(str="\"^2019-((09)|(10)|(11)|(12))\"") # 訓練データ内で予測
Predict(str="\"^2020-((01)|(02)|(03)|(04))\"") # 訓練データ外(テストデータ)で予測
#Predict2() #本当に未来を予測
Github
同一のものをGithubにも上げてます。
まとめ
- LSTMは株価予測が出来ないわけではないが、定量性を破壊する定性的要因は流石に適応できない。
- 平時であれば、もう少しいい感じに予測できたかも。
- 単純にデータを入れれば良いということは無く、NNの特性から逆算して学習データを加工する必要がある。
- 今回のように微分したり、正規化したり。
- その他:標準化、白色化(無相関化+標準化)
- 直近のデータだけで学習して、局所的な変動を見る方針にすると、また違った効能が見えるかもしれない。
- 人間が見つけられない特徴を拾ってくれる可能性あり。
- 今回は1次元入力、1次元出力の構成にしたが、株価に影響のある多変量データがあれば、それを一緒に入力すれば精度の高い予測ができる可能性あり。
- 嫁には「ふーん」という扱いを受けた。(がんばったのにぃ!)
※ 正規化、標準化、白色化についてはこちらで説明
最強のファンダメンタル株式投資法
https://amzn.to/3WPw2Tk
外資系アナリストが本当に使っている ファンダメンタル分析の手法と実例
https://amzn.to/3WPDmOM
買い時・売り時がひと目でわかる 株価チャート大全
https://amzn.to/3WSKrOU
マーケットのテクニカル分析 ――トレード手法と売買指標の完全総合ガイド
https://amzn.to/3V8yCCL
株価チャートの教科書
https://amzn.to/3K9FXM4
テクニカル投資の基礎講座 ──チャートの読み方から仕掛け・手仕舞いまで
https://amzn.to/44TPWhZ
勝ち続ける投資家になるための 株価予測の技術[決定版]
https://amzn.to/3WOSWu4
Pythonで儲かるAIをつくる
Amazon.co.jp
アセットマネージャーのためのファイナンス機械学習
https://amzn.to/3UR6Byn
Pythonによるファイナンス 第2版 ―データ駆動型アプローチに向けて
https://amzn.to/3wUxVDH
Excelで学ぶ時系列分析―理論と事例による予測―
https://amzn.to/4bJmkpV
新 Excelコンピュータシミュレーション:数学モデルを作って楽しく学ぼう
https://amzn.to/4apoxpu
Excelで操る! ここまでできる科学技術計算 第2版
https://amzn.to/3QVnyGN
工学のためのVBAプログラミング基礎
Amazon.co.jp
Chainer v2による実践深層学習
https://amzn.to/3yEOIeo
ゼロから作るDeep Learning 2 ―自然言語処理編
https://amzn.to/3yqC2I1
ゼロから作るDeep Learning ―Pythonで学ぶディープラーニングの理論と実装
https://amzn.to/3ys3B3L
Pythonではじめる機械学習 ―scikit-learnで学ぶ特徴量エンジニアリングと機械学習の基礎
Amazon.co.jp
コメント