feat(nextcloud): Nextcloud 4모듈 + NC핸들러 + AI Foreman v0.1

- tools/nextcloud_client.py: WebDAV/OCS/CalDAV/CardDAV 공통 클라이언트
- tools/nc_files.py: 파일 검색/목록/최근/공유링크
- tools/nc_calendar.py: CalDAV 일정 CRUD + ICS 빌더
- tools/nc_mail.py: IMAP 메일 조회 (PLAIN auth for Mailcow)
- tools/nc_contacts.py: CardDAV 연락처 + EasyOCR 명함 스캔
- handlers/nc_handler.py: 자연어→NC도구 자동 라우팅
- core/foreman.py: 목표 분해 + 상담 세션 + Vikunja 등록
- prompts/foreman.md: Foreman 시스템 프롬프트
- prompts/unified.md: nextcloud 모드 분류 추가
- config.py: .env 따옴표 파싱 버그 수정
- api/discord_bot.py: /goal 커맨드 + Foreman 스레드 라우팅
This commit is contained in:
2026-03-18 17:25:27 +09:00
parent aae9c188eb
commit d22493125c
14 changed files with 2709 additions and 2 deletions

454
tools/nc_calendar.py Normal file
View File

@@ -0,0 +1,454 @@
r"""Nextcloud Calendar (CalDAV) 모듈 — 일정 CRUD.
사용자 명령 기반 일정 조회/추가/수정/삭제.
CalDAV 프로토콜로 iCalendar (VEVENT) 객체를 관리.
"""
import asyncio
import logging
import os
import re
import sys
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Optional
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
import config
from tools.nextcloud_client import NextcloudClient, DAV_NS, CAL_NS
logger = logging.getLogger("variet.tools.nc_calendar")
@dataclass
class CalEvent:
"""캘린더 이벤트."""
uid: str
summary: str
dtstart: datetime
dtend: Optional[datetime] = None
location: str = ""
description: str = ""
href: str = "" # CalDAV 리소스 경로
calendar: str = "" # 소속 캘린더 이름
raw_ics: str = "" # 원본 ICS 데이터
@property
def duration_str(self) -> str:
if self.dtend and self.dtstart:
delta = self.dtend - self.dtstart
hours = delta.seconds // 3600
mins = (delta.seconds % 3600) // 60
if hours > 0:
return f"{hours}시간 {mins}" if mins else f"{hours}시간"
return f"{mins}"
return ""
@property
def date_str(self) -> str:
return self.dtstart.strftime("%Y-%m-%d")
@property
def time_str(self) -> str:
return self.dtstart.strftime("%H:%M")
def to_display(self) -> str:
"""Discord embed용 표시 문자열."""
time = self.time_str
dur = f" ({self.duration_str})" if self.duration_str else ""
loc = f" 📍{self.location}" if self.location else ""
return f"{time}{dur} — **{self.summary}**{loc}"
class NCCalendarClient:
"""Nextcloud CalDAV 캘린더 클라이언트."""
DEFAULT_CALENDAR = "personal"
def __init__(self, nc_client: NextcloudClient = None):
self.nc = nc_client or NextcloudClient()
def _calendar_path(self, calendar: str = None) -> str:
cal = calendar or self.DEFAULT_CALENDAR
return f"calendars/{self.nc.username}/{cal}"
# ──────────────────────────────────────
# 캘린더 목록
# ──────────────────────────────────────
async def list_calendars(self) -> list[str]:
"""사용자 캘린더 목록 조회."""
path = f"calendars/{self.nc.username}/"
resp = await self.nc.dav_request(
"PROPFIND", path,
body="""<?xml version="1.0" encoding="UTF-8"?>
<d:propfind xmlns:d="DAV:" xmlns:cal="urn:ietf:params:xml:ns:caldav">
<d:prop>
<d:displayname/>
<d:resourcetype/>
</d:prop>
</d:propfind>""",
headers={"Depth": "1"},
)
calendars = []
if resp.status_code == 207:
from xml.etree import ElementTree as ET
root = ET.fromstring(resp.text)
for r in root.findall(f"{{{DAV_NS}}}response"):
href = r.find(f"{{{DAV_NS}}}href")
propstat = r.find(f"{{{DAV_NS}}}propstat")
if propstat is None:
continue
prop = propstat.find(f"{{{DAV_NS}}}prop")
if prop is None:
continue
rtype = prop.find(f"{{{DAV_NS}}}resourcetype")
if rtype is not None and rtype.find(f"{{{CAL_NS}}}calendar") is not None:
dn = prop.find(f"{{{DAV_NS}}}displayname")
name = dn.text if dn is not None and dn.text else href.text.rstrip("/").split("/")[-1]
calendars.append(name)
return calendars
# ──────────────────────────────────────
# 이벤트 조회
# ──────────────────────────────────────
async def get_events(
self,
start: datetime = None,
end: datetime = None,
calendar: str = None,
) -> list[CalEvent]:
"""기간 내 이벤트 조회.
Args:
start: 시작일 (기본: 오늘 00:00)
end: 종료일 (기본: start + 7일)
calendar: 캘린더 이름 (기본: personal)
"""
if start is None:
start = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
if end is None:
end = start + timedelta(days=7)
start_str = start.strftime("%Y%m%dT%H%M%SZ")
end_str = end.strftime("%Y%m%dT%H%M%SZ")
body = f"""<?xml version="1.0" encoding="UTF-8"?>
<cal:calendar-query xmlns:d="DAV:" xmlns:cal="urn:ietf:params:xml:ns:caldav">
<d:prop>
<d:getetag/>
<cal:calendar-data/>
</d:prop>
<cal:filter>
<cal:comp-filter name="VCALENDAR">
<cal:comp-filter name="VEVENT">
<cal:time-range start="{start_str}" end="{end_str}"/>
</cal:comp-filter>
</cal:comp-filter>
</cal:filter>
</cal:calendar-query>"""
path = self._calendar_path(calendar)
resp = await self.nc.dav_request(
"REPORT", path, body=body,
headers={"Depth": "1"},
)
events = []
if resp.status_code == 207:
from xml.etree import ElementTree as ET
root = ET.fromstring(resp.text)
for r in root.findall(f"{{{DAV_NS}}}response"):
href_el = r.find(f"{{{DAV_NS}}}href")
href = href_el.text if href_el is not None else ""
propstat = r.find(f"{{{DAV_NS}}}propstat")
if propstat is None:
continue
prop = propstat.find(f"{{{DAV_NS}}}prop")
if prop is None:
continue
cal_data = prop.find(f"{{{CAL_NS}}}calendar-data")
if cal_data is not None and cal_data.text:
event = self._parse_ics(cal_data.text, href, calendar or self.DEFAULT_CALENDAR)
if event:
events.append(event)
events.sort(key=lambda e: e.dtstart)
logger.info(f"캘린더 이벤트 조회: {len(events)}건 ({start.date()} ~ {end.date()})")
return events
async def get_today(self) -> list[CalEvent]:
"""오늘 일정 조회."""
now = datetime.now()
start = now.replace(hour=0, minute=0, second=0, microsecond=0)
end = start + timedelta(days=1)
return await self.get_events(start, end)
async def get_week(self) -> list[CalEvent]:
"""이번주 일정 조회."""
now = datetime.now()
start = now.replace(hour=0, minute=0, second=0, microsecond=0)
# 이번주 월요일부터
start = start - timedelta(days=start.weekday())
end = start + timedelta(days=7)
return await self.get_events(start, end)
# ──────────────────────────────────────
# 이벤트 생성
# ──────────────────────────────────────
async def create_event(
self,
summary: str,
dtstart: datetime,
dtend: datetime = None,
location: str = "",
description: str = "",
calendar: str = None,
) -> CalEvent:
"""이벤트 생성.
Args:
summary: 제목
dtstart: 시작 시간
dtend: 종료 시간 (기본: 시작 + 1시간)
location: 장소
description: 설명
calendar: 캘린더 이름
"""
uid = f"{uuid.uuid4()}@variet-agent"
if dtend is None:
dtend = dtstart + timedelta(hours=1)
ics = self._build_ics(uid, summary, dtstart, dtend, location, description)
path = f"{self._calendar_path(calendar)}/{uid}.ics"
resp = await self.nc.dav_put(path, ics, "text/calendar; charset=utf-8")
if resp.status_code in (200, 201, 204):
logger.info(f"일정 생성: {summary} ({dtstart})")
return CalEvent(
uid=uid, summary=summary, dtstart=dtstart, dtend=dtend,
location=location, description=description, href=path,
calendar=calendar or self.DEFAULT_CALENDAR, raw_ics=ics,
)
else:
raise RuntimeError(f"일정 생성 실패: {resp.status_code} {resp.text[:200]}")
# ──────────────────────────────────────
# 이벤트 수정
# ──────────────────────────────────────
async def update_event(
self,
uid: str,
calendar: str = None,
summary: str = None,
dtstart: datetime = None,
dtend: datetime = None,
location: str = None,
description: str = None,
) -> CalEvent:
"""이벤트 수정 — 기존 이벤트를 찾아서 변경."""
# 먼저 기존 이벤트를 찾음
events = await self.get_events(
start=datetime(2020, 1, 1),
end=datetime(2030, 12, 31),
calendar=calendar,
)
target = None
for e in events:
if e.uid == uid:
target = e
break
if target is None:
raise RuntimeError(f"이벤트 {uid}를 찾을 수 없습니다.")
# 변경 적용
new_summary = summary if summary is not None else target.summary
new_dtstart = dtstart if dtstart is not None else target.dtstart
new_dtend = dtend if dtend is not None else target.dtend
new_location = location if location is not None else target.location
new_description = description if description is not None else target.description
ics = self._build_ics(uid, new_summary, new_dtstart, new_dtend, new_location, new_description)
path = target.href or f"{self._calendar_path(calendar)}/{uid}.ics"
resp = await self.nc.dav_put(path, ics, "text/calendar; charset=utf-8")
if resp.status_code in (200, 201, 204):
logger.info(f"일정 수정: {new_summary}")
return CalEvent(
uid=uid, summary=new_summary, dtstart=new_dtstart, dtend=new_dtend,
location=new_location, description=new_description, href=path,
calendar=calendar or self.DEFAULT_CALENDAR,
)
else:
raise RuntimeError(f"일정 수정 실패: {resp.status_code}")
# ──────────────────────────────────────
# 이벤트 삭제
# ──────────────────────────────────────
async def delete_event(self, uid: str, calendar: str = None) -> bool:
"""이벤트 삭제."""
# href 직접 삭제 시도
path = f"{self._calendar_path(calendar)}/{uid}.ics"
ok = await self.nc.dav_delete(path)
if ok:
logger.info(f"일정 삭제: {uid}")
else:
logger.warning(f"일정 삭제 실패: {uid}")
return ok
# ──────────────────────────────────────
# ICS 빌드/파싱
# ──────────────────────────────────────
@staticmethod
def _build_ics(
uid: str, summary: str,
dtstart: datetime, dtend: datetime,
location: str = "", description: str = "",
) -> str:
"""VEVENT ICS 문자열 생성."""
now = datetime.utcnow().strftime("%Y%m%dT%H%M%SZ")
ds = dtstart.strftime("%Y%m%dT%H%M%S")
de = dtend.strftime("%Y%m%dT%H%M%S")
lines = [
"BEGIN:VCALENDAR",
"VERSION:2.0",
"PRODID:-//Variet-Agent//NC Calendar//KO",
"BEGIN:VEVENT",
f"UID:{uid}",
f"DTSTAMP:{now}",
f"DTSTART:{ds}",
f"DTEND:{de}",
f"SUMMARY:{summary}",
]
if location:
lines.append(f"LOCATION:{location}")
if description:
lines.append(f"DESCRIPTION:{description}")
lines.extend([
"END:VEVENT",
"END:VCALENDAR",
])
return "\r\n".join(lines)
@staticmethod
def _parse_ics(ics_text: str, href: str = "", calendar: str = "") -> Optional[CalEvent]:
"""ICS 텍스트에서 CalEvent 추출 (간단 파서)."""
def _get_field(text: str, field: str) -> str:
pattern = rf"^{field}[;:](.+)$"
m = re.search(pattern, text, re.MULTILINE)
return m.group(1).strip() if m else ""
def _parse_dt(val: str) -> Optional[datetime]:
if not val:
return None
# VALUE=DATE:20260318 형식 처리
if "VALUE=DATE:" in val:
val = val.split(":")[-1]
elif ":" in val:
val = val.split(":")[-1]
val = val.replace("Z", "")
for fmt in ("%Y%m%dT%H%M%S", "%Y%m%d"):
try:
return datetime.strptime(val, fmt)
except ValueError:
continue
return None
uid = _get_field(ics_text, "UID")
summary = _get_field(ics_text, "SUMMARY")
dtstart = _parse_dt(_get_field(ics_text, "DTSTART"))
dtend = _parse_dt(_get_field(ics_text, "DTEND"))
location = _get_field(ics_text, "LOCATION")
description = _get_field(ics_text, "DESCRIPTION")
if not uid or not dtstart:
return None
return CalEvent(
uid=uid, summary=summary, dtstart=dtstart, dtend=dtend,
location=location, description=description,
href=href, calendar=calendar, raw_ics=ics_text,
)
# ──────────────────────────────────────
# CLI
# ──────────────────────────────────────
async def _cli():
import io
if sys.stdout.encoding != "utf-8":
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8", errors="replace")
args = sys.argv[1:]
client = NCCalendarClient()
if not args:
print("사용법:")
print(" nc_calendar.py calendars")
print(" nc_calendar.py today")
print(" nc_calendar.py week")
print(' nc_calendar.py add "제목" 2026-03-20 14:00 [1h]')
print(' nc_calendar.py delete <uid>')
return
if args[0] == "calendars":
cals = await client.list_calendars()
print(f"📅 캘린더 {len(cals)}개:")
for c in cals:
print(f"{c}")
elif args[0] == "today":
events = await client.get_today()
print(f"📅 오늘 일정 ({datetime.now().strftime('%Y-%m-%d')}):\n")
if not events:
print(" 일정 없음")
for e in events:
print(f" {e.to_display()}")
elif args[0] == "week":
events = await client.get_week()
print(f"📅 이번주 일정:\n")
current_date = ""
for e in events:
if e.date_str != current_date:
current_date = e.date_str
print(f"\n 📆 {current_date}")
print(f" {e.to_display()}")
elif args[0] == "add" and len(args) >= 3:
summary = args[1]
date_str = args[2]
time_str = args[3] if len(args) > 3 else "09:00"
duration = args[4] if len(args) > 4 else "1h"
dtstart = datetime.strptime(f"{date_str} {time_str}", "%Y-%m-%d %H:%M")
# duration 파싱
dur_match = re.match(r"(\d+)([hm])", duration)
if dur_match:
val, unit = int(dur_match.group(1)), dur_match.group(2)
delta = timedelta(hours=val) if unit == "h" else timedelta(minutes=val)
else:
delta = timedelta(hours=1)
dtend = dtstart + delta
event = await client.create_event(summary, dtstart, dtend)
print(f"✅ 일정 생성: {event.to_display()}")
print(f" UID: {event.uid}")
elif args[0] == "delete" and len(args) > 1:
uid = args[1]
ok = await client.delete_event(uid)
print(f"{'✅ 삭제 완료' if ok else '❌ 삭제 실패'}: {uid}")
if __name__ == "__main__":
asyncio.run(_cli())

436
tools/nc_contacts.py Normal file
View File

@@ -0,0 +1,436 @@
r"""Nextcloud Contacts + 명함 스캔 모듈.
EasyOCR로 명함 이미지 텍스트 추출 → 정규식 필드 분류 → vCard 생성 → CardDAV 저장.
연락처 검색/조회 기능 포함.
"""
import asyncio
import logging
import os
import re
import sys
import uuid
from dataclasses import dataclass, field
from typing import Optional
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
import config
from tools.nextcloud_client import NextcloudClient, DAV_NS, CARD_NS
logger = logging.getLogger("variet.tools.nc_contacts")
@dataclass
class Contact:
"""연락처 정보."""
uid: str = ""
name: str = ""
company: str = ""
title: str = "" # 직함
phone: list[str] = field(default_factory=list)
email: list[str] = field(default_factory=list)
address: str = ""
note: str = ""
href: str = "" # CardDAV 리소스 경로
def to_vcard(self) -> str:
"""vCard 3.0 포맷 생성."""
uid = self.uid or str(uuid.uuid4())
lines = [
"BEGIN:VCARD",
"VERSION:3.0",
f"UID:{uid}",
f"FN:{self.name}",
f"N:{self.name};;;;",
]
if self.company:
lines.append(f"ORG:{self.company}")
if self.title:
lines.append(f"TITLE:{self.title}")
for p in self.phone:
lines.append(f"TEL;TYPE=CELL:{p}")
for e in self.email:
lines.append(f"EMAIL:{e}")
if self.address:
lines.append(f"ADR;TYPE=WORK:;;{self.address};;;;")
if self.note:
lines.append(f"NOTE:{self.note}")
lines.append("END:VCARD")
return "\r\n".join(lines)
def to_display(self) -> str:
"""Discord embed용 표시."""
parts = [f"👤 **{self.name}**"]
if self.title and self.company:
parts.append(f" 🏢 {self.company} / {self.title}")
elif self.company:
parts.append(f" 🏢 {self.company}")
elif self.title:
parts.append(f" 💼 {self.title}")
for p in self.phone:
parts.append(f" 📞 {p}")
for e in self.email:
parts.append(f" 📧 {e}")
if self.address:
parts.append(f" 📍 {self.address}")
return "\n".join(parts)
class NCContactsClient:
"""Nextcloud CardDAV 연락처 + 명함 스캔 클라이언트."""
DEFAULT_ADDRESSBOOK = "contacts"
def __init__(self, nc_client: NextcloudClient = None):
self.nc = nc_client or NextcloudClient()
self._ocr = None
def _addressbook_path(self, addressbook: str = None) -> str:
ab = addressbook or self.DEFAULT_ADDRESSBOOK
return f"addressbooks/users/{self.nc.username}/{ab}"
# ──────────────────────────────────────
# OCR (EasyOCR)
# ──────────────────────────────────────
def _get_ocr(self):
"""EasyOCR Reader 지연 로딩."""
if self._ocr is None:
try:
import easyocr
self._ocr = easyocr.Reader(["ko", "en", "ja"], gpu=False)
logger.info("EasyOCR 초기화 완료 (ko, en, ja)")
except ImportError:
raise RuntimeError(
"easyocr 미설치. pip install easyocr 실행 필요."
)
return self._ocr
async def scan_card(self, image_path: str) -> Contact:
"""명함 이미지 → 연락처 정보 추출.
Args:
image_path: 이미지 파일 경로 또는 URL
Returns:
추출된 Contact 객체
"""
def _ocr_extract():
reader = self._get_ocr()
results = reader.readtext(image_path)
# (bbox, text, confidence) 리스트
texts = [text for _, text, conf in results if conf > 0.3]
return texts
texts = await asyncio.to_thread(_ocr_extract)
logger.info(f"명함 OCR: {len(texts)}줄 추출")
return self._classify_fields(texts)
@staticmethod
def _classify_fields(texts: list[str]) -> Contact:
"""OCR 텍스트 → 구조화된 Contact로 분류.
정규식으로 이메일, 전화번호, URL 등을 먼저 분류하고
나머지 텍스트에서 이름/회사/직함을 추론.
"""
contact = Contact(uid=str(uuid.uuid4()))
remaining = []
for text in texts:
text = text.strip()
if not text:
continue
# 이메일
email_match = re.search(r"[\w.+-]+@[\w.-]+\.\w+", text)
if email_match:
contact.email.append(email_match.group())
continue
# 전화번호 (한국/국제)
phone_match = re.search(
r"(?:\+?\d{1,3}[-.\s]?)?"
r"(?:\(?\d{2,4}\)?[-.\s]?)?"
r"\d{3,4}[-.\s]?\d{3,4}",
text,
)
if phone_match and len(phone_match.group().replace("-", "").replace(" ", "").replace(".", "")) >= 8:
contact.phone.append(phone_match.group())
continue
# URL (웹사이트) → note에 저장
if re.match(r"(https?://|www\.)", text, re.IGNORECASE):
contact.note += f" {text}"
continue
# 주소 패턴 (번지, 로, 길, 동, 층 등)
if re.search(r"(시|구|동|로|길|층|호|번지|타워|빌딩|센터)", text):
contact.address = text
continue
remaining.append(text)
# 나머지 텍스트 분류 (이름, 회사, 직함)
if remaining:
# 보통 명함에서 가장 큰 텍스트(첫 번째)가 이름
contact.name = remaining[0]
for text in remaining[1:]:
# 직함 키워드
if re.search(r"(대표|이사|부장|과장|차장|팀장|사원|매니저|Director|Manager|CEO|CTO|VP|Engineer|Developer)", text, re.IGNORECASE):
contact.title = text
elif not contact.company:
# 회사명 (나머지 중 첫 번째)
contact.company = text
contact.note = contact.note.strip()
return contact
# ──────────────────────────────────────
# 연락처 저장 (CardDAV)
# ──────────────────────────────────────
async def create_contact(self, contact: Contact, addressbook: str = None) -> Contact:
"""연락처를 Nextcloud에 저장.
Args:
contact: Contact 객체
addressbook: 주소록 이름
Returns:
저장된 Contact (href 포함)
"""
if not contact.uid:
contact.uid = str(uuid.uuid4())
vcard = contact.to_vcard()
path = f"{self._addressbook_path(addressbook)}/{contact.uid}.vcf"
resp = await self.nc.dav_put(path, vcard, "text/vcard; charset=utf-8")
if resp.status_code in (200, 201, 204):
contact.href = path
logger.info(f"연락처 저장: {contact.name}")
return contact
else:
raise RuntimeError(f"연락처 저장 실패: {resp.status_code} {resp.text[:200]}")
# ──────────────────────────────────────
# 연락처 검색/조회
# ──────────────────────────────────────
async def search_contacts(self, query: str, addressbook: str = None) -> list[Contact]:
"""연락처 검색.
Args:
query: 검색어 (이름, 이메일, 전화번호)
"""
body = f"""<?xml version="1.0" encoding="UTF-8"?>
<card:addressbook-query xmlns:d="DAV:" xmlns:card="urn:ietf:params:xml:ns:carddav">
<d:prop>
<d:getetag/>
<card:address-data/>
</d:prop>
<card:filter>
<card:prop-filter name="FN">
<card:text-match collation="i;unicode-casemap" match-type="contains">{query}</card:text-match>
</card:prop-filter>
</card:filter>
</card:addressbook-query>"""
path = self._addressbook_path(addressbook)
resp = await self.nc.dav_request(
"REPORT", path, body=body,
headers={"Depth": "1"},
)
contacts = []
if resp.status_code == 207:
from xml.etree import ElementTree as ET
root = ET.fromstring(resp.text)
for r in root.findall(f"{{{DAV_NS}}}response"):
href_el = r.find(f"{{{DAV_NS}}}href")
href = href_el.text if href_el is not None else ""
propstat = r.find(f"{{{DAV_NS}}}propstat")
if propstat is None:
continue
prop = propstat.find(f"{{{DAV_NS}}}prop")
if prop is None:
continue
addr_data = prop.find(f"{{{CARD_NS}}}address-data")
if addr_data is not None and addr_data.text:
contact = self._parse_vcard(addr_data.text, href)
if contact:
contacts.append(contact)
logger.info(f"연락처 검색 '{query}': {len(contacts)}")
return contacts
async def list_contacts(self, addressbook: str = None) -> list[Contact]:
"""전체 연락처 목록."""
path = self._addressbook_path(addressbook)
body = """<?xml version="1.0" encoding="UTF-8"?>
<card:addressbook-query xmlns:d="DAV:" xmlns:card="urn:ietf:params:xml:ns:carddav">
<d:prop>
<d:getetag/>
<card:address-data/>
</d:prop>
</card:addressbook-query>"""
resp = await self.nc.dav_request(
"REPORT", path, body=body,
headers={"Depth": "1"},
)
contacts = []
if resp.status_code == 207:
from xml.etree import ElementTree as ET
root = ET.fromstring(resp.text)
for r in root.findall(f"{{{DAV_NS}}}response"):
href_el = r.find(f"{{{DAV_NS}}}href")
href = href_el.text if href_el is not None else ""
propstat = r.find(f"{{{DAV_NS}}}propstat")
if propstat is None:
continue
prop = propstat.find(f"{{{DAV_NS}}}prop")
if prop is None:
continue
addr_data = prop.find(f"{{{CARD_NS}}}address-data")
if addr_data is not None and addr_data.text:
contact = self._parse_vcard(addr_data.text, href)
if contact:
contacts.append(contact)
contacts.sort(key=lambda c: c.name)
return contacts
# ──────────────────────────────────────
# vCard 파싱
# ──────────────────────────────────────
@staticmethod
def _parse_vcard(vcard_text: str, href: str = "") -> Optional[Contact]:
"""vCard 텍스트 → Contact 객체."""
def _get(text: str, field: str) -> str:
pattern = rf"^{field}[;:](.+)$"
m = re.search(pattern, text, re.MULTILINE)
return m.group(1).strip() if m else ""
def _get_all(text: str, field: str) -> list[str]:
pattern = rf"^{field}[;:](.+)$"
return [m.group(1).strip() for m in re.finditer(pattern, text, re.MULTILINE)]
uid = _get(vcard_text, "UID")
fn = _get(vcard_text, "FN")
org = _get(vcard_text, "ORG")
title = _get(vcard_text, "TITLE")
note = _get(vcard_text, "NOTE")
phones = _get_all(vcard_text, "TEL")
# TEL;TYPE=CELL:010-... → 번호만 추출
phones = [re.sub(r"^.*:", "", p) if ":" in p else p for p in phones]
emails = _get_all(vcard_text, "EMAIL")
emails = [re.sub(r"^.*:", "", e) if ":" in e else e for e in emails]
adr = _get(vcard_text, "ADR")
# ADR;TYPE=WORK:;;... → 주소만 추출
if ";" in adr:
parts = adr.split(";")
adr = " ".join(p for p in parts if p).strip()
if not fn and not uid:
return None
return Contact(
uid=uid, name=fn, company=org, title=title,
phone=phones, email=emails, address=adr, note=note,
href=href,
)
# ──────────────────────────────────────
# 명함 스캔 + 저장 통합
# ──────────────────────────────────────
async def scan_and_save(
self,
image_path: str,
save_image: bool = True,
addressbook: str = None,
) -> Contact:
"""명함 스캔 → 연락처 저장 → 원본 이미지도 NC에 보관.
Args:
image_path: 이미지 파일 경로
save_image: 원본 이미지를 NC Files에도 저장할지
addressbook: 주소록 이름
"""
# 1. OCR
contact = await self.scan_card(image_path)
# 2. CardDAV 저장
contact = await self.create_contact(contact, addressbook)
# 3. 원본 이미지도 NC Files에 저장 (선택)
if save_image and os.path.exists(image_path):
try:
with open(image_path, "rb") as f:
img_data = f.read()
remote_name = f"BusinessCards/{contact.uid}.jpg"
await self.nc.upload_file(remote_name, img_data, "image/jpeg")
logger.info(f"명함 이미지 저장: {remote_name}")
except Exception as e:
logger.warning(f"명함 이미지 저장 실패: {e}")
return contact
# ──────────────────────────────────────
# CLI
# ──────────────────────────────────────
async def _cli():
import io
if sys.stdout.encoding != "utf-8":
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8", errors="replace")
args = sys.argv[1:]
client = NCContactsClient()
if not args:
print("사용법:")
print(' nc_contacts.py scan <이미지경로>')
print(' nc_contacts.py search "이름"')
print(' nc_contacts.py list')
return
if args[0] == "scan" and len(args) > 1:
image_path = args[1]
contact = await client.scan_card(image_path)
print(f"📇 명함 스캔 결과:\n")
print(contact.to_display())
print(f"\n💾 저장하려면: nc_contacts.py save <이미지경로>")
elif args[0] == "save" and len(args) > 1:
image_path = args[1]
contact = await client.scan_and_save(image_path)
print(f"✅ 연락처 저장 완료:\n")
print(contact.to_display())
elif args[0] == "search" and len(args) > 1:
query = " ".join(args[1:])
contacts = await client.search_contacts(query)
print(f"🔍 '{query}' 검색 결과: {len(contacts)}\n")
for c in contacts:
print(c.to_display())
print()
elif args[0] == "list":
contacts = await client.list_contacts()
print(f"📇 전체 연락처: {len(contacts)}\n")
for c in contacts:
print(f" 👤 {c.name}{', '.join(c.phone) or '전화 없음'}{', '.join(c.email) or '이메일 없음'}")
if __name__ == "__main__":
asyncio.run(_cli())

245
tools/nc_files.py Normal file
View File

@@ -0,0 +1,245 @@
r"""Nextcloud Files 검색/관리 모듈.
파일 검색, 최근 파일 조회, 공유 링크 생성 등.
업로드는 NC 앱에서 직접 수행 — 봇은 검색/관리/링크 역할.
"""
import asyncio
import logging
import os
import sys
from dataclasses import dataclass
from datetime import datetime
from typing import Optional
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
import config
from tools.nextcloud_client import NextcloudClient
logger = logging.getLogger("variet.tools.nc_files")
@dataclass
class NCFile:
"""Nextcloud 파일 정보."""
name: str
path: str # 사용자 상대 경로 (예: Documents/세금/...)
size: int = 0
lastmod: str = ""
content_type: str = ""
is_dir: bool = False
@property
def size_human(self) -> str:
"""사람이 읽기 쉬운 크기 표시."""
if self.size < 1024:
return f"{self.size}B"
elif self.size < 1024 * 1024:
return f"{self.size / 1024:.1f}KB"
elif self.size < 1024 * 1024 * 1024:
return f"{self.size / (1024 * 1024):.1f}MB"
else:
return f"{self.size / (1024 * 1024 * 1024):.1f}GB"
@property
def icon(self) -> str:
if self.is_dir:
return "📁"
ct = self.content_type or ""
if "image" in ct:
return "🖼️"
elif "pdf" in ct:
return "📕"
elif "video" in ct:
return "🎬"
elif "audio" in ct:
return "🎵"
elif "zip" in ct or "tar" in ct or "rar" in ct:
return "📦"
elif "spreadsheet" in ct or "excel" in ct:
return "📊"
elif "document" in ct or "word" in ct:
return "📝"
return "📄"
class NCFilesClient:
"""Nextcloud Files 검색/관리 클라이언트."""
def __init__(self, nc_client: NextcloudClient = None):
self.nc = nc_client or NextcloudClient()
def _href_to_user_path(self, href: str) -> str:
"""WebDAV href → 사용자 상대 경로."""
# href: /remote.php/dav/files/username/Documents/...
marker = f"/files/{self.nc.username}/"
idx = href.find(marker)
if idx >= 0:
return href[idx + len(marker):].rstrip("/")
return href.rstrip("/").split("/")[-1]
def _to_ncfile(self, item: dict) -> NCFile:
"""propfind 결과 dict → NCFile."""
return NCFile(
name=item.get("name", ""),
path=self._href_to_user_path(item.get("href", "")),
size=item.get("size", 0),
lastmod=item.get("lastmod", ""),
content_type=item.get("content_type", ""),
is_dir=item.get("is_dir", False),
)
# ──────────────────────────────────────
# 검색
# ──────────────────────────────────────
async def search(self, query: str) -> list[NCFile]:
"""파일 검색 (이름 기준).
Args:
query: 검색어 (부분 일치)
Returns:
매칭된 NCFile 리스트
"""
results = await self.nc.webdav_search(query)
files = [self._to_ncfile(r) for r in results]
# 디렉토리보다 파일 우선, 최신순
files.sort(key=lambda f: (f.is_dir, f.name))
logger.info(f"파일 검색 '{query}': {len(files)}")
return files
# ──────────────────────────────────────
# 목록
# ──────────────────────────────────────
async def list_dir(self, path: str = "") -> list[NCFile]:
"""디렉토리 목록 조회.
Args:
path: 사용자 상대 경로 (빈 문자열 = 루트)
"""
dav_path = f"files/{self.nc.username}/{path.lstrip('/')}"
results = await self.nc.webdav_propfind(dav_path)
# 첫 번째는 자기 자신 → 제외
files = [self._to_ncfile(r) for r in results[1:]]
files.sort(key=lambda f: (not f.is_dir, f.name.lower()))
return files
async def list_recent(self, limit: int = 10) -> list[NCFile]:
"""최근 수정된 파일 목록.
PROPFIND depth=infinity는 부하가 크므로
OCS activity API나 search를 활용.
"""
# 대안: WebDAV SEARCH로 최근 수정 파일 검색
# 현재는 루트 1단계만 조회하여 lastmod 정렬
dav_path = f"files/{self.nc.username}"
# depth=infinity는 서버 부하 → 대신 OCS favorite 또는 search 활용
# 간단하게: search로 '*' 전체 검색 후 정렬
results = await self.nc.webdav_search("*")
files = [self._to_ncfile(r) for r in results if not r.get("is_dir")]
# lastmod 기준 정렬 (최신 먼저)
files.sort(key=lambda f: f.lastmod, reverse=True)
return files[:limit]
# ──────────────────────────────────────
# 공유 링크
# ──────────────────────────────────────
async def create_link(self, path: str, expire_days: int = 7) -> Optional[str]:
"""파일/폴더의 공유 링크 생성.
Args:
path: 사용자 상대 경로
expire_days: 만료 일수 (0 = 만료 없음)
Returns:
공유 URL 또는 None
"""
# OCS Share API는 / 시작 경로 필요
share_path = f"/{path.lstrip('/')}"
return await self.nc.create_share_link(share_path, expire_days)
# ──────────────────────────────────────
# 용량 분석
# ──────────────────────────────────────
async def get_quota(self) -> dict:
"""사용자 스토리지 용량 정보.
Returns:
{used, total, free, percent} (바이트 기준)
"""
try:
dav_path = f"files/{self.nc.username}"
results = await self.nc.webdav_propfind(
dav_path,
props=["d:quota-used-bytes", "d:quota-available-bytes"],
depth=0,
)
if results:
prop = results[0]
# propfind 커스텀 필드는 직접 파싱 필요
# 기본 구현에서는 OCS 활용
return {}
except Exception as e:
logger.warning(f"용량 조회 실패: {e}")
return {}
# ──────────────────────────────────────
# CLI
# ──────────────────────────────────────
async def _cli():
import io
if sys.stdout.encoding != "utf-8":
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8", errors="replace")
args = sys.argv[1:]
client = NCFilesClient()
if not args:
print("사용법:")
print(" nc_files.py search <검색어>")
print(" nc_files.py ls [경로]")
print(" nc_files.py recent [갯수]")
print(" nc_files.py link <경로>")
return
if args[0] == "search" and len(args) > 1:
query = " ".join(args[1:])
files = await client.search(query)
print(f"🔍 '{query}' 검색 결과: {len(files)}\n")
for i, f in enumerate(files, 1):
print(f" {i}. {f.icon} {f.name} ({f.size_human})")
print(f" 📂 {f.path}")
elif args[0] == "ls":
path = args[1] if len(args) > 1 else ""
files = await client.list_dir(path)
print(f"📂 /{path or '(루트)'}{len(files)}\n")
for f in files:
size = f" ({f.size_human})" if not f.is_dir else ""
print(f" {f.icon} {f.name}{size}")
elif args[0] == "recent":
limit = int(args[1]) if len(args) > 1 else 10
files = await client.list_recent(limit)
print(f"🕐 최근 파일 {len(files)}건:\n")
for i, f in enumerate(files, 1):
print(f" {i}. {f.icon} {f.name} ({f.size_human})")
print(f" 📂 {f.path} | 📅 {f.lastmod}")
elif args[0] == "link" and len(args) > 1:
path = args[1]
url = await client.create_link(path)
if url:
print(f"🔗 {url}")
else:
print("❌ 공유 링크 생성 실패")
if __name__ == "__main__":
asyncio.run(_cli())

324
tools/nc_mail.py Normal file
View File

@@ -0,0 +1,324 @@
r"""Nextcloud Mail (IMAP) 모듈 — 메일 조회/검색/요약.
Mailcow IMAP 직접 연결로 메일 조회.
AI 요약 기능은 GeminiCaller를 통해 처리.
"""
import asyncio
import email
import email.header
import email.message
import imaplib
import logging
import os
import re
import sys
from dataclasses import dataclass
from datetime import datetime
from typing import Optional
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
import config
logger = logging.getLogger("variet.tools.nc_mail")
@dataclass
class MailMessage:
"""메일 메시지 정보."""
uid: str
subject: str
sender: str
date: str
snippet: str = "" # 본문 미리보기 (200자)
body: str = "" # 전체 본문
is_read: bool = False
@property
def sender_name(self) -> str:
"""발신자 이름만 추출."""
if "<" in self.sender:
return self.sender.split("<")[0].strip().strip('"')
return self.sender
@property
def sender_email(self) -> str:
"""발신자 이메일만 추출."""
m = re.search(r"<(.+?)>", self.sender)
return m.group(1) if m else self.sender
def to_display(self) -> str:
"""Discord embed용 표시."""
read_icon = "📬" if not self.is_read else "📭"
return f"{read_icon} **{self.sender_name}** — {self.subject}\n _{self.snippet}_"
class NCMailClient:
"""Mailcow IMAP 메일 클라이언트."""
def __init__(
self,
host: str = None,
port: int = None,
username: str = None,
password: str = None,
):
self.host = host or config.MAIL_IMAP_HOST
self.port = port or config.MAIL_IMAP_PORT
self.username = username or config.MAIL_USER
self.password = password or config.MAIL_PASSWORD
def _connect(self) -> imaplib.IMAP4_SSL:
"""IMAP SSL 연결 + 로그인 (PLAIN auth — Mailcow 앱 비밀번호 호환)."""
conn = imaplib.IMAP4_SSL(self.host, self.port)
# Mailcow 앱 비밀번호는 LOGIN 커맨드 거부 → PLAIN auth 사용
conn.authenticate(
"PLAIN",
lambda x: (
"\0" + self.username + "\0" + self.password
).encode(),
)
return conn
@staticmethod
def _decode_header(raw: str) -> str:
"""MIME 인코딩된 헤더 디코딩."""
if not raw:
return ""
decoded_parts = email.header.decode_header(raw)
result = []
for part, charset in decoded_parts:
if isinstance(part, bytes):
result.append(part.decode(charset or "utf-8", errors="replace"))
else:
result.append(part)
return " ".join(result)
@staticmethod
def _extract_body(msg: email.message.Message, max_len: int = 2000) -> str:
"""메일 본문 추출 (text/plain 우선)."""
body = ""
if msg.is_multipart():
for part in msg.walk():
ct = part.get_content_type()
if ct == "text/plain":
payload = part.get_payload(decode=True)
if payload:
charset = part.get_content_charset() or "utf-8"
body = payload.decode(charset, errors="replace")
break
# text/plain이 없으면 text/html에서 태그 제거
if not body:
for part in msg.walk():
ct = part.get_content_type()
if ct == "text/html":
payload = part.get_payload(decode=True)
if payload:
charset = part.get_content_charset() or "utf-8"
html = payload.decode(charset, errors="replace")
body = re.sub(r"<[^>]+>", "", html)
body = re.sub(r"\s+", " ", body).strip()
break
else:
payload = msg.get_payload(decode=True)
if payload:
charset = msg.get_content_charset() or "utf-8"
body = payload.decode(charset, errors="replace")
return body[:max_len]
def _fetch_message(self, conn: imaplib.IMAP4_SSL, uid: str, body: bool = False) -> Optional[MailMessage]:
"""UID로 메일 메시지 가져오기."""
fetch_parts = "(FLAGS BODY.PEEK[HEADER])" if not body else "(FLAGS BODY.PEEK[])"
status, data = conn.uid("FETCH", uid, fetch_parts)
if status != "OK" or not data or data[0] is None:
return None
raw_email = data[0][1] if isinstance(data[0], tuple) else data[0]
if isinstance(raw_email, bytes):
msg = email.message_from_bytes(raw_email)
else:
return None
# FLAGS 파싱
flags_data = data[0][0] if isinstance(data[0], tuple) else b""
is_read = b"\\Seen" in flags_data
subject = self._decode_header(msg.get("Subject", ""))
sender = self._decode_header(msg.get("From", ""))
date_str = msg.get("Date", "")
mail_body = ""
snippet = ""
if body:
mail_body = self._extract_body(msg)
snippet = mail_body[:200].replace("\n", " ").strip()
else:
snippet = subject[:200]
return MailMessage(
uid=uid,
subject=subject,
sender=sender,
date=date_str,
snippet=snippet,
body=mail_body,
is_read=is_read,
)
# ──────────────────────────────────────
# 미확인 메일
# ──────────────────────────────────────
async def get_unread(self, limit: int = 5, mailbox: str = "INBOX") -> list[MailMessage]:
"""미확인 메일 조회.
Args:
limit: 최대 건수
mailbox: 메일함 (기본: INBOX)
"""
def _fetch():
conn = self._connect()
try:
conn.select(mailbox, readonly=True)
status, data = conn.uid("SEARCH", None, "UNSEEN")
if status != "OK":
return []
uids = data[0].split()
# 최신 순
uids = uids[-limit:] if len(uids) > limit else uids
uids.reverse()
messages = []
for uid_bytes in uids:
uid = uid_bytes.decode()
msg = self._fetch_message(conn, uid, body=True)
if msg:
messages.append(msg)
return messages
finally:
conn.logout()
messages = await asyncio.to_thread(_fetch)
logger.info(f"미확인 메일 조회: {len(messages)}")
return messages
# ──────────────────────────────────────
# 메일 검색
# ──────────────────────────────────────
async def search(self, query: str, mailbox: str = "INBOX", limit: int = 10) -> list[MailMessage]:
"""메일 검색.
Args:
query: IMAP SEARCH 쿼리 또는 자연어 키워드
mailbox: 메일함
limit: 최대 건수
"""
def _search():
conn = self._connect()
try:
conn.select(mailbox, readonly=True)
# 자연어 → IMAP SEARCH 변환
if query.upper().startswith(("FROM", "SUBJECT", "SINCE", "BEFORE", "TO")):
search_criteria = query
else:
# 기본: SUBJECT + FROM 동시 검색
search_criteria = f'OR SUBJECT "{query}" FROM "{query}"'
status, data = conn.uid("SEARCH", None, search_criteria)
if status != "OK":
return []
uids = data[0].split()
uids = uids[-limit:] if len(uids) > limit else uids
uids.reverse()
messages = []
for uid_bytes in uids:
uid = uid_bytes.decode()
msg = self._fetch_message(conn, uid, body=False)
if msg:
messages.append(msg)
return messages
finally:
conn.logout()
messages = await asyncio.to_thread(_search)
logger.info(f"메일 검색 '{query}': {len(messages)}")
return messages
# ──────────────────────────────────────
# 메일 본문 조회
# ──────────────────────────────────────
async def get_message(self, uid: str, mailbox: str = "INBOX") -> Optional[MailMessage]:
"""메일 본문 전체 조회.
Args:
uid: 메일 UID
mailbox: 메일함
"""
def _fetch():
conn = self._connect()
try:
conn.select(mailbox, readonly=True)
return self._fetch_message(conn, uid, body=True)
finally:
conn.logout()
return await asyncio.to_thread(_fetch)
# ──────────────────────────────────────
# CLI
# ──────────────────────────────────────
async def _cli():
import io
if sys.stdout.encoding != "utf-8":
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8", errors="replace")
args = sys.argv[1:]
client = NCMailClient()
if not args:
print("사용법:")
print(" nc_mail.py unread [갯수]")
print(' nc_mail.py search "키워드"')
print(" nc_mail.py get <uid>")
return
if args[0] == "unread":
limit = int(args[1]) if len(args) > 1 else 5
messages = await client.get_unread(limit)
print(f"📬 미확인 메일 {len(messages)}건:\n")
for i, m in enumerate(messages, 1):
print(f" {i}. {m.to_display()}")
print(f" 📅 {m.date}")
print()
elif args[0] == "search" and len(args) > 1:
query = " ".join(args[1:])
messages = await client.search(query)
print(f"🔍 '{query}' 검색 결과: {len(messages)}\n")
for i, m in enumerate(messages, 1):
read = "📭" if m.is_read else "📬"
print(f" {i}. {read} [{m.uid}] {m.sender_name}{m.subject}")
print(f" 📅 {m.date}")
elif args[0] == "get" and len(args) > 1:
uid = args[1]
msg = await client.get_message(uid)
if msg:
print(f"📧 {msg.subject}")
print(f" From: {msg.sender}")
print(f" Date: {msg.date}")
print(f"{'' * 40}")
print(msg.body[:1000])
else:
print(f"❌ UID {uid} 메일을 찾을 수 없습니다.")
if __name__ == "__main__":
asyncio.run(_cli())

430
tools/nextcloud_client.py Normal file
View File

@@ -0,0 +1,430 @@
r"""Nextcloud 공통 클라이언트 — WebDAV / OCS / CalDAV / CardDAV.
모든 Nextcloud 연동 모듈(nc_files, nc_calendar, nc_mail, nc_contacts)의 기반.
인증: App Password + Basic Auth
API:
- WebDAV: remote.php/dav/files/{user}/
- OCS: ocs/v2.php/apps/...
- CalDAV: remote.php/dav/calendars/{user}/
- CardDAV: remote.php/dav/addressbooks/users/{user}/
"""
import asyncio
import logging
import sys
import os
from typing import Optional
from xml.etree import ElementTree as ET
import httpx
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
import config
logger = logging.getLogger("variet.tools.nextcloud")
# XML 네임스페이스
DAV_NS = "DAV:"
OC_NS = "http://owncloud.org/ns"
NC_NS = "http://nextcloud.org/ns"
CAL_NS = "urn:ietf:params:xml:ns:caldav"
CARD_NS = "urn:ietf:params:xml:ns:carddav"
NS_MAP = {
"d": DAV_NS,
"oc": OC_NS,
"nc": NC_NS,
"cal": CAL_NS,
"card": CARD_NS,
}
class NextcloudClient:
"""Nextcloud 공통 HTTP 클라이언트.
WebDAV, OCS, CalDAV, CardDAV 공통 기능 제공.
"""
def __init__(
self,
base_url: str = None,
username: str = None,
app_password: str = None,
timeout: float = 30.0,
):
self.base_url = (base_url or config.NEXTCLOUD_URL).rstrip("/")
self.username = username or config.NEXTCLOUD_USER
self.app_password = app_password or config.NEXTCLOUD_APP_PASSWORD
self.timeout = timeout
self._auth = (self.username, self.app_password)
def _client(self) -> httpx.AsyncClient:
"""새 httpx 클라이언트 생성."""
return httpx.AsyncClient(
base_url=self.base_url,
auth=self._auth,
timeout=self.timeout,
headers={
"OCS-APIRequest": "true",
"Accept": "application/json",
},
)
# ──────────────────────────────────────
# OCS API (JSON)
# ──────────────────────────────────────
async def ocs_get(self, endpoint: str, params: dict = None) -> dict:
"""OCS GET 요청 (JSON 응답)."""
url = f"/ocs/v2.php/{endpoint}"
if "format=json" not in url:
params = params or {}
params["format"] = "json"
async with self._client() as client:
resp = await client.get(url, params=params)
resp.raise_for_status()
return resp.json().get("ocs", {}).get("data", {})
async def ocs_post(self, endpoint: str, data: dict = None) -> dict:
"""OCS POST 요청."""
url = f"/ocs/v2.php/{endpoint}"
async with self._client() as client:
resp = await client.post(
url,
data={**(data or {}), "format": "json"},
)
resp.raise_for_status()
return resp.json().get("ocs", {}).get("data", {})
async def ocs_delete(self, endpoint: str) -> bool:
"""OCS DELETE 요청."""
url = f"/ocs/v2.php/{endpoint}"
async with self._client() as client:
resp = await client.delete(url, params={"format": "json"})
return resp.status_code in (200, 204)
# ──────────────────────────────────────
# WebDAV (XML)
# ──────────────────────────────────────
async def webdav_request(
self,
method: str,
path: str,
body: str = None,
headers: dict = None,
) -> httpx.Response:
"""WebDAV 요청 (PROPFIND, REPORT, SEARCH, PUT, DELETE 등)."""
url = f"/remote.php/dav/{path}"
req_headers = {"Content-Type": "application/xml; charset=utf-8"}
if headers:
req_headers.update(headers)
async with self._client() as client:
resp = await client.request(
method, url,
content=body.encode("utf-8") if body else None,
headers=req_headers,
)
return resp
async def webdav_propfind(
self, path: str, props: list[str] = None, depth: int = 1,
) -> list[dict]:
"""PROPFIND — 파일/디렉토리 속성 조회.
Returns:
[{href, name, size, lastmod, content_type, is_dir}, ...]
"""
if props is None:
props = [
"d:getlastmodified",
"d:getcontentlength",
"d:getcontenttype",
"d:resourcetype",
"d:displayname",
"oc:fileid",
]
props_xml = "\n".join(f"<{p}/>" for p in props)
body = f"""<?xml version="1.0" encoding="UTF-8"?>
<d:propfind xmlns:d="DAV:" xmlns:oc="http://owncloud.org/ns" xmlns:nc="http://nextcloud.org/ns">
<d:prop>
{props_xml}
</d:prop>
</d:propfind>"""
resp = await self.webdav_request(
"PROPFIND", path, body=body,
headers={"Depth": str(depth)},
)
resp.raise_for_status()
return self._parse_propfind(resp.text)
async def webdav_search(
self, query: str, path: str = None,
) -> list[dict]:
"""WebDAV SEARCH — 파일 검색.
Args:
query: 검색어 (파일명 매칭)
path: 검색 시작 경로 (기본: 사용자 루트)
"""
search_path = path or f"files/{self.username}"
body = f"""<?xml version="1.0" encoding="UTF-8"?>
<d:searchrequest xmlns:d="DAV:" xmlns:oc="http://owncloud.org/ns">
<d:basicsearch>
<d:select>
<d:prop>
<d:displayname/>
<d:getcontentlength/>
<d:getlastmodified/>
<d:getcontenttype/>
<d:resourcetype/>
<oc:fileid/>
</d:prop>
</d:select>
<d:from>
<d:scope>
<d:href>/remote.php/dav/{search_path}</d:href>
<d:depth>infinity</d:depth>
</d:scope>
</d:from>
<d:where>
<d:like>
<d:prop><d:displayname/></d:prop>
<d:literal>%{query}%</d:literal>
</d:like>
</d:where>
<d:limit>
<d:nresults>50</d:nresults>
</d:limit>
</d:basicsearch>
</d:searchrequest>"""
resp = await self.webdav_request("SEARCH", search_path, body=body)
if resp.status_code == 207:
return self._parse_propfind(resp.text)
logger.warning(f"WebDAV SEARCH 실패: {resp.status_code}")
return []
# ──────────────────────────────────────
# 파일 조작
# ──────────────────────────────────────
async def upload_file(
self, remote_path: str, data: bytes, content_type: str = "application/octet-stream",
) -> bool:
"""WebDAV PUT — 파일 업로드."""
url = f"files/{self.username}/{remote_path.lstrip('/')}"
async with self._client() as client:
resp = await client.put(
f"/remote.php/dav/{url}",
content=data,
headers={"Content-Type": content_type},
)
ok = resp.status_code in (200, 201, 204)
if ok:
logger.info(f"파일 업로드: {remote_path}")
else:
logger.error(f"업로드 실패: {resp.status_code} {resp.text[:200]}")
return ok
async def download_file(self, remote_path: str) -> Optional[bytes]:
"""WebDAV GET — 파일 다운로드."""
url = f"files/{self.username}/{remote_path.lstrip('/')}"
async with self._client() as client:
resp = await client.get(f"/remote.php/dav/{url}")
if resp.status_code == 200:
return resp.content
logger.warning(f"다운로드 실패: {resp.status_code}")
return None
# ──────────────────────────────────────
# 공유 링크 (OCS Share API)
# ──────────────────────────────────────
async def create_share_link(
self, path: str, expire_days: int = 7,
) -> Optional[str]:
"""OCS 공유 링크 생성.
Returns:
공유 URL 또는 None
"""
data = {
"shareType": "3", # 3 = 공개 링크
"path": path,
}
if expire_days:
from datetime import datetime, timedelta
expire = (datetime.now() + timedelta(days=expire_days)).strftime("%Y-%m-%d")
data["expireDate"] = expire
result = await self.ocs_post("apps/files_sharing/api/v1/shares", data)
url = result.get("url")
if url:
logger.info(f"공유 링크 생성: {path}{url}")
return url
async def delete_share(self, share_id: int) -> bool:
"""OCS 공유 링크 삭제."""
return await self.ocs_delete(f"apps/files_sharing/api/v1/shares/{share_id}")
# ──────────────────────────────────────
# DAV (CalDAV / CardDAV 공통)
# ──────────────────────────────────────
async def dav_request(
self, method: str, path: str, body: str = None, headers: dict = None,
) -> httpx.Response:
"""CalDAV/CardDAV 요청 래퍼."""
return await self.webdav_request(method, path, body=body, headers=headers)
async def dav_put(
self, path: str, data: str, content_type: str,
) -> httpx.Response:
"""CalDAV/CardDAV PUT — 리소스 생성/수정."""
async with self._client() as client:
resp = await client.put(
f"/remote.php/dav/{path}",
content=data.encode("utf-8"),
headers={"Content-Type": content_type},
)
return resp
async def dav_delete(self, path: str) -> bool:
"""CalDAV/CardDAV DELETE."""
async with self._client() as client:
resp = await client.delete(f"/remote.php/dav/{path}")
return resp.status_code in (200, 204)
# ──────────────────────────────────────
# XML 파싱 유틸
# ──────────────────────────────────────
@staticmethod
def _parse_propfind(xml_text: str) -> list[dict]:
"""PROPFIND 응답 XML → dict 리스트."""
results = []
try:
root = ET.fromstring(xml_text)
except ET.ParseError:
logger.error("PROPFIND XML 파싱 실패")
return results
for response in root.findall(f"{{{DAV_NS}}}response"):
href_el = response.find(f"{{{DAV_NS}}}href")
if href_el is None:
continue
href = href_el.text or ""
propstat = response.find(f"{{{DAV_NS}}}propstat")
if propstat is None:
continue
prop = propstat.find(f"{{{DAV_NS}}}prop")
if prop is None:
continue
# 디렉토리 여부
rtype = prop.find(f"{{{DAV_NS}}}resourcetype")
is_dir = rtype is not None and rtype.find(f"{{{DAV_NS}}}collection") is not None
# 이름 추출
displayname = prop.find(f"{{{DAV_NS}}}displayname")
name = displayname.text if displayname is not None and displayname.text else href.rstrip("/").split("/")[-1]
# 크기
size_el = prop.find(f"{{{DAV_NS}}}getcontentlength")
size = int(size_el.text) if size_el is not None and size_el.text else 0
# 수정시간
lastmod_el = prop.find(f"{{{DAV_NS}}}getlastmodified")
lastmod = lastmod_el.text if lastmod_el is not None else ""
# 콘텐츠 타입
ctype_el = prop.find(f"{{{DAV_NS}}}getcontenttype")
content_type = ctype_el.text if ctype_el is not None else ""
results.append({
"href": href,
"name": name,
"size": size,
"lastmod": lastmod,
"content_type": content_type,
"is_dir": is_dir,
})
return results
# ──────────────────────────────────────
# 헬스체크
# ──────────────────────────────────────
async def health_check(self) -> bool:
"""Nextcloud 연결 확인."""
try:
path = f"files/{self.username}"
results = await self.webdav_propfind(path, depth=0)
ok = len(results) > 0
if ok:
logger.info("Nextcloud 연결 확인 OK")
return ok
except Exception as e:
logger.error(f"Nextcloud 연결 실패: {e}")
return False
# ──────────────────────────────────────
# CLI
# ──────────────────────────────────────
async def _cli():
import io
if sys.stdout.encoding != "utf-8":
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8", errors="replace")
args = sys.argv[1:]
client = NextcloudClient()
if not args or args[0] == "check":
ok = await client.health_check()
print(f"{'✅ 연결 성공' if ok else '❌ 연결 실패'}")
elif args[0] == "ls" and len(args) > 1:
path = f"files/{client.username}/{args[1].lstrip('/')}"
items = await client.webdav_propfind(path)
print(f"📂 {args[1]}{len(items)}건:")
for item in items[1:]: # 첫 번째는 자기 자신
icon = "📁" if item["is_dir"] else "📄"
size = f" ({item['size']:,}B)" if not item["is_dir"] else ""
print(f" {icon} {item['name']}{size}")
elif args[0] == "search" and len(args) > 1:
query = " ".join(args[1:])
results = await client.webdav_search(query)
print(f"🔍 '{query}' 검색 결과: {len(results)}")
for item in results:
icon = "📁" if item["is_dir"] else "📄"
size = f" ({item['size']:,}B)" if not item["is_dir"] else ""
print(f" {icon} {item['name']}{size}")
print(f" {item['href']}")
elif args[0] == "share" and len(args) > 1:
path = args[1]
url = await client.create_share_link(path)
if url:
print(f"🔗 {url}")
else:
print("❌ 공유 링크 생성 실패")
else:
print("사용법:")
print(" nextcloud_client.py check")
print(" nextcloud_client.py ls <path>")
print(" nextcloud_client.py search <query>")
print(" nextcloud_client.py share <path>")
if __name__ == "__main__":
asyncio.run(_cli())