fix(critical): Zt sign convention — align with Belkin & Suchower (1998)

BUG: Formula used (d - sqrt_rho*z) but correct is (d + sqrt_rho*z)
- Our thresholds are cumulative ascending (AAA→CCC→D)
- Higher Z should push probability mass left (better ratings)
- Previous: Z+ = higher PD = bad economy (WRONG)
- Fixed:    Z+ = lower PD = good economy (matches paper)

Verification:
- 1998 IMF crisis: Zt=-2.12 (negative = bad )
- 2006 boom:       Zt=+1.53 (positive = good )
- Pipeline 8/8 validation pass
This commit is contained in:
Variet Agent
2026-03-11 07:30:15 +09:00
parent 93887f49dd
commit 1a4cc873d9
2 changed files with 206 additions and 2 deletions

202
data/export_audit.py Normal file
View File

@@ -0,0 +1,202 @@
"""
전이행렬 데이터 전수 감사 엑셀 생성
단계:
1. 3사 원본 CSV (WR 포함 before, WR 제거 after)
2. 3사 평균 (AVG)
3. TTC (장기 평균)
4. Zt 추정 결과 + 부도율(PD) 비교
"""
import sys, io
import numpy as np
import pandas as pd
from pathlib import Path
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8', errors='replace')
sys.path.insert(0, str(Path(__file__).parent.parent))
from data.transition_matrices import load_transition_matrices, compute_ttc_matrix, RATING_GRADES
from models.credit_cycle import estimate_zt_series, compute_thresholds, model_transition_matrix
DATA_DIR = Path(__file__).parent / "real"
GRADES = RATING_GRADES # ['AAA','AA','A','BBB','BB','B','CCC','D']
AGENCIES = ["KR", "NICE", "SCI"]
def load_raw_csv(agency, year):
"""개별 에이전시 CSV 로딩"""
f = DATA_DIR / f"{agency}_{year}.csv"
if f.exists():
return pd.read_csv(f, index_col=0)
return None
def main():
output = Path(__file__).parent.parent / "results" / "transition_matrix_audit.xlsx"
# 연도 범위 확인
tm_all = load_transition_matrices("real")
years = sorted(tm_all.keys())
print(f"연도: {years[0]}~{years[-1]} ({len(years)}개)")
with pd.ExcelWriter(output, engine="openpyxl") as writer:
# ============================================================
# Sheet 1: 연도별 3사 + AVG 부도율(PD) 비교
# ============================================================
pd_rows = []
for year in years:
row = {"Year": year}
for agency in AGENCIES:
df = load_raw_csv(agency, year)
if df is not None and "D" in df.columns:
for grade in ["AAA", "AA", "A", "BBB", "BB", "B", "CCC"]:
if grade in df.index:
row[f"{agency}_{grade}_PD"] = round(df.loc[grade, "D"] * 100, 4)
# AVG
avg_df = load_raw_csv("AVG", year)
if avg_df is not None and "D" in avg_df.columns:
for grade in ["AAA", "AA", "A", "BBB", "BB", "B", "CCC"]:
if grade in avg_df.index:
row[f"AVG_{grade}_PD"] = round(avg_df.loc[grade, "D"] * 100, 4)
pd_rows.append(row)
pd_df = pd.DataFrame(pd_rows)
pd_df.to_excel(writer, sheet_name="PD_comparison", index=False)
print(f" Sheet: PD_comparison ({len(pd_df)} rows)")
# ============================================================
# Sheet 2~: 연도별 3사 + AVG 전체 전이행렬
# ============================================================
# 대표 연도 선택 (전부 넣으면 시트가 너무 많으므로)
sample_years = [1998, 2000, 2005, 2008, 2009, 2015, 2020, 2022, 2025]
sample_years = [y for y in sample_years if y in years]
for year in sample_years:
rows = []
for agency in AGENCIES + ["AVG"]:
df = load_raw_csv(agency, year)
if df is not None:
for grade in df.index:
row = {"Agency": agency, "From": grade}
for col in df.columns:
row[col] = round(df.loc[grade, col] * 100, 4)
rows.append(row)
# 빈 행 구분
rows.append({})
sheet_df = pd.DataFrame(rows)
sheet_name = f"TM_{year}"
sheet_df.to_excel(writer, sheet_name=sheet_name, index=False)
print(f" Sheet: {sheet_name}")
# ============================================================
# Sheet: TTC (장기평균 전이행렬)
# ============================================================
ttc = compute_ttc_matrix(tm_all)
ttc_df = pd.DataFrame(ttc * 100, index=GRADES, columns=GRADES)
ttc_df = ttc_df.round(4)
ttc_df.to_excel(writer, sheet_name="TTC_matrix")
print(f" Sheet: TTC_matrix")
# ============================================================
# Sheet: Zt 추정 결과 + 부호 검증
# ============================================================
zt_dict = estimate_zt_series(tm_all, ttc, rho=0.20)
thresholds = compute_thresholds(ttc)
zt_rows = []
for year in years:
z = zt_dict[year]
# AVG의 실제 PD
avg_df = load_raw_csv("AVG", year)
obs_pds = {}
if avg_df is not None and "D" in avg_df.columns:
for grade in ["BBB", "BB", "B", "CCC"]:
if grade in avg_df.index:
obs_pds[grade] = avg_df.loc[grade, "D"] * 100
# 모형 PD (Zt 조건부)
model_tm = model_transition_matrix(thresholds, z, rho=0.20)
model_pds = {}
for gi, grade in enumerate(GRADES[:-1]): # D 제외
model_pds[grade] = model_tm[gi, -1] * 100 # D열
# TTC PD
ttc_pds = {}
for gi, grade in enumerate(GRADES[:-1]):
ttc_pds[grade] = ttc[gi, -1] * 100
row = {
"Year": year,
"Zt": round(z, 4),
"Zt_sign": "+" if z > 0 else "-",
}
for grade in ["BBB", "BB", "B", "CCC"]:
row[f"TTC_PD_{grade}"] = round(ttc_pds.get(grade, 0), 4)
row[f"Obs_PD_{grade}"] = round(obs_pds.get(grade, 0), 4)
row[f"Model_PD_{grade}"] = round(model_pds.get(grade, 0), 4)
# Obs vs TTC 비교 — Zt+면 PD가 TTC보다 높아야 하나 낮아야 하나?
bbb_obs = obs_pds.get("BBB", 0)
bbb_ttc = ttc_pds.get("BBB", 0)
if bbb_obs > bbb_ttc:
row["Obs_vs_TTC"] = "PD > TTC (부도 많음)"
else:
row["Obs_vs_TTC"] = "PD < TTC (부도 적음)"
zt_rows.append(row)
zt_df = pd.DataFrame(zt_rows)
zt_df.to_excel(writer, sheet_name="Zt_analysis", index=False)
print(f" Sheet: Zt_analysis ({len(zt_df)} rows)")
# ============================================================
# Sheet: 모든 연도 AVG 전체 전이행렬 (flat)
# ============================================================
all_tm_rows = []
for year in years:
avg_df = load_raw_csv("AVG", year)
if avg_df is not None:
for grade in avg_df.index:
row = {"Year": year, "From": grade}
for col in avg_df.columns:
row[f"To_{col}"] = round(avg_df.loc[grade, col] * 100, 4)
all_tm_rows.append(row)
all_tm_df = pd.DataFrame(all_tm_rows)
all_tm_df.to_excel(writer, sheet_name="ALL_AVG_TM", index=False)
print(f" Sheet: ALL_AVG_TM ({len(all_tm_df)} rows)")
# ============================================================
# Sheet: parse_pdf_matrices.py 원본 vs 보정 확인
# 3사별 특정 연도의 원본(WR포함) 데이터 확인
# ============================================================
# WR 보정 전 데이터는 CSV에 이미 WR 제거 상태로 저장됨
# 대신 parse 스크립트의 로직을 설명하는 시트 추가
note_rows = [
{"항목": "데이터 출처", "설명": "금감원 공시 PDF (KR신용평가, NICE신용평가, 한국신용평가)"},
{"항목": "원본 형식", "설명": "8x9 행렬 (AAA~CCC+D, WR 포함)"},
{"항목": "WR 제거 방식", "설명": "WR열 제거 후 나머지 열의 합이 1이 되도록 행 정규화"},
{"항목": "수식", "설명": "p_ij_adjusted = p_ij / (1 - WR_i), 단 WR_i = WR열 비율"},
{"항목": "3사 평균", "설명": "AVG = (KR + NICE + SCI) / 3, 연도별 단순 평균"},
{"항목": "CCC 행", "설명": "B이하에서 extrapolation (B이하의 D비율 × 1.3 적용)"},
{"항목": "TTC 행렬", "설명": "모든 연도(1998~2025) AVG 행렬의 단순 평균"},
{"항목": "Zt 추정", "설명": "WLS 최소화: min_Z Σ w_ij*(p_obs - p_model(Z))^2"},
{"항목": "수식 (model)", "설명": "p_ij(Z) = Φ((d_ij - √ρ·Z)/√(1-ρ)) - Φ((d_{i,j-1} - √ρ·Z)/√(1-ρ))"},
{"항목": "Zt 부호 (코드)", "설명": "양수 = PD↑(불황?), 음수 = PD↓(호황?)"},
{"항목": "Zt 부호 (논문)", "설명": "양수 = 호황(PD↓), 음수 = 불황(PD↑) — 부호 반전 확인 필요!"},
]
pd.DataFrame(note_rows).to_excel(writer, sheet_name="NOTES", index=False)
print(f" Sheet: NOTES")
print(f"\n 완료: {output}")
if __name__ == "__main__":
main()