461 lines
21 KiB
Python
461 lines
21 KiB
Python
"""Collector — local relay between Extension (file-based) and Gateway (HTTP).
|
|
|
|
The Collector runs on the local PC alongside the AG IDE.
|
|
It bridges the gap between the Extension (which writes to local bridge/ files)
|
|
and the remote Gateway (which manages Discord).
|
|
|
|
Flow:
|
|
Extension → bridge/pending/ → Collector → POST Gateway /api/pending
|
|
Gateway /api/response/{rid} → Collector → bridge/response/ → Extension
|
|
Gateway /api/commands/{project} → Collector → bridge/commands/ → Extension
|
|
"""
|
|
|
|
import asyncio
|
|
import hashlib
|
|
import json
|
|
import os
|
|
import time
|
|
import logging
|
|
from pathlib import Path
|
|
|
|
from bridge import LocalTransport, RemoteTransport
|
|
from config import Config
|
|
from watcher import BrainEvent, EventType
|
|
|
|
# Module-level logger named after this module; CollectorBridge logs through it.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class CollectorBridge:
    """Bridges the local file-based bridge/ directory with the remote Gateway API.

    Periodically:
      1. Scans local pending/ → forwards new/updated requests to Gateway
      2. Polls Gateway for responses → writes them to local response/
      3. Polls Gateway for commands → writes them to local commands/

    It also forwards chat snapshots, session registrations and (when an event
    queue is supplied) watcher BrainEvents, and runs periodic health-check and
    retry-flush loops.

    Elapsed-time bookkeeping uses ``time.monotonic()`` so wall-clock
    adjustments cannot stall polling or TTL expiry.
    """

    def __init__(self, local: LocalTransport, remote: RemoteTransport,
                 project_name: str, event_queue: asyncio.Queue | None = None):
        """Set up tracking state and snapshot pre-existing pending files.

        Args:
            local: transport over the local bridge/ directory (Extension side).
            remote: HTTP transport to the Gateway.
            project_name: default project name; always included in command
                polling and used for chat snapshots without a project_name.
            event_queue: optional queue of BrainEvents to forward to Gateway.
        """
        self.local = local
        self.remote = remote
        self.project_name = project_name
        self.event_queue = event_queue
        self._poll_interval = 5  # seconds (was 3 — reduced I/O frequency)
        self._running = False

        # Pending files present at startup are treated as already forwarded so
        # they are not re-sent as new requests (avoids a burst of "expired"
        # notifications); only later status changes are forwarded.
        self._startup_pending: set[str] = set()
        self._forwarded_pending: set[str] = set()
        self._forwarded_timestamps: dict[str, float] = {}  # rid → time.monotonic() when forwarded
        self._pending_hashes: dict[str, str] = {}  # rid → content hash (for MERGE/status detection)
        self._pending_mtimes: dict[str, float] = {}  # rid → mtime of the last fully handled version
        self._RESPONSE_POLL_TTL = 300  # 5 min — stop polling responses for old pending

        # Project discovery cache (avoid re-reading register/ every cycle)
        self._cached_projects: set[str] | None = None
        self._projects_cache_ts: float = 0  # time.monotonic() of the last refresh
        self._PROJECTS_CACHE_TTL = 60.0  # seconds

        for fname in self.local.list_json_files("pending"):
            rid = fname.removesuffix(".json")
            self._startup_pending.add(rid)
            self._forwarded_pending.add(rid)
            data = self.local.read_json("pending", fname)
            if data:
                self._pending_hashes[rid] = self._content_hash(data)
            # Cache the mtime even when the read failed: startup files are only
            # re-examined once they are modified.
            mtime = self._pending_mtime(fname)
            if mtime is not None:
                self._pending_mtimes[rid] = mtime
        if self._startup_pending:
            logger.info(f"[COLLECTOR] skipping {len(self._startup_pending)} existing pending files")

        # Startup cleanup: remove stale response files (> 5 min)
        self._cleanup_stale_responses()

    # ─── Helpers ───

    @staticmethod
    def _content_hash(data) -> str:
        """Return the MD5 hex digest of *data* serialized as key-sorted JSON.

        Used only for change detection, not for security.
        """
        return hashlib.md5(
            json.dumps(data, sort_keys=True).encode(), usedforsecurity=False
        ).hexdigest()

    def _pending_mtime(self, fname: str) -> float | None:
        """Return the mtime of pending/<fname>, or None if it cannot be stat'ed."""
        try:
            return (self.local.bridge_dir / "pending" / fname).stat().st_mtime
        except OSError:
            return None

    @staticmethod
    def _load_json_object(path) -> dict:
        """Parse *path* as JSON (UTF-8, BOM tolerated) and require a top-level object.

        Raises:
            ValueError: malformed JSON, undecodable bytes, or a top level that
                is not an object (JSONDecodeError and UnicodeDecodeError are
                both ValueError subclasses).
            OSError: the file could not be read.
        """
        data = json.loads(path.read_text(encoding="utf-8-sig"))
        if not isinstance(data, dict):
            raise ValueError(f"expected a JSON object, got {type(data).__name__}")
        return data

    def _cleanup_stale_responses(self, max_age: int = 300):
        """Remove response files whose mtime is older than *max_age* seconds (run at startup)."""
        now = time.time()  # wall clock on purpose: compared against file mtimes
        cleaned = 0
        for fname in self.local.list_json_files("response"):
            try:
                fpath = self.local.bridge_dir / "response" / fname
                if now - fpath.stat().st_mtime > max_age:
                    self.local.delete_file("response", fname)
                    cleaned += 1
            except OSError:
                pass  # file vanished or is inaccessible — nothing to clean
        if cleaned:
            logger.info(f"[COLLECTOR] startup cleanup: removed {cleaned} stale response files")

    # ─── Lifecycle ───

    async def start(self):
        """Start the Collector polling loops with staggered offsets.

        Each loop starts with a different delay so they do not all wake up at
        the same moment and burst requests at the Gateway. Runs until every
        loop returns (after stop()).
        """
        self._running = True
        logger.info(f"[COLLECTOR] started for project={self.project_name}")

        async def _staggered(loop_fn, offset: float):
            await asyncio.sleep(offset)
            await loop_fn()

        schedule = [
            (self._forward_pending_loop, 0.0),
            (self._poll_responses_loop, 0.5),
            (self._poll_commands_loop, 1.0),
            (self._forward_chat_snapshots_loop, 1.5),
            (self._forward_registrations_loop, 2.0),
            (self._health_check_loop, 2.5),
            (self._retry_flush_loop, 3.0),
        ]
        if self.event_queue is not None:
            schedule.append((self._forward_events_loop, 3.5))
        await asyncio.gather(*(_staggered(fn, offset) for fn, offset in schedule))

    async def stop(self):
        """Stop the Collector loops (after their current sleep) and close the HTTP session."""
        self._running = False
        await self.remote.close()
        logger.info("[COLLECTOR] stopped")

    # ─── Forward local pending → Gateway ───

    async def _forward_pending_loop(self):
        """Scan local pending/ and forward new + updated requests to Gateway.

        Tracks content hashes to detect:
          - New pending files → forward immediately
          - MERGE updates (step_probe updates command text) → re-forward
          - Status changes (auto_resolved, expired) → re-forward
        """
        while self._running:
            try:
                # Skip cycle if rate-limited
                if not self.remote.is_rate_limited:
                    await self._forward_pending_once()
            except Exception as e:
                logger.error(f"[COLLECTOR] forward_pending error: {e}")
            await asyncio.sleep(self._poll_interval)

    async def _forward_pending_once(self) -> None:
        """Run one pass over pending/: forward new/changed files, then prune tracking."""
        fnames = list(self.local.list_json_files("pending"))
        # Snapshot the full listing first so pruning stays correct even if the
        # pass below stops early because of a rate limit.
        current_files = {fname.removesuffix(".json") for fname in fnames}

        for fname in fnames:
            if self.remote.is_rate_limited:
                break  # unhandled files keep their old mtime → retried next cycle
            rid = fname.removesuffix(".json")
            try:
                await self._forward_pending_file(rid, fname)
            except Exception as e:
                # Isolate per-file failures; this file's mtime/hash were not
                # recorded, so it is retried on the next cycle.
                logger.error(f"[COLLECTOR] forward_pending error ({rid[:12]}): {e}")

        self._prune_pending_tracking(current_files)

    async def _forward_pending_file(self, rid: str, fname: str) -> None:
        """Forward pending/<fname> if it is new or its content changed.

        The file's mtime is recorded only once the file has been fully handled
        (forwarded, or deliberately skipped); an unreadable file or a failed
        forward is therefore re-examined next cycle instead of being lost.
        """
        current_mtime = self._pending_mtime(fname)
        if current_mtime is None:
            return  # vanished or inaccessible between listing and stat
        if self._pending_mtimes.get(rid) == current_mtime:
            return  # untouched since last handled — skip read+hash

        data = self.local.read_json("pending", fname)
        if data is None:
            return  # unreadable (possibly mid-write) — retry next cycle

        content_hash = self._content_hash(data)
        if self._pending_hashes.get(rid) == content_hash:
            self._pending_mtimes[rid] = current_mtime
            return  # touched but content unchanged

        is_new = rid not in self._forwarded_pending
        if rid in self._startup_pending and data.get("status", "pending") == "pending":
            # Still pending from before startup — never re-forward as new;
            # only status changes (auto_resolved/expired) go out.
            self._pending_mtimes[rid] = current_mtime
            return

        # Forward to Gateway (new or updated)
        await self.remote.awrite_json("pending", fname, data)
        self._forwarded_pending.add(rid)
        self._forwarded_timestamps[rid] = time.monotonic()
        self._pending_hashes[rid] = content_hash
        self._pending_mtimes[rid] = current_mtime

        if is_new:
            logger.info(f"[COLLECTOR] → Gateway: pending {rid[:12]}")
        else:
            status = data.get("status", "?")
            logger.info(f"[COLLECTOR] → Gateway: pending UPDATE {rid[:12]} status={status}")

    def _prune_pending_tracking(self, current_files: set[str]) -> None:
        """Forget tracking state for pending files no longer on disk."""
        # Startup rids stay in _forwarded_pending (that set only ever holds
        # files seen at startup, so it is bounded).
        gone_forwarded = [
            rid for rid in self._forwarded_pending
            if rid not in current_files and rid not in self._startup_pending
        ]
        for rid in gone_forwarded:
            self._forwarded_pending.discard(rid)
        # Includes mtime-only entries (e.g. files never successfully hashed).
        for rid in (self._pending_hashes.keys() | self._pending_mtimes.keys()) - current_files:
            self._pending_hashes.pop(rid, None)
            self._pending_mtimes.pop(rid, None)

    # ─── Poll Gateway responses → local ───

    async def _poll_responses_loop(self):
        """Poll Gateway for responses and write them locally for Extension.

        Only polls responses for recently-forwarded pending (within
        _RESPONSE_POLL_TTL). Expired entries are removed from tracking to
        prevent request accumulation.
        """
        while self._running:
            try:
                # Skip cycle if rate-limited
                if not self.remote.is_rate_limited:
                    self._expire_forwarded()
                    await self._poll_active_responses()
            except Exception as e:
                logger.error(f"[COLLECTOR] poll_responses error: {e}")
            await asyncio.sleep(self._poll_interval)

    def _expire_forwarded(self) -> None:
        """Stop polling responses for pending forwarded more than _RESPONSE_POLL_TTL ago."""
        now = time.monotonic()
        expired = [
            rid for rid, ts in self._forwarded_timestamps.items()
            if now - ts > self._RESPONSE_POLL_TTL
        ]
        for rid in expired:
            self._forwarded_pending.discard(rid)
            self._forwarded_timestamps.pop(rid, None)
            # NOTE: intentionally keep _pending_hashes[rid]: clearing it would
            # make the still-present file look changed and get re-forwarded.
            # The hash is pruned once the file is deleted from disk.
        if expired:
            logger.info(f"[COLLECTOR] expired {len(expired)} stale forwarded pending (>{self._RESPONSE_POLL_TTL}s)")

    async def _poll_active_responses(self) -> None:
        """Ask Gateway for a response to each active (non-startup) forwarded pending."""
        active_rids = [
            rid for rid in self._forwarded_pending
            if rid not in self._startup_pending
        ]
        for rid in active_rids:
            # Rate-limit guard: stop polling if we got rate-limited mid-cycle
            if self.remote.is_rate_limited:
                break
            if rid not in self._forwarded_pending:
                continue  # resolved or pruned by another loop since the snapshot
            data = await self.remote.aread_json("response", f"{rid}.json")
            if data is None or data.get("waiting"):
                await asyncio.sleep(0.3)  # Throttle between individual response polls
                continue

            # Write response locally for Extension to pick up
            self.local.write_json("response", f"{rid}.json", data)
            # Also delete local pending file (Extension expects this)
            self.local.delete_file("pending", f"{rid}.json")
            self._forwarded_pending.discard(rid)
            self._forwarded_timestamps.pop(rid, None)
            approved = data.get("approved", "?")
            logger.info(f"[COLLECTOR] ← Gateway: response {rid[:12]} approved={approved}")

    # ─── Poll Gateway commands → local ───

    def _discover_local_projects(self) -> set[str]:
        """Discover all project names registered by local Extension instances.

        Reads bridge/register/*.json files, which are written by each AG
        window's Extension with {conversation_id, project_name}. Returns the
        unique non-empty string project names found, always including
        self.project_name. Malformed files (bad JSON, bad encoding, non-object
        top level) are ignored so one broken file cannot stop command polling.

        Results are cached for _PROJECTS_CACHE_TTL seconds to avoid re-reading
        every register file each polling cycle.
        """
        now = time.monotonic()
        if self._cached_projects is not None and now - self._projects_cache_ts < self._PROJECTS_CACHE_TTL:
            return self._cached_projects

        projects = {self.project_name}
        register_dir = self.local.bridge_dir / "register"
        if register_dir.exists():
            for f in register_dir.glob("*.json"):
                try:
                    data = self._load_json_object(f)
                except (ValueError, OSError):
                    continue
                p = data.get("project_name", "")
                if p and isinstance(p, str):
                    projects.add(p)
        self._cached_projects = projects
        self._projects_cache_ts = now
        return projects

    async def _poll_commands_loop(self):
        """Poll Gateway for commands with adaptive per-project intervals.

        When a project returns empty commands repeatedly, its poll interval
        increases (3s → 10s → 30s → 60s). On receiving a command, the interval
        resets to base. This prevents idle projects from wasting requests.
        Note: the loop also sleeps self._poll_interval between passes, so the
        effective minimum interval is at least that long.
        """
        base_interval = 3.0
        idle_steps = (10.0, 30.0, 60.0)  # progressive idle intervals
        project_intervals: dict[str, float] = {}  # project → current interval
        project_last_poll: dict[str, float] = {}  # project → time.monotonic() of last poll
        project_empty_streak: dict[str, int] = {}  # project → consecutive empty polls

        while self._running:
            try:
                # Skip cycle if rate-limited
                if not self.remote.is_rate_limited:
                    for project in list(self._discover_local_projects()):
                        if self.remote.is_rate_limited:
                            break

                        # Fresh timestamp per project: earlier polls in this
                        # pass awaited network I/O and throttling sleeps.
                        now = time.monotonic()
                        last = project_last_poll.get(project)
                        interval = project_intervals.get(project, base_interval)
                        if last is not None and now - last < interval:
                            continue  # Not time yet for this project

                        project_last_poll[project] = now
                        commands = await self.remote.apoll_commands(project)

                        if commands:
                            # Got commands → reset to base interval
                            project_intervals[project] = base_interval
                            project_empty_streak[project] = 0
                            self._write_commands(project, commands)
                        else:
                            # Empty → back off progressively, capped at the last step
                            streak = project_empty_streak.get(project, 0) + 1
                            project_empty_streak[project] = streak
                            project_intervals[project] = idle_steps[min(streak, len(idle_steps)) - 1]

                        await asyncio.sleep(0.3)  # Throttle between projects

            except Exception as e:
                logger.error(f"[COLLECTOR] poll_commands error: {e}")

            await asyncio.sleep(self._poll_interval)

    def _write_commands(self, project: str, commands) -> None:
        """Write each Gateway command to local commands/<id>.json for the Extension."""
        fallback_base = int(time.time() * 1000)
        for idx, cmd in enumerate(commands):
            cmd_id = cmd.get("id")
            if cmd_id is None or cmd_id == "":
                # Suffix the batch index so several id-less commands received
                # in the same millisecond do not overwrite one file.
                cmd_id = f"{fallback_base}_{idx}"
            self.local.write_json("commands", f"{cmd_id}.json", cmd)
            text = str(cmd.get("text", "?"))  # str(): tolerate null/non-string text
            logger.info(f"[COLLECTOR] ← Gateway: command [{project}] {text[:30]}")

    # ─── Forward chat snapshots → Gateway ───

    async def _forward_chat_snapshots_loop(self):
        """Forward chat_snapshots/ from Extension to Gateway every 10 s.

        Each snapshot file is deleted after it has been handled. A malformed
        file is logged and left in place without blocking the remaining files.
        """
        while self._running:
            try:
                snap_dir = self.local.bridge_dir / "chat_snapshots"
                if snap_dir.exists():
                    for f in snap_dir.glob("*.json"):
                        try:
                            data = self._load_json_object(f)
                            project = data.get("project_name", self.project_name)
                            content = data.get("content", "")
                            attached_files = data.get("attached_files", [])
                            if content or attached_files:
                                await self.remote.asend_chat(project, content, attached_files=attached_files)
                                af_info = f" +{len(attached_files)} files" if attached_files else ""
                                logger.info(f"[COLLECTOR] → Gateway: chat snapshot len={len(content)}{af_info}")
                            f.unlink()  # Cleanup after forwarding
                        except (ValueError, OSError) as e:
                            logger.warning(f"[COLLECTOR] bad chat snapshot {f.name}: {e}")
            except Exception as e:
                logger.error(f"[COLLECTOR] forward_chat_snapshots error: {e}")

            await asyncio.sleep(10)  # Chat snapshots: less urgent, 10s interval

    # ─── Forward session registrations → Gateway ───

    async def _forward_registrations_loop(self):
        """Forward register/ files from Extension to Gateway every 30 s.

        A file is (re-)forwarded whenever its (conversation_id, project_name)
        pair differs from what was last sent for that filename; rewrites with
        identical content are not re-sent. Tracking for deleted files is
        dropped. Malformed files are logged and skipped individually.
        """
        # filename → (mtime, conversation_id, project_name) at last forward
        forwarded_regs: dict[str, tuple[float, str, str]] = {}
        while self._running:
            try:
                register_dir = self.local.bridge_dir / "register"
                if register_dir.exists():
                    present: set[str] = set()
                    for f in register_dir.glob("*.json"):
                        present.add(f.name)
                        try:
                            mtime = f.stat().st_mtime
                            prev = forwarded_regs.get(f.name)
                            if prev is not None and prev[0] == mtime:
                                continue  # untouched since last forward
                            data = self._load_json_object(f)
                            conv_id = data.get("conversation_id", "")
                            project = data.get("project_name", "")
                            if not (conv_id and project):
                                continue  # incomplete — re-checked next cycle
                            if prev is not None and prev[1:] == (conv_id, project):
                                forwarded_regs[f.name] = (mtime, conv_id, project)
                                continue  # rewritten with the same registration
                            await self.remote.aregister_session(conv_id, project)
                            forwarded_regs[f.name] = (mtime, conv_id, project)
                            logger.info(f"[COLLECTOR] → Gateway: register {str(conv_id)[:8]} → {project}")
                            await asyncio.sleep(0.3)  # Spread startup burst
                        except (ValueError, OSError) as e:
                            logger.warning(f"[COLLECTOR] bad register {f.name}: {e}")
                    for name in forwarded_regs.keys() - present:
                        del forwarded_regs[name]
            except Exception as e:
                logger.error(f"[COLLECTOR] forward_registrations error: {e}")

            await asyncio.sleep(30)  # Registration changes rarely — 30s interval

    # ─── Forward brain events → Gateway ───

    async def _forward_events_loop(self):
        """Read BrainEvents from the Watcher queue and POST them to Gateway.

        The 5 s get() timeout only lets the loop notice self._running turning
        False. A timeout raised while *sending* is logged as a forwarding
        error rather than mistaken for an idle queue. task_done() is called
        once per dequeued event, so a producer using Queue.join() would not
        block forever.
        """
        while self._running:
            try:
                event: BrainEvent = await asyncio.wait_for(
                    self.event_queue.get(), timeout=5.0
                )
            except asyncio.TimeoutError:
                continue  # idle queue — re-check self._running
            except Exception as e:
                logger.error(f"[COLLECTOR] forward_event error: {e}")
                continue

            try:
                # Serialize event to JSON
                event_data = {
                    "event_type": event.event_type.value,
                    "conversation_id": event.conversation_id,
                    "file_name": event.file_name,
                    "file_path": str(event.file_path) if event.file_path else "",
                    "content": event.content,
                    "timestamp": event.timestamp,
                }
                await self.remote.asend_event(event_data)
                logger.info(f"[COLLECTOR] → Gateway: event {event.event_type.value} {event.file_name}")
            except Exception as e:
                logger.error(f"[COLLECTOR] forward_event error: {e}")
            finally:
                self.event_queue.task_done()

    # ─── Health check ───

    async def _health_check_loop(self):
        """Check Gateway connectivity every 30 s; warn on failure while marked connected."""
        while self._running:
            try:
                ok = await self.remote.health_check()
                if not ok and self.remote.connected:
                    logger.warning("[COLLECTOR] ❌ Gateway health check failed")
            except Exception as e:
                # Best-effort probe: never crash the loop, but leave a trace.
                logger.debug(f"[COLLECTOR] health check error: {e}")
            await asyncio.sleep(30)

    # ─── Retry flush ───

    async def _retry_flush_loop(self):
        """Periodically flush the remote transport's failed-request retry queue."""
        while self._running:
            try:
                await self.remote.flush_retry_queue()
            except Exception as e:
                logger.error(f"[COLLECTOR] retry flush error: {e}")
            await asyncio.sleep(30)  # Retry flush: 30s interval (was 10s)
|