Overview

  • yfinance+pycaretで株価予測ツールを作ってみた。
  • いわゆる時系列予測(過去の値動きだけから将来の値を推定する手法)なので、精度は高くない。

Setup

  • python > 3.10
  • pip install yfinance pycaret

Implementation

  • 株式データ収集のソースコード
import yfinance as yf  
import time  
  
import os  # needed only by this script section (output directory handling)

# Date range to download (2024 year-to-date when the script was written).
start_date = "2024-01-01"
end_date = "2024-08-26"

# Directory that receives one CSV per ticker; created up front so that
# to_csv() cannot abort the whole run because the folder is missing.
output_dir = './stock_data'
os.makedirs(output_dir, exist_ok=True)

# Try every four-digit securities code, 1000 through 9999 inclusive.
for stock_code in range(1000, 10000):
    # Throttle to roughly one request per second.
    time.sleep(1)

    # The ".T" suffix is the Yahoo Finance marker for Tokyo-listed symbols.
    symbol = f"{stock_code}.T"

    # Fetch the history with yfinance. This stays best-effort: any failure
    # skips the ticker, but it is reported instead of silently swallowed.
    try:
        stock = yf.Ticker(symbol)
        stock_data = stock.history(start=start_date, end=end_date)

        # Codes that are not listed come back without rows; do not write
        # empty CSV files that the prediction script would choke on later.
        if stock_data.empty:
            continue

        # Keep the dates as the index, formatted as readable YYYY-MM-DD text.
        stock_data.index = stock_data.index.strftime('%Y-%m-%d')
    except Exception as e:
        print(f"{symbol}: データ取得に失敗しました ({e})")
        continue

    # Show the downloaded rows.
    print(stock_data)

    # Persist the data as a CSV file named after the symbol.
    stock_data.to_csv(os.path.join(output_dir, f'{symbol}.csv'))
  • 株式データから予測するソースコード
import pandas as pd  
import numpy as np  
import os  
import matplotlib.pyplot as plt  
from pycaret.time_series import *  
from datetime import datetime, timedelta  
from statsmodels.tsa.stattools import adfuller  
  
def load_stock_data(file_path):
    """Read one stock CSV and return its price columns on a daily index.

    The 'Date' column is parsed as dates and used as the index. The frame is
    then re-indexed to calendar-day frequency, so dates absent from the file
    appear as rows of NaN.

    Args:
        file_path: Path of a CSV file containing 'Date', 'Close', 'Open',
            'High', 'Low' and 'Volume' columns.

    Returns:
        DataFrame with the columns Close, Open, High, Low, Volume (in that
        order) indexed by day.
    """
    wanted_columns = ['Close', 'Open', 'High', 'Low', 'Volume']
    raw = pd.read_csv(file_path, index_col='Date', parse_dates=['Date'])
    daily = raw.asfreq("D")
    return daily[wanted_columns]
  
def add_technical_indicators(data):
    """Append moving-average, RSI and Bollinger-band columns to ``data``.

    The frame is modified in place and also returned. Columns are added in
    this order: MA7, MA30, RSI, 20MA, 20SD, Upper_BB, Lower_BB. Every rolling
    window uses the default minimum period, so a window containing a NaN
    yields NaN.

    Args:
        data: DataFrame with a numeric 'Close' column.

    Returns:
        The same DataFrame object with the indicator columns added.
    """
    close = data['Close']

    # Simple moving averages over 7 and 30 rows.
    data['MA7'] = close.rolling(window=7).mean()
    data['MA30'] = close.rolling(window=30).mean()

    # RSI over 14 rows: average upward move versus average downward move.
    change = close.diff()
    upward = change.where(change > 0, 0)
    downward = -change.where(change < 0, 0)
    avg_gain = upward.rolling(window=14).mean()
    avg_loss = downward.rolling(window=14).mean()
    relative_strength = avg_gain / avg_loss
    data['RSI'] = 100 - (100 / (1 + relative_strength))

    # Bollinger bands: 20-row mean plus/minus two standard deviations.
    band_window = close.rolling(window=20)
    data['20MA'] = band_window.mean()
    data['20SD'] = band_window.std()
    band_width = data['20SD'] * 2
    data['Upper_BB'] = data['20MA'] + band_width
    data['Lower_BB'] = data['20MA'] - band_width

    return data
  
def check_stationarity(data):
    """Difference 'Close' in place when the ADF test does not reject a unit root.

    Runs ``adfuller`` on the non-null closing prices and reads the second
    element of its result as the p-value. When that value is above 0.05 the
    'Close' column is overwritten with its first difference and a notice is
    printed; otherwise the frame is left untouched.

    Args:
        data: DataFrame with a 'Close' column; may be modified in place.

    Returns:
        The same DataFrame object that was passed in.
    """
    closes = data['Close']
    p_value = adfuller(closes.dropna())[1]

    # Phrased as "not greater than" so that a NaN p-value takes the same
    # no-op path it would take with a plain `> 0.05` test.
    if not p_value > 0.05:
        return data

    # The assignment aligns on data's index, so rows removed by dropna()
    # come back as NaN in the stored column.
    differenced = closes.diff().dropna()
    data['Close'] = differenced
    print("データを差分変換して定常性を確保しました")
    return data
  
def predict_stock_price(data, stock_name):
    """Fit blended time-series models on 'Close' and forecast 60 steps ahead.

    Steps: keep an untouched copy of the input, difference 'Close' if
    ``check_stationarity`` decides to, add technical indicators, run the
    pycaret setup / compare / blend / finalize pipeline, predict 60 steps,
    and, only if the series was differenced, convert the predicted
    differences back to price levels. The predictions are printed and written
    to ``./prediction_data/{stock_name}_predictions.csv``.

    Args:
        data: DataFrame with at least a 'Close' column. It is modified in
            place by the helper functions.
        stock_name: Label used in log output and in the output file name.

    Returns:
        Tuple ``(original_data, predictions)``: the untouched copy of the
        input, and the prediction frame whose 'y_pred' column holds the
        forecast.
    """
    original_data = data.copy()
    data = check_stationarity(data)

    # check_stationarity overwrites 'Close' in place when it differences the
    # series, so comparing against the pristine copy tells us whether that
    # happened. The earlier test compared a value with `diff().iloc[0]`, which
    # is always NaN, so it was always true and inverse-differenced predictions
    # that had never been differenced.
    was_differenced = not data['Close'].equals(original_data['Close'])

    data = add_technical_indicators(data)
    data = data.dropna(axis=1, how="all")

    # NOTE: only the 'Close' series is handed to setup(), so the indicator
    # columns computed above are not used as exogenous features here.
    setup(data=data["Close"],
          target='Close',
          fold=3,
          n_jobs=-1,
          use_gpu=True,
          numeric_imputation_target="ffill",
          numeric_imputation_exogenous="ffill"
          )

    best_model = compare_models(n_select=2)
    blended_model = blend_models(best_model)
    final_model = finalize_model(blended_model)

    predictions = predict_model(final_model, fh=60)

    if was_differenced:
        # Undo the first difference: accumulate the predicted changes on top
        # of the last observed (non-null) closing price.
        last_close = original_data['Close'].dropna().iloc[-1]
        predictions['y_pred'] = predictions['y_pred'].cumsum() + last_close

    print(f"\n予測結果 for {stock_name}:")
    print(predictions)

    predictions.to_csv(f'./prediction_data/{stock_name}_predictions.csv')
    return original_data, predictions
  
def is_price_increasing(current_price, predicted_price):
    """Return whether the predicted price is strictly above the current price.

    Args:
        current_price: Latest observed price.
        predicted_price: Forecast price to compare against it.

    Returns:
        The result of ``predicted_price > current_price``; equal prices count
        as not increasing.
    """
    goes_up = predicted_price > current_price
    return goes_up
  
def plot_stock_data(past_data, predictions, stock_name,is_increasing):
    """Draw historical and predicted prices on one chart and save it as PNG.

    The figure is written to
    ``./prediction_data/{stock_name}_prediction_plot_{is_increasing}.png``
    and then closed.

    Args:
        past_data: Frame whose index and 'Close' column give the history line.
        predictions: Frame whose index and 'y_pred' column give the forecast
            line.
        stock_name: Label used in the chart title and the file name.
        is_increasing: Flag embedded (via ``str``) in the output file name.
    """
    output_path = f'./prediction_data/{stock_name}_prediction_plot_{str(is_increasing)}.png'

    # (frame, column, legend label, line colour) for each curve, in draw order.
    curves = (
        (past_data, 'Close', '過去データ', 'blue'),
        (predictions, 'y_pred', '予測データ', 'red'),
    )

    plt.figure(figsize=(12, 6))
    for frame, column, legend_label, line_color in curves:
        plt.plot(frame.index, frame[column], label=legend_label, color=line_color)

    plt.grid(True)
    plt.xlabel('日付')
    plt.ylabel('株価')
    plt.title(f'{stock_name} 株価予測')
    plt.legend()

    plt.savefig(output_path)
    plt.close()
  
def main():
    """Run the prediction pipeline for every CSV file in ``./stock_data``.

    For each file: load the prices, remember the latest closing price,
    forecast with ``predict_stock_price``, report whether the final predicted
    price is above the latest close, and save a plot. Files that contain no
    closing prices are skipped with a message instead of raising.
    """
    stock_data_dir = './stock_data'

    for file_name in os.listdir(stock_data_dir):
        if not file_name.endswith('.csv'):
            continue

        file_path = os.path.join(stock_data_dir, file_name)
        stock_name = os.path.splitext(file_name)[0]

        print(f"\n処理中: {stock_name}")

        stock_data = load_stock_data(file_path)

        # Without at least one closing price there is nothing to compare
        # against or to fit, and `.iloc[-1]` would raise on an empty series.
        closes = stock_data['Close'].dropna()
        if closes.empty:
            print(f"{stock_name}: 終値データがないためスキップします")
            continue

        # Read the latest close before predict_stock_price runs, because the
        # helpers it calls may overwrite 'Close' in place.
        current_price = closes.iloc[-1]

        past_data, predictions = predict_stock_price(stock_data, stock_name)
        final_predicted_price = predictions['y_pred'].iloc[-1]

        is_increasing = is_price_increasing(current_price, final_predicted_price)

        print(f"\n{stock_name}の現在の株価: {current_price:.2f}")
        print(f"{stock_name}の予測最終日の株価: {final_predicted_price:.2f}")
        print(f"{stock_name}の株価は上昇すると予測: {is_increasing}")

        plot_stock_data(past_data, predictions, stock_name,is_increasing)
        # Report the real file name: plot_stock_data appends the
        # increasing/decreasing flag before the ".png" suffix.
        print(f"{stock_name}の予測プロットを保存しました: {stock_name}_prediction_plot_{str(is_increasing)}.png")

if __name__ == "__main__":
    main()

Results

  • stock_data,prediction_dataというディレクトリを作っておけば、収集したデータと予測したデータをcsvで保存できる
  • え?これほんとに予測してますか?(ちなみに、間が抜けているのは取引してない日のためです。)

Conclusion

  • csvファイルとしてデータを保存できるところまで確認できました。
  • 予測精度は、・・・です。

Reference