- gateway.py: Collector↔Gateway HTTP API (pending, response, chat, register, commands) - Dockerfile + docker-compose.yml: BOT_MODE=gateway, port 8585 - main.py: gateway 모드 (watcher 비활성, GatewayAPI 시작) - config.py: gateway 모드 BRAIN_PATH 검증 스킵 - requirements.txt: aiohttp 추가 - docs/usage-guide.md: Docker 배포 섹션 추가 - Extension VSIX v0.3.9 빌드 (auto-approve 포함)
174 lines
7.4 KiB
Python
174 lines
7.4 KiB
Python
"""Gateway HTTP API — receives data from remote Collectors and routes to Discord bot.
|
|
|
|
Runs alongside the Discord bot in the server Docker container.
|
|
Collectors (local PCs) push pending approvals, chat snapshots, and registrations
|
|
to this API, and poll for responses.
|
|
|
|
Endpoints:
|
|
POST /api/pending — Collector pushes a new approval request
|
|
GET /api/pending — List all pending requests (for diagnostics)
|
|
POST /api/response/{rid} — Collector polls for response (or Gateway pushes); NOTE: this module registers only the GET route for this path
|
|
GET /api/response/{rid} — Get response for a specific request
|
|
POST /api/chat — Collector pushes a chat snapshot
|
|
POST /api/register — Collector registers session → project mapping
|
|
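POST /api/command — Gateway pushes command to specific collector; NOTE: no HTTP route is registered for this in this module, commands are queued in-process via GatewayAPI.push_command()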
|
|
GET /api/commands/{project} — Collector polls for commands
|
|
GET /health — Health check
|
|
"""
|
|
|
|
import asyncio
|
|
import json
|
|
import time
|
|
import logging
|
|
from aiohttp import web
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class GatewayAPI:
    """HTTP API server for Collector ↔ Gateway communication.

    The server exposes the routes registered in ``_setup_routes``. Pending
    approvals and responses are exchanged through ``bot.bridge.transport``;
    commands destined for Collectors are queued in memory and handed out when
    the Collector polls.

    Args:
        bot: The bot object. The handlers use ``bot.is_ready()``,
            ``bot.bridge`` and ``bot.conv_to_project``.
        host: Interface the HTTP server binds to.
        port: TCP port the HTTP server listens on.
    """

    # Characters that must never appear in an id that becomes part of a file
    # name: path separators would let a client escape the target directory.
    _UNSAFE_ID_CHARS = ("/", "\\", "\x00")

    def __init__(self, bot, host: str = "0.0.0.0", port: int = 8585):
        self.bot = bot
        self.host = host
        self.port = port

        # In-memory stores (Gateway is stateless across restarts)
        self._commands: dict[str, list[dict]] = {}  # project → [command dicts]
        # Set by start(); kept so that stop() can shut the server down.
        self._runner = None

        self.app = web.Application()
        self._setup_routes()

    def _setup_routes(self):
        """Register every HTTP route served by this API."""
        router = self.app.router
        router.add_get("/health", self._health)
        router.add_post("/api/pending", self._post_pending)
        router.add_get("/api/pending", self._list_pending)
        router.add_get("/api/response/{rid}", self._get_response)
        router.add_post("/api/chat", self._post_chat)
        router.add_post("/api/register", self._post_register)
        router.add_get("/api/commands/{project}", self._get_commands)

    @staticmethod
    def _normalize_request_id(raw):
        """Return *raw* as a request id that is safe to embed in a file name.

        Integers are converted to their decimal string. ``None`` is returned
        for anything unusable: non-string/non-int values, booleans, the empty
        string, or strings containing a path separator or NUL byte.
        """
        if isinstance(raw, bool):
            return None
        if isinstance(raw, int):
            raw = str(raw)
        if not isinstance(raw, str) or not raw:
            return None
        if any(ch in raw for ch in GatewayAPI._UNSAFE_ID_CHARS):
            return None
        return raw

    # ─── Health ───

    async def _health(self, request: "web.Request") -> "web.Response":
        """Report liveness plus whether the bot (if any) is ready."""
        return web.json_response({
            "status": "ok",
            "bot_ready": self.bot.is_ready() if self.bot else False,
            "timestamp": time.time(),
        })

    # ─── Pending Approvals (Collector → Gateway → Discord) ───

    async def _post_pending(self, request: "web.Request") -> "web.Response":
        """Collector pushes a pending approval request.

        The JSON body is stored as ``pending/<request_id>.json`` through the
        bridge transport. A missing or empty ``request_id`` is replaced by a
        millisecond timestamp. Responds 400 on a malformed body, an unusable
        ``request_id``, or any failure while storing.
        """
        try:
            data = await request.json()
            if not isinstance(data, dict):
                return web.json_response(
                    {"ok": False, "error": "JSON object required"}, status=400)

            raw_id = data.get("request_id")
            if raw_id is None or raw_id == "":
                raw_id = str(int(time.time() * 1000))
            # The id comes from the network and ends up in a file name, so it
            # is validated before anything is written.
            rid = self._normalize_request_id(raw_id)
            if rid is None:
                return web.json_response(
                    {"ok": False, "error": "invalid request_id"}, status=400)

            data["request_id"] = rid
            data.setdefault("timestamp", time.time())
            data.setdefault("status", "pending")

            # Write to bridge pending dir (bot's scanner will pick it up)
            self.bot.bridge.transport.write_json("pending", f"{rid}.json", data)
            logger.info(f"[GATEWAY] pending received: {rid[:12]} project={data.get('project_name', '?')}")

            return web.json_response({"ok": True, "request_id": rid})
        except Exception as e:
            logger.error(f"[GATEWAY] pending error: {e}")
            return web.json_response({"ok": False, "error": str(e)}, status=400)

    async def _list_pending(self, request: "web.Request") -> "web.Response":
        """List all pending requests (diagnostics)."""
        pending = self.bot.bridge.get_pending_requests()
        return web.json_response([{
            "request_id": r.request_id,
            "command": r.command[:100],  # truncated to keep the listing small
            "project_name": r.project_name,
            "status": r.status,
        } for r in pending])

    # ─── Responses (Discord → Gateway → Collector) ───

    async def _get_response(self, request: "web.Request") -> "web.Response":
        """Collector polls for a response to a specific pending request.

        Returns ``{"waiting": True, ...}`` while no response file exists.
        Once a response is available it is returned and its file deleted, so
        each response is served at most once. Responds 400 when the id in the
        URL cannot be used as a file name.
        """
        rid = self._normalize_request_id(request.match_info["rid"])
        if rid is None:
            return web.json_response(
                {"ok": False, "error": "invalid request_id"}, status=400)

        transport = self.bot.bridge.transport
        data = transport.read_json("response", f"{rid}.json")
        if data is None:
            return web.json_response({"waiting": True, "request_id": rid})

        # Serve response and delete file (one-time consumption)
        transport.delete_file("response", f"{rid}.json")
        return web.json_response(data)

    # ─── Chat Snapshots (Collector → Gateway → Discord) ───

    async def _post_chat(self, request: "web.Request") -> "web.Response":
        """Collector pushes a chat snapshot for relay to Discord.

        Requires non-empty ``project_name`` and ``content`` in the JSON body.
        The snapshot is written to ``<bridge_dir>/chat_snapshots/<ms>.json``
        when the transport exposes ``bridge_dir``; the ``stored`` field of the
        reply tells the caller whether that happened.
        """
        try:
            data = await request.json()
            if not isinstance(data, dict):
                return web.json_response(
                    {"ok": False, "error": "JSON object required"}, status=400)

            project = data.get("project_name", "")
            content = data.get("content", "")

            if not project or not content:
                return web.json_response({"ok": False, "error": "project_name and content required"}, status=400)

            # Write to chat_snapshots dir for bot's scanner
            transport = self.bot.bridge.transport
            stored = False
            if hasattr(transport, "bridge_dir"):
                snap_dir = transport.bridge_dir / "chat_snapshots"
                snap_dir.mkdir(parents=True, exist_ok=True)
                # NOTE: millisecond ids mean two snapshots arriving within the
                # same millisecond share a file name; the later one wins.
                snap_id = f"{int(time.time() * 1000)}"
                snap_data = {
                    "id": snap_id,
                    "project_name": project,
                    "content": content,
                    "timestamp": time.time(),
                }
                (snap_dir / f"{snap_id}.json").write_text(
                    json.dumps(snap_data, ensure_ascii=False, indent=2),
                    encoding="utf-8",
                )
                stored = True
            else:
                # Previously this case was dropped without any trace.
                logger.warning(
                    f"[GATEWAY] chat snapshot not stored (transport has no bridge_dir): project={project}")

            logger.info(f"[GATEWAY] chat received: project={project} len={len(content)}")
            return web.json_response({"ok": True, "stored": stored})
        except Exception as e:
            logger.error(f"[GATEWAY] chat error: {e}")
            return web.json_response({"ok": False, "error": str(e)}, status=400)

    # ─── Registration (Collector → Gateway) ───

    async def _post_register(self, request: "web.Request") -> "web.Response":
        """Collector registers session → project mapping.

        When both ``conversation_id`` and ``project_name`` are present and
        truthy, the pair is recorded in ``bot.conv_to_project``. The reply is
        ``{"ok": True}`` either way; 400 is returned only on errors.
        """
        try:
            data = await request.json()
            if not isinstance(data, dict):
                return web.json_response(
                    {"ok": False, "error": "JSON object required"}, status=400)

            session_id = data.get("conversation_id", "")
            project = data.get("project_name", "")
            if session_id and project:
                self.bot.conv_to_project[session_id] = project
                # str() keeps the log line from failing on a non-string id
                # after the mapping has already been updated.
                logger.info(f"[GATEWAY] registered: {str(session_id)[:8]} → {project}")
            return web.json_response({"ok": True})
        except Exception as e:
            logger.error(f"[GATEWAY] register error: {e}")
            return web.json_response({"ok": False, "error": str(e)}, status=400)

    # ─── Commands (Gateway → Collector) ───

    async def _get_commands(self, request: "web.Request") -> "web.Response":
        """Collector polls for commands (e.g. !auto, !stop, text messages).

        The queue for the project is removed as it is returned, so every
        command is delivered at most once.
        """
        project = request.match_info["project"]
        commands = self._commands.pop(project, [])
        return web.json_response({"commands": commands})

    def push_command(self, project: str, command: dict):
        """Bot pushes a command for a Collector to pick up."""
        self._commands.setdefault(project, []).append(command)

    # ─── Run ───

    async def start(self):
        """Start the HTTP server.

        The runner is kept on the instance so that ``stop()`` can release it.
        If binding the socket fails, the runner is cleaned up before the
        exception propagates.
        """
        runner = web.AppRunner(self.app)
        await runner.setup()
        try:
            site = web.TCPSite(runner, self.host, self.port)
            await site.start()
        except BaseException:
            await runner.cleanup()
            raise
        self._runner = runner
        logger.info(f"[GATEWAY] HTTP API started on {self.host}:{self.port}")

    async def stop(self):
        """Stop the HTTP server if it is running; safe to call repeatedly."""
        runner = getattr(self, "_runner", None)
        if runner is not None:
            self._runner = None
            await runner.cleanup()
|