From 3a80cb11c2e870e8db48f1038410f0254eef04ec Mon Sep 17 00:00:00 2001 From: jlenon7 Date: Thu, 23 Apr 2026 22:41:20 -0300 Subject: [PATCH] feat: support @athenna/otel --- package-lock.json | 4 ++-- package.json | 2 +- src/events/EventImpl.ts | 19 ++++++++++++++++--- tests/unit/events/EventImplTest.ts | 30 ++++++++++++++++++++++++++++++ 4 files changed, 49 insertions(+), 6 deletions(-) diff --git a/package-lock.json b/package-lock.json index fb15bdf..123397d 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "@athenna/event", - "version": "5.5.0", + "version": "5.7.0", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "@athenna/event", - "version": "5.5.0", + "version": "5.7.0", "license": "MIT", "devDependencies": { "@athenna/artisan": "^5.12.0", diff --git a/package.json b/package.json index 01956f4..776ff1f 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@athenna/event", - "version": "5.5.0", + "version": "5.7.0", "description": "The Athenna events handler with queue store. Based on Emittery syntax.", "license": "MIT", "author": "João Lenon ", diff --git a/src/events/EventImpl.ts b/src/events/EventImpl.ts index cc11d88..5cd860b 100644 --- a/src/events/EventImpl.ts +++ b/src/events/EventImpl.ts @@ -8,10 +8,12 @@ */ import { Config } from '@athenna/config' -import { Is, Macroable } from '@athenna/common' import { Listener } from '#src/events/Listener' -import { QueueImpl, type ConnectionOptions } from '@athenna/queue' +import { Is, Macroable, Module } from '@athenna/common' import type { EventClosure, Context } from '#src/types' +import { QueueImpl, type ConnectionOptions } from '@athenna/queue' + +const otelModule = await Module.safeImport('@athenna/otel') export class EventImpl extends Macroable { /** @@ -460,12 +462,23 @@ export class EventImpl extends Macroable { emittedAt: data.emittedAt } - await record.closure(ctx) + await this.runWithOtelContext(ctx, () => record.closure(ctx)) await queue.ack(job.id) }) } + private runWithOtelContext(ctx: Context, callback: () => T): T { + if (!Config.is('event.otel.contextEnabled', true) || !otelModule) { + return callback() + } + + return otelModule.Otel.withContext(callback, { + bindings: Config.get('event.otel.contextBindings', []), + resolveBinding: binding => binding.resolve(ctx) + }) + } + private parseConnectionName(con: string) { if (con === 'default') { return Config.get('event.store') diff --git a/tests/unit/events/EventImplTest.ts b/tests/unit/events/EventImplTest.ts index 0987ccf..d885a4c 100644 --- a/tests/unit/events/EventImplTest.ts +++ b/tests/unit/events/EventImplTest.ts @@ -291,6 +291,36 @@ export class EventImplTest { assert.equal(specific, 1) } + @Test() + public async shouldRunQueuedListenersThroughOtelContextWrapper({ assert }: Context) { + const event = Event.store('memoryA') as any + const originalRunWithOtelContext = event.runWithOtelContext?.bind(event) + let wrappedCtx: any = null + let closureCtx: any = null + + event.runWithOtelContext = (ctx, callback) => { + wrappedCtx = ctx + + return callback() + } + + try { + event.on('otel:event', ctx => { + closureCtx = ctx + }) + + await event.emit('otel:event', { ok: true }) + await Sleep.for(40).milliseconds().wait() + } finally { + event.runWithOtelContext = originalRunWithOtelContext + } + + assert.isDefined(wrappedCtx) + assert.deepEqual(wrappedCtx, closureCtx) + assert.equal(wrappedCtx.event, 'otel:event') + assert.deepEqual(wrappedCtx.data, { ok: true }) + } + @Test() public async shouldBeAbleToEmitSequentially({ assert }: Context) { const event = Event.store('memoryA')