Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions test/metrics-outbox-disabled.test.js
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
// Mock console.dir to capture logs ConsoleMetricExporter writes
// Capture exported metric data via ConsoleMetricExporter's console.dir output
const consoleDirLogs = []
jest.spyOn(console, 'dir').mockImplementation((...args) => {
consoleDirLogs.push(args)
})

const cds = require('@sap/cds')
const { setTimeout: wait } = require('node:timers/promises')
const { metrics } = require('@opentelemetry/api')

const { expect, GET } = cds.test(__dirname + '/bookshop', '--with-mocks', '--profile', 'metrics-outbox-disabled')

Expand Down Expand Up @@ -41,7 +41,10 @@ describe('queue metrics is disabled', () => {

await GET('/odata/v4/proxy/proxyCallToExternalServiceOne', admin)

await wait(150) // Wait for metrics to be collected
// Drain the metric pipeline deterministically. If queue metrics were enabled they'd appear
// in `consoleDirLogs` right after this returns; since they're disabled, the assertions below
// verify nothing was emitted — no sleep needed.
await metrics.getMeterProvider().forceFlush?.()

expect(metricValue('cold_entries')).to.eq(null)
expect(metricValue('remaining_entries')).to.eq(null)
Expand Down
200 changes: 111 additions & 89 deletions test/metrics-outbox-multitenant.test.js
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
// Mock console.dir to capture logs ConsoleMetricExporter writes
// Capture exported metric data via ConsoleMetricExporter's console.dir output
const consoleDirLogs = []
jest.spyOn(console, 'dir').mockImplementation((...args) => {
consoleDirLogs.push(args)
})

const cds = require('@sap/cds')
const { metrics } = require('@opentelemetry/api')
const { setTimeout: wait } = require('node:timers/promises')

const { expect, GET, axios } = cds.test(
Expand All @@ -28,6 +29,25 @@ function metricValue(tenant, metric) {
return mostRecentTenantDataPoint ? mostRecentTenantDataPoint.value : null
}

// State-based wait: force the metric provider to export, then re-run the assertion block.
// Replaces all fixed-time `wait(150)`-style sleeps — the loop completes the instant the in-memory
// queue statistics (kept fresh by the existing cds.spawn poller) reflect the asserted state.
async function expectEventually(assertion, { timeout = 10000, interval = 25 } = {}) {
const start = Date.now()
let lastError
while (true) {
await metrics.getMeterProvider().forceFlush?.()

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Same forceFlush?.() silent no-op risk as in metrics-outbox.test.js — if the meter provider doesn't expose forceFlush, the loop spins for the full 10 s timeout on every positive assertion.

Suggested change
await metrics.getMeterProvider().forceFlush?.()
const provider = metrics.getMeterProvider()
if (typeof provider.forceFlush !== 'function') throw new Error('MeterProvider does not support forceFlush — is @opentelemetry/sdk-metrics registered?')
await provider.forceFlush()

Double-check suggestion before committing. Edit this comment for amendments.


Please provide feedback on the review comment by checking the appropriate box:

  • 🌟 Awesome comment, a human might have missed that.
  • ✅ Helpful comment
  • 🤷 Neutral
  • ❌ This comment is not helpful

try {
assertion()
return
} catch (err) {
lastError = err
if (Date.now() - start >= timeout) throw lastError
await wait(interval)
}
}
}

describe('queue metrics for multi tenant service', () => {
const T1 = 'tenant_1'
const T2 = 'tenant_2'
Expand Down Expand Up @@ -78,34 +98,37 @@ describe('queue metrics for multi tenant service', () => {
GET('/odata/v4/proxy/proxyCallToExternalServiceOne', user[T2])
])

await wait(150) // Wait for metrics to be collected

expect(metricValue(T1, 'cold_entries')).to.eq(0)
expect(metricValue(T1, 'incoming_messages')).to.eq(totalInc[T1])
expect(metricValue(T1, 'outgoing_messages')).to.eq(totalOut[T1])
expect(metricValue(T1, 'remaining_entries')).to.eq(0)
expect(metricValue(T1, 'min_storage_time_in_seconds')).to.eq(0)
expect(metricValue(T1, 'med_storage_time_in_seconds')).to.eq(0)
expect(metricValue(T1, 'max_storage_time_in_seconds')).to.eq(0)

expect(metricValue(T2, 'cold_entries')).to.eq(0)
expect(metricValue(T2, 'incoming_messages')).to.eq(totalInc[T2])
expect(metricValue(T2, 'outgoing_messages')).to.eq(totalOut[T2])
expect(metricValue(T2, 'remaining_entries')).to.eq(0)
expect(metricValue(T2, 'min_storage_time_in_seconds')).to.eq(0)
expect(metricValue(T2, 'med_storage_time_in_seconds')).to.eq(0)
expect(metricValue(T2, 'max_storage_time_in_seconds')).to.eq(0)
await expectEventually(() => {
expect(metricValue(T1, 'cold_entries')).to.eq(0)
expect(metricValue(T1, 'incoming_messages')).to.eq(totalInc[T1])
expect(metricValue(T1, 'outgoing_messages')).to.eq(totalOut[T1])
expect(metricValue(T1, 'remaining_entries')).to.eq(0)
expect(metricValue(T1, 'min_storage_time_in_seconds')).to.eq(0)
expect(metricValue(T1, 'med_storage_time_in_seconds')).to.eq(0)
expect(metricValue(T1, 'max_storage_time_in_seconds')).to.eq(0)

expect(metricValue(T2, 'cold_entries')).to.eq(0)
expect(metricValue(T2, 'incoming_messages')).to.eq(totalInc[T2])
expect(metricValue(T2, 'outgoing_messages')).to.eq(totalOut[T2])
expect(metricValue(T2, 'remaining_entries')).to.eq(0)
expect(metricValue(T2, 'min_storage_time_in_seconds')).to.eq(0)
expect(metricValue(T2, 'med_storage_time_in_seconds')).to.eq(0)
expect(metricValue(T2, 'max_storage_time_in_seconds')).to.eq(0)
})
})
})

describe('given a target service that requires retries', () => {
let currentRetryCount, unboxedService

// Fail the first 3 attempts so the 4th delivers — see metrics-outbox.test.js for rationale.
const ATTEMPTS_TO_FAIL = 3

beforeAll(async () => {
unboxedService = await cds.connect.to('ExternalServiceOne')

unboxedService.before('call', req => {
if ((currentRetryCount[cds.context.tenant] += 1) <= 2) {
if ((currentRetryCount[cds.context.tenant] += 1) <= ATTEMPTS_TO_FAIL) {
totalFailed[cds.context.tenant] += 1
return req.reject({ status: 503 })
}
Expand All @@ -123,76 +146,77 @@ describe('queue metrics for multi tenant service', () => {
test('storage time increases before message can be delivered', async () => {
if (cds.version.split('.')[0] < 9) return

const timeOfInitialCall = Date.now()
await Promise.all([
GET('/odata/v4/proxy/proxyCallToExternalServiceOne', user[T1]),
GET('/odata/v4/proxy/proxyCallToExternalServiceOne', user[T2])
])
// Reference time taken after GETs return — i.e. after both messages are persisted in the outbox.
const timeOfInitialCall = Date.now()

// Wait for the first retry to be processed
while (currentRetryCount[T1] < 2) await wait(10)
while (currentRetryCount[T2] < 2) await wait(10)

// Wait until at least 1 second has passed since the initial call
const timeAfterFirstRetry = Date.now()
if (timeAfterFirstRetry - timeOfInitialCall < 1000) {
await wait(1000 - (timeAfterFirstRetry - timeOfInitialCall))
}
await wait(150) // ... for metrics to be collected

expect(metricValue(T1, 'cold_entries')).to.eq(0)
expect(metricValue(T1, 'incoming_messages')).to.eq(totalInc[T1])
expect(metricValue(T1, 'outgoing_messages')).to.eq(totalOut[T1])
expect(metricValue(T1, 'processing_failures')).to.eq(totalFailed[T1])
expect(metricValue(T1, 'remaining_entries')).to.eq(1)
expect(metricValue(T1, 'min_storage_time_in_seconds')).to.be.gte(1)
expect(metricValue(T1, 'med_storage_time_in_seconds')).to.be.gte(1)
expect(metricValue(T1, 'max_storage_time_in_seconds')).to.be.gte(1)

expect(metricValue(T2, 'cold_entries')).to.eq(0)
expect(metricValue(T2, 'incoming_messages')).to.eq(totalInc[T2])
expect(metricValue(T2, 'outgoing_messages')).to.eq(totalOut[T2])
expect(metricValue(T2, 'processing_failures')).to.eq(totalFailed[T2])
expect(metricValue(T2, 'remaining_entries')).to.eq(1)
expect(metricValue(T2, 'min_storage_time_in_seconds')).to.be.gte(1)
expect(metricValue(T2, 'med_storage_time_in_seconds')).to.be.gte(1)
expect(metricValue(T2, 'max_storage_time_in_seconds')).to.be.gte(1)

// Wait for the second retry to be processed
while (currentRetryCount[T1] < 3) await wait(10)
while (currentRetryCount[T2] < 3) await wait(10)
await wait(600) // ... for metrics to be collected

expect(metricValue(T1, 'cold_entries')).to.eq(0)
expect(metricValue(T1, 'incoming_messages')).to.eq(totalInc[T1])
expect(metricValue(T1, 'outgoing_messages')).to.eq(totalOut[T1])
expect(metricValue(T1, 'processing_failures')).to.eq(totalFailed[T1])
expect(metricValue(T1, 'remaining_entries')).to.eq(0)
expect(metricValue(T1, 'min_storage_time_in_seconds')).to.eq(0)
expect(metricValue(T1, 'med_storage_time_in_seconds')).to.eq(0)
expect(metricValue(T1, 'max_storage_time_in_seconds')).to.eq(0)

expect(metricValue(T2, 'cold_entries')).to.eq(0)
expect(metricValue(T2, 'incoming_messages')).to.eq(totalInc[T2])
expect(metricValue(T2, 'outgoing_messages')).to.eq(totalOut[T2])
expect(metricValue(T2, 'processing_failures')).to.eq(totalFailed[T2])
expect(metricValue(T2, 'remaining_entries')).to.eq(0)
expect(metricValue(T2, 'min_storage_time_in_seconds')).to.eq(0)
expect(metricValue(T2, 'med_storage_time_in_seconds')).to.eq(0)
expect(metricValue(T2, 'max_storage_time_in_seconds')).to.eq(0)
// Wait for both tenants to make their second attempt (= first retry).
await expectEventually(() => {
expect(currentRetryCount[T1]).to.be.gte(2)
expect(currentRetryCount[T2]).to.be.gte(2)
})

// The storage_time gauges are asserted to be >= 1 below, so at least a full second must elapse
// since the messages were enqueued (the wait below targets 1500 ms) — this is the one place the
// test fundamentally depends on wall-clock time.
const elapsed = Date.now() - timeOfInitialCall
if (elapsed < 1500) await wait(1500 - elapsed)

await expectEventually(() => {
expect(metricValue(T1, 'cold_entries')).to.eq(0)
expect(metricValue(T1, 'incoming_messages')).to.eq(totalInc[T1])
expect(metricValue(T1, 'outgoing_messages')).to.eq(totalOut[T1])
expect(metricValue(T1, 'processing_failures')).to.eq(totalFailed[T1])
expect(metricValue(T1, 'remaining_entries')).to.eq(1)
expect(metricValue(T1, 'min_storage_time_in_seconds')).to.be.gte(1)
expect(metricValue(T1, 'med_storage_time_in_seconds')).to.be.gte(1)
expect(metricValue(T1, 'max_storage_time_in_seconds')).to.be.gte(1)

expect(metricValue(T2, 'cold_entries')).to.eq(0)
expect(metricValue(T2, 'incoming_messages')).to.eq(totalInc[T2])
expect(metricValue(T2, 'outgoing_messages')).to.eq(totalOut[T2])
expect(metricValue(T2, 'processing_failures')).to.eq(totalFailed[T2])
expect(metricValue(T2, 'remaining_entries')).to.eq(1)
expect(metricValue(T2, 'min_storage_time_in_seconds')).to.be.gte(1)
expect(metricValue(T2, 'med_storage_time_in_seconds')).to.be.gte(1)
expect(metricValue(T2, 'max_storage_time_in_seconds')).to.be.gte(1)
})

// Final attempt — the message is delivered and removed from the outbox.
await expectEventually(() => {
expect(currentRetryCount[T1]).to.be.gte(ATTEMPTS_TO_FAIL + 1)
expect(currentRetryCount[T2]).to.be.gte(ATTEMPTS_TO_FAIL + 1)

expect(metricValue(T1, 'cold_entries')).to.eq(0)
expect(metricValue(T1, 'incoming_messages')).to.eq(totalInc[T1])
expect(metricValue(T1, 'outgoing_messages')).to.eq(totalOut[T1])
expect(metricValue(T1, 'processing_failures')).to.eq(totalFailed[T1])
expect(metricValue(T1, 'remaining_entries')).to.eq(0)
expect(metricValue(T1, 'min_storage_time_in_seconds')).to.eq(0)
expect(metricValue(T1, 'med_storage_time_in_seconds')).to.eq(0)
expect(metricValue(T1, 'max_storage_time_in_seconds')).to.eq(0)

expect(metricValue(T2, 'cold_entries')).to.eq(0)
expect(metricValue(T2, 'incoming_messages')).to.eq(totalInc[T2])
expect(metricValue(T2, 'outgoing_messages')).to.eq(totalOut[T2])
expect(metricValue(T2, 'processing_failures')).to.eq(totalFailed[T2])
expect(metricValue(T2, 'remaining_entries')).to.eq(0)
expect(metricValue(T2, 'min_storage_time_in_seconds')).to.eq(0)
expect(metricValue(T2, 'med_storage_time_in_seconds')).to.eq(0)
expect(metricValue(T2, 'max_storage_time_in_seconds')).to.eq(0)
})
})
})

describe('given a taget service that fails unrecoverably', () => {
let unboxedService

const didProcess = { [T1]: false, [T2]: false }

beforeAll(async () => {
unboxedService = await cds.connect.to('ExternalServiceOne')

unboxedService.before('call', req => {
didProcess[cds.context.tenant] = true
totalFailed[cds.context.tenant] += 1
return req.reject({ status: 418, unrecoverable: true })
})
Expand All @@ -210,21 +234,19 @@ describe('queue metrics for multi tenant service', () => {
GET('/odata/v4/proxy/proxyCallToExternalServiceOne', user[T2])
])

while (!didProcess[T1]) await wait(10)
while (!didProcess[T2]) await wait(10)
await wait(500) // ... for metrics to be collected

expect(metricValue(T1, 'cold_entries')).to.eq(1)
expect(metricValue(T1, 'incoming_messages')).to.eq(totalInc[T1])
expect(metricValue(T1, 'outgoing_messages')).to.eq(totalOut[T1])
expect(metricValue(T1, 'processing_failures')).to.eq(totalFailed[T1])
expect(metricValue(T1, 'remaining_entries')).to.eq(0)

expect(metricValue(T2, 'cold_entries')).to.eq(1)
expect(metricValue(T2, 'incoming_messages')).to.eq(totalInc[T2])
expect(metricValue(T2, 'outgoing_messages')).to.eq(totalOut[T2])
expect(metricValue(T2, 'processing_failures')).to.eq(totalFailed[T2])
expect(metricValue(T2, 'remaining_entries')).to.eq(0)
await expectEventually(() => {
expect(metricValue(T1, 'cold_entries')).to.eq(1)
expect(metricValue(T1, 'incoming_messages')).to.eq(totalInc[T1])
expect(metricValue(T1, 'outgoing_messages')).to.eq(totalOut[T1])
expect(metricValue(T1, 'processing_failures')).to.eq(totalFailed[T1])
expect(metricValue(T1, 'remaining_entries')).to.eq(0)

expect(metricValue(T2, 'cold_entries')).to.eq(1)
expect(metricValue(T2, 'incoming_messages')).to.eq(totalInc[T2])
expect(metricValue(T2, 'outgoing_messages')).to.eq(totalOut[T2])
expect(metricValue(T2, 'processing_failures')).to.eq(totalFailed[T2])
expect(metricValue(T2, 'remaining_entries')).to.eq(0)
})
})
})
})
Loading