Files
gravity_control/watcher.py

217 lines
7.7 KiB
Python

"""Brain directory watcher — monitors Antigravity's brain/ for file changes.
Uses watchdog to detect file creation/modification events in the brain directory.
Emits events to an asyncio queue for the Discord bot to consume.
Key design: ONLY emits events for meaningful content changes using hash dedup.
"""
import asyncio
import hashlib
import logging
import threading
import time
from dataclasses import dataclass, field
from enum import Enum
from pathlib import Path

from watchdog.events import FileSystemEventHandler, FileSystemEvent
from watchdog.observers import Observer

from config import Config
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class EventType(Enum):
    """Kinds of event the brain watcher reports to its consumer."""

    # A new conversation directory appeared directly under the brain path.
    SESSION_START = "session_start"
    # A watched file was modified.
    FILE_CHANGED = "file_changed"
    # A watched file was created.
    FILE_CREATED = "file_created"
@dataclass
class BrainEvent:
    """An event from the brain directory.

    Session-start events set only ``event_type`` and ``conversation_id``;
    file events additionally carry the file's name, path and full text.
    """
    event_type: EventType
    conversation_id: str
    file_name: str = ""
    # None when the event is not about a specific file (e.g. SESSION_START).
    file_path: Path | None = None
    content: str = ""
    # Wall-clock creation time of the event (time.time()), filled in automatically.
    timestamp: float = field(default_factory=time.time)
class BrainEventHandler(FileSystemEventHandler):
    """Watchdog handler that filters, debounces, and deduplicates brain events.

    Threading: watchdog calls the ``on_*`` hooks on its observer thread and
    trailing re-checks run on ``threading.Timer`` threads, so the debounce and
    hash bookkeeping is guarded by ``self._lock``. Events reach the asyncio
    side only through ``loop.call_soon_threadsafe``.

    Debounce: the first event for a file is processed immediately. Events
    arriving within ``Config.DEBOUNCE_SECONDS`` of it are not dropped outright;
    they are collapsed into a single re-read after the window. This ensures the
    final content of a burst of writes (e.g. truncate-then-write) is emitted.
    """

    def __init__(self, event_queue: asyncio.Queue, loop: asyncio.AbstractEventLoop):
        super().__init__()
        self.event_queue = event_queue
        self.loop = loop
        self._lock = threading.Lock()  # guards the three dicts below
        self._last_events: dict[str, float] = {}  # path -> monotonic time last processed (debounce)
        self._content_hashes: dict[str, str] = {}  # path -> md5 hash of last seen content (dedup)
        # path -> (timer, event type) for a queued trailing re-check
        self._pending_rechecks: dict[str, tuple[threading.Timer, EventType]] = {}
        self._known_sessions: set[str] = set()
        self._initialize_known_sessions()

    # ---- startup baseline ------------------------------------------------

    def _initialize_known_sessions(self):
        """Record existing sessions and hash existing watched files (no events emitted).

        Every file that live events would accept is hashed: any depth inside a
        conversation directory, matched by name or extension, and outside
        ``.system_generated``. This way, touching a pre-existing file without
        changing it does not produce a spurious event.
        """
        brain_path = Config.BRAIN_PATH
        hash_count = 0
        if brain_path.exists():
            for entry in brain_path.iterdir():
                if not (entry.is_dir() and self._is_conversation_id(entry.name)):
                    continue
                self._known_sessions.add(entry.name)
                for fpath in self._iter_watched_files(entry):
                    try:
                        content = fpath.read_text(encoding="utf-8")
                    except (OSError, UnicodeDecodeError):
                        # Unreadable right now; its first later event is treated as new.
                        continue
                    self._content_hashes[str(fpath)] = self._hash_content(content)
                    hash_count += 1
        logger.info(
            f"Found {len(self._known_sessions)} existing sessions, "
            f"pre-loaded {hash_count} content hashes"
        )

    def _iter_watched_files(self, conv_dir: Path):
        """Yield files under *conv_dir* that pass the same filters as live file events."""
        try:
            candidates = list(conv_dir.rglob("*"))
        except OSError as e:
            logger.warning(f"Failed to scan {conv_dir}: {e}")
            return
        for fpath in candidates:
            if '.system_generated' in fpath.relative_to(conv_dir).parts:
                continue
            # Cheap name check first; is_file() costs a stat call.
            if self._is_watched_file(fpath.name) and fpath.is_file():
                yield fpath

    # ---- path classification --------------------------------------------

    def _is_conversation_id(self, name: str) -> bool:
        """Heuristic ID check: exactly five dash-separated parts, each at least 4 chars."""
        parts = name.split("-")
        return len(parts) == 5 and all(len(p) >= 4 for p in parts)

    def _get_conversation_id(self, path: Path) -> str | None:
        """Return the conversation directory name *path* lies under, or None."""
        try:
            relative = path.relative_to(Config.BRAIN_PATH)
        except ValueError:
            return None
        parts = relative.parts
        if parts and self._is_conversation_id(parts[0]):
            return parts[0]
        return None

    def _is_system_generated(self, path: Path, conv_id: str) -> bool:
        """True if *path* is inside the conversation's .system_generated subtree (AG internal logs)."""
        try:
            relative = path.relative_to(Config.BRAIN_PATH / conv_id)
        except ValueError:
            return False
        return '.system_generated' in relative.parts

    def _is_watched_file(self, file_name: str) -> bool:
        """Filter: watch primary artifact files + any file matching watched extensions."""
        if file_name in Config.WATCHED_FILES:
            return True
        # Extension-based matching (e.g., any .md file in conversation dir)
        ext = Path(file_name).suffix
        if ext and ext in Config.WATCHED_EXTENSIONS:
            return True
        return False

    # ---- debounce / dedup state (caller holds self._lock) -----------------

    @staticmethod
    def _hash_content(content: str) -> str:
        """Return the MD5 hex digest used for change detection (not a security use)."""
        return hashlib.md5(content.encode(), usedforsecurity=False).hexdigest()

    def _should_debounce(self, path_str: str) -> bool:
        """Return True if *path_str* was processed within the debounce window.

        Otherwise record "now" as its last-processed time and return False.
        Caller must hold ``self._lock``.
        """
        now = time.monotonic()  # immune to wall-clock adjustments
        last = self._last_events.get(path_str)
        if last is not None and now - last < Config.DEBOUNCE_SECONDS:
            return True
        self._last_events[path_str] = now
        return False

    def _content_changed(self, path_str: str, content: str) -> bool:
        """Return True (and remember the new hash) if *content* differs from the last seen.

        Caller must hold ``self._lock``.
        """
        new_hash = self._hash_content(content)
        if self._content_hashes.get(path_str) == new_hash:
            return False
        self._content_hashes[path_str] = new_hash
        return True

    def _emit(self, event: BrainEvent):
        """Hand *event* to the asyncio queue from any thread."""
        try:
            self.loop.call_soon_threadsafe(self.event_queue.put_nowait, event)
        except RuntimeError:
            # call_soon_threadsafe raises once the loop is closed, e.g. when a
            # late re-check timer fires during shutdown; drop the event.
            logger.debug(f"Event loop closed; dropping {event.event_type.value} event")

    # ---- watchdog hooks (observer thread) ---------------------------------

    def on_created(self, event: FileSystemEvent):
        if event.is_directory:
            self._handle_directory_created(Path(event.src_path))
        else:
            self._handle_file_event(Path(event.src_path), EventType.FILE_CREATED)

    def on_modified(self, event: FileSystemEvent):
        if not event.is_directory:
            self._handle_file_event(Path(event.src_path), EventType.FILE_CHANGED)

    def on_moved(self, event: FileSystemEvent):
        """Treat a rename onto a watched path (e.g. an atomic save) as a write to the destination."""
        dest = Path(event.dest_path)
        if event.is_directory:
            self._handle_directory_created(dest)
            return
        with self._lock:
            seen_before = str(dest) in self._content_hashes
        self._handle_file_event(
            dest, EventType.FILE_CHANGED if seen_before else EventType.FILE_CREATED
        )

    def _handle_directory_created(self, path: Path):
        """Emit SESSION_START for a new, previously unknown conversation directory."""
        conv_id = self._get_conversation_id(path)
        if not conv_id or conv_id in self._known_sessions:
            return
        # Only the conversation directory itself (directly under brain/) starts a session.
        if path.parent == Config.BRAIN_PATH:
            self._known_sessions.add(conv_id)
            logger.info(f"New session detected: {conv_id}")
            self._emit(BrainEvent(
                event_type=EventType.SESSION_START,
                conversation_id=conv_id,
            ))

    def _handle_file_event(self, path: Path, event_type: EventType):
        """Filter a raw file event, then process it now or fold it into a trailing re-check."""
        conv_id = self._get_conversation_id(path)
        if not conv_id:
            return
        # Exclude files in .system_generated subdirectory (AG internal logs)
        if self._is_system_generated(path, conv_id):
            return
        # Filter: watched files by name or extension
        if not self._is_watched_file(path.name):
            return

        path_str = str(path)
        with self._lock:
            if self._should_debounce(path_str):
                # Too soon after the last read: re-read once the window passes
                # instead of discarding this (possibly final) write.
                self._schedule_recheck(path, event_type)
                return
            # Processing now supersedes any re-check still queued for this file.
            pending = self._pending_rechecks.pop(path_str, None)
        if pending is not None:
            pending[0].cancel()
            if pending[1] is EventType.FILE_CREATED:
                event_type = EventType.FILE_CREATED
        self._process(path, conv_id, event_type)

    # ---- trailing re-check (timer threads) --------------------------------

    def _schedule_recheck(self, path: Path, event_type: EventType):
        """Queue at most one delayed re-read of *path*. Caller must hold ``self._lock``."""
        path_str = str(path)
        pending = self._pending_rechecks.get(path_str)
        if pending is not None:
            # Already queued; just remember if a creation was among the suppressed events.
            if event_type is EventType.FILE_CREATED:
                self._pending_rechecks[path_str] = (pending[0], event_type)
            return
        timer = threading.Timer(Config.DEBOUNCE_SECONDS, self._run_recheck, args=(path,))
        timer.daemon = True  # never keep the process alive at shutdown
        self._pending_rechecks[path_str] = (timer, event_type)
        timer.start()

    def _run_recheck(self, path: Path):
        """Timer callback: re-read *path* unless its re-check was cancelled meanwhile."""
        path_str = str(path)
        with self._lock:
            pending = self._pending_rechecks.pop(path_str, None)
            if pending is None:
                return  # superseded by an immediate processing in _handle_file_event
            event_type = pending[1]
            # Count this re-read as processing, so writes that are still arriving get
            # debounced (and re-checked) again.
            self._last_events[path_str] = time.monotonic()
        conv_id = self._get_conversation_id(path)
        if conv_id:
            self._process(path, conv_id, event_type)

    def _process(self, path: Path, conv_id: str, event_type: EventType):
        """Read *path* and emit an event if its content changed since it was last seen.

        The read, hash comparison and emit all happen under the lock. Concurrent
        callers (observer thread vs. re-check timers) therefore emit contents
        in the order they were read, and an older version is never emitted last.
        """
        path_str = str(path)
        file_name = path.name
        with self._lock:
            # Read file content
            try:
                content = path.read_text(encoding="utf-8")
            except (OSError, UnicodeDecodeError) as e:
                logger.warning(f"Failed to read {path}: {e}")
                return
            # Content hash dedup: skip if content hasn't actually changed
            if not self._content_changed(path_str, content):
                return
            logger.info(f"File event: {event_type.value} {conv_id[:8]}/{file_name}")
            self._emit(BrainEvent(
                event_type=event_type,
                conversation_id=conv_id,
                file_name=file_name,
                file_path=path,
                content=content,
            ))
class BrainWatcher:
    """Manages the watchdog observer for the brain directory.

    Owns one ``Observer``, scheduled recursively on ``Config.BRAIN_PATH``.
    Its handler, a ``BrainEventHandler``, puts ``BrainEvent`` objects onto
    ``event_queue`` via ``loop``.
    """

    def __init__(self, event_queue: asyncio.Queue, loop: asyncio.AbstractEventLoop):
        self.event_queue = event_queue
        self.loop = loop
        self.observer = Observer()
        # Constructing the handler scans the brain directory to build its baseline.
        self.handler = BrainEventHandler(event_queue, loop)

    def start(self):
        """Start watching; logs an error and does nothing if the brain path is missing."""
        brain_path = Config.BRAIN_PATH
        if not brain_path.exists():
            logger.error(f"Brain path does not exist: {brain_path}")
            return
        self.observer.schedule(self.handler, str(brain_path), recursive=True)
        self.observer.start()
        logger.info(f"Watching brain directory: {brain_path}")

    def stop(self):
        """Stop the observer and wait for its thread to finish.

        Safe to call even if ``start`` returned early. Joining a thread that
        was never started raises RuntimeError, so we join only a live observer.
        """
        self.observer.stop()
        if self.observer.is_alive():
            self.observer.join()
        logger.info("Brain watcher stopped")

    @property
    def known_sessions(self) -> set[str]:
        """Conversation IDs known to the handler (the handler's live set, not a copy)."""
        return self.handler._known_sessions