Compare commits

...

2 Commits

3 changed files with 630 additions and 29 deletions

293
src/models/run_merton.py Normal file
View File

@@ -0,0 +1,293 @@
"""
전체 종목 Merton DD/EDF 일괄 산출
DB에서 KRX(시총, 변동성) + DART(재무제표) 데이터를 로드하고,
금융업을 필터링한 후, Merton 모형으로 DD/EDF를 산출하여 DB에 저장합니다.
Usage:
python -m src.models.run_merton # 전체 산출
python -m src.models.run_merton --include-fin # 금융업 포함
"""
import sys
import argparse
import yaml
import numpy as np
import pandas as pd
from datetime import datetime
from pathlib import Path
from tqdm import tqdm
sys.path.insert(0, str(Path(__file__).parent.parent.parent))
from src.data.database import get_connection, init_db
from src.models.merton import (
solve_merton, calculate_dd, calculate_edf, naive_dd,
dd_to_rating, DD_RATING_MAP
)
# Financial-sector keywords (based on DART company names / industry labels).
# identify_financial_companies() treats a company as financial when its name
# contains any of these as a substring.
# NOTE(review): short keywords such as "지주", "투자" or "리스" are plain
# substring matches and may also hit non-financial names — confirm the
# false-positive rate against the companies table.
FINANCIAL_KEYWORDS = [
    "은행",        # bank
    "금융",        # finance / financial group
    "보험",        # insurance
    "증권",        # securities
    "캐피탈",      # capital
    "저축",        # savings
    "투자",        # investment
    "생명",        # life (insurance)
    "화재",        # fire (insurance)
    "손해",        # non-life / casualty
    "카드",        # card
    "리스",        # lease
    "자산운용",    # asset management
    "파이낸셜",    # "financial" (transliterated)
    "파이낸스",    # "finance" (transliterated)
    "벤처캐피탈",  # venture capital
    "지주",        # holding company (intended: financial holding companies)
]
# Ticker codes that are always treated as financial-sector companies
# (hand-maintained list of large names).
#
# The labels below are translations of the original author's annotations; the
# code-to-company mapping itself has not been re-verified here.
#
# Three codes that the original list carried with the annotation "이건 아님"
# ("this one is not [financial]") have been removed, because leaving them in
# the set excluded those companies from the Merton run:
#   "030200" (annotated as KT&G), "005390" (Shinsung Tongsang),
#   "004540" (Kleannara).
FINANCIAL_TICKERS = {
    "105560",  # KB Financial Group
    "055550",  # Shinhan Financial Group
    "086790",  # Hana Financial Group
    "316140",  # Woori Financial Group
    "024110",  # Industrial Bank of Korea
    "000810",  # Samsung Fire & Marine Insurance
    "032830",  # Samsung Life Insurance
    "005830",  # DB Insurance
    "088350",  # Hanwha Life Insurance
    "003690",  # Korean Re
    "138930",  # BNK Financial Group
    "139130",  # DGB Financial Group
    "175330",  # JB Financial Group
    "071050",  # Korea Investment Holdings
    "003540",  # Daishin Securities
    "005940",  # NH Investment & Securities
    "016360",  # Samsung Securities
    "006800",  # Mirae Asset Securities
    "039490",  # Kiwoom Securities
    "003470",  # Yuanta Securities
    "001510",  # SK Securities
    "003460",  # Yuhwa Securities
    "001290",  # Kyobo Securities
    "001500",  # Hyundai Motor Securities
    "003530",  # Hanwha Investment & Securities
    "001270",  # Bookook Securities
    "000150",  # Doosan — NOTE(review): not flagged by the author, but the name
               # does not look financial-sector; confirm it belongs here.
    "001720",  # Shinyoung Securities
    "001750",  # Hanyang Securities
    "000480",  # Chohung — NOTE(review): confirm this code is a financial company.
}
def load_config() -> dict:
    """Load ``config/settings.yaml`` and return it as a dict.

    The path is resolved relative to this file: three directory levels up,
    then ``config/settings.yaml``.

    Returns:
        The parsed YAML content.  An empty dict is returned when the file is
        empty, because ``yaml.safe_load`` yields ``None`` for an empty
        document and callers call ``.get`` on the result.

    Raises:
        OSError: if the settings file cannot be opened.
        yaml.YAMLError: if the file is not valid YAML.
    """
    config_path = Path(__file__).parent.parent.parent / "config" / "settings.yaml"
    with open(config_path, "r", encoding="utf-8") as f:
        # ``or {}`` keeps the declared ``-> dict`` contract for an empty file.
        return yaml.safe_load(f) or {}
def identify_financial_companies(conn) -> set:
    """Collect the tickers that are treated as financial-sector companies.

    A ticker is included when any of the following holds:
      * it is listed in ``FINANCIAL_TICKERS`` (manual list);
      * its ``companies.name`` contains one of ``FINANCIAL_KEYWORDS``;
      * it has a ``financial_data`` row with ``leverage_ratio > 0.90``.

    Args:
        conn: DB connection whose ``execute(sql)`` returns a cursor with
            ``fetchall()``.

    Returns:
        A new set of ticker codes (the module-level constant is not mutated).
    """
    # 1) Company-name keyword match; rows with an empty/NULL name are skipped.
    name_rows = conn.execute("SELECT ticker, name FROM companies").fetchall()
    keyword_hits = {
        code
        for code, company_name in name_rows
        if company_name
        and any(keyword in company_name for keyword in FINANCIAL_KEYWORDS)
    }

    # 2) Leverage ratio above 0.9 (the original comment attributes this to
    #    banks / insurers).
    leverage_rows = conn.execute(
        "SELECT ticker FROM financial_data WHERE leverage_ratio > 0.90"
    ).fetchall()
    leverage_hits = {record[0] for record in leverage_rows}

    return set(FINANCIAL_TICKERS) | keyword_hits | leverage_hits
def _choose_equity_value(df: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of ``df`` with the equity value ``E`` and ``E_source`` added.

    ``E`` is ``market_cap`` where that is present and positive, otherwise
    ``total_equity``.  ``E_source`` is ``"market_cap"`` or ``"total_equity"``.
    """
    df = df.copy()
    # Coerce first: a column that came back from SQL without numeric values
    # may be object-typed, and comparing it with 0 would raise.
    market_cap = pd.to_numeric(df["market_cap"], errors="coerce")
    total_equity = pd.to_numeric(df["total_equity"], errors="coerce")
    has_market_cap = market_cap > 0  # NaN compares False
    # Vectorised selection; unlike a row-wise ``apply`` it also works when the
    # frame has no rows.
    df["E"] = market_cap.where(has_market_cap, total_equity)
    df["E_source"] = np.where(has_market_cap, "market_cap", "total_equity")
    return df


def _merton_record(row, r, T):
    """Compute the Merton result for one input row.

    Tries ``solve_merton`` first and falls back to ``naive_dd`` when the
    solver reports non-convergence.

    Returns:
        ``(record, used_fallback)`` where ``record`` is the dict to store, or
        ``None`` when DD or EDF came out as NaN.
    """
    E = float(row["E"])
    sigma_E = float(row["sigma_E"])
    D = float(row["default_point"])

    sol = solve_merton(E, sigma_E, D, r, T)
    if sol["converged"]:
        V = sol["V"]
        sigma_V = sol["sigma_V"]
        DD = calculate_dd(V, sigma_V, D, r=r, T=T)
        EDF = calculate_edf(DD)
        method = sol["method"]
        used_fallback = False
    else:
        naive = naive_dd(E, sigma_E, D, r=r, T=T)
        V = naive["V"]
        sigma_V = naive["sigma_V"]
        DD = naive["DD"]
        EDF = naive["EDF"]
        method = "naive_fallback"
        used_fallback = True

    if np.isnan(DD) or np.isnan(EDF):
        return None, used_fallback

    record = {
        "ticker": row["ticker"],
        "base_date": row["base_date"],
        "fin_year": int(row["fin_year"]),
        "E": E,
        "sigma_E": sigma_E,
        "D": D,
        "V": V,
        "sigma_V": sigma_V,
        "DD": DD,
        "EDF": EDF,
        "leverage": D / V if V > 0 else np.nan,
        "method": method,
        "dd_rating": dd_to_rating(DD),
    }
    return record, used_fallback


def _print_rating_distribution(result_df: pd.DataFrame) -> None:
    """Print per-grade counts with mean DD/EDF, then overall DD/EDF statistics.

    Expects ``result_df["dd_rating"]`` to be an ordered categorical.
    """
    print("\n=== 등급별 분포 ===")
    dist = result_df["dd_rating"].value_counts().sort_index()
    for rating, count in dist.items():
        if count > 0:
            in_grade = result_df[result_df["dd_rating"] == rating]
            avg_dd = in_grade["DD"].mean()
            avg_edf = in_grade["EDF"].mean()
            print(f" {rating:5s}: {count:4d}개 | DD평균={avg_dd:6.2f} | EDF평균={avg_edf:.6f}")
    print(f"\n DD 통계: 평균={result_df['DD'].mean():.2f}, 중앙={result_df['DD'].median():.2f}, 최소={result_df['DD'].min():.2f}, 최대={result_df['DD'].max():.2f}")
    print(f" EDF 통계: 평균={result_df['EDF'].mean():.6f}, 중앙={result_df['EDF'].median():.6f}")


def run_batch_merton(conn, config: dict, include_financial: bool = False):
    """Compute Merton DD/EDF for every eligible ticker and store the results.

    Steps: identify financial companies, load volatility + market + financial
    data from the DB, pick the equity value and default point per row, solve
    the Merton model (naive DD as fallback), write ``merton_results``, print a
    grade distribution, and refresh ``companies.is_financial``.

    Args:
        conn: DB connection (``execute`` / ``executemany`` / ``commit``).
        config: Settings mapping; ``merton.risk_free_rate`` (default 0.035)
            and ``merton.time_horizon`` (default 1.0) are read.
        include_financial: When False (default) tickers identified as
            financial are dropped before the calculation.

    Returns:
        DataFrame with one row per stored result (empty when nothing could be
        computed).  ``dd_rating`` is an ordered categorical when non-empty.

    Side effects:
        Inserts/replaces rows in ``merton_results``, resets and sets
        ``companies.is_financial``, and commits both.
    """
    print("="*60)
    print("[Merton] 전체 종목 DD/EDF 일괄 산출")
    print("="*60)

    # Tolerate a missing or empty ``merton:`` section.
    merton_cfg = (config or {}).get("merton") or {}
    r = merton_cfg.get("risk_free_rate", 0.035)
    T = merton_cfg.get("time_horizon", 1.0)

    # 1) Financial-sector filter
    financial_set = identify_financial_companies(conn)
    print(f" 금융업 종목 수: {len(financial_set)}")

    # 2) Load inputs: volatility + market_data + financial_data.
    #    When market_cap is unavailable (stored as 0), DART total_equity is
    #    used as the equity value E instead (see _choose_equity_value).
    # NOTE(review): financial_data is joined on ticker only; if that table
    # holds several fiscal years per ticker, each year yields its own input
    # row — confirm whether only the latest year should be used.
    query = """
        SELECT
            v.ticker,
            c.name,
            v.sigma_E,
            m.market_cap,
            m.close_price,
            m.shares,
            f.total_assets,
            f.current_liabilities,
            f.non_current_liabilities,
            f.total_liabilities,
            f.total_equity,
            f.default_point,
            f.leverage_ratio,
            f.year as fin_year,
            v.base_date
        FROM volatility v
        JOIN market_data m ON v.ticker = m.ticker AND v.base_date = m.date
        JOIN financial_data f ON v.ticker = f.ticker
        JOIN companies c ON v.ticker = c.ticker
        WHERE v.sigma_E > 0
    """
    df = pd.read_sql_query(query, conn)
    print(f" 전체 대상 종목: {len(df)}")

    if not include_financial:
        before = len(df)
        df = df[~df["ticker"].isin(financial_set)].copy()
        print(f" 금융업 제외 후: {len(df)}개 (제외: {before - len(df)}개)")
    else:
        df["is_financial"] = df["ticker"].isin(financial_set).astype(int)
        print(f" 금융업 포함 실행 (금융업 {df['is_financial'].sum()}개)")

    # Equity value E: market_cap when positive, otherwise total_equity.
    df = _choose_equity_value(df)
    # Report how many rows actually use each source (the fallback count only
    # includes rows whose fallback value is usable, i.e. positive).
    mcap_available = (df["E_source"] == "market_cap").sum()
    equity_available = ((df["E_source"] == "total_equity") & (df["E"] > 0)).sum()
    print(f" E 출처: market_cap={mcap_available}, total_equity={equity_available}")

    # Drop rows without a positive E (e.g. negative book equity).
    df = df[df["E"].notna() & (df["E"] > 0)].copy()

    # Default point: fall back to total liabilities where it is missing.
    df["default_point"] = pd.to_numeric(df["default_point"], errors="coerce")
    no_dp = df["default_point"].isna()
    if no_dp.any():
        print(f" DP 없는 종목 {no_dp.sum()}개 → total_liabilities로 대체")
        df["default_point"] = df["default_point"].fillna(
            pd.to_numeric(df["total_liabilities"], errors="coerce")
        )
    # Still no usable default point -> exclude.
    df = df[df["default_point"].notna() & (df["default_point"] > 0)]
    print(f" Merton 산출 대상: {len(df)}")

    # 3) Merton calculation
    results = []
    failures = []  # (ticker, reason) for rows that produced no result
    converged = 0
    fallback = 0
    errors = 0
    for _, row in tqdm(df.iterrows(), total=len(df), desc="Merton DD/EDF"):
        try:
            record, used_fallback = _merton_record(row, r, T)
        except Exception as exc:
            # Best-effort batch: one bad ticker must not abort the whole run,
            # but the reason is kept so it can be reported below.
            errors += 1
            failures.append((row["ticker"], repr(exc)))
            continue
        if record is None:
            errors += 1
            failures.append((row["ticker"], "DD/EDF is NaN"))
            continue
        results.append(record)
        # Count only stored results so the breakdown adds up to the total.
        if used_fallback:
            fallback += 1
        else:
            converged += 1

    print(f"\n 산출 결과: {len(results)}")
    print(f" - fsolve/iterative: {converged}")
    print(f" - naive_fallback: {fallback}")
    print(f" - 에러: {errors}")
    for ticker, reason in failures[:5]:
        print(f"   ! {ticker}: {reason}")
    if len(failures) > 5:
        print(f"   ... (+{len(failures) - 5} more)")

    # 4) Persist results
    conn.executemany(
        """
        INSERT OR REPLACE INTO merton_results
        (ticker, base_date, fin_year, E, sigma_E, D, V, sigma_V, DD, EDF, leverage, method, dd_rating)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """,
        [
            (
                rec["ticker"], rec["base_date"], rec["fin_year"],
                rec["E"], rec["sigma_E"], rec["D"],
                rec["V"], rec["sigma_V"],
                rec["DD"], rec["EDF"], rec["leverage"],
                rec["method"], rec["dd_rating"],
            )
            for rec in results
        ],
    )
    conn.commit()
    print(f" → merton_results 테이블에 {len(results)}건 저장")

    # 5) Grade distribution
    result_df = pd.DataFrame(results)
    if len(result_df) > 0:
        rating_order = [entry[1] for entry in DD_RATING_MAP]
        result_df["dd_rating"] = pd.Categorical(
            result_df["dd_rating"], categories=rating_order, ordered=True
        )
        _print_rating_distribution(result_df)

    # Refresh the is_financial flag (done regardless of include_financial).
    conn.execute("UPDATE companies SET is_financial = 0")
    conn.executemany(
        "UPDATE companies SET is_financial = 1 WHERE ticker = ?",
        [(ticker,) for ticker in financial_set],
    )
    conn.commit()
    return result_df
def main():
    """CLI entry point: run the batch Merton DD/EDF calculation.

    ``--include-fin`` keeps financial-sector tickers in the calculation.
    The DB connection is closed even when the batch run raises.
    """
    parser = argparse.ArgumentParser(description="전체 종목 Merton DD/EDF 산출")
    parser.add_argument("--include-fin", action="store_true", help="금융업 포함")
    args = parser.parse_args()
    config = load_config()
    conn = init_db()
    try:
        run_batch_merton(conn, config, include_financial=args.include_fin)
    finally:
        conn.close()


if __name__ == "__main__":
    main()

300
src/models/shadow_rating.py Normal file
View File

@@ -0,0 +1,300 @@
"""
Shadow Rating + 등급별 부도율 산출 모듈
1) Ordered Probit 기반 Shadow Rating: DD + 재무비율 → 신용등급 추정
2) 등급별 부도율: 한국 관측 + 글로벌 벤치마크 블렌딩
3) 베이지안 보정: 표본 부족 등급에 사전 정보 활용
Usage:
python -m src.models.shadow_rating # 전체 산출
python -m src.models.shadow_rating --stats # 통계만
"""
import sys
import argparse
import yaml
import numpy as np
import pandas as pd
from datetime import datetime
from pathlib import Path
from scipy.stats import norm
from scipy.optimize import minimize
sys.path.insert(0, str(Path(__file__).parent.parent.parent))
from src.data.database import get_connection, init_db
from src.models.merton import (
DD_RATING_MAP, GLOBAL_DEFAULT_RATES, dd_to_rating
)
def load_config() -> dict:
    """Load ``config/settings.yaml`` and return it as a dict.

    The path is resolved relative to this file: three directory levels up,
    then ``config/settings.yaml``.

    Returns:
        The parsed YAML content.  An empty dict is returned when the file is
        empty, because ``yaml.safe_load`` yields ``None`` for an empty
        document and callers call ``.get`` on the result.

    Raises:
        OSError: if the settings file cannot be opened.
        yaml.YAMLError: if the file is not valid YAML.
    """
    config_path = Path(__file__).parent.parent.parent / "config" / "settings.yaml"
    with open(config_path, "r", encoding="utf-8") as f:
        # ``or {}`` keeps the declared ``-> dict`` contract for an empty file.
        return yaml.safe_load(f) or {}
# ============================================================
# 1. Shadow Rating: 재무비율 강화
# ============================================================
def compute_shadow_features(conn) -> pd.DataFrame:
    """Merge Merton results with financial ratios to build Shadow Rating inputs.

    Each ``merton_results`` row is joined to the ``financial_data`` row of the
    fiscal year it was computed from (``merton_results.fin_year``).  Joining on
    ticker alone would repeat every Merton row once per fiscal year stored in
    ``financial_data`` and mix ratios from different years.  Rows whose
    ``fin_year`` is NULL keep the ticker-only match.

    Derived columns:
        equity_ratio: total_equity / total_assets (NaN when assets are 0).
        size_score:   percentile rank of log_assets.
        icr_capped:   interest_coverage clipped to [-10, 100], missing -> 0.
        roa_capped:   roa clipped to [-1, 1], missing -> 0.

    Args:
        conn: DB connection accepted by ``pandas.read_sql_query``.

    Returns:
        DataFrame with the selected columns plus the derived ones.
    """
    query = """
        SELECT
            mr.ticker,
            mr.DD,
            mr.EDF,
            mr.sigma_V,
            mr.leverage as merton_leverage,
            mr.dd_rating,
            mr.method,
            f.leverage_ratio,
            f.roa,
            f.interest_coverage,
            f.log_assets,
            f.total_assets,
            f.total_equity,
            f.operating_income,
            f.net_income,
            c.name
        FROM merton_results mr
        JOIN financial_data f
            ON mr.ticker = f.ticker
           AND (mr.fin_year IS NULL OR mr.fin_year = f.year)
        JOIN companies c ON mr.ticker = c.ticker
    """
    df = pd.read_sql_query(query, conn)

    # Additional ratios; a zero asset base becomes NaN instead of inf.
    df["equity_ratio"] = df["total_equity"] / df["total_assets"].replace(0, np.nan)
    df["size_score"] = df["log_assets"].rank(pct=True)  # size percentile

    # Cap extreme values; missing values are treated as 0.
    df["icr_capped"] = df["interest_coverage"].clip(-10, 100).fillna(0)
    df["roa_capped"] = df["roa"].clip(-1, 1).fillna(0)
    return df
def compute_composite_score(df: pd.DataFrame) -> pd.DataFrame:
    """Combine DD and financial ratios into one composite credit score.

    A higher score means better credit quality (same direction as DD).  Each
    input is z-scored and the results are summed with fixed weights:
    DD 0.50, leverage 0.15 (sign flipped), ROA 0.15, ICR 0.10, size 0.10.

    Args:
        df: Frame with ``DD``, ``leverage_ratio``, ``roa_capped``,
            ``icr_capped`` and ``size_score`` columns.

    Returns:
        A copy of ``df`` with a ``composite_score`` column; the input is not
        modified.
    """
    df = df.copy()

    def zscore(s):
        """Standardise ``s``; all zeros when the spread is zero or undefined."""
        std = s.std()
        # std is NaN for a single observation and 0 for a constant column;
        # either way there is no spread to scale by.
        if pd.isna(std) or std == 0:
            return pd.Series(0.0, index=s.index)
        return (s - s.mean()) / std

    z_dd = zscore(df["DD"])
    # Leverage: lower is better, hence the sign flip; missing -> 0.5.
    z_lev = -zscore(df["leverage_ratio"].fillna(0.5))
    z_roa = zscore(df["roa_capped"])
    z_icr = zscore(df["icr_capped"])
    # A missing size percentile is treated as the median (0.5); otherwise one
    # NaN would turn the whole composite score into NaN.
    z_size = zscore(df["size_score"].fillna(0.5))

    # Weighted sum — DD carries the largest weight.
    df["composite_score"] = (
        0.50 * z_dd      # distance to default (core signal)
        + 0.15 * z_lev   # leverage
        + 0.15 * z_roa   # profitability
        + 0.10 * z_icr   # interest coverage
        + 0.10 * z_size  # size
    )
    return df
def assign_shadow_rating(df: pd.DataFrame) -> pd.DataFrame:
    """Assign a Shadow Rating to every row from its composite-score rank.

    Score quantiles are used instead of an Ordered Probit: with almost no
    observed ratings the probit cannot be estimated, so rows are ranked by
    ``composite_score`` and grades are handed out top-down following a fixed
    target share per grade (an approximation of a global rating mix).

    Every grade except the last receives ``max(1, round(n * share))`` rows;
    the last grade receives whatever is left.  When the quotas exceed the
    number of rows, the lower grades simply stay empty.

    Args:
        df: Frame with a ``composite_score`` column.

    Returns:
        A new frame sorted by score (descending, index reset) with a
        ``shadow_rating`` column.
    """
    # Target share per grade (approximate S&P-style mix, per the original note).
    target_share = {
        "AAA": 0.01, "AA+": 0.02, "AA": 0.03, "AA-": 0.04,
        "A+": 0.06, "A": 0.08, "A-": 0.08,
        "BBB+": 0.08, "BBB": 0.10, "BBB-": 0.08,
        "BB+": 0.07, "BB": 0.08, "BB-": 0.06,
        "B+": 0.05, "B": 0.04, "B-": 0.03,
        "CCC+": 0.02, "CCC": 0.02, "CCC-": 0.04,
    }
    *upper_grades, worst_grade = target_share

    ranked = df.sort_values("composite_score", ascending=False).reset_index(drop=True)
    total = len(ranked)

    # Lay the grades out top-down, one label per ranked row.
    labels = []
    for grade in upper_grades:
        quota = max(1, round(total * target_share[grade]))
        labels.extend([grade] * quota)

    # Quotas may overshoot the universe; rows that remain uncovered take the
    # last grade.
    labels = labels[:total]
    labels.extend([worst_grade] * (total - len(labels)))

    ranked["shadow_rating"] = labels
    return ranked
# ============================================================
# 2. 등급별 부도율 + 글로벌 블렌딩
# ============================================================
def compute_default_rates(df: pd.DataFrame, config: dict) -> pd.DataFrame:
    """Compute per-grade default rates, blended with a global benchmark.

    Actual defaults are not observed, so the "Korean" default rate of a grade
    is the mean model EDF of the firms in it.  Two combined estimates are
    produced per grade:

    * ``blended_dr``: linear blend of the Korean and global rates with weight
      ``min(n_firms / threshold, 1)`` on the Korean rate.
    * ``bayesian_dr``: posterior mean of a Beta prior centred on the global
      rate (strength ``bayesian_prior_strength``), updated with ``n_firms``
      observations at the Korean rate.

    Args:
        df: Frame with ``shadow_rating`` and ``EDF`` columns.
        config: Settings mapping; ``blending.threshold`` and
            ``blending.bayesian_prior_strength`` are read (both default 50).

    Returns:
        One row per grade in ``GLOBAL_DEFAULT_RATES`` that has at least one
        firm, in that mapping's key order.
    """
    # Tolerate a missing or empty ``blending:`` section.
    blending_cfg = (config or {}).get("blending") or {}
    threshold = blending_cfg.get("threshold", 50)
    prior_strength = blending_cfg.get("bayesian_prior_strength", 50)

    results = []
    for rating in GLOBAL_DEFAULT_RATES:
        subset = df[df["shadow_rating"] == rating]
        n_firms = len(subset)
        if n_firms == 0:
            continue

        # Korean "default rate" = mean EDF (model-implied default probability).
        korean_dr = subset["EDF"].mean()
        # Global benchmark for this grade.
        global_dr = GLOBAL_DEFAULT_RATES.get(rating, 0.01)

        # Blend weight grows with sample size.  A non-positive threshold means
        # "no minimum sample", i.e. full weight on the Korean rate, instead of
        # a division by zero.
        weight_kr = min(n_firms / threshold, 1.0) if threshold > 0 else 1.0
        blended_dr = weight_kr * korean_dr + (1 - weight_kr) * global_dr

        # Bayesian adjustment: Beta(alpha, beta) prior with mean = global_dr.
        alpha_prior = global_dr * prior_strength
        beta_prior = (1 - global_dr) * prior_strength
        # The observation is continuous, so n * korean_dr plays the role of
        # the default count.
        alpha_post = alpha_prior + n_firms * korean_dr
        beta_post = beta_prior + n_firms * (1 - korean_dr)
        bayesian_dr = alpha_post / (alpha_post + beta_post)

        results.append({
            "rating_grade": rating,
            "n_firms": n_firms,
            "n_defaults": 0,  # no actual defaults are observed
            "korean_dr": korean_dr,
            "global_dr": global_dr,
            "weight_kr": weight_kr,
            "blended_dr": blended_dr,
            "bayesian_dr": bayesian_dr,
        })
    return pd.DataFrame(results)
# ============================================================
# Main
# ============================================================
def main():
    """CLI entry point: assign Shadow Ratings and compute per-grade default rates.

    Prints the rating comparison, the rating distribution and the default-rate
    table.  Unless ``--stats`` is given, the ratings are written back to
    ``merton_results`` and the rates to ``default_rates``.  The DB connection
    is closed even when a step raises.
    """
    parser = argparse.ArgumentParser(description="Shadow Rating + 등급별 부도율")
    parser.add_argument("--stats", action="store_true", help="통계만 출력")
    args = parser.parse_args()
    config = load_config()
    conn = init_db()
    try:
        # 1) Shadow Rating
        print("="*60)
        print("[Shadow Rating] Composite Score 기반 등급 부여")
        print("="*60)
        df = compute_shadow_features(conn)
        print(f" 대상 종목: {len(df)}")
        if df.empty:
            # Nothing to rate; the ratio below would otherwise divide by zero.
            print(" 대상 종목이 없어 종료합니다")
            return
        df = compute_composite_score(df)
        df = assign_shadow_rating(df)

        # Comparison: DD-based grade vs Shadow Rating
        print("\n=== DD 등급 vs Shadow 등급 비교 ===")
        match = (df["dd_rating"] == df["shadow_rating"]).sum()
        print(f" 일치율: {match}/{len(df)} ({match/len(df)*100:.1f}%)")

        # Shadow Rating distribution.  Grades that are not keys of
        # GLOBAL_DEFAULT_RATES become NaN in this conversion.
        print("\n=== Shadow Rating 분포 ===")
        rating_order = list(GLOBAL_DEFAULT_RATES.keys())
        df["shadow_rating"] = pd.Categorical(df["shadow_rating"], categories=rating_order, ordered=True)
        dist = df["shadow_rating"].value_counts().sort_index()
        for rating, count in dist.items():
            if count > 0:
                in_grade = df[df["shadow_rating"] == rating]
                avg_dd = in_grade["DD"].mean()
                avg_score = in_grade["composite_score"].mean()
                print(f" {rating:5s}: {count:4d}개 | DD평균={avg_dd:6.2f} | Score={avg_score:6.2f}")

        # 2) Default rates per grade
        print("\n" + "="*60)
        print("[부도율] 등급별 부도율 산출 + 글로벌 블렌딩")
        print("="*60)
        dr_df = compute_default_rates(df, config)
        print(f"\n{'등급':>5} | {'기업수':>5} | {'EDF평균':>10} | {'글로벌':>10} | {'블렌딩':>10} | {'베이지안':>10}")
        print("-" * 70)
        for _, row in dr_df.iterrows():
            print(f" {row['rating_grade']:5s} | {row['n_firms']:5d} | {row['korean_dr']:10.6f} | "
                  f"{row['global_dr']:10.6f} | {row['blended_dr']:10.6f} | {row['bayesian_dr']:10.6f}")

        # 3) Persist
        if not args.stats:
            # NOTE(review): this overwrites merton_results.dd_rating with the
            # shadow rating although the log line below calls it
            # "shadow_rating" — confirm whether a dedicated column is intended.
            rating_updates = [
                (None if pd.isna(grade) else str(grade), ticker)
                for grade, ticker in zip(df["shadow_rating"], df["ticker"])
            ]
            conn.executemany(
                "UPDATE merton_results SET dd_rating = ? WHERE ticker = ?",
                rating_updates,
            )

            # default_rates rows are keyed by today's date.
            base_date_str = datetime.now().strftime("%Y-%m-%d")
            conn.executemany(
                """
                INSERT OR REPLACE INTO default_rates
                (base_date, rating_grade, n_firms, n_defaults, korean_dr, global_dr, weight_kr, blended_dr, bayesian_dr)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
                """,
                [
                    (
                        base_date_str, row["rating_grade"], int(row["n_firms"]), int(row["n_defaults"]),
                        row["korean_dr"], row["global_dr"], row["weight_kr"], row["blended_dr"], row["bayesian_dr"],
                    )
                    for _, row in dr_df.iterrows()
                ],
            )
            conn.commit()
            print(f"\n → merton_results shadow_rating 업데이트: {len(df)}")
            print(f" → default_rates 저장: {len(dr_df)}")
    finally:
        conn.close()


if __name__ == "__main__":
    main()

View File

@@ -1,34 +1,42 @@
"""DB stats and sample data"""
# NOTE(review): this span looks like a diff rendering of ONE script in which
# the lines of two revisions are interleaved without +/- markers and without
# indentation: an older "DB stats and sample data" script (uses `conn`) and a
# newer pykrx market-cap probe (uses `stock`, `tickers`, `tk`). The statements
# below are in rendered order, not execution order — confirm against the real
# file before running or editing.
import sqlite3
"""시가총액 backfill: per-ticker get_market_cap_by_date 재시도"""
import sqlite3, time
from pykrx import stock
# DB-stats part: open the SQLite file and print the row count of each table.
conn = sqlite3.connect("data/edf.db")
print("=== DB Stats ===")
for t in ["companies", "market_data", "financial_data", "volatility", "merton_results"]:
c = conn.execute(f"SELECT COUNT(*) FROM {t}").fetchone()[0]
print(f" {t}: {c:,}")
# Probe part: check the exact API behaviour with five sample tickers.
tickers = ["005930", "000660", "005380", "035720", "068270"]
# DB-stats part: print sigma_E for the first five tickers in `volatility`.
print("\n=== Volatility sample ===")
rows = conn.execute("SELECT ticker, sigma_E FROM volatility ORDER BY ticker LIMIT 5").fetchall()
for r in rows:
print(f" {r[0]}: sigma_E={r[1]:.4f}")
# Probe part: for each sample ticker, try the pykrx calls one by one and print
# either the result shape or the error; a 0.3 s sleep follows each call.
for tk in tickers:
print(f"\n=== {tk} ===")
# Method 1: get_market_cap_by_date (fromdate, todate, ticker)
try:
cap = stock.get_market_cap_by_date("20250301", "20250307", tk)
time.sleep(0.3)
print(f" cap_by_date: {len(cap)} rows")
if len(cap) > 0:
print(f" columns: {list(cap.columns)}")
print(cap.tail(2))
except Exception as e:
print(f" cap_by_date ERROR: {e}")
# Method 2: get_market_fundamental_by_date
try:
fund = stock.get_market_fundamental_by_date("20250301", "20250307", tk)
time.sleep(0.3)
print(f" fundamental: {len(fund)} rows, columns: {list(fund.columns)}")
except Exception as e:
print(f" fundamental ERROR: {e}")
# DB-stats part: the five financial_data rows with the largest total_assets;
# default_point / leverage_ratio print as "N/A" when falsy (NULL or 0).
print("\n=== Financial sample ===")
rows = conn.execute("""
SELECT ticker, total_assets, default_point, leverage_ratio
FROM financial_data WHERE total_assets IS NOT NULL
ORDER BY total_assets DESC LIMIT 5
""").fetchall()
for r in rows:
dp = f"{r[2]:,.0f}" if r[2] else "N/A"
lev = f"{r[3]:.3f}" if r[3] else "N/A"
print(f" {r[0]}: TA={r[1]:,.0f} DP={dp} LEV={lev}")
# Overlap: tickers with BOTH volatility AND financial_data
both = conn.execute("""
SELECT COUNT(DISTINCT v.ticker)
FROM volatility v JOIN financial_data f ON v.ticker = f.ticker
""").fetchone()[0]
print(f"\n=== Merton 산출 가능 종목 (KRX+DART 모두 있는 종목): {both} ===")
conn.close()
# Method 3: get_market_cap_by_ticker — one call for a single date, then look
# the current ticker up in the returned index.
try:
cap = stock.get_market_cap_by_ticker("20250307")
time.sleep(0.3)
if tk in cap.index:
print(f" cap_by_ticker: 시총={cap.loc[tk, '시가총액']:,.0f}, 주식수={cap.loc[tk, '상장주식수']:,}")
else:
print(f" cap_by_ticker: {tk} not found, total rows={len(cap)}")
break # call only once (whole-market data)
except Exception as e:
print(f" cap_by_ticker ERROR: {e}")