fix(cv): resolve measure duplication by isolating playhead and enforcing 1D sliding correlations

This commit is contained in:
2026-03-29 00:06:38 +09:00
parent 64ecc12d35
commit cd159c2a99
5 changed files with 207 additions and 58 deletions

119
video_cv_tracker.py Normal file
View File

@@ -0,0 +1,119 @@
import cv2
import numpy as np
from typing import List, Tuple, Optional
class TemporalTracker:
def __init__(self, min_confidence: float = 0.15):
self.min_confidence = min_confidence
self.last_clean_frame = None
self.last_conf = 1.0
self.panorama = None
self.total_frames_processed = 0
self.in_transition = False
def _extract_tracking_channel(self, bgr: np.ndarray) -> np.ndarray:
return bgr[:, :, 0]
def _extract_print_channel(self, bgr: np.ndarray) -> np.ndarray:
# 가장 밝은 채널을 취해 유색(노랑/파랑) 플레이헤드를 흰색 배경으로 흡수
gray = np.max(bgr, axis=2)
# 120 이하의 순수 검은색 음표들만 Foreground(255)로 추출 (플레이헤드 완전 삭제)
_, binary = cv2.threshold(gray, 120, 255, cv2.THRESH_BINARY_INV)
return binary
def _calculate_pixel_shift(self, prev_img: np.ndarray, curr_img: np.ndarray) -> Tuple[int, float]:
h, w = prev_img.shape[:2]
# 플레이헤드가 방해하지 않도록 Print Channel(음표만 추출, 하이라이트 삭제) 사용!
prev_chan = self._extract_print_channel(prev_img)
curr_chan = self._extract_print_channel(curr_img)
# 템플릿: PREV 프레임의 우측 60~90% 영역
template_w = int(w * 0.3)
start_x = int(w * 0.6)
template = prev_chan[:, start_x:start_x + template_w]
res = cv2.matchTemplate(curr_chan, template, cv2.TM_CCOEFF_NORMED)
_, max_val, _, max_loc = cv2.minMaxLoc(res)
curr_x = max_loc[0]
scroll_dx = start_x - curr_x
if max_val < self.min_confidence or scroll_dx <= 0:
return 0, max_val
# 기타 스크롤 속도 물리적 한계: 2fps 기준 프레임당 최대 이동량
# 1280px(1페이지)가 지나가는데 보통 4~10초 소요. 0.5초당 이동량은 150px 미만.
# 이를 초과하는 엄청난 점프값(예: 500px)은 똑같이 생긴 '다른 마디'를 현재로 착각한 OpenCV의 치명적 오탐!
# 따라서 허용치를 넘어가는 가속도는 무조건 무시(dx=0)하여 마디 순서 꼬임을 원천 차단.
max_dx = w * 0.15
if scroll_dx > max_dx:
return 0, max_val
return scroll_dx, max_val
def process_frame(self, frame: np.ndarray) -> None:
self.total_frames_processed += 1
if self.panorama is None:
self.panorama = frame.copy()
self.last_clean_frame = frame.copy()
return
dx, conf = self._calculate_pixel_shift(self.last_clean_frame, frame)
# Scene cut 진입 조건
if (conf < 0.45) or (self.last_conf - conf > 0.3):
self.in_transition = True
# Transition 중이고 화면이 이제서야 완전히 안정화 (정지) 되었을 때 == 페이지 넘김이 "끝난" 직후
elif self.in_transition and conf > 0.85 and dx == 0:
self.in_transition = False
# 전환(Fade/Slide)이 완전히 끝난 맑은 프레임을 시각적으로 겹참하여 부착
if self.panorama is not None and self.panorama.shape[1] > 0:
h = self.panorama.shape[0]
new_page = cv2.resize(frame, (frame.shape[1], h))
head_w = min(500, new_page.shape[1])
head = self._extract_print_channel(new_page[:, 50:50+head_w])
search_w = min(head_w + 500, self.panorama.shape[1])
search_region = self._extract_print_channel(self.panorama[:, -search_w:])
if head.shape[1] > 0 and search_region.shape[1] >= head.shape[1]:
edge_search = cv2.Canny(cv2.GaussianBlur(search_region, (3,3), 0), 30, 100)
edge_head = cv2.Canny(cv2.GaussianBlur(head, (3,3), 0), 30, 100)
kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (1, 7))
edge_search = cv2.morphologyEx(edge_search, cv2.MORPH_OPEN, kernel)
edge_head = cv2.morphologyEx(edge_head, cv2.MORPH_OPEN, kernel)
if np.count_nonzero(edge_head) > 50:
res = cv2.matchTemplate(edge_search, edge_head, cv2.TM_CCOEFF_NORMED)
_, max_val, _, max_loc = cv2.minMaxLoc(res)
if max_val > 0.25:
overlap_px = search_w - max_loc[0] + 50
if overlap_px < new_page.shape[1] - 50:
self.panorama = np.hstack([self.panorama, new_page[:, overlap_px:]])
else:
self.panorama = np.hstack([self.panorama, new_page])
else:
self.panorama = np.hstack([self.panorama, new_page])
else:
self.panorama = np.hstack([self.panorama, new_page])
# 정상적인 스피드의 스크롤 처리 (트랜지션 쿨다운 중이 아닐 때만)
elif dx > 0 and dx < frame.shape[1] and not self.in_transition:
new_strip = frame[:, frame.shape[1] - dx:, :]
if new_strip.shape[0] != self.panorama.shape[0]:
new_strip = cv2.resize(new_strip, (dx, self.panorama.shape[0]))
self.panorama = np.hstack([self.panorama, new_strip])
self.last_conf = conf
self.last_clean_frame = frame.copy()
def get_final_panorama(self) -> Optional[np.ndarray]:
return self.panorama