diff --git a/.gitignore b/.gitignore index d5586b3..45f09d6 100644 --- a/.gitignore +++ b/.gitignore @@ -4,4 +4,10 @@ target *.pyc .vagrant .metals -*.log \ No newline at end of file +*.log +.bloop/ +.jvmopts +.sbtopts +*.sc +.bsp/ +.vscode/ diff --git a/build.sbt b/build.sbt index 7611c3a..fde3427 100644 --- a/build.sbt +++ b/build.sbt @@ -97,7 +97,10 @@ lazy val core = project.in(file("core")) .settings( Defaults.itSettings, app.softnetwork.Info.infoSettings, - moduleSettings + moduleSettings, + // Story 13.7 — AuditLog uses net.logstash.logback.argument.StructuredArguments.kv; co-located + // with the Auditable/AuditableEvent carriers in this module. + libraryDependencies += "net.logstash.logback" % "logstash-logback-encoder" % Versions.logstashEncoder ) .dependsOn( common % "compile->compile;test->test;it->it" diff --git a/core/src/main/scala/app/softnetwork/persistence/audit/AuditLog.scala b/core/src/main/scala/app/softnetwork/persistence/audit/AuditLog.scala new file mode 100644 index 0000000..a6ec247 --- /dev/null +++ b/core/src/main/scala/app/softnetwork/persistence/audit/AuditLog.scala @@ -0,0 +1,89 @@ +package app.softnetwork.persistence.audit + +import net.logstash.logback.argument.StructuredArguments.kv +import org.slf4j.LoggerFactory + +/** Story 13.7 — generic structured audit logger (cross-service audit trail). + * + * Every service emits to ONE fixed audit logger name ([[AuditLog.LoggerName]]) so the **shared** + * `logback.xml` template needs a single `additivity=false` route to the synchronous, non-dropping + * `audit.log` appender. Each pod still writes its own pod-local `audit.log` (separate files + * because separate pods); its promtail sidecar tails it → Loki `{stream="audit"}`. The logical + * domain is carried by the **`service` JSON field** (not the logger name, not the file), so + * licensing / notification / payment / scheduler lines are distinguished at query time while + * sharing one route. + * + * `correlationId` is passed **explicitly** (threaded from the command/event as plain data) and + * emitted as a structured `correlation_id` field — it is deliberately NOT read from MDC, because + * the projection streams emit audit lines from `Future` continuations where a `ThreadLocal` MDC + * value would not survive (Story 13.7 C2). MDC is reserved for the synchronous HTTP request thread + * (see `app.softnetwork.api.server.HttpCorrelation`). + * + * PII / secret masking (emails, `sk_`/`pk_`/`whsec_`/`Bearer` tokens) is enforced downstream at + * the logback `MaskingJsonGeneratorDecorator`, so callers never need to pre-redact; code must + * still never pass a raw secret to a logger (defence in depth). + * + * @param serviceOf + * maps an `eventType` to its logical business domain (the `service` field). A single-domain pod + * passes a constant (see [[AuditLog.apply(service:String)*]]); a pod that emits lines for + * several domains (e.g. the licensing pod) passes a resolver. A caller may still override per + * call by passing an explicit `"service"` entry in `fields`. + * @param defaultActor + * the `actor` for machine-initiated flows when a caller does not override it. + */ +class AuditLog( + serviceOf: String => String, + defaultActor: String = AuditLog.DefaultActor +) { + + private val log = LoggerFactory.getLogger(AuditLog.LoggerName) + + /** Emit an audit event. + * + * Always emits `event_type`, `correlation_id`, `service` (from [[serviceOf]]) and `actor` + * (default [[defaultActor]]); `service` / `actor` can be overridden by passing a `"service"` / + * `"actor"` entry in `fields`. + * + * @param correlationId + * the cross-service correlation id (threaded as data; `"-"` when unknown) + * @param eventType + * the catalog event type, e.g. `license_issued`, `notification_sent` + * @param fields + * additional structured fields (`organization_id`, `channel`, `template`, …); values are + * emitted verbatim and masked downstream by the encoder + */ + def event(correlationId: String, eventType: String, fields: (String, Any)*): Unit = { + val provided = fields.iterator.map(_._1).toSet + val base = List.newBuilder[(String, Any)] + base += "event_type" -> eventType + base += "correlation_id" -> correlationId + if (!provided.contains("service")) base += "service" -> serviceOf(eventType) + if (!provided.contains("actor")) base += "actor" -> defaultActor + val args: Seq[AnyRef] = (base.result() ++ fields).map { case (k, v) => kv(k, v) } + // The "audit" message has no `{}` placeholders on purpose — logstash-logback-encoder still + // appends every StructuredArgument as a top-level JSON field. Binds slf4j info(String, Object...). + log.info("audit", args: _*) + } +} + +object AuditLog { + + /** The single fixed audit logger name — one `additivity=false` route in the shared `logback.xml` + * template feeds every service's pod-local `audit.log`; the `service` field distinguishes + * domains. + */ + final val LoggerName = "app.softnetwork.audit" + + /** Default `actor` for machine-initiated flows. */ + final val DefaultActor = "system" + + /** Audit logger for a single-domain pod (e.g. notification) — every line carries `service`. */ + def apply(service: String): AuditLog = + new AuditLog(_ => service) + + /** Audit logger for a pod that emits lines for several domains (e.g. the licensing pod) — + * `service` is resolved per `eventType`. + */ + def apply(serviceOf: String => String): AuditLog = + new AuditLog(serviceOf) +} diff --git a/core/src/test/scala/app/softnetwork/persistence/audit/AuditLogSpec.scala b/core/src/test/scala/app/softnetwork/persistence/audit/AuditLogSpec.scala new file mode 100644 index 0000000..a1925b4 --- /dev/null +++ b/core/src/test/scala/app/softnetwork/persistence/audit/AuditLogSpec.scala @@ -0,0 +1,88 @@ +package app.softnetwork.persistence.audit + +import ch.qos.logback.classic.{Level, Logger} +import ch.qos.logback.classic.spi.ILoggingEvent +import ch.qos.logback.core.read.ListAppender +import org.scalatest.BeforeAndAfterEach +import org.scalatest.matchers.should.Matchers +import org.scalatest.wordspec.AnyWordSpec +import org.slf4j.LoggerFactory + +/** Story 13.7 — proves the generic AuditLog assembles the always-present structured fields + * (`event_type`, `correlation_id`, `service`, `actor`) plus caller fields, resolves `service` from + * the eventType, and lets a caller override `service` / `actor`. Field rendering is captured via a + * logback `ListAppender` on the shared audit logger (no LogstashEncoder needed at unit level). + */ +class AuditLogSpec extends AnyWordSpec with Matchers with BeforeAndAfterEach { + + private val logger = LoggerFactory.getLogger(AuditLog.LoggerName).asInstanceOf[Logger] + private val appender = new ListAppender[ILoggingEvent]() + + override def beforeEach(): Unit = { + appender.list.clear() + appender.start() + logger.addAppender(appender) + logger.setLevel(Level.INFO) + } + + override def afterEach(): Unit = { + logger.detachAppender(appender) + appender.stop() + } + + private def lastEvent: ILoggingEvent = appender.list.get(appender.list.size() - 1) + + // StructuredArguments.kv("k", v).toString renders as "k=v"; rebuild the field map from the args. + private def lastArgs: Map[String, String] = + lastEvent.getArgumentArray + .map(_.toString) + .map { s => + val i = s.indexOf('=') + s.substring(0, i) -> s.substring(i + 1) + } + .toMap + + "AuditLog (fixed service)" should { + "emit event_type, correlation_id, service, the default actor and custom fields" in { + AuditLog("notification").event( + "cid-1", + "notification_sent", + "channel" -> "email", + "template" -> "welcome.mustache" + ) + lastEvent.getMessage shouldBe "audit" + val m = lastArgs + m("event_type") shouldBe "notification_sent" + m("correlation_id") shouldBe "cid-1" + m("service") shouldBe "notification" + m("actor") shouldBe AuditLog.DefaultActor + m("channel") shouldBe "email" + m("template") shouldBe "welcome.mustache" + } + } + + "AuditLog (per-eventType resolver)" should { + "derive service from the eventType" in { + val audit = + AuditLog((et: String) => if (et.startsWith("schedule")) "scheduler" else "licensing") + audit.event("cid-2", "schedule_fired", "schedule_key" -> "renewal") + lastArgs("service") shouldBe "scheduler" + audit.event("cid-3", "license_issued") + lastArgs("service") shouldBe "licensing" + } + } + + "AuditLog override" should { + "let a caller override service and actor via fields" in { + AuditLog("licensing").event( + "cid-4", + "schedule_fired", + "service" -> "scheduler", + "actor" -> "scheduler" + ) + val m = lastArgs + m("service") shouldBe "scheduler" + m("actor") shouldBe "scheduler" + } + } +} diff --git a/project/Versions.scala b/project/Versions.scala index 0cfa4e6..228fe4c 100644 --- a/project/Versions.scala +++ b/project/Versions.scala @@ -12,9 +12,9 @@ object Versions { val akkaHttpSession = "0.7.1" // 0.7.0 -> 0.7.1 - val tapir = "1.7.0" + val tapir = "1.8.5" - val tapirHttpSession = "0.2.0" + val tapirHttpSession = "0.3.0" val akkaPersistenceJdbc = "5.0.4" // TODO 5.0.4 -> 5.2.1 @@ -36,7 +36,12 @@ object Versions { val scalaLogging = "3.9.2" - val logback = "1.2.3" // TODO 1.2.3 -> 1.5.6 + val logback = "1.4.14" // TODO 1.4.14 -> 1.5.6 + + // Story 13.7 — structured audit trail. Provides StructuredArguments.kv (used by AuditLog) and, at + // runtime in the service images, LogstashEncoder + MaskingJsonGeneratorDecorator. 8.1 is the newest + // line on logback 1.3/1.4 + Jackson 2.x (Java 11+); 9.x requires logback 1.5 + Jackson 3. + val logstashEncoder = "8.1" val slf4j = "1.7.36" diff --git a/server/src/main/scala/app/softnetwork/api/server/ApiRoutes.scala b/server/src/main/scala/app/softnetwork/api/server/ApiRoutes.scala index 5f9acc5..d8762a0 100644 --- a/server/src/main/scala/app/softnetwork/api/server/ApiRoutes.scala +++ b/server/src/main/scala/app/softnetwork/api/server/ApiRoutes.scala @@ -4,16 +4,7 @@ import java.util.concurrent.TimeoutException import akka.actor.typed.ActorSystem import akka.http.scaladsl.model.headers.RawHeader import akka.http.scaladsl.model.{HttpResponse, StatusCodes} -import akka.http.scaladsl.server.{ - AuthorizationFailedRejection, - Directives, - ExceptionHandler, - MethodRejection, - MissingCookieRejection, - RejectionHandler, - Route, - ValidationRejection -} +import akka.http.scaladsl.server.{Directives, ExceptionHandler, RejectionHandler, Route} import akka.http.scaladsl.settings.RoutingSettings import app.softnetwork.api.server.config.ServerSettings import org.json4s.Formats @@ -58,26 +49,32 @@ trait ApiRoutes extends Directives with GrpcServices with DefaultComplete { // PrometheusRegistry.defaultRegistry. Wraps the WHOLE pipeline (outside handleRejections / // handleExceptions) so the final response — rejection/exception ones included — is observed. HttpMetrics.withMetrics { - handleRejections(rejectionHandler) { - handleExceptions(exceptionHandler) { - logRequestResult("RestAll") { - pathPrefix(config.ServerSettings.RootPath) { - Try( - respondWithHeaders(RawHeader("Api-Version", applicationVersion)) { - routes - } - ) match { - case Success(s) => s - case Failure(f) => - log.error(f.getMessage, f.getCause) - complete( - HttpResponse( - StatusCodes.InternalServerError, - entity = f.getMessage + // Story 13.7 Phase B (gate #1) — extract-or-generate X-Correlation-Id, MDC-stamp the + // synchronous request thread, re-inject the canonical id onto the request (so downstream tapir + // `HttpCorrelation.correlationInput` reads it), and echo it on the response. Inside withMetrics + // so the latency timing still wraps the whole pipeline. + HttpCorrelation.withCorrelation { + handleRejections(rejectionHandler) { + handleExceptions(exceptionHandler) { + logRequestResult("RestAll") { + pathPrefix(config.ServerSettings.RootPath) { + Try( + respondWithHeaders(RawHeader("Api-Version", applicationVersion)) { + routes + } + ) match { + case Success(s) => s + case Failure(f) => + log.error(f.getMessage, f.getCause) + complete( + HttpResponse( + StatusCodes.InternalServerError, + entity = f.getMessage + ) ) - ) - } - } ~ grpcRoutes(system) + } + } ~ grpcRoutes(system) + } } } } diff --git a/server/src/main/scala/app/softnetwork/api/server/Endpoint.scala b/server/src/main/scala/app/softnetwork/api/server/Endpoint.scala index 959ef07..7b97722 100644 --- a/server/src/main/scala/app/softnetwork/api/server/Endpoint.scala +++ b/server/src/main/scala/app/softnetwork/api/server/Endpoint.scala @@ -22,6 +22,11 @@ object Endpoint { endpoints: List[ServerEndpoint[AkkaStreams with WebSockets, Future]] )(implicit ec: ExecutionContext - ): Route = AkkaHttpServerInterpreter().toRoute(endpoints) + ): Route = + // Story 13.7 — wrap the interpreted tapir route so every endpoint set generates-or-extracts the + // correlation id, echoes it on the response and feeds HttpCorrelation.correlationInput, even when + // mounted outside ApiRoutes.mainRoutes. withCorrelation is re-entrant, so nesting under mainRoutes + // stays a single header + one MDC scope. + HttpCorrelation.withCorrelation(AkkaHttpServerInterpreter().toRoute(endpoints)) } diff --git a/server/src/main/scala/app/softnetwork/api/server/HttpCorrelation.scala b/server/src/main/scala/app/softnetwork/api/server/HttpCorrelation.scala new file mode 100644 index 0000000..86cf387 --- /dev/null +++ b/server/src/main/scala/app/softnetwork/api/server/HttpCorrelation.scala @@ -0,0 +1,101 @@ +package app.softnetwork.api.server + +import akka.http.scaladsl.model.HttpRequest +import akka.http.scaladsl.model.headers.RawHeader +import akka.http.scaladsl.server.Directives._ +import akka.http.scaladsl.server.Route +import org.slf4j.MDC +import sttp.tapir.{extractFromRequest, EndpointInput} +import sttp.tapir.model.ServerRequest + +import java.util.UUID + +/** Story 13.7 Phase B (gate #1) — cross-service correlation id ingress. + * + * One id per inbound HTTP request, extracted from `X-Correlation-Id` or generated when the client + * does not supply one, echoed on the response, and propagated two ways: + * - as MDC (`correlation_id`) for the SYNCHRONOUS akka-http request thread's operational stdout + * logs. MDC is a `ThreadLocal` and does NOT survive an async (`Future`) boundary, so it is NOT + * the carrier across the event-sourced path (C2) nor into a tapir `serverLogic` `Future` + * (C14); + * - as DATA into tapir handlers via [[correlationInput]]: `serverLogic` receives the id as a + * value and threads it onto the command (`cmd.withCorrelationId(cid)`, an + * [[app.softnetwork.persistence.message.AuditableCommand]]) on the request thread. The command + * carries it across the sharding boundary (Kryo); the handler stamps it onto the journaled + * event's proto field ([[app.softnetwork.persistence.message.AuditableEvent]]) — the durable + * hop that survives replay. + * + * [[withCorrelation]] re-stamps the canonical id onto the inbound request before the inner route + * runs, so any downstream tapir endpoint reading [[correlationInput]] observes the SAME id the + * directive generated/echoed — the response header and the value threaded into the command never + * diverge, even when the client sent no header. + * + * Mirrors [[HttpMetrics.withMetrics]]; apply at the same `ApiRoutes.mainRoutes` attach point: + * {{{ + * HttpMetrics.withMetrics { HttpCorrelation.withCorrelation { ... }} + * }}} + */ +object HttpCorrelation { + + /** Inbound + echoed HTTP header name. */ + final val HeaderName = "X-Correlation-Id" + + /** Lower-cased header name for akka-http `HttpHeader.is` matching (idempotent echo check). */ + private final val HeaderNameLower = HeaderName.toLowerCase + + /** SLF4J MDC key — must match `correlation_id` in the + * services' `logback.xml`. + */ + final val MdcKey = "correlation_id" + + /** Extract a non-blank id, or generate a fresh one. The single source of the default, shared by + * the directive and [[correlationInput]] so a generated id is identical wherever it is read. + */ + private def orGenerate(value: Option[String]): String = + value.map(_.trim).filter(_.nonEmpty).getOrElse(UUID.randomUUID().toString) + + /** akka-http directive — extract-or-generate the id, expose it on MDC for synchronous-route + * stdout logs, re-stamp the canonical header onto the request (so downstream tapir + * [[correlationInput]] sees it), and echo it on the response. + * + * Re-entrant: it does not clobber a `correlation_id` already on MDC (an outer application set + * it), and it echoes the header only if the response does not already carry one — so it is safe + * to apply at BOTH `ApiRoutes.mainRoutes` and `Endpoint.endpointsToRoute` (a tapir endpoint set + * is thus self-sufficient even when mounted outside `mainRoutes`, while nesting under + * `mainRoutes` produces a single header and one MDC scope). + */ + def withCorrelation(inner: Route): Route = + optionalHeaderValueByName(HeaderName) { hdr => + val cid = orGenerate(hdr) + // own the MDC scope only if no outer application already set it (avoid clobbering/early-clear). + val ownsMdc = MDC.get(MdcKey) == null + if (ownsMdc) MDC.put(MdcKey, cid) + val stamp: HttpRequest => HttpRequest = + req => req.removeHeader(HeaderName).addHeader(RawHeader(HeaderName, cid)) + mapRequest(stamp) { + mapResponse { resp => + // best-effort cleanup for the synchronous path; the async hazard is why the durable + // carrier is the explicit data input below, not this ThreadLocal. + if (ownsMdc) MDC.remove(MdcKey) + if (resp.headers.exists(_.is(HeaderNameLower))) resp + else resp.addHeader(RawHeader(HeaderName, cid)) + }(inner) + } + } + + /** Tapir input — yields the correlation id as a **value** so `serverLogic` threads it onto the + * command on the request thread, never via an MDC read across the `Future` (C14). It reads the + * (directive-stamped) `X-Correlation-Id` and, if absent, generates a default via [[orGenerate]] + * — so `serverLogic` ALWAYS receives a non-empty id, even when no client header arrived and even + * if the endpoint were mounted without [[withCorrelation]]. Prepend to an endpoint's inputs: + * {{{ + * endpoint.in(HttpCorrelation.correlationInput).in(...).serverLogic { case (cid, in) => + * val cmd = SomeCommand(...) + * cmd.withCorrelationId(cid) + * run(cmd) + * } + * }}} + */ + val correlationInput: EndpointInput[String] = + extractFromRequest((req: ServerRequest) => orGenerate(req.header(HeaderName))) +} diff --git a/server/src/test/scala/app/softnetwork/api/server/HttpCorrelationSpec.scala b/server/src/test/scala/app/softnetwork/api/server/HttpCorrelationSpec.scala new file mode 100644 index 0000000..6421bcc --- /dev/null +++ b/server/src/test/scala/app/softnetwork/api/server/HttpCorrelationSpec.scala @@ -0,0 +1,127 @@ +package app.softnetwork.api.server + +import akka.http.scaladsl.model.headers.RawHeader +import akka.http.scaladsl.server.{Directives, Route} +import akka.http.scaladsl.testkit.ScalatestRouteTest +import org.scalatest.matchers.should.Matchers +import org.scalatest.wordspec.AnyWordSpec +import org.slf4j.MDC +import sttp.tapir._ +import sttp.tapir.server.ServerEndpoint + +import scala.concurrent.{ExecutionContext, Future} + +/** Story 13.7 Phase B (gate #1) — proves the HttpCorrelation ingress: extract-or-generate the + * `X-Correlation-Id`, echo it on the response, re-inject the canonical id onto the request so a + * downstream consumer (akka-http directive AND tapir `correlationInput`) reads the SAME value, and + * expose it on MDC for the synchronous request thread. + */ +class HttpCorrelationSpec + extends AnyWordSpec + with Matchers + with ScalatestRouteTest + with Directives { + + import HttpCorrelation._ + + private val UuidRe = + "(?i)^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$".r + + // A directive-wrapped route whose leaves report what they observe PER REQUEST (extractRequest / + // optionalHeaderValueByName defer evaluation to request time, so the directive's mapRequest + + // MDC.put have already run). + private val route: Route = + withCorrelation { + concat( + path("ping")(get(complete("pong"))), + // what a downstream route reads off the (re-injected) request header + path("seen")(get(optionalHeaderValueByName(HeaderName) { h => + val seen: String = h.getOrElse("none") + complete(seen) + })), + // what MDC holds on the synchronous request thread + path("mdc")(get(extractRequest { _ => + val mdc: String = Option(MDC.get(MdcKey)).getOrElse("none") + complete(mdc) + })) + ) + } + + // End-to-end tapir proof (C14): correlationInput delivers the id to serverLogic as DATA. Mounted + // via Endpoint.endpointsToRoute (which wraps with withCorrelation) and with NO outer directive — so + // it proves the endpoint set is self-sufficient (default generation + echo) on its own. + private val tapirRoute: Route = { + implicit val ec: ExecutionContext = system.dispatcher + val echo: ServerEndpoint[Any, Future] = + endpoint.get + .in("echo") + .in(correlationInput) + .out(stringBody) + .serverLogic[Future](cid => Future.successful(Right(cid))) + Endpoint.endpointsToRoute(List(echo)) + } + + "HttpCorrelation.withCorrelation" should { + "generate and echo an X-Correlation-Id when the client sends none" in { + Get("/ping") ~> route ~> check { + val cid = header(HeaderName).map(_.value) + cid shouldBe defined + cid.get should fullyMatch regex UuidRe + responseAs[String] shouldBe "pong" + } + } + + "echo a client-supplied X-Correlation-Id unchanged" in { + Get("/ping") ~> addHeader(RawHeader(HeaderName, "abc-123")) ~> route ~> check { + header(HeaderName).map(_.value) shouldBe Some("abc-123") + } + } + + "treat a blank client header as absent and generate a fresh id" in { + Get("/ping") ~> addHeader(RawHeader(HeaderName, " ")) ~> route ~> check { + val cid = header(HeaderName).map(_.value) + cid shouldBe defined + cid.get should fullyMatch regex UuidRe + } + } + + "re-inject the id so a downstream route reads the SAME value echoed on the response" in { + Get("/seen") ~> route ~> check { + val downstream = responseAs[String] + downstream should not be "none" + header(HeaderName).map(_.value) shouldBe Some(downstream) + } + } + + "expose the id on MDC for the synchronous request thread" in { + Get("/mdc") ~> addHeader(RawHeader(HeaderName, "mdc-1")) ~> route ~> check { + responseAs[String] shouldBe "mdc-1" + } + } + + "be re-entrant: nesting produces a single echoed header" in { + val nested = withCorrelation(withCorrelation(path("ping")(get(complete("pong"))))) + Get("/ping") ~> nested ~> check { + headers.count(_.is(HeaderName.toLowerCase)) shouldBe 1 + responseAs[String] shouldBe "pong" + } + } + } + + "HttpCorrelation.correlationInput (self-sufficient via Endpoint.endpointsToRoute)" should { + "generate a default id, deliver it to serverLogic AND echo it — with no outer directive" in { + Get("/echo") ~> tapirRoute ~> check { + val delivered = responseAs[String] + delivered should fullyMatch regex UuidRe + header(HeaderName).map(_.value) shouldBe Some(delivered) + } + } + + "deliver a client-supplied id to serverLogic and echo it unchanged" in { + Get("/echo") ~> addHeader(RawHeader(HeaderName, "tapir-9")) ~> tapirRoute ~> check { + responseAs[String] shouldBe "tapir-9" + header(HeaderName).map(_.value) shouldBe Some("tapir-9") + } + } + } +}