Files
guitar_score/score_extractor.py

249 lines
12 KiB
Python

import cv2
import numpy as np
from typing import List
class ScoreExtractor:
    """Stitch frames of a horizontally scrolling score into blocks and tile them into rows.

    ``process_pages`` consumes frames in order and accumulates stitched
    "super-blocks" in ``final_sheet_chunks``; ``tile_to_a4`` then concatenates
    those blocks and cuts the result into fixed-width rows at measure bars.
    All image arguments may be 3-dim (treated as BGR) or 2-dim grayscale arrays.
    """

    def __init__(self):
        # Not touched by any method in this class; kept for external users.
        self.seen_pages: List[np.ndarray] = []
        # Stitched blocks, in playback order. A new entry starts at each jump cut.
        self.final_sheet_chunks: List[np.ndarray] = []

    @staticmethod
    def _to_gray(img: np.ndarray) -> np.ndarray:
        """Return *img* as single-channel: 3-dim input is converted from BGR, 2-dim is passed through."""
        return cv2.cvtColor(img, cv2.COLOR_BGR2GRAY) if img.ndim == 3 else img

    @staticmethod
    def _ink_mask(gray: np.ndarray, thresh: int = 200) -> np.ndarray:
        """Inverse-binarize *gray*: pixels <= thresh become 255 ("ink"), brighter pixels become 0.

        Same result as ``cv2.threshold(gray, thresh, 255, cv2.THRESH_BINARY_INV)``
        for uint8 input, but also accepts empty arrays.
        """
        return np.where(gray > thresh, 0, 255).astype(np.uint8)

    def _find_overlap_len(self, ref_img: np.ndarray, query_img: np.ndarray) -> int:
        """Return how many pixels of the left side of query_img overlap the right side of ref_img.

        The search runs on half-resolution grayscale copies, from the largest
        candidate overlap down to 30% of the narrower image, and accepts the
        first candidate whose mean absolute difference over inked pixels is
        below 35. Returns 0 when heights differ, when the images are too small
        to downsample, or when no candidate is accepted.
        """
        if ref_img.shape[0] != query_img.shape[0]:
            return 0

        ref_gray = self._to_gray(ref_img)
        query_gray = self._to_gray(query_img)

        height, ref_width = ref_gray.shape
        half_h = height // 2
        ref_half_w = ref_width // 2
        qry_half_w = query_gray.shape[1] // 2
        if half_h == 0 or ref_half_w == 0 or qry_half_w == 0:
            # cv2.resize rejects an empty target size; nothing to match anyway.
            return 0

        # Downsample for speed and noise reduction.
        small_ref = cv2.resize(ref_gray, (ref_half_w, half_h))
        small_qry = cv2.resize(query_gray, (qry_half_w, half_h))

        sw = min(small_ref.shape[1], small_qry.shape[1])
        min_ov_search = int(sw * 0.3)
        for ov in range(sw - 2, min_ov_search, -1):
            ref_patch = small_ref[:, -ov:]
            qry_patch = small_qry[:, :ov]
            # Compare only where at least one image has ink, so that the white
            # background does not dilute the difference.
            mask = (ref_patch < 230) | (qry_patch < 230)
            valid_pixels = np.count_nonzero(mask)
            if valid_pixels < 100:
                continue  # candidate region is essentially blank
            diff = cv2.absdiff(ref_patch, qry_patch)
            mad = np.sum(diff[mask]) / valid_pixels
            if mad < 35.0:
                return int(ov * 2)  # back to full-resolution pixels
        return 0

    def _ends_with_repeat_sign(self, block_bgr: np.ndarray) -> bool:
        """Return True when the last detected bar is thick (>= 6 columns) and within 150 px of the right edge."""
        bars = self._find_all_measure_bars(block_bgr, block_bgr.shape[1], return_thickness=True)
        if not bars:
            return False
        x, thickness = bars[-1]
        return bool(thickness >= 6 and (block_bgr.shape[1] - x < 150))

    def process_pages(self, unique_pages: List[np.ndarray]):
        """Fold *unique_pages* into ``self.final_sheet_chunks``.

        Per frame, in order:
          * frames with mean brightness below 120 are skipped;
          * the first accepted frame starts the first block;
          * a frame overlapping the tail of the current block by >= 95% of its
            width is ignored as a duplicate;
          * a frame with a smaller positive overlap is a continuous scroll: the
            non-overlapping part is appended to the current block (unless it is
            <= 20 px wide, or the rewind wait state is active);
          * a frame with no overlap is a jump cut, handled by the rewind state
            machine described inline below.
        """
        print(f"[ScoreExtractor] Initializing Full-Page Structural State Machine over {len(unique_pages)} Pages")
        waiting_for_return = False

        for idx, page_bgr in enumerate(unique_pages):
            page_gray = self._to_gray(page_bgr)
            if np.mean(page_gray) < 120:
                print(f" [Page {idx}] Ignored: Failed brightness check (Dark Scene).")
                continue

            if not self.final_sheet_chunks:
                self.final_sheet_chunks.append(page_bgr)
                continue

            last_chunk = self.final_sheet_chunks[-1]
            page_w = page_bgr.shape[1]
            search_tail_width = min(last_chunk.shape[1], 1500)
            ref_tail = last_chunk[:, -search_tail_width:]
            overlap_len = self._find_overlap_len(ref_tail, page_bgr)

            # The duplicate test must come before the scroll test: the overlap
            # search never reports the full page width, so testing
            # "0 < overlap < width" first would swallow every near-duplicate.
            if overlap_len > 0 and overlap_len >= page_w * 0.95:
                print(f" [Page {idx}] Ignored: 100% duplicate of previous context.")
                continue

            if overlap_len > 0:
                # Continuous scroll.
                new_slice = page_bgr[:, overlap_len:]
                if waiting_for_return:
                    print(f" [Page {idx}] Ignored (Continuous Scroll inside Rewind State).")
                elif new_slice.shape[1] > 20:
                    self.final_sheet_chunks[-1] = np.hstack([last_chunk, new_slice])
                    print(f" [Page {idx}] Stitched continuously! Overlap: {overlap_len}px.")
                continue

            # Jump cut.
            if waiting_for_return:
                # The first jump cut seen while waiting ends the wait state and
                # starts a new block.
                waiting_for_return = False
                print(f" [Page {idx}] New block started. Breaking out of Rewind Wait State!")
                self.final_sheet_chunks.append(page_bgr)
                continue

            if self._ends_with_repeat_sign(last_chunk):
                # The current block ends on a repeat bar, so this jump is taken
                # to be the rewind; the page is not appended.
                waiting_for_return = True
                print(f" [Page {idx}] Ignored: Video jumped backward after ||: sign. Entering Rewind Wait State.")
                continue

            # Ordinary jump cut: trim the previous block where the new page's
            # first measure already appears, then start a new block.
            trim_x = self._find_cross_block_trim(last_chunk, page_bgr)
            if trim_x > 0:
                print(f" [Page {idx}] New block (Jump cut). Cross-Block overlap matched! Trimming {last_chunk.shape[1] - trim_x}px.")
                self.final_sheet_chunks[-1] = last_chunk[:, :trim_x]
            else:
                print(f" [Page {idx}] New block started (Jump cut detected). No cross-block match.")
            self.final_sheet_chunks.append(page_bgr)

        print(f"[ScoreExtractor] Finalized with {len(self.final_sheet_chunks)} jump-cut super-blocks.")

    def _find_all_measure_bars(self, img_bgr: np.ndarray, max_width: int, return_thickness=False) -> List:
        """Return the x-coordinates of vertical measure lines in the left *max_width* columns.

        Staff rows are rows whose ink covers more than 40% of the examined
        width; the staff band spans from the first staff row to the last one
        within 100 px of it (30%-80% of the image height when fewer than six
        staff rows are found). A column is a bar column when ink fills at least
        80% of the band height. Bar columns at most 15 px apart form one bar,
        bars wider than 20 columns are dropped, and bars closer than 50 px to
        the previously kept bar are dropped.

        Returns a list of x centres, or a list of ``(x, thickness)`` tuples
        when *return_thickness* is True (thickness = number of bar columns).
        """
        cw = min(img_bgr.shape[1], max_width)
        ink = self._ink_mask(self._to_gray(img_bgr[:, :cw]))

        row_sums = np.sum(ink, axis=1) / 255.0
        staff_rows = np.where(row_sums > cw * 0.4)[0]
        if len(staff_rows) >= 6:
            staff_y_top = int(staff_rows[0])
            # staff_rows is ascending, so this keeps the leading run of rows
            # that lie within 100 px of the top staff row.
            near_top = staff_rows[staff_rows - staff_y_top <= 100]
            staff_y_bottom = int(near_top[-1])
        else:
            staff_y_top = int(img_bgr.shape[0] * 0.3)
            staff_y_bottom = int(img_bgr.shape[0] * 0.8)

        expected_h = max(10, staff_y_bottom - staff_y_top + 1)
        col_sums = np.sum(ink[staff_y_top:staff_y_bottom + 1, :], axis=0) / 255.0
        bar_xs = np.where(col_sums >= expected_h * 0.8)[0]

        grouped_bars = []
        if bar_xs.size:
            # Start a new group wherever consecutive bar columns are > 15 px apart.
            split_points = np.where(np.diff(bar_xs) > 15)[0] + 1
            for run in np.split(bar_xs, split_points):
                if len(run) <= 20:
                    grouped_bars.append((int(np.mean(run)), len(run)))

        unique_bars = []
        for center, thick in grouped_bars:
            if not unique_bars or center - unique_bars[-1][0] >= 50:
                unique_bars.append((center, thick))

        if return_thickness:
            return unique_bars
        return [center for center, _ in unique_bars]

    def _find_cross_block_trim(self, ref_block: np.ndarray, query_page: np.ndarray) -> int:
        """Locate where the start of *query_page* already appears near the end of *ref_block*.

        A template up to 25 px tall and 40 px wide is cut just above the staff
        at the first measure bar of *query_page* and matched (normalized
        cross-correlation on binarized ink) against the last 1500 px of
        *ref_block* in the same vertical band (+/- 10 px).

        Returns the x-coordinate in *ref_block* at which to trim (5 px safety
        margin, clamped to 0), or -1 when the page has fewer than two bars, the
        template is nearly blank, the search strip is smaller than the
        template, or the best match scores 0.55 or less.
        """
        q_bars = self._find_all_measure_bars(query_page, min(1000, query_page.shape[1]))
        if len(q_bars) < 2:
            return -1
        x_start, x_end = q_bars[0], q_bars[1]

        query_gray = self._to_gray(query_page)
        query_ink = self._ink_mask(query_gray)

        # Staff top: first row whose ink covers > 40% of the probed width.
        # The probed width is clamped to the page so narrow pages still qualify.
        probe_w = min(1000, query_gray.shape[1])
        staff_y_top = int(query_gray.shape[0] * 0.3)
        row_sums = np.sum(query_ink[:, :probe_w], axis=1) / 255.0
        staff_rows = np.where(row_sums > probe_w * 0.4)[0]
        if len(staff_rows) >= 6:
            staff_y_top = int(staff_rows[0])

        box_y1 = max(0, staff_y_top - 25)
        box_y2 = staff_y_top
        box_x1 = x_start
        box_x2 = min(x_end, x_start + 40)
        template_inv = self._ink_mask(query_gray[box_y1:box_y2, box_x1:box_x2])
        # Also covers an empty template (staff starting at row 0).
        if np.count_nonzero(template_inv) < 5:
            return -1

        search_w = min(1500, ref_block.shape[1])
        ref_gray = self._to_gray(ref_block[:, -search_w:])
        search_y1 = max(0, box_y1 - 10)
        search_y2 = min(ref_gray.shape[0], box_y2 + 10)
        ref_search_inv = self._ink_mask(ref_gray[search_y1:search_y2, :])
        if (ref_search_inv.shape[0] < template_inv.shape[0]
                or ref_search_inv.shape[1] < template_inv.shape[1]):
            # cv2.matchTemplate requires the image to be at least template-sized.
            return -1

        res = cv2.matchTemplate(ref_search_inv, template_inv, cv2.TM_CCOEFF_NORMED)
        _, max_val, _, max_loc = cv2.minMaxLoc(res)
        # Relaxed threshold to absorb volta-bracket symbols bleeding into the number box.
        if max_val > 0.55:
            match_x_in_tail = max_loc[0]
            absolute_trim_x = ref_block.shape[1] - search_w + match_x_in_tail - x_start
            return max(0, absolute_trim_x - 5)
        return -1

    def tile_to_a4(self, chunk_width: int = 1800) -> List[np.ndarray]:
        """Cut the concatenated blocks into rows that are each *chunk_width* pixels wide.

        Each row is cut 10 px before the last measure bar found between 50 px
        and ``chunk_width - 15`` px into the row, so the bar starts the next
        row; without such a bar the row is cut at exactly *chunk_width*. Rows
        are right-padded with white. A final remainder of 50 px or less is
        dropped.

        Returns an empty list when no blocks have been collected.

        Raises:
            ValueError: if *chunk_width* is not positive (the cut position
                could never advance).
        """
        if not self.final_sheet_chunks:
            return []
        if chunk_width <= 0:
            raise ValueError(f"chunk_width must be positive, got {chunk_width}")

        panorama = np.hstack(self.final_sheet_chunks)
        rows = []
        x_curr = 0
        total_w = panorama.shape[1]
        print(f"[ScoreExtractor] Formatting {total_w}px panorama sequence into A4 sheets...")

        while x_curr < total_w:
            remaining_w = total_w - x_curr
            if remaining_w <= chunk_width:
                tail = panorama[:, x_curr:]
                if tail.shape[1] > 50:
                    rows.append(cv2.copyMakeBorder(
                        tail, 0, 0, 0, chunk_width - tail.shape[1],
                        cv2.BORDER_CONSTANT, value=[255, 255, 255]))
                break

            slice_bgr = panorama[:, x_curr:min(x_curr + chunk_width + 100, total_w)]
            bars = self._find_all_measure_bars(slice_bgr, slice_bgr.shape[1])
            # b > 50 keeps the cut from landing on the row's own left-most bar
            # again and again.
            valid_bars = [b for b in bars if 50 < b < chunk_width - 15]
            # Cutting 10 px before the bar keeps the bar and its measure number
            # together on the next row.
            cut_offset = valid_bars[-1] - 10 if valid_bars else chunk_width

            row = panorama[:, x_curr:x_curr + cut_offset]
            rows.append(cv2.copyMakeBorder(
                row, 0, 0, 0, chunk_width - row.shape[1],
                cv2.BORDER_CONSTANT, value=[255, 255, 255]))
            x_curr += cut_offset

        print(f"[ScoreExtractor] Success: Tiled structurally into {len(rows)} A4 landscape rows (chops are aligned with measures).")
        return rows