Files
LifetimePD/data/export_verification.py
Variet Agent 0e1e0e5cf2 feat(tools): comprehensive pipeline verification Excel generator
10 sheets: TTC, thresholds, yearly TMs, Zt estimation detail,
conditional TM formula breakdown, macro raw/features/correlation,
regression model, and formula summary
2026-03-11 08:02:36 +09:00

361 lines
16 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
파이프라인 전단계 검증용 엑셀 생성
시트 구성:
1. RAW_TM — 3사 원본 전이행렬 (WR 포함)
2. WR_REMOVAL — WR 제거 과정 (수식 포함)
3. AVG_TM — 3사 평균 전이행렬 (연도별)
4. TTC_MATRIX — TTC 장기평균 전이행렬
5. THRESHOLDS — Φ⁻¹(누적확률) 임계값
6. ZT_ESTIMATION— 연도별 Zt 추정 (관측PD vs 모형PD)
7. MACRO_DATA — 31개 거시경제변수 원본
8. FEATURES — 파생변수 (DIFF/LAG1/PCT)
9. REGRESSION — 최적 회귀모형 상세
10. FORMULAS — 각 단계 계산수식 요약
"""
import sys, io
import numpy as np, pandas as pd
from pathlib import Path
from scipy.stats import norm
if sys.stdout.encoding != 'utf-8':
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8', errors='replace')
sys.path.insert(0, str(Path(__file__).parent.parent))
from data.transition_matrices import (
load_transition_matrices, compute_ttc_matrix, RATING_GRADES
)
from models.credit_cycle import (
estimate_zt_series, compute_thresholds, model_transition_prob
)
from data.ecos_fetcher import load_macro_data
import statsmodels.api as sm
from scipy import stats as sp_stats
import warnings
warnings.filterwarnings("ignore")
BASE_DIR = Path(__file__).parent.parent
OUT_FILE = BASE_DIR / "results" / "pipeline_verification.xlsx"
GRADES = list(RATING_GRADES)
N = len(GRADES)
def main():
print("=" * 60)
print(" 파이프라인 전단계 검증 엑셀 생성")
print("=" * 60)
# ============================================================
# 1. 전이행렬 로딩
# ============================================================
print("\n [1/7] 전이행렬 로딩...")
tm_dict = load_transition_matrices("real")
ttc = compute_ttc_matrix(tm_dict)
years = sorted(tm_dict.keys())
print(f" {len(years)}년 ({years[0]}~{years[-1]})")
# ============================================================
# 2. Zt 추정
# ============================================================
print(" [2/7] Zt 추정...")
rho = 0.20
zt_dict = estimate_zt_series(tm_dict, ttc, rho=rho)
# 임계값 계산
thresholds = compute_thresholds(ttc)
# ============================================================
# 3. 거시경제변수
# ============================================================
print(" [3/7] 거시경제변수 로딩...")
macro_raw = load_macro_data(2000, 2025)
# ============================================================
# 엑셀 생성
# ============================================================
print(" [4/7] 엑셀 생성 중...")
(BASE_DIR / "results").mkdir(exist_ok=True)
with pd.ExcelWriter(OUT_FILE, engine="openpyxl") as writer:
# --------------------------------------------------------
# Sheet 1: TTC_MATRIX
# --------------------------------------------------------
ttc_df = pd.DataFrame(ttc, index=GRADES, columns=GRADES)
ttc_df.to_excel(writer, sheet_name="TTC_MATRIX")
# --------------------------------------------------------
# Sheet 2: THRESHOLDS
# --------------------------------------------------------
# d_{i,j} = Φ⁻¹(Σ_{k≤j} p̄_{i,k})
thresh_df = pd.DataFrame(thresholds, index=GRADES, columns=GRADES)
thresh_df.to_excel(writer, sheet_name="THRESHOLDS")
# 임계값 계산 과정 (누적확률 + Φ⁻¹)
cum_prob_rows = []
for i in range(N):
row = {}
cum = 0.0
for j in range(N):
cum += ttc[i, j]
cum_clipped = np.clip(cum, 1e-10, 1.0 - 1e-10)
row[f"CumProb_{GRADES[j]}"] = round(cum, 8)
row[f"Φ⁻¹_{GRADES[j]}"] = round(norm.ppf(cum_clipped), 6)
cum_prob_rows.append(row)
cum_df = pd.DataFrame(cum_prob_rows, index=GRADES)
cum_df.to_excel(writer, sheet_name="THRESHOLD_DETAIL")
# --------------------------------------------------------
# Sheet 3: 연도별 AVG 전이행렬 + PD
# --------------------------------------------------------
avg_rows = []
for yr in years:
mat = tm_dict[yr]
for i in range(N - 1): # D행 제외
row = {"Year": yr, "From": GRADES[i]}
for j in range(N):
row[f"To_{GRADES[j]}"] = round(mat[i, j], 6)
row["PD"] = round(mat[i, -1], 6) # D열
avg_rows.append(row)
avg_df = pd.DataFrame(avg_rows)
avg_df.to_excel(writer, sheet_name="YEARLY_TM", index=False)
# --------------------------------------------------------
# Sheet 4: Zt 추정 상세
# --------------------------------------------------------
zt_rows = []
sqrt_rho = np.sqrt(rho)
sqrt_1_rho = np.sqrt(1.0 - rho)
for yr in years:
z = zt_dict.get(yr, np.nan)
mat = tm_dict[yr]
row = {"Year": yr, "Zt": round(z, 6)}
# 각 등급별 관측PD vs 모형PD
for i in range(N - 1):
obs_pd = mat[i, -1]
if not np.isnan(z):
# 모형 PD 계산: P(D|Z) = 1 - Φ((d_{CCC} + √ρ·Z)/√(1-ρ))
d_lower = thresholds[i, N - 2] # CCC까지의 누적 = d_{i,CCC}
model_pd = 1.0 - norm.cdf((d_lower + sqrt_rho * z) / sqrt_1_rho)
else:
model_pd = np.nan
row[f"ObsPD_{GRADES[i]}"] = round(obs_pd, 6)
row[f"ModelPD_{GRADES[i]}"] = round(model_pd, 6) if not np.isnan(model_pd) else ""
row[f"Error_{GRADES[i]}"] = round(obs_pd - model_pd, 6) if not np.isnan(model_pd) else ""
# 수식 참조값
row["√ρ"] = round(sqrt_rho, 6)
row["√(1-ρ)"] = round(sqrt_1_rho, 6)
row["ρ"] = rho
zt_rows.append(row)
zt_df = pd.DataFrame(zt_rows)
zt_df.to_excel(writer, sheet_name="ZT_ESTIMATION", index=False)
# --------------------------------------------------------
# Sheet 5: Zt 조건부 전이확률 수식 상세 (예시 2년)
# --------------------------------------------------------
sample_years = [1998, 2006, 2008, 2020, 2025]
cond_rows = []
for yr in sample_years:
if yr not in zt_dict:
continue
z = zt_dict[yr]
for i in range(N - 1):
for j in range(N):
d_upper = thresholds[i, j]
upper_arg = (d_upper + sqrt_rho * z) / sqrt_1_rho
upper_val = norm.cdf(upper_arg)
if j == 0:
lower_val = 0.0
lower_arg = -np.inf
else:
d_lower = thresholds[i, j - 1]
lower_arg = (d_lower + sqrt_rho * z) / sqrt_1_rho
lower_val = norm.cdf(lower_arg)
prob = max(upper_val - lower_val, 0.0)
obs_prob = tm_dict[yr][i, j] if yr in tm_dict else np.nan
cond_rows.append({
"Year": yr, "Zt": round(z, 4),
"From": GRADES[i], "To": GRADES[j],
"d_upper": round(d_upper, 6),
"(d+√ρ·Z)/√(1-ρ)_upper": round(upper_arg, 6),
"Φ(upper)": round(upper_val, 8),
"(d+√ρ·Z)/√(1-ρ)_lower": round(lower_arg, 6) if not np.isinf(lower_arg) else "-∞",
"Φ(lower)": round(lower_val, 8),
"P(j|Z)=Φ(upper)-Φ(lower)": round(prob, 8),
"Observed": round(obs_prob, 8) if not np.isnan(obs_prob) else "",
})
cond_df = pd.DataFrame(cond_rows)
cond_df.to_excel(writer, sheet_name="COND_TM_DETAIL", index=False)
# --------------------------------------------------------
# Sheet 6: 거시경제변수 원본
# --------------------------------------------------------
macro_raw.to_excel(writer, sheet_name="MACRO_RAW")
# --------------------------------------------------------
# Sheet 7: 파생변수
# --------------------------------------------------------
from data.macro_analysis import build_features, EXPECTED_SIGNS
features = build_features(macro_raw)
features.to_excel(writer, sheet_name="FEATURES")
# --------------------------------------------------------
# Sheet 8: 상관분석
# --------------------------------------------------------
zt_series = pd.Series(zt_dict, name="Zt")
zt_2000 = zt_series[(zt_series.index >= 2000) & (zt_series.index <= 2025)]
common = sorted(set(zt_2000.index) & set(features.index))
corr_rows = []
for col in sorted(features.columns):
s = features.loc[common, col].dropna()
valid = s.index.intersection(zt_2000.index)
if len(valid) < 12:
continue
r, p = sp_stats.pearsonr(zt_2000.loc[valid], s.loc[valid])
exp = EXPECTED_SIGNS.get(col, 0)
sign_ok = "OK" if (exp == 0 or (r > 0 and exp > 0) or (r < 0 and exp < 0)) else "WRONG"
exp_str = "+" if exp > 0 else ("-" if exp < 0 else "N/A")
corr_rows.append({
"Variable": col,
"Pearson_r": round(r, 6),
"p_value": round(p, 6),
"|r|": round(abs(r), 6),
"Expected_Sign": exp_str,
"Actual_Sign": "+" if r > 0 else "-",
"Sign_Check": sign_ok,
"N_obs": len(valid),
})
corr_df = pd.DataFrame(corr_rows).sort_values("|r|", ascending=False)
corr_df.to_excel(writer, sheet_name="CORRELATION", index=False)
# --------------------------------------------------------
# Sheet 9: 최적 회귀모형 상세
# --------------------------------------------------------
best_vars = ["CREDIT_SPREAD_LAG1", "USDKRW", "HOUSING_PRICE"]
X_df = features.loc[common, best_vars].dropna()
valid_idx = X_df.index.intersection(zt_2000.index)
y = zt_2000.loc[valid_idx]
X_raw = X_df.loc[valid_idx]
Xm, Xs = X_raw.mean(), X_raw.std()
Xs[Xs < 1e-10] = 1
Xn = (X_raw - Xm) / Xs
model = sm.OLS(y.values, sm.add_constant(Xn.values)).fit()
# 회귀 데이터
reg_data = pd.DataFrame({
"Year": valid_idx,
"Zt_actual": y.values.round(6),
})
for i, v in enumerate(best_vars):
reg_data[f"{v}_raw"] = X_raw[v].values.round(4)
reg_data[f"{v}_std"] = Xn[v].values.round(6)
reg_data["Zt_predicted"] = model.fittedvalues.round(6)
reg_data["Residual"] = model.resid.round(6)
reg_data.to_excel(writer, sheet_name="REGRESSION_DATA", index=False)
# 회귀 계수 시트
coef_rows = [
{"Parameter": "const", "Coefficient": round(model.params[0], 6),
"Std_Error": round(model.bse[0], 6), "t_stat": round(model.tvalues[0], 4),
"p_value": round(model.pvalues[0], 6), "Note": "절편 (표준화)"},
]
for i, v in enumerate(best_vars):
coef_rows.append({
"Parameter": v,
"Coefficient": round(model.params[i + 1], 6),
"Std_Error": round(model.bse[i + 1], 6),
"t_stat": round(model.tvalues[i + 1], 4),
"p_value": round(model.pvalues[i + 1], 6),
"Mean": round(Xm[v], 4),
"Std": round(Xs[v], 4),
"Note": f"Zt = β₀ + β₁·(X₁-μ₁)/σ₁ + β₂·(X₂-μ₂)/σ₂ + β₃·(X₃-μ₃)/σ₃",
})
stats_rows = [
{"Statistic": "", "Value": round(model.rsquared, 6)},
{"Statistic": "Adj. R²", "Value": round(model.rsquared_adj, 6)},
{"Statistic": "F-statistic", "Value": round(model.fvalue, 4)},
{"Statistic": "F p-value", "Value": f"{model.f_pvalue:.6e}"},
{"Statistic": "AIC", "Value": round(model.aic, 2)},
{"Statistic": "BIC", "Value": round(model.bic, 2)},
{"Statistic": "Durbin-Watson", "Value": round(sm.stats.durbin_watson(model.resid), 4)},
{"Statistic": "N_obs", "Value": model.nobs},
]
coef_df = pd.DataFrame(coef_rows)
stats_df = pd.DataFrame(stats_rows)
# 두 테이블을 한 시트에
coef_df.to_excel(writer, sheet_name="REGRESSION_MODEL", index=False, startrow=0)
stats_df.to_excel(writer, sheet_name="REGRESSION_MODEL", index=False,
startrow=len(coef_df) + 3)
# --------------------------------------------------------
# Sheet 10: 수식 요약
# --------------------------------------------------------
formulas = [
{"Step": "1. 단일팩터 모형",
"Formula": "X_i = √ρ · Z + √(1-ρ) · Y_i",
"Description": "X: 신용도 변화, Z: 체계적 요인 (Z>0=호황), Y: 개별 요인",
"Parameters": f"ρ = {rho}"},
{"Step": "2. TTC 임계값",
"Formula": "d_{i,j} = Φ⁻¹(Σ_{k≤j} p̄_{i,k})",
"Description": "TTC 전이확률의 누적합에 표준정규 역함수 적용",
"Parameters": "p̄ = TTC 전이행렬 (장기평균)"},
{"Step": "3. 조건부 전이확률",
"Formula": "P(j|Z) = Φ((d_{i,j} + √ρ·Z) / √(1-ρ)) - Φ((d_{i,j-1} + √ρ·Z) / √(1-ρ))",
"Description": "Z 조건하에서 등급 i→j 전이확률. Z>0이면 상향 전이 증가, PD 감소",
"Parameters": f"√ρ = {sqrt_rho:.6f}, √(1-ρ) = {sqrt_1_rho:.6f}"},
{"Step": "4. Zt WLS 추정",
"Formula": "min_Z Σ w_{ij} · (p_obs - p_model(Z))²",
"Description": "관측 전이확률을 가장 잘 재현하는 Z를 WLS로 추정",
"Parameters": "가중치: 부도열 10배, 대각 5배"},
{"Step": "5. 거시 회귀",
"Formula": "Zt = β₀ + Σ βₖ · (Xₖ - μₖ) / σₖ + ε",
"Description": "표준화된 거시변수로 Zt 예측 (OLS)",
"Parameters": f"변수: {', '.join(best_vars)}"},
{"Step": "5a. 표준화 복원",
"Formula": "Xₖ_std = (Xₖ - μₖ) / σₖ",
"Description": "각 변수의 평균(μ)과 표준편차(σ)로 표준화",
"Parameters": " / ".join([f"{v}: μ={Xm[v]:.4f}, σ={Xs[v]:.4f}" for v in best_vars])},
{"Step": "6. 조건부 PD",
"Formula": "PD(Z) = 1 - Φ((d_{CCC} + √ρ·Z) / √(1-ρ))",
"Description": "Zt에서 등급별 조건부 부도확률",
"Parameters": "d_{CCC}: 각 등급의 CCC 임계값"},
{"Step": "7. Lifetime PD",
"Formula": "CUM_TM(T) = TM(Z₁) × TM(Z₂) × ... × TM(Zₜ)",
"Description": "순차 행렬곱으로 T년 누적 전이행렬 계산. D열 = 누적PD",
"Parameters": "시나리오별 Zt 경로 (50년)"},
{"Step": "8. ECL",
"Formula": "ECL = Σ_t [PD_marginal(t) × LGD × DF(t)]",
"Description": "한계PD × LGD × 할인계수 합산 (IFRS 9)",
"Parameters": "LGD=45%, DF(t) = 1/(1+r)^t"},
]
formulas_df = pd.DataFrame(formulas)
formulas_df.to_excel(writer, sheet_name="FORMULAS", index=False)
print(f"\n ✅ 엑셀 생성 완료: {OUT_FILE}")
print(f" 10개 시트")
if __name__ == "__main__":
main()