fix(anime): 파이프라인 5건 수정 — 에피소드 정규식(v2/S01E), 릴리스 그룹 필터, 자막 보호, 배치 다운로드, 타임아웃

This commit is contained in:
2026-03-15 08:27:08 +09:00
parent 63818999d9
commit 9f74812710
40 changed files with 2759 additions and 815 deletions

View File

@@ -1 +1 @@
# Anime automation tools package.
# tools 패키지

View File

@@ -20,6 +20,7 @@ from tools.qbit_client import QBitClient
from tools.subtitle_downloader import SubtitleDownloader, SubtitleFile
from tools.title_matcher import (
match_titles, make_nas_folder_name, rename_subtitle_to_video,
fetch_english_title,
)
logger = logging.getLogger("variet.tools.pipeline")
@@ -81,48 +82,58 @@ class AnimePipeline:
except Exception as e:
result.errors.append(f"자막 조회 오류: {e}")
# 3. Nyaa 토렌트 검색 (다중 전략 — suffix 있는/없는 조합)
# 3. NAS 기존 폴더 확인 → 검색 전략 결정 (방영 시점 기반)
nas_existing = self._find_existing_nas_folder(anime.subject, anime.start_date)
# 3. Nyaa 토렌트 검색
try:
from tools.title_matcher import japanese_to_romaji
import re as _re
if nas_existing and nas_existing.video_files:
# ── 기존 파일명에서 릴리스명 추출 → Nyaa 검색 (안전) ──
release_name = self._extract_release_name(nas_existing.video_files[0])
if release_name:
logger.info(f"NAS 기존 릴리스명: '{release_name}'")
found = await self.nyaa.search(release_name, use_default_suffix=False)
matched = [t for t in found
if self._title_contains_keyword(t.title, [release_name.lower()])]
result.torrents = matched[:30]
if matched:
logger.info(f"NAS 릴리스명 검색 → {len(found)}건 중 {len(matched)}건 매칭")
romaji_full = japanese_to_romaji(anime.original_subject)
# 한자/비ASCII 잔류 문자 제거 → 순수 로마자만 추출
romaji_clean = _re.sub(r'[^\x00-\x7F]+', ' ', romaji_full).strip()
romaji_clean = _re.sub(r'\s+', ' ', romaji_clean)
if not result.torrents:
# ── 신규 애니: Jikan API + ASW HEVC 전략 ──
eng_titles = await fetch_english_title(anime.original_subject)
eng_default = eng_titles.get("default", "")
eng_english = eng_titles.get("english", "")
synonyms = eng_titles.get("synonyms", [])
# 검색 전략 (query, use_default_suffix) 순서
strategies: list[tuple[str, bool]] = []
if romaji_clean and len(romaji_clean) >= 3:
strategies.append((romaji_clean, True)) # romaji + ASW HEVC
strategies.append((romaji_clean, False)) # romaji only
strategies.append((anime.original_subject, True)) # 원제 + suffix
strategies.append((anime.original_subject, False)) # 원제 only
strategies.append((anime.subject, True)) # 한글 + suffix
strategies.append((anime.subject, False)) # 한글 only
torrents = []
for query, use_suffix in strategies:
torrents = await self.nyaa.search(
query, use_default_suffix=use_suffix,
keywords = self._build_match_keywords(
eng_default, eng_english, synonyms, anime.original_subject,
)
if torrents:
suffix_label = " +suffix" if use_suffix else ""
logger.info(
f"Nyaa 검색 성공: '{query}'{suffix_label}{len(torrents)}"
)
break
logger.info(f"매칭 키워드: {keywords}")
# 제목 매칭 필터
matched = match_titles(
anime.subject, anime.original_subject, torrents, threshold=0.3
)
result.torrents = matched[:20] # 상위 20개
# STEP 1: "ASW HEVC"로 검색 → 키워드로 필터
asw_results = await self.nyaa.search("ASW HEVC", use_default_suffix=False)
matched = [t for t in asw_results
if self._title_contains_keyword(t.title, keywords)]
if matched:
logger.info(f"ASW HEVC 검색 → {len(asw_results)}건 중 {len(matched)}건 매칭")
else:
# ASW 릴리스 없음 — 사용자에게 안내
result.errors.append(
f"⚠️ ASW HEVC 릴리스가 없습니다.\n"
f"영어 제목: {eng_default or '(조회 실패)'}\n"
f"Nyaa에서 직접 검색해주세요."
)
result.torrents = matched[:30]
except Exception as e:
result.errors.append(f"Nyaa 검색 오류: {e}")
# NAS 폴더 생성
result.nas_folder = make_nas_folder_name(anime.subject, anime.start_date)
# NAS 폴더: 기존 폴더 있으면 재사용, 없으면 새로 생성
if nas_existing:
result.nas_folder = nas_existing.folder_name
else:
result.nas_folder = make_nas_folder_name(anime.subject, anime.start_date)
result.success = True
result.message = (
@@ -143,8 +154,12 @@ class AnimePipeline:
Args:
title: 한글 제목
mode: "auto" (자막+영상), "sub_only" (자막만), "video_only" (영상만)
episode: 특정 에피소드만 (None이면 최신)
mode:
"auto" — 영상+자막 무조건 다운 (기본)
"sub_required" — 자막 있는 에피소드만 영상 다운
"sub_only" — 자막만
"video_only" — 영상만
episode: 특정 에피소드만 (None이면 빠진 것 전부)
"""
# 먼저 검색
result = await self.search(title)
@@ -155,13 +170,18 @@ class AnimePipeline:
nas_folder = Path(self.nas_base) / result.nas_folder
# ── 자막 다운로드 ──
if mode in ("auto", "sub_only"):
if mode in ("auto", "sub_only", "sub_required"):
await self._download_subtitles(result, nas_folder, episode)
# ── 영상 토렌트 추가 ──
if mode in ("auto", "video_only"):
force = (mode == "video_only")
await self._add_torrents(result, nas_folder, episode, force=force)
await self._add_torrents(result, nas_folder, episode)
elif mode == "sub_required":
# 자막이 실제로 다운됐을 때만 영상 추가
if result.subtitles:
await self._add_torrents(result, nas_folder, episode)
else:
result.errors.append("자막이 없어 영상 다운로드를 보류합니다.")
# 결과 메시지 구성
parts = [result.message]
@@ -181,8 +201,25 @@ class AnimePipeline:
nas_folder: Path,
episode: Optional[int],
):
"""자막 다운로드 처리."""
sub_dir = nas_folder / "subtitles"
"""자막 다운로드 → 영상 폴더에 직접 저장 + 영상명 매칭 리네임.
기존 자막이 있는 에피소드는 건너뜀 (수동 자막 보호).
"""
# 영상 폴더에 직접 저장 (subtitles/ 하위 아님)
nas_folder.mkdir(parents=True, exist_ok=True)
# 기존 자막 파일이 있는 에피소드 스캔 → 스킵 대상
existing_sub_eps = set()
sub_exts = {".ass", ".srt", ".ssa", ".sub", ".smi"}
if nas_folder.exists():
for f in nas_folder.iterdir():
if f.suffix.lower() in sub_exts:
ep = self._extract_episode(f.stem)
if ep is not None:
existing_sub_eps.add(ep)
if existing_sub_eps:
logger.info(f"기존 자막 에피소드 (스킵): {sorted(existing_sub_eps)}")
for caption in result.captions:
if not caption.website:
@@ -195,54 +232,379 @@ class AnimePipeline:
for sub in subs:
if episode is not None and sub.episode is not None and sub.episode != episode:
continue
# 기존 자막이 있는 에피소드 스킵
if sub.episode is not None and sub.episode in existing_sub_eps:
logger.info(f"자막 스킵 (기존 존재): {sub.episode}화 - {sub.filename}")
continue
try:
await self.sub_downloader.download_file(sub, str(sub_dir))
await self.sub_downloader.download_file(sub, str(nas_folder))
result.subtitles.append(sub)
except Exception as e:
result.errors.append(f"자막 다운로드 실패 ({sub.filename}): {e}")
except Exception as e:
result.errors.append(f"자막 사이트 접근 실패 ({caption.name}): {e}")
# 다운로드 후: 기존 영상 파일과 매칭하여 자막 리네임
self._rename_subtitles_to_match_videos(nas_folder, result)
def _rename_subtitles_to_match_videos(
self, folder: Path, result: DownloadResult
):
"""폴더 내 자막 파일을 영상 파일명에 맞게 리네임.
예: [ASW] Sousou no Frieren S2 - 03.mkv
→ 3화.ass 를 [ASW] Sousou no Frieren S2 - 03.ass 로 변경
"""
import re as _re
# 영상 파일 목록 (에피소드 → 파일명)
video_exts = {".mkv", ".mp4", ".avi", ".webm"}
videos = {} # episode_num -> video_path
for f in folder.iterdir():
if f.suffix.lower() in video_exts:
ep = self._extract_episode(f.stem)
if ep is not None:
videos[ep] = f
if not videos:
return
# 자막 파일 리네임
sub_exts = {".ass", ".srt", ".ssa", ".sub"}
for f in folder.iterdir():
if f.suffix.lower() not in sub_exts:
continue
ep = self._extract_episode(f.stem)
if ep is not None and ep in videos:
video_stem = videos[ep].stem
new_name = f"{video_stem}{f.suffix}"
new_path = folder / new_name
if new_path != f and not new_path.exists():
try:
f.rename(new_path)
logger.info(f"자막 리네임: {f.name}{new_name}")
except Exception as e:
logger.warning(f"자막 리네임 실패: {e}")
@staticmethod
def _extract_episode(text: str) -> Optional[int]:
"""텍스트에서 에피소드 번호 추출."""
import re as _re
# 패턴 1: S01E03, S02E07 (SxxExx — 시즌+에피소드, 가장 먼저 체크)
m = _re.search(r'[Ss]\d{1,2}[Ee](\d{1,4})', text)
if m:
return int(m.group(1))
# 패턴 2: "- 03", "- 06", "- 10v2" (torrent 파일명, v2 등 version suffix 허용)
m = _re.search(r'[-]\s*(\d{1,4})(?:v\d)?(?:\s|$|\.|\[|\()', text)
if m:
return int(m.group(1))
# 패턴 3: "3화", "03화"
m = _re.search(r'(\d{1,4})\s*화', text)
if m:
return int(m.group(1))
# 패턴 4: "EP03", "Episode 3"
m = _re.search(r'(?:EP|Episode)\s*(\d{1,4})', text, _re.IGNORECASE)
if m:
return int(m.group(1))
return None
def _find_existing_nas_folder(self, korean_title: str, start_date: str = ""):
"""NAS에서 기존 폴더 찾기 — 제목 + 방영 시점(year/quarter) 기반.
같은 애니라도 방영 분기가 다르면 다른 시즌 → 매칭하지 않음.
예: '최애의 아이 3기'(26_1분기) ≠ [23_2분기]최애의아이 (1기)
"""
import re as _re
from tools.title_matcher import get_quarter
title_norm = _re.sub(r'[^\w]', '', korean_title.lower())
if len(title_norm) < 2:
return None
# 방영 분기 계산
anime_year, anime_quarter = get_quarter(start_date)
try:
all_folders = self.nas.list_anime_folders()
except Exception as e:
logger.warning(f"NAS 폴더 검색 실패: {e}")
return None
candidates = []
for folder in all_folders:
folder_norm = _re.sub(r'[^\w]', '', folder.title.lower())
# 제목 부분 매칭 (양방향)
if not (title_norm in folder_norm or folder_norm in title_norm):
continue
# 방영 분기 일치 확인
if anime_year and folder.year != anime_year:
continue
if anime_quarter and folder.quarter != anime_quarter:
continue
candidates.append(folder)
if not candidates:
return None
best = candidates[0]
logger.info(f"NAS 기존 폴더 발견: {best.folder_name}")
return best
@staticmethod
def _extract_release_name(filename: str) -> str:
"""영상 파일명에서 릴리스 이름 추출.
[ASW] Hime-sama Goumon no Jikan desu - 21 [1080p HEVC].mkv
'Hime-sama Goumon no Jikan desu'
"""
import re as _re
# 확장자 제거
name = _re.sub(r'\.[^.]+$', '', filename)
# [그룹태그] 제거
name = _re.sub(r'^\[[^\]]*\]\s*', '', name)
# 에피소드 번호 이후 제거: " - 21 [...]"
name = _re.sub(r'\s*[-]\s*\d+.*$', '', name).strip()
# S02E09 패턴 제거
name = _re.sub(r'\s*S\d+E\d+.*$', '', name, flags=_re.IGNORECASE).strip()
return name
@staticmethod
def _build_match_keywords(
eng_default: str, eng_english: str,
synonyms: list[str], original_title: str,
) -> list[str]:
"""Jikan 제목들에서 매칭용 키워드 추출.
예: "Sousou no Frieren 2nd Season" → ["Frieren", "Sousou no Frieren"]
synonyms: ["Omagoto"] → ["Omagoto"]
"""
import re as _re
keywords = []
# synonyms 중 짧은 것 (Omagoto 같은 약칭)
for syn in synonyms:
cleaned = syn.strip()
if 3 <= len(cleaned) <= 30:
keywords.append(cleaned.lower())
# eng_default에서 키워드 추출 (시즌 표기 제거)
if eng_default:
clean = _re.sub(r'\s*(2nd|3rd|\d+th)\s*Season.*$', '', eng_default, flags=_re.IGNORECASE).strip()
clean = _re.sub(r'\s*S\d+$', '', clean).strip()
if len(clean) >= 3:
keywords.append(clean.lower())
# eng_english에서 콜론 앞 핵심 단어
if eng_english:
short = eng_english.split(":")[0].strip()
if len(short) >= 3:
keywords.append(short.lower())
# 원제 (일본어) — 정규화 없이 원본으로 비교
if original_title and len(original_title) >= 2:
import re as _re2
clean = _re2.sub(r'\s*第\d+期$', '', original_title).strip()
if clean:
keywords.append(clean.lower())
# 중복 제거
seen = set()
unique = []
for k in keywords:
if k not in seen:
seen.add(k)
unique.append(k)
return unique
@staticmethod
def _title_contains_keyword(nyaa_title: str, keywords: list[str]) -> bool:
"""Nyaa 토렌트 제목에 키워드 중 하나라도 포함되는지 체크.
영문 키워드: 특수문자(하이픈, 따옴표) 제거 후 비교.
일본어 키워드: 원본 그대로 비교.
"""
import re as _re
title_lower = nyaa_title.lower()
# 영문 정규화 버전
title_norm = _re.sub(r'[^a-z0-9\s]', '', title_lower)
for kw in keywords:
if not kw or len(kw) < 2:
continue
# ASCII만 포함된 키워드 → 정규화 비교
kw_norm = _re.sub(r'[^a-z0-9\s]', '', kw)
if kw_norm and len(kw_norm) >= 3 and kw_norm in title_norm:
return True
# 비ASCII(일본어 등) → 원본 비교
if not kw.isascii() and kw in title_lower:
return True
return False
async def _add_torrents(
self,
result: DownloadResult,
nas_folder: Path,
episode: Optional[int],
force: bool = False,
):
"""토렌트 추가 처리."""
"""토렌트 추가 — 빠진 에피소드 전부 다운로드.
릴리스 그룹 일관성: NAS 기존 파일의 릴리스 그룹(예: ASW)이 있으면
같은 그룹의 토렌트만 추가. 매칭 없으면 스킵.
"""
if not result.torrents:
result.errors.append("매칭되는 토렌트가 없습니다.")
return
# 에피소드 필터링
candidates = result.torrents
if episode is not None:
candidates = [t for t in candidates if t.episode == episode]
if not candidates:
result.errors.append(f"{episode}화 토렌트를 찾지 못했습니다.")
return
import math
import re as _re
# auto 모드 기본 조건: 자막이 있어야 영상 다운로드 (force면 무시)
if not force and not result.captions and not result.subtitles:
# 자막이 없으면 사용자에게 안내만
result.errors.append("자막이 없어 영상 다운로드를 보류합니다. /anime video로 강제 다운로드 가능")
# NAS 기존 에피소드 + 릴리스 그룹 스캔
existing_eps = set()
existing_groups = [] # 기존 파일들의 릴리스 그룹
if nas_folder.exists():
video_exts = {".mkv", ".mp4", ".avi", ".webm", ".ts"}
for f in nas_folder.iterdir():
if f.suffix.lower() in video_exts:
ep = self._extract_episode(f.stem)
if ep is not None:
existing_eps.add(ep)
# 릴리스 그룹 추출: [ASW], [SubsPlease] 등
m = _re.match(r'\[([^\]]+)\]', f.name)
if m:
existing_groups.append(m.group(1))
if existing_eps:
logger.info(f"NAS 기존 에피소드: {sorted(existing_eps)}")
# 기존 릴리스 그룹 결정 (가장 많이 등장하는 그룹)
required_group = None
if existing_groups:
from collections import Counter
group_counts = Counter(existing_groups)
dominant_group, count = group_counts.most_common(1)[0]
if count >= 2: # 2개 이상 파일에서 동일 그룹이면 확정
required_group = dominant_group
logger.info(f"NAS 릴리스 그룹: [{required_group}] ({count}개 파일)")
# 에피소드별 최고 점수 토렌트 그룹핑
ep_best: dict[int, tuple[int, object]] = {} # ep → (score, torrent)
for t in result.torrents:
ep = self._extract_episode(t.title)
if ep is None:
continue
# 특정 에피소드 요청 시 해당 에피소드만
if episode is not None and ep != episode:
continue
# NAS 중복 스킵
if ep in existing_eps:
continue
# VOSTFR 제외
title_upper = t.title.upper()
if "VOSTFR" in title_upper or "VOSTA" in title_upper:
continue
# 릴리스 그룹 일관성 필터: NAS에 특정 그룹이 있으면 같은 그룹만 허용
if required_group:
if f"[{required_group}]" not in t.title:
continue
# 스코어링
score = 0
if "[ASW]" in t.title:
score += 100
if "HEVC" in title_upper or "X265" in title_upper:
score += 50
if "1080P" in title_upper:
score += 20
if t.seeders > 0:
score += int(math.log(t.seeders) * 5)
if ep not in ep_best or score > ep_best[ep][0]:
ep_best[ep] = (score, t)
if not ep_best:
if episode is not None:
result.errors.append(f"{episode}화 토렌트를 찾지 못했습니다.")
elif required_group:
result.errors.append(
f"새로 다운로드할 에피소드가 없습니다 "
f"([{required_group}] 릴리스 기준, 모두 NAS에 존재하거나 미출시)."
)
else:
result.errors.append("새로 다운로드할 에피소드가 없습니다 (모두 NAS에 존재).")
return
# 최상위 1개 (가장 시더 많은) 추가
best = candidates[0]
try:
success = await self.qbit.add_torrent(
magnet_or_url=best.magnet_link,
save_path=str(nas_folder),
category="anime",
tags=result.anime.subject if result.anime else "",
)
result.torrent_added = success
if not success:
result.errors.append("qBittorrent 토렌트 추가 실패")
except Exception as e:
result.errors.append(f"qBittorrent 오류: {e}")
# 에피소드 순서대로 추가
added_count = 0
for ep in sorted(ep_best.keys()):
_, torrent = ep_best[ep]
try:
success = await self.qbit.add_torrent(
magnet_or_url=torrent.magnet_link,
save_path=str(nas_folder),
category="anime",
tags=result.anime.subject if result.anime else "",
)
if success:
added_count += 1
logger.info(f"토렌트 추가: ep{ep} - {torrent.title[:50]}")
else:
result.errors.append(f"ep{ep} 토렌트 추가 실패")
except Exception as e:
result.errors.append(f"ep{ep} qBittorrent 오류: {e}")
result.torrent_added = added_count > 0
if added_count > 0:
new_eps = sorted(ep_best.keys())
result.message += f"\n📥 {added_count}개 에피소드 추가: {new_eps}"
@staticmethod
def _select_best_torrent(candidates: list, existing_eps: set = None):
"""ASW HEVC 우선으로 최적 토렌트 선택 (검색 결과 표시용).
스코어링:
+100 [ASW] 그룹
+50 HEVC / x265 코덱
+20 1080p 해상도
+log(시더수) * 5 (최대 ~30)
VOSTFR / non-English 릴리스는 완전 제외.
기존 에피소드는 스킵.
"""
import re as _re
import math
if existing_eps is None:
existing_eps = set()
scored = []
for t in candidates:
title_upper = t.title.upper()
if "VOSTFR" in title_upper or "VOSTA" in title_upper:
continue
score = 0
if "[ASW]" in t.title:
score += 100
if "HEVC" in title_upper or "X265" in title_upper:
score += 50
if "1080P" in title_upper:
score += 20
if t.seeders > 0:
score += int(math.log(t.seeders) * 5)
scored.append((score, t))
if not scored:
return None
scored.sort(key=lambda x: x[0], reverse=True)
return scored[0][1]
async def get_status(self) -> list[dict]:
"""현재 다운로드 큐 상태."""
@@ -263,3 +625,163 @@ class AnimePipeline:
except Exception as e:
logger.error(f"qBittorrent 상태 조회 오류: {e}")
return []
async def batch_download(
self,
mode: str = "auto",
sub_filter: bool = True,
) -> list[DownloadResult]:
"""이번 분기 애니 일괄 다운로드.
Args:
mode: 다운로드 모드 ("auto", "sub_only", "video_only")
sub_filter: True면 Anissia에 자막이 등록된 애니만 처리
Returns:
각 애니별 DownloadResult 리스트
"""
# 1. NAS에서 이번 분기 애니 폴더 스캔
current_folders = self.nas.get_current_quarter_anime()
if not current_folders:
logger.warning("이번 분기 NAS 폴더 없음")
return []
logger.info(f"이번 분기 NAS 폴더: {len(current_folders)}")
results = []
for folder in current_folders:
title = folder.title
logger.info(f"\n{'='*40}")
logger.info(f"처리 중: {folder.folder_name}")
try:
# 2. Anissia에서 검색 → 자막 정보 확인
anime_list = await self.anissia.search_anime(title)
if not anime_list:
logger.info(f" Anissia 검색 결과 없음 → 건너뜀")
continue
anime = anime_list[0]
# 3. 자막 필터: Anissia에 자막 제작자가 있는지 확인
if sub_filter:
captions = await self.anissia.get_captions(anime.anime_no)
if not captions:
logger.info(f" 자막 없음 → 건너뜀")
continue
logger.info(f" 자막 {len(captions)}건 발견 → 다운로드 진행")
# 4. 기존 에피소드 확인
from pathlib import Path as _Path
nas_path = _Path(self.nas_base) / folder.folder_name
existing_eps = set()
video_exts = {".mkv", ".mp4", ".avi", ".webm", ".ts"}
if nas_path.exists():
for f in nas_path.iterdir():
if f.suffix.lower() in video_exts:
ep = self._extract_episode(f.stem)
if ep is not None:
existing_eps.add(ep)
# 5. 다운로드 실행
result = await self.download(title, mode=mode)
results.append(result)
status = "" if result.success else ""
logger.info(f" {status} {result.message[:100]}")
except Exception as e:
logger.error(f" 오류 ({folder.folder_name}): {e}")
err_result = DownloadResult(
success=False,
message=f"{folder.folder_name}: 오류 - {e}",
errors=[str(e)],
)
results.append(err_result)
return results
# ── CLI 진입점 ──
if __name__ == "__main__":
import sys
import asyncio
import json
args = sys.argv[1:]
pipeline = AnimePipeline()
async def main():
if not args:
print("사용법: python tools/anime_pipeline.py [search|download|batch|status] [옵션]")
return
if args[0] == "search" and len(args) > 1:
# python tools/anime_pipeline.py search "프리렌"
title = " ".join(args[1:])
result = await pipeline.search(title)
print(result.message)
if result.errors:
print(f"⚠️ 오류: {'; '.join(result.errors)}")
elif args[0] == "download" and len(args) > 1:
# python tools/anime_pipeline.py download "프리렌" [--mode auto] [--episode 10]
title_parts = []
mode = "auto"
episode = None
i = 1
while i < len(args):
if args[i] == "--mode" and i + 1 < len(args):
mode = args[i + 1]
i += 2
elif args[i] == "--episode" and i + 1 < len(args):
episode = int(args[i + 1])
i += 2
else:
title_parts.append(args[i])
i += 1
title = " ".join(title_parts)
result = await pipeline.download(title, mode=mode, episode=episode)
print(result.message)
elif args[0] == "batch":
# python tools/anime_pipeline.py batch [--no-sub-filter] [--mode auto]
mode = "auto"
sub_filter = True
i = 1
while i < len(args):
if args[i] == "--no-sub-filter":
sub_filter = False
i += 1
elif args[i] == "--mode" and i + 1 < len(args):
mode = args[i + 1]
i += 2
else:
i += 1
print(f"📦 이번 분기 배치 다운로드 시작 (자막 필터: {'ON' if sub_filter else 'OFF'})")
results = await pipeline.batch_download(mode=mode, sub_filter=sub_filter)
success = sum(1 for r in results if r.success)
failed = sum(1 for r in results if not r.success)
print(f"\n📊 완료: {success}건 성공, {failed}건 실패 (총 {len(results)}건)")
for r in results:
icon = "" if r.success else ""
title = r.anime.subject if r.anime else "?"
print(f" {icon} {title}: {r.message[:80]}")
elif args[0] == "status":
# python tools/anime_pipeline.py status
status = await pipeline.get_status()
if not status:
print("🎬 다운로드 중인 항목 없음")
else:
for s in status:
print(f" {s['progress']} | {s['name'][:50]} | {s['speed']} | ETA: {s['eta']}")
else:
print("사용법: python tools/anime_pipeline.py [search|download|batch|status] [옵션]")
asyncio.run(main())

View File

@@ -110,11 +110,102 @@ class AnissiaClient:
]
async def search_anime(self, keyword: str) -> list[AnimeInfo]:
"""키워드로 전체 편성표에서 검색 (한글/일어 제목 매칭)."""
"""키워드로 전체 편성표에서 검색 (한글/일어/영문 fuzzy 매칭)."""
import re as _re
all_anime = await self.get_all_schedule()
keyword_lower = keyword.lower()
return [
a for a in all_anime
if keyword_lower in a.subject.lower()
or keyword_lower in a.original_subject.lower()
]
# 특수문자 제거 버전 (따옴표, 괄호 등)
keyword_norm = _re.sub(r'[^\w\s]', '', keyword_lower)
try:
from tools.title_matcher import japanese_to_romaji, title_similarity
use_romaji = True
except ImportError:
use_romaji = False
results = []
fuzzy_candidates = []
for a in all_anime:
subj_lower = a.subject.lower()
orig_lower = a.original_subject.lower()
# 특수문자 제거 버전
subj_norm = _re.sub(r'[^\w\s]', '', subj_lower)
orig_norm = _re.sub(r'[^\w\s]', '', orig_lower)
# 1차: substring 매칭 (원본 + 정규화)
if (keyword_lower in subj_lower or keyword_norm in subj_norm):
results.append(a)
elif (keyword_lower in orig_lower or keyword_norm in orig_norm):
results.append(a)
elif use_romaji:
romaji = japanese_to_romaji(a.original_subject).lower()
# 2차: romaji substring
if keyword_lower in romaji:
results.append(a)
else:
# 3차: 단어 단위 fuzzy — 검색어와 romaji 개별 단어 비교
words = romaji.split()
best_word_sim = max(
(title_similarity(keyword, w) for w in words),
default=0.0,
)
# 전체 문자열 유사도도 참고
full_sim = title_similarity(keyword, romaji)
best_sim = max(best_word_sim, full_sim)
if best_sim >= 0.6:
fuzzy_candidates.append((best_sim, a))
# exact 결과가 없을 때만 fuzzy 결과 사용
if not results and fuzzy_candidates:
fuzzy_candidates.sort(key=lambda x: x[0], reverse=True)
results = [a for _, a in fuzzy_candidates[:10]]
return results
# ── CLI 진입점 ──
if __name__ == "__main__":
import sys
import asyncio
client = AnissiaClient()
args = sys.argv[1:]
async def main():
if not args:
print("사용법: python tools/anissia_client.py [schedule|search|captions] [인자]")
return
if args[0] == "schedule":
# python tools/anissia_client.py schedule 3 (수요일)
week = int(args[1]) if len(args) > 1 else 0
anime_list = await client.get_schedule(week)
day = WEEK_NAMES.get(week, "?")
print(f"📺 {day}요일 편성표 ({len(anime_list)}개):")
for a in anime_list:
cap = f"자막 {a.caption_count}" if a.caption_count else "자막 없음"
print(f" {a.time} {a.subject} ({a.original_subject}) [{cap}]")
elif args[0] == "search" and len(args) > 1:
# python tools/anissia_client.py search "프리렌"
keyword = " ".join(args[1:])
results = await client.search_anime(keyword)
print(f"🔍 '{keyword}' 검색 결과 ({len(results)}개):")
for a in results:
print(f" [{a.anime_no}] {a.subject} ({a.original_subject}) | {WEEK_NAMES.get(a.week, '?')} {a.time}")
elif args[0] == "captions" and len(args) > 1:
# python tools/anissia_client.py captions 12345
anime_no = int(args[1])
captions = await client.get_captions(anime_no)
print(f"📝 자막 목록 ({len(captions)}건):")
for c in captions:
print(f" {c.episode}화 | {c.name} | {c.website} | {c.updated}")
else:
print("사용법: python tools/anissia_client.py [schedule|search|captions] [인자]")
asyncio.run(main())

View File

@@ -150,3 +150,44 @@ class NasScanner:
"total_size_gb": round(sum(f.total_size_gb for f in folders), 2),
"folders": folders,
}
# ── CLI 진입점 ──
if __name__ == "__main__":
import sys
import json
scanner = NasScanner()
args = sys.argv[1:]
if not args or args[0] == "scan":
# python tools/nas_scanner.py scan [--year 26] [--quarter 1]
year = quarter = None
for i, a in enumerate(args):
if a == "--year" and i + 1 < len(args):
year = int(args[i + 1])
if a == "--quarter" and i + 1 < len(args):
quarter = int(args[i + 1])
folders = scanner.list_anime_folders(year=year, quarter=quarter)
for f in folders:
print(f"📁 {f.folder_name} | 영상 {f.video_count}개 | 자막 {f.subtitle_count}개 | {f.total_size_gb:.1f}GB")
print(f"\n{len(folders)}개 애니, 영상 {sum(f.video_count for f in folders)}")
elif args[0] == "search" and len(args) > 1:
# python tools/nas_scanner.py search "프리렌"
keyword = " ".join(args[1:])
results = scanner.search(keyword)
for f in results:
print(f"📁 {f.folder_name} | 영상 {f.video_count}개 | 자막 {f.subtitle_count}")
if not results:
print(f"'{keyword}' 검색 결과 없음")
elif args[0] == "summary":
# python tools/nas_scanner.py summary
summary = scanner.get_summary()
print(json.dumps(summary, ensure_ascii=False, indent=2, default=str))
else:
print("사용법: python tools/nas_scanner.py [scan|search|summary] [옵션]")

View File

@@ -154,3 +154,48 @@ class NyaaClient:
# 시더 수 내림차순 정렬
results.sort(key=lambda r: r.seeders, reverse=True)
return results
# ── CLI 진입점 ──
if __name__ == "__main__":
import sys
import asyncio
args = sys.argv[1:]
client = NyaaClient()
async def main():
if not args or args[0] == "search":
# python tools/nyaa_client.py search "Sousou no Frieren" [--suffix "ASW HEVC"]
query_parts = []
suffix = "ASW HEVC"
i = 1 if args and args[0] == "search" else 0
while i < len(args):
if args[i] == "--suffix" and i + 1 < len(args):
suffix = args[i + 1]
i += 2
elif args[i] == "--no-suffix":
suffix = ""
i += 1
else:
query_parts.append(args[i])
i += 1
if not query_parts:
print("사용법: python tools/nyaa_client.py search \"제목\" [--suffix \"ASW HEVC\"]")
return
query = " ".join(query_parts)
client.default_suffix = suffix
results = await client.search(query, use_default_suffix=bool(suffix))
print(f"🔍 Nyaa 검색: '{query}' +'{suffix}'{len(results)}")
for r in results[:20]:
ep = f" {r.episode}" if r.episode else ""
print(f" [{r.group}] {r.title[:60]}... | {r.size} | S:{r.seeders}{ep}")
print(f" magnet: {r.magnet_link[:80]}...")
else:
print("사용법: python tools/nyaa_client.py search \"제목\" [--suffix \"ASW HEVC\"]")
asyncio.run(main())

View File

@@ -196,3 +196,66 @@ class QBitClient:
}
except Exception as e:
return {"connected": False, "error": str(e), "url": self.url}
async def delete_torrent(self, info_hash: str, delete_files: bool = False) -> bool:
"""토렌트 삭제 (완료 후 정리용)."""
await self._ensure_login()
async with httpx.AsyncClient(timeout=10) as client:
resp = await client.post(
f"{self.url}/api/v2/torrents/delete",
data={
"hashes": info_hash,
"deleteFiles": str(delete_files).lower(),
},
cookies=self._cookies(),
)
return resp.status_code == 200
# ── CLI 진입점 ──
if __name__ == "__main__":
import sys
import asyncio
args = sys.argv[1:]
client = QBitClient()
async def main():
if not args or args[0] == "status":
# python tools/qbit_client.py status
torrents = await client.list_torrents(category="anime")
if not torrents:
print("🎬 다운로드 중인 애니 없음")
return
print(f"🎬 다운로드 현황 ({len(torrents)}건):")
for t in torrents:
speed = f"{t.download_speed / (1024**2):.1f}MB/s" if t.download_speed > 0 else "-"
eta = f"{t.eta // 60}" if t.eta > 0 else ""
print(f" {t.progress*100:.0f}% | {t.name[:50]} | {speed} | ETA: {eta}")
elif args[0] == "add" and len(args) > 1:
# python tools/qbit_client.py add "magnet:..." --path "\\NAS\path"
magnet = args[1]
path = ""
for i, a in enumerate(args):
if a == "--path" and i + 1 < len(args):
path = args[i + 1]
ok = await client.add_torrent(magnet, save_path=path)
print(f"{'✅ 추가 성공' if ok else '❌ 추가 실패'}")
elif args[0] == "delete" and len(args) > 1:
# python tools/qbit_client.py delete <hash> [--files]
hash_ = args[1]
delete_files = "--files" in args
ok = await client.delete_torrent(hash_, delete_files=delete_files)
print(f"{'✅ 삭제 성공' if ok else '❌ 삭제 실패'}")
elif args[0] == "test":
info = await client.test_connection()
print(f"연결: {'' if info['connected'] else ''} {info}")
else:
print("사용법: python tools/qbit_client.py [status|add|delete|test] [옵션]")
asyncio.run(main())

View File

@@ -51,20 +51,16 @@ def _extract_episode_from_text(text: str) -> Optional[int]:
def parse_google_drive_links(html: str) -> list[SubtitleFile]:
"""HTML에서 Google Drive 다운로드 링크 추출.
패턴: drive.google.com/file/d/{fileId}/view
→ 직접 다운로드: drive.google.com/uc?id={fileId}&export=download
지원 패턴:
1. drive.google.com/file/d/{fileId}/view
2. drive.google.com/uc?id={fileId}&export=download (직접 다운로드)
"""
pattern = r'https://drive\.google\.com/file/d/([a-zA-Z0-9_-]+)/view[^"]*'
matches = re.findall(pattern, html)
# 링크 주변 텍스트에서 에피소드 정보 추출
link_pattern = r'<a[^>]*href="(https://drive\.google\.com/file/d/[^"]+)"[^>]*>([^<]*)</a>'
link_matches = re.findall(link_pattern, html)
results = []
seen_ids = set()
for url, text in link_matches:
# 패턴 1: /file/d/{id}/view — HTML <a> 태그
link_pattern = r'<a[^>]*href="(https://drive\.google\.com/file/d/[^"]+)"[^>]*>([^<]*)</a>'
for url, text in re.findall(link_pattern, html):
m = re.search(r'/d/([a-zA-Z0-9_-]+)/', url)
if not m:
continue
@@ -72,19 +68,15 @@ def parse_google_drive_links(html: str) -> list[SubtitleFile]:
if file_id in seen_ids:
continue
seen_ids.add(file_id)
episode = _extract_episode_from_text(text)
download_url = f"https://drive.google.com/uc?id={file_id}&export=download"
results.append(SubtitleFile(
filename=text.strip() or f"subtitle_{file_id}",
download_url=download_url,
download_url=f"https://drive.google.com/uc?id={file_id}&export=download",
platform="google_drive",
episode=episode,
episode=_extract_episode_from_text(text),
))
# 매칭되지 않은 bare ID도 추가
for file_id in matches:
# 패턴 1: bare ID (태그 밖)
for file_id in re.findall(r'drive\.google\.com/file/d/([a-zA-Z0-9_-]+)/', html):
if file_id not in seen_ids:
seen_ids.add(file_id)
results.append(SubtitleFile(
@@ -93,6 +85,23 @@ def parse_google_drive_links(html: str) -> list[SubtitleFile]:
platform="google_drive",
))
# 패턴 2: uc?id={id} 직접 다운로드 URL (Blogspot 등)
uc_pattern = r'drive\.google\.com/uc\?[^"\s\)]*id=([a-zA-Z0-9_-]+)[^"\s\)]*'
for file_id in re.findall(uc_pattern, html):
if file_id in seen_ids:
continue
seen_ids.add(file_id)
# 주변 텍스트에서 파일명 추출 시도 (마크다운: [파일명](url))
md_pattern = r'\[([^\]]+)\]\([^)]*' + re.escape(file_id) + r'[^)]*\)'
md_match = re.search(md_pattern, html)
filename = md_match.group(1).strip() if md_match else f"subtitle_{file_id}"
results.append(SubtitleFile(
filename=filename,
download_url=f"https://drive.google.com/uc?id={file_id}&export=download",
platform="google_drive",
episode=_extract_episode_from_text(filename),
))
return results
@@ -205,20 +214,32 @@ class SubtitleDownloader:
if "download.blog.naver.com" in html or "blogfiles.pstatic.net" in html:
results.extend(parse_naver_links(html))
# 범용: 직접 자막 파일 링크 탐지
generic_pattern = r'href="([^"]+\.(?:ass|srt|ssa|sub|zip|7z))"'
generic = re.findall(generic_pattern, html, re.IGNORECASE)
# 범용: 직접 자막 파일 링크 탐지 (HTML href + 마크다운)
seen_urls = {r.download_url for r in results}
for gurl in generic:
# HTML <a href="...">
for gurl in re.findall(r'href="([^"]+\.(?:ass|srt|ssa|sub|zip|7z)(?:\?[^"]*)?)"', html, re.IGNORECASE):
if gurl not in seen_urls:
filename = gurl.split("/")[-1].split("?")[0]
seen_urls.add(gurl)
filename = unquote(gurl.split("/")[-1].split("?")[0])
results.append(SubtitleFile(
filename=unquote(filename),
filename=filename,
download_url=gurl,
platform="generic",
episode=_extract_episode_from_text(filename),
))
# 마크다운 [텍스트](url) — Blogspot 등
for text, gurl in re.findall(r'\[([^\]]+)\]\((https?://[^)]+\.(?:ass|srt|ssa|sub|zip|7z)[^)]*)\)', html, re.IGNORECASE):
if gurl not in seen_urls:
seen_urls.add(gurl)
results.append(SubtitleFile(
filename=text.strip(),
download_url=gurl,
platform="generic",
episode=_extract_episode_from_text(text),
))
logger.info(f"자막 {len(results)}건 발견: {url}")
return results
@@ -227,7 +248,7 @@ class SubtitleDownloader:
sub: SubtitleFile,
save_dir: Optional[str] = None,
) -> str:
"""자막 파일 다운로드 → 로컬 저장. 저장 경로 반환."""
"""자막 파일 다운로드 → 로컬 저장. ZIP이면 자동 해제. 저장 경로 반환."""
target_dir = Path(save_dir) if save_dir else self.download_dir
target_dir.mkdir(parents=True, exist_ok=True)
@@ -248,13 +269,55 @@ class SubtitleDownloader:
# Content-Disposition에서 실제 파일명 추출
cd = resp.headers.get("content-disposition", "")
if "filename" in cd:
m = re.search(r'filename[*]?=["\']?(?:UTF-8\'\')?([^"\';\n]+)', cd)
m = re.search(r'filename[*]?=["\']?(?:UTF-8\'\')?([^"\';\\n]+)', cd)
if m:
sub.filename = unquote(m.group(1).strip())
filepath = target_dir / sub.filename
filepath.write_bytes(resp.content)
# ZIP/7z 자동 해제
extracted = self._extract_archive(filepath, target_dir)
if extracted:
sub.local_path = extracted[0] # 첫 번째 자막 파일
sub.filename = Path(extracted[0]).name
logger.info(f"자막 ZIP 해제 완료: {len(extracted)}건 → {target_dir}")
return extracted[0]
sub.local_path = str(filepath)
logger.info(f"자막 다운로드 완료: {filepath}")
return str(filepath)
@staticmethod
def _extract_archive(filepath: Path, target_dir: Path) -> list[str]:
"""ZIP/7z 파일 해제 → 자막 파일(.ass/.srt/.ssa/.sub) 경로 리스트 반환."""
import zipfile
suffix = filepath.suffix.lower()
if suffix not in (".zip", ".7z"):
return []
extracted = []
if suffix == ".zip":
try:
with zipfile.ZipFile(filepath, "r") as zf:
for name in zf.namelist():
# 디렉토리 건너뛰기
if name.endswith("/"):
continue
ext = Path(name).suffix.lower()
if ext in (".ass", ".srt", ".ssa", ".sub"):
# 중첩 폴더 무시, 파일만 추출
out_name = Path(name).name
out_path = target_dir / out_name
with zf.open(name) as src, open(out_path, "wb") as dst:
dst.write(src.read())
extracted.append(str(out_path))
# ZIP 원본 삭제
filepath.unlink(missing_ok=True)
except (zipfile.BadZipFile, Exception) as e:
logger.warning(f"ZIP 해제 실패: {filepath} - {e}")
return extracted

View File

@@ -10,9 +10,70 @@ import unicodedata
from difflib import SequenceMatcher
from typing import Optional
import httpx
logger = logging.getLogger("variet.tools.matcher")
# ──────────────────────────────────────────────
# 영어 제목 조회 (Jikan API / MyAnimeList)
# ──────────────────────────────────────────────
async def fetch_english_title(japanese_title: str) -> dict[str, str]:
"""Jikan API로 일본어 원제의 영어/로마자 제목 조회.
Returns:
{"default": "Sousou no Frieren 2nd Season",
"english": "Frieren: Beyond Journey's End Season 2",
"synonyms": ["Frieren at the Funeral Season 2"]}
실패 시 빈 dict.
"""
try:
async with httpx.AsyncClient(timeout=10) as client:
resp = await client.get(
"https://api.jikan.moe/v4/anime",
params={"q": japanese_title, "limit": 5},
)
resp.raise_for_status()
data = resp.json()
items = data.get("data", [])
if not items:
return {}
# 원제와 가장 잘 매칭되는 항목 선택
best = None
best_score = 0.0
for item in items:
jp = item.get("title_japanese", "")
score = SequenceMatcher(None, japanese_title, jp).ratio()
if score > best_score:
best_score = score
best = item
if not best or best_score < 0.5:
return {}
result = {
"default": best.get("title", ""),
"english": best.get("title_english") or "",
"synonyms": [],
}
for t in best.get("titles", []):
if t["type"] == "Synonym":
result["synonyms"].append(t["title"])
logger.info(
f"Jikan 영어 제목 조회: {japanese_title}"
f"default={result['default']}, english={result['english']}"
)
return result
except Exception as e:
logger.warning(f"Jikan API 조회 실패: {e}")
return {}
# ──────────────────────────────────────────────
# 일어 → 로마자 변환 테이블 (히라가나/카타카나)
# ──────────────────────────────────────────────
@@ -66,24 +127,35 @@ def _kata_to_hira(text: str) -> str:
def japanese_to_romaji(text: str) -> str:
"""일본어 텍스트를 로마자로 근사 변환."""
"""일본어 텍스트를 로마자로 변환 (pykakasi 기반, fallback: 카나 테이블)."""
try:
import pykakasi
kks = pykakasi.kakasi()
result_items = kks.convert(text)
romaji = " ".join(item["hepburn"] for item in result_items)
# 연속 공백 정리
romaji = re.sub(r'\s+', ' ', romaji).strip()
return romaji
except ImportError:
logger.warning("pykakasi 미설치 — 카나 테이블 fallback 사용")
return _japanese_to_romaji_fallback(text)
def _japanese_to_romaji_fallback(text: str) -> str:
"""일본어→로마자 fallback (카나만 변환, 한자는 그대로)."""
text = _kata_to_hira(text)
result = []
i = 0
while i < len(text):
# 장음 기호 (ー U+30FC, ー가 히라가나로 안 변환되므로 여기서 처리)
if text[i] == '\u30FC': # ー
# 장음: 이전 모음 반복 (간략화: 스킵)
i += 1
continue
# 2글자 매칭 우선 (きゃ 등)
if i + 1 < len(text) and text[i:i+2] in _KANA_ROMAJI:
result.append(_KANA_ROMAJI[text[i:i+2]])
i += 2
elif text[i] in _KANA_ROMAJI:
romaji = _KANA_ROMAJI[text[i]]
# 촉음(っ) 처리: 다음 자음 반복
if text[i] == '' and i + 1 < len(text):
next_romaji = _KANA_ROMAJI.get(text[i+1], "")
if next_romaji:
@@ -92,13 +164,13 @@ def japanese_to_romaji(text: str) -> str:
result.append(romaji)
i += 1
else:
# 한자, 영어, 숫자 등 → 그대로
result.append(text[i])
i += 1
return "".join(result)
def normalize_title(title: str) -> str:
"""제목 비교용 정규화: 소문자 + 특수문자 제거 + 공백 정리."""
title = title.lower().strip()
@@ -155,8 +227,14 @@ def match_titles(
best_sim = max(sim_romaji, sim_korean, sim_original)
if best_sim >= threshold:
scored.append((best_sim, result))
# ASW HEVC 릴리스는 threshold 면제 (약칭으로 올라와도 포함)
title_upper = result.title.upper()
is_preferred = "[ASW]" in result.title and ("HEVC" in title_upper or "X265" in title_upper)
if best_sim >= threshold or is_preferred:
# ASW HEVC 릴리스는 유사도 보너스 (+0.5) → 정렬 시 상위 배치
effective_sim = best_sim + 0.5 if is_preferred else best_sim
scored.append((effective_sim, result))
# 유사도 내림차순 정렬
scored.sort(key=lambda x: x[0], reverse=True)