diff --git a/core/src/main/scala/app/softnetwork/persistence/message/package.scala b/core/src/main/scala/app/softnetwork/persistence/message/package.scala index 7816966..0dd3873 100644 --- a/core/src/main/scala/app/softnetwork/persistence/message/package.scala +++ b/core/src/main/scala/app/softnetwork/persistence/message/package.scala @@ -153,11 +153,13 @@ package object message { */ trait AuditableCommand extends Command with Auditable { + type T <: AuditableCommand + var correlationId: Option[String] = None - override def withCorrelationId(correlationId: String): AuditableCommand = { + override def withCorrelationId(correlationId: String): T = { this.correlationId = Some(correlationId) - this + this.asInstanceOf[T] } } diff --git a/core/src/main/scala/app/softnetwork/persistence/service/Service.scala b/core/src/main/scala/app/softnetwork/persistence/service/Service.scala index b612e0d..fa15fc3 100644 --- a/core/src/main/scala/app/softnetwork/persistence/service/Service.scala +++ b/core/src/main/scala/app/softnetwork/persistence/service/Service.scala @@ -2,7 +2,7 @@ package app.softnetwork.persistence.service import akka.actor.typed.ActorSystem import app.softnetwork.persistence.typed.scaladsl.Patterns -import app.softnetwork.persistence.message.{Command, CommandResult} +import app.softnetwork.persistence.message.{AuditableCommand, Command, CommandResult} import scala.concurrent.{ExecutionContext, Future} import scala.reflect.ClassTag @@ -19,4 +19,21 @@ trait Service[C <: Command, R <: CommandResult] { _: Patterns[C, R] => this ?? (entityId, command) } + /** Story 13.7 — the single shared seam for threading a request/flow `correlationId` onto a + * command before dispatch. HTTP `serverLogic` (tapir) runs in a `Future` where MDC does not + * survive (C14), so the id must travel as DATA: the endpoint reads it via + * `HttpCorrelation.correlationInput` and calls this. No-op for commands that are not + * [[AuditableCommand]]. The id then rides the command across the cluster-sharding boundary + * (Kryo) and the handler journals it onto the event's `correlation_id` proto field. + */ + def runCorrelated(entityId: String, command: C, correlationId: String)(implicit + tTag: ClassTag[C] + ): Future[R] = { + if (correlationId.nonEmpty) command match { + case a: AuditableCommand => a.withCorrelationId(correlationId) + case _ => + } + run(entityId, command) + } + }