Files
LifetimePD/data/macro_analysis.py
Variet Agent cc55acc330 feat(analysis): v3 re-analysis with 31 ECOS variables
Best model: CREDIT_SPREAD_LAG1 + USDKRW + HOUSING_PRICE (R²=0.646)
- 31 raw vars → 80+ features (DIFF/LAG1/PCT/spreads)
- 2267 valid combos searched (collinearity filtered)
- All Top 10 sign-consistent
- Zt sign corrected: Z+ = 호황 (Belkin convention)
2026-03-11 07:55:36 +09:00

367 lines
15 KiB
Python

"""
Macro-variable re-analysis v3 — expanded to 31 variables, with the corrected Zt sign.

Feature rules:
1. Interest-rate variables: first difference (DIFF) only; contemporaneous
   LEVEL/LOG forms are excluded. (build_features additionally emits a
   one-period lag of the level, ``*_LAG1``.)
2. Index / amount variables: raw series plus DIFF / PCT / LAG1 (the exact set
   per category is defined in build_features).
3. Variables that are already rates of change: raw series plus LAG1 only.
4. Coefficient signs are checked for economic consistency.
5. Zt sample: 2000~2025 (26 obs).

Zt sign (after the Belkin correction): **positive = boom** (PD falls),
negative = recession.

Expected coefficient signs (Zt up = boom):
    GDP_GROWTH:     (+) — growth up -> boom -> Zt up
    UNEMPLOYMENT:   (-) — unemployment up -> recession -> Zt down
    BASE_RATE_DIFF: (-) — rate hike -> tightening -> Zt down
    CPI_GROWTH:     (-) — inflation surge -> purchasing power down -> Zt down
    LEADING_INDEX:  (+) — leading index up -> boom -> Zt up
    CREDIT_SPREAD:  (-) — spread up -> risk up -> Zt down
    EXPORT:         (+) — exports up -> boom -> Zt up
    KOSPI:          (+) — equity prices up -> boom -> Zt up
    OIL_PRICE:      (-) — oil price up -> costs up -> Zt down (net importer)
    DISHONOR_RATE:  (-) — dishonored-bill rate up -> recession -> Zt down
    USDKRW:         (-) — weaker won -> recession -> Zt down
    BSI_MANUF:      (+) — BSI up -> better business outlook -> Zt up
    CSI:            (+) — consumer sentiment up -> boom -> Zt up
"""
import sys, io, itertools
import numpy as np, pandas as pd
import statsmodels.api as sm
from scipy import stats
from pathlib import Path
# Force UTF-8 on the console streams so the Korean report text can be printed
# on consoles whose default code page cannot represent it.
# The encoding name is normalised before comparing ("UTF-8", "utf8" and
# "utf-8" all name the same codec), and a missing encoding counts as non-UTF-8.
_stdout_encoding = getattr(sys.stdout, "encoding", None) or ""
if _stdout_encoding.lower().replace("-", "").replace("_", "") != "utf8":
    for _stream_name in ("stdout", "stderr"):
        _stream = getattr(sys, _stream_name)
        if hasattr(_stream, "reconfigure"):
            # Python 3.7+: change the encoding in place; this keeps the
            # stream object and its buffering settings.
            _stream.reconfigure(encoding="utf-8", errors="replace")
        elif hasattr(_stream, "buffer"):
            # Older interpreters: wrap the underlying binary buffer.
            setattr(
                sys,
                _stream_name,
                io.TextIOWrapper(_stream.buffer, encoding="utf-8", errors="replace"),
            )
        # Streams exposing neither attribute (e.g. None or a replacement
        # object without a binary buffer) are left untouched.
import warnings
# Suppress every warning category process-wide for the rest of the run.
warnings.filterwarnings("ignore")
# Parent of the directory that contains this file; used as the import root
# and as the parent of the results directory.
BASE_DIR = Path(__file__).parent.parent
# Zt sign convention: positive = boom (after the Belkin correction).
# For each feature: the direction Zt is expected to move when the feature
# increases (+1 = Zt rises, -1 = Zt falls, 0 = no sign constraint).
EXPECTED_SIGNS = {
# --- Growth / business cycle ---
"GDP_GROWTH": +1, "GDP_GROWTH_LAG1": +1,
"LEADING_INDEX": +1, "LEADING_INDEX_DIFF": +1, "LEADING_INDEX_LAG1": +1,
"COINCIDENT": +1, "COINCIDENT_DIFF": +1, "COINCIDENT_LAG1": +1,
"BSI_MANUF": +1, "BSI_MANUF_LAG1": +1,
# --- Employment ---
"UNEMPLOYMENT": -1, "UNEMPLOYMENT_LAG1": -1, "UNEMPLOYMENT_DIFF": -1,
"EMPLOYED": +1, "EMPLOYED_DIFF": +1, "EMPLOYED_PCT": +1, "EMPLOYED_LAG1": +1,
"EMPLOYMENT_RATE": +1, "EMPLOYMENT_RATE_DIFF": +1, "EMPLOYMENT_RATE_LAG1": +1,
# --- Interest-rate first differences ---
"BASE_RATE_DIFF": -1, "CD_RATE_DIFF": -1,
"GOVT_3Y_DIFF": -1, "GOVT_10Y_DIFF": -1,
"CORP_AA_DIFF": -1, "CORP_BBB_DIFF": -1,
# Lagged interest-rate levels: sign direction is ambiguous -> no constraint
"BASE_RATE_LAG1": 0, "CD_RATE_LAG1": 0,
"GOVT_3Y_LAG1": 0, "GOVT_10Y_LAG1": 0,
"CORP_AA_LAG1": 0, "CORP_BBB_LAG1": 0,
# --- Prices ---
"CPI_GROWTH": -1, "CPI_GROWTH_LAG1": -1,
"IMPORT_PRICE": 0, "IMPORT_PRICE_DIFF": -1, "IMPORT_PRICE_PCT": -1,
"IMPORT_PRICE_LAG1": 0,
"OIL_PRICE": -1, "OIL_PRICE_DIFF": -1, "OIL_PRICE_PCT": -1, "OIL_PRICE_LAG1": -1,
# --- Spreads / derived series ---
"CREDIT_SPREAD": -1, "CREDIT_SPREAD_DIFF": -1, "CREDIT_SPREAD_LAG1": -1,
"TERM_SPREAD": 0, "TERM_SPREAD_DIFF": 0, "TERM_SPREAD_LAG1": 0,
"REAL_RATE": 0, "REAL_RATE_DIFF": 0,
# --- Trade ---
"EXPORT_PCT": +1, "EXPORT_DIFF": +1,
"IMPORT_AMT_PCT": -1, "IMPORT_AMT_DIFF": -1,
"TRADE_BALANCE": +1, "TRADE_BALANCE_DIFF": +1,
"CURRENT_ACCOUNT": +1, "CURRENT_ACCOUNT_DIFF": +1, "CURRENT_ACCOUNT_LAG1": +1,
# --- Financial markets ---
"USDKRW": -1, "USDKRW_DIFF": -1, "USDKRW_PCT": -1, "USDKRW_LAG1": -1,
"M2_PCT": 0,
"KOSPI": +1, "KOSPI_PCT": +1, "KOSPI_DIFF": +1, "KOSPI_LAG1": +1,
"DISHONOR_RATE": -1, "DISHONOR_RATE_DIFF": -1, "DISHONOR_RATE_LAG1": -1,
# --- Consumption / sentiment ---
"CSI": +1, "CSI_DIFF": +1, "CSI_LAG1": +1,
"RETAIL_SALES": +1, "RETAIL_SALES_DIFF": +1, "RETAIL_SALES_PCT": +1, "RETAIL_SALES_LAG1": +1,
# --- Investment / production ---
"IPI": +1, "IPI_DIFF": +1, "IPI_LAG1": +1,
"SPI": +1, "SPI_DIFF": +1, "SPI_LAG1": +1,
"FACILITY_INVEST": +1, "FACILITY_INVEST_DIFF": +1, "FACILITY_INVEST_PCT": +1, "FACILITY_INVEST_LAG1": +1,
# --- Real estate / households ---
"HOUSING_PRICE": 0, "HOUSING_PRICE_DIFF": 0, "HOUSING_PRICE_LAG1": 0,
"HOUSEHOLD_DEBT": 0, "HOUSEHOLD_DEBT_PCT": 0,
"CONSTRUCTION_DONE": 0, "CONSTRUCTION_DONE_DIFF": 0,
}
# Interest-rate variables: the raw level is never used as a feature; only the
# first difference and the one-period lag are generated.
RATE_VARS = {"BASE_RATE", "CD_RATE", "GOVT_3Y", "GOVT_10Y", "CORP_AA", "CORP_BBB"}
# Variables that are already rates/ratios: raw series + LAG1 only.
ALREADY_RATE_VARS = {"GDP_GROWTH", "CPI_GROWTH", "UNEMPLOYMENT", "EMPLOYMENT_RATE"}
# Index-type variables: raw series + DIFF + LAG1.
INDEX_VARS = {"LEADING_INDEX", "COINCIDENT", "BSI_MANUF", "CSI", "IPI", "SPI",
              "RETAIL_SALES", "FACILITY_INVEST", "IMPORT_PRICE", "HOUSING_PRICE"}
# Amount-type variables: DIFF + PCT (no raw level).
AMOUNT_VARS = {"EXPORT", "IMPORT_AMT", "M2", "HOUSEHOLD_DEBT", "CONSTRUCTION_DONE", "EMPLOYED"}
# Price-type variables: raw series + DIFF + PCT + LAG1.
PRICE_VARS = {"USDKRW", "OIL_PRICE", "KOSPI"}


def build_features(raw: pd.DataFrame) -> pd.DataFrame:
    """Expand the raw macro series into the candidate feature set.

    Each column of ``raw`` is transformed according to the category set it
    belongs to (see the module-level ``*_VARS`` constants); columns that match
    no category contribute no per-column features. Four derived series are
    added when their inputs are present: CREDIT_SPREAD (CORP_BBB - CORP_AA),
    TERM_SPREAD (GOVT_10Y - BASE_RATE), REAL_RATE (BASE_RATE - CPI_GROWTH)
    and TRADE_BALANCE (EXPORT - IMPORT_AMT).

    Args:
        raw: One column per raw variable, indexed by period. The index does
            not need to be sorted.

    Returns:
        A DataFrame of features on the sorted index. Infinite values (from a
        percentage change off a zero base) are turned into NaN, and any
        feature with fewer than 15 non-missing observations is dropped.
    """
    # Sort once so that every diff/shift/pct_change -- including the derived
    # spreads below -- is taken between consecutive periods.
    raw = raw.sort_index()

    feat = {}
    for col in raw.columns:
        s = raw[col]
        if col in RATE_VARS:
            feat[f"{col}_DIFF"] = s.diff()
            feat[f"{col}_LAG1"] = s.shift(1)
        elif col in ALREADY_RATE_VARS:
            feat[col] = s
            feat[f"{col}_LAG1"] = s.shift(1)
        elif col in INDEX_VARS or col in ("DISHONOR_RATE", "CURRENT_ACCOUNT"):
            # DISHONOR_RATE and CURRENT_ACCOUNT get the same treatment as the
            # index-type variables: raw + DIFF + LAG1.
            feat[col] = s
            feat[f"{col}_DIFF"] = s.diff()
            feat[f"{col}_LAG1"] = s.shift(1)
        elif col in AMOUNT_VARS:
            feat[f"{col}_DIFF"] = s.diff()
            feat[f"{col}_PCT"] = s.pct_change() * 100
        elif col in PRICE_VARS:
            feat[col] = s
            feat[f"{col}_DIFF"] = s.diff()
            feat[f"{col}_PCT"] = s.pct_change() * 100
            feat[f"{col}_LAG1"] = s.shift(1)

    # Derived series.
    if "CORP_BBB" in raw.columns and "CORP_AA" in raw.columns:
        cs = raw["CORP_BBB"] - raw["CORP_AA"]
        feat["CREDIT_SPREAD"] = cs
        feat["CREDIT_SPREAD_DIFF"] = cs.diff()
        feat["CREDIT_SPREAD_LAG1"] = cs.shift(1)
    if "GOVT_10Y" in raw.columns and "BASE_RATE" in raw.columns:
        ts = raw["GOVT_10Y"] - raw["BASE_RATE"]
        feat["TERM_SPREAD"] = ts
        feat["TERM_SPREAD_DIFF"] = ts.diff()
        feat["TERM_SPREAD_LAG1"] = ts.shift(1)
    if "BASE_RATE" in raw.columns and "CPI_GROWTH" in raw.columns:
        real_rate = raw["BASE_RATE"] - raw["CPI_GROWTH"]
        feat["REAL_RATE"] = real_rate
        feat["REAL_RATE_DIFF"] = real_rate.diff()
    if "EXPORT" in raw.columns and "IMPORT_AMT" in raw.columns:
        tb = raw["EXPORT"] - raw["IMPORT_AMT"]
        feat["TRADE_BALANCE"] = tb
        feat["TRADE_BALANCE_DIFF"] = tb.diff()

    frame = pd.DataFrame(feat)
    # pct_change off a zero base yields +/-inf, which dropna() would keep and
    # which would poison any later correlation or regression; treat as missing.
    frame = frame.replace([np.inf, -np.inf], np.nan)
    return frame.dropna(axis=1, thresh=15)
def check_sign_consistency(combo_vars, coefficients, expected_signs=None):
    """Check that fitted coefficients carry the economically expected sign.

    Args:
        combo_vars: Feature names, in the same order as ``coefficients``.
        coefficients: Fitted slope coefficients (intercept excluded).
        expected_signs: Optional mapping ``feature name -> +1 / -1 / 0``.
            Defaults to the module-level ``EXPECTED_SIGNS``. Features that are
            missing from the mapping, or mapped to 0, are not constrained.

    Returns:
        ``(all_ok, issues)`` where ``all_ok`` is True when every constrained
        coefficient has the expected sign and ``issues`` lists one message per
        violation.
    """
    signs = EXPECTED_SIGNS if expected_signs is None else expected_signs
    issues = []
    for var, coef in zip(combo_vars, coefficients):
        expected = signs.get(var, 0)
        if expected == 0:
            continue
        # The product is strictly positive only when both signs agree. A zero
        # or NaN coefficient (degenerate fit) therefore counts as a violation
        # instead of silently passing as "negative".
        if coef * expected > 0:
            continue
        direction = "양(+)" if expected > 0 else "음(-)"
        issues.append(f"{var}: expected {direction}, got {coef:+.3f}")
    return not issues, issues
def _format_zt(zt_full, year):
    """Return Zt for ``year`` with three decimals, or 'N/A' when the year is absent."""
    value = zt_full.get(year)
    return "N/A" if value is None else f"{value:.3f}"


def _standardized_design(features, zt, rows, variables, min_obs=0):
    """Build ``(y, X)`` for an OLS fit with z-scored regressors.

    Rows with any missing regressor are dropped. Returns None when fewer than
    ``min_obs`` observations remain.
    """
    x_df = features.loc[rows, variables].dropna()
    valid_idx = x_df.index.intersection(zt.index)
    if len(valid_idx) < min_obs:
        return None
    y = zt.loc[valid_idx].values
    x = x_df.loc[valid_idx].values
    x_mean, x_std = x.mean(0), x.std(0)
    x_std[x_std < 1e-10] = 1  # avoid dividing by ~0 for (near-)constant columns
    return y, (x - x_mean) / x_std


def _fit_ols(y, x_std):
    """Fit OLS of ``y`` on the standardized regressors plus an intercept."""
    # has_constant="add" always prepends the intercept column; the default
    # ("skip") would omit it when a regressor is itself constant, which would
    # shift every coefficient by one position in ``params``.
    return sm.OLS(y, sm.add_constant(x_std, has_constant="add")).fit()


def _collinear_pairs(feature_rows, candidates, threshold=0.80, min_overlap=5):
    """Return the set of candidate pairs whose |correlation| exceeds ``threshold``.

    A pair is only tested when more than ``min_overlap`` observations are
    non-missing in both series. Pairs are stored as frozensets of two names.
    """
    cleaned = {name: feature_rows[name].dropna() for name in candidates}
    flagged = set()
    for a, b in itertools.combinations(candidates, 2):
        s1, s2 = cleaned[a], cleaned[b]
        shared = s1.index.intersection(s2.index)
        if len(shared) > min_overlap and abs(s1.loc[shared].corr(s2.loc[shared])) > threshold:
            flagged.add(frozenset((a, b)))
    return flagged


def main():
    """Run the v3 macro-variable analysis, print the report and save the top combos.

    Steps:
      1. Estimate the Zt series from the transition matrices and keep 2000-2025.
      2. Load the raw macro data and expand it with ``build_features``.
      3. Rank features by |Pearson r| against Zt and keep those whose
         correlation sign matches ``EXPECTED_SIGNS`` and whose |r| > 0.15.
      4. Exhaustively fit 3-variable OLS models over the top 30 candidates,
         skipping triples that contain a pair with |corr| > 0.80.
      5. Print the top 10, the best sign-consistent model and any R² >= 0.7
         models, then write the top 30 to ``results/macro_top30_combos.csv``.
    """
    start_year, end_year = 2000, 2025

    print("=" * 70)
    print(" 거시변수 재분석 v3 — 31변수 확장 + Zt 부호 수정")
    print(" Zt: 양수=호황(Belkin), 2000~2025")
    print("=" * 70)

    # --- Zt ---
    # Project modules are imported here, after BASE_DIR is put on sys.path.
    sys.path.insert(0, str(BASE_DIR))
    from data.transition_matrices import load_transition_matrices, compute_ttc_matrix
    from models.credit_cycle import estimate_zt_series
    tm = load_transition_matrices("real")
    ttc = compute_ttc_matrix(tm)
    # NOTE(review): zt_full is used as a year -> value mapping (it supports
    # .get and is passed to pd.Series) -- confirm against estimate_zt_series.
    zt_full = estimate_zt_series(tm, ttc, rho=0.20)
    zt_series = pd.Series(zt_full, name="Zt")
    zt_series = zt_series[(zt_series.index >= start_year) & (zt_series.index <= end_year)]
    print(f"\n Zt: {len(zt_series)}obs ({zt_series.index.min()}~{zt_series.index.max()})")
    # Sanity print of two reference years; a missing year prints N/A instead
    # of raising on the numeric format spec.
    print(f" Zt 부호 확인: 1998={_format_zt(zt_full, 1998)} (위기=음수 OK?)")
    print(f" 2006={_format_zt(zt_full, 2006)} (호황=양수 OK?)")

    # --- Load the raw macro variables (cached by the fetcher, per the original note) ---
    from data.ecos_fetcher import load_macro_data
    raw = load_macro_data(start_year, end_year)
    print(f"\n 원본 변수: {len(raw.columns)}")
    features = build_features(raw)
    features = features[(features.index >= start_year) & (features.index <= end_year)]
    print(f" 파생 포함: {len(features.columns)}")
    print(f" 변수: {', '.join(sorted(features.columns))}")

    # --- Correlation screen ---
    common = sorted(set(zt_series.index) & set(features.index))
    zt = zt_series.loc[common]
    print(f"\n === Top 30 상관 (|r|) ===")
    corrs = []
    for col in features.columns:
        s = features.loc[common, col].dropna()
        valid = s.index.intersection(zt.index)
        if len(valid) < 12:
            continue
        r, p = stats.pearsonr(zt.loc[valid], s.loc[valid])
        exp = EXPECTED_SIGNS.get(col, 0)
        sign_ok = "OK" if (exp == 0 or (r > 0 and exp > 0) or (r < 0 and exp < 0)) else "WRONG"
        corrs.append({"var": col, "r": r, "p": p, "abs_r": abs(r), "sign": sign_ok, "n": len(valid)})
    corrs = sorted(corrs, key=lambda x: x["abs_r"], reverse=True)
    print(f" {'Variable':30s} {'r':>8} {'p':>8} {'Sign':>6} {'n':>4}")
    for c in corrs[:30]:
        sig = "***" if c["p"] < 0.01 else ("**" if c["p"] < 0.05 else ("*" if c["p"] < 0.1 else ""))
        print(f" {c['var']:30s} {c['r']:>7.4f}{sig:<1} {c['p']:>7.4f} {c['sign']:>6} {c['n']:>4}")

    # Candidates: correlation sign agrees with expectation and |r| > 0.15.
    sign_ok_corrs = [c for c in corrs if c["sign"] == "OK" and c["abs_r"] > 0.15]
    sign_ok_vars = [c["var"] for c in sign_ok_corrs]
    print(f"\n 부호 일관 + |r|>0.15 후보: {len(sign_ok_vars)}")
    for c in sign_ok_corrs:
        print(f" {c['var']:30s} r={c['r']:+.4f}")

    # --- Exhaustive 3-variable search ---
    print(f"\n === 3변수 Exhaustive Search (부호 검증 포함) ===")
    top_n = min(30, len(sign_ok_vars))
    candidates = sign_ok_vars[:top_n]
    # C(n, 3) in closed form; evaluates to 0 for n < 3.
    n_combos = top_n * (top_n - 1) * (top_n - 2) // 6
    print(f" 후보 {top_n}개에서 C({top_n},3)={n_combos} 조합 탐색")

    # Pairwise collinearity depends only on the pair, so compute it once
    # instead of once per triple.
    collinear = _collinear_pairs(features.loc[common], candidates)

    results = []
    n_failed = 0
    for combo in itertools.combinations(candidates, 3):
        combo_list = list(combo)
        if any(frozenset(pair) in collinear for pair in itertools.combinations(combo_list, 2)):
            continue
        design = _standardized_design(features, zt, common, combo_list, min_obs=15)
        if design is None:
            continue
        try:
            model = _fit_ols(*design)
        except Exception:
            # Best-effort search: a combination that cannot be fitted is
            # skipped, but counted so the omission is visible in the report.
            n_failed += 1
            continue
        sign_ok, sign_issues = check_sign_consistency(combo_list, model.params[1:])
        results.append({
            "vars": combo_list,
            "r2": model.rsquared,
            "adj_r2": model.rsquared_adj,
            "aic": model.aic,
            "f_p": model.f_pvalue,
            "dw": sm.stats.durbin_watson(model.resid),
            "sign_ok": sign_ok,
            "sign_issues": sign_issues,
            "pvalues": model.pvalues[1:].tolist(),
            "coeffs": model.params[1:].tolist(),
        })
    # Sign-consistent models first, then by descending adjusted R².
    results.sort(key=lambda x: (-x["sign_ok"], -x["adj_r2"]))
    print(f"\n 검색: {len(results)} 유효 조합 (공선성 제거 후)")
    if n_failed:
        print(f" OLS 적합 실패로 제외된 조합: {n_failed}")

    print(f"\n === Top 10 (부호 일관 + adj.R² 기준) ===")
    print(f" {'#':>3} {'R2':>7} {'adjR2':>7} {'AIC':>7} {'DW':>5} {'Sign':>5} | {'Variables (coefficient)'}")
    for i, res in enumerate(results[:10]):
        vars_info = " + ".join([
            f"{v}({c:+.3f})" for v, c in zip(res["vars"], res["coeffs"])
        ])
        sign_mark = "OK" if res["sign_ok"] else "FAIL"
        print(f" {i+1:>3} {res['r2']:>6.4f} {res['adj_r2']:>6.4f} {res['aic']:>6.1f} {res['dw']:>5.2f} {sign_mark:>5} | {vars_info}")
        for issue in res["sign_issues"]:
            print(f" SIGN: {issue}")

    # --- Best sign-consistent model in detail ---
    best_sign_ok = [r for r in results if r["sign_ok"]]
    if best_sign_ok:
        best = best_sign_ok[0]
        print(f"\n {'='*60}")
        print(f" 최적 모형 (부호 일관)")
        print(f" {'='*60}")
        print(f" Variables: {best['vars']}")
        print(f" R² = {best['r2']:.4f}, Adj.R² = {best['adj_r2']:.4f}")
        print(f" AIC = {best['aic']:.2f}, DW = {best['dw']:.3f}")
        print(f" F p-value = {best['f_p']:.6f}")
        # Refit to obtain the full statsmodels summary table.
        model = _fit_ols(*_standardized_design(features, zt, common, best["vars"]))
        print(f"\n{model.summary()}")

    # --- R² >= 0.7 filter ---
    high_r2 = [r for r in results if r["sign_ok"] and r["r2"] >= 0.7]
    if high_r2:
        print(f"\n === R² ≥ 0.7 조합 ({len(high_r2)}개) ===")
        for i, r in enumerate(high_r2[:10]):
            vi = " + ".join([f"{v}({c:+.3f})" for v, c in zip(r["vars"], r["coeffs"])])
            print(f" {i+1:>3} R²={r['r2']:.4f} adjR²={r['adj_r2']:.4f} DW={r['dw']:.2f} | {vi}")
    elif best_sign_ok:
        print(f"\n R² ≥ 0.7 조합: 없음 (top adj.R² = {best_sign_ok[0]['adj_r2']:.4f})")
    else:
        # No sign-consistent model at all: there is no "top" value to report.
        print(f"\n R² ≥ 0.7 조합: 없음 (부호 일관 조합 없음)")

    # --- Save the top 30 to CSV ---
    out = BASE_DIR / "results"
    out.mkdir(exist_ok=True)
    pd.DataFrame([{
        "rank": i+1, "vars": " + ".join(r["vars"]),
        "r2": round(r["r2"], 4), "adj_r2": round(r["adj_r2"], 4),
        "aic": round(r["aic"], 2), "dw": round(r["dw"], 3),
        "sign_ok": r["sign_ok"],
        "coeffs": " / ".join([f"{c:+.3f}" for c in r["coeffs"]]),
    } for i, r in enumerate(results[:30])]).to_csv(
        out / "macro_top30_combos.csv", index=False
    )
    print(f"\n Top 30 저장: {out / 'macro_top30_combos.csv'}")
if __name__ == "__main__":
main()