# Source: gravity_control/bot.py (350 lines, 13 KiB, Python)

"""Discord bot — relays Antigravity brain events to Discord channels.
Dynamic channel management:
- Scans brain/ for active sessions on startup
- Creates `gravity-{project_name}` channels per active session
- Archives channels when sessions become inactive
- Project name extracted from artifact content or short conversation ID
"""
import asyncio
import json
import logging
import re
import time
from datetime import datetime, timezone
from pathlib import Path
import discord
from discord.ext import commands, tasks
from config import Config
from parser import (
parse_task_progress,
md_to_discord_text,
format_task_embed_text,
)
from watcher import BrainEvent, EventType
logger = logging.getLogger(__name__)
# Matches a level-1 Markdown heading and captures the title up to an optional
# " — subtitle" / " - subtitle" separator (a hyphen inside a word is kept).
_H1_TITLE_RE = re.compile(r'^#\s+(.+?)(?:\s+[—\-]+(?:\s.*)?)?\s*$')

# Artifacts whose first heading is tried as the project name, in priority order.
_TITLE_ARTIFACTS = ("task.md", "implementation_plan.md")


def _slugify_channel_name(text: str, max_len: int = 30) -> str:
    """Reduce *text* to a lowercase, hyphen-separated slug of at most *max_len* chars."""
    # Keep ASCII alphanumerics, Hangul syllables, whitespace and hyphens only.
    kept = re.sub(r'[^a-zA-Z0-9가-힣\s\-]', '', text)
    slug = re.sub(r'[\s\-]+', '-', kept.strip()).lower()
    # Truncation may leave a dangling hyphen; strip it on both ends.
    return slug[:max_len].strip('-')


def _title_slug_from_markdown(md_file: Path) -> str:
    """Return a slug built from the first H1 in the first 5 lines of *md_file*, or ""."""
    try:
        head = md_file.read_text(encoding="utf-8").splitlines()[:5]
    except (OSError, UnicodeDecodeError):
        # Missing or unreadable artifact: the caller falls through to the next source.
        return ""
    for line in head:
        match = _H1_TITLE_RE.match(line)
        if match:
            slug = _slugify_channel_name(match.group(1))
            if slug:
                return slug
    return ""


def detect_project_name(conv_dir: Path) -> str:
    """Extract a human-readable project name from conversation artifacts.

    Strategy, first hit wins:
    1. Title (``# Title``) found in the first lines of task.md
    2. Title found in the first lines of implementation_plan.md
    3. First three words of the ``summary`` field of any ``*.metadata.json``
    4. Fallback to the first 8 characters of the conversation directory name

    Args:
        conv_dir: Directory of a single conversation.

    Returns:
        A lowercase, hyphen-separated name of at most 30 characters, or the
        short conversation ID when no artifact yields a usable name.
    """
    for artifact in _TITLE_ARTIFACTS:
        slug = _title_slug_from_markdown(conv_dir / artifact)
        if slug:
            return slug

    # Sorted so the chosen metadata file does not depend on directory order.
    for meta_file in sorted(conv_dir.glob("*.metadata.json")):
        try:
            meta = json.loads(meta_file.read_text(encoding="utf-8"))
        except (OSError, UnicodeDecodeError, json.JSONDecodeError):
            continue
        summary = meta.get("summary") if isinstance(meta, dict) else None
        if not isinstance(summary, str):
            # Malformed metadata (null summary, list payload, ...) is skipped.
            continue
        slug = _slugify_channel_name(" ".join(summary.split()[:3]))
        if slug:
            return slug

    return conv_dir.name[:8]
def is_session_active(conv_dir: Path) -> bool:
    """Check if a session is active based on file modification time.

    A session counts as active when at least one regular file directly inside
    *conv_dir* was modified less than ``Config.ACTIVE_TIMEOUT_SECONDS`` ago.
    Subdirectories are not descended into.

    Args:
        conv_dir: Directory of a single conversation.

    Returns:
        True if a recently modified file exists; False otherwise, including
        when the directory cannot be listed.
    """
    now = time.time()
    threshold = Config.ACTIVE_TIMEOUT_SECONDS
    try:
        # Materialize inside the try: the directory may vanish or be unreadable,
        # and iterdir() may only raise once iteration starts.
        entries = list(conv_dir.iterdir())
    except OSError:
        return False
    for entry in entries:
        try:
            if entry.is_file() and now - entry.stat().st_mtime < threshold:
                return True
        except OSError:
            # File disappeared between listing and stat; ignore it.
            continue
    return False
class GravityBot(commands.Bot):
    """Discord bot for Antigravity session monitoring.

    Consumes ``BrainEvent`` objects from an asyncio queue and relays them to
    per-session text channels inside a dedicated category. Channels are
    created on demand and renamed with a ``closed-`` prefix once their
    session directory goes inactive.
    """

    # Name of the Discord category that groups all session channels.
    SESSION_CATEGORY_NAME = "Antigravity Sessions"

    def __init__(self, event_queue: asyncio.Queue):
        """Create the bot.

        Args:
            event_queue: Queue from which brain events are read.
        """
        intents = discord.Intents.default()
        intents.message_content = True
        intents.guilds = True
        super().__init__(command_prefix="!", intents=intents)
        self.event_queue = event_queue
        # conversation_id -> channel object
        self.session_channels: dict[str, discord.TextChannel] = {}
        # conversation_id -> status message id (to edit in-place)
        self.session_status_messages: dict[str, int] = {}
        # conversation_id -> project name
        self.session_names: dict[str, str] = {}
        # Category for session channels
        self.session_category: discord.CategoryChannel | None = None
        # Guild reference
        self.guild: discord.Guild | None = None
        # Strong reference to the event-processor task (see setup_hook)
        self._event_task: asyncio.Task | None = None

    async def setup_hook(self):
        """Called after login, before processing events."""
        # The event loop keeps only weak references to tasks; hold the task on
        # the instance so it cannot be garbage-collected mid-execution.
        self._event_task = self.loop.create_task(self._process_events())
        self.session_cleanup_loop.start()
        logger.info("Bot setup complete, event processor started")

    async def on_ready(self):
        """Resolve the guild and category, then sync active sessions."""
        logger.info("Bot connected as %s (ID: %s)", self.user, self.user.id)
        # Get guild
        self.guild = self.get_guild(Config.DISCORD_GUILD_ID)
        if not self.guild:
            logger.error("Guild %s not found!", Config.DISCORD_GUILD_ID)
            return
        # Find or create session category
        category_name = self.SESSION_CATEGORY_NAME
        self.session_category = discord.utils.get(
            self.guild.categories, name=category_name
        )
        if not self.session_category:
            try:
                self.session_category = await self.guild.create_category(
                    category_name
                )
                logger.info("Created category: %s", category_name)
            except discord.errors.Forbidden:
                logger.error("No permission to create category!")
                return
        # Sync existing active sessions
        await self._sync_active_sessions()

    async def _sync_active_sessions(self):
        """Scan brain/ and create channels for currently active sessions."""
        brain_path = Config.BRAIN_PATH
        if not brain_path.exists():
            return
        active_count = 0
        for entry in brain_path.iterdir():
            if not (entry.is_dir() and self._is_conversation_id(entry.name)):
                continue
            if not is_session_active(entry):
                continue
            project_name = detect_project_name(entry)
            channel = await self._ensure_channel(entry.name, project_name)
            # Count only sessions that actually ended up with a channel.
            if channel is not None:
                active_count += 1
        logger.info("Synced %s active sessions on startup", active_count)

    def _is_conversation_id(self, name: str) -> bool:
        """Check if directory name looks like a UUID.

        This is a loose shape check: five hyphen-separated parts, each at
        least four characters long.
        """
        parts = name.split("-")
        return len(parts) == 5 and all(len(p) >= 4 for p in parts)

    async def _ensure_channel(
        self, conversation_id: str, project_name: str
    ) -> discord.TextChannel | None:
        """Get or create a Discord channel for a session.

        Args:
            conversation_id: Session identifier; stored in the channel topic
                so the channel can be found again after a restart.
            project_name: Name used for the channel and the welcome embed.

        Returns:
            The channel, or None when the guild is not resolved yet or the
            channel could not be created.
        """
        # Check if channel already exists
        cached = self.session_channels.get(conversation_id)
        if cached is not None:
            return cached

        if self.guild is None:
            # on_ready has not resolved the guild (yet); nothing can be created.
            logger.warning(
                "Guild not available, cannot create channel for %s",
                conversation_id[:8],
            )
            return None

        channel_name = f"{Config.CHANNEL_PREFIX}-{project_name}"

        # Check if channel already exists in category (from previous run)
        if self.session_category:
            for ch in self.session_category.text_channels:
                if ch.topic and conversation_id in ch.topic:
                    self.session_channels[conversation_id] = ch
                    self.session_names[conversation_id] = project_name
                    logger.info("Reconnected to existing channel #%s", ch.name)
                    return ch

        # Create new channel
        try:
            channel = await self.guild.create_text_channel(
                name=channel_name,
                category=self.session_category,
                topic=f"Antigravity Session: {conversation_id}",
            )
        except discord.errors.Forbidden:
            logger.error("No permission to create channel: %s", channel_name)
            return None
        except discord.HTTPException as e:
            # e.g. invalid name or channel limit reached; do not abort the caller.
            logger.error("Failed to create channel %s: %s", channel_name, e)
            return None

        self.session_channels[conversation_id] = channel
        self.session_names[conversation_id] = project_name
        logger.info("Created channel #%s", channel_name)

        # Welcome embed; a failure here must not hide the freshly created channel.
        embed = discord.Embed(
            title=f"🚀 {project_name}",
            description=(
                f"Antigravity 세션 연결됨\n"
                f"Session: `{conversation_id}`"
            ),
            color=discord.Color.blue(),
            timestamp=datetime.now(timezone.utc),
        )
        try:
            await channel.send(embed=embed)
        except discord.HTTPException as e:
            logger.warning(
                "Could not send welcome message to #%s: %s", channel_name, e
            )
        return channel

    async def _channel_for_session(
        self, conversation_id: str
    ) -> discord.TextChannel | None:
        """Return the session's channel, detecting the project name only if needed."""
        cached = self.session_channels.get(conversation_id)
        if cached is not None:
            # Known session: skip re-reading artifacts from disk on every event.
            return cached
        conv_dir = Config.BRAIN_PATH / conversation_id
        project_name = detect_project_name(conv_dir)
        return await self._ensure_channel(conversation_id, project_name)

    async def _process_events(self):
        """Main event processing loop."""
        await self.wait_until_ready()
        while not self.is_closed():
            try:
                # Time out periodically so is_closed() is re-checked.
                event = await asyncio.wait_for(
                    self.event_queue.get(), timeout=5.0
                )
                await self._handle_event(event)
            except asyncio.TimeoutError:
                continue
            except Exception as e:
                # Top-level boundary: one bad event must not stop the loop.
                logger.error("Error processing event: %s", e, exc_info=True)

    async def _handle_event(self, event: BrainEvent):
        """Route brain events to handlers."""
        if event.event_type == EventType.SESSION_START:
            await self._channel_for_session(event.conversation_id)
        elif event.event_type in (EventType.FILE_CREATED, EventType.FILE_CHANGED):
            # Ensure channel exists
            channel = await self._channel_for_session(event.conversation_id)
            if channel:
                if event.file_name == "task.md":
                    await self._send_task_update(channel, event)
                else:
                    await self._send_artifact_content(channel, event)

    async def _send_task_update(
        self, channel: discord.TextChannel, event: BrainEvent
    ):
        """Send task progress update as an embed.

        The previously posted status message for the session is edited in
        place when it can still be fetched; otherwise a new one is sent and
        remembered.
        """
        progress = parse_task_progress(event.content)
        if progress.in_progress > 0:
            color = discord.Color.gold()
        elif progress.done == progress.total:
            color = discord.Color.green()
        else:
            color = discord.Color.greyple()
        embed = discord.Embed(
            title="📋 Task 진행 현황",
            description=format_task_embed_text(progress),
            color=color,
            timestamp=datetime.now(timezone.utc),
        )
        short_id = event.conversation_id[:8]
        embed.set_footer(text=f"Session: {short_id}")
        # Edit existing or send new
        msg_id = self.session_status_messages.get(event.conversation_id)
        if msg_id:
            try:
                msg = await channel.fetch_message(msg_id)
                await msg.edit(embed=embed)
                return
            except discord.HTTPException:
                # Includes NotFound (message deleted); fall back to a new message.
                pass
        msg = await channel.send(embed=embed)
        self.session_status_messages[event.conversation_id] = msg.id

    async def _send_artifact_content(
        self, channel: discord.TextChannel, event: BrainEvent
    ):
        """Send artifact file content as Discord text messages."""
        labels = {
            "implementation_plan.md": "📐 구현 계획",
            "walkthrough.md": "📝 작업 결과 요약",
        }
        label = labels.get(event.file_name, f"📄 {event.file_name}")
        event_label = (
            "생성" if event.event_type == EventType.FILE_CREATED else "업데이트"
        )
        await channel.send(f"**{label} ({event_label}됨)**")
        for chunk in md_to_discord_text(event.content):
            if chunk.strip():
                await channel.send(chunk)
                # Brief pause between consecutive chunk messages.
                await asyncio.sleep(0.5)

    @tasks.loop(minutes=5)
    async def session_cleanup_loop(self):
        """Periodically check for inactive sessions and archive their channels."""
        if not self.guild:
            return
        # Collect first: the dict is mutated below while archiving.
        to_remove = []
        for conv_id in self.session_channels:
            conv_dir = Config.BRAIN_PATH / conv_id
            if not conv_dir.exists() or not is_session_active(conv_dir):
                to_remove.append(conv_id)
        for conv_id in to_remove:
            channel = self.session_channels.pop(conv_id, None)
            self.session_status_messages.pop(conv_id, None)
            name = self.session_names.pop(conv_id, conv_id[:8])
            if not channel:
                continue
            try:
                embed = discord.Embed(
                    title="🔴 세션 비활성",
                    description=f"`{name}` 세션이 비활성 상태입니다.",
                    color=discord.Color.red(),
                    timestamp=datetime.now(timezone.utc),
                )
                await channel.send(embed=embed)
                await channel.edit(name=f"closed-{name[:20]}")
                logger.info("Archived channel for %s", conv_id[:8])
            except discord.errors.Forbidden:
                logger.warning("No permission to archive %s", conv_id[:8])
            except Exception as e:
                # Best effort: one failing channel must not stop the loop.
                logger.warning("Failed to archive %s: %s", conv_id[:8], e)

    @session_cleanup_loop.before_loop
    async def before_cleanup(self):
        """Delay the first cleanup run until the bot is ready."""
        await self.wait_until_ready()