r"""AI Foreman v0.1 — 목표 분해 + 상담 모드. /goal 명령어로 목표 입력 → Gemini가 작업 트리 분해 → 스레드에서 상담 → 확정 시 Vikunja에 일괄 등록. """ import asyncio import json import logging import uuid from dataclasses import dataclass, field from datetime import datetime from typing import Optional from core.gemini_caller import GeminiCaller, GeminiCallError logger = logging.getLogger("variet.core.foreman") @dataclass class TaskNode: """작업 트리 노드.""" id: str title: str description: str = "" children: list["TaskNode"] = field(default_factory=list) done: bool = False priority: int = 0 # 0=보통, 1=높음, -1=낮음 estimated_hours: float = 0 def to_display(self, indent: int = 0) -> str: """트리 표시용 문자열.""" prefix = " " * indent check = "✅" if self.done else "⬜" est = f" (~{self.estimated_hours}h)" if self.estimated_hours else "" line = f"{prefix}{check} **{self.title}**{est}" if self.description: line += f"\n{prefix} _{self.description}_" for child in self.children: line += "\n" + child.to_display(indent + 1) return line def to_flat_list(self, parent_title: str = "") -> list[dict]: """Vikunja 등록용 플랫 리스트.""" full_title = f"{parent_title} > {self.title}" if parent_title else self.title items = [{ "title": self.title, "description": self.description, "full_path": full_title, "priority": self.priority, }] for child in self.children: items.extend(child.to_flat_list(full_title)) return items @dataclass class ForemanSession: """Foreman 상담 세션.""" id: str goal: str thread_id: int user_id: int tasks: list[TaskNode] = field(default_factory=list) created_at: str = "" history: list[dict] = field(default_factory=list) # 상담 대화 기록 status: str = "drafting" # drafting | confirmed | cancelled def __post_init__(self): if not self.created_at: self.created_at = datetime.now().isoformat() class Foreman: """AI Foreman — 목표 분해 + 상담.""" # 활성 세션 (thread_id → session) _sessions: dict[int, ForemanSession] = {} def __init__(self): self.gemini = GeminiCaller() # ────────────────────────────────────── # 세션 관리 # ────────────────────────────────────── def get_session(self, thread_id: int) -> Optional[ForemanSession]: return self._sessions.get(thread_id) def create_session(self, goal: str, thread_id: int, user_id: int) -> ForemanSession: session = ForemanSession( id=str(uuid.uuid4())[:8], goal=goal, thread_id=thread_id, user_id=user_id, ) self._sessions[thread_id] = session logger.info(f"Foreman 세션 생성: {session.id} — '{goal[:50]}'") return session def close_session(self, thread_id: int): if thread_id in self._sessions: del self._sessions[thread_id] # ────────────────────────────────────── # 목표 분해 (초기) # ────────────────────────────────────── async def decompose_goal(self, session: ForemanSession) -> list[TaskNode]: """Gemini로 목표를 작업 트리로 분해.""" context = ( f"## 목표\n{session.goal}\n\n" f"위 목표를 실행 가능한 작업 트리로 분해하세요.\n" f"JSON 형태로 응답하세요.\n\n" f"형식:\n" f"```json\n" f'{{"tasks": [\n' f' {{"title": "...", "description": "...", "estimated_hours": 2, "children": [\n' f' {{"title": "하위작업", "description": "...", "estimated_hours": 1, "children": []}}\n' f" ]}}\n" f"]}}\n" f"```\n\n" f"규칙:\n" f"- 최대 3단계 깊이\n" f"- 각 작업은 구체적이고 실행 가능해야 합니다\n" f"- estimated_hours는 예상 소요 시간\n" f"- 한국어로 작성\n" ) response = await self.gemini.call("foreman", context, timeout=120) # JSON 추출 tasks = self._parse_task_tree(response) session.tasks = tasks session.history.append({"role": "system", "content": f"초기 분해: {len(tasks)}개 최상위 작업"}) return tasks # ────────────────────────────────────── # 상담 명령 처리 # ────────────────────────────────────── async def handle_command(self, session: ForemanSession, command: str, args: str) -> str: """스레드 내 상담 명령 처리. Args: command: 확정|수정|추가|삭제|현황 args: 명령 인자 Returns: 응답 메시지 """ session.history.append({"role": "user", "content": f"!{command} {args}"}) if command == "확정": session.status = "confirmed" return await self._confirm(session) elif command == "현황": return self._show_status(session) elif command in ("수정", "추가", "삭제"): # Gemini에게 수정 요청 위임 return await self._modify_tree(session, command, args) else: return f"알 수 없는 명령: `!{command}`\n사용 가능: `!확정`, `!수정`, `!추가`, `!삭제`, `!현황`" async def handle_freeform(self, session: ForemanSession, text: str) -> str: """자유 형식 메시지 처리 (명령어 없는 대화).""" session.history.append({"role": "user", "content": text}) # 현재 트리 컨텍스트 + 사용자 메시지를 Gemini에게 전달 tree_str = "\n".join(t.to_display() for t in session.tasks) history_str = "\n".join( f"[{h['role']}] {h['content'][:200]}" for h in session.history[-6:] ) context = ( f"## 현재 작업 트리\n{tree_str}\n\n" f"## 대화 기록\n{history_str}\n\n" f"## 사용자 메시지\n{text}\n\n" f"사용자의 피드백에 따라 작업 트리를 수정하고, " f"수정된 전체 트리를 JSON으로 응답하세요.\n" f"변경 사항을 간단히 설명하고, 마지막에 JSON을 포함하세요.\n" f"JSON 형식은 초기 분해와 동일합니다.\n" ) response = await self.gemini.call("foreman", context, timeout=120) # JSON이 포함되어 있으면 트리 업데이트 updated_tasks = self._parse_task_tree(response) if updated_tasks: session.tasks = updated_tasks session.history.append({"role": "assistant", "content": response[:500]}) return response # ────────────────────────────────────── # 내부 함수 # ────────────────────────────────────── def _show_status(self, session: ForemanSession) -> str: """현재 작업 트리 표시.""" if not session.tasks: return "📋 아직 작업이 없습니다." lines = [f"🎯 **목표:** {session.goal}\n"] for task in session.tasks: lines.append(task.to_display()) total = sum(len(t.to_flat_list()) for t in session.tasks) lines.append(f"\n---\n📊 총 {total}개 작업 | 상태: `{session.status}`") return "\n".join(lines) async def _modify_tree(self, session: ForemanSession, command: str, args: str) -> str: """Gemini에게 트리 수정 위임.""" tree_str = "\n".join(t.to_display() for t in session.tasks) context = ( f"## 현재 작업 트리\n{tree_str}\n\n" f"## 수정 요청\n명령: !{command}\n내용: {args}\n\n" f"수정된 전체 트리를 JSON으로 응답하세요.\n" f"변경 사항을 간단히 설명하세요.\n" ) response = await self.gemini.call("foreman", context, timeout=120) updated_tasks = self._parse_task_tree(response) if updated_tasks: session.tasks = updated_tasks session.history.append({"role": "assistant", "content": response[:500]}) return response async def _confirm(self, session: ForemanSession) -> str: """작업 트리 확정 → Vikunja 등록.""" if not session.tasks: return "❌ 등록할 작업이 없습니다." # Vikunja 등록 try: from integrations.vikunja_client import VikunjaClient vik = VikunjaClient() flat_tasks = [] for task in session.tasks: flat_tasks.extend(task.to_flat_list()) created = 0 for t in flat_tasks: await vik.create_task(t["title"], t.get("description", "")) created += 1 logger.info(f"Foreman 확정: {created}개 작업 Vikunja 등록") return f"✅ **{created}개 작업**이 Vikunja에 등록되었습니다!\n\n" + self._show_status(session) except Exception as e: logger.error(f"Vikunja 등록 실패: {e}") return f"⚠️ Vikunja 등록 중 오류: {str(e)[:200]}\n\n작업 트리는 유지됩니다." @staticmethod def _parse_task_tree(response: str) -> list[TaskNode]: """Gemini 응답에서 JSON 작업 트리 파싱.""" # JSON 블록 추출 json_match = None # ```json ... ``` 블록 m = __import__("re").search(r"```json\s*\n(.+?)```", response, __import__("re").DOTALL) if m: json_match = m.group(1) else: # { ... } 직접 m = __import__("re").search(r"\{[\s\S]*\"tasks\"[\s\S]*\}", response) if m: json_match = m.group(0) if not json_match: return [] try: data = json.loads(json_match) tasks_data = data.get("tasks", []) return [Foreman._dict_to_task(t) for t in tasks_data] except (json.JSONDecodeError, KeyError) as e: logger.warning(f"작업 트리 파싱 실패: {e}") return [] @staticmethod def _dict_to_task(d: dict) -> TaskNode: """dict → TaskNode 재귀 변환.""" children = [Foreman._dict_to_task(c) for c in d.get("children", [])] return TaskNode( id=str(uuid.uuid4())[:8], title=d.get("title", ""), description=d.get("description", ""), estimated_hours=d.get("estimated_hours", 0), children=children, )