From 61aa2f3aa7bb12251703c6a1e20e38cf25589f0e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C3=A9phane=20Manciot?= Date: Mon, 15 Jun 2026 15:38:05 +0200 Subject: [PATCH 1/2] =?UTF-8?q?feat(audit):=20close=20the=20scheduler?= =?UTF-8?q?=E2=86=92notification=20correlation=20round-trip=20(Story=2013.?= =?UTF-8?q?7=20gate=20#3)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Carry a single correlation id from the originating schedule through to the notification's terminal audit line, so one id links schedule_fired to notification_sent across the round-trip. - NotificationCommand extends AuditableCommand; ScheduleNotification is now a case class (carries a per-instance cid) and TriggerSchedule4Notification propagates it to the self-sent ScheduleNotification. - Scheduler2NotificationProcessorStream derives the cid from schedule.correlationId (deterministic fallback) and emits schedule_fired / schedule_not_fired / schedule_failed (actor=scheduler). - NotificationBehavior threads the cid (notification.correlationId orElse cmd.correlationId) through sendNotification and stamps it on the notification before send, so notification_sent/_failed carry it; created schedules carry it too (AddNotification / ResendNotification / ScheduleNotification). - NotificationAuditLog promoted to a proper object exposing the shared AuditLog("notification"). - testkit: round-trip assertions (schedule.correlationId == cid) + logback.xml. --- .../notification/message/package.scala | 8 +- .../audit/NotificationAuditLog.scala | 12 +++ ...cheduler2NotificationProcessorStream.scala | 31 +++++++- .../typed/NotificationBehavior.scala | 51 +++++++++---- testkit/src/test/resources/logback.xml | 76 +++++++++++++++++++ .../SimpleMailNotificationsHandlerSpec.scala | 59 ++++++++++---- 6 files changed, 202 insertions(+), 35 deletions(-) create mode 100644 core/src/main/scala/app/softnetwork/notification/audit/NotificationAuditLog.scala create mode 100644 testkit/src/test/resources/logback.xml diff --git a/common/src/main/scala/app/softnetwork/notification/message/package.scala b/common/src/main/scala/app/softnetwork/notification/message/package.scala index 0f4c06e..b94079f 100644 --- a/common/src/main/scala/app/softnetwork/notification/message/package.scala +++ b/common/src/main/scala/app/softnetwork/notification/message/package.scala @@ -9,10 +9,10 @@ import app.softnetwork.notification.model.NotificationStatusResult */ package object message { - sealed trait NotificationCommand extends EntityCommand + sealed trait NotificationCommand extends EntityCommand with AuditableCommand @SerialVersionUID(0L) - case object ScheduleNotification extends NotificationCommand with AllEntities + case class ScheduleNotification() extends NotificationCommand with AllEntities @SerialVersionUID(0L) case class AddNotification[T <: Notification](notification: T) extends NotificationCommand { @@ -33,7 +33,9 @@ package object message { @SerialVersionUID(0L) case class GetNotificationStatus(id: String) extends NotificationCommand - case class TriggerSchedule4Notification(schedule: Schedule) extends NotificationCommand { + case class TriggerSchedule4Notification(schedule: Schedule) + extends NotificationCommand + with AuditableCommand { override val id: String = schedule.entityId } diff --git a/core/src/main/scala/app/softnetwork/notification/audit/NotificationAuditLog.scala b/core/src/main/scala/app/softnetwork/notification/audit/NotificationAuditLog.scala new file mode 100644 index 0000000..84bcf63 --- /dev/null +++ b/core/src/main/scala/app/softnetwork/notification/audit/NotificationAuditLog.scala @@ -0,0 +1,12 @@ +package app.softnetwork.notification.audit + +import app.softnetwork.persistence.audit.AuditLog + +object NotificationAuditLog { + + /** Story 13.7 — structured audit trail. service = "notification"; correlationId is threaded as + * data on the notification (proto field, exposed on the Notification trait), never via MDC. + */ + private[notification] lazy val audit: AuditLog = AuditLog("notification") + +} diff --git a/core/src/main/scala/app/softnetwork/notification/persistence/query/Scheduler2NotificationProcessorStream.scala b/core/src/main/scala/app/softnetwork/notification/persistence/query/Scheduler2NotificationProcessorStream.scala index 213fd71..f54224d 100644 --- a/core/src/main/scala/app/softnetwork/notification/persistence/query/Scheduler2NotificationProcessorStream.scala +++ b/core/src/main/scala/app/softnetwork/notification/persistence/query/Scheduler2NotificationProcessorStream.scala @@ -4,6 +4,7 @@ import akka.actor.typed.eventstream.EventStream.Publish import app.softnetwork.scheduler.model.Schedule import app.softnetwork.persistence.query.{JournalProvider, OffsetProvider} import app.softnetwork.scheduler.persistence.query.Scheduler2EntityProcessorStream +import app.softnetwork.notification.audit.NotificationAuditLog._ import app.softnetwork.notification.handlers.NotificationHandler import app.softnetwork.notification.message._ @@ -16,13 +17,39 @@ trait Scheduler2NotificationProcessorStream _: JournalProvider with OffsetProvider with NotificationHandler => override protected def triggerSchedule(schedule: Schedule): Future[Boolean] = { - !?(TriggerSchedule4Notification(schedule)) map { + val cid = schedule.correlationId.getOrElse( + s"schedule#${schedule.persistenceId}#${schedule.entityId}#${schedule.key}" + ) + val cmd = TriggerSchedule4Notification(schedule) + cmd.withCorrelationId(cid) + !?(cmd) map { case result: Schedule4NotificationTriggered => + audit.event( + cid, + "schedule_fired", + "actor" -> "scheduler", + "schedule_key" -> schedule.key + ) if (forTests) { system.eventStream.tell(Publish(result)) } true - case _ => false + case Schedule4NotificationNotTriggered => + audit.event( + cid, + "schedule_not_fired", + "actor" -> "scheduler", + "schedule_key" -> schedule.key + ) + false + case _ => + audit.event( + cid, + "schedule_failed", + "actor" -> "scheduler", + "schedule_key" -> schedule.key + ) + false } } } diff --git a/core/src/main/scala/app/softnetwork/notification/persistence/typed/NotificationBehavior.scala b/core/src/main/scala/app/softnetwork/notification/persistence/typed/NotificationBehavior.scala index 489a8c8..6c268e9 100644 --- a/core/src/main/scala/app/softnetwork/notification/persistence/typed/NotificationBehavior.scala +++ b/core/src/main/scala/app/softnetwork/notification/persistence/typed/NotificationBehavior.scala @@ -3,6 +3,7 @@ package app.softnetwork.notification.persistence.typed import akka.actor.typed.scaladsl.{ActorContext, TimerScheduler} import akka.actor.typed.{ActorRef, ActorSystem} import akka.persistence.typed.scaladsl.Effect +import app.softnetwork.notification.audit.NotificationAuditLog._ import app.softnetwork.notification.config.NotificationSettings import org.slf4j.Logger import app.softnetwork.scheduler.message.SchedulerEvents.{ @@ -12,7 +13,6 @@ import app.softnetwork.scheduler.message.SchedulerEvents.{ } import app.softnetwork.scheduler.message.{AddSchedule, RemoveSchedule} import app.softnetwork.scheduler.model.Schedule -import app.softnetwork.persistence.audit.AuditLog import app.softnetwork.persistence.now import app.softnetwork.persistence.typed._ import app.softnetwork.notification.message._ @@ -46,11 +46,6 @@ trait NotificationBehavior[T <: Notification] private[this] val delay = 1 - /** Story 13.7 — structured audit trail. service = "notification"; correlationId is threaded as - * data on the notification (proto field, exposed on the Notification trait), never via MDC. - */ - private[this] lazy val audit: AuditLog = AuditLog("notification") - /** Set event tags, which will be used in persistence query * * @param entityId @@ -120,7 +115,8 @@ trait NotificationBehavior[T <: Notification] delay, Some(true), Some(now()), - None + None, + correlationId = notification.correlationId.orElse(cmd.correlationId) ) ) ) @@ -151,12 +147,24 @@ trait NotificationBehavior[T <: Notification] ) .thenRun(_ => { NotificationRemoved ~> replyTo }) //.thenStop() - case cmd: SendNotification[T] => sendNotification(entityId, cmd.notification, replyTo) + case cmd: SendNotification[T] => + sendNotification( + entityId, + cmd.notification, + cmd.notification.correlationId.orElse(cmd.correlationId), + replyTo + ) - case _: ResendNotification => + case cmd: ResendNotification => state match { - case Some(notification) => sendNotification(entityId, notification, replyTo) - case _ => Effect.none.thenRun(_ => NotificationNotFound ~> replyTo) + case Some(notification) => + sendNotification( + entityId, + notification, + notification.correlationId.orElse(cmd.correlationId), + replyTo + ) + case _ => Effect.none.thenRun(_ => NotificationNotFound ~> replyTo) } case _: GetNotificationStatus => @@ -188,15 +196,23 @@ trait NotificationBehavior[T <: Notification] case cmd: TriggerSchedule4Notification => import cmd.schedule._ if (key == notificationTimerKey) { - context.self ! ScheduleNotification + val trigger = ScheduleNotification() + correlationId.foreach(trigger.withCorrelationId) + context.self ! trigger Effect.none.thenRun(_ => Schedule4NotificationTriggered(cmd.schedule) ~> replyTo) } else { Effect.none.thenRun(_ => Schedule4NotificationNotTriggered ~> replyTo) } - case ScheduleNotification => + case cmd: ScheduleNotification => state match { - case Some(notification) => sendNotification(entityId, notification, replyTo) + case Some(notification) => + sendNotification( + entityId, + notification, + notification.correlationId.orElse(cmd.correlationId), + replyTo + ) case _ => // should never be the case Effect .persist( @@ -279,7 +295,8 @@ trait NotificationBehavior[T <: Notification] delay, Some(true), Some(now()), - None + None, + correlationId = correlationId ) ) ) @@ -361,6 +378,7 @@ trait NotificationBehavior[T <: Notification] private[this] def sendNotification( entityId: String, notification: T, + cid: Option[String], replyTo: Option[ActorRef[NotificationCommandResult]] )(implicit log: Logger, @@ -419,7 +437,8 @@ trait NotificationBehavior[T <: Notification] notification .withNbTries(nbTries) .copyWithAck(notificationAck) - case _ => notification + .withCorrelationId(cid.getOrElse("-")) + case _ => notification.withCorrelationId(cid.getOrElse("-")) } val event: Option[NotificationRecordedEvent] = diff --git a/testkit/src/test/resources/logback.xml b/testkit/src/test/resources/logback.xml new file mode 100644 index 0000000..53e0bde --- /dev/null +++ b/testkit/src/test/resources/logback.xml @@ -0,0 +1,76 @@ + + + + + notification.log + + notification_%d{yyyy-MM-dd}.log + + + [%date{ISO8601}] [%level] [%logger] [%marker] [%thread] - %msg MDC: {%mdc}%n + + + + + 8192 + true + + + + + + + %date{MM/dd HH:mm:ss} %-5level[%thread] %logger{1} - %msg%n + + + + {# Dedicated audit appender — SYNCHRONOUS, never wrapped by ASYNC/neverBlock (C4): audit lines + must never be silently dropped under backpressure. PVC-backed file, ~1y of daily rolls; Loki + holds the authoritative 1y via the promtail sidecar that tails this file → {stream="audit"}. #} + + audit.log + + audit-%d{yyyy-MM-dd}.log + 7 + {# Bound total on-disk audit size so a burst can't fill the log PVC. The appender is + synchronous/non-dropping, so a full disk would otherwise stall the emitting thread. #} + 1GB + + + {# Do NOT ServiceLoader-scan the classpath for Jackson modules. Under JDK 11+ the + transitive jackson-module-jaxb-annotations (from rapidoid/dumbster) would try to load + javax.xml.bind.annotation.XmlElement — removed from the JDK in Java 11 — and blow up + with NoClassDefFoundError. The encoder registers the modules it actually needs itself. #} + false + false + + (?i)[a-z0-9._%+-]+@[a-z0-9.-]+\.[a-z]{2,}***@*** + (?i)\b(?:sk|pk|whsec|rk)_[a-z0-9_]+\b*** + (?i)bearer\s+[a-z0-9._/+=-]+Bearer *** + + + + + {# additivity=false so audit lines do NOT also propagate to root (no duplicate in app.log). + Audit routes ONLY to AUDIT_FILE; the sidecar tails it for {stream="audit"}. No STDOUT copy — + that would be re-shipped to {stream="app"} by the node promtail, double-ingesting every audit + line and polluting the operational stream. #} + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/testkit/src/test/scala/app/softnetwork/notification/handlers/SimpleMailNotificationsHandlerSpec.scala b/testkit/src/test/scala/app/softnetwork/notification/handlers/SimpleMailNotificationsHandlerSpec.scala index c63be10..5f6b7ef 100644 --- a/testkit/src/test/scala/app/softnetwork/notification/handlers/SimpleMailNotificationsHandlerSpec.scala +++ b/testkit/src/test/scala/app/softnetwork/notification/handlers/SimpleMailNotificationsHandlerSpec.scala @@ -35,11 +35,16 @@ class SimpleMailNotificationsHandlerSpec "add notification" in { val uuid = "add" - this ? (uuid, AddNotification(generateMail(uuid))) await { + val cid = s"cid-$uuid" + this ? (uuid, AddNotification(generateMail(uuid).withCorrelationId(cid))) await { case n: NotificationAdded => n.uuid shouldBe uuid + val message = probe.receiveMessage() assert( - probe.receiveMessage().schedule.uuid == s"MailNotification#$uuid#NotificationTimerKey" + message.schedule.uuid == s"MailNotification#$uuid#NotificationTimerKey" + ) + assert( + message.schedule.correlationId.getOrElse("") == cid ) case _ => fail() } @@ -47,11 +52,18 @@ class SimpleMailNotificationsHandlerSpec "add notification with attachment(s)" in { val uuid = "addWithAttachments" - this ? (uuid, AddNotification(generateMail(uuid, Seq(attachment)))) await { + val cid = s"cid-$uuid" + this ? (uuid, AddNotification( + generateMail(uuid, Seq(attachment)).withCorrelationId(cid) + )) await { case n: NotificationAdded => n.uuid shouldBe uuid + val message = probe.receiveMessage() + assert( + message.schedule.uuid == s"MailNotification#$uuid#NotificationTimerKey" + ) assert( - probe.receiveMessage().schedule.uuid == s"MailNotification#$uuid#NotificationTimerKey" + message.schedule.correlationId.getOrElse("") == cid ) case _ => fail() } @@ -59,11 +71,16 @@ class SimpleMailNotificationsHandlerSpec "remove notification" in { val uuid = "remove" - this ? (uuid, AddNotification(generateMail(uuid))) await { + val cid = s"cid-$uuid" + this ? (uuid, AddNotification(generateMail(uuid).withCorrelationId(cid))) await { case n: NotificationAdded => n.uuid shouldBe uuid + val message = probe.receiveMessage() + assert( + message.schedule.uuid == s"MailNotification#$uuid#NotificationTimerKey" + ) assert( - probe.receiveMessage().schedule.uuid == s"MailNotification#$uuid#NotificationTimerKey" + message.schedule.correlationId.getOrElse("") == cid ) this ? (uuid, RemoveNotification(uuid)) await { case _: NotificationRemoved.type => succeed @@ -75,7 +92,8 @@ class SimpleMailNotificationsHandlerSpec "send notification" in { val uuid = "send" - this ? (uuid, SendNotification(generateMail(uuid))) await { + val cid = s"cid-$uuid" + this ? (uuid, SendNotification(generateMail(uuid).withCorrelationId(cid))) await { case n: NotificationSent => assert(n.uuid == uuid) case _ => fail() @@ -84,7 +102,10 @@ class SimpleMailNotificationsHandlerSpec "send notification with attachment(s)" in { val uuid = "sendWithAttachments" - this ? (uuid, SendNotification(generateMail(uuid, Seq(attachment)))) await { + val cid = s"cid-$uuid" + this ? (uuid, SendNotification( + generateMail(uuid, Seq(attachment)).withCorrelationId(cid) + )) await { case n: NotificationSent => n.uuid shouldBe uuid case _ => fail() @@ -93,7 +114,8 @@ class SimpleMailNotificationsHandlerSpec "resend notification" in { val uuid = "resend" - this ? (uuid, SendNotification(generateMail(uuid))) await { + val cid = s"cid-$uuid" + this ? (uuid, SendNotification(generateMail(uuid).withCorrelationId(cid))) await { case n: NotificationSent => n.uuid shouldBe uuid this ? (uuid, ResendNotification(uuid)) await { @@ -111,7 +133,8 @@ class SimpleMailNotificationsHandlerSpec "retrieve notification status" in { val uuid = "status" - this ? (uuid, SendNotification(generateMail(uuid))) await { + val cid = s"cid-$uuid" + this ? (uuid, SendNotification(generateMail(uuid).withCorrelationId(cid))) await { case n: NotificationSent => n.uuid shouldBe uuid this ? (uuid, GetNotificationStatus(uuid)) await { @@ -125,7 +148,8 @@ class SimpleMailNotificationsHandlerSpec "trigger notification" in { val uuid = "trigger" - this ? (uuid, SendNotification(generateMail(uuid))) await { + val cid = s"cid-$uuid" + this ? (uuid, SendNotification(generateMail(uuid).withCorrelationId(cid))) await { case n: NotificationSent => n.uuid shouldBe uuid this ? (uuid, GetNotificationStatus(uuid)) await { @@ -133,7 +157,7 @@ class SimpleMailNotificationsHandlerSpec assert(n.uuid == uuid) succeed case _ => - probe.expectMessageType[Schedule4NotificationTriggered] + assert(probe.receiveMessage().schedule.correlationId.contains(uuid)) succeed } case _ => fail() @@ -142,8 +166,15 @@ class SimpleMailNotificationsHandlerSpec "add mail" in { val uuid = "mail" - assert(client.addMail(generateMail(uuid)).complete()) - assert(probe.receiveMessage().schedule.uuid == s"MailNotification#$uuid#NotificationTimerKey") + val cid = s"cid-$uuid" + assert(client.addMail(generateMail(uuid).withCorrelationId(cid)).complete()) + val message = probe.receiveMessage() + assert( + message.schedule.uuid == s"MailNotification#$uuid#NotificationTimerKey" + ) + assert( + message.schedule.correlationId.getOrElse("") == cid + ) } "emit a notification_sent audit line carrying the correlation id (Story 13.7)" in { From 7bf2282d3890c6e7b1d35590e6aba3f32d4f2c97 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C3=A9phane=20Manciot?= Date: Tue, 16 Jun 2026 07:53:36 +0200 Subject: [PATCH 2/2] fix(audit): bind NotificationCommand type T + propagate schedule cid (Story 13.7 Phase B) - NotificationCommand overrides `type T = NotificationCommand` (sound withCorrelationId cast). - TriggerSchedule4Notification: cmd.schedule.correlationId.orElse(cmd.correlationId) so the synthetic stream cid is not dropped by the import-shadowing. - document that the notification_sent/notification_failed 'template' field is N/A at the notification-pod layer. Co-Authored-By: Claude Opus 4.8 (1M context) --- .../softnetwork/notification/message/package.scala | 4 +++- .../persistence/typed/NotificationBehavior.scala | 11 ++++++++++- 2 files changed, 13 insertions(+), 2 deletions(-) diff --git a/common/src/main/scala/app/softnetwork/notification/message/package.scala b/common/src/main/scala/app/softnetwork/notification/message/package.scala index b94079f..b6338e5 100644 --- a/common/src/main/scala/app/softnetwork/notification/message/package.scala +++ b/common/src/main/scala/app/softnetwork/notification/message/package.scala @@ -9,7 +9,9 @@ import app.softnetwork.notification.model.NotificationStatusResult */ package object message { - sealed trait NotificationCommand extends EntityCommand with AuditableCommand + sealed trait NotificationCommand extends EntityCommand with AuditableCommand { + override type T = NotificationCommand + } @SerialVersionUID(0L) case class ScheduleNotification() extends NotificationCommand with AllEntities diff --git a/core/src/main/scala/app/softnetwork/notification/persistence/typed/NotificationBehavior.scala b/core/src/main/scala/app/softnetwork/notification/persistence/typed/NotificationBehavior.scala index 6c268e9..83686c6 100644 --- a/core/src/main/scala/app/softnetwork/notification/persistence/typed/NotificationBehavior.scala +++ b/core/src/main/scala/app/softnetwork/notification/persistence/typed/NotificationBehavior.scala @@ -197,7 +197,10 @@ trait NotificationBehavior[T <: Notification] import cmd.schedule._ if (key == notificationTimerKey) { val trigger = ScheduleNotification() - correlationId.foreach(trigger.withCorrelationId) + // `import cmd.schedule._` shadows the bare `correlationId` with the schedule's; fall back + // to the synthetic cid stamped on the command by Scheduler2NotificationProcessorStream so + // it is not dropped (qualify cmd.correlationId explicitly to avoid the shadowing). + cmd.schedule.correlationId.orElse(cmd.correlationId).foreach(trigger.withCorrelationId) context.self ! trigger Effect.none.thenRun(_ => Schedule4NotificationTriggered(cmd.schedule) ~> replyTo) } else { @@ -502,6 +505,12 @@ trait NotificationBehavior[T <: Notification] if (maybeAckWithNumberOfRetries.isDefined) { val cid = updatedNotification.correlationId.getOrElse("-") val channel = updatedNotification.`type`.name + // Story 13.7 AC3 — the audit field catalog lists a `template` field for these lines, but a + // generic notification carries no template identifier at this layer: the Notification/Mail + // model (subject/message/richMessage) has no template/mustache field, and the producer + // (license-server) consumes the mustache name during rendering and never propagates it onto + // the Mail. The template attribution is therefore owned by the producer's own + // `notification_enqueued` audit line; `template` is N/A here, so only `channel` is emitted. updatedNotification.status match { case Sent | Delivered => audit.event(cid, "notification_sent", "channel" -> channel)