/** * WebSocket Bridge Client — connects Extension to the Hub server. * * Replaces file-based IPC for: * - Pending approvals (Extension → Hub → Bot → Discord) * - User responses (Discord → Bot → Hub → Extension) * - Chat snapshots (Extension → Hub → Bot → Discord) * - Commands (Discord → Bot → Hub → Extension) * - Session registration * - Auto-resolve notifications * * Features: * - Exponential backoff + jitter reconnection * - Message queue (survives reconnection) * - Heartbeat ping/pong * - First-message JWT authentication */ import * as vscode from 'vscode'; // ─── Types ─── export interface WSMessage { type: string; data?: any; msg_id?: string; } export interface WSAuthMessage { type: 'auth'; token?: string; registration_code?: string; project: string; pc: string; } export interface WSAuthOkResponse { type: 'auth_ok'; conn_id: string; instance_number: number; session_token: string; active_count: number; } export interface WSPendingData { request_id: string; command: string; description?: string; step_type?: string; status?: string; buttons?: Array<{ text: string; index: number }>; project_name?: string; // diff_review metadata edit_step_indices?: number[]; modified_files?: string[]; } export interface WSResponseData { request_id: string; approved: boolean; button_index?: number; step_type?: string; project_name?: string; } export interface WSCommandData { text: string; project_name?: string; action?: string; } export interface WSChatData { content: string; attached_files?: Array<{ name: string; content: string }>; conversation_id?: string; project_name?: string; } export interface WSRegisterData { conversation_id: string; project_name: string; } // ─── Event Handlers ─── export interface WSBridgeHandlers { onResponse?: (data: WSResponseData) => void; onCommand?: (data: WSCommandData) => void; onInstanceUpdate?: (activeCount: number, instances: Array<{ instance_number: number; pc: string }>) => void; onConnected?: (connId: string, instanceNumber: number, sessionToken: string) => void; onDisconnected?: () => void; onError?: (error: string) => void; } // ─── Constants ─── const INITIAL_RECONNECT_DELAY = 1000; // 1s const MAX_RECONNECT_DELAY = 60000; // 60s const RECONNECT_JITTER = 0.3; // ±30% const HEARTBEAT_INTERVAL = 25000; // 25s (server expects 30s) const MAX_QUEUE_SIZE = 200; const AUTH_TIMEOUT = 10000; // 10s // ─── WSBridgeClient ─── export class WSBridgeClient { private ws: any = null; // WebSocket instance (Node.js ws module) private hubUrl: string; private registrationCode: string; private project: string; private pcName: string; private handlers: WSBridgeHandlers; private logFn: (msg: string) => void; // Connection state private connected = false; private authenticated = false; private connId = ''; private instanceNumber = 0; private sessionToken = ''; private shouldReconnect = true; private reconnectDelay = INITIAL_RECONNECT_DELAY; private reconnectTimer: NodeJS.Timeout | null = null; private heartbeatTimer: NodeJS.Timeout | null = null; private authTimer: NodeJS.Timeout | null = null; private lastPongTime: number = 0; // Message queue (survives reconnection) private messageQueue: WSMessage[] = []; private msgIdCounter = 0; constructor( hubUrl: string, registrationCode: string, project: string, pcName: string, handlers: WSBridgeHandlers, logFn: (msg: string) => void, ) { this.hubUrl = hubUrl; this.registrationCode = registrationCode; this.project = project; this.pcName = pcName; this.handlers = handlers; this.logFn = logFn; } // ─── Public API ─── /** Start the WebSocket connection. */ async connect(): Promise { if (!this.hubUrl) { this.logFn('[WS] No hub URL configured — WS disabled'); return; } this.shouldReconnect = true; await this._connect(); } /** Gracefully disconnect. */ disconnect(): void { this.shouldReconnect = false; this._cleanup(); this.logFn('[WS] Disconnected (intentional)'); } /** Check if connected and authenticated. */ isConnected(): boolean { return this.connected && this.authenticated; } /** Get the instance number assigned by the Hub. */ getInstanceNumber(): number { return this.instanceNumber; } /** Send a pending approval to the Hub. */ sendPending(data: WSPendingData): boolean { return this._send({ type: 'pending', data }); } /** Send a chat snapshot to the Hub. */ sendChat(data: WSChatData): boolean { return this._send({ type: 'chat', data }); } /** Send a session registration. */ sendRegister(data: WSRegisterData): boolean { return this._send({ type: 'register', data }); } /** Send an auto_resolve notification. */ sendAutoResolve(requestId: string): boolean { return this._send({ type: 'auto_resolve', data: { request_id: requestId } }); } /** Send a brain event. */ sendBrainEvent(data: any): boolean { return this._send({ type: 'brain_event', data }); } // ─── Internal Connection ─── private async _connect(): Promise { try { // Dynamic import of ws module (Node.js built-in or npm package) const WebSocket = await this._getWebSocketClass(); if (!WebSocket) { this.logFn('[WS] WebSocket module not available'); return; } this.logFn(`[WS] Connecting to ${this.hubUrl}...`); const ws = new WebSocket(this.hubUrl); let connectTimeout: NodeJS.Timeout | null = null; const clearConnectTimeout = () => { if (connectTimeout) { clearTimeout(connectTimeout); connectTimeout = null; } }; // Detect API style: Node.js 'ws' module has .on(), browser WebSocket doesn't const isNodeWs = typeof ws.on === 'function'; if (isNodeWs) { // ─── Node.js ws module (EventEmitter API) ─── ws.on('open', () => { clearConnectTimeout(); this.logFn('[WS] Connection opened, authenticating...'); this.ws = ws; this.connected = true; this._authenticate(); }); ws.on('message', (raw: Buffer | string) => { try { const data = JSON.parse(typeof raw === 'string' ? raw : raw.toString('utf-8')); this._handleMessage(data); } catch (e: any) { this.logFn(`[WS] Parse error: ${e.message}`); } }); ws.on('close', (code: number, reason: Buffer) => { clearConnectTimeout(); const reasonStr = reason ? reason.toString('utf-8') : ''; this.logFn(`[WS] Connection closed: code=${code} reason=${reasonStr}`); this._onDisconnect(); }); ws.on('error', (err: any) => { clearConnectTimeout(); this.logFn(`[WS] Connection error: ${err.message || err}`); this._onDisconnect(); }); ws.on('pong', () => { // Server responded to our ping — connection is alive this.lastPongTime = Date.now(); }); } else { // ─── Browser-style WebSocket API (.onopen / .onmessage) ─── ws.onopen = () => { clearConnectTimeout(); this.logFn('[WS] Connection opened (browser API), authenticating...'); this.ws = ws; this.connected = true; this._authenticate(); }; ws.onmessage = (event: any) => { try { const raw = typeof event.data === 'string' ? event.data : event.data.toString(); const data = JSON.parse(raw); this._handleMessage(data); } catch (e: any) { this.logFn(`[WS] Parse error: ${e.message}`); } }; ws.onclose = (event: any) => { clearConnectTimeout(); this.logFn(`[WS] Connection closed: code=${event.code} reason=${event.reason || ''}`); this._onDisconnect(); }; ws.onerror = (event: any) => { clearConnectTimeout(); this.logFn(`[WS] Error: ${event.message || 'connection error'}`); this._onDisconnect(); }; } // Connection timeout to prevent hanging if no close/error fires connectTimeout = setTimeout(() => { this.logFn('[WS] Connection timeout (15s) — forcing disconnect'); if (this.ws) { try { this.ws.terminate(); } catch { try { this.ws.close(); } catch { } } } else if (ws) { try { ws.terminate(); } catch { try { ws.close(); } catch { } } } this._onDisconnect(); }, 15000); } catch (e: any) { this.logFn(`[WS] Connect failed: ${e.message}`); this._scheduleReconnect(); } } private async _getWebSocketClass(): Promise { try { // Prefer require('ws') — Node.js EventEmitter API with .on() // VS Code runs in Node.js, so this should be available const ws = require('ws'); return ws; } catch { // ws module not available — try built-in WebSocket try { if (typeof globalThis.WebSocket !== 'undefined') { return globalThis.WebSocket; } // Fallback: try the built-in undici WebSocket const { WebSocket } = require('undici'); return WebSocket; } catch { return null; } } } // ─── Authentication ─── private _authenticate(): void { if (!this.ws) return; const authMsg: WSAuthMessage = { type: 'auth', project: this.project, pc: this.pcName, }; // Use session token if available (from previous connection) if (this.sessionToken) { authMsg.token = this.sessionToken; } else if (this.registrationCode) { authMsg.registration_code = this.registrationCode; } this._sendRaw(authMsg); // Timeout for auth response this.authTimer = setTimeout(() => { if (!this.authenticated) { this.logFn('[WS] Auth timeout — closing connection'); this._cleanup(); this._scheduleReconnect(); } }, AUTH_TIMEOUT); } // ─── Message Handling ─── private _handleMessage(msg: WSMessage): void { switch (msg.type) { case 'auth_ok': { const authOk = msg as unknown as WSAuthOkResponse; this.authenticated = true; this.connId = authOk.conn_id; this.instanceNumber = authOk.instance_number; this.sessionToken = authOk.session_token; this.reconnectDelay = INITIAL_RECONNECT_DELAY; this.lastPongTime = Date.now(); // Reset pong timer on auth if (this.authTimer) { clearTimeout(this.authTimer); this.authTimer = null; } this.logFn(`[WS] Authenticated: conn=${this.connId} instance=#${this.instanceNumber} active=${authOk.active_count}`); this._startHeartbeat(); this._flushQueue(); this.handlers.onConnected?.(this.connId, this.instanceNumber, this.sessionToken); break; } case 'auth_fail': { const reason = (msg as any).reason || 'Unknown'; this.logFn(`[WS] Auth failed: ${reason}`); // Clear session token if it was rejected (e.g. expired after 24h) this.sessionToken = ''; if (this.registrationCode) { // Token expired → retry with registration code on next reconnect this.logFn('[WS] Retrying with registration code...'); this._cleanup(); this._scheduleReconnect(); } else { // No registration code available → permanent failure // MUST set shouldReconnect=false BEFORE _cleanup(), because _cleanup() // closes the WS → triggers close event → _onDisconnect() → _scheduleReconnect(). this.shouldReconnect = false; this._cleanup(); this.handlers.onError?.(`Auth failed: ${reason}`); } break; } case 'response': { const data = msg.data as WSResponseData; if (data) { this.logFn(`[WS] Response received: ${data.request_id?.substring(0, 12)} approved=${data.approved}`); this.handlers.onResponse?.(data); } break; } case 'command': { const data = msg.data as WSCommandData; if (data) { this.logFn(`[WS] Command received: ${data.text?.substring(0, 50)}`); this.handlers.onCommand?.(data); } break; } case 'instance_update': { const activeCount = (msg as any).active_count || 0; const instances = (msg as any).instances || []; this.logFn(`[WS] Instance update: ${activeCount} active`); this.handlers.onInstanceUpdate?.(activeCount, instances); break; } case 'error': { const error = (msg as any).error || 'Unknown error'; this.logFn(`[WS] Server error: ${error}`); this.handlers.onError?.(error); break; } default: this.logFn(`[WS] Unknown message type: ${msg.type}`); } } // ─── Send ─── private _send(msg: WSMessage): boolean { // Add unique message ID for dedup msg.msg_id = `${this.project}-${Date.now()}-${++this.msgIdCounter}`; if (this.isConnected()) { return this._sendRaw(msg); } // Queue for later if (this.messageQueue.length >= MAX_QUEUE_SIZE) { // Drop oldest this.messageQueue.shift(); this.logFn('[WS] Queue full — dropped oldest message'); } this.messageQueue.push(msg); this.logFn(`[WS] Queued message (type=${msg.type}, queue=${this.messageQueue.length})`); return false; } private _sendRaw(msg: any): boolean { try { if (this.ws && this.connected) { this.ws.send(JSON.stringify(msg)); return true; } return false; } catch (e: any) { this.logFn(`[WS] Send error: ${e.message}`); return false; } } private async _flushQueue(): Promise { if (this.messageQueue.length === 0) return; this.logFn(`[WS] Flushing ${this.messageQueue.length} queued messages (paced)`); const queue = [...this.messageQueue]; this.messageQueue = []; for (const msg of queue) { this._sendRaw(msg); // Pace the burst to avoid hitting the Hub's rate limit (60 msgs / 10s) await new Promise(r => setTimeout(r, 50)); } } // ─── Heartbeat ─── private _startHeartbeat(): void { this._stopHeartbeat(); this.heartbeatTimer = setInterval(() => { if (this.ws && this.connected) { // Check for zombie connection (no pong for 60s) if (Date.now() - this.lastPongTime > 60000) { this.logFn('[WS] Heartbeat timeout — no pong received for 60s (zombie connection), terminating'); if (this.ws) { try { this.ws.terminate(); } catch { try { this.ws.close(); } catch { } } } this._onDisconnect(); return; } try { // Node.js ws has .ping(), browser WebSocket doesn't if (typeof this.ws.ping === 'function') { this.ws.ping(); } else { // Fallback: send heartbeat as JSON message this.ws.send(JSON.stringify({ type: 'heartbeat' })); } } catch { // ping failure will trigger close event } } }, HEARTBEAT_INTERVAL); } private _stopHeartbeat(): void { if (this.heartbeatTimer) { clearInterval(this.heartbeatTimer); this.heartbeatTimer = null; } } // ─── Reconnection ─── private _onDisconnect(): void { const wasAuthenticated = this.authenticated; this.connected = false; this.authenticated = false; this.ws = null; this._stopHeartbeat(); if (this.authTimer) { clearTimeout(this.authTimer); this.authTimer = null; } if (wasAuthenticated) { this.handlers.onDisconnected?.(); } if (this.shouldReconnect) { this._scheduleReconnect(); } } private _scheduleReconnect(): void { if (this.reconnectTimer) return; // Exponential backoff with jitter const jitter = 1 + (Math.random() * 2 - 1) * RECONNECT_JITTER; const delay = Math.min(this.reconnectDelay * jitter, MAX_RECONNECT_DELAY); this.logFn(`[WS] Reconnecting in ${Math.round(delay)}ms...`); this.reconnectTimer = setTimeout(() => { this.reconnectTimer = null; this.reconnectDelay = Math.min(this.reconnectDelay * 2, MAX_RECONNECT_DELAY); this._connect(); }, delay); } // ─── Cleanup ─── private _cleanup(): void { this._stopHeartbeat(); if (this.authTimer) { clearTimeout(this.authTimer); this.authTimer = null; } if (this.reconnectTimer) { clearTimeout(this.reconnectTimer); this.reconnectTimer = null; } if (this.ws) { try { this.ws.close(); } catch { } this.ws = null; } this.connected = false; this.authenticated = false; } }