"""Collector — local relay between Extension (file-based) and Gateway (HTTP).

The Collector runs on the local PC alongside the AG IDE. It bridges the gap
between the Extension (which writes to local bridge/ files) and the remote
Gateway (which manages Discord).

Flow:
    Extension → bridge/pending/ → Collector → POST Gateway /api/pending
    Gateway /api/response/{rid} → Collector → bridge/response/ → Extension
    Gateway /api/commands/{project} → Collector → bridge/commands/ → Extension
"""

import asyncio
import hashlib
import json
import os
import time
import logging
import uuid
from pathlib import Path

from bridge import LocalTransport, RemoteTransport
from config import Config
from watcher import BrainEvent, EventType

logger = logging.getLogger(__name__)


class CollectorBridge:
    """Bridges local file-based bridge with remote Gateway API.

    Periodically:
        1. Scans local pending/ → forwards new ones to Gateway
        2. Polls Gateway for responses → writes to local response/
        3. Polls Gateway for commands → writes to local commands/

    It also forwards chat_snapshots/ and register/ files, forwards watcher
    events from ``event_queue`` (when given), runs a Gateway health check and
    flushes the remote retry queue.
    """

    def __init__(self, local: LocalTransport, remote: RemoteTransport,
                 project_name: str, event_queue: asyncio.Queue | None = None):
        self.local = local
        self.remote = remote
        self.project_name = project_name
        self.event_queue = event_queue
        self._poll_interval = 5  # seconds (was 3 — reduced I/O frequency)
        self._running = False

        # Pending files already on disk at startup are skipped (prevents
        # "expired" notification spam); only their status changes are forwarded.
        self._startup_pending: set[str] = set()
        # rids whose responses are polled from the Gateway
        self._forwarded_pending: set[str] = set()
        # rid → when forwarded; cleared after _RESPONSE_POLL_TTL by the response poller
        self._forwarded_timestamps: dict[str, float] = {}
        # rid → last forward time (collector start time for startup files).
        # Unlike _forwarded_timestamps this is NOT cleared when the
        # response-poll TTL expires, so the stale-pending cleanup can still
        # find the file later.
        self._pending_forwarded_at: dict[str, float] = {}
        self._pending_hashes: dict[str, str] = {}  # rid → content hash (for MERGE/status detection)
        self._pending_mtimes: dict[str, float] = {}  # rid → mtime of the last processed file version
        self._RESPONSE_POLL_TTL = 300  # 5 min — stop polling responses for old pending
        self._STALE_PENDING_TTL = 600  # 10 min — delete local pending files after forwarding

        # Project discovery cache (avoid re-reading register/ every cycle)
        self._cached_projects: set[str] | None = None
        self._projects_cache_ts: float = 0
        self._PROJECTS_CACHE_TTL = 60.0  # seconds

        startup_ts = time.time()
        for fname in self.local.list_json_files("pending"):
            rid = fname.replace(".json", "")
            self._startup_pending.add(rid)
            self._forwarded_pending.add(rid)
            self._pending_forwarded_at[rid] = startup_ts
            # Pre-hash existing files
            data = self.local.read_json("pending", fname)
            if data:
                self._pending_hashes[rid] = self._content_hash(data)
            # Pre-cache mtime
            try:
                fpath = self.local.bridge_dir / "pending" / fname
                self._pending_mtimes[rid] = fpath.stat().st_mtime
            except OSError:
                pass
        if self._startup_pending:
            logger.info(f"[COLLECTOR] skipping {len(self._startup_pending)} existing pending files")

        # Startup cleanup: remove stale response files (> 5 min)
        self._cleanup_stale_responses()

    @staticmethod
    def _content_hash(data) -> str:
        """Return the MD5 hex digest of *data* serialized with sorted keys.

        Used only for change detection, not for security.
        """
        return hashlib.md5(json.dumps(data, sort_keys=True).encode()).hexdigest()

    def _cleanup_stale_responses(self, max_age: int = 300):
        """Remove stale response files (> max_age seconds) on startup."""
        now = time.time()
        cleaned = 0
        for fname in self.local.list_json_files("response"):
            try:
                fpath = self.local.bridge_dir / "response" / fname
                if now - fpath.stat().st_mtime > max_age:
                    self.local.delete_file("response", fname)
                    cleaned += 1
            except OSError:
                pass
        if cleaned:
            logger.info(f"[COLLECTOR] startup cleanup: removed {cleaned} stale response files")

    async def start(self):
        """Start the Collector polling loops with staggered offsets.

        Each loop starts with a different delay to prevent all loops from
        waking up at the same time and causing burst requests to Gateway.
        """
        self._running = True
        logger.info(f"[COLLECTOR] started for project={self.project_name}")

        async def _staggered(coro, offset: float):
            await asyncio.sleep(offset)
            await coro()

        tasks = [
            _staggered(self._forward_pending_loop, 0.0),
            _staggered(self._poll_responses_loop, 0.5),
            _staggered(self._poll_commands_loop, 1.0),
            _staggered(self._forward_chat_snapshots_loop, 1.5),
            _staggered(self._forward_registrations_loop, 2.0),
            _staggered(self._health_check_loop, 2.5),
            _staggered(self._retry_flush_loop, 3.0),
        ]
        if self.event_queue:
            tasks.append(_staggered(self._forward_events_loop, 3.5))
        await asyncio.gather(*tasks)

    async def stop(self):
        """Stop the Collector and close HTTP session."""
        self._running = False
        await self.remote.close()
        logger.info("[COLLECTOR] stopped")

    # ─── Forward local pending → Gateway ───

    async def _forward_pending_loop(self):
        """Scan local pending/ and forward new + updated requests to Gateway.

        Tracks content hashes to detect:
          - New pending files → forward immediately
          - MERGE updates (step_probe updates command text) → re-forward
          - Status changes (auto_resolved, expired) → re-forward

        Each file is processed independently, so one failing file does not
        stop the others from being forwarded or cleaned up.
        """
        while self._running:
            try:
                # Skip cycle if rate-limited
                if self.remote.is_rate_limited:
                    await asyncio.sleep(self._poll_interval)
                    continue

                current_files: set[str] = set()
                for fname in self.local.list_json_files("pending"):
                    rid = fname.replace(".json", "")
                    current_files.add(rid)
                    try:
                        deleted = await self._process_pending_file(rid, fname)
                    except Exception as e:
                        # Tracking was not committed, so the file is retried next cycle
                        logger.error(f"[COLLECTOR] forward_pending error ({rid[:12]}): {e}")
                        continue
                    if deleted:
                        current_files.discard(rid)

                self._cleanup_stale_pending(current_files)
                self._prune_pending_tracking(current_files)
            except Exception as e:
                logger.error(f"[COLLECTOR] forward_pending error: {e}")
            await asyncio.sleep(self._poll_interval)

    async def _process_pending_file(self, rid: str, fname: str) -> bool:
        """Forward one local pending file to the Gateway if it is new or changed.

        Tracking state (mtime, hash, forward timestamps) is committed only
        after the file was fully handled, so an unreadable file or a failed
        forward is retried on the next cycle instead of being skipped forever.

        Returns:
            True if the local file was deleted (auto_resolved/expired after
            forwarding), False otherwise.
        """
        fpath = self.local.bridge_dir / "pending" / fname
        # mtime pre-check: skip read+hash if file hasn't been modified
        try:
            current_mtime = fpath.stat().st_mtime
        except OSError:
            return False
        if self._pending_mtimes.get(rid) == current_mtime:
            return False  # File untouched since last processed — skip read+hash

        data = self.local.read_json("pending", fname)
        if data is None:
            # Unreadable (possibly mid-write): leave mtime uncached so we retry
            return False

        content_hash = self._content_hash(data)
        if self._pending_hashes.get(rid) == content_hash:
            self._pending_mtimes[rid] = current_mtime
            return False  # Touched, but content unchanged

        status = data.get("status", "pending")
        if rid in self._startup_pending and status == "pending":
            # Startup files: only forward status CHANGES (not re-forward as new pending)
            self._pending_mtimes[rid] = current_mtime
            return False

        # Forward to Gateway (new or updated)
        is_new = rid not in self._pending_forwarded_at
        await self.remote.awrite_json("pending", fname, data)

        now = time.time()
        self._forwarded_pending.add(rid)
        self._forwarded_timestamps[rid] = now
        self._pending_forwarded_at[rid] = now
        self._pending_hashes[rid] = content_hash
        self._pending_mtimes[rid] = current_mtime
        if is_new:
            logger.info(f"[COLLECTOR] → Gateway: pending {rid[:12]}")
        else:
            logger.info(f"[COLLECTOR] → Gateway: pending UPDATE {rid[:12]} status={status}")

        # ── Phase 1 FIX: delete local auto_resolved/expired after forwarding ──
        if status in ("auto_resolved", "expired"):
            try:
                fpath.unlink()
            except OSError:
                return False
            self._forget_pending(rid)
            logger.info(f"[COLLECTOR] 🗑 deleted local {status} pending: {rid[:12]}")
            return True
        return False

    def _cleanup_stale_pending(self, current_files: set[str]) -> None:
        """Delete local pending files forwarded more than _STALE_PENDING_TTL seconds ago.

        Startup files count from collector start. Deleted rids are removed
        from *current_files* in place.
        """
        now_ts = time.time()
        for rid in list(current_files):
            forwarded_at = self._pending_forwarded_at.get(rid)
            if forwarded_at is None or now_ts - forwarded_at <= self._STALE_PENDING_TTL:
                continue
            stale_path = self.local.bridge_dir / "pending" / f"{rid}.json"
            try:
                stale_path.unlink(missing_ok=True)
            except OSError as e:
                logger.warning(f"[COLLECTOR] stale cleanup failed for {rid[:12]}: {e}")
                continue
            current_files.discard(rid)
            self._forget_pending(rid)
            logger.info(f"[COLLECTOR] 🗑 stale cleanup (>10min): {rid[:12]}")

    def _prune_pending_tracking(self, current_files: set[str]) -> None:
        """Forget tracking state for pending rids whose file is no longer on disk."""
        tracked = (
            self._forwarded_pending
            | self._startup_pending
            | set(self._forwarded_timestamps)
            | set(self._pending_forwarded_at)
            | set(self._pending_hashes)
            | set(self._pending_mtimes)
        )
        for rid in tracked - current_files:
            self._forget_pending(rid)

    def _forget_pending(self, rid: str) -> None:
        """Drop every tracking entry for *rid* (its pending file is gone from disk)."""
        self._startup_pending.discard(rid)
        self._forwarded_pending.discard(rid)
        self._forwarded_timestamps.pop(rid, None)
        self._pending_forwarded_at.pop(rid, None)
        self._pending_hashes.pop(rid, None)
        self._pending_mtimes.pop(rid, None)

    # ─── Poll Gateway responses → local ───

    async def _poll_responses_loop(self):
        """Poll Gateway for responses and write them locally for Extension.

        Only polls responses for recently-forwarded pending (within
        _RESPONSE_POLL_TTL). Expired entries are removed from tracking to
        prevent request accumulation.
        """
        while self._running:
            try:
                # Skip cycle if rate-limited
                if self.remote.is_rate_limited:
                    await asyncio.sleep(self._poll_interval)
                    continue

                now = time.time()
                # Clean up expired forwarded pending (stop polling responses for old ones)
                expired = [
                    rid for rid, ts in self._forwarded_timestamps.items()
                    if now - ts > self._RESPONSE_POLL_TTL
                ]
                for rid in expired:
                    self._forwarded_pending.discard(rid)
                    self._forwarded_timestamps.pop(rid, None)
                    # NOTE: intentionally keep _pending_hashes[rid] to prevent
                    # re-forward cycle (expired pending would be re-detected as
                    # "new" if hash is cleared). Hash is cleaned up when file
                    # is actually deleted from disk (see _forward_pending_loop).
                if expired:
                    logger.info(f"[COLLECTOR] expired {len(expired)} stale forwarded pending (>{self._RESPONSE_POLL_TTL}s)")

                # Check each active forwarded pending for a response
                active_rids = [
                    rid for rid in self._forwarded_pending
                    if rid not in self._startup_pending
                ]
                for rid in active_rids:
                    # Rate-limit guard: stop polling if we got rate-limited mid-cycle
                    if self.remote.is_rate_limited:
                        break
                    data = await self.remote.aread_json("response", f"{rid}.json")
                    if data is None or data.get("waiting"):
                        await asyncio.sleep(0.3)  # Throttle between individual response polls
                        continue

                    # Write response locally for Extension to pick up
                    self.local.write_json("response", f"{rid}.json", data)
                    # Also delete local pending file (Extension expects this)
                    self.local.delete_file("pending", f"{rid}.json")
                    self._forwarded_pending.discard(rid)
                    self._forwarded_timestamps.pop(rid, None)
                    approved = data.get("approved", "?")
                    logger.info(f"[COLLECTOR] ← Gateway: response {rid[:12]} approved={approved}")
            except Exception as e:
                logger.error(f"[COLLECTOR] poll_responses error: {e}")
            await asyncio.sleep(self._poll_interval)

    # ─── Poll Gateway commands → local ───

    def _discover_local_projects(self) -> set[str]:
        """Discover all project names registered by local Extension instances.

        Reads bridge/register/*.json files, which are written by each AG
        window's Extension with {conversation_id, project_name}. Returns
        unique project names found, always including self.project_name as a
        fallback. Unreadable or malformed register files are ignored.

        Results are cached for _PROJECTS_CACHE_TTL seconds to avoid
        re-reading every register file on each polling cycle.
        """
        now = time.time()
        if self._cached_projects is not None and now - self._projects_cache_ts < self._PROJECTS_CACHE_TTL:
            return self._cached_projects

        projects = {self.project_name}
        register_dir = self.local.bridge_dir / "register"
        if register_dir.exists():
            for f in register_dir.glob("*.json"):
                try:
                    data = json.loads(f.read_text(encoding="utf-8-sig"))
                    p = data.get("project_name", "")
                    if p:
                        projects.add(p)
                except (json.JSONDecodeError, OSError):
                    pass

        self._cached_projects = projects
        self._projects_cache_ts = now
        return projects

    def _get_session_project(self, conversation_id: str) -> str | None:
        """Look up which project a session belongs to from its register/ file.

        Reads bridge/register/{conversation_id}.json directly on every call
        (no caching). Returns its "project_name" ("" if the key is missing),
        or None when the file does not exist or cannot be parsed. Callers
        treat a falsy result as "unknown" and allow forwarding.
        """
        register_dir = self.local.bridge_dir / "register"
        reg_file = register_dir / f"{conversation_id}.json"
        if reg_file.exists():
            try:
                data = json.loads(reg_file.read_text(encoding="utf-8-sig"))
                return data.get("project_name", "")
            except (json.JSONDecodeError, OSError):
                pass
        return None  # Unknown → allow (don't block unregistered sessions)

    async def _poll_commands_loop(self):
        """Poll Gateway for commands with adaptive per-project intervals.

        When a project returns empty commands repeatedly, its poll interval
        increases (3s → 10s → 30s → 60s). On receiving a command, the interval
        resets to base. This prevents idle projects from wasting requests.
        The outer loop sleeps self._poll_interval between passes, so a project
        is never polled more often than that.
        """
        # Per-project adaptive state
        project_intervals: dict[str, float] = {}  # project → current interval
        project_last_poll: dict[str, float] = {}  # project → last poll timestamp
        _BASE_INTERVAL = 3.0
        _IDLE_STEPS = [10.0, 30.0, 60.0]  # progressive idle intervals
        project_empty_streak: dict[str, int] = {}  # project → consecutive empty polls

        while self._running:
            try:
                # Skip cycle if rate-limited
                if not self.remote.is_rate_limited:
                    projects = self._discover_local_projects()
                    now = time.time()
                    for project in projects:
                        if self.remote.is_rate_limited:
                            break
                        # Check if this project's interval has elapsed
                        interval = project_intervals.get(project, _BASE_INTERVAL)
                        last = project_last_poll.get(project, 0)
                        if now - last < interval:
                            continue  # Not time yet for this project

                        project_last_poll[project] = now
                        commands = await self.remote.apoll_commands(project)
                        if commands:
                            # Got commands → reset to base interval
                            project_intervals[project] = _BASE_INTERVAL
                            project_empty_streak[project] = 0
                            for cmd in commands:
                                # `or` also covers an explicit null/empty id, which
                                # would otherwise make every such command "None.json"
                                cmd_id = cmd.get("id") or f"{int(time.time() * 1000)}_{uuid.uuid4().hex[:8]}"
                                fname = f"{cmd_id}.json"
                                self.local.write_json("commands", fname, cmd)
                                logger.info(f"[COLLECTOR] ← Gateway: command [{project}] {cmd.get('text', '?')[:30]}")
                        else:
                            # Empty → increase interval progressively
                            streak = project_empty_streak.get(project, 0) + 1
                            project_empty_streak[project] = streak
                            if streak <= len(_IDLE_STEPS):
                                project_intervals[project] = _IDLE_STEPS[streak - 1]
                            # else stays at max (60s)
                        await asyncio.sleep(0.3)  # Throttle between projects
            except Exception as e:
                logger.error(f"[COLLECTOR] poll_commands error: {e}")
            await asyncio.sleep(self._poll_interval)

    # ─── Forward chat snapshots → Gateway ───

    async def _forward_chat_snapshots_loop(self):
        """Forward chat_snapshots/ from Extension to Gateway.

        Each snapshot file is deleted after it has been sent (or if it has
        neither content nor attached files).
        """
        while self._running:
            try:
                snap_dir = self.local.bridge_dir / "chat_snapshots"
                if snap_dir.exists():
                    for f in snap_dir.glob("*.json"):
                        try:
                            data = json.loads(f.read_text(encoding="utf-8-sig"))
                            project = data.get("project_name", self.project_name)
                            content = data.get("content", "")
                            attached_files = data.get("attached_files", [])
                            if content or attached_files:
                                await self.remote.asend_chat(project, content, attached_files=attached_files)
                                af_info = f" +{len(attached_files)} files" if attached_files else ""
                                logger.info(f"[COLLECTOR] → Gateway: chat snapshot len={len(content)}{af_info}")
                            f.unlink()  # Cleanup after forwarding
                        except (json.JSONDecodeError, OSError) as e:
                            logger.warning(f"[COLLECTOR] bad chat snapshot {f.name}: {e}")
            except Exception as e:
                logger.error(f"[COLLECTOR] forward_chat_snapshots error: {e}")
            await asyncio.sleep(10)  # Chat snapshots: less urgent, 10s interval

    # ─── Forward session registrations → Gateway ───

    async def _forward_registrations_loop(self):
        """Forward register/ files from Extension to Gateway (once per file name)."""
        forwarded_regs: set[str] = set()
        while self._running:
            try:
                register_dir = self.local.bridge_dir / "register"
                if register_dir.exists():
                    for f in register_dir.glob("*.json"):
                        if f.name in forwarded_regs:
                            continue
                        try:
                            data = json.loads(f.read_text(encoding="utf-8-sig"))
                            conv_id = data.get("conversation_id", "")
                            project = data.get("project_name", "")
                            if conv_id and project:
                                await self.remote.aregister_session(conv_id, project)
                                forwarded_regs.add(f.name)
                                logger.info(f"[COLLECTOR] → Gateway: register {conv_id[:8]} → {project}")
                                await asyncio.sleep(0.3)  # Spread startup burst
                        except (json.JSONDecodeError, OSError) as e:
                            logger.warning(f"[COLLECTOR] bad register {f.name}: {e}")
            except Exception as e:
                logger.error(f"[COLLECTOR] forward_registrations error: {e}")
            await asyncio.sleep(30)  # Registration changes rarely — 30s interval

    # ─── Forward brain events → Gateway ───

    async def _forward_events_loop(self):
        """Read BrainEvents from Watcher queue and POST to Gateway.

        Phase 2 FIX: Only forward events for sessions belonging to this
        project. Uses register/ files to determine session→project mapping;
        sessions with no known project are forwarded.
        """
        while self._running:
            try:
                # Timeout lets the loop notice _running turning False
                event: BrainEvent = await asyncio.wait_for(
                    self.event_queue.get(), timeout=5.0
                )
                # ── Project filter: only forward events for MY project ──
                conv_id = event.conversation_id
                session_project = self._get_session_project(conv_id)
                if session_project and session_project != self.project_name:
                    # Skip: this session belongs to another project
                    continue

                # Serialize event to JSON
                event_data = {
                    "event_type": event.event_type.value,
                    "conversation_id": event.conversation_id,
                    "file_name": event.file_name,
                    "file_path": str(event.file_path) if event.file_path else "",
                    "content": event.content,
                    "timestamp": event.timestamp,
                }
                await self.remote.asend_event(event_data)
                logger.info(f"[COLLECTOR] → Gateway: event {event.event_type.value} {event.file_name}")
            except asyncio.TimeoutError:
                continue
            except Exception as e:
                logger.error(f"[COLLECTOR] forward_event error: {e}")

    # ─── Health check ───

    async def _health_check_loop(self):
        """Periodically check Gateway connectivity (every 30s)."""
        while self._running:
            try:
                ok = await self.remote.health_check()
                if not ok and self.remote.connected:
                    logger.warning("[COLLECTOR] ❌ Gateway health check failed")
            except Exception as e:
                # Best-effort probe: keep the loop alive, but don't hide the error
                logger.debug(f"[COLLECTOR] health check error: {e}")
            await asyncio.sleep(30)

    # ─── Retry flush ───

    async def _retry_flush_loop(self):
        """Periodically flush failed request retry queue."""
        while self._running:
            try:
                await self.remote.flush_retry_queue()
            except Exception as e:
                logger.error(f"[COLLECTOR] retry flush error: {e}")
            await asyncio.sleep(30)  # Retry flush: 30s interval (was 10s)