Files
variet-agent/core/task_pipeline.py
CD 8140f75c7f fix: 리뷰어가 실제 파일 읽기로 변경
- batch_review: Gemini stdout 대신 프로젝트의 최근 변경 파일(60초)을 읽어 리뷰
- 파일 삭제/변경 없는 작업은 자동 통과
- 에이전트 보고 + 실제 파일 내용을 함께 전달
- 최대 10개 파일, 파일당 3000자 제한
2026-03-06 22:25:26 +09:00

362 lines
15 KiB
Python

"""Task Pipeline -- Plan -> Code(에이전트) -> Review -> 재시도 -> 총평 -> 기록.
Coder는 에이전트 모드로 프로젝트 디렉토리에서 실행되어
Gemini가 직접 파일을 읽고/쓰고/명령을 실행합니다.
리뷰 실패 시 최대 MAX_REVIEW_RETRIES회 재시도합니다.
"""
import asyncio
import json
import logging
import re
from pathlib import Path
from core.project_indexer import ProjectIndex
from core.context_manager import ContextManager
from core.gemini_caller import GeminiCaller, GeminiCallError
from core.docs_manager import DocsManager
MAX_REVIEW_RETRIES = 2
logger = logging.getLogger("variet.pipeline")
class TaskPipeline:
"""작업 파이프라인: Plan -> Code(에이전트) -> Review(재시도) -> 기록."""
def __init__(self, project_path: str, token_budget: int = 50_000,
docs_subpath: str = "docs/wiki"):
self.project_path = project_path
self.index = ProjectIndex(project_path)
self.ctx = ContextManager(self.index, token_budget)
self.gemini = GeminiCaller(project_path)
self.docs = DocsManager(project_path, docs_subpath)
self.log: list[dict] = []
def setup(self):
"""프로젝트 인덱싱."""
self.index.scan()
return self
# ──────────────────────────────────────────
# Docs 컨텍스트 (모든 호출에 주입)
# ──────────────────────────────────────────
def _docs_context(self) -> str:
"""Gemini 호출에 주입할 프로젝트 문서 컨텍스트."""
index = self.docs.get_docs_index()
return (
f"\n{index}\n"
f"작업 완료 시 관련 문서가 있으면 업데이트하세요.\n"
f"docs 경로: {self.docs.docs_path}\n"
)
# ──────────────────────────────────────────
# Plan
# ──────────────────────────────────────────
async def plan(self, user_request: str) -> dict:
"""Planner로 태스크 분해."""
context = self.ctx.gather(user_request)
docs_ctx = self._docs_context()
prompt = (
f"## User Request\n{user_request}\n\n"
f"## Project Context\n{context}\n\n"
f"## Project Docs\n{docs_ctx}\n\n"
f"Decompose this request into concrete tasks."
)
response = await self.gemini.call("planner", prompt, timeout=180)
self._log("plan", user_request, response)
plan = self._extract_json(response)
return plan or {"summary": response, "tasks": [], "raw": response}
# ──────────────────────────────────────────
# Code (에이전트 모드 — Gemini가 직접 파일 쓰기)
# ──────────────────────────────────────────
async def code(self, task: dict) -> str:
"""에이전트 모드로 코딩 — Gemini가 직접 파일 생성/수정."""
docs_ctx = self._docs_context()
prompt = (
f"## Task\n{json.dumps(task, ensure_ascii=False, indent=2)}\n\n"
f"## Project Docs\n{docs_ctx}\n\n"
f"위 태스크를 구현하세요. 파일을 직접 생성/수정하세요."
)
response = await self.gemini.call_agent(
"coder", prompt, cwd=self.project_path, timeout=300,
)
self._log("code", task.get("title", ""), response)
return response
# ──────────────────────────────────────────
# Code 병렬 실행
# ──────────────────────────────────────────
async def code_parallel(self, tasks: list[dict]) -> list[str]:
"""여러 태스크를 병렬로 코딩 (에이전트 모드)."""
results = await asyncio.gather(
*[self.code(task) for task in tasks],
return_exceptions=True,
)
processed = []
for i, result in enumerate(results):
if isinstance(result, Exception):
error_msg = f"[ERROR] Task {i+1} 실패: {result}"
self._log("code_error", tasks[i].get("title", ""), error_msg)
processed.append(error_msg)
else:
processed.append(result)
return processed
# ──────────────────────────────────────────
# Batch Review
# ──────────────────────────────────────────
async def batch_review(self, tasks: list[dict], code_outputs: list[str]) -> dict:
"""에이전트가 생성/수정한 실제 파일을 리뷰.
code_outputs(에이전트 보고)와 함께 실제 프로젝트 파일 내용을 읽어서 리뷰합니다.
"""
import os
import time as _time
# 태스크 요약
task_summaries = []
for i, task in enumerate(tasks):
title = task.get("title", task.get("description", f"Task {i+1}"))
task_summaries.append(f"### Task {i+1}: {title}")
# 에이전트 보고 요약
agent_reports = []
for i, output in enumerate(code_outputs):
agent_reports.append(f"--- Agent {i+1} 보고 ---\n{output[:1000]}")
# 최근 변경된 파일 읽기 (60초 이내)
recent_files = []
cutoff = _time.time() - 60
project_root = Path(self.project_path)
skip_dirs = {".git", "__pycache__", "node_modules", ".venv", "venv"}
for root, dirs, files in os.walk(self.project_path):
dirs[:] = [d for d in dirs if d not in skip_dirs]
for fname in files:
fpath = Path(root) / fname
try:
if fpath.stat().st_mtime > cutoff:
rel = fpath.relative_to(project_root)
content = fpath.read_text(encoding="utf-8", errors="replace")
recent_files.append(
f"### {rel}\n```\n{content[:3000]}\n```"
)
except (OSError, UnicodeDecodeError):
continue
if not recent_files:
# 변경 파일 없음 → 자동 통과 (삭제 작업 등)
return {
"passed": True,
"summary": "파일 변경 없음 또는 삭제 작업 — 자동 통과",
"issues": [],
}
files_section = "\n\n".join(recent_files[:10]) # 최대 10개
prompt = (
f"## 요청된 태스크\n{chr(10).join(task_summaries)}\n\n"
f"## 에이전트 보고\n{chr(10).join(agent_reports)}\n\n"
f"## 실제 생성/수정된 파일\n{files_section}\n\n"
f"위 파일들이 태스크 요구사항을 충족하는지 리뷰하세요."
)
response = await self.gemini.call("reviewer", prompt, timeout=180)
self._log("batch_review", f"{len(tasks)} tasks, {len(recent_files)} files", response)
review = self._extract_json(response)
return review or {"passed": True, "summary": response, "raw": response}
# ──────────────────────────────────────────
# 총평
# ──────────────────────────────────────────
async def summarize(self, user_request: str, plan: dict,
code_outputs: list[str], review: dict) -> dict:
"""전체 작업 결과 종합 총평."""
prompt = (
f"## 원래 요청\n{user_request}\n\n"
f"## 태스크 수\n{len(plan.get('tasks', []))}\n\n"
f"## 리뷰 결과\n{review.get('summary', str(review))[:500]}\n\n"
f"## 코딩 결과 요약\n"
f"{chr(10).join(o[:200] for o in code_outputs)}\n\n"
f"위 정보를 바탕으로 총평을 작성하세요."
)
response = await self.gemini.call("summarizer", prompt, timeout=60)
self._log("summarize", user_request, response)
summary = self._extract_json(response)
return summary or {
"title": "작업 완료",
"summary": response,
"changes": [],
"warnings": [],
"next_steps": [],
}
# ──────────────────────────────────────────
# 전체 파이프라인 (재시도 루프 포함)
# ──────────────────────────────────────────
async def execute(self, user_request: str) -> dict:
"""Plan -> Code(에이전트, 병렬) -> Review -> 재시도 -> 총평 -> 기록.
Coder가 에이전트 모드로 직접 파일을 생성/수정합니다.
리뷰 실패 시 최대 MAX_REVIEW_RETRIES회 재시도합니다.
성공/실패 모두 docs에 기록됩니다.
"""
result = {
"request": user_request,
"plan": None,
"code_outputs": [],
"review": None,
"summary": None,
"errors": [],
"retry_count": 0,
}
try:
# 1. Plan
plan = await self.plan(user_request)
result["plan"] = plan
tasks = plan.get("tasks", [])
if not tasks:
result["summary"] = {
"title": "태스크 없음",
"summary": "Planner가 태스크를 생성하지 못했습니다.",
"changes": [],
"warnings": ["요청을 더 구체적으로 해주세요."],
"next_steps": [],
}
self.docs.record_session(user_request, result["summary"], plan)
return result
# 2. Code + Review (재시도 루프)
review = None
code_outputs = []
for attempt in range(1 + MAX_REVIEW_RETRIES):
# Code 병렬 실행 (에이전트 모드 — 파일 직접 쓰기)
code_outputs = await self.code_parallel(tasks)
result["code_outputs"] = [o[:500] for o in code_outputs]
error_count = sum(1 for o in code_outputs if o.startswith("[ERROR]"))
if error_count > 0:
result["errors"].append(
f"코딩 실패: {error_count}/{len(tasks)}개 (시도 {attempt+1})"
)
# Review
review = await self.batch_review(tasks, code_outputs)
result["review"] = review
# 리뷰 통과 여부 확인
passed = review.get("passed", True)
if isinstance(passed, str):
passed = passed.lower() in ("true", "yes", "pass")
if passed:
logger.info(f"리뷰 통과 (시도 {attempt+1})")
break
else:
result["retry_count"] = attempt + 1
if attempt < MAX_REVIEW_RETRIES:
logger.warning(
f"리뷰 실패 -- 재시도 {attempt+2}/{1+MAX_REVIEW_RETRIES}"
)
# 리뷰 피드백을 태스크에 추가
feedback = review.get("summary", str(review))[:500]
for task in tasks:
task["review_feedback"] = (
f"이전 시도에서 다음 리뷰 피드백을 받았습니다. "
f"반드시 수정하세요:\n{feedback}"
)
else:
result["errors"].append(
f"리뷰 {1+MAX_REVIEW_RETRIES}회 시도 모두 실패"
)
# 3. 총평
summary = await self.summarize(
user_request, plan, code_outputs, review
)
if result["errors"]:
existing_warnings = summary.get("warnings", [])
summary["warnings"] = existing_warnings + result["errors"]
if result["retry_count"] > 0:
summary["retries"] = result["retry_count"]
result["summary"] = summary
except Exception as e:
result["errors"].append(f"파이프라인 오류: {str(e)}")
result["summary"] = {
"title": "작업 실패",
"summary": f"파이프라인 실행 중 오류 발생: {str(e)}",
"changes": [],
"warnings": result["errors"],
"next_steps": ["오류 내용 확인 후 다시 시도"],
}
self._log("pipeline_error", user_request, str(e))
finally:
self.docs.record_session(
user_request,
result.get("summary", {"summary": "기록 없음"}),
result.get("plan"),
)
self.docs.append_changelog(
result.get("summary", {}).get("title", user_request[:50])
)
return result
# ──────────────────────────────────────────
# 유틸리티
# ──────────────────────────────────────────
def _extract_json(self, text: str) -> dict | None:
"""텍스트에서 JSON 블록 추출."""
match = re.search(r"```json\s*\n(.*?)\n\s*```", text, re.DOTALL)
if match:
try:
return json.loads(match.group(1))
except json.JSONDecodeError:
pass
brace_depth = 0
start = -1
for i, ch in enumerate(text):
if ch == '{':
if brace_depth == 0:
start = i
brace_depth += 1
elif ch == '}':
brace_depth -= 1
if brace_depth == 0 and start >= 0:
try:
return json.loads(text[start:i + 1])
except json.JSONDecodeError:
start = -1
return None
def _log(self, phase: str, input_summary: str, output: str):
self.log.append({
"phase": phase,
"input": input_summary[:200],
"output": output[:500],
})