From 3d75825bba7668e70eb8b117435a11704cadbdaa Mon Sep 17 00:00:00 2001 From: Variet Worker Date: Wed, 11 Mar 2026 22:24:48 +0900 Subject: [PATCH] =?UTF-8?q?feat(collector):=20brain=20event=20=EC=A4=91?= =?UTF-8?q?=EA=B3=84=20=EC=B6=94=EA=B0=80=20=E2=80=94=20Watcher=20?= =?UTF-8?q?=EC=9D=B4=EB=B2=A4=ED=8A=B8=EB=A5=BC=20Gateway=EB=A1=9C=20?= =?UTF-8?q?=EC=A0=84=EB=8B=AC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - collector.py: _forward_events_loop — BrainEvent를 JSON으로 serialize하여 /api/event POST - gateway.py: /api/event 엔드포인트 — 수신한 이벤트를 bot event_queue에 주입 - main.py: event_queue를 CollectorBridge에 전달 이제 task.md, implementation_plan, walkthrough 변경사항이 Collector→Gateway→Discord 경로로 전달됨 --- collector.py | 37 ++++++++++++++++++++++++++++++++++--- gateway.py | 31 +++++++++++++++++++++++++++++++ main.py | 3 ++- 3 files changed, 67 insertions(+), 4 deletions(-) diff --git a/collector.py b/collector.py index e485b19..82f1e23 100644 --- a/collector.py +++ b/collector.py @@ -18,6 +18,7 @@ from pathlib import Path from bridge import LocalTransport, RemoteTransport from config import Config +from watcher import BrainEvent, EventType logger = logging.getLogger(__name__) @@ -31,10 +32,12 @@ class CollectorBridge: 3. Polls Gateway for commands → writes to local commands/ """ - def __init__(self, local: LocalTransport, remote: RemoteTransport, project_name: str): + def __init__(self, local: LocalTransport, remote: RemoteTransport, + project_name: str, event_queue: asyncio.Queue | None = None): self.local = local self.remote = remote self.project_name = project_name + self.event_queue = event_queue self._forwarded_pending: set[str] = set() # already forwarded request IDs self._poll_interval = 3 # seconds self._running = False @@ -43,11 +46,14 @@ class CollectorBridge: """Start the Collector polling loops.""" self._running = True logger.info(f"[COLLECTOR] started for project={self.project_name}") - await asyncio.gather( + tasks = [ self._forward_pending_loop(), self._poll_responses_loop(), self._poll_commands_loop(), - ) + ] + if self.event_queue: + tasks.append(self._forward_events_loop()) + await asyncio.gather(*tasks) async def stop(self): """Stop the Collector.""" @@ -125,3 +131,28 @@ class CollectorBridge: logger.error(f"[COLLECTOR] poll_commands error: {e}") await asyncio.sleep(self._poll_interval) + + # ─── Forward brain events → Gateway ─── + + async def _forward_events_loop(self): + """Read BrainEvents from Watcher queue and POST to Gateway.""" + while self._running: + try: + event: BrainEvent = await asyncio.wait_for( + self.event_queue.get(), timeout=5.0 + ) + # Serialize event to JSON + event_data = { + "event_type": event.event_type.value, + "conversation_id": event.conversation_id, + "file_name": event.file_name, + "file_path": str(event.file_path) if event.file_path else "", + "content": event.content, + "timestamp": event.timestamp, + } + self.remote._request("POST", "/api/event", event_data) + logger.info(f"[COLLECTOR] → Gateway: event {event.event_type.value} {event.file_name}") + except asyncio.TimeoutError: + continue + except Exception as e: + logger.error(f"[COLLECTOR] forward_event error: {e}") diff --git a/gateway.py b/gateway.py index b2b5a6d..4b1a036 100644 --- a/gateway.py +++ b/gateway.py @@ -20,6 +20,7 @@ import asyncio import json import time import logging +from pathlib import Path from aiohttp import web logger = logging.getLogger(__name__) @@ -47,6 +48,7 @@ class GatewayAPI: self.app.router.add_post("/api/chat", self._post_chat) self.app.router.add_post("/api/register", self._post_register) self.app.router.add_get("/api/commands/{project}", self._get_commands) + self.app.router.add_post("/api/event", self._post_event) # ─── Auth Middleware ─── @@ -184,6 +186,35 @@ class GatewayAPI: self._commands[project] = [] self._commands[project].append(command) + # ─── Brain Events (Collector → Gateway → Discord) ─── + + async def _post_event(self, request: web.Request) -> web.Response: + """Collector pushes a brain event (file change) for relay to Discord.""" + try: + data = await request.json() + from watcher import BrainEvent, EventType + + event_type_str = data.get("event_type", "file_changed") + event_type = EventType(event_type_str) + + event = BrainEvent( + event_type=event_type, + conversation_id=data.get("conversation_id", ""), + file_name=data.get("file_name", ""), + file_path=Path(data["file_path"]) if data.get("file_path") else None, + content=data.get("content", ""), + timestamp=data.get("timestamp", time.time()), + ) + + # Inject into bot's event queue (same path as local mode) + await self.bot.event_queue.put(event) + logger.info(f"[GATEWAY] event received: {event_type_str} {event.file_name} conv={event.conversation_id[:8]}") + + return web.json_response({"ok": True}) + except Exception as e: + logger.error(f"[GATEWAY] event error: {e}") + return web.json_response({"ok": False, "error": str(e)}, status=400) + # ─── Run ─── async def start(self): diff --git a/main.py b/main.py index 5db23bc..1df5939 100644 --- a/main.py +++ b/main.py @@ -65,7 +65,8 @@ async def main(): local.ensure_dirs() remote = RemoteTransport(Config.REMOTE_BRIDGE_URL, api_key=Config.GATEWAY_API_KEY) - collector = CollectorBridge(local, remote, project_name=Config.PROJECT_NAME) + collector = CollectorBridge(local, remote, project_name=Config.PROJECT_NAME, + event_queue=event_queue) logger.info(f"Collector mode: {Config.REMOTE_BRIDGE_URL}") # Optionally start watcher for brain events (local display only)