Files
variet-agent/core/gemini_caller.py
CD 752d851f9f feat: Pipeline 전면 개선 — 병렬실행, Batch Review, 총평, 대화기억, 스마트라우팅
- GeminiCaller: cmd/c 제거, 인자 분리, Semaphore(4) 동시성 제어, GeminiCallError
- TaskPipeline: asyncio.gather 병렬 코딩, batch_review 1회, summarize 총평
- FileApplier: Coder 출력 파싱 → 실제 파일 적용 (경로 보안 체크)
- Discord Bot: on_message 자동채팅, 의도분류(chat/task/clarify), 대화기억(10메시지)
- Prompts: router.md (의도분류), summarizer.md (총평)
- Workflows: agent_chat 환경 경로 업데이트
2026-03-06 20:46:58 +09:00

112 lines
3.7 KiB
Python

"""GeminiCaller — gemini headless 호출.
인자 분리, 세마포어 기반 동시성 제어, 에러 처리 개선.
"""
import asyncio
import logging
import time
from pathlib import Path
logger = logging.getLogger("variet.gemini")
ROLE_PROMPTS_DIR = Path(__file__).parent.parent / "prompts"
# 동시 호출 제한 (Gemini AI Ultra 120RPM 고려)
_semaphore = asyncio.Semaphore(4)
class GeminiCallError(Exception):
"""Gemini CLI 호출 실패."""
pass
class GeminiCaller:
"""Gemini CLI headless 호출을 관리합니다."""
def __init__(self, project_path: str = None):
self.project_path = project_path
self.call_count = 0
self.last_call_time = 0.0
async def call(self, role: str, context: str, timeout: int = 120) -> str:
"""역할별 프롬프트로 gemini 호출.
세마포어로 동시 호출 수 제한.
인자를 분리하여 안정적 subprocess 실행.
"""
async with _semaphore:
return await self._call_impl(role, context, timeout)
async def _call_impl(self, role: str, context: str, timeout: int) -> str:
"""실제 gemini 호출 구현."""
# 시스템 프롬프트 로드
prompt_file = ROLE_PROMPTS_DIR / f"{role}.md"
if prompt_file.exists():
system_prompt = prompt_file.read_text(encoding="utf-8")
else:
system_prompt = f"You are a {role}. Respond in Korean."
# 시스템 프롬프트 + 컨텍스트를 하나의 입력으로 합침
full_input = (
f"=== SYSTEM INSTRUCTIONS ===\n"
f"{system_prompt}\n\n"
f"=== USER INPUT ===\n"
f"{context}"
)
try:
proc = await asyncio.create_subprocess_exec(
"gemini", "--approval-mode", "yolo",
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await asyncio.wait_for(
proc.communicate(input=full_input.encode("utf-8")),
timeout=timeout,
)
self.call_count += 1
self.last_call_time = time.time()
output = stdout.decode("utf-8", errors="replace").strip()
# 노이즈 라인 제거
lines = output.splitlines()
cleaned = []
for line in lines:
if any(noise in line for noise in [
"YOLO mode", "Loaded cached", "Welcome to Gemini",
"Type /help", "Gemini CLI",
]):
continue
cleaned.append(line)
result = "\n".join(cleaned).strip()
if not result:
logger.warning(f"Gemini [{role}] 빈 응답")
if stderr:
err = stderr.decode("utf-8", errors="replace").strip()
logger.warning(f"Gemini stderr: {err[:200]}")
logger.info(
f"Gemini [{role}] 호출 #{self.call_count} "
f"— 입력 {len(full_input)}자 → 출력 {len(result)}"
)
return result
except asyncio.TimeoutError:
raise GeminiCallError(f"Gemini CLI timeout ({timeout}s) — role={role}")
except FileNotFoundError:
raise GeminiCallError(
"gemini CLI를 찾을 수 없습니다. PATH에 gemini가 있는지 확인하세요."
)
except Exception as e:
raise GeminiCallError(f"Gemini CLI 호출 실패: {e}")
async def call_simple(self, prompt: str, timeout: int = 60) -> str:
"""시스템 프롬프트 없이 단순 호출."""
return await self.call("default", prompt, timeout)