From a9bc4082a41855a46b25bae795a42a4b3987b5d1 Mon Sep 17 00:00:00 2001 From: Banx17 Date: Wed, 1 Jul 2026 19:25:48 +0100 Subject: [PATCH] Fix #804: Stop pause/resume controllers from duplicating PAUSED/RESUMED events --- backend/src/controllers/stream.controller.ts | 62 +------ .../pause-resume.regression.test.ts | 152 ++++++++++++++++++ .../tests/integration/stream-actions.test.ts | 32 ---- 3 files changed, 156 insertions(+), 90 deletions(-) create mode 100644 backend/tests/integration/pause-resume.regression.test.ts diff --git a/backend/src/controllers/stream.controller.ts b/backend/src/controllers/stream.controller.ts index b7670e84..c3bd6956 100644 --- a/backend/src/controllers/stream.controller.ts +++ b/backend/src/controllers/stream.controller.ts @@ -630,36 +630,13 @@ export const pauseStream = async (req: Request, res: Response) => { // Call Soroban service to verify the pause operation would succeed const result = await sorobanPauseStream(authReq.user.publicKey, parsedStreamId); - // Update the database to mark stream as paused - const now = Math.floor(Date.now() / 1000); - const updatedStream = await prisma.stream.update({ - where: { streamId: parsedStreamId }, - data: { - isPaused: true, - pausedAt: now, - lastUpdateTime: now, - }, - }); - - // Create a PAUSED event - await prisma.streamEvent.create({ - data: { - streamId: parsedStreamId, - eventType: 'PAUSED', - transactionHash: result.txHash, - ledgerSequence: 0, // Will be updated by event indexer - timestamp: now, - metadata: JSON.stringify({ pausedBy: authReq.user.publicKey }), - }, - }); - - logger.info(`Stream ${parsedStreamId} paused by ${authReq.user.publicKey}`); + logger.info(`Stream ${parsedStreamId} pause simulated by ${authReq.user.publicKey}`); return res.status(200).json({ success: true, streamId: parsedStreamId, txHash: result.txHash, - stream: updatedStream, + stream, }); } catch (sorobanError) { logger.error(`Soroban pause failed for stream ${parsedStreamId}:`, sorobanError); @@ -724,44 +701,13 @@ export const resumeStream = async (req: Request, res: Response) => { // Call Soroban service to verify the resume operation would succeed const result = await sorobanResumeStream(authReq.user.publicKey, parsedStreamId); - // Calculate pause duration and update the database - const now = Math.floor(Date.now() / 1000); - const pausedAt = stream.pausedAt ?? now; - const pauseDuration = Math.max(0, now - pausedAt); - const totalPausedDuration = (stream.totalPausedDuration ?? 0) + pauseDuration; - - const updatedStream = await prisma.stream.update({ - where: { streamId: parsedStreamId }, - data: { - isPaused: false, - pausedAt: null, - totalPausedDuration, - lastUpdateTime: now, - }, - }); - - // Create a RESUMED event - await prisma.streamEvent.create({ - data: { - streamId: parsedStreamId, - eventType: 'RESUMED', - transactionHash: result.txHash, - ledgerSequence: 0, // Will be updated by event indexer - timestamp: now, - metadata: JSON.stringify({ - resumedBy: authReq.user.publicKey, - pauseDuration, - }), - }, - }); - - logger.info(`Stream ${parsedStreamId} resumed by ${authReq.user.publicKey}`); + logger.info(`Stream ${parsedStreamId} resume simulated by ${authReq.user.publicKey}`); return res.status(200).json({ success: true, streamId: parsedStreamId, txHash: result.txHash, - stream: updatedStream, + stream, }); } catch (sorobanError) { logger.error(`Soroban resume failed for stream ${parsedStreamId}:`, sorobanError); diff --git a/backend/tests/integration/pause-resume.regression.test.ts b/backend/tests/integration/pause-resume.regression.test.ts new file mode 100644 index 00000000..d823d143 --- /dev/null +++ b/backend/tests/integration/pause-resume.regression.test.ts @@ -0,0 +1,152 @@ +import { describe, it, expect, vi, beforeEach } from 'vitest'; +import request from 'supertest'; +import * as StellarSdk from '@stellar/stellar-sdk'; +import { SorobanEventWorker } from '../../src/workers/soroban-event-worker.js'; + +const { mockPauseStream, mockResumeStream, mockPrisma } = vi.hoisted(() => ({ + mockPauseStream: vi.fn(), + mockResumeStream: vi.fn(), + mockPrisma: { + stream: { + findUnique: vi.fn(), + update: vi.fn(), + findUniqueOrThrow: vi.fn(), + }, + streamEvent: { + create: vi.fn(), + upsert: vi.fn(), + findUnique: vi.fn(), + }, + $transaction: vi.fn(async (cb) => { + // Mock the transaction client as mockPrisma itself + return cb(mockPrisma); + }), + }, +})); + +vi.mock('../../src/lib/prisma.js', () => ({ + default: mockPrisma, + prisma: mockPrisma, +})); + +vi.mock('../../src/services/sorobanService.js', () => ({ + pauseStream: mockPauseStream, + resumeStream: mockResumeStream, +})); + +import app from '../../src/app.js'; + +function makeKeypair() { + return StellarSdk.Keypair.random(); +} + +function buildSignedTransaction(keypair: StellarSdk.Keypair, nonce: string): string { + const account = new StellarSdk.Account(keypair.publicKey(), '0'); + const tx = new StellarSdk.TransactionBuilder(account, { + fee: '100', + networkPassphrase: StellarSdk.Networks.TESTNET, + }) + .addOperation( + StellarSdk.Operation.manageData({ + name: 'auth', + value: Buffer.from(nonce, 'hex'), + }), + ) + .setTimeout(60) + .build(); + + tx.sign(keypair); + return tx.toXDR(); +} + +async function getValidJwt(keypair: StellarSdk.Keypair): Promise { + const challengeRes = await request(app) + .post('/v1/auth/challenge') + .send({ publicKey: keypair.publicKey() }); + + const nonce = challengeRes.body.nonce as string; + const signedTransaction = buildSignedTransaction(keypair, nonce); + + const verifyRes = await request(app) + .post('/v1/auth/verify') + .send({ publicKey: keypair.publicKey(), signedTransaction }); + + return verifyRes.body.token as string; +} + +describe('Regression #804: Pause/resume controller duplicate StreamEvent', () => { + beforeEach(() => { + vi.clearAllMocks(); + }); + + it('pauses a stream and only writes one PAUSED event via indexer', async () => { + const sender = makeKeypair(); + const token = await getValidJwt(sender); + const streamId = 77; + + // 1. Controller flow + mockPrisma.stream.findUnique.mockResolvedValue({ + streamId, + sender: sender.publicKey(), + isActive: true, + isPaused: false, + }); + mockPauseStream.mockResolvedValue({ txHash: 'simulated-pause-77' }); + + const pauseRes = await request(app) + .post(`/v1/streams/${streamId}/pause`) + .set('Authorization', `Bearer ${token}`); + + expect(pauseRes.status).toBe(200); + + // Controller should NOT write to DB for PAUSED event + expect(mockPrisma.streamEvent.create).not.toHaveBeenCalled(); + expect(mockPrisma.stream.update).not.toHaveBeenCalled(); + + // 2. Indexer flow + const worker = new SorobanEventWorker(); + + const mockEvent = { + id: 'event1', + ledger: 100, + txHash: 'real-tx-hash', + topic: [ + StellarSdk.xdr.ScVal.scvSymbol('stream_paused'), + StellarSdk.nativeToScVal(streamId, { type: 'u64' }), + ], + value: StellarSdk.xdr.ScVal.scvMap([ + new StellarSdk.xdr.ScMapEntry({ + key: StellarSdk.xdr.ScVal.scvSymbol('sender'), + val: new StellarSdk.Address(sender.publicKey()).toScVal(), + }), + new StellarSdk.xdr.ScMapEntry({ + key: StellarSdk.xdr.ScVal.scvSymbol('paused_at'), + val: StellarSdk.nativeToScVal(Math.floor(Date.now() / 1000), { type: 'u64' }), + }), + ]), + inSuccessfulContractCall: true, + } as any; + + mockPrisma.streamEvent.findUnique.mockResolvedValue(null); + + await worker.processEvent(mockEvent); + + // Indexer should write exactly one PAUSED event + expect(mockPrisma.streamEvent.upsert).toHaveBeenCalledTimes(1); + expect(mockPrisma.streamEvent.upsert).toHaveBeenCalledWith( + expect.objectContaining({ + create: expect.objectContaining({ + eventType: 'PAUSED', + transactionHash: 'real-tx-hash', + }), + }), + ); + expect(mockPrisma.stream.update).toHaveBeenCalledTimes(1); + expect(mockPrisma.stream.update).toHaveBeenCalledWith( + expect.objectContaining({ + where: { streamId }, + data: expect.objectContaining({ isPaused: true }), + }), + ); + }); +}); diff --git a/backend/tests/integration/stream-actions.test.ts b/backend/tests/integration/stream-actions.test.ts index 37af6332..afd128ff 100644 --- a/backend/tests/integration/stream-actions.test.ts +++ b/backend/tests/integration/stream-actions.test.ts @@ -124,22 +124,6 @@ describe('stream action routes', () => { txHash: 'pause-tx-hash', }); expect(mockPauseStream).toHaveBeenCalledWith(sender.publicKey(), 7); - expect(mockPrisma.stream.update).toHaveBeenCalledWith( - expect.objectContaining({ - where: { streamId: 7 }, - data: expect.objectContaining({ - isPaused: true, - }), - }), - ); - expect(mockPrisma.streamEvent.create).toHaveBeenCalledWith( - expect.objectContaining({ - data: expect.objectContaining({ - eventType: 'PAUSED', - transactionHash: 'pause-tx-hash', - }), - }), - ); }); it('rejects a raw signed transaction bearer token without a JWT', async () => { @@ -190,22 +174,6 @@ describe('stream action routes', () => { txHash: 'resume-tx-hash', }); expect(mockResumeStream).toHaveBeenCalledWith(sender.publicKey(), 9); - expect(mockPrisma.stream.update).toHaveBeenCalledWith( - expect.objectContaining({ - where: { streamId: 9 }, - data: expect.objectContaining({ - isPaused: false, - }), - }), - ); - expect(mockPrisma.streamEvent.create).toHaveBeenCalledWith( - expect.objectContaining({ - data: expect.objectContaining({ - eventType: 'RESUMED', - transactionHash: 'resume-tx-hash', - }), - }), - ); }); it('POST /v1/streams/:streamId/withdraw withdraws the claimable amount for the recipient', async () => {