Compare commits
2 Commits
b8514c1251
...
a406d98226
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
a406d98226 | ||
|
|
2b94cc802d |
@@ -18,14 +18,14 @@ ecos:
|
||||
# 전이행렬 데이터 소스
|
||||
data:
|
||||
transition_source: "real" # "real" (3사 실제) | "builtin" (내장 샘플)
|
||||
transition_dir: null # null이면 기본 data/real/
|
||||
transition_dir: null # null이면 기본 data/real_v2/
|
||||
|
||||
# 모형 파라미터
|
||||
model:
|
||||
# 자산상관계수 (Basel IRB 기준 0.12~0.24, 기업 평균 ~0.20)
|
||||
rho: 0.20
|
||||
# 신용등급 체계 (한국 3사 공통)
|
||||
rating_grades: ["AAA", "AA", "A", "BBB", "BB", "B", "CCC", "D"]
|
||||
rating_grades: ["AAA", "AA", "A", "BBB", "BB", "B", "D"] # 7x7 (CCC제외, Zt추정용)
|
||||
|
||||
# 시나리오 설정
|
||||
scenarios:
|
||||
|
||||
197
data/ccc_interpolator.py
Normal file
197
data/ccc_interpolator.py
Normal file
@@ -0,0 +1,197 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""
|
||||
CCC interpolation module: 7x7 -> 8x8
|
||||
|
||||
B and D rows/columns are used to create a synthetic CCC grade
|
||||
via geometric mean (log-interpolation) of transition probabilities.
|
||||
|
||||
This module runs AFTER Zt estimation (which uses 7x7 matrices)
|
||||
to produce the final 8x8 matrices for Lifetime PD projection.
|
||||
|
||||
Usage:
|
||||
from data.ccc_interpolator import expand_to_8x8
|
||||
tm_8x8 = expand_to_8x8(tm_7x7)
|
||||
"""
|
||||
|
||||
import numpy as np
|
||||
from typing import Optional
|
||||
|
||||
|
||||
# 7x7 index: AAA=0, AA=1, A=2, BBB=3, BB=4, B=5, D=6
|
||||
# 8x8 index: AAA=0, AA=1, A=2, BBB=3, BB=4, B=5, CCC=6, D=7
|
||||
GRADES_7 = ["AAA", "AA", "A", "BBB", "BB", "B", "D"]
|
||||
GRADES_8 = ["AAA", "AA", "A", "BBB", "BB", "B", "CCC", "D"]
|
||||
|
||||
|
||||
def expand_to_8x8(
|
||||
tm_7x7: np.ndarray,
|
||||
alpha: float = 0.5,
|
||||
method: str = "geometric"
|
||||
) -> np.ndarray:
|
||||
"""
|
||||
7x7 transition matrix -> 8x8 with CCC interpolated between B and D.
|
||||
|
||||
The CCC row is interpolated from B row and D row.
|
||||
The CCC column is created by splitting the D column for grades above CCC.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
tm_7x7 : np.ndarray
|
||||
7x7 (AAA, AA, A, BBB, BB, B, D) probability matrix
|
||||
alpha : float
|
||||
Interpolation weight (0.5 = geometric midpoint between B and D)
|
||||
method : str
|
||||
'geometric': log-interpolation (default)
|
||||
'linear': linear interpolation
|
||||
|
||||
Returns
|
||||
-------
|
||||
np.ndarray
|
||||
8x8 (AAA, AA, A, BBB, BB, B, CCC, D) probability matrix
|
||||
"""
|
||||
assert tm_7x7.shape == (7, 7), f"Expected (7,7), got {tm_7x7.shape}"
|
||||
|
||||
tm_8x8 = np.zeros((8, 8))
|
||||
|
||||
# --- Step 1: Copy existing grades (AAA~B) rows/cols ---
|
||||
# 7x7 index mapping: 0-5 -> 0-5 (AAA~B), 6 -> 7 (D)
|
||||
for i in range(6): # AAA~B rows
|
||||
for j in range(6): # AAA~B cols
|
||||
tm_8x8[i, j] = tm_7x7[i, j]
|
||||
# D col: 7x7 col6 -> 8x8 col7
|
||||
tm_8x8[i, 7] = tm_7x7[i, 6]
|
||||
|
||||
# --- Step 2: CCC column (col6) for existing grades ---
|
||||
# For each grade AAA~B, split some probability from D column to CCC
|
||||
# Rationale: some firms default through CCC before reaching D
|
||||
for i in range(6):
|
||||
pd_i = tm_7x7[i, 6] # P(i -> D) in 7x7
|
||||
if pd_i > 0:
|
||||
# B row: larger CCC fraction (B is adjacent to CCC)
|
||||
# Higher grades: smaller CCC fraction
|
||||
grade_distance_from_b = max(5 - i, 0)
|
||||
# B->CCC gets ~30%, BB->CCC ~20%, BBB->CCC ~10%, A->CCC ~5%
|
||||
ccc_fraction = max(0.30 - grade_distance_from_b * 0.06, 0.02)
|
||||
ccc_prob = pd_i * ccc_fraction
|
||||
tm_8x8[i, 6] = ccc_prob # to CCC
|
||||
tm_8x8[i, 7] = pd_i - ccc_prob # remaining to D
|
||||
else:
|
||||
tm_8x8[i, 6] = 0.0
|
||||
|
||||
# --- Step 3: CCC row (row 6) via interpolation ---
|
||||
b_row = np.zeros(8)
|
||||
d_row = np.zeros(8)
|
||||
|
||||
# Expand B row (7x7 row5) to 8x8 space
|
||||
for j in range(6):
|
||||
b_row[j] = tm_7x7[5, j]
|
||||
b_row[6] = 0.0 # placeholder for CCC
|
||||
b_row[7] = tm_7x7[5, 6]
|
||||
|
||||
# D row in 8x8: absorbing state
|
||||
d_row[7] = 1.0
|
||||
|
||||
if method == "geometric":
|
||||
# Geometric interpolation in log space
|
||||
ccc_row = _geometric_interp(b_row, d_row, alpha)
|
||||
else:
|
||||
# Linear interpolation
|
||||
ccc_row = alpha * b_row + (1 - alpha) * d_row
|
||||
|
||||
# Ensure CCC PD is between B PD and 1.0
|
||||
# CCC should default more than B
|
||||
ccc_pd = max(ccc_row[7], b_row[7] * 1.5)
|
||||
ccc_pd = min(ccc_pd, 0.60) # cap at 60%
|
||||
|
||||
# CCC stay rate
|
||||
ccc_stay = max(1.0 - ccc_pd - ccc_row[:6].sum() - ccc_row[6], 0.30)
|
||||
|
||||
# Reassemble CCC row
|
||||
# Upgrade probabilities from B row, scaled down
|
||||
for j in range(5): # AAA~BB: very small upgrade from CCC
|
||||
ccc_row[j] = b_row[j] * 0.3 # CCC upgrades less than B
|
||||
|
||||
ccc_row[5] = b_row[5] * 0.5 # CCC -> B (upgrade)
|
||||
ccc_row[6] = ccc_stay # CCC -> CCC (stay)
|
||||
ccc_row[7] = ccc_pd # CCC -> D
|
||||
|
||||
tm_8x8[6, :] = ccc_row
|
||||
|
||||
# --- Step 4: D row (absorbing state) ---
|
||||
tm_8x8[7, :] = 0.0
|
||||
tm_8x8[7, 7] = 1.0
|
||||
|
||||
# --- Step 5: Normalize rows ---
|
||||
for i in range(8):
|
||||
s = tm_8x8[i].sum()
|
||||
if s > 0:
|
||||
tm_8x8[i] /= s
|
||||
|
||||
return tm_8x8
|
||||
|
||||
|
||||
def _geometric_interp(
|
||||
row_a: np.ndarray,
|
||||
row_b: np.ndarray,
|
||||
alpha: float = 0.5,
|
||||
eps: float = 1e-10
|
||||
) -> np.ndarray:
|
||||
"""Geometric (log-space) interpolation between two probability rows."""
|
||||
result = np.zeros_like(row_a)
|
||||
for j in range(len(row_a)):
|
||||
a = max(row_a[j], eps)
|
||||
b = max(row_b[j], eps)
|
||||
result[j] = np.exp(alpha * np.log(a) + (1 - alpha) * np.log(b))
|
||||
return result
|
||||
|
||||
|
||||
def expand_conditional_tm(
|
||||
cond_7x7: np.ndarray,
|
||||
ttc_8x8: np.ndarray = None
|
||||
) -> np.ndarray:
|
||||
"""
|
||||
Expand a Z-conditional 7x7 TM to 8x8 using the same interpolation.
|
||||
|
||||
This is used in the lifetime PD projection pipeline:
|
||||
1. Estimate Zt from 7x7 matrices
|
||||
2. Generate Z-conditional 7x7 TM
|
||||
3. Expand to 8x8 for lifetime PD calculation
|
||||
|
||||
Parameters
|
||||
----------
|
||||
cond_7x7 : np.ndarray
|
||||
Z-conditional 7x7 transition matrix
|
||||
ttc_8x8 : np.ndarray, optional
|
||||
Reference TTC 8x8 for CCC structure (if available)
|
||||
"""
|
||||
return expand_to_8x8(cond_7x7)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
import sys
|
||||
sys.path.insert(0, ".")
|
||||
|
||||
from data.transition_matrices import load_transition_matrices, compute_ttc_matrix
|
||||
|
||||
matrices = load_transition_matrices(source="real")
|
||||
ttc_7x7 = compute_ttc_matrix(matrices)
|
||||
|
||||
print("=== TTC 7x7 ===")
|
||||
for i, g in enumerate(GRADES_7):
|
||||
print(f" {g:>4}: [{', '.join(f'{v:.4f}' for v in ttc_7x7[i])}]")
|
||||
|
||||
ttc_8x8 = expand_to_8x8(ttc_7x7)
|
||||
|
||||
print("\n=== TTC 8x8 (CCC interpolated) ===")
|
||||
for i, g in enumerate(GRADES_8):
|
||||
print(f" {g:>4}: [{', '.join(f'{v:.4f}' for v in ttc_8x8[i])}]")
|
||||
|
||||
# Verify: PD ordering
|
||||
print("\n=== PD ordering check ===")
|
||||
for i, g in enumerate(GRADES_8[:-1]):
|
||||
print(f" {g:>4}: PD = {ttc_8x8[i, -1]*10000:.1f}bp")
|
||||
|
||||
# Check row sums
|
||||
print("\n=== Row sum check ===")
|
||||
for i in range(8):
|
||||
print(f" {GRADES_8[i]:>4}: sum = {ttc_8x8[i].sum():.6f}")
|
||||
@@ -15,8 +15,12 @@ from pathlib import Path
|
||||
from typing import Dict, List, Optional, Tuple
|
||||
|
||||
|
||||
# 등급 레이블
|
||||
RATING_GRADES = ["AAA", "AA", "A", "BBB", "BB", "B", "CCC", "D"]
|
||||
# Rating grade labels
|
||||
RATING_GRADES_8 = ["AAA", "AA", "A", "BBB", "BB", "B", "CCC", "D"]
|
||||
RATING_GRADES_7 = ["AAA", "AA", "A", "BBB", "BB", "B", "D"]
|
||||
|
||||
# Default: 7x7 (CCC excluded for Zt estimation)
|
||||
RATING_GRADES = RATING_GRADES_7
|
||||
N_GRADES = len(RATING_GRADES)
|
||||
|
||||
|
||||
@@ -205,7 +209,7 @@ def _load_real_matrices(data_dir: Optional[str] = None) -> Dict[int, np.ndarray]
|
||||
parse_pdf_matrices.py 로 생성된 3사 평균 CSV 사용.
|
||||
"""
|
||||
if data_dir is None:
|
||||
data_dir = str(Path(__file__).parent / "real")
|
||||
data_dir = str(Path(__file__).parent / "real_v2")
|
||||
real_dir = Path(data_dir)
|
||||
|
||||
if not real_dir.exists():
|
||||
@@ -320,7 +324,11 @@ def get_default_rates(matrices: Dict[int, np.ndarray]) -> pd.DataFrame:
|
||||
index=연도, columns=등급, values=연간 PD
|
||||
"""
|
||||
years = sorted(matrices.keys())
|
||||
grades = RATING_GRADES[:-1] # D 제외
|
||||
n = list(matrices.values())[0].shape[0]
|
||||
if n == 7:
|
||||
grades = RATING_GRADES_7[:-1] # AAA~B (D 제외)
|
||||
else:
|
||||
grades = RATING_GRADES_8[:-1] # AAA~CCC (D 제외)
|
||||
|
||||
data = {}
|
||||
for year in years:
|
||||
@@ -332,10 +340,15 @@ def get_default_rates(matrices: Dict[int, np.ndarray]) -> pd.DataFrame:
|
||||
|
||||
def display_matrix(tm: np.ndarray, title: str = "전이행렬") -> str:
|
||||
"""전이행렬을 보기 좋게 포매팅"""
|
||||
n = tm.shape[0]
|
||||
if n == 7:
|
||||
grades = RATING_GRADES_7
|
||||
else:
|
||||
grades = RATING_GRADES_8
|
||||
df = pd.DataFrame(
|
||||
tm,
|
||||
index=RATING_GRADES,
|
||||
columns=RATING_GRADES
|
||||
index=grades,
|
||||
columns=grades
|
||||
)
|
||||
# 백분율 표시
|
||||
df_pct = df * 100
|
||||
|
||||
Reference in New Issue
Block a user