Files
LifetimePD/projection/lifetime_pd.py
Variet Agent 3a9374c61a feat: Lifetime PD (50yr) - Belkin & Suchower + Vasicek model
- Belkin & Suchower (1998) credit cycle index (Zt) estimation via WLS
- Vasicek single-factor conditional PD/TM model
- Macro-Zt OLS regression with stepwise variable selection
- 3-scenario (boom/neutral/recession) 50yr PD projection
- Statistical validation suite (ADF, Ljung-Box, R2, ARCH)
- BOK ECOS API integration with fallback data
- Visualization module (7 chart types)
- Detailed theoretical methodology docs/methodology.md
2026-03-10 21:57:34 +09:00

282 lines
8.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
50년 Lifetime PD 산출 엔진
시나리오별 Zt 경로와 TTC 전이행렬을 결합하여:
1. 연도별 조건부 전이행렬 산출
2. 순차적 행렬 곱으로 누적 전이확률 계산
3. Marginal PD / Cumulative PD / 시나리오 가중평균 PD 산출
IFRS 9 ECL에 직접 사용 가능한 PD Term Structure 출력
참고문헌:
- IFRS 9 Financial Instruments (IASB, 2014)
- EBA Guidelines on IFRS 9 implementation
- Basel Committee BCBS 350 (Credit Risk)
"""
import numpy as np
import pandas as pd
from typing import Dict, List, Optional, Tuple
from models.vasicek import conditional_transition_matrix
from data.transition_matrices import RATING_GRADES
import logging
logger = logging.getLogger(__name__)
class LifetimePDEngine:
"""
50년 Lifetime PD 산출 엔진
Process:
1. 각 연도 t에 대해 Zt로 조건부 전이행렬 TM(Zt) 산출
2. 누적 전이행렬 = TM(Z1) × TM(Z2) × ... × TM(Zt)
3. 누적 전이행렬의 D열이 누적 PD
4. 한계 PD = Cumulative PD(t) - Cumulative PD(t-1)
"""
def __init__(
self,
ttc_matrix: np.ndarray,
rho: float = 0.20,
rating_grades: List[str] = None
):
"""
Parameters
----------
ttc_matrix : np.ndarray
N×N TTC 전이행렬
rho : float
자산상관계수
rating_grades : List[str]
등급 레이블
"""
self.ttc_matrix = ttc_matrix
self.rho = rho
self.n_grades = ttc_matrix.shape[0]
self.grades = rating_grades or RATING_GRADES
self.non_default_grades = self.grades[:-1] # D 제외
def compute_lifetime_pd(
self,
z_path: np.ndarray,
horizon: Optional[int] = None
) -> Dict[str, np.ndarray]:
"""
단일 시나리오의 Lifetime PD 산출
Parameters
----------
z_path : np.ndarray
Zt 경로 (길이 = horizon)
horizon : int
예측 기간 (기본: z_path 길이)
Returns
-------
dict with keys:
- "cumulative_pd": shape (horizon, N-1) — 등급별 누적 PD
- "marginal_pd": shape (horizon, N-1) — 등급별 한계 PD
- "survival_prob": shape (horizon, N-1) — 등급별 생존확률
- "conditional_tms": list of 전이행렬 (디버깅용)
"""
if horizon is None:
horizon = len(z_path)
cumulative_tm = np.eye(self.n_grades)
cumulative_pds = []
conditional_tms = []
for t in range(horizon):
z_t = z_path[t] if t < len(z_path) else 0.0
# 조건부 전이행렬 산출
cond_tm = conditional_transition_matrix(self.ttc_matrix, z_t, self.rho)
conditional_tms.append(cond_tm)
# 누적 전이행렬
cumulative_tm = cumulative_tm @ cond_tm
# 누적 PD = D열 (마지막 열)
cum_pd = cumulative_tm[:-1, -1].copy()
cumulative_pds.append(cum_pd)
cumulative_pds = np.array(cumulative_pds) # shape: (horizon, N-1)
# 한계 PD
marginal_pds = np.zeros_like(cumulative_pds)
marginal_pds[0] = cumulative_pds[0]
for t in range(1, horizon):
marginal_pds[t] = np.maximum(cumulative_pds[t] - cumulative_pds[t - 1], 0.0)
# 생존확률
survival_probs = 1.0 - cumulative_pds
return {
"cumulative_pd": cumulative_pds,
"marginal_pd": marginal_pds,
"survival_prob": survival_probs,
"conditional_tms": conditional_tms,
}
def compute_all_scenarios(
self,
z_paths: Dict[str, np.ndarray],
scenario_weights: Dict[str, float],
horizon: Optional[int] = None
) -> Dict[str, any]:
"""
전체 시나리오 Lifetime PD 산출 + 가중평균
Parameters
----------
z_paths : Dict[str, np.ndarray]
시나리오별 Zt 경로
scenario_weights : Dict[str, float]
시나리오별 확률가중치
horizon : int
예측 기간
Returns
-------
dict with keys:
- "by_scenario": {scenario: {cumulative_pd, marginal_pd, ...}}
- "weighted_cumulative_pd": shape (horizon, N-1)
- "weighted_marginal_pd": shape (horizon, N-1)
"""
results = {"by_scenario": {}}
weighted_cum = None
weighted_marginal = None
for scenario_name, z_path in z_paths.items():
logger.info(f"시나리오 '{scenario_name}' PD 산출 중...")
result = self.compute_lifetime_pd(z_path, horizon)
results["by_scenario"][scenario_name] = result
weight = scenario_weights.get(scenario_name, 1.0 / len(z_paths))
if weighted_cum is None:
weighted_cum = weight * result["cumulative_pd"]
weighted_marginal = weight * result["marginal_pd"]
else:
weighted_cum += weight * result["cumulative_pd"]
weighted_marginal += weight * result["marginal_pd"]
results["weighted_cumulative_pd"] = weighted_cum
results["weighted_marginal_pd"] = weighted_marginal
results["weighted_survival_prob"] = 1.0 - weighted_cum
return results
def format_pd_table(
self,
results: Dict,
years: List[int] = None,
scenario: str = None
) -> pd.DataFrame:
"""
PD 결과를 DataFrame 테이블로 포매팅
Parameters
----------
results : dict
compute_all_scenarios() 결과
years : List[int]
표시할 연도 목록 (기본: 1,2,3,5,7,10,15,20,30,50)
scenario : str
특정 시나리오 (None이면 가중평균)
Returns
-------
pd.DataFrame
index=연도, columns=등급
"""
if years is None:
years = [1, 2, 3, 5, 7, 10, 15, 20, 30, 50]
if scenario is not None:
cum_pd = results["by_scenario"][scenario]["cumulative_pd"]
else:
cum_pd = results["weighted_cumulative_pd"]
# 호라이즌 범위 내 연도만 선택
max_t = cum_pd.shape[0]
valid_years = [y for y in years if y <= max_t]
data = {}
for y in valid_years:
data[y] = cum_pd[y - 1] # 0-indexed
df = pd.DataFrame(data, index=self.non_default_grades).T
df.index.name = ""
return df
def format_marginal_pd_table(
self,
results: Dict,
years: List[int] = None,
scenario: str = None
) -> pd.DataFrame:
"""한계 PD를 DataFrame으로 포매팅"""
if years is None:
years = [1, 2, 3, 5, 7, 10, 15, 20, 30, 50]
if scenario is not None:
m_pd = results["by_scenario"][scenario]["marginal_pd"]
else:
m_pd = results["weighted_marginal_pd"]
max_t = m_pd.shape[0]
valid_years = [y for y in years if y <= max_t]
data = {}
for y in valid_years:
data[y] = m_pd[y - 1]
df = pd.DataFrame(data, index=self.non_default_grades).T
df.index.name = ""
return df
def compute_ecl_weights(
marginal_pds: np.ndarray,
lgd: float = 0.45,
discount_rate: float = 0.03,
horizon: int = None
) -> np.ndarray:
"""
ECL (Expected Credit Loss) 계산 보조 함수
ECL = Σ_t [PD_marginal(t) × LGD × DF(t)]
Parameters
----------
marginal_pds : np.ndarray
한계 PD 배열 (등급별)
lgd : float
부도시 손실률 (LGD), 기본 45% (Basel IRB)
discount_rate : float
할인율, 기본 3%
Returns
-------
np.ndarray : 등급별 누적 ECL
"""
if horizon is None:
horizon = marginal_pds.shape[0]
ecl = np.zeros(marginal_pds.shape[1] if marginal_pds.ndim > 1 else 1)
for t in range(horizon):
df = 1.0 / (1.0 + discount_rate) ** (t + 1)
if marginal_pds.ndim > 1:
ecl += marginal_pds[t] * lgd * df
else:
ecl += marginal_pds[t] * lgd * df
return ecl