"""Quick test for WebSocket Hub connection and auth protocol.""" import asyncio import json import os import urllib.request from dotenv import load_dotenv load_dotenv() async def test_ws(): import websockets reg_code = os.getenv("GRAVITY_REGISTRATION_CODE") uri = "ws://localhost:8586/ws" print(f"1. Connecting to {uri}...") async with websockets.connect(uri) as ws: # Correct auth protocol: type=auth with registration_code auth_msg = { "type": "auth", "registration_code": reg_code, "project": "gravity_control", "pc": "test_pc", } await ws.send(json.dumps(auth_msg)) print("2. Sent auth message") resp = await asyncio.wait_for(ws.recv(), timeout=5) resp_data = json.loads(resp) print(f"3. Auth response: {json.dumps(resp_data, indent=2)}") if resp_data.get("type") == "auth_ok": print("4. AUTH SUCCESS!") conn_id = resp_data.get("conn_id", "") instance = resp_data.get("instance_number", "") print(f" conn_id={conn_id}, instance=#{instance}") # Send a chat message (Hub expects type='chat', not 'chat_snapshot') snap = { "type": "chat", "data": { "content": "[TEST] Hub WS test message", "project_name": "gravity_control", "conversation_id": "test-ws-hub-123", }, } await ws.send(json.dumps(snap)) print("5. Sent chat_snapshot via WS") # Check hub status via HTTP r = urllib.request.urlopen("http://localhost:8586/hub/status", timeout=3) status = json.loads(r.read()) print(f"6. Hub status: {json.dumps(status, indent=2)}") # Send register message reg = { "type": "register", "conversation_id": "test-ws-hub-123", "project_name": "gravity_control", } await ws.send(json.dumps(reg)) print("7. Sent register") else: reason = resp_data.get("reason", "unknown") print(f"4. AUTH FAILED: {reason}") await ws.close() print("8. Done!") if __name__ == "__main__": asyncio.run(test_ws())