Files
gravity_control/gateway.py
Variet Worker 58a421f5a6 fix: 전체 시스템 감사 — 6건 수정 (보안 + 안정성)
Bug 1 (만료됨 스팸): Collector 시작 시 기존 pending skip
Bug 2 (pending 미삭제): Gateway에서 response 소비 시 pending도 삭제
Bug 3 (재시작 중복): Bug 1로 해결

Security 1: API 요청 1MB 크기 제한 (client_max_size)
Security 2: IP별 rate limiting (10 req/s)
Security 3: _commands 메모리 누수 방지 (TTL 30분)
2026-03-11 22:42:05 +09:00

267 lines
11 KiB
Python

"""Gateway HTTP API — receives data from remote Collectors and routes to Discord bot.
Runs alongside the Discord bot in the server Docker container.
Collectors (local PCs) push pending approvals, chat snapshots, and registrations
to this API, and poll for responses.
Endpoints (as registered in GatewayAPI._setup_routes):
GET /health — Health check (no API key required)
POST /api/pending — Collector pushes a new approval request
GET /api/pending — List all pending requests (for diagnostics)
GET /api/response/{rid} — Collector polls for the response to a specific request
POST /api/chat — Collector pushes a chat snapshot
POST /api/register — Collector registers session → project mapping
GET /api/commands/{project} — Collector polls for commands
POST /api/event — Collector pushes a brain event (file change)
Note: no POST /api/response/{rid} or POST /api/command route is registered;
commands are queued in-process with GatewayAPI.push_command.
"""
import asyncio
import hmac
import json
import logging
import time
from collections import defaultdict
from pathlib import Path

from aiohttp import web
logger = logging.getLogger(__name__)

# Per-IP throttling applied to /api/* routes.
RATE_LIMIT_WINDOW = 1.0  # length of the counting window, in seconds
RATE_LIMIT_MAX = 10  # requests allowed per IP inside one window

# Queued commands older than this many seconds (30 minutes) are discarded.
COMMAND_TTL = 30 * 60
class GatewayAPI:
    """HTTP API server for Collector ↔ Gateway communication.

    Wraps an aiohttp ``web.Application`` whose handlers pass Collector traffic
    on to the bot: pending approvals and responses go through
    ``bot.bridge.transport``, chat snapshots are written under the transport's
    ``bridge_dir``, brain events are put on ``bot.event_queue``, and commands
    destined for Collectors are queued in memory until they are polled.

    Args:
        bot: Bot object. Handlers use its ``bridge``, ``conv_to_project``,
            ``event_queue`` and ``is_ready()`` members.
        host: Interface to bind.
        port: TCP port to bind.
        api_key: Bearer token required on ``/api/*`` routes. An empty string
            disables authentication.
    """

    # Seconds between sweeps that drop idle IPs from the rate-limit table.
    _RATE_LIMIT_SWEEP_INTERVAL = 60.0
    # Characters that must never appear in an ID used to build a file name.
    _UNSAFE_ID_CHARS = ("/", "\\", "\x00")

    def __init__(self, bot, host: str = "0.0.0.0", port: int = 8585, api_key: str = ""):
        self.bot = bot
        self.host = host
        self.port = port
        self.api_key = api_key
        self.app = web.Application(
            middlewares=[self._auth_middleware],
            client_max_size=1024 * 1024,  # Security: 1MB max request body
        )
        self._setup_routes()
        # In-memory stores
        self._commands: dict[str, list[dict]] = {}  # project → [command dicts]
        self._rate_limits: dict[str, list[float]] = defaultdict(list)  # IP → [monotonic timestamps]
        self._rate_limits_swept_at = time.monotonic()
        # Set by start(); kept so stop() can release the listening socket.
        self._runner = None

    def _setup_routes(self):
        """Register every HTTP route on the application router."""
        router = self.app.router
        router.add_get("/health", self._health)
        router.add_post("/api/pending", self._post_pending)
        router.add_get("/api/pending", self._list_pending)
        router.add_get("/api/response/{rid}", self._get_response)
        router.add_post("/api/chat", self._post_chat)
        router.add_post("/api/register", self._post_register)
        router.add_get("/api/commands/{project}", self._get_commands)
        router.add_post("/api/event", self._post_event)

    # ─── Auth Middleware ───

    @web.middleware
    async def _auth_middleware(self, request: web.Request, handler):
        """Enforce the API key and the per-IP rate limit on /api/* routes.

        ``/health`` is always public. Any other path outside ``/api/`` is
        passed straight to the handler. Returns a 401 JSON response on a bad
        or missing key and a 429 JSON response when the IP exceeded
        ``RATE_LIMIT_MAX`` requests within ``RATE_LIMIT_WINDOW`` seconds.
        """
        # Health endpoint is public
        if request.path == "/health":
            return await handler(request)

        # All /api/* routes require auth + rate limit
        if request.path.startswith("/api/"):
            if self.api_key and not self._is_authorized(request):
                logger.warning(f"[GATEWAY] 401 Unauthorized: {request.method} {request.path} from {request.remote}")
                return web.json_response(
                    {"error": "Unauthorized", "detail": "Invalid or missing API key"},
                    status=401,
                )

            ip = request.remote or "unknown"
            if not self._allow_request(ip):
                logger.warning(f"[GATEWAY] 429 Rate limited: {ip}")
                return web.json_response(
                    {"error": "Too Many Requests"},
                    status=429,
                )

        return await handler(request)

    def _is_authorized(self, request) -> bool:
        """Return True when the Authorization header carries the configured key."""
        supplied = request.headers.get("Authorization", "")
        expected = f"Bearer {self.api_key}"
        # Constant-time comparison so response timing does not reveal how much
        # of the key matched. Bytes are compared because compare_digest()
        # rejects non-ASCII str arguments.
        return hmac.compare_digest(
            supplied.encode("utf-8", "surrogateescape"),
            expected.encode("utf-8", "surrogateescape"),
        )

    def _allow_request(self, ip: str) -> bool:
        """Record a request from *ip* and report whether it is within the limit.

        Rejected requests are not recorded, so only accepted requests count
        toward the window.
        """
        # Monotonic clock: wall-clock adjustments must not stretch or shrink
        # the window.
        now = time.monotonic()
        recent = [t for t in self._rate_limits.get(ip, ()) if now - t < RATE_LIMIT_WINDOW]
        allowed = len(recent) < RATE_LIMIT_MAX
        if allowed:
            recent.append(now)
        self._rate_limits[ip] = recent
        self._sweep_rate_limits(now)
        return allowed

    def _sweep_rate_limits(self, now: float) -> None:
        """Drop IPs with no request inside the current window.

        Without this the table keeps one entry per distinct client IP forever.
        The sweep is throttled so a request does not scan the whole table
        every time.
        """
        if now - self._rate_limits_swept_at < self._RATE_LIMIT_SWEEP_INTERVAL:
            return
        self._rate_limits_swept_at = now
        idle = [
            ip for ip, stamps in self._rate_limits.items()
            # Timestamps are appended in order, so the last one is the newest.
            if not stamps or now - stamps[-1] >= RATE_LIMIT_WINDOW
        ]
        for ip in idle:
            del self._rate_limits[ip]

    @classmethod
    def _is_safe_id(cls, value) -> bool:
        """Return True if *value* is a non-empty str without path separators.

        Request IDs arrive from the network and are interpolated into file
        names, so anything that could step outside the target directory is
        refused here regardless of what the transport itself checks.
        """
        if not isinstance(value, str) or not value:
            return False
        return not any(ch in value for ch in cls._UNSAFE_ID_CHARS)

    # ─── Health ───

    async def _health(self, request: web.Request) -> web.Response:
        """Report liveness, the bot's ready state, and the current time."""
        return web.json_response({
            "status": "ok",
            "bot_ready": self.bot.is_ready() if self.bot else False,
            "timestamp": time.time(),
        })

    # ─── Pending Approvals (Collector → Gateway → Discord) ───

    async def _post_pending(self, request: web.Request) -> web.Response:
        """Collector pushes a pending approval request.

        The JSON body is stored as ``pending/<request_id>.json`` through the
        bridge transport. A missing or empty ``request_id`` is replaced by a
        millisecond timestamp; ``timestamp`` and ``status`` get defaults.
        Returns ``{"ok": True, "request_id": ...}`` or a 400 JSON error.
        """
        try:
            data = await request.json()
            if not isinstance(data, dict):
                return web.json_response({"ok": False, "error": "JSON object required"}, status=400)

            # Normalise the ID to a string before any side effect so a numeric
            # ID cannot fail after the file has already been written.
            raw_rid = data.get("request_id")
            if raw_rid is None or raw_rid == "":
                rid = str(int(time.time() * 1000))
            else:
                rid = str(raw_rid)
            if not self._is_safe_id(rid):
                return web.json_response({"ok": False, "error": "Invalid request_id"}, status=400)

            data["request_id"] = rid
            data.setdefault("timestamp", time.time())
            data.setdefault("status", "pending")
            # Write to bridge pending dir (bot's scanner will pick it up)
            self.bot.bridge.transport.write_json("pending", f"{rid}.json", data)
            logger.info(f"[GATEWAY] pending received: {rid[:12]} project={data.get('project_name', '?')}")
            return web.json_response({"ok": True, "request_id": rid})
        except Exception as e:
            logger.error(f"[GATEWAY] pending error: {e}")
            return web.json_response({"ok": False, "error": str(e)}, status=400)

    async def _list_pending(self, request: web.Request) -> web.Response:
        """List all pending requests (diagnostics).

        Each entry carries the request ID, the first 100 characters of the
        command, the project name and the status.
        """
        requests = self.bot.bridge.get_pending_requests()
        return web.json_response([{
            "request_id": r.request_id,
            "command": r.command[:100],
            "project_name": r.project_name,
            "status": r.status,
        } for r in requests])

    # ─── Responses (Discord → Gateway → Collector) ───

    async def _get_response(self, request: web.Request) -> web.Response:
        """Collector polls for a response to a specific pending request.

        Returns ``{"waiting": True, ...}`` while no response file exists.
        Once a response is found it is returned and both the response and the
        pending file are deleted (one-time consumption).
        """
        rid = request.match_info["rid"]
        # The ID comes from the URL and names files that are read and deleted.
        if not self._is_safe_id(rid):
            return web.json_response({"ok": False, "error": "Invalid request_id"}, status=400)

        transport = self.bot.bridge.transport
        data = transport.read_json("response", f"{rid}.json")
        if data is None:
            return web.json_response({"waiting": True, "request_id": rid})
        # Serve response and delete both response + pending files (one-time consumption)
        transport.delete_file("response", f"{rid}.json")
        transport.delete_file("pending", f"{rid}.json")  # Bug 2 fix
        return web.json_response(data)

    # ─── Chat Snapshots (Collector → Gateway → Discord) ───

    async def _post_chat(self, request: web.Request) -> web.Response:
        """Collector pushes a chat snapshot for relay to Discord.

        Requires non-empty ``project_name`` and ``content``. The snapshot is
        written as ``<bridge_dir>/chat_snapshots/<ms timestamp>.json`` when the
        transport exposes ``bridge_dir``. Returns ``{"ok": True}`` or a 400
        JSON error.
        """
        try:
            data = await request.json()
            project = data.get("project_name", "")
            content = data.get("content", "")
            if not project or not content:
                return web.json_response({"ok": False, "error": "project_name and content required"}, status=400)

            # Write to chat_snapshots dir for bot's scanner
            transport = self.bot.bridge.transport
            snap_dir = transport.bridge_dir / "chat_snapshots" if hasattr(transport, "bridge_dir") else None
            if snap_dir:
                snap_dir.mkdir(parents=True, exist_ok=True)
                snap_id = f"{int(time.time() * 1000)}"
                snap_data = {
                    "id": snap_id,
                    "project_name": project,
                    "content": content,
                    "timestamp": time.time(),
                }
                (snap_dir / f"{snap_id}.json").write_text(
                    json.dumps(snap_data, ensure_ascii=False, indent=2),
                    encoding="utf-8",
                )
            else:
                # Still answered with ok=True as before, but no longer silent.
                logger.warning(f"[GATEWAY] chat snapshot not stored: transport has no bridge_dir (project={project})")
            logger.info(f"[GATEWAY] chat received: project={project} len={len(content)}")
            return web.json_response({"ok": True})
        except Exception as e:
            logger.error(f"[GATEWAY] chat error: {e}")
            return web.json_response({"ok": False, "error": str(e)}, status=400)

    # ─── Registration (Collector → Gateway) ───

    async def _post_register(self, request: web.Request) -> web.Response:
        """Collector registers session → project mapping.

        Stores ``conversation_id`` → ``project_name`` in
        ``bot.conv_to_project`` when both are present. Responds
        ``{"ok": True}`` even when either field is missing; a malformed body
        yields a 400 JSON error.
        """
        try:
            data = await request.json()
            session_id = data.get("conversation_id", "")
            project = data.get("project_name", "")
            if session_id and project:
                self.bot.conv_to_project[session_id] = project
                # str() keeps the log line from raising after the mapping has
                # already been stored.
                logger.info(f"[GATEWAY] registered: {str(session_id)[:8]} → {project}")
            return web.json_response({"ok": True})
        except Exception as e:
            logger.error(f"[GATEWAY] register error: {e}")
            return web.json_response({"ok": False, "error": str(e)}, status=400)

    # ─── Commands (Gateway → Collector) ───

    async def _get_commands(self, request: web.Request) -> web.Response:
        """Collector polls for commands (e.g. !auto, !stop, text messages).

        Returns and removes every queued command for the project in the URL.
        """
        project = request.match_info["project"]
        # Expire first so a command older than COMMAND_TTL is never delivered
        # just because nothing was pushed since it went stale.
        self._cleanup_stale_commands()
        commands = self._commands.pop(project, [])
        return web.json_response({"commands": commands})

    def push_command(self, project: str, command: dict):
        """Bot pushes a command for a Collector to pick up.

        Args:
            project: Project whose Collector should receive the command.
            command: Command payload. A shallow copy is queued, stamped with
                ``_ts`` (unless already present) for TTL tracking; the
                caller's dict is left untouched.
        """
        entry = dict(command)
        entry.setdefault("_ts", time.time())  # TTL tracking
        self._commands.setdefault(project, []).append(entry)
        # Auto-cleanup stale commands (Security 3: memory leak prevention)
        self._cleanup_stale_commands()

    def _cleanup_stale_commands(self):
        """Remove commands older than COMMAND_TTL and drop emptied projects."""
        now = time.time()
        for project in list(self._commands):
            fresh = [
                cmd for cmd in self._commands[project]
                # A command without "_ts" is treated as brand new.
                if now - cmd.get("_ts", now) < COMMAND_TTL
            ]
            if fresh:
                self._commands[project] = fresh
            else:
                del self._commands[project]

    # ─── Brain Events (Collector → Gateway → Discord) ───

    async def _post_event(self, request: web.Request) -> web.Response:
        """Collector pushes a brain event (file change) for relay to Discord.

        Builds a ``BrainEvent`` from the JSON body and puts it on
        ``bot.event_queue``. An unknown ``event_type`` or malformed body
        yields a 400 JSON error.
        """
        try:
            data = await request.json()
            # Function-scope import kept from the original.
            # NOTE(review): presumably avoids an import cycle with watcher; confirm.
            from watcher import BrainEvent, EventType

            event_type_str = data.get("event_type", "file_changed")
            event_type = EventType(event_type_str)
            event = BrainEvent(
                event_type=event_type,
                conversation_id=data.get("conversation_id", ""),
                file_name=data.get("file_name", ""),
                file_path=Path(data["file_path"]) if data.get("file_path") else None,
                content=data.get("content", ""),
                timestamp=data.get("timestamp", time.time()),
            )
            # Inject into bot's event queue (same path as local mode)
            await self.bot.event_queue.put(event)
            logger.info(f"[GATEWAY] event received: {event_type_str} {event.file_name} conv={event.conversation_id[:8]}")
            return web.json_response({"ok": True})
        except Exception as e:
            logger.error(f"[GATEWAY] event error: {e}")
            return web.json_response({"ok": False, "error": str(e)}, status=400)

    # ─── Run ───

    async def start(self):
        """Start the HTTP server on ``host:port``.

        Raises:
            OSError: If the address cannot be bound. The runner is cleaned up
                before the error propagates.
        """
        runner = web.AppRunner(self.app)
        await runner.setup()
        try:
            site = web.TCPSite(runner, self.host, self.port)
            await site.start()
        except Exception:
            # Do not leave a half-initialised runner behind on bind failure.
            await runner.cleanup()
            raise
        self._runner = runner
        auth_status = "API Key enabled" if self.api_key else "⚠️ NO AUTH (set GATEWAY_API_KEY!)"
        logger.info(f"[GATEWAY] HTTP API started on {self.host}:{self.port} [{auth_status}]")

    async def stop(self):
        """Stop the HTTP server started by ``start()``; a no-op if not running."""
        runner, self._runner = self._runner, None
        if runner is not None:
            await runner.cleanup()