"""Convert the three rating agencies' transition-matrix PDFs to CSV.

Extracts the yearly one-year rating transition matrices from the PDFs of
Korea Ratings (KR), NICE and Hanshinpyeong (SCI) and stores one 7x7 CSV
(grades AAA, AA, A, BBB, BB, B, D) per agency and year, plus a cross-agency
average for every year all parsed agencies have in common.

Post-processing applied to each raw 6x8 matrix (see ``postprocess_matrix``):
    1. Optional PD floor: move mass from WR (rating withdrawn) to D.
    2. Remove the WR column and redistribute the rest proportionally.
    3. Map the "B and below" bucket to B.
    4. Append the D row as an absorbing state [0, ..., 0, 1].
    5. Normalize every row to sum to 1.0.

Usage:
    python data/parse_pdf_matrices.py
"""
import sys
import io
import re
import numpy as np
import pandas as pd
from pathlib import Path
from typing import Dict, List, Optional

try:
    import pdfplumber
except ImportError:  # the numeric helpers stay usable without the PDF stack
    pdfplumber = None


def _force_utf8_stdio() -> None:
    """Switch stdout/stderr to UTF-8 (e.g. on Windows CP949 consoles).

    Streams that already use UTF-8 are left untouched.  ``reconfigure`` is
    preferred because it keeps the stream object and its buffering mode; a
    stream that offers neither ``reconfigure`` nor ``buffer`` is left alone
    instead of breaking the import.
    """
    for name in ("stdout", "stderr"):
        stream = getattr(sys, name, None)
        if stream is None:
            continue
        encoding = (getattr(stream, "encoding", None) or "").lower()
        if encoding.replace("-", "").replace("_", "") == "utf8":
            continue
        if hasattr(stream, "reconfigure"):
            try:
                stream.reconfigure(encoding="utf-8", errors="replace")
            except (ValueError, OSError):
                pass
            continue
        buffer = getattr(stream, "buffer", None)
        if buffer is not None:
            setattr(sys, name,
                    io.TextIOWrapper(buffer, encoding="utf-8", errors="replace"))


# Windows CP949
_force_utf8_stdio()

MODEL_GRADES_7 = ["AAA", "AA", "A", "BBB", "BB", "B", "D"]
MODEL_GRADES_8 = ["AAA", "AA", "A", "BBB", "BB", "B", "CCC", "D"]
GRADE_LABELS = ["AAA", "AA", "A", "BBB", "BB"]  # "B and below" is handled separately

BASE_DIR = Path(__file__).parent.parent
DOC_DIR = BASE_DIR / "doc"
OUTPUT_DIR = BASE_DIR / "data" / "real"

PDF_FILES = {
    "KR": DOC_DIR / "260120143004692_KR 제출자료(2026년1월20일)_신용등급변화표(1년,3년).pdf",
    "NICE": DOC_DIR / "260122103003349_NICE신용평가_2025년_신용등급변화표_202601.pdf",
    "SCI": DOC_DIR / "260127134503220_1. 신용등급변화표_2025년.pdf",
}

# Years accepted as a single-year table header.
_MIN_YEAR = 1998
_MAX_YEAR = 2025

# A parsed row (in percent, WR included) must sum to roughly 100.
_ROW_SUM_MIN = 30
_ROW_SUM_MAX = 110

# Row order of every raw 6x8 matrix produced by the parsers.
_RAW_ROW_GRADES = ["AAA", "AA", "A", "BBB", "BB", "B_below"]


def _year_in_range(year: int) -> bool:
    """Return True when *year* lies inside the supported header range."""
    return _MIN_YEAR <= year <= _MAX_YEAR


def _row_sums_plausible(mat: np.ndarray) -> bool:
    """Return False when any row sum is below 30 or above 110 (percent)."""
    for total in mat.sum(axis=1):
        if total < _ROW_SUM_MIN or total > _ROW_SUM_MAX:
            return False
    return True


def _open_pdf(pdf_path: Path):
    """Open *pdf_path* with pdfplumber, failing clearly if it is missing."""
    if pdfplumber is None:
        raise ImportError("pdfplumber is required to parse PDF files")
    return pdfplumber.open(pdf_path)


def _fix_cell(cell: str) -> float:
    """Convert a table cell to float.

    Numbers broken by spaces are rejoined ('9 3.10' -> 93.10).  ``None``,
    blank cells, a lone '-' and anything that is not a number yield 0.0.
    """
    if cell is None:
        return 0.0
    if cell.strip() in ('', '-'):
        return 0.0
    try:
        return float(cell.replace(' ', ''))
    except ValueError:
        return 0.0


def _is_grade_label(text: str) -> Optional[str]:
    """Identify a row label.

    Returns the label itself for 'AAA', 'AA', 'A', 'BBB', 'BB', the string
    "B_below" for any longer text starting with 'B' (the "B and below"
    bucket, including mis-encoded variants), and ``None`` otherwise.
    """
    if text is None:
        return None
    label = text.strip()
    if label in GRADE_LABELS:
        return label
    # 'BB'/'BBB' were returned above; a bare 'B' is not accepted as a label.
    if label.startswith('B') and len(label) > 1:
        return "B_below"
    return None


# ============================================================
# Table-based parser (SCI / KR)
# ============================================================

def parse_via_tables(pdf_path: Path) -> Dict[int, np.ndarray]:
    """Parse transition matrices via pdfplumber table extraction.

    Column positions come from each table's header row.  Returns a dict
    mapping year -> raw 6x8 matrix in percent.
    """
    matrices: Dict[int, np.ndarray] = {}
    # The context manager releases the file handle even if extraction raises.
    with _open_pdf(pdf_path) as pdf:
        for page in pdf.pages:
            for table in page.extract_tables():
                _extract_from_table(table, matrices)
    return matrices


def _extract_from_table(table: List[List], matrices: Dict[int, np.ndarray]):
    """Collect every yearly matrix found in one pdfplumber table.

    Results are written into *matrices* (year -> 6x8 array); a year that is
    found again replaces the earlier entry.  Tables with fewer than 8 rows
    are ignored.
    """
    rows = table
    n_rows = len(rows)
    if n_rows < 8:
        return
    for start, row in enumerate(rows):
        # A year header is a cell containing "YYYY"; multi-year ranges
        # (marked with '~') are rejected by the detector.
        year = _detect_year_in_row(row)
        if year is None:
            continue
        mat = _parse_table_block(rows, start, n_rows)
        if mat is not None:
            matrices[year] = mat


def _detect_year_in_row(row: List) -> Optional[int]:
    """Return the single year named in a table row, or ``None``.

    Cells are scanned left to right.  A cell containing '~' (multi-year
    range) ends the search with ``None``.  Only the first 4-digit run of
    each cell is considered, and it must lie in the supported year range.
    """
    for cell in row:
        if cell is None:
            continue
        text = str(cell).strip()
        if '~' in text:
            return None
        found = re.search(r'(\d{4})', text)
        if found:
            year = int(found.group(1))
            if _year_in_range(year):
                return year
    return None


def _parse_table_block(rows: List[List], start: int, total: int) -> Optional[np.ndarray]:
    """Extract the six grade rows that follow a year header.

    The header row (the one with a cell equal to 'AAA') is searched within
    5 rows from *start*; the data rows within the 14 rows after it.  Returns
    a 6x8 matrix (rows AAA..B_below, columns AAA..B_below, D, WR) or ``None``
    when the header, a required column, a grade row or a plausible row sum
    is missing.
    """
    header_idx = None
    for j in range(start, min(start + 5, total)):
        if any(str(c).strip() == 'AAA' for c in rows[j] if c):
            header_idx = j
            break
    if header_idx is None:
        return None

    # Map each column label to its index in the header row.
    exact_labels = ('AAA', 'AA', 'A', 'BBB', 'BB', 'D', 'WR')
    col_map = {}
    for ci, cell in enumerate(rows[header_idx]):
        if cell is None:
            continue
        label = str(cell).strip()
        if label in exact_labels:
            col_map[label] = ci
        elif label.startswith('B'):
            # Any other label starting with 'B' is the "B and below" column.
            col_map['B_below'] = ci

    col_order = ['AAA', 'AA', 'A', 'BBB', 'BB', 'B_below', 'D', 'WR']
    if not all(name in col_map for name in col_order):
        return None

    mat = np.zeros((6, 8))
    grade_idx = {grade: i for i, grade in enumerate(_RAW_ROW_GRADES)}
    found_grades = set()
    for j in range(header_idx + 1, min(header_idx + 15, total)):
        row = rows[j]
        # The grade label sits in column 0 or 1.
        grade = None
        for ci in range(min(2, len(row))):
            candidate = _is_grade_label(str(row[ci]) if row[ci] else '')
            if candidate:
                grade = candidate
                break
        # Skip unlabeled rows and keep only the first row per grade.
        if grade is None or grade in found_grades or grade not in grade_idx:
            continue
        ri = grade_idx[grade]
        for dst_col, name in enumerate(col_order):
            src_col = col_map[name]
            if src_col < len(row):
                mat[ri, dst_col] = _fix_cell(str(row[src_col]) if row[src_col] else '')
        found_grades.add(grade)

    if len(found_grades) < 6:
        return None
    if not _row_sums_plausible(mat):
        return None
    return mat


# ============================================================
# NICE text-based parser (clean numeric layout)
# ============================================================

_NUM_RUN = r'([\d.]+(?:\s+[\d.]+)*)'

# Tried in this order; the first pattern that matches decides the grade.
_NICE_ROW_PATTERNS = (
    ("AAA", re.compile(r'^AAA\s+' + _NUM_RUN)),
    ("BBB", re.compile(r'^BBB\s+' + _NUM_RUN)),
    ("BB", re.compile(r'^BB\s+' + _NUM_RUN)),
    ("AA", re.compile(r'^AA\s+(?!A)' + _NUM_RUN)),
    ("A", re.compile(r'^A\s+(?!A)' + _NUM_RUN)),
)
_NICE_B_BELOW_RE = re.compile(r'^B[^\w\s]?\S*\s+' + _NUM_RUN)
_NICE_YEAR_RE = re.compile(r'^(\d{4})\S')


def parse_nice(pdf_path: Path) -> Dict[int, np.ndarray]:
    """Parse the NICE PDF from its extracted text.

    A line that starts with a 4-digit year immediately followed by a
    non-space character, and contains no '~', opens a 15-line window that is
    handed to ``_extract_nice_matrix``.  Returns year -> raw 6x8 matrix.
    """
    matrices: Dict[int, np.ndarray] = {}
    with _open_pdf(pdf_path) as pdf:
        for page in pdf.pages:
            text = page.extract_text()
            if not text:
                continue
            lines = text.split('\n')
            for i, raw_line in enumerate(lines):
                line = raw_line.strip()
                year_match = _NICE_YEAR_RE.match(line)
                if not year_match:
                    continue
                year = int(year_match.group(1))
                if '~' in line or not _year_in_range(year):
                    continue
                matrix = _extract_nice_matrix(lines[i:i + 15])
                if matrix is not None:
                    matrices[year] = matrix
    return matrices


def _extract_nice_matrix(block_lines: List[str]) -> Optional[np.ndarray]:
    """Build a 6x8 matrix from one NICE text window.

    A grade row needs at least 6 numbers; at most the first 8 are used and
    missing trailing columns stay 0.  The first usable row per grade wins, so
    rows of a following year's table inside the same window cannot overwrite
    the current year.  Returns ``None`` if a grade is missing or a row sum is
    implausible.
    """
    matrix_rows = {}
    for line in block_lines:
        stripped = line.strip()
        grade = None
        hit = None
        for name, pattern in _NICE_ROW_PATTERNS:
            hit = pattern.match(stripped)
            if hit:
                grade = name
                break
        if hit is None:
            # "B and below": 'B' plus arbitrary label characters, never BB/BBB.
            hit = _NICE_B_BELOW_RE.match(stripped)
            if hit and not stripped.startswith("BB"):
                grade = "B_below"
        if grade is None or grade in matrix_rows:
            continue
        try:
            nums = [float(tok) for tok in hit.group(1).split()]
        except ValueError:
            # Tokens such as '.' or '1.2.3' match the pattern but are not numbers.
            continue
        if len(nums) >= 6:
            matrix_rows[grade] = nums[:8]

    if not all(g in matrix_rows for g in _RAW_ROW_GRADES):
        return None

    mat = np.zeros((6, 8))
    for idx, grade in enumerate(_RAW_ROW_GRADES):
        vals = matrix_rows[grade]
        mat[idx, :len(vals)] = vals
    if not _row_sums_plausible(mat):
        return None
    return mat


# ============================================================
# KR text-based parser (space-broken numbers + dashes)
# ============================================================

# Tried in this order; the first pattern that matches decides the grade.
_KR_ROW_PATTERNS = (
    ("AAA", re.compile(r'^AAA\s+(.*)')),
    ("BBB", re.compile(r'^BBB\s+(.*)')),
    ("BB", re.compile(r'^BB\s+(.*)')),
    ("AA", re.compile(r'^AA\s+(?!A)(.*)')),
    ("A", re.compile(r'^A\s+(?!A)(.*)')),
)
_KR_B_BELOW_RE = re.compile(r'^B[^\w\s]?\s*(.*)')
_KR_YEAR_RE = re.compile(r'^(\d{4})[^\d~]')


def parse_kr(pdf_path: Path) -> Dict[int, np.ndarray]:
    """Parse the KR PDF from its extracted text.

    The text of all pages is treated as one line sequence, so a table may
    continue on the next page.  A line starting with a 4-digit year followed
    by a character that is neither a digit nor '~', and containing no '~',
    opens a 20-line window for ``_extract_kr_matrix``.  Returns
    year -> raw 6x8 matrix.
    """
    lines: List[str] = []
    with _open_pdf(pdf_path) as pdf:
        for page in pdf.pages:
            text = page.extract_text()
            if text:
                lines.extend(text.split('\n'))

    matrices: Dict[int, np.ndarray] = {}
    for i, raw_line in enumerate(lines):
        line = raw_line.strip()
        year_match = _KR_YEAR_RE.match(line)
        if not year_match or '~' in line:
            continue
        year = int(year_match.group(1))
        if not _year_in_range(year):
            continue
        matrix = _extract_kr_matrix(lines[i:i + 20])
        if matrix is not None:
            matrices[year] = matrix
    return matrices


def _extract_kr_matrix(block_lines: List[str]) -> Optional[np.ndarray]:
    """Build a 6x8 matrix from one KR text window.

    Each grade row must yield exactly 8 values from ``_parse_kr_numbers``;
    the first such row per grade wins.  Returns ``None`` if a grade is
    missing or a row sum is implausible.
    """
    matrix_rows = {}
    for line in block_lines:
        stripped = line.strip()
        grade = None
        rest = None
        for name, pattern in _KR_ROW_PATTERNS:
            hit = pattern.match(stripped)
            if hit:
                grade = name
                rest = hit.group(1)
                break
        if grade is None:
            # "B and below": 'B' followed by label text, never BB/BBB.
            hit = _KR_B_BELOW_RE.match(stripped)
            if hit and not stripped.startswith("BB"):
                # Drop the remaining label characters in front of the numbers.
                cleaned = re.sub(r'^[^\d\s.-]+\s*', '', hit.group(1))
                if cleaned and (re.search(r'\d', cleaned) or '-' in cleaned):
                    grade = "B_below"
                    rest = cleaned
        if grade is None or rest is None or grade in matrix_rows:
            continue
        values = _parse_kr_numbers(rest)
        if values is not None and len(values) == 8:
            matrix_rows[grade] = values

    if not all(g in matrix_rows for g in _RAW_ROW_GRADES):
        return None

    mat = np.zeros((6, 8))
    for idx, grade in enumerate(_RAW_ROW_GRADES):
        mat[idx] = matrix_rows[grade]
    if not _row_sums_plausible(mat):
        return None
    return mat


def _parse_kr_numbers(s: str) -> Optional[List[float]]:
    """Parse a KR number run into exactly 8 values, or return ``None``.

    A '-' counts as 0.  A decimal number may be broken by spaces
    ('9 3.10'): all digits between the current position and the next '.'
    form the integer part, and up to two digits after the '.' form the
    fraction.  Scanning stops once 8 values have been read.
    """
    s = s.strip()
    if not s:
        return None
    results = []
    pos = 0
    n = len(s)
    while pos < n and len(results) < 8:
        # Skip spaces between tokens.
        while pos < n and s[pos] == ' ':
            pos += 1
        if pos >= n:
            break
        # Dash -> 0
        if s[pos] == '-':
            results.append(0.0)
            pos += 1
            continue
        # Look ahead for the decimal point, crossing digits and spaces only.
        dot_pos = None
        scan = pos
        while scan < n:
            if s[scan] == '.':
                dot_pos = scan
                break
            elif s[scan] in '0123456789 ':
                scan += 1
            else:
                break
        if dot_pos is None:
            # No decimal point ahead: read a plain integer, or skip one
            # unexpected character so the scan always advances.
            num_str = ''
            while pos < n and s[pos].isdigit():
                num_str += s[pos]
                pos += 1
            if num_str:
                results.append(float(num_str))
            elif pos < n:
                pos += 1
            continue
        int_part = ''.join(c for c in s[pos:dot_pos] if c.isdigit())
        dec_part = ''
        j = dot_pos + 1
        # At most two fraction digits; spaces between them are ignored.
        while j < n and len(dec_part) < 2:
            if s[j].isdigit():
                dec_part += s[j]
                j += 1
            elif s[j] == ' ':
                j += 1
            else:
                break
        int_part = int_part or '0'
        dec_part = dec_part or '0'
        results.append(float(f"{int_part}.{dec_part}"))
        pos = max(j, pos + 1)
    return results if len(results) == 8 else None


# ============================================================
# Post-processing: 6x8 -> 7x7 (WR -> D correction, WR removal)
# ============================================================

_BROAD_GRADE_MAP_6 = {0: "AAA", 1: "AA", 2: "A", 3: "BBB", 4: "BB", 5: "B"}


def postprocess_matrix(raw_6x8, pd_floors=None):
    """Turn a raw 6x8 percentage matrix into a 7x7 probability matrix.

    Args:
        raw_6x8: array-like of shape (6, 8) in percent.  Rows are AAA, AA, A,
            BBB, BB, "B and below"; columns are the same six grades followed
            by D (index 6) and WR (index 7).  The input is not modified.
        pd_floors: optional mapping broad grade ("AAA".."B") -> minimum
            default probability as a decimal fraction.

    Returns:
        7x7 ndarray over AAA, AA, A, BBB, BB, B, D whose rows sum to 1
        (a row whose entries are all zero is left as zeros).

    Raises:
        AssertionError: if the input does not have shape (6, 8).

    Steps:
        1. PD floor: if the observed D share is below the floor, move the
           shortfall (at most the available WR share) from WR to D.
        2. Redistribute the remaining WR share proportionally.
        3. Map "B and below" to B and drop the WR column.
        4. Add the D row as an absorbing state.
        5. Convert to probabilities and normalize each row.
    """
    # A float copy keeps the caller's data intact and prevents integer
    # truncation of the fractional floor transfer.
    mat = np.array(raw_6x8, dtype=float)
    if mat.shape != (6, 8):
        # Raised explicitly so the check also runs under ``python -O``.
        raise AssertionError(f"Expected (6,8), got {mat.shape}")

    COL_D = 6
    COL_WR = 7

    # Step 1: PD floor correction (WR -> D transfer)
    if pd_floors is not None:
        for i in range(6):
            broad = _BROAD_GRADE_MAP_6[i]
            if broad not in pd_floors:
                continue
            floor_pct = pd_floors[broad] * 100  # decimal -> %
            observed_pd = mat[i, COL_D]
            wr_available = mat[i, COL_WR]
            if observed_pd < floor_pct and wr_available > 0:
                transfer = min(floor_pct - observed_pd, wr_available)
                mat[i, COL_D] += transfer
                mat[i, COL_WR] -= transfer

    # Step 2: scale the non-WR columns so they absorb the remaining WR share.
    for i in range(6):
        wr_remaining = mat[i, COL_WR]
        if wr_remaining > 0:
            non_wr_sum = mat[i, :COL_WR].sum()
            if non_wr_sum > 0:
                mat[i, :COL_WR] = mat[i, :COL_WR] * (non_wr_sum + wr_remaining) / non_wr_sum
                mat[i, COL_WR] = 0.0

    # Step 3: the six grade columns plus D carry over unchanged; row 5
    # ("B and below") becomes B.
    mat_7x7 = np.zeros((7, 7))
    mat_7x7[:6, :] = mat[:, :COL_WR]

    # Step 4: D row (absorbing state)
    mat_7x7[6, 6] = 100.0

    # Step 5: percent -> probability, then normalize each non-empty row.
    mat_7x7 /= 100.0
    for i in range(7):
        row_sum = mat_7x7[i].sum()
        if row_sum > 0:
            mat_7x7[i] /= row_sum
    return mat_7x7


# ============================================================
# Main
# ============================================================

def _print_banner(title: str) -> None:
    """Print a section title framed by two separator lines."""
    print(f"\n{'='*60}")
    print(title)
    print(f"{'='*60}")


def main():
    """Parse all configured PDFs, write the CSV files and print a PD summary.

    Writes ``<agency>_<year>.csv`` for each parsed matrix and
    ``AVG_<year>.csv`` for each year shared by all parsed agencies (only when
    at least two agencies were parsed) into ``OUTPUT_DIR``.
    """
    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

    # Load PD floors (best effort: the run continues without them).
    broad_floors = None
    try:
        if str(BASE_DIR) not in sys.path:
            sys.path.insert(0, str(BASE_DIR))
        from data.pd_floor import build_complete_pd_floor_table
        broad_floors, _, _ = build_complete_pd_floor_table()
        print(f" PD floor loaded: {', '.join(f'{g}={v*10000:.1f}bp' for g, v in broad_floors.items())}")
    except Exception as e:
        print(f" PD floor load failed ({e}), proceeding without floor")

    parsers = {"NICE": parse_nice, "KR": parse_kr}  # anything else: table parser (SCI)

    all_matrices = {}
    for agency, pdf_path in PDF_FILES.items():
        _print_banner(f" Parsing: {agency} ({pdf_path.name})")

        if not pdf_path.exists():
            print(f" ERROR: File not found")
            continue

        raw = parsers.get(agency, parse_via_tables)(pdf_path)
        print(f" Extracted {len(raw)} matrices: {sorted(raw.keys())}")

        # Sample output
        for sample_year in [1998, 2009, 2025]:
            if sample_year in raw:
                print(f"\n Raw {sample_year}:")
                for idx, g in enumerate(_RAW_ROW_GRADES):
                    print(f" {g:>7}: [{', '.join(f'{v:7.2f}' for v in raw[sample_year][idx])}]")

        # Post-process and save CSV
        processed = {}
        for year, raw_mat in sorted(raw.items()):
            try:
                processed[year] = postprocess_matrix(raw_mat, pd_floors=broad_floors)
            except Exception as e:
                print(f" ERROR {year}: {e}")

        all_matrices[agency] = processed
        print(f" Processed {len(processed)} matrices")

        for year, mat in processed.items():
            df = pd.DataFrame(mat, index=MODEL_GRADES_7, columns=MODEL_GRADES_7)
            df.to_csv(OUTPUT_DIR / f"{agency}_{year}.csv", float_format="%.6f")

    # Cross-agency average
    _print_banner(f" Computing 3-agency average")

    agency_names = list(all_matrices.keys())
    if len(agency_names) >= 2:
        common_years = sorted(set.intersection(
            *[set(all_matrices[a].keys()) for a in agency_names]
        ))
    else:
        common_years = []
    print(f" Common years: {len(common_years)}")
    if common_years:
        print(f" Range: {common_years[0]}~{common_years[-1]}")

    # Kept in memory so the summary below does not re-read the CSV files.
    averages = {}
    for year in common_years:
        avg = np.mean([all_matrices[a][year] for a in agency_names], axis=0)
        for i in range(7):
            s = avg[i].sum()
            if s > 0:
                avg[i] /= s
        averages[year] = avg
        df = pd.DataFrame(avg, index=MODEL_GRADES_7, columns=MODEL_GRADES_7)
        df.to_csv(OUTPUT_DIR / f"AVG_{year}.csv", float_format="%.6f")

    # PD summary
    _print_banner(f" PD Summary")

    print(f"\n {'':>6}", end='')
    for a in agency_names:
        print(f" {a:>10}", end='')
    if common_years:
        print(f" {'AVG':>10}", end='')
    print()

    for sample_year in [2000, 2009, 2020, 2025]:
        if sample_year not in common_years and not any(
                sample_year in all_matrices[a] for a in agency_names):
            continue
        print(f"\n Year {sample_year}:")
        for gi, grade in enumerate(MODEL_GRADES_7[:-1]):
            print(f" {grade:>5}:", end='')
            for a in agency_names:
                if sample_year in all_matrices[a]:
                    # The last column is D, i.e. the default probability.
                    pd_val = all_matrices[a][sample_year][gi, -1] * 100
                    print(f" {pd_val:9.3f}%", end='')
                else:
                    print(f" {'N/A':>10}", end='')
            if sample_year in averages:
                print(f" {averages[sample_year][gi, -1]*100:9.3f}%", end='')
            print()

    print(f"\n Output: {OUTPUT_DIR}")
    print(f" Total CSV files: {len(list(OUTPUT_DIR.glob('*.csv')))}")


if __name__ == "__main__":
    main()