# File-viewer metadata (not part of the module): 174 lines, 6.2 KiB, Python.
"""Bridge protocol — file-based communication between Discord bot and Antigravity.
|
|
|
|
Bridge directory: ~/.gemini/antigravity/bridge/
|
|
Structure:
|
|
bridge/
|
|
pending/ ← Bot writes approval requests for Discord
|
|
response/ ← Bot writes user responses from Discord
|
|
commands/ ← Bot writes user text input from Discord
|
|
|
|
Protocol:
|
|
1. VS Code Extension detects pending approval → writes JSON to pending/
|
|
2. Bot reads pending/ → sends Discord message with ✅/❌ buttons
|
|
3. User clicks button → Bot writes JSON to response/
|
|
4. VS Code Extension reads response/ → executes action
|
|
"""
|
|
|
|
import json
import logging
import time
from dataclasses import asdict, dataclass, fields
from enum import Enum
from pathlib import Path

from config import Config

logger = logging.getLogger(__name__)


class ApprovalStatus(Enum):
    """Status values an approval request can carry.

    Each member's value is the lowercase string form of the status, matching
    the strings this module stores in a request's ``status`` field
    (``"pending"`` is the ``ApprovalRequest`` default).
    """

    PENDING = "pending"
    APPROVED = "approved"
    REJECTED = "rejected"
    TIMEOUT = "timeout"
    # BridgeProtocol.get_pending_requests() writes this status into pending
    # files that are older than the maximum age, so it needs a member too.
    EXPIRED = "expired"


@dataclass
class ApprovalRequest:
    """An approval request from Antigravity.

    Instances are built from the JSON files found in the bridge's
    ``pending/`` directory; ``request_id`` doubles as the file stem
    (``<request_id>.json``).
    """

    request_id: str
    conversation_id: str
    # The command/action needing approval.
    command: str
    # Human-readable description of that command/action.
    description: str
    # Compared against time.time() to decide whether the request is stale.
    timestamp: float
    status: str = "pending"
    # NOTE(review): presumably the ID of the Discord message that shows this
    # request; 0 looks like "not sent yet" — confirm against the bot code.
    discord_message_id: int = 0
    # Project routing key.
    project_name: str = ""


@dataclass
class UserResponse:
    """A user response from Discord.

    Serialised with ``dataclasses.asdict`` and written to the bridge's
    ``response/`` directory as ``<request_id>.json``.
    """

    request_id: str
    approved: bool
    user_input: str = ""
    # Overwritten with the current time when the response is written out.
    timestamp: float = 0
    # -1 = legacy (approve/reject), 0+ = specific button index.
    button_index: int = -1
    # Passed through from the pending request for extension routing.
    step_type: str = ""


class BridgeProtocol:
    """Manages the file-based bridge protocol.

    The bridge lives in ``Config.BRAIN_PATH.parent / "bridge"`` and consists
    of three directories of ``*.json`` files:

    * ``pending/``  – approval requests, read (and expired/purged) here.
    * ``response/`` – user decisions, written by :meth:`write_response`.
    * ``commands/`` – free-text user commands, written by :meth:`write_command`.
    """

    # Pending requests older than this many seconds are considered stale.
    MAX_PENDING_AGE_SECONDS = 300

    def __init__(self):
        """Create the bridge directories and purge stale pending files."""
        self.bridge_dir = Config.BRAIN_PATH.parent / "bridge"
        self.pending_dir = self.bridge_dir / "pending"
        self.response_dir = self.bridge_dir / "response"
        self.commands_dir = self.bridge_dir / "commands"

        # Create directories
        for d in (self.pending_dir, self.response_dir, self.commands_dir):
            d.mkdir(parents=True, exist_ok=True)

        # Startup cleanup: purge stale pending files (> 5 min old)
        self._cleanup_stale_pending()

        logger.info(f"Bridge protocol initialized: {self.bridge_dir}")

    @staticmethod
    def _read_json_object(path: Path) -> dict:
        """Read *path* and return its content as a JSON object (dict).

        Raises:
            OSError: the file cannot be read.
            ValueError: the content is not valid UTF-8 or not valid JSON
                (``UnicodeDecodeError`` and ``json.JSONDecodeError`` are
                both ``ValueError`` subclasses).
            TypeError: the JSON document is valid but is not an object.
        """
        # "utf-8-sig" transparently drops a leading BOM if the writer added one.
        data = json.loads(path.read_text(encoding="utf-8-sig"))
        if not isinstance(data, dict):
            raise TypeError(f"expected a JSON object, got {type(data).__name__}")
        return data

    @staticmethod
    def _request_from_dict(data: dict) -> ApprovalRequest:
        """Build an ApprovalRequest from *data*, ignoring unknown keys.

        Raises:
            TypeError: a required ApprovalRequest field is missing.
        """
        known = {f.name for f in fields(ApprovalRequest)}
        return ApprovalRequest(**{k: v for k, v in data.items() if k in known})

    def _cleanup_stale_pending(self, max_age_seconds: int = MAX_PENDING_AGE_SECONDS):
        """Remove pending files older than max_age_seconds on startup.

        Files that cannot be read or parsed (including a non-object document
        or a non-numeric ``timestamp``) are treated as corrupt and removed as
        well. A file that cannot be deleted is logged and skipped, so one bad
        file never aborts initialisation.
        """
        now = time.time()
        cleaned = 0
        for path in self.pending_dir.glob("*.json"):
            try:
                data = self._read_json_object(path)
                is_stale = now - data.get("timestamp", 0) > max_age_seconds
            except (ValueError, TypeError, OSError):
                is_stale = True  # corrupt file, remove

            if not is_stale:
                continue

            try:
                # missing_ok: the file may have been removed by the other
                # side between the glob and this call.
                path.unlink(missing_ok=True)
            except OSError as e:
                logger.warning(f"Could not remove stale pending file {path.name}: {e}")
                continue
            cleaned += 1

        if cleaned:
            logger.info(f"Startup cleanup: removed {cleaned} stale pending files")

    def get_pending_requests(self) -> list[ApprovalRequest]:
        """Read all pending approval requests. Skips files older than 5 minutes.

        Files older than ``MAX_PENDING_AGE_SECONDS`` are marked
        ``"status": "expired"`` on disk (once) and skipped. Unreadable or
        malformed files are logged and skipped.

        Returns:
            One ApprovalRequest per fresh file whose status is ``"pending"``.
        """
        requests = []
        now = time.time()
        for path in self.pending_dir.glob("*.json"):
            try:
                data = self._read_json_object(path)
                if now - data.get("timestamp", 0) > self.MAX_PENDING_AGE_SECONDS:
                    # Too old — mark expired and skip. Only rewrite the file
                    # the first time, not on every poll.
                    if data.get("status") != "expired":
                        data["status"] = "expired"
                        path.write_text(
                            json.dumps(data, ensure_ascii=False, indent=2),
                            encoding="utf-8",
                        )
                    continue
                if data.get("status") == "pending":
                    requests.append(self._request_from_dict(data))
            except (ValueError, TypeError, OSError) as e:
                logger.warning(f"Bad pending request {path.name}: {e}")
        return requests

    def read_pending_request(self, request_id: str) -> ApprovalRequest | None:
        """Re-read a specific pending request (to get merged data).

        Returns:
            The request parsed from ``pending/<request_id>.json``, or None if
            the file is missing, unreadable, or malformed.
        """
        path = self.pending_dir / f"{request_id}.json"
        try:
            # A missing file raises FileNotFoundError (an OSError) → None.
            return self._request_from_dict(self._read_json_object(path))
        except (ValueError, TypeError, OSError):
            return None

    def write_response(self, response: UserResponse):
        """Write a user response to the response directory.

        Stamps ``response.timestamp`` with the current time, writes the
        response to ``response/<request_id>.json``, then deletes the matching
        pending file so the request is not processed again.
        """
        response.timestamp = time.time()
        filename = f"{response.request_id}.json"

        (self.response_dir / filename).write_text(
            json.dumps(asdict(response), ensure_ascii=False, indent=2),
            encoding="utf-8",
        )
        logger.info(f"Response written: {filename} (approved={response.approved})")

        # Delete pending file after processing (prevents re-processing and
        # accumulation). Best effort: a failure is logged, not raised, because
        # the response itself has already been written.
        try:
            (self.pending_dir / filename).unlink(missing_ok=True)
        except OSError as e:
            logger.warning(f"Could not remove pending file {filename}: {e}")

    def write_command(self, conversation_id: str, text: str, *, project_name: str = "") -> str:
        """Write a user text command for Antigravity to consume.

        The command id is the current time in milliseconds. If a file with
        that id already exists (two commands in the same millisecond), the id
        is incremented until a free one is found, so no command is overwritten.

        Returns:
            The id of the command; the file is ``commands/<id>.json``.
        """
        now = time.time()
        candidate = int(now * 1000)

        while True:
            cmd_id = f"{candidate}"
            data = {
                "id": cmd_id,
                "conversation_id": conversation_id,
                "project_name": project_name,
                "text": text,
                "timestamp": now,
                "consumed": False,
            }
            try:
                # Mode "x" fails if the file exists instead of truncating it.
                with (self.commands_dir / f"{cmd_id}.json").open("x", encoding="utf-8") as fh:
                    fh.write(json.dumps(data, ensure_ascii=False, indent=2))
                break
            except FileExistsError:
                candidate += 1

        logger.info(f"Command written: {cmd_id} → project={project_name}")
        return cmd_id