feat(integration): Gitea + Vikunja + CI 클라이언트 구현 #task-192 #task-193

This commit is contained in:
quantlab
2026-03-06 20:01:00 +09:00
parent 26cef9bb11
commit ee01a13129
6 changed files with 459 additions and 0 deletions

1
integrations/__init__.py Normal file
View File

@@ -0,0 +1 @@
"""Integrations package — Gitea, Vikunja, CI."""

View File

@@ -0,0 +1,68 @@
"""CI Monitor — Gitea commit status 기반 CI 결과 대기.
Woodpecker CI의 결과를 Gitea commit status API로 폴링합니다.
"""
import asyncio
import logging
from integrations.gitea_client import GiteaClient
logger = logging.getLogger("variet.ci")
class CIMonitor:
"""CI 결과 모니터링."""
def __init__(self, gitea: GiteaClient = None):
self.gitea = gitea or GiteaClient()
async def wait_for_ci(
self,
sha: str,
timeout: int = 300,
poll_interval: int = 10,
) -> dict:
"""CI 결과 대기.
Args:
sha: 커밋 SHA
timeout: 최대 대기 시간 (초)
poll_interval: 폴링 간격 (초)
Returns:
{"status": "success|failure|pending", "details": [...]}
"""
elapsed = 0
logger.info(f"CI 결과 대기 중... (SHA: {sha[:7]}, timeout: {timeout}s)")
while elapsed < timeout:
try:
combined = await self.gitea.get_combined_status(sha)
state = combined.get("state", "pending")
statuses = combined.get("statuses", []) or []
if state in ("success", "failure", "error"):
logger.info(f"CI 완료: {state} ({len(statuses)}개 체크)")
return {
"status": state,
"details": [
{
"context": s.get("context", ""),
"status": s.get("status", ""),
"description": s.get("description", ""),
}
for s in statuses
],
}
logger.debug(f"CI 진행 중... ({elapsed}s/{timeout}s)")
except Exception as e:
logger.warning(f"CI 상태 조회 실패: {e}")
await asyncio.sleep(poll_interval)
elapsed += poll_interval
logger.warning(f"CI 타임아웃 ({timeout}s)")
return {"status": "timeout", "details": []}

View File

@@ -0,0 +1,183 @@
"""Gitea API 클라이언트.
git.variet.net API를 통해 레포지토리, 브랜치, PR, 커밋을 관리합니다.
"""
import logging
from typing import Optional
import httpx
import config
logger = logging.getLogger("variet.gitea")
class GiteaClient:
"""Gitea REST API 비동기 클라이언트."""
def __init__(
self,
base_url: str = None,
token: str = None,
repo: str = None,
):
self.base_url = (base_url or config.GITEA_URL).rstrip("/")
self.api_url = f"{self.base_url}/api/v1"
self.token = token or config.GITEA_TOKEN
self.repo = repo or config.GITEA_REPO
self.headers = {
"Authorization": f"token {self.token}",
"Content-Type": "application/json",
}
async def _get(self, path: str, params: dict = None) -> dict | list:
async with httpx.AsyncClient(timeout=30) as client:
resp = await client.get(
f"{self.api_url}{path}",
headers=self.headers,
params=params,
)
resp.raise_for_status()
return resp.json()
async def _post(self, path: str, data: dict = None) -> dict:
async with httpx.AsyncClient(timeout=30) as client:
resp = await client.post(
f"{self.api_url}{path}",
headers=self.headers,
json=data,
)
resp.raise_for_status()
return resp.json()
# === Commits ===
async def get_commits(self, limit: int = 10, branch: str = "main") -> list[dict]:
"""최근 커밋 조회."""
commits = await self._get(
f"/repos/{self.repo}/commits",
params={"limit": limit, "sha": branch},
)
return [
{
"sha": c["sha"][:7],
"message": c["commit"]["message"].split("\n")[0],
"author": c["commit"]["author"]["name"],
"date": c["commit"]["author"]["date"],
}
for c in commits
]
# === Branches ===
async def create_branch(self, name: str, from_branch: str = "main") -> dict:
"""새 브랜치 생성."""
result = await self._post(
f"/repos/{self.repo}/branches",
data={"new_branch_name": name, "old_branch_name": from_branch},
)
logger.info(f"브랜치 생성: {name} (from {from_branch})")
return result
async def list_branches(self) -> list[str]:
"""브랜치 목록."""
branches = await self._get(f"/repos/{self.repo}/branches")
return [b["name"] for b in branches]
# === Pull Requests ===
async def create_pr(
self,
title: str,
head: str,
base: str = "main",
body: str = "",
) -> dict:
"""PR 생성."""
result = await self._post(
f"/repos/{self.repo}/pulls",
data={
"title": title,
"head": head,
"base": base,
"body": body,
},
)
pr_number = result.get("number")
logger.info(f"PR #{pr_number} 생성: {title}")
return result
async def list_prs(self, state: str = "open") -> list[dict]:
"""PR 목록 조회."""
prs = await self._get(
f"/repos/{self.repo}/pulls",
params={"state": state},
)
return [
{
"number": p["number"],
"title": p["title"],
"state": p["state"],
"user": p["user"]["login"],
}
for p in prs
]
async def merge_pr(self, pr_number: int, merge_type: str = "merge") -> dict:
"""PR 머지."""
result = await self._post(
f"/repos/{self.repo}/pulls/{pr_number}/merge",
data={"Do": merge_type},
)
logger.info(f"PR #{pr_number} 머지 완료")
return result
# === Issues ===
async def list_issues(self, state: str = "open") -> list[dict]:
"""이슈 목록 조회."""
issues = await self._get(
f"/repos/{self.repo}/issues",
params={"state": state, "type": "issues"},
)
return [
{
"number": i["number"],
"title": i["title"],
"state": i["state"],
}
for i in issues
]
async def create_issue(self, title: str, body: str = "") -> dict:
"""이슈 생성."""
result = await self._post(
f"/repos/{self.repo}/issues",
data={"title": title, "body": body},
)
logger.info(f"이슈 #{result.get('number')} 생성: {title}")
return result
# === Commit Status (CI) ===
async def get_commit_status(self, sha: str) -> list[dict]:
"""커밋의 CI 상태 조회."""
statuses = await self._get(
f"/repos/{self.repo}/statuses/{sha}",
)
return [
{
"status": s["status"],
"description": s.get("description", ""),
"context": s.get("context", ""),
"target_url": s.get("target_url", ""),
}
for s in statuses
]
async def get_combined_status(self, sha: str) -> dict:
"""커밋의 통합 CI 상태 조회."""
return await self._get(
f"/repos/{self.repo}/commits/{sha}/status",
)

View File

@@ -0,0 +1,146 @@
"""Vikunja API 클라이언트.
plan.variet.net API를 통해 태스크를 관리합니다.
vikunja_helper.py의 async 버전 — safe_update 패턴 유지.
"""
import logging
from typing import Optional
import httpx
import config
logger = logging.getLogger("variet.vikunja")
class VikunjaClient:
"""Vikunja REST API 비동기 클라이언트."""
def __init__(
self,
base_url: str = None,
token: str = None,
project_id: int = None,
):
self.base_url = (base_url or config.VIKUNJA_URL).rstrip("/")
self.api_url = f"{self.base_url}/api/v1"
self.token = token or config.VIKUNJA_TOKEN
self.project_id = project_id or config.VIKUNJA_PROJECT_ID
self.headers = {
"Authorization": f"Bearer {self.token}",
"Content-Type": "application/json",
}
async def _get(self, path: str, params: dict = None) -> dict | list:
async with httpx.AsyncClient(timeout=30) as client:
resp = await client.get(
f"{self.api_url}{path}",
headers=self.headers,
params=params,
)
resp.raise_for_status()
return resp.json()
async def _post(self, path: str, data: dict = None) -> dict:
async with httpx.AsyncClient(timeout=30) as client:
resp = await client.post(
f"{self.api_url}{path}",
headers=self.headers,
json=data,
)
resp.raise_for_status()
return resp.json()
async def _put(self, path: str, data: dict = None) -> dict:
async with httpx.AsyncClient(timeout=30) as client:
resp = await client.put(
f"{self.api_url}{path}",
headers=self.headers,
json=data,
)
resp.raise_for_status()
return resp.json()
# === Tasks ===
async def list_tasks(self, filter_: str = "all") -> list[dict]:
"""프로젝트 태스크 목록 조회."""
tasks = await self._get(
f"/projects/{self.project_id}/tasks",
params={"per_page": 100},
)
if filter_ == "todo":
tasks = [t for t in tasks if not t["done"]]
elif filter_ == "done":
tasks = [t for t in tasks if t["done"]]
tasks.sort(key=lambda t: t["id"])
return [
{
"id": t["id"],
"title": t["title"],
"description": (t.get("description") or "")[:100],
"done": t["done"],
"labels": [l["title"] for l in (t.get("labels") or [])],
}
for t in tasks
]
async def get_task(self, task_id: int) -> dict:
"""태스크 상세 조회."""
return await self._get(f"/tasks/{task_id}")
async def safe_update_task(self, task_id: int, updates: dict) -> dict:
"""안전한 태스크 업데이트 — GET → 보존 → POST.
Vikunja API는 POST 시 누락된 필드를 빈값으로 덮어쓰므로
반드시 기존 필드를 보존해야 합니다.
"""
task = await self.get_task(task_id)
safe_body = {
"title": task.get("title", ""),
"description": task.get("description", ""),
"priority": task.get("priority", 0),
"done": task.get("done", False),
}
safe_body.update(updates)
result = await self._post(f"/tasks/{task_id}", safe_body)
logger.info(f"태스크 #{task_id} 업데이트: {updates}")
return result
async def mark_done(self, task_id: int) -> dict:
"""태스크 완료 처리."""
result = await self.safe_update_task(task_id, {"done": True})
logger.info(f"태스크 #{task_id} 완료 처리")
return result
async def create_task(
self,
title: str,
description: str = "",
done: bool = False,
) -> dict:
"""새 태스크 생성."""
payload = {"title": title, "description": description}
result = await self._put(
f"/projects/{self.project_id}/tasks",
data=payload,
)
task_id = result["id"]
logger.info(f"태스크 #{task_id} 생성: {title}")
if done:
result = await self.safe_update_task(task_id, {"done": True})
return result
async def add_comment(self, task_id: int, comment: str) -> dict:
"""태스크에 코멘트 추가."""
result = await self._put(
f"/tasks/{task_id}/comments",
data={"comment": comment},
)
logger.info(f"태스크 #{task_id} 코멘트 추가")
return result