1271 lines
49 KiB
Python
1271 lines
49 KiB
Python
"""애니메이션 자동화 파이프라인.
|
||
|
||
전체 흐름:
|
||
1. resolve() — 4요소 세트 완성 (NAS폴더, 파일현황, Anissia명, Nyaa명)
|
||
2. download() — 완성된 WorkUnit으로 다운로드 실행
|
||
3. batch_download() — resolve() 반복 + download() 실행
|
||
"""
|
||
|
||
import asyncio
|
||
import logging
|
||
import math
|
||
import re
|
||
from collections import Counter
|
||
from dataclasses import dataclass, field
|
||
from pathlib import Path
|
||
from typing import Optional
|
||
|
||
import config
|
||
from tools.anissia_client import AnissiaClient, AnimeInfo, CaptionInfo
|
||
from tools.nyaa_client import NyaaClient, TorrentResult
|
||
from tools.qbit_client import QBitClient
|
||
from tools.subtitle_downloader import SubtitleDownloader, SubtitleFile
|
||
from tools.title_matcher import (
|
||
match_titles, make_nas_folder_name, rename_subtitle_to_video,
|
||
fetch_english_title, fetch_title_via_jikan, web_search_anime_title,
|
||
)
|
||
|
||
logger = logging.getLogger("variet.tools.pipeline")
|
||
|
||
VIDEO_EXTS = {".mkv", ".mp4", ".avi", ".webm", ".m4v", ".ts"}
|
||
SUB_EXTS = {".ass", ".srt", ".ssa", ".sub", ".smi"}
|
||
|
||
|
||
# ──────────────────────────────────────────
|
||
# 데이터 모델
|
||
# ──────────────────────────────────────────
|
||
|
||
@dataclass
|
||
class AnimeWorkUnit:
|
||
"""다운로드 작업 1건의 완전한 정보 세트.
|
||
|
||
resolve()가 이 세트를 완성한 뒤에만 download()가 실행됨.
|
||
"""
|
||
# NAS 정보
|
||
nas_folder: str = "" # "[26_1분기]장송의프리렌2기"
|
||
nas_path: Path = field(default_factory=Path)
|
||
existing_videos: list[str] = field(default_factory=list)
|
||
existing_subs: list[str] = field(default_factory=list)
|
||
existing_eps: set[int] = field(default_factory=set)
|
||
release_group: str = "" # "ASW"
|
||
release_name: str = "" # "Sousou no Frieren S2"
|
||
episode_offset: int = 0 # 절대번호→시즌번호 변환 오프셋 (예: 12)
|
||
|
||
# Anissia 정보
|
||
anime: Optional[AnimeInfo] = None
|
||
captions: list[CaptionInfo] = field(default_factory=list)
|
||
|
||
# Nyaa 검색 정보
|
||
nyaa_keywords: list[str] = field(default_factory=list)
|
||
torrents: list[TorrentResult] = field(default_factory=list)
|
||
|
||
|
||
@dataclass
|
||
class DownloadResult:
|
||
"""파이프라인 실행 결과."""
|
||
success: bool
|
||
anime: Optional[AnimeInfo] = None
|
||
captions: list[CaptionInfo] = field(default_factory=list)
|
||
torrents: list[TorrentResult] = field(default_factory=list)
|
||
subtitles: list[SubtitleFile] = field(default_factory=list)
|
||
nas_folder: str = ""
|
||
torrent_added: bool = False
|
||
torrent_hashes: list[str] = field(default_factory=list)
|
||
message: str = ""
|
||
errors: list[str] = field(default_factory=list)
|
||
|
||
|
||
# ──────────────────────────────────────────
|
||
# 파이프라인
|
||
# ──────────────────────────────────────────
|
||
|
||
class AnimePipeline:
|
||
"""애니메이션 다운로드 자동화 파이프라인."""
|
||
|
||
def __init__(self):
|
||
self.anissia = AnissiaClient()
|
||
self.nyaa = NyaaClient()
|
||
self.qbit = QBitClient()
|
||
self.sub_downloader = SubtitleDownloader()
|
||
self.nas_base = getattr(config, "NAS_ANIME_PATH",
|
||
r"\\192.168.10.10\NasData\Video\Animation")
|
||
|
||
from tools.nas_scanner import NasScanner
|
||
self.nas = NasScanner(self.nas_base)
|
||
|
||
# 캐시 (세션당 1회 로드)
|
||
self._schedule_cache: list[AnimeInfo] | None = None
|
||
self._nas_folder_cache: list | None = None
|
||
|
||
# ──────────────────────────────────────
|
||
# 1단계: resolve — 4요소 세트 완성
|
||
# ──────────────────────────────────────
|
||
|
||
async def resolve(self, title: str) -> AnimeWorkUnit | None:
|
||
"""애니 제목에서 4요소 세트(NAS/Anissia/Nyaa/파일현황)를 완성합니다.
|
||
|
||
Fallback 체인:
|
||
1. Anissia 직접 검색 (compact 매칭 포함)
|
||
2. 웹 검색 — DuckDuckGo 한글+"애니" → 후보 → Anissia 재검색
|
||
3. NAS 파일명(영문) → Jikan(일본어) → Anissia 재검색
|
||
|
||
Returns:
|
||
완성된 AnimeWorkUnit, 또는 매칭 실패 시 None
|
||
"""
|
||
unit = AnimeWorkUnit()
|
||
|
||
# ── Step 1: NAS 기존 폴더 스캔 (1회) ──
|
||
nas_folder_obj = self._find_existing_nas_folder(title)
|
||
if nas_folder_obj:
|
||
unit.nas_folder = nas_folder_obj.folder_name
|
||
unit.nas_path = Path(nas_folder_obj.full_path)
|
||
unit.existing_videos = list(nas_folder_obj.video_files)
|
||
unit.existing_subs = list(nas_folder_obj.subtitle_files)
|
||
# 에피소드 번호 추출
|
||
for vf in unit.existing_videos:
|
||
ep = self._extract_episode(vf)
|
||
if ep is not None:
|
||
unit.existing_eps.add(ep)
|
||
# 릴리스 그룹 + 이름 추출
|
||
if unit.existing_videos:
|
||
unit.release_name = self._extract_release_name(unit.existing_videos[0])
|
||
groups = []
|
||
for vf in unit.existing_videos:
|
||
m = re.match(r'\[([^\]]+)\]', vf)
|
||
if m:
|
||
groups.append(m.group(1))
|
||
if groups:
|
||
unit.release_group = Counter(groups).most_common(1)[0][0]
|
||
|
||
if unit.existing_eps:
|
||
logger.info(
|
||
f"NAS 기존 폴더: {unit.nas_folder} | "
|
||
f"영상:{len(unit.existing_videos)} 자막:{len(unit.existing_subs)} "
|
||
f"그룹:[{unit.release_group}]"
|
||
)
|
||
|
||
# ── Step 2: Anissia 매칭 (fallback 체인) ──
|
||
anime = await self._resolve_anissia(title, unit)
|
||
if not anime:
|
||
logger.warning(f"Anissia 매칭 실패: '{title}'")
|
||
return None
|
||
unit.anime = anime
|
||
|
||
# NAS 폴더 재검증 — start_date로 정확한 시즌 매칭
|
||
if anime.start_date:
|
||
correct_folder = self._find_existing_nas_folder(
|
||
anime.subject, start_date=anime.start_date,
|
||
)
|
||
if correct_folder and correct_folder.folder_name != unit.nas_folder:
|
||
logger.info(
|
||
f"NAS 폴더 교정: {unit.nas_folder} → {correct_folder.folder_name}"
|
||
)
|
||
unit.nas_folder = correct_folder.folder_name
|
||
unit.nas_path = Path(correct_folder.full_path)
|
||
unit.existing_videos = list(correct_folder.video_files)
|
||
unit.existing_subs = list(correct_folder.subtitle_files)
|
||
unit.existing_eps = set()
|
||
for vf in unit.existing_videos:
|
||
ep = self._extract_episode(vf)
|
||
if ep is not None:
|
||
unit.existing_eps.add(ep)
|
||
if unit.existing_videos:
|
||
unit.release_name = self._extract_release_name(
|
||
unit.existing_videos[0]
|
||
)
|
||
groups = []
|
||
for v in unit.existing_videos:
|
||
m = re.match(r'\[([^\]]+)\]', v)
|
||
if m:
|
||
groups.append(m.group(1))
|
||
if groups:
|
||
from collections import Counter as _Counter
|
||
unit.release_group = _Counter(groups).most_common(1)[0][0]
|
||
|
||
# NAS 폴더가 없으면 Anissia 정보로 생성
|
||
if not unit.nas_folder:
|
||
unit.nas_folder = make_nas_folder_name(anime.subject, anime.start_date)
|
||
unit.nas_path = Path(self.nas_base) / unit.nas_folder
|
||
|
||
# ── Step 3: 자막 정보 ──
|
||
try:
|
||
unit.captions = await self.anissia.get_captions(anime.anime_no)
|
||
except Exception as e:
|
||
logger.warning(f"자막 조회 오류: {e}")
|
||
|
||
# ── Step 4: Nyaa 키워드 + 토렌트 확보 ──
|
||
await self._resolve_nyaa(unit)
|
||
|
||
# ── Step 5: 에피소드 번호 오프셋 감지 ──
|
||
self._detect_episode_offset(unit)
|
||
|
||
logger.info(
|
||
f"WorkUnit 완성: {anime.subject} | "
|
||
f"NAS:{unit.nas_folder} | sub:{len(unit.captions)} | "
|
||
f"tor:{len(unit.torrents)} | offset:{unit.episode_offset}"
|
||
)
|
||
return unit
|
||
|
||
async def _resolve_anissia(
|
||
self, title: str, unit: AnimeWorkUnit
|
||
) -> AnimeInfo | None:
|
||
"""다단계 fallback으로 Anissia 애니 매칭."""
|
||
|
||
# 1차: Anissia 직접 검색 (compact 매칭 포함)
|
||
anime_list = await self._search_anissia(title)
|
||
if anime_list:
|
||
return anime_list[0]
|
||
|
||
# 2차: 웹 검색 — 한글+"애니" → 후보 → Anissia 재검색
|
||
web_candidates = await web_search_anime_title(title)
|
||
for candidate in web_candidates:
|
||
logger.info(f"웹 검색 후보 → Anissia: '{candidate}'")
|
||
anime_list = await self._search_anissia(candidate)
|
||
if anime_list:
|
||
return anime_list[0]
|
||
|
||
# 3차: NAS 파일명(영문) → Jikan(일본어) → Anissia
|
||
if unit.release_name:
|
||
logger.info(f"NAS 파일명 fallback: '{unit.release_name}'")
|
||
jikan = await fetch_title_via_jikan(unit.release_name)
|
||
if jikan:
|
||
for key in ("japanese", "default"):
|
||
alt_title = jikan.get(key, "")
|
||
if not alt_title:
|
||
continue
|
||
logger.info(f"Jikan {key}: {alt_title}")
|
||
anime_list = await self._search_anissia(alt_title)
|
||
if anime_list:
|
||
return anime_list[0]
|
||
|
||
return None
|
||
|
||
async def _search_anissia(self, keyword: str) -> list[AnimeInfo]:
|
||
"""Anissia 검색 (에러 무시)."""
|
||
try:
|
||
return await self.anissia.search_anime(keyword)
|
||
except Exception:
|
||
return []
|
||
|
||
async def _resolve_nyaa(self, unit: AnimeWorkUnit):
|
||
"""Nyaa 토렌트 검색 — 기존 릴리스명 또는 Jikan 제목 사용."""
|
||
try:
|
||
# 전략 1: 기존 릴리스명이 있으면 그대로 검색 (가장 정확)
|
||
if unit.release_name:
|
||
logger.info(f"Nyaa 검색 (릴리스명): '{unit.release_name}'")
|
||
found = await self.nyaa.search(unit.release_name, use_default_suffix=False)
|
||
matched = [t for t in found
|
||
if self._title_contains_keyword(t.title, [unit.release_name.lower()])]
|
||
if matched:
|
||
# ASW/HEVC 우선 정렬 → 최대 50건
|
||
matched.sort(key=lambda t: (
|
||
0 if '[ASW]' in t.title else 1,
|
||
0 if 'HEVC' in t.title.upper() else 1,
|
||
))
|
||
unit.torrents = matched[:50]
|
||
unit.nyaa_keywords = [unit.release_name.lower()]
|
||
logger.info(f"Nyaa 릴리스명 → {len(found)}건 중 {len(matched)}건 매칭")
|
||
return
|
||
|
||
# 전략 2: Jikan 영어 제목 + ASW HEVC 검색
|
||
if unit.anime:
|
||
eng_titles = await fetch_english_title(unit.anime.original_subject)
|
||
eng_default = eng_titles.get("default", "")
|
||
eng_english = eng_titles.get("english", "")
|
||
synonyms = eng_titles.get("synonyms", [])
|
||
|
||
keywords = self._build_match_keywords(
|
||
eng_default, eng_english, synonyms, unit.anime.original_subject,
|
||
)
|
||
unit.nyaa_keywords = keywords
|
||
logger.info(f"매칭 키워드: {keywords}")
|
||
|
||
asw_results = await self.nyaa.search("ASW HEVC", use_default_suffix=False)
|
||
matched = [t for t in asw_results
|
||
if self._title_contains_keyword(t.title, keywords)]
|
||
if matched:
|
||
logger.info(f"ASW HEVC → {len(asw_results)}건 중 {len(matched)}건 매칭")
|
||
unit.torrents = matched[:30]
|
||
|
||
except Exception as e:
|
||
logger.warning(f"Nyaa 검색 오류: {e}")
|
||
|
||
def _detect_episode_offset(self, unit: AnimeWorkUnit):
|
||
"""Nyaa 토렌트의 에피소드 번호 체계 감지 → 오프셋 설정.
|
||
|
||
- S2/S3 태그가 있으면 → 시즌 상대 번호 (오프셋 불필요)
|
||
- 없으면 → AniList prequel 체인으로 오프셋 계산
|
||
- Nyaa min_ep > offset이면 절대 번호 확정
|
||
"""
|
||
if not unit.torrents or not unit.anime:
|
||
return
|
||
|
||
# ASW 토렌트 중심으로 S-tag 감지
|
||
asw_torrents = [t for t in unit.torrents if '[ASW]' in t.title]
|
||
check_torrents = asw_torrents[:10] if asw_torrents else unit.torrents[:10]
|
||
|
||
has_season_tag = any(
|
||
re.search(r'\bS\d+\b|Season\s*\d+', t.title, re.IGNORECASE)
|
||
for t in check_torrents
|
||
)
|
||
|
||
if has_season_tag:
|
||
logger.info("에피소드 번호: 시즌 상대 번호 (S태그 감지)")
|
||
return # offset = 0
|
||
|
||
# S태그 없음 → AniList에서 prequel 체인으로 offset 계산
|
||
try:
|
||
offset = self._get_anilist_offset(unit.release_name or
|
||
unit.anime.original_subject)
|
||
if offset <= 0:
|
||
return
|
||
|
||
# Nyaa ep 번호가 실제로 offset보다 큰지 확인 (절대 번호 확정)
|
||
eps = [self._extract_episode(t.title) for t in check_torrents]
|
||
eps = [e for e in eps if e is not None]
|
||
if not eps:
|
||
return
|
||
|
||
min_ep = min(eps)
|
||
if min_ep > offset:
|
||
unit.episode_offset = offset
|
||
logger.info(
|
||
f"에피소드 번호: 절대 번호 감지 (offset={offset}, "
|
||
f"Nyaa min_ep={min_ep})"
|
||
)
|
||
else:
|
||
logger.info(
|
||
f"에피소드 번호: S태그 없지만 시즌 상대 번호 "
|
||
f"(offset={offset}, Nyaa min_ep={min_ep})"
|
||
)
|
||
except Exception as e:
|
||
logger.warning(f"에피소드 오프셋 감지 실패: {e}")
|
||
|
||
def _get_anilist_offset(self, title: str) -> int:
|
||
"""AniList prequel 체인으로 시즌 에피소드 오프셋 계산."""
|
||
import httpx
|
||
|
||
search_q = '''
|
||
query ($search: String) {
|
||
Page(page: 1, perPage: 10) {
|
||
media(search: $search, type: ANIME, format_in: [TV, TV_SHORT]) {
|
||
id
|
||
title { romaji }
|
||
episodes
|
||
relations {
|
||
edges {
|
||
relationType
|
||
node { id format }
|
||
}
|
||
}
|
||
}
|
||
}
|
||
}
|
||
'''
|
||
detail_q = '''
|
||
query ($id: Int) {
|
||
Media(id: $id, type: ANIME) {
|
||
id
|
||
title { romaji }
|
||
episodes
|
||
format
|
||
relations {
|
||
edges {
|
||
relationType
|
||
node { id format }
|
||
}
|
||
}
|
||
}
|
||
}
|
||
'''
|
||
|
||
resp = httpx.post(
|
||
'https://graphql.anilist.co',
|
||
json={'query': search_q, 'variables': {'search': title}},
|
||
timeout=15,
|
||
)
|
||
results = resp.json().get('data', {}).get('Page', {}).get('media', [])
|
||
if not results:
|
||
return 0
|
||
|
||
# prequel이 있는 결과 우선 (2기 이상)
|
||
media = None
|
||
for r in results:
|
||
edges = r.get('relations', {}).get('edges', [])
|
||
has_prequel = any(
|
||
e['relationType'] == 'PREQUEL'
|
||
and e['node'].get('format') in ('TV', 'TV_SHORT', None)
|
||
for e in edges
|
||
)
|
||
if has_prequel:
|
||
media = r
|
||
break
|
||
if not media:
|
||
return 0 # prequel 없음 = 1기 또는 단일 시즌
|
||
|
||
# prequel 체인 역추적
|
||
chain_eps = []
|
||
current = media
|
||
|
||
while True:
|
||
edges = current.get('relations', {}).get('edges', [])
|
||
prequels = [
|
||
e for e in edges
|
||
if e['relationType'] == 'PREQUEL'
|
||
and e['node'].get('format') in ('TV', 'TV_SHORT', None)
|
||
]
|
||
if not prequels:
|
||
break
|
||
|
||
prequel_id = prequels[0]['node']['id']
|
||
resp2 = httpx.post(
|
||
'https://graphql.anilist.co',
|
||
json={'query': detail_q, 'variables': {'id': prequel_id}},
|
||
timeout=15,
|
||
)
|
||
current = resp2.json().get('data', {}).get('Media')
|
||
if not current:
|
||
break
|
||
chain_eps.append(current.get('episodes', 0) or 0)
|
||
|
||
return sum(chain_eps)
|
||
|
||
# ──────────────────────────────────────
|
||
# 2단계: search — 검색 결과 표시용 (기존 호환)
|
||
# ──────────────────────────────────────
|
||
|
||
async def search(self, title: str) -> DownloadResult:
|
||
"""애니 검색 — 정보 + 자막 + 토렌트 현황 표시.
|
||
|
||
실제 다운로드 없이 검색 결과만 반환.
|
||
"""
|
||
unit = await self.resolve(title)
|
||
if not unit:
|
||
return DownloadResult(
|
||
success=False,
|
||
message=f"'{title}' 검색 결과가 없습니다.",
|
||
)
|
||
|
||
result = DownloadResult(
|
||
success=True,
|
||
anime=unit.anime,
|
||
captions=unit.captions,
|
||
torrents=unit.torrents,
|
||
nas_folder=unit.nas_folder,
|
||
)
|
||
result.message = (
|
||
f"**{unit.anime.subject}** ({unit.anime.original_subject})\n"
|
||
f"자막 제작자: {len(unit.captions)}명 | "
|
||
f"토렌트: {len(unit.torrents)}건\n"
|
||
f"NAS 폴더: `{unit.nas_folder}`"
|
||
)
|
||
if not unit.torrents:
|
||
result.errors.append(
|
||
f"⚠️ ASW HEVC 릴리스가 없습니다.\n"
|
||
f"Nyaa에서 직접 검색해주세요."
|
||
)
|
||
return result
|
||
|
||
# ──────────────────────────────────────
|
||
# 3단계: download — WorkUnit 기반 다운로드
|
||
# ──────────────────────────────────────
|
||
|
||
async def download(
|
||
self,
|
||
title: str,
|
||
mode: str = "auto",
|
||
episode: Optional[int] = None,
|
||
) -> DownloadResult:
|
||
"""애니 다운로드 실행.
|
||
|
||
Args:
|
||
title: 한글 제목
|
||
mode: "auto" | "sub_required" | "sub_only" | "video_only"
|
||
episode: 특정 에피소드 (None이면 빠진 것 전부)
|
||
"""
|
||
# resolve로 WorkUnit 완성
|
||
unit = await self.resolve(title)
|
||
if not unit:
|
||
return DownloadResult(
|
||
success=False,
|
||
message=f"'{title}' 검색 결과가 없습니다.",
|
||
)
|
||
|
||
return await self._execute_download(unit, mode, episode)
|
||
|
||
async def _execute_download(
|
||
self,
|
||
unit: AnimeWorkUnit,
|
||
mode: str = "auto",
|
||
episode: Optional[int] = None,
|
||
) -> DownloadResult:
|
||
"""완성된 WorkUnit으로 다운로드 실행 (추가 검색/스캔 없음)."""
|
||
result = DownloadResult(
|
||
success=True,
|
||
anime=unit.anime,
|
||
captions=unit.captions,
|
||
torrents=unit.torrents,
|
||
nas_folder=unit.nas_folder,
|
||
)
|
||
result.message = (
|
||
f"**{unit.anime.subject}** ({unit.anime.original_subject})\n"
|
||
f"자막: {len(unit.captions)}명 | 토렌트: {len(unit.torrents)}건"
|
||
)
|
||
|
||
nas_folder = unit.nas_path
|
||
|
||
# ── 자막 다운로드 ──
|
||
if mode in ("auto", "sub_only", "sub_required"):
|
||
await self._download_subtitles(result, unit, episode)
|
||
|
||
# ── 영상 토렌트 추가 ──
|
||
if mode in ("auto", "video_only"):
|
||
await self._add_torrents(result, unit, episode)
|
||
elif mode == "sub_required":
|
||
if result.subtitles:
|
||
await self._add_torrents(result, unit, episode)
|
||
else:
|
||
result.errors.append("자막이 없어 영상 다운로드를 보류합니다.")
|
||
|
||
# ── 토렌트 완료 대기 + 자막 리네임 + 정리 ──
|
||
if result.torrent_hashes:
|
||
await self._wait_and_cleanup_torrents(result, nas_folder)
|
||
|
||
# ── 항상: 기존+신규 자막을 영상 파일명에 맞게 리네임 ──
|
||
try:
|
||
self._rename_subtitles_to_match_videos(nas_folder, result, offset=unit.episode_offset)
|
||
except Exception as e:
|
||
logger.warning(f"자막 리네임 오류: {e}")
|
||
|
||
# 결과 메시지
|
||
parts = [result.message]
|
||
if result.subtitles:
|
||
parts.append(f"\n📝 자막 {len(result.subtitles)}건 다운로드 완료")
|
||
if result.torrent_added:
|
||
parts.append(f"\n🎬 토렌트 다운로드 완료 → `{nas_folder}`")
|
||
if result.errors:
|
||
parts.append(f"\n⚠️ 오류: " + "; ".join(result.errors))
|
||
result.message = "\n".join(parts)
|
||
return result
|
||
|
||
# ──────────────────────────────────────
|
||
# 4단계: batch_download — resolve + 일괄 실행
|
||
# ──────────────────────────────────────
|
||
|
||
async def batch_download(
|
||
self,
|
||
mode: str = "auto",
|
||
sub_filter: bool = True,
|
||
) -> list[DownloadResult]:
|
||
"""이번 분기 애니 일괄 다운로드.
|
||
|
||
Args:
|
||
mode: "auto" | "sub_only" | "video_only"
|
||
sub_filter: True면 자막 없는 애니는 영상만 다운 (video_only 모드)
|
||
"""
|
||
# 1. NAS 이번 분기 폴더 스캔
|
||
current_folders = self.nas.get_current_quarter_anime()
|
||
if not current_folders:
|
||
logger.warning("이번 분기 NAS 폴더 없음")
|
||
return []
|
||
|
||
logger.info(f"이번 분기 NAS 폴더: {len(current_folders)}개")
|
||
|
||
# 2. 캐시 미리 로드 (NAS + Anissia, 병렬 resolve 전)
|
||
self._nas_folder_cache = await asyncio.get_event_loop().run_in_executor(
|
||
None, self.nas.list_anime_folders
|
||
)
|
||
await self.anissia.search_anime("_cache_warmup_")
|
||
logger.info(f"캐시 로드 완료: NAS {len(self._nas_folder_cache)}개, Anissia {len(self.anissia._schedule_cache or [])}개")
|
||
|
||
# 3. 전체 resolve — 병렬 실행
|
||
async def _safe_resolve(folder):
|
||
try:
|
||
return await self.resolve(folder.title)
|
||
except Exception as e:
|
||
logger.error(f"resolve 오류 ({folder.title}): {e}")
|
||
return None
|
||
|
||
resolve_tasks = [_safe_resolve(f) for f in current_folders]
|
||
resolved = await asyncio.gather(*resolve_tasks)
|
||
|
||
# 4. 유효한 WorkUnit 필터 + 모드 결정
|
||
units: list[tuple[AnimeWorkUnit, str]] = []
|
||
for folder, unit in zip(current_folders, resolved):
|
||
if not unit:
|
||
logger.info(f" Anissia 매칭 실패: {folder.title}")
|
||
continue
|
||
|
||
effective_mode = mode
|
||
if sub_filter and not unit.captions:
|
||
effective_mode = "video_only"
|
||
logger.info(f" 자막 없음 → 영상만: {unit.anime.subject}")
|
||
|
||
units.append((unit, effective_mode))
|
||
|
||
logger.info(f"resolve 완료: {len(units)}/{len(current_folders)}개 유효")
|
||
|
||
# 5. 일괄 다운로드
|
||
results = []
|
||
for unit, effective_mode in units:
|
||
try:
|
||
result = await self._execute_download(unit, mode=effective_mode)
|
||
results.append(result)
|
||
status = "✅" if result.success else "❌"
|
||
logger.info(f" {status} {unit.anime.subject}: {result.message[:80]}")
|
||
except Exception as e:
|
||
logger.error(f" 오류 ({unit.anime.subject}): {e}")
|
||
results.append(DownloadResult(
|
||
success=False,
|
||
anime=unit.anime,
|
||
message=f"{unit.anime.subject}: 오류 - {e}",
|
||
errors=[str(e)],
|
||
))
|
||
|
||
return results
|
||
|
||
# ──────────────────────────────────────
|
||
# 토렌트 완료 대기 + 정리
|
||
# ──────────────────────────────────────
|
||
|
||
async def _wait_and_cleanup_torrents(
|
||
self,
|
||
result: DownloadResult,
|
||
nas_folder: Path,
|
||
timeout: int = 600,
|
||
poll_interval: int = 10,
|
||
):
|
||
"""토렌트 다운로드 완료 대기 → 자동 삭제.
|
||
|
||
Args:
|
||
timeout: 최대 대기 시간 (초, 기본 10분)
|
||
poll_interval: 폴링 간격 (초)
|
||
"""
|
||
if not result.torrent_hashes:
|
||
return
|
||
|
||
pending = set(result.torrent_hashes)
|
||
completed = set()
|
||
failed = set()
|
||
elapsed = 0
|
||
|
||
logger.info(
|
||
f"토렌트 완료 대기 시작: {len(pending)}건, 타임아웃 {timeout}초"
|
||
)
|
||
|
||
while pending and elapsed < timeout:
|
||
await asyncio.sleep(poll_interval)
|
||
elapsed += poll_interval
|
||
|
||
for h in list(pending):
|
||
try:
|
||
status = await self.qbit.get_torrent_status(h)
|
||
if status is None:
|
||
# 토렌트가 이미 없음 (수동 삭제 등)
|
||
pending.discard(h)
|
||
continue
|
||
|
||
if status.progress >= 1.0:
|
||
completed.add(h)
|
||
pending.discard(h)
|
||
logger.info(
|
||
f"토렌트 완료: {status.name[:50]} "
|
||
f"({status.size / (1024**2):.0f}MB)"
|
||
)
|
||
|
||
elif status.state in ("error", "missingFiles"):
|
||
failed.add(h)
|
||
pending.discard(h)
|
||
result.errors.append(
|
||
f"토렌트 실패 ({status.state}): {status.name[:50]}"
|
||
)
|
||
except Exception as e:
|
||
logger.warning(f"토렌트 상태 확인 실패 ({h[:8]}): {e}")
|
||
|
||
if pending:
|
||
logger.info(
|
||
f"토렌트 대기중: {len(pending)}건 남음 "
|
||
f"({elapsed}/{timeout}초)"
|
||
)
|
||
|
||
# 타임아웃된 토렌트
|
||
for h in pending:
|
||
result.errors.append(f"토렌트 타임아웃: {h[:16]}...")
|
||
failed.add(h)
|
||
|
||
# 완료된 토렌트 삭제 (파일은 남김)
|
||
for h in completed:
|
||
try:
|
||
await self.qbit.delete_torrent(h, delete_files=False)
|
||
logger.info(f"토렌트 삭제: {h[:16]}")
|
||
except Exception as e:
|
||
logger.warning(f"토렌트 삭제 실패 ({h[:8]}): {e}")
|
||
|
||
logger.info(
|
||
f"토렌트 정리 완료: {len(completed)}건 성공, "
|
||
f"{len(failed)}건 실패"
|
||
)
|
||
|
||
# ──────────────────────────────────────
|
||
# 자막 다운로드
|
||
# ──────────────────────────────────────
|
||
|
||
async def _download_subtitles(
|
||
self,
|
||
result: DownloadResult,
|
||
unit: AnimeWorkUnit,
|
||
episode: Optional[int],
|
||
):
|
||
"""자막 다운로드 → 영상 폴더에 직접 저장.
|
||
|
||
Anissia는 최신 에피소드 URL만 제공하므로,
|
||
URL 패턴을 분석해 이전 에피소드 URL을 자동 생성.
|
||
"""
|
||
nas_folder = unit.nas_path
|
||
nas_folder.mkdir(parents=True, exist_ok=True)
|
||
|
||
# ── 기존 자막 에피소드 + 파일 집합 (NAS 실시간 스캔) ──
|
||
existing_sub_eps = set()
|
||
existing_sub_files: set[str] = set()
|
||
offset = unit.episode_offset
|
||
|
||
# resolve 시점의 캐시 대신 실제 NAS 폴더에서 현재 자막 파일 스캔
|
||
try:
|
||
for f in nas_folder.iterdir():
|
||
if f.suffix.lower() in SUB_EXTS and f.is_file():
|
||
existing_sub_files.add(f.name.lower())
|
||
ep = self._extract_episode(f.name)
|
||
if ep is not None:
|
||
existing_sub_eps.add(ep)
|
||
if offset > 0 and ep > offset:
|
||
existing_sub_eps.add(ep - offset)
|
||
except (OSError, PermissionError) as e:
|
||
logger.warning(f"NAS 자막 스캔 오류: {e}")
|
||
|
||
if existing_sub_eps:
|
||
logger.info(f"기존 자막 에피소드 (스킵): {sorted(existing_sub_eps)}")
|
||
|
||
for caption in unit.captions:
|
||
if not caption.website:
|
||
continue
|
||
if episode is not None and caption.episode != str(episode):
|
||
continue
|
||
|
||
# 캡션 URL에서 에피소드 패턴 분석 → 이전 에피소드 URL 생성
|
||
urls_to_check = self._discover_episode_urls(
|
||
caption.website, caption.episode, episode,
|
||
)
|
||
|
||
for url, discovered_ep in urls_to_check:
|
||
# 이미 자막 있는 에피소드 → URL 페치 전에 스킵
|
||
if discovered_ep is not None and discovered_ep in existing_sub_eps:
|
||
logger.info(f"자막 URL 스킵 (기존 존재): ep{discovered_ep}")
|
||
continue
|
||
|
||
try:
|
||
subs = await self.sub_downloader.find_subtitles(url)
|
||
for sub in subs:
|
||
# sub.episode이 None이면 파일명에서 추출 시도
|
||
if sub.episode is None:
|
||
sub.episode = self._extract_episode(sub.filename)
|
||
# discovered_ep fallback
|
||
if sub.episode is None:
|
||
sub.episode = discovered_ep
|
||
|
||
if episode is not None and sub.episode is not None and sub.episode != episode:
|
||
continue
|
||
if sub.episode is not None and sub.episode in existing_sub_eps:
|
||
logger.info(f"자막 스킵 (기존 존재): {sub.episode}화 - {sub.filename}")
|
||
continue
|
||
|
||
# 에피소드 번호를 모르는 경우: 파일명 직접 비교로 스킵
|
||
if sub.episode is None and sub.filename.lower() in existing_sub_files:
|
||
logger.info(f"자막 스킵 (파일명 존재): {sub.filename}")
|
||
continue
|
||
|
||
try:
|
||
path = await self.sub_downloader.download_file(sub, str(nas_folder))
|
||
result.subtitles.append(sub)
|
||
# 다운 후 실제 파일명으로 existing 갱신 (세션 내 중복 방지)
|
||
actual_name = Path(path).name
|
||
existing_sub_files.add(actual_name.lower())
|
||
if sub.episode is not None:
|
||
existing_sub_eps.add(sub.episode)
|
||
# 실제 파일명에서도 에피소드 추출해서 등록
|
||
actual_ep = self._extract_episode(actual_name)
|
||
if actual_ep is not None:
|
||
existing_sub_eps.add(actual_ep)
|
||
except Exception as e:
|
||
result.errors.append(f"자막 다운로드 실패 ({sub.filename}): {e}")
|
||
except Exception as e:
|
||
# URL이 404일 수 있으므로 debug만
|
||
logger.debug(f"자막 URL 접근 실패: {url} - {e}")
|
||
|
||
# 자막 리네임
|
||
self._rename_subtitles_to_match_videos(nas_folder, result, offset=unit.episode_offset)
|
||
|
||
def _discover_episode_urls(
|
||
self,
|
||
base_url: str,
|
||
caption_episode: str,
|
||
target_episode: Optional[int],
|
||
) -> list[tuple[str, Optional[int]]]:
|
||
"""캡션 URL에서 같은 애니의 모든 에피소드 자막 URL 자동 검색.
|
||
|
||
Returns: [(url, episode_number), ...]
|
||
"""
|
||
try:
|
||
ep_num = int(caption_episode) if caption_episode else None
|
||
except (ValueError, TypeError):
|
||
ep_num = None
|
||
|
||
if target_episode is not None:
|
||
return [(base_url, ep_num)]
|
||
|
||
# Blogspot인 경우 Atom Feed 사용
|
||
if 'blogspot.com' in base_url:
|
||
feed_urls = self._discover_blogspot_episodes(base_url)
|
||
if feed_urls:
|
||
return feed_urls
|
||
|
||
return [(base_url, ep_num)]
|
||
|
||
def _discover_blogspot_episodes(self, known_url: str) -> list[tuple[str, Optional[int]]]:
|
||
"""Blogspot Atom Feed에서 같은 애니의 모든 에피소드 URL 검색.
|
||
|
||
Returns: [(url, episode_number), ...] 오래된 에피소드부터
|
||
"""
|
||
import httpx
|
||
|
||
m = re.match(r'(https?://[^/]+\.blogspot\.com)', known_url)
|
||
if not m:
|
||
return []
|
||
|
||
blog_base = m.group(1)
|
||
feed_url = f"{blog_base}/feeds/posts/default"
|
||
|
||
try:
|
||
resp = httpx.get(
|
||
feed_url,
|
||
params={'alt': 'json', 'max-results': 100},
|
||
timeout=15,
|
||
)
|
||
if resp.status_code != 200:
|
||
return []
|
||
|
||
data = resp.json()
|
||
entries = data.get('feed', {}).get('entry', [])
|
||
|
||
# known_url 포스트 찾기 → 애니 이름 추출
|
||
anime_name = None
|
||
for entry in entries:
|
||
links = [l for l in entry.get('link', []) if l.get('rel') == 'alternate']
|
||
entry_url = links[0].get('href', '') if links else ''
|
||
if entry_url == known_url:
|
||
title = entry.get('title', {}).get('$t', '')
|
||
anime_name = re.sub(r'\s*\d+화.*$', '', title).strip()
|
||
break
|
||
|
||
if not anime_name:
|
||
logger.info(f"Atom Feed에서 원본 포스트를 찾지 못함: {known_url}")
|
||
return []
|
||
|
||
# 같은 애니 이름의 모든 포스트 (url, episode) 수집
|
||
episode_urls = []
|
||
for entry in entries:
|
||
title = entry.get('title', {}).get('$t', '')
|
||
if anime_name not in title:
|
||
continue
|
||
links = [l for l in entry.get('link', []) if l.get('rel') == 'alternate']
|
||
entry_url = links[0].get('href', '') if links else ''
|
||
if entry_url:
|
||
ep = self._extract_episode(title)
|
||
episode_urls.append((entry_url, ep))
|
||
|
||
episode_urls.reverse() # 오래된 순으로
|
||
|
||
logger.info(
|
||
f"Blogspot Feed 검색: '{anime_name}' → {len(episode_urls)}건 발견"
|
||
)
|
||
return episode_urls
|
||
|
||
except Exception as e:
|
||
logger.warning(f"Blogspot Feed 검색 실패: {e}")
|
||
return []
|
||
|
||
def _rename_subtitles_to_match_videos(
|
||
self, folder: Path, result: DownloadResult,
|
||
offset: int = 0,
|
||
):
|
||
"""폴더 내 자막 파일을 영상 파일명에 맞게 리네임."""
|
||
videos = {} # episode_num -> video_path
|
||
for f in folder.iterdir():
|
||
if f.suffix.lower() in VIDEO_EXTS:
|
||
ep = self._extract_episode(f.stem)
|
||
if ep is not None:
|
||
videos[ep] = f
|
||
|
||
if not videos:
|
||
return
|
||
|
||
for f in folder.iterdir():
|
||
if f.suffix.lower() not in SUB_EXTS:
|
||
continue
|
||
ep = self._extract_episode(f.stem)
|
||
if ep is None:
|
||
continue
|
||
|
||
# 직접 매칭 또는 offset 적용 매칭
|
||
video_ep = None
|
||
if ep in videos:
|
||
video_ep = ep
|
||
elif offset > 0 and (ep + offset) in videos:
|
||
video_ep = ep + offset
|
||
|
||
if video_ep is None:
|
||
continue
|
||
|
||
video_stem = videos[video_ep].stem
|
||
new_name = f"{video_stem}{f.suffix}"
|
||
new_path = folder / new_name
|
||
if new_path == f:
|
||
continue # 이미 정확한 이름
|
||
if new_path.exists():
|
||
# 이미 리네임된 자막 존재 → 중복 원본 삭제
|
||
try:
|
||
f.unlink()
|
||
logger.info(f"중복 자막 삭제: {f.name} (→ {new_name} 이미 존재)")
|
||
except Exception as e:
|
||
logger.warning(f"중복 자막 삭제 실패: {e}")
|
||
else:
|
||
try:
|
||
f.rename(new_path)
|
||
logger.info(f"자막 리네임: {f.name} → {new_name}")
|
||
except Exception as e:
|
||
logger.warning(f"자막 리네임 실패: {e}")
|
||
|
||
# ──────────────────────────────────────
|
||
# 토렌트 추가
|
||
# ──────────────────────────────────────
|
||
|
||
async def _add_torrents(
|
||
self,
|
||
result: DownloadResult,
|
||
unit: AnimeWorkUnit,
|
||
episode: Optional[int],
|
||
):
|
||
"""토렌트 추가 — WorkUnit의 기존 데이터 사용 (재스캔 없음)."""
|
||
if not unit.torrents:
|
||
result.errors.append("매칭되는 토렌트가 없습니다.")
|
||
return
|
||
|
||
# WorkUnit에서 기존 정보 사용
|
||
existing_eps = unit.existing_eps
|
||
required_group = unit.release_group if len([
|
||
v for v in unit.existing_videos
|
||
if re.match(r'\[' + re.escape(unit.release_group) + r'\]', v)
|
||
]) >= 2 else None if unit.release_group else None
|
||
|
||
if existing_eps:
|
||
logger.info(f"NAS 기존 에피소드: {sorted(existing_eps)}")
|
||
if required_group:
|
||
logger.info(f"NAS 릴리스 그룹: [{required_group}]")
|
||
|
||
# 에피소드별 최고 점수 토렌트
|
||
offset = unit.episode_offset
|
||
ep_best: dict[int, tuple[int, TorrentResult]] = {}
|
||
for t in unit.torrents:
|
||
raw_ep = self._extract_episode(t.title)
|
||
if raw_ep is None:
|
||
continue
|
||
# 절대번호 → 시즌번호 변환 (표시용)
|
||
season_ep = raw_ep - offset if offset > 0 and raw_ep > offset else raw_ep
|
||
if season_ep <= 0:
|
||
continue # 이전 시즌 에피소드
|
||
if episode is not None and season_ep != episode:
|
||
continue
|
||
# existing_eps는 절대번호이므로 raw_ep으로 비교
|
||
if raw_ep in existing_eps:
|
||
continue
|
||
|
||
title_upper = t.title.upper()
|
||
if "VOSTFR" in title_upper or "VOSTA" in title_upper:
|
||
continue
|
||
|
||
if required_group and f"[{required_group}]" not in t.title:
|
||
continue
|
||
|
||
score = 0
|
||
if "[ASW]" in t.title:
|
||
score += 100
|
||
if "HEVC" in title_upper or "X265" in title_upper:
|
||
score += 50
|
||
if "1080P" in title_upper:
|
||
score += 20
|
||
if t.seeders > 0:
|
||
score += int(math.log(t.seeders) * 5)
|
||
|
||
if season_ep not in ep_best or score > ep_best[season_ep][0]:
|
||
ep_best[season_ep] = (score, t)
|
||
|
||
if not ep_best:
|
||
if episode is not None:
|
||
result.errors.append(f"{episode}화 토렌트를 찾지 못했습니다.")
|
||
elif required_group:
|
||
result.errors.append(
|
||
f"새로 다운로드할 에피소드가 없습니다 "
|
||
f"([{required_group}] 릴리스 기준)."
|
||
)
|
||
else:
|
||
result.errors.append("새로 다운로드할 에피소드가 없습니다.")
|
||
return
|
||
|
||
# 에피소드 순서대로 추가
|
||
added_count = 0
|
||
for ep in sorted(ep_best.keys()):
|
||
_, torrent = ep_best[ep]
|
||
try:
|
||
success = await self.qbit.add_torrent(
|
||
magnet_or_url=torrent.magnet_link,
|
||
save_path=str(unit.nas_path),
|
||
category="anime",
|
||
tags=unit.anime.subject if unit.anime else "",
|
||
)
|
||
if success:
|
||
added_count += 1
|
||
# magnet에서 hash 추출
|
||
h_match = re.search(r'btih:([a-fA-F0-9]{40})', torrent.magnet_link)
|
||
if h_match:
|
||
result.torrent_hashes.append(h_match.group(1).lower())
|
||
logger.info(f"토렌트 추가: ep{ep} - {torrent.title[:50]}")
|
||
else:
|
||
result.errors.append(f"ep{ep} 토렌트 추가 실패")
|
||
except Exception as e:
|
||
result.errors.append(f"ep{ep} qBittorrent 오류: {e}")
|
||
|
||
result.torrent_added = added_count > 0
|
||
if added_count > 0:
|
||
new_eps = sorted(ep_best.keys())
|
||
result.message += f"\n📥 {added_count}개 에피소드 추가: {new_eps}"
|
||
|
||
# ──────────────────────────────────────
|
||
# 유틸리티
|
||
# ──────────────────────────────────────
|
||
|
||
async def get_status(self) -> list[dict]:
|
||
"""현재 다운로드 큐 상태."""
|
||
try:
|
||
torrents = await self.qbit.list_torrents(category="anime")
|
||
return [
|
||
{
|
||
"name": t.name,
|
||
"progress": f"{t.progress * 100:.1f}%",
|
||
"state": t.state,
|
||
"size": f"{t.size / (1024**3):.2f} GB" if t.size > 0 else "?",
|
||
"speed": f"{t.download_speed / (1024**2):.1f} MB/s" if t.download_speed > 0 else "0",
|
||
"eta": f"{t.eta // 60}분" if t.eta > 0 else "∞",
|
||
"path": t.save_path,
|
||
}
|
||
for t in torrents
|
||
]
|
||
except Exception as e:
|
||
logger.error(f"qBittorrent 상태 조회 오류: {e}")
|
||
return []
|
||
|
||
@staticmethod
|
||
def _extract_episode(text: str) -> Optional[int]:
|
||
"""텍스트에서 에피소드 번호 추출."""
|
||
# SxxExx
|
||
m = re.search(r'[Ss]\d{1,2}[Ee](\d{1,4})', text)
|
||
if m:
|
||
return int(m.group(1))
|
||
# "- 03", "- 06v2"
|
||
m = re.search(r'[-–]\s*(\d{1,4})(?:v\d)?(?:\s|$|\.|\[|\()', text)
|
||
if m:
|
||
return int(m.group(1))
|
||
# "3화"
|
||
m = re.search(r'(\d{1,4})\s*화', text)
|
||
if m:
|
||
return int(m.group(1))
|
||
# EP03
|
||
m = re.search(r'(?:EP|Episode)\s*(\d{1,4})', text, re.IGNORECASE)
|
||
if m:
|
||
return int(m.group(1))
|
||
return None
|
||
|
||
def _find_existing_nas_folder(self, korean_title: str, start_date: str = ""):
|
||
"""NAS에서 기존 폴더 찾기 (캐시 사용)."""
|
||
from tools.title_matcher import get_quarter
|
||
|
||
title_norm = re.sub(r'[^\w]', '', korean_title.lower())
|
||
if len(title_norm) < 2:
|
||
return None
|
||
|
||
anime_year, anime_quarter = get_quarter(start_date)
|
||
|
||
# 캐시가 있으면 사용, 없으면 직접 로드
|
||
try:
|
||
all_folders = self._nas_folder_cache or self.nas.list_anime_folders()
|
||
except Exception as e:
|
||
logger.warning(f"NAS 폴더 검색 실패: {e}")
|
||
return None
|
||
|
||
for folder in all_folders:
|
||
folder_norm = re.sub(r'[^\w]', '', folder.title.lower())
|
||
if not (title_norm in folder_norm or folder_norm in title_norm):
|
||
continue
|
||
if anime_year and folder.year != anime_year:
|
||
continue
|
||
if anime_quarter and folder.quarter != anime_quarter:
|
||
continue
|
||
logger.info(f"NAS 기존 폴더 발견: {folder.folder_name}")
|
||
return folder
|
||
|
||
return None
|
||
|
||
@staticmethod
|
||
def _extract_release_name(filename: str) -> str:
|
||
"""영상 파일명에서 릴리스 이름 추출.
|
||
|
||
[ASW] Sousou no Frieren S2 - 07 [1080p HEVC].mkv → 'Sousou no Frieren S2'
|
||
"""
|
||
name = re.sub(r'\.[^.]+$', '', filename)
|
||
name = re.sub(r'^\[[^\]]*\]\s*', '', name)
|
||
name = re.sub(r'\s*[-–]\s*\d+.*$', '', name).strip()
|
||
name = re.sub(r'\s*S\d+E\d+.*$', '', name, flags=re.IGNORECASE).strip()
|
||
return name
|
||
|
||
@staticmethod
|
||
def _build_match_keywords(
|
||
eng_default: str, eng_english: str,
|
||
synonyms: list[str], original_title: str,
|
||
) -> list[str]:
|
||
"""Jikan 제목들에서 매칭용 키워드 추출."""
|
||
keywords = []
|
||
|
||
for syn in synonyms:
|
||
cleaned = syn.strip()
|
||
if 3 <= len(cleaned) <= 30:
|
||
keywords.append(cleaned.lower())
|
||
|
||
if eng_default:
|
||
clean = re.sub(r'\s*(2nd|3rd|\d+th)\s*Season.*$', '', eng_default, flags=re.IGNORECASE).strip()
|
||
clean = re.sub(r'\s*S\d+$', '', clean).strip()
|
||
if len(clean) >= 3:
|
||
keywords.append(clean.lower())
|
||
|
||
if eng_english:
|
||
short = eng_english.split(":")[0].strip()
|
||
if len(short) >= 3:
|
||
keywords.append(short.lower())
|
||
|
||
if original_title and len(original_title) >= 2:
|
||
clean = re.sub(r'\s*第\d+期$', '', original_title).strip()
|
||
if clean:
|
||
keywords.append(clean.lower())
|
||
|
||
seen = set()
|
||
return [k for k in keywords if k not in seen and not seen.add(k)]
|
||
|
||
@staticmethod
|
||
def _title_contains_keyword(nyaa_title: str, keywords: list[str]) -> bool:
|
||
"""Nyaa 토렌트 제목에 키워드 중 하나라도 포함되는지 체크."""
|
||
title_lower = nyaa_title.lower()
|
||
title_norm = re.sub(r'[^a-z0-9\s]', '', title_lower)
|
||
|
||
for kw in keywords:
|
||
if not kw or len(kw) < 2:
|
||
continue
|
||
kw_norm = re.sub(r'[^a-z0-9\s]', '', kw)
|
||
if kw_norm and len(kw_norm) >= 3 and kw_norm in title_norm:
|
||
return True
|
||
if not kw.isascii() and kw in title_lower:
|
||
return True
|
||
return False
|
||
|
||
|
||
# ── CLI 진입점 ──
|
||
if __name__ == "__main__":
|
||
import sys
|
||
import asyncio
|
||
import logging
|
||
|
||
# 로그 출력 설정
|
||
logging.basicConfig(
|
||
level=logging.INFO,
|
||
format='%(asctime)s [%(levelname)s] %(name)s: %(message)s'
|
||
)
|
||
|
||
args = sys.argv[1:]
|
||
pipeline = AnimePipeline()
|
||
|
||
async def main():
|
||
if not args:
|
||
print("사용법: python tools/anime_pipeline.py [search|download|batch|status] [옵션]")
|
||
return
|
||
|
||
if args[0] == "search" and len(args) > 1:
|
||
title = " ".join(args[1:])
|
||
result = await pipeline.search(title)
|
||
print(result.message)
|
||
if result.errors:
|
||
print(f"⚠️ 오류: {'; '.join(result.errors)}")
|
||
|
||
elif args[0] == "download" and len(args) > 1:
|
||
title_parts = []
|
||
mode = "auto"
|
||
episode = None
|
||
i = 1
|
||
while i < len(args):
|
||
if args[i] == "--mode" and i + 1 < len(args):
|
||
mode = args[i + 1]
|
||
i += 2
|
||
elif args[i] == "--episode" and i + 1 < len(args):
|
||
episode = int(args[i + 1])
|
||
i += 2
|
||
else:
|
||
title_parts.append(args[i])
|
||
i += 1
|
||
|
||
title = " ".join(title_parts)
|
||
result = await pipeline.download(title, mode=mode, episode=episode)
|
||
print(result.message)
|
||
|
||
elif args[0] == "batch":
|
||
mode = "auto"
|
||
sub_filter = True
|
||
i = 1
|
||
while i < len(args):
|
||
if args[i] == "--no-sub-filter":
|
||
sub_filter = False
|
||
i += 1
|
||
elif args[i] == "--mode" and i + 1 < len(args):
|
||
mode = args[i + 1]
|
||
i += 2
|
||
else:
|
||
i += 1
|
||
|
||
print(f"📦 이번 분기 배치 다운로드 시작 (자막 필터: {'ON' if sub_filter else 'OFF'})")
|
||
results = await pipeline.batch_download(mode=mode, sub_filter=sub_filter)
|
||
|
||
success = sum(1 for r in results if r.success)
|
||
failed = sum(1 for r in results if not r.success)
|
||
print(f"\n📊 완료: {success}건 성공, {failed}건 실패 (총 {len(results)}건)")
|
||
for r in results:
|
||
icon = "✅" if r.success else "❌"
|
||
title = r.anime.subject if r.anime else "?"
|
||
print(f" {icon} {title}: {r.message[:80]}")
|
||
|
||
elif args[0] == "status":
|
||
status = await pipeline.get_status()
|
||
if not status:
|
||
print("🎬 다운로드 중인 항목 없음")
|
||
else:
|
||
for s in status:
|
||
print(f" {s['progress']} | {s['name'][:50]} | {s['speed']} | ETA: {s['eta']}")
|
||
|
||
else:
|
||
print("사용법: python tools/anime_pipeline.py [search|download|batch|status] [옵션]")
|
||
|
||
asyncio.run(main())
|