- Replace hardcoded DEFAULT_PD_FLOORS with build_complete_pd_floor_table() (KAP bond YTM) - Fix ADF test: autolag='AIC' -> 'BIC' for small sample (N=26) robustness - Expand variable search: 40 -> 226 vars (log/diff/return/lag2), 1.9M combos - Select Model #2: HOUSING_PRICE + CREDIT_SPREAD_LAG1 + CURRENT_ACCOUNT_R - Add 6-test diagnostics table to AR1 sheet (ADF/LB/DW/BP/ARCH/Shapiro) - Add Korean variable names for transformed variables - Generate report v7 with full diagnostics
498 lines
17 KiB
Python
498 lines
17 KiB
Python
"""
|
||
거시경제 변수 ↔ Zt 연계 AR(1) + Macro 모형
|
||
|
||
Zt(신용사이클 인덱스)를 거시경제변수와 자기회귀 항으로 설명하는 모형.
|
||
|
||
AR(1) + Macro 모형:
|
||
Z(t) = c + phi*Z(t-1) + beta1*X1(t) + beta2*X2(t) + beta3*X3(t) + eps(t)
|
||
|
||
방법론 참고:
|
||
- Moody's Analytics: Z-score macro regression scenario forecast
|
||
- Zanders Group: Vasicek Z macro regression PiT transition matrix
|
||
- EBA/ECB: Forward-looking macro overlay on Z-index
|
||
- IFRS 9 B5.5.42-44
|
||
"""
|
||
|
||
import numpy as np
|
||
import pandas as pd
|
||
import statsmodels.api as sm
|
||
from statsmodels.stats.diagnostic import het_breuschpagan, acorr_ljungbox
|
||
from statsmodels.stats.stattools import durbin_watson
|
||
from statsmodels.stats.outliers_influence import variance_inflation_factor
|
||
from scipy import stats
|
||
from typing import Dict, List, Optional, Tuple
|
||
import logging
|
||
import warnings
|
||
|
||
logger = logging.getLogger(__name__)
|
||
warnings.filterwarnings("ignore", category=FutureWarning)
|
||
|
||
|
||
class MacroZtModel:
|
||
"""
|
||
거시경제변수 → Zt 회귀모형
|
||
|
||
Features:
|
||
- OLS 다중회귀
|
||
- 변수 선택 (Stepwise AIC/BIC)
|
||
- 잔차 진단 (ADF, Ljung-Box, Breusch-Pagan, DW)
|
||
- VIF 다중공선성 체크
|
||
- 시나리오별 Zt 예측
|
||
"""
|
||
|
||
def __init__(self):
|
||
self.model = None
|
||
self.result = None
|
||
self.selected_vars = None
|
||
self.scaler_params = {}
|
||
# AR(1) attributes
|
||
self.ar1_phi = None
|
||
self.ar1_const = None
|
||
self.ar1_beta = None
|
||
self.ar1_sigma_eps = None
|
||
self.ar1_macro_stats = {}
|
||
self.is_ar1 = False
|
||
self._ar1_var_names = None
|
||
|
||
def fit(
|
||
self,
|
||
zt_series: pd.Series,
|
||
macro_data: pd.DataFrame,
|
||
method: str = "stepwise_aic",
|
||
standardize: bool = False,
|
||
forced_vars: Optional[List[str]] = None
|
||
) -> "MacroZtModel":
|
||
"""
|
||
Zt ~ 거시변수 회귀모형 적합
|
||
|
||
Parameters
|
||
----------
|
||
zt_series : pd.Series
|
||
index=연도, values=Zt 추정값
|
||
macro_data : pd.DataFrame
|
||
index=연도, columns=거시변수
|
||
method : str
|
||
변수 선택 방법:
|
||
- "all": 모든 변수 사용
|
||
- "stepwise_aic": Forward stepwise (AIC 기준)
|
||
- "stepwise_bic": Forward stepwise (BIC 기준)
|
||
standardize : bool
|
||
거시변수 표준화 여부
|
||
|
||
Returns
|
||
-------
|
||
self
|
||
"""
|
||
# 인덱스 정렬 및 교집합
|
||
common_years = sorted(set(zt_series.index) & set(macro_data.index))
|
||
if len(common_years) < 5:
|
||
raise ValueError(f"공통 데이터 포인트가 부족합니다: {len(common_years)}개")
|
||
|
||
y = zt_series.loc[common_years].values.astype(float)
|
||
X = macro_data.loc[common_years].copy()
|
||
|
||
# 결측치 처리
|
||
X = X.ffill().bfill().dropna(axis=1)
|
||
|
||
# 표준화
|
||
if standardize:
|
||
for col in X.columns:
|
||
mean = X[col].mean()
|
||
std = X[col].std()
|
||
if std > 0:
|
||
self.scaler_params[col] = {"mean": mean, "std": std}
|
||
X[col] = (X[col] - mean) / std
|
||
else:
|
||
X = X.drop(columns=[col])
|
||
|
||
# 변수 선택
|
||
if forced_vars:
|
||
available = [v for v in forced_vars if v in X.columns]
|
||
if len(available) != len(forced_vars):
|
||
missing = set(forced_vars) - set(available)
|
||
logger.warning(f"강제 지정 변수 중 누락: {missing}")
|
||
self.selected_vars = available
|
||
logger.info(f"강제 지정 변수 사용: {self.selected_vars}")
|
||
elif method == "all":
|
||
self.selected_vars = list(X.columns)
|
||
elif method.startswith("stepwise"):
|
||
criterion = "aic" if "aic" in method else "bic"
|
||
self.selected_vars = self._stepwise_selection(y, X, criterion)
|
||
else:
|
||
self.selected_vars = list(X.columns)
|
||
|
||
if not self.selected_vars:
|
||
logger.warning("변수 선택 결과 선택된 변수가 없습니다. 전체 변수 사용.")
|
||
self.selected_vars = list(X.columns)
|
||
|
||
# 최종 모형 적합
|
||
X_selected = sm.add_constant(X[self.selected_vars].values)
|
||
self.model = sm.OLS(y, X_selected)
|
||
self.result = self.model.fit()
|
||
|
||
logger.info(f"회귀모형 적합 완료: 선택변수 = {self.selected_vars}")
|
||
logger.info(f" R² = {self.result.rsquared:.4f}, "
|
||
f"Adj.R² = {self.result.rsquared_adj:.4f}, "
|
||
f"AIC = {self.result.aic:.2f}")
|
||
|
||
return self
|
||
|
||
def _stepwise_selection(
|
||
self,
|
||
y: np.ndarray,
|
||
X: pd.DataFrame,
|
||
criterion: str = "aic"
|
||
) -> List[str]:
|
||
"""Forward Stepwise 변수 선택"""
|
||
remaining = list(X.columns)
|
||
selected = []
|
||
current_score = np.inf
|
||
|
||
while remaining:
|
||
scores = {}
|
||
for var in remaining:
|
||
trial_vars = selected + [var]
|
||
X_trial = sm.add_constant(X[trial_vars].values)
|
||
try:
|
||
model = sm.OLS(y, X_trial).fit()
|
||
score = model.aic if criterion == "aic" else model.bic
|
||
scores[var] = score
|
||
except Exception:
|
||
continue
|
||
|
||
if not scores:
|
||
break
|
||
|
||
best_var = min(scores, key=scores.get)
|
||
best_score = scores[best_var]
|
||
|
||
if best_score < current_score:
|
||
selected.append(best_var)
|
||
remaining.remove(best_var)
|
||
current_score = best_score
|
||
logger.debug(f" + {best_var} ({criterion.upper()} = {best_score:.2f})")
|
||
else:
|
||
break
|
||
|
||
return selected
|
||
|
||
def predict(self, macro_scenario: pd.DataFrame) -> np.ndarray:
|
||
"""
|
||
거시 시나리오로 Zt 예측
|
||
|
||
Parameters
|
||
----------
|
||
macro_scenario : pd.DataFrame
|
||
columns에 selected_vars가 포함되어야 함
|
||
|
||
Returns
|
||
-------
|
||
np.ndarray : Zt 예측값 배열
|
||
"""
|
||
if self.result is None:
|
||
raise ValueError("모형이 적합되지 않았습니다. fit()을 먼저 실행하세요.")
|
||
|
||
X = macro_scenario[self.selected_vars].copy()
|
||
|
||
# 학습 데이터와 동일한 표준화 적용
|
||
for col in X.columns:
|
||
if col in self.scaler_params:
|
||
mean = self.scaler_params[col]["mean"]
|
||
std = self.scaler_params[col]["std"]
|
||
X[col] = (X[col] - mean) / std
|
||
|
||
X_const = sm.add_constant(X.values, has_constant="add")
|
||
return self.result.predict(X_const)
|
||
|
||
def diagnostics(self) -> Dict[str, any]:
|
||
"""
|
||
회귀 모형 진단 결과 반환
|
||
|
||
Returns
|
||
-------
|
||
dict with keys:
|
||
- r_squared, adj_r_squared
|
||
- f_stat, f_pvalue
|
||
- aic, bic
|
||
- durbin_watson
|
||
- ljung_box (p-value)
|
||
- breusch_pagan (p-value)
|
||
- vif (각 변수별)
|
||
- coefficients (DataFrame)
|
||
"""
|
||
if self.result is None:
|
||
return {}
|
||
|
||
diag = {
|
||
"r_squared": self.result.rsquared,
|
||
"adj_r_squared": self.result.rsquared_adj,
|
||
"f_stat": self.result.fvalue,
|
||
"f_pvalue": self.result.f_pvalue,
|
||
"aic": self.result.aic,
|
||
"bic": self.result.bic,
|
||
"n_obs": int(self.result.nobs),
|
||
"selected_vars": self.selected_vars,
|
||
}
|
||
|
||
# Durbin-Watson
|
||
residuals = self.result.resid
|
||
diag["durbin_watson"] = durbin_watson(residuals)
|
||
|
||
# Ljung-Box (자기상관 검정)
|
||
try:
|
||
lb_result = acorr_ljungbox(residuals, lags=[5], return_df=True)
|
||
diag["ljung_box_stat"] = lb_result["lb_stat"].values[0]
|
||
diag["ljung_box_pvalue"] = lb_result["lb_pvalue"].values[0]
|
||
except Exception:
|
||
diag["ljung_box_pvalue"] = np.nan
|
||
|
||
# Breusch-Pagan (이분산 검정)
|
||
try:
|
||
bp_stat, bp_pvalue, _, _ = het_breuschpagan(
|
||
residuals, self.result.model.exog
|
||
)
|
||
diag["breusch_pagan_stat"] = bp_stat
|
||
diag["breusch_pagan_pvalue"] = bp_pvalue
|
||
except Exception:
|
||
diag["breusch_pagan_pvalue"] = np.nan
|
||
|
||
# VIF (다중공선성)
|
||
try:
|
||
X = self.result.model.exog
|
||
vif_values = {}
|
||
if self.is_ar1 and self._ar1_var_names:
|
||
var_names = self._ar1_var_names
|
||
else:
|
||
var_names = ["const"] + self.selected_vars
|
||
for i in range(min(X.shape[1], len(var_names))):
|
||
vif_values[var_names[i]] = variance_inflation_factor(X, i)
|
||
diag["vif"] = vif_values
|
||
except Exception:
|
||
diag["vif"] = {}
|
||
|
||
# 계수 요약
|
||
if self.is_ar1 and self._ar1_var_names:
|
||
coef_names = self._ar1_var_names
|
||
else:
|
||
coef_names = ["const"] + self.selected_vars
|
||
coef_df = pd.DataFrame({
|
||
"변수": coef_names,
|
||
"계수": self.result.params,
|
||
"표준오차": self.result.bse,
|
||
"t값": self.result.tvalues,
|
||
"p값": self.result.pvalues,
|
||
})
|
||
diag["coefficients"] = coef_df
|
||
|
||
return diag
|
||
|
||
def summary(self) -> str:
|
||
"""모형 요약 출력"""
|
||
if self.result is None:
|
||
return "모형이 적합되지 않았습니다."
|
||
return str(self.result.summary())
|
||
|
||
def residual_series(self) -> np.ndarray:
|
||
"""잔차 시계열 반환"""
|
||
if self.result is None:
|
||
return np.array([])
|
||
return self.result.resid
|
||
|
||
# ================================================================
|
||
# AR(1) + Macro 모형
|
||
# ================================================================
|
||
|
||
def fit_ar1(
|
||
self,
|
||
zt_series: pd.Series,
|
||
macro_data: pd.DataFrame,
|
||
forced_vars: Optional[List[str]] = None
|
||
) -> "MacroZtModel":
|
||
"""
|
||
AR(1) + Macro 모형 적합
|
||
|
||
Z(t) = c + phi*Z(t-1) + beta1*X1(t) + beta2*X2(t) + beta3*X3(t) + eps(t)
|
||
|
||
Z(t-1)을 설명변수에 포함하여 OLS로 추정합니다.
|
||
|
||
Parameters
|
||
----------
|
||
zt_series : pd.Series
|
||
index=연도, values=Zt 추정값
|
||
macro_data : pd.DataFrame
|
||
index=연도, columns=거시변수
|
||
forced_vars : List[str], optional
|
||
강제 지정 거시변수
|
||
"""
|
||
self.is_ar1 = True
|
||
|
||
# 인덱스 정렬 및 교집합
|
||
common_years = sorted(set(zt_series.index) & set(macro_data.index))
|
||
if len(common_years) < 5:
|
||
raise ValueError(f"공통 데이터 포인트가 부족합니다: {len(common_years)}개")
|
||
|
||
# AR(1) 구성: Z(t)와 Z(t-1) 쌍
|
||
zt_full = zt_series.loc[common_years].sort_index()
|
||
years = list(zt_full.index)
|
||
|
||
# t-1이 필요하므로 첫 해 제외
|
||
y_years = years[1:]
|
||
|
||
y = zt_full.loc[y_years].values.astype(float)
|
||
z_lag = zt_full.loc[years[:-1]].values.astype(float)
|
||
|
||
# 거시변수
|
||
X_macro = macro_data.loc[y_years].copy()
|
||
X_macro = X_macro.ffill().bfill().dropna(axis=1)
|
||
|
||
# 변수 선택
|
||
if forced_vars:
|
||
available = [v for v in forced_vars if v in X_macro.columns]
|
||
if len(available) != len(forced_vars):
|
||
missing = set(forced_vars) - set(available)
|
||
logger.warning(f"AR(1) 강제 지정 변수 중 누락: {missing}")
|
||
self.selected_vars = available
|
||
else:
|
||
self.selected_vars = list(X_macro.columns)
|
||
|
||
# 거시변수 표본 통계 저장 (시나리오 충격용)
|
||
for col in self.selected_vars:
|
||
col_data = macro_data[col].dropna()
|
||
self.ar1_macro_stats[col] = {
|
||
"mean": float(col_data.mean()),
|
||
"std": float(col_data.std()),
|
||
"last": float(col_data.iloc[-1]) if len(col_data) > 0 else 0.0
|
||
}
|
||
|
||
# 거시변수 표준화 (mean=0, std=1)
|
||
# → β가 "1σ 충격의 Z 영향"으로 직접 해석됨
|
||
# → 절편 c가 장기 평균 Z 수준을 결정
|
||
X_std = X_macro[self.selected_vars].copy()
|
||
for col in self.selected_vars:
|
||
mean = self.ar1_macro_stats[col]["mean"]
|
||
std = self.ar1_macro_stats[col]["std"]
|
||
if std > 0:
|
||
X_std[col] = (X_std[col] - mean) / std
|
||
else:
|
||
X_std[col] = 0.0
|
||
|
||
# 설계행렬: [const, Z(t-1), X1_std(t), X2_std(t), X3_std(t)]
|
||
X_design = np.column_stack([
|
||
z_lag,
|
||
X_std.values
|
||
])
|
||
X_with_const = sm.add_constant(X_design)
|
||
|
||
# OLS 적합
|
||
self.model = sm.OLS(y, X_with_const)
|
||
self.result = self.model.fit()
|
||
|
||
# AR(1) 파라미터 추출
|
||
# params: [const, phi, beta1, beta2, ...]
|
||
params = self.result.params
|
||
self.ar1_const = params[0]
|
||
self.ar1_phi = params[1]
|
||
self.ar1_beta = {}
|
||
for i, col in enumerate(self.selected_vars):
|
||
self.ar1_beta[col] = params[2 + i]
|
||
self.ar1_sigma_eps = float(np.std(self.result.resid))
|
||
|
||
# 변수명 저장 (진단용)
|
||
self._ar1_var_names = ["const", "Z_lag1"] + self.selected_vars
|
||
|
||
# 정상성 체크
|
||
if abs(self.ar1_phi) > 0 and abs(self.ar1_phi) < 1:
|
||
half_life = np.log(2) / abs(np.log(abs(self.ar1_phi)))
|
||
else:
|
||
half_life = np.inf
|
||
|
||
logger.info(f"AR(1)+Macro 적합 완료:")
|
||
logger.info(f" phi = {self.ar1_phi:.4f} (반감기 = {half_life:.1f}년)")
|
||
logger.info(f" c = {self.ar1_const:.4f}")
|
||
for col, beta in self.ar1_beta.items():
|
||
logger.info(f" beta({col}) = {beta:+.6f}")
|
||
logger.info(f" R2 = {self.result.rsquared:.4f}, "
|
||
f"Adj.R2 = {self.result.rsquared_adj:.4f}")
|
||
|
||
if abs(self.ar1_phi) >= 1.0:
|
||
logger.warning(f" phi={self.ar1_phi:.4f} >= 1.0 -> non-stationary!")
|
||
|
||
return self
|
||
|
||
def forecast_z_path(
|
||
self,
|
||
z_last: float,
|
||
macro_shocks: Dict[str, float],
|
||
horizon: int = 50
|
||
) -> np.ndarray:
|
||
"""
|
||
AR(1) 모형으로 미래 Z 경로 생성
|
||
|
||
t=1: Z = c + phi*Z(t0) + sum(beta_i * shock_i * sigma_i)
|
||
t>=2: Z = c + phi*Z(t-1) (거시 충격 없음, AR 감쇠만)
|
||
|
||
Parameters
|
||
----------
|
||
z_last : float
|
||
마지막 관측 Z(t0)
|
||
macro_shocks : Dict[str, float]
|
||
변수별 충격 (sigma 배수)
|
||
예: {"USDKRW": +1.5, "RETAIL_SALES": -1.5}
|
||
horizon : int
|
||
예측 기간 (년)
|
||
|
||
Returns
|
||
-------
|
||
np.ndarray : [Z(t0+1), ..., Z(t0+horizon)]
|
||
"""
|
||
if not self.is_ar1:
|
||
raise ValueError("AR(1) 모형이 적합되지 않았습니다.")
|
||
|
||
z_path = np.zeros(horizon)
|
||
z_prev = z_last
|
||
|
||
for t in range(horizon):
|
||
z_next = self.ar1_const + self.ar1_phi * z_prev
|
||
|
||
# t=0 (첫 해)만 거시 충격 적용
|
||
# β는 표준화된 변수 기준이므로 shock_sigma가 곧 β의 배수
|
||
if t == 0:
|
||
for var, shock_sigma in macro_shocks.items():
|
||
if var in self.ar1_beta:
|
||
beta = self.ar1_beta[var]
|
||
z_next += beta * shock_sigma
|
||
|
||
z_path[t] = z_next
|
||
z_prev = z_next
|
||
|
||
return z_path
|
||
|
||
|
||
def build_macro_zt_model(
|
||
zt_dict: Dict[int, float],
|
||
macro_df: pd.DataFrame,
|
||
method: str = "ar1_macro",
|
||
forced_vars: Optional[List[str]] = None
|
||
) -> MacroZtModel:
|
||
"""
|
||
편의 함수: Zt + 거시 DataFrame -> 회귀모형 구축
|
||
|
||
Parameters
|
||
----------
|
||
zt_dict : {연도: Zt값}
|
||
macro_df : index=연도, columns=거시변수
|
||
method : "ar1_macro" (기본) | "stepwise_aic" | "all"
|
||
forced_vars : 강제 지정 변수
|
||
"""
|
||
zt_series = pd.Series(zt_dict, name="Zt")
|
||
zt_series.index.name = "YEAR"
|
||
|
||
model = MacroZtModel()
|
||
|
||
if method == "ar1_macro":
|
||
model.fit_ar1(zt_series, macro_df, forced_vars=forced_vars)
|
||
else:
|
||
model.fit(zt_series, macro_df, method=method, forced_vars=forced_vars)
|
||
|
||
return model
|