Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,9 @@ The application reads configurations from the `.env` file at the root.
| `PRICE_REFRESH_INTERVAL_SECONDS` | Refresh interval in seconds | 30 | No |
| `PRICE_STALE_THRESHOLD_MINUTES` | Stale threshold in minutes | 5 | No |
| `PRICE_ANOMALY_THRESHOLD_PCT` | Anomaly detection threshold % | 20 | No |
| `CIRCUIT_BREAKER_FAILURE_THRESHOLD` | Source failures before opening a price-source circuit | 3 | No |
| `CIRCUIT_BREAKER_SUCCESS_THRESHOLD` | Half-open successes required to close a circuit | 1 | No |
| `CIRCUIT_BREAKER_TIMEOUT_MS` | Open-circuit cool-down before a half-open probe | 30000 | No |
| `ADMIN_API_KEY` | Bootstrap admin bearer token for API key management | empty | Yes, for protected endpoints |
| `LOG_LEVEL` | Logging level: `debug`, `info`, `warn`, or `error` | info | No |

Expand Down
8 changes: 8 additions & 0 deletions src/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ const env = cleanEnv(rawEnv, {
PRICE_REFRESH_INTERVAL_SECONDS: num({ default: 30 }),
PRICE_STALE_THRESHOLD_MINUTES: num({ default: 5 }),
PRICE_ANOMALY_THRESHOLD_PCT: num({ default: 20 }),
CIRCUIT_BREAKER_FAILURE_THRESHOLD: num({ default: 3 }),
CIRCUIT_BREAKER_SUCCESS_THRESHOLD: num({ default: 1 }),
CIRCUIT_BREAKER_TIMEOUT_MS: num({ default: 30000 }),
LOG_LEVEL: str({
default: 'info',
choices: ['debug', 'info', 'warn', 'error'],
Expand Down Expand Up @@ -74,6 +77,11 @@ module.exports = {
refreshInterval: env.PRICE_REFRESH_INTERVAL_SECONDS,
staleThresholdMinutes: env.PRICE_STALE_THRESHOLD_MINUTES,
anomalyThresholdPercent: env.PRICE_ANOMALY_THRESHOLD_PCT,
circuitBreaker: {
failureThreshold: env.CIRCUIT_BREAKER_FAILURE_THRESHOLD,
successThreshold: env.CIRCUIT_BREAKER_SUCCESS_THRESHOLD,
timeoutMs: env.CIRCUIT_BREAKER_TIMEOUT_MS,
},
},
auth: {
adminApiKey: env.ADMIN_API_KEY,
Expand Down
2 changes: 2 additions & 0 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ const helmet = require('helmet');
const config = require('./config');
const logger = require('./logger');
const cache = require('./services/cache');
const priceOracle = require('./services/priceOracle');
const priceRefreshJob = require('./jobs/priceRefresh');
const webhookRetryWorker = require('./jobs/webhookRetryWorker');
const buildCorsMiddleware = require('./middleware/cors');
Expand Down Expand Up @@ -35,6 +36,7 @@ app.get('/health', (req, res) => {
timestamp: new Date().toISOString(),
redis_connected: redisConnected,
redis_unavailable: !redisConnected,
circuits: priceOracle.getCircuitStates(),
});
});

Expand Down
39 changes: 34 additions & 5 deletions src/services/priceOracle.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,27 @@ const coingecko = require('./sources/coingecko');
const coinmarketcap = require('./sources/coinmarketcap');
const config = require('../config');
const logger = require('../logger');
const { CircuitBreaker } = require('../utils/circuitBreaker');

const CACHE_PREFIX = 'price:';
const HISTORY_PREFIX = 'price:history:';
const breakerOptions = config.price.circuitBreaker;
const SOURCES = [
{ name: 'stellar_dex', fetch: stellarDex.fetchPrice },
{ name: 'coingecko', fetch: coingecko.fetchPrice },
{ name: 'coinmarketcap', fetch: coinmarketcap.fetchPrice },
{
name: 'stellar_dex',
fetch: stellarDex.fetchPrice,
breaker: new CircuitBreaker('stellar_dex', breakerOptions),
},
{
name: 'coingecko',
fetch: coingecko.fetchPrice,
breaker: new CircuitBreaker('coingecko', breakerOptions),
},
{
name: 'coinmarketcap',
fetch: coinmarketcap.fetchPrice,
breaker: new CircuitBreaker('coinmarketcap', breakerOptions),
},
];

function median(values) {
Expand Down Expand Up @@ -79,7 +93,7 @@ async function fetchFromAllSources(assetCode, issuer) {

for (const source of SOURCES) {
try {
const price = await source.fetch(assetCode, issuer);
const price = await source.breaker.call(() => source.fetch(assetCode, issuer));
if (price !== null && price > 0) {
results.push({ source: source.name, price });
}
Expand All @@ -91,6 +105,19 @@ async function fetchFromAllSources(assetCode, issuer) {
return results;
}

function getCircuitStates() {
return SOURCES.reduce((states, source) => {
states[source.name] = source.breaker.getState();
return states;
}, {});
}

function resetCircuitBreakers() {
for (const source of SOURCES) {
source.breaker.reset();
}
}

async function getPrice(assetCode, issuer = null) {
const cacheKey = buildCacheKey(assetCode, issuer);
let redisUnavailable = false;
Expand Down Expand Up @@ -126,7 +153,7 @@ async function getPrice(assetCode, issuer = null) {

async function fetchFreshPrice(assetCode, issuer = null, redisUnavailable = false) {
const sourceResults = await fetchFromAllSources(assetCode, issuer);
const sourcesAttempted = sourceResults.map((r) => r.name);
const sourcesAttempted = sourceResults.map((r) => r.source);
const prices = sourceResults.map((r) => r.price);

const aggregatedPrice = median(prices);
Expand Down Expand Up @@ -235,5 +262,7 @@ async function refreshAllCachedPrices() {
module.exports = {
getPrice,
fetchFreshPrice,
getCircuitStates,
resetCircuitBreakers,
refreshAllCachedPrices,
};
143 changes: 143 additions & 0 deletions src/utils/circuitBreaker.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
'use strict';

const logger = require('../logger');

const STATES = Object.freeze({
CLOSED: 'closed',
OPEN: 'open',
HALF_OPEN: 'half-open',
});

class CircuitBreaker {
constructor(name, options = {}) {
this.name = name;
this.failureThreshold = Math.max(1, options.failureThreshold ?? 3);
this.successThreshold = Math.max(1, options.successThreshold ?? 1);
this.timeoutMs = Math.max(1, options.timeoutMs ?? 30000);
this._now = options.now || Date.now;
this._logger = options.logger || logger;

this.state = STATES.CLOSED;
this.failureCount = 0;
this.successCount = 0;
this.openedAt = null;
this.halfOpenInFlight = false;
}

getState() {
this._moveToHalfOpenIfReady();
return this.state;
}

isOpen() {
return this.getState() === STATES.OPEN;
}

async call(fn) {
this._moveToHalfOpenIfReady();

if (this.state === STATES.OPEN) {
this._logger.info('Circuit breaker open, skipping source call', {
source: this.name,
state: this.state,
});
return null;
}

if (this.state === STATES.HALF_OPEN && this.halfOpenInFlight) {
this._logger.info('Circuit breaker half-open probe already in flight, skipping source call', {
source: this.name,
state: this.state,
});
return null;
}

const probing = this.state === STATES.HALF_OPEN;
if (probing) {
this.halfOpenInFlight = true;
}

try {
const result = await fn();
if (result === null || result === undefined) {
this.recordFailure();
} else {
this.recordSuccess();
}
return result ?? null;
} catch (err) {
this.recordFailure();
throw err;
} finally {
if (probing) {
this.halfOpenInFlight = false;
}
}
}

recordSuccess() {
if (this.state === STATES.HALF_OPEN) {
this.successCount += 1;
if (this.successCount >= this.successThreshold) {
this._transitionTo(STATES.CLOSED, { reason: 'success-threshold' });
}
return;
}

if (this.state === STATES.CLOSED) {
this.failureCount = 0;
}
}

recordFailure() {
if (this.state === STATES.HALF_OPEN) {
this._transitionTo(STATES.OPEN, { reason: 'half-open-failure' });
return;
}

if (this.state === STATES.CLOSED) {
this.failureCount += 1;
if (this.failureCount >= this.failureThreshold) {
this._transitionTo(STATES.OPEN, { reason: 'failure-threshold' });
}
}
}

reset() {
this._transitionTo(STATES.CLOSED, { reason: 'manual-reset' });
}

_moveToHalfOpenIfReady() {
if (this.state !== STATES.OPEN || this.openedAt === null) {
return;
}

if (this._now() - this.openedAt >= this.timeoutMs) {
this._transitionTo(STATES.HALF_OPEN, { reason: 'cooldown-elapsed' });
}
}

_transitionTo(nextState, metadata = {}) {
if (this.state === nextState) {
return;
}

const previousState = this.state;
this.state = nextState;
this.failureCount = 0;
this.successCount = 0;
this.openedAt = nextState === STATES.OPEN ? this._now() : null;

this._logger.info('Circuit breaker state changed', {
source: this.name,
from: previousState,
to: nextState,
...metadata,
});
}
}

module.exports = {
CircuitBreaker,
STATES,
};
89 changes: 89 additions & 0 deletions test/circuitBreaker.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
'use strict';

const { CircuitBreaker, STATES } = require('../src/utils/circuitBreaker');

function buildBreaker(options = {}) {
let now = 1000;
const logger = {
info: jest.fn(),
};

const breaker = new CircuitBreaker('coingecko', {
failureThreshold: 2,
successThreshold: 1,
timeoutMs: 100,
now: () => now,
logger,
...options,
});

return {
breaker,
logger,
advance(ms) {
now += ms;
},
};
}

describe('CircuitBreaker', () => {
test('opens after repeated failures and skips calls while cooling down', async () => {
const { breaker, logger } = buildBreaker();

await expect(breaker.call(async () => null)).resolves.toBeNull();
await expect(breaker.call(async () => null)).resolves.toBeNull();

expect(breaker.getState()).toBe(STATES.OPEN);

const sourceFetch = jest.fn(async () => 0.12);
await expect(breaker.call(sourceFetch)).resolves.toBeNull();

expect(sourceFetch).not.toHaveBeenCalled();
expect(logger.info).toHaveBeenCalledWith(
'Circuit breaker state changed',
expect.objectContaining({
source: 'coingecko',
from: STATES.CLOSED,
to: STATES.OPEN,
reason: 'failure-threshold',
})
);
});

test('moves to half-open after cooldown and closes on a successful probe', async () => {
const { breaker, advance } = buildBreaker();

await breaker.call(async () => null);
await breaker.call(async () => null);

advance(100);
expect(breaker.getState()).toBe(STATES.HALF_OPEN);

await expect(breaker.call(async () => 0.12)).resolves.toBe(0.12);

expect(breaker.getState()).toBe(STATES.CLOSED);
});

test('reopens when the half-open probe fails', async () => {
const { breaker, advance } = buildBreaker();

await breaker.call(async () => null);
await breaker.call(async () => null);

advance(100);
await expect(breaker.call(async () => null)).resolves.toBeNull();

expect(breaker.getState()).toBe(STATES.OPEN);
});

test('records thrown source errors as failures and rethrows them', async () => {
const { breaker } = buildBreaker();
const error = new Error('rate limited');

await expect(breaker.call(async () => {
throw error;
})).rejects.toThrow('rate limited');

expect(breaker.getState()).toBe(STATES.CLOSED);
});
});
5 changes: 5 additions & 0 deletions test/config.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,11 @@ describe('configuration validation', () => {
refreshInterval: 30,
staleThresholdMinutes: 5,
anomalyThresholdPercent: 20,
circuitBreaker: {
failureThreshold: 3,
successThreshold: 1,
timeoutMs: 30000,
},
},
});
});
Expand Down
Loading