feat(server,frontend): real-time sync architecture with message accumulator
- Add message-accumulator.js: cascades diff-based message accumulation - Add 3-second cascade polling with broadcastToAll (was undefined!) - Add /api/bridge/approve endpoint: tries accept/reject Step→Command→Terminal - Add persistent approve/reject buttons in chat header toolbar - Frontend: loadSessionMessages (trajectory + accumulated), applyNewMessages (WS push) - Status change detection: _prevStatusKey tracking prevents unnecessary re-renders - actionInProgress flag prevents DOM replacement during button fetch - Known issues: Trajectory 341 hard limit, Cascade no command-approval state
This commit is contained in:
129
server/index.js
129
server/index.js
@@ -12,12 +12,20 @@ const { WebSocketServer, WebSocket } = require('ws');
|
||||
const SessionManager = require('./session-manager');
|
||||
const { AutoDiscovery } = require('./auto-discover');
|
||||
const BridgeClient = require('./bridge-client');
|
||||
const MessageAccumulator = require('./message-accumulator');
|
||||
|
||||
const PORT = process.env.PORT || 3300;
|
||||
const app = express();
|
||||
const server = http.createServer(app);
|
||||
|
||||
app.use(express.json());
|
||||
// 개발 모드: 캐시 비활성화
|
||||
app.use((req, res, next) => {
|
||||
res.set('Cache-Control', 'no-store, no-cache, must-revalidate');
|
||||
res.set('Pragma', 'no-cache');
|
||||
res.set('Expires', '0');
|
||||
next();
|
||||
});
|
||||
app.use(express.static(path.join(__dirname, '..', 'public')));
|
||||
|
||||
const sessionManager = new SessionManager();
|
||||
@@ -253,6 +261,18 @@ function broadcastSessions() {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 모든 WS 클라이언트에 메시지 브로드캐스트 (Bridge 이벤트 전달용)
|
||||
*/
|
||||
function broadcastToAll(msg) {
|
||||
const payload = JSON.stringify(msg);
|
||||
for (const [ws] of wsClients) {
|
||||
if (ws.readyState === WebSocket.OPEN) {
|
||||
ws.send(payload);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ─── Bridge API 프록시 ──────────────────────────────────
|
||||
|
||||
const bridge = new BridgeClient();
|
||||
@@ -284,6 +304,22 @@ app.get('/api/bridge/cascades', async (req, res) => {
|
||||
}
|
||||
});
|
||||
|
||||
// 단일 세션 cascades (실시간 갱신용 — 응답 크기 최소화)
|
||||
app.get('/api/bridge/cascades/:sessionId', async (req, res) => {
|
||||
try {
|
||||
const data = await bridge.getCascades();
|
||||
const cascades = data.cascades || data;
|
||||
const session = cascades[req.params.sessionId];
|
||||
if (!session) {
|
||||
res.json({ status: 'not_found' });
|
||||
} else {
|
||||
res.json(session);
|
||||
}
|
||||
} catch (e) {
|
||||
res.status(502).json({ error: 'Bridge 연결 실패: ' + e.message });
|
||||
}
|
||||
});
|
||||
|
||||
app.post('/api/bridge/send', async (req, res) => {
|
||||
try {
|
||||
const { message, sessionId } = req.body;
|
||||
@@ -296,7 +332,7 @@ app.post('/api/bridge/send', async (req, res) => {
|
||||
|
||||
app.post('/api/bridge/accept', async (req, res) => {
|
||||
try {
|
||||
const result = await bridge.acceptStep();
|
||||
const result = await bridge.sendAction('acceptStep');
|
||||
res.json(result);
|
||||
} catch (e) {
|
||||
res.status(502).json({ error: e.message });
|
||||
@@ -305,7 +341,7 @@ app.post('/api/bridge/accept', async (req, res) => {
|
||||
|
||||
app.post('/api/bridge/reject', async (req, res) => {
|
||||
try {
|
||||
const result = await bridge.rejectStep();
|
||||
const result = await bridge.sendAction('rejectStep');
|
||||
res.json(result);
|
||||
} catch (e) {
|
||||
res.status(502).json({ error: e.message });
|
||||
@@ -314,7 +350,7 @@ app.post('/api/bridge/reject', async (req, res) => {
|
||||
|
||||
app.post('/api/bridge/accept-terminal', async (req, res) => {
|
||||
try {
|
||||
const result = await bridge.acceptTerminal();
|
||||
const result = await bridge.sendAction('acceptTerminal');
|
||||
res.json(result);
|
||||
} catch (e) {
|
||||
res.status(502).json({ error: e.message });
|
||||
@@ -339,9 +375,92 @@ app.post('/api/bridge/action', async (req, res) => {
|
||||
res.status(502).json({ error: e.message });
|
||||
}
|
||||
});
|
||||
bridge.connectWs((msg) => {
|
||||
broadcastToAll({ type: 'bridge_event', ...msg });
|
||||
|
||||
// 승인/거절 통합 엔드포인트: 모든 유형을 순차 시도
|
||||
app.post('/api/bridge/approve', async (req, res) => {
|
||||
const { type } = req.body; // 'accept' or 'reject'
|
||||
const actions = type === 'accept'
|
||||
? ['acceptStep', 'acceptCommand', 'acceptTerminal']
|
||||
: ['rejectStep', 'rejectCommand', 'rejectTerminal'];
|
||||
|
||||
for (const action of actions) {
|
||||
try {
|
||||
const result = await bridge.sendAction(action);
|
||||
console.log(`[Approve] ${action} 성공`);
|
||||
return res.json({ success: true, action, result });
|
||||
} catch (_) {
|
||||
// 이 타입은 안 맞음 → 다음 시도
|
||||
}
|
||||
}
|
||||
res.status(502).json({ error: '승인/거절 실패: 대기 중인 요청 없음' });
|
||||
});
|
||||
const accumulator = new MessageAccumulator();
|
||||
|
||||
// 메시지 누적 API
|
||||
app.get('/api/bridge/messages/:sessionId', async (req, res) => {
|
||||
// 요청 시점에 즉시 최신 cascade 반영
|
||||
try {
|
||||
const cascData = await bridge.getCascades();
|
||||
const cascades = cascData.cascades || cascData;
|
||||
const sid = req.params.sessionId;
|
||||
if (cascades[sid]) {
|
||||
accumulator.processCascade(sid, cascades[sid]);
|
||||
}
|
||||
} catch (_) { }
|
||||
const messages = accumulator.getMessages(req.params.sessionId);
|
||||
const status = accumulator.getStatus(req.params.sessionId);
|
||||
res.json({ messages, ...status });
|
||||
});
|
||||
|
||||
bridge.connectWs(async (msg) => {
|
||||
console.log(`[Bridge Event] ${msg.type} session=${msg.sessionId || 'N/A'}`);
|
||||
broadcastToAll({ type: 'bridge_event', ...msg });
|
||||
|
||||
// step_changed/state_changed → cascade 스냅샷 diff → 새 메시지 push
|
||||
if (msg.type === 'step_changed' || msg.type === 'state_changed') {
|
||||
await pollAndPushCascades(msg.sessionId);
|
||||
}
|
||||
});
|
||||
|
||||
// 주기적 cascade polling (WS 이벤트 누락 보완, 3초 간격)
|
||||
setInterval(async () => {
|
||||
if (!bridge.connected) return;
|
||||
await pollAndPushCascades();
|
||||
}, 3000);
|
||||
|
||||
async function pollAndPushCascades(specificSessionId) {
|
||||
try {
|
||||
const cascData = await bridge.getCascades();
|
||||
const cascades = cascData.cascades || cascData;
|
||||
|
||||
for (const [sessionId, cascade] of Object.entries(cascades)) {
|
||||
if (specificSessionId && sessionId !== specificSessionId) continue;
|
||||
|
||||
const result = accumulator.processCascade(sessionId, cascade);
|
||||
if (result && result.newMessages) {
|
||||
const status = accumulator._computeStatus(cascade);
|
||||
console.log(`[Accumulator] ${sessionId.substring(0, 8)}: +${result.newMessages.length} messages (${status.isRunning ? 'running' : status.isBlocking ? 'blocking' : 'idle'})`);
|
||||
broadcastToAll({
|
||||
type: 'new_messages',
|
||||
sessionId,
|
||||
messages: result.newMessages,
|
||||
...status,
|
||||
});
|
||||
} else if (result) {
|
||||
// 메시지 변경 없어도 상태 변경은 push
|
||||
const status = accumulator._computeStatus(cascade);
|
||||
broadcastToAll({
|
||||
type: 'new_messages',
|
||||
sessionId,
|
||||
messages: null,
|
||||
...status,
|
||||
});
|
||||
}
|
||||
}
|
||||
} catch (e) {
|
||||
// 연결 안 됨 — 무시
|
||||
}
|
||||
}
|
||||
|
||||
// ─── CDP REST API (레거시) ──────────────────────────────
|
||||
|
||||
|
||||
184
server/message-accumulator.js
Normal file
184
server/message-accumulator.js
Normal file
@@ -0,0 +1,184 @@
|
||||
/**
|
||||
* MessageAccumulator — 세션별 메시지 누적 관리
|
||||
*
|
||||
* Antigravity API 한계(trajectory 최대 341 스텝)를 우회하기 위해,
|
||||
* bridge WS 이벤트 발생 시 cascades 스냅샷을 diff하여 변경된
|
||||
* task/notify/blocking 메시지를 서버 측에서 누적 저장.
|
||||
*/
|
||||
class MessageAccumulator {
|
||||
constructor() {
|
||||
/** @type {Map<string, {messages: object[], lastSnapshot: object}>} */
|
||||
this.sessions = new Map();
|
||||
}
|
||||
|
||||
/**
|
||||
* cascade 스냅샷을 받아 diff → 새 메시지 추출 및 저장
|
||||
* @returns {object[]|null} 새로 추가된 메시지 배열 (변경 없으면 null)
|
||||
*/
|
||||
processCascade(sessionId, cascade) {
|
||||
if (!cascade || !sessionId) return null;
|
||||
|
||||
let session = this.sessions.get(sessionId);
|
||||
if (!session) {
|
||||
session = { messages: [], lastSnapshot: {} };
|
||||
this.sessions.set(sessionId, session);
|
||||
}
|
||||
|
||||
const prev = session.lastSnapshot;
|
||||
const newMessages = [];
|
||||
|
||||
// 1. Task boundary 변경 감지
|
||||
const tbStep = cascade.latestTaskBoundaryStep?.step;
|
||||
const tbIndex = cascade.latestTaskBoundaryStep?.stepIndex ?? -1;
|
||||
const prevTbIndex = prev.taskStepIndex ?? -1;
|
||||
|
||||
if (tbStep?.taskBoundary && tbIndex > prevTbIndex) {
|
||||
const tb = tbStep.taskBoundary;
|
||||
newMessages.push({
|
||||
type: 'task',
|
||||
title: tb.taskName || '',
|
||||
summary: tb.taskSummary || '',
|
||||
status: tb.taskStatus || '',
|
||||
mode: tb.mode || '',
|
||||
tools: [],
|
||||
time: tbStep.metadata?.createdAt || '',
|
||||
stepIndex: tbIndex,
|
||||
});
|
||||
} else if (tbStep?.taskBoundary && tbIndex === prevTbIndex) {
|
||||
// 같은 task인데 summary/status 업데이트 → 기존 메시지 수정
|
||||
const existing = session.messages.filter(m => m.type === 'task' && m.stepIndex === tbIndex).pop();
|
||||
if (existing) {
|
||||
const tb = tbStep.taskBoundary;
|
||||
existing.summary = tb.taskSummary || existing.summary;
|
||||
existing.status = tb.taskStatus || existing.status;
|
||||
}
|
||||
}
|
||||
|
||||
// 2. Notify user 변경 감지
|
||||
const nuStep = cascade.latestNotifyUserStep?.step;
|
||||
const nuIndex = cascade.latestNotifyUserStep?.stepIndex ?? -1;
|
||||
const prevNuIndex = prev.notifyStepIndex ?? -1;
|
||||
|
||||
if (nuStep?.notifyUser?.notificationContent && nuIndex > prevNuIndex) {
|
||||
newMessages.push({
|
||||
type: 'text',
|
||||
content: nuStep.notifyUser.notificationContent,
|
||||
isBlocking: !!nuStep.notifyUser.isBlocking,
|
||||
time: nuStep.metadata?.createdAt || '',
|
||||
stepIndex: nuIndex,
|
||||
});
|
||||
}
|
||||
|
||||
// 3. 사용자 입력 감지 (텍스트는 없지만 시간으로 추정)
|
||||
const lastUserInput = cascade.lastUserInputTime || '';
|
||||
const prevUserInput = prev.lastUserInputTime || '';
|
||||
const lastUserInputIdx = cascade.lastUserInputStepIndex ?? -1;
|
||||
const prevUserInputIdx = prev.lastUserInputStepIndex ?? -1;
|
||||
|
||||
if (lastUserInput && lastUserInput !== prevUserInput && lastUserInputIdx > prevUserInputIdx) {
|
||||
newMessages.push({
|
||||
type: 'user_input',
|
||||
time: lastUserInput,
|
||||
stepIndex: lastUserInputIdx,
|
||||
});
|
||||
}
|
||||
|
||||
// 4. 현재 상태 계산
|
||||
const currentStatus = this._computeStatus(cascade);
|
||||
|
||||
// 스냅샷 저장
|
||||
session.lastSnapshot = {
|
||||
taskStepIndex: tbIndex,
|
||||
notifyStepIndex: nuIndex,
|
||||
lastUserInputTime: lastUserInput,
|
||||
lastUserInputStepIndex: lastUserInputIdx,
|
||||
status: cascade.status,
|
||||
stepCount: cascade.stepCount,
|
||||
};
|
||||
|
||||
// 새 메시지 정렬 후 추가
|
||||
if (newMessages.length > 0) {
|
||||
newMessages.sort((a, b) => (a.stepIndex || 0) - (b.stepIndex || 0));
|
||||
session.messages.push(...newMessages);
|
||||
}
|
||||
|
||||
return newMessages.length > 0 ? { newMessages, currentStatus } : { newMessages: null, currentStatus };
|
||||
}
|
||||
|
||||
/**
|
||||
* 현재 상태 계산 (blocking, running, waiting, idle)
|
||||
*/
|
||||
_computeStatus(cascade) {
|
||||
const nuStep = cascade.latestNotifyUserStep?.step;
|
||||
const lastUserInput = cascade.lastUserInputTime || '';
|
||||
const nuTime = nuStep?.metadata?.createdAt || '';
|
||||
const isRunning = cascade.status === 'CASCADE_RUN_STATUS_RUNNING';
|
||||
const stepCount = cascade.stepCount || 0;
|
||||
|
||||
// notify_user blocking: notify가 blocking이고, AI가 idle이고, 사용자 미응답
|
||||
let isBlocking = false;
|
||||
if (nuStep?.notifyUser?.isBlocking && !isRunning) {
|
||||
if (!lastUserInput || lastUserInput < nuTime) {
|
||||
isBlocking = true;
|
||||
}
|
||||
}
|
||||
|
||||
// stepCount 변화 추적 (승인 대기 감지용)
|
||||
const sessionId = cascade.trajectoryId || '';
|
||||
if (!this._stepHistory) this._stepHistory = new Map();
|
||||
const now = Date.now();
|
||||
const hist = this._stepHistory.get(sessionId) || { count: 0, lastChangeTime: now };
|
||||
|
||||
let isWaiting = false;
|
||||
if (stepCount !== hist.count) {
|
||||
hist.count = stepCount;
|
||||
hist.lastChangeTime = now;
|
||||
} else if (isRunning && (now - hist.lastChangeTime) > 6000) {
|
||||
// RUNNING인데 6초간 stepCount 안 변함 → 사용자 응답 대기
|
||||
isWaiting = true;
|
||||
}
|
||||
this._stepHistory.set(sessionId, hist);
|
||||
|
||||
return {
|
||||
isRunning,
|
||||
isBlocking,
|
||||
isWaiting,
|
||||
stepCount,
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* 세션의 전체 누적 메시지 반환
|
||||
*/
|
||||
getMessages(sessionId) {
|
||||
const session = this.sessions.get(sessionId);
|
||||
return session ? session.messages : [];
|
||||
}
|
||||
|
||||
/**
|
||||
* 세션의 현재 상태 반환
|
||||
*/
|
||||
getStatus(sessionId) {
|
||||
const session = this.sessions.get(sessionId);
|
||||
if (!session?.lastSnapshot) return { isRunning: false, isBlocking: false, stepCount: 0 };
|
||||
const cascade = session.lastSnapshot;
|
||||
return {
|
||||
isRunning: cascade.status === 'CASCADE_RUN_STATUS_RUNNING',
|
||||
isBlocking: false, // 정확한 판단은 processCascade에서만
|
||||
stepCount: cascade.stepCount || 0,
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* 초기 trajectory 메시지와 병합하여 전체 메시지 반환
|
||||
* trajectory의 마지막 stepIndex 이후의 누적 메시지만 추가
|
||||
*/
|
||||
getMergedMessages(sessionId, trajectoryMessages, trajLastStepIndex) {
|
||||
const accumulated = this.getMessages(sessionId);
|
||||
// trajectory 이후의 누적 메시지만 필터
|
||||
const afterTrajectory = accumulated.filter(m => (m.stepIndex || 0) > trajLastStepIndex);
|
||||
return [...trajectoryMessages, ...afterTrajectory];
|
||||
}
|
||||
}
|
||||
|
||||
module.exports = MessageAccumulator;
|
||||
Reference in New Issue
Block a user