195 lines
7.8 KiB
Python
195 lines
7.8 KiB
Python
"""Task Pipeline -- Agent 1회 호출 + 선택적 Review.
|
|
|
|
Gemini CLI agent 모드가 plan+code+verify를 한 세션에서 수행합니다.
|
|
기존 5단계(Plan→Code→PlannerVerify→Review→Summarize) → 2단계로 단순화.
|
|
"""
|
|
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
import re
|
|
from pathlib import Path
|
|
from core.project_indexer import ProjectIndex
|
|
from core.context_manager import ContextManager
|
|
from core.gemini_caller import GeminiCaller, GeminiCallError
|
|
from core.docs_manager import DocsManager
|
|
|
|
logger = logging.getLogger("variet.pipeline")
|
|
|
|
|
|
class TaskPipeline:
|
|
"""작업 파이프라인: Agent 1회 호출 → 선택적 Review."""
|
|
|
|
def __init__(self, project_path: str, token_budget: int = 50_000,
|
|
docs_subpath: str = "docs/wiki"):
|
|
self.project_path = project_path
|
|
self.index = ProjectIndex(project_path)
|
|
self.ctx = ContextManager(self.index, token_budget)
|
|
self.gemini = GeminiCaller(project_path)
|
|
self.docs = DocsManager(project_path, docs_subpath)
|
|
self.log: list[dict] = []
|
|
|
|
def setup(self):
|
|
"""프로젝트 인덱싱."""
|
|
self.index.scan()
|
|
return self
|
|
|
|
# ──────────────────────────────────────────
|
|
# Docs 컨텍스트
|
|
# ──────────────────────────────────────────
|
|
|
|
def _docs_context(self) -> str:
|
|
"""Gemini 호출에 주입할 프로젝트 문서 컨텍스트."""
|
|
index = self.docs.get_docs_index()
|
|
return (
|
|
f"\n{index}\n"
|
|
f"작업 완료 시 관련 문서가 있으면 업데이트하세요.\n"
|
|
f"docs 경로: {self.docs.docs_path}\n"
|
|
)
|
|
|
|
# ──────────────────────────────────────────
|
|
# Agent 통합 실행 (핵심)
|
|
# ──────────────────────────────────────────
|
|
|
|
async def execute(self, user_request: str, history: str = "",
|
|
progress_callback=None, role: str = "agent",
|
|
timeout: int = None) -> dict:
|
|
"""Agent 1회 호출 — plan+code+verify+report 통합.
|
|
|
|
Args:
|
|
role: 'agent'(코딩) 또는 'operator'(도구 실행)
|
|
progress_callback: async callable(status_text) — 진행 상태 콜백
|
|
timeout: 타임아웃(초). None이면 operator=180, agent=600
|
|
|
|
Returns:
|
|
dict: {title, summary, changes, verified, warnings, next_steps}
|
|
"""
|
|
if timeout is None:
|
|
timeout = 600
|
|
|
|
# operator 모드는 프로젝트 컨텍스트/문서 불필요 (도구만 실행)
|
|
if role == "operator":
|
|
history_section = ""
|
|
if history:
|
|
history_section = f"## 대화 히스토리\n{history}\n\n"
|
|
prompt = (
|
|
f"{history_section}"
|
|
f"## 사용자 요청\n{user_request}\n\n"
|
|
f"위 요청에 맞는 도구를 바로 실행하고 결과를 JSON으로 보고하세요."
|
|
)
|
|
else:
|
|
context = self.ctx.gather(user_request)
|
|
docs_ctx = self._docs_context()
|
|
history_section = ""
|
|
if history:
|
|
history_section = f"## 대화 히스토리\n{history}\n\n"
|
|
prompt = (
|
|
f"{history_section}"
|
|
f"## 사용자 요청\n{user_request}\n\n"
|
|
f"## 프로젝트 컨텍스트\n{context}\n\n"
|
|
f"## 프로젝트 문서\n{docs_ctx}\n\n"
|
|
f"위 요청을 분석하고, 계획을 세우고, 필요한 도구를 실행하고, "
|
|
f"결과를 JSON 보고서로 출력하세요."
|
|
)
|
|
|
|
response = await self.gemini.call_agent(
|
|
role, prompt, cwd=self.project_path, timeout=timeout,
|
|
progress_callback=progress_callback,
|
|
)
|
|
self._log("execute", user_request, response)
|
|
|
|
result = self._extract_json(response)
|
|
return result or {
|
|
"title": "작업 완료",
|
|
"summary": response[:500],
|
|
"changes": [],
|
|
"verified": False,
|
|
"warnings": ["JSON 보고서 파싱 실패 — 원본 응답 참조"],
|
|
"next_steps": [],
|
|
"raw": response,
|
|
}
|
|
|
|
# ──────────────────────────────────────────
|
|
# 선택적 독립 리뷰
|
|
# ──────────────────────────────────────────
|
|
|
|
async def review(self, user_request: str, agent_report: str) -> dict:
|
|
"""독립 리뷰어 — agent 작업 결과를 검증.
|
|
|
|
agent가 자체 검증을 하지만, 중요한 작업에는 독립 리뷰를 추가할 수 있음.
|
|
"""
|
|
prompt = (
|
|
f"## 원래 사용자 요청\n{user_request}\n\n"
|
|
f"## Agent 보고\n{agent_report}\n\n"
|
|
f"현재 디렉토리의 프로젝트 파일을 직접 읽어서 리뷰하세요.\n"
|
|
f"필요한 파일만 선택적으로 확인하세요."
|
|
)
|
|
|
|
response = await self.gemini.call_agent(
|
|
"reviewer", prompt, cwd=self.project_path, timeout=300,
|
|
)
|
|
self._log("review", user_request, response)
|
|
|
|
result = self._extract_json(response)
|
|
return result or {"passed": True, "summary": response, "raw": response}
|
|
|
|
# ──────────────────────────────────────────
|
|
# NLU 분류 (chat/task/anime 구분에 사용)
|
|
# ──────────────────────────────────────────
|
|
|
|
async def classify(self, user_input: str, history: str = "") -> dict:
|
|
"""통합 프롬프트로 의도 분류 (Orchestrator에서 위임).
|
|
|
|
이전 _unified_call의 역할을 담당합니다.
|
|
"""
|
|
docs_ctx = self._docs_context()
|
|
|
|
context = (
|
|
f"{history}"
|
|
f"## Workspace\nPath: {self.project_path}\n\n"
|
|
f"## Project Docs\n{docs_ctx}\n\n"
|
|
f"## User Message\n{user_input}"
|
|
)
|
|
|
|
raw = await self.gemini.call("unified", context, timeout=120)
|
|
|
|
result = self._extract_json(raw)
|
|
return result or {"mode": "chat", "response": raw}
|
|
|
|
# ──────────────────────────────────────────
|
|
# 유틸리티
|
|
# ──────────────────────────────────────────
|
|
|
|
def _extract_json(self, text: str) -> dict | None:
|
|
"""텍스트에서 JSON 블록 추출."""
|
|
match = re.search(r"```json\s*\n(.*?)\n\s*```", text, re.DOTALL)
|
|
if match:
|
|
try:
|
|
return json.loads(match.group(1))
|
|
except json.JSONDecodeError:
|
|
pass
|
|
|
|
brace_depth = 0
|
|
start = -1
|
|
for i, ch in enumerate(text):
|
|
if ch == '{':
|
|
if brace_depth == 0:
|
|
start = i
|
|
brace_depth += 1
|
|
elif ch == '}':
|
|
brace_depth -= 1
|
|
if brace_depth == 0 and start >= 0:
|
|
try:
|
|
return json.loads(text[start:i + 1])
|
|
except json.JSONDecodeError:
|
|
start = -1
|
|
|
|
return None
|
|
|
|
def _log(self, phase: str, input_summary: str, output: str):
|
|
self.log.append({
|
|
"phase": phase,
|
|
"input": input_summary[:200],
|
|
"output": output[:500],
|
|
})
|