Files
gravity_web/server/message-accumulator.js
Variet 1060476113 feat(server,frontend): real-time sync architecture with message accumulator
- Add message-accumulator.js: cascades diff-based message accumulation
- Add 3-second cascade polling with broadcastToAll (was undefined!)
- Add /api/bridge/approve endpoint: tries accept/reject Step→Command→Terminal
- Add persistent approve/reject buttons in chat header toolbar
- Frontend: loadSessionMessages (trajectory + accumulated), applyNewMessages (WS push)
- Status change detection: _prevStatusKey tracking prevents unnecessary re-renders
- actionInProgress flag prevents DOM replacement during button fetch
- Known issues: Trajectory 341 hard limit, Cascade no command-approval state
2026-03-08 14:05:59 +09:00

185 lines
6.9 KiB
JavaScript

/**
* MessageAccumulator — 세션별 메시지 누적 관리
*
* Antigravity API 한계(trajectory 최대 341 스텝)를 우회하기 위해,
* bridge WS 이벤트 발생 시 cascades 스냅샷을 diff하여 변경된
* task/notify/blocking 메시지를 서버 측에서 누적 저장.
*/
class MessageAccumulator {
constructor() {
/** @type {Map<string, {messages: object[], lastSnapshot: object}>} */
this.sessions = new Map();
}
/**
* cascade 스냅샷을 받아 diff → 새 메시지 추출 및 저장
* @returns {object[]|null} 새로 추가된 메시지 배열 (변경 없으면 null)
*/
processCascade(sessionId, cascade) {
if (!cascade || !sessionId) return null;
let session = this.sessions.get(sessionId);
if (!session) {
session = { messages: [], lastSnapshot: {} };
this.sessions.set(sessionId, session);
}
const prev = session.lastSnapshot;
const newMessages = [];
// 1. Task boundary 변경 감지
const tbStep = cascade.latestTaskBoundaryStep?.step;
const tbIndex = cascade.latestTaskBoundaryStep?.stepIndex ?? -1;
const prevTbIndex = prev.taskStepIndex ?? -1;
if (tbStep?.taskBoundary && tbIndex > prevTbIndex) {
const tb = tbStep.taskBoundary;
newMessages.push({
type: 'task',
title: tb.taskName || '',
summary: tb.taskSummary || '',
status: tb.taskStatus || '',
mode: tb.mode || '',
tools: [],
time: tbStep.metadata?.createdAt || '',
stepIndex: tbIndex,
});
} else if (tbStep?.taskBoundary && tbIndex === prevTbIndex) {
// 같은 task인데 summary/status 업데이트 → 기존 메시지 수정
const existing = session.messages.filter(m => m.type === 'task' && m.stepIndex === tbIndex).pop();
if (existing) {
const tb = tbStep.taskBoundary;
existing.summary = tb.taskSummary || existing.summary;
existing.status = tb.taskStatus || existing.status;
}
}
// 2. Notify user 변경 감지
const nuStep = cascade.latestNotifyUserStep?.step;
const nuIndex = cascade.latestNotifyUserStep?.stepIndex ?? -1;
const prevNuIndex = prev.notifyStepIndex ?? -1;
if (nuStep?.notifyUser?.notificationContent && nuIndex > prevNuIndex) {
newMessages.push({
type: 'text',
content: nuStep.notifyUser.notificationContent,
isBlocking: !!nuStep.notifyUser.isBlocking,
time: nuStep.metadata?.createdAt || '',
stepIndex: nuIndex,
});
}
// 3. 사용자 입력 감지 (텍스트는 없지만 시간으로 추정)
const lastUserInput = cascade.lastUserInputTime || '';
const prevUserInput = prev.lastUserInputTime || '';
const lastUserInputIdx = cascade.lastUserInputStepIndex ?? -1;
const prevUserInputIdx = prev.lastUserInputStepIndex ?? -1;
if (lastUserInput && lastUserInput !== prevUserInput && lastUserInputIdx > prevUserInputIdx) {
newMessages.push({
type: 'user_input',
time: lastUserInput,
stepIndex: lastUserInputIdx,
});
}
// 4. 현재 상태 계산
const currentStatus = this._computeStatus(cascade);
// 스냅샷 저장
session.lastSnapshot = {
taskStepIndex: tbIndex,
notifyStepIndex: nuIndex,
lastUserInputTime: lastUserInput,
lastUserInputStepIndex: lastUserInputIdx,
status: cascade.status,
stepCount: cascade.stepCount,
};
// 새 메시지 정렬 후 추가
if (newMessages.length > 0) {
newMessages.sort((a, b) => (a.stepIndex || 0) - (b.stepIndex || 0));
session.messages.push(...newMessages);
}
return newMessages.length > 0 ? { newMessages, currentStatus } : { newMessages: null, currentStatus };
}
/**
* 현재 상태 계산 (blocking, running, waiting, idle)
*/
_computeStatus(cascade) {
const nuStep = cascade.latestNotifyUserStep?.step;
const lastUserInput = cascade.lastUserInputTime || '';
const nuTime = nuStep?.metadata?.createdAt || '';
const isRunning = cascade.status === 'CASCADE_RUN_STATUS_RUNNING';
const stepCount = cascade.stepCount || 0;
// notify_user blocking: notify가 blocking이고, AI가 idle이고, 사용자 미응답
let isBlocking = false;
if (nuStep?.notifyUser?.isBlocking && !isRunning) {
if (!lastUserInput || lastUserInput < nuTime) {
isBlocking = true;
}
}
// stepCount 변화 추적 (승인 대기 감지용)
const sessionId = cascade.trajectoryId || '';
if (!this._stepHistory) this._stepHistory = new Map();
const now = Date.now();
const hist = this._stepHistory.get(sessionId) || { count: 0, lastChangeTime: now };
let isWaiting = false;
if (stepCount !== hist.count) {
hist.count = stepCount;
hist.lastChangeTime = now;
} else if (isRunning && (now - hist.lastChangeTime) > 6000) {
// RUNNING인데 6초간 stepCount 안 변함 → 사용자 응답 대기
isWaiting = true;
}
this._stepHistory.set(sessionId, hist);
return {
isRunning,
isBlocking,
isWaiting,
stepCount,
};
}
/**
* 세션의 전체 누적 메시지 반환
*/
getMessages(sessionId) {
const session = this.sessions.get(sessionId);
return session ? session.messages : [];
}
/**
* 세션의 현재 상태 반환
*/
getStatus(sessionId) {
const session = this.sessions.get(sessionId);
if (!session?.lastSnapshot) return { isRunning: false, isBlocking: false, stepCount: 0 };
const cascade = session.lastSnapshot;
return {
isRunning: cascade.status === 'CASCADE_RUN_STATUS_RUNNING',
isBlocking: false, // 정확한 판단은 processCascade에서만
stepCount: cascade.stepCount || 0,
};
}
/**
* 초기 trajectory 메시지와 병합하여 전체 메시지 반환
* trajectory의 마지막 stepIndex 이후의 누적 메시지만 추가
*/
getMergedMessages(sessionId, trajectoryMessages, trajLastStepIndex) {
const accumulated = this.getMessages(sessionId);
// trajectory 이후의 누적 메시지만 필터
const afterTrajectory = accumulated.filter(m => (m.stepIndex || 0) > trajLastStepIndex);
return [...trajectoryMessages, ...afterTrajectory];
}
}
module.exports = MessageAccumulator;