"""FastAPI REST + SSE 서버. Discord Bot과 Web UI 모두 이 API를 호출합니다. Orchestrator는 인터페이스를 모릅니다. """ import asyncio import time import uuid from fastapi import FastAPI, HTTPException from fastapi.responses import StreamingResponse from api.models import ( TaskRequest, TaskResponse, TaskProgress, TaskStatus, HealthResponse, ) app = FastAPI( title="Variet Agent API", description="AI Agent Team — 작업 요청/관리 API", version="0.1.0", ) # In-memory task store _tasks: dict[str, TaskResponse] = {} _start_time = time.time() @app.get("/health", response_model=HealthResponse) async def health(): """헬스체크.""" return HealthResponse( uptime_seconds=round(time.time() - _start_time, 1), ) @app.post("/tasks", response_model=TaskResponse) async def create_task(req: TaskRequest): """작업 요청 → 백그라운드에서 TaskPipeline 실행.""" task_id = uuid.uuid4().hex[:8] task_resp = TaskResponse( task_id=task_id, status=TaskStatus.PENDING, request=req.request, ) _tasks[task_id] = task_resp # 백그라운드에서 파이프라인 실행 asyncio.create_task(_run_pipeline(task_id, req)) return task_resp @app.get("/tasks/{task_id}", response_model=TaskResponse) async def get_task(task_id: str): """작업 상태 조회.""" if task_id not in _tasks: raise HTTPException(status_code=404, detail="Task not found") return _tasks[task_id] @app.get("/tasks/{task_id}/stream") async def stream_task(task_id: str): """SSE 스트림으로 작업 진행 상황 전달.""" if task_id not in _tasks: raise HTTPException(status_code=404, detail="Task not found") async def event_generator(): last_progress_count = 0 while True: task = _tasks[task_id] # 새로운 진행 단계가 있으면 전송 if len(task.progress) > last_progress_count: for p in task.progress[last_progress_count:]: yield f"data: [{p.phase}] {p.message}\n\n" last_progress_count = len(task.progress) # 완료 또는 에러 시 종료 if task.status in (TaskStatus.DONE, TaskStatus.ERROR): yield f"data: [완료] status={task.status.value}\n\n" break await asyncio.sleep(1) return StreamingResponse( event_generator(), media_type="text/event-stream", headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"}, ) @app.get("/tasks") async def list_tasks(limit: int = 20): """최근 작업 목록.""" tasks = sorted( _tasks.values(), key=lambda t: t.created_at, reverse=True, ) return tasks[:limit] # === Internal === async def _run_pipeline(task_id: str, req: TaskRequest): """백그라운드에서 TaskPipeline을 실행합니다.""" import sys import os sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) task = _tasks[task_id] try: from core.task_pipeline import TaskPipeline from datetime import datetime # Planning task.status = TaskStatus.PLANNING task.progress.append(TaskProgress( phase="plan", message=f"작업 분석 중: {req.request[:100]}" )) task.updated_at = datetime.now() pipeline = TaskPipeline( project_path=req.project, ) pipeline.setup() # Execute task.status = TaskStatus.EXECUTING task.progress.append(TaskProgress( phase="execute", message="파이프라인 실행 중..." )) task.updated_at = datetime.now() result = await pipeline.execute(req.request) # Done task.status = TaskStatus.DONE task.result = result task.plan = result.get("plan") task.progress.append(TaskProgress( phase="done", message=f"완료 — {len(result.get('tasks_completed', []))}개 태스크 처리", )) task.updated_at = datetime.now() except Exception as e: task.status = TaskStatus.ERROR task.error = str(e) task.progress.append(TaskProgress( phase="error", message=f"오류: {e}" )) from datetime import datetime task.updated_at = datetime.now()