319 lines
10 KiB
Python
319 lines
10 KiB
Python
"""
|
|
거시경제 변수 ↔ Zt 연계 통계모형
|
|
|
|
Zt(신용사이클 인덱스)를 거시경제변수로 설명하는 회귀모형을 구축하고,
|
|
미래 거시 시나리오에 따른 Zt 전망을 생성합니다.
|
|
|
|
모형:
|
|
Z_t = β₀ + β₁·GDP_growth + β₂·Unemployment + β₃·Base_Rate
|
|
+ β₄·CD_Rate + β₅·CPI_growth + β₆·Leading_Index + ε_t
|
|
|
|
방법론 참고:
|
|
- IMF (2021). "IFRS 9 and CECL Compatible Estimation for Top-Down Solvency Stress Testing"
|
|
- ECB (2019). "Scenario Design for IFRS 9 Expected Credit Loss Estimation"
|
|
- Fed (2022). "Dodd-Frank Act Stress Test Methodology"
|
|
"""
|
|
|
|
import numpy as np
|
|
import pandas as pd
|
|
import statsmodels.api as sm
|
|
from statsmodels.stats.diagnostic import het_breuschpagan, acorr_ljungbox
|
|
from statsmodels.stats.stattools import durbin_watson
|
|
from statsmodels.stats.outliers_influence import variance_inflation_factor
|
|
from scipy import stats
|
|
from typing import Dict, List, Optional, Tuple
|
|
import logging
|
|
import warnings
|
|
|
|
logger = logging.getLogger(__name__)
|
|
warnings.filterwarnings("ignore", category=FutureWarning)
|
|
|
|
|
|
class MacroZtModel:
|
|
"""
|
|
거시경제변수 → Zt 회귀모형
|
|
|
|
Features:
|
|
- OLS 다중회귀
|
|
- 변수 선택 (Stepwise AIC/BIC)
|
|
- 잔차 진단 (ADF, Ljung-Box, Breusch-Pagan, DW)
|
|
- VIF 다중공선성 체크
|
|
- 시나리오별 Zt 예측
|
|
"""
|
|
|
|
def __init__(self):
|
|
self.model = None
|
|
self.result = None
|
|
self.selected_vars = None
|
|
self.scaler_params = {} # 정규화 파라미터
|
|
|
|
def fit(
|
|
self,
|
|
zt_series: pd.Series,
|
|
macro_data: pd.DataFrame,
|
|
method: str = "stepwise_aic",
|
|
standardize: bool = False,
|
|
forced_vars: Optional[List[str]] = None
|
|
) -> "MacroZtModel":
|
|
"""
|
|
Zt ~ 거시변수 회귀모형 적합
|
|
|
|
Parameters
|
|
----------
|
|
zt_series : pd.Series
|
|
index=연도, values=Zt 추정값
|
|
macro_data : pd.DataFrame
|
|
index=연도, columns=거시변수
|
|
method : str
|
|
변수 선택 방법:
|
|
- "all": 모든 변수 사용
|
|
- "stepwise_aic": Forward stepwise (AIC 기준)
|
|
- "stepwise_bic": Forward stepwise (BIC 기준)
|
|
standardize : bool
|
|
거시변수 표준화 여부
|
|
|
|
Returns
|
|
-------
|
|
self
|
|
"""
|
|
# 인덱스 정렬 및 교집합
|
|
common_years = sorted(set(zt_series.index) & set(macro_data.index))
|
|
if len(common_years) < 5:
|
|
raise ValueError(f"공통 데이터 포인트가 부족합니다: {len(common_years)}개")
|
|
|
|
y = zt_series.loc[common_years].values.astype(float)
|
|
X = macro_data.loc[common_years].copy()
|
|
|
|
# 결측치 처리
|
|
X = X.ffill().bfill().dropna(axis=1)
|
|
|
|
# 표준화
|
|
if standardize:
|
|
for col in X.columns:
|
|
mean = X[col].mean()
|
|
std = X[col].std()
|
|
if std > 0:
|
|
self.scaler_params[col] = {"mean": mean, "std": std}
|
|
X[col] = (X[col] - mean) / std
|
|
else:
|
|
X = X.drop(columns=[col])
|
|
|
|
# 변수 선택
|
|
if forced_vars:
|
|
available = [v for v in forced_vars if v in X.columns]
|
|
if len(available) != len(forced_vars):
|
|
missing = set(forced_vars) - set(available)
|
|
logger.warning(f"강제 지정 변수 중 누락: {missing}")
|
|
self.selected_vars = available
|
|
logger.info(f"강제 지정 변수 사용: {self.selected_vars}")
|
|
elif method == "all":
|
|
self.selected_vars = list(X.columns)
|
|
elif method.startswith("stepwise"):
|
|
criterion = "aic" if "aic" in method else "bic"
|
|
self.selected_vars = self._stepwise_selection(y, X, criterion)
|
|
else:
|
|
self.selected_vars = list(X.columns)
|
|
|
|
if not self.selected_vars:
|
|
logger.warning("변수 선택 결과 선택된 변수가 없습니다. 전체 변수 사용.")
|
|
self.selected_vars = list(X.columns)
|
|
|
|
# 최종 모형 적합
|
|
X_selected = sm.add_constant(X[self.selected_vars].values)
|
|
self.model = sm.OLS(y, X_selected)
|
|
self.result = self.model.fit()
|
|
|
|
logger.info(f"회귀모형 적합 완료: 선택변수 = {self.selected_vars}")
|
|
logger.info(f" R² = {self.result.rsquared:.4f}, "
|
|
f"Adj.R² = {self.result.rsquared_adj:.4f}, "
|
|
f"AIC = {self.result.aic:.2f}")
|
|
|
|
return self
|
|
|
|
def _stepwise_selection(
|
|
self,
|
|
y: np.ndarray,
|
|
X: pd.DataFrame,
|
|
criterion: str = "aic"
|
|
) -> List[str]:
|
|
"""Forward Stepwise 변수 선택"""
|
|
remaining = list(X.columns)
|
|
selected = []
|
|
current_score = np.inf
|
|
|
|
while remaining:
|
|
scores = {}
|
|
for var in remaining:
|
|
trial_vars = selected + [var]
|
|
X_trial = sm.add_constant(X[trial_vars].values)
|
|
try:
|
|
model = sm.OLS(y, X_trial).fit()
|
|
score = model.aic if criterion == "aic" else model.bic
|
|
scores[var] = score
|
|
except Exception:
|
|
continue
|
|
|
|
if not scores:
|
|
break
|
|
|
|
best_var = min(scores, key=scores.get)
|
|
best_score = scores[best_var]
|
|
|
|
if best_score < current_score:
|
|
selected.append(best_var)
|
|
remaining.remove(best_var)
|
|
current_score = best_score
|
|
logger.debug(f" + {best_var} ({criterion.upper()} = {best_score:.2f})")
|
|
else:
|
|
break
|
|
|
|
return selected
|
|
|
|
def predict(self, macro_scenario: pd.DataFrame) -> np.ndarray:
|
|
"""
|
|
거시 시나리오로 Zt 예측
|
|
|
|
Parameters
|
|
----------
|
|
macro_scenario : pd.DataFrame
|
|
columns에 selected_vars가 포함되어야 함
|
|
|
|
Returns
|
|
-------
|
|
np.ndarray : Zt 예측값 배열
|
|
"""
|
|
if self.result is None:
|
|
raise ValueError("모형이 적합되지 않았습니다. fit()을 먼저 실행하세요.")
|
|
|
|
X = macro_scenario[self.selected_vars].copy()
|
|
|
|
# 학습 데이터와 동일한 표준화 적용
|
|
for col in X.columns:
|
|
if col in self.scaler_params:
|
|
mean = self.scaler_params[col]["mean"]
|
|
std = self.scaler_params[col]["std"]
|
|
X[col] = (X[col] - mean) / std
|
|
|
|
X_const = sm.add_constant(X.values, has_constant="add")
|
|
return self.result.predict(X_const)
|
|
|
|
def diagnostics(self) -> Dict[str, any]:
|
|
"""
|
|
회귀 모형 진단 결과 반환
|
|
|
|
Returns
|
|
-------
|
|
dict with keys:
|
|
- r_squared, adj_r_squared
|
|
- f_stat, f_pvalue
|
|
- aic, bic
|
|
- durbin_watson
|
|
- ljung_box (p-value)
|
|
- breusch_pagan (p-value)
|
|
- vif (각 변수별)
|
|
- coefficients (DataFrame)
|
|
"""
|
|
if self.result is None:
|
|
return {}
|
|
|
|
diag = {
|
|
"r_squared": self.result.rsquared,
|
|
"adj_r_squared": self.result.rsquared_adj,
|
|
"f_stat": self.result.fvalue,
|
|
"f_pvalue": self.result.f_pvalue,
|
|
"aic": self.result.aic,
|
|
"bic": self.result.bic,
|
|
"n_obs": int(self.result.nobs),
|
|
"selected_vars": self.selected_vars,
|
|
}
|
|
|
|
# Durbin-Watson
|
|
residuals = self.result.resid
|
|
diag["durbin_watson"] = durbin_watson(residuals)
|
|
|
|
# Ljung-Box (자기상관 검정)
|
|
try:
|
|
lb_result = acorr_ljungbox(residuals, lags=[5], return_df=True)
|
|
diag["ljung_box_stat"] = lb_result["lb_stat"].values[0]
|
|
diag["ljung_box_pvalue"] = lb_result["lb_pvalue"].values[0]
|
|
except Exception:
|
|
diag["ljung_box_pvalue"] = np.nan
|
|
|
|
# Breusch-Pagan (이분산 검정)
|
|
try:
|
|
bp_stat, bp_pvalue, _, _ = het_breuschpagan(
|
|
residuals, self.result.model.exog
|
|
)
|
|
diag["breusch_pagan_stat"] = bp_stat
|
|
diag["breusch_pagan_pvalue"] = bp_pvalue
|
|
except Exception:
|
|
diag["breusch_pagan_pvalue"] = np.nan
|
|
|
|
# VIF (다중공선성)
|
|
try:
|
|
X = self.result.model.exog
|
|
vif_values = {}
|
|
var_names = ["const"] + self.selected_vars
|
|
for i in range(X.shape[1]):
|
|
vif_values[var_names[i]] = variance_inflation_factor(X, i)
|
|
diag["vif"] = vif_values
|
|
except Exception:
|
|
diag["vif"] = {}
|
|
|
|
# 계수 요약
|
|
coef_df = pd.DataFrame({
|
|
"변수": ["const"] + self.selected_vars,
|
|
"계수": self.result.params,
|
|
"표준오차": self.result.bse,
|
|
"t값": self.result.tvalues,
|
|
"p값": self.result.pvalues,
|
|
})
|
|
diag["coefficients"] = coef_df
|
|
|
|
return diag
|
|
|
|
def summary(self) -> str:
|
|
"""모형 요약 출력"""
|
|
if self.result is None:
|
|
return "모형이 적합되지 않았습니다."
|
|
return str(self.result.summary())
|
|
|
|
def residual_series(self) -> np.ndarray:
|
|
"""잔차 시계열 반환"""
|
|
if self.result is None:
|
|
return np.array([])
|
|
return self.result.resid
|
|
|
|
|
|
def build_macro_zt_model(
|
|
zt_dict: Dict[int, float],
|
|
macro_df: pd.DataFrame,
|
|
method: str = "stepwise_aic",
|
|
forced_vars: Optional[List[str]] = None
|
|
) -> MacroZtModel:
|
|
"""
|
|
편의 함수: Zt 딕셔너리 + 거시 DataFrame → 회귀모형 구축
|
|
|
|
Parameters
|
|
----------
|
|
zt_dict : Dict[int, float]
|
|
{연도: Zt값}
|
|
macro_df : pd.DataFrame
|
|
index=연도, columns=거시변수
|
|
method : str
|
|
변수 선택 방법
|
|
forced_vars : List[str], optional
|
|
강제 지정 변수 (지정 시 method 무시)
|
|
|
|
Returns
|
|
-------
|
|
MacroZtModel : 적합된 모형
|
|
"""
|
|
zt_series = pd.Series(zt_dict, name="Zt")
|
|
zt_series.index.name = "YEAR"
|
|
|
|
model = MacroZtModel()
|
|
model.fit(zt_series, macro_df, method=method, forced_vars=forced_vars)
|
|
|
|
return model
|