"""
Data revalidation and Merton / shadow-rating recomputation.

Pipeline steps:
1. Filter out abnormal tickers (SPACs, REITs, funds, ETFs, ...)
2. Strengthened financial-sector filter
3. Cap DD outliers
4. EDF monotonicity correction (isotonic regression)
5. Recompute shadow ratings
6. Recompute default rates per rating grade

Usage:
    python -m src.models.revalidate          # full revalidation
    python -m src.models.revalidate --dry    # show filtering results only (DB untouched)
"""
import sys
import argparse
import re
import yaml
import numpy as np
import pandas as pd
from datetime import datetime
from pathlib import Path
from scipy.stats import norm

sys.path.insert(0, str(Path(__file__).parent.parent.parent))

from src.data.database import get_connection, init_db
from src.models.merton import (
    solve_merton, calculate_dd, calculate_edf, naive_dd,
    dd_to_rating, DD_RATING_MAP, GLOBAL_DEFAULT_RATES
)

# ============================================================
# 1. Abnormal-ticker filtering
# ============================================================

# Keywords that mark abnormal tickers (SPAC, REIT, fund, ETF, ...).
EXCLUDE_KEYWORDS = [
    "스팩", "SPAC", "리츠", "REIT", "인프라",
    "호스팩", "호스펀드", "호펀드",
    "선박", "ETF", "ETN",
]

# "<N>호" / "제<N>호" numbering pattern, used to detect SPAC vehicles.
SPAC_PATTERN = re.compile(r"(\d+호|제\d+호)")

# Financial-sector keywords (strengthened list).
FINANCIAL_KEYWORDS = [
    "은행", "금융", "보험", "증권", "캐피탈", "저축",
    "생명", "화재", "손해", "카드", "리스", "자산운용",
    "파이낸셜", "파이낸스", "벤처캐피탈",
    "투자증권", "종합금융", "상호저축", "새마을금고",
]

# Financial holding companies (exact-name match only).
FINANCIAL_HOLDING_NAMES = [
    "KB금융", "신한지주", "하나금융지주", "우리금융지주",
    "BNK금융지주", "DGB금융지주", "JB금융지주",
    "한국금융지주", "메리츠금융지주",
]


def classify_ticker(name: str, leverage: float) -> str:
    """Classify a ticker by company name and leverage ratio.

    Args:
        name: Company name. Anything that is not a non-empty string
            (``None``, NaN, ``""``) is classified as ``"normal"``.
        leverage: Leverage ratio; missing values (``None``/NaN) are ignored.

    Returns:
        One of ``"normal"``, ``"spac"``, ``"reit"``, ``"financial"``,
        ``"etf_fund"``. Checks run in that priority order: SPAC, REIT,
        ETF/ETN, financial (by name), financial (by leverage).
    """
    # Guard against NaN/None names: substring tests on a float raise TypeError.
    if not isinstance(name, str) or not name:
        return "normal"

    # SPAC: explicit keyword, or a numbered vehicle whose name mentions
    # an acquisition / merger purpose.
    if "스팩" in name or "SPAC" in name:
        return "spac"
    if SPAC_PATTERN.search(name) and any(kw in name for kw in ("기업인수", "합병")):
        return "spac"

    # REIT / infrastructure funds.
    if any(kw in name for kw in ("리츠", "REIT", "인프라")):
        return "reit"

    # ETF / ETN.
    if "ETF" in name or "ETN" in name:
        return "etf_fund"

    # Financial sector, by name.
    if name in FINANCIAL_HOLDING_NAMES or any(kw in name for kw in FINANCIAL_KEYWORDS):
        return "financial"

    # Leverage above 0.90 is treated as a likely financial company.
    if pd.notna(leverage) and leverage > 0.90:
        return "financial"

    return "normal"


def filter_and_classify(conn) -> pd.DataFrame:
    """Load every Merton result with its financials and add a ``category`` column.

    Args:
        conn: Open database connection accepted by ``pandas.read_sql_query``.

    Returns:
        One row per joined (merton_results, financial_data, companies) record,
        with ``category`` set by :func:`classify_ticker` from the company name
        and ``financial_data.leverage_ratio``. No rows are dropped here.
    """
    query = """
        SELECT
            mr.ticker, c.name, mr.DD, mr.EDF, mr.E, mr.D,
            mr.sigma_E, mr.sigma_V, mr.leverage, mr.method,
            f.leverage_ratio, f.total_assets, f.total_equity,
            f.roa, f.interest_coverage, f.log_assets,
            f.current_liabilities, f.non_current_liabilities,
            f.total_liabilities, f.default_point,
            f.operating_income, f.net_income,
            mr.base_date, mr.fin_year
        FROM merton_results mr
        JOIN financial_data f ON mr.ticker = f.ticker
        JOIN companies c ON mr.ticker = c.ticker
    """
    df = pd.read_sql_query(query, conn)

    # A list built from zip() also works on an empty result set, whereas
    # DataFrame.apply(axis=1) returns a DataFrame there and breaks the
    # single-column assignment.
    df["category"] = [
        classify_ticker(name, leverage)
        for name, leverage in zip(df["name"], df["leverage_ratio"])
    ]

    return df


# ============================================================
# 2. DD outlier capping + EDF recomputation
# ============================================================

def apply_dd_caps(df: pd.DataFrame, lower: float = -5.0, upper: float = 15.0) -> pd.DataFrame:
    """Clip distance-to-default to ``[lower, upper]`` and recompute EDF.

    A DD above the upper bound means a default probability that is
    numerically zero; a DD below the lower bound means the firm is already
    in a default-like state. EDF is recomputed as ``N(-DD)`` from the
    clipped values.

    Args:
        df: Frame with a numeric ``DD`` column. Not modified.
        lower: Lower clipping bound (default -5).
        upper: Upper clipping bound (default 15).

    Returns:
        A copy of ``df`` with clipped ``DD`` and recomputed ``EDF``.

    Raises:
        ValueError: If ``lower`` is greater than ``upper``.
    """
    if lower > upper:
        raise ValueError(f"lower ({lower}) must not exceed upper ({upper})")

    df = df.copy()

    original_dd = df["DD"]
    capped_high = (original_dd > upper).sum()
    capped_low = (original_dd < lower).sum()

    df["DD"] = original_dd.clip(lower, upper)

    # norm.cdf is vectorized; no per-row lambda needed.
    df["EDF"] = norm.cdf(-df["DD"].to_numpy(dtype=float))

    print(f" DD 캡핑: 상한(>{upper:g})={capped_high}건, 하한(<{lower:g})={capped_low}건")

    return df
# ============================================================
# 3. Composite score + shadow rating (improved)
# ============================================================

# Default composite-score weights; DD dominates to minimise EDF inversions.
_DEFAULT_SCORE_WEIGHTS = {
    "dd": 0.70,        # distance to default (core driver)
    "leverage": 0.10,  # leverage (sign flipped: lower is better)
    "roa": 0.10,       # profitability
    "icr": 0.05,       # interest coverage
    "size": 0.05,      # firm size (log assets)
}

# Default target share of firms per grade, best grade first.
# The last grade absorbs whatever rows remain.
_DEFAULT_RATING_DIST = {
    "AAA": 0.005, "AA+": 0.01, "AA": 0.02, "AA-": 0.03,
    "A+": 0.05, "A": 0.07, "A-": 0.08,
    "BBB+": 0.08, "BBB": 0.10, "BBB-": 0.09,
    "BB+": 0.08, "BB": 0.09, "BB-": 0.07,
    "B+": 0.06, "B": 0.05, "B-": 0.04,
    "CCC+": 0.02, "CCC": 0.02, "CCC-": 0.03,
}


def _zscore(s: pd.Series) -> pd.Series:
    """Standardise ``s``; return all zeros when the spread is zero or undefined."""
    std = s.std()
    if pd.isna(std) or std == 0:
        return pd.Series(0.0, index=s.index)
    return (s - s.mean()) / std


def _allocate_grades(n: int, rating_dist: dict) -> list:
    """Return ``n`` grade labels, best first, following ``rating_dist`` shares.

    Every grade except the last receives ``max(1, round(n * share))`` rows
    until the rows run out; the last grade receives the remainder.
    """
    grades = list(rating_dist)
    labels = []
    for grade in grades[:-1]:
        remaining = n - len(labels)
        if remaining <= 0:
            break
        count = max(1, round(n * rating_dist[grade]))
        labels.extend([grade] * min(count, remaining))
    labels.extend([grades[-1]] * (n - len(labels)))
    return labels


def compute_improved_shadow(df: pd.DataFrame, weights: dict = None,
                            rating_dist: dict = None) -> pd.DataFrame:
    """Compute a composite credit score and assign shadow ratings by rank.

    The score is a weighted sum of z-scores of DD, (negated) leverage ratio,
    ROA, interest coverage and log assets. Rows are sorted by score in
    descending order and grades are handed out from best to worst according
    to the target distribution.

    Missing inputs are filled before standardising: leverage with 0.5,
    ROA and interest coverage with 0, log assets with the column median.
    ROA is clipped to [-1, 1] and interest coverage to [-10, 100].

    Args:
        df: Frame with columns ``DD``, ``leverage_ratio``, ``roa``,
            ``interest_coverage`` and ``log_assets``. Not modified.
        weights: Optional overrides for the score weights, keyed by
            ``"dd"``, ``"leverage"``, ``"roa"``, ``"icr"``, ``"size"``.
            Missing keys keep their defaults (0.70/0.10/0.10/0.05/0.05).
        rating_dist: Optional ordered mapping ``grade -> share`` (best grade
            first). Defaults to the built-in 19-grade distribution.

    Returns:
        A re-indexed copy of ``df`` sorted by ``composite_score`` (descending)
        with ``composite_score`` and ``shadow_rating`` columns added.

    Raises:
        ValueError: On an unknown weight key or an empty ``rating_dist``.
    """
    score_weights = dict(_DEFAULT_SCORE_WEIGHTS)
    if weights:
        unknown = set(weights) - set(score_weights)
        if unknown:
            raise ValueError(f"unknown weight keys: {sorted(unknown)}")
        score_weights.update(weights)

    dist = _DEFAULT_RATING_DIST if rating_dist is None else rating_dist
    if not dist:
        raise ValueError("rating_dist must contain at least one grade")

    df = df.copy()

    z_dd = _zscore(df["DD"])
    z_lev = -_zscore(df["leverage_ratio"].fillna(0.5))
    z_roa = _zscore(df["roa"].fillna(0).clip(-1, 1))
    z_icr = _zscore(df["interest_coverage"].fillna(0).clip(-10, 100))
    z_size = _zscore(df["log_assets"].fillna(df["log_assets"].median()))

    df["composite_score"] = (
        score_weights["dd"] * z_dd
        + score_weights["leverage"] * z_lev
        + score_weights["roa"] * z_roa
        + score_weights["icr"] * z_icr
        + score_weights["size"] * z_size
    )

    # Stable sort so that tied scores keep a reproducible order.
    df = df.sort_values(
        "composite_score", ascending=False, kind="mergesort"
    ).reset_index(drop=True)

    df["shadow_rating"] = _allocate_grades(len(df), dist)

    return df
# ============================================================
# 4. EDF monotonicity correction
# ============================================================

def _pool_adjacent_violators(values) -> np.ndarray:
    """Return the non-decreasing least-squares fit of ``values`` (unweighted PAVA).

    Adjacent blocks are merged while a block's mean exceeds the mean of the
    block that follows it; every element then takes its block's mean.
    """
    blocks = []  # each entry: [sum of values, number of values]
    for value in values:
        blocks.append([float(value), 1])
        # Merge backwards while the previous block's mean is larger.
        while (len(blocks) > 1
               and blocks[-2][0] / blocks[-2][1] > blocks[-1][0] / blocks[-1][1]):
            total, count = blocks.pop()
            blocks[-1][0] += total
            blocks[-1][1] += count

    fitted = []
    for total, count in blocks:
        fitted.extend([total / count] * count)
    return np.array(fitted, dtype=float)


def enforce_monotonicity(dr_df: pd.DataFrame, rating_order=None) -> pd.DataFrame:
    """Make per-grade default rates non-decreasing from best to worst grade.

    Rows are ordered by rating grade and ``korean_dr`` is replaced by its
    isotonic (pool-adjacent-violators) fit, stored in ``korean_dr_monotone``.
    The earlier pairwise-averaging loop could still leave inversions
    (e.g. [1, 3, 2, 0] -> [1, 1.875, 1.875, 1.25]); pooling whole blocks
    guarantees a monotone result.

    Args:
        dr_df: Frame with ``rating_grade`` and ``korean_dr`` columns.
            Not modified. An empty frame is accepted.
        rating_order: Grades from best to worst. Defaults to the key order
            of ``GLOBAL_DEFAULT_RATES``. Grades not listed sort first
            (index -1), as before.

    Returns:
        A copy of ``dr_df`` sorted by grade with ``korean_dr_monotone`` added.
    """
    dr_df = dr_df.copy()

    if dr_df.empty:
        # Nothing to correct; still provide the output column for callers.
        dr_df["korean_dr_monotone"] = pd.Series(dtype=float)
        return dr_df

    if rating_order is None:
        rating_order = list(GLOBAL_DEFAULT_RATES.keys())

    # First occurrence wins, mirroring list.index().
    rank = {}
    for position, grade in enumerate(rating_order):
        rank.setdefault(grade, position)

    dr_df["rating_idx"] = dr_df["rating_grade"].map(lambda grade: rank.get(grade, -1))
    dr_df = dr_df.sort_values("rating_idx", kind="mergesort")

    dr_df["korean_dr_monotone"] = _pool_adjacent_violators(dr_df["korean_dr"].to_numpy())
    dr_df = dr_df.drop(columns=["rating_idx"])

    return dr_df
# ============================================================
# 5. Default rates per rating grade
# ============================================================

# Lower bound applied to the final default rate: 0.00001 = 0.001% = 0.1bp.
# NOTE(review): the original comment said "0.0001% (1bp)" and the report text
# says "0.001% (1bp)"; neither matches 1bp (= 0.0001). The numeric value is
# kept unchanged here -- confirm the intended floor.
_FINAL_DR_FLOOR = 0.00001

_DEFAULT_RATE_COLUMNS = [
    "rating_grade", "n_firms", "korean_dr", "global_dr",
    "weight_kr", "blended_dr", "bayesian_dr",
]


def compute_default_rates(df: pd.DataFrame, config: dict) -> pd.DataFrame:
    """Compute per-grade default rates with blending, Bayesian and monotone variants.

    For every grade in ``GLOBAL_DEFAULT_RATES`` that has at least one firm:

    * ``korean_dr``   - mean EDF of the firms in the grade
    * ``weight_kr``   - ``min(n_firms / threshold, 1)``
    * ``blended_dr``  - ``weight_kr``-weighted mix of Korean and global rates
    * ``bayesian_dr`` - posterior mean of a Beta prior centred on the global
      rate (strength ``bayesian_prior_strength``) updated with ``n_firms``
      pseudo-observations at rate ``korean_dr``
    * ``korean_dr_monotone`` - from ``enforce_monotonicity``
    * ``final_dr``    - blend of the monotone Korean rate and the global rate,
      floored at ``_FINAL_DR_FLOOR``

    Args:
        df: Frame with ``shadow_rating`` and ``EDF`` columns.
        config: Settings dict; ``config["blending"]`` may provide
            ``threshold`` (default 50) and ``bayesian_prior_strength``
            (default 50). ``None`` is treated as an empty config.

    Returns:
        One row per populated grade, best grade first. Empty (but with all
        columns present) when no grade has any firm.
    """
    blending = (config or {}).get("blending", {}) or {}
    threshold = blending.get("threshold", 50)
    prior_strength = blending.get("bayesian_prior_strength", 50)

    results = []
    for rating, global_dr in GLOBAL_DEFAULT_RATES.items():
        subset = df[df["shadow_rating"] == rating]
        n_firms = len(subset)
        if n_firms == 0:
            continue

        korean_dr = subset["EDF"].mean()

        # A non-positive threshold means "always trust the Korean estimate"
        # instead of dividing by zero.
        weight_kr = min(n_firms / threshold, 1.0) if threshold > 0 else 1.0
        blended_dr = weight_kr * korean_dr + (1 - weight_kr) * global_dr

        alpha_post = global_dr * prior_strength + n_firms * korean_dr
        beta_post = (1 - global_dr) * prior_strength + n_firms * (1 - korean_dr)
        bayesian_dr = alpha_post / (alpha_post + beta_post)

        results.append({
            "rating_grade": rating,
            "n_firms": n_firms,
            "korean_dr": korean_dr,
            "global_dr": global_dr,
            "weight_kr": weight_kr,
            "blended_dr": blended_dr,
            "bayesian_dr": bayesian_dr,
        })

    if not results:
        # Keep the schema stable so callers can still iterate / print.
        return pd.DataFrame(columns=_DEFAULT_RATE_COLUMNS + ["korean_dr_monotone", "final_dr"])

    dr_df = pd.DataFrame(results)

    # Monotonicity correction across grades.
    dr_df = enforce_monotonicity(dr_df)

    # Final rate = blend of the monotone Korean rate and the global benchmark.
    weight = dr_df["weight_kr"].clip(upper=1.0)
    dr_df["final_dr"] = (
        weight * dr_df["korean_dr_monotone"] + (1 - weight) * dr_df["global_dr"]
    ).clip(lower=_FINAL_DR_FLOOR)

    return dr_df


# ============================================================
# Main
# ============================================================

def load_config() -> dict:
    """Load ``config/settings.yaml`` from the project root.

    Returns:
        The parsed settings, or an empty dict when the file is empty.
    """
    config_path = Path(__file__).parent.parent.parent / "config" / "settings.yaml"
    with open(config_path, "r", encoding="utf-8") as f:
        return yaml.safe_load(f) or {}


def _print_banner(title: str, leading_newline: bool = True) -> None:
    """Print a section banner: a rule, the title, and another rule."""
    rule = "="*60
    print(("\n" + rule) if leading_newline else rule)
    print(title)
    print(rule)


def _save_results(conn, df: pd.DataFrame, df_clean: pd.DataFrame, dr_df: pd.DataFrame) -> None:
    """Replace merton_results / default_rates and refresh the is_financial flags.

    All statements run in one transaction: on any error the connection is
    rolled back and the exception re-raised, so the tables are never left
    emptied without their replacement rows.
    """
    today = datetime.now().strftime("%Y-%m-%d")
    base_date_str = df_clean["base_date"].iloc[0] if "base_date" in df_clean.columns else today

    merton_rows = (
        (
            row["ticker"], row.get("base_date", base_date_str), int(row.get("fin_year", 2024)),
            row["E"], row["sigma_E"], row["D"],
            row["E"] + row["D"],  # V approximation
            row["sigma_V"], row["DD"], row["EDF"],
            row["leverage"], row["method"], row["shadow_rating"],
        )
        for _, row in df_clean.iterrows()
    )
    # The monotone Korean rate is stored as korean_dr and the final rate as
    # blended_dr, matching the existing table layout.
    rate_rows = (
        (
            today,
            row["rating_grade"], int(row["n_firms"]), 0,
            row["korean_dr_monotone"], row["global_dr"], row["weight_kr"],
            row["final_dr"], row["bayesian_dr"],
        )
        for _, row in dr_df.iterrows()
    )
    flagged = [(ticker,) for ticker in df.loc[df["category"] != "normal", "ticker"]]

    try:
        # Reset merton_results / default_rates and store the new rows.
        conn.execute("DELETE FROM merton_results")
        conn.execute("DELETE FROM default_rates")

        conn.executemany("""
            INSERT OR REPLACE INTO merton_results
            (ticker, base_date, fin_year, E, sigma_E, D, V, sigma_V, DD, EDF, leverage, method, dd_rating)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, merton_rows)

        conn.executemany("""
            INSERT OR REPLACE INTO default_rates
            (base_date, rating_grade, n_firms, n_defaults, korean_dr, global_dr, weight_kr, blended_dr, bayesian_dr)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, rate_rows)

        # Flag every excluded (non-"normal") ticker in companies.
        conn.execute("UPDATE companies SET is_financial = 0")
        conn.executemany("UPDATE companies SET is_financial = 1 WHERE ticker = ?", flagged)

        conn.commit()
    except Exception:
        conn.rollback()
        raise


def main():
    """Run the revalidation pipeline and, unless ``--dry`` is given, store the results.

    Steps: classify/filter tickers, cap DD and recompute EDF, recompute
    shadow ratings, compute per-grade default rates, write to the database.

    Returns:
        ``(df_clean, dr_df)``: the rated normal tickers and the per-grade
        default-rate table. When no normal ticker remains, the database is
        left untouched and ``dr_df`` is an empty frame.
    """
    parser = argparse.ArgumentParser(description="데이터 재검증 + 재산출")
    parser.add_argument("--dry", action="store_true", help="DB 미갱신 (확인만)")
    args = parser.parse_args()

    config = load_config()
    conn = init_db()

    try:
        # 1) Classification + filtering
        _print_banner("[1/5] 종목 분류 + 필터링", leading_newline=False)

        df = filter_and_classify(conn)
        print(f" 전체: {len(df)}개")

        for cat, cnt in df["category"].value_counts().items():
            example = df[df["category"] == cat]["name"].iloc[0] if cnt > 0 else ""
            print(f" {cat:12s}: {cnt:5d}개 (예: {example})")

        # Drop abnormal tickers.
        df_clean = df[df["category"] == "normal"].copy()
        removed = len(df) - len(df_clean)
        print(f"\n -> 정상 종목: {len(df_clean)}개 (제거: {removed}개)")

        if df_clean.empty:
            # Continuing would wipe merton_results/default_rates with nothing
            # to put back, so stop before any recomputation or DB write.
            print("\n [중단] 정상 종목이 없어 DB를 갱신하지 않습니다")
            return df_clean, pd.DataFrame()

        # 2) DD capping
        _print_banner("[2/5] DD 캡핑 + EDF 재계산")

        df_clean = apply_dd_caps(df_clean)

        print(f" DD 통계: 평균={df_clean['DD'].mean():.2f}, 중앙={df_clean['DD'].median():.2f}")
        print(f" EDF 통계: 평균={df_clean['EDF'].mean():.6f}, 중앙={df_clean['EDF'].median():.6f}")

        # 3) Shadow rating
        _print_banner("[3/5] Shadow Rating 재산출 (DD 70%)")

        df_clean = compute_improved_shadow(df_clean)

        rating_order = list(GLOBAL_DEFAULT_RATES.keys())
        df_clean["shadow_rating"] = pd.Categorical(
            df_clean["shadow_rating"], categories=rating_order, ordered=True
        )

        dist = df_clean["shadow_rating"].value_counts().sort_index()
        prev_edf = -1
        for rating, count in dist.items():
            if count > 0:
                grade_rows = df_clean[df_clean["shadow_rating"] == rating]
                avg_dd = grade_rows["DD"].mean()
                avg_edf = grade_rows["EDF"].mean()
                # Flag a grade whose mean EDF is lower than the better grade
                # before it. The original statement was garbled in extraction;
                # the condition is reconstructed, the marker text is chosen here.
                inv = " <<< 역전" if avg_edf < prev_edf and prev_edf >= 0 else ""
                print(f" {rating:5s}: {count:4d} DD={avg_dd:6.2f} EDF={avg_edf:.6f}{inv}")
                prev_edf = avg_edf

        # 4) Default rates per grade
        _print_banner("[4/5] 등급별 부도율 (단조보정 + 블렌딩)")

        dr_df = compute_default_rates(df_clean, config)

        print(f"\n{'grade':>5} | {'N':>4} | {'EDF_KR':>10} | {'monotone':>10} | {'global':>10} | {'final':>10}")
        print("-" * 65)
        for _, row in dr_df.iterrows():
            print(f" {row['rating_grade']:5s} | {row['n_firms']:4d} | "
                  f"{row['korean_dr']:10.6f} | {row['korean_dr_monotone']:10.6f} | "
                  f"{row['global_dr']:10.6f} | {row['final_dr']:10.6f}")

        # 5) Persist
        if not args.dry:
            _print_banner("[5/5] DB 저장")
            _save_results(conn, df, df_clean, dr_df)
            print(f" merton_results: {len(df_clean)}건 저장")
            print(f" default_rates: {len(dr_df)}건 저장")
        else:
            print("\n [DRY RUN] DB 미갱신")

        return df_clean, dr_df
    finally:
        # Always release the connection, including on errors.
        conn.close()


if __name__ == "__main__":
    main()
데이터 품질 (Data Quality) — 필터링/검증 결과 + +Usage: + python -m src.reports.generate_excel + python -m src.reports.generate_excel --output "outputs/custom_name.xlsx" +""" +import sys +import argparse +import sqlite3 +import numpy as np +import pandas as pd +from datetime import datetime +from pathlib import Path + +sys.path.insert(0, str(Path(__file__).parent.parent.parent)) + +try: + import xlsxwriter +except ImportError: + raise ImportError("xlsxwriter 미설치: pip install xlsxwriter") + + +def load_data(): + """DB에서 리포트 데이터 로드""" + conn = sqlite3.connect(str(Path(__file__).parent.parent.parent / "data" / "edf.db")) + + # 종목별 Merton 결과 + companies = pd.read_sql_query(""" + SELECT + mr.ticker, c.name, + mr.DD, mr.EDF, mr.sigma_E, mr.sigma_V, + mr.E, mr.D, mr.leverage, mr.method, + mr.dd_rating as shadow_rating, + f.total_assets, f.total_liabilities, f.total_equity, + f.leverage_ratio, f.roa, f.interest_coverage, + f.operating_income, f.net_income + FROM merton_results mr + JOIN companies c ON mr.ticker = c.ticker + JOIN financial_data f ON mr.ticker = f.ticker + ORDER BY mr.DD DESC + """, conn) + + # 등급별 부도율 + default_rates = pd.read_sql_query(""" + SELECT * FROM default_rates ORDER BY rowid + """, conn) + + # DB 통계 + stats = {} + for table in ["companies", "market_data", "financial_data", "volatility", "merton_results"]: + stats[table] = conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()[0] + + # 제거 종목 수 + stats["total_listed"] = conn.execute("SELECT COUNT(*) FROM companies").fetchone()[0] + stats["excluded"] = stats["total_listed"] - stats["merton_results"] + + conn.close() + return companies, default_rates, stats + + +# ============================================================ +# 등급 색상 매핑 +# ============================================================ +RATING_COLORS = { + "AAA": "#1B5E20", "AA+": "#2E7D32", "AA": "#388E3C", "AA-": "#43A047", + "A+": "#4CAF50", "A": "#66BB6A", "A-": "#81C784", + "BBB+": "#FFA000", "BBB": "#FF8F00", "BBB-": "#FF6F00", + "BB+": 
"#E65100", "BB": "#BF360C", "BB-": "#D84315", + "B+": "#C62828", "B": "#B71C1C", "B-": "#880E4F", + "CCC+": "#4A148C", "CCC": "#311B92", "CCC-": "#1A237E", +} + +RATING_ORDER = ["AAA","AA+","AA","AA-","A+","A","A-", + "BBB+","BBB","BBB-","BB+","BB","BB-", + "B+","B","B-","CCC+","CCC","CCC-"] + + +def create_report(output_path: str): + """Excel 리포트 생성""" + companies, default_rates, stats = load_data() + + wb = xlsxwriter.Workbook(output_path, {"nan_inf_to_errors": True}) + + # ---- 공통 서식 정의 ---- + title_fmt = wb.add_format({ + "bold": True, "font_size": 18, "font_color": "#1A237E", + "bottom": 2, "bottom_color": "#1A237E", + }) + subtitle_fmt = wb.add_format({ + "bold": True, "font_size": 13, "font_color": "#37474F", + "top": 1, "top_color": "#CFD8DC", "bottom": 1, "bottom_color": "#CFD8DC", + "bg_color": "#ECEFF1", + }) + header_fmt = wb.add_format({ + "bold": True, "font_size": 10, "font_color": "#FFFFFF", + "bg_color": "#263238", "border": 1, "border_color": "#455A64", + "text_wrap": True, "align": "center", "valign": "vcenter", + }) + cell_fmt = wb.add_format({ + "font_size": 10, "border": 1, "border_color": "#CFD8DC", + "valign": "vcenter", + }) + cell_center = wb.add_format({ + "font_size": 10, "border": 1, "border_color": "#CFD8DC", + "align": "center", "valign": "vcenter", + }) + pct_fmt = wb.add_format({ + "font_size": 10, "border": 1, "border_color": "#CFD8DC", + "num_format": "0.0000%", "align": "center", + }) + pct2_fmt = wb.add_format({ + "font_size": 10, "border": 1, "border_color": "#CFD8DC", + "num_format": "0.00%", "align": "center", + }) + num_fmt = wb.add_format({ + "font_size": 10, "border": 1, "border_color": "#CFD8DC", + "num_format": "#,##0", "align": "right", + }) + dec2_fmt = wb.add_format({ + "font_size": 10, "border": 1, "border_color": "#CFD8DC", + "num_format": "0.00", "align": "center", + }) + dec4_fmt = wb.add_format({ + "font_size": 10, "border": 1, "border_color": "#CFD8DC", + "num_format": "0.0000", "align": "center", + }) + 
note_fmt = wb.add_format({ + "font_size": 9, "font_color": "#78909C", "italic": True, + "text_wrap": True, + }) + good_fmt = wb.add_format({ + "font_size": 10, "border": 1, "border_color": "#CFD8DC", + "bg_color": "#E8F5E9", "font_color": "#1B5E20", "align": "center", + }) + warn_fmt = wb.add_format({ + "font_size": 10, "border": 1, "border_color": "#CFD8DC", + "bg_color": "#FFF3E0", "font_color": "#E65100", "align": "center", + }) + bad_fmt = wb.add_format({ + "font_size": 10, "border": 1, "border_color": "#CFD8DC", + "bg_color": "#FFEBEE", "font_color": "#B71C1C", "align": "center", + }) + + # 등급별 서식 생성 + rating_fmts = {} + for rating, color in RATING_COLORS.items(): + rating_fmts[rating] = wb.add_format({ + "font_size": 10, "bold": True, "border": 1, "border_color": "#CFD8DC", + "font_color": "#FFFFFF", "bg_color": color, "align": "center", + }) + + # ========================================================== + # Sheet 1: Overview (방법론 요약) + # ========================================================== + ws1 = wb.add_worksheet("Overview") + ws1.hide_gridlines(2) + ws1.set_column("A:A", 3) + ws1.set_column("B:B", 25) + ws1.set_column("C:C", 65) + ws1.set_tab_color("#1A237E") + + r = 1 + ws1.merge_range(r, 1, r, 2, "EDF (Expected Default Frequency) 분석 보고서", title_fmt) + r += 1 + ws1.write(r, 1, f"생성일: {datetime.now().strftime('%Y-%m-%d %H:%M')}", note_fmt) + ws1.write(r, 2, f"분석 대상: 한국 상장기업 {stats['merton_results']:,}개", note_fmt) + + r += 2 + ws1.merge_range(r, 1, r, 2, "1. 분석 개요", subtitle_fmt); r += 1 + overview_text = [ + ("목적", "주식 변동성을 활용하여 한국 상장기업의 신용등급별 기대부도빈도(EDF)를 산출합니다."), + ("대상", f"KRX 상장기업 {stats['total_listed']:,}개 중 비금융/비스팩 {stats['merton_results']:,}개"), + ("기준일", "2025년 3월 (최근 거래일 기준)"), + ("재무연도", "2024년 연결재무제표"), + ("데이터 소스", "KRX(pykrx): 주가/변동성 | DART(OpenDartReader): 재무제표"), + ] + for label, desc in overview_text: + ws1.write(r, 1, label, cell_fmt) + ws1.write(r, 2, desc, cell_fmt) + r += 1 + + r += 1 + ws1.merge_range(r, 1, r, 2, "2. 
분석 단계 (Pipeline)", subtitle_fmt); r += 1 + pipeline = [ + ("Step 1", "KRX 주가 수집", "pykrx로 종목별 1년 일별 종가 수집 (약 246 거래일)"), + ("Step 2", "주가 변동성(σ_E) 산출", "일별 로그수익률의 표준편차 × √252 (연환산)"), + ("Step 3", "DART 재무제표 수집", "부채총계, 유동/비유동부채, 자본총계, 영업이익 등"), + ("Step 4", "부도점(D) 계산", "Default Point = 유동부채 + 0.5 × 비유동부채 (KMV 방식)"), + ("Step 5", "Merton 모형 풀이", "E = V·N(d₁) − D·e⁻ʳᵀ·N(d₂) + σ_E = (V/E)·N(d₁)·σ_V"), + ("Step 6", "DD / EDF 산출", "DD = [ln(V/D) + (μ−½σ²)T] / (σ·√T), EDF = N(−DD)"), + ("Step 7", "Composite Score", "DD(70%) + 레버리지(10%) + ROA(10%) + ICR(5%) + 규모(5%)"), + ("Step 8", "Shadow Rating", "Composite Score 기반 등급 할당 (글로벌 분포 참조)"), + ("Step 9", "부도율 블렌딩", "한국 EDF × 가중치 + 글로벌 벤치마크 × (1−가중치) + 베이지안 보정"), + ] + step_hdr = wb.add_format({"bold": True, "font_size": 10, "font_color": "#1A237E"}) + for step, title, desc in pipeline: + ws1.write(r, 1, f"{step}: {title}", step_hdr) + ws1.write(r, 2, desc, cell_fmt) + r += 1 + + r += 1 + ws1.merge_range(r, 1, r, 2, "3. 데이터 품질 조치", subtitle_fmt); r += 1 + quality = [ + ("SPAC/리츠 제외", "62 SPAC + 26 리츠 제거 (비정상 DD > 50 방지)"), + ("금융업 제외", "은행/보험/증권 등 레버리지 특성이 다른 업종 제외"), + ("DD 캡핑", "DD를 [-5, 15] 범위로 제한 (극단값 영향 차단)"), + ("EDF floor", "AAA등급도 최소 부도율 0.001% (1bp) 적용"), + ("단조성 보정", "등급간 부도율이 역전되지 않도록 isotonic 보정"), + ] + for label, desc in quality: + ws1.write(r, 1, label, cell_fmt) + ws1.write(r, 2, desc, cell_fmt) + r += 1 + + r += 1 + ws1.merge_range(r, 1, r, 2, "4. 
주요 한계점", subtitle_fmt); r += 1 + limits = [ + "• E(자기자본)에 장부가치(total_equity) 사용 — 시가총액 미확보로 인한 대체", + "• 실제 부도 관측 없이 이론적 EDF를 부도율로 대체", + "• 단일 시점(2024년말) 분석 — 시계열/경기주기 미반영", + "• Merton 모형의 구조적 한계 (정규분포 가정, 단일 만기 가정)", + ] + for text in limits: + ws1.merge_range(r, 1, r, 2, text, note_fmt) + r += 1 + + # ========================================================== + # Sheet 2: Default Rates (등급별 부도율) + # ========================================================== + ws2 = wb.add_worksheet("Default Rates") + ws2.hide_gridlines(2) + ws2.set_column("A:A", 3) + ws2.set_column("B:B", 8) + ws2.set_column("C:C", 8) + ws2.set_column("D:G", 14) + ws2.set_tab_color("#E65100") + + r = 1 + ws2.merge_range(r, 1, r, 6, "등급별 기대부도율 (EDF by Rating)", title_fmt) + r += 2 + + # 테이블 헤더 + hdrs = ["등급", "기업수", "한국 EDF", "글로벌 DR", "블렌딩 DR", "최종 DR"] + for i, h in enumerate(hdrs): + ws2.write(r, i+1, h, header_fmt) + r += 1 + + for _, row in default_rates.iterrows(): + rating = row["rating_grade"] + rfmt = rating_fmts.get(rating, cell_center) + ws2.write(r, 1, rating, rfmt) + ws2.write(r, 2, int(row["n_firms"]), cell_center) + ws2.write(r, 3, row["korean_dr"], pct_fmt) + ws2.write(r, 4, row["global_dr"], pct_fmt) + ws2.write(r, 5, row["blended_dr"], pct_fmt) + ws2.write(r, 6, row["bayesian_dr"], pct_fmt) + r += 1 + + # 차트: 등급별 부도율 + chart = wb.add_chart({"type": "column"}) + data_start = 4 + data_end = data_start + len(default_rates) - 1 + chart.add_series({ + "name": "한국 EDF", + "categories": ["Default Rates", data_start, 1, data_end, 1], + "values": ["Default Rates", data_start, 3, data_end, 3], + "fill": {"color": "#1565C0"}, + }) + chart.add_series({ + "name": "글로벌 DR", + "categories": ["Default Rates", data_start, 1, data_end, 1], + "values": ["Default Rates", data_start, 4, data_end, 4], + "fill": {"color": "#E0E0E0"}, + "border": {"color": "#757575"}, + }) + chart.add_series({ + "name": "최종 DR", + "categories": ["Default Rates", data_start, 1, data_end, 1], + "values": ["Default 
Rates", data_start, 6, data_end, 6], + "fill": {"color": "#E65100"}, + }) + chart.set_title({"name": "등급별 기대부도율 비교", "name_font": {"size": 12}}) + chart.set_y_axis({"name": "부도율", "num_format": "0.00%"}) + chart.set_x_axis({"name": "신용등급"}) + chart.set_size({"width": 750, "height": 400}) + chart.set_style(10) + ws2.insert_chart(f"B{r+2}", chart) + + # ========================================================== + # Sheet 3: Company Detail (종목별 상세) + # ========================================================== + ws3 = wb.add_worksheet("Company Detail") + ws3.hide_gridlines(2) + ws3.set_tab_color("#2E7D32") + ws3.freeze_panes(3, 3) # 3행, 3열 고정 + + # 열 너비 + col_widths = {"A": 2, "B": 9, "C": 16, "D": 7, "E": 7, "F": 9, + "G": 7, "H": 14, "I": 14, "J": 14, "K": 10, + "L": 10, "M": 10, "N": 8} + for col, w in col_widths.items(): + ws3.set_column(f"{col}:{col}", w) + + r = 1 + ws3.merge_range(r, 1, r, 13, f"종목별 Shadow Rating 상세 ({len(companies):,}개 종목)", title_fmt) + r += 1 + + # 헤더 + headers = ["종목코드", "종목명", "등급", "DD", "EDF(%)", + "σ_E", "총자산(억)", "부채(억)", "자본(억)", "레버리지", + "ROA(%)", "이자보상", "Solver"] + for i, h in enumerate(headers): + ws3.write(r, i+1, h, header_fmt) + r += 1 + + for _, row in companies.iterrows(): + rating = str(row.get("shadow_rating", "NR")) + rfmt = rating_fmts.get(rating, cell_center) + dd = row["DD"] + edf = row["EDF"] + + # DD에 따른 조건부 색상 + if dd >= 3.5: + dd_fmt = good_fmt + elif dd >= 2.0: + dd_fmt = warn_fmt + else: + dd_fmt = bad_fmt + + ws3.write(r, 1, row["ticker"], cell_center) + ws3.write(r, 2, row["name"][:15] if row["name"] else "", cell_fmt) + ws3.write(r, 3, rating, rfmt) + ws3.write(r, 4, dd, dec2_fmt) + ws3.write(r, 5, edf * 100 if pd.notna(edf) else None, dec4_fmt) + ws3.write(r, 6, row["sigma_E"], dec4_fmt) + ws3.write(r, 7, row["total_assets"] / 1e8 if pd.notna(row["total_assets"]) else None, num_fmt) + ws3.write(r, 8, row["total_liabilities"] / 1e8 if pd.notna(row["total_liabilities"]) else None, num_fmt) + ws3.write(r, 9, 
row["total_equity"] / 1e8 if pd.notna(row["total_equity"]) else None, num_fmt) + ws3.write(r, 10, row["leverage_ratio"], dec2_fmt) + ws3.write(r, 11, row["roa"] * 100 if pd.notna(row["roa"]) else None, dec2_fmt) + ws3.write(r, 12, row["interest_coverage"], dec2_fmt) + ws3.write(r, 13, row["method"], cell_center) + r += 1 + + # 조건부 서식 (DD 컬럼 전체) + data_rows = len(companies) + ws3.conditional_format(3, 4, 3 + data_rows, 4, { + "type": "3_color_scale", + "min_color": "#FFCDD2", + "mid_color": "#FFF9C4", + "max_color": "#C8E6C9", + }) + + # 자동 필터 + ws3.autofilter(2, 1, 2 + data_rows, 13) + + # ========================================================== + # Sheet 4: Data Quality (데이터 품질) + # ========================================================== + ws4 = wb.add_worksheet("Data Quality") + ws4.hide_gridlines(2) + ws4.set_column("A:A", 3) + ws4.set_column("B:B", 22) + ws4.set_column("C:C", 15) + ws4.set_column("D:D", 45) + ws4.set_tab_color("#78909C") + + r = 1 + ws4.merge_range(r, 1, r, 3, "데이터 품질 검증 결과", title_fmt) + r += 2 + + ws4.merge_range(r, 1, r, 3, "파이프라인 통계", subtitle_fmt); r += 1 + pipe_stats = [ + ("DART 상장기업", f"{stats['total_listed']:,}개", "OpenDartReader corp_codes 기준"), + ("KRX 주가 수집", f"{stats['market_data']:,}개", "120거래일 이상 데이터 보유 종목"), + ("DART 재무제표", f"{stats['financial_data']:,}개", "2024년 연결/개별재무제표"), + ("비정상 종목 제외", f"{stats['excluded']:,}개", "SPAC(62) + 리츠(26) + 금융업 등"), + ("최종 분석 대상", f"{stats['merton_results']:,}개", "Merton DD/EDF 산출 완료"), + ] + for label, value, desc in pipe_stats: + ws4.write(r, 1, label, cell_fmt) + ws4.write(r, 2, value, cell_center) + ws4.write(r, 3, desc, cell_fmt) + r += 1 + + r += 1 + ws4.merge_range(r, 1, r, 3, "Merton 산출 품질", subtitle_fmt); r += 1 + + n_converged = (companies["method"] == "fsolve").sum() + (companies["method"] == "iterative").sum() + n_fallback = (companies["method"] == "naive_fallback").sum() + + quality_stats = [ + ("fsolve 수렴", f"{n_converged:,}개", f"전체의 {n_converged/len(companies)*100:.1f}%"), + 
("naive_fallback", f"{n_fallback:,}개", "Merton 미수렴 → Bharath-Shumway 간편 DD"), + ("DD 평균 / 중앙값", f"{companies['DD'].mean():.2f} / {companies['DD'].median():.2f}", "캡핑 후 [-5, 15] 범위"), + ("EDF 평균 / 중앙값", f"{companies['EDF'].mean():.4%} / {companies['EDF'].median():.4%}", "이론적 부도확률"), + ("변동성 평균", f"{companies['sigma_E'].mean():.2%}", "연환산 주가 변동성"), + ] + for label, value, desc in quality_stats: + ws4.write(r, 1, label, cell_fmt) + ws4.write(r, 2, value, cell_center) + ws4.write(r, 3, desc, cell_fmt) + r += 1 + + # 등급 분포 pie chart + r += 1 + ws4.merge_range(r, 1, r, 3, "등급 분포", subtitle_fmt); r += 1 + + ig_count = len(companies[companies["shadow_rating"].isin( + ["AAA","AA+","AA","AA-","A+","A","A-","BBB+","BBB","BBB-"] + )]) + sg_count = len(companies) - ig_count + + ws4.write(r, 1, "투자등급 (AAA~BBB-)", cell_fmt) + ws4.write(r, 2, f"{ig_count:,}개 ({ig_count/len(companies)*100:.1f}%)", good_fmt) + r += 1 + ws4.write(r, 1, "투기등급 (BB+~CCC-)", cell_fmt) + ws4.write(r, 2, f"{sg_count:,}개 ({sg_count/len(companies)*100:.1f}%)", bad_fmt) + + # ---- 완료 ---- + wb.close() + print(f"[Report] Excel saved: {output_path}") + print(f" - 4 sheets: Overview, Default Rates, Company Detail ({len(companies):,} rows), Data Quality") + + +def main(): + parser = argparse.ArgumentParser(description="EDF Excel 리포트 생성") + parser.add_argument("--output", default=None, help="출력 파일 경로") + args = parser.parse_args() + + output_dir = Path(__file__).parent.parent.parent / "outputs" + output_dir.mkdir(exist_ok=True) + + if args.output: + output_path = args.output + else: + output_path = str(output_dir / f"EDF_Report_{datetime.now().strftime('%Y%m%d')}.xlsx") + + create_report(output_path) + + +if __name__ == "__main__": + main() diff --git a/test_setup.py b/test_setup.py index 7eae26a..2af75bf 100644 --- a/test_setup.py +++ b/test_setup.py @@ -1,42 +1,55 @@ -"""시가총액 backfill: per-ticker get_market_cap_by_date 재시도""" -import sqlite3, time -from pykrx import stock +"""등급역전 분석 + AAA EDF 진단""" +import 
sqlite3 +import numpy as np +from scipy.stats import norm conn = sqlite3.connect("data/edf.db") -# 샘플 5개로 정확한 API 동작 확인 -tickers = ["005930", "000660", "005380", "035720", "068270"] +# 1) 등급별 EDF 상세 확인 — 역전 여부 +print("=== 등급별 EDF 상세 (역전 확인) ===") +rows = conn.execute(""" + SELECT dd_rating, COUNT(*), AVG(DD), MIN(DD), MAX(DD), AVG(EDF), MIN(EDF), MAX(EDF) + FROM merton_results + GROUP BY dd_rating + ORDER BY AVG(DD) DESC +""").fetchall() -for tk in tickers: - print(f"\n=== {tk} ===") - - # Method 1: get_market_cap_by_date (fromdate, todate, ticker) - try: - cap = stock.get_market_cap_by_date("20250301", "20250307", tk) - time.sleep(0.3) - print(f" cap_by_date: {len(cap)} rows") - if len(cap) > 0: - print(f" columns: {list(cap.columns)}") - print(cap.tail(2)) - except Exception as e: - print(f" cap_by_date ERROR: {e}") - - # Method 2: get_market_fundamental_by_date - try: - fund = stock.get_market_fundamental_by_date("20250301", "20250307", tk) - time.sleep(0.3) - print(f" fundamental: {len(fund)} rows, columns: {list(fund.columns)}") - except Exception as e: - print(f" fundamental ERROR: {e}") +rating_order = ["AAA","AA+","AA","AA-","A+","A","A-","BBB+","BBB","BBB-", + "BB+","BB","BB-","B+","B","B-","CCC+","CCC","CCC-"] - # Method 3: get_exhaustive_info - try: - cap = stock.get_market_cap_by_ticker("20250307") - time.sleep(0.3) - if tk in cap.index: - print(f" cap_by_ticker: 시총={cap.loc[tk, '시가총액']:,.0f}, 주식수={cap.loc[tk, '상장주식수']:,}") - else: - print(f" cap_by_ticker: {tk} not found, total rows={len(cap)}") - break # 한번만 호출 (전체 시장 데이터) - except Exception as e: - print(f" cap_by_ticker ERROR: {e}") +prev_edf = -1 +print(f"{'등급':>5} | {'N':>5} | {'DD평균':>8} | {'DD최소':>8} | {'EDF평균':>12} | {'역전':>4}") +print("-" * 65) +for rating in rating_order: + match = [r for r in rows if r[0] == rating] + if match: + r = match[0] + edf = r[5] + inversion = " !!!" 
if edf < prev_edf and prev_edf >= 0 else "" + print(f" {r[0]:5s} | {r[1]:5d} | {r[2]:8.2f} | {r[3]:8.2f} | {edf:12.8f} | {inversion}") + prev_edf = edf + +# 2) AAA 개별 확인 +print("\n=== AAA 종목 상세 ===") +aaa = conn.execute(""" + SELECT mr.ticker, c.name, mr.DD, mr.EDF, mr.E, mr.D, mr.sigma_V + FROM merton_results mr JOIN companies c ON mr.ticker = c.ticker + WHERE mr.dd_rating = 'AAA' + ORDER BY mr.DD DESC + LIMIT 15 +""").fetchall() +for r in aaa: + # 수동 EDF 계산 + manual_edf = norm.cdf(-r[2]) + print(f" {r[0]} {r[1][:15]:15s} | DD={r[2]:8.2f} | EDF={r[3]:.2e} | manual_N(-DD)={manual_edf:.2e} | E={r[4]:.2e} D={r[5]:.2e}") + +# 3) AA- EDF 역전 확인 (AA-가 A+보다 높은 문제) +print("\n=== AA- vs A+ 비교 ===") +for grade in ["AA-", "A+", "A", "A-"]: + data = conn.execute(f""" + SELECT AVG(DD), AVG(EDF), MIN(EDF), MAX(EDF), COUNT(*) + FROM merton_results WHERE dd_rating = '{grade}' + """).fetchone() + print(f" {grade:5s}: DD평균={data[0]:.2f}, EDF평균={data[1]:.6f}, EDF범위=[{data[2]:.6f}, {data[3]:.6f}], N={data[4]}") + +conn.close()