feat(pipeline): 마디번호 기반 중복 검증 (OCR) 적용 및 1080p 720p fallback
This commit is contained in:
@@ -19,8 +19,63 @@ from typing import List, Tuple, Optional
|
||||
|
||||
import cv2
|
||||
import numpy as np
|
||||
import img2pdf
|
||||
from PIL import Image
|
||||
|
||||
_ocr_reader = None
|
||||
|
||||
def _get_ocr_reader():
|
||||
global _ocr_reader
|
||||
if _ocr_reader is None:
|
||||
print(" → EasyOCR 모델 로딩 중 (초회 1번)...")
|
||||
try:
|
||||
import easyocr
|
||||
_ocr_reader = easyocr.Reader(['en'])
|
||||
except ImportError:
|
||||
print(" [경고] easyocr 라이브러리가 없습니다. OCR 중복 검증을 건너뜁니다.")
|
||||
return None
|
||||
return _ocr_reader
|
||||
|
||||
def _dedup_by_measure_number(frames: List[np.ndarray]) -> List[np.ndarray]:
|
||||
"""OCR을 이용해 Tab 좌측 상단의 마디 번호를 읽고,
|
||||
연속으로 동일한 번호가 검출되면 중복으로 간주하고 제거합니다."""
|
||||
reader = _get_ocr_reader()
|
||||
if not reader:
|
||||
return frames
|
||||
|
||||
print(f" → 마디번호 기반 3차 중복 검증 시작 ({len(frames)} 프레임)")
|
||||
unique = []
|
||||
last_measure_num = None
|
||||
|
||||
for i, frame in enumerate(frames):
|
||||
h, w = frame.shape[:2]
|
||||
# 마디 번호는 극한의 좌측 상단 (높이 상위 25%, 너비 좌측 8%)에 위치
|
||||
crop = frame[:int(h * 0.25), :int(w * 0.08)]
|
||||
gray = cv2.cvtColor(crop, cv2.COLOR_BGR2GRAY) if len(crop.shape) == 3 else crop
|
||||
|
||||
results = reader.readtext(gray, allowlist='0123456789')
|
||||
|
||||
measure_num = None
|
||||
if results:
|
||||
# conf > 0.4 이면서 1~3자리의 숫자로만 이루어진 텍스트를 마디 번호로 간주 (프렛 번호 연속 인식 방지)
|
||||
valid_results = [res[1] for res in results if res[2] > 0.4 and res[1].isdigit() and len(res[1]) <= 3]
|
||||
if valid_results:
|
||||
measure_num = valid_results[0]
|
||||
|
||||
if measure_num is not None:
|
||||
if measure_num == last_measure_num:
|
||||
print(f" - 프레임 {i+1}: 마디번호 [{measure_num}] 중복 감지 (삭제)")
|
||||
continue
|
||||
last_measure_num = measure_num
|
||||
print(f" - 프레임 {i+1}: 마디번호 [{measure_num}] (유지)")
|
||||
else:
|
||||
print(f" - 프레임 {i+1}: 마디번호 미검출 (유지)")
|
||||
|
||||
unique.append(frame)
|
||||
|
||||
print(f" → OCR 3차: {len(unique)}개 고유 Tab 프레임")
|
||||
return unique
|
||||
|
||||
# Windows 콘솔 인코딩
|
||||
if sys.platform == "win32":
|
||||
sys.stdout.reconfigure(encoding="utf-8", errors="replace")
|
||||
@@ -82,12 +137,11 @@ def download_video(url: str, output_dir: Path) -> Tuple[Path, str]:
|
||||
print(f" → 이미 다운로드됨: {video_path.name}")
|
||||
return video_path, safe_title
|
||||
|
||||
# 1080p 우선, 720p 폴백, 최종 best
|
||||
# 720p 우선 (다운스케일링 부하 원천 차단)
|
||||
subprocess.run(
|
||||
[yt_dlp,
|
||||
"-f", "bestvideo[height>=1080][ext=mp4]+bestaudio[ext=m4a]/"
|
||||
"bestvideo[height>=720][ext=mp4]+bestaudio[ext=m4a]/"
|
||||
"best[height>=720]/best",
|
||||
"-f", "bestvideo[height<=720][ext=mp4]+bestaudio[ext=m4a]/"
|
||||
"best[height<=720]/best",
|
||||
"--merge-output-format", "mp4",
|
||||
"-o", str(video_path), url],
|
||||
encoding="utf-8", errors="replace", check=True
|
||||
@@ -127,6 +181,8 @@ def extract_frames(video_path: Path, fps: float = DEFAULT_FPS) -> List[np.ndarra
|
||||
if need_resize:
|
||||
frame = cv2.resize(frame, target_size, interpolation=cv2.INTER_AREA)
|
||||
frames.append(frame)
|
||||
if len(frames) % 50 == 0:
|
||||
print(f" ... {len(frames)}번째 프레임 추출 진행 중...", flush=True)
|
||||
idx += 1
|
||||
|
||||
cap.release()
|
||||
@@ -164,23 +220,28 @@ def _find_white_tab_strip(frame: np.ndarray, min_strip_ratio: float = 0.10) -> O
|
||||
row_tab_ratio = np.mean(tab_mask, axis=1)
|
||||
bright_mask = row_tab_ratio > 0.5 # 행의 50% 이상이 Tab-like
|
||||
|
||||
# 연속된 흰색 행 영역 찾기
|
||||
# 연속된 흰색 행 영역 찾기 (검은색 탭 라인 및 음표로 인한 끊김 허용)
|
||||
max_gap = int(h * 0.02) # 약 2% (720p 기준 14px)까지의 흰색 끊김은 같은 영역으로 간주
|
||||
regions = []
|
||||
start = None
|
||||
gap_count = 0
|
||||
for i in range(h):
|
||||
if bright_mask[i]:
|
||||
if start is None:
|
||||
start = i
|
||||
gap_count = 0
|
||||
else:
|
||||
if start is not None:
|
||||
length = i - start
|
||||
if length >= h * min_strip_ratio:
|
||||
regions.append((start, i))
|
||||
start = None
|
||||
gap_count += 1
|
||||
if gap_count > max_gap:
|
||||
length = (i - gap_count) - start
|
||||
if length >= h * min_strip_ratio:
|
||||
regions.append((start, i - gap_count))
|
||||
start = None
|
||||
if start is not None:
|
||||
length = h - start
|
||||
length = (h - gap_count) - start
|
||||
if length >= h * min_strip_ratio:
|
||||
regions.append((start, h))
|
||||
regions.append((start, h - gap_count))
|
||||
|
||||
if not regions:
|
||||
return None
|
||||
@@ -562,7 +623,9 @@ def extract_unique_scroll(frames: List[np.ndarray],
|
||||
if not _has_tab_content(tab_crop):
|
||||
continue
|
||||
|
||||
tab_crop = _trim_to_content(tab_crop)
|
||||
# 🚨 _trim_to_content를 각 프레임별로 적용하면 음표 높낮이에 따라 프레임 높이가 들쭉날쭉해짐.
|
||||
# 이후 스크롤 합성(stitch)에서 min_h로 잘리면서 악보가 다 잘려나가는(Crop) 치명적 원인이 됨!
|
||||
# tab_crop = _trim_to_content(tab_crop)
|
||||
|
||||
compare_img = cv2.resize(tab_crop, (480, 120), interpolation=cv2.INTER_AREA)
|
||||
|
||||
@@ -584,8 +647,12 @@ def extract_unique_scroll(frames: List[np.ndarray],
|
||||
print(f" → 파노라마: {len(candidates)}개 → {len(stitched)}개 (스크롤 합성)")
|
||||
|
||||
# ── Phase 3: pHash 2차 클러스터 중복 제거 ──
|
||||
unique = _dedup_by_hash(stitched, max_hamming=50)
|
||||
unique = _dedup_by_hash(stitched, max_hamming=20)
|
||||
print(f" → pHash 2차: {len(unique)}개 고유 Tab 프레임")
|
||||
|
||||
# ── Phase 4: 마디번호 기반 최종 중복 제거 (OCR) ──
|
||||
unique = _dedup_by_measure_number(unique)
|
||||
|
||||
return unique
|
||||
|
||||
|
||||
@@ -634,7 +701,11 @@ def extract_unique_overlay(frames: List[np.ndarray],
|
||||
unique.append(crop)
|
||||
all_normalized.append(canvas)
|
||||
|
||||
print(f" → {len(unique)}개 고유 Tab 오버레이")
|
||||
# ── Phase 2: 마디번호 기반 최종 중복 제거 (OCR) ──
|
||||
if unique:
|
||||
unique = _dedup_by_measure_number(unique)
|
||||
|
||||
print(f" → 최종: {len(unique)}개 고유 Tab 오버레이")
|
||||
return unique
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user