Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -153,11 +153,13 @@ package object message {
*/
trait AuditableCommand extends Command with Auditable {

type T <: AuditableCommand

var correlationId: Option[String] = None

override def withCorrelationId(correlationId: String): AuditableCommand = {
override def withCorrelationId(correlationId: String): T = {
this.correlationId = Some(correlationId)
this
this.asInstanceOf[T]
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package app.softnetwork.persistence.service

import akka.actor.typed.ActorSystem
import app.softnetwork.persistence.typed.scaladsl.Patterns
import app.softnetwork.persistence.message.{Command, CommandResult}
import app.softnetwork.persistence.message.{AuditableCommand, Command, CommandResult}

import scala.concurrent.{ExecutionContext, Future}
import scala.reflect.ClassTag
Expand All @@ -19,4 +19,21 @@ trait Service[C <: Command, R <: CommandResult] { _: Patterns[C, R] =>
this ?? (entityId, command)
}

/** Story 13.7 — the single shared seam for threading a request/flow `correlationId` onto a
* command before dispatch. HTTP `serverLogic` (tapir) runs in a `Future` where MDC does not
* survive (C14), so the id must travel as DATA: the endpoint reads it via
* `HttpCorrelation.correlationInput` and calls this. No-op for commands that are not
* [[AuditableCommand]]. The id then rides the command across the cluster-sharding boundary
* (Kryo) and the handler journals it onto the event's `correlation_id` proto field.
*/
def runCorrelated(entityId: String, command: C, correlationId: String)(implicit
tTag: ClassTag[C]
): Future[R] = {
if (correlationId.nonEmpty) command match {
case a: AuditableCommand => a.withCorrelationId(correlationId)
case _ =>
}
run(entityId, command)
}

}
Loading