diff --git a/.agent/references/known-issues.md b/.agent/references/known-issues.md
index f879ec6..151e2d9 100644
--- a/.agent/references/known-issues.md
+++ b/.agent/references/known-issues.md
@@ -93,3 +93,16 @@
- **원인**: `workspaces.json`의 path가 다른 PC의 사용자 경로(`c:\Users\Certes\...`)로 하드코딩
- **해결**: 현재 머신의 사용자 경로(`c:\Users\Variet-Worker\...`)로 수정
- **주의**: 멀티 환경 배포 시 workspaces.json의 절대 경로가 환경별로 다를 수 있음. 상대 경로 또는 환경변수 사용 고려
+
+### [2026-03-18] config.py — .env 값의 따옴표가 값에 포함됨
+- **증상**: `.env`에 `KEY="val,ue"` 형식으로 쓰면 비밀번호에 `"` 따옴표가 포함되어 인증 실패
+- **원인**: `config.py`의 수동 `.env` 파서가 `value.strip()`만 하고 따옴표를 제거하지 않음
+- **해결**: 양쪽 따옴표(`"..."` 또는 `'...'`) 감지 후 제거하는 로직 추가
+- **주의**: `.env`에 특수문자(`,`, `&`, `#` 등) 포함 비밀번호는 반드시 따옴표로 감싸야 함
+
+### [2026-03-18] Mailcow IMAP — 앱 비밀번호 LOGIN 커맨드 거부
+- **증상**: `imaplib.login()` 호출 시 `AUTHENTICATIONFAILED` — 비밀번호 맞는데도 실패
+- **원인**: Mailcow 앱 비밀번호는 IMAP `LOGIN` 커맨드를 거부하고 `PLAIN` 인증만 지원
+- **해결**: `conn.authenticate("PLAIN", lambda x: ("\0" + user + "\0" + pw).encode())` 사용
+- **주의**: 일반 계정 비밀번호는 `LOGIN`도 가능하지만, 앱 비밀번호는 반드시 `PLAIN` auth 필요
+
diff --git a/api/discord_bot.py b/api/discord_bot.py
index 79f6ff5..05822c9 100644
--- a/api/discord_bot.py
+++ b/api/discord_bot.py
@@ -19,9 +19,17 @@ from discord.ext import commands
import config
from core.workspace import WorkspaceManager
from core.gemini_caller import GeminiCaller, GeminiCallError
+from core.foreman import Foreman
+from handlers.nc_handler import NCHandler
logger = logging.getLogger("variet.discord")
+# Nextcloud 도구 핸들러
+_nc_handler = NCHandler()
+
+# AI Foreman (목표 분해)
+_foreman = Foreman()
+
EMBED_DESC_LIMIT = 4096
EMBED_FIELD_LIMIT = 1024
@@ -263,7 +271,48 @@ async def on_message(message: discord.Message):
if not user_text:
return
- # 취소 명령어 확인
+ # ──────────────────────────────────────
+ # Foreman 세션 스레드 확인
+ # ──────────────────────────────────────
+ foreman_session = _foreman.get_session(message.channel.id)
+ if foreman_session:
+ async def _foreman_reply():
+ try:
+ async with message.channel.typing():
+ # ! 명령어 처리
+ if user_text.startswith("!"):
+ parts = user_text[1:].split(maxsplit=1)
+ command = parts[0] if parts else ""
+ args = parts[1] if len(parts) > 1 else ""
+ response = await _foreman.handle_command(
+ foreman_session, command, args,
+ )
+ else:
+ # 자유 형식 대화
+ response = await _foreman.handle_freeform(
+ foreman_session, user_text,
+ )
+
+ if response:
+ if len(response) <= 2000:
+ await message.reply(response)
+ else:
+ for i in range(0, len(response), 4000):
+ embed = discord.Embed(
+ description=response[i:i + 4000],
+ color=0x9B59B6,
+ )
+ await message.channel.send(embed=embed)
+ except Exception as e:
+ logger.error(f"Foreman 오류: {e}", exc_info=True)
+ await message.reply(f"⚠️ 오류: {str(e)[:200]}")
+
+ asyncio.create_task(_foreman_reply())
+ return
+
+ # ──────────────────────────────────────
+ # 취소 명령어 확인
+ # ──────────────────────────────────────
cancel_keywords = {"취소", "stop", "cancel", "중지", "멈춰"}
if user_text.lower() in cancel_keywords:
channel_id = message.channel.id
@@ -975,6 +1024,63 @@ async def anime_status(interaction: discord.Interaction):
bot.tree.add_command(anime_group)
+# ──────────────────────────────────────────────
+# /goal 커맨드 — AI Foreman 목표 분해
+# ──────────────────────────────────────────────
+
+@bot.tree.command(name="goal", description="목표를 입력하면 AI가 작업 트리로 분해합니다")
+@app_commands.describe(goal="달성할 목표 (자연어)")
+async def goal_command(interaction: discord.Interaction, goal: str):
+ """Foreman 상담 모드 시작."""
+ await interaction.response.defer()
+
+ # 스레드 생성
+ thread_name = f"🎯 {goal[:80]}"
+ thread = await interaction.channel.create_thread(
+ name=thread_name,
+ type=discord.ChannelType.public_thread,
+ auto_archive_duration=1440,
+ )
+
+ # 세션 생성
+ session = _foreman.create_session(goal, thread.id, interaction.user.id)
+
+ # 시작 메시지
+ start_embed = discord.Embed(
+ title="🎯 AI Foreman — 목표 분해",
+ description=(
+ f"**목표:** {goal}\n\n"
+ f"작업 트리를 생성 중... ⏳"
+ ),
+ color=0x9B59B6,
+ )
+ await thread.send(embed=start_embed)
+ await interaction.followup.send(f"✅ 상담 스레드가 생성되었습니다: <#{thread.id}>", ephemeral=True)
+
+ # Gemini로 목표 분해
+ try:
+ tasks = await _foreman.decompose_goal(session)
+ if tasks:
+ tree_display = "\n".join(t.to_display() for t in tasks)
+ total = sum(len(t.to_flat_list()) for t in tasks)
+ result_embed = discord.Embed(
+ title="📋 작업 트리 (초안)",
+ description=tree_display[:4000],
+ color=0x2ECC71,
+ )
+ result_embed.set_footer(
+ text=f"총 {total}개 작업 | !확정 !수정 !추가 !삭제 !현황"
+ )
+ await thread.send(embed=result_embed)
+ else:
+ await thread.send("⚠️ 작업 분해에 실패했습니다. 목표를 더 구체적으로 입력해주세요.")
+ except GeminiCallError as e:
+ await thread.send(f"⚠️ AI 호출 오류: {str(e)[:300]}")
+ except Exception as e:
+ logger.error(f"Foreman 분해 오류: {e}", exc_info=True)
+ await thread.send(f"❌ 오류: {str(e)[:200]}")
+
+
# ──────────────────────────────────────────────
# 기존 ! 명령어 (유지, 하위호환)
# ──────────────────────────────────────────────
diff --git a/config.py b/config.py
index 3b1bd38..b52f5ea 100644
--- a/config.py
+++ b/config.py
@@ -15,7 +15,11 @@ if _env_path.exists():
continue
if "=" in line:
key, _, value = line.partition("=")
- os.environ.setdefault(key.strip(), value.strip())
+ value = value.strip()
+ # 양쪽 따옴표 제거 (".." 또는 '..')
+ if len(value) >= 2 and value[0] == value[-1] and value[0] in ('"', "'"):
+ value = value[1:-1]
+ os.environ.setdefault(key.strip(), value)
# === Discord ===
@@ -64,3 +68,14 @@ NAS_ANIME_PATH: str = os.getenv(
WIKI_URL: str = os.getenv("WIKI_URL", "https://wiki.variet.net")
WIKI_API_KEY: str = os.getenv("WIKI_API_KEY", "")
+# === Nextcloud ===
+NEXTCLOUD_URL: str = os.getenv("NEXTCLOUD_URL", "https://cloud.variet.net")
+NEXTCLOUD_USER: str = os.getenv("NEXTCLOUD_USER", "")
+NEXTCLOUD_APP_PASSWORD: str = os.getenv("NEXTCLOUD_APP_PASSWORD", "")
+
+# === Mail (IMAP) ===
+MAIL_IMAP_HOST: str = os.getenv("MAIL_IMAP_HOST", "mail.variet.net")
+MAIL_IMAP_PORT: int = int(os.getenv("MAIL_IMAP_PORT", "993"))
+MAIL_USER: str = os.getenv("MAIL_USER", "")
+MAIL_PASSWORD: str = os.getenv("MAIL_PASSWORD", "")
+
diff --git a/core/foreman.py b/core/foreman.py
new file mode 100644
index 0000000..4b44990
--- /dev/null
+++ b/core/foreman.py
@@ -0,0 +1,294 @@
+r"""AI Foreman v0.1 — 목표 분해 + 상담 모드.
+
+/goal 명령어로 목표 입력 → Gemini가 작업 트리 분해 → 스레드에서 상담 →
+확정 시 Vikunja에 일괄 등록.
+"""
+
+import asyncio
+import json
+import logging
+import uuid
+from dataclasses import dataclass, field
+from datetime import datetime
+from typing import Optional
+
+from core.gemini_caller import GeminiCaller, GeminiCallError
+
+logger = logging.getLogger("variet.core.foreman")
+
+
+@dataclass
+class TaskNode:
+ """작업 트리 노드."""
+ id: str
+ title: str
+ description: str = ""
+ children: list["TaskNode"] = field(default_factory=list)
+ done: bool = False
+ priority: int = 0 # 0=보통, 1=높음, -1=낮음
+ estimated_hours: float = 0
+
+ def to_display(self, indent: int = 0) -> str:
+ """트리 표시용 문자열."""
+ prefix = " " * indent
+ check = "✅" if self.done else "⬜"
+ est = f" (~{self.estimated_hours}h)" if self.estimated_hours else ""
+ line = f"{prefix}{check} **{self.title}**{est}"
+ if self.description:
+ line += f"\n{prefix} _{self.description}_"
+ for child in self.children:
+ line += "\n" + child.to_display(indent + 1)
+ return line
+
+ def to_flat_list(self, parent_title: str = "") -> list[dict]:
+ """Vikunja 등록용 플랫 리스트."""
+ full_title = f"{parent_title} > {self.title}" if parent_title else self.title
+ items = [{
+ "title": self.title,
+ "description": self.description,
+ "full_path": full_title,
+ "priority": self.priority,
+ }]
+ for child in self.children:
+ items.extend(child.to_flat_list(full_title))
+ return items
+
+
+@dataclass
+class ForemanSession:
+ """Foreman 상담 세션."""
+ id: str
+ goal: str
+ thread_id: int
+ user_id: int
+ tasks: list[TaskNode] = field(default_factory=list)
+ created_at: str = ""
+ history: list[dict] = field(default_factory=list) # 상담 대화 기록
+ status: str = "drafting" # drafting | confirmed | cancelled
+
+ def __post_init__(self):
+ if not self.created_at:
+ self.created_at = datetime.now().isoformat()
+
+
+class Foreman:
+ """AI Foreman — 목표 분해 + 상담."""
+
+ # 활성 세션 (thread_id → session)
+ _sessions: dict[int, ForemanSession] = {}
+
+ def __init__(self):
+ self.gemini = GeminiCaller()
+
+ # ──────────────────────────────────────
+ # 세션 관리
+ # ──────────────────────────────────────
+
+ def get_session(self, thread_id: int) -> Optional[ForemanSession]:
+ return self._sessions.get(thread_id)
+
+ def create_session(self, goal: str, thread_id: int, user_id: int) -> ForemanSession:
+ session = ForemanSession(
+ id=str(uuid.uuid4())[:8],
+ goal=goal,
+ thread_id=thread_id,
+ user_id=user_id,
+ )
+ self._sessions[thread_id] = session
+ logger.info(f"Foreman 세션 생성: {session.id} — '{goal[:50]}'")
+ return session
+
+ def close_session(self, thread_id: int):
+ if thread_id in self._sessions:
+ del self._sessions[thread_id]
+
+ # ──────────────────────────────────────
+ # 목표 분해 (초기)
+ # ──────────────────────────────────────
+
+ async def decompose_goal(self, session: ForemanSession) -> list[TaskNode]:
+ """Gemini로 목표를 작업 트리로 분해."""
+ context = (
+ f"## 목표\n{session.goal}\n\n"
+ f"위 목표를 실행 가능한 작업 트리로 분해하세요.\n"
+ f"JSON 형태로 응답하세요.\n\n"
+ f"형식:\n"
+ f"```json\n"
+ f'{{"tasks": [\n'
+ f' {{"title": "...", "description": "...", "estimated_hours": 2, "children": [\n'
+ f' {{"title": "하위작업", "description": "...", "estimated_hours": 1, "children": []}}\n'
+ f" ]}}\n"
+ f"]}}\n"
+ f"```\n\n"
+ f"규칙:\n"
+ f"- 최대 3단계 깊이\n"
+ f"- 각 작업은 구체적이고 실행 가능해야 합니다\n"
+ f"- estimated_hours는 예상 소요 시간\n"
+ f"- 한국어로 작성\n"
+ )
+
+ response = await self.gemini.call("foreman", context, timeout=120)
+
+ # JSON 추출
+ tasks = self._parse_task_tree(response)
+ session.tasks = tasks
+ session.history.append({"role": "system", "content": f"초기 분해: {len(tasks)}개 최상위 작업"})
+ return tasks
+
+ # ──────────────────────────────────────
+ # 상담 명령 처리
+ # ──────────────────────────────────────
+
+ async def handle_command(self, session: ForemanSession, command: str, args: str) -> str:
+ """스레드 내 상담 명령 처리.
+
+ Args:
+ command: 확정|수정|추가|삭제|현황
+ args: 명령 인자
+
+ Returns:
+ 응답 메시지
+ """
+ session.history.append({"role": "user", "content": f"!{command} {args}"})
+
+ if command == "확정":
+ session.status = "confirmed"
+ return await self._confirm(session)
+
+ elif command == "현황":
+ return self._show_status(session)
+
+ elif command in ("수정", "추가", "삭제"):
+ # Gemini에게 수정 요청 위임
+ return await self._modify_tree(session, command, args)
+
+ else:
+ return f"알 수 없는 명령: `!{command}`\n사용 가능: `!확정`, `!수정`, `!추가`, `!삭제`, `!현황`"
+
+ async def handle_freeform(self, session: ForemanSession, text: str) -> str:
+ """자유 형식 메시지 처리 (명령어 없는 대화)."""
+ session.history.append({"role": "user", "content": text})
+
+ # 현재 트리 컨텍스트 + 사용자 메시지를 Gemini에게 전달
+ tree_str = "\n".join(t.to_display() for t in session.tasks)
+ history_str = "\n".join(
+ f"[{h['role']}] {h['content'][:200]}" for h in session.history[-6:]
+ )
+
+ context = (
+ f"## 현재 작업 트리\n{tree_str}\n\n"
+ f"## 대화 기록\n{history_str}\n\n"
+ f"## 사용자 메시지\n{text}\n\n"
+ f"사용자의 피드백에 따라 작업 트리를 수정하고, "
+ f"수정된 전체 트리를 JSON으로 응답하세요.\n"
+ f"변경 사항을 간단히 설명하고, 마지막에 JSON을 포함하세요.\n"
+ f"JSON 형식은 초기 분해와 동일합니다.\n"
+ )
+
+ response = await self.gemini.call("foreman", context, timeout=120)
+
+ # JSON이 포함되어 있으면 트리 업데이트
+ updated_tasks = self._parse_task_tree(response)
+ if updated_tasks:
+ session.tasks = updated_tasks
+
+ session.history.append({"role": "assistant", "content": response[:500]})
+ return response
+
+ # ──────────────────────────────────────
+ # 내부 함수
+ # ──────────────────────────────────────
+
+ def _show_status(self, session: ForemanSession) -> str:
+ """현재 작업 트리 표시."""
+ if not session.tasks:
+ return "📋 아직 작업이 없습니다."
+
+ lines = [f"🎯 **목표:** {session.goal}\n"]
+ for task in session.tasks:
+ lines.append(task.to_display())
+ total = sum(len(t.to_flat_list()) for t in session.tasks)
+ lines.append(f"\n---\n📊 총 {total}개 작업 | 상태: `{session.status}`")
+ return "\n".join(lines)
+
+ async def _modify_tree(self, session: ForemanSession, command: str, args: str) -> str:
+ """Gemini에게 트리 수정 위임."""
+ tree_str = "\n".join(t.to_display() for t in session.tasks)
+
+ context = (
+ f"## 현재 작업 트리\n{tree_str}\n\n"
+ f"## 수정 요청\n명령: !{command}\n내용: {args}\n\n"
+ f"수정된 전체 트리를 JSON으로 응답하세요.\n"
+ f"변경 사항을 간단히 설명하세요.\n"
+ )
+
+ response = await self.gemini.call("foreman", context, timeout=120)
+ updated_tasks = self._parse_task_tree(response)
+ if updated_tasks:
+ session.tasks = updated_tasks
+
+ session.history.append({"role": "assistant", "content": response[:500]})
+ return response
+
+ async def _confirm(self, session: ForemanSession) -> str:
+ """작업 트리 확정 → Vikunja 등록."""
+ if not session.tasks:
+ return "❌ 등록할 작업이 없습니다."
+
+ # Vikunja 등록
+ try:
+ from integrations.vikunja_client import VikunjaClient
+ vik = VikunjaClient()
+ flat_tasks = []
+ for task in session.tasks:
+ flat_tasks.extend(task.to_flat_list())
+
+ created = 0
+ for t in flat_tasks:
+ await vik.create_task(t["title"], t.get("description", ""))
+ created += 1
+
+ logger.info(f"Foreman 확정: {created}개 작업 Vikunja 등록")
+ return f"✅ **{created}개 작업**이 Vikunja에 등록되었습니다!\n\n" + self._show_status(session)
+
+ except Exception as e:
+ logger.error(f"Vikunja 등록 실패: {e}")
+ return f"⚠️ Vikunja 등록 중 오류: {str(e)[:200]}\n\n작업 트리는 유지됩니다."
+
+ @staticmethod
+ def _parse_task_tree(response: str) -> list[TaskNode]:
+ """Gemini 응답에서 JSON 작업 트리 파싱."""
+ # JSON 블록 추출
+ json_match = None
+ # ```json ... ``` 블록
+ m = __import__("re").search(r"```json\s*\n(.+?)```", response, __import__("re").DOTALL)
+ if m:
+ json_match = m.group(1)
+ else:
+ # { ... } 직접
+ m = __import__("re").search(r"\{[\s\S]*\"tasks\"[\s\S]*\}", response)
+ if m:
+ json_match = m.group(0)
+
+ if not json_match:
+ return []
+
+ try:
+ data = json.loads(json_match)
+ tasks_data = data.get("tasks", [])
+ return [Foreman._dict_to_task(t) for t in tasks_data]
+ except (json.JSONDecodeError, KeyError) as e:
+ logger.warning(f"작업 트리 파싱 실패: {e}")
+ return []
+
+ @staticmethod
+ def _dict_to_task(d: dict) -> TaskNode:
+ """dict → TaskNode 재귀 변환."""
+ children = [Foreman._dict_to_task(c) for c in d.get("children", [])]
+ return TaskNode(
+ id=str(uuid.uuid4())[:8],
+ title=d.get("title", ""),
+ description=d.get("description", ""),
+ estimated_hours=d.get("estimated_hours", 0),
+ children=children,
+ )
diff --git a/core/gemini_caller.py b/core/gemini_caller.py
index bbbc1c7..2de3edc 100644
--- a/core/gemini_caller.py
+++ b/core/gemini_caller.py
@@ -25,6 +25,7 @@ ROLE_THINKING: dict[str, int] = {
"agent": 4096,
"summarizer": 512,
"planner": 4096,
+ "foreman": 4096,
"coder": 8192,
"reviewer": 8192,
}
diff --git a/docs/devlog/2026-03-18.md b/docs/devlog/2026-03-18.md
new file mode 100644
index 0000000..dd4bb32
--- /dev/null
+++ b/docs/devlog/2026-03-18.md
@@ -0,0 +1,8 @@
+| # | 시간 | 작업 | 커밋 | 상태 |
+|----|------|------|------|------|
+| 001 | 16:10 | Nextcloud 모듈 연결 테스트 (Files ✅ Calendar ✅ Mail ✅) | - | ✅ |
+| 002 | 16:50 | config.py .env 따옴표 파싱 버그 수정 | - | ✅ |
+| 003 | 16:55 | nc_mail.py PLAIN 인증으로 변경 (Mailcow 앱 비밀번호 호환) | - | ✅ |
+| 004 | 17:10 | NC 도구 핸들러 (`handlers/nc_handler.py`) 구현 | - | ✅ |
+| 005 | 17:13 | AI Foreman v0.1 (`core/foreman.py` + `/goal` 커맨드) | - | ✅ |
+| 006 | 17:15 | unified.md에 nextcloud 모드 추가 + discord_bot.py 통합 | - | ✅ |
diff --git a/handlers/nc_handler.py b/handlers/nc_handler.py
new file mode 100644
index 0000000..7a6526e
--- /dev/null
+++ b/handlers/nc_handler.py
@@ -0,0 +1,301 @@
+r"""Nextcloud 도구 핸들러 — 자연어 → NC 도구 자동 라우팅.
+
+Gemini unified prompt의 'nextcloud' 모드 분류 결과를 받아
+적절한 NC 모듈 함수를 호출하고 Discord embed로 응답.
+"""
+
+import logging
+import re
+from datetime import datetime, timedelta
+from typing import Optional
+
+import discord
+
+from tools.nc_files import NCFilesClient
+from tools.nc_calendar import NCCalendarClient
+from tools.nc_mail import NCMailClient
+
+logger = logging.getLogger("variet.handlers.nc")
+
+
+class NCHandler:
+ """Nextcloud 도구 라우터."""
+
+ def __init__(self):
+ self.files = NCFilesClient()
+ self.calendar = NCCalendarClient()
+ self.mail = NCMailClient()
+ # Contacts는 EasyOCR 무거움 → 필요 시 지연 로드
+ self._contacts = None
+
+ @property
+ def contacts(self):
+ if self._contacts is None:
+ from tools.nc_contacts import NCContactsClient
+ self._contacts = NCContactsClient()
+ return self._contacts
+
+ # ──────────────────────────────────────
+ # 메인 라우터
+ # ──────────────────────────────────────
+
+ async def handle(self, action: dict, channel) -> None:
+ """unified prompt 결과를 받아 NC 도구 실행 + 응답.
+
+ Args:
+ action: {"tool": "files|calendar|mail|contacts", "op": ..., "params": {...}}
+ channel: Discord 채널/스레드
+ """
+ tool = action.get("tool", "")
+ op = action.get("op", "")
+ params = action.get("params", {})
+
+ try:
+ if tool == "files":
+ await self._handle_files(op, params, channel)
+ elif tool == "calendar":
+ await self._handle_calendar(op, params, channel)
+ elif tool == "mail":
+ await self._handle_mail(op, params, channel)
+ elif tool == "contacts":
+ await self._handle_contacts(op, params, channel)
+ else:
+ await channel.send(
+ embed=discord.Embed(
+ title="❌ 알 수 없는 도구",
+ description=f"tool: `{tool}`은 지원하지 않습니다.",
+ color=0xE74C3C,
+ )
+ )
+ except Exception as e:
+ logger.error(f"NC 핸들러 오류: {tool}.{op} — {e}", exc_info=True)
+ await channel.send(
+ embed=discord.Embed(
+ title="❌ 오류 발생",
+ description=f"`{tool}.{op}` 실행 중 오류:\n```{str(e)[:300]}```",
+ color=0xE74C3C,
+ )
+ )
+
+ # ──────────────────────────────────────
+ # Files
+ # ──────────────────────────────────────
+
+ async def _handle_files(self, op: str, params: dict, channel):
+ if op == "search":
+ query = params.get("query", "")
+ files = await self.files.search(query)
+ if not files:
+ await channel.send(f"🔍 `{query}` 검색 결과가 없습니다.")
+ return
+ lines = [f"{i}. {f.icon} **{f.name}** ({f.size_human})\n 📂 `{f.path}`"
+ for i, f in enumerate(files[:15], 1)]
+ embed = discord.Embed(
+ title=f"🔍 파일 검색: {query}",
+ description="\n".join(lines),
+ color=0x3498DB,
+ )
+ embed.set_footer(text=f"{len(files)}건 중 최대 15건 표시")
+ await channel.send(embed=embed)
+
+ elif op == "list":
+ path = params.get("path", "")
+ files = await self.files.list_dir(path)
+ lines = [f"{f.icon} **{f.name}**" + (f" ({f.size_human})" if not f.is_dir else "")
+ for f in files[:20]]
+ embed = discord.Embed(
+ title=f"📂 /{path or '(루트)'}",
+ description="\n".join(lines) or "빈 디렉토리",
+ color=0x3498DB,
+ )
+ await channel.send(embed=embed)
+
+ elif op == "link":
+ path = params.get("path", "")
+ url = await self.files.create_link(path)
+ if url:
+ await channel.send(f"🔗 공유 링크: {url}")
+ else:
+ await channel.send("❌ 공유 링크 생성 실패")
+
+ elif op == "recent":
+ limit = params.get("limit", 10)
+ files = await self.files.list_recent(limit)
+ lines = [f"{i}. {f.icon} **{f.name}** ({f.size_human})\n 📅 {f.lastmod}"
+ for i, f in enumerate(files, 1)]
+ embed = discord.Embed(
+ title="🕐 최근 파일",
+ description="\n".join(lines) or "파일 없음",
+ color=0x3498DB,
+ )
+ await channel.send(embed=embed)
+
+ # ──────────────────────────────────────
+ # Calendar
+ # ──────────────────────────────────────
+
+ async def _handle_calendar(self, op: str, params: dict, channel):
+ if op == "today":
+ events = await self.calendar.get_today()
+ embed = discord.Embed(
+ title=f"📅 오늘 일정 ({datetime.now().strftime('%Y-%m-%d')})",
+ description="\n".join(e.to_display() for e in events) or "일정 없음 ☀️",
+ color=0x2ECC71,
+ )
+ await channel.send(embed=embed)
+
+ elif op == "week":
+ events = await self.calendar.get_week()
+ lines = []
+ current_date = ""
+ for e in events:
+ if e.date_str != current_date:
+ current_date = e.date_str
+ lines.append(f"\n**📆 {current_date}**")
+ lines.append(f" {e.to_display()}")
+ embed = discord.Embed(
+ title="📅 이번주 일정",
+ description="\n".join(lines) or "일정 없음 ☀️",
+ color=0x2ECC71,
+ )
+ await channel.send(embed=embed)
+
+ elif op == "add":
+ summary = params.get("summary", "")
+ date_str = params.get("date", "")
+ time_str = params.get("time", "09:00")
+ duration = params.get("duration", "1h")
+ location = params.get("location", "")
+
+ dtstart = datetime.strptime(f"{date_str} {time_str}", "%Y-%m-%d %H:%M")
+ dur_match = re.match(r"(\d+)([hm])", duration)
+ if dur_match:
+ val, unit = int(dur_match.group(1)), dur_match.group(2)
+ delta = timedelta(hours=val) if unit == "h" else timedelta(minutes=val)
+ else:
+ delta = timedelta(hours=1)
+ dtend = dtstart + delta
+
+ event = await self.calendar.create_event(
+ summary, dtstart, dtend, location=location,
+ )
+ embed = discord.Embed(
+ title="✅ 일정 생성",
+ description=event.to_display(),
+ color=0x2ECC71,
+ )
+ await channel.send(embed=embed)
+
+ elif op == "delete":
+ uid = params.get("uid", "")
+ ok = await self.calendar.delete_event(uid)
+ if ok:
+ await channel.send(f"✅ 일정 삭제 완료: `{uid}`")
+ else:
+ await channel.send(f"❌ 삭제 실패: `{uid}`")
+
+ elif op == "list_calendars":
+ cals = await self.calendar.list_calendars()
+ embed = discord.Embed(
+ title="📅 캘린더 목록",
+ description="\n".join(f"• {c}" for c in cals),
+ color=0x2ECC71,
+ )
+ await channel.send(embed=embed)
+
+ # ──────────────────────────────────────
+ # Mail
+ # ──────────────────────────────────────
+
+ async def _handle_mail(self, op: str, params: dict, channel):
+ if op == "unread":
+ limit = params.get("limit", 5)
+ messages = await self.mail.get_unread(limit)
+ if not messages:
+ await channel.send("📭 미확인 메일이 없습니다.")
+ return
+ lines = []
+ for i, m in enumerate(messages, 1):
+ lines.append(
+ f"{i}. 📬 **{m.sender_name}**\n"
+ f" 제목: {m.subject}\n"
+ f" 📅 {m.date}"
+ )
+ embed = discord.Embed(
+ title=f"📬 미확인 메일 {len(messages)}건",
+ description="\n\n".join(lines),
+ color=0xE67E22,
+ )
+ await channel.send(embed=embed)
+
+ elif op == "search":
+ query = params.get("query", "")
+ messages = await self.mail.search(query)
+ if not messages:
+ await channel.send(f"🔍 `{query}` 검색 결과가 없습니다.")
+ return
+ lines = [f"{i}. {'📬' if not m.is_read else '📭'} {m.sender_name} — {m.subject}"
+ for i, m in enumerate(messages[:10], 1)]
+ embed = discord.Embed(
+ title=f"🔍 메일 검색: {query}",
+ description="\n".join(lines),
+ color=0xE67E22,
+ )
+ await channel.send(embed=embed)
+
+ elif op == "get":
+ uid = params.get("uid", "")
+ msg = await self.mail.get_message(uid)
+ if msg:
+ body_preview = msg.body[:1500].replace("```", "")
+ embed = discord.Embed(
+ title=f"📧 {msg.subject}",
+ description=f"**From:** {msg.sender}\n**Date:** {msg.date}\n\n{body_preview}",
+ color=0xE67E22,
+ )
+ await channel.send(embed=embed)
+ else:
+ await channel.send(f"❌ UID `{uid}` 메일을 찾을 수 없습니다.")
+
+ # ──────────────────────────────────────
+ # Contacts
+ # ──────────────────────────────────────
+
+ async def _handle_contacts(self, op: str, params: dict, channel):
+ if op == "search":
+ query = params.get("query", "")
+ contacts = await self.contacts.search_contacts(query)
+ if not contacts:
+ await channel.send(f"🔍 `{query}` 연락처를 찾을 수 없습니다.")
+ return
+ lines = [c.to_display() for c in contacts[:10]]
+ embed = discord.Embed(
+ title=f"🔍 연락처 검색: {query}",
+ description="\n\n".join(lines),
+ color=0x9B59B6,
+ )
+ await channel.send(embed=embed)
+
+ elif op == "list":
+ contacts = await self.contacts.list_contacts()
+ lines = [f"👤 **{c.name}** — {', '.join(c.phone) or '전화 없음'}"
+ for c in contacts[:20]]
+ embed = discord.Embed(
+ title=f"📇 연락처 {len(contacts)}건",
+ description="\n".join(lines) or "연락처 없음",
+ color=0x9B59B6,
+ )
+ await channel.send(embed=embed)
+
+ elif op == "scan":
+ image_url = params.get("image_url", "")
+ if not image_url:
+ await channel.send("❌ 스캔할 이미지가 없습니다.")
+ return
+ contact = await self.contacts.scan_card(image_url)
+ embed = discord.Embed(
+ title="📇 명함 스캔 결과",
+ description=contact.to_display(),
+ color=0x9B59B6,
+ )
+ await channel.send(embed=embed)
diff --git a/prompts/foreman.md b/prompts/foreman.md
new file mode 100644
index 0000000..4c7fee1
--- /dev/null
+++ b/prompts/foreman.md
@@ -0,0 +1,49 @@
+# Foreman — AI 프로젝트 매니저
+
+당신은 **Foreman (포어맨)**입니다. 사용자의 목표를 실행 가능한 작업 트리로 분해하는 전문가입니다.
+
+## 역할
+- 사용자의 고수준 목표를 구체적인 작업 트리로 분해
+- 기술적 구현 세부사항까지 포함
+- 각 작업의 소요시간 추정
+- 사용자 피드백 반영하여 트리 수정
+
+## 작업 분해 원칙
+1. **구체적**: 각 작업은 1명이 1~4시간 내에 완료 가능해야 합니다
+2. **실행 가능**: "좋은 코드 작성" 같은 추상적 작업 금지
+3. **계층적**: 최대 3단계 (대분류 → 중분류 → 소작업)
+4. **의존성 순서**: 선행 작업이 앞에 오도록 배치
+5. **누락 방지**: 테스트, 문서화, 배포도 포함
+
+## 출력 형식
+반드시 JSON으로 응답하세요. 설명 텍스트 후 JSON 블록을 포함하세요.
+
+```json
+{
+ "tasks": [
+ {
+ "title": "대분류 작업명",
+ "description": "구체적 설명",
+ "estimated_hours": 4,
+ "children": [
+ {
+ "title": "하위 작업",
+ "description": "세부 설명",
+ "estimated_hours": 2,
+ "children": []
+ }
+ ]
+ }
+ ]
+}
+```
+
+## 수정 요청 처리
+사용자가 수정/추가/삭제를 요청하면:
+1. 변경 사항을 간단히 설명
+2. 수정된 **전체** 트리를 JSON으로 출력
+
+## 규칙
+- 한국어로 응답
+- 기술 스택은 사용자 프로젝트 컨텍스트에 맞추기
+- 불확실한 부분은 질문하기
diff --git a/prompts/unified.md b/prompts/unified.md
index dffc126..9067f92 100644
--- a/prompts/unified.md
+++ b/prompts/unified.md
@@ -7,6 +7,7 @@
**핵심 질문: "이 요청이 무엇을 원하는가?"**
- **애니메이션 관련 요청** → `mode: "anime"` (자막, 영상, 다운로드, 편성표 등)
+- **Nextcloud 도구 요청** → `mode: "nextcloud"` (파일, 일정, 메일, 연락처)
- **프로젝트 파일 변경이 필요** → `mode: "task"`
- **대화로 해결 가능** → `mode: "chat"`
- **판단 불가** → `mode: "clarify"`
@@ -76,12 +77,42 @@
- "프리렌 7화" → title="장송의 프리렌", episode=7
- "일요일 편성" → action="schedule", filter="week:0"
+### nextcloud:
+```json
+{
+ "mode": "nextcloud",
+ "tool": "files | calendar | mail | contacts",
+ "op": "도구별 연산자",
+ "params": {},
+ "summary": "사용자 요청 요약"
+}
+```
+
+**nextcloud 판단 기준:**
+- 파일 찾기/검색/공유 링크 → `tool: "files"`
+- 일정/스케줄/약속 → `tool: "calendar"`
+- 메일/이메일 확인 → `tool: "mail"`
+- 연락처/명함 → `tool: "contacts"`
+
+**도구별 op:**
+- files: `search`, `list`, `link`, `recent`
+- calendar: `today`, `week`, `add`, `delete`, `list_calendars`
+- mail: `unread`, `search`, `get`
+- contacts: `search`, `list`, `scan`
+
+**params 예시:**
+- "세금 관련 파일 찾아줘" → `{"tool":"files","op":"search","params":{"query":"세금"}}`
+- "오늘 일정 뭐야" → `{"tool":"calendar","op":"today","params":{}}`
+- "안 읽은 메일 있어?" → `{"tool":"mail","op":"unread","params":{"limit":5}}`
+- "내일 3시에 회의 잡아줘" → `{"tool":"calendar","op":"add","params":{"summary":"회의","date":"2026-03-19","time":"15:00","duration":"1h"}}`
+
## 규칙
- 반드시 위 JSON 형식만 출력하세요. JSON 외의 텍스트를 포함하지 마세요.
- chat의 response는 마크다운 사용 가능, **완성된 답변**이어야 합니다.
- task에서는 summary만 작성하세요. tasks 배열을 만들지 마세요.
- anime에서는 사용자 의도를 정확히 파악하여 action과 파라미터를 설정하세요.
+- nextcloud에서는 적절한 tool, op, params를 설정하세요.
- 한국어로 응답하세요.
- 이전 대화 기록이 주어지면, 맥락을 고려하세요.
diff --git a/tools/nc_calendar.py b/tools/nc_calendar.py
new file mode 100644
index 0000000..e370082
--- /dev/null
+++ b/tools/nc_calendar.py
@@ -0,0 +1,454 @@
+r"""Nextcloud Calendar (CalDAV) 모듈 — 일정 CRUD.
+
+사용자 명령 기반 일정 조회/추가/수정/삭제.
+CalDAV 프로토콜로 iCalendar (VEVENT) 객체를 관리.
+"""
+
+import asyncio
+import logging
+import os
+import re
+import sys
+import uuid
+from dataclasses import dataclass, field
+from datetime import datetime, timedelta
+from typing import Optional
+
+sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
+import config
+from tools.nextcloud_client import NextcloudClient, DAV_NS, CAL_NS
+
+logger = logging.getLogger("variet.tools.nc_calendar")
+
+
+@dataclass
+class CalEvent:
+ """캘린더 이벤트."""
+ uid: str
+ summary: str
+ dtstart: datetime
+ dtend: Optional[datetime] = None
+ location: str = ""
+ description: str = ""
+ href: str = "" # CalDAV 리소스 경로
+ calendar: str = "" # 소속 캘린더 이름
+ raw_ics: str = "" # 원본 ICS 데이터
+
+ @property
+ def duration_str(self) -> str:
+ if self.dtend and self.dtstart:
+ delta = self.dtend - self.dtstart
+ hours = delta.seconds // 3600
+ mins = (delta.seconds % 3600) // 60
+ if hours > 0:
+ return f"{hours}시간 {mins}분" if mins else f"{hours}시간"
+ return f"{mins}분"
+ return ""
+
+ @property
+ def date_str(self) -> str:
+ return self.dtstart.strftime("%Y-%m-%d")
+
+ @property
+ def time_str(self) -> str:
+ return self.dtstart.strftime("%H:%M")
+
+ def to_display(self) -> str:
+ """Discord embed용 표시 문자열."""
+ time = self.time_str
+ dur = f" ({self.duration_str})" if self.duration_str else ""
+ loc = f" 📍{self.location}" if self.location else ""
+ return f"⏰ {time}{dur} — **{self.summary}**{loc}"
+
+
+class NCCalendarClient:
+ """Nextcloud CalDAV 캘린더 클라이언트."""
+
+ DEFAULT_CALENDAR = "personal"
+
+ def __init__(self, nc_client: NextcloudClient = None):
+ self.nc = nc_client or NextcloudClient()
+
+ def _calendar_path(self, calendar: str = None) -> str:
+ cal = calendar or self.DEFAULT_CALENDAR
+ return f"calendars/{self.nc.username}/{cal}"
+
+ # ──────────────────────────────────────
+ # 캘린더 목록
+ # ──────────────────────────────────────
+
+ async def list_calendars(self) -> list[str]:
+ """사용자 캘린더 목록 조회."""
+ path = f"calendars/{self.nc.username}/"
+ resp = await self.nc.dav_request(
+ "PROPFIND", path,
+ body="""
+
+
+
+
+
+""",
+ headers={"Depth": "1"},
+ )
+ calendars = []
+ if resp.status_code == 207:
+ from xml.etree import ElementTree as ET
+ root = ET.fromstring(resp.text)
+ for r in root.findall(f"{{{DAV_NS}}}response"):
+ href = r.find(f"{{{DAV_NS}}}href")
+ propstat = r.find(f"{{{DAV_NS}}}propstat")
+ if propstat is None:
+ continue
+ prop = propstat.find(f"{{{DAV_NS}}}prop")
+ if prop is None:
+ continue
+ rtype = prop.find(f"{{{DAV_NS}}}resourcetype")
+ if rtype is not None and rtype.find(f"{{{CAL_NS}}}calendar") is not None:
+ dn = prop.find(f"{{{DAV_NS}}}displayname")
+ name = dn.text if dn is not None and dn.text else href.text.rstrip("/").split("/")[-1]
+ calendars.append(name)
+ return calendars
+
+ # ──────────────────────────────────────
+ # 이벤트 조회
+ # ──────────────────────────────────────
+
+ async def get_events(
+ self,
+ start: datetime = None,
+ end: datetime = None,
+ calendar: str = None,
+ ) -> list[CalEvent]:
+ """기간 내 이벤트 조회.
+
+ Args:
+ start: 시작일 (기본: 오늘 00:00)
+ end: 종료일 (기본: start + 7일)
+ calendar: 캘린더 이름 (기본: personal)
+ """
+ if start is None:
+ start = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
+ if end is None:
+ end = start + timedelta(days=7)
+
+ start_str = start.strftime("%Y%m%dT%H%M%SZ")
+ end_str = end.strftime("%Y%m%dT%H%M%SZ")
+
+ body = f"""
+
+
+
+
+
+
+
+
+
+
+
+
+"""
+
+ path = self._calendar_path(calendar)
+ resp = await self.nc.dav_request(
+ "REPORT", path, body=body,
+ headers={"Depth": "1"},
+ )
+
+ events = []
+ if resp.status_code == 207:
+ from xml.etree import ElementTree as ET
+ root = ET.fromstring(resp.text)
+ for r in root.findall(f"{{{DAV_NS}}}response"):
+ href_el = r.find(f"{{{DAV_NS}}}href")
+ href = href_el.text if href_el is not None else ""
+ propstat = r.find(f"{{{DAV_NS}}}propstat")
+ if propstat is None:
+ continue
+ prop = propstat.find(f"{{{DAV_NS}}}prop")
+ if prop is None:
+ continue
+ cal_data = prop.find(f"{{{CAL_NS}}}calendar-data")
+ if cal_data is not None and cal_data.text:
+ event = self._parse_ics(cal_data.text, href, calendar or self.DEFAULT_CALENDAR)
+ if event:
+ events.append(event)
+
+ events.sort(key=lambda e: e.dtstart)
+ logger.info(f"캘린더 이벤트 조회: {len(events)}건 ({start.date()} ~ {end.date()})")
+ return events
+
+ async def get_today(self) -> list[CalEvent]:
+ """오늘 일정 조회."""
+ now = datetime.now()
+ start = now.replace(hour=0, minute=0, second=0, microsecond=0)
+ end = start + timedelta(days=1)
+ return await self.get_events(start, end)
+
+ async def get_week(self) -> list[CalEvent]:
+ """이번주 일정 조회."""
+ now = datetime.now()
+ start = now.replace(hour=0, minute=0, second=0, microsecond=0)
+ # 이번주 월요일부터
+ start = start - timedelta(days=start.weekday())
+ end = start + timedelta(days=7)
+ return await self.get_events(start, end)
+
+ # ──────────────────────────────────────
+ # 이벤트 생성
+ # ──────────────────────────────────────
+
+ async def create_event(
+ self,
+ summary: str,
+ dtstart: datetime,
+ dtend: datetime = None,
+ location: str = "",
+ description: str = "",
+ calendar: str = None,
+ ) -> CalEvent:
+ """이벤트 생성.
+
+ Args:
+ summary: 제목
+ dtstart: 시작 시간
+ dtend: 종료 시간 (기본: 시작 + 1시간)
+ location: 장소
+ description: 설명
+ calendar: 캘린더 이름
+ """
+ uid = f"{uuid.uuid4()}@variet-agent"
+ if dtend is None:
+ dtend = dtstart + timedelta(hours=1)
+
+ ics = self._build_ics(uid, summary, dtstart, dtend, location, description)
+ path = f"{self._calendar_path(calendar)}/{uid}.ics"
+
+ resp = await self.nc.dav_put(path, ics, "text/calendar; charset=utf-8")
+ if resp.status_code in (200, 201, 204):
+ logger.info(f"일정 생성: {summary} ({dtstart})")
+ return CalEvent(
+ uid=uid, summary=summary, dtstart=dtstart, dtend=dtend,
+ location=location, description=description, href=path,
+ calendar=calendar or self.DEFAULT_CALENDAR, raw_ics=ics,
+ )
+ else:
+ raise RuntimeError(f"일정 생성 실패: {resp.status_code} {resp.text[:200]}")
+
+ # ──────────────────────────────────────
+ # 이벤트 수정
+ # ──────────────────────────────────────
+
+ async def update_event(
+ self,
+ uid: str,
+ calendar: str = None,
+ summary: str = None,
+ dtstart: datetime = None,
+ dtend: datetime = None,
+ location: str = None,
+ description: str = None,
+ ) -> CalEvent:
+ """이벤트 수정 — 기존 이벤트를 찾아서 변경."""
+ # 먼저 기존 이벤트를 찾음
+ events = await self.get_events(
+ start=datetime(2020, 1, 1),
+ end=datetime(2030, 12, 31),
+ calendar=calendar,
+ )
+ target = None
+ for e in events:
+ if e.uid == uid:
+ target = e
+ break
+
+ if target is None:
+ raise RuntimeError(f"이벤트 {uid}를 찾을 수 없습니다.")
+
+ # 변경 적용
+ new_summary = summary if summary is not None else target.summary
+ new_dtstart = dtstart if dtstart is not None else target.dtstart
+ new_dtend = dtend if dtend is not None else target.dtend
+ new_location = location if location is not None else target.location
+ new_description = description if description is not None else target.description
+
+ ics = self._build_ics(uid, new_summary, new_dtstart, new_dtend, new_location, new_description)
+ path = target.href or f"{self._calendar_path(calendar)}/{uid}.ics"
+
+ resp = await self.nc.dav_put(path, ics, "text/calendar; charset=utf-8")
+ if resp.status_code in (200, 201, 204):
+ logger.info(f"일정 수정: {new_summary}")
+ return CalEvent(
+ uid=uid, summary=new_summary, dtstart=new_dtstart, dtend=new_dtend,
+ location=new_location, description=new_description, href=path,
+ calendar=calendar or self.DEFAULT_CALENDAR,
+ )
+ else:
+ raise RuntimeError(f"일정 수정 실패: {resp.status_code}")
+
+ # ──────────────────────────────────────
+ # 이벤트 삭제
+ # ──────────────────────────────────────
+
+ async def delete_event(self, uid: str, calendar: str = None) -> bool:
+ """이벤트 삭제."""
+ # href 직접 삭제 시도
+ path = f"{self._calendar_path(calendar)}/{uid}.ics"
+ ok = await self.nc.dav_delete(path)
+ if ok:
+ logger.info(f"일정 삭제: {uid}")
+ else:
+ logger.warning(f"일정 삭제 실패: {uid}")
+ return ok
+
+ # ──────────────────────────────────────
+ # ICS 빌드/파싱
+ # ──────────────────────────────────────
+
+ @staticmethod
+ def _build_ics(
+ uid: str, summary: str,
+ dtstart: datetime, dtend: datetime,
+ location: str = "", description: str = "",
+ ) -> str:
+ """VEVENT ICS 문자열 생성."""
+ now = datetime.utcnow().strftime("%Y%m%dT%H%M%SZ")
+ ds = dtstart.strftime("%Y%m%dT%H%M%S")
+ de = dtend.strftime("%Y%m%dT%H%M%S")
+
+ lines = [
+ "BEGIN:VCALENDAR",
+ "VERSION:2.0",
+ "PRODID:-//Variet-Agent//NC Calendar//KO",
+ "BEGIN:VEVENT",
+ f"UID:{uid}",
+ f"DTSTAMP:{now}",
+ f"DTSTART:{ds}",
+ f"DTEND:{de}",
+ f"SUMMARY:{summary}",
+ ]
+ if location:
+ lines.append(f"LOCATION:{location}")
+ if description:
+ lines.append(f"DESCRIPTION:{description}")
+ lines.extend([
+ "END:VEVENT",
+ "END:VCALENDAR",
+ ])
+ return "\r\n".join(lines)
+
+ @staticmethod
+ def _parse_ics(ics_text: str, href: str = "", calendar: str = "") -> Optional[CalEvent]:
+ """ICS 텍스트에서 CalEvent 추출 (간단 파서)."""
+ def _get_field(text: str, field: str) -> str:
+ pattern = rf"^{field}[;:](.+)$"
+ m = re.search(pattern, text, re.MULTILINE)
+ return m.group(1).strip() if m else ""
+
+ def _parse_dt(val: str) -> Optional[datetime]:
+ if not val:
+ return None
+ # VALUE=DATE:20260318 형식 처리
+ if "VALUE=DATE:" in val:
+ val = val.split(":")[-1]
+ elif ":" in val:
+ val = val.split(":")[-1]
+ val = val.replace("Z", "")
+ for fmt in ("%Y%m%dT%H%M%S", "%Y%m%d"):
+ try:
+ return datetime.strptime(val, fmt)
+ except ValueError:
+ continue
+ return None
+
+ uid = _get_field(ics_text, "UID")
+ summary = _get_field(ics_text, "SUMMARY")
+ dtstart = _parse_dt(_get_field(ics_text, "DTSTART"))
+ dtend = _parse_dt(_get_field(ics_text, "DTEND"))
+ location = _get_field(ics_text, "LOCATION")
+ description = _get_field(ics_text, "DESCRIPTION")
+
+ if not uid or not dtstart:
+ return None
+
+ return CalEvent(
+ uid=uid, summary=summary, dtstart=dtstart, dtend=dtend,
+ location=location, description=description,
+ href=href, calendar=calendar, raw_ics=ics_text,
+ )
+
+
+# ──────────────────────────────────────
+# CLI
+# ──────────────────────────────────────
+
+async def _cli():
+ import io
+ if sys.stdout.encoding != "utf-8":
+ sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8", errors="replace")
+
+ args = sys.argv[1:]
+ client = NCCalendarClient()
+
+ if not args:
+ print("사용법:")
+ print(" nc_calendar.py calendars")
+ print(" nc_calendar.py today")
+ print(" nc_calendar.py week")
+ print(' nc_calendar.py add "제목" 2026-03-20 14:00 [1h]')
+ print(' nc_calendar.py delete ')
+ return
+
+ if args[0] == "calendars":
+ cals = await client.list_calendars()
+ print(f"📅 캘린더 {len(cals)}개:")
+ for c in cals:
+ print(f" • {c}")
+
+ elif args[0] == "today":
+ events = await client.get_today()
+ print(f"📅 오늘 일정 ({datetime.now().strftime('%Y-%m-%d')}):\n")
+ if not events:
+ print(" 일정 없음")
+ for e in events:
+ print(f" {e.to_display()}")
+
+ elif args[0] == "week":
+ events = await client.get_week()
+ print(f"📅 이번주 일정:\n")
+ current_date = ""
+ for e in events:
+ if e.date_str != current_date:
+ current_date = e.date_str
+ print(f"\n 📆 {current_date}")
+ print(f" {e.to_display()}")
+
+ elif args[0] == "add" and len(args) >= 3:
+ summary = args[1]
+ date_str = args[2]
+ time_str = args[3] if len(args) > 3 else "09:00"
+ duration = args[4] if len(args) > 4 else "1h"
+
+ dtstart = datetime.strptime(f"{date_str} {time_str}", "%Y-%m-%d %H:%M")
+ # duration 파싱
+ dur_match = re.match(r"(\d+)([hm])", duration)
+ if dur_match:
+ val, unit = int(dur_match.group(1)), dur_match.group(2)
+ delta = timedelta(hours=val) if unit == "h" else timedelta(minutes=val)
+ else:
+ delta = timedelta(hours=1)
+ dtend = dtstart + delta
+
+ event = await client.create_event(summary, dtstart, dtend)
+ print(f"✅ 일정 생성: {event.to_display()}")
+ print(f" UID: {event.uid}")
+
+ elif args[0] == "delete" and len(args) > 1:
+ uid = args[1]
+ ok = await client.delete_event(uid)
+ print(f"{'✅ 삭제 완료' if ok else '❌ 삭제 실패'}: {uid}")
+
+
+if __name__ == "__main__":
+ asyncio.run(_cli())
diff --git a/tools/nc_contacts.py b/tools/nc_contacts.py
new file mode 100644
index 0000000..c3b90c5
--- /dev/null
+++ b/tools/nc_contacts.py
@@ -0,0 +1,436 @@
+r"""Nextcloud Contacts + 명함 스캔 모듈.
+
+EasyOCR로 명함 이미지 텍스트 추출 → 정규식 필드 분류 → vCard 생성 → CardDAV 저장.
+연락처 검색/조회 기능 포함.
+"""
+
+import asyncio
+import logging
+import os
+import re
+import sys
+import uuid
+from dataclasses import dataclass, field
+from typing import Optional
+
+sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
+import config
+from tools.nextcloud_client import NextcloudClient, DAV_NS, CARD_NS
+
+logger = logging.getLogger("variet.tools.nc_contacts")
+
+
+@dataclass
+class Contact:
+ """연락처 정보."""
+ uid: str = ""
+ name: str = ""
+ company: str = ""
+ title: str = "" # 직함
+ phone: list[str] = field(default_factory=list)
+ email: list[str] = field(default_factory=list)
+ address: str = ""
+ note: str = ""
+ href: str = "" # CardDAV 리소스 경로
+
+ def to_vcard(self) -> str:
+ """vCard 3.0 포맷 생성."""
+ uid = self.uid or str(uuid.uuid4())
+ lines = [
+ "BEGIN:VCARD",
+ "VERSION:3.0",
+ f"UID:{uid}",
+ f"FN:{self.name}",
+ f"N:{self.name};;;;",
+ ]
+ if self.company:
+ lines.append(f"ORG:{self.company}")
+ if self.title:
+ lines.append(f"TITLE:{self.title}")
+ for p in self.phone:
+ lines.append(f"TEL;TYPE=CELL:{p}")
+ for e in self.email:
+ lines.append(f"EMAIL:{e}")
+ if self.address:
+ lines.append(f"ADR;TYPE=WORK:;;{self.address};;;;")
+ if self.note:
+ lines.append(f"NOTE:{self.note}")
+ lines.append("END:VCARD")
+ return "\r\n".join(lines)
+
+ def to_display(self) -> str:
+ """Discord embed용 표시."""
+ parts = [f"👤 **{self.name}**"]
+ if self.title and self.company:
+ parts.append(f" 🏢 {self.company} / {self.title}")
+ elif self.company:
+ parts.append(f" 🏢 {self.company}")
+ elif self.title:
+ parts.append(f" 💼 {self.title}")
+ for p in self.phone:
+ parts.append(f" 📞 {p}")
+ for e in self.email:
+ parts.append(f" 📧 {e}")
+ if self.address:
+ parts.append(f" 📍 {self.address}")
+ return "\n".join(parts)
+
+
+class NCContactsClient:
+ """Nextcloud CardDAV 연락처 + 명함 스캔 클라이언트."""
+
+ DEFAULT_ADDRESSBOOK = "contacts"
+
+ def __init__(self, nc_client: NextcloudClient = None):
+ self.nc = nc_client or NextcloudClient()
+ self._ocr = None
+
+ def _addressbook_path(self, addressbook: str = None) -> str:
+ ab = addressbook or self.DEFAULT_ADDRESSBOOK
+ return f"addressbooks/users/{self.nc.username}/{ab}"
+
+ # ──────────────────────────────────────
+ # OCR (EasyOCR)
+ # ──────────────────────────────────────
+
+ def _get_ocr(self):
+ """EasyOCR Reader 지연 로딩."""
+ if self._ocr is None:
+ try:
+ import easyocr
+ self._ocr = easyocr.Reader(["ko", "en", "ja"], gpu=False)
+ logger.info("EasyOCR 초기화 완료 (ko, en, ja)")
+ except ImportError:
+ raise RuntimeError(
+ "easyocr 미설치. pip install easyocr 실행 필요."
+ )
+ return self._ocr
+
+ async def scan_card(self, image_path: str) -> Contact:
+ """명함 이미지 → 연락처 정보 추출.
+
+ Args:
+ image_path: 이미지 파일 경로 또는 URL
+
+ Returns:
+ 추출된 Contact 객체
+ """
+ def _ocr_extract():
+ reader = self._get_ocr()
+ results = reader.readtext(image_path)
+ # (bbox, text, confidence) 리스트
+ texts = [text for _, text, conf in results if conf > 0.3]
+ return texts
+
+ texts = await asyncio.to_thread(_ocr_extract)
+ logger.info(f"명함 OCR: {len(texts)}줄 추출")
+
+ return self._classify_fields(texts)
+
+ @staticmethod
+ def _classify_fields(texts: list[str]) -> Contact:
+ """OCR 텍스트 → 구조화된 Contact로 분류.
+
+ 정규식으로 이메일, 전화번호, URL 등을 먼저 분류하고
+ 나머지 텍스트에서 이름/회사/직함을 추론.
+ """
+ contact = Contact(uid=str(uuid.uuid4()))
+ remaining = []
+
+ for text in texts:
+ text = text.strip()
+ if not text:
+ continue
+
+ # 이메일
+ email_match = re.search(r"[\w.+-]+@[\w.-]+\.\w+", text)
+ if email_match:
+ contact.email.append(email_match.group())
+ continue
+
+ # 전화번호 (한국/국제)
+ phone_match = re.search(
+ r"(?:\+?\d{1,3}[-.\s]?)?"
+ r"(?:\(?\d{2,4}\)?[-.\s]?)?"
+ r"\d{3,4}[-.\s]?\d{3,4}",
+ text,
+ )
+ if phone_match and len(phone_match.group().replace("-", "").replace(" ", "").replace(".", "")) >= 8:
+ contact.phone.append(phone_match.group())
+ continue
+
+ # URL (웹사이트) → note에 저장
+ if re.match(r"(https?://|www\.)", text, re.IGNORECASE):
+ contact.note += f" {text}"
+ continue
+
+ # 주소 패턴 (번지, 로, 길, 동, 층 등)
+ if re.search(r"(시|구|동|로|길|층|호|번지|타워|빌딩|센터)", text):
+ contact.address = text
+ continue
+
+ remaining.append(text)
+
+ # 나머지 텍스트 분류 (이름, 회사, 직함)
+ if remaining:
+ # 보통 명함에서 가장 큰 텍스트(첫 번째)가 이름
+ contact.name = remaining[0]
+
+ for text in remaining[1:]:
+ # 직함 키워드
+ if re.search(r"(대표|이사|부장|과장|차장|팀장|사원|매니저|Director|Manager|CEO|CTO|VP|Engineer|Developer)", text, re.IGNORECASE):
+ contact.title = text
+ elif not contact.company:
+ # 회사명 (나머지 중 첫 번째)
+ contact.company = text
+
+ contact.note = contact.note.strip()
+ return contact
+
+ # ──────────────────────────────────────
+ # 연락처 저장 (CardDAV)
+ # ──────────────────────────────────────
+
+ async def create_contact(self, contact: Contact, addressbook: str = None) -> Contact:
+ """연락처를 Nextcloud에 저장.
+
+ Args:
+ contact: Contact 객체
+ addressbook: 주소록 이름
+
+ Returns:
+ 저장된 Contact (href 포함)
+ """
+ if not contact.uid:
+ contact.uid = str(uuid.uuid4())
+
+ vcard = contact.to_vcard()
+ path = f"{self._addressbook_path(addressbook)}/{contact.uid}.vcf"
+
+ resp = await self.nc.dav_put(path, vcard, "text/vcard; charset=utf-8")
+ if resp.status_code in (200, 201, 204):
+ contact.href = path
+ logger.info(f"연락처 저장: {contact.name}")
+ return contact
+ else:
+ raise RuntimeError(f"연락처 저장 실패: {resp.status_code} {resp.text[:200]}")
+
+ # ──────────────────────────────────────
+ # 연락처 검색/조회
+ # ──────────────────────────────────────
+
+ async def search_contacts(self, query: str, addressbook: str = None) -> list[Contact]:
+ """연락처 검색.
+
+ Args:
+ query: 검색어 (이름, 이메일, 전화번호)
+ """
+ body = f"""
+
+
+
+
+
+
+
+ {query}
+
+
+"""
+
+ path = self._addressbook_path(addressbook)
+ resp = await self.nc.dav_request(
+ "REPORT", path, body=body,
+ headers={"Depth": "1"},
+ )
+
+ contacts = []
+ if resp.status_code == 207:
+ from xml.etree import ElementTree as ET
+ root = ET.fromstring(resp.text)
+ for r in root.findall(f"{{{DAV_NS}}}response"):
+ href_el = r.find(f"{{{DAV_NS}}}href")
+ href = href_el.text if href_el is not None else ""
+ propstat = r.find(f"{{{DAV_NS}}}propstat")
+ if propstat is None:
+ continue
+ prop = propstat.find(f"{{{DAV_NS}}}prop")
+ if prop is None:
+ continue
+ addr_data = prop.find(f"{{{CARD_NS}}}address-data")
+ if addr_data is not None and addr_data.text:
+ contact = self._parse_vcard(addr_data.text, href)
+ if contact:
+ contacts.append(contact)
+
+ logger.info(f"연락처 검색 '{query}': {len(contacts)}건")
+ return contacts
+
+ async def list_contacts(self, addressbook: str = None) -> list[Contact]:
+ """전체 연락처 목록."""
+ path = self._addressbook_path(addressbook)
+ body = """
+
+
+
+
+
+"""
+
+ resp = await self.nc.dav_request(
+ "REPORT", path, body=body,
+ headers={"Depth": "1"},
+ )
+
+ contacts = []
+ if resp.status_code == 207:
+ from xml.etree import ElementTree as ET
+ root = ET.fromstring(resp.text)
+ for r in root.findall(f"{{{DAV_NS}}}response"):
+ href_el = r.find(f"{{{DAV_NS}}}href")
+ href = href_el.text if href_el is not None else ""
+ propstat = r.find(f"{{{DAV_NS}}}propstat")
+ if propstat is None:
+ continue
+ prop = propstat.find(f"{{{DAV_NS}}}prop")
+ if prop is None:
+ continue
+ addr_data = prop.find(f"{{{CARD_NS}}}address-data")
+ if addr_data is not None and addr_data.text:
+ contact = self._parse_vcard(addr_data.text, href)
+ if contact:
+ contacts.append(contact)
+
+ contacts.sort(key=lambda c: c.name)
+ return contacts
+
+ # ──────────────────────────────────────
+ # vCard 파싱
+ # ──────────────────────────────────────
+
+ @staticmethod
+ def _parse_vcard(vcard_text: str, href: str = "") -> Optional[Contact]:
+ """vCard 텍스트 → Contact 객체."""
+ def _get(text: str, field: str) -> str:
+ pattern = rf"^{field}[;:](.+)$"
+ m = re.search(pattern, text, re.MULTILINE)
+ return m.group(1).strip() if m else ""
+
+ def _get_all(text: str, field: str) -> list[str]:
+ pattern = rf"^{field}[;:](.+)$"
+ return [m.group(1).strip() for m in re.finditer(pattern, text, re.MULTILINE)]
+
+ uid = _get(vcard_text, "UID")
+ fn = _get(vcard_text, "FN")
+ org = _get(vcard_text, "ORG")
+ title = _get(vcard_text, "TITLE")
+ note = _get(vcard_text, "NOTE")
+
+ phones = _get_all(vcard_text, "TEL")
+ # TEL;TYPE=CELL:010-... → 번호만 추출
+ phones = [re.sub(r"^.*:", "", p) if ":" in p else p for p in phones]
+
+ emails = _get_all(vcard_text, "EMAIL")
+ emails = [re.sub(r"^.*:", "", e) if ":" in e else e for e in emails]
+
+ adr = _get(vcard_text, "ADR")
+ # ADR;TYPE=WORK:;;... → 주소만 추출
+ if ";" in adr:
+ parts = adr.split(";")
+ adr = " ".join(p for p in parts if p).strip()
+
+ if not fn and not uid:
+ return None
+
+ return Contact(
+ uid=uid, name=fn, company=org, title=title,
+ phone=phones, email=emails, address=adr, note=note,
+ href=href,
+ )
+
+ # ──────────────────────────────────────
+ # 명함 스캔 + 저장 통합
+ # ──────────────────────────────────────
+
+ async def scan_and_save(
+ self,
+ image_path: str,
+ save_image: bool = True,
+ addressbook: str = None,
+ ) -> Contact:
+ """명함 스캔 → 연락처 저장 → 원본 이미지도 NC에 보관.
+
+ Args:
+ image_path: 이미지 파일 경로
+ save_image: 원본 이미지를 NC Files에도 저장할지
+ addressbook: 주소록 이름
+ """
+ # 1. OCR
+ contact = await self.scan_card(image_path)
+
+ # 2. CardDAV 저장
+ contact = await self.create_contact(contact, addressbook)
+
+ # 3. 원본 이미지도 NC Files에 저장 (선택)
+ if save_image and os.path.exists(image_path):
+ try:
+ with open(image_path, "rb") as f:
+ img_data = f.read()
+ remote_name = f"BusinessCards/{contact.uid}.jpg"
+ await self.nc.upload_file(remote_name, img_data, "image/jpeg")
+ logger.info(f"명함 이미지 저장: {remote_name}")
+ except Exception as e:
+ logger.warning(f"명함 이미지 저장 실패: {e}")
+
+ return contact
+
+
+# ──────────────────────────────────────
+# CLI
+# ──────────────────────────────────────
+
+async def _cli():
+ import io
+ if sys.stdout.encoding != "utf-8":
+ sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8", errors="replace")
+
+ args = sys.argv[1:]
+ client = NCContactsClient()
+
+ if not args:
+ print("사용법:")
+ print(' nc_contacts.py scan <이미지경로>')
+ print(' nc_contacts.py search "이름"')
+ print(' nc_contacts.py list')
+ return
+
+ if args[0] == "scan" and len(args) > 1:
+ image_path = args[1]
+ contact = await client.scan_card(image_path)
+ print(f"📇 명함 스캔 결과:\n")
+ print(contact.to_display())
+ print(f"\n💾 저장하려면: nc_contacts.py save <이미지경로>")
+
+ elif args[0] == "save" and len(args) > 1:
+ image_path = args[1]
+ contact = await client.scan_and_save(image_path)
+ print(f"✅ 연락처 저장 완료:\n")
+ print(contact.to_display())
+
+ elif args[0] == "search" and len(args) > 1:
+ query = " ".join(args[1:])
+ contacts = await client.search_contacts(query)
+ print(f"🔍 '{query}' 검색 결과: {len(contacts)}건\n")
+ for c in contacts:
+ print(c.to_display())
+ print()
+
+ elif args[0] == "list":
+ contacts = await client.list_contacts()
+ print(f"📇 전체 연락처: {len(contacts)}건\n")
+ for c in contacts:
+ print(f" 👤 {c.name} — {', '.join(c.phone) or '전화 없음'} — {', '.join(c.email) or '이메일 없음'}")
+
+
+if __name__ == "__main__":
+ asyncio.run(_cli())
diff --git a/tools/nc_files.py b/tools/nc_files.py
new file mode 100644
index 0000000..d71c0b8
--- /dev/null
+++ b/tools/nc_files.py
@@ -0,0 +1,245 @@
+r"""Nextcloud Files 검색/관리 모듈.
+
+파일 검색, 최근 파일 조회, 공유 링크 생성 등.
+업로드는 NC 앱에서 직접 수행 — 봇은 검색/관리/링크 역할.
+"""
+
+import asyncio
+import logging
+import os
+import sys
+from dataclasses import dataclass
+from datetime import datetime
+from typing import Optional
+
+sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
+import config
+from tools.nextcloud_client import NextcloudClient
+
+logger = logging.getLogger("variet.tools.nc_files")
+
+
+@dataclass
+class NCFile:
+ """Nextcloud 파일 정보."""
+ name: str
+ path: str # 사용자 상대 경로 (예: Documents/세금/...)
+ size: int = 0
+ lastmod: str = ""
+ content_type: str = ""
+ is_dir: bool = False
+
+ @property
+ def size_human(self) -> str:
+ """사람이 읽기 쉬운 크기 표시."""
+ if self.size < 1024:
+ return f"{self.size}B"
+ elif self.size < 1024 * 1024:
+ return f"{self.size / 1024:.1f}KB"
+ elif self.size < 1024 * 1024 * 1024:
+ return f"{self.size / (1024 * 1024):.1f}MB"
+ else:
+ return f"{self.size / (1024 * 1024 * 1024):.1f}GB"
+
+ @property
+ def icon(self) -> str:
+ if self.is_dir:
+ return "📁"
+ ct = self.content_type or ""
+ if "image" in ct:
+ return "🖼️"
+ elif "pdf" in ct:
+ return "📕"
+ elif "video" in ct:
+ return "🎬"
+ elif "audio" in ct:
+ return "🎵"
+ elif "zip" in ct or "tar" in ct or "rar" in ct:
+ return "📦"
+ elif "spreadsheet" in ct or "excel" in ct:
+ return "📊"
+ elif "document" in ct or "word" in ct:
+ return "📝"
+ return "📄"
+
+
+class NCFilesClient:
+ """Nextcloud Files 검색/관리 클라이언트."""
+
+ def __init__(self, nc_client: NextcloudClient = None):
+ self.nc = nc_client or NextcloudClient()
+
+ def _href_to_user_path(self, href: str) -> str:
+ """WebDAV href → 사용자 상대 경로."""
+ # href: /remote.php/dav/files/username/Documents/...
+ marker = f"/files/{self.nc.username}/"
+ idx = href.find(marker)
+ if idx >= 0:
+ return href[idx + len(marker):].rstrip("/")
+ return href.rstrip("/").split("/")[-1]
+
+ def _to_ncfile(self, item: dict) -> NCFile:
+ """propfind 결과 dict → NCFile."""
+ return NCFile(
+ name=item.get("name", ""),
+ path=self._href_to_user_path(item.get("href", "")),
+ size=item.get("size", 0),
+ lastmod=item.get("lastmod", ""),
+ content_type=item.get("content_type", ""),
+ is_dir=item.get("is_dir", False),
+ )
+
+ # ──────────────────────────────────────
+ # 검색
+ # ──────────────────────────────────────
+
+ async def search(self, query: str) -> list[NCFile]:
+ """파일 검색 (이름 기준).
+
+ Args:
+ query: 검색어 (부분 일치)
+
+ Returns:
+ 매칭된 NCFile 리스트
+ """
+ results = await self.nc.webdav_search(query)
+ files = [self._to_ncfile(r) for r in results]
+ # 디렉토리보다 파일 우선, 최신순
+ files.sort(key=lambda f: (f.is_dir, f.name))
+ logger.info(f"파일 검색 '{query}': {len(files)}건")
+ return files
+
+ # ──────────────────────────────────────
+ # 목록
+ # ──────────────────────────────────────
+
+ async def list_dir(self, path: str = "") -> list[NCFile]:
+ """디렉토리 목록 조회.
+
+ Args:
+ path: 사용자 상대 경로 (빈 문자열 = 루트)
+ """
+ dav_path = f"files/{self.nc.username}/{path.lstrip('/')}"
+ results = await self.nc.webdav_propfind(dav_path)
+ # 첫 번째는 자기 자신 → 제외
+ files = [self._to_ncfile(r) for r in results[1:]]
+ files.sort(key=lambda f: (not f.is_dir, f.name.lower()))
+ return files
+
+ async def list_recent(self, limit: int = 10) -> list[NCFile]:
+ """최근 수정된 파일 목록.
+
+ PROPFIND depth=infinity는 부하가 크므로
+ OCS activity API나 search를 활용.
+ """
+ # 대안: WebDAV SEARCH로 최근 수정 파일 검색
+ # 현재는 루트 1단계만 조회하여 lastmod 정렬
+ dav_path = f"files/{self.nc.username}"
+ # depth=infinity는 서버 부하 → 대신 OCS favorite 또는 search 활용
+ # 간단하게: search로 '*' 전체 검색 후 정렬
+ results = await self.nc.webdav_search("*")
+ files = [self._to_ncfile(r) for r in results if not r.get("is_dir")]
+ # lastmod 기준 정렬 (최신 먼저)
+ files.sort(key=lambda f: f.lastmod, reverse=True)
+ return files[:limit]
+
+ # ──────────────────────────────────────
+ # 공유 링크
+ # ──────────────────────────────────────
+
+ async def create_link(self, path: str, expire_days: int = 7) -> Optional[str]:
+ """파일/폴더의 공유 링크 생성.
+
+ Args:
+ path: 사용자 상대 경로
+ expire_days: 만료 일수 (0 = 만료 없음)
+
+ Returns:
+ 공유 URL 또는 None
+ """
+ # OCS Share API는 / 시작 경로 필요
+ share_path = f"/{path.lstrip('/')}"
+ return await self.nc.create_share_link(share_path, expire_days)
+
+ # ──────────────────────────────────────
+ # 용량 분석
+ # ──────────────────────────────────────
+
+ async def get_quota(self) -> dict:
+ """사용자 스토리지 용량 정보.
+
+ Returns:
+ {used, total, free, percent} (바이트 기준)
+ """
+ try:
+ dav_path = f"files/{self.nc.username}"
+ results = await self.nc.webdav_propfind(
+ dav_path,
+ props=["d:quota-used-bytes", "d:quota-available-bytes"],
+ depth=0,
+ )
+ if results:
+ prop = results[0]
+ # propfind 커스텀 필드는 직접 파싱 필요
+ # 기본 구현에서는 OCS 활용
+ return {}
+ except Exception as e:
+ logger.warning(f"용량 조회 실패: {e}")
+ return {}
+
+
+# ──────────────────────────────────────
+# CLI
+# ──────────────────────────────────────
+
+async def _cli():
+ import io
+ if sys.stdout.encoding != "utf-8":
+ sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8", errors="replace")
+
+ args = sys.argv[1:]
+ client = NCFilesClient()
+
+ if not args:
+ print("사용법:")
+ print(" nc_files.py search <검색어>")
+ print(" nc_files.py ls [경로]")
+ print(" nc_files.py recent [갯수]")
+ print(" nc_files.py link <경로>")
+ return
+
+ if args[0] == "search" and len(args) > 1:
+ query = " ".join(args[1:])
+ files = await client.search(query)
+ print(f"🔍 '{query}' 검색 결과: {len(files)}건\n")
+ for i, f in enumerate(files, 1):
+ print(f" {i}. {f.icon} {f.name} ({f.size_human})")
+ print(f" 📂 {f.path}")
+
+ elif args[0] == "ls":
+ path = args[1] if len(args) > 1 else ""
+ files = await client.list_dir(path)
+ print(f"📂 /{path or '(루트)'} — {len(files)}건\n")
+ for f in files:
+ size = f" ({f.size_human})" if not f.is_dir else ""
+ print(f" {f.icon} {f.name}{size}")
+
+ elif args[0] == "recent":
+ limit = int(args[1]) if len(args) > 1 else 10
+ files = await client.list_recent(limit)
+ print(f"🕐 최근 파일 {len(files)}건:\n")
+ for i, f in enumerate(files, 1):
+ print(f" {i}. {f.icon} {f.name} ({f.size_human})")
+ print(f" 📂 {f.path} | 📅 {f.lastmod}")
+
+ elif args[0] == "link" and len(args) > 1:
+ path = args[1]
+ url = await client.create_link(path)
+ if url:
+ print(f"🔗 {url}")
+ else:
+ print("❌ 공유 링크 생성 실패")
+
+
+if __name__ == "__main__":
+ asyncio.run(_cli())
diff --git a/tools/nc_mail.py b/tools/nc_mail.py
new file mode 100644
index 0000000..66b9bcf
--- /dev/null
+++ b/tools/nc_mail.py
@@ -0,0 +1,324 @@
+r"""Nextcloud Mail (IMAP) 모듈 — 메일 조회/검색/요약.
+
+Mailcow IMAP 직접 연결로 메일 조회.
+AI 요약 기능은 GeminiCaller를 통해 처리.
+"""
+
+import asyncio
+import email
+import email.header
+import email.message
+import imaplib
+import logging
+import os
+import re
+import sys
+from dataclasses import dataclass
+from datetime import datetime
+from typing import Optional
+
+sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
+import config
+
+logger = logging.getLogger("variet.tools.nc_mail")
+
+
+@dataclass
+class MailMessage:
+ """메일 메시지 정보."""
+ uid: str
+ subject: str
+ sender: str
+ date: str
+ snippet: str = "" # 본문 미리보기 (200자)
+ body: str = "" # 전체 본문
+ is_read: bool = False
+
+ @property
+ def sender_name(self) -> str:
+ """발신자 이름만 추출."""
+ if "<" in self.sender:
+ return self.sender.split("<")[0].strip().strip('"')
+ return self.sender
+
+ @property
+ def sender_email(self) -> str:
+ """발신자 이메일만 추출."""
+ m = re.search(r"<(.+?)>", self.sender)
+ return m.group(1) if m else self.sender
+
+ def to_display(self) -> str:
+ """Discord embed용 표시."""
+ read_icon = "📬" if not self.is_read else "📭"
+ return f"{read_icon} **{self.sender_name}** — {self.subject}\n _{self.snippet}_"
+
+
+class NCMailClient:
+ """Mailcow IMAP 메일 클라이언트."""
+
+ def __init__(
+ self,
+ host: str = None,
+ port: int = None,
+ username: str = None,
+ password: str = None,
+ ):
+ self.host = host or config.MAIL_IMAP_HOST
+ self.port = port or config.MAIL_IMAP_PORT
+ self.username = username or config.MAIL_USER
+ self.password = password or config.MAIL_PASSWORD
+
+ def _connect(self) -> imaplib.IMAP4_SSL:
+ """IMAP SSL 연결 + 로그인 (PLAIN auth — Mailcow 앱 비밀번호 호환)."""
+ conn = imaplib.IMAP4_SSL(self.host, self.port)
+ # Mailcow 앱 비밀번호는 LOGIN 커맨드 거부 → PLAIN auth 사용
+ conn.authenticate(
+ "PLAIN",
+ lambda x: (
+ "\0" + self.username + "\0" + self.password
+ ).encode(),
+ )
+ return conn
+
+ @staticmethod
+ def _decode_header(raw: str) -> str:
+ """MIME 인코딩된 헤더 디코딩."""
+ if not raw:
+ return ""
+ decoded_parts = email.header.decode_header(raw)
+ result = []
+ for part, charset in decoded_parts:
+ if isinstance(part, bytes):
+ result.append(part.decode(charset or "utf-8", errors="replace"))
+ else:
+ result.append(part)
+ return " ".join(result)
+
+ @staticmethod
+ def _extract_body(msg: email.message.Message, max_len: int = 2000) -> str:
+ """메일 본문 추출 (text/plain 우선)."""
+ body = ""
+ if msg.is_multipart():
+ for part in msg.walk():
+ ct = part.get_content_type()
+ if ct == "text/plain":
+ payload = part.get_payload(decode=True)
+ if payload:
+ charset = part.get_content_charset() or "utf-8"
+ body = payload.decode(charset, errors="replace")
+ break
+ # text/plain이 없으면 text/html에서 태그 제거
+ if not body:
+ for part in msg.walk():
+ ct = part.get_content_type()
+ if ct == "text/html":
+ payload = part.get_payload(decode=True)
+ if payload:
+ charset = part.get_content_charset() or "utf-8"
+ html = payload.decode(charset, errors="replace")
+ body = re.sub(r"<[^>]+>", "", html)
+ body = re.sub(r"\s+", " ", body).strip()
+ break
+ else:
+ payload = msg.get_payload(decode=True)
+ if payload:
+ charset = msg.get_content_charset() or "utf-8"
+ body = payload.decode(charset, errors="replace")
+
+ return body[:max_len]
+
+ def _fetch_message(self, conn: imaplib.IMAP4_SSL, uid: str, body: bool = False) -> Optional[MailMessage]:
+ """UID로 메일 메시지 가져오기."""
+ fetch_parts = "(FLAGS BODY.PEEK[HEADER])" if not body else "(FLAGS BODY.PEEK[])"
+ status, data = conn.uid("FETCH", uid, fetch_parts)
+ if status != "OK" or not data or data[0] is None:
+ return None
+
+ raw_email = data[0][1] if isinstance(data[0], tuple) else data[0]
+ if isinstance(raw_email, bytes):
+ msg = email.message_from_bytes(raw_email)
+ else:
+ return None
+
+ # FLAGS 파싱
+ flags_data = data[0][0] if isinstance(data[0], tuple) else b""
+ is_read = b"\\Seen" in flags_data
+
+ subject = self._decode_header(msg.get("Subject", ""))
+ sender = self._decode_header(msg.get("From", ""))
+ date_str = msg.get("Date", "")
+
+ mail_body = ""
+ snippet = ""
+ if body:
+ mail_body = self._extract_body(msg)
+ snippet = mail_body[:200].replace("\n", " ").strip()
+ else:
+ snippet = subject[:200]
+
+ return MailMessage(
+ uid=uid,
+ subject=subject,
+ sender=sender,
+ date=date_str,
+ snippet=snippet,
+ body=mail_body,
+ is_read=is_read,
+ )
+
+ # ──────────────────────────────────────
+ # 미확인 메일
+ # ──────────────────────────────────────
+
+ async def get_unread(self, limit: int = 5, mailbox: str = "INBOX") -> list[MailMessage]:
+ """미확인 메일 조회.
+
+ Args:
+ limit: 최대 건수
+ mailbox: 메일함 (기본: INBOX)
+ """
+ def _fetch():
+ conn = self._connect()
+ try:
+ conn.select(mailbox, readonly=True)
+ status, data = conn.uid("SEARCH", None, "UNSEEN")
+ if status != "OK":
+ return []
+ uids = data[0].split()
+ # 최신 순
+ uids = uids[-limit:] if len(uids) > limit else uids
+ uids.reverse()
+
+ messages = []
+ for uid_bytes in uids:
+ uid = uid_bytes.decode()
+ msg = self._fetch_message(conn, uid, body=True)
+ if msg:
+ messages.append(msg)
+ return messages
+ finally:
+ conn.logout()
+
+ messages = await asyncio.to_thread(_fetch)
+ logger.info(f"미확인 메일 조회: {len(messages)}건")
+ return messages
+
+ # ──────────────────────────────────────
+ # 메일 검색
+ # ──────────────────────────────────────
+
+ async def search(self, query: str, mailbox: str = "INBOX", limit: int = 10) -> list[MailMessage]:
+ """메일 검색.
+
+ Args:
+ query: IMAP SEARCH 쿼리 또는 자연어 키워드
+ mailbox: 메일함
+ limit: 최대 건수
+ """
+ def _search():
+ conn = self._connect()
+ try:
+ conn.select(mailbox, readonly=True)
+
+ # 자연어 → IMAP SEARCH 변환
+ if query.upper().startswith(("FROM", "SUBJECT", "SINCE", "BEFORE", "TO")):
+ search_criteria = query
+ else:
+ # 기본: SUBJECT + FROM 동시 검색
+ search_criteria = f'OR SUBJECT "{query}" FROM "{query}"'
+
+ status, data = conn.uid("SEARCH", None, search_criteria)
+ if status != "OK":
+ return []
+ uids = data[0].split()
+ uids = uids[-limit:] if len(uids) > limit else uids
+ uids.reverse()
+
+ messages = []
+ for uid_bytes in uids:
+ uid = uid_bytes.decode()
+ msg = self._fetch_message(conn, uid, body=False)
+ if msg:
+ messages.append(msg)
+ return messages
+ finally:
+ conn.logout()
+
+ messages = await asyncio.to_thread(_search)
+ logger.info(f"메일 검색 '{query}': {len(messages)}건")
+ return messages
+
+ # ──────────────────────────────────────
+ # 메일 본문 조회
+ # ──────────────────────────────────────
+
+ async def get_message(self, uid: str, mailbox: str = "INBOX") -> Optional[MailMessage]:
+ """메일 본문 전체 조회.
+
+ Args:
+ uid: 메일 UID
+ mailbox: 메일함
+ """
+ def _fetch():
+ conn = self._connect()
+ try:
+ conn.select(mailbox, readonly=True)
+ return self._fetch_message(conn, uid, body=True)
+ finally:
+ conn.logout()
+
+ return await asyncio.to_thread(_fetch)
+
+
+# ──────────────────────────────────────
+# CLI
+# ──────────────────────────────────────
+
+async def _cli():
+ import io
+ if sys.stdout.encoding != "utf-8":
+ sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8", errors="replace")
+
+ args = sys.argv[1:]
+ client = NCMailClient()
+
+ if not args:
+ print("사용법:")
+ print(" nc_mail.py unread [갯수]")
+ print(' nc_mail.py search "키워드"')
+ print(" nc_mail.py get ")
+ return
+
+ if args[0] == "unread":
+ limit = int(args[1]) if len(args) > 1 else 5
+ messages = await client.get_unread(limit)
+ print(f"📬 미확인 메일 {len(messages)}건:\n")
+ for i, m in enumerate(messages, 1):
+ print(f" {i}. {m.to_display()}")
+ print(f" 📅 {m.date}")
+ print()
+
+ elif args[0] == "search" and len(args) > 1:
+ query = " ".join(args[1:])
+ messages = await client.search(query)
+ print(f"🔍 '{query}' 검색 결과: {len(messages)}건\n")
+ for i, m in enumerate(messages, 1):
+ read = "📭" if m.is_read else "📬"
+ print(f" {i}. {read} [{m.uid}] {m.sender_name} — {m.subject}")
+ print(f" 📅 {m.date}")
+
+ elif args[0] == "get" and len(args) > 1:
+ uid = args[1]
+ msg = await client.get_message(uid)
+ if msg:
+ print(f"📧 {msg.subject}")
+ print(f" From: {msg.sender}")
+ print(f" Date: {msg.date}")
+ print(f"{'─' * 40}")
+ print(msg.body[:1000])
+ else:
+ print(f"❌ UID {uid} 메일을 찾을 수 없습니다.")
+
+
+if __name__ == "__main__":
+ asyncio.run(_cli())
diff --git a/tools/nextcloud_client.py b/tools/nextcloud_client.py
new file mode 100644
index 0000000..f4abff0
--- /dev/null
+++ b/tools/nextcloud_client.py
@@ -0,0 +1,430 @@
+r"""Nextcloud 공통 클라이언트 — WebDAV / OCS / CalDAV / CardDAV.
+
+모든 Nextcloud 연동 모듈(nc_files, nc_calendar, nc_mail, nc_contacts)의 기반.
+
+인증: App Password + Basic Auth
+API:
+ - WebDAV: remote.php/dav/files/{user}/
+ - OCS: ocs/v2.php/apps/...
+ - CalDAV: remote.php/dav/calendars/{user}/
+ - CardDAV: remote.php/dav/addressbooks/users/{user}/
+"""
+
+import asyncio
+import logging
+import sys
+import os
+from typing import Optional
+from xml.etree import ElementTree as ET
+
+import httpx
+
+sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
+import config
+
+logger = logging.getLogger("variet.tools.nextcloud")
+
+# XML 네임스페이스
+DAV_NS = "DAV:"
+OC_NS = "http://owncloud.org/ns"
+NC_NS = "http://nextcloud.org/ns"
+CAL_NS = "urn:ietf:params:xml:ns:caldav"
+CARD_NS = "urn:ietf:params:xml:ns:carddav"
+
+NS_MAP = {
+ "d": DAV_NS,
+ "oc": OC_NS,
+ "nc": NC_NS,
+ "cal": CAL_NS,
+ "card": CARD_NS,
+}
+
+
+class NextcloudClient:
+ """Nextcloud 공통 HTTP 클라이언트.
+
+ WebDAV, OCS, CalDAV, CardDAV 공통 기능 제공.
+ """
+
+ def __init__(
+ self,
+ base_url: str = None,
+ username: str = None,
+ app_password: str = None,
+ timeout: float = 30.0,
+ ):
+ self.base_url = (base_url or config.NEXTCLOUD_URL).rstrip("/")
+ self.username = username or config.NEXTCLOUD_USER
+ self.app_password = app_password or config.NEXTCLOUD_APP_PASSWORD
+ self.timeout = timeout
+ self._auth = (self.username, self.app_password)
+
+ def _client(self) -> httpx.AsyncClient:
+ """새 httpx 클라이언트 생성."""
+ return httpx.AsyncClient(
+ base_url=self.base_url,
+ auth=self._auth,
+ timeout=self.timeout,
+ headers={
+ "OCS-APIRequest": "true",
+ "Accept": "application/json",
+ },
+ )
+
+ # ──────────────────────────────────────
+ # OCS API (JSON)
+ # ──────────────────────────────────────
+
+ async def ocs_get(self, endpoint: str, params: dict = None) -> dict:
+ """OCS GET 요청 (JSON 응답)."""
+ url = f"/ocs/v2.php/{endpoint}"
+ if "format=json" not in url:
+ params = params or {}
+ params["format"] = "json"
+ async with self._client() as client:
+ resp = await client.get(url, params=params)
+ resp.raise_for_status()
+ return resp.json().get("ocs", {}).get("data", {})
+
+ async def ocs_post(self, endpoint: str, data: dict = None) -> dict:
+ """OCS POST 요청."""
+ url = f"/ocs/v2.php/{endpoint}"
+ async with self._client() as client:
+ resp = await client.post(
+ url,
+ data={**(data or {}), "format": "json"},
+ )
+ resp.raise_for_status()
+ return resp.json().get("ocs", {}).get("data", {})
+
+ async def ocs_delete(self, endpoint: str) -> bool:
+ """OCS DELETE 요청."""
+ url = f"/ocs/v2.php/{endpoint}"
+ async with self._client() as client:
+ resp = await client.delete(url, params={"format": "json"})
+ return resp.status_code in (200, 204)
+
+ # ──────────────────────────────────────
+ # WebDAV (XML)
+ # ──────────────────────────────────────
+
+ async def webdav_request(
+ self,
+ method: str,
+ path: str,
+ body: str = None,
+ headers: dict = None,
+ ) -> httpx.Response:
+ """WebDAV 요청 (PROPFIND, REPORT, SEARCH, PUT, DELETE 등)."""
+ url = f"/remote.php/dav/{path}"
+ req_headers = {"Content-Type": "application/xml; charset=utf-8"}
+ if headers:
+ req_headers.update(headers)
+
+ async with self._client() as client:
+ resp = await client.request(
+ method, url,
+ content=body.encode("utf-8") if body else None,
+ headers=req_headers,
+ )
+ return resp
+
+ async def webdav_propfind(
+ self, path: str, props: list[str] = None, depth: int = 1,
+ ) -> list[dict]:
+ """PROPFIND — 파일/디렉토리 속성 조회.
+
+ Returns:
+ [{href, name, size, lastmod, content_type, is_dir}, ...]
+ """
+ if props is None:
+ props = [
+ "d:getlastmodified",
+ "d:getcontentlength",
+ "d:getcontenttype",
+ "d:resourcetype",
+ "d:displayname",
+ "oc:fileid",
+ ]
+ props_xml = "\n".join(f"<{p}/>" for p in props)
+ body = f"""
+
+
+ {props_xml}
+
+"""
+
+ resp = await self.webdav_request(
+ "PROPFIND", path, body=body,
+ headers={"Depth": str(depth)},
+ )
+ resp.raise_for_status()
+ return self._parse_propfind(resp.text)
+
+ async def webdav_search(
+ self, query: str, path: str = None,
+ ) -> list[dict]:
+ """WebDAV SEARCH — 파일 검색.
+
+ Args:
+ query: 검색어 (파일명 매칭)
+ path: 검색 시작 경로 (기본: 사용자 루트)
+ """
+ search_path = path or f"files/{self.username}"
+ body = f"""
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ /remote.php/dav/{search_path}
+ infinity
+
+
+
+
+
+ %{query}%
+
+
+
+ 50
+
+
+"""
+
+ resp = await self.webdav_request("SEARCH", search_path, body=body)
+ if resp.status_code == 207:
+ return self._parse_propfind(resp.text)
+ logger.warning(f"WebDAV SEARCH 실패: {resp.status_code}")
+ return []
+
+ # ──────────────────────────────────────
+ # 파일 조작
+ # ──────────────────────────────────────
+
+ async def upload_file(
+ self, remote_path: str, data: bytes, content_type: str = "application/octet-stream",
+ ) -> bool:
+ """WebDAV PUT — 파일 업로드."""
+ url = f"files/{self.username}/{remote_path.lstrip('/')}"
+ async with self._client() as client:
+ resp = await client.put(
+ f"/remote.php/dav/{url}",
+ content=data,
+ headers={"Content-Type": content_type},
+ )
+ ok = resp.status_code in (200, 201, 204)
+ if ok:
+ logger.info(f"파일 업로드: {remote_path}")
+ else:
+ logger.error(f"업로드 실패: {resp.status_code} {resp.text[:200]}")
+ return ok
+
+ async def download_file(self, remote_path: str) -> Optional[bytes]:
+ """WebDAV GET — 파일 다운로드."""
+ url = f"files/{self.username}/{remote_path.lstrip('/')}"
+ async with self._client() as client:
+ resp = await client.get(f"/remote.php/dav/{url}")
+ if resp.status_code == 200:
+ return resp.content
+ logger.warning(f"다운로드 실패: {resp.status_code}")
+ return None
+
+ # ──────────────────────────────────────
+ # 공유 링크 (OCS Share API)
+ # ──────────────────────────────────────
+
+ async def create_share_link(
+ self, path: str, expire_days: int = 7,
+ ) -> Optional[str]:
+ """OCS 공유 링크 생성.
+
+ Returns:
+ 공유 URL 또는 None
+ """
+ data = {
+ "shareType": "3", # 3 = 공개 링크
+ "path": path,
+ }
+ if expire_days:
+ from datetime import datetime, timedelta
+ expire = (datetime.now() + timedelta(days=expire_days)).strftime("%Y-%m-%d")
+ data["expireDate"] = expire
+
+ result = await self.ocs_post("apps/files_sharing/api/v1/shares", data)
+ url = result.get("url")
+ if url:
+ logger.info(f"공유 링크 생성: {path} → {url}")
+ return url
+
+ async def delete_share(self, share_id: int) -> bool:
+ """OCS 공유 링크 삭제."""
+ return await self.ocs_delete(f"apps/files_sharing/api/v1/shares/{share_id}")
+
+ # ──────────────────────────────────────
+ # DAV (CalDAV / CardDAV 공통)
+ # ──────────────────────────────────────
+
+ async def dav_request(
+ self, method: str, path: str, body: str = None, headers: dict = None,
+ ) -> httpx.Response:
+ """CalDAV/CardDAV 요청 래퍼."""
+ return await self.webdav_request(method, path, body=body, headers=headers)
+
+ async def dav_put(
+ self, path: str, data: str, content_type: str,
+ ) -> httpx.Response:
+ """CalDAV/CardDAV PUT — 리소스 생성/수정."""
+ async with self._client() as client:
+ resp = await client.put(
+ f"/remote.php/dav/{path}",
+ content=data.encode("utf-8"),
+ headers={"Content-Type": content_type},
+ )
+ return resp
+
+ async def dav_delete(self, path: str) -> bool:
+ """CalDAV/CardDAV DELETE."""
+ async with self._client() as client:
+ resp = await client.delete(f"/remote.php/dav/{path}")
+ return resp.status_code in (200, 204)
+
+ # ──────────────────────────────────────
+ # XML 파싱 유틸
+ # ──────────────────────────────────────
+
+ @staticmethod
+ def _parse_propfind(xml_text: str) -> list[dict]:
+ """PROPFIND 응답 XML → dict 리스트."""
+ results = []
+ try:
+ root = ET.fromstring(xml_text)
+ except ET.ParseError:
+ logger.error("PROPFIND XML 파싱 실패")
+ return results
+
+ for response in root.findall(f"{{{DAV_NS}}}response"):
+ href_el = response.find(f"{{{DAV_NS}}}href")
+ if href_el is None:
+ continue
+ href = href_el.text or ""
+
+ propstat = response.find(f"{{{DAV_NS}}}propstat")
+ if propstat is None:
+ continue
+ prop = propstat.find(f"{{{DAV_NS}}}prop")
+ if prop is None:
+ continue
+
+ # 디렉토리 여부
+ rtype = prop.find(f"{{{DAV_NS}}}resourcetype")
+ is_dir = rtype is not None and rtype.find(f"{{{DAV_NS}}}collection") is not None
+
+ # 이름 추출
+ displayname = prop.find(f"{{{DAV_NS}}}displayname")
+ name = displayname.text if displayname is not None and displayname.text else href.rstrip("/").split("/")[-1]
+
+ # 크기
+ size_el = prop.find(f"{{{DAV_NS}}}getcontentlength")
+ size = int(size_el.text) if size_el is not None and size_el.text else 0
+
+ # 수정시간
+ lastmod_el = prop.find(f"{{{DAV_NS}}}getlastmodified")
+ lastmod = lastmod_el.text if lastmod_el is not None else ""
+
+ # 콘텐츠 타입
+ ctype_el = prop.find(f"{{{DAV_NS}}}getcontenttype")
+ content_type = ctype_el.text if ctype_el is not None else ""
+
+ results.append({
+ "href": href,
+ "name": name,
+ "size": size,
+ "lastmod": lastmod,
+ "content_type": content_type,
+ "is_dir": is_dir,
+ })
+
+ return results
+
+ # ──────────────────────────────────────
+ # 헬스체크
+ # ──────────────────────────────────────
+
+ async def health_check(self) -> bool:
+ """Nextcloud 연결 확인."""
+ try:
+ path = f"files/{self.username}"
+ results = await self.webdav_propfind(path, depth=0)
+ ok = len(results) > 0
+ if ok:
+ logger.info("Nextcloud 연결 확인 OK")
+ return ok
+ except Exception as e:
+ logger.error(f"Nextcloud 연결 실패: {e}")
+ return False
+
+
+# ──────────────────────────────────────
+# CLI
+# ──────────────────────────────────────
+
+async def _cli():
+ import io
+ if sys.stdout.encoding != "utf-8":
+ sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8", errors="replace")
+
+ args = sys.argv[1:]
+ client = NextcloudClient()
+
+ if not args or args[0] == "check":
+ ok = await client.health_check()
+ print(f"{'✅ 연결 성공' if ok else '❌ 연결 실패'}")
+
+ elif args[0] == "ls" and len(args) > 1:
+ path = f"files/{client.username}/{args[1].lstrip('/')}"
+ items = await client.webdav_propfind(path)
+ print(f"📂 {args[1]} — {len(items)}건:")
+ for item in items[1:]: # 첫 번째는 자기 자신
+ icon = "📁" if item["is_dir"] else "📄"
+ size = f" ({item['size']:,}B)" if not item["is_dir"] else ""
+ print(f" {icon} {item['name']}{size}")
+
+ elif args[0] == "search" and len(args) > 1:
+ query = " ".join(args[1:])
+ results = await client.webdav_search(query)
+ print(f"🔍 '{query}' 검색 결과: {len(results)}건")
+ for item in results:
+ icon = "📁" if item["is_dir"] else "📄"
+ size = f" ({item['size']:,}B)" if not item["is_dir"] else ""
+ print(f" {icon} {item['name']}{size}")
+ print(f" {item['href']}")
+
+ elif args[0] == "share" and len(args) > 1:
+ path = args[1]
+ url = await client.create_share_link(path)
+ if url:
+ print(f"🔗 {url}")
+ else:
+ print("❌ 공유 링크 생성 실패")
+
+ else:
+ print("사용법:")
+ print(" nextcloud_client.py check")
+ print(" nextcloud_client.py ls ")
+ print(" nextcloud_client.py search ")
+ print(" nextcloud_client.py share ")
+
+
+if __name__ == "__main__":
+ asyncio.run(_cli())