From 8bd7dcab3f8c8b1112455cc2eb95a9c4f5804312 Mon Sep 17 00:00:00 2001 From: CD Date: Fri, 6 Mar 2026 21:38:08 +0900 Subject: [PATCH] =?UTF-8?q?feat:=20=EC=B6=9C=EB=A0=A5=20=ED=8C=8C=EC=8B=B1?= =?UTF-8?q?=204=ED=8C=A8=ED=84=B4=20=EC=A7=80=EC=9B=90=20+=20=EC=8B=A4?= =?UTF-8?q?=ED=8C=A8=20=EA=B8=B0=EB=A1=9D=20+=20Gemini=20CLI=20Windows=20?= =?UTF-8?q?=ED=98=B8=ED=99=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - file_applier.py: === FILE ===, `lang:path, // file: comment, **header**+code 4패턴 경로 검증, 중복 제거, 빈 내용 스킵, 소스 추적 - task_pipeline.py: try/except/finally로 성공/실패 모두 docs 기록 파싱 실패 추적, 에러를 총평 warnings에 전파 - gemini_caller.py: Windows에서 cmd /c gemini 사용 (PS ExecutionPolicy 우회) - Gemini CLI stdin 파이프 동작 검증 완료 --- core/file_applier.py | 199 ++++++++++++++++++++++++++++++++++++------ core/gemini_caller.py | 9 +- core/task_pipeline.py | 128 ++++++++++++++++++--------- 3 files changed, 266 insertions(+), 70 deletions(-) diff --git a/core/file_applier.py b/core/file_applier.py index 8626ade..d3f36be 100644 --- a/core/file_applier.py +++ b/core/file_applier.py @@ -1,7 +1,12 @@ """File Applier — Coder 출력을 실제 파일에 적용. -Coder가 출력한 `=== FILE: path === ... === END FILE ===` 블록을 -파싱하여 프로젝트 파일에 실제로 쓰기. +Gemini가 출력할 수 있는 다양한 형식을 모두 지원: +1. === FILE: path === ... === END FILE === +2. ```lang:path/to/file.py ... ``` +3. ```lang\n// file: path/to/file.py ... ``` +4. **path/to/file.py** + 코드블록 +5. `path/to/file.py`: + 코드블록 +6. 순수 마크다운 코드블록 (파일명 헤더 포함) """ import re @@ -11,6 +16,14 @@ from dataclasses import dataclass logger = logging.getLogger("variet.applier") +# 소스 파일 확장자 (파일명 판별용) +SOURCE_EXTENSIONS = { + ".py", ".js", ".ts", ".jsx", ".tsx", ".java", ".cs", ".cpp", ".c", ".h", + ".go", ".rs", ".rb", ".php", ".html", ".css", ".scss", ".yaml", ".yml", + ".json", ".toml", ".md", ".sql", ".sh", ".ps1", ".bat", ".xml", ".vue", + ".svelte", ".astro", +} + @dataclass class FileChange: @@ -18,44 +31,160 @@ class FileChange: path: str # 상대 경로 content: str # 전체 파일 내용 is_new: bool # 신규 파일 여부 + source: str = "" # 어떤 패턴으로 감지했는지 + + +def _looks_like_filepath(text: str) -> bool: + """텍스트가 파일 경로처럼 보이는지.""" + text = text.strip().strip("`").strip("*").strip('"').strip("'") + if not text or len(text) > 200: + return False + # 확장자가 있는지 + if Path(text).suffix.lower() in SOURCE_EXTENSIONS: + return True + # 경로 구분자가 있는지 + if "/" in text or "\\" in text: + return Path(text).suffix != "" + return False + + +def _clean_path(raw: str) -> str: + """경로에서 불필요한 장식 제거.""" + path = raw.strip() + # 마크다운 장식 제거 + path = path.strip("`").strip("*").strip('"').strip("'") + # 앞뒤 공백/콜론 제거 + path = path.strip().rstrip(":").strip() + # 윈도우 → 유닉스 + path = path.replace("\\", "/") + # 선행 ./ 제거 + if path.startswith("./"): + path = path[2:] + return path def parse_code_output(raw: str) -> list[FileChange]: """Coder 출력에서 파일 블록을 추출. - 지원 형식: - === FILE: path/to/file.py === - (content) - === END FILE === - - 또는 마크다운 방식: - ```python:path/to/file.py - (content) - ``` + 여러 패턴을 순서대로 시도하여 가장 많이 매칭되는 것을 사용. """ - changes: list[FileChange] = [] + results = [] # 패턴 1: === FILE: path === ... === END FILE === - pattern1 = re.compile( + p1 = _parse_file_markers(raw) + if p1: + results.extend(p1) + + # 패턴 2: ```lang:path/to/file.py ... ``` + p2 = _parse_lang_colon_path(raw) + if p2: + results.extend(p2) + + # 패턴 3: // file: path 또는 # file: path (코드블록 내부 주석) + p3 = _parse_comment_filepath(raw) + if p3: + results.extend(p3) + + # 패턴 4: **path/to/file.py** 또는 `path/to/file.py` 뒤에 코드블록 + p4 = _parse_header_then_codeblock(raw) + if p4: + results.extend(p4) + + # 중복 제거 (같은 경로의 파일은 마지막 것 사용) + seen = {} + for fc in results: + seen[fc.path] = fc + return list(seen.values()) + + +def _parse_file_markers(raw: str) -> list[FileChange]: + """패턴 1: === FILE: path === ... === END FILE ===""" + pattern = re.compile( r'===\s*FILE:\s*(.+?)\s*===\s*\n(.*?)\n\s*===\s*END\s*FILE\s*===', re.DOTALL, ) - for match in pattern1.finditer(raw): - path = match.group(1).strip() + changes = [] + for match in pattern.finditer(raw): + path = _clean_path(match.group(1)) content = match.group(2) - changes.append(FileChange(path=path, content=content, is_new=False)) + if path: + changes.append(FileChange(path=path, content=content, is_new=False, source="file_marker")) + return changes - # 패턴 2: ```lang:path/to/file.py\n...\n``` - if not changes: - pattern2 = re.compile( - r'```\w*:(.+?)\n(.*?)\n```', - re.DOTALL, + +def _parse_lang_colon_path(raw: str) -> list[FileChange]: + """패턴 2: ```lang:path/to/file.py ... ```""" + pattern = re.compile( + r'```\w*:(.+?)\n(.*?)\n```', + re.DOTALL, + ) + changes = [] + for match in pattern.finditer(raw): + path = _clean_path(match.group(1)) + content = match.group(2) + if _looks_like_filepath(path): + changes.append(FileChange(path=path, content=content, is_new=False, source="lang_colon")) + return changes + + +def _parse_comment_filepath(raw: str) -> list[FileChange]: + """패턴 3: 코드블록 내 첫 줄이 // file: path 또는 # file: path""" + pattern = re.compile( + r'```(\w*)\n(.*?)\n```', + re.DOTALL, + ) + changes = [] + for match in pattern.finditer(raw): + content = match.group(2) + lines = content.split("\n", 1) + if not lines: + continue + + first_line = lines[0].strip() + # // file: path, # file: path, /* file: path */ + file_match = re.match( + r'(?://|#|/\*)\s*[Ff]ile:\s*(.+?)(?:\s*\*/)?$', + first_line, ) - for match in pattern2.finditer(raw): - path = match.group(1).strip() - content = match.group(2) - changes.append(FileChange(path=path, content=content, is_new=False)) + if file_match: + path = _clean_path(file_match.group(1)) + actual_content = lines[1] if len(lines) > 1 else "" + if _looks_like_filepath(path): + changes.append(FileChange( + path=path, content=actual_content, is_new=False, source="comment_filepath" + )) + return changes + +def _parse_header_then_codeblock(raw: str) -> list[FileChange]: + """패턴 4: **path** 또는 `path` 일 줄 + 바로 다음 코드블록. + + 예: + **api/server.py** + ```python + content... + ``` + + 또는: + `core/utils.py`: + ```python + content... + ``` + """ + # 파일 경로 헤더 + 코드블록 패턴 + pattern = re.compile( + r'(?:\*\*([^*\n]+?)\*\*|`([^`\n]+?)`)\s*:?\s*\n+' + r'```\w*\n(.*?)\n```', + re.DOTALL, + ) + changes = [] + for match in pattern.finditer(raw): + path = _clean_path(match.group(1) or match.group(2)) + content = match.group(3) + if _looks_like_filepath(path): + changes.append(FileChange( + path=path, content=content, is_new=False, source="header_codeblock" + )) return changes @@ -72,7 +201,7 @@ def apply_changes( dry_run: True면 실제 파일 쓰기 없이 결과만 반환 Returns: - 적용 결과 리스트 [{"path": ..., "action": "created|modified|skipped", "lines": N}] + 적용 결과 리스트 [{"path": ..., "action": ..., "lines": N, "source": ...}] """ root = Path(project_path).resolve() results = [] @@ -81,11 +210,23 @@ def apply_changes( # 경로 정규화 + 보안: 프로젝트 밖 경로 차단 target = (root / change.path).resolve() if not str(target).startswith(str(root)): - logger.warning(f"경로 보안 위반 — 스킵: {change.path}") + logger.warning(f"경로 보안 위반 - 스킵: {change.path}") results.append({ "path": change.path, "action": "skipped", "reason": "프로젝트 외부 경로", + "source": change.source, + }) + continue + + # 빈 내용 스킵 + if not change.content.strip(): + logger.warning(f"빈 내용 - 스킵: {change.path}") + results.append({ + "path": change.path, + "action": "skipped", + "reason": "내용 없음", + "source": change.source, }) continue @@ -97,6 +238,7 @@ def apply_changes( "path": change.path, "action": "would_create" if is_new else "would_modify", "lines": line_count, + "source": change.source, }) continue @@ -106,12 +248,13 @@ def apply_changes( # 파일 쓰기 target.write_text(change.content, encoding="utf-8") action = "created" if is_new else "modified" - logger.info(f"파일 {action}: {change.path} ({line_count}L)") + logger.info(f"파일 {action}: {change.path} ({line_count}L, via {change.source})") results.append({ "path": change.path, "action": action, "lines": line_count, + "source": change.source, }) return results diff --git a/core/gemini_caller.py b/core/gemini_caller.py index a0ca885..1edbb95 100644 --- a/core/gemini_caller.py +++ b/core/gemini_caller.py @@ -56,8 +56,15 @@ class GeminiCaller: ) try: + # Windows: cmd /c gemini (PS ExecutionPolicy가 .ps1을 차단하므로) + import sys + if sys.platform == "win32": + cmd = ["cmd", "/c", "gemini", "--approval-mode", "yolo"] + else: + cmd = ["gemini", "--approval-mode", "yolo"] + proc = await asyncio.create_subprocess_exec( - "gemini", "--approval-mode", "yolo", + *cmd, stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, diff --git a/core/task_pipeline.py b/core/task_pipeline.py index 4bed97e..59db1dd 100644 --- a/core/task_pipeline.py +++ b/core/task_pipeline.py @@ -172,7 +172,10 @@ class TaskPipeline: # ────────────────────────────────────────── async def execute(self, user_request: str) -> dict: - """Plan → Code(병렬) → 파일 적용 → Review → 총평 → 기록.""" + """Plan -> Code(병렬) -> 파일 적용 -> Review -> 총평 -> 기록. + + 성공/실패 모두 docs에 기록됩니다. + """ result = { "request": user_request, "plan": None, @@ -180,53 +183,96 @@ class TaskPipeline: "review": None, "applied_files": [], "summary": None, + "errors": [], } - # 1. Plan - plan = await self.plan(user_request) - result["plan"] = plan + try: + # 1. Plan + plan = await self.plan(user_request) + result["plan"] = plan - tasks = plan.get("tasks", []) - if not tasks: + tasks = plan.get("tasks", []) + if not tasks: + result["summary"] = { + "title": "태스크 없음", + "summary": "Planner가 태스크를 생성하지 못했습니다.", + "changes": [], + "warnings": ["요청을 더 구체적으로 해주세요."], + "next_steps": [], + } + self.docs.record_session(user_request, result["summary"], plan) + return result + + # 2. Code 병렬 실행 + code_outputs = await self.code_parallel(tasks) + result["code_outputs"] = [o[:500] for o in code_outputs] + + # 에러 추적 + error_count = sum(1 for o in code_outputs if o.startswith("[ERROR]")) + if error_count > 0: + result["errors"].append(f"코딩 실패: {error_count}/{len(tasks)}개 태스크") + + # 3. 파일 적용 + all_applied = [] + parse_failures = 0 + for i, output in enumerate(code_outputs): + if output.startswith("[ERROR]"): + continue + changes = parse_code_output(output) + if changes: + applied = apply_changes(changes, self.project_path) + all_applied.extend(applied) + else: + parse_failures += 1 + result["errors"].append( + f"Task {i+1} 출력에서 파일을 추출하지 못함 " + f"(출력 {len(output)}자)" + ) + result["applied_files"] = all_applied + + if parse_failures > 0: + self._log( + "parse_warning", + f"{parse_failures}/{len(tasks)} 파싱 실패", + f"적용 성공: {len(all_applied)}개 파일", + ) + + # 4. Batch Review + review = await self.batch_review(tasks, code_outputs) + result["review"] = review + + # 5. 총평 + summary = await self.summarize( + user_request, plan, code_outputs, review, all_applied + ) + # 에러 정보를 총평에 추가 + if result["errors"]: + existing_warnings = summary.get("warnings", []) + summary["warnings"] = existing_warnings + result["errors"] + result["summary"] = summary + + except Exception as e: + # 파이프라인 자체 실패 + result["errors"].append(f"파이프라인 오류: {str(e)}") result["summary"] = { - "title": "태스크 없음", - "summary": "Planner가 태스크를 생성하지 못했습니다.", + "title": "작업 실패", + "summary": f"파이프라인 실행 중 오류 발생: {str(e)}", "changes": [], - "warnings": ["요청을 더 구체적으로 해주세요."], - "next_steps": [], + "warnings": result["errors"], + "next_steps": ["오류 내용 확인 후 다시 시도"], } - return result + self._log("pipeline_error", user_request, str(e)) - # 2. Code 병렬 실행 - code_outputs = await self.code_parallel(tasks) - result["code_outputs"] = [o[:500] for o in code_outputs] - - # 3. 파일 적용 - all_applied = [] - for output in code_outputs: - if output.startswith("[ERROR]"): - continue - changes = parse_code_output(output) - if changes: - applied = apply_changes(changes, self.project_path) - all_applied.extend(applied) - result["applied_files"] = all_applied - - # 4. Batch Review - review = await self.batch_review(tasks, code_outputs) - result["review"] = review - - # 5. 총평 - summary = await self.summarize( - user_request, plan, code_outputs, review, all_applied - ) - result["summary"] = summary - - # 6. 기록 - self.docs.record_session(user_request, summary, plan) - self.docs.append_changelog( - summary.get("title", user_request[:50]) - ) + finally: + # 성공/실패 모두 기록 + self.docs.record_session( + user_request, + result.get("summary", {"summary": "기록 없음"}), + result.get("plan"), + ) + self.docs.append_changelog( + result.get("summary", {}).get("title", user_request[:50]) + ) return result