Files
LifetimePD/data/parse_pdf_matrices.py

652 lines
20 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
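Convert the three rating agencies' transition-matrix PDFs to CSV.

Extracts the per-year one-year transition matrices from the KR (한국기업평가),
NICE (NICE신용평가) and SCI (한신평) PDFs and saves each one as a 7×7 CSV
(AAA, AA, A, BBB, BB, B, D), plus an average over the years that all parsed
agencies have in common.

Post-processing:
1. PD floor correction: when a grade's observed default rate is below its
   floor, move mass from the WR (rating withdrawn) column to D
2. Remove the remaining WR column -> redistribute it proportionally
3. Map "B이하" (B and below) -> B
4. D row: [0,...,0,1] absorbing state
5. Normalize every row so that it sums to 1.0

Note: this script does not extrapolate a CCC row/column; the output has no CCC.

Usage:
python data/parse_pdf_matrices.py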
"""
import sys
import io
import re
import numpy as np
import pandas as pd
import pdfplumber
from pathlib import Path
from typing import Dict, List, Optional
# Windows consoles often use a legacy code page (e.g. CP949) that cannot encode
# everything this script prints, so switch the standard streams to UTF-8 and
# replace characters that still cannot be encoded.
# ``reconfigure`` changes the encoding in place. Streams that do not offer it
# (an ``io.StringIO`` redirect, or ``None`` when there is no console) are left
# untouched rather than crashing at import time.
for _stream in (sys.stdout, sys.stderr):
    _encoding = (getattr(_stream, "encoding", None) or "").lower().replace("_", "-")
    if _encoding != "utf-8" and hasattr(_stream, "reconfigure"):
        _stream.reconfigure(encoding="utf-8", errors="replace")

# Grade axes of the output matrices (the 8-grade variant additionally has CCC).
MODEL_GRADES_7 = ["AAA", "AA", "A", "BBB", "BB", "B", "D"]
MODEL_GRADES_8 = ["AAA", "AA", "A", "BBB", "BB", "B", "CCC", "D"]
# Row labels matched verbatim in the PDFs; the "B and below" label is matched
# separately by prefix.
GRADE_LABELS = ["AAA", "AA", "A", "BBB", "BB"]  # B이하 is separate

# Project layout, resolved relative to this file (<base>/data/<this file>).
BASE_DIR = Path(__file__).parent.parent
DOC_DIR = BASE_DIR / "doc"
OUTPUT_DIR = BASE_DIR / "data" / "real"

# Source PDF per agency key.
PDF_FILES = {
    "KR": DOC_DIR / "260120143004692_KR 제출자료(2026년1월20일)_신용등급변화표(1년,3년).pdf",
    "NICE": DOC_DIR / "260122103003349_NICE신용평가_2025년_신용등급변화표_202601.pdf",
    "SCI": DOC_DIR / "260127134503220_1. 신용등급변화표_2025년.pdf",
}
def _fix_cell(cell: str) -> float:
"""셀 값 정리: 공백분리 숫자 ('9 3.10' → 93.10), 빈문자열/None → 0"""
if cell is None or cell.strip() == '' or cell.strip() == '-':
return 0.0
# 공백 제거
cleaned = cell.replace(' ', '')
try:
return float(cleaned)
except ValueError:
return 0.0
def _is_grade_label(text: str) -> Optional[str]:
"""등급 라벨 식별 — 'AAA', 'AA', 'A', 'BBB', 'BB', 'B이하'"""
if text is None:
return None
t = text.strip()
if t in GRADE_LABELS:
return t
# B이하/B하 — 인코딩 깨진 경우도 처리
if t.startswith('B') and t not in ['BB', 'BBB'] and len(t) > 1:
return "B_below"
return None
# ============================================================
# 테이블 기반 파서 (SCI / KR 용)
# ============================================================
def parse_via_tables(pdf_path: Path) -> Dict[int, np.ndarray]:
    """Parse transition matrices using pdfplumber's table extraction.

    Every table on every page is handed to ``_extract_from_table``, which adds
    the matrices it recognises to the result.

    Args:
        pdf_path: Path of the PDF to read.

    Returns:
        Mapping of year -> raw matrix as produced by ``_extract_from_table``.
    """
    matrices: Dict[int, np.ndarray] = {}
    # The context manager closes the PDF even if table extraction raises.
    with pdfplumber.open(pdf_path) as pdf:
        for page in pdf.pages:
            for table in page.extract_tables():
                _extract_from_table(table, matrices)
    return matrices
def _extract_from_table(table: List[List], matrices: Dict[int, np.ndarray]):
"""하나의 pdfplumber 테이블에서 연도별 행렬 추출"""
rows = table
n_rows = len(rows)
if n_rows < 8:
return
i = 0
while i < n_rows:
row = rows[i]
# 연도 헤더 탐지: 셀에 "YYYY" + 비숫자 (단, ~ 없음)
year = _detect_year_in_row(row)
if year is not None and 1998 <= year <= 2025:
# 헤더 행 찾기 (AAA, AA, A, ...)
# 다음 6행이 데이터
mat = _parse_table_block(rows, i, n_rows)
if mat is not None:
matrices[year] = mat
i += 1
def _detect_year_in_row(row: List) -> Optional[int]:
"""테이블 행에서 단독 연도 탐지 (다년도 ~는 제외)"""
for cell in row:
if cell is None:
continue
text = str(cell).strip()
if '~' in text:
return None
m = re.search(r'(\d{4})', text)
if m:
year = int(m.group(1))
if 1998 <= year <= 2025:
return year
return None
def _parse_table_block(rows: List[List], start: int, total: int) -> Optional[np.ndarray]:
"""테이블에서 현재 위치부터 6개 등급 행 추출"""
# 헤더 행 (AAA, AA, A, ...) 찾기
header_idx = None
for j in range(start, min(start + 5, total)):
if any(str(c).strip() == 'AAA' for c in rows[j] if c):
header_idx = j
break
if header_idx is None:
return None
# 열 인덱스 매핑 (AAA, AA, A, BBB, BB, B이하, D, WR)
header = rows[header_idx]
col_map = {}
for ci, cell in enumerate(header):
if cell is None:
continue
t = str(cell).strip()
if t == 'AAA':
col_map['AAA'] = ci
elif t == 'AA':
col_map['AA'] = ci
elif t == 'A':
col_map['A'] = ci
elif t == 'BBB':
col_map['BBB'] = ci
elif t == 'BB':
col_map['BB'] = ci
elif t == 'D':
col_map['D'] = ci
elif t == 'WR':
col_map['WR'] = ci
elif t.startswith('B') and t not in ['BB', 'BBB']:
col_map['B_below'] = ci
required_cols = ['AAA', 'AA', 'A', 'BBB', 'BB', 'B_below', 'D', 'WR']
if not all(c in col_map for c in required_cols):
return None
# 데이터 행 추출 (header 다음부터)
mat = np.zeros((6, 8))
grade_idx = {"AAA": 0, "AA": 1, "A": 2, "BBB": 3, "BB": 4, "B_below": 5}
col_order = ['AAA', 'AA', 'A', 'BBB', 'BB', 'B_below', 'D', 'WR']
found_grades = set()
for j in range(header_idx + 1, min(header_idx + 15, total)):
row = rows[j]
# 등급 식별 (col 0 or 1)
grade = None
for ci in range(min(2, len(row))):
g = _is_grade_label(str(row[ci]) if row[ci] else '')
if g:
grade = g
break
if grade is None:
# 빈 행이면 이전 등급 컨텍스트 체크 — skip
continue
if grade in found_grades:
continue
if grade not in grade_idx:
continue
ri = grade_idx[grade]
for ci_name, ci_col in enumerate(col_order):
src_col = col_map[ci_col]
if src_col < len(row):
mat[ri, ci_name] = _fix_cell(str(row[src_col]) if row[src_col] else '')
found_grades.add(grade)
if len(found_grades) < 6:
return None
# 유효성: 행합 ~100
for ri in range(6):
s = mat[ri].sum()
if s < 30 or s > 110:
return None
return mat
# ============================================================
# NICE 텍스트 기반 파서 (숫자가 깔끔한 형태)
# ============================================================
def parse_nice(pdf_path: Path) -> Dict[int, np.ndarray]:
    """Parse the NICE PDF from its extracted text.

    On each page, a line that starts with four digits followed directly by a
    non-space character, contains no ``~`` and names a year in 1998-2025 opens
    a block. That line and the 14 lines after it are passed to
    ``_extract_nice_matrix``.

    Args:
        pdf_path: Path of the PDF to read.

    Returns:
        Mapping of year -> 6x8 raw matrix for every block that parsed.
    """
    matrices: Dict[int, np.ndarray] = {}
    # The context manager closes the PDF even if text extraction raises.
    with pdfplumber.open(pdf_path) as pdf:
        for page in pdf.pages:
            text = page.extract_text()
            if not text:
                continue
            lines = text.split('\n')
            for i, raw_line in enumerate(lines):
                line = raw_line.strip()
                year_match = re.match(r'^(\d{4})\S', line)
                if not year_match or '~' in line:
                    continue
                year = int(year_match.group(1))
                if not 1998 <= year <= 2025:
                    continue
                matrix = _extract_nice_matrix(lines[i:i + 15])
                if matrix is not None:
                    matrices[year] = matrix
    return matrices
def _extract_nice_matrix(block_lines: List[str]) -> Optional[np.ndarray]:
"""NICE에서 6×8 행렬 추출 (clean 8-number format)"""
matrix_rows = {}
for line in block_lines:
stripped = line.strip()
for grade in ["AAA", "BBB", "BB"]:
pat = re.match(rf'^{grade}\s+([\d.]+(?:\s+[\d.]+)*)', stripped)
if pat:
nums = [float(x) for x in pat.group(1).split()]
if len(nums) >= 6:
matrix_rows[grade] = nums[:8]
break
else:
# AA (not AAA)
pat = re.match(r'^AA\s+(?!A)([\d.]+(?:\s+[\d.]+)*)', stripped)
if pat:
nums = [float(x) for x in pat.group(1).split()]
if len(nums) >= 6:
matrix_rows["AA"] = nums[:8]
continue
# A (not AA/AAA)
pat = re.match(r'^A\s+(?!A)([\d.]+(?:\s+[\d.]+)*)', stripped)
if pat:
nums = [float(x) for x in pat.group(1).split()]
if len(nums) >= 6:
matrix_rows["A"] = nums[:8]
continue
# B이하
pat = re.match(r'^B[^\w\s]?\S*\s+([\d.]+(?:\s+[\d.]+)*)', stripped)
if pat and not stripped.startswith("BB") and not stripped.startswith("BBB"):
nums = [float(x) for x in pat.group(1).split()]
if len(nums) >= 6:
matrix_rows["B_below"] = nums[:8]
required = ["AAA", "AA", "A", "BBB", "BB", "B_below"]
if not all(g in matrix_rows for g in required):
return None
mat = np.zeros((6, 8))
for idx, grade in enumerate(required):
vals = matrix_rows[grade]
for j in range(min(len(vals), 8)):
mat[idx, j] = vals[j]
for idx in range(6):
s = mat[idx].sum()
if s < 30 or s > 110:
return None
return mat
# ============================================================
# KR 텍스트 기반 파서 (공백 분리 숫자 + 대시)
# ============================================================
def parse_kr(pdf_path: Path) -> Dict[int, np.ndarray]:
    """Parse the KR PDF from its extracted text.

    The text of all pages is joined into one line sequence, so a block may
    continue across a page break. A line that starts with four digits followed
    by a character that is neither a digit nor ``~``, contains no ``~`` and
    names a year in 1998-2025 opens a block. That line and the 19 lines after
    it are passed to ``_extract_kr_matrix``.

    Args:
        pdf_path: Path of the PDF to read.

    Returns:
        Mapping of year -> 6x8 raw matrix for every block that parsed.
    """
    page_texts = []
    # The context manager closes the PDF even if text extraction raises.
    with pdfplumber.open(pdf_path) as pdf:
        for page in pdf.pages:
            text = page.extract_text()
            if text:
                page_texts.append(text)
    lines = "\n".join(page_texts).split('\n')

    matrices: Dict[int, np.ndarray] = {}
    for i, raw_line in enumerate(lines):
        line = raw_line.strip()
        year_match = re.match(r'^(\d{4})[^\d~]', line)
        if not year_match or '~' in line:
            continue
        year = int(year_match.group(1))
        if not 1998 <= year <= 2025:
            continue
        matrix = _extract_kr_matrix(lines[i:i + 20])
        if matrix is not None:
            matrices[year] = matrix
    return matrices
def _extract_kr_matrix(block_lines: List[str]) -> Optional[np.ndarray]:
"""KR에서 6×8 행렬 추출 (dash + space-separated nums)"""
matrix_rows = {}
for line in block_lines:
stripped = line.strip()
grade = None
rest = None
for g in ["AAA", "BBB", "BB"]:
pat = re.match(rf'^{g}\s+(.*)', stripped)
if pat:
grade = g
rest = pat.group(1)
break
if grade is None:
pat = re.match(r'^AA\s+(?!A)(.*)', stripped)
if pat:
grade = "AA"
rest = pat.group(1)
if grade is None:
pat = re.match(r'^A\s+(?!A)(.*)', stripped)
if pat:
grade = "A"
rest = pat.group(1)
if grade is None:
# B이하 — B + non-ascii
pat = re.match(r'^B[^\w\s]?\s*(.*)', stripped)
if pat and not stripped.startswith("BB") and not stripped.startswith("BBB"):
rest_raw = pat.group(1)
rest_cleaned = re.sub(r'^[^\d\s.-]+\s*', '', rest_raw)
if rest_cleaned and (re.search(r'\d', rest_cleaned) or '-' in rest_cleaned):
grade = "B_below"
rest = rest_cleaned
if grade is None or rest is None or grade in matrix_rows:
continue
values = _parse_kr_numbers(rest)
if values is not None and len(values) == 8:
matrix_rows[grade] = values
required = ["AAA", "AA", "A", "BBB", "BB", "B_below"]
if not all(g in matrix_rows for g in required):
return None
mat = np.zeros((6, 8))
for idx, grade in enumerate(required):
mat[idx] = matrix_rows[grade]
for idx in range(6):
s = mat[idx].sum()
if s < 30 or s > 110:
return None
return mat
def _parse_kr_numbers(s: str) -> Optional[List[float]]:
"""KR 숫자열 파싱 — 8개 토큰 (숫자 or 대시)"""
s = s.strip()
if not s:
return None
results = []
pos = 0
n = len(s)
while pos < n and len(results) < 8:
# 공백 스킵
while pos < n and s[pos] == ' ':
pos += 1
if pos >= n:
break
# 대시 → 0
if s[pos] == '-':
results.append(0.0)
pos += 1
continue
# 소수점 포함 숫자 찾기
dot_pos = None
scan = pos
while scan < n:
if s[scan] == '.':
dot_pos = scan
break
elif s[scan] in '0123456789 ':
scan += 1
else:
break
if dot_pos is None:
# 숫자만 있는 경우
num_str = ''
while pos < n and s[pos].isdigit():
num_str += s[pos]
pos += 1
if num_str:
results.append(float(num_str))
elif pos < n:
pos += 1
continue
int_part = ''.join(c for c in s[pos:dot_pos] if c.isdigit())
dec_part = ''
j = dot_pos + 1
while j < n and len(dec_part) < 2:
if s[j].isdigit():
dec_part += s[j]
j += 1
elif s[j] == ' ':
j += 1
else:
break
int_part = int_part or '0'
dec_part = dec_part or '0'
results.append(float(f"{int_part}.{dec_part}"))
pos = max(j, pos + 1)
return results if len(results) == 8 else None
# ============================================================
# 후처리: 6x8 -> 7x7 (WR->D 보정 + CCC 제거)
# ============================================================
# Row index of the raw matrix -> broad grade name used as the key of ``pd_floors``.
_BROAD_GRADE_MAP_6 = {0: "AAA", 1: "AA", 2: "A", 3: "BBB", 4: "BB", 5: "B"}


def postprocess_matrix(raw_6x8, pd_floors=None):
    """Turn a raw 6x8 percentage matrix into a 7x7 row-stochastic matrix.

    Input rows are AAA..B-and-below; input columns are AAA, AA, A, BBB, BB,
    B-and-below, D, WR, all in percent. The output adds an absorbing D row and
    drops the WR column.

    Steps:
        1. PD floor correction: if a row's observed D is below its floor, move
           mass from WR to D (at most what WR holds).
        2. Redistribute the remaining WR proportionally over the other columns.
        3. Copy into a 7x7 matrix (B-and-below becomes B).
        4. Add the D row ``[0, ..., 0, 1]``.
        5. Convert percent to probability and normalize each row to sum to 1.
           A row whose sum is 0 is left as all zeros.

    Args:
        raw_6x8: Array-like of shape (6, 8). It is copied as float, so the
            caller's data is not modified and integer input is not truncated.
        pd_floors: Optional mapping of broad grade name -> floor expressed as
            a decimal fraction (it is multiplied by 100 to compare with the
            percent values). Grades missing from the mapping get no floor.

    Returns:
        The 7x7 matrix as a float ndarray.

    Raises:
        ValueError: If the input does not have shape (6, 8).
    """
    mat = np.array(raw_6x8, dtype=float)
    if mat.shape != (6, 8):
        raise ValueError(f"Expected (6,8), got {mat.shape}")

    COL_D = 6
    COL_WR = 7

    # Step 1: PD floor correction (WR -> D transfer).
    if pd_floors is not None:
        for i in range(6):
            broad = _BROAD_GRADE_MAP_6[i]
            if broad not in pd_floors:
                continue
            floor_pct = pd_floors[broad] * 100  # decimal -> %
            observed_pd = mat[i, COL_D]
            wr_available = mat[i, COL_WR]
            if observed_pd < floor_pct and wr_available > 0:
                transfer = min(floor_pct - observed_pd, wr_available)
                mat[i, COL_D] += transfer
                mat[i, COL_WR] -= transfer

    # Step 2: remaining WR -> proportional redistribution.
    for i in range(6):
        wr_remaining = mat[i, COL_WR]
        if wr_remaining > 0:
            non_wr_sum = mat[i, :7].sum()
            if non_wr_sum > 0:
                mat[i, :7] = mat[i, :7] * (non_wr_sum + wr_remaining) / non_wr_sum
            mat[i, COL_WR] = 0.0

    # Steps 3-4: build the 7x7 matrix; the last row is the absorbing D state.
    mat_7x7 = np.zeros((7, 7))
    mat_7x7[:6, :6] = mat[:, :6]
    mat_7x7[:6, 6] = mat[:, COL_D]
    mat_7x7[6, 6] = 100.0

    # Step 5: percent -> probability, then normalize every non-empty row.
    mat_7x7 /= 100.0
    for i in range(7):
        row_sum = mat_7x7[i].sum()
        if row_sum > 0:
            mat_7x7[i] /= row_sum
    return mat_7x7
# ============================================================
# 메인
# ============================================================
def main():
    """Parse every agency PDF, write the per-year CSVs and print a PD summary.

    Steps:
        1. Try to load the PD floors from ``data.pd_floor``; on any failure the
           script continues without floors (deliberate best effort).
        2. For each agency in ``PDF_FILES`` whose file exists: parse it, print
           a few raw sample matrices, post-process every year and write
           ``<agency>_<year>.csv`` into ``OUTPUT_DIR``.
        3. For the years present in all parsed agencies (only when at least
           two agencies were parsed): write the row-normalized mean matrix as
           ``AVG_<year>.csv``.
        4. Print the default-column values of selected sample years.
    """
    rule = '=' * 60
    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

    # Load PD floors
    broad_floors = None
    try:
        # Make the project root importable so that ``data.pd_floor`` resolves.
        sys.path.insert(0, str(BASE_DIR))
        from data.pd_floor import build_complete_pd_floor_table
        broad_floors, _, _ = build_complete_pd_floor_table()
        print(f" PD floor loaded: {', '.join(f'{g}={v*10000:.1f}bp' for g, v in broad_floors.items())}")
    except Exception as e:
        print(f" PD floor load failed ({e}), proceeding without floor")

    # Text-based parsers per agency; any other agency (SCI) uses table extraction.
    text_parsers = {"NICE": parse_nice, "KR": parse_kr}
    raw_labels = ["AAA", "AA", "A", "BBB", "BB", "B_below"]

    all_matrices = {}
    for agency, pdf_path in PDF_FILES.items():
        print(f"\n{rule}")
        print(f" Parsing: {agency} ({pdf_path.name})")
        print(f"{rule}")
        if not pdf_path.exists():
            print(" ERROR: File not found")
            continue

        raw = text_parsers.get(agency, parse_via_tables)(pdf_path)
        print(f" Extracted {len(raw)} matrices: {sorted(raw.keys())}")

        # Sample output of the raw matrices
        for sample_year in [1998, 2009, 2025]:
            if sample_year in raw:
                print(f"\n Raw {sample_year}:")
                for idx, g in enumerate(raw_labels):
                    print(f" {g:>7}: [{', '.join(f'{v:7.2f}' for v in raw[sample_year][idx])}]")

        # Post-process and save as CSV
        processed = {}
        for year, raw_mat in sorted(raw.items()):
            try:
                processed[year] = postprocess_matrix(raw_mat, pd_floors=broad_floors)
            except Exception as e:
                print(f" ERROR {year}: {e}")
        all_matrices[agency] = processed
        print(f" Processed {len(processed)} matrices")
        for year, mat in processed.items():
            df = pd.DataFrame(mat, index=MODEL_GRADES_7, columns=MODEL_GRADES_7)
            df.to_csv(OUTPUT_DIR / f"{agency}_{year}.csv", float_format="%.6f")

    # Cross-agency average
    print(f"\n{rule}")
    print(" Computing 3-agency average")
    print(f"{rule}")
    agency_names = list(all_matrices.keys())
    common_years = sorted(set.intersection(
        *[set(all_matrices[a].keys()) for a in agency_names]
    )) if len(agency_names) >= 2 else []
    print(f" Common years: {len(common_years)}")
    if common_years:
        print(f" Range: {common_years[0]}~{common_years[-1]}")

    # The averages are kept in memory so the summary below does not have to
    # read the CSV files back from disk.
    avg_matrices = {}
    for year in common_years:
        avg = np.mean([all_matrices[a][year] for a in agency_names], axis=0)
        for i in range(7):
            s = avg[i].sum()
            if s > 0:
                avg[i] /= s
        avg_matrices[year] = avg
        df = pd.DataFrame(avg, index=MODEL_GRADES_7, columns=MODEL_GRADES_7)
        df.to_csv(OUTPUT_DIR / f"AVG_{year}.csv", float_format="%.6f")

    # PD summary
    print(f"\n{rule}")
    print(" PD Summary")
    print(f"{rule}")
    print(f"\n {'':>6}", end='')
    for a in agency_names:
        print(f" {a:>10}", end='')
    if common_years:
        print(f" {'AVG':>10}", end='')
    print()
    for sample_year in [2000, 2009, 2020, 2025]:
        # A common year is by definition present in every agency, so checking
        # the agencies alone is sufficient.
        if not any(sample_year in all_matrices[a] for a in agency_names):
            continue
        print(f"\n Year {sample_year}:")
        for gi, grade in enumerate(MODEL_GRADES_7[:-1]):
            print(f" {grade:>5}:", end='')
            for a in agency_names:
                if sample_year in all_matrices[a]:
                    pd_val = all_matrices[a][sample_year][gi, -1] * 100
                    print(f" {pd_val:9.3f}%", end='')
                else:
                    print(f" {'N/A':>10}", end='')
            if sample_year in avg_matrices:
                print(f" {avg_matrices[sample_year][gi, -1] * 100:9.3f}%", end='')
            print()

    print(f"\n Output: {OUTPUT_DIR}")
    print(f" Total CSV files: {len(list(OUTPUT_DIR.glob('*.csv')))}")


if __name__ == "__main__":
    main()