From 61497009232d98c6778770f3223d4141de0f1be1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C3=A9phane=20Manciot?= Date: Sun, 14 Jun 2026 06:56:38 +0200 Subject: [PATCH 1/5] feat(audit): HTTP correlation-id ingress directive + tapir input (Story 13.7 Phase B gate #1) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit HttpCorrelation.withCorrelation — an akka-http directive (mirrors HttpMetrics.withMetrics, applied at the same ApiRoutes.mainRoutes attach point) that extracts-or-generates X-Correlation-Id, MDC-stamps the synchronous request thread (operational stdout), re-injects the canonical id onto the request so downstream tapir reads the same value, and echoes it on the response. HttpCorrelation.correlationInput — a reusable tapir EndpointInput[Option[String]] (extractFromRequest) so serverLogic receives the id as DATA and threads it onto the command (AuditableCommand.withCorrelationId) on the request thread, never an MDC read across the Future (C14). Builds on the Auditable/AuditableCommand/AuditableEvent carriers plus withCid already promoted in b4d0ee1. Bumps tapir 1.7.0->1.8.5, tapirHttpSession 0.2.0->0.3.0, logback 1.2.3->1.4.14, slf4j. HttpCorrelationSpec (7 tests, incl. end-to-end tapir serverLogic delivery) and HttpMetricsSpec green; +server/compile (2.12+2.13) and scalafmtAll clean. Co-Authored-By: Claude Opus 4.8 (1M context) --- project/Versions.scala | 6 +- .../softnetwork/api/server/ApiRoutes.scala | 44 ++++--- .../api/server/HttpCorrelation.scala | 82 ++++++++++++ .../api/server/HttpCorrelationSpec.scala | 119 ++++++++++++++++++ 4 files changed, 229 insertions(+), 22 deletions(-) create mode 100644 server/src/main/scala/app/softnetwork/api/server/HttpCorrelation.scala create mode 100644 server/src/test/scala/app/softnetwork/api/server/HttpCorrelationSpec.scala diff --git a/project/Versions.scala b/project/Versions.scala index 0cfa4e6..ef01e76 100644 --- a/project/Versions.scala +++ b/project/Versions.scala @@ -12,9 +12,9 @@ object Versions { val akkaHttpSession = "0.7.1" // 0.7.0 -> 0.7.1 - val tapir = "1.7.0" + val tapir = "1.8.5" - val tapirHttpSession = "0.2.0" + val tapirHttpSession = "0.3.0" val akkaPersistenceJdbc = "5.0.4" // TODO 5.0.4 -> 5.2.1 @@ -36,7 +36,7 @@ object Versions { val scalaLogging = "3.9.2" - val logback = "1.2.3" // TODO 1.2.3 -> 1.5.6 + val logback = "1.4.14" // TODO 1.4.14 -> 1.5.6 val slf4j = "1.7.36" diff --git a/server/src/main/scala/app/softnetwork/api/server/ApiRoutes.scala b/server/src/main/scala/app/softnetwork/api/server/ApiRoutes.scala index 5f9acc5..e78f863 100644 --- a/server/src/main/scala/app/softnetwork/api/server/ApiRoutes.scala +++ b/server/src/main/scala/app/softnetwork/api/server/ApiRoutes.scala @@ -58,26 +58,32 @@ trait ApiRoutes extends Directives with GrpcServices with DefaultComplete { // PrometheusRegistry.defaultRegistry. Wraps the WHOLE pipeline (outside handleRejections / // handleExceptions) so the final response — rejection/exception ones included — is observed. HttpMetrics.withMetrics { - handleRejections(rejectionHandler) { - handleExceptions(exceptionHandler) { - logRequestResult("RestAll") { - pathPrefix(config.ServerSettings.RootPath) { - Try( - respondWithHeaders(RawHeader("Api-Version", applicationVersion)) { - routes - } - ) match { - case Success(s) => s - case Failure(f) => - log.error(f.getMessage, f.getCause) - complete( - HttpResponse( - StatusCodes.InternalServerError, - entity = f.getMessage + // Story 13.7 Phase B (gate #1) — extract-or-generate X-Correlation-Id, MDC-stamp the + // synchronous request thread, re-inject the canonical id onto the request (so downstream tapir + // `HttpCorrelation.correlationInput` reads it), and echo it on the response. Inside withMetrics + // so the latency timing still wraps the whole pipeline. + HttpCorrelation.withCorrelation { + handleRejections(rejectionHandler) { + handleExceptions(exceptionHandler) { + logRequestResult("RestAll") { + pathPrefix(config.ServerSettings.RootPath) { + Try( + respondWithHeaders(RawHeader("Api-Version", applicationVersion)) { + routes + } + ) match { + case Success(s) => s + case Failure(f) => + log.error(f.getMessage, f.getCause) + complete( + HttpResponse( + StatusCodes.InternalServerError, + entity = f.getMessage + ) ) - ) - } - } ~ grpcRoutes(system) + } + } ~ grpcRoutes(system) + } } } } diff --git a/server/src/main/scala/app/softnetwork/api/server/HttpCorrelation.scala b/server/src/main/scala/app/softnetwork/api/server/HttpCorrelation.scala new file mode 100644 index 0000000..3e6dee2 --- /dev/null +++ b/server/src/main/scala/app/softnetwork/api/server/HttpCorrelation.scala @@ -0,0 +1,82 @@ +package app.softnetwork.api.server + +import akka.http.scaladsl.model.HttpRequest +import akka.http.scaladsl.model.headers.RawHeader +import akka.http.scaladsl.server.Directives._ +import akka.http.scaladsl.server.Route +import org.slf4j.MDC +import sttp.tapir.{extractFromRequest, EndpointInput} +import sttp.tapir.model.ServerRequest + +import java.util.UUID + +/** Story 13.7 Phase B (gate #1) — cross-service correlation id ingress. + * + * One id per inbound HTTP request, extracted from `X-Correlation-Id` or generated when the client + * does not supply one, echoed on the response, and propagated two ways: + * - as MDC (`correlation_id`) for the SYNCHRONOUS akka-http request thread's operational stdout + * logs. MDC is a `ThreadLocal` and does NOT survive an async (`Future`) boundary, so it is NOT + * the carrier across the event-sourced path (C2) nor into a tapir `serverLogic` `Future` + * (C14); + * - as DATA into tapir handlers via [[correlationInput]]: `serverLogic` receives the id as a + * value and threads it onto the command (`cmd.withCorrelationId(cid)`, an + * [[app.softnetwork.persistence.message.AuditableCommand]]) on the request thread. The command + * carries it across the sharding boundary (Kryo); the handler stamps it onto the journaled + * event's proto field ([[app.softnetwork.persistence.message.AuditableEvent]]) — the durable + * hop that survives replay. + * + * [[withCorrelation]] re-stamps the canonical id onto the inbound request before the inner route + * runs, so any downstream tapir endpoint reading [[correlationInput]] observes the SAME id the + * directive generated/echoed — the response header and the value threaded into the command never + * diverge, even when the client sent no header. + * + * Mirrors [[HttpMetrics.withMetrics]]; apply at the same `ApiRoutes.mainRoutes` attach point: + * {{{HttpMetrics.withMetrics { HttpCorrelation.withCorrelation { ... } }}}} + */ +object HttpCorrelation { + + /** Inbound + echoed HTTP header name. */ + final val HeaderName = "X-Correlation-Id" + + /** SLF4J MDC key — must match `correlation_id` in the + * services' `logback.xml`. + */ + final val MdcKey = "correlation_id" + + private def orGenerate(value: Option[String]): String = + value.map(_.trim).filter(_.nonEmpty).getOrElse(UUID.randomUUID().toString) + + /** akka-http directive — extract-or-generate the id, expose it on MDC for synchronous-route + * stdout logs, re-stamp the canonical header onto the request (so downstream tapir + * [[correlationInput]] sees it), and echo it on the response. + */ + def withCorrelation(inner: Route): Route = + optionalHeaderValueByName(HeaderName) { hdr => + val cid = orGenerate(hdr) + MDC.put(MdcKey, cid) + val stamp: HttpRequest => HttpRequest = + req => req.removeHeader(HeaderName).addHeader(RawHeader(HeaderName, cid)) + mapRequest(stamp) { + mapResponse { resp => + // best-effort cleanup for the synchronous path; the async hazard is exactly why the + // durable carrier is the explicit data input below, not this ThreadLocal. + MDC.remove(MdcKey) + resp.addHeader(RawHeader(HeaderName, cid)) + }(inner) + } + } + + /** Tapir input — reads the (directive-stamped) `X-Correlation-Id` so `serverLogic` receives the + * id as a value and threads it onto the command on the request thread, never via an MDC read + * across the `Future` (C14). Prepend to an endpoint's inputs: + * {{{ + * endpoint.in(HttpCorrelation.correlationInput).in(...).serverLogic { case (cidOpt, in) => + * val cmd = SomeCommand(...) + * cidOpt.foreach(cmd.withCorrelationId) + * run(cmd) + * } + * }}} + */ + val correlationInput: EndpointInput[Option[String]] = + extractFromRequest((req: ServerRequest) => req.header(HeaderName)) +} diff --git a/server/src/test/scala/app/softnetwork/api/server/HttpCorrelationSpec.scala b/server/src/test/scala/app/softnetwork/api/server/HttpCorrelationSpec.scala new file mode 100644 index 0000000..b5d9520 --- /dev/null +++ b/server/src/test/scala/app/softnetwork/api/server/HttpCorrelationSpec.scala @@ -0,0 +1,119 @@ +package app.softnetwork.api.server + +import akka.http.scaladsl.model.headers.RawHeader +import akka.http.scaladsl.server.{Directives, Route} +import akka.http.scaladsl.testkit.ScalatestRouteTest +import org.scalatest.matchers.should.Matchers +import org.scalatest.wordspec.AnyWordSpec +import org.slf4j.MDC +import sttp.tapir._ +import sttp.tapir.server.ServerEndpoint +import sttp.tapir.server.akkahttp.AkkaHttpServerInterpreter + +import scala.concurrent.{ExecutionContext, Future} + +/** Story 13.7 Phase B (gate #1) — proves the HttpCorrelation ingress: extract-or-generate the + * `X-Correlation-Id`, echo it on the response, re-inject the canonical id onto the request so a + * downstream consumer (akka-http directive AND tapir `correlationInput`) reads the SAME value, and + * expose it on MDC for the synchronous request thread. + */ +class HttpCorrelationSpec + extends AnyWordSpec + with Matchers + with ScalatestRouteTest + with Directives { + + import HttpCorrelation._ + + private val UuidRe = + "(?i)^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$".r + + // A directive-wrapped route whose leaves report what they observe PER REQUEST (extractRequest / + // optionalHeaderValueByName defer evaluation to request time, so the directive's mapRequest + + // MDC.put have already run). + private val route: Route = + withCorrelation { + concat( + path("ping")(get(complete("pong"))), + // what a downstream route reads off the (re-injected) request header + path("seen")(get(optionalHeaderValueByName(HeaderName) { h => + val seen: String = h.getOrElse("none") + complete(seen) + })), + // what MDC holds on the synchronous request thread + path("mdc")(get(extractRequest { _ => + val mdc: String = Option(MDC.get(MdcKey)).getOrElse("none") + complete(mdc) + })) + ) + } + + // End-to-end tapir proof (C14): correlationInput delivers the id to serverLogic as DATA. + private val tapirRoute: Route = { + implicit val ec: ExecutionContext = system.dispatcher + val echo: ServerEndpoint[Any, Future] = + endpoint.get + .in("echo") + .in(correlationInput) + .out(stringBody) + .serverLogic[Future](cidOpt => Future.successful(Right(cidOpt.getOrElse("none")))) + withCorrelation(AkkaHttpServerInterpreter().toRoute(List(echo))) + } + + "HttpCorrelation.withCorrelation" should { + "generate and echo an X-Correlation-Id when the client sends none" in { + Get("/ping") ~> route ~> check { + val cid = header(HeaderName).map(_.value) + cid shouldBe defined + cid.get should fullyMatch regex UuidRe + responseAs[String] shouldBe "pong" + } + } + + "echo a client-supplied X-Correlation-Id unchanged" in { + Get("/ping") ~> addHeader(RawHeader(HeaderName, "abc-123")) ~> route ~> check { + header(HeaderName).map(_.value) shouldBe Some("abc-123") + } + } + + "treat a blank client header as absent and generate a fresh id" in { + Get("/ping") ~> addHeader(RawHeader(HeaderName, " ")) ~> route ~> check { + val cid = header(HeaderName).map(_.value) + cid shouldBe defined + cid.get should fullyMatch regex UuidRe + } + } + + "re-inject the id so a downstream route reads the SAME value echoed on the response" in { + Get("/seen") ~> route ~> check { + val downstream = responseAs[String] + downstream should not be "none" + header(HeaderName).map(_.value) shouldBe Some(downstream) + } + } + + "expose the id on MDC for the synchronous request thread" in { + Get("/mdc") ~> addHeader(RawHeader(HeaderName, "mdc-1")) ~> route ~> check { + responseAs[String] shouldBe "mdc-1" + } + } + } + + "HttpCorrelation.correlationInput" should { + "deliver the (directive-injected) id to a tapir serverLogic as data" in { + Get("/echo") ~> tapirRoute ~> check { + val delivered = responseAs[String] + delivered should not be "none" + delivered should fullyMatch regex UuidRe + header(HeaderName).map(_.value) shouldBe Some(delivered) + } + } + + "deliver a client-supplied id to a tapir serverLogic" in { + Get("/echo") ~> addHeader(RawHeader(HeaderName, "tapir-9")) ~> tapirRoute ~> check { + responseAs[String] shouldBe "tapir-9" + header(HeaderName).map(_.value) shouldBe Some("tapir-9") + } + } + } +} From 6f4982af65afe72d8207a22c66aff7108dac6181 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C3=A9phane=20Manciot?= Date: Sun, 14 Jun 2026 07:33:42 +0200 Subject: [PATCH 2/5] feat(audit): promote generic AuditLog to core (Story 13.7 Phase B gate #1) Generic structured audit logger in core (co-located with the Auditable/AuditableEvent carriers) so every pod (licensing/notification/payment/scheduler) reuses one helper instead of an in-repo copy. One FIXED shared logger name app.softnetwork.audit -> a single additivity=false route in the shared logback.xml template; each pod still writes its own pod-local audit.log; the service field (per-eventType resolver, or a fixed value) distinguishes domains at query time. correlationId stays explicit data (never MDC across the async path, C2). Adds logstash-logback-encoder 8.1 to core (StructuredArguments.kv). AuditLogSpec (3 tests via logback ListAppender) green; +core/compile (2.12+2.13) + scalafmtAll clean. Also folds the ApiRoutes unused-import cleanup from gate #1. Co-Authored-By: Claude Opus 4.8 (1M context) --- build.sbt | 5 +- .../persistence/audit/AuditLog.scala | 89 +++++++++++++++++++ .../persistence/audit/AuditLogSpec.scala | 88 ++++++++++++++++++ project/Versions.scala | 5 ++ .../softnetwork/api/server/ApiRoutes.scala | 11 +-- 5 files changed, 187 insertions(+), 11 deletions(-) create mode 100644 core/src/main/scala/app/softnetwork/persistence/audit/AuditLog.scala create mode 100644 core/src/test/scala/app/softnetwork/persistence/audit/AuditLogSpec.scala diff --git a/build.sbt b/build.sbt index 7611c3a..fde3427 100644 --- a/build.sbt +++ b/build.sbt @@ -97,7 +97,10 @@ lazy val core = project.in(file("core")) .settings( Defaults.itSettings, app.softnetwork.Info.infoSettings, - moduleSettings + moduleSettings, + // Story 13.7 — AuditLog uses net.logstash.logback.argument.StructuredArguments.kv; co-located + // with the Auditable/AuditableEvent carriers in this module. + libraryDependencies += "net.logstash.logback" % "logstash-logback-encoder" % Versions.logstashEncoder ) .dependsOn( common % "compile->compile;test->test;it->it" diff --git a/core/src/main/scala/app/softnetwork/persistence/audit/AuditLog.scala b/core/src/main/scala/app/softnetwork/persistence/audit/AuditLog.scala new file mode 100644 index 0000000..a6ec247 --- /dev/null +++ b/core/src/main/scala/app/softnetwork/persistence/audit/AuditLog.scala @@ -0,0 +1,89 @@ +package app.softnetwork.persistence.audit + +import net.logstash.logback.argument.StructuredArguments.kv +import org.slf4j.LoggerFactory + +/** Story 13.7 — generic structured audit logger (cross-service audit trail). + * + * Every service emits to ONE fixed audit logger name ([[AuditLog.LoggerName]]) so the **shared** + * `logback.xml` template needs a single `additivity=false` route to the synchronous, non-dropping + * `audit.log` appender. Each pod still writes its own pod-local `audit.log` (separate files + * because separate pods); its promtail sidecar tails it → Loki `{stream="audit"}`. The logical + * domain is carried by the **`service` JSON field** (not the logger name, not the file), so + * licensing / notification / payment / scheduler lines are distinguished at query time while + * sharing one route. + * + * `correlationId` is passed **explicitly** (threaded from the command/event as plain data) and + * emitted as a structured `correlation_id` field — it is deliberately NOT read from MDC, because + * the projection streams emit audit lines from `Future` continuations where a `ThreadLocal` MDC + * value would not survive (Story 13.7 C2). MDC is reserved for the synchronous HTTP request thread + * (see `app.softnetwork.api.server.HttpCorrelation`). + * + * PII / secret masking (emails, `sk_`/`pk_`/`whsec_`/`Bearer` tokens) is enforced downstream at + * the logback `MaskingJsonGeneratorDecorator`, so callers never need to pre-redact; code must + * still never pass a raw secret to a logger (defence in depth). + * + * @param serviceOf + * maps an `eventType` to its logical business domain (the `service` field). A single-domain pod + * passes a constant (see [[AuditLog.apply(service:String)*]]); a pod that emits lines for + * several domains (e.g. the licensing pod) passes a resolver. A caller may still override per + * call by passing an explicit `"service"` entry in `fields`. + * @param defaultActor + * the `actor` for machine-initiated flows when a caller does not override it. + */ +class AuditLog( + serviceOf: String => String, + defaultActor: String = AuditLog.DefaultActor +) { + + private val log = LoggerFactory.getLogger(AuditLog.LoggerName) + + /** Emit an audit event. + * + * Always emits `event_type`, `correlation_id`, `service` (from [[serviceOf]]) and `actor` + * (default [[defaultActor]]); `service` / `actor` can be overridden by passing a `"service"` / + * `"actor"` entry in `fields`. + * + * @param correlationId + * the cross-service correlation id (threaded as data; `"-"` when unknown) + * @param eventType + * the catalog event type, e.g. `license_issued`, `notification_sent` + * @param fields + * additional structured fields (`organization_id`, `channel`, `template`, …); values are + * emitted verbatim and masked downstream by the encoder + */ + def event(correlationId: String, eventType: String, fields: (String, Any)*): Unit = { + val provided = fields.iterator.map(_._1).toSet + val base = List.newBuilder[(String, Any)] + base += "event_type" -> eventType + base += "correlation_id" -> correlationId + if (!provided.contains("service")) base += "service" -> serviceOf(eventType) + if (!provided.contains("actor")) base += "actor" -> defaultActor + val args: Seq[AnyRef] = (base.result() ++ fields).map { case (k, v) => kv(k, v) } + // The "audit" message has no `{}` placeholders on purpose — logstash-logback-encoder still + // appends every StructuredArgument as a top-level JSON field. Binds slf4j info(String, Object...). + log.info("audit", args: _*) + } +} + +object AuditLog { + + /** The single fixed audit logger name — one `additivity=false` route in the shared `logback.xml` + * template feeds every service's pod-local `audit.log`; the `service` field distinguishes + * domains. + */ + final val LoggerName = "app.softnetwork.audit" + + /** Default `actor` for machine-initiated flows. */ + final val DefaultActor = "system" + + /** Audit logger for a single-domain pod (e.g. notification) — every line carries `service`. */ + def apply(service: String): AuditLog = + new AuditLog(_ => service) + + /** Audit logger for a pod that emits lines for several domains (e.g. the licensing pod) — + * `service` is resolved per `eventType`. + */ + def apply(serviceOf: String => String): AuditLog = + new AuditLog(serviceOf) +} diff --git a/core/src/test/scala/app/softnetwork/persistence/audit/AuditLogSpec.scala b/core/src/test/scala/app/softnetwork/persistence/audit/AuditLogSpec.scala new file mode 100644 index 0000000..a1925b4 --- /dev/null +++ b/core/src/test/scala/app/softnetwork/persistence/audit/AuditLogSpec.scala @@ -0,0 +1,88 @@ +package app.softnetwork.persistence.audit + +import ch.qos.logback.classic.{Level, Logger} +import ch.qos.logback.classic.spi.ILoggingEvent +import ch.qos.logback.core.read.ListAppender +import org.scalatest.BeforeAndAfterEach +import org.scalatest.matchers.should.Matchers +import org.scalatest.wordspec.AnyWordSpec +import org.slf4j.LoggerFactory + +/** Story 13.7 — proves the generic AuditLog assembles the always-present structured fields + * (`event_type`, `correlation_id`, `service`, `actor`) plus caller fields, resolves `service` from + * the eventType, and lets a caller override `service` / `actor`. Field rendering is captured via a + * logback `ListAppender` on the shared audit logger (no LogstashEncoder needed at unit level). + */ +class AuditLogSpec extends AnyWordSpec with Matchers with BeforeAndAfterEach { + + private val logger = LoggerFactory.getLogger(AuditLog.LoggerName).asInstanceOf[Logger] + private val appender = new ListAppender[ILoggingEvent]() + + override def beforeEach(): Unit = { + appender.list.clear() + appender.start() + logger.addAppender(appender) + logger.setLevel(Level.INFO) + } + + override def afterEach(): Unit = { + logger.detachAppender(appender) + appender.stop() + } + + private def lastEvent: ILoggingEvent = appender.list.get(appender.list.size() - 1) + + // StructuredArguments.kv("k", v).toString renders as "k=v"; rebuild the field map from the args. + private def lastArgs: Map[String, String] = + lastEvent.getArgumentArray + .map(_.toString) + .map { s => + val i = s.indexOf('=') + s.substring(0, i) -> s.substring(i + 1) + } + .toMap + + "AuditLog (fixed service)" should { + "emit event_type, correlation_id, service, the default actor and custom fields" in { + AuditLog("notification").event( + "cid-1", + "notification_sent", + "channel" -> "email", + "template" -> "welcome.mustache" + ) + lastEvent.getMessage shouldBe "audit" + val m = lastArgs + m("event_type") shouldBe "notification_sent" + m("correlation_id") shouldBe "cid-1" + m("service") shouldBe "notification" + m("actor") shouldBe AuditLog.DefaultActor + m("channel") shouldBe "email" + m("template") shouldBe "welcome.mustache" + } + } + + "AuditLog (per-eventType resolver)" should { + "derive service from the eventType" in { + val audit = + AuditLog((et: String) => if (et.startsWith("schedule")) "scheduler" else "licensing") + audit.event("cid-2", "schedule_fired", "schedule_key" -> "renewal") + lastArgs("service") shouldBe "scheduler" + audit.event("cid-3", "license_issued") + lastArgs("service") shouldBe "licensing" + } + } + + "AuditLog override" should { + "let a caller override service and actor via fields" in { + AuditLog("licensing").event( + "cid-4", + "schedule_fired", + "service" -> "scheduler", + "actor" -> "scheduler" + ) + val m = lastArgs + m("service") shouldBe "scheduler" + m("actor") shouldBe "scheduler" + } + } +} diff --git a/project/Versions.scala b/project/Versions.scala index ef01e76..228fe4c 100644 --- a/project/Versions.scala +++ b/project/Versions.scala @@ -38,6 +38,11 @@ object Versions { val logback = "1.4.14" // TODO 1.4.14 -> 1.5.6 + // Story 13.7 — structured audit trail. Provides StructuredArguments.kv (used by AuditLog) and, at + // runtime in the service images, LogstashEncoder + MaskingJsonGeneratorDecorator. 8.1 is the newest + // line on logback 1.3/1.4 + Jackson 2.x (Java 11+); 9.x requires logback 1.5 + Jackson 3. + val logstashEncoder = "8.1" + val slf4j = "1.7.36" val log4s = "1.8.2" diff --git a/server/src/main/scala/app/softnetwork/api/server/ApiRoutes.scala b/server/src/main/scala/app/softnetwork/api/server/ApiRoutes.scala index e78f863..d8762a0 100644 --- a/server/src/main/scala/app/softnetwork/api/server/ApiRoutes.scala +++ b/server/src/main/scala/app/softnetwork/api/server/ApiRoutes.scala @@ -4,16 +4,7 @@ import java.util.concurrent.TimeoutException import akka.actor.typed.ActorSystem import akka.http.scaladsl.model.headers.RawHeader import akka.http.scaladsl.model.{HttpResponse, StatusCodes} -import akka.http.scaladsl.server.{ - AuthorizationFailedRejection, - Directives, - ExceptionHandler, - MethodRejection, - MissingCookieRejection, - RejectionHandler, - Route, - ValidationRejection -} +import akka.http.scaladsl.server.{Directives, ExceptionHandler, RejectionHandler, Route} import akka.http.scaladsl.settings.RoutingSettings import app.softnetwork.api.server.config.ServerSettings import org.json4s.Formats From 3addefb74741e68d6dcf8b96a100dd4f4c0e78da Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C3=A9phane=20Manciot?= Date: Sun, 14 Jun 2026 07:38:34 +0200 Subject: [PATCH 3/5] docs(audit): update documentation formatting for HttpCorrelation --- .../scala/app/softnetwork/api/server/HttpCorrelation.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/server/src/main/scala/app/softnetwork/api/server/HttpCorrelation.scala b/server/src/main/scala/app/softnetwork/api/server/HttpCorrelation.scala index 3e6dee2..949bebe 100644 --- a/server/src/main/scala/app/softnetwork/api/server/HttpCorrelation.scala +++ b/server/src/main/scala/app/softnetwork/api/server/HttpCorrelation.scala @@ -31,7 +31,9 @@ import java.util.UUID * diverge, even when the client sent no header. * * Mirrors [[HttpMetrics.withMetrics]]; apply at the same `ApiRoutes.mainRoutes` attach point: - * {{{HttpMetrics.withMetrics { HttpCorrelation.withCorrelation { ... } }}}} + * {{{ + * HttpMetrics.withMetrics { HttpCorrelation.withCorrelation { ... }} + * }}} */ object HttpCorrelation { From 3f923906e452a58c8bd2aa052f11004a970f7a5e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C3=A9phane=20Manciot?= Date: Sun, 14 Jun 2026 07:38:42 +0200 Subject: [PATCH 4/5] chore: update .gitignore to include additional IDE and build files --- .gitignore | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore index d5586b3..45f09d6 100644 --- a/.gitignore +++ b/.gitignore @@ -4,4 +4,10 @@ target *.pyc .vagrant .metals -*.log \ No newline at end of file +*.log +.bloop/ +.jvmopts +.sbtopts +*.sc +.bsp/ +.vscode/ From 839ef3ccf874bec26ce1a90986941ba7f5257379 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C3=A9phane=20Manciot?= Date: Sun, 14 Jun 2026 08:03:09 +0200 Subject: [PATCH 5/5] refactor(audit): self-sufficient endpoint correlation (default id + echo at the endpoint layer) correlationInput now yields String, generating a default cid when the header is absent, so serverLogic always receives a non-empty id (no None handling downstream). Endpoint.endpointsToRoute wraps the interpreted tapir route with withCorrelation, so every endpoint set generates-or-extracts the id, feeds correlationInput and echoes it on the response even when mounted outside mainRoutes. withCorrelation is now re-entrant (does not clobber an outer MDC scope; echoes only if the response has no header) so nesting under mainRoutes stays a single header + one MDC scope. HttpCorrelationSpec: +self-sufficiency test (no outer directive: default generated, delivered to serverLogic, echoed) and +re-entrancy test (single echoed header). 13 server tests green; +server/compile (2.12+2.13) + scalafmtAll clean. Co-Authored-By: Claude Opus 4.8 (1M context) --- .../app/softnetwork/api/server/Endpoint.scala | 7 +++- .../api/server/HttpCorrelation.scala | 41 +++++++++++++------ .../api/server/HttpCorrelationSpec.scala | 24 +++++++---- 3 files changed, 51 insertions(+), 21 deletions(-) diff --git a/server/src/main/scala/app/softnetwork/api/server/Endpoint.scala b/server/src/main/scala/app/softnetwork/api/server/Endpoint.scala index 959ef07..7b97722 100644 --- a/server/src/main/scala/app/softnetwork/api/server/Endpoint.scala +++ b/server/src/main/scala/app/softnetwork/api/server/Endpoint.scala @@ -22,6 +22,11 @@ object Endpoint { endpoints: List[ServerEndpoint[AkkaStreams with WebSockets, Future]] )(implicit ec: ExecutionContext - ): Route = AkkaHttpServerInterpreter().toRoute(endpoints) + ): Route = + // Story 13.7 — wrap the interpreted tapir route so every endpoint set generates-or-extracts the + // correlation id, echoes it on the response and feeds HttpCorrelation.correlationInput, even when + // mounted outside ApiRoutes.mainRoutes. withCorrelation is re-entrant, so nesting under mainRoutes + // stays a single header + one MDC scope. + HttpCorrelation.withCorrelation(AkkaHttpServerInterpreter().toRoute(endpoints)) } diff --git a/server/src/main/scala/app/softnetwork/api/server/HttpCorrelation.scala b/server/src/main/scala/app/softnetwork/api/server/HttpCorrelation.scala index 949bebe..86cf387 100644 --- a/server/src/main/scala/app/softnetwork/api/server/HttpCorrelation.scala +++ b/server/src/main/scala/app/softnetwork/api/server/HttpCorrelation.scala @@ -40,45 +40,62 @@ object HttpCorrelation { /** Inbound + echoed HTTP header name. */ final val HeaderName = "X-Correlation-Id" + /** Lower-cased header name for akka-http `HttpHeader.is` matching (idempotent echo check). */ + private final val HeaderNameLower = HeaderName.toLowerCase + /** SLF4J MDC key — must match `correlation_id` in the * services' `logback.xml`. */ final val MdcKey = "correlation_id" + /** Extract a non-blank id, or generate a fresh one. The single source of the default, shared by + * the directive and [[correlationInput]] so a generated id is identical wherever it is read. + */ private def orGenerate(value: Option[String]): String = value.map(_.trim).filter(_.nonEmpty).getOrElse(UUID.randomUUID().toString) /** akka-http directive — extract-or-generate the id, expose it on MDC for synchronous-route * stdout logs, re-stamp the canonical header onto the request (so downstream tapir * [[correlationInput]] sees it), and echo it on the response. + * + * Re-entrant: it does not clobber a `correlation_id` already on MDC (an outer application set + * it), and it echoes the header only if the response does not already carry one — so it is safe + * to apply at BOTH `ApiRoutes.mainRoutes` and `Endpoint.endpointsToRoute` (a tapir endpoint set + * is thus self-sufficient even when mounted outside `mainRoutes`, while nesting under + * `mainRoutes` produces a single header and one MDC scope). */ def withCorrelation(inner: Route): Route = optionalHeaderValueByName(HeaderName) { hdr => val cid = orGenerate(hdr) - MDC.put(MdcKey, cid) + // own the MDC scope only if no outer application already set it (avoid clobbering/early-clear). + val ownsMdc = MDC.get(MdcKey) == null + if (ownsMdc) MDC.put(MdcKey, cid) val stamp: HttpRequest => HttpRequest = req => req.removeHeader(HeaderName).addHeader(RawHeader(HeaderName, cid)) mapRequest(stamp) { mapResponse { resp => - // best-effort cleanup for the synchronous path; the async hazard is exactly why the - // durable carrier is the explicit data input below, not this ThreadLocal. - MDC.remove(MdcKey) - resp.addHeader(RawHeader(HeaderName, cid)) + // best-effort cleanup for the synchronous path; the async hazard is why the durable + // carrier is the explicit data input below, not this ThreadLocal. + if (ownsMdc) MDC.remove(MdcKey) + if (resp.headers.exists(_.is(HeaderNameLower))) resp + else resp.addHeader(RawHeader(HeaderName, cid)) }(inner) } } - /** Tapir input — reads the (directive-stamped) `X-Correlation-Id` so `serverLogic` receives the - * id as a value and threads it onto the command on the request thread, never via an MDC read - * across the `Future` (C14). Prepend to an endpoint's inputs: + /** Tapir input — yields the correlation id as a **value** so `serverLogic` threads it onto the + * command on the request thread, never via an MDC read across the `Future` (C14). It reads the + * (directive-stamped) `X-Correlation-Id` and, if absent, generates a default via [[orGenerate]] + * — so `serverLogic` ALWAYS receives a non-empty id, even when no client header arrived and even + * if the endpoint were mounted without [[withCorrelation]]. Prepend to an endpoint's inputs: * {{{ - * endpoint.in(HttpCorrelation.correlationInput).in(...).serverLogic { case (cidOpt, in) => + * endpoint.in(HttpCorrelation.correlationInput).in(...).serverLogic { case (cid, in) => * val cmd = SomeCommand(...) - * cidOpt.foreach(cmd.withCorrelationId) + * cmd.withCorrelationId(cid) * run(cmd) * } * }}} */ - val correlationInput: EndpointInput[Option[String]] = - extractFromRequest((req: ServerRequest) => req.header(HeaderName)) + val correlationInput: EndpointInput[String] = + extractFromRequest((req: ServerRequest) => orGenerate(req.header(HeaderName))) } diff --git a/server/src/test/scala/app/softnetwork/api/server/HttpCorrelationSpec.scala b/server/src/test/scala/app/softnetwork/api/server/HttpCorrelationSpec.scala index b5d9520..6421bcc 100644 --- a/server/src/test/scala/app/softnetwork/api/server/HttpCorrelationSpec.scala +++ b/server/src/test/scala/app/softnetwork/api/server/HttpCorrelationSpec.scala @@ -8,7 +8,6 @@ import org.scalatest.wordspec.AnyWordSpec import org.slf4j.MDC import sttp.tapir._ import sttp.tapir.server.ServerEndpoint -import sttp.tapir.server.akkahttp.AkkaHttpServerInterpreter import scala.concurrent.{ExecutionContext, Future} @@ -48,7 +47,9 @@ class HttpCorrelationSpec ) } - // End-to-end tapir proof (C14): correlationInput delivers the id to serverLogic as DATA. + // End-to-end tapir proof (C14): correlationInput delivers the id to serverLogic as DATA. Mounted + // via Endpoint.endpointsToRoute (which wraps with withCorrelation) and with NO outer directive — so + // it proves the endpoint set is self-sufficient (default generation + echo) on its own. private val tapirRoute: Route = { implicit val ec: ExecutionContext = system.dispatcher val echo: ServerEndpoint[Any, Future] = @@ -56,8 +57,8 @@ class HttpCorrelationSpec .in("echo") .in(correlationInput) .out(stringBody) - .serverLogic[Future](cidOpt => Future.successful(Right(cidOpt.getOrElse("none")))) - withCorrelation(AkkaHttpServerInterpreter().toRoute(List(echo))) + .serverLogic[Future](cid => Future.successful(Right(cid))) + Endpoint.endpointsToRoute(List(echo)) } "HttpCorrelation.withCorrelation" should { @@ -97,19 +98,26 @@ class HttpCorrelationSpec responseAs[String] shouldBe "mdc-1" } } + + "be re-entrant: nesting produces a single echoed header" in { + val nested = withCorrelation(withCorrelation(path("ping")(get(complete("pong"))))) + Get("/ping") ~> nested ~> check { + headers.count(_.is(HeaderName.toLowerCase)) shouldBe 1 + responseAs[String] shouldBe "pong" + } + } } - "HttpCorrelation.correlationInput" should { - "deliver the (directive-injected) id to a tapir serverLogic as data" in { + "HttpCorrelation.correlationInput (self-sufficient via Endpoint.endpointsToRoute)" should { + "generate a default id, deliver it to serverLogic AND echo it — with no outer directive" in { Get("/echo") ~> tapirRoute ~> check { val delivered = responseAs[String] - delivered should not be "none" delivered should fullyMatch regex UuidRe header(HeaderName).map(_.value) shouldBe Some(delivered) } } - "deliver a client-supplied id to a tapir serverLogic" in { + "deliver a client-supplied id to serverLogic and echo it unchanged" in { Get("/echo") ~> addHeader(RawHeader(HeaderName, "tapir-9")) ~> tapirRoute ~> check { responseAs[String] shouldBe "tapir-9" header(HeaderName).map(_.value) shouldBe Some("tapir-9")