r"""Nextcloud Mail (IMAP) module: list, search, and read mail.

Reads mail by connecting directly to the Mailcow IMAP server.
AI summarisation is handled through GeminiCaller (not in this module).
"""

import asyncio
import email
import email.errors
import email.header
import email.message
import imaplib
import logging
import os
import re
import sys
from dataclasses import dataclass
from datetime import datetime
from typing import Optional

# Make the parent directory importable so the project-level ``config`` resolves.
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
import config

logger = logging.getLogger("variet.tools.nc_mail")


@dataclass
class MailMessage:
    """A single mail message as returned by ``NCMailClient``."""

    uid: str
    subject: str
    sender: str
    date: str
    snippet: str = ""  # short preview text (at most 200 characters)
    body: str = ""  # extracted body text (empty when only headers were fetched)
    is_read: bool = False

    @property
    def sender_name(self) -> str:
        """Return the display-name part of the sender.

        Falls back to the bare address when the header has the form
        ``<addr>`` with no display name, so callers never get an empty name
        for a non-empty sender.
        """
        if "<" in self.sender:
            name = self.sender.split("<")[0].strip().strip('"')
            return name or self.sender_email
        return self.sender

    @property
    def sender_email(self) -> str:
        """Return the address inside ``<...>``, or the raw sender if absent."""
        m = re.search(r"<(.+?)>", self.sender)
        return m.group(1) if m else self.sender

    def to_display(self) -> str:
        """Format the message for display in a Discord embed."""
        read_icon = "📬" if not self.is_read else "📭"
        return f"{read_icon} **{self.sender_name}** — {self.subject}\n _{self.snippet}_"


class NCMailClient:
    """IMAP mail client for a Mailcow server.

    Every public coroutine opens its own IMAP connection, runs the blocking
    imaplib calls in a worker thread, and logs out when done.
    """

    # A query whose first word is one of these (and that has an argument) is
    # passed to the server unchanged as raw IMAP SEARCH criteria.
    _RAW_SEARCH_KEYS = frozenset({"FROM", "SUBJECT", "SINCE", "BEFORE", "TO"})

    def __init__(
        self,
        host: Optional[str] = None,
        port: Optional[int] = None,
        username: Optional[str] = None,
        password: Optional[str] = None,
    ):
        """Store connection settings; falsy arguments fall back to ``config``."""
        self.host = host or config.MAIL_IMAP_HOST
        self.port = port or config.MAIL_IMAP_PORT
        self.username = username or config.MAIL_USER
        self.password = password or config.MAIL_PASSWORD

    def _connect(self) -> imaplib.IMAP4_SSL:
        """Open an IMAP SSL connection and log in with PLAIN auth.

        Mailcow app passwords are rejected by the LOGIN command, so SASL
        PLAIN is used instead.

        Raises:
            imaplib.IMAP4.error: if authentication fails. The socket is
                closed before the exception propagates.
        """
        conn = imaplib.IMAP4_SSL(self.host, self.port)
        try:
            conn.authenticate(
                "PLAIN",
                lambda _challenge: (
                    "\0" + self.username + "\0" + self.password
                ).encode(),
            )
        except Exception:
            # Do not leak the TLS socket when login fails.
            conn.shutdown()
            raise
        return conn

    @staticmethod
    def _decode_bytes(data: bytes, charset: Optional[str]) -> str:
        """Decode *data* with *charset*, falling back to UTF-8.

        Unknown or bogus charset labels (e.g. ``unknown-8bit``) raise
        ``LookupError`` in ``bytes.decode``; those are decoded as UTF-8 with
        replacement characters instead of aborting the whole fetch.
        """
        try:
            return data.decode(charset or "utf-8", errors="replace")
        except LookupError:
            return data.decode("utf-8", errors="replace")

    @staticmethod
    def _decode_header(raw: str) -> str:
        """Decode a MIME (RFC 2047) encoded header value to text."""
        if not raw:
            return ""
        try:
            decoded_parts = email.header.decode_header(raw)
        except email.errors.HeaderParseError:
            # Malformed encoded-word: show the raw value rather than fail.
            return str(raw)
        result = []
        for part, charset in decoded_parts:
            if isinstance(part, bytes):
                result.append(NCMailClient._decode_bytes(part, charset))
            else:
                result.append(part)
        # decode_header keeps significant whitespace inside the unencoded
        # chunks and drops the insignificant whitespace between adjacent
        # encoded words, so the chunks are concatenated without a separator.
        return "".join(result)

    @staticmethod
    def _strip_html(markup: str) -> str:
        """Remove tags from *markup* and collapse runs of whitespace."""
        text = re.sub(r"<[^>]+>", "", markup)
        return re.sub(r"\s+", " ", text).strip()

    @staticmethod
    def _first_part_text(msg: email.message.Message, content_type: str) -> str:
        """Return the decoded text of the first non-empty *content_type* part."""
        for part in msg.walk():
            if part.get_content_type() != content_type:
                continue
            payload = part.get_payload(decode=True)
            if payload:
                return NCMailClient._decode_bytes(
                    payload, part.get_content_charset()
                )
        return ""

    @staticmethod
    def _extract_body(msg: email.message.Message, max_len: int = 2000) -> str:
        """Extract the message body as text, preferring ``text/plain``.

        When no plain-text part has content, the first ``text/html`` part is
        used with its tags stripped. A non-multipart ``text/html`` message is
        stripped the same way.

        Args:
            msg: parsed message.
            max_len: maximum number of characters to return.
        """
        if msg.is_multipart():
            body = NCMailClient._first_part_text(msg, "text/plain")
            if not body:
                body = NCMailClient._strip_html(
                    NCMailClient._first_part_text(msg, "text/html")
                )
            return body[:max_len]

        payload = msg.get_payload(decode=True)
        if not payload:
            return ""
        body = NCMailClient._decode_bytes(payload, msg.get_content_charset())
        if msg.get_content_type() == "text/html":
            body = NCMailClient._strip_html(body)
        return body[:max_len]

    def _fetch_message(
        self, conn: imaplib.IMAP4_SSL, uid: str, body: bool = False
    ) -> Optional[MailMessage]:
        """Fetch one message by UID.

        Args:
            conn: authenticated connection with a mailbox selected.
            uid: message UID.
            body: fetch the full message when true, headers only otherwise.
                ``BODY.PEEK`` is used either way so the \Seen flag is not set.

        Returns:
            The message, or ``None`` when the fetch fails or the response
            carries no message literal.
        """
        fetch_parts = "(FLAGS BODY.PEEK[])" if body else "(FLAGS BODY.PEEK[HEADER])"
        status, data = conn.uid("FETCH", uid, fetch_parts)
        if status != "OK" or not data or data[0] is None:
            return None

        # imaplib returns the literal as a (metadata, payload) tuple. The
        # server chooses the order of FETCH items, so FLAGS may appear either
        # in the metadata before the literal or in the bytes element after it.
        raw_email = None
        meta = b""
        for index, item in enumerate(data):
            if isinstance(item, tuple) and len(item) >= 2 and isinstance(item[1], bytes):
                raw_email = item[1]
                meta = item[0] or b""
                trailer = data[index + 1] if index + 1 < len(data) else b""
                if isinstance(trailer, bytes):
                    meta += trailer
                break
        if raw_email is None:
            return None

        msg = email.message_from_bytes(raw_email)
        is_read = b"\\Seen" in meta

        subject = self._decode_header(msg.get("Subject", ""))
        sender = self._decode_header(msg.get("From", ""))
        date_str = msg.get("Date", "")

        mail_body = ""
        if body:
            mail_body = self._extract_body(msg)
            snippet = mail_body[:200].replace("\n", " ").strip()
        else:
            # No body was fetched, so the subject doubles as the preview.
            snippet = subject[:200]

        return MailMessage(
            uid=uid,
            subject=subject,
            sender=sender,
            date=date_str,
            snippet=snippet,
            body=mail_body,
            is_read=is_read,
        )

    @staticmethod
    def _parse_uids(data) -> list[bytes]:
        """Split a SEARCH response into UIDs.

        imaplib yields ``[None]`` when the server sends no untagged SEARCH
        response, which is treated as "no matches".
        """
        if not data or not data[0]:
            return []
        return data[0].split()

    @staticmethod
    def _newest_first(uids: list[bytes], limit: int) -> list[str]:
        """Return at most *limit* of the highest UIDs, newest first.

        A non-positive *limit* yields an empty list; a plain ``uids[-limit:]``
        would return everything for ``limit == 0``.
        """
        if limit <= 0:
            return []
        return [uid.decode() for uid in reversed(uids[-limit:])]

    def _fetch_many(
        self, conn: imaplib.IMAP4_SSL, uids: list[str], body: bool
    ) -> list[MailMessage]:
        """Fetch each UID in order, skipping messages that cannot be fetched."""
        fetched = (self._fetch_message(conn, uid, body=body) for uid in uids)
        return [msg for msg in fetched if msg]

    def _search_uids(self, conn: imaplib.IMAP4_SSL, query: str) -> list[bytes]:
        """Run *query* against the selected mailbox and return matching UIDs.

        A query of the form ``<KEY> <argument>`` with KEY in
        ``_RAW_SEARCH_KEYS`` is sent unchanged. Anything else is treated as a
        keyword and matched against both Subject and From.
        """
        # CR/LF would terminate the IMAP command line; never send them.
        cleaned = re.sub(r"[\r\n]+", " ", query)

        # Match the first *word*, not a prefix, so keywords such as "Tokyo"
        # or "together" are not mistaken for raw ``TO`` criteria.
        tokens = cleaned.split(None, 1)
        is_raw = len(tokens) == 2 and tokens[0].upper() in self._RAW_SEARCH_KEYS

        if is_raw or cleaned.isascii():
            if is_raw:
                search_criteria = cleaned
            else:
                # IMAP quoted strings require backslash and quote escaping.
                escaped = cleaned.replace("\\", "\\\\").replace('"', '\\"')
                search_criteria = f'OR SUBJECT "{escaped}" FROM "{escaped}"'
            status, data = conn.uid("SEARCH", None, search_criteria)
            return self._parse_uids(data) if status == "OK" else []

        # Non-ASCII keyword: imaplib encodes str arguments as ASCII and quoted
        # strings may not carry 8-bit data, so the keyword is sent as a UTF-8
        # literal. imaplib attaches only one literal per command, hence one
        # search per header, merged afterwards.
        # NOTE(review): relies on imaplib's ``literal`` attribute — confirm
        # against the target Python version.
        needle = cleaned.encode("utf-8")
        found = set()
        for key in ("SUBJECT", "FROM"):
            conn.literal = needle
            status, data = conn.uid("SEARCH", "CHARSET", "UTF-8", key)
            if status != "OK":
                return []
            found.update(self._parse_uids(data))
        return sorted(found, key=int)

    # ──────────────────────────────────────
    # Unread mail
    # ──────────────────────────────────────

    async def get_unread(self, limit: int = 5, mailbox: str = "INBOX") -> list[MailMessage]:
        """Return unread messages, newest first, with bodies.

        Args:
            limit: maximum number of messages; non-positive yields ``[]``.
            mailbox: mailbox name (default: INBOX).
        """

        def _fetch():
            conn = self._connect()
            try:
                conn.select(mailbox, readonly=True)
                status, data = conn.uid("SEARCH", None, "UNSEEN")
                if status != "OK":
                    return []
                uids = self._newest_first(self._parse_uids(data), limit)
                return self._fetch_many(conn, uids, body=True)
            finally:
                conn.logout()

        messages = await asyncio.to_thread(_fetch)
        logger.info("미확인 메일 조회: %d건", len(messages))
        return messages

    # ──────────────────────────────────────
    # Mail search
    # ──────────────────────────────────────

    async def search(self, query: str, mailbox: str = "INBOX", limit: int = 10) -> list[MailMessage]:
        """Search messages and return header-only results, newest first.

        Args:
            query: raw IMAP SEARCH criteria (``FROM x``, ``SUBJECT x``,
                ``SINCE d``, ``BEFORE d``, ``TO x``) or a free-text keyword
                matched against Subject and From.
            mailbox: mailbox name.
            limit: maximum number of messages; non-positive yields ``[]``.
        """

        def _search():
            conn = self._connect()
            try:
                conn.select(mailbox, readonly=True)
                uids = self._newest_first(self._search_uids(conn, query), limit)
                return self._fetch_many(conn, uids, body=False)
            finally:
                conn.logout()

        messages = await asyncio.to_thread(_search)
        logger.info("메일 검색 '%s': %d건", query, len(messages))
        return messages

    # ──────────────────────────────────────
    # Full message retrieval
    # ──────────────────────────────────────

    async def get_message(self, uid: str, mailbox: str = "INBOX") -> Optional[MailMessage]:
        """Return one message including its body, or ``None`` if not found.

        Args:
            uid: message UID.
            mailbox: mailbox name.
        """

        def _fetch():
            conn = self._connect()
            try:
                conn.select(mailbox, readonly=True)
                return self._fetch_message(conn, uid, body=True)
            finally:
                conn.logout()

        return await asyncio.to_thread(_fetch)


# ──────────────────────────────────────
# CLI
# ──────────────────────────────────────


def _print_usage() -> None:
    """Print the command-line usage text."""
    print("사용법:")
    print(" nc_mail.py unread [갯수]")
    print(' nc_mail.py search "키워드"')
    print(" nc_mail.py get <UID>")


async def _cli():
    """Command-line entry point: ``unread [n]``, ``search <kw>``, ``get <uid>``.

    Prints the usage text when the arguments are missing, unrecognised, or
    malformed instead of exiting silently or with a traceback.
    """
    import io

    # Force UTF-8 output so emoji and Korean text print on any console.
    current = (sys.stdout.encoding or "").lower().replace("-", "")
    if current != "utf8":
        sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8", errors="replace")

    args = sys.argv[1:]
    if not args:
        _print_usage()
        return

    client = NCMailClient()
    command = args[0]

    if command == "unread":
        try:
            limit = int(args[1]) if len(args) > 1 else 5
        except ValueError:
            _print_usage()
            return
        messages = await client.get_unread(limit)
        print(f"📬 미확인 메일 {len(messages)}건:\n")
        for i, m in enumerate(messages, 1):
            print(f" {i}. {m.to_display()}")
            print(f" 📅 {m.date}")
            print()

    elif command == "search" and len(args) > 1:
        query = " ".join(args[1:])
        messages = await client.search(query)
        print(f"🔍 '{query}' 검색 결과: {len(messages)}건\n")
        for i, m in enumerate(messages, 1):
            read = "📭" if m.is_read else "📬"
            print(f" {i}. {read} [{m.uid}] {m.sender_name} — {m.subject}")
            print(f" 📅 {m.date}")

    elif command == "get" and len(args) > 1:
        uid = args[1]
        msg = await client.get_message(uid)
        if msg:
            print(f"📧 {msg.subject}")
            print(f" From: {msg.sender}")
            print(f" Date: {msg.date}")
            print(f"{'─' * 40}")
            print(msg.body[:1000])
        else:
            print(f"❌ UID {uid} 메일을 찾을 수 없습니다.")

    else:
        _print_usage()


if __name__ == "__main__":
    asyncio.run(_cli())