feat: Project Indexer + Context Manager + GeminiCaller 구현 및 테스트 #task-187 #task-188 #task-189
This commit is contained in:
1
core/__init__.py
Normal file
1
core/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
# Core package
|
||||
89
core/context_manager.py
Normal file
89
core/context_manager.py
Normal file
@@ -0,0 +1,89 @@
|
||||
"""Context Manager — 관련 파일 선별 + 토큰 예산 제어.
|
||||
|
||||
Gemini CLI Context Rot 해결의 핵심.
|
||||
태스크에 필요한 파일만 골라 토큰 예산 내에서 컨텍스트를 구성.
|
||||
"""
|
||||
|
||||
from pathlib import Path
|
||||
from core.project_indexer import ProjectIndex
|
||||
|
||||
|
||||
# 대략적 토큰 추정: 1 토큰 ≈ 4 bytes (영문), 2 bytes (한글)
|
||||
BYTES_PER_TOKEN = 3
|
||||
|
||||
|
||||
class ContextManager:
|
||||
"""태스크별 컨텍스트를 생성합니다."""
|
||||
|
||||
def __init__(self, index: ProjectIndex, token_budget: int = 50_000):
|
||||
self.index = index
|
||||
self.token_budget = token_budget
|
||||
|
||||
def gather(self, task: str, max_files: int = 15) -> str:
|
||||
"""태스크에 필요한 파일만 선별하여 컨텍스트 생성."""
|
||||
# 1. 태스크에서 관련 파일 찾기
|
||||
relevant = self.index.find_relevant(task)
|
||||
|
||||
# 2. import 관계로 확장
|
||||
if relevant:
|
||||
expanded = self.index.expand_dependencies(relevant[:5], depth=1)
|
||||
else:
|
||||
expanded = []
|
||||
|
||||
# 3. 관련 파일을 우선순위 순으로 정렬 (원래 순서 유지)
|
||||
ordered = []
|
||||
seen = set()
|
||||
for f in relevant + expanded:
|
||||
if f not in seen:
|
||||
ordered.append(f)
|
||||
seen.add(f)
|
||||
|
||||
# 4. 토큰 예산 내에서 파일 포함
|
||||
context_parts = []
|
||||
total_tokens = 0
|
||||
files_included = 0
|
||||
|
||||
# 프로젝트 구조 요약 항상 포함
|
||||
structure = self.index.get_structure_summary()
|
||||
structure_tokens = len(structure.encode("utf-8")) // BYTES_PER_TOKEN
|
||||
context_parts.append(f"=== PROJECT STRUCTURE ===\n{structure}")
|
||||
total_tokens += structure_tokens
|
||||
|
||||
for fpath in ordered[:max_files]:
|
||||
info = self.index.files.get(fpath)
|
||||
if not info:
|
||||
continue
|
||||
|
||||
file_tokens = info.size // BYTES_PER_TOKEN
|
||||
if total_tokens + file_tokens > self.token_budget:
|
||||
context_parts.append(
|
||||
f"\n=== SKIPPED: {fpath} ({info.line_count}L, budget exceeded) ==="
|
||||
)
|
||||
continue
|
||||
|
||||
try:
|
||||
abs_path = self.index.project_path / fpath
|
||||
content = abs_path.read_text(encoding="utf-8", errors="ignore")
|
||||
except Exception:
|
||||
continue
|
||||
|
||||
context_parts.append(
|
||||
f"\n=== FILE: {fpath} ({info.line_count}L) ===\n{content}"
|
||||
)
|
||||
total_tokens += file_tokens
|
||||
files_included += 1
|
||||
|
||||
context_parts.append(
|
||||
f"\n=== CONTEXT SUMMARY: {files_included} files, ~{total_tokens} tokens ==="
|
||||
)
|
||||
|
||||
return "\n".join(context_parts)
|
||||
|
||||
def gather_for_review(self, original: str, modified: str, task: str) -> str:
|
||||
"""리뷰용 컨텍스트: 원본 + 수정본 + 관련 타입."""
|
||||
parts = [
|
||||
f"=== TASK: {task} ===",
|
||||
f"\n=== ORIGINAL ===\n{original}",
|
||||
f"\n=== MODIFIED ===\n{modified}",
|
||||
]
|
||||
return "\n".join(parts)
|
||||
87
core/gemini_caller.py
Normal file
87
core/gemini_caller.py
Normal file
@@ -0,0 +1,87 @@
|
||||
"""GeminiCaller — gemini -p 역할별 headless 호출.
|
||||
|
||||
cmd /c 래핑으로 PowerShell 실행 정책 우회.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
import time
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
ROLE_PROMPTS_DIR = Path(__file__).parent.parent / "prompts"
|
||||
|
||||
|
||||
class GeminiCaller:
|
||||
"""Gemini CLI headless 호출을 관리합니다."""
|
||||
|
||||
def __init__(self, project_path: str = None):
|
||||
self.project_path = project_path
|
||||
self.call_count = 0
|
||||
self.last_call_time = 0.0
|
||||
|
||||
async def call(self, role: str, context: str, timeout: int = 120) -> str:
|
||||
"""역할별 프롬프트로 gemini -p 호출.
|
||||
|
||||
Args:
|
||||
role: 프롬프트 파일명 (planner, coder, reviewer, tester)
|
||||
context: 전달할 컨텍스트
|
||||
timeout: 최대 대기 시간 (초)
|
||||
"""
|
||||
# 시스템 프롬프트 로드
|
||||
prompt_file = ROLE_PROMPTS_DIR / f"{role}.md"
|
||||
if prompt_file.exists():
|
||||
system_prompt = prompt_file.read_text(encoding="utf-8")
|
||||
else:
|
||||
system_prompt = f"You are a {role}. Respond in Korean."
|
||||
|
||||
# cmd 구성
|
||||
cmd_parts = ["gemini", "-p", context]
|
||||
|
||||
if system_prompt:
|
||||
cmd_parts.extend(["--system", system_prompt])
|
||||
|
||||
cmd_parts.extend(["--approval-mode", "yolo"])
|
||||
|
||||
if self.project_path:
|
||||
cmd_parts.extend(["--include-directories", self.project_path])
|
||||
|
||||
# cmd /c 래핑 (PowerShell 실행 정책 우회)
|
||||
escaped_context = context.replace('"', '\\"')
|
||||
cmd_str = f'gemini -p "{escaped_context}" --approval-mode yolo'
|
||||
if self.project_path:
|
||||
cmd_str += f' --include-directories "{self.project_path}"'
|
||||
|
||||
try:
|
||||
proc = await asyncio.create_subprocess_shell(
|
||||
f'cmd /c {cmd_str}',
|
||||
stdout=asyncio.subprocess.PIPE,
|
||||
stderr=asyncio.subprocess.PIPE,
|
||||
)
|
||||
stdout, stderr = await asyncio.wait_for(
|
||||
proc.communicate(), timeout=timeout
|
||||
)
|
||||
|
||||
self.call_count += 1
|
||||
self.last_call_time = time.time()
|
||||
|
||||
output = stdout.decode("utf-8", errors="replace").strip()
|
||||
|
||||
# YOLO 모드 메시지 제거
|
||||
lines = output.splitlines()
|
||||
cleaned = []
|
||||
for line in lines:
|
||||
if "YOLO mode" in line or "Loaded cached" in line:
|
||||
continue
|
||||
cleaned.append(line)
|
||||
|
||||
return "\n".join(cleaned).strip()
|
||||
|
||||
except asyncio.TimeoutError:
|
||||
return f"[ERROR] Gemini CLI timeout after {timeout}s"
|
||||
except Exception as e:
|
||||
return f"[ERROR] Gemini CLI call failed: {e}"
|
||||
|
||||
async def call_simple(self, prompt: str, timeout: int = 60) -> str:
|
||||
"""시스템 프롬프트 없이 단순 호출."""
|
||||
return await self.call("default", prompt, timeout)
|
||||
234
core/project_indexer.py
Normal file
234
core/project_indexer.py
Normal file
@@ -0,0 +1,234 @@
|
||||
"""Project Indexer — 프로젝트 구조 분석/캐시.
|
||||
|
||||
파일 구조, import 관계, 함수 시그니처를 파악하여
|
||||
Context Manager가 관련 파일을 선별할 수 있게 합니다.
|
||||
"""
|
||||
|
||||
import os
|
||||
import re
|
||||
import json
|
||||
from pathlib import Path
|
||||
from dataclasses import dataclass, field, asdict
|
||||
|
||||
|
||||
@dataclass
|
||||
class FileInfo:
|
||||
path: str
|
||||
language: str
|
||||
size: int
|
||||
line_count: int
|
||||
imports: list[str] = field(default_factory=list)
|
||||
functions: list[str] = field(default_factory=list)
|
||||
classes: list[str] = field(default_factory=list)
|
||||
|
||||
|
||||
LANGUAGE_MAP = {
|
||||
".py": "python", ".js": "javascript", ".ts": "typescript",
|
||||
".java": "java", ".cs": "csharp", ".cpp": "cpp", ".c": "c",
|
||||
".go": "go", ".rs": "rust", ".rb": "ruby", ".php": "php",
|
||||
".html": "html", ".css": "css", ".md": "markdown",
|
||||
".yaml": "yaml", ".yml": "yaml", ".json": "json",
|
||||
".sql": "sql", ".sh": "bash", ".ps1": "powershell",
|
||||
}
|
||||
|
||||
IGNORE_DIRS = {
|
||||
".git", ".agent", "node_modules", "__pycache__", ".venv",
|
||||
"venv", ".tox", ".mypy_cache", ".pytest_cache", "dist",
|
||||
"build", ".next", ".nuxt", "sessions",
|
||||
}
|
||||
|
||||
IGNORE_EXTENSIONS = {
|
||||
".pyc", ".pyo", ".exe", ".dll", ".so", ".o",
|
||||
".jpg", ".png", ".gif", ".ico", ".svg",
|
||||
".zip", ".tar", ".gz", ".rar",
|
||||
".db", ".sqlite", ".lock",
|
||||
}
|
||||
|
||||
|
||||
class ProjectIndex:
|
||||
"""프로젝트 구조를 분석하고 캐시합니다."""
|
||||
|
||||
def __init__(self, project_path: str, max_file_size: int = 100_000):
|
||||
self.project_path = Path(project_path).resolve()
|
||||
self.max_file_size = max_file_size
|
||||
self.files: dict[str, FileInfo] = {}
|
||||
self.import_graph: dict[str, list[str]] = {} # file → [imported files]
|
||||
|
||||
def scan(self) -> "ProjectIndex":
|
||||
"""프로젝트 전체 스캔."""
|
||||
self.files.clear()
|
||||
self.import_graph.clear()
|
||||
|
||||
for root, dirs, files in os.walk(self.project_path):
|
||||
# 무시 디렉토리 필터
|
||||
dirs[:] = [d for d in dirs if d not in IGNORE_DIRS]
|
||||
|
||||
for fname in files:
|
||||
fpath = Path(root) / fname
|
||||
ext = fpath.suffix.lower()
|
||||
|
||||
if ext in IGNORE_EXTENSIONS:
|
||||
continue
|
||||
if fpath.stat().st_size > self.max_file_size:
|
||||
continue
|
||||
|
||||
rel_path = str(fpath.relative_to(self.project_path)).replace("\\", "/")
|
||||
lang = LANGUAGE_MAP.get(ext, "")
|
||||
|
||||
try:
|
||||
content = fpath.read_text(encoding="utf-8", errors="ignore")
|
||||
lines = content.splitlines()
|
||||
except Exception:
|
||||
continue
|
||||
|
||||
info = FileInfo(
|
||||
path=rel_path,
|
||||
language=lang,
|
||||
size=fpath.stat().st_size,
|
||||
line_count=len(lines),
|
||||
)
|
||||
|
||||
# 언어별 파싱
|
||||
if lang == "python":
|
||||
info.imports = self._parse_python_imports(content)
|
||||
info.functions = self._parse_python_functions(content)
|
||||
info.classes = self._parse_python_classes(content)
|
||||
|
||||
self.files[rel_path] = info
|
||||
|
||||
# import 그래프 구축
|
||||
self._build_import_graph()
|
||||
return self
|
||||
|
||||
def _parse_python_imports(self, content: str) -> list[str]:
|
||||
imports = []
|
||||
for line in content.splitlines():
|
||||
line = line.strip()
|
||||
if line.startswith("import "):
|
||||
imports.append(line.split()[1].split(".")[0])
|
||||
elif line.startswith("from "):
|
||||
match = re.match(r"from\s+([\w.]+)\s+import", line)
|
||||
if match:
|
||||
imports.append(match.group(1))
|
||||
return imports
|
||||
|
||||
def _parse_python_functions(self, content: str) -> list[str]:
|
||||
return re.findall(r"^(?:async\s+)?def\s+(\w+)\s*\(", content, re.MULTILINE)
|
||||
|
||||
def _parse_python_classes(self, content: str) -> list[str]:
|
||||
return re.findall(r"^class\s+(\w+)\s*[:\(]", content, re.MULTILINE)
|
||||
|
||||
def _build_import_graph(self):
|
||||
"""import 문에서 프로젝트 내 파일을 찾아 그래프 구축."""
|
||||
file_modules = {}
|
||||
for rel_path in self.files:
|
||||
if rel_path.endswith(".py"):
|
||||
module = rel_path.replace("/", ".").removesuffix(".py")
|
||||
file_modules[module] = rel_path
|
||||
# 마지막 부분만으로도 매칭
|
||||
parts = module.split(".")
|
||||
if len(parts) > 1:
|
||||
file_modules[parts[-1]] = rel_path
|
||||
|
||||
for rel_path, info in self.files.items():
|
||||
deps = []
|
||||
for imp in info.imports:
|
||||
if imp in file_modules:
|
||||
deps.append(file_modules[imp])
|
||||
# 점 표기법 부분 매칭
|
||||
for mod, fpath in file_modules.items():
|
||||
if imp.startswith(mod) or mod.startswith(imp):
|
||||
if fpath not in deps and fpath != rel_path:
|
||||
deps.append(fpath)
|
||||
self.import_graph[rel_path] = deps
|
||||
|
||||
def find_relevant(self, query: str) -> list[str]:
|
||||
"""쿼리에서 언급된 파일/함수/클래스 기반으로 관련 파일 검색."""
|
||||
query_lower = query.lower()
|
||||
scored = []
|
||||
|
||||
for rel_path, info in self.files.items():
|
||||
score = 0
|
||||
# 파일명 매칭
|
||||
basename = Path(rel_path).stem.lower()
|
||||
if basename in query_lower:
|
||||
score += 10
|
||||
# 경로 부분 매칭
|
||||
for part in rel_path.lower().split("/"):
|
||||
if part.rstrip(".py") in query_lower:
|
||||
score += 5
|
||||
# 함수/클래스 매칭
|
||||
for func in info.functions:
|
||||
if func.lower() in query_lower:
|
||||
score += 8
|
||||
for cls in info.classes:
|
||||
if cls.lower() in query_lower:
|
||||
score += 8
|
||||
if score > 0:
|
||||
scored.append((rel_path, score))
|
||||
|
||||
scored.sort(key=lambda x: -x[1])
|
||||
return [path for path, _ in scored]
|
||||
|
||||
def expand_dependencies(self, files: list[str], depth: int = 2) -> list[str]:
|
||||
"""import 관계로 관련 파일 확장."""
|
||||
result = set(files)
|
||||
frontier = set(files)
|
||||
|
||||
for _ in range(depth):
|
||||
next_frontier = set()
|
||||
for f in frontier:
|
||||
deps = self.import_graph.get(f, [])
|
||||
for dep in deps:
|
||||
if dep not in result:
|
||||
result.add(dep)
|
||||
next_frontier.add(dep)
|
||||
# 역방향: 이 파일을 import하는 파일
|
||||
for other, other_deps in self.import_graph.items():
|
||||
if f in other_deps and other not in result:
|
||||
result.add(other)
|
||||
next_frontier.add(other)
|
||||
frontier = next_frontier
|
||||
|
||||
return list(result)
|
||||
|
||||
def get_structure_summary(self) -> str:
|
||||
"""프로젝트 구조 요약 (Gemini에 전달용)."""
|
||||
lines = [f"# Project: {self.project_path.name}", ""]
|
||||
dirs: dict[str, list[str]] = {}
|
||||
for rel_path in sorted(self.files.keys()):
|
||||
d = str(Path(rel_path).parent)
|
||||
if d == ".":
|
||||
d = "(root)"
|
||||
dirs.setdefault(d, []).append(Path(rel_path).name)
|
||||
|
||||
for d, files in sorted(dirs.items()):
|
||||
lines.append(f"## {d}/")
|
||||
for f in files:
|
||||
info = self.files.get(f"{d}/{f}" if d != "(root)" else f)
|
||||
if info:
|
||||
funcs = ", ".join(info.functions[:5])
|
||||
extra = f" — {funcs}" if funcs else ""
|
||||
lines.append(f" - {f} ({info.line_count}L){extra}")
|
||||
else:
|
||||
lines.append(f" - {f}")
|
||||
lines.append("")
|
||||
|
||||
return "\n".join(lines)
|
||||
|
||||
def save_cache(self, cache_path: str):
|
||||
data = {
|
||||
"project_path": str(self.project_path),
|
||||
"files": {k: asdict(v) for k, v in self.files.items()},
|
||||
"import_graph": self.import_graph,
|
||||
}
|
||||
Path(cache_path).write_text(json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8")
|
||||
|
||||
def load_cache(self, cache_path: str) -> bool:
|
||||
p = Path(cache_path)
|
||||
if not p.exists():
|
||||
return False
|
||||
data = json.loads(p.read_text(encoding="utf-8"))
|
||||
self.files = {k: FileInfo(**v) for k, v in data["files"].items()}
|
||||
self.import_graph = data["import_graph"]
|
||||
return True
|
||||
86
tests/test_core.py
Normal file
86
tests/test_core.py
Normal file
@@ -0,0 +1,86 @@
|
||||
"""Integration test: Project Indexer + Context Manager.
|
||||
|
||||
Tests against the variet-agent project itself.
|
||||
"""
|
||||
|
||||
import sys
|
||||
import io
|
||||
if sys.stdout.encoding != "utf-8":
|
||||
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8", errors="replace")
|
||||
sys.path.insert(0, r"C:\Users\CafeVariet-GL552VW\Desktop\source_diff\variet-agent")
|
||||
|
||||
from core.project_indexer import ProjectIndex
|
||||
from core.context_manager import ContextManager
|
||||
|
||||
PROJECT = r"C:\Users\CafeVariet-GL552VW\Desktop\source_diff\variet-agent"
|
||||
|
||||
|
||||
def test_indexer():
|
||||
print("=" * 60)
|
||||
print("TEST 1: Project Indexer Scan")
|
||||
print("=" * 60)
|
||||
|
||||
idx = ProjectIndex(PROJECT)
|
||||
idx.scan()
|
||||
|
||||
print(f" Files found: {len(idx.files)}")
|
||||
for fpath, info in sorted(idx.files.items()):
|
||||
funcs = ", ".join(info.functions[:3]) if info.functions else ""
|
||||
print(f" {fpath} ({info.line_count}L) [{info.language}] {funcs}")
|
||||
|
||||
print(f"\n Import graph entries: {len(idx.import_graph)}")
|
||||
for fpath, deps in idx.import_graph.items():
|
||||
if deps:
|
||||
print(f" {fpath} → {deps}")
|
||||
|
||||
return idx
|
||||
|
||||
|
||||
def test_find_relevant(idx):
|
||||
print("\n" + "=" * 60)
|
||||
print("TEST 2: Find Relevant Files")
|
||||
print("=" * 60)
|
||||
|
||||
queries = [
|
||||
"context_manager",
|
||||
"gemini caller",
|
||||
"project indexer scan",
|
||||
"vikunja helper",
|
||||
]
|
||||
|
||||
for q in queries:
|
||||
results = idx.find_relevant(q)
|
||||
print(f" Query: '{q}' → {results[:5]}")
|
||||
|
||||
|
||||
def test_context_manager(idx):
|
||||
print("\n" + "=" * 60)
|
||||
print("TEST 3: Context Manager Gather")
|
||||
print("=" * 60)
|
||||
|
||||
cm = ContextManager(idx, token_budget=10_000)
|
||||
context = cm.gather("context_manager gather files")
|
||||
|
||||
print(f" Context length: {len(context)} chars")
|
||||
print(f" First 500 chars:")
|
||||
print(context[:500])
|
||||
print("...")
|
||||
print(f" Last 200 chars:")
|
||||
print(context[-200:])
|
||||
|
||||
|
||||
def test_structure_summary(idx):
|
||||
print("\n" + "=" * 60)
|
||||
print("TEST 4: Structure Summary")
|
||||
print("=" * 60)
|
||||
|
||||
summary = idx.get_structure_summary()
|
||||
print(summary)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
idx = test_indexer()
|
||||
test_find_relevant(idx)
|
||||
test_context_manager(idx)
|
||||
test_structure_summary(idx)
|
||||
print("\n✅ All tests passed!")
|
||||
Reference in New Issue
Block a user