refactor: single project channel - guild.fetch_channels API + project_channel singleton
This commit is contained in:
282
bot.py
282
bot.py
@@ -1,15 +1,14 @@
|
||||
"""Discord bot — relays Antigravity brain events to Discord channels.
|
||||
|
||||
Dynamic channel management:
|
||||
- Creates `AG-{project_name}` channels only when file events arrive
|
||||
- NO startup channel creation — only reconnects to existing Discord channels
|
||||
- Archives channels after 10 minutes of inactivity
|
||||
Single project channel design:
|
||||
- ONE channel: AG-{PROJECT_NAME} (e.g. ag-gravity_control)
|
||||
- ALL conversations route to this single channel
|
||||
- Uses guild.fetch_channels() API, NOT cached text_channels
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
import logging
|
||||
import re
|
||||
import time
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
@@ -77,46 +76,15 @@ class ApprovalView(discord.ui.View):
|
||||
))
|
||||
|
||||
|
||||
# ─── Project Name Detection ─────────────────────────────────────────
|
||||
|
||||
def detect_project_name(conv_dir: Path) -> str:
|
||||
"""Extract project name from conversation artifacts.
|
||||
Returns: lowercase_with_underscores (e.g. 'gravity_control')
|
||||
Uses FIRST successful extraction and caches it.
|
||||
"""
|
||||
short_id = conv_dir.name[:8]
|
||||
|
||||
def _sanitize(raw: str) -> str:
|
||||
for suffix in ["Task Tracker", "— Task Tracker", "태스크", "구현 계획",
|
||||
"Implementation Plan", "Walkthrough"]:
|
||||
raw = raw.replace(suffix, "")
|
||||
raw = raw.strip(" —-–")
|
||||
raw = re.sub(r'[^a-zA-Z0-9가-힣\s\-]', '', raw)
|
||||
raw = re.sub(r'[\s\-]+', '_', raw).strip('_').lower()
|
||||
return raw[:30] if raw else ""
|
||||
|
||||
for fname in ["task.md", "implementation_plan.md"]:
|
||||
fpath = conv_dir / fname
|
||||
if fpath.exists():
|
||||
try:
|
||||
first_lines = fpath.read_text(encoding="utf-8").splitlines()[:5]
|
||||
for line in first_lines:
|
||||
match = re.match(r'^#\s+(.+)', line)
|
||||
if match:
|
||||
name = _sanitize(match.group(1))
|
||||
# Require at least 5 chars to avoid short generic names
|
||||
if name and name != "task" and len(name) >= 5:
|
||||
return name
|
||||
except (OSError, UnicodeDecodeError):
|
||||
pass
|
||||
|
||||
return short_id
|
||||
|
||||
|
||||
# ─── Bot ─────────────────────────────────────────────────────────────
|
||||
|
||||
class GravityBot(commands.Bot):
|
||||
"""Discord bot for Antigravity session monitoring."""
|
||||
"""Discord bot for Antigravity session monitoring.
|
||||
|
||||
Single-channel architecture:
|
||||
- ONE channel per project (ag-gravity_control)
|
||||
- self.project_channel is the singleton — trivially prevents duplication
|
||||
"""
|
||||
|
||||
def __init__(self, event_queue: asyncio.Queue):
|
||||
intents = discord.Intents.default()
|
||||
@@ -125,20 +93,23 @@ class GravityBot(commands.Bot):
|
||||
super().__init__(command_prefix="!", intents=intents)
|
||||
|
||||
self.event_queue = event_queue
|
||||
self.session_channels: dict[str, discord.TextChannel] = {}
|
||||
self.session_status_messages: dict[str, int] = {}
|
||||
self.session_names: dict[str, str] = {}
|
||||
self._channel_create_lock = asyncio.Lock() # SINGLE global lock
|
||||
self._sent_approval_ids: set[str] = set() # Track sent approvals
|
||||
self._ready_event = asyncio.Event() # Gate: wait until on_ready finishes
|
||||
self.project_channel: discord.TextChannel | None = None # THE channel
|
||||
self.session_status_messages: dict[str, int] = {} # conv_id → msg_id
|
||||
self._sent_approval_ids: set[str] = set()
|
||||
self._ready_event = asyncio.Event()
|
||||
self.bridge = BridgeProtocol()
|
||||
self.session_category: discord.CategoryChannel | None = None
|
||||
self.guild: discord.Guild | None = None
|
||||
|
||||
@property
|
||||
def _channel_name(self) -> str:
|
||||
"""The ONE channel name: ag-gravity_control (lowercase)."""
|
||||
return f"{Config.CHANNEL_PREFIX}-{Config.PROJECT_NAME}".lower()
|
||||
|
||||
async def setup_hook(self):
|
||||
self.loop.create_task(self._process_events())
|
||||
self.pending_approval_scanner.start()
|
||||
logger.info("Bot setup complete, event processor started")
|
||||
logger.info("Bot setup complete")
|
||||
|
||||
async def on_ready(self):
|
||||
logger.info(f"Bot connected as {self.user} (ID: {self.user.id})")
|
||||
@@ -161,131 +132,97 @@ class GravityBot(commands.Bot):
|
||||
logger.error("No permission to create category!")
|
||||
return
|
||||
|
||||
# ONLY reconnect to existing Discord channels (NO new creation)
|
||||
await self._reconnect_existing_channels()
|
||||
# Find the project channel + cleanup duplicates
|
||||
await self._init_project_channel()
|
||||
|
||||
# NOW allow event processing to begin
|
||||
# Open the gate
|
||||
self._ready_event.set()
|
||||
logger.info("Ready gate opened — event processing enabled")
|
||||
|
||||
async def _reconnect_existing_channels(self):
|
||||
"""Scan existing Discord channels and map them — MERGE same-name channels."""
|
||||
if not self.session_category:
|
||||
return
|
||||
# ─── Channel Init (ONE channel, guild.fetch_channels API) ────────
|
||||
|
||||
# Group channels by normalized name
|
||||
name_to_channel: dict[str, discord.TextChannel] = {}
|
||||
duplicates: list[discord.TextChannel] = []
|
||||
async def _init_project_channel(self):
|
||||
"""Find or create the single project channel. Delete any duplicates.
|
||||
|
||||
for ch in self.session_category.text_channels:
|
||||
if ch.topic and "Antigravity Session:" in ch.topic:
|
||||
if ch.name in name_to_channel:
|
||||
# DUPLICATE — mark for cleanup
|
||||
duplicates.append(ch)
|
||||
else:
|
||||
name_to_channel[ch.name] = ch
|
||||
Uses guild.fetch_channels() — the REAL Discord API, not the cache.
|
||||
"""
|
||||
target_name = self._channel_name
|
||||
|
||||
# Map the primary channel for each name
|
||||
count = 0
|
||||
for ch in name_to_channel.values():
|
||||
conv_id = ch.topic.replace("Antigravity Session:", "").strip()
|
||||
if conv_id:
|
||||
self.session_channels[conv_id] = ch
|
||||
await self._recover_task_message(ch, conv_id)
|
||||
count += 1
|
||||
# Fetch ALL channels from Discord API (not cache)
|
||||
all_channels = await self.guild.fetch_channels()
|
||||
|
||||
# Delete duplicate channels
|
||||
for ch in duplicates:
|
||||
matches: list[discord.TextChannel] = []
|
||||
for ch in all_channels:
|
||||
if (isinstance(ch, discord.TextChannel)
|
||||
and ch.category_id == self.session_category.id
|
||||
and ch.name == target_name):
|
||||
matches.append(ch)
|
||||
|
||||
if matches:
|
||||
# Keep the first, delete the rest
|
||||
self.project_channel = matches[0]
|
||||
logger.info(f"Found project channel: #{target_name} (id={self.project_channel.id})")
|
||||
|
||||
for dup in matches[1:]:
|
||||
try:
|
||||
await ch.delete(reason="Duplicate channel cleanup")
|
||||
logger.info(f"Deleted duplicate channel: #{ch.name}")
|
||||
await dup.delete(reason="Duplicate project channel cleanup")
|
||||
logger.info(f"Deleted duplicate: #{dup.name} (id={dup.id})")
|
||||
except (discord.Forbidden, discord.HTTPException) as e:
|
||||
logger.warning(f"Failed to delete duplicate #{ch.name}: {e}")
|
||||
logger.warning(f"Failed to delete duplicate: {e}")
|
||||
|
||||
logger.info(f"Reconnected to {count} channels, cleaned {len(duplicates)} duplicates")
|
||||
|
||||
async def _recover_task_message(
|
||||
self, channel: discord.TextChannel, conversation_id: str
|
||||
):
|
||||
if conversation_id in self.session_status_messages:
|
||||
return
|
||||
# Also delete any OLD-style channels with different names
|
||||
for ch in all_channels:
|
||||
if (isinstance(ch, discord.TextChannel)
|
||||
and ch.category_id == self.session_category.id
|
||||
and ch.name != target_name
|
||||
and ch.topic and "Antigravity Session:" in ch.topic):
|
||||
try:
|
||||
async for msg in channel.history(limit=10):
|
||||
if msg.author == self.user and msg.embeds:
|
||||
embed = msg.embeds[0]
|
||||
if embed.title and "Task" in embed.title:
|
||||
self.session_status_messages[conversation_id] = msg.id
|
||||
return
|
||||
except (discord.Forbidden, discord.HTTPException):
|
||||
pass
|
||||
await ch.delete(reason="Old-style channel cleanup")
|
||||
logger.info(f"Deleted old channel: #{ch.name}")
|
||||
except (discord.Forbidden, discord.HTTPException) as e:
|
||||
logger.warning(f"Failed to delete old channel: {e}")
|
||||
else:
|
||||
logger.info(f"No existing project channel found. Will create on first event.")
|
||||
|
||||
# ─── Channel Management ──────────────────────────────────────────
|
||||
async def _get_project_channel(self) -> discord.TextChannel:
|
||||
"""Get the project channel. Create if it doesn't exist yet.
|
||||
|
||||
async def _ensure_channel(
|
||||
self, conversation_id: str, project_name: str
|
||||
) -> discord.TextChannel:
|
||||
"""Get or create a channel. ONE channel per conv_id, guaranteed."""
|
||||
Thread-safe: only ONE channel will ever be created because
|
||||
self.project_channel acts as a singleton guard.
|
||||
"""
|
||||
if self.project_channel:
|
||||
return self.project_channel
|
||||
|
||||
# Fast path: this conv_id already has a channel — ALWAYS return it
|
||||
# (even if project name changed; name changes are cosmetic, not worth a new channel)
|
||||
if conversation_id in self.session_channels:
|
||||
return self.session_channels[conversation_id]
|
||||
|
||||
async with self._channel_create_lock:
|
||||
# Double-check after lock
|
||||
if conversation_id in self.session_channels:
|
||||
return self.session_channels[conversation_id]
|
||||
|
||||
channel_name = f"{Config.CHANNEL_PREFIX}-{project_name}"
|
||||
target_name = channel_name.lower().replace(" ", "-")
|
||||
|
||||
# Check ALL mapped channels for same name
|
||||
for cid, ch in self.session_channels.items():
|
||||
if ch.name == target_name:
|
||||
self.session_channels[conversation_id] = ch
|
||||
self.session_names[conversation_id] = project_name
|
||||
logger.info(f"Reusing mapped channel #{ch.name} for {conversation_id[:8]}")
|
||||
return ch
|
||||
|
||||
# Check Discord API — maybe channel exists but isn't in our dict
|
||||
if self.session_category:
|
||||
for ch in self.session_category.text_channels:
|
||||
if ch.name == target_name:
|
||||
self.session_channels[conversation_id] = ch
|
||||
self.session_names[conversation_id] = project_name
|
||||
logger.info(f"Found existing Discord channel #{ch.name} for {conversation_id[:8]}")
|
||||
return ch
|
||||
|
||||
# Create new channel (truly no match anywhere)
|
||||
# Create the channel
|
||||
try:
|
||||
channel = await self.guild.create_text_channel(
|
||||
name=channel_name,
|
||||
self.project_channel = await self.guild.create_text_channel(
|
||||
name=self._channel_name,
|
||||
category=self.session_category,
|
||||
topic=f"Antigravity Session: {conversation_id}",
|
||||
topic=f"Gravity Control — Antigravity Bridge",
|
||||
)
|
||||
self.session_channels[conversation_id] = channel
|
||||
self.session_names[conversation_id] = project_name
|
||||
logger.info(f"Created channel #{channel_name}")
|
||||
logger.info(f"Created project channel: #{self._channel_name}")
|
||||
|
||||
embed = discord.Embed(
|
||||
title=f"🚀 {project_name}",
|
||||
description=f"Antigravity 세션 연결됨\nSession: `{conversation_id}`",
|
||||
title=f"🚀 {Config.PROJECT_NAME}",
|
||||
description=(
|
||||
f"Antigravity Bridge 연결됨\n"
|
||||
f"모든 세션 이벤트가 이 채널로 전달됩니다."
|
||||
),
|
||||
color=discord.Color.blue(),
|
||||
timestamp=datetime.now(timezone.utc),
|
||||
)
|
||||
await channel.send(embed=embed)
|
||||
return channel
|
||||
|
||||
await self.project_channel.send(embed=embed)
|
||||
except discord.errors.Forbidden:
|
||||
logger.error(f"No permission to create channel: {channel_name}")
|
||||
return None
|
||||
logger.error(f"No permission to create channel: {self._channel_name}")
|
||||
|
||||
# ─── Event Processing (SINGLE ROUTE) ─────────────────────────────
|
||||
return self.project_channel
|
||||
|
||||
# ─── Event Processing ─────────────────────────────────────────────
|
||||
|
||||
async def _process_events(self):
|
||||
"""Main event loop — ALL events go through here sequentially."""
|
||||
await self.wait_until_ready()
|
||||
await self._ready_event.wait() # Wait until on_ready + reconnect completes
|
||||
await self._ready_event.wait()
|
||||
logger.info("Event processor started (ready gate passed)")
|
||||
|
||||
while not self.is_closed():
|
||||
@@ -300,16 +237,13 @@ class GravityBot(commands.Bot):
|
||||
logger.error(f"Error processing event: {e}", exc_info=True)
|
||||
|
||||
async def _handle_event(self, event: BrainEvent):
|
||||
"""Route brain events to handlers — single entry point."""
|
||||
conv_dir = Config.BRAIN_PATH / event.conversation_id
|
||||
project_name = detect_project_name(conv_dir)
|
||||
|
||||
"""Route brain events to the single project channel."""
|
||||
if event.event_type == EventType.SESSION_START:
|
||||
await self._ensure_channel(event.conversation_id, project_name)
|
||||
# Just ensure channel exists, no message needed
|
||||
await self._get_project_channel()
|
||||
return
|
||||
|
||||
# FILE_CREATED or FILE_CHANGED
|
||||
channel = await self._ensure_channel(event.conversation_id, project_name)
|
||||
channel = await self._get_project_channel()
|
||||
if not channel:
|
||||
return
|
||||
|
||||
@@ -319,17 +253,15 @@ class GravityBot(commands.Bot):
|
||||
else:
|
||||
await self._send_artifact_update(channel, event)
|
||||
except discord.NotFound:
|
||||
# Channel was deleted while we held a reference
|
||||
self.session_channels.pop(event.conversation_id, None)
|
||||
self.session_status_messages.pop(event.conversation_id, None)
|
||||
logger.warning(f"Channel deleted, cleared: {event.conversation_id[:8]}")
|
||||
self.project_channel = None # Channel was deleted, recreate next time
|
||||
logger.warning("Project channel was deleted, will recreate")
|
||||
|
||||
# ─── Message Senders ─────────────────────────────────────────────
|
||||
|
||||
async def _send_task_update(
|
||||
self, channel: discord.TextChannel, event: BrainEvent
|
||||
):
|
||||
"""Send/edit task progress embed (SINGLE message, always edited)."""
|
||||
"""Send/edit task progress embed (ONE message per conv_id, always edited)."""
|
||||
progress = parse_task_progress(event.content)
|
||||
|
||||
embed = discord.Embed(
|
||||
@@ -342,7 +274,7 @@ class GravityBot(commands.Bot):
|
||||
)
|
||||
embed.set_footer(text=f"Session: {event.conversation_id[:8]}")
|
||||
|
||||
# Always try to edit existing message first
|
||||
# Try to edit existing message for this conversation
|
||||
msg_id = self.session_status_messages.get(event.conversation_id)
|
||||
if msg_id:
|
||||
try:
|
||||
@@ -378,6 +310,7 @@ class GravityBot(commands.Bot):
|
||||
color=discord.Color.blue(),
|
||||
timestamp=datetime.now(timezone.utc),
|
||||
)
|
||||
embed.set_footer(text=f"Session: {event.conversation_id[:8]}")
|
||||
await channel.send(embed=embed)
|
||||
|
||||
# ─── Approval Scanner ────────────────────────────────────────────
|
||||
@@ -389,19 +322,11 @@ class GravityBot(commands.Bot):
|
||||
requests = self.bridge.get_pending_requests()
|
||||
for req in requests:
|
||||
if req.request_id in self._sent_approval_ids:
|
||||
continue # Already sent
|
||||
continue
|
||||
if req.discord_message_id != 0:
|
||||
continue
|
||||
|
||||
channel = self.session_channels.get(req.conversation_id)
|
||||
if not channel:
|
||||
conv_dir = Config.BRAIN_PATH / req.conversation_id
|
||||
if conv_dir.exists():
|
||||
project_name = detect_project_name(conv_dir)
|
||||
channel = await self._ensure_channel(
|
||||
req.conversation_id, project_name
|
||||
)
|
||||
|
||||
channel = await self._get_project_channel()
|
||||
if channel:
|
||||
self._sent_approval_ids.add(req.request_id)
|
||||
await self._send_approval_request(channel, req)
|
||||
@@ -449,21 +374,16 @@ class GravityBot(commands.Bot):
|
||||
if message.author == self.user:
|
||||
return
|
||||
|
||||
if not message.channel.name.startswith(Config.CHANNEL_PREFIX.lower() + "-"):
|
||||
# Only respond in the project channel
|
||||
if not self.project_channel or message.channel.id != self.project_channel.id:
|
||||
await self.process_commands(message)
|
||||
return
|
||||
|
||||
conv_id = None
|
||||
for cid, ch in self.session_channels.items():
|
||||
if ch.id == message.channel.id:
|
||||
conv_id = cid
|
||||
break
|
||||
|
||||
text = message.content.strip()
|
||||
|
||||
# Special command: !auto on/off
|
||||
if text in ("!auto on", "!auto off"):
|
||||
if conv_id:
|
||||
self.bridge.write_command(conv_id, text)
|
||||
self.bridge.write_command("__global__", text)
|
||||
enabled = text == "!auto on"
|
||||
emoji = "🟢" if enabled else "🔴"
|
||||
mode = "자동 승인" if enabled else "수동 승인"
|
||||
@@ -475,13 +395,11 @@ class GravityBot(commands.Bot):
|
||||
color=discord.Color.green() if enabled else discord.Color.red(),
|
||||
)
|
||||
await message.channel.send(embed=embed)
|
||||
else:
|
||||
await message.reply("⚠️ 채널에 연결된 세션이 없습니다.")
|
||||
return
|
||||
|
||||
# General text relay
|
||||
if conv_id and text:
|
||||
self.bridge.write_command(conv_id, text)
|
||||
# General text relay (broadcast to most recent session or global)
|
||||
if text:
|
||||
self.bridge.write_command("__global__", text)
|
||||
await message.add_reaction("📨")
|
||||
|
||||
await self.process_commands(message)
|
||||
|
||||
Reference in New Issue
Block a user