Files
LifetimePD/data/pd_floor.py
Variet Agent d1ddf06e5d feat(model): KAP YTM PD floor integration, expanded 226-var search, ADF fix (AIC->BIC), Model#2 with 6-test diagnostics
- Replace hardcoded DEFAULT_PD_FLOORS with build_complete_pd_floor_table() (KAP bond YTM)
- Fix ADF test: autolag='AIC' -> 'BIC' for small sample (N=26) robustness
- Expand variable search: 40 -> 226 vars (log/diff/return/lag2), 1.9M combos
- Select Model #2: HOUSING_PRICE + CREDIT_SPREAD_LAG1 + CURRENT_ACCOUNT_R
- Add 6-test diagnostics table to AR1 sheet (ADF/LB/DW/BP/ARCH/Shapiro)
- Add Korean variable names for transformed variables
- Generate report v7 with full diagnostics
2026-03-12 00:06:23 +09:00

428 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
시장 YTM 기반 PD 플로어 산출 모듈
KAP(한국자산평가) 등급별 회사채 수익률에서 신용스프레드를 추출하고,
시장 내재 부도율(PD)을 산출한 뒤, AAA=5bp 앵커링 + 로그 스케일링으로
각 등급의 PD 플로어를 결정합니다.
핵심 공식:
PD_raw = spread / (1 - recovery_rate) [LGD = 1 - RR]
scale = log(anchor_AAA) / log(PD_raw_AAA)
PD_floor(g) = exp(log(PD_raw(g)) × scale)
참고:
- Basel III CRE30.4: 기업 PD 플로어 5bp (0.05%)
- 한국 시장 회수율: 약 40% (LGD = 60%)
- docs/pd_floor_reference.md 참조
"""
import numpy as np
import logging
from typing import Dict, Optional, Tuple, List
logger = logging.getLogger(__name__)
# 기본 파라미터
DEFAULT_RECOVERY_RATE = 0.40 # 회수율 40% → LGD 60%
DEFAULT_AAA_ANCHOR = 0.0005 # 5bp = 0.05% = 0.0005
DEFAULT_LGD = 1 - DEFAULT_RECOVERY_RATE # 0.60
# 대등급 순서 (인덱스 기반 외삽에 사용)
BROAD_GRADE_ORDER = ["AAA", "AA", "A", "BBB", "BB", "B"]
BROAD_GRADE_INDEX = {g: i for i, g in enumerate(BROAD_GRADE_ORDER)}
def compute_market_implied_pd(
spread_bp: float,
lgd: float = DEFAULT_LGD
) -> float:
"""
신용스프레드에서 시장 내재 PD 산출
PD ≈ spread / LGD
Parameters
----------
spread_bp : float
신용스프레드 (bp, 1bp = 0.01%)
lgd : float
Loss Given Default (기본: 0.60)
Returns
-------
float
시장 내재 PD (소수, 예: 0.000727 = 7.27bp)
"""
spread_decimal = spread_bp / 10000 # bp → 소수
return spread_decimal / lgd
def compute_pd_floors(
spreads_bp: Dict[str, float],
anchor_aaa: float = DEFAULT_AAA_ANCHOR,
lgd: float = DEFAULT_LGD
) -> Dict[str, float]:
"""
AAA=5bp 앵커링 + 로그 스케일링으로 등급별 PD 플로어 산출
Parameters
----------
spreads_bp : Dict[str, float]
등급별 신용스프레드 (bp). 대등급(AAA, AA, A, BBB) 기준.
anchor_aaa : float
AAA 앵커 PD (기본: 5bp = 0.0005)
lgd : float
Loss Given Default (기본: 0.60)
Returns
-------
Dict[str, float]
등급별 PD 플로어 (소수). 예: {'AAA': 0.0005, 'AA': 0.00057, ...}
"""
# Step 1: 시장 내재 PD 산출
pd_raw = {}
for grade, spread in spreads_bp.items():
pd_raw[grade] = compute_market_implied_pd(spread, lgd)
if "AAA" not in pd_raw or pd_raw["AAA"] <= 0:
raise ValueError("AAA 스프레드가 0 이하입니다. 데이터를 확인하세요.")
# Step 2: 로그 스케일링
# scale = log(anchor) / log(pd_raw_AAA)
log_anchor = np.log(anchor_aaa)
log_aaa_raw = np.log(pd_raw["AAA"])
scale = log_anchor / log_aaa_raw
pd_floors = {}
for grade, pd_val in pd_raw.items():
pd_floors[grade] = np.exp(np.log(pd_val) * scale)
logger.info(f"PD 플로어 스케일링: scale={scale:.4f}, "
f"AAA raw={pd_raw['AAA']*10000:.1f}bp → floor={pd_floors['AAA']*10000:.1f}bp")
return pd_floors
# ============================================================
# 기본 PD 플로어 (시장 데이터 없이 사용 가능)
# ============================================================
# 근거:
# AAA = 5bp : Basel III CRE30.4 규제 플로어 (2023 개정, 기업 IRB)
# AA = 5bp : Basel III 최저선 + S&P 장기평균 2bp + Moody's 5bp 중간값
# A = 7bp : S&P 장기평균 5bp + Moody's 9bp 중간값
# BBB = 20bp: S&P 15bp + Moody's 26bp 중간값, 한국 BBB 관측 27bp와 정합
# BB = 60bp: S&P 56~63bp 범위, 관측치 사용 (floor 불필요)
# B = 300bp: S&P 293~334bp 범위, 관측치 사용 (floor 불필요)
#
# 참고문헌:
# [1] Basel Committee, CRE30.4: "PD shall not be less than 0.05%"
# [2] S&P Global, "2023 Annual Default and Transition Study"
# [3] Moody's, "Annual Default Study" (1920-2023)
# [4] 금융감독원, 신용평가공시 (한국기업평가 1998-2025)
DEFAULT_PD_FLOORS = {
"AAA": 0.0005, # 5bp — Basel III CRE30.4
"AA": 0.0005, # 5bp — Basel III 최저선
"A": 0.0007, # 7bp — S&P/Moody's 중간값
"BBB": 0.0020, # 20bp — S&P/Moody's 중간값
"BB": 0.0060, # 60bp — 관측치 수준 (floor 미적용)
"B": 0.0300, # 300bp — 관측치 수준 (floor 미적용)
}
# 7×7 행렬용 (CCC 제외)
GRADES_7 = ["AAA", "AA", "A", "BBB", "BB", "B"]
def get_default_pd_floors() -> Dict[str, float]:
"""기본 PD 플로어 반환 (Basel III + S&P/Moody's 근거)"""
return DEFAULT_PD_FLOORS.copy()
def apply_pd_floor_to_matrices(
matrices: Dict[int, 'np.ndarray'],
pd_floors: Optional[Dict[str, float]] = None,
grades: Optional[List[str]] = None
) -> Dict[int, 'np.ndarray']:
"""
전이행렬의 D열(부도 전이확률)에 PD 플로어 적용
로직:
1. 각 등급의 TM[i, D] < floor[i] 이면
2. TM[i, D] = floor[i] 로 상향
3. 초과분(delta)을 TM[i, i] (대각선)에서 차감
4. 행 합 = 1.0 유지
이론적 근거:
- 한국 투자적격등급(AAA~A) 부도 관측치 = 0%
- 0%"위험 없음"이 아니라 "관측 불가능한 확률"
- Basel III CRE30.4: 기업 PD ≥ 5bp (0.05%)
- S&P/Moody's 글로벌 장기평균으로 보정
Parameters
----------
matrices : Dict[int, np.ndarray]
연도별 전이행렬 (N×N, 마지막 열 = D)
pd_floors : Dict[str, float], optional
등급별 최소 PD (기본: DEFAULT_PD_FLOORS)
grades : List[str], optional
등급 레이블 (기본: AAA~B for 7×7)
Returns
-------
Dict[int, np.ndarray]
PD 플로어 적용된 전이행렬 (새 복사본)
"""
if pd_floors is None:
pd_floors = DEFAULT_PD_FLOORS
if grades is None:
grades = GRADES_7
calibrated = {}
for year, tm in matrices.items():
tm_new = tm.copy()
n = tm_new.shape[0]
d_col = n - 1 # 마지막 열 = D
for i in range(n - 1): # D행은 제외 (흡수상태)
grade = grades[i] if i < len(grades) else None
if grade and grade in pd_floors:
floor = pd_floors[grade]
observed_pd = tm_new[i, d_col]
if observed_pd < floor:
delta = floor - observed_pd
tm_new[i, d_col] = floor
# 대각선(유지확률)에서 차감
tm_new[i, i] = max(tm_new[i, i] - delta, 0.0)
# 행 합 재정규화 (안전장치)
row_sum = tm_new[i].sum()
if row_sum > 0:
tm_new[i] /= row_sum
calibrated[year] = tm_new
logger.info(f"PD 플로어 적용 완료: {len(calibrated)}개 연도")
return calibrated
def extrapolate_speculative_grades(
pd_floors: Dict[str, float],
grades_to_extrapolate: List[str] = ["BB", "B"]
) -> Dict[str, float]:
"""
투자적격등급(AAA~BBB)의 로그 트렌드를 외삽하여 투기등급 PD 산출
log(PD) vs grade_index의 선형 트렌드를 BBB 이후로 연장합니다.
Parameters
----------
pd_floors : Dict[str, float]
투자적격등급 PD 플로어 (AAA~BBB)
grades_to_extrapolate : List[str]
외삽할 등급 (기본: ["BB", "B"])
Returns
-------
Dict[str, float]
투기등급이 추가된 PD 플로어
"""
# 기존 투자적격 등급의 log(PD) vs index 관계
available = []
for g in BROAD_GRADE_ORDER:
if g in pd_floors:
available.append((BROAD_GRADE_INDEX[g], np.log(pd_floors[g])))
if len(available) < 2:
raise ValueError("외삽을 위해 최소 2개 등급의 PD가 필요합니다.")
# 선형 회귀: log(PD) = a × index + b
x = np.array([a[0] for a in available])
y = np.array([a[1] for a in available])
coeffs = np.polyfit(x, y, 1) # [기울기, 절편]
result = pd_floors.copy()
for grade in grades_to_extrapolate:
if grade in BROAD_GRADE_INDEX:
idx = BROAD_GRADE_INDEX[grade]
log_pd = coeffs[0] * idx + coeffs[1]
result[grade] = np.exp(log_pd)
logger.info(f"외삽 PD [{grade}]: {result[grade]*10000:.1f}bp "
f"(index={idx}, log_pd={log_pd:.4f})")
return result
def interpolate_ccc_pd(
pd_b: float,
pd_d: float = 1.0,
method: str = "geometric"
) -> Dict[str, float]:
"""
B와 D(=100%) 사이를 보간하여 CCC/CC/C 등급 PD 산출
Parameters
----------
pd_b : float
B등급 PD (소수)
pd_d : float
D등급 PD (기본: 1.0 = 100%)
method : str
'geometric': 기하평균
'log': 로그 등간격 보간
Returns
-------
Dict[str, float]
{'CCC': ..., 'CC': ..., 'C': ...}
"""
if method == "geometric":
# B → CCC → CC → C → D 순서로 기하적 등간격
# 4개 구간 (B→CCC, CCC→CC, CC→C, C→D)
log_b = np.log(pd_b)
log_d = np.log(pd_d)
step = (log_d - log_b) / 4
ccc_pd = np.exp(log_b + step * 1)
cc_pd = np.exp(log_b + step * 2)
c_pd = np.exp(log_b + step * 3)
else: # log-linear
log_b = np.log(pd_b)
log_d = np.log(pd_d)
step = (log_d - log_b) / 4
ccc_pd = np.exp(log_b + step * 1)
cc_pd = np.exp(log_b + step * 2)
c_pd = np.exp(log_b + step * 3)
return {"CCC": ccc_pd, "CC": cc_pd, "C": c_pd}
def compute_notch_pd_ratios(
notch_spreads: Dict[str, float],
lgd: float = DEFAULT_LGD
) -> Dict[str, Dict[str, float]]:
"""
노치등급 간 PD 비율 산출 (나중에 등급 쪼개기에 사용)
예: AA+ : AA : AA- = 0.91 : 1.00 : 1.07 (AA 대비 비율)
Parameters
----------
notch_spreads : Dict[str, float]
노치등급별 스프레드 (bp)
Returns
-------
Dict[str, Dict[str, float]]
대등급별 {노치등급: 비율} 딕셔너리
예: {'AA': {'AA+': 0.91, 'AA': 1.00, 'AA-': 1.07}}
"""
from data.ytm_fetcher import NOTCH_TO_BROAD
from collections import defaultdict
# 대등급별 노치 그룹핑
groups = defaultdict(dict)
for notch, spread in notch_spreads.items():
broad = NOTCH_TO_BROAD.get(notch, notch)
groups[broad][notch] = compute_market_implied_pd(spread, lgd)
# 중간 노치 대비 비율 계산
ratios = {}
for broad, notch_pds in groups.items():
if len(notch_pds) == 1:
key = list(notch_pds.keys())[0]
ratios[broad] = {key: 1.0}
else:
# 중간 노치(plain) 기준
mid_key = broad # AA+ → AA가 중간
if mid_key in notch_pds:
mid_pd = notch_pds[mid_key]
else:
mid_pd = sorted(notch_pds.values())[len(notch_pds) // 2]
ratios[broad] = {
notch: pd / mid_pd for notch, pd in notch_pds.items()
}
return ratios
def build_complete_pd_floor_table(
date: str = "2025-12-31",
anchor_aaa: float = DEFAULT_AAA_ANCHOR,
recovery_rate: float = DEFAULT_RECOVERY_RATE
) -> Tuple[Dict[str, float], Dict[str, float], Dict[str, Dict[str, float]]]:
"""
전체 PD 플로어 테이블 생성 (원스톱)
Parameters
----------
date : str
YTM 기준일
anchor_aaa : float
AAA 앵커 PD (기본: 5bp)
recovery_rate : float
회수율 (기본: 0.40)
Returns
-------
Tuple of:
broad_floors : Dict[str, float] - 대등급 PD 플로어 (AAA~B+D)
notch_ratios : Dict[str, Dict[str, float]] - 노치 비율
full_floors : Dict[str, float] - CCC/CC/C 포함 전체 PD (AAA~C+D)
"""
from data.ytm_fetcher import get_ytm_data, compute_spreads, compute_broad_grade_spreads
lgd = 1 - recovery_rate
# 1) YTM 데이터 수집
ytm_data = get_ytm_data(date)
notch_spreads = compute_spreads(ytm_data)
broad_spreads = compute_broad_grade_spreads(notch_spreads)
logger.info(f"대등급 스프레드: {broad_spreads}")
# 2) PD 플로어 산출 (투자적격)
pd_floors = compute_pd_floors(broad_spreads, anchor_aaa, lgd)
# 3) 투기등급 외삽
pd_floors = extrapolate_speculative_grades(pd_floors, ["BB", "B"])
# 4) CCC/CC/C 보간
ccc_pds = interpolate_ccc_pd(pd_floors["B"], pd_d=1.0, method="geometric")
full_floors = {**pd_floors, **ccc_pds, "D": 1.0}
# 5) 노치 비율
notch_ratios = compute_notch_pd_ratios(notch_spreads, lgd)
return pd_floors, notch_ratios, full_floors
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO, format="%(message)s")
print("=" * 60)
print(" PD 플로어 산출 (KAP YTM 기반)")
print("=" * 60)
broad_floors, notch_ratios, full_floors = build_complete_pd_floor_table()
print("\n=== 대등급 PD 플로어 ===")
for grade in BROAD_GRADE_ORDER:
if grade in broad_floors:
print(f" {grade:>4}: {broad_floors[grade]*10000:8.1f}bp "
f"({broad_floors[grade]*100:.4f}%)")
print("\n=== 전체 PD 플로어 (CCC/CC/C 포함) ===")
order = ["AAA", "AA", "A", "BBB", "BB", "B", "CCC", "CC", "C", "D"]
for grade in order:
if grade in full_floors:
print(f" {grade:>4}: {full_floors[grade]*10000:8.1f}bp "
f"({full_floors[grade]*100:.4f}%)")
print("\n=== 노치 PD 비율 ===")
for broad, ratios in notch_ratios.items():
print(f" {broad}:")
for notch, ratio in sorted(ratios.items()):
print(f" {notch:>5}: ×{ratio:.3f}")