From e32be6b2f372e7b93e6af8c803abe7bb8f61bfbc Mon Sep 17 00:00:00 2001 From: CD Date: Sat, 7 Mar 2026 11:42:11 +0900 Subject: [PATCH] =?UTF-8?q?refactor:=20=EC=A0=84=EB=A9=B4=20=EC=9E=AC?= =?UTF-8?q?=EC=84=A4=EA=B3=84=20-=20=EC=8B=9C=EC=9E=91=20=EC=8B=9C=20?= =?UTF-8?q?=EC=B1=84=EB=84=90=20=EC=8A=A4=ED=8C=B8=20=EC=A0=9C=EA=B1=B0,?= =?UTF-8?q?=20content=20hash=20=EC=A4=91=EB=B3=B5=20=EB=B0=A9=EC=A7=80,=20?= =?UTF-8?q?=EB=8B=A8=EC=9D=BC=20=EC=9D=B4=EB=B2=A4=ED=8A=B8=20=EA=B2=BD?= =?UTF-8?q?=EB=A1=9C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- bot.py | 405 ++++++++++++++++------------------------------------- config.py | 12 +- watcher.py | 50 ++++--- 3 files changed, 155 insertions(+), 312 deletions(-) diff --git a/bot.py b/bot.py index 43dbd31..df7c9a2 100644 --- a/bot.py +++ b/bot.py @@ -1,10 +1,9 @@ """Discord bot — relays Antigravity brain events to Discord channels. Dynamic channel management: -- Scans brain/ for active sessions on startup -- Creates `gravity-{project_name}` channels per active session -- Archives channels when sessions become inactive -- Project name extracted from artifact content or short conversation ID +- Creates `AG-{project_name}` channels only when file events arrive +- NO startup channel creation — only reconnects to existing Discord channels +- Archives channels after 10 minutes of inactivity """ import asyncio @@ -30,11 +29,13 @@ from bridge import BridgeProtocol, ApprovalRequest, UserResponse logger = logging.getLogger(__name__) +# ─── Discord UI Components ────────────────────────────────────────── + class ApprovalView(discord.ui.View): """Discord buttons for approving/rejecting Antigravity actions.""" def __init__(self, bridge: BridgeProtocol, request: ApprovalRequest): - super().__init__(timeout=300) # 5 min timeout + super().__init__(timeout=300) self.bridge = bridge self.request = request self.responded = False @@ -46,8 +47,7 @@ class ApprovalView(discord.ui.View): return self.responded = True self.bridge.write_response(UserResponse( - request_id=self.request.request_id, - approved=True, + request_id=self.request.request_id, approved=True, )) embed = interaction.message.embeds[0] if interaction.message.embeds else None if embed: @@ -62,8 +62,7 @@ class ApprovalView(discord.ui.View): return self.responded = True self.bridge.write_response(UserResponse( - request_id=self.request.request_id, - approved=False, + request_id=self.request.request_id, approved=False, )) embed = interaction.message.embeds[0] if interaction.message.embeds else None if embed: @@ -72,99 +71,46 @@ class ApprovalView(discord.ui.View): await interaction.response.edit_message(embed=embed, view=None) async def on_timeout(self): - """Auto-timeout after 5 minutes.""" if not self.responded: self.bridge.write_response(UserResponse( - request_id=self.request.request_id, - approved=False, + request_id=self.request.request_id, approved=False, )) +# ─── Project Name Detection ───────────────────────────────────────── + def detect_project_name(conv_dir: Path) -> str: - """Extract a human-readable project name from conversation artifacts. - - Strategy: - 1. Check task.md first heading for a title - 2. Check implementation_plan.md first heading - 3. Check any .metadata.json for summary keywords - 4. Fallback to short conversation ID - - Output format: lowercase_with_underscores (e.g. 'gravity_control') + """Extract project name from conversation artifacts. + Output: lowercase_with_underscores (e.g. 'gravity_control') """ short_id = conv_dir.name[:8] def _sanitize(raw: str) -> str: - """Convert raw title to Discord-friendly channel name part.""" - # Remove common suffixes for suffix in ["Task Tracker", "— Task Tracker", "태스크", "구현 계획"]: raw = raw.replace(suffix, "") raw = raw.strip(" —-–") - # Keep only alphanumeric, Korean, spaces, hyphens raw = re.sub(r'[^a-zA-Z0-9가-힣\s\-]', '', raw) - # Convert to underscore format raw = re.sub(r'[\s\-]+', '_', raw).strip('_').lower() return raw[:30] if raw else "" - # Try task.md title - task_file = conv_dir / "task.md" - if task_file.exists(): - try: - first_lines = task_file.read_text(encoding="utf-8").splitlines()[:5] - for line in first_lines: - match = re.match(r'^#\s+(.+)', line) - if match: - name = _sanitize(match.group(1)) - if name and name != "task": - return name - except (OSError, UnicodeDecodeError): - pass - - # Try implementation_plan.md title - plan_file = conv_dir / "implementation_plan.md" - if plan_file.exists(): - try: - first_lines = plan_file.read_text(encoding="utf-8").splitlines()[:5] - for line in first_lines: - match = re.match(r'^#\s+(.+)', line) - if match: - name = _sanitize(match.group(1)) - if name: - return name - except (OSError, UnicodeDecodeError): - pass - - # Try metadata summary (first 2 words) - for meta_file in conv_dir.glob("*.metadata.json"): - try: - meta = json.loads(meta_file.read_text(encoding="utf-8")) - summary = meta.get("summary", "") - words = summary.split()[:2] - if words: - name = "_".join(words).lower() - name = re.sub(r'[^a-z0-9가-힣_]', '', name)[:30] - if name: - return name - except (OSError, json.JSONDecodeError): - pass + for fname in ["task.md", "implementation_plan.md"]: + fpath = conv_dir / fname + if fpath.exists(): + try: + first_lines = fpath.read_text(encoding="utf-8").splitlines()[:5] + for line in first_lines: + match = re.match(r'^#\s+(.+)', line) + if match: + name = _sanitize(match.group(1)) + if name and name != "task": + return name + except (OSError, UnicodeDecodeError): + pass return short_id -def is_session_active(conv_dir: Path) -> bool: - """Check if a session is active based on file modification time.""" - now = time.time() - threshold = Config.ACTIVE_TIMEOUT_SECONDS - - for f in conv_dir.iterdir(): - if f.is_file(): - try: - mtime = f.stat().st_mtime - if now - mtime < threshold: - return True - except OSError: - continue - return False - +# ─── Bot ───────────────────────────────────────────────────────────── class GravityBot(commands.Bot): """Discord bot for Antigravity session monitoring.""" @@ -176,41 +122,28 @@ class GravityBot(commands.Bot): super().__init__(command_prefix="!", intents=intents) self.event_queue = event_queue - # conversation_id -> channel object self.session_channels: dict[str, discord.TextChannel] = {} - # conversation_id -> status message id (to edit in-place) self.session_status_messages: dict[str, int] = {} - # conversation_id -> project name self.session_names: dict[str, str] = {} - # Locks to prevent duplicate channel creation self._channel_locks: dict[str, asyncio.Lock] = {} - # Bridge protocol for bidirectional communication self.bridge = BridgeProtocol() - # Cache: conversation_id -> last metadata summary (skip unchanged) - self._last_meta_summary: dict[str, str] = {} - # Category for session channels self.session_category: discord.CategoryChannel | None = None - # Guild reference self.guild: discord.Guild | None = None async def setup_hook(self): - """Called after login, before processing events.""" self.loop.create_task(self._process_events()) - self.session_cleanup_loop.start() self.pending_approval_scanner.start() - logger.info("Bot setup complete, event processor + approval scanner started") + logger.info("Bot setup complete, event processor started") async def on_ready(self): - """Called when bot is ready.""" logger.info(f"Bot connected as {self.user} (ID: {self.user.id})") - # Get guild self.guild = self.get_guild(Config.DISCORD_GUILD_ID) if not self.guild: logger.error(f"Guild {Config.DISCORD_GUILD_ID} not found!") return - # Find or create session category + # Find or create category category_name = "Antigravity Sessions" self.session_category = discord.utils.get( self.guild.categories, name=category_name @@ -223,66 +156,83 @@ class GravityBot(commands.Bot): logger.error("No permission to create category!") return - # Sync existing active sessions - await self._sync_active_sessions() + # ONLY reconnect to existing Discord channels (NO new channel creation) + await self._reconnect_existing_channels() - async def _sync_active_sessions(self): - """Scan brain/ and create channels for currently active sessions.""" - brain_path = Config.BRAIN_PATH - if not brain_path.exists(): + async def _reconnect_existing_channels(self): + """Scan existing Discord channels and map them to conversation IDs. + Does NOT create any new channels.""" + if not self.session_category: return - active_count = 0 - for entry in brain_path.iterdir(): - if entry.is_dir() and self._is_conversation_id(entry.name): - if is_session_active(entry): - project_name = detect_project_name(entry) - await self._ensure_channel(entry.name, project_name) - active_count += 1 + count = 0 + for ch in self.session_category.text_channels: + if ch.topic and "Antigravity Session:" in ch.topic: + # Extract conversation ID from topic + conv_id = ch.topic.replace("Antigravity Session:", "").strip() + if conv_id: + self.session_channels[conv_id] = ch + # Recover last task embed + await self._recover_task_message(ch, conv_id) + count += 1 - logger.info(f"Synced {active_count} active sessions on startup") + logger.info(f"Reconnected to {count} existing channels") - def _is_conversation_id(self, name: str) -> bool: - """Check if directory name looks like a UUID.""" - parts = name.split("-") - return len(parts) == 5 and all(len(p) >= 4 for p in parts) + async def _recover_task_message( + self, channel: discord.TextChannel, conversation_id: str + ): + """Find last task embed in channel to reuse for editing.""" + if conversation_id in self.session_status_messages: + return + try: + async for msg in channel.history(limit=10): + if msg.author == self.user and msg.embeds: + embed = msg.embeds[0] + if embed.title and "Task" in embed.title: + self.session_status_messages[conversation_id] = msg.id + return + except (discord.Forbidden, discord.HTTPException): + pass + + # ─── Channel Management ────────────────────────────────────────── async def _ensure_channel( self, conversation_id: str, project_name: str ) -> discord.TextChannel: - """Get or create a Discord channel for a session (thread-safe).""" - # Fast path: already mapped + """Get or create a channel (thread-safe, single creation per session).""" if conversation_id in self.session_channels: - return self.session_channels[conversation_id] + ch = self.session_channels[conversation_id] + # Rename back from "closed-" if needed + expected = f"{Config.CHANNEL_PREFIX}-{project_name}".lower() + if ch.name.startswith("closed-") and ch.name != expected: + try: + await ch.edit(name=f"{Config.CHANNEL_PREFIX}-{project_name}") + except discord.errors.Forbidden: + pass + return ch - # Get or create a lock for this conversation + # Lock per conversation to prevent duplicates if conversation_id not in self._channel_locks: self._channel_locks[conversation_id] = asyncio.Lock() async with self._channel_locks[conversation_id]: - # Double-check after acquiring lock + # Double-check if conversation_id in self.session_channels: return self.session_channels[conversation_id] channel_name = f"{Config.CHANNEL_PREFIX}-{project_name}" - # Check if channel already exists in category (from previous run) + # Check if channel exists but wasn't mapped yet if self.session_category: for ch in self.session_category.text_channels: if ch.topic and conversation_id in ch.topic: self.session_channels[conversation_id] = ch self.session_names[conversation_id] = project_name - # Rename back from closed- if needed - expected_name = f"{Config.CHANNEL_PREFIX}-{project_name}" - if ch.name != expected_name.lower(): + if ch.name.startswith("closed-"): try: - await ch.edit(name=expected_name) - logger.info(f"Renamed channel {ch.name} -> {expected_name}") + await ch.edit(name=channel_name) except discord.errors.Forbidden: pass - # Recover last task embed message ID - await self._recover_task_message(ch, conversation_id) - logger.info(f"Reconnected to existing channel #{ch.name}") return ch # Create new channel @@ -296,13 +246,9 @@ class GravityBot(commands.Bot): self.session_names[conversation_id] = project_name logger.info(f"Created channel #{channel_name}") - # Welcome embed embed = discord.Embed( title=f"🚀 {project_name}", - description=( - f"Antigravity 세션 연결됨\n" - f"Session: `{conversation_id}`" - ), + description=f"Antigravity 세션 연결됨\nSession: `{conversation_id}`", color=discord.Color.blue(), timestamp=datetime.now(timezone.utc), ) @@ -313,26 +259,10 @@ class GravityBot(commands.Bot): logger.error(f"No permission to create channel: {channel_name}") return None - async def _recover_task_message( - self, channel: discord.TextChannel, conversation_id: str - ): - """Find the last task embed in channel history to reuse for editing.""" - if conversation_id in self.session_status_messages: - return # Already have it - - try: - async for msg in channel.history(limit=20): - if msg.author == self.user and msg.embeds: - embed = msg.embeds[0] - if embed.title and "Task 진행 현황" in embed.title: - self.session_status_messages[conversation_id] = msg.id - logger.info(f"Recovered task embed msg {msg.id} for {conversation_id[:8]}") - return - except (discord.Forbidden, discord.HTTPException): - pass + # ─── Event Processing (SINGLE ROUTE) ───────────────────────────── async def _process_events(self): - """Main event processing loop.""" + """Main event loop — ALL events go through here sequentially.""" await self.wait_until_ready() while not self.is_closed(): @@ -347,30 +277,30 @@ class GravityBot(commands.Bot): logger.error(f"Error processing event: {e}", exc_info=True) async def _handle_event(self, event: BrainEvent): - """Route brain events to handlers.""" + """Route brain events to handlers — single entry point.""" + conv_dir = Config.BRAIN_PATH / event.conversation_id + project_name = detect_project_name(conv_dir) + if event.event_type == EventType.SESSION_START: - conv_dir = Config.BRAIN_PATH / event.conversation_id - project_name = detect_project_name(conv_dir) await self._ensure_channel(event.conversation_id, project_name) + return - elif event.event_type in (EventType.FILE_CREATED, EventType.FILE_CHANGED): - # Ensure channel exists - conv_dir = Config.BRAIN_PATH / event.conversation_id - project_name = detect_project_name(conv_dir) - channel = await self._ensure_channel(event.conversation_id, project_name) + # FILE_CREATED or FILE_CHANGED + channel = await self._ensure_channel(event.conversation_id, project_name) + if not channel: + return - if channel: - if event.file_name == "task.md": - await self._send_task_update(channel, event) - elif event.file_name.endswith(".metadata.json"): - await self._send_metadata_update(channel, event) - else: - await self._send_artifact_content(channel, event) + if event.file_name == "task.md": + await self._send_task_update(channel, event) + else: + await self._send_artifact_update(channel, event) + + # ─── Message Senders ───────────────────────────────────────────── async def _send_task_update( self, channel: discord.TextChannel, event: BrainEvent ): - """Send task progress update as an embed.""" + """Send/edit task progress embed (SINGLE message, always edited).""" progress = parse_task_progress(event.content) embed = discord.Embed( @@ -381,10 +311,9 @@ class GravityBot(commands.Bot): else discord.Color.greyple(), timestamp=datetime.now(timezone.utc), ) - short_id = event.conversation_id[:8] - embed.set_footer(text=f"Session: {short_id}") + embed.set_footer(text=f"Session: {event.conversation_id[:8]}") - # Edit existing or send new + # Always try to edit existing message first msg_id = self.session_status_messages.get(event.conversation_id) if msg_id: try: @@ -397,10 +326,10 @@ class GravityBot(commands.Bot): msg = await channel.send(embed=embed) self.session_status_messages[event.conversation_id] = msg.id - async def _send_artifact_content( + async def _send_artifact_update( self, channel: discord.TextChannel, event: BrainEvent ): - """Send artifact change notification (compact — metadata handler sends summary).""" + """Send artifact update as single compact embed (preview only).""" labels = { "implementation_plan.md": "📐 구현 계획", "walkthrough.md": "📝 작업 결과 요약", @@ -408,15 +337,11 @@ class GravityBot(commands.Bot): label = labels.get(event.file_name, f"📄 {event.file_name}") event_label = "생성" if event.event_type == EventType.FILE_CREATED else "업데이트" - # Only send first few lines as preview instead of full content + # Preview: first 6 non-empty lines only lines = event.content.strip().splitlines() - preview_lines = [] - for line in lines[:8]: - if line.strip(): - preview_lines.append(line) - preview = "\n".join(preview_lines) - if len(lines) > 8: - preview += f"\n... (+{len(lines) - 8} lines)" + preview = "\n".join(l for l in lines[:6] if l.strip()) + if len(lines) > 6: + preview += f"\n... (+{len(lines) - 6} lines)" embed = discord.Embed( title=f"{label} ({event_label}됨)", @@ -426,108 +351,30 @@ class GravityBot(commands.Bot): ) await channel.send(embed=embed) - async def _send_metadata_update( - self, channel: discord.TextChannel, event: BrainEvent - ): - """Send artifact metadata summary changes as compact embed.""" - try: - meta = json.loads(event.content) - except json.JSONDecodeError: - return - - summary = meta.get("summary", "") - artifact_type = meta.get("artifactType", "") - - if not summary: - return - - # Dedup: skip if summary unchanged since last notification - cache_key = f"{event.conversation_id}:{event.file_name}" - if self._last_meta_summary.get(cache_key) == summary: - return - self._last_meta_summary[cache_key] = summary - - # Map artifact types to emoji - type_emoji = { - "ARTIFACT_TYPE_TASK": "📋", - "ARTIFACT_TYPE_IMPLEMENTATION_PLAN": "📐", - "ARTIFACT_TYPE_WALKTHROUGH": "📝", - "ARTIFACT_TYPE_OTHER": "📄", - } - emoji = type_emoji.get(artifact_type, "📄") - - # Artifact name from filename (strip .metadata.json) - artifact_name = event.file_name.replace(".metadata.json", "") - - embed = discord.Embed( - description=f"{emoji} **{artifact_name}** 업데이트\n\n{summary[:500]}", - color=discord.Color.dark_teal(), - timestamp=datetime.now(timezone.utc), - ) - - await channel.send(embed=embed) - - @tasks.loop(minutes=5) - async def session_cleanup_loop(self): - """Periodically check for inactive sessions and archive their channels.""" - if not self.guild: - return - - to_remove = [] - for conv_id, channel in self.session_channels.items(): - conv_dir = Config.BRAIN_PATH / conv_id - if not conv_dir.exists() or not is_session_active(conv_dir): - to_remove.append(conv_id) - - for conv_id in to_remove: - channel = self.session_channels.pop(conv_id, None) - self.session_status_messages.pop(conv_id, None) - name = self.session_names.pop(conv_id, conv_id[:8]) - - if channel: - try: - embed = discord.Embed( - title="🔴 세션 비활성", - description=f"`{name}` 세션이 비활성 상태입니다.", - color=discord.Color.red(), - timestamp=datetime.now(timezone.utc), - ) - await channel.send(embed=embed) - await channel.edit(name=f"closed-{name[:20]}") - logger.info(f"Archived channel for {conv_id[:8]}") - except discord.errors.Forbidden: - logger.warning(f"No permission to archive {conv_id[:8]}") - except Exception as e: - logger.warning(f"Failed to archive {conv_id[:8]}: {e}") - - @session_cleanup_loop.before_loop - async def before_cleanup(self): - await self.wait_until_ready() + # ─── Approval Scanner ──────────────────────────────────────────── @tasks.loop(seconds=3) async def pending_approval_scanner(self): - """Scan bridge/pending/ for new approval requests and send Discord buttons.""" + """Scan bridge/pending/ for new approval requests.""" try: requests = self.bridge.get_pending_requests() for req in requests: - # Find the right channel - if req.conversation_id in self.session_channels: - channel = self.session_channels[req.conversation_id] - elif req.discord_message_id == 0: - # Try to find channel by conversation_id + if req.discord_message_id != 0: + continue # Already sent + + channel = self.session_channels.get(req.conversation_id) + if not channel: conv_dir = Config.BRAIN_PATH / req.conversation_id if conv_dir.exists(): project_name = detect_project_name(conv_dir) - channel = await self._ensure_channel(req.conversation_id, project_name) - else: - continue - else: - continue + channel = await self._ensure_channel( + req.conversation_id, project_name + ) - if channel and req.discord_message_id == 0: + if channel: await self._send_approval_request(channel, req) except Exception as e: - logger.error(f"Error scanning pending approvals: {e}") + logger.error(f"Error scanning approvals: {e}") @pending_approval_scanner.before_loop async def before_scanner(self): @@ -536,7 +383,6 @@ class GravityBot(commands.Bot): async def _send_approval_request( self, channel: discord.TextChannel, request: ApprovalRequest ): - """Send an approval request with buttons to Discord.""" embed = discord.Embed( title="⚠️ 승인 요청", description=( @@ -558,25 +404,22 @@ class GravityBot(commands.Bot): data = json.loads(pending_file.read_text(encoding="utf-8")) data["discord_message_id"] = msg.id pending_file.write_text( - json.dumps(data, ensure_ascii=False, indent=2), - encoding="utf-8" + json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8" ) except (json.JSONDecodeError, OSError): pass logger.info(f"Sent approval request: {request.request_id[:8]}") + # ─── Discord → Antigravity Text Relay ───────────────────────────── + async def on_message(self, message: discord.Message): - """Handle user messages in AG channels → relay as text input.""" - # Ignore bot's own messages if message.author == self.user: return - # Check if message is in an AG session channel - if not message.channel.name.startswith(Config.CHANNEL_PREFIX + "-"): + if not message.channel.name.startswith(Config.CHANNEL_PREFIX.lower() + "-"): return - # Find conversation_id for this channel conv_id = None for cid, ch in self.session_channels.items(): if ch.id == message.channel.id: @@ -584,11 +427,7 @@ class GravityBot(commands.Bot): break if conv_id and message.content.strip(): - # Write user input to bridge commands - cmd_id = self.bridge.write_command(conv_id, message.content.strip()) + self.bridge.write_command(conv_id, message.content.strip()) await message.add_reaction("📨") - logger.info(f"User input relayed: {message.content[:50]}... → {conv_id[:8]}") - # Process commands (e.g., !approve, !reject) await self.process_commands(message) - diff --git a/config.py b/config.py index e5d7cb4..5f67ad0 100644 --- a/config.py +++ b/config.py @@ -21,24 +21,16 @@ class Config: os.path.expanduser("~/.gemini/antigravity/brain") )) - # Session activity detection - ACTIVE_TIMEOUT_SECONDS: int = int(os.getenv("ACTIVE_TIMEOUT_SECONDS", "300")) - # Watcher settings - DEBOUNCE_SECONDS: float = float(os.getenv("DEBOUNCE_SECONDS", "2")) + DEBOUNCE_SECONDS: float = float(os.getenv("DEBOUNCE_SECONDS", "5")) - # Files to monitor within each conversation directory + # Files to monitor within each conversation directory (PRIMARY ONLY) WATCHED_FILES: set = { "task.md", "implementation_plan.md", "walkthrough.md", } - # Also monitor these patterns (matched by suffix) - WATCHED_SUFFIXES: set = { - ".metadata.json", # artifact summary changes - } - # Discord message limits DISCORD_MSG_LIMIT: int = 2000 DISCORD_EMBED_DESC_LIMIT: int = 4096 diff --git a/watcher.py b/watcher.py index 29987eb..33bce29 100644 --- a/watcher.py +++ b/watcher.py @@ -2,9 +2,12 @@ Uses watchdog to detect file creation/modification events in the brain directory. Emits events to an asyncio queue for the Discord bot to consume. + +Key design: ONLY emits events for meaningful content changes using hash dedup. """ import asyncio +import hashlib import time import logging from pathlib import Path @@ -21,8 +24,7 @@ logger = logging.getLogger(__name__) class EventType(Enum): """Types of brain events.""" SESSION_START = "session_start" # New conversation directory created - SESSION_END = "session_end" # Conversation directory removed (or program exit) - FILE_CHANGED = "file_changed" # Watched file created/modified + FILE_CHANGED = "file_changed" # Watched file modified FILE_CREATED = "file_created" # Watched file first created @@ -38,18 +40,19 @@ class BrainEvent: class BrainEventHandler(FileSystemEventHandler): - """Watchdog handler that filters and debounces brain events.""" + """Watchdog handler that filters, debounces, and deduplicates brain events.""" def __init__(self, event_queue: asyncio.Queue, loop: asyncio.AbstractEventLoop): super().__init__() self.event_queue = event_queue self.loop = loop - self._last_events: dict[str, float] = {} # path -> timestamp (debounce) + self._last_events: dict[str, float] = {} # path -> timestamp (debounce) + self._content_hashes: dict[str, str] = {} # path -> md5 hash (dedup) self._known_sessions: set[str] = set() self._initialize_known_sessions() def _initialize_known_sessions(self): - """Scan existing brain directories to establish baseline.""" + """Scan existing brain directories to establish baseline (no events emitted).""" brain_path = Config.BRAIN_PATH if brain_path.exists(): for entry in brain_path.iterdir(): @@ -58,12 +61,10 @@ class BrainEventHandler(FileSystemEventHandler): logger.info(f"Found {len(self._known_sessions)} existing sessions at startup") def _is_conversation_id(self, name: str) -> bool: - """Check if directory name looks like a UUID conversation ID.""" parts = name.split("-") return len(parts) == 5 and all(len(p) >= 4 for p in parts) def _get_conversation_id(self, path: Path) -> str | None: - """Extract conversation ID from file path.""" brain_path = Config.BRAIN_PATH try: relative = path.relative_to(brain_path) @@ -75,7 +76,6 @@ class BrainEventHandler(FileSystemEventHandler): return None def _should_debounce(self, path_str: str) -> bool: - """Check if this event should be debounced.""" now = time.time() last = self._last_events.get(path_str, 0) if now - last < Config.DEBOUNCE_SECONDS: @@ -83,8 +83,20 @@ class BrainEventHandler(FileSystemEventHandler): self._last_events[path_str] = now return False + def _content_changed(self, path_str: str, content: str) -> bool: + """Check if content actually changed using MD5 hash.""" + new_hash = hashlib.md5(content.encode()).hexdigest() + old_hash = self._content_hashes.get(path_str) + if old_hash == new_hash: + return False + self._content_hashes[path_str] = new_hash + return True + + def _is_watched_file(self, file_name: str) -> bool: + """Strict filter: only watch primary artifact files.""" + return file_name in Config.WATCHED_FILES + def _emit(self, event: BrainEvent): - """Thread-safe emit to asyncio queue.""" self.loop.call_soon_threadsafe(self.event_queue.put_nowait, event) def on_created(self, event: FileSystemEvent): @@ -98,10 +110,8 @@ class BrainEventHandler(FileSystemEventHandler): self._handle_file_event(Path(event.src_path), EventType.FILE_CHANGED) def _handle_directory_created(self, path: Path): - """Detect new session directories.""" conv_id = self._get_conversation_id(path) if conv_id and conv_id not in self._known_sessions: - # Check if this is a direct child of brain/ if path.parent == Config.BRAIN_PATH: self._known_sessions.add(conv_id) logger.info(f"New session detected: {conv_id}") @@ -111,17 +121,17 @@ class BrainEventHandler(FileSystemEventHandler): )) def _handle_file_event(self, path: Path, event_type: EventType): - """Process file creation/modification events.""" conv_id = self._get_conversation_id(path) if not conv_id: return file_name = path.name - if file_name not in Config.WATCHED_FILES: - # Check suffix patterns - if not any(file_name.endswith(s) for s in Config.WATCHED_SUFFIXES): - return + # STRICT filter: only primary artifacts + if not self._is_watched_file(file_name): + return + + # Debounce: skip rapid-fire events for same file if self._should_debounce(str(path)): return @@ -132,7 +142,11 @@ class BrainEventHandler(FileSystemEventHandler): logger.warning(f"Failed to read {path}: {e}") return - logger.info(f"File event: {event_type.value} {conv_id}/{file_name}") + # Content hash dedup: skip if content hasn't actually changed + if not self._content_changed(str(path), content): + return + + logger.info(f"File event: {event_type.value} {conv_id[:8]}/{file_name}") self._emit(BrainEvent( event_type=event_type, conversation_id=conv_id, @@ -152,7 +166,6 @@ class BrainWatcher: self.handler = BrainEventHandler(event_queue, loop) def start(self): - """Start watching the brain directory.""" brain_path = Config.BRAIN_PATH if not brain_path.exists(): logger.error(f"Brain path does not exist: {brain_path}") @@ -163,7 +176,6 @@ class BrainWatcher: logger.info(f"Watching brain directory: {brain_path}") def stop(self): - """Stop the watcher.""" self.observer.stop() self.observer.join() logger.info("Brain watcher stopped")