"""Gemini CLI ↔ Discord Bridge 프로토타입. Gemini CLI를 영속 프로세스로 유지하고, stdout → Discord / Discord → stdin 파이핑을 테스트합니다. """ import asyncio import re import logging import sys import os import discord from discord.ext import commands logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(name)s] %(levelname)s: %(message)s") logger = logging.getLogger("cli-bridge") # ───────────────────────────────────── # 설정 # ───────────────────────────────────── DISCORD_TOKEN = os.getenv("DISCORD_TOKEN", "") TEST_CHANNEL_ID = int(os.getenv("TEST_CHANNEL_ID", "0")) PROJECT_DIR = os.getenv("PROJECT_DIR", r"C:\Users\Certes\Desktop\VW_Proj\test_bridge") GEMINI_MODEL = "gemini-3-flash-preview" _IS_WIN = sys.platform == "win32" # ANSI 이스케이프 코드 제거 패턴 ANSI_ESCAPE = re.compile(r'\x1b\[[0-9;]*[a-zA-Z]|\x1b\].*?\x07|\x1b[()][AB012]') # Gemini CLI 대화형 프롬프트 감지 패턴 ACTION_REQUIRED_PATTERN = re.compile(r'Action Required', re.IGNORECASE) CHOICE_PATTERN = re.compile(r'^\s*[●•]\s*(\d+)\.\s*(.+)', re.MULTILINE) QUESTION_PATTERN = re.compile(r'\?\s*$', re.MULTILINE) # ───────────────────────────────────── # GeminiProcess: 영속 프로세스 관리 # ───────────────────────────────────── class GeminiProcess: """채널 하나에 대응하는 영속 Gemini CLI 프로세스.""" def __init__(self, cwd: str, on_output=None, on_prompt=None): self.cwd = cwd self.proc: asyncio.subprocess.Process | None = None self.on_output = on_output # 콜백: 일반 출력 self.on_prompt = on_prompt # 콜백: 대화형 프롬프트 self._running = False self._buffer = "" # 출력 버퍼 (청크 수집) self._flush_task: asyncio.Task | None = None async def start(self): """Gemini CLI 프로세스 시작.""" os.makedirs(self.cwd, exist_ok=True) if _IS_WIN: cmd = ["cmd", "/c", "gemini", "--model", GEMINI_MODEL, "--approval-mode", "yolo"] else: cmd = ["gemini", "--model", GEMINI_MODEL, "--approval-mode", "yolo"] self.proc = await asyncio.create_subprocess_exec( *cmd, stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, cwd=self.cwd, ) self._running = True logger.info(f"Gemini 프로세스 시작: pid={self.proc.pid}, cwd={self.cwd}") # stdout/stderr 읽기 태스크 시작 asyncio.create_task(self._read_stdout()) asyncio.create_task(self._read_stderr()) async def _read_stdout(self): """stdout을 실시간으로 읽어서 버퍼에 수집.""" while self._running and self.proc: try: chunk = await self.proc.stdout.read(4096) if not chunk: break raw = chunk.decode("utf-8", errors="replace") # 콘솔에 원본 출력 (사용자가 볼 수 있도록) sys.stdout.write(raw) sys.stdout.flush() text = ANSI_ESCAPE.sub("", raw) # ANSI 코드 제거 await self._handle_output(text) except Exception as e: logger.error(f"stdout 읽기 오류: {e}") break self._running = False logger.info("Gemini 프로세스 종료됨") if self.on_output: await self.on_output("\n🔴 Gemini 프로세스가 종료되었습니다.") async def _read_stderr(self): """stderr도 콘솔에 출력.""" while self._running and self.proc: try: chunk = await self.proc.stderr.read(4096) if not chunk: break raw = chunk.decode("utf-8", errors="replace") # stderr도 콘솔에 표시 sys.stderr.write(raw) sys.stderr.flush() except Exception: break async def _handle_output(self, text: str): """출력 처리: 버퍼에 모으고 일정 시간 후 flush.""" self._buffer += text # 대화형 프롬프트 감지 if ACTION_REQUIRED_PATTERN.search(self._buffer): await self._flush_buffer(is_prompt=True) return # 기존 flush 타이머 취소 후 재설정 (디바운스) if self._flush_task and not self._flush_task.done(): self._flush_task.cancel() self._flush_task = asyncio.create_task(self._debounce_flush()) async def _debounce_flush(self): """1.5초 동안 추가 출력이 없으면 flush.""" await asyncio.sleep(1.5) await self._flush_buffer() async def _flush_buffer(self, is_prompt: bool = False): """버퍼 내용을 콜백으로 전달.""" if not self._buffer.strip(): return content = self._buffer.strip() self._buffer = "" # 노이즈 제거 lines = content.split("\n") cleaned = [] for line in lines: if any(noise in line for noise in [ "YOLO mode", "Loaded cached", "Welcome to Gemini", "Type /help", "Gemini CLI", "gemini-3-flash", ]): continue cleaned.append(line) content = "\n".join(cleaned).strip() if not content: return if is_prompt and self.on_prompt: # 대화형 프롬프트 (선택지 포함) await self.on_prompt(content) elif self.on_output: await self.on_output(content) async def send_input(self, text: str): """stdin으로 텍스트 전송.""" if self.proc and self.proc.stdin: self.proc.stdin.write(f"{text}\n".encode("utf-8")) await self.proc.stdin.drain() logger.info(f"stdin 전송: {text[:100]}") async def stop(self): """프로세스 종료.""" self._running = False if self.proc: self.proc.terminate() try: await asyncio.wait_for(self.proc.wait(), timeout=5) except asyncio.TimeoutError: self.proc.kill() logger.info("Gemini 프로세스 종료") @property def is_alive(self) -> bool: return self._running and self.proc and self.proc.returncode is None # ───────────────────────────────────── # Discord Bot # ───────────────────────────────────── intents = discord.Intents.default() intents.message_content = True bot = commands.Bot(command_prefix="!", intents=intents) # 채널별 Gemini 프로세스 _processes: dict[int, GeminiProcess] = {} async def _send_to_discord(channel: discord.TextChannel, content: str): """긴 텍스트를 Discord 메시지 제한(2000자)에 맞춰 분할 전송.""" if len(content) <= 1990: await channel.send(f"```\n{content}\n```") return # 2000자 단위로 분할 chunks = [] current = "" for line in content.split("\n"): if len(current) + len(line) + 10 > 1900: chunks.append(current) current = line else: current += "\n" + line if current else line if current: chunks.append(current) for i, chunk in enumerate(chunks): await channel.send(f"```\n{chunk}\n```") if i < len(chunks) - 1: await asyncio.sleep(0.5) # Rate limit 방지 async def _send_prompt_to_discord(channel: discord.TextChannel, content: str): """대화형 프롬프트를 Discord에 선택지 형태로 전송.""" # 선택지 추출 choices = CHOICE_PATTERN.findall(content) embed = discord.Embed( title="⚡ Gemini 확인 요청", description=f"```\n{content[:1500]}\n```", color=0xF39C12, ) if choices: choice_text = "\n".join(f"**{num}.** {desc}" for num, desc in choices) embed.add_field( name="선택지 (번호를 입력하세요)", value=choice_text, inline=False, ) await channel.send(embed=embed) @bot.event async def on_ready(): logger.info(f"Bridge Bot 시작: {bot.user}") @bot.event async def on_message(message: discord.Message): if message.author == bot.user or message.author.bot: return channel = message.channel channel_id = channel.id text = message.content.strip() if not text: return # !start — Gemini 프로세스 시작 if text == "!start": if channel_id in _processes and _processes[channel_id].is_alive: await channel.send("⚠️ 이미 Gemini 세션이 실행 중입니다.") return await channel.send("🟢 Gemini 세션을 시작합니다...") async def on_output(content): await _send_to_discord(channel, content) async def on_prompt(content): await _send_prompt_to_discord(channel, content) gp = GeminiProcess( cwd=PROJECT_DIR, on_output=on_output, on_prompt=on_prompt, ) _processes[channel_id] = gp await gp.start() return # !stop — 세션 종료 if text == "!stop": if channel_id in _processes: await _processes[channel_id].stop() del _processes[channel_id] await channel.send("🔴 Gemini 세션이 종료되었습니다.") else: await channel.send("실행 중인 세션이 없습니다.") return # 일반 메시지 → Gemini stdin으로 전달 if channel_id in _processes and _processes[channel_id].is_alive: gp = _processes[channel_id] await gp.send_input(text) # 타이핑 표시 async with channel.typing(): await asyncio.sleep(1) else: await channel.send("💡 `!start`로 Gemini 세션을 먼저 시작하세요.") # ───────────────────────────────────── # 메인 # ───────────────────────────────────── if __name__ == "__main__": if not DISCORD_TOKEN: # .env에서 읽기 from pathlib import Path env_path = Path(__file__).parent.parent.parent / ".env" if env_path.exists(): for line in env_path.read_text(encoding="utf-8").splitlines(): if line.startswith("DISCORD_BOT_TOKEN="): DISCORD_TOKEN = line.split("=", 1)[1].strip().strip('"') if not DISCORD_TOKEN: print("DISCORD_TOKEN이 필요합니다. .env 또는 환경변수에 설정하세요.") sys.exit(1) bot.run(DISCORD_TOKEN)