334 lines
14 KiB
Python
334 lines
14 KiB
Python
"""Collector — local relay between Extension (file-based) and Gateway (HTTP).

The Collector runs on the local PC alongside the AG IDE.
It bridges the gap between the Extension (which writes to local bridge/ files)
and the remote Gateway (which manages Discord).

Flow:
    Extension → bridge/pending/ → Collector → POST Gateway /api/pending
    Gateway /api/response/{rid} → Collector → bridge/response/ → Extension
    Gateway /api/commands/{project} → Collector → bridge/commands/ → Extension
    Extension → bridge/chat_snapshots/ → Collector → Gateway (chat)
    Extension → bridge/register/ → Collector → Gateway (session registration)
    Watcher brain events → Collector → Gateway (events)
"""
|
|
|
|
import asyncio
|
|
import hashlib
|
|
import json
|
|
import time
|
|
import logging
|
|
from pathlib import Path
|
|
|
|
from bridge import LocalTransport, RemoteTransport
|
|
from config import Config
|
|
from watcher import BrainEvent, EventType
|
|
|
|
# Module-level logger, named after this module's dotted import path.
logger = logging.getLogger(name=__name__)
|
|
|
|
|
|
class CollectorBridge:
|
|
"""Bridges local file-based bridge with remote Gateway API.
|
|
|
|
Periodically:
|
|
1. Scans local pending/ → forwards new ones to Gateway
|
|
2. Polls Gateway for responses → writes to local response/
|
|
3. Polls Gateway for commands → writes to local commands/
|
|
"""
|
|
|
|
def __init__(self, local: LocalTransport, remote: RemoteTransport,
|
|
project_name: str, event_queue: asyncio.Queue | None = None):
|
|
self.local = local
|
|
self.remote = remote
|
|
self.project_name = project_name
|
|
self.event_queue = event_queue
|
|
self._poll_interval = 3 # seconds
|
|
self._running = False
|
|
|
|
# Pre-populate with existing pending files → skip on startup (prevents 만료됨 spam)
|
|
self._startup_pending: set[str] = set()
|
|
self._forwarded_pending: set[str] = set()
|
|
self._pending_hashes: dict[str, str] = {} # rid → content hash (for MERGE/status detection)
|
|
for fname in self.local.list_json_files("pending"):
|
|
rid = fname.replace(".json", "")
|
|
self._startup_pending.add(rid)
|
|
self._forwarded_pending.add(rid)
|
|
# Pre-hash existing files
|
|
data = self.local.read_json("pending", fname)
|
|
if data:
|
|
self._pending_hashes[rid] = hashlib.md5(
|
|
json.dumps(data, sort_keys=True).encode()
|
|
).hexdigest()
|
|
if self._startup_pending:
|
|
logger.info(f"[COLLECTOR] skipping {len(self._startup_pending)} existing pending files")
|
|
|
|
async def start(self):
|
|
"""Start the Collector polling loops."""
|
|
self._running = True
|
|
logger.info(f"[COLLECTOR] started for project={self.project_name}")
|
|
tasks = [
|
|
self._forward_pending_loop(),
|
|
self._poll_responses_loop(),
|
|
self._poll_commands_loop(),
|
|
self._forward_chat_snapshots_loop(),
|
|
self._forward_registrations_loop(),
|
|
self._health_check_loop(),
|
|
self._retry_flush_loop(),
|
|
]
|
|
if self.event_queue:
|
|
tasks.append(self._forward_events_loop())
|
|
await asyncio.gather(*tasks)
|
|
|
|
async def stop(self):
|
|
"""Stop the Collector and close HTTP session."""
|
|
self._running = False
|
|
await self.remote.close()
|
|
logger.info("[COLLECTOR] stopped")
|
|
|
|
# ─── Forward local pending → Gateway ───
|
|
|
|
async def _forward_pending_loop(self):
|
|
"""Scan local pending/ and forward new + updated requests to Gateway.
|
|
|
|
Tracks content hashes to detect:
|
|
- New pending files → forward immediately
|
|
- MERGE updates (step_probe updates command text) → re-forward
|
|
- Status changes (auto_resolved, expired) → re-forward
|
|
"""
|
|
while self._running:
|
|
try:
|
|
# Skip cycle if rate-limited
|
|
if self.remote.is_rate_limited:
|
|
await asyncio.sleep(self._poll_interval)
|
|
continue
|
|
|
|
current_files = set()
|
|
for fname in self.local.list_json_files("pending"):
|
|
rid = fname.replace(".json", "")
|
|
current_files.add(rid)
|
|
|
|
data = self.local.read_json("pending", fname)
|
|
if data is None:
|
|
continue
|
|
|
|
# Compute content hash to detect changes
|
|
content_hash = hashlib.md5(
|
|
json.dumps(data, sort_keys=True).encode()
|
|
).hexdigest()
|
|
|
|
prev_hash = self._pending_hashes.get(rid)
|
|
if prev_hash == content_hash:
|
|
continue # No change
|
|
|
|
is_new = rid not in self._forwarded_pending
|
|
if rid in self._startup_pending:
|
|
# Startup files: only forward status CHANGES (not re-forward as new pending)
|
|
status = data.get("status", "pending")
|
|
if status == "pending":
|
|
continue # Still pending from before startup — skip
|
|
# Status changed (auto_resolved/expired) — forward the update
|
|
|
|
# Forward to Gateway (new or updated)
|
|
await self.remote.awrite_json("pending", fname, data)
|
|
self._forwarded_pending.add(rid)
|
|
self._pending_hashes[rid] = content_hash
|
|
|
|
if is_new:
|
|
logger.info(f"[COLLECTOR] → Gateway: pending {rid[:12]}")
|
|
else:
|
|
status = data.get("status", "?")
|
|
logger.info(f"[COLLECTOR] → Gateway: pending UPDATE {rid[:12]} status={status}")
|
|
|
|
# Clean up tracking for deleted files
|
|
for rid in list(self._forwarded_pending):
|
|
if rid not in current_files and rid not in self._startup_pending:
|
|
self._forwarded_pending.discard(rid)
|
|
self._pending_hashes.pop(rid, None)
|
|
|
|
except Exception as e:
|
|
logger.error(f"[COLLECTOR] forward_pending error: {e}")
|
|
|
|
await asyncio.sleep(self._poll_interval)
|
|
|
|
# ─── Poll Gateway responses → local ───
|
|
|
|
async def _poll_responses_loop(self):
|
|
"""Poll Gateway for responses and write them locally for Extension."""
|
|
while self._running:
|
|
try:
|
|
# Skip cycle if rate-limited
|
|
if self.remote.is_rate_limited:
|
|
await asyncio.sleep(self._poll_interval)
|
|
continue
|
|
|
|
# Check each forwarded pending for a response
|
|
for rid in list(self._forwarded_pending):
|
|
if rid in self._startup_pending:
|
|
continue # Don't poll responses for pre-startup files
|
|
# Rate-limit guard: stop polling if we got rate-limited mid-cycle
|
|
if self.remote.is_rate_limited:
|
|
break
|
|
data = await self.remote.aread_json("response", f"{rid}.json")
|
|
if data is None or data.get("waiting"):
|
|
await asyncio.sleep(0.2) # Throttle between individual response polls
|
|
continue
|
|
|
|
# Write response locally for Extension to pick up
|
|
self.local.write_json("response", f"{rid}.json", data)
|
|
# Also delete local pending file (Extension expects this)
|
|
self.local.delete_file("pending", f"{rid}.json")
|
|
self._forwarded_pending.discard(rid)
|
|
approved = data.get("approved", "?")
|
|
logger.info(f"[COLLECTOR] ← Gateway: response {rid[:12]} approved={approved}")
|
|
|
|
except Exception as e:
|
|
logger.error(f"[COLLECTOR] poll_responses error: {e}")
|
|
|
|
await asyncio.sleep(self._poll_interval)
|
|
|
|
# ─── Poll Gateway commands → local ───
|
|
|
|
def _discover_local_projects(self) -> set[str]:
|
|
"""Discover all project names registered by local Extension instances.
|
|
|
|
Reads bridge/register/*.json files, which are written by each AG window's
|
|
Extension with {conversation_id, project_name}. Returns unique project names
|
|
found, always including self.project_name as a fallback.
|
|
"""
|
|
projects = {self.project_name}
|
|
register_dir = self.local.bridge_dir / "register"
|
|
if not register_dir.exists():
|
|
return projects
|
|
for f in register_dir.glob("*.json"):
|
|
try:
|
|
data = json.loads(f.read_text(encoding="utf-8-sig"))
|
|
p = data.get("project_name", "")
|
|
if p:
|
|
projects.add(p)
|
|
except (json.JSONDecodeError, OSError):
|
|
pass
|
|
return projects
|
|
|
|
async def _poll_commands_loop(self):
|
|
"""Poll Gateway for commands for ALL local projects.
|
|
|
|
Discovers projects from bridge/register/ (written by each AG Extension)
|
|
and polls commands for each. Extension-side filtering (project_name check)
|
|
ensures each AG window only processes its own commands.
|
|
"""
|
|
while self._running:
|
|
try:
|
|
# Skip cycle if rate-limited
|
|
if not self.remote.is_rate_limited:
|
|
projects = self._discover_local_projects()
|
|
for project in projects:
|
|
if self.remote.is_rate_limited:
|
|
break # Stop mid-cycle if rate-limited
|
|
commands = await self.remote.apoll_commands(project)
|
|
for cmd in commands:
|
|
cmd_id = cmd.get("id", str(int(time.time() * 1000)))
|
|
fname = f"{cmd_id}.json"
|
|
self.local.write_json("commands", fname, cmd)
|
|
logger.info(f"[COLLECTOR] ← Gateway: command [{project}] {cmd.get('text', '?')[:30]}")
|
|
|
|
except Exception as e:
|
|
logger.error(f"[COLLECTOR] poll_commands error: {e}")
|
|
|
|
await asyncio.sleep(self._poll_interval)
|
|
|
|
# ─── Forward chat snapshots → Gateway ───
|
|
|
|
async def _forward_chat_snapshots_loop(self):
|
|
"""Forward chat_snapshots/ from Extension to Gateway."""
|
|
while self._running:
|
|
try:
|
|
snap_dir = self.local.bridge_dir / "chat_snapshots"
|
|
if snap_dir.exists():
|
|
for f in snap_dir.glob("*.json"):
|
|
try:
|
|
data = json.loads(f.read_text(encoding="utf-8-sig"))
|
|
project = data.get("project_name", self.project_name)
|
|
content = data.get("content", "")
|
|
if content:
|
|
await self.remote.asend_chat(project, content)
|
|
logger.info(f"[COLLECTOR] → Gateway: chat snapshot len={len(content)}")
|
|
f.unlink() # Cleanup after forwarding
|
|
except (json.JSONDecodeError, OSError) as e:
|
|
logger.warning(f"[COLLECTOR] bad chat snapshot {f.name}: {e}")
|
|
except Exception as e:
|
|
logger.error(f"[COLLECTOR] forward_chat_snapshots error: {e}")
|
|
|
|
await asyncio.sleep(self._poll_interval)
|
|
|
|
# ─── Forward session registrations → Gateway ───
|
|
|
|
async def _forward_registrations_loop(self):
|
|
"""Forward register/ files from Extension to Gateway."""
|
|
forwarded_regs: set[str] = set()
|
|
while self._running:
|
|
try:
|
|
register_dir = self.local.bridge_dir / "register"
|
|
if register_dir.exists():
|
|
for f in register_dir.glob("*.json"):
|
|
if f.name in forwarded_regs:
|
|
continue
|
|
try:
|
|
data = json.loads(f.read_text(encoding="utf-8-sig"))
|
|
conv_id = data.get("conversation_id", "")
|
|
project = data.get("project_name", "")
|
|
if conv_id and project:
|
|
await self.remote.aregister_session(conv_id, project)
|
|
forwarded_regs.add(f.name)
|
|
logger.info(f"[COLLECTOR] → Gateway: register {conv_id[:8]} → {project}")
|
|
except (json.JSONDecodeError, OSError) as e:
|
|
logger.warning(f"[COLLECTOR] bad register {f.name}: {e}")
|
|
except Exception as e:
|
|
logger.error(f"[COLLECTOR] forward_registrations error: {e}")
|
|
|
|
await asyncio.sleep(self._poll_interval * 3) # Less frequent
|
|
# ─── Forward brain events → Gateway ───
|
|
|
|
async def _forward_events_loop(self):
|
|
"""Read BrainEvents from Watcher queue and POST to Gateway."""
|
|
while self._running:
|
|
try:
|
|
event: BrainEvent = await asyncio.wait_for(
|
|
self.event_queue.get(), timeout=5.0
|
|
)
|
|
# Serialize event to JSON
|
|
event_data = {
|
|
"event_type": event.event_type.value,
|
|
"conversation_id": event.conversation_id,
|
|
"file_name": event.file_name,
|
|
"file_path": str(event.file_path) if event.file_path else "",
|
|
"content": event.content,
|
|
"timestamp": event.timestamp,
|
|
}
|
|
await self.remote.asend_event(event_data)
|
|
logger.info(f"[COLLECTOR] → Gateway: event {event.event_type.value} {event.file_name}")
|
|
except asyncio.TimeoutError:
|
|
continue
|
|
except Exception as e:
|
|
logger.error(f"[COLLECTOR] forward_event error: {e}")
|
|
|
|
# ─── Health check ───
|
|
|
|
async def _health_check_loop(self):
|
|
"""Periodically check Gateway connectivity."""
|
|
while self._running:
|
|
try:
|
|
ok = await self.remote.health_check()
|
|
if not ok and self.remote.connected:
|
|
logger.warning("[COLLECTOR] ❌ Gateway health check failed")
|
|
except Exception:
|
|
pass
|
|
await asyncio.sleep(30)
|
|
|
|
# ─── Retry flush ───
|
|
|
|
async def _retry_flush_loop(self):
|
|
"""Periodically flush failed request retry queue."""
|
|
while self._running:
|
|
try:
|
|
await self.remote.flush_retry_queue()
|
|
except Exception as e:
|
|
logger.error(f"[COLLECTOR] retry flush error: {e}")
|
|
await asyncio.sleep(10)
|