Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,12 @@ import app.softnetwork.notification.model.NotificationStatusResult
*/
package object message {

sealed trait NotificationCommand extends EntityCommand
sealed trait NotificationCommand extends EntityCommand with AuditableCommand {
override type T = NotificationCommand
}

@SerialVersionUID(0L)
case object ScheduleNotification extends NotificationCommand with AllEntities
case class ScheduleNotification() extends NotificationCommand with AllEntities

@SerialVersionUID(0L)
case class AddNotification[T <: Notification](notification: T) extends NotificationCommand {
Expand All @@ -33,7 +35,9 @@ package object message {
@SerialVersionUID(0L)
case class GetNotificationStatus(id: String) extends NotificationCommand

case class TriggerSchedule4Notification(schedule: Schedule) extends NotificationCommand {
case class TriggerSchedule4Notification(schedule: Schedule)
extends NotificationCommand
with AuditableCommand {
override val id: String = schedule.entityId
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package app.softnetwork.notification.audit

import app.softnetwork.persistence.audit.AuditLog

object NotificationAuditLog {

/** Story 13.7 — structured audit trail. service = "notification"; correlationId is threaded as
* data on the notification (proto field, exposed on the Notification trait), never via MDC.
*/
private[notification] lazy val audit: AuditLog = AuditLog("notification")

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import akka.actor.typed.eventstream.EventStream.Publish
import app.softnetwork.scheduler.model.Schedule
import app.softnetwork.persistence.query.{JournalProvider, OffsetProvider}
import app.softnetwork.scheduler.persistence.query.Scheduler2EntityProcessorStream
import app.softnetwork.notification.audit.NotificationAuditLog._
import app.softnetwork.notification.handlers.NotificationHandler
import app.softnetwork.notification.message._

Expand All @@ -16,13 +17,39 @@ trait Scheduler2NotificationProcessorStream
_: JournalProvider with OffsetProvider with NotificationHandler =>

override protected def triggerSchedule(schedule: Schedule): Future[Boolean] = {
!?(TriggerSchedule4Notification(schedule)) map {
val cid = schedule.correlationId.getOrElse(
s"schedule#${schedule.persistenceId}#${schedule.entityId}#${schedule.key}"
)
val cmd = TriggerSchedule4Notification(schedule)
cmd.withCorrelationId(cid)
!?(cmd) map {
case result: Schedule4NotificationTriggered =>
audit.event(
cid,
"schedule_fired",
"actor" -> "scheduler",
"schedule_key" -> schedule.key
)
if (forTests) {
system.eventStream.tell(Publish(result))
}
true
case _ => false
case Schedule4NotificationNotTriggered =>
audit.event(
cid,
"schedule_not_fired",
"actor" -> "scheduler",
"schedule_key" -> schedule.key
)
false
case _ =>
audit.event(
cid,
"schedule_failed",
"actor" -> "scheduler",
"schedule_key" -> schedule.key
)
false
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package app.softnetwork.notification.persistence.typed
import akka.actor.typed.scaladsl.{ActorContext, TimerScheduler}
import akka.actor.typed.{ActorRef, ActorSystem}
import akka.persistence.typed.scaladsl.Effect
import app.softnetwork.notification.audit.NotificationAuditLog._
import app.softnetwork.notification.config.NotificationSettings
import org.slf4j.Logger
import app.softnetwork.scheduler.message.SchedulerEvents.{
Expand All @@ -12,7 +13,6 @@ import app.softnetwork.scheduler.message.SchedulerEvents.{
}
import app.softnetwork.scheduler.message.{AddSchedule, RemoveSchedule}
import app.softnetwork.scheduler.model.Schedule
import app.softnetwork.persistence.audit.AuditLog
import app.softnetwork.persistence.now
import app.softnetwork.persistence.typed._
import app.softnetwork.notification.message._
Expand Down Expand Up @@ -46,11 +46,6 @@ trait NotificationBehavior[T <: Notification]

private[this] val delay = 1

/** Story 13.7 — structured audit trail. service = "notification"; correlationId is threaded as
* data on the notification (proto field, exposed on the Notification trait), never via MDC.
*/
private[this] lazy val audit: AuditLog = AuditLog("notification")

/** Set event tags, which will be used in persistence query
*
* @param entityId
Expand Down Expand Up @@ -120,7 +115,8 @@ trait NotificationBehavior[T <: Notification]
delay,
Some(true),
Some(now()),
None
None,
correlationId = notification.correlationId.orElse(cmd.correlationId)
)
)
)
Expand Down Expand Up @@ -151,12 +147,24 @@ trait NotificationBehavior[T <: Notification]
)
.thenRun(_ => { NotificationRemoved ~> replyTo }) //.thenStop()

case cmd: SendNotification[T] => sendNotification(entityId, cmd.notification, replyTo)
case cmd: SendNotification[T] =>
sendNotification(
entityId,
cmd.notification,
cmd.notification.correlationId.orElse(cmd.correlationId),
replyTo
)

case _: ResendNotification =>
case cmd: ResendNotification =>
state match {
case Some(notification) => sendNotification(entityId, notification, replyTo)
case _ => Effect.none.thenRun(_ => NotificationNotFound ~> replyTo)
case Some(notification) =>
sendNotification(
entityId,
notification,
notification.correlationId.orElse(cmd.correlationId),
replyTo
)
case _ => Effect.none.thenRun(_ => NotificationNotFound ~> replyTo)
}

case _: GetNotificationStatus =>
Expand Down Expand Up @@ -188,15 +196,26 @@ trait NotificationBehavior[T <: Notification]
case cmd: TriggerSchedule4Notification =>
import cmd.schedule._
if (key == notificationTimerKey) {
context.self ! ScheduleNotification
val trigger = ScheduleNotification()
// `import cmd.schedule._` shadows the bare `correlationId` with the schedule's; fall back
// to the synthetic cid stamped on the command by Scheduler2NotificationProcessorStream so
// it is not dropped (qualify cmd.correlationId explicitly to avoid the shadowing).
cmd.schedule.correlationId.orElse(cmd.correlationId).foreach(trigger.withCorrelationId)
context.self ! trigger
Effect.none.thenRun(_ => Schedule4NotificationTriggered(cmd.schedule) ~> replyTo)
} else {
Effect.none.thenRun(_ => Schedule4NotificationNotTriggered ~> replyTo)
}

case ScheduleNotification =>
case cmd: ScheduleNotification =>
state match {
case Some(notification) => sendNotification(entityId, notification, replyTo)
case Some(notification) =>
sendNotification(
entityId,
notification,
notification.correlationId.orElse(cmd.correlationId),
replyTo
)
case _ => // should never be the case
Effect
.persist(
Expand Down Expand Up @@ -279,7 +298,8 @@ trait NotificationBehavior[T <: Notification]
delay,
Some(true),
Some(now()),
None
None,
correlationId = correlationId
)
)
)
Expand Down Expand Up @@ -361,6 +381,7 @@ trait NotificationBehavior[T <: Notification]
private[this] def sendNotification(
entityId: String,
notification: T,
cid: Option[String],
replyTo: Option[ActorRef[NotificationCommandResult]]
)(implicit
log: Logger,
Expand Down Expand Up @@ -419,7 +440,8 @@ trait NotificationBehavior[T <: Notification]
notification
.withNbTries(nbTries)
.copyWithAck(notificationAck)
case _ => notification
.withCorrelationId(cid.getOrElse("-"))
case _ => notification.withCorrelationId(cid.getOrElse("-"))
}

val event: Option[NotificationRecordedEvent] =
Expand Down Expand Up @@ -483,6 +505,12 @@ trait NotificationBehavior[T <: Notification]
if (maybeAckWithNumberOfRetries.isDefined) {
val cid = updatedNotification.correlationId.getOrElse("-")
val channel = updatedNotification.`type`.name
// Story 13.7 AC3 — the audit field catalog lists a `template` field for these lines, but a
// generic notification carries no template identifier at this layer: the Notification/Mail
// model (subject/message/richMessage) has no template/mustache field, and the producer
// (license-server) consumes the mustache name during rendering and never propagates it onto
// the Mail. The template attribution is therefore owned by the producer's own
// `notification_enqueued` audit line; `template` is N/A here, so only `channel` is emitted.
updatedNotification.status match {
case Sent | Delivered =>
audit.event(cid, "notification_sent", "channel" -> channel)
Expand Down
76 changes: 76 additions & 0 deletions testkit/src/test/resources/logback.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration>

<appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>notification.log</file>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<fileNamePattern>notification_%d{yyyy-MM-dd}.log</fileNamePattern>
</rollingPolicy>
<encoder>
<pattern>[%date{ISO8601}] [%level] [%logger] [%marker] [%thread] - %msg MDC: {%mdc}%n</pattern>
</encoder>
</appender>

<appender name="ASYNC" class="ch.qos.logback.classic.AsyncAppender">
<queueSize>8192</queueSize>
<neverBlock>true</neverBlock>
<appender-ref ref="FILE" />
</appender>

<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<!-- encoders are assigned the type
ch.qos.logback.classic.encoder.PatternLayoutEncoder by default -->
<encoder>
<pattern>%date{MM/dd HH:mm:ss} %-5level[%thread] %logger{1} - %msg%n</pattern>
</encoder>
</appender>

{# Dedicated audit appender — SYNCHRONOUS, never wrapped by ASYNC/neverBlock (C4): audit lines
must never be silently dropped under backpressure. PVC-backed file, ~1y of daily rolls; Loki
holds the authoritative 1y via the promtail sidecar that tails this file → {stream="audit"}. #}
<appender name="AUDIT_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>audit.log</file>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<fileNamePattern>audit-%d{yyyy-MM-dd}.log</fileNamePattern>
<maxHistory>7</maxHistory>
{# Bound total on-disk audit size so a burst can't fill the log PVC. The appender is
synchronous/non-dropping, so a full disk would otherwise stall the emitting thread. #}
<totalSizeCap>1GB</totalSizeCap>
</rollingPolicy>
<encoder class="net.logstash.logback.encoder.LogstashEncoder">
{# Do NOT ServiceLoader-scan the classpath for Jackson modules. Under JDK 11+ the
transitive jackson-module-jaxb-annotations (from rapidoid/dumbster) would try to load
javax.xml.bind.annotation.XmlElement — removed from the JDK in Java 11 — and blow up
with NoClassDefFoundError. The encoder registers the modules it actually needs itself. #}
<findAndRegisterJacksonModules>false</findAndRegisterJacksonModules>
<includeMdc>false</includeMdc>
<jsonGeneratorDecorator class="net.logstash.logback.mask.MaskingJsonGeneratorDecorator">
<valueMask><value>(?i)[a-z0-9._%+-]+@[a-z0-9.-]+\.[a-z]{2,}</value><mask>***@***</mask></valueMask>
<valueMask><value>(?i)\b(?:sk|pk|whsec|rk)_[a-z0-9_]+\b</value><mask>***</mask></valueMask>
<valueMask><value>(?i)bearer\s+[a-z0-9._/+=-]+</value><mask>Bearer ***</mask></valueMask>
</jsonGeneratorDecorator>
</encoder>
</appender>

{# additivity=false so audit lines do NOT also propagate to root (no duplicate in app.log).
Audit routes ONLY to AUDIT_FILE; the sidecar tails it for {stream="audit"}. No STDOUT copy —
that would be re-shipped to {stream="app"} by the node promtail, double-ingesting every audit
line and polluting the operational stream. #}
<logger name="app.softnetwork.audit" level="INFO" additivity="false">
<appender-ref ref="AUDIT_FILE"/>
</logger>

<logger name="akka" level="INFO"/>

<logger name="slick" level="WARN"/>
<logger name="com.zaxxer" level="WARN"/>

<logger name="app.softnetwork" level="INFO"/>

<logger name="com.github" level="INFO"/>

<root level="INFO">
<appender-ref ref="ASYNC"/>
<appender-ref ref="STDOUT"/>
</root>
</configuration>
Loading
Loading