1065 lines
42 KiB
Python
1065 lines
42 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
YouTube Tab → PDF 캡처 파이프라인
|
||
YouTube 기타 TAB 영상에서 Tab 프레임을 추출하여 깔끔한 A4 PDF 악보로 만듭니다.
|
||
|
||
사용법:
|
||
python youtube_tab_to_pdf.py "https://youtu.be/VIDEO_ID"
|
||
python youtube_tab_to_pdf.py "https://youtu.be/VIDEO_ID" -o output.pdf --debug
|
||
"""
|
||
|
||
import argparse
|
||
import os
|
||
import sys
|
||
import subprocess
|
||
import shutil
|
||
import re
|
||
from pathlib import Path
|
||
from typing import List, Tuple, Optional
|
||
|
||
import cv2
|
||
from video_cv_tracker import TemporalTracker
|
||
import numpy as np
|
||
import img2pdf
|
||
from PIL import Image
|
||
|
||
_ocr_reader = None
|
||
|
||
def _get_ocr_reader():
|
||
global _ocr_reader
|
||
if _ocr_reader is None:
|
||
print(" → EasyOCR 모델 로딩 중 (초회 1번)...")
|
||
try:
|
||
import easyocr
|
||
_ocr_reader = easyocr.Reader(['en'])
|
||
except ImportError:
|
||
print(" [경고] easyocr 라이브러리가 없습니다. OCR 중복 검증을 건너뜁니다.")
|
||
return None
|
||
return _ocr_reader
|
||
|
||
def _extract_number_above_bars(crop: np.ndarray) -> Optional[int]:
|
||
"""오선지 | 마디선 바로 위에 찍힌 번호(또는 좌측 여백의 번호)를 현미경 크롭하여 OCR로 판독합니다."""
|
||
reader = _get_ocr_reader()
|
||
if not reader: return None
|
||
|
||
gray = cv2.cvtColor(crop, cv2.COLOR_BGR2GRAY) if len(crop.shape) == 3 else crop.copy()
|
||
h, w = gray.shape
|
||
|
||
# 1. 오선지 Top Line 탐색
|
||
_, thresh = cv2.threshold(gray, 200, 255, cv2.THRESH_BINARY_INV)
|
||
row_sums = np.sum(thresh, axis=1) / 255.0
|
||
staff_rows = np.where(row_sums > w * 0.4)[0]
|
||
|
||
if len(staff_rows) == 0: return None
|
||
staff_top = staff_rows[0]
|
||
|
||
# 2. 구조적 마디선(|) 검출 (기존 로직 활용)
|
||
bars = _detect_measure_bars(gray)
|
||
|
||
candidates = []
|
||
# 마디 번호는 보통 맨 좌측 (5~10% 위치)이나 시작 바(|) 바로 위에 존재함
|
||
scan_x_anchors = [int(w * 0.05), int(w * 0.1)] + bars
|
||
|
||
for x in scan_x_anchors:
|
||
# | 기준 위쪽 70픽셀 내외, 좌우 40픽셀의 좁은 영역만 집중 크롭
|
||
# 버그 픽스: y2를 staff_top + 10으로 하면 기타 1번줄(High E)의 프렛 번호(0,1,2,3..)가 검출되어
|
||
# 마디 번호로 오인되는 치명적 버그 발생! 무조건 오선지 위쪽(staff_top - 5)에서 컷해야 함.
|
||
y1 = max(0, staff_top - 80)
|
||
y2 = staff_top - 5
|
||
x1 = max(0, x - 40)
|
||
x2 = min(w, x + 40)
|
||
|
||
region = gray[y1:y2, x1:x2]
|
||
if region.size == 0 or region.shape[0] < 15 or region.shape[1] < 15:
|
||
continue
|
||
|
||
# 인식률 극대화를 위해 3배 업스케일 및 이진화
|
||
upscaled = cv2.resize(region, (0, 0), fx=3.0, fy=3.0, interpolation=cv2.INTER_CUBIC)
|
||
_, upscaled_thresh = cv2.threshold(upscaled, 150, 255, cv2.THRESH_BINARY_INV)
|
||
|
||
results = reader.readtext(upscaled_thresh, allowlist='0123456789')
|
||
for _, text, conf in results:
|
||
if conf > 0.35 and text.isdigit():
|
||
num = int(text)
|
||
if num < 500: # 500마디가 넘는 곡은 희귀하므로 이상치 거름
|
||
candidates.append(num)
|
||
|
||
if candidates:
|
||
return min(candidates) # 한 페이지에 여러 마디 번호가 잡히면 가장 첫(작은) 번호를 대표로 삼음
|
||
return None
|
||
|
||
# Windows 콘솔 인코딩
|
||
if sys.platform == "win32":
|
||
sys.stdout.reconfigure(encoding="utf-8", errors="replace")
|
||
sys.stderr.reconfigure(encoding="utf-8", errors="replace")
|
||
|
||
|
||
# ─── 설정 ─────────────────────────────────────────────────────────────────
|
||
|
||
DEFAULT_FPS = 2
|
||
SIMILARITY_THRESHOLD = 0.95
|
||
OVERLAY_SIMILARITY_THRESHOLD = 0.55
|
||
|
||
OVERLAY_MIN_AREA_RATIO = 0.05
|
||
OVERLAY_MAX_AREA_RATIO = 0.6
|
||
MIN_TAB_LINES = 4
|
||
|
||
# 프레임 추출 시 최대 폭 (1080p→1280p 다운스케일로 메모리 세이브)
|
||
MAX_FRAME_WIDTH = 1280
|
||
# 검출용 업스케일 폭 (360p→960px, 1.5x → Tab 라인 두꺼워짐)
|
||
DETECT_WIDTH = 960
|
||
|
||
PDF_DPI = 150
|
||
PDF_PAGE_WIDTH_MM = 210
|
||
PDF_PAGE_HEIGHT_MM = 297
|
||
PDF_MARGIN_MM = 10
|
||
TAB_GAP_MM = 3
|
||
|
||
|
||
# ─── Step 1: 다운로드 ─────────────────────────────────────────────────────
|
||
|
||
def _find_yt_dlp() -> str:
|
||
yt_dlp = shutil.which("yt-dlp")
|
||
if yt_dlp:
|
||
return yt_dlp
|
||
for pyver in ["Python312", "Python311", "Python310"]:
|
||
p = Path(os.environ.get("APPDATA", "")) / "Python" / pyver / "Scripts" / "yt-dlp.exe"
|
||
if p.exists():
|
||
return str(p)
|
||
p = Path(sys.executable).parent / "Scripts" / "yt-dlp.exe"
|
||
if p.exists():
|
||
return str(p)
|
||
raise RuntimeError("yt-dlp를 찾을 수 없습니다. pip install yt-dlp")
|
||
|
||
|
||
def download_video(url: str, output_dir: Path) -> Tuple[Path, str]:
|
||
"""영상 다운로드 (1080p 우선)"""
|
||
print("[1/5] 영상 다운로드 중...")
|
||
yt_dlp = _find_yt_dlp()
|
||
|
||
result = subprocess.run(
|
||
[yt_dlp, "--get-title", "--encoding", "utf-8", url],
|
||
capture_output=True, encoding="utf-8", errors="replace"
|
||
)
|
||
title = (result.stdout or "").strip() or "untitled"
|
||
safe_title = re.sub(r'[\\/:*?"<>|\x00-\x1f]', '_', title)[:80]
|
||
video_path = output_dir / f"{safe_title}.mp4"
|
||
|
||
if video_path.exists():
|
||
print(f" → 이미 다운로드됨: {video_path.name}")
|
||
return video_path, safe_title
|
||
|
||
# 영상 추출 처리(CV)만 필요하므로, ffmpeg 병합이 불필요한 video-only 고화질 포맷(720p)을 직접 요청하여 360p 강등을 방지
|
||
subprocess.run(
|
||
[yt_dlp,
|
||
"-f", "bestvideo[ext=mp4]",
|
||
"-S", "res:720",
|
||
"-o", str(video_path), url],
|
||
encoding="utf-8", errors="replace", check=True
|
||
)
|
||
print(f" → 다운로드 완료: {video_path.name}")
|
||
return video_path, safe_title
|
||
|
||
|
||
# ─── Step 2: 프레임 추출 ──────────────────────────────────────────────────
|
||
|
||
def extract_frames(video_path: Path, fps: float = DEFAULT_FPS) -> List[np.ndarray]:
|
||
print(f"[2/5] 프레임 추출 중 (fps={fps})...")
|
||
cap = cv2.VideoCapture(str(video_path))
|
||
if not cap.isOpened():
|
||
raise RuntimeError(f"영상을 열 수 없습니다: {video_path}")
|
||
|
||
video_fps = cap.get(cv2.CAP_PROP_FPS)
|
||
total = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
|
||
interval = max(1, int(video_fps / fps))
|
||
w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
|
||
h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
|
||
|
||
# 4K 이상 → 1080p 다운스케일 (OOM 방지)
|
||
need_resize = w > MAX_FRAME_WIDTH
|
||
if need_resize:
|
||
scale = MAX_FRAME_WIDTH / w
|
||
target_size = (MAX_FRAME_WIDTH, int(h * scale))
|
||
print(f" → {w}x{h} → {target_size[0]}x{target_size[1]} 다운스케일")
|
||
|
||
frames = []
|
||
idx = 0
|
||
while True:
|
||
ret, frame = cap.read()
|
||
if not ret:
|
||
break
|
||
if idx % interval == 0:
|
||
if need_resize:
|
||
frame = cv2.resize(frame, target_size, interpolation=cv2.INTER_AREA)
|
||
frames.append(frame)
|
||
if len(frames) % 50 == 0:
|
||
print(f" ... {len(frames)}번째 프레임 추출 진행 중...", flush=True)
|
||
idx += 1
|
||
|
||
cap.release()
|
||
print(f" → {len(frames)}개 프레임 추출 ({w}x{h}, 원본 {video_fps:.0f}fps)")
|
||
return frames
|
||
|
||
|
||
# ─── 핵심: 흰색 배경 Tab 영역 검출 ───────────────────────────────────────
|
||
|
||
def _find_white_tab_strip(frame: np.ndarray, min_strip_ratio: float = 0.10, mode: str = "largest") -> Optional[Tuple[int, int]]:
|
||
"""프레임에서 흰색 배경의 Tab 스트립 영역의 Y범위(top, bottom)를 반환.
|
||
|
||
mode="largest": 가장 큰 하나의 스트립만 반환 (연속 스크롤용)
|
||
mode="union": 최상단 스트립부터 최하단 스트립까지 전체를 포괄하여 반환 (오버레이용 다중 줄 보존)
|
||
"""
|
||
h, w = frame.shape[:2]
|
||
margin_x = int(w * 0.1)
|
||
|
||
hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
|
||
_, s_ch, v_ch = cv2.split(hsv)
|
||
|
||
roi_v = v_ch[:, margin_x:w - margin_x]
|
||
roi_s = s_ch[:, margin_x:w - margin_x]
|
||
|
||
pure_white = (roi_v > 180) & (roi_s < 40)
|
||
bright_pastel = (roi_v > 200) & (roi_s < 100)
|
||
tab_mask = pure_white | bright_pastel
|
||
|
||
row_tab_ratio = np.mean(tab_mask, axis=1)
|
||
bright_mask = row_tab_ratio > 0.5
|
||
|
||
max_gap = int(h * 0.02)
|
||
regions = []
|
||
start = None
|
||
gap_count = 0
|
||
for i in range(h):
|
||
if bright_mask[i]:
|
||
if start is None: start = i
|
||
gap_count = 0
|
||
else:
|
||
if start is not None:
|
||
gap_count += 1
|
||
if gap_count > max_gap:
|
||
length = (i - gap_count) - start
|
||
if length >= h * min_strip_ratio:
|
||
regions.append((start, i - gap_count))
|
||
start = None
|
||
if start is not None:
|
||
length = (h - gap_count) - start
|
||
if length >= h * min_strip_ratio:
|
||
regions.append((start, h - gap_count))
|
||
|
||
if not regions: return None
|
||
|
||
pad_top = int(h * 0.15)
|
||
pad_bottom = int(h * 0.03)
|
||
|
||
if mode == "union":
|
||
top = max(0, min(r[0] for r in regions) - pad_top)
|
||
bottom = min(h, max(r[1] for r in regions) + pad_bottom)
|
||
return (top, bottom)
|
||
|
||
# largest
|
||
best = max(regions, key=lambda r: r[1] - r[0])
|
||
top = max(0, best[0] - pad_top)
|
||
bottom = min(h, best[1] + pad_bottom)
|
||
return (top, bottom)
|
||
|
||
|
||
def _trim_to_content(crop: np.ndarray, margin_px: int = 6) -> np.ndarray:
|
||
"""넓게 크롭된 Tab 이미지에서 Tab 콘텐츠 영역만 정밀 트림.
|
||
|
||
전략: HSV 기반으로 각 행의 '흰색 배경 비율'을 계산.
|
||
- Tab 영역: 30~95%가 흰색 (흰 배경 + Tab 라인/숫자)
|
||
- 기타 영상: 흰색 < 20% (어두운 배경)
|
||
- 순수 여백: 흰색 > 97%
|
||
이를 통해 상/하단의 기타 영상과 빈 여백 모두 제거."""
|
||
h, w = crop.shape[:2]
|
||
if h < 15 or w < 50:
|
||
return crop
|
||
|
||
hsv = cv2.cvtColor(crop, cv2.COLOR_BGR2HSV)
|
||
_, s_ch, v_ch = cv2.split(hsv)
|
||
|
||
# 흰색/밝은 파스텔 픽셀 비율 (Tab 배경 감지)
|
||
white_mask = ((v_ch > 180) & (s_ch < 40)) | ((v_ch > 200) & (s_ch < 100))
|
||
row_white = np.mean(white_mask, axis=1)
|
||
|
||
# Tab 행 = 흰색 비율 30~97% (라인/숫자 + 흰 배경)
|
||
tab_rows = (row_white > 0.30) & (row_white < 0.97)
|
||
|
||
# 콘텐츠 존재 확인 (어두운 픽셀 > 0.2%) - 마디번호 같이 아주 작은 숫자도 보존하기 위해 스레스홀드 극단적 하향
|
||
gray = cv2.cvtColor(crop, cv2.COLOR_BGR2GRAY) if len(crop.shape) == 3 else crop
|
||
row_dark = np.mean(gray < 180, axis=1)
|
||
content_rows = row_dark > 0.002
|
||
|
||
# Tab 행 OR 콘텐츠 행
|
||
valid_rows = tab_rows | content_rows
|
||
|
||
# 상단: 첫 번째 유효 행
|
||
top = 0
|
||
for i in range(h):
|
||
if valid_rows[i] and row_white[i] > 0.20:
|
||
top = max(0, i - 140) # 마디번호 및 ┌─1 (도돌이표) 보존을 위해 역대급으로 넉넉한 140px 보호 여백 부여
|
||
break
|
||
|
||
# 하단: 마지막 유효 행
|
||
bottom = h
|
||
for i in range(h - 1, -1, -1):
|
||
if valid_rows[i] and row_white[i] > 0.20:
|
||
bottom = min(h, i + margin_px)
|
||
break
|
||
|
||
if bottom - top < 15:
|
||
return crop
|
||
|
||
return crop[top:bottom, :]
|
||
|
||
|
||
def _has_tab_content(region: np.ndarray) -> bool:
|
||
"""흰색 영역 내에 실제 Tab 내용이 있는지 검증.
|
||
방법: 흰색 배경 위의 어두운 픽셀(Tab 라인, 숫자, 코드명) 비율을 확인.
|
||
Tab 영역은 일반적으로 3~25%의 어두운 콘텐츠를 포함."""
|
||
if region is None or region.size == 0:
|
||
return False
|
||
|
||
gray = cv2.cvtColor(region, cv2.COLOR_BGR2GRAY) if len(region.shape) == 3 else region
|
||
h, w = gray.shape
|
||
if h < 15 or w < 50:
|
||
return False
|
||
|
||
# 어두운 픽셀 비율 (< 180 = 라인/숫자/코드 등)
|
||
dark_pixels = np.sum(gray < 180)
|
||
dark_ratio = dark_pixels / gray.size
|
||
|
||
# Tab 영역: 3~25%가 어두운 콘텐츠 (순수 흰 배경이면 < 1%, 기타 영상이면 > 30%)
|
||
return 0.02 < dark_ratio < 0.30
|
||
|
||
|
||
# ─── Step 3: 패턴 감지 ────────────────────────────────────────────────────
|
||
|
||
def _detect_tab_overlay(frame: np.ndarray) -> Optional[Tuple[int, int, int, int]]:
|
||
"""Tab을 포함한 흰색 오버레이 박스 검출"""
|
||
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
|
||
h, w = gray.shape
|
||
|
||
_, thresh = cv2.threshold(gray, 220, 255, cv2.THRESH_BINARY)
|
||
kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (15, 15))
|
||
closed = cv2.morphologyEx(thresh, cv2.MORPH_CLOSE, kernel)
|
||
contours, _ = cv2.findContours(closed, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
|
||
|
||
total_area = h * w
|
||
best = None
|
||
best_area = 0
|
||
|
||
for cnt in contours:
|
||
x, y, cw, ch = cv2.boundingRect(cnt)
|
||
area = cw * ch
|
||
ratio = area / total_area
|
||
# 오버레이 = 프레임 폭의 85% 미만인 독립 박스 (전폭 스트립은 scroll)
|
||
width_ratio = cw / w
|
||
if (OVERLAY_MIN_AREA_RATIO < ratio < OVERLAY_MAX_AREA_RATIO
|
||
and width_ratio < 0.85
|
||
and cw > ch * 0.5 and area > best_area):
|
||
# Tab 내용 검증
|
||
region = frame[y:y + ch, x:x + cw]
|
||
if _has_tab_content(region):
|
||
best = (x, y, cw, ch)
|
||
best_area = area
|
||
|
||
return best
|
||
|
||
|
||
def detect_pattern(frames: List[np.ndarray], sample_count: int = 15) -> str:
|
||
print("[3/5] 영상 패턴 정밀 분석 중 (Motion Tracking)...")
|
||
if len(frames) < 30: return "scroll"
|
||
|
||
scroll_votes = 0
|
||
overlay_votes = 0
|
||
tab_bounds = None
|
||
for f in frames[::30]:
|
||
bounds = _find_white_tab_strip(f, mode="largest")
|
||
if bounds:
|
||
tab_bounds = bounds
|
||
break
|
||
|
||
if tab_bounds:
|
||
top, bottom = tab_bounds
|
||
else:
|
||
top, bottom = int(frames[0].shape[0]*0.2), int(frames[0].shape[0]*0.8) # Default
|
||
|
||
step = max(1, len(frames) // sample_count)
|
||
for i in range(2, len(frames)-1, step):
|
||
f1 = frames[i]
|
||
f2 = frames[i+1]
|
||
h, w = f1.shape[:2]
|
||
|
||
# 악보 영역 내부에서 높이의 중앙부분(잡음이 적은 곳)만 사용
|
||
crop_h = bottom - top
|
||
safe_top = int(top + crop_h * 0.2)
|
||
safe_bottom = int(top + crop_h * 0.8)
|
||
|
||
crop1 = f1[safe_top:safe_bottom, :]
|
||
crop2 = f2[safe_top:safe_bottom, :]
|
||
|
||
g1 = _extract_tracking_channel(crop1)
|
||
g2 = _extract_tracking_channel(crop2)
|
||
|
||
template_w = int(w * 0.5)
|
||
template = g1[:, w - template_w:]
|
||
|
||
res = cv2.matchTemplate(g2, template, cv2.TM_CCOEFF_NORMED)
|
||
_, max_val, _, max_loc = cv2.minMaxLoc(res)
|
||
|
||
scroll_px = (w - template_w) - max_loc[0]
|
||
|
||
# 강한 매칭이면서 스크롤이 없으면 정지된 페이지(overlay)
|
||
if max_val > 0.90 and scroll_px <= 1:
|
||
overlay_votes += 1
|
||
# 의미있는 매칭이면서 확연한 스크롤이 보이면 연속 스크롤(scroll)
|
||
elif max_val > 0.10 and scroll_px > 1:
|
||
scroll_votes += 1
|
||
else:
|
||
overlay_votes += 1
|
||
|
||
pattern = "scroll" if scroll_votes > overlay_votes else "overlay"
|
||
print(f" → 판단 패턴: {pattern} (Scroll:{scroll_votes}, Overlay/Static:{overlay_votes})")
|
||
return pattern
|
||
|
||
|
||
# ─── Step 4: 고유 Tab 프레임 추출 ─────────────────────────────────────────
|
||
|
||
def compare_frames(frame1: np.ndarray, frame2: np.ndarray) -> float:
|
||
"""MSE 기반 유사도 (0~1, 1=동일)"""
|
||
g1 = cv2.cvtColor(frame1, cv2.COLOR_BGR2GRAY) if len(frame1.shape) == 3 else frame1
|
||
g2 = cv2.cvtColor(frame2, cv2.COLOR_BGR2GRAY) if len(frame2.shape) == 3 else frame2
|
||
|
||
if g1.shape != g2.shape:
|
||
g2 = cv2.resize(g2, (g1.shape[1], g1.shape[0]))
|
||
|
||
target_w = 480
|
||
if g1.shape[1] > target_w:
|
||
scale = target_w / g1.shape[1]
|
||
sz = (target_w, int(g1.shape[0] * scale))
|
||
g1 = cv2.resize(g1, sz)
|
||
g2 = cv2.resize(g2, sz)
|
||
|
||
mse = np.mean(((g1.astype(np.float32) - g2.astype(np.float32)) / 255.0) ** 2)
|
||
return max(0.0, 1.0 - min(mse * 8.0, 1.0))
|
||
|
||
|
||
def _dhash(image: np.ndarray, hash_size: int = 32) -> np.ndarray:
|
||
"""Difference Hash — 구조 기반 해시 (32×32 = 1024비트).
|
||
인접 픽셀의 밝기 차이를 기록하여 위치 이동에 강건한 fingerprint 생성.
|
||
16→32 확대로 마디번호/음표 위치까지 구분 가능."""
|
||
gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) if len(image.shape) == 3 else image
|
||
resized = cv2.resize(gray, (hash_size + 1, hash_size), interpolation=cv2.INTER_AREA)
|
||
return (resized[:, 1:] > resized[:, :-1]).flatten()
|
||
|
||
|
||
def _dedup_by_hash(frames: List[np.ndarray],
|
||
max_hamming: int = 20) -> List[np.ndarray]:
|
||
"""pHash 기반 클러스터 중복 제거.
|
||
유사 프레임을 그룹핑하고, 각 그룹에서 가장 선명한(Laplacian 분산 최대) 1장만 선택.
|
||
→ 스크롤 중복 + 반복 연습 구간 모두 제거."""
|
||
if not frames:
|
||
return []
|
||
|
||
hashes = [_dhash(f) for f in frames]
|
||
n = len(frames)
|
||
used = [False] * n
|
||
clusters = []
|
||
|
||
for i in range(n):
|
||
if used[i]:
|
||
continue
|
||
cluster = [i]
|
||
used[i] = True
|
||
for j in range(i + 1, n):
|
||
if used[j]:
|
||
continue
|
||
dist = int(np.sum(hashes[i] != hashes[j]))
|
||
if dist <= max_hamming:
|
||
cluster.append(j)
|
||
used[j] = True
|
||
clusters.append(cluster)
|
||
|
||
# 각 클러스터에서 최고 선명도 프레임 선택
|
||
result = []
|
||
for cluster in clusters:
|
||
best_idx = max(cluster, key=lambda idx: cv2.Laplacian(
|
||
cv2.cvtColor(frames[idx], cv2.COLOR_BGR2GRAY)
|
||
if len(frames[idx].shape) == 3 else frames[idx],
|
||
cv2.CV_64F).var())
|
||
result.append(frames[best_idx])
|
||
|
||
return result
|
||
|
||
|
||
def _extract_print_channel(frame: np.ndarray) -> np.ndarray:
|
||
"""PDF 출력용 채널 (Red 채널): 노란색을 투명(White)하게 만듦"""
|
||
if len(frame.shape) != 3: return frame
|
||
return frame[:, :, 2]
|
||
|
||
def _extract_tracking_channel(frame: np.ndarray) -> np.ndarray:
|
||
"""트래킹 전용 채널: 유색 커서(빨강, 노랑 등) 및 배경 노이즈를 완벽히 투명화하고, 오직 순수한 검은색 음표와 오선지만을 마스킹하여 추출"""
|
||
if len(frame.shape) != 3:
|
||
return frame
|
||
|
||
# B, G, R 모두 120 미만인 어두운 픽셀(순수 블랙 및 진회색)만 True로 마스킹
|
||
# 빨간색(0, 0, 255)이나 노란색(0, 255, 255)은 R이나 G가 255이므로 완벽하게 걸러짐
|
||
black_mask = (frame[:,:,0] < 120) & (frame[:,:,1] < 120) & (frame[:,:,2] < 120)
|
||
|
||
# 흰 배경 위에 검은 음표만 그리기 (바이너리 이미지와 동일한 효과)
|
||
img = np.full_like(frame[:,:,0], 255)
|
||
img[black_mask] = 0
|
||
|
||
# OpenCV matchTemplate은 밝기 기준 매칭을 하므로, 이미지 전체를 반전시킬 필요 없이
|
||
# 이대로 넘기면 흰 바탕의 검은색 패턴 매칭이 정확히 일어남
|
||
return img
|
||
|
||
def _detect_scroll_offset(frame_a: np.ndarray, frame_b: np.ndarray, min_confidence: float = 0.1) -> Tuple[int, float]:
|
||
"""이전 프레임(A)과 현재 프레임(B) 사이의 X축 이동량(Scroll)을 추정합니다."""
|
||
h, w = frame_a.shape[:2]
|
||
|
||
gb = _extract_tracking_channel(frame_b)
|
||
ga = _extract_tracking_channel(frame_a)
|
||
|
||
template_w = int(w * 0.5)
|
||
template = ga[:, w - template_w:]
|
||
result = cv2.matchTemplate(gb, template, cv2.TM_CCOEFF_NORMED)
|
||
_, max_val, _, max_loc = cv2.minMaxLoc(result)
|
||
scroll_px = (w - template_w) - max_loc[0]
|
||
if max_val < min_confidence or scroll_px <= 0:
|
||
return (0, max_val)
|
||
return (scroll_px, max_val)
|
||
|
||
def _detect_measure_bars(gray_pano: np.ndarray) -> List[int]:
|
||
"""오직 기타 6현의 영역만 계산하여 세로로 쫙 채워진 마디 선(|)의 X좌표만 정밀하게 반환합니다."""
|
||
_, thresh = cv2.threshold(gray_pano, 200, 255, cv2.THRESH_BINARY_INV)
|
||
h, w = thresh.shape
|
||
row_sums = np.sum(thresh, axis=1) / 255
|
||
staff_rows = np.where(row_sums > w * 0.5)[0]
|
||
|
||
if len(staff_rows) < 2: return []
|
||
|
||
top_line = staff_rows[0]
|
||
bottom_line = top_line
|
||
for r in staff_rows:
|
||
if r - top_line > 100: break
|
||
bottom_line = r
|
||
|
||
staff_region = thresh[top_line:bottom_line+1, :]
|
||
expected_h = bottom_line - top_line + 1
|
||
if expected_h < 10: return []
|
||
|
||
col_sums = np.sum(staff_region, axis=0) / 255
|
||
bar_cols = np.where(col_sums >= expected_h * 0.8)[0]
|
||
|
||
measures = []
|
||
curr = []
|
||
for c in bar_cols:
|
||
if not curr: curr.append(c)
|
||
else:
|
||
# [BUG3 FIX] 클러스터 허용폭 10→30px (마디선은 보통 2~5px 폭 클러스터)
|
||
if c - curr[-1] < 30: curr.append(c)
|
||
else:
|
||
measures.append(int(np.mean(curr)))
|
||
curr = [c]
|
||
if curr: measures.append(int(np.mean(curr)))
|
||
# [BUG3 FIX] 100px 미만 간격 마디선 제거 (음표 기둥 오탐 방지)
|
||
measures = [x for i, x in enumerate(measures)
|
||
if i == 0 or x - measures[i-1] >= 100]
|
||
return measures
|
||
|
||
def _stamp_measure_number(measure_bgr: np.ndarray, num: int) -> np.ndarray:
|
||
"""마디 이미지 좌측 상단의 빈 공간에 자동으로 순차 진행 마디번호를 파란색 도장(Stamp)으로 찍습니다."""
|
||
text = f"[{num}]"
|
||
font = cv2.FONT_HERSHEY_SIMPLEX
|
||
font_scale = 0.7
|
||
thickness = 2
|
||
color = (200, 0, 0)
|
||
cv2.putText(measure_bgr, text, (15, 30), font, font_scale, color, thickness, cv2.LINE_AA)
|
||
return measure_bgr
|
||
|
||
def _stitch_scroll_segment(segment: List[np.ndarray]) -> np.ndarray:
|
||
if len(segment) == 1: return segment[0]
|
||
min_h = min(f.shape[0] for f in segment)
|
||
panorama = segment[0][:min_h, :]
|
||
for i in range(1, len(segment)):
|
||
curr = segment[i][:min_h, :]
|
||
scroll_px, conf = _detect_scroll_offset(segment[i-1][:min_h, :], curr, min_confidence=0.1)
|
||
if scroll_px > 0 and conf > 0.15:
|
||
new_strip = curr[:, curr.shape[1] - scroll_px:]
|
||
panorama = np.hstack([panorama, new_strip])
|
||
else:
|
||
panorama = np.hstack([panorama, curr])
|
||
return panorama
|
||
|
||
def _merge_scroll_candidates(candidates: List[np.ndarray], min_scroll: int = 5, min_segment_len: int = 2) -> List[np.ndarray]:
|
||
if len(candidates) <= 1: return candidates
|
||
result = []
|
||
current_segment = [candidates[0]]
|
||
prev_s_px = 0
|
||
prev_conf = 1.0
|
||
|
||
for i in range(1, len(candidates)):
|
||
prev_frame = candidates[i-1]
|
||
curr_frame = candidates[i]
|
||
s_px, conf = _detect_scroll_offset(prev_frame, curr_frame, min_confidence=0.1)
|
||
|
||
# [BUG1 FIX] 씬 전환 조건: conf 기반만 사용
|
||
# abs(s_px - prev_s_px) > 100 제거 — 스크롤 가속도를 씬전환으로 오탐하던 원인
|
||
is_cut = (conf <= 0.15) or (prev_conf - conf > 0.4)
|
||
|
||
if not is_cut:
|
||
current_segment.append(curr_frame)
|
||
else:
|
||
if len(current_segment) >= min_segment_len:
|
||
result.append(_stitch_scroll_segment(current_segment))
|
||
else:
|
||
result.extend(current_segment)
|
||
current_segment = [curr_frame]
|
||
|
||
prev_s_px = s_px
|
||
prev_conf = conf
|
||
|
||
if len(current_segment) >= min_segment_len:
|
||
result.append(_stitch_scroll_segment(current_segment))
|
||
else:
|
||
result.extend(current_segment)
|
||
|
||
return result
|
||
|
||
def _is_rewind_duplicate(query_bgr: np.ndarray, history_pano: np.ndarray) -> bool:
|
||
if history_pano is None: return False
|
||
h_gray = _extract_tracking_channel(history_pano)
|
||
qw = min(800, query_bgr.shape[1])
|
||
q_gray = _extract_tracking_channel(query_bgr[:, :qw])
|
||
|
||
if h_gray.shape[0] != q_gray.shape[0] or h_gray.shape[1] < q_gray.shape[1]: return False
|
||
|
||
res = cv2.matchTemplate(h_gray, q_gray, cv2.TM_CCOEFF_NORMED)
|
||
_, max_val, _, max_loc = cv2.minMaxLoc(res)
|
||
|
||
if max_val < 0.85: return False
|
||
|
||
match_x = max_loc[0]
|
||
|
||
# HEURISTIC 1: Is it just consecutive stitching from the immediate past?
|
||
# If it matched the very end of history (< 2500 pixels from the end), it's just normal scroll overlap!
|
||
if history_pano.shape[1] - match_x < 2500:
|
||
return False
|
||
|
||
# HEURISTIC 2: It matched deep in the past! It might be a rewind, OR it might be an identical Chorus.
|
||
# We must check the measure number to differentiate identical Chorus vs exact D.S. al Coda rewind.
|
||
|
||
qw_img = query_bgr[:, :qw]
|
||
gray_for_staff = cv2.cvtColor(qw_img, cv2.COLOR_BGR2GRAY) if len(qw_img.shape) == 3 else qw_img
|
||
_, bin_inv = cv2.threshold(gray_for_staff, 200, 255, cv2.THRESH_BINARY_INV)
|
||
row_sums = np.sum(bin_inv, axis=1) / 255.0
|
||
staff_rows = np.where(row_sums > qw * 0.4)[0]
|
||
|
||
if len(staff_rows) < 2: return False
|
||
staff_top = staff_rows[0]
|
||
|
||
box_y1 = max(0, staff_top - 60)
|
||
box_y2 = staff_top + 10
|
||
|
||
box_x2 = min(250, query_bgr.shape[1], history_pano.shape[1] - match_x)
|
||
|
||
q_num = query_bgr[box_y1:box_y2, 0:box_x2]
|
||
h_num = history_pano[box_y1:box_y2, match_x:match_x+box_x2]
|
||
|
||
if q_num.shape != h_num.shape or q_num.size == 0: return False
|
||
|
||
diff = cv2.absdiff(cv2.cvtColor(q_num, cv2.COLOR_BGR2GRAY), cv2.cvtColor(h_num, cv2.COLOR_BGR2GRAY))
|
||
mse = np.mean(diff ** 2)
|
||
|
||
if mse < 300.0:
|
||
return True
|
||
return False
|
||
|
||
def merge_panoramas_list(panoramas):
|
||
if not panoramas: return []
|
||
merged_list = []
|
||
current_master = panoramas[0].copy()
|
||
history_pano = current_master.copy()
|
||
rewind_state = False
|
||
|
||
for i in range(1, len(panoramas)):
|
||
next_pano = panoramas[i].copy()
|
||
|
||
if _is_rewind_duplicate(next_pano, history_pano):
|
||
print(" [Rewind Filter] D.S. al Coda or Backward Jump detected. Dropping redundant chronological playback.")
|
||
rewind_state = True
|
||
continue
|
||
|
||
if rewind_state:
|
||
print(" [Rewind Filter] Returning from rewind jump! Searching for novelty.")
|
||
merged_list.append(current_master)
|
||
current_master = next_pano
|
||
if current_master.shape[0] == history_pano.shape[0]:
|
||
history_pano = np.hstack([history_pano, next_pano])
|
||
rewind_state = False
|
||
continue
|
||
|
||
head_w = min(800, next_pano.shape[1])
|
||
head = next_pano[:, :head_w]
|
||
|
||
search_w = min(1500, current_master.shape[1])
|
||
search_region = current_master[:, -search_w:]
|
||
h_gray = _extract_tracking_channel(head)
|
||
s_gray = _extract_tracking_channel(search_region)
|
||
matched = False
|
||
if h_gray.shape[1] <= s_gray.shape[1] and h_gray.shape[0] == s_gray.shape[0]:
|
||
res = cv2.matchTemplate(s_gray, h_gray, cv2.TM_CCOEFF_NORMED)
|
||
_, max_val, _, max_loc = cv2.minMaxLoc(res)
|
||
|
||
if max_val > 0.50:
|
||
match_x_in_search = max_loc[0]
|
||
absolute_match_x = current_master.shape[1] - search_w + match_x_in_search
|
||
next_start_idx = current_master.shape[1] - absolute_match_x
|
||
if next_start_idx < next_pano.shape[1]:
|
||
append_part = next_pano[:, next_start_idx:]
|
||
if append_part.shape[1] > 0:
|
||
current_master = np.hstack([current_master, append_part])
|
||
if current_master.shape[0] == history_pano.shape[0]:
|
||
history_pano = np.hstack([history_pano, append_part])
|
||
matched = True
|
||
|
||
if not matched:
|
||
merged_list.append(current_master)
|
||
current_master = next_pano
|
||
if current_master.shape[0] == history_pano.shape[0]:
|
||
history_pano = np.hstack([history_pano, next_pano])
|
||
|
||
merged_list.append(current_master)
|
||
return merged_list
|
||
|
||
def _find_all_measure_bars_standalone(img_bgr: np.ndarray, max_width: int) -> List[int]:
|
||
cw = min(img_bgr.shape[1], max_width)
|
||
img_gray = cv2.cvtColor(img_bgr[:, :cw], cv2.COLOR_BGR2GRAY) if len(img_bgr.shape) == 3 else img_bgr
|
||
_, bin_inv = cv2.threshold(img_gray, 200, 255, cv2.THRESH_BINARY_INV)
|
||
row_sums = np.sum(bin_inv, axis=1) / 255.0
|
||
staff_rows = np.where(row_sums > cw * 0.4)[0]
|
||
if len(staff_rows) >= 6:
|
||
staff_y_top, staff_y_bottom = staff_rows[0], staff_rows[-1]
|
||
else:
|
||
staff_y_top, staff_y_bottom = int(img_bgr.shape[0] * 0.3), int(img_bgr.shape[0] * 0.8)
|
||
expected_h = max(10, staff_y_bottom - staff_y_top + 1)
|
||
staff_region = bin_inv[staff_y_top:staff_y_bottom+1, :]
|
||
col_sums = np.sum(staff_region, axis=0) / 255.0
|
||
bar_xs = np.where(col_sums >= expected_h * 0.6)[0]
|
||
grouped_bars = []
|
||
if len(bar_xs) > 0:
|
||
c = [bar_xs[0]]
|
||
for x in bar_xs[1:]:
|
||
if x - c[-1] <= 15: c.append(x)
|
||
else:
|
||
grouped_bars.append(int(np.mean(c)))
|
||
c = [x]
|
||
grouped_bars.append(int(np.mean(c)))
|
||
unique_bars = []
|
||
for p in grouped_bars:
|
||
if not unique_bars or p - unique_bars[-1] >= 50:
|
||
unique_bars.append(p)
|
||
return unique_bars
|
||
|
||
def tile_panoramas_to_a4(panoramas: List[np.ndarray], chunk_width: int=1800) -> List[np.ndarray]:
|
||
if not panoramas: return []
|
||
panorama = np.hstack(panoramas) if len(panoramas) > 1 else panoramas[0]
|
||
rows = []
|
||
x_curr = 0
|
||
total_w = panorama.shape[1]
|
||
while x_curr < total_w:
|
||
remaining_w = total_w - x_curr
|
||
if remaining_w <= chunk_width:
|
||
r = panorama[:, x_curr:]
|
||
if r.shape[1] > 50:
|
||
r_padded = cv2.copyMakeBorder(r, 0, 0, 0, chunk_width - r.shape[1], cv2.BORDER_CONSTANT, value=[255,255,255])
|
||
rows.append(r_padded)
|
||
break
|
||
slice_bgr = panorama[:, x_curr : min(x_curr + chunk_width + 100, total_w)]
|
||
bars = _find_all_measure_bars_standalone(slice_bgr, slice_bgr.shape[1])
|
||
valid_bars = [b for b in bars if 50 < b < chunk_width - 15]
|
||
cut_offset = (valid_bars[-1] - 10) if valid_bars else chunk_width
|
||
r = panorama[:, x_curr : x_curr + cut_offset]
|
||
r_padded = cv2.copyMakeBorder(r, 0, 0, 0, chunk_width - r.shape[1], cv2.BORDER_CONSTANT, value=[255,255,255])
|
||
rows.append(r_padded)
|
||
x_curr += cut_offset
|
||
return rows
|
||
|
||
def extract_unique_scroll(frames: List[np.ndarray], scan_dist: int = 4) -> List[np.ndarray]:
|
||
from video_cv_tracker import TemporalTracker
|
||
print("[Pipeline] Isolating static structures via TemporalTracker")
|
||
tracker = TemporalTracker(diff_threshold=0.05)
|
||
|
||
tab_bounds = None
|
||
for f in frames[::30]:
|
||
bounds = _find_white_tab_strip(f)
|
||
if bounds:
|
||
tab_bounds = bounds
|
||
break
|
||
|
||
if tab_bounds:
|
||
top, bottom = tab_bounds
|
||
print(f" -> Found precise sheet music bounds: Y={top} to Y={bottom}")
|
||
else:
|
||
top, bottom = 0, frames[0].shape[0]
|
||
|
||
for frame in frames:
|
||
roi = frame[top:bottom, :]
|
||
tracker.process_frame(roi)
|
||
|
||
unique_pages = tracker.get_unique_pages()
|
||
print(f"[Pipeline] Reduced down to {len(unique_pages)} static structural median pages.")
|
||
|
||
print(" -> 점프 컷 및 도돌이표 처리 중...")
|
||
panoramas = merge_panoramas_list(unique_pages)
|
||
|
||
print(" -> A4 타일링 포맷팅 중...")
|
||
return tile_panoramas_to_a4(panoramas, chunk_width=1800)
|
||
|
||
def extract_unique_overlay(frames: List[np.ndarray],
|
||
threshold: float = OVERLAY_SIMILARITY_THRESHOLD) -> List[np.ndarray]:
|
||
"""오버레이형: TemporalTracker 기반의 고해상도 페이지(단일 스트립 크롭) 추출 및 정밀 픽셀 중복 필터"""
|
||
from video_cv_tracker import TemporalTracker
|
||
print("[4/5] 정지형(Overlay) Tab 트래킹 및 고해상도 추출 중...")
|
||
|
||
tab_bounds = None
|
||
for f in frames[::30]:
|
||
bounds = _find_white_tab_strip(f, mode="largest")
|
||
if bounds:
|
||
tab_bounds = bounds
|
||
break
|
||
|
||
if tab_bounds:
|
||
top, bottom = tab_bounds
|
||
print(f" -> Found precise sheet music bounds: Y={top} to Y={bottom}")
|
||
else:
|
||
top, bottom = int(frames[0].shape[0]*0.2), int(frames[0].shape[0]*0.8)
|
||
|
||
# BGR2GRAY 대신 True Black 채널을 사용하므로, 붉은색 재생커서 이동을 완벽히 무시합니다.
|
||
# 따라서 악보가 물리적으로 넘어갈 때 발생하는 픽셀 변화(음표 교체)만 감지하게 되므로, 임계값을 0.03(3%)으로 극도로 낮춰 정밀도를 높입니다.
|
||
tracker = TemporalTracker(diff_threshold=0.03)
|
||
|
||
for frame in frames:
|
||
if top is not None and bottom is not None:
|
||
roi = frame[top:bottom, :]
|
||
else:
|
||
roi = frame
|
||
|
||
roi_tracking = _extract_tracking_channel(roi)
|
||
# 상하단 20%는 악보 밖이므로(예: 연주자 머리카락, 천장 등) 변화 감지에서 완전히 배제
|
||
h_r = roi_tracking.shape[0]
|
||
s_top = int(h_r * 0.20)
|
||
s_bot = int(h_r * 0.80)
|
||
roi_tracking[:s_top, :] = 255
|
||
roi_tracking[s_bot:, :] = 255
|
||
|
||
tracker.process_frame(roi, tracking_channel=roi_tracking)
|
||
|
||
pages = tracker.get_unique_pages()
|
||
print(f"[Tracker] {len(pages)}개의 최초 구분 페이지 추출됨. 전역 중복 페이지 병합 심사 중...")
|
||
|
||
unique = []
|
||
last_measure_num = -1
|
||
|
||
def _is_duplicate_cv(new_crop: np.ndarray, past_crops: List[np.ndarray]) -> bool:
|
||
crop_gray = _extract_tracking_channel(new_crop)
|
||
h_c, w_c = crop_gray.shape
|
||
crop_gray[:int(h_c * 0.20), :] = 255
|
||
crop_gray[int(h_c * 0.80):, :] = 255
|
||
|
||
for p_crop in past_crops:
|
||
past_gray = _extract_tracking_channel(p_crop)
|
||
past_gray[:int(h_c * 0.20), :] = 255
|
||
past_gray[int(h_c * 0.80):, :] = 255
|
||
|
||
template = crop_gray[10:h_c-10, 10:w_c-10]
|
||
if template.shape[0] > 0 and past_gray.shape[0] >= template.shape[0] and past_gray.shape[1] >= template.shape[1]:
|
||
res = cv2.matchTemplate(past_gray, template, cv2.TM_CCOEFF_NORMED)
|
||
_, max_val, _, _ = cv2.minMaxLoc(res)
|
||
if max_val > 0.90:
|
||
return True
|
||
return False
|
||
|
||
for crop in pages:
|
||
if np.mean(cv2.cvtColor(crop, cv2.COLOR_BGR2GRAY)) < 80:
|
||
continue
|
||
|
||
m_num = _extract_number_above_bars(crop)
|
||
|
||
if m_num is not None:
|
||
# 1페이지당 통상 마디수 허용 범위 (+1 ~ +25 이내의 점진적 상승만 신뢰)
|
||
# 영상에서 마디번호가 10, 11, 12 ... 씩 1단위로 증가하거나, 1, 17, 33 처럼 페이지 단위로 증가하는 것을 상정
|
||
if last_measure_num < m_num <= last_measure_num + 25 or last_measure_num == -1:
|
||
print(f" > [Keep] 마디번호 {m_num} (안정적인 순차 상승 패턴)")
|
||
last_measure_num = m_num
|
||
unique.append(crop)
|
||
else:
|
||
# 역행하거나(+0 이하), 너무 크게 점프한 경우(+25 초과) => 반복 코러스이거나 OCR 환각(32, 1017 등)임
|
||
# CV 보완 추론을 통해 진짜 새로운 악보인지 확인!
|
||
if _is_duplicate_cv(crop, unique):
|
||
print(f" > [Skip] 측정 번호({m_num}) 이상 & CV 분석 결과 이전 페이지와 동일(반복 코러스). 버림.")
|
||
continue
|
||
else:
|
||
print(f" > [Keep] 측정 번호({m_num})는 OCR 환각이지만 CV 분석 결과 순수 새로운 페이지임! 번호 무시하고 채택.")
|
||
# OCR 값이 환각이므로 last_measure_num은 갱신하지 않고 징검다리를 연결
|
||
unique.append(crop)
|
||
else:
|
||
if not _is_duplicate_cv(crop, unique):
|
||
print(f" > [Keep] OCR 무효/실패. Pixel 변화량에 따른 완전 새로운 페이지 채택")
|
||
unique.append(crop)
|
||
else:
|
||
print(f" > [Skip] OCR 무효/실패 & Pixel 분석 결과 도돌이표 코러스로 판명. 버림.")
|
||
|
||
print(f" → 임시: {len(unique)}개 고유 오버레이 페이지 추출 성공. 상하단 여백 및 제목 정리 중...")
|
||
|
||
trimmed_unique = []
|
||
for crop in unique:
|
||
gray = cv2.cvtColor(crop, cv2.COLOR_BGR2GRAY)
|
||
_, thresh = cv2.threshold(gray, 200, 255, cv2.THRESH_BINARY_INV)
|
||
row_sums = np.sum(thresh, axis=1) / 255.0
|
||
|
||
# 폭의 40% 이상 차지하는 검은 오선지 영역만 찾음 (정적 제목 등 배제)
|
||
h_c, w_c = crop.shape[:2]
|
||
staff_rows = np.where(row_sums > w_c * 0.4)[0]
|
||
if len(staff_rows) > 0:
|
||
# ┌─1 (반복 기호) 및 마디 번호 등을 충분히 보존할 수 있도록 파격적으로 130px 여유를 둡니다.
|
||
top_y = max(0, staff_rows[0] - 130)
|
||
bottom_y = min(h_c, staff_rows[-1] + 30)
|
||
trimmed_unique.append(crop[top_y:bottom_y, :])
|
||
else:
|
||
trimmed_unique.append(crop)
|
||
|
||
print(f" → 최종: {len(trimmed_unique)}개 정제된 오버레이 페이지 추출 성공")
|
||
return trimmed_unique
|
||
|
||
|
||
# ─── Step 5: A4 PDF 생성 ─────────────────────────────────────────────────
|
||
|
||
def generate_pdf(frames: List[np.ndarray], output_path: Path,
|
||
debug_dir: Optional[Path] = None) -> None:
|
||
"""Tab 프레임들을 A4 페이지에 여러 행으로 배치"""
|
||
print("[5/5] A4 PDF 생성 중...")
|
||
if not frames:
|
||
print(" ⚠ 프레임 없음!")
|
||
return
|
||
|
||
page_w = int(PDF_PAGE_HEIGHT_MM / 25.4 * PDF_DPI) # Landscape width
|
||
page_h = int(PDF_PAGE_WIDTH_MM / 25.4 * PDF_DPI) # Landscape height
|
||
margin = int(PDF_MARGIN_MM / 25.4 * PDF_DPI)
|
||
gap = int(TAB_GAP_MM / 25.4 * PDF_DPI)
|
||
content_w = page_w - 2 * margin
|
||
|
||
resized = []
|
||
for i, frame in enumerate(frames):
|
||
rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
|
||
img = Image.fromarray(rgb)
|
||
if debug_dir:
|
||
img.save(debug_dir / f"frame_{i:04d}.png")
|
||
scale = content_w / img.width
|
||
img_r = img.resize((content_w, int(img.height * scale)), Image.LANCZOS)
|
||
resized.append(img_r)
|
||
|
||
pages = []
|
||
cur_y = margin
|
||
page = Image.new('RGB', (page_w, page_h), (255, 255, 255))
|
||
|
||
for img in resized:
|
||
if cur_y + img.height > page_h - margin:
|
||
pages.append(page)
|
||
page = Image.new('RGB', (page_w, page_h), (255, 255, 255))
|
||
cur_y = margin
|
||
page.paste(img, (margin, cur_y))
|
||
cur_y += img.height + gap
|
||
|
||
if cur_y > margin + gap:
|
||
pages.append(page)
|
||
|
||
if not pages:
|
||
return
|
||
|
||
pages[0].save(str(output_path), save_all=True,
|
||
append_images=pages[1:], resolution=PDF_DPI)
|
||
print(f" → PDF: {len(resized)} Tab → {len(pages)} 페이지, {output_path.stat().st_size // 1024} KB")
|
||
|
||
|
||
def generate_long_image(chunks: List[np.ndarray], output_path: str):
|
||
if not chunks:
|
||
return
|
||
|
||
print(f"DEBUG: First chunk shape = {chunks[0].shape}, dtype = {chunks[0].dtype}")
|
||
# Calculate exact total height required
|
||
total_h = sum(chunk.shape[0] for chunk in chunks)
|
||
max_w = max(chunk.shape[1] for chunk in chunks)
|
||
|
||
# Ensure correct channel dimensions for the canvas to prevent squishing!
|
||
if len(chunks[0].shape) == 3:
|
||
canvas = np.full((total_h, max_w, 3), 255, dtype=np.uint8)
|
||
else:
|
||
canvas = np.full((total_h, max_w), 255, dtype=np.uint8)
|
||
|
||
y_offset = 0
|
||
for chunk in chunks:
|
||
h, w = chunk.shape[:2]
|
||
if len(chunk.shape) == 3 and len(canvas.shape) == 2:
|
||
canvas[y_offset:y_offset+h, :w] = cv2.cvtColor(chunk, cv2.COLOR_BGR2GRAY)
|
||
elif len(chunk.shape) == 2 and len(canvas.shape) == 3:
|
||
canvas[y_offset:y_offset+h, :w] = cv2.cvtColor(chunk, cv2.COLOR_GRAY2BGR)
|
||
else:
|
||
canvas[y_offset:y_offset+h, :w] = chunk
|
||
|
||
y_offset += h
|
||
|
||
cv2.imwrite(str(output_path), canvas)
|
||
|
||
|
||
# ─── Main ─────────────────────────────────────────────────────────────────
|
||
|
||
def main():
|
||
parser = argparse.ArgumentParser(description="YouTube TAB → A4 PDF")
|
||
parser.add_argument("url", help="YouTube URL")
|
||
parser.add_argument("-o", "--output", help="출력 PDF 경로")
|
||
parser.add_argument("--fps", type=float, default=DEFAULT_FPS)
|
||
parser.add_argument("--similarity", type=float, default=None)
|
||
parser.add_argument("--pattern", choices=["auto", "scroll", "overlay"],
|
||
default="auto")
|
||
parser.add_argument("--debug", action="store_true")
|
||
args = parser.parse_args()
|
||
|
||
output_dir = Path("output")
|
||
output_dir.mkdir(exist_ok=True)
|
||
debug_dir = None
|
||
if args.debug:
|
||
debug_dir = output_dir / "debug_frames"
|
||
debug_dir.mkdir(exist_ok=True)
|
||
|
||
video_path, safe_title = download_video(args.url, output_dir)
|
||
frames = extract_frames(video_path, fps=args.fps)
|
||
if not frames:
|
||
print("❌ 프레임 추출 실패")
|
||
sys.exit(1)
|
||
|
||
pattern = detect_pattern(frames) if args.pattern == "auto" else args.pattern
|
||
|
||
if pattern == "scroll":
|
||
sim = args.similarity if args.similarity else SIMILARITY_THRESHOLD
|
||
unique = extract_unique_scroll(frames, threshold=sim)
|
||
else:
|
||
sim = args.similarity if args.similarity else OVERLAY_SIMILARITY_THRESHOLD
|
||
unique = extract_unique_overlay(frames, threshold=sim)
|
||
|
||
if not unique:
|
||
print("❌ 고유 Tab 프레임 없음. --similarity를 낮추거나 --pattern을 수동 지정하세요.")
|
||
sys.exit(1)
|
||
|
||
pdf_path = Path(args.output) if args.output else output_dir / f"{safe_title}.pdf"
|
||
generate_pdf(unique, pdf_path, debug_dir=debug_dir)
|
||
generate_long_image(unique, pdf_path.with_suffix(".png"))
|
||
|
||
print(f"\n✅ 완료! PDF: {pdf_path}")
|
||
|
||
|
||
if __name__ == "__main__":
|
||
main()
|