Files
variet-agent/tools/anime_pipeline.py

1286 lines
50 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""애니메이션 자동화 파이프라인.
전체 흐름:
1. resolve() — 4요소 세트 완성 (NAS폴더, 파일현황, Anissia명, Nyaa명)
2. download() — 완성된 WorkUnit으로 다운로드 실행
3. batch_download() — resolve() 반복 + download() 실행
"""
import asyncio
import logging
import math
import re
from collections import Counter
from dataclasses import dataclass, field
from pathlib import Path
from typing import Optional
import config
from tools.anissia_client import AnissiaClient, AnimeInfo, CaptionInfo
from tools.nyaa_client import NyaaClient, TorrentResult
from tools.qbit_client import QBitClient
from tools.subtitle_downloader import SubtitleDownloader, SubtitleFile
from tools.title_matcher import (
match_titles, make_nas_folder_name, rename_subtitle_to_video,
fetch_english_title, fetch_title_via_jikan, web_search_anime_title,
)
logger = logging.getLogger("variet.tools.pipeline")
VIDEO_EXTS = {".mkv", ".mp4", ".avi", ".webm", ".m4v", ".ts"}
SUB_EXTS = {".ass", ".srt", ".ssa", ".sub", ".smi"}
# ──────────────────────────────────────────
# 데이터 모델
# ──────────────────────────────────────────
@dataclass
class AnimeWorkUnit:
"""다운로드 작업 1건의 완전한 정보 세트.
resolve()가 이 세트를 완성한 뒤에만 download()가 실행됨.
"""
# NAS 정보
nas_folder: str = "" # "[26_1분기]장송의프리렌2기"
nas_path: Path = field(default_factory=Path)
existing_videos: list[str] = field(default_factory=list)
existing_subs: list[str] = field(default_factory=list)
existing_eps: set[int] = field(default_factory=set)
release_group: str = "" # "ASW"
release_name: str = "" # "Sousou no Frieren S2"
episode_offset: int = 0 # 절대번호→시즌번호 변환 오프셋 (예: 12)
# Anissia 정보
anime: Optional[AnimeInfo] = None
captions: list[CaptionInfo] = field(default_factory=list)
# Nyaa 검색 정보
nyaa_keywords: list[str] = field(default_factory=list)
torrents: list[TorrentResult] = field(default_factory=list)
@dataclass
class DownloadResult:
"""파이프라인 실행 결과."""
success: bool
anime: Optional[AnimeInfo] = None
captions: list[CaptionInfo] = field(default_factory=list)
torrents: list[TorrentResult] = field(default_factory=list)
subtitles: list[SubtitleFile] = field(default_factory=list)
nas_folder: str = ""
torrent_added: bool = False
torrent_hashes: list[str] = field(default_factory=list)
message: str = ""
errors: list[str] = field(default_factory=list)
# ──────────────────────────────────────────
# 파이프라인
# ──────────────────────────────────────────
class AnimePipeline:
"""애니메이션 다운로드 자동화 파이프라인."""
def __init__(self):
self.anissia = AnissiaClient()
self.nyaa = NyaaClient()
self.qbit = QBitClient()
self.sub_downloader = SubtitleDownloader()
self.nas_base = getattr(config, "NAS_ANIME_PATH",
r"\\192.168.10.10\NasData\Video\Animation")
from tools.nas_scanner import NasScanner
self.nas = NasScanner(self.nas_base)
# 캐시 (세션당 1회 로드)
self._schedule_cache: list[AnimeInfo] | None = None
self._nas_folder_cache: list | None = None
# ──────────────────────────────────────
# 1단계: resolve — 4요소 세트 완성
# ──────────────────────────────────────
async def resolve(self, title: str) -> AnimeWorkUnit | None:
"""애니 제목에서 4요소 세트(NAS/Anissia/Nyaa/파일현황)를 완성합니다.
Fallback 체인:
1. Anissia 직접 검색 (compact 매칭 포함)
2. 웹 검색 — DuckDuckGo 한글+"애니" → 후보 → Anissia 재검색
3. NAS 파일명(영문) → Jikan(일본어) → Anissia 재검색
Returns:
완성된 AnimeWorkUnit, 또는 매칭 실패 시 None
"""
unit = AnimeWorkUnit()
# ── Step 1: NAS 기존 폴더 스캔 (1회) ──
nas_folder_obj = self._find_existing_nas_folder(title)
if nas_folder_obj:
unit.nas_folder = nas_folder_obj.folder_name
unit.nas_path = Path(nas_folder_obj.full_path)
unit.existing_videos = list(nas_folder_obj.video_files)
unit.existing_subs = list(nas_folder_obj.subtitle_files)
# 에피소드 번호 추출
for vf in unit.existing_videos:
ep = self._extract_episode(vf)
if ep is not None:
unit.existing_eps.add(ep)
# 릴리스 그룹 + 이름 추출
if unit.existing_videos:
unit.release_name = self._extract_release_name(unit.existing_videos[0])
groups = []
for vf in unit.existing_videos:
m = re.match(r'\[([^\]]+)\]', vf)
if m:
groups.append(m.group(1))
if groups:
unit.release_group = Counter(groups).most_common(1)[0][0]
if unit.existing_eps:
logger.info(
f"NAS 기존 폴더: {unit.nas_folder} | "
f"영상:{len(unit.existing_videos)} 자막:{len(unit.existing_subs)} "
f"그룹:[{unit.release_group}]"
)
# ── Step 2: Anissia 매칭 (fallback 체인) ──
anime = await self._resolve_anissia(title, unit)
if not anime:
logger.warning(f"Anissia 매칭 실패: '{title}'")
return None
unit.anime = anime
# NAS 폴더 재검증 — start_date로 정확한 시즌 매칭
if anime.start_date:
correct_folder = self._find_existing_nas_folder(
anime.subject, start_date=anime.start_date,
)
if correct_folder and correct_folder.folder_name != unit.nas_folder:
logger.info(
f"NAS 폴더 교정: {unit.nas_folder}{correct_folder.folder_name}"
)
unit.nas_folder = correct_folder.folder_name
unit.nas_path = Path(correct_folder.full_path)
unit.existing_videos = list(correct_folder.video_files)
unit.existing_subs = list(correct_folder.subtitle_files)
unit.existing_eps = set()
for vf in unit.existing_videos:
ep = self._extract_episode(vf)
if ep is not None:
unit.existing_eps.add(ep)
if unit.existing_videos:
unit.release_name = self._extract_release_name(
unit.existing_videos[0]
)
groups = []
for v in unit.existing_videos:
m = re.match(r'\[([^\]]+)\]', v)
if m:
groups.append(m.group(1))
if groups:
from collections import Counter as _Counter
unit.release_group = _Counter(groups).most_common(1)[0][0]
# NAS 폴더가 없으면 Anissia 정보로 생성
if not unit.nas_folder:
unit.nas_folder = make_nas_folder_name(anime.subject, anime.start_date)
unit.nas_path = Path(self.nas_base) / unit.nas_folder
# ── Step 3: 자막 정보 ──
try:
unit.captions = await self.anissia.get_captions(anime.anime_no)
except Exception as e:
logger.warning(f"자막 조회 오류: {e}")
# ── Step 4: Nyaa 키워드 + 토렌트 확보 ──
await self._resolve_nyaa(unit)
# ── Step 5: 에피소드 번호 오프셋 감지 ──
self._detect_episode_offset(unit)
logger.info(
f"WorkUnit 완성: {anime.subject} | "
f"NAS:{unit.nas_folder} | sub:{len(unit.captions)} | "
f"tor:{len(unit.torrents)} | offset:{unit.episode_offset}"
)
return unit
async def _resolve_anissia(
self, title: str, unit: AnimeWorkUnit
) -> AnimeInfo | None:
"""다단계 fallback으로 Anissia 애니 매칭."""
# 1차: Anissia 직접 검색 (compact 매칭 포함)
anime_list = await self._search_anissia(title)
if anime_list:
return anime_list[0]
# 2차: 웹 검색 — 한글+"애니" → 후보 → Anissia 재검색
web_candidates = await web_search_anime_title(title)
for candidate in web_candidates:
logger.info(f"웹 검색 후보 → Anissia: '{candidate}'")
anime_list = await self._search_anissia(candidate)
if anime_list:
return anime_list[0]
# 3차: NAS 파일명(영문) → Jikan(일본어) → Anissia
if unit.release_name:
logger.info(f"NAS 파일명 fallback: '{unit.release_name}'")
jikan = await fetch_title_via_jikan(unit.release_name)
if jikan:
for key in ("japanese", "default"):
alt_title = jikan.get(key, "")
if not alt_title:
continue
logger.info(f"Jikan {key}: {alt_title}")
anime_list = await self._search_anissia(alt_title)
if anime_list:
return anime_list[0]
return None
async def _search_anissia(self, keyword: str) -> list[AnimeInfo]:
"""Anissia 검색 (에러 무시)."""
try:
return await self.anissia.search_anime(keyword)
except Exception:
return []
async def _resolve_nyaa(self, unit: AnimeWorkUnit):
"""Nyaa 토렌트 검색 — 기존 릴리스명 또는 Jikan 제목 사용."""
try:
# 전략 1: 기존 릴리스명이 있으면 그대로 검색 (가장 정확)
if unit.release_name:
logger.info(f"Nyaa 검색 (릴리스명): '{unit.release_name}'")
found = await self.nyaa.search(unit.release_name, use_default_suffix=False)
matched = [t for t in found
if self._title_contains_keyword(t.title, [unit.release_name.lower()])]
if matched:
# ASW/HEVC 우선 정렬 → 최대 50건
matched.sort(key=lambda t: (
0 if '[ASW]' in t.title else 1,
0 if 'HEVC' in t.title.upper() else 1,
))
unit.torrents = matched[:50]
unit.nyaa_keywords = [unit.release_name.lower()]
logger.info(f"Nyaa 릴리스명 → {len(found)}건 중 {len(matched)}건 매칭")
return
# 전략 2: Jikan 영어 제목 + ASW HEVC 검색
if unit.anime:
eng_titles = await fetch_english_title(unit.anime.original_subject)
eng_default = eng_titles.get("default", "")
eng_english = eng_titles.get("english", "")
synonyms = eng_titles.get("synonyms", [])
keywords = self._build_match_keywords(
eng_default, eng_english, synonyms, unit.anime.original_subject,
)
unit.nyaa_keywords = keywords
logger.info(f"매칭 키워드: {keywords}")
asw_results = await self.nyaa.search("ASW HEVC", use_default_suffix=False)
matched = [t for t in asw_results
if self._title_contains_keyword(t.title, keywords)]
if matched:
logger.info(f"ASW HEVC → {len(asw_results)}건 중 {len(matched)}건 매칭")
unit.torrents = matched[:30]
except Exception as e:
logger.warning(f"Nyaa 검색 오류: {e}")
def _detect_episode_offset(self, unit: AnimeWorkUnit):
"""Nyaa 토렌트의 에피소드 번호 체계 감지 → 오프셋 설정.
- S2/S3 태그가 있으면 → 시즌 상대 번호 (오프셋 불필요)
- 없으면 → AniList prequel 체인으로 오프셋 계산
- Nyaa min_ep > offset이면 절대 번호 확정
"""
if not unit.torrents or not unit.anime:
return
# ASW 토렌트 중심으로 S-tag 감지
asw_torrents = [t for t in unit.torrents if '[ASW]' in t.title]
check_torrents = asw_torrents[:10] if asw_torrents else unit.torrents[:10]
has_season_tag = any(
re.search(r'\bS\d+\b|Season\s*\d+', t.title, re.IGNORECASE)
for t in check_torrents
)
if has_season_tag:
logger.info("에피소드 번호: 시즌 상대 번호 (S태그 감지)")
return # offset = 0
# S태그 없음 → AniList에서 prequel 체인으로 offset 계산
try:
offset = self._get_anilist_offset(unit.release_name or
unit.anime.original_subject)
if offset <= 0:
return
# Nyaa ep 번호가 실제로 offset보다 큰지 확인 (절대 번호 확정)
eps = [self._extract_episode(t.title) for t in check_torrents]
eps = [e for e in eps if e is not None]
if not eps:
return
min_ep = min(eps)
if min_ep > offset:
unit.episode_offset = offset
logger.info(
f"에피소드 번호: 절대 번호 감지 (offset={offset}, "
f"Nyaa min_ep={min_ep})"
)
else:
logger.info(
f"에피소드 번호: S태그 없지만 시즌 상대 번호 "
f"(offset={offset}, Nyaa min_ep={min_ep})"
)
except Exception as e:
logger.warning(f"에피소드 오프셋 감지 실패: {e}")
def _get_anilist_offset(self, title: str) -> int:
"""AniList prequel 체인으로 시즌 에피소드 오프셋 계산."""
import httpx
search_q = '''
query ($search: String) {
Page(page: 1, perPage: 10) {
media(search: $search, type: ANIME, format_in: [TV, TV_SHORT]) {
id
title { romaji }
episodes
relations {
edges {
relationType
node { id format }
}
}
}
}
}
'''
detail_q = '''
query ($id: Int) {
Media(id: $id, type: ANIME) {
id
title { romaji }
episodes
format
relations {
edges {
relationType
node { id format }
}
}
}
}
'''
resp = httpx.post(
'https://graphql.anilist.co',
json={'query': search_q, 'variables': {'search': title}},
timeout=15,
)
results = resp.json().get('data', {}).get('Page', {}).get('media', [])
if not results:
return 0
# prequel이 있는 결과 우선 (2기 이상)
media = None
for r in results:
edges = r.get('relations', {}).get('edges', [])
has_prequel = any(
e['relationType'] == 'PREQUEL'
and e['node'].get('format') in ('TV', 'TV_SHORT', None)
for e in edges
)
if has_prequel:
media = r
break
if not media:
return 0 # prequel 없음 = 1기 또는 단일 시즌
# prequel 체인 역추적
chain_eps = []
current = media
while True:
edges = current.get('relations', {}).get('edges', [])
prequels = [
e for e in edges
if e['relationType'] == 'PREQUEL'
and e['node'].get('format') in ('TV', 'TV_SHORT', None)
]
if not prequels:
break
prequel_id = prequels[0]['node']['id']
resp2 = httpx.post(
'https://graphql.anilist.co',
json={'query': detail_q, 'variables': {'id': prequel_id}},
timeout=15,
)
current = resp2.json().get('data', {}).get('Media')
if not current:
break
chain_eps.append(current.get('episodes', 0) or 0)
return sum(chain_eps)
# ──────────────────────────────────────
# 2단계: search — 검색 결과 표시용 (기존 호환)
# ──────────────────────────────────────
async def search(self, title: str) -> DownloadResult:
"""애니 검색 — 정보 + 자막 + 토렌트 현황 표시.
실제 다운로드 없이 검색 결과만 반환.
"""
unit = await self.resolve(title)
if not unit:
return DownloadResult(
success=False,
message=f"'{title}' 검색 결과가 없습니다.",
)
result = DownloadResult(
success=True,
anime=unit.anime,
captions=unit.captions,
torrents=unit.torrents,
nas_folder=unit.nas_folder,
)
result.message = (
f"**{unit.anime.subject}** ({unit.anime.original_subject})\n"
f"자막 제작자: {len(unit.captions)}명 | "
f"토렌트: {len(unit.torrents)}\n"
f"NAS 폴더: `{unit.nas_folder}`"
)
if not unit.torrents:
result.errors.append(
f"⚠️ ASW HEVC 릴리스가 없습니다.\n"
f"Nyaa에서 직접 검색해주세요."
)
return result
# ──────────────────────────────────────
# 3단계: download — WorkUnit 기반 다운로드
# ──────────────────────────────────────
async def download(
self,
title: str,
mode: str = "auto",
episode: Optional[int] = None,
) -> DownloadResult:
"""애니 다운로드 실행.
Args:
title: 한글 제목
mode: "auto" | "sub_required" | "sub_only" | "video_only"
episode: 특정 에피소드 (None이면 빠진 것 전부)
"""
# resolve로 WorkUnit 완성
unit = await self.resolve(title)
if not unit:
return DownloadResult(
success=False,
message=f"'{title}' 검색 결과가 없습니다.",
)
return await self._execute_download(unit, mode, episode)
async def _execute_download(
self,
unit: AnimeWorkUnit,
mode: str = "auto",
episode: Optional[int] = None,
) -> DownloadResult:
"""완성된 WorkUnit으로 다운로드 실행 (추가 검색/스캔 없음)."""
result = DownloadResult(
success=True,
anime=unit.anime,
captions=unit.captions,
torrents=unit.torrents,
nas_folder=unit.nas_folder,
)
result.message = (
f"**{unit.anime.subject}** ({unit.anime.original_subject})\n"
f"자막: {len(unit.captions)}명 | 토렌트: {len(unit.torrents)}"
)
nas_folder = unit.nas_path
# ── 자막 다운로드 ──
if mode in ("auto", "sub_only", "sub_required"):
await self._download_subtitles(result, unit, episode)
# ── 영상 토렌트 추가 ──
if mode in ("auto", "video_only"):
await self._add_torrents(result, unit, episode)
elif mode == "sub_required":
if result.subtitles:
await self._add_torrents(result, unit, episode)
else:
result.errors.append("자막이 없어 영상 다운로드를 보류합니다.")
# ── 토렌트 완료 대기 + 자막 리네임 + 정리 ──
if result.torrent_hashes:
await self._wait_and_cleanup_torrents(result, nas_folder)
# ── 항상: 기존+신규 자막을 영상 파일명에 맞게 리네임 ──
try:
self._rename_subtitles_to_match_videos(nas_folder, result, offset=unit.episode_offset)
except Exception as e:
logger.warning(f"자막 리네임 오류: {e}")
# 결과 메시지
parts = [result.message]
if result.subtitles:
parts.append(f"\n📝 자막 {len(result.subtitles)}건 다운로드 완료")
if result.torrent_added:
parts.append(f"\n🎬 토렌트 다운로드 완료 → `{nas_folder}`")
if result.errors:
parts.append(f"\n⚠️ 오류: " + "; ".join(result.errors))
result.message = "\n".join(parts)
return result
# ──────────────────────────────────────
# 4단계: batch_download — resolve + 일괄 실행
# ──────────────────────────────────────
async def batch_download(
self,
mode: str = "auto",
sub_filter: bool = True,
) -> list[DownloadResult]:
"""이번 분기 애니 일괄 다운로드.
Args:
mode: "auto" | "sub_only" | "video_only"
sub_filter: True면 자막 없는 애니는 영상만 다운 (video_only 모드)
"""
# 1. NAS 이번 분기 폴더 스캔
current_folders = self.nas.get_current_quarter_anime()
if not current_folders:
logger.warning("이번 분기 NAS 폴더 없음")
return []
logger.info(f"이번 분기 NAS 폴더: {len(current_folders)}")
# 2. 캐시 미리 로드 (NAS + Anissia, 병렬 resolve 전)
self._nas_folder_cache = await asyncio.get_event_loop().run_in_executor(
None, self.nas.list_anime_folders
)
await self.anissia.search_anime("_cache_warmup_")
logger.info(f"캐시 로드 완료: NAS {len(self._nas_folder_cache)}개, Anissia {len(self.anissia._schedule_cache or [])}")
# 3. 전체 resolve — 병렬 실행
async def _safe_resolve(folder):
try:
return await self.resolve(folder.title)
except Exception as e:
logger.error(f"resolve 오류 ({folder.title}): {e}")
return None
resolve_tasks = [_safe_resolve(f) for f in current_folders]
resolved = await asyncio.gather(*resolve_tasks)
# 4. 유효한 WorkUnit 필터 + 모드 결정
units: list[tuple[AnimeWorkUnit, str]] = []
for folder, unit in zip(current_folders, resolved):
if not unit:
logger.info(f" Anissia 매칭 실패: {folder.title}")
continue
effective_mode = mode
if sub_filter and not unit.captions:
effective_mode = "video_only"
logger.info(f" 자막 없음 → 영상만: {unit.anime.subject}")
units.append((unit, effective_mode))
logger.info(f"resolve 완료: {len(units)}/{len(current_folders)}개 유효")
# 5. 일괄 다운로드
results = []
for unit, effective_mode in units:
try:
result = await self._execute_download(unit, mode=effective_mode)
results.append(result)
status = "" if result.success else ""
logger.info(f" {status} {unit.anime.subject}: {result.message[:80]}")
except Exception as e:
logger.error(f" 오류 ({unit.anime.subject}): {e}")
results.append(DownloadResult(
success=False,
anime=unit.anime,
message=f"{unit.anime.subject}: 오류 - {e}",
errors=[str(e)],
))
return results
# ──────────────────────────────────────
# 토렌트 완료 대기 + 정리
# ──────────────────────────────────────
async def _wait_and_cleanup_torrents(
self,
result: DownloadResult,
nas_folder: Path,
timeout: int = 600,
poll_interval: int = 10,
):
"""토렌트 다운로드 완료 대기 → 자동 삭제.
Args:
timeout: 최대 대기 시간 (초, 기본 10분)
poll_interval: 폴링 간격 (초)
"""
if not result.torrent_hashes:
return
pending = set(result.torrent_hashes)
completed = set()
failed = set()
elapsed = 0
logger.info(
f"토렌트 완료 대기 시작: {len(pending)}건, 타임아웃 {timeout}"
)
while pending and elapsed < timeout:
await asyncio.sleep(poll_interval)
elapsed += poll_interval
for h in list(pending):
try:
status = await self.qbit.get_torrent_status(h)
if status is None:
# 토렌트가 이미 없음 (수동 삭제 등)
pending.discard(h)
continue
if status.progress >= 1.0:
completed.add(h)
pending.discard(h)
logger.info(
f"토렌트 완료: {status.name[:50]} "
f"({status.size / (1024**2):.0f}MB)"
)
elif status.state in ("error", "missingFiles"):
failed.add(h)
pending.discard(h)
result.errors.append(
f"토렌트 실패 ({status.state}): {status.name[:50]}"
)
except Exception as e:
logger.warning(f"토렌트 상태 확인 실패 ({h[:8]}): {e}")
if pending:
logger.info(
f"토렌트 대기중: {len(pending)}건 남음 "
f"({elapsed}/{timeout}초)"
)
# 타임아웃된 토렌트
for h in pending:
result.errors.append(f"토렌트 타임아웃: {h[:16]}...")
failed.add(h)
# 완료된 토렌트 삭제 (파일은 남김)
for h in completed:
try:
await self.qbit.delete_torrent(h, delete_files=False)
logger.info(f"토렌트 삭제: {h[:16]}")
except Exception as e:
logger.warning(f"토렌트 삭제 실패 ({h[:8]}): {e}")
logger.info(
f"토렌트 정리 완료: {len(completed)}건 성공, "
f"{len(failed)}건 실패"
)
# ──────────────────────────────────────
# 자막 다운로드
# ──────────────────────────────────────
async def _download_subtitles(
self,
result: DownloadResult,
unit: AnimeWorkUnit,
episode: Optional[int],
):
"""자막 다운로드 → 영상 폴더에 직접 저장.
Anissia는 최신 에피소드 URL만 제공하므로,
URL 패턴을 분석해 이전 에피소드 URL을 자동 생성.
"""
nas_folder = unit.nas_path
nas_folder.mkdir(parents=True, exist_ok=True)
# ── 기존 자막 에피소드 + 파일 집합 (NAS 실시간 스캔) ──
existing_sub_eps = set()
existing_sub_files: set[str] = set()
offset = unit.episode_offset
# resolve 시점의 캐시 대신 실제 NAS 폴더에서 현재 자막 파일 스캔
try:
for f in nas_folder.rglob("*"):
if f.suffix.lower() in SUB_EXTS and f.is_file():
existing_sub_files.add(f.name.lower())
ep = self._extract_episode(f.name)
if ep is not None:
existing_sub_eps.add(ep)
if offset > 0 and ep > offset:
existing_sub_eps.add(ep - offset)
except (OSError, PermissionError) as e:
logger.warning(f"NAS 자막 스캔 오류: {e}")
if existing_sub_eps:
logger.info(f"기존 자막 에피소드 (스킵): {sorted(existing_sub_eps)}")
for caption in unit.captions:
if not caption.website:
continue
if episode is not None and caption.episode != str(episode):
continue
# 캡션 URL에서 에피소드 패턴 분석 → 이전 에피소드 URL 생성
urls_to_check = self._discover_episode_urls(
caption.website, caption.episode, episode,
)
for url, discovered_ep in urls_to_check:
# 이미 자막 있는 에피소드 → URL 페치 전에 스킵
if discovered_ep is not None and discovered_ep in existing_sub_eps:
logger.info(f"자막 URL 스킵 (기존 존재): ep{discovered_ep}")
continue
# 에피소드를 모르는 URL: 기존 영상 에피소드 전부 자막 있으면 다운로드 불필요
if discovered_ep is None and unit.existing_eps and unit.existing_eps.issubset(existing_sub_eps):
logger.info(f"자막 URL 스킵 (모든 에피소드 자막 존재): {url[:60]}")
continue
try:
subs = await self.sub_downloader.find_subtitles(url)
for sub in subs:
# sub.episode이 None이면 파일명에서 추출 시도
if sub.episode is None:
sub.episode = self._extract_episode(sub.filename)
# discovered_ep fallback
if sub.episode is None:
sub.episode = discovered_ep
if episode is not None and sub.episode is not None and sub.episode != episode:
continue
if sub.episode is not None and sub.episode in existing_sub_eps:
logger.info(f"자막 스킵 (기존 존재): {sub.episode}화 - {sub.filename}")
continue
# 에피소드 번호를 모르는 경우: 파일명 직접 비교로 스킵
if sub.episode is None and sub.filename.lower() in existing_sub_files:
logger.info(f"자막 스킵 (파일명 존재): {sub.filename}")
continue
try:
path = await self.sub_downloader.download_file(sub, str(nas_folder))
result.subtitles.append(sub)
# 다운 후 실제 파일명으로 existing 갱신 (세션 내 중복 방지)
actual_name = Path(path).name
existing_sub_files.add(actual_name.lower())
if sub.episode is not None:
existing_sub_eps.add(sub.episode)
# 실제 파일명에서도 에피소드 추출해서 등록
actual_ep = self._extract_episode(actual_name)
if actual_ep is not None:
existing_sub_eps.add(actual_ep)
# ZIP 해제 시 모든 해제된 파일을 existing에 등록 (중복 다운 방지)
for sibling in nas_folder.iterdir():
if sibling.suffix.lower() in SUB_EXTS and sibling.is_file():
sname = sibling.name.lower()
if sname not in existing_sub_files:
existing_sub_files.add(sname)
sep = self._extract_episode(sibling.name)
if sep is not None:
existing_sub_eps.add(sep)
except Exception as e:
result.errors.append(f"자막 다운로드 실패 ({sub.filename}): {e}")
except Exception as e:
# URL이 404일 수 있으므로 debug만
logger.debug(f"자막 URL 접근 실패: {url} - {e}")
def _discover_episode_urls(
self,
base_url: str,
caption_episode: str,
target_episode: Optional[int],
) -> list[tuple[str, Optional[int]]]:
"""캡션 URL에서 같은 애니의 모든 에피소드 자막 URL 자동 검색.
Returns: [(url, episode_number), ...]
"""
try:
ep_num = int(caption_episode) if caption_episode else None
except (ValueError, TypeError):
ep_num = None
if target_episode is not None:
return [(base_url, ep_num)]
# Blogspot인 경우 Atom Feed 사용
if 'blogspot.com' in base_url:
feed_urls = self._discover_blogspot_episodes(base_url)
if feed_urls:
return feed_urls
return [(base_url, ep_num)]
def _discover_blogspot_episodes(self, known_url: str) -> list[tuple[str, Optional[int]]]:
"""Blogspot Atom Feed에서 같은 애니의 모든 에피소드 URL 검색.
Returns: [(url, episode_number), ...] 오래된 에피소드부터
"""
import httpx
m = re.match(r'(https?://[^/]+\.blogspot\.com)', known_url)
if not m:
return []
blog_base = m.group(1)
feed_url = f"{blog_base}/feeds/posts/default"
try:
resp = httpx.get(
feed_url,
params={'alt': 'json', 'max-results': 100},
timeout=15,
)
if resp.status_code != 200:
return []
data = resp.json()
entries = data.get('feed', {}).get('entry', [])
# known_url 포스트 찾기 → 애니 이름 추출
anime_name = None
for entry in entries:
links = [l for l in entry.get('link', []) if l.get('rel') == 'alternate']
entry_url = links[0].get('href', '') if links else ''
if entry_url == known_url:
title = entry.get('title', {}).get('$t', '')
anime_name = re.sub(r'\s*\d+화.*$', '', title).strip()
break
if not anime_name:
logger.info(f"Atom Feed에서 원본 포스트를 찾지 못함: {known_url}")
return []
# 같은 애니 이름의 모든 포스트 (url, episode) 수집
episode_urls = []
for entry in entries:
title = entry.get('title', {}).get('$t', '')
if anime_name not in title:
continue
links = [l for l in entry.get('link', []) if l.get('rel') == 'alternate']
entry_url = links[0].get('href', '') if links else ''
if entry_url:
ep = self._extract_episode(title)
episode_urls.append((entry_url, ep))
episode_urls.reverse() # 오래된 순으로
logger.info(
f"Blogspot Feed 검색: '{anime_name}'{len(episode_urls)}건 발견"
)
return episode_urls
except Exception as e:
logger.warning(f"Blogspot Feed 검색 실패: {e}")
return []
def _rename_subtitles_to_match_videos(
self, folder: Path, result: DownloadResult,
offset: int = 0,
):
"""폴더 내 자막 파일을 영상 파일명에 맞게 리네임."""
videos = {} # episode_num -> video_path
for f in folder.iterdir():
if f.suffix.lower() in VIDEO_EXTS:
ep = self._extract_episode(f.stem)
if ep is not None:
videos[ep] = f
if not videos:
return
for f in folder.rglob("*"):
if not f.is_file() or f.suffix.lower() not in SUB_EXTS:
continue
ep = self._extract_episode(f.stem)
if ep is None:
continue
# 직접 매칭 또는 offset 적용 매칭
video_ep = None
if ep in videos:
video_ep = ep
elif offset > 0 and (ep + offset) in videos:
video_ep = ep + offset
if video_ep is None:
continue
video_stem = videos[video_ep].stem
new_name = f"{video_stem}{f.suffix}"
new_path = folder / new_name
if new_path == f:
continue # 이미 정확한 이름
if new_path.exists():
# 이미 리네임된 자막 존재 → 중복 원본 삭제
try:
f.unlink()
logger.info(f"중복 자막 삭제: {f.name} (→ {new_name} 이미 존재)")
except Exception as e:
logger.warning(f"중복 자막 삭제 실패: {e}")
else:
try:
f.rename(new_path)
logger.info(f"자막 리네임: {f.name}{new_name}")
except Exception as e:
logger.warning(f"자막 리네임 실패: {e}")
# ──────────────────────────────────────
# 토렌트 추가
# ──────────────────────────────────────
async def _add_torrents(
self,
result: DownloadResult,
unit: AnimeWorkUnit,
episode: Optional[int],
):
"""토렌트 추가 — WorkUnit의 기존 데이터 사용 (재스캔 없음)."""
if not unit.torrents:
result.message += "\n⚠ 매칭되는 토렌트가 없습니다."
return
# WorkUnit에서 기존 정보 사용
existing_eps = unit.existing_eps
required_group = unit.release_group if len([
v for v in unit.existing_videos
if re.match(r'\[' + re.escape(unit.release_group) + r'\]', v)
]) >= 2 else None if unit.release_group else None
if existing_eps:
logger.info(f"NAS 기존 에피소드: {sorted(existing_eps)}")
if required_group:
logger.info(f"NAS 릴리스 그룹: [{required_group}]")
# 에피소드별 최고 점수 토렌트
offset = unit.episode_offset
ep_best: dict[int, tuple[int, TorrentResult]] = {}
for t in unit.torrents:
raw_ep = self._extract_episode(t.title)
if raw_ep is None:
continue
# 절대번호 → 시즌번호 변환 (표시용)
season_ep = raw_ep - offset if offset > 0 and raw_ep > offset else raw_ep
if season_ep <= 0:
continue # 이전 시즌 에피소드
if episode is not None and season_ep != episode:
continue
# existing_eps는 절대번호이므로 raw_ep으로 비교
if raw_ep in existing_eps:
continue
title_upper = t.title.upper()
if "VOSTFR" in title_upper or "VOSTA" in title_upper:
continue
if required_group and f"[{required_group}]" not in t.title:
continue
score = 0
if "[ASW]" in t.title:
score += 100
if "HEVC" in title_upper or "X265" in title_upper:
score += 50
if "1080P" in title_upper:
score += 20
if t.seeders > 0:
score += int(math.log(t.seeders) * 5)
if season_ep not in ep_best or score > ep_best[season_ep][0]:
ep_best[season_ep] = (score, t)
if not ep_best:
if episode is not None:
result.errors.append(f"{episode}화 토렌트를 찾지 못했습니다.")
elif required_group:
result.message = (
f"{result.anime.subject}: "
f"새로 다운로드할 에피소드가 없습니다 "
f"([{required_group}] 릴리스 기준)."
)
result.success = True
else:
result.message = f"{result.anime.subject}: 새로 다운로드할 에피소드가 없습니다."
result.success = True
return
# 에피소드 순서대로 추가
added_count = 0
for ep in sorted(ep_best.keys()):
_, torrent = ep_best[ep]
try:
success = await self.qbit.add_torrent(
magnet_or_url=torrent.magnet_link,
save_path=str(unit.nas_path),
category="anime",
tags=unit.anime.subject if unit.anime else "",
)
if success:
added_count += 1
# magnet에서 hash 추출
h_match = re.search(r'btih:([a-fA-F0-9]{40})', torrent.magnet_link)
if h_match:
result.torrent_hashes.append(h_match.group(1).lower())
logger.info(f"토렌트 추가: ep{ep} - {torrent.title[:50]}")
else:
result.errors.append(f"ep{ep} 토렌트 추가 실패")
except Exception as e:
result.errors.append(f"ep{ep} qBittorrent 오류: {e}")
result.torrent_added = added_count > 0
if added_count > 0:
new_eps = sorted(ep_best.keys())
result.message += f"\n📥 {added_count}개 에피소드 추가: {new_eps}"
# ──────────────────────────────────────
# 유틸리티
# ──────────────────────────────────────
async def get_status(self) -> list[dict]:
"""현재 다운로드 큐 상태."""
try:
torrents = await self.qbit.list_torrents(category="anime")
return [
{
"name": t.name,
"progress": f"{t.progress * 100:.1f}%",
"state": t.state,
"size": f"{t.size / (1024**3):.2f} GB" if t.size > 0 else "?",
"speed": f"{t.download_speed / (1024**2):.1f} MB/s" if t.download_speed > 0 else "0",
"eta": f"{t.eta // 60}" if t.eta > 0 else "",
"path": t.save_path,
}
for t in torrents
]
except Exception as e:
logger.error(f"qBittorrent 상태 조회 오류: {e}")
return []
@staticmethod
def _extract_episode(text: str) -> Optional[int]:
"""텍스트에서 에피소드 번호 추출."""
# SxxExx
m = re.search(r'[Ss]\d{1,2}[Ee](\d{1,4})', text)
if m:
return int(m.group(1))
# "- 03", "- 06v2"
m = re.search(r'[-]\s*(\d{1,4})(?:v\d)?(?:\s|$|\.|\[|\()', text)
if m:
return int(m.group(1))
# "3화"
m = re.search(r'(\d{1,4})\s*화', text)
if m:
return int(m.group(1))
# EP03
m = re.search(r'(?:EP|Episode)\s*(\d{1,4})', text, re.IGNORECASE)
if m:
return int(m.group(1))
return None
def _find_existing_nas_folder(self, korean_title: str, start_date: str = ""):
"""NAS에서 기존 폴더 찾기 (캐시 사용)."""
from tools.title_matcher import get_quarter
title_norm = re.sub(r'[^\w]', '', korean_title.lower())
if len(title_norm) < 2:
return None
anime_year, anime_quarter = get_quarter(start_date)
# 캐시가 있으면 사용, 없으면 직접 로드
try:
all_folders = self._nas_folder_cache or self.nas.list_anime_folders()
except Exception as e:
logger.warning(f"NAS 폴더 검색 실패: {e}")
return None
for folder in all_folders:
folder_norm = re.sub(r'[^\w]', '', folder.title.lower())
if not (title_norm in folder_norm or folder_norm in title_norm):
continue
if anime_year and folder.year != anime_year:
continue
if anime_quarter and folder.quarter != anime_quarter:
continue
logger.info(f"NAS 기존 폴더 발견: {folder.folder_name}")
return folder
return None
@staticmethod
def _extract_release_name(filename: str) -> str:
"""영상 파일명에서 릴리스 이름 추출.
[ASW] Sousou no Frieren S2 - 07 [1080p HEVC].mkv → 'Sousou no Frieren S2'
"""
name = re.sub(r'\.[^.]+$', '', filename)
name = re.sub(r'^\[[^\]]*\]\s*', '', name)
name = re.sub(r'\s*[-]\s*\d+.*$', '', name).strip()
name = re.sub(r'\s*S\d+E\d+.*$', '', name, flags=re.IGNORECASE).strip()
return name
@staticmethod
def _build_match_keywords(
eng_default: str, eng_english: str,
synonyms: list[str], original_title: str,
) -> list[str]:
"""Jikan 제목들에서 매칭용 키워드 추출."""
keywords = []
for syn in synonyms:
cleaned = syn.strip()
if 3 <= len(cleaned) <= 30:
keywords.append(cleaned.lower())
if eng_default:
clean = re.sub(r'\s*(2nd|3rd|\d+th)\s*Season.*$', '', eng_default, flags=re.IGNORECASE).strip()
clean = re.sub(r'\s*S\d+$', '', clean).strip()
if len(clean) >= 3:
keywords.append(clean.lower())
if eng_english:
short = eng_english.split(":")[0].strip()
if len(short) >= 3:
keywords.append(short.lower())
if original_title and len(original_title) >= 2:
clean = re.sub(r'\s*第\d+期$', '', original_title).strip()
if clean:
keywords.append(clean.lower())
seen = set()
return [k for k in keywords if k not in seen and not seen.add(k)]
@staticmethod
def _title_contains_keyword(nyaa_title: str, keywords: list[str]) -> bool:
"""Nyaa 토렌트 제목에 키워드 중 하나라도 포함되는지 체크."""
title_lower = nyaa_title.lower()
title_norm = re.sub(r'[^a-z0-9\s]', '', title_lower)
for kw in keywords:
if not kw or len(kw) < 2:
continue
kw_norm = re.sub(r'[^a-z0-9\s]', '', kw)
if kw_norm and len(kw_norm) >= 3 and kw_norm in title_norm:
return True
if not kw.isascii() and kw in title_lower:
return True
return False
# ── CLI 진입점 ──
if __name__ == "__main__":
import sys
import asyncio
import logging
# 로그 출력 설정
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s [%(levelname)s] %(name)s: %(message)s'
)
args = sys.argv[1:]
pipeline = AnimePipeline()
async def main():
if not args:
print("사용법: python tools/anime_pipeline.py [search|download|batch|status] [옵션]")
return
if args[0] == "search" and len(args) > 1:
title = " ".join(args[1:])
result = await pipeline.search(title)
print(result.message)
if result.errors:
print(f"⚠️ 오류: {'; '.join(result.errors)}")
elif args[0] == "download" and len(args) > 1:
title_parts = []
mode = "auto"
episode = None
i = 1
while i < len(args):
if args[i] == "--mode" and i + 1 < len(args):
mode = args[i + 1]
i += 2
elif args[i] == "--episode" and i + 1 < len(args):
episode = int(args[i + 1])
i += 2
else:
title_parts.append(args[i])
i += 1
title = " ".join(title_parts)
result = await pipeline.download(title, mode=mode, episode=episode)
print(result.message)
elif args[0] == "batch":
mode = "auto"
sub_filter = True
i = 1
while i < len(args):
if args[i] == "--no-sub-filter":
sub_filter = False
i += 1
elif args[i] == "--mode" and i + 1 < len(args):
mode = args[i + 1]
i += 2
else:
i += 1
print(f"📦 이번 분기 배치 다운로드 시작 (자막 필터: {'ON' if sub_filter else 'OFF'})")
results = await pipeline.batch_download(mode=mode, sub_filter=sub_filter)
success = sum(1 for r in results if r.success)
failed = sum(1 for r in results if not r.success)
print(f"\n📊 완료: {success}건 성공, {failed}건 실패 (총 {len(results)}건)")
for r in results:
icon = "" if r.success else ""
title = r.anime.subject if r.anime else "?"
print(f" {icon} {title}: {r.message[:80]}")
elif args[0] == "status":
status = await pipeline.get_status()
if not status:
print("🎬 다운로드 중인 항목 없음")
else:
for s in status:
print(f" {s['progress']} | {s['name'][:50]} | {s['speed']} | ETA: {s['eta']}")
else:
print("사용법: python tools/anime_pipeline.py [search|download|batch|status] [옵션]")
asyncio.run(main())