Compare commits
4 Commits
348b5bbf27
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
1b6af851cc | ||
|
|
864decdc8e | ||
|
|
0c0b2129eb | ||
|
|
0547dfbb3a |
@@ -28,3 +28,21 @@
|
||||
- **원인**: PowerShell의 `'`, `"`, `{}` 이스케이핑이 Python f-string과 충돌
|
||||
- **해결**: 항상 별도 `.py` 파일로 작성하여 실행
|
||||
- **주의**: PowerShell에서 복잡한 Python 코드를 `-c` 옵션으로 실행하지 말 것
|
||||
|
||||
### 2026-03-12 pykrx — get_market_cap 전 함수군 완전 불가
|
||||
- **증상**: `get_market_cap`, `get_market_cap_by_date`, `get_market_cap_by_ticker` 모두 0 반환 또는 columns 오류
|
||||
- **원인**: pykrx 1.2.4에서 시가총액/상장주식수 관련 API 전면 고장
|
||||
- **해결**: DART `total_equity`(자본총계)를 E(자기자본)로 대체 사용
|
||||
- **주의**: 시가총액 필요 시 유료 DB(FnGuide 등) 또는 DART 발행주식수 × 종가로 별도 산출 필요
|
||||
|
||||
### 2026-03-12 Merton — SPAC/스팩 종목 DD 비정상 (50~124)
|
||||
- **증상**: DD가 50~124로 비정상적으로 높아 AAA 등급 오염
|
||||
- **원인**: SPAC은 자산=현금, 부채=극소 → Merton 모형에서 부도확률이 0에 수렴
|
||||
- **해결**: 종목명에 "스팩", "SPAC", "N호" 패턴 포함 시 제외 (62개)
|
||||
- **주의**: 리츠(26개)도 동일 문제. 비정상 종목 필터 항상 적용
|
||||
|
||||
### 2026-03-12 Shadow Rating — EDF 등급역전 (composite score 기반)
|
||||
- **증상**: AA-의 EDF가 A+보다 높은 역전 현상
|
||||
- **원인**: composite score에서 DD 외 재무비율(레버리지/ROA)이 등급을 왜곡
|
||||
- **해결**: DD 가중치를 50% → 70%로 상향, isotonic 단조보정 적용
|
||||
- **주의**: composite score 산출 시 DD 가중치를 70% 미만으로 낮추면 역전 재발
|
||||
|
||||
7
docs/devlog/2026-03-12.md
Normal file
7
docs/devlog/2026-03-12.md
Normal file
@@ -0,0 +1,7 @@
|
||||
# Devlog — 2026-03-12
|
||||
|
||||
| # | 시간 | 작업 | 커밋 | 상태 |
|
||||
|---|------|------|------|------|
|
||||
| 001 | 23:31 | Merton DD/EDF 2,385종목 일괄 산출 + 금융업 274종목 필터 | `0547dfb` | ✅ |
|
||||
| 002 | 23:45 | Shadow Rating + 등급별 부도율 블렌딩 (베이지안 보정) | `0c0b212` | ✅ |
|
||||
| 003 | 00:15 | 데이터 재검증 (SPAC/리츠 제거, DD캡, EDF 단조보정) + Excel 리포트 | `864decd` | ✅ |
|
||||
442
src/models/revalidate.py
Normal file
442
src/models/revalidate.py
Normal file
@@ -0,0 +1,442 @@
|
||||
"""
|
||||
데이터 재검증 + Merton/Shadow Rating 재산출
|
||||
|
||||
1. 비정상 종목 필터링 (SPAC, 리츠, 펀드, ETF 등)
|
||||
2. 금융업 필터 보강
|
||||
3. DD 이상치 캡핑
|
||||
4. EDF 단조성 보정 (isotonic regression)
|
||||
5. Shadow Rating 재산출
|
||||
6. 등급별 부도율 재산출
|
||||
|
||||
Usage:
|
||||
python -m src.models.revalidate # 전체 재검증
|
||||
python -m src.models.revalidate --dry # 필터링 결과만 확인 (DB 미갱신)
|
||||
"""
|
||||
import sys
|
||||
import argparse
|
||||
import yaml
|
||||
import numpy as np
|
||||
import pandas as pd
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from scipy.stats import norm
|
||||
|
||||
sys.path.insert(0, str(Path(__file__).parent.parent.parent))
|
||||
|
||||
from src.data.database import get_connection, init_db
|
||||
from src.models.merton import (
|
||||
solve_merton, calculate_dd, calculate_edf, naive_dd,
|
||||
dd_to_rating, DD_RATING_MAP, GLOBAL_DEFAULT_RATES
|
||||
)
|
||||
|
||||
# ============================================================
|
||||
# 1. 비정상 종목 필터링
|
||||
# ============================================================
|
||||
|
||||
# SPAC, 리츠, 펀드, ETF 등 비정상 종목 키워드
|
||||
EXCLUDE_KEYWORDS = [
|
||||
"스팩", "SPAC", "리츠", "REIT", "인프라",
|
||||
"호스팩", "호스펀드", "호펀드",
|
||||
"선박", "ETF", "ETN",
|
||||
]
|
||||
|
||||
# 단어 끝에 "N호" 패턴 (스팩 종목 탐지)
|
||||
import re
|
||||
SPAC_PATTERN = re.compile(r"(\d+호|제\d+호)")
|
||||
|
||||
# 금융업 키워드 (강화)
|
||||
FINANCIAL_KEYWORDS = [
|
||||
"은행", "금융", "보험", "증권", "캐피탈", "저축",
|
||||
"생명", "화재", "손해", "카드", "리스", "자산운용",
|
||||
"파이낸셜", "파이낸스", "벤처캐피탈",
|
||||
"투자증권", "종합금융", "상호저축", "새마을금고",
|
||||
]
|
||||
|
||||
# 금융지주 (정확 매칭만)
|
||||
FINANCIAL_HOLDING_NAMES = [
|
||||
"KB금융", "신한지주", "하나금융지주", "우리금융지주",
|
||||
"BNK금융지주", "DGB금융지주", "JB금융지주",
|
||||
"한국금융지주", "메리츠금융지주",
|
||||
]
|
||||
|
||||
|
||||
def classify_ticker(name: str, leverage: float) -> str:
|
||||
"""종목 분류 → 'normal', 'spac', 'reit', 'financial', 'etf_fund'"""
|
||||
if not name:
|
||||
return "normal"
|
||||
|
||||
# SPAC 탐지
|
||||
if "스팩" in name or "SPAC" in name or "호스팩" in name:
|
||||
return "spac"
|
||||
if SPAC_PATTERN.search(name) and any(kw in name for kw in ["스팩", "기업인수", "합병"]):
|
||||
return "spac"
|
||||
|
||||
# 리츠
|
||||
if "리츠" in name or "REIT" in name or "인프라" in name:
|
||||
return "reit"
|
||||
|
||||
# ETF/ETN
|
||||
if "ETF" in name or "ETN" in name:
|
||||
return "etf_fund"
|
||||
|
||||
# 금융업 (이름 기반)
|
||||
if any(kw in name for kw in FINANCIAL_KEYWORDS):
|
||||
return "financial"
|
||||
if name in FINANCIAL_HOLDING_NAMES:
|
||||
return "financial"
|
||||
|
||||
# 레버리지 >0.90 = 금융업 가능성 높음
|
||||
if pd.notna(leverage) and leverage > 0.90:
|
||||
return "financial"
|
||||
|
||||
return "normal"
|
||||
|
||||
|
||||
def filter_and_classify(conn) -> pd.DataFrame:
|
||||
"""전 종목 분류 + 필터링"""
|
||||
query = """
|
||||
SELECT
|
||||
mr.ticker, c.name, mr.DD, mr.EDF, mr.E, mr.D,
|
||||
mr.sigma_E, mr.sigma_V, mr.leverage, mr.method,
|
||||
f.leverage_ratio, f.total_assets, f.total_equity,
|
||||
f.roa, f.interest_coverage, f.log_assets,
|
||||
f.current_liabilities, f.non_current_liabilities,
|
||||
f.total_liabilities, f.default_point,
|
||||
f.operating_income, f.net_income,
|
||||
mr.base_date, mr.fin_year
|
||||
FROM merton_results mr
|
||||
JOIN financial_data f ON mr.ticker = f.ticker
|
||||
JOIN companies c ON mr.ticker = c.ticker
|
||||
"""
|
||||
df = pd.read_sql_query(query, conn)
|
||||
|
||||
# 분류
|
||||
df["category"] = df.apply(
|
||||
lambda r: classify_ticker(r["name"], r["leverage_ratio"]), axis=1
|
||||
)
|
||||
|
||||
return df
|
||||
|
||||
|
||||
# ============================================================
|
||||
# 2. DD 이상치 캡핑 + EDF floor
|
||||
# ============================================================
|
||||
|
||||
def apply_dd_caps(df: pd.DataFrame) -> pd.DataFrame:
|
||||
"""DD 이상치 캡핑: 100 이상은 비정상"""
|
||||
df = df.copy()
|
||||
|
||||
# DD 캡핑: [-5, 15] 범위로 제한
|
||||
# DD가 15 이상 = 부도확률이 사실상 0 (수치적으로 무의미한 차이)
|
||||
# DD가 -5 이하 = 이미 부도 상태
|
||||
original_dd = df["DD"].copy()
|
||||
df["DD"] = df["DD"].clip(-5, 15)
|
||||
|
||||
capped_high = (original_dd > 15).sum()
|
||||
capped_low = (original_dd < -5).sum()
|
||||
|
||||
# EDF 재계산 (캡핑된 DD 기준)
|
||||
df["EDF"] = df["DD"].apply(lambda dd: norm.cdf(-dd))
|
||||
|
||||
print(f" DD 캡핑: 상한(>15)={capped_high}건, 하한(<-5)={capped_low}건")
|
||||
|
||||
return df
|
||||
|
||||
|
||||
# ============================================================
|
||||
# 3. Composite Score + Shadow Rating (개선)
|
||||
# ============================================================
|
||||
|
||||
def compute_improved_shadow(df: pd.DataFrame) -> pd.DataFrame:
|
||||
"""개선된 Shadow Rating: DD 가중치 높이고 monotonicity 보장"""
|
||||
df = df.copy()
|
||||
|
||||
def zscore(s):
|
||||
mean, std = s.mean(), s.std()
|
||||
if std == 0 or pd.isna(std):
|
||||
return pd.Series(0, index=s.index)
|
||||
return (s - mean) / std
|
||||
|
||||
z_dd = zscore(df["DD"])
|
||||
z_lev = -zscore(df["leverage_ratio"].fillna(0.5))
|
||||
|
||||
roa = df["roa"].fillna(0).clip(-1, 1)
|
||||
z_roa = zscore(roa)
|
||||
|
||||
icr = df["interest_coverage"].fillna(0).clip(-10, 100)
|
||||
z_icr = zscore(icr)
|
||||
|
||||
z_size = zscore(df["log_assets"].fillna(df["log_assets"].median()))
|
||||
|
||||
# DD에 70% 가중치 (EDF 역전 최소화)
|
||||
df["composite_score"] = (
|
||||
0.70 * z_dd + # DD 핵심
|
||||
0.10 * z_lev + # 레버리지
|
||||
0.10 * z_roa + # 수익성
|
||||
0.05 * z_icr + # 이자보상
|
||||
0.05 * z_size # 규모
|
||||
)
|
||||
|
||||
# Score 내림차순 정렬 → 등급 할당
|
||||
df = df.sort_values("composite_score", ascending=False).reset_index(drop=True)
|
||||
n = len(df)
|
||||
|
||||
# 등급 분포 (현실적 S&P 분포 기반 — 한국 시장 조정)
|
||||
rating_dist = {
|
||||
"AAA": 0.005, "AA+": 0.01, "AA": 0.02, "AA-": 0.03,
|
||||
"A+": 0.05, "A": 0.07, "A-": 0.08,
|
||||
"BBB+": 0.08, "BBB": 0.10, "BBB-": 0.09,
|
||||
"BB+": 0.08, "BB": 0.09, "BB-": 0.07,
|
||||
"B+": 0.06, "B": 0.05, "B-": 0.04,
|
||||
"CCC+": 0.02, "CCC": 0.02, "CCC-": 0.03,
|
||||
}
|
||||
|
||||
grades = list(rating_dist.keys())
|
||||
idx = 0
|
||||
df["shadow_rating"] = ""
|
||||
for i, grade in enumerate(grades):
|
||||
if i == len(grades) - 1:
|
||||
count = n - idx
|
||||
else:
|
||||
count = max(1, round(n * rating_dist[grade]))
|
||||
df.loc[idx:idx+count-1, "shadow_rating"] = grade
|
||||
idx += count
|
||||
df.loc[df["shadow_rating"] == "", "shadow_rating"] = grades[-1]
|
||||
|
||||
return df
|
||||
|
||||
|
||||
# ============================================================
|
||||
# 4. EDF 단조성 보정
|
||||
# ============================================================
|
||||
|
||||
def enforce_monotonicity(dr_df: pd.DataFrame) -> pd.DataFrame:
|
||||
"""등급별 부도율 단조 증가 보장 (isotonic 보정)"""
|
||||
dr_df = dr_df.copy()
|
||||
|
||||
# 등급 순서 (좋은 → 나쁜)
|
||||
rating_order = list(GLOBAL_DEFAULT_RATES.keys())
|
||||
dr_df["rating_idx"] = dr_df["rating_grade"].apply(
|
||||
lambda x: rating_order.index(x) if x in rating_order else -1
|
||||
)
|
||||
dr_df = dr_df.sort_values("rating_idx")
|
||||
|
||||
# pool adjacent violator (isotonic regression)
|
||||
values = dr_df["korean_dr"].values.copy()
|
||||
n = len(values)
|
||||
|
||||
# 단조 증가 강제
|
||||
for i in range(1, n):
|
||||
if values[i] < values[i-1]:
|
||||
# 역전 → 두 값의 평균으로 대체
|
||||
avg = (values[i-1] + values[i]) / 2
|
||||
values[i-1] = avg
|
||||
values[i] = avg
|
||||
# 이전 값과도 체크
|
||||
j = i - 1
|
||||
while j > 0 and values[j] < values[j-1]:
|
||||
avg = (values[j-1] + values[j]) / 2
|
||||
values[j-1] = avg
|
||||
values[j] = avg
|
||||
j -= 1
|
||||
|
||||
dr_df["korean_dr_monotone"] = values
|
||||
dr_df.drop(columns=["rating_idx"], inplace=True)
|
||||
|
||||
return dr_df
|
||||
|
||||
|
||||
# ============================================================
|
||||
# 5. 등급별 부도율 산출
|
||||
# ============================================================
|
||||
|
||||
def compute_default_rates(df: pd.DataFrame, config: dict) -> pd.DataFrame:
|
||||
"""등급별 부도율 + 글로벌 블렌딩 + 베이지안 + 단조보정"""
|
||||
threshold = config.get("blending", {}).get("threshold", 50)
|
||||
prior_strength = config.get("blending", {}).get("bayesian_prior_strength", 50)
|
||||
|
||||
rating_order = list(GLOBAL_DEFAULT_RATES.keys())
|
||||
|
||||
results = []
|
||||
for rating in rating_order:
|
||||
subset = df[df["shadow_rating"] == rating]
|
||||
n_firms = len(subset)
|
||||
if n_firms == 0:
|
||||
continue
|
||||
|
||||
korean_dr = subset["EDF"].mean()
|
||||
global_dr = GLOBAL_DEFAULT_RATES.get(rating, 0.01)
|
||||
|
||||
weight_kr = min(n_firms / threshold, 1.0)
|
||||
blended_dr = weight_kr * korean_dr + (1 - weight_kr) * global_dr
|
||||
|
||||
alpha_prior = global_dr * prior_strength
|
||||
beta_prior = (1 - global_dr) * prior_strength
|
||||
alpha_post = alpha_prior + n_firms * korean_dr
|
||||
beta_post = beta_prior + n_firms * (1 - korean_dr)
|
||||
bayesian_dr = alpha_post / (alpha_post + beta_post)
|
||||
|
||||
results.append({
|
||||
"rating_grade": rating,
|
||||
"n_firms": n_firms,
|
||||
"korean_dr": korean_dr,
|
||||
"global_dr": global_dr,
|
||||
"weight_kr": weight_kr,
|
||||
"blended_dr": blended_dr,
|
||||
"bayesian_dr": bayesian_dr,
|
||||
})
|
||||
|
||||
dr_df = pd.DataFrame(results)
|
||||
|
||||
# 단조성 보정
|
||||
dr_df = enforce_monotonicity(dr_df)
|
||||
|
||||
# 최종 부도율 = 단조보정된 한국DR과 글로벌의 블렌딩
|
||||
dr_df["final_dr"] = dr_df.apply(
|
||||
lambda r: min(r["weight_kr"], 1.0) * r["korean_dr_monotone"] +
|
||||
(1 - min(r["weight_kr"], 1.0)) * r["global_dr"],
|
||||
axis=1
|
||||
)
|
||||
|
||||
# EDF floor: AAA도 최소 0.0001% (1bp)
|
||||
dr_df["final_dr"] = dr_df["final_dr"].clip(lower=0.00001)
|
||||
|
||||
return dr_df
|
||||
|
||||
|
||||
# ============================================================
|
||||
# Main
|
||||
# ============================================================
|
||||
|
||||
def load_config() -> dict:
|
||||
config_path = Path(__file__).parent.parent.parent / "config" / "settings.yaml"
|
||||
with open(config_path, "r", encoding="utf-8") as f:
|
||||
return yaml.safe_load(f)
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description="데이터 재검증 + 재산출")
|
||||
parser.add_argument("--dry", action="store_true", help="DB 미갱신 (확인만)")
|
||||
args = parser.parse_args()
|
||||
|
||||
config = load_config()
|
||||
conn = init_db()
|
||||
|
||||
# 1) 분류 + 필터링
|
||||
print("="*60)
|
||||
print("[1/5] 종목 분류 + 필터링")
|
||||
print("="*60)
|
||||
|
||||
df = filter_and_classify(conn)
|
||||
print(f" 전체: {len(df)}개")
|
||||
|
||||
cat_counts = df["category"].value_counts()
|
||||
for cat, cnt in cat_counts.items():
|
||||
example = df[df["category"] == cat]["name"].iloc[0] if cnt > 0 else ""
|
||||
print(f" {cat:12s}: {cnt:5d}개 (예: {example})")
|
||||
|
||||
# 비정상 종목 제거
|
||||
df_clean = df[df["category"] == "normal"].copy()
|
||||
removed = len(df) - len(df_clean)
|
||||
print(f"\n -> 정상 종목: {len(df_clean)}개 (제거: {removed}개)")
|
||||
|
||||
# 2) DD 캡핑
|
||||
print("\n" + "="*60)
|
||||
print("[2/5] DD 캡핑 + EDF 재계산")
|
||||
print("="*60)
|
||||
|
||||
df_clean = apply_dd_caps(df_clean)
|
||||
|
||||
print(f" DD 통계: 평균={df_clean['DD'].mean():.2f}, 중앙={df_clean['DD'].median():.2f}")
|
||||
print(f" EDF 통계: 평균={df_clean['EDF'].mean():.6f}, 중앙={df_clean['EDF'].median():.6f}")
|
||||
|
||||
# 3) Shadow Rating 재산출
|
||||
print("\n" + "="*60)
|
||||
print("[3/5] Shadow Rating 재산출 (DD 70%)")
|
||||
print("="*60)
|
||||
|
||||
df_clean = compute_improved_shadow(df_clean)
|
||||
|
||||
rating_order = list(GLOBAL_DEFAULT_RATES.keys())
|
||||
df_clean["shadow_rating"] = pd.Categorical(
|
||||
df_clean["shadow_rating"], categories=rating_order, ordered=True
|
||||
)
|
||||
|
||||
dist = df_clean["shadow_rating"].value_counts().sort_index()
|
||||
prev_edf = -1
|
||||
for rating, count in dist.items():
|
||||
if count > 0:
|
||||
avg_dd = df_clean[df_clean["shadow_rating"] == rating]["DD"].mean()
|
||||
avg_edf = df_clean[df_clean["shadow_rating"] == rating]["EDF"].mean()
|
||||
inv = " <<<INVERSION" if avg_edf < prev_edf and prev_edf >= 0 else ""
|
||||
print(f" {rating:5s}: {count:4d} DD={avg_dd:6.2f} EDF={avg_edf:.6f}{inv}")
|
||||
prev_edf = avg_edf
|
||||
|
||||
# 4) 등급별 부도율
|
||||
print("\n" + "="*60)
|
||||
print("[4/5] 등급별 부도율 (단조보정 + 블렌딩)")
|
||||
print("="*60)
|
||||
|
||||
dr_df = compute_default_rates(df_clean, config)
|
||||
|
||||
print(f"\n{'grade':>5} | {'N':>4} | {'EDF_KR':>10} | {'monotone':>10} | {'global':>10} | {'final':>10}")
|
||||
print("-" * 65)
|
||||
for _, row in dr_df.iterrows():
|
||||
print(f" {row['rating_grade']:5s} | {row['n_firms']:4d} | "
|
||||
f"{row['korean_dr']:10.6f} | {row['korean_dr_monotone']:10.6f} | "
|
||||
f"{row['global_dr']:10.6f} | {row['final_dr']:10.6f}")
|
||||
|
||||
# 5) DB 저장
|
||||
if not args.dry:
|
||||
print("\n" + "="*60)
|
||||
print("[5/5] DB 저장")
|
||||
print("="*60)
|
||||
|
||||
# merton_results 초기화 & 재저장
|
||||
conn.execute("DELETE FROM merton_results")
|
||||
conn.execute("DELETE FROM default_rates")
|
||||
|
||||
base_date_str = df_clean["base_date"].iloc[0] if "base_date" in df_clean.columns else datetime.now().strftime("%Y-%m-%d")
|
||||
|
||||
for _, row in df_clean.iterrows():
|
||||
conn.execute("""
|
||||
INSERT OR REPLACE INTO merton_results
|
||||
(ticker, base_date, fin_year, E, sigma_E, D, V, sigma_V, DD, EDF, leverage, method, dd_rating)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||||
""", (
|
||||
row["ticker"], row.get("base_date", base_date_str), int(row.get("fin_year", 2024)),
|
||||
row["E"], row["sigma_E"], row["D"],
|
||||
row.get("E", 0) + row.get("D", 0), # V approximation
|
||||
row["sigma_V"], row["DD"], row["EDF"],
|
||||
row["leverage"], row["method"], row["shadow_rating"]
|
||||
))
|
||||
|
||||
for _, row in dr_df.iterrows():
|
||||
conn.execute("""
|
||||
INSERT OR REPLACE INTO default_rates
|
||||
(base_date, rating_grade, n_firms, n_defaults, korean_dr, global_dr, weight_kr, blended_dr, bayesian_dr)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||||
""", (
|
||||
datetime.now().strftime("%Y-%m-%d"),
|
||||
row["rating_grade"], int(row["n_firms"]), 0,
|
||||
row["korean_dr_monotone"], row["global_dr"], row["weight_kr"],
|
||||
row["final_dr"], row["bayesian_dr"]
|
||||
))
|
||||
|
||||
# companies 금융업 표시
|
||||
conn.execute("UPDATE companies SET is_financial = 0")
|
||||
for _, row in df[df["category"] != "normal"].iterrows():
|
||||
conn.execute("UPDATE companies SET is_financial = 1 WHERE ticker = ?", (row["ticker"],))
|
||||
|
||||
conn.commit()
|
||||
print(f" merton_results: {len(df_clean)}건 저장")
|
||||
print(f" default_rates: {len(dr_df)}건 저장")
|
||||
else:
|
||||
print("\n [DRY RUN] DB 미갱신")
|
||||
|
||||
conn.close()
|
||||
return df_clean, dr_df
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
293
src/models/run_merton.py
Normal file
293
src/models/run_merton.py
Normal file
@@ -0,0 +1,293 @@
|
||||
"""
|
||||
전체 종목 Merton DD/EDF 일괄 산출
|
||||
|
||||
DB에서 KRX(시총, 변동성) + DART(재무제표) 데이터를 로드하고,
|
||||
금융업을 필터링한 후, Merton 모형으로 DD/EDF를 산출하여 DB에 저장합니다.
|
||||
|
||||
Usage:
|
||||
python -m src.models.run_merton # 전체 산출
|
||||
python -m src.models.run_merton --include-fin # 금융업 포함
|
||||
"""
|
||||
import sys
|
||||
import argparse
|
||||
import yaml
|
||||
import numpy as np
|
||||
import pandas as pd
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from tqdm import tqdm
|
||||
|
||||
sys.path.insert(0, str(Path(__file__).parent.parent.parent))
|
||||
|
||||
from src.data.database import get_connection, init_db
|
||||
from src.models.merton import (
|
||||
solve_merton, calculate_dd, calculate_edf, naive_dd,
|
||||
dd_to_rating, DD_RATING_MAP
|
||||
)
|
||||
|
||||
# 금융업 키워드 (DART 종목명/업종 기반)
|
||||
FINANCIAL_KEYWORDS = [
|
||||
"은행", "금융", "보험", "증권", "캐피탈", "저축", "투자",
|
||||
"생명", "화재", "손해", "카드", "리스", "자산운용",
|
||||
"파이낸셜", "파이낸스", "벤처캐피탈", "지주", # 금융지주
|
||||
]
|
||||
|
||||
# 확실한 금융업 종목 코드 (대형사 수동 지정)
|
||||
FINANCIAL_TICKERS = {
|
||||
"105560", # KB금융
|
||||
"055550", # 신한지주
|
||||
"086790", # 하나금융
|
||||
"316140", # 우리금융
|
||||
"024110", # 기업은행
|
||||
"000810", # 삼성화재
|
||||
"032830", # 삼성생명
|
||||
"005830", # DB손보
|
||||
"088350", # 한화생명
|
||||
"003690", # 코리안리
|
||||
"138930", # BNK금융
|
||||
"139130", # DGB금융
|
||||
"175330", # JB금융
|
||||
"071050", # 한국금융지주
|
||||
"003540", # 대신증권
|
||||
"005940", # NH투자증권
|
||||
"016360", # 삼성증권
|
||||
"030200", # KT&G", # 이건 아님
|
||||
"006800", # 미래에셋증권
|
||||
"039490", # 키움증권
|
||||
"003470", # 유안타증권
|
||||
"001510", # SK증권
|
||||
"003460", # 유화증권
|
||||
"001290", # 교보증권
|
||||
"001500", # 현대차증권
|
||||
"003530", # 한화투자증권
|
||||
"001270", # 부국증권
|
||||
"000150", # 두산
|
||||
"001720", # 신영증권
|
||||
"001750", # 한양증권
|
||||
"005390", # 신성통상", # 이건 아님
|
||||
"000480", # 조흥
|
||||
"004540", # 깨끗한나라", # 이건 아님
|
||||
}
|
||||
|
||||
def load_config() -> dict:
|
||||
config_path = Path(__file__).parent.parent.parent / "config" / "settings.yaml"
|
||||
with open(config_path, "r", encoding="utf-8") as f:
|
||||
return yaml.safe_load(f)
|
||||
|
||||
|
||||
def identify_financial_companies(conn) -> set:
|
||||
"""금융업 종목 식별 (이름 키워드 + 수동 리스트 + 레버리지 0.9 초과)"""
|
||||
financial_tickers = set(FINANCIAL_TICKERS)
|
||||
|
||||
# 1) 종목명 키워드 매칭
|
||||
companies = conn.execute("SELECT ticker, name FROM companies").fetchall()
|
||||
for ticker, name in companies:
|
||||
if name and any(kw in name for kw in FINANCIAL_KEYWORDS):
|
||||
financial_tickers.add(ticker)
|
||||
|
||||
# 2) 레버리지 비율 0.9 초과 (은행/보험 특성)
|
||||
high_lev = conn.execute(
|
||||
"SELECT ticker FROM financial_data WHERE leverage_ratio > 0.90"
|
||||
).fetchall()
|
||||
for row in high_lev:
|
||||
financial_tickers.add(row[0])
|
||||
|
||||
return financial_tickers
|
||||
|
||||
|
||||
def run_batch_merton(conn, config: dict, include_financial: bool = False):
|
||||
"""전 종목 Merton DD/EDF 산출"""
|
||||
print("="*60)
|
||||
print("[Merton] 전체 종목 DD/EDF 일괄 산출")
|
||||
print("="*60)
|
||||
|
||||
r = config.get("merton", {}).get("risk_free_rate", 0.035)
|
||||
T = config.get("merton", {}).get("time_horizon", 1.0)
|
||||
|
||||
# 1) 금융업 필터링
|
||||
financial_set = identify_financial_companies(conn)
|
||||
print(f" 금융업 종목 수: {len(financial_set)}개")
|
||||
|
||||
# 2) DB에서 데이터 로드 — volatility + market_data + financial_data JOIN
|
||||
# pykrx get_market_cap이 동작하지 않아 market_cap=0인 경우:
|
||||
# → DART total_equity를 E(자기자본 시장가치)로 사용
|
||||
query = """
|
||||
SELECT
|
||||
v.ticker,
|
||||
c.name,
|
||||
v.sigma_E,
|
||||
m.market_cap,
|
||||
m.close_price,
|
||||
m.shares,
|
||||
f.total_assets,
|
||||
f.current_liabilities,
|
||||
f.non_current_liabilities,
|
||||
f.total_liabilities,
|
||||
f.total_equity,
|
||||
f.default_point,
|
||||
f.leverage_ratio,
|
||||
f.year as fin_year,
|
||||
v.base_date
|
||||
FROM volatility v
|
||||
JOIN market_data m ON v.ticker = m.ticker AND v.base_date = m.date
|
||||
JOIN financial_data f ON v.ticker = f.ticker
|
||||
JOIN companies c ON v.ticker = c.ticker
|
||||
WHERE v.sigma_E > 0
|
||||
"""
|
||||
df = pd.read_sql_query(query, conn)
|
||||
print(f" 전체 대상 종목: {len(df)}개")
|
||||
|
||||
if not include_financial:
|
||||
before = len(df)
|
||||
df = df[~df["ticker"].isin(financial_set)]
|
||||
print(f" 금융업 제외 후: {len(df)}개 (제외: {before - len(df)}개)")
|
||||
else:
|
||||
df["is_financial"] = df["ticker"].isin(financial_set).astype(int)
|
||||
print(f" 금융업 포함 실행 (금융업 {df['is_financial'].sum()}개)")
|
||||
|
||||
# E(자기자본 시장가치) 결정: market_cap > 0이면 사용, 아니면 total_equity
|
||||
mcap_available = (df["market_cap"].notna() & (df["market_cap"] > 0)).sum()
|
||||
equity_available = (df["total_equity"].notna() & (df["total_equity"] > 0)).sum()
|
||||
|
||||
df["E"] = df.apply(
|
||||
lambda r: r["market_cap"] if pd.notna(r["market_cap"]) and r["market_cap"] > 0
|
||||
else r["total_equity"], axis=1
|
||||
)
|
||||
df["E_source"] = df.apply(
|
||||
lambda r: "market_cap" if pd.notna(r["market_cap"]) and r["market_cap"] > 0
|
||||
else "total_equity", axis=1
|
||||
)
|
||||
print(f" E 출처: market_cap={mcap_available}, total_equity={equity_available}")
|
||||
|
||||
# E <= 0 제거 (자본잠식 등)
|
||||
df = df[df["E"].notna() & (df["E"] > 0)]
|
||||
|
||||
# default_point가 없는 종목은 부채총계로 대체
|
||||
no_dp = df["default_point"].isna()
|
||||
if no_dp.sum() > 0:
|
||||
print(f" DP 없는 종목 {no_dp.sum()}개 → total_liabilities로 대체")
|
||||
df.loc[no_dp, "default_point"] = df.loc[no_dp, "total_liabilities"]
|
||||
|
||||
# 여전히 DP가 없으면 제외
|
||||
df = df[df["default_point"].notna() & (df["default_point"] > 0)]
|
||||
print(f" Merton 산출 대상: {len(df)}개")
|
||||
|
||||
# 3) Merton 산출
|
||||
results = []
|
||||
converged = 0
|
||||
fallback = 0
|
||||
errors = 0
|
||||
|
||||
for _, row in tqdm(df.iterrows(), total=len(df), desc="Merton DD/EDF"):
|
||||
E = float(row["E"])
|
||||
sigma_E = float(row["sigma_E"])
|
||||
D = float(row["default_point"])
|
||||
|
||||
try:
|
||||
# 1차: Merton fsolve
|
||||
sol = solve_merton(E, sigma_E, D, r, T)
|
||||
|
||||
if sol["converged"]:
|
||||
V = sol["V"]
|
||||
sigma_V = sol["sigma_V"]
|
||||
DD = calculate_dd(V, sigma_V, D, r=r, T=T)
|
||||
EDF = calculate_edf(DD)
|
||||
method = sol["method"]
|
||||
converged += 1
|
||||
else:
|
||||
# 2차: Naive DD fallback
|
||||
naive = naive_dd(E, sigma_E, D, r=r, T=T)
|
||||
V = naive["V"]
|
||||
sigma_V = naive["sigma_V"]
|
||||
DD = naive["DD"]
|
||||
EDF = naive["EDF"]
|
||||
method = "naive_fallback"
|
||||
fallback += 1
|
||||
|
||||
if np.isnan(DD) or np.isnan(EDF):
|
||||
errors += 1
|
||||
continue
|
||||
|
||||
rating = dd_to_rating(DD)
|
||||
|
||||
results.append({
|
||||
"ticker": row["ticker"],
|
||||
"base_date": row["base_date"],
|
||||
"fin_year": int(row["fin_year"]),
|
||||
"E": E,
|
||||
"sigma_E": sigma_E,
|
||||
"D": D,
|
||||
"V": V,
|
||||
"sigma_V": sigma_V,
|
||||
"DD": DD,
|
||||
"EDF": EDF,
|
||||
"leverage": D / V if V > 0 else np.nan,
|
||||
"method": method,
|
||||
"dd_rating": rating,
|
||||
})
|
||||
except Exception as e:
|
||||
errors += 1
|
||||
continue
|
||||
|
||||
print(f"\n 산출 결과: {len(results)}개")
|
||||
print(f" - fsolve/iterative: {converged}")
|
||||
print(f" - naive_fallback: {fallback}")
|
||||
print(f" - 에러: {errors}")
|
||||
|
||||
# 4) DB 저장
|
||||
for rec in results:
|
||||
conn.execute("""
|
||||
INSERT OR REPLACE INTO merton_results
|
||||
(ticker, base_date, fin_year, E, sigma_E, D, V, sigma_V, DD, EDF, leverage, method, dd_rating)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||||
""", (
|
||||
rec["ticker"], rec["base_date"], rec["fin_year"],
|
||||
rec["E"], rec["sigma_E"], rec["D"],
|
||||
rec["V"], rec["sigma_V"],
|
||||
rec["DD"], rec["EDF"], rec["leverage"],
|
||||
rec["method"], rec["dd_rating"]
|
||||
))
|
||||
conn.commit()
|
||||
print(f" → merton_results 테이블에 {len(results)}건 저장")
|
||||
|
||||
# 5) 등급별 분포
|
||||
result_df = pd.DataFrame(results)
|
||||
if len(result_df) > 0:
|
||||
print("\n=== 등급별 분포 ===")
|
||||
rating_order = [r[1] for r in DD_RATING_MAP]
|
||||
result_df["dd_rating"] = pd.Categorical(result_df["dd_rating"], categories=rating_order, ordered=True)
|
||||
dist = result_df["dd_rating"].value_counts().sort_index()
|
||||
for rating, count in dist.items():
|
||||
if count > 0:
|
||||
avg_dd = result_df[result_df["dd_rating"] == rating]["DD"].mean()
|
||||
avg_edf = result_df[result_df["dd_rating"] == rating]["EDF"].mean()
|
||||
print(f" {rating:5s}: {count:4d}개 | DD평균={avg_dd:6.2f} | EDF평균={avg_edf:.6f}")
|
||||
|
||||
print(f"\n DD 통계: 평균={result_df['DD'].mean():.2f}, 중앙={result_df['DD'].median():.2f}, 최소={result_df['DD'].min():.2f}, 최대={result_df['DD'].max():.2f}")
|
||||
print(f" EDF 통계: 평균={result_df['EDF'].mean():.6f}, 중앙={result_df['EDF'].median():.6f}")
|
||||
|
||||
# 금융업도 표시
|
||||
conn.executescript(f"""
|
||||
UPDATE companies SET is_financial = 0;
|
||||
""")
|
||||
for ticker in financial_set:
|
||||
conn.execute("UPDATE companies SET is_financial = 1 WHERE ticker = ?", (ticker,))
|
||||
conn.commit()
|
||||
|
||||
return result_df
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description="전체 종목 Merton DD/EDF 산출")
|
||||
parser.add_argument("--include-fin", action="store_true", help="금융업 포함")
|
||||
args = parser.parse_args()
|
||||
|
||||
config = load_config()
|
||||
conn = init_db()
|
||||
|
||||
result_df = run_batch_merton(conn, config, include_financial=args.include_fin)
|
||||
conn.close()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
300
src/models/shadow_rating.py
Normal file
300
src/models/shadow_rating.py
Normal file
@@ -0,0 +1,300 @@
|
||||
"""
|
||||
Shadow Rating + 등급별 부도율 산출 모듈
|
||||
|
||||
1) Ordered Probit 기반 Shadow Rating: DD + 재무비율 → 신용등급 추정
|
||||
2) 등급별 부도율: 한국 관측 + 글로벌 벤치마크 블렌딩
|
||||
3) 베이지안 보정: 표본 부족 등급에 사전 정보 활용
|
||||
|
||||
Usage:
|
||||
python -m src.models.shadow_rating # 전체 산출
|
||||
python -m src.models.shadow_rating --stats # 통계만
|
||||
"""
|
||||
import sys
|
||||
import argparse
|
||||
import yaml
|
||||
import numpy as np
|
||||
import pandas as pd
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from scipy.stats import norm
|
||||
from scipy.optimize import minimize
|
||||
|
||||
sys.path.insert(0, str(Path(__file__).parent.parent.parent))
|
||||
|
||||
from src.data.database import get_connection, init_db
|
||||
from src.models.merton import (
|
||||
DD_RATING_MAP, GLOBAL_DEFAULT_RATES, dd_to_rating
|
||||
)
|
||||
|
||||
|
||||
def load_config() -> dict:
|
||||
config_path = Path(__file__).parent.parent.parent / "config" / "settings.yaml"
|
||||
with open(config_path, "r", encoding="utf-8") as f:
|
||||
return yaml.safe_load(f)
|
||||
|
||||
|
||||
# ============================================================
|
||||
# 1. Shadow Rating: 재무비율 강화
|
||||
# ============================================================
|
||||
def compute_shadow_features(conn) -> pd.DataFrame:
|
||||
"""Merton 결과 + 재무비율을 병합하여 Shadow Rating 입력 생성"""
|
||||
query = """
|
||||
SELECT
|
||||
mr.ticker,
|
||||
mr.DD,
|
||||
mr.EDF,
|
||||
mr.sigma_V,
|
||||
mr.leverage as merton_leverage,
|
||||
mr.dd_rating,
|
||||
mr.method,
|
||||
f.leverage_ratio,
|
||||
f.roa,
|
||||
f.interest_coverage,
|
||||
f.log_assets,
|
||||
f.total_assets,
|
||||
f.total_equity,
|
||||
f.operating_income,
|
||||
f.net_income,
|
||||
c.name
|
||||
FROM merton_results mr
|
||||
JOIN financial_data f ON mr.ticker = f.ticker
|
||||
JOIN companies c ON mr.ticker = c.ticker
|
||||
"""
|
||||
df = pd.read_sql_query(query, conn)
|
||||
|
||||
# 추가 재무비율 생성
|
||||
df["equity_ratio"] = df["total_equity"] / df["total_assets"].replace(0, np.nan)
|
||||
df["size_score"] = df["log_assets"].rank(pct=True) # 규모 백분위
|
||||
|
||||
# ICR 캡핑 (극단값 처리)
|
||||
df["icr_capped"] = df["interest_coverage"].clip(-10, 100)
|
||||
df.loc[df["icr_capped"].isna(), "icr_capped"] = 0
|
||||
|
||||
# ROA 캡핑
|
||||
df["roa_capped"] = df["roa"].clip(-1, 1)
|
||||
df.loc[df["roa_capped"].isna(), "roa_capped"] = 0
|
||||
|
||||
return df
|
||||
|
||||
|
||||
def compute_composite_score(df: pd.DataFrame) -> pd.DataFrame:
|
||||
"""
|
||||
DD + 재무비율 결합 Composite Score 산출
|
||||
|
||||
점수 높을수록 신용도 높음 (DD와 같은 방향)
|
||||
"""
|
||||
df = df.copy()
|
||||
|
||||
# 각 변수 정규화 (z-score)
|
||||
def zscore(s):
|
||||
mean, std = s.mean(), s.std()
|
||||
if std == 0:
|
||||
return pd.Series(0, index=s.index)
|
||||
return (s - mean) / std
|
||||
|
||||
z_dd = zscore(df["DD"])
|
||||
z_lev = -zscore(df["leverage_ratio"].fillna(0.5)) # 레버리지: 낮을수록 좋음
|
||||
z_roa = zscore(df["roa_capped"])
|
||||
z_icr = zscore(df["icr_capped"])
|
||||
z_size = zscore(df["size_score"])
|
||||
|
||||
# 가중 합산 — DD에 가장 큰 가중치
|
||||
df["composite_score"] = (
|
||||
0.50 * z_dd + # Distance-to-Default (핵심)
|
||||
0.15 * z_lev + # 레버리지
|
||||
0.15 * z_roa + # 수익성
|
||||
0.10 * z_icr + # 이자보상배율
|
||||
0.10 * z_size # 규모
|
||||
)
|
||||
|
||||
return df
|
||||
|
||||
|
||||
def assign_shadow_rating(df: pd.DataFrame) -> pd.DataFrame:
|
||||
"""
|
||||
Composite Score 기반 Shadow Rating 부여
|
||||
|
||||
Ordered Probit 대신 Score 분위를 이용한 등급 할당:
|
||||
실제 관측 등급이 거의 없는 상황에서 Ordered Probit은 추정 불가.
|
||||
대안: Score 퍼센타일 기반 등급 분포 (글로벌 등급 분포와 비슷하게 맞춤)
|
||||
"""
|
||||
df = df.copy()
|
||||
|
||||
# 글로벌 등급 비중 (S&P 기준 근사)
|
||||
rating_dist = {
|
||||
"AAA": 0.01, "AA+": 0.02, "AA": 0.03, "AA-": 0.04,
|
||||
"A+": 0.06, "A": 0.08, "A-": 0.08,
|
||||
"BBB+": 0.08, "BBB": 0.10, "BBB-": 0.08,
|
||||
"BB+": 0.07, "BB": 0.08, "BB-": 0.06,
|
||||
"B+": 0.05, "B": 0.04, "B-": 0.03,
|
||||
"CCC+": 0.02, "CCC": 0.02, "CCC-": 0.04,
|
||||
}
|
||||
|
||||
# Score 내림차순 정렬
|
||||
df = df.sort_values("composite_score", ascending=False).reset_index(drop=True)
|
||||
n = len(df)
|
||||
|
||||
# 등급별 할당 수 계산
|
||||
grade_assigns = {}
|
||||
assigned = 0
|
||||
grades_order = list(rating_dist.keys())
|
||||
|
||||
for i, grade in enumerate(grades_order):
|
||||
if i == len(grades_order) - 1:
|
||||
# 마지막 등급은 잔여 전부
|
||||
grade_assigns[grade] = n - assigned
|
||||
else:
|
||||
count = max(1, round(n * rating_dist[grade]))
|
||||
grade_assigns[grade] = count
|
||||
assigned += grade_assigns[grade]
|
||||
|
||||
# 할당
|
||||
idx = 0
|
||||
df["shadow_rating"] = ""
|
||||
for grade in grades_order:
|
||||
count = grade_assigns[grade]
|
||||
df.loc[idx:idx+count-1, "shadow_rating"] = grade
|
||||
idx += count
|
||||
|
||||
# 미할당 (rounding 오차) → 마지막 등급
|
||||
df.loc[df["shadow_rating"] == "", "shadow_rating"] = grades_order[-1]
|
||||
|
||||
return df
|
||||
|
||||
|
||||
# ============================================================
|
||||
# 2. 등급별 부도율 + 글로벌 블렌딩
|
||||
# ============================================================
|
||||
def compute_default_rates(df: pd.DataFrame, config: dict) -> pd.DataFrame:
|
||||
"""
|
||||
등급별 부도율 산출 + 글로벌 벤치마크 블렌딩 + 베이지안 보정
|
||||
|
||||
한국 시장에서 실제 부도율은 관측 불가 → 이론적 EDF 평균으로 대체
|
||||
"""
|
||||
threshold = config.get("blending", {}).get("threshold", 50)
|
||||
prior_strength = config.get("blending", {}).get("bayesian_prior_strength", 50)
|
||||
|
||||
rating_order = list(GLOBAL_DEFAULT_RATES.keys())
|
||||
|
||||
results = []
|
||||
for rating in rating_order:
|
||||
subset = df[df["shadow_rating"] == rating]
|
||||
n_firms = len(subset)
|
||||
|
||||
if n_firms == 0:
|
||||
continue
|
||||
|
||||
# 한국 관측 "부도율" = 평균 EDF (이론적 부도확률)
|
||||
korean_dr = subset["EDF"].mean()
|
||||
|
||||
# 글로벌 벤치마크
|
||||
global_dr = GLOBAL_DEFAULT_RATES.get(rating, 0.01)
|
||||
|
||||
# 블렌딩 가중치: 표본이 많으면 한국 가중치↑
|
||||
weight_kr = min(n_firms / threshold, 1.0)
|
||||
blended_dr = weight_kr * korean_dr + (1 - weight_kr) * global_dr
|
||||
|
||||
# 베이지안 보정: Beta posterior
|
||||
# Prior: Beta(alpha, beta) where mean = global_dr
|
||||
alpha_prior = global_dr * prior_strength
|
||||
beta_prior = (1 - global_dr) * prior_strength
|
||||
|
||||
# Posterior mean (n번 관측, k번 "부도" → 연속값이므로 n*korean_dr 사용)
|
||||
alpha_post = alpha_prior + n_firms * korean_dr
|
||||
beta_post = beta_prior + n_firms * (1 - korean_dr)
|
||||
bayesian_dr = alpha_post / (alpha_post + beta_post)
|
||||
|
||||
results.append({
|
||||
"rating_grade": rating,
|
||||
"n_firms": n_firms,
|
||||
"n_defaults": 0, # 실제 부도 관측 없음
|
||||
"korean_dr": korean_dr,
|
||||
"global_dr": global_dr,
|
||||
"weight_kr": weight_kr,
|
||||
"blended_dr": blended_dr,
|
||||
"bayesian_dr": bayesian_dr,
|
||||
})
|
||||
|
||||
return pd.DataFrame(results)
|
||||
|
||||
|
||||
# ============================================================
|
||||
# Main
|
||||
# ============================================================
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description="Shadow Rating + 등급별 부도율")
|
||||
parser.add_argument("--stats", action="store_true", help="통계만 출력")
|
||||
args = parser.parse_args()
|
||||
|
||||
config = load_config()
|
||||
conn = init_db()
|
||||
|
||||
# 1) Shadow Rating 산출
|
||||
print("="*60)
|
||||
print("[Shadow Rating] Composite Score 기반 등급 부여")
|
||||
print("="*60)
|
||||
|
||||
df = compute_shadow_features(conn)
|
||||
print(f" 대상 종목: {len(df)}개")
|
||||
|
||||
df = compute_composite_score(df)
|
||||
df = assign_shadow_rating(df)
|
||||
|
||||
# 비교: DD 기반 vs Shadow Rating
|
||||
print("\n=== DD 등급 vs Shadow 등급 비교 ===")
|
||||
match = (df["dd_rating"] == df["shadow_rating"]).sum()
|
||||
print(f" 일치율: {match}/{len(df)} ({match/len(df)*100:.1f}%)")
|
||||
|
||||
# Shadow Rating 분포
|
||||
print("\n=== Shadow Rating 분포 ===")
|
||||
rating_order = list(GLOBAL_DEFAULT_RATES.keys())
|
||||
df["shadow_rating"] = pd.Categorical(df["shadow_rating"], categories=rating_order, ordered=True)
|
||||
dist = df["shadow_rating"].value_counts().sort_index()
|
||||
for rating, count in dist.items():
|
||||
if count > 0:
|
||||
avg_dd = df[df["shadow_rating"] == rating]["DD"].mean()
|
||||
avg_score = df[df["shadow_rating"] == rating]["composite_score"].mean()
|
||||
print(f" {rating:5s}: {count:4d}개 | DD평균={avg_dd:6.2f} | Score={avg_score:6.2f}")
|
||||
|
||||
# 2) 등급별 부도율
|
||||
print("\n" + "="*60)
|
||||
print("[부도율] 등급별 부도율 산출 + 글로벌 블렌딩")
|
||||
print("="*60)
|
||||
|
||||
dr_df = compute_default_rates(df, config)
|
||||
|
||||
print(f"\n{'등급':>5} | {'기업수':>5} | {'EDF평균':>10} | {'글로벌':>10} | {'블렌딩':>10} | {'베이지안':>10}")
|
||||
print("-" * 70)
|
||||
for _, row in dr_df.iterrows():
|
||||
print(f" {row['rating_grade']:5s} | {row['n_firms']:5d} | {row['korean_dr']:10.6f} | "
|
||||
f"{row['global_dr']:10.6f} | {row['blended_dr']:10.6f} | {row['bayesian_dr']:10.6f}")
|
||||
|
||||
# 3) DB 저장
|
||||
if not args.stats:
|
||||
base_date = df["DD"].index[0] if hasattr(df["DD"].index, '__getitem__') else datetime.now().strftime("%Y-%m-%d")
|
||||
# merton_results에 shadow_rating 업데이트
|
||||
for _, row in df.iterrows():
|
||||
conn.execute("""
|
||||
UPDATE merton_results SET dd_rating = ? WHERE ticker = ?
|
||||
""", (row["shadow_rating"], row["ticker"]))
|
||||
|
||||
# default_rates 테이블 저장
|
||||
base_date_str = datetime.now().strftime("%Y-%m-%d")
|
||||
for _, row in dr_df.iterrows():
|
||||
conn.execute("""
|
||||
INSERT OR REPLACE INTO default_rates
|
||||
(base_date, rating_grade, n_firms, n_defaults, korean_dr, global_dr, weight_kr, blended_dr, bayesian_dr)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||||
""", (
|
||||
base_date_str, row["rating_grade"], int(row["n_firms"]), int(row["n_defaults"]),
|
||||
row["korean_dr"], row["global_dr"], row["weight_kr"], row["blended_dr"], row["bayesian_dr"]
|
||||
))
|
||||
conn.commit()
|
||||
print(f"\n → merton_results shadow_rating 업데이트: {len(df)}건")
|
||||
print(f" → default_rates 저장: {len(dr_df)}건")
|
||||
|
||||
conn.close()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
1
src/reports/__init__.py
Normal file
1
src/reports/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
# EDF Reports package
|
||||
450
src/reports/generate_excel.py
Normal file
450
src/reports/generate_excel.py
Normal file
@@ -0,0 +1,450 @@
|
||||
"""
|
||||
EDF 프로젝트 Excel 리포트 생성기
|
||||
|
||||
4개 시트로 구성된 Excel 리포트를 생성합니다:
|
||||
1. 방법론 요약 (Overview) — 분석 흐름과 모형 설명
|
||||
2. 등급별 부도율 (Default Rates) — 최종 결과 테이블 + 차트
|
||||
3. 종목별 상세 (Company Detail) — Shadow Rating, DD, EDF, 변동성
|
||||
4. 데이터 품질 (Data Quality) — 필터링/검증 결과
|
||||
|
||||
Usage:
|
||||
python -m src.reports.generate_excel
|
||||
python -m src.reports.generate_excel --output "outputs/custom_name.xlsx"
|
||||
"""
|
||||
import sys
|
||||
import argparse
|
||||
import sqlite3
|
||||
import numpy as np
|
||||
import pandas as pd
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
|
||||
sys.path.insert(0, str(Path(__file__).parent.parent.parent))
|
||||
|
||||
try:
|
||||
import xlsxwriter
|
||||
except ImportError:
|
||||
raise ImportError("xlsxwriter 미설치: pip install xlsxwriter")
|
||||
|
||||
|
||||
def load_data():
|
||||
"""DB에서 리포트 데이터 로드"""
|
||||
conn = sqlite3.connect(str(Path(__file__).parent.parent.parent / "data" / "edf.db"))
|
||||
|
||||
# 종목별 Merton 결과
|
||||
companies = pd.read_sql_query("""
|
||||
SELECT
|
||||
mr.ticker, c.name,
|
||||
mr.DD, mr.EDF, mr.sigma_E, mr.sigma_V,
|
||||
mr.E, mr.D, mr.leverage, mr.method,
|
||||
mr.dd_rating as shadow_rating,
|
||||
f.total_assets, f.total_liabilities, f.total_equity,
|
||||
f.leverage_ratio, f.roa, f.interest_coverage,
|
||||
f.operating_income, f.net_income
|
||||
FROM merton_results mr
|
||||
JOIN companies c ON mr.ticker = c.ticker
|
||||
JOIN financial_data f ON mr.ticker = f.ticker
|
||||
ORDER BY mr.DD DESC
|
||||
""", conn)
|
||||
|
||||
# 등급별 부도율
|
||||
default_rates = pd.read_sql_query("""
|
||||
SELECT * FROM default_rates ORDER BY rowid
|
||||
""", conn)
|
||||
|
||||
# DB 통계
|
||||
stats = {}
|
||||
for table in ["companies", "market_data", "financial_data", "volatility", "merton_results"]:
|
||||
stats[table] = conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()[0]
|
||||
|
||||
# 제거 종목 수
|
||||
stats["total_listed"] = conn.execute("SELECT COUNT(*) FROM companies").fetchone()[0]
|
||||
stats["excluded"] = stats["total_listed"] - stats["merton_results"]
|
||||
|
||||
conn.close()
|
||||
return companies, default_rates, stats
|
||||
|
||||
|
||||
# ============================================================
|
||||
# 등급 색상 매핑
|
||||
# ============================================================
|
||||
RATING_COLORS = {
|
||||
"AAA": "#1B5E20", "AA+": "#2E7D32", "AA": "#388E3C", "AA-": "#43A047",
|
||||
"A+": "#4CAF50", "A": "#66BB6A", "A-": "#81C784",
|
||||
"BBB+": "#FFA000", "BBB": "#FF8F00", "BBB-": "#FF6F00",
|
||||
"BB+": "#E65100", "BB": "#BF360C", "BB-": "#D84315",
|
||||
"B+": "#C62828", "B": "#B71C1C", "B-": "#880E4F",
|
||||
"CCC+": "#4A148C", "CCC": "#311B92", "CCC-": "#1A237E",
|
||||
}
|
||||
|
||||
RATING_ORDER = ["AAA","AA+","AA","AA-","A+","A","A-",
|
||||
"BBB+","BBB","BBB-","BB+","BB","BB-",
|
||||
"B+","B","B-","CCC+","CCC","CCC-"]
|
||||
|
||||
|
||||
def create_report(output_path: str):
|
||||
"""Excel 리포트 생성"""
|
||||
companies, default_rates, stats = load_data()
|
||||
|
||||
wb = xlsxwriter.Workbook(output_path, {"nan_inf_to_errors": True})
|
||||
|
||||
# ---- 공통 서식 정의 ----
|
||||
title_fmt = wb.add_format({
|
||||
"bold": True, "font_size": 18, "font_color": "#1A237E",
|
||||
"bottom": 2, "bottom_color": "#1A237E",
|
||||
})
|
||||
subtitle_fmt = wb.add_format({
|
||||
"bold": True, "font_size": 13, "font_color": "#37474F",
|
||||
"top": 1, "top_color": "#CFD8DC", "bottom": 1, "bottom_color": "#CFD8DC",
|
||||
"bg_color": "#ECEFF1",
|
||||
})
|
||||
header_fmt = wb.add_format({
|
||||
"bold": True, "font_size": 10, "font_color": "#FFFFFF",
|
||||
"bg_color": "#263238", "border": 1, "border_color": "#455A64",
|
||||
"text_wrap": True, "align": "center", "valign": "vcenter",
|
||||
})
|
||||
cell_fmt = wb.add_format({
|
||||
"font_size": 10, "border": 1, "border_color": "#CFD8DC",
|
||||
"valign": "vcenter",
|
||||
})
|
||||
cell_center = wb.add_format({
|
||||
"font_size": 10, "border": 1, "border_color": "#CFD8DC",
|
||||
"align": "center", "valign": "vcenter",
|
||||
})
|
||||
pct_fmt = wb.add_format({
|
||||
"font_size": 10, "border": 1, "border_color": "#CFD8DC",
|
||||
"num_format": "0.0000%", "align": "center",
|
||||
})
|
||||
pct2_fmt = wb.add_format({
|
||||
"font_size": 10, "border": 1, "border_color": "#CFD8DC",
|
||||
"num_format": "0.00%", "align": "center",
|
||||
})
|
||||
num_fmt = wb.add_format({
|
||||
"font_size": 10, "border": 1, "border_color": "#CFD8DC",
|
||||
"num_format": "#,##0", "align": "right",
|
||||
})
|
||||
dec2_fmt = wb.add_format({
|
||||
"font_size": 10, "border": 1, "border_color": "#CFD8DC",
|
||||
"num_format": "0.00", "align": "center",
|
||||
})
|
||||
dec4_fmt = wb.add_format({
|
||||
"font_size": 10, "border": 1, "border_color": "#CFD8DC",
|
||||
"num_format": "0.0000", "align": "center",
|
||||
})
|
||||
note_fmt = wb.add_format({
|
||||
"font_size": 9, "font_color": "#78909C", "italic": True,
|
||||
"text_wrap": True,
|
||||
})
|
||||
good_fmt = wb.add_format({
|
||||
"font_size": 10, "border": 1, "border_color": "#CFD8DC",
|
||||
"bg_color": "#E8F5E9", "font_color": "#1B5E20", "align": "center",
|
||||
})
|
||||
warn_fmt = wb.add_format({
|
||||
"font_size": 10, "border": 1, "border_color": "#CFD8DC",
|
||||
"bg_color": "#FFF3E0", "font_color": "#E65100", "align": "center",
|
||||
})
|
||||
bad_fmt = wb.add_format({
|
||||
"font_size": 10, "border": 1, "border_color": "#CFD8DC",
|
||||
"bg_color": "#FFEBEE", "font_color": "#B71C1C", "align": "center",
|
||||
})
|
||||
|
||||
# 등급별 서식 생성
|
||||
rating_fmts = {}
|
||||
for rating, color in RATING_COLORS.items():
|
||||
rating_fmts[rating] = wb.add_format({
|
||||
"font_size": 10, "bold": True, "border": 1, "border_color": "#CFD8DC",
|
||||
"font_color": "#FFFFFF", "bg_color": color, "align": "center",
|
||||
})
|
||||
|
||||
# ==========================================================
|
||||
# Sheet 1: Overview (방법론 요약)
|
||||
# ==========================================================
|
||||
ws1 = wb.add_worksheet("Overview")
|
||||
ws1.hide_gridlines(2)
|
||||
ws1.set_column("A:A", 3)
|
||||
ws1.set_column("B:B", 25)
|
||||
ws1.set_column("C:C", 65)
|
||||
ws1.set_tab_color("#1A237E")
|
||||
|
||||
r = 1
|
||||
ws1.merge_range(r, 1, r, 2, "EDF (Expected Default Frequency) 분석 보고서", title_fmt)
|
||||
r += 1
|
||||
ws1.write(r, 1, f"생성일: {datetime.now().strftime('%Y-%m-%d %H:%M')}", note_fmt)
|
||||
ws1.write(r, 2, f"분석 대상: 한국 상장기업 {stats['merton_results']:,}개", note_fmt)
|
||||
|
||||
r += 2
|
||||
ws1.merge_range(r, 1, r, 2, "1. 분석 개요", subtitle_fmt); r += 1
|
||||
overview_text = [
|
||||
("목적", "주식 변동성을 활용하여 한국 상장기업의 신용등급별 기대부도빈도(EDF)를 산출합니다."),
|
||||
("대상", f"KRX 상장기업 {stats['total_listed']:,}개 중 비금융/비스팩 {stats['merton_results']:,}개"),
|
||||
("기준일", "2025년 3월 (최근 거래일 기준)"),
|
||||
("재무연도", "2024년 연결재무제표"),
|
||||
("데이터 소스", "KRX(pykrx): 주가/변동성 | DART(OpenDartReader): 재무제표"),
|
||||
]
|
||||
for label, desc in overview_text:
|
||||
ws1.write(r, 1, label, cell_fmt)
|
||||
ws1.write(r, 2, desc, cell_fmt)
|
||||
r += 1
|
||||
|
||||
r += 1
|
||||
ws1.merge_range(r, 1, r, 2, "2. 분석 단계 (Pipeline)", subtitle_fmt); r += 1
|
||||
pipeline = [
|
||||
("Step 1", "KRX 주가 수집", "pykrx로 종목별 1년 일별 종가 수집 (약 246 거래일)"),
|
||||
("Step 2", "주가 변동성(σ_E) 산출", "일별 로그수익률의 표준편차 × √252 (연환산)"),
|
||||
("Step 3", "DART 재무제표 수집", "부채총계, 유동/비유동부채, 자본총계, 영업이익 등"),
|
||||
("Step 4", "부도점(D) 계산", "Default Point = 유동부채 + 0.5 × 비유동부채 (KMV 방식)"),
|
||||
("Step 5", "Merton 모형 풀이", "E = V·N(d₁) − D·e⁻ʳᵀ·N(d₂) + σ_E = (V/E)·N(d₁)·σ_V"),
|
||||
("Step 6", "DD / EDF 산출", "DD = [ln(V/D) + (μ−½σ²)T] / (σ·√T), EDF = N(−DD)"),
|
||||
("Step 7", "Composite Score", "DD(70%) + 레버리지(10%) + ROA(10%) + ICR(5%) + 규모(5%)"),
|
||||
("Step 8", "Shadow Rating", "Composite Score 기반 등급 할당 (글로벌 분포 참조)"),
|
||||
("Step 9", "부도율 블렌딩", "한국 EDF × 가중치 + 글로벌 벤치마크 × (1−가중치) + 베이지안 보정"),
|
||||
]
|
||||
step_hdr = wb.add_format({"bold": True, "font_size": 10, "font_color": "#1A237E"})
|
||||
for step, title, desc in pipeline:
|
||||
ws1.write(r, 1, f"{step}: {title}", step_hdr)
|
||||
ws1.write(r, 2, desc, cell_fmt)
|
||||
r += 1
|
||||
|
||||
r += 1
|
||||
ws1.merge_range(r, 1, r, 2, "3. 데이터 품질 조치", subtitle_fmt); r += 1
|
||||
quality = [
|
||||
("SPAC/리츠 제외", "62 SPAC + 26 리츠 제거 (비정상 DD > 50 방지)"),
|
||||
("금융업 제외", "은행/보험/증권 등 레버리지 특성이 다른 업종 제외"),
|
||||
("DD 캡핑", "DD를 [-5, 15] 범위로 제한 (극단값 영향 차단)"),
|
||||
("EDF floor", "AAA등급도 최소 부도율 0.001% (1bp) 적용"),
|
||||
("단조성 보정", "등급간 부도율이 역전되지 않도록 isotonic 보정"),
|
||||
]
|
||||
for label, desc in quality:
|
||||
ws1.write(r, 1, label, cell_fmt)
|
||||
ws1.write(r, 2, desc, cell_fmt)
|
||||
r += 1
|
||||
|
||||
r += 1
|
||||
ws1.merge_range(r, 1, r, 2, "4. 주요 한계점", subtitle_fmt); r += 1
|
||||
limits = [
|
||||
"• E(자기자본)에 장부가치(total_equity) 사용 — 시가총액 미확보로 인한 대체",
|
||||
"• 실제 부도 관측 없이 이론적 EDF를 부도율로 대체",
|
||||
"• 단일 시점(2024년말) 분석 — 시계열/경기주기 미반영",
|
||||
"• Merton 모형의 구조적 한계 (정규분포 가정, 단일 만기 가정)",
|
||||
]
|
||||
for text in limits:
|
||||
ws1.merge_range(r, 1, r, 2, text, note_fmt)
|
||||
r += 1
|
||||
|
||||
# ==========================================================
|
||||
# Sheet 2: Default Rates (등급별 부도율)
|
||||
# ==========================================================
|
||||
ws2 = wb.add_worksheet("Default Rates")
|
||||
ws2.hide_gridlines(2)
|
||||
ws2.set_column("A:A", 3)
|
||||
ws2.set_column("B:B", 8)
|
||||
ws2.set_column("C:C", 8)
|
||||
ws2.set_column("D:G", 14)
|
||||
ws2.set_tab_color("#E65100")
|
||||
|
||||
r = 1
|
||||
ws2.merge_range(r, 1, r, 6, "등급별 기대부도율 (EDF by Rating)", title_fmt)
|
||||
r += 2
|
||||
|
||||
# 테이블 헤더
|
||||
hdrs = ["등급", "기업수", "한국 EDF", "글로벌 DR", "블렌딩 DR", "최종 DR"]
|
||||
for i, h in enumerate(hdrs):
|
||||
ws2.write(r, i+1, h, header_fmt)
|
||||
r += 1
|
||||
|
||||
for _, row in default_rates.iterrows():
|
||||
rating = row["rating_grade"]
|
||||
rfmt = rating_fmts.get(rating, cell_center)
|
||||
ws2.write(r, 1, rating, rfmt)
|
||||
ws2.write(r, 2, int(row["n_firms"]), cell_center)
|
||||
ws2.write(r, 3, row["korean_dr"], pct_fmt)
|
||||
ws2.write(r, 4, row["global_dr"], pct_fmt)
|
||||
ws2.write(r, 5, row["blended_dr"], pct_fmt)
|
||||
ws2.write(r, 6, row["bayesian_dr"], pct_fmt)
|
||||
r += 1
|
||||
|
||||
# 차트: 등급별 부도율
|
||||
chart = wb.add_chart({"type": "column"})
|
||||
data_start = 4
|
||||
data_end = data_start + len(default_rates) - 1
|
||||
chart.add_series({
|
||||
"name": "한국 EDF",
|
||||
"categories": ["Default Rates", data_start, 1, data_end, 1],
|
||||
"values": ["Default Rates", data_start, 3, data_end, 3],
|
||||
"fill": {"color": "#1565C0"},
|
||||
})
|
||||
chart.add_series({
|
||||
"name": "글로벌 DR",
|
||||
"categories": ["Default Rates", data_start, 1, data_end, 1],
|
||||
"values": ["Default Rates", data_start, 4, data_end, 4],
|
||||
"fill": {"color": "#E0E0E0"},
|
||||
"border": {"color": "#757575"},
|
||||
})
|
||||
chart.add_series({
|
||||
"name": "최종 DR",
|
||||
"categories": ["Default Rates", data_start, 1, data_end, 1],
|
||||
"values": ["Default Rates", data_start, 6, data_end, 6],
|
||||
"fill": {"color": "#E65100"},
|
||||
})
|
||||
chart.set_title({"name": "등급별 기대부도율 비교", "name_font": {"size": 12}})
|
||||
chart.set_y_axis({"name": "부도율", "num_format": "0.00%"})
|
||||
chart.set_x_axis({"name": "신용등급"})
|
||||
chart.set_size({"width": 750, "height": 400})
|
||||
chart.set_style(10)
|
||||
ws2.insert_chart(f"B{r+2}", chart)
|
||||
|
||||
# ==========================================================
|
||||
# Sheet 3: Company Detail (종목별 상세)
|
||||
# ==========================================================
|
||||
ws3 = wb.add_worksheet("Company Detail")
|
||||
ws3.hide_gridlines(2)
|
||||
ws3.set_tab_color("#2E7D32")
|
||||
ws3.freeze_panes(3, 3) # 3행, 3열 고정
|
||||
|
||||
# 열 너비
|
||||
col_widths = {"A": 2, "B": 9, "C": 16, "D": 7, "E": 7, "F": 9,
|
||||
"G": 7, "H": 14, "I": 14, "J": 14, "K": 10,
|
||||
"L": 10, "M": 10, "N": 8}
|
||||
for col, w in col_widths.items():
|
||||
ws3.set_column(f"{col}:{col}", w)
|
||||
|
||||
r = 1
|
||||
ws3.merge_range(r, 1, r, 13, f"종목별 Shadow Rating 상세 ({len(companies):,}개 종목)", title_fmt)
|
||||
r += 1
|
||||
|
||||
# 헤더
|
||||
headers = ["종목코드", "종목명", "등급", "DD", "EDF(%)",
|
||||
"σ_E", "총자산(억)", "부채(억)", "자본(억)", "레버리지",
|
||||
"ROA(%)", "이자보상", "Solver"]
|
||||
for i, h in enumerate(headers):
|
||||
ws3.write(r, i+1, h, header_fmt)
|
||||
r += 1
|
||||
|
||||
for _, row in companies.iterrows():
|
||||
rating = str(row.get("shadow_rating", "NR"))
|
||||
rfmt = rating_fmts.get(rating, cell_center)
|
||||
dd = row["DD"]
|
||||
edf = row["EDF"]
|
||||
|
||||
# DD에 따른 조건부 색상
|
||||
if dd >= 3.5:
|
||||
dd_fmt = good_fmt
|
||||
elif dd >= 2.0:
|
||||
dd_fmt = warn_fmt
|
||||
else:
|
||||
dd_fmt = bad_fmt
|
||||
|
||||
ws3.write(r, 1, row["ticker"], cell_center)
|
||||
ws3.write(r, 2, row["name"][:15] if row["name"] else "", cell_fmt)
|
||||
ws3.write(r, 3, rating, rfmt)
|
||||
ws3.write(r, 4, dd, dec2_fmt)
|
||||
ws3.write(r, 5, edf * 100 if pd.notna(edf) else None, dec4_fmt)
|
||||
ws3.write(r, 6, row["sigma_E"], dec4_fmt)
|
||||
ws3.write(r, 7, row["total_assets"] / 1e8 if pd.notna(row["total_assets"]) else None, num_fmt)
|
||||
ws3.write(r, 8, row["total_liabilities"] / 1e8 if pd.notna(row["total_liabilities"]) else None, num_fmt)
|
||||
ws3.write(r, 9, row["total_equity"] / 1e8 if pd.notna(row["total_equity"]) else None, num_fmt)
|
||||
ws3.write(r, 10, row["leverage_ratio"], dec2_fmt)
|
||||
ws3.write(r, 11, row["roa"] * 100 if pd.notna(row["roa"]) else None, dec2_fmt)
|
||||
ws3.write(r, 12, row["interest_coverage"], dec2_fmt)
|
||||
ws3.write(r, 13, row["method"], cell_center)
|
||||
r += 1
|
||||
|
||||
# 조건부 서식 (DD 컬럼 전체)
|
||||
data_rows = len(companies)
|
||||
ws3.conditional_format(3, 4, 3 + data_rows, 4, {
|
||||
"type": "3_color_scale",
|
||||
"min_color": "#FFCDD2",
|
||||
"mid_color": "#FFF9C4",
|
||||
"max_color": "#C8E6C9",
|
||||
})
|
||||
|
||||
# 자동 필터
|
||||
ws3.autofilter(2, 1, 2 + data_rows, 13)
|
||||
|
||||
# ==========================================================
|
||||
# Sheet 4: Data Quality (데이터 품질)
|
||||
# ==========================================================
|
||||
ws4 = wb.add_worksheet("Data Quality")
|
||||
ws4.hide_gridlines(2)
|
||||
ws4.set_column("A:A", 3)
|
||||
ws4.set_column("B:B", 22)
|
||||
ws4.set_column("C:C", 15)
|
||||
ws4.set_column("D:D", 45)
|
||||
ws4.set_tab_color("#78909C")
|
||||
|
||||
r = 1
|
||||
ws4.merge_range(r, 1, r, 3, "데이터 품질 검증 결과", title_fmt)
|
||||
r += 2
|
||||
|
||||
ws4.merge_range(r, 1, r, 3, "파이프라인 통계", subtitle_fmt); r += 1
|
||||
pipe_stats = [
|
||||
("DART 상장기업", f"{stats['total_listed']:,}개", "OpenDartReader corp_codes 기준"),
|
||||
("KRX 주가 수집", f"{stats['market_data']:,}개", "120거래일 이상 데이터 보유 종목"),
|
||||
("DART 재무제표", f"{stats['financial_data']:,}개", "2024년 연결/개별재무제표"),
|
||||
("비정상 종목 제외", f"{stats['excluded']:,}개", "SPAC(62) + 리츠(26) + 금융업 등"),
|
||||
("최종 분석 대상", f"{stats['merton_results']:,}개", "Merton DD/EDF 산출 완료"),
|
||||
]
|
||||
for label, value, desc in pipe_stats:
|
||||
ws4.write(r, 1, label, cell_fmt)
|
||||
ws4.write(r, 2, value, cell_center)
|
||||
ws4.write(r, 3, desc, cell_fmt)
|
||||
r += 1
|
||||
|
||||
r += 1
|
||||
ws4.merge_range(r, 1, r, 3, "Merton 산출 품질", subtitle_fmt); r += 1
|
||||
|
||||
n_converged = (companies["method"] == "fsolve").sum() + (companies["method"] == "iterative").sum()
|
||||
n_fallback = (companies["method"] == "naive_fallback").sum()
|
||||
|
||||
quality_stats = [
|
||||
("fsolve 수렴", f"{n_converged:,}개", f"전체의 {n_converged/len(companies)*100:.1f}%"),
|
||||
("naive_fallback", f"{n_fallback:,}개", "Merton 미수렴 → Bharath-Shumway 간편 DD"),
|
||||
("DD 평균 / 중앙값", f"{companies['DD'].mean():.2f} / {companies['DD'].median():.2f}", "캡핑 후 [-5, 15] 범위"),
|
||||
("EDF 평균 / 중앙값", f"{companies['EDF'].mean():.4%} / {companies['EDF'].median():.4%}", "이론적 부도확률"),
|
||||
("변동성 평균", f"{companies['sigma_E'].mean():.2%}", "연환산 주가 변동성"),
|
||||
]
|
||||
for label, value, desc in quality_stats:
|
||||
ws4.write(r, 1, label, cell_fmt)
|
||||
ws4.write(r, 2, value, cell_center)
|
||||
ws4.write(r, 3, desc, cell_fmt)
|
||||
r += 1
|
||||
|
||||
# 등급 분포 pie chart
|
||||
r += 1
|
||||
ws4.merge_range(r, 1, r, 3, "등급 분포", subtitle_fmt); r += 1
|
||||
|
||||
ig_count = len(companies[companies["shadow_rating"].isin(
|
||||
["AAA","AA+","AA","AA-","A+","A","A-","BBB+","BBB","BBB-"]
|
||||
)])
|
||||
sg_count = len(companies) - ig_count
|
||||
|
||||
ws4.write(r, 1, "투자등급 (AAA~BBB-)", cell_fmt)
|
||||
ws4.write(r, 2, f"{ig_count:,}개 ({ig_count/len(companies)*100:.1f}%)", good_fmt)
|
||||
r += 1
|
||||
ws4.write(r, 1, "투기등급 (BB+~CCC-)", cell_fmt)
|
||||
ws4.write(r, 2, f"{sg_count:,}개 ({sg_count/len(companies)*100:.1f}%)", bad_fmt)
|
||||
|
||||
# ---- 완료 ----
|
||||
wb.close()
|
||||
print(f"[Report] Excel saved: {output_path}")
|
||||
print(f" - 4 sheets: Overview, Default Rates, Company Detail ({len(companies):,} rows), Data Quality")
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description="EDF Excel 리포트 생성")
|
||||
parser.add_argument("--output", default=None, help="출력 파일 경로")
|
||||
args = parser.parse_args()
|
||||
|
||||
output_dir = Path(__file__).parent.parent.parent / "outputs"
|
||||
output_dir.mkdir(exist_ok=True)
|
||||
|
||||
if args.output:
|
||||
output_path = args.output
|
||||
else:
|
||||
output_path = str(output_dir / f"EDF_Report_{datetime.now().strftime('%Y%m%d')}.xlsx")
|
||||
|
||||
create_report(output_path)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -1,34 +1,6 @@
|
||||
"""DB stats and sample data"""
|
||||
import sqlite3
|
||||
|
||||
conn = sqlite3.connect("data/edf.db")
|
||||
|
||||
print("=== DB Stats ===")
|
||||
for t in ["companies", "market_data", "financial_data", "volatility", "merton_results"]:
|
||||
c = conn.execute(f"SELECT COUNT(*) FROM {t}").fetchone()[0]
|
||||
print(f" {t}: {c:,}")
|
||||
|
||||
print("\n=== Volatility sample ===")
|
||||
rows = conn.execute("SELECT ticker, sigma_E FROM volatility ORDER BY ticker LIMIT 5").fetchall()
|
||||
for r in rows:
|
||||
print(f" {r[0]}: sigma_E={r[1]:.4f}")
|
||||
|
||||
print("\n=== Financial sample ===")
|
||||
rows = conn.execute("""
|
||||
SELECT ticker, total_assets, default_point, leverage_ratio
|
||||
FROM financial_data WHERE total_assets IS NOT NULL
|
||||
ORDER BY total_assets DESC LIMIT 5
|
||||
""").fetchall()
|
||||
for r in rows:
|
||||
dp = f"{r[2]:,.0f}" if r[2] else "N/A"
|
||||
lev = f"{r[3]:.3f}" if r[3] else "N/A"
|
||||
print(f" {r[0]}: TA={r[1]:,.0f} DP={dp} LEV={lev}")
|
||||
|
||||
# Overlap: tickers with BOTH volatility AND financial_data
|
||||
both = conn.execute("""
|
||||
SELECT COUNT(DISTINCT v.ticker)
|
||||
FROM volatility v JOIN financial_data f ON v.ticker = f.ticker
|
||||
""").fetchone()[0]
|
||||
print(f"\n=== Merton 산출 가능 종목 (KRX+DART 모두 있는 종목): {both} ===")
|
||||
|
||||
conn.close()
|
||||
"""Verify Excel file"""
|
||||
import openpyxl
|
||||
wb = openpyxl.load_workbook("outputs/EDF_Report_20260312.xlsx")
|
||||
for s in wb.sheetnames:
|
||||
ws = wb[s]
|
||||
print(f" {s}: {ws.max_row} rows x {ws.max_column} cols")
|
||||
|
||||
Reference in New Issue
Block a user