- tools/nextcloud_client.py: WebDAV/OCS/CalDAV/CardDAV 공통 클라이언트 - tools/nc_files.py: 파일 검색/목록/최근/공유링크 - tools/nc_calendar.py: CalDAV 일정 CRUD + ICS 빌더 - tools/nc_mail.py: IMAP 메일 조회 (PLAIN auth for Mailcow) - tools/nc_contacts.py: CardDAV 연락처 + EasyOCR 명함 스캔 - handlers/nc_handler.py: 자연어→NC도구 자동 라우팅 - core/foreman.py: 목표 분해 + 상담 세션 + Vikunja 등록 - prompts/foreman.md: Foreman 시스템 프롬프트 - prompts/unified.md: nextcloud 모드 분류 추가 - config.py: .env 따옴표 파싱 버그 수정 - api/discord_bot.py: /goal 커맨드 + Foreman 스레드 라우팅
325 lines
11 KiB
Python
325 lines
11 KiB
Python
r"""Nextcloud Mail (IMAP) 모듈 — 메일 조회/검색/요약.
|
|
|
|
Mailcow IMAP 직접 연결로 메일 조회.
|
|
AI 요약 기능은 GeminiCaller를 통해 처리.
|
|
"""
|
|
|
|
import asyncio
|
|
import email
|
|
import email.header
|
|
import email.message
|
|
import imaplib
|
|
import logging
|
|
import os
|
|
import re
|
|
import sys
|
|
from dataclasses import dataclass
|
|
from datetime import datetime
|
|
from typing import Optional
|
|
|
|
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
|
|
import config
|
|
|
|
logger = logging.getLogger("variet.tools.nc_mail")
|
|
|
|
|
|
@dataclass
|
|
class MailMessage:
|
|
"""메일 메시지 정보."""
|
|
uid: str
|
|
subject: str
|
|
sender: str
|
|
date: str
|
|
snippet: str = "" # 본문 미리보기 (200자)
|
|
body: str = "" # 전체 본문
|
|
is_read: bool = False
|
|
|
|
@property
|
|
def sender_name(self) -> str:
|
|
"""발신자 이름만 추출."""
|
|
if "<" in self.sender:
|
|
return self.sender.split("<")[0].strip().strip('"')
|
|
return self.sender
|
|
|
|
@property
|
|
def sender_email(self) -> str:
|
|
"""발신자 이메일만 추출."""
|
|
m = re.search(r"<(.+?)>", self.sender)
|
|
return m.group(1) if m else self.sender
|
|
|
|
def to_display(self) -> str:
|
|
"""Discord embed용 표시."""
|
|
read_icon = "📬" if not self.is_read else "📭"
|
|
return f"{read_icon} **{self.sender_name}** — {self.subject}\n _{self.snippet}_"
|
|
|
|
|
|
class NCMailClient:
|
|
"""Mailcow IMAP 메일 클라이언트."""
|
|
|
|
def __init__(
|
|
self,
|
|
host: str = None,
|
|
port: int = None,
|
|
username: str = None,
|
|
password: str = None,
|
|
):
|
|
self.host = host or config.MAIL_IMAP_HOST
|
|
self.port = port or config.MAIL_IMAP_PORT
|
|
self.username = username or config.MAIL_USER
|
|
self.password = password or config.MAIL_PASSWORD
|
|
|
|
def _connect(self) -> imaplib.IMAP4_SSL:
|
|
"""IMAP SSL 연결 + 로그인 (PLAIN auth — Mailcow 앱 비밀번호 호환)."""
|
|
conn = imaplib.IMAP4_SSL(self.host, self.port)
|
|
# Mailcow 앱 비밀번호는 LOGIN 커맨드 거부 → PLAIN auth 사용
|
|
conn.authenticate(
|
|
"PLAIN",
|
|
lambda x: (
|
|
"\0" + self.username + "\0" + self.password
|
|
).encode(),
|
|
)
|
|
return conn
|
|
|
|
@staticmethod
|
|
def _decode_header(raw: str) -> str:
|
|
"""MIME 인코딩된 헤더 디코딩."""
|
|
if not raw:
|
|
return ""
|
|
decoded_parts = email.header.decode_header(raw)
|
|
result = []
|
|
for part, charset in decoded_parts:
|
|
if isinstance(part, bytes):
|
|
result.append(part.decode(charset or "utf-8", errors="replace"))
|
|
else:
|
|
result.append(part)
|
|
return " ".join(result)
|
|
|
|
@staticmethod
|
|
def _extract_body(msg: email.message.Message, max_len: int = 2000) -> str:
|
|
"""메일 본문 추출 (text/plain 우선)."""
|
|
body = ""
|
|
if msg.is_multipart():
|
|
for part in msg.walk():
|
|
ct = part.get_content_type()
|
|
if ct == "text/plain":
|
|
payload = part.get_payload(decode=True)
|
|
if payload:
|
|
charset = part.get_content_charset() or "utf-8"
|
|
body = payload.decode(charset, errors="replace")
|
|
break
|
|
# text/plain이 없으면 text/html에서 태그 제거
|
|
if not body:
|
|
for part in msg.walk():
|
|
ct = part.get_content_type()
|
|
if ct == "text/html":
|
|
payload = part.get_payload(decode=True)
|
|
if payload:
|
|
charset = part.get_content_charset() or "utf-8"
|
|
html = payload.decode(charset, errors="replace")
|
|
body = re.sub(r"<[^>]+>", "", html)
|
|
body = re.sub(r"\s+", " ", body).strip()
|
|
break
|
|
else:
|
|
payload = msg.get_payload(decode=True)
|
|
if payload:
|
|
charset = msg.get_content_charset() or "utf-8"
|
|
body = payload.decode(charset, errors="replace")
|
|
|
|
return body[:max_len]
|
|
|
|
def _fetch_message(self, conn: imaplib.IMAP4_SSL, uid: str, body: bool = False) -> Optional[MailMessage]:
|
|
"""UID로 메일 메시지 가져오기."""
|
|
fetch_parts = "(FLAGS BODY.PEEK[HEADER])" if not body else "(FLAGS BODY.PEEK[])"
|
|
status, data = conn.uid("FETCH", uid, fetch_parts)
|
|
if status != "OK" or not data or data[0] is None:
|
|
return None
|
|
|
|
raw_email = data[0][1] if isinstance(data[0], tuple) else data[0]
|
|
if isinstance(raw_email, bytes):
|
|
msg = email.message_from_bytes(raw_email)
|
|
else:
|
|
return None
|
|
|
|
# FLAGS 파싱
|
|
flags_data = data[0][0] if isinstance(data[0], tuple) else b""
|
|
is_read = b"\\Seen" in flags_data
|
|
|
|
subject = self._decode_header(msg.get("Subject", ""))
|
|
sender = self._decode_header(msg.get("From", ""))
|
|
date_str = msg.get("Date", "")
|
|
|
|
mail_body = ""
|
|
snippet = ""
|
|
if body:
|
|
mail_body = self._extract_body(msg)
|
|
snippet = mail_body[:200].replace("\n", " ").strip()
|
|
else:
|
|
snippet = subject[:200]
|
|
|
|
return MailMessage(
|
|
uid=uid,
|
|
subject=subject,
|
|
sender=sender,
|
|
date=date_str,
|
|
snippet=snippet,
|
|
body=mail_body,
|
|
is_read=is_read,
|
|
)
|
|
|
|
# ──────────────────────────────────────
|
|
# 미확인 메일
|
|
# ──────────────────────────────────────
|
|
|
|
async def get_unread(self, limit: int = 5, mailbox: str = "INBOX") -> list[MailMessage]:
|
|
"""미확인 메일 조회.
|
|
|
|
Args:
|
|
limit: 최대 건수
|
|
mailbox: 메일함 (기본: INBOX)
|
|
"""
|
|
def _fetch():
|
|
conn = self._connect()
|
|
try:
|
|
conn.select(mailbox, readonly=True)
|
|
status, data = conn.uid("SEARCH", None, "UNSEEN")
|
|
if status != "OK":
|
|
return []
|
|
uids = data[0].split()
|
|
# 최신 순
|
|
uids = uids[-limit:] if len(uids) > limit else uids
|
|
uids.reverse()
|
|
|
|
messages = []
|
|
for uid_bytes in uids:
|
|
uid = uid_bytes.decode()
|
|
msg = self._fetch_message(conn, uid, body=True)
|
|
if msg:
|
|
messages.append(msg)
|
|
return messages
|
|
finally:
|
|
conn.logout()
|
|
|
|
messages = await asyncio.to_thread(_fetch)
|
|
logger.info(f"미확인 메일 조회: {len(messages)}건")
|
|
return messages
|
|
|
|
# ──────────────────────────────────────
|
|
# 메일 검색
|
|
# ──────────────────────────────────────
|
|
|
|
async def search(self, query: str, mailbox: str = "INBOX", limit: int = 10) -> list[MailMessage]:
|
|
"""메일 검색.
|
|
|
|
Args:
|
|
query: IMAP SEARCH 쿼리 또는 자연어 키워드
|
|
mailbox: 메일함
|
|
limit: 최대 건수
|
|
"""
|
|
def _search():
|
|
conn = self._connect()
|
|
try:
|
|
conn.select(mailbox, readonly=True)
|
|
|
|
# 자연어 → IMAP SEARCH 변환
|
|
if query.upper().startswith(("FROM", "SUBJECT", "SINCE", "BEFORE", "TO")):
|
|
search_criteria = query
|
|
else:
|
|
# 기본: SUBJECT + FROM 동시 검색
|
|
search_criteria = f'OR SUBJECT "{query}" FROM "{query}"'
|
|
|
|
status, data = conn.uid("SEARCH", None, search_criteria)
|
|
if status != "OK":
|
|
return []
|
|
uids = data[0].split()
|
|
uids = uids[-limit:] if len(uids) > limit else uids
|
|
uids.reverse()
|
|
|
|
messages = []
|
|
for uid_bytes in uids:
|
|
uid = uid_bytes.decode()
|
|
msg = self._fetch_message(conn, uid, body=False)
|
|
if msg:
|
|
messages.append(msg)
|
|
return messages
|
|
finally:
|
|
conn.logout()
|
|
|
|
messages = await asyncio.to_thread(_search)
|
|
logger.info(f"메일 검색 '{query}': {len(messages)}건")
|
|
return messages
|
|
|
|
# ──────────────────────────────────────
|
|
# 메일 본문 조회
|
|
# ──────────────────────────────────────
|
|
|
|
async def get_message(self, uid: str, mailbox: str = "INBOX") -> Optional[MailMessage]:
|
|
"""메일 본문 전체 조회.
|
|
|
|
Args:
|
|
uid: 메일 UID
|
|
mailbox: 메일함
|
|
"""
|
|
def _fetch():
|
|
conn = self._connect()
|
|
try:
|
|
conn.select(mailbox, readonly=True)
|
|
return self._fetch_message(conn, uid, body=True)
|
|
finally:
|
|
conn.logout()
|
|
|
|
return await asyncio.to_thread(_fetch)
|
|
|
|
|
|
# ──────────────────────────────────────
|
|
# CLI
|
|
# ──────────────────────────────────────
|
|
|
|
async def _cli():
|
|
import io
|
|
if sys.stdout.encoding != "utf-8":
|
|
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8", errors="replace")
|
|
|
|
args = sys.argv[1:]
|
|
client = NCMailClient()
|
|
|
|
if not args:
|
|
print("사용법:")
|
|
print(" nc_mail.py unread [갯수]")
|
|
print(' nc_mail.py search "키워드"')
|
|
print(" nc_mail.py get <uid>")
|
|
return
|
|
|
|
if args[0] == "unread":
|
|
limit = int(args[1]) if len(args) > 1 else 5
|
|
messages = await client.get_unread(limit)
|
|
print(f"📬 미확인 메일 {len(messages)}건:\n")
|
|
for i, m in enumerate(messages, 1):
|
|
print(f" {i}. {m.to_display()}")
|
|
print(f" 📅 {m.date}")
|
|
print()
|
|
|
|
elif args[0] == "search" and len(args) > 1:
|
|
query = " ".join(args[1:])
|
|
messages = await client.search(query)
|
|
print(f"🔍 '{query}' 검색 결과: {len(messages)}건\n")
|
|
for i, m in enumerate(messages, 1):
|
|
read = "📭" if m.is_read else "📬"
|
|
print(f" {i}. {read} [{m.uid}] {m.sender_name} — {m.subject}")
|
|
print(f" 📅 {m.date}")
|
|
|
|
elif args[0] == "get" and len(args) > 1:
|
|
uid = args[1]
|
|
msg = await client.get_message(uid)
|
|
if msg:
|
|
print(f"📧 {msg.subject}")
|
|
print(f" From: {msg.sender}")
|
|
print(f" Date: {msg.date}")
|
|
print(f"{'─' * 40}")
|
|
print(msg.body[:1000])
|
|
else:
|
|
print(f"❌ UID {uid} 메일을 찾을 수 없습니다.")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
asyncio.run(_cli())
|