172 lines
6.0 KiB
Python
172 lines
6.0 KiB
Python
"""Brain directory watcher — monitors Antigravity's brain/ for file changes.
|
|
|
|
Uses watchdog to detect file creation/modification events in the brain directory.
|
|
Emits events to an asyncio queue for the Discord bot to consume.
|
|
"""
|
|
|
|
import asyncio
|
|
import time
|
|
import logging
|
|
from pathlib import Path
|
|
from dataclasses import dataclass, field
|
|
from enum import Enum
|
|
from watchdog.observers import Observer
|
|
from watchdog.events import FileSystemEventHandler, FileSystemEvent
|
|
|
|
from config import Config
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class EventType(Enum):
|
|
"""Types of brain events."""
|
|
SESSION_START = "session_start" # New conversation directory created
|
|
SESSION_END = "session_end" # Conversation directory removed (or program exit)
|
|
FILE_CHANGED = "file_changed" # Watched file created/modified
|
|
FILE_CREATED = "file_created" # Watched file first created
|
|
|
|
|
|
@dataclass
|
|
class BrainEvent:
|
|
"""An event from the brain directory."""
|
|
event_type: EventType
|
|
conversation_id: str
|
|
file_name: str = ""
|
|
file_path: Path = None
|
|
content: str = ""
|
|
timestamp: float = field(default_factory=time.time)
|
|
|
|
|
|
class BrainEventHandler(FileSystemEventHandler):
|
|
"""Watchdog handler that filters and debounces brain events."""
|
|
|
|
def __init__(self, event_queue: asyncio.Queue, loop: asyncio.AbstractEventLoop):
|
|
super().__init__()
|
|
self.event_queue = event_queue
|
|
self.loop = loop
|
|
self._last_events: dict[str, float] = {} # path -> timestamp (debounce)
|
|
self._known_sessions: set[str] = set()
|
|
self._initialize_known_sessions()
|
|
|
|
def _initialize_known_sessions(self):
|
|
"""Scan existing brain directories to establish baseline."""
|
|
brain_path = Config.BRAIN_PATH
|
|
if brain_path.exists():
|
|
for entry in brain_path.iterdir():
|
|
if entry.is_dir() and self._is_conversation_id(entry.name):
|
|
self._known_sessions.add(entry.name)
|
|
logger.info(f"Found {len(self._known_sessions)} existing sessions at startup")
|
|
|
|
def _is_conversation_id(self, name: str) -> bool:
|
|
"""Check if directory name looks like a UUID conversation ID."""
|
|
parts = name.split("-")
|
|
return len(parts) == 5 and all(len(p) >= 4 for p in parts)
|
|
|
|
def _get_conversation_id(self, path: Path) -> str | None:
|
|
"""Extract conversation ID from file path."""
|
|
brain_path = Config.BRAIN_PATH
|
|
try:
|
|
relative = path.relative_to(brain_path)
|
|
parts = relative.parts
|
|
if parts and self._is_conversation_id(parts[0]):
|
|
return parts[0]
|
|
except ValueError:
|
|
pass
|
|
return None
|
|
|
|
def _should_debounce(self, path_str: str) -> bool:
|
|
"""Check if this event should be debounced."""
|
|
now = time.time()
|
|
last = self._last_events.get(path_str, 0)
|
|
if now - last < Config.DEBOUNCE_SECONDS:
|
|
return True
|
|
self._last_events[path_str] = now
|
|
return False
|
|
|
|
def _emit(self, event: BrainEvent):
|
|
"""Thread-safe emit to asyncio queue."""
|
|
self.loop.call_soon_threadsafe(self.event_queue.put_nowait, event)
|
|
|
|
def on_created(self, event: FileSystemEvent):
|
|
if event.is_directory:
|
|
self._handle_directory_created(Path(event.src_path))
|
|
else:
|
|
self._handle_file_event(Path(event.src_path), EventType.FILE_CREATED)
|
|
|
|
def on_modified(self, event: FileSystemEvent):
|
|
if not event.is_directory:
|
|
self._handle_file_event(Path(event.src_path), EventType.FILE_CHANGED)
|
|
|
|
def _handle_directory_created(self, path: Path):
|
|
"""Detect new session directories."""
|
|
conv_id = self._get_conversation_id(path)
|
|
if conv_id and conv_id not in self._known_sessions:
|
|
# Check if this is a direct child of brain/
|
|
if path.parent == Config.BRAIN_PATH:
|
|
self._known_sessions.add(conv_id)
|
|
logger.info(f"New session detected: {conv_id}")
|
|
self._emit(BrainEvent(
|
|
event_type=EventType.SESSION_START,
|
|
conversation_id=conv_id,
|
|
))
|
|
|
|
def _handle_file_event(self, path: Path, event_type: EventType):
|
|
"""Process file creation/modification events."""
|
|
conv_id = self._get_conversation_id(path)
|
|
if not conv_id:
|
|
return
|
|
|
|
file_name = path.name
|
|
if file_name not in Config.WATCHED_FILES:
|
|
return
|
|
|
|
if self._should_debounce(str(path)):
|
|
return
|
|
|
|
# Read file content
|
|
try:
|
|
content = path.read_text(encoding="utf-8")
|
|
except (OSError, UnicodeDecodeError) as e:
|
|
logger.warning(f"Failed to read {path}: {e}")
|
|
return
|
|
|
|
logger.info(f"File event: {event_type.value} {conv_id}/{file_name}")
|
|
self._emit(BrainEvent(
|
|
event_type=event_type,
|
|
conversation_id=conv_id,
|
|
file_name=file_name,
|
|
file_path=path,
|
|
content=content,
|
|
))
|
|
|
|
|
|
class BrainWatcher:
|
|
"""Manages the watchdog observer for the brain directory."""
|
|
|
|
def __init__(self, event_queue: asyncio.Queue, loop: asyncio.AbstractEventLoop):
|
|
self.event_queue = event_queue
|
|
self.loop = loop
|
|
self.observer = Observer()
|
|
self.handler = BrainEventHandler(event_queue, loop)
|
|
|
|
def start(self):
|
|
"""Start watching the brain directory."""
|
|
brain_path = Config.BRAIN_PATH
|
|
if not brain_path.exists():
|
|
logger.error(f"Brain path does not exist: {brain_path}")
|
|
return
|
|
|
|
self.observer.schedule(self.handler, str(brain_path), recursive=True)
|
|
self.observer.start()
|
|
logger.info(f"Watching brain directory: {brain_path}")
|
|
|
|
def stop(self):
|
|
"""Stop the watcher."""
|
|
self.observer.stop()
|
|
self.observer.join()
|
|
logger.info("Brain watcher stopped")
|
|
|
|
@property
|
|
def known_sessions(self) -> set[str]:
|
|
return self.handler._known_sessions
|