refactor(bridge): migrate gravity bridge to pure websocket gateway architecture, deleting legacy local file scanners and dependencies

This commit is contained in:
Variet Worker
2026-04-11 13:06:38 +09:00
parent 5e697cd919
commit 072f83bf25
20 changed files with 756 additions and 1537 deletions

478
bot.py
View File

@@ -30,8 +30,7 @@ from parser import (
md_to_discord_text,
format_task_embed_text,
)
from watcher import BrainEvent, EventType
from bridge import BridgeProtocol, ApprovalRequest, UserResponse
from models import BrainEvent, EventType, ApprovalRequest, UserResponse
logger = logging.getLogger(__name__)
@@ -47,10 +46,9 @@ class ApprovalView(discord.ui.View):
(e.g., ✅ Allow Once / ✅ Allow This Conversation / ❌ Deny)
"""
def __init__(self, bridge: BridgeProtocol, request: ApprovalRequest,
def __init__(self, request: ApprovalRequest,
buttons: list[dict] | None = None, hub=None):
super().__init__(timeout=1800) # 30 minutes
self.bridge = bridge
self.hub = hub # WSHub instance for WS response routing
self.request = request
self.responded = False
@@ -100,12 +98,9 @@ class ApprovalView(discord.ui.View):
# Hub WS route (primary — reaches remote Extensions)
delivered = False
if self.hub:
delivered = await self.hub.send_response_to_pending_owner(self.request.request_id, {
await self.hub.send_response_to_pending_owner(self.request.request_id, {
"type": "response", "data": response_data,
})
if not delivered:
# File bridge fallback (Hub unavailable OR owner disconnected)
self.bridge.write_response(UserResponse(**response_data))
embed = interaction.message.embeds[0] if interaction.message.embeds else None
if embed:
color = discord.Color.red() if is_reject else discord.Color.green()
@@ -131,13 +126,10 @@ class ApprovalView(discord.ui.View):
"step_type": getattr(self.request, 'step_type', ''),
"project_name": getattr(self.request, 'project_name', ''),
}
delivered = False
if self.hub:
delivered = await self.hub.send_response_to_pending_owner(self.request.request_id, {
await self.hub.send_response_to_pending_owner(self.request.request_id, {
"type": "response", "data": response_data,
})
if not delivered:
self.bridge.write_response(UserResponse(**response_data))
embed = interaction.message.embeds[0] if interaction.message.embeds else None
if embed:
embed.color = discord.Color.green()
@@ -158,13 +150,10 @@ class ApprovalView(discord.ui.View):
"step_type": getattr(self.request, 'step_type', ''),
"project_name": getattr(self.request, 'project_name', ''),
}
delivered = False
if self.hub:
delivered = await self.hub.send_response_to_pending_owner(self.request.request_id, {
await self.hub.send_response_to_pending_owner(self.request.request_id, {
"type": "response", "data": response_data,
})
if not delivered:
self.bridge.write_response(UserResponse(**response_data))
embed = interaction.message.embeds[0] if interaction.message.embeds else None
if embed:
embed.color = discord.Color.red()
@@ -172,12 +161,14 @@ class ApprovalView(discord.ui.View):
await interaction.response.edit_message(embed=embed, view=None)
async def on_timeout(self):
if not self.responded:
self.bridge.write_response(UserResponse(
request_id=self.request.request_id, approved=False,
step_type=getattr(self.request, 'step_type', ''),
project_name=getattr(self.request, 'project_name', ''),
))
if not self.responded and self.hub:
await self.hub.send_response_to_pending_owner(self.request.request_id, {
"type": "response", "data": {
"request_id": self.request.request_id, "approved": False,
"step_type": getattr(self.request, 'step_type', ''),
"project_name": getattr(self.request, 'project_name', ''),
}
})
# ─── Bot ─────────────────────────────────────────────────────────────
@@ -207,7 +198,6 @@ class GravityBot(commands.Bot):
self._sent_commands: dict[str, str] = {} # request_id → command text (for MERGE edit detection)
self._ready_event = asyncio.Event()
self._channel_lock = asyncio.Lock()
self.bridge = BridgeProtocol()
self.session_category: discord.CategoryChannel | None = None
self.guild: discord.Guild | None = None
self.auto_approve_projects: set[str] = set() # projects with auto-approve enabled
@@ -233,7 +223,7 @@ class GravityBot(commands.Bot):
"project_name": kwargs.get('project_name', project),
}
# Hub route (primary — skip file bridge to prevent double delivery)
# Hub route (primary)
if self.hub:
import time as _time
cmd_data["id"] = str(int(_time.time() * 1000))
@@ -246,14 +236,6 @@ class GravityBot(commands.Bot):
asyncio.create_task(
self.hub.broadcast_to_project(project, msg)
)
return # ← WS sent, skip file bridge
# Legacy fallback (file bridge + gateway HTTP) — only when Hub is unavailable
self.bridge.write_command(project, text, **kwargs)
if self.gateway:
import time as _time
cmd_data["id"] = cmd_data.get("id", str(int(_time.time() * 1000)))
self.gateway.push_command(project, cmd_data)
def _cap_dict(self, d: dict, max_size: int = 5000):
"""Prevent memory leaks by capping dictionary sizes using insertion order (oldest first)."""
@@ -269,8 +251,6 @@ class GravityBot(commands.Bot):
async def setup_hook(self):
self.loop.create_task(self._process_events())
self.pending_approval_scanner.start()
self.chat_snapshot_scanner.start()
self._register_slash_commands()
# Register Hub handlers (if Hub is available, set after setup_hook by main.py)
asyncio.get_event_loop().call_soon(self._register_hub_handlers)
@@ -353,57 +333,12 @@ class GravityBot(commands.Bot):
logger.error("No permission to create category!")
return
# Discover existing project channels
await self._discover_channels()
# Load conversation → project registrations from Extension
self._load_registrations()
# Sync slash commands to guild
try:
self.tree.copy_global_to(guild=self.guild)
synced = await self.tree.sync(guild=self.guild)
logger.info(f"Synced {len(synced)} slash commands to guild")
except Exception as e:
logger.warning(f"Slash command sync failed: {e}")
# Open the gate
# Start WS Hub processors by ensuring ready gate is open
self._ready_event.set()
logger.info("Ready gate opened — event processing enabled")
# Start scanner loops
if not self.pending_approval_scanner.is_running():
self.pending_approval_scanner.start()
if not self.chat_snapshot_scanner.is_running():
self.chat_snapshot_scanner.start()
logger.info("Scanner loops started")
# ─── Channel Management ──────────────────────────────────────────
def _load_registrations(self):
"""Read bridge/register/ to learn conversation → project mappings."""
register_dir = self.bridge.bridge_dir / "register"
if not register_dir.exists():
return
count = 0
for f in register_dir.glob("*.json"):
try:
data = json.loads(f.read_text(encoding="utf-8-sig"))
conv_id = data.get("conversation_id", "")
project = data.get("project_name", "")
if conv_id and project:
self.conv_to_project[conv_id] = project
count += 1
except (json.JSONDecodeError, OSError):
pass
# Only log when count changes
prev = getattr(self, '_last_reg_count', -1)
if count != prev:
self._last_reg_count = count
if count:
logger.info(f"Loaded {count} conversation→project registrations")
# ─── Channel Management ──────────────────────────────────────────
@@ -618,270 +553,7 @@ class GravityBot(commands.Bot):
# ─── Approval Scanner ────────────────────────────────────────────
@tasks.loop(seconds=30) # Hub mode: WS is primary, file scan is fallback only
async def pending_approval_scanner(self):
"""Scan bridge/pending/ for new approval requests + reload registrations.
Per-tick caps prevent Discord API rate limit cascade when multiple
projects generate pending files simultaneously.
"""
try:
# Reload conv→project registrations each cycle
self._load_registrations()
# Channels are created on-demand when actual signals arrive
# (via _get_channel in snapshot scanner / approval sender)
MAX_NEW_PER_TICK = 5 # Phase 1: max new pending to process per tick
MAX_STATUS_PER_TICK = 5 # Phase 2: max status changes to process per tick
phase1_processed = 0
requests = self.bridge.get_pending_requests()
for req in requests:
if phase1_processed >= MAX_NEW_PER_TICK:
break
if req.request_id in self._sent_approval_ids:
continue
if req.discord_message_id != 0:
continue
# Learn project mapping from pending approval
project = req.project_name or Config.PROJECT_NAME
if req.conversation_id and req.conversation_id != '__global__':
self.conv_to_project[req.conversation_id] = project
# ── SafeToAutoRun: approve immediately and quietly ──
if getattr(req, "safe_to_auto_run", False):
self._cap_dict(self._sent_approval_ids)
self._sent_approval_ids[req.request_id] = True
# Generate approve response back to extension
approve_btn_index = 0
pending_file = self.bridge.pending_dir / f"{req.request_id}.json"
if pending_file.exists():
try:
pdata = json.loads(pending_file.read_text(encoding="utf-8-sig"))
btns = pdata.get("buttons")
if btns and len(btns) > 1:
reject_words = {"deny", "reject", "cancel", "reject all", "decline", "dismiss", "stop"}
for b in btns:
txt = b.get("text", "").lower().strip()
if txt not in reject_words:
approve_btn_index = b.get("index", 0)
break
except (json.JSONDecodeError, OSError):
pass
self.bridge.write_response(UserResponse(
request_id=req.request_id,
approved=True,
button_index=approve_btn_index,
step_type=getattr(req, 'step_type', ''),
project_name=project,
))
logger.info(f"SafeToAutoRun (Quietly Auto-approved): {req.request_id[:12]} project={project}")
phase1_processed += 1
continue
# ── Auto-approve: if project has auto enabled, approve immediately ──
if project in self.auto_approve_projects:
# Defence: reject-word commands should NEVER be auto-approved
# (DOM observer may create standalone "Deny" pending from file_permission UI)
reject_commands = {"deny", "reject", "cancel", "decline", "dismiss", "stop"}
if req.command.strip().lower() in reject_commands:
logger.warning(f"Auto-approve BLOCKED: command='{req.command}' is reject-word — skipping")
self._cap_dict(self._sent_approval_ids)
self._sent_approval_ids[req.request_id] = True
phase1_processed += 1
continue
self._cap_dict(self._sent_approval_ids)
self._sent_approval_ids[req.request_id] = True
# Smart button_index: read buttons array from pending file
# file_permission buttons = [Allow Once(0), Allow This Conv(1), Deny(2)]
# MUST pick non-reject button for safety
approve_btn_index = 0
pending_file = self.bridge.pending_dir / f"{req.request_id}.json"
if pending_file.exists():
try:
pdata = json.loads(pending_file.read_text(encoding="utf-8-sig"))
btns = pdata.get("buttons")
if btns and len(btns) > 1:
reject_words = {"deny", "reject", "cancel", "reject all",
"decline", "dismiss", "stop"}
for b in btns:
txt = b.get("text", "").lower().strip()
if txt not in reject_words:
approve_btn_index = b.get("index", 0)
break
except (json.JSONDecodeError, OSError):
pass
# Write auto-approve response for Extension
self.bridge.write_response(UserResponse(
request_id=req.request_id,
approved=True,
button_index=approve_btn_index,
step_type=getattr(req, 'step_type', ''),
project_name=project,
))
# Show compact auto-approved embed in Discord
channel = await self._get_channel(project)
if channel:
try:
embed = discord.Embed(
title="🤖 자동 승인됨",
description=f"✅ **{req.command}**\n\n```\n{req.description[:2000]}\n```" if getattr(req, "description", "") else f"✅ **{req.command}**",
color=discord.Color.green(),
)
embed.set_footer(text=f"auto-approve | {req.request_id[:12]}")
await channel.send(embed=embed)
except Exception as e:
logger.error(f"[AUTO-APPROVE] Discord send failed for {project}: {e}")
else:
logger.warning(f"[AUTO-APPROVE] No Discord channel for project={project} — notification skipped")
logger.info(f"Auto-approved: {req.request_id[:12]} project={project} btn_idx={approve_btn_index}")
phase1_processed += 1
continue
# Defer short-command pendings (e.g. "Run") by 4 cycles (~12s)
# to give step_probe time to merge detailed command info
# (step_probe MERGE happens ~10s after pending creation)
if len(req.command) <= 15:
if req.request_id not in self._deferred_ids:
self._deferred_ids[req.request_id] = 1
continue # skip this cycle
elif self._deferred_ids[req.request_id] < 4:
self._deferred_ids[req.request_id] += 1
# Re-read from file (step_probe may have merged)
fresh = self.bridge.read_pending_request(req.request_id)
if fresh and len(fresh.command) > 15:
req = fresh # use merged version — send now!
else:
continue # wait one more cycle
# Clean up defer tracking
self._deferred_ids.pop(req.request_id, None)
channel = await self._get_channel(project)
if channel:
self._cap_dict(self._sent_approval_ids)
self._sent_approval_ids[req.request_id] = True
self._cap_dict(self._sent_commands)
self._sent_commands[req.request_id] = req.command
await self._send_approval_request(channel, req)
phase1_processed += 1
else:
logger.warning(f"[APPROVAL] No Discord channel for project={project} — approval request skipped (rid={req.request_id[:12]})")
# ── Single-pass: handle auto_resolved, expired, and MERGE in one glob ──
phase2_processed = 0
for f in self.bridge.pending_dir.glob("*.json"):
if phase2_processed >= MAX_STATUS_PER_TICK:
break
try:
data = json.loads(f.read_text(encoding="utf-8-sig"))
status = data.get("status", "pending")
rid = data.get("request_id", "")
if status == "auto_resolved":
# FIX #5: Use _approval_messages as fallback when discord_message_id is 0
msg_id = data.get("discord_message_id", 0) or self._approval_messages.get(rid, 0)
project = data.get("project_name", Config.PROJECT_NAME)
logger.info(f"[AUTO-RESOLVED] rid={rid[:12]} project={project} msg_id={msg_id} cmd='{data.get('command', '')[:60]}'")
if msg_id:
channel = await self._get_channel(project)
if channel:
try:
msg = await channel.fetch_message(msg_id)
embed = discord.Embed(
title="✅ AG에서 직접 승인됨",
description=f"```\n{data.get('command', '')[:500]}\n```",
color=discord.Color.green(),
)
embed.set_footer(text=f"ID: {rid}")
await msg.edit(embed=embed, view=None)
logger.info(f"[AUTO-RESOLVED] ✅ Discord message {msg_id} updated")
except discord.NotFound:
logger.warning(f"[AUTO-RESOLVED] Discord message {msg_id} not found")
else:
logger.warning(f"[AUTO-RESOLVED] No msg_id for rid={rid[:12]} — cannot edit Discord message")
f.unlink()
self._deferred_ids.pop(rid, None)
self._sent_commands.pop(rid, None)
self._approval_messages.pop(rid, None)
self._sent_approval_ids.pop(rid, None)
phase2_processed += 1
elif status == "expired":
msg_id = data.get("discord_message_id", 0)
project = data.get("project_name", Config.PROJECT_NAME)
if msg_id:
channel = await self._get_channel(project)
if channel:
try:
msg = await channel.fetch_message(msg_id)
embed = discord.Embed(
title="⏰ 만료됨",
description=f"```\n{data.get('command', '')[:500]}\n```",
color=discord.Color.light_grey(),
)
embed.set_footer(text=f"ID: {rid}")
await msg.edit(embed=embed, view=None)
except discord.NotFound:
pass
f.unlink()
self._deferred_ids.pop(rid, None)
self._sent_commands.pop(rid, None)
self._sent_approval_ids.pop(rid, None)
phase2_processed += 1
elif status == "pending":
# MERGE check: step_probe updated command in already-sent pending
if rid not in self._sent_approval_ids:
continue
msg_id = data.get("discord_message_id", 0)
if not msg_id:
continue
new_cmd = data.get("command", "")
old_cmd = self._sent_commands.get(rid, "")
if new_cmd and new_cmd != old_cmd and len(new_cmd) > len(old_cmd):
self._sent_commands[rid] = new_cmd
project = data.get("project_name", Config.PROJECT_NAME)
channel = await self._get_channel(project)
if channel:
try:
msg = await channel.fetch_message(msg_id)
buttons = data.get("buttons")
desc_parts = [f"**명령어:**\n```\n{new_cmd[:1000]}\n```"]
if buttons and len(buttons) > 1:
btn_names = [b.get("text", "?") for b in buttons]
desc_parts.append(f"**선택지:** {' / '.join(btn_names)}")
desc = data.get("description", "")
if desc:
desc_parts.append(desc[:500])
embed = discord.Embed(
title="⚠️ 승인 요청",
description="\n".join(desc_parts),
color=discord.Color.orange(),
timestamp=datetime.now(timezone.utc),
)
embed.set_footer(text=f"ID: {rid}")
await msg.edit(embed=embed)
logger.info(f"MERGE edit: {rid[:12]} cmd='{new_cmd[:60]}'")
except discord.NotFound:
pass
except (json.JSONDecodeError, OSError):
pass
except Exception as e:
logger.error(f"Error scanning approvals: {e}")
@pending_approval_scanner.before_loop
async def before_scanner(self):
await self.wait_until_ready()
async def _send_approval_request(
self, channel: discord.TextChannel, request: ApprovalRequest
@@ -1133,9 +805,8 @@ class GravityBot(commands.Bot):
self._cap_dict(self._sent_approval_ids)
self._sent_approval_ids[request.request_id] = True
delivered = False
if self.hub:
delivered = await self.hub.send_response_to_pending_owner(request.request_id, {
await self.hub.send_response_to_pending_owner(request.request_id, {
"type": "response",
"data": {
"request_id": request.request_id,
@@ -1145,13 +816,6 @@ class GravityBot(commands.Bot):
"project_name": request.project_name,
},
})
if not delivered:
# File bridge fallback (Hub unavailable OR owner disconnected)
self.bridge.write_response(UserResponse(
request_id=request.request_id, approved=True,
step_type=request.step_type,
project_name=request.project_name,
))
# Send compact auto-approved embed to Discord (was missing — caused silent approvals)
channel = await self._get_channel(request.project_name)
if channel:
@@ -1282,114 +946,4 @@ class GravityBot(commands.Bot):
# ─── Chat Snapshot Scanner ─────────────────────────────────────────
@tasks.loop(seconds=30) # Hub mode: WS is primary, file scan is fallback only
async def chat_snapshot_scanner(self):
"""Scan bridge/chat_snapshots/ for AI response dumps."""
try:
snapshot_dir = self.bridge.bridge_dir / "chat_snapshots"
if not snapshot_dir.exists():
return
for f in snapshot_dir.glob("*.json"):
try:
data = json.loads(f.read_text(encoding="utf-8-sig"))
project = data.get("project_name", Config.PROJECT_NAME)
content = data.get("content", "")
attached_files = data.get("attached_files", [])
if content or attached_files:
channel = await self._get_channel(project)
if not channel:
logger.warning(f"[SNAPSHOT] No Discord channel for project={project} — snapshot skipped (len={len(content)})")
elif channel:
import io
# ── Send attached files (from Extension's writeChatSnapshotWithFiles) ──
discord_files = []
for af in attached_files:
af_name = af.get("name", "document.md")
af_content = af.get("content", "")
if af_content:
discord_files.append(discord.File(
io.BytesIO(af_content.encode("utf-8")),
filename=af_name,
))
FILE_ATTACH_THRESHOLD = 4000
if len(content) > FILE_ATTACH_THRESHOLD:
# Long chat content → summary embed + file attachment
summary = content[:500].rsplit('\n', 1)[0]
embed = discord.Embed(
title="💬 AI 대화 내용",
description=f"{summary}\n\n📎 *전체 내용은 첨부 파일 참조* ({len(content):,}자)",
color=discord.Color.purple(),
timestamp=datetime.now(timezone.utc),
)
# Add content itself as file attachment
discord_files.append(discord.File(
io.BytesIO(content.encode("utf-8")),
filename="chat_message.md",
))
try:
await channel.send(embed=embed, files=discord_files)
logger.info(f"[SNAPSHOT] Sent to #{channel.name} (file, {len(content)} chars)")
except discord.NotFound:
logger.warning(f"Channel deleted for {project}, re-creating...")
self.project_channels.pop(project, None)
channel = await self._get_channel(project)
if channel:
# Re-create files (discord.File consumed after send)
discord_files2 = []
for af in attached_files:
af_name = af.get("name", "document.md")
af_content = af.get("content", "")
if af_content:
discord_files2.append(discord.File(
io.BytesIO(af_content.encode("utf-8")),
filename=af_name,
))
discord_files2.append(discord.File(
io.BytesIO(content.encode("utf-8")),
filename="chat_message.md",
))
await channel.send(embed=embed, files=discord_files2)
logger.info(f"[SNAPSHOT] Sent to #{channel.name} after re-create (file, {len(content)} chars)")
except Exception as e:
logger.error(f"[SNAPSHOT] Discord send failed for {project}: {e}")
else:
# Short content → inline embed (original)
embed = discord.Embed(
title="💬 AI 대화 내용",
description=content,
color=discord.Color.purple(),
timestamp=datetime.now(timezone.utc),
)
try:
await channel.send(
embed=embed,
files=discord_files if discord_files else discord.utils.MISSING,
)
logger.info(f"[SNAPSHOT] Sent to #{channel.name} (inline, {len(content)} chars)")
except discord.NotFound:
logger.warning(f"Channel deleted for {project}, re-creating...")
self.project_channels.pop(project, None)
channel = await self._get_channel(project)
if channel:
await channel.send(embed=embed)
logger.info(f"[SNAPSHOT] Sent to #{channel.name} after re-create (inline)")
except Exception as e:
logger.error(f"[SNAPSHOT] Discord send failed for {project}: {e}")
f.unlink() # Cleanup
except Exception as e:
logger.warning(f"Bad chat snapshot {f.name}: {e}")
try:
f.rename(f.with_suffix('.json.failed'))
except OSError:
pass
except Exception as e:
logger.error(f"Error scanning chat snapshots: {e}")
@chat_snapshot_scanner.before_loop
async def before_chat_scanner(self):
await self.wait_until_ready()