feat(report): Excel report generator + data revalidation (SPAC/REIT filter, DD cap, monotonicity)

This commit is contained in:
EDF Agent
2026-03-12 00:12:38 +09:00
parent 0c0b2129eb
commit 864decdc8e
4 changed files with 943 additions and 37 deletions

442
src/models/revalidate.py Normal file
View File

@@ -0,0 +1,442 @@
"""
데이터 재검증 + Merton/Shadow Rating 재산출
1. 비정상 종목 필터링 (SPAC, 리츠, 펀드, ETF 등)
2. 금융업 필터 보강
3. DD 이상치 캡핑
4. EDF 단조성 보정 (isotonic regression)
5. Shadow Rating 재산출
6. 등급별 부도율 재산출
Usage:
python -m src.models.revalidate # 전체 재검증
python -m src.models.revalidate --dry # 필터링 결과만 확인 (DB 미갱신)
"""
import sys
import argparse
import yaml
import numpy as np
import pandas as pd
from datetime import datetime
from pathlib import Path
from scipy.stats import norm
sys.path.insert(0, str(Path(__file__).parent.parent.parent))
from src.data.database import get_connection, init_db
from src.models.merton import (
solve_merton, calculate_dd, calculate_edf, naive_dd,
dd_to_rating, DD_RATING_MAP, GLOBAL_DEFAULT_RATES
)
# ============================================================
# 1. 비정상 종목 필터링
# ============================================================
# SPAC, 리츠, 펀드, ETF 등 비정상 종목 키워드
EXCLUDE_KEYWORDS = [
"스팩", "SPAC", "리츠", "REIT", "인프라",
"호스팩", "호스펀드", "호펀드",
"선박", "ETF", "ETN",
]
# 단어 끝에 "N호" 패턴 (스팩 종목 탐지)
import re
SPAC_PATTERN = re.compile(r"(\d+호|제\d+호)")
# 금융업 키워드 (강화)
FINANCIAL_KEYWORDS = [
"은행", "금융", "보험", "증권", "캐피탈", "저축",
"생명", "화재", "손해", "카드", "리스", "자산운용",
"파이낸셜", "파이낸스", "벤처캐피탈",
"투자증권", "종합금융", "상호저축", "새마을금고",
]
# 금융지주 (정확 매칭만)
FINANCIAL_HOLDING_NAMES = [
"KB금융", "신한지주", "하나금융지주", "우리금융지주",
"BNK금융지주", "DGB금융지주", "JB금융지주",
"한국금융지주", "메리츠금융지주",
]
def classify_ticker(name: str, leverage: float) -> str:
"""종목 분류 → 'normal', 'spac', 'reit', 'financial', 'etf_fund'"""
if not name:
return "normal"
# SPAC 탐지
if "스팩" in name or "SPAC" in name or "호스팩" in name:
return "spac"
if SPAC_PATTERN.search(name) and any(kw in name for kw in ["스팩", "기업인수", "합병"]):
return "spac"
# 리츠
if "리츠" in name or "REIT" in name or "인프라" in name:
return "reit"
# ETF/ETN
if "ETF" in name or "ETN" in name:
return "etf_fund"
# 금융업 (이름 기반)
if any(kw in name for kw in FINANCIAL_KEYWORDS):
return "financial"
if name in FINANCIAL_HOLDING_NAMES:
return "financial"
# 레버리지 >0.90 = 금융업 가능성 높음
if pd.notna(leverage) and leverage > 0.90:
return "financial"
return "normal"
def filter_and_classify(conn) -> pd.DataFrame:
"""전 종목 분류 + 필터링"""
query = """
SELECT
mr.ticker, c.name, mr.DD, mr.EDF, mr.E, mr.D,
mr.sigma_E, mr.sigma_V, mr.leverage, mr.method,
f.leverage_ratio, f.total_assets, f.total_equity,
f.roa, f.interest_coverage, f.log_assets,
f.current_liabilities, f.non_current_liabilities,
f.total_liabilities, f.default_point,
f.operating_income, f.net_income,
mr.base_date, mr.fin_year
FROM merton_results mr
JOIN financial_data f ON mr.ticker = f.ticker
JOIN companies c ON mr.ticker = c.ticker
"""
df = pd.read_sql_query(query, conn)
# 분류
df["category"] = df.apply(
lambda r: classify_ticker(r["name"], r["leverage_ratio"]), axis=1
)
return df
# ============================================================
# 2. DD 이상치 캡핑 + EDF floor
# ============================================================
def apply_dd_caps(df: pd.DataFrame) -> pd.DataFrame:
"""DD 이상치 캡핑: 100 이상은 비정상"""
df = df.copy()
# DD 캡핑: [-5, 15] 범위로 제한
# DD가 15 이상 = 부도확률이 사실상 0 (수치적으로 무의미한 차이)
# DD가 -5 이하 = 이미 부도 상태
original_dd = df["DD"].copy()
df["DD"] = df["DD"].clip(-5, 15)
capped_high = (original_dd > 15).sum()
capped_low = (original_dd < -5).sum()
# EDF 재계산 (캡핑된 DD 기준)
df["EDF"] = df["DD"].apply(lambda dd: norm.cdf(-dd))
print(f" DD 캡핑: 상한(>15)={capped_high}건, 하한(<-5)={capped_low}")
return df
# ============================================================
# 3. Composite Score + Shadow Rating (개선)
# ============================================================
def compute_improved_shadow(df: pd.DataFrame) -> pd.DataFrame:
"""개선된 Shadow Rating: DD 가중치 높이고 monotonicity 보장"""
df = df.copy()
def zscore(s):
mean, std = s.mean(), s.std()
if std == 0 or pd.isna(std):
return pd.Series(0, index=s.index)
return (s - mean) / std
z_dd = zscore(df["DD"])
z_lev = -zscore(df["leverage_ratio"].fillna(0.5))
roa = df["roa"].fillna(0).clip(-1, 1)
z_roa = zscore(roa)
icr = df["interest_coverage"].fillna(0).clip(-10, 100)
z_icr = zscore(icr)
z_size = zscore(df["log_assets"].fillna(df["log_assets"].median()))
# DD에 70% 가중치 (EDF 역전 최소화)
df["composite_score"] = (
0.70 * z_dd + # DD 핵심
0.10 * z_lev + # 레버리지
0.10 * z_roa + # 수익성
0.05 * z_icr + # 이자보상
0.05 * z_size # 규모
)
# Score 내림차순 정렬 → 등급 할당
df = df.sort_values("composite_score", ascending=False).reset_index(drop=True)
n = len(df)
# 등급 분포 (현실적 S&P 분포 기반 — 한국 시장 조정)
rating_dist = {
"AAA": 0.005, "AA+": 0.01, "AA": 0.02, "AA-": 0.03,
"A+": 0.05, "A": 0.07, "A-": 0.08,
"BBB+": 0.08, "BBB": 0.10, "BBB-": 0.09,
"BB+": 0.08, "BB": 0.09, "BB-": 0.07,
"B+": 0.06, "B": 0.05, "B-": 0.04,
"CCC+": 0.02, "CCC": 0.02, "CCC-": 0.03,
}
grades = list(rating_dist.keys())
idx = 0
df["shadow_rating"] = ""
for i, grade in enumerate(grades):
if i == len(grades) - 1:
count = n - idx
else:
count = max(1, round(n * rating_dist[grade]))
df.loc[idx:idx+count-1, "shadow_rating"] = grade
idx += count
df.loc[df["shadow_rating"] == "", "shadow_rating"] = grades[-1]
return df
# ============================================================
# 4. EDF 단조성 보정
# ============================================================
def enforce_monotonicity(dr_df: pd.DataFrame) -> pd.DataFrame:
"""등급별 부도율 단조 증가 보장 (isotonic 보정)"""
dr_df = dr_df.copy()
# 등급 순서 (좋은 → 나쁜)
rating_order = list(GLOBAL_DEFAULT_RATES.keys())
dr_df["rating_idx"] = dr_df["rating_grade"].apply(
lambda x: rating_order.index(x) if x in rating_order else -1
)
dr_df = dr_df.sort_values("rating_idx")
# pool adjacent violator (isotonic regression)
values = dr_df["korean_dr"].values.copy()
n = len(values)
# 단조 증가 강제
for i in range(1, n):
if values[i] < values[i-1]:
# 역전 → 두 값의 평균으로 대체
avg = (values[i-1] + values[i]) / 2
values[i-1] = avg
values[i] = avg
# 이전 값과도 체크
j = i - 1
while j > 0 and values[j] < values[j-1]:
avg = (values[j-1] + values[j]) / 2
values[j-1] = avg
values[j] = avg
j -= 1
dr_df["korean_dr_monotone"] = values
dr_df.drop(columns=["rating_idx"], inplace=True)
return dr_df
# ============================================================
# 5. 등급별 부도율 산출
# ============================================================
def compute_default_rates(df: pd.DataFrame, config: dict) -> pd.DataFrame:
"""등급별 부도율 + 글로벌 블렌딩 + 베이지안 + 단조보정"""
threshold = config.get("blending", {}).get("threshold", 50)
prior_strength = config.get("blending", {}).get("bayesian_prior_strength", 50)
rating_order = list(GLOBAL_DEFAULT_RATES.keys())
results = []
for rating in rating_order:
subset = df[df["shadow_rating"] == rating]
n_firms = len(subset)
if n_firms == 0:
continue
korean_dr = subset["EDF"].mean()
global_dr = GLOBAL_DEFAULT_RATES.get(rating, 0.01)
weight_kr = min(n_firms / threshold, 1.0)
blended_dr = weight_kr * korean_dr + (1 - weight_kr) * global_dr
alpha_prior = global_dr * prior_strength
beta_prior = (1 - global_dr) * prior_strength
alpha_post = alpha_prior + n_firms * korean_dr
beta_post = beta_prior + n_firms * (1 - korean_dr)
bayesian_dr = alpha_post / (alpha_post + beta_post)
results.append({
"rating_grade": rating,
"n_firms": n_firms,
"korean_dr": korean_dr,
"global_dr": global_dr,
"weight_kr": weight_kr,
"blended_dr": blended_dr,
"bayesian_dr": bayesian_dr,
})
dr_df = pd.DataFrame(results)
# 단조성 보정
dr_df = enforce_monotonicity(dr_df)
# 최종 부도율 = 단조보정된 한국DR과 글로벌의 블렌딩
dr_df["final_dr"] = dr_df.apply(
lambda r: min(r["weight_kr"], 1.0) * r["korean_dr_monotone"] +
(1 - min(r["weight_kr"], 1.0)) * r["global_dr"],
axis=1
)
# EDF floor: AAA도 최소 0.0001% (1bp)
dr_df["final_dr"] = dr_df["final_dr"].clip(lower=0.00001)
return dr_df
# ============================================================
# Main
# ============================================================
def load_config() -> dict:
config_path = Path(__file__).parent.parent.parent / "config" / "settings.yaml"
with open(config_path, "r", encoding="utf-8") as f:
return yaml.safe_load(f)
def main():
parser = argparse.ArgumentParser(description="데이터 재검증 + 재산출")
parser.add_argument("--dry", action="store_true", help="DB 미갱신 (확인만)")
args = parser.parse_args()
config = load_config()
conn = init_db()
# 1) 분류 + 필터링
print("="*60)
print("[1/5] 종목 분류 + 필터링")
print("="*60)
df = filter_and_classify(conn)
print(f" 전체: {len(df)}")
cat_counts = df["category"].value_counts()
for cat, cnt in cat_counts.items():
example = df[df["category"] == cat]["name"].iloc[0] if cnt > 0 else ""
print(f" {cat:12s}: {cnt:5d}개 (예: {example})")
# 비정상 종목 제거
df_clean = df[df["category"] == "normal"].copy()
removed = len(df) - len(df_clean)
print(f"\n -> 정상 종목: {len(df_clean)}개 (제거: {removed}개)")
# 2) DD 캡핑
print("\n" + "="*60)
print("[2/5] DD 캡핑 + EDF 재계산")
print("="*60)
df_clean = apply_dd_caps(df_clean)
print(f" DD 통계: 평균={df_clean['DD'].mean():.2f}, 중앙={df_clean['DD'].median():.2f}")
print(f" EDF 통계: 평균={df_clean['EDF'].mean():.6f}, 중앙={df_clean['EDF'].median():.6f}")
# 3) Shadow Rating 재산출
print("\n" + "="*60)
print("[3/5] Shadow Rating 재산출 (DD 70%)")
print("="*60)
df_clean = compute_improved_shadow(df_clean)
rating_order = list(GLOBAL_DEFAULT_RATES.keys())
df_clean["shadow_rating"] = pd.Categorical(
df_clean["shadow_rating"], categories=rating_order, ordered=True
)
dist = df_clean["shadow_rating"].value_counts().sort_index()
prev_edf = -1
for rating, count in dist.items():
if count > 0:
avg_dd = df_clean[df_clean["shadow_rating"] == rating]["DD"].mean()
avg_edf = df_clean[df_clean["shadow_rating"] == rating]["EDF"].mean()
inv = " <<<INVERSION" if avg_edf < prev_edf and prev_edf >= 0 else ""
print(f" {rating:5s}: {count:4d} DD={avg_dd:6.2f} EDF={avg_edf:.6f}{inv}")
prev_edf = avg_edf
# 4) 등급별 부도율
print("\n" + "="*60)
print("[4/5] 등급별 부도율 (단조보정 + 블렌딩)")
print("="*60)
dr_df = compute_default_rates(df_clean, config)
print(f"\n{'grade':>5} | {'N':>4} | {'EDF_KR':>10} | {'monotone':>10} | {'global':>10} | {'final':>10}")
print("-" * 65)
for _, row in dr_df.iterrows():
print(f" {row['rating_grade']:5s} | {row['n_firms']:4d} | "
f"{row['korean_dr']:10.6f} | {row['korean_dr_monotone']:10.6f} | "
f"{row['global_dr']:10.6f} | {row['final_dr']:10.6f}")
# 5) DB 저장
if not args.dry:
print("\n" + "="*60)
print("[5/5] DB 저장")
print("="*60)
# merton_results 초기화 & 재저장
conn.execute("DELETE FROM merton_results")
conn.execute("DELETE FROM default_rates")
base_date_str = df_clean["base_date"].iloc[0] if "base_date" in df_clean.columns else datetime.now().strftime("%Y-%m-%d")
for _, row in df_clean.iterrows():
conn.execute("""
INSERT OR REPLACE INTO merton_results
(ticker, base_date, fin_year, E, sigma_E, D, V, sigma_V, DD, EDF, leverage, method, dd_rating)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (
row["ticker"], row.get("base_date", base_date_str), int(row.get("fin_year", 2024)),
row["E"], row["sigma_E"], row["D"],
row.get("E", 0) + row.get("D", 0), # V approximation
row["sigma_V"], row["DD"], row["EDF"],
row["leverage"], row["method"], row["shadow_rating"]
))
for _, row in dr_df.iterrows():
conn.execute("""
INSERT OR REPLACE INTO default_rates
(base_date, rating_grade, n_firms, n_defaults, korean_dr, global_dr, weight_kr, blended_dr, bayesian_dr)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (
datetime.now().strftime("%Y-%m-%d"),
row["rating_grade"], int(row["n_firms"]), 0,
row["korean_dr_monotone"], row["global_dr"], row["weight_kr"],
row["final_dr"], row["bayesian_dr"]
))
# companies 금융업 표시
conn.execute("UPDATE companies SET is_financial = 0")
for _, row in df[df["category"] != "normal"].iterrows():
conn.execute("UPDATE companies SET is_financial = 1 WHERE ticker = ?", (row["ticker"],))
conn.commit()
print(f" merton_results: {len(df_clean)}건 저장")
print(f" default_rates: {len(dr_df)}건 저장")
else:
print("\n [DRY RUN] DB 미갱신")
conn.close()
return df_clean, dr_df
if __name__ == "__main__":
main()

1
src/reports/__init__.py Normal file
View File

@@ -0,0 +1 @@
# EDF Reports package

View File

@@ -0,0 +1,450 @@
"""
EDF 프로젝트 Excel 리포트 생성기
4개 시트로 구성된 Excel 리포트를 생성합니다:
1. 방법론 요약 (Overview) — 분석 흐름과 모형 설명
2. 등급별 부도율 (Default Rates) — 최종 결과 테이블 + 차트
3. 종목별 상세 (Company Detail) — Shadow Rating, DD, EDF, 변동성
4. 데이터 품질 (Data Quality) — 필터링/검증 결과
Usage:
python -m src.reports.generate_excel
python -m src.reports.generate_excel --output "outputs/custom_name.xlsx"
"""
import sys
import argparse
import sqlite3
import numpy as np
import pandas as pd
from datetime import datetime
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent.parent))
try:
import xlsxwriter
except ImportError:
raise ImportError("xlsxwriter 미설치: pip install xlsxwriter")
def load_data():
"""DB에서 리포트 데이터 로드"""
conn = sqlite3.connect(str(Path(__file__).parent.parent.parent / "data" / "edf.db"))
# 종목별 Merton 결과
companies = pd.read_sql_query("""
SELECT
mr.ticker, c.name,
mr.DD, mr.EDF, mr.sigma_E, mr.sigma_V,
mr.E, mr.D, mr.leverage, mr.method,
mr.dd_rating as shadow_rating,
f.total_assets, f.total_liabilities, f.total_equity,
f.leverage_ratio, f.roa, f.interest_coverage,
f.operating_income, f.net_income
FROM merton_results mr
JOIN companies c ON mr.ticker = c.ticker
JOIN financial_data f ON mr.ticker = f.ticker
ORDER BY mr.DD DESC
""", conn)
# 등급별 부도율
default_rates = pd.read_sql_query("""
SELECT * FROM default_rates ORDER BY rowid
""", conn)
# DB 통계
stats = {}
for table in ["companies", "market_data", "financial_data", "volatility", "merton_results"]:
stats[table] = conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()[0]
# 제거 종목 수
stats["total_listed"] = conn.execute("SELECT COUNT(*) FROM companies").fetchone()[0]
stats["excluded"] = stats["total_listed"] - stats["merton_results"]
conn.close()
return companies, default_rates, stats
# ============================================================
# 등급 색상 매핑
# ============================================================
RATING_COLORS = {
"AAA": "#1B5E20", "AA+": "#2E7D32", "AA": "#388E3C", "AA-": "#43A047",
"A+": "#4CAF50", "A": "#66BB6A", "A-": "#81C784",
"BBB+": "#FFA000", "BBB": "#FF8F00", "BBB-": "#FF6F00",
"BB+": "#E65100", "BB": "#BF360C", "BB-": "#D84315",
"B+": "#C62828", "B": "#B71C1C", "B-": "#880E4F",
"CCC+": "#4A148C", "CCC": "#311B92", "CCC-": "#1A237E",
}
RATING_ORDER = ["AAA","AA+","AA","AA-","A+","A","A-",
"BBB+","BBB","BBB-","BB+","BB","BB-",
"B+","B","B-","CCC+","CCC","CCC-"]
def create_report(output_path: str):
"""Excel 리포트 생성"""
companies, default_rates, stats = load_data()
wb = xlsxwriter.Workbook(output_path, {"nan_inf_to_errors": True})
# ---- 공통 서식 정의 ----
title_fmt = wb.add_format({
"bold": True, "font_size": 18, "font_color": "#1A237E",
"bottom": 2, "bottom_color": "#1A237E",
})
subtitle_fmt = wb.add_format({
"bold": True, "font_size": 13, "font_color": "#37474F",
"top": 1, "top_color": "#CFD8DC", "bottom": 1, "bottom_color": "#CFD8DC",
"bg_color": "#ECEFF1",
})
header_fmt = wb.add_format({
"bold": True, "font_size": 10, "font_color": "#FFFFFF",
"bg_color": "#263238", "border": 1, "border_color": "#455A64",
"text_wrap": True, "align": "center", "valign": "vcenter",
})
cell_fmt = wb.add_format({
"font_size": 10, "border": 1, "border_color": "#CFD8DC",
"valign": "vcenter",
})
cell_center = wb.add_format({
"font_size": 10, "border": 1, "border_color": "#CFD8DC",
"align": "center", "valign": "vcenter",
})
pct_fmt = wb.add_format({
"font_size": 10, "border": 1, "border_color": "#CFD8DC",
"num_format": "0.0000%", "align": "center",
})
pct2_fmt = wb.add_format({
"font_size": 10, "border": 1, "border_color": "#CFD8DC",
"num_format": "0.00%", "align": "center",
})
num_fmt = wb.add_format({
"font_size": 10, "border": 1, "border_color": "#CFD8DC",
"num_format": "#,##0", "align": "right",
})
dec2_fmt = wb.add_format({
"font_size": 10, "border": 1, "border_color": "#CFD8DC",
"num_format": "0.00", "align": "center",
})
dec4_fmt = wb.add_format({
"font_size": 10, "border": 1, "border_color": "#CFD8DC",
"num_format": "0.0000", "align": "center",
})
note_fmt = wb.add_format({
"font_size": 9, "font_color": "#78909C", "italic": True,
"text_wrap": True,
})
good_fmt = wb.add_format({
"font_size": 10, "border": 1, "border_color": "#CFD8DC",
"bg_color": "#E8F5E9", "font_color": "#1B5E20", "align": "center",
})
warn_fmt = wb.add_format({
"font_size": 10, "border": 1, "border_color": "#CFD8DC",
"bg_color": "#FFF3E0", "font_color": "#E65100", "align": "center",
})
bad_fmt = wb.add_format({
"font_size": 10, "border": 1, "border_color": "#CFD8DC",
"bg_color": "#FFEBEE", "font_color": "#B71C1C", "align": "center",
})
# 등급별 서식 생성
rating_fmts = {}
for rating, color in RATING_COLORS.items():
rating_fmts[rating] = wb.add_format({
"font_size": 10, "bold": True, "border": 1, "border_color": "#CFD8DC",
"font_color": "#FFFFFF", "bg_color": color, "align": "center",
})
# ==========================================================
# Sheet 1: Overview (방법론 요약)
# ==========================================================
ws1 = wb.add_worksheet("Overview")
ws1.hide_gridlines(2)
ws1.set_column("A:A", 3)
ws1.set_column("B:B", 25)
ws1.set_column("C:C", 65)
ws1.set_tab_color("#1A237E")
r = 1
ws1.merge_range(r, 1, r, 2, "EDF (Expected Default Frequency) 분석 보고서", title_fmt)
r += 1
ws1.write(r, 1, f"생성일: {datetime.now().strftime('%Y-%m-%d %H:%M')}", note_fmt)
ws1.write(r, 2, f"분석 대상: 한국 상장기업 {stats['merton_results']:,}", note_fmt)
r += 2
ws1.merge_range(r, 1, r, 2, "1. 분석 개요", subtitle_fmt); r += 1
overview_text = [
("목적", "주식 변동성을 활용하여 한국 상장기업의 신용등급별 기대부도빈도(EDF)를 산출합니다."),
("대상", f"KRX 상장기업 {stats['total_listed']:,}개 중 비금융/비스팩 {stats['merton_results']:,}"),
("기준일", "2025년 3월 (최근 거래일 기준)"),
("재무연도", "2024년 연결재무제표"),
("데이터 소스", "KRX(pykrx): 주가/변동성 | DART(OpenDartReader): 재무제표"),
]
for label, desc in overview_text:
ws1.write(r, 1, label, cell_fmt)
ws1.write(r, 2, desc, cell_fmt)
r += 1
r += 1
ws1.merge_range(r, 1, r, 2, "2. 분석 단계 (Pipeline)", subtitle_fmt); r += 1
pipeline = [
("Step 1", "KRX 주가 수집", "pykrx로 종목별 1년 일별 종가 수집 (약 246 거래일)"),
("Step 2", "주가 변동성(σ_E) 산출", "일별 로그수익률의 표준편차 × √252 (연환산)"),
("Step 3", "DART 재무제표 수집", "부채총계, 유동/비유동부채, 자본총계, 영업이익 등"),
("Step 4", "부도점(D) 계산", "Default Point = 유동부채 + 0.5 × 비유동부채 (KMV 방식)"),
("Step 5", "Merton 모형 풀이", "E = V·N(d₁) D·e⁻ʳᵀ·N(d₂) + σ_E = (V/E)·N(d₁)·σ_V"),
("Step 6", "DD / EDF 산출", "DD = [ln(V/D) + (μ−½σ²)T] / (σ·√T), EDF = N(DD)"),
("Step 7", "Composite Score", "DD(70%) + 레버리지(10%) + ROA(10%) + ICR(5%) + 규모(5%)"),
("Step 8", "Shadow Rating", "Composite Score 기반 등급 할당 (글로벌 분포 참조)"),
("Step 9", "부도율 블렌딩", "한국 EDF × 가중치 + 글로벌 벤치마크 × (1가중치) + 베이지안 보정"),
]
step_hdr = wb.add_format({"bold": True, "font_size": 10, "font_color": "#1A237E"})
for step, title, desc in pipeline:
ws1.write(r, 1, f"{step}: {title}", step_hdr)
ws1.write(r, 2, desc, cell_fmt)
r += 1
r += 1
ws1.merge_range(r, 1, r, 2, "3. 데이터 품질 조치", subtitle_fmt); r += 1
quality = [
("SPAC/리츠 제외", "62 SPAC + 26 리츠 제거 (비정상 DD > 50 방지)"),
("금융업 제외", "은행/보험/증권 등 레버리지 특성이 다른 업종 제외"),
("DD 캡핑", "DD를 [-5, 15] 범위로 제한 (극단값 영향 차단)"),
("EDF floor", "AAA등급도 최소 부도율 0.001% (1bp) 적용"),
("단조성 보정", "등급간 부도율이 역전되지 않도록 isotonic 보정"),
]
for label, desc in quality:
ws1.write(r, 1, label, cell_fmt)
ws1.write(r, 2, desc, cell_fmt)
r += 1
r += 1
ws1.merge_range(r, 1, r, 2, "4. 주요 한계점", subtitle_fmt); r += 1
limits = [
"• E(자기자본)에 장부가치(total_equity) 사용 — 시가총액 미확보로 인한 대체",
"• 실제 부도 관측 없이 이론적 EDF를 부도율로 대체",
"• 단일 시점(2024년말) 분석 — 시계열/경기주기 미반영",
"• Merton 모형의 구조적 한계 (정규분포 가정, 단일 만기 가정)",
]
for text in limits:
ws1.merge_range(r, 1, r, 2, text, note_fmt)
r += 1
# ==========================================================
# Sheet 2: Default Rates (등급별 부도율)
# ==========================================================
ws2 = wb.add_worksheet("Default Rates")
ws2.hide_gridlines(2)
ws2.set_column("A:A", 3)
ws2.set_column("B:B", 8)
ws2.set_column("C:C", 8)
ws2.set_column("D:G", 14)
ws2.set_tab_color("#E65100")
r = 1
ws2.merge_range(r, 1, r, 6, "등급별 기대부도율 (EDF by Rating)", title_fmt)
r += 2
# 테이블 헤더
hdrs = ["등급", "기업수", "한국 EDF", "글로벌 DR", "블렌딩 DR", "최종 DR"]
for i, h in enumerate(hdrs):
ws2.write(r, i+1, h, header_fmt)
r += 1
for _, row in default_rates.iterrows():
rating = row["rating_grade"]
rfmt = rating_fmts.get(rating, cell_center)
ws2.write(r, 1, rating, rfmt)
ws2.write(r, 2, int(row["n_firms"]), cell_center)
ws2.write(r, 3, row["korean_dr"], pct_fmt)
ws2.write(r, 4, row["global_dr"], pct_fmt)
ws2.write(r, 5, row["blended_dr"], pct_fmt)
ws2.write(r, 6, row["bayesian_dr"], pct_fmt)
r += 1
# 차트: 등급별 부도율
chart = wb.add_chart({"type": "column"})
data_start = 4
data_end = data_start + len(default_rates) - 1
chart.add_series({
"name": "한국 EDF",
"categories": ["Default Rates", data_start, 1, data_end, 1],
"values": ["Default Rates", data_start, 3, data_end, 3],
"fill": {"color": "#1565C0"},
})
chart.add_series({
"name": "글로벌 DR",
"categories": ["Default Rates", data_start, 1, data_end, 1],
"values": ["Default Rates", data_start, 4, data_end, 4],
"fill": {"color": "#E0E0E0"},
"border": {"color": "#757575"},
})
chart.add_series({
"name": "최종 DR",
"categories": ["Default Rates", data_start, 1, data_end, 1],
"values": ["Default Rates", data_start, 6, data_end, 6],
"fill": {"color": "#E65100"},
})
chart.set_title({"name": "등급별 기대부도율 비교", "name_font": {"size": 12}})
chart.set_y_axis({"name": "부도율", "num_format": "0.00%"})
chart.set_x_axis({"name": "신용등급"})
chart.set_size({"width": 750, "height": 400})
chart.set_style(10)
ws2.insert_chart(f"B{r+2}", chart)
# ==========================================================
# Sheet 3: Company Detail (종목별 상세)
# ==========================================================
ws3 = wb.add_worksheet("Company Detail")
ws3.hide_gridlines(2)
ws3.set_tab_color("#2E7D32")
ws3.freeze_panes(3, 3) # 3행, 3열 고정
# 열 너비
col_widths = {"A": 2, "B": 9, "C": 16, "D": 7, "E": 7, "F": 9,
"G": 7, "H": 14, "I": 14, "J": 14, "K": 10,
"L": 10, "M": 10, "N": 8}
for col, w in col_widths.items():
ws3.set_column(f"{col}:{col}", w)
r = 1
ws3.merge_range(r, 1, r, 13, f"종목별 Shadow Rating 상세 ({len(companies):,}개 종목)", title_fmt)
r += 1
# 헤더
headers = ["종목코드", "종목명", "등급", "DD", "EDF(%)",
"σ_E", "총자산(억)", "부채(억)", "자본(억)", "레버리지",
"ROA(%)", "이자보상", "Solver"]
for i, h in enumerate(headers):
ws3.write(r, i+1, h, header_fmt)
r += 1
for _, row in companies.iterrows():
rating = str(row.get("shadow_rating", "NR"))
rfmt = rating_fmts.get(rating, cell_center)
dd = row["DD"]
edf = row["EDF"]
# DD에 따른 조건부 색상
if dd >= 3.5:
dd_fmt = good_fmt
elif dd >= 2.0:
dd_fmt = warn_fmt
else:
dd_fmt = bad_fmt
ws3.write(r, 1, row["ticker"], cell_center)
ws3.write(r, 2, row["name"][:15] if row["name"] else "", cell_fmt)
ws3.write(r, 3, rating, rfmt)
ws3.write(r, 4, dd, dec2_fmt)
ws3.write(r, 5, edf * 100 if pd.notna(edf) else None, dec4_fmt)
ws3.write(r, 6, row["sigma_E"], dec4_fmt)
ws3.write(r, 7, row["total_assets"] / 1e8 if pd.notna(row["total_assets"]) else None, num_fmt)
ws3.write(r, 8, row["total_liabilities"] / 1e8 if pd.notna(row["total_liabilities"]) else None, num_fmt)
ws3.write(r, 9, row["total_equity"] / 1e8 if pd.notna(row["total_equity"]) else None, num_fmt)
ws3.write(r, 10, row["leverage_ratio"], dec2_fmt)
ws3.write(r, 11, row["roa"] * 100 if pd.notna(row["roa"]) else None, dec2_fmt)
ws3.write(r, 12, row["interest_coverage"], dec2_fmt)
ws3.write(r, 13, row["method"], cell_center)
r += 1
# 조건부 서식 (DD 컬럼 전체)
data_rows = len(companies)
ws3.conditional_format(3, 4, 3 + data_rows, 4, {
"type": "3_color_scale",
"min_color": "#FFCDD2",
"mid_color": "#FFF9C4",
"max_color": "#C8E6C9",
})
# 자동 필터
ws3.autofilter(2, 1, 2 + data_rows, 13)
# ==========================================================
# Sheet 4: Data Quality (데이터 품질)
# ==========================================================
ws4 = wb.add_worksheet("Data Quality")
ws4.hide_gridlines(2)
ws4.set_column("A:A", 3)
ws4.set_column("B:B", 22)
ws4.set_column("C:C", 15)
ws4.set_column("D:D", 45)
ws4.set_tab_color("#78909C")
r = 1
ws4.merge_range(r, 1, r, 3, "데이터 품질 검증 결과", title_fmt)
r += 2
ws4.merge_range(r, 1, r, 3, "파이프라인 통계", subtitle_fmt); r += 1
pipe_stats = [
("DART 상장기업", f"{stats['total_listed']:,}", "OpenDartReader corp_codes 기준"),
("KRX 주가 수집", f"{stats['market_data']:,}", "120거래일 이상 데이터 보유 종목"),
("DART 재무제표", f"{stats['financial_data']:,}", "2024년 연결/개별재무제표"),
("비정상 종목 제외", f"{stats['excluded']:,}", "SPAC(62) + 리츠(26) + 금융업 등"),
("최종 분석 대상", f"{stats['merton_results']:,}", "Merton DD/EDF 산출 완료"),
]
for label, value, desc in pipe_stats:
ws4.write(r, 1, label, cell_fmt)
ws4.write(r, 2, value, cell_center)
ws4.write(r, 3, desc, cell_fmt)
r += 1
r += 1
ws4.merge_range(r, 1, r, 3, "Merton 산출 품질", subtitle_fmt); r += 1
n_converged = (companies["method"] == "fsolve").sum() + (companies["method"] == "iterative").sum()
n_fallback = (companies["method"] == "naive_fallback").sum()
quality_stats = [
("fsolve 수렴", f"{n_converged:,}", f"전체의 {n_converged/len(companies)*100:.1f}%"),
("naive_fallback", f"{n_fallback:,}", "Merton 미수렴 → Bharath-Shumway 간편 DD"),
("DD 평균 / 중앙값", f"{companies['DD'].mean():.2f} / {companies['DD'].median():.2f}", "캡핑 후 [-5, 15] 범위"),
("EDF 평균 / 중앙값", f"{companies['EDF'].mean():.4%} / {companies['EDF'].median():.4%}", "이론적 부도확률"),
("변동성 평균", f"{companies['sigma_E'].mean():.2%}", "연환산 주가 변동성"),
]
for label, value, desc in quality_stats:
ws4.write(r, 1, label, cell_fmt)
ws4.write(r, 2, value, cell_center)
ws4.write(r, 3, desc, cell_fmt)
r += 1
# 등급 분포 pie chart
r += 1
ws4.merge_range(r, 1, r, 3, "등급 분포", subtitle_fmt); r += 1
ig_count = len(companies[companies["shadow_rating"].isin(
["AAA","AA+","AA","AA-","A+","A","A-","BBB+","BBB","BBB-"]
)])
sg_count = len(companies) - ig_count
ws4.write(r, 1, "투자등급 (AAA~BBB-)", cell_fmt)
ws4.write(r, 2, f"{ig_count:,}개 ({ig_count/len(companies)*100:.1f}%)", good_fmt)
r += 1
ws4.write(r, 1, "투기등급 (BB+~CCC-)", cell_fmt)
ws4.write(r, 2, f"{sg_count:,}개 ({sg_count/len(companies)*100:.1f}%)", bad_fmt)
# ---- 완료 ----
wb.close()
print(f"[Report] Excel saved: {output_path}")
print(f" - 4 sheets: Overview, Default Rates, Company Detail ({len(companies):,} rows), Data Quality")
def main():
parser = argparse.ArgumentParser(description="EDF Excel 리포트 생성")
parser.add_argument("--output", default=None, help="출력 파일 경로")
args = parser.parse_args()
output_dir = Path(__file__).parent.parent.parent / "outputs"
output_dir.mkdir(exist_ok=True)
if args.output:
output_path = args.output
else:
output_path = str(output_dir / f"EDF_Report_{datetime.now().strftime('%Y%m%d')}.xlsx")
create_report(output_path)
if __name__ == "__main__":
main()

View File

@@ -1,42 +1,55 @@
"""시가총액 backfill: per-ticker get_market_cap_by_date 재시도"""
import sqlite3, time
from pykrx import stock
"""등급역전 분석 + AAA EDF 진단"""
import sqlite3
import numpy as np
from scipy.stats import norm
conn = sqlite3.connect("data/edf.db")
# 샘플 5개로 정확한 API 동작 확인
tickers = ["005930", "000660", "005380", "035720", "068270"]
# 1) 등급별 EDF 상세 확인 — 역전 여부
print("=== 등급별 EDF 상세 (역전 확인) ===")
rows = conn.execute("""
SELECT dd_rating, COUNT(*), AVG(DD), MIN(DD), MAX(DD), AVG(EDF), MIN(EDF), MAX(EDF)
FROM merton_results
GROUP BY dd_rating
ORDER BY AVG(DD) DESC
""").fetchall()
for tk in tickers:
print(f"\n=== {tk} ===")
# Method 1: get_market_cap_by_date (fromdate, todate, ticker)
try:
cap = stock.get_market_cap_by_date("20250301", "20250307", tk)
time.sleep(0.3)
print(f" cap_by_date: {len(cap)} rows")
if len(cap) > 0:
print(f" columns: {list(cap.columns)}")
print(cap.tail(2))
except Exception as e:
print(f" cap_by_date ERROR: {e}")
# Method 2: get_market_fundamental_by_date
try:
fund = stock.get_market_fundamental_by_date("20250301", "20250307", tk)
time.sleep(0.3)
print(f" fundamental: {len(fund)} rows, columns: {list(fund.columns)}")
except Exception as e:
print(f" fundamental ERROR: {e}")
rating_order = ["AAA","AA+","AA","AA-","A+","A","A-","BBB+","BBB","BBB-",
"BB+","BB","BB-","B+","B","B-","CCC+","CCC","CCC-"]
# Method 3: get_exhaustive_info
try:
cap = stock.get_market_cap_by_ticker("20250307")
time.sleep(0.3)
if tk in cap.index:
print(f" cap_by_ticker: 시총={cap.loc[tk, '시가총액']:,.0f}, 주식수={cap.loc[tk, '상장주식수']:,}")
else:
print(f" cap_by_ticker: {tk} not found, total rows={len(cap)}")
break # 한번만 호출 (전체 시장 데이터)
except Exception as e:
print(f" cap_by_ticker ERROR: {e}")
prev_edf = -1
print(f"{'등급':>5} | {'N':>5} | {'DD평균':>8} | {'DD최소':>8} | {'EDF평균':>12} | {'역전':>4}")
print("-" * 65)
for rating in rating_order:
match = [r for r in rows if r[0] == rating]
if match:
r = match[0]
edf = r[5]
inversion = " !!!" if edf < prev_edf and prev_edf >= 0 else ""
print(f" {r[0]:5s} | {r[1]:5d} | {r[2]:8.2f} | {r[3]:8.2f} | {edf:12.8f} | {inversion}")
prev_edf = edf
# 2) AAA 개별 확인
print("\n=== AAA 종목 상세 ===")
aaa = conn.execute("""
SELECT mr.ticker, c.name, mr.DD, mr.EDF, mr.E, mr.D, mr.sigma_V
FROM merton_results mr JOIN companies c ON mr.ticker = c.ticker
WHERE mr.dd_rating = 'AAA'
ORDER BY mr.DD DESC
LIMIT 15
""").fetchall()
for r in aaa:
# 수동 EDF 계산
manual_edf = norm.cdf(-r[2])
print(f" {r[0]} {r[1][:15]:15s} | DD={r[2]:8.2f} | EDF={r[3]:.2e} | manual_N(-DD)={manual_edf:.2e} | E={r[4]:.2e} D={r[5]:.2e}")
# 3) AA- EDF 역전 확인 (AA-가 A+보다 높은 문제)
print("\n=== AA- vs A+ 비교 ===")
for grade in ["AA-", "A+", "A", "A-"]:
data = conn.execute(f"""
SELECT AVG(DD), AVG(EDF), MIN(EDF), MAX(EDF), COUNT(*)
FROM merton_results WHERE dd_rating = '{grade}'
""").fetchone()
print(f" {grade:5s}: DD평균={data[0]:.2f}, EDF평균={data[1]:.6f}, EDF범위=[{data[2]:.6f}, {data[3]:.6f}], N={data[4]}")
conn.close()