feat(data): implement market-implied PD floor and 7x7 transition matrix parsing #task-290

This commit is contained in:
Variet Agent
2026-03-11 15:53:38 +09:00
parent 0762fcc5d8
commit b8514c1251
235 changed files with 1729 additions and 1102 deletions

325
data/pd_floor.py Normal file
View File

@@ -0,0 +1,325 @@
"""
시장 YTM 기반 PD 플로어 산출 모듈
KAP(한국자산평가) 등급별 회사채 수익률에서 신용스프레드를 추출하고,
시장 내재 부도율(PD)을 산출한 뒤, AAA=5bp 앵커링 + 로그 스케일링으로
각 등급의 PD 플로어를 결정합니다.
핵심 공식:
PD_raw = spread / (1 - recovery_rate) [LGD = 1 - RR]
scale = log(anchor_AAA) / log(PD_raw_AAA)
PD_floor(g) = exp(log(PD_raw(g)) × scale)
참고:
- Basel III CRE30.4: 기업 PD 플로어 5bp (0.05%)
- 한국 시장 회수율: 약 40% (LGD = 60%)
- docs/pd_floor_reference.md 참조
"""
import numpy as np
import logging
from typing import Dict, Optional, Tuple, List
logger = logging.getLogger(__name__)
# 기본 파라미터
DEFAULT_RECOVERY_RATE = 0.40 # 회수율 40% → LGD 60%
DEFAULT_AAA_ANCHOR = 0.0005 # 5bp = 0.05% = 0.0005
DEFAULT_LGD = 1 - DEFAULT_RECOVERY_RATE # 0.60
# 대등급 순서 (인덱스 기반 외삽에 사용)
BROAD_GRADE_ORDER = ["AAA", "AA", "A", "BBB", "BB", "B"]
BROAD_GRADE_INDEX = {g: i for i, g in enumerate(BROAD_GRADE_ORDER)}
def compute_market_implied_pd(
spread_bp: float,
lgd: float = DEFAULT_LGD
) -> float:
"""
신용스프레드에서 시장 내재 PD 산출
PD ≈ spread / LGD
Parameters
----------
spread_bp : float
신용스프레드 (bp, 1bp = 0.01%)
lgd : float
Loss Given Default (기본: 0.60)
Returns
-------
float
시장 내재 PD (소수, 예: 0.000727 = 7.27bp)
"""
spread_decimal = spread_bp / 10000 # bp → 소수
return spread_decimal / lgd
def compute_pd_floors(
spreads_bp: Dict[str, float],
anchor_aaa: float = DEFAULT_AAA_ANCHOR,
lgd: float = DEFAULT_LGD
) -> Dict[str, float]:
"""
AAA=5bp 앵커링 + 로그 스케일링으로 등급별 PD 플로어 산출
Parameters
----------
spreads_bp : Dict[str, float]
등급별 신용스프레드 (bp). 대등급(AAA, AA, A, BBB) 기준.
anchor_aaa : float
AAA 앵커 PD (기본: 5bp = 0.0005)
lgd : float
Loss Given Default (기본: 0.60)
Returns
-------
Dict[str, float]
등급별 PD 플로어 (소수). 예: {'AAA': 0.0005, 'AA': 0.00057, ...}
"""
# Step 1: 시장 내재 PD 산출
pd_raw = {}
for grade, spread in spreads_bp.items():
pd_raw[grade] = compute_market_implied_pd(spread, lgd)
if "AAA" not in pd_raw or pd_raw["AAA"] <= 0:
raise ValueError("AAA 스프레드가 0 이하입니다. 데이터를 확인하세요.")
# Step 2: 로그 스케일링
# scale = log(anchor) / log(pd_raw_AAA)
log_anchor = np.log(anchor_aaa)
log_aaa_raw = np.log(pd_raw["AAA"])
scale = log_anchor / log_aaa_raw
pd_floors = {}
for grade, pd_val in pd_raw.items():
pd_floors[grade] = np.exp(np.log(pd_val) * scale)
logger.info(f"PD 플로어 스케일링: scale={scale:.4f}, "
f"AAA raw={pd_raw['AAA']*10000:.1f}bp → floor={pd_floors['AAA']*10000:.1f}bp")
return pd_floors
def extrapolate_speculative_grades(
pd_floors: Dict[str, float],
grades_to_extrapolate: List[str] = ["BB", "B"]
) -> Dict[str, float]:
"""
투자적격등급(AAA~BBB)의 로그 트렌드를 외삽하여 투기등급 PD 산출
log(PD) vs grade_index의 선형 트렌드를 BBB 이후로 연장합니다.
Parameters
----------
pd_floors : Dict[str, float]
투자적격등급 PD 플로어 (AAA~BBB)
grades_to_extrapolate : List[str]
외삽할 등급 (기본: ["BB", "B"])
Returns
-------
Dict[str, float]
투기등급이 추가된 PD 플로어
"""
# 기존 투자적격 등급의 log(PD) vs index 관계
available = []
for g in BROAD_GRADE_ORDER:
if g in pd_floors:
available.append((BROAD_GRADE_INDEX[g], np.log(pd_floors[g])))
if len(available) < 2:
raise ValueError("외삽을 위해 최소 2개 등급의 PD가 필요합니다.")
# 선형 회귀: log(PD) = a × index + b
x = np.array([a[0] for a in available])
y = np.array([a[1] for a in available])
coeffs = np.polyfit(x, y, 1) # [기울기, 절편]
result = pd_floors.copy()
for grade in grades_to_extrapolate:
if grade in BROAD_GRADE_INDEX:
idx = BROAD_GRADE_INDEX[grade]
log_pd = coeffs[0] * idx + coeffs[1]
result[grade] = np.exp(log_pd)
logger.info(f"외삽 PD [{grade}]: {result[grade]*10000:.1f}bp "
f"(index={idx}, log_pd={log_pd:.4f})")
return result
def interpolate_ccc_pd(
pd_b: float,
pd_d: float = 1.0,
method: str = "geometric"
) -> Dict[str, float]:
"""
B와 D(=100%) 사이를 보간하여 CCC/CC/C 등급 PD 산출
Parameters
----------
pd_b : float
B등급 PD (소수)
pd_d : float
D등급 PD (기본: 1.0 = 100%)
method : str
'geometric': 기하평균
'log': 로그 등간격 보간
Returns
-------
Dict[str, float]
{'CCC': ..., 'CC': ..., 'C': ...}
"""
if method == "geometric":
# B → CCC → CC → C → D 순서로 기하적 등간격
# 4개 구간 (B→CCC, CCC→CC, CC→C, C→D)
log_b = np.log(pd_b)
log_d = np.log(pd_d)
step = (log_d - log_b) / 4
ccc_pd = np.exp(log_b + step * 1)
cc_pd = np.exp(log_b + step * 2)
c_pd = np.exp(log_b + step * 3)
else: # log-linear
log_b = np.log(pd_b)
log_d = np.log(pd_d)
step = (log_d - log_b) / 4
ccc_pd = np.exp(log_b + step * 1)
cc_pd = np.exp(log_b + step * 2)
c_pd = np.exp(log_b + step * 3)
return {"CCC": ccc_pd, "CC": cc_pd, "C": c_pd}
def compute_notch_pd_ratios(
notch_spreads: Dict[str, float],
lgd: float = DEFAULT_LGD
) -> Dict[str, Dict[str, float]]:
"""
노치등급 간 PD 비율 산출 (나중에 등급 쪼개기에 사용)
예: AA+ : AA : AA- = 0.91 : 1.00 : 1.07 (AA 대비 비율)
Parameters
----------
notch_spreads : Dict[str, float]
노치등급별 스프레드 (bp)
Returns
-------
Dict[str, Dict[str, float]]
대등급별 {노치등급: 비율} 딕셔너리
예: {'AA': {'AA+': 0.91, 'AA': 1.00, 'AA-': 1.07}}
"""
from data.ytm_fetcher import NOTCH_TO_BROAD
from collections import defaultdict
# 대등급별 노치 그룹핑
groups = defaultdict(dict)
for notch, spread in notch_spreads.items():
broad = NOTCH_TO_BROAD.get(notch, notch)
groups[broad][notch] = compute_market_implied_pd(spread, lgd)
# 중간 노치 대비 비율 계산
ratios = {}
for broad, notch_pds in groups.items():
if len(notch_pds) == 1:
key = list(notch_pds.keys())[0]
ratios[broad] = {key: 1.0}
else:
# 중간 노치(plain) 기준
mid_key = broad # AA+ → AA가 중간
if mid_key in notch_pds:
mid_pd = notch_pds[mid_key]
else:
mid_pd = sorted(notch_pds.values())[len(notch_pds) // 2]
ratios[broad] = {
notch: pd / mid_pd for notch, pd in notch_pds.items()
}
return ratios
def build_complete_pd_floor_table(
date: str = "2025-12-31",
anchor_aaa: float = DEFAULT_AAA_ANCHOR,
recovery_rate: float = DEFAULT_RECOVERY_RATE
) -> Tuple[Dict[str, float], Dict[str, float], Dict[str, Dict[str, float]]]:
"""
전체 PD 플로어 테이블 생성 (원스톱)
Parameters
----------
date : str
YTM 기준일
anchor_aaa : float
AAA 앵커 PD (기본: 5bp)
recovery_rate : float
회수율 (기본: 0.40)
Returns
-------
Tuple of:
broad_floors : Dict[str, float] - 대등급 PD 플로어 (AAA~B+D)
notch_ratios : Dict[str, Dict[str, float]] - 노치 비율
full_floors : Dict[str, float] - CCC/CC/C 포함 전체 PD (AAA~C+D)
"""
from data.ytm_fetcher import get_ytm_data, compute_spreads, compute_broad_grade_spreads
lgd = 1 - recovery_rate
# 1) YTM 데이터 수집
ytm_data = get_ytm_data(date)
notch_spreads = compute_spreads(ytm_data)
broad_spreads = compute_broad_grade_spreads(notch_spreads)
logger.info(f"대등급 스프레드: {broad_spreads}")
# 2) PD 플로어 산출 (투자적격)
pd_floors = compute_pd_floors(broad_spreads, anchor_aaa, lgd)
# 3) 투기등급 외삽
pd_floors = extrapolate_speculative_grades(pd_floors, ["BB", "B"])
# 4) CCC/CC/C 보간
ccc_pds = interpolate_ccc_pd(pd_floors["B"], pd_d=1.0, method="geometric")
full_floors = {**pd_floors, **ccc_pds, "D": 1.0}
# 5) 노치 비율
notch_ratios = compute_notch_pd_ratios(notch_spreads, lgd)
return pd_floors, notch_ratios, full_floors
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO, format="%(message)s")
print("=" * 60)
print(" PD 플로어 산출 (KAP YTM 기반)")
print("=" * 60)
broad_floors, notch_ratios, full_floors = build_complete_pd_floor_table()
print("\n=== 대등급 PD 플로어 ===")
for grade in BROAD_GRADE_ORDER:
if grade in broad_floors:
print(f" {grade:>4}: {broad_floors[grade]*10000:8.1f}bp "
f"({broad_floors[grade]*100:.4f}%)")
print("\n=== 전체 PD 플로어 (CCC/CC/C 포함) ===")
order = ["AAA", "AA", "A", "BBB", "BB", "B", "CCC", "CC", "C", "D"]
for grade in order:
if grade in full_floors:
print(f" {grade:>4}: {full_floors[grade]*10000:8.1f}bp "
f"({full_floors[grade]*100:.4f}%)")
print("\n=== 노치 PD 비율 ===")
for broad, ratios in notch_ratios.items():
print(f" {broad}:")
for notch, ratio in sorted(ratios.items()):
print(f" {notch:>5}: ×{ratio:.3f}")