refactor: 전면 재설계 - 시작 시 채널 스팸 제거, content hash 중복 방지, 단일 이벤트 경로

This commit is contained in:
2026-03-07 11:42:11 +09:00
parent 52fed8c1d3
commit e32be6b2f3
3 changed files with 155 additions and 312 deletions

379
bot.py
View File

@@ -1,10 +1,9 @@
"""Discord bot — relays Antigravity brain events to Discord channels.
Dynamic channel management:
- Scans brain/ for active sessions on startup
- Creates `gravity-{project_name}` channels per active session
- Archives channels when sessions become inactive
- Project name extracted from artifact content or short conversation ID
- Creates `AG-{project_name}` channels only when file events arrive
- NO startup channel creation — only reconnects to existing Discord channels
- Archives channels after 10 minutes of inactivity
"""
import asyncio
@@ -30,11 +29,13 @@ from bridge import BridgeProtocol, ApprovalRequest, UserResponse
logger = logging.getLogger(__name__)
# ─── Discord UI Components ──────────────────────────────────────────
class ApprovalView(discord.ui.View):
"""Discord buttons for approving/rejecting Antigravity actions."""
def __init__(self, bridge: BridgeProtocol, request: ApprovalRequest):
super().__init__(timeout=300) # 5 min timeout
super().__init__(timeout=300)
self.bridge = bridge
self.request = request
self.responded = False
@@ -46,8 +47,7 @@ class ApprovalView(discord.ui.View):
return
self.responded = True
self.bridge.write_response(UserResponse(
request_id=self.request.request_id,
approved=True,
request_id=self.request.request_id, approved=True,
))
embed = interaction.message.embeds[0] if interaction.message.embeds else None
if embed:
@@ -62,8 +62,7 @@ class ApprovalView(discord.ui.View):
return
self.responded = True
self.bridge.write_response(UserResponse(
request_id=self.request.request_id,
approved=False,
request_id=self.request.request_id, approved=False,
))
embed = interaction.message.embeds[0] if interaction.message.embeds else None
if embed:
@@ -72,44 +71,33 @@ class ApprovalView(discord.ui.View):
await interaction.response.edit_message(embed=embed, view=None)
async def on_timeout(self):
"""Auto-timeout after 5 minutes."""
if not self.responded:
self.bridge.write_response(UserResponse(
request_id=self.request.request_id,
approved=False,
request_id=self.request.request_id, approved=False,
))
# ─── Project Name Detection ─────────────────────────────────────────
def detect_project_name(conv_dir: Path) -> str:
"""Extract a human-readable project name from conversation artifacts.
Strategy:
1. Check task.md first heading for a title
2. Check implementation_plan.md first heading
3. Check any .metadata.json for summary keywords
4. Fallback to short conversation ID
Output format: lowercase_with_underscores (e.g. 'gravity_control')
"""Extract project name from conversation artifacts.
Output: lowercase_with_underscores (e.g. 'gravity_control')
"""
short_id = conv_dir.name[:8]
def _sanitize(raw: str) -> str:
"""Convert raw title to Discord-friendly channel name part."""
# Remove common suffixes
for suffix in ["Task Tracker", "— Task Tracker", "태스크", "구현 계획"]:
raw = raw.replace(suffix, "")
raw = raw.strip(" —-")
# Keep only alphanumeric, Korean, spaces, hyphens
raw = re.sub(r'[^a-zA-Z0-9가-힣\s\-]', '', raw)
# Convert to underscore format
raw = re.sub(r'[\s\-]+', '_', raw).strip('_').lower()
return raw[:30] if raw else ""
# Try task.md title
task_file = conv_dir / "task.md"
if task_file.exists():
for fname in ["task.md", "implementation_plan.md"]:
fpath = conv_dir / fname
if fpath.exists():
try:
first_lines = task_file.read_text(encoding="utf-8").splitlines()[:5]
first_lines = fpath.read_text(encoding="utf-8").splitlines()[:5]
for line in first_lines:
match = re.match(r'^#\s+(.+)', line)
if match:
@@ -119,52 +107,10 @@ def detect_project_name(conv_dir: Path) -> str:
except (OSError, UnicodeDecodeError):
pass
# Try implementation_plan.md title
plan_file = conv_dir / "implementation_plan.md"
if plan_file.exists():
try:
first_lines = plan_file.read_text(encoding="utf-8").splitlines()[:5]
for line in first_lines:
match = re.match(r'^#\s+(.+)', line)
if match:
name = _sanitize(match.group(1))
if name:
return name
except (OSError, UnicodeDecodeError):
pass
# Try metadata summary (first 2 words)
for meta_file in conv_dir.glob("*.metadata.json"):
try:
meta = json.loads(meta_file.read_text(encoding="utf-8"))
summary = meta.get("summary", "")
words = summary.split()[:2]
if words:
name = "_".join(words).lower()
name = re.sub(r'[^a-z0-9가-힣_]', '', name)[:30]
if name:
return name
except (OSError, json.JSONDecodeError):
pass
return short_id
def is_session_active(conv_dir: Path) -> bool:
"""Check if a session is active based on file modification time."""
now = time.time()
threshold = Config.ACTIVE_TIMEOUT_SECONDS
for f in conv_dir.iterdir():
if f.is_file():
try:
mtime = f.stat().st_mtime
if now - mtime < threshold:
return True
except OSError:
continue
return False
# ─── Bot ─────────────────────────────────────────────────────────────
class GravityBot(commands.Bot):
"""Discord bot for Antigravity session monitoring."""
@@ -176,41 +122,28 @@ class GravityBot(commands.Bot):
super().__init__(command_prefix="!", intents=intents)
self.event_queue = event_queue
# conversation_id -> channel object
self.session_channels: dict[str, discord.TextChannel] = {}
# conversation_id -> status message id (to edit in-place)
self.session_status_messages: dict[str, int] = {}
# conversation_id -> project name
self.session_names: dict[str, str] = {}
# Locks to prevent duplicate channel creation
self._channel_locks: dict[str, asyncio.Lock] = {}
# Bridge protocol for bidirectional communication
self.bridge = BridgeProtocol()
# Cache: conversation_id -> last metadata summary (skip unchanged)
self._last_meta_summary: dict[str, str] = {}
# Category for session channels
self.session_category: discord.CategoryChannel | None = None
# Guild reference
self.guild: discord.Guild | None = None
async def setup_hook(self):
"""Called after login, before processing events."""
self.loop.create_task(self._process_events())
self.session_cleanup_loop.start()
self.pending_approval_scanner.start()
logger.info("Bot setup complete, event processor + approval scanner started")
logger.info("Bot setup complete, event processor started")
async def on_ready(self):
"""Called when bot is ready."""
logger.info(f"Bot connected as {self.user} (ID: {self.user.id})")
# Get guild
self.guild = self.get_guild(Config.DISCORD_GUILD_ID)
if not self.guild:
logger.error(f"Guild {Config.DISCORD_GUILD_ID} not found!")
return
# Find or create session category
# Find or create category
category_name = "Antigravity Sessions"
self.session_category = discord.utils.get(
self.guild.categories, name=category_name
@@ -223,66 +156,83 @@ class GravityBot(commands.Bot):
logger.error("No permission to create category!")
return
# Sync existing active sessions
await self._sync_active_sessions()
# ONLY reconnect to existing Discord channels (NO new channel creation)
await self._reconnect_existing_channels()
async def _sync_active_sessions(self):
"""Scan brain/ and create channels for currently active sessions."""
brain_path = Config.BRAIN_PATH
if not brain_path.exists():
async def _reconnect_existing_channels(self):
"""Scan existing Discord channels and map them to conversation IDs.
Does NOT create any new channels."""
if not self.session_category:
return
active_count = 0
for entry in brain_path.iterdir():
if entry.is_dir() and self._is_conversation_id(entry.name):
if is_session_active(entry):
project_name = detect_project_name(entry)
await self._ensure_channel(entry.name, project_name)
active_count += 1
count = 0
for ch in self.session_category.text_channels:
if ch.topic and "Antigravity Session:" in ch.topic:
# Extract conversation ID from topic
conv_id = ch.topic.replace("Antigravity Session:", "").strip()
if conv_id:
self.session_channels[conv_id] = ch
# Recover last task embed
await self._recover_task_message(ch, conv_id)
count += 1
logger.info(f"Synced {active_count} active sessions on startup")
logger.info(f"Reconnected to {count} existing channels")
def _is_conversation_id(self, name: str) -> bool:
"""Check if directory name looks like a UUID."""
parts = name.split("-")
return len(parts) == 5 and all(len(p) >= 4 for p in parts)
async def _recover_task_message(
self, channel: discord.TextChannel, conversation_id: str
):
"""Find last task embed in channel to reuse for editing."""
if conversation_id in self.session_status_messages:
return
try:
async for msg in channel.history(limit=10):
if msg.author == self.user and msg.embeds:
embed = msg.embeds[0]
if embed.title and "Task" in embed.title:
self.session_status_messages[conversation_id] = msg.id
return
except (discord.Forbidden, discord.HTTPException):
pass
# ─── Channel Management ──────────────────────────────────────────
async def _ensure_channel(
self, conversation_id: str, project_name: str
) -> discord.TextChannel:
"""Get or create a Discord channel for a session (thread-safe)."""
# Fast path: already mapped
"""Get or create a channel (thread-safe, single creation per session)."""
if conversation_id in self.session_channels:
return self.session_channels[conversation_id]
ch = self.session_channels[conversation_id]
# Rename back from "closed-" if needed
expected = f"{Config.CHANNEL_PREFIX}-{project_name}".lower()
if ch.name.startswith("closed-") and ch.name != expected:
try:
await ch.edit(name=f"{Config.CHANNEL_PREFIX}-{project_name}")
except discord.errors.Forbidden:
pass
return ch
# Get or create a lock for this conversation
# Lock per conversation to prevent duplicates
if conversation_id not in self._channel_locks:
self._channel_locks[conversation_id] = asyncio.Lock()
async with self._channel_locks[conversation_id]:
# Double-check after acquiring lock
# Double-check
if conversation_id in self.session_channels:
return self.session_channels[conversation_id]
channel_name = f"{Config.CHANNEL_PREFIX}-{project_name}"
# Check if channel already exists in category (from previous run)
# Check if channel exists but wasn't mapped yet
if self.session_category:
for ch in self.session_category.text_channels:
if ch.topic and conversation_id in ch.topic:
self.session_channels[conversation_id] = ch
self.session_names[conversation_id] = project_name
# Rename back from closed- if needed
expected_name = f"{Config.CHANNEL_PREFIX}-{project_name}"
if ch.name != expected_name.lower():
if ch.name.startswith("closed-"):
try:
await ch.edit(name=expected_name)
logger.info(f"Renamed channel {ch.name} -> {expected_name}")
await ch.edit(name=channel_name)
except discord.errors.Forbidden:
pass
# Recover last task embed message ID
await self._recover_task_message(ch, conversation_id)
logger.info(f"Reconnected to existing channel #{ch.name}")
return ch
# Create new channel
@@ -296,13 +246,9 @@ class GravityBot(commands.Bot):
self.session_names[conversation_id] = project_name
logger.info(f"Created channel #{channel_name}")
# Welcome embed
embed = discord.Embed(
title=f"🚀 {project_name}",
description=(
f"Antigravity 세션 연결됨\n"
f"Session: `{conversation_id}`"
),
description=f"Antigravity 세션 연결됨\nSession: `{conversation_id}`",
color=discord.Color.blue(),
timestamp=datetime.now(timezone.utc),
)
@@ -313,26 +259,10 @@ class GravityBot(commands.Bot):
logger.error(f"No permission to create channel: {channel_name}")
return None
async def _recover_task_message(
self, channel: discord.TextChannel, conversation_id: str
):
"""Find the last task embed in channel history to reuse for editing."""
if conversation_id in self.session_status_messages:
return # Already have it
try:
async for msg in channel.history(limit=20):
if msg.author == self.user and msg.embeds:
embed = msg.embeds[0]
if embed.title and "Task 진행 현황" in embed.title:
self.session_status_messages[conversation_id] = msg.id
logger.info(f"Recovered task embed msg {msg.id} for {conversation_id[:8]}")
return
except (discord.Forbidden, discord.HTTPException):
pass
# ─── Event Processing (SINGLE ROUTE) ─────────────────────────────
async def _process_events(self):
"""Main event processing loop."""
"""Main event loop — ALL events go through here sequentially."""
await self.wait_until_ready()
while not self.is_closed():
@@ -347,30 +277,30 @@ class GravityBot(commands.Bot):
logger.error(f"Error processing event: {e}", exc_info=True)
async def _handle_event(self, event: BrainEvent):
"""Route brain events to handlers."""
"""Route brain events to handlers — single entry point."""
conv_dir = Config.BRAIN_PATH / event.conversation_id
project_name = detect_project_name(conv_dir)
if event.event_type == EventType.SESSION_START:
conv_dir = Config.BRAIN_PATH / event.conversation_id
project_name = detect_project_name(conv_dir)
await self._ensure_channel(event.conversation_id, project_name)
return
elif event.event_type in (EventType.FILE_CREATED, EventType.FILE_CHANGED):
# Ensure channel exists
conv_dir = Config.BRAIN_PATH / event.conversation_id
project_name = detect_project_name(conv_dir)
# FILE_CREATED or FILE_CHANGED
channel = await self._ensure_channel(event.conversation_id, project_name)
if not channel:
return
if channel:
if event.file_name == "task.md":
await self._send_task_update(channel, event)
elif event.file_name.endswith(".metadata.json"):
await self._send_metadata_update(channel, event)
else:
await self._send_artifact_content(channel, event)
await self._send_artifact_update(channel, event)
# ─── Message Senders ─────────────────────────────────────────────
async def _send_task_update(
self, channel: discord.TextChannel, event: BrainEvent
):
"""Send task progress update as an embed."""
"""Send/edit task progress embed (SINGLE message, always edited)."""
progress = parse_task_progress(event.content)
embed = discord.Embed(
@@ -381,10 +311,9 @@ class GravityBot(commands.Bot):
else discord.Color.greyple(),
timestamp=datetime.now(timezone.utc),
)
short_id = event.conversation_id[:8]
embed.set_footer(text=f"Session: {short_id}")
embed.set_footer(text=f"Session: {event.conversation_id[:8]}")
# Edit existing or send new
# Always try to edit existing message first
msg_id = self.session_status_messages.get(event.conversation_id)
if msg_id:
try:
@@ -397,10 +326,10 @@ class GravityBot(commands.Bot):
msg = await channel.send(embed=embed)
self.session_status_messages[event.conversation_id] = msg.id
async def _send_artifact_content(
async def _send_artifact_update(
self, channel: discord.TextChannel, event: BrainEvent
):
"""Send artifact change notification (compact — metadata handler sends summary)."""
"""Send artifact update as single compact embed (preview only)."""
labels = {
"implementation_plan.md": "📐 구현 계획",
"walkthrough.md": "📝 작업 결과 요약",
@@ -408,15 +337,11 @@ class GravityBot(commands.Bot):
label = labels.get(event.file_name, f"📄 {event.file_name}")
event_label = "생성" if event.event_type == EventType.FILE_CREATED else "업데이트"
# Only send first few lines as preview instead of full content
# Preview: first 6 non-empty lines only
lines = event.content.strip().splitlines()
preview_lines = []
for line in lines[:8]:
if line.strip():
preview_lines.append(line)
preview = "\n".join(preview_lines)
if len(lines) > 8:
preview += f"\n... (+{len(lines) - 8} lines)"
preview = "\n".join(l for l in lines[:6] if l.strip())
if len(lines) > 6:
preview += f"\n... (+{len(lines) - 6} lines)"
embed = discord.Embed(
title=f"{label} ({event_label}됨)",
@@ -426,108 +351,30 @@ class GravityBot(commands.Bot):
)
await channel.send(embed=embed)
async def _send_metadata_update(
self, channel: discord.TextChannel, event: BrainEvent
):
"""Send artifact metadata summary changes as compact embed."""
try:
meta = json.loads(event.content)
except json.JSONDecodeError:
return
summary = meta.get("summary", "")
artifact_type = meta.get("artifactType", "")
if not summary:
return
# Dedup: skip if summary unchanged since last notification
cache_key = f"{event.conversation_id}:{event.file_name}"
if self._last_meta_summary.get(cache_key) == summary:
return
self._last_meta_summary[cache_key] = summary
# Map artifact types to emoji
type_emoji = {
"ARTIFACT_TYPE_TASK": "📋",
"ARTIFACT_TYPE_IMPLEMENTATION_PLAN": "📐",
"ARTIFACT_TYPE_WALKTHROUGH": "📝",
"ARTIFACT_TYPE_OTHER": "📄",
}
emoji = type_emoji.get(artifact_type, "📄")
# Artifact name from filename (strip .metadata.json)
artifact_name = event.file_name.replace(".metadata.json", "")
embed = discord.Embed(
description=f"{emoji} **{artifact_name}** 업데이트\n\n{summary[:500]}",
color=discord.Color.dark_teal(),
timestamp=datetime.now(timezone.utc),
)
await channel.send(embed=embed)
@tasks.loop(minutes=5)
async def session_cleanup_loop(self):
"""Periodically check for inactive sessions and archive their channels."""
if not self.guild:
return
to_remove = []
for conv_id, channel in self.session_channels.items():
conv_dir = Config.BRAIN_PATH / conv_id
if not conv_dir.exists() or not is_session_active(conv_dir):
to_remove.append(conv_id)
for conv_id in to_remove:
channel = self.session_channels.pop(conv_id, None)
self.session_status_messages.pop(conv_id, None)
name = self.session_names.pop(conv_id, conv_id[:8])
if channel:
try:
embed = discord.Embed(
title="🔴 세션 비활성",
description=f"`{name}` 세션이 비활성 상태입니다.",
color=discord.Color.red(),
timestamp=datetime.now(timezone.utc),
)
await channel.send(embed=embed)
await channel.edit(name=f"closed-{name[:20]}")
logger.info(f"Archived channel for {conv_id[:8]}")
except discord.errors.Forbidden:
logger.warning(f"No permission to archive {conv_id[:8]}")
except Exception as e:
logger.warning(f"Failed to archive {conv_id[:8]}: {e}")
@session_cleanup_loop.before_loop
async def before_cleanup(self):
await self.wait_until_ready()
# ─── Approval Scanner ────────────────────────────────────────────
@tasks.loop(seconds=3)
async def pending_approval_scanner(self):
"""Scan bridge/pending/ for new approval requests and send Discord buttons."""
"""Scan bridge/pending/ for new approval requests."""
try:
requests = self.bridge.get_pending_requests()
for req in requests:
# Find the right channel
if req.conversation_id in self.session_channels:
channel = self.session_channels[req.conversation_id]
elif req.discord_message_id == 0:
# Try to find channel by conversation_id
if req.discord_message_id != 0:
continue # Already sent
channel = self.session_channels.get(req.conversation_id)
if not channel:
conv_dir = Config.BRAIN_PATH / req.conversation_id
if conv_dir.exists():
project_name = detect_project_name(conv_dir)
channel = await self._ensure_channel(req.conversation_id, project_name)
else:
continue
else:
continue
channel = await self._ensure_channel(
req.conversation_id, project_name
)
if channel and req.discord_message_id == 0:
if channel:
await self._send_approval_request(channel, req)
except Exception as e:
logger.error(f"Error scanning pending approvals: {e}")
logger.error(f"Error scanning approvals: {e}")
@pending_approval_scanner.before_loop
async def before_scanner(self):
@@ -536,7 +383,6 @@ class GravityBot(commands.Bot):
async def _send_approval_request(
self, channel: discord.TextChannel, request: ApprovalRequest
):
"""Send an approval request with buttons to Discord."""
embed = discord.Embed(
title="⚠️ 승인 요청",
description=(
@@ -558,25 +404,22 @@ class GravityBot(commands.Bot):
data = json.loads(pending_file.read_text(encoding="utf-8"))
data["discord_message_id"] = msg.id
pending_file.write_text(
json.dumps(data, ensure_ascii=False, indent=2),
encoding="utf-8"
json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8"
)
except (json.JSONDecodeError, OSError):
pass
logger.info(f"Sent approval request: {request.request_id[:8]}")
# ─── Discord → Antigravity Text Relay ─────────────────────────────
async def on_message(self, message: discord.Message):
"""Handle user messages in AG channels → relay as text input."""
# Ignore bot's own messages
if message.author == self.user:
return
# Check if message is in an AG session channel
if not message.channel.name.startswith(Config.CHANNEL_PREFIX + "-"):
if not message.channel.name.startswith(Config.CHANNEL_PREFIX.lower() + "-"):
return
# Find conversation_id for this channel
conv_id = None
for cid, ch in self.session_channels.items():
if ch.id == message.channel.id:
@@ -584,11 +427,7 @@ class GravityBot(commands.Bot):
break
if conv_id and message.content.strip():
# Write user input to bridge commands
cmd_id = self.bridge.write_command(conv_id, message.content.strip())
self.bridge.write_command(conv_id, message.content.strip())
await message.add_reaction("📨")
logger.info(f"User input relayed: {message.content[:50]}... → {conv_id[:8]}")
# Process commands (e.g., !approve, !reject)
await self.process_commands(message)

View File

@@ -21,24 +21,16 @@ class Config:
os.path.expanduser("~/.gemini/antigravity/brain")
))
# Session activity detection
ACTIVE_TIMEOUT_SECONDS: int = int(os.getenv("ACTIVE_TIMEOUT_SECONDS", "300"))
# Watcher settings
DEBOUNCE_SECONDS: float = float(os.getenv("DEBOUNCE_SECONDS", "2"))
DEBOUNCE_SECONDS: float = float(os.getenv("DEBOUNCE_SECONDS", "5"))
# Files to monitor within each conversation directory
# Files to monitor within each conversation directory (PRIMARY ONLY)
WATCHED_FILES: set = {
"task.md",
"implementation_plan.md",
"walkthrough.md",
}
# Also monitor these patterns (matched by suffix)
WATCHED_SUFFIXES: set = {
".metadata.json", # artifact summary changes
}
# Discord message limits
DISCORD_MSG_LIMIT: int = 2000
DISCORD_EMBED_DESC_LIMIT: int = 4096

View File

@@ -2,9 +2,12 @@
Uses watchdog to detect file creation/modification events in the brain directory.
Emits events to an asyncio queue for the Discord bot to consume.
Key design: ONLY emits events for meaningful content changes using hash dedup.
"""
import asyncio
import hashlib
import time
import logging
from pathlib import Path
@@ -21,8 +24,7 @@ logger = logging.getLogger(__name__)
class EventType(Enum):
"""Types of brain events."""
SESSION_START = "session_start" # New conversation directory created
SESSION_END = "session_end" # Conversation directory removed (or program exit)
FILE_CHANGED = "file_changed" # Watched file created/modified
FILE_CHANGED = "file_changed" # Watched file modified
FILE_CREATED = "file_created" # Watched file first created
@@ -38,18 +40,19 @@ class BrainEvent:
class BrainEventHandler(FileSystemEventHandler):
"""Watchdog handler that filters and debounces brain events."""
"""Watchdog handler that filters, debounces, and deduplicates brain events."""
def __init__(self, event_queue: asyncio.Queue, loop: asyncio.AbstractEventLoop):
super().__init__()
self.event_queue = event_queue
self.loop = loop
self._last_events: dict[str, float] = {} # path -> timestamp (debounce)
self._content_hashes: dict[str, str] = {} # path -> md5 hash (dedup)
self._known_sessions: set[str] = set()
self._initialize_known_sessions()
def _initialize_known_sessions(self):
"""Scan existing brain directories to establish baseline."""
"""Scan existing brain directories to establish baseline (no events emitted)."""
brain_path = Config.BRAIN_PATH
if brain_path.exists():
for entry in brain_path.iterdir():
@@ -58,12 +61,10 @@ class BrainEventHandler(FileSystemEventHandler):
logger.info(f"Found {len(self._known_sessions)} existing sessions at startup")
def _is_conversation_id(self, name: str) -> bool:
"""Check if directory name looks like a UUID conversation ID."""
parts = name.split("-")
return len(parts) == 5 and all(len(p) >= 4 for p in parts)
def _get_conversation_id(self, path: Path) -> str | None:
"""Extract conversation ID from file path."""
brain_path = Config.BRAIN_PATH
try:
relative = path.relative_to(brain_path)
@@ -75,7 +76,6 @@ class BrainEventHandler(FileSystemEventHandler):
return None
def _should_debounce(self, path_str: str) -> bool:
"""Check if this event should be debounced."""
now = time.time()
last = self._last_events.get(path_str, 0)
if now - last < Config.DEBOUNCE_SECONDS:
@@ -83,8 +83,20 @@ class BrainEventHandler(FileSystemEventHandler):
self._last_events[path_str] = now
return False
def _content_changed(self, path_str: str, content: str) -> bool:
"""Check if content actually changed using MD5 hash."""
new_hash = hashlib.md5(content.encode()).hexdigest()
old_hash = self._content_hashes.get(path_str)
if old_hash == new_hash:
return False
self._content_hashes[path_str] = new_hash
return True
def _is_watched_file(self, file_name: str) -> bool:
"""Strict filter: only watch primary artifact files."""
return file_name in Config.WATCHED_FILES
def _emit(self, event: BrainEvent):
"""Thread-safe emit to asyncio queue."""
self.loop.call_soon_threadsafe(self.event_queue.put_nowait, event)
def on_created(self, event: FileSystemEvent):
@@ -98,10 +110,8 @@ class BrainEventHandler(FileSystemEventHandler):
self._handle_file_event(Path(event.src_path), EventType.FILE_CHANGED)
def _handle_directory_created(self, path: Path):
"""Detect new session directories."""
conv_id = self._get_conversation_id(path)
if conv_id and conv_id not in self._known_sessions:
# Check if this is a direct child of brain/
if path.parent == Config.BRAIN_PATH:
self._known_sessions.add(conv_id)
logger.info(f"New session detected: {conv_id}")
@@ -111,17 +121,17 @@ class BrainEventHandler(FileSystemEventHandler):
))
def _handle_file_event(self, path: Path, event_type: EventType):
"""Process file creation/modification events."""
conv_id = self._get_conversation_id(path)
if not conv_id:
return
file_name = path.name
if file_name not in Config.WATCHED_FILES:
# Check suffix patterns
if not any(file_name.endswith(s) for s in Config.WATCHED_SUFFIXES):
# STRICT filter: only primary artifacts
if not self._is_watched_file(file_name):
return
# Debounce: skip rapid-fire events for same file
if self._should_debounce(str(path)):
return
@@ -132,7 +142,11 @@ class BrainEventHandler(FileSystemEventHandler):
logger.warning(f"Failed to read {path}: {e}")
return
logger.info(f"File event: {event_type.value} {conv_id}/{file_name}")
# Content hash dedup: skip if content hasn't actually changed
if not self._content_changed(str(path), content):
return
logger.info(f"File event: {event_type.value} {conv_id[:8]}/{file_name}")
self._emit(BrainEvent(
event_type=event_type,
conversation_id=conv_id,
@@ -152,7 +166,6 @@ class BrainWatcher:
self.handler = BrainEventHandler(event_queue, loop)
def start(self):
"""Start watching the brain directory."""
brain_path = Config.BRAIN_PATH
if not brain_path.exists():
logger.error(f"Brain path does not exist: {brain_path}")
@@ -163,7 +176,6 @@ class BrainWatcher:
logger.info(f"Watching brain directory: {brain_path}")
def stop(self):
"""Stop the watcher."""
self.observer.stop()
self.observer.join()
logger.info("Brain watcher stopped")