Files
LifetimePD/reports/generate_report.py
Variet Agent d1ddf06e5d feat(model): KAP YTM PD floor integration, expanded 226-var search, ADF fix (AIC->BIC), Model#2 with 6-test diagnostics
- Replace hardcoded DEFAULT_PD_FLOORS with build_complete_pd_floor_table() (KAP bond YTM)
- Fix ADF test: autolag='AIC' -> 'BIC' for small sample (N=26) robustness
- Expand variable search: 40 -> 226 vars (log/diff/return/lag2), 1.9M combos
- Select Model #2: HOUSING_PRICE + CREDIT_SPREAD_LAG1 + CURRENT_ACCOUNT_R
- Add 6-test diagnostics table to AR1 sheet (ADF/LB/DW/BP/ARCH/Shapiro)
- Add Korean variable names for transformed variables
- Generate report v7 with full diagnostics
2026-03-12 00:06:23 +09:00

693 lines
32 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Lifetime PD 분석 보고서 생성기 (Excel)
사용법:
python reports/generate_report.py
python reports/generate_report.py --config config.yaml --output results/report.xlsx
다중 시트 구성:
1. 요약 (Summary) — 모형 개요, 핵심 파라미터, 결론
2. 원시데이터_전이행렬 — 연도별 전이행렬, TTC 행렬
3. 원시데이터_거시변수 — ECOS 거시경제변수 시계열
4. Zt_추정 — Belkin & Suchower Zt 역산 결과
5. AR1_모형 — AR(1)+Macro 회귀 결과, 계수, 진단
6. 시나리오_Z경로 — 3 시나리오별 50년 Zt 경로
7. Lifetime_PD — 시나리오별 누적 PD term structure
8. 가중평균_PD — 확률가중 최종 PD
9. 검증결과 — 통계 검정 결과
"""
import sys, io, os, argparse, math
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
if sys.stdout.encoding != 'utf-8':
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8', errors='replace')
import numpy as np
import pandas as pd
import yaml
from datetime import datetime
from openpyxl import Workbook
from openpyxl.styles import Font, PatternFill, Alignment, Border, Side
from openpyxl.utils import get_column_letter
from data.transition_matrices import (
load_transition_matrices, compute_ttc_matrix,
RATING_GRADES, RATING_GRADES_8
)
from data.ccc_interpolator import expand_to_8x8
from data.macro_data import _fallback_macro_data, compute_derived_features
from data.ecos_fetcher import load_macro_data as load_ecos_macro
from models.credit_cycle import estimate_zt_series
from models.macro_model import build_macro_zt_model
from scenarios.scenario_engine import ScenarioEngine
from projection.lifetime_pd import LifetimePDEngine
from validation.statistical_tests import run_full_validation
# ================================================================
# 스타일 정의
# ================================================================
NAVY = "1F3864"
DARK_BLUE = "2B5797"
LIGHT_BLUE = "D6E4F0"
LIGHTER_BLUE = "EDF2F9"
WHITE = "FFFFFF"
BORDER_CLR = "B4C6E7"
TITLE_FONT = Font(name="맑은 고딕", size=16, bold=True, color=WHITE)
HEADER_FONT = Font(name="맑은 고딕", size=10, bold=True, color=WHITE)
SUBHEADER_FONT = Font(name="맑은 고딕", size=10, bold=True, color=NAVY)
BODY_FONT = Font(name="맑은 고딕", size=9)
BODY_BOLD = Font(name="맑은 고딕", size=9, bold=True)
SMALL_FONT = Font(name="맑은 고딕", size=8, color="666666")
NUM_FONT = Font(name="Consolas", size=9)
PASS_FONT = Font(name="맑은 고딕", size=9, bold=True, color="2E7D32")
FAIL_FONT = Font(name="맑은 고딕", size=9, bold=True, color="C62828")
TITLE_FILL = PatternFill("solid", fgColor=NAVY)
HEADER_FILL = PatternFill("solid", fgColor=DARK_BLUE)
SUBHEADER_FILL = PatternFill("solid", fgColor=LIGHT_BLUE)
ALT_FILL = PatternFill("solid", fgColor=LIGHTER_BLUE)
PASS_FILL = PatternFill("solid", fgColor="E2EFDA")
FAIL_FILL = PatternFill("solid", fgColor="FCE4EC")
THIN_BORDER = Border(
left=Side(style='thin', color=BORDER_CLR),
right=Side(style='thin', color=BORDER_CLR),
top=Side(style='thin', color=BORDER_CLR),
bottom=Side(style='thin', color=BORDER_CLR)
)
CENTER = Alignment(horizontal='center', vertical='center', wrap_text=True)
LEFT = Alignment(horizontal='left', vertical='center', wrap_text=True)
RIGHT = Alignment(horizontal='right', vertical='center')
NUM4 = '0.0000'
NUM2 = '0.00'
def _widths(ws, widths):
for i, w in enumerate(widths, 1):
ws.column_dimensions[get_column_letter(i)].width = w
def _title(ws, row, text, ncols=10):
ws.merge_cells(start_row=row, start_column=1, end_row=row, end_column=ncols)
c = ws.cell(row=row, column=1, value=text)
c.font = TITLE_FONT; c.fill = TITLE_FILL; c.alignment = LEFT
ws.row_dimensions[row].height = 35
def _section(ws, row, text, ncols=10):
ws.merge_cells(start_row=row, start_column=1, end_row=row, end_column=ncols)
c = ws.cell(row=row, column=1, value=text)
c.font = SUBHEADER_FONT; c.fill = SUBHEADER_FILL; c.alignment = LEFT
ws.row_dimensions[row].height = 22
return row + 1
def _headers(ws, row, hdrs):
for j, h in enumerate(hdrs, 1):
c = ws.cell(row=row, column=j, value=h)
c.font = HEADER_FONT; c.fill = HEADER_FILL; c.alignment = CENTER; c.border = THIN_BORDER
return row + 1
def _row(ws, row, vals, alt=False, fmt=None):
fill = ALT_FILL if alt else PatternFill()
for j, v in enumerate(vals, 1):
c = ws.cell(row=row, column=j, value=v)
c.font = NUM_FONT if isinstance(v, (int, float, np.floating, np.integer)) else BODY_FONT
c.fill = fill; c.border = THIN_BORDER
c.alignment = RIGHT if isinstance(v, (int, float, np.floating, np.integer)) else LEFT
if fmt and isinstance(v, (float, np.floating)):
c.number_format = fmt
return row + 1
def _kv(ws, row, key, value, col=2, fmt=None):
ws.cell(row=row, column=col, value=key).font = BODY_BOLD
cell = ws.cell(row=row, column=col+1, value=value)
cell.font = NUM_FONT if isinstance(value, (int, float, np.floating)) else BODY_FONT
if fmt and isinstance(value, (float, np.floating)):
cell.number_format = fmt
return row + 1
# ================================================================
# 시트 생성
# ================================================================
def sheet_summary(wb, config, model, zt_dict, diag, z_paths, val_df, pd_engine, pd_results, grades):
ws = wb.active; ws.title = "요약"
_widths(ws, [3,25,18,18,18,18,12,12,12,12])
r = 1; _title(ws, r, " Lifetime PD 분석 보고서", 10)
r = 2; ws.cell(row=r, column=2, value=f"생성일시: {datetime.now().strftime('%Y-%m-%d %H:%M')}").font = SMALL_FONT
r = 4
# 1. 모형 개요
r = _section(ws, r, " 1. 모형 개요", 10)
r = _kv(ws, r, "모형 구조", "Z(t) = c + φ·Z(t-1) + β₁·X₁_std + β₂·X₂_std + β₃·X₃_std + ε")
r = _kv(ws, r, "모형 유형", "AR(1) + Macro (Vasicek Single-Factor)")
r = _kv(ws, r, "적용 기준", "IFRS 9 (2018, 2024 개정)")
r = _kv(ws, r, "변수 선택", ", ".join(model.selected_vars))
r += 1
# 2. AR(1) 파라미터
r = _section(ws, r, " 2. AR(1) 모형 파라미터", 10)
r = _kv(ws, r, "자기회귀 계수 (φ)", model.ar1_phi, fmt=NUM4)
phi = model.ar1_phi
hl = math.log(2)/abs(math.log(abs(phi))) if 0<abs(phi)<1 else float('inf')
r = _kv(ws, r, "반감기", f"{hl:.1f}")
r = _kv(ws, r, "절편 (c)", model.ar1_const, fmt=NUM4)
for var, beta in model.ar1_beta.items():
r = _kv(ws, r, f"β({var})", beta, fmt=NUM4)
r = _kv(ws, r, "잔차 σ", model.ar1_sigma_eps, fmt=NUM4)
lr = model.ar1_const / (1 - model.ar1_phi) if abs(model.ar1_phi) < 1 else 0
r = _kv(ws, r, "장기 균형 Z", lr, fmt=NUM4)
r += 1
# 3. 적합도
r = _section(ws, r, " 3. 모형 적합도", 10)
r = _kv(ws, r, "", diag.get("r_squared",0), fmt=NUM4)
r = _kv(ws, r, "Adj. R²", diag.get("adj_r_squared",0), fmt=NUM4)
r = _kv(ws, r, "F p-value", diag.get("f_pvalue",0), fmt=NUM4)
r = _kv(ws, r, "AIC", diag.get("aic",0), fmt=NUM2)
r = _kv(ws, r, "DW", diag.get("durbin_watson",0), fmt=NUM4)
r += 1
# 4. 시나리오
r = _section(ws, r, " 4. 시나리오 설정", 10)
hdrs = ["", "시나리오", "가중치"]
for v in model.selected_vars:
hdrs.append(f"{v} (σ)")
hdrs.append("Z(t+1)")
r = _headers(ws, r, hdrs)
for sname, scfg in config.get("scenarios", {}).items():
vals = [None, scfg.get("name", sname), scfg.get("weight", 0)]
for v in model.selected_vars:
vals.append(scfg.get("macro_shocks", {}).get(v, 0))
z1 = z_paths.get(sname, [0])[0] if z_paths else 0
vals.append(float(z1))
r = _row(ws, r, vals, alt=(sname=="base"), fmt=NUM4)
r += 1
# 5. 1년차 가중 PD
r = _section(ws, r, " 5. 1년차 확률가중 PD (%)", 10)
r = _headers(ws, r, ["", ""] + list(grades[:-1]))
by_sc = pd_results.get("by_scenario", pd_results)
wcpd = pd_results.get("weighted_cumulative_pd", None)
if wcpd is not None and wcpd.shape[0] > 0:
wpd = wcpd[0, :len(grades)-1] * 100
else:
wpd = np.zeros(len(grades)-1)
vals = [None, "가중PD(1Y)"] + list(wpd)
r = _row(ws, r, vals, fmt=NUM4)
def sheet_tm(wb, tm_raw, tm_floor, ttc, pd_floors, config):
ws = wb.create_sheet("원시데이터_전이행렬")
grades = config.get("model",{}).get("rating_grades", RATING_GRADES)
ng = len(grades)
_widths(ws, [3,12]+[12]*ng)
nc = 2+ng; r=1
_title(ws, r, " 전이행렬 파이프라인: Original → PD Floor → TTC", nc)
r=3
# KAP 채권 YTM 기반 PD Floor 산출 과정
from data.ytm_fetcher import get_ytm_data, compute_spreads, compute_broad_grade_spreads
from data.pd_floor import compute_market_implied_pd
ytm_data = get_ytm_data()
notch_spreads = compute_spreads(ytm_data)
broad_spreads = compute_broad_grade_spreads(notch_spreads)
lgd = 0.60
rf = ytm_data.get('rf', 0)
r = _section(ws, r, " KAP 채권 YTM → 신용스프레드 → 시장내재 PD (Floor 산출 근거)", nc)
hdr_ytm = ["", "등급", "KAP YTM(%)", "스프레드(bp)", "내재PD(bp)", "Basel III(bp)", "적용Floor(bp)"]
while len(hdr_ytm) < 2 + ng:
hdr_ytm.append("")
r = _headers(ws, r, hdr_ytm[:2 + ng])
floor_grades = ["AAA", "AA", "A", "BBB", "BB", "B"]
for fg in floor_grades:
ytm_val = None
for notch in [fg, fg + '+', fg + '-']:
if notch in ytm_data:
ytm_val = ytm_data[notch]
break
if ytm_val is None:
ytm_val = rf
sp = broad_spreads.get(fg, 0)
implied_pd = compute_market_implied_pd(sp, lgd) * 10000
applied = pd_floors.get(fg, 0) * 10000
v = [None, fg, ytm_val, sp, implied_pd, 5, applied]
while len(v) < 2 + ng:
v.append(None)
r = _row(ws, r, v[:2 + ng], fmt=NUM2)
ws.cell(row=r, column=2,
value=f"기준일: 2025-12-31, 국고1Y: {rf}%, LGD: 60%, 출처: KAP(한국자산평가)").font = SMALL_FONT
r += 1
ws.cell(row=r, column=2,
value="산식: Implied PD = 1 - exp(-spread_bp / (LGD×10000)), Floor = max(Implied PD, Basel 5bp)").font = SMALL_FONT
r += 2
# TTC 전이행렬
r = _section(ws, r, f" TTC 전이행렬 (PD Floor 적용 후, {min(tm_floor.keys())}~{max(tm_floor.keys())} 평균)", nc)
r = _headers(ws, r, ["","From\\To"]+grades)
for i,g in enumerate(grades):
vals = [None,g]+[ttc[i,j] for j in range(min(ng,ttc.shape[1]))]
r = _row(ws, r, vals, alt=i%2==1, fmt=NUM4)
r += 1
# 전체 연도별 전이행렬 (Floor 적용 후)
for year in sorted(tm_floor.keys()):
r = _section(ws, r, f" {year}년 전이행렬 (PD Floor 적용 후)", nc)
r = _headers(ws, r, ["","From\\To"]+grades)
mat = tm_floor[year]
for i,g in enumerate(grades):
if i < mat.shape[0]:
vals = [None,g]+[mat[i,j] for j in range(min(ng,mat.shape[1]))]
r = _row(ws, r, vals, alt=i%2==1, fmt=NUM4)
r += 1
# 영문→한글 변수명 매핑
VAR_KOR = {
"GDP_GROWTH": "GDP성장률(%)", "IPI": "광공업생산지수", "SPI": "서비스업생산지수",
"MANUF_CAPACITY": "제조업가동률", "GFCF_GROWTH": "총고정자본증감률",
"CONSTR_INVEST": "건설투자증감률", "FACILITY_INVEST": "설비투자지수",
"RETAIL_SALES": "소매판매액지수", "CSI": "소비자심리지수", "BSI_MANUF": "제조업BSI",
"LEADING_INDEX": "경기선행지수", "COINCIDENT": "경기동행지수",
"EXPORT": "수출(백만달러)", "IMPORT_AMT": "수입(백만달러)",
"TRADE_GNI": "수출입/GNI(%)", "KOSPI": "KOSPI지수",
"INVEST_RATE": "국내총투자율(%)", "SAVING_RATE": "총저축률(%)",
"HOUSING_PRICE": "주택매매가격지수",
"UNEMPLOYMENT": "실업률(%)", "EMPLOYMENT": "고용률(%)",
"EMPLOYED": "취업자수(만명)", "EMPLOYMENT_RATE": "고용률(%)",
"BASE_RATE": "기준금리(%)", "CD_RATE": "CD91일(%)",
"GOVT_3Y": "국고3Y(%)", "GOVT_10Y": "국고10Y(%)",
"CORP_AA": "회사체AA-(%)", "CORP_BBB": "회사체BBB-(%)",
"CPI_GROWTH": "소비자물가상승률(%)", "IMPORT_PRICE": "수입물가지수",
"PPI": "생산자물가지수", "USDKRW": "원/달러환율",
"M2": "M2광의통화(조원)", "DISHONOR_RATE": "어음부도율(%)",
"DISHONOR_AMT": "부도금액(억원)", "HOUSEHOLD_DEBT": "가계부채(조원)",
"CONSTRUCTION": "건설수주액(억원)", "CONSTRUCTION_DONE": "건설기성액",
"CREDIT_SPREAD": "신용스프레드(BBB-AA)", "TERM_SPREAD": "기간스프레드(10Y-3Y)",
"CREDIT_SPREAD_LAG1": "신용스프레드(t-1)",
"EXPORT_DIFF": "수출증감액", "IPI_LAG1": "광공업생산(t-1)",
"CONSTR_INVEST_GR": "건설투자증가율", "CURRENT_ACCOUNT": "경상수지",
}
# 변환 변수 한글명 자동 생성
TRANSFORM_SUFFIX = {"_LAG2": "(t-2)", "_L": "(log)", "_D": "(차분)",
"_R": "(수익률)", "_LR": "(log수익률)"}
def _kor(varname):
if varname in VAR_KOR:
return VAR_KOR[varname]
for sfx, label in TRANSFORM_SUFFIX.items():
if varname.endswith(sfx):
base = varname[:-len(sfx)]
base_kor = VAR_KOR.get(base, base)
return f"{base_kor}{label}"
return varname
def sheet_macro(wb, macro_data, forced_vars):
ws = wb.create_sheet("원시데이터_거시변수")
display_cols = list(forced_vars) + [c for c in macro_data.columns if c not in forced_vars]
display_cols = [c for c in display_cols if c in macro_data.columns]
_widths(ws, [3,8]+[14]*len(display_cols))
nc = 2+len(display_cols); r=1
_title(ws, r, " 원시 데이터: 거시경제변수", nc)
r=3
r = _section(ws, r, f" ★ 선택 변수: {', '.join([_kor(v) for v in forced_vars])}", nc)
r = _headers(ws, r, ["","연도"]+[_kor(c) for c in display_cols])
for i,(year,rd) in enumerate(macro_data.iterrows()):
vals = [None,int(year)]+[rd[c] if c in rd and pd.notna(rd[c]) else None for c in display_cols]
r = _row(ws, r, vals, alt=i%2==1, fmt=NUM2)
def sheet_zt(wb, zt_dict, macro_data, forced_vars, rho):
ws = wb.create_sheet("Zt_추정")
ncols = 3+len(forced_vars)
_widths(ws, [3,10,14]+[14]*len(forced_vars))
r=1; _title(ws, r, " Zt 추정 (Belkin & Suchower 1998)", ncols)
r=3
r = _section(ws, r, " 방법론: 관측 전이행렬 역산 → WLS → Zt", ncols)
zv = np.array(list(zt_dict.values()))
r = _kv(ws, r, "자산상관계수 (ρ)", rho, fmt=NUM4)
r += 1
# ρ 근거
r = _section(ws, r, " ρ = 0.20 근거", ncols)
ws.cell(row=r, column=2, value="[1] Basel III IRB: 기업 ρ = 0.12~0.24 (CRE31.6)").font = SMALL_FONT; r+=1
ws.cell(row=r, column=2, value=" R = 0.12×(1-e^(-50×PD))/(1-e^(-50)) + 0.24×(1-(1-e^(-50×PD))/(1-e^(-50)))").font = SMALL_FONT; r+=1
ws.cell(row=r, column=2, value="[2] BBB(PD≈0.2%) → R=0.208, A(PD≈0.07%) → R=0.217").font = SMALL_FONT; r+=1
ws.cell(row=r, column=2, value="[3] 한국 기업 포트폴리오 평균: ρ ≈ 0.20 (투자/투기 혼합)").font = SMALL_FONT; r+=1
ws.cell(row=r, column=2, value="[4] Moody's Analytics CreditEdge: single-factor ρ ≈ 0.15~0.25").font = SMALL_FONT; r+=1
r += 1
r = _kv(ws, r, "Zt 평균 (μ)", float(zv.mean()), fmt=NUM4)
r = _kv(ws, r, "Zt 표준편차 (σ)", float(zv.std()), fmt=NUM4)
r = _kv(ws, r, "관측 기간", f"{min(zt_dict.keys())}~{max(zt_dict.keys())} ({len(zt_dict)}개년)")
r += 1
hdrs = ["","연도","Zt"]+forced_vars
r = _headers(ws, r, hdrs)
for i,(year,zt) in enumerate(sorted(zt_dict.items())):
vals = [None,int(year),float(zt)]
for v in forced_vars:
if v in macro_data.columns and year in macro_data.index:
vals.append(macro_data.loc[year,v] if pd.notna(macro_data.loc[year,v]) else None)
else: vals.append(None)
r = _row(ws, r, vals, alt=i%2==1, fmt=NUM4)
def sheet_ar1(wb, model, diag):
ws = wb.create_sheet("AR1_모형")
_widths(ws, [3,22,14,14,14,14,14])
r=1; _title(ws, r, " AR(1) + Macro 회귀 모형", 7)
r=3
r = _section(ws, r, " Z(t) = c + φ·Z(t-1) + Σ βᵢ·Xᵢ_std(t) + ε(t)", 7)
ws.cell(row=r, column=2, value="※ 거시변수는 표준화(mean=0, std=1) 후 투입. β = '1σ 충격 → ΔZ'로 해석").font = SMALL_FONT
r += 2
# 계수
r = _section(ws, r, " 회귀 계수", 7)
r = _headers(ws, r, ["","변수","계수","표준오차","t값","p값","유의성"])
coef_df = diag.get("coefficients", pd.DataFrame())
for i,(_,rd) in enumerate(coef_df.iterrows()):
pv = rd.get("p값",1)
sig = "***" if pv<0.01 else "**" if pv<0.05 else "*" if pv<0.10 else ""
vals = [None, rd.get("변수",""), rd.get("계수",0), rd.get("표준오차",0), rd.get("t값",0), pv, sig]
rn = _row(ws, r, vals, alt=i%2==1, fmt=NUM4)
if pv < 0.05: ws.cell(row=r,column=7).font = PASS_FONT
elif pv < 0.10: ws.cell(row=r,column=7).font = Font(name="맑은 고딕",size=9,color="FF8F00")
r = rn
r += 1
# 진단 — 모형 적합도
r = _section(ws, r, " 모형 적합도", 7)
for k,v,f in [("","r_squared",NUM4),("Adj. R²","adj_r_squared",NUM4),
("F 통계량","f_stat",NUM4),("F p-value","f_pvalue",NUM4),
("AIC","aic",NUM2),("BIC","bic",NUM2)]:
r = _kv(ws, r, k, diag.get(v, None), fmt=f)
r += 1
# 진단 — 잔차 검정 (6개 전항목)
r = _section(ws, r, " 잔차 검정 (6개 전항목)", 7)
r = _headers(ws, r, ["","검정","통계량","p-value","기준","결과","해석"])
tests_data = [
("ADF (Zt 정상성)", diag.get("adf_stat"), diag.get("adf_pvalue"),
"p < 0.05", diag.get("adf_pvalue",1) < 0.05 if diag.get("adf_pvalue") else False,
"BIC lag 선택, H0: 비정상"),
("Ljung-Box Q(5)", diag.get("ljung_box_stat"), diag.get("ljung_box_pvalue"),
"p > 0.05", diag.get("ljung_box_pvalue",0) > 0.05 if diag.get("ljung_box_pvalue") else False,
"H0: 자기상관 없음"),
("Durbin-Watson", diag.get("durbin_watson"), None,
"1.5~2.5", 1.5 <= diag.get("durbin_watson",0) <= 2.5 if diag.get("durbin_watson") else False,
"≈2 이상적"),
("Breusch-Pagan", diag.get("bp_stat"), diag.get("bp_pvalue"),
"p > 0.05", diag.get("bp_pvalue",0) > 0.05 if diag.get("bp_pvalue") else False,
"H0: 등분산"),
("ARCH-LM", diag.get("arch_stat"), diag.get("arch_pvalue"),
"p > 0.05", diag.get("arch_pvalue",0) > 0.05 if diag.get("arch_pvalue") else False,
"H0: ARCH 효과 없음"),
("Shapiro-Wilk", diag.get("shapiro_stat"), diag.get("shapiro_pvalue"),
"p > 0.05", diag.get("shapiro_pvalue",0) > 0.05 if diag.get("shapiro_pvalue") else False,
"H0: 정규분포"),
]
for tname, stat, pval, crit, passed, note in tests_data:
stat_str = f"{stat:.4f}" if stat is not None else "-"
pval_str = f"{pval:.4f}" if pval is not None else "-"
result_str = "Pass ✅" if passed else "Fail ❌"
vals = [None, tname, stat_str, pval_str, crit, result_str, note]
r = _row(ws, r, vals)
if passed:
ws.cell(row=r-1, column=6).font = PASS_FONT
else:
ws.cell(row=r-1, column=6).font = FAIL_FONT
r += 1
# 변수 통계
r = _section(ws, r, " 거시변수 표본 통계 (표준화 전 원시값)", 7)
r = _headers(ws, r, ["","변수","평균","표준편차","최근값","",""])
for var,st in model.ar1_macro_stats.items():
vals = [None,_kor(var),st["mean"],st["std"],st["last"],None,None]
r = _row(ws, r, vals, fmt=NUM2)
r += 1
# 경제적 해석 섹션
r = _section(ws, r, " 변수별 경제적 해석", 7)
interp = {
"CORP_BBB_LAG2": "2년전 BBB금리↑ → 신용위험 잔존 → 부도↑ → Z↓ (시차효과)",
"GFCF_GROWTH_LAG2": "2년전 고정자본투자↑ → 생산능력↑ → 부도↓ → Z↑",
"SAVING_RATE_L": "log(저축률)↑ → 경제안정성↑ → 부도↓ → Z↑",
"HOUSING_PRICE": "주택가격↑ → 담보가치↑ → 차입여력↑ → 부도↓ → Z↑",
"CREDIT_SPREAD_LAG1": "전년 스프레드↑ → 당해 신용위험 전이 → 부도↑ → Z↓ (시차 효과)",
"EXPORT_DIFF": "수출증감↑ → 기업매출↑ → 수익성↑ → 부도↓ → Z↑",
"CURRENT_ACCOUNT": "경상수지↑(흑자) → 불황기 수출의존↑ → Z↓",
"CURRENT_ACCOUNT_R": "경상수지변화율↑ → 대외부문 개선 속도↑ → Z↑ (단기 모멘텀)",
"LEADING_INDEX": "경기선행지수↑ → 3~6개월 후 경기확장 → 부도↓ → Z↑",
"CONSTR_INVEST_GR": "건설투자↑ → 과잉투자/레버리지 → Z↓ (민스키 가설)",
}
for var in model.selected_vars:
beta = model.ar1_beta.get(var, 0)
sign = "+" if beta > 0 else ""
desc = interp.get(var, "")
ws.cell(row=r, column=2, value=_kor(var)).font = BODY_BOLD
ws.cell(row=r, column=3, value=f"β={beta:+.4f} ({sign})").font = NUM_FONT
ws.cell(row=r, column=4, value=desc).font = SMALL_FONT
ws.merge_cells(start_row=r, start_column=4, end_row=r, end_column=7)
r += 1
def sheet_zpath(wb, z_paths, config):
ws = wb.create_sheet("시나리오_Z경로")
scenarios = list(z_paths.keys())
nc = 2+len(scenarios)
_widths(ws, [3,10]+[16]*len(scenarios))
r=1; _title(ws, r, " 시나리오별 Z(t) 경로", nc)
r=3
r = _section(ws, r, " t=1: 거시 충격 적용 | t≥2: AR(1) 감쇠 → TTC 수렴", nc)
r += 1
names = []
for s in scenarios:
c = config.get("scenarios",{}).get(s,{})
names.append(c.get("name",s))
r = _headers(ws, r, ["","연도(t+k)"]+names)
horizon = len(list(z_paths.values())[0])
key_years = list(range(1,11))+[15,20,25,30,40,50]
for t in key_years:
if t <= horizon:
vals = [None,t]+[float(z_paths[s][t-1]) for s in scenarios]
r = _row(ws, r, vals, alt=t%2==0, fmt=NUM4)
def sheet_pd(wb, pd_results, config, grades8):
ws = wb.create_sheet("Lifetime_PD")
ng = len(grades8)-1 # D 제외
_widths(ws, [3,14,8]+[14]*ng)
nc = 3+ng
r=1; _title(ws, r, " 시나리오별 누적 Lifetime PD (%)", nc)
r=3
ky = [1,2,3,5,7,10,15,20,30,50]
by_sc = pd_results.get("by_scenario", {})
for sname, sdata in by_sc.items():
c = config.get("scenarios",{}).get(sname,{})
dn = c.get("name",sname); w = c.get("weight",0)
r = _section(ws, r, f" {dn} (가중치 {w*100:.0f}%)", nc)
r = _headers(ws, r, ["","시나리오","연도"]+list(grades8[:-1]))
cpd = sdata.get("cumulative_pd", np.zeros((50,ng)))
for t in ky:
if t <= cpd.shape[0]:
vals = [None,dn,t]+[cpd[t-1,g]*100 for g in range(min(ng,cpd.shape[1]))]
r = _row(ws, r, vals, alt=ky.index(t)%2==1, fmt=NUM4)
r += 1
def sheet_weighted(wb, pd_results, config, grades8):
ws = wb.create_sheet("가중평균_PD")
ng = len(grades8)-1
_widths(ws, [3,8]+[14]*ng)
nc = 2+ng
r=1; _title(ws, r, " 확률가중 Lifetime PD (%)", nc)
r=3
# IFRS 9 근거
r = _section(ws, r, " IFRS 9 근거: 확률가중 기대신용손실", nc)
ws.cell(row=r, column=2, value='IFRS 9 B5.5.42: "기대신용손실은 확률가중 금액이어야 하며,').font = SMALL_FONT; r+=1
ws.cell(row=r, column=2, value='가능한 결과의 범위를 반영하여야 한다. 단일 가장 가능성 높은 결과가 아닌,').font = SMALL_FONT; r+=1
ws.cell(row=r, column=2, value='신용위험의 벽혹을 변경시키는 일반적 경제 조건에 대한 예측을 포함하여야 한다."').font = SMALL_FONT; r+=1
ws.cell(row=r, column=2, value='IFRS 9 B5.5.44: "최소 2개 시나리오(호황/불황)+확률가중치 = ECL 요구사항을 충족할 수 있다."').font = SMALL_FONT; r+=1
r += 1
r = _section(ws, r, " PD_weighted(t) = Σ w_s × PD_s(t)", nc)
wstr = " + ".join([f"{c.get('weight',0)*100:.0f}%×{c.get('name',s)}" for s,c in config.get("scenarios",{}).items()])
ws.cell(row=r, column=2, value=f"= {wstr}").font = SMALL_FONT
r += 2
ky = [1,2,3,5,7,10,15,20,30,50]
r = _headers(ws, r, ["","연도"]+list(grades8[:-1]))
wcpd = pd_results.get("weighted_cumulative_pd", np.zeros((50, ng)))
for t in ky:
if t <= wcpd.shape[0]:
wpd = wcpd[t-1,:ng] * 100
else:
wpd = np.zeros(ng)
vals = [None,t]+list(wpd)
r = _row(ws, r, vals, alt=ky.index(t)%2==1, fmt=NUM4)
def sheet_validation(wb, val_df):
ws = wb.create_sheet("검증결과")
_widths(ws, [3,30,22,14,14,10,40])
r=1; _title(ws, r, " 통계적 검증 결과", 7)
r=3
cols = list(val_df.columns)
r = _headers(ws, r, [""]+cols)
for i,(_,rd) in enumerate(val_df.iterrows()):
vals = [None]+[rd[c] for c in cols]
rn = _row(ws, r, vals, alt=i%2==1)
# 결과 색상
result_col = cols.index("결과")+2 if "결과" in cols else None
if result_col:
cell = ws.cell(row=r, column=result_col)
if "Pass" in str(cell.value):
cell.fill = PASS_FILL; cell.font = PASS_FONT
elif "Fail" in str(cell.value):
cell.fill = FAIL_FILL; cell.font = FAIL_FONT
r = rn
# ================================================================
# 메인
# ================================================================
def generate_report(config_path="config.yaml", output_path="results/lifetime_pd_report.xlsx"):
print("=" * 60)
print(" Lifetime PD 분석 보고서 생성")
print("=" * 60)
with open(config_path) as f:
config = yaml.safe_load(f)
rho = config.get("model",{}).get("rho", 0.20)
grades = config.get("model",{}).get("rating_grades", list(RATING_GRADES))
forced_vars = config.get("model",{}).get("macro_vars", [])
macro_method = config.get("model",{}).get("macro_method", "ar1_macro")
horizon = config.get("convergence",{}).get("total_horizon", 50)
from data.pd_floor import apply_pd_floor_to_matrices, build_complete_pd_floor_table
# 1. 데이터
print("\n [1/6] 데이터 로딩...")
data_config = config.get("data", {})
tm_source = data_config.get("transition_source", "real")
tm_dir = data_config.get("transition_dir", None)
tm_all = load_transition_matrices(tm_source, data_dir=tm_dir)
# 2000-2025 필터
tm_raw = {y:m for y,m in tm_all.items() if 2000 <= y <= 2025}
# KAP 채권 YTM 기반 PD Floor 적용
pd_floors, _, pd_floors_full = build_complete_pd_floor_table()
tm = apply_pd_floor_to_matrices(tm_raw, pd_floors)
ttc = compute_ttc_matrix(tm)
# 거시변수
macro_data = _fallback_macro_data()
try:
ecos = load_ecos_macro()
if ecos is not None and not ecos.empty:
macro_data = pd.concat([macro_data, ecos], axis=1)
macro_data = macro_data.loc[:,~macro_data.columns.duplicated()]
except: pass
derived = compute_derived_features(macro_data)
if not derived.empty:
macro_data = pd.concat([macro_data, derived], axis=1)
macro_data = macro_data.loc[:,~macro_data.columns.duplicated()]
# 확장 변환: LAG2, log, diff, pctchg, log-return
base_cols = list(macro_data.columns)
for col in base_cols:
s = macro_data[col]
d = s.diff()
if d.std() > 1e-10:
macro_data[f"{col}_D"] = d
pc = s.pct_change().replace([np.inf, -np.inf], np.nan)
if pc.dropna().std() > 1e-10:
macro_data[f"{col}_R"] = pc
if (s > 0).all():
ls = np.log(s)
if ls.std() > 1e-10:
macro_data[f"{col}_L"] = ls
ld = ls.diff()
if ld.dropna().std() > 1e-10:
macro_data[f"{col}_LR"] = ld
l2 = s.shift(1)
if l2.dropna().std() > 1e-10:
macro_data[f"{col}_LAG2"] = l2
macro_data = macro_data.ffill().bfill()
macro_data = macro_data.loc[:,~macro_data.columns.duplicated()]
print(f" 전이행렬: {len(tm)}개년 [{tm_source}], PD Floor 적용, 거시변수: {len(macro_data.columns)}")
# 2. Zt
print(" [2/6] Zt 추정...")
zt_dict = estimate_zt_series(tm, ttc, rho)
# 3. AR(1)
print(" [3/6] AR(1)+Macro 적합...")
model = build_macro_zt_model(zt_dict, macro_data, method=macro_method, forced_vars=forced_vars)
diag = model.diagnostics()
# 추가 진단 통계 (AR1 시트 6개 검정용)
zt_arr = np.array([zt_dict[yr] for yr in sorted(zt_dict.keys())])
from statsmodels.tsa.stattools import adfuller as _adfuller
_adf = _adfuller(zt_arr, autolag="BIC")
diag["adf_stat"] = _adf[0]; diag["adf_pvalue"] = _adf[1]
if model.result is not None:
_resid = model.result.resid
_exog = model.result.model.exog
from statsmodels.stats.diagnostic import acorr_ljungbox as _lb, het_breuschpagan as _bp, het_arch as _arch
from scipy.stats import shapiro as _shapiro
try:
_lbr = _lb(_resid, lags=[5], return_df=True)
diag["ljung_box_stat"] = float(_lbr["lb_stat"].iloc[0])
diag["ljung_box_pvalue"] = float(_lbr["lb_pvalue"].iloc[0])
except: pass
try:
_bpr = _bp(_resid, _exog)
diag["bp_stat"] = float(_bpr[0]); diag["bp_pvalue"] = float(_bpr[1])
except: pass
try:
_ar = _arch(_resid, nlags=3)
diag["arch_stat"] = float(_ar[0]); diag["arch_pvalue"] = float(_ar[1])
except: pass
try:
_sw = _shapiro(_resid)
diag["shapiro_stat"] = float(_sw.statistic); diag["shapiro_pvalue"] = float(_sw.pvalue)
except: pass
diag["bic"] = float(model.result.bic) if hasattr(model.result, 'bic') else None
print(f" φ={model.ar1_phi:.4f}, R²={diag['r_squared']:.4f}, Adj.R²={diag['adj_r_squared']:.4f}")
# 4. 시나리오
print(" [4/6] 시나리오 Z경로...")
engine = ScenarioEngine(config)
z_paths = engine.generate_z_paths(zt_dict, macro_model=model)
weights = engine.get_scenario_weights()
# 5. Lifetime PD
print(" [5/6] Lifetime PD 산출...")
ttc_8x8 = expand_to_8x8(ttc) if ttc.shape == (7,7) else ttc
pd_engine = LifetimePDEngine(ttc_8x8, rho, rating_grades=RATING_GRADES_8)
pd_results = pd_engine.compute_all_scenarios(z_paths, weights, horizon)
# 6. 검증
print(" [6/6] 통계 검증...")
zt_series = pd.Series(zt_dict)
reg_result = model.result
val_df = run_full_validation(zt_series.values, reg_result, pd_results, list(RATING_GRADES[:-1]))
# ================================================================
# Excel 생성
# ================================================================
print(f"\n Excel 보고서 생성 중...")
wb = Workbook()
sheet_summary(wb, config, model, zt_dict, diag, z_paths, val_df, pd_engine, pd_results, RATING_GRADES_8)
sheet_tm(wb, tm_raw, tm, ttc, pd_floors, config)
sheet_macro(wb, macro_data, forced_vars)
sheet_zt(wb, zt_dict, macro_data, forced_vars, rho)
sheet_ar1(wb, model, diag)
sheet_zpath(wb, z_paths, config)
sheet_pd(wb, pd_results, config, RATING_GRADES_8)
sheet_weighted(wb, pd_results, config, RATING_GRADES_8)
sheet_validation(wb, val_df)
os.makedirs(os.path.dirname(output_path) or '.', exist_ok=True)
wb.save(output_path)
print(f"\n ✓ 보고서 저장: {output_path}")
print(f" 시트: {len(wb.sheetnames)}개 ({', '.join(wb.sheetnames)})")
return output_path
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Lifetime PD 보고서 생성")
parser.add_argument("--config", default="config.yaml")
parser.add_argument("--output", default="results/lifetime_pd_report.xlsx")
args = parser.parse_args()
generate_report(args.config, args.output)