From 1bf41ceee3205975d935045380d202edca2c5ab6 Mon Sep 17 00:00:00 2001 From: Variet Worker Date: Wed, 11 Mar 2026 22:55:54 +0900 Subject: [PATCH] =?UTF-8?q?refactor:=20=EC=95=84=ED=82=A4=ED=85=8D?= =?UTF-8?q?=EC=B2=98=20=EC=88=98=EC=A0=95=20=E2=80=94=20=EB=8F=99=EA=B8=B0?= =?UTF-8?q?HTTP=E2=86=92aiohttp=20+=20=EC=97=B0=EA=B2=B0=20=EB=AA=A8?= =?UTF-8?q?=EB=8B=88=ED=84=B0=EB=A7=81=20+=20=EC=9E=AC=EC=8B=9C=EB=8F=84?= =?UTF-8?q?=ED=81=90?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit #1 동기 HTTP → async aiohttp (Critical) - RemoteTransport: urllib.request → aiohttp.ClientSession - 모든 HTTP 요청이 non-blocking으로 전환 - 이벤트 루프 블로킹 문제 해결 #2 연결 상태 모니터링 - RemoteTransport: connected 플래그 + 연속 실패 카운터 - Collector: 30초마다 health check → 실패 시 경고 로그 - 연결 복구 시 '✅ Gateway connected' 메시지 #3 실패 재시도 큐 - RemoteTransport: _retry_queue (최대 100건) - POST 실패 시 큐에 저장, 연결 복구 후 자동 재전송 - Collector: 10초마다 retry flush --- bridge.py | 183 +++++++++++++++++++++++++++++++++------------------ collector.py | 41 ++++++++++-- 2 files changed, 153 insertions(+), 71 deletions(-) diff --git a/bridge.py b/bridge.py index 3880f4f..a4c6b6e 100644 --- a/bridge.py +++ b/bridge.py @@ -166,92 +166,147 @@ class RemoteTransport(BridgeTransport): self._headers = {"Content-Type": "application/json"} if api_key: self._headers["Authorization"] = f"Bearer {api_key}" + self._session = None # aiohttp.ClientSession — lazy created + + # Connection health + self.connected = False + self._consecutive_failures = 0 + self._max_failures_before_warning = 3 + + # Retry queue: list of (method, path, data) tuples + self._retry_queue: list[tuple[str, str, dict | None]] = [] + self._retry_queue_max = 100 + logger.info(f"RemoteTransport: {self.base_url} (auth={'yes' if api_key else 'no'})") - def _request(self, method: str, path: str, data: dict | None = None) -> dict | None: - """Make HTTP request to Gateway API.""" - import urllib.request - import urllib.error + async def _get_session(self): + """Lazy-create aiohttp session.""" + if self._session is None or self._session.closed: + import aiohttp + timeout = aiohttp.ClientTimeout(total=10) + self._session = aiohttp.ClientSession( + headers=self._headers, timeout=timeout + ) + return self._session + async def close(self): + """Close the HTTP session.""" + if self._session and not self._session.closed: + await self._session.close() + + async def _arequest(self, method: str, path: str, data: dict | None = None) -> dict | None: + """Async non-blocking HTTP request to Gateway API.""" + session = await self._get_session() url = f"{self.base_url}{path}" - body = json.dumps(data, ensure_ascii=False).encode("utf-8") if data else None - req = urllib.request.Request(url, data=body, headers=self._headers, method=method) - try: - with urllib.request.urlopen(req, timeout=10) as resp: - return json.loads(resp.read().decode("utf-8")) - except urllib.error.HTTPError as e: - logger.warning(f"RemoteTransport: {method} {path} → {e.code} {e.reason}") - return None - except (urllib.error.URLError, OSError, json.JSONDecodeError) as e: - logger.warning(f"RemoteTransport: {method} {path} → {e}") + kwargs = {} + if data is not None: + kwargs["json"] = data + async with session.request(method, url, **kwargs) as resp: + if resp.status >= 400: + if resp.status == 401: + logger.error("RemoteTransport: 401 Unauthorized — check GATEWAY_API_KEY") + elif resp.status == 429: + logger.warning("RemoteTransport: 429 Rate limited") + else: + logger.warning(f"RemoteTransport: {method} {path} → {resp.status}") + return None + result = await resp.json() + if not self.connected: + logger.info("RemoteTransport: ✅ Gateway connected") + self.connected = True + self._consecutive_failures = 0 + return result + except Exception as e: + self._consecutive_failures += 1 + if self._consecutive_failures == self._max_failures_before_warning: + logger.error(f"RemoteTransport: ❌ Gateway unreachable ({self._consecutive_failures} failures): {e}") + elif self._consecutive_failures < self._max_failures_before_warning: + logger.warning(f"RemoteTransport: {method} {path} → {e}") + self.connected = False return None - def list_json_files(self, subdir: str) -> list[str]: - """List pending requests from Gateway.""" - if subdir == "pending": - result = self._request("GET", "/api/pending") - if result and isinstance(result, list): - return [f"{r['request_id']}.json" for r in result] - elif subdir == "commands": - # Commands are polled per-project (handled separately) - return [] - return [] + async def _arequest_retry(self, method: str, path: str, data: dict | None = None) -> dict | None: + """Request with retry queue — failed POSTs are queued for later.""" + result = await self._arequest(method, path, data) + if result is None and method == "POST" and data is not None: + if len(self._retry_queue) < self._retry_queue_max: + self._retry_queue.append((method, path, data)) + return result - def read_json(self, subdir: str, filename: str) -> dict | None: - """Read a JSON file from Gateway.""" - rid = filename.replace(".json", "") - if subdir == "response": - return self._request("GET", f"/api/response/{rid}") - elif subdir == "pending": - # Pending data comes from list, not individual read - result = self._request("GET", "/api/pending") - if result and isinstance(result, list): - for r in result: - if r.get("request_id") == rid: - return r - return None + async def flush_retry_queue(self): + """Retry queued failed requests.""" + if not self._retry_queue or not self.connected: + return + queue = self._retry_queue[:] + self._retry_queue.clear() + succeeded = 0 + for method, path, data in queue: + result = await self._arequest(method, path, data) + if result is None: + if len(self._retry_queue) < self._retry_queue_max: + self._retry_queue.append((method, path, data)) + break + succeeded += 1 + if succeeded: + logger.info(f"[RETRY] flushed {succeeded}/{len(queue)} queued requests") - def write_json(self, subdir: str, filename: str, data: dict) -> None: - """Write data to Gateway via API.""" + async def health_check(self) -> bool: + """Check if Gateway is reachable.""" + result = await self._arequest("GET", "/../health") + return result is not None and result.get("status") == "ok" + + # ─── Async methods (used by Collector) ─── + + async def awrite_json(self, subdir: str, filename: str, data: dict) -> None: if subdir == "pending": - self._request("POST", "/api/pending", data) + await self._arequest_retry("POST", "/api/pending", data) elif subdir == "response": rid = data.get("request_id", filename.replace(".json", "")) - self._request("POST", f"/api/response/{rid}", data) - elif subdir == "commands": - # Commands go through write_command in BridgeProtocol - self._request("POST", "/api/chat", data) + await self._arequest_retry("POST", f"/api/response/{rid}", data) - def delete_file(self, subdir: str, filename: str) -> bool: - """Delete not needed for remote — Gateway manages cleanup.""" - return True + async def aread_json(self, subdir: str, filename: str) -> dict | None: + rid = filename.replace(".json", "") + if subdir == "response": + return await self._arequest("GET", f"/api/response/{rid}") + return None - def ensure_dirs(self) -> None: - """No local dirs needed for remote transport.""" - pass - - def poll_commands(self, project: str) -> list[dict]: - """Poll Gateway for commands (Collector-specific, not in ABC).""" - result = self._request("GET", f"/api/commands/{project}") + async def apoll_commands(self, project: str) -> list[dict]: + result = await self._arequest("GET", f"/api/commands/{project}") if result and isinstance(result, dict): return result.get("commands", []) return [] - def register_session(self, conversation_id: str, project_name: str) -> None: - """Register session → project mapping on Gateway.""" - self._request("POST", "/api/register", { - "conversation_id": conversation_id, - "project_name": project_name, + async def aregister_session(self, conv_id: str, project: str) -> None: + await self._arequest_retry("POST", "/api/register", { + "conversation_id": conv_id, "project_name": project, }) - def send_chat(self, project_name: str, content: str) -> None: - """Push chat snapshot to Gateway for relay to Discord.""" - self._request("POST", "/api/chat", { - "project_name": project_name, - "content": content, + async def asend_chat(self, project: str, content: str) -> None: + await self._arequest_retry("POST", "/api/chat", { + "project_name": project, "content": content, }) + async def asend_event(self, event_data: dict) -> None: + await self._arequest_retry("POST", "/api/event", event_data) + + # ─── Sync stubs (ABC compliance, not used in Collector) ─── + + def list_json_files(self, subdir: str) -> list[str]: + return [] + + def read_json(self, subdir: str, filename: str) -> dict | None: + return None + + def write_json(self, subdir: str, filename: str, data: dict) -> None: + pass + + def delete_file(self, subdir: str, filename: str) -> bool: + return True + + def ensure_dirs(self) -> None: + pass + # ─── Bridge Protocol (uses Transport) ─── diff --git a/collector.py b/collector.py index 9489d21..4e72a1e 100644 --- a/collector.py +++ b/collector.py @@ -69,14 +69,17 @@ class CollectorBridge: self._poll_commands_loop(), self._forward_chat_snapshots_loop(), self._forward_registrations_loop(), + self._health_check_loop(), + self._retry_flush_loop(), ] if self.event_queue: tasks.append(self._forward_events_loop()) await asyncio.gather(*tasks) async def stop(self): - """Stop the Collector.""" + """Stop the Collector and close HTTP session.""" self._running = False + await self.remote.close() logger.info("[COLLECTOR] stopped") # ─── Forward local pending → Gateway ─── @@ -114,7 +117,7 @@ class CollectorBridge: continue # Skip pre-existing files from before startup # Forward to Gateway (new or updated) - self.remote.write_json("pending", fname, data) + await self.remote.awrite_json("pending", fname, data) self._forwarded_pending.add(rid) self._pending_hashes[rid] = content_hash @@ -143,7 +146,7 @@ class CollectorBridge: try: # Check each forwarded pending for a response for rid in list(self._forwarded_pending): - data = self.remote.read_json("response", f"{rid}.json") + data = await self.remote.aread_json("response", f"{rid}.json") if data is None or data.get("waiting"): continue @@ -166,7 +169,7 @@ class CollectorBridge: """Poll Gateway for commands and write them locally for Extension.""" while self._running: try: - commands = self.remote.poll_commands(self.project_name) + commands = await self.remote.apoll_commands(self.project_name) for cmd in commands: cmd_id = cmd.get("id", str(int(time.time() * 1000))) fname = f"{cmd_id}.json" @@ -192,7 +195,7 @@ class CollectorBridge: project = data.get("project_name", self.project_name) content = data.get("content", "") if content: - self.remote.send_chat(project, content) + await self.remote.asend_chat(project, content) logger.info(f"[COLLECTOR] → Gateway: chat snapshot len={len(content)}") f.unlink() # Cleanup after forwarding except (json.JSONDecodeError, OSError) as e: @@ -219,7 +222,7 @@ class CollectorBridge: conv_id = data.get("conversation_id", "") project = data.get("project_name", "") if conv_id and project: - self.remote.register_session(conv_id, project) + await self.remote.aregister_session(conv_id, project) forwarded_regs.add(f.name) logger.info(f"[COLLECTOR] → Gateway: register {conv_id[:8]} → {project}") except (json.JSONDecodeError, OSError) as e: @@ -246,9 +249,33 @@ class CollectorBridge: "content": event.content, "timestamp": event.timestamp, } - self.remote._request("POST", "/api/event", event_data) + await self.remote.asend_event(event_data) logger.info(f"[COLLECTOR] → Gateway: event {event.event_type.value} {event.file_name}") except asyncio.TimeoutError: continue except Exception as e: logger.error(f"[COLLECTOR] forward_event error: {e}") + + # ─── Health check ─── + + async def _health_check_loop(self): + """Periodically check Gateway connectivity.""" + while self._running: + try: + ok = await self.remote.health_check() + if not ok and self.remote.connected: + logger.warning("[COLLECTOR] ❌ Gateway health check failed") + except Exception: + pass + await asyncio.sleep(30) + + # ─── Retry flush ─── + + async def _retry_flush_loop(self): + """Periodically flush failed request retry queue.""" + while self._running: + try: + await self.remote.flush_retry_queue() + except Exception as e: + logger.error(f"[COLLECTOR] retry flush error: {e}") + await asyncio.sleep(10)