Files
variet-agent/tools/subtitle_downloader.py

342 lines
13 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""자막 파일 다운로더 — 3개 플랫폼 파서.
지원 플랫폼:
1. Google Drive (Blogspot 제작자 대부분)
2. Tistory (Kakao CDN 직접 다운로드)
3. Naver Blog (네이티브 첨부파일)
"""
import httpx
import logging
import re
from dataclasses import dataclass
from pathlib import Path
from typing import Optional
from urllib.parse import unquote
logger = logging.getLogger("variet.tools.subtitle")
@dataclass
class SubtitleFile:
"""다운로드된/발견된 자막 파일 정보."""
filename: str
download_url: str
platform: str # google_drive, tistory, naver
episode: Optional[int] = None
local_path: Optional[str] = None # 다운로드 후 로컬 경로
def _extract_episode_from_text(text: str) -> Optional[int]:
"""텍스트에서 화수 추출."""
# "9화", "09화", "9 화"
m = re.search(r'(\d{1,4})\s*화', text)
if m:
return int(m.group(1))
# "- 09"
m = re.search(r'[-]\s*(\d{1,4})(?:\s|$|\.)', text)
if m:
return int(m.group(1))
# "Episode 9", "EP09"
m = re.search(r'(?:EP|Episode)\s*(\d{1,4})', text, re.IGNORECASE)
if m:
return int(m.group(1))
return None
# ──────────────────────────────────────────────
# 1. Google Drive 파서
# ──────────────────────────────────────────────
def parse_google_drive_links(html: str) -> list[SubtitleFile]:
"""HTML에서 Google Drive 다운로드 링크 추출.
지원 패턴:
1. drive.google.com/file/d/{fileId}/view
2. drive.google.com/uc?id={fileId}&export=download (직접 다운로드)
"""
results = []
seen_ids = set()
# 패턴 1: /file/d/{id}/view — HTML <a> 태그
link_pattern = r'<a[^>]*href="(https://drive\.google\.com/file/d/[^"]+)"[^>]*>([^<]*)</a>'
for url, text in re.findall(link_pattern, html):
m = re.search(r'/d/([a-zA-Z0-9_-]+)/', url)
if not m:
continue
file_id = m.group(1)
if file_id in seen_ids:
continue
seen_ids.add(file_id)
results.append(SubtitleFile(
filename=text.strip() or f"subtitle_{file_id}",
download_url=f"https://drive.google.com/uc?id={file_id}&export=download",
platform="google_drive",
episode=_extract_episode_from_text(text),
))
# 패턴 1: bare ID (태그 밖)
for file_id in re.findall(r'drive\.google\.com/file/d/([a-zA-Z0-9_-]+)/', html):
if file_id not in seen_ids:
seen_ids.add(file_id)
results.append(SubtitleFile(
filename=f"subtitle_{file_id}",
download_url=f"https://drive.google.com/uc?id={file_id}&export=download",
platform="google_drive",
))
# 패턴 2: uc?id={id} 직접 다운로드 URL (Blogspot 등)
uc_pattern = r'drive\.google\.com/uc\?[^"\s\)]*id=([a-zA-Z0-9_-]+)[^"\s\)]*'
for file_id in re.findall(uc_pattern, html):
if file_id in seen_ids:
continue
seen_ids.add(file_id)
# 주변 텍스트에서 파일명 추출 시도 (마크다운: [파일명](url))
md_pattern = r'\[([^\]]+)\]\([^)]*' + re.escape(file_id) + r'[^)]*\)'
md_match = re.search(md_pattern, html)
filename = md_match.group(1).strip() if md_match else f"subtitle_{file_id}"
results.append(SubtitleFile(
filename=filename,
download_url=f"https://drive.google.com/uc?id={file_id}&export=download",
platform="google_drive",
episode=_extract_episode_from_text(filename),
))
return results
# ──────────────────────────────────────────────
# 2. Tistory 파서
# ──────────────────────────────────────────────
def parse_tistory_links(html: str) -> list[SubtitleFile]:
"""HTML에서 Tistory/Kakao CDN 다운로드 링크 추출.
패턴: blog.kakaocdn.net/dna/.../filename.zip?...
"""
pattern = r'(https://blog\.kakaocdn\.net/[^"]+\.(zip|ass|srt|ssa|sub)[^"]*)'
matches = re.findall(pattern, html, re.IGNORECASE)
results = []
for url, ext in matches:
# URL에서 파일명 추출
name_match = re.search(r'/([^/?]+\.' + ext + r')', unquote(url))
filename = name_match.group(1) if name_match else f"subtitle.{ext}"
episode = _extract_episode_from_text(filename)
results.append(SubtitleFile(
filename=filename,
download_url=url,
platform="tistory",
episode=episode,
))
return results
# ──────────────────────────────────────────────
# 3. Naver Blog 파서
# ──────────────────────────────────────────────
def parse_naver_links(html: str) -> list[SubtitleFile]:
"""HTML에서 Naver Blog 첨부파일 다운로드 링크 추출.
패턴: download.blog.naver.com/... 또는 blogfiles.pstatic.net/...
"""
results = []
# Naver 파일 다운로드 버튼
# <a class="se-file-save-button" href="https://download.blog.naver.com/..." ...>
file_pattern = r'href="(https://(?:download\.blog\.naver\.com|blogfiles\.pstatic\.net)/[^"]+)"'
matches = re.findall(file_pattern, html)
for url in matches:
# URL에서 파일명 추출
decoded = unquote(url)
name_match = re.search(r'/([^/?]+\.(?:zip|ass|srt|ssa|sub|7z))', decoded, re.IGNORECASE)
filename = name_match.group(1) if name_match else "subtitle_naver"
episode = _extract_episode_from_text(filename)
results.append(SubtitleFile(
filename=filename,
download_url=url,
platform="naver",
episode=episode,
))
return results
# ──────────────────────────────────────────────
# 통합 다운로더
# ──────────────────────────────────────────────
class SubtitleDownloader:
"""자막 파일 검색 및 다운로드."""
def __init__(self, download_dir: str = ""):
self.download_dir = Path(download_dir) if download_dir else Path.cwd() / "subtitles"
async def fetch_page(self, url: str) -> str:
"""웹 페이지 HTML 가져오기."""
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
"Accept-Language": "ko-KR,ko;q=0.9",
}
# Naver Blog iframe bypass
if "blog.naver.com" in url and "PostView" not in url:
# blog.naver.com/{blogId}/{logNo} → PostView URL
m = re.search(r'blog\.naver\.com/([^/]+)/(\d+)', url)
if m:
url = f"https://blog.naver.com/PostView.naver?blogId={m.group(1)}&logNo={m.group(2)}"
async with httpx.AsyncClient(timeout=15, follow_redirects=True) as client:
resp = await client.get(url, headers=headers)
resp.raise_for_status()
return resp.text
async def find_subtitles(self, url: str) -> list[SubtitleFile]:
"""URL에서 자막 파일 링크 자동 탐지."""
html = await self.fetch_page(url)
results = []
# 플랫폼 자동 감지 후 파싱
if "drive.google.com" in html:
results.extend(parse_google_drive_links(html))
if "blog.kakaocdn.net" in html:
results.extend(parse_tistory_links(html))
if "download.blog.naver.com" in html or "blogfiles.pstatic.net" in html:
results.extend(parse_naver_links(html))
# 범용: 직접 자막 파일 링크 탐지 (HTML href + 마크다운)
seen_urls = {r.download_url for r in results}
# HTML <a href="...">
for gurl in re.findall(r'href="([^"]+\.(?:ass|srt|ssa|sub|smi|zip|7z)(?:\?[^"]*)?)"', html, re.IGNORECASE):
if gurl not in seen_urls:
seen_urls.add(gurl)
filename = unquote(gurl.split("/")[-1].split("?")[0])
results.append(SubtitleFile(
filename=filename,
download_url=gurl,
platform="generic",
episode=_extract_episode_from_text(filename),
))
# 마크다운 [텍스트](url) — Blogspot 등
for text, gurl in re.findall(r'\[([^\]]+)\]\((https?://[^)]+\.(?:ass|srt|ssa|sub|smi|zip|7z)[^)]*)\)', html, re.IGNORECASE):
if gurl not in seen_urls:
seen_urls.add(gurl)
results.append(SubtitleFile(
filename=text.strip(),
download_url=gurl,
platform="generic",
episode=_extract_episode_from_text(text),
))
logger.info(f"자막 {len(results)}건 발견: {url}")
return results
async def download_file(
self,
sub: SubtitleFile,
save_dir: Optional[str] = None,
) -> str:
"""자막 파일 다운로드 → 로컬 저장. ZIP이면 자동 해제. 저장 경로 반환."""
target_dir = Path(save_dir) if save_dir else self.download_dir
target_dir.mkdir(parents=True, exist_ok=True)
# 이미 같은 파일명이 존재하면 스킵 (1차 — 원본 파일명 기준)
existing = target_dir / sub.filename
if existing.exists() and existing.stat().st_size > 0:
logger.info(f"자막 이미 존재, 스킵: {existing}")
sub.local_path = str(existing)
return str(existing)
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
}
# Naver 리퍼러 헤더
if sub.platform == "naver":
headers["Referer"] = "https://blog.naver.com/"
async with httpx.AsyncClient(
timeout=60, follow_redirects=True, max_redirects=5
) as client:
resp = await client.get(sub.download_url, headers=headers)
resp.raise_for_status()
# Content-Disposition에서 실제 파일명 추출
cd = resp.headers.get("content-disposition", "")
if "filename" in cd:
m = re.search(r'filename[*]?=["\']?(?:UTF-8\'\')?([^"\';\\n]+)', cd)
if m:
sub.filename = unquote(m.group(1).strip())
# 실제 파일명으로 2차 존재 체크 (Content-Disposition 반영 후)
filepath = target_dir / sub.filename
if filepath.exists() and filepath.stat().st_size > 0:
logger.info(f"자막 이미 존재 (실제 파일명), 스킵: {filepath}")
sub.local_path = str(filepath)
return str(filepath)
filepath.write_bytes(resp.content)
# ZIP/7z 자동 해제
extracted = self._extract_archive(filepath, target_dir)
if extracted:
sub.local_path = extracted[0] # 첫 번째 자막 파일
sub.filename = Path(extracted[0]).name
logger.info(f"자막 ZIP 해제 완료: {len(extracted)}건 → {target_dir}")
return extracted[0]
sub.local_path = str(filepath)
logger.info(f"자막 다운로드 완료: {filepath}")
return str(filepath)
@staticmethod
def _extract_archive(filepath: Path, target_dir: Path) -> list[str]:
"""ZIP/7z 파일 해제 → 자막 파일(.ass/.srt/.ssa/.sub) 경로 리스트 반환."""
import zipfile
suffix = filepath.suffix.lower()
if suffix not in (".zip", ".7z"):
return []
extracted = []
if suffix == ".zip":
try:
with zipfile.ZipFile(filepath, "r") as zf:
for name in zf.namelist():
# 디렉토리 건너뛰기
if name.endswith("/"):
continue
ext = Path(name).suffix.lower()
if ext in (".ass", ".srt", ".ssa", ".sub", ".smi"):
# 중첩 폴더 무시, 파일만 추출
out_name = Path(name).name
out_path = target_dir / out_name
# 이미 존재하면 스킵 (중복 다운로드 방지)
if out_path.exists() and out_path.stat().st_size > 0:
logger.info(f"ZIP 내 파일 이미 존재, 스킵: {out_name}")
extracted.append(str(out_path))
continue
with zf.open(name) as src, open(out_path, "wb") as dst:
dst.write(src.read())
extracted.append(str(out_path))
# ZIP 원본 삭제
filepath.unlink(missing_ok=True)
except (zipfile.BadZipFile, Exception) as e:
logger.warning(f"ZIP 해제 실패: {filepath} - {e}")
return extracted