"""Gateway HTTP API + WebSocket Hub — receives data from Collectors and Extensions.

Runs alongside the Discord bot in the server Docker container.

Supports both:
  - REST API: for legacy Collectors (HTTP polling)
  - WebSocket: for direct Extension connections (real-time)

Endpoints:
  GET  /ws                     — WebSocket endpoint (Extension direct connection)
  GET  /hub/status             — WebSocket Hub diagnostics
  GET  /health                 — Health check
  POST /api/pending            — Collector pushes a new approval request
  GET  /api/pending            — List all pending requests (for diagnostics)
  GET  /api/response/{rid}     — Collector polls for the response to a pending request
  POST /api/chat               — Collector pushes a chat snapshot
  POST /api/register           — Collector registers session → project mapping
  GET  /api/commands/{project} — Collector polls for commands queued by the bot
  POST /api/event              — Collector pushes a brain event (file change)

Commands are queued in-process by the bot via ``GatewayAPI.push_command()``;
there is no HTTP endpoint for pushing them.

All ``/api/*`` routes are rate limited per client IP and, when an API key is
configured, require an ``Authorization: Bearer <key>`` header.
"""
import asyncio
import hmac
import json
import logging
import re
import time
import uuid
from collections import defaultdict
from pathlib import Path

from aiohttp import web

logger = logging.getLogger(__name__)

# Rate limiting — burst-friendly: 10s window absorbs async loop wake-up bursts
RATE_LIMIT_WINDOW = 10.0  # seconds (was 1.0 — too strict for burst patterns)
RATE_LIMIT_MAX = 100  # max requests per window per IP
RATE_LIMIT_MAX_TRACKED_IPS = 1000  # prune idle IPs once this many are tracked
COMMAND_TTL = 1800  # 30 min — stale commands auto-deleted

# Identifiers embedded in bridge file names ("{rid}.json") must not be able to
# escape the bridge directory: no path separators, no leading dot, bounded length.
_SAFE_ID_RE = re.compile(r"[A-Za-z0-9_-][A-Za-z0-9_.-]{0,127}")


class GatewayAPI:
    """HTTP API + WebSocket Hub server.

    Args:
        bot: The Discord bot. Handlers use ``bot.bridge`` (pending/response
            transport), ``bot.conv_to_project``, ``bot.event_queue`` and
            ``bot.is_ready()``.
        host: Interface to bind.
        port: TCP port to bind.
        api_key: Shared secret required as a ``Bearer`` token on ``/api/*``
            routes. An empty string disables authentication.
        hub: Optional WSHub instance; ``None`` disables the WebSocket endpoint.
    """

    def __init__(self, bot, host: str = "0.0.0.0", port: int = 8585, api_key: str = "", hub=None):
        self.bot = bot
        self.host = host
        self.port = port
        self.api_key = api_key
        self.hub = hub  # WSHub instance (None = WS disabled)
        self.app = web.Application(
            middlewares=[self._auth_middleware],
            client_max_size=1024 * 1024,  # Security: 1MB max request body
        )
        self._setup_routes()
        # In-memory stores
        self._commands: dict[str, list[dict]] = {}  # project → [command dicts]
        self._rate_limits: dict[str, list[float]] = defaultdict(list)  # IP → [monotonic timestamps]
        self._runner = None  # web.AppRunner while the server is running

    def _setup_routes(self):
        """Register all HTTP and WebSocket routes on ``self.app``."""
        # WebSocket endpoint (no auth middleware — Hub handles its own auth)
        self.app.router.add_get("/ws", self._ws_handler)
        self.app.router.add_get("/hub/status", self._hub_status)
        # Legacy REST endpoints (Collector compatibility)
        self.app.router.add_get("/health", self._health)
        self.app.router.add_post("/api/pending", self._post_pending)
        self.app.router.add_get("/api/pending", self._list_pending)
        self.app.router.add_get("/api/response/{rid}", self._get_response)
        self.app.router.add_post("/api/chat", self._post_chat)
        self.app.router.add_post("/api/register", self._post_register)
        self.app.router.add_get("/api/commands/{project}", self._get_commands)
        self.app.router.add_post("/api/event", self._post_event)

    # ─── WebSocket Handler ───

    async def _ws_handler(self, request: web.Request) -> web.StreamResponse:
        """WebSocket endpoint for direct Extension connections.

        Returns 503 JSON when the Hub is disabled; otherwise delegates the
        whole connection to ``hub.handle_ws``.
        """
        if not self.hub:
            return web.json_response(
                {"error": "WebSocket Hub not enabled"}, status=503
            )
        return await self.hub.handle_ws(request)

    async def _hub_status(self, request: web.Request) -> web.Response:
        """WebSocket Hub diagnostics (unauthenticated)."""
        if not self.hub:
            return web.json_response({"hub": "disabled"})
        return web.json_response(self.hub.get_status())

    # ─── Auth Middleware ───

    @web.middleware
    async def _auth_middleware(self, request: web.Request, handler):
        """Rate-limit and authenticate every ``/api/*`` request.

        Other paths (``/health``, ``/ws``, ``/hub/status``) pass straight
        through; the WebSocket Hub performs its own authentication.
        Rate limiting runs before the key check so that failed
        authentication attempts are throttled too.
        """
        if not request.path.startswith("/api/"):
            return await handler(request)

        ip = request.remote or "unknown"
        if not self._consume_rate_limit(ip):
            logger.warning(f"[GATEWAY] 429 Rate limited: {ip}")
            return web.json_response(
                {"error": "Too Many Requests"},
                status=429,
                headers={"Retry-After": str(int(RATE_LIMIT_WINDOW * 2))},
            )

        if not self._is_authorized(request):
            logger.warning(f"[GATEWAY] 401 Unauthorized: {request.method} {request.path} from {request.remote}")
            return web.json_response(
                {"error": "Unauthorized", "detail": "Invalid or missing API key"},
                status=401,
            )

        return await handler(request)

    def _is_authorized(self, request: web.Request) -> bool:
        """Return True if no API key is configured or the Bearer token matches it."""
        if not self.api_key:
            return True
        supplied = request.headers.get("Authorization", "").encode("utf-8", "surrogatepass")
        expected = f"Bearer {self.api_key}".encode("utf-8", "surrogatepass")
        # Constant-time comparison so the key cannot be recovered from response timing.
        return hmac.compare_digest(supplied, expected)

    def _consume_rate_limit(self, ip: str) -> bool:
        """Record one request from *ip*.

        Returns:
            False if *ip* already made ``RATE_LIMIT_MAX`` requests within the
            last ``RATE_LIMIT_WINDOW`` seconds (the request is not counted),
            True otherwise.
        """
        # Monotonic clock: wall-clock jumps must not reset or freeze the window.
        now = time.monotonic()
        window = [t for t in self._rate_limits.get(ip, ()) if now - t < RATE_LIMIT_WINDOW]
        if len(window) >= RATE_LIMIT_MAX:
            self._rate_limits[ip] = window
            return False
        window.append(now)
        self._rate_limits[ip] = window
        # Memory leak prevention: cleanup stale IPs when the mapping grows too large
        if len(self._rate_limits) > RATE_LIMIT_MAX_TRACKED_IPS:
            self._prune_rate_limits(now)
        return True

    def _prune_rate_limits(self, now: float) -> None:
        """Drop expired timestamps and forget IPs with no requests in the window."""
        for ip in list(self._rate_limits):
            active = [t for t in self._rate_limits[ip] if now - t < RATE_LIMIT_WINDOW]
            if active:
                self._rate_limits[ip] = active
            else:
                del self._rate_limits[ip]

    # ─── Request helpers ───

    @staticmethod
    async def _read_json_object(request: web.Request) -> dict:
        """Parse the request body as a JSON object.

        Raises:
            ValueError: the body is not valid JSON or is not a JSON object.
        """
        data = await request.json()
        if not isinstance(data, dict):
            raise ValueError("request body must be a JSON object")
        return data

    @staticmethod
    def _is_safe_id(value) -> bool:
        """Return True if *value* is a string safe to embed in a bridge file name."""
        return isinstance(value, str) and _SAFE_ID_RE.fullmatch(value) is not None

    # ─── Health ───

    async def _health(self, request: web.Request) -> web.Response:
        """Report liveness, bot readiness and (if enabled) Hub connection count."""
        status = {
            "status": "ok",
            "bot_ready": self.bot.is_ready() if self.bot else False,
            "timestamp": time.time(),
            "hub_enabled": self.hub is not None,
        }
        if self.hub:
            status["hub_connections"] = len(self.hub.connections)
        return web.json_response(status)

    # ─── Pending Approvals (Collector → Gateway → Discord) ───

    async def _post_pending(self, request: web.Request) -> web.Response:
        """Collector pushes a pending approval request.

        A missing/empty ``request_id`` is replaced by a generated one;
        ``timestamp`` and ``status`` get defaults. The request is written to
        the bridge ``pending`` store for the bot's scanner to pick up.
        Responds 400 on malformed input or any processing error.
        """
        try:
            data = await self._read_json_object(request)
            rid = data.get("request_id") or f"{int(time.time() * 1000)}_{uuid.uuid4().hex[:8]}"
            # rid becomes a file name — reject anything that could escape the directory.
            if not self._is_safe_id(rid):
                raise ValueError(f"invalid request_id: {str(rid)[:64]!r}")
            data["request_id"] = rid
            data.setdefault("timestamp", time.time())
            data.setdefault("status", "pending")

            # Write to bridge pending dir (bot's scanner will pick it up)
            self.bot.bridge.transport.write_json("pending", f"{rid}.json", data)
            logger.info(f"[GATEWAY] pending received: {rid[:12]} project={data.get('project_name', '?')}")
            return web.json_response({"ok": True, "request_id": rid})
        except Exception as e:
            logger.error(f"[GATEWAY] pending error: {e}")
            return web.json_response({"ok": False, "error": str(e)}, status=400)

    async def _list_pending(self, request: web.Request) -> web.Response:
        """List all pending requests (diagnostics); commands truncated to 100 chars."""
        requests = self.bot.bridge.get_pending_requests()
        return web.json_response([{
            "request_id": r.request_id,
            "command": r.command[:100],
            "project_name": r.project_name,
            "status": r.status,
        } for r in requests])

    # ─── Responses (Discord → Gateway → Collector) ───

    async def _get_response(self, request: web.Request) -> web.Response:
        """Collector polls for a response to a specific pending request.

        Returns ``{"waiting": true, ...}`` until a response exists. Once
        served, the response and its pending file are both deleted, so each
        response is delivered at most once.
        """
        rid = request.match_info["rid"]
        # Percent-encoded separators can survive URL matching; validate before file access.
        if not self._is_safe_id(rid):
            return web.json_response({"error": "invalid request id"}, status=400)
        data = self.bot.bridge.transport.read_json("response", f"{rid}.json")
        if data is None:
            return web.json_response({"waiting": True, "request_id": rid})
        # Serve response and delete both response + pending files (one-time consumption)
        self.bot.bridge.transport.delete_file("response", f"{rid}.json")
        self.bot.bridge.transport.delete_file("pending", f"{rid}.json")  # Bug 2 fix
        return web.json_response(data)

    # ─── Chat Snapshots (Collector → Gateway → Discord) ───

    async def _post_chat(self, request: web.Request) -> web.Response:
        """Collector pushes a chat snapshot for relay to Discord.

        Requires ``project_name`` and at least one of ``content`` /
        ``attached_files``. The snapshot is written as JSON into
        ``<bridge_dir>/chat_snapshots`` for the bot's scanner. Responds 503
        if the transport has no ``bridge_dir`` (nowhere to store it).
        """
        try:
            data = await self._read_json_object(request)
            project = data.get("project_name", "")
            content = data.get("content", "")
            attached_files = data.get("attached_files", [])
            if not project or (not content and not attached_files):
                return web.json_response({"ok": False, "error": "project_name and content/attached_files required"}, status=400)

            bridge_dir = getattr(self.bot.bridge.transport, "bridge_dir", None)
            if bridge_dir is None:
                # Previously the snapshot was silently dropped while reporting success.
                logger.warning(f"[GATEWAY] chat dropped: transport has no bridge_dir (project={project})")
                return web.json_response({"ok": False, "error": "chat snapshot storage unavailable"}, status=503)

            # Write to chat_snapshots dir for bot's scanner
            snap_dir = bridge_dir / "chat_snapshots"
            snap_dir.mkdir(parents=True, exist_ok=True)
            snap_id = f"{int(time.time() * 1000)}_{uuid.uuid4().hex[:8]}"
            snap_data = {
                "id": snap_id,
                "project_name": project,
                "content": content,
                "timestamp": time.time(),
            }
            if attached_files:
                snap_data["attached_files"] = attached_files
            (snap_dir / f"{snap_id}.json").write_text(
                json.dumps(snap_data, ensure_ascii=False, indent=2),
                encoding="utf-8",
            )

            af_info = f" +{len(attached_files)} files" if attached_files else ""
            logger.info(f"[GATEWAY] chat received: project={project} len={len(content)}{af_info}")
            return web.json_response({"ok": True})
        except Exception as e:
            logger.error(f"[GATEWAY] chat error: {e}")
            return web.json_response({"ok": False, "error": str(e)}, status=400)

    # ─── Registration (Collector → Gateway) ───

    async def _post_register(self, request: web.Request) -> web.Response:
        """Collector registers session → project mapping.

        Stores ``conversation_id → project_name`` in ``bot.conv_to_project``
        when both are present; otherwise it is a no-op that still returns ok.
        """
        try:
            data = await self._read_json_object(request)
            session_id = data.get("conversation_id", "")
            project = data.get("project_name", "")
            if session_id and project:
                self.bot.conv_to_project[session_id] = project
                logger.info(f"[GATEWAY] registered: {session_id[:8]} → {project}")
            return web.json_response({"ok": True})
        except Exception as e:
            logger.error(f"[GATEWAY] register error: {e}")
            return web.json_response({"ok": False, "error": str(e)}, status=400)

    # ─── Commands (Gateway → Collector) ───

    async def _get_commands(self, request: web.Request) -> web.Response:
        """Collector polls for commands (e.g. !auto, !stop, text messages).

        Commands are removed once returned. Stale ones (older than
        ``COMMAND_TTL``) are purged first so an old command is never delivered.
        """
        project = request.match_info["project"]
        self._cleanup_stale_commands()
        commands = self._commands.pop(project, [])
        return web.json_response({"commands": commands})

    def push_command(self, project: str, command: dict):
        """Bot pushes a command for a Collector to pick up.

        The command is copied and stamped with ``_ts`` (epoch seconds) for
        TTL tracking, so the caller's dict is never mutated and a reused
        template dict does not carry an old timestamp.
        """
        entry = dict(command)
        entry.setdefault("_ts", time.time())  # TTL tracking
        self._commands.setdefault(project, []).append(entry)
        # Auto-cleanup stale commands (Security 3: memory leak prevention)
        self._cleanup_stale_commands()

    def _cleanup_stale_commands(self):
        """Remove commands older than COMMAND_TTL and drop emptied projects."""
        now = time.time()
        for project in list(self._commands.keys()):
            self._commands[project] = [
                cmd for cmd in self._commands[project]
                if now - cmd.get("_ts", now) < COMMAND_TTL
            ]
            if not self._commands[project]:
                del self._commands[project]

    # ─── Brain Events (Collector → Gateway → Discord) ───

    async def _post_event(self, request: web.Request) -> web.Response:
        """Collector pushes a brain event (file change) for relay to Discord.

        Builds a ``watcher.BrainEvent`` from the JSON body and puts it on
        ``bot.event_queue``. An unknown ``event_type`` or malformed body
        yields 400.
        """
        try:
            data = await self._read_json_object(request)
            from watcher import BrainEvent, EventType

            event_type_str = data.get("event_type", "file_changed")
            event_type = EventType(event_type_str)

            event = BrainEvent(
                event_type=event_type,
                conversation_id=data.get("conversation_id", ""),
                file_name=data.get("file_name", ""),
                file_path=Path(data["file_path"]) if data.get("file_path") else None,
                content=data.get("content", ""),
                timestamp=data.get("timestamp", time.time()),
            )

            # Inject into bot's event queue (same path as local mode)
            await self.bot.event_queue.put(event)
            logger.info(f"[GATEWAY] event received: {event_type_str} {event.file_name} conv={event.conversation_id[:8]}")
            return web.json_response({"ok": True})
        except Exception as e:
            logger.error(f"[GATEWAY] event error: {e}")
            return web.json_response({"ok": False, "error": str(e)}, status=400)

    # ─── Run ───

    async def start(self):
        """Start the HTTP server; the runner is kept so ``stop()`` can release it."""
        runner = web.AppRunner(self.app)
        await runner.setup()
        site = web.TCPSite(runner, self.host, self.port)
        await site.start()
        self._runner = runner
        auth_status = "API Key enabled" if self.api_key else "⚠️ NO AUTH (set GATEWAY_API_KEY!)"
        logger.info(f"[GATEWAY] HTTP API started on {self.host}:{self.port} [{auth_status}]")

    async def stop(self):
        """Stop the HTTP server and release its socket. Safe to call if never started."""
        runner, self._runner = self._runner, None
        if runner is not None:
            await runner.cleanup()
            logger.info("[GATEWAY] HTTP API stopped")