diff --git a/public/classroom_poc.html b/public/classroom_poc.html index 704967d..06c030b 100644 --- a/public/classroom_poc.html +++ b/public/classroom_poc.html @@ -139,8 +139,9 @@

-
+
? +
You @@ -161,6 +162,14 @@

E Sit/Stand

+
+ + +
Virtual Classroom
@@ -225,6 +234,7 @@

const myCharacter = document.getElementById('myCharacter'); const myInitial = document.getElementById('myInitial'); const myNameLabel = document.getElementById('myNameLabel'); + const myPresenceBadge = document.getElementById('myPresenceBadge'); const chatMessages = document.getElementById('chatMessages'); const chatForm = document.getElementById('chatForm'); const chatInput = document.getElementById('chatInput'); @@ -234,6 +244,8 @@

const participantsList = document.getElementById('participantsList'); const desksContainer = document.getElementById('desksContainer'); const toastContainer = document.getElementById('toastContainer'); + const btnThumbsUp = document.getElementById('btnThumbsUp'); + const btnRaiseHand = document.getElementById('btnRaiseHand'); // COORDINATE HELPERS // Positions are stored and transmitted as NORMALIZED values (0.0 – 1.0) @@ -258,19 +270,27 @@

// State let ws = null; + let presenceWs = null; let roomId = ''; let myParticipantId = ''; let myDisplayName = ''; let mySessionId = ''; let reconnectAttempts = 0; + let presenceReconnectAttempts = 0; let intentionalDisconnect = false; const MAX_RECONNECT = 10; const BASE_DELAY = 1000; let reconnectTimer = null; + let presenceReconnectTimer = null; // participants: participant_id → { display_name, normPos:{x,y}, direction, is_moving, seat_id } // normPos is always in normalized (0–1) space. const participants = {}; + // user_id -> { x, y, emoji, hand_raised, display_name } + const presenceByUser = {}; + let pendingSelfPresenceReplay = false; + // seat_id -> { name, pid } + const seatOccupants = {}; const THROTTLE_MS = 100; let lastPositionSend = 0; @@ -387,15 +407,30 @@

} } - function updateSeatLabel(seatId, name) { - const label = document.querySelector(`[data-seat-label="${seatId}"]`); - if (label) { label.textContent = name; label.classList.remove('hidden'); } + function updateSeatLabel(seatId, name, pid = null) { + if (!seatId) return; + seatOccupants[seatId] = { name, pid }; + renderSeatLabel(seatId); } function clearSeatLabel(seatId) { + delete seatOccupants[seatId]; const label = document.querySelector(`[data-seat-label="${seatId}"]`); if (label) { label.textContent = ''; label.classList.add('hidden'); } } + function renderSeatLabel(seatId) { + const label = document.querySelector(`[data-seat-label="${seatId}"]`); + const occupant = seatOccupants[seatId]; + if (!label || !occupant) return; + const badge = occupant.pid ? presenceBadge(occupant.pid) : ''; + label.textContent = badge ? `${occupant.name} ${badge}` : occupant.name; + label.classList.remove('hidden'); + } + + function renderSeatLabels() { + Object.keys(seatOccupants).forEach((seatId) => renderSeatLabel(seatId)); + } + // Toast function showToast(msg, type = 'info') { const colors = { info: 'bg-blue-600', success: 'bg-emerald-600', warning: 'bg-amber-600', error: 'bg-red-600' }; @@ -435,6 +470,7 @@

renderDesks(); connectWS(); startGameLoop(); + updatePresenceControls(); refreshIcons(); } @@ -452,6 +488,7 @@

classroomScreen.classList.remove('flex'); lobby.classList.remove('hidden'); Object.keys(participants).forEach(k => delete participants[k]); + Object.keys(presenceByUser).forEach(k => delete presenceByUser[k]); avatarLayer.innerHTML = ''; chatMessages.innerHTML = '
No messages yet
'; chatFirstMessage = true; @@ -476,6 +513,7 @@

ws.onopen = () => { reconnectAttempts = 0; setConnStatus(true); + connectPresenceWS(); showToast('Connected to classroom', 'success'); }; @@ -498,7 +536,9 @@

function disconnectWS() { intentionalDisconnect = true; if (reconnectTimer) { clearTimeout(reconnectTimer); reconnectTimer = null; } + if (presenceReconnectTimer) { clearTimeout(presenceReconnectTimer); presenceReconnectTimer = null; } if (ws) { ws.close(); ws = null; } + if (presenceWs) { presenceWs.close(); presenceWs = null; } setConnStatus(false); } @@ -524,6 +564,125 @@

if (ws && ws.readyState === WebSocket.OPEN) ws.send(JSON.stringify(obj)); } + function connectPresenceWS() { + if (presenceWs && (presenceWs.readyState === WebSocket.OPEN || presenceWs.readyState === WebSocket.CONNECTING)) return; + const proto = location.protocol === 'https:' ? 'wss:' : 'ws:'; + const token = localStorage.getItem('edu_token') || ''; + const params = new URLSearchParams(); + if (token) params.set('token', token); + params.set('user_id', myParticipantId); + params.set('display_name', myDisplayName); + const url = `${proto}//${location.host}/api/presence/${encodeURIComponent(roomId)}?${params.toString()}`; + presenceWs = new WebSocket(url); + presenceWs.onopen = () => { + presenceReconnectAttempts = 0; + if (presenceReconnectTimer) { clearTimeout(presenceReconnectTimer); presenceReconnectTimer = null; } + updatePresenceControls(); + replaySelfPresence(); + }; + presenceWs.onmessage = (ev) => { + try { + handlePresenceMessage(JSON.parse(ev.data)); + } catch (err) { + console.warn('[presence] dropped malformed ws frame', err, ev.data); + } + }; + presenceWs.onerror = () => { + showToast('Presence channel error', 'warning'); + }; + presenceWs.onclose = () => { + if (intentionalDisconnect) return; + if (presenceReconnectAttempts >= MAX_RECONNECT) { + showToast('Lost connection to presence channel - please rejoin.', 'warning'); + return; + } + const delay = Math.min(BASE_DELAY * Math.pow(2, presenceReconnectAttempts), 30000); + presenceReconnectAttempts++; + presenceReconnectTimer = setTimeout(() => { + if (!presenceWs || presenceWs.readyState === WebSocket.CLOSED) connectPresenceWS(); + }, delay); + }; + } + + function presenceSend(obj) { + if (presenceWs && presenceWs.readyState === WebSocket.OPEN) { + presenceWs.send(JSON.stringify(obj)); + return; + } + pendingSelfPresenceReplay = true; + } + + function replaySelfPresence() { + const self = presenceByUser[myParticipantId]; + if (!self || !pendingSelfPresenceReplay) return; + if (!presenceWs || presenceWs.readyState !== WebSocket.OPEN) return; + presenceWs.send(JSON.stringify({ + type: 'presence', + x: Number.isFinite(Number(self.x)) ? Number(self.x) : 0.5, + y: Number.isFinite(Number(self.y)) ? Number(self.y) : 0.5, + emoji: typeof self.emoji === 'string' ? self.emoji : '', + hand_raised: self.hand_raised === true, + })); + pendingSelfPresenceReplay = false; + } + + function handlePresenceMessage(data) { + if (!data || typeof data !== 'object') return; + if (data.type === 'welcome' && data.state && typeof data.state === 'object') { + const localSelf = presenceByUser[myParticipantId] ? { ...presenceByUser[myParticipantId] } : null; + Object.keys(presenceByUser).forEach((k) => delete presenceByUser[k]); + Object.entries(data.state).forEach(([uid, state]) => { + presenceByUser[uid] = { + x: Number.isFinite(Number(state?.x)) ? Number(state.x) : 0.5, + y: Number.isFinite(Number(state?.y)) ? Number(state.y) : 0.5, + emoji: typeof state?.emoji === 'string' ? state.emoji : '', + hand_raised: state?.hand_raised === true, + display_name: typeof state?.display_name === 'string' ? state.display_name : uid, + }; + }); + if (localSelf) { + const serverSelf = presenceByUser[myParticipantId] || {}; + presenceByUser[myParticipantId] = { + ...serverSelf, + x: Number.isFinite(Number(localSelf.x)) ? Number(localSelf.x) : (Number.isFinite(Number(serverSelf.x)) ? Number(serverSelf.x) : 0.5), + y: Number.isFinite(Number(localSelf.y)) ? Number(localSelf.y) : (Number.isFinite(Number(serverSelf.y)) ? Number(serverSelf.y) : 0.5), + emoji: typeof localSelf.emoji === 'string' ? localSelf.emoji : (serverSelf.emoji || ''), + hand_raised: localSelf.hand_raised === true, + display_name: localSelf.display_name || serverSelf.display_name || myDisplayName, + }; + } + renderOtherAvatars(); + renderParticipantsList(); + renderSeatLabels(); + updatePresenceControls(); + return; + } + if (data.type === 'delta' && data.user_id) { + const current = presenceByUser[data.user_id] || { + x: 0.5, y: 0.5, emoji: '', hand_raised: false, display_name: data.user_id, + }; + const next = { ...current }; + if (Object.prototype.hasOwnProperty.call(data, 'x')) next.x = Number.isFinite(Number(data.x)) ? Number(data.x) : current.x; + if (Object.prototype.hasOwnProperty.call(data, 'y')) next.y = Number.isFinite(Number(data.y)) ? Number(data.y) : current.y; + if (Object.prototype.hasOwnProperty.call(data, 'emoji') && typeof data.emoji === 'string') next.emoji = data.emoji; + if (Object.prototype.hasOwnProperty.call(data, 'hand_raised') && typeof data.hand_raised === 'boolean') next.hand_raised = data.hand_raised; + if (Object.prototype.hasOwnProperty.call(data, 'display_name') && typeof data.display_name === 'string') next.display_name = data.display_name; + presenceByUser[data.user_id] = next; + renderOtherAvatars(); + renderParticipantsList(); + renderSeatLabels(); + updatePresenceControls(); + return; + } + if (data.type === 'leave' && data.user_id) { + delete presenceByUser[data.user_id]; + renderOtherAvatars(); + renderParticipantsList(); + renderSeatLabels(); + updatePresenceControls(); + } + } + // Message handler function handleMessage(data) { switch (data.type) { @@ -532,8 +691,14 @@

mySessionId = data.session_id; myParticipantId = data.participant_id; myDisplayName = data.display_name; + if (!presenceByUser[myParticipantId]) { + presenceByUser[myParticipantId] = { + x: 0.5, y: 0.5, emoji: '', hand_raised: false, display_name: myDisplayName, + }; + } myInitial.textContent = myDisplayName.charAt(0).toUpperCase(); myNameLabel.textContent = myDisplayName; + updatePresenceControls(); // Place the local character at the centre of the canvas (in px). myCharacter.style.display = ''; @@ -556,6 +721,7 @@

case 'participant_left': showToast(`${data.display_name} left the classroom`, 'info'); delete participants[data.participant_id]; + delete presenceByUser[data.participant_id]; renderOtherAvatars(); renderParticipantsList(); break; @@ -586,21 +752,44 @@

break; case 'seat_updated': + if (!participants[data.participant_id] && data.participant_id !== myParticipantId) { + participants[data.participant_id] = { + display_name: data.display_name || data.participant_id, + normPos: { x: 0.5, y: 0.5 }, + direction: 'down', + is_moving: false, + seat_id: '', + }; + } + if (participants[data.participant_id]) { + participants[data.participant_id].seat_id = data.seat_id || ''; + if (data.display_name) participants[data.participant_id].display_name = data.display_name; + } if (data.participant_id === myParticipantId) { isSeated = true; currentSeat = data.seat_id; + myCharacter.style.display = 'none'; } - updateSeatLabel(data.seat_id, data.display_name); + updateSeatLabel(data.seat_id, data.display_name, data.participant_id); showToast(`${data.display_name} took ${data.seat_id}`, 'info'); + renderOtherAvatars(); + renderParticipantsList(); break; case 'seat_left': + if (participants[data.participant_id]) { + participants[data.participant_id].seat_id = ''; + if (data.display_name) participants[data.participant_id].display_name = data.display_name; + } if (data.participant_id === myParticipantId) { isSeated = false; currentSeat = null; + myCharacter.style.display = ''; } clearSeatLabel(data.seat_id); showToast(`${data.display_name} left ${data.seat_id}`, 'info'); + renderOtherAvatars(); + renderParticipantsList(); break; case 'seat_occupied': @@ -614,6 +803,7 @@

// Room state (bulk sync on join / reconnect) function handleRoomState(data) { Object.keys(participants).forEach(k => delete participants[k]); + Object.keys(seatOccupants).forEach(k => delete seatOccupants[k]); document.querySelectorAll('[data-seat-label]').forEach(el => { el.textContent = ''; el.classList.add('hidden'); @@ -643,15 +833,18 @@

is_moving: p.is_moving || false, seat_id: p.seat_id || '', }; - if (p.seat_id) updateSeatLabel(p.seat_id, p.display_name); + if (p.seat_id) updateSeatLabel(p.seat_id, p.display_name, p.participant_id); }); // Restore own seat if server knows about it const selfP = (data.participants || []).find(p => p.participant_id === myParticipantId); if (selfP && selfP.seat_id) { - updateSeatLabel(selfP.seat_id, myDisplayName); + updateSeatLabel(selfP.seat_id, myDisplayName, myParticipantId); isSeated = true; currentSeat = selfP.seat_id; + myCharacter.style.display = 'none'; + } else { + myCharacter.style.display = ''; } participantCount.textContent = data.count || (Object.keys(participants).length + 1); @@ -668,6 +861,7 @@

const canvasRect = classroomCanvas.getBoundingClientRect(); Object.entries(participants).forEach(([pid, p]) => { + if (p.seat_id) return; const [c1, c2] = colorForId(pid); // Convert stored normalized position to local pixels @@ -685,9 +879,10 @@

el.style.zIndex = '41'; el.style.transform = 'translate(-50%, -50%)'; el.innerHTML = ` -
${(p.display_name || '?').charAt(0).toUpperCase()} + ${presenceBadge(pid)}
${escapeHtml(p.display_name || pid)} @@ -718,11 +913,68 @@

${escapeHtml(name)}${isSelf ? ' (You)' : ''}

${seatId ? 'Seat ' + escapeHtml(seatId) : 'Walking around'}

+

${presenceBadge(pid)}

`; return card; } + function presenceBadge(userId) { + const p = presenceByUser[userId]; + if (!p) return ''; + // Returned string is safe for innerHTML interpolation (emoji is escaped). + let badge = ''; + if (p.emoji) badge += escapeHtml(p.emoji) + ' '; + if (p.hand_raised) badge += '✋'; + return badge.trim(); + } + + function updatePresenceControls() { + const self = presenceByUser[myParticipantId] || { emoji: '', hand_raised: false }; + btnThumbsUp.classList.toggle('bg-teal-700', self.emoji === '👍'); + btnThumbsUp.classList.toggle('border-teal-500', self.emoji === '👍'); + btnRaiseHand.classList.toggle('bg-teal-700', self.hand_raised === true); + btnRaiseHand.classList.toggle('border-teal-500', self.hand_raised === true); + myPresenceBadge.textContent = presenceBadge(myParticipantId); + renderSeatLabels(); + } + + btnThumbsUp.addEventListener('click', () => { + const current = presenceByUser[myParticipantId] || {}; + const nextEmoji = current.emoji === '👍' ? '' : '👍'; + pendingSelfPresenceReplay = true; + presenceByUser[myParticipantId] = { + x: Number.isFinite(Number(current.x)) ? Number(current.x) : 0.5, + y: Number.isFinite(Number(current.y)) ? Number(current.y) : 0.5, + emoji: nextEmoji, + hand_raised: current.hand_raised === true, + display_name: current.display_name || myDisplayName, + }; + renderOtherAvatars(); + renderParticipantsList(); + renderSeatLabels(); + updatePresenceControls(); + presenceSend({ type: 'presence', emoji: nextEmoji }); + }); + + btnRaiseHand.addEventListener('click', () => { + const current = presenceByUser[myParticipantId] || {}; + const nextHandRaised = current.hand_raised !== true; + pendingSelfPresenceReplay = true; + presenceByUser[myParticipantId] = { + x: Number.isFinite(Number(current.x)) ? Number(current.x) : 0.5, + y: Number.isFinite(Number(current.y)) ? Number(current.y) : 0.5, + emoji: typeof current.emoji === 'string' ? current.emoji : '', + hand_raised: nextHandRaised, + display_name: current.display_name || myDisplayName, + }; + renderOtherAvatars(); + renderParticipantsList(); + renderSeatLabels(); + updatePresenceControls(); + presenceSend({ type: 'presence', hand_raised: nextHandRaised }); + }); + // Chat chatForm.addEventListener('submit', (e) => { e.preventDefault(); @@ -836,6 +1088,7 @@

myIsMoving = Math.abs(myVel.x) > 0.1 || Math.abs(myVel.y) > 0.1; + myCharacter.style.display = isSeated ? 'none' : ''; myCharacter.style.left = myPos.x + 'px'; myCharacter.style.top = myPos.y + 'px'; myCharacter.style.transform = 'translate(-50%, -50%)'; diff --git a/src/worker.py b/src/worker.py index 25001e3..f097b9e 100644 --- a/src/worker.py +++ b/src/worker.py @@ -1697,6 +1697,313 @@ def _persist_attachment(self, session_id, info): print(f"[ClassroomDO._persist_attachment] sid={session_id} pid={info.get('participant_id')} error={exc!r}") +class PresenceDO(DurableObject): + """Room-scoped real-time user presence Durable Object.""" + + def __init__(self, ctx, env): + super().__init__(ctx, env) + # session_id -> {ws, user_id, display_name} + self.sessions = {} + # user_id -> {x, y, emoji, hand_raised, display_name} + self.presence = {} + + for ws in self.ctx.getWebSockets(): + try: + attachment = ws.deserializeAttachment() + if not attachment: + continue + data = json.loads(attachment) if isinstance(attachment, str) else attachment + session_id = data.get("session_id", str(uuid.uuid4())) + user_id = str(data.get("user_id", ""))[:64] + display_name = str(data.get("display_name", user_id or "Unknown"))[:64] + if not user_id: + continue + + self.sessions[session_id] = { + "ws": ws, + "user_id": user_id, + "display_name": display_name, + } + if user_id not in self.presence: + self.presence[user_id] = { + "x": self._clamp_01(data.get("x", 0.5)), + "y": self._clamp_01(data.get("y", 0.5)), + "emoji": data.get("emoji", "") if isinstance(data.get("emoji", ""), str) else "", + "hand_raised": data.get("hand_raised", False) is True, + "display_name": display_name, + } + except Exception as exc: + print(f"[PresenceDO.__init__.restore] error={exc!r}") + + self.ctx.setWebSocketAutoResponse( + WebSocketRequestResponsePair.new("ping", "pong") + ) + + async def on_fetch(self, request): + upgrade = request.headers.get("Upgrade") or "" + if upgrade.lower() != "websocket": + return Response( + json.dumps({"error": "Expected WebSocket upgrade"}), + status=426, + headers={"Content-Type": "application/json"}, + ) + + parsed = urlparse(request.url) + qs = parse_qs(parsed.query) + token_param = (qs.get("token") or [None])[0] + user_param = (qs.get("user_id") or [None])[0] + display_param = (qs.get("display_name") or [None])[0] + + allow_presence_setting = getattr(self.env, "ALLOW_ANON_PRESENCE", None) + if allow_presence_setting is None: + allow_presence_setting = getattr(self.env, "ALLOW_ANON_CLASSROOM_POC", "") + allow_anonymous = str(allow_presence_setting).lower() in {"1", "true", "yes"} + authenticated_user = verify_token(token_param or "", self.env.JWT_SECRET) if token_param else None + + if authenticated_user: + user_id = str(authenticated_user.get("id", "")) + display_name = str(authenticated_user.get("username") or user_id) + else: + if token_param or not allow_anonymous or not user_param: + return Response( + json.dumps({"error": "Authentication required"}), + status=401, + headers={"Content-Type": "application/json"}, + ) + user_id = str(user_param) + display_name = str(display_param or user_id) + + user_id = user_id[:64] + display_name = display_name[:64] + if not user_id: + return Response( + json.dumps({"error": "Invalid user_id"}), + status=400, + headers={"Content-Type": "application/json"}, + ) + + client, server = WebSocketPair.new().object_values() + self.ctx.acceptWebSocket(server) + + session_id = str(uuid.uuid4()) + existing = self.presence.get(user_id) + if existing is None: + existing = { + "x": 0.5, + "y": 0.5, + "emoji": "", + "hand_raised": False, + "display_name": display_name, + } + self.presence[user_id] = dict(existing) + else: + existing["display_name"] = display_name + self.presence[user_id] = existing + + attachment = json.dumps({ + "session_id": session_id, + "user_id": user_id, + "display_name": display_name, + "x": existing["x"], + "y": existing["y"], + "emoji": existing["emoji"], + "hand_raised": existing["hand_raised"], + }) + server.serializeAttachment(attachment) + + self.sessions[session_id] = { + "ws": server, + "user_id": user_id, + "display_name": display_name, + } + + self._send_welcome(server, session_id, user_id) + self._broadcast( + json.dumps({ + "type": "delta", + "user_id": user_id, + "display_name": display_name, + "x": existing["x"], + "y": existing["y"], + "emoji": existing["emoji"], + "hand_raised": existing["hand_raised"], + }), + exclude_session_id=session_id, + ) + + return Response(None, status=101, web_socket=client) + + async def on_webSocketMessage(self, ws, message): + try: + raw = message if isinstance(message, str) else message.decode("utf-8") + if len(raw) > 512: + print("[PresenceDO.on_webSocketMessage] dropped oversized payload") + return + data = json.loads(raw) + except Exception as exc: + await capture_exception(exc, None, self.env, "presence_on_webSocketMessage.parse") + return + + if not isinstance(data, dict): + return + + session = self._session_for_ws(ws) + if not session: + return + sid, info = session + user_id = info["user_id"] + current = self.presence.get(user_id) + if current is None: + current = { + "x": 0.5, + "y": 0.5, + "emoji": "", + "hand_raised": False, + "display_name": info["display_name"], + } + self.presence[user_id] = current + + msg_type = data.get("type", "") + if msg_type == "join": + self._send_welcome(ws, sid, user_id) + return + + if msg_type != "presence": + return + + delta = {"type": "delta", "user_id": user_id} + changed = False + + if "x" in data: + next_x = self._clamp_01(data.get("x")) + if next_x != current["x"]: + current["x"] = next_x + delta["x"] = next_x + changed = True + + if "y" in data: + next_y = self._clamp_01(data.get("y")) + if next_y != current["y"]: + current["y"] = next_y + delta["y"] = next_y + changed = True + + if "emoji" in data and isinstance(data.get("emoji"), str): + next_emoji = data.get("emoji", "")[:32] + if next_emoji != current["emoji"]: + current["emoji"] = next_emoji + delta["emoji"] = next_emoji + changed = True + + if "hand_raised" in data and isinstance(data.get("hand_raised"), bool): + next_hand = data.get("hand_raised") + if next_hand != current["hand_raised"]: + current["hand_raised"] = next_hand + delta["hand_raised"] = next_hand + changed = True + + if "display_name" in data and isinstance(data.get("display_name"), str): + next_display_name = data.get("display_name", "").strip()[:64] + if next_display_name and next_display_name != current.get("display_name", ""): + current["display_name"] = next_display_name + delta["display_name"] = next_display_name + for session_info in self.sessions.values(): + if session_info["user_id"] == user_id: + session_info["display_name"] = next_display_name + changed = True + + if not changed: + return + + self.presence[user_id] = current + self._persist_user_attachments(user_id) + self._broadcast(json.dumps(delta), exclude_session_id=sid) + + async def on_webSocketClose(self, ws, _code, _reason, _was_clean): + session = self._session_for_ws(ws) + if not session: + return + + sid, info = session + user_id = info["user_id"] + self.sessions.pop(sid, None) + + still_connected = any(s["user_id"] == user_id for s in self.sessions.values()) + if not still_connected: + self.presence.pop(user_id, None) + self._broadcast(json.dumps({"type": "leave", "user_id": user_id})) + + async def on_webSocketError(self, _ws, error): + print(f"[PresenceDO.on_webSocketError] error={error!r}") + + def _send_welcome(self, ws, session_id, user_id): + snapshot = {uid: dict(state) for uid, state in self.presence.items()} + try: + ws.send(json.dumps({ + "type": "welcome", + "session_id": session_id, + "user_id": user_id, + "state": snapshot, + })) + except Exception as exc: + print(f"[PresenceDO._send_welcome] error={exc!r}") + + def _session_for_ws(self, ws): + try: + raw = ws.deserializeAttachment() + if raw: + data = json.loads(raw) if isinstance(raw, str) else raw + session_id = data.get("session_id", "") + if session_id and session_id in self.sessions: + return session_id, self.sessions[session_id] + except Exception as exc: + print(f"[PresenceDO._session_for_ws.deserialize] error={exc!r}") + + for sid, info in self.sessions.items(): + try: + if info["ws"] == ws: + return sid, info + except Exception as exc: + print(f"[PresenceDO._session_for_ws.fallback] sid={sid} error={exc!r}") + return None + + def _broadcast(self, payload, exclude_session_id=None): + for sid, info in self.sessions.items(): + if sid == exclude_session_id: + continue + try: + info["ws"].send(payload) + except Exception as exc: + print(f"[PresenceDO._broadcast] sid={sid} user_id={info.get('user_id')} error={exc!r}") + + def _persist_user_attachments(self, user_id): + state = self.presence.get(user_id) + if not state: + return + for sid, info in self.sessions.items(): + if info["user_id"] != user_id: + continue + try: + info["ws"].serializeAttachment(json.dumps({ + "session_id": sid, + "user_id": user_id, + "display_name": info["display_name"], + "x": state["x"], + "y": state["y"], + "emoji": state["emoji"], + "hand_raised": state["hand_raised"], + })) + except Exception as exc: + print(f"[PresenceDO._persist_user_attachments] sid={sid} user_id={user_id} error={exc!r}") + + @staticmethod + def _clamp_01(value): + try: + return max(0.0, min(1.0, float(value))) + except (TypeError, ValueError): + return 0.5 + + # --------------------------------------------------------------------------- # Main dispatcher # --------------------------------------------------------------------------- @@ -1725,6 +2032,17 @@ async def _dispatch(request, env): await capture_exception(e, request, env, "classroom_do_dispatch") return err("Failed to connect to classroom", 500) + m_presence = re.fullmatch(r"/api/presence/([A-Za-z0-9_-]+)", path) + if m_presence: + room_id = m_presence.group(1) + try: + do_id = env.PRESENCE_DO.idFromName(room_id) + stub = env.PRESENCE_DO.get(do_id) + return await stub.fetch(request) + except Exception as e: + await capture_exception(e, request, env, "presence_do_dispatch") + return err("Failed to connect to presence channel", 500) + if path.startswith("/api/"): if path == "/api/init" and method == "POST": try: diff --git a/tests/conftest.py b/tests/conftest.py index d964287..58530c0 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -22,10 +22,11 @@ class _Response: """Minimal stub for workers.Response.""" - def __init__(self, body="", *, status=200, headers=None): + def __init__(self, body="", *, status=200, headers=None, web_socket=None, **_kwargs): self.body = body self.status = status self.headers = dict(headers or {}) + self.web_socket = web_socket def json(self): import json diff --git a/tests/test_presence_do.py b/tests/test_presence_do.py new file mode 100644 index 0000000..9372bbb --- /dev/null +++ b/tests/test_presence_do.py @@ -0,0 +1,527 @@ +""" +Unit tests for PresenceDO. + +Stubs are installed by conftest.py before any test file imports worker.py. +All WebSocket interactions use unittest.mock.MagicMock so no real runtime +primitives are needed. +""" + +import json +import pytest +from unittest.mock import MagicMock, AsyncMock + +from tests.helpers import load_worker, make_env + +worker = load_worker() + + +# --------------------------------------------------------------------------- +# Helpers: fake ctx / ws / request objects +# --------------------------------------------------------------------------- + +def _make_ctx(sockets=None): + ctx = MagicMock() + ctx.getWebSockets.return_value = sockets or [] + ctx.setWebSocketAutoResponse.return_value = None + ctx.acceptWebSocket.return_value = None + return ctx + + +def _make_ws(attachment=None): + """Return a MagicMock WebSocket with optional serialized attachment.""" + ws = MagicMock() + ws.deserializeAttachment.return_value = ( + json.dumps(attachment) if attachment else None + ) + ws.serializeAttachment.return_value = None + ws.send.return_value = None + return ws + + +def _make_presence_env(*, allow_anon="true", jwt_secret="test-jwt-secret"): + env = make_env(jwt_secret=jwt_secret) + env.ALLOW_ANON_PRESENCE = allow_anon + return env + + +def _make_request(path="ws://localhost/api/presence/room1", + upgrade="websocket", + token=None, user_id=None, display_name=None): + """Build a minimal fake request object.""" + qs_parts = [] + if token: + qs_parts.append(f"token={token}") + if user_id: + qs_parts.append(f"user_id={user_id}") + if display_name: + qs_parts.append(f"display_name={display_name}") + qs = "&".join(qs_parts) + url = f"{path}{'?' + qs if qs else ''}" + + req = MagicMock() + req.url = url + req.headers = MagicMock() + req.headers.get = lambda k, d=None: upgrade if k == "Upgrade" else d + return req + + +def _make_presence_do(allow_anon="true"): + """Return a fresh PresenceDO with no prior sessions.""" + ctx = _make_ctx() + env = _make_presence_env(allow_anon=allow_anon) + return worker.PresenceDO(ctx, env) + + +# =========================================================================== +# 1. Constructor / hibernation restore +# =========================================================================== + +class TestPresenceDOInit: + def test_empty_init(self): + do = _make_presence_do() + assert do.sessions == {} + assert do.presence == {} + + def test_restores_hibernated_socket(self): + attachment = { + "session_id": "sid-1", + "user_id": "alice", + "display_name": "Alice", + "x": 0.3, "y": 0.7, "emoji": "👋", "hand_raised": True, + } + ws = _make_ws(attachment=attachment) + ctx = _make_ctx(sockets=[ws]) + do = worker.PresenceDO(ctx, _make_presence_env()) + + assert "sid-1" in do.sessions + assert do.presence["alice"]["x"] == pytest.approx(0.3) + assert do.presence["alice"]["hand_raised"] is True + + def test_skips_socket_with_no_attachment(self): + ws = _make_ws(attachment=None) + ctx = _make_ctx(sockets=[ws]) + do = worker.PresenceDO(ctx, _make_presence_env()) + assert do.sessions == {} + assert do.presence == {} + + def test_does_not_overwrite_existing_presence_on_restore(self): + """If two sockets belong to the same user, presence is only set once.""" + att1 = {"session_id": "s1", "user_id": "alice", "display_name": "Alice", + "x": 0.1, "y": 0.1, "emoji": "", "hand_raised": False} + att2 = {"session_id": "s2", "user_id": "alice", "display_name": "Alice", + "x": 0.9, "y": 0.9, "emoji": "", "hand_raised": True} + ws1, ws2 = _make_ws(att1), _make_ws(att2) + ctx = _make_ctx(sockets=[ws1, ws2]) + do = worker.PresenceDO(ctx, _make_presence_env()) + + # First socket wins for the presence record + assert do.presence["alice"]["x"] == pytest.approx(0.1) + + +# =========================================================================== +# 2. on_fetch – WebSocket upgrade +# =========================================================================== + +class TestPresenceDOOnFetch: + async def test_rejects_non_websocket(self): + do = _make_presence_do() + req = _make_request(upgrade="") + resp = await do.on_fetch(req) + assert resp.status == 426 + + async def test_rejects_unauthenticated_when_anon_disabled(self): + do = _make_presence_do(allow_anon="false") + req = _make_request(upgrade="websocket") # no token, no user_id + resp = await do.on_fetch(req) + assert resp.status == 401 + + async def test_rejects_when_anon_disabled_even_with_user_id(self): + do = _make_presence_do(allow_anon="false") + req = _make_request(upgrade="websocket", user_id="bob") + resp = await do.on_fetch(req) + assert resp.status == 401 + + async def test_accepts_anon_join(self): + do = _make_presence_do(allow_anon="true") + req = _make_request(upgrade="websocket", user_id="bob", display_name="Bob") + resp = await do.on_fetch(req) + assert resp.status == 101 + assert len(do.sessions) == 1 + assert "bob" in do.presence + + async def test_authenticated_join_uses_token_identity(self): + env = _make_presence_env(allow_anon="false") + ctx = _make_ctx() + do = worker.PresenceDO(ctx, env) + token = worker.create_token("token-user", "TokenName", "student", env.JWT_SECRET) + req = _make_request( + upgrade="websocket", + token=token, + user_id="query-user", + display_name="QueryName", + ) + resp = await do.on_fetch(req) + assert resp.status == 101 + assert "token-user" in do.presence + assert "query-user" not in do.presence + assert do.presence["token-user"]["display_name"] == "TokenName" + + async def test_rejects_empty_user_from_token(self): + env = _make_presence_env(allow_anon="false") + ctx = _make_ctx() + do = worker.PresenceDO(ctx, env) + token = worker.create_token("", "NoId", "student", env.JWT_SECRET) + req = _make_request(upgrade="websocket", token=token) + resp = await do.on_fetch(req) + assert resp.status == 400 + + async def test_welcome_message_contains_full_state(self): + do = _make_presence_do() + # Pre-populate an existing user + do.presence["old-user"] = { + "x": 0.1, "y": 0.9, "emoji": "", "hand_raised": False, + "display_name": "Old", + } + + req = _make_request(upgrade="websocket", user_id="newbie", display_name="Newbie") + await do.on_fetch(req) + + sid = next(iter(do.sessions)) + server_ws = do.sessions[sid]["ws"] + welcome = json.loads(server_ws.send.call_args_list[0][0][0]) + assert welcome["type"] == "welcome" + assert "old-user" in welcome["state"] + assert "newbie" in welcome["state"] + + async def test_second_join_broadcasts_delta_to_first(self): + do = _make_presence_do() + + await do.on_fetch(_make_request(upgrade="websocket", user_id="alice", display_name="Alice")) + alice_sid = next(iter(do.sessions)) + alice_ws = do.sessions[alice_sid]["ws"] + alice_ws.send.reset_mock() + + await do.on_fetch(_make_request(upgrade="websocket", user_id="bob", display_name="Bob")) + + calls = [json.loads(c[0][0]) for c in alice_ws.send.call_args_list] + assert any(c.get("type") == "delta" and c.get("user_id") == "bob" + for c in calls) + + async def test_user_id_sanitised_to_64_chars(self): + do = _make_presence_do() + uid = "a" * 100 + await do.on_fetch(_make_request(upgrade="websocket", user_id=uid)) + assert uid[:64] in do.presence + assert uid not in do.presence # raw 100-char uid must not exist + + async def test_display_name_sanitised_to_64_chars(self): + do = _make_presence_do() + dname = "b" * 100 + await do.on_fetch(_make_request(upgrade="websocket", user_id="alice", display_name=dname)) + assert do.presence["alice"]["display_name"] == dname[:64] + + async def test_same_user_can_have_multiple_sessions(self): + """Multi-tab: same user_id, two independent WS connections.""" + do = _make_presence_do() + for _ in range(2): + await do.on_fetch( + _make_request(upgrade="websocket", user_id="alice", display_name="Alice") + ) + alice_sessions = [s for s in do.sessions.values() if s["user_id"] == "alice"] + assert len(alice_sessions) == 2 + # Presence has only one record for alice + assert list(do.presence.keys()).count("alice") == 1 + + +# =========================================================================== +# 3. on_webSocketMessage – presence updates +# =========================================================================== + +class TestPresenceDOMessage: + async def _setup_user(self, user_id="alice", display_name="Alice"): + """Helper: create DO, join one user, and return (do, session_id, ws).""" + do = _make_presence_do() + req = _make_request(upgrade="websocket", user_id=user_id, display_name=display_name) + await do.on_fetch(req) + sid = next(s for s, info in do.sessions.items() if info["user_id"] == user_id) + ws = do.sessions[sid]["ws"] + # Teach the fake ws to return its own attachment via deserializeAttachment + ws.deserializeAttachment.return_value = json.dumps({ + "session_id": sid, "user_id": user_id, "display_name": display_name, + "x": 0.5, "y": 0.5, "emoji": "", "hand_raised": False, + }) + return do, sid, ws + + async def test_position_update_changes_presence(self): + do, sid, ws = await self._setup_user() + await do.on_webSocketMessage(ws, json.dumps({"type": "presence", "x": 0.8, "y": 0.2})) + assert do.presence["alice"]["x"] == pytest.approx(0.8) + assert do.presence["alice"]["y"] == pytest.approx(0.2) + + async def test_only_x_changed(self): + """Sending only x must not modify y.""" + do, sid, ws = await self._setup_user() + await do.on_webSocketMessage(ws, json.dumps({"type": "presence", "x": 0.9})) + assert do.presence["alice"]["x"] == pytest.approx(0.9) + assert do.presence["alice"]["y"] == pytest.approx(0.5) # unchanged + + async def test_no_broadcast_on_no_change(self): + """If nothing actually changed, skip broadcast entirely.""" + do, _sid, ws = await self._setup_user() + ws.send.reset_mock() + # Send the same values that are already stored (defaults) + await do.on_webSocketMessage(ws, json.dumps({"type": "presence", "x": 0.5, "y": 0.5})) + ws.send.assert_not_called() + + async def test_position_clamped_to_01(self): + do, _sid, ws = await self._setup_user() + await do.on_webSocketMessage(ws, json.dumps({"type": "presence", "x": 99.0, "y": -5.0})) + assert do.presence["alice"]["x"] == pytest.approx(1.0) + assert do.presence["alice"]["y"] == pytest.approx(0.0) + + async def test_emoji_update(self): + do, _sid, ws = await self._setup_user() + await do.on_webSocketMessage(ws, json.dumps({"type": "presence", "emoji": "🎉"})) + assert do.presence["alice"]["emoji"] == "🎉" + + async def test_emoji_cleared(self): + do, sid, ws = await self._setup_user() + do.presence["alice"]["emoji"] = "🎉" + # Reset attachment to reflect current emoji + ws.deserializeAttachment.return_value = json.dumps({ + "session_id": sid, "user_id": "alice", "display_name": "Alice", + "x": 0.5, "y": 0.5, "emoji": "🎉", "hand_raised": False, + }) + await do.on_webSocketMessage(ws, json.dumps({"type": "presence", "emoji": ""})) + assert do.presence["alice"]["emoji"] == "" + + async def test_hand_raised_toggle(self): + do, sid, ws = await self._setup_user() + await do.on_webSocketMessage(ws, json.dumps({"type": "presence", "hand_raised": True})) + assert do.presence["alice"]["hand_raised"] is True + # Update attachment to reflect raised state + ws.deserializeAttachment.return_value = json.dumps({ + "session_id": sid, "user_id": "alice", "display_name": "Alice", + "x": 0.5, "y": 0.5, "emoji": "", "hand_raised": True, + }) + await do.on_webSocketMessage(ws, json.dumps({"type": "presence", "hand_raised": False})) + assert do.presence["alice"]["hand_raised"] is False + + async def test_non_bool_hand_raised_ignored(self): + do, _sid, ws = await self._setup_user() + ws.send.reset_mock() + await do.on_webSocketMessage(ws, json.dumps({"type": "presence", "hand_raised": "yes"})) + # hand_raised must stay False (non-bool rejected) + assert do.presence["alice"]["hand_raised"] is False + ws.send.assert_not_called() + + async def test_join_message_returns_welcome(self): + do, _sid, ws = await self._setup_user() + ws.send.reset_mock() + await do.on_webSocketMessage(ws, json.dumps({"type": "join"})) + call = json.loads(ws.send.call_args_list[0][0][0]) + assert call["type"] == "welcome" + assert "alice" in call["state"] + + async def test_oversized_message_ignored(self): + do, _sid, ws = await self._setup_user() + ws.send.reset_mock() + big = json.dumps({"type": "presence", "x": 0.1, "data": "A" * 600}) + await do.on_webSocketMessage(ws, big) + # Presence must NOT change + assert do.presence["alice"]["x"] == pytest.approx(0.5) + + async def test_invalid_json_ignored(self): + do, sid, ws = await self._setup_user() + ws.send.reset_mock() + await do.on_webSocketMessage(ws, "not-json{{") + ws.send.assert_not_called() + + async def test_non_dict_payload_ignored(self): + do, sid, ws = await self._setup_user() + ws.send.reset_mock() + await do.on_webSocketMessage(ws, json.dumps([1, 2, 3])) + ws.send.assert_not_called() + + async def test_broadcast_reaches_second_client(self): + """Delta for alice's move must reach bob but NOT be echoed back to alice.""" + do = _make_presence_do() + + await do.on_fetch(_make_request(upgrade="websocket", user_id="alice", display_name="Alice")) + alice_sid = next(s for s, i in do.sessions.items() if i["user_id"] == "alice") + alice_ws = do.sessions[alice_sid]["ws"] + alice_ws.deserializeAttachment.return_value = json.dumps({ + "session_id": alice_sid, "user_id": "alice", "display_name": "Alice", + "x": 0.5, "y": 0.5, "emoji": "", "hand_raised": False, + }) + + await do.on_fetch(_make_request(upgrade="websocket", user_id="bob", display_name="Bob")) + bob_sid = next(s for s, i in do.sessions.items() if i["user_id"] == "bob") + bob_ws = do.sessions[bob_sid]["ws"] + + alice_ws.send.reset_mock() + bob_ws.send.reset_mock() + + await do.on_webSocketMessage(alice_ws, json.dumps({"type": "presence", "x": 0.1})) + + bob_msgs = [json.loads(c[0][0]) for c in bob_ws.send.call_args_list] + alice_msgs = [json.loads(c[0][0]) for c in alice_ws.send.call_args_list] + + assert any(m.get("type") == "delta" and m.get("user_id") == "alice" + for m in bob_msgs), "Bob should receive alice's delta" + assert not any(m.get("type") == "delta" for m in alice_msgs), \ + "Alice should NOT receive her own delta" + + async def test_delta_contains_only_changed_fields(self): + """Only the changed field (x) should appear in the broadcast delta.""" + do = _make_presence_do() + + await do.on_fetch(_make_request(upgrade="websocket", user_id="alice", display_name="Alice")) + alice_sid = next(s for s, i in do.sessions.items() if i["user_id"] == "alice") + alice_ws = do.sessions[alice_sid]["ws"] + alice_ws.deserializeAttachment.return_value = json.dumps({ + "session_id": alice_sid, "user_id": "alice", "display_name": "Alice", + "x": 0.5, "y": 0.5, "emoji": "", "hand_raised": False, + }) + + await do.on_fetch(_make_request(upgrade="websocket", user_id="bob", display_name="Bob")) + bob_sid = next(s for s, i in do.sessions.items() if i["user_id"] == "bob") + bob_ws = do.sessions[bob_sid]["ws"] + bob_ws.send.reset_mock() + + # Alice moves only in x + await do.on_webSocketMessage(alice_ws, json.dumps({"type": "presence", "x": 0.9})) + + bob_deltas = [json.loads(c[0][0]) for c in bob_ws.send.call_args_list + if json.loads(c[0][0]).get("type") == "delta"] + assert len(bob_deltas) == 1 + delta = bob_deltas[0] + assert "x" in delta + assert "y" not in delta + assert "emoji" not in delta + assert "hand_raised" not in delta + + +# =========================================================================== +# 4. on_webSocketClose – disconnect / leave +# =========================================================================== + +class TestPresenceDODisconnect: + async def test_single_user_disconnect_clears_presence(self): + do = _make_presence_do() + req = _make_request(upgrade="websocket", user_id="alice", display_name="Alice") + await do.on_fetch(req) + + sid = next(iter(do.sessions)) + ws = do.sessions[sid]["ws"] + ws.deserializeAttachment.return_value = json.dumps({ + "session_id": sid, "user_id": "alice", "display_name": "Alice", + "x": 0.5, "y": 0.5, "emoji": "", "hand_raised": False, + }) + + await do.on_webSocketClose(ws, 1000, "normal", True) + + assert sid not in do.sessions + assert "alice" not in do.presence + + async def test_disconnect_broadcasts_leave_to_others(self): + do = _make_presence_do() + + await do.on_fetch(_make_request(upgrade="websocket", user_id="alice", display_name="Alice")) + alice_sid = next(s for s, i in do.sessions.items() if i["user_id"] == "alice") + alice_ws = do.sessions[alice_sid]["ws"] + alice_ws.deserializeAttachment.return_value = json.dumps({ + "session_id": alice_sid, "user_id": "alice", "display_name": "Alice", + "x": 0.5, "y": 0.5, "emoji": "", "hand_raised": False, + }) + + await do.on_fetch(_make_request(upgrade="websocket", user_id="bob", display_name="Bob")) + bob_sid = next(s for s, i in do.sessions.items() if i["user_id"] == "bob") + bob_ws = do.sessions[bob_sid]["ws"] + bob_ws.send.reset_mock() + + await do.on_webSocketClose(alice_ws, 1000, "normal", True) + + leave_msgs = [json.loads(c[0][0]) for c in bob_ws.send.call_args_list if c[0]] + assert any(m.get("type") == "leave" and m.get("user_id") == "alice" + for m in leave_msgs) + + async def test_multi_tab_not_evicted_until_last_socket_closes(self): + """Presence stays alive until the very last socket for that user closes.""" + do = _make_presence_do() + + await do.on_fetch(_make_request(upgrade="websocket", user_id="alice", display_name="Alice")) + await do.on_fetch(_make_request(upgrade="websocket", user_id="alice", display_name="Alice")) + + sids = [s for s, i in do.sessions.items() if i["user_id"] == "alice"] + assert len(sids) == 2 + + # Close first socket + ws1 = do.sessions[sids[0]]["ws"] + ws1.deserializeAttachment.return_value = json.dumps({ + "session_id": sids[0], "user_id": "alice", "display_name": "Alice", + "x": 0.5, "y": 0.5, "emoji": "", "hand_raised": False, + }) + await do.on_webSocketClose(ws1, 1000, "", True) + assert "alice" in do.presence # second tab still open + + # Close second socket + ws2 = do.sessions[sids[1]]["ws"] + ws2.deserializeAttachment.return_value = json.dumps({ + "session_id": sids[1], "user_id": "alice", "display_name": "Alice", + "x": 0.5, "y": 0.5, "emoji": "", "hand_raised": False, + }) + await do.on_webSocketClose(ws2, 1000, "", True) + assert "alice" not in do.presence # now fully gone + + async def test_unknown_ws_on_close_is_safe(self): + """on_webSocketClose with an unrecognised ws must not raise.""" + do = _make_presence_do() + orphan_ws = _make_ws() # not registered in sessions + await do.on_webSocketClose(orphan_ws, 1000, "", True) # must not raise + + +# =========================================================================== +# 5. Dispatcher – /api/presence/ route +# =========================================================================== + +class TestPresenceDispatch: + async def test_presence_route_dispatches_to_do(self): + stub = MagicMock() + stub.fetch = AsyncMock(return_value=MagicMock(status=101)) + + do_ns = MagicMock() + do_ns.idFromName.return_value = "fake-id" + do_ns.get.return_value = stub + + env = make_env() + env.CLASSROOM_DO = MagicMock() + env.PRESENCE_DO = do_ns + + req = MagicMock() + req.method = "GET" + req.headers = MagicMock() + req.headers.get = lambda k, d=None: "websocket" if k == "Upgrade" else d + req.url = "https://example.com/api/presence/room-42?user_id=alice" + + resp = await worker._dispatch(req, env) + do_ns.idFromName.assert_called_once_with("room-42") + stub.fetch.assert_awaited_once() + + async def test_invalid_room_id_not_matched(self): + """Room IDs with special characters must NOT match the route.""" + env = make_env() + env.CLASSROOM_DO = MagicMock() + env.PRESENCE_DO = MagicMock() + + req = MagicMock() + req.method = "GET" + req.headers = MagicMock() + req.headers.get = lambda k, d=None: None + req.url = "https://example.com/api/presence/room id with spaces" + + # Falls through to static serving → 404 (no static content in mock) + resp = await worker._dispatch(req, env) + # PRESENCE_DO must not have been touched + env.PRESENCE_DO.idFromName.assert_not_called() + assert resp.status == 404 diff --git a/wrangler.toml b/wrangler.toml index 516154e..e606b91 100644 --- a/wrangler.toml +++ b/wrangler.toml @@ -23,10 +23,18 @@ database_id = "a0021f2e-a8cc-4e20-8910-3c7290ba47a6" name = "CLASSROOM_DO" class_name = "ClassroomDO" +[[durable_objects.bindings]] +name = "PRESENCE_DO" +class_name = "PresenceDO" + [[migrations]] tag = "v1" new_sqlite_classes = ["ClassroomDO"] +[[migrations]] +tag = "v2" +new_sqlite_classes = ["PresenceDO"] + [observability] enabled = false