# -*- coding: utf-8 -*-
"""
CCC interpolation module: 7x7 -> 8x8

B and D rows/columns are used to create a synthetic CCC grade
via geometric mean (log-interpolation) of transition probabilities.

This module runs AFTER Zt estimation (which uses 7x7 matrices)
to produce the final 8x8 matrices for Lifetime PD projection.

Usage:
    from data.ccc_interpolator import expand_to_8x8
    tm_8x8 = expand_to_8x8(tm_7x7)
"""

from typing import Optional

import numpy as np


# 7x7 index: AAA=0, AA=1, A=2, BBB=3, BB=4, B=5, D=6
# 8x8 index: AAA=0, AA=1, A=2, BBB=3, BB=4, B=5, CCC=6, D=7
GRADES_7 = ["AAA", "AA", "A", "BBB", "BB", "B", "D"]
GRADES_8 = ["AAA", "AA", "A", "BBB", "BB", "B", "CCC", "D"]

# Share of each grade's 7x7 default probability that is re-routed to CCC.
# The share is 30% for B and drops by 6 points per grade above B, floored
# at 2%:  B 30%, BB 24%, BBB 18%, A 12%, AA 6%, AAA 2%.
_CCC_SHARE_AT_B = 0.30
_CCC_SHARE_STEP = 0.06
_CCC_SHARE_FLOOR = 0.02

# Bounds applied to the synthetic CCC row before normalization.
_CCC_PD_MIN_MULTIPLE_OF_B = 1.5  # CCC PD is at least 1.5x the 7x7 B PD ...
_CCC_PD_CAP = 0.60               # ... but never above 60% (the cap wins)
_CCC_STAY_FLOOR = 0.30           # CCC -> CCC is at least 30%

# Scaling of the B row's entries when reused as CCC upgrade probabilities.
_CCC_UPGRADE_SCALE = 0.3         # CCC -> AAA..BB
_CCC_TO_B_SCALE = 0.5            # CCC -> B


def expand_to_8x8(
    tm_7x7: np.ndarray,
    alpha: float = 0.5,
    method: str = "geometric"
) -> np.ndarray:
    """
    7x7 transition matrix -> 8x8 with CCC interpolated between B and D.

    The CCC row is interpolated from B row and D row.
    The CCC column is created by splitting the D column of the
    grades above CCC (AAA~B).

    Every row of the result with a positive sum is rescaled to sum to 1.
    The input matrix is not modified.

    Parameters
    ----------
    tm_7x7 : np.ndarray
        7x7 (AAA, AA, A, BBB, BB, B, D) probability matrix.
        Any array-like convertible to a float array is accepted.
    alpha : float
        Interpolation weight placed on the B row; the D row gets
        ``1 - alpha`` (0.5 = geometric midpoint between B and D).
    method : str
        'geometric': log-interpolation (default)
        'linear': linear interpolation
        Any value other than 'geometric' is treated as 'linear'.

    Returns
    -------
    np.ndarray
        8x8 (AAA, AA, A, BBB, BB, B, CCC, D) probability matrix
    """
    tm_7x7 = np.asarray(tm_7x7, dtype=float)
    assert tm_7x7.shape == (7, 7), f"Expected (7,7), got {tm_7x7.shape}"

    tm_8x8 = np.zeros((8, 8))

    # --- Step 1: Copy existing grades (AAA~B) rows/cols ---
    # 7x7 index mapping: 0-5 -> 0-5 (AAA~B), 6 -> 7 (D)
    tm_8x8[:6, :6] = tm_7x7[:6, :6]

    # --- Step 2: CCC column (col6) for existing grades ---
    # Split part of each grade's D probability off into CCC.
    # Rationale: some firms default through CCC before reaching D.
    pd_7 = tm_7x7[:6, 6]                      # P(i -> D) in 7x7
    grades_above_b = 5 - np.arange(6)         # 5 for AAA ... 0 for B
    ccc_share = np.maximum(
        _CCC_SHARE_AT_B - grades_above_b * _CCC_SHARE_STEP,
        _CCC_SHARE_FLOOR,
    )
    # Rows whose PD is not strictly positive keep their D entry untouched.
    to_ccc = np.where(pd_7 > 0, pd_7 * ccc_share, 0.0)
    tm_8x8[:6, 6] = to_ccc
    tm_8x8[:6, 7] = pd_7 - to_ccc

    # --- Step 3: CCC row (row 6) via interpolation ---
    # B row (7x7 row5) lifted to 8x8 space; its CCC slot stays 0.
    b_row = np.zeros(8)
    b_row[:6] = tm_7x7[5, :6]
    b_row[7] = tm_7x7[5, 6]

    # D row in 8x8: absorbing state
    d_row = np.zeros(8)
    d_row[7] = 1.0

    if method == "geometric":
        # Geometric interpolation in log space
        ccc_row = _geometric_interp(b_row, d_row, alpha)
    else:
        # Linear interpolation
        ccc_row = alpha * b_row + (1 - alpha) * d_row

    # CCC should default more than B: lift the interpolated PD to at
    # least 1.5x the B PD, then cap it at 60%.
    ccc_pd = max(ccc_row[7], b_row[7] * _CCC_PD_MIN_MULTIPLE_OF_B)
    ccc_pd = min(ccc_pd, _CCC_PD_CAP)

    # CCC stay rate: what the *interpolated* row leaves over after the
    # PD, floored at 30%.  It must be computed before the interpolated
    # upgrade entries are overwritten below.
    ccc_stay = max(
        1.0 - ccc_pd - ccc_row[:6].sum() - ccc_row[6],
        _CCC_STAY_FLOOR,
    )

    # Reassemble CCC row: upgrades come from the B row, scaled down
    # because CCC upgrades less than B.
    ccc_row[:5] = b_row[:5] * _CCC_UPGRADE_SCALE  # CCC -> AAA~BB
    ccc_row[5] = b_row[5] * _CCC_TO_B_SCALE       # CCC -> B (upgrade)
    ccc_row[6] = ccc_stay                         # CCC -> CCC (stay)
    ccc_row[7] = ccc_pd                           # CCC -> D

    tm_8x8[6, :] = ccc_row

    # --- Step 4: D row (absorbing state) ---
    tm_8x8[7, :] = 0.0
    tm_8x8[7, 7] = 1.0

    # --- Step 5: Normalize rows ---
    # The reassembled CCC row generally does not sum to 1; rows with a
    # non-positive sum are left as they are.
    for row in tm_8x8:
        total = row.sum()
        if total > 0:
            row /= total

    return tm_8x8


def _geometric_interp(
    row_a: np.ndarray,
    row_b: np.ndarray,
    alpha: float = 0.5,
    eps: float = 1e-10
) -> np.ndarray:
    """Geometric (log-space) interpolation between two probability rows.

    Computes ``exp(alpha * log(a) + (1 - alpha) * log(b))`` element-wise,
    after raising every entry of both rows to at least ``eps`` so the
    logarithm is defined for zero probabilities.

    The result is always a float array, regardless of the input dtype.
    """
    log_a = np.log(np.maximum(np.asarray(row_a, dtype=float), eps))
    log_b = np.log(np.maximum(np.asarray(row_b, dtype=float), eps))
    return np.exp(alpha * log_a + (1 - alpha) * log_b)


def expand_conditional_tm(
    cond_7x7: np.ndarray,
    ttc_8x8: Optional[np.ndarray] = None
) -> np.ndarray:
    """
    Expand a Z-conditional 7x7 TM to 8x8 using the same interpolation.

    This is used in the lifetime PD projection pipeline:
    1. Estimate Zt from 7x7 matrices
    2. Generate Z-conditional 7x7 TM
    3. Expand to 8x8 for lifetime PD calculation

    Parameters
    ----------
    cond_7x7 : np.ndarray
        Z-conditional 7x7 transition matrix
    ttc_8x8 : np.ndarray, optional
        Reference TTC 8x8 for CCC structure (if available).
        Currently ignored: the expansion is delegated to
        ``expand_to_8x8`` with its default settings.

    Returns
    -------
    np.ndarray
        8x8 matrix produced by ``expand_to_8x8(cond_7x7)``
    """
    return expand_to_8x8(cond_7x7)


if __name__ == "__main__":
    import sys
    # Make the project-local ``data`` package importable when this file
    # is run from the current working directory.
    sys.path.insert(0, ".")

    from data.transition_matrices import load_transition_matrices, compute_ttc_matrix

    matrices = load_transition_matrices(source="real")
    ttc_7x7 = compute_ttc_matrix(matrices)

    print("=== TTC 7x7 ===")
    for i, g in enumerate(GRADES_7):
        print(f" {g:>4}: [{', '.join(f'{v:.4f}' for v in ttc_7x7[i])}]")

    ttc_8x8 = expand_to_8x8(ttc_7x7)

    print("\n=== TTC 8x8 (CCC interpolated) ===")
    for i, g in enumerate(GRADES_8):
        print(f" {g:>4}: [{', '.join(f'{v:.4f}' for v in ttc_8x8[i])}]")

    # Verify: PD ordering
    print("\n=== PD ordering check ===")
    for i, g in enumerate(GRADES_8[:-1]):
        print(f" {g:>4}: PD = {ttc_8x8[i, -1]*10000:.1f}bp")

    # Check row sums
    print("\n=== Row sum check ===")
    for i in range(8):
        print(f" {GRADES_8[i]:>4}: sum = {ttc_8x8[i].sum():.6f}")