diff --git a/package.json b/package.json index cc3e786..ffc90c9 100644 --- a/package.json +++ b/package.json @@ -26,6 +26,7 @@ "http-status-codes": "2.3.0", "jsonwebtoken": "9.0.2", "mongoose": "7.6.3", + "socket.io": "4.7.2", "uuid": "9.0.1", "winston": "3.11.0", "zod": "^4.4.3" diff --git a/src/middlewares/socketAuth.ts b/src/middlewares/socketAuth.ts new file mode 100644 index 0000000..2c1af27 --- /dev/null +++ b/src/middlewares/socketAuth.ts @@ -0,0 +1,47 @@ +import { Socket, Namespace } from 'socket.io'; +import authService from '../services/authService'; +import logger from '../config/logger'; + +export interface AuthenticatedSocket extends Socket { + data: { user?: any }; +} + +/** + * Socket.io middleware to authenticate connections using JWT. + * Expects token to be provided in `socket.handshake.auth.token` or `socket.handshake.query.token`. + */ +const socketAuth = async (socket: Socket, next: (err?: any) => void) => { + try { + const token = + // prefer auth payload + (socket.handshake && + (socket.handshake as any).auth && + (socket.handshake as any).auth.token) || + // fallback to query string + (socket.handshake && + (socket.handshake as any).query && + (socket.handshake as any).query.token); + + if (!token) { + logger.warn('Socket auth failed: missing token'); + return next(new Error('Unauthorized')); + } + + const { userId } = authService.verifyToken(token as string); + const user = await authService.getUserById(userId); + if (!user) { + logger.warn('Socket auth failed: user not found'); + return next(new Error('Unauthorized')); + } + + // Attach user to socket data for downstream handlers + (socket as AuthenticatedSocket).data = { ...(socket as any).data, user }; + + return next(); + } catch (err) { + logger.warn('Socket auth verification error', err); + return next(new Error('Unauthorized')); + } +}; + +export default socketAuth; diff --git a/src/models/ChatMessage.ts b/src/models/ChatMessage.ts new file mode 100644 index 0000000..58317ec --- /dev/null +++ b/src/models/ChatMessage.ts @@ -0,0 +1,22 @@ +import mongoose, { Document, Model } from 'mongoose'; + +export interface IChatMessage extends Document { + content: string; + sender?: string; + createdAt: Date; +} + +const ChatMessageSchema = new mongoose.Schema( + { + content: { type: String, required: true }, + sender: { type: String }, + createdAt: { type: Date, default: Date.now }, + }, + { versionKey: false }, +); + +const ChatMessage = + (mongoose.models.ChatMessage as Model) || + mongoose.model('ChatMessage', ChatMessageSchema); + +export default ChatMessage; diff --git a/src/models/User.ts b/src/models/User.ts index 7a75936..ec7d09e 100644 --- a/src/models/User.ts +++ b/src/models/User.ts @@ -1,129 +1,23 @@ -import { Schema, model, Document, Types } from 'mongoose'; -import bcrypt from 'bcryptjs'; - -// ─── Enums ──────────────────────────────────────────────────────────────────── - -export enum UserRole { - ADMIN = 'admin', - USER = 'user', - DRIVER = 'driver', -} - -export enum UserStatus { - ACTIVE = 'active', - SUSPENDED = 'suspended', - BANNED = 'banned', -} - -// ─── Document interface ──────────────────────────────────────────────────────── +import mongoose, { Document, Model } from 'mongoose'; export interface IUser extends Document { - _id: Types.ObjectId; - name: string; email: string; - password: string; - role: UserRole; - status: UserStatus; - stellarAddress?: string; - suspendedAt?: Date; - suspendedReason?: string; + name?: string; + role?: string; + password?: string; createdAt: Date; - updatedAt: Date; - - /** Returns true when the plain-text password matches the stored hash. */ - comparePassword(candidate: string): Promise; } -// ─── Schema ─────────────────────────────────────────────────────────────────── - -const UserSchema = new Schema( - { - name: { - type: String, - required: true, - trim: true, - maxlength: 100, - }, - - email: { - type: String, - required: true, - unique: true, - trim: true, - lowercase: true, - match: [/^\S+@\S+\.\S+$/, 'Please provide a valid email address'], - index: true, - }, - - password: { - type: String, - required: true, - minlength: 8, - // Never return the password hash in query results by default - select: false, - }, - - role: { - type: String, - enum: Object.values(UserRole), - required: true, - default: UserRole.USER, - index: true, - }, - - status: { - type: String, - enum: Object.values(UserStatus), - required: true, - default: UserStatus.ACTIVE, - index: true, - }, - - stellarAddress: { - type: String, - trim: true, - }, - - // Audit fields set when an admin suspends/bans the account - suspendedAt: { type: Date }, - suspendedReason: { type: String, trim: true, maxlength: 500 }, - }, +const UserSchema = new mongoose.Schema( { - timestamps: true, - toJSON: { - virtuals: true, - transform: (_doc, ret) => { - ret.id = ret._id.toString(); - // Never leak the password hash over the wire - delete ret.password; - delete ret.__v; - return ret; - }, - }, + email: { type: String, required: true, unique: true }, + name: { type: String }, + role: { type: String, default: 'user' }, + password: { type: String }, }, + { timestamps: { createdAt: 'createdAt', updatedAt: false }, versionKey: false }, ); -// ─── Hooks ──────────────────────────────────────────────────────────────────── - -/** Hash password before save when it has been modified. */ -UserSchema.pre('save', async function (next) { - if (!this.isModified('password')) return next(); - - const rounds = parseInt(process.env.BCRYPT_ROUNDS ?? '10', 10); - this.password = await bcrypt.hash(this.password, rounds); - next(); -}); - -// ─── Instance methods ────────────────────────────────────────────────────────── - -UserSchema.methods.comparePassword = async function ( - candidate: string, -): Promise { - return bcrypt.compare(candidate, this.password as string); -}; - -// ─── Model ──────────────────────────────────────────────────────────────────── - -const User = model('User', UserSchema); +const User = (mongoose.models.User as Model) || mongoose.model('User', UserSchema); export default User; diff --git a/src/server.ts b/src/server.ts index 0bddd59..fffb105 100644 --- a/src/server.ts +++ b/src/server.ts @@ -4,7 +4,7 @@ dotenv.config(); import app from './app'; import logger from './config/logger'; -import env from './config/env'; +import initSocket from './sockets'; const PORT = env.PORT; @@ -13,12 +13,28 @@ const server = app.listen(PORT, () => { logger.info(`📝 Health check: http://localhost:${PORT}/health`); }); +// Initialize Socket.io +const io = initSocket(server); + // Graceful shutdown const gracefulShutdown = (): void => { logger.info('Received shutdown signal, closing gracefully...'); + // Close HTTP server server.close(() => { logger.info('HTTP server closed'); + + // Close socket.io if present + try { + if (io && typeof io.close === 'function') { + // close all sockets + // @ts-ignore + io.close(() => logger.info('Socket.io server closed')); + } + } catch (err) { + logger.warn('Error while closing Socket.io', err); + } + import('mongoose').then(({ default: mongoose }) => { mongoose.connection.close(false).then(() => { logger.info('MongoDB connection closed'); diff --git a/src/services/authService.ts b/src/services/authService.ts index 6807899..1929900 100644 --- a/src/services/authService.ts +++ b/src/services/authService.ts @@ -1,38 +1,26 @@ -import User, { type IUser } from '../models/User'; -import ApiError from '../utils/ApiError'; -import type { RegisterInput } from '../validators/authValidator'; +import jwt from 'jsonwebtoken'; +import logger from '../config/logger'; +import User, { IUser } from '../models/User'; -/** - * Public-facing representation of a user, with the password hash removed. - */ -export type SafeUser = Omit & { id: string }; +const JWT_SECRET = process.env.JWT_SECRET || 'change_me_in_prod'; -/** - * Register a new user. - * - * Persists the user (the password is hashed by the model's pre-save hook) - * and returns a sanitized representation that never exposes the hash. - * - * @throws {ApiError} 409 if a user with the same email already exists. - */ -export const registerUser = async (input: RegisterInput): Promise => { - const existingUser = await User.findOne({ email: input.email }).lean().exec(); - if (existingUser) { - throw ApiError.conflict('A user with this email already exists'); +class AuthService { + public verifyToken(token: string): { userId: string } { + try { + const decoded = jwt.verify(token, JWT_SECRET) as { sub?: string } | null; + if (!decoded) throw new Error('Invalid token'); + const userId = (decoded as any).sub || (decoded as any).id || (decoded as any)._id; + if (!userId) throw new Error('Token missing subject'); + return { userId }; + } catch (error) { + logger.warn('JWT verification failed', error); + throw error; + } } - try { - const user = await User.create(input); - // `toJSON` strips the password hash and internal fields. - return user.toJSON() as unknown as SafeUser; - } catch (error) { - // Guard against a race condition where the unique index rejects a - // concurrent insert after the existence check above. - if (error instanceof Error && 'code' in error && error.code === 11000) { - throw ApiError.conflict('A user with this email already exists'); - } - throw error; + public async getUserById(id: string): Promise { + return User.findById(id).lean().exec() as unknown as IUser | null; } -}; +} -export default { registerUser }; +export default new AuthService(); diff --git a/src/sockets/index.ts b/src/sockets/index.ts new file mode 100644 index 0000000..ce55fc2 --- /dev/null +++ b/src/sockets/index.ts @@ -0,0 +1,29 @@ +import { Server } from 'socket.io'; +import registerSocketHandlers from './socketController'; +import logger from '../config/logger'; +import socketAuth from '../middlewares/socketAuth'; + +export const initSocket = (httpServer: any) => { + const io = new Server(httpServer, { + path: '/socket.io', + cors: { + origin: process.env.CORS_ORIGIN || '*', + methods: ['GET', 'POST'], + }, + }); + + const nsp = io.of('/api/v1/realtime'); + + // Attach authentication middleware to namespace + nsp.use((socket, next) => socketAuth(socket as any, next as any)); + + nsp.on('connection', (socket) => { + registerSocketHandlers(socket, nsp); + }); + + logger.info('✅ Socket.io initialized on namespace /api/v1/realtime'); + + return io; +}; + +export default initSocket; diff --git a/src/sockets/socketController.ts b/src/sockets/socketController.ts new file mode 100644 index 0000000..817467e --- /dev/null +++ b/src/sockets/socketController.ts @@ -0,0 +1,28 @@ +import { Namespace, Socket } from 'socket.io'; +import socketService from './socketService'; +import logger from '../config/logger'; + +const registerSocketHandlers = (socket: Socket, nsp: Namespace): void => { + logger.info(`Socket connected: ${socket.id} to namespace ${nsp.name}`); + + socketService.handleConnection(socket, nsp); + + socket.on('message', async (payload) => { + try { + await socketService.handleIncomingMessage(nsp, payload); + } catch (err) { + logger.error('Socket message handler error', err); + socket.emit('error', { message: 'Failed to handle message' }); + } + }); + + socket.on('disconnect', (reason) => { + logger.info(`Socket disconnected: ${socket.id} reason: ${reason}`); + }); + + socket.on('error', (err) => { + logger.error(`Socket error on ${socket.id}:`, err); + }); +}; + +export default registerSocketHandlers; diff --git a/src/sockets/socketService.ts b/src/sockets/socketService.ts new file mode 100644 index 0000000..e386061 --- /dev/null +++ b/src/sockets/socketService.ts @@ -0,0 +1,41 @@ +import ChatMessage, { IChatMessage } from '../models/ChatMessage'; +import logger from '../config/logger'; +import { Namespace, Socket } from 'socket.io'; + +class SocketService { + public async getRecentMessages(limit = 10): Promise { + return ChatMessage.find() + .sort({ createdAt: -1 }) + .limit(limit) + .lean() + .exec() as unknown as IChatMessage[]; + } + + public async saveMessage(payload: { content: string; sender?: string }) { + return ChatMessage.create({ content: payload.content, sender: payload.sender }); + } + + public async handleConnection(socket: Socket, nsp: Namespace): Promise { + try { + const recent = await this.getRecentMessages(); + socket.emit('recentMessages', recent.reverse()); + } catch (error) { + logger.error('Error fetching recent messages', error); + socket.emit('error', { message: 'Failed to load recent messages' }); + } + } + + public async handleIncomingMessage( + nsp: Namespace, + payload: { content: string; sender?: string }, + ): Promise { + try { + const doc = await this.saveMessage(payload); + nsp.emit('message', doc); + } catch (error) { + logger.error('Error saving message', error); + } + } +} + +export default new SocketService();