#!/usr/bin/env python3 """ YouTube Tab → PDF 캡처 파이프라인 YouTube 기타 TAB 영상에서 Tab 프레임을 추출하여 깔끔한 A4 PDF 악보로 만듭니다. 사용법: python youtube_tab_to_pdf.py "https://youtu.be/VIDEO_ID" python youtube_tab_to_pdf.py "https://youtu.be/VIDEO_ID" -o output.pdf --debug """ import argparse import os import sys import subprocess import shutil import re from pathlib import Path from typing import List, Tuple, Optional import cv2 from video_cv_tracker import TemporalTracker import numpy as np import img2pdf from PIL import Image _ocr_reader = None def _get_ocr_reader(): global _ocr_reader if _ocr_reader is None: print(" → EasyOCR 모델 로딩 중 (초회 1번)...") try: import easyocr _ocr_reader = easyocr.Reader(['en']) except ImportError: print(" [경고] easyocr 라이브러리가 없습니다. OCR 중복 검증을 건너뜁니다.") return None return _ocr_reader def _dedup_by_measure_number(frames: List[np.ndarray]) -> List[np.ndarray]: """OCR을 이용해 Tab 좌측 상단의 마디 번호를 읽고, 연속으로 동일한 번호가 검출되면 중복으로 간주하고 제거합니다.""" reader = _get_ocr_reader() if not reader: return frames print(f" → 마디번호 기반 3차 중복 검증 시작 ({len(frames)} 프레임)") unique = [] last_measure_num = None for i, frame in enumerate(frames): h, w = frame.shape[:2] gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) if len(frame.shape) == 3 else frame # 동적 투영(Projection)을 통해 첫 번째 오선지(Staff line)의 Y좌표 스캔 _, thresh = cv2.threshold(gray, 200, 255, cv2.THRESH_BINARY_INV) row_sums = np.sum(thresh, axis=1) / 255 # 폭의 50% 이상을 차지하는 검은 가로선을 오선지로 간주 staff_lines = np.where(row_sums > w * 0.5)[0] if len(staff_lines) > 0: first_line_y = staff_lines[0] # 오선지 바로 위 영역 45px ~ 오선지 까지 (여유공간 2px) + 좌측 8% 너비만 추출 (기타 코드 다이어그램 제외) crop_y_start = max(0, first_line_y - 45) crop_y_end = max(10, first_line_y - 2) crop = gray[crop_y_start:crop_y_end, :int(w * 0.08)] else: # 안전 장치: 오선지를 못 찾았을 경우 기존 하드코딩 비율 사용 crop = gray[:int(h * 0.25), :int(w * 0.08)] # 작은 마디번호의 인식률 극대화를 위해 3배 업스케일링 및 이진화 처리 upscaled = cv2.resize(crop, (0, 0), fx=3.0, fy=3.0, interpolation=cv2.INTER_CUBIC) _, upscaled_thresh = cv2.threshold(upscaled, 150, 255, cv2.THRESH_BINARY_INV) results = reader.readtext(upscaled_thresh, allowlist='0123456789') measure_num = None if results: # conf > 0.4 이면서 1~3자리의 숫자로만 이루어진 텍스트를 마디 번호로 간주 valid_results = [res[1] for res in results if res[2] > 0.4 and res[1].isdigit() and len(res[1]) <= 3] if valid_results: measure_num = valid_results[0] if measure_num is not None: if measure_num == last_measure_num: print(f" - 프레임 {i+1}: 마디번호 [{measure_num}] 중복 감지 (삭제)") continue last_measure_num = measure_num print(f" - 프레임 {i+1}: 마디번호 [{measure_num}] (유지)") else: print(f" - 프레임 {i+1}: 마디번호 미검출 (유지)") unique.append(frame) print(f" → OCR 3차: {len(unique)}개 고유 Tab 프레임") return unique # Windows 콘솔 인코딩 if sys.platform == "win32": sys.stdout.reconfigure(encoding="utf-8", errors="replace") sys.stderr.reconfigure(encoding="utf-8", errors="replace") # ─── 설정 ───────────────────────────────────────────────────────────────── DEFAULT_FPS = 2 SIMILARITY_THRESHOLD = 0.95 OVERLAY_SIMILARITY_THRESHOLD = 0.55 OVERLAY_MIN_AREA_RATIO = 0.05 OVERLAY_MAX_AREA_RATIO = 0.6 MIN_TAB_LINES = 4 # 프레임 추출 시 최대 폭 (1080p→1280p 다운스케일로 메모리 세이브) MAX_FRAME_WIDTH = 1280 # 검출용 업스케일 폭 (360p→960px, 1.5x → Tab 라인 두꺼워짐) DETECT_WIDTH = 960 PDF_DPI = 150 PDF_PAGE_WIDTH_MM = 210 PDF_PAGE_HEIGHT_MM = 297 PDF_MARGIN_MM = 10 TAB_GAP_MM = 3 # ─── Step 1: 다운로드 ───────────────────────────────────────────────────── def _find_yt_dlp() -> str: yt_dlp = shutil.which("yt-dlp") if yt_dlp: return yt_dlp for pyver in ["Python312", "Python311", "Python310"]: p = Path(os.environ.get("APPDATA", "")) / "Python" / pyver / "Scripts" / "yt-dlp.exe" if p.exists(): return str(p) p = Path(sys.executable).parent / "Scripts" / "yt-dlp.exe" if p.exists(): return str(p) raise RuntimeError("yt-dlp를 찾을 수 없습니다. pip install yt-dlp") def download_video(url: str, output_dir: Path) -> Tuple[Path, str]: """영상 다운로드 (1080p 우선)""" print("[1/5] 영상 다운로드 중...") yt_dlp = _find_yt_dlp() result = subprocess.run( [yt_dlp, "--get-title", "--encoding", "utf-8", url], capture_output=True, encoding="utf-8", errors="replace" ) title = (result.stdout or "").strip() or "untitled" safe_title = re.sub(r'[\\/:*?"<>|\x00-\x1f]', '_', title)[:80] video_path = output_dir / f"{safe_title}.mp4" if video_path.exists(): print(f" → 이미 다운로드됨: {video_path.name}") return video_path, safe_title # 영상 추출 처리(CV)만 필요하므로, ffmpeg 병합이 불필요한 video-only 고화질 포맷(720p)을 직접 요청하여 360p 강등을 방지 subprocess.run( [yt_dlp, "-f", "bestvideo[ext=mp4]", "-S", "res:720", "-o", str(video_path), url], encoding="utf-8", errors="replace", check=True ) print(f" → 다운로드 완료: {video_path.name}") return video_path, safe_title # ─── Step 2: 프레임 추출 ────────────────────────────────────────────────── def extract_frames(video_path: Path, fps: float = DEFAULT_FPS) -> List[np.ndarray]: print(f"[2/5] 프레임 추출 중 (fps={fps})...") cap = cv2.VideoCapture(str(video_path)) if not cap.isOpened(): raise RuntimeError(f"영상을 열 수 없습니다: {video_path}") video_fps = cap.get(cv2.CAP_PROP_FPS) total = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) interval = max(1, int(video_fps / fps)) w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) # 4K 이상 → 1080p 다운스케일 (OOM 방지) need_resize = w > MAX_FRAME_WIDTH if need_resize: scale = MAX_FRAME_WIDTH / w target_size = (MAX_FRAME_WIDTH, int(h * scale)) print(f" → {w}x{h} → {target_size[0]}x{target_size[1]} 다운스케일") frames = [] idx = 0 while True: ret, frame = cap.read() if not ret: break if idx % interval == 0: if need_resize: frame = cv2.resize(frame, target_size, interpolation=cv2.INTER_AREA) frames.append(frame) if len(frames) % 50 == 0: print(f" ... {len(frames)}번째 프레임 추출 진행 중...", flush=True) idx += 1 cap.release() print(f" → {len(frames)}개 프레임 추출 ({w}x{h}, 원본 {video_fps:.0f}fps)") return frames # ─── 핵심: 흰색 배경 Tab 영역 검출 ─────────────────────────────────────── def _find_white_tab_strip(frame: np.ndarray, min_strip_ratio: float = 0.10, mode: str = "largest") -> Optional[Tuple[int, int]]: """프레임에서 흰색 배경의 Tab 스트립 영역의 Y범위(top, bottom)를 반환. mode="largest": 가장 큰 하나의 스트립만 반환 (연속 스크롤용) mode="union": 최상단 스트립부터 최하단 스트립까지 전체를 포괄하여 반환 (오버레이용 다중 줄 보존) """ h, w = frame.shape[:2] margin_x = int(w * 0.1) hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV) _, s_ch, v_ch = cv2.split(hsv) roi_v = v_ch[:, margin_x:w - margin_x] roi_s = s_ch[:, margin_x:w - margin_x] pure_white = (roi_v > 180) & (roi_s < 40) bright_pastel = (roi_v > 200) & (roi_s < 100) tab_mask = pure_white | bright_pastel row_tab_ratio = np.mean(tab_mask, axis=1) bright_mask = row_tab_ratio > 0.5 max_gap = int(h * 0.02) regions = [] start = None gap_count = 0 for i in range(h): if bright_mask[i]: if start is None: start = i gap_count = 0 else: if start is not None: gap_count += 1 if gap_count > max_gap: length = (i - gap_count) - start if length >= h * min_strip_ratio: regions.append((start, i - gap_count)) start = None if start is not None: length = (h - gap_count) - start if length >= h * min_strip_ratio: regions.append((start, h - gap_count)) if not regions: return None pad_top = int(h * 0.15) pad_bottom = int(h * 0.03) if mode == "union": top = max(0, min(r[0] for r in regions) - pad_top) bottom = min(h, max(r[1] for r in regions) + pad_bottom) return (top, bottom) # largest best = max(regions, key=lambda r: r[1] - r[0]) top = max(0, best[0] - pad_top) bottom = min(h, best[1] + pad_bottom) return (top, bottom) def _trim_to_content(crop: np.ndarray, margin_px: int = 6) -> np.ndarray: """넓게 크롭된 Tab 이미지에서 Tab 콘텐츠 영역만 정밀 트림. 전략: HSV 기반으로 각 행의 '흰색 배경 비율'을 계산. - Tab 영역: 30~95%가 흰색 (흰 배경 + Tab 라인/숫자) - 기타 영상: 흰색 < 20% (어두운 배경) - 순수 여백: 흰색 > 97% 이를 통해 상/하단의 기타 영상과 빈 여백 모두 제거.""" h, w = crop.shape[:2] if h < 15 or w < 50: return crop hsv = cv2.cvtColor(crop, cv2.COLOR_BGR2HSV) _, s_ch, v_ch = cv2.split(hsv) # 흰색/밝은 파스텔 픽셀 비율 (Tab 배경 감지) white_mask = ((v_ch > 180) & (s_ch < 40)) | ((v_ch > 200) & (s_ch < 100)) row_white = np.mean(white_mask, axis=1) # Tab 행 = 흰색 비율 30~97% (라인/숫자 + 흰 배경) tab_rows = (row_white > 0.30) & (row_white < 0.97) # 콘텐츠 존재 확인 (어두운 픽셀 > 0.2%) - 마디번호 같이 아주 작은 숫자도 보존하기 위해 스레스홀드 극단적 하향 gray = cv2.cvtColor(crop, cv2.COLOR_BGR2GRAY) if len(crop.shape) == 3 else crop row_dark = np.mean(gray < 180, axis=1) content_rows = row_dark > 0.002 # Tab 행 OR 콘텐츠 행 valid_rows = tab_rows | content_rows # 상단: 첫 번째 유효 행 top = 0 for i in range(h): if valid_rows[i] and row_white[i] > 0.20: top = max(0, i - 120) # 상단 마디번호 보존을 위해 압도적인 120px 강제 보호 (숫자가 꽤 높이 떠있음) break # 하단: 마지막 유효 행 bottom = h for i in range(h - 1, -1, -1): if valid_rows[i] and row_white[i] > 0.20: bottom = min(h, i + margin_px) break if bottom - top < 15: return crop return crop[top:bottom, :] def _has_tab_content(region: np.ndarray) -> bool: """흰색 영역 내에 실제 Tab 내용이 있는지 검증. 방법: 흰색 배경 위의 어두운 픽셀(Tab 라인, 숫자, 코드명) 비율을 확인. Tab 영역은 일반적으로 3~25%의 어두운 콘텐츠를 포함.""" if region is None or region.size == 0: return False gray = cv2.cvtColor(region, cv2.COLOR_BGR2GRAY) if len(region.shape) == 3 else region h, w = gray.shape if h < 15 or w < 50: return False # 어두운 픽셀 비율 (< 180 = 라인/숫자/코드 등) dark_pixels = np.sum(gray < 180) dark_ratio = dark_pixels / gray.size # Tab 영역: 3~25%가 어두운 콘텐츠 (순수 흰 배경이면 < 1%, 기타 영상이면 > 30%) return 0.02 < dark_ratio < 0.30 # ─── Step 3: 패턴 감지 ──────────────────────────────────────────────────── def _detect_tab_overlay(frame: np.ndarray) -> Optional[Tuple[int, int, int, int]]: """Tab을 포함한 흰색 오버레이 박스 검출""" gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) h, w = gray.shape _, thresh = cv2.threshold(gray, 220, 255, cv2.THRESH_BINARY) kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (15, 15)) closed = cv2.morphologyEx(thresh, cv2.MORPH_CLOSE, kernel) contours, _ = cv2.findContours(closed, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) total_area = h * w best = None best_area = 0 for cnt in contours: x, y, cw, ch = cv2.boundingRect(cnt) area = cw * ch ratio = area / total_area # 오버레이 = 프레임 폭의 85% 미만인 독립 박스 (전폭 스트립은 scroll) width_ratio = cw / w if (OVERLAY_MIN_AREA_RATIO < ratio < OVERLAY_MAX_AREA_RATIO and width_ratio < 0.85 and cw > ch * 0.5 and area > best_area): # Tab 내용 검증 region = frame[y:y + ch, x:x + cw] if _has_tab_content(region): best = (x, y, cw, ch) best_area = area return best def detect_pattern(frames: List[np.ndarray], sample_count: int = 15) -> str: print("[3/5] 영상 패턴 정밀 분석 중 (Motion Tracking)...") if len(frames) < 30: return "scroll" scroll_votes = 0 overlay_votes = 0 tab_bounds = None for f in frames[::30]: bounds = _find_white_tab_strip(f, mode="largest") if bounds: tab_bounds = bounds break if tab_bounds: top, bottom = tab_bounds else: top, bottom = int(frames[0].shape[0]*0.2), int(frames[0].shape[0]*0.8) # Default step = max(1, len(frames) // sample_count) for i in range(2, len(frames)-1, step): f1 = frames[i] f2 = frames[i+1] h, w = f1.shape[:2] # 악보 영역 내부에서 높이의 중앙부분(잡음이 적은 곳)만 사용 crop_h = bottom - top safe_top = int(top + crop_h * 0.2) safe_bottom = int(top + crop_h * 0.8) crop1 = f1[safe_top:safe_bottom, :] crop2 = f2[safe_top:safe_bottom, :] g1 = _extract_tracking_channel(crop1) g2 = _extract_tracking_channel(crop2) template_w = int(w * 0.5) template = g1[:, w - template_w:] res = cv2.matchTemplate(g2, template, cv2.TM_CCOEFF_NORMED) _, max_val, _, max_loc = cv2.minMaxLoc(res) scroll_px = (w - template_w) - max_loc[0] # 강한 매칭이면서 스크롤이 없으면 정지된 페이지(overlay) if max_val > 0.90 and scroll_px <= 1: overlay_votes += 1 # 의미있는 매칭이면서 확연한 스크롤이 보이면 연속 스크롤(scroll) elif max_val > 0.10 and scroll_px > 1: scroll_votes += 1 else: overlay_votes += 1 pattern = "scroll" if scroll_votes > overlay_votes else "overlay" print(f" → 판단 패턴: {pattern} (Scroll:{scroll_votes}, Overlay/Static:{overlay_votes})") return pattern # ─── Step 4: 고유 Tab 프레임 추출 ───────────────────────────────────────── def compare_frames(frame1: np.ndarray, frame2: np.ndarray) -> float: """MSE 기반 유사도 (0~1, 1=동일)""" g1 = cv2.cvtColor(frame1, cv2.COLOR_BGR2GRAY) if len(frame1.shape) == 3 else frame1 g2 = cv2.cvtColor(frame2, cv2.COLOR_BGR2GRAY) if len(frame2.shape) == 3 else frame2 if g1.shape != g2.shape: g2 = cv2.resize(g2, (g1.shape[1], g1.shape[0])) target_w = 480 if g1.shape[1] > target_w: scale = target_w / g1.shape[1] sz = (target_w, int(g1.shape[0] * scale)) g1 = cv2.resize(g1, sz) g2 = cv2.resize(g2, sz) mse = np.mean(((g1.astype(np.float32) - g2.astype(np.float32)) / 255.0) ** 2) return max(0.0, 1.0 - min(mse * 8.0, 1.0)) def _dhash(image: np.ndarray, hash_size: int = 32) -> np.ndarray: """Difference Hash — 구조 기반 해시 (32×32 = 1024비트). 인접 픽셀의 밝기 차이를 기록하여 위치 이동에 강건한 fingerprint 생성. 16→32 확대로 마디번호/음표 위치까지 구분 가능.""" gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) if len(image.shape) == 3 else image resized = cv2.resize(gray, (hash_size + 1, hash_size), interpolation=cv2.INTER_AREA) return (resized[:, 1:] > resized[:, :-1]).flatten() def _dedup_by_hash(frames: List[np.ndarray], max_hamming: int = 20) -> List[np.ndarray]: """pHash 기반 클러스터 중복 제거. 유사 프레임을 그룹핑하고, 각 그룹에서 가장 선명한(Laplacian 분산 최대) 1장만 선택. → 스크롤 중복 + 반복 연습 구간 모두 제거.""" if not frames: return [] hashes = [_dhash(f) for f in frames] n = len(frames) used = [False] * n clusters = [] for i in range(n): if used[i]: continue cluster = [i] used[i] = True for j in range(i + 1, n): if used[j]: continue dist = int(np.sum(hashes[i] != hashes[j])) if dist <= max_hamming: cluster.append(j) used[j] = True clusters.append(cluster) # 각 클러스터에서 최고 선명도 프레임 선택 result = [] for cluster in clusters: best_idx = max(cluster, key=lambda idx: cv2.Laplacian( cv2.cvtColor(frames[idx], cv2.COLOR_BGR2GRAY) if len(frames[idx].shape) == 3 else frames[idx], cv2.CV_64F).var()) result.append(frames[best_idx]) return result def _extract_print_channel(frame: np.ndarray) -> np.ndarray: """PDF 출력용 채널 (Red 채널): 노란색을 투명(White)하게 만듦""" if len(frame.shape) != 3: return frame return frame[:, :, 2] def _extract_tracking_channel(frame: np.ndarray) -> np.ndarray: """트래킹 전용 채널: 유색 커서(빨강, 노랑 등) 및 배경 노이즈를 완벽히 투명화하고, 오직 순수한 검은색 음표와 오선지만을 마스킹하여 추출""" if len(frame.shape) != 3: return frame # B, G, R 모두 120 미만인 어두운 픽셀(순수 블랙 및 진회색)만 True로 마스킹 # 빨간색(0, 0, 255)이나 노란색(0, 255, 255)은 R이나 G가 255이므로 완벽하게 걸러짐 black_mask = (frame[:,:,0] < 120) & (frame[:,:,1] < 120) & (frame[:,:,2] < 120) # 흰 배경 위에 검은 음표만 그리기 (바이너리 이미지와 동일한 효과) img = np.full_like(frame[:,:,0], 255) img[black_mask] = 0 # OpenCV matchTemplate은 밝기 기준 매칭을 하므로, 이미지 전체를 반전시킬 필요 없이 # 이대로 넘기면 흰 바탕의 검은색 패턴 매칭이 정확히 일어남 return img def _detect_scroll_offset(frame_a: np.ndarray, frame_b: np.ndarray, min_confidence: float = 0.1) -> Tuple[int, float]: """이전 프레임(A)과 현재 프레임(B) 사이의 X축 이동량(Scroll)을 추정합니다.""" h, w = frame_a.shape[:2] gb = _extract_tracking_channel(frame_b) ga = _extract_tracking_channel(frame_a) template_w = int(w * 0.5) template = ga[:, w - template_w:] result = cv2.matchTemplate(gb, template, cv2.TM_CCOEFF_NORMED) _, max_val, _, max_loc = cv2.minMaxLoc(result) scroll_px = (w - template_w) - max_loc[0] if max_val < min_confidence or scroll_px <= 0: return (0, max_val) return (scroll_px, max_val) def _detect_measure_bars(gray_pano: np.ndarray) -> List[int]: """오직 기타 6현의 영역만 계산하여 세로로 쫙 채워진 마디 선(|)의 X좌표만 정밀하게 반환합니다.""" _, thresh = cv2.threshold(gray_pano, 200, 255, cv2.THRESH_BINARY_INV) h, w = thresh.shape row_sums = np.sum(thresh, axis=1) / 255 staff_rows = np.where(row_sums > w * 0.5)[0] if len(staff_rows) < 2: return [] top_line = staff_rows[0] bottom_line = top_line for r in staff_rows: if r - top_line > 100: break bottom_line = r staff_region = thresh[top_line:bottom_line+1, :] expected_h = bottom_line - top_line + 1 if expected_h < 10: return [] col_sums = np.sum(staff_region, axis=0) / 255 bar_cols = np.where(col_sums >= expected_h * 0.8)[0] measures = [] curr = [] for c in bar_cols: if not curr: curr.append(c) else: # [BUG3 FIX] 클러스터 허용폭 10→30px (마디선은 보통 2~5px 폭 클러스터) if c - curr[-1] < 30: curr.append(c) else: measures.append(int(np.mean(curr))) curr = [c] if curr: measures.append(int(np.mean(curr))) # [BUG3 FIX] 100px 미만 간격 마디선 제거 (음표 기둥 오탐 방지) measures = [x for i, x in enumerate(measures) if i == 0 or x - measures[i-1] >= 100] return measures def _stamp_measure_number(measure_bgr: np.ndarray, num: int) -> np.ndarray: """마디 이미지 좌측 상단의 빈 공간에 자동으로 순차 진행 마디번호를 파란색 도장(Stamp)으로 찍습니다.""" text = f"[{num}]" font = cv2.FONT_HERSHEY_SIMPLEX font_scale = 0.7 thickness = 2 color = (200, 0, 0) cv2.putText(measure_bgr, text, (15, 30), font, font_scale, color, thickness, cv2.LINE_AA) return measure_bgr def _stitch_scroll_segment(segment: List[np.ndarray]) -> np.ndarray: if len(segment) == 1: return segment[0] min_h = min(f.shape[0] for f in segment) panorama = segment[0][:min_h, :] for i in range(1, len(segment)): curr = segment[i][:min_h, :] scroll_px, conf = _detect_scroll_offset(segment[i-1][:min_h, :], curr, min_confidence=0.1) if scroll_px > 0 and conf > 0.15: new_strip = curr[:, curr.shape[1] - scroll_px:] panorama = np.hstack([panorama, new_strip]) else: panorama = np.hstack([panorama, curr]) return panorama def _merge_scroll_candidates(candidates: List[np.ndarray], min_scroll: int = 5, min_segment_len: int = 2) -> List[np.ndarray]: if len(candidates) <= 1: return candidates result = [] current_segment = [candidates[0]] prev_s_px = 0 prev_conf = 1.0 for i in range(1, len(candidates)): prev_frame = candidates[i-1] curr_frame = candidates[i] s_px, conf = _detect_scroll_offset(prev_frame, curr_frame, min_confidence=0.1) # [BUG1 FIX] 씬 전환 조건: conf 기반만 사용 # abs(s_px - prev_s_px) > 100 제거 — 스크롤 가속도를 씬전환으로 오탐하던 원인 is_cut = (conf <= 0.15) or (prev_conf - conf > 0.4) if not is_cut: current_segment.append(curr_frame) else: if len(current_segment) >= min_segment_len: result.append(_stitch_scroll_segment(current_segment)) else: result.extend(current_segment) current_segment = [curr_frame] prev_s_px = s_px prev_conf = conf if len(current_segment) >= min_segment_len: result.append(_stitch_scroll_segment(current_segment)) else: result.extend(current_segment) return result def _is_rewind_duplicate(query_bgr: np.ndarray, history_pano: np.ndarray) -> bool: if history_pano is None: return False h_gray = _extract_tracking_channel(history_pano) qw = min(800, query_bgr.shape[1]) q_gray = _extract_tracking_channel(query_bgr[:, :qw]) if h_gray.shape[0] != q_gray.shape[0] or h_gray.shape[1] < q_gray.shape[1]: return False res = cv2.matchTemplate(h_gray, q_gray, cv2.TM_CCOEFF_NORMED) _, max_val, _, max_loc = cv2.minMaxLoc(res) if max_val < 0.85: return False match_x = max_loc[0] # HEURISTIC 1: Is it just consecutive stitching from the immediate past? # If it matched the very end of history (< 2500 pixels from the end), it's just normal scroll overlap! if history_pano.shape[1] - match_x < 2500: return False # HEURISTIC 2: It matched deep in the past! It might be a rewind, OR it might be an identical Chorus. # We must check the measure number to differentiate identical Chorus vs exact D.S. al Coda rewind. qw_img = query_bgr[:, :qw] gray_for_staff = cv2.cvtColor(qw_img, cv2.COLOR_BGR2GRAY) if len(qw_img.shape) == 3 else qw_img _, bin_inv = cv2.threshold(gray_for_staff, 200, 255, cv2.THRESH_BINARY_INV) row_sums = np.sum(bin_inv, axis=1) / 255.0 staff_rows = np.where(row_sums > qw * 0.4)[0] if len(staff_rows) < 2: return False staff_top = staff_rows[0] box_y1 = max(0, staff_top - 60) box_y2 = staff_top + 10 box_x2 = min(250, query_bgr.shape[1], history_pano.shape[1] - match_x) q_num = query_bgr[box_y1:box_y2, 0:box_x2] h_num = history_pano[box_y1:box_y2, match_x:match_x+box_x2] if q_num.shape != h_num.shape or q_num.size == 0: return False diff = cv2.absdiff(cv2.cvtColor(q_num, cv2.COLOR_BGR2GRAY), cv2.cvtColor(h_num, cv2.COLOR_BGR2GRAY)) mse = np.mean(diff ** 2) if mse < 300.0: return True return False def merge_panoramas_list(panoramas): if not panoramas: return [] merged_list = [] current_master = panoramas[0].copy() history_pano = current_master.copy() rewind_state = False for i in range(1, len(panoramas)): next_pano = panoramas[i].copy() if _is_rewind_duplicate(next_pano, history_pano): print(" [Rewind Filter] D.S. al Coda or Backward Jump detected. Dropping redundant chronological playback.") rewind_state = True continue if rewind_state: print(" [Rewind Filter] Returning from rewind jump! Searching for novelty.") merged_list.append(current_master) current_master = next_pano if current_master.shape[0] == history_pano.shape[0]: history_pano = np.hstack([history_pano, next_pano]) rewind_state = False continue head_w = min(800, next_pano.shape[1]) head = next_pano[:, :head_w] search_w = min(1500, current_master.shape[1]) search_region = current_master[:, -search_w:] h_gray = _extract_tracking_channel(head) s_gray = _extract_tracking_channel(search_region) matched = False if h_gray.shape[1] <= s_gray.shape[1] and h_gray.shape[0] == s_gray.shape[0]: res = cv2.matchTemplate(s_gray, h_gray, cv2.TM_CCOEFF_NORMED) _, max_val, _, max_loc = cv2.minMaxLoc(res) if max_val > 0.50: match_x_in_search = max_loc[0] absolute_match_x = current_master.shape[1] - search_w + match_x_in_search next_start_idx = current_master.shape[1] - absolute_match_x if next_start_idx < next_pano.shape[1]: append_part = next_pano[:, next_start_idx:] if append_part.shape[1] > 0: current_master = np.hstack([current_master, append_part]) if current_master.shape[0] == history_pano.shape[0]: history_pano = np.hstack([history_pano, append_part]) matched = True if not matched: merged_list.append(current_master) current_master = next_pano if current_master.shape[0] == history_pano.shape[0]: history_pano = np.hstack([history_pano, next_pano]) merged_list.append(current_master) return merged_list def _find_all_measure_bars_standalone(img_bgr: np.ndarray, max_width: int) -> List[int]: cw = min(img_bgr.shape[1], max_width) img_gray = cv2.cvtColor(img_bgr[:, :cw], cv2.COLOR_BGR2GRAY) if len(img_bgr.shape) == 3 else img_bgr _, bin_inv = cv2.threshold(img_gray, 200, 255, cv2.THRESH_BINARY_INV) row_sums = np.sum(bin_inv, axis=1) / 255.0 staff_rows = np.where(row_sums > cw * 0.4)[0] if len(staff_rows) >= 6: staff_y_top, staff_y_bottom = staff_rows[0], staff_rows[-1] else: staff_y_top, staff_y_bottom = int(img_bgr.shape[0] * 0.3), int(img_bgr.shape[0] * 0.8) expected_h = max(10, staff_y_bottom - staff_y_top + 1) staff_region = bin_inv[staff_y_top:staff_y_bottom+1, :] col_sums = np.sum(staff_region, axis=0) / 255.0 bar_xs = np.where(col_sums >= expected_h * 0.6)[0] grouped_bars = [] if len(bar_xs) > 0: c = [bar_xs[0]] for x in bar_xs[1:]: if x - c[-1] <= 15: c.append(x) else: grouped_bars.append(int(np.mean(c))) c = [x] grouped_bars.append(int(np.mean(c))) unique_bars = [] for p in grouped_bars: if not unique_bars or p - unique_bars[-1] >= 50: unique_bars.append(p) return unique_bars def tile_panoramas_to_a4(panoramas: List[np.ndarray], chunk_width: int=1800) -> List[np.ndarray]: if not panoramas: return [] panorama = np.hstack(panoramas) if len(panoramas) > 1 else panoramas[0] rows = [] x_curr = 0 total_w = panorama.shape[1] while x_curr < total_w: remaining_w = total_w - x_curr if remaining_w <= chunk_width: r = panorama[:, x_curr:] if r.shape[1] > 50: r_padded = cv2.copyMakeBorder(r, 0, 0, 0, chunk_width - r.shape[1], cv2.BORDER_CONSTANT, value=[255,255,255]) rows.append(r_padded) break slice_bgr = panorama[:, x_curr : min(x_curr + chunk_width + 100, total_w)] bars = _find_all_measure_bars_standalone(slice_bgr, slice_bgr.shape[1]) valid_bars = [b for b in bars if 50 < b < chunk_width - 15] cut_offset = (valid_bars[-1] - 10) if valid_bars else chunk_width r = panorama[:, x_curr : x_curr + cut_offset] r_padded = cv2.copyMakeBorder(r, 0, 0, 0, chunk_width - r.shape[1], cv2.BORDER_CONSTANT, value=[255,255,255]) rows.append(r_padded) x_curr += cut_offset return rows def extract_unique_scroll(frames: List[np.ndarray], scan_dist: int = 4) -> List[np.ndarray]: from video_cv_tracker import TemporalTracker print("[Pipeline] Isolating static structures via TemporalTracker") tracker = TemporalTracker(diff_threshold=0.05) tab_bounds = None for f in frames[::30]: bounds = _find_white_tab_strip(f) if bounds: tab_bounds = bounds break if tab_bounds: top, bottom = tab_bounds print(f" -> Found precise sheet music bounds: Y={top} to Y={bottom}") else: top, bottom = 0, frames[0].shape[0] for frame in frames: roi = frame[top:bottom, :] tracker.process_frame(roi) unique_pages = tracker.get_unique_pages() print(f"[Pipeline] Reduced down to {len(unique_pages)} static structural median pages.") print(" -> 점프 컷 및 도돌이표 처리 중...") panoramas = merge_panoramas_list(unique_pages) print(" -> A4 타일링 포맷팅 중...") return tile_panoramas_to_a4(panoramas, chunk_width=1800) def extract_unique_overlay(frames: List[np.ndarray], threshold: float = OVERLAY_SIMILARITY_THRESHOLD) -> List[np.ndarray]: """오버레이형: TemporalTracker 기반의 고해상도 페이지(단일 스트립 크롭) 추출 및 정밀 픽셀 중복 필터""" from video_cv_tracker import TemporalTracker print("[4/5] 정지형(Overlay) Tab 트래킹 및 고해상도 추출 중...") tab_bounds = None for f in frames[::30]: bounds = _find_white_tab_strip(f, mode="largest") if bounds: tab_bounds = bounds break if tab_bounds: top, bottom = tab_bounds print(f" -> Found precise sheet music bounds: Y={top} to Y={bottom}") else: top, bottom = int(frames[0].shape[0]*0.2), int(frames[0].shape[0]*0.8) # BGR2GRAY 대신 True Black 채널을 사용하므로, 붉은색 재생커서 이동을 완벽히 무시합니다. # 따라서 악보가 물리적으로 넘어갈 때 발생하는 픽셀 변화(음표 교체)만 감지하게 되므로, 임계값을 0.03(3%)으로 극도로 낮춰 정밀도를 높입니다. tracker = TemporalTracker(diff_threshold=0.03) for frame in frames: if top is not None and bottom is not None: roi = frame[top:bottom, :] else: roi = frame roi_tracking = _extract_tracking_channel(roi) # 상하단 20%는 악보 밖이므로(예: 연주자 머리카락, 천장 등) 변화 감지에서 완전히 배제 h_r = roi_tracking.shape[0] s_top = int(h_r * 0.20) s_bot = int(h_r * 0.80) roi_tracking[:s_top, :] = 255 roi_tracking[s_bot:, :] = 255 tracker.process_frame(roi, tracking_channel=roi_tracking) pages = tracker.get_unique_pages() print(f"[Tracker] {len(pages)}개의 최초 구분 페이지 추출됨. 전역 중복 페이지 병합 심사 중...") unique = [] for crop in pages: if np.mean(cv2.cvtColor(crop, cv2.COLOR_BGR2GRAY)) < 80: continue is_dup = False crop_gray = _extract_tracking_channel(crop) h_c, w_c = crop_gray.shape crop_gray[:int(h_c * 0.20), :] = 255 crop_gray[int(h_c * 0.80):, :] = 255 for past_crop in unique: past_gray = _extract_tracking_channel(past_crop) past_gray[:int(h_c * 0.20), :] = 255 past_gray[int(h_c * 0.80):, :] = 255 # 약간의 위치 이동(+/- 10픽셀)을 탐색하기 위해 템플릿 사이즈를 줄임 template = crop_gray[10:h_c-10, 10:w_c-10] res = cv2.matchTemplate(past_gray, template, cv2.TM_CCOEFF_NORMED) _, max_val, _, _ = cv2.minMaxLoc(res) # 90% 이상의 강한 상관계수를 가지면 인간의 눈에는 완벽히 똑같은 악보(도돌이표)임. if max_val > 0.90: is_dup = True break if not is_dup: unique.append(crop) print(f" → 임시: {len(unique)}개 고유 오버레이 페이지 추출 성공. 상하단 여백 및 제목 정리 중...") trimmed_unique = [] for crop in unique: gray = cv2.cvtColor(crop, cv2.COLOR_BGR2GRAY) _, thresh = cv2.threshold(gray, 200, 255, cv2.THRESH_BINARY_INV) row_sums = np.sum(thresh, axis=1) / 255.0 # 폭의 40% 이상 차지하는 검은 오선지 영역만 찾음 (정적 제목 등 배제) h_c, w_c = crop.shape[:2] staff_rows = np.where(row_sums > w_c * 0.4)[0] if len(staff_rows) > 0: # 상단 여백 60px (코드, 기호 등), 하단 여백 30px top_y = max(0, staff_rows[0] - 60) bottom_y = min(h_c, staff_rows[-1] + 30) trimmed_unique.append(crop[top_y:bottom_y, :]) else: trimmed_unique.append(crop) print(f" → 최종: {len(trimmed_unique)}개 정제된 오버레이 페이지 추출 성공") return trimmed_unique # ─── Step 5: A4 PDF 생성 ───────────────────────────────────────────────── def generate_pdf(frames: List[np.ndarray], output_path: Path, debug_dir: Optional[Path] = None) -> None: """Tab 프레임들을 A4 페이지에 여러 행으로 배치""" print("[5/5] A4 PDF 생성 중...") if not frames: print(" ⚠ 프레임 없음!") return page_w = int(PDF_PAGE_HEIGHT_MM / 25.4 * PDF_DPI) # Landscape width page_h = int(PDF_PAGE_WIDTH_MM / 25.4 * PDF_DPI) # Landscape height margin = int(PDF_MARGIN_MM / 25.4 * PDF_DPI) gap = int(TAB_GAP_MM / 25.4 * PDF_DPI) content_w = page_w - 2 * margin resized = [] for i, frame in enumerate(frames): rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) img = Image.fromarray(rgb) if debug_dir: img.save(debug_dir / f"frame_{i:04d}.png") scale = content_w / img.width img_r = img.resize((content_w, int(img.height * scale)), Image.LANCZOS) resized.append(img_r) pages = [] cur_y = margin page = Image.new('RGB', (page_w, page_h), (255, 255, 255)) for img in resized: if cur_y + img.height > page_h - margin: pages.append(page) page = Image.new('RGB', (page_w, page_h), (255, 255, 255)) cur_y = margin page.paste(img, (margin, cur_y)) cur_y += img.height + gap if cur_y > margin + gap: pages.append(page) if not pages: return pages[0].save(str(output_path), save_all=True, append_images=pages[1:], resolution=PDF_DPI) print(f" → PDF: {len(resized)} Tab → {len(pages)} 페이지, {output_path.stat().st_size // 1024} KB") def generate_long_image(chunks: List[np.ndarray], output_path: str): if not chunks: return print(f"DEBUG: First chunk shape = {chunks[0].shape}, dtype = {chunks[0].dtype}") # Calculate exact total height required total_h = sum(chunk.shape[0] for chunk in chunks) max_w = max(chunk.shape[1] for chunk in chunks) # Ensure correct channel dimensions for the canvas to prevent squishing! if len(chunks[0].shape) == 3: canvas = np.full((total_h, max_w, 3), 255, dtype=np.uint8) else: canvas = np.full((total_h, max_w), 255, dtype=np.uint8) y_offset = 0 for chunk in chunks: h, w = chunk.shape[:2] if len(chunk.shape) == 3 and len(canvas.shape) == 2: canvas[y_offset:y_offset+h, :w] = cv2.cvtColor(chunk, cv2.COLOR_BGR2GRAY) elif len(chunk.shape) == 2 and len(canvas.shape) == 3: canvas[y_offset:y_offset+h, :w] = cv2.cvtColor(chunk, cv2.COLOR_GRAY2BGR) else: canvas[y_offset:y_offset+h, :w] = chunk y_offset += h cv2.imwrite(str(output_path), canvas) # ─── Main ───────────────────────────────────────────────────────────────── def main(): parser = argparse.ArgumentParser(description="YouTube TAB → A4 PDF") parser.add_argument("url", help="YouTube URL") parser.add_argument("-o", "--output", help="출력 PDF 경로") parser.add_argument("--fps", type=float, default=DEFAULT_FPS) parser.add_argument("--similarity", type=float, default=None) parser.add_argument("--pattern", choices=["auto", "scroll", "overlay"], default="auto") parser.add_argument("--debug", action="store_true") args = parser.parse_args() output_dir = Path("output") output_dir.mkdir(exist_ok=True) debug_dir = None if args.debug: debug_dir = output_dir / "debug_frames" debug_dir.mkdir(exist_ok=True) video_path, safe_title = download_video(args.url, output_dir) frames = extract_frames(video_path, fps=args.fps) if not frames: print("❌ 프레임 추출 실패") sys.exit(1) pattern = detect_pattern(frames) if args.pattern == "auto" else args.pattern if pattern == "scroll": sim = args.similarity if args.similarity else SIMILARITY_THRESHOLD unique = extract_unique_scroll(frames, threshold=sim) else: sim = args.similarity if args.similarity else OVERLAY_SIMILARITY_THRESHOLD unique = extract_unique_overlay(frames, threshold=sim) if not unique: print("❌ 고유 Tab 프레임 없음. --similarity를 낮추거나 --pattern을 수동 지정하세요.") sys.exit(1) pdf_path = Path(args.output) if args.output else output_dir / f"{safe_title}.pdf" generate_pdf(unique, pdf_path, debug_dir=debug_dir) generate_long_image(unique, pdf_path.with_suffix(".png")) print(f"\n✅ 완료! PDF: {pdf_path}") if __name__ == "__main__": main()