feat: Coder를 에이전트 모드로 전환 + 리뷰 재시도 루프

핵심 변경:
- gemini_caller.py: call_agent() 추가 (cwd 지원, 5분 타임아웃)
  Gemini가 프로젝트 디렉토리에서 직접 파일 읽기/쓰기/실행
- task_pipeline.py: Coder가 call_agent() 사용, file_applier 의존 제거
  리뷰 실패 시 최대 2회 재시도 (피드백 포함)
- discord_bot.py: pipeline.execute() 호출로 단순화
- coder.md: 파일 직접 쓰기 지시 (코드블록 출력 금지)
- 검증: echo prompt | gemini --cwd=VW_Proj → test_agent.txt 생성 확인
This commit is contained in:
2026-03-06 22:13:06 +09:00
parent 83c043863c
commit bccc673713
6 changed files with 483 additions and 187 deletions

View File

@@ -1,11 +1,15 @@
"""GeminiCaller — gemini headless 호출.
"""GeminiCaller — gemini CLI 호출.
인자 분리, 세마포어 기반 동시성 제어, 에러 처리 개선.
두 가지 모드:
1. call() — 텍스트 입출력 (분류/리뷰/총평)
2. call_agent() — 프로젝트 디렉토리에서 에이전트 실행 (코딩)
→ Gemini가 직접 파일 읽기/쓰기/명령 실행
"""
import asyncio
import logging
import time
import sys
from pathlib import Path
logger = logging.getLogger("variet.gemini")
@@ -15,6 +19,9 @@ ROLE_PROMPTS_DIR = Path(__file__).parent.parent / "prompts"
# 동시 호출 제한 (Gemini AI Ultra 120RPM 고려)
_semaphore = asyncio.Semaphore(4)
# Windows에서 PS ExecutionPolicy 우회
_IS_WIN = sys.platform == "win32"
class GeminiCallError(Exception):
"""Gemini CLI 호출 실패."""
@@ -22,32 +29,53 @@ class GeminiCallError(Exception):
class GeminiCaller:
"""Gemini CLI headless 호출을 관리합니다."""
"""Gemini CLI 호출을 관리합니다."""
def __init__(self, project_path: str = None):
self.project_path = project_path
self.call_count = 0
self.last_call_time = 0.0
async def call(self, role: str, context: str, timeout: int = 120) -> str:
"""역할별 프롬프트로 gemini 호출.
def _build_cmd(self):
"""OS에 맞는 gemini 커맨드 빌드."""
if _IS_WIN:
return ["cmd", "/c", "gemini", "--approval-mode", "yolo"]
return ["gemini", "--approval-mode", "yolo"]
세마포어로 동시 호출 수 제한.
인자를 분리하여 안정적 subprocess 실행.
def _clean_output(self, raw: str) -> str:
"""Gemini 출력에서 노이즈 라인 제거."""
lines = raw.splitlines()
cleaned = []
for line in lines:
if any(noise in line for noise in [
"YOLO mode", "Loaded cached", "Welcome to Gemini",
"Type /help", "Gemini CLI",
]):
continue
cleaned.append(line)
return "\n".join(cleaned).strip()
# ──────────────────────────────────────────
# 텍스트 모드 (분류/리뷰/총평)
# ──────────────────────────────────────────
async def call(self, role: str, context: str, timeout: int = 120) -> str:
"""역할별 프롬프트로 텍스트 생성.
파일 접근 없이 텍스트만 주고받는 역할에 사용.
(unified, reviewer, summarizer)
"""
async with _semaphore:
return await self._call_impl(role, context, timeout)
return await self._call_text(role, context, timeout)
async def _call_impl(self, role: str, context: str, timeout: int) -> str:
"""실제 gemini 호출 구현."""
# 시스템 프롬프트 로드
async def _call_text(self, role: str, context: str, timeout: int) -> str:
"""텍스트 전용 호출."""
prompt_file = ROLE_PROMPTS_DIR / f"{role}.md"
if prompt_file.exists():
system_prompt = prompt_file.read_text(encoding="utf-8")
else:
system_prompt = f"You are a {role}. Respond in Korean."
# 시스템 프롬프트 + 컨텍스트를 하나의 입력으로 합침
full_input = (
f"=== SYSTEM INSTRUCTIONS ===\n"
f"{system_prompt}\n\n"
@@ -56,15 +84,8 @@ class GeminiCaller:
)
try:
# Windows: cmd /c gemini (PS ExecutionPolicy가 .ps1을 차단하므로)
import sys
if sys.platform == "win32":
cmd = ["cmd", "/c", "gemini", "--approval-mode", "yolo"]
else:
cmd = ["gemini", "--approval-mode", "yolo"]
proc = await asyncio.create_subprocess_exec(
*cmd,
*self._build_cmd(),
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
@@ -77,42 +98,101 @@ class GeminiCaller:
self.call_count += 1
self.last_call_time = time.time()
output = stdout.decode("utf-8", errors="replace").strip()
result = self._clean_output(stdout.decode("utf-8", errors="replace"))
# 노이즈 라인 제거
lines = output.splitlines()
cleaned = []
for line in lines:
if any(noise in line for noise in [
"YOLO mode", "Loaded cached", "Welcome to Gemini",
"Type /help", "Gemini CLI",
]):
continue
cleaned.append(line)
result = "\n".join(cleaned).strip()
if not result:
logger.warning(f"Gemini [{role}] 빈 응답")
if stderr:
err = stderr.decode("utf-8", errors="replace").strip()
logger.warning(f"Gemini stderr: {err[:200]}")
if not result and stderr:
err = stderr.decode("utf-8", errors="replace").strip()
logger.warning(f"Gemini [{role}] 빈 응답. stderr: {err[:200]}")
logger.info(
f"Gemini [{role}] 호출 #{self.call_count} "
f" 입력 {len(full_input)} 출력 {len(result)}"
f"Gemini [{role}] text #{self.call_count} "
f"-- 입력 {len(full_input)}-> 출력 {len(result)}"
)
return result
except asyncio.TimeoutError:
raise GeminiCallError(f"Gemini CLI timeout ({timeout}s) role={role}")
raise GeminiCallError(f"Gemini timeout ({timeout}s) -- role={role}")
except FileNotFoundError:
raise GeminiCallError(
"gemini CLI를 찾을 수 없습니다. PATH에 gemini가 있는지 확인하세요."
)
raise GeminiCallError("gemini CLI를 찾을 수 없습니다.")
except Exception as e:
raise GeminiCallError(f"Gemini CLI 호출 실패: {e}")
# ──────────────────────────────────────────
# 에이전트 모드 (코딩 — 파일 직접 읽기/쓰기)
# ──────────────────────────────────────────
async def call_agent(
self, role: str, context: str, cwd: str,
timeout: int = 300,
) -> str:
"""에이전트 모드 — 프로젝트 디렉토리에서 실행.
Gemini가 직접 파일을 읽고/쓰고/명령을 실행합니다.
cwd를 프로젝트 경로로 설정하여 파일 접근을 허용합니다.
Args:
role: 프롬프트 역할 (coder)
context: 작업 지시 (태스크 설명)
cwd: 프로젝트 루트 경로 (여기서 Gemini 실행)
timeout: 타임아웃 (에이전트는 더 길게 — 기본 5분)
"""
async with _semaphore:
return await self._call_agent_impl(role, context, cwd, timeout)
async def _call_agent_impl(
self, role: str, context: str, cwd: str, timeout: int,
) -> str:
"""에이전트 모드 구현."""
prompt_file = ROLE_PROMPTS_DIR / f"{role}.md"
if prompt_file.exists():
system_prompt = prompt_file.read_text(encoding="utf-8")
else:
system_prompt = f"You are a {role}. Respond in Korean."
full_input = (
f"=== SYSTEM INSTRUCTIONS ===\n"
f"{system_prompt}\n\n"
f"=== TASK ===\n"
f"{context}\n\n"
f"=== IMPORTANT ===\n"
f"프로젝트 루트: {cwd}\n"
f"파일을 직접 생성/수정하세요. 코드블록으로 출력하지 말고, 실제 파일로 저장하세요.\n"
f"작업 완료 후 변경한 파일 목록을 간단히 출력하세요."
)
try:
proc = await asyncio.create_subprocess_exec(
*self._build_cmd(),
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
cwd=cwd, # ★ 핵심: 프로젝트 디렉토리에서 실행
)
stdout, stderr = await asyncio.wait_for(
proc.communicate(input=full_input.encode("utf-8")),
timeout=timeout,
)
self.call_count += 1
self.last_call_time = time.time()
result = self._clean_output(stdout.decode("utf-8", errors="replace"))
logger.info(
f"Gemini [{role}] agent #{self.call_count} "
f"-- cwd={cwd} 입력 {len(full_input)}자 -> 출력 {len(result)}"
)
return result
except asyncio.TimeoutError:
raise GeminiCallError(
f"Gemini agent timeout ({timeout}s) -- role={role}, cwd={cwd}"
)
except FileNotFoundError:
raise GeminiCallError("gemini CLI를 찾을 수 없습니다.")
except Exception as e:
raise GeminiCallError(f"Gemini agent 호출 실패: {e}")
async def call_simple(self, prompt: str, timeout: int = 60) -> str:
"""시스템 프롬프트 없이 단순 호출."""
return await self.call("default", prompt, timeout)

View File

@@ -1,21 +1,26 @@
"""Task Pipeline Plan Code(병렬) → Review(배치) → 파일 적용 → 총평 기록.
"""Task Pipeline -- Plan -> Code(에이전트) -> Review -> 재시도 -> 총평 -> 기록.
docs/wiki를 프롬프트에 주입하고, 완료 시 세션 기록 + changelog 업데이트.
Coder는 에이전트 모드로 프로젝트 디렉토리에서 실행되어
Gemini가 직접 파일을 읽고/쓰고/명령을 실행합니다.
리뷰 실패 시 최대 MAX_REVIEW_RETRIES회 재시도합니다.
"""
import asyncio
import json
import logging
import re
from pathlib import Path
from core.project_indexer import ProjectIndex
from core.context_manager import ContextManager
from core.gemini_caller import GeminiCaller, GeminiCallError
from core.file_applier import parse_code_output, apply_changes
from core.docs_manager import DocsManager
MAX_REVIEW_RETRIES = 2
logger = logging.getLogger("variet.pipeline")
class TaskPipeline:
"""작업 파이프라인: Plan Code(병렬) → Review(배치) → 기록."""
"""작업 파이프라인: Plan -> Code(에이전트) -> Review(재시도) -> 기록."""
def __init__(self, project_path: str, token_budget: int = 50_000,
docs_subpath: str = "docs/wiki"):
@@ -49,13 +54,13 @@ class TaskPipeline:
# ──────────────────────────────────────────
async def plan(self, user_request: str) -> dict:
"""Planner로 작업 분해."""
structure = self.index.get_structure_summary()
"""Planner로 태스크 분해."""
context = self.ctx.gather(user_request)
docs_ctx = self._docs_context()
prompt = (
f"## User Request\n{user_request}\n\n"
f"## Project Structure\n{structure}\n\n"
f"## Project Context\n{context}\n\n"
f"## Project Docs\n{docs_ctx}\n\n"
f"Decompose this request into concrete tasks."
)
@@ -67,22 +72,22 @@ class TaskPipeline:
return plan or {"summary": response, "tasks": [], "raw": response}
# ──────────────────────────────────────────
# Code (개별 태스크)
# Code (에이전트 모드 — Gemini가 직접 파일 쓰기)
# ──────────────────────────────────────────
async def code(self, task: dict) -> str:
"""Coder로 코드 수정 (단일 태스크)."""
context = self.ctx.gather(task.get("description", task.get("title", "")))
"""에이전트 모드로 코딩 — Gemini가 직접 파일 생성/수정."""
docs_ctx = self._docs_context()
prompt = (
f"## Task\n{json.dumps(task, ensure_ascii=False, indent=2)}\n\n"
f"## Context\n{context}\n\n"
f"## Project Docs\n{docs_ctx}\n\n"
f"Implement the changes described in the task."
f"위 태스크를 구현하세요. 파일을 직접 생성/수정하세요."
)
response = await self.gemini.call("coder", prompt, timeout=180)
response = await self.gemini.call_agent(
"coder", prompt, cwd=self.project_path, timeout=300,
)
self._log("code", task.get("title", ""), response)
return response
@@ -91,7 +96,7 @@ class TaskPipeline:
# ──────────────────────────────────────────
async def code_parallel(self, tasks: list[dict]) -> list[str]:
"""여러 태스크를 병렬로 코딩."""
"""여러 태스크를 병렬로 코딩 (에이전트 모드)."""
results = await asyncio.gather(
*[self.code(task) for task in tasks],
return_exceptions=True,
@@ -139,19 +144,14 @@ class TaskPipeline:
# ──────────────────────────────────────────
async def summarize(self, user_request: str, plan: dict,
code_outputs: list[str], review: dict,
applied_files: list[dict]) -> dict:
code_outputs: list[str], review: dict) -> dict:
"""전체 작업 결과 종합 총평."""
file_changes = "\n".join(
f"- {f['path']} ({f['action']}, {f.get('lines', '?')}L)"
for f in applied_files
) if applied_files else "파일 변경 없음"
prompt = (
f"## 원래 요청\n{user_request}\n\n"
f"## 태스크 수\n{len(plan.get('tasks', []))}\n\n"
f"## 리뷰 결과\n{review.get('summary', str(review))[:500]}\n\n"
f"## 변경된 파일\n{file_changes}\n\n"
f"## 코딩 결과 요약\n"
f"{chr(10).join(o[:200] for o in code_outputs)}\n\n"
f"위 정보를 바탕으로 총평을 작성하세요."
)
@@ -168,12 +168,14 @@ class TaskPipeline:
}
# ──────────────────────────────────────────
# 전체 파이프라인
# 전체 파이프라인 (재시도 루프 포함)
# ──────────────────────────────────────────
async def execute(self, user_request: str) -> dict:
"""Plan -> Code(병렬) -> 파일 적용 -> Review -> 총평 -> 기록.
"""Plan -> Code(에이전트, 병렬) -> Review -> 재시도 -> 총평 -> 기록.
Coder가 에이전트 모드로 직접 파일을 생성/수정합니다.
리뷰 실패 시 최대 MAX_REVIEW_RETRIES회 재시도합니다.
성공/실패 모두 docs에 기록됩니다.
"""
result = {
@@ -181,9 +183,9 @@ class TaskPipeline:
"plan": None,
"code_outputs": [],
"review": None,
"applied_files": [],
"summary": None,
"errors": [],
"retry_count": 0,
}
try:
@@ -203,56 +205,62 @@ class TaskPipeline:
self.docs.record_session(user_request, result["summary"], plan)
return result
# 2. Code 병렬 실행
code_outputs = await self.code_parallel(tasks)
result["code_outputs"] = [o[:500] for o in code_outputs]
# 2. Code + Review (재시도 루프)
review = None
code_outputs = []
for attempt in range(1 + MAX_REVIEW_RETRIES):
# Code 병렬 실행 (에이전트 모드 — 파일 직접 쓰기)
code_outputs = await self.code_parallel(tasks)
result["code_outputs"] = [o[:500] for o in code_outputs]
# 에러 추적
error_count = sum(1 for o in code_outputs if o.startswith("[ERROR]"))
if error_count > 0:
result["errors"].append(f"코딩 실패: {error_count}/{len(tasks)}개 태스크")
# 3. 파일 적용
all_applied = []
parse_failures = 0
for i, output in enumerate(code_outputs):
if output.startswith("[ERROR]"):
continue
changes = parse_code_output(output)
if changes:
applied = apply_changes(changes, self.project_path)
all_applied.extend(applied)
else:
parse_failures += 1
error_count = sum(1 for o in code_outputs if o.startswith("[ERROR]"))
if error_count > 0:
result["errors"].append(
f"Task {i+1} 출력에서 파일을 추출하지 못함 "
f"(출력 {len(output)}자)"
f"코딩 실패: {error_count}/{len(tasks)}개 (시도 {attempt+1})"
)
result["applied_files"] = all_applied
if parse_failures > 0:
self._log(
"parse_warning",
f"{parse_failures}/{len(tasks)} 파싱 실패",
f"적용 성공: {len(all_applied)}개 파일",
)
# Review
review = await self.batch_review(tasks, code_outputs)
result["review"] = review
# 4. Batch Review
review = await self.batch_review(tasks, code_outputs)
result["review"] = review
# 리뷰 통과 여부 확인
passed = review.get("passed", True)
if isinstance(passed, str):
passed = passed.lower() in ("true", "yes", "pass")
# 5. 총평
if passed:
logger.info(f"리뷰 통과 (시도 {attempt+1})")
break
else:
result["retry_count"] = attempt + 1
if attempt < MAX_REVIEW_RETRIES:
logger.warning(
f"리뷰 실패 -- 재시도 {attempt+2}/{1+MAX_REVIEW_RETRIES}"
)
# 리뷰 피드백을 태스크에 추가
feedback = review.get("summary", str(review))[:500]
for task in tasks:
task["review_feedback"] = (
f"이전 시도에서 다음 리뷰 피드백을 받았습니다. "
f"반드시 수정하세요:\n{feedback}"
)
else:
result["errors"].append(
f"리뷰 {1+MAX_REVIEW_RETRIES}회 시도 모두 실패"
)
# 3. 총평
summary = await self.summarize(
user_request, plan, code_outputs, review, all_applied
user_request, plan, code_outputs, review
)
# 에러 정보를 총평에 추가
if result["errors"]:
existing_warnings = summary.get("warnings", [])
summary["warnings"] = existing_warnings + result["errors"]
if result["retry_count"] > 0:
summary["retries"] = result["retry_count"]
result["summary"] = summary
except Exception as e:
# 파이프라인 자체 실패
result["errors"].append(f"파이프라인 오류: {str(e)}")
result["summary"] = {
"title": "작업 실패",
@@ -264,7 +272,6 @@ class TaskPipeline:
self._log("pipeline_error", user_request, str(e))
finally:
# 성공/실패 모두 기록
self.docs.record_session(
user_request,
result.get("summary", {"summary": "기록 없음"}),