feat: Lifetime PD (50yr) - Belkin & Suchower + Vasicek model

- Belkin & Suchower (1998) credit cycle index (Zt) estimation via WLS
- Vasicek single-factor conditional PD/TM model
- Macro-Zt OLS regression with stepwise variable selection
- 3-scenario (boom/neutral/recession) 50yr PD projection
- Statistical validation suite (ADF, Ljung-Box, R2, ARCH)
- BOK ECOS API integration with fallback data
- Visualization module (7 chart types)
- Detailed theoretical methodology docs/methodology.md
This commit is contained in:
Variet Agent
2026-03-10 21:57:34 +09:00
commit 3a9374c61a
39 changed files with 4671 additions and 0 deletions

281
projection/lifetime_pd.py Normal file
View File

@@ -0,0 +1,281 @@
"""
50년 Lifetime PD 산출 엔진
시나리오별 Zt 경로와 TTC 전이행렬을 결합하여:
1. 연도별 조건부 전이행렬 산출
2. 순차적 행렬 곱으로 누적 전이확률 계산
3. Marginal PD / Cumulative PD / 시나리오 가중평균 PD 산출
IFRS 9 ECL에 직접 사용 가능한 PD Term Structure 출력
참고문헌:
- IFRS 9 Financial Instruments (IASB, 2014)
- EBA Guidelines on IFRS 9 implementation
- Basel Committee BCBS 350 (Credit Risk)
"""
import numpy as np
import pandas as pd
from typing import Dict, List, Optional, Tuple
from models.vasicek import conditional_transition_matrix
from data.transition_matrices import RATING_GRADES
import logging
logger = logging.getLogger(__name__)
class LifetimePDEngine:
"""
50년 Lifetime PD 산출 엔진
Process:
1. 각 연도 t에 대해 Zt로 조건부 전이행렬 TM(Zt) 산출
2. 누적 전이행렬 = TM(Z1) × TM(Z2) × ... × TM(Zt)
3. 누적 전이행렬의 D열이 누적 PD
4. 한계 PD = Cumulative PD(t) - Cumulative PD(t-1)
"""
def __init__(
self,
ttc_matrix: np.ndarray,
rho: float = 0.20,
rating_grades: List[str] = None
):
"""
Parameters
----------
ttc_matrix : np.ndarray
N×N TTC 전이행렬
rho : float
자산상관계수
rating_grades : List[str]
등급 레이블
"""
self.ttc_matrix = ttc_matrix
self.rho = rho
self.n_grades = ttc_matrix.shape[0]
self.grades = rating_grades or RATING_GRADES
self.non_default_grades = self.grades[:-1] # D 제외
def compute_lifetime_pd(
self,
z_path: np.ndarray,
horizon: Optional[int] = None
) -> Dict[str, np.ndarray]:
"""
단일 시나리오의 Lifetime PD 산출
Parameters
----------
z_path : np.ndarray
Zt 경로 (길이 = horizon)
horizon : int
예측 기간 (기본: z_path 길이)
Returns
-------
dict with keys:
- "cumulative_pd": shape (horizon, N-1) — 등급별 누적 PD
- "marginal_pd": shape (horizon, N-1) — 등급별 한계 PD
- "survival_prob": shape (horizon, N-1) — 등급별 생존확률
- "conditional_tms": list of 전이행렬 (디버깅용)
"""
if horizon is None:
horizon = len(z_path)
cumulative_tm = np.eye(self.n_grades)
cumulative_pds = []
conditional_tms = []
for t in range(horizon):
z_t = z_path[t] if t < len(z_path) else 0.0
# 조건부 전이행렬 산출
cond_tm = conditional_transition_matrix(self.ttc_matrix, z_t, self.rho)
conditional_tms.append(cond_tm)
# 누적 전이행렬
cumulative_tm = cumulative_tm @ cond_tm
# 누적 PD = D열 (마지막 열)
cum_pd = cumulative_tm[:-1, -1].copy()
cumulative_pds.append(cum_pd)
cumulative_pds = np.array(cumulative_pds) # shape: (horizon, N-1)
# 한계 PD
marginal_pds = np.zeros_like(cumulative_pds)
marginal_pds[0] = cumulative_pds[0]
for t in range(1, horizon):
marginal_pds[t] = np.maximum(cumulative_pds[t] - cumulative_pds[t - 1], 0.0)
# 생존확률
survival_probs = 1.0 - cumulative_pds
return {
"cumulative_pd": cumulative_pds,
"marginal_pd": marginal_pds,
"survival_prob": survival_probs,
"conditional_tms": conditional_tms,
}
def compute_all_scenarios(
self,
z_paths: Dict[str, np.ndarray],
scenario_weights: Dict[str, float],
horizon: Optional[int] = None
) -> Dict[str, any]:
"""
전체 시나리오 Lifetime PD 산출 + 가중평균
Parameters
----------
z_paths : Dict[str, np.ndarray]
시나리오별 Zt 경로
scenario_weights : Dict[str, float]
시나리오별 확률가중치
horizon : int
예측 기간
Returns
-------
dict with keys:
- "by_scenario": {scenario: {cumulative_pd, marginal_pd, ...}}
- "weighted_cumulative_pd": shape (horizon, N-1)
- "weighted_marginal_pd": shape (horizon, N-1)
"""
results = {"by_scenario": {}}
weighted_cum = None
weighted_marginal = None
for scenario_name, z_path in z_paths.items():
logger.info(f"시나리오 '{scenario_name}' PD 산출 중...")
result = self.compute_lifetime_pd(z_path, horizon)
results["by_scenario"][scenario_name] = result
weight = scenario_weights.get(scenario_name, 1.0 / len(z_paths))
if weighted_cum is None:
weighted_cum = weight * result["cumulative_pd"]
weighted_marginal = weight * result["marginal_pd"]
else:
weighted_cum += weight * result["cumulative_pd"]
weighted_marginal += weight * result["marginal_pd"]
results["weighted_cumulative_pd"] = weighted_cum
results["weighted_marginal_pd"] = weighted_marginal
results["weighted_survival_prob"] = 1.0 - weighted_cum
return results
def format_pd_table(
self,
results: Dict,
years: List[int] = None,
scenario: str = None
) -> pd.DataFrame:
"""
PD 결과를 DataFrame 테이블로 포매팅
Parameters
----------
results : dict
compute_all_scenarios() 결과
years : List[int]
표시할 연도 목록 (기본: 1,2,3,5,7,10,15,20,30,50)
scenario : str
특정 시나리오 (None이면 가중평균)
Returns
-------
pd.DataFrame
index=연도, columns=등급
"""
if years is None:
years = [1, 2, 3, 5, 7, 10, 15, 20, 30, 50]
if scenario is not None:
cum_pd = results["by_scenario"][scenario]["cumulative_pd"]
else:
cum_pd = results["weighted_cumulative_pd"]
# 호라이즌 범위 내 연도만 선택
max_t = cum_pd.shape[0]
valid_years = [y for y in years if y <= max_t]
data = {}
for y in valid_years:
data[y] = cum_pd[y - 1] # 0-indexed
df = pd.DataFrame(data, index=self.non_default_grades).T
df.index.name = ""
return df
def format_marginal_pd_table(
self,
results: Dict,
years: List[int] = None,
scenario: str = None
) -> pd.DataFrame:
"""한계 PD를 DataFrame으로 포매팅"""
if years is None:
years = [1, 2, 3, 5, 7, 10, 15, 20, 30, 50]
if scenario is not None:
m_pd = results["by_scenario"][scenario]["marginal_pd"]
else:
m_pd = results["weighted_marginal_pd"]
max_t = m_pd.shape[0]
valid_years = [y for y in years if y <= max_t]
data = {}
for y in valid_years:
data[y] = m_pd[y - 1]
df = pd.DataFrame(data, index=self.non_default_grades).T
df.index.name = ""
return df
def compute_ecl_weights(
marginal_pds: np.ndarray,
lgd: float = 0.45,
discount_rate: float = 0.03,
horizon: int = None
) -> np.ndarray:
"""
ECL (Expected Credit Loss) 계산 보조 함수
ECL = Σ_t [PD_marginal(t) × LGD × DF(t)]
Parameters
----------
marginal_pds : np.ndarray
한계 PD 배열 (등급별)
lgd : float
부도시 손실률 (LGD), 기본 45% (Basel IRB)
discount_rate : float
할인율, 기본 3%
Returns
-------
np.ndarray : 등급별 누적 ECL
"""
if horizon is None:
horizon = marginal_pds.shape[0]
ecl = np.zeros(marginal_pds.shape[1] if marginal_pds.ndim > 1 else 1)
for t in range(horizon):
df = 1.0 / (1.0 + discount_rate) ** (t + 1)
if marginal_pds.ndim > 1:
ecl += marginal_pds[t] * lgd * df
else:
ecl += marginal_pds[t] * lgd * df
return ecl