fix: cross-project event flooding + pending accumulation + diff_review brain exclusion
Phase 1: Collector auto-cleanup of auto_resolved/expired pending files after Gateway forwarding Phase 2: Watcher project filter (only MY sessions emit events) + Collector event forward filter Phase 3: Extension diff_review excludes brain/ artifact files (task.md, implementation_plan.md)
This commit is contained in:
62
collector.py
62
collector.py
@@ -172,9 +172,10 @@ class CollectorBridge:
|
||||
continue # No change
|
||||
|
||||
is_new = rid not in self._forwarded_pending
|
||||
status = data.get("status", "pending")
|
||||
|
||||
if rid in self._startup_pending:
|
||||
# Startup files: only forward status CHANGES (not re-forward as new pending)
|
||||
status = data.get("status", "pending")
|
||||
if status == "pending":
|
||||
continue # Still pending from before startup — skip
|
||||
# Status changed (auto_resolved/expired) — forward the update
|
||||
@@ -188,9 +189,36 @@ class CollectorBridge:
|
||||
if is_new:
|
||||
logger.info(f"[COLLECTOR] → Gateway: pending {rid[:12]}")
|
||||
else:
|
||||
status = data.get("status", "?")
|
||||
logger.info(f"[COLLECTOR] → Gateway: pending UPDATE {rid[:12]} status={status}")
|
||||
|
||||
# ── Phase 1 FIX: delete local auto_resolved/expired after forwarding ──
|
||||
if status in ("auto_resolved", "expired"):
|
||||
try:
|
||||
fpath.unlink()
|
||||
current_files.discard(rid)
|
||||
self._forwarded_pending.discard(rid)
|
||||
self._pending_hashes.pop(rid, None)
|
||||
self._pending_mtimes.pop(rid, None)
|
||||
logger.info(f"[COLLECTOR] 🗑 deleted local {status} pending: {rid[:12]}")
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
# ── Periodic stale cleanup: delete pending files > 10 min old ──
|
||||
now_ts = time.time()
|
||||
for rid in list(current_files):
|
||||
if rid in self._forwarded_pending:
|
||||
forwarded_at = self._forwarded_timestamps.get(rid, 0)
|
||||
if now_ts - forwarded_at > 600: # 10 min since forwarding
|
||||
try:
|
||||
stale_path = self.local.bridge_dir / "pending" / f"{rid}.json"
|
||||
stale_path.unlink()
|
||||
self._forwarded_pending.discard(rid)
|
||||
self._pending_hashes.pop(rid, None)
|
||||
self._pending_mtimes.pop(rid, None)
|
||||
logger.info(f"[COLLECTOR] 🗑 stale cleanup (>10min): {rid[:12]}")
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
# Clean up tracking for deleted files
|
||||
for rid in list(self._forwarded_pending):
|
||||
if rid not in current_files and rid not in self._startup_pending:
|
||||
@@ -301,6 +329,22 @@ class CollectorBridge:
|
||||
self._projects_cache_ts = now
|
||||
return projects
|
||||
|
||||
def _get_session_project(self, conversation_id: str) -> str | None:
|
||||
"""Look up which project a session belongs to from register/ files.
|
||||
|
||||
Returns project_name if found, None if unknown (allow forwarding).
|
||||
Uses _discover_local_projects cache timing to avoid redundant I/O.
|
||||
"""
|
||||
register_dir = self.local.bridge_dir / "register"
|
||||
reg_file = register_dir / f"{conversation_id}.json"
|
||||
if reg_file.exists():
|
||||
try:
|
||||
data = json.loads(reg_file.read_text(encoding="utf-8-sig"))
|
||||
return data.get("project_name", "")
|
||||
except (json.JSONDecodeError, OSError):
|
||||
pass
|
||||
return None # Unknown → allow (don't block unregistered sessions)
|
||||
|
||||
async def _poll_commands_loop(self):
|
||||
"""Poll Gateway for commands with adaptive per-project intervals.
|
||||
|
||||
@@ -414,12 +458,24 @@ class CollectorBridge:
|
||||
# ─── Forward brain events → Gateway ───
|
||||
|
||||
async def _forward_events_loop(self):
|
||||
"""Read BrainEvents from Watcher queue and POST to Gateway."""
|
||||
"""Read BrainEvents from Watcher queue and POST to Gateway.
|
||||
|
||||
Phase 2 FIX: Only forward events for sessions belonging to this project.
|
||||
Uses register/ files to determine session→project mapping.
|
||||
"""
|
||||
while self._running:
|
||||
try:
|
||||
event: BrainEvent = await asyncio.wait_for(
|
||||
self.event_queue.get(), timeout=5.0
|
||||
)
|
||||
|
||||
# ── Project filter: only forward events for MY project ──
|
||||
conv_id = event.conversation_id
|
||||
session_project = self._get_session_project(conv_id)
|
||||
if session_project and session_project != self.project_name:
|
||||
# Skip: this session belongs to another project
|
||||
continue
|
||||
|
||||
# Serialize event to JSON
|
||||
event_data = {
|
||||
"event_type": event.event_type.value,
|
||||
|
||||
Reference in New Issue
Block a user