Files
gravity_control/extension/src/step-probe.ts

1205 lines
75 KiB
TypeScript

/**
* Step Probe Module - SDK polling, response handling, approval strategies.
* All shared state accessed through BridgeContext.
* Extracted from extension.ts.
*/
import * as vscode from 'vscode';
import * as fs from 'fs';
import * as path from 'path';
import { WSBridgeClient } from './ws-client';
import { extractPlannerText, extractToolCommand, extractToolDescription } from './step-utils';
import { initApprovalHandler, setupResponseWatcher } from './approval-handler';
// Re-export from approval-handler for backward compatibility with extension.ts imports
export { handleDiffReviewResponse, tryApprovalStrategies } from './approval-handler';
export interface BridgeContext {
bridgePath: string;
projectName: string;
sdk: any;
wsBridge: WSBridgeClient | null;
activeSessionId: string;
sessionStalled: boolean;
lastPendingStepIndex: number;
stallProbed: boolean;
sawRunningAfterPending: boolean;
setClickTrigger: (action: 'approve' | 'reject') => void;
logToFile: (msg: string) => void;
workspaceUri: string;
diffReviewMetadata: Map<string, { edit_step_indices: number[]; modified_files: string[] }>;
recentDiscordSentTexts: Map<string, number>;
writeChatSnapshot: (text: string) => void;
writeChatSnapshotWithFiles: (text: string, files: Array<{ name: string, content: string }>) => void;
fixLSConnection?: () => Promise<boolean>;
}
let ctx: BridgeContext;
let responseWatcher: fs.FSWatcher | null = null;
let activeTrajectoryId = '';
const recentPendingSteps = new Map<string, number>();
const PENDING_MEMORY_TTL_MS = 60_000;
// generateApprovalObserverScript → extracted to ./observer-script.ts
const lastSnapshotText = new Map<string, string>();
/**
* Get current approval context for WS response routing.
* Used by extension.ts onResponse handler to call tryApprovalStrategies.
*/
export function getApprovalContext(): { sessionId: string; stepIndex: number } {
return {
sessionId: ctx.activeSessionId || '',
stepIndex: ctx.lastPendingStepIndex ?? -1,
};
}
/**
* Get the active session ID tracked by step-probe polling.
* Used by command-handler (via extension.ts) for !stop → CancelCascadeInvocation.
*
* CRITICAL: extension.ts must use this getter instead of its own module-level
* `activeSessionId` variable, which is a stale primitive copy that never updates
* when step-probe discovers new sessions.
*/
export function getActiveSessionId(): string {
return ctx?.activeSessionId || '';
}
/**
* Get step-probe context values for http-bridge.
* Prevents stale primitive copies by reading live ctx values.
*/
export function getStepProbeContext(): { activeSessionId: string; sessionStalled: boolean; lastPendingStepIndex: number } {
return {
activeSessionId: ctx?.activeSessionId || '',
sessionStalled: ctx?.sessionStalled ?? false,
lastPendingStepIndex: ctx?.lastPendingStepIndex ?? -1,
};
}
/**
* Reset pending state after successful approval.
* Called after WS response triggers approval in extension.ts.
*/
export function resetPendingState(): void {
ctx.stallProbed = false;
ctx.sawRunningAfterPending = false;
}
/**
* Reset step-probe state after WS reconnection.
* Without this, stallProbed=true + lastPendingStepIndex=N permanently block
* re-detection of WAITING steps whose pending was lost during disconnect.
*/
export function resetPendingStateForReconnect(): void {
if (!ctx) return; // Prevent startup race conditions
ctx.lastPendingStepIndex = -1;
ctx.stallProbed = false;
ctx.sawRunningAfterPending = false;
recentPendingSteps.clear();
ctx.logToFile('[STEP-PROBE] Reset pending state for WS reconnect');
}
// handleDiffReviewResponse → moved to ./approval-handler.ts
/**
* Write a registration file for the Bot to discover session → project mapping.
* Called automatically on first step event per session.
*/
export function writeRegistration(sessionId: string) {
try {
// WS route (preferred) — skip file write to prevent duplicate
if (ctx.wsBridge && ctx.wsBridge.isConnected()) {
ctx.wsBridge.sendRegister({
conversation_id: sessionId,
project_name: ctx.projectName,
});
return; // WS delivered — skip file write
}
// File route (fallback — only when WS is NOT connected)
const regDir = path.join(ctx.bridgePath, 'register');
if (!fs.existsSync(regDir)) { fs.mkdirSync(regDir, { recursive: true }); }
const regFile = path.join(regDir, `${sessionId}.json`);
const data = {
conversation_id: sessionId,
project_name: ctx.projectName,
timestamp: Date.now() / 1000,
};
fs.writeFileSync(regFile, JSON.stringify(data, null, 2), 'utf-8');
console.log(`Gravity Bridge: registered session ${sessionId.substring(0, 8)}${ctx.projectName}`);
} catch (e: any) {
console.log(`Gravity Bridge: registration write error: ${e.message}`);
}
}
function setupMonitor() {
if (!ctx.sdk) { return; }
// NOTE: SDK EventMonitor DISABLED to prevent ERR_CONNECTION_REFUSED spam.
// Root cause: EventMonitor polls GetCascadeTrajectorySteps every 2s via rawRPC,
// which has a 775-step hard limit and generates connection errors.
// ALL relay is now handled by the GetAllCascadeTrajectories POLL below.
console.log('Gravity Bridge: SDK monitor DISABLED (using GetAllCascadeTrajectories POLL instead)');
// ══════════════════════════════════════════════════════════════════════
// PRIMARY RELAY: GetAllCascadeTrajectories (THE CORRECT API!)
//
// PROVEN VIA DIRECT RPC TESTING:
// - GetCascadeTrajectorySteps: 775-step hard limit, startStepIndex IGNORED
// - getDiagnostics.lastStepIndex: stale (can lag behind)
// - GetAllCascadeTrajectories:
// stepCount: REAL-TIME (verified 1413→1429 live)
// latestNotifyUserStep: contains FULL notificationContent
// latestTaskBoundaryStep: contains FULL taskName/Status/Summary
// stepIndex on each → perfect for dedup
// ══════════════════════════════════════════════════════════════════════
let pollCount = 0;
// ctx.activeSessionId is module-level (used by ctx.writeChatSnapshot for lazy registration)
let activeSessionTitle = '';
let lastKnownStepCount = 0;
let lastNotifyStepIndex = -1;
let lastTaskStepIndex = -1;
// ctx.lastPendingStepIndex is module-level (above ctx.sessionStalled)
let consecutiveIdleCount = 0; // debounce: require N consecutive stall polls
let lastPendingTime = 0; // cooldown: minimum gap between pendings
// ctx.sawRunningAfterPending is module-level (used by processResponseFile to close auto_resolve gate)
let lastModTime = ''; // track lastModifiedTime to distinguish thinking vs approval
// ctx.stallProbed is module-level (used by processResponseFile to reset after approval)
let lastRelayedTaskText = ''; // dedup TASK_BOUNDARY relay
let wasRunning = false; // track RUNNING→IDLE transition for response capture
let lastUserInputStepIdx = -1; // track user input for response matching
let pendingModifiedFiles: string[] = []; // accumulate modified files during RUNNING
let pendingModifiedFilePaths: string[] = []; // full paths for diff review
let pendingEditStepIndices: number[] = []; // step indices for AcknowledgeCascadeCodeEdit
let lastResponseCaptureStep = -1; // dedup: don't capture same response twice
setInterval(async () => {
pollCount++;
if (pollCount <= 3 || pollCount % 12 === 0) {
ctx.logToFile(`[POLL#${pollCount}] alive`);
}
try {
// Fix (v0.5.14): Reverted 100-limit DoS but restored descending: true with a safe limit of 30
const allTraj = await ctx.sdk.ls.rawRPC('GetAllCascadeTrajectories', { limit: 30, descending: true });
if (!allTraj?.trajectorySummaries) {
if (pollCount <= 3) ctx.logToFile('[POLL] no trajectorySummaries');
return;
}
// ── Filter to sessions owned by THIS window ──
// PRIMARY: Use trajectoryMetadata.workspaces URI to match sessions to this workspace.
// FALLBACK: Use bridge/register/ files for sessions without metadata.
// This prevents cross-window session grabbing when multiple AG instances run.
let bestSession: any = null;
let bestSessionId = '';
let bestModTime = '';
const regDir = path.join(ctx.bridgePath, 'register');
const normalizedWorkspace = ctx.workspaceUri.replace(/\\/g, '/').toLowerCase();
// ── DEBUG: Log all available sessions on every 12th poll ──
const sessionIds = Object.keys(allTraj.trajectorySummaries);
if (pollCount <= 3 || pollCount % 12 === 0) {
ctx.logToFile(`[SESSION-FILTER] total=${sessionIds.length} myWorkspace="${normalizedWorkspace}"`);
for (const [sid, data] of Object.entries(allTraj.trajectorySummaries) as [string, any][]) {
const tm = data.trajectoryMetadata;
const wsRaw = tm?.workspaces?.[0]?.workspaceFolderAbsoluteUri || 'NO_META';
const status = String(data.status || '').replace('CASCADE_RUN_STATUS_', '');
const steps = data.stepCount || 0;
const modT = (data.lastModifiedTime || '').substring(11, 19);
ctx.logToFile(`[SESSION-FILTER] ${sid.substring(0, 8)} ws=${wsRaw.substring(wsRaw.lastIndexOf('/') + 1)} steps=${steps} ${status} mod=${modT}`);
}
}
for (const [sid, data] of Object.entries(allTraj.trajectorySummaries) as [string, any][]) {
// PRIMARY FILTER: Check workspace URI from trajectoryMetadata
const trajMeta = data.trajectoryMetadata;
if (trajMeta?.workspaces?.length > 0 && normalizedWorkspace) {
const sessionWorkspaceRaw = trajMeta.workspaces[0]?.workspaceFolderAbsoluteUri || '';
// Convert file:///c:/Users/... URI to c:/Users/... path for comparison
const sessionWorkspace = sessionWorkspaceRaw
.replace(/^file:\/\/\//, '')
.replace(/%3A/gi, ':')
.replace(/\\/g, '/')
.toLowerCase();
if (sessionWorkspace && !sessionWorkspace.includes(normalizedWorkspace) && !normalizedWorkspace.includes(sessionWorkspace)) {
// Session belongs to a different workspace — skip
continue;
}
} else {
// FALLBACK: Check registration file (for sessions without metadata)
const regFile = path.join(regDir, `${sid}.json`);
if (fs.existsSync(regFile)) {
try {
const reg = JSON.parse(fs.readFileSync(regFile, 'utf-8'));
if (reg.project_name && reg.project_name !== ctx.projectName) {
// Session belongs to another window — skip
continue;
}
} catch { }
}
}
const modTime = data.lastModifiedTime || '';
const candidateRunning = String(data.status || '').includes('RUNNING');
const bestIsRunning = bestSession ? String(bestSession.status || '').includes('RUNNING') : false;
// Prefer RUNNING over IDLE, then latest modTime within same status tier
if (!bestSession
|| (candidateRunning && !bestIsRunning)
|| (candidateRunning === bestIsRunning && modTime > bestModTime)) {
bestSession = data;
bestSessionId = sid;
bestModTime = modTime;
}
}
if (!bestSession) {
if (pollCount <= 10 || pollCount % 12 === 0) {
ctx.logToFile(`[SESSION-FILTER] NO session matched! total=${sessionIds.length}`);
}
return;
}
const currentCount = bestSession.stepCount || 0;
const currentTitle = (bestSession.summary || 'Untitled').substring(0, 50);
const isRunning = String(bestSession.status || '').includes('RUNNING');
const currentModTime = bestSession.lastModifiedTime || (bestSession as any).lastModifiedTimestamp || (bestSession as any).modifiedTime || '';
// Session changed?
if (bestSessionId !== ctx.activeSessionId) {
ctx.activeSessionId = bestSessionId;
activeTrajectoryId = (bestSession as any).trajectoryId || '';
activeSessionTitle = currentTitle;
lastKnownStepCount = currentCount;
lastNotifyStepIndex = bestSession.latestNotifyUserStep?.stepIndex ?? -1;
lastTaskStepIndex = bestSession.latestTaskBoundaryStep?.stepIndex ?? -1;
lastUserInputStepIdx = bestSession.lastUserInputStepIndex ?? -1;
lastResponseCaptureStep = currentCount; // don't re-relay old responses
ctx.lastPendingStepIndex = -1;
ctx.stallProbed = false;
ctx.sawRunningAfterPending = true; // prevent stale auto_resolve from previous session
consecutiveIdleCount = 0;
lastModTime = currentModTime; // use currentModTime → prevents THINKING branch on first poll
// Reset transition/diff state from previous session (regression guard for fall-through)
wasRunning = isRunning;
pendingModifiedFiles = [];
pendingModifiedFilePaths = [];
pendingEditStepIndices = [];
// Don't register here — registration happens lazily in ctx.writeChatSnapshot/writePendingApproval
// to avoid race conditions between multiple extension instances
// Dump session keys + trajectoryMetadata on session change
const allKeys = Object.keys(bestSession);
ctx.logToFile(`[SESSION-INIT] id=${ctx.activeSessionId.substring(0, 8)} keys=[${allKeys.join(',')}]`);
const trajMeta = bestSession.trajectoryMetadata;
if (trajMeta) {
ctx.logToFile(`[SESSION-INIT] trajectoryMetadata=${JSON.stringify(trajMeta).substring(0, 500)}`);
}
console.log(`Gravity Bridge: [POLL#${pollCount}] session: ${ctx.activeSessionId.substring(0, 8)} "${currentTitle}" steps=${currentCount} ${isRunning ? 'RUNNING' : 'idle'}`);
// Fall through to WAITING detection — immediate probe on session change.
// Previously, `return` here caused 20-25s delay before first WAITING
// detection (confirmed via extension.log). Now we immediately check.
}
const delta = currentCount - lastKnownStepCount;
lastKnownStepCount = currentCount;
if (delta > 0) {
console.log(`Gravity Bridge: [POLL#${pollCount}] +${delta} steps (${currentCount}) "${currentTitle}"`);
// Real-time response capture: fetch latest steps on every delta>0
if (isRunning && currentCount > lastResponseCaptureStep && ctx.sdk) {
try {
const rtOffset = Math.max(0, currentCount - 3);
const rtResp = await ctx.sdk.ls.rawRPC('GetCascadeTrajectorySteps', {
cascadeId: bestSessionId,
stepOffset: rtOffset,
verbosity: 1, // DEBUG — includes plannerResponse text
});
if (rtResp?.steps?.length > 0) {
for (let ri = rtResp.steps.length - 1; ri >= 0; ri--) {
const s = rtResp.steps[ri];
const sType = s?.type || '';
const actualIdx = rtOffset + ri;
if (actualIdx <= lastResponseCaptureStep) continue;
// Track file write steps for diff review
if (s?.metadata?.toolCall?.argumentsJson) {
try {
const tcArgs = JSON.parse(s.metadata.toolCall.argumentsJson);
const tf = tcArgs.TargetFile || tcArgs.target_file || '';
if (tf) {
const bn = tf.split(/[\\/]/).pop() || tf;
if (!pendingModifiedFiles.includes(bn)) {
pendingModifiedFiles.push(bn);
pendingModifiedFilePaths.push(tf);
pendingEditStepIndices.push(actualIdx);
ctx.logToFile(`[DIFF-TRACK] + ${bn} (step ${actualIdx})`);
}
}
} catch { }
}
if (sType.includes('PLANNER_RESPONSE') && s?.status?.includes('DONE')) {
const pr = s?.plannerResponse;
if (pr) {
let text = pr.modifiedResponse || pr.rawText || pr.text || '';
if (text.length > 10) {
lastResponseCaptureStep = actualIdx;
ctx.logToFile(`[RT-CAPTURE] step=${actualIdx} (${text.length} chars)`);
const truncated = text.length > 3500
? text.substring(0, 3500) + '\n\n_(이하 생략)_'
: text;
ctx.writeChatSnapshot(`💬 **AI 응답**\n\n${truncated}`);
break;
}
}
}
}
}
} catch (rte: any) {
// Non-critical — don't spam logs
if (pollCount <= 5) ctx.logToFile(`[RT-CAPTURE] error: ${rte.message.substring(0, 80)}`);
}
}
}
// Log session state on EVERY poll for diagnostics
const statusStr = String(bestSession.status || 'UNKNOWN');
if (pollCount <= 10 || pollCount % 6 === 0 || delta > 0) {
ctx.logToFile(`[POLL#${pollCount}] status=${statusStr} steps=${currentCount} delta=${delta}`);
}
// ── PRIMARY: Step-probe-based approval detection ──
// On stall (idle=1, ~5s), probe GetCascadeTrajectorySteps to check WAITING.
// 775-step limit: probe fails for long sessions → faster stall fallback.
// ── STALL-BASED approval detection with step probe ──
const modTimeChanged = currentModTime !== lastModTime;
const isStall = isRunning && delta === 0;
// Session-change immediate probe: bypass stall debounce on first poll
// Without this, consecutiveIdleCount=0 prevents probe condition (>=1)
if (isStall && consecutiveIdleCount === 0 && !ctx.stallProbed && !modTimeChanged) {
consecutiveIdleCount = 1; // force probe condition satisfied
}
// Log modTime on stalls for debugging
if (isStall && consecutiveIdleCount < 8) {
ctx.logToFile(`[STALL-DBG] idle=${consecutiveIdleCount} modTime='${currentModTime}' changed=${modTimeChanged}`);
}
if (delta > 0) {
ctx.sessionStalled = false;
// Steps progressed — if we had a pending approval, it was handled in AG directly
if (!ctx.sawRunningAfterPending && ctx.lastPendingStepIndex >= 0) {
// Mark pending as auto_resolved so bot can update Discord message
let resolvedCount = 0;
let primaryCommand = '';
const pendingFiles = fs.readdirSync(path.join(ctx.bridgePath, 'pending')).filter((f: string) => f.endsWith('.json'));
const nowMs = Date.now();
for (const pf of pendingFiles) {
const pfPath = path.join(ctx.bridgePath, 'pending', pf);
try {
const pd = JSON.parse(fs.readFileSync(pfPath, 'utf-8'));
if (pd.status !== 'pending') continue;
if (pd.project_name && pd.project_name !== ctx.projectName) continue;
// Limit to same session AND (same step or recent)
const ageMs = nowMs - (pd.timestamp * 1000);
const isMatch = (pd.conversation_id === ctx.activeSessionId) &&
(pd.step_index === ctx.lastPendingStepIndex || (ageMs < 60_000 && ageMs >= 0));
if (isMatch) {
pd.status = 'auto_resolved';
fs.promises.writeFile(pfPath, JSON.stringify(pd, null, 2), 'utf-8').catch(e => {
ctx.logToFile(`[AUTO-RESOLVE] write error: ${e.message}`);
});
resolvedCount++;
const cmd = pd.command || '';
if (cmd.length > primaryCommand.length && cmd !== 'Deny' && !cmd.includes('Allow')) {
primaryCommand = cmd;
} else if (!primaryCommand) {
primaryCommand = cmd;
}
}
} catch (e: any) { ctx.logToFile(`[AUTO-RESOLVE] parse error for ${pf}: ${e.message}`); }
}
if (resolvedCount > 0) {
ctx.logToFile(`[AUTO-RESOLVE] step=${ctx.lastPendingStepIndex} progressed → marked ${resolvedCount} pending(s)`);
ctx.writeChatSnapshot(`✅ **AG에서 직접 진행됨** (step ${ctx.lastPendingStepIndex})\n\n\`${primaryCommand.substring(0, 200)}\``);
}
ctx.lastPendingStepIndex = -1;
// Clear memory dedup for this session (step progressed, new WAITING steps are allowed)
for (const k of recentPendingSteps.keys()) {
if (k.startsWith(ctx.activeSessionId + ':')) recentPendingSteps.delete(k);
}
}
consecutiveIdleCount = 0;
ctx.sawRunningAfterPending = true;
ctx.stallProbed = false; // allow re-probe on next stall
lastModTime = currentModTime;
} else if (isStall) {
if (modTimeChanged) {
// lastModifiedTime is still changing = AI is thinking, NOT approval
consecutiveIdleCount = 0; // Reset!
ctx.stallProbed = false;
if (pollCount <= 10 || pollCount % 12 === 0) {
ctx.logToFile(`[THINK] step=${currentCount} modTime changing → not stall`);
}
} else {
// lastModifiedTime frozen = real stall (approval waiting)
consecutiveIdleCount++;
if (consecutiveIdleCount >= 1) ctx.sessionStalled = true;
}
lastModTime = currentModTime;
// ── Step probe: on stall, fetch latest step via cascadeId (retry until WAITING found) ──
// CONFIRMED: param='cascadeId', id=sessionId (map key from trajectorySummaries)
// Retries every 2 polls (~10s) because RUN_COMMAND step may not be created yet
if (consecutiveIdleCount >= 1 && !ctx.stallProbed) {
try {
const stepsResp = await ctx.sdk.ls.rawRPC('GetCascadeTrajectorySteps', {
cascadeId: bestSessionId,
verbosity: 1, // DEBUG — includes argumentsJson for command extraction
});
if (stepsResp?.steps?.length > 0) {
const steps = stepsResp.steps;
// Diagnostic: compare returned steps vs trajectory stepCount
ctx.logToFile(`[STEP-PROBE] returned=${steps.length} vs trajectory.stepCount=${currentCount}`);
if (steps.length < currentCount) {
ctx.logToFile(`[STEP-PROBE] ⚠️ 775-limit hit! steps=${steps.length} < stepCount=${currentCount} — retrying with stepOffset`);
// 775-LIMIT FIX: Retry with stepOffset to get latest steps
try {
const offset = Math.max(0, currentCount - 10);
const offsetResp = await ctx.sdk.ls.rawRPC('GetCascadeTrajectorySteps', {
cascadeId: bestSessionId,
stepOffset: offset,
verbosity: 1,
});
if (offsetResp?.steps?.length > 0) {
// Replace steps array with offset results
const offsetSteps = offsetResp.steps;
ctx.logToFile(`[STEP-PROBE] offset=${offset} returned ${offsetSteps.length} steps (latest)`);
// Scan for WAITING in offset results
for (let osi = offsetSteps.length - 1; osi >= 0; osi--) {
const oStep = offsetSteps[osi];
if (oStep?.status === 'CORTEX_STEP_STATUS_WAITING') {
const toolCall = oStep?.metadata?.toolCall;
const toolName = toolCall?.name || (oStep.type || '').replace('CORTEX_STEP_TYPE_', '').toLowerCase();
let command = toolName;
if (toolCall?.argumentsJson) {
try {
const args = JSON.parse(toolCall.argumentsJson);
if (args.CommandLine) command = `${toolName}: ${args.CommandLine.substring(0, 1500)}`;
else if (args.TargetFile) command = `${toolName}: ${args.TargetFile}`;
else {
// Show first meaningful value (path, query, etc.)
const val = args.DirectoryPath || args.SearchPath || args.AbsolutePath || args.Url || args.Query || args.Prompt || Object.values(args).find((v: any) => typeof v === 'string' && v.length > 2);
command = val ? `${toolName}: ${String(val).substring(0, 500)}` : `${toolName}: ${Object.keys(args).join(', ')}`;
}
} catch { command = toolName; }
}
const actualIndex = offset + osi;
ctx.logToFile(`[STEP-PROBE] ★ WAITING (via offset)! step=${actualIndex} type=${oStep.type} cmd='${command}'`);
if (actualIndex !== ctx.lastPendingStepIndex) {
ctx.stallProbed = true;
// Track highest step index for auto-resolve
if (actualIndex > ctx.lastPendingStepIndex) {
ctx.lastPendingStepIndex = actualIndex;
}
lastPendingTime = Date.now();
ctx.sawRunningAfterPending = false;
// Skip pending for workspace-less AG windows (project=default)
if (ctx.projectName === 'default') {
ctx.logToFile(`[STEP-PROBE] skip pending: ctx.projectName=default (no workspace)`);
} else {
// Always write pending — Bot decides auto-approve (prevents double-fire)
writePendingApproval({
conversation_id: ctx.activeSessionId,
command,
description: `Step #${actualIndex} (${(oStep.type || '').replace('CORTEX_STEP_TYPE_', '')})`,
step_type: ['view_file', 'list_dir', 'find_by_name', 'read_file', 'grep_search'].includes(toolName) ? 'file_permission'
: ['write_to_file', 'replace_file_content', 'multi_replace_file_content'].includes(toolName) ? 'code_edit'
: ['browser_subagent', 'open_browser_url'].includes(toolName) ? 'browser_subagent'
: toolName,
step_index: actualIndex,
source: 'step_probe_offset',
});
}
}
// NOTE: no break — process ALL parallel WAITING steps
}
}
}
} catch (oe: any) {
ctx.logToFile(`[STEP-PROBE] offset retry failed: ${oe.message.substring(0, 100)}`);
}
}
// Scan last 5 steps backwards to find ALL WAITING steps (parallel tool calls)
let foundWaiting = false;
for (let si = steps.length - 1; si >= Math.max(0, steps.length - 5); si--) {
const step = steps[si];
const stepStatus = step?.status || '';
const stepType = step?.type || '';
if (stepStatus === 'CORTEX_STEP_STATUS_WAITING') {
foundWaiting = true;
// Extract command from metadata.toolCall or direct fields
const toolCall = step?.metadata?.toolCall;
const toolName = toolCall?.name || stepType.replace('CORTEX_STEP_TYPE_', '').toLowerCase();
let command = toolName;
let isSafeToAutoRun = false;
// Parse argumentsJson for command details
if (toolCall?.argumentsJson) {
try {
const args = JSON.parse(toolCall.argumentsJson);
isSafeToAutoRun = args.SafeToAutoRun === true;
if (args.CommandLine) {
command = `${toolName}: ${args.CommandLine.substring(0, 1500)}`;
} else if (args.TargetFile) {
command = `${toolName}: ${args.TargetFile}`;
} else {
const val = args.DirectoryPath || args.SearchPath || args.AbsolutePath || args.Url || args.Query || args.Prompt || Object.values(args).find((v: any) => typeof v === 'string' && v.length > 2);
command = val ? `${toolName}: ${String(val).substring(0, 500)}` : `${toolName}: ${Object.keys(args).join(', ')}`;
}
} catch { command = toolName; }
}
const description = `Step #${si} (${stepType.replace('CORTEX_STEP_TYPE_', '')})`;
ctx.logToFile(`[STEP-PROBE] ★ WAITING! step=${si} type=${stepType} cmd='${command}'`);
if (si !== ctx.lastPendingStepIndex) {
ctx.stallProbed = true; // found WAITING — stop retrying
// Track highest step index for auto-resolve
if (si > ctx.lastPendingStepIndex) {
ctx.lastPendingStepIndex = si;
}
lastPendingTime = Date.now();
ctx.sawRunningAfterPending = false;
// Skip pending for workspace-less AG windows (project=default)
if (ctx.projectName === 'default') {
ctx.logToFile(`[STEP-PROBE] skip pending: ctx.projectName=default (no workspace)`);
} else {
// Always write pending — Bot decides auto-approve (prevents double-fire)
writePendingApproval({
conversation_id: ctx.activeSessionId,
command,
description,
step_type: ['view_file', 'list_dir', 'find_by_name', 'read_file', 'grep_search'].includes(toolName) ? 'file_permission'
: ['write_to_file', 'replace_file_content', 'multi_replace_file_content'].includes(toolName) ? 'code_edit'
: ['browser_subagent', 'open_browser_url'].includes(toolName) ? 'browser_subagent'
: toolName,
step_index: si,
source: 'step_probe',
safe_to_auto_run: isSafeToAutoRun,
});
if (isSafeToAutoRun) {
const truncatedCmd = command.length > 500 ? command.substring(0, 500) + '\n...(이하 생략)' : command;
ctx.writeChatSnapshot(`⚡ **자동 실행됨** (step ${si})\n\n\`\`\`\n${truncatedCmd}\n\`\`\``);
}
}
}
// NOTE: no break — process ALL parallel WAITING steps
}
}
if (!foundWaiting) {
const lastStep = steps[steps.length - 1];
ctx.logToFile(`[STEP-PROBE] lastStep status=${lastStep?.status} type=${lastStep?.type} (not waiting)`);
// CRITICAL: Reset ctx.sessionStalled when step_probe confirms no WAITING.
// Without this, ctx.sessionStalled stays true during long AI generations
// (PLANNER_RESPONSE with delta=0), causing false positive "Run" detections.
ctx.sessionStalled = false;
}
}
} catch (e: any) {
ctx.logToFile(`[STEP-PROBE] error: ${e.message?.substring(0, 150)}`);
// UTF-8 invalid data in a step causes a permanent 500 error on full fetch.
// Attempt stepOffset to skip that step and fetch only recent steps.
const isUtf8Error = e.message?.includes('invalid UTF-8') || e.message?.includes('proto:');
if (isUtf8Error && ctx.sdk) {
try {
const utf8Offset = Math.max(0, currentCount - 20);
ctx.logToFile(`[STEP-PROBE] UTF-8 fallback: retrying with stepOffset=${utf8Offset}`);
const offsetResp = await ctx.sdk.ls.rawRPC('GetCascadeTrajectorySteps', {
cascadeId: bestSessionId,
stepOffset: utf8Offset,
verbosity: 1,
});
if (offsetResp?.steps?.length > 0) {
const offsetSteps = offsetResp.steps;
ctx.logToFile(`[STEP-PROBE] UTF-8 offset=${utf8Offset} returned ${offsetSteps.length} steps`);
let foundWaitingInOffset = false;
for (let osi = offsetSteps.length - 1; osi >= 0; osi--) {
const oStep = offsetSteps[osi];
if (oStep?.status === 'CORTEX_STEP_STATUS_WAITING') {
foundWaitingInOffset = true;
const toolCall = oStep?.metadata?.toolCall;
const toolName = toolCall?.name || (oStep.type || '').replace('CORTEX_STEP_TYPE_', '').toLowerCase();
let command = toolName;
let isSafeToAutoRun = false;
if (toolCall?.argumentsJson) {
try {
const args = JSON.parse(toolCall.argumentsJson);
isSafeToAutoRun = args.SafeToAutoRun === true;
if (args.CommandLine) command = `${toolName}: ${args.CommandLine.substring(0, 1500)}`;
else if (args.TargetFile) command = `${toolName}: ${args.TargetFile}`;
else {
const val = args.DirectoryPath || args.SearchPath || args.AbsolutePath || args.Url || args.Query || args.Prompt || Object.values(args).find((v: any) => typeof v === 'string' && v.length > 2);
command = val ? `${toolName}: ${String(val).substring(0, 500)}` : `${toolName}: ${Object.keys(args).join(', ')}`;
}
} catch { command = toolName; }
}
const actualIndex = utf8Offset + osi;
ctx.logToFile(`[STEP-PROBE] ★ WAITING (via UTF-8 offset)! step=${actualIndex} type=${oStep.type} cmd='${command}'`);
if (actualIndex !== ctx.lastPendingStepIndex) {
ctx.stallProbed = true;
if (actualIndex > ctx.lastPendingStepIndex) ctx.lastPendingStepIndex = actualIndex;
lastPendingTime = Date.now();
ctx.sawRunningAfterPending = false;
if (ctx.projectName !== 'default') {
writePendingApproval({
conversation_id: ctx.activeSessionId,
command,
description: `Step #${actualIndex} (${(oStep.type || '').replace('CORTEX_STEP_TYPE_', '')})`,
step_type: ['view_file', 'list_dir', 'find_by_name', 'read_file', 'grep_search'].includes(toolName) ? 'file_permission'
: ['write_to_file', 'replace_file_content', 'multi_replace_file_content'].includes(toolName) ? 'code_edit'
: ['browser_subagent', 'open_browser_url'].includes(toolName) ? 'browser_subagent'
: toolName,
step_index: actualIndex,
source: 'step_probe_utf8_offset',
safe_to_auto_run: isSafeToAutoRun,
});
if (isSafeToAutoRun) {
const truncatedCmd = command.length > 500 ? command.substring(0, 500) + '\n...(이하 생략)' : command;
ctx.writeChatSnapshot(`⚡ **자동 실행됨** (step ${actualIndex})\n\n\`\`\`\n${truncatedCmd}\n\`\`\``);
}
}
}
// NOTE: no break — process ALL parallel WAITING steps
}
}
if (!foundWaitingInOffset) {
ctx.logToFile(`[STEP-PROBE] UTF-8 offset: no WAITING found — stallProbed=true to prevent loop`);
ctx.stallProbed = true; // prevent retry loop; resets on delta>0
ctx.sessionStalled = false;
}
} else {
ctx.logToFile(`[STEP-PROBE] UTF-8 offset returned empty — stallProbed=true`);
ctx.stallProbed = true;
}
} catch (oe: any) {
ctx.logToFile(`[STEP-PROBE] UTF-8 offset also failed: ${oe.message?.substring(0, 100)}`);
ctx.stallProbed = true; // permanent error — block retry loop; resets on delta>0
}
}
}
}
// Stall fallback REMOVED — step probe is sole fallback source
// (stall fallback was generating false positives and is now redundant)
} else if (!isRunning) {
// ── Error detection: probe when session transitions from RUNNING→idle ──
if (consecutiveIdleCount > 0 && !ctx.stallProbed) {
// Was running, now idle — possible error. Probe once.
try {
const stepsResp = await ctx.sdk.ls.rawRPC('GetCascadeTrajectorySteps', {
cascadeId: bestSessionId,
verbosity: 1,
});
if (stepsResp?.steps?.length > 0) {
const steps = stepsResp.steps;
// Check last 3 steps for error/failed status
for (let si = steps.length - 1; si >= Math.max(0, steps.length - 3); si--) {
const step = steps[si];
const stepStatus = step?.status || '';
const stepType = step?.type || '';
if (stepStatus.includes('ERROR') || stepStatus.includes('FAILED')) {
const toolCall = step?.metadata?.toolCall;
const toolName = toolCall?.name || stepType.replace('CORTEX_STEP_TYPE_', '').toLowerCase();
let command = `⚠️ Error: ${toolName}`;
if (toolCall?.argumentsJson) {
try {
const args = JSON.parse(toolCall.argumentsJson);
if (args.CommandLine) command = `⚠️ Error: ${args.CommandLine.substring(0, 100)}`;
else if (args.TargetFile) command = `⚠️ Error: ${args.TargetFile.split(/[\\/]/).pop()}`;
} catch { }
}
const description = `Step #${si} ${stepStatus} — Retry?`;
ctx.logToFile(`[STEP-PROBE] ★ ERROR! step=${si} status=${stepStatus} type=${stepType}`);
// Notify Discord chat about error
ctx.writeChatSnapshot(`❌ **에러 발생** (step ${si})\n\n\`${command.replace('⚠️ Error: ', '')}\`\n${stepStatus.replace('CORTEX_STEP_STATUS_', '')}`);
if (si !== ctx.lastPendingStepIndex) {
ctx.stallProbed = true;
ctx.lastPendingStepIndex = si;
writePendingApproval({
conversation_id: ctx.activeSessionId,
command,
description,
step_type: 'error_recovery',
step_index: si,
source: 'step_probe_error',
});
}
break;
}
}
}
} catch (e: any) {
ctx.logToFile(`[STEP-PROBE-ERR] error check: ${e.message}`);
}
}
consecutiveIdleCount = 0;
lastModTime = currentModTime;
}
// ── Process latestNotifyUserStep ──
const notifyStep = bestSession.latestNotifyUserStep;
if (notifyStep) {
if (notifyStep.stepIndex > lastNotifyStepIndex) {
lastNotifyStepIndex = notifyStep.stepIndex;
const notifyData = notifyStep.step?.notifyUser || {} as any;
const content = notifyData.notificationContent || '';
// Log full structure once for schema discovery
if (pollCount <= 3 || notifyStep.stepIndex <= lastNotifyStepIndex + 1) {
ctx.logToFile(`[NOTIFY-STEP] keys=[${Object.keys(notifyData).join(',')}]`);
}
ctx.logToFile(`[NOTIFY-STEP] NEW step=${notifyStep.stepIndex} content=${content.length} chars`);
// Filter: relay all non-empty notifications
if (content.length > 10) {
ctx.writeChatSnapshot(`📣 **알림** (step ${notifyStep.stepIndex})\n\n${content}`);
} else if (content.length > 0) {
ctx.logToFile(`[NOTIFY-STEP] skipped (too short: ${content.length} chars): ${content}`);
}
// ── PathsToReview: read and relay referenced artifact files ──
// AG field name is `reviewAbsoluteUris` (confirmed via extension.log NOTIFY-STEP keys)
const pathsToReview: string[] = notifyData.reviewAbsoluteUris
|| notifyData.pathsToReview
|| [];
if (pathsToReview.length > 0) {
ctx.logToFile(`[NOTIFY-STEP] PathsToReview: ${pathsToReview.length} files`);
for (const filePath of pathsToReview.slice(0, 5)) {
try {
if (fs.existsSync(filePath)) {
const fileContent = fs.readFileSync(filePath, 'utf-8');
const fileName = path.basename(filePath);
const MAX_ARTIFACT_SIZE = 8000;
const truncatedContent = fileContent.length > MAX_ARTIFACT_SIZE
? fileContent.substring(0, MAX_ARTIFACT_SIZE) + '\n\n_(이하 생략)_'
: fileContent;
// Write as snapshot with attached_files for bot to send as Discord file
ctx.writeChatSnapshotWithFiles(
`📎 **문서: ${fileName}** (${Math.round(fileContent.length / 1024)}KB)`,
[{ name: fileName, content: truncatedContent }]
);
ctx.logToFile(`[NOTIFY-STEP] relayed artifact: ${fileName} (${fileContent.length} chars)`);
} else {
ctx.logToFile(`[NOTIFY-STEP] artifact not found: ${filePath}`);
}
} catch (e: any) {
ctx.logToFile(`[NOTIFY-STEP] artifact read error: ${e.message}`);
}
}
}
}
} else if (pollCount <= 5) {
ctx.logToFile(`[NOTIFY-STEP] null (no notify step in session)`);
}
// ── Process latestTaskBoundaryStep ──
const taskStep = bestSession.latestTaskBoundaryStep;
if (taskStep) {
if (taskStep.stepIndex > lastTaskStepIndex) {
lastTaskStepIndex = taskStep.stepIndex;
const tb = taskStep.step?.taskBoundary;
if (tb?.taskName) {
const mode = tb.mode ? tb.mode.replace('AGENT_MODE_', '') : '';
const taskText = `${tb.taskName}|${tb.taskStatus || ''}`;
ctx.logToFile(`[TASK-STEP] NEW step=${taskStep.stepIndex} name="${tb.taskName}" mode=${mode}`);
if (taskText !== lastRelayedTaskText) {
lastRelayedTaskText = taskText;
ctx.writeChatSnapshot(`📋 **[${mode}] ${tb.taskName}**\n${tb.taskStatus || ''}\n\n${tb.taskSummary || ''}`);
} else {
ctx.logToFile(`[TASK-STEP] skipped (duplicate): "${tb.taskName}"`);
}
}
}
} else if (pollCount <= 5) {
ctx.logToFile(`[TASK-STEP] null (no task step in session)`);
}
// ── RUNNING → IDLE transition: capture AI response for Discord ──
const userInputIdx = bestSession.lastUserInputStepIndex ?? -1;
if (userInputIdx > lastUserInputStepIdx) {
lastUserInputStepIdx = userInputIdx;
ctx.logToFile(`[USER-MSG] user input detected at step ${userInputIdx}, capturing...`);
// Fetch user message content and relay to Discord
try {
const umResp = await ctx.sdk.ls.rawRPC('GetCascadeTrajectorySteps', {
cascadeId: bestSessionId,
stepOffset: userInputIdx,
verbosity: 1,
});
if (umResp?.steps?.length > 0) {
const umStep = umResp.steps[0];
// User message is in userInput.userResponse (discovered via step dump)
const ui = umStep?.userInput;
const umText = ui?.userResponse || '';
const clientType = ui?.clientType || '';
const isFromIDE = clientType.includes('IDE');
ctx.logToFile(`[USER-MSG] step=${userInputIdx} type=${umStep?.type} client=${clientType} text=${umText.substring(0, 100)}`);
// Skip echo: if this text was recently sent from Discord, don't relay back
const trimmed = umText.trim();
const sentAt = ctx.recentDiscordSentTexts.get(trimmed);
if (sentAt && (Date.now() - sentAt) < 60_000) {
ctx.recentDiscordSentTexts.delete(trimmed);
ctx.logToFile(`[USER-MSG] skipped echo relay (Discord origin, ${Math.round((Date.now() - sentAt) / 1000)}s ago)`);
} else if (umText.length > 2) {
// Content-based dedup: AG can create multiple USER_INPUT steps for the same message
// (e.g. comment-while-working feature). Skip if same text relayed within 30s.
const dedupKey = `user_msg:${trimmed}`;
const lastRelayed = lastSnapshotText.get(dedupKey);
if (lastRelayed && (Date.now() - Number(lastRelayed)) < 30_000) {
ctx.logToFile(`[USER-MSG] skipped duplicate relay (same text ${Math.round((Date.now() - Number(lastRelayed)) / 1000)}s ago)`);
} else {
lastSnapshotText.set(dedupKey, String(Date.now()));
const truncated = umText.length > 800
? umText.substring(0, 800) + '\n\n_(이하 생략)_'
: umText;
const source = isFromIDE ? 'AG 직접 입력' : 'API';
ctx.writeChatSnapshot(`👤 **사용자 (${source})**\n\n${truncated}`);
ctx.logToFile(`[USER-MSG] relayed ${umText.length} chars from step ${userInputIdx}`);
}
} else {
ctx.writeChatSnapshot(`👤 **사용자** — _(내용 없음)_`);
ctx.logToFile(`[USER-MSG] step ${userInputIdx} text empty`);
}
}
} catch (umErr: any) {
ctx.logToFile(`[USER-MSG] capture error: ${umErr.message?.substring(0, 100)}`);
// Still notify discord about user input even without content
ctx.writeChatSnapshot(`👤 **사용자 (AG 직접 입력)** — _(캡처 실패)_`);
}
}
if (wasRunning && !isRunning && currentCount > lastResponseCaptureStep) {
ctx.logToFile(`[RESPONSE-CAPTURE] RUNNING→IDLE, steps=${currentCount}, capturing response...`);
lastResponseCaptureStep = currentCount;
try {
const offset = Math.max(0, currentCount - 5);
const latestResp = await ctx.sdk.ls.rawRPC('GetCascadeTrajectorySteps', {
cascadeId: bestSessionId,
stepOffset: offset,
verbosity: 1, // CLIENT_TRAJECTORY_VERBOSITY_DEBUG — includes full plannerResponse text
});
if (latestResp?.steps?.length > 0) {
const steps = latestResp.steps;
for (let ri = steps.length - 1; ri >= 0; ri--) {
const s = steps[ri];
const sType = s?.type || '';
if (sType.includes('PLANNER_RESPONSE') && !sType.includes('EPHEMERAL')) {
let textContent = '';
// Extract from plannerResponse field
const pr = s?.plannerResponse;
if (pr) {
// Priority: modifiedResponse (confirmed field from AG)
if (pr.modifiedResponse) textContent = pr.modifiedResponse;
else if (pr.rawText) textContent = pr.rawText;
else if (pr.text) textContent = pr.text;
else if (pr.message) textContent = typeof pr.message === 'string' ? pr.message : '';
else if (pr.content?.parts) {
for (const p of pr.content.parts) {
if (p?.text) textContent += p.text;
}
}
// Log first time to capture actual field names
if (!textContent) {
ctx.logToFile(`[RESPONSE-CAPTURE] plannerResponse keys: [${Object.keys(pr).join(',')}] values(100): ${JSON.stringify(pr).substring(0, 200)}`);
}
}
// Extract from ephemeralMessage field
const em = s?.ephemeralMessage;
if (!textContent && em) {
if (typeof em === 'string') textContent = em;
else if (em.message) textContent = em.message;
else if (em.content) textContent = typeof em.content === 'string' ? em.content : JSON.stringify(em.content);
}
// Fallback: metadata, content, rawOutput
if (!textContent) {
const parts = s?.content?.parts || s?.parts || [];
for (const p of parts) {
if (p?.text) textContent += p.text;
}
}
if (!textContent && s?.metadata?.text) textContent = s.metadata.text;
if (!textContent && s?.rawOutput) textContent = typeof s.rawOutput === 'string' ? s.rawOutput : JSON.stringify(s.rawOutput);
if (textContent.length > 10) {
ctx.logToFile(`[RESPONSE-CAPTURE] found ${sType} (${textContent.length} chars) at step ${offset + ri}`);
const truncated = textContent.length > 3500
? textContent.substring(0, 3500) + '\n\n_(이하 생략)_'
: textContent;
ctx.writeChatSnapshot(`💬 **AI 응답**\n\n${truncated}`);
break;
} else {
ctx.logToFile(`[RESPONSE-CAPTURE] ${sType} too short (${textContent.length}), keys=[${Object.keys(s).join(',')}]`);
}
}
}
}
} catch (re: any) {
ctx.logToFile(`[RESPONSE-CAPTURE] error: ${re.message.substring(0, 100)}`);
}
// Send explicit IDLE notification so user knows the step is done
ctx.writeChatSnapshot(`✅ **Step ${currentCount} 작업 종료**`);
}
// ── Diff review detection: if session just went IDLE and files were modified ──
if (wasRunning && !isRunning && pendingModifiedFiles.length > 0) {
// Phase 3 FIX: Filter out brain/ artifact files (task.md, implementation_plan.md etc.)
// These are AG internal artifacts, NOT code changes needing user review.
const brainPathSegment = '.gemini/antigravity/brain/';
const codeOnlyFiles: string[] = [];
const codeOnlyPaths: string[] = [];
const codeOnlySteps: number[] = [];
for (let fi = 0; fi < pendingModifiedFilePaths.length; fi++) {
const normalized = pendingModifiedFilePaths[fi].replace(/\\/g, '/').toLowerCase();
if (!normalized.includes(brainPathSegment)) {
codeOnlyFiles.push(pendingModifiedFiles[fi]);
codeOnlyPaths.push(pendingModifiedFilePaths[fi]);
if (fi < pendingEditStepIndices.length) {
codeOnlySteps.push(pendingEditStepIndices[fi]);
}
} else {
ctx.logToFile(`[DIFF-REVIEW] skip brain artifact: ${pendingModifiedFiles[fi]}`);
}
}
if (codeOnlyFiles.length > 0) {
const fileList = codeOnlyFiles.slice(0, 5).join(', ');
const fileCount = codeOnlyFiles.length;
// Capture variables for delayed closure (poll loop may change them)
const capturedSessionId = ctx.activeSessionId;
const capturedStepCount = currentCount;
const capturedModFiles = codeOnlyPaths.slice(0, 20);
const capturedEditSteps = codeOnlySteps.slice(0, 20);
ctx.logToFile(`[DIFF-REVIEW] IDLE with ${fileCount} code files: ${fileList}`);
// Reset tracking arrays immediately (so next session starts fresh)
pendingModifiedFiles = [];
pendingModifiedFilePaths = [];
pendingEditStepIndices = [];
// Delay diff_review pending by 8s so AI response snapshot arrives
// on Discord before the approval buttons (snapshot scanner needs time
// to relay the response text to Discord ahead of the approval embed)
setTimeout(() => {
ctx.logToFile(`[DIFF-REVIEW] deferred pending creation (8s) for: ${fileList}`);
ctx.writeChatSnapshot(`📝 **코드 리뷰 대기**\n\n수정된 파일: ${fileList}\n\nAG에서 Accept all / Reject all로 확인해주세요.`);
writePendingApproval({
conversation_id: capturedSessionId,
command: `코드 리뷰: ${fileList}`,
description: `${fileCount}개 파일이 수정되었습니다`,
step_type: 'diff_review',
step_index: capturedStepCount,
source: 'diff_review_detect',
buttons: [
{ text: 'Accept all', index: 0 },
{ text: 'Reject all', index: 1 },
],
modified_files: capturedModFiles,
edit_step_indices: capturedEditSteps,
});
}, 8000);
} else {
ctx.logToFile(`[DIFF-REVIEW] all ${pendingModifiedFiles.length} modified files are brain artifacts — skip diff_review`);
pendingModifiedFiles = [];
pendingModifiedFilePaths = [];
pendingEditStepIndices = [];
}
}
wasRunning = isRunning;
} catch (e: any) {
if (pollCount <= 5 || pollCount % 20 === 0) {
console.log(`Gravity Bridge: [POLL#${pollCount}] error: ${e.message}`);
}
}
}, 5000);
}
// ─── Response Watcher (Discord approval → Antigravity RPC) ───
// setupResponseWatcher → moved to ./approval-handler.ts
// processResponseFile → moved to ./approval-handler.ts
/**
* Extract AI text from a PLANNER_RESPONSE step.
* Known structure: {type, status, metadata, plannerResponse, ephemeralMessage, ...}
* ephemeralMessage = system prompt (SKIP), plannerResponse = AI content
*/
// extractPlannerText, filterEphemeral, extractToolCommand, extractToolDescription → extracted to ./step-utils.ts
/** Write a pending approval file matching Bot's ApprovalRequest dataclass. */
export function writePendingApproval(data: { conversation_id: string; command: string; description: string; step_type?: string; step_index?: number; source?: string; buttons?: Array<{ text: string; index: number }>; modified_files?: string[]; edit_step_indices?: number[]; safe_to_auto_run?: boolean }) {
try {
const pendingDir = path.join(ctx.bridgePath, 'pending');
if (!fs.existsSync(pendingDir)) { fs.mkdirSync(pendingDir, { recursive: true }); }
// ── Dedup: if DOM observer already created a "Run"-only pending, MERGE detailed info into it ──
const nowMs = Date.now();
const DEDUP_WINDOW_MS = 15_000; // 15 second dedup window
// ── FIX: Memory-based dedup (survives pending file deletion by Collector/Bot) ──
// Pending files are deleted when Bot writes a response (bridge.py L461, collector.py L259).
// File-based dedup alone fails after deletion → same step_index creates new pending → loop.
if (data.step_index !== undefined && data.conversation_id) {
const memKey = `${data.conversation_id}:${data.step_index}`;
const prevTs = recentPendingSteps.get(memKey);
if (prevTs && (nowMs - prevTs) < PENDING_MEMORY_TTL_MS) {
ctx.logToFile(`[DEDUP-MEM] skip: step_index ${data.step_index} already created ${Math.round((nowMs - prevTs) / 1000)}s ago`);
return;
}
// Cleanup stale entries (keep map small)
for (const [k, ts] of recentPendingSteps) {
if (nowMs - ts > PENDING_MEMORY_TTL_MS) recentPendingSteps.delete(k);
}
}
try {
const existingFiles = fs.readdirSync(pendingDir).filter((f: string) => f.endsWith('.json'));
for (const ef of existingFiles) {
const efPath = path.join(pendingDir, ef);
const existing = JSON.parse(fs.readFileSync(efPath, 'utf-8'));
if (existing.source === 'dom_observer' && existing.status === 'pending'
&& existing.project_name === ctx.projectName) { // CRITICAL: same project only
const age = nowMs - (existing.timestamp * 1000);
if (age < DEDUP_WINDOW_MS && age >= 0) {
// MERGE: update DOM observer pending with detailed step_probe info
existing.command = data.command;
existing.description = data.description;
if (data.step_type) existing.step_type = data.step_type;
if (data.step_index !== undefined) existing.step_index = data.step_index;
if (data.safe_to_auto_run !== undefined) existing.safe_to_auto_run = data.safe_to_auto_run;
existing.source = 'dom_observer+step_probe'; // mark as merged
fs.promises.writeFile(efPath, JSON.stringify(existing, null, 2), 'utf-8').catch(e => {
ctx.logToFile(`[DEDUP] merge write error: ${e.message}`);
});
ctx.logToFile(`[DEDUP] MERGED step_probe info into DOM pending: ${ef} cmd="${data.command.substring(0, 60)}"`);
// Record in memory dedup
if (data.step_index !== undefined && data.conversation_id) {
recentPendingSteps.set(`${data.conversation_id}:${data.step_index}`, nowMs);
}
return;
}
}
// Dedup: skip if step_probe already created pending for same step_index IN SAME SESSION (within window)
if (existing.status === 'pending' && existing.project_name === ctx.projectName
&& existing.conversation_id === data.conversation_id // CRITICAL: same session only
&& data.step_index !== undefined && existing.step_index === data.step_index) {
const age = nowMs - (existing.timestamp * 1000);
if (age < DEDUP_WINDOW_MS && age >= 0) {
ctx.logToFile(`[DEDUP] skip: step_index ${data.step_index} already pending in ${ef}`);
return;
}
}
}
} catch (dedupErr: any) {
ctx.logToFile(`[DEDUP] check error (non-fatal): ${dedupErr.message}`);
}
const id = nowMs.toString();
// Auto-inject 3-button array for file_permission steps
// (step_probe sets step_type but not buttons; DOM observer /pending handler
// only injects buttons when command contains 'allow' which misses step_probe paths)
let buttons = data.buttons;
if (!buttons && data.step_type === 'file_permission') {
buttons = [
{ text: 'Allow Once', index: 0 },
{ text: 'Allow This Conversation', index: 1 },
{ text: 'Deny', index: 2 },
];
}
const payload: Record<string, any> = {
request_id: id,
conversation_id: data.conversation_id,
command: data.command,
description: data.description,
timestamp: nowMs / 1000,
status: 'pending',
discord_message_id: 0,
project_name: ctx.projectName,
...(data.step_type ? { step_type: data.step_type } : {}),
...(data.step_index !== undefined ? { step_index: data.step_index } : {}),
...(data.source ? { source: data.source } : {}),
...(buttons ? { buttons } : {}),
...(data.modified_files ? { modified_files: data.modified_files } : {}),
...(data.edit_step_indices && data.edit_step_indices.length > 0 ? { edit_step_indices: data.edit_step_indices } : {}),
};
// WS route (preferred) — skip file write to prevent duplicate Discord delivery
if (ctx.wsBridge && ctx.wsBridge.isConnected()) {
ctx.wsBridge.sendPending({
request_id: id,
command: data.command,
description: data.description,
step_type: data.step_type,
status: 'pending',
buttons: buttons,
project_name: ctx.projectName,
modified_files: data.modified_files,
edit_step_indices: data.edit_step_indices,
});
ctx.logToFile(`[PENDING-WS] sent pending ${id} cmd="${data.command.substring(0, 60)}"`);
// Cache diff_review metadata in-memory (needed for RPC acknowledgement)
if (data.step_type === 'diff_review' && (data.edit_step_indices?.length || data.modified_files?.length)) {
ctx.diffReviewMetadata.set(id, {
edit_step_indices: data.edit_step_indices || [],
modified_files: data.modified_files || [],
});
ctx.logToFile(`[DIFF-REVIEW-CACHE] stored metadata for rid=${id}`);
}
// Record in memory dedup
if (data.step_index !== undefined && data.conversation_id) {
recentPendingSteps.set(`${data.conversation_id}:${data.step_index}`, nowMs);
}
if (data.conversation_id) { writeRegistration(data.conversation_id); }
return;
}
// File route (fallback — only when WS is NOT connected)
fs.promises.writeFile(path.join(pendingDir, `${id}.json`), JSON.stringify(payload, null, 2), 'utf-8').catch(e => {
console.error(`Gravity Bridge: failed to write pending: ${e.message}`);
});
console.log(`Gravity Bridge: pending approval written → ${id}.json`);
// Cache diff_review metadata in-memory (survives pending file deletion by Collector/Bot)
if (data.step_type === 'diff_review' && (data.edit_step_indices?.length || data.modified_files?.length)) {
ctx.diffReviewMetadata.set(id, {
edit_step_indices: data.edit_step_indices || [],
modified_files: data.modified_files || [],
});
ctx.logToFile(`[DIFF-REVIEW-CACHE] stored metadata for rid=${id}: steps=[${(data.edit_step_indices || []).join(',')}] files=${(data.modified_files || []).length}`);
}
// Record in memory dedup cache (survives file deletion by Collector/Bot)
if (data.step_index !== undefined && data.conversation_id) {
recentPendingSteps.set(`${data.conversation_id}:${data.step_index}`, nowMs);
}
// Register session → project mapping (correct because ctx.projectName is per-window)
if (data.conversation_id) { writeRegistration(data.conversation_id); }
} catch (e: any) {
console.log(`Gravity Bridge: pending write error: ${e.message}`);
}
}
// tryApprovalStrategies → moved to ./approval-handler.ts
// ─── Activation ───
/**
* Initialize step probe with shared context.
* Called from activate() in extension.ts.
*/
export function initStepProbe(context: BridgeContext) {
ctx = context;
initApprovalHandler(context, () => activeTrajectoryId);
setupMonitor();
setupResponseWatcher();
}