fix(bridge): resolve websocket zombie connections and bound memory leaks
This commit is contained in:
@@ -40,6 +40,12 @@
|
|||||||
|
|
||||||
## 미해결 이슈
|
## 미해결 이슈
|
||||||
|
|
||||||
|
### [2026-03-23] 평생 지속되는 WebSocket 좀비 커넥션 — ping 전송은 성공하나 pong 누락
|
||||||
|
- **증상**: 네트워크 환경이 바뀌거나 컴퓨터 절전 모드 복구 시, 서버와의 실연결이 끊어졌음에도 `ws-client.ts`는 이를 인지하지 못하고 연결됨 상태로 표기하며(Zombie) 에이전트 승인 신호가 도착하지 못함.
|
||||||
|
- **원인**: Node.js `ws` 라이브러리의 `ws.ping()`은 패킷을 로컬 OS 송신 버퍼에 기록하기만 하면 성공으로 반환함. 서버로부터 돌아오는 `pong`을 기다리다 타임아웃시키는 Watchdog이 빠져 있어, 상대가 사라져도 소켓이 영구적인 반쪽 연결(Half-open) 상태로 남음.
|
||||||
|
- **해결** (v0.5.6): `ping()` 직후 10초 `setTimeout`을 걸고 `on('pong')`에서 해제. 초과 시 로컬 소켓을 `ws.terminate()`로 강제 종료한 뒤 `_onDisconnect()`를 호출하여 재연결 루프(`_scheduleReconnect`)를 트리거함.
|
||||||
|
- **주의**: WebSocket 기반 앱을 설계할 땐 반드시 한쪽이 아닌 '양방향' 응답 검증(Watchdog Ping-Pong) 타이머 구조를 가져야 좀비 소켓을 막을 수 있음.
|
||||||
|
|
||||||
### [2026-03-11] rejectAgentStep / !stop — AG 미등록 커맨드 + 렌더러 전용 함수 + 스테일 프리미티브
|
### [2026-03-11] rejectAgentStep / !stop — AG 미등록 커맨드 + 렌더러 전용 함수 + 스테일 프리미티브
|
||||||
- **증상**: `!stop` 명령이 AI를 멈추지 못함. 로그: "No active cascade" / "no session tracked yet"
|
- **증상**: `!stop` 명령이 AI를 멈추지 못함. 로그: "No active cascade" / "no session tracked yet"
|
||||||
- **원인**: (1) `antigravity.agent.rejectAgentStep`은 AG 미등록 커맨드. (2) 대체한 `getActiveCascadeId()`는 **렌더러(DOM) 전용 함수** — Extension host에서 항상 `undefined` 반환. (3) **v0.4.5 수정도 실패**: `extension.ts`의 `getActiveSessionId: () => activeSessionId`가 module-level 스트링 프리미티브를 참조 — step-probe가 `ctx.activeSessionId`를 업데이트해도 extension.ts의 변수는 불변 (프리미티브 복사)
|
- **원인**: (1) `antigravity.agent.rejectAgentStep`은 AG 미등록 커맨드. (2) 대체한 `getActiveCascadeId()`는 **렌더러(DOM) 전용 함수** — Extension host에서 항상 `undefined` 반환. (3) **v0.4.5 수정도 실패**: `extension.ts`의 `getActiveSessionId: () => activeSessionId`가 module-level 스트링 프리미티브를 참조 — step-probe가 `ctx.activeSessionId`를 업데이트해도 extension.ts의 변수는 불변 (프리미티브 복사)
|
||||||
|
|||||||
32
bot.py
32
bot.py
@@ -202,7 +202,7 @@ class GravityBot(commands.Bot):
|
|||||||
self.conv_to_project: dict[str, str] = {} # conv_id → project
|
self.conv_to_project: dict[str, str] = {} # conv_id → project
|
||||||
self.channel_to_project: dict[int, str] = {} # channel.id → project
|
self.channel_to_project: dict[int, str] = {} # channel.id → project
|
||||||
self.session_status_messages: dict[str, int] = {} # conv_id → msg_id
|
self.session_status_messages: dict[str, int] = {} # conv_id → msg_id
|
||||||
self._sent_approval_ids: set[str] = set()
|
self._sent_approval_ids: dict[str, bool] = {} # request_id → bool
|
||||||
self._deferred_ids: dict[str, int] = {} # request_id → defer count
|
self._deferred_ids: dict[str, int] = {} # request_id → defer count
|
||||||
self._sent_commands: dict[str, str] = {} # request_id → command text (for MERGE edit detection)
|
self._sent_commands: dict[str, str] = {} # request_id → command text (for MERGE edit detection)
|
||||||
self._ready_event = asyncio.Event()
|
self._ready_event = asyncio.Event()
|
||||||
@@ -255,6 +255,13 @@ class GravityBot(commands.Bot):
|
|||||||
cmd_data["id"] = cmd_data.get("id", str(int(_time.time() * 1000)))
|
cmd_data["id"] = cmd_data.get("id", str(int(_time.time() * 1000)))
|
||||||
self.gateway.push_command(project, cmd_data)
|
self.gateway.push_command(project, cmd_data)
|
||||||
|
|
||||||
|
def _cap_dict(self, d: dict, max_size: int = 5000) -> None:
    """Bound a tracking dict by evicting its oldest entries in place.

    Call this right before inserting a new key. When ``d`` already holds
    ``max_size`` or more entries, the overflow plus roughly 10% of
    ``max_size`` is dropped, so the following inserts do not trigger an
    eviction on every call.

    Eviction follows dict insertion order (oldest key first). This is FIFO,
    not LRU: re-assigning an existing key does not refresh its position.

    Args:
        d: Dictionary to trim; mutated in place.
        max_size: Entry count at which trimming starts.
    """
    from itertools import islice  # local import keeps this block self-contained

    if len(d) < max_size:
        return
    # Always evict at least one entry: for max_size < 10 the 10% slack
    # rounds down to zero, which would otherwise let the dict creep past
    # the cap after the caller's insert.
    to_remove = len(d) - max_size + max(1, max_size // 10)
    # Snapshot the doomed keys first; deleting while iterating a dict raises.
    for key in list(islice(d, to_remove)):
        del d[key]
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def _make_channel_name(project_name: str) -> str:
|
def _make_channel_name(project_name: str) -> str:
|
||||||
"""ag-gravity_control, ag-deriva, etc."""
|
"""ag-gravity_control, ag-deriva, etc."""
|
||||||
@@ -650,11 +657,13 @@ class GravityBot(commands.Bot):
|
|||||||
reject_commands = {"deny", "reject", "cancel", "decline", "dismiss", "stop"}
|
reject_commands = {"deny", "reject", "cancel", "decline", "dismiss", "stop"}
|
||||||
if req.command.strip().lower() in reject_commands:
|
if req.command.strip().lower() in reject_commands:
|
||||||
logger.warning(f"Auto-approve BLOCKED: command='{req.command}' is reject-word — skipping")
|
logger.warning(f"Auto-approve BLOCKED: command='{req.command}' is reject-word — skipping")
|
||||||
self._sent_approval_ids.add(req.request_id)
|
self._cap_dict(self._sent_approval_ids)
|
||||||
|
self._sent_approval_ids[req.request_id] = True
|
||||||
phase1_processed += 1
|
phase1_processed += 1
|
||||||
continue
|
continue
|
||||||
|
|
||||||
self._sent_approval_ids.add(req.request_id)
|
self._cap_dict(self._sent_approval_ids)
|
||||||
|
self._sent_approval_ids[req.request_id] = True
|
||||||
|
|
||||||
# Smart button_index: read buttons array from pending file
|
# Smart button_index: read buttons array from pending file
|
||||||
# file_permission buttons = [Allow Once(0), Allow This Conv(1), Deny(2)]
|
# file_permission buttons = [Allow Once(0), Allow This Conv(1), Deny(2)]
|
||||||
@@ -724,7 +733,9 @@ class GravityBot(commands.Bot):
|
|||||||
|
|
||||||
channel = await self._get_channel(project)
|
channel = await self._get_channel(project)
|
||||||
if channel:
|
if channel:
|
||||||
self._sent_approval_ids.add(req.request_id)
|
self._cap_dict(self._sent_approval_ids)
|
||||||
|
self._sent_approval_ids[req.request_id] = True
|
||||||
|
self._cap_dict(self._sent_commands)
|
||||||
self._sent_commands[req.request_id] = req.command
|
self._sent_commands[req.request_id] = req.command
|
||||||
await self._send_approval_request(channel, req)
|
await self._send_approval_request(channel, req)
|
||||||
phase1_processed += 1
|
phase1_processed += 1
|
||||||
@@ -767,7 +778,7 @@ class GravityBot(commands.Bot):
|
|||||||
self._deferred_ids.pop(rid, None)
|
self._deferred_ids.pop(rid, None)
|
||||||
self._sent_commands.pop(rid, None)
|
self._sent_commands.pop(rid, None)
|
||||||
self._approval_messages.pop(rid, None)
|
self._approval_messages.pop(rid, None)
|
||||||
self._sent_approval_ids.discard(rid)
|
self._sent_approval_ids.pop(rid, None)
|
||||||
phase2_processed += 1
|
phase2_processed += 1
|
||||||
|
|
||||||
elif status == "expired":
|
elif status == "expired":
|
||||||
@@ -790,7 +801,7 @@ class GravityBot(commands.Bot):
|
|||||||
f.unlink()
|
f.unlink()
|
||||||
self._deferred_ids.pop(rid, None)
|
self._deferred_ids.pop(rid, None)
|
||||||
self._sent_commands.pop(rid, None)
|
self._sent_commands.pop(rid, None)
|
||||||
self._sent_approval_ids.discard(rid)
|
self._sent_approval_ids.pop(rid, None)
|
||||||
phase2_processed += 1
|
phase2_processed += 1
|
||||||
|
|
||||||
elif status == "pending":
|
elif status == "pending":
|
||||||
@@ -885,6 +896,7 @@ class GravityBot(commands.Bot):
|
|||||||
pass
|
pass
|
||||||
|
|
||||||
logger.info(f"Sent approval request: {request.request_id[:12]}")
|
logger.info(f"Sent approval request: {request.request_id[:12]}")
|
||||||
|
self._cap_dict(self._approval_messages)
|
||||||
self._approval_messages[request.request_id] = msg.id # FIX #4: Track msg_id for auto_resolved lookup
|
self._approval_messages[request.request_id] = msg.id # FIX #4: Track msg_id for auto_resolved lookup
|
||||||
|
|
||||||
# ─── Discord → IDE Text Relay + Multi-PC UX ───────────────────────────
|
# ─── Discord → IDE Text Relay + Multi-PC UX ───────────────────────────
|
||||||
@@ -1073,7 +1085,10 @@ class GravityBot(commands.Bot):
|
|||||||
view = ApprovalView(self.bridge, request, buttons=buttons, hub=self.hub)
|
view = ApprovalView(self.bridge, request, buttons=buttons, hub=self.hub)
|
||||||
msg = await channel.send(embed=embed, view=view)
|
msg = await channel.send(embed=embed, view=view)
|
||||||
|
|
||||||
self._sent_approval_ids.add(request_id)
|
self._cap_dict(self._sent_approval_ids)
|
||||||
|
self._sent_approval_ids[request_id] = True
|
||||||
|
|
||||||
|
self._cap_dict(self._approval_messages)
|
||||||
self._approval_messages[request_id] = msg.id
|
self._approval_messages[request_id] = msg.id
|
||||||
logger.info(f"[HUB-PENDING] Sent approval: {request_id[:12]} project={project}")
|
logger.info(f"[HUB-PENDING] Sent approval: {request_id[:12]} project={project}")
|
||||||
|
|
||||||
@@ -1082,7 +1097,8 @@ class GravityBot(commands.Bot):
|
|||||||
|
|
||||||
async def _auto_approve_via_hub(self, request: ApprovalRequest):
|
async def _auto_approve_via_hub(self, request: ApprovalRequest):
|
||||||
"""Auto-approve a pending request via Hub."""
|
"""Auto-approve a pending request via Hub."""
|
||||||
self._sent_approval_ids.add(request.request_id)
|
self._cap_dict(self._sent_approval_ids)
|
||||||
|
self._sent_approval_ids[request.request_id] = True
|
||||||
|
|
||||||
delivered = False
|
delivered = False
|
||||||
if self.hub:
|
if self.hub:
|
||||||
|
|||||||
5
docs/devlog/2026-03-23.md
Normal file
5
docs/devlog/2026-03-23.md
Normal file
@@ -0,0 +1,5 @@
|
|||||||
|
# 2026-03-23 Devlog
|
||||||
|
|
||||||
|
| NNN | HH:MM | 작업 설명 | `커밋해시` | ✅ 또는 🔧 |
|
||||||
|
|-----|-------|----------|-----------|-----------|
|
||||||
|
| 001 | 21:09 | WebSocket 좀비 커넥션 해결 및 통신망 메모리 누수 패치 | `TBD` | ✅ |
|
||||||
12
docs/devlog/entries/20260323-001.md
Normal file
12
docs/devlog/entries/20260323-001.md
Normal file
@@ -0,0 +1,12 @@
|
|||||||
|
# WebSocket 좀비 커넥션 해결 및 통신망 메모리 누수 구조 패치
|
||||||
|
|
||||||
|
- **시간**: 2026-03-23 21:09~21:20
|
||||||
|
- **Commit**: TBD
|
||||||
|
- **Vikunja**: #510 → done
|
||||||
|
|
||||||
|
## 결정 사항
|
||||||
|
- **ws-client.ts 핑퐁 와치독(Ping-Pong Watchdog)**: 단순 에러 캐치가 아니라 `ws.terminate()`를 통해 무반응 소켓을 강제 종료하여 자체 재연결 로직(`_onDisconnect`)을 활성화하도록 설계.
|
||||||
|
- **통신망 추적 변수 캡핑(Bounded Cap)**: `hub.py`의 `pending_owners` 및 `bot.py`의 `_sent_approval_ids` 등 무한히 쌓일 수 있는 파이썬 딕셔너리에 FIFO(삽입 순서 기준, 가장 오래된 항목부터 삭제) 로직을 추가. 비록 당장 OOM을 유발하진 않지만 이 구조적 메모리 누수(Leak)를 원천적으로 차단하여 시스템 안정성을 극대화함.
|
||||||
|
|
||||||
|
## 미완료
|
||||||
|
- 없음
|
||||||
@@ -44,6 +44,7 @@ var __importStar = (this && this.__importStar) || (function () {
|
|||||||
};
|
};
|
||||||
})();
|
})();
|
||||||
Object.defineProperty(exports, "__esModule", { value: true });
|
Object.defineProperty(exports, "__esModule", { value: true });
|
||||||
|
exports.fixLSConnection = fixLSConnection;
|
||||||
exports.activate = activate;
|
exports.activate = activate;
|
||||||
exports.deactivate = deactivate;
|
exports.deactivate = deactivate;
|
||||||
const vscode = __importStar(require("vscode"));
|
const vscode = __importStar(require("vscode"));
|
||||||
@@ -251,18 +252,24 @@ async function initSDK(context) {
|
|||||||
* found (wrong workspace).
|
* found (wrong workspace).
|
||||||
*/
|
*/
|
||||||
async function fixLSConnection() {
|
async function fixLSConnection() {
|
||||||
if (!sdk?.ls)
|
if (!sdk?.ls) {
|
||||||
return;
|
logToFile('[LS-FIX] skipped: sdk.ls not available');
|
||||||
|
return false;
|
||||||
|
}
|
||||||
try {
|
try {
|
||||||
const folders = vscode.workspace.workspaceFolders;
|
const folders = vscode.workspace.workspaceFolders;
|
||||||
if (!folders || folders.length === 0)
|
if (!folders || folders.length === 0) {
|
||||||
return;
|
logToFile('[LS-FIX] skipped: no workspace folders');
|
||||||
|
return false;
|
||||||
|
}
|
||||||
// Generate the workspace hint the same way SDK does, but we'll match case-insensitively
|
// Generate the workspace hint the same way SDK does, but we'll match case-insensitively
|
||||||
const folder = folders[0].uri.fsPath;
|
const folder = folders[0].uri.fsPath;
|
||||||
const parts = folder.replace(/\\/g, '/').split('/');
|
const parts = folder.replace(/\\/g, '/').split('/');
|
||||||
const hint = parts.slice(-2).join('_').replace(/[-.\s]/g, '_').toLowerCase();
|
const hint = parts.slice(-2).join('_').replace(/[-.\s]/g, '_').toLowerCase();
|
||||||
if (!hint)
|
if (!hint) {
|
||||||
return;
|
logToFile('[LS-FIX] skipped: empty hint');
|
||||||
|
return false;
|
||||||
|
}
|
||||||
// Find all language_server processes with csrf_token
|
// Find all language_server processes with csrf_token
|
||||||
const { exec } = cp;
|
const { exec } = cp;
|
||||||
const { promisify } = require('util');
|
const { promisify } = require('util');
|
||||||
@@ -274,12 +281,18 @@ async function fixLSConnection() {
|
|||||||
const result = await execAsync(`powershell.exe -NoProfile -EncodedCommand ${encoded}`, { encoding: 'utf8', timeout: 15000, windowsHide: true });
|
const result = await execAsync(`powershell.exe -NoProfile -EncodedCommand ${encoded}`, { encoding: 'utf8', timeout: 15000, windowsHide: true });
|
||||||
output = result.stdout;
|
output = result.stdout;
|
||||||
}
|
}
|
||||||
catch {
|
catch (psErr) {
|
||||||
return; // Can't discover processes — leave SDK's choice
|
logToFile(`[LS-FIX] skipped: PowerShell failed — ${psErr.message?.substring(0, 100)}`);
|
||||||
|
return false;
|
||||||
}
|
}
|
||||||
const lines = output.split('\n').filter((l) => l.trim().length > 0);
|
const lines = output.split('\n').filter((l) => l.trim().length > 0);
|
||||||
if (lines.length <= 1)
|
if (lines.length === 0) {
|
||||||
return; // Only one LS — no ambiguity
|
logToFile('[LS-FIX] skipped: no LS processes found');
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
// NOTE: Do NOT skip on single LS — SDK may have fallen back to wrong LS
|
||||||
|
// due to case-sensitive hint mismatch, even when only one process exists.
|
||||||
|
logToFile(`[LS-FIX] found ${lines.length} LS process(es), hint="${hint}"`);
|
||||||
// Find the line whose workspace_id matches our workspace (case-insensitive)
|
// Find the line whose workspace_id matches our workspace (case-insensitive)
|
||||||
let matchedLine = null;
|
let matchedLine = null;
|
||||||
for (const line of lines) {
|
for (const line of lines) {
|
||||||
@@ -296,7 +309,7 @@ async function fixLSConnection() {
|
|||||||
}
|
}
|
||||||
if (!matchedLine) {
|
if (!matchedLine) {
|
||||||
logToFile(`[LS-FIX] No LS process matched hint="${hint}" (${lines.length} processes)`);
|
logToFile(`[LS-FIX] No LS process matched hint="${hint}" (${lines.length} processes)`);
|
||||||
return;
|
return false;
|
||||||
}
|
}
|
||||||
// Extract port and csrf_token from matched line
|
// Extract port and csrf_token from matched line
|
||||||
const csrfMatch = matchedLine.match(/--csrf_token[= ](\S+)/);
|
const csrfMatch = matchedLine.match(/--csrf_token[= ](\S+)/);
|
||||||
@@ -304,7 +317,7 @@ async function fixLSConnection() {
|
|||||||
const pidMatch = matchedLine.split('|')[0]?.trim();
|
const pidMatch = matchedLine.split('|')[0]?.trim();
|
||||||
if (!csrfMatch || !extPortMatch) {
|
if (!csrfMatch || !extPortMatch) {
|
||||||
logToFile(`[LS-FIX] Matched LS but missing csrf/port args`);
|
logToFile(`[LS-FIX] Matched LS but missing csrf/port args`);
|
||||||
return;
|
return false;
|
||||||
}
|
}
|
||||||
const csrfToken = csrfMatch[1];
|
const csrfToken = csrfMatch[1];
|
||||||
const extPort = parseInt(extPortMatch[1], 10);
|
const extPort = parseInt(extPortMatch[1], 10);
|
||||||
@@ -312,7 +325,7 @@ async function fixLSConnection() {
|
|||||||
// Check if SDK already connected to this LS
|
// Check if SDK already connected to this LS
|
||||||
if (sdk.ls.port === extPort) {
|
if (sdk.ls.port === extPort) {
|
||||||
logToFile(`[LS-FIX] SDK already on correct LS port=${extPort}`);
|
logToFile(`[LS-FIX] SDK already on correct LS port=${extPort}`);
|
||||||
return;
|
return false;
|
||||||
}
|
}
|
||||||
// Find ConnectRPC port via netstat (same as SDK logic)
|
// Find ConnectRPC port via netstat (same as SDK logic)
|
||||||
let netstatOutput;
|
let netstatOutput;
|
||||||
@@ -325,7 +338,7 @@ async function fixLSConnection() {
|
|||||||
logToFile(`[LS-FIX] netstat failed, using ext_port=${extPort} for PID=${pid}`);
|
logToFile(`[LS-FIX] netstat failed, using ext_port=${extPort} for PID=${pid}`);
|
||||||
sdk.ls.setConnection(extPort, csrfToken, false);
|
sdk.ls.setConnection(extPort, csrfToken, false);
|
||||||
logToFile(`[LS-FIX] ✅ Reconnected to correct LS: port=${extPort} hint="${hint}" PID=${pid}`);
|
logToFile(`[LS-FIX] ✅ Reconnected to correct LS: port=${extPort} hint="${hint}" PID=${pid}`);
|
||||||
return;
|
return true;
|
||||||
}
|
}
|
||||||
const portMatches = netstatOutput.matchAll(/127\.0\.0\.1:(\d+)/g);
|
const portMatches = netstatOutput.matchAll(/127\.0\.0\.1:(\d+)/g);
|
||||||
const ports = [];
|
const ports = [];
|
||||||
@@ -358,7 +371,7 @@ async function fixLSConnection() {
|
|||||||
if (ok) {
|
if (ok) {
|
||||||
sdk.ls.setConnection(port, csrfToken, useTls);
|
sdk.ls.setConnection(port, csrfToken, useTls);
|
||||||
logToFile(`[LS-FIX] ✅ Reconnected to correct LS: port=${port} ${proto} hint="${hint}" PID=${pid}`);
|
logToFile(`[LS-FIX] ✅ Reconnected to correct LS: port=${port} ${proto} hint="${hint}" PID=${pid}`);
|
||||||
return;
|
return true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
catch { /* try next */ }
|
catch { /* try next */ }
|
||||||
@@ -367,9 +380,11 @@ async function fixLSConnection() {
|
|||||||
// Last resort: use extension_server_port
|
// Last resort: use extension_server_port
|
||||||
sdk.ls.setConnection(extPort, csrfToken, false);
|
sdk.ls.setConnection(extPort, csrfToken, false);
|
||||||
logToFile(`[LS-FIX] ✅ Reconnected via ext_port=${extPort} hint="${hint}" PID=${pid}`);
|
logToFile(`[LS-FIX] ✅ Reconnected via ext_port=${extPort} hint="${hint}" PID=${pid}`);
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
catch (err) {
|
catch (err) {
|
||||||
logToFile(`[LS-FIX] error: ${err.message}`);
|
logToFile(`[LS-FIX] error: ${err.message}`);
|
||||||
|
return false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// ─── Approval Observer + Product.json Checksums extracted to ./html-patcher.ts ───
|
// ─── Approval Observer + Product.json Checksums extracted to ./html-patcher.ts ───
|
||||||
@@ -515,6 +530,7 @@ async function activate(context) {
|
|||||||
recentDiscordSentTexts,
|
recentDiscordSentTexts,
|
||||||
writeChatSnapshot,
|
writeChatSnapshot,
|
||||||
writeChatSnapshotWithFiles,
|
writeChatSnapshotWithFiles,
|
||||||
|
fixLSConnection,
|
||||||
});
|
});
|
||||||
// Start HTTP bridge with live step-probe state (prevents stale primitive bug)
|
// Start HTTP bridge with live step-probe state (prevents stale primitive bug)
|
||||||
const httpBridgeCtx = {
|
const httpBridgeCtx = {
|
||||||
|
|||||||
File diff suppressed because one or more lines are too long
@@ -2,7 +2,7 @@
|
|||||||
"name": "gravity-bridge",
|
"name": "gravity-bridge",
|
||||||
"displayName": "Gravity Bridge",
|
"displayName": "Gravity Bridge",
|
||||||
"description": "Antigravity ↔ Discord 브리지 연동 확장",
|
"description": "Antigravity ↔ Discord 브리지 연동 확장",
|
||||||
"version": "0.5.5",
|
"version": "0.5.6",
|
||||||
"publisher": "variet",
|
"publisher": "variet",
|
||||||
"engines": {
|
"engines": {
|
||||||
"vscode": "^1.100.0"
|
"vscode": "^1.100.0"
|
||||||
|
|||||||
@@ -92,6 +92,7 @@ export function resetPendingState(): void {
|
|||||||
* re-detection of WAITING steps whose pending was lost during disconnect.
|
* re-detection of WAITING steps whose pending was lost during disconnect.
|
||||||
*/
|
*/
|
||||||
export function resetPendingStateForReconnect(): void {
|
export function resetPendingStateForReconnect(): void {
|
||||||
|
if (!ctx) return; // Prevent startup race conditions
|
||||||
ctx.lastPendingStepIndex = -1;
|
ctx.lastPendingStepIndex = -1;
|
||||||
ctx.stallProbed = false;
|
ctx.stallProbed = false;
|
||||||
ctx.sawRunningAfterPending = false;
|
ctx.sawRunningAfterPending = false;
|
||||||
|
|||||||
@@ -122,6 +122,7 @@ export class WSBridgeClient {
|
|||||||
private reconnectDelay = INITIAL_RECONNECT_DELAY;
|
private reconnectDelay = INITIAL_RECONNECT_DELAY;
|
||||||
private reconnectTimer: NodeJS.Timeout | null = null;
|
private reconnectTimer: NodeJS.Timeout | null = null;
|
||||||
private heartbeatTimer: NodeJS.Timeout | null = null;
|
private heartbeatTimer: NodeJS.Timeout | null = null;
|
||||||
|
private pongTimeoutTimer: NodeJS.Timeout | null = null;
|
||||||
private authTimer: NodeJS.Timeout | null = null;
|
private authTimer: NodeJS.Timeout | null = null;
|
||||||
|
|
||||||
// Message queue (survives reconnection)
|
// Message queue (survives reconnection)
|
||||||
@@ -245,6 +246,10 @@ export class WSBridgeClient {
|
|||||||
|
|
||||||
ws.on('pong', () => {
|
ws.on('pong', () => {
|
||||||
// Server responded to our ping — connection is alive
|
// Server responded to our ping — connection is alive
|
||||||
|
if (this.pongTimeoutTimer) {
|
||||||
|
clearTimeout(this.pongTimeoutTimer);
|
||||||
|
this.pongTimeoutTimer = null;
|
||||||
|
}
|
||||||
});
|
});
|
||||||
} else {
|
} else {
|
||||||
// ─── Browser-style WebSocket API (.onopen / .onmessage) ───
|
// ─── Browser-style WebSocket API (.onopen / .onmessage) ───
|
||||||
@@ -469,6 +474,16 @@ export class WSBridgeClient {
|
|||||||
// Node.js ws has .ping(), browser WebSocket doesn't
|
// Node.js ws has .ping(), browser WebSocket doesn't
|
||||||
if (typeof this.ws.ping === 'function') {
|
if (typeof this.ws.ping === 'function') {
|
||||||
this.ws.ping();
|
this.ws.ping();
|
||||||
|
|
||||||
|
// Set timeout waiting for pong
|
||||||
|
if (this.pongTimeoutTimer) clearTimeout(this.pongTimeoutTimer);
|
||||||
|
this.pongTimeoutTimer = setTimeout(() => {
|
||||||
|
this.logFn('[WS] Heartbeat timeout — no pong received, terminating connection');
|
||||||
|
if (this.ws) {
|
||||||
|
try { this.ws.terminate(); } catch { try { this.ws.close(); } catch { } }
|
||||||
|
}
|
||||||
|
this._onDisconnect();
|
||||||
|
}, 10000); // 10s timeout
|
||||||
} else {
|
} else {
|
||||||
// Fallback: send heartbeat as JSON message
|
// Fallback: send heartbeat as JSON message
|
||||||
this.ws.send(JSON.stringify({ type: 'heartbeat' }));
|
this.ws.send(JSON.stringify({ type: 'heartbeat' }));
|
||||||
@@ -485,6 +500,10 @@ export class WSBridgeClient {
|
|||||||
clearInterval(this.heartbeatTimer);
|
clearInterval(this.heartbeatTimer);
|
||||||
this.heartbeatTimer = null;
|
this.heartbeatTimer = null;
|
||||||
}
|
}
|
||||||
|
if (this.pongTimeoutTimer) {
|
||||||
|
clearTimeout(this.pongTimeoutTimer);
|
||||||
|
this.pongTimeoutTimer = null;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// ─── Reconnection ───
|
// ─── Reconnection ───
|
||||||
@@ -540,6 +559,11 @@ export class WSBridgeClient {
|
|||||||
this.reconnectTimer = null;
|
this.reconnectTimer = null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (this.pongTimeoutTimer) {
|
||||||
|
clearTimeout(this.pongTimeoutTimer);
|
||||||
|
this.pongTimeoutTimer = null;
|
||||||
|
}
|
||||||
|
|
||||||
if (this.ws) {
|
if (this.ws) {
|
||||||
try {
|
try {
|
||||||
this.ws.close();
|
this.ws.close();
|
||||||
|
|||||||
5
hub.py
5
hub.py
@@ -546,6 +546,11 @@ class WSHub:
|
|||||||
payload = data.get("data", {})
|
payload = data.get("data", {})
|
||||||
request_id = payload.get("request_id", "")
|
request_id = payload.get("request_id", "")
|
||||||
if request_id:
|
if request_id:
|
||||||
|
# Prevent slow memory leak for stranded requests
|
||||||
|
if len(self.pending_owners) > 10000:
|
||||||
|
oldest = next(iter(self.pending_owners))
|
||||||
|
self.pending_owners.pop(oldest, None)
|
||||||
|
|
||||||
# Track ownership for response routing
|
# Track ownership for response routing
|
||||||
self.pending_owners[request_id] = conn.conn_id
|
self.pending_owners[request_id] = conn.conn_id
|
||||||
# Add source metadata
|
# Add source metadata
|
||||||
|
|||||||
Reference in New Issue
Block a user