- Belkin & Suchower (1998) credit cycle index (Zt) estimation via WLS - Vasicek single-factor conditional PD/TM model - Macro-Zt OLS regression with stepwise variable selection - 3-scenario (boom/neutral/recession) 50yr PD projection - Statistical validation suite (ADF, Ljung-Box, R2, ARCH) - BOK ECOS API integration with fallback data - Visualization module (7 chart types) - Detailed theoretical methodology docs/methodology.md
282 lines
8.4 KiB
Python
282 lines
8.4 KiB
Python
"""
|
||
50년 Lifetime PD 산출 엔진
|
||
|
||
시나리오별 Zt 경로와 TTC 전이행렬을 결합하여:
|
||
1. 연도별 조건부 전이행렬 산출
|
||
2. 순차적 행렬 곱으로 누적 전이확률 계산
|
||
3. Marginal PD / Cumulative PD / 시나리오 가중평균 PD 산출
|
||
|
||
IFRS 9 ECL에 직접 사용 가능한 PD Term Structure 출력
|
||
|
||
참고문헌:
|
||
- IFRS 9 Financial Instruments (IASB, 2014)
|
||
- EBA Guidelines on IFRS 9 implementation
|
||
- Basel Committee BCBS 350 (Credit Risk)
|
||
"""
|
||
|
||
import logging
from typing import Any, Dict, List, Optional, Tuple

import numpy as np
import pandas as pd

from data.transition_matrices import RATING_GRADES
from models.vasicek import conditional_transition_matrix
|
||
|
||
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
|
||
|
||
|
||
class LifetimePDEngine:
    """
    Lifetime (up to 50-year) PD engine.

    Process:
    1. For each year t, build the conditional transition matrix TM(Zt) from Zt.
    2. Cumulative transition matrix = TM(Z1) x TM(Z2) x ... x TM(Zt).
    3. The last (default, "D") column of the cumulative matrix is the
       cumulative PD.
    4. Marginal PD(t) = Cumulative PD(t) - Cumulative PD(t-1).
    """

    # Years shown by the table formatters when the caller passes no ``years``.
    _DEFAULT_TABLE_YEARS = (1, 2, 3, 5, 7, 10, 15, 20, 30, 50)

    def __init__(
        self,
        ttc_matrix: np.ndarray,
        rho: float = 0.20,
        rating_grades: Optional[List[str]] = None,
    ):
        """
        Parameters
        ----------
        ttc_matrix : np.ndarray
            N x N through-the-cycle (TTC) transition matrix.
        rho : float
            Asset correlation.
        rating_grades : List[str], optional
            Grade labels; falls back to ``RATING_GRADES`` when omitted or empty.
        """
        self.ttc_matrix = ttc_matrix
        self.rho = rho
        self.n_grades = ttc_matrix.shape[0]
        self.grades = rating_grades or RATING_GRADES
        # Every grade except the last one, which is the default state ("D").
        self.non_default_grades = self.grades[:-1]

    def compute_lifetime_pd(
        self,
        z_path: np.ndarray,
        horizon: Optional[int] = None,
    ) -> Dict[str, np.ndarray]:
        """
        Compute the lifetime PD term structure for a single scenario.

        Parameters
        ----------
        z_path : np.ndarray
            Zt path. Years beyond ``len(z_path)`` are evaluated with Zt = 0.0.
        horizon : int, optional
            Number of projection years (default: ``len(z_path)``).

        Returns
        -------
        dict with keys:
            - "cumulative_pd": shape (horizon, N-1), cumulative PD per grade
            - "marginal_pd": shape (horizon, N-1), marginal PD per grade
            - "survival_prob": shape (horizon, N-1), survival probability
            - "conditional_tms": list of per-year conditional transition
              matrices (for debugging)

        A non-positive horizon (including an empty ``z_path`` with no explicit
        horizon) yields empty ``(0, N-1)`` arrays instead of raising.
        """
        if horizon is None:
            horizon = len(z_path)

        if horizon <= 0:
            # Nothing to project: return well-shaped empty outputs rather than
            # failing on the first-row assignment further down.
            empty = np.zeros((0, self.n_grades - 1))
            return {
                "cumulative_pd": empty,
                "marginal_pd": empty.copy(),
                "survival_prob": 1.0 - empty,
                "conditional_tms": [],
            }

        n_given = len(z_path)
        cumulative_tm = np.eye(self.n_grades)
        cumulative_rows = []
        conditional_tms = []

        for t in range(horizon):
            # Pad with Zt = 0.0 once the supplied path is exhausted.
            z_t = z_path[t] if t < n_given else 0.0

            cond_tm = conditional_transition_matrix(self.ttc_matrix, z_t, self.rho)
            conditional_tms.append(cond_tm)

            # Chain the yearly matrices: TM(Z1) @ TM(Z2) @ ... @ TM(Zt).
            cumulative_tm = cumulative_tm @ cond_tm

            # Cumulative PD = default (last) column, excluding the default row.
            cumulative_rows.append(cumulative_tm[:-1, -1].copy())

        cumulative_pds = np.array(cumulative_rows)  # shape: (horizon, N-1)

        # Marginal PD = year-over-year increase in cumulative PD, floored at 0
        # so numerical noise cannot produce negative probabilities.
        marginal_pds = np.empty_like(cumulative_pds)
        marginal_pds[0] = cumulative_pds[0]
        marginal_pds[1:] = np.maximum(cumulative_pds[1:] - cumulative_pds[:-1], 0.0)

        return {
            "cumulative_pd": cumulative_pds,
            "marginal_pd": marginal_pds,
            "survival_prob": 1.0 - cumulative_pds,
            "conditional_tms": conditional_tms,
        }

    def compute_all_scenarios(
        self,
        z_paths: Dict[str, np.ndarray],
        scenario_weights: Dict[str, float],
        horizon: Optional[int] = None,
    ) -> Dict[str, Any]:
        """
        Compute lifetime PDs for every scenario plus their weighted average.

        Parameters
        ----------
        z_paths : Dict[str, np.ndarray]
            Zt path per scenario.
        scenario_weights : Dict[str, float]
            Probability weight per scenario. A scenario missing from this
            mapping gets the equal weight ``1 / len(z_paths)``.
        horizon : int, optional
            Number of projection years, forwarded to ``compute_lifetime_pd``.

        Returns
        -------
        dict with keys:
            - "by_scenario": {scenario: {cumulative_pd, marginal_pd, ...}}
            - "weighted_cumulative_pd": shape (horizon, N-1)
            - "weighted_marginal_pd": shape (horizon, N-1)
            - "weighted_survival_prob": shape (horizon, N-1)

        Raises
        ------
        ValueError
            If ``z_paths`` is empty, or if scenarios produce PD arrays of
            different shapes (e.g. paths of different length with no explicit
            ``horizon``), which cannot be averaged.
        """
        if not z_paths:
            raise ValueError("z_paths must contain at least one scenario")

        results: Dict[str, Any] = {"by_scenario": {}}
        default_weight = 1.0 / len(z_paths)

        weighted_cum = None
        weighted_marginal = None
        total_weight = 0.0

        for scenario_name, z_path in z_paths.items():
            logger.info(f"시나리오 '{scenario_name}' PD 산출 중...")

            result = self.compute_lifetime_pd(z_path, horizon)
            results["by_scenario"][scenario_name] = result

            weight = scenario_weights.get(scenario_name, default_weight)
            total_weight += weight

            if weighted_cum is None:
                weighted_cum = weight * result["cumulative_pd"]
                weighted_marginal = weight * result["marginal_pd"]
                continue

            # Guard against silent numpy broadcasting of mismatched horizons.
            if result["cumulative_pd"].shape != weighted_cum.shape:
                raise ValueError(
                    f"scenario '{scenario_name}' produced PDs of shape "
                    f"{result['cumulative_pd'].shape}, expected {weighted_cum.shape}; "
                    "pass an explicit horizon or use equal-length z_paths"
                )
            weighted_cum += weight * result["cumulative_pd"]
            weighted_marginal += weight * result["marginal_pd"]

        if abs(total_weight - 1.0) > 1e-9:
            # The weighted survival probability below is only a probability
            # when the effective weights sum to one.
            logger.warning(
                "Effective scenario weights sum to %.6f instead of 1.0", total_weight
            )

        results["weighted_cumulative_pd"] = weighted_cum
        results["weighted_marginal_pd"] = weighted_marginal
        results["weighted_survival_prob"] = 1.0 - weighted_cum

        return results

    def _format_table(
        self,
        results: Dict,
        key: str,
        years: Optional[List[int]],
        scenario: Optional[str],
    ) -> pd.DataFrame:
        """Build a year x grade table from the ``key`` series in ``results``."""
        if years is None:
            years = self._DEFAULT_TABLE_YEARS

        if scenario is not None:
            values = results["by_scenario"][scenario][key]
        else:
            values = results[f"weighted_{key}"]

        # Keep only years inside the projected horizon. The lower bound
        # matters: year 0 or a negative year would otherwise wrap around to a
        # negative index and silently return a row from the end of the horizon.
        max_t = values.shape[0]
        data = {y: values[y - 1] for y in years if 1 <= y <= max_t}  # rows are 0-indexed

        df = pd.DataFrame(data, index=self.non_default_grades).T
        df.index.name = "년"
        return df

    def format_pd_table(
        self,
        results: Dict,
        years: Optional[List[int]] = None,
        scenario: Optional[str] = None,
    ) -> pd.DataFrame:
        """
        Format cumulative PD results as a DataFrame table.

        Parameters
        ----------
        results : dict
            Output of ``compute_all_scenarios()``.
        years : List[int], optional
            Years to display (default: 1, 2, 3, 5, 7, 10, 15, 20, 30, 50).
            Years outside ``1..horizon`` are skipped.
        scenario : str, optional
            A specific scenario name; ``None`` selects the weighted average.

        Returns
        -------
        pd.DataFrame
            index = year, columns = non-default grades.
        """
        return self._format_table(results, "cumulative_pd", years, scenario)

    def format_marginal_pd_table(
        self,
        results: Dict,
        years: Optional[List[int]] = None,
        scenario: Optional[str] = None,
    ) -> pd.DataFrame:
        """
        Format marginal PD results as a DataFrame table.

        Takes the same parameters and returns the same layout as
        ``format_pd_table``, but reads the marginal PD series.
        """
        return self._format_table(results, "marginal_pd", years, scenario)
|
||
|
||
|
||
def compute_ecl_weights(
    marginal_pds: np.ndarray,
    lgd: float = 0.45,
    discount_rate: float = 0.03,
    horizon: Optional[int] = None,
) -> np.ndarray:
    """
    Helper for the ECL (Expected Credit Loss) calculation.

    ECL = sum_t [ PD_marginal(t) x LGD x DF(t) ],  DF(t) = 1 / (1 + r)^t
    with t counted from 1 for the first row of ``marginal_pds``.

    Parameters
    ----------
    marginal_pds : np.ndarray or array-like
        Marginal PDs, either 1-D ``(periods,)`` or 2-D ``(periods, grades)``.
    lgd : float
        Loss given default, default 45%.
    discount_rate : float
        Discount rate per period, default 3%.
    horizon : int, optional
        Number of leading periods to include (default: all periods).
        A non-positive horizon yields zeros.

    Returns
    -------
    np.ndarray
        Discounted ECL per grade, shape ``(grades,)``; shape ``(1,)`` for
        1-D input.

    Raises
    ------
    IndexError
        If ``horizon`` exceeds the number of periods in ``marginal_pds``.
    """
    marginal = np.asarray(marginal_pds, dtype=float)
    n_periods = marginal.shape[0]

    if horizon is None:
        horizon = n_periods
    if horizon > n_periods:
        raise IndexError(
            f"horizon {horizon} exceeds the {n_periods} periods available in marginal_pds"
        )

    n_out = marginal.shape[1] if marginal.ndim > 1 else 1
    if horizon <= 0:
        return np.zeros(n_out)

    # Period t (1-based) is discounted by 1 / (1 + r)^t.
    periods = np.arange(1, horizon + 1)
    discount_factors = 1.0 / (1.0 + discount_rate) ** periods

    # Discount-weighted sum over time; one dot product handles 1-D and 2-D input.
    ecl = lgd * (discount_factors @ marginal[:horizon])
    return np.atleast_1d(ecl)
|