Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 44 additions & 38 deletions backend/src/workers/soroban-event-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -553,6 +553,17 @@ export class SorobanEventWorker {
const timestamp = Math.floor(Date.now() / 1000);

await prisma.$transaction(async (tx: Prisma.TransactionClient) => {
// Check for a duplicate BEFORE mutating any Stream fields so that a
// replayed event never re-applies the top-up.
const existingEvent = await tx.streamEvent.findUnique({
where: { transactionHash_eventType: { transactionHash: event.txHash, eventType: 'TOPPED_UP' } },
select: { id: true },
});
if (existingEvent) {
logger.warn(`[SorobanWorker] Duplicate StreamEvent skipped: txHash=${event.txHash} type=TOPPED_UP`);
return;
}

const stream = await tx.stream.findUniqueOrThrow({
where: { streamId },
select: { ratePerSecond: true, startTime: true, totalPausedDuration: true }
Expand All @@ -575,27 +586,19 @@ export class SorobanEventWorker {
},
});

const existingEvent = await tx.streamEvent.findUnique({
await tx.streamEvent.upsert({
where: { transactionHash_eventType: { transactionHash: event.txHash, eventType: 'TOPPED_UP' } },
select: { id: true },
create: {
streamId,
eventType: 'TOPPED_UP',
amount,
transactionHash: event.txHash,
ledgerSequence: event.ledger,
timestamp,
metadata: JSON.stringify({ newDepositedAmount }),
},
update: {},
});
if (existingEvent) {
logger.warn(`[SorobanWorker] Duplicate StreamEvent skipped: txHash=${event.txHash} type=TOPPED_UP`);
} else {
await tx.streamEvent.upsert({
where: { transactionHash_eventType: { transactionHash: event.txHash, eventType: 'TOPPED_UP' } },
create: {
streamId,
eventType: 'TOPPED_UP',
amount,
transactionHash: event.txHash,
ledgerSequence: event.ledger,
timestamp,
metadata: JSON.stringify({ newDepositedAmount }),
},
update: {},
});
}
});

sseService.broadcastToStream(String(streamId), 'stream.topped_up', {
Expand Down Expand Up @@ -624,6 +627,17 @@ export class SorobanEventWorker {
const timestamp = Number(decodeU64(body['timestamp']));

await prisma.$transaction(async (tx: Prisma.TransactionClient) => {
// Check for a duplicate BEFORE mutating any Stream fields so that a
// replayed event never double-increments withdrawnAmount.
const existingEvent = await tx.streamEvent.findUnique({
where: { transactionHash_eventType: { transactionHash: event.txHash, eventType: 'WITHDRAWN' } },
select: { id: true },
});
if (existingEvent) {
logger.warn(`[SorobanWorker] Duplicate StreamEvent skipped: txHash=${event.txHash} type=WITHDRAWN`);
return;
}

const stream = await tx.stream.findUniqueOrThrow({
where: { streamId },
select: { withdrawnAmount: true },
Expand All @@ -641,27 +655,19 @@ export class SorobanEventWorker {
},
});

const existingEvent = await tx.streamEvent.findUnique({
await tx.streamEvent.upsert({
where: { transactionHash_eventType: { transactionHash: event.txHash, eventType: 'WITHDRAWN' } },
select: { id: true },
create: {
streamId,
eventType: 'WITHDRAWN',
amount,
transactionHash: event.txHash,
ledgerSequence: event.ledger,
timestamp,
metadata: JSON.stringify({ recipient }),
},
update: {},
});
if (existingEvent) {
logger.warn(`[SorobanWorker] Duplicate StreamEvent skipped: txHash=${event.txHash} type=WITHDRAWN`);
} else {
await tx.streamEvent.upsert({
where: { transactionHash_eventType: { transactionHash: event.txHash, eventType: 'WITHDRAWN' } },
create: {
streamId,
eventType: 'WITHDRAWN',
amount,
transactionHash: event.txHash,
ledgerSequence: event.ledger,
timestamp,
metadata: JSON.stringify({ recipient }),
},
update: {},
});
}
});

sseService.broadcastToStream(String(streamId), 'stream.withdrawn', {
Expand Down
135 changes: 135 additions & 0 deletions backend/tests/soroban-event-worker.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,141 @@ describe('SorobanEventWorker', () => {
expect(logger.warn).toHaveBeenCalledWith(expect.stringContaining('Duplicate StreamEvent skipped'));
});

it('should not double-increment withdrawnAmount when a tokens_withdrawn event is re-processed', async () => {
const txHash = 'withdraw-tx-hash';
const streamId = 21;

const mockEvent: rpc.Api.EventResponse = {
id: 'withdraw-event-1',
type: 'contract',
ledger: 4000,
ledgerClosedAt: '2024-01-01T00:00:00Z',
txHash,
transactionIndex: 0,
operationIndex: 0,
inSuccessfulContractCall: true,
topic: [
{ switch: () => ({ value: 0 }), sym: () => 'tokens_withdrawn' } as any,
{ switch: () => ({ value: 1 }), u64: () => ({ toString: () => streamId.toString() }) } as any,
],
value: {
switch: () => ({ value: 4 }),
map: () => [
{ key: () => ({ sym: () => 'recipient' }), val: () => ({ address: () => ({ switch: () => ({ value: 0 }), accountId: () => ({ ed25519: () => Buffer.alloc(32) }) }) }) },
{ key: () => ({ sym: () => 'amount' }), val: () => ({ i128: () => ({ hi: () => ({ toString: () => '0' }), lo: () => ({ toString: () => '500' }) }) }) },
{ key: () => ({ sym: () => 'timestamp' }), val: () => ({ u64: () => ({ toString: () => '1700002000' }) }) },
] as any,
} as any,
};

// withdrawnAmount starts at '1000'; a single successful withdrawal of
// 500 should bring it to '1500' and stay there under replay.
const mockTx = {
stream: {
findUniqueOrThrow: vi.fn().mockResolvedValue({ withdrawnAmount: '1000' }),
update: vi.fn().mockResolvedValue({}),
},
streamEvent: {
findUnique: vi.fn(),
upsert: vi.fn().mockResolvedValue({ id: 'withdraw-event-row' }),
},
};

(prisma.$transaction as ReturnType<typeof vi.fn>).mockImplementation((cb) => cb(mockTx));

// First processing: no existing event → withdrawnAmount is updated once.
mockTx.streamEvent.findUnique.mockResolvedValueOnce(null);
await expect((worker as any).handleTokensWithdrawn(mockEvent, mockEvent.topic![1])).resolves.not.toThrow();
expect(mockTx.stream.update).toHaveBeenCalledTimes(1);
expect(mockTx.stream.update).toHaveBeenCalledWith({
where: { streamId },
data: { withdrawnAmount: '1500', lastUpdateTime: 1700002000 },
});
expect(mockTx.streamEvent.upsert).toHaveBeenCalledTimes(1);
expect(logger.warn).not.toHaveBeenCalled();

vi.clearAllMocks();
(prisma.$transaction as ReturnType<typeof vi.fn>).mockImplementation((cb) => cb(mockTx));

// Second processing (replay of same txHash): the event now exists, so
// withdrawnAmount must NOT be touched a second time.
mockTx.streamEvent.findUnique.mockResolvedValueOnce({ id: 'withdraw-event-row' });
await expect((worker as any).handleTokensWithdrawn(mockEvent, mockEvent.topic![1])).resolves.not.toThrow();
expect(mockTx.stream.update).not.toHaveBeenCalled();
expect(mockTx.streamEvent.upsert).not.toHaveBeenCalled();
expect(logger.warn).toHaveBeenCalledWith(expect.stringContaining('Duplicate StreamEvent skipped'));
});

it('should not double-apply depositedAmount/endTime when a stream_topped_up event is re-processed', async () => {
const txHash = 'topup-tx-hash';
const streamId = 22;

const mockEvent: rpc.Api.EventResponse = {
id: 'topup-event-1',
type: 'contract',
ledger: 4001,
ledgerClosedAt: '2024-01-01T00:00:00Z',
txHash,
transactionIndex: 0,
operationIndex: 0,
inSuccessfulContractCall: true,
topic: [
{ switch: () => ({ value: 0 }), sym: () => 'stream_topped_up' } as any,
{ switch: () => ({ value: 1 }), u64: () => ({ toString: () => streamId.toString() }) } as any,
],
value: {
switch: () => ({ value: 4 }),
map: () => [
{ key: () => ({ sym: () => 'amount' }), val: () => ({ i128: () => ({ hi: () => ({ toString: () => '0' }), lo: () => ({ toString: () => '200' }) }) }) },
{ key: () => ({ sym: () => 'new_deposited_amount' }), val: () => ({ i128: () => ({ hi: () => ({ toString: () => '0' }), lo: () => ({ toString: () => '1200' }) }) }) },
] as any,
} as any,
};

const mockTx = {
stream: {
findUniqueOrThrow: vi.fn().mockResolvedValue({
ratePerSecond: '10',
startTime: 1700000000,
totalPausedDuration: 0,
}),
update: vi.fn().mockResolvedValue({}),
},
streamEvent: {
findUnique: vi.fn(),
upsert: vi.fn().mockResolvedValue({ id: 'topup-event-row' }),
},
};

(prisma.$transaction as ReturnType<typeof vi.fn>).mockImplementation((cb) => cb(mockTx));

// First processing: no existing event → depositedAmount/endTime are set once.
mockTx.streamEvent.findUnique.mockResolvedValueOnce(null);
await expect((worker as any).handleStreamToppedUp(mockEvent, mockEvent.topic![1])).resolves.not.toThrow();
expect(mockTx.stream.update).toHaveBeenCalledTimes(1);
const firstUpdateArgs = mockTx.stream.update.mock.calls[0]![0];
expect(firstUpdateArgs.data.depositedAmount).toBe('1200');
const expectedEndTime = firstUpdateArgs.data.endTime;
expect(mockTx.streamEvent.upsert).toHaveBeenCalledTimes(1);
expect(logger.warn).not.toHaveBeenCalled();

vi.clearAllMocks();
(prisma.$transaction as ReturnType<typeof vi.fn>).mockImplementation((cb) => cb(mockTx));

// Second processing (replay of same txHash): the event now exists, so
// depositedAmount/endTime must NOT be re-applied.
mockTx.streamEvent.findUnique.mockResolvedValueOnce({ id: 'topup-event-row' });
await expect((worker as any).handleStreamToppedUp(mockEvent, mockEvent.topic![1])).resolves.not.toThrow();
expect(mockTx.stream.update).not.toHaveBeenCalled();
expect(mockTx.streamEvent.upsert).not.toHaveBeenCalled();
expect(logger.warn).toHaveBeenCalledWith(expect.stringContaining('Duplicate StreamEvent skipped'));

// Sanity check: depositedAmount/endTime from the (only) applied update
// match what a single application should produce.
expect(firstUpdateArgs.data.depositedAmount).toBe('1200');
expect(expectedEndTime).toBe(1700000000 + Math.floor(1200 / 10) + 0);
});

it('should process admin_transferred events successfully', async () => {
const txHash = 'admin-transferred-tx-hash';

Expand Down
Loading