652 lines
20 KiB
Python
652 lines
20 KiB
Python
"""
|
||
3사 전이행렬 PDF → CSV 변환 스크립트
|
||
|
||
한국기업평가(KR), NICE신용평가, 한신평(SCI) PDF에서
|
||
연도별 1년 전이행렬을 추출하여 8×8 CSV로 저장합니다.
|
||
|
||
후처리:
|
||
1. WR(등급취소) 열 제거 → 나머지 비례 재배분
|
||
2. B이하 → B 매핑
|
||
3. CCC 행/열: 등급간 PD 패턴으로 extrapolation
|
||
4. D 행: [0,...,0,1] 흡수상태
|
||
5. 행합 정규화 = 1.0
|
||
|
||
사용법:
|
||
python data/parse_pdf_matrices.py
|
||
"""
|
||
|
||
import sys
|
||
import io
|
||
import re
|
||
import numpy as np
|
||
import pandas as pd
|
||
import pdfplumber
|
||
from pathlib import Path
|
||
from typing import Dict, List, Optional
|
||
|
||
# Windows CP949
|
||
if sys.stdout.encoding != 'utf-8':
|
||
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8', errors='replace')
|
||
sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8', errors='replace')
|
||
|
||
MODEL_GRADES_7 = ["AAA", "AA", "A", "BBB", "BB", "B", "D"]
|
||
MODEL_GRADES_8 = ["AAA", "AA", "A", "BBB", "BB", "B", "CCC", "D"]
|
||
GRADE_LABELS = ["AAA", "AA", "A", "BBB", "BB"] # B이하 is separate
|
||
|
||
BASE_DIR = Path(__file__).parent.parent
|
||
DOC_DIR = BASE_DIR / "doc"
|
||
OUTPUT_DIR = BASE_DIR / "data" / "real"
|
||
|
||
PDF_FILES = {
|
||
"KR": DOC_DIR / "260120143004692_KR 제출자료(2026년1월20일)_신용등급변화표(1년,3년).pdf",
|
||
"NICE": DOC_DIR / "260122103003349_NICE신용평가_2025년_신용등급변화표_202601.pdf",
|
||
"SCI": DOC_DIR / "260127134503220_1. 신용등급변화표_2025년.pdf",
|
||
}
|
||
|
||
|
||
def _fix_cell(cell: str) -> float:
|
||
"""셀 값 정리: 공백분리 숫자 ('9 3.10' → 93.10), 빈문자열/None → 0"""
|
||
if cell is None or cell.strip() == '' or cell.strip() == '-':
|
||
return 0.0
|
||
# 공백 제거
|
||
cleaned = cell.replace(' ', '')
|
||
try:
|
||
return float(cleaned)
|
||
except ValueError:
|
||
return 0.0
|
||
|
||
|
||
def _is_grade_label(text: str) -> Optional[str]:
|
||
"""등급 라벨 식별 — 'AAA', 'AA', 'A', 'BBB', 'BB', 'B이하' 등"""
|
||
if text is None:
|
||
return None
|
||
t = text.strip()
|
||
if t in GRADE_LABELS:
|
||
return t
|
||
# B이하/B하 — 인코딩 깨진 경우도 처리
|
||
if t.startswith('B') and t not in ['BB', 'BBB'] and len(t) > 1:
|
||
return "B_below"
|
||
return None
|
||
|
||
|
||
# ============================================================
|
||
# 테이블 기반 파서 (SCI / KR 용)
|
||
# ============================================================
|
||
def parse_via_tables(pdf_path: Path) -> Dict[int, np.ndarray]:
|
||
"""pdfplumber 테이블 추출로 전이행렬 파싱 (열 위치 보존)"""
|
||
matrices = {}
|
||
pdf = pdfplumber.open(pdf_path)
|
||
|
||
for page in pdf.pages:
|
||
tables = page.extract_tables()
|
||
for table in tables:
|
||
_extract_from_table(table, matrices)
|
||
|
||
pdf.close()
|
||
return matrices
|
||
|
||
|
||
def _extract_from_table(table: List[List], matrices: Dict[int, np.ndarray]):
|
||
"""하나의 pdfplumber 테이블에서 연도별 행렬 추출"""
|
||
rows = table
|
||
n_rows = len(rows)
|
||
if n_rows < 8:
|
||
return
|
||
|
||
i = 0
|
||
while i < n_rows:
|
||
row = rows[i]
|
||
|
||
# 연도 헤더 탐지: 셀에 "YYYY" + 비숫자 (단, ~ 없음)
|
||
year = _detect_year_in_row(row)
|
||
if year is not None and 1998 <= year <= 2025:
|
||
# 헤더 행 찾기 (AAA, AA, A, ...)
|
||
# 다음 6행이 데이터
|
||
mat = _parse_table_block(rows, i, n_rows)
|
||
if mat is not None:
|
||
matrices[year] = mat
|
||
i += 1
|
||
|
||
|
||
def _detect_year_in_row(row: List) -> Optional[int]:
|
||
"""테이블 행에서 단독 연도 탐지 (다년도 ~는 제외)"""
|
||
for cell in row:
|
||
if cell is None:
|
||
continue
|
||
text = str(cell).strip()
|
||
if '~' in text:
|
||
return None
|
||
m = re.search(r'(\d{4})', text)
|
||
if m:
|
||
year = int(m.group(1))
|
||
if 1998 <= year <= 2025:
|
||
return year
|
||
return None
|
||
|
||
|
||
def _parse_table_block(rows: List[List], start: int, total: int) -> Optional[np.ndarray]:
|
||
"""테이블에서 현재 위치부터 6개 등급 행 추출"""
|
||
# 헤더 행 (AAA, AA, A, ...) 찾기
|
||
header_idx = None
|
||
for j in range(start, min(start + 5, total)):
|
||
if any(str(c).strip() == 'AAA' for c in rows[j] if c):
|
||
header_idx = j
|
||
break
|
||
|
||
if header_idx is None:
|
||
return None
|
||
|
||
# 열 인덱스 매핑 (AAA, AA, A, BBB, BB, B이하, D, WR)
|
||
header = rows[header_idx]
|
||
col_map = {}
|
||
for ci, cell in enumerate(header):
|
||
if cell is None:
|
||
continue
|
||
t = str(cell).strip()
|
||
if t == 'AAA':
|
||
col_map['AAA'] = ci
|
||
elif t == 'AA':
|
||
col_map['AA'] = ci
|
||
elif t == 'A':
|
||
col_map['A'] = ci
|
||
elif t == 'BBB':
|
||
col_map['BBB'] = ci
|
||
elif t == 'BB':
|
||
col_map['BB'] = ci
|
||
elif t == 'D':
|
||
col_map['D'] = ci
|
||
elif t == 'WR':
|
||
col_map['WR'] = ci
|
||
elif t.startswith('B') and t not in ['BB', 'BBB']:
|
||
col_map['B_below'] = ci
|
||
|
||
required_cols = ['AAA', 'AA', 'A', 'BBB', 'BB', 'B_below', 'D', 'WR']
|
||
if not all(c in col_map for c in required_cols):
|
||
return None
|
||
|
||
# 데이터 행 추출 (header 다음부터)
|
||
mat = np.zeros((6, 8))
|
||
grade_idx = {"AAA": 0, "AA": 1, "A": 2, "BBB": 3, "BB": 4, "B_below": 5}
|
||
col_order = ['AAA', 'AA', 'A', 'BBB', 'BB', 'B_below', 'D', 'WR']
|
||
|
||
found_grades = set()
|
||
for j in range(header_idx + 1, min(header_idx + 15, total)):
|
||
row = rows[j]
|
||
|
||
# 등급 식별 (col 0 or 1)
|
||
grade = None
|
||
for ci in range(min(2, len(row))):
|
||
g = _is_grade_label(str(row[ci]) if row[ci] else '')
|
||
if g:
|
||
grade = g
|
||
break
|
||
|
||
if grade is None:
|
||
# 빈 행이면 이전 등급 컨텍스트 체크 — skip
|
||
continue
|
||
|
||
if grade in found_grades:
|
||
continue
|
||
|
||
if grade not in grade_idx:
|
||
continue
|
||
|
||
ri = grade_idx[grade]
|
||
for ci_name, ci_col in enumerate(col_order):
|
||
src_col = col_map[ci_col]
|
||
if src_col < len(row):
|
||
mat[ri, ci_name] = _fix_cell(str(row[src_col]) if row[src_col] else '')
|
||
|
||
found_grades.add(grade)
|
||
|
||
if len(found_grades) < 6:
|
||
return None
|
||
|
||
# 유효성: 행합 ~100
|
||
for ri in range(6):
|
||
s = mat[ri].sum()
|
||
if s < 30 or s > 110:
|
||
return None
|
||
|
||
return mat
|
||
|
||
|
||
# ============================================================
|
||
# NICE 텍스트 기반 파서 (숫자가 깔끔한 형태)
|
||
# ============================================================
|
||
def parse_nice(pdf_path: Path) -> Dict[int, np.ndarray]:
|
||
"""NICE PDF — clean numeric format, text-based"""
|
||
matrices = {}
|
||
pdf = pdfplumber.open(pdf_path)
|
||
|
||
for page in pdf.pages:
|
||
text = page.extract_text()
|
||
if not text:
|
||
continue
|
||
|
||
lines = text.split('\n')
|
||
i = 0
|
||
while i < len(lines):
|
||
line = lines[i].strip()
|
||
|
||
year_match = re.match(r'^(\d{4})\S', line)
|
||
if year_match:
|
||
year = int(year_match.group(1))
|
||
if '~' not in line and 1998 <= year <= 2025:
|
||
block = lines[i:i+15]
|
||
matrix = _extract_nice_matrix(block)
|
||
if matrix is not None:
|
||
matrices[year] = matrix
|
||
i += 1
|
||
|
||
pdf.close()
|
||
return matrices
|
||
|
||
|
||
def _extract_nice_matrix(block_lines: List[str]) -> Optional[np.ndarray]:
|
||
"""NICE에서 6×8 행렬 추출 (clean 8-number format)"""
|
||
matrix_rows = {}
|
||
|
||
for line in block_lines:
|
||
stripped = line.strip()
|
||
|
||
for grade in ["AAA", "BBB", "BB"]:
|
||
pat = re.match(rf'^{grade}\s+([\d.]+(?:\s+[\d.]+)*)', stripped)
|
||
if pat:
|
||
nums = [float(x) for x in pat.group(1).split()]
|
||
if len(nums) >= 6:
|
||
matrix_rows[grade] = nums[:8]
|
||
break
|
||
else:
|
||
# AA (not AAA)
|
||
pat = re.match(r'^AA\s+(?!A)([\d.]+(?:\s+[\d.]+)*)', stripped)
|
||
if pat:
|
||
nums = [float(x) for x in pat.group(1).split()]
|
||
if len(nums) >= 6:
|
||
matrix_rows["AA"] = nums[:8]
|
||
continue
|
||
|
||
# A (not AA/AAA)
|
||
pat = re.match(r'^A\s+(?!A)([\d.]+(?:\s+[\d.]+)*)', stripped)
|
||
if pat:
|
||
nums = [float(x) for x in pat.group(1).split()]
|
||
if len(nums) >= 6:
|
||
matrix_rows["A"] = nums[:8]
|
||
continue
|
||
|
||
# B이하
|
||
pat = re.match(r'^B[^\w\s]?\S*\s+([\d.]+(?:\s+[\d.]+)*)', stripped)
|
||
if pat and not stripped.startswith("BB") and not stripped.startswith("BBB"):
|
||
nums = [float(x) for x in pat.group(1).split()]
|
||
if len(nums) >= 6:
|
||
matrix_rows["B_below"] = nums[:8]
|
||
|
||
required = ["AAA", "AA", "A", "BBB", "BB", "B_below"]
|
||
if not all(g in matrix_rows for g in required):
|
||
return None
|
||
|
||
mat = np.zeros((6, 8))
|
||
for idx, grade in enumerate(required):
|
||
vals = matrix_rows[grade]
|
||
for j in range(min(len(vals), 8)):
|
||
mat[idx, j] = vals[j]
|
||
|
||
for idx in range(6):
|
||
s = mat[idx].sum()
|
||
if s < 30 or s > 110:
|
||
return None
|
||
|
||
return mat
|
||
|
||
|
||
# ============================================================
|
||
# KR 텍스트 기반 파서 (공백 분리 숫자 + 대시)
|
||
# ============================================================
|
||
def parse_kr(pdf_path: Path) -> Dict[int, np.ndarray]:
|
||
"""KR PDF — space-separated numbers, dashes for zeros, always 8 columns"""
|
||
matrices = {}
|
||
pdf = pdfplumber.open(pdf_path)
|
||
|
||
full_text = ""
|
||
for page in pdf.pages:
|
||
text = page.extract_text()
|
||
if text:
|
||
full_text += text + "\n"
|
||
pdf.close()
|
||
|
||
lines = full_text.split('\n')
|
||
i = 0
|
||
while i < len(lines):
|
||
line = lines[i].strip()
|
||
|
||
year_match = re.match(r'^(\d{4})[^\d~]', line)
|
||
if year_match and '~' not in line:
|
||
year = int(year_match.group(1))
|
||
if 1998 <= year <= 2025:
|
||
block = lines[i:i+20]
|
||
matrix = _extract_kr_matrix(block)
|
||
if matrix is not None:
|
||
matrices[year] = matrix
|
||
i += 1
|
||
|
||
return matrices
|
||
|
||
|
||
def _extract_kr_matrix(block_lines: List[str]) -> Optional[np.ndarray]:
|
||
"""KR에서 6×8 행렬 추출 (dash + space-separated nums)"""
|
||
matrix_rows = {}
|
||
|
||
for line in block_lines:
|
||
stripped = line.strip()
|
||
grade = None
|
||
rest = None
|
||
|
||
for g in ["AAA", "BBB", "BB"]:
|
||
pat = re.match(rf'^{g}\s+(.*)', stripped)
|
||
if pat:
|
||
grade = g
|
||
rest = pat.group(1)
|
||
break
|
||
|
||
if grade is None:
|
||
pat = re.match(r'^AA\s+(?!A)(.*)', stripped)
|
||
if pat:
|
||
grade = "AA"
|
||
rest = pat.group(1)
|
||
|
||
if grade is None:
|
||
pat = re.match(r'^A\s+(?!A)(.*)', stripped)
|
||
if pat:
|
||
grade = "A"
|
||
rest = pat.group(1)
|
||
|
||
if grade is None:
|
||
# B이하 — B + non-ascii
|
||
pat = re.match(r'^B[^\w\s]?\s*(.*)', stripped)
|
||
if pat and not stripped.startswith("BB") and not stripped.startswith("BBB"):
|
||
rest_raw = pat.group(1)
|
||
rest_cleaned = re.sub(r'^[^\d\s.-]+\s*', '', rest_raw)
|
||
if rest_cleaned and (re.search(r'\d', rest_cleaned) or '-' in rest_cleaned):
|
||
grade = "B_below"
|
||
rest = rest_cleaned
|
||
|
||
if grade is None or rest is None or grade in matrix_rows:
|
||
continue
|
||
|
||
values = _parse_kr_numbers(rest)
|
||
if values is not None and len(values) == 8:
|
||
matrix_rows[grade] = values
|
||
|
||
required = ["AAA", "AA", "A", "BBB", "BB", "B_below"]
|
||
if not all(g in matrix_rows for g in required):
|
||
return None
|
||
|
||
mat = np.zeros((6, 8))
|
||
for idx, grade in enumerate(required):
|
||
mat[idx] = matrix_rows[grade]
|
||
|
||
for idx in range(6):
|
||
s = mat[idx].sum()
|
||
if s < 30 or s > 110:
|
||
return None
|
||
|
||
return mat
|
||
|
||
|
||
def _parse_kr_numbers(s: str) -> Optional[List[float]]:
|
||
"""KR 숫자열 파싱 — 8개 토큰 (숫자 or 대시)"""
|
||
s = s.strip()
|
||
if not s:
|
||
return None
|
||
|
||
results = []
|
||
pos = 0
|
||
n = len(s)
|
||
|
||
while pos < n and len(results) < 8:
|
||
# 공백 스킵
|
||
while pos < n and s[pos] == ' ':
|
||
pos += 1
|
||
if pos >= n:
|
||
break
|
||
|
||
# 대시 → 0
|
||
if s[pos] == '-':
|
||
results.append(0.0)
|
||
pos += 1
|
||
continue
|
||
|
||
# 소수점 포함 숫자 찾기
|
||
dot_pos = None
|
||
scan = pos
|
||
while scan < n:
|
||
if s[scan] == '.':
|
||
dot_pos = scan
|
||
break
|
||
elif s[scan] in '0123456789 ':
|
||
scan += 1
|
||
else:
|
||
break
|
||
|
||
if dot_pos is None:
|
||
# 숫자만 있는 경우
|
||
num_str = ''
|
||
while pos < n and s[pos].isdigit():
|
||
num_str += s[pos]
|
||
pos += 1
|
||
if num_str:
|
||
results.append(float(num_str))
|
||
elif pos < n:
|
||
pos += 1
|
||
continue
|
||
|
||
int_part = ''.join(c for c in s[pos:dot_pos] if c.isdigit())
|
||
dec_part = ''
|
||
j = dot_pos + 1
|
||
while j < n and len(dec_part) < 2:
|
||
if s[j].isdigit():
|
||
dec_part += s[j]
|
||
j += 1
|
||
elif s[j] == ' ':
|
||
j += 1
|
||
else:
|
||
break
|
||
|
||
int_part = int_part or '0'
|
||
dec_part = dec_part or '0'
|
||
results.append(float(f"{int_part}.{dec_part}"))
|
||
pos = max(j, pos + 1)
|
||
|
||
return results if len(results) == 8 else None
|
||
|
||
|
||
# ============================================================
|
||
# 후처리: 6x8 -> 7x7 (WR->D 보정 + CCC 제거)
|
||
# ============================================================
|
||
|
||
_BROAD_GRADE_MAP_6 = {0: "AAA", 1: "AA", 2: "A", 3: "BBB", 4: "BB", 5: "B"}
|
||
|
||
|
||
def postprocess_matrix(raw_6x8, pd_floors=None):
|
||
"""6x8 (AAA~B이하 x AAA~WR+D) -> 7x7 (AAA~B+D)
|
||
|
||
Steps:
|
||
1. PD floor correction: if observed PD < floor, transfer from WR to D
|
||
2. Remaining WR -> proportional redistribution
|
||
3. B이하 -> B mapping
|
||
4. Add D row (absorbing state)
|
||
5. Normalize rows to sum=1
|
||
"""
|
||
assert raw_6x8.shape == (6, 8), f"Expected (6,8), got {raw_6x8.shape}"
|
||
|
||
mat = raw_6x8.copy()
|
||
COL_D = 6
|
||
COL_WR = 7
|
||
|
||
# Step 1: PD floor correction (WR -> D transfer)
|
||
if pd_floors is not None:
|
||
for i in range(6):
|
||
broad = _BROAD_GRADE_MAP_6[i]
|
||
if broad not in pd_floors:
|
||
continue
|
||
floor_pct = pd_floors[broad] * 100 # decimal -> %
|
||
observed_pd = mat[i, COL_D]
|
||
wr_available = mat[i, COL_WR]
|
||
if observed_pd < floor_pct and wr_available > 0:
|
||
deficit = floor_pct - observed_pd
|
||
transfer = min(deficit, wr_available)
|
||
mat[i, COL_D] += transfer
|
||
mat[i, COL_WR] -= transfer
|
||
|
||
# Step 2: Remaining WR -> proportional redistribution
|
||
for i in range(6):
|
||
wr_remaining = mat[i, COL_WR]
|
||
if wr_remaining > 0:
|
||
non_wr_cols = mat[i, :7]
|
||
non_wr_sum = non_wr_cols.sum()
|
||
if non_wr_sum > 0:
|
||
mat[i, :7] = non_wr_cols * (non_wr_sum + wr_remaining) / non_wr_sum
|
||
mat[i, COL_WR] = 0.0
|
||
|
||
# Step 3: B이하 -> B mapping + build 7x7
|
||
mat_7x7 = np.zeros((7, 7))
|
||
for i in range(6):
|
||
for j in range(6):
|
||
mat_7x7[i, j] = mat[i, j]
|
||
mat_7x7[i, 6] = mat[i, COL_D]
|
||
|
||
# Step 4: D row (absorbing state)
|
||
mat_7x7[6, :] = 0.0
|
||
mat_7x7[6, 6] = 100.0
|
||
|
||
# Step 5: Convert to probability and normalize
|
||
mat_7x7 /= 100.0
|
||
for i in range(7):
|
||
s = mat_7x7[i].sum()
|
||
if s > 0:
|
||
mat_7x7[i] /= s
|
||
|
||
return mat_7x7
|
||
|
||
|
||
# ============================================================
|
||
# 메인
|
||
# ============================================================
|
||
def main():
|
||
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
|
||
|
||
# Load PD floors
|
||
broad_floors = None
|
||
try:
|
||
import sys as _sys
|
||
_sys.path.insert(0, str(BASE_DIR))
|
||
from data.pd_floor import build_complete_pd_floor_table
|
||
broad_floors, _, _ = build_complete_pd_floor_table()
|
||
print(f" PD floor loaded: {', '.join(f'{g}={v*10000:.1f}bp' for g, v in broad_floors.items())}")
|
||
except Exception as e:
|
||
print(f" PD floor load failed ({e}), proceeding without floor")
|
||
|
||
all_matrices = {}
|
||
|
||
for agency, pdf_path in PDF_FILES.items():
|
||
print(f"\n{'='*60}")
|
||
print(f" Parsing: {agency} ({pdf_path.name})")
|
||
print(f"{'='*60}")
|
||
|
||
if not pdf_path.exists():
|
||
print(f" ERROR: File not found")
|
||
continue
|
||
|
||
# 파서 선택
|
||
if agency == "NICE":
|
||
raw = parse_nice(pdf_path)
|
||
elif agency == "KR":
|
||
raw = parse_kr(pdf_path)
|
||
else: # SCI
|
||
raw = parse_via_tables(pdf_path)
|
||
|
||
print(f" Extracted {len(raw)} matrices: {sorted(raw.keys())}")
|
||
|
||
# 샘플 출력
|
||
for sample_year in [1998, 2009, 2025]:
|
||
if sample_year in raw:
|
||
labels = ["AAA", "AA", "A", "BBB", "BB", "B_below"]
|
||
print(f"\n Raw {sample_year}:")
|
||
for idx, g in enumerate(labels):
|
||
print(f" {g:>7}: [{', '.join(f'{v:7.2f}' for v in raw[sample_year][idx])}]")
|
||
|
||
# 후처리 + CSV 저장
|
||
processed = {}
|
||
for year, raw_mat in sorted(raw.items()):
|
||
try:
|
||
processed[year] = postprocess_matrix(raw_mat, pd_floors=broad_floors)
|
||
except Exception as e:
|
||
print(f" ERROR {year}: {e}")
|
||
|
||
all_matrices[agency] = processed
|
||
print(f" Processed {len(processed)} matrices")
|
||
|
||
for year, mat in processed.items():
|
||
df = pd.DataFrame(mat, index=MODEL_GRADES_7, columns=MODEL_GRADES_7)
|
||
df.to_csv(OUTPUT_DIR / f"{agency}_{year}.csv", float_format="%.6f")
|
||
|
||
# 3사 평균
|
||
print(f"\n{'='*60}")
|
||
print(f" Computing 3-agency average")
|
||
print(f"{'='*60}")
|
||
|
||
agency_names = list(all_matrices.keys())
|
||
common_years = sorted(set.intersection(
|
||
*[set(all_matrices[a].keys()) for a in agency_names]
|
||
)) if len(agency_names) >= 2 else []
|
||
|
||
print(f" Common years: {len(common_years)}")
|
||
if common_years:
|
||
print(f" Range: {common_years[0]}~{common_years[-1]}")
|
||
|
||
for year in common_years:
|
||
avg = np.mean([all_matrices[a][year] for a in agency_names], axis=0)
|
||
for i in range(7):
|
||
s = avg[i].sum()
|
||
if s > 0:
|
||
avg[i] /= s
|
||
df = pd.DataFrame(avg, index=MODEL_GRADES_7, columns=MODEL_GRADES_7)
|
||
df.to_csv(OUTPUT_DIR / f"AVG_{year}.csv", float_format="%.6f")
|
||
|
||
# PD 요약
|
||
print(f"\n{'='*60}")
|
||
print(f" PD Summary")
|
||
print(f"{'='*60}")
|
||
|
||
print(f"\n {'':>6}", end='')
|
||
for a in agency_names:
|
||
print(f" {a:>10}", end='')
|
||
if common_years:
|
||
print(f" {'AVG':>10}", end='')
|
||
print()
|
||
|
||
for sample_year in [2000, 2009, 2020, 2025]:
|
||
if sample_year not in common_years and not any(sample_year in all_matrices[a] for a in agency_names):
|
||
continue
|
||
print(f"\n Year {sample_year}:")
|
||
for gi, grade in enumerate(MODEL_GRADES_7[:-1]):
|
||
print(f" {grade:>5}:", end='')
|
||
for a in agency_names:
|
||
if sample_year in all_matrices[a]:
|
||
pd_val = all_matrices[a][sample_year][gi, -1] * 100
|
||
print(f" {pd_val:9.3f}%", end='')
|
||
else:
|
||
print(f" {'N/A':>10}", end='')
|
||
if sample_year in common_years:
|
||
avg_f = OUTPUT_DIR / f"AVG_{sample_year}.csv"
|
||
avg_df = pd.read_csv(avg_f, index_col=0)
|
||
print(f" {avg_df.loc[grade, 'D']*100:9.3f}%", end='')
|
||
print()
|
||
|
||
print(f"\n Output: {OUTPUT_DIR}")
|
||
print(f" Total CSV files: {len(list(OUTPUT_DIR.glob('*.csv')))}")
|
||
|
||
|
||
if __name__ == "__main__":
|
||
main()
|