""" 50년 Lifetime PD 산출 엔진 시나리오별 Zt 경로와 TTC 전이행렬을 결합하여: 1. 연도별 조건부 전이행렬 산출 2. 순차적 행렬 곱으로 누적 전이확률 계산 3. Marginal PD / Cumulative PD / 시나리오 가중평균 PD 산출 IFRS 9 ECL에 직접 사용 가능한 PD Term Structure 출력 참고문헌: - IFRS 9 Financial Instruments (IASB, 2014) - EBA Guidelines on IFRS 9 implementation - Basel Committee BCBS 350 (Credit Risk) """ import numpy as np import pandas as pd from typing import Dict, List, Optional, Tuple from models.vasicek import conditional_transition_matrix from data.transition_matrices import RATING_GRADES import logging logger = logging.getLogger(__name__) class LifetimePDEngine: """ 50년 Lifetime PD 산출 엔진 Process: 1. 각 연도 t에 대해 Zt로 조건부 전이행렬 TM(Zt) 산출 2. 누적 전이행렬 = TM(Z1) × TM(Z2) × ... × TM(Zt) 3. 누적 전이행렬의 D열이 누적 PD 4. 한계 PD = Cumulative PD(t) - Cumulative PD(t-1) """ def __init__( self, ttc_matrix: np.ndarray, rho: float = 0.20, rating_grades: List[str] = None ): """ Parameters ---------- ttc_matrix : np.ndarray N×N TTC 전이행렬 rho : float 자산상관계수 rating_grades : List[str] 등급 레이블 """ self.ttc_matrix = ttc_matrix self.rho = rho self.n_grades = ttc_matrix.shape[0] self.grades = rating_grades or RATING_GRADES self.non_default_grades = self.grades[:-1] # D 제외 def compute_lifetime_pd( self, z_path: np.ndarray, horizon: Optional[int] = None ) -> Dict[str, np.ndarray]: """ 단일 시나리오의 Lifetime PD 산출 Parameters ---------- z_path : np.ndarray Zt 경로 (길이 = horizon) horizon : int 예측 기간 (기본: z_path 길이) Returns ------- dict with keys: - "cumulative_pd": shape (horizon, N-1) — 등급별 누적 PD - "marginal_pd": shape (horizon, N-1) — 등급별 한계 PD - "survival_prob": shape (horizon, N-1) — 등급별 생존확률 - "conditional_tms": list of 전이행렬 (디버깅용) """ if horizon is None: horizon = len(z_path) cumulative_tm = np.eye(self.n_grades) cumulative_pds = [] conditional_tms = [] for t in range(horizon): z_t = z_path[t] if t < len(z_path) else 0.0 # 조건부 전이행렬 산출 cond_tm = conditional_transition_matrix(self.ttc_matrix, z_t, self.rho) conditional_tms.append(cond_tm) # 누적 전이행렬 cumulative_tm = cumulative_tm @ cond_tm # 누적 PD = D열 (마지막 열) cum_pd = cumulative_tm[:-1, -1].copy() cumulative_pds.append(cum_pd) cumulative_pds = np.array(cumulative_pds) # shape: (horizon, N-1) # 한계 PD marginal_pds = np.zeros_like(cumulative_pds) marginal_pds[0] = cumulative_pds[0] for t in range(1, horizon): marginal_pds[t] = np.maximum(cumulative_pds[t] - cumulative_pds[t - 1], 0.0) # 생존확률 survival_probs = 1.0 - cumulative_pds return { "cumulative_pd": cumulative_pds, "marginal_pd": marginal_pds, "survival_prob": survival_probs, "conditional_tms": conditional_tms, } def compute_all_scenarios( self, z_paths: Dict[str, np.ndarray], scenario_weights: Dict[str, float], horizon: Optional[int] = None ) -> Dict[str, any]: """ 전체 시나리오 Lifetime PD 산출 + 가중평균 Parameters ---------- z_paths : Dict[str, np.ndarray] 시나리오별 Zt 경로 scenario_weights : Dict[str, float] 시나리오별 확률가중치 horizon : int 예측 기간 Returns ------- dict with keys: - "by_scenario": {scenario: {cumulative_pd, marginal_pd, ...}} - "weighted_cumulative_pd": shape (horizon, N-1) - "weighted_marginal_pd": shape (horizon, N-1) """ results = {"by_scenario": {}} weighted_cum = None weighted_marginal = None for scenario_name, z_path in z_paths.items(): logger.info(f"시나리오 '{scenario_name}' PD 산출 중...") result = self.compute_lifetime_pd(z_path, horizon) results["by_scenario"][scenario_name] = result weight = scenario_weights.get(scenario_name, 1.0 / len(z_paths)) if weighted_cum is None: weighted_cum = weight * result["cumulative_pd"] weighted_marginal = weight * result["marginal_pd"] else: weighted_cum += weight * result["cumulative_pd"] weighted_marginal += weight * result["marginal_pd"] results["weighted_cumulative_pd"] = weighted_cum results["weighted_marginal_pd"] = weighted_marginal results["weighted_survival_prob"] = 1.0 - weighted_cum return results def format_pd_table( self, results: Dict, years: List[int] = None, scenario: str = None ) -> pd.DataFrame: """ PD 결과를 DataFrame 테이블로 포매팅 Parameters ---------- results : dict compute_all_scenarios() 결과 years : List[int] 표시할 연도 목록 (기본: 1,2,3,5,7,10,15,20,30,50) scenario : str 특정 시나리오 (None이면 가중평균) Returns ------- pd.DataFrame index=연도, columns=등급 """ if years is None: years = [1, 2, 3, 5, 7, 10, 15, 20, 30, 50] if scenario is not None: cum_pd = results["by_scenario"][scenario]["cumulative_pd"] else: cum_pd = results["weighted_cumulative_pd"] # 호라이즌 범위 내 연도만 선택 max_t = cum_pd.shape[0] valid_years = [y for y in years if y <= max_t] data = {} for y in valid_years: data[y] = cum_pd[y - 1] # 0-indexed df = pd.DataFrame(data, index=self.non_default_grades).T df.index.name = "년" return df def format_marginal_pd_table( self, results: Dict, years: List[int] = None, scenario: str = None ) -> pd.DataFrame: """한계 PD를 DataFrame으로 포매팅""" if years is None: years = [1, 2, 3, 5, 7, 10, 15, 20, 30, 50] if scenario is not None: m_pd = results["by_scenario"][scenario]["marginal_pd"] else: m_pd = results["weighted_marginal_pd"] max_t = m_pd.shape[0] valid_years = [y for y in years if y <= max_t] data = {} for y in valid_years: data[y] = m_pd[y - 1] df = pd.DataFrame(data, index=self.non_default_grades).T df.index.name = "년" return df def compute_ecl_weights( marginal_pds: np.ndarray, lgd: float = 0.45, discount_rate: float = 0.03, horizon: int = None ) -> np.ndarray: """ ECL (Expected Credit Loss) 계산 보조 함수 ECL = Σ_t [PD_marginal(t) × LGD × DF(t)] Parameters ---------- marginal_pds : np.ndarray 한계 PD 배열 (등급별) lgd : float 부도시 손실률 (LGD), 기본 45% (Basel IRB) discount_rate : float 할인율, 기본 3% Returns ------- np.ndarray : 등급별 누적 ECL """ if horizon is None: horizon = marginal_pds.shape[0] ecl = np.zeros(marginal_pds.shape[1] if marginal_pds.ndim > 1 else 1) for t in range(horizon): df = 1.0 / (1.0 + discount_rate) ** (t + 1) if marginal_pds.ndim > 1: ecl += marginal_pds[t] * lgd * df else: ecl += marginal_pds[t] * lgd * df return ecl