perf(collector/watcher): 로컬 데이터 전송 성능 최적화 — mtime 프리체크, 프로젝트 캐시, re-forward 수정, 폴링 간격 조정
This commit is contained in:
73
collector.py
73
collector.py
@@ -13,6 +13,7 @@ Flow:
|
|||||||
import asyncio
|
import asyncio
|
||||||
import hashlib
|
import hashlib
|
||||||
import json
|
import json
|
||||||
|
import os
|
||||||
import time
|
import time
|
||||||
import logging
|
import logging
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
@@ -39,7 +40,7 @@ class CollectorBridge:
|
|||||||
self.remote = remote
|
self.remote = remote
|
||||||
self.project_name = project_name
|
self.project_name = project_name
|
||||||
self.event_queue = event_queue
|
self.event_queue = event_queue
|
||||||
self._poll_interval = 3 # seconds
|
self._poll_interval = 5 # seconds (was 3 — reduced I/O frequency)
|
||||||
self._running = False
|
self._running = False
|
||||||
|
|
||||||
# Pre-populate with existing pending files → skip on startup (prevents 만료됨 spam)
|
# Pre-populate with existing pending files → skip on startup (prevents 만료됨 spam)
|
||||||
@@ -47,7 +48,13 @@ class CollectorBridge:
|
|||||||
self._forwarded_pending: set[str] = set()
|
self._forwarded_pending: set[str] = set()
|
||||||
self._forwarded_timestamps: dict[str, float] = {} # rid → when forwarded
|
self._forwarded_timestamps: dict[str, float] = {} # rid → when forwarded
|
||||||
self._pending_hashes: dict[str, str] = {} # rid → content hash (for MERGE/status detection)
|
self._pending_hashes: dict[str, str] = {} # rid → content hash (for MERGE/status detection)
|
||||||
|
self._pending_mtimes: dict[str, float] = {} # rid → last known file mtime
|
||||||
self._RESPONSE_POLL_TTL = 300 # 5 min — stop polling responses for old pending
|
self._RESPONSE_POLL_TTL = 300 # 5 min — stop polling responses for old pending
|
||||||
|
|
||||||
|
# Project discovery cache (avoid re-reading register/ every cycle)
|
||||||
|
self._cached_projects: set[str] | None = None
|
||||||
|
self._projects_cache_ts: float = 0
|
||||||
|
self._PROJECTS_CACHE_TTL = 60.0 # seconds
|
||||||
for fname in self.local.list_json_files("pending"):
|
for fname in self.local.list_json_files("pending"):
|
||||||
rid = fname.replace(".json", "")
|
rid = fname.replace(".json", "")
|
||||||
self._startup_pending.add(rid)
|
self._startup_pending.add(rid)
|
||||||
@@ -58,9 +65,33 @@ class CollectorBridge:
|
|||||||
self._pending_hashes[rid] = hashlib.md5(
|
self._pending_hashes[rid] = hashlib.md5(
|
||||||
json.dumps(data, sort_keys=True).encode()
|
json.dumps(data, sort_keys=True).encode()
|
||||||
).hexdigest()
|
).hexdigest()
|
||||||
|
# Pre-cache mtime
|
||||||
|
try:
|
||||||
|
fpath = self.local.bridge_dir / "pending" / fname
|
||||||
|
self._pending_mtimes[rid] = fpath.stat().st_mtime
|
||||||
|
except OSError:
|
||||||
|
pass
|
||||||
if self._startup_pending:
|
if self._startup_pending:
|
||||||
logger.info(f"[COLLECTOR] skipping {len(self._startup_pending)} existing pending files")
|
logger.info(f"[COLLECTOR] skipping {len(self._startup_pending)} existing pending files")
|
||||||
|
|
||||||
|
# Startup cleanup: remove stale response files (> 5 min)
|
||||||
|
self._cleanup_stale_responses()
|
||||||
|
|
||||||
|
def _cleanup_stale_responses(self, max_age: int = 300):
|
||||||
|
"""Remove stale response files (> max_age seconds) on startup."""
|
||||||
|
now = time.time()
|
||||||
|
cleaned = 0
|
||||||
|
for fname in self.local.list_json_files("response"):
|
||||||
|
try:
|
||||||
|
fpath = self.local.bridge_dir / "response" / fname
|
||||||
|
if now - fpath.stat().st_mtime > max_age:
|
||||||
|
self.local.delete_file("response", fname)
|
||||||
|
cleaned += 1
|
||||||
|
except OSError:
|
||||||
|
pass
|
||||||
|
if cleaned:
|
||||||
|
logger.info(f"[COLLECTOR] startup cleanup: removed {cleaned} stale response files")
|
||||||
|
|
||||||
async def start(self):
|
async def start(self):
|
||||||
"""Start the Collector polling loops with staggered offsets.
|
"""Start the Collector polling loops with staggered offsets.
|
||||||
|
|
||||||
@@ -115,6 +146,17 @@ class CollectorBridge:
|
|||||||
rid = fname.replace(".json", "")
|
rid = fname.replace(".json", "")
|
||||||
current_files.add(rid)
|
current_files.add(rid)
|
||||||
|
|
||||||
|
# mtime pre-check: skip read+hash if file hasn't been modified
|
||||||
|
try:
|
||||||
|
fpath = self.local.bridge_dir / "pending" / fname
|
||||||
|
current_mtime = fpath.stat().st_mtime
|
||||||
|
except OSError:
|
||||||
|
continue
|
||||||
|
prev_mtime = self._pending_mtimes.get(rid)
|
||||||
|
if prev_mtime is not None and current_mtime == prev_mtime:
|
||||||
|
continue # File untouched since last check — skip read+hash
|
||||||
|
self._pending_mtimes[rid] = current_mtime
|
||||||
|
|
||||||
data = self.local.read_json("pending", fname)
|
data = self.local.read_json("pending", fname)
|
||||||
if data is None:
|
if data is None:
|
||||||
continue
|
continue
|
||||||
@@ -153,6 +195,12 @@ class CollectorBridge:
|
|||||||
if rid not in current_files and rid not in self._startup_pending:
|
if rid not in current_files and rid not in self._startup_pending:
|
||||||
self._forwarded_pending.discard(rid)
|
self._forwarded_pending.discard(rid)
|
||||||
self._pending_hashes.pop(rid, None)
|
self._pending_hashes.pop(rid, None)
|
||||||
|
self._pending_mtimes.pop(rid, None)
|
||||||
|
# Also clean up orphaned hashes/mtimes for files no longer on disk
|
||||||
|
for rid in list(self._pending_hashes):
|
||||||
|
if rid not in current_files:
|
||||||
|
self._pending_hashes.pop(rid, None)
|
||||||
|
self._pending_mtimes.pop(rid, None)
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"[COLLECTOR] forward_pending error: {e}")
|
logger.error(f"[COLLECTOR] forward_pending error: {e}")
|
||||||
@@ -183,7 +231,10 @@ class CollectorBridge:
|
|||||||
for rid in expired:
|
for rid in expired:
|
||||||
self._forwarded_pending.discard(rid)
|
self._forwarded_pending.discard(rid)
|
||||||
self._forwarded_timestamps.pop(rid, None)
|
self._forwarded_timestamps.pop(rid, None)
|
||||||
self._pending_hashes.pop(rid, None)
|
# NOTE: intentionally keep _pending_hashes[rid] to prevent
|
||||||
|
# re-forward cycle (expired pending would be re-detected as
|
||||||
|
# "new" if hash is cleared). Hash is cleaned up when file
|
||||||
|
# is actually deleted from disk (see _forward_pending_loop).
|
||||||
if expired:
|
if expired:
|
||||||
logger.info(f"[COLLECTOR] expired {len(expired)} stale forwarded pending (>{self._RESPONSE_POLL_TTL}s)")
|
logger.info(f"[COLLECTOR] expired {len(expired)} stale forwarded pending (>{self._RESPONSE_POLL_TTL}s)")
|
||||||
|
|
||||||
@@ -223,10 +274,19 @@ class CollectorBridge:
|
|||||||
Reads bridge/register/*.json files, which are written by each AG window's
|
Reads bridge/register/*.json files, which are written by each AG window's
|
||||||
Extension with {conversation_id, project_name}. Returns unique project names
|
Extension with {conversation_id, project_name}. Returns unique project names
|
||||||
found, always including self.project_name as a fallback.
|
found, always including self.project_name as a fallback.
|
||||||
|
|
||||||
|
Results are cached for _PROJECTS_CACHE_TTL seconds to avoid re-reading
|
||||||
|
22+ register files every polling cycle.
|
||||||
"""
|
"""
|
||||||
|
now = time.time()
|
||||||
|
if self._cached_projects is not None and now - self._projects_cache_ts < self._PROJECTS_CACHE_TTL:
|
||||||
|
return self._cached_projects
|
||||||
|
|
||||||
projects = {self.project_name}
|
projects = {self.project_name}
|
||||||
register_dir = self.local.bridge_dir / "register"
|
register_dir = self.local.bridge_dir / "register"
|
||||||
if not register_dir.exists():
|
if not register_dir.exists():
|
||||||
|
self._cached_projects = projects
|
||||||
|
self._projects_cache_ts = now
|
||||||
return projects
|
return projects
|
||||||
for f in register_dir.glob("*.json"):
|
for f in register_dir.glob("*.json"):
|
||||||
try:
|
try:
|
||||||
@@ -236,6 +296,8 @@ class CollectorBridge:
|
|||||||
projects.add(p)
|
projects.add(p)
|
||||||
except (json.JSONDecodeError, OSError):
|
except (json.JSONDecodeError, OSError):
|
||||||
pass
|
pass
|
||||||
|
self._cached_projects = projects
|
||||||
|
self._projects_cache_ts = now
|
||||||
return projects
|
return projects
|
||||||
|
|
||||||
async def _poll_commands_loop(self):
|
async def _poll_commands_loop(self):
|
||||||
@@ -319,7 +381,7 @@ class CollectorBridge:
|
|||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"[COLLECTOR] forward_chat_snapshots error: {e}")
|
logger.error(f"[COLLECTOR] forward_chat_snapshots error: {e}")
|
||||||
|
|
||||||
await asyncio.sleep(self._poll_interval)
|
await asyncio.sleep(10) # Chat snapshots: less urgent, 10s interval
|
||||||
|
|
||||||
# ─── Forward session registrations → Gateway ───
|
# ─── Forward session registrations → Gateway ───
|
||||||
|
|
||||||
@@ -341,12 +403,13 @@ class CollectorBridge:
|
|||||||
await self.remote.aregister_session(conv_id, project)
|
await self.remote.aregister_session(conv_id, project)
|
||||||
forwarded_regs.add(f.name)
|
forwarded_regs.add(f.name)
|
||||||
logger.info(f"[COLLECTOR] → Gateway: register {conv_id[:8]} → {project}")
|
logger.info(f"[COLLECTOR] → Gateway: register {conv_id[:8]} → {project}")
|
||||||
|
await asyncio.sleep(0.3) # Spread startup burst
|
||||||
except (json.JSONDecodeError, OSError) as e:
|
except (json.JSONDecodeError, OSError) as e:
|
||||||
logger.warning(f"[COLLECTOR] bad register {f.name}: {e}")
|
logger.warning(f"[COLLECTOR] bad register {f.name}: {e}")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"[COLLECTOR] forward_registrations error: {e}")
|
logger.error(f"[COLLECTOR] forward_registrations error: {e}")
|
||||||
|
|
||||||
await asyncio.sleep(self._poll_interval * 3) # Less frequent
|
await asyncio.sleep(30) # Registration changes rarely — 30s interval
|
||||||
# ─── Forward brain events → Gateway ───
|
# ─── Forward brain events → Gateway ───
|
||||||
|
|
||||||
async def _forward_events_loop(self):
|
async def _forward_events_loop(self):
|
||||||
@@ -394,4 +457,4 @@ class CollectorBridge:
|
|||||||
await self.remote.flush_retry_queue()
|
await self.remote.flush_retry_queue()
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"[COLLECTOR] retry flush error: {e}")
|
logger.error(f"[COLLECTOR] retry flush error: {e}")
|
||||||
await asyncio.sleep(10)
|
await asyncio.sleep(30) # Retry flush: 30s interval (was 10s)
|
||||||
|
|||||||
21
watcher.py
21
watcher.py
@@ -77,6 +77,27 @@ class BrainEventHandler(FileSystemEventHandler):
|
|||||||
f"pre-loaded {hash_count} content hashes"
|
f"pre-loaded {hash_count} content hashes"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
def dispatch(self, event: FileSystemEvent):
|
||||||
|
"""Early filter: skip events for files/dirs we don't care about.
|
||||||
|
|
||||||
|
This runs BEFORE on_created/on_modified, avoiding unnecessary
|
||||||
|
method dispatch overhead for the majority of file events.
|
||||||
|
"""
|
||||||
|
path = Path(event.src_path)
|
||||||
|
|
||||||
|
# Skip .system_generated and logs subdirectories immediately
|
||||||
|
path_parts = path.parts
|
||||||
|
if '.system_generated' in path_parts or 'logs' in path_parts:
|
||||||
|
return
|
||||||
|
|
||||||
|
# For file events, skip non-watched files immediately
|
||||||
|
if not event.is_directory:
|
||||||
|
file_name = path.name
|
||||||
|
if not self._is_watched_file(file_name):
|
||||||
|
return
|
||||||
|
|
||||||
|
super().dispatch(event)
|
||||||
|
|
||||||
def _is_conversation_id(self, name: str) -> bool:
|
def _is_conversation_id(self, name: str) -> bool:
|
||||||
parts = name.split("-")
|
parts = name.split("-")
|
||||||
return len(parts) == 5 and all(len(p) >= 4 for p in parts)
|
return len(parts) == 5 and all(len(p) >= 4 for p in parts)
|
||||||
|
|||||||
Reference in New Issue
Block a user