feat: Pipeline 전면 개선 — 병렬실행, Batch Review, 총평, 대화기억, 스마트라우팅

- GeminiCaller: cmd/c 제거, 인자 분리, Semaphore(4) 동시성 제어, GeminiCallError
- TaskPipeline: asyncio.gather 병렬 코딩, batch_review 1회, summarize 총평
- FileApplier: Coder 출력 파싱 → 실제 파일 적용 (경로 보안 체크)
- Discord Bot: on_message 자동채팅, 의도분류(chat/task/clarify), 대화기억(10메시지)
- Prompts: router.md (의도분류), summarizer.md (총평)
- Workflows: agent_chat 환경 경로 업데이트
This commit is contained in:
2026-03-06 20:46:58 +09:00
parent 4c0f5ec9c7
commit 752d851f9f
10 changed files with 783 additions and 190 deletions

View File

@@ -1,24 +1,24 @@
---
description: 프로젝트 전체 작업 현황을 종합 체크하는 워크플로우 (Git + Vikunja + 로컬)
description: ?„로?<3F>트 ?„ì²´ ?‘ì—… ?„황??종합 ì²´í<C2B4>¬?˜ëŠ” ?Œí<C592>¬?Œë¡œ??(Git + Vikunja + 로컬)
---
# 프로젝트 현황 종합 체크
# ?„로?<3F>트 ?„황 종합 ì²´í<C2B4>¬
"작업상황 체크", "현황 확인", "status check" 등 요청 시 이 워크플로우를 실행합니다.
"?‘ì—…?<3F>황 ì²´í<C2B4>¬", "?„황 ?•ì<E280A2>¸", "status check" ???”ì²­ ?????Œí<C592>¬?Œë¡œ?°ë? ?¤í–‰?©ë‹ˆ??
// turbo-all
## 절차
## ?ˆì°¨
1. Git 로컬 상태 확인:
1. Git 로컬 ?<3F>태 ?•ì<E280A2>¸:
```powershell
git -C "c:\Users\CafeVariet-GL552VW\Desktop\source_diff\variet-agent" status --short
git -C "c:\Users\Certes\Desktop\variet-agent" status --short
```
```powershell
git -C "c:\Users\CafeVariet-GL552VW\Desktop\source_diff\variet-agent" log --oneline -5
git -C "c:\Users\Certes\Desktop\variet-agent" log --oneline -5
```
2. Vikunja 태스크 현황 조회:
2. Vikunja ?œìФ???„황 조회:
```powershell
$h = @{Authorization="Bearer tk_070f8e0b715e818bb7178c3815ed5389040eddca"}
$tasks = Invoke-RestMethod -Uri "https://plan.variet.net/api/v1/projects/7/tasks?per_page=50" -Headers $h
@@ -28,7 +28,7 @@ Write-Host "=== Vikunja: TODO $($todo.Count), DONE $($done.Count) ==="
$todo | ForEach-Object { Write-Host " #$($_.id) $($_.title)" }
```
3. Gitea 최근 커밋 확인 (리모트):
3. Gitea 최근 커밋 ?•ì<E280A2>¸ (리모??:
```powershell
$h = @{Authorization="token 3a01b4b15a39921572e64c413353e870d4d2161b"}
$commits = Invoke-RestMethod -Uri "https://git.variet.net/api/v1/repos/Variet/variet-agent/commits?limit=5&sha=main" -Headers $h
@@ -36,8 +36,8 @@ Write-Host "=== Gitea: Recent Commits ==="
$commits | ForEach-Object { Write-Host " $($_.sha.Substring(0,7)) $($_.commit.message.Split("`n")[0])" }
```
4. 결과를 종합하여 사용자에게 보고:
- 로컬 uncommitted 변경 여부
- 로컬 vs 리모트 커밋 차이
- TODO 태스크 목록 + 우선순위
- 다음 작업 제안
4. ê²°ê³¼ë¥?종합?˜ì—¬ ?¬ìš©?<3F>ì—<C3AC>ê²?ë³´ê³ :
- 로컬 uncommitted ë³€ê²??¬ë?
- 로컬 vs 리모??커밋 ì°¨ì<C2A8>´
- TODO ?œìФ??목ë¡<C3AB> + ?°ì„ ?œìœ„
- ?¤ì<C2A4>Œ ?‘ì—… ?œì•ˆ

View File

@@ -1,50 +1,50 @@
---
description: Vikunja APIVariet Agent 프로젝트 태스크 현황을 조회하는 워크플로우
description: Vikunja APIë¡?Variet Agent ?„로?<3F>트 ?œìФ???„황??조회?˜ëŠ” ?Œí<C592>¬?Œë¡œ??
---
# Vikunja 태스크 현황 조회
# Vikunja ?œìФ???„황 조회
서비스 정보는 `.agent/workflows/services.md` 참조.
?œë¹„???•ë³´??`.agent/workflows/services.md` 참조.
// turbo-all
## 절차
## ?ˆì°¨
1. 간편 조회 (TODO/DONE 리스트):
1. 간편 조회 (TODO/DONE 리스??:
```powershell
C:\ProgramData\miniforge3\envs\quant\python.exe .agent\workflows\vikunja_helper.py list
C:\ProgramData\miniforge3\envs\agent_chat\python.exe .agent\workflows\vikunja_helper.py list
```
2. TODO만 조회:
2. TODO�조회:
```powershell
C:\ProgramData\miniforge3\envs\quant\python.exe .agent\workflows\vikunja_helper.py list todo
C:\ProgramData\miniforge3\envs\agent_chat\python.exe .agent\workflows\vikunja_helper.py list todo
```
3. DONE만 조회:
3. DONE�조회:
```powershell
C:\ProgramData\miniforge3\envs\quant\python.exe .agent\workflows\vikunja_helper.py list done
C:\ProgramData\miniforge3\envs\agent_chat\python.exe .agent\workflows\vikunja_helper.py list done
```
4. 태스크 완료 처리 (**⚠️ 반드시 이 방법 사용 — 직접 API 호출 금지**):
4. ?œìФ???„료 처리 (**? ï¸<C3AF> 반드????방법 ?¬ìš© ??ì§<C3AC>ì  API ?¸ì¶œ 금ì?**):
```powershell
C:\ProgramData\miniforge3\envs\quant\python.exe .agent\workflows\vikunja_helper.py done {TASK_ID}
C:\ProgramData\miniforge3\envs\agent_chat\python.exe .agent\workflows\vikunja_helper.py done {TASK_ID}
```
여러 태스크 동시:
?¬ëŸ¬ ?œìФ???™ì‹œ:
```powershell
C:\ProgramData\miniforge3\envs\quant\python.exe .agent\workflows\vikunja_helper.py done 71 77 78
C:\ProgramData\miniforge3\envs\agent_chat\python.exe .agent\workflows\vikunja_helper.py done 71 77 78
```
5. 태스크 코멘트 추가:
5. ?œìФ??코멘??ì¶”ê?:
```powershell
C:\ProgramData\miniforge3\envs\quant\python.exe .agent\workflows\vikunja_helper.py comment {TASK_ID} "내용"
C:\ProgramData\miniforge3\envs\agent_chat\python.exe .agent\workflows\vikunja_helper.py comment {TASK_ID} "?´ìš©"
```
6. 새 태스크 생성:
6. ???œìФ???<3F>성:
```powershell
C:\ProgramData\miniforge3\envs\quant\python.exe .agent\workflows\vikunja_helper.py create "제목" "설명"
C:\ProgramData\miniforge3\envs\agent_chat\python.exe .agent\workflows\vikunja_helper.py create "?œëª©" "?¤ëª…"
```
> [!CAUTION]
> **절대로** `Invoke-RestMethod -Method Post -Body '{"done": true}'` 같은 직접 API 호출을 사용하지 마세요.
> Vikunja APIPOST body에 포함되지 않은 필드를 빈값으로 덮어씁니다.
> `vikunja_helper.py`는 항상 GET → 기존 필드 보존 → POST 패턴을 사용하여 title/description 보존을 보장합니다.
> **?ˆë?ë¡?* `Invoke-RestMethod -Method Post -Body '{"done": true}'` ê°™ì? ì§<C3AC>ì  API ?¸ì¶œ???¬ìš©?˜ì? 마세??
> Vikunja API??POST ??body???¬í•¨?˜ì? ?Šì? ?„드ë¥?빈값?¼ë¡œ ??´?<3F>ëˆ??
> `vikunja_helper.py`????ƒ<> GET ??기존 ?„드 ë³´ì¡´ ??POST ?¨í„´???¬ìš©?˜ì—¬ title/description ë³´ì¡´??보장?©ë‹ˆ??

View File

@@ -1,21 +1,21 @@
---
description: 개발 서버 실행 방법
description: 개발 ?œë²„ ?¤í–‰ 방법
---
## 환경 설정
## ?˜ê²½ ?¤ì •
1. Python 환경 활성화
1. Python ?˜ê²½ ?œì„±??
// turbo
```
C:\ProgramData\miniforge3\envs\quant\python.exe -m pip install -r requirements.txt
C:\ProgramData\miniforge3\envs\agent_chat\python.exe -m pip install -r requirements.txt
```
2. API 서버 실행
2. API ?œë²„ ?¤í–‰
```
C:\ProgramData\miniforge3\envs\quant\python.exe -m uvicorn api.server:app --reload --host 0.0.0.0 --port 8100
C:\ProgramData\miniforge3\envs\agent_chat\python.exe -m uvicorn api.server:app --reload --host 0.0.0.0 --port 8100
```
3. Discord Bot 실행 (별도 터미널)
3. Discord Bot ?¤í–‰ (별ë<E2809E>„ ?°ë???
```
C:\ProgramData\miniforge3\envs\quant\python.exe -m api.discord_bot
C:\ProgramData\miniforge3\envs\agent_chat\python.exe -m api.discord_bot
```

View File

@@ -1,24 +1,24 @@
---
description: 테스트 실행 방법
description: ?ŒìФ???¤í–‰ 방법
---
## 단위 테스트
## ?¨ìœ„ ?ŒìФ??
// turbo
```
C:\ProgramData\miniforge3\envs\quant\python.exe -m pytest tests/ -v
C:\ProgramData\miniforge3\envs\agent_chat\python.exe -m pytest tests/ -v
```
## Context Manager 효과 테스트
## Context Manager ?¨ê³¼ ?ŒìФ??
```
C:\ProgramData\miniforge3\envs\quant\python.exe -m tests.test_context_manager
C:\ProgramData\miniforge3\envs\agent_chat\python.exe -m tests.test_context_manager
```
## Task Pipeline E2E
```
C:\ProgramData\miniforge3\envs\quant\python.exe -m tests.test_pipeline_e2e
C:\ProgramData\miniforge3\envs\agent_chat\python.exe -m tests.test_pipeline_e2e
```
## Gemini CLI 연동 테스트
## Gemini CLI ?°ë<C2B0>™ ?ŒìФ??
```
gemini -p "Hello, respond with 'OK'" --approval-mode yolo -o json
```

View File

@@ -1,21 +1,26 @@
"""Discord Bot 어댑터.
사용자 명령을 받아 FastAPI API를 호출하고 결과를 디스코드로 보고합니다.
사용자 명령을 받아 파이프라인 실행 또는 즉답.
지정 채널(봇 이름 채널)에서는 접두사 없이 자연스럽게 대화합니다.
대화 기억: 채널별 최근 메시지를 컨텍스트로 주입.
"""
import asyncio
import json
import logging
import re
import discord
from discord.ext import commands
import config
from core.gemini_caller import GeminiCallError
logger = logging.getLogger("variet.discord")
# Bot 설정
intents = discord.Intents.default()
intents.message_content = True # MESSAGE CONTENT INTENT 필요
intents.message_content = True
bot = commands.Bot(
command_prefix=config.DISCORD_COMMAND_PREFIX,
@@ -23,56 +28,217 @@ bot = commands.Bot(
help_command=commands.DefaultHelpCommand(no_category="명령어"),
)
# In-memory: Discord 채널 ↔ Task 매핑
# In-memory
_channel_tasks: dict[int, list[str]] = {}
_auto_chat_channel_ids: set[int] = set()
# ──────────────────────────────────────────────
# 대화 기억 (Conversation Memory)
# ──────────────────────────────────────────────
async def _get_channel_history(channel: discord.TextChannel, limit: int = 10) -> str:
"""채널의 최근 메시지를 대화 히스토리 문자열로 변환."""
messages = []
async for msg in channel.history(limit=limit + 1): # 현재 메시지 포함이므로 +1
if msg.author.bot:
role = "assistant"
else:
role = "user"
messages.append(f"[{role}] {msg.content[:300]}")
# 시간순 (오래된 것 먼저)
messages.reverse()
# 마지막(현재 메시지)은 제외 — 이미 context로 전달되니까
if messages:
messages = messages[:-1]
if not messages:
return ""
return "=== CONVERSATION HISTORY ===\n" + "\n".join(messages) + "\n\n"
# ──────────────────────────────────────────────
# 의도 분류 (Intent Router)
# ──────────────────────────────────────────────
async def _classify_intent(message: str) -> dict:
"""Gemini로 사용자 메시지의 의도를 분류."""
from core.gemini_caller import GeminiCaller
gemini = GeminiCaller(str(config.PROJECT_ROOT))
try:
raw = await gemini.call("router", message, timeout=30)
except GeminiCallError as e:
logger.warning(f"Intent 분류 실패: {e}")
return {"intent": "chat", "reason": "분류 실패"}
try:
json_match = re.search(r'\{[^}]+\}', raw)
if json_match:
return json.loads(json_match.group())
except (json.JSONDecodeError, AttributeError):
pass
logger.warning(f"Intent JSON 파싱 실패, chat으로 처리: {raw[:100]}")
return {"intent": "chat", "reason": "파싱 실패 — 기본 chat"}
# ──────────────────────────────────────────────
# 이벤트 핸들러
# ──────────────────────────────────────────────
@bot.event
async def on_ready():
"""봇 접속 완료."""
"""봇 접속 완료 — 봇 이름 채널 자동 감지."""
logger.info(f"Discord Bot 접속 완료: {bot.user} (ID: {bot.user.id})")
logger.info(f"서버 {len(bot.guilds)}개 연결됨")
bot_name = bot.user.name.lower().replace(" ", "-")
bot_name_underscore = bot_name.replace("-", "_")
bot_name_dash = bot_name.replace("_", "-")
for guild in bot.guilds:
for channel in guild.text_channels:
ch_name = channel.name.lower()
if ch_name in (bot_name, bot_name_underscore, bot_name_dash):
_auto_chat_channel_ids.add(channel.id)
logger.info(
f"자동 채팅 채널 감지: #{channel.name} (ID: {channel.id}) "
f"in {guild.name}"
)
if not _auto_chat_channel_ids:
logger.info(f"봇 이름({bot_name}) 채널 없음 — !명령어만 사용 가능")
await bot.change_presence(
activity=discord.Activity(
type=discord.ActivityType.listening,
name=f"{config.DISCORD_COMMAND_PREFIX}agent",
name="대화" if _auto_chat_channel_ids else f"{config.DISCORD_COMMAND_PREFIX}agent",
)
)
@bot.command(name="agent", help="AI Agent에게 작업 요청\n예: !agent README에 설치 방법 추가해줘")
async def agent_command(ctx: commands.Context, *, request: str):
"""작업 요청 → Pipeline 실행 → 결과 보고."""
@bot.event
async def on_message(message: discord.Message):
"""모든 메시지 수신 — 봇 전용 채널이면 스마트 라우팅."""
if message.author == bot.user or message.author.bot:
return
# 1. 접수 메시지
embed = discord.Embed(
title="📋 작업 접수",
description=f"```{request[:200]}```",
color=0x3498DB,
)
embed.set_footer(text="분석 중...")
status_msg = await ctx.send(embed=embed)
# ! 명령어는 기존 핸들러로
if message.content.startswith(config.DISCORD_COMMAND_PREFIX):
await bot.process_commands(message)
return
# 봇 전용 채널이 아니면 무시
if message.channel.id not in _auto_chat_channel_ids:
return
await _route_message(message)
# ──────────────────────────────────────────────
# 스마트 라우팅
# ──────────────────────────────────────────────
async def _route_message(message: discord.Message):
"""메시지 의도를 분류하고 적절한 핸들러로 라우팅."""
user_text = message.content.strip()
if not user_text:
return
# 짧은 메시지 → 분류 없이 바로 chat
if len(user_text) <= 15:
await _handle_chat(message, user_text)
return
# 의도 분류
async with message.channel.typing():
intent_result = await _classify_intent(user_text)
intent = intent_result.get("intent", "chat")
reason = intent_result.get("reason", "")
logger.info(f"의도 분류: {intent} ({reason}) — \"{user_text[:50]}\"")
if intent == "task":
await _handle_task(message, user_text)
elif intent == "clarify":
await _handle_clarify(message, user_text, reason)
else:
await _handle_chat(message, user_text)
# ──────────────────────────────────────────────
# 핸들러: Chat (즉답 + 대화 기억)
# ──────────────────────────────────────────────
async def _handle_chat(message: discord.Message, text: str):
"""즉답 — 대화 히스토리 포함하여 Gemini 직접 호출."""
async with message.channel.typing():
try:
# 2. Pipeline 직접 실행 (같은 프로세스)
from core.task_pipeline import TaskPipeline
from core.gemini_caller import GeminiCaller
# 대화 기억: 최근 10개 메시지
history = await _get_channel_history(message.channel, limit=10)
context = f"{history}{text}"
gemini = GeminiCaller(str(config.PROJECT_ROOT))
response = await gemini.call("default", context, timeout=60)
if not response:
await message.reply("⚠️ 응답을 생성하지 못했어요. 다시 시도해 주세요.")
return
# Discord 2000자 제한
if len(response) <= 2000:
await message.reply(response)
else:
for i in range(0, len(response), 4000):
chunk = response[i:i + 4000]
embed = discord.Embed(description=chunk, color=0x3498DB)
await message.channel.send(embed=embed)
except GeminiCallError as e:
logger.error(f"Chat Gemini 오류: {e}")
await message.reply(f"⚠️ AI 호출 오류: {str(e)[:200]}")
except Exception as e:
logger.error(f"Chat 오류: {e}", exc_info=True)
await message.reply(f"❌ 오류: {str(e)[:200]}")
# ──────────────────────────────────────────────
# 핸들러: Task (파이프라인 실행)
# ──────────────────────────────────────────────
async def _handle_task(message: discord.Message, text: str):
"""작업 요청 — 새 파이프라인 (병렬 Code + Batch Review + 총평)."""
import uuid
task_id = uuid.uuid4().hex[:8]
# Planning
# 접수 메시지
embed = discord.Embed(
title="📋 작업 접수",
description=f"```{text[:200]}```",
color=0x3498DB,
)
embed.set_footer(text=f"ID: {task_id} — 분석 중...")
status_msg = await message.channel.send(embed=embed)
try:
from core.task_pipeline import TaskPipeline
pipeline = TaskPipeline(project_path=str(config.PROJECT_ROOT))
pipeline.setup()
# ── Plan ──
embed.color = 0xF39C12
embed.set_footer(text=f"🔍 작업 분해 중... (ID: {task_id})")
await status_msg.edit(embed=embed)
pipeline = TaskPipeline(
project_path=str(config.PROJECT_ROOT),
)
pipeline.setup()
# Plan 단계
plan = await pipeline.plan(request)
plan = await pipeline.plan(text)
tasks = plan.get("tasks", [])
plan_text = plan.get("summary", str(plan))[:500]
@@ -92,82 +258,178 @@ async def agent_command(ctx: commands.Context, *, request: str):
inline=False,
)
plan_embed.set_footer(text=f"ID: {task_id}")
await ctx.send(embed=plan_embed)
await message.channel.send(embed=plan_embed)
# Code + Review 단계
if tasks:
for i, task in enumerate(tasks, 1):
progress_embed = discord.Embed(
title=f"⚙️ 실행 중 ({i}/{len(tasks)})",
description=task.get("title", task.get("description", ""))[:200],
if not tasks:
await message.channel.send(
embed=discord.Embed(
title="⚠️ 실행할 태스크가 없습니다",
description="요청을 더 구체적으로 해주세요.",
color=0xF39C12,
)
)
return
# ── Code 병렬 실행 ──
code_embed = discord.Embed(
title=f"⚙️ 코딩 중... ({len(tasks)}개 병렬 실행)",
description="\n".join(
f"{t.get('title', t.get('description', '?'))[:60]}"
for t in tasks
),
color=0xE67E22,
)
await ctx.send(embed=progress_embed)
code_msg = await message.channel.send(embed=code_embed)
code_output = await pipeline.code(task)
review = await pipeline.review(task, code_output)
code_outputs = await pipeline.code_parallel(tasks)
# 코딩 완료 표시
code_embed.title = f"✅ 코딩 완료 ({len(tasks)}개)"
code_embed.color = 0x2ECC71
await code_msg.edit(embed=code_embed)
# ── 파일 적용 ──
from core.file_applier import parse_code_output, apply_changes
all_applied = []
for output in code_outputs:
if not output.startswith("[ERROR]"):
changes = parse_code_output(output)
if changes:
applied = apply_changes(changes, str(config.PROJECT_ROOT))
all_applied.extend(applied)
if all_applied:
files_text = "\n".join(
f"• `{f['path']}` ({f['action']})"
for f in all_applied[:15]
)
await message.channel.send(
embed=discord.Embed(
title=f"📁 파일 적용 ({len(all_applied)}개)",
description=files_text,
color=0x3498DB,
)
)
# ── Batch Review ──
review_msg = await message.channel.send(
embed=discord.Embed(
title="🔍 전체 리뷰 중...",
color=0xF39C12,
)
)
review = await pipeline.batch_review(tasks, code_outputs)
passed = review.get("passed", True)
review_emoji = "" if passed else "⚠️"
review_embed = discord.Embed(
title=f"{review_emoji} 리뷰 결과 ({i}/{len(tasks)})",
title=f"{'' if passed else '⚠️'} 리뷰 결과",
description=review.get("summary", str(review))[:500],
color=0x2ECC71 if passed else 0xE74C3C,
)
await ctx.send(embed=review_embed)
await review_msg.edit(embed=review_embed)
# 완료
done_embed = discord.Embed(
title="✅ 작업 완료",
description=f"{len(tasks)}개 태스크 처리 완료",
color=0x2ECC71,
# ── 총평 ──
summary = await pipeline.summarize(
text, plan, code_outputs, review, all_applied
)
done_embed.set_footer(text=f"ID: {task_id}")
await ctx.send(embed=done_embed)
# 채널 태스크 기록
_channel_tasks.setdefault(ctx.channel.id, []).append(task_id)
summary_embed = discord.Embed(
title=f"📊 {summary.get('title', '작업 완료')}",
description=summary.get("summary", "작업이 완료되었습니다."),
color=0x9B59B6,
)
# 변경 파일 요약
changes = summary.get("changes", [])
if changes:
changes_text = "\n".join(
f"• `{c.get('file', '?')}` — {c.get('description', '')}"
for c in changes[:10]
)
summary_embed.add_field(
name="변경 사항",
value=changes_text[:1000],
inline=False,
)
# 주의사항
warnings = summary.get("warnings", [])
if warnings:
summary_embed.add_field(
name="⚠️ 주의",
value="\n".join(f"{w}" for w in warnings),
inline=False,
)
# 다음 단계 제안
next_steps = summary.get("next_steps", [])
if next_steps:
summary_embed.add_field(
name="🔜 다음 단계",
value="\n".join(f"{s}" for s in next_steps),
inline=False,
)
summary_embed.set_footer(text=f"ID: {task_id}")
await message.channel.send(embed=summary_embed)
_channel_tasks.setdefault(message.channel.id, []).append(task_id)
except GeminiCallError as e:
logger.error(f"파이프라인 Gemini 오류: {e}")
await message.channel.send(
embed=discord.Embed(
title="❌ AI 호출 오류",
description=f"```{str(e)[:500]}```",
color=0xE74C3C,
)
)
except Exception as e:
logger.error(f"작업 실행 오류: {e}", exc_info=True)
error_embed = discord.Embed(
await message.channel.send(
embed=discord.Embed(
title="❌ 오류 발생",
description=f"```{str(e)[:500]}```",
color=0xE74C3C,
)
await ctx.send(embed=error_embed)
)
# ──────────────────────────────────────────────
# 핸들러: Clarify (되묻기)
# ──────────────────────────────────────────────
async def _handle_clarify(message: discord.Message, text: str, reason: str):
"""의도 불명확 — 사용자에게 되묻기."""
embed = discord.Embed(
title="🤔 확인이 필요해요",
description=(
f"말씀하신 내용을 정확히 이해하고 싶어요.\n\n"
f"> {text[:200]}\n\n"
f"**💬 질문/대화**인가요, **🔧 작업 요청**인가요?\n"
f"`질문` 또는 `작업`으로 답해주세요."
),
color=0xF39C12,
)
embed.set_footer(text=f"사유: {reason}")
await message.reply(embed=embed)
# ──────────────────────────────────────────────
# 기존 ! 명령어 (유지)
# ──────────────────────────────────────────────
@bot.command(name="agent", help="AI Agent에게 작업 요청\n예: !agent README에 설치 방법 추가해줘")
async def agent_command(ctx: commands.Context, *, request: str):
"""작업 요청 → Pipeline 실행."""
await _handle_task(ctx.message, request)
@bot.command(name="chat", help="Gemini에게 질문/대화\n예: !chat 네 소개를 해줘")
async def chat_command(ctx: commands.Context, *, message: str):
"""단순 대화 — Pipeline 없이 Gemini 직접 호출."""
async with ctx.typing():
try:
from core.gemini_caller import GeminiCaller
gemini = GeminiCaller(str(config.PROJECT_ROOT))
response = await gemini.call(
role="default",
context=message,
timeout=60,
)
# Discord 메시지 2000자 제한 처리
if len(response) <= 2000:
await ctx.send(response)
else:
# 긴 응답은 Embed로 분할
for i in range(0, len(response), 4000):
chunk = response[i:i + 4000]
embed = discord.Embed(
description=chunk,
color=0x3498DB,
)
await ctx.send(embed=embed)
except Exception as e:
logger.error(f"Chat 오류: {e}", exc_info=True)
await ctx.send(f"❌ 오류: {str(e)[:200]}")
"""단순 대화."""
await _handle_chat(ctx.message, message)
@bot.command(name="ping", help="봇 응답 테스트")
@@ -188,6 +450,16 @@ async def info_command(ctx: commands.Context):
embed.add_field(name="프로젝트", value=str(config.PROJECT_ROOT), inline=False)
embed.add_field(name="명령어 접두사", value=config.DISCORD_COMMAND_PREFIX, inline=True)
embed.add_field(name="서버 수", value=str(len(bot.guilds)), inline=True)
if _auto_chat_channel_ids:
channels = ", ".join(f"<#{ch_id}>" for ch_id in _auto_chat_channel_ids)
embed.add_field(name="자동 채팅 채널", value=channels, inline=False)
embed.add_field(
name="파이프라인",
value="Plan → Code(병렬) → Review(배치) → 총평",
inline=False,
)
await ctx.send(embed=embed)

117
core/file_applier.py Normal file
View File

@@ -0,0 +1,117 @@
"""File Applier — Coder 출력을 실제 파일에 적용.
Coder가 출력한 `=== FILE: path === ... === END FILE ===` 블록을
파싱하여 프로젝트 파일에 실제로 쓰기.
"""
import re
import logging
from pathlib import Path
from dataclasses import dataclass
logger = logging.getLogger("variet.applier")
@dataclass
class FileChange:
"""파일 변경 단위."""
path: str # 상대 경로
content: str # 전체 파일 내용
is_new: bool # 신규 파일 여부
def parse_code_output(raw: str) -> list[FileChange]:
"""Coder 출력에서 파일 블록을 추출.
지원 형식:
=== FILE: path/to/file.py ===
(content)
=== END FILE ===
또는 마크다운 방식:
```python:path/to/file.py
(content)
```
"""
changes: list[FileChange] = []
# 패턴 1: === FILE: path === ... === END FILE ===
pattern1 = re.compile(
r'===\s*FILE:\s*(.+?)\s*===\s*\n(.*?)\n\s*===\s*END\s*FILE\s*===',
re.DOTALL,
)
for match in pattern1.finditer(raw):
path = match.group(1).strip()
content = match.group(2)
changes.append(FileChange(path=path, content=content, is_new=False))
# 패턴 2: ```lang:path/to/file.py\n...\n```
if not changes:
pattern2 = re.compile(
r'```\w*:(.+?)\n(.*?)\n```',
re.DOTALL,
)
for match in pattern2.finditer(raw):
path = match.group(1).strip()
content = match.group(2)
changes.append(FileChange(path=path, content=content, is_new=False))
return changes
def apply_changes(
changes: list[FileChange],
project_path: str | Path,
dry_run: bool = False,
) -> list[dict]:
"""파일 변경사항을 프로젝트에 적용.
Args:
changes: parse_code_output() 결과
project_path: 프로젝트 루트 경로
dry_run: True면 실제 파일 쓰기 없이 결과만 반환
Returns:
적용 결과 리스트 [{"path": ..., "action": "created|modified|skipped", "lines": N}]
"""
root = Path(project_path).resolve()
results = []
for change in changes:
# 경로 정규화 + 보안: 프로젝트 밖 경로 차단
target = (root / change.path).resolve()
if not str(target).startswith(str(root)):
logger.warning(f"경로 보안 위반 — 스킵: {change.path}")
results.append({
"path": change.path,
"action": "skipped",
"reason": "프로젝트 외부 경로",
})
continue
is_new = not target.exists()
line_count = len(change.content.splitlines())
if dry_run:
results.append({
"path": change.path,
"action": "would_create" if is_new else "would_modify",
"lines": line_count,
})
continue
# 디렉토리 생성
target.parent.mkdir(parents=True, exist_ok=True)
# 파일 쓰기
target.write_text(change.content, encoding="utf-8")
action = "created" if is_new else "modified"
logger.info(f"파일 {action}: {change.path} ({line_count}L)")
results.append({
"path": change.path,
"action": action,
"lines": line_count,
})
return results

View File

@@ -1,16 +1,25 @@
"""GeminiCaller — gemini headless 호출.
stdin으로 시스템 프롬프트 + 컨텍스트를 직접 전달합니다.
cmd /c 래핑으로 PowerShell 실행 정책 우회.
인자 분리, 세마포어 기반 동시성 제어, 에러 처리 개선.
"""
import asyncio
import logging
import time
from pathlib import Path
logger = logging.getLogger("variet.gemini")
ROLE_PROMPTS_DIR = Path(__file__).parent.parent / "prompts"
# 동시 호출 제한 (Gemini AI Ultra 120RPM 고려)
_semaphore = asyncio.Semaphore(4)
class GeminiCallError(Exception):
"""Gemini CLI 호출 실패."""
pass
class GeminiCaller:
"""Gemini CLI headless 호출을 관리합니다."""
@@ -23,8 +32,14 @@ class GeminiCaller:
async def call(self, role: str, context: str, timeout: int = 120) -> str:
"""역할별 프롬프트로 gemini 호출.
시스템 프롬프트와 컨텍스트를 하나로 합쳐 stdin으로 전달.
세마포어로 동시 호출 수 제한.
인자를 분리하여 안정적 subprocess 실행.
"""
async with _semaphore:
return await self._call_impl(role, context, timeout)
async def _call_impl(self, role: str, context: str, timeout: int) -> str:
"""실제 gemini 호출 구현."""
# 시스템 프롬프트 로드
prompt_file = ROLE_PROMPTS_DIR / f"{role}.md"
if prompt_file.exists():
@@ -42,14 +57,14 @@ class GeminiCaller:
try:
proc = await asyncio.create_subprocess_exec(
"cmd", "/c", "gemini --approval-mode yolo",
"gemini", "--approval-mode", "yolo",
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await asyncio.wait_for(
proc.communicate(input=full_input.encode("utf-8")),
timeout=timeout
timeout=timeout,
)
self.call_count += 1
@@ -57,23 +72,40 @@ class GeminiCaller:
output = stdout.decode("utf-8", errors="replace").strip()
# YOLO 모드 메시지 제거
# 노이즈 라인 제거
lines = output.splitlines()
cleaned = []
for line in lines:
if "YOLO mode" in line or "Loaded cached" in line:
if any(noise in line for noise in [
"YOLO mode", "Loaded cached", "Welcome to Gemini",
"Type /help", "Gemini CLI",
]):
continue
cleaned.append(line)
return "\n".join(cleaned).strip()
result = "\n".join(cleaned).strip()
if not result:
logger.warning(f"Gemini [{role}] 빈 응답")
if stderr:
err = stderr.decode("utf-8", errors="replace").strip()
logger.warning(f"Gemini stderr: {err[:200]}")
logger.info(
f"Gemini [{role}] 호출 #{self.call_count} "
f"— 입력 {len(full_input)}자 → 출력 {len(result)}"
)
return result
except asyncio.TimeoutError:
return f"[ERROR] Gemini CLI timeout after {timeout}s"
raise GeminiCallError(f"Gemini CLI timeout ({timeout}s) — role={role}")
except FileNotFoundError:
raise GeminiCallError(
"gemini CLI를 찾을 수 없습니다. PATH에 gemini가 있는지 확인하세요."
)
except Exception as e:
return f"[ERROR] Gemini CLI call failed: {e}"
raise GeminiCallError(f"Gemini CLI 호출 실패: {e}")
async def call_simple(self, prompt: str, timeout: int = 60) -> str:
"""시스템 프롬프트 없이 단순 호출."""
return await self.call("default", prompt, timeout)

View File

@@ -1,6 +1,6 @@
"""Task Pipeline — Plan → Code Review → Ship.
"""Task Pipeline — Plan → Code(병렬) → Batch Review → 총평.
E2E 파이프라인을 구성하고 실행합니다.
병렬 코드 실행, 단일 배치 리뷰, 파일 적용, 종합 총평을 수행합니다.
"""
import asyncio
@@ -9,11 +9,12 @@ import re
from pathlib import Path
from core.project_indexer import ProjectIndex
from core.context_manager import ContextManager
from core.gemini_caller import GeminiCaller
from core.gemini_caller import GeminiCaller, GeminiCallError
from core.file_applier import parse_code_output, apply_changes
class TaskPipeline:
"""작업 파이프라인: 사용자 요청을 분해하고 순차 실행합니다."""
"""작업 파이프라인: Plan → Code(병렬) → Review(배치) → Summary."""
def __init__(self, project_path: str, token_budget: int = 50_000):
self.project_path = project_path
@@ -27,6 +28,10 @@ class TaskPipeline:
self.index.scan()
return self
# ──────────────────────────────────────────
# Plan
# ──────────────────────────────────────────
async def plan(self, user_request: str) -> dict:
"""Planner로 작업 분해."""
structure = self.index.get_structure_summary()
@@ -39,13 +44,15 @@ class TaskPipeline:
response = await self.gemini.call("planner", prompt, timeout=180)
self._log("plan", user_request, response)
# JSON 추출
plan = self._extract_json(response)
return plan or {"summary": response, "tasks": [], "raw": response}
# ──────────────────────────────────────────
# Code (개별 태스크)
# ──────────────────────────────────────────
async def code(self, task: dict) -> str:
"""Coder로 코드 수정."""
# 관련 파일 컨텍스트 수집
"""Coder로 코드 수정 (단일 태스크)."""
context = self.ctx.gather(task.get("description", task.get("title", "")))
prompt = (
@@ -58,27 +65,109 @@ class TaskPipeline:
self._log("code", task.get("title", ""), response)
return response
async def review(self, task: dict, code_output: str) -> dict:
"""Reviewer로 코드 리뷰."""
# ──────────────────────────────────────────
# Code 병렬 실행
# ──────────────────────────────────────────
async def code_parallel(self, tasks: list[dict]) -> list[str]:
"""여러 태스크를 병렬로 코딩."""
results = await asyncio.gather(
*[self.code(task) for task in tasks],
return_exceptions=True,
)
# 예외를 문자열로 변환
processed = []
for i, result in enumerate(results):
if isinstance(result, Exception):
error_msg = f"[ERROR] Task {i+1} 실패: {result}"
self._log("code_error", tasks[i].get("title", ""), error_msg)
processed.append(error_msg)
else:
processed.append(result)
return processed
# ──────────────────────────────────────────
# Batch Review (전체 한 번)
# ──────────────────────────────────────────
async def batch_review(self, tasks: list[dict], code_outputs: list[str]) -> dict:
"""모든 코드 출력을 한 번에 리뷰."""
# 태스크별 코드 출력을 하나로 합침
combined = []
for i, (task, output) in enumerate(zip(tasks, code_outputs)):
title = task.get("title", task.get("description", f"Task {i+1}"))
combined.append(
f"### Task {i+1}: {title}\n"
f"{output[:2000]}\n" # 각 출력 2000자 제한
)
prompt = (
f"## Task\n{json.dumps(task, ensure_ascii=False, indent=2)}\n\n"
f"## Code Output\n{code_output}\n\n"
f"Review the code changes."
f"## All Code Changes\n\n"
f"{'---'.join(combined)}\n\n"
f"Review ALL changes above as a whole. "
f"Check for consistency, conflicts between tasks, and overall correctness."
)
response = await self.gemini.call("reviewer", prompt, timeout=180)
self._log("review", task.get("title", ""), response)
self._log("batch_review", f"{len(tasks)} tasks", response)
review = self._extract_json(response)
return review or {"passed": True, "summary": response, "raw": response}
# ──────────────────────────────────────────
# 총평 (Summary)
# ──────────────────────────────────────────
async def summarize(
self,
user_request: str,
plan: dict,
code_outputs: list[str],
review: dict,
applied_files: list[dict],
) -> dict:
"""전체 작업 결과 종합 총평."""
file_changes = "\n".join(
f"- {f['path']} ({f['action']}, {f.get('lines', '?')}L)"
for f in applied_files
) if applied_files else "파일 변경 없음"
prompt = (
f"## 원래 요청\n{user_request}\n\n"
f"## 계획\n{plan.get('summary', str(plan))[:500]}\n\n"
f"## 태스크 수\n{len(plan.get('tasks', []))}\n\n"
f"## 리뷰 결과\n{review.get('summary', str(review))[:500]}\n\n"
f"## 변경된 파일\n{file_changes}\n\n"
f"위 정보를 바탕으로 총평을 작성하세요."
)
response = await self.gemini.call("summarizer", prompt, timeout=60)
self._log("summarize", user_request, response)
summary = self._extract_json(response)
return summary or {
"title": "작업 완료",
"summary": response,
"changes": [],
"warnings": [],
"next_steps": [],
}
# ──────────────────────────────────────────
# 전체 파이프라인 실행
# ──────────────────────────────────────────
async def execute(self, user_request: str) -> dict:
"""전체 파이프라인 실행."""
"""전체 파이프라인: Plan → Code(병렬) → Review(배치) → 파일 적용 → 총평."""
result = {
"request": user_request,
"plan": None,
"tasks_completed": [],
"reviews": [],
"code_outputs": [],
"review": None,
"applied_files": [],
"summary": None,
}
# 1. Plan
@@ -87,32 +176,50 @@ class TaskPipeline:
tasks = plan.get("tasks", [])
if not tasks:
result["error"] = "Planner returned no tasks"
result["summary"] = {
"title": "태스크 없음",
"summary": "Planner가 실행할 태스크를 생성하지 못했습니다.",
"changes": [],
"warnings": ["요청을 더 구체적으로 해주세요."],
"next_steps": [],
}
return result
# 2. Code + Review for each task
for task in tasks:
code_output = await self.code(task)
# 2. Code 병렬 실행
code_outputs = await self.code_parallel(tasks)
result["code_outputs"] = [o[:500] for o in code_outputs]
review = await self.review(task, code_output)
# 3. 파일 적용 (Coder 출력 파싱)
all_applied = []
for output in code_outputs:
if output.startswith("[ERROR]"):
continue
changes = parse_code_output(output)
if changes:
applied = apply_changes(changes, self.project_path)
all_applied.extend(applied)
result["applied_files"] = all_applied
# 4. Batch Review (전체 1회)
review = await self.batch_review(tasks, code_outputs)
result["review"] = review
# 리뷰 실패 시 로그만 (재시도 없이 진행)
if not review.get("passed", True):
# 리뷰 실패 시 한 번 재시도
code_output = await self.code({
**task,
"description": task.get("description", "") +
f"\n\n## Review Feedback\n{json.dumps(review.get('issues', []), ensure_ascii=False)}"
})
review = await self.review(task, code_output)
self._log("review_warning", "batch", "리뷰 이슈 있음 — 총평에 반영")
result["tasks_completed"].append({
"task": task,
"output": code_output[:500], # 요약
"review": review,
})
# 5. 총평
summary = await self.summarize(
user_request, plan, code_outputs, review, all_applied
)
result["summary"] = summary
return result
# ──────────────────────────────────────────
# 유틸리티
# ──────────────────────────────────────────
def _extract_json(self, text: str) -> dict | None:
"""텍스트에서 JSON 블록 추출."""
# ```json ... ``` 패턴
@@ -123,13 +230,21 @@ class TaskPipeline:
except json.JSONDecodeError:
pass
# { ... } 직접 찾기
match = re.search(r"\{.*\}", text, re.DOTALL)
if match:
# { ... } 직접 찾기 (중첩 지원)
brace_depth = 0
start = -1
for i, ch in enumerate(text):
if ch == '{':
if brace_depth == 0:
start = i
brace_depth += 1
elif ch == '}':
brace_depth -= 1
if brace_depth == 0 and start >= 0:
try:
return json.loads(match.group(0))
return json.loads(text[start:i + 1])
except json.JSONDecodeError:
pass
start = -1
return None

24
prompts/router.md Normal file
View File

@@ -0,0 +1,24 @@
# Intent Router
사용자의 메시지를 분류하세요. 반드시 아래 JSON 형식만 출력하세요.
## 분류 기준
- **chat**: 즉답 가능한 질문, 인사, 잡담, 지식 질문, 의견 요청
- **task**: 코드 수정/생성/분석/리팩토링/파일 변경 등 구체적 작업 요청
- **clarify**: 맥락 없이 의도를 파악할 수 없는 경우 ("그거 해줘", "아까 그거")
## 출력 형식 (JSON만, 다른 텍스트 금지)
```json
{"intent": "chat", "reason": "인사 메시지"}
```
## 예시
- "안녕" → `{"intent": "chat", "reason": "인사"}`
- "파이썬 리스트 정렬 방법" → `{"intent": "chat", "reason": "지식 질문"}`
- "README에 설치 방법 추가해줘" → `{"intent": "task", "reason": "파일 수정 요청"}`
- "테스트 코드 작성해줘" → `{"intent": "task", "reason": "코드 생성 요청"}`
- "그거 고쳐줘" → `{"intent": "clarify", "reason": "대상 불명확"}`
- "이 프로젝트 구조 설명해줘" → `{"intent": "chat", "reason": "설명 요청"}`

33
prompts/summarizer.md Normal file
View File

@@ -0,0 +1,33 @@
# Summarizer
당신은 AI Agent Team의 **총평 작성자**입니다.
작업 파이프라인이 완료된 후, 전체 결과를 사용자가 이해하기 쉽게 요약합니다.
## 입력
- 사용자의 원래 요청
- Plan 결과 (태스크 목록)
- Code 결과 (각 태스크별 출력)
- Review 결과 (전체 리뷰)
- 파일 변경 목록
## 출력 형식 (JSON)
```json
{
"title": "작업 완료 한줄 제목",
"changes": [
{"file": "path/to/file.py", "description": "변경 내용 설명"}
],
"warnings": ["주의사항이 있으면 여기에"],
"next_steps": ["사용자가 다음에 할 수 있는 작업 제안"],
"summary": "2-3문장 전체 요약"
}
```
## 규칙
- 기술 용어는 최소화, 사용자 관점에서 서술
- 한국어로 답변
- 주의사항이 없으면 warnings를 빈 배열로
- next_steps는 1-2개만 제안