"""Collector — local relay between Extension (file-based) and Gateway (HTTP).

The Collector runs on the local PC alongside the AG IDE. It bridges the gap
between the Extension (which writes to local bridge/ files) and the remote
Gateway (which manages Discord).

Flow:
    Extension → bridge/pending/ → Collector → POST Gateway /api/pending
    Gateway /api/response/{rid} → Collector → bridge/response/ → Extension
    Gateway /api/commands/{project} → Collector → bridge/commands/ → Extension
"""

import asyncio
import json
import time
import logging
from pathlib import Path

from bridge import LocalTransport, RemoteTransport
from config import Config

logger = logging.getLogger(__name__)


class CollectorBridge:
    """Bridges the local file-based bridge with the remote Gateway API.

    Periodically:
        1. Scans local pending/ and forwards new requests to the Gateway.
        2. Polls the Gateway for responses and writes them to local response/.
        3. Polls the Gateway for commands and writes them to local commands/.

    NOTE(review): the transport methods are called without ``await``; if they
    perform blocking I/O they stall the other loops while they run — confirm
    against the LocalTransport / RemoteTransport implementations.
    """

    # Upper bound on tracked request IDs, and how many of the newest IDs
    # survive once that bound is exceeded.
    _MAX_TRACKED = 200
    _TRIM_TO = 100

    _JSON_SUFFIX = ".json"

    def __init__(
        self,
        local: LocalTransport,
        remote: RemoteTransport,
        project_name: str,
        *,
        poll_interval: float = 3,
    ):
        """Create a collector for one project.

        Args:
            local: Transport used for the local bridge/ directories.
            remote: Transport used to talk to the Gateway.
            project_name: Project whose commands are polled from the Gateway.
            poll_interval: Seconds each loop sleeps between iterations.
        """
        self.local = local
        self.remote = remote
        self.project_name = project_name
        # Request IDs already forwarded and still awaiting a response.
        # A dict (insertion-ordered) is used instead of a set so that trimming
        # can reliably evict the *oldest* entries; values are unused.
        self._forwarded_pending: dict[str, None] = {}
        self._poll_interval = poll_interval  # seconds
        self._running = False

    async def start(self):
        """Run the three polling loops until ``stop()`` clears the run flag."""
        self._running = True
        logger.info("[COLLECTOR] started for project=%s", self.project_name)
        await asyncio.gather(
            self._forward_pending_loop(),
            self._poll_responses_loop(),
            self._poll_commands_loop(),
        )

    async def stop(self):
        """Ask the loops to exit; each one stops after its current sleep."""
        self._running = False
        logger.info("[COLLECTOR] stopped")

    @classmethod
    def _request_id(cls, fname: str) -> str:
        """Return *fname* without a trailing ``.json`` extension."""
        if fname.endswith(cls._JSON_SUFFIX):
            return fname[: -len(cls._JSON_SUFFIX)]
        return fname

    # ─── Forward local pending → Gateway ───

    async def _forward_pending_loop(self):
        """Scan local pending/ and forward not-yet-forwarded requests."""
        while self._running:
            try:
                for fname in self.local.list_json_files("pending"):
                    rid = self._request_id(fname)
                    if rid in self._forwarded_pending:
                        continue

                    data = self.local.read_json("pending", fname)
                    if data is None or data.get("status") != "pending":
                        continue

                    # Forward to Gateway
                    self.remote.write_json("pending", fname, data)
                    self._forwarded_pending[rid] = None
                    logger.info("[COLLECTOR] → Gateway: pending %s", rid[:12])

                    # Bound the tracking table: keep only the most recently
                    # forwarded IDs. An evicted ID whose pending file is still
                    # present locally is simply forwarded again on a later scan.
                    if len(self._forwarded_pending) > self._MAX_TRACKED:
                        newest = list(self._forwarded_pending)[-self._TRIM_TO:]
                        self._forwarded_pending = dict.fromkeys(newest)
            except Exception as e:
                # Best-effort loop: log and retry on the next cycle.
                logger.error("[COLLECTOR] forward_pending error: %s", e)

            await asyncio.sleep(self._poll_interval)

    # ─── Poll Gateway responses → local ───

    async def _poll_responses_loop(self):
        """Poll the Gateway for responses to forwarded requests."""
        while self._running:
            try:
                # Iterate over a snapshot: entries are removed as they resolve.
                for rid in list(self._forwarded_pending):
                    fname = f"{rid}.json"
                    data = self.remote.read_json("response", fname)
                    # No payload, or a payload with a truthy "waiting" field
                    # (presumably the Gateway's not-ready marker): try later.
                    if data is None or data.get("waiting"):
                        continue

                    # Write response locally for Extension to pick up
                    self.local.write_json("response", fname, data)
                    # Also delete local pending file (Extension expects this)
                    self.local.delete_file("pending", fname)
                    self._forwarded_pending.pop(rid, None)

                    approved = data.get("approved", "?")
                    logger.info(
                        "[COLLECTOR] ← Gateway: response %s approved=%s",
                        rid[:12],
                        approved,
                    )
            except Exception as e:
                # Best-effort loop: log and retry on the next cycle.
                logger.error("[COLLECTOR] poll_responses error: %s", e)

            await asyncio.sleep(self._poll_interval)

    # ─── Poll Gateway commands → local ───

    async def _poll_commands_loop(self):
        """Poll the Gateway for commands and write them to local commands/."""
        # Last timestamp-derived ID handed out, so commands lacking an "id"
        # never share a filename even when processed within one millisecond.
        last_fallback_id = 0
        while self._running:
            try:
                commands = self.remote.poll_commands(self.project_name)
                for cmd in commands:
                    cmd_id = cmd.get("id")
                    if cmd_id is None:
                        now_ms = int(time.time() * 1000)
                        last_fallback_id = max(now_ms, last_fallback_id + 1)
                        cmd_id = str(last_fallback_id)

                    self.local.write_json("commands", f"{cmd_id}.json", cmd)
                    # str() guards the slice: a non-string "text" must not
                    # raise here and abort the rest of the batch.
                    preview = str(cmd.get("text", "?"))[:30]
                    logger.info("[COLLECTOR] ← Gateway: command %s", preview)
            except Exception as e:
                # Best-effort loop: log and retry on the next cycle.
                logger.error("[COLLECTOR] poll_commands error: %s", e)

            await asyncio.sleep(self._poll_interval)