""" 파이프라인 전단계 검증용 엑셀 생성 시트 구성: 1. RAW_TM — 3사 원본 전이행렬 (WR 포함) 2. WR_REMOVAL — WR 제거 과정 (수식 포함) 3. AVG_TM — 3사 평균 전이행렬 (연도별) 4. TTC_MATRIX — TTC 장기평균 전이행렬 5. THRESHOLDS — Φ⁻¹(누적확률) 임계값 6. ZT_ESTIMATION— 연도별 Zt 추정 (관측PD vs 모형PD) 7. MACRO_DATA — 31개 거시경제변수 원본 8. FEATURES — 파생변수 (DIFF/LAG1/PCT) 9. REGRESSION — 최적 회귀모형 상세 10. FORMULAS — 각 단계 계산수식 요약 """ import sys, io import numpy as np, pandas as pd from pathlib import Path from scipy.stats import norm if sys.stdout.encoding != 'utf-8': sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8', errors='replace') sys.path.insert(0, str(Path(__file__).parent.parent)) from data.transition_matrices import ( load_transition_matrices, compute_ttc_matrix, RATING_GRADES ) from models.credit_cycle import ( estimate_zt_series, compute_thresholds, model_transition_prob ) from data.ecos_fetcher import load_macro_data import statsmodels.api as sm from scipy import stats as sp_stats import warnings warnings.filterwarnings("ignore") BASE_DIR = Path(__file__).parent.parent OUT_FILE = BASE_DIR / "results" / "pipeline_verification.xlsx" GRADES = list(RATING_GRADES) N = len(GRADES) def main(): print("=" * 60) print(" 파이프라인 전단계 검증 엑셀 생성") print("=" * 60) # ============================================================ # 1. 전이행렬 로딩 # ============================================================ print("\n [1/7] 전이행렬 로딩...") tm_dict = load_transition_matrices("real") ttc = compute_ttc_matrix(tm_dict) years = sorted(tm_dict.keys()) print(f" {len(years)}년 ({years[0]}~{years[-1]})") # ============================================================ # 2. Zt 추정 # ============================================================ print(" [2/7] Zt 추정...") rho = 0.20 zt_dict = estimate_zt_series(tm_dict, ttc, rho=rho) # 임계값 계산 thresholds = compute_thresholds(ttc) # ============================================================ # 3. 거시경제변수 # ============================================================ print(" [3/7] 거시경제변수 로딩...") macro_raw = load_macro_data(2000, 2025) # ============================================================ # 엑셀 생성 # ============================================================ print(" [4/7] 엑셀 생성 중...") (BASE_DIR / "results").mkdir(exist_ok=True) with pd.ExcelWriter(OUT_FILE, engine="openpyxl") as writer: # -------------------------------------------------------- # Sheet 1: TTC_MATRIX # -------------------------------------------------------- ttc_df = pd.DataFrame(ttc, index=GRADES, columns=GRADES) ttc_df.to_excel(writer, sheet_name="TTC_MATRIX") # -------------------------------------------------------- # Sheet 2: THRESHOLDS # -------------------------------------------------------- # d_{i,j} = Φ⁻¹(Σ_{k≤j} p̄_{i,k}) thresh_df = pd.DataFrame(thresholds, index=GRADES, columns=GRADES) thresh_df.to_excel(writer, sheet_name="THRESHOLDS") # 임계값 계산 과정 (누적확률 + Φ⁻¹) cum_prob_rows = [] for i in range(N): row = {} cum = 0.0 for j in range(N): cum += ttc[i, j] cum_clipped = np.clip(cum, 1e-10, 1.0 - 1e-10) row[f"CumProb_{GRADES[j]}"] = round(cum, 8) row[f"Φ⁻¹_{GRADES[j]}"] = round(norm.ppf(cum_clipped), 6) cum_prob_rows.append(row) cum_df = pd.DataFrame(cum_prob_rows, index=GRADES) cum_df.to_excel(writer, sheet_name="THRESHOLD_DETAIL") # -------------------------------------------------------- # Sheet 3: 연도별 AVG 전이행렬 + PD # -------------------------------------------------------- avg_rows = [] for yr in years: mat = tm_dict[yr] for i in range(N - 1): # D행 제외 row = {"Year": yr, "From": GRADES[i]} for j in range(N): row[f"To_{GRADES[j]}"] = round(mat[i, j], 6) row["PD"] = round(mat[i, -1], 6) # D열 avg_rows.append(row) avg_df = pd.DataFrame(avg_rows) avg_df.to_excel(writer, sheet_name="YEARLY_TM", index=False) # -------------------------------------------------------- # Sheet 4: Zt 추정 상세 # -------------------------------------------------------- zt_rows = [] sqrt_rho = np.sqrt(rho) sqrt_1_rho = np.sqrt(1.0 - rho) for yr in years: z = zt_dict.get(yr, np.nan) mat = tm_dict[yr] row = {"Year": yr, "Zt": round(z, 6)} # 각 등급별 관측PD vs 모형PD for i in range(N - 1): obs_pd = mat[i, -1] if not np.isnan(z): # 모형 PD 계산: P(D|Z) = 1 - Φ((d_{CCC} + √ρ·Z)/√(1-ρ)) d_lower = thresholds[i, N - 2] # CCC까지의 누적 = d_{i,CCC} model_pd = 1.0 - norm.cdf((d_lower + sqrt_rho * z) / sqrt_1_rho) else: model_pd = np.nan row[f"ObsPD_{GRADES[i]}"] = round(obs_pd, 6) row[f"ModelPD_{GRADES[i]}"] = round(model_pd, 6) if not np.isnan(model_pd) else "" row[f"Error_{GRADES[i]}"] = round(obs_pd - model_pd, 6) if not np.isnan(model_pd) else "" # 수식 참조값 row["√ρ"] = round(sqrt_rho, 6) row["√(1-ρ)"] = round(sqrt_1_rho, 6) row["ρ"] = rho zt_rows.append(row) zt_df = pd.DataFrame(zt_rows) zt_df.to_excel(writer, sheet_name="ZT_ESTIMATION", index=False) # -------------------------------------------------------- # Sheet 5: Zt 조건부 전이확률 수식 상세 (예시 2년) # -------------------------------------------------------- sample_years = [1998, 2006, 2008, 2020, 2025] cond_rows = [] for yr in sample_years: if yr not in zt_dict: continue z = zt_dict[yr] for i in range(N - 1): for j in range(N): d_upper = thresholds[i, j] upper_arg = (d_upper + sqrt_rho * z) / sqrt_1_rho upper_val = norm.cdf(upper_arg) if j == 0: lower_val = 0.0 lower_arg = -np.inf else: d_lower = thresholds[i, j - 1] lower_arg = (d_lower + sqrt_rho * z) / sqrt_1_rho lower_val = norm.cdf(lower_arg) prob = max(upper_val - lower_val, 0.0) obs_prob = tm_dict[yr][i, j] if yr in tm_dict else np.nan cond_rows.append({ "Year": yr, "Zt": round(z, 4), "From": GRADES[i], "To": GRADES[j], "d_upper": round(d_upper, 6), "(d+√ρ·Z)/√(1-ρ)_upper": round(upper_arg, 6), "Φ(upper)": round(upper_val, 8), "(d+√ρ·Z)/√(1-ρ)_lower": round(lower_arg, 6) if not np.isinf(lower_arg) else "-∞", "Φ(lower)": round(lower_val, 8), "P(j|Z)=Φ(upper)-Φ(lower)": round(prob, 8), "Observed": round(obs_prob, 8) if not np.isnan(obs_prob) else "", }) cond_df = pd.DataFrame(cond_rows) cond_df.to_excel(writer, sheet_name="COND_TM_DETAIL", index=False) # -------------------------------------------------------- # Sheet 6: 거시경제변수 원본 # -------------------------------------------------------- macro_raw.to_excel(writer, sheet_name="MACRO_RAW") # -------------------------------------------------------- # Sheet 7: 파생변수 # -------------------------------------------------------- from data.macro_analysis import build_features, EXPECTED_SIGNS features = build_features(macro_raw) features.to_excel(writer, sheet_name="FEATURES") # -------------------------------------------------------- # Sheet 8: 상관분석 # -------------------------------------------------------- zt_series = pd.Series(zt_dict, name="Zt") zt_2000 = zt_series[(zt_series.index >= 2000) & (zt_series.index <= 2025)] common = sorted(set(zt_2000.index) & set(features.index)) corr_rows = [] for col in sorted(features.columns): s = features.loc[common, col].dropna() valid = s.index.intersection(zt_2000.index) if len(valid) < 12: continue r, p = sp_stats.pearsonr(zt_2000.loc[valid], s.loc[valid]) exp = EXPECTED_SIGNS.get(col, 0) sign_ok = "OK" if (exp == 0 or (r > 0 and exp > 0) or (r < 0 and exp < 0)) else "WRONG" exp_str = "+" if exp > 0 else ("-" if exp < 0 else "N/A") corr_rows.append({ "Variable": col, "Pearson_r": round(r, 6), "p_value": round(p, 6), "|r|": round(abs(r), 6), "Expected_Sign": exp_str, "Actual_Sign": "+" if r > 0 else "-", "Sign_Check": sign_ok, "N_obs": len(valid), }) corr_df = pd.DataFrame(corr_rows).sort_values("|r|", ascending=False) corr_df.to_excel(writer, sheet_name="CORRELATION", index=False) # -------------------------------------------------------- # Sheet 9: 최적 회귀모형 상세 # -------------------------------------------------------- best_vars = ["CREDIT_SPREAD_LAG1", "USDKRW", "HOUSING_PRICE"] X_df = features.loc[common, best_vars].dropna() valid_idx = X_df.index.intersection(zt_2000.index) y = zt_2000.loc[valid_idx] X_raw = X_df.loc[valid_idx] Xm, Xs = X_raw.mean(), X_raw.std() Xs[Xs < 1e-10] = 1 Xn = (X_raw - Xm) / Xs model = sm.OLS(y.values, sm.add_constant(Xn.values)).fit() # 회귀 데이터 reg_data = pd.DataFrame({ "Year": valid_idx, "Zt_actual": y.values.round(6), }) for i, v in enumerate(best_vars): reg_data[f"{v}_raw"] = X_raw[v].values.round(4) reg_data[f"{v}_std"] = Xn[v].values.round(6) reg_data["Zt_predicted"] = model.fittedvalues.round(6) reg_data["Residual"] = model.resid.round(6) reg_data.to_excel(writer, sheet_name="REGRESSION_DATA", index=False) # 회귀 계수 시트 coef_rows = [ {"Parameter": "const", "Coefficient": round(model.params[0], 6), "Std_Error": round(model.bse[0], 6), "t_stat": round(model.tvalues[0], 4), "p_value": round(model.pvalues[0], 6), "Note": "절편 (표준화)"}, ] for i, v in enumerate(best_vars): coef_rows.append({ "Parameter": v, "Coefficient": round(model.params[i + 1], 6), "Std_Error": round(model.bse[i + 1], 6), "t_stat": round(model.tvalues[i + 1], 4), "p_value": round(model.pvalues[i + 1], 6), "Mean": round(Xm[v], 4), "Std": round(Xs[v], 4), "Note": f"Zt = β₀ + β₁·(X₁-μ₁)/σ₁ + β₂·(X₂-μ₂)/σ₂ + β₃·(X₃-μ₃)/σ₃", }) stats_rows = [ {"Statistic": "R²", "Value": round(model.rsquared, 6)}, {"Statistic": "Adj. R²", "Value": round(model.rsquared_adj, 6)}, {"Statistic": "F-statistic", "Value": round(model.fvalue, 4)}, {"Statistic": "F p-value", "Value": f"{model.f_pvalue:.6e}"}, {"Statistic": "AIC", "Value": round(model.aic, 2)}, {"Statistic": "BIC", "Value": round(model.bic, 2)}, {"Statistic": "Durbin-Watson", "Value": round(sm.stats.durbin_watson(model.resid), 4)}, {"Statistic": "N_obs", "Value": model.nobs}, ] coef_df = pd.DataFrame(coef_rows) stats_df = pd.DataFrame(stats_rows) # 두 테이블을 한 시트에 coef_df.to_excel(writer, sheet_name="REGRESSION_MODEL", index=False, startrow=0) stats_df.to_excel(writer, sheet_name="REGRESSION_MODEL", index=False, startrow=len(coef_df) + 3) # -------------------------------------------------------- # Sheet 10: 수식 요약 # -------------------------------------------------------- formulas = [ {"Step": "1. 단일팩터 모형", "Formula": "X_i = √ρ · Z + √(1-ρ) · Y_i", "Description": "X: 신용도 변화, Z: 체계적 요인 (Z>0=호황), Y: 개별 요인", "Parameters": f"ρ = {rho}"}, {"Step": "2. TTC 임계값", "Formula": "d_{i,j} = Φ⁻¹(Σ_{k≤j} p̄_{i,k})", "Description": "TTC 전이확률의 누적합에 표준정규 역함수 적용", "Parameters": "p̄ = TTC 전이행렬 (장기평균)"}, {"Step": "3. 조건부 전이확률", "Formula": "P(j|Z) = Φ((d_{i,j} + √ρ·Z) / √(1-ρ)) - Φ((d_{i,j-1} + √ρ·Z) / √(1-ρ))", "Description": "Z 조건하에서 등급 i→j 전이확률. Z>0이면 상향 전이 증가, PD 감소", "Parameters": f"√ρ = {sqrt_rho:.6f}, √(1-ρ) = {sqrt_1_rho:.6f}"}, {"Step": "4. Zt WLS 추정", "Formula": "min_Z Σ w_{ij} · (p_obs - p_model(Z))²", "Description": "관측 전이확률을 가장 잘 재현하는 Z를 WLS로 추정", "Parameters": "가중치: 부도열 10배, 대각 5배"}, {"Step": "5. 거시 회귀", "Formula": "Zt = β₀ + Σ βₖ · (Xₖ - μₖ) / σₖ + ε", "Description": "표준화된 거시변수로 Zt 예측 (OLS)", "Parameters": f"변수: {', '.join(best_vars)}"}, {"Step": "5a. 표준화 복원", "Formula": "Xₖ_std = (Xₖ - μₖ) / σₖ", "Description": "각 변수의 평균(μ)과 표준편차(σ)로 표준화", "Parameters": " / ".join([f"{v}: μ={Xm[v]:.4f}, σ={Xs[v]:.4f}" for v in best_vars])}, {"Step": "6. 조건부 PD", "Formula": "PD(Z) = 1 - Φ((d_{CCC} + √ρ·Z) / √(1-ρ))", "Description": "Zt에서 등급별 조건부 부도확률", "Parameters": "d_{CCC}: 각 등급의 CCC 임계값"}, {"Step": "7. Lifetime PD", "Formula": "CUM_TM(T) = TM(Z₁) × TM(Z₂) × ... × TM(Zₜ)", "Description": "순차 행렬곱으로 T년 누적 전이행렬 계산. D열 = 누적PD", "Parameters": "시나리오별 Zt 경로 (50년)"}, {"Step": "8. ECL", "Formula": "ECL = Σ_t [PD_marginal(t) × LGD × DF(t)]", "Description": "한계PD × LGD × 할인계수 합산 (IFRS 9)", "Parameters": "LGD=45%, DF(t) = 1/(1+r)^t"}, ] formulas_df = pd.DataFrame(formulas) formulas_df.to_excel(writer, sheet_name="FORMULAS", index=False) print(f"\n ✅ 엑셀 생성 완료: {OUT_FILE}") print(f" 10개 시트") if __name__ == "__main__": main()