- DELETE collector.py (523줄) - main.py: BOT_MODE=remote 분기 제거 - gateway.py: Collector REST 6개 endpoint 제거 (311→168줄) - bridge.py: RemoteTransport 제거 (480→270줄) - config.py: REMOTE_BRIDGE_URL 제거 - extension.ts: dead code 4개 + stale module vars 제거 - step-probe.ts: getStepProbeContext() 추가, autoApproveEnabled 제거 - FIX: HttpBridgeContext stale primitive (getter 패턴으로 수정) - ADD: extension.log rotation (10MB→2MB tail) - docs: architecture.md, tech-stack.md, known-issues.md 업데이트
174 lines
6.8 KiB
Python
174 lines
6.8 KiB
Python
"""Gateway HTTP API + WebSocket Hub — serves WebSocket Hub and diagnostics.
|
|
|
|
Runs alongside the Discord bot in the server Docker container.
|
|
|
|
Endpoints:
|
|
GET /ws — WebSocket endpoint (Extension direct connection)
|
|
GET /api/pending — List all pending requests (for diagnostics)
|
|
GET /health — Health check
|
|
GET /hub/status — WebSocket Hub diagnostics
|
|
"""
|
|
|
|
import asyncio
|
|
import json
|
|
import time
|
|
import logging
|
|
import uuid
|
|
from collections import defaultdict
|
|
from pathlib import Path
|
|
from aiohttp import web
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Rate limiting — burst-friendly: 10s window absorbs async loop wake-up bursts
|
|
RATE_LIMIT_WINDOW = 10.0 # seconds (was 1.0 — too strict for burst patterns)
|
|
RATE_LIMIT_MAX = 100 # max requests per window per IP
|
|
COMMAND_TTL = 1800 # 30 min — stale commands auto-deleted
|
|
|
|
|
|
class GatewayAPI:
|
|
"""HTTP API + WebSocket Hub server."""
|
|
|
|
def __init__(self, bot, host: str = "0.0.0.0", port: int = 8585,
|
|
api_key: str = "", hub=None):
|
|
self.bot = bot
|
|
self.host = host
|
|
self.port = port
|
|
self.api_key = api_key
|
|
self.hub = hub # WSHub instance (None = WS disabled)
|
|
self.app = web.Application(
|
|
middlewares=[self._auth_middleware],
|
|
client_max_size=1024 * 1024, # Security: 1MB max request body
|
|
)
|
|
self._setup_routes()
|
|
|
|
# In-memory stores
|
|
self._commands: dict[str, list[dict]] = {} # project → [command dicts]
|
|
self._rate_limits: dict[str, list[float]] = defaultdict(list) # IP → [timestamps]
|
|
|
|
def _setup_routes(self):
|
|
# WebSocket endpoint (no auth middleware — Hub handles its own auth)
|
|
self.app.router.add_get("/ws", self._ws_handler)
|
|
self.app.router.add_get("/hub/status", self._hub_status)
|
|
# REST endpoints
|
|
self.app.router.add_get("/health", self._health)
|
|
self.app.router.add_get("/api/pending", self._list_pending)
|
|
|
|
# ─── WebSocket Handler ───
|
|
|
|
async def _ws_handler(self, request: web.Request) -> web.WebSocketResponse:
|
|
"""WebSocket endpoint for direct Extension connections."""
|
|
if not self.hub:
|
|
return web.json_response(
|
|
{"error": "WebSocket Hub not enabled"}, status=503
|
|
)
|
|
return await self.hub.handle_ws(request)
|
|
|
|
async def _hub_status(self, request: web.Request) -> web.Response:
|
|
"""WebSocket Hub diagnostics."""
|
|
if not self.hub:
|
|
return web.json_response({"hub": "disabled"})
|
|
return web.json_response(self.hub.get_status())
|
|
|
|
# ─── Auth Middleware ───
|
|
|
|
@web.middleware
|
|
async def _auth_middleware(self, request: web.Request, handler):
|
|
"""Reject requests without valid API key on /api/* routes."""
|
|
# WebSocket and public endpoints skip API key auth
|
|
if request.path in ("/health", "/ws", "/hub/status"):
|
|
return await handler(request)
|
|
|
|
# All /api/* routes require auth + rate limit
|
|
if request.path.startswith("/api/"):
|
|
# Auth check
|
|
if self.api_key:
|
|
auth = request.headers.get("Authorization", "")
|
|
if auth != f"Bearer {self.api_key}":
|
|
logger.warning(f"[GATEWAY] 401 Unauthorized: {request.method} {request.path} from {request.remote}")
|
|
return web.json_response(
|
|
{"error": "Unauthorized", "detail": "Invalid or missing API key"},
|
|
status=401,
|
|
)
|
|
# Rate limit check
|
|
ip = request.remote or "unknown"
|
|
now = time.time()
|
|
window = [t for t in self._rate_limits[ip] if now - t < RATE_LIMIT_WINDOW]
|
|
if len(window) >= RATE_LIMIT_MAX:
|
|
logger.warning(f"[GATEWAY] 429 Rate limited: {ip}")
|
|
return web.json_response(
|
|
{"error": "Too Many Requests"},
|
|
status=429,
|
|
headers={"Retry-After": str(int(RATE_LIMIT_WINDOW * 2))},
|
|
)
|
|
window.append(now)
|
|
self._rate_limits[ip] = window
|
|
|
|
# Memory leak prevention: Cleanup stale IPs when mapping grows too large
|
|
if len(self._rate_limits) > 1000:
|
|
for k in list(self._rate_limits.keys()):
|
|
active = [t for t in self._rate_limits[k] if now - t < RATE_LIMIT_WINDOW]
|
|
if active:
|
|
self._rate_limits[k] = active
|
|
else:
|
|
del self._rate_limits[k]
|
|
|
|
return await handler(request)
|
|
|
|
# ─── Health ───
|
|
|
|
async def _health(self, request: web.Request) -> web.Response:
|
|
status = {
|
|
"status": "ok",
|
|
"bot_ready": self.bot.is_ready() if self.bot else False,
|
|
"timestamp": time.time(),
|
|
"hub_enabled": self.hub is not None,
|
|
}
|
|
if self.hub:
|
|
status["hub_connections"] = len(self.hub.connections)
|
|
return web.json_response(status)
|
|
|
|
# ─── Pending List (Diagnostics) ───
|
|
|
|
async def _list_pending(self, request: web.Request) -> web.Response:
|
|
"""List all pending requests (diagnostics)."""
|
|
requests = self.bot.bridge.get_pending_requests()
|
|
return web.json_response([{
|
|
"request_id": r.request_id,
|
|
"command": r.command[:100],
|
|
"project_name": r.project_name,
|
|
"status": r.status,
|
|
} for r in requests])
|
|
|
|
# ─── Commands (Legacy fallback: Bot → Extension via HTTP when Hub unavailable) ───
|
|
|
|
def push_command(self, project: str, command: dict):
|
|
"""Bot pushes a command for Extension to pick up (Hub fallback)."""
|
|
if project not in self._commands:
|
|
self._commands[project] = []
|
|
command.setdefault("_ts", time.time()) # TTL tracking
|
|
self._commands[project].append(command)
|
|
self._cleanup_stale_commands()
|
|
|
|
def _cleanup_stale_commands(self):
|
|
"""Remove commands older than COMMAND_TTL."""
|
|
now = time.time()
|
|
for project in list(self._commands.keys()):
|
|
self._commands[project] = [
|
|
cmd for cmd in self._commands[project]
|
|
if now - cmd.get("_ts", now) < COMMAND_TTL
|
|
]
|
|
if not self._commands[project]:
|
|
del self._commands[project]
|
|
|
|
# ─── Run ───
|
|
|
|
async def start(self):
|
|
"""Start the HTTP server."""
|
|
runner = web.AppRunner(self.app)
|
|
await runner.setup()
|
|
site = web.TCPSite(runner, self.host, self.port)
|
|
await site.start()
|
|
auth_status = "API Key enabled" if self.api_key else "⚠️ NO AUTH (set GATEWAY_API_KEY!)"
|
|
logger.info(f"[GATEWAY] HTTP API started on {self.host}:{self.port} [{auth_status}]")
|