diff --git a/collector.py b/collector.py index 091d16c..a3bdb77 100644 --- a/collector.py +++ b/collector.py @@ -13,6 +13,7 @@ Flow: import asyncio import hashlib import json +import os import time import logging from pathlib import Path @@ -39,7 +40,7 @@ class CollectorBridge: self.remote = remote self.project_name = project_name self.event_queue = event_queue - self._poll_interval = 3 # seconds + self._poll_interval = 5 # seconds (was 3 — reduced I/O frequency) self._running = False # Pre-populate with existing pending files → skip on startup (prevents 만료됨 spam) @@ -47,7 +48,13 @@ class CollectorBridge: self._forwarded_pending: set[str] = set() self._forwarded_timestamps: dict[str, float] = {} # rid → when forwarded self._pending_hashes: dict[str, str] = {} # rid → content hash (for MERGE/status detection) + self._pending_mtimes: dict[str, float] = {} # rid → last known file mtime self._RESPONSE_POLL_TTL = 300 # 5 min — stop polling responses for old pending + + # Project discovery cache (avoid re-reading register/ every cycle) + self._cached_projects: set[str] | None = None + self._projects_cache_ts: float = 0 + self._PROJECTS_CACHE_TTL = 60.0 # seconds for fname in self.local.list_json_files("pending"): rid = fname.replace(".json", "") self._startup_pending.add(rid) @@ -58,9 +65,33 @@ class CollectorBridge: self._pending_hashes[rid] = hashlib.md5( json.dumps(data, sort_keys=True).encode() ).hexdigest() + # Pre-cache mtime + try: + fpath = self.local.bridge_dir / "pending" / fname + self._pending_mtimes[rid] = fpath.stat().st_mtime + except OSError: + pass if self._startup_pending: logger.info(f"[COLLECTOR] skipping {len(self._startup_pending)} existing pending files") + # Startup cleanup: remove stale response files (> 5 min) + self._cleanup_stale_responses() + + def _cleanup_stale_responses(self, max_age: int = 300): + """Remove stale response files (> max_age seconds) on startup.""" + now = time.time() + cleaned = 0 + for fname in self.local.list_json_files("response"): + try: + fpath = self.local.bridge_dir / "response" / fname + if now - fpath.stat().st_mtime > max_age: + self.local.delete_file("response", fname) + cleaned += 1 + except OSError: + pass + if cleaned: + logger.info(f"[COLLECTOR] startup cleanup: removed {cleaned} stale response files") + async def start(self): """Start the Collector polling loops with staggered offsets. @@ -115,6 +146,17 @@ class CollectorBridge: rid = fname.replace(".json", "") current_files.add(rid) + # mtime pre-check: skip read+hash if file hasn't been modified + try: + fpath = self.local.bridge_dir / "pending" / fname + current_mtime = fpath.stat().st_mtime + except OSError: + continue + prev_mtime = self._pending_mtimes.get(rid) + if prev_mtime is not None and current_mtime == prev_mtime: + continue # File untouched since last check — skip read+hash + self._pending_mtimes[rid] = current_mtime + data = self.local.read_json("pending", fname) if data is None: continue @@ -153,6 +195,12 @@ class CollectorBridge: if rid not in current_files and rid not in self._startup_pending: self._forwarded_pending.discard(rid) self._pending_hashes.pop(rid, None) + self._pending_mtimes.pop(rid, None) + # Also clean up orphaned hashes/mtimes for files no longer on disk + for rid in list(self._pending_hashes): + if rid not in current_files: + self._pending_hashes.pop(rid, None) + self._pending_mtimes.pop(rid, None) except Exception as e: logger.error(f"[COLLECTOR] forward_pending error: {e}") @@ -183,7 +231,10 @@ class CollectorBridge: for rid in expired: self._forwarded_pending.discard(rid) self._forwarded_timestamps.pop(rid, None) - self._pending_hashes.pop(rid, None) + # NOTE: intentionally keep _pending_hashes[rid] to prevent + # re-forward cycle (expired pending would be re-detected as + # "new" if hash is cleared). Hash is cleaned up when file + # is actually deleted from disk (see _forward_pending_loop). if expired: logger.info(f"[COLLECTOR] expired {len(expired)} stale forwarded pending (>{self._RESPONSE_POLL_TTL}s)") @@ -223,10 +274,19 @@ class CollectorBridge: Reads bridge/register/*.json files, which are written by each AG window's Extension with {conversation_id, project_name}. Returns unique project names found, always including self.project_name as a fallback. + + Results are cached for _PROJECTS_CACHE_TTL seconds to avoid re-reading + 22+ register files every polling cycle. """ + now = time.time() + if self._cached_projects is not None and now - self._projects_cache_ts < self._PROJECTS_CACHE_TTL: + return self._cached_projects + projects = {self.project_name} register_dir = self.local.bridge_dir / "register" if not register_dir.exists(): + self._cached_projects = projects + self._projects_cache_ts = now return projects for f in register_dir.glob("*.json"): try: @@ -236,6 +296,8 @@ class CollectorBridge: projects.add(p) except (json.JSONDecodeError, OSError): pass + self._cached_projects = projects + self._projects_cache_ts = now return projects async def _poll_commands_loop(self): @@ -319,7 +381,7 @@ class CollectorBridge: except Exception as e: logger.error(f"[COLLECTOR] forward_chat_snapshots error: {e}") - await asyncio.sleep(self._poll_interval) + await asyncio.sleep(10) # Chat snapshots: less urgent, 10s interval # ─── Forward session registrations → Gateway ─── @@ -341,12 +403,13 @@ class CollectorBridge: await self.remote.aregister_session(conv_id, project) forwarded_regs.add(f.name) logger.info(f"[COLLECTOR] → Gateway: register {conv_id[:8]} → {project}") + await asyncio.sleep(0.3) # Spread startup burst except (json.JSONDecodeError, OSError) as e: logger.warning(f"[COLLECTOR] bad register {f.name}: {e}") except Exception as e: logger.error(f"[COLLECTOR] forward_registrations error: {e}") - await asyncio.sleep(self._poll_interval * 3) # Less frequent + await asyncio.sleep(30) # Registration changes rarely — 30s interval # ─── Forward brain events → Gateway ─── async def _forward_events_loop(self): @@ -394,4 +457,4 @@ class CollectorBridge: await self.remote.flush_retry_queue() except Exception as e: logger.error(f"[COLLECTOR] retry flush error: {e}") - await asyncio.sleep(10) + await asyncio.sleep(30) # Retry flush: 30s interval (was 10s) diff --git a/watcher.py b/watcher.py index 71d408e..88117a1 100644 --- a/watcher.py +++ b/watcher.py @@ -77,6 +77,27 @@ class BrainEventHandler(FileSystemEventHandler): f"pre-loaded {hash_count} content hashes" ) + def dispatch(self, event: FileSystemEvent): + """Early filter: skip events for files/dirs we don't care about. + + This runs BEFORE on_created/on_modified, avoiding unnecessary + method dispatch overhead for the majority of file events. + """ + path = Path(event.src_path) + + # Skip .system_generated and logs subdirectories immediately + path_parts = path.parts + if '.system_generated' in path_parts or 'logs' in path_parts: + return + + # For file events, skip non-watched files immediately + if not event.is_directory: + file_name = path.name + if not self._is_watched_file(file_name): + return + + super().dispatch(event) + def _is_conversation_id(self, name: str) -> bool: parts = name.split("-") return len(parts) == 5 and all(len(p) >= 4 for p in parts)