feat: 워크스페이스 시스템 + 통합 프롬프트 + Docs 기록 관리

- workspace.py: 채널별 워크스페이스 모델 + JSON 영속 저장
- discord_bot.py: /workspace 슬래시 커맨드 (set/git/vikunja/info/remove/list)
  - 등록 채널만 자동 응답, 미등록 채널 무시
  - Git/Vikunja 미설정 시 작업 차단 + 안내
  - 통합 프롬프트 1회 호출 (router+planner+chat 통합)
- docs_manager.py: Wiki 인덱스, 세션 기록, Changelog 자동 업데이트
- task_pipeline.py: 모든 Gemini 호출에 docs 컨텍스트 주입, 완료 시 기록
- unified.md: 분류+즉답/계획 통합 프롬프트
This commit is contained in:
2026-03-06 21:12:50 +09:00
parent 752d851f9f
commit a9bdce90f4
5 changed files with 746 additions and 280 deletions

View File

@@ -1,8 +1,7 @@
"""Discord Bot 어댑터. """Discord Bot — 워크스페이스 기반 AI Agent.
사용자 명령을 받아 파이프라인 실행 또는 즉답. 슬래시 커맨드로 워크스페이스 관리.
지정 채널(봇 이름 채널)에서는 접두사 없이 자연스럽게 대화합니다. 등록된 채널에서 자동 대화 + 통합 프롬프트 (1회 호출로 분류+응답).
대화 기억: 채널별 최근 메시지를 컨텍스트로 주입.
""" """
import asyncio import asyncio
@@ -11,10 +10,12 @@ import logging
import re import re
import discord import discord
from discord import app_commands
from discord.ext import commands from discord.ext import commands
import config import config
from core.gemini_caller import GeminiCallError from core.workspace import WorkspaceManager
from core.gemini_caller import GeminiCaller, GeminiCallError
logger = logging.getLogger("variet.discord") logger = logging.getLogger("variet.discord")
@@ -28,31 +29,24 @@ bot = commands.Bot(
help_command=commands.DefaultHelpCommand(no_category="명령어"), help_command=commands.DefaultHelpCommand(no_category="명령어"),
) )
# In-memory # 워크스페이스 매니저 (전역)
_channel_tasks: dict[int, list[str]] = {} ws_manager = WorkspaceManager()
_auto_chat_channel_ids: set[int] = set()
# ────────────────────────────────────────────── # ──────────────────────────────────────────────
# 대화 기억 (Conversation Memory) # 대화 기억
# ────────────────────────────────────────────── # ──────────────────────────────────────────────
async def _get_channel_history(channel: discord.TextChannel, limit: int = 10) -> str: async def _get_channel_history(channel: discord.TextChannel, limit: int = 10) -> str:
"""채널 최근 메시지를 대화 히스토리 문자열로 변환.""" """채널 최근 메시지를 대화 히스토리 문자열로 변환."""
messages = [] messages = []
async for msg in channel.history(limit=limit + 1): # 현재 메시지 포함이므로 +1 async for msg in channel.history(limit=limit + 1):
if msg.author.bot: role = "assistant" if msg.author.bot else "user"
role = "assistant"
else:
role = "user"
messages.append(f"[{role}] {msg.content[:300]}") messages.append(f"[{role}] {msg.content[:300]}")
# 시간순 (오래된 것 먼저)
messages.reverse() messages.reverse()
# 마지막(현재 메시지)은 제외 — 이미 context로 전달되니까
if messages: if messages:
messages = messages[:-1] messages = messages[:-1] # 현재 메시지 제외
if not messages: if not messages:
return "" return ""
@@ -61,29 +55,51 @@ async def _get_channel_history(channel: discord.TextChannel, limit: int = 10) ->
# ────────────────────────────────────────────── # ──────────────────────────────────────────────
# 의도 분류 (Intent Router) # 통합 프롬프트 (1회 호출: 분류 + 응답/계획)
# ────────────────────────────────────────────── # ──────────────────────────────────────────────
async def _classify_intent(message: str) -> dict: async def _unified_call(text: str, history: str, project_path: str) -> dict:
"""Gemini로 사용자 메시지의 의도를.""" """통합 프롬프트로 1회 호출 — chat/task/clarify 자동."""
from core.gemini_caller import GeminiCaller gemini = GeminiCaller(project_path)
gemini = GeminiCaller(str(config.PROJECT_ROOT)) # docs 인덱스 주입
try: from core.docs_manager import DocsManager
raw = await gemini.call("router", message, timeout=30) docs = DocsManager(project_path)
except GeminiCallError as e: docs_index = docs.get_docs_index()
logger.warning(f"Intent 분류 실패: {e}")
return {"intent": "chat", "reason": "분류 실패"}
context = (
f"{history}"
f"## Project Docs\n{docs_index}\n\n"
f"## User Message\n{text}"
)
raw = await gemini.call("unified", context, timeout=120)
# JSON 추출
try: try:
json_match = re.search(r'\{[^}]+\}', raw) # ```json ... ``` 패턴
if json_match: match = re.search(r'```json\s*\n(.*?)\n\s*```', raw, re.DOTALL)
return json.loads(json_match.group()) if match:
return json.loads(match.group(1))
# 중첩 { } 찾기
brace_depth = 0
start = -1
for i, ch in enumerate(raw):
if ch == '{':
if brace_depth == 0:
start = i
brace_depth += 1
elif ch == '}':
brace_depth -= 1
if brace_depth == 0 and start >= 0:
return json.loads(raw[start:i + 1])
except (json.JSONDecodeError, AttributeError): except (json.JSONDecodeError, AttributeError):
pass pass
logger.warning(f"Intent JSON 파싱 실패, chat으로 처리: {raw[:100]}") # 파싱 실패 chat으로 처리
return {"intent": "chat", "reason": "파싱 실패 — 기본 chat"} logger.warning(f"통합 프롬프트 JSON 파싱 실패: {raw[:100]}")
return {"mode": "chat", "response": raw}
# ────────────────────────────────────────────── # ──────────────────────────────────────────────
@@ -92,106 +108,86 @@ async def _classify_intent(message: str) -> dict:
@bot.event @bot.event
async def on_ready(): async def on_ready():
"""봇 접속 완료 — 봇 이름 채널 자동 감지.""" """봇 접속 완료 — 슬래시 커맨드 동기화."""
logger.info(f"Discord Bot 접속 완료: {bot.user} (ID: {bot.user.id})") logger.info(f"Discord Bot 접속 완료: {bot.user} (ID: {bot.user.id})")
logger.info(f"서버 {len(bot.guilds)}개 연결됨") logger.info(f"서버 {len(bot.guilds)}개 연결됨")
bot_name = bot.user.name.lower().replace(" ", "-") # 슬래시 커맨드 동기화
bot_name_underscore = bot_name.replace("-", "_") try:
bot_name_dash = bot_name.replace("_", "-") synced = await bot.tree.sync()
logger.info(f"슬래시 커맨드 {len(synced)}개 동기화 완료")
except Exception as e:
logger.error(f"슬래시 커맨드 동기화 실패: {e}")
for guild in bot.guilds: # 등록된 워크스페이스 채널 표시
for channel in guild.text_channels: ws_list = ws_manager.list_all()
ch_name = channel.name.lower() if ws_list:
if ch_name in (bot_name, bot_name_underscore, bot_name_dash): for ws in ws_list:
_auto_chat_channel_ids.add(channel.id) logger.info(f"워크스페이스 활성: #{ws.channel_id}{ws.name} ({ws.path})")
logger.info( else:
f"자동 채팅 채널 감지: #{channel.name} (ID: {channel.id}) " logger.info("등록된 워크스페이스 없음 - /workspace set 으로 등록하세요")
f"in {guild.name}"
)
if not _auto_chat_channel_ids:
logger.info(f"봇 이름({bot_name}) 채널 없음 — !명령어만 사용 가능")
await bot.change_presence( await bot.change_presence(
activity=discord.Activity( activity=discord.Activity(
type=discord.ActivityType.listening, type=discord.ActivityType.listening,
name="대화" if _auto_chat_channel_ids else f"{config.DISCORD_COMMAND_PREFIX}agent", name="/workspace",
) )
) )
@bot.event @bot.event
async def on_message(message: discord.Message): async def on_message(message: discord.Message):
"""모든 메시지 수신 — 봇 전용 채널이면 스마트 라우팅.""" """메시지 수신 — 워크스페이스 채널이면 자동 응답."""
if message.author == bot.user or message.author.bot: if message.author == bot.user or message.author.bot:
return return
# ! 명령어는 기존 핸들러로 # ! 명령어 처리
if message.content.startswith(config.DISCORD_COMMAND_PREFIX): if message.content.startswith(config.DISCORD_COMMAND_PREFIX):
await bot.process_commands(message) await bot.process_commands(message)
return return
# 봇 전용 채널이 아니면 무시 # 워크스페이스 채널인지 확인
if message.channel.id not in _auto_chat_channel_ids: if not ws_manager.is_workspace_channel(message.channel.id):
return return
await _route_message(message) ws = ws_manager.get_workspace(message.channel.id)
# ──────────────────────────────────────────────
# 스마트 라우팅
# ──────────────────────────────────────────────
async def _route_message(message: discord.Message):
"""메시지 의도를 분류하고 적절한 핸들러로 라우팅."""
user_text = message.content.strip() user_text = message.content.strip()
if not user_text: if not user_text:
return return
# 짧은 메시지 → 분류 없이 바로 chat # 통합 프롬프트 호출
if len(user_text) <= 15:
await _handle_chat(message, user_text)
return
# 의도 분류
async with message.channel.typing():
intent_result = await _classify_intent(user_text)
intent = intent_result.get("intent", "chat")
reason = intent_result.get("reason", "")
logger.info(f"의도 분류: {intent} ({reason}) — \"{user_text[:50]}\"")
if intent == "task":
await _handle_task(message, user_text)
elif intent == "clarify":
await _handle_clarify(message, user_text, reason)
else:
await _handle_chat(message, user_text)
# ──────────────────────────────────────────────
# 핸들러: Chat (즉답 + 대화 기억)
# ──────────────────────────────────────────────
async def _handle_chat(message: discord.Message, text: str):
"""즉답 — 대화 히스토리 포함하여 Gemini 직접 호출."""
async with message.channel.typing(): async with message.channel.typing():
try: try:
from core.gemini_caller import GeminiCaller
# 대화 기억: 최근 10개 메시지
history = await _get_channel_history(message.channel, limit=10) history = await _get_channel_history(message.channel, limit=10)
context = f"{history}{text}" result = await _unified_call(user_text, history, ws.path)
except GeminiCallError as e:
gemini = GeminiCaller(str(config.PROJECT_ROOT)) await message.reply(f"⚠️ AI 호출 오류: {str(e)[:200]}")
response = await gemini.call("default", context, timeout=60) return
except Exception as e:
if not response: logger.error(f"통합 호출 오류: {e}", exc_info=True)
await message.reply("⚠️ 응답을 생성하지 못했어요. 다시 시도해 주세요.") await message.reply(f"❌ 오류: {str(e)[:200]}")
return return
# Discord 2000자 제한 mode = result.get("mode", "chat")
logger.info(f"통합 분류: {mode}\"{user_text[:50]}\"")
if mode == "task":
# 설정 완료 체크
if not ws.is_ready:
await _send_setup_warning(message, ws)
return
await _handle_task(message, user_text, ws)
elif mode == "clarify":
question = result.get("question", "더 구체적으로 말씀해 주시겠어요?")
embed = discord.Embed(
title="🤔 확인이 필요해요",
description=question,
color=0xF39C12,
)
await message.reply(embed=embed)
else:
# chat — 즉답
response = result.get("response", "응답을 생성하지 못했습니다.")
if len(response) <= 2000: if len(response) <= 2000:
await message.reply(response) await message.reply(response)
else: else:
@@ -200,51 +196,71 @@ async def _handle_chat(message: discord.Message, text: str):
embed = discord.Embed(description=chunk, color=0x3498DB) embed = discord.Embed(description=chunk, color=0x3498DB)
await message.channel.send(embed=embed) await message.channel.send(embed=embed)
except GeminiCallError as e:
logger.error(f"Chat Gemini 오류: {e}") # ──────────────────────────────────────────────
await message.reply(f"⚠️ AI 호출 오류: {str(e)[:200]}") # 설정 경고
except Exception as e: # ──────────────────────────────────────────────
logger.error(f"Chat 오류: {e}", exc_info=True)
await message.reply(f"❌ 오류: {str(e)[:200]}") async def _send_setup_warning(message: discord.Message, ws):
"""미설정 항목 안내."""
missing = ws.missing_configs
lines = []
for item in missing:
if item == "Git":
lines.append("❌ **Git** 미설정 → `/workspace git` 으로 설정")
elif item == "Vikunja":
lines.append("❌ **Vikunja** 미설정 → `/workspace vikunja` 으로 설정")
embed = discord.Embed(
title="⚠️ 워크스페이스 설정 미완료",
description=(
f"**{ws.name}** 워크스페이스의 설정이 완료되지 않아 작업을 실행할 수 없습니다.\n\n"
+ "\n".join(lines)
+ "\n\n설정 완료 후 다시 요청해주세요."
),
color=0xE74C3C,
)
await message.reply(embed=embed)
# ────────────────────────────────────────────── # ──────────────────────────────────────────────
# 핸들러: Task (파이프라인 실행) # Task 핸들러 (파이프라인 실행)
# ────────────────────────────────────────────── # ──────────────────────────────────────────────
async def _handle_task(message: discord.Message, text: str): async def _handle_task(message: discord.Message, text: str, ws):
"""작업 요청 — 파이프라인 (병렬 Code + Batch Review + 총평).""" """작업 요청 — 파이프라인 실행."""
import uuid import uuid
task_id = uuid.uuid4().hex[:8] task_id = uuid.uuid4().hex[:8]
# 접수 메시지
embed = discord.Embed( embed = discord.Embed(
title="📋 작업 접수", title="📋 작업 접수",
description=f"```{text[:200]}```", description=f"```{text[:200]}```",
color=0x3498DB, color=0x3498DB,
) )
embed.set_footer(text=f"ID: {task_id}분석 중...") embed.set_footer(text=f"ID: {task_id}워크스페이스: {ws.name}")
status_msg = await message.channel.send(embed=embed) status_msg = await message.channel.send(embed=embed)
try: try:
from core.task_pipeline import TaskPipeline from core.task_pipeline import TaskPipeline
pipeline = TaskPipeline(project_path=str(config.PROJECT_ROOT)) pipeline = TaskPipeline(
project_path=ws.path,
docs_subpath=ws.docs_path,
)
pipeline.setup() pipeline.setup()
# ── Plan ── # Plan
embed.color = 0xF39C12 embed.color = 0xF39C12
embed.set_footer(text=f"🔍 작업 분해 중... (ID: {task_id})") embed.set_footer(text=f"🔍 작업 분해 중... (ID: {task_id})")
await status_msg.edit(embed=embed) await status_msg.edit(embed=embed)
plan = await pipeline.plan(text) plan = await pipeline.plan(text)
tasks = plan.get("tasks", []) tasks = plan.get("tasks", [])
plan_text = plan.get("summary", str(plan))[:500]
plan_embed = discord.Embed( plan_embed = discord.Embed(
title="📝 작업 계획", title="📝 작업 계획",
description=f"```{plan_text}```", description=f"```{plan.get('summary', str(plan))[:500]}```",
color=0x2ECC71, color=0x2ECC71,
) )
if tasks: if tasks:
@@ -253,221 +269,289 @@ async def _handle_task(message: discord.Message, text: str):
for t in tasks[:10] for t in tasks[:10]
) )
plan_embed.add_field( plan_embed.add_field(
name=f"태스크 ({len(tasks)}개)", name=f"태스크 ({len(tasks)}개)", value=task_list[:1000], inline=False,
value=task_list[:1000],
inline=False,
) )
plan_embed.set_footer(text=f"ID: {task_id}")
await message.channel.send(embed=plan_embed) await message.channel.send(embed=plan_embed)
if not tasks: if not tasks:
await message.channel.send( await message.channel.send(
embed=discord.Embed( embed=discord.Embed(
title="⚠️ 실행할 태스크습니다", title="⚠️ 실행할 태스크 없",
description="요청을 더 구체적으로 해주세요.", description="요청을 더 구체적으로 해주세요.",
color=0xF39C12, color=0xF39C12,
) )
) )
return return
# ── Code 병렬 실행 ── # Code 병렬
code_embed = discord.Embed( code_embed = discord.Embed(
title=f"⚙️ 코딩 중... ({len(tasks)}개 병렬 실행)", title=f"⚙️ 코딩 중... ({len(tasks)}개 병렬)",
description="\n".join( description="\n".join(
f"{t.get('title', t.get('description', '?'))[:60]}" f"{t.get('title', '?')[:60]}" for t in tasks
for t in tasks
), ),
color=0xE67E22, color=0xE67E22,
) )
code_msg = await message.channel.send(embed=code_embed) code_msg = await message.channel.send(embed=code_embed)
code_outputs = await pipeline.code_parallel(tasks) code_outputs = await pipeline.code_parallel(tasks)
# 코딩 완료 표시
code_embed.title = f"✅ 코딩 완료 ({len(tasks)}개)" code_embed.title = f"✅ 코딩 완료 ({len(tasks)}개)"
code_embed.color = 0x2ECC71 code_embed.color = 0x2ECC71
await code_msg.edit(embed=code_embed) await code_msg.edit(embed=code_embed)
# ── 파일 적용 ── # 파일 적용
from core.file_applier import parse_code_output, apply_changes from core.file_applier import parse_code_output, apply_changes
all_applied = [] all_applied = []
for output in code_outputs: for output in code_outputs:
if not output.startswith("[ERROR]"): if not output.startswith("[ERROR]"):
changes = parse_code_output(output) changes = parse_code_output(output)
if changes: if changes:
applied = apply_changes(changes, str(config.PROJECT_ROOT)) applied = apply_changes(changes, ws.path)
all_applied.extend(applied) all_applied.extend(applied)
if all_applied: if all_applied:
files_text = "\n".join( files_text = "\n".join(f"• `{f['path']}` ({f['action']})" for f in all_applied[:15])
f"• `{f['path']}` ({f['action']})"
for f in all_applied[:15]
)
await message.channel.send( await message.channel.send(
embed=discord.Embed( embed=discord.Embed(title=f"📁 파일 적용 ({len(all_applied)}개)",
title=f"📁 파일 적용 ({len(all_applied)}개)", description=files_text, color=0x3498DB)
description=files_text,
color=0x3498DB,
)
)
# ── Batch Review ──
review_msg = await message.channel.send(
embed=discord.Embed(
title="🔍 전체 리뷰 중...",
color=0xF39C12,
)
) )
# Batch Review
review = await pipeline.batch_review(tasks, code_outputs) review = await pipeline.batch_review(tasks, code_outputs)
passed = review.get("passed", True) passed = review.get("passed", True)
review_embed = discord.Embed( await message.channel.send(
embed=discord.Embed(
title=f"{'' if passed else '⚠️'} 리뷰 결과", title=f"{'' if passed else '⚠️'} 리뷰 결과",
description=review.get("summary", str(review))[:500], description=review.get("summary", str(review))[:500],
color=0x2ECC71 if passed else 0xE74C3C, color=0x2ECC71 if passed else 0xE74C3C,
) )
await review_msg.edit(embed=review_embed)
# ── 총평 ──
summary = await pipeline.summarize(
text, plan, code_outputs, review, all_applied
) )
# 총평
summary = await pipeline.summarize(text, plan, code_outputs, review, all_applied)
summary_embed = discord.Embed( summary_embed = discord.Embed(
title=f"📊 {summary.get('title', '작업 완료')}", title=f"📊 {summary.get('title', '작업 완료')}",
description=summary.get("summary", "작업이 완료되었습니다."), description=summary.get("summary", "완료"),
color=0x9B59B6, color=0x9B59B6,
) )
for field_name, key, emoji in [
("변경 사항", "changes", ""),
("⚠️ 주의", "warnings", ""),
("🔜 다음 단계", "next_steps", ""),
]:
items = summary.get(key, [])
if items:
if key == "changes":
val = "\n".join(f"• `{c.get('file','?')}` — {c.get('description','')}" for c in items[:10])
else:
val = "\n".join(f"{s}" for s in items)
summary_embed.add_field(name=field_name, value=val[:1000], inline=False)
# 변경 파일 요약 summary_embed.set_footer(text=f"ID: {task_id} | {ws.name}")
changes = summary.get("changes", [])
if changes:
changes_text = "\n".join(
f"• `{c.get('file', '?')}` — {c.get('description', '')}"
for c in changes[:10]
)
summary_embed.add_field(
name="변경 사항",
value=changes_text[:1000],
inline=False,
)
# 주의사항
warnings = summary.get("warnings", [])
if warnings:
summary_embed.add_field(
name="⚠️ 주의",
value="\n".join(f"{w}" for w in warnings),
inline=False,
)
# 다음 단계 제안
next_steps = summary.get("next_steps", [])
if next_steps:
summary_embed.add_field(
name="🔜 다음 단계",
value="\n".join(f"{s}" for s in next_steps),
inline=False,
)
summary_embed.set_footer(text=f"ID: {task_id}")
await message.channel.send(embed=summary_embed) await message.channel.send(embed=summary_embed)
_channel_tasks.setdefault(message.channel.id, []).append(task_id)
except GeminiCallError as e: except GeminiCallError as e:
logger.error(f"파이프라인 Gemini 오류: {e}")
await message.channel.send( await message.channel.send(
embed=discord.Embed( embed=discord.Embed(title="❌ AI 호출 오류",
title="❌ AI 호출 오류", description=f"```{str(e)[:500]}```", color=0xE74C3C)
description=f"```{str(e)[:500]}```",
color=0xE74C3C,
)
) )
except Exception as e: except Exception as e:
logger.error(f"작업 실행 오류: {e}", exc_info=True) logger.error(f"작업 오류: {e}", exc_info=True)
await message.channel.send( await message.channel.send(
embed=discord.Embed( embed=discord.Embed(title="❌ 오류",
title="❌ 오류 발생", description=f"```{str(e)[:500]}```", color=0xE74C3C)
description=f"```{str(e)[:500]}```",
color=0xE74C3C,
)
) )
# ────────────────────────────────────────────── # ──────────────────────────────────────────────
# 핸들러: Clarify (되묻기) # 슬래시 커맨드: /workspace
# ────────────────────────────────────────────── # ──────────────────────────────────────────────
async def _handle_clarify(message: discord.Message, text: str, reason: str): workspace_group = app_commands.Group(name="workspace", description="워크스페이스 관리")
"""의도 불명확 — 사용자에게 되묻기."""
@workspace_group.command(name="set", description="이 채널에 워크스페이스 등록")
@app_commands.describe(name="프로젝트 이름", path="로컬 프로젝트 경로")
async def workspace_set(interaction: discord.Interaction, name: str, path: str):
"""채널에 워크스페이스 등록."""
from pathlib import Path as P
if not P(path).exists():
await interaction.response.send_message(
f"❌ 경로가 존재하지 않습니다: `{path}`", ephemeral=True
)
return
ws = ws_manager.set_workspace(interaction.channel_id, name, path)
embed = discord.Embed( embed = discord.Embed(
title="🤔 확인이 필요해요", title="✅ 워크스페이스 등록 완료",
description=( description=(
f"말씀하신 내용을 정확히 이해하고 싶어요.\n\n" f"**{name}** → `{path}`\n\n"
f"> {text[:200]}\n\n" f"이 채널에서 봇과 대화할 수 있습니다.\n"
f"**💬 질문/대화**인가요, **🔧 작업 요청**인가요?\n" f"작업 실행을 위해 Git과 Vikunja를 설정해주세요:"
f"`질문` 또는 `작업`으로 답해주세요."
), ),
color=0xF39C12, color=0x2ECC71,
) )
embed.set_footer(text=f"사유: {reason}") embed.add_field(name="Git 설정", value="`/workspace git`", inline=True)
await message.reply(embed=embed) embed.add_field(name="Vikunja 설정", value="`/workspace vikunja`", inline=True)
await interaction.response.send_message(embed=embed)
@workspace_group.command(name="git", description="Git 연결 설정")
@app_commands.describe(url="Git 서버 URL", token="API 토큰", repo="Owner/Repo", branch="기본 브랜치")
async def workspace_git(interaction: discord.Interaction, url: str, token: str,
repo: str = "", branch: str = "main"):
"""워크스페이스에 Git 설정."""
ws = ws_manager.get_workspace(interaction.channel_id)
if not ws:
await interaction.response.send_message(
"❌ 이 채널에 워크스페이스가 없습니다. `/workspace set`을 먼저 실행하세요.",
ephemeral=True,
)
return
ws_manager.set_git(interaction.channel_id, url, token, repo, branch)
embed = discord.Embed(
title="✅ Git 연결 완료",
description=f"**{ws.name}** → {url}\nRepo: `{repo or '미지정'}`\nBranch: `{branch}`",
color=0x2ECC71,
)
await interaction.response.send_message(embed=embed)
@workspace_group.command(name="vikunja", description="Vikunja 프로젝트 관리 연결")
@app_commands.describe(url="Vikunja URL", token="API 토큰", project_id="프로젝트 ID")
async def workspace_vikunja(interaction: discord.Interaction, url: str, token: str,
project_id: int):
"""워크스페이스에 Vikunja 설정."""
ws = ws_manager.get_workspace(interaction.channel_id)
if not ws:
await interaction.response.send_message(
"❌ 이 채널에 워크스페이스가 없습니다. `/workspace set`을 먼저 실행하세요.",
ephemeral=True,
)
return
ws_manager.set_vikunja(interaction.channel_id, url, token, project_id)
embed = discord.Embed(
title="✅ Vikunja 연결 완료",
description=f"**{ws.name}** → {url}\nProject ID: `{project_id}`",
color=0x2ECC71,
)
await interaction.response.send_message(embed=embed)
@workspace_group.command(name="info", description="현재 워크스페이스 정보 표시")
async def workspace_info(interaction: discord.Interaction):
"""현재 채널 워크스페이스 상태."""
ws = ws_manager.get_workspace(interaction.channel_id)
if not ws:
await interaction.response.send_message(
"이 채널에 등록된 워크스페이스가 없습니다.\n`/workspace set`으로 등록하세요.",
ephemeral=True,
)
return
git_status = f"{ws.git.url}" if ws.git.is_configured else "❌ 미설정"
vik_status = f"{ws.vikunja.url} (project {ws.vikunja.project_id})" if ws.vikunja.is_configured else "❌ 미설정"
embed = discord.Embed(
title=f"📂 {ws.name}",
description=f"경로: `{ws.path}`\nDocs: `{ws.docs_path}`",
color=0x3498DB if ws.is_ready else 0xF39C12,
)
embed.add_field(name="Git", value=git_status, inline=False)
embed.add_field(name="Vikunja", value=vik_status, inline=False)
embed.add_field(
name="상태",
value="✅ 모든 설정 완료 — 작업 가능" if ws.is_ready else "⚠️ 설정 미완료 — 작업 차단됨",
inline=False,
)
await interaction.response.send_message(embed=embed)
@workspace_group.command(name="remove", description="워크스페이스 등록 해제")
async def workspace_remove(interaction: discord.Interaction):
"""워크스페이스 제거."""
ws = ws_manager.get_workspace(interaction.channel_id)
if not ws:
await interaction.response.send_message(
"이 채널에 등록된 워크스페이스가 없습니다.", ephemeral=True
)
return
name = ws.name
ws_manager.remove_workspace(interaction.channel_id)
await interaction.response.send_message(
embed=discord.Embed(
title="🗑️ 워크스페이스 해제",
description=f"**{name}** 등록이 해제되었습니다.\n이 채널에서 봇이 더 이상 자동 응답하지 않습니다.",
color=0x95A5A6,
)
)
@workspace_group.command(name="list", description="등록된 전체 워크스페이스 목록")
async def workspace_list(interaction: discord.Interaction):
"""전체 워크스페이스 목록."""
all_ws = ws_manager.list_all()
if not all_ws:
await interaction.response.send_message(
"등록된 워크스페이스가 없습니다.\n`/workspace set`으로 등록하세요.",
ephemeral=True,
)
return
embed = discord.Embed(title="📂 워크스페이스 목록", color=0x3498DB)
for ws in all_ws:
status = "" if ws.is_ready else "⚠️"
embed.add_field(
name=f"{status} {ws.name}",
value=f"채널: <#{ws.channel_id}>\n경로: `{ws.path}`",
inline=False,
)
await interaction.response.send_message(embed=embed)
# 슬래시 커맨드 그룹 등록
bot.tree.add_command(workspace_group)
# ────────────────────────────────────────────── # ──────────────────────────────────────────────
# 기존 ! 명령어 (유지) # 기존 ! 명령어 (유지, 하위호환)
# ────────────────────────────────────────────── # ──────────────────────────────────────────────
@bot.command(name="agent", help="AI Agent에게 작업 요청\n예: !agent README에 설치 방법 추가해줘")
async def agent_command(ctx: commands.Context, *, request: str):
"""작업 요청 → Pipeline 실행."""
await _handle_task(ctx.message, request)
@bot.command(name="chat", help="Gemini에게 질문/대화\n예: !chat 네 소개를 해줘")
async def chat_command(ctx: commands.Context, *, message: str):
"""단순 대화."""
await _handle_chat(ctx.message, message)
@bot.command(name="ping", help="봇 응답 테스트") @bot.command(name="ping", help="봇 응답 테스트")
async def ping_command(ctx: commands.Context): async def ping_command(ctx: commands.Context):
"""연결 상태 확인."""
latency = round(bot.latency * 1000) latency = round(bot.latency * 1000)
await ctx.send(f"🏓 Pong! ({latency}ms)") await ctx.send(f"🏓 Pong! ({latency}ms)")
@bot.command(name="info", help="시스템 정보") @bot.command(name="info", help="시스템 정보")
async def info_command(ctx: commands.Context): async def info_command(ctx: commands.Context):
"""시스템 정보 표시."""
embed = discord.Embed( embed = discord.Embed(
title="🤖 Variet Agent", title="🤖 Variet Agent",
description="AI Agent Team — Gemini CLI 기반 자동화 개발 에이전트", description="AI Agent Team — 워크스페이스 기반 자동화 개발 에이전트",
color=0x9B59B6, color=0x9B59B6,
) )
embed.add_field(name="프로젝트", value=str(config.PROJECT_ROOT), inline=False) all_ws = ws_manager.list_all()
embed.add_field(name="명령어 접두사", value=config.DISCORD_COMMAND_PREFIX, inline=True) embed.add_field(name="워크스페이스", value=f"{len(all_ws)}개 등록", inline=True)
embed.add_field(name="서버", value=str(len(bot.guilds)), inline=True) embed.add_field(name="서버", value=str(len(bot.guilds)), inline=True)
if _auto_chat_channel_ids:
channels = ", ".join(f"<#{ch_id}>" for ch_id in _auto_chat_channel_ids)
embed.add_field(name="자동 채팅 채널", value=channels, inline=False)
embed.add_field( embed.add_field(
name="파이프라인", name="파이프라인",
value="Plan → Code(병렬) → Review(배치) → 총평", value="통합분류(1회) → Code(병렬) → Review(배치) → 총평 → 기록",
inline=False, inline=False,
) )
await ctx.send(embed=embed) await ctx.send(embed=embed)
async def start_bot(): async def start_bot():
"""Discord Bot 시작 (async).""" """Discord Bot 시작."""
token = config.DISCORD_BOT_TOKEN token = config.DISCORD_BOT_TOKEN
if not token: if not token:
logger.error("DISCORD_BOT_TOKEN이 설정되지 않았습니다. .env 파일을 확인하세요.") logger.error("DISCORD_BOT_TOKEN이 설정되지 않았습니다.")
return return
logger.info("Discord Bot 시작 중...") logger.info("Discord Bot 시작 중...")

132
core/docs_manager.py Normal file
View File

@@ -0,0 +1,132 @@
"""Docs Manager — 작업 기록 + Wiki 문서 관리.
모든 작업은 docs/에 기록되며, Gemini 호출 시
문서 경로와 기존 문서 목록을 프롬프트에 주입합니다.
"""
import logging
from datetime import datetime
from pathlib import Path
logger = logging.getLogger("variet.docs")
class DocsManager:
"""프로젝트 문서 관리자."""
def __init__(self, project_path: str, docs_subpath: str = "docs/wiki"):
self.project_path = Path(project_path)
self.docs_path = self.project_path / docs_subpath
self.sessions_path = self.project_path / "docs" / "sessions"
self.changelog_path = self.project_path / "docs" / "changelog.md"
# 디렉토리 생성
self.docs_path.mkdir(parents=True, exist_ok=True)
self.sessions_path.mkdir(parents=True, exist_ok=True)
def get_docs_index(self) -> str:
"""docs/wiki 내 문서 목록을 문자열로 반환 (프롬프트 주입용)."""
if not self.docs_path.exists():
return "문서 없음"
files = sorted(self.docs_path.glob("*.md"))
if not files:
return "문서 없음"
lines = ["=== PROJECT DOCS ==="]
for f in files:
size = f.stat().st_size
lines.append(f" - {f.name} ({size}B)")
lines.append(f"경로: {self.docs_path}")
lines.append("=== END DOCS ===")
return "\n".join(lines)
def get_doc_content(self, filename: str) -> str:
"""특정 문서 내용 반환."""
path = self.docs_path / filename
if path.exists():
return path.read_text(encoding="utf-8", errors="ignore")
return ""
def get_all_docs_content(self, max_total_bytes: int = 30000) -> str:
"""전체 문서 내용 반환 (예산 내에서)."""
files = sorted(self.docs_path.glob("*.md"))
parts = ["=== PROJECT KNOWLEDGE BASE ===\n"]
total = 0
for f in files:
content = f.read_text(encoding="utf-8", errors="ignore")
if total + len(content.encode("utf-8")) > max_total_bytes:
parts.append(f"\n--- {f.name} (예산 초과, 생략) ---")
continue
parts.append(f"\n--- {f.name} ---\n{content}")
total += len(content.encode("utf-8"))
parts.append("\n=== END KNOWLEDGE BASE ===")
return "\n".join(parts)
def record_session(self, request: str, summary: dict, plan: dict = None) -> str:
"""작업 세션을 기록."""
now = datetime.now()
filename = f"{now.strftime('%Y-%m-%d_%H%M%S')}.md"
filepath = self.sessions_path / filename
lines = [
f"# 작업 기록 — {now.strftime('%Y-%m-%d %H:%M')}",
"",
f"## 요청",
f"{request}",
"",
]
if plan:
lines.extend([
f"## 계획",
f"{plan.get('summary', str(plan)[:300])}",
"",
])
if isinstance(summary, dict):
lines.extend([
f"## 결과",
f"{summary.get('summary', str(summary)[:300])}",
"",
])
changes = summary.get("changes", [])
if changes:
lines.append("## 변경 파일")
for c in changes:
lines.append(f"- `{c.get('file', '?')}` — {c.get('description', '')}")
lines.append("")
warnings = summary.get("warnings", [])
if warnings:
lines.append("## 주의사항")
for w in warnings:
lines.append(f"- {w}")
lines.append("")
filepath.write_text("\n".join(lines), encoding="utf-8")
logger.info(f"세션 기록: {filepath}")
return str(filepath)
def append_changelog(self, entry: str):
"""Changelog에 항목 추가."""
now = datetime.now().strftime("%Y-%m-%d %H:%M")
line = f"\n- [{now}] {entry}"
if self.changelog_path.exists():
content = self.changelog_path.read_text(encoding="utf-8")
else:
content = "# Changelog\n"
content += line
self.changelog_path.write_text(content, encoding="utf-8")
def update_wiki(self, filename: str, content: str):
"""Wiki 문서 생성/수정."""
path = self.docs_path / filename
path.write_text(content, encoding="utf-8")
logger.info(f"Wiki 업데이트: {path}")

View File

@@ -1,6 +1,6 @@
"""Task Pipeline — Plan → Code(병렬) → Batch Review → 총평. """Task Pipeline — Plan → Code(병렬) → Review(배치) → 파일 적용 → 총평 → 기록.
병렬 코드 실행, 단일 배치 리뷰, 파일 적용, 종합 총평을 수행합니다. docs/wiki를 프롬프트에 주입하고, 완료 시 세션 기록 + changelog 업데이트.
""" """
import asyncio import asyncio
@@ -11,16 +11,19 @@ from core.project_indexer import ProjectIndex
from core.context_manager import ContextManager from core.context_manager import ContextManager
from core.gemini_caller import GeminiCaller, GeminiCallError from core.gemini_caller import GeminiCaller, GeminiCallError
from core.file_applier import parse_code_output, apply_changes from core.file_applier import parse_code_output, apply_changes
from core.docs_manager import DocsManager
class TaskPipeline: class TaskPipeline:
"""작업 파이프라인: Plan → Code(병렬) → Review(배치) → Summary.""" """작업 파이프라인: Plan → Code(병렬) → Review(배치) → 기록."""
def __init__(self, project_path: str, token_budget: int = 50_000): def __init__(self, project_path: str, token_budget: int = 50_000,
docs_subpath: str = "docs/wiki"):
self.project_path = project_path self.project_path = project_path
self.index = ProjectIndex(project_path) self.index = ProjectIndex(project_path)
self.ctx = ContextManager(self.index, token_budget) self.ctx = ContextManager(self.index, token_budget)
self.gemini = GeminiCaller(project_path) self.gemini = GeminiCaller(project_path)
self.docs = DocsManager(project_path, docs_subpath)
self.log: list[dict] = [] self.log: list[dict] = []
def setup(self): def setup(self):
@@ -28,6 +31,19 @@ class TaskPipeline:
self.index.scan() self.index.scan()
return self return self
# ──────────────────────────────────────────
# Docs 컨텍스트 (모든 호출에 주입)
# ──────────────────────────────────────────
def _docs_context(self) -> str:
"""Gemini 호출에 주입할 프로젝트 문서 컨텍스트."""
index = self.docs.get_docs_index()
return (
f"\n{index}\n"
f"작업 완료 시 관련 문서가 있으면 업데이트하세요.\n"
f"docs 경로: {self.docs.docs_path}\n"
)
# ────────────────────────────────────────── # ──────────────────────────────────────────
# Plan # Plan
# ────────────────────────────────────────── # ──────────────────────────────────────────
@@ -35,9 +51,12 @@ class TaskPipeline:
async def plan(self, user_request: str) -> dict: async def plan(self, user_request: str) -> dict:
"""Planner로 작업 분해.""" """Planner로 작업 분해."""
structure = self.index.get_structure_summary() structure = self.index.get_structure_summary()
docs_ctx = self._docs_context()
prompt = ( prompt = (
f"## User Request\n{user_request}\n\n" f"## User Request\n{user_request}\n\n"
f"## Project Structure\n{structure}\n\n" f"## Project Structure\n{structure}\n\n"
f"## Project Docs\n{docs_ctx}\n\n"
f"Decompose this request into concrete tasks." f"Decompose this request into concrete tasks."
) )
@@ -54,10 +73,12 @@ class TaskPipeline:
async def code(self, task: dict) -> str: async def code(self, task: dict) -> str:
"""Coder로 코드 수정 (단일 태스크).""" """Coder로 코드 수정 (단일 태스크)."""
context = self.ctx.gather(task.get("description", task.get("title", ""))) context = self.ctx.gather(task.get("description", task.get("title", "")))
docs_ctx = self._docs_context()
prompt = ( prompt = (
f"## Task\n{json.dumps(task, ensure_ascii=False, indent=2)}\n\n" f"## Task\n{json.dumps(task, ensure_ascii=False, indent=2)}\n\n"
f"## Context\n{context}\n\n" f"## Context\n{context}\n\n"
f"## Project Docs\n{docs_ctx}\n\n"
f"Implement the changes described in the task." f"Implement the changes described in the task."
) )
@@ -76,7 +97,6 @@ class TaskPipeline:
return_exceptions=True, return_exceptions=True,
) )
# 예외를 문자열로 변환
processed = [] processed = []
for i, result in enumerate(results): for i, result in enumerate(results):
if isinstance(result, Exception): if isinstance(result, Exception):
@@ -89,25 +109,23 @@ class TaskPipeline:
return processed return processed
# ────────────────────────────────────────── # ──────────────────────────────────────────
# Batch Review (전체 한 번) # Batch Review
# ────────────────────────────────────────── # ──────────────────────────────────────────
async def batch_review(self, tasks: list[dict], code_outputs: list[str]) -> dict: async def batch_review(self, tasks: list[dict], code_outputs: list[str]) -> dict:
"""모든 코드 출력을 한 번에 리뷰.""" """모든 코드 출력을 한 번에 리뷰."""
# 태스크별 코드 출력을 하나로 합침
combined = [] combined = []
for i, (task, output) in enumerate(zip(tasks, code_outputs)): for i, (task, output) in enumerate(zip(tasks, code_outputs)):
title = task.get("title", task.get("description", f"Task {i+1}")) title = task.get("title", task.get("description", f"Task {i+1}"))
combined.append( combined.append(
f"### Task {i+1}: {title}\n" f"### Task {i+1}: {title}\n"
f"{output[:2000]}\n" # 각 출력 2000자 제한 f"{output[:2000]}\n"
) )
prompt = ( prompt = (
f"## All Code Changes\n\n" f"## All Code Changes\n\n"
f"{'---'.join(combined)}\n\n" f"{'---'.join(combined)}\n\n"
f"Review ALL changes above as a whole." f"Review ALL changes above as a whole."
f"Check for consistency, conflicts between tasks, and overall correctness."
) )
response = await self.gemini.call("reviewer", prompt, timeout=180) response = await self.gemini.call("reviewer", prompt, timeout=180)
@@ -117,17 +135,12 @@ class TaskPipeline:
return review or {"passed": True, "summary": response, "raw": response} return review or {"passed": True, "summary": response, "raw": response}
# ────────────────────────────────────────── # ──────────────────────────────────────────
# 총평 (Summary) # 총평
# ────────────────────────────────────────── # ──────────────────────────────────────────
async def summarize( async def summarize(self, user_request: str, plan: dict,
self, code_outputs: list[str], review: dict,
user_request: str, applied_files: list[dict]) -> dict:
plan: dict,
code_outputs: list[str],
review: dict,
applied_files: list[dict],
) -> dict:
"""전체 작업 결과 종합 총평.""" """전체 작업 결과 종합 총평."""
file_changes = "\n".join( file_changes = "\n".join(
f"- {f['path']} ({f['action']}, {f.get('lines', '?')}L)" f"- {f['path']} ({f['action']}, {f.get('lines', '?')}L)"
@@ -136,7 +149,6 @@ class TaskPipeline:
prompt = ( prompt = (
f"## 원래 요청\n{user_request}\n\n" f"## 원래 요청\n{user_request}\n\n"
f"## 계획\n{plan.get('summary', str(plan))[:500]}\n\n"
f"## 태스크 수\n{len(plan.get('tasks', []))}\n\n" f"## 태스크 수\n{len(plan.get('tasks', []))}\n\n"
f"## 리뷰 결과\n{review.get('summary', str(review))[:500]}\n\n" f"## 리뷰 결과\n{review.get('summary', str(review))[:500]}\n\n"
f"## 변경된 파일\n{file_changes}\n\n" f"## 변경된 파일\n{file_changes}\n\n"
@@ -156,11 +168,11 @@ class TaskPipeline:
} }
# ────────────────────────────────────────── # ──────────────────────────────────────────
# 전체 파이프라인 실행 # 전체 파이프라인
# ────────────────────────────────────────── # ──────────────────────────────────────────
async def execute(self, user_request: str) -> dict: async def execute(self, user_request: str) -> dict:
"""전체 파이프라인: Plan → Code(병렬) → Review(배치) → 파일 적용 → 총평.""" """Plan → Code(병렬) → 파일 적용 → Review → 총평기록."""
result = { result = {
"request": user_request, "request": user_request,
"plan": None, "plan": None,
@@ -178,7 +190,7 @@ class TaskPipeline:
if not tasks: if not tasks:
result["summary"] = { result["summary"] = {
"title": "태스크 없음", "title": "태스크 없음",
"summary": "Planner가 실행할 태스크를 생성하지 못했습니다.", "summary": "Planner가 태스크를 생성하지 못했습니다.",
"changes": [], "changes": [],
"warnings": ["요청을 더 구체적으로 해주세요."], "warnings": ["요청을 더 구체적으로 해주세요."],
"next_steps": [], "next_steps": [],
@@ -189,7 +201,7 @@ class TaskPipeline:
code_outputs = await self.code_parallel(tasks) code_outputs = await self.code_parallel(tasks)
result["code_outputs"] = [o[:500] for o in code_outputs] result["code_outputs"] = [o[:500] for o in code_outputs]
# 3. 파일 적용 (Coder 출력 파싱) # 3. 파일 적용
all_applied = [] all_applied = []
for output in code_outputs: for output in code_outputs:
if output.startswith("[ERROR]"): if output.startswith("[ERROR]"):
@@ -200,20 +212,22 @@ class TaskPipeline:
all_applied.extend(applied) all_applied.extend(applied)
result["applied_files"] = all_applied result["applied_files"] = all_applied
# 4. Batch Review (전체 1회) # 4. Batch Review
review = await self.batch_review(tasks, code_outputs) review = await self.batch_review(tasks, code_outputs)
result["review"] = review result["review"] = review
# 리뷰 실패 시 로그만 (재시도 없이 진행)
if not review.get("passed", True):
self._log("review_warning", "batch", "리뷰 이슈 있음 — 총평에 반영")
# 5. 총평 # 5. 총평
summary = await self.summarize( summary = await self.summarize(
user_request, plan, code_outputs, review, all_applied user_request, plan, code_outputs, review, all_applied
) )
result["summary"] = summary result["summary"] = summary
# 6. 기록
self.docs.record_session(user_request, summary, plan)
self.docs.append_changelog(
summary.get("title", user_request[:50])
)
return result return result
# ────────────────────────────────────────── # ──────────────────────────────────────────
@@ -222,7 +236,6 @@ class TaskPipeline:
def _extract_json(self, text: str) -> dict | None: def _extract_json(self, text: str) -> dict | None:
"""텍스트에서 JSON 블록 추출.""" """텍스트에서 JSON 블록 추출."""
# ```json ... ``` 패턴
match = re.search(r"```json\s*\n(.*?)\n\s*```", text, re.DOTALL) match = re.search(r"```json\s*\n(.*?)\n\s*```", text, re.DOTALL)
if match: if match:
try: try:
@@ -230,7 +243,6 @@ class TaskPipeline:
except json.JSONDecodeError: except json.JSONDecodeError:
pass pass
# { ... } 직접 찾기 (중첩 지원)
brace_depth = 0 brace_depth = 0
start = -1 start = -1
for i, ch in enumerate(text): for i, ch in enumerate(text):

180
core/workspace.py Normal file
View File

@@ -0,0 +1,180 @@
"""Workspace Manager — 채널별 프로젝트 설정 관리.
워크스페이스는 JSON 파일로 영속 저장됩니다.
각 디스코드 채널은 하나의 워크스페이스에 바인딩됩니다.
"""
import json
import logging
from pathlib import Path
from dataclasses import dataclass, field, asdict
from typing import Optional
logger = logging.getLogger("variet.workspace")
# 워크스페이스 설정 파일 경로
WORKSPACES_FILE = Path(__file__).parent.parent / "workspaces.json"
@dataclass
class GitConfig:
url: str = ""
token: str = ""
repo: str = "" # "Owner/RepoName"
branch: str = "main"
@property
def is_configured(self) -> bool:
return bool(self.url and self.token)
@dataclass
class VikunjaConfig:
url: str = ""
token: str = ""
project_id: int = 0
@property
def is_configured(self) -> bool:
return bool(self.url and self.token and self.project_id)
@dataclass
class Workspace:
name: str
path: str
channel_id: int
git: GitConfig = field(default_factory=GitConfig)
vikunja: VikunjaConfig = field(default_factory=VikunjaConfig)
docs_path: str = "docs/wiki"
@property
def is_ready(self) -> bool:
"""Git + Vikunja 모두 설정되었는지."""
return self.git.is_configured and self.vikunja.is_configured
@property
def missing_configs(self) -> list[str]:
"""미설정 항목 목록."""
missing = []
if not self.git.is_configured:
missing.append("Git")
if not self.vikunja.is_configured:
missing.append("Vikunja")
return missing
@property
def abs_docs_path(self) -> Path:
return Path(self.path) / self.docs_path
def to_dict(self) -> dict:
return {
"name": self.name,
"path": self.path,
"channel_id": self.channel_id,
"git": asdict(self.git),
"vikunja": asdict(self.vikunja),
"docs_path": self.docs_path,
}
@classmethod
def from_dict(cls, data: dict) -> "Workspace":
return cls(
name=data["name"],
path=data["path"],
channel_id=data["channel_id"],
git=GitConfig(**data.get("git", {})),
vikunja=VikunjaConfig(**data.get("vikunja", {})),
docs_path=data.get("docs_path", "docs/wiki"),
)
class WorkspaceManager:
"""워크스페이스 CRUD + 영속 저장."""
def __init__(self, config_path: Path = WORKSPACES_FILE):
self.config_path = config_path
self.workspaces: dict[int, Workspace] = {} # channel_id → Workspace
self._load()
def _load(self):
"""JSON에서 워크스페이스 로드."""
if not self.config_path.exists():
self.workspaces = {}
return
try:
data = json.loads(self.config_path.read_text(encoding="utf-8"))
self.workspaces = {
int(ch_id): Workspace.from_dict(ws)
for ch_id, ws in data.items()
}
logger.info(f"워크스페이스 {len(self.workspaces)}개 로드됨")
except Exception as e:
logger.error(f"워크스페이스 로드 실패: {e}")
self.workspaces = {}
def _save(self):
"""JSON으로 워크스페이스 저장."""
data = {
str(ch_id): ws.to_dict()
for ch_id, ws in self.workspaces.items()
}
self.config_path.write_text(
json.dumps(data, ensure_ascii=False, indent=2),
encoding="utf-8",
)
def set_workspace(self, channel_id: int, name: str, path: str) -> Workspace:
"""채널에 워크스페이스 등록."""
ws = Workspace(name=name, path=path, channel_id=channel_id)
self.workspaces[channel_id] = ws
self._save()
logger.info(f"워크스페이스 설정: #{channel_id}{name} ({path})")
return ws
def get_workspace(self, channel_id: int) -> Optional[Workspace]:
"""채널의 워크스페이스 조회."""
return self.workspaces.get(channel_id)
def is_workspace_channel(self, channel_id: int) -> bool:
"""이 채널이 워크스페이스로 등록되어 있는지."""
return channel_id in self.workspaces
def set_git(self, channel_id: int, url: str, token: str,
repo: str = "", branch: str = "main") -> bool:
"""워크스페이스에 Git 설정."""
ws = self.workspaces.get(channel_id)
if not ws:
return False
ws.git = GitConfig(url=url, token=token, repo=repo, branch=branch)
self._save()
logger.info(f"Git 설정: {ws.name}{url}")
return True
def set_vikunja(self, channel_id: int, url: str, token: str,
project_id: int) -> bool:
"""워크스페이스에 Vikunja 설정."""
ws = self.workspaces.get(channel_id)
if not ws:
return False
ws.vikunja = VikunjaConfig(url=url, token=token, project_id=project_id)
self._save()
logger.info(f"Vikunja 설정: {ws.name}{url} (project {project_id})")
return True
def remove_workspace(self, channel_id: int) -> bool:
"""워크스페이스 제거."""
if channel_id in self.workspaces:
name = self.workspaces[channel_id].name
del self.workspaces[channel_id]
self._save()
logger.info(f"워크스페이스 제거: {name}")
return True
return False
def list_all(self) -> list[Workspace]:
"""전체 워크스페이스 목록."""
return list(self.workspaces.values())

58
prompts/unified.md Normal file
View File

@@ -0,0 +1,58 @@
# Unified Agent — 분류 + 즉답/계획 통합 프롬프트
당신은 **Variet Agent**입니다. 사용자의 메시지를 받아 스스로 판단하여 즉답하거나 작업 계획을 수립합니다.
## 판단 기준
1. **즉답 가능** (질문, 인사, 설명 요청, 의견 교환)
`mode: "chat"` — 바로 답변을 포함하세요.
2. **작업 필요** (코드 수정, 파일 생성, 리팩토링, 배포 등 실제 변경이 필요한 요청)
`mode: "task"` — 구체적 태스크 목록을 생성하세요.
3. **불명확** (맥락 부족, 대상 불분명)
`mode: "clarify"` — 되물을 질문을 포함하세요.
## 출력 형식 (반드시 JSON)
### 즉답인 경우:
```json
{
"mode": "chat",
"response": "여기에 답변 내용"
}
```
### 작업인 경우:
```json
{
"mode": "task",
"summary": "작업 요약",
"tasks": [
{
"id": 1,
"title": "태스크 제목",
"files": ["path/to/file.py"],
"description": "구체적 변경 내용",
"type": "create|modify|delete"
}
],
"risk": "low|medium|high"
}
```
### 불명확한 경우:
```json
{
"mode": "clarify",
"question": "어떤 파일을 수정할까요?"
}
```
## 규칙
- 반드시 위 JSON 형식만 출력하세요. JSON 외의 텍스트를 포함하지 마세요.
- chat 모드의 response는 마크다운 사용 가능, 완성된 답변이어야 합니다.
- task 모드의 tasks는 1-5개, 각 태스크는 독립 실행 가능해야 합니다.
- 한국어로 응답하세요.
- 이전 대화 기록이 주어지면, 맥락을 고려하세요.