fix(collector): MERGE + auto_resolved/expired 상태 변경 감지
- pending 파일 콘텐츠 해시 추적 (_pending_hashes) - 내용 변경 시 Gateway에 재전달 (MERGE: command 업데이트, status 변경) - _startup_pending으로 시작 시 기존 파일과 신규 파일 분리
This commit is contained in:
59
collector.py
59
collector.py
@@ -11,6 +11,7 @@ Flow:
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
import asyncio
|
import asyncio
|
||||||
|
import hashlib
|
||||||
import json
|
import json
|
||||||
import time
|
import time
|
||||||
import logging
|
import logging
|
||||||
@@ -42,12 +43,21 @@ class CollectorBridge:
|
|||||||
self._running = False
|
self._running = False
|
||||||
|
|
||||||
# Pre-populate with existing pending files → skip on startup (prevents 만료됨 spam)
|
# Pre-populate with existing pending files → skip on startup (prevents 만료됨 spam)
|
||||||
|
self._startup_pending: set[str] = set()
|
||||||
self._forwarded_pending: set[str] = set()
|
self._forwarded_pending: set[str] = set()
|
||||||
|
self._pending_hashes: dict[str, str] = {} # rid → content hash (for MERGE/status detection)
|
||||||
for fname in self.local.list_json_files("pending"):
|
for fname in self.local.list_json_files("pending"):
|
||||||
rid = fname.replace(".json", "")
|
rid = fname.replace(".json", "")
|
||||||
|
self._startup_pending.add(rid)
|
||||||
self._forwarded_pending.add(rid)
|
self._forwarded_pending.add(rid)
|
||||||
if self._forwarded_pending:
|
# Pre-hash existing files
|
||||||
logger.info(f"[COLLECTOR] skipping {len(self._forwarded_pending)} existing pending files")
|
data = self.local.read_json("pending", fname)
|
||||||
|
if data:
|
||||||
|
self._pending_hashes[rid] = hashlib.md5(
|
||||||
|
json.dumps(data, sort_keys=True).encode()
|
||||||
|
).hexdigest()
|
||||||
|
if self._startup_pending:
|
||||||
|
logger.info(f"[COLLECTOR] skipping {len(self._startup_pending)} existing pending files")
|
||||||
|
|
||||||
async def start(self):
|
async def start(self):
|
||||||
"""Start the Collector polling loops."""
|
"""Start the Collector polling loops."""
|
||||||
@@ -72,26 +82,53 @@ class CollectorBridge:
|
|||||||
# ─── Forward local pending → Gateway ───
|
# ─── Forward local pending → Gateway ───
|
||||||
|
|
||||||
async def _forward_pending_loop(self):
|
async def _forward_pending_loop(self):
|
||||||
"""Scan local pending/ and forward new requests to Gateway."""
|
"""Scan local pending/ and forward new + updated requests to Gateway.
|
||||||
|
|
||||||
|
Tracks content hashes to detect:
|
||||||
|
- New pending files → forward immediately
|
||||||
|
- MERGE updates (step_probe updates command text) → re-forward
|
||||||
|
- Status changes (auto_resolved, expired) → re-forward
|
||||||
|
"""
|
||||||
while self._running:
|
while self._running:
|
||||||
try:
|
try:
|
||||||
|
current_files = set()
|
||||||
for fname in self.local.list_json_files("pending"):
|
for fname in self.local.list_json_files("pending"):
|
||||||
rid = fname.replace(".json", "")
|
rid = fname.replace(".json", "")
|
||||||
if rid in self._forwarded_pending:
|
current_files.add(rid)
|
||||||
continue
|
|
||||||
|
|
||||||
data = self.local.read_json("pending", fname)
|
data = self.local.read_json("pending", fname)
|
||||||
if data is None or data.get("status") != "pending":
|
if data is None:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# Forward to Gateway
|
# Compute content hash to detect changes
|
||||||
|
content_hash = hashlib.md5(
|
||||||
|
json.dumps(data, sort_keys=True).encode()
|
||||||
|
).hexdigest()
|
||||||
|
|
||||||
|
prev_hash = self._pending_hashes.get(rid)
|
||||||
|
if prev_hash == content_hash:
|
||||||
|
continue # No change
|
||||||
|
|
||||||
|
is_new = rid not in self._forwarded_pending
|
||||||
|
if is_new and rid in self._startup_pending:
|
||||||
|
continue # Skip pre-existing files from before startup
|
||||||
|
|
||||||
|
# Forward to Gateway (new or updated)
|
||||||
self.remote.write_json("pending", fname, data)
|
self.remote.write_json("pending", fname, data)
|
||||||
self._forwarded_pending.add(rid)
|
self._forwarded_pending.add(rid)
|
||||||
logger.info(f"[COLLECTOR] → Gateway: pending {rid[:12]}")
|
self._pending_hashes[rid] = content_hash
|
||||||
|
|
||||||
# Clean up stale forwarded tracking (keep last 200)
|
if is_new:
|
||||||
if len(self._forwarded_pending) > 200:
|
logger.info(f"[COLLECTOR] → Gateway: pending {rid[:12]}")
|
||||||
self._forwarded_pending = set(list(self._forwarded_pending)[-100:])
|
else:
|
||||||
|
status = data.get("status", "?")
|
||||||
|
logger.info(f"[COLLECTOR] → Gateway: pending UPDATE {rid[:12]} status={status}")
|
||||||
|
|
||||||
|
# Clean up tracking for deleted files
|
||||||
|
for rid in list(self._forwarded_pending):
|
||||||
|
if rid not in current_files and rid not in self._startup_pending:
|
||||||
|
self._forwarded_pending.discard(rid)
|
||||||
|
self._pending_hashes.pop(rid, None)
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"[COLLECTOR] forward_pending error: {e}")
|
logger.error(f"[COLLECTOR] forward_pending error: {e}")
|
||||||
|
|||||||
Reference in New Issue
Block a user