diff --git a/.env.example b/.env.example index f73a32a..246de34 100644 --- a/.env.example +++ b/.env.example @@ -16,3 +16,13 @@ ACTIVE_TIMEOUT_SECONDS=300 # Watcher Settings DEBOUNCE_SECONDS=2 + +# Bot mode: 'local' (default, file-based) or 'gateway' (서버 Docker) +BOT_MODE=local +# Remote bridge URL (only used when BOT_MODE=remote) +REMOTE_BRIDGE_URL= + +# Gateway API Key (보안) +# 서버와 Collector에 동일한 키를 설정하세요 +# 생성: python -c "import secrets; print(secrets.token_urlsafe(32))" +GATEWAY_API_KEY= diff --git a/bridge.py b/bridge.py index 055671c..3880f4f 100644 --- a/bridge.py +++ b/bridge.py @@ -150,30 +150,107 @@ class LocalTransport(BridgeTransport): class RemoteTransport(BridgeTransport): - """HTTP-based transport for remote/multi-PC mode (skeleton). + """HTTP-based transport for Collector → Gateway communication. - Future implementation: polls a remote bridge HTTP server that - exposes the same pending/response/commands JSON files via API. + Maps BridgeTransport methods to Gateway API endpoints: + list_json_files("pending") → GET /api/pending (returns list) + write_json("pending", ...) → POST /api/pending + read_json("response", ...) → GET /api/response/{rid} + write_json("commands", ...) → (not used by Collector, Gateway pushes commands) + etc. """ - def __init__(self, base_url: str): + def __init__(self, base_url: str, api_key: str = ""): self.base_url = base_url.rstrip("/") - logger.info(f"RemoteTransport: initialized with {self.base_url}") + self.api_key = api_key + self._headers = {"Content-Type": "application/json"} + if api_key: + self._headers["Authorization"] = f"Bearer {api_key}" + logger.info(f"RemoteTransport: {self.base_url} (auth={'yes' if api_key else 'no'})") + + def _request(self, method: str, path: str, data: dict | None = None) -> dict | None: + """Make HTTP request to Gateway API.""" + import urllib.request + import urllib.error + + url = f"{self.base_url}{path}" + body = json.dumps(data, ensure_ascii=False).encode("utf-8") if data else None + req = urllib.request.Request(url, data=body, headers=self._headers, method=method) + + try: + with urllib.request.urlopen(req, timeout=10) as resp: + return json.loads(resp.read().decode("utf-8")) + except urllib.error.HTTPError as e: + logger.warning(f"RemoteTransport: {method} {path} → {e.code} {e.reason}") + return None + except (urllib.error.URLError, OSError, json.JSONDecodeError) as e: + logger.warning(f"RemoteTransport: {method} {path} → {e}") + return None def list_json_files(self, subdir: str) -> list[str]: - raise NotImplementedError("RemoteTransport not yet implemented") + """List pending requests from Gateway.""" + if subdir == "pending": + result = self._request("GET", "/api/pending") + if result and isinstance(result, list): + return [f"{r['request_id']}.json" for r in result] + elif subdir == "commands": + # Commands are polled per-project (handled separately) + return [] + return [] def read_json(self, subdir: str, filename: str) -> dict | None: - raise NotImplementedError("RemoteTransport not yet implemented") + """Read a JSON file from Gateway.""" + rid = filename.replace(".json", "") + if subdir == "response": + return self._request("GET", f"/api/response/{rid}") + elif subdir == "pending": + # Pending data comes from list, not individual read + result = self._request("GET", "/api/pending") + if result and isinstance(result, list): + for r in result: + if r.get("request_id") == rid: + return r + return None def write_json(self, subdir: str, filename: str, data: dict) -> None: - raise NotImplementedError("RemoteTransport not yet implemented") + """Write data to Gateway via API.""" + if subdir == "pending": + self._request("POST", "/api/pending", data) + elif subdir == "response": + rid = data.get("request_id", filename.replace(".json", "")) + self._request("POST", f"/api/response/{rid}", data) + elif subdir == "commands": + # Commands go through write_command in BridgeProtocol + self._request("POST", "/api/chat", data) def delete_file(self, subdir: str, filename: str) -> bool: - raise NotImplementedError("RemoteTransport not yet implemented") + """Delete not needed for remote — Gateway manages cleanup.""" + return True def ensure_dirs(self) -> None: - pass # Remote server manages its own directories + """No local dirs needed for remote transport.""" + pass + + def poll_commands(self, project: str) -> list[dict]: + """Poll Gateway for commands (Collector-specific, not in ABC).""" + result = self._request("GET", f"/api/commands/{project}") + if result and isinstance(result, dict): + return result.get("commands", []) + return [] + + def register_session(self, conversation_id: str, project_name: str) -> None: + """Register session → project mapping on Gateway.""" + self._request("POST", "/api/register", { + "conversation_id": conversation_id, + "project_name": project_name, + }) + + def send_chat(self, project_name: str, content: str) -> None: + """Push chat snapshot to Gateway for relay to Discord.""" + self._request("POST", "/api/chat", { + "project_name": project_name, + "content": content, + }) # ─── Bridge Protocol (uses Transport) ─── diff --git a/collector.py b/collector.py new file mode 100644 index 0000000..e485b19 --- /dev/null +++ b/collector.py @@ -0,0 +1,127 @@ +"""Collector — local relay between Extension (file-based) and Gateway (HTTP). + +The Collector runs on the local PC alongside the AG IDE. +It bridges the gap between the Extension (which writes to local bridge/ files) +and the remote Gateway (which manages Discord). + +Flow: + Extension → bridge/pending/ → Collector → POST Gateway /api/pending + Gateway /api/response/{rid} → Collector → bridge/response/ → Extension + Gateway /api/commands/{project} → Collector → bridge/commands/ → Extension +""" + +import asyncio +import json +import time +import logging +from pathlib import Path + +from bridge import LocalTransport, RemoteTransport +from config import Config + +logger = logging.getLogger(__name__) + + +class CollectorBridge: + """Bridges local file-based bridge with remote Gateway API. + + Periodically: + 1. Scans local pending/ → forwards new ones to Gateway + 2. Polls Gateway for responses → writes to local response/ + 3. Polls Gateway for commands → writes to local commands/ + """ + + def __init__(self, local: LocalTransport, remote: RemoteTransport, project_name: str): + self.local = local + self.remote = remote + self.project_name = project_name + self._forwarded_pending: set[str] = set() # already forwarded request IDs + self._poll_interval = 3 # seconds + self._running = False + + async def start(self): + """Start the Collector polling loops.""" + self._running = True + logger.info(f"[COLLECTOR] started for project={self.project_name}") + await asyncio.gather( + self._forward_pending_loop(), + self._poll_responses_loop(), + self._poll_commands_loop(), + ) + + async def stop(self): + """Stop the Collector.""" + self._running = False + logger.info("[COLLECTOR] stopped") + + # ─── Forward local pending → Gateway ─── + + async def _forward_pending_loop(self): + """Scan local pending/ and forward new requests to Gateway.""" + while self._running: + try: + for fname in self.local.list_json_files("pending"): + rid = fname.replace(".json", "") + if rid in self._forwarded_pending: + continue + + data = self.local.read_json("pending", fname) + if data is None or data.get("status") != "pending": + continue + + # Forward to Gateway + self.remote.write_json("pending", fname, data) + self._forwarded_pending.add(rid) + logger.info(f"[COLLECTOR] → Gateway: pending {rid[:12]}") + + # Clean up stale forwarded tracking (keep last 200) + if len(self._forwarded_pending) > 200: + self._forwarded_pending = set(list(self._forwarded_pending)[-100:]) + + except Exception as e: + logger.error(f"[COLLECTOR] forward_pending error: {e}") + + await asyncio.sleep(self._poll_interval) + + # ─── Poll Gateway responses → local ─── + + async def _poll_responses_loop(self): + """Poll Gateway for responses and write them locally for Extension.""" + while self._running: + try: + # Check each forwarded pending for a response + for rid in list(self._forwarded_pending): + data = self.remote.read_json("response", f"{rid}.json") + if data is None or data.get("waiting"): + continue + + # Write response locally for Extension to pick up + self.local.write_json("response", f"{rid}.json", data) + # Also delete local pending file (Extension expects this) + self.local.delete_file("pending", f"{rid}.json") + self._forwarded_pending.discard(rid) + approved = data.get("approved", "?") + logger.info(f"[COLLECTOR] ← Gateway: response {rid[:12]} approved={approved}") + + except Exception as e: + logger.error(f"[COLLECTOR] poll_responses error: {e}") + + await asyncio.sleep(self._poll_interval) + + # ─── Poll Gateway commands → local ─── + + async def _poll_commands_loop(self): + """Poll Gateway for commands and write them locally for Extension.""" + while self._running: + try: + commands = self.remote.poll_commands(self.project_name) + for cmd in commands: + cmd_id = cmd.get("id", str(int(time.time() * 1000))) + fname = f"{cmd_id}.json" + self.local.write_json("commands", fname, cmd) + logger.info(f"[COLLECTOR] ← Gateway: command {cmd.get('text', '?')[:30]}") + + except Exception as e: + logger.error(f"[COLLECTOR] poll_commands error: {e}") + + await asyncio.sleep(self._poll_interval) diff --git a/main.py b/main.py index ddde8eb..5db23bc 100644 --- a/main.py +++ b/main.py @@ -51,15 +51,41 @@ async def main(): # Get the running loop loop = asyncio.get_running_loop() - # Create transport based on BOT_MODE - transport = None # None → LocalTransport (default) + # ── Collector mode: no Discord bot, just relay local ↔ Gateway ── if Config.BOT_MODE == "remote": - from bridge import RemoteTransport + from bridge import LocalTransport, RemoteTransport + from collector import CollectorBridge + if not Config.REMOTE_BRIDGE_URL: - logger.error("REMOTE_BRIDGE_URL is required for remote mode") + logger.error("REMOTE_BRIDGE_URL is required for remote (Collector) mode") sys.exit(1) - transport = RemoteTransport(Config.REMOTE_BRIDGE_URL) - logger.info(f"Remote transport: {Config.REMOTE_BRIDGE_URL}") + + bridge_dir = Config.BRAIN_PATH.parent / "bridge" + local = LocalTransport(bridge_dir) + local.ensure_dirs() + remote = RemoteTransport(Config.REMOTE_BRIDGE_URL, api_key=Config.GATEWAY_API_KEY) + + collector = CollectorBridge(local, remote, project_name=Config.PROJECT_NAME) + logger.info(f"Collector mode: {Config.REMOTE_BRIDGE_URL}") + + # Optionally start watcher for brain events (local display only) + watcher = BrainWatcher(event_queue, loop) + + try: + watcher.start() + logger.info(f"Watcher started, {len(watcher.known_sessions)} existing sessions") + await collector.start() + except KeyboardInterrupt: + logger.info("Received keyboard interrupt") + except Exception as e: + logger.error(f"Fatal error: {e}", exc_info=True) + finally: + await collector.stop() + watcher.stop() + logger.info("Collector shutdown complete") + return + + # ── Local / Gateway mode ── # Create components watcher = None @@ -67,11 +93,6 @@ async def main(): watcher = BrainWatcher(event_queue, loop) bot = GravityBot(event_queue) - # Inject transport if specified (otherwise bot uses default LocalTransport) - if transport is not None: - from bridge import BridgeProtocol - bot.bridge = BridgeProtocol(transport) - try: # Start watcher (local mode only — gateway receives data via HTTP) if watcher: