- extension.ts 3,446→1,289줄 (-63%) - step-probe.ts (1,435줄): setupMonitor, processResponseFile, tryApprovalStrategies - observer-script.ts (687줄): DOM observer script - ws-client.ts (390줄): WSBridgeClient - step-utils.ts (114줄): step 파싱 유틸 - auth.py (115줄): JWT + registration code - hub.py (581줄): WSHub + per-client queue - Hub WS 연동 테스트 통과 (auth, chat, register) - VSIX v0.4.0 빌드
128 lines
4.3 KiB
Python
128 lines
4.3 KiB
Python
"""Authentication module — JWT token management for WebSocket Hub.
|
|
|
|
Two-stage auth:
|
|
1. Extension connects with a registration code (built into .vsix at build time)
|
|
2. Hub validates the code and issues a short-lived JWT session token
|
|
3. Subsequent reconnections use the JWT token directly
|
|
|
|
The master secret is stored server-side only (GRAVITY_HUB_SECRET env var).
|
|
"""
|
|
|
|
import hashlib
|
|
import hmac
|
|
import json
|
|
import logging
|
|
import os
|
|
import time
|
|
from base64 import urlsafe_b64decode, urlsafe_b64encode
|
|
|
|
# Module-level logger; all auth events are emitted through it.
logger = logging.getLogger(__name__)

# Default token lifetime in seconds (24 hours), used when no ttl is passed.
DEFAULT_TOKEN_TTL: int = 24 * 60 * 60

# Fallback registration code when GRAVITY_REGISTRATION_CODE is not set.
DEFAULT_REGISTRATION_CODE: str = ""
|
|
|
|
|
|
class TokenManager:
    """Create and verify HMAC-SHA256 signed, JWT-like session tokens.

    A token has the form ``{payload}.{signature}`` (there is no header
    segment). The payload is unpadded URL-safe base64 of a JSON object with:

    - project: project name scope
    - pc: PC identifier (hostname or custom name)
    - iat: issued at (unix timestamp)
    - exp: expiration (unix timestamp)

    The signature is the unpadded URL-safe base64 HMAC-SHA256 of the encoded
    payload, keyed with ``self.secret``.
    """

    def __init__(self, secret: str = "", registration_code: str = ""):
        """Initialise the signing secret and the expected registration code.

        Args:
            secret: Signing key. Falls back to the ``GRAVITY_HUB_SECRET``
                environment variable, then to a random ephemeral value.
            registration_code: Expected registration code. Falls back to the
                ``GRAVITY_REGISTRATION_CODE`` environment variable, then to
                ``DEFAULT_REGISTRATION_CODE``.
        """
        self.secret = secret or os.getenv("GRAVITY_HUB_SECRET", "")
        self.registration_code = registration_code or os.getenv(
            "GRAVITY_REGISTRATION_CODE", DEFAULT_REGISTRATION_CODE
        )
        if not self.secret:
            # Auto-generate a secret if not set (ephemeral — tokens become
            # invalid after restart because the key is not persisted).
            self.secret = hashlib.sha256(os.urandom(32)).hexdigest()
            logger.warning(
                "[AUTH] No GRAVITY_HUB_SECRET set — generated ephemeral secret. "
                "Tokens will be invalid after server restart."
            )

    @staticmethod
    def _constant_time_equals(left: str, right: str) -> bool:
        """Compare two strings in constant time, tolerating non-ASCII text."""
        # hmac.compare_digest raises TypeError for str arguments that contain
        # non-ASCII characters, so compare their UTF-8 bytes instead.
        # "surrogatepass" keeps lone surrogates from raising on encode.
        return hmac.compare_digest(
            left.encode("utf-8", "surrogatepass"),
            right.encode("utf-8", "surrogatepass"),
        )

    def validate_registration_code(self, code: str) -> bool:
        """Check whether the provided registration code matches.

        Args:
            code: Registration code supplied by the connecting client.

        Returns:
            True if the code matches the configured one, or if no code is
            configured at all; False otherwise (including non-string input).
        """
        if not self.registration_code:
            # NOTE(security): no registration code configured → every
            # connection is accepted. This is the original, deliberate policy.
            logger.warning("[AUTH] No registration code configured — accepting all")
            return True
        if not isinstance(code, str):
            # Client-supplied value of the wrong type (e.g. a missing field):
            # reject instead of letting compare_digest raise TypeError.
            return False
        return self._constant_time_equals(code, self.registration_code)

    def create_token(
        self, project: str, pc_name: str, ttl: int = DEFAULT_TOKEN_TTL
    ) -> str:
        """Create a signed token for a specific project and PC.

        Args:
            project: Project name stored in the ``project`` claim.
            pc_name: PC identifier stored in the ``pc`` claim.
            ttl: Lifetime in seconds, added to the current time for ``exp``.

        Returns:
            The token string ``{payload}.{signature}``, both parts being
            unpadded URL-safe base64.
        """
        now = int(time.time())
        payload = {
            "project": project,
            "pc": pc_name,
            "iat": now,
            "exp": now + ttl,
        }
        payload_b64 = _b64_encode(json.dumps(payload))
        signature = self._sign(payload_b64)
        return f"{payload_b64}.{signature}"

    def verify_token(self, token: str) -> dict | None:
        """Verify and decode a token.

        Args:
            token: Token string as produced by ``create_token``.

        Returns:
            The payload dict if the signature is valid and the token has not
            expired; None if it is malformed, tampered with, or expired.
        """
        if not isinstance(token, str):
            logger.warning("[AUTH] Token decode error: token is not a string")
            return None

        try:
            parts = token.split(".")
            if len(parts) != 2:
                return None

            payload_b64, signature = parts
            expected_sig = self._sign(payload_b64)

            # Check the signature before decoding the payload, so unsigned
            # data is never parsed.
            if not self._constant_time_equals(signature, expected_sig):
                logger.warning("[AUTH] Invalid token signature")
                return None

            payload = json.loads(_b64_decode(payload_b64))
            if not isinstance(payload, dict):
                raise ValueError("payload is not a JSON object")

            # Check expiration; a missing "exp" is treated as already expired.
            if payload.get("exp", 0) < time.time():
                logger.info("[AUTH] Token expired for %s", payload.get("pc", "?"))
                return None

            return payload
        except (json.JSONDecodeError, ValueError, KeyError, TypeError) as e:
            # ValueError also covers base64 and Unicode decode failures;
            # TypeError covers a non-numeric "exp" claim.
            logger.warning("[AUTH] Token decode error: %s", e)
            return None

    def _sign(self, data: str) -> str:
        """Return the unpadded URL-safe base64 HMAC-SHA256 of ``data``."""
        sig = hmac.new(
            self.secret.encode("utf-8"),
            data.encode("utf-8"),
            hashlib.sha256,
        ).digest()
        return _b64_encode_bytes(sig)
|
|
|
|
|
|
def _b64_encode(data: str) -> str:
|
|
"""URL-safe base64 encode a string, no padding."""
|
|
return urlsafe_b64encode(data.encode("utf-8")).rstrip(b"=").decode("ascii")
|
|
|
|
|
|
def _b64_encode_bytes(data: bytes) -> str:
|
|
"""URL-safe base64 encode bytes, no padding."""
|
|
return urlsafe_b64encode(data).rstrip(b"=").decode("ascii")
|
|
|
|
|
|
def _b64_decode(data: str) -> str:
|
|
"""URL-safe base64 decode, handles missing padding."""
|
|
padded = data + "=" * (4 - len(data) % 4)
|
|
return urlsafe_b64decode(padded).decode("utf-8")
|