fix(collector/bridge/gateway): rate limit 구조적 수정 — 점진적 백오프 + adaptive 폴링 + burst-friendly 윈도우
This commit is contained in:
23
bridge.py
23
bridge.py
@@ -176,8 +176,9 @@ class RemoteTransport(BridgeTransport):
|
|||||||
# Rate limit backoff
|
# Rate limit backoff
|
||||||
self._rate_limited_until = 0.0 # timestamp until which we should not send requests
|
self._rate_limited_until = 0.0 # timestamp until which we should not send requests
|
||||||
self._backoff_seconds = 0.0 # current backoff duration (exponential)
|
self._backoff_seconds = 0.0 # current backoff duration (exponential)
|
||||||
self._BACKOFF_BASE = 1.0
|
self._BACKOFF_BASE = 2.0
|
||||||
self._BACKOFF_MAX = 60.0
|
self._BACKOFF_MAX = 60.0
|
||||||
|
self._success_streak = 0 # consecutive successes for gradual backoff reduction
|
||||||
|
|
||||||
# Retry queue: list of (method, path, data) tuples
|
# Retry queue: list of (method, path, data) tuples
|
||||||
self._retry_queue: list[tuple[str, str, dict | None]] = []
|
self._retry_queue: list[tuple[str, str, dict | None]] = []
|
||||||
@@ -207,6 +208,7 @@ class RemoteTransport(BridgeTransport):
|
|||||||
|
|
||||||
def _apply_backoff(self, retry_after: float = 0):
|
def _apply_backoff(self, retry_after: float = 0):
|
||||||
"""Apply exponential backoff for rate limiting."""
|
"""Apply exponential backoff for rate limiting."""
|
||||||
|
self._success_streak = 0 # Reset success streak on any failure
|
||||||
if retry_after > 0:
|
if retry_after > 0:
|
||||||
self._backoff_seconds = min(retry_after, self._BACKOFF_MAX)
|
self._backoff_seconds = min(retry_after, self._BACKOFF_MAX)
|
||||||
else:
|
else:
|
||||||
@@ -217,11 +219,22 @@ class RemoteTransport(BridgeTransport):
|
|||||||
self._rate_limited_until = time.time() + self._backoff_seconds
|
self._rate_limited_until = time.time() + self._backoff_seconds
|
||||||
logger.warning(f"RemoteTransport: backing off {self._backoff_seconds:.0f}s (until +{self._backoff_seconds:.0f}s)")
|
logger.warning(f"RemoteTransport: backing off {self._backoff_seconds:.0f}s (until +{self._backoff_seconds:.0f}s)")
|
||||||
|
|
||||||
def _reset_backoff(self):
|
def _on_request_success(self):
|
||||||
"""Reset backoff after a successful request."""
|
"""Gradually reduce backoff after consecutive successes.
|
||||||
if self._backoff_seconds > 0:
|
|
||||||
|
Instead of instantly resetting to 0 (which causes the 1s oscillation loop
|
||||||
|
when 7 loops share one transport), require sustained success before reducing.
|
||||||
|
"""
|
||||||
|
if self._backoff_seconds <= 0:
|
||||||
|
return # Already at zero, nothing to do
|
||||||
|
self._success_streak += 1
|
||||||
|
if self._success_streak >= 5:
|
||||||
|
# Halve the backoff (gradual cooldown)
|
||||||
|
self._backoff_seconds = self._backoff_seconds / 2
|
||||||
|
if self._backoff_seconds < 0.5:
|
||||||
self._backoff_seconds = 0
|
self._backoff_seconds = 0
|
||||||
self._rate_limited_until = 0
|
self._rate_limited_until = 0
|
||||||
|
self._success_streak = 0
|
||||||
|
|
||||||
async def _arequest(self, method: str, path: str, data: dict | None = None) -> dict | None:
|
async def _arequest(self, method: str, path: str, data: dict | None = None) -> dict | None:
|
||||||
"""Async non-blocking HTTP request to Gateway API."""
|
"""Async non-blocking HTTP request to Gateway API."""
|
||||||
@@ -250,7 +263,7 @@ class RemoteTransport(BridgeTransport):
|
|||||||
logger.info("RemoteTransport: ✅ Gateway connected")
|
logger.info("RemoteTransport: ✅ Gateway connected")
|
||||||
self.connected = True
|
self.connected = True
|
||||||
self._consecutive_failures = 0
|
self._consecutive_failures = 0
|
||||||
self._reset_backoff()
|
self._on_request_success()
|
||||||
return result
|
return result
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
self._consecutive_failures += 1
|
self._consecutive_failures += 1
|
||||||
|
|||||||
103
collector.py
103
collector.py
@@ -45,7 +45,9 @@ class CollectorBridge:
|
|||||||
# Pre-populate with existing pending files → skip on startup (prevents 만료됨 spam)
|
# Pre-populate with existing pending files → skip on startup (prevents 만료됨 spam)
|
||||||
self._startup_pending: set[str] = set()
|
self._startup_pending: set[str] = set()
|
||||||
self._forwarded_pending: set[str] = set()
|
self._forwarded_pending: set[str] = set()
|
||||||
|
self._forwarded_timestamps: dict[str, float] = {} # rid → when forwarded
|
||||||
self._pending_hashes: dict[str, str] = {} # rid → content hash (for MERGE/status detection)
|
self._pending_hashes: dict[str, str] = {} # rid → content hash (for MERGE/status detection)
|
||||||
|
self._RESPONSE_POLL_TTL = 300 # 5 min — stop polling responses for old pending
|
||||||
for fname in self.local.list_json_files("pending"):
|
for fname in self.local.list_json_files("pending"):
|
||||||
rid = fname.replace(".json", "")
|
rid = fname.replace(".json", "")
|
||||||
self._startup_pending.add(rid)
|
self._startup_pending.add(rid)
|
||||||
@@ -60,20 +62,29 @@ class CollectorBridge:
|
|||||||
logger.info(f"[COLLECTOR] skipping {len(self._startup_pending)} existing pending files")
|
logger.info(f"[COLLECTOR] skipping {len(self._startup_pending)} existing pending files")
|
||||||
|
|
||||||
async def start(self):
|
async def start(self):
|
||||||
"""Start the Collector polling loops."""
|
"""Start the Collector polling loops with staggered offsets.
|
||||||
|
|
||||||
|
Each loop starts with a different delay to prevent all loops from waking
|
||||||
|
up at the same time and causing burst requests to Gateway.
|
||||||
|
"""
|
||||||
self._running = True
|
self._running = True
|
||||||
logger.info(f"[COLLECTOR] started for project={self.project_name}")
|
logger.info(f"[COLLECTOR] started for project={self.project_name}")
|
||||||
|
|
||||||
|
async def _staggered(coro, offset: float):
|
||||||
|
await asyncio.sleep(offset)
|
||||||
|
await coro()
|
||||||
|
|
||||||
tasks = [
|
tasks = [
|
||||||
self._forward_pending_loop(),
|
_staggered(self._forward_pending_loop, 0.0),
|
||||||
self._poll_responses_loop(),
|
_staggered(self._poll_responses_loop, 0.5),
|
||||||
self._poll_commands_loop(),
|
_staggered(self._poll_commands_loop, 1.0),
|
||||||
self._forward_chat_snapshots_loop(),
|
_staggered(self._forward_chat_snapshots_loop, 1.5),
|
||||||
self._forward_registrations_loop(),
|
_staggered(self._forward_registrations_loop, 2.0),
|
||||||
self._health_check_loop(),
|
_staggered(self._health_check_loop, 2.5),
|
||||||
self._retry_flush_loop(),
|
_staggered(self._retry_flush_loop, 3.0),
|
||||||
]
|
]
|
||||||
if self.event_queue:
|
if self.event_queue:
|
||||||
tasks.append(self._forward_events_loop())
|
tasks.append(_staggered(self._forward_events_loop, 3.5))
|
||||||
await asyncio.gather(*tasks)
|
await asyncio.gather(*tasks)
|
||||||
|
|
||||||
async def stop(self):
|
async def stop(self):
|
||||||
@@ -128,6 +139,7 @@ class CollectorBridge:
|
|||||||
# Forward to Gateway (new or updated)
|
# Forward to Gateway (new or updated)
|
||||||
await self.remote.awrite_json("pending", fname, data)
|
await self.remote.awrite_json("pending", fname, data)
|
||||||
self._forwarded_pending.add(rid)
|
self._forwarded_pending.add(rid)
|
||||||
|
self._forwarded_timestamps[rid] = time.time()
|
||||||
self._pending_hashes[rid] = content_hash
|
self._pending_hashes[rid] = content_hash
|
||||||
|
|
||||||
if is_new:
|
if is_new:
|
||||||
@@ -150,7 +162,11 @@ class CollectorBridge:
|
|||||||
# ─── Poll Gateway responses → local ───
|
# ─── Poll Gateway responses → local ───
|
||||||
|
|
||||||
async def _poll_responses_loop(self):
|
async def _poll_responses_loop(self):
|
||||||
"""Poll Gateway for responses and write them locally for Extension."""
|
"""Poll Gateway for responses and write them locally for Extension.
|
||||||
|
|
||||||
|
Only polls responses for recently-forwarded pending (within _RESPONSE_POLL_TTL).
|
||||||
|
Expired entries are removed from tracking to prevent request accumulation.
|
||||||
|
"""
|
||||||
while self._running:
|
while self._running:
|
||||||
try:
|
try:
|
||||||
# Skip cycle if rate-limited
|
# Skip cycle if rate-limited
|
||||||
@@ -158,16 +174,31 @@ class CollectorBridge:
|
|||||||
await asyncio.sleep(self._poll_interval)
|
await asyncio.sleep(self._poll_interval)
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# Check each forwarded pending for a response
|
now = time.time()
|
||||||
for rid in list(self._forwarded_pending):
|
# Clean up expired forwarded pending (stop polling responses for old ones)
|
||||||
if rid in self._startup_pending:
|
expired = [
|
||||||
continue # Don't poll responses for pre-startup files
|
rid for rid, ts in self._forwarded_timestamps.items()
|
||||||
|
if now - ts > self._RESPONSE_POLL_TTL
|
||||||
|
]
|
||||||
|
for rid in expired:
|
||||||
|
self._forwarded_pending.discard(rid)
|
||||||
|
self._forwarded_timestamps.pop(rid, None)
|
||||||
|
self._pending_hashes.pop(rid, None)
|
||||||
|
if expired:
|
||||||
|
logger.info(f"[COLLECTOR] expired {len(expired)} stale forwarded pending (>{self._RESPONSE_POLL_TTL}s)")
|
||||||
|
|
||||||
|
# Check each active forwarded pending for a response
|
||||||
|
active_rids = [
|
||||||
|
rid for rid in self._forwarded_pending
|
||||||
|
if rid not in self._startup_pending
|
||||||
|
]
|
||||||
|
for rid in active_rids:
|
||||||
# Rate-limit guard: stop polling if we got rate-limited mid-cycle
|
# Rate-limit guard: stop polling if we got rate-limited mid-cycle
|
||||||
if self.remote.is_rate_limited:
|
if self.remote.is_rate_limited:
|
||||||
break
|
break
|
||||||
data = await self.remote.aread_json("response", f"{rid}.json")
|
data = await self.remote.aread_json("response", f"{rid}.json")
|
||||||
if data is None or data.get("waiting"):
|
if data is None or data.get("waiting"):
|
||||||
await asyncio.sleep(0.2) # Throttle between individual response polls
|
await asyncio.sleep(0.3) # Throttle between individual response polls
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# Write response locally for Extension to pick up
|
# Write response locally for Extension to pick up
|
||||||
@@ -175,6 +206,7 @@ class CollectorBridge:
|
|||||||
# Also delete local pending file (Extension expects this)
|
# Also delete local pending file (Extension expects this)
|
||||||
self.local.delete_file("pending", f"{rid}.json")
|
self.local.delete_file("pending", f"{rid}.json")
|
||||||
self._forwarded_pending.discard(rid)
|
self._forwarded_pending.discard(rid)
|
||||||
|
self._forwarded_timestamps.pop(rid, None)
|
||||||
approved = data.get("approved", "?")
|
approved = data.get("approved", "?")
|
||||||
logger.info(f"[COLLECTOR] ← Gateway: response {rid[:12]} approved={approved}")
|
logger.info(f"[COLLECTOR] ← Gateway: response {rid[:12]} approved={approved}")
|
||||||
|
|
||||||
@@ -207,27 +239,56 @@ class CollectorBridge:
|
|||||||
return projects
|
return projects
|
||||||
|
|
||||||
async def _poll_commands_loop(self):
|
async def _poll_commands_loop(self):
|
||||||
"""Poll Gateway for commands for ALL local projects.
|
"""Poll Gateway for commands with adaptive per-project intervals.
|
||||||
|
|
||||||
Discovers projects from bridge/register/ (written by each AG Extension)
|
When a project returns empty commands repeatedly, its poll interval
|
||||||
and polls commands for each. Extension-side filtering (project_name check)
|
increases (3s → 10s → 30s → 60s). On receiving a command, interval
|
||||||
ensures each AG window only processes its own commands.
|
resets to base. This prevents idle projects from wasting requests.
|
||||||
"""
|
"""
|
||||||
|
# Per-project adaptive state
|
||||||
|
project_intervals: dict[str, float] = {} # project → current interval
|
||||||
|
project_last_poll: dict[str, float] = {} # project → last poll timestamp
|
||||||
|
_BASE_INTERVAL = 3.0
|
||||||
|
_IDLE_STEPS = [10.0, 30.0, 60.0] # progressive idle intervals
|
||||||
|
project_empty_streak: dict[str, int] = {} # project → consecutive empty polls
|
||||||
|
|
||||||
while self._running:
|
while self._running:
|
||||||
try:
|
try:
|
||||||
# Skip cycle if rate-limited
|
# Skip cycle if rate-limited
|
||||||
if not self.remote.is_rate_limited:
|
if not self.remote.is_rate_limited:
|
||||||
projects = self._discover_local_projects()
|
projects = self._discover_local_projects()
|
||||||
|
now = time.time()
|
||||||
for project in projects:
|
for project in projects:
|
||||||
if self.remote.is_rate_limited:
|
if self.remote.is_rate_limited:
|
||||||
break # Stop mid-cycle if rate-limited
|
break
|
||||||
|
|
||||||
|
# Check if this project's interval has elapsed
|
||||||
|
interval = project_intervals.get(project, _BASE_INTERVAL)
|
||||||
|
last = project_last_poll.get(project, 0)
|
||||||
|
if now - last < interval:
|
||||||
|
continue # Not time yet for this project
|
||||||
|
|
||||||
|
project_last_poll[project] = now
|
||||||
commands = await self.remote.apoll_commands(project)
|
commands = await self.remote.apoll_commands(project)
|
||||||
|
|
||||||
|
if commands:
|
||||||
|
# Got commands → reset to base interval
|
||||||
|
project_intervals[project] = _BASE_INTERVAL
|
||||||
|
project_empty_streak[project] = 0
|
||||||
for cmd in commands:
|
for cmd in commands:
|
||||||
cmd_id = cmd.get("id", str(int(time.time() * 1000)))
|
cmd_id = cmd.get("id", str(int(time.time() * 1000)))
|
||||||
fname = f"{cmd_id}.json"
|
fname = f"{cmd_id}.json"
|
||||||
self.local.write_json("commands", fname, cmd)
|
self.local.write_json("commands", fname, cmd)
|
||||||
logger.info(f"[COLLECTOR] ← Gateway: command [{project}] {cmd.get('text', '?')[:30]}")
|
logger.info(f"[COLLECTOR] ← Gateway: command [{project}] {cmd.get('text', '?')[:30]}")
|
||||||
await asyncio.sleep(0.3) # Throttle between projects to avoid rate limit bursts
|
else:
|
||||||
|
# Empty → increase interval progressively
|
||||||
|
streak = project_empty_streak.get(project, 0) + 1
|
||||||
|
project_empty_streak[project] = streak
|
||||||
|
if streak <= len(_IDLE_STEPS):
|
||||||
|
project_intervals[project] = _IDLE_STEPS[streak - 1]
|
||||||
|
# else stays at max (60s)
|
||||||
|
|
||||||
|
await asyncio.sleep(0.3) # Throttle between projects
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"[COLLECTOR] poll_commands error: {e}")
|
logger.error(f"[COLLECTOR] poll_commands error: {e}")
|
||||||
|
|||||||
@@ -26,9 +26,9 @@ from aiohttp import web
|
|||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
# Rate limiting
|
# Rate limiting — burst-friendly: 10s window absorbs async loop wake-up bursts
|
||||||
RATE_LIMIT_WINDOW = 1.0 # seconds
|
RATE_LIMIT_WINDOW = 10.0 # seconds (was 1.0 — too strict for burst patterns)
|
||||||
RATE_LIMIT_MAX = 30 # max requests per window per IP (Collector needs ~15-20/cycle)
|
RATE_LIMIT_MAX = 100 # max requests per window per IP
|
||||||
COMMAND_TTL = 1800 # 30 min — stale commands auto-deleted
|
COMMAND_TTL = 1800 # 30 min — stale commands auto-deleted
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user