1. 데드 코드 제거: execute() 146줄, _read_project_files() 31줄 2. 전 역할 agent 모드: summarize도 call_agent로 변경 3. 작업 취소: 취소/stop/cancel 입력으로 실행 중 작업 중단 4. 중복 방지: 채널당 1작업만 허용
260 lines
11 KiB
Python
260 lines
11 KiB
Python
"""Task Pipeline -- Plan -> Code(에이전트) -> Review -> 재시도 -> 총평 -> 기록.
|
|
|
|
Coder는 에이전트 모드로 프로젝트 디렉토리에서 실행되어
|
|
Gemini가 직접 파일을 읽고/쓰고/명령을 실행합니다.
|
|
리뷰 실패 시 최대 MAX_REVIEW_RETRIES회 재시도합니다.
|
|
"""
|
|
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
import re
|
|
from pathlib import Path
|
|
from core.project_indexer import ProjectIndex
|
|
from core.context_manager import ContextManager
|
|
from core.gemini_caller import GeminiCaller, GeminiCallError
|
|
from core.docs_manager import DocsManager
|
|
|
|
MAX_REVIEW_RETRIES = 2
|
|
logger = logging.getLogger("variet.pipeline")
|
|
|
|
|
|
class TaskPipeline:
|
|
"""작업 파이프라인: Plan -> Code(에이전트) -> Review(재시도) -> 기록."""
|
|
|
|
def __init__(self, project_path: str, token_budget: int = 50_000,
|
|
docs_subpath: str = "docs/wiki"):
|
|
self.project_path = project_path
|
|
self.index = ProjectIndex(project_path)
|
|
self.ctx = ContextManager(self.index, token_budget)
|
|
self.gemini = GeminiCaller(project_path)
|
|
self.docs = DocsManager(project_path, docs_subpath)
|
|
self.log: list[dict] = []
|
|
|
|
def setup(self):
|
|
"""프로젝트 인덱싱."""
|
|
self.index.scan()
|
|
return self
|
|
|
|
# ──────────────────────────────────────────
|
|
# Docs 컨텍스트 (모든 호출에 주입)
|
|
# ──────────────────────────────────────────
|
|
|
|
def _docs_context(self) -> str:
|
|
"""Gemini 호출에 주입할 프로젝트 문서 컨텍스트."""
|
|
index = self.docs.get_docs_index()
|
|
return (
|
|
f"\n{index}\n"
|
|
f"작업 완료 시 관련 문서가 있으면 업데이트하세요.\n"
|
|
f"docs 경로: {self.docs.docs_path}\n"
|
|
)
|
|
|
|
# ──────────────────────────────────────────
|
|
# Plan
|
|
# ──────────────────────────────────────────
|
|
|
|
async def plan(self, user_request: str) -> dict:
|
|
"""Planner로 태스크 분해."""
|
|
context = self.ctx.gather(user_request)
|
|
docs_ctx = self._docs_context()
|
|
|
|
prompt = (
|
|
f"## User Request\n{user_request}\n\n"
|
|
f"## Project Context\n{context}\n\n"
|
|
f"## Project Docs\n{docs_ctx}\n\n"
|
|
f"Decompose this request into concrete tasks."
|
|
)
|
|
|
|
response = await self.gemini.call("planner", prompt, timeout=180)
|
|
self._log("plan", user_request, response)
|
|
|
|
plan = self._extract_json(response)
|
|
return plan or {"summary": response, "tasks": [], "raw": response}
|
|
|
|
# ──────────────────────────────────────────
|
|
# Code (에이전트 모드 — Gemini가 직접 파일 쓰기)
|
|
# ──────────────────────────────────────────
|
|
|
|
async def code(self, task: dict) -> str:
|
|
"""에이전트 모드로 코딩 — Gemini가 직접 파일 생성/수정."""
|
|
docs_ctx = self._docs_context()
|
|
|
|
prompt = (
|
|
f"## Task\n{json.dumps(task, ensure_ascii=False, indent=2)}\n\n"
|
|
f"## Project Docs\n{docs_ctx}\n\n"
|
|
f"위 태스크를 구현하세요. 파일을 직접 생성/수정하세요."
|
|
)
|
|
|
|
response = await self.gemini.call_agent(
|
|
"coder", prompt, cwd=self.project_path, timeout=300,
|
|
)
|
|
self._log("code", task.get("title", ""), response)
|
|
return response
|
|
|
|
# ──────────────────────────────────────────
|
|
# Code 병렬 실행
|
|
# ──────────────────────────────────────────
|
|
|
|
async def code_parallel(self, tasks: list[dict]) -> list[str]:
|
|
"""여러 태스크를 병렬로 코딩 (에이전트 모드)."""
|
|
results = await asyncio.gather(
|
|
*[self.code(task) for task in tasks],
|
|
return_exceptions=True,
|
|
)
|
|
|
|
processed = []
|
|
for i, result in enumerate(results):
|
|
if isinstance(result, Exception):
|
|
error_msg = f"[ERROR] Task {i+1} 실패: {result}"
|
|
self._log("code_error", tasks[i].get("title", ""), error_msg)
|
|
processed.append(error_msg)
|
|
else:
|
|
processed.append(result)
|
|
|
|
return processed
|
|
|
|
|
|
# ──────────────────────────────────────────
|
|
# Planner 자가 검증 (오케스트레이션)
|
|
# ──────────────────────────────────────────
|
|
|
|
async def planner_verify(
|
|
self, user_request: str, plan: dict,
|
|
code_outputs: list[str],
|
|
) -> dict:
|
|
"""Planner가 자기 계획의 달성 여부를 에이전트 모드로 검증.
|
|
|
|
프로젝트 디렉토리에서 직접 파일을 읽어서 계획 충족 여부를 판단합니다.
|
|
"""
|
|
agent_reports = "\n".join(
|
|
f"--- Agent {i+1} ---\n{output}"
|
|
for i, output in enumerate(code_outputs)
|
|
)
|
|
|
|
prompt = (
|
|
f"## 원래 사용자 요청\n{user_request}\n\n"
|
|
f"## 내가 세운 계획\n{json.dumps(plan, ensure_ascii=False, indent=2)}\n\n"
|
|
f"## 에이전트 보고\n{agent_reports}\n\n"
|
|
f"## 판단 요청\n"
|
|
f"현재 디렉토리의 프로젝트 파일을 직접 읽어서 계획이 충족되었는지 확인하세요.\n"
|
|
f"필요한 파일만 선택적으로 읽으세요.\n\n"
|
|
f"충족되었으면 satisfied=true.\n"
|
|
f"미충족이면 satisfied=false + 부족한 부분을 해결할 추가 태스크를 생성하세요.\n\n"
|
|
f"반드시 아래 JSON만 출력하세요:\n"
|
|
f"```json\n"
|
|
f'{{\n'
|
|
f' "satisfied": true|false,\n'
|
|
f' "feedback": "판단 근거 (한국어)",\n'
|
|
f' "additional_tasks": [\n'
|
|
f' {{"id": 1, "title": "추가 태스크", "description": "구현 내용", "type": "modify"}}\n'
|
|
f' ]\n'
|
|
f'}}\n'
|
|
f"```"
|
|
)
|
|
|
|
response = await self.gemini.call_agent(
|
|
"planner", prompt, cwd=self.project_path, timeout=180,
|
|
)
|
|
self._log("planner_verify", user_request, response)
|
|
|
|
result = self._extract_json(response)
|
|
return result or {"satisfied": True, "feedback": response}
|
|
|
|
# ──────────────────────────────────────────
|
|
# Batch Review
|
|
# ──────────────────────────────────────────
|
|
|
|
async def batch_review(self, tasks: list[dict], code_outputs: list[str]) -> dict:
|
|
"""에이전트 모드로 프로젝트 파일을 직접 읽어 리뷰."""
|
|
task_summaries = []
|
|
for i, task in enumerate(tasks):
|
|
title = task.get("title", task.get("description", f"Task {i+1}"))
|
|
task_summaries.append(f"### Task {i+1}: {title}")
|
|
|
|
agent_reports = []
|
|
for i, output in enumerate(code_outputs):
|
|
agent_reports.append(f"--- Agent {i+1} 보고 ---\n{output}")
|
|
|
|
prompt = (
|
|
f"## 요청된 태스크\n{chr(10).join(task_summaries)}\n\n"
|
|
f"## 에이전트 보고\n{chr(10).join(agent_reports)}\n\n"
|
|
f"현재 디렉토리의 프로젝트 파일을 직접 읽어서 리뷰하세요.\n"
|
|
f"필요한 파일만 선택적으로 확인하세요."
|
|
)
|
|
|
|
response = await self.gemini.call_agent(
|
|
"reviewer", prompt, cwd=self.project_path, timeout=180,
|
|
)
|
|
self._log("batch_review", f"{len(tasks)} tasks", response)
|
|
|
|
review = self._extract_json(response)
|
|
return review or {"passed": True, "summary": response, "raw": response}
|
|
|
|
# ──────────────────────────────────────────
|
|
# 총평
|
|
# ──────────────────────────────────────────
|
|
|
|
async def summarize(self, user_request: str, plan: dict,
|
|
code_outputs: list[str], review: dict) -> dict:
|
|
"""전체 작업 결과 종합 총평."""
|
|
prompt = (
|
|
f"## 원래 요청\n{user_request}\n\n"
|
|
f"## 태스크 수\n{len(plan.get('tasks', []))}개\n\n"
|
|
f"## 리뷰 결과\n{review.get('summary', str(review))}\n\n"
|
|
f"## 코딩 결과 요약\n"
|
|
f"{chr(10).join(code_outputs)}\n\n"
|
|
f"위 정보를 바탕으로 총평을 작성하세요."
|
|
)
|
|
|
|
response = await self.gemini.call_agent(
|
|
"summarizer", prompt, cwd=self.project_path, timeout=120,
|
|
)
|
|
self._log("summarize", user_request, response)
|
|
|
|
summary = self._extract_json(response)
|
|
return summary or {
|
|
"title": "작업 완료",
|
|
"summary": response,
|
|
"changes": [],
|
|
"warnings": [],
|
|
"next_steps": [],
|
|
}
|
|
|
|
|
|
# ──────────────────────────────────────────
|
|
# 유틸리티
|
|
# ──────────────────────────────────────────
|
|
|
|
def _extract_json(self, text: str) -> dict | None:
|
|
"""텍스트에서 JSON 블록 추출."""
|
|
match = re.search(r"```json\s*\n(.*?)\n\s*```", text, re.DOTALL)
|
|
if match:
|
|
try:
|
|
return json.loads(match.group(1))
|
|
except json.JSONDecodeError:
|
|
pass
|
|
|
|
brace_depth = 0
|
|
start = -1
|
|
for i, ch in enumerate(text):
|
|
if ch == '{':
|
|
if brace_depth == 0:
|
|
start = i
|
|
brace_depth += 1
|
|
elif ch == '}':
|
|
brace_depth -= 1
|
|
if brace_depth == 0 and start >= 0:
|
|
try:
|
|
return json.loads(text[start:i + 1])
|
|
except json.JSONDecodeError:
|
|
start = -1
|
|
|
|
return None
|
|
|
|
def _log(self, phase: str, input_summary: str, output: str):
|
|
self.log.append({
|
|
"phase": phase,
|
|
"input": input_summary[:200],
|
|
"output": output[:500],
|
|
})
|