"""Task Pipeline -- Plan -> Code(에이전트) -> Review -> 재시도 -> 총평 -> 기록. Coder는 에이전트 모드로 프로젝트 디렉토리에서 실행되어 Gemini가 직접 파일을 읽고/쓰고/명령을 실행합니다. 리뷰 실패 시 최대 MAX_REVIEW_RETRIES회 재시도합니다. """ import asyncio import json import logging import re from pathlib import Path from core.project_indexer import ProjectIndex from core.context_manager import ContextManager from core.gemini_caller import GeminiCaller, GeminiCallError from core.docs_manager import DocsManager MAX_REVIEW_RETRIES = 2 logger = logging.getLogger("variet.pipeline") class TaskPipeline: """작업 파이프라인: Plan -> Code(에이전트) -> Review(재시도) -> 기록.""" def __init__(self, project_path: str, token_budget: int = 50_000, docs_subpath: str = "docs/wiki"): self.project_path = project_path self.index = ProjectIndex(project_path) self.ctx = ContextManager(self.index, token_budget) self.gemini = GeminiCaller(project_path) self.docs = DocsManager(project_path, docs_subpath) self.log: list[dict] = [] def setup(self): """프로젝트 인덱싱.""" self.index.scan() return self # ────────────────────────────────────────── # Docs 컨텍스트 (모든 호출에 주입) # ────────────────────────────────────────── def _docs_context(self) -> str: """Gemini 호출에 주입할 프로젝트 문서 컨텍스트.""" index = self.docs.get_docs_index() return ( f"\n{index}\n" f"작업 완료 시 관련 문서가 있으면 업데이트하세요.\n" f"docs 경로: {self.docs.docs_path}\n" ) # ────────────────────────────────────────── # Plan # ────────────────────────────────────────── async def plan(self, user_request: str) -> dict: """Planner로 태스크 분해.""" context = self.ctx.gather(user_request) docs_ctx = self._docs_context() prompt = ( f"## User Request\n{user_request}\n\n" f"## Project Context\n{context}\n\n" f"## Project Docs\n{docs_ctx}\n\n" f"Decompose this request into concrete tasks." ) response = await self.gemini.call("planner", prompt, timeout=180) self._log("plan", user_request, response) plan = self._extract_json(response) return plan or {"summary": response, "tasks": [], "raw": response} # ────────────────────────────────────────── # Code (에이전트 모드 — Gemini가 직접 파일 쓰기) # ────────────────────────────────────────── async def code(self, task: dict) -> str: """에이전트 모드로 코딩 — Gemini가 직접 파일 생성/수정.""" docs_ctx = self._docs_context() prompt = ( f"## Task\n{json.dumps(task, ensure_ascii=False, indent=2)}\n\n" f"## Project Docs\n{docs_ctx}\n\n" f"위 태스크를 구현하세요. 파일을 직접 생성/수정하세요." ) response = await self.gemini.call_agent( "coder", prompt, cwd=self.project_path, timeout=300, ) self._log("code", task.get("title", ""), response) return response # ────────────────────────────────────────── # Code 병렬 실행 # ────────────────────────────────────────── async def code_parallel(self, tasks: list[dict]) -> list[str]: """여러 태스크를 병렬로 코딩 (에이전트 모드).""" results = await asyncio.gather( *[self.code(task) for task in tasks], return_exceptions=True, ) processed = [] for i, result in enumerate(results): if isinstance(result, Exception): error_msg = f"[ERROR] Task {i+1} 실패: {result}" self._log("code_error", tasks[i].get("title", ""), error_msg) processed.append(error_msg) else: processed.append(result) return processed # ────────────────────────────────────────── # Batch Review # ────────────────────────────────────────── async def batch_review(self, tasks: list[dict], code_outputs: list[str]) -> dict: """에이전트가 생성/수정한 실제 파일을 리뷰. code_outputs(에이전트 보고)와 함께 실제 프로젝트 파일 내용을 읽어서 리뷰합니다. """ import os import time as _time # 태스크 요약 task_summaries = [] for i, task in enumerate(tasks): title = task.get("title", task.get("description", f"Task {i+1}")) task_summaries.append(f"### Task {i+1}: {title}") # 에이전트 보고 요약 agent_reports = [] for i, output in enumerate(code_outputs): agent_reports.append(f"--- Agent {i+1} 보고 ---\n{output}") # 최근 변경된 파일 읽기 (10분 이내) recent_files = [] cutoff = _time.time() - 600 project_root = Path(self.project_path) skip_dirs = {".git", "__pycache__", "node_modules", ".venv", "venv"} for root, dirs, files in os.walk(self.project_path): dirs[:] = [d for d in dirs if d not in skip_dirs] for fname in files: fpath = Path(root) / fname try: if fpath.stat().st_mtime > cutoff: rel = fpath.relative_to(project_root) content = fpath.read_text(encoding="utf-8", errors="replace") recent_files.append( f"### {rel}\n```\n{content}\n```" ) except (OSError, UnicodeDecodeError): continue if not recent_files: # 변경 파일 없음 → 자동 통과 (삭제 작업 등) return { "passed": True, "summary": "파일 변경 없음 또는 삭제 작업 — 자동 통과", "issues": [], } files_section = "\n\n".join(recent_files) prompt = ( f"## 요청된 태스크\n{chr(10).join(task_summaries)}\n\n" f"## 에이전트 보고\n{chr(10).join(agent_reports)}\n\n" f"## 실제 생성/수정된 파일\n{files_section}\n\n" f"위 파일들이 태스크 요구사항을 충족하는지 리뷰하세요." ) response = await self.gemini.call("reviewer", prompt, timeout=180) self._log("batch_review", f"{len(tasks)} tasks, {len(recent_files)} files", response) review = self._extract_json(response) return review or {"passed": True, "summary": response, "raw": response} # ────────────────────────────────────────── # 총평 # ────────────────────────────────────────── async def summarize(self, user_request: str, plan: dict, code_outputs: list[str], review: dict) -> dict: """전체 작업 결과 종합 총평.""" prompt = ( f"## 원래 요청\n{user_request}\n\n" f"## 태스크 수\n{len(plan.get('tasks', []))}개\n\n" f"## 리뷰 결과\n{review.get('summary', str(review))}\n\n" f"## 코딩 결과 요약\n" f"{chr(10).join(code_outputs)}\n\n" f"위 정보를 바탕으로 총평을 작성하세요." ) response = await self.gemini.call("summarizer", prompt, timeout=60) self._log("summarize", user_request, response) summary = self._extract_json(response) return summary or { "title": "작업 완료", "summary": response, "changes": [], "warnings": [], "next_steps": [], } # ────────────────────────────────────────── # 전체 파이프라인 (재시도 루프 포함) # ────────────────────────────────────────── async def execute(self, user_request: str) -> dict: """Plan -> Code(에이전트, 병렬) -> Review -> 재시도 -> 총평 -> 기록. Coder가 에이전트 모드로 직접 파일을 생성/수정합니다. 리뷰 실패 시 최대 MAX_REVIEW_RETRIES회 재시도합니다. 성공/실패 모두 docs에 기록됩니다. """ result = { "request": user_request, "plan": None, "code_outputs": [], "review": None, "summary": None, "errors": [], "retry_count": 0, } try: # 1. Plan plan = await self.plan(user_request) result["plan"] = plan tasks = plan.get("tasks", []) if not tasks: result["summary"] = { "title": "태스크 없음", "summary": "Planner가 태스크를 생성하지 못했습니다.", "changes": [], "warnings": ["요청을 더 구체적으로 해주세요."], "next_steps": [], } self.docs.record_session(user_request, result["summary"], plan) return result # 2. Code + Review (재시도 루프) review = None code_outputs = [] for attempt in range(1 + MAX_REVIEW_RETRIES): # Code 병렬 실행 (에이전트 모드 — 파일 직접 쓰기) code_outputs = await self.code_parallel(tasks) result["code_outputs"] = [o[:500] for o in code_outputs] error_count = sum(1 for o in code_outputs if o.startswith("[ERROR]")) if error_count > 0: result["errors"].append( f"코딩 실패: {error_count}/{len(tasks)}개 (시도 {attempt+1})" ) # Review review = await self.batch_review(tasks, code_outputs) result["review"] = review # 리뷰 통과 여부 확인 passed = review.get("passed", True) if isinstance(passed, str): passed = passed.lower() in ("true", "yes", "pass") if passed: logger.info(f"리뷰 통과 (시도 {attempt+1})") break else: result["retry_count"] = attempt + 1 if attempt < MAX_REVIEW_RETRIES: logger.warning( f"리뷰 실패 -- 재시도 {attempt+2}/{1+MAX_REVIEW_RETRIES}" ) # 리뷰 피드백을 태스크에 추가 feedback = review.get("summary", str(review))[:500] for task in tasks: task["review_feedback"] = ( f"이전 시도에서 다음 리뷰 피드백을 받았습니다. " f"반드시 수정하세요:\n{feedback}" ) else: result["errors"].append( f"리뷰 {1+MAX_REVIEW_RETRIES}회 시도 모두 실패" ) # 3. 총평 summary = await self.summarize( user_request, plan, code_outputs, review ) if result["errors"]: existing_warnings = summary.get("warnings", []) summary["warnings"] = existing_warnings + result["errors"] if result["retry_count"] > 0: summary["retries"] = result["retry_count"] result["summary"] = summary except Exception as e: result["errors"].append(f"파이프라인 오류: {str(e)}") result["summary"] = { "title": "작업 실패", "summary": f"파이프라인 실행 중 오류 발생: {str(e)}", "changes": [], "warnings": result["errors"], "next_steps": ["오류 내용 확인 후 다시 시도"], } self._log("pipeline_error", user_request, str(e)) finally: self.docs.record_session( user_request, result.get("summary", {"summary": "기록 없음"}), result.get("plan"), ) self.docs.append_changelog( result.get("summary", {}).get("title", user_request[:50]) ) return result # ────────────────────────────────────────── # 유틸리티 # ────────────────────────────────────────── def _extract_json(self, text: str) -> dict | None: """텍스트에서 JSON 블록 추출.""" match = re.search(r"```json\s*\n(.*?)\n\s*```", text, re.DOTALL) if match: try: return json.loads(match.group(1)) except json.JSONDecodeError: pass brace_depth = 0 start = -1 for i, ch in enumerate(text): if ch == '{': if brace_depth == 0: start = i brace_depth += 1 elif ch == '}': brace_depth -= 1 if brace_depth == 0 and start >= 0: try: return json.loads(text[start:i + 1]) except json.JSONDecodeError: start = -1 return None def _log(self, phase: str, input_summary: str, output: str): self.log.append({ "phase": phase, "input": input_summary[:200], "output": output[:500], })