feat(collector): RemoteTransport + CollectorBridge 구현 — Collector↔Gateway HTTP 통신 완성

- bridge.py RemoteTransport: HTTP 클라이언트, API Key auth, Gateway API 매핑
- collector.py CollectorBridge: 3개 async loop (pending 전달, response 폴링, commands 폴링)
- main.py: BOT_MODE=remote → CollectorBridge 실행 (Discord bot 없이)
- config.py: GATEWAY_API_KEY 설정
- .env.example: 모든 설정 항목 업데이트
This commit is contained in:
Variet Worker
2026-03-11 20:10:45 +09:00
parent 95da3e9307
commit 95c2905e14
4 changed files with 256 additions and 21 deletions

View File

@@ -16,3 +16,13 @@ ACTIVE_TIMEOUT_SECONDS=300
# Watcher Settings # Watcher Settings
DEBOUNCE_SECONDS=2 DEBOUNCE_SECONDS=2
# Bot mode: 'local' (default, file-based) or 'gateway' (서버 Docker)
BOT_MODE=local
# Remote bridge URL (only used when BOT_MODE=remote)
REMOTE_BRIDGE_URL=
# Gateway API Key (보안)
# 서버와 Collector에 동일한 키를 설정하세요
# 생성: python -c "import secrets; print(secrets.token_urlsafe(32))"
GATEWAY_API_KEY=

View File

@@ -150,30 +150,107 @@ class LocalTransport(BridgeTransport):
class RemoteTransport(BridgeTransport): class RemoteTransport(BridgeTransport):
"""HTTP-based transport for remote/multi-PC mode (skeleton). """HTTP-based transport for Collector → Gateway communication.
Future implementation: polls a remote bridge HTTP server that Maps BridgeTransport methods to Gateway API endpoints:
exposes the same pending/response/commands JSON files via API. list_json_files("pending") → GET /api/pending (returns list)
write_json("pending", ...) → POST /api/pending
read_json("response", ...) → GET /api/response/{rid}
write_json("commands", ...) → (not used by Collector, Gateway pushes commands)
etc.
""" """
def __init__(self, base_url: str): def __init__(self, base_url: str, api_key: str = ""):
self.base_url = base_url.rstrip("/") self.base_url = base_url.rstrip("/")
logger.info(f"RemoteTransport: initialized with {self.base_url}") self.api_key = api_key
self._headers = {"Content-Type": "application/json"}
if api_key:
self._headers["Authorization"] = f"Bearer {api_key}"
logger.info(f"RemoteTransport: {self.base_url} (auth={'yes' if api_key else 'no'})")
def _request(self, method: str, path: str, data: dict | None = None) -> dict | None:
"""Make HTTP request to Gateway API."""
import urllib.request
import urllib.error
url = f"{self.base_url}{path}"
body = json.dumps(data, ensure_ascii=False).encode("utf-8") if data else None
req = urllib.request.Request(url, data=body, headers=self._headers, method=method)
try:
with urllib.request.urlopen(req, timeout=10) as resp:
return json.loads(resp.read().decode("utf-8"))
except urllib.error.HTTPError as e:
logger.warning(f"RemoteTransport: {method} {path}{e.code} {e.reason}")
return None
except (urllib.error.URLError, OSError, json.JSONDecodeError) as e:
logger.warning(f"RemoteTransport: {method} {path}{e}")
return None
def list_json_files(self, subdir: str) -> list[str]: def list_json_files(self, subdir: str) -> list[str]:
raise NotImplementedError("RemoteTransport not yet implemented") """List pending requests from Gateway."""
if subdir == "pending":
result = self._request("GET", "/api/pending")
if result and isinstance(result, list):
return [f"{r['request_id']}.json" for r in result]
elif subdir == "commands":
# Commands are polled per-project (handled separately)
return []
return []
def read_json(self, subdir: str, filename: str) -> dict | None: def read_json(self, subdir: str, filename: str) -> dict | None:
raise NotImplementedError("RemoteTransport not yet implemented") """Read a JSON file from Gateway."""
rid = filename.replace(".json", "")
if subdir == "response":
return self._request("GET", f"/api/response/{rid}")
elif subdir == "pending":
# Pending data comes from list, not individual read
result = self._request("GET", "/api/pending")
if result and isinstance(result, list):
for r in result:
if r.get("request_id") == rid:
return r
return None
def write_json(self, subdir: str, filename: str, data: dict) -> None: def write_json(self, subdir: str, filename: str, data: dict) -> None:
raise NotImplementedError("RemoteTransport not yet implemented") """Write data to Gateway via API."""
if subdir == "pending":
self._request("POST", "/api/pending", data)
elif subdir == "response":
rid = data.get("request_id", filename.replace(".json", ""))
self._request("POST", f"/api/response/{rid}", data)
elif subdir == "commands":
# Commands go through write_command in BridgeProtocol
self._request("POST", "/api/chat", data)
def delete_file(self, subdir: str, filename: str) -> bool: def delete_file(self, subdir: str, filename: str) -> bool:
raise NotImplementedError("RemoteTransport not yet implemented") """Delete not needed for remote — Gateway manages cleanup."""
return True
def ensure_dirs(self) -> None: def ensure_dirs(self) -> None:
pass # Remote server manages its own directories """No local dirs needed for remote transport."""
pass
def poll_commands(self, project: str) -> list[dict]:
"""Poll Gateway for commands (Collector-specific, not in ABC)."""
result = self._request("GET", f"/api/commands/{project}")
if result and isinstance(result, dict):
return result.get("commands", [])
return []
def register_session(self, conversation_id: str, project_name: str) -> None:
"""Register session → project mapping on Gateway."""
self._request("POST", "/api/register", {
"conversation_id": conversation_id,
"project_name": project_name,
})
def send_chat(self, project_name: str, content: str) -> None:
"""Push chat snapshot to Gateway for relay to Discord."""
self._request("POST", "/api/chat", {
"project_name": project_name,
"content": content,
})
# ─── Bridge Protocol (uses Transport) ─── # ─── Bridge Protocol (uses Transport) ───

127
collector.py Normal file
View File

@@ -0,0 +1,127 @@
"""Collector — local relay between Extension (file-based) and Gateway (HTTP).
The Collector runs on the local PC alongside the AG IDE.
It bridges the gap between the Extension (which writes to local bridge/ files)
and the remote Gateway (which manages Discord).
Flow:
Extension → bridge/pending/ → Collector → POST Gateway /api/pending
Gateway /api/response/{rid} → Collector → bridge/response/ → Extension
Gateway /api/commands/{project} → Collector → bridge/commands/ → Extension
"""
import asyncio
import json
import time
import logging
from pathlib import Path
from bridge import LocalTransport, RemoteTransport
from config import Config
logger = logging.getLogger(__name__)
class CollectorBridge:
"""Bridges local file-based bridge with remote Gateway API.
Periodically:
1. Scans local pending/ → forwards new ones to Gateway
2. Polls Gateway for responses → writes to local response/
3. Polls Gateway for commands → writes to local commands/
"""
def __init__(self, local: LocalTransport, remote: RemoteTransport, project_name: str):
self.local = local
self.remote = remote
self.project_name = project_name
self._forwarded_pending: set[str] = set() # already forwarded request IDs
self._poll_interval = 3 # seconds
self._running = False
async def start(self):
"""Start the Collector polling loops."""
self._running = True
logger.info(f"[COLLECTOR] started for project={self.project_name}")
await asyncio.gather(
self._forward_pending_loop(),
self._poll_responses_loop(),
self._poll_commands_loop(),
)
async def stop(self):
"""Stop the Collector."""
self._running = False
logger.info("[COLLECTOR] stopped")
# ─── Forward local pending → Gateway ───
async def _forward_pending_loop(self):
"""Scan local pending/ and forward new requests to Gateway."""
while self._running:
try:
for fname in self.local.list_json_files("pending"):
rid = fname.replace(".json", "")
if rid in self._forwarded_pending:
continue
data = self.local.read_json("pending", fname)
if data is None or data.get("status") != "pending":
continue
# Forward to Gateway
self.remote.write_json("pending", fname, data)
self._forwarded_pending.add(rid)
logger.info(f"[COLLECTOR] → Gateway: pending {rid[:12]}")
# Clean up stale forwarded tracking (keep last 200)
if len(self._forwarded_pending) > 200:
self._forwarded_pending = set(list(self._forwarded_pending)[-100:])
except Exception as e:
logger.error(f"[COLLECTOR] forward_pending error: {e}")
await asyncio.sleep(self._poll_interval)
# ─── Poll Gateway responses → local ───
async def _poll_responses_loop(self):
"""Poll Gateway for responses and write them locally for Extension."""
while self._running:
try:
# Check each forwarded pending for a response
for rid in list(self._forwarded_pending):
data = self.remote.read_json("response", f"{rid}.json")
if data is None or data.get("waiting"):
continue
# Write response locally for Extension to pick up
self.local.write_json("response", f"{rid}.json", data)
# Also delete local pending file (Extension expects this)
self.local.delete_file("pending", f"{rid}.json")
self._forwarded_pending.discard(rid)
approved = data.get("approved", "?")
logger.info(f"[COLLECTOR] ← Gateway: response {rid[:12]} approved={approved}")
except Exception as e:
logger.error(f"[COLLECTOR] poll_responses error: {e}")
await asyncio.sleep(self._poll_interval)
# ─── Poll Gateway commands → local ───
async def _poll_commands_loop(self):
"""Poll Gateway for commands and write them locally for Extension."""
while self._running:
try:
commands = self.remote.poll_commands(self.project_name)
for cmd in commands:
cmd_id = cmd.get("id", str(int(time.time() * 1000)))
fname = f"{cmd_id}.json"
self.local.write_json("commands", fname, cmd)
logger.info(f"[COLLECTOR] ← Gateway: command {cmd.get('text', '?')[:30]}")
except Exception as e:
logger.error(f"[COLLECTOR] poll_commands error: {e}")
await asyncio.sleep(self._poll_interval)

43
main.py
View File

@@ -51,15 +51,41 @@ async def main():
# Get the running loop # Get the running loop
loop = asyncio.get_running_loop() loop = asyncio.get_running_loop()
# Create transport based on BOT_MODE # ── Collector mode: no Discord bot, just relay local ↔ Gateway ──
transport = None # None → LocalTransport (default)
if Config.BOT_MODE == "remote": if Config.BOT_MODE == "remote":
from bridge import RemoteTransport from bridge import LocalTransport, RemoteTransport
from collector import CollectorBridge
if not Config.REMOTE_BRIDGE_URL: if not Config.REMOTE_BRIDGE_URL:
logger.error("REMOTE_BRIDGE_URL is required for remote mode") logger.error("REMOTE_BRIDGE_URL is required for remote (Collector) mode")
sys.exit(1) sys.exit(1)
transport = RemoteTransport(Config.REMOTE_BRIDGE_URL)
logger.info(f"Remote transport: {Config.REMOTE_BRIDGE_URL}") bridge_dir = Config.BRAIN_PATH.parent / "bridge"
local = LocalTransport(bridge_dir)
local.ensure_dirs()
remote = RemoteTransport(Config.REMOTE_BRIDGE_URL, api_key=Config.GATEWAY_API_KEY)
collector = CollectorBridge(local, remote, project_name=Config.PROJECT_NAME)
logger.info(f"Collector mode: {Config.REMOTE_BRIDGE_URL}")
# Optionally start watcher for brain events (local display only)
watcher = BrainWatcher(event_queue, loop)
try:
watcher.start()
logger.info(f"Watcher started, {len(watcher.known_sessions)} existing sessions")
await collector.start()
except KeyboardInterrupt:
logger.info("Received keyboard interrupt")
except Exception as e:
logger.error(f"Fatal error: {e}", exc_info=True)
finally:
await collector.stop()
watcher.stop()
logger.info("Collector shutdown complete")
return
# ── Local / Gateway mode ──
# Create components # Create components
watcher = None watcher = None
@@ -67,11 +93,6 @@ async def main():
watcher = BrainWatcher(event_queue, loop) watcher = BrainWatcher(event_queue, loop)
bot = GravityBot(event_queue) bot = GravityBot(event_queue)
# Inject transport if specified (otherwise bot uses default LocalTransport)
if transport is not None:
from bridge import BridgeProtocol
bot.bridge = BridgeProtocol(transport)
try: try:
# Start watcher (local mode only — gateway receives data via HTTP) # Start watcher (local mode only — gateway receives data via HTTP)
if watcher: if watcher: