feat(api): API Server + Discord Bot 구현 #task-191

This commit is contained in:
quantlab
2026-03-06 18:52:11 +09:00
parent 5a931a5480
commit 427763c493
7 changed files with 526 additions and 0 deletions

1
api/__init__.py Normal file
View File

@@ -0,0 +1 @@
"""API layer for Variet Agent."""

181
api/discord_bot.py Normal file
View File

@@ -0,0 +1,181 @@
"""Discord Bot 어댑터.
사용자 명령을 받아 FastAPI API를 호출하고 결과를 디스코드로 보고합니다.
"""
import asyncio
import logging
import discord
from discord.ext import commands
import config
logger = logging.getLogger("variet.discord")
# Bot 설정
intents = discord.Intents.default()
intents.message_content = True # MESSAGE CONTENT INTENT 필요
bot = commands.Bot(
command_prefix=config.DISCORD_COMMAND_PREFIX,
intents=intents,
help_command=commands.DefaultHelpCommand(no_category="명령어"),
)
# In-memory: Discord 채널 ↔ Task 매핑
_channel_tasks: dict[int, list[str]] = {}
@bot.event
async def on_ready():
"""봇 접속 완료."""
logger.info(f"Discord Bot 접속 완료: {bot.user} (ID: {bot.user.id})")
logger.info(f"서버 {len(bot.guilds)}개 연결됨")
await bot.change_presence(
activity=discord.Activity(
type=discord.ActivityType.listening,
name=f"{config.DISCORD_COMMAND_PREFIX}agent",
)
)
@bot.command(name="agent", help="AI Agent에게 작업 요청\n예: !agent README에 설치 방법 추가해줘")
async def agent_command(ctx: commands.Context, *, request: str):
"""작업 요청 → Pipeline 실행 → 결과 보고."""
# 1. 접수 메시지
embed = discord.Embed(
title="📋 작업 접수",
description=f"```{request[:200]}```",
color=0x3498DB,
)
embed.set_footer(text="분석 중...")
status_msg = await ctx.send(embed=embed)
try:
# 2. Pipeline 직접 실행 (같은 프로세스)
from core.task_pipeline import TaskPipeline
import uuid
task_id = uuid.uuid4().hex[:8]
# Planning
embed.color = 0xF39C12
embed.set_footer(text=f"🔍 작업 분해 중... (ID: {task_id})")
await status_msg.edit(embed=embed)
pipeline = TaskPipeline(
project_path=str(config.PROJECT_ROOT),
)
pipeline.setup()
# Plan 단계
plan = await pipeline.plan(request)
tasks = plan.get("tasks", [])
plan_text = plan.get("summary", str(plan))[:500]
plan_embed = discord.Embed(
title="📝 작업 계획",
description=f"```{plan_text}```",
color=0x2ECC71,
)
if tasks:
task_list = "\n".join(
f"{t.get('title', t.get('description', '?'))}"
for t in tasks[:10]
)
plan_embed.add_field(
name=f"태스크 ({len(tasks)}개)",
value=task_list[:1000],
inline=False,
)
plan_embed.set_footer(text=f"ID: {task_id}")
await ctx.send(embed=plan_embed)
# Code + Review 단계
if tasks:
for i, task in enumerate(tasks, 1):
progress_embed = discord.Embed(
title=f"⚙️ 실행 중 ({i}/{len(tasks)})",
description=task.get("title", task.get("description", ""))[:200],
color=0xE67E22,
)
await ctx.send(embed=progress_embed)
code_output = await pipeline.code(task)
review = await pipeline.review(task, code_output)
passed = review.get("passed", True)
review_emoji = "" if passed else "⚠️"
review_embed = discord.Embed(
title=f"{review_emoji} 리뷰 결과 ({i}/{len(tasks)})",
description=review.get("summary", str(review))[:500],
color=0x2ECC71 if passed else 0xE74C3C,
)
await ctx.send(embed=review_embed)
# 완료
done_embed = discord.Embed(
title="✅ 작업 완료",
description=f"{len(tasks)}개 태스크 처리 완료",
color=0x2ECC71,
)
done_embed.set_footer(text=f"ID: {task_id}")
await ctx.send(embed=done_embed)
# 채널 태스크 기록
_channel_tasks.setdefault(ctx.channel.id, []).append(task_id)
except Exception as e:
logger.error(f"작업 실행 오류: {e}", exc_info=True)
error_embed = discord.Embed(
title="❌ 오류 발생",
description=f"```{str(e)[:500]}```",
color=0xE74C3C,
)
await ctx.send(embed=error_embed)
@bot.command(name="ping", help="봇 응답 테스트")
async def ping_command(ctx: commands.Context):
"""연결 상태 확인."""
latency = round(bot.latency * 1000)
await ctx.send(f"🏓 Pong! ({latency}ms)")
@bot.command(name="info", help="시스템 정보")
async def info_command(ctx: commands.Context):
"""시스템 정보 표시."""
embed = discord.Embed(
title="🤖 Variet Agent",
description="AI Agent Team — Gemini CLI 기반 자동화 개발 에이전트",
color=0x9B59B6,
)
embed.add_field(name="프로젝트", value=str(config.PROJECT_ROOT), inline=False)
embed.add_field(name="명령어 접두사", value=config.DISCORD_COMMAND_PREFIX, inline=True)
embed.add_field(name="서버 수", value=str(len(bot.guilds)), inline=True)
await ctx.send(embed=embed)
async def start_bot():
"""Discord Bot 시작 (async)."""
token = config.DISCORD_BOT_TOKEN
if not token:
logger.error("DISCORD_BOT_TOKEN이 설정되지 않았습니다. .env 파일을 확인하세요.")
return
logger.info("Discord Bot 시작 중...")
try:
await bot.start(token)
except discord.LoginFailure:
logger.error("Discord 로그인 실패 — 토큰을 확인하세요.")
except Exception as e:
logger.error(f"Discord Bot 오류: {e}")
async def stop_bot():
"""Discord Bot 정지."""
if not bot.is_closed():
await bot.close()

61
api/models.py Normal file
View File

@@ -0,0 +1,61 @@
"""API 요청/응답 모델.
FastAPI + Discord Bot 공용 데이터 모델입니다.
"""
from enum import Enum
from pydantic import BaseModel, Field
from datetime import datetime
class TaskStatus(str, Enum):
"""작업 상태."""
PENDING = "pending"
PLANNING = "planning"
EXECUTING = "executing"
REVIEWING = "reviewing"
DONE = "done"
ERROR = "error"
class TaskRequest(BaseModel):
"""작업 요청."""
project: str = Field(
default=".",
description="프로젝트 경로 (기본: 현재 디렉토리)",
)
request: str = Field(
...,
description="사용자 작업 요청 (자연어)",
)
mode: str = Field(
default="general",
description="실행 모드: general | secure",
)
class TaskProgress(BaseModel):
"""작업 진행 단계."""
phase: str
message: str
timestamp: datetime = Field(default_factory=datetime.now)
class TaskResponse(BaseModel):
"""작업 결과."""
task_id: str
status: TaskStatus = TaskStatus.PENDING
request: str = ""
plan: dict | None = None
progress: list[TaskProgress] = Field(default_factory=list)
result: dict | None = None
error: str | None = None
created_at: datetime = Field(default_factory=datetime.now)
updated_at: datetime = Field(default_factory=datetime.now)
class HealthResponse(BaseModel):
"""헬스체크 응답."""
status: str = "ok"
version: str = "0.1.0"
uptime_seconds: float = 0.0

161
api/server.py Normal file
View File

@@ -0,0 +1,161 @@
"""FastAPI REST + SSE 서버.
Discord Bot과 Web UI 모두 이 API를 호출합니다.
Orchestrator는 인터페이스를 모릅니다.
"""
import asyncio
import time
import uuid
from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse
from api.models import (
TaskRequest,
TaskResponse,
TaskProgress,
TaskStatus,
HealthResponse,
)
app = FastAPI(
title="Variet Agent API",
description="AI Agent Team — 작업 요청/관리 API",
version="0.1.0",
)
# In-memory task store
_tasks: dict[str, TaskResponse] = {}
_start_time = time.time()
@app.get("/health", response_model=HealthResponse)
async def health():
"""헬스체크."""
return HealthResponse(
uptime_seconds=round(time.time() - _start_time, 1),
)
@app.post("/tasks", response_model=TaskResponse)
async def create_task(req: TaskRequest):
"""작업 요청 → 백그라운드에서 TaskPipeline 실행."""
task_id = uuid.uuid4().hex[:8]
task_resp = TaskResponse(
task_id=task_id,
status=TaskStatus.PENDING,
request=req.request,
)
_tasks[task_id] = task_resp
# 백그라운드에서 파이프라인 실행
asyncio.create_task(_run_pipeline(task_id, req))
return task_resp
@app.get("/tasks/{task_id}", response_model=TaskResponse)
async def get_task(task_id: str):
"""작업 상태 조회."""
if task_id not in _tasks:
raise HTTPException(status_code=404, detail="Task not found")
return _tasks[task_id]
@app.get("/tasks/{task_id}/stream")
async def stream_task(task_id: str):
"""SSE 스트림으로 작업 진행 상황 전달."""
if task_id not in _tasks:
raise HTTPException(status_code=404, detail="Task not found")
async def event_generator():
last_progress_count = 0
while True:
task = _tasks[task_id]
# 새로운 진행 단계가 있으면 전송
if len(task.progress) > last_progress_count:
for p in task.progress[last_progress_count:]:
yield f"data: [{p.phase}] {p.message}\n\n"
last_progress_count = len(task.progress)
# 완료 또는 에러 시 종료
if task.status in (TaskStatus.DONE, TaskStatus.ERROR):
yield f"data: [완료] status={task.status.value}\n\n"
break
await asyncio.sleep(1)
return StreamingResponse(
event_generator(),
media_type="text/event-stream",
headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
)
@app.get("/tasks")
async def list_tasks(limit: int = 20):
"""최근 작업 목록."""
tasks = sorted(
_tasks.values(),
key=lambda t: t.created_at,
reverse=True,
)
return tasks[:limit]
# === Internal ===
async def _run_pipeline(task_id: str, req: TaskRequest):
"""백그라운드에서 TaskPipeline을 실행합니다."""
import sys
import os
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
task = _tasks[task_id]
try:
from core.task_pipeline import TaskPipeline
from datetime import datetime
# Planning
task.status = TaskStatus.PLANNING
task.progress.append(TaskProgress(
phase="plan", message=f"작업 분석 중: {req.request[:100]}"
))
task.updated_at = datetime.now()
pipeline = TaskPipeline(
project_path=req.project,
)
pipeline.setup()
# Execute
task.status = TaskStatus.EXECUTING
task.progress.append(TaskProgress(
phase="execute", message="파이프라인 실행 중..."
))
task.updated_at = datetime.now()
result = await pipeline.execute(req.request)
# Done
task.status = TaskStatus.DONE
task.result = result
task.plan = result.get("plan")
task.progress.append(TaskProgress(
phase="done",
message=f"완료 — {len(result.get('tasks_completed', []))}개 태스크 처리",
))
task.updated_at = datetime.now()
except Exception as e:
task.status = TaskStatus.ERROR
task.error = str(e)
task.progress.append(TaskProgress(
phase="error", message=f"오류: {e}"
))
from datetime import datetime
task.updated_at = datetime.now()

36
config.py Normal file
View File

@@ -0,0 +1,36 @@
"""Variet Agent 설정 관리.
.env 파일에서 환경변수를 로드합니다.
"""
import os
from pathlib import Path
# .env 파일 수동 파싱 (python-dotenv 없이도 동작)
_env_path = Path(__file__).parent / ".env"
if _env_path.exists():
for line in _env_path.read_text(encoding="utf-8").splitlines():
line = line.strip()
if not line or line.startswith("#"):
continue
if "=" in line:
key, _, value = line.partition("=")
os.environ.setdefault(key.strip(), value.strip())
# === Discord ===
DISCORD_BOT_TOKEN: str = os.getenv("DISCORD_BOT_TOKEN", "")
DISCORD_COMMAND_PREFIX: str = os.getenv("DISCORD_COMMAND_PREFIX", "!")
# === API Server ===
API_HOST: str = os.getenv("API_HOST", "0.0.0.0")
API_PORT: int = int(os.getenv("API_PORT", "8000"))
# === Paths ===
PROJECT_ROOT: Path = Path(__file__).parent
SESSIONS_DIR: Path = PROJECT_ROOT / "sessions"
SESSIONS_DIR.mkdir(exist_ok=True)
# === Gemini ===
GEMINI_TOKEN_BUDGET: int = int(os.getenv("GEMINI_TOKEN_BUDGET", "50000"))
GEMINI_TIMEOUT: int = int(os.getenv("GEMINI_TIMEOUT", "180"))

80
main.py Normal file
View File

@@ -0,0 +1,80 @@
"""Variet Agent — 진입점.
FastAPI 서버 + Discord Bot을 동시 실행합니다.
"""
import asyncio
import logging
import sys
import signal
# config를 먼저 import → .env 로드
import config
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(name)s] %(levelname)s: %(message)s",
datefmt="%H:%M:%S",
handlers=[logging.StreamHandler(sys.stdout)],
)
logger = logging.getLogger("variet")
async def run_api_server():
"""FastAPI 서버를 uvicorn으로 실행."""
import uvicorn
uvi_config = uvicorn.Config(
"api.server:app",
host=config.API_HOST,
port=config.API_PORT,
log_level="info",
reload=False,
)
server = uvicorn.Server(uvi_config)
await server.serve()
async def run_discord_bot():
"""Discord Bot 실행."""
from api.discord_bot import start_bot
await start_bot()
async def main():
"""API 서버 + Discord Bot 동시 실행."""
logger.info("=" * 50)
logger.info("Variet Agent 시작")
logger.info(f" API: http://{config.API_HOST}:{config.API_PORT}")
logger.info(f" Discord Bot: {'토큰 설정됨' if config.DISCORD_BOT_TOKEN else '⚠ 토큰 없음'}")
logger.info("=" * 50)
tasks = []
# API 서버
tasks.append(asyncio.create_task(run_api_server()))
# Discord Bot (토큰이 있을 때만)
if config.DISCORD_BOT_TOKEN:
tasks.append(asyncio.create_task(run_discord_bot()))
else:
logger.warning("DISCORD_BOT_TOKEN이 없습니다. Bot 없이 API만 실행합니다.")
try:
await asyncio.gather(*tasks)
except KeyboardInterrupt:
logger.info("종료 요청...")
except Exception as e:
logger.error(f"실행 오류: {e}")
finally:
# 정리
from api.discord_bot import stop_bot
await stop_bot()
logger.info("Variet Agent 종료")
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
pass

6
requirements.txt Normal file
View File

@@ -0,0 +1,6 @@
# Variet Agent Dependencies
fastapi>=0.115.0
uvicorn>=0.30.0
discord.py>=2.4.0
pydantic>=2.0.0
httpx>=0.27.0