diff --git a/common/src/main/scala/app/softnetwork/notification/message/package.scala b/common/src/main/scala/app/softnetwork/notification/message/package.scala index 0f4c06e..b6338e5 100644 --- a/common/src/main/scala/app/softnetwork/notification/message/package.scala +++ b/common/src/main/scala/app/softnetwork/notification/message/package.scala @@ -9,10 +9,12 @@ import app.softnetwork.notification.model.NotificationStatusResult */ package object message { - sealed trait NotificationCommand extends EntityCommand + sealed trait NotificationCommand extends EntityCommand with AuditableCommand { + override type T = NotificationCommand + } @SerialVersionUID(0L) - case object ScheduleNotification extends NotificationCommand with AllEntities + case class ScheduleNotification() extends NotificationCommand with AllEntities @SerialVersionUID(0L) case class AddNotification[T <: Notification](notification: T) extends NotificationCommand { @@ -33,7 +35,9 @@ package object message { @SerialVersionUID(0L) case class GetNotificationStatus(id: String) extends NotificationCommand - case class TriggerSchedule4Notification(schedule: Schedule) extends NotificationCommand { + case class TriggerSchedule4Notification(schedule: Schedule) + extends NotificationCommand + with AuditableCommand { override val id: String = schedule.entityId } diff --git a/core/src/main/scala/app/softnetwork/notification/audit/NotificationAuditLog.scala b/core/src/main/scala/app/softnetwork/notification/audit/NotificationAuditLog.scala new file mode 100644 index 0000000..84bcf63 --- /dev/null +++ b/core/src/main/scala/app/softnetwork/notification/audit/NotificationAuditLog.scala @@ -0,0 +1,12 @@ +package app.softnetwork.notification.audit + +import app.softnetwork.persistence.audit.AuditLog + +object NotificationAuditLog { + + /** Story 13.7 — structured audit trail. service = "notification"; correlationId is threaded as + * data on the notification (proto field, exposed on the Notification trait), never via MDC. + */ + private[notification] lazy val audit: AuditLog = AuditLog("notification") + +} diff --git a/core/src/main/scala/app/softnetwork/notification/persistence/query/Scheduler2NotificationProcessorStream.scala b/core/src/main/scala/app/softnetwork/notification/persistence/query/Scheduler2NotificationProcessorStream.scala index 213fd71..f54224d 100644 --- a/core/src/main/scala/app/softnetwork/notification/persistence/query/Scheduler2NotificationProcessorStream.scala +++ b/core/src/main/scala/app/softnetwork/notification/persistence/query/Scheduler2NotificationProcessorStream.scala @@ -4,6 +4,7 @@ import akka.actor.typed.eventstream.EventStream.Publish import app.softnetwork.scheduler.model.Schedule import app.softnetwork.persistence.query.{JournalProvider, OffsetProvider} import app.softnetwork.scheduler.persistence.query.Scheduler2EntityProcessorStream +import app.softnetwork.notification.audit.NotificationAuditLog._ import app.softnetwork.notification.handlers.NotificationHandler import app.softnetwork.notification.message._ @@ -16,13 +17,39 @@ trait Scheduler2NotificationProcessorStream _: JournalProvider with OffsetProvider with NotificationHandler => override protected def triggerSchedule(schedule: Schedule): Future[Boolean] = { - !?(TriggerSchedule4Notification(schedule)) map { + val cid = schedule.correlationId.getOrElse( + s"schedule#${schedule.persistenceId}#${schedule.entityId}#${schedule.key}" + ) + val cmd = TriggerSchedule4Notification(schedule) + cmd.withCorrelationId(cid) + !?(cmd) map { case result: Schedule4NotificationTriggered => + audit.event( + cid, + "schedule_fired", + "actor" -> "scheduler", + "schedule_key" -> schedule.key + ) if (forTests) { system.eventStream.tell(Publish(result)) } true - case _ => false + case Schedule4NotificationNotTriggered => + audit.event( + cid, + "schedule_not_fired", + "actor" -> "scheduler", + "schedule_key" -> schedule.key + ) + false + case _ => + audit.event( + cid, + "schedule_failed", + "actor" -> "scheduler", + "schedule_key" -> schedule.key + ) + false } } } diff --git a/core/src/main/scala/app/softnetwork/notification/persistence/typed/NotificationBehavior.scala b/core/src/main/scala/app/softnetwork/notification/persistence/typed/NotificationBehavior.scala index 489a8c8..83686c6 100644 --- a/core/src/main/scala/app/softnetwork/notification/persistence/typed/NotificationBehavior.scala +++ b/core/src/main/scala/app/softnetwork/notification/persistence/typed/NotificationBehavior.scala @@ -3,6 +3,7 @@ package app.softnetwork.notification.persistence.typed import akka.actor.typed.scaladsl.{ActorContext, TimerScheduler} import akka.actor.typed.{ActorRef, ActorSystem} import akka.persistence.typed.scaladsl.Effect +import app.softnetwork.notification.audit.NotificationAuditLog._ import app.softnetwork.notification.config.NotificationSettings import org.slf4j.Logger import app.softnetwork.scheduler.message.SchedulerEvents.{ @@ -12,7 +13,6 @@ import app.softnetwork.scheduler.message.SchedulerEvents.{ } import app.softnetwork.scheduler.message.{AddSchedule, RemoveSchedule} import app.softnetwork.scheduler.model.Schedule -import app.softnetwork.persistence.audit.AuditLog import app.softnetwork.persistence.now import app.softnetwork.persistence.typed._ import app.softnetwork.notification.message._ @@ -46,11 +46,6 @@ trait NotificationBehavior[T <: Notification] private[this] val delay = 1 - /** Story 13.7 — structured audit trail. service = "notification"; correlationId is threaded as - * data on the notification (proto field, exposed on the Notification trait), never via MDC. - */ - private[this] lazy val audit: AuditLog = AuditLog("notification") - /** Set event tags, which will be used in persistence query * * @param entityId @@ -120,7 +115,8 @@ trait NotificationBehavior[T <: Notification] delay, Some(true), Some(now()), - None + None, + correlationId = notification.correlationId.orElse(cmd.correlationId) ) ) ) @@ -151,12 +147,24 @@ trait NotificationBehavior[T <: Notification] ) .thenRun(_ => { NotificationRemoved ~> replyTo }) //.thenStop() - case cmd: SendNotification[T] => sendNotification(entityId, cmd.notification, replyTo) + case cmd: SendNotification[T] => + sendNotification( + entityId, + cmd.notification, + cmd.notification.correlationId.orElse(cmd.correlationId), + replyTo + ) - case _: ResendNotification => + case cmd: ResendNotification => state match { - case Some(notification) => sendNotification(entityId, notification, replyTo) - case _ => Effect.none.thenRun(_ => NotificationNotFound ~> replyTo) + case Some(notification) => + sendNotification( + entityId, + notification, + notification.correlationId.orElse(cmd.correlationId), + replyTo + ) + case _ => Effect.none.thenRun(_ => NotificationNotFound ~> replyTo) } case _: GetNotificationStatus => @@ -188,15 +196,26 @@ trait NotificationBehavior[T <: Notification] case cmd: TriggerSchedule4Notification => import cmd.schedule._ if (key == notificationTimerKey) { - context.self ! ScheduleNotification + val trigger = ScheduleNotification() + // `import cmd.schedule._` shadows the bare `correlationId` with the schedule's; fall back + // to the synthetic cid stamped on the command by Scheduler2NotificationProcessorStream so + // it is not dropped (qualify cmd.correlationId explicitly to avoid the shadowing). + cmd.schedule.correlationId.orElse(cmd.correlationId).foreach(trigger.withCorrelationId) + context.self ! trigger Effect.none.thenRun(_ => Schedule4NotificationTriggered(cmd.schedule) ~> replyTo) } else { Effect.none.thenRun(_ => Schedule4NotificationNotTriggered ~> replyTo) } - case ScheduleNotification => + case cmd: ScheduleNotification => state match { - case Some(notification) => sendNotification(entityId, notification, replyTo) + case Some(notification) => + sendNotification( + entityId, + notification, + notification.correlationId.orElse(cmd.correlationId), + replyTo + ) case _ => // should never be the case Effect .persist( @@ -279,7 +298,8 @@ trait NotificationBehavior[T <: Notification] delay, Some(true), Some(now()), - None + None, + correlationId = correlationId ) ) ) @@ -361,6 +381,7 @@ trait NotificationBehavior[T <: Notification] private[this] def sendNotification( entityId: String, notification: T, + cid: Option[String], replyTo: Option[ActorRef[NotificationCommandResult]] )(implicit log: Logger, @@ -419,7 +440,8 @@ trait NotificationBehavior[T <: Notification] notification .withNbTries(nbTries) .copyWithAck(notificationAck) - case _ => notification + .withCorrelationId(cid.getOrElse("-")) + case _ => notification.withCorrelationId(cid.getOrElse("-")) } val event: Option[NotificationRecordedEvent] = @@ -483,6 +505,12 @@ trait NotificationBehavior[T <: Notification] if (maybeAckWithNumberOfRetries.isDefined) { val cid = updatedNotification.correlationId.getOrElse("-") val channel = updatedNotification.`type`.name + // Story 13.7 AC3 — the audit field catalog lists a `template` field for these lines, but a + // generic notification carries no template identifier at this layer: the Notification/Mail + // model (subject/message/richMessage) has no template/mustache field, and the producer + // (license-server) consumes the mustache name during rendering and never propagates it onto + // the Mail. The template attribution is therefore owned by the producer's own + // `notification_enqueued` audit line; `template` is N/A here, so only `channel` is emitted. updatedNotification.status match { case Sent | Delivered => audit.event(cid, "notification_sent", "channel" -> channel) diff --git a/testkit/src/test/resources/logback.xml b/testkit/src/test/resources/logback.xml new file mode 100644 index 0000000..53e0bde --- /dev/null +++ b/testkit/src/test/resources/logback.xml @@ -0,0 +1,76 @@ + + + + + notification.log + + notification_%d{yyyy-MM-dd}.log + + + [%date{ISO8601}] [%level] [%logger] [%marker] [%thread] - %msg MDC: {%mdc}%n + + + + + 8192 + true + + + + + + + %date{MM/dd HH:mm:ss} %-5level[%thread] %logger{1} - %msg%n + + + + {# Dedicated audit appender — SYNCHRONOUS, never wrapped by ASYNC/neverBlock (C4): audit lines + must never be silently dropped under backpressure. PVC-backed file, ~1y of daily rolls; Loki + holds the authoritative 1y via the promtail sidecar that tails this file → {stream="audit"}. #} + + audit.log + + audit-%d{yyyy-MM-dd}.log + 7 + {# Bound total on-disk audit size so a burst can't fill the log PVC. The appender is + synchronous/non-dropping, so a full disk would otherwise stall the emitting thread. #} + 1GB + + + {# Do NOT ServiceLoader-scan the classpath for Jackson modules. Under JDK 11+ the + transitive jackson-module-jaxb-annotations (from rapidoid/dumbster) would try to load + javax.xml.bind.annotation.XmlElement — removed from the JDK in Java 11 — and blow up + with NoClassDefFoundError. The encoder registers the modules it actually needs itself. #} + false + false + + (?i)[a-z0-9._%+-]+@[a-z0-9.-]+\.[a-z]{2,}***@*** + (?i)\b(?:sk|pk|whsec|rk)_[a-z0-9_]+\b*** + (?i)bearer\s+[a-z0-9._/+=-]+Bearer *** + + + + + {# additivity=false so audit lines do NOT also propagate to root (no duplicate in app.log). + Audit routes ONLY to AUDIT_FILE; the sidecar tails it for {stream="audit"}. No STDOUT copy — + that would be re-shipped to {stream="app"} by the node promtail, double-ingesting every audit + line and polluting the operational stream. #} + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/testkit/src/test/scala/app/softnetwork/notification/handlers/SimpleMailNotificationsHandlerSpec.scala b/testkit/src/test/scala/app/softnetwork/notification/handlers/SimpleMailNotificationsHandlerSpec.scala index c63be10..5f6b7ef 100644 --- a/testkit/src/test/scala/app/softnetwork/notification/handlers/SimpleMailNotificationsHandlerSpec.scala +++ b/testkit/src/test/scala/app/softnetwork/notification/handlers/SimpleMailNotificationsHandlerSpec.scala @@ -35,11 +35,16 @@ class SimpleMailNotificationsHandlerSpec "add notification" in { val uuid = "add" - this ? (uuid, AddNotification(generateMail(uuid))) await { + val cid = s"cid-$uuid" + this ? (uuid, AddNotification(generateMail(uuid).withCorrelationId(cid))) await { case n: NotificationAdded => n.uuid shouldBe uuid + val message = probe.receiveMessage() assert( - probe.receiveMessage().schedule.uuid == s"MailNotification#$uuid#NotificationTimerKey" + message.schedule.uuid == s"MailNotification#$uuid#NotificationTimerKey" + ) + assert( + message.schedule.correlationId.getOrElse("") == cid ) case _ => fail() } @@ -47,11 +52,18 @@ class SimpleMailNotificationsHandlerSpec "add notification with attachment(s)" in { val uuid = "addWithAttachments" - this ? (uuid, AddNotification(generateMail(uuid, Seq(attachment)))) await { + val cid = s"cid-$uuid" + this ? (uuid, AddNotification( + generateMail(uuid, Seq(attachment)).withCorrelationId(cid) + )) await { case n: NotificationAdded => n.uuid shouldBe uuid + val message = probe.receiveMessage() + assert( + message.schedule.uuid == s"MailNotification#$uuid#NotificationTimerKey" + ) assert( - probe.receiveMessage().schedule.uuid == s"MailNotification#$uuid#NotificationTimerKey" + message.schedule.correlationId.getOrElse("") == cid ) case _ => fail() } @@ -59,11 +71,16 @@ class SimpleMailNotificationsHandlerSpec "remove notification" in { val uuid = "remove" - this ? (uuid, AddNotification(generateMail(uuid))) await { + val cid = s"cid-$uuid" + this ? (uuid, AddNotification(generateMail(uuid).withCorrelationId(cid))) await { case n: NotificationAdded => n.uuid shouldBe uuid + val message = probe.receiveMessage() + assert( + message.schedule.uuid == s"MailNotification#$uuid#NotificationTimerKey" + ) assert( - probe.receiveMessage().schedule.uuid == s"MailNotification#$uuid#NotificationTimerKey" + message.schedule.correlationId.getOrElse("") == cid ) this ? (uuid, RemoveNotification(uuid)) await { case _: NotificationRemoved.type => succeed @@ -75,7 +92,8 @@ class SimpleMailNotificationsHandlerSpec "send notification" in { val uuid = "send" - this ? (uuid, SendNotification(generateMail(uuid))) await { + val cid = s"cid-$uuid" + this ? (uuid, SendNotification(generateMail(uuid).withCorrelationId(cid))) await { case n: NotificationSent => assert(n.uuid == uuid) case _ => fail() @@ -84,7 +102,10 @@ class SimpleMailNotificationsHandlerSpec "send notification with attachment(s)" in { val uuid = "sendWithAttachments" - this ? (uuid, SendNotification(generateMail(uuid, Seq(attachment)))) await { + val cid = s"cid-$uuid" + this ? (uuid, SendNotification( + generateMail(uuid, Seq(attachment)).withCorrelationId(cid) + )) await { case n: NotificationSent => n.uuid shouldBe uuid case _ => fail() @@ -93,7 +114,8 @@ class SimpleMailNotificationsHandlerSpec "resend notification" in { val uuid = "resend" - this ? (uuid, SendNotification(generateMail(uuid))) await { + val cid = s"cid-$uuid" + this ? (uuid, SendNotification(generateMail(uuid).withCorrelationId(cid))) await { case n: NotificationSent => n.uuid shouldBe uuid this ? (uuid, ResendNotification(uuid)) await { @@ -111,7 +133,8 @@ class SimpleMailNotificationsHandlerSpec "retrieve notification status" in { val uuid = "status" - this ? (uuid, SendNotification(generateMail(uuid))) await { + val cid = s"cid-$uuid" + this ? (uuid, SendNotification(generateMail(uuid).withCorrelationId(cid))) await { case n: NotificationSent => n.uuid shouldBe uuid this ? (uuid, GetNotificationStatus(uuid)) await { @@ -125,7 +148,8 @@ class SimpleMailNotificationsHandlerSpec "trigger notification" in { val uuid = "trigger" - this ? (uuid, SendNotification(generateMail(uuid))) await { + val cid = s"cid-$uuid" + this ? (uuid, SendNotification(generateMail(uuid).withCorrelationId(cid))) await { case n: NotificationSent => n.uuid shouldBe uuid this ? (uuid, GetNotificationStatus(uuid)) await { @@ -133,7 +157,7 @@ class SimpleMailNotificationsHandlerSpec assert(n.uuid == uuid) succeed case _ => - probe.expectMessageType[Schedule4NotificationTriggered] + assert(probe.receiveMessage().schedule.correlationId.contains(uuid)) succeed } case _ => fail() @@ -142,8 +166,15 @@ class SimpleMailNotificationsHandlerSpec "add mail" in { val uuid = "mail" - assert(client.addMail(generateMail(uuid)).complete()) - assert(probe.receiveMessage().schedule.uuid == s"MailNotification#$uuid#NotificationTimerKey") + val cid = s"cid-$uuid" + assert(client.addMail(generateMail(uuid).withCorrelationId(cid)).complete()) + val message = probe.receiveMessage() + assert( + message.schedule.uuid == s"MailNotification#$uuid#NotificationTimerKey" + ) + assert( + message.schedule.correlationId.getOrElse("") == cid + ) } "emit a notification_sent audit line carrying the correlation id (Story 13.7)" in {