"""Bridge protocol — file-based communication between Discord bot and Antigravity. Bridge directory: ~/.gemini/antigravity/bridge/ Structure: bridge/ pending/ ← Bot writes approval requests for Discord response/ ← Bot writes user responses from Discord commands/ ← Bot writes user text input from Discord Protocol: 1. VS Code Extension detects pending approval → writes JSON to pending/ 2. Bot reads pending/ → sends Discord message with ✅/❌ buttons 3. User clicks button → Bot writes JSON to response/ 4. VS Code Extension reads response/ → executes action """ import json import time import logging from pathlib import Path from dataclasses import dataclass, asdict from enum import Enum from config import Config logger = logging.getLogger(__name__) class ApprovalStatus(Enum): PENDING = "pending" APPROVED = "approved" REJECTED = "rejected" TIMEOUT = "timeout" @dataclass class ApprovalRequest: """An approval request from Antigravity.""" request_id: str conversation_id: str command: str # The command/action needing approval description: str # Human-readable description timestamp: float status: str = "pending" discord_message_id: int = 0 project_name: str = "" # Project routing key @dataclass class UserResponse: """A user response from Discord.""" request_id: str approved: bool user_input: str = "" timestamp: float = 0 button_index: int = -1 # -1 = legacy (approve/reject), 0+ = specific button index step_type: str = "" # pass through from pending for extension routing class BridgeProtocol: """Manages the file-based bridge protocol.""" def __init__(self): self.bridge_dir = Config.BRAIN_PATH.parent / "bridge" self.pending_dir = self.bridge_dir / "pending" self.response_dir = self.bridge_dir / "response" self.commands_dir = self.bridge_dir / "commands" # Create directories for d in [self.pending_dir, self.response_dir, self.commands_dir]: d.mkdir(parents=True, exist_ok=True) # Startup cleanup: purge stale pending files (> 5 min old) self._cleanup_stale_pending() logger.info(f"Bridge protocol initialized: {self.bridge_dir}") def _cleanup_stale_pending(self, max_age_seconds: int = 300): """Remove pending files older than max_age_seconds on startup.""" now = time.time() cleaned = 0 for f in self.pending_dir.glob("*.json"): try: data = json.loads(f.read_text(encoding="utf-8-sig")) ts = data.get("timestamp", 0) if now - ts > max_age_seconds: f.unlink() cleaned += 1 except (json.JSONDecodeError, OSError): f.unlink() # corrupt file, remove cleaned += 1 if cleaned: logger.info(f"Startup cleanup: removed {cleaned} stale pending files") def get_pending_requests(self) -> list[ApprovalRequest]: """Read all pending approval requests. Skips files older than 30 minutes.""" requests = [] fields = {f.name for f in ApprovalRequest.__dataclass_fields__.values()} now = time.time() MAX_AGE = 1800 # 30 minutes (matches Discord button timeout) for f in self.pending_dir.glob("*.json"): try: data = json.loads(f.read_text(encoding="utf-8-sig")) ts = data.get("timestamp", 0) if now - ts > MAX_AGE: # Too old — mark expired and skip data["status"] = "expired" f.write_text( json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8", ) continue if data.get("status") == "pending": # Filter to known fields only filtered = {k: v for k, v in data.items() if k in fields} requests.append(ApprovalRequest(**filtered)) except (json.JSONDecodeError, TypeError, OSError) as e: logger.warning(f"Bad pending request {f.name}: {e}") return requests def read_pending_request(self, request_id: str) -> ApprovalRequest | None: """Re-read a specific pending request (to get merged data).""" f = self.pending_dir / f"{request_id}.json" if not f.exists(): return None try: data = json.loads(f.read_text(encoding="utf-8-sig")) fields = {fn.name for fn in ApprovalRequest.__dataclass_fields__.values()} filtered = {k: v for k, v in data.items() if k in fields} return ApprovalRequest(**filtered) except (json.JSONDecodeError, TypeError, OSError): return None def write_response(self, response: UserResponse): """Write a user response to the response directory.""" response.timestamp = time.time() filename = f"{response.request_id}.json" filepath = self.response_dir / filename filepath.write_text( json.dumps(asdict(response), ensure_ascii=False, indent=2), encoding="utf-8" ) logger.info(f"Response written: {filename} (approved={response.approved})") # Delete pending file after processing (prevents re-processing and accumulation) pending_file = self.pending_dir / filename if pending_file.exists(): try: pending_file.unlink() except OSError: pass def write_command(self, conversation_id: str, text: str, *, project_name: str = ""): """Write a user text command for Antigravity to consume.""" cmd_id = f"{int(time.time() * 1000)}" filepath = self.commands_dir / f"{cmd_id}.json" data = { "id": cmd_id, "conversation_id": conversation_id, "project_name": project_name, "text": text, "timestamp": time.time(), "consumed": False, } filepath.write_text( json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8" ) logger.info(f"Command written: {cmd_id} → project={project_name}") return cmd_id