Files
gravity_control/gateway.py
Variet Worker 95da3e9307 feat(gateway): API Key 인증 + HTTPS (Caddy) 보안 강화
- gateway.py: auth middleware — /api/* 엔드포인트에 Bearer token 필수
- Caddyfile: Let's Encrypt 자동 HTTPS 리버스 프록시
- docker-compose.yml: Caddy 추가, Gateway 포트 내부 전용
- config.py: GATEWAY_API_KEY 설정 추가
- .env: 키 생성 명령어 가이드 포함
2026-03-11 19:49:24 +09:00

197 lines
8.4 KiB
Python

"""Gateway HTTP API — receives data from remote Collectors and routes to Discord bot.
Runs alongside the Discord bot in the server Docker container.
Collectors (local PCs) push pending approvals, chat snapshots, and registrations
to this API, and poll for responses.
Endpoints (as registered by GatewayAPI._setup_routes):
POST /api/pending — Collector pushes a new approval request
GET /api/pending — List all pending requests (for diagnostics)
GET /api/response/{rid} — Collector polls for the response to a specific request (deleted once served)
POST /api/chat — Collector pushes a chat snapshot
POST /api/register — Collector registers session → project mapping
GET /api/commands/{project} — Collector polls for (and drains) queued commands
GET /health — Health check (no API key required)
Not HTTP routes in this module: POST /api/response/{rid} and POST /api/command are not registered; the bot queues commands in-process via GatewayAPI.push_command().
"""
import asyncio
import hmac
import json
import logging
import time

from aiohttp import web
logger = logging.getLogger(__name__)
class GatewayAPI:
    """HTTP API server for Collector ↔ Gateway communication.

    Handlers hand data to the bot through ``bot.bridge.transport`` (JSON files
    in the ``pending`` / ``response`` areas), through ``bot.conv_to_project``
    and through an in-memory per-project command queue.

    When ``api_key`` is non-empty, every ``/api/*`` request must carry
    ``Authorization: Bearer <api_key>``. ``/health`` is always public.
    """

    def __init__(self, bot, host: str = "0.0.0.0", port: int = 8585, api_key: str = ""):
        """Build the aiohttp application and register its routes.

        Args:
            bot: Bot object; handlers use ``bot.bridge``, ``bot.conv_to_project``
                and ``bot.is_ready()``.
            host: Interface to bind to.
            port: TCP port to bind to.
            api_key: Shared secret for ``/api/*``; empty string disables auth.
        """
        self.bot = bot
        self.host = host
        self.port = port
        self.api_key = api_key
        # In-memory command queue: project → [command dicts]. Lost on restart.
        self._commands: dict[str, list[dict]] = {}
        # AppRunner created by start(); kept so stop() can release the socket.
        self._runner = None
        self.app = web.Application(middlewares=[self._auth_middleware])
        self._setup_routes()

    def _setup_routes(self):
        """Register every HTTP route served by the gateway."""
        router = self.app.router
        router.add_get("/health", self._health)
        router.add_post("/api/pending", self._post_pending)
        router.add_get("/api/pending", self._list_pending)
        router.add_get("/api/response/{rid}", self._get_response)
        router.add_post("/api/chat", self._post_chat)
        router.add_post("/api/register", self._post_register)
        router.add_get("/api/commands/{project}", self._get_commands)

    # ─── Helpers ───

    @staticmethod
    def _is_safe_name(name) -> bool:
        """Return True if *name* is a non-empty string usable as a single file-name component.

        Path separators (both ``/`` and ``\\``) and NUL are rejected so that a
        client-supplied id can never address a file outside the target directory.
        """
        if not isinstance(name, str) or not name:
            return False
        return not any(ch in name for ch in ("/", "\\", "\x00"))

    @staticmethod
    async def _json_object(request: web.Request) -> dict:
        """Parse the request body as JSON and require a JSON object.

        Raises:
            ValueError: if the body parses to something other than a dict
                (invalid JSON raises whatever ``request.json()`` raises).
        """
        data = await request.json()
        if not isinstance(data, dict):
            raise ValueError("JSON object required")
        return data

    # ─── Auth Middleware ───

    @web.middleware
    async def _auth_middleware(self, request: web.Request, handler):
        """Reject requests without a valid API key on /api/* routes."""
        # Health endpoint is public
        if request.path == "/health":
            return await handler(request)
        # All /api/* routes require auth (only enforced when a key is configured)
        if request.path.startswith("/api/") and self.api_key:
            supplied = request.headers.get("Authorization", "")
            expected = f"Bearer {self.api_key}"
            # Constant-time comparison so response timing does not leak how much
            # of the token matched. Compared as bytes because compare_digest
            # rejects non-ASCII str arguments.
            authorized = hmac.compare_digest(
                supplied.encode("utf-8", "surrogateescape"),
                expected.encode("utf-8", "surrogateescape"),
            )
            if not authorized:
                logger.warning(
                    "[GATEWAY] 401 Unauthorized: %s %s from %s",
                    request.method, request.path, request.remote,
                )
                return web.json_response(
                    {"error": "Unauthorized", "detail": "Invalid or missing API key"},
                    status=401,
                )
        return await handler(request)

    # ─── Health ───

    async def _health(self, request: web.Request) -> web.Response:
        """Report liveness plus whether the bot reports itself ready."""
        return web.json_response({
            "status": "ok",
            "bot_ready": self.bot.is_ready() if self.bot else False,
            "timestamp": time.time(),
        })

    # ─── Pending Approvals (Collector → Gateway → Discord) ───

    async def _post_pending(self, request: web.Request) -> web.Response:
        """Collector pushes a pending approval request.

        The body must be a JSON object. ``request_id`` defaults to the current
        time in milliseconds; ``timestamp`` and ``status`` are filled in when
        absent. The object is written to the transport's ``pending`` area as
        ``<request_id>.json``. Any failure yields a 400 with the error text.
        """
        try:
            data = await self._json_object(request)
            # Coerce to str so a numeric id cannot fail *after* the file is written.
            rid = str(data.get("request_id") or int(time.time() * 1000))
            if not self._is_safe_name(rid):
                return web.json_response({"ok": False, "error": "invalid request_id"}, status=400)
            data["request_id"] = rid
            data.setdefault("timestamp", time.time())
            data.setdefault("status", "pending")
            # Write to bridge pending dir (bot's scanner will pick it up)
            self.bot.bridge.transport.write_json("pending", f"{rid}.json", data)
            logger.info(f"[GATEWAY] pending received: {rid[:12]} project={data.get('project_name', '?')}")
            return web.json_response({"ok": True, "request_id": rid})
        except Exception as e:
            logger.error(f"[GATEWAY] pending error: {e}")
            return web.json_response({"ok": False, "error": str(e)}, status=400)

    async def _list_pending(self, request: web.Request) -> web.Response:
        """List all pending requests (diagnostics); commands are truncated to 100 chars."""
        pending = self.bot.bridge.get_pending_requests()
        return web.json_response([{
            "request_id": r.request_id,
            "command": r.command[:100],
            "project_name": r.project_name,
            "status": r.status,
        } for r in pending])

    # ─── Responses (Discord → Gateway → Collector) ───

    async def _get_response(self, request: web.Request) -> web.Response:
        """Collector polls for a response to a specific pending request.

        Returns ``{"waiting": True, ...}`` while no response file exists. Once
        one exists it is returned and deleted (one-time consumption).
        """
        rid = request.match_info["rid"]
        # The id becomes part of a file name; refuse anything that could leave the directory.
        if not self._is_safe_name(rid):
            return web.json_response({"ok": False, "error": "invalid request_id"}, status=400)
        filename = f"{rid}.json"
        transport = self.bot.bridge.transport
        data = transport.read_json("response", filename)
        if data is None:
            return web.json_response({"waiting": True, "request_id": rid})
        # Serve response and delete file (one-time consumption)
        transport.delete_file("response", filename)
        return web.json_response(data)

    # ─── Chat Snapshots (Collector → Gateway → Discord) ───

    async def _post_chat(self, request: web.Request) -> web.Response:
        """Collector pushes a chat snapshot for relay to Discord.

        Requires non-empty ``project_name`` and ``content``. The snapshot is
        written as ``chat_snapshots/<ms-timestamp>.json`` under the transport's
        ``bridge_dir``. If the transport has no ``bridge_dir`` nothing can be
        stored; that is logged as a warning and the request is still acknowledged.
        """
        try:
            data = await self._json_object(request)
            project = data.get("project_name", "")
            content = data.get("content", "")
            if not project or not content:
                return web.json_response({"ok": False, "error": "project_name and content required"}, status=400)
            bridge_dir = getattr(self.bot.bridge.transport, "bridge_dir", None)
            if bridge_dir is None:
                # Previously dropped without a trace; make the loss visible in the logs.
                logger.warning(f"[GATEWAY] chat dropped (transport has no bridge_dir): project={project}")
                return web.json_response({"ok": True})
            # Write to chat_snapshots dir for bot's scanner
            snap_dir = bridge_dir / "chat_snapshots"
            snap_dir.mkdir(parents=True, exist_ok=True)
            snap_id = f"{int(time.time() * 1000)}"
            snap_data = {
                "id": snap_id,
                "project_name": project,
                "content": content,
                "timestamp": time.time(),
            }
            (snap_dir / f"{snap_id}.json").write_text(
                json.dumps(snap_data, ensure_ascii=False, indent=2),
                encoding="utf-8",
            )
            logger.info(f"[GATEWAY] chat received: project={project} len={len(content)}")
            return web.json_response({"ok": True})
        except Exception as e:
            logger.error(f"[GATEWAY] chat error: {e}")
            return web.json_response({"ok": False, "error": str(e)}, status=400)

    # ─── Registration (Collector → Gateway) ───

    async def _post_register(self, request: web.Request) -> web.Response:
        """Collector registers session → project mapping.

        Stores ``conversation_id → project_name`` in ``bot.conv_to_project``
        when both fields are present. A request missing either field is still
        acknowledged with ``ok: True`` (unchanged behaviour) but is now logged.
        """
        try:
            data = await self._json_object(request)
            session_id = data.get("conversation_id", "")
            project = data.get("project_name", "")
            if session_id and project:
                self.bot.conv_to_project[session_id] = project
                # Separator added: the id and project name used to run together.
                logger.info(f"[GATEWAY] registered: {session_id[:8]} → {project}")
            else:
                logger.warning("[GATEWAY] register ignored: conversation_id and project_name required")
            return web.json_response({"ok": True})
        except Exception as e:
            # Logged for parity with the other POST handlers.
            logger.error(f"[GATEWAY] register error: {e}")
            return web.json_response({"ok": False, "error": str(e)}, status=400)

    # ─── Commands (Gateway → Collector) ───

    async def _get_commands(self, request: web.Request) -> web.Response:
        """Collector polls for commands (e.g. !auto, !stop, text messages).

        Returns and removes everything queued for the project.
        """
        project = request.match_info["project"]
        commands = self._commands.pop(project, [])
        return web.json_response({"commands": commands})

    def push_command(self, project: str, command: dict):
        """Bot pushes a command for a Collector to pick up."""
        self._commands.setdefault(project, []).append(command)

    # ─── Run ───

    async def start(self):
        """Start the HTTP server on ``host:port``.

        The runner is kept on the instance so ``stop()`` can shut it down. If
        binding fails, the runner is cleaned up before the error propagates.
        """
        runner = web.AppRunner(self.app)
        await runner.setup()
        try:
            site = web.TCPSite(runner, self.host, self.port)
            await site.start()
        except Exception:
            await runner.cleanup()
            raise
        self._runner = runner
        auth_status = "API Key enabled" if self.api_key else "⚠️ NO AUTH (set GATEWAY_API_KEY!)"
        logger.info(f"[GATEWAY] HTTP API started on {self.host}:{self.port} [{auth_status}]")

    async def stop(self):
        """Stop the HTTP server if it is running; safe to call more than once."""
        runner, self._runner = self._runner, None
        if runner is not None:
            await runner.cleanup()