feat(tools): 애니메이션 자동화 파이프라인 구현

- tools/anissia_client.py: Anissia API 클라이언트 (편성표/자막)
- tools/nyaa_client.py: Nyaa.si RSS 토렌트 검색
- tools/qbit_client.py: qBittorrent Web API 클라이언트
- tools/subtitle_downloader.py: Google Drive/Tistory/Naver 자막 파서
- tools/title_matcher.py: 제목 매칭 + NAS 폴더명 생성
- tools/anime_pipeline.py: 전체 파이프라인 오케스트레이터
- tools/nas_scanner.py: NAS 폴더/파일 스캔
- prompts/unified.md: anime 모드 추가 (AI 평문 의도 분류)
- api/discord_bot.py: AI 평문 anime 핸들러 + /anime 슬래시 커맨드
- config.py: qBittorrent/NAS 설정 추가
- .agents/: agent_guide 워크플로우 통합
- docs/devlog: 세션 기록
This commit is contained in:
2026-03-08 16:07:16 +09:00
parent 49ee5f397c
commit c92433b0b1
36 changed files with 3663 additions and 128 deletions

1
tools/__init__.py Normal file
View File

@@ -0,0 +1 @@
# Anime automation tools package.

244
tools/anime_pipeline.py Normal file
View File

@@ -0,0 +1,244 @@
"""애니메이션 자동화 파이프라인.
전체 흐름:
1. Anissia에서 애니 검색 → 자막 정보 확인
2. Nyaa.si에서 토렌트 검색 → 제목 매칭
3. qBittorrent에 magnet 추가 → NAS 경로 지정
4. 자막 다운로드 → 파일명 매칭
"""
import asyncio
import logging
from dataclasses import dataclass, field
from pathlib import Path
from typing import Optional
import config
from tools.anissia_client import AnissiaClient, AnimeInfo, CaptionInfo
from tools.nyaa_client import NyaaClient, TorrentResult
from tools.qbit_client import QBitClient
from tools.subtitle_downloader import SubtitleDownloader, SubtitleFile
from tools.title_matcher import (
match_titles, make_nas_folder_name, rename_subtitle_to_video,
)
logger = logging.getLogger("variet.tools.pipeline")
@dataclass
class DownloadResult:
"""파이프라인 실행 결과."""
success: bool
anime: Optional[AnimeInfo] = None
captions: list[CaptionInfo] = field(default_factory=list)
torrents: list[TorrentResult] = field(default_factory=list)
subtitles: list[SubtitleFile] = field(default_factory=list)
nas_folder: str = ""
torrent_added: bool = False
message: str = ""
errors: list[str] = field(default_factory=list)
class AnimePipeline:
"""애니메이션 다운로드 자동화 파이프라인."""
def __init__(self):
self.anissia = AnissiaClient()
self.nyaa = NyaaClient()
self.qbit = QBitClient()
self.sub_downloader = SubtitleDownloader()
self.nas_base = getattr(config, "NAS_ANIME_PATH",
r"\\192.168.10.10\NasData\Video\Animation")
from tools.nas_scanner import NasScanner
self.nas = NasScanner(self.nas_base)
async def search(self, title: str) -> DownloadResult:
"""애니 검색 — 정보 + 자막 + 토렌트 현황 표시.
실제 다운로드 없이 검색 결과만 반환.
"""
result = DownloadResult(success=False)
# 1. Anissia에서 검색
try:
anime_list = await self.anissia.search_anime(title)
except Exception as e:
result.errors.append(f"Anissia 검색 오류: {e}")
return result
if not anime_list:
result.message = f"'{title}' 검색 결과가 없습니다."
return result
anime = anime_list[0] # 첫 번째 결과 사용
result.anime = anime
# 2. 자막 정보
try:
captions = await self.anissia.get_captions(anime.anime_no)
result.captions = captions
except Exception as e:
result.errors.append(f"자막 조회 오류: {e}")
# 3. Nyaa 토렌트 검색 (원제 로마자로)
try:
from tools.title_matcher import japanese_to_romaji
romaji_title = japanese_to_romaji(anime.original_subject)
# 먼저 로마자로 검색
torrents = await self.nyaa.search(romaji_title)
if not torrents:
# 원제 그대로 검색
torrents = await self.nyaa.search(anime.original_subject)
# 제목 매칭 필터링
matched = match_titles(
anime.subject, anime.original_subject, torrents, threshold=0.3
)
result.torrents = matched[:20] # 상위 20개
except Exception as e:
result.errors.append(f"Nyaa 검색 오류: {e}")
# NAS 폴더명 생성
result.nas_folder = make_nas_folder_name(anime.subject, anime.start_date)
result.success = True
result.message = (
f"**{anime.subject}** ({anime.original_subject})\n"
f"자막 제작자: {len(result.captions)}명 | "
f"토렌트: {len(result.torrents)}\n"
f"NAS 폴더: `{result.nas_folder}`"
)
return result
async def download(
self,
title: str,
mode: str = "auto",
episode: Optional[int] = None,
) -> DownloadResult:
"""애니 다운로드 실행.
Args:
title: 한글 제목
mode: "auto" (자막+영상), "sub_only" (자막만), "video_only" (영상만)
episode: 특정 에피소드만 (None이면 최신)
"""
# 먼저 검색
result = await self.search(title)
if not result.success:
return result
anime = result.anime
nas_folder = Path(self.nas_base) / result.nas_folder
# ── 자막 다운로드 ──
if mode in ("auto", "sub_only"):
await self._download_subtitles(result, nas_folder, episode)
# ── 영상 토렌트 추가 ──
if mode in ("auto", "video_only"):
force = (mode == "video_only")
await self._add_torrents(result, nas_folder, episode, force=force)
# 결과 메시지 구성
parts = [result.message]
if result.subtitles:
parts.append(f"\n📝 자막 {len(result.subtitles)}건 다운로드 완료")
if result.torrent_added:
parts.append(f"\n🎬 토렌트 추가 완료 → `{nas_folder}`")
if result.errors:
parts.append(f"\n⚠️ 오류: " + "; ".join(result.errors))
result.message = "\n".join(parts)
return result
async def _download_subtitles(
self,
result: DownloadResult,
nas_folder: Path,
episode: Optional[int],
):
"""자막 다운로드 처리."""
sub_dir = nas_folder / "subtitles"
for caption in result.captions:
if not caption.website:
continue
if episode is not None and caption.episode != str(episode):
continue
try:
subs = await self.sub_downloader.find_subtitles(caption.website)
for sub in subs:
if episode is not None and sub.episode is not None and sub.episode != episode:
continue
try:
await self.sub_downloader.download_file(sub, str(sub_dir))
result.subtitles.append(sub)
except Exception as e:
result.errors.append(f"자막 다운로드 실패 ({sub.filename}): {e}")
except Exception as e:
result.errors.append(f"자막 사이트 접근 실패 ({caption.name}): {e}")
async def _add_torrents(
self,
result: DownloadResult,
nas_folder: Path,
episode: Optional[int],
force: bool = False,
):
"""토렌트 추가 처리."""
if not result.torrents:
result.errors.append("매칭되는 토렌트가 없습니다.")
return
# 에피소드 필터링
candidates = result.torrents
if episode is not None:
candidates = [t for t in candidates if t.episode == episode]
if not candidates:
result.errors.append(f"{episode}화 토렌트를 찾지 못했습니다.")
return
# auto 모드 기본 조건: 자막이 있어야 영상 다운로드 (force면 무시)
if not force and not result.captions and not result.subtitles:
# 자막이 없으면 사용자에게 안내만
result.errors.append("자막이 없어 영상 다운로드를 보류합니다. /anime video로 강제 다운로드 가능")
return
# 최상위 1개 (가장 시더 많은) 추가
best = candidates[0]
try:
success = await self.qbit.add_torrent(
magnet_or_url=best.magnet_link,
save_path=str(nas_folder),
category="anime",
tags=result.anime.subject if result.anime else "",
)
result.torrent_added = success
if not success:
result.errors.append("qBittorrent 토렌트 추가 실패")
except Exception as e:
result.errors.append(f"qBittorrent 오류: {e}")
async def get_status(self) -> list[dict]:
"""현재 다운로드 큐 상태."""
try:
torrents = await self.qbit.list_torrents(category="anime")
return [
{
"name": t.name,
"progress": f"{t.progress * 100:.1f}%",
"state": t.state,
"size": f"{t.size / (1024**3):.2f} GB" if t.size > 0 else "?",
"speed": f"{t.download_speed / (1024**2):.1f} MB/s" if t.download_speed > 0 else "0",
"eta": f"{t.eta // 60}" if t.eta > 0 else "",
"path": t.save_path,
}
for t in torrents
]
except Exception as e:
logger.error(f"qBittorrent 상태 조회 오류: {e}")
return []

120
tools/anissia_client.py Normal file
View File

@@ -0,0 +1,120 @@
"""Anissia API 클라이언트 — 애니 편성표 + 자막 정보 조회.
API Base: https://api.anissia.net
"""
import httpx
import logging
from dataclasses import dataclass, field
from typing import Optional
logger = logging.getLogger("variet.tools.anissia")
BASE_URL = "https://api.anissia.net"
WEEK_NAMES = {
0: "", 1: "", 2: "", 3: "",
4: "", 5: "", 6: "", 7: "기타",
}
@dataclass
class CaptionInfo:
"""자막 제작 정보."""
episode: str
name: str # 제작자 이름
website: str # 제작자 사이트 URL
updated: str # 업데이트 시각
@dataclass
class AnimeInfo:
"""애니메이션 정보."""
anime_no: int
subject: str # 한글 제목
original_subject: str # 원어 제목 (일어)
genres: str
week: int
time: str
status: str # ON / OFF
caption_count: int
start_date: str
end_date: str
website: str
twitter: str
class AnissiaClient:
"""Anissia REST API 클라이언트."""
def __init__(self, timeout: float = 15.0):
self._timeout = timeout
async def get_schedule(self, week: int) -> list[AnimeInfo]:
"""요일별 편성표 조회 (week: 0=일 ~ 6=토, 7=기타)."""
async with httpx.AsyncClient(timeout=self._timeout) as client:
resp = await client.get(f"{BASE_URL}/anime/schedule/{week}")
resp.raise_for_status()
data = resp.json()
if data.get("code") != "ok":
raise RuntimeError(f"Anissia API 오류: {data}")
return [
AnimeInfo(
anime_no=item["animeNo"],
subject=item["subject"],
original_subject=item.get("originalSubject", ""),
genres=item.get("genres", ""),
week=item.get("week", week) if isinstance(item.get("week"), int) else int(item.get("week", week)),
time=item.get("time", ""),
status=item.get("status", ""),
caption_count=item.get("captionCount", 0),
start_date=item.get("startDate", ""),
end_date=item.get("endDate", ""),
website=item.get("website", ""),
twitter=item.get("twitter", ""),
)
for item in data["data"]
]
async def get_all_schedule(self) -> list[AnimeInfo]:
"""전체 요일 편성표 조회 (0~7)."""
all_anime = []
for week in range(8):
try:
schedule = await self.get_schedule(week)
all_anime.extend(schedule)
except Exception as e:
logger.warning(f"편성표 조회 실패 (week={week}): {e}")
return all_anime
async def get_captions(self, anime_no: int) -> list[CaptionInfo]:
"""특정 애니 자막 목록 조회."""
async with httpx.AsyncClient(timeout=self._timeout) as client:
resp = await client.get(f"{BASE_URL}/anime/caption/animeNo/{anime_no}")
resp.raise_for_status()
data = resp.json()
if data.get("code") != "ok":
raise RuntimeError(f"Anissia caption API 오류: {data}")
return [
CaptionInfo(
episode=item.get("episode", ""),
name=item.get("name", ""),
website=item.get("website", ""),
updated=item.get("updDt", ""),
)
for item in data["data"]
]
async def search_anime(self, keyword: str) -> list[AnimeInfo]:
"""키워드로 전체 편성표에서 검색 (한글/일어 제목 매칭)."""
all_anime = await self.get_all_schedule()
keyword_lower = keyword.lower()
return [
a for a in all_anime
if keyword_lower in a.subject.lower()
or keyword_lower in a.original_subject.lower()
]

152
tools/nas_scanner.py Normal file
View File

@@ -0,0 +1,152 @@
"""NAS 폴더 스캐너 — 다운로드된 애니 목록 + 파일 정보 조회.
NAS Animation 폴더 구조:
\\192.168.10.10\NasData\Video\Animation\
[26_1분기]장송의프리렌2기\
[ASW] Sousou no Frieren S2 - 07.mkv
subtitles\...
[25_4분기]그노시아\
...
"""
import logging
import os
import re
from dataclasses import dataclass, field
from pathlib import Path
from typing import Optional
import config
logger = logging.getLogger("variet.tools.nas")
@dataclass
class AnimeFolder:
"""NAS에 있는 애니 폴더 정보."""
folder_name: str # [26_1분기]장송의프리렌2기
full_path: str
title: str # 장송의프리렌2기
year: int # 26
quarter: int # 1
video_count: int = 0
subtitle_count: int = 0
total_size_gb: float = 0.0
video_files: list[str] = field(default_factory=list)
subtitle_files: list[str] = field(default_factory=list)
VIDEO_EXTS = {".mkv", ".mp4", ".avi", ".webm", ".m4v"}
SUB_EXTS = {".ass", ".srt", ".ssa", ".sub", ".smi"}
def _parse_folder_name(name: str) -> tuple[int, int, str]:
"""폴더명에서 연도, 분기, 제목 추출.
[26_1분기]장송의프리렌2기 → (26, 1, '장송의프리렌2기')
"""
m = re.match(r'\[(\d{2})_(\d)분기\](.+)', name)
if m:
return int(m.group(1)), int(m.group(2)), m.group(3)
return 0, 0, name
class NasScanner:
"""NAS Animation 폴더 스캐너."""
def __init__(self, base_path: str = ""):
self.base_path = Path(
base_path or getattr(config, "NAS_ANIME_PATH",
r"\\192.168.10.10\NasData\Video\Animation")
)
def is_accessible(self) -> bool:
"""NAS 접근 가능 여부."""
try:
return self.base_path.exists() and self.base_path.is_dir()
except (OSError, PermissionError):
return False
def list_anime_folders(
self,
year: Optional[int] = None,
quarter: Optional[int] = None,
) -> list[AnimeFolder]:
"""애니 폴더 목록 조회 (분기별 필터 가능)."""
if not self.is_accessible():
logger.error(f"NAS 경로 접근 불가: {self.base_path}")
return []
results = []
try:
for entry in sorted(self.base_path.iterdir()):
if not entry.is_dir():
continue
y, q, title = _parse_folder_name(entry.name)
# 필터링
if year is not None and y != year:
continue
if quarter is not None and q != quarter:
continue
folder = AnimeFolder(
folder_name=entry.name,
full_path=str(entry),
title=title,
year=y,
quarter=q,
)
# 파일 스캔
self._scan_folder(entry, folder)
results.append(folder)
except (OSError, PermissionError) as e:
logger.error(f"NAS 스캔 오류: {e}")
return results
def _scan_folder(self, path: Path, folder: AnimeFolder):
"""폴더 내 영상/자막 파일 집계."""
try:
for item in path.rglob("*"):
if not item.is_file():
continue
ext = item.suffix.lower()
size = item.stat().st_size
if ext in VIDEO_EXTS:
folder.video_count += 1
folder.video_files.append(item.name)
folder.total_size_gb += size / (1024 ** 3)
elif ext in SUB_EXTS:
folder.subtitle_count += 1
folder.subtitle_files.append(item.name)
except (OSError, PermissionError) as e:
logger.warning(f"파일 스캔 오류 ({path}): {e}")
def get_current_quarter_anime(self) -> list[AnimeFolder]:
"""이번 분기 다운로드된 애니 목록."""
from datetime import date
today = date.today()
year = today.year % 100
quarter = (today.month - 1) // 3 + 1
return self.list_anime_folders(year=year, quarter=quarter)
def search(self, keyword: str) -> list[AnimeFolder]:
"""키워드로 NAS 폴더 검색."""
all_folders = self.list_anime_folders()
kw = keyword.lower()
return [f for f in all_folders if kw in f.title.lower() or kw in f.folder_name.lower()]
def get_summary(self, year: Optional[int] = None, quarter: Optional[int] = None) -> dict:
"""요약 통계."""
folders = self.list_anime_folders(year=year, quarter=quarter)
return {
"total_anime": len(folders),
"total_videos": sum(f.video_count for f in folders),
"total_subtitles": sum(f.subtitle_count for f in folders),
"total_size_gb": round(sum(f.total_size_gb for f in folders), 2),
"folders": folders,
}

156
tools/nyaa_client.py Normal file
View File

@@ -0,0 +1,156 @@
"""Nyaa.si RSS 클라이언트 — 토렌트 검색 + Magnet 링크 생성.
RSS Feed: https://nyaa.si/?page=rss&q={query}&c={category}&f={filter}
"""
import httpx
import logging
import re
import xml.etree.ElementTree as ET
from dataclasses import dataclass
from typing import Optional
from urllib.parse import quote
logger = logging.getLogger("variet.tools.nyaa")
RSS_BASE = "https://nyaa.si/"
# Nyaa RSS 네임스페이스
NYAA_NS = "https://nyaa.si/xmlns/nyaa"
@dataclass
class TorrentResult:
"""Nyaa 토렌트 검색 결과."""
title: str
torrent_url: str # .torrent 다운로드 URL
magnet_link: str # magnet:?xt=urn:btih:...
info_hash: str
size: str
seeders: int
leechers: int
downloads: int
category: str
pub_date: str
view_url: str # nyaa.si/view/... 페이지 URL
# 파싱된 정보
episode: Optional[int] = None
group: str = ""
def _parse_episode(title: str) -> Optional[int]:
"""제목에서 에피소드 번호 추출.
예: [ASW] Sousou no Frieren S2 - 07 [1080p ...] → 7
"""
# 패턴 1: "- 07" 또는 "- 07v2"
m = re.search(r'\s-\s(\d{1,4})(?:v\d)?(?:\s|\[|$)', title)
if m:
return int(m.group(1))
# 패턴 2: "S02E07"
m = re.search(r'[Ss]\d{1,2}[Ee](\d{1,4})', title)
if m:
return int(m.group(1))
# 패턴 3: "Episode 07"
m = re.search(r'[Ee]pisode\s*(\d{1,4})', title)
if m:
return int(m.group(1))
return None
def _parse_group(title: str) -> str:
"""제목에서 릴리스 그룹명 추출. 예: [ASW] → ASW"""
m = re.match(r'\[([^\]]+)\]', title)
return m.group(1) if m else ""
class NyaaClient:
"""Nyaa.si RSS 기반 토렌트 검색 클라이언트."""
def __init__(self, timeout: float = 15.0, default_suffix: str = "ASW HEVC"):
self._timeout = timeout
self.default_suffix = default_suffix
async def search(
self,
query: str,
category: str = "0_0",
filter_: int = 0,
use_default_suffix: bool = True,
) -> list[TorrentResult]:
"""RSS 기반 토렌트 검색.
Args:
query: 검색어
category: Nyaa 카테고리 (0_0=전체, 1_2=Anime English)
filter_: 필터 (0=없음, 2=trusted only)
use_default_suffix: True면 검색어에 default_suffix 자동 추가
"""
if use_default_suffix and self.default_suffix:
full_query = f"{query} {self.default_suffix}"
else:
full_query = query
url = f"{RSS_BASE}?page=rss&q={quote(full_query)}&c={category}&f={filter_}"
logger.info(f"Nyaa RSS 검색: {full_query}")
async with httpx.AsyncClient(timeout=self._timeout) as client:
resp = await client.get(url)
resp.raise_for_status()
return self._parse_rss(resp.text)
def _parse_rss(self, xml_text: str) -> list[TorrentResult]:
"""RSS XML 파싱."""
root = ET.fromstring(xml_text)
results = []
for item in root.findall(".//item"):
title = item.findtext("title", "")
link = item.findtext("link", "")
guid = item.findtext("guid", "")
pub_date = item.findtext("pubDate", "")
info_hash = item.findtext(f"{{{NYAA_NS}}}infoHash", "")
seeders = int(item.findtext(f"{{{NYAA_NS}}}seeders", "0"))
leechers = int(item.findtext(f"{{{NYAA_NS}}}leechers", "0"))
downloads = int(item.findtext(f"{{{NYAA_NS}}}downloads", "0"))
size = item.findtext(f"{{{NYAA_NS}}}size", "")
category = item.findtext(f"{{{NYAA_NS}}}category", "")
# Magnet 링크 생성
magnet = f"magnet:?xt=urn:btih:{info_hash}" if info_hash else ""
results.append(TorrentResult(
title=title,
torrent_url=link,
magnet_link=magnet,
info_hash=info_hash,
size=size,
seeders=seeders,
leechers=leechers,
downloads=downloads,
category=category,
pub_date=pub_date,
view_url=guid,
episode=_parse_episode(title),
group=_parse_group(title),
))
logger.info(f"Nyaa 검색 결과: {len(results)}")
return results
async def search_anime(
self,
title: str,
episode: Optional[int] = None,
) -> list[TorrentResult]:
"""애니 제목으로 검색. 에피소드 지정 시 필터링."""
results = await self.search(title)
if episode is not None:
results = [r for r in results if r.episode == episode]
# 시더 수 내림차순 정렬
results.sort(key=lambda r: r.seeders, reverse=True)
return results

198
tools/qbit_client.py Normal file
View File

@@ -0,0 +1,198 @@
"""qBittorrent Web API 클라이언트.
API Docs: https://github.com/qbittorrent/qBittorrent/wiki/WebUI-API-(qBittorrent-4.1)
"""
import httpx
import logging
from dataclasses import dataclass
from typing import Optional
import config
logger = logging.getLogger("variet.tools.qbit")
@dataclass
class TorrentStatus:
"""토렌트 상태."""
name: str
hash: str
progress: float # 0.0 ~ 1.0
state: str # downloading, uploading, pausedDL, ...
size: int # bytes
downloaded: int # bytes
upload_speed: int
download_speed: int
eta: int # seconds, -1 = unknown
save_path: str
category: str
class QBitClient:
"""qBittorrent Web API 클라이언트."""
def __init__(
self,
url: str = None,
username: str = None,
password: str = None,
):
self.url = (url or getattr(config, "QBIT_URL", "http://localhost:8080")).rstrip("/")
self.username = username or getattr(config, "QBIT_USERNAME", "admin")
self.password = password or getattr(config, "QBIT_PASSWORD", "")
self._sid: Optional[str] = None
async def login(self) -> bool:
"""로그인 → SID 쿠키 획득."""
async with httpx.AsyncClient(timeout=10) as client:
resp = await client.post(
f"{self.url}/api/v2/auth/login",
data={"username": self.username, "password": self.password},
)
if resp.text.strip().lower() == "ok.":
self._sid = resp.cookies.get("SID")
logger.info("qBittorrent 로그인 성공")
return True
else:
logger.error(f"qBittorrent 로그인 실패: {resp.text}")
return False
def _cookies(self) -> dict:
return {"SID": self._sid} if self._sid else {}
async def _ensure_login(self):
if not self._sid:
if not await self.login():
raise RuntimeError("qBittorrent 로그인 실패")
async def add_torrent(
self,
magnet_or_url: str,
save_path: str = "",
category: str = "anime",
tags: str = "",
) -> bool:
"""토렌트 추가 (magnet 링크 또는 .torrent URL).
Args:
magnet_or_url: magnet 링크 또는 .torrent URL
save_path: 저장 경로 (미지정 시 qBittorrent 기본)
category: 카테고리
tags: 태그 (쉼표 구분)
"""
await self._ensure_login()
data = {
"urls": magnet_or_url,
"category": category,
}
if save_path:
data["savepath"] = save_path
if tags:
data["tags"] = tags
async with httpx.AsyncClient(timeout=30) as client:
resp = await client.post(
f"{self.url}/api/v2/torrents/add",
data=data,
cookies=self._cookies(),
)
if resp.text.strip().lower() == "ok.":
logger.info(f"토렌트 추가 성공: {magnet_or_url[:60]}... → {save_path}")
return True
else:
logger.error(f"토렌트 추가 실패: {resp.text}")
return False
async def get_torrent_status(self, info_hash: str) -> Optional[TorrentStatus]:
"""특정 토렌트 상태 조회."""
await self._ensure_login()
async with httpx.AsyncClient(timeout=10) as client:
resp = await client.get(
f"{self.url}/api/v2/torrents/info",
params={"hashes": info_hash},
cookies=self._cookies(),
)
resp.raise_for_status()
data = resp.json()
if not data:
return None
t = data[0]
return TorrentStatus(
name=t.get("name", ""),
hash=t.get("hash", ""),
progress=t.get("progress", 0),
state=t.get("state", ""),
size=t.get("total_size", 0),
downloaded=t.get("downloaded", 0),
upload_speed=t.get("upspeed", 0),
download_speed=t.get("dlspeed", 0),
eta=t.get("eta", -1),
save_path=t.get("save_path", ""),
category=t.get("category", ""),
)
async def list_torrents(self, category: str = "") -> list[TorrentStatus]:
"""토렌트 목록 조회."""
await self._ensure_login()
params = {}
if category:
params["category"] = category
async with httpx.AsyncClient(timeout=10) as client:
resp = await client.get(
f"{self.url}/api/v2/torrents/info",
params=params,
cookies=self._cookies(),
)
resp.raise_for_status()
data = resp.json()
return [
TorrentStatus(
name=t.get("name", ""),
hash=t.get("hash", ""),
progress=t.get("progress", 0),
state=t.get("state", ""),
size=t.get("total_size", 0),
downloaded=t.get("downloaded", 0),
upload_speed=t.get("upspeed", 0),
download_speed=t.get("dlspeed", 0),
eta=t.get("eta", -1),
save_path=t.get("save_path", ""),
category=t.get("category", ""),
)
for t in data
]
async def test_connection(self) -> dict:
"""연결 테스트 — 버전 정보 반환."""
try:
await self._ensure_login()
async with httpx.AsyncClient(timeout=10) as client:
resp = await client.get(
f"{self.url}/api/v2/app/version",
cookies=self._cookies(),
)
version = resp.text.strip()
resp2 = await client.get(
f"{self.url}/api/v2/app/webapiVersion",
cookies=self._cookies(),
)
api_version = resp2.text.strip()
return {
"connected": True,
"version": version,
"api_version": api_version,
"url": self.url,
}
except Exception as e:
return {"connected": False, "error": str(e), "url": self.url}

View File

@@ -0,0 +1,260 @@
"""자막 파일 다운로더 — 3개 플랫폼 파서.
지원 플랫폼:
1. Google Drive (Blogspot 제작자 대부분)
2. Tistory (Kakao CDN 직접 다운로드)
3. Naver Blog (네이티브 첨부파일)
"""
import httpx
import logging
import re
from dataclasses import dataclass
from pathlib import Path
from typing import Optional
from urllib.parse import unquote
logger = logging.getLogger("variet.tools.subtitle")
@dataclass
class SubtitleFile:
"""다운로드된/발견된 자막 파일 정보."""
filename: str
download_url: str
platform: str # google_drive, tistory, naver
episode: Optional[int] = None
local_path: Optional[str] = None # 다운로드 후 로컬 경로
def _extract_episode_from_text(text: str) -> Optional[int]:
"""텍스트에서 화수 추출."""
# "9화", "09화", "9 화"
m = re.search(r'(\d{1,4})\s*화', text)
if m:
return int(m.group(1))
# "- 09"
m = re.search(r'[-]\s*(\d{1,4})(?:\s|$|\.)', text)
if m:
return int(m.group(1))
# "Episode 9", "EP09"
m = re.search(r'(?:EP|Episode)\s*(\d{1,4})', text, re.IGNORECASE)
if m:
return int(m.group(1))
return None
# ──────────────────────────────────────────────
# 1. Google Drive 파서
# ──────────────────────────────────────────────
def parse_google_drive_links(html: str) -> list[SubtitleFile]:
"""HTML에서 Google Drive 다운로드 링크 추출.
패턴: drive.google.com/file/d/{fileId}/view
→ 직접 다운로드: drive.google.com/uc?id={fileId}&export=download
"""
pattern = r'https://drive\.google\.com/file/d/([a-zA-Z0-9_-]+)/view[^"]*'
matches = re.findall(pattern, html)
# 링크 주변 텍스트에서 에피소드 정보 추출
link_pattern = r'<a[^>]*href="(https://drive\.google\.com/file/d/[^"]+)"[^>]*>([^<]*)</a>'
link_matches = re.findall(link_pattern, html)
results = []
seen_ids = set()
for url, text in link_matches:
m = re.search(r'/d/([a-zA-Z0-9_-]+)/', url)
if not m:
continue
file_id = m.group(1)
if file_id in seen_ids:
continue
seen_ids.add(file_id)
episode = _extract_episode_from_text(text)
download_url = f"https://drive.google.com/uc?id={file_id}&export=download"
results.append(SubtitleFile(
filename=text.strip() or f"subtitle_{file_id}",
download_url=download_url,
platform="google_drive",
episode=episode,
))
# 매칭되지 않은 bare ID도 추가
for file_id in matches:
if file_id not in seen_ids:
seen_ids.add(file_id)
results.append(SubtitleFile(
filename=f"subtitle_{file_id}",
download_url=f"https://drive.google.com/uc?id={file_id}&export=download",
platform="google_drive",
))
return results
# ──────────────────────────────────────────────
# 2. Tistory 파서
# ──────────────────────────────────────────────
def parse_tistory_links(html: str) -> list[SubtitleFile]:
"""HTML에서 Tistory/Kakao CDN 다운로드 링크 추출.
패턴: blog.kakaocdn.net/dna/.../filename.zip?...
"""
pattern = r'(https://blog\.kakaocdn\.net/[^"]+\.(zip|ass|srt|ssa|sub)[^"]*)'
matches = re.findall(pattern, html, re.IGNORECASE)
results = []
for url, ext in matches:
# URL에서 파일명 추출
name_match = re.search(r'/([^/?]+\.' + ext + r')', unquote(url))
filename = name_match.group(1) if name_match else f"subtitle.{ext}"
episode = _extract_episode_from_text(filename)
results.append(SubtitleFile(
filename=filename,
download_url=url,
platform="tistory",
episode=episode,
))
return results
# ──────────────────────────────────────────────
# 3. Naver Blog 파서
# ──────────────────────────────────────────────
def parse_naver_links(html: str) -> list[SubtitleFile]:
"""HTML에서 Naver Blog 첨부파일 다운로드 링크 추출.
패턴: download.blog.naver.com/... 또는 blogfiles.pstatic.net/...
"""
results = []
# Naver 파일 다운로드 버튼
# <a class="se-file-save-button" href="https://download.blog.naver.com/..." ...>
file_pattern = r'href="(https://(?:download\.blog\.naver\.com|blogfiles\.pstatic\.net)/[^"]+)"'
matches = re.findall(file_pattern, html)
for url in matches:
# URL에서 파일명 추출
decoded = unquote(url)
name_match = re.search(r'/([^/?]+\.(?:zip|ass|srt|ssa|sub|7z))', decoded, re.IGNORECASE)
filename = name_match.group(1) if name_match else "subtitle_naver"
episode = _extract_episode_from_text(filename)
results.append(SubtitleFile(
filename=filename,
download_url=url,
platform="naver",
episode=episode,
))
return results
# ──────────────────────────────────────────────
# 통합 다운로더
# ──────────────────────────────────────────────
class SubtitleDownloader:
"""자막 파일 검색 및 다운로드."""
def __init__(self, download_dir: str = ""):
self.download_dir = Path(download_dir) if download_dir else Path.cwd() / "subtitles"
async def fetch_page(self, url: str) -> str:
"""웹 페이지 HTML 가져오기."""
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
"Accept-Language": "ko-KR,ko;q=0.9",
}
# Naver Blog iframe bypass
if "blog.naver.com" in url and "PostView" not in url:
# blog.naver.com/{blogId}/{logNo} → PostView URL
m = re.search(r'blog\.naver\.com/([^/]+)/(\d+)', url)
if m:
url = f"https://blog.naver.com/PostView.naver?blogId={m.group(1)}&logNo={m.group(2)}"
async with httpx.AsyncClient(timeout=15, follow_redirects=True) as client:
resp = await client.get(url, headers=headers)
resp.raise_for_status()
return resp.text
async def find_subtitles(self, url: str) -> list[SubtitleFile]:
"""URL에서 자막 파일 링크 자동 탐지."""
html = await self.fetch_page(url)
results = []
# 플랫폼 자동 감지 후 파싱
if "drive.google.com" in html:
results.extend(parse_google_drive_links(html))
if "blog.kakaocdn.net" in html:
results.extend(parse_tistory_links(html))
if "download.blog.naver.com" in html or "blogfiles.pstatic.net" in html:
results.extend(parse_naver_links(html))
# 범용: 직접 자막 파일 링크 탐지
generic_pattern = r'href="([^"]+\.(?:ass|srt|ssa|sub|zip|7z))"'
generic = re.findall(generic_pattern, html, re.IGNORECASE)
seen_urls = {r.download_url for r in results}
for gurl in generic:
if gurl not in seen_urls:
filename = gurl.split("/")[-1].split("?")[0]
results.append(SubtitleFile(
filename=unquote(filename),
download_url=gurl,
platform="generic",
episode=_extract_episode_from_text(filename),
))
logger.info(f"자막 {len(results)}건 발견: {url}")
return results
async def download_file(
self,
sub: SubtitleFile,
save_dir: Optional[str] = None,
) -> str:
"""자막 파일 다운로드 → 로컬 저장. 저장 경로 반환."""
target_dir = Path(save_dir) if save_dir else self.download_dir
target_dir.mkdir(parents=True, exist_ok=True)
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
}
# Naver 리퍼러 헤더
if sub.platform == "naver":
headers["Referer"] = "https://blog.naver.com/"
async with httpx.AsyncClient(
timeout=60, follow_redirects=True, max_redirects=5
) as client:
resp = await client.get(sub.download_url, headers=headers)
resp.raise_for_status()
# Content-Disposition에서 실제 파일명 추출
cd = resp.headers.get("content-disposition", "")
if "filename" in cd:
m = re.search(r'filename[*]?=["\']?(?:UTF-8\'\')?([^"\';\n]+)', cd)
if m:
sub.filename = unquote(m.group(1).strip())
filepath = target_dir / sub.filename
filepath.write_bytes(resp.content)
sub.local_path = str(filepath)
logger.info(f"자막 다운로드 완료: {filepath}")
return str(filepath)

212
tools/title_matcher.py Normal file
View File

@@ -0,0 +1,212 @@
"""제목 매칭 + NAS 폴더명 생성.
Anissia(한글) ↔ Nyaa(영어/로마자) 제목을 매칭하고,
NAS 저장 폴더명을 [yy_q분기]제목 형식으로 생성합니다.
"""
import re
import logging
import unicodedata
from difflib import SequenceMatcher
from typing import Optional
logger = logging.getLogger("variet.tools.matcher")
# ──────────────────────────────────────────────
# 일어 → 로마자 변환 테이블 (히라가나/카타카나)
# ──────────────────────────────────────────────
_KANA_ROMAJI = {
# 히라가나
'': 'a', '': 'i', '': 'u', '': 'e', '': 'o',
'': 'ka', '': 'ki', '': 'ku', '': 'ke', '': 'ko',
'': 'sa', '': 'shi', '': 'su', '': 'se', '': 'so',
'': 'ta', '': 'chi', '': 'tsu', '': 'te', '': 'to',
'': 'na', '': 'ni', '': 'nu', '': 'ne', '': 'no',
'': 'ha', '': 'hi', '': 'fu', '': 'he', '': 'ho',
'': 'ma', '': 'mi', '': 'mu', '': 'me', '': 'mo',
'': 'ya', '': 'yu', '': 'yo',
'': 'ra', '': 'ri', '': 'ru', '': 're', '': 'ro',
'': 'wa', '': 'wo', '': 'n',
'': 'ga', '': 'gi', '': 'gu', '': 'ge', '': 'go',
'': 'za', '': 'ji', '': 'zu', '': 'ze', '': 'zo',
'': 'da', '': 'di', '': 'du', '': 'de', '': 'do',
'': 'ba', '': 'bi', '': 'bu', '': 'be', '': 'bo',
'': 'pa', '': 'pi', '': 'pu', '': 'pe', '': 'po',
'きゃ': 'kya', 'きゅ': 'kyu', 'きょ': 'kyo',
'しゃ': 'sha', 'しゅ': 'shu', 'しょ': 'sho',
'ちゃ': 'cha', 'ちゅ': 'chu', 'ちょ': 'cho',
'にゃ': 'nya', 'にゅ': 'nyu', 'にょ': 'nyo',
'ひゃ': 'hya', 'ひゅ': 'hyu', 'ひょ': 'hyo',
'みゃ': 'mya', 'みゅ': 'myu', 'みょ': 'myo',
'りゃ': 'rya', 'りゅ': 'ryu', 'りょ': 'ryo',
'ぎゃ': 'gya', 'ぎゅ': 'gyu', 'ぎょ': 'gyo',
'じゃ': 'ja', 'じゅ': 'ju', 'じょ': 'jo',
'びゃ': 'bya', 'びゅ': 'byu', 'びょ': 'byo',
'ぴゃ': 'pya', 'ぴゅ': 'pyu', 'ぴょ': 'pyo',
'': '', # 촉음 (다음 자음 반복)
}
# 카타카나 → 히라가나 변환 오프셋
_KATA_OFFSET = ord('') - ord('')
def _kata_to_hira(text: str) -> str:
"""카타카나를 히라가나로 변환."""
result = []
for ch in text:
cp = ord(ch)
if 0x30A0 <= cp <= 0x30FF: # 카타카나 범위
result.append(chr(cp - _KATA_OFFSET))
else:
result.append(ch)
return "".join(result)
def japanese_to_romaji(text: str) -> str:
"""일본어 텍스트를 로마자로 근사 변환."""
text = _kata_to_hira(text)
result = []
i = 0
while i < len(text):
# 2글자 매칭 우선 (きゃ 등)
if i + 1 < len(text) and text[i:i+2] in _KANA_ROMAJI:
result.append(_KANA_ROMAJI[text[i:i+2]])
i += 2
elif text[i] in _KANA_ROMAJI:
romaji = _KANA_ROMAJI[text[i]]
# 촉음(っ) 처리: 다음 자음 반복
if text[i] == '' and i + 1 < len(text):
next_romaji = _KANA_ROMAJI.get(text[i+1], "")
if next_romaji:
result.append(next_romaji[0])
else:
result.append(romaji)
i += 1
elif text[i] == '': # 장음
i += 1
else:
# 한자, 영어, 숫자 등 → 그대로
result.append(text[i])
i += 1
return "".join(result)
def normalize_title(title: str) -> str:
"""제목 비교용 정규화: 소문자 + 특수문자 제거 + 공백 정리."""
title = title.lower().strip()
# 기수 표기 정규화: 2nd → 2, S2 → 2
title = re.sub(r'\b(\d+)(?:st|nd|rd|th)\b', r'\1', title)
title = re.sub(r'\bs(\d+)\b', r'\1', title)
title = re.sub(r'season\s*(\d+)', r'\1', title)
title = re.sub(r'(\d+)\s*기', r'\1', title)
# 특수문자 제거
title = re.sub(r'[^\w\s]', ' ', title)
title = re.sub(r'\s+', ' ', title).strip()
return title
def title_similarity(title_a: str, title_b: str) -> float:
"""두 제목 간 유사도 (0.0 ~ 1.0)."""
a = normalize_title(title_a)
b = normalize_title(title_b)
return SequenceMatcher(None, a, b).ratio()
def match_titles(
korean_title: str,
original_title: str,
nyaa_results: list,
threshold: float = 0.4,
) -> list:
"""Anissia 제목과 Nyaa 검색 결과 매칭.
Args:
korean_title: 한글 제목 (Anissia subject)
original_title: 원어 제목 (Anissia originalSubject)
nyaa_results: TorrentResult 리스트
threshold: 최소 유사도
Returns:
매칭된 TorrentResult 리스트 (유사도 내림차순)
"""
# 원제의 로마자 변환
romaji = japanese_to_romaji(original_title)
scored = []
for result in nyaa_results:
# Nyaa 제목에서 그룹태그 제거: [ASW] Title - 07 [...] → Title
clean_title = re.sub(r'\[[^\]]*\]', '', result.title).strip()
clean_title = re.sub(r'\s*-\s*\d+.*$', '', clean_title).strip()
# 유사도 계산 (로마자 vs Nyaa 제목)
sim_romaji = title_similarity(romaji, clean_title)
# 한글 vs Nyaa (일부 자막 포함 릴리스인 경우)
sim_korean = title_similarity(korean_title, clean_title)
# 원제 그대로 vs Nyaa
sim_original = title_similarity(original_title, clean_title)
best_sim = max(sim_romaji, sim_korean, sim_original)
if best_sim >= threshold:
scored.append((best_sim, result))
# 유사도 내림차순 정렬
scored.sort(key=lambda x: x[0], reverse=True)
return [r for _, r in scored]
# ──────────────────────────────────────────────
# NAS 폴더명 생성
# ──────────────────────────────────────────────
def get_quarter(date_str: str) -> tuple[int, int]:
"""날짜 문자열에서 연도와 분기 추출.
Args:
date_str: "2026-01-11" 형식
Returns:
(year, quarter): (26, 1)
"""
if not date_str:
from datetime import date
today = date.today()
year = today.year % 100
quarter = (today.month - 1) // 3 + 1
return year, quarter
parts = date_str.split("-")
year = int(parts[0]) % 100
month = int(parts[1])
quarter = (month - 1) // 3 + 1
return year, quarter
def make_nas_folder_name(title: str, start_date: str = "") -> str:
"""NAS 저장 폴더명 생성.
예: [26_1분기]장송의프리렌2기
"""
year, quarter = get_quarter(start_date)
# 제목에서 폴더명에 쓸 수 없는 문자 제거
safe_title = re.sub(r'[<>:"/\\|?*]', '', title)
safe_title = safe_title.strip()
return f"[{year:02d}_{quarter}분기]{safe_title}"
def rename_subtitle_to_video(
video_filename: str,
subtitle_ext: str = ".ass",
) -> str:
"""영상 파일명에 맞게 자막 파일명 생성.
예: [ASW] Sousou no Frieren S2 - 07.mkv → [ASW] Sousou no Frieren S2 - 07.ass
"""
stem = re.sub(r'\.[^.]+$', '', video_filename)
return f"{stem}{subtitle_ext}"