Files
LifetimePD/models/macro_model.py
Variet Agent d1ddf06e5d feat(model): KAP YTM PD floor integration, expanded 226-var search, ADF fix (AIC->BIC), Model#2 with 6-test diagnostics
- Replace hardcoded DEFAULT_PD_FLOORS with build_complete_pd_floor_table() (KAP bond YTM)
- Fix ADF test: autolag='AIC' -> 'BIC' for small sample (N=26) robustness
- Expand variable search: 40 -> 226 vars (log/diff/return/lag2), 1.9M combos
- Select Model #2: HOUSING_PRICE + CREDIT_SPREAD_LAG1 + CURRENT_ACCOUNT_R
- Add 6-test diagnostics table to AR1 sheet (ADF/LB/DW/BP/ARCH/Shapiro)
- Add Korean variable names for transformed variables
- Generate report v7 with full diagnostics
2026-03-12 00:06:23 +09:00

498 lines
17 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
거시경제 변수 ↔ Zt 연계 AR(1) + Macro 모형
Zt(신용사이클 인덱스)를 거시경제변수와 자기회귀 항으로 설명하는 모형.
AR(1) + Macro 모형:
Z(t) = c + phi*Z(t-1) + beta1*X1(t) + beta2*X2(t) + beta3*X3(t) + eps(t)
방법론 참고:
- Moody's Analytics: Z-score macro regression scenario forecast
- Zanders Group: Vasicek Z macro regression PiT transition matrix
- EBA/ECB: Forward-looking macro overlay on Z-index
- IFRS 9 B5.5.42-44
"""
import numpy as np
import pandas as pd
import statsmodels.api as sm
from statsmodels.stats.diagnostic import het_breuschpagan, acorr_ljungbox
from statsmodels.stats.stattools import durbin_watson
from statsmodels.stats.outliers_influence import variance_inflation_factor
from scipy import stats
from typing import Dict, List, Optional, Tuple
import logging
import warnings
logger = logging.getLogger(__name__)
warnings.filterwarnings("ignore", category=FutureWarning)
class MacroZtModel:
"""
거시경제변수 → Zt 회귀모형
Features:
- OLS 다중회귀
- 변수 선택 (Stepwise AIC/BIC)
- 잔차 진단 (ADF, Ljung-Box, Breusch-Pagan, DW)
- VIF 다중공선성 체크
- 시나리오별 Zt 예측
"""
def __init__(self):
self.model = None
self.result = None
self.selected_vars = None
self.scaler_params = {}
# AR(1) attributes
self.ar1_phi = None
self.ar1_const = None
self.ar1_beta = None
self.ar1_sigma_eps = None
self.ar1_macro_stats = {}
self.is_ar1 = False
self._ar1_var_names = None
def fit(
self,
zt_series: pd.Series,
macro_data: pd.DataFrame,
method: str = "stepwise_aic",
standardize: bool = False,
forced_vars: Optional[List[str]] = None
) -> "MacroZtModel":
"""
Zt ~ 거시변수 회귀모형 적합
Parameters
----------
zt_series : pd.Series
index=연도, values=Zt 추정값
macro_data : pd.DataFrame
index=연도, columns=거시변수
method : str
변수 선택 방법:
- "all": 모든 변수 사용
- "stepwise_aic": Forward stepwise (AIC 기준)
- "stepwise_bic": Forward stepwise (BIC 기준)
standardize : bool
거시변수 표준화 여부
Returns
-------
self
"""
# 인덱스 정렬 및 교집합
common_years = sorted(set(zt_series.index) & set(macro_data.index))
if len(common_years) < 5:
raise ValueError(f"공통 데이터 포인트가 부족합니다: {len(common_years)}")
y = zt_series.loc[common_years].values.astype(float)
X = macro_data.loc[common_years].copy()
# 결측치 처리
X = X.ffill().bfill().dropna(axis=1)
# 표준화
if standardize:
for col in X.columns:
mean = X[col].mean()
std = X[col].std()
if std > 0:
self.scaler_params[col] = {"mean": mean, "std": std}
X[col] = (X[col] - mean) / std
else:
X = X.drop(columns=[col])
# 변수 선택
if forced_vars:
available = [v for v in forced_vars if v in X.columns]
if len(available) != len(forced_vars):
missing = set(forced_vars) - set(available)
logger.warning(f"강제 지정 변수 중 누락: {missing}")
self.selected_vars = available
logger.info(f"강제 지정 변수 사용: {self.selected_vars}")
elif method == "all":
self.selected_vars = list(X.columns)
elif method.startswith("stepwise"):
criterion = "aic" if "aic" in method else "bic"
self.selected_vars = self._stepwise_selection(y, X, criterion)
else:
self.selected_vars = list(X.columns)
if not self.selected_vars:
logger.warning("변수 선택 결과 선택된 변수가 없습니다. 전체 변수 사용.")
self.selected_vars = list(X.columns)
# 최종 모형 적합
X_selected = sm.add_constant(X[self.selected_vars].values)
self.model = sm.OLS(y, X_selected)
self.result = self.model.fit()
logger.info(f"회귀모형 적합 완료: 선택변수 = {self.selected_vars}")
logger.info(f" R² = {self.result.rsquared:.4f}, "
f"Adj.R² = {self.result.rsquared_adj:.4f}, "
f"AIC = {self.result.aic:.2f}")
return self
def _stepwise_selection(
self,
y: np.ndarray,
X: pd.DataFrame,
criterion: str = "aic"
) -> List[str]:
"""Forward Stepwise 변수 선택"""
remaining = list(X.columns)
selected = []
current_score = np.inf
while remaining:
scores = {}
for var in remaining:
trial_vars = selected + [var]
X_trial = sm.add_constant(X[trial_vars].values)
try:
model = sm.OLS(y, X_trial).fit()
score = model.aic if criterion == "aic" else model.bic
scores[var] = score
except Exception:
continue
if not scores:
break
best_var = min(scores, key=scores.get)
best_score = scores[best_var]
if best_score < current_score:
selected.append(best_var)
remaining.remove(best_var)
current_score = best_score
logger.debug(f" + {best_var} ({criterion.upper()} = {best_score:.2f})")
else:
break
return selected
def predict(self, macro_scenario: pd.DataFrame) -> np.ndarray:
"""
거시 시나리오로 Zt 예측
Parameters
----------
macro_scenario : pd.DataFrame
columns에 selected_vars가 포함되어야 함
Returns
-------
np.ndarray : Zt 예측값 배열
"""
if self.result is None:
raise ValueError("모형이 적합되지 않았습니다. fit()을 먼저 실행하세요.")
X = macro_scenario[self.selected_vars].copy()
# 학습 데이터와 동일한 표준화 적용
for col in X.columns:
if col in self.scaler_params:
mean = self.scaler_params[col]["mean"]
std = self.scaler_params[col]["std"]
X[col] = (X[col] - mean) / std
X_const = sm.add_constant(X.values, has_constant="add")
return self.result.predict(X_const)
def diagnostics(self) -> Dict[str, any]:
"""
회귀 모형 진단 결과 반환
Returns
-------
dict with keys:
- r_squared, adj_r_squared
- f_stat, f_pvalue
- aic, bic
- durbin_watson
- ljung_box (p-value)
- breusch_pagan (p-value)
- vif (각 변수별)
- coefficients (DataFrame)
"""
if self.result is None:
return {}
diag = {
"r_squared": self.result.rsquared,
"adj_r_squared": self.result.rsquared_adj,
"f_stat": self.result.fvalue,
"f_pvalue": self.result.f_pvalue,
"aic": self.result.aic,
"bic": self.result.bic,
"n_obs": int(self.result.nobs),
"selected_vars": self.selected_vars,
}
# Durbin-Watson
residuals = self.result.resid
diag["durbin_watson"] = durbin_watson(residuals)
# Ljung-Box (자기상관 검정)
try:
lb_result = acorr_ljungbox(residuals, lags=[5], return_df=True)
diag["ljung_box_stat"] = lb_result["lb_stat"].values[0]
diag["ljung_box_pvalue"] = lb_result["lb_pvalue"].values[0]
except Exception:
diag["ljung_box_pvalue"] = np.nan
# Breusch-Pagan (이분산 검정)
try:
bp_stat, bp_pvalue, _, _ = het_breuschpagan(
residuals, self.result.model.exog
)
diag["breusch_pagan_stat"] = bp_stat
diag["breusch_pagan_pvalue"] = bp_pvalue
except Exception:
diag["breusch_pagan_pvalue"] = np.nan
# VIF (다중공선성)
try:
X = self.result.model.exog
vif_values = {}
if self.is_ar1 and self._ar1_var_names:
var_names = self._ar1_var_names
else:
var_names = ["const"] + self.selected_vars
for i in range(min(X.shape[1], len(var_names))):
vif_values[var_names[i]] = variance_inflation_factor(X, i)
diag["vif"] = vif_values
except Exception:
diag["vif"] = {}
# 계수 요약
if self.is_ar1 and self._ar1_var_names:
coef_names = self._ar1_var_names
else:
coef_names = ["const"] + self.selected_vars
coef_df = pd.DataFrame({
"변수": coef_names,
"계수": self.result.params,
"표준오차": self.result.bse,
"t값": self.result.tvalues,
"p값": self.result.pvalues,
})
diag["coefficients"] = coef_df
return diag
def summary(self) -> str:
"""모형 요약 출력"""
if self.result is None:
return "모형이 적합되지 않았습니다."
return str(self.result.summary())
def residual_series(self) -> np.ndarray:
"""잔차 시계열 반환"""
if self.result is None:
return np.array([])
return self.result.resid
# ================================================================
# AR(1) + Macro 모형
# ================================================================
def fit_ar1(
self,
zt_series: pd.Series,
macro_data: pd.DataFrame,
forced_vars: Optional[List[str]] = None
) -> "MacroZtModel":
"""
AR(1) + Macro 모형 적합
Z(t) = c + phi*Z(t-1) + beta1*X1(t) + beta2*X2(t) + beta3*X3(t) + eps(t)
Z(t-1)을 설명변수에 포함하여 OLS로 추정합니다.
Parameters
----------
zt_series : pd.Series
index=연도, values=Zt 추정값
macro_data : pd.DataFrame
index=연도, columns=거시변수
forced_vars : List[str], optional
강제 지정 거시변수
"""
self.is_ar1 = True
# 인덱스 정렬 및 교집합
common_years = sorted(set(zt_series.index) & set(macro_data.index))
if len(common_years) < 5:
raise ValueError(f"공통 데이터 포인트가 부족합니다: {len(common_years)}")
# AR(1) 구성: Z(t)와 Z(t-1) 쌍
zt_full = zt_series.loc[common_years].sort_index()
years = list(zt_full.index)
# t-1이 필요하므로 첫 해 제외
y_years = years[1:]
y = zt_full.loc[y_years].values.astype(float)
z_lag = zt_full.loc[years[:-1]].values.astype(float)
# 거시변수
X_macro = macro_data.loc[y_years].copy()
X_macro = X_macro.ffill().bfill().dropna(axis=1)
# 변수 선택
if forced_vars:
available = [v for v in forced_vars if v in X_macro.columns]
if len(available) != len(forced_vars):
missing = set(forced_vars) - set(available)
logger.warning(f"AR(1) 강제 지정 변수 중 누락: {missing}")
self.selected_vars = available
else:
self.selected_vars = list(X_macro.columns)
# 거시변수 표본 통계 저장 (시나리오 충격용)
for col in self.selected_vars:
col_data = macro_data[col].dropna()
self.ar1_macro_stats[col] = {
"mean": float(col_data.mean()),
"std": float(col_data.std()),
"last": float(col_data.iloc[-1]) if len(col_data) > 0 else 0.0
}
# 거시변수 표준화 (mean=0, std=1)
# → β가 "1σ 충격의 Z 영향"으로 직접 해석됨
# → 절편 c가 장기 평균 Z 수준을 결정
X_std = X_macro[self.selected_vars].copy()
for col in self.selected_vars:
mean = self.ar1_macro_stats[col]["mean"]
std = self.ar1_macro_stats[col]["std"]
if std > 0:
X_std[col] = (X_std[col] - mean) / std
else:
X_std[col] = 0.0
# 설계행렬: [const, Z(t-1), X1_std(t), X2_std(t), X3_std(t)]
X_design = np.column_stack([
z_lag,
X_std.values
])
X_with_const = sm.add_constant(X_design)
# OLS 적합
self.model = sm.OLS(y, X_with_const)
self.result = self.model.fit()
# AR(1) 파라미터 추출
# params: [const, phi, beta1, beta2, ...]
params = self.result.params
self.ar1_const = params[0]
self.ar1_phi = params[1]
self.ar1_beta = {}
for i, col in enumerate(self.selected_vars):
self.ar1_beta[col] = params[2 + i]
self.ar1_sigma_eps = float(np.std(self.result.resid))
# 변수명 저장 (진단용)
self._ar1_var_names = ["const", "Z_lag1"] + self.selected_vars
# 정상성 체크
if abs(self.ar1_phi) > 0 and abs(self.ar1_phi) < 1:
half_life = np.log(2) / abs(np.log(abs(self.ar1_phi)))
else:
half_life = np.inf
logger.info(f"AR(1)+Macro 적합 완료:")
logger.info(f" phi = {self.ar1_phi:.4f} (반감기 = {half_life:.1f}년)")
logger.info(f" c = {self.ar1_const:.4f}")
for col, beta in self.ar1_beta.items():
logger.info(f" beta({col}) = {beta:+.6f}")
logger.info(f" R2 = {self.result.rsquared:.4f}, "
f"Adj.R2 = {self.result.rsquared_adj:.4f}")
if abs(self.ar1_phi) >= 1.0:
logger.warning(f" phi={self.ar1_phi:.4f} >= 1.0 -> non-stationary!")
return self
def forecast_z_path(
self,
z_last: float,
macro_shocks: Dict[str, float],
horizon: int = 50
) -> np.ndarray:
"""
AR(1) 모형으로 미래 Z 경로 생성
t=1: Z = c + phi*Z(t0) + sum(beta_i * shock_i * sigma_i)
t>=2: Z = c + phi*Z(t-1) (거시 충격 없음, AR 감쇠만)
Parameters
----------
z_last : float
마지막 관측 Z(t0)
macro_shocks : Dict[str, float]
변수별 충격 (sigma 배수)
예: {"USDKRW": +1.5, "RETAIL_SALES": -1.5}
horizon : int
예측 기간 (년)
Returns
-------
np.ndarray : [Z(t0+1), ..., Z(t0+horizon)]
"""
if not self.is_ar1:
raise ValueError("AR(1) 모형이 적합되지 않았습니다.")
z_path = np.zeros(horizon)
z_prev = z_last
for t in range(horizon):
z_next = self.ar1_const + self.ar1_phi * z_prev
# t=0 (첫 해)만 거시 충격 적용
# β는 표준화된 변수 기준이므로 shock_sigma가 곧 β의 배수
if t == 0:
for var, shock_sigma in macro_shocks.items():
if var in self.ar1_beta:
beta = self.ar1_beta[var]
z_next += beta * shock_sigma
z_path[t] = z_next
z_prev = z_next
return z_path
def build_macro_zt_model(
zt_dict: Dict[int, float],
macro_df: pd.DataFrame,
method: str = "ar1_macro",
forced_vars: Optional[List[str]] = None
) -> MacroZtModel:
"""
편의 함수: Zt + 거시 DataFrame -> 회귀모형 구축
Parameters
----------
zt_dict : {연도: Zt값}
macro_df : index=연도, columns=거시변수
method : "ar1_macro" (기본) | "stepwise_aic" | "all"
forced_vars : 강제 지정 변수
"""
zt_series = pd.Series(zt_dict, name="Zt")
zt_series.index.name = "YEAR"
model = MacroZtModel()
if method == "ar1_macro":
model.fit_ar1(zt_series, macro_df, forced_vars=forced_vars)
else:
model.fit(zt_series, macro_df, method=method, forced_vars=forced_vars)
return model