From f164dacebc505511b4e458d1d3ab8b5f06f73755 Mon Sep 17 00:00:00 2001 From: Banx17 Date: Wed, 1 Jul 2026 18:53:20 +0100 Subject: [PATCH] fix(indexer): remove duplicate soroban indexer service (#801) - Removed legacy `sorobanIndexerService` from `index.ts`. - Made `SorobanEventWorker.handleTokensWithdrawn` idempotent. - Deleted obsolete tests for legacy service. - Added regression test for single indexer #801. --- backend/src/index.ts | 9 +- .../src/services/soroban-indexer.service.ts | 247 ------------------ backend/src/workers/soroban-event-worker.ts | 60 +++-- .../tests/single-indexer.regression.test.ts | 154 +++++++++++ backend/tests/soroban-indexer.test.ts | 20 -- 5 files changed, 191 insertions(+), 299 deletions(-) delete mode 100644 backend/src/services/soroban-indexer.service.ts create mode 100644 backend/tests/single-indexer.regression.test.ts delete mode 100644 backend/tests/soroban-indexer.test.ts diff --git a/backend/src/index.ts b/backend/src/index.ts index f31f9d42..f285f265 100644 --- a/backend/src/index.ts +++ b/backend/src/index.ts @@ -1,7 +1,6 @@ import dotenv from "dotenv"; import app from "./app.js"; import logger from "./logger.js"; -import { sorobanIndexerService } from "./services/soroban-indexer.service.js"; import { startWorkers, stopWorkers } from "./workers/index.js"; import { sseService } from "./services/sse.service.js"; import { connectRedis, disconnectRedis } from "./lib/redis.js"; @@ -29,7 +28,6 @@ const startServer = async () => { ); }); - sorobanIndexerService.start(); await startWorkers(); const shutdown = async (signal: string) => { @@ -41,12 +39,7 @@ const startServer = async () => { // 2. Stop accepting new HTTP connections server.close(); - // 3. Stop indexers (clears poll timers) - try { - sorobanIndexerService.stop?.(); - } catch (err) { - logger.warn("Error while stopping soroban indexer:", err); - } + // 3. Stop the indexer worker (clears poll timers) stopWorkers(); // 4. Wait for in-flight indexer batch to finish (max 30s) diff --git a/backend/src/services/soroban-indexer.service.ts b/backend/src/services/soroban-indexer.service.ts deleted file mode 100644 index 9e2f7299..00000000 --- a/backend/src/services/soroban-indexer.service.ts +++ /dev/null @@ -1,247 +0,0 @@ -import { prisma } from '../lib/prisma.js'; -import logger from '../logger.js'; - -type JsonRecord = Record; - -interface RpcEvent { - id?: string; - ledger?: number; - ledgerSequence?: number; - txHash?: string; - topic?: unknown[]; - value?: unknown; - contractId?: string; -} - -interface RpcResponse { - result?: { - events?: RpcEvent[]; - }; - error?: { - message?: string; - }; -} - -type IndexedEventType = 'CREATED' | 'CANCELLED' | 'WITHDRAWN' | 'COMPLETED'; - -const RPC_URL = process.env.SOROBAN_RPC_URL ?? 'https://soroban-testnet.stellar.org'; -const POLL_MS = Number(process.env.SOROBAN_INDEXER_POLL_MS ?? 15000); -const START_LEDGER = Number(process.env.SOROBAN_INDEXER_START_LEDGER ?? 0); -const STREAM_CONTRACT_ID = process.env.STREAM_CONTRACT_ID ?? ''; - -export class SorobanIndexerService { - private timer: NodeJS.Timeout | null = null; - private running = false; - private lastLedger = START_LEDGER; - - start() { - if (this.running) return; - this.running = true; - - void this.poll(); - this.timer = setInterval(() => { - void this.poll(); - }, POLL_MS); - - logger.info(`Soroban indexer started (poll=${POLL_MS}ms, startLedger=${this.lastLedger})`); - } - - stop() { - if (this.timer) clearInterval(this.timer); - this.timer = null; - this.running = false; - } - - private async poll() { - if (!STREAM_CONTRACT_ID) return; - - try { - const events = await this.fetchEvents(this.lastLedger + 1); - if (events.length === 0) return; - - let maxLedger = this.lastLedger; - for (const event of events) { - const ledger = Number(event.ledgerSequence ?? event.ledger ?? 0); - if (ledger > maxLedger) maxLedger = ledger; - await this.indexEvent(event, ledger); - } - - this.lastLedger = maxLedger; - } catch (error) { - logger.error('Soroban indexer poll failed', error); - } - } - - private async fetchEvents(startLedger: number): Promise { - const body = { - jsonrpc: '2.0', - id: 1, - method: 'getEvents', - params: { - startLedger, - filters: [{ type: 'contract', contractIds: [STREAM_CONTRACT_ID] }], - pagination: { limit: 100 }, - }, - }; - - const response = await fetch(RPC_URL, { - method: 'POST', - headers: { 'content-type': 'application/json' }, - body: JSON.stringify(body), - }); - - if (!response.ok) { - throw new Error(`getEvents failed: ${response.status}`); - } - - const payload = (await response.json()) as RpcResponse; - if (payload.error?.message) throw new Error(payload.error.message); - return payload.result?.events ?? []; - } - - private asRecord(value: unknown): JsonRecord | null { - if (!value || typeof value !== 'object' || Array.isArray(value)) return null; - return value as JsonRecord; - } - - private parseEventType(event: RpcEvent): IndexedEventType | null { - const firstTopic = Array.isArray(event.topic) && event.topic.length > 0 - ? String(event.topic[0]).toLowerCase() - : ''; - - if (firstTopic.includes('stream_created')) return 'CREATED'; - if (firstTopic.includes('stream_cancelled')) return 'CANCELLED'; - if (firstTopic.includes('tokens_withdrawn')) return 'WITHDRAWN'; - if (firstTopic.includes('stream_completed')) return 'COMPLETED'; - return null; - } - - private parseStreamId(record: JsonRecord): number | null { - const raw = record.stream_id ?? record.streamId; - if (typeof raw === 'number' && Number.isInteger(raw)) return raw; - if (typeof raw === 'string' && raw.trim()) { - const parsed = Number(raw); - if (Number.isInteger(parsed)) return parsed; - } - return null; - } - - private readString(record: JsonRecord, ...keys: string[]): string | null { - for (const key of keys) { - const value = record[key]; - if (typeof value === 'string' && value.trim()) return value; - } - return null; - } - - private async ensureUser(publicKey: string) { - await prisma.user.upsert({ - where: { publicKey }, - update: {}, - create: { publicKey }, - }); - } - - private async indexEvent(event: RpcEvent, ledgerSequence: number) { - const eventType = this.parseEventType(event); - if (!eventType) return; - - const value = this.asRecord(event.value); - if (!value) return; - - const streamId = this.parseStreamId(value); - if (!streamId) return; - - const txHash = event.txHash ?? event.id ?? `event-${streamId}-${ledgerSequence}-${eventType}`; - const timestamp = Math.floor(Date.now() / 1000); - - const existing = await prisma.streamEvent.findFirst({ - where: { - streamId, - eventType, - transactionHash: txHash, - ledgerSequence, - }, - select: { id: true }, - }); - if (existing) return; - - if (eventType === 'CREATED') { - const sender = this.readString(value, 'sender'); - const recipient = this.readString(value, 'recipient'); - const tokenAddress = this.readString(value, 'token_address', 'tokenAddress'); - const ratePerSecond = this.readString(value, 'rate_per_second', 'ratePerSecond'); - const depositedAmount = this.readString(value, 'deposited_amount', 'depositedAmount'); - const startTimeRaw = value.start_time ?? value.startTime ?? timestamp; - const startTime = Number(startTimeRaw); - - if (!sender || !recipient || !tokenAddress || !ratePerSecond || !depositedAmount) return; - - await this.ensureUser(sender); - await this.ensureUser(recipient); - - await prisma.stream.upsert({ - where: { streamId }, - update: { - sender, - recipient, - tokenAddress, - ratePerSecond, - depositedAmount, - lastUpdateTime: Number.isFinite(startTime) ? startTime : timestamp, - isActive: true, - }, - create: { - streamId, - sender, - recipient, - tokenAddress, - ratePerSecond, - depositedAmount, - withdrawnAmount: '0', - startTime: Number.isFinite(startTime) ? startTime : timestamp, - lastUpdateTime: Number.isFinite(startTime) ? startTime : timestamp, - isActive: true, - }, - }); - } else if (eventType === 'CANCELLED') { - await prisma.stream.updateMany({ - where: { streamId }, - data: { isActive: false, lastUpdateTime: timestamp }, - }); - } else if (eventType === 'WITHDRAWN') { - const stream = await prisma.stream.findUnique({ where: { streamId } }); - if (stream) { - const amount = this.readString(value, 'amount') ?? '0'; - const nextWithdrawn = (BigInt(stream.withdrawnAmount) + BigInt(amount)).toString(); - await prisma.stream.update({ - where: { streamId }, - data: { - withdrawnAmount: nextWithdrawn, - lastUpdateTime: timestamp, - isActive: BigInt(nextWithdrawn) < BigInt(stream.depositedAmount), - }, - }); - } - } else if (eventType === 'COMPLETED') { - await prisma.stream.updateMany({ - where: { streamId }, - data: { isActive: false, lastUpdateTime: timestamp }, - }); - } - - await prisma.streamEvent.create({ - data: { - streamId, - eventType, - amount: this.readString(value, 'amount'), - transactionHash: txHash, - ledgerSequence, - timestamp, - metadata: JSON.stringify({ topic: event.topic, value: event.value }), - }, - }); - } -} - -export const sorobanIndexerService = new SorobanIndexerService(); diff --git a/backend/src/workers/soroban-event-worker.ts b/backend/src/workers/soroban-event-worker.ts index 6c9b9aa4..509301ad 100644 --- a/backend/src/workers/soroban-event-worker.ts +++ b/backend/src/workers/soroban-event-worker.ts @@ -623,31 +623,38 @@ export class SorobanEventWorker { const amount = decodeI128(body['amount']); const timestamp = Number(decodeU64(body['timestamp'])); - await prisma.$transaction(async (tx: Prisma.TransactionClient) => { - const stream = await tx.stream.findUniqueOrThrow({ - where: { streamId }, - select: { withdrawnAmount: true }, - }); + const applied = await prisma.$transaction( + async (tx: Prisma.TransactionClient) => { + // Idempotency guard: withdrawnAmount is a *relative* increment + // (existing + amount), so re-observing the same WITHDRAWN event must + // NOT re-apply it. Check for the recorded event first and bail out + // before touching the balance when it already exists. + const existingEvent = await tx.streamEvent.findUnique({ + where: { transactionHash_eventType: { transactionHash: event.txHash, eventType: 'WITHDRAWN' } }, + select: { id: true }, + }); + if (existingEvent) { + logger.warn(`[SorobanWorker] Duplicate StreamEvent skipped: txHash=${event.txHash} type=WITHDRAWN`); + return false; + } + + const stream = await tx.stream.findUniqueOrThrow({ + where: { streamId }, + select: { withdrawnAmount: true }, + }); - const newWithdrawnAmount = ( - BigInt(stream.withdrawnAmount) + BigInt(amount) - ).toString(); + const newWithdrawnAmount = ( + BigInt(stream.withdrawnAmount) + BigInt(amount) + ).toString(); - await tx.stream.update({ - where: { streamId }, - data: { - withdrawnAmount: newWithdrawnAmount, - lastUpdateTime: timestamp, - }, - }); + await tx.stream.update({ + where: { streamId }, + data: { + withdrawnAmount: newWithdrawnAmount, + lastUpdateTime: timestamp, + }, + }); - const existingEvent = await tx.streamEvent.findUnique({ - where: { transactionHash_eventType: { transactionHash: event.txHash, eventType: 'WITHDRAWN' } }, - select: { id: true }, - }); - if (existingEvent) { - logger.warn(`[SorobanWorker] Duplicate StreamEvent skipped: txHash=${event.txHash} type=WITHDRAWN`); - } else { await tx.streamEvent.upsert({ where: { transactionHash_eventType: { transactionHash: event.txHash, eventType: 'WITHDRAWN' } }, create: { @@ -661,8 +668,13 @@ export class SorobanEventWorker { }, update: {}, }); - } - }); + + return true; + }, + ); + + // Skip re-broadcasting SSE for an already-recorded (duplicate) event. + if (!applied) return; sseService.broadcastToStream(String(streamId), 'stream.withdrawn', { streamId, diff --git a/backend/tests/single-indexer.regression.test.ts b/backend/tests/single-indexer.regression.test.ts new file mode 100644 index 00000000..3fed8eb7 --- /dev/null +++ b/backend/tests/single-indexer.regression.test.ts @@ -0,0 +1,154 @@ +/** + * Regression tests for issue #801 — "Two indexers run concurrently". + * + * Previously both `sorobanIndexerService` (services/soroban-indexer.service.ts) + * and `SorobanEventWorker` (workers/soroban-event-worker.ts) polled + * STREAM_CONTRACT_ID and wrote Stream/StreamEvent rows. Their WITHDRAWN + * handlers each did a READ-then-ADD on `withdrawnAmount` in separate + * transactions, so the same event could be applied twice → inflated balance. + * + * These tests lock in the fix: + * 1. Only ONE indexer (the worker) polls the contract — the legacy service + * is deleted and is no longer wired into the server entry-point. + * 2. Observing the same WITHDRAWN event twice increments `withdrawnAmount` + * exactly once. + */ +import { readFileSync, existsSync } from 'node:fs'; +import { fileURLToPath } from 'node:url'; +import { describe, it, expect, vi, beforeEach } from 'vitest'; +import { rpc } from '@stellar/stellar-sdk'; + +// ─── Mocks (must be registered before importing the worker) ────────────────── + +vi.mock('../src/lib/prisma.js', () => ({ + default: { indexerState: { upsert: vi.fn() } }, + prisma: { + indexerState: { upsert: vi.fn() }, + stream: { findUniqueOrThrow: vi.fn(), update: vi.fn() }, + streamEvent: { findUnique: vi.fn(), upsert: vi.fn() }, + $transaction: vi.fn(), + }, +})); + +vi.mock('../src/services/sse.service.js', () => ({ + sseService: { broadcastToStream: vi.fn() }, +})); + +vi.mock('../src/logger.js', () => ({ + default: { info: vi.fn(), warn: vi.fn(), error: vi.fn() }, +})); + +import { SorobanEventWorker } from '../src/workers/soroban-event-worker.js'; +import { prisma } from '../src/lib/prisma.js'; +import { sseService } from '../src/services/sse.service.js'; +import logger from '../src/logger.js'; + +const srcUrl = (rel: string) => fileURLToPath(new URL(rel, import.meta.url)); + +describe('#801 single indexer', () => { + describe('only one indexer instance polls the contract', () => { + it('deletes the legacy soroban-indexer.service module', () => { + expect(existsSync(srcUrl('../src/services/soroban-indexer.service.ts'))).toBe(false); + }); + + it('does not wire the legacy indexer service into the server entry-point', () => { + const indexSrc = readFileSync(srcUrl('../src/index.ts'), 'utf8'); + expect(indexSrc).not.toMatch(/soroban-indexer\.service/); + expect(indexSrc).not.toMatch(/sorobanIndexerService/); + // The worker remains the single indexer started at boot. + expect(indexSrc).toMatch(/startWorkers\(\)/); + }); + + it('exposes exactly one contract poller — the SorobanEventWorker', () => { + // startWorkers() is the only indexer bootstrap; it starts the worker and + // nothing else. (See workers/index.ts.) + const workersSrc = readFileSync(srcUrl('../src/workers/index.ts'), 'utf8'); + expect(workersSrc).toMatch(/sorobanEventWorker\.start\(\)/); + expect(workersSrc).not.toMatch(/sorobanIndexerService/); + }); + }); + + describe('withdrawnAmount is not double-incremented on a repeated WITHDRAWN event', () => { + let worker: SorobanEventWorker; + + const streamId = 7; + const buildEvent = (): rpc.Api.EventResponse => + ({ + id: 'withdraw-event-1', + type: 'contract', + ledger: 4000, + ledgerClosedAt: '2024-01-01T00:00:00Z', + txHash: 'withdraw-tx-hash', + transactionIndex: 0, + operationIndex: 0, + inSuccessfulContractCall: true, + topic: [ + { switch: () => ({ value: 0 }), sym: () => 'tokens_withdrawn' } as any, + { switch: () => ({ value: 1 }), u64: () => ({ toString: () => streamId.toString() }) } as any, + ], + value: { + switch: () => ({ value: 4 }), + map: () => [ + { key: () => ({ sym: () => 'recipient' }), val: () => ({ address: () => ({ switch: () => ({ value: 0 }), accountId: () => ({ ed25519: () => Buffer.alloc(32) }) }) }) }, + { key: () => ({ sym: () => 'amount' }), val: () => ({ i128: () => ({ hi: () => ({ toString: () => '0' }), lo: () => ({ toString: () => '100' }) }) }) }, + { key: () => ({ sym: () => 'timestamp' }), val: () => ({ u64: () => ({ toString: () => '1700005000' }) }) }, + ] as any, + } as any, + }) as rpc.Api.EventResponse; + + beforeEach(() => { + vi.clearAllMocks(); + worker = new SorobanEventWorker(); + }); + + it('applies the increment once and skips it when the event is seen again', async () => { + // A tiny stateful stand-in for the DB so the second observation genuinely + // sees the recorded event (as it would in production). + const db = { + withdrawnAmount: '0', + recordedEvent: null as { id: string } | null, + }; + + const mockTx = { + streamEvent: { + findUnique: vi.fn(async () => db.recordedEvent), + upsert: vi.fn(async () => { + db.recordedEvent = { id: 'withdraw-event-row' }; + return db.recordedEvent; + }), + }, + stream: { + findUniqueOrThrow: vi.fn(async () => ({ withdrawnAmount: db.withdrawnAmount })), + update: vi.fn(async ({ data }: { data: { withdrawnAmount: string } }) => { + db.withdrawnAmount = data.withdrawnAmount; + return {}; + }), + }, + }; + + (prisma.$transaction as ReturnType).mockImplementation( + (cb: (tx: typeof mockTx) => unknown) => cb(mockTx), + ); + + const event = buildEvent(); + + // First observation: increment 0 → 100 and record the event. + await (worker as any).handleTokensWithdrawn(event, event.topic![1]); + expect(db.withdrawnAmount).toBe('100'); + expect(mockTx.stream.update).toHaveBeenCalledTimes(1); + expect(mockTx.streamEvent.upsert).toHaveBeenCalledTimes(1); + expect(sseService.broadcastToStream).toHaveBeenCalledTimes(1); + + // Second observation of the SAME event: must be a no-op for the balance. + await (worker as any).handleTokensWithdrawn(event, event.topic![1]); + expect(db.withdrawnAmount).toBe('100'); // NOT '200' + expect(mockTx.stream.update).toHaveBeenCalledTimes(1); // still once + expect(mockTx.streamEvent.upsert).toHaveBeenCalledTimes(1); // still once + // No duplicate SSE notification for the repeated event. + expect(sseService.broadcastToStream).toHaveBeenCalledTimes(1); + expect(logger.warn).toHaveBeenCalledWith( + expect.stringContaining('Duplicate StreamEvent skipped'), + ); + }); + }); +}); diff --git a/backend/tests/soroban-indexer.test.ts b/backend/tests/soroban-indexer.test.ts deleted file mode 100644 index 50bb04c7..00000000 --- a/backend/tests/soroban-indexer.test.ts +++ /dev/null @@ -1,20 +0,0 @@ -import { describe, it, vi, beforeEach } from 'vitest'; -import { sorobanIndexerService } from '../src/services/soroban-indexer.service.js'; - -vi.mock('../src/logger.js', () => ({ - default: { - info: vi.fn(), - error: vi.fn(), - }, -})); - -describe('Soroban Indexer Service', () => { - beforeEach(() => { - vi.clearAllMocks(); - }); - - it('should start and stop the indexer', () => { - sorobanIndexerService.start(); - sorobanIndexerService.stop(); - }); -});