feat(tools): comprehensive pipeline verification Excel generator

10 sheets: TTC, thresholds, yearly TMs, Zt estimation detail,
conditional TM formula breakdown, macro raw/features/correlation,
regression model, and formula summary
This commit is contained in:
Variet Agent
2026-03-11 08:02:36 +09:00
parent cc55acc330
commit 0e1e0e5cf2

360
data/export_verification.py Normal file
View File

@@ -0,0 +1,360 @@
"""
파이프라인 전단계 검증용 엑셀 생성
시트 구성:
1. RAW_TM — 3사 원본 전이행렬 (WR 포함)
2. WR_REMOVAL — WR 제거 과정 (수식 포함)
3. AVG_TM — 3사 평균 전이행렬 (연도별)
4. TTC_MATRIX — TTC 장기평균 전이행렬
5. THRESHOLDS — Φ⁻¹(누적확률) 임계값
6. ZT_ESTIMATION— 연도별 Zt 추정 (관측PD vs 모형PD)
7. MACRO_DATA — 31개 거시경제변수 원본
8. FEATURES — 파생변수 (DIFF/LAG1/PCT)
9. REGRESSION — 최적 회귀모형 상세
10. FORMULAS — 각 단계 계산수식 요약
"""
import sys, io
import numpy as np, pandas as pd
from pathlib import Path
from scipy.stats import norm
if sys.stdout.encoding != 'utf-8':
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8', errors='replace')
sys.path.insert(0, str(Path(__file__).parent.parent))
from data.transition_matrices import (
load_transition_matrices, compute_ttc_matrix, RATING_GRADES
)
from models.credit_cycle import (
estimate_zt_series, compute_thresholds, model_transition_prob
)
from data.ecos_fetcher import load_macro_data
import statsmodels.api as sm
from scipy import stats as sp_stats
import warnings
warnings.filterwarnings("ignore")
BASE_DIR = Path(__file__).parent.parent
OUT_FILE = BASE_DIR / "results" / "pipeline_verification.xlsx"
GRADES = list(RATING_GRADES)
N = len(GRADES)
def main():
print("=" * 60)
print(" 파이프라인 전단계 검증 엑셀 생성")
print("=" * 60)
# ============================================================
# 1. 전이행렬 로딩
# ============================================================
print("\n [1/7] 전이행렬 로딩...")
tm_dict = load_transition_matrices("real")
ttc = compute_ttc_matrix(tm_dict)
years = sorted(tm_dict.keys())
print(f" {len(years)}년 ({years[0]}~{years[-1]})")
# ============================================================
# 2. Zt 추정
# ============================================================
print(" [2/7] Zt 추정...")
rho = 0.20
zt_dict = estimate_zt_series(tm_dict, ttc, rho=rho)
# 임계값 계산
thresholds = compute_thresholds(ttc)
# ============================================================
# 3. 거시경제변수
# ============================================================
print(" [3/7] 거시경제변수 로딩...")
macro_raw = load_macro_data(2000, 2025)
# ============================================================
# 엑셀 생성
# ============================================================
print(" [4/7] 엑셀 생성 중...")
(BASE_DIR / "results").mkdir(exist_ok=True)
with pd.ExcelWriter(OUT_FILE, engine="openpyxl") as writer:
# --------------------------------------------------------
# Sheet 1: TTC_MATRIX
# --------------------------------------------------------
ttc_df = pd.DataFrame(ttc, index=GRADES, columns=GRADES)
ttc_df.to_excel(writer, sheet_name="TTC_MATRIX")
# --------------------------------------------------------
# Sheet 2: THRESHOLDS
# --------------------------------------------------------
# d_{i,j} = Φ⁻¹(Σ_{k≤j} p̄_{i,k})
thresh_df = pd.DataFrame(thresholds, index=GRADES, columns=GRADES)
thresh_df.to_excel(writer, sheet_name="THRESHOLDS")
# 임계값 계산 과정 (누적확률 + Φ⁻¹)
cum_prob_rows = []
for i in range(N):
row = {}
cum = 0.0
for j in range(N):
cum += ttc[i, j]
cum_clipped = np.clip(cum, 1e-10, 1.0 - 1e-10)
row[f"CumProb_{GRADES[j]}"] = round(cum, 8)
row[f"Φ⁻¹_{GRADES[j]}"] = round(norm.ppf(cum_clipped), 6)
cum_prob_rows.append(row)
cum_df = pd.DataFrame(cum_prob_rows, index=GRADES)
cum_df.to_excel(writer, sheet_name="THRESHOLD_DETAIL")
# --------------------------------------------------------
# Sheet 3: 연도별 AVG 전이행렬 + PD
# --------------------------------------------------------
avg_rows = []
for yr in years:
mat = tm_dict[yr]
for i in range(N - 1): # D행 제외
row = {"Year": yr, "From": GRADES[i]}
for j in range(N):
row[f"To_{GRADES[j]}"] = round(mat[i, j], 6)
row["PD"] = round(mat[i, -1], 6) # D열
avg_rows.append(row)
avg_df = pd.DataFrame(avg_rows)
avg_df.to_excel(writer, sheet_name="YEARLY_TM", index=False)
# --------------------------------------------------------
# Sheet 4: Zt 추정 상세
# --------------------------------------------------------
zt_rows = []
sqrt_rho = np.sqrt(rho)
sqrt_1_rho = np.sqrt(1.0 - rho)
for yr in years:
z = zt_dict.get(yr, np.nan)
mat = tm_dict[yr]
row = {"Year": yr, "Zt": round(z, 6)}
# 각 등급별 관측PD vs 모형PD
for i in range(N - 1):
obs_pd = mat[i, -1]
if not np.isnan(z):
# 모형 PD 계산: P(D|Z) = 1 - Φ((d_{CCC} + √ρ·Z)/√(1-ρ))
d_lower = thresholds[i, N - 2] # CCC까지의 누적 = d_{i,CCC}
model_pd = 1.0 - norm.cdf((d_lower + sqrt_rho * z) / sqrt_1_rho)
else:
model_pd = np.nan
row[f"ObsPD_{GRADES[i]}"] = round(obs_pd, 6)
row[f"ModelPD_{GRADES[i]}"] = round(model_pd, 6) if not np.isnan(model_pd) else ""
row[f"Error_{GRADES[i]}"] = round(obs_pd - model_pd, 6) if not np.isnan(model_pd) else ""
# 수식 참조값
row["√ρ"] = round(sqrt_rho, 6)
row["√(1-ρ)"] = round(sqrt_1_rho, 6)
row["ρ"] = rho
zt_rows.append(row)
zt_df = pd.DataFrame(zt_rows)
zt_df.to_excel(writer, sheet_name="ZT_ESTIMATION", index=False)
# --------------------------------------------------------
# Sheet 5: Zt 조건부 전이확률 수식 상세 (예시 2년)
# --------------------------------------------------------
sample_years = [1998, 2006, 2008, 2020, 2025]
cond_rows = []
for yr in sample_years:
if yr not in zt_dict:
continue
z = zt_dict[yr]
for i in range(N - 1):
for j in range(N):
d_upper = thresholds[i, j]
upper_arg = (d_upper + sqrt_rho * z) / sqrt_1_rho
upper_val = norm.cdf(upper_arg)
if j == 0:
lower_val = 0.0
lower_arg = -np.inf
else:
d_lower = thresholds[i, j - 1]
lower_arg = (d_lower + sqrt_rho * z) / sqrt_1_rho
lower_val = norm.cdf(lower_arg)
prob = max(upper_val - lower_val, 0.0)
obs_prob = tm_dict[yr][i, j] if yr in tm_dict else np.nan
cond_rows.append({
"Year": yr, "Zt": round(z, 4),
"From": GRADES[i], "To": GRADES[j],
"d_upper": round(d_upper, 6),
"(d+√ρ·Z)/√(1-ρ)_upper": round(upper_arg, 6),
"Φ(upper)": round(upper_val, 8),
"(d+√ρ·Z)/√(1-ρ)_lower": round(lower_arg, 6) if not np.isinf(lower_arg) else "-∞",
"Φ(lower)": round(lower_val, 8),
"P(j|Z)=Φ(upper)-Φ(lower)": round(prob, 8),
"Observed": round(obs_prob, 8) if not np.isnan(obs_prob) else "",
})
cond_df = pd.DataFrame(cond_rows)
cond_df.to_excel(writer, sheet_name="COND_TM_DETAIL", index=False)
# --------------------------------------------------------
# Sheet 6: 거시경제변수 원본
# --------------------------------------------------------
macro_raw.to_excel(writer, sheet_name="MACRO_RAW")
# --------------------------------------------------------
# Sheet 7: 파생변수
# --------------------------------------------------------
from data.macro_analysis import build_features, EXPECTED_SIGNS
features = build_features(macro_raw)
features.to_excel(writer, sheet_name="FEATURES")
# --------------------------------------------------------
# Sheet 8: 상관분석
# --------------------------------------------------------
zt_series = pd.Series(zt_dict, name="Zt")
zt_2000 = zt_series[(zt_series.index >= 2000) & (zt_series.index <= 2025)]
common = sorted(set(zt_2000.index) & set(features.index))
corr_rows = []
for col in sorted(features.columns):
s = features.loc[common, col].dropna()
valid = s.index.intersection(zt_2000.index)
if len(valid) < 12:
continue
r, p = sp_stats.pearsonr(zt_2000.loc[valid], s.loc[valid])
exp = EXPECTED_SIGNS.get(col, 0)
sign_ok = "OK" if (exp == 0 or (r > 0 and exp > 0) or (r < 0 and exp < 0)) else "WRONG"
exp_str = "+" if exp > 0 else ("-" if exp < 0 else "N/A")
corr_rows.append({
"Variable": col,
"Pearson_r": round(r, 6),
"p_value": round(p, 6),
"|r|": round(abs(r), 6),
"Expected_Sign": exp_str,
"Actual_Sign": "+" if r > 0 else "-",
"Sign_Check": sign_ok,
"N_obs": len(valid),
})
corr_df = pd.DataFrame(corr_rows).sort_values("|r|", ascending=False)
corr_df.to_excel(writer, sheet_name="CORRELATION", index=False)
# --------------------------------------------------------
# Sheet 9: 최적 회귀모형 상세
# --------------------------------------------------------
best_vars = ["CREDIT_SPREAD_LAG1", "USDKRW", "HOUSING_PRICE"]
X_df = features.loc[common, best_vars].dropna()
valid_idx = X_df.index.intersection(zt_2000.index)
y = zt_2000.loc[valid_idx]
X_raw = X_df.loc[valid_idx]
Xm, Xs = X_raw.mean(), X_raw.std()
Xs[Xs < 1e-10] = 1
Xn = (X_raw - Xm) / Xs
model = sm.OLS(y.values, sm.add_constant(Xn.values)).fit()
# 회귀 데이터
reg_data = pd.DataFrame({
"Year": valid_idx,
"Zt_actual": y.values.round(6),
})
for i, v in enumerate(best_vars):
reg_data[f"{v}_raw"] = X_raw[v].values.round(4)
reg_data[f"{v}_std"] = Xn[v].values.round(6)
reg_data["Zt_predicted"] = model.fittedvalues.round(6)
reg_data["Residual"] = model.resid.round(6)
reg_data.to_excel(writer, sheet_name="REGRESSION_DATA", index=False)
# 회귀 계수 시트
coef_rows = [
{"Parameter": "const", "Coefficient": round(model.params[0], 6),
"Std_Error": round(model.bse[0], 6), "t_stat": round(model.tvalues[0], 4),
"p_value": round(model.pvalues[0], 6), "Note": "절편 (표준화)"},
]
for i, v in enumerate(best_vars):
coef_rows.append({
"Parameter": v,
"Coefficient": round(model.params[i + 1], 6),
"Std_Error": round(model.bse[i + 1], 6),
"t_stat": round(model.tvalues[i + 1], 4),
"p_value": round(model.pvalues[i + 1], 6),
"Mean": round(Xm[v], 4),
"Std": round(Xs[v], 4),
"Note": f"Zt = β₀ + β₁·(X₁-μ₁)/σ₁ + β₂·(X₂-μ₂)/σ₂ + β₃·(X₃-μ₃)/σ₃",
})
stats_rows = [
{"Statistic": "", "Value": round(model.rsquared, 6)},
{"Statistic": "Adj. R²", "Value": round(model.rsquared_adj, 6)},
{"Statistic": "F-statistic", "Value": round(model.fvalue, 4)},
{"Statistic": "F p-value", "Value": f"{model.f_pvalue:.6e}"},
{"Statistic": "AIC", "Value": round(model.aic, 2)},
{"Statistic": "BIC", "Value": round(model.bic, 2)},
{"Statistic": "Durbin-Watson", "Value": round(sm.stats.durbin_watson(model.resid), 4)},
{"Statistic": "N_obs", "Value": model.nobs},
]
coef_df = pd.DataFrame(coef_rows)
stats_df = pd.DataFrame(stats_rows)
# 두 테이블을 한 시트에
coef_df.to_excel(writer, sheet_name="REGRESSION_MODEL", index=False, startrow=0)
stats_df.to_excel(writer, sheet_name="REGRESSION_MODEL", index=False,
startrow=len(coef_df) + 3)
# --------------------------------------------------------
# Sheet 10: 수식 요약
# --------------------------------------------------------
formulas = [
{"Step": "1. 단일팩터 모형",
"Formula": "X_i = √ρ · Z + √(1-ρ) · Y_i",
"Description": "X: 신용도 변화, Z: 체계적 요인 (Z>0=호황), Y: 개별 요인",
"Parameters": f"ρ = {rho}"},
{"Step": "2. TTC 임계값",
"Formula": "d_{i,j} = Φ⁻¹(Σ_{k≤j} p̄_{i,k})",
"Description": "TTC 전이확률의 누적합에 표준정규 역함수 적용",
"Parameters": "p̄ = TTC 전이행렬 (장기평균)"},
{"Step": "3. 조건부 전이확률",
"Formula": "P(j|Z) = Φ((d_{i,j} + √ρ·Z) / √(1-ρ)) - Φ((d_{i,j-1} + √ρ·Z) / √(1-ρ))",
"Description": "Z 조건하에서 등급 i→j 전이확률. Z>0이면 상향 전이 증가, PD 감소",
"Parameters": f"√ρ = {sqrt_rho:.6f}, √(1-ρ) = {sqrt_1_rho:.6f}"},
{"Step": "4. Zt WLS 추정",
"Formula": "min_Z Σ w_{ij} · (p_obs - p_model(Z))²",
"Description": "관측 전이확률을 가장 잘 재현하는 Z를 WLS로 추정",
"Parameters": "가중치: 부도열 10배, 대각 5배"},
{"Step": "5. 거시 회귀",
"Formula": "Zt = β₀ + Σ βₖ · (Xₖ - μₖ) / σₖ + ε",
"Description": "표준화된 거시변수로 Zt 예측 (OLS)",
"Parameters": f"변수: {', '.join(best_vars)}"},
{"Step": "5a. 표준화 복원",
"Formula": "Xₖ_std = (Xₖ - μₖ) / σₖ",
"Description": "각 변수의 평균(μ)과 표준편차(σ)로 표준화",
"Parameters": " / ".join([f"{v}: μ={Xm[v]:.4f}, σ={Xs[v]:.4f}" for v in best_vars])},
{"Step": "6. 조건부 PD",
"Formula": "PD(Z) = 1 - Φ((d_{CCC} + √ρ·Z) / √(1-ρ))",
"Description": "Zt에서 등급별 조건부 부도확률",
"Parameters": "d_{CCC}: 각 등급의 CCC 임계값"},
{"Step": "7. Lifetime PD",
"Formula": "CUM_TM(T) = TM(Z₁) × TM(Z₂) × ... × TM(Zₜ)",
"Description": "순차 행렬곱으로 T년 누적 전이행렬 계산. D열 = 누적PD",
"Parameters": "시나리오별 Zt 경로 (50년)"},
{"Step": "8. ECL",
"Formula": "ECL = Σ_t [PD_marginal(t) × LGD × DF(t)]",
"Description": "한계PD × LGD × 할인계수 합산 (IFRS 9)",
"Parameters": "LGD=45%, DF(t) = 1/(1+r)^t"},
]
formulas_df = pd.DataFrame(formulas)
formulas_df.to_excel(writer, sheet_name="FORMULAS", index=False)
print(f"\n ✅ 엑셀 생성 완료: {OUT_FILE}")
print(f" 10개 시트")
if __name__ == "__main__":
main()