Files
variet-agent/tools/qbit_client.py

262 lines
8.6 KiB
Python

"""qBittorrent Web API 클라이언트.
API Docs: https://github.com/qbittorrent/qBittorrent/wiki/WebUI-API-(qBittorrent-4.1)
"""
import httpx
import logging
from dataclasses import dataclass
from typing import Optional
import config
logger = logging.getLogger("variet.tools.qbit")
@dataclass
class TorrentStatus:
"""토렌트 상태."""
name: str
hash: str
progress: float # 0.0 ~ 1.0
state: str # downloading, uploading, pausedDL, ...
size: int # bytes
downloaded: int # bytes
upload_speed: int
download_speed: int
eta: int # seconds, -1 = unknown
save_path: str
category: str
class QBitClient:
"""qBittorrent Web API 클라이언트."""
def __init__(
self,
url: str = None,
username: str = None,
password: str = None,
):
self.url = (url or getattr(config, "QBIT_URL", "http://localhost:8080")).rstrip("/")
self.username = username or getattr(config, "QBIT_USERNAME", "admin")
self.password = password or getattr(config, "QBIT_PASSWORD", "")
self._sid: Optional[str] = None
async def login(self) -> bool:
"""로그인 → SID 쿠키 획득."""
async with httpx.AsyncClient(timeout=10) as client:
resp = await client.post(
f"{self.url}/api/v2/auth/login",
data={"username": self.username, "password": self.password},
)
if resp.text.strip().lower() == "ok.":
self._sid = resp.cookies.get("SID")
logger.info("qBittorrent 로그인 성공")
return True
else:
logger.error(f"qBittorrent 로그인 실패: {resp.text}")
return False
def _cookies(self) -> dict:
return {"SID": self._sid} if self._sid else {}
async def _ensure_login(self):
if not self._sid:
if not await self.login():
raise RuntimeError("qBittorrent 로그인 실패")
async def add_torrent(
self,
magnet_or_url: str,
save_path: str = "",
category: str = "anime",
tags: str = "",
) -> bool:
"""토렌트 추가 (magnet 링크 또는 .torrent URL).
Args:
magnet_or_url: magnet 링크 또는 .torrent URL
save_path: 저장 경로 (미지정 시 qBittorrent 기본)
category: 카테고리
tags: 태그 (쉼표 구분)
"""
await self._ensure_login()
data = {
"urls": magnet_or_url,
"category": category,
}
if save_path:
data["savepath"] = save_path
if tags:
data["tags"] = tags
async with httpx.AsyncClient(timeout=30) as client:
resp = await client.post(
f"{self.url}/api/v2/torrents/add",
data=data,
cookies=self._cookies(),
)
if resp.text.strip().lower() == "ok.":
logger.info(f"토렌트 추가 성공: {magnet_or_url[:60]}... → {save_path}")
return True
else:
logger.error(f"토렌트 추가 실패: {resp.text}")
return False
async def get_torrent_status(self, info_hash: str) -> Optional[TorrentStatus]:
"""특정 토렌트 상태 조회."""
await self._ensure_login()
async with httpx.AsyncClient(timeout=10) as client:
resp = await client.get(
f"{self.url}/api/v2/torrents/info",
params={"hashes": info_hash},
cookies=self._cookies(),
)
resp.raise_for_status()
data = resp.json()
if not data:
return None
t = data[0]
return TorrentStatus(
name=t.get("name", ""),
hash=t.get("hash", ""),
progress=t.get("progress", 0),
state=t.get("state", ""),
size=t.get("total_size", 0),
downloaded=t.get("downloaded", 0),
upload_speed=t.get("upspeed", 0),
download_speed=t.get("dlspeed", 0),
eta=t.get("eta", -1),
save_path=t.get("save_path", ""),
category=t.get("category", ""),
)
async def list_torrents(self, category: str = "") -> list[TorrentStatus]:
"""토렌트 목록 조회."""
await self._ensure_login()
params = {}
if category:
params["category"] = category
async with httpx.AsyncClient(timeout=10) as client:
resp = await client.get(
f"{self.url}/api/v2/torrents/info",
params=params,
cookies=self._cookies(),
)
resp.raise_for_status()
data = resp.json()
return [
TorrentStatus(
name=t.get("name", ""),
hash=t.get("hash", ""),
progress=t.get("progress", 0),
state=t.get("state", ""),
size=t.get("total_size", 0),
downloaded=t.get("downloaded", 0),
upload_speed=t.get("upspeed", 0),
download_speed=t.get("dlspeed", 0),
eta=t.get("eta", -1),
save_path=t.get("save_path", ""),
category=t.get("category", ""),
)
for t in data
]
async def test_connection(self) -> dict:
"""연결 테스트 — 버전 정보 반환."""
try:
await self._ensure_login()
async with httpx.AsyncClient(timeout=10) as client:
resp = await client.get(
f"{self.url}/api/v2/app/version",
cookies=self._cookies(),
)
version = resp.text.strip()
resp2 = await client.get(
f"{self.url}/api/v2/app/webapiVersion",
cookies=self._cookies(),
)
api_version = resp2.text.strip()
return {
"connected": True,
"version": version,
"api_version": api_version,
"url": self.url,
}
except Exception as e:
return {"connected": False, "error": str(e), "url": self.url}
async def delete_torrent(self, info_hash: str, delete_files: bool = False) -> bool:
"""토렌트 삭제 (완료 후 정리용)."""
await self._ensure_login()
async with httpx.AsyncClient(timeout=10) as client:
resp = await client.post(
f"{self.url}/api/v2/torrents/delete",
data={
"hashes": info_hash,
"deleteFiles": str(delete_files).lower(),
},
cookies=self._cookies(),
)
return resp.status_code == 200
# ── CLI 진입점 ──
if __name__ == "__main__":
import sys
import asyncio
args = sys.argv[1:]
client = QBitClient()
async def main():
if not args or args[0] == "status":
# python tools/qbit_client.py status
torrents = await client.list_torrents(category="anime")
if not torrents:
print("🎬 다운로드 중인 애니 없음")
return
print(f"🎬 다운로드 현황 ({len(torrents)}건):")
for t in torrents:
speed = f"{t.download_speed / (1024**2):.1f}MB/s" if t.download_speed > 0 else "-"
eta = f"{t.eta // 60}" if t.eta > 0 else ""
print(f" {t.progress*100:.0f}% | {t.name[:50]} | {speed} | ETA: {eta}")
elif args[0] == "add" and len(args) > 1:
# python tools/qbit_client.py add "magnet:..." --path "\\NAS\path"
magnet = args[1]
path = ""
for i, a in enumerate(args):
if a == "--path" and i + 1 < len(args):
path = args[i + 1]
ok = await client.add_torrent(magnet, save_path=path)
print(f"{'✅ 추가 성공' if ok else '❌ 추가 실패'}")
elif args[0] == "delete" and len(args) > 1:
# python tools/qbit_client.py delete <hash> [--files]
hash_ = args[1]
delete_files = "--files" in args
ok = await client.delete_torrent(hash_, delete_files=delete_files)
print(f"{'✅ 삭제 성공' if ok else '❌ 삭제 실패'}")
elif args[0] == "test":
info = await client.test_connection()
print(f"연결: {'' if info['connected'] else ''} {info}")
else:
print("사용법: python tools/qbit_client.py [status|add|delete|test] [옵션]")
asyncio.run(main())