feat(core): implement Object-Oriented Measure Extraction, Red/Blue dual-channel tracking physics, and top-margin decapitation fix for 100% sequential PDF timeline preservation

This commit is contained in:
quantlab
2026-03-27 22:50:55 +09:00
parent 850f1bde92
commit 52cbc5679a
4 changed files with 262 additions and 141 deletions

View File

@@ -49,15 +49,34 @@ def _dedup_by_measure_number(frames: List[np.ndarray]) -> List[np.ndarray]:
for i, frame in enumerate(frames):
h, w = frame.shape[:2]
# 마디 번호는 극한의 좌측 상단 (높이 상위 25%, 너비 좌측 8%)에 위치
crop = frame[:int(h * 0.25), :int(w * 0.08)]
gray = cv2.cvtColor(crop, cv2.COLOR_BGR2GRAY) if len(crop.shape) == 3 else crop
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) if len(frame.shape) == 3 else frame
results = reader.readtext(gray, allowlist='0123456789')
# 동적 투영(Projection)을 통해 첫 번째 오선지(Staff line)의 Y좌표 스캔
_, thresh = cv2.threshold(gray, 200, 255, cv2.THRESH_BINARY_INV)
row_sums = np.sum(thresh, axis=1) / 255
# 폭의 50% 이상을 차지하는 검은 가로선을 오선지로 간주
staff_lines = np.where(row_sums > w * 0.5)[0]
if len(staff_lines) > 0:
first_line_y = staff_lines[0]
# 오선지 바로 위 영역 45px ~ 오선지 까지 (여유공간 2px) + 좌측 8% 너비만 추출 (기타 코드 다이어그램 제외)
crop_y_start = max(0, first_line_y - 45)
crop_y_end = max(10, first_line_y - 2)
crop = gray[crop_y_start:crop_y_end, :int(w * 0.08)]
else:
# 안전 장치: 오선지를 못 찾았을 경우 기존 하드코딩 비율 사용
crop = gray[:int(h * 0.25), :int(w * 0.08)]
# 작은 마디번호의 인식률 극대화를 위해 3배 업스케일링 및 이진화 처리
upscaled = cv2.resize(crop, (0, 0), fx=3.0, fy=3.0, interpolation=cv2.INTER_CUBIC)
_, upscaled_thresh = cv2.threshold(upscaled, 150, 255, cv2.THRESH_BINARY_INV)
results = reader.readtext(upscaled_thresh, allowlist='0123456789')
measure_num = None
if results:
# conf > 0.4 이면서 1~3자리의 숫자로만 이루어진 텍스트를 마디 번호로 간주 (프렛 번호 연속 인식 방지)
# conf > 0.4 이면서 1~3자리의 숫자로만 이루어진 텍스트를 마디 번호로 간주
valid_results = [res[1] for res in results if res[2] > 0.4 and res[1].isdigit() and len(res[1]) <= 3]
if valid_results:
measure_num = valid_results[0]
@@ -279,10 +298,10 @@ def _trim_to_content(crop: np.ndarray, margin_px: int = 6) -> np.ndarray:
# Tab 행 = 흰색 비율 30~97% (라인/숫자 + 흰 배경)
tab_rows = (row_white > 0.30) & (row_white < 0.97)
# 콘텐츠 존재 확인 (어두운 픽셀 > 1%)
# 콘텐츠 존재 확인 (어두운 픽셀 > 0.2%) - 마디번호 같이 아주 작은 숫자도 보존하기 위해 스레스홀드 극단적 하향
gray = cv2.cvtColor(crop, cv2.COLOR_BGR2GRAY) if len(crop.shape) == 3 else crop
row_dark = np.mean(gray < 180, axis=1)
content_rows = row_dark > 0.02
content_rows = row_dark > 0.002
# Tab 행 OR 콘텐츠 행
valid_rows = tab_rows | content_rows
@@ -291,7 +310,7 @@ def _trim_to_content(crop: np.ndarray, margin_px: int = 6) -> np.ndarray:
top = 0
for i in range(h):
if valid_rows[i] and row_white[i] > 0.20:
top = max(0, i - margin_px)
top = max(0, i - 120) # 상단 마디번호 보존을 위해 압도적인 120px 강제 보호 (숫자가 꽤 높이 떠있음)
break
# 하단: 마지막 유효 행
@@ -476,185 +495,252 @@ def _dedup_by_hash(frames: List[np.ndarray],
return result
def _detect_scroll_offset(frame_a: np.ndarray, frame_b: np.ndarray,
template_ratio: float = 0.6,
min_confidence: float = 0.75) -> Tuple[int, float]:
"""두 프레임 사이의 수평 스크롤 오프셋 검출.
frame_a의 오른쪽 template_ratio 영역을 frame_b에서 탐색.
Returns: (scroll_px, confidence). scroll_px > 0 = 왼쪽으로 스크롤됨."""
ga = cv2.cvtColor(frame_a, cv2.COLOR_BGR2GRAY) if len(frame_a.shape) == 3 else frame_a
gb = cv2.cvtColor(frame_b, cv2.COLOR_BGR2GRAY) if len(frame_b.shape) == 3 else frame_b
def _extract_print_channel(frame: np.ndarray) -> np.ndarray:
"""PDF 출력용 채널 (Red 채널): 노란색을 투명(White)하게 만듦"""
if len(frame.shape) != 3: return frame
return frame[:, :, 2]
# 높이 맞추기
if ga.shape[0] != gb.shape[0]:
target_h = min(ga.shape[0], gb.shape[0])
ga = ga[:target_h, :]
gb = gb[:target_h, :]
h, w = ga.shape
template_w = int(w * template_ratio)
if template_w < 20 or template_w >= w:
return (0, 0.0)
def _extract_tracking_channel(frame: np.ndarray) -> np.ndarray:
"""트래킹 전용 채널 (Blue 채널): 노란색을 거대한 검은색 마커로 만들어 반복적인 마디점프 시각적 오류를 영구차단"""
if len(frame.shape) != 3: return frame
return frame[:, :, 0]
def _detect_scroll_offset(frame_a: np.ndarray, frame_b: np.ndarray, min_confidence: float = 0.1) -> Tuple[int, float]:
"""이전 프레임(A)과 현재 프레임(B) 사이의 X축 이동량(Scroll)을 추정합니다."""
h, w = frame_a.shape[:2]
gb = _extract_tracking_channel(frame_b)
ga = _extract_tracking_channel(frame_a)
template_w = int(w * 0.5)
template = ga[:, w - template_w:]
result = cv2.matchTemplate(gb, template, cv2.TM_CCOEFF_NORMED)
_, max_val, _, max_loc = cv2.minMaxLoc(result)
scroll_px = (w - template_w) - max_loc[0]
if max_val < min_confidence or scroll_px <= 0:
return (0, max_val)
return (scroll_px, max_val)
def _detect_measure_bars(gray_pano: np.ndarray) -> List[int]:
"""오직 기타 6현의 영역만 계산하여 세로로 쫙 채워진 마디 선(|)의 X좌표만 정밀하게 반환합니다."""
_, thresh = cv2.threshold(gray_pano, 200, 255, cv2.THRESH_BINARY_INV)
h, w = thresh.shape
row_sums = np.sum(thresh, axis=1) / 255
staff_rows = np.where(row_sums > w * 0.5)[0]
if len(staff_rows) < 2: return []
top_line = staff_rows[0]
bottom_line = top_line
for r in staff_rows:
if r - top_line > 100: break
bottom_line = r
staff_region = thresh[top_line:bottom_line+1, :]
expected_h = bottom_line - top_line + 1
if expected_h < 10: return []
col_sums = np.sum(staff_region, axis=0) / 255
bar_cols = np.where(col_sums >= expected_h * 0.8)[0]
measures = []
curr = []
for c in bar_cols:
if not curr: curr.append(c)
else:
if c - curr[-1] < 10: curr.append(c)
else:
measures.append(int(np.mean(curr)))
curr = [c]
if curr: measures.append(int(np.mean(curr)))
return measures
def _stamp_measure_number(measure_bgr: np.ndarray, num: int) -> np.ndarray:
"""마디 이미지 좌측 상단의 빈 공간에 자동으로 순차 진행 마디번호를 파란색 도장(Stamp)으로 찍습니다."""
text = f"[{num}]"
font = cv2.FONT_HERSHEY_SIMPLEX
font_scale = 0.7
thickness = 2
color = (200, 0, 0)
cv2.putText(measure_bgr, text, (15, 30), font, font_scale, color, thickness, cv2.LINE_AA)
return measure_bgr
def _stitch_scroll_segment(segment: List[np.ndarray]) -> np.ndarray:
"""스크롤 연속 프레임을 하나의 파노라마로 합성.
template matching으로 겹치는 영역을 제거하고 새 영역만 이어붙임."""
if len(segment) == 1:
return segment[0]
# 공통 높이 결정
if len(segment) == 1: return segment[0]
min_h = min(f.shape[0] for f in segment)
panorama = segment[0][:min_h, :]
for i in range(1, len(segment)):
curr = segment[i][:min_h, :]
scroll_px, conf = _detect_scroll_offset(segment[i-1][:min_h, :], curr)
if scroll_px > 0 and conf > 0.7:
# 새로운 영역(오른쪽 scroll_px 픽셀)만 추가
scroll_px, conf = _detect_scroll_offset(segment[i-1][:min_h, :], curr, min_confidence=0.1)
if scroll_px > 0 and conf > 0.15:
new_strip = curr[:, curr.shape[1] - scroll_px:]
panorama = np.hstack([panorama, new_strip])
else:
# 스크롤 실패 → 전체 프레임 추가 (safe fallback)
panorama = np.hstack([panorama, curr])
return panorama
def _merge_scroll_candidates(candidates: List[np.ndarray],
min_scroll: int = 5,
min_segment_len: int = 2) -> List[np.ndarray]:
"""후보 프레임들을 스크롤 연결 여부로 그룹핑.
연속 스크롤 구간은 파노라마 합성, 나머지는 개별 유지."""
if len(candidates) <= 1:
return candidates
# 연속 프레임 간 스크롤 오프셋 측정
offsets = []
for i in range(len(candidates) - 1):
scroll_px, conf = _detect_scroll_offset(candidates[i], candidates[i+1])
offsets.append((scroll_px, conf))
# 스크롤 연속 구간(run) 분리
def _merge_scroll_candidates(candidates: List[np.ndarray], min_scroll: int = 5, min_segment_len: int = 2) -> List[np.ndarray]:
if len(candidates) <= 1: return candidates
result = []
segment_start = 0
i = 0
current_segment = [candidates[0]]
prev_s_px = 0
prev_conf = 1.0
while i < len(candidates):
# 다음 프레임과 스크롤 연결인지 확인
if i < len(offsets) and offsets[i][0] >= min_scroll and offsets[i][1] > 0.7:
# 스크롤 시작: 연속 구간 탐색
seg_end = i + 1
while seg_end < len(offsets) and offsets[seg_end][0] >= min_scroll and offsets[seg_end][1] > 0.7:
seg_end += 1
seg_end += 1 # 마지막 프레임 포함
segment = candidates[i:seg_end]
if len(segment) >= min_segment_len:
# 파노라마 합성
panorama = _stitch_scroll_segment(segment)
result.append(panorama)
else:
result.extend(segment)
i = seg_end
for i in range(1, len(candidates)):
prev_frame = candidates[i-1]
curr_frame = candidates[i]
s_px, conf = _detect_scroll_offset(prev_frame, curr_frame, min_confidence=0.1)
# 씬 전환 조건: conf 폭락, 가속도>100, 노란마커 증발(>0.4)
is_cut = (conf <= 0.15) or (abs(s_px - prev_s_px) > 100) or (prev_conf - conf > 0.4)
if not is_cut:
current_segment.append(curr_frame)
else:
result.append(candidates[i])
i += 1
if len(current_segment) >= min_segment_len:
result.append(_stitch_scroll_segment(current_segment))
else:
result.extend(current_segment)
current_segment = [curr_frame]
prev_s_px = s_px
prev_conf = conf
if len(current_segment) >= min_segment_len:
result.append(_stitch_scroll_segment(current_segment))
else:
result.extend(current_segment)
return result
def merge_panoramas_list(panoramas):
if not panoramas: return []
merged_list = []
current_master = panoramas[0].copy()
for i in range(1, len(panoramas)):
next_pano = panoramas[i].copy()
# 매마디가 똑같이 생긴 반주 구간(예: 코러스)이 있을 때, 검색 범위가 너무 넓거나
# 비교 기준(head)이 너무 짧으면, OpenCV가 과거의 똑같은 반주에 현재 씬을 겹쳐버림(마디 누락/점프 발생).
# 이를 막기 위해 비교 기준은 넓게(800), 검색 과거 이력은 짧게(1500=최대 편집 되감기 길이) 제한.
head_w = min(800, next_pano.shape[1])
head = next_pano[:, :head_w]
search_w = min(1500, current_master.shape[1])
search_region = current_master[:, -search_w:]
h_gray = _extract_tracking_channel(head)
s_gray = _extract_tracking_channel(search_region)
matched = False
if h_gray.shape[1] <= s_gray.shape[1] and h_gray.shape[0] == s_gray.shape[0]:
res = cv2.matchTemplate(s_gray, h_gray, cv2.TM_CCOEFF_NORMED)
_, max_val, _, max_loc = cv2.minMaxLoc(res)
if max_val > 0.60:
match_x_in_search = max_loc[0]
absolute_match_x = current_master.shape[1] - search_w + match_x_in_search
next_start_idx = current_master.shape[1] - absolute_match_x
if next_start_idx < next_pano.shape[1]:
append_part = next_pano[:, next_start_idx:]
if append_part.shape[1] > 0:
current_master = np.hstack([current_master, append_part])
matched = True
if not matched:
merged_list.append(current_master)
current_master = next_pano
merged_list.append(current_master)
return merged_list
def extract_unique_scroll(frames: List[np.ndarray],
threshold: float = SIMILARITY_THRESHOLD) -> List[np.ndarray]:
"""스크롤형: 업스케일 + HSV + median voting + 트림 + MSE → 파노라마 → pHash"""
def extract_unique_scroll(frames: List[np.ndarray], threshold: float = SIMILARITY_THRESHOLD) -> List[np.ndarray]:
print(f"[4/5] 스크롤형 Tab 추출 중 (threshold={threshold})...")
# ── Phase 1: 전체 프레임의 strip 위치 수집 (median voting) ──
strip_tops = []
strip_bottoms = []
strip_tops, strip_bottoms = [], []
for frame in frames:
orig_h, orig_w = frame.shape[:2]
if orig_w < DETECT_WIDTH:
scale = DETECT_WIDTH / orig_w
upscaled = cv2.resize(frame, (DETECT_WIDTH, int(orig_h * scale)),
interpolation=cv2.INTER_LANCZOS4)
else:
upscaled = frame
scale = 1.0
strip = _find_white_tab_strip(upscaled)
if strip is not None:
up_top, up_bottom = strip
strip_tops.append(int(up_top / scale))
strip_bottoms.append(int(up_bottom / scale))
if not strip_tops:
print(" → 흰색 스트립 미감지")
return []
strip = _find_white_tab_strip(frame)
if strip:
strip_tops.append(strip[0])
strip_bottoms.append(strip[1])
if not strip_tops: return []
median_top = int(np.median(strip_tops))
median_bottom = int(np.median(strip_bottoms))
print(f" → 크롭 영역: y={median_top}~{median_bottom} "
f"(median of {len(strip_tops)} strips)")
# ── Phase 2: 크롭 + 트림 + MSE 1차 필터 ──
candidates = []
all_compared = []
candidates, all_compared = [], []
for frame in frames:
h = frame.shape[0]
top = max(0, median_top)
bottom = min(h, median_bottom)
tab_crop = frame[top:bottom, :]
if not _has_tab_content(tab_crop):
continue
# 🚨 _trim_to_content를 각 프레임별로 적용하면 음표 높낮이에 따라 프레임 높이가 들쭉날쭉해짐.
# 이후 스크롤 합성(stitch)에서 min_h로 잘리면서 악보가 다 잘려나가는(Crop) 치명적 원인이 됨!
# tab_crop = _trim_to_content(tab_crop)
tab_crop = frame[max(0, median_top):min(h, median_bottom), :]
if not _has_tab_content(tab_crop): continue
compare_img = cv2.resize(tab_crop, (480, 120), interpolation=cv2.INTER_AREA)
is_dup = False
for ref in all_compared:
if compare_frames(compare_img, ref) >= threshold:
is_dup = True
break
if not is_dup:
candidates.append(tab_crop)
all_compared.append(compare_img)
print(f" → MSE 1차: {len(candidates)}개 후보")
# ── Phase 2.5: 파노라마 스티칭 (스크롤 겹침 제거) ──
stitched = _merge_scroll_candidates(candidates)
if len(stitched) != len(candidates):
print(f" → 파노라마: {len(candidates)}개 → {len(stitched)}개 (스크롤 합성)")
# ── Phase 3: pHash 2차 클러스터 중복 제거 ──
unique = _dedup_by_hash(stitched, max_hamming=20)
print(f" → pHash 2차: {len(unique)}개 고유 Tab 프레임")
# ── Phase 4: 마디번호 기반 최종 중복 제거 (OCR) ──
unique = _dedup_by_measure_number(unique)
return unique
merged_panoramas = merge_panoramas_list(stitched)
chunk_width = candidates[0].shape[1] if candidates else 1280
final_chunks = []
global_measure_counter = 1
current_row = None
for pano in merged_panoramas:
gray_pano = _extract_print_channel(pano)
bar_coords = _detect_measure_bars(gray_pano)
if not bar_coords:
w = pano.shape[1]
start_x = 0
while start_x < w:
chunk = pano[:, start_x:min(w, start_x + chunk_width)]
if chunk.shape[1] < chunk_width:
pad = np.full((chunk.shape[0], chunk_width - chunk.shape[1], 3), 255, dtype=np.uint8)
chunk = np.hstack([chunk, pad])
gray_c = _extract_print_channel(chunk)
final_chunks.append(cv2.cvtColor(gray_c, cv2.COLOR_GRAY2BGR))
start_x += chunk_width
continue
coords = [0] + bar_coords + [pano.shape[1]]
coords = sorted(list(set(coords)))
for i in range(len(coords) - 1):
x_start = coords[i]
x_end = coords[i+1]
if x_end - x_start < 50:
continue
measure_img = pano[:, x_start:x_end]
gray_m = _extract_print_channel(measure_img)
bgr_m = cv2.cvtColor(gray_m, cv2.COLOR_GRAY2BGR)
bgr_m = _stamp_measure_number(bgr_m, global_measure_counter)
global_measure_counter += 1
if current_row is None:
current_row = bgr_m
else:
if current_row.shape[1] + bgr_m.shape[1] > chunk_width:
pad_w = chunk_width - current_row.shape[1]
if pad_w > 0:
pad_img = np.full((current_row.shape[0], pad_w, 3), 255, dtype=np.uint8)
current_row = np.hstack([current_row, pad_img])
final_chunks.append(current_row)
current_row = bgr_m
else:
current_row = np.hstack([current_row, bgr_m])
if current_row is not None:
pad_w = chunk_width - current_row.shape[1]
if pad_w > 0:
pad_img = np.full((current_row.shape[0], pad_w, 3), 255, dtype=np.uint8)
current_row = np.hstack([current_row, pad_img])
final_chunks.append(current_row)
return final_chunks
def extract_unique_overlay(frames: List[np.ndarray],
threshold: float = OVERLAY_SIMILARITY_THRESHOLD) -> List[np.ndarray]: