From e3f8fb93f76f8a7c9ed9dcd01f97828f6f49178b Mon Sep 17 00:00:00 2001 From: Variet Worker Date: Mon, 16 Mar 2026 23:05:27 +0900 Subject: [PATCH] fix: cross-project event flooding + pending accumulation + diff_review brain exclusion Phase 1: Collector auto-cleanup of auto_resolved/expired pending files after Gateway forwarding Phase 2: Watcher project filter (only MY sessions emit events) + Collector event forward filter Phase 3: Extension diff_review excludes brain/ artifact files (task.md, implementation_plan.md) --- collector.py | 62 +++++++++++++++++++++++-- extension/src/extension.ts | 92 ++++++++++++++++++++++++-------------- watcher.py | 55 ++++++++++++++++++++++- 3 files changed, 172 insertions(+), 37 deletions(-) diff --git a/collector.py b/collector.py index f56b5b8..2bc67ff 100644 --- a/collector.py +++ b/collector.py @@ -172,9 +172,10 @@ class CollectorBridge: continue # No change is_new = rid not in self._forwarded_pending + status = data.get("status", "pending") + if rid in self._startup_pending: # Startup files: only forward status CHANGES (not re-forward as new pending) - status = data.get("status", "pending") if status == "pending": continue # Still pending from before startup — skip # Status changed (auto_resolved/expired) — forward the update @@ -188,9 +189,36 @@ class CollectorBridge: if is_new: logger.info(f"[COLLECTOR] → Gateway: pending {rid[:12]}") else: - status = data.get("status", "?") logger.info(f"[COLLECTOR] → Gateway: pending UPDATE {rid[:12]} status={status}") + # ── Phase 1 FIX: delete local auto_resolved/expired after forwarding ── + if status in ("auto_resolved", "expired"): + try: + fpath.unlink() + current_files.discard(rid) + self._forwarded_pending.discard(rid) + self._pending_hashes.pop(rid, None) + self._pending_mtimes.pop(rid, None) + logger.info(f"[COLLECTOR] 🗑 deleted local {status} pending: {rid[:12]}") + except OSError: + pass + + # ── Periodic stale cleanup: delete pending files > 10 min old ── + now_ts = time.time() + for rid in list(current_files): + if rid in self._forwarded_pending: + forwarded_at = self._forwarded_timestamps.get(rid, 0) + if now_ts - forwarded_at > 600: # 10 min since forwarding + try: + stale_path = self.local.bridge_dir / "pending" / f"{rid}.json" + stale_path.unlink() + self._forwarded_pending.discard(rid) + self._pending_hashes.pop(rid, None) + self._pending_mtimes.pop(rid, None) + logger.info(f"[COLLECTOR] 🗑 stale cleanup (>10min): {rid[:12]}") + except OSError: + pass + # Clean up tracking for deleted files for rid in list(self._forwarded_pending): if rid not in current_files and rid not in self._startup_pending: @@ -301,6 +329,22 @@ class CollectorBridge: self._projects_cache_ts = now return projects + def _get_session_project(self, conversation_id: str) -> str | None: + """Look up which project a session belongs to from register/ files. + + Returns project_name if found, None if unknown (allow forwarding). + Uses _discover_local_projects cache timing to avoid redundant I/O. + """ + register_dir = self.local.bridge_dir / "register" + reg_file = register_dir / f"{conversation_id}.json" + if reg_file.exists(): + try: + data = json.loads(reg_file.read_text(encoding="utf-8-sig")) + return data.get("project_name", "") + except (json.JSONDecodeError, OSError): + pass + return None # Unknown → allow (don't block unregistered sessions) + async def _poll_commands_loop(self): """Poll Gateway for commands with adaptive per-project intervals. @@ -414,12 +458,24 @@ class CollectorBridge: # ─── Forward brain events → Gateway ─── async def _forward_events_loop(self): - """Read BrainEvents from Watcher queue and POST to Gateway.""" + """Read BrainEvents from Watcher queue and POST to Gateway. + + Phase 2 FIX: Only forward events for sessions belonging to this project. + Uses register/ files to determine session→project mapping. + """ while self._running: try: event: BrainEvent = await asyncio.wait_for( self.event_queue.get(), timeout=5.0 ) + + # ── Project filter: only forward events for MY project ── + conv_id = event.conversation_id + session_project = self._get_session_project(conv_id) + if session_project and session_project != self.project_name: + # Skip: this session belongs to another project + continue + # Serialize event to JSON event_data = { "event_type": event.event_type.value, diff --git a/extension/src/extension.ts b/extension/src/extension.ts index 030aaab..10084fd 100644 --- a/extension/src/extension.ts +++ b/extension/src/extension.ts @@ -2407,39 +2407,65 @@ function setupMonitor() { // ── Diff review detection: if session just went IDLE and files were modified ── if (wasRunning && !isRunning && pendingModifiedFiles.length > 0) { - const fileList = pendingModifiedFiles.slice(0, 5).join(', '); - const fileCount = pendingModifiedFiles.length; - // Capture variables for delayed closure (poll loop may change them) - const capturedSessionId = activeSessionId; - const capturedStepCount = currentCount; - const capturedModFiles = pendingModifiedFilePaths.slice(0, 20); - const capturedEditSteps = pendingEditStepIndices.slice(0, 20); - logToFile(`[DIFF-REVIEW] IDLE with ${fileCount} modified files: ${fileList}`); - // Reset tracking arrays immediately (so next session starts fresh) - pendingModifiedFiles = []; - pendingModifiedFilePaths = []; - pendingEditStepIndices = []; - // Delay diff_review pending by 8s so AI response snapshot arrives - // on Discord before the approval buttons (snapshot scanner needs time - // to relay the response text to Discord ahead of the approval embed) - setTimeout(() => { - logToFile(`[DIFF-REVIEW] deferred pending creation (8s) for: ${fileList}`); - writeChatSnapshot(`📝 **코드 리뷰 대기**\n\n수정된 파일: ${fileList}\n\nAG에서 Accept all / Reject all로 확인해주세요.`); - writePendingApproval({ - conversation_id: capturedSessionId, - command: `코드 리뷰: ${fileList}`, - description: `${fileCount}개 파일이 수정되었습니다`, - step_type: 'diff_review', - step_index: capturedStepCount, - source: 'diff_review_detect', - buttons: [ - { text: 'Accept all', index: 0 }, - { text: 'Reject all', index: 1 }, - ], - modified_files: capturedModFiles, - edit_step_indices: capturedEditSteps, - }); - }, 8000); + // Phase 3 FIX: Filter out brain/ artifact files (task.md, implementation_plan.md etc.) + // These are AG internal artifacts, NOT code changes needing user review. + const brainPathSegment = '.gemini/antigravity/brain/'; + const codeOnlyFiles: string[] = []; + const codeOnlyPaths: string[] = []; + const codeOnlySteps: number[] = []; + for (let fi = 0; fi < pendingModifiedFilePaths.length; fi++) { + const normalized = pendingModifiedFilePaths[fi].replace(/\\/g, '/').toLowerCase(); + if (!normalized.includes(brainPathSegment)) { + codeOnlyFiles.push(pendingModifiedFiles[fi]); + codeOnlyPaths.push(pendingModifiedFilePaths[fi]); + if (fi < pendingEditStepIndices.length) { + codeOnlySteps.push(pendingEditStepIndices[fi]); + } + } else { + logToFile(`[DIFF-REVIEW] skip brain artifact: ${pendingModifiedFiles[fi]}`); + } + } + + if (codeOnlyFiles.length > 0) { + const fileList = codeOnlyFiles.slice(0, 5).join(', '); + const fileCount = codeOnlyFiles.length; + // Capture variables for delayed closure (poll loop may change them) + const capturedSessionId = activeSessionId; + const capturedStepCount = currentCount; + const capturedModFiles = codeOnlyPaths.slice(0, 20); + const capturedEditSteps = codeOnlySteps.slice(0, 20); + logToFile(`[DIFF-REVIEW] IDLE with ${fileCount} code files: ${fileList}`); + // Reset tracking arrays immediately (so next session starts fresh) + pendingModifiedFiles = []; + pendingModifiedFilePaths = []; + pendingEditStepIndices = []; + // Delay diff_review pending by 8s so AI response snapshot arrives + // on Discord before the approval buttons (snapshot scanner needs time + // to relay the response text to Discord ahead of the approval embed) + setTimeout(() => { + logToFile(`[DIFF-REVIEW] deferred pending creation (8s) for: ${fileList}`); + writeChatSnapshot(`📝 **코드 리뷰 대기**\n\n수정된 파일: ${fileList}\n\nAG에서 Accept all / Reject all로 확인해주세요.`); + writePendingApproval({ + conversation_id: capturedSessionId, + command: `코드 리뷰: ${fileList}`, + description: `${fileCount}개 파일이 수정되었습니다`, + step_type: 'diff_review', + step_index: capturedStepCount, + source: 'diff_review_detect', + buttons: [ + { text: 'Accept all', index: 0 }, + { text: 'Reject all', index: 1 }, + ], + modified_files: capturedModFiles, + edit_step_indices: capturedEditSteps, + }); + }, 8000); + } else { + logToFile(`[DIFF-REVIEW] all ${pendingModifiedFiles.length} modified files are brain artifacts — skip diff_review`); + pendingModifiedFiles = []; + pendingModifiedFilePaths = []; + pendingEditStepIndices = []; + } } wasRunning = isRunning; } catch (e: any) { diff --git a/watcher.py b/watcher.py index 88117a1..6e2bce3 100644 --- a/watcher.py +++ b/watcher.py @@ -40,7 +40,11 @@ class BrainEvent: class BrainEventHandler(FileSystemEventHandler): - """Watchdog handler that filters, debounces, and deduplicates brain events.""" + """Watchdog handler that filters, debounces, and deduplicates brain events. + + Phase 2 FIX: Only emits events for sessions belonging to the current project + (Config.PROJECT_NAME), using bridge/register/ files for session→project mapping. + """ def __init__(self, event_queue: asyncio.Queue, loop: asyncio.AbstractEventLoop): super().__init__() @@ -49,6 +53,10 @@ class BrainEventHandler(FileSystemEventHandler): self._last_events: dict[str, float] = {} # path -> timestamp (debounce) self._content_hashes: dict[str, str] = {} # path -> md5 hash (dedup) self._known_sessions: set[str] = set() + # Phase 2: project filter + self._session_project_map: dict[str, str] = {} # conv_id → project_name + self._project_map_ts: float = 0 # last load timestamp + self._PROJECT_MAP_TTL: float = 60.0 # reload every 60s self._initialize_known_sessions() def _initialize_known_sessions(self): @@ -77,6 +85,47 @@ class BrainEventHandler(FileSystemEventHandler): f"pre-loaded {hash_count} content hashes" ) + def _load_session_project_map(self) -> dict[str, str]: + """Load session→project mapping from bridge/register/ files (cached).""" + now = time.time() + if now - self._project_map_ts < self._PROJECT_MAP_TTL: + return self._session_project_map + + import json + register_dir = Config.BRAIN_PATH.parent / "bridge" / "register" + if not register_dir.exists(): + self._project_map_ts = now + return self._session_project_map + + new_map: dict[str, str] = {} + for f in register_dir.glob("*.json"): + try: + data = json.loads(f.read_text(encoding="utf-8-sig")) + conv_id = data.get("conversation_id", "") + project = data.get("project_name", "") + if conv_id and project: + new_map[conv_id] = project + except (json.JSONDecodeError, OSError): + pass + + self._session_project_map = new_map + self._project_map_ts = now + return self._session_project_map + + def _is_my_session(self, conv_id: str) -> bool: + """Check if a session belongs to the current project. + + Returns True for: + - Sessions registered to Config.PROJECT_NAME + - Unknown sessions (not in any register file — allow to avoid blocking) + Returns False for sessions registered to OTHER projects. + """ + session_map = self._load_session_project_map() + project = session_map.get(conv_id) + if project is None: + return True # Unknown → allow (newly started, not yet registered) + return project == Config.PROJECT_NAME + def dispatch(self, event: FileSystemEvent): """Early filter: skip events for files/dirs we don't care about. @@ -169,6 +218,10 @@ class BrainEventHandler(FileSystemEventHandler): if not conv_id: return + # Phase 2 FIX: only emit events for MY project's sessions + if not self._is_my_session(conv_id): + return + # Exclude files in .system_generated subdirectory (AG internal logs) try: relative = path.relative_to(Config.BRAIN_PATH / conv_id)