120 lines
5.6 KiB
Python
120 lines
5.6 KiB
Python
import cv2
|
|
import numpy as np
|
|
from typing import List, Tuple, Optional
|
|
|
|
class TemporalTracker:
|
|
def __init__(self, min_confidence: float = 0.15):
|
|
self.min_confidence = min_confidence
|
|
self.last_clean_frame = None
|
|
self.last_conf = 1.0
|
|
self.panorama = None
|
|
self.total_frames_processed = 0
|
|
self.in_transition = False
|
|
|
|
def _extract_tracking_channel(self, bgr: np.ndarray) -> np.ndarray:
|
|
return bgr[:, :, 0]
|
|
|
|
def _extract_print_channel(self, bgr: np.ndarray) -> np.ndarray:
|
|
# 가장 밝은 채널을 취해 유색(노랑/파랑) 플레이헤드를 흰색 배경으로 흡수
|
|
gray = np.max(bgr, axis=2)
|
|
# 120 이하의 순수 검은색 음표들만 Foreground(255)로 추출 (플레이헤드 완전 삭제)
|
|
_, binary = cv2.threshold(gray, 120, 255, cv2.THRESH_BINARY_INV)
|
|
return binary
|
|
|
|
def _calculate_pixel_shift(self, prev_img: np.ndarray, curr_img: np.ndarray) -> Tuple[int, float]:
|
|
h, w = prev_img.shape[:2]
|
|
|
|
# 플레이헤드가 방해하지 않도록 Print Channel(음표만 추출, 하이라이트 삭제) 사용!
|
|
prev_chan = self._extract_print_channel(prev_img)
|
|
curr_chan = self._extract_print_channel(curr_img)
|
|
|
|
# 템플릿: PREV 프레임의 우측 60~90% 영역
|
|
template_w = int(w * 0.3)
|
|
start_x = int(w * 0.6)
|
|
template = prev_chan[:, start_x:start_x + template_w]
|
|
|
|
res = cv2.matchTemplate(curr_chan, template, cv2.TM_CCOEFF_NORMED)
|
|
_, max_val, _, max_loc = cv2.minMaxLoc(res)
|
|
|
|
curr_x = max_loc[0]
|
|
scroll_dx = start_x - curr_x
|
|
|
|
if max_val < self.min_confidence or scroll_dx <= 0:
|
|
return 0, max_val
|
|
|
|
# 기타 스크롤 속도 물리적 한계: 2fps 기준 프레임당 최대 이동량
|
|
# 1280px(1페이지)가 지나가는데 보통 4~10초 소요. 0.5초당 이동량은 150px 미만.
|
|
# 이를 초과하는 엄청난 점프값(예: 500px)은 똑같이 생긴 '다른 마디'를 현재로 착각한 OpenCV의 치명적 오탐!
|
|
# 따라서 허용치를 넘어가는 가속도는 무조건 무시(dx=0)하여 마디 순서 꼬임을 원천 차단.
|
|
max_dx = w * 0.15
|
|
|
|
if scroll_dx > max_dx:
|
|
return 0, max_val
|
|
|
|
return scroll_dx, max_val
|
|
|
|
def process_frame(self, frame: np.ndarray) -> None:
|
|
self.total_frames_processed += 1
|
|
|
|
if self.panorama is None:
|
|
self.panorama = frame.copy()
|
|
self.last_clean_frame = frame.copy()
|
|
return
|
|
|
|
dx, conf = self._calculate_pixel_shift(self.last_clean_frame, frame)
|
|
|
|
# Scene cut 진입 조건
|
|
if (conf < 0.45) or (self.last_conf - conf > 0.3):
|
|
self.in_transition = True
|
|
|
|
# Transition 중이고 화면이 이제서야 완전히 안정화 (정지) 되었을 때 == 페이지 넘김이 "끝난" 직후
|
|
elif self.in_transition and conf > 0.85 and dx == 0:
|
|
self.in_transition = False
|
|
|
|
# 전환(Fade/Slide)이 완전히 끝난 맑은 프레임을 시각적으로 겹참하여 부착
|
|
if self.panorama is not None and self.panorama.shape[1] > 0:
|
|
h = self.panorama.shape[0]
|
|
new_page = cv2.resize(frame, (frame.shape[1], h))
|
|
|
|
head_w = min(500, new_page.shape[1])
|
|
head = self._extract_print_channel(new_page[:, 50:50+head_w])
|
|
|
|
search_w = min(head_w + 500, self.panorama.shape[1])
|
|
search_region = self._extract_print_channel(self.panorama[:, -search_w:])
|
|
|
|
if head.shape[1] > 0 and search_region.shape[1] >= head.shape[1]:
|
|
edge_search = cv2.Canny(cv2.GaussianBlur(search_region, (3,3), 0), 30, 100)
|
|
edge_head = cv2.Canny(cv2.GaussianBlur(head, (3,3), 0), 30, 100)
|
|
|
|
kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (1, 7))
|
|
edge_search = cv2.morphologyEx(edge_search, cv2.MORPH_OPEN, kernel)
|
|
edge_head = cv2.morphologyEx(edge_head, cv2.MORPH_OPEN, kernel)
|
|
|
|
if np.count_nonzero(edge_head) > 50:
|
|
res = cv2.matchTemplate(edge_search, edge_head, cv2.TM_CCOEFF_NORMED)
|
|
_, max_val, _, max_loc = cv2.minMaxLoc(res)
|
|
|
|
if max_val > 0.25:
|
|
overlap_px = search_w - max_loc[0] + 50
|
|
if overlap_px < new_page.shape[1] - 50:
|
|
self.panorama = np.hstack([self.panorama, new_page[:, overlap_px:]])
|
|
else:
|
|
self.panorama = np.hstack([self.panorama, new_page])
|
|
else:
|
|
self.panorama = np.hstack([self.panorama, new_page])
|
|
else:
|
|
self.panorama = np.hstack([self.panorama, new_page])
|
|
|
|
# 정상적인 스피드의 스크롤 처리 (트랜지션 쿨다운 중이 아닐 때만)
|
|
elif dx > 0 and dx < frame.shape[1] and not self.in_transition:
|
|
new_strip = frame[:, frame.shape[1] - dx:, :]
|
|
if new_strip.shape[0] != self.panorama.shape[0]:
|
|
new_strip = cv2.resize(new_strip, (dx, self.panorama.shape[0]))
|
|
self.panorama = np.hstack([self.panorama, new_strip])
|
|
|
|
self.last_conf = conf
|
|
self.last_clean_frame = frame.copy()
|
|
|
|
def get_final_panorama(self) -> Optional[np.ndarray]:
|
|
return self.panorama
|