"""Collector — local relay between Extension (file-based) and Gateway (HTTP). The Collector runs on the local PC alongside the AG IDE. It bridges the gap between the Extension (which writes to local bridge/ files) and the remote Gateway (which manages Discord). Flow: Extension → bridge/pending/ → Collector → POST Gateway /api/pending Gateway /api/response/{rid} → Collector → bridge/response/ → Extension Gateway /api/commands/{project} → Collector → bridge/commands/ → Extension """ import asyncio import json import time import logging from pathlib import Path from bridge import LocalTransport, RemoteTransport from config import Config from watcher import BrainEvent, EventType logger = logging.getLogger(__name__) class CollectorBridge: """Bridges local file-based bridge with remote Gateway API. Periodically: 1. Scans local pending/ → forwards new ones to Gateway 2. Polls Gateway for responses → writes to local response/ 3. Polls Gateway for commands → writes to local commands/ """ def __init__(self, local: LocalTransport, remote: RemoteTransport, project_name: str, event_queue: asyncio.Queue | None = None): self.local = local self.remote = remote self.project_name = project_name self.event_queue = event_queue self._poll_interval = 3 # seconds self._running = False # Pre-populate with existing pending files → skip on startup (prevents 만료됨 spam) self._forwarded_pending: set[str] = set() for fname in self.local.list_json_files("pending"): rid = fname.replace(".json", "") self._forwarded_pending.add(rid) if self._forwarded_pending: logger.info(f"[COLLECTOR] skipping {len(self._forwarded_pending)} existing pending files") async def start(self): """Start the Collector polling loops.""" self._running = True logger.info(f"[COLLECTOR] started for project={self.project_name}") tasks = [ self._forward_pending_loop(), self._poll_responses_loop(), self._poll_commands_loop(), self._forward_chat_snapshots_loop(), self._forward_registrations_loop(), ] if self.event_queue: tasks.append(self._forward_events_loop()) await asyncio.gather(*tasks) async def stop(self): """Stop the Collector.""" self._running = False logger.info("[COLLECTOR] stopped") # ─── Forward local pending → Gateway ─── async def _forward_pending_loop(self): """Scan local pending/ and forward new requests to Gateway.""" while self._running: try: for fname in self.local.list_json_files("pending"): rid = fname.replace(".json", "") if rid in self._forwarded_pending: continue data = self.local.read_json("pending", fname) if data is None or data.get("status") != "pending": continue # Forward to Gateway self.remote.write_json("pending", fname, data) self._forwarded_pending.add(rid) logger.info(f"[COLLECTOR] → Gateway: pending {rid[:12]}") # Clean up stale forwarded tracking (keep last 200) if len(self._forwarded_pending) > 200: self._forwarded_pending = set(list(self._forwarded_pending)[-100:]) except Exception as e: logger.error(f"[COLLECTOR] forward_pending error: {e}") await asyncio.sleep(self._poll_interval) # ─── Poll Gateway responses → local ─── async def _poll_responses_loop(self): """Poll Gateway for responses and write them locally for Extension.""" while self._running: try: # Check each forwarded pending for a response for rid in list(self._forwarded_pending): data = self.remote.read_json("response", f"{rid}.json") if data is None or data.get("waiting"): continue # Write response locally for Extension to pick up self.local.write_json("response", f"{rid}.json", data) # Also delete local pending file (Extension expects this) self.local.delete_file("pending", f"{rid}.json") self._forwarded_pending.discard(rid) approved = data.get("approved", "?") logger.info(f"[COLLECTOR] ← Gateway: response {rid[:12]} approved={approved}") except Exception as e: logger.error(f"[COLLECTOR] poll_responses error: {e}") await asyncio.sleep(self._poll_interval) # ─── Poll Gateway commands → local ─── async def _poll_commands_loop(self): """Poll Gateway for commands and write them locally for Extension.""" while self._running: try: commands = self.remote.poll_commands(self.project_name) for cmd in commands: cmd_id = cmd.get("id", str(int(time.time() * 1000))) fname = f"{cmd_id}.json" self.local.write_json("commands", fname, cmd) logger.info(f"[COLLECTOR] ← Gateway: command {cmd.get('text', '?')[:30]}") except Exception as e: logger.error(f"[COLLECTOR] poll_commands error: {e}") await asyncio.sleep(self._poll_interval) # ─── Forward chat snapshots → Gateway ─── async def _forward_chat_snapshots_loop(self): """Forward chat_snapshots/ from Extension to Gateway.""" while self._running: try: snap_dir = self.local.bridge_dir / "chat_snapshots" if snap_dir.exists(): for f in snap_dir.glob("*.json"): try: data = json.loads(f.read_text(encoding="utf-8-sig")) project = data.get("project_name", self.project_name) content = data.get("content", "") if content: self.remote.send_chat(project, content) logger.info(f"[COLLECTOR] → Gateway: chat snapshot len={len(content)}") f.unlink() # Cleanup after forwarding except (json.JSONDecodeError, OSError) as e: logger.warning(f"[COLLECTOR] bad chat snapshot {f.name}: {e}") except Exception as e: logger.error(f"[COLLECTOR] forward_chat_snapshots error: {e}") await asyncio.sleep(self._poll_interval) # ─── Forward session registrations → Gateway ─── async def _forward_registrations_loop(self): """Forward register/ files from Extension to Gateway.""" forwarded_regs: set[str] = set() while self._running: try: register_dir = self.local.bridge_dir / "register" if register_dir.exists(): for f in register_dir.glob("*.json"): if f.name in forwarded_regs: continue try: data = json.loads(f.read_text(encoding="utf-8-sig")) conv_id = data.get("conversation_id", "") project = data.get("project_name", "") if conv_id and project: self.remote.register_session(conv_id, project) forwarded_regs.add(f.name) logger.info(f"[COLLECTOR] → Gateway: register {conv_id[:8]} → {project}") except (json.JSONDecodeError, OSError) as e: logger.warning(f"[COLLECTOR] bad register {f.name}: {e}") except Exception as e: logger.error(f"[COLLECTOR] forward_registrations error: {e}") await asyncio.sleep(self._poll_interval * 3) # Less frequent # ─── Forward brain events → Gateway ─── async def _forward_events_loop(self): """Read BrainEvents from Watcher queue and POST to Gateway.""" while self._running: try: event: BrainEvent = await asyncio.wait_for( self.event_queue.get(), timeout=5.0 ) # Serialize event to JSON event_data = { "event_type": event.event_type.value, "conversation_id": event.conversation_id, "file_name": event.file_name, "file_path": str(event.file_path) if event.file_path else "", "content": event.content, "timestamp": event.timestamp, } self.remote._request("POST", "/api/event", event_data) logger.info(f"[COLLECTOR] → Gateway: event {event.event_type.value} {event.file_name}") except asyncio.TimeoutError: continue except Exception as e: logger.error(f"[COLLECTOR] forward_event error: {e}")