From 58a421f5a6df77d7539e536b6b5151a6c3ed30a1 Mon Sep 17 00:00:00 2001 From: Variet Worker Date: Wed, 11 Mar 2026 22:42:05 +0900 Subject: [PATCH] =?UTF-8?q?fix:=20=EC=A0=84=EC=B2=B4=20=EC=8B=9C=EC=8A=A4?= =?UTF-8?q?=ED=85=9C=20=EA=B0=90=EC=82=AC=20=E2=80=94=206=EA=B1=B4=20?= =?UTF-8?q?=EC=88=98=EC=A0=95=20(=EB=B3=B4=EC=95=88=20+=20=EC=95=88?= =?UTF-8?q?=EC=A0=95=EC=84=B1)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Bug 1 (만료됨 스팸): Collector 시작 시 기존 pending skip Bug 2 (pending 미삭제): Gateway에서 response 소비 시 pending도 삭제 Bug 3 (재시작 중복): Bug 1로 해결 Security 1: API 요청 1MB 크기 제한 (client_max_size) Security 2: IP별 rate limiting (10 req/s) Security 3: _commands 메모리 누수 방지 (TTL 30분) --- collector.py | 9 +++++++- gateway.py | 59 +++++++++++++++++++++++++++++++++++++++++++--------- 2 files changed, 57 insertions(+), 11 deletions(-) diff --git a/collector.py b/collector.py index b81f2a4..b9a0eca 100644 --- a/collector.py +++ b/collector.py @@ -38,10 +38,17 @@ class CollectorBridge: self.remote = remote self.project_name = project_name self.event_queue = event_queue - self._forwarded_pending: set[str] = set() # already forwarded request IDs self._poll_interval = 3 # seconds self._running = False + # Pre-populate with existing pending files → skip on startup (prevents 만료됨 spam) + self._forwarded_pending: set[str] = set() + for fname in self.local.list_json_files("pending"): + rid = fname.replace(".json", "") + self._forwarded_pending.add(rid) + if self._forwarded_pending: + logger.info(f"[COLLECTOR] skipping {len(self._forwarded_pending)} existing pending files") + async def start(self): """Start the Collector polling loops.""" self._running = True diff --git a/gateway.py b/gateway.py index 4b1a036..999a9d6 100644 --- a/gateway.py +++ b/gateway.py @@ -20,11 +20,17 @@ import asyncio import json import time import logging +from collections import defaultdict from pathlib import Path from aiohttp import web logger = logging.getLogger(__name__) +# Rate limiting +RATE_LIMIT_WINDOW = 1.0 # seconds +RATE_LIMIT_MAX = 10 # max requests per window per IP +COMMAND_TTL = 1800 # 30 min — stale commands auto-deleted + class GatewayAPI: """HTTP API server for Collector ↔ Gateway communication.""" @@ -34,11 +40,15 @@ class GatewayAPI: self.host = host self.port = port self.api_key = api_key - self.app = web.Application(middlewares=[self._auth_middleware]) + self.app = web.Application( + middlewares=[self._auth_middleware], + client_max_size=1024 * 1024, # Security: 1MB max request body + ) self._setup_routes() - # In-memory stores (Gateway is stateless across restarts) + # In-memory stores self._commands: dict[str, list[dict]] = {} # project → [command dicts] + self._rate_limits: dict[str, list[float]] = defaultdict(list) # IP → [timestamps] def _setup_routes(self): self.app.router.add_get("/health", self._health) @@ -59,15 +69,29 @@ class GatewayAPI: if request.path == "/health": return await handler(request) - # All /api/* routes require auth - if request.path.startswith("/api/") and self.api_key: - auth = request.headers.get("Authorization", "") - if auth != f"Bearer {self.api_key}": - logger.warning(f"[GATEWAY] 401 Unauthorized: {request.method} {request.path} from {request.remote}") + # All /api/* routes require auth + rate limit + if request.path.startswith("/api/"): + # Auth check + if self.api_key: + auth = request.headers.get("Authorization", "") + if auth != f"Bearer {self.api_key}": + logger.warning(f"[GATEWAY] 401 Unauthorized: {request.method} {request.path} from {request.remote}") + return web.json_response( + {"error": "Unauthorized", "detail": "Invalid or missing API key"}, + status=401, + ) + # Rate limit check + ip = request.remote or "unknown" + now = time.time() + window = [t for t in self._rate_limits[ip] if now - t < RATE_LIMIT_WINDOW] + if len(window) >= RATE_LIMIT_MAX: + logger.warning(f"[GATEWAY] 429 Rate limited: {ip}") return web.json_response( - {"error": "Unauthorized", "detail": "Invalid or missing API key"}, - status=401, + {"error": "Too Many Requests"}, + status=429, ) + window.append(now) + self._rate_limits[ip] = window return await handler(request) @@ -119,8 +143,9 @@ class GatewayAPI: if data is None: return web.json_response({"waiting": True, "request_id": rid}) - # Serve response and delete file (one-time consumption) + # Serve response and delete both response + pending files (one-time consumption) self.bot.bridge.transport.delete_file("response", f"{rid}.json") + self.bot.bridge.transport.delete_file("pending", f"{rid}.json") # Bug 2 fix return web.json_response(data) # ─── Chat Snapshots (Collector → Gateway → Discord) ─── @@ -184,7 +209,21 @@ class GatewayAPI: """Bot pushes a command for a Collector to pick up.""" if project not in self._commands: self._commands[project] = [] + command.setdefault("_ts", time.time()) # TTL tracking self._commands[project].append(command) + # Auto-cleanup stale commands (Security 3: memory leak prevention) + self._cleanup_stale_commands() + + def _cleanup_stale_commands(self): + """Remove commands older than COMMAND_TTL.""" + now = time.time() + for project in list(self._commands.keys()): + self._commands[project] = [ + cmd for cmd in self._commands[project] + if now - cmd.get("_ts", now) < COMMAND_TTL + ] + if not self._commands[project]: + del self._commands[project] # ─── Brain Events (Collector → Gateway → Discord) ───