From d2a477e12e5181349ee86d87e102d29bdf3184a3 Mon Sep 17 00:00:00 2001
From: Variet Worker
Date: Wed, 11 Mar 2026 22:48:47 +0900
Subject: [PATCH] =?UTF-8?q?fix(collector):=20MERGE=20+=20auto=5Fresolved/e?=
 =?UTF-8?q?xpired=20=EC=83=81=ED=83=9C=20=EB=B3=80=EA=B2=BD=20=EA=B0=90?=
 =?UTF-8?q?=EC=A7=80?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

- pending 파일 콘텐츠 해시 추적 (_pending_hashes)
- 내용 변경 시 Gateway에 재전달 (MERGE: command 업데이트, status 변경)
- _startup_pending으로 시작 시 기존 파일과 신규 파일 분리
---
 collector.py | 59 ++++++++++++++++++++++++++++++++++++++++++----------
 1 file changed, 48 insertions(+), 11 deletions(-)

diff --git a/collector.py b/collector.py
index b9a0eca..9489d21 100644
--- a/collector.py
+++ b/collector.py
@@ -11,6 +11,7 @@ Flow:
 """
 
 import asyncio
+import hashlib
 import json
 import time
 import logging
@@ -42,12 +43,21 @@ class CollectorBridge:
         self._running = False
 
         # Pre-populate with existing pending files → skip on startup (prevents 만료됨 spam)
+        self._startup_pending: set[str] = set()
         self._forwarded_pending: set[str] = set()
+        self._pending_hashes: dict[str, str] = {}  # rid → content hash (for MERGE/status detection)
         for fname in self.local.list_json_files("pending"):
             rid = fname.replace(".json", "")
+            self._startup_pending.add(rid)
             self._forwarded_pending.add(rid)
-        if self._forwarded_pending:
-            logger.info(f"[COLLECTOR] skipping {len(self._forwarded_pending)} existing pending files")
+            # Pre-hash existing files
+            data = self.local.read_json("pending", fname)
+            if data:
+                self._pending_hashes[rid] = hashlib.md5(
+                    json.dumps(data, sort_keys=True).encode()
+                ).hexdigest()
+        if self._startup_pending:
+            logger.info(f"[COLLECTOR] skipping {len(self._startup_pending)} existing pending files")
 
     async def start(self):
         """Start the Collector polling loops."""
@@ -72,26 +82,53 @@ class CollectorBridge:
     # ─── Forward local pending → Gateway ───
 
     async def _forward_pending_loop(self):
-        """Scan local pending/ and forward new requests to Gateway."""
+        """Scan local pending/ and forward new + updated requests to Gateway.
+
+        Tracks content hashes to detect:
+        - New pending files → forward immediately
+        - MERGE updates (step_probe updates command text) → re-forward
+        - Status changes (auto_resolved, expired) → re-forward
+        """
         while self._running:
             try:
+                current_files = set()
                 for fname in self.local.list_json_files("pending"):
                     rid = fname.replace(".json", "")
-                    if rid in self._forwarded_pending:
-                        continue
+                    current_files.add(rid)
 
                     data = self.local.read_json("pending", fname)
-                    if data is None or data.get("status") != "pending":
+                    if data is None:
                         continue
 
-                    # Forward to Gateway
+                    # Compute content hash to detect changes
+                    content_hash = hashlib.md5(
+                        json.dumps(data, sort_keys=True).encode()
+                    ).hexdigest()
+
+                    prev_hash = self._pending_hashes.get(rid)
+                    if prev_hash == content_hash:
+                        continue  # No change
+
+                    is_new = rid not in self._forwarded_pending
+                    if is_new and rid in self._startup_pending:
+                        continue  # Skip pre-existing files from before startup
+
+                    # Forward to Gateway (new or updated)
                     self.remote.write_json("pending", fname, data)
                     self._forwarded_pending.add(rid)
-                    logger.info(f"[COLLECTOR] → Gateway: pending {rid[:12]}")
+                    self._pending_hashes[rid] = content_hash
 
-                # Clean up stale forwarded tracking (keep last 200)
-                if len(self._forwarded_pending) > 200:
-                    self._forwarded_pending = set(list(self._forwarded_pending)[-100:])
+                    if is_new:
+                        logger.info(f"[COLLECTOR] → Gateway: pending {rid[:12]}")
+                    else:
+                        status = data.get("status", "?")
+                        logger.info(f"[COLLECTOR] → Gateway: pending UPDATE {rid[:12]} status={status}")
+
+                # Clean up tracking for deleted files
+                for rid in list(self._forwarded_pending):
+                    if rid not in current_files and rid not in self._startup_pending:
+                        self._forwarded_pending.discard(rid)
+                        self._pending_hashes.pop(rid, None)
 
             except Exception as e:
                 logger.error(f"[COLLECTOR] forward_pending error: {e}")