feat(merton): batch DD/EDF for 2385 tickers, financial sector filter (#314 #322)

This commit is contained in:
EDF Agent
2026-03-11 23:40:25 +09:00
parent 348b5bbf27
commit 0547dfbb3a
2 changed files with 330 additions and 29 deletions

293
src/models/run_merton.py Normal file
View File

@@ -0,0 +1,293 @@
"""
전체 종목 Merton DD/EDF 일괄 산출
DB에서 KRX(시총, 변동성) + DART(재무제표) 데이터를 로드하고,
금융업을 필터링한 후, Merton 모형으로 DD/EDF를 산출하여 DB에 저장합니다.
Usage:
python -m src.models.run_merton # 전체 산출
python -m src.models.run_merton --include-fin # 금융업 포함
"""
import sys
import argparse
import yaml
import numpy as np
import pandas as pd
from datetime import datetime
from pathlib import Path
from tqdm import tqdm
sys.path.insert(0, str(Path(__file__).parent.parent.parent))
from src.data.database import get_connection, init_db
from src.models.merton import (
solve_merton, calculate_dd, calculate_edf, naive_dd,
dd_to_rating, DD_RATING_MAP
)
# Financial-sector keywords (based on DART company names / industry labels).
# identify_financial_companies() treats a company as financial when any of
# these tokens occurs as a substring of its name.
# NOTE(review): short generic tokens such as "리스", "투자" and "지주" may also
# occur inside non-financial company names — confirm the false-positive rate.
FINANCIAL_KEYWORDS = [
    "은행",
    "금융",
    "보험",
    "증권",
    "캐피탈",
    "저축",
    "투자",
    "생명",
    "화재",
    "손해",
    "카드",
    "리스",
    "자산운용",
    "파이낸셜",
    "파이낸스",
    "벤처캐피탈",
    "지주",  # financial holding companies
]
# Ticker codes that are definitely financial-sector companies (large names,
# curated by hand).
#
# Three codes that the original list itself annotated as "not a financial
# company" (030200, 005390, 004540) are deliberately NOT included: keeping them
# here excluded those companies from the Merton run and flagged them as
# financial in the companies table.
FINANCIAL_TICKERS = {
    "105560",  # KB Financial Group
    "055550",  # Shinhan Financial Group
    "086790",  # Hana Financial Group
    "316140",  # Woori Financial Group
    "024110",  # Industrial Bank of Korea
    "000810",  # Samsung Fire & Marine Insurance
    "032830",  # Samsung Life Insurance
    "005830",  # DB Insurance
    "088350",  # Hanwha Life Insurance
    "003690",  # Korean Re
    "138930",  # BNK Financial Group
    "139130",  # DGB Financial Group
    "175330",  # JB Financial Group
    "071050",  # Korea Investment Holdings
    "003540",  # Daishin Securities
    "005940",  # NH Investment & Securities
    "016360",  # Samsung Securities
    "006800",  # Mirae Asset Securities
    "039490",  # Kiwoom Securities
    "003470",  # Yuanta Securities
    "001510",  # SK Securities
    "003460",  # Yuhwa Securities
    "001290",  # Kyobo Securities
    "001500",  # Hyundai Motor Securities
    "003530",  # Hanwha Investment & Securities
    "001270",  # Bookook Securities
    # NOTE(review): labelled "두산" (Doosan) in the original list, which does not
    # look like a financial company — confirm before relying on this entry.
    "000150",
    "001720",  # Shinyoung Securities
    "001750",  # Hanyang Securities
    # NOTE(review): labelled "조흥" in the original list — confirm that this
    # code really belongs to a financial issuer.
    "000480",
}
def load_config() -> dict:
    """Parse ``config/settings.yaml`` and return its content.

    The file is located relative to this module: two directories above the
    directory that contains this file, inside ``config/``.
    """
    settings_file = Path(__file__).parent.parent.parent / "config" / "settings.yaml"
    with settings_file.open("r", encoding="utf-8") as handle:
        parsed = yaml.safe_load(handle)
    return parsed
def identify_financial_companies(conn) -> set:
    """Collect the tickers that should be treated as financial-sector.

    A ticker qualifies when any of the following holds:
      * it is in the hand-maintained ``FINANCIAL_TICKERS`` set,
      * its name in ``companies`` contains one of ``FINANCIAL_KEYWORDS``,
      * it has a ``financial_data`` row whose ``leverage_ratio`` exceeds 0.90
        (typical of banks and insurers).

    Args:
        conn: DB connection exposing ``execute(sql).fetchall()``.

    Returns:
        A new set of ticker codes; the module-level set is not modified.
    """
    flagged = set(FINANCIAL_TICKERS)

    # 1) Name-keyword match (rows with an empty / NULL name are skipped).
    name_rows = conn.execute("SELECT ticker, name FROM companies").fetchall()
    flagged.update(
        code
        for code, label in name_rows
        if label and any(keyword in label for keyword in FINANCIAL_KEYWORDS)
    )

    # 2) Leverage ratio above 0.9.
    leveraged_rows = conn.execute(
        "SELECT ticker FROM financial_data WHERE leverage_ratio > 0.90"
    ).fetchall()
    flagged.update(record[0] for record in leveraged_rows)

    return flagged
# Maximum number of per-ticker failures echoed after the batch finishes.
_MAX_ERROR_SAMPLES = 5

# Column order of the merton_results INSERT statement.
_MERTON_RESULT_COLUMNS = (
    "ticker", "base_date", "fin_year", "E", "sigma_E", "D",
    "V", "sigma_V", "DD", "EDF", "leverage", "method", "dd_rating",
)


def _prepare_merton_inputs(df):
    """Pick the equity value E and default point D per row; drop unusable rows.

    E is ``market_cap`` when it is present and positive, otherwise
    ``total_equity``. A missing ``default_point`` is replaced by
    ``total_liabilities``. Rows whose E or default point is missing or
    non-positive are removed. Returns a new DataFrame; the argument is not
    modified.
    """
    df = df.copy()

    has_mcap = df["market_cap"].notna() & (df["market_cap"] > 0)
    has_equity = df["total_equity"].notna() & (df["total_equity"] > 0)

    # Vectorised selection: unlike a row-wise apply, this also works when the
    # frame is empty (apply(axis=1) on an empty frame does not return a Series,
    # which makes the column assignment fail).
    df["E"] = df["market_cap"].where(has_mcap, df["total_equity"])
    df["E_source"] = np.where(has_mcap, "market_cap", "total_equity")
    print(f" E 출처: market_cap={has_mcap.sum()}, total_equity={has_equity.sum()}")

    # Drop E <= 0 (e.g. companies with impaired capital). Copy so the
    # default_point assignment below writes to an independent frame.
    df = df[df["E"].notna() & (df["E"] > 0)].copy()

    # Tickers without a default point fall back to total liabilities.
    no_dp = df["default_point"].isna()
    if no_dp.sum() > 0:
        print(f" DP 없는 종목 {no_dp.sum()}개 → total_liabilities로 대체")
        df.loc[no_dp, "default_point"] = df.loc[no_dp, "total_liabilities"]

    # Anything still lacking a positive default point is excluded.
    return df[df["default_point"].notna() & (df["default_point"] > 0)]


def _solve_row(E, sigma_E, D, r, T):
    """Solve one ticker: Merton first, naive DD when the solver did not converge.

    Returns ``(V, sigma_V, DD, EDF, method, used_solver)`` where ``used_solver``
    is True for the Merton solution and False for the naive fallback.
    """
    sol = solve_merton(E, sigma_E, D, r, T)
    if sol["converged"]:
        V = sol["V"]
        sigma_V = sol["sigma_V"]
        DD = calculate_dd(V, sigma_V, D, r=r, T=T)
        EDF = calculate_edf(DD)
        return V, sigma_V, DD, EDF, sol["method"], True

    naive = naive_dd(E, sigma_E, D, r=r, T=T)
    return naive["V"], naive["sigma_V"], naive["DD"], naive["EDF"], "naive_fallback", False


def _print_rating_distribution(result_df):
    """Print the per-rating counts and DD/EDF summary statistics.

    Converts ``dd_rating`` in place to an ordered categorical whose categories
    are the second element of every ``DD_RATING_MAP`` entry. Does nothing for
    an empty frame.
    """
    if len(result_df) == 0:
        return

    print("\n=== 등급별 분포 ===")
    rating_order = [entry[1] for entry in DD_RATING_MAP]
    result_df["dd_rating"] = pd.Categorical(
        result_df["dd_rating"], categories=rating_order, ordered=True
    )
    dist = result_df["dd_rating"].value_counts().sort_index()
    for rating, count in dist.items():
        if count > 0:
            bucket = result_df[result_df["dd_rating"] == rating]
            avg_dd = bucket["DD"].mean()
            avg_edf = bucket["EDF"].mean()
            print(f" {rating:5s}: {count:4d}개 | DD평균={avg_dd:6.2f} | EDF평균={avg_edf:.6f}")
    print(f"\n DD 통계: 평균={result_df['DD'].mean():.2f}, 중앙={result_df['DD'].median():.2f}, 최소={result_df['DD'].min():.2f}, 최대={result_df['DD'].max():.2f}")
    print(f" EDF 통계: 평균={result_df['EDF'].mean():.6f}, 중앙={result_df['EDF'].median():.6f}")


def run_batch_merton(conn, config: dict, include_financial: bool = False):
    """Compute Merton DD/EDF for every eligible ticker and store the results.

    Steps: identify financial-sector tickers, load volatility / market /
    financial data from the DB, choose E and the default point, solve each
    row (Merton solver with a naive-DD fallback), write the rows to
    ``merton_results`` and refresh ``companies.is_financial``. Progress and
    summary statistics are printed to stdout.

    Args:
        conn: DB connection used for reads (``execute``, ``pd.read_sql_query``)
            and writes (``merton_results``, ``companies.is_financial``).
        config: settings dict; ``config["merton"]["risk_free_rate"]`` (default
            0.035) and ``config["merton"]["time_horizon"]`` (default 1.0) are
            read when present.
        include_financial: when False (default) rows of financial-sector
            tickers are dropped before solving; when True they are kept and
            marked in an ``is_financial`` column of the working frame.

    Returns:
        pandas.DataFrame with one row per successfully solved input row
        (empty when nothing could be solved).
    """
    print("=" * 60)
    print("[Merton] 전체 종목 DD/EDF 일괄 산출")
    print("=" * 60)

    merton_cfg = config.get("merton", {})
    r = merton_cfg.get("risk_free_rate", 0.035)
    T = merton_cfg.get("time_horizon", 1.0)

    # 1) Financial-sector filter
    financial_set = identify_financial_companies(conn)
    print(f" 금융업 종목 수: {len(financial_set)}")

    # 2) Load data: volatility + market_data + financial_data + companies.
    #    When market_cap is 0 (pykrx get_market_cap not working), DART
    #    total_equity is used as E, the market value of equity.
    # NOTE(review): financial_data is joined on ticker only; if that table
    # holds several fiscal years per ticker, every year yields its own row
    # here — confirm this is intended.
    query = """
        SELECT
            v.ticker,
            c.name,
            v.sigma_E,
            m.market_cap,
            m.close_price,
            m.shares,
            f.total_assets,
            f.current_liabilities,
            f.non_current_liabilities,
            f.total_liabilities,
            f.total_equity,
            f.default_point,
            f.leverage_ratio,
            f.year as fin_year,
            v.base_date
        FROM volatility v
        JOIN market_data m ON v.ticker = m.ticker AND v.base_date = m.date
        JOIN financial_data f ON v.ticker = f.ticker
        JOIN companies c ON v.ticker = c.ticker
        WHERE v.sigma_E > 0
    """
    df = pd.read_sql_query(query, conn)
    print(f" 전체 대상 종목: {len(df)}")

    if not include_financial:
        before = len(df)
        df = df[~df["ticker"].isin(financial_set)].copy()
        print(f" 금융업 제외 후: {len(df)}개 (제외: {before - len(df)}개)")
    else:
        df["is_financial"] = df["ticker"].isin(financial_set).astype(int)
        print(f" 금융업 포함 실행 (금융업 {df['is_financial'].sum()}개)")

    df = _prepare_merton_inputs(df)
    print(f" Merton 산출 대상: {len(df)}")

    # 3) Solve every row
    results = []
    converged = 0
    fallback = 0
    errors = 0
    error_samples = []  # (ticker, repr(exception)) for the first few failures
    for _, row in tqdm(df.iterrows(), total=len(df), desc="Merton DD/EDF"):
        E = float(row["E"])
        sigma_E = float(row["sigma_E"])
        D = float(row["default_point"])
        try:
            V, sigma_V, DD, EDF, method, used_solver = _solve_row(E, sigma_E, D, r, T)
            if used_solver:
                converged += 1
            else:
                fallback += 1

            # A NaN outcome is counted as an error and not stored.
            if np.isnan(DD) or np.isnan(EDF):
                errors += 1
                continue

            results.append({
                "ticker": row["ticker"],
                "base_date": row["base_date"],
                "fin_year": int(row["fin_year"]),
                "E": E,
                "sigma_E": sigma_E,
                "D": D,
                "V": V,
                "sigma_V": sigma_V,
                "DD": DD,
                "EDF": EDF,
                "leverage": D / V if V > 0 else np.nan,
                "method": method,
                "dd_rating": dd_to_rating(DD),
            })
        except Exception as exc:
            # Best effort: one bad ticker must not abort the batch, but keep a
            # few causes so failures are not completely silent.
            errors += 1
            if len(error_samples) < _MAX_ERROR_SAMPLES:
                error_samples.append((row["ticker"], repr(exc)))
            continue

    print(f"\n 산출 결과: {len(results)}")
    print(f" - fsolve/iterative: {converged}")
    print(f" - naive_fallback: {fallback}")
    print(f" - 에러: {errors}")
    for failed_ticker, message in error_samples:
        print(f"   {failed_ticker}: {message}")

    # 4) Persist the results
    conn.executemany("""
        INSERT OR REPLACE INTO merton_results
        (ticker, base_date, fin_year, E, sigma_E, D, V, sigma_V, DD, EDF, leverage, method, dd_rating)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
    """, [tuple(rec[col] for col in _MERTON_RESULT_COLUMNS) for rec in results])
    conn.commit()
    print(f" → merton_results 테이블에 {len(results)}건 저장")

    # 5) Distribution by rating
    result_df = pd.DataFrame(results)
    _print_rating_distribution(result_df)

    # Refresh the financial-sector flag. The reset and the per-ticker updates
    # are committed together so a failure cannot leave every flag at 0.
    conn.execute("UPDATE companies SET is_financial = 0")
    conn.executemany(
        "UPDATE companies SET is_financial = 1 WHERE ticker = ?",
        [(ticker,) for ticker in financial_set],
    )
    conn.commit()

    return result_df
def main():
    """Command-line entry point for the batch Merton DD/EDF run.

    Parses ``--include-fin`` (keep financial-sector tickers), loads the
    settings, opens the DB through ``init_db()`` and runs the batch. The
    connection is closed even when the batch raises.
    """
    parser = argparse.ArgumentParser(description="전체 종목 Merton DD/EDF 산출")
    parser.add_argument("--include-fin", action="store_true", help="금융업 포함")
    args = parser.parse_args()

    config = load_config()
    conn = init_db()
    try:
        run_batch_merton(conn, config, include_financial=args.include_fin)
    finally:
        conn.close()


if __name__ == "__main__":
    main()

View File

@@ -1,34 +1,42 @@
"""DB stats and sample data"""
# NOTE(review): this region is a rendered diff hunk (see the "@@ -1,34 +1,42 @@"
# header above). Lines of the removed "DB stats and sample data" script and of
# the added market-cap probe script appear interleaved without +/- markers, and
# all indentation is lost, so the text is not runnable as written.
import sqlite3
"""시가총액 backfill: per-ticker get_market_cap_by_date 재시도"""
import sqlite3, time
from pykrx import stock
conn = sqlite3.connect("data/edf.db")
# Row count of every table.
print("=== DB Stats ===")
for t in ["companies", "market_data", "financial_data", "volatility", "merton_results"]:
c = conn.execute(f"SELECT COUNT(*) FROM {t}").fetchone()[0]
print(f" {t}: {c:,}")
# Check the exact API behaviour with five sample tickers
tickers = ["005930", "000660", "005380", "035720", "068270"]
# First five volatility rows ordered by ticker.
print("\n=== Volatility sample ===")
rows = conn.execute("SELECT ticker, sigma_E FROM volatility ORDER BY ticker LIMIT 5").fetchall()
for r in rows:
print(f" {r[0]}: sigma_E={r[1]:.4f}")
for tk in tickers:
print(f"\n=== {tk} ===")
# Five financial_data rows with the largest total_assets.
print("\n=== Financial sample ===")
rows = conn.execute("""
SELECT ticker, total_assets, default_point, leverage_ratio
FROM financial_data WHERE total_assets IS NOT NULL
ORDER BY total_assets DESC LIMIT 5
""").fetchall()
for r in rows:
# Falsy values (NULL or 0) are shown as "N/A".
dp = f"{r[2]:,.0f}" if r[2] else "N/A"
lev = f"{r[3]:.3f}" if r[3] else "N/A"
print(f" {r[0]}: TA={r[1]:,.0f} DP={dp} LEV={lev}")
# Method 1: get_market_cap_by_date (fromdate, todate, ticker)
try:
cap = stock.get_market_cap_by_date("20250301", "20250307", tk)
time.sleep(0.3)
print(f" cap_by_date: {len(cap)} rows")
if len(cap) > 0:
print(f" columns: {list(cap.columns)}")
print(cap.tail(2))
except Exception as e:
print(f" cap_by_date ERROR: {e}")
# Overlap: tickers with BOTH volatility AND financial_data
both = conn.execute("""
SELECT COUNT(DISTINCT v.ticker)
FROM volatility v JOIN financial_data f ON v.ticker = f.ticker
""").fetchone()[0]
print(f"\n=== Merton 산출 가능 종목 (KRX+DART 모두 있는 종목): {both} ===")
# Method 2: get_market_fundamental_by_date
try:
fund = stock.get_market_fundamental_by_date("20250301", "20250307", tk)
time.sleep(0.3)
print(f" fundamental: {len(fund)} rows, columns: {list(fund.columns)}")
except Exception as e:
print(f" fundamental ERROR: {e}")
conn.close()
# Method 3: get_market_cap_by_ticker for a single date (the original comment
# named get_exhaustive_info, which is not the function called below)
try:
cap = stock.get_market_cap_by_ticker("20250307")
time.sleep(0.3)
if tk in cap.index:
print(f" cap_by_ticker: 시총={cap.loc[tk, '시가총액']:,.0f}, 주식수={cap.loc[tk, '상장주식수']:,}")
else:
print(f" cap_by_ticker: {tk} not found, total rows={len(cap)}")
break # call only once (whole-market data)
except Exception as e:
print(f" cap_by_ticker ERROR: {e}")