feat(shadow): Shadow Rating + default rate blending with Bayesian posterior (#315 #316)

This commit is contained in:
EDF Agent
2026-03-11 23:50:15 +09:00
parent 0547dfbb3a
commit 0c0b2129eb

300
src/models/shadow_rating.py Normal file
View File

@@ -0,0 +1,300 @@
"""
Shadow Rating + 등급별 부도율 산출 모듈
1) Ordered Probit 기반 Shadow Rating: DD + 재무비율 → 신용등급 추정
2) 등급별 부도율: 한국 관측 + 글로벌 벤치마크 블렌딩
3) 베이지안 보정: 표본 부족 등급에 사전 정보 활용
Usage:
python -m src.models.shadow_rating # 전체 산출
python -m src.models.shadow_rating --stats # 통계만
"""
import sys
import argparse
import yaml
import numpy as np
import pandas as pd
from datetime import datetime
from pathlib import Path
from scipy.stats import norm
from scipy.optimize import minimize
sys.path.insert(0, str(Path(__file__).parent.parent.parent))
from src.data.database import get_connection, init_db
from src.models.merton import (
DD_RATING_MAP, GLOBAL_DEFAULT_RATES, dd_to_rating
)
def load_config() -> dict:
    """Read ``config/settings.yaml`` and return its parsed contents.

    The file is located relative to this module: three ``parent`` steps up
    from ``__file__``, then ``config/settings.yaml``.

    Returns:
        Whatever ``yaml.safe_load`` produces for the settings file.
    """
    project_root = Path(__file__).parent.parent.parent
    settings_file = project_root / "config" / "settings.yaml"
    with settings_file.open("r", encoding="utf-8") as handle:
        return yaml.safe_load(handle)
# ============================================================
# 1. Shadow Rating: 재무비율 강화
# ============================================================
def compute_shadow_features(conn) -> pd.DataFrame:
    """Assemble the Shadow Rating input frame from Merton output and financials.

    Runs a single query joining ``merton_results``, ``financial_data`` and
    ``companies`` on ``ticker`` and appends four derived columns:

    * ``equity_ratio`` - total equity over total assets (NaN when assets are 0).
    * ``size_score``   - percentile rank of ``log_assets``.
    * ``icr_capped``   - interest coverage clipped to [-10, 100], missing -> 0.
    * ``roa_capped``   - ROA clipped to [-1, 1], missing -> 0.

    Args:
        conn: Open database connection usable by ``pandas.read_sql_query``.

    Returns:
        DataFrame with one row per joined record, holding the selected
        columns followed by the derived ones.
    """
    sql = """
SELECT
mr.ticker,
mr.DD,
mr.EDF,
mr.sigma_V,
mr.leverage as merton_leverage,
mr.dd_rating,
mr.method,
f.leverage_ratio,
f.roa,
f.interest_coverage,
f.log_assets,
f.total_assets,
f.total_equity,
f.operating_income,
f.net_income,
c.name
FROM merton_results mr
JOIN financial_data f ON mr.ticker = f.ticker
JOIN companies c ON mr.ticker = c.ticker
"""
    features = pd.read_sql_query(sql, conn)

    # A zero denominator is turned into NaN so the ratio comes out missing
    # instead of infinite.
    safe_assets = features["total_assets"].replace(0, np.nan)
    features["equity_ratio"] = features["total_equity"] / safe_assets
    features["size_score"] = features["log_assets"].rank(pct=True)

    # Cap extreme values first, then replace missing ones with 0.
    capping_rules = (
        ("interest_coverage", "icr_capped", -10, 100),
        ("roa", "roa_capped", -1, 1),
    )
    for source_col, target_col, lower, upper in capping_rules:
        features[target_col] = features[source_col].clip(lower, upper).fillna(0)
    return features
def compute_composite_score(df: pd.DataFrame) -> pd.DataFrame:
    """Combine DD and financial ratios into a single composite credit score.

    Each input is standardised to a z-score and the results are summed with
    fixed weights (DD 0.50, leverage 0.15, ROA 0.15, interest coverage 0.10,
    size 0.10). A higher score means better credit quality, the same
    direction as DD; leverage is negated because lower leverage is better.

    Args:
        df: Frame with ``DD``, ``leverage_ratio``, ``roa_capped``,
            ``icr_capped`` and ``size_score`` columns.

    Returns:
        A copy of ``df`` with an added ``composite_score`` column. The input
        frame is not modified.
    """
    df = df.copy()

    def zscore(s):
        """Standardise ``s``; a column without usable spread contributes 0."""
        mean, std = s.mean(), s.std()
        # The sample standard deviation is NaN with fewer than two
        # observations and 0 for a constant column. Neither carries ranking
        # information, and dividing by them would turn every score into
        # NaN or inf, so such a column is treated as neutral.
        if pd.isna(std) or std == 0:
            return pd.Series(0.0, index=s.index)
        return (s - mean) / std

    z_dd = zscore(df["DD"])
    # Missing leverage is filled with 0.5 before standardising.
    z_lev = -zscore(df["leverage_ratio"].fillna(0.5))
    z_roa = zscore(df["roa_capped"])
    z_icr = zscore(df["icr_capped"])
    z_size = zscore(df["size_score"])

    # Weighted sum; DD carries the largest weight.
    df["composite_score"] = (
        0.50 * z_dd      # distance to default (core driver)
        + 0.15 * z_lev   # leverage
        + 0.15 * z_roa   # profitability
        + 0.10 * z_icr   # interest coverage
        + 0.10 * z_size  # size
    )
    return df
def assign_shadow_rating(df: pd.DataFrame) -> pd.DataFrame:
    """Assign a shadow rating to every row from its composite-score rank.

    Grades are handed out by score percentile rather than by an Ordered
    Probit, because with almost no observed ratings such a model cannot be
    estimated. Rows are sorted by ``composite_score`` (best first) and cut
    into consecutive slices sized to approximate a global grade distribution.

    Every grade except the last is given ``max(1, round(n * share))`` rows;
    the last grade takes whatever remains. With a small ``n`` the one-row
    minimum can use up all rows before the lower grades are reached, in which
    case those grades stay empty.

    Args:
        df: Frame containing a ``composite_score`` column.

    Returns:
        A sorted, re-indexed copy of ``df`` with a ``shadow_rating`` column.
    """
    # Approximate global grade shares (S&P-like), best grade first.
    grade_shares = (
        ("AAA", 0.01), ("AA+", 0.02), ("AA", 0.03), ("AA-", 0.04),
        ("A+", 0.06), ("A", 0.08), ("A-", 0.08),
        ("BBB+", 0.08), ("BBB", 0.10), ("BBB-", 0.08),
        ("BB+", 0.07), ("BB", 0.08), ("BB-", 0.06),
        ("B+", 0.05), ("B", 0.04), ("B-", 0.03),
        ("CCC+", 0.02), ("CCC", 0.02), ("CCC-", 0.04),
    )
    *upper_grades, (floor_grade, _) = grade_shares

    ranked = (
        df.copy()
        .sort_values("composite_score", ascending=False)
        .reset_index(drop=True)
    )
    total = len(ranked)

    # Per-grade row quotas; the lowest grade absorbs the remainder.
    quotas = [
        (grade, max(1, round(total * share))) for grade, share in upper_grades
    ]
    quotas.append((floor_grade, total - sum(size for _, size in quotas)))

    ranked["shadow_rating"] = ""
    rating_pos = ranked.columns.get_loc("shadow_rating")
    start = 0
    for grade, quota in quotas:
        stop = start + quota
        if quota > 0:
            # Positional slice; anything past the last row is ignored.
            ranked.iloc[start:stop, rating_pos] = grade
        start = stop

    # Rows left blank fall into the lowest grade.
    still_blank = ranked["shadow_rating"] == ""
    ranked.loc[still_blank, "shadow_rating"] = floor_grade
    return ranked
# ============================================================
# 2. 등급별 부도율 + 글로벌 블렌딩
# ============================================================
def compute_default_rates(df: pd.DataFrame, config: dict) -> pd.DataFrame:
    """Compute per-grade default rates with global blending and a Bayesian mean.

    Actual defaults are not observed here, so the "Korean" rate of a grade is
    the mean theoretical EDF of the firms assigned to it. For every grade in
    ``GLOBAL_DEFAULT_RATES`` that has at least one firm, two estimates are
    produced:

    * ``blended_dr``  - ``w * korean + (1 - w) * global`` with
      ``w = min(n_firms / threshold, 1)``.
    * ``bayesian_dr`` - posterior mean of a Beta prior centred on the global
      rate (total pseudo-count ``bayesian_prior_strength``), updated with
      ``n_firms`` observations of which ``n_firms * korean`` count as
      defaults. The product is used because the rate is a continuous value,
      not a default count.

    Args:
        df: Frame with ``shadow_rating`` and ``EDF`` columns.
        config: Settings mapping; ``blending.threshold`` and
            ``blending.bayesian_prior_strength`` are read, both defaulting
            to 50. ``None`` or a missing/empty ``blending`` section is
            treated as "use the defaults".

    Returns:
        One row per populated grade with columns ``rating_grade``,
        ``n_firms``, ``n_defaults`` (always 0), ``korean_dr``, ``global_dr``,
        ``weight_kr``, ``blended_dr`` and ``bayesian_dr``. The columns are
        present even when no grade is populated.
    """
    blending_cfg = (config or {}).get("blending") or {}
    threshold = blending_cfg.get("threshold", 50)
    prior_strength = blending_cfg.get("bayesian_prior_strength", 50)

    columns = [
        "rating_grade", "n_firms", "n_defaults", "korean_dr",
        "global_dr", "weight_kr", "blended_dr", "bayesian_dr",
    ]
    results = []
    for rating, global_dr in GLOBAL_DEFAULT_RATES.items():
        subset = df[df["shadow_rating"] == rating]
        n_firms = len(subset)
        if n_firms == 0:
            continue

        # Korean "observed" default rate = mean EDF (theoretical probability).
        korean_dr = subset["EDF"].mean()

        # More firms -> more weight on the Korean figure. A non-positive
        # threshold means any sample is large enough, and would otherwise
        # divide by zero.
        weight_kr = min(n_firms / threshold, 1.0) if threshold > 0 else 1.0
        blended_dr = weight_kr * korean_dr + (1 - weight_kr) * global_dr

        # Beta prior with mean global_dr, then the posterior mean.
        alpha_post = global_dr * prior_strength + n_firms * korean_dr
        beta_post = (1 - global_dr) * prior_strength + n_firms * (1 - korean_dr)
        bayesian_dr = alpha_post / (alpha_post + beta_post)

        results.append({
            "rating_grade": rating,
            "n_firms": n_firms,
            "n_defaults": 0,  # no actual defaults are observed
            "korean_dr": korean_dr,
            "global_dr": global_dr,
            "weight_kr": weight_kr,
            "blended_dr": blended_dr,
            "bayesian_dr": bayesian_dr,
        })
    # Explicit columns keep the schema stable when results is empty.
    return pd.DataFrame(results, columns=columns)
# ============================================================
# Main
# ============================================================
def main():
    """Run the shadow-rating pipeline from the command line.

    Steps: load the settings, build the feature frame, score and rate every
    firm, print the rating distribution and the per-grade default rates,
    and - unless ``--stats`` is given - write the results to the database.
    The connection is closed even when a step fails.
    """
    parser = argparse.ArgumentParser(description="Shadow Rating + 등급별 부도율")
    parser.add_argument("--stats", action="store_true", help="통계만 출력")
    args = parser.parse_args()
    config = load_config()
    conn = init_db()
    try:
        # 1) Shadow rating
        print("="*60)
        print("[Shadow Rating] Composite Score 기반 등급 부여")
        print("="*60)
        df = compute_shadow_features(conn)
        print(f" 대상 종목: {len(df)}")
        df = compute_composite_score(df)
        df = assign_shadow_rating(df)

        # Comparison: DD-based grade vs shadow grade.
        print("\n=== DD 등급 vs Shadow 등급 비교 ===")
        match = (df["dd_rating"] == df["shadow_rating"]).sum()
        # An empty universe would otherwise divide by zero.
        match_pct = match / len(df) * 100 if len(df) else 0.0
        print(f" 일치율: {match}/{len(df)} ({match_pct:.1f}%)")

        # Shadow rating distribution, in benchmark grade order.
        print("\n=== Shadow Rating 분포 ===")
        rating_order = list(GLOBAL_DEFAULT_RATES.keys())
        # NOTE(review): grades absent from GLOBAL_DEFAULT_RATES become missing
        # values in this conversion - confirm the key set covers every grade
        # assign_shadow_rating can produce.
        df["shadow_rating"] = pd.Categorical(
            df["shadow_rating"], categories=rating_order, ordered=True
        )
        dist = df["shadow_rating"].value_counts().sort_index()
        for rating, count in dist.items():
            if count > 0:
                members = df[df["shadow_rating"] == rating]
                avg_dd = members["DD"].mean()
                avg_score = members["composite_score"].mean()
                print(f" {rating:5s}: {count:4d}개 | DD평균={avg_dd:6.2f} | Score={avg_score:6.2f}")

        # 2) Per-grade default rates
        print("\n" + "="*60)
        print("[부도율] 등급별 부도율 산출 + 글로벌 블렌딩")
        print("="*60)
        dr_df = compute_default_rates(df, config)
        print(f"\n{'등급':>5} | {'기업수':>5} | {'EDF평균':>10} | {'글로벌':>10} | {'블렌딩':>10} | {'베이지안':>10}")
        print("-" * 70)
        for _, row in dr_df.iterrows():
            print(f" {row['rating_grade']:5s} | {row['n_firms']:5d} | {row['korean_dr']:10.6f} | "
                  f"{row['global_dr']:10.6f} | {row['blended_dr']:10.6f} | {row['bayesian_dr']:10.6f}")

        # 3) Persist to the database
        if not args.stats:
            # NOTE(review): this writes the shadow rating into the dd_rating
            # column, replacing the DD-based grade that the comparison above
            # reads - confirm this is intended.
            update_sql = """
UPDATE merton_results SET dd_rating = ? WHERE ticker = ?
"""
            for _, row in df.iterrows():
                conn.execute(update_sql, (row["shadow_rating"], row["ticker"]))

            # Store the default rates under today's date.
            insert_sql = """
INSERT OR REPLACE INTO default_rates
(base_date, rating_grade, n_firms, n_defaults, korean_dr, global_dr, weight_kr, blended_dr, bayesian_dr)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
"""
            base_date_str = datetime.now().strftime("%Y-%m-%d")
            for _, row in dr_df.iterrows():
                conn.execute(insert_sql, (
                    base_date_str, row["rating_grade"], int(row["n_firms"]), int(row["n_defaults"]),
                    row["korean_dr"], row["global_dr"], row["weight_kr"], row["blended_dr"], row["bayesian_dr"]
                ))
            conn.commit()
            print(f"\n → merton_results shadow_rating 업데이트: {len(df)}")
            print(f" → default_rates 저장: {len(dr_df)}")
    finally:
        conn.close()


if __name__ == "__main__":
    main()