refactor(cleanup): v0.5.0 Collector 제거 + dead code 정리 + HttpBridgeContext 버그 수정
- DELETE collector.py (523줄) - main.py: BOT_MODE=remote 분기 제거 - gateway.py: Collector REST 6개 endpoint 제거 (311→168줄) - bridge.py: RemoteTransport 제거 (480→270줄) - config.py: REMOTE_BRIDGE_URL 제거 - extension.ts: dead code 4개 + stale module vars 제거 - step-probe.ts: getStepProbeContext() 추가, autoApproveEnabled 제거 - FIX: HttpBridgeContext stale primitive (getter 패턴으로 수정) - ADD: extension.log rotation (10MB→2MB tail) - docs: architecture.md, tech-stack.md, known-issues.md 업데이트
This commit is contained in:
@@ -41,11 +41,10 @@ gravity_control/
|
||||
├── bot.py # Discord 봇: 승인 UI, 채널 관리, Hub 핸들러 (1,286줄)
|
||||
├── hub.py # WebSocket Hub: 연결 관리, 메시지 라우팅 (580줄)
|
||||
├── auth.py # JWT 토큰 + registration code 인증 (127줄)
|
||||
├── gateway.py # HTTP REST API + /ws endpoint (310줄)
|
||||
├── bridge.py # 파일 기반 IPC (레거시 fallback) (479줄)
|
||||
├── gateway.py # HTTP REST API + /ws endpoint (168줄)
|
||||
├── bridge.py # 파일 기반 IPC (레거시 fallback) (270줄)
|
||||
├── watcher.py # Brain 디렉토리 변경 감시 (290줄)
|
||||
├── parser.py # Markdown → Discord 변환 (245줄)
|
||||
├── collector.py # 원격 파일→HTTP 릴레이 (⚠️ DEPRECATED — WS Hub 사용) (523줄)
|
||||
│
|
||||
├── ── Extension 측 (TypeScript) ──
|
||||
├── extension/src/
|
||||
|
||||
@@ -71,3 +71,4 @@
|
||||
| 8 | **`processResponseFile` 상태 리셋은 `sawRunningAfterPending=true`만** | processResponseFile 무한 루프 |
|
||||
| 9 | **fs.watch Windows 불안정 — 반드시 polling fallback 병행** | fs.watch silent fail |
|
||||
| 10 | **diff_review는 VS Code 커맨드만 유효** — RPC 3개 전략 모두 실패 확정 | diff_review RPC dead-end |
|
||||
| 11 | **HttpBridgeContext에 프리미티브 by-value 복사 금지** — 별도 객체 생성 시 getter 사용 | HttpBridgeContext stale primitive |
|
||||
|
||||
@@ -68,10 +68,7 @@
|
||||
| 변수명 | 용도 | 기본값 |
|
||||
|--------|------|--------|
|
||||
| BRAIN_PATH | AG 브레인 경로 | `~/.gemini/antigravity/brain` |
|
||||
| BOT_MODE | `local` / ~~`remote`~~ / `gateway` | `local` |
|
||||
|
||||
> [!WARNING]
|
||||
> `BOT_MODE=remote` (Collector 모드)는 **deprecated**입니다. `gateway` 모드 + Extension WS를 사용하세요.
|
||||
| BOT_MODE | `local` / `gateway` | `local` |
|
||||
| DEBOUNCE_SECONDS | Watcher 디바운스 간격 | `5` |
|
||||
| PROJECT_NAME | 프로젝트 이름 | `gravity_control` |
|
||||
|
||||
@@ -83,7 +80,6 @@
|
||||
| GATEWAY_API_KEY | REST API 인증 키 | (미설정 시 인증 미사용) |
|
||||
| GRAVITY_HUB_SECRET | WS Hub JWT 서명 시크릿 (64char hex) | (미설정 시 인증 생략) |
|
||||
| GRAVITY_REGISTRATION_CODE | Extension 등록 코드 (32char hex) | (미설정 시 인증 생략) |
|
||||
| REMOTE_BRIDGE_URL | ~~Collector 원격 URL~~ | ⚠️ deprecated (remote 모드 전용) |
|
||||
|
||||
## Extension VS Code 설정
|
||||
|
||||
|
||||
214
bridge.py
214
bridge.py
@@ -12,10 +12,6 @@ Protocol:
|
||||
2. Bot reads pending/ → sends Discord message with ✅/❌ buttons
|
||||
3. User clicks button → Bot writes JSON to response/
|
||||
4. VS Code Extension reads response/ → executes action
|
||||
|
||||
Transport layer:
|
||||
LocalTransport — file-based (default, single-PC)
|
||||
RemoteTransport — HTTP-based (future: multi-PC collector mode)
|
||||
"""
|
||||
|
||||
import json
|
||||
@@ -150,216 +146,6 @@ class LocalTransport(BridgeTransport):
|
||||
(self.bridge_dir / sub).mkdir(parents=True, exist_ok=True)
|
||||
|
||||
|
||||
class RemoteTransport(BridgeTransport):
|
||||
"""HTTP-based transport for Collector → Gateway communication.
|
||||
|
||||
Maps BridgeTransport methods to Gateway API endpoints:
|
||||
list_json_files("pending") → GET /api/pending (returns list)
|
||||
write_json("pending", ...) → POST /api/pending
|
||||
read_json("response", ...) → GET /api/response/{rid}
|
||||
write_json("commands", ...) → (not used by Collector, Gateway pushes commands)
|
||||
etc.
|
||||
"""
|
||||
|
||||
def __init__(self, base_url: str, api_key: str = ""):
|
||||
self.base_url = base_url.rstrip("/")
|
||||
self.api_key = api_key
|
||||
self._headers = {"Content-Type": "application/json"}
|
||||
if api_key:
|
||||
self._headers["Authorization"] = f"Bearer {api_key}"
|
||||
self._session = None # aiohttp.ClientSession — lazy created
|
||||
|
||||
# Connection health
|
||||
self.connected = False
|
||||
self._consecutive_failures = 0
|
||||
self._max_failures_before_warning = 3
|
||||
|
||||
# Rate limit backoff
|
||||
self._rate_limited_until = 0.0 # timestamp until which we should not send requests
|
||||
self._backoff_seconds = 0.0 # current backoff duration (exponential)
|
||||
self._BACKOFF_BASE = 2.0
|
||||
self._BACKOFF_MAX = 60.0
|
||||
self._success_streak = 0 # consecutive successes for gradual backoff reduction
|
||||
|
||||
# Retry queue: list of (method, path, data) tuples
|
||||
self._retry_queue: list[tuple[str, str, dict | None]] = []
|
||||
self._retry_queue_max = 100
|
||||
|
||||
logger.info(f"RemoteTransport: {self.base_url} (auth={'yes' if api_key else 'no'})")
|
||||
|
||||
async def _get_session(self):
|
||||
"""Lazy-create aiohttp session."""
|
||||
if self._session is None or self._session.closed:
|
||||
import aiohttp
|
||||
timeout = aiohttp.ClientTimeout(total=10)
|
||||
self._session = aiohttp.ClientSession(
|
||||
headers=self._headers, timeout=timeout
|
||||
)
|
||||
return self._session
|
||||
|
||||
async def close(self):
|
||||
"""Close the HTTP session."""
|
||||
if self._session and not self._session.closed:
|
||||
await self._session.close()
|
||||
|
||||
@property
|
||||
def is_rate_limited(self) -> bool:
|
||||
"""Check if we are currently in a rate-limit backoff period."""
|
||||
return time.time() < self._rate_limited_until
|
||||
|
||||
def _apply_backoff(self, retry_after: float = 0):
|
||||
"""Apply exponential backoff for rate limiting."""
|
||||
self._success_streak = 0 # Reset success streak on any failure
|
||||
if retry_after > 0:
|
||||
self._backoff_seconds = min(retry_after, self._BACKOFF_MAX)
|
||||
else:
|
||||
if self._backoff_seconds == 0:
|
||||
self._backoff_seconds = self._BACKOFF_BASE
|
||||
else:
|
||||
self._backoff_seconds = min(self._backoff_seconds * 2, self._BACKOFF_MAX)
|
||||
self._rate_limited_until = time.time() + self._backoff_seconds
|
||||
logger.warning(f"RemoteTransport: backing off {self._backoff_seconds:.0f}s (until +{self._backoff_seconds:.0f}s)")
|
||||
|
||||
def _on_request_success(self):
|
||||
"""Gradually reduce backoff after consecutive successes.
|
||||
|
||||
Instead of instantly resetting to 0 (which causes the 1s oscillation loop
|
||||
when 7 loops share one transport), require sustained success before reducing.
|
||||
"""
|
||||
if self._backoff_seconds <= 0:
|
||||
return # Already at zero, nothing to do
|
||||
self._success_streak += 1
|
||||
if self._success_streak >= 5:
|
||||
# Halve the backoff (gradual cooldown)
|
||||
self._backoff_seconds = self._backoff_seconds / 2
|
||||
if self._backoff_seconds < 0.5:
|
||||
self._backoff_seconds = 0
|
||||
self._rate_limited_until = 0
|
||||
self._success_streak = 0
|
||||
|
||||
async def _arequest(self, method: str, path: str, data: dict | None = None) -> dict | None:
|
||||
"""Async non-blocking HTTP request to Gateway API."""
|
||||
# Skip if in backoff period (except health checks)
|
||||
if self.is_rate_limited and path != "/health":
|
||||
return None
|
||||
|
||||
session = await self._get_session()
|
||||
url = f"{self.base_url}{path}"
|
||||
try:
|
||||
kwargs = {}
|
||||
if data is not None:
|
||||
kwargs["json"] = data
|
||||
async with session.request(method, url, **kwargs) as resp:
|
||||
if resp.status >= 400:
|
||||
if resp.status == 401:
|
||||
logger.error("RemoteTransport: 401 Unauthorized — check GATEWAY_API_KEY")
|
||||
elif resp.status == 429:
|
||||
retry_after = float(resp.headers.get("Retry-After", 0))
|
||||
self._apply_backoff(retry_after)
|
||||
else:
|
||||
logger.warning(f"RemoteTransport: {method} {path} → {resp.status}")
|
||||
return None
|
||||
result = await resp.json()
|
||||
if not self.connected:
|
||||
logger.info("RemoteTransport: ✅ Gateway connected")
|
||||
self.connected = True
|
||||
self._consecutive_failures = 0
|
||||
self._on_request_success()
|
||||
return result
|
||||
except Exception as e:
|
||||
self._consecutive_failures += 1
|
||||
if self._consecutive_failures == self._max_failures_before_warning:
|
||||
logger.error(f"RemoteTransport: ❌ Gateway unreachable ({self._consecutive_failures} failures): {e}")
|
||||
elif self._consecutive_failures < self._max_failures_before_warning:
|
||||
logger.warning(f"RemoteTransport: {method} {path} → {e}")
|
||||
self.connected = False
|
||||
# Apply backoff on connection failures too
|
||||
if self._consecutive_failures >= self._max_failures_before_warning:
|
||||
self._apply_backoff()
|
||||
return None
|
||||
|
||||
async def _arequest_retry(self, method: str, path: str, data: dict | None = None) -> dict | None:
|
||||
"""Request with retry queue — failed POSTs are queued for later."""
|
||||
result = await self._arequest(method, path, data)
|
||||
if result is None and method == "POST" and data is not None:
|
||||
if len(self._retry_queue) < self._retry_queue_max:
|
||||
self._retry_queue.append((method, path, data))
|
||||
return result
|
||||
|
||||
async def flush_retry_queue(self):
|
||||
"""Retry queued failed requests."""
|
||||
if not self._retry_queue or not self.connected:
|
||||
return
|
||||
queue = self._retry_queue[:]
|
||||
self._retry_queue.clear()
|
||||
succeeded = 0
|
||||
for method, path, data in queue:
|
||||
result = await self._arequest(method, path, data)
|
||||
if result is None:
|
||||
if len(self._retry_queue) < self._retry_queue_max:
|
||||
self._retry_queue.append((method, path, data))
|
||||
break
|
||||
succeeded += 1
|
||||
if succeeded:
|
||||
logger.info(f"[RETRY] flushed {succeeded}/{len(queue)} queued requests")
|
||||
|
||||
async def health_check(self) -> bool:
|
||||
"""Check if Gateway is reachable."""
|
||||
result = await self._arequest("GET", "/health")
|
||||
return result is not None and result.get("status") == "ok"
|
||||
|
||||
# ─── Async methods (used by Collector) ───
|
||||
|
||||
async def awrite_json(self, subdir: str, filename: str, data: dict) -> None:
|
||||
if subdir == "pending":
|
||||
await self._arequest_retry("POST", "/api/pending", data)
|
||||
elif subdir == "response":
|
||||
rid = data.get("request_id", filename.replace(".json", ""))
|
||||
await self._arequest_retry("POST", f"/api/response/{rid}", data)
|
||||
|
||||
async def aread_json(self, subdir: str, filename: str) -> dict | None:
|
||||
rid = filename.replace(".json", "")
|
||||
if subdir == "response":
|
||||
return await self._arequest("GET", f"/api/response/{rid}")
|
||||
return None
|
||||
|
||||
async def apoll_commands(self, project: str) -> list[dict]:
|
||||
result = await self._arequest("GET", f"/api/commands/{project}")
|
||||
if result and isinstance(result, dict):
|
||||
return result.get("commands", [])
|
||||
return []
|
||||
|
||||
async def aregister_session(self, conv_id: str, project: str) -> None:
|
||||
await self._arequest_retry("POST", "/api/register", {
|
||||
"conversation_id": conv_id, "project_name": project,
|
||||
})
|
||||
|
||||
async def asend_chat(self, project: str, content: str, *, attached_files: list[dict] | None = None) -> None:
|
||||
payload: dict = {"project_name": project, "content": content}
|
||||
if attached_files:
|
||||
payload["attached_files"] = attached_files
|
||||
await self._arequest_retry("POST", "/api/chat", payload)
|
||||
|
||||
async def asend_event(self, event_data: dict) -> None:
|
||||
await self._arequest_retry("POST", "/api/event", event_data)
|
||||
|
||||
# ─── Sync stubs (ABC compliance, not used in Collector) ───
|
||||
|
||||
def list_json_files(self, subdir: str) -> list[str]:
|
||||
return []
|
||||
|
||||
def read_json(self, subdir: str, filename: str) -> dict | None:
|
||||
return None
|
||||
|
||||
def write_json(self, subdir: str, filename: str, data: dict) -> None:
|
||||
pass
|
||||
|
||||
def delete_file(self, subdir: str, filename: str) -> bool:
|
||||
return True
|
||||
|
||||
def ensure_dirs(self) -> None:
|
||||
pass
|
||||
|
||||
|
||||
# ─── Bridge Protocol (uses Transport) ───
|
||||
|
||||
|
||||
522
collector.py
522
collector.py
@@ -1,522 +0,0 @@
|
||||
"""Collector — local relay between Extension (file-based) and Gateway (HTTP).
|
||||
|
||||
.. deprecated::
|
||||
Extension이 WebSocket으로 Hub에 직접 연결하므로 Collector는 더 이상 필요하지 않습니다.
|
||||
BOT_MODE=gateway + Extension WS 연결을 사용하세요.
|
||||
이 모듈은 하위 호환을 위해 유지되며, 향후 제거될 수 있습니다.
|
||||
|
||||
The Collector runs on the local PC alongside the AG IDE.
|
||||
It bridges the gap between the Extension (which writes to local bridge/ files)
|
||||
and the remote Gateway (which manages Discord).
|
||||
|
||||
Flow:
|
||||
Extension → bridge/pending/ → Collector → POST Gateway /api/pending
|
||||
Gateway /api/response/{rid} → Collector → bridge/response/ → Extension
|
||||
Gateway /api/commands/{project} → Collector → bridge/commands/ → Extension
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import hashlib
|
||||
import json
|
||||
import os
|
||||
import time
|
||||
import logging
|
||||
import uuid
|
||||
from pathlib import Path
|
||||
|
||||
from bridge import LocalTransport, RemoteTransport
|
||||
from config import Config
|
||||
from watcher import BrainEvent, EventType
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class CollectorBridge:
|
||||
"""Bridges local file-based bridge with remote Gateway API.
|
||||
|
||||
Periodically:
|
||||
1. Scans local pending/ → forwards new ones to Gateway
|
||||
2. Polls Gateway for responses → writes to local response/
|
||||
3. Polls Gateway for commands → writes to local commands/
|
||||
"""
|
||||
|
||||
def __init__(self, local: LocalTransport, remote: RemoteTransport,
|
||||
project_name: str, event_queue: asyncio.Queue | None = None):
|
||||
self.local = local
|
||||
self.remote = remote
|
||||
self.project_name = project_name
|
||||
self.event_queue = event_queue
|
||||
self._poll_interval = 5 # seconds (was 3 — reduced I/O frequency)
|
||||
self._running = False
|
||||
|
||||
# Pre-populate with existing pending files → skip on startup (prevents 만료됨 spam)
|
||||
self._startup_pending: set[str] = set()
|
||||
self._forwarded_pending: set[str] = set()
|
||||
self._forwarded_timestamps: dict[str, float] = {} # rid → when forwarded
|
||||
self._pending_hashes: dict[str, str] = {} # rid → content hash (for MERGE/status detection)
|
||||
self._pending_mtimes: dict[str, float] = {} # rid → last known file mtime
|
||||
self._RESPONSE_POLL_TTL = 300 # 5 min — stop polling responses for old pending
|
||||
|
||||
# Project discovery cache (avoid re-reading register/ every cycle)
|
||||
self._cached_projects: set[str] | None = None
|
||||
self._projects_cache_ts: float = 0
|
||||
self._PROJECTS_CACHE_TTL = 60.0 # seconds
|
||||
for fname in self.local.list_json_files("pending"):
|
||||
rid = fname.replace(".json", "")
|
||||
self._startup_pending.add(rid)
|
||||
self._forwarded_pending.add(rid)
|
||||
# Pre-hash existing files
|
||||
data = self.local.read_json("pending", fname)
|
||||
if data:
|
||||
self._pending_hashes[rid] = hashlib.md5(
|
||||
json.dumps(data, sort_keys=True).encode()
|
||||
).hexdigest()
|
||||
# Pre-cache mtime
|
||||
try:
|
||||
fpath = self.local.bridge_dir / "pending" / fname
|
||||
self._pending_mtimes[rid] = fpath.stat().st_mtime
|
||||
except OSError:
|
||||
pass
|
||||
if self._startup_pending:
|
||||
logger.info(f"[COLLECTOR] skipping {len(self._startup_pending)} existing pending files")
|
||||
|
||||
# Startup cleanup: remove stale response files (> 5 min)
|
||||
self._cleanup_stale_responses()
|
||||
|
||||
def _cleanup_stale_responses(self, max_age: int = 300):
|
||||
"""Remove stale response files (> max_age seconds) on startup."""
|
||||
now = time.time()
|
||||
cleaned = 0
|
||||
for fname in self.local.list_json_files("response"):
|
||||
try:
|
||||
fpath = self.local.bridge_dir / "response" / fname
|
||||
if now - fpath.stat().st_mtime > max_age:
|
||||
self.local.delete_file("response", fname)
|
||||
cleaned += 1
|
||||
except OSError:
|
||||
pass
|
||||
if cleaned:
|
||||
logger.info(f"[COLLECTOR] startup cleanup: removed {cleaned} stale response files")
|
||||
|
||||
async def start(self):
|
||||
"""Start the Collector polling loops with staggered offsets.
|
||||
|
||||
Each loop starts with a different delay to prevent all loops from waking
|
||||
up at the same time and causing burst requests to Gateway.
|
||||
"""
|
||||
self._running = True
|
||||
logger.info(f"[COLLECTOR] started for project={self.project_name}")
|
||||
|
||||
async def _staggered(coro, offset: float):
|
||||
await asyncio.sleep(offset)
|
||||
await coro()
|
||||
|
||||
tasks = [
|
||||
_staggered(self._forward_pending_loop, 0.0),
|
||||
_staggered(self._poll_responses_loop, 0.5),
|
||||
_staggered(self._poll_commands_loop, 1.0),
|
||||
_staggered(self._forward_chat_snapshots_loop, 1.5),
|
||||
_staggered(self._forward_registrations_loop, 2.0),
|
||||
_staggered(self._health_check_loop, 2.5),
|
||||
_staggered(self._retry_flush_loop, 3.0),
|
||||
]
|
||||
if self.event_queue:
|
||||
tasks.append(_staggered(self._forward_events_loop, 3.5))
|
||||
await asyncio.gather(*tasks)
|
||||
|
||||
async def stop(self):
|
||||
"""Stop the Collector and close HTTP session."""
|
||||
self._running = False
|
||||
await self.remote.close()
|
||||
logger.info("[COLLECTOR] stopped")
|
||||
|
||||
# ─── Forward local pending → Gateway ───
|
||||
|
||||
async def _forward_pending_loop(self):
|
||||
"""Scan local pending/ and forward new + updated requests to Gateway.
|
||||
|
||||
Tracks content hashes to detect:
|
||||
- New pending files → forward immediately
|
||||
- MERGE updates (step_probe updates command text) → re-forward
|
||||
- Status changes (auto_resolved, expired) → re-forward
|
||||
"""
|
||||
while self._running:
|
||||
try:
|
||||
# Skip cycle if rate-limited
|
||||
if self.remote.is_rate_limited:
|
||||
await asyncio.sleep(self._poll_interval)
|
||||
continue
|
||||
|
||||
current_files = set()
|
||||
for fname in self.local.list_json_files("pending"):
|
||||
rid = fname.replace(".json", "")
|
||||
current_files.add(rid)
|
||||
|
||||
# mtime pre-check: skip read+hash if file hasn't been modified
|
||||
try:
|
||||
fpath = self.local.bridge_dir / "pending" / fname
|
||||
current_mtime = fpath.stat().st_mtime
|
||||
except OSError:
|
||||
continue
|
||||
prev_mtime = self._pending_mtimes.get(rid)
|
||||
if prev_mtime is not None and current_mtime == prev_mtime:
|
||||
continue # File untouched since last check — skip read+hash
|
||||
self._pending_mtimes[rid] = current_mtime
|
||||
|
||||
data = self.local.read_json("pending", fname)
|
||||
if data is None:
|
||||
continue
|
||||
|
||||
# Compute content hash to detect changes
|
||||
content_hash = hashlib.md5(
|
||||
json.dumps(data, sort_keys=True).encode()
|
||||
).hexdigest()
|
||||
|
||||
prev_hash = self._pending_hashes.get(rid)
|
||||
if prev_hash == content_hash:
|
||||
continue # No change
|
||||
|
||||
is_new = rid not in self._forwarded_pending
|
||||
status = data.get("status", "pending")
|
||||
|
||||
if rid in self._startup_pending:
|
||||
# Startup files: only forward status CHANGES (not re-forward as new pending)
|
||||
if status == "pending":
|
||||
continue # Still pending from before startup — skip
|
||||
# Status changed (auto_resolved/expired) — forward the update
|
||||
|
||||
# Forward to Gateway (new or updated)
|
||||
await self.remote.awrite_json("pending", fname, data)
|
||||
self._forwarded_pending.add(rid)
|
||||
self._forwarded_timestamps[rid] = time.time()
|
||||
self._pending_hashes[rid] = content_hash
|
||||
|
||||
if is_new:
|
||||
logger.info(f"[COLLECTOR] → Gateway: pending {rid[:12]}")
|
||||
else:
|
||||
logger.info(f"[COLLECTOR] → Gateway: pending UPDATE {rid[:12]} status={status}")
|
||||
|
||||
# ── Phase 1 FIX: delete local auto_resolved/expired after forwarding ──
|
||||
if status in ("auto_resolved", "expired"):
|
||||
try:
|
||||
fpath.unlink()
|
||||
current_files.discard(rid)
|
||||
self._forwarded_pending.discard(rid)
|
||||
self._pending_hashes.pop(rid, None)
|
||||
self._pending_mtimes.pop(rid, None)
|
||||
logger.info(f"[COLLECTOR] 🗑 deleted local {status} pending: {rid[:12]}")
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
# ── Periodic stale cleanup: delete pending files > 10 min old ──
|
||||
now_ts = time.time()
|
||||
for rid in list(current_files):
|
||||
if rid in self._forwarded_pending:
|
||||
forwarded_at = self._forwarded_timestamps.get(rid, 0)
|
||||
if now_ts - forwarded_at > 600: # 10 min since forwarding
|
||||
try:
|
||||
stale_path = self.local.bridge_dir / "pending" / f"{rid}.json"
|
||||
stale_path.unlink()
|
||||
self._forwarded_pending.discard(rid)
|
||||
self._pending_hashes.pop(rid, None)
|
||||
self._pending_mtimes.pop(rid, None)
|
||||
logger.info(f"[COLLECTOR] 🗑 stale cleanup (>10min): {rid[:12]}")
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
# Clean up tracking for deleted files
|
||||
for rid in list(self._forwarded_pending):
|
||||
if rid not in current_files and rid not in self._startup_pending:
|
||||
self._forwarded_pending.discard(rid)
|
||||
self._pending_hashes.pop(rid, None)
|
||||
self._pending_mtimes.pop(rid, None)
|
||||
# Also clean up orphaned hashes/mtimes for files no longer on disk
|
||||
for rid in list(self._pending_hashes):
|
||||
if rid not in current_files:
|
||||
self._pending_hashes.pop(rid, None)
|
||||
self._pending_mtimes.pop(rid, None)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"[COLLECTOR] forward_pending error: {e}")
|
||||
|
||||
await asyncio.sleep(self._poll_interval)
|
||||
|
||||
# ─── Poll Gateway responses → local ───
|
||||
|
||||
async def _poll_responses_loop(self):
|
||||
"""Poll Gateway for responses and write them locally for Extension.
|
||||
|
||||
Only polls responses for recently-forwarded pending (within _RESPONSE_POLL_TTL).
|
||||
Expired entries are removed from tracking to prevent request accumulation.
|
||||
"""
|
||||
while self._running:
|
||||
try:
|
||||
# Skip cycle if rate-limited
|
||||
if self.remote.is_rate_limited:
|
||||
await asyncio.sleep(self._poll_interval)
|
||||
continue
|
||||
|
||||
now = time.time()
|
||||
# Clean up expired forwarded pending (stop polling responses for old ones)
|
||||
expired = [
|
||||
rid for rid, ts in self._forwarded_timestamps.items()
|
||||
if now - ts > self._RESPONSE_POLL_TTL
|
||||
]
|
||||
for rid in expired:
|
||||
self._forwarded_pending.discard(rid)
|
||||
self._forwarded_timestamps.pop(rid, None)
|
||||
# NOTE: intentionally keep _pending_hashes[rid] to prevent
|
||||
# re-forward cycle (expired pending would be re-detected as
|
||||
# "new" if hash is cleared). Hash is cleaned up when file
|
||||
# is actually deleted from disk (see _forward_pending_loop).
|
||||
if expired:
|
||||
logger.info(f"[COLLECTOR] expired {len(expired)} stale forwarded pending (>{self._RESPONSE_POLL_TTL}s)")
|
||||
|
||||
# Check each active forwarded pending for a response
|
||||
active_rids = [
|
||||
rid for rid in self._forwarded_pending
|
||||
if rid not in self._startup_pending
|
||||
]
|
||||
for rid in active_rids:
|
||||
# Rate-limit guard: stop polling if we got rate-limited mid-cycle
|
||||
if self.remote.is_rate_limited:
|
||||
break
|
||||
data = await self.remote.aread_json("response", f"{rid}.json")
|
||||
if data is None or data.get("waiting"):
|
||||
await asyncio.sleep(0.3) # Throttle between individual response polls
|
||||
continue
|
||||
|
||||
# Write response locally for Extension to pick up
|
||||
self.local.write_json("response", f"{rid}.json", data)
|
||||
# Also delete local pending file (Extension expects this)
|
||||
self.local.delete_file("pending", f"{rid}.json")
|
||||
self._forwarded_pending.discard(rid)
|
||||
self._forwarded_timestamps.pop(rid, None)
|
||||
approved = data.get("approved", "?")
|
||||
logger.info(f"[COLLECTOR] ← Gateway: response {rid[:12]} approved={approved}")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"[COLLECTOR] poll_responses error: {e}")
|
||||
|
||||
await asyncio.sleep(self._poll_interval)
|
||||
|
||||
# ─── Poll Gateway commands → local ───
|
||||
|
||||
def _discover_local_projects(self) -> set[str]:
|
||||
"""Discover all project names registered by local Extension instances.
|
||||
|
||||
Reads bridge/register/*.json files, which are written by each AG window's
|
||||
Extension with {conversation_id, project_name}. Returns unique project names
|
||||
found, always including self.project_name as a fallback.
|
||||
|
||||
Results are cached for _PROJECTS_CACHE_TTL seconds to avoid re-reading
|
||||
22+ register files every polling cycle.
|
||||
"""
|
||||
now = time.time()
|
||||
if self._cached_projects is not None and now - self._projects_cache_ts < self._PROJECTS_CACHE_TTL:
|
||||
return self._cached_projects
|
||||
|
||||
projects = {self.project_name}
|
||||
register_dir = self.local.bridge_dir / "register"
|
||||
if not register_dir.exists():
|
||||
self._cached_projects = projects
|
||||
self._projects_cache_ts = now
|
||||
return projects
|
||||
for f in register_dir.glob("*.json"):
|
||||
try:
|
||||
data = json.loads(f.read_text(encoding="utf-8-sig"))
|
||||
p = data.get("project_name", "")
|
||||
if p:
|
||||
projects.add(p)
|
||||
except (json.JSONDecodeError, OSError):
|
||||
pass
|
||||
self._cached_projects = projects
|
||||
self._projects_cache_ts = now
|
||||
return projects
|
||||
|
||||
def _get_session_project(self, conversation_id: str) -> str | None:
|
||||
"""Look up which project a session belongs to from register/ files.
|
||||
|
||||
Returns project_name if found, None if unknown (allow forwarding).
|
||||
Uses _discover_local_projects cache timing to avoid redundant I/O.
|
||||
"""
|
||||
register_dir = self.local.bridge_dir / "register"
|
||||
reg_file = register_dir / f"{conversation_id}.json"
|
||||
if reg_file.exists():
|
||||
try:
|
||||
data = json.loads(reg_file.read_text(encoding="utf-8-sig"))
|
||||
return data.get("project_name", "")
|
||||
except (json.JSONDecodeError, OSError):
|
||||
pass
|
||||
return None # Unknown → allow (don't block unregistered sessions)
|
||||
|
||||
async def _poll_commands_loop(self):
|
||||
"""Poll Gateway for commands with adaptive per-project intervals.
|
||||
|
||||
When a project returns empty commands repeatedly, its poll interval
|
||||
increases (3s → 10s → 30s → 60s). On receiving a command, interval
|
||||
resets to base. This prevents idle projects from wasting requests.
|
||||
"""
|
||||
# Per-project adaptive state
|
||||
project_intervals: dict[str, float] = {} # project → current interval
|
||||
project_last_poll: dict[str, float] = {} # project → last poll timestamp
|
||||
_BASE_INTERVAL = 3.0
|
||||
_IDLE_STEPS = [10.0, 30.0, 60.0] # progressive idle intervals
|
||||
project_empty_streak: dict[str, int] = {} # project → consecutive empty polls
|
||||
|
||||
while self._running:
|
||||
try:
|
||||
# Skip cycle if rate-limited
|
||||
if not self.remote.is_rate_limited:
|
||||
projects = self._discover_local_projects()
|
||||
now = time.time()
|
||||
for project in projects:
|
||||
if self.remote.is_rate_limited:
|
||||
break
|
||||
|
||||
# Check if this project's interval has elapsed
|
||||
interval = project_intervals.get(project, _BASE_INTERVAL)
|
||||
last = project_last_poll.get(project, 0)
|
||||
if now - last < interval:
|
||||
continue # Not time yet for this project
|
||||
|
||||
project_last_poll[project] = now
|
||||
commands = await self.remote.apoll_commands(project)
|
||||
|
||||
if commands:
|
||||
# Got commands → reset to base interval
|
||||
project_intervals[project] = _BASE_INTERVAL
|
||||
project_empty_streak[project] = 0
|
||||
for cmd in commands:
|
||||
cmd_id = cmd.get("id", f"{int(time.time() * 1000)}_{uuid.uuid4().hex[:8]}")
|
||||
fname = f"{cmd_id}.json"
|
||||
self.local.write_json("commands", fname, cmd)
|
||||
logger.info(f"[COLLECTOR] ← Gateway: command [{project}] {cmd.get('text', '?')[:30]}")
|
||||
else:
|
||||
# Empty → increase interval progressively
|
||||
streak = project_empty_streak.get(project, 0) + 1
|
||||
project_empty_streak[project] = streak
|
||||
if streak <= len(_IDLE_STEPS):
|
||||
project_intervals[project] = _IDLE_STEPS[streak - 1]
|
||||
# else stays at max (60s)
|
||||
|
||||
await asyncio.sleep(0.3) # Throttle between projects
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"[COLLECTOR] poll_commands error: {e}")
|
||||
|
||||
await asyncio.sleep(self._poll_interval)
|
||||
|
||||
# ─── Forward chat snapshots → Gateway ───
|
||||
|
||||
async def _forward_chat_snapshots_loop(self):
|
||||
"""Forward chat_snapshots/ from Extension to Gateway."""
|
||||
while self._running:
|
||||
try:
|
||||
snap_dir = self.local.bridge_dir / "chat_snapshots"
|
||||
if snap_dir.exists():
|
||||
for f in snap_dir.glob("*.json"):
|
||||
try:
|
||||
data = json.loads(f.read_text(encoding="utf-8-sig"))
|
||||
project = data.get("project_name", self.project_name)
|
||||
content = data.get("content", "")
|
||||
attached_files = data.get("attached_files", [])
|
||||
if content or attached_files:
|
||||
await self.remote.asend_chat(project, content, attached_files=attached_files)
|
||||
af_info = f" +{len(attached_files)} files" if attached_files else ""
|
||||
logger.info(f"[COLLECTOR] → Gateway: chat snapshot len={len(content)}{af_info}")
|
||||
f.unlink() # Cleanup after forwarding
|
||||
except (json.JSONDecodeError, OSError) as e:
|
||||
logger.warning(f"[COLLECTOR] bad chat snapshot {f.name}: {e}")
|
||||
except Exception as e:
|
||||
logger.error(f"[COLLECTOR] forward_chat_snapshots error: {e}")
|
||||
|
||||
await asyncio.sleep(10) # Chat snapshots: less urgent, 10s interval
|
||||
|
||||
# ─── Forward session registrations → Gateway ───
|
||||
|
||||
async def _forward_registrations_loop(self):
|
||||
"""Forward register/ files from Extension to Gateway."""
|
||||
forwarded_regs: set[str] = set()
|
||||
while self._running:
|
||||
try:
|
||||
register_dir = self.local.bridge_dir / "register"
|
||||
if register_dir.exists():
|
||||
for f in register_dir.glob("*.json"):
|
||||
if f.name in forwarded_regs:
|
||||
continue
|
||||
try:
|
||||
data = json.loads(f.read_text(encoding="utf-8-sig"))
|
||||
conv_id = data.get("conversation_id", "")
|
||||
project = data.get("project_name", "")
|
||||
if conv_id and project:
|
||||
await self.remote.aregister_session(conv_id, project)
|
||||
forwarded_regs.add(f.name)
|
||||
logger.info(f"[COLLECTOR] → Gateway: register {conv_id[:8]} → {project}")
|
||||
await asyncio.sleep(0.3) # Spread startup burst
|
||||
except (json.JSONDecodeError, OSError) as e:
|
||||
logger.warning(f"[COLLECTOR] bad register {f.name}: {e}")
|
||||
except Exception as e:
|
||||
logger.error(f"[COLLECTOR] forward_registrations error: {e}")
|
||||
|
||||
await asyncio.sleep(30) # Registration changes rarely — 30s interval
|
||||
# ─── Forward brain events → Gateway ───
|
||||
|
||||
async def _forward_events_loop(self):
|
||||
"""Read BrainEvents from Watcher queue and POST to Gateway.
|
||||
|
||||
Phase 2 FIX: Only forward events for sessions belonging to this project.
|
||||
Uses register/ files to determine session→project mapping.
|
||||
"""
|
||||
while self._running:
|
||||
try:
|
||||
event: BrainEvent = await asyncio.wait_for(
|
||||
self.event_queue.get(), timeout=5.0
|
||||
)
|
||||
|
||||
# ── Project filter: only forward events for MY project ──
|
||||
conv_id = event.conversation_id
|
||||
session_project = self._get_session_project(conv_id)
|
||||
if session_project and session_project != self.project_name:
|
||||
# Skip: this session belongs to another project
|
||||
continue
|
||||
|
||||
# Serialize event to JSON
|
||||
event_data = {
|
||||
"event_type": event.event_type.value,
|
||||
"conversation_id": event.conversation_id,
|
||||
"file_name": event.file_name,
|
||||
"file_path": str(event.file_path) if event.file_path else "",
|
||||
"content": event.content,
|
||||
"timestamp": event.timestamp,
|
||||
}
|
||||
await self.remote.asend_event(event_data)
|
||||
logger.info(f"[COLLECTOR] → Gateway: event {event.event_type.value} {event.file_name}")
|
||||
except asyncio.TimeoutError:
|
||||
continue
|
||||
except Exception as e:
|
||||
logger.error(f"[COLLECTOR] forward_event error: {e}")
|
||||
|
||||
# ─── Health check ───
|
||||
|
||||
async def _health_check_loop(self):
|
||||
"""Periodically check Gateway connectivity."""
|
||||
while self._running:
|
||||
try:
|
||||
ok = await self.remote.health_check()
|
||||
if not ok and self.remote.connected:
|
||||
logger.warning("[COLLECTOR] ❌ Gateway health check failed")
|
||||
except Exception:
|
||||
pass
|
||||
await asyncio.sleep(30)
|
||||
|
||||
# ─── Retry flush ───
|
||||
|
||||
async def _retry_flush_loop(self):
|
||||
"""Periodically flush failed request retry queue."""
|
||||
while self._running:
|
||||
try:
|
||||
await self.remote.flush_retry_queue()
|
||||
except Exception as e:
|
||||
logger.error(f"[COLLECTOR] retry flush error: {e}")
|
||||
await asyncio.sleep(30) # Retry flush: 30s interval (was 10s)
|
||||
@@ -43,9 +43,8 @@ class Config:
|
||||
CHANNEL_PREFIX: str = "AG"
|
||||
PROJECT_NAME: str = os.getenv("PROJECT_NAME", "gravity_control")
|
||||
|
||||
# Bot mode: 'local' (file-based bridge) or 'remote' (HTTP polling — future)
|
||||
# Bot mode: 'local' (file-based bridge) or 'gateway' (WS Hub + HTTP API)
|
||||
BOT_MODE: str = os.getenv("BOT_MODE", "local")
|
||||
REMOTE_BRIDGE_URL: str = os.getenv("REMOTE_BRIDGE_URL", "")
|
||||
GATEWAY_API_KEY: str = os.getenv("GATEWAY_API_KEY", "")
|
||||
|
||||
# WebSocket Hub
|
||||
|
||||
@@ -1,10 +1,5 @@
|
||||
# Devlog — 2026-03-18
|
||||
# 2026-03-18 Devlog
|
||||
|
||||
| # | 시간 | 작업 | 커밋 | 상태 |
|
||||
|---|------|------|------|------|
|
||||
| 001 | 06:09~06:26 | known-issues 정리/아카이빙 + Collector 폐기 마킹 + 문서 보강 (architecture, tech-stack, conventions) | `881a424` | ✅ |
|
||||
| 002 | 06:33~06:50 | Hub/WS 단위 테스트 45개 작성 (연결 관리, pending_owners, 라우팅, 인증) | `ac803d4` | ✅ |
|
||||
| 003 | 06:50~07:05 | !stop 핸들러 SDK cancelCurrentTask() 교체 + VSIX 재빌드/설치 | `759dab5` | ✅ |
|
||||
| 004 | 07:03~07:15 | !stop 재수정 — cancelCurrentTask→CancelCascadeInvocation RPC (AG 빨간■ 동일) + VSIX 설치 | `8d5940b` | ✅ |
|
||||
| 005 | 07:50~08:20 | !stop 최종 수정 — getActiveCascadeId (렌더러 전용 DOM) → activeSessionId (step-probe 폴링) + 데이터 흐름 5경로 검증 | `d55b6b9` | ✅ |
|
||||
| 006 | 08:25~09:15 | !stop 4차 수정 — activeSessionId 스테일 프리미티브 버그. extension.ts closure가 module-level string(불변) 참조 → step-probe getter export로 교체 + VSIX v0.4.6 | `ab0c116` | ✅ |
|
||||
| 1 | 11:00 | v0.5.0 Collector 제거 + dead code 정리 + HttpBridgeContext 버그 수정 | `pending` | 🔧 |
|
||||
|
||||
@@ -67,6 +67,15 @@ function logToFile(msg) {
|
||||
if (!bridgePath)
|
||||
return;
|
||||
const logFile = path.join(bridgePath, 'extension.log');
|
||||
// Log rotation: truncate when >10MB, keep last 2MB
|
||||
try {
|
||||
const stat = fs.statSync(logFile);
|
||||
if (stat.size > 10 * 1024 * 1024) {
|
||||
const content = fs.readFileSync(logFile, 'utf-8');
|
||||
fs.writeFileSync(logFile, content.slice(-2 * 1024 * 1024), 'utf-8');
|
||||
}
|
||||
}
|
||||
catch { /* file doesn't exist yet */ }
|
||||
fs.appendFileSync(logFile, line + '\n', 'utf-8');
|
||||
}
|
||||
catch (e) {
|
||||
@@ -84,15 +93,6 @@ let isActive = false;
|
||||
let autoApproveEnabled = false; // toggled via !auto from Discord
|
||||
let watcher = null;
|
||||
let wsBridge = null; // WebSocket Hub connection
|
||||
const sentPendingIds = new Set();
|
||||
// Memory-based dedup: tracks recently created pending step_indexes to prevent
|
||||
// regeneration after pending file deletion (by Collector/Bot response cycle).
|
||||
// Map<string, number> = `${conversationId}:${stepIndex}` → creation timestamp
|
||||
const recentPendingSteps = new Map();
|
||||
const PENDING_MEMORY_TTL_MS = 60_000; // 60 seconds memory retention
|
||||
// In-memory cache for diff_review metadata (survives pending file deletion by Collector).
|
||||
// Map<request_id, { edit_step_indices, modified_files }>
|
||||
const diffReviewMetadata = new Map();
|
||||
// ─── Project Detection ───
|
||||
function detectProjectName() {
|
||||
const config = vscode.workspace.getConfiguration('gravityBridge');
|
||||
@@ -374,11 +374,6 @@ async function fixLSConnection() {
|
||||
}
|
||||
// ─── Approval Observer + Product.json Checksums extracted to ./html-patcher.ts ───
|
||||
// ─── HTTP Bridge Server extracted to ./http-bridge.ts ───
|
||||
// Shared state for HTTP bridge context (module-level, referenced by BridgeContext too)
|
||||
let sessionStalled = false;
|
||||
let lastPendingStepIndex = -1;
|
||||
let stallProbed = false;
|
||||
let sawRunningAfterPending = true;
|
||||
// ─── Step Probe, Response Watcher, Approval Strategies → extracted to ./step-probe.ts ───
|
||||
async function activate(context) {
|
||||
console.log('Gravity Bridge: activating...');
|
||||
@@ -503,12 +498,11 @@ async function activate(context) {
|
||||
projectName,
|
||||
sdk,
|
||||
wsBridge,
|
||||
autoApproveEnabled,
|
||||
activeSessionId,
|
||||
sessionStalled,
|
||||
lastPendingStepIndex,
|
||||
stallProbed,
|
||||
sawRunningAfterPending,
|
||||
sessionStalled: false,
|
||||
lastPendingStepIndex: -1,
|
||||
stallProbed: false,
|
||||
sawRunningAfterPending: true,
|
||||
setClickTrigger: (action) => {
|
||||
const { setClickTrigger: setTrigger } = require('./http-bridge');
|
||||
setTrigger(action);
|
||||
@@ -520,10 +514,12 @@ async function activate(context) {
|
||||
writeChatSnapshot,
|
||||
writeChatSnapshotWithFiles,
|
||||
});
|
||||
// Start HTTP bridge, then setup observer
|
||||
// Start HTTP bridge with live step-probe state (prevents stale primitive bug)
|
||||
const httpBridgeCtx = {
|
||||
bridgePath, projectName, activeSessionId, wsBridge,
|
||||
sessionStalled, lastPendingStepIndex, logToFile,
|
||||
bridgePath, projectName, wsBridge, logToFile,
|
||||
get activeSessionId() { return (0, step_probe_1.getStepProbeContext)().activeSessionId; },
|
||||
get sessionStalled() { return (0, step_probe_1.getStepProbeContext)().sessionStalled; },
|
||||
get lastPendingStepIndex() { return (0, step_probe_1.getStepProbeContext)().lastPendingStepIndex; },
|
||||
};
|
||||
const bridgePort = await (0, http_bridge_1.startHttpBridge)(httpBridgeCtx, sdk);
|
||||
if (bridgePort) {
|
||||
|
||||
File diff suppressed because one or more lines are too long
@@ -2,7 +2,7 @@
|
||||
"name": "gravity-bridge",
|
||||
"displayName": "Gravity Bridge",
|
||||
"description": "Antigravity ↔ Discord 브리지 연동 확장",
|
||||
"version": "0.4.6",
|
||||
"version": "0.5.0",
|
||||
"publisher": "variet",
|
||||
"engines": {
|
||||
"vscode": "^1.100.0"
|
||||
|
||||
@@ -16,7 +16,7 @@ import * as path from 'path';
|
||||
import * as os from 'os';
|
||||
import * as cp from 'child_process';
|
||||
import { WSBridgeClient, WSResponseData, WSCommandData } from './ws-client';
|
||||
import { initStepProbe, BridgeContext, writePendingApproval, tryApprovalStrategies, writeRegistration, getApprovalContext, resetPendingState, handleDiffReviewResponse, getActiveSessionId as getStepProbeSessionId } from './step-probe';
|
||||
import { initStepProbe, BridgeContext, writePendingApproval, tryApprovalStrategies, writeRegistration, getApprovalContext, resetPendingState, handleDiffReviewResponse, getActiveSessionId as getStepProbeSessionId, getStepProbeContext } from './step-probe';
|
||||
import { startHttpBridge, getDeterministicPort, HttpBridgeContext } from './http-bridge';
|
||||
import { setupApprovalObserver } from './html-patcher';
|
||||
import { watchCommandsDir, handleWSCommand, disposeCommandsWatcher, CommandHandlerContext } from './command-handler';
|
||||
@@ -31,6 +31,14 @@ function logToFile(msg: string) {
|
||||
try {
|
||||
if (!bridgePath) return;
|
||||
const logFile = path.join(bridgePath, 'extension.log');
|
||||
// Log rotation: truncate when >10MB, keep last 2MB
|
||||
try {
|
||||
const stat = fs.statSync(logFile);
|
||||
if (stat.size > 10 * 1024 * 1024) {
|
||||
const content = fs.readFileSync(logFile, 'utf-8');
|
||||
fs.writeFileSync(logFile, content.slice(-2 * 1024 * 1024), 'utf-8');
|
||||
}
|
||||
} catch { /* file doesn't exist yet */ }
|
||||
fs.appendFileSync(logFile, line + '\n', 'utf-8');
|
||||
} catch (e: any) {
|
||||
console.error(`Gravity Bridge LOG WRITE FAIL: ${e.message}`);
|
||||
@@ -50,15 +58,6 @@ let autoApproveEnabled = false; // toggled via !auto from Discord
|
||||
let watcher: fs.FSWatcher | null = null;
|
||||
let wsBridge: WSBridgeClient | null = null; // WebSocket Hub connection
|
||||
|
||||
const sentPendingIds = new Set<string>();
|
||||
// Memory-based dedup: tracks recently created pending step_indexes to prevent
|
||||
// regeneration after pending file deletion (by Collector/Bot response cycle).
|
||||
// Map<string, number> = `${conversationId}:${stepIndex}` → creation timestamp
|
||||
const recentPendingSteps = new Map<string, number>();
|
||||
const PENDING_MEMORY_TTL_MS = 60_000; // 60 seconds memory retention
|
||||
// In-memory cache for diff_review metadata (survives pending file deletion by Collector).
|
||||
// Map<request_id, { edit_step_indices, modified_files }>
|
||||
const diffReviewMetadata = new Map<string, { edit_step_indices: number[]; modified_files: string[] }>();
|
||||
|
||||
// ─── Project Detection ───
|
||||
|
||||
@@ -357,12 +356,6 @@ async function fixLSConnection(): Promise<void> {
|
||||
|
||||
// ─── HTTP Bridge Server extracted to ./http-bridge.ts ───
|
||||
|
||||
// Shared state for HTTP bridge context (module-level, referenced by BridgeContext too)
|
||||
let sessionStalled = false;
|
||||
let lastPendingStepIndex = -1;
|
||||
let stallProbed = false;
|
||||
let sawRunningAfterPending = true;
|
||||
|
||||
// ─── Step Probe, Response Watcher, Approval Strategies → extracted to ./step-probe.ts ───
|
||||
|
||||
|
||||
@@ -500,12 +493,11 @@ export async function activate(context: vscode.ExtensionContext) {
|
||||
projectName,
|
||||
sdk,
|
||||
wsBridge,
|
||||
autoApproveEnabled,
|
||||
activeSessionId,
|
||||
sessionStalled,
|
||||
lastPendingStepIndex,
|
||||
stallProbed,
|
||||
sawRunningAfterPending,
|
||||
sessionStalled: false,
|
||||
lastPendingStepIndex: -1,
|
||||
stallProbed: false,
|
||||
sawRunningAfterPending: true,
|
||||
setClickTrigger: (action: 'approve' | 'reject') => {
|
||||
const { setClickTrigger: setTrigger } = require('./http-bridge');
|
||||
setTrigger(action);
|
||||
@@ -517,10 +509,12 @@ export async function activate(context: vscode.ExtensionContext) {
|
||||
writeChatSnapshot,
|
||||
writeChatSnapshotWithFiles,
|
||||
} as BridgeContext);
|
||||
// Start HTTP bridge, then setup observer
|
||||
// Start HTTP bridge with live step-probe state (prevents stale primitive bug)
|
||||
const httpBridgeCtx: HttpBridgeContext = {
|
||||
bridgePath, projectName, activeSessionId, wsBridge,
|
||||
sessionStalled, lastPendingStepIndex, logToFile,
|
||||
bridgePath, projectName, wsBridge, logToFile,
|
||||
get activeSessionId() { return getStepProbeContext().activeSessionId; },
|
||||
get sessionStalled() { return getStepProbeContext().sessionStalled; },
|
||||
get lastPendingStepIndex() { return getStepProbeContext().lastPendingStepIndex; },
|
||||
};
|
||||
const bridgePort = await startHttpBridge(httpBridgeCtx, sdk);
|
||||
if (bridgePort) {
|
||||
|
||||
@@ -14,7 +14,6 @@ export interface BridgeContext {
|
||||
projectName: string;
|
||||
sdk: any;
|
||||
wsBridge: WSBridgeClient | null;
|
||||
autoApproveEnabled: boolean;
|
||||
activeSessionId: string;
|
||||
sessionStalled: boolean;
|
||||
lastPendingStepIndex: number;
|
||||
@@ -64,6 +63,18 @@ export function getActiveSessionId(): string {
|
||||
return ctx?.activeSessionId || '';
|
||||
}
|
||||
|
||||
/**
|
||||
* Get step-probe context values for http-bridge.
|
||||
* Prevents stale primitive copies by reading live ctx values.
|
||||
*/
|
||||
export function getStepProbeContext(): { activeSessionId: string; sessionStalled: boolean; lastPendingStepIndex: number } {
|
||||
return {
|
||||
activeSessionId: ctx?.activeSessionId || '',
|
||||
sessionStalled: ctx?.sessionStalled ?? false,
|
||||
lastPendingStepIndex: ctx?.lastPendingStepIndex ?? -1,
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Reset pending state after successful approval.
|
||||
* Called after WS response triggers approval in extension.ts.
|
||||
|
||||
147
gateway.py
147
gateway.py
@@ -1,20 +1,10 @@
|
||||
"""Gateway HTTP API + WebSocket Hub — receives data from Collectors and Extensions.
|
||||
"""Gateway HTTP API + WebSocket Hub — serves WebSocket Hub and diagnostics.
|
||||
|
||||
Runs alongside the Discord bot in the server Docker container.
|
||||
Supports both:
|
||||
- REST API: for legacy Collectors (HTTP polling)
|
||||
- WebSocket: for direct Extension connections (real-time)
|
||||
|
||||
Endpoints:
|
||||
GET /ws — WebSocket endpoint (Extension direct connection)
|
||||
POST /api/pending — Collector pushes a new approval request
|
||||
GET /api/pending — List all pending requests (for diagnostics)
|
||||
POST /api/response/{rid} — Collector polls for response (or Gateway pushes)
|
||||
GET /api/response/{rid} — Get response for a specific request
|
||||
POST /api/chat — Collector pushes a chat snapshot
|
||||
POST /api/register — Collector registers session → project mapping
|
||||
POST /api/command — Gateway pushes command to specific collector
|
||||
GET /api/commands/{project} — Collector polls for commands
|
||||
GET /health — Health check
|
||||
GET /hub/status — WebSocket Hub diagnostics
|
||||
"""
|
||||
@@ -60,15 +50,9 @@ class GatewayAPI:
|
||||
# WebSocket endpoint (no auth middleware — Hub handles its own auth)
|
||||
self.app.router.add_get("/ws", self._ws_handler)
|
||||
self.app.router.add_get("/hub/status", self._hub_status)
|
||||
# Legacy REST endpoints (Collector compatibility)
|
||||
# REST endpoints
|
||||
self.app.router.add_get("/health", self._health)
|
||||
self.app.router.add_post("/api/pending", self._post_pending)
|
||||
self.app.router.add_get("/api/pending", self._list_pending)
|
||||
self.app.router.add_get("/api/response/{rid}", self._get_response)
|
||||
self.app.router.add_post("/api/chat", self._post_chat)
|
||||
self.app.router.add_post("/api/register", self._post_register)
|
||||
self.app.router.add_get("/api/commands/{project}", self._get_commands)
|
||||
self.app.router.add_post("/api/event", self._post_event)
|
||||
|
||||
# ─── WebSocket Handler ───
|
||||
|
||||
@@ -144,25 +128,7 @@ class GatewayAPI:
|
||||
status["hub_connections"] = len(self.hub.connections)
|
||||
return web.json_response(status)
|
||||
|
||||
# ─── Pending Approvals (Collector → Gateway → Discord) ───
|
||||
|
||||
async def _post_pending(self, request: web.Request) -> web.Response:
|
||||
"""Collector pushes a pending approval request."""
|
||||
try:
|
||||
data = await request.json()
|
||||
rid = data.get("request_id", f"{int(time.time() * 1000)}_{uuid.uuid4().hex[:8]}")
|
||||
data["request_id"] = rid
|
||||
data.setdefault("timestamp", time.time())
|
||||
data.setdefault("status", "pending")
|
||||
|
||||
# Write to bridge pending dir (bot's scanner will pick it up)
|
||||
self.bot.bridge.transport.write_json("pending", f"{rid}.json", data)
|
||||
logger.info(f"[GATEWAY] pending received: {rid[:12]} project={data.get('project_name', '?')}")
|
||||
|
||||
return web.json_response({"ok": True, "request_id": rid})
|
||||
except Exception as e:
|
||||
logger.error(f"[GATEWAY] pending error: {e}")
|
||||
return web.json_response({"ok": False, "error": str(e)}, status=400)
|
||||
# ─── Pending List (Diagnostics) ───
|
||||
|
||||
async def _list_pending(self, request: web.Request) -> web.Response:
|
||||
"""List all pending requests (diagnostics)."""
|
||||
@@ -174,88 +140,14 @@ class GatewayAPI:
|
||||
"status": r.status,
|
||||
} for r in requests])
|
||||
|
||||
# ─── Responses (Discord → Gateway → Collector) ───
|
||||
|
||||
async def _get_response(self, request: web.Request) -> web.Response:
|
||||
"""Collector polls for a response to a specific pending request."""
|
||||
rid = request.match_info["rid"]
|
||||
data = self.bot.bridge.transport.read_json("response", f"{rid}.json")
|
||||
if data is None:
|
||||
return web.json_response({"waiting": True, "request_id": rid})
|
||||
|
||||
# Serve response and delete both response + pending files (one-time consumption)
|
||||
self.bot.bridge.transport.delete_file("response", f"{rid}.json")
|
||||
self.bot.bridge.transport.delete_file("pending", f"{rid}.json") # Bug 2 fix
|
||||
return web.json_response(data)
|
||||
|
||||
# ─── Chat Snapshots (Collector → Gateway → Discord) ───
|
||||
|
||||
async def _post_chat(self, request: web.Request) -> web.Response:
|
||||
"""Collector pushes a chat snapshot for relay to Discord."""
|
||||
try:
|
||||
data = await request.json()
|
||||
project = data.get("project_name", "")
|
||||
content = data.get("content", "")
|
||||
attached_files = data.get("attached_files", [])
|
||||
|
||||
if not project or (not content and not attached_files):
|
||||
return web.json_response({"ok": False, "error": "project_name and content/attached_files required"}, status=400)
|
||||
|
||||
# Write to chat_snapshots dir for bot's scanner
|
||||
snap_dir = self.bot.bridge.transport.bridge_dir / "chat_snapshots" if hasattr(self.bot.bridge.transport, 'bridge_dir') else None
|
||||
if snap_dir:
|
||||
snap_dir.mkdir(parents=True, exist_ok=True)
|
||||
snap_id = f"{int(time.time() * 1000)}_{uuid.uuid4().hex[:8]}"
|
||||
snap_data = {
|
||||
"id": snap_id,
|
||||
"project_name": project,
|
||||
"content": content,
|
||||
"timestamp": time.time(),
|
||||
}
|
||||
if attached_files:
|
||||
snap_data["attached_files"] = attached_files
|
||||
(snap_dir / f"{snap_id}.json").write_text(
|
||||
json.dumps(snap_data, ensure_ascii=False, indent=2),
|
||||
encoding="utf-8",
|
||||
)
|
||||
|
||||
af_info = f" +{len(attached_files)} files" if attached_files else ""
|
||||
logger.info(f"[GATEWAY] chat received: project={project} len={len(content)}{af_info}")
|
||||
return web.json_response({"ok": True})
|
||||
except Exception as e:
|
||||
logger.error(f"[GATEWAY] chat error: {e}")
|
||||
return web.json_response({"ok": False, "error": str(e)}, status=400)
|
||||
|
||||
# ─── Registration (Collector → Gateway) ───
|
||||
|
||||
async def _post_register(self, request: web.Request) -> web.Response:
|
||||
"""Collector registers session → project mapping."""
|
||||
try:
|
||||
data = await request.json()
|
||||
session_id = data.get("conversation_id", "")
|
||||
project = data.get("project_name", "")
|
||||
if session_id and project:
|
||||
self.bot.conv_to_project[session_id] = project
|
||||
logger.info(f"[GATEWAY] registered: {session_id[:8]} → {project}")
|
||||
return web.json_response({"ok": True})
|
||||
except Exception as e:
|
||||
return web.json_response({"ok": False, "error": str(e)}, status=400)
|
||||
|
||||
# ─── Commands (Gateway → Collector) ───
|
||||
|
||||
async def _get_commands(self, request: web.Request) -> web.Response:
|
||||
"""Collector polls for commands (e.g. !auto, !stop, text messages)."""
|
||||
project = request.match_info["project"]
|
||||
commands = self._commands.pop(project, [])
|
||||
return web.json_response({"commands": commands})
|
||||
# ─── Commands (Legacy fallback: Bot → Extension via HTTP when Hub unavailable) ───
|
||||
|
||||
def push_command(self, project: str, command: dict):
|
||||
"""Bot pushes a command for a Collector to pick up."""
|
||||
"""Bot pushes a command for Extension to pick up (Hub fallback)."""
|
||||
if project not in self._commands:
|
||||
self._commands[project] = []
|
||||
command.setdefault("_ts", time.time()) # TTL tracking
|
||||
self._commands[project].append(command)
|
||||
# Auto-cleanup stale commands (Security 3: memory leak prevention)
|
||||
self._cleanup_stale_commands()
|
||||
|
||||
def _cleanup_stale_commands(self):
|
||||
@@ -269,35 +161,6 @@ class GatewayAPI:
|
||||
if not self._commands[project]:
|
||||
del self._commands[project]
|
||||
|
||||
# ─── Brain Events (Collector → Gateway → Discord) ───
|
||||
|
||||
async def _post_event(self, request: web.Request) -> web.Response:
|
||||
"""Collector pushes a brain event (file change) for relay to Discord."""
|
||||
try:
|
||||
data = await request.json()
|
||||
from watcher import BrainEvent, EventType
|
||||
|
||||
event_type_str = data.get("event_type", "file_changed")
|
||||
event_type = EventType(event_type_str)
|
||||
|
||||
event = BrainEvent(
|
||||
event_type=event_type,
|
||||
conversation_id=data.get("conversation_id", ""),
|
||||
file_name=data.get("file_name", ""),
|
||||
file_path=Path(data["file_path"]) if data.get("file_path") else None,
|
||||
content=data.get("content", ""),
|
||||
timestamp=data.get("timestamp", time.time()),
|
||||
)
|
||||
|
||||
# Inject into bot's event queue (same path as local mode)
|
||||
await self.bot.event_queue.put(event)
|
||||
logger.info(f"[GATEWAY] event received: {event_type_str} {event.file_name} conv={event.conversation_id[:8]}")
|
||||
|
||||
return web.json_response({"ok": True})
|
||||
except Exception as e:
|
||||
logger.error(f"[GATEWAY] event error: {e}")
|
||||
return web.json_response({"ok": False, "error": str(e)}, status=400)
|
||||
|
||||
# ─── Run ───
|
||||
|
||||
async def start(self):
|
||||
|
||||
41
main.py
41
main.py
@@ -51,47 +51,6 @@ async def main():
|
||||
# Get the running loop
|
||||
loop = asyncio.get_running_loop()
|
||||
|
||||
# ── Collector mode (DEPRECATED): no Discord bot, just relay local ↔ Gateway ──
|
||||
if Config.BOT_MODE == "remote":
|
||||
logger.warning("=" * 50)
|
||||
logger.warning("⚠️ Collector mode (BOT_MODE=remote) is DEPRECATED")
|
||||
logger.warning("Extension이 WebSocket으로 Hub에 직접 연결합니다.")
|
||||
logger.warning("BOT_MODE=gateway로 전환하세요.")
|
||||
logger.warning("=" * 50)
|
||||
|
||||
from bridge import LocalTransport, RemoteTransport
|
||||
from collector import CollectorBridge
|
||||
|
||||
if not Config.REMOTE_BRIDGE_URL:
|
||||
logger.error("REMOTE_BRIDGE_URL is required for remote (Collector) mode")
|
||||
sys.exit(1)
|
||||
|
||||
bridge_dir = Config.BRAIN_PATH.parent / "bridge"
|
||||
local = LocalTransport(bridge_dir)
|
||||
local.ensure_dirs()
|
||||
remote = RemoteTransport(Config.REMOTE_BRIDGE_URL, api_key=Config.GATEWAY_API_KEY)
|
||||
|
||||
collector = CollectorBridge(local, remote, project_name=Config.PROJECT_NAME,
|
||||
event_queue=event_queue)
|
||||
logger.info(f"Collector mode: {Config.REMOTE_BRIDGE_URL}")
|
||||
|
||||
# Optionally start watcher for brain events (local display only)
|
||||
watcher = BrainWatcher(event_queue, loop)
|
||||
|
||||
try:
|
||||
watcher.start()
|
||||
logger.info(f"Watcher started, {len(watcher.known_sessions)} existing sessions")
|
||||
await collector.start()
|
||||
except KeyboardInterrupt:
|
||||
logger.info("Received keyboard interrupt")
|
||||
except Exception as e:
|
||||
logger.error(f"Fatal error: {e}", exc_info=True)
|
||||
finally:
|
||||
await collector.stop()
|
||||
watcher.stop()
|
||||
logger.info("Collector shutdown complete")
|
||||
return
|
||||
|
||||
# ── Local / Gateway mode ──
|
||||
|
||||
# Create components
|
||||
|
||||
@@ -7,11 +7,10 @@ echo ║ Gravity Bridge Bot Launcher ║
|
||||
echo ╚══════════════════════════════════════╝
|
||||
echo.
|
||||
|
||||
echo [WARN] 로컬 Collector (BOT_MODE=remote)는 더 이상 필요하지 않습니다.
|
||||
echo [WARN] Extension이 WebSocket으로 Hub에 직접 연결합니다.
|
||||
echo [WARN] 서버에서 Docker로 실행하세요: docker compose up -d
|
||||
echo [INFO] 로컬 테스트 (BOT_MODE=local)를 시작합니다.
|
||||
echo [INFO] 서버 배포는 BOT_MODE=gateway로 실행하세요.
|
||||
echo.
|
||||
echo 로컬 테스트 (BOT_MODE=local)를 원하시면 아무 키나 누르세요...
|
||||
echo 시작하려면 아무 키나 누르세요...
|
||||
pause >nul
|
||||
|
||||
REM — Find Python (conda first, then system)
|
||||
|
||||
Reference in New Issue
Block a user