""" 거시경제 변수 ↔ Zt 연계 AR(1) + Macro 모형 Zt(신용사이클 인덱스)를 거시경제변수와 자기회귀 항으로 설명하는 모형. AR(1) + Macro 모형: Z(t) = c + phi*Z(t-1) + beta1*X1(t) + beta2*X2(t) + beta3*X3(t) + eps(t) 방법론 참고: - Moody's Analytics: Z-score macro regression scenario forecast - Zanders Group: Vasicek Z macro regression PiT transition matrix - EBA/ECB: Forward-looking macro overlay on Z-index - IFRS 9 B5.5.42-44 """ import numpy as np import pandas as pd import statsmodels.api as sm from statsmodels.stats.diagnostic import het_breuschpagan, acorr_ljungbox from statsmodels.stats.stattools import durbin_watson from statsmodels.stats.outliers_influence import variance_inflation_factor from scipy import stats from typing import Dict, List, Optional, Tuple import logging import warnings logger = logging.getLogger(__name__) warnings.filterwarnings("ignore", category=FutureWarning) class MacroZtModel: """ 거시경제변수 → Zt 회귀모형 Features: - OLS 다중회귀 - 변수 선택 (Stepwise AIC/BIC) - 잔차 진단 (ADF, Ljung-Box, Breusch-Pagan, DW) - VIF 다중공선성 체크 - 시나리오별 Zt 예측 """ def __init__(self): self.model = None self.result = None self.selected_vars = None self.scaler_params = {} # AR(1) attributes self.ar1_phi = None self.ar1_const = None self.ar1_beta = None self.ar1_sigma_eps = None self.ar1_macro_stats = {} self.is_ar1 = False self._ar1_var_names = None def fit( self, zt_series: pd.Series, macro_data: pd.DataFrame, method: str = "stepwise_aic", standardize: bool = False, forced_vars: Optional[List[str]] = None ) -> "MacroZtModel": """ Zt ~ 거시변수 회귀모형 적합 Parameters ---------- zt_series : pd.Series index=연도, values=Zt 추정값 macro_data : pd.DataFrame index=연도, columns=거시변수 method : str 변수 선택 방법: - "all": 모든 변수 사용 - "stepwise_aic": Forward stepwise (AIC 기준) - "stepwise_bic": Forward stepwise (BIC 기준) standardize : bool 거시변수 표준화 여부 Returns ------- self """ # 인덱스 정렬 및 교집합 common_years = sorted(set(zt_series.index) & set(macro_data.index)) if len(common_years) < 5: raise ValueError(f"공통 데이터 포인트가 부족합니다: {len(common_years)}개") y = zt_series.loc[common_years].values.astype(float) X = macro_data.loc[common_years].copy() # 결측치 처리 X = X.ffill().bfill().dropna(axis=1) # 표준화 if standardize: for col in X.columns: mean = X[col].mean() std = X[col].std() if std > 0: self.scaler_params[col] = {"mean": mean, "std": std} X[col] = (X[col] - mean) / std else: X = X.drop(columns=[col]) # 변수 선택 if forced_vars: available = [v for v in forced_vars if v in X.columns] if len(available) != len(forced_vars): missing = set(forced_vars) - set(available) logger.warning(f"강제 지정 변수 중 누락: {missing}") self.selected_vars = available logger.info(f"강제 지정 변수 사용: {self.selected_vars}") elif method == "all": self.selected_vars = list(X.columns) elif method.startswith("stepwise"): criterion = "aic" if "aic" in method else "bic" self.selected_vars = self._stepwise_selection(y, X, criterion) else: self.selected_vars = list(X.columns) if not self.selected_vars: logger.warning("변수 선택 결과 선택된 변수가 없습니다. 전체 변수 사용.") self.selected_vars = list(X.columns) # 최종 모형 적합 X_selected = sm.add_constant(X[self.selected_vars].values) self.model = sm.OLS(y, X_selected) self.result = self.model.fit() logger.info(f"회귀모형 적합 완료: 선택변수 = {self.selected_vars}") logger.info(f" R² = {self.result.rsquared:.4f}, " f"Adj.R² = {self.result.rsquared_adj:.4f}, " f"AIC = {self.result.aic:.2f}") return self def _stepwise_selection( self, y: np.ndarray, X: pd.DataFrame, criterion: str = "aic" ) -> List[str]: """Forward Stepwise 변수 선택""" remaining = list(X.columns) selected = [] current_score = np.inf while remaining: scores = {} for var in remaining: trial_vars = selected + [var] X_trial = sm.add_constant(X[trial_vars].values) try: model = sm.OLS(y, X_trial).fit() score = model.aic if criterion == "aic" else model.bic scores[var] = score except Exception: continue if not scores: break best_var = min(scores, key=scores.get) best_score = scores[best_var] if best_score < current_score: selected.append(best_var) remaining.remove(best_var) current_score = best_score logger.debug(f" + {best_var} ({criterion.upper()} = {best_score:.2f})") else: break return selected def predict(self, macro_scenario: pd.DataFrame) -> np.ndarray: """ 거시 시나리오로 Zt 예측 Parameters ---------- macro_scenario : pd.DataFrame columns에 selected_vars가 포함되어야 함 Returns ------- np.ndarray : Zt 예측값 배열 """ if self.result is None: raise ValueError("모형이 적합되지 않았습니다. fit()을 먼저 실행하세요.") X = macro_scenario[self.selected_vars].copy() # 학습 데이터와 동일한 표준화 적용 for col in X.columns: if col in self.scaler_params: mean = self.scaler_params[col]["mean"] std = self.scaler_params[col]["std"] X[col] = (X[col] - mean) / std X_const = sm.add_constant(X.values, has_constant="add") return self.result.predict(X_const) def diagnostics(self) -> Dict[str, any]: """ 회귀 모형 진단 결과 반환 Returns ------- dict with keys: - r_squared, adj_r_squared - f_stat, f_pvalue - aic, bic - durbin_watson - ljung_box (p-value) - breusch_pagan (p-value) - vif (각 변수별) - coefficients (DataFrame) """ if self.result is None: return {} diag = { "r_squared": self.result.rsquared, "adj_r_squared": self.result.rsquared_adj, "f_stat": self.result.fvalue, "f_pvalue": self.result.f_pvalue, "aic": self.result.aic, "bic": self.result.bic, "n_obs": int(self.result.nobs), "selected_vars": self.selected_vars, } # Durbin-Watson residuals = self.result.resid diag["durbin_watson"] = durbin_watson(residuals) # Ljung-Box (자기상관 검정) try: lb_result = acorr_ljungbox(residuals, lags=[5], return_df=True) diag["ljung_box_stat"] = lb_result["lb_stat"].values[0] diag["ljung_box_pvalue"] = lb_result["lb_pvalue"].values[0] except Exception: diag["ljung_box_pvalue"] = np.nan # Breusch-Pagan (이분산 검정) try: bp_stat, bp_pvalue, _, _ = het_breuschpagan( residuals, self.result.model.exog ) diag["breusch_pagan_stat"] = bp_stat diag["breusch_pagan_pvalue"] = bp_pvalue except Exception: diag["breusch_pagan_pvalue"] = np.nan # VIF (다중공선성) try: X = self.result.model.exog vif_values = {} if self.is_ar1 and self._ar1_var_names: var_names = self._ar1_var_names else: var_names = ["const"] + self.selected_vars for i in range(min(X.shape[1], len(var_names))): vif_values[var_names[i]] = variance_inflation_factor(X, i) diag["vif"] = vif_values except Exception: diag["vif"] = {} # 계수 요약 if self.is_ar1 and self._ar1_var_names: coef_names = self._ar1_var_names else: coef_names = ["const"] + self.selected_vars coef_df = pd.DataFrame({ "변수": coef_names, "계수": self.result.params, "표준오차": self.result.bse, "t값": self.result.tvalues, "p값": self.result.pvalues, }) diag["coefficients"] = coef_df return diag def summary(self) -> str: """모형 요약 출력""" if self.result is None: return "모형이 적합되지 않았습니다." return str(self.result.summary()) def residual_series(self) -> np.ndarray: """잔차 시계열 반환""" if self.result is None: return np.array([]) return self.result.resid # ================================================================ # AR(1) + Macro 모형 # ================================================================ def fit_ar1( self, zt_series: pd.Series, macro_data: pd.DataFrame, forced_vars: Optional[List[str]] = None ) -> "MacroZtModel": """ AR(1) + Macro 모형 적합 Z(t) = c + phi*Z(t-1) + beta1*X1(t) + beta2*X2(t) + beta3*X3(t) + eps(t) Z(t-1)을 설명변수에 포함하여 OLS로 추정합니다. Parameters ---------- zt_series : pd.Series index=연도, values=Zt 추정값 macro_data : pd.DataFrame index=연도, columns=거시변수 forced_vars : List[str], optional 강제 지정 거시변수 """ self.is_ar1 = True # 인덱스 정렬 및 교집합 common_years = sorted(set(zt_series.index) & set(macro_data.index)) if len(common_years) < 5: raise ValueError(f"공통 데이터 포인트가 부족합니다: {len(common_years)}개") # AR(1) 구성: Z(t)와 Z(t-1) 쌍 zt_full = zt_series.loc[common_years].sort_index() years = list(zt_full.index) # t-1이 필요하므로 첫 해 제외 y_years = years[1:] y = zt_full.loc[y_years].values.astype(float) z_lag = zt_full.loc[years[:-1]].values.astype(float) # 거시변수 X_macro = macro_data.loc[y_years].copy() X_macro = X_macro.ffill().bfill().dropna(axis=1) # 변수 선택 if forced_vars: available = [v for v in forced_vars if v in X_macro.columns] if len(available) != len(forced_vars): missing = set(forced_vars) - set(available) logger.warning(f"AR(1) 강제 지정 변수 중 누락: {missing}") self.selected_vars = available else: self.selected_vars = list(X_macro.columns) # 거시변수 표본 통계 저장 (시나리오 충격용) for col in self.selected_vars: col_data = macro_data[col].dropna() self.ar1_macro_stats[col] = { "mean": float(col_data.mean()), "std": float(col_data.std()), "last": float(col_data.iloc[-1]) if len(col_data) > 0 else 0.0 } # 거시변수 표준화 (mean=0, std=1) # → β가 "1σ 충격의 Z 영향"으로 직접 해석됨 # → 절편 c가 장기 평균 Z 수준을 결정 X_std = X_macro[self.selected_vars].copy() for col in self.selected_vars: mean = self.ar1_macro_stats[col]["mean"] std = self.ar1_macro_stats[col]["std"] if std > 0: X_std[col] = (X_std[col] - mean) / std else: X_std[col] = 0.0 # 설계행렬: [const, Z(t-1), X1_std(t), X2_std(t), X3_std(t)] X_design = np.column_stack([ z_lag, X_std.values ]) X_with_const = sm.add_constant(X_design) # OLS 적합 self.model = sm.OLS(y, X_with_const) self.result = self.model.fit() # AR(1) 파라미터 추출 # params: [const, phi, beta1, beta2, ...] params = self.result.params self.ar1_const = params[0] self.ar1_phi = params[1] self.ar1_beta = {} for i, col in enumerate(self.selected_vars): self.ar1_beta[col] = params[2 + i] self.ar1_sigma_eps = float(np.std(self.result.resid)) # 변수명 저장 (진단용) self._ar1_var_names = ["const", "Z_lag1"] + self.selected_vars # 정상성 체크 if abs(self.ar1_phi) > 0 and abs(self.ar1_phi) < 1: half_life = np.log(2) / abs(np.log(abs(self.ar1_phi))) else: half_life = np.inf logger.info(f"AR(1)+Macro 적합 완료:") logger.info(f" phi = {self.ar1_phi:.4f} (반감기 = {half_life:.1f}년)") logger.info(f" c = {self.ar1_const:.4f}") for col, beta in self.ar1_beta.items(): logger.info(f" beta({col}) = {beta:+.6f}") logger.info(f" R2 = {self.result.rsquared:.4f}, " f"Adj.R2 = {self.result.rsquared_adj:.4f}") if abs(self.ar1_phi) >= 1.0: logger.warning(f" phi={self.ar1_phi:.4f} >= 1.0 -> non-stationary!") return self def forecast_z_path( self, z_last: float, macro_shocks: Dict[str, float], horizon: int = 50 ) -> np.ndarray: """ AR(1) 모형으로 미래 Z 경로 생성 t=1: Z = c + phi*Z(t0) + sum(beta_i * shock_i * sigma_i) t>=2: Z = c + phi*Z(t-1) (거시 충격 없음, AR 감쇠만) Parameters ---------- z_last : float 마지막 관측 Z(t0) macro_shocks : Dict[str, float] 변수별 충격 (sigma 배수) 예: {"USDKRW": +1.5, "RETAIL_SALES": -1.5} horizon : int 예측 기간 (년) Returns ------- np.ndarray : [Z(t0+1), ..., Z(t0+horizon)] """ if not self.is_ar1: raise ValueError("AR(1) 모형이 적합되지 않았습니다.") z_path = np.zeros(horizon) z_prev = z_last for t in range(horizon): z_next = self.ar1_const + self.ar1_phi * z_prev # t=0 (첫 해)만 거시 충격 적용 # β는 표준화된 변수 기준이므로 shock_sigma가 곧 β의 배수 if t == 0: for var, shock_sigma in macro_shocks.items(): if var in self.ar1_beta: beta = self.ar1_beta[var] z_next += beta * shock_sigma z_path[t] = z_next z_prev = z_next return z_path def build_macro_zt_model( zt_dict: Dict[int, float], macro_df: pd.DataFrame, method: str = "ar1_macro", forced_vars: Optional[List[str]] = None ) -> MacroZtModel: """ 편의 함수: Zt + 거시 DataFrame -> 회귀모형 구축 Parameters ---------- zt_dict : {연도: Zt값} macro_df : index=연도, columns=거시변수 method : "ar1_macro" (기본) | "stepwise_aic" | "all" forced_vars : 강제 지정 변수 """ zt_series = pd.Series(zt_dict, name="Zt") zt_series.index.name = "YEAR" model = MacroZtModel() if method == "ar1_macro": model.fit_ar1(zt_series, macro_df, forced_vars=forced_vars) else: model.fit(zt_series, macro_df, method=method, forced_vars=forced_vars) return model