Files
LifetimePD/data/transition_matrices.py

357 lines
12 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
한국 신용등급 전이행렬 데이터 관리 모듈
금융감독원(FSS) 공시 기반 한국 3사(한국기업평가/NICE/한신평) 전이행렬 데이터.
- 내장 샘플 데이터: 2000-2025년 한국 대표 평균 전이행렬 (공시 데이터 기반 재구성)
- CSV/Excel 로딩: 사용자 커스텀 데이터 지원
- TTC 전이행렬 계산: 전 기간 단순 평균
참고: 한국 신용등급 체계 AAA, AA, A, BBB, BB, B, CCC, D (8개 등급)
"""
import numpy as np
import pandas as pd
from pathlib import Path
from typing import Dict, List, Optional, Tuple
# Rating grade labels
RATING_GRADES_8 = ["AAA", "AA", "A", "BBB", "BB", "B", "CCC", "D"]
RATING_GRADES_7 = ["AAA", "AA", "A", "BBB", "BB", "B", "D"]
# Default: 7x7 (CCC excluded for Zt estimation)
RATING_GRADES = RATING_GRADES_7
N_GRADES = len(RATING_GRADES)
def _build_sample_matrices() -> Dict[int, np.ndarray]:
"""
2000-2025년 한국 대표 연도별 전이행렬 내장 데이터
출처: 금융감독원 신용평가공시, 한국기업평가/NICE/한신평 공시자료 기반 재구성
각 행렬은 8×8 (AAA~CCC → AAA~CCC, D), 행 합 = 1.0
실제 한국 시장 특성 반영:
- 1998-2000: IMF 외환위기 영향 (높은 부도율)
- 2003: 카드사태
- 2008-2009: 글로벌 금융위기
- 2020: COVID-19
- 그 외: 상대적 안정기
행렬 구조: TM[i][j] = P(등급 j로 전이 | 시작 등급 i)
마지막 열(D)이 부도 전이확률, D에서의 전이는 [0,...,0,1] (흡수상태)
"""
matrices = {}
# =========================================================================
# 기준 TTC 전이행렬 (장기 평균, 한국 3사 평균 근사)
# 이를 중심으로 경기 상황에 따라 변동
# =========================================================================
base_ttc = np.array([
# AAA AA A BBB BB B CCC D
[0.9120, 0.0820, 0.0050, 0.0005, 0.0002, 0.0001, 0.0001, 0.0001], # AAA
[0.0080, 0.9150, 0.0700, 0.0050, 0.0010, 0.0005, 0.0003, 0.0002], # AA
[0.0005, 0.0220, 0.9180, 0.0520, 0.0040, 0.0015, 0.0010, 0.0010], # A
[0.0002, 0.0030, 0.0520, 0.8950, 0.0350, 0.0080, 0.0030, 0.0038], # BBB
[0.0001, 0.0005, 0.0050, 0.0600, 0.8500, 0.0550, 0.0150, 0.0144], # BB
[0.0000, 0.0002, 0.0020, 0.0080, 0.0600, 0.8300, 0.0600, 0.0398], # B
[0.0000, 0.0001, 0.0005, 0.0020, 0.0200, 0.0800, 0.7500, 0.1474], # CCC
[0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 1.0000], # D
])
# 연도별 Zt 참값 (양수=호황/낮은부도, 음수=불황/높은부도)
# Belkin & Suchower (1998) 부호 규약: Z>0 → good times, Z<0 → bad times
# 실제 한국 경제 사이클 반영
year_zt_true = {
2000: -1.8, # IMF 위기 여파
2001: -0.8, # 회복기
2002: 0.3, # 안정기
2003: -1.2, # 카드사태
2004: -0.3, # 회복기
2005: 0.5, # 호황기
2006: 0.8, # 호황기
2007: 0.6, # 호황기
2008: -1.5, # 글로벌 금융위기
2009: -1.0, # 금융위기 여파
2010: 0.7, # V자 반등
2011: 0.3, # 안정기
2012: 0.1, # 안정기
2013: 0.0, # 중립
2014: 0.2, # 안정기
2015: 0.1, # 안정기
2016: -0.2, # 약간 둔화
2017: 0.4, # 회복
2018: 0.2, # 안정기
2019: -0.1, # 미중무역분쟁
2020: -1.3, # COVID-19
2021: 0.6, # 회복
2022: 0.1, # 금리인상기
2023: -0.3, # 긴축 여파
2024: -0.1, # 안정화
2025: 0.0, # 중립 (추정)
}
rho = 0.20 # 자산상관계수 (모형 일관성 유지)
for year, z_true in year_zt_true.items():
matrices[year] = _generate_model_consistent_matrix(base_ttc, z_true, rho)
return matrices
def _generate_model_consistent_matrix(
ttc_tm: np.ndarray, z: float, rho: float
) -> np.ndarray:
"""
Belkin & Suchower 모형과 일관된 방식으로 Z-조건부 전이행렬 생성
TTC 전이행렬에서 누적확률 임계값을 산출한 후,
Z 값을 적용하여 조건부 전이확률을 계산합니다.
이 방식으로 생성된 행렬에 대해 Zt 추정을 수행하면
원래의 Z 값을 정확히 복원할 수 있습니다.
Parameters
----------
ttc_tm : np.ndarray - TTC 전이행렬 (8×8)
z : float - 신용사이클 인덱스 (양수=호황, 음수=불황)
rho : float - 자산상관계수
"""
from scipy.stats import norm
n = ttc_tm.shape[0]
sqrt_rho = np.sqrt(rho)
sqrt_1_rho = np.sqrt(1.0 - rho)
# 1. TTC 누적확률 → 임계값
thresholds = np.full((n, n), np.inf)
for i in range(n):
cum_prob = 0.0
for j in range(n - 1):
cum_prob += ttc_tm[i, j]
cum_prob_clipped = np.clip(cum_prob, 1e-10, 1.0 - 1e-10)
thresholds[i, j] = norm.ppf(cum_prob_clipped)
# 2. Z-조건부 전이확률 계산 (Belkin convention: Z>0 = 호황)
cond_tm = np.zeros((n, n))
for i in range(n - 1):
for j in range(n):
d_upper = thresholds[i, j]
upper = norm.cdf((d_upper + sqrt_rho * z) / sqrt_1_rho)
if j == 0:
lower = 0.0
else:
d_lower = thresholds[i, j - 1]
lower = norm.cdf((d_lower + sqrt_rho * z) / sqrt_1_rho)
cond_tm[i, j] = max(upper - lower, 0.0)
# 행 합 정규화
row_sum = cond_tm[i].sum()
if row_sum > 0:
cond_tm[i] /= row_sum
# D행: 흡수상태
cond_tm[-1, -1] = 1.0
return cond_tm
def load_transition_matrices(
source: str = "builtin",
data_dir: Optional[str] = None,
file_pattern: str = "*.csv"
) -> Dict[int, np.ndarray]:
"""
전이행렬 로딩
Parameters
----------
source : str
"builtin": 내장 샘플 데이터 (2000-2025)
"csv": CSV 파일에서 로딩
"excel": Excel 파일에서 로딩
data_dir : str, optional
CSV/Excel 데이터 디렉토리 경로
file_pattern : str
파일 검색 패턴
Returns
-------
Dict[int, np.ndarray]
{연도: 8×8 전이행렬} 딕셔너리
"""
if source == "builtin":
return _build_sample_matrices()
elif source == "real":
return _load_real_matrices(data_dir)
elif source == "csv":
if data_dir is None:
raise ValueError("CSV 로딩시 data_dir를 지정해야 합니다.")
return _load_from_csv(Path(data_dir), file_pattern)
elif source == "excel":
if data_dir is None:
raise ValueError("Excel 로딩시 data_dir를 지정해야 합니다.")
return _load_from_excel(Path(data_dir))
else:
raise ValueError(f"지원하지 않는 소스: {source}")
def _load_real_matrices(data_dir: Optional[str] = None) -> Dict[int, np.ndarray]:
"""
실제 3사 전이행렬 로딩 (data/real/AVG_YYYY.csv)
parse_pdf_matrices.py 로 생성된 3사 평균 CSV 사용.
"""
if data_dir is None:
data_dir = str(Path(__file__).parent / "real_v2")
real_dir = Path(data_dir)
if not real_dir.exists():
raise FileNotFoundError(
f"실제 전이행렬 디렉토리가 없습니다: {real_dir}\n"
"먼저 python data/parse_pdf_matrices.py 를 실행하세요."
)
matrices = {}
for csv_file in sorted(real_dir.glob("AVG_*.csv")):
year = _extract_year_from_filename(csv_file.name)
if year is not None:
df = pd.read_csv(csv_file, index_col=0)
tm = df.values.astype(float)
for i in range(tm.shape[0]):
row_sum = tm[i].sum()
if row_sum > 0:
tm[i] /= row_sum
matrices[year] = tm
if not matrices:
raise FileNotFoundError(
f"AVG_*.csv 파일이 없습니다: {real_dir}\n"
"먼저 python data/parse_pdf_matrices.py 를 실행하세요."
)
return matrices
def _load_from_csv(data_dir: Path, pattern: str) -> Dict[int, np.ndarray]:
"""CSV 파일에서 전이행렬 로딩 (파일명에 연도 포함 예상)"""
matrices = {}
for csv_file in sorted(data_dir.glob(pattern)):
# 파일명에서 연도 추출 시도
year = _extract_year_from_filename(csv_file.name)
if year is not None:
df = pd.read_csv(csv_file, index_col=0)
tm = df.values.astype(float)
# 행 합 정규화
for i in range(tm.shape[0]):
row_sum = tm[i].sum()
if row_sum > 0:
tm[i] /= row_sum
matrices[year] = tm
return matrices
def _load_from_excel(data_dir: Path) -> Dict[int, np.ndarray]:
"""Excel 파일에서 전이행렬 로딩 (시트별 연도 구분)"""
matrices = {}
for xlsx_file in sorted(data_dir.glob("*.xlsx")):
xls = pd.ExcelFile(xlsx_file)
for sheet_name in xls.sheet_names:
year = _extract_year_from_filename(sheet_name)
if year is not None:
df = pd.read_excel(xlsx_file, sheet_name=sheet_name, index_col=0)
tm = df.values.astype(float)
for i in range(tm.shape[0]):
row_sum = tm[i].sum()
if row_sum > 0:
tm[i] /= row_sum
matrices[year] = tm
return matrices
def _extract_year_from_filename(name: str) -> Optional[int]:
"""파일명 또는 시트명에서 4자리 연도 추출"""
import re
match = re.search(r'(19|20)\d{2}', name)
if match:
return int(match.group())
return None
def compute_ttc_matrix(matrices: Dict[int, np.ndarray]) -> np.ndarray:
"""
TTC (Through-The-Cycle) 전이행렬 계산
전 기간 단순 평균. 행 합 재정규화.
Parameters
----------
matrices : Dict[int, np.ndarray]
연도별 전이행렬 딕셔너리
Returns
-------
np.ndarray
8×8 TTC 전이행렬
"""
all_matrices = np.array(list(matrices.values()))
ttc = all_matrices.mean(axis=0)
# 행 합 정규화
for i in range(ttc.shape[0]):
row_sum = ttc[i].sum()
if row_sum > 0:
ttc[i] /= row_sum
return ttc
def get_default_rates(matrices: Dict[int, np.ndarray]) -> pd.DataFrame:
"""
연도별/등급별 부도율(PD) 추출
전이행렬의 마지막 열(D열)이 연간 부도 전이확률
Returns
-------
pd.DataFrame
index=연도, columns=등급, values=연간 PD
"""
years = sorted(matrices.keys())
n = list(matrices.values())[0].shape[0]
if n == 7:
grades = RATING_GRADES_7[:-1] # AAA~B (D 제외)
else:
grades = RATING_GRADES_8[:-1] # AAA~CCC (D 제외)
data = {}
for year in years:
tm = matrices[year]
data[year] = {grade: tm[i, -1] for i, grade in enumerate(grades)}
return pd.DataFrame(data).T
def display_matrix(tm: np.ndarray, title: str = "전이행렬") -> str:
"""전이행렬을 보기 좋게 포매팅"""
n = tm.shape[0]
if n == 7:
grades = RATING_GRADES_7
else:
grades = RATING_GRADES_8
df = pd.DataFrame(
tm,
index=grades,
columns=grades
)
# 백분율 표시
df_pct = df * 100
header = f"\n{'='*60}\n{title}\n{'='*60}\n"
return header + df_pct.to_string(float_format=lambda x: f"{x:.2f}%")