/** * MessageAccumulator — 세션별 메시지 누적 관리 * * Antigravity API 한계(trajectory 최대 341 스텝)를 우회하기 위해, * bridge WS 이벤트 발생 시 cascades 스냅샷을 diff하여 변경된 * task/notify/blocking 메시지를 서버 측에서 누적 저장. */ class MessageAccumulator { constructor() { /** @type {Map} */ this.sessions = new Map(); } /** * cascade 스냅샷을 받아 diff → 새 메시지 추출 및 저장 * @returns {object[]|null} 새로 추가된 메시지 배열 (변경 없으면 null) */ processCascade(sessionId, cascade) { if (!cascade || !sessionId) return null; let session = this.sessions.get(sessionId); if (!session) { session = { messages: [], lastSnapshot: {} }; this.sessions.set(sessionId, session); } const prev = session.lastSnapshot; const newMessages = []; // 1. Task boundary 변경 감지 const tbStep = cascade.latestTaskBoundaryStep?.step; const tbIndex = cascade.latestTaskBoundaryStep?.stepIndex ?? -1; const prevTbIndex = prev.taskStepIndex ?? -1; if (tbStep?.taskBoundary && tbIndex > prevTbIndex) { const tb = tbStep.taskBoundary; newMessages.push({ type: 'task', title: tb.taskName || '', summary: tb.taskSummary || '', status: tb.taskStatus || '', mode: tb.mode || '', tools: [], time: tbStep.metadata?.createdAt || '', stepIndex: tbIndex, }); } else if (tbStep?.taskBoundary && tbIndex === prevTbIndex) { // 같은 task인데 summary/status 업데이트 → 기존 메시지 수정 const existing = session.messages.filter(m => m.type === 'task' && m.stepIndex === tbIndex).pop(); if (existing) { const tb = tbStep.taskBoundary; existing.summary = tb.taskSummary || existing.summary; existing.status = tb.taskStatus || existing.status; } } // 2. Notify user 변경 감지 const nuStep = cascade.latestNotifyUserStep?.step; const nuIndex = cascade.latestNotifyUserStep?.stepIndex ?? -1; const prevNuIndex = prev.notifyStepIndex ?? -1; if (nuStep?.notifyUser?.notificationContent && nuIndex > prevNuIndex) { newMessages.push({ type: 'text', content: nuStep.notifyUser.notificationContent, isBlocking: !!nuStep.notifyUser.isBlocking, time: nuStep.metadata?.createdAt || '', stepIndex: nuIndex, }); } // 3. 사용자 입력 감지 (텍스트는 없지만 시간으로 추정) const lastUserInput = cascade.lastUserInputTime || ''; const prevUserInput = prev.lastUserInputTime || ''; const lastUserInputIdx = cascade.lastUserInputStepIndex ?? -1; const prevUserInputIdx = prev.lastUserInputStepIndex ?? -1; if (lastUserInput && lastUserInput !== prevUserInput && lastUserInputIdx > prevUserInputIdx) { newMessages.push({ type: 'user_input', time: lastUserInput, stepIndex: lastUserInputIdx, }); } // 4. 현재 상태 계산 const currentStatus = this._computeStatus(cascade); // 스냅샷 저장 session.lastSnapshot = { taskStepIndex: tbIndex, notifyStepIndex: nuIndex, lastUserInputTime: lastUserInput, lastUserInputStepIndex: lastUserInputIdx, status: cascade.status, stepCount: cascade.stepCount, }; // 새 메시지 정렬 후 추가 if (newMessages.length > 0) { newMessages.sort((a, b) => (a.stepIndex || 0) - (b.stepIndex || 0)); session.messages.push(...newMessages); } return newMessages.length > 0 ? { newMessages, currentStatus } : { newMessages: null, currentStatus }; } /** * 현재 상태 계산 (blocking, running, waiting, idle) */ _computeStatus(cascade) { const nuStep = cascade.latestNotifyUserStep?.step; const lastUserInput = cascade.lastUserInputTime || ''; const nuTime = nuStep?.metadata?.createdAt || ''; const isRunning = cascade.status === 'CASCADE_RUN_STATUS_RUNNING'; const stepCount = cascade.stepCount || 0; // notify_user blocking: notify가 blocking이고, AI가 idle이고, 사용자 미응답 let isBlocking = false; if (nuStep?.notifyUser?.isBlocking && !isRunning) { if (!lastUserInput || lastUserInput < nuTime) { isBlocking = true; } } // stepCount 변화 추적 (승인 대기 감지용) const sessionId = cascade.trajectoryId || ''; if (!this._stepHistory) this._stepHistory = new Map(); const now = Date.now(); const hist = this._stepHistory.get(sessionId) || { count: 0, lastChangeTime: now }; let isWaiting = false; if (stepCount !== hist.count) { hist.count = stepCount; hist.lastChangeTime = now; } else if (isRunning && (now - hist.lastChangeTime) > 6000) { // RUNNING인데 6초간 stepCount 안 변함 → 사용자 응답 대기 isWaiting = true; } this._stepHistory.set(sessionId, hist); return { isRunning, isBlocking, isWaiting, stepCount, }; } /** * 세션의 전체 누적 메시지 반환 */ getMessages(sessionId) { const session = this.sessions.get(sessionId); return session ? session.messages : []; } /** * 세션의 현재 상태 반환 */ getStatus(sessionId) { const session = this.sessions.get(sessionId); if (!session?.lastSnapshot) return { isRunning: false, isBlocking: false, stepCount: 0 }; const cascade = session.lastSnapshot; return { isRunning: cascade.status === 'CASCADE_RUN_STATUS_RUNNING', isBlocking: false, // 정확한 판단은 processCascade에서만 stepCount: cascade.stepCount || 0, }; } /** * 초기 trajectory 메시지와 병합하여 전체 메시지 반환 * trajectory의 마지막 stepIndex 이후의 누적 메시지만 추가 */ getMergedMessages(sessionId, trajectoryMessages, trajLastStepIndex) { const accumulated = this.getMessages(sessionId); // trajectory 이후의 누적 메시지만 필터 const afterTrajectory = accumulated.filter(m => (m.stepIndex || 0) > trajLastStepIndex); return [...trajectoryMessages, ...afterTrajectory]; } } module.exports = MessageAccumulator;