第4章 予測モデルの評価と検証¶

4.2 ホールドアウト法と時系列データ¶

In [1]:
#
# code 4.1
#

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from pmdarima.datasets import load_airpassengers

# データの読み込み
data = load_airpassengers()
df_ap = pd.DataFrame(data, columns=['y'])
df_ap.index = pd.date_range(
    start='1949-01-01', # 開始月
    periods=len(df_ap), # 期数
    freq='M'            # 間隔(Mは月を意味する)
)
In [2]:
#
# code 4.2
#

from pmdarima.model_selection import train_test_split

# データを学習データとテストデータに分割
train, test = train_test_split(
    df_ap['y'],   # 分割対象の時系列データ
    test_size=12, # 直近12ヶ月分をテストデータ
)

# 学習データとテストデータの期間を確認
print("Training data:", train.index.min(), train.index.max())
print("Test data:", test.index.min(), test.index.max())
Training data: 1949-01-31 00:00:00 1959-12-31 00:00:00
Test data: 1960-01-31 00:00:00 1960-12-31 00:00:00
In [3]:
#
# code 4.3
#

from sklearn.metrics import (
    mean_absolute_error,            # MAE
    mean_absolute_percentage_error, # MAPE
)
from pmdarima import AutoARIMA

# 学習データでARIMAモデルを構築
model = AutoARIMA(seasonal=True, m=12)
model.fit(train)

# テストデータの期間を予測
forecast = model.predict(n_periods=12)

# 評価指標(MAEとMAPE)を計算
mae = mean_absolute_error(test, forecast)
mape = mean_absolute_percentage_error(test, forecast)
print(f"MAE: {mae:.2f}")
print(f"MAPE: {mape*100:.2f}%")
MAE: 14.90
MAPE: 3.10%
In [4]:
#
# code 3.3
#

# 予測結果をプロットする関数
def plot_prediction_results(
    y,                  # 実際の観測値
    fitted,             # 学習データ期間の予測値
    forecast=None,      # 将来予測の予測値
    conf=None,          # 将来予測の予測区間
):
    # 実測値のプロット
    plt.figure(figsize=(10, 5))
    plt.plot(
        y.index,        # 横軸
        y.values,       # 縦軸 
        label='Actual'  # ラベル名
    )
    # 学習データ期間の予測値のプロット
    plt.plot(
        fitted.index,   # 横軸
        fitted.values,  # 縦軸
        label='Fitted', # ラベル名
        color='green',  # 線の色
        linestyle=':'   # 線のスタイル
    )
    # 将来の予測値のプロット
    if forecast is not None:
        plt.plot(
            forecast.index,   # 横軸
            forecast,         # 縦軸
            label='Forecast', # ラベル名
            color='red',      # 線の色
            linestyle='--'    # 線のスタイル
        )
    # 将来の予測区間のプロット
    if conf is not None:
        plt.fill_between(
            forecast.index,            # 横軸
            conf[:, 0],                # 塗りつぶし幅の下限
            conf[:, 1],                # 塗りつぶし幅の上限
            color='red',               # 塗りつぶし幅の色
            alpha=0.1,                 # 塗りつぶし幅の透明度
            label='Predicted Interval' # ラベル名
        )
    plt.legend() # 凡例を表示
    plt.show()   # グラフを表示
In [5]:
#
# code 4.4
#

# 学習データ期間の予測値を取得
fitted = model.predict_in_sample()

# 将来12か月間の予測(予測区間付き)
forecast, conf = model.predict(
    n_periods=12,         # 予測期間(12ヶ月分)
    return_conf_int=True, # 予測区間を返す
    alpha=0.05,           # 予測区間の信頼水準(95%予測区間)
)

# 予測結果をプロット
plot_prediction_results(df_ap.y, fitted, forecast, conf)
No description has been provided for this image

4.3 時系列のクロスバリデーション方法¶

4.3.2 時系列クロスバリデーション法¶

In [6]:
#
# code 4.5
#

from pmdarima.model_selection import RollingForecastCV

# RollingForecastCVクラスのインスタンスを作成
cv = RollingForecastCV(
    initial=36,  # 初期の訓練用データ期間を36ヶ月に設定
    h=12,        # 先の12か月のデータを予測対象とする
    step=3       # ローリングするステップ数を3ヶ月ずつとする
)

# 時系列データを配列に変換し取得
y = df_ap['y'].values

# クロスバリデーションの分割情報の取得
tscv = cv.split(y)
In [7]:
#
# code 4.6
#

# tscvから最初の3つの分割を取り出して表示
for i, (train_idx, test_idx) in enumerate(tscv):
    print(f"\n分割 {i+1}:")
    print("訓練データのインデックス範囲: "
          f"{train_idx[0]} - {train_idx[-1]}")
    print("検証データのインデックス範囲: "
          f"{test_idx[0]} - {test_idx[-1]}")
    if i >= 2:  # 最初の3つの分割のみ表示
        break
分割 1:
訓練データのインデックス範囲: 0 - 35
検証データのインデックス範囲: 36 - 47

分割 2:
訓練データのインデックス範囲: 0 - 38
検証データのインデックス範囲: 39 - 50

分割 3:
訓練データのインデックス範囲: 0 - 41
検証データのインデックス範囲: 42 - 53
In [8]:
#
# code 4.7
#

# 時系列データに対してクロスバリデーションを実施し、MAEとMAPEを計算する関数
def perform_ARIMA_tscv(
    y_values, # 時系列データ
    tscv,     # 時系列CVの分割情報
    m=0,      # 季節周期
):
    # MAEとMAPEを保存するリスト
    mae_list = []
    mape_list = []
    
    # クロスバリデーションの実施
    for train_idx, test_idx in tscv:

        # 訓練データと検証データの取得
        train_data = y_values[train_idx]  # 訓練データ
        test_data = y_values[test_idx]    # 検証データ

        # 訓練データでARIMAモデルを学習
        if m == 0:
            model = AutoARIMA(seasonal=False) 
        else:
            model = AutoARIMA(seasonal=True, m=m) 
        model.fit(train_data)

        # 検証データの期間を予測
        forecast = model.predict(n_periods=len(test_data))

        # 評価指標を計算
        mae = mean_absolute_error(test_data, forecast)
        mape = mean_absolute_percentage_error(test_data, forecast)
        mae_list.append(mae)    # 計算したMAEをリストに追加
        mape_list.append(mape)  # 計算したMAPEをリストに追加

    # 全クロスバリデーションの平均MAEとMAPEを計算
    mean_mae = np.mean(mae_list)
    mean_mape = np.mean(mape_list)

    # 戻り値: (平均MAE, 平均MAPE)
    return mean_mae, mean_mape
In [9]:
#
# code 4.8
#

m_mae, m_mape = perform_ARIMA_tscv(
    y_values = y,       # 時系列データ
    tscv = cv.split(y), # 時系列CVの分割情報
    m=12,               # ARIMAモデルの期間(季節周期)
)
print(f"CV MAE: {m_mae:.2f}")
print(f"CV MAPE: {m_mape*100:.2f}%")
CV MAE: 16.98
CV MAPE: 5.40%
In [10]:
#
# code 4.9
#

from pmdarima.model_selection import SlidingWindowForecastCV

# SlidingWindowForecastCVクラスのインスタンスを作成
cv = SlidingWindowForecastCV(
    window_size=36,  # 過去36か月分のデータを使用してモデルを学習
    h=12,            # 先の12か月のデータを予測対象とする
    step=3           # スライディングウィンドウを3か月ずつ移動
)

# 時系列データを配列に変換し取得
y = df_ap['y'].values

# クロスバリデーションの分割情報の取得
tscv = cv.split(y)
In [11]:
#
# code 4.10
#

# tscvから最初の3つの分割を取り出して表示
for i, (train_idx, test_idx) in enumerate(tscv):
    print(f"\n分割 {i+1}:")
    print("訓練データのインデックス範囲: "
          f"{train_idx[0]} - {train_idx[-1]}")
    print("検証データのインデックス範囲: "
          f"{test_idx[0]} - {test_idx[-1]}")
    if i >= 2:  # 最初の3つの分割のみ表示
        break
分割 1:
訓練データのインデックス範囲: 0 - 35
検証データのインデックス範囲: 36 - 47

分割 2:
訓練データのインデックス範囲: 3 - 38
検証データのインデックス範囲: 39 - 50

分割 3:
訓練データのインデックス範囲: 6 - 41
検証データのインデックス範囲: 42 - 53
In [12]:
#
# code 4.11
#

m_mae, m_mape = perform_ARIMA_tscv(
    y_values = y, # 時系列データ
    tscv = cv.split(y), # 時系列CVの分割情報
    m=12, # ARIMAモデルの期間(季節周期)
)
print(f"CV MAE: {m_mae:.2f}")
print(f"CV MAPE: {m_mape*100:.2f}%")
CV MAE: 16.60
CV MAPE: 5.20%

4.3.3 ホールドアウト×クロスバリデーション¶

In [13]:
#
# code 4.12
#

from pmdarima.model_selection import train_test_split

# 時系列データを配列に変換し取得
y = df_ap['y'].values

# 時系列データをホールドアウト方式で分割(直近12ヶ月がテストデータ)
train_all, test_final = train_test_split(
    y,            # 分割対象の時系列データ
    test_size=12, # 直近12ヶ月分をテストデータ 
)
In [14]:
#
# code 4.13
#

import optuna
from pmdarima import AutoARIMA
from pmdarima.model_selection import RollingForecastCV
from sklearn.metrics import mean_absolute_error
from sklearn.metrics import mean_absolute_percentage_error

# Optunaの目的関数の設定
def objective(trial):
    
    # 試行する複数の季節性の周期
    m = trial.suggest_int(
        name='m', 
        low=0,      # 最小値
        high=12,    # 最大値
    )

    # RollingForecastCVクラスのインスタンスを作成
    cv = RollingForecastCV(
        initial=36, # 初期の訓練用データ期間を36ヶ月に設定
        h=12,       # 先の12か月のデータを予測対象とする
        step=3      # ローリングするステップ数を3ヶ月ずつとする
    )

    # クロスバリデーションの分割情報の取得
    tscv = cv.split(train_all)

    # perform_ARIMA_tscv関数を使用して評価指標を計算
    mae, _ = perform_ARIMA_tscv(train_all, tscv, m)

    # 戻り値として評価指標を返す
    return mae
In [15]:
#
# code 4.14
#

# Studyオブジェクトの作成(最小化問題として設定)
study = optuna.create_study(
    direction='minimize',
    sampler=optuna.samplers.TPESampler(seed=123)
)

# ハイパーパラメータの最適化実行
study.optimize(
    objective,   # 目的関数
    n_trials=10, # 10回試行
    n_jobs=-1,   # 全CPUを使用する
)

# 最良の周期を取得
best_m = study.best_params['m']
print(f"Best m: {best_m}")
Best m: 12
In [16]:
#
# code 4.15
#

# CV で得られた最良の周期でモデルを学習
model = AutoARIMA(
    seasonal=True, 
    m=best_m, 
)
model.fit(train_all)

# テストデータ期間を予測
forecast = model.predict(n_periods=len(test_final))

# 評価指標を計算
mae = mean_absolute_error(test_final, forecast)
mape = mean_absolute_percentage_error(test_final, forecast)

# 結果を出力
print(f"MAE on test: {mae:.2f}")
print(f"MAPE on test: {mape*100:.2f}%")
MAE on test: 14.90
MAPE on test: 3.10%
In [17]:
#
# Prophetモデル用のデータの準備
#

# Prophet用データフレームに変換('ds'と'y'列で表現)
df_prophet = pd.DataFrame({
    'ds': df_ap.index,
    'y': df_ap['y'].values
})

# データを学習データとテストデータに分割
train_all, test_final = train_test_split(
    df_prophet, 
    test_size=12  # 直近12ヶ月分をテストデータ
)
In [18]:
#
# code 4.16
#

from prophet import Prophet

# Prophetモデル版
# 時系列データに対してCVを実施してMAEとMAPEを計算する関数
def perform_prophet_cv(
    df,                      # 時系列データ(DataFrame with 'ds' and 'y' columns)
    tscv,                    # 時系列CVの分割情報
    changepoint_prior_scale, # トレンドの柔軟性を制御
    seasonality_prior_scale, # 季節性の柔軟性を制御
    growth,                  # トレンドの種類
    seasonality_mode,        # 季節性モデル(加法か乗法か)
):
    # CVの評価指標を格納するリスト
    mae_list = []
    mape_list = []

    # CVを実施
    for train_idx, test_idx in tscv:

        # 訓練データと検証データの取得
        train_data = df.iloc[train_idx]
        test_data = df.iloc[test_idx]

        # Prophetモデルの学習
        model = Prophet(
            changepoint_prior_scale=changepoint_prior_scale,
            seasonality_prior_scale=seasonality_prior_scale,
            growth=growth,
            seasonality_mode=seasonality_mode
        )

        # モデルの学習
        model.fit(train_data)

        # 予測期間の設定
        future = model.make_future_dataframe(
            periods=len(test_data), 
            freq='M'
        )

        # 予測の実行
        forecast = model.predict(future)

        # 検証データ期間の予測値を取得
        y_pred = forecast.iloc[-len(test_data):]['yhat'].values
        y_true = test_data['y'].values  

        # 評価指標を計算
        mae = mean_absolute_error(y_true, y_pred)
        mape = mean_absolute_percentage_error(y_true, y_pred)

        # 評価指標をリストに追加
        mae_list.append(mae)
        mape_list.append(mape)

    # 戻り値: (平均MAE, 平均MAPE)
    return np.mean(mae_list), np.mean(mape_list)
In [19]:
#
# code 4.17
#

import optuna
from pmdarima.model_selection import RollingForecastCV
from sklearn.metrics import mean_absolute_error
from sklearn.metrics import mean_absolute_percentage_error

# Optunaのobjective関数の定義
def objective(trial):

    # トレンドの柔軟性を制御するパラメータ
    changepoint_prior_scale = trial.suggest_float(
        'changepoint_prior_scale',
        0.001, 0.5,
        log=True  # 対数スケールでの探索を指定
    )

    # 季節性の柔軟性を制御するパラメータ
    seasonality_prior_scale = trial.suggest_float(
        'seasonality_prior_scale',
        0.01, 10.0,
        log=True  # 対数スケールでの探索を指定
    )

    # トレンドの種類
    growth = trial.suggest_categorical(
        'growth',
        ['linear']  # logisticは除外
    )

    # 季節性モデルの種類(加法か乗法か)
    seasonality_mode = trial.suggest_categorical(
        'seasonality_mode',
        ['additive', 'multiplicative']
    )

    # RollingForecastCVの設定
    cv = RollingForecastCV(
        initial=36,  # 初期の訓練用データ期間を36ヶ月に設定
        h=12,        # 先の12か月のデータを予測対象とする
        step=3       # ローリングするステップ数を3ヶ月ずつとする
    )

    # クロスバリデーションの分割情報の取得
    tscv = cv.split(train_all.values)  

    # 評価指標の計算
    mae, _ = perform_prophet_cv(
        train_all, 
        tscv, 
        changepoint_prior_scale,
        seasonality_prior_scale,
        growth,
        seasonality_mode
    )
    
    # 戻り値: MAE
    return mae
In [20]:
#
# code 4.18
#

# Studyオブジェクトの作成(最小化問題として設定)
study = optuna.create_study(
    direction='minimize',
    sampler=optuna.samplers.TPESampler(seed=123)
)

# ハイパーパラメータの最適化実行
study.optimize(
    objective,
    n_trials=10000,
    n_jobs=-1
)

# 最適なパラメータの取得と表示
best_params = study.best_params
print("Best parameters:")
for param, value in best_params.items():
    print(f"{param}: {value}")
Best parameters:
changepoint_prior_scale: 0.024159437996275124
seasonality_prior_scale: 0.06539669782956342
growth: linear
seasonality_mode: multiplicative
In [21]:
#
# code 4.19
#

# 最適なパラメータでモデルを学習
final_model = Prophet(**best_params)
final_model.fit(train_all)

# テストデータ期間の予測
future = final_model.make_future_dataframe(
    periods=len(test_final), 
    freq='M'
)
forecast = final_model.predict(future)

# テストデータの評価
test_pred = forecast.iloc[-len(test_final):]['yhat'].values
mae = mean_absolute_error(test_final['y'].values, test_pred)
mape = mean_absolute_percentage_error(test_final['y'].values, test_pred)

# 結果を出力
print(f"MAE on test: {mae:.2f}")
print(f"MAPE on test: {mape*100:.2f}%")
MAE on test: 17.69
MAPE on test: 3.62%

4.4 残差分析によるモデル診断¶

In [22]:
#
# code 4.20
#

from pmdarima.arima import auto_arima

# データフレームをコピー
df_tbl = df_ap.copy()

# モデルの作成
model = auto_arima(
    df_tbl['y'],
    seasonal=True,
    m=12
)

# 予測値(学習データと同じ期間)を追加
df_tbl['pred'] = model.predict_in_sample()

# 残差の計算
df_tbl['res'] = df_tbl['y'] - df_tbl['pred']
df_tbl = df_tbl[12:] #不安定な初期12ヶ月を削除
In [23]:
#
# code 4.21
#

# 残差プロット
plt.figure(figsize=(10, 4))
plt.scatter(df_tbl['pred'], df_tbl['res'], alpha=0.5)
plt.axhline(y=0, color='r', linestyle='--')
plt.xlabel("Predicted Values")
plt.ylabel("Residuals")
plt.title("Residual Plot")
plt.show()
No description has been provided for this image
In [24]:
#
# code 4.22
#

# 残差のヒストグラム
plt.figure(figsize=(10, 4))
plt.hist(df_tbl['res'], bins=20, edgecolor='w')
plt.xlabel("Residuals")
plt.ylabel("Frequency")
plt.title("Histogram of Residuals")
plt.show()
No description has been provided for this image
In [25]:
#
# code 4.23
#

# Q-Q プロット
import statsmodels.api as sm

sm.qqplot(df_tbl['res'], line='s')
plt.title("Q-Q Plot")
plt.show()
No description has been provided for this image
In [26]:
#
# code 4.24
#

# 残差の自己相関プロット
from statsmodels.graphics.tsaplots import plot_acf

plot_acf(df_tbl['res'].dropna(), lags=24)
plt.title("Autocorrelation Plot of Residuals (ACF)")
plt.xlabel("Lag")
plt.ylabel("Autocorrelation")
plt.show()
No description has been provided for this image
In [27]:
#
# code 4.25
#

# シャピロ・ウィルク検定による正規性の検討
from scipy.stats import shapiro

stat_sw, p_sw = shapiro(df_tbl['res'].dropna())
print(f"Shapiro-Wilk Test: stat={stat_sw:.4f}, p={p_sw:.4f}")
Shapiro-Wilk Test: stat=0.9436, p=0.0000
In [28]:
#
# code 4.26
#

# ラン検定によるランダム性の確認
from statsmodels.sandbox.stats.runs import runstest_1samp

stat_runs, p_runs = runstest_1samp(
    df_tbl['res'].dropna(), 
    correction=True
)
print(f"Runs Test: stat={stat_runs:.4f}, p={p_runs:.4f}")
Runs Test: stat=-0.6591, p=0.5099
In [29]:
#
# code 4.27
#

# リュング・ボックス検定による自己相関の確認
from statsmodels.stats.diagnostic import acorr_ljungbox

lb_test = acorr_ljungbox(
    df_tbl['res'].dropna(), 
    lags=[12], 
    return_df=True
)
print(lb_test)
     lb_stat  lb_pvalue
12  8.039604   0.782028

4.5 予測区間の算出¶

In [30]:
#
# code 4.28
#

from typing import Optional, Tuple, Iterator

# 指定された時刻 t の特徴量ベクトルを構築する関数
def build_feature_vector(
    t: int,                         # 現在の時刻インデックス (整数値)
    y_array: np.ndarray,            # 目的変数の時系列データ (1次元配列)
    X_array: Optional[np.ndarray],  # 外生変数 (2次元配列または None)
    use_lags: bool,                 # ラグ特徴量を使用するか (True/False)
    max_lag: int,                   # 最大のラグサイズ (整数値)
    use_exog: bool                  # 外生変数データを使用するか (True/False)
) -> np.ndarray:
    feats = []

    # y のラグ特徴量を追加
    if use_lags and max_lag > 0:
        for lag in range(1, max_lag + 1):
            feats.append(y_array[t - lag])
    
    # 外生変数 X を追加
    if use_exog and (X_array is not None):
        feats.extend(X_array[t].tolist()) 
	
    # 1行に整形して返す
    return np.array(feats, dtype=float).reshape(1, -1)  
In [31]:
#
# code 4.29
#

# 学習データセット(X, y)を構築する関数
# 指定インデックス範囲に基づき、特徴量行列 X と目的変数ベクトル y を生成
def make_train_xy(
    y_array: np.ndarray,           # 目的変数データ
    X_array: Optional[np.ndarray], # 追加の外生変数データ
    idx_array: np.ndarray,         # 学習データのインデックス
    use_lags: bool,                # 目的変数のラグ特徴量を使用するか (True/False)
    max_lag: int,                  # 目的変数のラグ特徴量の最大ラグ数
    use_exog: bool                 # 追加の外生変数を使用するか (True/False)
):
    # 学習データがない場合に空のデータを返す
    if len(idx_array) == 0:
        return np.empty((0, 0)), np.empty((0,))  

	# インデックスの始点と終点
    start_t, end_t = idx_array[0], idx_array[-1]  

    # 特徴量データとターゲット変数データを保持するリストを初期化
    X_list, y_list = [], []

    # ラグが必要な場合は開始インデックスを調整
    offset = max_lag if use_lags else 0

	# 特徴量を生成
    for t in range(start_t + offset, end_t + 1):
        fv = build_feature_vector(
            t, y_array, X_array,
            use_lags, max_lag, use_exog
        )  
        X_list.append(fv.ravel())   # 特徴量ベクトルをリストに追加
        y_list.append(y_array[t])   # 対応するy値をリストに追加
    if len(X_list) == 0:
        return np.empty((0, 0)), np.empty((0,)) 

    # リストを配列に変換して返す
    X_mat = np.array(X_list)
    y_vec = np.array(y_list)
    return X_mat, y_vec
In [32]:
#
# code 4.30
#

# 時系列CCPで予測区間幅を計算する関数
def ccp_recursive_multi_step_general(
    y: np.ndarray,                                      # 目的変数データ
    X: Optional[np.ndarray],                            # 追加の外生変数データ
    cv_splits: Iterator[Tuple[np.ndarray, np.ndarray]], # CV 分割情報
    model_class,                                        # 学習モデル
    H: int = 3,                                         # 何ステップ分先まで予測するか
    pred_alpha: float = 0.1,                            # 予測区間の信頼水準
    use_lags: bool = True,                              # 目的変数のラグ特徴量を使用するか
    max_lag: int = 2,                                   # 目的変数のラグ特徴量の最大ラグ
    use_exog: bool = True,                              # 追加の外生変数を使用するか
    **model_kwargs                                      # モデルに渡す追加のパラメータ
):
    # 各予測ステップの残差リスト初期化
    residuals_h = [[] for _ in range(H)]

    # 各分割 (訓練と校正のセット)について処理を行う
    for (train_idx, test_idx) in cv_splits:
        if len(train_idx) == 0 or len(test_idx) == 0:
            continue  # データ分割が空の場合はスキップ

        # 訓練期間のデータ範囲
        train_start, train_end = train_idx[0], train_idx[-1]  

        # 校正期間のデータ範囲
        cal_start, cal_end = test_idx[0], test_idx[-1]  

        # 訓練データセット (X, y) を構築
        X_train, y_train = make_train_xy(
            y, X, train_idx, use_lags, max_lag, use_exog)

        # 訓練データでモデル学習
        model = model_class(**model_kwargs)
        if len(X_train) > 0:
            model.fit(X_train, y_train)
        else:
            continue  # 訓練データが空の場合はスキップ

        # 校正期間で再帰的な予測を実施し、誤差を記録
        y_known = y.copy()
        for t in range(cal_start, cal_end + 1):
            for h in range(1, H + 1):
                future_t = t + h
                if future_t >= len(y):
                    break  # データ範囲を超えたら終了
                try:
                    # 特徴量を生成
                    fv = build_feature_vector(
                        future_t, y_known, X,
                        use_lags, max_lag, use_exog
                    )  
                except IndexError:
                    break  # ラグ不足でエラーが発生した場合も終了
                # 予測値
                y_hat = model.predict(fv)[0]
                # 誤差 = 真の値 - 予測値  
                e = y[future_t] - y_hat  
                # hステップ目の残差を記録
                residuals_h[h - 1].append(e)  
                # 再帰的予測用にy_knownを更新
                y_known[future_t] = y_hat  
                
    # 予測区間幅を計算してリストに格納
    width_h = []
    for h in range(1, H + 1):
        # 残差の絶対値を取得
        abs_res = np.abs(residuals_h[h - 1])  
        if len(abs_res) == 0:
            width_h.append(0.0)  # 残差がない場合は幅は0
        else:
            # 与えられた信頼水準から分位点を計算
            q = np.quantile(abs_res, 1 - pred_alpha)  
            width_h.append(q)

    # 予測区間幅と残差リストを返す
    return width_h, residuals_h
In [33]:
#
# code 4.31
#

from pmdarima.model_selection import train_test_split

# 学習データとテストデータに分ける
y_train, y_test = train_test_split(
    df_ap['y'], 
    test_size=12
)
In [34]:
#
# code 4.32
#

from sklearn.linear_model import RidgeCV
from pmdarima.model_selection import RollingForecastCV

# RollingForecastCVクラスのインスタンスを作成
cv = RollingForecastCV(
    initial=36,                     # 初期の訓練用データ期間を36ヶ月に設定
    h=12,                           # 先の12か月のデータを予測対象とする
    step=1                          # ローリングするステップ数を3ヶ月ずつとする
)

# パラメータ設定
H = 12                              # 最大予測ステップ
pred_alpha = 0.05                   # 予測区間の信頼レベル
use_lags = True                     # ラグ特徴量を使用
max_lag = 12                        # 最大のラグ数
use_exog = False                    # 外部変数は使用しない

# 時系列CCPで予測区間幅を計算
widths, residuals = ccp_recursive_multi_step_general(
    y=y_train.values,                # 目的変数
    X=None,                          # 外生変数は使用しない
    cv_splits=cv.split(y_train),
    H=H, 
    pred_alpha=pred_alpha,
    use_lags=use_lags,
    max_lag=max_lag,
    use_exog=use_exog,
    model_class=RidgeCV,             # RidgeCV回帰モデルを使用
    alphas=np.logspace(-3, 3, 1000)  # Ridgeのα値の探索範囲
)

# 各ステップごとに予測区間の幅を出力
for step, width in enumerate(widths, start=1):
    print(f"{step} 先予測の予測区間の幅: {width}")
1 先予測の予測区間の幅: 45.09864053791801
2 先予測の予測区間の幅: 47.303124898965095
3 先予測の予測区間の幅: 51.36843881154095
4 先予測の予測区間の幅: 54.693867193013155
5 先予測の予測区間の幅: 59.118012905398615
6 先予測の予測区間の幅: 61.82566022632674
7 先予測の予測区間の幅: 63.87338805892998
8 先予測の予測区間の幅: 65.65040821684723
9 先予測の予測区間の幅: 67.9299782560119
10 先予測の予測区間の幅: 70.0327954042567
11 先予測の予測区間の幅: 73.42143920010884
12 先予測の予測区間の幅: 78.31795777525431
In [35]:
#
# code 4.33
#

# 最終モデル作成: 学習データ全体を使用してモデルを構築
final_model = RidgeCV()

# 学習データ全体用の特徴量行列とターゲット変数を作成
final_X_train, final_y_train = make_train_xy(
    y_array=y_train,
    X_array=None,
    idx_array=np.arange(len(y_train)), 
    use_lags=use_lags,
    max_lag=max_lag,
    use_exog=use_exog
)

# モデル学習
final_model.fit(final_X_train, final_y_train)
In [36]:
#
# code 4.34
#

# 予測結果を格納するリスト
preds, lower_bounds, upper_bounds = [], [], []

# 既知のy
y_known = np.concatenate([
    y_train, 
    np.full(len(df_ap['y']) - len(y_train), np.nan)
])

# テストデータの開始インデックス
test_start_idx = len(y_train)  

# テストデータ期間の各時点で予測実行
for t in range(test_start_idx, test_start_idx + len(y_test)):

    # 特徴量生成: ラグを考慮
    features = build_feature_vector(
        t=t,
        y_array=y_known,
        X_array=None,
        use_lags=use_lags,
        max_lag=max_lag,
        use_exog=use_exog
    )

    # 予測値を計算
    pred = final_model.predict(features)[0]
    preds.append(pred)                          # 各予測値をリストに追加

    # 予測区間の幅を取得
    step_idx = min(H - 1, t - test_start_idx)   # Hステップ以上は最大幅を使用
    interval_width = widths[step_idx]
    lower_bounds.append(pred - interval_width)  # 下限値を計算
    upper_bounds.append(pred + interval_width)  # 上限値を計算

    # 再帰的予測: y_knownに予測値を更新
    y_known[t] = pred

# 予測ごとに予測値と予測区間を出力
preds_bounds = zip(preds, lower_bounds, upper_bounds)
for i, (pred, lower, upper) in enumerate(preds_bounds, 1):
    print(f"{i} 先予測: 予測値 = {pred}, 予測区間 = [{lower}, {upper}]")
1 先予測: 予測値 = 395.35037285893526, 予測区間 = [350.25173232101724, 440.4490133968533]
2 先予測: 予測値 = 381.0014629162444, 予測区間 = [333.6983380172793, 428.3045878152095]
3 先予測: 予測値 = 427.2777022150986, 予測区間 = [375.90926340355765, 478.64614102663955]
4 先予測: 予測値 = 426.4432062926124, 予測区間 = [371.74933909959924, 481.13707348562554]
5 先予測: 予測値 = 466.11202076744087, 予測区間 = [406.9940078620423, 525.2300336728395]
6 先予測: 予測値 = 512.1022270907182, 予測区間 = [450.27656686439144, 573.9278873170449]
7 先予測: 予測値 = 597.5266252173615, 予測区間 = [533.6532371584315, 661.4000132762915]
8 先予測: 予測値 = 607.1613843193647, 予測区間 = [541.5109761025175, 672.811792536212]
9 先予測: 予測値 = 526.9948028430965, 予測区間 = [459.0648245870846, 594.9247810991085]
10 先予測: 予測値 = 456.07084851749585, 予測区間 = [386.03805311323913, 526.1036439217526]
11 先予測: 予測値 = 404.1681036741894, 予測区間 = [330.7466644740806, 477.5895428742982]
12 先予測: 予測値 = 441.6256466228844, 予測区間 = [363.30768884763006, 519.9436043981387]
In [37]:
#
# code 4.35
#

# テストデータ期間の予測精度を計算
mae = mean_absolute_error(y_test, preds)
mape = mean_absolute_percentage_error(y_test, preds)
print(f"テストデータの MAE: {mae:.2f}")
print(f"テストデータの MAPE: {mape*100:.2f}%")

# 予測区間の評価(実測値が予測区間に含まれる割合)
within_interval = (y_test >= lower_bounds) & (y_test <= upper_bounds)
within_interval_count = np.sum(within_interval)
coverage = within_interval_count / len(y_test)
print(f"\n予測区間の信頼水準: {(1-pred_alpha)*100}%")
print(f"実際のカバー率: {coverage*100:.2f}%")

# 結果をデータフレームにまとめて表示
test_final = pd.DataFrame({
    'y': y_test.values,
    'prediction': preds,
    'lower': lower_bounds,
    'upper': upper_bounds,
    'within_interval': within_interval
})
print("\n予測結果の詳細:")
print(test_final)
テストデータの MAE: 14.72
テストデータの MAPE: 3.13%

予測区間の信頼水準: 95.0%
実際のカバー率: 100.00%

予測結果の詳細:
                y  prediction       lower       upper  within_interval
1960-01-31  417.0  395.350373  350.251732  440.449013             True
1960-02-29  391.0  381.001463  333.698338  428.304588             True
1960-03-31  419.0  427.277702  375.909263  478.646141             True
1960-04-30  461.0  426.443206  371.749339  481.137073             True
1960-05-31  472.0  466.112021  406.994008  525.230034             True
1960-06-30  535.0  512.102227  450.276567  573.927887             True
1960-07-31  622.0  597.526625  533.653237  661.400013             True
1960-08-31  606.0  607.161384  541.510976  672.811793             True
1960-09-30  508.0  526.994803  459.064825  594.924781             True
1960-10-31  461.0  456.070849  386.038053  526.103644             True
1960-11-30  390.0  404.168104  330.746664  477.589543             True
1960-12-31  432.0  441.625647  363.307689  519.943604             True
In [38]:
#
# code 4.36
#

# 学習データ期間の予測値
fitted = pd.Series(
    data=final_model.predict(final_X_train),
    index=y_train.index[max_lag:]
)

# テストデータ期間の予測値
forecast = test_final.prediction

# 予測区間
conf = np.column_stack([
    lower_bounds, 
    upper_bounds
])

# 結果をプロット
plot_prediction_results(
    y=df_ap['y'],
    fitted=fitted,
    forecast=forecast,
    conf=conf
)
No description has been provided for this image