Files
LifetimePD/models/credit_cycle.py
Variet Agent 3a9374c61a feat: Lifetime PD (50yr) - Belkin & Suchower + Vasicek model
- Belkin & Suchower (1998) credit cycle index (Zt) estimation via WLS
- Vasicek single-factor conditional PD/TM model
- Macro-Zt OLS regression with stepwise variable selection
- 3-scenario (boom/neutral/recession) 50yr PD projection
- Statistical validation suite (ADF, Ljung-Box, R2, ARCH)
- BOK ECOS API integration with fallback data
- Visualization module (7 chart types)
- Detailed theoretical methodology docs/methodology.md
2026-03-10 21:57:34 +09:00

280 lines
7.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Belkin & Suchower (1998) 신용사이클 인덱스 Zt 추정 모듈
핵심 방법론:
X_i = √ρ · Z + √(1-ρ) · Y_i
여기서:
X_i: 차입자 i의 신용도 변화 (표준정규)
Z: 체계적 요인 (credit cycle index, 표준정규)
Y_i: 개별적 요인 (표준정규, Z와 독립)
ρ: 자산상관계수
TTC 전이행렬의 누적확률 임계값을 Φ⁻¹로 변환한 후,
관측 연도별 전이행렬과 모델 전이행렬 사이의 WLS를 최소화하여 Zt 추정.
참고문헌:
- Belkin, B., Suchower, S., & Forest, L.R. (1998).
"A One-Parameter Representation of Credit Risk and Transition Matrices"
- Basel Committee on Banking Supervision (2005).
"An Explanatory Note on the Basel II IRB Risk Weight Functions"
"""
import numpy as np
from scipy.stats import norm
from scipy.optimize import minimize_scalar, minimize
from typing import Dict, Tuple, Optional
import logging
logger = logging.getLogger(__name__)
def compute_thresholds(ttc_matrix: np.ndarray) -> np.ndarray:
"""
TTC 전이행렬에서 등급 경계 임계값(thresholds) 산출
각 시작등급 i에 대해, 누적 전이확률의 역정규분포로 임계값 산출:
d_{i,j} = Φ⁻¹(Σ_{k≤j} p̄_{i,k})
Parameters
----------
ttc_matrix : np.ndarray
N×N TTC 전이행렬 (행 합 = 1)
Returns
-------
np.ndarray
N×N 임계값 행렬 (마지막 열은 항상 +∞)
"""
n = ttc_matrix.shape[0]
thresholds = np.full((n, n), np.inf)
for i in range(n):
cum_prob = 0.0
for j in range(n - 1):
cum_prob += ttc_matrix[i, j]
# 누적확률을 [1e-10, 1-1e-10] 범위로 클리핑 (Φ⁻¹ 발산 방지)
cum_prob_clipped = np.clip(cum_prob, 1e-10, 1.0 - 1e-10)
thresholds[i, j] = norm.ppf(cum_prob_clipped)
return thresholds
def model_transition_prob(
thresholds: np.ndarray,
z: float,
rho: float,
i: int,
j: int
) -> float:
"""
Z 조건부 전이확률 계산
p_{ij}(Z) = Φ((d_{i,j} - √ρ·Z) / √(1-ρ)) - Φ((d_{i,j-1} - √ρ·Z) / √(1-ρ))
Parameters
----------
thresholds : np.ndarray - 임계값 행렬
z : float - 신용사이클 인덱스
rho : float - 자산상관계수
i : int - 시작 등급 인덱스
j : int - 목표 등급 인덱스
Returns
-------
float : 조건부 전이확률
"""
sqrt_rho = np.sqrt(rho)
sqrt_1_rho = np.sqrt(1.0 - rho)
# 상한 임계값
d_upper = thresholds[i, j]
upper = norm.cdf((d_upper - sqrt_rho * z) / sqrt_1_rho)
# 하한 임계값 (j=0이면 -∞)
if j == 0:
lower = 0.0
else:
d_lower = thresholds[i, j - 1]
lower = norm.cdf((d_lower - sqrt_rho * z) / sqrt_1_rho)
return max(upper - lower, 0.0)
def model_transition_matrix(
thresholds: np.ndarray,
z: float,
rho: float
) -> np.ndarray:
"""
Z 조건부 전체 전이행렬 산출
"""
n = thresholds.shape[0]
tm = np.zeros((n, n))
for i in range(n - 1): # D행은 흡수상태
for j in range(n):
tm[i, j] = model_transition_prob(thresholds, z, rho, i, j)
# 행 합 정규화 (수치오차 보정)
row_sum = tm[i].sum()
if row_sum > 0:
tm[i] /= row_sum
# D행: 흡수상태
tm[-1, -1] = 1.0
return tm
def zt_objective(
z: float,
observed_tm: np.ndarray,
thresholds: np.ndarray,
rho: float,
weights: Optional[np.ndarray] = None
) -> float:
"""
Zt 추정을 위한 WLS 목적함수
minimize_Z Σ_{i,j} w_{ij} * (p_{ij}^{obs} - p_{ij}^{model}(Z))²
Parameters
----------
z : float - 신용사이클 인덱스 후보값
observed_tm : np.ndarray - 관측된 전이행렬
thresholds : np.ndarray - TTC 임계값
rho : float - 자산상관계수
weights : np.ndarray - 가중치 행렬 (기본: 부도열에 높은 가중치)
"""
n = observed_tm.shape[0]
if weights is None:
# 가중치: 부도열(D)에 10배 가중, 대각에 5배, 나머지 1배
weights = np.ones((n, n))
weights[:, -1] = 10.0 # 부도 전이확률에 높은 가중
for i in range(n):
weights[i, i] = 5.0 # 잔류 확률에도 가중
wss = 0.0
for i in range(n - 1): # D행 제외
for j in range(n):
p_obs = observed_tm[i, j]
p_model = model_transition_prob(thresholds, z, rho, i, j)
wss += weights[i, j] * (p_obs - p_model) ** 2
return wss
def estimate_zt(
observed_tm: np.ndarray,
thresholds: np.ndarray,
rho: float,
z_bounds: Tuple[float, float] = (-4.0, 4.0)
) -> float:
"""
단일 연도의 Zt 추정
scipy.optimize.minimize_scalar로 WLS 목적함수 최소화
Parameters
----------
observed_tm : np.ndarray - 해당 연도 관측 전이행렬
thresholds : np.ndarray - TTC 임계값
rho : float - 자산상관계수
z_bounds : tuple - Z 탐색 범위
Returns
-------
float : 추정된 Zt 값
"""
result = minimize_scalar(
zt_objective,
bounds=z_bounds,
method="bounded",
args=(observed_tm, thresholds, rho)
)
return result.x
def estimate_zt_series(
transition_matrices: Dict[int, np.ndarray],
ttc_matrix: np.ndarray,
rho: float = 0.20
) -> Dict[int, float]:
"""
전체 기간에 대한 Zt 시계열 추정
Parameters
----------
transition_matrices : Dict[int, np.ndarray]
연도별 관측 전이행렬
ttc_matrix : np.ndarray
TTC 전이행렬
rho : float
자산상관계수
Returns
-------
Dict[int, float]
{연도: Zt값} 딕셔너리
"""
logger.info("TTC 전이행렬에서 임계값 산출 중...")
thresholds = compute_thresholds(ttc_matrix)
zt_series = {}
years = sorted(transition_matrices.keys())
logger.info(f"Zt 시계열 추정 중 ({years[0]}-{years[-1]}, rho={rho})...")
for year in years:
observed_tm = transition_matrices[year]
z_hat = estimate_zt(observed_tm, thresholds, rho)
zt_series[year] = z_hat
logger.debug(f" {year}: Zt = {z_hat:+.4f}")
logger.info(f"Zt 추정 완료. 범위: [{min(zt_series.values()):.3f}, {max(zt_series.values()):.3f}]")
return zt_series
def estimate_rho_and_zt(
transition_matrices: Dict[int, np.ndarray],
ttc_matrix: np.ndarray,
rho_bounds: Tuple[float, float] = (0.05, 0.50)
) -> Tuple[float, Dict[int, float]]:
"""
자산상관계수 ρ와 Zt 시계열 동시 추정 (NLS)
총 목적함수 = Σ_t Σ_{i,j} w_{ij} * (p_{ij,t}^{obs} - p_{ij,t}^{model}(Z_t(ρ), ρ))²
외부 루프: ρ 탐색
내부 루프: 각 연도별 Zt 추정 (ρ 고정)
Returns
-------
Tuple[float, Dict[int, float]]
(최적 ρ, Zt 시계열)
"""
years = sorted(transition_matrices.keys())
def total_objective(rho):
thresholds = compute_thresholds(ttc_matrix)
total_wss = 0.0
for year in years:
observed_tm = transition_matrices[year]
z_hat = estimate_zt(observed_tm, thresholds, rho)
total_wss += zt_objective(z_hat, observed_tm, thresholds, rho)
return total_wss
logger.info(f"ρ 동시 추정 중 (범위: {rho_bounds})...")
result = minimize_scalar(total_objective, bounds=rho_bounds, method="bounded")
optimal_rho = result.x
logger.info(f"최적 ρ = {optimal_rho:.4f}")
# 최적 ρ로 Zt 재추정
zt_series = estimate_zt_series(transition_matrices, ttc_matrix, optimal_rho)
return optimal_rho, zt_series