- _get_channel_history에서 embed title/description/fields 추출 - 봇의 clarify 질문이 히스토리에 포함되어 맥락 유지 - planner 프롬프트 강화: 불필요한 태스크 분할 방지
323 lines
11 KiB
Python
323 lines
11 KiB
Python
"""Gemini CLI ↔ Discord Bridge 프로토타입.
|
|
|
|
Gemini CLI를 영속 프로세스로 유지하고,
|
|
stdout → Discord / Discord → stdin 파이핑을 테스트합니다.
|
|
"""
|
|
|
|
import asyncio
|
|
import re
|
|
import logging
|
|
import sys
|
|
import os
|
|
import discord
|
|
from discord.ext import commands
|
|
|
|
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(name)s] %(levelname)s: %(message)s")
|
|
logger = logging.getLogger("cli-bridge")
|
|
|
|
# ─────────────────────────────────────
|
|
# 설정
|
|
# ─────────────────────────────────────
|
|
|
|
DISCORD_TOKEN = os.getenv("DISCORD_TOKEN", "")
|
|
TEST_CHANNEL_ID = int(os.getenv("TEST_CHANNEL_ID", "0"))
|
|
PROJECT_DIR = os.getenv("PROJECT_DIR", r"C:\Users\Certes\Desktop\VW_Proj\test_bridge")
|
|
GEMINI_MODEL = "gemini-3-flash-preview"
|
|
|
|
_IS_WIN = sys.platform == "win32"
|
|
|
|
# ANSI 이스케이프 코드 제거 패턴
|
|
ANSI_ESCAPE = re.compile(r'\x1b\[[0-9;]*[a-zA-Z]|\x1b\].*?\x07|\x1b[()][AB012]')
|
|
|
|
# Gemini CLI 대화형 프롬프트 감지 패턴
|
|
ACTION_REQUIRED_PATTERN = re.compile(r'Action Required', re.IGNORECASE)
|
|
CHOICE_PATTERN = re.compile(r'^\s*[●•]\s*(\d+)\.\s*(.+)', re.MULTILINE)
|
|
QUESTION_PATTERN = re.compile(r'\?\s*$', re.MULTILINE)
|
|
|
|
|
|
# ─────────────────────────────────────
|
|
# GeminiProcess: 영속 프로세스 관리
|
|
# ─────────────────────────────────────
|
|
|
|
class GeminiProcess:
|
|
"""채널 하나에 대응하는 영속 Gemini CLI 프로세스."""
|
|
|
|
def __init__(self, cwd: str, on_output=None, on_prompt=None):
|
|
self.cwd = cwd
|
|
self.proc: asyncio.subprocess.Process | None = None
|
|
self.on_output = on_output # 콜백: 일반 출력
|
|
self.on_prompt = on_prompt # 콜백: 대화형 프롬프트
|
|
self._running = False
|
|
self._buffer = "" # 출력 버퍼 (청크 수집)
|
|
self._flush_task: asyncio.Task | None = None
|
|
|
|
async def start(self):
|
|
"""Gemini CLI 프로세스 시작."""
|
|
os.makedirs(self.cwd, exist_ok=True)
|
|
|
|
if _IS_WIN:
|
|
cmd = ["cmd", "/c", "gemini", "--model", GEMINI_MODEL,
|
|
"--approval-mode", "yolo"]
|
|
else:
|
|
cmd = ["gemini", "--model", GEMINI_MODEL, "--approval-mode", "yolo"]
|
|
|
|
self.proc = await asyncio.create_subprocess_exec(
|
|
*cmd,
|
|
stdin=asyncio.subprocess.PIPE,
|
|
stdout=asyncio.subprocess.PIPE,
|
|
stderr=asyncio.subprocess.PIPE,
|
|
cwd=self.cwd,
|
|
)
|
|
self._running = True
|
|
logger.info(f"Gemini 프로세스 시작: pid={self.proc.pid}, cwd={self.cwd}")
|
|
|
|
# stdout/stderr 읽기 태스크 시작
|
|
asyncio.create_task(self._read_stdout())
|
|
asyncio.create_task(self._read_stderr())
|
|
|
|
async def _read_stdout(self):
|
|
"""stdout을 실시간으로 읽어서 버퍼에 수집."""
|
|
while self._running and self.proc:
|
|
try:
|
|
chunk = await self.proc.stdout.read(4096)
|
|
if not chunk:
|
|
break
|
|
raw = chunk.decode("utf-8", errors="replace")
|
|
# 콘솔에 원본 출력 (사용자가 볼 수 있도록)
|
|
sys.stdout.write(raw)
|
|
sys.stdout.flush()
|
|
text = ANSI_ESCAPE.sub("", raw) # ANSI 코드 제거
|
|
await self._handle_output(text)
|
|
except Exception as e:
|
|
logger.error(f"stdout 읽기 오류: {e}")
|
|
break
|
|
|
|
self._running = False
|
|
logger.info("Gemini 프로세스 종료됨")
|
|
if self.on_output:
|
|
await self.on_output("\n🔴 Gemini 프로세스가 종료되었습니다.")
|
|
|
|
async def _read_stderr(self):
|
|
"""stderr도 콘솔에 출력."""
|
|
while self._running and self.proc:
|
|
try:
|
|
chunk = await self.proc.stderr.read(4096)
|
|
if not chunk:
|
|
break
|
|
raw = chunk.decode("utf-8", errors="replace")
|
|
# stderr도 콘솔에 표시
|
|
sys.stderr.write(raw)
|
|
sys.stderr.flush()
|
|
except Exception:
|
|
break
|
|
|
|
async def _handle_output(self, text: str):
|
|
"""출력 처리: 버퍼에 모으고 일정 시간 후 flush."""
|
|
self._buffer += text
|
|
|
|
# 대화형 프롬프트 감지
|
|
if ACTION_REQUIRED_PATTERN.search(self._buffer):
|
|
await self._flush_buffer(is_prompt=True)
|
|
return
|
|
|
|
# 기존 flush 타이머 취소 후 재설정 (디바운스)
|
|
if self._flush_task and not self._flush_task.done():
|
|
self._flush_task.cancel()
|
|
self._flush_task = asyncio.create_task(self._debounce_flush())
|
|
|
|
async def _debounce_flush(self):
|
|
"""1.5초 동안 추가 출력이 없으면 flush."""
|
|
await asyncio.sleep(1.5)
|
|
await self._flush_buffer()
|
|
|
|
async def _flush_buffer(self, is_prompt: bool = False):
|
|
"""버퍼 내용을 콜백으로 전달."""
|
|
if not self._buffer.strip():
|
|
return
|
|
|
|
content = self._buffer.strip()
|
|
self._buffer = ""
|
|
|
|
# 노이즈 제거
|
|
lines = content.split("\n")
|
|
cleaned = []
|
|
for line in lines:
|
|
if any(noise in line for noise in [
|
|
"YOLO mode", "Loaded cached", "Welcome to Gemini",
|
|
"Type /help", "Gemini CLI", "gemini-3-flash",
|
|
]):
|
|
continue
|
|
cleaned.append(line)
|
|
content = "\n".join(cleaned).strip()
|
|
|
|
if not content:
|
|
return
|
|
|
|
if is_prompt and self.on_prompt:
|
|
# 대화형 프롬프트 (선택지 포함)
|
|
await self.on_prompt(content)
|
|
elif self.on_output:
|
|
await self.on_output(content)
|
|
|
|
async def send_input(self, text: str):
|
|
"""stdin으로 텍스트 전송."""
|
|
if self.proc and self.proc.stdin:
|
|
self.proc.stdin.write(f"{text}\n".encode("utf-8"))
|
|
await self.proc.stdin.drain()
|
|
logger.info(f"stdin 전송: {text[:100]}")
|
|
|
|
async def stop(self):
|
|
"""프로세스 종료."""
|
|
self._running = False
|
|
if self.proc:
|
|
self.proc.terminate()
|
|
try:
|
|
await asyncio.wait_for(self.proc.wait(), timeout=5)
|
|
except asyncio.TimeoutError:
|
|
self.proc.kill()
|
|
logger.info("Gemini 프로세스 종료")
|
|
|
|
@property
|
|
def is_alive(self) -> bool:
|
|
return self._running and self.proc and self.proc.returncode is None
|
|
|
|
|
|
# ─────────────────────────────────────
|
|
# Discord Bot
|
|
# ─────────────────────────────────────
|
|
|
|
intents = discord.Intents.default()
|
|
intents.message_content = True
|
|
bot = commands.Bot(command_prefix="!", intents=intents)
|
|
|
|
# 채널별 Gemini 프로세스
|
|
_processes: dict[int, GeminiProcess] = {}
|
|
|
|
|
|
async def _send_to_discord(channel: discord.TextChannel, content: str):
|
|
"""긴 텍스트를 Discord 메시지 제한(2000자)에 맞춰 분할 전송."""
|
|
if len(content) <= 1990:
|
|
await channel.send(f"```\n{content}\n```")
|
|
return
|
|
|
|
# 2000자 단위로 분할
|
|
chunks = []
|
|
current = ""
|
|
for line in content.split("\n"):
|
|
if len(current) + len(line) + 10 > 1900:
|
|
chunks.append(current)
|
|
current = line
|
|
else:
|
|
current += "\n" + line if current else line
|
|
if current:
|
|
chunks.append(current)
|
|
|
|
for i, chunk in enumerate(chunks):
|
|
await channel.send(f"```\n{chunk}\n```")
|
|
if i < len(chunks) - 1:
|
|
await asyncio.sleep(0.5) # Rate limit 방지
|
|
|
|
|
|
async def _send_prompt_to_discord(channel: discord.TextChannel, content: str):
|
|
"""대화형 프롬프트를 Discord에 선택지 형태로 전송."""
|
|
# 선택지 추출
|
|
choices = CHOICE_PATTERN.findall(content)
|
|
|
|
embed = discord.Embed(
|
|
title="⚡ Gemini 확인 요청",
|
|
description=f"```\n{content[:1500]}\n```",
|
|
color=0xF39C12,
|
|
)
|
|
|
|
if choices:
|
|
choice_text = "\n".join(f"**{num}.** {desc}" for num, desc in choices)
|
|
embed.add_field(
|
|
name="선택지 (번호를 입력하세요)",
|
|
value=choice_text,
|
|
inline=False,
|
|
)
|
|
|
|
await channel.send(embed=embed)
|
|
|
|
|
|
@bot.event
|
|
async def on_ready():
|
|
logger.info(f"Bridge Bot 시작: {bot.user}")
|
|
|
|
|
|
@bot.event
|
|
async def on_message(message: discord.Message):
|
|
if message.author == bot.user or message.author.bot:
|
|
return
|
|
|
|
channel = message.channel
|
|
channel_id = channel.id
|
|
text = message.content.strip()
|
|
|
|
if not text:
|
|
return
|
|
|
|
# !start — Gemini 프로세스 시작
|
|
if text == "!start":
|
|
if channel_id in _processes and _processes[channel_id].is_alive:
|
|
await channel.send("⚠️ 이미 Gemini 세션이 실행 중입니다.")
|
|
return
|
|
|
|
await channel.send("🟢 Gemini 세션을 시작합니다...")
|
|
|
|
async def on_output(content):
|
|
await _send_to_discord(channel, content)
|
|
|
|
async def on_prompt(content):
|
|
await _send_prompt_to_discord(channel, content)
|
|
|
|
gp = GeminiProcess(
|
|
cwd=PROJECT_DIR,
|
|
on_output=on_output,
|
|
on_prompt=on_prompt,
|
|
)
|
|
_processes[channel_id] = gp
|
|
await gp.start()
|
|
return
|
|
|
|
# !stop — 세션 종료
|
|
if text == "!stop":
|
|
if channel_id in _processes:
|
|
await _processes[channel_id].stop()
|
|
del _processes[channel_id]
|
|
await channel.send("🔴 Gemini 세션이 종료되었습니다.")
|
|
else:
|
|
await channel.send("실행 중인 세션이 없습니다.")
|
|
return
|
|
|
|
# 일반 메시지 → Gemini stdin으로 전달
|
|
if channel_id in _processes and _processes[channel_id].is_alive:
|
|
gp = _processes[channel_id]
|
|
await gp.send_input(text)
|
|
# 타이핑 표시
|
|
async with channel.typing():
|
|
await asyncio.sleep(1)
|
|
else:
|
|
await channel.send("💡 `!start`로 Gemini 세션을 먼저 시작하세요.")
|
|
|
|
|
|
# ─────────────────────────────────────
|
|
# 메인
|
|
# ─────────────────────────────────────
|
|
|
|
if __name__ == "__main__":
|
|
if not DISCORD_TOKEN:
|
|
# .env에서 읽기
|
|
from pathlib import Path
|
|
env_path = Path(__file__).parent.parent.parent / ".env"
|
|
if env_path.exists():
|
|
for line in env_path.read_text(encoding="utf-8").splitlines():
|
|
if line.startswith("DISCORD_BOT_TOKEN="):
|
|
DISCORD_TOKEN = line.split("=", 1)[1].strip().strip('"')
|
|
|
|
if not DISCORD_TOKEN:
|
|
print("DISCORD_TOKEN이 필요합니다. .env 또는 환경변수에 설정하세요.")
|
|
sys.exit(1)
|
|
|
|
bot.run(DISCORD_TOKEN)
|