feat(cmd-queue): add CommandResult.handedOff() for cross-queue re-enqueue#60

Open
pditommaso wants to merge 9 commits into master from feat/command-result-handed-off

pditommaso commented Apr 20, 2026

Summary

Adds CommandResult.handedOff() so a handler can end a command's life on its current queue and continue it on a different queue. Supports the sched monitor-stream design (seqeralabs/sched#303).

Two accompanying breaking changes remove library-level single-queue assumptions so applications can wire multiple CommandService / MessageStream instances (one per queue) without fighting default beans.

What changes

lib-cmd-queue-redis

  • CommandStatus.HANDED_OFF — new non-terminal pseudo-status. Used only as a CommandResult marker; never written to CommandState.
  • CommandResult.handedOff() — new factory. Signals: "I've moved this command to another queue (e.g. via otherQueue.submit(...)). ACK the source without running the normal terminal-result transition."
  • CommandServiceImpl.processCommandWithHandler — one additional branch on HANDED_OFF: transition the persisted state to RUNNING (via state.started()) if it isn't already, then return true so the source is ACKed. The SUBMITTED → RUNNING transition is required so the destination queue's consumer dispatches to checkStatus() rather than re-running execute() — otherwise the non-idempotent initial work would re-fire on every poll, producing an infinite execute → handedOff loop.
  • Breaking — CommandServiceImpl is no longer @Singleton. Class annotations removed and field @Injects replaced with a public constructor. Each application must now wire its own CommandService bean(s) via a @Factory. Single-queue apps declare one factory method; multi-queue apps declare one per queue, all sharing the same CommandStateStore.
  • Test scaffolding: TestCommandQueueFactory now also produces the CommandService bean; new TestRedisStreamConfig provides the config bean required by the new @EachBean wiring (see below).
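
The HANDED_OFF branch described above can be sketched roughly as follows. This is a minimal, self-contained illustration, not the library's code: `Status`, `State`, `HandOffBranch` and the `writes` counter are hypothetical stand-ins for `CommandStatus`, `CommandState` and `processCommandWithHandler`, and returning `false` for a still-active result is an assumption.

```java
// Stand-in types for illustration only; the real CommandStatus / CommandState differ.
enum Status {
    SUBMITTED, RUNNING, SUCCEEDED, FAILED, CANCELLED, HANDED_OFF;

    // Only these three are terminal; HANDED_OFF is a non-terminal marker.
    boolean isTerminal() {
        return this == SUCCEEDED || this == FAILED || this == CANCELLED;
    }
}

final class State {
    private Status status = Status.SUBMITTED;
    int writes = 0; // counts simulated writes to the state store

    Status status() { return status; }

    void started() { status = Status.RUNNING; writes++; }

    void completed(Status terminal) { status = terminal; writes++; }
}

final class HandOffBranch {
    /** Returns true when the source message should be ACKed. */
    static boolean process(State state, Status result) {
        if (result == Status.HANDED_OFF) {
            // Persist RUNNING only when needed, so the destination consumer
            // dispatches to checkStatus() and repeated hops skip a redundant write.
            if (state.status() != Status.RUNNING) {
                state.started();
            }
            return true; // ACK the source without a terminal transition
        }
        if (result.isTerminal()) {
            state.completed(result);
            return true;
        }
        return false; // assumed: still active on this queue, leave it pending
    }
}
```

Calling `HandOffBranch.process(state, Status.HANDED_OFF)` twice on the same `State` leaves it in RUNNING after a single write, which mirrors the guard on `state.status() != RUNNING`.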

lib-data-stream-redis (bumped to 1.4.0)

  • Breaking — RedisMessageStream and LocalMessageStream are no longer auto-registered beans. All Micronaut class-level annotations (@Singleton, @Requires, @Inject, @PostConstruct) removed; both are now plain classes with public constructors.
  • New DefaultRedisMessageStream / DefaultLocalMessageStream — thin Micronaut-managed subclasses, each annotated @EachBean(RedisStreamConfig.class) plus @Requires(bean = RedisActivator.class) / @Requires(missingBeans = RedisActivator.class) respectively. One MessageStream bean is now produced per RedisStreamConfig bean in the context, inheriting the same qualifier — so multi-queue apps declare multiple named configs (e.g. via @EachProperty) and get named streams automatically, no hand-written per-queue factories required.
  • Migration — applications that previously injected MessageStream<String> unqualified must declare at least one RedisStreamConfig bean so @EachBean produces a stream.

READMEs

Both modules' READMEs updated with the breaking-change callout, migration examples, and the hand-off pattern including the idempotency caveat (non-atomic submit then ACK — handlers must tolerate replay).

Why this shape

An earlier draft proposed CommandResult.activeOnStream(dst) with an attachQueue / routing-registry / stream-name lookup inside the library. That conflated two genuinely different outcomes ("still active here" vs "done here, continue over there") and baked routing into the service. This version keeps routing out of the library entirely: handlers directly inject the target queue and call submit(...) themselves, and the framework just knows when to ACK without marking the command done.

Moving both CommandServiceImpl and the MessageStream implementations off @Singleton is the matching structural change: the previous class-level bean annotations encoded a one-queue-per-app assumption that has to be relaxed for hand-off to make sense.

Handler usage

@Inject @Named("monitor") CommandQueue monitorQueue;

@Override
public CommandResult<MyResult> execute(Command<MyParams> command) {
    backend.launch(...);
    monitorQueue.submit(CommandMsg.of(command.id(), command.type()));
    return CommandResult.handedOff();
}

Idempotency contract

Hand-off is not atomic — the destination submit happens before the source ACK. If the consumer crashes between the two, the source is redelivered and the handler runs again. Handlers must tolerate replay, typically by persisting an intermediate state before non-idempotent side-effects so the replay can skip them. On the destination side, the SUBMITTED → RUNNING transition performed by the framework ensures subsequent consumer passes dispatch to checkStatus() rather than re-invoking execute().
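
One way a handler might tolerate replay is sketched below, assuming a durable checkpoint store keyed by command id. Everything here is hypothetical: `ReplaySafeHandler`, the in-memory `checkpoints` map (standing in for a persistent store) and the counters that stand in for `backend.launch(...)` and `monitorQueue.submit(...)`.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

final class ReplaySafeHandler {
    // Hypothetical durable store; an in-memory map keeps the sketch runnable.
    private final Map<String, String> checkpoints = new ConcurrentHashMap<>();

    final AtomicInteger launches = new AtomicInteger(); // stands in for backend.launch(...)
    final AtomicInteger submits = new AtomicInteger();  // stands in for monitorQueue.submit(...)

    String execute(String commandId) {
        // putIfAbsent returns null only for the first delivery of this command,
        // so the non-idempotent launch is skipped when the source is redelivered.
        if (checkpoints.putIfAbsent(commandId, "LAUNCHING") == null) {
            launches.incrementAndGet();
        }
        // The submit is repeated on replay; the destination handler is assumed
        // to tolerate a duplicate message for the same command id.
        submits.incrementAndGet();
        return "HANDED_OFF";
    }
}
```

Writing the checkpoint before the side effect gives at-most-once launch semantics: a crash between the checkpoint and the launch would need separate reconciliation, which this sketch does not attempt.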

Test plan

  • New CommandResultHandedOffTest covers: handedOff() produces a non-terminal HANDED_OFF result; HANDED_OFF is not terminal; only SUCCEEDED / FAILED / CANCELLED are terminal; state.started() correctly drives SUBMITTED → RUNNING before ACK.
  • Existing test suites pass (:lib-cmd-queue-redis:test, :lib-data-stream-redis:test) after the DI reshape, with TestCommandQueueFactory and TestRedisStreamConfig providing the beans the library no longer self-registers.
  • Integration verification in the downstream sched PR: two CommandService beans wired from named RedisStreamConfigs, handlers registered on both, hand-off routes correctly.

🤖 Generated with Claude Code

pditommaso and others added 3 commits April 20, 2026 13:04
…other queue

Supports the sched monitor-stream design (seqeralabs/sched#303). A handler
that has re-submitted its command to another queue (via a directly-injected
CommandQueue) can return CommandResult.handedOff() to tell the framework to
ACK the source message without touching the persisted command state. The
command stays in RUNNING; a handler on the destination queue will drive it
to a terminal status.

Changes:
- CommandStatus.HANDED_OFF: pseudo-status, non-terminal. Used only as a
  CommandResult marker; never persisted to CommandState.
- CommandResult.handedOff(): new factory.
- CommandServiceImpl.processCommandWithHandler: on HANDED_OFF, log and
  return true so the source is ACKed without state transition.
- CommandServiceImpl now uses constructor injection, enabling consumers to
  produce additional @Named CommandService beans (one per queue) via a
  Factory while the default @Singleton continues to work unchanged.

No existing behaviour is changed: active() and running() are untouched,
RUNNING enum value remains the canonical non-terminal state for persisted
CommandState, no public API is widened.

Handler usage:

    @Inject @Named("monitor") CommandQueue monitorQueue;

    computeBackend.launch(...);
    monitorQueue.submit(CommandMsg.of(command.id(), command.type()));
    return CommandResult.handedOff();

Idempotency: if the process crashes between queue.submit(dst) and the
source ACK, the source message is redelivered; handlers are expected to
be idempotent under re-execution (e.g. by persisting intermediate state
before side-effects).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: Paolo Di Tommaso <paolo.ditommaso@gmail.com>
Enables applications that need multiple stream instances with independent
RedisStreamConfig (different claim timeouts, consumer groups) to construct
additional instances via a @Factory. Default @Singleton bean continues to
work unchanged; Micronaut uses the constructor for DI.

No behavioural change; tests pass.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: Paolo Di Tommaso <paolo.ditommaso@gmail.com>
lib-cmd-queue-redis: document CommandResult.handedOff() and how to wire
multiple CommandService instances with independent queues. Include runnable
@Factory example, handler usage, and the idempotency caveat (non-atomic
submit-then-ACK).

lib-data-stream-redis: document how to construct additional RedisMessageStream
instances via @Factory for applications that need per-stream configs
(different claim timeouts, consumer groups).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: Paolo Di Tommaso <paolo.ditommaso@gmail.com>
pditommaso requested a review from jordeu April 20, 2026 12:11
pditommaso and others added 6 commits April 20, 2026 22:17
Strip @Singleton/@Requires from RedisMessageStream and LocalMessageStream
so they can be instantiated directly. Move bean wiring into new
DefaultRedisMessageStream / DefaultLocalMessageStream subclasses annotated
with @EachBean(RedisStreamConfig.class), so apps get one stream per config
bean without hand-written factories.

Update lib-cmd-queue-redis tests to provide a RedisStreamConfig bean now
that MessageStream wiring requires one. Bump lib-data-stream-redis to 1.4.0
and document the breaking change plus @EachBean wiring in the README.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…ANDED_OFF rationale

- Merge the two consecutive javadoc blocks on CommandServiceImpl into one.
  The previous state had a legacy "Implementation of the command service..."
  block followed by a new "NOTE: not annotated with @Singleton" block;
  javac attaches only the last javadoc to the class, so the legacy
  description was being dropped. Unify into a single block covering both
  the wiring contract and the processing flow.

- Expand the rationale comment on the HANDED_OFF branch. Explain numbered
  steps (persist RUNNING, then ACK) and why the state.status() != RUNNING
  guard avoids a redundant Redis write when a command hops queues more
  than once over its lifetime.

No behavioural change.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: Paolo Di Tommaso <paolo.ditommaso@gmail.com>
Replaces the handler-orchestrated hand-off protocol with a declarative
framework primitive. Handlers now just return CommandResult.handedOff();
routing is configured once via CommandService.handoffTo(target). The
actual transition runs inside one Redis MULTI — XACK + XDEL on the
source stream plus XADD on the destination — so there is no observable
duplicate-message window under crash between the two operations.

lib-data-stream-redis:
- New TxContext<T> public interface. A MessageConsumer receives it per
  message and can call ctx.offer(dstStreamId, message) to queue an XADD
  that runs inside the same MULTI as the source ACK. MessageConsumer's
  signature changes accordingly (breaking).
- New impl-private TxContextCollector (lazy-init list of PendingOffer)
  shared by RedisMessageStream and LocalMessageStream. Empty-offers
  path stays allocation-free.
- RedisMessageStream.consume drains the collected offers into the
  existing MULTI.
- LocalMessageStream: delegate map is now static (shared across every
  in-JVM instance) — without that, the @EachBean wiring produced
  isolated maps so a cross-stream offer on one instance could never
  reach a consumer living on another. init/offer use computeIfAbsent
  so hand-offs that fire before the destination consumer starts don't
  NPE.
- AbstractMessageStream.processMessage threads a typed TxContext<M>
  wrapping the raw String context with the stream's encoder.
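
The collect-then-commit idea can be sketched as below, assuming a per-message collector whose pending offers are drained into the same transaction as the source ACK. All type names (`TxContextSketch`, `PendingOfferSketch`, `OfferCollector`, `TxPlan`) are hypothetical, and the returned strings are descriptive labels for the queued steps, not literal Redis commands.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for the TxContext<T> interface named in this commit.
interface TxContextSketch<T> {
    void offer(String dstStreamId, T message);
}

final class PendingOfferSketch<T> {
    final String streamId;
    final T message;

    PendingOfferSketch(String streamId, T message) {
        this.streamId = streamId;
        this.message = message;
    }
}

final class OfferCollector<T> implements TxContextSketch<T> {
    private List<PendingOfferSketch<T>> offers; // stays null until the first offer

    @Override
    public void offer(String dstStreamId, T message) {
        if (offers == null) {
            offers = new ArrayList<>();
        }
        offers.add(new PendingOfferSketch<>(dstStreamId, message));
    }

    List<PendingOfferSketch<T>> pending() {
        // The shared empty list keeps the no-offer path allocation-free.
        return offers == null ? Collections.emptyList() : offers;
    }
}

final class TxPlan {
    /** Lists the steps that would be queued inside one MULTI for a consumed message. */
    static <T> List<String> plan(String srcStream, String messageId, OfferCollector<T> ctx) {
        List<String> steps = new ArrayList<>();
        steps.add("XACK " + srcStream + " " + messageId);
        steps.add("XDEL " + srcStream + " " + messageId);
        for (PendingOfferSketch<T> offer : ctx.pending()) {
            steps.add("XADD " + offer.streamId + " " + offer.message);
        }
        return steps;
    }
}
```

A consumer that calls `ctx.offer("monitor", msg)` yields three steps for that message; one that offers nothing yields only the ACK and delete on the source.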

lib-cmd-queue-redis:
- New CommandService.handoffTo(CommandService target). Must be called
  before start(). On HANDED_OFF, the framework persists state as
  RUNNING and queues the XADD via the supplied TxContext.
- CommandService.queueStreamName() added to the public interface so
  handoffTo doesn't need an instanceof cast to the impl.
- processCommand / processCommandWithHandler thread TxContext<CommandMsg>.
- HANDED_OFF-with-no-target is logged at ERROR and marks the command
  FAILED (config misuse).
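
The routing contract in the list above might look roughly like this. It is a sketch under stated assumptions: `QueueService` is a hypothetical stand-in for `CommandService`, throwing `IllegalStateException` when `handoffTo` is called after `start()` is an assumed way of enforcing the ordering rule, and the `offered` list stands in for the XADD queued through `TxContext`.

```java
import java.util.ArrayList;
import java.util.List;

final class QueueService {
    private final String streamName;
    private QueueService target;
    private boolean started;

    // Records "dstStream:commandId" entries in place of a queued XADD.
    final List<String> offered = new ArrayList<>();

    QueueService(String streamName) {
        this.streamName = streamName;
    }

    String queueStreamName() {
        return streamName;
    }

    void handoffTo(QueueService target) {
        if (started) {
            // Assumed enforcement of "must be called before start()".
            throw new IllegalStateException("handoffTo must be called before start()");
        }
        this.target = target;
    }

    void start() {
        started = true;
    }

    /** Returns the status persisted when a handler returns HANDED_OFF. */
    String onHandedOff(String commandId) {
        if (target == null) {
            return "FAILED"; // configuration misuse: no hand-off target wired
        }
        offered.add(target.queueStreamName() + ":" + commandId);
        return "RUNNING";
    }
}
```

Wiring is then a single call per source queue, for example `exec.handoffTo(monitor)` followed by `exec.start()`.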

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: Paolo Di Tommaso <paolo.ditommaso@gmail.com>
Signed-off-by: Paolo Di Tommaso <paolo.ditommaso@gmail.com>
The Redis-backed tiered cache test uses a 150ms entry TTL that the test
itself exercises twice: once implicitly (put, then several assertions,
then a get() that expects the entry to still be live) and once explicitly
(sleep TTL*2, then get() that expects null). 150ms is not enough slack
for a loaded CI runner — testcontainers Redis startup + Micronaut context
startup + the Spock assertion machinery has been observed to eat that
window and fail the live-entry assertion non-deterministically.

Cache-tiered tests had been passing from Gradle's remote task cache in CI
(FROM-CACHE hits on :lib-cache-tiered-redis:test in recent successful runs
on master); the flake surfaced when an unrelated buildSrc edit invalidated
the cache and forced the test to actually execute.

Bump TTL to 2s for all four occurrences. The expiration branch still runs
in 2*TTL = 4s — acceptable, and the live-entry assertions are now robust
under CI jitter.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: Paolo Di Tommaso <paolo.ditommaso@gmail.com>
The prior commit made LocalMessageStream's backing map static so that
multiple @EachBean-produced instances could see the same set of streams
(needed for atomic cross-stream hand-off via TxContext on non-Redis
environments). But static state is JVM-scoped, which meant the map
survived across Micronaut test contexts in the same JVM — causing
lingering listener threads and residual messages from earlier tests to
interfere with later tests. CI hit a 15-minute timeout on one such
hung test.

Replace the static map with an injectable LocalMessageStreamStore
@Singleton: one store bean per Micronaut context, shared by every
DefaultLocalMessageStream bean in that context. Test isolation is
restored (each @MicronautTest gets its own store) while the
cross-stream hand-off still works for multi-queue applications (all
DefaultLocalMessageStream instances in one context point at the
same backing map).

LocalMessageStream gains a second constructor taking the map
explicitly; the no-arg constructor still creates an isolated instance
for programmatic / unit-test use.
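
The two-constructor shape can be illustrated with a small sketch. `LocalStreamSketch` is a hypothetical stand-in for `LocalMessageStream`; the map passed to the second constructor plays the role of the backing map owned by the per-context store bean.

```java
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

final class LocalStreamSketch {
    private final Map<String, Queue<String>> streams;

    // No-arg constructor: isolated instance for programmatic or unit-test use.
    LocalStreamSketch() {
        this(new ConcurrentHashMap<>());
    }

    // Shared-map constructor: every instance given the same map sees the same streams.
    LocalStreamSketch(Map<String, Queue<String>> sharedStreams) {
        this.streams = sharedStreams;
    }

    void offer(String streamId, String message) {
        // computeIfAbsent lets an offer arrive before any consumer registered the stream.
        streams.computeIfAbsent(streamId, k -> new ConcurrentLinkedQueue<>()).add(message);
    }

    String poll(String streamId) {
        Queue<String> queue = streams.get(streamId);
        return queue == null ? null : queue.poll();
    }
}
```

Two instances built over one map can exchange messages across streams, while instances created with the no-arg constructor stay isolated from each other, which is the property that restores test isolation.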

Also raise .github/workflows/build.yml timeout-minutes from 15 to 30.
The 15-minute ceiling was marginal once Gradle's remote test cache
gets invalidated by unrelated buildSrc changes, forcing every test to
actually execute. 30m gives comfortable headroom for cache-cold runs.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: Paolo Di Tommaso <paolo.ditommaso@gmail.com>