feat: initial project setup - Merton-KMV model, data pipeline, .agents workflows

This commit is contained in:
EDF Agent
2026-03-11 19:59:38 +09:00
commit a20a7207c4
28 changed files with 3212 additions and 0 deletions

1
src/__init__.py Normal file
View File

@@ -0,0 +1 @@
# src package

1
src/data/__init__.py Normal file
View File

@@ -0,0 +1 @@
# data package

315
src/data/dart_fetcher.py Normal file
View File

@@ -0,0 +1,315 @@
"""
DART 재무제표 데이터 수집 모듈
OpenDartReader를 사용하여 KRX 상장사의 재무제표(부채구조)를 수집합니다.
Merton 모형에 필요한 유동부채(STD), 비유동부채(LTD), 총자산 등을 추출합니다.
"""
import os
import time
import yaml
import pandas as pd
import numpy as np
from datetime import datetime
from pathlib import Path
from tqdm import tqdm
try:
import OpenDartReader
except ImportError:
raise ImportError("OpenDartReader가 설치되지 않았습니다. pip install opendart-reader 를 실행하세요.")
def load_config() -> dict:
"""config/settings.yaml 로드"""
config_path = Path(__file__).parent.parent.parent / "config" / "settings.yaml"
with open(config_path, "r", encoding="utf-8") as f:
return yaml.safe_load(f)
def init_dart(api_key: str = None) -> OpenDartReader:
"""OpenDartReader 초기화"""
if api_key is None:
config = load_config()
api_key = config["dart_api_key"]
return OpenDartReader(api_key)
def get_corp_codes(dart: OpenDartReader) -> pd.DataFrame:
"""
DART 기업 코드 목록 조회 (상장사)
Returns
-------
pd.DataFrame
columns: [corp_code, corp_name, stock_code, modify_date]
"""
corp_list = dart.corp_codes
# 상장사만 (stock_code가 존재하는 것)
listed = corp_list[corp_list["stock_code"].notna() & (corp_list["stock_code"] != " ")]
return listed.reset_index(drop=True)
def extract_financial_items(dart: OpenDartReader,
corp_code: str,
year: int,
report_code: str = "11011") -> dict:
"""
특정 기업의 재무제표에서 Merton 모형에 필요한 항목 추출.
Parameters
----------
dart : OpenDartReader
corp_code : str
DART 고유 기업 코드
year : int
사업연도
report_code : str
보고서 코드 (11011=사업보고서/연간, 11012=반기, 11013=1분기, 11014=3분기)
Returns
-------
dict with keys:
total_assets, current_liabilities (STD), non_current_liabilities (LTD),
total_liabilities, total_equity, revenue, operating_income,
interest_expense, net_income, ebitda_proxy
"""
result = {
"total_assets": np.nan,
"current_liabilities": np.nan, # 유동부채 = STD
"non_current_liabilities": np.nan, # 비유동부채 = LTD
"total_liabilities": np.nan,
"total_equity": np.nan,
"revenue": np.nan,
"operating_income": np.nan,
"interest_expense": np.nan,
"net_income": np.nan,
}
try:
# 재무상태표 (BS)
bs = dart.finstate(corp_code, year, reprt_code=report_code)
if bs is None or len(bs) == 0:
return result
# 연결재무제표 우선, 없으면 개별
if "fs_div" in bs.columns:
cfs = bs[bs["fs_div"] == "CFS"] # 연결
if len(cfs) == 0:
cfs = bs[bs["fs_div"] == "OFS"] # 개별
else:
cfs = bs
if len(cfs) == 0:
return result
# 항목명으로 검색하는 헬퍼
def find_amount(df, keywords, col="thstrm_amount"):
"""키워드 목록으로 금액 검색"""
if col not in df.columns:
# 대체 컬럼 시도
for alt_col in ["thstrm_amount", "frmtrm_amount", "bfefrmtrm_amount"]:
if alt_col in df.columns:
col = alt_col
break
else:
return np.nan
for kw in keywords:
matches = df[df["account_nm"].str.contains(kw, na=False, regex=False)]
if len(matches) > 0:
val = matches.iloc[0][col]
if pd.notna(val):
# 쉼표 제거 후 숫자 변환
if isinstance(val, str):
val = val.replace(",", "").replace(" ", "")
try:
return float(val)
except ValueError:
return np.nan
return float(val)
return np.nan
# 재무상태표 항목 추출
result["total_assets"] = find_amount(cfs, ["자산총계", "자산 총계"])
result["current_liabilities"] = find_amount(cfs, ["유동부채", "유동 부채"])
result["non_current_liabilities"] = find_amount(cfs, ["비유동부채", "비유동 부채"])
result["total_liabilities"] = find_amount(cfs, ["부채총계", "부채 총계"])
result["total_equity"] = find_amount(cfs, ["자본총계", "자본 총계"])
# 손익계산서 항목
result["revenue"] = find_amount(cfs, ["매출액", "매출", "영업수익", "수익(매출액)"])
result["operating_income"] = find_amount(cfs, ["영업이익", "영업 이익"])
result["net_income"] = find_amount(cfs, ["당기순이익", "당기 순이익", "분기순이익"])
result["interest_expense"] = find_amount(cfs, ["이자비용", "금융비용", "금융원가"])
except Exception as e:
result["_error"] = str(e)
return result
def compute_derived_ratios(row: dict) -> dict:
"""
Merton 모형 및 Shadow Rating에 필요한 파생 비율 계산
"""
derived = {}
# 부도점 (Default Point) = STD + 0.5 * LTD
std = row.get("current_liabilities", np.nan)
ltd = row.get("non_current_liabilities", np.nan)
if pd.notna(std) and pd.notna(ltd):
derived["default_point"] = std + 0.5 * ltd
else:
derived["default_point"] = np.nan
# 레버리지 비율 = 총부채 / 총자산
ta = row.get("total_assets", np.nan)
tl = row.get("total_liabilities", np.nan)
if pd.notna(tl) and pd.notna(ta) and ta > 0:
derived["leverage_ratio"] = tl / ta
else:
derived["leverage_ratio"] = np.nan
# 이자보상비율 = 영업이익 / 이자비용
oi = row.get("operating_income", np.nan)
ie = row.get("interest_expense", np.nan)
if pd.notna(oi) and pd.notna(ie) and ie > 0:
derived["interest_coverage"] = oi / ie
else:
derived["interest_coverage"] = np.nan
# ROA = 순이익 / 총자산
ni = row.get("net_income", np.nan)
if pd.notna(ni) and pd.notna(ta) and ta > 0:
derived["roa"] = ni / ta
else:
derived["roa"] = np.nan
# 유동비율 = (총자산 - 비유동자산 근사) / 유동부채
# 간접 산출: 유동자산 ≈ 총자산 - (총부채 - 유동부채 + 자본 - 유동자산)
# 단순화: current_ratio = (총자산 - 비유동부채 근사) 는 어려우므로,
# 유동부채 대비 총자산 비율로 대체
if pd.notna(ta) and pd.notna(std) and std > 0:
derived["asset_to_std_ratio"] = ta / std
else:
derived["asset_to_std_ratio"] = np.nan
# 기업 규모 (로그 총자산)
if pd.notna(ta) and ta > 0:
derived["log_assets"] = np.log(ta)
else:
derived["log_assets"] = np.nan
return derived
def fetch_all_financial_data(tickers: list,
year: int = 2024,
config: dict = None) -> pd.DataFrame:
"""
전 상장사의 재무제표 데이터를 수집.
Parameters
----------
tickers : list
종목코드(6자리) 리스트
year : int
사업연도
config : dict
설정 딕셔너리
Returns
-------
pd.DataFrame
종목별 재무 데이터 + 파생 비율
"""
if config is None:
config = load_config()
dart = init_dart(config["dart_api_key"])
sleep_sec = config.get("krx", {}).get("sleep_seconds", 0.5)
# DART 기업코드 목록
print("[DART] 기업코드 목록 로딩...")
corp_codes = get_corp_codes(dart)
# ticker → corp_code 매핑
ticker_to_corp = {}
for _, row in corp_codes.iterrows():
sc = str(row["stock_code"]).strip()
if sc and sc != "nan":
ticker_to_corp[sc] = row["corp_code"]
print(f"[DART] {len(ticker_to_corp)}개 상장사 매핑 완료")
records = []
errors = []
print(f"[DART] {year}년 재무제표 수집 중...")
for ticker in tqdm(tickers, desc="재무제표 수집"):
ticker_str = str(ticker).zfill(6)
corp_code = ticker_to_corp.get(ticker_str)
if corp_code is None:
errors.append({"ticker": ticker_str, "error": "corp_code not found"})
continue
try:
fin_data = extract_financial_items(dart, corp_code, year)
derived = compute_derived_ratios(fin_data)
record = {"ticker": ticker_str, "year": year}
record.update(fin_data)
record.update(derived)
records.append(record)
time.sleep(sleep_sec) # Rate limiting
except Exception as e:
errors.append({"ticker": ticker_str, "error": str(e)})
time.sleep(sleep_sec)
continue
df = pd.DataFrame(records)
if "ticker" in df.columns:
df = df.set_index("ticker")
print(f"[DART] 수집 완료: {len(df)}개 (에러: {len(errors)}건)")
return df, errors
def save_financial_data(df: pd.DataFrame, errors: list,
year: int, output_dir: str = None):
"""수집 결과를 CSV로 저장"""
if output_dir is None:
output_dir = Path(__file__).parent.parent.parent / "data" / "raw"
output_dir = Path(output_dir)
output_dir.mkdir(parents=True, exist_ok=True)
df.to_csv(output_dir / f"financial_data_{year}.csv", encoding="utf-8-sig")
if errors:
pd.DataFrame(errors).to_csv(
output_dir / f"financial_errors_{year}.csv", encoding="utf-8-sig", index=False
)
print(f"[DART] 데이터 저장 완료: {output_dir}")
# ---- CLI 실행 ----
if __name__ == "__main__":
from krx_fetcher import load_config, get_market_cap_all
config = load_config()
year = config.get("end_year", 2024) - 1 # 가장 최근 확정 사업연도
# 상장사 목록 가져오기
target_date = datetime.now().strftime("%Y%m%d")
market_cap = get_market_cap_all(target_date)
tickers = list(market_cap.index)
# 재무 데이터 수집
df, errors = fetch_all_financial_data(tickers, year=year, config=config)
save_financial_data(df, errors, year)

311
src/data/database.py Normal file
View File

@@ -0,0 +1,311 @@
"""
EDF 프로젝트 SQLite 데이터베이스 모듈
KRX 주가, DART 재무제표, Merton DD/EDF 결과를 영구 저장합니다.
"""
import sqlite3
import pandas as pd
import numpy as np
from pathlib import Path
from datetime import datetime
DB_PATH = Path(__file__).parent.parent.parent / "data" / "edf.db"
def get_connection(db_path: str = None) -> sqlite3.Connection:
"""SQLite 연결 반환"""
if db_path is None:
db_path = str(DB_PATH)
Path(db_path).parent.mkdir(parents=True, exist_ok=True)
conn = sqlite3.connect(db_path)
conn.execute("PRAGMA journal_mode=WAL")
conn.execute("PRAGMA foreign_keys=ON")
return conn
def init_db(conn: sqlite3.Connection = None):
"""데이터베이스 스키마 초기화"""
if conn is None:
conn = get_connection()
conn.executescript("""
-- ============================================================
-- 1. 종목 마스터
-- ============================================================
CREATE TABLE IF NOT EXISTS companies (
ticker TEXT PRIMARY KEY, -- 종목코드 (6자리)
name TEXT, -- 종목명
market TEXT, -- KOSPI / KOSDAQ
sector TEXT, -- 업종
corp_code TEXT, -- DART 고유코드
is_financial INTEGER DEFAULT 0, -- 금융업 여부 (1=금융, 0=비금융)
updated_at TEXT
);
-- ============================================================
-- 2. 일별 시장 데이터 (주가, 시가총액)
-- ============================================================
CREATE TABLE IF NOT EXISTS market_data (
ticker TEXT NOT NULL,
date TEXT NOT NULL, -- YYYY-MM-DD
close_price REAL, -- 종가 (원)
market_cap REAL, -- 시가총액 (원)
volume INTEGER, -- 거래량
shares INTEGER, -- 상장주식수
PRIMARY KEY (ticker, date),
FOREIGN KEY (ticker) REFERENCES companies(ticker)
);
CREATE INDEX IF NOT EXISTS idx_market_data_date ON market_data(date);
-- ============================================================
-- 3. 재무제표 데이터 (연간/분기)
-- ============================================================
CREATE TABLE IF NOT EXISTS financial_data (
ticker TEXT NOT NULL,
year INTEGER NOT NULL, -- 사업연도
report_type TEXT DEFAULT 'annual', -- annual / q1 / q2 / q3
total_assets REAL, -- 총자산
current_liabilities REAL, -- 유동부채 (STD)
non_current_liabilities REAL, -- 비유동부채 (LTD)
total_liabilities REAL, -- 부채총계
total_equity REAL, -- 자본총계
revenue REAL, -- 매출액
operating_income REAL, -- 영업이익
net_income REAL, -- 당기순이익
interest_expense REAL, -- 이자비용
-- 파생 항목 (계산됨)
default_point REAL, -- STD + 0.5*LTD
leverage_ratio REAL, -- 총부채/총자산
interest_coverage REAL, -- 영업이익/이자비용
roa REAL, -- 순이익/총자산
log_assets REAL, -- ln(총자산)
fetched_at TEXT, -- 수집 시각
PRIMARY KEY (ticker, year, report_type),
FOREIGN KEY (ticker) REFERENCES companies(ticker)
);
-- ============================================================
-- 4. 변동성 산출 결과
-- ============================================================
CREATE TABLE IF NOT EXISTS volatility (
ticker TEXT NOT NULL,
base_date TEXT NOT NULL, -- 기준일 (YYYY-MM-DD)
method TEXT NOT NULL, -- historical / ewma / garch
sigma_E REAL, -- 주가변동성 (연환산)
n_trading_days INTEGER, -- 사용 거래일수
PRIMARY KEY (ticker, base_date, method),
FOREIGN KEY (ticker) REFERENCES companies(ticker)
);
-- ============================================================
-- 5. Merton DD/EDF 결과
-- ============================================================
CREATE TABLE IF NOT EXISTS merton_results (
ticker TEXT NOT NULL,
base_date TEXT NOT NULL, -- 기준일
fin_year INTEGER NOT NULL, -- 사용된 재무 연도
E REAL, -- 자기자본 시장가치
sigma_E REAL, -- 주가변동성
D REAL, -- 부도점
V REAL, -- 추정 자산가치
sigma_V REAL, -- 추정 자산변동성
DD REAL, -- Distance-to-Default
EDF REAL, -- Expected Default Frequency
leverage REAL, -- D/V
method TEXT, -- fsolve / iterative / naive_fallback
dd_rating TEXT, -- DD 기반 내재등급
PRIMARY KEY (ticker, base_date),
FOREIGN KEY (ticker) REFERENCES companies(ticker)
);
CREATE INDEX IF NOT EXISTS idx_merton_dd ON merton_results(DD);
CREATE INDEX IF NOT EXISTS idx_merton_rating ON merton_results(dd_rating);
-- ============================================================
-- 6. 신용등급 (실제 관측 등급)
-- ============================================================
CREATE TABLE IF NOT EXISTS credit_ratings (
ticker TEXT NOT NULL,
rating_date TEXT NOT NULL, -- 등급 확인일
agency TEXT, -- 한기평/한신평/나이스
rating TEXT, -- AAA, AA+, ... , D
source TEXT, -- DART공시 / 수동입력
PRIMARY KEY (ticker, rating_date, agency),
FOREIGN KEY (ticker) REFERENCES companies(ticker)
);
-- ============================================================
-- 7. 부도 이력
-- ============================================================
CREATE TABLE IF NOT EXISTS defaults (
ticker TEXT NOT NULL,
default_date TEXT NOT NULL, -- 부도 발생일
default_type TEXT, -- 법정관리/워크아웃/상장폐지/부도어음
description TEXT,
PRIMARY KEY (ticker, default_date),
FOREIGN KEY (ticker) REFERENCES companies(ticker)
);
-- ============================================================
-- 8. 등급별 부도율 최종 결과
-- ============================================================
CREATE TABLE IF NOT EXISTS default_rates (
base_date TEXT NOT NULL, -- 산출 기준일
rating_grade TEXT NOT NULL, -- 등급
n_firms INTEGER, -- 관측 기업수
n_defaults INTEGER, -- 부도 건수
korean_dr REAL, -- 한국 관측 부도율
global_dr REAL, -- 글로벌 벤치마크 부도율
weight_kr REAL, -- 한국 가중치
blended_dr REAL, -- 블렌딩 부도율
bayesian_dr REAL, -- 베이지안 사후 부도율
PRIMARY KEY (base_date, rating_grade)
);
-- ============================================================
-- 메타 정보
-- ============================================================
CREATE TABLE IF NOT EXISTS meta (
key TEXT PRIMARY KEY,
value TEXT
);
""")
# 메타 정보 기록
conn.execute(
"INSERT OR REPLACE INTO meta (key, value) VALUES (?, ?)",
("schema_version", "1.0")
)
conn.execute(
"INSERT OR REPLACE INTO meta (key, value) VALUES (?, ?)",
("created_at", datetime.now().isoformat())
)
conn.commit()
print(f"[DB] Schema initialized: {conn.execute('SELECT value FROM meta WHERE key=?', ('schema_version',)).fetchone()[0]}")
return conn
# ============================================================
# DataFrame ↔ SQLite 유틸리티
# ============================================================
def upsert_companies(conn: sqlite3.Connection, df: pd.DataFrame):
"""종목 마스터 upsert"""
now = datetime.now().isoformat()
for _, row in df.iterrows():
conn.execute("""
INSERT OR REPLACE INTO companies (ticker, name, market, sector, corp_code, updated_at)
VALUES (?, ?, ?, ?, ?, ?)
""", (
str(row.get("ticker", "")),
str(row.get("name", "")),
str(row.get("market", "")),
str(row.get("sector", "")),
str(row.get("corp_code", "")),
now
))
conn.commit()
def upsert_market_data(conn: sqlite3.Connection, ticker: str, df: pd.DataFrame):
"""일별 시장 데이터 upsert (df.index = DatetimeIndex)"""
records = []
for date, row in df.iterrows():
date_str = date.strftime("%Y-%m-%d") if hasattr(date, 'strftime') else str(date)
records.append((
ticker, date_str,
float(row.get("종가", 0)),
float(row.get("시가총액", 0)) if "시가총액" in row else None,
int(row.get("거래량", 0)),
int(row.get("상장주식수", 0)) if "상장주식수" in row else None,
))
conn.executemany("""
INSERT OR REPLACE INTO market_data (ticker, date, close_price, market_cap, volume, shares)
VALUES (?, ?, ?, ?, ?, ?)
""", records)
conn.commit()
def upsert_financial(conn: sqlite3.Connection, df: pd.DataFrame):
"""재무제표 데이터 upsert"""
now = datetime.now().isoformat()
for idx, row in df.iterrows():
ticker = str(idx) if isinstance(idx, str) else str(row.get("ticker", idx))
conn.execute("""
INSERT OR REPLACE INTO financial_data
(ticker, year, report_type, total_assets, current_liabilities, non_current_liabilities,
total_liabilities, total_equity, revenue, operating_income, net_income, interest_expense,
default_point, leverage_ratio, interest_coverage, roa, log_assets, fetched_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (
ticker,
int(row.get("year", 0)),
str(row.get("report_type", "annual")),
row.get("total_assets"),
row.get("current_liabilities"),
row.get("non_current_liabilities"),
row.get("total_liabilities"),
row.get("total_equity"),
row.get("revenue"),
row.get("operating_income"),
row.get("net_income"),
row.get("interest_expense"),
row.get("default_point"),
row.get("leverage_ratio"),
row.get("interest_coverage"),
row.get("roa"),
row.get("log_assets"),
now
))
conn.commit()
def upsert_merton_results(conn: sqlite3.Connection, df: pd.DataFrame, base_date: str, fin_year: int):
"""Merton DD/EDF 결과 upsert"""
for idx, row in df.iterrows():
ticker = str(idx)
conn.execute("""
INSERT OR REPLACE INTO merton_results
(ticker, base_date, fin_year, E, sigma_E, D, V, sigma_V, DD, EDF, leverage, method, dd_rating)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (
ticker, base_date, fin_year,
row.get("E"), row.get("sigma_E"), row.get("D"),
row.get("V"), row.get("sigma_V"),
row.get("DD"), row.get("EDF"), row.get("leverage"),
row.get("method"), row.get("dd_rating")
))
conn.commit()
def load_merton_results(conn: sqlite3.Connection, base_date: str = None) -> pd.DataFrame:
"""Merton 결과 조회"""
if base_date:
query = "SELECT * FROM merton_results WHERE base_date = ?"
return pd.read_sql_query(query, conn, params=(base_date,), index_col="ticker")
else:
query = "SELECT * FROM merton_results"
return pd.read_sql_query(query, conn, index_col="ticker")
def get_stats(conn: sqlite3.Connection) -> dict:
"""DB 통계 조회"""
stats = {}
for table in ["companies", "market_data", "financial_data", "volatility",
"merton_results", "credit_ratings", "defaults", "default_rates"]:
try:
count = conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()[0]
stats[table] = count
except Exception:
stats[table] = 0
return stats
# ---- CLI ----
if __name__ == "__main__":
conn = init_db()
stats = get_stats(conn)
print("\n[DB] Table stats:")
for table, count in stats.items():
print(f" {table}: {count} rows")
conn.close()

261
src/data/krx_fetcher.py Normal file
View File

@@ -0,0 +1,261 @@
"""
KRX 주가/시가총액 데이터 수집 모듈
pykrx를 사용하여 KRX 상장사의 일별 주가, 시가총액, 거래량 데이터를 수집합니다.
"""
import os
import time
import yaml
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from pathlib import Path
from tqdm import tqdm
try:
from pykrx import stock
except ImportError:
raise ImportError("pykrx가 설치되지 않았습니다. pip install pykrx 를 실행하세요.")
def load_config() -> dict:
"""config/settings.yaml 로드"""
config_path = Path(__file__).parent.parent.parent / "config" / "settings.yaml"
with open(config_path, "r", encoding="utf-8") as f:
return yaml.safe_load(f)
def get_all_tickers(date: str, market: str = "ALL") -> pd.DataFrame:
"""
특정 날짜의 전 종목 티커/종목명 조회
Parameters
----------
date : str
조회 날짜 (YYYYMMDD)
market : str
시장 구분 (KOSPI, KOSDAQ, ALL)
Returns
-------
pd.DataFrame
columns: [ticker, name, market]
"""
tickers = stock.get_market_ticker_list(date, market=market)
records = []
for t in tickers:
name = stock.get_market_ticker_name(t)
records.append({"ticker": t, "name": name, "market": market})
return pd.DataFrame(records)
def get_market_cap_all(date: str, market: str = "ALL") -> pd.DataFrame:
"""
특정 날짜의 전 종목 시가총액 조회
Returns
-------
pd.DataFrame
index: ticker, columns: [종가, 시가총액, 거래량, 거래대금, 상장주식수]
"""
df = stock.get_market_cap(date, market=market)
df.index.name = "ticker"
return df
def get_ohlcv(ticker: str, start: str, end: str) -> pd.DataFrame:
"""
특정 종목의 일별 OHLCV 데이터 조회
Parameters
----------
ticker : str
종목코드 (6자리)
start : str
시작 날짜 (YYYYMMDD)
end : str
종료 날짜 (YYYYMMDD)
Returns
-------
pd.DataFrame
columns: [시가, 고가, 저가, 종가, 거래량]
"""
df = stock.get_market_ohlcv(start, end, ticker)
return df
def calculate_equity_volatility(prices: pd.Series,
method: str = "historical",
window: int = 252,
ewma_lambda: float = 0.94) -> float:
"""
주가 수익률 변동성 추정 (연환산)
Parameters
----------
prices : pd.Series
일별 종가 시리즈
method : str
추정 방법 (historical / ewma / garch)
window : int
추정 윈도우 (거래일)
ewma_lambda : float
EWMA lambda (method='ewma'일 때)
Returns
-------
float
연환산 변동성
"""
# 로그 수익률
log_returns = np.log(prices / prices.shift(1)).dropna()
if len(log_returns) < 30:
return np.nan
if method == "historical":
use_returns = log_returns.tail(window) if len(log_returns) >= window else log_returns
return float(use_returns.std() * np.sqrt(252))
elif method == "ewma":
variance = log_returns.iloc[0] ** 2
for ret in log_returns.iloc[1:]:
variance = ewma_lambda * variance + (1 - ewma_lambda) * ret ** 2
return float(np.sqrt(variance * 252))
elif method == "garch":
try:
from arch import arch_model
returns_pct = log_returns * 100
model = arch_model(returns_pct, vol='Garch', p=1, q=1, dist='normal')
result = model.fit(disp='off', show_warning=False)
cond_vol = result.conditional_volatility.iloc[-1] / 100
return float(cond_vol * np.sqrt(252))
except Exception:
# GARCH 실패 시 historical로 폴백
return calculate_equity_volatility(prices, "historical", window)
else:
raise ValueError(f"Unknown method: {method}")
def fetch_all_stock_data(target_date: str = None,
lookback_years: int = 2,
config: dict = None) -> dict:
"""
전 종목의 주가 데이터 및 변동성을 수집.
Parameters
----------
target_date : str
기준 날짜 (YYYYMMDD). None이면 최근 거래일.
lookback_years : int
주가 수집 기간 (년)
config : dict
설정 딕셔너리
Returns
-------
dict with keys:
'market_cap': 기준일 시가총액 DataFrame
'volatility': 종목별 변동성 DataFrame
'tickers': 종목 정보 DataFrame
"""
if config is None:
config = load_config()
if target_date is None:
target_date = datetime.now().strftime("%Y%m%d")
sleep_sec = config.get("krx", {}).get("sleep_seconds", 0.5)
vol_method = config.get("merton", {}).get("volatility_method", "historical")
vol_window = config.get("merton", {}).get("volatility_window", 252)
ewma_lam = config.get("merton", {}).get("ewma_lambda", 0.94)
min_trading = config.get("krx", {}).get("min_trading_days", 200)
# 시작일 계산
target_dt = datetime.strptime(target_date, "%Y%m%d")
start_dt = target_dt - timedelta(days=365 * lookback_years + 30)
start_date = start_dt.strftime("%Y%m%d")
print(f"[KRX] 기준일: {target_date}, 주가 수집 시작일: {start_date}")
# 1) 시가총액 조회
print("[KRX] 시가총액 조회 중...")
market_cap = get_market_cap_all(target_date)
time.sleep(sleep_sec)
# 2) 종목 리스트
print(f"[KRX] 총 {len(market_cap)}개 종목 확인")
# 3) 변동성 산출
vol_records = []
errors = []
print(f"[KRX] 종목별 변동성 산출 중 (method={vol_method})...")
for ticker in tqdm(market_cap.index, desc="변동성 산출"):
try:
ohlcv = get_ohlcv(ticker, start_date, target_date)
time.sleep(sleep_sec)
if len(ohlcv) < min_trading:
continue
prices = ohlcv["종가"]
prices = prices[prices > 0] # 0원 제거
if len(prices) < min_trading:
continue
sigma_e = calculate_equity_volatility(
prices, method=vol_method, window=vol_window, ewma_lambda=ewma_lam
)
vol_records.append({
"ticker": ticker,
"sigma_E": sigma_e,
"n_trading_days": len(prices),
"last_price": float(prices.iloc[-1]),
})
except Exception as e:
errors.append({"ticker": ticker, "error": str(e)})
continue
vol_df = pd.DataFrame(vol_records).set_index("ticker")
print(f"[KRX] 변동성 산출 완료: {len(vol_df)}개 종목 (에러: {len(errors)}건)")
return {
"market_cap": market_cap,
"volatility": vol_df,
"errors": errors,
}
def save_stock_data(data: dict, output_dir: str = None):
"""수집 결과를 CSV로 저장"""
if output_dir is None:
output_dir = Path(__file__).parent.parent.parent / "data" / "raw"
output_dir = Path(output_dir)
output_dir.mkdir(parents=True, exist_ok=True)
timestamp = datetime.now().strftime("%Y%m%d")
data["market_cap"].to_csv(output_dir / f"market_cap_{timestamp}.csv", encoding="utf-8-sig")
data["volatility"].to_csv(output_dir / f"volatility_{timestamp}.csv", encoding="utf-8-sig")
if data.get("errors"):
pd.DataFrame(data["errors"]).to_csv(
output_dir / f"fetch_errors_{timestamp}.csv", encoding="utf-8-sig", index=False
)
print(f"[KRX] 데이터 저장 완료: {output_dir}")
# ---- CLI 실행 ----
if __name__ == "__main__":
config = load_config()
data = fetch_all_stock_data(config=config)
save_stock_data(data)

1
src/models/__init__.py Normal file
View File

@@ -0,0 +1 @@
# models package

371
src/models/merton.py Normal file
View File

@@ -0,0 +1,371 @@
"""
Merton-KMV 모형 모듈
기업의 자기자본 시장가치와 주가변동성으로부터
자산가치/자산변동성을 추정하고 Distance-to-Default(DD) 및 EDF를 산출합니다.
"""
import numpy as np
import pandas as pd
from scipy.optimize import fsolve, brentq
from scipy.stats import norm
from pathlib import Path
import yaml
def solve_merton(E: float, sigma_E: float, D: float,
r: float, T: float = 1.0,
max_iter: int = 100, tol: float = 1e-6) -> dict:
"""
Merton 연립방정식을 풀어 자산가치(V)와 자산변동성(σ_V)을 반복 추정.
두 가지 방법을 시도:
1) scipy.fsolve (뉴턴법)
2) 실패 시 반복 대입법 (Iterative substitution)
Parameters
----------
E : float
자기자본 시장가치 (시가총액, 원 단위)
sigma_E : float
주가수익률 변동성 (연환산, 예: 0.30)
D : float
부도점 (= STD + 0.5 * LTD, 원 단위)
r : float
무위험이자율 (연, 예: 0.035)
T : float
시간 수평선 (년, 기본 1.0)
max_iter : int
반복 대입법 최대 반복 횟수
tol : float
수렴 허용 오차
Returns
-------
dict:
V: 추정 자산가치
sigma_V: 추정 자산변동성
d1, d2: Black-Scholes 파라미터
converged: 수렴 여부
method: 사용된 방법
"""
if E <= 0 or sigma_E <= 0 or D <= 0:
return {"V": np.nan, "sigma_V": np.nan, "d1": np.nan, "d2": np.nan,
"converged": False, "method": "invalid_input"}
# --- 방법 1: fsolve ---
def equations(params):
V, sigma_V = params
if V <= 0 or sigma_V <= 0:
return [1e10, 1e10]
d1 = (np.log(V / D) + (r + 0.5 * sigma_V**2) * T) / (sigma_V * np.sqrt(T))
d2 = d1 - sigma_V * np.sqrt(T)
eq1 = V * norm.cdf(d1) - D * np.exp(-r * T) * norm.cdf(d2) - E
eq2 = (V / E) * norm.cdf(d1) * sigma_V - sigma_E
return [eq1, eq2]
# 초기값
V0 = E + D
sigma_V0 = sigma_E * E / (E + D)
try:
sol, info, ier, msg = fsolve(equations, [V0, sigma_V0], full_output=True)
V_sol, sigma_V_sol = sol
if ier == 1 and V_sol > 0 and sigma_V_sol > 0:
d1 = (np.log(V_sol / D) + (r + 0.5 * sigma_V_sol**2) * T) / (sigma_V_sol * np.sqrt(T))
d2 = d1 - sigma_V_sol * np.sqrt(T)
return {
"V": V_sol, "sigma_V": sigma_V_sol,
"d1": d1, "d2": d2,
"converged": True, "method": "fsolve"
}
except Exception:
pass
# --- 방법 2: 반복 대입법 ---
sigma_V = sigma_V0
V = V0
for i in range(max_iter):
d1 = (np.log(V / D) + (r + 0.5 * sigma_V**2) * T) / (sigma_V * np.sqrt(T))
d2 = d1 - sigma_V * np.sqrt(T)
# V 업데이트: E = V*N(d1) - D*e^(-rT)*N(d2) → V = (E + D*e^(-rT)*N(d2)) / N(d1)
Nd1 = norm.cdf(d1)
Nd2 = norm.cdf(d2)
if Nd1 < 1e-10:
break
V_new = (E + D * np.exp(-r * T) * Nd2) / Nd1
# sigma_V 업데이트: sigma_E = (V/E)*N(d1)*sigma_V → sigma_V = sigma_E*E / (V*N(d1))
sigma_V_new = sigma_E * E / (V_new * Nd1)
if abs(V_new - V) / V < tol and abs(sigma_V_new - sigma_V) / sigma_V < tol:
d1 = (np.log(V_new / D) + (r + 0.5 * sigma_V_new**2) * T) / (sigma_V_new * np.sqrt(T))
d2 = d1 - sigma_V_new * np.sqrt(T)
return {
"V": V_new, "sigma_V": sigma_V_new,
"d1": d1, "d2": d2,
"converged": True, "method": "iterative"
}
V = V_new
sigma_V = sigma_V_new
# 수렴하지 않았지만 마지막 값 반환
d1 = (np.log(V / D) + (r + 0.5 * sigma_V**2) * T) / (sigma_V * np.sqrt(T))
d2 = d1 - sigma_V * np.sqrt(T)
return {
"V": V, "sigma_V": sigma_V,
"d1": d1, "d2": d2,
"converged": False, "method": "iterative_no_converge"
}
def calculate_dd(V: float, sigma_V: float, D: float,
mu: float = None, r: float = 0.035,
T: float = 1.0) -> float:
"""
Distance-to-Default 산출
Parameters
----------
V : float
자산가치
sigma_V : float
자산변동성
D : float
부도점
mu : float
자산 기대수익률 (None이면 r 사용)
r : float
무위험이자율
T : float
시간 수평선
"""
if mu is None:
mu = r
if D <= 0 or V <= 0 or sigma_V <= 0:
return np.nan
DD = (np.log(V / D) + (mu - 0.5 * sigma_V**2) * T) / (sigma_V * np.sqrt(T))
return DD
def calculate_edf(DD: float) -> float:
"""이론적 EDF 산출 (정규분포 가정)"""
if np.isnan(DD):
return np.nan
return norm.cdf(-DD)
def naive_dd(E: float, sigma_E: float, D: float,
mu: float = None, r: float = 0.035,
T: float = 1.0) -> dict:
"""
Bharath-Shumway 간편 DD (반복 추정 없이 직접 산출)
빠른 1차 필터링이나 반복추정 실패 시 대안으로 사용.
"""
if E <= 0 or sigma_E <= 0 or D <= 0:
return {"DD": np.nan, "EDF": np.nan, "V": np.nan, "sigma_V": np.nan}
if mu is None:
mu = r
V = E + D
sigma_V = (E / (E + D)) * sigma_E + (D / (E + D)) * (0.05 + 0.25 * sigma_E)
DD = (np.log(V / D) + (mu - 0.5 * sigma_V**2) * T) / (sigma_V * np.sqrt(T))
EDF = norm.cdf(-DD)
return {"DD": DD, "EDF": EDF, "V": V, "sigma_V": sigma_V}
def run_merton_for_all(market_data: pd.DataFrame,
financial_data: pd.DataFrame,
r: float = 0.035,
T: float = 1.0,
use_naive_fallback: bool = True) -> pd.DataFrame:
"""
전 종목에 대해 Merton 모형 실행.
Parameters
----------
market_data : pd.DataFrame
index=ticker, columns 포함: [시가총액, sigma_E]
(krx_fetcher에서 시가총액 + volatility 병합한 데이터)
financial_data : pd.DataFrame
index=ticker, columns 포함: [default_point, ...]
(dart_fetcher에서 수집한 재무 데이터)
r : float
무위험이자율
T : float
시간 수평선
use_naive_fallback : bool
Merton 수렴 실패 시 Naive DD 사용 여부
Returns
-------
pd.DataFrame
종목별 V, sigma_V, DD, EDF 등
"""
# 두 데이터셋 병합
common_tickers = market_data.index.intersection(financial_data.index)
print(f"[Merton] 공통 종목 수: {len(common_tickers)}")
results = []
for ticker in common_tickers:
mkt = market_data.loc[ticker]
fin = financial_data.loc[ticker]
E = mkt.get("시가총액", np.nan)
sigma_E = mkt.get("sigma_E", np.nan)
D = fin.get("default_point", np.nan)
if pd.isna(E) or pd.isna(sigma_E) or pd.isna(D) or D <= 0 or E <= 0:
continue
# Merton 풀이
sol = solve_merton(E, sigma_E, D, r, T)
if sol["converged"]:
V = sol["V"]
sigma_V = sol["sigma_V"]
DD = calculate_dd(V, sigma_V, D, r=r, T=T)
EDF = calculate_edf(DD)
method = sol["method"]
elif use_naive_fallback:
naive = naive_dd(E, sigma_E, D, r=r, T=T)
V = naive["V"]
sigma_V = naive["sigma_V"]
DD = naive["DD"]
EDF = naive["EDF"]
method = "naive_fallback"
else:
continue
results.append({
"ticker": ticker,
"E": E,
"sigma_E": sigma_E,
"D": D,
"V": V,
"sigma_V": sigma_V,
"DD": DD,
"EDF": EDF,
"leverage": D / V if V > 0 else np.nan,
"method": method,
})
df = pd.DataFrame(results).set_index("ticker")
print(f"[Merton] DD/EDF 산출 완료: {len(df)}개 종목")
print(f" - fsolve: {(df['method']=='fsolve').sum()}")
print(f" - iterative: {(df['method']=='iterative').sum()}")
print(f" - naive_fallback: {(df['method']=='naive_fallback').sum()}")
return df
# ---- 글로벌 벤치마크 등급별 부도율 ----
GLOBAL_DEFAULT_RATES = {
# Moody's 1983-2023 평균 1년 부도율 (근사치)
"AAA": 0.0000,
"AA+": 0.0002,
"AA": 0.0003,
"AA-": 0.0005,
"A+": 0.0006,
"A": 0.0008,
"A-": 0.0012,
"BBB+": 0.0020,
"BBB": 0.0035,
"BBB-": 0.0070,
"BB+": 0.0100,
"BB": 0.0180,
"BB-": 0.0300,
"B+": 0.0450,
"B": 0.0700,
"B-": 0.1100,
"CCC+": 0.1500,
"CCC": 0.2200,
"CCC-": 0.3000,
}
# DD → 등급 매핑 테이블 (글로벌 벤치마크 기반)
DD_RATING_MAP = [
(6.5, "AAA"),
(6.0, "AA+"),
(5.5, "AA"),
(5.0, "AA-"),
(4.5, "A+"),
(4.0, "A"),
(3.5, "A-"),
(3.2, "BBB+"),
(2.8, "BBB"),
(2.5, "BBB-"),
(2.2, "BB+"),
(1.8, "BB"),
(1.5, "BB-"),
(1.2, "B+"),
(0.9, "B"),
(0.6, "B-"),
(0.3, "CCC+"),
(0.0, "CCC"),
(-999, "CCC-"),
]
def dd_to_rating(dd: float) -> str:
"""DD 값을 신용등급으로 매핑"""
if np.isnan(dd):
return "NR"
for threshold, rating in DD_RATING_MAP:
if dd >= threshold:
return rating
return "CCC-"
def assign_dd_ratings(df: pd.DataFrame, dd_col: str = "DD") -> pd.DataFrame:
"""전 종목에 DD 기반 등급 부여"""
df = df.copy()
df["dd_rating"] = df[dd_col].apply(dd_to_rating)
return df
# ---- CLI 테스트 ----
if __name__ == "__main__":
# 단일 기업 테스트
print("=== Merton 모형 단일 테스트 ===")
# 예시: 시가총액 10조, 변동성 30%, 부도점 5조, 무위험 3.5%
E = 10_000_000_000_000 # 10조
sigma_E = 0.30
D = 5_000_000_000_000 # 5조
r = 0.035
sol = solve_merton(E, sigma_E, D, r)
print(f" V = {sol['V']/1e12:.2f}")
print(f" σ_V = {sol['sigma_V']:.4f}")
print(f" 수렴: {sol['converged']}, 방법: {sol['method']}")
DD = calculate_dd(sol['V'], sol['sigma_V'], D, r=r)
EDF = calculate_edf(DD)
rating = dd_to_rating(DD)
print(f" DD = {DD:.4f}")
print(f" EDF = {EDF:.6f} ({EDF*100:.4f}%)")
print(f" 내재등급 = {rating}")
# Naive DD 비교
naive = naive_dd(E, sigma_E, D, r=r)
print(f"\n=== Naive DD 비교 ===")
print(f" DD = {naive['DD']:.4f}")
print(f" EDF = {naive['EDF']:.6f} ({naive['EDF']*100:.4f}%)")
print(f" 내재등급 = {dd_to_rating(naive['DD'])}")