"""Task Pipeline -- Agent 1회 호출 + 선택적 Review. Gemini CLI agent 모드가 plan+code+verify를 한 세션에서 수행합니다. 기존 5단계(Plan→Code→PlannerVerify→Review→Summarize) → 2단계로 단순화. """ import asyncio import json import logging import re from pathlib import Path from core.project_indexer import ProjectIndex from core.context_manager import ContextManager from core.gemini_caller import GeminiCaller, GeminiCallError from core.docs_manager import DocsManager logger = logging.getLogger("variet.pipeline") class TaskPipeline: """작업 파이프라인: Agent 1회 호출 → 선택적 Review.""" def __init__(self, project_path: str, token_budget: int = 50_000, docs_subpath: str = "docs/wiki"): self.project_path = project_path self.index = ProjectIndex(project_path) self.ctx = ContextManager(self.index, token_budget) self.gemini = GeminiCaller(project_path) self.docs = DocsManager(project_path, docs_subpath) self.log: list[dict] = [] def setup(self): """프로젝트 인덱싱.""" self.index.scan() return self # ────────────────────────────────────────── # Docs 컨텍스트 # ────────────────────────────────────────── def _docs_context(self) -> str: """Gemini 호출에 주입할 프로젝트 문서 컨텍스트.""" index = self.docs.get_docs_index() return ( f"\n{index}\n" f"작업 완료 시 관련 문서가 있으면 업데이트하세요.\n" f"docs 경로: {self.docs.docs_path}\n" ) # ────────────────────────────────────────── # Agent 통합 실행 (핵심) # ────────────────────────────────────────── async def execute(self, user_request: str, history: str = "", progress_callback=None, role: str = "agent", timeout: int = None) -> dict: """Agent 1회 호출 — plan+code+verify+report 통합. Args: role: 'agent'(코딩) 또는 'operator'(도구 실행) progress_callback: async callable(status_text) — 진행 상태 콜백 timeout: 타임아웃(초). None이면 operator=180, agent=600 Returns: dict: {title, summary, changes, verified, warnings, next_steps} """ if timeout is None: timeout = 600 # operator 모드는 프로젝트 컨텍스트/문서 불필요 (도구만 실행) if role == "operator": history_section = "" if history: history_section = f"## 대화 히스토리\n{history}\n\n" prompt = ( f"{history_section}" f"## 사용자 요청\n{user_request}\n\n" f"위 요청에 맞는 도구를 바로 실행하고 결과를 JSON으로 보고하세요." ) else: context = self.ctx.gather(user_request) docs_ctx = self._docs_context() history_section = "" if history: history_section = f"## 대화 히스토리\n{history}\n\n" prompt = ( f"{history_section}" f"## 사용자 요청\n{user_request}\n\n" f"## 프로젝트 컨텍스트\n{context}\n\n" f"## 프로젝트 문서\n{docs_ctx}\n\n" f"위 요청을 분석하고, 계획을 세우고, 필요한 도구를 실행하고, " f"결과를 JSON 보고서로 출력하세요." ) response = await self.gemini.call_agent( role, prompt, cwd=self.project_path, timeout=timeout, progress_callback=progress_callback, ) self._log("execute", user_request, response) result = self._extract_json(response) return result or { "title": "작업 완료", "summary": response[:500], "changes": [], "verified": False, "warnings": ["JSON 보고서 파싱 실패 — 원본 응답 참조"], "next_steps": [], "raw": response, } # ────────────────────────────────────────── # 선택적 독립 리뷰 # ────────────────────────────────────────── async def review(self, user_request: str, agent_report: str) -> dict: """독립 리뷰어 — agent 작업 결과를 검증. agent가 자체 검증을 하지만, 중요한 작업에는 독립 리뷰를 추가할 수 있음. """ prompt = ( f"## 원래 사용자 요청\n{user_request}\n\n" f"## Agent 보고\n{agent_report}\n\n" f"현재 디렉토리의 프로젝트 파일을 직접 읽어서 리뷰하세요.\n" f"필요한 파일만 선택적으로 확인하세요." ) response = await self.gemini.call_agent( "reviewer", prompt, cwd=self.project_path, timeout=300, ) self._log("review", user_request, response) result = self._extract_json(response) return result or {"passed": True, "summary": response, "raw": response} # ────────────────────────────────────────── # NLU 분류 (chat/task/anime 구분에 사용) # ────────────────────────────────────────── async def classify(self, user_input: str, history: str = "") -> dict: """통합 프롬프트로 의도 분류 (Orchestrator에서 위임). 이전 _unified_call의 역할을 담당합니다. """ docs_ctx = self._docs_context() context = ( f"{history}" f"## Workspace\nPath: {self.project_path}\n\n" f"## Project Docs\n{docs_ctx}\n\n" f"## User Message\n{user_input}" ) raw = await self.gemini.call("unified", context, timeout=120) result = self._extract_json(raw) return result or {"mode": "chat", "response": raw} # ────────────────────────────────────────── # 유틸리티 # ────────────────────────────────────────── def _extract_json(self, text: str) -> dict | None: """텍스트에서 JSON 블록 추출.""" match = re.search(r"```json\s*\n(.*?)\n\s*```", text, re.DOTALL) if match: try: return json.loads(match.group(1)) except json.JSONDecodeError: pass brace_depth = 0 start = -1 for i, ch in enumerate(text): if ch == '{': if brace_depth == 0: start = i brace_depth += 1 elif ch == '}': brace_depth -= 1 if brace_depth == 0 and start >= 0: try: return json.loads(text[start:i + 1]) except json.JSONDecodeError: start = -1 return None def _log(self, phase: str, input_summary: str, output: str): self.log.append({ "phase": phase, "input": input_summary[:200], "output": output[:500], })