- collector.py: _forward_events_loop — BrainEvent를 JSON으로 serialize하여 /api/event POST - gateway.py: /api/event 엔드포인트 — 수신한 이벤트를 bot event_queue에 주입 - main.py: event_queue를 CollectorBridge에 전달 이제 task.md, implementation_plan, walkthrough 변경사항이 Collector→Gateway→Discord 경로로 전달됨
159 lines
6.3 KiB
Python
159 lines
6.3 KiB
Python
"""Collector — local relay between Extension (file-based) and Gateway (HTTP).
|
|
|
|
The Collector runs on the local PC alongside the AG IDE.
|
|
It bridges the gap between the Extension (which writes to local bridge/ files)
|
|
and the remote Gateway (which manages Discord).
|
|
|
|
Flow:
|
|
Extension → bridge/pending/ → Collector → POST Gateway /api/pending
|
|
Gateway /api/response/{rid} → Collector → bridge/response/ → Extension
|
|
Gateway /api/commands/{project} → Collector → bridge/commands/ → Extension
Watcher event queue → Collector → POST Gateway /api/event
|
|
"""
|
|
|
|
import asyncio
|
|
import json
|
|
import time
|
|
import logging
|
|
from pathlib import Path
|
|
|
|
from bridge import LocalTransport, RemoteTransport
|
|
from config import Config
|
|
from watcher import BrainEvent, EventType
|
|
|
|
# Module-level logger; the Collector's log messages are prefixed with "[COLLECTOR]".
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class CollectorBridge:
|
|
"""Bridges local file-based bridge with remote Gateway API.
|
|
|
|
Periodically:
|
|
1. Scans local pending/ → forwards new ones to Gateway
|
|
2. Polls Gateway for responses → writes to local response/
|
|
3. Polls Gateway for commands → writes to local commands/
|
|
"""
|
|
|
|
def __init__(self, local: LocalTransport, remote: RemoteTransport,
|
|
project_name: str, event_queue: asyncio.Queue | None = None):
|
|
self.local = local
|
|
self.remote = remote
|
|
self.project_name = project_name
|
|
self.event_queue = event_queue
|
|
self._forwarded_pending: set[str] = set() # already forwarded request IDs
|
|
self._poll_interval = 3 # seconds
|
|
self._running = False
|
|
|
|
async def start(self):
|
|
"""Start the Collector polling loops."""
|
|
self._running = True
|
|
logger.info(f"[COLLECTOR] started for project={self.project_name}")
|
|
tasks = [
|
|
self._forward_pending_loop(),
|
|
self._poll_responses_loop(),
|
|
self._poll_commands_loop(),
|
|
]
|
|
if self.event_queue:
|
|
tasks.append(self._forward_events_loop())
|
|
await asyncio.gather(*tasks)
|
|
|
|
async def stop(self):
|
|
"""Stop the Collector."""
|
|
self._running = False
|
|
logger.info("[COLLECTOR] stopped")
|
|
|
|
# ─── Forward local pending → Gateway ───
|
|
|
|
async def _forward_pending_loop(self):
|
|
"""Scan local pending/ and forward new requests to Gateway."""
|
|
while self._running:
|
|
try:
|
|
for fname in self.local.list_json_files("pending"):
|
|
rid = fname.replace(".json", "")
|
|
if rid in self._forwarded_pending:
|
|
continue
|
|
|
|
data = self.local.read_json("pending", fname)
|
|
if data is None or data.get("status") != "pending":
|
|
continue
|
|
|
|
# Forward to Gateway
|
|
self.remote.write_json("pending", fname, data)
|
|
self._forwarded_pending.add(rid)
|
|
logger.info(f"[COLLECTOR] → Gateway: pending {rid[:12]}")
|
|
|
|
# Clean up stale forwarded tracking (keep last 200)
|
|
if len(self._forwarded_pending) > 200:
|
|
self._forwarded_pending = set(list(self._forwarded_pending)[-100:])
|
|
|
|
except Exception as e:
|
|
logger.error(f"[COLLECTOR] forward_pending error: {e}")
|
|
|
|
await asyncio.sleep(self._poll_interval)
|
|
|
|
# ─── Poll Gateway responses → local ───
|
|
|
|
async def _poll_responses_loop(self):
|
|
"""Poll Gateway for responses and write them locally for Extension."""
|
|
while self._running:
|
|
try:
|
|
# Check each forwarded pending for a response
|
|
for rid in list(self._forwarded_pending):
|
|
data = self.remote.read_json("response", f"{rid}.json")
|
|
if data is None or data.get("waiting"):
|
|
continue
|
|
|
|
# Write response locally for Extension to pick up
|
|
self.local.write_json("response", f"{rid}.json", data)
|
|
# Also delete local pending file (Extension expects this)
|
|
self.local.delete_file("pending", f"{rid}.json")
|
|
self._forwarded_pending.discard(rid)
|
|
approved = data.get("approved", "?")
|
|
logger.info(f"[COLLECTOR] ← Gateway: response {rid[:12]} approved={approved}")
|
|
|
|
except Exception as e:
|
|
logger.error(f"[COLLECTOR] poll_responses error: {e}")
|
|
|
|
await asyncio.sleep(self._poll_interval)
|
|
|
|
# ─── Poll Gateway commands → local ───
|
|
|
|
async def _poll_commands_loop(self):
|
|
"""Poll Gateway for commands and write them locally for Extension."""
|
|
while self._running:
|
|
try:
|
|
commands = self.remote.poll_commands(self.project_name)
|
|
for cmd in commands:
|
|
cmd_id = cmd.get("id", str(int(time.time() * 1000)))
|
|
fname = f"{cmd_id}.json"
|
|
self.local.write_json("commands", fname, cmd)
|
|
logger.info(f"[COLLECTOR] ← Gateway: command {cmd.get('text', '?')[:30]}")
|
|
|
|
except Exception as e:
|
|
logger.error(f"[COLLECTOR] poll_commands error: {e}")
|
|
|
|
await asyncio.sleep(self._poll_interval)
|
|
|
|
# ─── Forward brain events → Gateway ───
|
|
|
|
async def _forward_events_loop(self):
|
|
"""Read BrainEvents from Watcher queue and POST to Gateway."""
|
|
while self._running:
|
|
try:
|
|
event: BrainEvent = await asyncio.wait_for(
|
|
self.event_queue.get(), timeout=5.0
|
|
)
|
|
# Serialize event to JSON
|
|
event_data = {
|
|
"event_type": event.event_type.value,
|
|
"conversation_id": event.conversation_id,
|
|
"file_name": event.file_name,
|
|
"file_path": str(event.file_path) if event.file_path else "",
|
|
"content": event.content,
|
|
"timestamp": event.timestamp,
|
|
}
|
|
self.remote._request("POST", "/api/event", event_data)
|
|
logger.info(f"[COLLECTOR] → Gateway: event {event.event_type.value} {event.file_name}")
|
|
except asyncio.TimeoutError:
|
|
continue
|
|
except Exception as e:
|
|
logger.error(f"[COLLECTOR] forward_event error: {e}")
|