- Replace hardcoded DEFAULT_PD_FLOORS with build_complete_pd_floor_table() (KAP bond YTM) - Fix ADF test: autolag='AIC' -> 'BIC' for small sample (N=26) robustness - Expand variable search: 40 -> 226 vars (log/diff/return/lag2), 1.9M combos - Select Model #2: HOUSING_PRICE + CREDIT_SPREAD_LAG1 + CURRENT_ACCOUNT_R - Add 6-test diagnostics table to AR1 sheet (ADF/LB/DW/BP/ARCH/Shapiro) - Add Korean variable names for transformed variables - Generate report v7 with full diagnostics
428 lines
14 KiB
Python
428 lines
14 KiB
Python
"""
|
||
시장 YTM 기반 PD 플로어 산출 모듈
|
||
|
||
KAP(한국자산평가) 등급별 회사채 수익률에서 신용스프레드를 추출하고,
|
||
시장 내재 부도율(PD)을 산출한 뒤, AAA=5bp 앵커링 + 로그 스케일링으로
|
||
각 등급의 PD 플로어를 결정합니다.
|
||
|
||
핵심 공식:
|
||
PD_raw = spread / (1 - recovery_rate) [LGD = 1 - RR]
|
||
scale = log(anchor_AAA) / log(PD_raw_AAA)
|
||
PD_floor(g) = exp(log(PD_raw(g)) × scale)
|
||
|
||
참고:
|
||
- Basel III CRE30.4: 기업 PD 플로어 5bp (0.05%)
|
||
- 한국 시장 회수율: 약 40% (LGD = 60%)
|
||
- docs/pd_floor_reference.md 참조
|
||
"""
|
||
|
||
import numpy as np
|
||
import logging
|
||
from typing import Dict, Optional, Tuple, List
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
# 기본 파라미터
|
||
DEFAULT_RECOVERY_RATE = 0.40 # 회수율 40% → LGD 60%
|
||
DEFAULT_AAA_ANCHOR = 0.0005 # 5bp = 0.05% = 0.0005
|
||
DEFAULT_LGD = 1 - DEFAULT_RECOVERY_RATE # 0.60
|
||
|
||
# 대등급 순서 (인덱스 기반 외삽에 사용)
|
||
BROAD_GRADE_ORDER = ["AAA", "AA", "A", "BBB", "BB", "B"]
|
||
BROAD_GRADE_INDEX = {g: i for i, g in enumerate(BROAD_GRADE_ORDER)}
|
||
|
||
|
||
def compute_market_implied_pd(
|
||
spread_bp: float,
|
||
lgd: float = DEFAULT_LGD
|
||
) -> float:
|
||
"""
|
||
신용스프레드에서 시장 내재 PD 산출
|
||
|
||
PD ≈ spread / LGD
|
||
|
||
Parameters
|
||
----------
|
||
spread_bp : float
|
||
신용스프레드 (bp, 1bp = 0.01%)
|
||
lgd : float
|
||
Loss Given Default (기본: 0.60)
|
||
|
||
Returns
|
||
-------
|
||
float
|
||
시장 내재 PD (소수, 예: 0.000727 = 7.27bp)
|
||
"""
|
||
spread_decimal = spread_bp / 10000 # bp → 소수
|
||
return spread_decimal / lgd
|
||
|
||
|
||
def compute_pd_floors(
|
||
spreads_bp: Dict[str, float],
|
||
anchor_aaa: float = DEFAULT_AAA_ANCHOR,
|
||
lgd: float = DEFAULT_LGD
|
||
) -> Dict[str, float]:
|
||
"""
|
||
AAA=5bp 앵커링 + 로그 스케일링으로 등급별 PD 플로어 산출
|
||
|
||
Parameters
|
||
----------
|
||
spreads_bp : Dict[str, float]
|
||
등급별 신용스프레드 (bp). 대등급(AAA, AA, A, BBB) 기준.
|
||
anchor_aaa : float
|
||
AAA 앵커 PD (기본: 5bp = 0.0005)
|
||
lgd : float
|
||
Loss Given Default (기본: 0.60)
|
||
|
||
Returns
|
||
-------
|
||
Dict[str, float]
|
||
등급별 PD 플로어 (소수). 예: {'AAA': 0.0005, 'AA': 0.00057, ...}
|
||
"""
|
||
# Step 1: 시장 내재 PD 산출
|
||
pd_raw = {}
|
||
for grade, spread in spreads_bp.items():
|
||
pd_raw[grade] = compute_market_implied_pd(spread, lgd)
|
||
|
||
if "AAA" not in pd_raw or pd_raw["AAA"] <= 0:
|
||
raise ValueError("AAA 스프레드가 0 이하입니다. 데이터를 확인하세요.")
|
||
|
||
# Step 2: 로그 스케일링
|
||
# scale = log(anchor) / log(pd_raw_AAA)
|
||
log_anchor = np.log(anchor_aaa)
|
||
log_aaa_raw = np.log(pd_raw["AAA"])
|
||
scale = log_anchor / log_aaa_raw
|
||
|
||
pd_floors = {}
|
||
for grade, pd_val in pd_raw.items():
|
||
pd_floors[grade] = np.exp(np.log(pd_val) * scale)
|
||
|
||
logger.info(f"PD 플로어 스케일링: scale={scale:.4f}, "
|
||
f"AAA raw={pd_raw['AAA']*10000:.1f}bp → floor={pd_floors['AAA']*10000:.1f}bp")
|
||
|
||
return pd_floors
|
||
|
||
|
||
# ============================================================
|
||
# 기본 PD 플로어 (시장 데이터 없이 사용 가능)
|
||
# ============================================================
|
||
# 근거:
|
||
# AAA = 5bp : Basel III CRE30.4 규제 플로어 (2023 개정, 기업 IRB)
|
||
# AA = 5bp : Basel III 최저선 + S&P 장기평균 2bp + Moody's 5bp 중간값
|
||
# A = 7bp : S&P 장기평균 5bp + Moody's 9bp 중간값
|
||
# BBB = 20bp: S&P 15bp + Moody's 26bp 중간값, 한국 BBB 관측 27bp와 정합
|
||
# BB = 60bp: S&P 56~63bp 범위, 관측치 사용 (floor 불필요)
|
||
# B = 300bp: S&P 293~334bp 범위, 관측치 사용 (floor 불필요)
|
||
#
|
||
# 참고문헌:
|
||
# [1] Basel Committee, CRE30.4: "PD shall not be less than 0.05%"
|
||
# [2] S&P Global, "2023 Annual Default and Transition Study"
|
||
# [3] Moody's, "Annual Default Study" (1920-2023)
|
||
# [4] 금융감독원, 신용평가공시 (한국기업평가 1998-2025)
|
||
DEFAULT_PD_FLOORS = {
|
||
"AAA": 0.0005, # 5bp — Basel III CRE30.4
|
||
"AA": 0.0005, # 5bp — Basel III 최저선
|
||
"A": 0.0007, # 7bp — S&P/Moody's 중간값
|
||
"BBB": 0.0020, # 20bp — S&P/Moody's 중간값
|
||
"BB": 0.0060, # 60bp — 관측치 수준 (floor 미적용)
|
||
"B": 0.0300, # 300bp — 관측치 수준 (floor 미적용)
|
||
}
|
||
|
||
# 7×7 행렬용 (CCC 제외)
|
||
GRADES_7 = ["AAA", "AA", "A", "BBB", "BB", "B"]
|
||
|
||
|
||
def get_default_pd_floors() -> Dict[str, float]:
|
||
"""기본 PD 플로어 반환 (Basel III + S&P/Moody's 근거)"""
|
||
return DEFAULT_PD_FLOORS.copy()
|
||
|
||
|
||
def apply_pd_floor_to_matrices(
|
||
matrices: Dict[int, 'np.ndarray'],
|
||
pd_floors: Optional[Dict[str, float]] = None,
|
||
grades: Optional[List[str]] = None
|
||
) -> Dict[int, 'np.ndarray']:
|
||
"""
|
||
전이행렬의 D열(부도 전이확률)에 PD 플로어 적용
|
||
|
||
로직:
|
||
1. 각 등급의 TM[i, D] < floor[i] 이면
|
||
2. TM[i, D] = floor[i] 로 상향
|
||
3. 초과분(delta)을 TM[i, i] (대각선)에서 차감
|
||
4. 행 합 = 1.0 유지
|
||
|
||
이론적 근거:
|
||
- 한국 투자적격등급(AAA~A) 부도 관측치 = 0%
|
||
- 0%는 "위험 없음"이 아니라 "관측 불가능한 확률"
|
||
- Basel III CRE30.4: 기업 PD ≥ 5bp (0.05%)
|
||
- S&P/Moody's 글로벌 장기평균으로 보정
|
||
|
||
Parameters
|
||
----------
|
||
matrices : Dict[int, np.ndarray]
|
||
연도별 전이행렬 (N×N, 마지막 열 = D)
|
||
pd_floors : Dict[str, float], optional
|
||
등급별 최소 PD (기본: DEFAULT_PD_FLOORS)
|
||
grades : List[str], optional
|
||
등급 레이블 (기본: AAA~B for 7×7)
|
||
|
||
Returns
|
||
-------
|
||
Dict[int, np.ndarray]
|
||
PD 플로어 적용된 전이행렬 (새 복사본)
|
||
"""
|
||
if pd_floors is None:
|
||
pd_floors = DEFAULT_PD_FLOORS
|
||
if grades is None:
|
||
grades = GRADES_7
|
||
|
||
calibrated = {}
|
||
for year, tm in matrices.items():
|
||
tm_new = tm.copy()
|
||
n = tm_new.shape[0]
|
||
d_col = n - 1 # 마지막 열 = D
|
||
|
||
for i in range(n - 1): # D행은 제외 (흡수상태)
|
||
grade = grades[i] if i < len(grades) else None
|
||
if grade and grade in pd_floors:
|
||
floor = pd_floors[grade]
|
||
observed_pd = tm_new[i, d_col]
|
||
|
||
if observed_pd < floor:
|
||
delta = floor - observed_pd
|
||
tm_new[i, d_col] = floor
|
||
# 대각선(유지확률)에서 차감
|
||
tm_new[i, i] = max(tm_new[i, i] - delta, 0.0)
|
||
|
||
# 행 합 재정규화 (안전장치)
|
||
row_sum = tm_new[i].sum()
|
||
if row_sum > 0:
|
||
tm_new[i] /= row_sum
|
||
|
||
calibrated[year] = tm_new
|
||
|
||
logger.info(f"PD 플로어 적용 완료: {len(calibrated)}개 연도")
|
||
return calibrated
|
||
|
||
|
||
def extrapolate_speculative_grades(
|
||
pd_floors: Dict[str, float],
|
||
grades_to_extrapolate: List[str] = ["BB", "B"]
|
||
) -> Dict[str, float]:
|
||
"""
|
||
투자적격등급(AAA~BBB)의 로그 트렌드를 외삽하여 투기등급 PD 산출
|
||
|
||
log(PD) vs grade_index의 선형 트렌드를 BBB 이후로 연장합니다.
|
||
|
||
Parameters
|
||
----------
|
||
pd_floors : Dict[str, float]
|
||
투자적격등급 PD 플로어 (AAA~BBB)
|
||
grades_to_extrapolate : List[str]
|
||
외삽할 등급 (기본: ["BB", "B"])
|
||
|
||
Returns
|
||
-------
|
||
Dict[str, float]
|
||
투기등급이 추가된 PD 플로어
|
||
"""
|
||
# 기존 투자적격 등급의 log(PD) vs index 관계
|
||
available = []
|
||
for g in BROAD_GRADE_ORDER:
|
||
if g in pd_floors:
|
||
available.append((BROAD_GRADE_INDEX[g], np.log(pd_floors[g])))
|
||
|
||
if len(available) < 2:
|
||
raise ValueError("외삽을 위해 최소 2개 등급의 PD가 필요합니다.")
|
||
|
||
# 선형 회귀: log(PD) = a × index + b
|
||
x = np.array([a[0] for a in available])
|
||
y = np.array([a[1] for a in available])
|
||
coeffs = np.polyfit(x, y, 1) # [기울기, 절편]
|
||
|
||
result = pd_floors.copy()
|
||
for grade in grades_to_extrapolate:
|
||
if grade in BROAD_GRADE_INDEX:
|
||
idx = BROAD_GRADE_INDEX[grade]
|
||
log_pd = coeffs[0] * idx + coeffs[1]
|
||
result[grade] = np.exp(log_pd)
|
||
logger.info(f"외삽 PD [{grade}]: {result[grade]*10000:.1f}bp "
|
||
f"(index={idx}, log_pd={log_pd:.4f})")
|
||
|
||
return result
|
||
|
||
|
||
def interpolate_ccc_pd(
|
||
pd_b: float,
|
||
pd_d: float = 1.0,
|
||
method: str = "geometric"
|
||
) -> Dict[str, float]:
|
||
"""
|
||
B와 D(=100%) 사이를 보간하여 CCC/CC/C 등급 PD 산출
|
||
|
||
Parameters
|
||
----------
|
||
pd_b : float
|
||
B등급 PD (소수)
|
||
pd_d : float
|
||
D등급 PD (기본: 1.0 = 100%)
|
||
method : str
|
||
'geometric': 기하평균
|
||
'log': 로그 등간격 보간
|
||
|
||
Returns
|
||
-------
|
||
Dict[str, float]
|
||
{'CCC': ..., 'CC': ..., 'C': ...}
|
||
"""
|
||
if method == "geometric":
|
||
# B → CCC → CC → C → D 순서로 기하적 등간격
|
||
# 4개 구간 (B→CCC, CCC→CC, CC→C, C→D)
|
||
log_b = np.log(pd_b)
|
||
log_d = np.log(pd_d)
|
||
step = (log_d - log_b) / 4
|
||
|
||
ccc_pd = np.exp(log_b + step * 1)
|
||
cc_pd = np.exp(log_b + step * 2)
|
||
c_pd = np.exp(log_b + step * 3)
|
||
else: # log-linear
|
||
log_b = np.log(pd_b)
|
||
log_d = np.log(pd_d)
|
||
step = (log_d - log_b) / 4
|
||
|
||
ccc_pd = np.exp(log_b + step * 1)
|
||
cc_pd = np.exp(log_b + step * 2)
|
||
c_pd = np.exp(log_b + step * 3)
|
||
|
||
return {"CCC": ccc_pd, "CC": cc_pd, "C": c_pd}
|
||
|
||
|
||
def compute_notch_pd_ratios(
|
||
notch_spreads: Dict[str, float],
|
||
lgd: float = DEFAULT_LGD
|
||
) -> Dict[str, Dict[str, float]]:
|
||
"""
|
||
노치등급 간 PD 비율 산출 (나중에 등급 쪼개기에 사용)
|
||
|
||
예: AA+ : AA : AA- = 0.91 : 1.00 : 1.07 (AA 대비 비율)
|
||
|
||
Parameters
|
||
----------
|
||
notch_spreads : Dict[str, float]
|
||
노치등급별 스프레드 (bp)
|
||
|
||
Returns
|
||
-------
|
||
Dict[str, Dict[str, float]]
|
||
대등급별 {노치등급: 비율} 딕셔너리
|
||
예: {'AA': {'AA+': 0.91, 'AA': 1.00, 'AA-': 1.07}}
|
||
"""
|
||
from data.ytm_fetcher import NOTCH_TO_BROAD
|
||
from collections import defaultdict
|
||
|
||
# 대등급별 노치 그룹핑
|
||
groups = defaultdict(dict)
|
||
for notch, spread in notch_spreads.items():
|
||
broad = NOTCH_TO_BROAD.get(notch, notch)
|
||
groups[broad][notch] = compute_market_implied_pd(spread, lgd)
|
||
|
||
# 중간 노치 대비 비율 계산
|
||
ratios = {}
|
||
for broad, notch_pds in groups.items():
|
||
if len(notch_pds) == 1:
|
||
key = list(notch_pds.keys())[0]
|
||
ratios[broad] = {key: 1.0}
|
||
else:
|
||
# 중간 노치(plain) 기준
|
||
mid_key = broad # AA+ → AA가 중간
|
||
if mid_key in notch_pds:
|
||
mid_pd = notch_pds[mid_key]
|
||
else:
|
||
mid_pd = sorted(notch_pds.values())[len(notch_pds) // 2]
|
||
|
||
ratios[broad] = {
|
||
notch: pd / mid_pd for notch, pd in notch_pds.items()
|
||
}
|
||
|
||
return ratios
|
||
|
||
|
||
def build_complete_pd_floor_table(
|
||
date: str = "2025-12-31",
|
||
anchor_aaa: float = DEFAULT_AAA_ANCHOR,
|
||
recovery_rate: float = DEFAULT_RECOVERY_RATE
|
||
) -> Tuple[Dict[str, float], Dict[str, float], Dict[str, Dict[str, float]]]:
|
||
"""
|
||
전체 PD 플로어 테이블 생성 (원스톱)
|
||
|
||
Parameters
|
||
----------
|
||
date : str
|
||
YTM 기준일
|
||
anchor_aaa : float
|
||
AAA 앵커 PD (기본: 5bp)
|
||
recovery_rate : float
|
||
회수율 (기본: 0.40)
|
||
|
||
Returns
|
||
-------
|
||
Tuple of:
|
||
broad_floors : Dict[str, float] - 대등급 PD 플로어 (AAA~B+D)
|
||
notch_ratios : Dict[str, Dict[str, float]] - 노치 비율
|
||
full_floors : Dict[str, float] - CCC/CC/C 포함 전체 PD (AAA~C+D)
|
||
"""
|
||
from data.ytm_fetcher import get_ytm_data, compute_spreads, compute_broad_grade_spreads
|
||
|
||
lgd = 1 - recovery_rate
|
||
|
||
# 1) YTM 데이터 수집
|
||
ytm_data = get_ytm_data(date)
|
||
notch_spreads = compute_spreads(ytm_data)
|
||
broad_spreads = compute_broad_grade_spreads(notch_spreads)
|
||
|
||
logger.info(f"대등급 스프레드: {broad_spreads}")
|
||
|
||
# 2) PD 플로어 산출 (투자적격)
|
||
pd_floors = compute_pd_floors(broad_spreads, anchor_aaa, lgd)
|
||
|
||
# 3) 투기등급 외삽
|
||
pd_floors = extrapolate_speculative_grades(pd_floors, ["BB", "B"])
|
||
|
||
# 4) CCC/CC/C 보간
|
||
ccc_pds = interpolate_ccc_pd(pd_floors["B"], pd_d=1.0, method="geometric")
|
||
full_floors = {**pd_floors, **ccc_pds, "D": 1.0}
|
||
|
||
# 5) 노치 비율
|
||
notch_ratios = compute_notch_pd_ratios(notch_spreads, lgd)
|
||
|
||
return pd_floors, notch_ratios, full_floors
|
||
|
||
|
||
if __name__ == "__main__":
|
||
logging.basicConfig(level=logging.INFO, format="%(message)s")
|
||
|
||
print("=" * 60)
|
||
print(" PD 플로어 산출 (KAP YTM 기반)")
|
||
print("=" * 60)
|
||
|
||
broad_floors, notch_ratios, full_floors = build_complete_pd_floor_table()
|
||
|
||
print("\n=== 대등급 PD 플로어 ===")
|
||
for grade in BROAD_GRADE_ORDER:
|
||
if grade in broad_floors:
|
||
print(f" {grade:>4}: {broad_floors[grade]*10000:8.1f}bp "
|
||
f"({broad_floors[grade]*100:.4f}%)")
|
||
|
||
print("\n=== 전체 PD 플로어 (CCC/CC/C 포함) ===")
|
||
order = ["AAA", "AA", "A", "BBB", "BB", "B", "CCC", "CC", "C", "D"]
|
||
for grade in order:
|
||
if grade in full_floors:
|
||
print(f" {grade:>4}: {full_floors[grade]*10000:8.1f}bp "
|
||
f"({full_floors[grade]*100:.4f}%)")
|
||
|
||
print("\n=== 노치 PD 비율 ===")
|
||
for broad, ratios in notch_ratios.items():
|
||
print(f" {broad}:")
|
||
for notch, ratio in sorted(ratios.items()):
|
||
print(f" {notch:>5}: ×{ratio:.3f}")
|