"""Discord bot — relays Antigravity brain events to Discord channels.

Dynamic channel management:
- Scans brain/ for active sessions on startup
- Creates `gravity-{project_name}` channels per active session
- Archives channels when sessions become inactive
- Project name extracted from artifact content or short conversation ID
"""
|
||
|
||
import asyncio
|
||
import json
|
||
import logging
|
||
import re
|
||
import time
|
||
from datetime import datetime, timezone
|
||
from pathlib import Path
|
||
|
||
import discord
|
||
from discord.ext import commands, tasks
|
||
|
||
from config import Config
|
||
from parser import (
|
||
parse_task_progress,
|
||
md_to_discord_text,
|
||
format_task_embed_text,
|
||
)
|
||
from watcher import BrainEvent, EventType
|
||
|
||
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
|
||
|
||
|
||
def detect_project_name(conv_dir: Path) -> str:
    """Extract a human-readable project name from conversation artifacts.

    Sources are tried in order and the first usable name wins:

    1. The first level-1 heading (``# Title``) found within the first five
       lines of ``task.md``; a heading that reduces to just ``task`` is
       skipped.
    2. The first level-1 heading found within the first five lines of
       ``implementation_plan.md``.
    3. The first two words of the ``summary`` field of any
       ``*.metadata.json`` file (files are tried in sorted order so the
       result is deterministic).
    4. Fallback: the first eight characters of the directory name.

    Files that are missing, unreadable, not valid UTF-8, or not shaped as
    expected are skipped rather than raising.

    Args:
        conv_dir: Conversation directory to inspect.

    Returns:
        A lowercase, underscore-separated name of at most 30 characters
        (e.g. ``'gravity_control'``), or the short conversation ID.
    """
    max_len = 30
    heading_re = re.compile(r'^#\s+(.+)')
    short_id = conv_dir.name[:8]

    def _sanitize(raw: str) -> str:
        """Convert a raw title to a Discord-friendly channel name part."""
        # Remove common suffixes
        for suffix in ("Task Tracker", "— Task Tracker", "태스크", "구현 계획"):
            raw = raw.replace(suffix, "")
        raw = raw.strip(" —-–")
        # Keep only alphanumeric, Korean, spaces, hyphens
        raw = re.sub(r'[^a-zA-Z0-9가-힣\s\-]', '', raw)
        # Convert to underscore format
        raw = re.sub(r'[\s\-]+', '_', raw).strip('_').lower()
        # Truncation can cut right after a separator; drop the dangling "_".
        return raw[:max_len].rstrip('_')

    def _heading_name(file_name: str, rejected: tuple[str, ...] = ()) -> str:
        """Return the first acceptable sanitized H1 title of a file, or ""."""
        try:
            text = (conv_dir / file_name).read_text(encoding="utf-8")
        except (OSError, UnicodeDecodeError):
            # Missing or unreadable file: this source simply has no name.
            return ""
        for line in text.splitlines()[:5]:
            match = heading_re.match(line)
            if not match:
                continue
            name = _sanitize(match.group(1))
            if name and name not in rejected:
                return name
        return ""

    def _summary_name(meta_file: Path) -> str:
        """Return a name built from the first two summary words, or ""."""
        try:
            meta = json.loads(meta_file.read_text(encoding="utf-8"))
        except (OSError, ValueError):
            # ValueError covers both json.JSONDecodeError and
            # UnicodeDecodeError (each is a ValueError subclass).
            return ""
        summary = meta.get("summary") if isinstance(meta, dict) else None
        if not isinstance(summary, str):
            return ""
        words = summary.split()[:2]
        name = re.sub(r'[^a-z0-9가-힣_]', '', "_".join(words).lower())
        # A summary made only of punctuation would otherwise yield "_".
        return name[:max_len].strip('_')

    name = _heading_name("task.md", rejected=("task",))
    if not name:
        name = _heading_name("implementation_plan.md")
    if name:
        return name

    try:
        meta_files = sorted(conv_dir.glob("*.metadata.json"))
    except OSError:
        meta_files = []
    for meta_file in meta_files:
        name = _summary_name(meta_file)
        if name:
            return name

    return short_id
|
||
|
||
|
||
def is_session_active(conv_dir: Path) -> bool:
|
||
"""Check if a session is active based on file modification time."""
|
||
now = time.time()
|
||
threshold = Config.ACTIVE_TIMEOUT_SECONDS
|
||
|
||
for f in conv_dir.iterdir():
|
||
if f.is_file():
|
||
try:
|
||
mtime = f.stat().st_mtime
|
||
if now - mtime < threshold:
|
||
return True
|
||
except OSError:
|
||
continue
|
||
return False
|
||
|
||
|
||
class GravityBot(commands.Bot):
    """Discord bot for Antigravity session monitoring.

    Consumes ``BrainEvent`` objects from an asyncio queue and relays them to
    one text channel per conversation. Channels are looked up or created
    inside the "Antigravity Sessions" category and are renamed to
    ``closed-<name>`` once the conversation directory stops changing.
    """

    def __init__(self, event_queue: asyncio.Queue):
        """Configure intents and initialise the per-session bookkeeping.

        Args:
            event_queue: Queue from which brain events are read.
        """
        intents = discord.Intents.default()
        intents.message_content = True
        intents.guilds = True
        super().__init__(command_prefix="!", intents=intents)

        self.event_queue = event_queue
        # conversation_id -> channel object
        self.session_channels: dict[str, discord.TextChannel] = {}
        # conversation_id -> status message id (to edit in-place)
        self.session_status_messages: dict[str, int] = {}
        # conversation_id -> project name
        self.session_names: dict[str, str] = {}
        # Locks to prevent duplicate channel creation
        self._channel_locks: dict[str, asyncio.Lock] = {}
        # Category for session channels
        self.session_category: discord.CategoryChannel | None = None
        # Guild reference
        self.guild: discord.Guild | None = None
        # Strong reference to the event-processing task (see setup_hook)
        self._event_task: asyncio.Task | None = None

    async def setup_hook(self):
        """Called after login, before processing events."""
        # The event loop keeps only weak references to tasks, so hold on to
        # the task object to keep it from being garbage-collected mid-run.
        self._event_task = self.loop.create_task(self._process_events())
        self.session_cleanup_loop.start()
        logger.info("Bot setup complete, event processor started")

    async def on_ready(self):
        """Resolve the guild and session category, then sync active sessions."""
        logger.info(f"Bot connected as {self.user} (ID: {self.user.id})")

        # Get guild
        self.guild = self.get_guild(Config.DISCORD_GUILD_ID)
        if not self.guild:
            logger.error(f"Guild {Config.DISCORD_GUILD_ID} not found!")
            return

        # Find or create session category
        category_name = "Antigravity Sessions"
        self.session_category = discord.utils.get(
            self.guild.categories, name=category_name
        )
        if not self.session_category:
            try:
                self.session_category = await self.guild.create_category(
                    category_name
                )
                logger.info(f"Created category: {category_name}")
            except discord.errors.Forbidden:
                logger.error("No permission to create category!")
                return

        # Sync existing active sessions
        await self._sync_active_sessions()

    async def _sync_active_sessions(self):
        """Scan brain/ and create channels for currently active sessions."""
        brain_path = Config.BRAIN_PATH
        if not brain_path.exists():
            return

        active_count = 0
        for entry in brain_path.iterdir():
            if not (entry.is_dir() and self._is_conversation_id(entry.name)):
                continue
            try:
                active = is_session_active(entry)
            except OSError as exc:
                # A directory can vanish or be unreadable mid-scan; skip it
                # so one bad entry does not abort the whole startup sync.
                logger.warning(
                    "Could not inspect session %s: %s", entry.name[:8], exc
                )
                continue
            if not active:
                continue

            project_name = detect_project_name(entry)
            channel = await self._ensure_channel(entry.name, project_name)
            # Only count sessions that actually ended up with a channel.
            if channel is not None:
                active_count += 1

        logger.info(f"Synced {active_count} active sessions on startup")

    def _is_conversation_id(self, name: str) -> bool:
        """Check if directory name looks like a UUID.

        This is a loose shape check only: five hyphen-separated parts, each
        at least four characters long.
        """
        parts = name.split("-")
        return len(parts) == 5 and all(len(p) >= 4 for p in parts)

    async def _ensure_channel(
        self, conversation_id: str, project_name: str
    ) -> discord.TextChannel | None:
        """Get or create the Discord channel for a session.

        Creation is serialised per conversation with an asyncio lock so
        concurrent callers do not create duplicate channels.

        Args:
            conversation_id: Conversation directory name; it is embedded in
                the channel topic and used to find the channel again.
            project_name: Name used for the channel and the welcome embed
                when a new channel has to be created.

        Returns:
            The channel, or None when the guild is not resolved yet or the
            bot lacks permission to create the channel.
        """
        # Fast path: already mapped
        cached = self.session_channels.get(conversation_id)
        if cached is not None:
            return cached

        if self.guild is None:
            # on_ready has not resolved the guild (or it was not found).
            logger.warning(
                "Guild not available; cannot create channel for %s",
                conversation_id[:8],
            )
            return None

        # Get or create a lock for this conversation
        lock = self._channel_locks.get(conversation_id)
        if lock is None:
            lock = self._channel_locks[conversation_id] = asyncio.Lock()

        async with lock:
            # Re-check after acquiring the lock: another caller may have
            # created the channel while this one was waiting.
            cached = self.session_channels.get(conversation_id)
            if cached is not None:
                return cached

            channel_name = f"{Config.CHANNEL_PREFIX}-{project_name}"

            # Check if channel already exists in category (from previous run)
            # NOTE(review): a channel renamed to "closed-..." by the cleanup
            # loop keeps its topic, so it is reused here under that name.
            if self.session_category:
                for ch in self.session_category.text_channels:
                    if ch.topic and conversation_id in ch.topic:
                        self.session_channels[conversation_id] = ch
                        self.session_names[conversation_id] = project_name
                        logger.info(f"Reconnected to existing channel #{ch.name}")
                        return ch

            # Create new channel
            try:
                channel = await self.guild.create_text_channel(
                    name=channel_name,
                    category=self.session_category,
                    topic=f"Antigravity Session: {conversation_id}",
                )
            except discord.errors.Forbidden:
                logger.error(f"No permission to create channel: {channel_name}")
                return None

            self.session_channels[conversation_id] = channel
            self.session_names[conversation_id] = project_name
            logger.info(f"Created channel #{channel_name}")

            # Welcome embed
            embed = discord.Embed(
                title=f"🚀 {project_name}",
                description=(
                    f"Antigravity 세션 연결됨\n"
                    f"Session: `{conversation_id}`"
                ),
                color=discord.Color.blue(),
                timestamp=datetime.now(timezone.utc),
            )
            try:
                await channel.send(embed=embed)
            except discord.HTTPException as exc:
                # The channel exists and is registered; a failed greeting
                # must not be reported as a failed channel creation.
                logger.warning(
                    "Could not send welcome message to #%s: %s",
                    channel_name,
                    exc,
                )
            return channel

    async def _channel_for(
        self, conversation_id: str
    ) -> discord.TextChannel | None:
        """Return the session's channel, detecting the name only if needed."""
        channel = self.session_channels.get(conversation_id)
        if channel is not None:
            # Already mapped: skip re-reading artifact files for a name that
            # _ensure_channel would ignore anyway.
            return channel
        project_name = detect_project_name(Config.BRAIN_PATH / conversation_id)
        return await self._ensure_channel(conversation_id, project_name)

    async def _process_events(self):
        """Main event processing loop."""
        await self.wait_until_ready()

        while not self.is_closed():
            try:
                # Time out periodically so the loop re-checks is_closed().
                event = await asyncio.wait_for(
                    self.event_queue.get(), timeout=5.0
                )
                await self._handle_event(event)
            except asyncio.TimeoutError:
                continue
            except Exception as e:
                # Boundary handler: one bad event must not stop the relay.
                logger.error(f"Error processing event: {e}", exc_info=True)

    async def _handle_event(self, event: BrainEvent):
        """Route brain events to handlers."""
        if event.event_type == EventType.SESSION_START:
            await self._channel_for(event.conversation_id)
            return

        if event.event_type not in (EventType.FILE_CREATED, EventType.FILE_CHANGED):
            return

        # Ensure channel exists
        channel = await self._channel_for(event.conversation_id)
        if not channel:
            return

        if event.file_name == "task.md":
            await self._send_task_update(channel, event)
        elif event.file_name.endswith(".metadata.json"):
            await self._send_metadata_update(channel, event)
        else:
            await self._send_artifact_content(channel, event)

    async def _send_task_update(
        self, channel: discord.TextChannel, event: BrainEvent
    ):
        """Send task progress as an embed, editing the previous one if any."""
        progress = parse_task_progress(event.content)

        if progress.in_progress > 0:
            color = discord.Color.gold()
        elif progress.done == progress.total:
            color = discord.Color.green()
        else:
            color = discord.Color.greyple()

        embed = discord.Embed(
            title="📋 Task 진행 현황",
            description=format_task_embed_text(progress),
            color=color,
            timestamp=datetime.now(timezone.utc),
        )
        short_id = event.conversation_id[:8]
        embed.set_footer(text=f"Session: {short_id}")

        # Edit existing or send new
        msg_id = self.session_status_messages.get(event.conversation_id)
        if msg_id:
            try:
                msg = await channel.fetch_message(msg_id)
                await msg.edit(embed=embed)
                return
            except (discord.NotFound, discord.HTTPException):
                # Message is gone or could not be edited: post a fresh one.
                pass

        msg = await channel.send(embed=embed)
        self.session_status_messages[event.conversation_id] = msg.id

    async def _send_artifact_content(
        self, channel: discord.TextChannel, event: BrainEvent
    ):
        """Send artifact file content as Discord text messages."""
        labels = {
            "implementation_plan.md": "📐 구현 계획",
            "walkthrough.md": "📝 작업 결과 요약",
        }
        label = labels.get(event.file_name, f"📄 {event.file_name}")
        event_label = "생성" if event.event_type == EventType.FILE_CREATED else "업데이트"

        await channel.send(f"**{label} ({event_label}됨)**")

        for chunk in md_to_discord_text(event.content):
            if chunk.strip():
                await channel.send(chunk)
                # Brief pause between consecutive chunk messages.
                await asyncio.sleep(0.5)

    async def _send_metadata_update(
        self, channel: discord.TextChannel, event: BrainEvent
    ):
        """Send artifact metadata summary changes as compact embed.

        Nothing is sent when the content is not a JSON object or has no
        non-empty string ``summary``.
        """
        try:
            meta = json.loads(event.content)
        except (json.JSONDecodeError, TypeError):
            # TypeError: content was not str/bytes (e.g. None).
            return
        if not isinstance(meta, dict):
            return

        summary = meta.get("summary", "")
        if not summary or not isinstance(summary, str):
            return

        # Map artifact types to emoji
        type_emoji = {
            "ARTIFACT_TYPE_TASK": "📋",
            "ARTIFACT_TYPE_IMPLEMENTATION_PLAN": "📐",
            "ARTIFACT_TYPE_WALKTHROUGH": "📝",
            "ARTIFACT_TYPE_OTHER": "📄",
        }
        emoji = type_emoji.get(meta.get("artifactType", ""), "📄")

        # Artifact name from filename (strip .metadata.json)
        artifact_name = event.file_name.replace(".metadata.json", "")

        embed = discord.Embed(
            description=f"{emoji} **{artifact_name}** 업데이트\n\n{summary[:500]}",
            color=discord.Color.dark_teal(),
            timestamp=datetime.now(timezone.utc),
        )

        await channel.send(embed=embed)

    @tasks.loop(minutes=5)
    async def session_cleanup_loop(self):
        """Periodically check for inactive sessions and archive their channels."""
        if not self.guild:
            return

        # Collect first, mutate afterwards: the mapping must not change
        # while it is being iterated.
        to_remove = []
        for conv_id in self.session_channels:
            conv_dir = Config.BRAIN_PATH / conv_id
            try:
                still_active = conv_dir.exists() and is_session_active(conv_dir)
            except OSError as exc:
                # Leave the session alone when its state cannot be read,
                # and keep the exception from escaping the task loop.
                logger.warning(
                    "Could not inspect session %s: %s", conv_id[:8], exc
                )
                continue
            if not still_active:
                to_remove.append(conv_id)

        for conv_id in to_remove:
            channel = self.session_channels.pop(conv_id, None)
            self.session_status_messages.pop(conv_id, None)
            name = self.session_names.pop(conv_id, conv_id[:8])
            # Release the per-session lock so the dict does not grow forever.
            self._channel_locks.pop(conv_id, None)

            if not channel:
                continue

            try:
                embed = discord.Embed(
                    title="🔴 세션 비활성",
                    description=f"`{name}` 세션이 비활성 상태입니다.",
                    color=discord.Color.red(),
                    timestamp=datetime.now(timezone.utc),
                )
                await channel.send(embed=embed)
                await channel.edit(name=f"closed-{name[:20]}")
                logger.info(f"Archived channel for {conv_id[:8]}")
            except discord.errors.Forbidden:
                logger.warning(f"No permission to archive {conv_id[:8]}")
            except Exception as e:
                logger.warning(f"Failed to archive {conv_id[:8]}: {e}")

    @session_cleanup_loop.before_loop
    async def before_cleanup(self):
        """Delay the first cleanup pass until the bot is ready."""
        await self.wait_until_ready()
|