From 60d8061be697aed72b474876f70725ceaba223aa Mon Sep 17 00:00:00 2001 From: BernardOnuh Date: Wed, 1 Jul 2026 22:58:44 +0100 Subject: [PATCH] fix(#806): guard withdrawnAmount/depositedAmount mutations behind idempotency check MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds replay tests for handleTokensWithdrawn and handleStreamToppedUp, and fixes a bug where the duplicate-event check ran after the Stream mutation instead of before it — a replayed tokens_withdrawn event would double-count withdrawnAmount. Closes #806 --- backend/src/workers/soroban-event-worker.ts | 82 ++++++------ backend/tests/soroban-event-worker.test.ts | 135 ++++++++++++++++++++ 2 files changed, 179 insertions(+), 38 deletions(-) diff --git a/backend/src/workers/soroban-event-worker.ts b/backend/src/workers/soroban-event-worker.ts index 6c9b9aa4..63231189 100644 --- a/backend/src/workers/soroban-event-worker.ts +++ b/backend/src/workers/soroban-event-worker.ts @@ -553,6 +553,17 @@ export class SorobanEventWorker { const timestamp = Math.floor(Date.now() / 1000); await prisma.$transaction(async (tx: Prisma.TransactionClient) => { + // Check for a duplicate BEFORE mutating any Stream fields so that a + // replayed event never re-applies the top-up. + const existingEvent = await tx.streamEvent.findUnique({ + where: { transactionHash_eventType: { transactionHash: event.txHash, eventType: 'TOPPED_UP' } }, + select: { id: true }, + }); + if (existingEvent) { + logger.warn(`[SorobanWorker] Duplicate StreamEvent skipped: txHash=${event.txHash} type=TOPPED_UP`); + return; + } + const stream = await tx.stream.findUniqueOrThrow({ where: { streamId }, select: { ratePerSecond: true, startTime: true, totalPausedDuration: true } @@ -575,27 +586,19 @@ export class SorobanEventWorker { }, }); - const existingEvent = await tx.streamEvent.findUnique({ + await tx.streamEvent.upsert({ where: { transactionHash_eventType: { transactionHash: event.txHash, eventType: 'TOPPED_UP' } }, - select: { id: true }, + create: { + streamId, + eventType: 'TOPPED_UP', + amount, + transactionHash: event.txHash, + ledgerSequence: event.ledger, + timestamp, + metadata: JSON.stringify({ newDepositedAmount }), + }, + update: {}, }); - if (existingEvent) { - logger.warn(`[SorobanWorker] Duplicate StreamEvent skipped: txHash=${event.txHash} type=TOPPED_UP`); - } else { - await tx.streamEvent.upsert({ - where: { transactionHash_eventType: { transactionHash: event.txHash, eventType: 'TOPPED_UP' } }, - create: { - streamId, - eventType: 'TOPPED_UP', - amount, - transactionHash: event.txHash, - ledgerSequence: event.ledger, - timestamp, - metadata: JSON.stringify({ newDepositedAmount }), - }, - update: {}, - }); - } }); sseService.broadcastToStream(String(streamId), 'stream.topped_up', { @@ -624,6 +627,17 @@ export class SorobanEventWorker { const timestamp = Number(decodeU64(body['timestamp'])); await prisma.$transaction(async (tx: Prisma.TransactionClient) => { + // Check for a duplicate BEFORE mutating any Stream fields so that a + // replayed event never double-increments withdrawnAmount. + const existingEvent = await tx.streamEvent.findUnique({ + where: { transactionHash_eventType: { transactionHash: event.txHash, eventType: 'WITHDRAWN' } }, + select: { id: true }, + }); + if (existingEvent) { + logger.warn(`[SorobanWorker] Duplicate StreamEvent skipped: txHash=${event.txHash} type=WITHDRAWN`); + return; + } + const stream = await tx.stream.findUniqueOrThrow({ where: { streamId }, select: { withdrawnAmount: true }, @@ -641,27 +655,19 @@ export class SorobanEventWorker { }, }); - const existingEvent = await tx.streamEvent.findUnique({ + await tx.streamEvent.upsert({ where: { transactionHash_eventType: { transactionHash: event.txHash, eventType: 'WITHDRAWN' } }, - select: { id: true }, + create: { + streamId, + eventType: 'WITHDRAWN', + amount, + transactionHash: event.txHash, + ledgerSequence: event.ledger, + timestamp, + metadata: JSON.stringify({ recipient }), + }, + update: {}, }); - if (existingEvent) { - logger.warn(`[SorobanWorker] Duplicate StreamEvent skipped: txHash=${event.txHash} type=WITHDRAWN`); - } else { - await tx.streamEvent.upsert({ - where: { transactionHash_eventType: { transactionHash: event.txHash, eventType: 'WITHDRAWN' } }, - create: { - streamId, - eventType: 'WITHDRAWN', - amount, - transactionHash: event.txHash, - ledgerSequence: event.ledger, - timestamp, - metadata: JSON.stringify({ recipient }), - }, - update: {}, - }); - } }); sseService.broadcastToStream(String(streamId), 'stream.withdrawn', { diff --git a/backend/tests/soroban-event-worker.test.ts b/backend/tests/soroban-event-worker.test.ts index 775d8fee..772017f8 100644 --- a/backend/tests/soroban-event-worker.test.ts +++ b/backend/tests/soroban-event-worker.test.ts @@ -443,6 +443,141 @@ describe('SorobanEventWorker', () => { expect(logger.warn).toHaveBeenCalledWith(expect.stringContaining('Duplicate StreamEvent skipped')); }); + it('should not double-increment withdrawnAmount when a tokens_withdrawn event is re-processed', async () => { + const txHash = 'withdraw-tx-hash'; + const streamId = 21; + + const mockEvent: rpc.Api.EventResponse = { + id: 'withdraw-event-1', + type: 'contract', + ledger: 4000, + ledgerClosedAt: '2024-01-01T00:00:00Z', + txHash, + transactionIndex: 0, + operationIndex: 0, + inSuccessfulContractCall: true, + topic: [ + { switch: () => ({ value: 0 }), sym: () => 'tokens_withdrawn' } as any, + { switch: () => ({ value: 1 }), u64: () => ({ toString: () => streamId.toString() }) } as any, + ], + value: { + switch: () => ({ value: 4 }), + map: () => [ + { key: () => ({ sym: () => 'recipient' }), val: () => ({ address: () => ({ switch: () => ({ value: 0 }), accountId: () => ({ ed25519: () => Buffer.alloc(32) }) }) }) }, + { key: () => ({ sym: () => 'amount' }), val: () => ({ i128: () => ({ hi: () => ({ toString: () => '0' }), lo: () => ({ toString: () => '500' }) }) }) }, + { key: () => ({ sym: () => 'timestamp' }), val: () => ({ u64: () => ({ toString: () => '1700002000' }) }) }, + ] as any, + } as any, + }; + + // withdrawnAmount starts at '1000'; a single successful withdrawal of + // 500 should bring it to '1500' and stay there under replay. + const mockTx = { + stream: { + findUniqueOrThrow: vi.fn().mockResolvedValue({ withdrawnAmount: '1000' }), + update: vi.fn().mockResolvedValue({}), + }, + streamEvent: { + findUnique: vi.fn(), + upsert: vi.fn().mockResolvedValue({ id: 'withdraw-event-row' }), + }, + }; + + (prisma.$transaction as ReturnType).mockImplementation((cb) => cb(mockTx)); + + // First processing: no existing event → withdrawnAmount is updated once. + mockTx.streamEvent.findUnique.mockResolvedValueOnce(null); + await expect((worker as any).handleTokensWithdrawn(mockEvent, mockEvent.topic![1])).resolves.not.toThrow(); + expect(mockTx.stream.update).toHaveBeenCalledTimes(1); + expect(mockTx.stream.update).toHaveBeenCalledWith({ + where: { streamId }, + data: { withdrawnAmount: '1500', lastUpdateTime: 1700002000 }, + }); + expect(mockTx.streamEvent.upsert).toHaveBeenCalledTimes(1); + expect(logger.warn).not.toHaveBeenCalled(); + + vi.clearAllMocks(); + (prisma.$transaction as ReturnType).mockImplementation((cb) => cb(mockTx)); + + // Second processing (replay of same txHash): the event now exists, so + // withdrawnAmount must NOT be touched a second time. + mockTx.streamEvent.findUnique.mockResolvedValueOnce({ id: 'withdraw-event-row' }); + await expect((worker as any).handleTokensWithdrawn(mockEvent, mockEvent.topic![1])).resolves.not.toThrow(); + expect(mockTx.stream.update).not.toHaveBeenCalled(); + expect(mockTx.streamEvent.upsert).not.toHaveBeenCalled(); + expect(logger.warn).toHaveBeenCalledWith(expect.stringContaining('Duplicate StreamEvent skipped')); + }); + + it('should not double-apply depositedAmount/endTime when a stream_topped_up event is re-processed', async () => { + const txHash = 'topup-tx-hash'; + const streamId = 22; + + const mockEvent: rpc.Api.EventResponse = { + id: 'topup-event-1', + type: 'contract', + ledger: 4001, + ledgerClosedAt: '2024-01-01T00:00:00Z', + txHash, + transactionIndex: 0, + operationIndex: 0, + inSuccessfulContractCall: true, + topic: [ + { switch: () => ({ value: 0 }), sym: () => 'stream_topped_up' } as any, + { switch: () => ({ value: 1 }), u64: () => ({ toString: () => streamId.toString() }) } as any, + ], + value: { + switch: () => ({ value: 4 }), + map: () => [ + { key: () => ({ sym: () => 'amount' }), val: () => ({ i128: () => ({ hi: () => ({ toString: () => '0' }), lo: () => ({ toString: () => '200' }) }) }) }, + { key: () => ({ sym: () => 'new_deposited_amount' }), val: () => ({ i128: () => ({ hi: () => ({ toString: () => '0' }), lo: () => ({ toString: () => '1200' }) }) }) }, + ] as any, + } as any, + }; + + const mockTx = { + stream: { + findUniqueOrThrow: vi.fn().mockResolvedValue({ + ratePerSecond: '10', + startTime: 1700000000, + totalPausedDuration: 0, + }), + update: vi.fn().mockResolvedValue({}), + }, + streamEvent: { + findUnique: vi.fn(), + upsert: vi.fn().mockResolvedValue({ id: 'topup-event-row' }), + }, + }; + + (prisma.$transaction as ReturnType).mockImplementation((cb) => cb(mockTx)); + + // First processing: no existing event → depositedAmount/endTime are set once. + mockTx.streamEvent.findUnique.mockResolvedValueOnce(null); + await expect((worker as any).handleStreamToppedUp(mockEvent, mockEvent.topic![1])).resolves.not.toThrow(); + expect(mockTx.stream.update).toHaveBeenCalledTimes(1); + const firstUpdateArgs = mockTx.stream.update.mock.calls[0]![0]; + expect(firstUpdateArgs.data.depositedAmount).toBe('1200'); + const expectedEndTime = firstUpdateArgs.data.endTime; + expect(mockTx.streamEvent.upsert).toHaveBeenCalledTimes(1); + expect(logger.warn).not.toHaveBeenCalled(); + + vi.clearAllMocks(); + (prisma.$transaction as ReturnType).mockImplementation((cb) => cb(mockTx)); + + // Second processing (replay of same txHash): the event now exists, so + // depositedAmount/endTime must NOT be re-applied. + mockTx.streamEvent.findUnique.mockResolvedValueOnce({ id: 'topup-event-row' }); + await expect((worker as any).handleStreamToppedUp(mockEvent, mockEvent.topic![1])).resolves.not.toThrow(); + expect(mockTx.stream.update).not.toHaveBeenCalled(); + expect(mockTx.streamEvent.upsert).not.toHaveBeenCalled(); + expect(logger.warn).toHaveBeenCalledWith(expect.stringContaining('Duplicate StreamEvent skipped')); + + // Sanity check: depositedAmount/endTime from the (only) applied update + // match what a single application should produce. + expect(firstUpdateArgs.data.depositedAmount).toBe('1200'); + expect(expectedEndTime).toBe(1700000000 + Math.floor(1200 / 10) + 0); + }); + it('should process admin_transferred events successfully', async () => { const txHash = 'admin-transferred-tx-hash';