- Replace hardcoded DEFAULT_PD_FLOORS with build_complete_pd_floor_table() (KAP bond YTM) - Fix ADF test: autolag='AIC' -> 'BIC' for small sample (N=26) robustness - Expand variable search: 40 -> 226 vars (log/diff/return/lag2), 1.9M combos - Select Model #2: HOUSING_PRICE + CREDIT_SPREAD_LAG1 + CURRENT_ACCOUNT_R - Add 6-test diagnostics table to AR1 sheet (ADF/LB/DW/BP/ARCH/Shapiro) - Add Korean variable names for transformed variables - Generate report v7 with full diagnostics
276 lines
10 KiB
Python
276 lines
10 KiB
Python
"""
|
||
시나리오 엔진: 호황/불황/중립 거시경제 시나리오 생성
|
||
|
||
IFRS 9 / ECB / Fed 방식을 참고한 3개 시나리오:
|
||
- Upside (호황): 경기 확장기 지속, 낮은 실업률, 완만한 금리
|
||
- Base (중립): IMF WEO 전망 등 기본 시나리오
|
||
- Downside (불황): 경기 침체, 높은 실업률, 신용경색
|
||
|
||
각 시나리오별 거시변수 경로 → Zt 경로 → mean-reversion 적용
|
||
|
||
참고:
|
||
- ECB (2020). "The ECB's macroprudential stress test"
|
||
- Fed (2023). "2023 Stress Test Scenarios"
|
||
- IFRS 9 B5.5.42-44 (Multiple scenarios requirement)
|
||
"""
|
||
|
||
import numpy as np
|
||
import pandas as pd
|
||
import yaml
|
||
from typing import Dict, List, Optional, Tuple
|
||
import logging
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
|
||
class ScenarioEngine:
|
||
"""
|
||
시나리오 생성 및 관리 엔진
|
||
|
||
3가지 접근법 지원:
|
||
1) Z-직접 설정: Zt의 평균/표준편차에서 σ 배수로 시나리오 생성
|
||
2) 거시변수 시나리오: 거시변수 경로 → 회귀모형 → Zt 경로
|
||
3) 하이브리드: 단기(1-5년)는 거시 기반, 장기(6년+)는 Z-직접
|
||
"""
|
||
|
||
def __init__(self, config: dict):
|
||
"""
|
||
Parameters
|
||
----------
|
||
config : dict
|
||
scenarios 및 convergence 설정
|
||
"""
|
||
self.scenario_config = config.get("scenarios", {})
|
||
self.convergence_config = config.get("convergence", {})
|
||
|
||
self.pit_horizon = self.convergence_config.get("pit_horizon", 5)
|
||
self.transition_horizon = self.convergence_config.get("transition_horizon", 10)
|
||
self.mean_reversion_lambda = self.convergence_config.get("mean_reversion_lambda", 0.3)
|
||
self.total_horizon = self.convergence_config.get("total_horizon", 50)
|
||
|
||
def generate_z_paths(
|
||
self,
|
||
zt_history: Dict[int, float],
|
||
macro_model=None,
|
||
macro_scenarios: Optional[Dict[str, pd.DataFrame]] = None,
|
||
base_year: int = 2025
|
||
) -> Dict[str, np.ndarray]:
|
||
"""
|
||
시나리오별 Zt 경로 생성 (50년)
|
||
|
||
AR(1) 모형이 있으면: forecast_z_path() 사용 (macro_shocks 기반)
|
||
없으면: Z-직접 방식 (μ±kσ) fallback
|
||
|
||
Parameters
|
||
----------
|
||
zt_history : Dict[int, float]
|
||
과거 Zt 시계열
|
||
macro_model : MacroZtModel, optional
|
||
거시→Zt 회귀모형 (AR(1) 또는 OLS)
|
||
macro_scenarios : Dict[str, pd.DataFrame], optional
|
||
(OLS 모형용) 시나리오별 거시변수 경로
|
||
base_year : int
|
||
기준 연도
|
||
|
||
Returns
|
||
-------
|
||
Dict[str, np.ndarray]
|
||
{"upside": [Z_1,...,Z_50], "base": [...], "downside": [...]}
|
||
"""
|
||
zt_values = np.array(list(zt_history.values()))
|
||
z_mean = zt_values.mean()
|
||
z_std = zt_values.std()
|
||
z_last = zt_values[-1] # 마지막 관측값
|
||
|
||
logger.info(f"Zt 통계: μ={z_mean:.4f}, σ={z_std:.4f}, Z_last={z_last:.4f}")
|
||
|
||
z_paths = {}
|
||
|
||
# AR(1) 모형 사용 경로
|
||
use_ar1 = (macro_model is not None and
|
||
hasattr(macro_model, 'is_ar1') and
|
||
macro_model.is_ar1)
|
||
|
||
if use_ar1:
|
||
logger.info("AR(1)+Macro 모형으로 시나리오 경로 생성")
|
||
|
||
for scenario_name, scenario_cfg in self.scenario_config.items():
|
||
# config에서 macro_shocks 가져오기
|
||
macro_shocks = scenario_cfg.get("macro_shocks", {})
|
||
|
||
# forecast_z_path로 50년 경로 생성
|
||
z_path = macro_model.forecast_z_path(
|
||
z_last=z_last,
|
||
macro_shocks=macro_shocks,
|
||
horizon=self.total_horizon
|
||
)
|
||
|
||
z_paths[scenario_name] = z_path
|
||
|
||
logger.info(
|
||
f" {scenario_name}: Z[1]={z_path[0]:+.3f}, "
|
||
f"Z[5]={z_path[4]:+.3f}, Z[10]={z_path[9]:+.3f}, "
|
||
f"Z[50]={z_path[-1]:+.3f}"
|
||
)
|
||
else:
|
||
# Fallback: Z-직접 방식
|
||
logger.info("Z-직접 방식으로 시나리오 경로 생성 (AR(1) 없음)")
|
||
|
||
for scenario_name, scenario_cfg in self.scenario_config.items():
|
||
z_multiplier = scenario_cfg.get("z_multiplier", 0.0)
|
||
z_scenario = z_mean + z_multiplier * z_std
|
||
|
||
# OLS 거시 모형이 있으면 단기 거시 기반
|
||
if macro_model is not None and macro_scenarios is not None:
|
||
scenario_key = scenario_name
|
||
if scenario_key in macro_scenarios:
|
||
macro_path = macro_scenarios[scenario_key]
|
||
z_short = macro_model.predict(macro_path)
|
||
n_short = min(len(z_short), self.pit_horizon)
|
||
else:
|
||
z_short = np.full(self.pit_horizon, z_scenario)
|
||
n_short = self.pit_horizon
|
||
else:
|
||
z_short = np.full(self.pit_horizon, z_scenario)
|
||
n_short = self.pit_horizon
|
||
|
||
z_path = np.zeros(self.total_horizon)
|
||
|
||
# Phase 1: PIT 기간
|
||
for t in range(min(n_short, self.total_horizon)):
|
||
val = z_short[t] if t < len(z_short) else z_scenario
|
||
z_path[t] = val if np.isfinite(val) else z_scenario
|
||
|
||
# Phase 2: Mean-reversion
|
||
for t in range(self.pit_horizon, min(self.transition_horizon, self.total_horizon)):
|
||
decay = np.exp(-self.mean_reversion_lambda * (t - self.pit_horizon + 1))
|
||
z_path[t] = z_path[self.pit_horizon - 1] * decay
|
||
|
||
# Phase 3: TTC
|
||
for t in range(self.transition_horizon, self.total_horizon):
|
||
z_path[t] = 0.0
|
||
|
||
z_paths[scenario_name] = z_path
|
||
|
||
logger.info(
|
||
f" {scenario_name}: Z[1]={z_path[0]:+.3f}, "
|
||
f"Z[5]={z_path[4]:+.3f}, Z[10]={z_path[9]:+.3f}, "
|
||
f"Z[50]={z_path[-1]:+.3f}"
|
||
)
|
||
|
||
return z_paths
|
||
|
||
def generate_default_macro_scenarios(
|
||
self,
|
||
macro_history: pd.DataFrame,
|
||
base_year: int = 2025,
|
||
forecast_years: int = 5
|
||
) -> Dict[str, pd.DataFrame]:
|
||
"""
|
||
간이 거시경제 시나리오 생성
|
||
|
||
과거 데이터의 통계 특성을 기반으로 3개 시나리오 생성
|
||
|
||
Returns
|
||
-------
|
||
Dict[str, pd.DataFrame]
|
||
{"upside": DataFrame, "base": DataFrame, "downside": DataFrame}
|
||
"""
|
||
# 과거 통계
|
||
macro_mean = macro_history.mean()
|
||
macro_std = macro_history.std()
|
||
last_row = macro_history.iloc[-1]
|
||
|
||
scenarios = {}
|
||
years = list(range(base_year + 1, base_year + forecast_years + 1))
|
||
|
||
# 호황 시나리오
|
||
upside_data = {}
|
||
for col in macro_history.columns:
|
||
# 호황: GDP↑, 실업률↓, 금리 적정, 선행지수↑
|
||
if col == "UNEMPLOYMENT":
|
||
upside_data[col] = np.linspace(
|
||
last_row[col], max(macro_mean[col] - 0.5 * macro_std[col], 2.0),
|
||
forecast_years
|
||
)
|
||
elif col == "GDP_GROWTH":
|
||
upside_data[col] = np.linspace(
|
||
last_row[col], macro_mean[col] + 0.5 * macro_std[col],
|
||
forecast_years
|
||
)
|
||
elif col == "LEADING_INDEX":
|
||
upside_data[col] = np.linspace(
|
||
last_row[col], macro_mean[col] + 1.0 * macro_std[col],
|
||
forecast_years
|
||
)
|
||
else:
|
||
upside_data[col] = np.linspace(
|
||
last_row[col], macro_mean[col],
|
||
forecast_years
|
||
)
|
||
scenarios["upside"] = pd.DataFrame(upside_data, index=years)
|
||
|
||
# 중립 시나리오
|
||
base_data = {}
|
||
for col in macro_history.columns:
|
||
base_data[col] = np.linspace(
|
||
last_row[col], macro_mean[col],
|
||
forecast_years
|
||
)
|
||
scenarios["base"] = pd.DataFrame(base_data, index=years)
|
||
|
||
# 불황 시나리오
|
||
downside_data = {}
|
||
for col in macro_history.columns:
|
||
if col == "UNEMPLOYMENT":
|
||
downside_data[col] = np.linspace(
|
||
last_row[col], macro_mean[col] + 1.5 * macro_std[col],
|
||
forecast_years
|
||
)
|
||
elif col == "GDP_GROWTH":
|
||
downside_val = max(macro_mean[col] - 2.0 * macro_std[col], -3.0)
|
||
downside_data[col] = np.linspace(
|
||
last_row[col], downside_val, forecast_years
|
||
)
|
||
elif col == "LEADING_INDEX":
|
||
downside_data[col] = np.linspace(
|
||
last_row[col], macro_mean[col] - 2.0 * macro_std[col],
|
||
forecast_years
|
||
)
|
||
else:
|
||
downside_data[col] = np.linspace(
|
||
last_row[col], macro_mean[col] + 0.5 * macro_std[col],
|
||
forecast_years
|
||
)
|
||
scenarios["downside"] = pd.DataFrame(downside_data, index=years)
|
||
|
||
return scenarios
|
||
|
||
def get_scenario_weights(self) -> Dict[str, float]:
|
||
"""시나리오별 확률가중치 반환"""
|
||
weights = {}
|
||
for name, cfg in self.scenario_config.items():
|
||
weights[name] = cfg.get("weight", 1.0 / len(self.scenario_config))
|
||
|
||
# 정규화
|
||
total = sum(weights.values())
|
||
if total > 0:
|
||
weights = {k: v / total for k, v in weights.items()}
|
||
|
||
return weights
|
||
|
||
def get_scenario_names(self) -> List[str]:
|
||
"""시나리오 이름 목록"""
|
||
return list(self.scenario_config.keys())
|
||
|
||
def get_display_name(self, scenario_key: str) -> str:
|
||
"""시나리오 표시 이름"""
|
||
cfg = self.scenario_config.get(scenario_key, {})
|
||
return cfg.get("name", scenario_key)
|
||
|
||
|
||
def load_config(config_path: str = "config.yaml") -> dict:
|
||
"""설정 파일 로딩"""
|
||
with open(config_path, "r", encoding="utf-8") as f:
|
||
return yaml.safe_load(f)
|