"""Task Pipeline — Plan → Code → Review → Ship. E2E 파이프라인을 구성하고 실행합니다. """ import asyncio import json import re from pathlib import Path from core.project_indexer import ProjectIndex from core.context_manager import ContextManager from core.gemini_caller import GeminiCaller class TaskPipeline: """작업 파이프라인: 사용자 요청을 분해하고 순차 실행합니다.""" def __init__(self, project_path: str, token_budget: int = 50_000): self.project_path = project_path self.index = ProjectIndex(project_path) self.ctx = ContextManager(self.index, token_budget) self.gemini = GeminiCaller(project_path) self.log: list[dict] = [] def setup(self): """프로젝트 인덱싱.""" self.index.scan() return self async def plan(self, user_request: str) -> dict: """Planner로 작업 분해.""" structure = self.index.get_structure_summary() prompt = ( f"## User Request\n{user_request}\n\n" f"## Project Structure\n{structure}\n\n" f"Decompose this request into concrete tasks." ) response = await self.gemini.call("planner", prompt, timeout=90) self._log("plan", user_request, response) # JSON 추출 plan = self._extract_json(response) return plan or {"summary": response, "tasks": [], "raw": response} async def code(self, task: dict) -> str: """Coder로 코드 수정.""" # 관련 파일 컨텍스트 수집 context = self.ctx.gather(task.get("description", task.get("title", ""))) prompt = ( f"## Task\n{json.dumps(task, ensure_ascii=False, indent=2)}\n\n" f"## Context\n{context}\n\n" f"Implement the changes described in the task." ) response = await self.gemini.call("coder", prompt, timeout=120) self._log("code", task.get("title", ""), response) return response async def review(self, task: dict, code_output: str) -> dict: """Reviewer로 코드 리뷰.""" prompt = ( f"## Task\n{json.dumps(task, ensure_ascii=False, indent=2)}\n\n" f"## Code Output\n{code_output}\n\n" f"Review the code changes." ) response = await self.gemini.call("reviewer", prompt, timeout=90) self._log("review", task.get("title", ""), response) review = self._extract_json(response) return review or {"passed": True, "summary": response, "raw": response} async def execute(self, user_request: str) -> dict: """전체 파이프라인 실행.""" result = { "request": user_request, "plan": None, "tasks_completed": [], "reviews": [], } # 1. Plan plan = await self.plan(user_request) result["plan"] = plan tasks = plan.get("tasks", []) if not tasks: result["error"] = "Planner returned no tasks" return result # 2. Code + Review for each task for task in tasks: code_output = await self.code(task) review = await self.review(task, code_output) if not review.get("passed", True): # 리뷰 실패 시 한 번 재시도 code_output = await self.code({ **task, "description": task.get("description", "") + f"\n\n## Review Feedback\n{json.dumps(review.get('issues', []), ensure_ascii=False)}" }) review = await self.review(task, code_output) result["tasks_completed"].append({ "task": task, "output": code_output[:500], # 요약 "review": review, }) return result def _extract_json(self, text: str) -> dict | None: """텍스트에서 JSON 블록 추출.""" # ```json ... ``` 패턴 match = re.search(r"```json\s*\n(.*?)\n\s*```", text, re.DOTALL) if match: try: return json.loads(match.group(1)) except json.JSONDecodeError: pass # { ... } 직접 찾기 match = re.search(r"\{.*\}", text, re.DOTALL) if match: try: return json.loads(match.group(0)) except json.JSONDecodeError: pass return None def _log(self, phase: str, input_summary: str, output: str): self.log.append({ "phase": phase, "input": input_summary[:200], "output": output[:500], })