Files
variet-agent/tools/anime_pipeline.py

266 lines
9.9 KiB
Python

"""애니메이션 자동화 파이프라인.
전체 흐름:
1. Anissia에서 애니 검색 → 자막 정보 확인
2. Nyaa.si에서 토렌트 검색 → 제목 매칭
3. qBittorrent에 magnet 추가 → NAS 경로 지정
4. 자막 다운로드 → 파일명 매칭
"""
import asyncio
import logging
from dataclasses import dataclass, field
from pathlib import Path
from typing import Optional
import config
from tools.anissia_client import AnissiaClient, AnimeInfo, CaptionInfo
from tools.nyaa_client import NyaaClient, TorrentResult
from tools.qbit_client import QBitClient
from tools.subtitle_downloader import SubtitleDownloader, SubtitleFile
from tools.title_matcher import (
match_titles, make_nas_folder_name, rename_subtitle_to_video,
)
logger = logging.getLogger("variet.tools.pipeline")
@dataclass
class DownloadResult:
"""파이프라인 실행 결과."""
success: bool
anime: Optional[AnimeInfo] = None
captions: list[CaptionInfo] = field(default_factory=list)
torrents: list[TorrentResult] = field(default_factory=list)
subtitles: list[SubtitleFile] = field(default_factory=list)
nas_folder: str = ""
torrent_added: bool = False
message: str = ""
errors: list[str] = field(default_factory=list)
class AnimePipeline:
"""애니메이션 다운로드 자동화 파이프라인."""
def __init__(self):
self.anissia = AnissiaClient()
self.nyaa = NyaaClient()
self.qbit = QBitClient()
self.sub_downloader = SubtitleDownloader()
self.nas_base = getattr(config, "NAS_ANIME_PATH",
r"\\192.168.10.10\NasData\Video\Animation")
from tools.nas_scanner import NasScanner
self.nas = NasScanner(self.nas_base)
async def search(self, title: str) -> DownloadResult:
"""애니 검색 — 정보 + 자막 + 토렌트 현황 표시.
실제 다운로드 없이 검색 결과만 반환.
"""
result = DownloadResult(success=False)
# 1. Anissia에서 검색
try:
anime_list = await self.anissia.search_anime(title)
except Exception as e:
result.errors.append(f"Anissia 검색 오류: {e}")
return result
if not anime_list:
result.message = f"'{title}' 검색 결과가 없습니다."
return result
anime = anime_list[0] # 첫 번째 결과 사용
result.anime = anime
# 2. 자막 정보
try:
captions = await self.anissia.get_captions(anime.anime_no)
result.captions = captions
except Exception as e:
result.errors.append(f"자막 조회 오류: {e}")
# 3. Nyaa 토렌트 검색 (다중 전략 — suffix 있는/없는 조합)
try:
from tools.title_matcher import japanese_to_romaji
import re as _re
romaji_full = japanese_to_romaji(anime.original_subject)
# 한자/비ASCII 잔류 문자 제거 → 순수 로마자만 추출
romaji_clean = _re.sub(r'[^\x00-\x7F]+', ' ', romaji_full).strip()
romaji_clean = _re.sub(r'\s+', ' ', romaji_clean)
# 검색 전략 (query, use_default_suffix) 순서
strategies: list[tuple[str, bool]] = []
if romaji_clean and len(romaji_clean) >= 3:
strategies.append((romaji_clean, True)) # romaji + ASW HEVC
strategies.append((romaji_clean, False)) # romaji only
strategies.append((anime.original_subject, True)) # 원제 + suffix
strategies.append((anime.original_subject, False)) # 원제 only
strategies.append((anime.subject, True)) # 한글 + suffix
strategies.append((anime.subject, False)) # 한글 only
torrents = []
for query, use_suffix in strategies:
torrents = await self.nyaa.search(
query, use_default_suffix=use_suffix,
)
if torrents:
suffix_label = " +suffix" if use_suffix else ""
logger.info(
f"Nyaa 검색 성공: '{query}'{suffix_label}{len(torrents)}"
)
break
# 제목 매칭 필터링
matched = match_titles(
anime.subject, anime.original_subject, torrents, threshold=0.3
)
result.torrents = matched[:20] # 상위 20개
except Exception as e:
result.errors.append(f"Nyaa 검색 오류: {e}")
# NAS 폴더명 생성
result.nas_folder = make_nas_folder_name(anime.subject, anime.start_date)
result.success = True
result.message = (
f"**{anime.subject}** ({anime.original_subject})\n"
f"자막 제작자: {len(result.captions)}명 | "
f"토렌트: {len(result.torrents)}\n"
f"NAS 폴더: `{result.nas_folder}`"
)
return result
async def download(
self,
title: str,
mode: str = "auto",
episode: Optional[int] = None,
) -> DownloadResult:
"""애니 다운로드 실행.
Args:
title: 한글 제목
mode: "auto" (자막+영상), "sub_only" (자막만), "video_only" (영상만)
episode: 특정 에피소드만 (None이면 최신)
"""
# 먼저 검색
result = await self.search(title)
if not result.success:
return result
anime = result.anime
nas_folder = Path(self.nas_base) / result.nas_folder
# ── 자막 다운로드 ──
if mode in ("auto", "sub_only"):
await self._download_subtitles(result, nas_folder, episode)
# ── 영상 토렌트 추가 ──
if mode in ("auto", "video_only"):
force = (mode == "video_only")
await self._add_torrents(result, nas_folder, episode, force=force)
# 결과 메시지 구성
parts = [result.message]
if result.subtitles:
parts.append(f"\n📝 자막 {len(result.subtitles)}건 다운로드 완료")
if result.torrent_added:
parts.append(f"\n🎬 토렌트 추가 완료 → `{nas_folder}`")
if result.errors:
parts.append(f"\n⚠️ 오류: " + "; ".join(result.errors))
result.message = "\n".join(parts)
return result
async def _download_subtitles(
self,
result: DownloadResult,
nas_folder: Path,
episode: Optional[int],
):
"""자막 다운로드 처리."""
sub_dir = nas_folder / "subtitles"
for caption in result.captions:
if not caption.website:
continue
if episode is not None and caption.episode != str(episode):
continue
try:
subs = await self.sub_downloader.find_subtitles(caption.website)
for sub in subs:
if episode is not None and sub.episode is not None and sub.episode != episode:
continue
try:
await self.sub_downloader.download_file(sub, str(sub_dir))
result.subtitles.append(sub)
except Exception as e:
result.errors.append(f"자막 다운로드 실패 ({sub.filename}): {e}")
except Exception as e:
result.errors.append(f"자막 사이트 접근 실패 ({caption.name}): {e}")
async def _add_torrents(
self,
result: DownloadResult,
nas_folder: Path,
episode: Optional[int],
force: bool = False,
):
"""토렌트 추가 처리."""
if not result.torrents:
result.errors.append("매칭되는 토렌트가 없습니다.")
return
# 에피소드 필터링
candidates = result.torrents
if episode is not None:
candidates = [t for t in candidates if t.episode == episode]
if not candidates:
result.errors.append(f"{episode}화 토렌트를 찾지 못했습니다.")
return
# auto 모드 기본 조건: 자막이 있어야 영상 다운로드 (force면 무시)
if not force and not result.captions and not result.subtitles:
# 자막이 없으면 사용자에게 안내만
result.errors.append("자막이 없어 영상 다운로드를 보류합니다. /anime video로 강제 다운로드 가능")
return
# 최상위 1개 (가장 시더 많은) 추가
best = candidates[0]
try:
success = await self.qbit.add_torrent(
magnet_or_url=best.magnet_link,
save_path=str(nas_folder),
category="anime",
tags=result.anime.subject if result.anime else "",
)
result.torrent_added = success
if not success:
result.errors.append("qBittorrent 토렌트 추가 실패")
except Exception as e:
result.errors.append(f"qBittorrent 오류: {e}")
async def get_status(self) -> list[dict]:
"""현재 다운로드 큐 상태."""
try:
torrents = await self.qbit.list_torrents(category="anime")
return [
{
"name": t.name,
"progress": f"{t.progress * 100:.1f}%",
"state": t.state,
"size": f"{t.size / (1024**3):.2f} GB" if t.size > 0 else "?",
"speed": f"{t.download_speed / (1024**2):.1f} MB/s" if t.download_speed > 0 else "0",
"eta": f"{t.eta // 60}" if t.eta > 0 else "",
"path": t.save_path,
}
for t in torrents
]
except Exception as e:
logger.error(f"qBittorrent 상태 조회 오류: {e}")
return []