feat(data): parse 3-agency PDF transition matrices to CSV #task-290

- New: data/parse_pdf_matrices.py (KR/NICE/SCI PDF parser)
  - KR: text-based parser (space-separated numbers + dashes)
  - NICE: text-based parser (clean numeric format)
  - SCI: pdfplumber table extraction (column-position-aware)
  - WR redistribution, B이하→B mapping, CCC extrapolation from PD patterns
- Modified: data/transition_matrices.py (added source='real' loader)
- Modified: config.yaml (data.transition_source: 'real')
- Modified: main.py (reads transition source from config)
- Output: 112 CSV files (KR/NICE/SCI/AVG × 28 years)
This commit is contained in:
Variet Agent
2026-03-11 01:07:27 +09:00
parent ebdc6b805b
commit 8af743e6f3
116 changed files with 1714 additions and 3 deletions

654
data/parse_pdf_matrices.py Normal file
View File

@@ -0,0 +1,654 @@
"""
3사 전이행렬 PDF → CSV 변환 스크립트
한국기업평가(KR), NICE신용평가, 한신평(SCI) PDF에서
연도별 1년 전이행렬을 추출하여 8×8 CSV로 저장합니다.
후처리:
1. WR(등급취소) 열 제거 → 나머지 비례 재배분
2. B이하 → B 매핑
3. CCC 행/열: 등급간 PD 패턴으로 extrapolation
4. D 행: [0,...,0,1] 흡수상태
5. 행합 정규화 = 1.0
사용법:
python data/parse_pdf_matrices.py
"""
import sys
import io
import re
import numpy as np
import pandas as pd
import pdfplumber
from pathlib import Path
from typing import Dict, List, Optional
# Windows CP949
if sys.stdout.encoding != 'utf-8':
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8', errors='replace')
sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8', errors='replace')
MODEL_GRADES = ["AAA", "AA", "A", "BBB", "BB", "B", "CCC", "D"]
GRADE_LABELS = ["AAA", "AA", "A", "BBB", "BB"] # B이하 is separate
BASE_DIR = Path(__file__).parent.parent
DOC_DIR = BASE_DIR / "doc"
OUTPUT_DIR = BASE_DIR / "data" / "real"
PDF_FILES = {
"KR": DOC_DIR / "260120143004692_KR 제출자료(2026년1월20일)_신용등급변화표(1년,3년).pdf",
"NICE": DOC_DIR / "260122103003349_NICE신용평가_2025년_신용등급변화표_202601.pdf",
"SCI": DOC_DIR / "260127134503220_1. 신용등급변화표_2025년.pdf",
}
def _fix_cell(cell: str) -> float:
"""셀 값 정리: 공백분리 숫자 ('9 3.10' → 93.10), 빈문자열/None → 0"""
if cell is None or cell.strip() == '' or cell.strip() == '-':
return 0.0
# 공백 제거
cleaned = cell.replace(' ', '')
try:
return float(cleaned)
except ValueError:
return 0.0
def _is_grade_label(text: str) -> Optional[str]:
"""등급 라벨 식별 — 'AAA', 'AA', 'A', 'BBB', 'BB', 'B이하'"""
if text is None:
return None
t = text.strip()
if t in GRADE_LABELS:
return t
# B이하/B하 — 인코딩 깨진 경우도 처리
if t.startswith('B') and t not in ['BB', 'BBB'] and len(t) > 1:
return "B_below"
return None
# ============================================================
# 테이블 기반 파서 (SCI / KR 용)
# ============================================================
def parse_via_tables(pdf_path: Path) -> Dict[int, np.ndarray]:
"""pdfplumber 테이블 추출로 전이행렬 파싱 (열 위치 보존)"""
matrices = {}
pdf = pdfplumber.open(pdf_path)
for page in pdf.pages:
tables = page.extract_tables()
for table in tables:
_extract_from_table(table, matrices)
pdf.close()
return matrices
def _extract_from_table(table: List[List], matrices: Dict[int, np.ndarray]):
"""하나의 pdfplumber 테이블에서 연도별 행렬 추출"""
rows = table
n_rows = len(rows)
if n_rows < 8:
return
i = 0
while i < n_rows:
row = rows[i]
# 연도 헤더 탐지: 셀에 "YYYY" + 비숫자 (단, ~ 없음)
year = _detect_year_in_row(row)
if year is not None and 1998 <= year <= 2025:
# 헤더 행 찾기 (AAA, AA, A, ...)
# 다음 6행이 데이터
mat = _parse_table_block(rows, i, n_rows)
if mat is not None:
matrices[year] = mat
i += 1
def _detect_year_in_row(row: List) -> Optional[int]:
"""테이블 행에서 단독 연도 탐지 (다년도 ~는 제외)"""
for cell in row:
if cell is None:
continue
text = str(cell).strip()
if '~' in text:
return None
m = re.search(r'(\d{4})', text)
if m:
year = int(m.group(1))
if 1998 <= year <= 2025:
return year
return None
def _parse_table_block(rows: List[List], start: int, total: int) -> Optional[np.ndarray]:
"""테이블에서 현재 위치부터 6개 등급 행 추출"""
# 헤더 행 (AAA, AA, A, ...) 찾기
header_idx = None
for j in range(start, min(start + 5, total)):
if any(str(c).strip() == 'AAA' for c in rows[j] if c):
header_idx = j
break
if header_idx is None:
return None
# 열 인덱스 매핑 (AAA, AA, A, BBB, BB, B이하, D, WR)
header = rows[header_idx]
col_map = {}
for ci, cell in enumerate(header):
if cell is None:
continue
t = str(cell).strip()
if t == 'AAA':
col_map['AAA'] = ci
elif t == 'AA':
col_map['AA'] = ci
elif t == 'A':
col_map['A'] = ci
elif t == 'BBB':
col_map['BBB'] = ci
elif t == 'BB':
col_map['BB'] = ci
elif t == 'D':
col_map['D'] = ci
elif t == 'WR':
col_map['WR'] = ci
elif t.startswith('B') and t not in ['BB', 'BBB']:
col_map['B_below'] = ci
required_cols = ['AAA', 'AA', 'A', 'BBB', 'BB', 'B_below', 'D', 'WR']
if not all(c in col_map for c in required_cols):
return None
# 데이터 행 추출 (header 다음부터)
mat = np.zeros((6, 8))
grade_idx = {"AAA": 0, "AA": 1, "A": 2, "BBB": 3, "BB": 4, "B_below": 5}
col_order = ['AAA', 'AA', 'A', 'BBB', 'BB', 'B_below', 'D', 'WR']
found_grades = set()
for j in range(header_idx + 1, min(header_idx + 15, total)):
row = rows[j]
# 등급 식별 (col 0 or 1)
grade = None
for ci in range(min(2, len(row))):
g = _is_grade_label(str(row[ci]) if row[ci] else '')
if g:
grade = g
break
if grade is None:
# 빈 행이면 이전 등급 컨텍스트 체크 — skip
continue
if grade in found_grades:
continue
if grade not in grade_idx:
continue
ri = grade_idx[grade]
for ci_name, ci_col in enumerate(col_order):
src_col = col_map[ci_col]
if src_col < len(row):
mat[ri, ci_name] = _fix_cell(str(row[src_col]) if row[src_col] else '')
found_grades.add(grade)
if len(found_grades) < 6:
return None
# 유효성: 행합 ~100
for ri in range(6):
s = mat[ri].sum()
if s < 30 or s > 110:
return None
return mat
# ============================================================
# NICE 텍스트 기반 파서 (숫자가 깔끔한 형태)
# ============================================================
def parse_nice(pdf_path: Path) -> Dict[int, np.ndarray]:
"""NICE PDF — clean numeric format, text-based"""
matrices = {}
pdf = pdfplumber.open(pdf_path)
for page in pdf.pages:
text = page.extract_text()
if not text:
continue
lines = text.split('\n')
i = 0
while i < len(lines):
line = lines[i].strip()
year_match = re.match(r'^(\d{4})\S', line)
if year_match:
year = int(year_match.group(1))
if '~' not in line and 1998 <= year <= 2025:
block = lines[i:i+15]
matrix = _extract_nice_matrix(block)
if matrix is not None:
matrices[year] = matrix
i += 1
pdf.close()
return matrices
def _extract_nice_matrix(block_lines: List[str]) -> Optional[np.ndarray]:
"""NICE에서 6×8 행렬 추출 (clean 8-number format)"""
matrix_rows = {}
for line in block_lines:
stripped = line.strip()
for grade in ["AAA", "BBB", "BB"]:
pat = re.match(rf'^{grade}\s+([\d.]+(?:\s+[\d.]+)*)', stripped)
if pat:
nums = [float(x) for x in pat.group(1).split()]
if len(nums) >= 6:
matrix_rows[grade] = nums[:8]
break
else:
# AA (not AAA)
pat = re.match(r'^AA\s+(?!A)([\d.]+(?:\s+[\d.]+)*)', stripped)
if pat:
nums = [float(x) for x in pat.group(1).split()]
if len(nums) >= 6:
matrix_rows["AA"] = nums[:8]
continue
# A (not AA/AAA)
pat = re.match(r'^A\s+(?!A)([\d.]+(?:\s+[\d.]+)*)', stripped)
if pat:
nums = [float(x) for x in pat.group(1).split()]
if len(nums) >= 6:
matrix_rows["A"] = nums[:8]
continue
# B이하
pat = re.match(r'^B[^\w\s]?\S*\s+([\d.]+(?:\s+[\d.]+)*)', stripped)
if pat and not stripped.startswith("BB") and not stripped.startswith("BBB"):
nums = [float(x) for x in pat.group(1).split()]
if len(nums) >= 6:
matrix_rows["B_below"] = nums[:8]
required = ["AAA", "AA", "A", "BBB", "BB", "B_below"]
if not all(g in matrix_rows for g in required):
return None
mat = np.zeros((6, 8))
for idx, grade in enumerate(required):
vals = matrix_rows[grade]
for j in range(min(len(vals), 8)):
mat[idx, j] = vals[j]
for idx in range(6):
s = mat[idx].sum()
if s < 30 or s > 110:
return None
return mat
# ============================================================
# KR 텍스트 기반 파서 (공백 분리 숫자 + 대시)
# ============================================================
def parse_kr(pdf_path: Path) -> Dict[int, np.ndarray]:
"""KR PDF — space-separated numbers, dashes for zeros, always 8 columns"""
matrices = {}
pdf = pdfplumber.open(pdf_path)
full_text = ""
for page in pdf.pages:
text = page.extract_text()
if text:
full_text += text + "\n"
pdf.close()
lines = full_text.split('\n')
i = 0
while i < len(lines):
line = lines[i].strip()
year_match = re.match(r'^(\d{4})[^\d~]', line)
if year_match and '~' not in line:
year = int(year_match.group(1))
if 1998 <= year <= 2025:
block = lines[i:i+20]
matrix = _extract_kr_matrix(block)
if matrix is not None:
matrices[year] = matrix
i += 1
return matrices
def _extract_kr_matrix(block_lines: List[str]) -> Optional[np.ndarray]:
"""KR에서 6×8 행렬 추출 (dash + space-separated nums)"""
matrix_rows = {}
for line in block_lines:
stripped = line.strip()
grade = None
rest = None
for g in ["AAA", "BBB", "BB"]:
pat = re.match(rf'^{g}\s+(.*)', stripped)
if pat:
grade = g
rest = pat.group(1)
break
if grade is None:
pat = re.match(r'^AA\s+(?!A)(.*)', stripped)
if pat:
grade = "AA"
rest = pat.group(1)
if grade is None:
pat = re.match(r'^A\s+(?!A)(.*)', stripped)
if pat:
grade = "A"
rest = pat.group(1)
if grade is None:
# B이하 — B + non-ascii
pat = re.match(r'^B[^\w\s]?\s*(.*)', stripped)
if pat and not stripped.startswith("BB") and not stripped.startswith("BBB"):
rest_raw = pat.group(1)
rest_cleaned = re.sub(r'^[^\d\s.-]+\s*', '', rest_raw)
if rest_cleaned and (re.search(r'\d', rest_cleaned) or '-' in rest_cleaned):
grade = "B_below"
rest = rest_cleaned
if grade is None or rest is None or grade in matrix_rows:
continue
values = _parse_kr_numbers(rest)
if values is not None and len(values) == 8:
matrix_rows[grade] = values
required = ["AAA", "AA", "A", "BBB", "BB", "B_below"]
if not all(g in matrix_rows for g in required):
return None
mat = np.zeros((6, 8))
for idx, grade in enumerate(required):
mat[idx] = matrix_rows[grade]
for idx in range(6):
s = mat[idx].sum()
if s < 30 or s > 110:
return None
return mat
def _parse_kr_numbers(s: str) -> Optional[List[float]]:
"""KR 숫자열 파싱 — 8개 토큰 (숫자 or 대시)"""
s = s.strip()
if not s:
return None
results = []
pos = 0
n = len(s)
while pos < n and len(results) < 8:
# 공백 스킵
while pos < n and s[pos] == ' ':
pos += 1
if pos >= n:
break
# 대시 → 0
if s[pos] == '-':
results.append(0.0)
pos += 1
continue
# 소수점 포함 숫자 찾기
dot_pos = None
scan = pos
while scan < n:
if s[scan] == '.':
dot_pos = scan
break
elif s[scan] in '0123456789 ':
scan += 1
else:
break
if dot_pos is None:
# 숫자만 있는 경우
num_str = ''
while pos < n and s[pos].isdigit():
num_str += s[pos]
pos += 1
if num_str:
results.append(float(num_str))
elif pos < n:
pos += 1
continue
int_part = ''.join(c for c in s[pos:dot_pos] if c.isdigit())
dec_part = ''
j = dot_pos + 1
while j < n and len(dec_part) < 2:
if s[j].isdigit():
dec_part += s[j]
j += 1
elif s[j] == ' ':
j += 1
else:
break
int_part = int_part or '0'
dec_part = dec_part or '0'
results.append(float(f"{int_part}.{dec_part}"))
pos = max(j, pos + 1)
return results if len(results) == 8 else None
# ============================================================
# 후처리: 6×8 → 8×8
# ============================================================
def postprocess_matrix(raw_6x8: np.ndarray) -> np.ndarray:
"""6×8 (AAA~B이하 × AAA~WR) → 8×8 (AAA~D × AAA~D)"""
assert raw_6x8.shape == (6, 8), f"Expected (6,8), got {raw_6x8.shape}"
# WR 열(7) 제거 → 비례 재배분
mat_6x7 = raw_6x8[:, :7].copy()
for i in range(6):
row_sum = mat_6x7[i].sum()
if row_sum > 0:
mat_6x7[i] = mat_6x7[i] / row_sum * 100.0
# 8×8 구성: B이하(5) → B(5), D:col6→col7
mat = np.zeros((8, 8))
for i in range(6):
for j in range(6):
mat[i, j] = mat_6x7[i, j]
mat[i, 7] = mat_6x7[i, 6] # D
# CCC 행/열 extrapolation
mat = _extrapolate_ccc(mat)
# D 행
mat[7, :] = 0.0
mat[7, 7] = 100.0
# → 확률, 행합 정규화
mat /= 100.0
for i in range(8):
s = mat[i].sum()
if s > 0:
mat[i] /= s
return mat
def _extrapolate_ccc(mat: np.ndarray) -> np.ndarray:
"""CCC 행/열 extrapolation from B이하 PD 패턴"""
pd_bb = mat[4, 7]
pd_b = mat[5, 7]
# CCC PD
if pd_bb > 0 and pd_b > pd_bb:
ratio = pd_b / pd_bb
else:
ratio = 2.5
pd_ccc = min(pd_b * ratio, 60.0)
pd_ccc = max(pd_ccc, pd_b * 1.5)
# Stay rates
stay_bb = mat[4, 4]
stay_b = mat[5, 5]
stay_ratio = (stay_b / stay_bb) if (stay_bb > 0 and stay_b < stay_bb) else 0.7
stay_ccc = max(stay_b * stay_ratio, 5.0)
upgrade_to_b = mat[5, 4] * 0.8 if mat[5, 4] > 0 else 2.0
# CCC 행
mat[6, :] = [0, 0, 0.1, 0.2, 0.3, upgrade_to_b, stay_ccc, pd_ccc]
ccc_sum = mat[6].sum()
if ccc_sum > 100:
mat[6, 6] = max(mat[6, 6] - (ccc_sum - 100), 1.0)
elif ccc_sum < 100:
mat[6, 6] += (100 - ccc_sum)
# CCC 열: B→CCC, BB→CCC, BBB→CCC 전이 분리
b_to_ccc = mat[5, 5] * 0.15
mat[5, 6] = b_to_ccc
mat[5, 5] -= b_to_ccc
bb_to_ccc = mat[4, 5] * 0.1 if mat[4, 5] > 0 else 0.5
mat[4, 6] = bb_to_ccc
mat[4, 5] = max(mat[4, 5] - bb_to_ccc, 0)
mat[3, 6] = 0.3
mat[3, 5] = max(mat[3, 5] - 0.15, 0)
mat[3, 3] = max(mat[3, 3] - 0.15, 0)
return mat
# ============================================================
# 메인
# ============================================================
def main():
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
all_matrices = {}
for agency, pdf_path in PDF_FILES.items():
print(f"\n{'='*60}")
print(f" Parsing: {agency} ({pdf_path.name})")
print(f"{'='*60}")
if not pdf_path.exists():
print(f" ERROR: File not found")
continue
# 파서 선택
if agency == "NICE":
raw = parse_nice(pdf_path)
elif agency == "KR":
raw = parse_kr(pdf_path)
else: # SCI
raw = parse_via_tables(pdf_path)
print(f" Extracted {len(raw)} matrices: {sorted(raw.keys())}")
# 샘플 출력
for sample_year in [1998, 2009, 2025]:
if sample_year in raw:
labels = ["AAA", "AA", "A", "BBB", "BB", "B_below"]
print(f"\n Raw {sample_year}:")
for idx, g in enumerate(labels):
print(f" {g:>7}: [{', '.join(f'{v:7.2f}' for v in raw[sample_year][idx])}]")
# 후처리 + CSV 저장
processed = {}
for year, raw_mat in sorted(raw.items()):
try:
processed[year] = postprocess_matrix(raw_mat)
except Exception as e:
print(f" ERROR {year}: {e}")
all_matrices[agency] = processed
print(f" Processed {len(processed)} matrices")
for year, mat in processed.items():
df = pd.DataFrame(mat, index=MODEL_GRADES, columns=MODEL_GRADES)
df.to_csv(OUTPUT_DIR / f"{agency}_{year}.csv", float_format="%.6f")
# 3사 평균
print(f"\n{'='*60}")
print(f" Computing 3-agency average")
print(f"{'='*60}")
agency_names = list(all_matrices.keys())
common_years = sorted(set.intersection(
*[set(all_matrices[a].keys()) for a in agency_names]
)) if len(agency_names) >= 2 else []
print(f" Common years: {len(common_years)}")
if common_years:
print(f" Range: {common_years[0]}~{common_years[-1]}")
for year in common_years:
avg = np.mean([all_matrices[a][year] for a in agency_names], axis=0)
for i in range(8):
s = avg[i].sum()
if s > 0:
avg[i] /= s
df = pd.DataFrame(avg, index=MODEL_GRADES, columns=MODEL_GRADES)
df.to_csv(OUTPUT_DIR / f"AVG_{year}.csv", float_format="%.6f")
# PD 요약
print(f"\n{'='*60}")
print(f" PD Summary")
print(f"{'='*60}")
print(f"\n {'':>6}", end='')
for a in agency_names:
print(f" {a:>10}", end='')
if common_years:
print(f" {'AVG':>10}", end='')
print()
for sample_year in [2000, 2009, 2020, 2025]:
if sample_year not in common_years and not any(sample_year in all_matrices[a] for a in agency_names):
continue
print(f"\n Year {sample_year}:")
for gi, grade in enumerate(MODEL_GRADES[:-1]):
print(f" {grade:>5}:", end='')
for a in agency_names:
if sample_year in all_matrices[a]:
pd_val = all_matrices[a][sample_year][gi, -1] * 100
print(f" {pd_val:9.3f}%", end='')
else:
print(f" {'N/A':>10}", end='')
if sample_year in common_years:
avg_f = OUTPUT_DIR / f"AVG_{sample_year}.csv"
avg_df = pd.read_csv(avg_f, index_col=0)
print(f" {avg_df.loc[grade, 'D']*100:9.3f}%", end='')
print()
print(f"\n Output: {OUTPUT_DIR}")
print(f" Total CSV files: {len(list(OUTPUT_DIR.glob('*.csv')))}")
if __name__ == "__main__":
main()