Files
LifetimePD/data/pd_floor.py

326 lines
9.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
시장 YTM 기반 PD 플로어 산출 모듈
KAP(한국자산평가) 등급별 회사채 수익률에서 신용스프레드를 추출하고,
시장 내재 부도율(PD)을 산출한 뒤, AAA=5bp 앵커링 + 로그 스케일링으로
각 등급의 PD 플로어를 결정합니다.
핵심 공식:
PD_raw = spread / (1 - recovery_rate) [LGD = 1 - RR]
scale = log(anchor_AAA) / log(PD_raw_AAA)
PD_floor(g) = exp(log(PD_raw(g)) × scale)
참고:
- Basel III CRE30.4: 기업 PD 플로어 5bp (0.05%)
- 한국 시장 회수율: 약 40% (LGD = 60%)
- docs/pd_floor_reference.md 참조
"""
import numpy as np
import logging
from typing import Dict, Optional, Tuple, List
logger = logging.getLogger(__name__)
# 기본 파라미터
DEFAULT_RECOVERY_RATE = 0.40 # 회수율 40% → LGD 60%
DEFAULT_AAA_ANCHOR = 0.0005 # 5bp = 0.05% = 0.0005
DEFAULT_LGD = 1 - DEFAULT_RECOVERY_RATE # 0.60
# 대등급 순서 (인덱스 기반 외삽에 사용)
BROAD_GRADE_ORDER = ["AAA", "AA", "A", "BBB", "BB", "B"]
BROAD_GRADE_INDEX = {g: i for i, g in enumerate(BROAD_GRADE_ORDER)}
def compute_market_implied_pd(
spread_bp: float,
lgd: float = DEFAULT_LGD
) -> float:
"""
신용스프레드에서 시장 내재 PD 산출
PD ≈ spread / LGD
Parameters
----------
spread_bp : float
신용스프레드 (bp, 1bp = 0.01%)
lgd : float
Loss Given Default (기본: 0.60)
Returns
-------
float
시장 내재 PD (소수, 예: 0.000727 = 7.27bp)
"""
spread_decimal = spread_bp / 10000 # bp → 소수
return spread_decimal / lgd
def compute_pd_floors(
spreads_bp: Dict[str, float],
anchor_aaa: float = DEFAULT_AAA_ANCHOR,
lgd: float = DEFAULT_LGD
) -> Dict[str, float]:
"""
AAA=5bp 앵커링 + 로그 스케일링으로 등급별 PD 플로어 산출
Parameters
----------
spreads_bp : Dict[str, float]
등급별 신용스프레드 (bp). 대등급(AAA, AA, A, BBB) 기준.
anchor_aaa : float
AAA 앵커 PD (기본: 5bp = 0.0005)
lgd : float
Loss Given Default (기본: 0.60)
Returns
-------
Dict[str, float]
등급별 PD 플로어 (소수). 예: {'AAA': 0.0005, 'AA': 0.00057, ...}
"""
# Step 1: 시장 내재 PD 산출
pd_raw = {}
for grade, spread in spreads_bp.items():
pd_raw[grade] = compute_market_implied_pd(spread, lgd)
if "AAA" not in pd_raw or pd_raw["AAA"] <= 0:
raise ValueError("AAA 스프레드가 0 이하입니다. 데이터를 확인하세요.")
# Step 2: 로그 스케일링
# scale = log(anchor) / log(pd_raw_AAA)
log_anchor = np.log(anchor_aaa)
log_aaa_raw = np.log(pd_raw["AAA"])
scale = log_anchor / log_aaa_raw
pd_floors = {}
for grade, pd_val in pd_raw.items():
pd_floors[grade] = np.exp(np.log(pd_val) * scale)
logger.info(f"PD 플로어 스케일링: scale={scale:.4f}, "
f"AAA raw={pd_raw['AAA']*10000:.1f}bp → floor={pd_floors['AAA']*10000:.1f}bp")
return pd_floors
def extrapolate_speculative_grades(
pd_floors: Dict[str, float],
grades_to_extrapolate: List[str] = ["BB", "B"]
) -> Dict[str, float]:
"""
투자적격등급(AAA~BBB)의 로그 트렌드를 외삽하여 투기등급 PD 산출
log(PD) vs grade_index의 선형 트렌드를 BBB 이후로 연장합니다.
Parameters
----------
pd_floors : Dict[str, float]
투자적격등급 PD 플로어 (AAA~BBB)
grades_to_extrapolate : List[str]
외삽할 등급 (기본: ["BB", "B"])
Returns
-------
Dict[str, float]
투기등급이 추가된 PD 플로어
"""
# 기존 투자적격 등급의 log(PD) vs index 관계
available = []
for g in BROAD_GRADE_ORDER:
if g in pd_floors:
available.append((BROAD_GRADE_INDEX[g], np.log(pd_floors[g])))
if len(available) < 2:
raise ValueError("외삽을 위해 최소 2개 등급의 PD가 필요합니다.")
# 선형 회귀: log(PD) = a × index + b
x = np.array([a[0] for a in available])
y = np.array([a[1] for a in available])
coeffs = np.polyfit(x, y, 1) # [기울기, 절편]
result = pd_floors.copy()
for grade in grades_to_extrapolate:
if grade in BROAD_GRADE_INDEX:
idx = BROAD_GRADE_INDEX[grade]
log_pd = coeffs[0] * idx + coeffs[1]
result[grade] = np.exp(log_pd)
logger.info(f"외삽 PD [{grade}]: {result[grade]*10000:.1f}bp "
f"(index={idx}, log_pd={log_pd:.4f})")
return result
def interpolate_ccc_pd(
pd_b: float,
pd_d: float = 1.0,
method: str = "geometric"
) -> Dict[str, float]:
"""
B와 D(=100%) 사이를 보간하여 CCC/CC/C 등급 PD 산출
Parameters
----------
pd_b : float
B등급 PD (소수)
pd_d : float
D등급 PD (기본: 1.0 = 100%)
method : str
'geometric': 기하평균
'log': 로그 등간격 보간
Returns
-------
Dict[str, float]
{'CCC': ..., 'CC': ..., 'C': ...}
"""
if method == "geometric":
# B → CCC → CC → C → D 순서로 기하적 등간격
# 4개 구간 (B→CCC, CCC→CC, CC→C, C→D)
log_b = np.log(pd_b)
log_d = np.log(pd_d)
step = (log_d - log_b) / 4
ccc_pd = np.exp(log_b + step * 1)
cc_pd = np.exp(log_b + step * 2)
c_pd = np.exp(log_b + step * 3)
else: # log-linear
log_b = np.log(pd_b)
log_d = np.log(pd_d)
step = (log_d - log_b) / 4
ccc_pd = np.exp(log_b + step * 1)
cc_pd = np.exp(log_b + step * 2)
c_pd = np.exp(log_b + step * 3)
return {"CCC": ccc_pd, "CC": cc_pd, "C": c_pd}
def compute_notch_pd_ratios(
notch_spreads: Dict[str, float],
lgd: float = DEFAULT_LGD
) -> Dict[str, Dict[str, float]]:
"""
노치등급 간 PD 비율 산출 (나중에 등급 쪼개기에 사용)
예: AA+ : AA : AA- = 0.91 : 1.00 : 1.07 (AA 대비 비율)
Parameters
----------
notch_spreads : Dict[str, float]
노치등급별 스프레드 (bp)
Returns
-------
Dict[str, Dict[str, float]]
대등급별 {노치등급: 비율} 딕셔너리
예: {'AA': {'AA+': 0.91, 'AA': 1.00, 'AA-': 1.07}}
"""
from data.ytm_fetcher import NOTCH_TO_BROAD
from collections import defaultdict
# 대등급별 노치 그룹핑
groups = defaultdict(dict)
for notch, spread in notch_spreads.items():
broad = NOTCH_TO_BROAD.get(notch, notch)
groups[broad][notch] = compute_market_implied_pd(spread, lgd)
# 중간 노치 대비 비율 계산
ratios = {}
for broad, notch_pds in groups.items():
if len(notch_pds) == 1:
key = list(notch_pds.keys())[0]
ratios[broad] = {key: 1.0}
else:
# 중간 노치(plain) 기준
mid_key = broad # AA+ → AA가 중간
if mid_key in notch_pds:
mid_pd = notch_pds[mid_key]
else:
mid_pd = sorted(notch_pds.values())[len(notch_pds) // 2]
ratios[broad] = {
notch: pd / mid_pd for notch, pd in notch_pds.items()
}
return ratios
def build_complete_pd_floor_table(
date: str = "2025-12-31",
anchor_aaa: float = DEFAULT_AAA_ANCHOR,
recovery_rate: float = DEFAULT_RECOVERY_RATE
) -> Tuple[Dict[str, float], Dict[str, float], Dict[str, Dict[str, float]]]:
"""
전체 PD 플로어 테이블 생성 (원스톱)
Parameters
----------
date : str
YTM 기준일
anchor_aaa : float
AAA 앵커 PD (기본: 5bp)
recovery_rate : float
회수율 (기본: 0.40)
Returns
-------
Tuple of:
broad_floors : Dict[str, float] - 대등급 PD 플로어 (AAA~B+D)
notch_ratios : Dict[str, Dict[str, float]] - 노치 비율
full_floors : Dict[str, float] - CCC/CC/C 포함 전체 PD (AAA~C+D)
"""
from data.ytm_fetcher import get_ytm_data, compute_spreads, compute_broad_grade_spreads
lgd = 1 - recovery_rate
# 1) YTM 데이터 수집
ytm_data = get_ytm_data(date)
notch_spreads = compute_spreads(ytm_data)
broad_spreads = compute_broad_grade_spreads(notch_spreads)
logger.info(f"대등급 스프레드: {broad_spreads}")
# 2) PD 플로어 산출 (투자적격)
pd_floors = compute_pd_floors(broad_spreads, anchor_aaa, lgd)
# 3) 투기등급 외삽
pd_floors = extrapolate_speculative_grades(pd_floors, ["BB", "B"])
# 4) CCC/CC/C 보간
ccc_pds = interpolate_ccc_pd(pd_floors["B"], pd_d=1.0, method="geometric")
full_floors = {**pd_floors, **ccc_pds, "D": 1.0}
# 5) 노치 비율
notch_ratios = compute_notch_pd_ratios(notch_spreads, lgd)
return pd_floors, notch_ratios, full_floors
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO, format="%(message)s")
print("=" * 60)
print(" PD 플로어 산출 (KAP YTM 기반)")
print("=" * 60)
broad_floors, notch_ratios, full_floors = build_complete_pd_floor_table()
print("\n=== 대등급 PD 플로어 ===")
for grade in BROAD_GRADE_ORDER:
if grade in broad_floors:
print(f" {grade:>4}: {broad_floors[grade]*10000:8.1f}bp "
f"({broad_floors[grade]*100:.4f}%)")
print("\n=== 전체 PD 플로어 (CCC/CC/C 포함) ===")
order = ["AAA", "AA", "A", "BBB", "BB", "B", "CCC", "CC", "C", "D"]
for grade in order:
if grade in full_floors:
print(f" {grade:>4}: {full_floors[grade]*10000:8.1f}bp "
f"({full_floors[grade]*100:.4f}%)")
print("\n=== 노치 PD 비율 ===")
for broad, ratios in notch_ratios.items():
print(f" {broad}:")
for notch, ratio in sorted(ratios.items()):
print(f" {notch:>5}: ×{ratio:.3f}")