feat: subscribeToStream on new cascade — real-time StreamCascadeReactiveUpdates subscription

This commit is contained in:
2026-03-07 23:23:16 +09:00
parent 12131b9103
commit 87094e00b0
4 changed files with 124 additions and 169 deletions

Binary file not shown.

View File

@@ -334,6 +334,7 @@ function activate(context) {
else if (stepIdx > 0 && summary) { else if (stepIdx > 0 && summary) {
// New conversation with AI response! // New conversation with AI response!
console.log(`Gravity Bridge: [LS] NEW conversation ${key.substring(0, 8)} at step ${stepIdx} "${summary.substring(0, 40)}"`); console.log(`Gravity Bridge: [LS] NEW conversation ${key.substring(0, 8)} at step ${stepIdx} "${summary.substring(0, 40)}"`);
subscribeToStream(key);
// Try to extract AI text from extensionLogs // Try to extract AI text from extensionLogs
const aiText = extractFromLogs(parsed); const aiText = extractFromLogs(parsed);
if (aiText) { if (aiText) {
@@ -435,96 +436,71 @@ function activate(context) {
} }
return null; return null;
} }
// ========== Trial D2: Streaming RPC with protocol_version=1 ========== // ========== Stream subscription for active cascades ==========
setTimeout(async () => { function subscribeToStream(cascadeId) {
if (!lsPort || !lsCsrf) { if (!lsPort || !lsCsrf) {
console.log('Gravity Bridge: [Trial D2] Skipped — no LS');
return; return;
} }
console.log(`Gravity Bridge: [Trial D2] Streaming RPCs with proto_version=1...`);
const http = require('http'); const http = require('http');
function tryProtoRPC(method, protoBody, timeout = 12000) { console.log(`Gravity Bridge: [Stream] Subscribing to cascade ${cascadeId.substring(0, 8)}...`);
return new Promise((resolve) => { const gidBuf = Buffer.from(cascadeId, 'utf-8');
// ConnectRPC frame: [flag(1)] [length(4 big-endian)] [message] // field1=version(varint=1), field2=cascadeId(string)
const frame = Buffer.alloc(5 + protoBody.length); const proto = Buffer.alloc(2 + 2 + gidBuf.length);
frame[0] = 0x00; // no compression proto[0] = 0x08;
frame.writeUInt32BE(protoBody.length, 1); proto[1] = 0x01;
protoBody.copy(frame, 5); proto[2] = 0x12;
proto[3] = gidBuf.length;
gidBuf.copy(proto, 4);
// ConnectRPC frame
const frame = Buffer.alloc(5 + proto.length);
frame[0] = 0x00;
frame.writeUInt32BE(proto.length, 1);
proto.copy(frame, 5);
const req = http.request({ const req = http.request({
hostname: '127.0.0.1', port: lsPort, hostname: '127.0.0.1', port: lsPort,
path: `/exa.language_server_pb.LanguageServerService/${method}`, path: '/exa.language_server_pb.LanguageServerService/StreamCascadeReactiveUpdates',
method: 'POST', method: 'POST',
headers: { headers: {
'Content-Type': 'application/connect+proto', 'Content-Type': 'application/connect+proto',
'Connect-Protocol-Version': '1', 'Connect-Protocol-Version': '1',
'x-codeium-csrf-token': lsCsrf, 'x-codeium-csrf-token': lsCsrf,
}, },
timeout: timeout timeout: 30000
}, (res) => { }, (res) => {
const chunks = []; console.log(`Gravity Bridge: [Stream] Connected! status=${res.statusCode}`);
res.on('data', (c) => chunks.push(c)); let totalBytes = 0;
const allChunks = [];
res.on('data', (chunk) => {
allChunks.push(chunk);
totalBytes += chunk.length;
// Log each chunk as it arrives (real-time streaming)
const text = chunk.toString('utf-8');
console.log(`Gravity Bridge: [Stream] chunk (${chunk.length}B): ${text.substring(0, 300)}`);
});
res.on('end', () => { res.on('end', () => {
const buf = Buffer.concat(chunks); const fullBuf = Buffer.concat(allChunks);
// Skip ConnectRPC frame header (5 bytes) to get proto/JSON content const fullText = fullBuf.toString('utf-8');
const text = buf.toString('utf-8'); console.log(`Gravity Bridge: [Stream] Ended. Total ${totalBytes}B in ${allChunks.length} chunks`);
resolve(text); // Try to extract readable text from the stream data
// Look for strings in the protobuf data
const readable = fullText.replace(/[\x00-\x1F\x7F-\x9F]/g, ' ').replace(/\s+/g, ' ').trim();
if (readable.length > 20) {
console.log(`Gravity Bridge: [Stream] Readable: ${readable.substring(0, 1000)}`);
}
// Check for error messages
if (fullText.includes('"error"')) {
console.log(`Gravity Bridge: [Stream] Error in response: ${fullText.substring(0, 500)}`);
}
}); });
}); });
req.on('error', (e) => resolve(`err:${e.message}`)); req.on('error', (e) => console.log(`Gravity Bridge: [Stream] Error: ${e.message}`));
req.on('timeout', () => { req.destroy(); resolve('timeout(stream open)'); }); req.on('timeout', () => {
console.log(`Gravity Bridge: [Stream] Timeout (30s) — closing`);
req.destroy();
});
req.write(frame); req.write(frame);
req.end(); req.end();
});
} }
// Get latest trajectory ID first, then try streams WITH cascadeId
try {
const dRaw = await vscode.commands.executeCommand('antigravity.getDiagnostics');
const d = typeof dRaw === 'string' ? JSON.parse(dRaw) : dRaw;
const ts = d?.recentTrajectories || [];
if (ts.length > 0) {
const latest = ts[ts.length - 1];
const gid = latest.googleAgentId || '';
console.log(`Gravity Bridge: [Trial D3] Using cascade ${gid.substring(0, 8)} step=${latest.lastStepIndex}`);
const gidBuf = Buffer.from(gid, 'utf-8');
// Attempt 1: field1=version(varint=1), field2=cascadeId(string)
// 0x08 0x01 0x12 <len> <cascadeId>
const proto1 = Buffer.alloc(2 + 2 + gidBuf.length);
proto1[0] = 0x08;
proto1[1] = 0x01; // field1 varint = 1
proto1[2] = 0x12;
proto1[3] = gidBuf.length; // field2 string
gidBuf.copy(proto1, 4);
let r = await tryProtoRPC('StreamCascadeReactiveUpdates', proto1);
console.log(`Gravity Bridge: [Trial D3] StreamCascade(v1+cascade): ${r.substring(0, 800)}`);
// Attempt 2: field1=cascadeId(string), field2=version(varint=1)
// 0x0A <len> <cascadeId> 0x10 0x01
const proto2 = Buffer.alloc(2 + gidBuf.length + 2);
proto2[0] = 0x0A;
proto2[1] = gidBuf.length; // field1 string
gidBuf.copy(proto2, 2);
proto2[2 + gidBuf.length] = 0x10; // field2 varint
proto2[3 + gidBuf.length] = 0x01;
r = await tryProtoRPC('StreamCascadeReactiveUpdates', proto2);
console.log(`Gravity Bridge: [Trial D3] StreamCascade(cascade+v1): ${r.substring(0, 800)}`);
// Attempt 3: StreamCascadeSummariesReactiveUpdates with version+cascadeId
r = await tryProtoRPC('StreamCascadeSummariesReactiveUpdates', proto1);
console.log(`Gravity Bridge: [Trial D3] StreamSummaries(v1+cascade): ${r.substring(0, 800)}`);
// Attempt 4: GetCascadeTrajectorySteps with proto
const stepIdx = Math.max(0, latest.lastStepIndex - 1);
const stepsProto = Buffer.alloc(2 + gidBuf.length + 2);
stepsProto[0] = 0x0A;
stepsProto[1] = gidBuf.length;
gidBuf.copy(stepsProto, 2);
stepsProto[2 + gidBuf.length] = 0x10;
stepsProto[3 + gidBuf.length] = stepIdx;
r = await tryProtoRPC('GetCascadeTrajectorySteps', stepsProto);
console.log(`Gravity Bridge: [Trial D3] Steps(proto): ${r.substring(0, 1000)}`);
}
}
catch (e) {
console.log(`Gravity Bridge: [Trial D3] err: ${e.message}`);
}
}, 15000);
// Start LS bridge after a delay // Start LS bridge after a delay
setTimeout(async () => { setTimeout(async () => {
const found = await discoverLS(); const found = await discoverLS();

File diff suppressed because one or more lines are too long

View File

@@ -313,7 +313,7 @@ export function activate(context: vscode.ExtensionContext) {
} else if (stepIdx > 0 && summary) { } else if (stepIdx > 0 && summary) {
// New conversation with AI response! // New conversation with AI response!
console.log(`Gravity Bridge: [LS] NEW conversation ${key.substring(0, 8)} at step ${stepIdx} "${summary.substring(0, 40)}"`); console.log(`Gravity Bridge: [LS] NEW conversation ${key.substring(0, 8)} at step ${stepIdx} "${summary.substring(0, 40)}"`);
subscribeToStream(key);
// Try to extract AI text from extensionLogs // Try to extract AI text from extensionLogs
const aiText = extractFromLogs(parsed); const aiText = extractFromLogs(parsed);
if (aiText) { if (aiText) {
@@ -429,95 +429,74 @@ export function activate(context: vscode.ExtensionContext) {
// ========== Trial D2: Streaming RPC with protocol_version=1 ========== // ========== Stream subscription for active cascades ==========
setTimeout(async () => { function subscribeToStream(cascadeId: string): void {
if (!lsPort || !lsCsrf) { console.log('Gravity Bridge: [Trial D2] Skipped — no LS'); return; } if (!lsPort || !lsCsrf) { return; }
console.log(`Gravity Bridge: [Trial D2] Streaming RPCs with proto_version=1...`);
const http = require('http'); const http = require('http');
console.log(`Gravity Bridge: [Stream] Subscribing to cascade ${cascadeId.substring(0, 8)}...`);
function tryProtoRPC(method: string, protoBody: Buffer, timeout: number = 12000): Promise<string> { const gidBuf = Buffer.from(cascadeId, 'utf-8');
return new Promise((resolve) => { // field1=version(varint=1), field2=cascadeId(string)
// ConnectRPC frame: [flag(1)] [length(4 big-endian)] [message] const proto = Buffer.alloc(2 + 2 + gidBuf.length);
const frame = Buffer.alloc(5 + protoBody.length); proto[0] = 0x08; proto[1] = 0x01;
frame[0] = 0x00; // no compression proto[2] = 0x12; proto[3] = gidBuf.length;
frame.writeUInt32BE(protoBody.length, 1); gidBuf.copy(proto, 4);
protoBody.copy(frame, 5);
// ConnectRPC frame
const frame = Buffer.alloc(5 + proto.length);
frame[0] = 0x00;
frame.writeUInt32BE(proto.length, 1);
proto.copy(frame, 5);
const req = http.request({ const req = http.request({
hostname: '127.0.0.1', port: lsPort, hostname: '127.0.0.1', port: lsPort,
path: `/exa.language_server_pb.LanguageServerService/${method}`, path: '/exa.language_server_pb.LanguageServerService/StreamCascadeReactiveUpdates',
method: 'POST', method: 'POST',
headers: { headers: {
'Content-Type': 'application/connect+proto', 'Content-Type': 'application/connect+proto',
'Connect-Protocol-Version': '1', 'Connect-Protocol-Version': '1',
'x-codeium-csrf-token': lsCsrf, 'x-codeium-csrf-token': lsCsrf,
}, },
timeout: timeout timeout: 30000
}, (res: any) => { }, (res: any) => {
const chunks: Buffer[] = []; console.log(`Gravity Bridge: [Stream] Connected! status=${res.statusCode}`);
res.on('data', (c: Buffer) => chunks.push(c)); let totalBytes = 0;
const allChunks: Buffer[] = [];
res.on('data', (chunk: Buffer) => {
allChunks.push(chunk);
totalBytes += chunk.length;
// Log each chunk as it arrives (real-time streaming)
const text = chunk.toString('utf-8');
console.log(`Gravity Bridge: [Stream] chunk (${chunk.length}B): ${text.substring(0, 300)}`);
});
res.on('end', () => { res.on('end', () => {
const buf = Buffer.concat(chunks); const fullBuf = Buffer.concat(allChunks);
// Skip ConnectRPC frame header (5 bytes) to get proto/JSON content const fullText = fullBuf.toString('utf-8');
const text = buf.toString('utf-8'); console.log(`Gravity Bridge: [Stream] Ended. Total ${totalBytes}B in ${allChunks.length} chunks`);
resolve(text);
}); // Try to extract readable text from the stream data
}); // Look for strings in the protobuf data
req.on('error', (e: any) => resolve(`err:${e.message}`)); const readable = fullText.replace(/[\x00-\x1F\x7F-\x9F]/g, ' ').replace(/\s+/g, ' ').trim();
req.on('timeout', () => { req.destroy(); resolve('timeout(stream open)'); }); if (readable.length > 20) {
req.write(frame); req.end(); console.log(`Gravity Bridge: [Stream] Readable: ${readable.substring(0, 1000)}`);
});
} }
// Get latest trajectory ID first, then try streams WITH cascadeId // Check for error messages
try { if (fullText.includes('"error"')) {
const dRaw = await vscode.commands.executeCommand('antigravity.getDiagnostics'); console.log(`Gravity Bridge: [Stream] Error in response: ${fullText.substring(0, 500)}`);
const d = typeof dRaw === 'string' ? JSON.parse(dRaw) : dRaw; }
const ts = d?.recentTrajectories || []; });
if (ts.length > 0) { });
const latest = ts[ts.length - 1]; req.on('error', (e: any) => console.log(`Gravity Bridge: [Stream] Error: ${e.message}`));
const gid = latest.googleAgentId || ''; req.on('timeout', () => {
console.log(`Gravity Bridge: [Trial D3] Using cascade ${gid.substring(0, 8)} step=${latest.lastStepIndex}`); console.log(`Gravity Bridge: [Stream] Timeout (30s) — closing`);
req.destroy();
const gidBuf = Buffer.from(gid, 'utf-8'); });
req.write(frame);
// Attempt 1: field1=version(varint=1), field2=cascadeId(string) req.end();
// 0x08 0x01 0x12 <len> <cascadeId>
const proto1 = Buffer.alloc(2 + 2 + gidBuf.length);
proto1[0] = 0x08; proto1[1] = 0x01; // field1 varint = 1
proto1[2] = 0x12; proto1[3] = gidBuf.length; // field2 string
gidBuf.copy(proto1, 4);
let r = await tryProtoRPC('StreamCascadeReactiveUpdates', proto1);
console.log(`Gravity Bridge: [Trial D3] StreamCascade(v1+cascade): ${r.substring(0, 800)}`);
// Attempt 2: field1=cascadeId(string), field2=version(varint=1)
// 0x0A <len> <cascadeId> 0x10 0x01
const proto2 = Buffer.alloc(2 + gidBuf.length + 2);
proto2[0] = 0x0A; proto2[1] = gidBuf.length; // field1 string
gidBuf.copy(proto2, 2);
proto2[2 + gidBuf.length] = 0x10; // field2 varint
proto2[3 + gidBuf.length] = 0x01;
r = await tryProtoRPC('StreamCascadeReactiveUpdates', proto2);
console.log(`Gravity Bridge: [Trial D3] StreamCascade(cascade+v1): ${r.substring(0, 800)}`);
// Attempt 3: StreamCascadeSummariesReactiveUpdates with version+cascadeId
r = await tryProtoRPC('StreamCascadeSummariesReactiveUpdates', proto1);
console.log(`Gravity Bridge: [Trial D3] StreamSummaries(v1+cascade): ${r.substring(0, 800)}`);
// Attempt 4: GetCascadeTrajectorySteps with proto
const stepIdx = Math.max(0, latest.lastStepIndex - 1);
const stepsProto = Buffer.alloc(2 + gidBuf.length + 2);
stepsProto[0] = 0x0A; stepsProto[1] = gidBuf.length;
gidBuf.copy(stepsProto, 2);
stepsProto[2 + gidBuf.length] = 0x10;
stepsProto[3 + gidBuf.length] = stepIdx;
r = await tryProtoRPC('GetCascadeTrajectorySteps', stepsProto);
console.log(`Gravity Bridge: [Trial D3] Steps(proto): ${r.substring(0, 1000)}`);
} }
} catch (e: any) { console.log(`Gravity Bridge: [Trial D3] err: ${e.message}`); }
}, 15000);
// Start LS bridge after a delay // Start LS bridge after a delay