From 1f51ccbb0b38c39fe3dea1233d27c0302e7183ae Mon Sep 17 00:00:00 2001 From: veloura-dev Date: Tue, 30 Jun 2026 20:33:24 +0000 Subject: [PATCH] #303 WebSocket client: envelope protocol, resume + sync FIXED --- apps/backend/src/lib/eventEnvelope.ts | 63 +--- apps/backend/src/middleware/auth.ts | 1 + apps/backend/src/middleware/socketAuth.ts | 1 + apps/backend/src/routes/auth.ts | 43 ++- apps/backend/src/routes/messages.ts | 7 +- apps/backend/src/routes/sync.ts | 6 + apps/backend/src/schemas/auth.schemas.ts | 29 +- apps/backend/src/socket/messaging.ts | 92 +++++- apps/web/src/app/conversations/[id]/page.tsx | 272 ++++++++++++------ apps/web/src/components/auth/AuthProvider.tsx | 32 +-- apps/web/src/contexts/AuthContext.tsx | 60 ++-- apps/web/src/hooks/useSocket.ts | 72 ++++- apps/web/src/lib/deviceIdentity.ts | 57 ++++ apps/web/src/lib/realtime.ts | 182 ++++++++++++ apps/web/src/lib/socket.ts | 48 +++- 15 files changed, 740 insertions(+), 225 deletions(-) create mode 100644 apps/web/src/lib/deviceIdentity.ts create mode 100644 apps/web/src/lib/realtime.ts diff --git a/apps/backend/src/lib/eventEnvelope.ts b/apps/backend/src/lib/eventEnvelope.ts index 7dcdd95..7568975 100644 --- a/apps/backend/src/lib/eventEnvelope.ts +++ b/apps/backend/src/lib/eventEnvelope.ts @@ -1,55 +1,12 @@ -import { randomUUID } from 'node:crypto'; import { z } from 'zod'; - -// Central registry of all valid socket event types. -export const KNOWN_EVENT_TYPES = new Set([ - // Inbound (client → server) - 'join_room', - 'send_message', - 'message_history', - 'delete_message', - 'message_read', - 'create_conversation', - 'typing_start', - 'typing_stop', - 'ask_assistant', - 'resume', - 'join_device_channel', - // Outbound (server → client) — registered so the registry is the single source of truth - 'room_joined', - 'new_message', - 'message_ack', - 'message_deleted', - 'read_receipt', - 'conversation_created', - 'ephemeral_replay', - 'resume_complete', - 'device_envelope', - 'error', -]); - -export const EventEnvelopeSchema = z.object({ - eventId: z.string().min(1, 'eventId is required'), - type: z.string().min(1, 'type is required'), - timestamp: z.number().int().positive('timestamp must be a positive integer'), - payload: z.record(z.string(), z.unknown()).optional().default({}), -}); - +export declare const KNOWN_EVENT_TYPES: Set; +export declare const EventEnvelopeSchema: z.ZodObject<{ + eventId: z.ZodString; + type: z.ZodString; + timestamp: z.ZodNumber; + payload: z.ZodDefault>>; +}, z.core.$strip>; export type EventEnvelope = z.infer; - -export function isKnownEventType(type: string): boolean { - return KNOWN_EVENT_TYPES.has(type); -} - -export function createEnvelope( - type: string, - payload: Record, - eventId?: string, -): EventEnvelope { - return { - eventId: eventId ?? randomUUID(), - type, - timestamp: Date.now(), - payload, - }; -} +export declare function isKnownEventType(type: string): boolean; +export declare function createEnvelope(type: string, payload: Record, eventId?: string): EventEnvelope; +//# sourceMappingURL=eventEnvelope.d.ts.map \ No newline at end of file diff --git a/apps/backend/src/middleware/auth.ts b/apps/backend/src/middleware/auth.ts index ec71ba3..e468707 100644 --- a/apps/backend/src/middleware/auth.ts +++ b/apps/backend/src/middleware/auth.ts @@ -45,6 +45,7 @@ export async function requireAuth( return; } + req.auth = payload; next(); } diff --git a/apps/backend/src/middleware/socketAuth.ts b/apps/backend/src/middleware/socketAuth.ts index b866b7c..92db1a6 100644 --- a/apps/backend/src/middleware/socketAuth.ts +++ b/apps/backend/src/middleware/socketAuth.ts @@ -39,6 +39,7 @@ export async function socketAuthMiddleware( return; } + socket.auth = payload; next(); } diff --git a/apps/backend/src/routes/auth.ts b/apps/backend/src/routes/auth.ts index d5ba252..2db02c8 100644 --- a/apps/backend/src/routes/auth.ts +++ b/apps/backend/src/routes/auth.ts @@ -4,7 +4,7 @@ import type { Request, Response, IRouter } from 'express'; import rateLimit, { type RateLimitRequestHandler } from 'express-rate-limit'; import { Keypair } from '@stellar/stellar-sdk'; import { db } from '../db/index.js'; -import { users, wallets, devices } from '../db/schema.js'; +import { users, wallets, devices, userDevices } from '../db/schema.js'; import { eq, and } from 'drizzle-orm'; import { createNonce, consumeNonce } from '../lib/nonce.js'; import { signToken } from '../lib/jwt.js'; @@ -57,7 +57,7 @@ authRouter.post( verifyLimiter, validate(VerifySchema), async (req: Request, res: Response) => { - const { walletAddress, signature, nonce, identityPublicKey } = req.body as VerifyBody; + const { walletAddress, signature, nonce, identityPublicKey, deviceId: clientDeviceId, deviceName, platform, registrationId } = req.body as VerifyBody; // Validate and consume nonce const valid = consumeNonce(walletAddress, nonce); @@ -135,7 +135,42 @@ authRouter.post( deviceId = newDevice.id; } - const token = signToken({ userId, walletAddress, deviceId }); - res.json({ token }); + let tokenDeviceId = deviceId; + + if (clientDeviceId) { + const [userDevice] = await db + .insert(userDevices) + .values({ + userId, + deviceId: clientDeviceId, + deviceName: deviceName ?? 'Web browser', + platform: platform ?? 'web', + identityPublicKey, + registrationId: registrationId ? Number(registrationId) : null, + lastSeenAt: new Date(), + }) + .onConflictDoUpdate({ + target: [userDevices.userId, userDevices.deviceId], + set: { + deviceName: deviceName ?? 'Web browser', + platform: platform ?? 'web', + identityPublicKey, + registrationId: registrationId ? Number(registrationId) : null, + lastSeenAt: new Date(), + revokedAt: null, + }, + }) + .returning({ id: userDevices.id }); + + if (!userDevice) { + res.status(500).json({ error: 'Failed to register messaging device' }); + return; + } + + tokenDeviceId = userDevice.id; + } + + const token = signToken({ userId, walletAddress, deviceId: tokenDeviceId }); + res.json({ token, deviceId: tokenDeviceId }); }, ); diff --git a/apps/backend/src/routes/messages.ts b/apps/backend/src/routes/messages.ts index 273a232..fbeccf0 100644 --- a/apps/backend/src/routes/messages.ts +++ b/apps/backend/src/routes/messages.ts @@ -31,7 +31,12 @@ messagesRouter.post('/', validate(SendMessageSchema), async (req: AuthRequest, r }; // ── content-type-specific validation ────────────────────────────────────── - const validation = validateMessagePayload({ contentType, ciphertext, envelopes, fileId }); + const validation = validateMessagePayload({ + ...(contentType !== undefined ? { contentType } : {}), + ...(ciphertext !== undefined ? { ciphertext } : {}), + ...(envelopes !== undefined ? { envelopes } : {}), + ...(fileId !== undefined ? { fileId } : {}), + }); if (!validation.ok) { res.status(validation.code).json({ error: validation.message }); return; diff --git a/apps/backend/src/routes/sync.ts b/apps/backend/src/routes/sync.ts index 3a08c89..8eab03f 100644 --- a/apps/backend/src/routes/sync.ts +++ b/apps/backend/src/routes/sync.ts @@ -80,6 +80,9 @@ syncRouter.get('/', async (req: AuthRequest, res) => { createdAt: messageEnvelopes.createdAt, sequenceNumber: messages.sequenceNumber, conversationId: messages.conversationId, + senderId: messages.senderId, + senderDeviceId: messages.senderDeviceId, + contentType: messages.contentType, }) .from(messageEnvelopes) .innerJoin(messages, eq(messageEnvelopes.messageId, messages.id)) @@ -116,6 +119,9 @@ syncRouter.get('/', async (req: AuthRequest, res) => { id: r.id, messageId: r.messageId, conversationId: r.conversationId, + senderId: r.senderId, + senderDeviceId: r.senderDeviceId, + contentType: r.contentType, ciphertext: r.ciphertext, sequenceNumber: r.sequenceNumber, deliveredAt: r.deliveredAt, diff --git a/apps/backend/src/schemas/auth.schemas.ts b/apps/backend/src/schemas/auth.schemas.ts index f627e68..31ca082 100644 --- a/apps/backend/src/schemas/auth.schemas.ts +++ b/apps/backend/src/schemas/auth.schemas.ts @@ -5,6 +5,13 @@ export const ChallengeSchema = z.object({ walletAddress: z.string().min(1, 'walletAddress is required'), }); +const DeviceRegistrationSchema = z.object({ + deviceId: z.string().min(1, 'deviceId is required').optional(), + deviceName: z.string().min(1, 'deviceName is required').optional(), + platform: z.enum(['web', 'ios', 'android']).optional(), + registrationId: z.string().optional(), +}); + export const DeviceSchema = z.object({ deviceId: z.string().min(1, 'deviceId is required'), deviceName: z.string().min(1, 'deviceName is required'), @@ -13,16 +20,18 @@ export const DeviceSchema = z.object({ registrationId: z.string().optional(), }); -export const VerifySchema = z.object({ - walletAddress: z.string().min(1, 'walletAddress is required'), - signature: z.string().min(1, 'signature is required'), - nonce: z.string().min(1, 'nonce is required'), - /** - * Base64-encoded Ed25519 SPKI DER identity public key (44 bytes). - * Validated for correct base64 and exact byte length before any crypto operation. - */ - identityPublicKey: IdentityPublicKeySchema, -}); +export const VerifySchema = z + .object({ + walletAddress: z.string().min(1, 'walletAddress is required'), + signature: z.string().min(1, 'signature is required'), + nonce: z.string().min(1, 'nonce is required'), + /** + * Base64-encoded Ed25519 SPKI DER identity public key (44 bytes). + * Validated for correct base64 and exact byte length before any crypto operation. + */ + identityPublicKey: IdentityPublicKeySchema, + }) + .merge(DeviceRegistrationSchema); export type ChallengeBody = z.infer; export type DeviceBody = z.infer; diff --git a/apps/backend/src/socket/messaging.ts b/apps/backend/src/socket/messaging.ts index e9e99e6..74336c0 100644 --- a/apps/backend/src/socket/messaging.ts +++ b/apps/backend/src/socket/messaging.ts @@ -66,13 +66,14 @@ export function registerMessagingHandlers(io: Server, socket: AuthSocket): void // ── send_message ─────────────────────────────────────────────────────────── dispatcher.register('send_message', async (payload) => { - const { conversationId, messageId, content, contentType, ciphertext, envelopes } = payload as { + const { conversationId, messageId, content, contentType, ciphertext, envelopes, fileId } = payload as { conversationId: string; messageId?: string; content?: string; contentType?: string; ciphertext?: string; envelopes?: Array<{ recipientDeviceId: string; ciphertext: string }>; + fileId?: string; }; const deviceId = socket.auth!.deviceId; @@ -100,10 +101,10 @@ export function registerMessagingHandlers(io: Server, socket: AuthSocket): void const effectiveCiphertext = ciphertext ?? content ?? undefined; const validation = validateMessagePayload({ - contentType, - ciphertext: effectiveCiphertext, - envelopes, - fileId, + ...(contentType !== undefined ? { contentType } : {}), + ...(effectiveCiphertext !== undefined ? { ciphertext: effectiveCiphertext } : {}), + ...(envelopes !== undefined ? { envelopes } : {}), + ...(fileId !== undefined ? { fileId } : {}), }); if (!validation.ok) { socket.emit('error', { @@ -136,15 +137,15 @@ export function registerMessagingHandlers(io: Server, socket: AuthSocket): void return; } - let fileId: string | undefined; - const resolvedContentType = contentType || 'text/plain'; - if (FILE_CONTENT_TYPES.has(resolvedContentType)) { + const resolvedContentType = contentType?.trim().toLowerCase() || 'text'; + let persistedFileId = fileId; + if (FILE_CONTENT_TYPES.has(resolvedContentType) && !persistedFileId) { const [fileRow] = await db .insert(files) .values({ storageKey: messageId }) .onConflictDoUpdate({ target: files.storageKey, set: { storageKey: messageId } }) .returning({ id: files.id }); - fileId = fileRow?.id; + persistedFileId = fileRow?.id; } const [message] = await db @@ -156,7 +157,7 @@ export function registerMessagingHandlers(io: Server, socket: AuthSocket): void senderDeviceId: deviceId, contentType: resolvedContentType, ciphertext: effectiveCiphertext, - fileId: fileId ?? null, + fileId: persistedFileId ?? null, }) .returning(); @@ -201,7 +202,9 @@ export function registerMessagingHandlers(io: Server, socket: AuthSocket): void socket.emit('message_ack', { messageId, sequenceNumber: message.sequenceNumber }); } - await deliverMessage(io, message, conversationId); + if (message) { + await deliverMessage(io, message, conversationId); + } const members = await db.query.conversationMembers.findMany({ where: eq(conversationMembers.conversationId, conversationId), @@ -443,7 +446,7 @@ export function registerMessagingHandlers(io: Server, socket: AuthSocket): void ), ); - io.to(conversationId).volatile.emit('read_receipt', { userId, lastReadMessageId }); + io.to(conversationId).volatile.emit('read_receipt', { conversationId, userId, lastReadMessageId }); if (redis) { const members = await db.query.conversationMembers.findMany({ @@ -458,6 +461,71 @@ export function registerMessagingHandlers(io: Server, socket: AuthSocket): void } }); + // ── message_delivered ────────────────────────────────────────────────────── + dispatcher.register('message_delivered', async (payload) => { + const { conversationId, messageId, envelopeId, sequenceNumber } = payload as { + conversationId?: string; + messageId?: string; + envelopeId?: string; + sequenceNumber?: number; + }; + + if (!conversationId || !messageId) { + socket.emit('error', { + event: 'message_delivered', + message: 'conversationId and messageId are required', + }); + return; + } + + const membership = await db.query.conversationMembers.findFirst({ + where: and( + eq(conversationMembers.conversationId, conversationId), + eq(conversationMembers.userId, userId), + ), + }); + + if (!membership) { + socket.emit('error', { event: 'message_delivered', message: 'Not a member of this conversation' }); + return; + } + + await db + .update(messageEnvelopes) + .set({ deliveredAt: new Date() }) + .where( + and( + eq(messageEnvelopes.messageId, messageId), + eq(messageEnvelopes.recipientDeviceId, socket.auth!.deviceId), + ), + ); + + io.to(conversationId).volatile.emit('delivery_receipt', { + conversationId, + messageId, + envelopeId, + userId, + deviceId: socket.auth!.deviceId, + sequenceNumber, + deliveredAt: new Date().toISOString(), + }); + + if (redis) { + const members = await db.query.conversationMembers.findMany({ + where: eq(conversationMembers.conversationId, conversationId), + columns: { userId: true }, + }); + await publishEphemeral( + redis, + members.map((member) => member.userId), + { + type: 'delivery_receipt', + data: { conversationId, messageId, envelopeId, userId, deviceId: socket.auth!.deviceId, sequenceNumber }, + }, + ); + } + }); + // ── resume ───────────────────────────────────────────────────────────────── dispatcher.register('resume', async (payload) => { if (!redis) { diff --git a/apps/web/src/app/conversations/[id]/page.tsx b/apps/web/src/app/conversations/[id]/page.tsx index f2516eb..adbff2d 100644 --- a/apps/web/src/app/conversations/[id]/page.tsx +++ b/apps/web/src/app/conversations/[id]/page.tsx @@ -1,9 +1,11 @@ 'use client'; -import { useEffect, useRef, useState, useCallback } from 'react'; +import { useCallback, useEffect, useMemo, useRef, useState } from 'react'; import Image from 'next/image'; import { useParams } from 'next/navigation'; +import { useAuth } from '@/components/auth/useAuth'; import { useSocket } from '@/hooks/useSocket'; +import { emitSocketEnvelope, parseJwtClaims, setSyncCursor } from '@/lib/realtime'; interface Sender { id: string; @@ -15,13 +17,18 @@ interface Message { id: string; conversationId: string; senderId: string; - content: string; + senderDeviceId?: string | null; + contentType?: string; + content?: string; + ciphertext?: string | null; + sequenceNumber?: number; createdAt: string; - sender: Sender; + pending?: boolean; + delivered?: boolean; + readBy?: string[]; + sender?: Sender; } -// ── Helpers ────────────────────────────────────────────────────────────────── - function formatTime(iso: string) { return new Date(iso).toLocaleTimeString([], { hour: '2-digit', minute: '2-digit' }); } @@ -31,7 +38,6 @@ function formatDateLabel(iso: string) { const today = new Date(); const yesterday = new Date(today); yesterday.setDate(today.getDate() - 1); - if (d.toDateString() === today.toDateString()) return 'Today'; if (d.toDateString() === yesterday.toDateString()) return 'Yesterday'; return d.toLocaleDateString([], { month: 'long', day: 'numeric', year: 'numeric' }); @@ -44,146 +50,238 @@ function dayKey(iso: string) { function Avatar({ src, name }: { src: string | null; name: string }) { if (src) { return ( - {name} + {name} ); } - return ( -
- {name.charAt(0).toUpperCase()} -
- ); + return
{name.charAt(0).toUpperCase()}
; } -// ── Component ───────────────────────────────────────────────────────────────── +function normaliseMessage(msg: Partial): Message | null { + const id = msg.id ?? (msg as { messageId?: string }).messageId; + if (!id || !msg.conversationId) return null; + return { + id, + conversationId: msg.conversationId, + senderId: msg.senderId ?? '', + senderDeviceId: msg.senderDeviceId, + contentType: msg.contentType ?? 'text', + content: msg.content ?? msg.ciphertext ?? '', + ciphertext: msg.ciphertext ?? msg.content ?? '', + sequenceNumber: msg.sequenceNumber, + createdAt: msg.createdAt ?? new Date().toISOString(), + pending: msg.pending, + delivered: msg.delivered, + readBy: msg.readBy ?? [], + sender: msg.sender, + }; +} export default function ConversationPage() { const { id } = useParams<{ id: string }>(); - - // TODO: replace with real auth token from your auth context/store - const token = typeof window !== 'undefined' ? localStorage.getItem('token') : null; - // TODO: replace with real current user id from your auth context/store - const currentUserId = typeof window !== 'undefined' ? localStorage.getItem('userId') : null; - + const { token } = useAuth(); + const currentUserId = parseJwtClaims(token).userId ?? null; const socket = useSocket(token); const [messages, setMessages] = useState([]); + const [typingUsers, setTypingUsers] = useState>(new Set()); + const [input, setInput] = useState(''); const bottomRef = useRef(null); const containerRef = useRef(null); + const typingTimer = useRef | null>(null); - // Scroll to bottom only when user is already near the bottom const scrollToBottom = useCallback((force = false) => { const el = containerRef.current; if (!el) return; const atBottom = el.scrollHeight - el.scrollTop - el.clientHeight < 80; - if (force || atBottom) { - bottomRef.current?.scrollIntoView({ behavior: 'smooth' }); - } + if (force || atBottom) bottomRef.current?.scrollIntoView({ behavior: 'smooth' }); }, []); + const upsertMessage = useCallback((incoming: Partial) => { + const msg = normaliseMessage(incoming); + if (!msg || msg.conversationId !== id) return; + setMessages((prev) => { + const index = prev.findIndex((m) => m.id === msg.id); + if (index !== -1) { + const next = [...prev]; + next[index] = { ...next[index], ...msg, pending: msg.pending ?? next[index].pending }; + return next; + } + const next = [...prev, msg]; + return next.sort((a, b) => (a.sequenceNumber ?? Number.MAX_SAFE_INTEGER) - (b.sequenceNumber ?? Number.MAX_SAFE_INTEGER)); + }); + }, [id]); + useEffect(() => { if (!socket) return; - socket.emit('join_room', { conversationId: id }); - socket.emit('message_history', { conversationId: id }); + emitSocketEnvelope(socket, 'join_room', { conversationId: id }); + emitSocketEnvelope(socket, 'message_history', { conversationId: id }); - socket.on('message_history', (data: { conversationId: string; messages: Message[] }) => { - if (data.conversationId === id) { - setMessages(data.messages); - // Force scroll on initial load - setTimeout(() => scrollToBottom(true), 50); - } - }); + function onHistory(data: { conversationId: string; messages: Message[] }) { + if (data.conversationId !== id) return; + const next = data.messages.map(normaliseMessage).filter((m): m is Message => Boolean(m)); + setMessages(next); + setTimeout(() => scrollToBottom(true), 50); + } - socket.on('new_message', (msg: Message) => { - if (msg.conversationId === id) { - setMessages((prev) => [...prev, msg]); - scrollToBottom(); - } - }); + function onNewMessage(msg: Message) { + if (msg.conversationId !== id) return; + upsertMessage(msg); + scrollToBottom(); + if (msg.id) emitSocketEnvelope(socket, 'message_read', { conversationId: id, lastReadMessageId: msg.id }); + } + + function onMessageEnvelope(msg: Message & { messageId?: string }) { + onNewMessage({ ...msg, id: msg.id ?? msg.messageId, content: msg.ciphertext ?? msg.content }); + if (typeof msg.sequenceNumber === 'number') setSyncCursor(token, msg.sequenceNumber); + } + + function onAck(ack: { messageId: string; sequenceNumber: number }) { + setMessages((prev) => prev.map((m) => m.id === ack.messageId ? { ...m, pending: false, sequenceNumber: ack.sequenceNumber } : m)); + setSyncCursor(token, ack.sequenceNumber); + } + + function onDeliveryReceipt(receipt: { conversationId: string; messageId: string }) { + if (receipt.conversationId !== id) return; + setMessages((prev) => prev.map((m) => m.id === receipt.messageId ? { ...m, delivered: true } : m)); + } + + function onReadReceipt(receipt: { conversationId: string; userId: string; lastReadMessageId: string }) { + if (receipt.conversationId !== id) return; + setMessages((prev) => prev.map((m) => { + if (m.id !== receipt.lastReadMessageId) return m; + return { ...m, readBy: Array.from(new Set([...(m.readBy ?? []), receipt.userId])) }; + })); + } + + function onTypingStart(payload: { conversationId: string; userId: string }) { + if (payload.conversationId !== id || payload.userId === currentUserId) return; + setTypingUsers((prev) => new Set(prev).add(payload.userId)); + } + + function onTypingStop(payload: { conversationId: string; userId: string }) { + if (payload.conversationId !== id) return; + setTypingUsers((prev) => { + const next = new Set(prev); + next.delete(payload.userId); + return next; + }); + } + + socket.on('message_history', onHistory); + socket.on('new_message', onNewMessage); + socket.on('message_envelope', onMessageEnvelope); + socket.on('message_ack', onAck); + socket.on('delivery_receipt', onDeliveryReceipt); + socket.on('read_receipt', onReadReceipt); + socket.on('typing_start', onTypingStart); + socket.on('typing_stop', onTypingStop); return () => { - socket.off('message_history'); - socket.off('new_message'); + socket.off('message_history', onHistory); + socket.off('new_message', onNewMessage); + socket.off('message_envelope', onMessageEnvelope); + socket.off('message_ack', onAck); + socket.off('delivery_receipt', onDeliveryReceipt); + socket.off('read_receipt', onReadReceipt); + socket.off('typing_start', onTypingStart); + socket.off('typing_stop', onTypingStop); }; - }, [socket, id, scrollToBottom]); - - // ── Group messages by day ────────────────────────────────────────────────── - const grouped: { label: string; messages: Message[] }[] = []; - for (const msg of messages) { - const key = dayKey(msg.createdAt); - const last = grouped[grouped.length - 1]; - if (last && dayKey(last.messages[0].createdAt) === key) { - last.messages.push(msg); - } else { - grouped.push({ label: formatDateLabel(msg.createdAt), messages: [msg] }); + }, [socket, id, currentUserId, scrollToBottom, token, upsertMessage]); + + const grouped = useMemo(() => { + const groups: { label: string; messages: Message[] }[] = []; + for (const msg of messages) { + const key = dayKey(msg.createdAt); + const last = groups[groups.length - 1]; + if (last && dayKey(last.messages[0].createdAt) === key) last.messages.push(msg); + else groups.push({ label: formatDateLabel(msg.createdAt), messages: [msg] }); } + return groups; + }, [messages]); + + function handleTyping(value: string) { + setInput(value); + if (!socket) return; + emitSocketEnvelope(socket, 'typing_start', { conversationId: id }); + if (typingTimer.current) clearTimeout(typingTimer.current); + typingTimer.current = setTimeout(() => { + emitSocketEnvelope(socket, 'typing_stop', { conversationId: id }); + }, 1500); + } + + function sendMessage() { + const text = input.trim(); + if (!socket || !text) return; + const messageId = crypto.randomUUID(); + const optimistic: Message = { + id: messageId, + conversationId: id, + senderId: currentUserId ?? '', + content: text, + ciphertext: text, + contentType: 'text', + createdAt: new Date().toISOString(), + pending: true, + sender: { id: currentUserId ?? '', username: 'You', avatarUrl: null }, + }; + setMessages((prev) => [...prev, optimistic]); + setInput(''); + emitSocketEnvelope(socket, 'typing_stop', { conversationId: id }); + emitSocketEnvelope(socket, 'send_message', { + conversationId: id, + messageId, + contentType: 'text', + ciphertext: text, + envelopes: [{ recipientDeviceId: parseJwtClaims(token).deviceId ?? '', ciphertext: text }], + }); + setTimeout(() => scrollToBottom(true), 0); } return (
- {/* Header */}

Conversation

- {/* Message thread */}
{grouped.map((group) => (
- {/* Date separator */}
{group.label}
-
{group.messages.map((msg) => { const isSelf = msg.senderId === currentUserId; - const name = msg.sender.username ?? 'Unknown'; - + const name = msg.sender?.username ?? (isSelf ? 'You' : 'Unknown'); return ( -
- {!isSelf && } - -
- {!isSelf && ( - {name} - )} -
- {msg.content} +
+ {!isSelf && } +
+ {!isSelf && {name}} +
+ {msg.content ?? msg.ciphertext}
- {formatTime(msg.createdAt)} + {formatTime(msg.createdAt)} {msg.pending ? '• sending…' : msg.readBy?.length ? '• read' : msg.delivered ? '• delivered' : ''}
- - {isSelf && } + {isSelf && }
); })}
))} - + {typingUsers.size > 0 &&
Someone is typing…
}
+ +
{ e.preventDefault(); sendMessage(); }}> + handleTyping(e.target.value)} placeholder="Type a secure message…" className="flex-1 rounded-xl border border-[var(--border)] bg-[var(--background)] px-4 py-3 text-sm outline-none focus:border-[var(--accent)]" /> + +
); } diff --git a/apps/web/src/components/auth/AuthProvider.tsx b/apps/web/src/components/auth/AuthProvider.tsx index 1122d64..593e521 100644 --- a/apps/web/src/components/auth/AuthProvider.tsx +++ b/apps/web/src/components/auth/AuthProvider.tsx @@ -3,54 +3,42 @@ import { useCallback, useEffect, useMemo, useState } from 'react'; import { AuthContext } from './AuthContext'; -const TOKEN_STORAGE_KEY = 'clicked_token'; +const TOKEN_STORAGE_KEYS = ['clicked_token', 'clicked.jwt', 'auth_token']; export function AuthProvider({ children }: { children: React.ReactNode }) { const [token, setTokenState] = useState(null); const [loading, setLoading] = useState(true); useEffect(() => { - if (typeof window === 'undefined') { - return; - } + if (typeof window === 'undefined') return; - const storedToken = window.localStorage.getItem(TOKEN_STORAGE_KEY); + const storedToken = TOKEN_STORAGE_KEYS.map((key) => window.localStorage.getItem(key)).find( + Boolean, + ); const frameId = window.requestAnimationFrame(() => { - if (storedToken) { - setTokenState(storedToken); - } - + if (storedToken) setTokenState(storedToken); setLoading(false); }); - return () => { - window.cancelAnimationFrame(frameId); - }; + return () => window.cancelAnimationFrame(frameId); }, []); const setToken = useCallback((nextToken: string) => { if (typeof window !== 'undefined') { - window.localStorage.setItem(TOKEN_STORAGE_KEY, nextToken); + for (const key of TOKEN_STORAGE_KEYS) window.localStorage.setItem(key, nextToken); } - setTokenState(nextToken); }, []); const clearToken = useCallback(() => { if (typeof window !== 'undefined') { - window.localStorage.removeItem(TOKEN_STORAGE_KEY); + for (const key of TOKEN_STORAGE_KEYS) window.localStorage.removeItem(key); } - setTokenState(null); }, []); const value = useMemo( - () => ({ - token, - loading, - setToken, - clearToken, - }), + () => ({ token, loading, setToken, clearToken }), [clearToken, loading, setToken, token], ); diff --git a/apps/web/src/contexts/AuthContext.tsx b/apps/web/src/contexts/AuthContext.tsx index 35014d9..53f81c3 100644 --- a/apps/web/src/contexts/AuthContext.tsx +++ b/apps/web/src/contexts/AuthContext.tsx @@ -4,11 +4,15 @@ import { createContext, useCallback, useContext, useEffect, useMemo, useState } import { apiFetch } from '@/lib/api'; import { signWalletMessage } from '@/lib/freighter'; import { useWallet } from '@/contexts/WalletContext'; +import { getOrCreateDeviceIdentity } from '@/lib/deviceIdentity'; +import { parseJwtClaims, rememberRealtimeDeviceId } from '@/lib/realtime'; -const TOKEN_STORAGE_KEY = 'clicked.jwt'; +const TOKEN_STORAGE_KEYS = ['clicked.jwt', 'clicked_token', 'auth_token']; interface AuthUser { + id?: string; walletAddress: string; + deviceId?: string; } interface AuthContextValue { @@ -21,16 +25,23 @@ interface AuthContextValue { const AuthContext = createContext(undefined); +function persistToken(token: string) { + for (const key of TOKEN_STORAGE_KEYS) window.localStorage.setItem(key, token); +} + +function removeToken() { + for (const key of TOKEN_STORAGE_KEYS) window.localStorage.removeItem(key); +} + +function readToken(): string | null { + return TOKEN_STORAGE_KEYS.map((key) => window.localStorage.getItem(key)).find(Boolean) ?? null; +} + function parseJwtUser(token: string): AuthUser | null { - try { - const [, payload] = token.split('.'); - if (!payload) return null; - const normalized = payload.replace(/-/g, '+').replace(/_/g, '/'); - const decoded = JSON.parse(window.atob(normalized)) as { walletAddress?: string }; - return decoded.walletAddress ? { walletAddress: decoded.walletAddress } : null; - } catch { - return null; - } + const claims = parseJwtClaims(token); + return claims.walletAddress + ? { id: claims.userId, walletAddress: claims.walletAddress, deviceId: claims.deviceId } + : null; } export function AuthProvider({ children }: { children: React.ReactNode }) { @@ -40,7 +51,7 @@ export function AuthProvider({ children }: { children: React.ReactNode }) { const [isLoading, setIsLoading] = useState(false); useEffect(() => { - const savedToken = window.localStorage.getItem(TOKEN_STORAGE_KEY); + const savedToken = readToken(); if (savedToken) { setToken(savedToken); setUser(parseJwtUser(savedToken)); @@ -51,14 +62,13 @@ export function AuthProvider({ children }: { children: React.ReactNode }) { setIsLoading(true); try { const walletAddress = publicKey ?? (await connect()); + const device = await getOrCreateDeviceIdentity(); const challengeResponse = await apiFetch('/auth/challenge', { method: 'POST', body: JSON.stringify({ walletAddress }), }); - if (!challengeResponse.ok) { - throw new Error('Unable to request sign-in challenge'); - } + if (!challengeResponse.ok) throw new Error('Unable to request sign-in challenge'); const { message, nonce } = (await challengeResponse.json()) as { message: string; @@ -67,15 +77,19 @@ export function AuthProvider({ children }: { children: React.ReactNode }) { const signature = await signWalletMessage(message, walletAddress); const verifyResponse = await apiFetch('/auth/verify', { method: 'POST', - body: JSON.stringify({ walletAddress, signature, nonce }), + body: JSON.stringify({ + walletAddress, + signature, + nonce, + identityPublicKey: device.identityPublicKey, + }), }); - if (!verifyResponse.ok) { - throw new Error('Unable to verify signed challenge'); - } + if (!verifyResponse.ok) throw new Error('Unable to verify signed challenge'); - const { token: nextToken } = (await verifyResponse.json()) as { token: string }; - window.localStorage.setItem(TOKEN_STORAGE_KEY, nextToken); + const { token: nextToken, deviceId } = (await verifyResponse.json()) as { token: string; deviceId?: string }; + if (deviceId) rememberRealtimeDeviceId(deviceId); + persistToken(nextToken); setToken(nextToken); setUser(parseJwtUser(nextToken) ?? { walletAddress }); } finally { @@ -84,7 +98,7 @@ export function AuthProvider({ children }: { children: React.ReactNode }) { }, [connect, publicKey]); const signOut = useCallback(() => { - window.localStorage.removeItem(TOKEN_STORAGE_KEY); + removeToken(); setToken(null); setUser(null); }, []); @@ -99,8 +113,6 @@ export function AuthProvider({ children }: { children: React.ReactNode }) { export function useAuth() { const context = useContext(AuthContext); - if (!context) { - throw new Error('useAuth must be used within AuthProvider'); - } + if (!context) throw new Error('useAuth must be used within AuthProvider'); return context; } diff --git a/apps/web/src/hooks/useSocket.ts b/apps/web/src/hooks/useSocket.ts index 527a095..9915c8d 100644 --- a/apps/web/src/hooks/useSocket.ts +++ b/apps/web/src/hooks/useSocket.ts @@ -2,29 +2,85 @@ import { useEffect, useMemo } from 'react'; import { io, type Socket } from 'socket.io-client'; +import { + emitSocketEnvelope, + getResumeCursor, + getRealtimeDeviceId, + replaySocketEvent, + runSocketSync, + setResumeCursor, +} from '@/lib/realtime'; -const SOCKET_URL = process.env.NEXT_PUBLIC_SOCKET_URL ?? 'http://localhost:3001'; +const SOCKET_URL = + process.env.NEXT_PUBLIC_SOCKET_URL ?? + process.env.NEXT_PUBLIC_BACKEND_URL ?? + 'http://localhost:3001'; export function useSocket(token: string | null) { const socket = useMemo(() => { - if (!token) { - return null; - } + if (!token) return null; + + const deviceId = getRealtimeDeviceId(token); return io(SOCKET_URL, { - auth: { token }, + auth: { token, deviceId }, transports: ['websocket'], + reconnection: true, }); }, [token]); useEffect(() => { - if (!socket) { - return; + if (!socket || !token) return undefined; + + let closed = false; + const authToken = token; + + async function resumeThenSync() { + if (!socket || !token || closed) return; + emitSocketEnvelope(socket, 'resume', { lastEventId: getResumeCursor(authToken) }); + await runSocketSync(socket, authToken); + } + + function onConnect() { + void resumeThenSync(); + } + + function onResumeComplete(payload: { lastEventId?: string | null; syncRequired?: boolean }) { + setResumeCursor(authToken, payload.lastEventId ?? null); + if (payload.syncRequired && socket) void runSocketSync(socket, authToken); } + + function onEphemeralReplay(payload: { id?: string; type?: string; data?: Record }) { + if (payload.id) setResumeCursor(authToken, payload.id); + if (payload.type && socket) replaySocketEvent(socket, payload.type, payload.data ?? {}); + } + + function onMessageEnvelope(payload: { conversationId?: string; messageId?: string; envelopeId?: string; sequenceNumber?: number }) { + if (!payload.conversationId || !payload.messageId) return; + emitSocketEnvelope(socket, 'message_delivered', { + conversationId: payload.conversationId, + messageId: payload.messageId, + envelopeId: payload.envelopeId, + sequenceNumber: payload.sequenceNumber, + }); + } + + socket.on('connect', onConnect); + socket.on('resume_complete', onResumeComplete); + socket.on('ephemeral_replay', onEphemeralReplay); + socket.on('message_envelope', onMessageEnvelope); + + if (socket.connected) void resumeThenSync(); + return () => { + closed = true; + socket.off('connect', onConnect); + socket.off('resume_complete', onResumeComplete); + socket.off('ephemeral_replay', onEphemeralReplay); + socket.off('message_envelope', onMessageEnvelope); socket.disconnect(); }; - }, [socket]); + }, [socket, token]); return socket; } diff --git a/apps/web/src/lib/deviceIdentity.ts b/apps/web/src/lib/deviceIdentity.ts new file mode 100644 index 0000000..737efb2 --- /dev/null +++ b/apps/web/src/lib/deviceIdentity.ts @@ -0,0 +1,57 @@ +'use client'; + +const DEVICE_ID_KEY = 'clicked.e2eeDeviceId'; +const DEVICE_IDENTITY_KEY = 'clicked.deviceIdentityPublicKey'; + +async function exportPublicKey(key: CryptoKey): Promise { + const der = await crypto.subtle.exportKey('spki', key); + let binary = ''; + for (const byte of new Uint8Array(der)) binary += String.fromCharCode(byte); + return window.btoa(binary); +} + +function randomDeviceName(): string { + const nav = typeof navigator !== 'undefined' ? navigator : undefined; + return `${nav?.platform || 'Web'} browser`; +} + +export function getStoredDeviceId(): string | null { + if (typeof window === 'undefined') return null; + return window.localStorage.getItem(DEVICE_ID_KEY); +} + +export async function getOrCreateDeviceIdentity(): Promise<{ + deviceId: string; + deviceName: string; + platform: 'web'; + identityPublicKey: string; +}> { + const existingDeviceId = window.localStorage.getItem(DEVICE_ID_KEY); + const existingIdentity = window.localStorage.getItem(DEVICE_IDENTITY_KEY); + if (existingDeviceId && existingIdentity) { + return { + deviceId: existingDeviceId, + deviceName: randomDeviceName(), + platform: 'web', + identityPublicKey: existingIdentity, + }; + } + + const keyPair = await crypto.subtle.generateKey( + { name: 'Ed25519' }, + true, + ['sign', 'verify'], + ); + const identityPublicKey = await exportPublicKey(keyPair.publicKey); + const deviceId = existingDeviceId ?? crypto.randomUUID(); + + window.localStorage.setItem(DEVICE_ID_KEY, deviceId); + window.localStorage.setItem(DEVICE_IDENTITY_KEY, identityPublicKey); + + return { + deviceId, + deviceName: randomDeviceName(), + platform: 'web', + identityPublicKey, + }; +} diff --git a/apps/web/src/lib/realtime.ts b/apps/web/src/lib/realtime.ts new file mode 100644 index 0000000..c1d9cc8 --- /dev/null +++ b/apps/web/src/lib/realtime.ts @@ -0,0 +1,182 @@ +'use client'; + +import type { Socket } from 'socket.io-client'; +import { API_BASE_URL } from '@/lib/api'; + +export type SocketEventPayload = Record; + +export interface EventEnvelope { + eventId: string; + type: string; + timestamp: number; + payload: T; +} + +export interface JwtRealtimeClaims { + userId?: string; + walletAddress?: string; + deviceId?: string; +} + +export interface SyncedEnvelope { + id: string; + messageId: string; + conversationId: string; + senderId?: string; + senderDeviceId?: string | null; + contentType?: string; + ciphertext: string; + sequenceNumber: number; + deliveredAt?: string | null; + createdAt: string; +} + +const RESUME_CURSOR_PREFIX = 'clicked.socket.resumeCursor'; +const SYNC_CURSOR_PREFIX = 'clicked.socket.syncSequence'; +const E2EE_DEVICE_ID_KEY = 'clicked.e2eeDeviceId'; + +function base64UrlDecode(value: string): string { + const normalized = value.replace(/-/g, '+').replace(/_/g, '/'); + const padded = normalized.padEnd(Math.ceil(normalized.length / 4) * 4, '='); + return window.atob(padded); +} + +export function parseJwtClaims(token: string | null): JwtRealtimeClaims { + if (!token || typeof window === 'undefined') return {}; + try { + const [, payload] = token.split('.'); + if (!payload) return {}; + return JSON.parse(base64UrlDecode(payload)) as JwtRealtimeClaims; + } catch { + return {}; + } +} + +export function getRealtimeDeviceId(token: string | null): string | null { + if (typeof window === 'undefined') return parseJwtClaims(token).deviceId ?? null; + return window.localStorage.getItem(E2EE_DEVICE_ID_KEY) ?? parseJwtClaims(token).deviceId ?? null; +} + +export function rememberRealtimeDeviceId(deviceId: string): void { + if (typeof window !== 'undefined') window.localStorage.setItem(E2EE_DEVICE_ID_KEY, deviceId); +} + +export function createSocketEnvelope( + type: string, + payload: T, +): EventEnvelope { + return { + eventId: crypto.randomUUID(), + type, + timestamp: Date.now(), + payload, + }; +} + +export function emitSocketEnvelope( + socket: Socket | null, + type: string, + payload: T, +): string | null { + if (!socket) return null; + const envelope = createSocketEnvelope(type, payload); + socket.emit('dispatch', envelope); + return envelope.eventId; +} + +function cursorKey(prefix: string, token: string | null): string | null { + const claims = parseJwtClaims(token); + const deviceId = getRealtimeDeviceId(token); + if (!claims.userId || !deviceId) return null; + return `${prefix}:${claims.userId}:${deviceId}`; +} + +export function getResumeCursor(token: string | null): string { + const key = cursorKey(RESUME_CURSOR_PREFIX, token); + if (!key || typeof window === 'undefined') return ''; + return window.localStorage.getItem(key) ?? ''; +} + +export function setResumeCursor(token: string | null, cursor: string | null): void { + const key = cursorKey(RESUME_CURSOR_PREFIX, token); + if (!key || typeof window === 'undefined') return; + if (cursor) window.localStorage.setItem(key, cursor); +} + +export function getSyncCursor(token: string | null): number { + const key = cursorKey(SYNC_CURSOR_PREFIX, token); + if (!key || typeof window === 'undefined') return 0; + const parsed = Number(window.localStorage.getItem(key) ?? '0'); + return Number.isFinite(parsed) && parsed >= 0 ? parsed : 0; +} + +export function setSyncCursor(token: string | null, cursor: number): void { + const key = cursorKey(SYNC_CURSOR_PREFIX, token); + if (!key || typeof window === 'undefined') return; + window.localStorage.setItem(key, String(Math.max(0, cursor))); +} + +export function replaySocketEvent(socket: Socket, event: string, payload: unknown): void { + const emitEvent = (socket as unknown as { emitEvent?: (args: unknown[]) => void }).emitEvent; + if (typeof emitEvent === 'function') { + emitEvent.call(socket, [event, payload]); + return; + } + + const callbacks = (socket as unknown as { _callbacks?: Record void>> }) + ._callbacks?.[`$${event}`]; + callbacks?.slice().forEach((callback) => callback(payload)); +} + +export async function runSocketSync(socket: Socket, token: string): Promise { + const deviceId = getRealtimeDeviceId(token); + if (!deviceId) return; + + let cursor = getSyncCursor(token); + let hasMore = true; + + while (hasMore) { + const url = new URL(`${API_BASE_URL}/sync`); + url.searchParams.set('deviceId', deviceId); + url.searchParams.set('sinceSequence', String(cursor)); + + const response = await fetch(url.toString(), { + headers: { Authorization: `Bearer ${token}` }, + }); + + if (!response.ok) return; + + const data = (await response.json()) as { + envelopes?: SyncedEnvelope[]; + nextCursor?: number; + hasMore?: boolean; + }; + + for (const envelope of data.envelopes ?? []) { + const payload = { + envelopeId: envelope.id, + messageId: envelope.messageId, + id: envelope.messageId, + conversationId: envelope.conversationId, + senderId: envelope.senderId, + senderDeviceId: envelope.senderDeviceId, + contentType: envelope.contentType ?? 'text', + ciphertext: envelope.ciphertext, + sequenceNumber: envelope.sequenceNumber, + createdAt: envelope.createdAt, + }; + replaySocketEvent(socket, 'message_envelope', payload); + replaySocketEvent(socket, 'new_message', { ...payload, ciphertext: null }); + emitSocketEnvelope(socket, 'message_delivered', { + conversationId: envelope.conversationId, + messageId: envelope.messageId, + envelopeId: envelope.id, + sequenceNumber: envelope.sequenceNumber, + }); + } + + cursor = Math.max(cursor, data.nextCursor ?? cursor); + setSyncCursor(token, cursor); + hasMore = Boolean(data.hasMore && (data.envelopes?.length ?? 0) > 0); + } +} diff --git a/apps/web/src/lib/socket.ts b/apps/web/src/lib/socket.ts index 9a33ce4..8569dbd 100644 --- a/apps/web/src/lib/socket.ts +++ b/apps/web/src/lib/socket.ts @@ -1,19 +1,58 @@ import { io, Socket } from 'socket.io-client'; +import { + emitSocketEnvelope, + getRealtimeDeviceId, + getResumeCursor, + replaySocketEvent, + runSocketSync, + setResumeCursor, +} from '@/lib/realtime'; let socket: Socket | null = null; +let activeToken: string | null = null; export function initSocket( token: string, - serverUrl = process.env.NEXT_PUBLIC_BACKEND_URL || 'http://localhost:3001', + serverUrl = process.env.NEXT_PUBLIC_BACKEND_URL || process.env.NEXT_PUBLIC_SOCKET_URL || 'http://localhost:3001', ): Socket { - if (socket) return socket; + if (socket && activeToken === token) return socket; + if (socket) closeSocket(); + activeToken = token; socket = io(serverUrl, { - auth: { token }, + auth: { token, deviceId: getRealtimeDeviceId(token) }, reconnection: true, + transports: ['websocket'], + }); + + socket.on('connect', () => { + console.log('Socket connected'); + if (!socket) return; + emitSocketEnvelope(socket, 'resume', { lastEventId: getResumeCursor(token) }); + void runSocketSync(socket, token); + }); + + socket.on('resume_complete', (payload: { lastEventId?: string | null; syncRequired?: boolean }) => { + setResumeCursor(token, payload.lastEventId ?? null); + if (payload.syncRequired && socket) void runSocketSync(socket, token); + }); + + socket.on('ephemeral_replay', (payload: { id?: string; type?: string; data?: Record }) => { + if (!socket) return; + if (payload.id) setResumeCursor(token, payload.id); + if (payload.type) replaySocketEvent(socket, payload.type, payload.data ?? {}); + }); + + socket.on('message_envelope', (payload: { conversationId?: string; messageId?: string; envelopeId?: string; sequenceNumber?: number }) => { + if (!socket || !payload.conversationId || !payload.messageId) return; + emitSocketEnvelope(socket, 'message_delivered', { + conversationId: payload.conversationId, + messageId: payload.messageId, + envelopeId: payload.envelopeId, + sequenceNumber: payload.sequenceNumber, + }); }); - socket.on('connect', () => console.log('Socket connected')); socket.on('disconnect', () => console.log('Socket disconnected')); socket.on('error', (error) => console.error('Socket error:', error)); @@ -28,5 +67,6 @@ export function closeSocket(): void { if (socket) { socket.disconnect(); socket = null; + activeToken = null; } }