235 lines
8.4 KiB
Python
235 lines
8.4 KiB
Python
"""Project Indexer — 프로젝트 구조 분석/캐시.
|
|
|
|
파일 구조, import 관계, 함수 시그니처를 파악하여
|
|
Context Manager가 관련 파일을 선별할 수 있게 합니다.
|
|
"""
|
|
|
|
import os
|
|
import re
|
|
import json
|
|
from pathlib import Path
|
|
from dataclasses import dataclass, field, asdict
|
|
|
|
|
|
@dataclass
|
|
class FileInfo:
|
|
path: str
|
|
language: str
|
|
size: int
|
|
line_count: int
|
|
imports: list[str] = field(default_factory=list)
|
|
functions: list[str] = field(default_factory=list)
|
|
classes: list[str] = field(default_factory=list)
|
|
|
|
|
|
LANGUAGE_MAP = {
|
|
".py": "python", ".js": "javascript", ".ts": "typescript",
|
|
".java": "java", ".cs": "csharp", ".cpp": "cpp", ".c": "c",
|
|
".go": "go", ".rs": "rust", ".rb": "ruby", ".php": "php",
|
|
".html": "html", ".css": "css", ".md": "markdown",
|
|
".yaml": "yaml", ".yml": "yaml", ".json": "json",
|
|
".sql": "sql", ".sh": "bash", ".ps1": "powershell",
|
|
}
|
|
|
|
IGNORE_DIRS = {
|
|
".git", ".agent", "node_modules", "__pycache__", ".venv",
|
|
"venv", ".tox", ".mypy_cache", ".pytest_cache", "dist",
|
|
"build", ".next", ".nuxt", "sessions",
|
|
}
|
|
|
|
IGNORE_EXTENSIONS = {
|
|
".pyc", ".pyo", ".exe", ".dll", ".so", ".o",
|
|
".jpg", ".png", ".gif", ".ico", ".svg",
|
|
".zip", ".tar", ".gz", ".rar",
|
|
".db", ".sqlite", ".lock",
|
|
}
|
|
|
|
|
|
class ProjectIndex:
|
|
"""프로젝트 구조를 분석하고 캐시합니다."""
|
|
|
|
def __init__(self, project_path: str, max_file_size: int = 100_000):
|
|
self.project_path = Path(project_path).resolve()
|
|
self.max_file_size = max_file_size
|
|
self.files: dict[str, FileInfo] = {}
|
|
self.import_graph: dict[str, list[str]] = {} # file → [imported files]
|
|
|
|
def scan(self) -> "ProjectIndex":
|
|
"""프로젝트 전체 스캔."""
|
|
self.files.clear()
|
|
self.import_graph.clear()
|
|
|
|
for root, dirs, files in os.walk(self.project_path):
|
|
# 무시 디렉토리 필터
|
|
dirs[:] = [d for d in dirs if d not in IGNORE_DIRS]
|
|
|
|
for fname in files:
|
|
fpath = Path(root) / fname
|
|
ext = fpath.suffix.lower()
|
|
|
|
if ext in IGNORE_EXTENSIONS:
|
|
continue
|
|
if fpath.stat().st_size > self.max_file_size:
|
|
continue
|
|
|
|
rel_path = str(fpath.relative_to(self.project_path)).replace("\\", "/")
|
|
lang = LANGUAGE_MAP.get(ext, "")
|
|
|
|
try:
|
|
content = fpath.read_text(encoding="utf-8", errors="ignore")
|
|
lines = content.splitlines()
|
|
except Exception:
|
|
continue
|
|
|
|
info = FileInfo(
|
|
path=rel_path,
|
|
language=lang,
|
|
size=fpath.stat().st_size,
|
|
line_count=len(lines),
|
|
)
|
|
|
|
# 언어별 파싱
|
|
if lang == "python":
|
|
info.imports = self._parse_python_imports(content)
|
|
info.functions = self._parse_python_functions(content)
|
|
info.classes = self._parse_python_classes(content)
|
|
|
|
self.files[rel_path] = info
|
|
|
|
# import 그래프 구축
|
|
self._build_import_graph()
|
|
return self
|
|
|
|
def _parse_python_imports(self, content: str) -> list[str]:
|
|
imports = []
|
|
for line in content.splitlines():
|
|
line = line.strip()
|
|
if line.startswith("import "):
|
|
imports.append(line.split()[1].split(".")[0])
|
|
elif line.startswith("from "):
|
|
match = re.match(r"from\s+([\w.]+)\s+import", line)
|
|
if match:
|
|
imports.append(match.group(1))
|
|
return imports
|
|
|
|
def _parse_python_functions(self, content: str) -> list[str]:
|
|
return re.findall(r"^(?:async\s+)?def\s+(\w+)\s*\(", content, re.MULTILINE)
|
|
|
|
def _parse_python_classes(self, content: str) -> list[str]:
|
|
return re.findall(r"^class\s+(\w+)\s*[:\(]", content, re.MULTILINE)
|
|
|
|
def _build_import_graph(self):
|
|
"""import 문에서 프로젝트 내 파일을 찾아 그래프 구축."""
|
|
file_modules = {}
|
|
for rel_path in self.files:
|
|
if rel_path.endswith(".py"):
|
|
module = rel_path.replace("/", ".").removesuffix(".py")
|
|
file_modules[module] = rel_path
|
|
# 마지막 부분만으로도 매칭
|
|
parts = module.split(".")
|
|
if len(parts) > 1:
|
|
file_modules[parts[-1]] = rel_path
|
|
|
|
for rel_path, info in self.files.items():
|
|
deps = []
|
|
for imp in info.imports:
|
|
if imp in file_modules:
|
|
deps.append(file_modules[imp])
|
|
# 점 표기법 부분 매칭
|
|
for mod, fpath in file_modules.items():
|
|
if imp.startswith(mod) or mod.startswith(imp):
|
|
if fpath not in deps and fpath != rel_path:
|
|
deps.append(fpath)
|
|
self.import_graph[rel_path] = deps
|
|
|
|
def find_relevant(self, query: str) -> list[str]:
|
|
"""쿼리에서 언급된 파일/함수/클래스 기반으로 관련 파일 검색."""
|
|
query_lower = query.lower()
|
|
scored = []
|
|
|
|
for rel_path, info in self.files.items():
|
|
score = 0
|
|
# 파일명 매칭
|
|
basename = Path(rel_path).stem.lower()
|
|
if basename in query_lower:
|
|
score += 10
|
|
# 경로 부분 매칭
|
|
for part in rel_path.lower().split("/"):
|
|
if part.rstrip(".py") in query_lower:
|
|
score += 5
|
|
# 함수/클래스 매칭
|
|
for func in info.functions:
|
|
if func.lower() in query_lower:
|
|
score += 8
|
|
for cls in info.classes:
|
|
if cls.lower() in query_lower:
|
|
score += 8
|
|
if score > 0:
|
|
scored.append((rel_path, score))
|
|
|
|
scored.sort(key=lambda x: -x[1])
|
|
return [path for path, _ in scored]
|
|
|
|
def expand_dependencies(self, files: list[str], depth: int = 2) -> list[str]:
|
|
"""import 관계로 관련 파일 확장."""
|
|
result = set(files)
|
|
frontier = set(files)
|
|
|
|
for _ in range(depth):
|
|
next_frontier = set()
|
|
for f in frontier:
|
|
deps = self.import_graph.get(f, [])
|
|
for dep in deps:
|
|
if dep not in result:
|
|
result.add(dep)
|
|
next_frontier.add(dep)
|
|
# 역방향: 이 파일을 import하는 파일
|
|
for other, other_deps in self.import_graph.items():
|
|
if f in other_deps and other not in result:
|
|
result.add(other)
|
|
next_frontier.add(other)
|
|
frontier = next_frontier
|
|
|
|
return list(result)
|
|
|
|
def get_structure_summary(self) -> str:
|
|
"""프로젝트 구조 요약 (Gemini에 전달용)."""
|
|
lines = [f"# Project: {self.project_path.name}", ""]
|
|
dirs: dict[str, list[str]] = {}
|
|
for rel_path in sorted(self.files.keys()):
|
|
d = str(Path(rel_path).parent)
|
|
if d == ".":
|
|
d = "(root)"
|
|
dirs.setdefault(d, []).append(Path(rel_path).name)
|
|
|
|
for d, files in sorted(dirs.items()):
|
|
lines.append(f"## {d}/")
|
|
for f in files:
|
|
info = self.files.get(f"{d}/{f}" if d != "(root)" else f)
|
|
if info:
|
|
funcs = ", ".join(info.functions[:5])
|
|
extra = f" — {funcs}" if funcs else ""
|
|
lines.append(f" - {f} ({info.line_count}L){extra}")
|
|
else:
|
|
lines.append(f" - {f}")
|
|
lines.append("")
|
|
|
|
return "\n".join(lines)
|
|
|
|
def save_cache(self, cache_path: str):
|
|
data = {
|
|
"project_path": str(self.project_path),
|
|
"files": {k: asdict(v) for k, v in self.files.items()},
|
|
"import_graph": self.import_graph,
|
|
}
|
|
Path(cache_path).write_text(json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8")
|
|
|
|
def load_cache(self, cache_path: str) -> bool:
|
|
p = Path(cache_path)
|
|
if not p.exists():
|
|
return False
|
|
data = json.loads(p.read_text(encoding="utf-8"))
|
|
self.files = {k: FileInfo(**v) for k, v in data["files"].items()}
|
|
self.import_graph = data["import_graph"]
|
|
return True
|