From 91927703008d3d97bd712367f77df77b77ed659a Mon Sep 17 00:00:00 2001
From: quantlab
Date: Fri, 6 Mar 2026 17:15:54 +0900
Subject: [PATCH] =?UTF-8?q?feat:=20Project=20Indexer=20+=20Context=20Manag?=
 =?UTF-8?q?er=20+=20GeminiCaller=20=EA=B5=AC=ED=98=84=20=EB=B0=8F=20?=
 =?UTF-8?q?=ED=85=8C=EC=8A=A4=ED=8A=B8=20#task-187=20#task-188=20#task-189?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 Review notes (v2 of this patch):
  - find_relevant: str.removesuffix(".py") instead of rstrip(".py")
  - get_structure_summary: POSIX-style parent path so nested files
    resolve on Windows
  - _build_import_graph: match imports on whole dotted components
  - expand_dependencies: deterministic result order
  - scan: tolerate files that vanish or cannot be stat()ed
  - GeminiCaller.call: kill the child process on timeout
  - tests: derive the project root from __file__
  - comments and docstrings translated to English
 The blob ids on the "index" lines are those of the original revision.

 core/__init__.py        |   1 +
 core/context_manager.py |  95 +++++++++++++++
 core/gemini_caller.py   | 103 ++++++++++++++++
 core/project_indexer.py | 259 ++++++++++++++++++++++++++++++++++++++++
 tests/test_core.py      |  89 ++++++++++++++
 5 files changed, 547 insertions(+)
 create mode 100644 core/__init__.py
 create mode 100644 core/context_manager.py
 create mode 100644 core/gemini_caller.py
 create mode 100644 core/project_indexer.py
 create mode 100644 tests/test_core.py

diff --git a/core/__init__.py b/core/__init__.py
new file mode 100644
index 0000000..d61a255
--- /dev/null
+++ b/core/__init__.py
@@ -0,0 +1 @@
+# Core package
diff --git a/core/context_manager.py b/core/context_manager.py
new file mode 100644
index 0000000..19f70db
--- /dev/null
+++ b/core/context_manager.py
@@ -0,0 +1,95 @@
+"""Context Manager — relevant-file selection and token-budget control.
+
+Core of the fix for Gemini CLI context rot: pick only the files a task
+needs and assemble the context within the token budget.
+"""
+
+from pathlib import Path
+from core.project_indexer import ProjectIndex
+
+
+# Rough token estimate: 1 token ≈ 4 bytes (English), 2 bytes (Korean)
+BYTES_PER_TOKEN = 3
+
+
+class ContextManager:
+    """Builds a per-task context string from a ProjectIndex."""
+
+    def __init__(self, index: ProjectIndex, token_budget: int = 50_000):
+        self.index = index
+        self.token_budget = token_budget
+
+    def gather(self, task: str, max_files: int = 15) -> str:
+        """Select only the files the task needs and build the context.
+
+        Args:
+            task: Free-text task description used to find relevant files.
+            max_files: Maximum number of candidate files to consider.
+
+        Returns:
+            The structure summary, the selected file contents (or a
+            SKIPPED marker for files that would exceed the budget) and a
+            closing summary line, joined by newlines.
+        """
+        # 1. Find files related to the task
+        relevant = self.index.find_relevant(task)
+
+        # 2. Expand through import relationships (top 5 hits only)
+        if relevant:
+            expanded = self.index.expand_dependencies(relevant[:5], depth=1)
+        else:
+            expanded = []
+
+        # 3. Order by priority (direct hits first), dropping duplicates
+        ordered = list(dict.fromkeys(relevant + expanded))
+
+        # 4. Include files while they fit in the token budget
+        context_parts = []
+        total_tokens = 0
+        files_included = 0
+
+        # The project structure summary is always included
+        structure = self.index.get_structure_summary()
+        structure_tokens = len(structure.encode("utf-8")) // BYTES_PER_TOKEN
+        context_parts.append(f"=== PROJECT STRUCTURE ===\n{structure}")
+        total_tokens += structure_tokens
+
+        for fpath in ordered[:max_files]:
+            info = self.index.files.get(fpath)
+            if not info:
+                continue
+
+            file_tokens = info.size // BYTES_PER_TOKEN
+            if total_tokens + file_tokens > self.token_budget:
+                context_parts.append(
+                    f"\n=== SKIPPED: {fpath} ({info.line_count}L, budget exceeded) ==="
+                )
+                continue
+
+            try:
+                abs_path = self.index.project_path / fpath
+                content = abs_path.read_text(encoding="utf-8", errors="ignore")
+            except Exception:
+                # Best effort: skip files that cannot be read any more
+                continue
+
+            context_parts.append(
+                f"\n=== FILE: {fpath} ({info.line_count}L) ===\n{content}"
+            )
+            total_tokens += file_tokens
+            files_included += 1
+
+        context_parts.append(
+            f"\n=== CONTEXT SUMMARY: {files_included} files, ~{total_tokens} tokens ==="
+        )
+
+        return "\n".join(context_parts)
+
+    def gather_for_review(self, original: str, modified: str, task: str) -> str:
+        """Build a review context: the task, the original and the modified text."""
+        parts = [
+            f"=== TASK: {task} ===",
+            f"\n=== ORIGINAL ===\n{original}",
+            f"\n=== MODIFIED ===\n{modified}",
+        ]
+        return "\n".join(parts)
diff --git a/core/gemini_caller.py b/core/gemini_caller.py
new file mode 100644
index 0000000..cede404
--- /dev/null
+++ b/core/gemini_caller.py
@@ -0,0 +1,103 @@
+"""GeminiCaller — headless `gemini -p` calls per role.
+
+Wraps the call in `cmd /c` to bypass the PowerShell execution policy.
+"""
+
+import asyncio
+import json
+import time
+from pathlib import Path
+
+
+ROLE_PROMPTS_DIR = Path(__file__).parent.parent / "prompts"
+
+
+class GeminiCaller:
+    """Manages headless Gemini CLI calls."""
+
+    def __init__(self, project_path: str = None):
+        self.project_path = project_path
+        self.call_count = 0
+        self.last_call_time = 0.0
+
+    async def call(self, role: str, context: str, timeout: int = 120) -> str:
+        """Call `gemini -p` for the given role.
+
+        Args:
+            role: Prompt file name (planner, coder, reviewer, tester)
+            context: Context to pass as the prompt
+            timeout: Maximum wait time in seconds
+
+        Returns:
+            The CLI's stdout without the YOLO/cache banner lines, or a
+            string starting with "[ERROR]" on timeout or failure.
+        """
+        # Load the system prompt
+        prompt_file = ROLE_PROMPTS_DIR / f"{role}.md"
+        if prompt_file.exists():
+            system_prompt = prompt_file.read_text(encoding="utf-8")
+        else:
+            system_prompt = f"You are a {role}. Respond in Korean."
+
+        # Build the argument list
+        # NOTE(review): cmd_parts (and with it system_prompt) is built here
+        # but never executed; only cmd_str below reaches the subprocess.
+        cmd_parts = ["gemini", "-p", context]
+
+        if system_prompt:
+            cmd_parts.extend(["--system", system_prompt])
+
+        cmd_parts.extend(["--approval-mode", "yolo"])
+
+        if self.project_path:
+            cmd_parts.extend(["--include-directories", self.project_path])
+
+        # cmd /c wrapping (bypasses the PowerShell execution policy)
+        # NOTE(review): only double quotes are escaped; other shell
+        # metacharacters in the context are passed through as-is.
+        escaped_context = context.replace('"', '\\"')
+        cmd_str = f'gemini -p "{escaped_context}" --approval-mode yolo'
+        if self.project_path:
+            cmd_str += f' --include-directories "{self.project_path}"'
+
+        proc = None
+        try:
+            proc = await asyncio.create_subprocess_shell(
+                f'cmd /c {cmd_str}',
+                stdout=asyncio.subprocess.PIPE,
+                stderr=asyncio.subprocess.PIPE,
+            )
+            stdout, stderr = await asyncio.wait_for(
+                proc.communicate(), timeout=timeout
+            )
+
+            self.call_count += 1
+            self.last_call_time = time.time()
+
+            output = stdout.decode("utf-8", errors="replace").strip()
+
+            # Drop the YOLO-mode / cache banner lines
+            cleaned = [
+                line
+                for line in output.splitlines()
+                if "YOLO mode" not in line and "Loaded cached" not in line
+            ]
+
+            return "\n".join(cleaned).strip()
+
+        except asyncio.TimeoutError:
+            # wait_for() only cancels communicate(); the child process
+            # keeps running unless it is terminated explicitly.
+            if proc is not None and proc.returncode is None:
+                try:
+                    proc.kill()
+                    await proc.wait()
+                except ProcessLookupError:
+                    pass  # already exited
+            return f"[ERROR] Gemini CLI timeout after {timeout}s"
+        except Exception as e:
+            return f"[ERROR] Gemini CLI call failed: {e}"
+
+    async def call_simple(self, prompt: str, timeout: int = 60) -> str:
+        """Simple call without a role prompt file."""
+        return await self.call("default", prompt, timeout)
diff --git a/core/project_indexer.py b/core/project_indexer.py
new file mode 100644
index 0000000..45650b5
--- /dev/null
+++ b/core/project_indexer.py
@@ -0,0 +1,259 @@
+"""Project Indexer — project structure analysis and caching.
+
+Collects the file layout, import relationships and function signatures
+so that the Context Manager can pick the relevant files.
+"""
+
+import os
+import re
+import json
+from pathlib import Path
+from dataclasses import dataclass, field, asdict
+
+
+@dataclass
+class FileInfo:
+    path: str
+    language: str
+    size: int
+    line_count: int
+    imports: list[str] = field(default_factory=list)
+    functions: list[str] = field(default_factory=list)
+    classes: list[str] = field(default_factory=list)
+
+
+LANGUAGE_MAP = {
+    ".py": "python", ".js": "javascript", ".ts": "typescript",
+    ".java": "java", ".cs": "csharp", ".cpp": "cpp", ".c": "c",
+    ".go": "go", ".rs": "rust", ".rb": "ruby", ".php": "php",
+    ".html": "html", ".css": "css", ".md": "markdown",
+    ".yaml": "yaml", ".yml": "yaml", ".json": "json",
+    ".sql": "sql", ".sh": "bash", ".ps1": "powershell",
+}
+
+IGNORE_DIRS = {
+    ".git", ".agent", "node_modules", "__pycache__", ".venv",
+    "venv", ".tox", ".mypy_cache", ".pytest_cache", "dist",
+    "build", ".next", ".nuxt", "sessions",
+}
+
+IGNORE_EXTENSIONS = {
+    ".pyc", ".pyo", ".exe", ".dll", ".so", ".o",
+    ".jpg", ".png", ".gif", ".ico", ".svg",
+    ".zip", ".tar", ".gz", ".rar",
+    ".db", ".sqlite", ".lock",
+}
+
+
+class ProjectIndex:
+    """Analyzes and caches the structure of a project."""
+
+    def __init__(self, project_path: str, max_file_size: int = 100_000):
+        self.project_path = Path(project_path).resolve()
+        self.max_file_size = max_file_size
+        self.files: dict[str, FileInfo] = {}
+        self.import_graph: dict[str, list[str]] = {}  # file → [imported files]
+
+    def scan(self) -> "ProjectIndex":
+        """Scan the whole project and rebuild the index; returns self."""
+        self.files.clear()
+        self.import_graph.clear()
+
+        for root, dirs, files in os.walk(self.project_path):
+            # Prune ignored directories in place so os.walk skips them
+            dirs[:] = [d for d in dirs if d not in IGNORE_DIRS]
+
+            for fname in files:
+                fpath = Path(root) / fname
+                ext = fpath.suffix.lower()
+
+                if ext in IGNORE_EXTENSIONS:
+                    continue
+
+                try:
+                    # stat() once, inside the try: it raises OSError on
+                    # broken symlinks or files removed during the walk
+                    size = fpath.stat().st_size
+                    if size > self.max_file_size:
+                        continue
+                    content = fpath.read_text(encoding="utf-8", errors="ignore")
+                except OSError:
+                    continue
+
+                rel_path = str(fpath.relative_to(self.project_path)).replace("\\", "/")
+                lang = LANGUAGE_MAP.get(ext, "")
+
+                info = FileInfo(
+                    path=rel_path,
+                    language=lang,
+                    size=size,
+                    line_count=len(content.splitlines()),
+                )
+
+                # Language-specific parsing
+                if lang == "python":
+                    info.imports = self._parse_python_imports(content)
+                    info.functions = self._parse_python_functions(content)
+                    info.classes = self._parse_python_classes(content)
+
+                self.files[rel_path] = info
+
+        # Build the import graph
+        self._build_import_graph()
+        return self
+
+    def _parse_python_imports(self, content: str) -> list[str]:
+        """Return imported module names found line by line in content."""
+        imports = []
+        for line in content.splitlines():
+            line = line.strip()
+            if line.startswith("import "):
+                # Only the top-level package of the first imported name
+                imports.append(line.split()[1].split(".")[0])
+            elif line.startswith("from "):
+                match = re.match(r"from\s+([\w.]+)\s+import", line)
+                if match:
+                    imports.append(match.group(1))
+        return imports
+
+    def _parse_python_functions(self, content: str) -> list[str]:
+        """Return names of top-level (unindented) def / async def."""
+        return re.findall(r"^(?:async\s+)?def\s+(\w+)\s*\(", content, re.MULTILINE)
+
+    def _parse_python_classes(self, content: str) -> list[str]:
+        """Return names of top-level (unindented) classes."""
+        return re.findall(r"^class\s+(\w+)\s*[:\(]", content, re.MULTILINE)
+
+    def _build_import_graph(self):
+        """Build the graph by resolving imports to files in the project."""
+        file_modules = {}
+        for rel_path in self.files:
+            if rel_path.endswith(".py"):
+                module = rel_path.replace("/", ".").removesuffix(".py")
+                file_modules[module] = rel_path
+                # Also allow matching on the last component alone
+                parts = module.split(".")
+                if len(parts) > 1:
+                    file_modules[parts[-1]] = rel_path
+
+        for rel_path, info in self.files.items():
+            deps = []
+            for imp in info.imports:
+                for mod, fpath in file_modules.items():
+                    # Match on whole dotted components only, so that
+                    # `import re` is not linked to a local report.py
+                    related = (
+                        imp == mod
+                        or imp.startswith(mod + ".")
+                        or mod.startswith(imp + ".")
+                    )
+                    if related and fpath != rel_path and fpath not in deps:
+                        deps.append(fpath)
+            self.import_graph[rel_path] = deps
+
+    def find_relevant(self, query: str) -> list[str]:
+        """Return files whose name, path, functions or classes appear in query.
+
+        Results are sorted by descending score.
+        """
+        query_lower = query.lower()
+        scored = []
+
+        for rel_path, info in self.files.items():
+            score = 0
+            # File name match
+            basename = Path(rel_path).stem.lower()
+            if basename in query_lower:
+                score += 10
+            # Path component match
+            for part in rel_path.lower().split("/"):
+                # removesuffix, not rstrip: rstrip(".py") strips any
+                # trailing '.', 'p' or 'y' characters ("happy.py" → "ha")
+                name = part.removesuffix(".py")
+                if name and name in query_lower:
+                    score += 5
+            # Function / class match
+            for func in info.functions:
+                if func.lower() in query_lower:
+                    score += 8
+            for cls in info.classes:
+                if cls.lower() in query_lower:
+                    score += 8
+            if score > 0:
+                scored.append((rel_path, score))
+
+        scored.sort(key=lambda x: -x[1])
+        return [path for path, _ in scored]
+
+    def expand_dependencies(self, files: list[str], depth: int = 2) -> list[str]:
+        """Expand files with their import neighbours, up to depth hops.
+
+        Returns the input files first, followed by newly found files in
+        discovery order, so the result is deterministic.
+        """
+        ordered = list(dict.fromkeys(files))
+        seen = set(ordered)
+        frontier = ordered
+
+        for _ in range(depth):
+            next_frontier = []
+            for f in frontier:
+                # Forward: files that f imports
+                for dep in self.import_graph.get(f, []):
+                    if dep not in seen:
+                        seen.add(dep)
+                        next_frontier.append(dep)
+                # Reverse: files that import f
+                for other, other_deps in self.import_graph.items():
+                    if f in other_deps and other not in seen:
+                        seen.add(other)
+                        next_frontier.append(other)
+            ordered = ordered + next_frontier
+            frontier = next_frontier
+
+        return ordered
+
+    def get_structure_summary(self) -> str:
+        """Return a summary of the project structure (to pass to Gemini)."""
+        lines = [f"# Project: {self.project_path.name}", ""]
+        dirs: dict[str, list[str]] = {}
+        for rel_path in sorted(self.files.keys()):
+            # as_posix() keeps "/" separators on Windows, so the key
+            # rebuilt below matches the "/"-separated keys of self.files
+            d = Path(rel_path).parent.as_posix()
+            if d == ".":
+                d = "(root)"
+            dirs.setdefault(d, []).append(Path(rel_path).name)
+
+        for d, files in sorted(dirs.items()):
+            lines.append(f"## {d}/")
+            for f in files:
+                info = self.files.get(f"{d}/{f}" if d != "(root)" else f)
+                if info:
+                    funcs = ", ".join(info.functions[:5])
+                    extra = f" — {funcs}" if funcs else ""
+                    lines.append(f" - {f} ({info.line_count}L){extra}")
+                else:
+                    lines.append(f" - {f}")
+            lines.append("")
+
+        return "\n".join(lines)
+
+    def save_cache(self, cache_path: str):
+        """Write the index (files and import graph) to cache_path as JSON."""
+        data = {
+            "project_path": str(self.project_path),
+            "files": {k: asdict(v) for k, v in self.files.items()},
+            "import_graph": self.import_graph,
+        }
+        Path(cache_path).write_text(json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8")
+
+    def load_cache(self, cache_path: str) -> bool:
+        """Load the index from cache_path; return False if it does not exist."""
+        p = Path(cache_path)
+        if not p.exists():
+            return False
+        data = json.loads(p.read_text(encoding="utf-8"))
+        self.files = {k: FileInfo(**v) for k, v in data["files"].items()}
+        self.import_graph = data["import_graph"]
+        return True
diff --git a/tests/test_core.py b/tests/test_core.py
new file mode 100644
index 0000000..78628b0
--- /dev/null
+++ b/tests/test_core.py
@@ -0,0 +1,89 @@
+"""Integration test: Project Indexer + Context Manager.
+
+Tests against the variet-agent project itself.
+"""
+
+import sys
+import io
+from pathlib import Path
+
+# Repository root (parent of tests/), instead of a machine-specific path
+PROJECT = str(Path(__file__).resolve().parent.parent)
+
+if sys.stdout.encoding != "utf-8":
+    sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8", errors="replace")
+sys.path.insert(0, PROJECT)
+
+from core.project_indexer import ProjectIndex
+from core.context_manager import ContextManager
+
+
+def test_indexer():
+    print("=" * 60)
+    print("TEST 1: Project Indexer Scan")
+    print("=" * 60)
+
+    idx = ProjectIndex(PROJECT)
+    idx.scan()
+
+    print(f" Files found: {len(idx.files)}")
+    for fpath, info in sorted(idx.files.items()):
+        funcs = ", ".join(info.functions[:3]) if info.functions else ""
+        print(f" {fpath} ({info.line_count}L) [{info.language}] {funcs}")
+
+    print(f"\n Import graph entries: {len(idx.import_graph)}")
+    for fpath, deps in idx.import_graph.items():
+        if deps:
+            print(f" {fpath} → {deps}")
+
+    return idx
+
+
+def test_find_relevant(idx):
+    print("\n" + "=" * 60)
+    print("TEST 2: Find Relevant Files")
+    print("=" * 60)
+
+    queries = [
+        "context_manager",
+        "gemini caller",
+        "project indexer scan",
+        "vikunja helper",
+    ]
+
+    for q in queries:
+        results = idx.find_relevant(q)
+        print(f" Query: '{q}' → {results[:5]}")
+
+
+def test_context_manager(idx):
+    print("\n" + "=" * 60)
+    print("TEST 3: Context Manager Gather")
+    print("=" * 60)
+
+    cm = ContextManager(idx, token_budget=10_000)
+    context = cm.gather("context_manager gather files")
+
+    print(f" Context length: {len(context)} chars")
+    print(f" First 500 chars:")
+    print(context[:500])
+    print("...")
+    print(f" Last 200 chars:")
+    print(context[-200:])
+
+
+def test_structure_summary(idx):
+    print("\n" + "=" * 60)
+    print("TEST 4: Structure Summary")
+    print("=" * 60)
+
+    summary = idx.get_structure_summary()
+    print(summary)
+
+
+if __name__ == "__main__":
+    idx = test_indexer()
+    test_find_relevant(idx)
+    test_context_manager(idx)
+    test_structure_summary(idx)
+    print("\n✅ All tests passed!")