"""애니메이션 자동화 파이프라인. 전체 흐름: 1. resolve() — 4요소 세트 완성 (NAS폴더, 파일현황, Anissia명, Nyaa명) 2. download() — 완성된 WorkUnit으로 다운로드 실행 3. batch_download() — resolve() 반복 + download() 실행 """ import asyncio import logging import math import re from collections import Counter from dataclasses import dataclass, field from pathlib import Path from typing import Optional import config from tools.anissia_client import AnissiaClient, AnimeInfo, CaptionInfo from tools.nyaa_client import NyaaClient, TorrentResult from tools.qbit_client import QBitClient from tools.subtitle_downloader import SubtitleDownloader, SubtitleFile from tools.title_matcher import ( match_titles, make_nas_folder_name, rename_subtitle_to_video, fetch_english_title, fetch_title_via_jikan, web_search_anime_title, ) logger = logging.getLogger("variet.tools.pipeline") VIDEO_EXTS = {".mkv", ".mp4", ".avi", ".webm", ".m4v", ".ts"} SUB_EXTS = {".ass", ".srt", ".ssa", ".sub", ".smi"} # ────────────────────────────────────────── # 데이터 모델 # ────────────────────────────────────────── @dataclass class AnimeWorkUnit: """다운로드 작업 1건의 완전한 정보 세트. resolve()가 이 세트를 완성한 뒤에만 download()가 실행됨. """ # NAS 정보 nas_folder: str = "" # "[26_1분기]장송의프리렌2기" nas_path: Path = field(default_factory=Path) existing_videos: list[str] = field(default_factory=list) existing_subs: list[str] = field(default_factory=list) existing_eps: set[int] = field(default_factory=set) release_group: str = "" # "ASW" release_name: str = "" # "Sousou no Frieren S2" episode_offset: int = 0 # 절대번호→시즌번호 변환 오프셋 (예: 12) # Anissia 정보 anime: Optional[AnimeInfo] = None captions: list[CaptionInfo] = field(default_factory=list) # Nyaa 검색 정보 nyaa_keywords: list[str] = field(default_factory=list) torrents: list[TorrentResult] = field(default_factory=list) @dataclass class DownloadResult: """파이프라인 실행 결과.""" success: bool anime: Optional[AnimeInfo] = None captions: list[CaptionInfo] = field(default_factory=list) torrents: list[TorrentResult] = field(default_factory=list) subtitles: list[SubtitleFile] = field(default_factory=list) nas_folder: str = "" torrent_added: bool = False torrent_hashes: list[str] = field(default_factory=list) message: str = "" errors: list[str] = field(default_factory=list) # ────────────────────────────────────────── # 파이프라인 # ────────────────────────────────────────── class AnimePipeline: """애니메이션 다운로드 자동화 파이프라인.""" def __init__(self): self.anissia = AnissiaClient() self.nyaa = NyaaClient() self.qbit = QBitClient() self.sub_downloader = SubtitleDownloader() self.nas_base = getattr(config, "NAS_ANIME_PATH", r"\\192.168.10.10\NasData\Video\Animation") from tools.nas_scanner import NasScanner self.nas = NasScanner(self.nas_base) # 캐시 (세션당 1회 로드) self._schedule_cache: list[AnimeInfo] | None = None self._nas_folder_cache: list | None = None # ────────────────────────────────────── # 1단계: resolve — 4요소 세트 완성 # ────────────────────────────────────── async def resolve(self, title: str) -> AnimeWorkUnit | None: """애니 제목에서 4요소 세트(NAS/Anissia/Nyaa/파일현황)를 완성합니다. Fallback 체인: 1. Anissia 직접 검색 (compact 매칭 포함) 2. 웹 검색 — DuckDuckGo 한글+"애니" → 후보 → Anissia 재검색 3. NAS 파일명(영문) → Jikan(일본어) → Anissia 재검색 Returns: 완성된 AnimeWorkUnit, 또는 매칭 실패 시 None """ unit = AnimeWorkUnit() # ── Step 1: NAS 기존 폴더 스캔 (1회) ── nas_folder_obj = self._find_existing_nas_folder(title) if nas_folder_obj: unit.nas_folder = nas_folder_obj.folder_name unit.nas_path = Path(nas_folder_obj.full_path) unit.existing_videos = list(nas_folder_obj.video_files) unit.existing_subs = list(nas_folder_obj.subtitle_files) # 에피소드 번호 추출 for vf in unit.existing_videos: ep = self._extract_episode(vf) if ep is not None: unit.existing_eps.add(ep) # 릴리스 그룹 + 이름 추출 if unit.existing_videos: unit.release_name = self._extract_release_name(unit.existing_videos[0]) groups = [] for vf in unit.existing_videos: m = re.match(r'\[([^\]]+)\]', vf) if m: groups.append(m.group(1)) if groups: unit.release_group = Counter(groups).most_common(1)[0][0] if unit.existing_eps: logger.info( f"NAS 기존 폴더: {unit.nas_folder} | " f"영상:{len(unit.existing_videos)} 자막:{len(unit.existing_subs)} " f"그룹:[{unit.release_group}]" ) # ── Step 2: Anissia 매칭 (fallback 체인) ── anime = await self._resolve_anissia(title, unit) if not anime: logger.warning(f"Anissia 매칭 실패: '{title}'") return None unit.anime = anime # NAS 폴더 재검증 — start_date로 정확한 시즌 매칭 if anime.start_date: correct_folder = self._find_existing_nas_folder( anime.subject, start_date=anime.start_date, ) if correct_folder and correct_folder.folder_name != unit.nas_folder: logger.info( f"NAS 폴더 교정: {unit.nas_folder} → {correct_folder.folder_name}" ) unit.nas_folder = correct_folder.folder_name unit.nas_path = Path(correct_folder.full_path) unit.existing_videos = list(correct_folder.video_files) unit.existing_subs = list(correct_folder.subtitle_files) unit.existing_eps = set() for vf in unit.existing_videos: ep = self._extract_episode(vf) if ep is not None: unit.existing_eps.add(ep) if unit.existing_videos: unit.release_name = self._extract_release_name( unit.existing_videos[0] ) groups = [] for v in unit.existing_videos: m = re.match(r'\[([^\]]+)\]', v) if m: groups.append(m.group(1)) if groups: from collections import Counter as _Counter unit.release_group = _Counter(groups).most_common(1)[0][0] # NAS 폴더가 없으면 Anissia 정보로 생성 if not unit.nas_folder: unit.nas_folder = make_nas_folder_name(anime.subject, anime.start_date) unit.nas_path = Path(self.nas_base) / unit.nas_folder # ── Step 3: 자막 정보 ── try: unit.captions = await self.anissia.get_captions(anime.anime_no) except Exception as e: logger.warning(f"자막 조회 오류: {e}") # ── Step 4: Nyaa 키워드 + 토렌트 확보 ── await self._resolve_nyaa(unit) # ── Step 5: 에피소드 번호 오프셋 감지 ── self._detect_episode_offset(unit) logger.info( f"WorkUnit 완성: {anime.subject} | " f"NAS:{unit.nas_folder} | sub:{len(unit.captions)} | " f"tor:{len(unit.torrents)} | offset:{unit.episode_offset}" ) return unit async def _resolve_anissia( self, title: str, unit: AnimeWorkUnit ) -> AnimeInfo | None: """다단계 fallback으로 Anissia 애니 매칭.""" # 1차: Anissia 직접 검색 (compact 매칭 포함) anime_list = await self._search_anissia(title) if anime_list: return anime_list[0] # 2차: 웹 검색 — 한글+"애니" → 후보 → Anissia 재검색 web_candidates = await web_search_anime_title(title) for candidate in web_candidates: logger.info(f"웹 검색 후보 → Anissia: '{candidate}'") anime_list = await self._search_anissia(candidate) if anime_list: return anime_list[0] # 3차: NAS 파일명(영문) → Jikan(일본어) → Anissia if unit.release_name: logger.info(f"NAS 파일명 fallback: '{unit.release_name}'") jikan = await fetch_title_via_jikan(unit.release_name) if jikan: for key in ("japanese", "default"): alt_title = jikan.get(key, "") if not alt_title: continue logger.info(f"Jikan {key}: {alt_title}") anime_list = await self._search_anissia(alt_title) if anime_list: return anime_list[0] return None async def _search_anissia(self, keyword: str) -> list[AnimeInfo]: """Anissia 검색 (에러 무시).""" try: return await self.anissia.search_anime(keyword) except Exception: return [] async def _resolve_nyaa(self, unit: AnimeWorkUnit): """Nyaa 토렌트 검색 — 기존 릴리스명 또는 Jikan 제목 사용.""" try: # 전략 1: 기존 릴리스명이 있으면 그대로 검색 (가장 정확) if unit.release_name: logger.info(f"Nyaa 검색 (릴리스명): '{unit.release_name}'") found = await self.nyaa.search(unit.release_name, use_default_suffix=False) matched = [t for t in found if self._title_contains_keyword(t.title, [unit.release_name.lower()])] if matched: # ASW/HEVC 우선 정렬 → 최대 50건 matched.sort(key=lambda t: ( 0 if '[ASW]' in t.title else 1, 0 if 'HEVC' in t.title.upper() else 1, )) unit.torrents = matched[:50] unit.nyaa_keywords = [unit.release_name.lower()] logger.info(f"Nyaa 릴리스명 → {len(found)}건 중 {len(matched)}건 매칭") return # 전략 2: Jikan 영어 제목 + ASW HEVC 검색 if unit.anime: eng_titles = await fetch_english_title(unit.anime.original_subject) eng_default = eng_titles.get("default", "") eng_english = eng_titles.get("english", "") synonyms = eng_titles.get("synonyms", []) keywords = self._build_match_keywords( eng_default, eng_english, synonyms, unit.anime.original_subject, ) unit.nyaa_keywords = keywords logger.info(f"매칭 키워드: {keywords}") asw_results = await self.nyaa.search("ASW HEVC", use_default_suffix=False) matched = [t for t in asw_results if self._title_contains_keyword(t.title, keywords)] if matched: logger.info(f"ASW HEVC → {len(asw_results)}건 중 {len(matched)}건 매칭") unit.torrents = matched[:30] except Exception as e: logger.warning(f"Nyaa 검색 오류: {e}") def _detect_episode_offset(self, unit: AnimeWorkUnit): """Nyaa 토렌트의 에피소드 번호 체계 감지 → 오프셋 설정. - S2/S3 태그가 있으면 → 시즌 상대 번호 (오프셋 불필요) - 없으면 → AniList prequel 체인으로 오프셋 계산 - Nyaa min_ep > offset이면 절대 번호 확정 """ if not unit.torrents or not unit.anime: return # ASW 토렌트 중심으로 S-tag 감지 asw_torrents = [t for t in unit.torrents if '[ASW]' in t.title] check_torrents = asw_torrents[:10] if asw_torrents else unit.torrents[:10] has_season_tag = any( re.search(r'\bS\d+\b|Season\s*\d+', t.title, re.IGNORECASE) for t in check_torrents ) if has_season_tag: logger.info("에피소드 번호: 시즌 상대 번호 (S태그 감지)") return # offset = 0 # S태그 없음 → AniList에서 prequel 체인으로 offset 계산 try: offset = self._get_anilist_offset(unit.release_name or unit.anime.original_subject) if offset <= 0: return # Nyaa ep 번호가 실제로 offset보다 큰지 확인 (절대 번호 확정) eps = [self._extract_episode(t.title) for t in check_torrents] eps = [e for e in eps if e is not None] if not eps: return min_ep = min(eps) if min_ep > offset: unit.episode_offset = offset logger.info( f"에피소드 번호: 절대 번호 감지 (offset={offset}, " f"Nyaa min_ep={min_ep})" ) else: logger.info( f"에피소드 번호: S태그 없지만 시즌 상대 번호 " f"(offset={offset}, Nyaa min_ep={min_ep})" ) except Exception as e: logger.warning(f"에피소드 오프셋 감지 실패: {e}") def _get_anilist_offset(self, title: str) -> int: """AniList prequel 체인으로 시즌 에피소드 오프셋 계산.""" import httpx search_q = ''' query ($search: String) { Page(page: 1, perPage: 10) { media(search: $search, type: ANIME, format_in: [TV, TV_SHORT]) { id title { romaji } episodes relations { edges { relationType node { id format } } } } } } ''' detail_q = ''' query ($id: Int) { Media(id: $id, type: ANIME) { id title { romaji } episodes format relations { edges { relationType node { id format } } } } } ''' resp = httpx.post( 'https://graphql.anilist.co', json={'query': search_q, 'variables': {'search': title}}, timeout=15, ) results = resp.json().get('data', {}).get('Page', {}).get('media', []) if not results: return 0 # prequel이 있는 결과 우선 (2기 이상) media = None for r in results: edges = r.get('relations', {}).get('edges', []) has_prequel = any( e['relationType'] == 'PREQUEL' and e['node'].get('format') in ('TV', 'TV_SHORT', None) for e in edges ) if has_prequel: media = r break if not media: return 0 # prequel 없음 = 1기 또는 단일 시즌 # prequel 체인 역추적 chain_eps = [] current = media while True: edges = current.get('relations', {}).get('edges', []) prequels = [ e for e in edges if e['relationType'] == 'PREQUEL' and e['node'].get('format') in ('TV', 'TV_SHORT', None) ] if not prequels: break prequel_id = prequels[0]['node']['id'] resp2 = httpx.post( 'https://graphql.anilist.co', json={'query': detail_q, 'variables': {'id': prequel_id}}, timeout=15, ) current = resp2.json().get('data', {}).get('Media') if not current: break chain_eps.append(current.get('episodes', 0) or 0) return sum(chain_eps) # ────────────────────────────────────── # 2단계: search — 검색 결과 표시용 (기존 호환) # ────────────────────────────────────── async def search(self, title: str) -> DownloadResult: """애니 검색 — 정보 + 자막 + 토렌트 현황 표시. 실제 다운로드 없이 검색 결과만 반환. """ unit = await self.resolve(title) if not unit: return DownloadResult( success=False, message=f"'{title}' 검색 결과가 없습니다.", ) result = DownloadResult( success=True, anime=unit.anime, captions=unit.captions, torrents=unit.torrents, nas_folder=unit.nas_folder, ) result.message = ( f"**{unit.anime.subject}** ({unit.anime.original_subject})\n" f"자막 제작자: {len(unit.captions)}명 | " f"토렌트: {len(unit.torrents)}건\n" f"NAS 폴더: `{unit.nas_folder}`" ) if not unit.torrents: result.errors.append( f"⚠️ ASW HEVC 릴리스가 없습니다.\n" f"Nyaa에서 직접 검색해주세요." ) return result # ────────────────────────────────────── # 3단계: download — WorkUnit 기반 다운로드 # ────────────────────────────────────── async def download( self, title: str, mode: str = "auto", episode: Optional[int] = None, ) -> DownloadResult: """애니 다운로드 실행. Args: title: 한글 제목 mode: "auto" | "sub_required" | "sub_only" | "video_only" episode: 특정 에피소드 (None이면 빠진 것 전부) """ # resolve로 WorkUnit 완성 unit = await self.resolve(title) if not unit: return DownloadResult( success=False, message=f"'{title}' 검색 결과가 없습니다.", ) return await self._execute_download(unit, mode, episode) async def _execute_download( self, unit: AnimeWorkUnit, mode: str = "auto", episode: Optional[int] = None, ) -> DownloadResult: """완성된 WorkUnit으로 다운로드 실행 (추가 검색/스캔 없음).""" result = DownloadResult( success=True, anime=unit.anime, captions=unit.captions, torrents=unit.torrents, nas_folder=unit.nas_folder, ) result.message = ( f"**{unit.anime.subject}** ({unit.anime.original_subject})\n" f"자막: {len(unit.captions)}명 | 토렌트: {len(unit.torrents)}건" ) nas_folder = unit.nas_path # ── 자막 다운로드 ── if mode in ("auto", "sub_only", "sub_required"): await self._download_subtitles(result, unit, episode) # ── 영상 토렌트 추가 ── if mode in ("auto", "video_only"): await self._add_torrents(result, unit, episode) elif mode == "sub_required": if result.subtitles: await self._add_torrents(result, unit, episode) else: result.errors.append("자막이 없어 영상 다운로드를 보류합니다.") # ── 토렌트 완료 대기 + 자막 리네임 + 정리 ── if result.torrent_hashes: await self._wait_and_cleanup_torrents(result, nas_folder) # ── 항상: 기존+신규 자막을 영상 파일명에 맞게 리네임 ── try: self._rename_subtitles_to_match_videos(nas_folder, result, offset=unit.episode_offset) except Exception as e: logger.warning(f"자막 리네임 오류: {e}") # 결과 메시지 parts = [result.message] if result.subtitles: parts.append(f"\n📝 자막 {len(result.subtitles)}건 다운로드 완료") if result.torrent_added: parts.append(f"\n🎬 토렌트 다운로드 완료 → `{nas_folder}`") if result.errors: parts.append(f"\n⚠️ 오류: " + "; ".join(result.errors)) result.message = "\n".join(parts) return result # ────────────────────────────────────── # 4단계: batch_download — resolve + 일괄 실행 # ────────────────────────────────────── async def batch_download( self, mode: str = "auto", sub_filter: bool = True, ) -> list[DownloadResult]: """이번 분기 애니 일괄 다운로드. Args: mode: "auto" | "sub_only" | "video_only" sub_filter: True면 자막 없는 애니는 영상만 다운 (video_only 모드) """ # 1. NAS 이번 분기 폴더 스캔 current_folders = self.nas.get_current_quarter_anime() if not current_folders: logger.warning("이번 분기 NAS 폴더 없음") return [] logger.info(f"이번 분기 NAS 폴더: {len(current_folders)}개") # 2. 캐시 미리 로드 (NAS + Anissia, 병렬 resolve 전) self._nas_folder_cache = await asyncio.get_event_loop().run_in_executor( None, self.nas.list_anime_folders ) await self.anissia.search_anime("_cache_warmup_") logger.info(f"캐시 로드 완료: NAS {len(self._nas_folder_cache)}개, Anissia {len(self.anissia._schedule_cache or [])}개") # 3. 전체 resolve — 병렬 실행 async def _safe_resolve(folder): try: return await self.resolve(folder.title) except Exception as e: logger.error(f"resolve 오류 ({folder.title}): {e}") return None resolve_tasks = [_safe_resolve(f) for f in current_folders] resolved = await asyncio.gather(*resolve_tasks) # 4. 유효한 WorkUnit 필터 + 모드 결정 units: list[tuple[AnimeWorkUnit, str]] = [] for folder, unit in zip(current_folders, resolved): if not unit: logger.info(f" Anissia 매칭 실패: {folder.title}") continue effective_mode = mode if sub_filter and not unit.captions: effective_mode = "video_only" logger.info(f" 자막 없음 → 영상만: {unit.anime.subject}") units.append((unit, effective_mode)) logger.info(f"resolve 완료: {len(units)}/{len(current_folders)}개 유효") # 5. 일괄 다운로드 results = [] for unit, effective_mode in units: try: result = await self._execute_download(unit, mode=effective_mode) results.append(result) status = "✅" if result.success else "❌" logger.info(f" {status} {unit.anime.subject}: {result.message[:80]}") except Exception as e: logger.error(f" 오류 ({unit.anime.subject}): {e}") results.append(DownloadResult( success=False, anime=unit.anime, message=f"{unit.anime.subject}: 오류 - {e}", errors=[str(e)], )) return results # ────────────────────────────────────── # 토렌트 완료 대기 + 정리 # ────────────────────────────────────── async def _wait_and_cleanup_torrents( self, result: DownloadResult, nas_folder: Path, timeout: int = 600, poll_interval: int = 10, ): """토렌트 다운로드 완료 대기 → 자동 삭제. Args: timeout: 최대 대기 시간 (초, 기본 10분) poll_interval: 폴링 간격 (초) """ if not result.torrent_hashes: return pending = set(result.torrent_hashes) completed = set() failed = set() elapsed = 0 logger.info( f"토렌트 완료 대기 시작: {len(pending)}건, 타임아웃 {timeout}초" ) while pending and elapsed < timeout: await asyncio.sleep(poll_interval) elapsed += poll_interval for h in list(pending): try: status = await self.qbit.get_torrent_status(h) if status is None: # 토렌트가 이미 없음 (수동 삭제 등) pending.discard(h) continue if status.progress >= 1.0: completed.add(h) pending.discard(h) logger.info( f"토렌트 완료: {status.name[:50]} " f"({status.size / (1024**2):.0f}MB)" ) elif status.state in ("error", "missingFiles"): failed.add(h) pending.discard(h) result.errors.append( f"토렌트 실패 ({status.state}): {status.name[:50]}" ) except Exception as e: logger.warning(f"토렌트 상태 확인 실패 ({h[:8]}): {e}") if pending: logger.info( f"토렌트 대기중: {len(pending)}건 남음 " f"({elapsed}/{timeout}초)" ) # 타임아웃된 토렌트 for h in pending: result.errors.append(f"토렌트 타임아웃: {h[:16]}...") failed.add(h) # 완료된 토렌트 삭제 (파일은 남김) for h in completed: try: await self.qbit.delete_torrent(h, delete_files=False) logger.info(f"토렌트 삭제: {h[:16]}") except Exception as e: logger.warning(f"토렌트 삭제 실패 ({h[:8]}): {e}") logger.info( f"토렌트 정리 완료: {len(completed)}건 성공, " f"{len(failed)}건 실패" ) # ────────────────────────────────────── # 자막 다운로드 # ────────────────────────────────────── async def _download_subtitles( self, result: DownloadResult, unit: AnimeWorkUnit, episode: Optional[int], ): """자막 다운로드 → 영상 폴더에 직접 저장. Anissia는 최신 에피소드 URL만 제공하므로, URL 패턴을 분석해 이전 에피소드 URL을 자동 생성. """ nas_folder = unit.nas_path nas_folder.mkdir(parents=True, exist_ok=True) # WorkUnit에서 기존 자막 에피소드 가져옴 (재스캔 없음) existing_sub_eps = set() offset = unit.episode_offset for sf in unit.existing_subs: ep = self._extract_episode(sf) if ep is not None: existing_sub_eps.add(ep) # offset이 있으면 시즌 상대 번호도 추가 if offset > 0 and ep > offset: existing_sub_eps.add(ep - offset) if existing_sub_eps: logger.info(f"기존 자막 에피소드 (스킵): {sorted(existing_sub_eps)}") for caption in unit.captions: if not caption.website: continue if episode is not None and caption.episode != str(episode): continue # 캡션 URL에서 에피소드 패턴 분석 → 이전 에피소드 URL 생성 urls_to_check = self._discover_episode_urls( caption.website, caption.episode, episode, ) for url, discovered_ep in urls_to_check: # 이미 자막 있는 에피소드 → URL 페치 전에 스킵 if discovered_ep is not None and discovered_ep in existing_sub_eps: logger.info(f"자막 URL 스킵 (기존 존재): ep{discovered_ep}") continue try: subs = await self.sub_downloader.find_subtitles(url) for sub in subs: # sub.episode이 None이면 파일명에서 추출 시도 if sub.episode is None: sub.episode = self._extract_episode(sub.filename) # discovered_ep fallback if sub.episode is None: sub.episode = discovered_ep if episode is not None and sub.episode is not None and sub.episode != episode: continue if sub.episode is not None and sub.episode in existing_sub_eps: logger.info(f"자막 스킵 (기존 존재): {sub.episode}화 - {sub.filename}") continue try: await self.sub_downloader.download_file(sub, str(nas_folder)) result.subtitles.append(sub) if sub.episode is not None: existing_sub_eps.add(sub.episode) except Exception as e: result.errors.append(f"자막 다운로드 실패 ({sub.filename}): {e}") except Exception as e: # URL이 404일 수 있으므로 debug만 logger.debug(f"자막 URL 접근 실패: {url} - {e}") # 자막 리네임 self._rename_subtitles_to_match_videos(nas_folder, result, offset=unit.episode_offset) def _discover_episode_urls( self, base_url: str, caption_episode: str, target_episode: Optional[int], ) -> list[tuple[str, Optional[int]]]: """캡션 URL에서 같은 애니의 모든 에피소드 자막 URL 자동 검색. Returns: [(url, episode_number), ...] """ try: ep_num = int(caption_episode) if caption_episode else None except (ValueError, TypeError): ep_num = None if target_episode is not None: return [(base_url, ep_num)] # Blogspot인 경우 Atom Feed 사용 if 'blogspot.com' in base_url: feed_urls = self._discover_blogspot_episodes(base_url) if feed_urls: return feed_urls return [(base_url, ep_num)] def _discover_blogspot_episodes(self, known_url: str) -> list[tuple[str, Optional[int]]]: """Blogspot Atom Feed에서 같은 애니의 모든 에피소드 URL 검색. Returns: [(url, episode_number), ...] 오래된 에피소드부터 """ import httpx m = re.match(r'(https?://[^/]+\.blogspot\.com)', known_url) if not m: return [] blog_base = m.group(1) feed_url = f"{blog_base}/feeds/posts/default" try: resp = httpx.get( feed_url, params={'alt': 'json', 'max-results': 100}, timeout=15, ) if resp.status_code != 200: return [] data = resp.json() entries = data.get('feed', {}).get('entry', []) # known_url 포스트 찾기 → 애니 이름 추출 anime_name = None for entry in entries: links = [l for l in entry.get('link', []) if l.get('rel') == 'alternate'] entry_url = links[0].get('href', '') if links else '' if entry_url == known_url: title = entry.get('title', {}).get('$t', '') anime_name = re.sub(r'\s*\d+화.*$', '', title).strip() break if not anime_name: logger.info(f"Atom Feed에서 원본 포스트를 찾지 못함: {known_url}") return [] # 같은 애니 이름의 모든 포스트 (url, episode) 수집 episode_urls = [] for entry in entries: title = entry.get('title', {}).get('$t', '') if anime_name not in title: continue links = [l for l in entry.get('link', []) if l.get('rel') == 'alternate'] entry_url = links[0].get('href', '') if links else '' if entry_url: ep = self._extract_episode(title) episode_urls.append((entry_url, ep)) episode_urls.reverse() # 오래된 순으로 logger.info( f"Blogspot Feed 검색: '{anime_name}' → {len(episode_urls)}건 발견" ) return episode_urls except Exception as e: logger.warning(f"Blogspot Feed 검색 실패: {e}") return [] def _rename_subtitles_to_match_videos( self, folder: Path, result: DownloadResult, offset: int = 0, ): """폴더 내 자막 파일을 영상 파일명에 맞게 리네임.""" videos = {} # episode_num -> video_path for f in folder.iterdir(): if f.suffix.lower() in VIDEO_EXTS: ep = self._extract_episode(f.stem) if ep is not None: videos[ep] = f if not videos: return for f in folder.iterdir(): if f.suffix.lower() not in SUB_EXTS: continue ep = self._extract_episode(f.stem) if ep is None: continue # 직접 매칭 또는 offset 적용 매칭 video_ep = None if ep in videos: video_ep = ep elif offset > 0 and (ep + offset) in videos: video_ep = ep + offset if video_ep is None: continue video_stem = videos[video_ep].stem new_name = f"{video_stem}{f.suffix}" new_path = folder / new_name if new_path == f: continue # 이미 정확한 이름 if new_path.exists(): # 이미 리네임된 자막 존재 → 중복 원본 삭제 try: f.unlink() logger.info(f"중복 자막 삭제: {f.name} (→ {new_name} 이미 존재)") except Exception as e: logger.warning(f"중복 자막 삭제 실패: {e}") else: try: f.rename(new_path) logger.info(f"자막 리네임: {f.name} → {new_name}") except Exception as e: logger.warning(f"자막 리네임 실패: {e}") # ────────────────────────────────────── # 토렌트 추가 # ────────────────────────────────────── async def _add_torrents( self, result: DownloadResult, unit: AnimeWorkUnit, episode: Optional[int], ): """토렌트 추가 — WorkUnit의 기존 데이터 사용 (재스캔 없음).""" if not unit.torrents: result.errors.append("매칭되는 토렌트가 없습니다.") return # WorkUnit에서 기존 정보 사용 existing_eps = unit.existing_eps required_group = unit.release_group if len([ v for v in unit.existing_videos if re.match(r'\[' + re.escape(unit.release_group) + r'\]', v) ]) >= 2 else None if unit.release_group else None if existing_eps: logger.info(f"NAS 기존 에피소드: {sorted(existing_eps)}") if required_group: logger.info(f"NAS 릴리스 그룹: [{required_group}]") # 에피소드별 최고 점수 토렌트 offset = unit.episode_offset ep_best: dict[int, tuple[int, TorrentResult]] = {} for t in unit.torrents: raw_ep = self._extract_episode(t.title) if raw_ep is None: continue # 절대번호 → 시즌번호 변환 (표시용) season_ep = raw_ep - offset if offset > 0 and raw_ep > offset else raw_ep if season_ep <= 0: continue # 이전 시즌 에피소드 if episode is not None and season_ep != episode: continue # existing_eps는 절대번호이므로 raw_ep으로 비교 if raw_ep in existing_eps: continue title_upper = t.title.upper() if "VOSTFR" in title_upper or "VOSTA" in title_upper: continue if required_group and f"[{required_group}]" not in t.title: continue score = 0 if "[ASW]" in t.title: score += 100 if "HEVC" in title_upper or "X265" in title_upper: score += 50 if "1080P" in title_upper: score += 20 if t.seeders > 0: score += int(math.log(t.seeders) * 5) if season_ep not in ep_best or score > ep_best[season_ep][0]: ep_best[season_ep] = (score, t) if not ep_best: if episode is not None: result.errors.append(f"{episode}화 토렌트를 찾지 못했습니다.") elif required_group: result.errors.append( f"새로 다운로드할 에피소드가 없습니다 " f"([{required_group}] 릴리스 기준)." ) else: result.errors.append("새로 다운로드할 에피소드가 없습니다.") return # 에피소드 순서대로 추가 added_count = 0 for ep in sorted(ep_best.keys()): _, torrent = ep_best[ep] try: success = await self.qbit.add_torrent( magnet_or_url=torrent.magnet_link, save_path=str(unit.nas_path), category="anime", tags=unit.anime.subject if unit.anime else "", ) if success: added_count += 1 # magnet에서 hash 추출 h_match = re.search(r'btih:([a-fA-F0-9]{40})', torrent.magnet_link) if h_match: result.torrent_hashes.append(h_match.group(1).lower()) logger.info(f"토렌트 추가: ep{ep} - {torrent.title[:50]}") else: result.errors.append(f"ep{ep} 토렌트 추가 실패") except Exception as e: result.errors.append(f"ep{ep} qBittorrent 오류: {e}") result.torrent_added = added_count > 0 if added_count > 0: new_eps = sorted(ep_best.keys()) result.message += f"\n📥 {added_count}개 에피소드 추가: {new_eps}" # ────────────────────────────────────── # 유틸리티 # ────────────────────────────────────── async def get_status(self) -> list[dict]: """현재 다운로드 큐 상태.""" try: torrents = await self.qbit.list_torrents(category="anime") return [ { "name": t.name, "progress": f"{t.progress * 100:.1f}%", "state": t.state, "size": f"{t.size / (1024**3):.2f} GB" if t.size > 0 else "?", "speed": f"{t.download_speed / (1024**2):.1f} MB/s" if t.download_speed > 0 else "0", "eta": f"{t.eta // 60}분" if t.eta > 0 else "∞", "path": t.save_path, } for t in torrents ] except Exception as e: logger.error(f"qBittorrent 상태 조회 오류: {e}") return [] @staticmethod def _extract_episode(text: str) -> Optional[int]: """텍스트에서 에피소드 번호 추출.""" # SxxExx m = re.search(r'[Ss]\d{1,2}[Ee](\d{1,4})', text) if m: return int(m.group(1)) # "- 03", "- 06v2" m = re.search(r'[-–]\s*(\d{1,4})(?:v\d)?(?:\s|$|\.|\[|\()', text) if m: return int(m.group(1)) # "3화" m = re.search(r'(\d{1,4})\s*화', text) if m: return int(m.group(1)) # EP03 m = re.search(r'(?:EP|Episode)\s*(\d{1,4})', text, re.IGNORECASE) if m: return int(m.group(1)) return None def _find_existing_nas_folder(self, korean_title: str, start_date: str = ""): """NAS에서 기존 폴더 찾기 (캐시 사용).""" from tools.title_matcher import get_quarter title_norm = re.sub(r'[^\w]', '', korean_title.lower()) if len(title_norm) < 2: return None anime_year, anime_quarter = get_quarter(start_date) # 캐시가 있으면 사용, 없으면 직접 로드 try: all_folders = self._nas_folder_cache or self.nas.list_anime_folders() except Exception as e: logger.warning(f"NAS 폴더 검색 실패: {e}") return None for folder in all_folders: folder_norm = re.sub(r'[^\w]', '', folder.title.lower()) if not (title_norm in folder_norm or folder_norm in title_norm): continue if anime_year and folder.year != anime_year: continue if anime_quarter and folder.quarter != anime_quarter: continue logger.info(f"NAS 기존 폴더 발견: {folder.folder_name}") return folder return None @staticmethod def _extract_release_name(filename: str) -> str: """영상 파일명에서 릴리스 이름 추출. [ASW] Sousou no Frieren S2 - 07 [1080p HEVC].mkv → 'Sousou no Frieren S2' """ name = re.sub(r'\.[^.]+$', '', filename) name = re.sub(r'^\[[^\]]*\]\s*', '', name) name = re.sub(r'\s*[-–]\s*\d+.*$', '', name).strip() name = re.sub(r'\s*S\d+E\d+.*$', '', name, flags=re.IGNORECASE).strip() return name @staticmethod def _build_match_keywords( eng_default: str, eng_english: str, synonyms: list[str], original_title: str, ) -> list[str]: """Jikan 제목들에서 매칭용 키워드 추출.""" keywords = [] for syn in synonyms: cleaned = syn.strip() if 3 <= len(cleaned) <= 30: keywords.append(cleaned.lower()) if eng_default: clean = re.sub(r'\s*(2nd|3rd|\d+th)\s*Season.*$', '', eng_default, flags=re.IGNORECASE).strip() clean = re.sub(r'\s*S\d+$', '', clean).strip() if len(clean) >= 3: keywords.append(clean.lower()) if eng_english: short = eng_english.split(":")[0].strip() if len(short) >= 3: keywords.append(short.lower()) if original_title and len(original_title) >= 2: clean = re.sub(r'\s*第\d+期$', '', original_title).strip() if clean: keywords.append(clean.lower()) seen = set() return [k for k in keywords if k not in seen and not seen.add(k)] @staticmethod def _title_contains_keyword(nyaa_title: str, keywords: list[str]) -> bool: """Nyaa 토렌트 제목에 키워드 중 하나라도 포함되는지 체크.""" title_lower = nyaa_title.lower() title_norm = re.sub(r'[^a-z0-9\s]', '', title_lower) for kw in keywords: if not kw or len(kw) < 2: continue kw_norm = re.sub(r'[^a-z0-9\s]', '', kw) if kw_norm and len(kw_norm) >= 3 and kw_norm in title_norm: return True if not kw.isascii() and kw in title_lower: return True return False # ── CLI 진입점 ── if __name__ == "__main__": import sys import asyncio import logging # 로그 출력 설정 logging.basicConfig( level=logging.INFO, format='%(asctime)s [%(levelname)s] %(name)s: %(message)s' ) args = sys.argv[1:] pipeline = AnimePipeline() async def main(): if not args: print("사용법: python tools/anime_pipeline.py [search|download|batch|status] [옵션]") return if args[0] == "search" and len(args) > 1: title = " ".join(args[1:]) result = await pipeline.search(title) print(result.message) if result.errors: print(f"⚠️ 오류: {'; '.join(result.errors)}") elif args[0] == "download" and len(args) > 1: title_parts = [] mode = "auto" episode = None i = 1 while i < len(args): if args[i] == "--mode" and i + 1 < len(args): mode = args[i + 1] i += 2 elif args[i] == "--episode" and i + 1 < len(args): episode = int(args[i + 1]) i += 2 else: title_parts.append(args[i]) i += 1 title = " ".join(title_parts) result = await pipeline.download(title, mode=mode, episode=episode) print(result.message) elif args[0] == "batch": mode = "auto" sub_filter = True i = 1 while i < len(args): if args[i] == "--no-sub-filter": sub_filter = False i += 1 elif args[i] == "--mode" and i + 1 < len(args): mode = args[i + 1] i += 2 else: i += 1 print(f"📦 이번 분기 배치 다운로드 시작 (자막 필터: {'ON' if sub_filter else 'OFF'})") results = await pipeline.batch_download(mode=mode, sub_filter=sub_filter) success = sum(1 for r in results if r.success) failed = sum(1 for r in results if not r.success) print(f"\n📊 완료: {success}건 성공, {failed}건 실패 (총 {len(results)}건)") for r in results: icon = "✅" if r.success else "❌" title = r.anime.subject if r.anime else "?" print(f" {icon} {title}: {r.message[:80]}") elif args[0] == "status": status = await pipeline.get_status() if not status: print("🎬 다운로드 중인 항목 없음") else: for s in status: print(f" {s['progress']} | {s['name'][:50]} | {s['speed']} | ETA: {s['eta']}") else: print("사용법: python tools/anime_pipeline.py [search|download|batch|status] [옵션]") asyncio.run(main())