Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,10 @@ target
*.pyc
.vagrant
.metals
*.log
*.log
.bloop/
.jvmopts
.sbtopts
*.sc
.bsp/
.vscode/
5 changes: 4 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,10 @@ lazy val core = project.in(file("core"))
.settings(
Defaults.itSettings,
app.softnetwork.Info.infoSettings,
moduleSettings
moduleSettings,
// Story 13.7 — AuditLog uses net.logstash.logback.argument.StructuredArguments.kv; co-located
// with the Auditable/AuditableEvent carriers in this module.
libraryDependencies += "net.logstash.logback" % "logstash-logback-encoder" % Versions.logstashEncoder
)
.dependsOn(
common % "compile->compile;test->test;it->it"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package app.softnetwork.persistence.audit

import net.logstash.logback.argument.StructuredArguments.kv
import org.slf4j.LoggerFactory

/** Story 13.7 — generic structured audit logger (cross-service audit trail).
*
* Every service emits to ONE fixed audit logger name ([[AuditLog.LoggerName]]) so the **shared**
* `logback.xml` template needs a single `additivity=false` route to the synchronous, non-dropping
* `audit.log` appender. Each pod still writes its own pod-local `audit.log` (separate files
* because separate pods); its promtail sidecar tails it → Loki `{stream="audit"}`. The logical
* domain is carried by the **`service` JSON field** (not the logger name, not the file), so
* licensing / notification / payment / scheduler lines are distinguished at query time while
* sharing one route.
*
* `correlationId` is passed **explicitly** (threaded from the command/event as plain data) and
* emitted as a structured `correlation_id` field — it is deliberately NOT read from MDC, because
* the projection streams emit audit lines from `Future` continuations where a `ThreadLocal` MDC
* value would not survive (Story 13.7 C2). MDC is reserved for the synchronous HTTP request thread
* (see `app.softnetwork.api.server.HttpCorrelation`).
*
* PII / secret masking (emails, `sk_`/`pk_`/`whsec_`/`Bearer` tokens) is enforced downstream at
* the logback `MaskingJsonGeneratorDecorator`, so callers never need to pre-redact; code must
* still never pass a raw secret to a logger (defence in depth).
*
* @param serviceOf
* maps an `eventType` to its logical business domain (the `service` field). A single-domain pod
* passes a constant (see [[AuditLog.apply(service:String)*]]); a pod that emits lines for
* several domains (e.g. the licensing pod) passes a resolver. A caller may still override per
* call by passing an explicit `"service"` entry in `fields`.
* @param defaultActor
* the `actor` for machine-initiated flows when a caller does not override it.
*/
class AuditLog(
serviceOf: String => String,
defaultActor: String = AuditLog.DefaultActor
) {

private val log = LoggerFactory.getLogger(AuditLog.LoggerName)

/** Emit an audit event.
*
* Always emits `event_type`, `correlation_id`, `service` (from [[serviceOf]]) and `actor`
* (default [[defaultActor]]); `service` / `actor` can be overridden by passing a `"service"` /
* `"actor"` entry in `fields`.
*
* @param correlationId
* the cross-service correlation id (threaded as data; `"-"` when unknown)
* @param eventType
* the catalog event type, e.g. `license_issued`, `notification_sent`
* @param fields
* additional structured fields (`organization_id`, `channel`, `template`, …); values are
* emitted verbatim and masked downstream by the encoder
*/
def event(correlationId: String, eventType: String, fields: (String, Any)*): Unit = {
val provided = fields.iterator.map(_._1).toSet
val base = List.newBuilder[(String, Any)]
base += "event_type" -> eventType
base += "correlation_id" -> correlationId
if (!provided.contains("service")) base += "service" -> serviceOf(eventType)
if (!provided.contains("actor")) base += "actor" -> defaultActor
val args: Seq[AnyRef] = (base.result() ++ fields).map { case (k, v) => kv(k, v) }
// The "audit" message has no `{}` placeholders on purpose — logstash-logback-encoder still
// appends every StructuredArgument as a top-level JSON field. Binds slf4j info(String, Object...).
log.info("audit", args: _*)
}
}

object AuditLog {

/** The single fixed audit logger name — one `additivity=false` route in the shared `logback.xml`
* template feeds every service's pod-local `audit.log`; the `service` field distinguishes
* domains.
*/
final val LoggerName = "app.softnetwork.audit"

/** Default `actor` for machine-initiated flows. */
final val DefaultActor = "system"

/** Audit logger for a single-domain pod (e.g. notification) — every line carries `service`. */
def apply(service: String): AuditLog =
new AuditLog(_ => service)

/** Audit logger for a pod that emits lines for several domains (e.g. the licensing pod) —
* `service` is resolved per `eventType`.
*/
def apply(serviceOf: String => String): AuditLog =
new AuditLog(serviceOf)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package app.softnetwork.persistence.audit

import ch.qos.logback.classic.{Level, Logger}
import ch.qos.logback.classic.spi.ILoggingEvent
import ch.qos.logback.core.read.ListAppender
import org.scalatest.BeforeAndAfterEach
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec
import org.slf4j.LoggerFactory

/** Story 13.7 — proves the generic AuditLog assembles the always-present structured fields
* (`event_type`, `correlation_id`, `service`, `actor`) plus caller fields, resolves `service` from
* the eventType, and lets a caller override `service` / `actor`. Field rendering is captured via a
* logback `ListAppender` on the shared audit logger (no LogstashEncoder needed at unit level).
*/
class AuditLogSpec extends AnyWordSpec with Matchers with BeforeAndAfterEach {

private val logger = LoggerFactory.getLogger(AuditLog.LoggerName).asInstanceOf[Logger]
private val appender = new ListAppender[ILoggingEvent]()

override def beforeEach(): Unit = {
appender.list.clear()
appender.start()
logger.addAppender(appender)
logger.setLevel(Level.INFO)
}

override def afterEach(): Unit = {
logger.detachAppender(appender)
appender.stop()
}

private def lastEvent: ILoggingEvent = appender.list.get(appender.list.size() - 1)

// StructuredArguments.kv("k", v).toString renders as "k=v"; rebuild the field map from the args.
private def lastArgs: Map[String, String] =
lastEvent.getArgumentArray
.map(_.toString)
.map { s =>
val i = s.indexOf('=')
s.substring(0, i) -> s.substring(i + 1)
}
.toMap

"AuditLog (fixed service)" should {
"emit event_type, correlation_id, service, the default actor and custom fields" in {
AuditLog("notification").event(
"cid-1",
"notification_sent",
"channel" -> "email",
"template" -> "welcome.mustache"
)
lastEvent.getMessage shouldBe "audit"
val m = lastArgs
m("event_type") shouldBe "notification_sent"
m("correlation_id") shouldBe "cid-1"
m("service") shouldBe "notification"
m("actor") shouldBe AuditLog.DefaultActor
m("channel") shouldBe "email"
m("template") shouldBe "welcome.mustache"
}
}

"AuditLog (per-eventType resolver)" should {
"derive service from the eventType" in {
val audit =
AuditLog((et: String) => if (et.startsWith("schedule")) "scheduler" else "licensing")
audit.event("cid-2", "schedule_fired", "schedule_key" -> "renewal")
lastArgs("service") shouldBe "scheduler"
audit.event("cid-3", "license_issued")
lastArgs("service") shouldBe "licensing"
}
}

"AuditLog override" should {
"let a caller override service and actor via fields" in {
AuditLog("licensing").event(
"cid-4",
"schedule_fired",
"service" -> "scheduler",
"actor" -> "scheduler"
)
val m = lastArgs
m("service") shouldBe "scheduler"
m("actor") shouldBe "scheduler"
}
}
}
11 changes: 8 additions & 3 deletions project/Versions.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ object Versions {

val akkaHttpSession = "0.7.1" // 0.7.0 -> 0.7.1

val tapir = "1.7.0"
val tapir = "1.8.5"

val tapirHttpSession = "0.2.0"
val tapirHttpSession = "0.3.0"

val akkaPersistenceJdbc = "5.0.4" // TODO 5.0.4 -> 5.2.1

Expand All @@ -36,7 +36,12 @@ object Versions {

val scalaLogging = "3.9.2"

val logback = "1.2.3" // TODO 1.2.3 -> 1.5.6
val logback = "1.4.14" // TODO 1.4.14 -> 1.5.6

// Story 13.7 — structured audit trail. Provides StructuredArguments.kv (used by AuditLog) and, at
// runtime in the service images, LogstashEncoder + MaskingJsonGeneratorDecorator. 8.1 is the newest
// line on logback 1.3/1.4 + Jackson 2.x (Java 11+); 9.x requires logback 1.5 + Jackson 3.
val logstashEncoder = "8.1"

val slf4j = "1.7.36"

Expand Down
55 changes: 26 additions & 29 deletions server/src/main/scala/app/softnetwork/api/server/ApiRoutes.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,7 @@ import java.util.concurrent.TimeoutException
import akka.actor.typed.ActorSystem
import akka.http.scaladsl.model.headers.RawHeader
import akka.http.scaladsl.model.{HttpResponse, StatusCodes}
import akka.http.scaladsl.server.{
AuthorizationFailedRejection,
Directives,
ExceptionHandler,
MethodRejection,
MissingCookieRejection,
RejectionHandler,
Route,
ValidationRejection
}
import akka.http.scaladsl.server.{Directives, ExceptionHandler, RejectionHandler, Route}
import akka.http.scaladsl.settings.RoutingSettings
import app.softnetwork.api.server.config.ServerSettings
import org.json4s.Formats
Expand Down Expand Up @@ -58,26 +49,32 @@ trait ApiRoutes extends Directives with GrpcServices with DefaultComplete {
// PrometheusRegistry.defaultRegistry. Wraps the WHOLE pipeline (outside handleRejections /
// handleExceptions) so the final response — rejection/exception ones included — is observed.
HttpMetrics.withMetrics {
handleRejections(rejectionHandler) {
handleExceptions(exceptionHandler) {
logRequestResult("RestAll") {
pathPrefix(config.ServerSettings.RootPath) {
Try(
respondWithHeaders(RawHeader("Api-Version", applicationVersion)) {
routes
}
) match {
case Success(s) => s
case Failure(f) =>
log.error(f.getMessage, f.getCause)
complete(
HttpResponse(
StatusCodes.InternalServerError,
entity = f.getMessage
// Story 13.7 Phase B (gate #1) — extract-or-generate X-Correlation-Id, MDC-stamp the
// synchronous request thread, re-inject the canonical id onto the request (so downstream tapir
// `HttpCorrelation.correlationInput` reads it), and echo it on the response. Inside withMetrics
// so the latency timing still wraps the whole pipeline.
HttpCorrelation.withCorrelation {
handleRejections(rejectionHandler) {
handleExceptions(exceptionHandler) {
logRequestResult("RestAll") {
pathPrefix(config.ServerSettings.RootPath) {
Try(
respondWithHeaders(RawHeader("Api-Version", applicationVersion)) {
routes
}
) match {
case Success(s) => s
case Failure(f) =>
log.error(f.getMessage, f.getCause)
complete(
HttpResponse(
StatusCodes.InternalServerError,
entity = f.getMessage
)
)
)
}
} ~ grpcRoutes(system)
}
} ~ grpcRoutes(system)
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ object Endpoint {
endpoints: List[ServerEndpoint[AkkaStreams with WebSockets, Future]]
)(implicit
ec: ExecutionContext
): Route = AkkaHttpServerInterpreter().toRoute(endpoints)
): Route =
// Story 13.7 — wrap the interpreted tapir route so every endpoint set generates-or-extracts the
// correlation id, echoes it on the response and feeds HttpCorrelation.correlationInput, even when
// mounted outside ApiRoutes.mainRoutes. withCorrelation is re-entrant, so nesting under mainRoutes
// stays a single header + one MDC scope.
HttpCorrelation.withCorrelation(AkkaHttpServerInterpreter().toRoute(endpoints))

}
Loading
Loading