"""Authentication module — JWT token management for WebSocket Hub. Two-stage auth: 1. Extension connects with a registration code (built into .vsix at build time) 2. Hub validates the code and issues a short-lived JWT session token 3. Subsequent reconnections use the JWT token directly The master secret is stored server-side only (GRAVITY_HUB_SECRET env var). """ import hashlib import hmac import json import logging import os import time from base64 import urlsafe_b64decode, urlsafe_b64encode logger = logging.getLogger(__name__) # Defaults DEFAULT_TOKEN_TTL = 86400 # 24 hours DEFAULT_REGISTRATION_CODE = "" # Set via GRAVITY_REGISTRATION_CODE env var class TokenManager: """Manages JWT-like token creation and verification. Uses HMAC-SHA256 for signing. Tokens contain: - project: project name scope - pc: PC identifier (hostname or custom name) - iat: issued at (unix timestamp) - exp: expiration (unix timestamp) """ def __init__(self, secret: str = "", registration_code: str = ""): self.secret = secret or os.getenv("GRAVITY_HUB_SECRET", "") self.registration_code = registration_code or os.getenv( "GRAVITY_REGISTRATION_CODE", DEFAULT_REGISTRATION_CODE ) if not self.secret: # Auto-generate a secret if not set (ephemeral — tokens invalid after restart) self.secret = hashlib.sha256(os.urandom(32)).hexdigest() logger.warning( "[AUTH] No GRAVITY_HUB_SECRET set — generated ephemeral secret. " "Tokens will be invalid after server restart." ) def validate_registration_code(self, code: str) -> bool: """Check if the provided registration code matches.""" if not self.registration_code: # No registration code configured → allow all connections logger.warning("[AUTH] No registration code configured — accepting all") return True return hmac.compare_digest(code, self.registration_code) def create_token( self, project: str, pc_name: str, ttl: int = DEFAULT_TOKEN_TTL ) -> str: """Create a signed token for a specific project and PC. Returns a base64-encoded string: {header}.{payload}.{signature} """ now = int(time.time()) payload = { "project": project, "pc": pc_name, "iat": now, "exp": now + ttl, } payload_b64 = _b64_encode(json.dumps(payload)) signature = self._sign(payload_b64) return f"{payload_b64}.{signature}" def verify_token(self, token: str) -> dict | None: """Verify and decode a token. Returns the payload dict if valid, None if invalid or expired. """ try: parts = token.split(".") if len(parts) != 2: return None payload_b64, signature = parts expected_sig = self._sign(payload_b64) if not hmac.compare_digest(signature, expected_sig): logger.warning("[AUTH] Invalid token signature") return None payload = json.loads(_b64_decode(payload_b64)) # Check expiration if payload.get("exp", 0) < time.time(): logger.info(f"[AUTH] Token expired for {payload.get('pc', '?')}") return None return payload except (json.JSONDecodeError, ValueError, KeyError) as e: logger.warning(f"[AUTH] Token decode error: {e}") return None def _sign(self, data: str) -> str: """HMAC-SHA256 sign and return base64.""" sig = hmac.new( self.secret.encode("utf-8"), data.encode("utf-8"), hashlib.sha256, ).digest() return _b64_encode_bytes(sig) def _b64_encode(data: str) -> str: """URL-safe base64 encode a string, no padding.""" return urlsafe_b64encode(data.encode("utf-8")).rstrip(b"=").decode("ascii") def _b64_encode_bytes(data: bytes) -> str: """URL-safe base64 encode bytes, no padding.""" return urlsafe_b64encode(data).rstrip(b"=").decode("ascii") def _b64_decode(data: str) -> str: """URL-safe base64 decode, handles missing padding.""" padded = data + "=" * (4 - len(data) % 4) return urlsafe_b64decode(padded).decode("utf-8")