refactor: 아키텍처 수정 — 동기HTTP→aiohttp + 연결 모니터링 + 재시도큐
#1 동기 HTTP → async aiohttp (Critical)
- RemoteTransport: urllib.request → aiohttp.ClientSession
- 모든 HTTP 요청이 non-blocking으로 전환
- 이벤트 루프 블로킹 문제 해결

#2 연결 상태 모니터링
- RemoteTransport: connected 플래그 + 연속 실패 카운터
- Collector: 30초마다 health check → 실패 시 경고 로그
- 연결 복구 시 '✅ Gateway connected' 메시지

#3 실패 재시도 큐
- RemoteTransport: _retry_queue (최대 100건)
- POST 실패 시 큐에 저장, 연결 복구 후 자동 재전송
- Collector: 10초마다 retry flush
This commit is contained in:
179
bridge.py
179
bridge.py
@@ -166,92 +166,147 @@ class RemoteTransport(BridgeTransport):
|
|||||||
self._headers = {"Content-Type": "application/json"}
|
self._headers = {"Content-Type": "application/json"}
|
||||||
if api_key:
|
if api_key:
|
||||||
self._headers["Authorization"] = f"Bearer {api_key}"
|
self._headers["Authorization"] = f"Bearer {api_key}"
|
||||||
|
self._session = None # aiohttp.ClientSession — lazy created
|
||||||
|
|
||||||
|
# Connection health
|
||||||
|
self.connected = False
|
||||||
|
self._consecutive_failures = 0
|
||||||
|
self._max_failures_before_warning = 3
|
||||||
|
|
||||||
|
# Retry queue: list of (method, path, data) tuples
|
||||||
|
self._retry_queue: list[tuple[str, str, dict | None]] = []
|
||||||
|
self._retry_queue_max = 100
|
||||||
|
|
||||||
logger.info(f"RemoteTransport: {self.base_url} (auth={'yes' if api_key else 'no'})")
|
logger.info(f"RemoteTransport: {self.base_url} (auth={'yes' if api_key else 'no'})")
|
||||||
|
|
||||||
def _request(self, method: str, path: str, data: dict | None = None) -> dict | None:
|
async def _get_session(self):
|
||||||
"""Make HTTP request to Gateway API."""
|
"""Lazy-create aiohttp session."""
|
||||||
import urllib.request
|
if self._session is None or self._session.closed:
|
||||||
import urllib.error
|
import aiohttp
|
||||||
|
timeout = aiohttp.ClientTimeout(total=10)
|
||||||
|
self._session = aiohttp.ClientSession(
|
||||||
|
headers=self._headers, timeout=timeout
|
||||||
|
)
|
||||||
|
return self._session
|
||||||
|
|
||||||
|
async def close(self):
    """Shut down the HTTP session if one is currently open.

    Does nothing when no session has been created yet or when the
    existing session is already closed.
    """
    session = self._session
    if not session or session.closed:
        return
    await session.close()
|
||||||
|
|
||||||
|
async def _arequest(self, method: str, path: str, data: dict | None = None) -> dict | None:
|
||||||
|
"""Async non-blocking HTTP request to Gateway API."""
|
||||||
|
session = await self._get_session()
|
||||||
url = f"{self.base_url}{path}"
|
url = f"{self.base_url}{path}"
|
||||||
body = json.dumps(data, ensure_ascii=False).encode("utf-8") if data else None
|
|
||||||
req = urllib.request.Request(url, data=body, headers=self._headers, method=method)
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
with urllib.request.urlopen(req, timeout=10) as resp:
|
kwargs = {}
|
||||||
return json.loads(resp.read().decode("utf-8"))
|
if data is not None:
|
||||||
except urllib.error.HTTPError as e:
|
kwargs["json"] = data
|
||||||
logger.warning(f"RemoteTransport: {method} {path} → {e.code} {e.reason}")
|
async with session.request(method, url, **kwargs) as resp:
|
||||||
|
if resp.status >= 400:
|
||||||
|
if resp.status == 401:
|
||||||
|
logger.error("RemoteTransport: 401 Unauthorized — check GATEWAY_API_KEY")
|
||||||
|
elif resp.status == 429:
|
||||||
|
logger.warning("RemoteTransport: 429 Rate limited")
|
||||||
|
else:
|
||||||
|
logger.warning(f"RemoteTransport: {method} {path} → {resp.status}")
|
||||||
return None
|
return None
|
||||||
except (urllib.error.URLError, OSError, json.JSONDecodeError) as e:
|
result = await resp.json()
|
||||||
|
if not self.connected:
|
||||||
|
logger.info("RemoteTransport: ✅ Gateway connected")
|
||||||
|
self.connected = True
|
||||||
|
self._consecutive_failures = 0
|
||||||
|
return result
|
||||||
|
except Exception as e:
|
||||||
|
self._consecutive_failures += 1
|
||||||
|
if self._consecutive_failures == self._max_failures_before_warning:
|
||||||
|
logger.error(f"RemoteTransport: ❌ Gateway unreachable ({self._consecutive_failures} failures): {e}")
|
||||||
|
elif self._consecutive_failures < self._max_failures_before_warning:
|
||||||
logger.warning(f"RemoteTransport: {method} {path} → {e}")
|
logger.warning(f"RemoteTransport: {method} {path} → {e}")
|
||||||
|
self.connected = False
|
||||||
return None
|
return None
|
||||||
|
|
||||||
def list_json_files(self, subdir: str) -> list[str]:
|
async def _arequest_retry(self, method: str, path: str, data: dict | None = None) -> dict | None:
|
||||||
"""List pending requests from Gateway."""
|
"""Request with retry queue — failed POSTs are queued for later."""
|
||||||
if subdir == "pending":
|
result = await self._arequest(method, path, data)
|
||||||
result = self._request("GET", "/api/pending")
|
if result is None and method == "POST" and data is not None:
|
||||||
if result and isinstance(result, list):
|
if len(self._retry_queue) < self._retry_queue_max:
|
||||||
return [f"{r['request_id']}.json" for r in result]
|
self._retry_queue.append((method, path, data))
|
||||||
elif subdir == "commands":
|
return result
|
||||||
# Commands are polled per-project (handled separately)
|
|
||||||
return []
|
|
||||||
return []
|
|
||||||
|
|
||||||
def read_json(self, subdir: str, filename: str) -> dict | None:
|
async def flush_retry_queue(self):
|
||||||
"""Read a JSON file from Gateway."""
|
"""Retry queued failed requests."""
|
||||||
rid = filename.replace(".json", "")
|
if not self._retry_queue or not self.connected:
|
||||||
if subdir == "response":
|
return
|
||||||
return self._request("GET", f"/api/response/{rid}")
|
queue = self._retry_queue[:]
|
||||||
elif subdir == "pending":
|
self._retry_queue.clear()
|
||||||
# Pending data comes from list, not individual read
|
succeeded = 0
|
||||||
result = self._request("GET", "/api/pending")
|
for method, path, data in queue:
|
||||||
if result and isinstance(result, list):
|
result = await self._arequest(method, path, data)
|
||||||
for r in result:
|
if result is None:
|
||||||
if r.get("request_id") == rid:
|
if len(self._retry_queue) < self._retry_queue_max:
|
||||||
return r
|
self._retry_queue.append((method, path, data))
|
||||||
return None
|
break
|
||||||
|
succeeded += 1
|
||||||
|
if succeeded:
|
||||||
|
logger.info(f"[RETRY] flushed {succeeded}/{len(queue)} queued requests")
|
||||||
|
|
||||||
def write_json(self, subdir: str, filename: str, data: dict) -> None:
|
async def health_check(self) -> bool:
|
||||||
"""Write data to Gateway via API."""
|
"""Check if Gateway is reachable."""
|
||||||
|
result = await self._arequest("GET", "/../health")
|
||||||
|
return result is not None and result.get("status") == "ok"
|
||||||
|
|
||||||
|
# ─── Async methods (used by Collector) ───
|
||||||
|
|
||||||
|
async def awrite_json(self, subdir: str, filename: str, data: dict) -> None:
|
||||||
if subdir == "pending":
|
if subdir == "pending":
|
||||||
self._request("POST", "/api/pending", data)
|
await self._arequest_retry("POST", "/api/pending", data)
|
||||||
elif subdir == "response":
|
elif subdir == "response":
|
||||||
rid = data.get("request_id", filename.replace(".json", ""))
|
rid = data.get("request_id", filename.replace(".json", ""))
|
||||||
self._request("POST", f"/api/response/{rid}", data)
|
await self._arequest_retry("POST", f"/api/response/{rid}", data)
|
||||||
elif subdir == "commands":
|
|
||||||
# Commands go through write_command in BridgeProtocol
|
|
||||||
self._request("POST", "/api/chat", data)
|
|
||||||
|
|
||||||
def delete_file(self, subdir: str, filename: str) -> bool:
|
async def aread_json(self, subdir: str, filename: str) -> dict | None:
|
||||||
"""Delete not needed for remote — Gateway manages cleanup."""
|
rid = filename.replace(".json", "")
|
||||||
return True
|
if subdir == "response":
|
||||||
|
return await self._arequest("GET", f"/api/response/{rid}")
|
||||||
|
return None
|
||||||
|
|
||||||
def ensure_dirs(self) -> None:
|
async def apoll_commands(self, project: str) -> list[dict]:
|
||||||
"""No local dirs needed for remote transport."""
|
result = await self._arequest("GET", f"/api/commands/{project}")
|
||||||
pass
|
|
||||||
|
|
||||||
def poll_commands(self, project: str) -> list[dict]:
|
|
||||||
"""Poll Gateway for commands (Collector-specific, not in ABC)."""
|
|
||||||
result = self._request("GET", f"/api/commands/{project}")
|
|
||||||
if result and isinstance(result, dict):
|
if result and isinstance(result, dict):
|
||||||
return result.get("commands", [])
|
return result.get("commands", [])
|
||||||
return []
|
return []
|
||||||
|
|
||||||
def register_session(self, conversation_id: str, project_name: str) -> None:
|
async def aregister_session(self, conv_id: str, project: str) -> None:
|
||||||
"""Register session → project mapping on Gateway."""
|
await self._arequest_retry("POST", "/api/register", {
|
||||||
self._request("POST", "/api/register", {
|
"conversation_id": conv_id, "project_name": project,
|
||||||
"conversation_id": conversation_id,
|
|
||||||
"project_name": project_name,
|
|
||||||
})
|
})
|
||||||
|
|
||||||
def send_chat(self, project_name: str, content: str) -> None:
|
async def asend_chat(self, project: str, content: str) -> None:
|
||||||
"""Push chat snapshot to Gateway for relay to Discord."""
|
await self._arequest_retry("POST", "/api/chat", {
|
||||||
self._request("POST", "/api/chat", {
|
"project_name": project, "content": content,
|
||||||
"project_name": project_name,
|
|
||||||
"content": content,
|
|
||||||
})
|
})
|
||||||
|
|
||||||
|
async def asend_event(self, event_data: dict) -> None:
    """POST ``event_data`` to ``/api/event`` via ``_arequest_retry``.

    The helper's return value is discarded; this coroutine always
    resolves to ``None``.
    """
    endpoint = "/api/event"
    await self._arequest_retry("POST", endpoint, event_data)
|
||||||
|
|
||||||
|
# ─── Sync stubs (ABC compliance, not used in Collector) ───
|
||||||
|
|
||||||
|
def list_json_files(self, subdir: str) -> list[str]:
    """Synchronous stub: report no files, whatever ``subdir`` is."""
    no_files: list[str] = []
    return no_files
|
||||||
|
|
||||||
|
def read_json(self, subdir: str, filename: str) -> dict | None:
|
||||||
|
return None
|
||||||
|
|
||||||
|
def write_json(self, subdir: str, filename: str, data: dict) -> None:
    """Synchronous stub: accepts the arguments and does nothing with them."""
    return None
|
||||||
|
|
||||||
|
def delete_file(self, subdir: str, filename: str) -> bool:
    """Synchronous stub: performs no deletion and always reports ``True``."""
    reported_ok = True
    return reported_ok
|
||||||
|
|
||||||
|
def ensure_dirs(self) -> None:
    """Synchronous stub: creates nothing and returns ``None``."""
    return None
|
||||||
|
|
||||||
|
|
||||||
# ─── Bridge Protocol (uses Transport) ───
|
# ─── Bridge Protocol (uses Transport) ───
|
||||||
|
|
||||||
|
|||||||
41
collector.py
41
collector.py
@@ -69,14 +69,17 @@ class CollectorBridge:
|
|||||||
self._poll_commands_loop(),
|
self._poll_commands_loop(),
|
||||||
self._forward_chat_snapshots_loop(),
|
self._forward_chat_snapshots_loop(),
|
||||||
self._forward_registrations_loop(),
|
self._forward_registrations_loop(),
|
||||||
|
self._health_check_loop(),
|
||||||
|
self._retry_flush_loop(),
|
||||||
]
|
]
|
||||||
if self.event_queue:
|
if self.event_queue:
|
||||||
tasks.append(self._forward_events_loop())
|
tasks.append(self._forward_events_loop())
|
||||||
await asyncio.gather(*tasks)
|
await asyncio.gather(*tasks)
|
||||||
|
|
||||||
async def stop(self):
|
async def stop(self):
|
||||||
"""Stop the Collector."""
|
"""Stop the Collector and close HTTP session."""
|
||||||
self._running = False
|
self._running = False
|
||||||
|
await self.remote.close()
|
||||||
logger.info("[COLLECTOR] stopped")
|
logger.info("[COLLECTOR] stopped")
|
||||||
|
|
||||||
# ─── Forward local pending → Gateway ───
|
# ─── Forward local pending → Gateway ───
|
||||||
@@ -114,7 +117,7 @@ class CollectorBridge:
|
|||||||
continue # Skip pre-existing files from before startup
|
continue # Skip pre-existing files from before startup
|
||||||
|
|
||||||
# Forward to Gateway (new or updated)
|
# Forward to Gateway (new or updated)
|
||||||
self.remote.write_json("pending", fname, data)
|
await self.remote.awrite_json("pending", fname, data)
|
||||||
self._forwarded_pending.add(rid)
|
self._forwarded_pending.add(rid)
|
||||||
self._pending_hashes[rid] = content_hash
|
self._pending_hashes[rid] = content_hash
|
||||||
|
|
||||||
@@ -143,7 +146,7 @@ class CollectorBridge:
|
|||||||
try:
|
try:
|
||||||
# Check each forwarded pending for a response
|
# Check each forwarded pending for a response
|
||||||
for rid in list(self._forwarded_pending):
|
for rid in list(self._forwarded_pending):
|
||||||
data = self.remote.read_json("response", f"{rid}.json")
|
data = await self.remote.aread_json("response", f"{rid}.json")
|
||||||
if data is None or data.get("waiting"):
|
if data is None or data.get("waiting"):
|
||||||
continue
|
continue
|
||||||
|
|
||||||
@@ -166,7 +169,7 @@ class CollectorBridge:
|
|||||||
"""Poll Gateway for commands and write them locally for Extension."""
|
"""Poll Gateway for commands and write them locally for Extension."""
|
||||||
while self._running:
|
while self._running:
|
||||||
try:
|
try:
|
||||||
commands = self.remote.poll_commands(self.project_name)
|
commands = await self.remote.apoll_commands(self.project_name)
|
||||||
for cmd in commands:
|
for cmd in commands:
|
||||||
cmd_id = cmd.get("id", str(int(time.time() * 1000)))
|
cmd_id = cmd.get("id", str(int(time.time() * 1000)))
|
||||||
fname = f"{cmd_id}.json"
|
fname = f"{cmd_id}.json"
|
||||||
@@ -192,7 +195,7 @@ class CollectorBridge:
|
|||||||
project = data.get("project_name", self.project_name)
|
project = data.get("project_name", self.project_name)
|
||||||
content = data.get("content", "")
|
content = data.get("content", "")
|
||||||
if content:
|
if content:
|
||||||
self.remote.send_chat(project, content)
|
await self.remote.asend_chat(project, content)
|
||||||
logger.info(f"[COLLECTOR] → Gateway: chat snapshot len={len(content)}")
|
logger.info(f"[COLLECTOR] → Gateway: chat snapshot len={len(content)}")
|
||||||
f.unlink() # Cleanup after forwarding
|
f.unlink() # Cleanup after forwarding
|
||||||
except (json.JSONDecodeError, OSError) as e:
|
except (json.JSONDecodeError, OSError) as e:
|
||||||
@@ -219,7 +222,7 @@ class CollectorBridge:
|
|||||||
conv_id = data.get("conversation_id", "")
|
conv_id = data.get("conversation_id", "")
|
||||||
project = data.get("project_name", "")
|
project = data.get("project_name", "")
|
||||||
if conv_id and project:
|
if conv_id and project:
|
||||||
self.remote.register_session(conv_id, project)
|
await self.remote.aregister_session(conv_id, project)
|
||||||
forwarded_regs.add(f.name)
|
forwarded_regs.add(f.name)
|
||||||
logger.info(f"[COLLECTOR] → Gateway: register {conv_id[:8]} → {project}")
|
logger.info(f"[COLLECTOR] → Gateway: register {conv_id[:8]} → {project}")
|
||||||
except (json.JSONDecodeError, OSError) as e:
|
except (json.JSONDecodeError, OSError) as e:
|
||||||
@@ -246,9 +249,33 @@ class CollectorBridge:
|
|||||||
"content": event.content,
|
"content": event.content,
|
||||||
"timestamp": event.timestamp,
|
"timestamp": event.timestamp,
|
||||||
}
|
}
|
||||||
self.remote._request("POST", "/api/event", event_data)
|
await self.remote.asend_event(event_data)
|
||||||
logger.info(f"[COLLECTOR] → Gateway: event {event.event_type.value} {event.file_name}")
|
logger.info(f"[COLLECTOR] → Gateway: event {event.event_type.value} {event.file_name}")
|
||||||
except asyncio.TimeoutError:
|
except asyncio.TimeoutError:
|
||||||
continue
|
continue
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"[COLLECTOR] forward_event error: {e}")
|
logger.error(f"[COLLECTOR] forward_event error: {e}")
|
||||||
|
|
||||||
|
# ─── Health check ───
|
||||||
|
|
||||||
|
async def _health_check_loop(self):
    """Periodically check Gateway connectivity.

    While ``self._running`` is true, awaits ``self.remote.health_check()``
    and then sleeps 30 seconds before the next round. A warning is logged
    when the check reports failure while ``self.remote.connected`` is still
    true. Any exception raised by the check is logged and the loop keeps
    running.
    """
    while self._running:
        try:
            ok = await self.remote.health_check()
            if not ok and self.remote.connected:
                logger.warning("[COLLECTOR] ❌ Gateway health check failed")
        except Exception as e:
            # Keep the loop alive, but report the failure instead of hiding
            # it (same handling as the retry-flush loop).
            logger.error(f"[COLLECTOR] health check error: {e}")
        await asyncio.sleep(30)
|
||||||
|
|
||||||
|
# ─── Retry flush ───
|
||||||
|
|
||||||
|
async def _retry_flush_loop(self):
    """Drain the remote transport's retry queue on a fixed 10-second cadence.

    Runs for as long as ``self._running`` is true. Errors raised by
    ``self.remote.flush_retry_queue()`` are logged and do not stop the loop.
    """
    flush_interval = 10
    while True:
        if not self._running:
            break
        try:
            await self.remote.flush_retry_queue()
        except Exception as exc:
            logger.error(f"[COLLECTOR] retry flush error: {exc}")
        await asyncio.sleep(flush_interval)
|
||||||
|
|||||||
Reference in New Issue
Block a user