diff --git a/bridge.py b/bridge.py index 33ce977..bb56334 100644 --- a/bridge.py +++ b/bridge.py @@ -176,8 +176,9 @@ class RemoteTransport(BridgeTransport): # Rate limit backoff self._rate_limited_until = 0.0 # timestamp until which we should not send requests self._backoff_seconds = 0.0 # current backoff duration (exponential) - self._BACKOFF_BASE = 1.0 + self._BACKOFF_BASE = 2.0 self._BACKOFF_MAX = 60.0 + self._success_streak = 0 # consecutive successes for gradual backoff reduction # Retry queue: list of (method, path, data) tuples self._retry_queue: list[tuple[str, str, dict | None]] = [] @@ -207,6 +208,7 @@ class RemoteTransport(BridgeTransport): def _apply_backoff(self, retry_after: float = 0): """Apply exponential backoff for rate limiting.""" + self._success_streak = 0 # Reset success streak on any failure if retry_after > 0: self._backoff_seconds = min(retry_after, self._BACKOFF_MAX) else: @@ -217,11 +219,22 @@ class RemoteTransport(BridgeTransport): self._rate_limited_until = time.time() + self._backoff_seconds logger.warning(f"RemoteTransport: backing off {self._backoff_seconds:.0f}s (until +{self._backoff_seconds:.0f}s)") - def _reset_backoff(self): - """Reset backoff after a successful request.""" - if self._backoff_seconds > 0: - self._backoff_seconds = 0 - self._rate_limited_until = 0 + def _on_request_success(self): + """Gradually reduce backoff after consecutive successes. + + Instead of instantly resetting to 0 (which causes the 1s oscillation loop + when 7 loops share one transport), require sustained success before reducing. + """ + if self._backoff_seconds <= 0: + return # Already at zero, nothing to do + self._success_streak += 1 + if self._success_streak >= 5: + # Halve the backoff (gradual cooldown) + self._backoff_seconds = self._backoff_seconds / 2 + if self._backoff_seconds < 0.5: + self._backoff_seconds = 0 + self._rate_limited_until = 0 + self._success_streak = 0 async def _arequest(self, method: str, path: str, data: dict | None = None) -> dict | None: """Async non-blocking HTTP request to Gateway API.""" @@ -250,7 +263,7 @@ class RemoteTransport(BridgeTransport): logger.info("RemoteTransport: ✅ Gateway connected") self.connected = True self._consecutive_failures = 0 - self._reset_backoff() + self._on_request_success() return result except Exception as e: self._consecutive_failures += 1 diff --git a/collector.py b/collector.py index 1b58ced..ba1b2e6 100644 --- a/collector.py +++ b/collector.py @@ -45,7 +45,9 @@ class CollectorBridge: # Pre-populate with existing pending files → skip on startup (prevents 만료됨 spam) self._startup_pending: set[str] = set() self._forwarded_pending: set[str] = set() + self._forwarded_timestamps: dict[str, float] = {} # rid → when forwarded self._pending_hashes: dict[str, str] = {} # rid → content hash (for MERGE/status detection) + self._RESPONSE_POLL_TTL = 300 # 5 min — stop polling responses for old pending for fname in self.local.list_json_files("pending"): rid = fname.replace(".json", "") self._startup_pending.add(rid) @@ -60,20 +62,29 @@ class CollectorBridge: logger.info(f"[COLLECTOR] skipping {len(self._startup_pending)} existing pending files") async def start(self): - """Start the Collector polling loops.""" + """Start the Collector polling loops with staggered offsets. + + Each loop starts with a different delay to prevent all loops from waking + up at the same time and causing burst requests to Gateway. + """ self._running = True logger.info(f"[COLLECTOR] started for project={self.project_name}") + + async def _staggered(coro, offset: float): + await asyncio.sleep(offset) + await coro() + tasks = [ - self._forward_pending_loop(), - self._poll_responses_loop(), - self._poll_commands_loop(), - self._forward_chat_snapshots_loop(), - self._forward_registrations_loop(), - self._health_check_loop(), - self._retry_flush_loop(), + _staggered(self._forward_pending_loop, 0.0), + _staggered(self._poll_responses_loop, 0.5), + _staggered(self._poll_commands_loop, 1.0), + _staggered(self._forward_chat_snapshots_loop, 1.5), + _staggered(self._forward_registrations_loop, 2.0), + _staggered(self._health_check_loop, 2.5), + _staggered(self._retry_flush_loop, 3.0), ] if self.event_queue: - tasks.append(self._forward_events_loop()) + tasks.append(_staggered(self._forward_events_loop, 3.5)) await asyncio.gather(*tasks) async def stop(self): @@ -128,6 +139,7 @@ class CollectorBridge: # Forward to Gateway (new or updated) await self.remote.awrite_json("pending", fname, data) self._forwarded_pending.add(rid) + self._forwarded_timestamps[rid] = time.time() self._pending_hashes[rid] = content_hash if is_new: @@ -150,7 +162,11 @@ class CollectorBridge: # ─── Poll Gateway responses → local ─── async def _poll_responses_loop(self): - """Poll Gateway for responses and write them locally for Extension.""" + """Poll Gateway for responses and write them locally for Extension. + + Only polls responses for recently-forwarded pending (within _RESPONSE_POLL_TTL). + Expired entries are removed from tracking to prevent request accumulation. + """ while self._running: try: # Skip cycle if rate-limited @@ -158,16 +174,31 @@ class CollectorBridge: await asyncio.sleep(self._poll_interval) continue - # Check each forwarded pending for a response - for rid in list(self._forwarded_pending): - if rid in self._startup_pending: - continue # Don't poll responses for pre-startup files + now = time.time() + # Clean up expired forwarded pending (stop polling responses for old ones) + expired = [ + rid for rid, ts in self._forwarded_timestamps.items() + if now - ts > self._RESPONSE_POLL_TTL + ] + for rid in expired: + self._forwarded_pending.discard(rid) + self._forwarded_timestamps.pop(rid, None) + self._pending_hashes.pop(rid, None) + if expired: + logger.info(f"[COLLECTOR] expired {len(expired)} stale forwarded pending (>{self._RESPONSE_POLL_TTL}s)") + + # Check each active forwarded pending for a response + active_rids = [ + rid for rid in self._forwarded_pending + if rid not in self._startup_pending + ] + for rid in active_rids: # Rate-limit guard: stop polling if we got rate-limited mid-cycle if self.remote.is_rate_limited: break data = await self.remote.aread_json("response", f"{rid}.json") if data is None or data.get("waiting"): - await asyncio.sleep(0.2) # Throttle between individual response polls + await asyncio.sleep(0.3) # Throttle between individual response polls continue # Write response locally for Extension to pick up @@ -175,6 +206,7 @@ class CollectorBridge: # Also delete local pending file (Extension expects this) self.local.delete_file("pending", f"{rid}.json") self._forwarded_pending.discard(rid) + self._forwarded_timestamps.pop(rid, None) approved = data.get("approved", "?") logger.info(f"[COLLECTOR] ← Gateway: response {rid[:12]} approved={approved}") @@ -207,27 +239,56 @@ class CollectorBridge: return projects async def _poll_commands_loop(self): - """Poll Gateway for commands for ALL local projects. + """Poll Gateway for commands with adaptive per-project intervals. - Discovers projects from bridge/register/ (written by each AG Extension) - and polls commands for each. Extension-side filtering (project_name check) - ensures each AG window only processes its own commands. + When a project returns empty commands repeatedly, its poll interval + increases (3s → 10s → 30s → 60s). On receiving a command, interval + resets to base. This prevents idle projects from wasting requests. """ + # Per-project adaptive state + project_intervals: dict[str, float] = {} # project → current interval + project_last_poll: dict[str, float] = {} # project → last poll timestamp + _BASE_INTERVAL = 3.0 + _IDLE_STEPS = [10.0, 30.0, 60.0] # progressive idle intervals + project_empty_streak: dict[str, int] = {} # project → consecutive empty polls + while self._running: try: # Skip cycle if rate-limited if not self.remote.is_rate_limited: projects = self._discover_local_projects() + now = time.time() for project in projects: if self.remote.is_rate_limited: - break # Stop mid-cycle if rate-limited + break + + # Check if this project's interval has elapsed + interval = project_intervals.get(project, _BASE_INTERVAL) + last = project_last_poll.get(project, 0) + if now - last < interval: + continue # Not time yet for this project + + project_last_poll[project] = now commands = await self.remote.apoll_commands(project) - for cmd in commands: - cmd_id = cmd.get("id", str(int(time.time() * 1000))) - fname = f"{cmd_id}.json" - self.local.write_json("commands", fname, cmd) - logger.info(f"[COLLECTOR] ← Gateway: command [{project}] {cmd.get('text', '?')[:30]}") - await asyncio.sleep(0.3) # Throttle between projects to avoid rate limit bursts + + if commands: + # Got commands → reset to base interval + project_intervals[project] = _BASE_INTERVAL + project_empty_streak[project] = 0 + for cmd in commands: + cmd_id = cmd.get("id", str(int(time.time() * 1000))) + fname = f"{cmd_id}.json" + self.local.write_json("commands", fname, cmd) + logger.info(f"[COLLECTOR] ← Gateway: command [{project}] {cmd.get('text', '?')[:30]}") + else: + # Empty → increase interval progressively + streak = project_empty_streak.get(project, 0) + 1 + project_empty_streak[project] = streak + if streak <= len(_IDLE_STEPS): + project_intervals[project] = _IDLE_STEPS[streak - 1] + # else stays at max (60s) + + await asyncio.sleep(0.3) # Throttle between projects except Exception as e: logger.error(f"[COLLECTOR] poll_commands error: {e}") diff --git a/gateway.py b/gateway.py index 547fb8e..26ef147 100644 --- a/gateway.py +++ b/gateway.py @@ -26,9 +26,9 @@ from aiohttp import web logger = logging.getLogger(__name__) -# Rate limiting -RATE_LIMIT_WINDOW = 1.0 # seconds -RATE_LIMIT_MAX = 30 # max requests per window per IP (Collector needs ~15-20/cycle) +# Rate limiting — burst-friendly: 10s window absorbs async loop wake-up bursts +RATE_LIMIT_WINDOW = 10.0 # seconds (was 1.0 — too strict for burst patterns) +RATE_LIMIT_MAX = 100 # max requests per window per IP COMMAND_TTL = 1800 # 30 min — stale commands auto-deleted