# Patch metadata (preserved from the original `git format-patch` header):
#   From 0c0b2129eb8f2e099eb4de4206a2da8361fce10a Mon Sep 17 00:00:00 2001
#   From: EDF Agent
#   Date: Wed, 11 Mar 2026 23:50:15 +0900
#   Subject: [PATCH] feat(shadow): Shadow Rating + default rate blending
#            with Bayesian posterior (#315 #316)
#   src/models/shadow_rating.py | 300 ++++ (new file, index 0000000..e490d46)
"""
Shadow Rating + per-grade default rate module

1) Ordered-Probit-style Shadow Rating: DD + financial ratios -> estimated credit rating
2) Per-grade default rates: blend of Korean observations and global benchmarks
3) Bayesian adjustment: use prior information for grades with too few samples

Usage:
    python -m src.models.shadow_rating          # full run
    python -m src.models.shadow_rating --stats  # statistics only
"""
import sys
import argparse
import yaml
import numpy as np
import pandas as pd
from datetime import datetime
from pathlib import Path
from scipy.stats import norm
from scipy.optimize import minimize

# Make the repository root importable so the `src.*` imports below resolve
# when the file is run directly.
sys.path.insert(0, str(Path(__file__).parent.parent.parent))

from src.data.database import get_connection, init_db
from src.models.merton import (
    DD_RATING_MAP, GLOBAL_DEFAULT_RATES, dd_to_rating
)


def load_config() -> dict:
    """Load ``config/settings.yaml`` from the repository root.

    Returns:
        The parsed YAML document. An empty settings file yields ``{}``
        rather than ``None`` so callers can use ``config.get(...)`` safely.

    Raises:
        FileNotFoundError: if the settings file does not exist.
        yaml.YAMLError: if the file is not valid YAML.
    """
    config_path = Path(__file__).parent.parent.parent / "config" / "settings.yaml"
    with open(config_path, "r", encoding="utf-8") as f:
        # safe_load returns None for an empty document; normalise to a dict.
        return yaml.safe_load(f) or {}


# ============================================================
# 1.
# Shadow Rating: financial-ratio enrichment
# ============================================================
def compute_shadow_features(conn) -> pd.DataFrame:
    """Merge Merton results with financial ratios into the Shadow Rating input.

    Args:
        conn: DB connection accepted by ``pandas.read_sql_query``.

    Returns:
        One row per joined (merton_results, financial_data, companies) record,
        with the derived columns ``equity_ratio``, ``size_score``,
        ``icr_capped`` and ``roa_capped`` added.
    """
    # NOTE(review): if financial_data holds several rows per ticker this join
    # yields duplicate tickers -- confirm against the schema.
    query = """
        SELECT
            mr.ticker,
            mr.DD,
            mr.EDF,
            mr.sigma_V,
            mr.leverage as merton_leverage,
            mr.dd_rating,
            mr.method,
            f.leverage_ratio,
            f.roa,
            f.interest_coverage,
            f.log_assets,
            f.total_assets,
            f.total_equity,
            f.operating_income,
            f.net_income,
            c.name
        FROM merton_results mr
        JOIN financial_data f ON mr.ticker = f.ticker
        JOIN companies c ON mr.ticker = c.ticker
    """
    df = pd.read_sql_query(query, conn)

    # Additional ratios; a zero asset base becomes NaN instead of dividing by 0.
    df["equity_ratio"] = df["total_equity"] / df["total_assets"].replace(0, np.nan)
    df["size_score"] = df["log_assets"].rank(pct=True)  # size percentile

    # Cap the interest-coverage ratio to tame extremes; missing values -> 0.
    df["icr_capped"] = df["interest_coverage"].clip(-10, 100).fillna(0)

    # Cap ROA to [-1, 1]; missing values -> 0.
    df["roa_capped"] = df["roa"].clip(-1, 1).fillna(0)

    return df


def compute_composite_score(df: pd.DataFrame) -> pd.DataFrame:
    """Combine DD and financial ratios into a single composite score.

    A higher score means better credit quality (same direction as DD).

    Args:
        df: frame with ``DD``, ``leverage_ratio``, ``roa_capped``,
            ``icr_capped`` and ``size_score`` columns.

    Returns:
        A copy of ``df`` with a ``composite_score`` column added.
    """
    df = df.copy()

    def zscore(s):
        """Standardise ``s``; a column with no spread contributes 0."""
        std = s.std()
        # std is 0 for a constant column and NaN when there are fewer than two
        # observations (sample std, ddof=1). Either way there is no spread to
        # standardise, so return zeros rather than propagating NaN/inf.
        if pd.isna(std) or std == 0:
            return pd.Series(0.0, index=s.index)
        return (s - s.mean()) / std

    z_dd = zscore(df["DD"])
    z_lev = -zscore(df["leverage_ratio"].fillna(0.5))  # leverage: lower is better
    z_roa = zscore(df["roa_capped"])
    z_icr = zscore(df["icr_capped"])
    z_size = zscore(df["size_score"])

    # Weighted sum -- DD carries the largest weight.
    df["composite_score"] = (
        0.50 * z_dd +    # distance-to-default (core signal)
        0.15 * z_lev +   # leverage
        0.15 * z_roa +   # profitability
        0.10 * z_icr +   # interest coverage
        0.10 * z_size    # size
    )

    return df


def assign_shadow_rating(df: pd.DataFrame) -> pd.DataFrame:
    """Assign a Shadow Rating from the composite-score ranking.

    Score quantiles are used instead of an Ordered Probit fit: with almost no
    observed ratings an Ordered Probit cannot be estimated, so firms are ranked
    by score and grades are handed out to match an approximate global grade
    distribution.

    Grade boundaries are taken from the *cumulative* target shares, so the
    counts always add up to ``len(df)`` and a small sample is spread across
    the scale instead of filling the top grades one firm at a time.

    Args:
        df: frame with a ``composite_score`` column.

    Returns:
        A copy of ``df`` sorted by ``composite_score`` descending (index
        reset) with a ``shadow_rating`` column added. Rows with a NaN score
        sort last and therefore receive the lowest grades.
    """
    # Approximate global grade shares (S&P-like); the last grade absorbs the
    # remainder, as the shares sum to slightly less than 1.
    rating_dist = {
        "AAA": 0.01, "AA+": 0.02, "AA": 0.03, "AA-": 0.04,
        "A+": 0.06, "A": 0.08, "A-": 0.08,
        "BBB+": 0.08, "BBB": 0.10, "BBB-": 0.08,
        "BB+": 0.07, "BB": 0.08, "BB-": 0.06,
        "B+": 0.05, "B": 0.04, "B-": 0.03,
        "CCC+": 0.02, "CCC": 0.02, "CCC-": 0.04,
    }

    # Best score first.
    df = df.sort_values("composite_score", ascending=False).reset_index(drop=True)
    n = len(df)

    grades_order = list(rating_dist.keys())
    cum_share = np.cumsum(list(rating_dist.values()))
    # Upper row boundary of each grade; the inner round() removes the float
    # noise that cumsum accumulates before scaling by n.
    bounds = [int(round(n * round(float(c), 10))) for c in cum_share]
    bounds[-1] = n  # last grade takes everything that is left

    labels = np.empty(n, dtype=object)
    start = 0
    for grade, end in zip(grades_order, bounds):
        end = min(max(end, start), n)  # keep boundaries monotone and in range
        labels[start:end] = grade
        start = end

    df["shadow_rating"] = labels
    return df


# ============================================================
# 2.
# Per-grade default rates + global blending
# ============================================================
def compute_default_rates(df: pd.DataFrame, config: dict, global_rates=None) -> pd.DataFrame:
    """Compute per-grade default rates with global blending and a Bayesian adjustment.

    Realised default rates are not observable for the Korean market here, so
    the mean theoretical EDF of each grade stands in for the observed rate.

    Args:
        df: frame with ``shadow_rating`` and ``EDF`` columns.
        config: settings mapping; reads ``blending.threshold`` (default 50)
            and ``blending.bayesian_prior_strength`` (default 50).
        global_rates: optional ``{grade: benchmark default rate}`` mapping.
            Defaults to the module-level ``GLOBAL_DEFAULT_RATES``. Grades are
            reported in this mapping's order; grades absent from it are not
            reported.

    Returns:
        One row per grade that has at least one firm, with columns
        ``rating_grade, n_firms, n_defaults, korean_dr, global_dr, weight_kr,
        blended_dr, bayesian_dr``. The columns are present even when no grade
        has any firm.
    """
    # `blending:` may be missing or explicitly null in the YAML.
    blending = (config or {}).get("blending") or {}
    threshold = blending.get("threshold", 50)
    prior_strength = blending.get("bayesian_prior_strength", 50)

    if global_rates is None:
        global_rates = GLOBAL_DEFAULT_RATES

    columns = [
        "rating_grade", "n_firms", "n_defaults", "korean_dr", "global_dr",
        "weight_kr", "blended_dr", "bayesian_dr",
    ]

    results = []
    for rating, global_dr in global_rates.items():
        subset = df[df["shadow_rating"] == rating]
        n_firms = len(subset)
        if n_firms == 0:
            continue

        # Korean "observed" default rate = mean EDF (theoretical PD).
        korean_dr = subset["EDF"].mean()

        # Blending weight: more firms -> more weight on the Korean figure.
        # A non-positive threshold means "never fall back to the benchmark".
        weight_kr = min(n_firms / threshold, 1.0) if threshold > 0 else 1.0
        blended_dr = weight_kr * korean_dr + (1 - weight_kr) * global_dr

        # Bayesian adjustment: Beta prior whose mean is the global rate and
        # whose pseudo-sample size is prior_strength.
        alpha_prior = global_dr * prior_strength
        beta_prior = (1 - global_dr) * prior_strength

        # Posterior mean; EDF is continuous, so n * korean_dr plays the role
        # of the number of "defaults" among n observations.
        alpha_post = alpha_prior + n_firms * korean_dr
        beta_post = beta_prior + n_firms * (1 - korean_dr)
        bayesian_dr = alpha_post / (alpha_post + beta_post)

        results.append({
            "rating_grade": rating,
            "n_firms": n_firms,
            "n_defaults": 0,  # no realised defaults are observed
            "korean_dr": korean_dr,
            "global_dr": global_dr,
            "weight_kr": weight_kr,
            "blended_dr": blended_dr,
            "bayesian_dr": bayesian_dr,
        })

    return pd.DataFrame(results, columns=columns)


# ============================================================
# Main
# ============================================================
def main():
    """CLI entry point: build shadow ratings, print the reports, optionally persist.

    With ``--stats`` only the reports are printed; otherwise the ratings and
    the per-grade default rates are written to the database and committed.
    The connection is closed on every exit path.
    """
    parser = argparse.ArgumentParser(description="Shadow Rating + 등급별 부도율")
    parser.add_argument("--stats", action="store_true", help="통계만 출력")
    args = parser.parse_args()

    config = load_config()
    conn = init_db()
    try:
        # 1) Shadow rating
        print("="*60)
        print("[Shadow Rating] Composite Score 기반 등급 부여")
        print("="*60)

        df = compute_shadow_features(conn)
        print(f" 대상 종목: {len(df)}개")

        df = compute_composite_score(df)
        df = assign_shadow_rating(df)

        # Comparison: DD-based grade vs. shadow grade.
        print("\n=== DD 등급 vs Shadow 등급 비교 ===")
        match = (df["dd_rating"] == df["shadow_rating"]).sum()
        match_pct = match / len(df) * 100 if len(df) else 0.0  # empty-safe
        print(f" 일치율: {match}/{len(df)} ({match_pct:.1f}%)")

        # Shadow rating distribution. The categorical is kept local so that
        # df["shadow_rating"] stays a plain string column: converting it in
        # place would turn any grade missing from GLOBAL_DEFAULT_RATES into
        # NaN, which would then be written to the database below.
        print("\n=== Shadow Rating 분포 ===")
        rating_order = list(GLOBAL_DEFAULT_RATES.keys())
        grade_cat = pd.Series(
            pd.Categorical(df["shadow_rating"], categories=rating_order, ordered=True),
            index=df.index,
        )
        dist = grade_cat.value_counts().sort_index()
        for rating, count in dist.items():
            if count > 0:
                in_grade = df[df["shadow_rating"] == rating]
                avg_dd = in_grade["DD"].mean()
                avg_score = in_grade["composite_score"].mean()
                print(f" {rating:5s}: {count:4d}개 | DD평균={avg_dd:6.2f} | Score={avg_score:6.2f}")

        # 2) Per-grade default rates
        print("\n" + "="*60)
        print("[부도율] 등급별 부도율 산출 + 글로벌 블렌딩")
        print("="*60)

        dr_df = compute_default_rates(df, config)

        print(f"\n{'등급':>5} | {'기업수':>5} | {'EDF평균':>10} | {'글로벌':>10} | {'블렌딩':>10} | {'베이지안':>10}")
        print("-" * 70)
        for _, row in dr_df.iterrows():
            print(f" {row['rating_grade']:5s} | {int(row['n_firms']):5d} | {row['korean_dr']:10.6f} | "
                  f"{row['global_dr']:10.6f} | {row['blended_dr']:10.6f} | {row['bayesian_dr']:10.6f}")

        # 3) Persist
        if not args.stats:
            # NOTE(review): this overwrites merton_results.dd_rating with the
            # shadow rating, and compute_shadow_features reads that same
            # column back as the "DD" grade -- confirm this is intended
            # rather than a dedicated shadow_rating column.
            conn.executemany(
                """
                UPDATE merton_results SET dd_rating = ? WHERE ticker = ?
                """,
                list(zip(df["shadow_rating"].tolist(), df["ticker"].tolist())),
            )

            # default_rates table
            base_date_str = datetime.now().strftime("%Y-%m-%d")
            rate_rows = [
                (
                    base_date_str, r.rating_grade, int(r.n_firms), int(r.n_defaults),
                    float(r.korean_dr), float(r.global_dr), float(r.weight_kr),
                    float(r.blended_dr), float(r.bayesian_dr),
                )
                for r in dr_df.itertuples(index=False)
            ]
            conn.executemany(
                """
                INSERT OR REPLACE INTO default_rates
                (base_date, rating_grade, n_firms, n_defaults, korean_dr, global_dr, weight_kr, blended_dr, bayesian_dr)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
                """,
                rate_rows,
            )
            conn.commit()
            print(f"\n → merton_results shadow_rating 업데이트: {len(df)}건")
            print(f" → default_rates 저장: {len(dr_df)}건")
    finally:
        conn.close()


if __name__ == "__main__":
    main()