238 lines
8.4 KiB
Python
238 lines
8.4 KiB
Python
"""Brain directory watcher — monitors Antigravity's brain/ directory for file changes.

Uses watchdog to detect file creation/modification events in the brain directory
and emits them to an asyncio queue for the Discord bot to consume.

Key design: only emits events for meaningful content changes, using content-hash
deduplication.
"""
|
|
|
|
import asyncio
import hashlib
import logging
import threading
import time
from dataclasses import dataclass, field
from enum import Enum
from pathlib import Path

from watchdog.events import FileSystemEventHandler, FileSystemEvent
from watchdog.observers import Observer

from config import Config
|
|
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(name=__name__)
|
|
|
|
|
|
class EventType(Enum):
    """Kinds of event the brain watcher puts on the event queue.

    SESSION_START -- a new conversation directory appeared in the brain path.
    FILE_CHANGED  -- the content of a watched file changed.
    FILE_CREATED  -- a watched file was created for the first time.
    """

    SESSION_START = "session_start"
    FILE_CHANGED = "file_changed"
    FILE_CREATED = "file_created"
|
|
|
|
|
|
@dataclass
class BrainEvent:
    """One event from the brain directory, as placed on the event queue.

    Session events set only ``event_type`` and ``conversation_id``; the file
    fields keep their empty defaults. File events also carry the file's base
    name, its path, and the text read from it (UTF-8).
    """

    event_type: EventType
    conversation_id: str  # name of the conversation directory under the brain path
    file_name: str = ""  # base name of the file; "" for session events
    # Path of the file as reported by the watcher; None for session events.
    file_path: Path | None = None
    content: str = ""  # decoded file text; "" for session events
    # Wall-clock time (time.time()) at which this event object was created.
    timestamp: float = field(default_factory=time.time)
|
|
|
|
|
|
class BrainEventHandler(FileSystemEventHandler):
    """Watchdog handler that filters, debounces, and deduplicates brain events.

    Watchdog callbacks run on the observer thread; debounced reads run on
    short-lived ``threading.Timer`` threads. Finished ``BrainEvent`` objects are
    handed to ``event_queue`` on ``loop`` via ``call_soon_threadsafe``.

    File-event pipeline:
      1. ``dispatch`` drops anything inside a ``.system_generated`` or ``logs``
         directory below ``Config.BRAIN_PATH`` and any file that is not watched.
      2. The first event for a path opens a ``Config.DEBOUNCE_SECONDS`` window;
         further events for that path inside the window are folded into it.
      3. When the window closes the file is read once, so the emitted content
         reflects the last write of the burst, and the event is emitted only if
         its content hash differs from the previous one recorded for that path.

    Consequently a file event is delivered up to ``DEBOUNCE_SECONDS`` after the
    first write of a burst.
    """

    def __init__(self, event_queue: asyncio.Queue, loop: asyncio.AbstractEventLoop):
        super().__init__()
        self.event_queue = event_queue
        self.loop = loop
        self._content_hashes: dict[str, str] = {}  # path -> md5 hex digest (dedup)
        self._known_sessions: set[str] = set()
        # path -> event type of a debounce window that has not flushed yet.
        self._pending: dict[str, EventType] = {}
        # Guards _pending and _content_hashes, which are touched by both the
        # observer thread and the debounce timer threads.
        self._lock = threading.Lock()
        self._initialize_known_sessions()

    # ------------------------------------------------------------------ setup

    def _initialize_known_sessions(self):
        """Record existing conversation directories and baseline content hashes.

        No events are emitted. Pre-loading hashes for watched files that already
        exist means a later modification event that leaves content unchanged is
        recognised as a duplicate instead of being reported.
        """
        brain_path = Config.BRAIN_PATH
        hash_count = 0
        if brain_path.exists():
            for entry in brain_path.iterdir():
                if not (entry.is_dir() and self._is_conversation_id(entry.name)):
                    continue
                self._known_sessions.add(entry.name)
                for fpath in self._baseline_candidates(entry):
                    try:
                        content = fpath.read_text(encoding="utf-8")
                    except (OSError, UnicodeDecodeError):
                        # Missing or unreadable: no baseline, so its first
                        # event will be treated as a change.
                        continue
                    self._content_hashes[str(fpath)] = self._hash_content(content)
                    hash_count += 1
        logger.info(
            "Found %d existing sessions, pre-loaded %d content hashes",
            len(self._known_sessions),
            hash_count,
        )

    def _baseline_candidates(self, conv_dir: Path) -> set[Path]:
        """Return the files in *conv_dir* whose hashes should be pre-loaded.

        Covers every name in ``Config.WATCHED_FILES`` plus any top-level file
        matched by ``_is_watched_file`` (i.e. by watched extension), so the
        baseline agrees with what the handler actually reacts to.
        """
        candidates = {conv_dir / name for name in Config.WATCHED_FILES}
        try:
            children = list(conv_dir.iterdir())
        except OSError:
            children = []
        candidates.update(
            child for child in children
            if child.is_file() and self._is_watched_file(child.name)
        )
        return candidates

    # -------------------------------------------------------------- filtering

    def dispatch(self, event: FileSystemEvent):
        """Early filter: skip events for files/dirs we never act on.

        Runs before on_created/on_modified/on_moved. For a move the destination
        is what matters (an atomic save renames a temp file onto the watched
        name), so the filter is applied to ``dest_path`` instead of ``src_path``.
        """
        # Older watchdog versions only define dest_path on moved events; newer
        # ones default it to "" on every event.
        dest = getattr(event, "dest_path", "")
        path = Path(dest if dest else event.src_path)

        if self._in_ignored_subdir(path):
            return
        if not event.is_directory and not self._is_watched_file(path.name):
            return

        super().dispatch(event)

    @staticmethod
    def _in_ignored_subdir(path: Path) -> bool:
        """True if *path* lies in a ``.system_generated`` or ``logs`` directory.

        Only components below ``Config.BRAIN_PATH`` are inspected, so a brain
        path that itself contains e.g. a ``logs`` directory does not cause every
        event to be dropped.
        """
        try:
            parts = path.relative_to(Config.BRAIN_PATH).parts
        except ValueError:
            # Not under the brain directory; such paths are rejected later by
            # _get_conversation_id anyway.
            parts = path.parts
        return '.system_generated' in parts or 'logs' in parts

    def _is_conversation_id(self, name: str) -> bool:
        """Heuristic: five hyphen-separated groups, each at least 4 characters."""
        parts = name.split("-")
        return len(parts) == 5 and all(len(p) >= 4 for p in parts)

    def _get_conversation_id(self, path: Path) -> str | None:
        """Return the conversation directory name *path* belongs to, or None."""
        brain_path = Config.BRAIN_PATH
        try:
            relative = path.relative_to(brain_path)
        except ValueError:
            return None
        parts = relative.parts
        if parts and self._is_conversation_id(parts[0]):
            return parts[0]
        return None

    def _is_watched_file(self, file_name: str) -> bool:
        """Filter: watch primary artifact files + any file matching watched extensions."""
        if file_name in Config.WATCHED_FILES:
            return True
        # Extension-based matching (e.g., any .md file in conversation dir)
        ext = Path(file_name).suffix
        return bool(ext) and ext in Config.WATCHED_EXTENSIONS

    # ---------------------------------------------------------------- dedup

    @staticmethod
    def _hash_content(content: str) -> str:
        """Return the MD5 hex digest of *content* encoded as UTF-8."""
        # Used only for change detection; usedforsecurity=False keeps md5
        # available on FIPS-restricted OpenSSL builds.
        return hashlib.md5(content.encode(), usedforsecurity=False).hexdigest()

    def _content_changed(self, path_str: str, content: str) -> bool:
        """Check if content actually changed using an MD5 hash; record the new hash."""
        new_hash = self._hash_content(content)
        with self._lock:
            if self._content_hashes.get(path_str) == new_hash:
                return False
            self._content_hashes[path_str] = new_hash
        return True

    # ---------------------------------------------------------------- output

    def _emit(self, event: BrainEvent):
        """Hand *event* to the asyncio loop thread; drop it if the loop is closed."""
        try:
            self.loop.call_soon_threadsafe(self.event_queue.put_nowait, event)
        except RuntimeError:
            # call_soon_threadsafe raises once the loop is closed (e.g. a
            # debounce timer firing during shutdown); nothing can consume it.
            logger.debug("Event loop closed; dropping %s event", event.event_type.value)

    # ------------------------------------------------------ watchdog callbacks

    def on_created(self, event: FileSystemEvent):
        if event.is_directory:
            self._handle_directory_created(Path(event.src_path))
        else:
            self._handle_file_event(Path(event.src_path), EventType.FILE_CREATED)

    def on_modified(self, event: FileSystemEvent):
        if not event.is_directory:
            self._handle_file_event(Path(event.src_path), EventType.FILE_CHANGED)

    def on_moved(self, event: FileSystemEvent):
        """Treat a rename onto a watched path like a write to that path.

        Atomic saves (write a temp file, rename it over the target) typically
        surface only as a moved event for the target name, with no
        created/modified event for it.
        """
        dest = getattr(event, "dest_path", "")
        if not dest:
            return
        dest_path = Path(dest)
        if event.is_directory:
            # A conversation directory renamed into place is a new session.
            self._handle_directory_created(dest_path)
            return
        with self._lock:
            known = str(dest_path) in self._content_hashes
        self._handle_file_event(
            dest_path, EventType.FILE_CHANGED if known else EventType.FILE_CREATED
        )

    # --------------------------------------------------------------- handlers

    def _handle_directory_created(self, path: Path):
        """Emit SESSION_START the first time a conversation dir appears in BRAIN_PATH."""
        conv_id = self._get_conversation_id(path)
        if not conv_id or conv_id in self._known_sessions:
            return
        # Only the conversation directory itself counts, not its subdirectories.
        if path.parent != Config.BRAIN_PATH:
            return
        self._known_sessions.add(conv_id)
        logger.info("New session detected: %s", conv_id)
        self._emit(BrainEvent(
            event_type=EventType.SESSION_START,
            conversation_id=conv_id,
        ))

    def _handle_file_event(self, path: Path, event_type: EventType):
        """Validate a file event and open (or extend) its debounce window."""
        conv_id = self._get_conversation_id(path)
        if not conv_id:
            return
        # Defensive re-checks: dispatch() already applies these filters, but
        # this method does not rely on having been reached through it.
        if self._in_ignored_subdir(path) or not self._is_watched_file(path.name):
            return
        self._schedule_flush(path, conv_id, event_type)

    def _schedule_flush(self, path: Path, conv_id: str, event_type: EventType) -> None:
        """Open a debounce window for *path*, or fold this event into the open one.

        Only the first event of a burst arms a timer, so a file written more
        often than DEBOUNCE_SECONDS is still flushed once per window rather than
        being postponed indefinitely.
        """
        key = str(path)
        with self._lock:
            pending = self._pending.get(key)
            if pending is not None:
                # A creation anywhere in the burst makes the whole burst a creation.
                if event_type is EventType.FILE_CREATED:
                    self._pending[key] = event_type
                return
            self._pending[key] = event_type
        timer = threading.Timer(
            Config.DEBOUNCE_SECONDS, self._flush, args=(path, conv_id)
        )
        timer.daemon = True  # never keep the process alive just to flush
        timer.start()

    def _flush(self, path: Path, conv_id: str) -> None:
        """Close the debounce window for *path*: read it once and emit if changed."""
        key = str(path)
        with self._lock:
            # Popping first lets events arriving during the read open a new window.
            event_type = self._pending.pop(key, None)
        if event_type is None:
            return

        try:
            content = path.read_text(encoding="utf-8")
        except (OSError, UnicodeDecodeError) as e:
            logger.warning("Failed to read %s: %s", path, e)
            return

        # Content hash dedup: skip if content hasn't actually changed
        if not self._content_changed(key, content):
            return

        logger.info("File event: %s %s/%s", event_type.value, conv_id[:8], path.name)
        self._emit(BrainEvent(
            event_type=event_type,
            conversation_id=conv_id,
            file_name=path.name,
            file_path=path,
            content=content,
        ))
|
|
|
|
|
|
class BrainWatcher:
    """Owns the watchdog observer that feeds a BrainEventHandler for Config.BRAIN_PATH."""

    def __init__(self, event_queue: asyncio.Queue, loop: asyncio.AbstractEventLoop):
        self.event_queue = event_queue
        self.loop = loop
        self.observer = Observer()
        # Constructing the handler scans BRAIN_PATH for existing sessions.
        self.handler = BrainEventHandler(event_queue, loop)

    def start(self):
        """Watch BRAIN_PATH recursively; log an error and do nothing if it is missing."""
        brain_path = Config.BRAIN_PATH
        if not brain_path.exists():
            logger.error("Brain path does not exist: %s", brain_path)
            return

        self.observer.schedule(self.handler, str(brain_path), recursive=True)
        self.observer.start()
        logger.info("Watching brain directory: %s", brain_path)

    def stop(self):
        """Stop the observer and wait for its thread to exit.

        Safe to call even when start() returned early because BRAIN_PATH was
        missing: the observer thread was then never started, and joining an
        unstarted thread raises RuntimeError.
        """
        self.observer.stop()
        if self.observer.is_alive():
            self.observer.join()
        logger.info("Brain watcher stopped")

    @property
    def known_sessions(self) -> set[str]:
        """Conversation IDs seen so far (the handler's live set, not a copy)."""
        return self.handler._known_sessions
|