feat: Variet Engine v1.0 + 5-model tuning complete

Phase 01 (LLM Tuning):
- Gemma4 26B: 74.65 t/s (fast)
- Qwen 35B: 61.62 t/s (balanced)
- Gemma4 31B: 16.0 t/s (deep-coder)
- Qwen 27B: 16.7 t/s (deep-logic)
- Qwen 122B: 8.95 t/s (ultra, GPU 1 only)

Phase 02 (API Engine):
- FastAPI reverse proxy on port 8000
- /engine/switch hot-swap with 503 protection
- config/engine_models.json as single source of truth
- Replaced 4 individual .bat files with unified engine

File cleanup:
- scripts/ 85 files -> 9 + _archive/
- Root .bat files -> _archive/
This commit is contained in:
Variet-Worker
2026-04-07 18:08:58 +09:00
parent 7c7a899fd5
commit c111b3a9b0
414 changed files with 3402 additions and 68598 deletions

408
engine/variet_engine.py Normal file
View File

@@ -0,0 +1,408 @@
"""
Variet LLM Engine v1.0
━━━━━━━━━━━━━━━━━━━━━━
FastAPI reverse-proxy + llama-server process manager.
Architecture:
Client (Machine B) ──► Engine :8000 ──► llama-server :8080 (internal)
- /v1/* → transparent proxy to llama-server (OpenAI-compatible)
- /engine/status → current model & state
- /engine/models → available roles
- /engine/switch/{role} → hot-swap model (POST)
- /engine/health → health check
"""
import os
import json
import time
import socket
import asyncio
import logging
import subprocess
from pathlib import Path
from contextlib import asynccontextmanager
import httpx
import uvicorn
from fastapi import FastAPI, Request, BackgroundTasks
from fastapi.responses import JSONResponse, StreamingResponse
from starlette.background import BackgroundTask
# ── Logging ──────────────────────────────────────────────
# Root logger emits INFO and above, stamped with time-of-day only.
logging.basicConfig(
    datefmt="%H:%M:%S",
    format="[%(asctime)s] %(levelname)s %(message)s",
    level=logging.INFO,
)
log = logging.getLogger("variet-engine")

# ── Paths ────────────────────────────────────────────────
# The project root is the directory above the one containing this module.
_MODULE_DIR = Path(__file__).resolve().parent
ROOT_DIR = _MODULE_DIR.parent
CONFIG_FILE = ROOT_DIR.joinpath("config", "engine_models.json")
LOG_DIR = ROOT_DIR.joinpath("logs")
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
# Engine State
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
class EngineState:
    """Mutable record of the managed llama-server child and the active role.

    Attributes:
        state: Lifecycle marker, one of ``starting | loading | ready | error``.
        current_role: Key of the role currently selected, or ``None``.
        process: ``subprocess.Popen`` handle of the child, or ``None``.
        config: Parsed contents of ``CONFIG_FILE`` (empty until loaded).
        boot_time: ``time.time()`` stamp; initialised at construction.
        _log_file: Open file object receiving the child's output, or ``None``.
    """

    def __init__(self):
        self._log_file = None
        self.process: subprocess.Popen | None = None
        self.current_role: str | None = None
        self.config: dict = {}
        self.state: str = "starting"  # starting | loading | ready | error
        self.boot_time: float = time.time()

    def load_config(self):
        """Read ``CONFIG_FILE`` as UTF-8 JSON into ``self.config`` and log the role count."""
        self.config = json.loads(CONFIG_FILE.read_text(encoding="utf-8"))
        role_count = len(self.config.get("roles", {}))
        log.info(f"Config loaded: {role_count} roles available")

    @property
    def internal_url(self) -> str:
        """Base ``http://host:port`` URL built from the ``llama_server`` config section."""
        server = self.config["llama_server"]
        return "http://{}:{}".format(server["internal_host"], server["internal_port"])

    @property
    def role_info(self) -> dict:
        """Config entry of the current role, or ``{}`` when unset or unknown."""
        role = self.current_role
        if not role:
            return {}
        roles = self.config.get("roles", {})
        return roles[role] if role in roles else {}


# Module-wide singleton shared by the process helpers and the HTTP handlers.
engine = EngineState()
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
# Process Management
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
def _is_port_free(port: int) -> bool:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
return s.connect_ex(("127.0.0.1", port)) != 0
def kill_llama_server():
    """Force-kill any running llama-server process and wait for its port.

    Sequence:
      1. Terminate the tracked ``engine.process`` (escalating to ``kill()``
         after 5 s) and reap it.
      2. On Windows, sweep up any orphaned ``llama-server.exe`` via taskkill.
      3. Close the child's log file handle.
      4. Poll the internal port (20 tries, 0.5 s apart) until nothing is
         listening on it; log a warning if it never frees.

    This function blocks (``wait`` / ``time.sleep``); it does not raise for
    a missing process or a failed sweep.
    """
    log.info("Stopping llama-server...")

    # Kill our tracked process first
    proc = engine.process
    if proc and proc.poll() is None:
        proc.terminate()
        try:
            proc.wait(timeout=5)
        except subprocess.TimeoutExpired:
            proc.kill()
            # Reap the killed child so its exit status is collected.
            try:
                proc.wait(timeout=5)
            except subprocess.TimeoutExpired:
                log.warning("llama-server did not exit after kill()")

    # Also kill any orphaned llama-server. taskkill only exists on Windows;
    # an argument list avoids going through a shell.
    if os.name == "nt":
        try:
            subprocess.run(
                ["taskkill", "/F", "/IM", "llama-server.exe"],
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
                check=False,
            )
        except OSError as exc:
            log.warning(f"Could not run taskkill: {exc}")

    # Close log file (best effort: a failed close must not abort the stop)
    if engine._log_file:
        try:
            engine._log_file.close()
        except OSError as exc:
            log.warning(f"Could not close llama-server log file: {exc}")
        engine._log_file = None

    # Wait for port to free
    port = engine.config.get("llama_server", {}).get("internal_port", 8080)
    for _ in range(20):
        if _is_port_free(port):
            break
        time.sleep(0.5)
    else:
        log.warning(f"Port {port} still occupied after 10s")
def build_command(role: str) -> list[str]:
    """Build the llama-server CLI command for ``role`` from the loaded config.

    The executable path and the model path are resolved against ``ROOT_DIR``.
    The role's ``args`` list is appended in order after the fixed
    ``--model/--port/--host`` options.

    Every element is converted to ``str``: values coming from JSON may be
    numbers (e.g. a context size), and non-string items would break both
    ``' '.join(cmd)`` and ``subprocess.Popen``.

    Raises:
        KeyError: if ``role`` or a required config key is missing.
    """
    role_cfg = engine.config["roles"][role]
    server_cfg = engine.config["llama_server"]
    cmd = [
        str(ROOT_DIR / server_cfg["path"]),
        "--model", str(ROOT_DIR / role_cfg["model_path"]),
        "--port", str(server_cfg["internal_port"]),
        "--host", str(server_cfg["internal_host"]),
    ]
    # args is a flat list of CLI arguments, passed through in order
    cmd.extend(str(arg) for arg in role_cfg.get("args", []))
    return cmd
def start_llama_server(role: str):
    """Start llama-server for the given role.

    Spawns the command from ``build_command(role)`` with ``ROOT_DIR`` as the
    working directory. The child's stdout and stderr are appended to
    ``LOG_DIR / "llama-server.log"``. On success ``engine.process``,
    ``engine._log_file``, ``engine.current_role`` and ``engine.boot_time``
    are updated.

    Raises:
        KeyError: if ``role`` or a required config key is missing.
        Exception: whatever ``subprocess.Popen`` raises (e.g. ``OSError`` for
            a missing executable); the log file handle is closed first and
            the engine's tracked process/log handle are left untouched.
    """
    role_cfg = engine.config["roles"][role]
    cmd = build_command(role)
    log.info(f"Starting [{role}] {role_cfg['display_name']}")
    # map(str, ...) keeps the log line working even if an arg is not a str.
    log.info(f"CMD: {' '.join(map(str, cmd))}")

    LOG_DIR.mkdir(exist_ok=True)
    log_file = open(LOG_DIR / "llama-server.log", "a", encoding="utf-8")
    try:
        process = subprocess.Popen(
            cmd, stdout=log_file, stderr=subprocess.STDOUT, cwd=str(ROOT_DIR)
        )
    except Exception:
        # Do not leak the handle when the child could not be spawned.
        log_file.close()
        raise

    engine._log_file = log_file
    engine.process = process
    engine.current_role = role
    engine.boot_time = time.time()
async def wait_for_health(timeout: int = 300) -> bool:
    """Poll llama-server ``/health`` until it reports ready.

    Every 2 seconds, until ``timeout`` seconds have elapsed:
      * if the tracked process has exited, set state to ``"error"`` and stop;
      * otherwise GET ``{internal_url}/health`` (3 s request timeout) and
        treat HTTP 200 with JSON ``status`` of ``"ok"`` or ``"ready"`` as
        success, setting state to ``"ready"``.

    Any httpx transport/protocol error or an unparsable body counts as
    "not ready yet" rather than aborting the poll loop.

    Args:
        timeout: Overall budget in seconds.

    Returns:
        True once healthy; False if the process died or the budget ran out
        (state is ``"error"`` in both failure cases).
    """
    url = f"{engine.internal_url}/health"
    start = time.time()
    async with httpx.AsyncClient() as client:
        while time.time() - start < timeout:
            # Check if process died
            if engine.process and engine.process.poll() is not None:
                log.error(f"llama-server exited with code {engine.process.returncode}")
                engine.state = "error"
                return False
            try:
                resp = await client.get(url, timeout=3.0)
                if resp.status_code == 200:
                    data = resp.json()
                    if isinstance(data, dict) and data.get("status") in ("ok", "ready"):
                        elapsed = round(time.time() - start, 1)
                        log.info(f"llama-server READY in {elapsed}s")
                        engine.state = "ready"
                        return True
            except (httpx.HTTPError, ValueError):
                # httpx.HTTPError covers connect/read/timeout/protocol errors;
                # ValueError covers a body that is not valid JSON. Either way
                # the server is simply not ready yet, so keep polling.
                pass
            await asyncio.sleep(2)
    log.error(f"llama-server failed to become healthy within {timeout}s")
    engine.state = "error"
    return False
async def perform_switch(role: str):
    """Full hot-swap sequence: kill → start → wait.

    Sets state to ``"loading"``, stops the current llama-server, starts the
    one for ``role`` and polls until it is healthy.

    ``kill_llama_server()`` blocks (process wait plus port polling with
    ``time.sleep``), so it runs in a worker thread to keep the event loop
    responsive while the old server shuts down.

    If stopping or starting raises, the error is logged and state becomes
    ``"error"``; otherwise state would stay ``"loading"`` and every later
    switch request would be rejected as already in progress.
    """
    engine.state = "loading"
    log.info(f"═══ HOT-SWAP: switching to [{role}] ═══")
    try:
        await asyncio.to_thread(kill_llama_server)
        start_llama_server(role)
    except Exception:
        log.exception(f"Hot-swap to [{role}] failed before llama-server could start")
        engine.state = "error"
        return
    await wait_for_health()
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
# FastAPI App
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Startup: boot the default model. Shutdown: kill the server.

    Startup loads the config, clears any running llama-server, starts the
    ``default_role`` (falling back to ``"fast"``) and begins health polling
    in the background so the app can start serving immediately.

    Shutdown runs in ``finally`` so the child process is stopped even when
    the application exits through an exception.
    """
    LOG_DIR.mkdir(exist_ok=True)
    engine.load_config()
    default_role = engine.config.get("default_role", "fast")
    engine.state = "loading"
    kill_llama_server()
    start_llama_server(default_role)
    # Keep a strong reference: the event loop only holds weak references to
    # tasks, so an unreferenced task may be garbage-collected mid-execution.
    health_task = asyncio.create_task(wait_for_health())
    try:
        yield  # App is running
    finally:
        log.info("Shutting down engine...")
        if not health_task.done():
            health_task.cancel()
        kill_llama_server()


app = FastAPI(title="Variet LLM Engine", version="1.0", lifespan=lifespan)
# ── Engine Management Endpoints ──────────────────────────
@app.get("/engine/health")
async def health():
    """Report the engine's lifecycle state and the currently selected role."""
    payload = dict(state=engine.state, role=engine.current_role)
    return payload
def _get_arg_value(args_list: list, flag: str):
"""Extract a value from a CLI args list by flag name."""
try:
idx = args_list.index(flag)
return args_list[idx + 1]
except (ValueError, IndexError):
return None
@app.get("/engine/status")
async def status():
    """Describe the active role: state, display name, throughput, context size, uptime.

    ``context_size`` is whatever follows ``-c`` in the role's ``args``;
    ``uptime_seconds`` is measured from ``engine.boot_time``.
    """
    role_cfg = engine.role_info
    cli_args = role_cfg.get("args", [])
    report = {
        "state": engine.state,
        "role": engine.current_role,
        "display_name": role_cfg.get("display_name", "Unknown"),
        "measured_tps": role_cfg.get("measured_tps"),
        "context_size": _get_arg_value(cli_args, "-c"),
    }
    report["uptime_seconds"] = round(time.time() - engine.boot_time, 1)
    return report
@app.get("/engine/models")
async def models():
    """List every configured role with its display name, throughput and context size."""
    catalog = {
        role_name: {
            "display_name": role_cfg["display_name"],
            "measured_tps": role_cfg.get("measured_tps"),
            "context_size": _get_arg_value(role_cfg.get("args", []), "-c"),
        }
        for role_name, role_cfg in engine.config.get("roles", {}).items()
    }
    return {"current": engine.current_role, "roles": catalog}
@app.post("/engine/switch/{role}")
async def switch(role: str, background_tasks: BackgroundTasks):
    """Request a hot-swap to ``role``; the swap itself runs as a background task.

    The config is reloaded first so newly added roles are visible.

    Responses:
        400: ``role`` is not a configured role (lists the available ones).
        409: a switch is already in progress.
        200 ``already_active``: ``role`` is current and the engine is ready.
        200 ``switching``: the swap was scheduled; includes a rough ETA.
    """
    engine.load_config()
    roles = engine.config.get("roles", {})
    if role not in roles:
        return JSONResponse(
            status_code=400,
            content={"error": f"Unknown role '{role}'", "available": list(roles)},
        )
    if engine.state == "loading":
        return JSONResponse(
            status_code=409,
            content={"error": "A model switch is already in progress."},
        )
    if role == engine.current_role and engine.state == "ready":
        return {"status": "already_active", "role": role}

    # Build the response before touching state so a malformed role entry
    # cannot leave the engine marked as loading.
    response = {
        "status": "switching",
        "from_role": engine.current_role,
        "to_role": role,
        "to_model": roles[role]["display_name"],
        "eta_seconds": 180 if role == "ultra" else 30,
    }

    # Claim the switch now. The background task only starts after this
    # response has been sent, so without this a second request arriving in
    # between would pass the 409 guard and schedule a competing swap.
    engine.state = "loading"
    background_tasks.add_task(perform_switch, role)
    return response
# ── Reverse Proxy ────────────────────────────────────────
@app.api_route(
    "/v1/{path:path}",
    methods=["GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS"],
)
async def proxy(request: Request, path: str):
    """Transparently proxy all /v1/* to llama-server.

    While the engine is not ``ready`` the request is rejected with 503 and a
    ``Retry-After`` hint. Otherwise the method, query string, body and most
    headers are forwarded, and the upstream response is streamed back
    chunk by chunk.

    Responses generated here (not by the upstream):
        503: engine not ready.
        502: could not connect to llama-server (state is set to ``error``).
        500: any other failure while sending the upstream request.
    """
    if engine.state != "ready":
        eta = 180 if engine.current_role == "ultra" else 30
        return JSONResponse(
            status_code=503,
            headers={"Retry-After": str(eta)},
            content={
                "error": {
                    "message": f"Engine is loading model ({engine.current_role}). Retry in ~{eta}s.",
                    "type": "engine_loading",
                    "code": "service_unavailable",
                },
                "state": engine.state,
            },
        )

    target_url = f"{engine.internal_url}/v1/{path}"
    body = await request.body()

    # Forward headers, dropping those httpx must recompute for the new hop
    fwd_headers = {
        k: v
        for k, v in request.headers.items()
        if k.lower() not in ("host", "content-length", "transfer-encoding")
    }

    client = httpx.AsyncClient(timeout=600.0)
    try:
        req = client.build_request(
            method=request.method,
            url=target_url,
            headers=fwd_headers,
            content=body,
            params=dict(request.query_params),
        )
        resp = await client.send(req, stream=True)

        async def stream_and_close():
            # finally: release the upstream response and client even when
            # the downstream client disconnects or the stream errors out.
            try:
                async for chunk in resp.aiter_raw():
                    yield chunk
            finally:
                try:
                    await resp.aclose()
                finally:
                    await client.aclose()

        # Forward response headers, dropping the framing headers that the
        # outgoing response sets itself. Content-Encoding is kept on purpose:
        # aiter_raw() yields the bytes still encoded, so the caller needs the
        # header to decode them.
        resp_headers = {
            k: v
            for k, v in resp.headers.items()
            if k.lower() not in ("transfer-encoding", "content-length")
        }
        return StreamingResponse(
            stream_and_close(),
            status_code=resp.status_code,
            headers=resp_headers,
        )
    except httpx.ConnectError:
        await client.aclose()
        engine.state = "error"
        return JSONResponse(
            status_code=502,
            content={"error": "llama-server connection failed. Model may have crashed."},
        )
    except Exception as e:
        await client.aclose()
        return JSONResponse(status_code=500, content={"error": str(e)})
# ── Root info ────────────────────────────────────────────
@app.get("/")
async def root():
    """Return a short self-description: name, version, state, model and endpoint map."""
    endpoint_map = {
        "inference": "/v1/chat/completions",
        "status": "/engine/status",
        "models": "/engine/models",
        "switch": "/engine/switch/{role}",
        "health": "/engine/health",
    }
    active_model = engine.role_info.get("display_name", "None")
    return {
        "name": "Variet LLM Engine",
        "version": "1.0",
        "state": engine.state,
        "current_model": active_model,
        "endpoints": endpoint_map,
    }
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
# Main
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
if __name__ == "__main__":
    # Script entry point: read the listen address from the config's "engine"
    # section, print a start-up banner, then hand control to uvicorn.
    engine.load_config()
    listen_cfg = engine.config["engine"]
    ext_port = listen_cfg.get("external_port", 8000)
    ext_host = listen_cfg.get("external_host", "0.0.0.0")

    banner_rule = "══════════════════════════════════════"
    banner_lines = (
        banner_rule,
        " Variet LLM Engine v1.0",
        f" Listening on {ext_host}:{ext_port}",
        f" Default role: {engine.config.get('default_role', 'fast')}",
        banner_rule,
    )
    for banner_line in banner_lines:
        log.info(banner_line)

    uvicorn.run(app, host=ext_host, port=ext_port, log_level="info")