feat(cmd-queue): add CommandResult.handedOff() for cross-queue re-enqueue#60
Open
pditommaso wants to merge 9 commits into
Open
feat(cmd-queue): add CommandResult.handedOff() for cross-queue re-enqueue#60pditommaso wants to merge 9 commits into
pditommaso wants to merge 9 commits into
Conversation
…other queue Supports the sched monitor-stream design (seqeralabs/sched#303). A handler that has re-submitted its command to another queue (via a directly-injected CommandQueue) can return CommandResult.handedOff() to tell the framework to ACK the source message without touching the persisted command state. The command stays in RUNNING; a handler on the destination queue will drive it to a terminal status. Changes: - CommandStatus.HANDED_OFF: pseudo-status, non-terminal. Used only as a CommandResult marker; never persisted to CommandState. - CommandResult.handedOff(): new factory. - CommandServiceImpl.processCommandWithHandler: on HANDED_OFF, log and return true so the source is ACKed without state transition. - CommandServiceImpl now uses constructor injection, enabling consumers to produce additional @nAmed CommandService beans (one per queue) via a Factory while the default @singleton continues to work unchanged. No existing behaviour is changed: active() and running() are untouched, RUNNING enum value remains the canonical non-terminal state for persisted CommandState, no public API is widened. Handler usage: @Inject @nAmed("monitor") CommandQueue monitorQueue; computeBackend.launch(...); monitorQueue.submit(CommandMsg.of(command.id(), command.type())); return CommandResult.handedOff(); Idempotency: if the process crashes between queue.submit(dst) and the source ACK, the source message is redelivered; handlers are expected to be idempotent under re-execution (e.g. by persisting intermediate state before side-effects). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> Signed-off-by: Paolo Di Tommaso <paolo.ditommaso@gmail.com>
Enables applications that need multiple stream instances with independent RedisStreamConfig (different claim timeouts, consumer groups) to construct additional instances via a @factory. Default @singleton bean continues to work unchanged — Micronaut uses the constructor for DI. No behavioural change; tests pass. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> Signed-off-by: Paolo Di Tommaso <paolo.ditommaso@gmail.com>
lib-cmd-queue-redis: document CommandResult.handedOff() and how to wire multiple CommandService instances with independent queues. Include runnable @factory example, handler usage, and the idempotency caveat (non-atomic submit-then-ACK). lib-data-stream-redis: document how to construct additional RedisMessageStream instances via @factory for applications that need per-stream configs (different claim timeouts, consumer groups). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> Signed-off-by: Paolo Di Tommaso <paolo.ditommaso@gmail.com>
jordeu
approved these changes
Apr 20, 2026
Strip @Singleton/@requires from RedisMessageStream and LocalMessageStream so they can be instantiated directly. Move bean wiring into new DefaultRedisMessageStream / DefaultLocalMessageStream subclasses annotated with @EachBean(RedisStreamConfig.class), so apps get one stream per config bean without hand-written factories. Update lib-cmd-queue-redis tests to provide a RedisStreamConfig bean now that MessageStream wiring requires one. Bump lib-data-stream-redis to 1.4.0 and document the breaking change plus @EachBean wiring in the README. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…ANDED_OFF rationale - Merge the two consecutive javadoc blocks on CommandServiceImpl into one. The previous state had a legacy "Implementation of the command service..." block followed by a new "NOTE: not annotated with @singleton" block — javac attaches only the last javadoc to the class, so the legacy description was being dropped. Unify into a single block covering both the wiring contract and the processing flow. - Expand the rationale comment on the HANDED_OFF branch. Explain numbered steps (persist RUNNING, then ACK) and why the state.status() != RUNNING guard avoids a redundant Redis write when a command hops queues more than once over its lifetime. No behavioural change. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> Signed-off-by: Paolo Di Tommaso <paolo.ditommaso@gmail.com>
Replaces the handler-orchestrated hand-off protocol with a declarative framework primitive. Handlers now just return CommandResult.handedOff(); routing is configured once via CommandService.handoffTo(target). The actual transition runs inside one Redis MULTI — XACK + XDEL on the source stream plus XADD on the destination — so there is no observable duplicate-message window under crash between the two operations. lib-data-stream-redis: - New TxContext<T> public interface. A MessageConsumer receives it per message and can call ctx.offer(dstStreamId, message) to queue an XADD that runs inside the same MULTI as the source ACK. MessageConsumer's signature changes accordingly (breaking). - New impl-private TxContextCollector (lazy-init list of PendingOffer) shared by RedisMessageStream and LocalMessageStream. Empty-offers path stays allocation-free. - RedisMessageStream.consume drains the collected offers into the existing MULTI. - LocalMessageStream: delegate map is now static (shared across every in-JVM instance) — without that, the @EachBean wiring produced isolated maps so a cross-stream offer on one instance could never reach a consumer living on another. init/offer use computeIfAbsent so hand-offs that fire before the destination consumer starts don't NPE. - AbstractMessageStream.processMessage threads a typed TxContext<M> wrapping the raw String context with the stream's encoder. lib-cmd-queue-redis: - New CommandService.handoffTo(CommandService target). Must be called before start(). On HANDED_OFF, the framework persists state as RUNNING and queues the XADD via the supplied TxContext. - CommandService.queueStreamName() added to the public interface so handoffTo doesn't need an instanceof cast to the impl. - processCommand / processCommandWithHandler thread TxContext<CommandMsg>. - HANDED_OFF-with-no-target is logged at ERROR and marks the command FAILED (config misuse). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> Signed-off-by: Paolo Di Tommaso <paolo.ditommaso@gmail.com>
Signed-off-by: Paolo Di Tommaso <paolo.ditommaso@gmail.com>
The Redis-backed tiered cache test uses a 150ms entry TTL that the test itself exercises twice: once implicitly (put, then several assertions, then a get() that expects the entry to still be live) and once explicitly (sleep TTL*2, then get() that expects null). 150ms is not enough slack for a loaded CI runner — testcontainers Redis startup + Micronaut context startup + the Spock assertion machinery has been observed to eat that window and fail the live-entry assertion non-deterministically. Cache-tiered tests had been passing from Gradle's remote task cache in CI (FROM-CACHE hits on :lib-cache-tiered-redis:test in recent successful runs on master); the flake surfaced when an unrelated buildSrc edit invalidated the cache and forced the test to actually execute. Bump TTL to 2s for all four occurrences. The expiration branch still runs in 2*TTL = 4s — acceptable, and the live-entry assertions are now robust under CI jitter. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> Signed-off-by: Paolo Di Tommaso <paolo.ditommaso@gmail.com>
The prior commit made LocalMessageStream's backing map static so that multiple @EachBean-produced instances could see the same set of streams (needed for atomic cross-stream hand-off via TxContext on non-Redis environments). But static state is JVM-scoped, which meant the map survived across Micronaut test contexts in the same JVM — causing lingering listener threads and residual messages from earlier tests to interfere with later tests. CI hit a 15-minute timeout on one such hung test. Replace the static map with an injectable LocalMessageStreamStore @singleton: one store bean per Micronaut context, shared by every DefaultLocalMessageStream bean in that context. Test isolation is restored (each @MicronautTest gets its own store) while the cross-stream hand-off still works for multi-queue applications (all DefaultLocalMessageStream instances in one context point at the same backing map). LocalMessageStream gains a second constructor taking the map explicitly; the no-arg constructor still creates an isolated instance for programmatic / unit-test use. Also raise .github/workflows/build.yml timeout-minutes from 15 to 30. The 15-minute ceiling was marginal once Gradle's remote test cache gets invalidated by unrelated buildSrc changes, forcing every test to actually execute. 30m gives comfortable headroom for cache-cold runs. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> Signed-off-by: Paolo Di Tommaso <paolo.ditommaso@gmail.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Adds
CommandResult.handedOff()so a handler can end a command's life on its current queue and continue it on a different queue. Supports the sched monitor-stream design (seqeralabs/sched#303).Two accompanying breaking changes remove library-level single-queue assumptions so applications can wire multiple
CommandService/MessageStreaminstances (one per queue) without fighting default beans.What changes
lib-cmd-queue-redisCommandStatus.HANDED_OFF— new non-terminal pseudo-status. Used only as aCommandResultmarker; never written toCommandState.CommandResult.handedOff()— new factory. Signals: "I've moved this command to another queue (e.g. viaotherQueue.submit(...)). ACK the source without running the normal terminal-result transition."CommandServiceImpl.processCommandWithHandler— one additional branch onHANDED_OFF: transition the persisted state toRUNNING(viastate.started()) if it isn't already, then returntrueso the source is ACKed. TheSUBMITTED → RUNNINGtransition is required so the destination queue's consumer dispatches tocheckStatus()rather than re-runningexecute()— otherwise the non-idempotent initial work would re-fire on every poll, producing an infiniteexecute → handedOffloop.CommandServiceImplis no longer@Singleton. Class annotations removed and field@Injects replaced with a public constructor. Each application must now wire its ownCommandServicebean(s) via a@Factory. Single-queue apps declare one factory method; multi-queue apps declare one per queue, all sharing the sameCommandStateStore.TestCommandQueueFactorynow also produces theCommandServicebean; newTestRedisStreamConfigprovides the config bean required by the new@EachBeanwiring (see below).lib-data-stream-redis(bumped to1.4.0)RedisMessageStreamandLocalMessageStreamare no longer auto-registered beans. All Micronaut class-level annotations (@Singleton,@Requires,@Inject,@PostConstruct) removed; both are now plain classes with public constructors.DefaultRedisMessageStream/DefaultLocalMessageStream— thin Micronaut-managed subclasses, each annotated@EachBean(RedisStreamConfig.class)plus@Requires(bean = RedisActivator.class)/@Requires(missingBeans = RedisActivator.class)respectively. OneMessageStreambean is now produced perRedisStreamConfigbean in the context, inheriting the same qualifier — so multi-queue apps declare multiple named configs (e.g. via@EachProperty) and get named streams automatically, no hand-written per-queue factories required.MessageStream<String>unqualified must declare at least oneRedisStreamConfigbean so@EachBeanproduces a stream.READMEs
Both modules' READMEs updated with the breaking-change callout, migration examples, and the hand-off pattern including the idempotency caveat (non-atomic
submitthenACK— handlers must tolerate replay).Why this shape
An earlier draft proposed
CommandResult.activeOnStream(dst)with anattachQueue/ routing-registry / stream-name lookup inside the library. That conflated two genuinely different outcomes ("still active here" vs "done here, continue over there") and baked routing into the service. This version keeps routing out of the library entirely: handlers directly inject the target queue and callsubmit(...)themselves, and the framework just knows when to ACK without marking the command done.Moving both
CommandServiceImpland theMessageStreamimplementations off@Singletonis the matching structural change: the previous class-level bean annotations encoded a one-queue-per-app assumption that has to be relaxed for hand-off to make sense.Handler usage
Idempotency contract
Hand-off is not atomic — the destination
submithappens before the source ACK. If the consumer crashes between the two, the source is redelivered and the handler runs again. Handlers must tolerate replay, typically by persisting an intermediate state before non-idempotent side-effects so the replay can skip them. On the destination side, theSUBMITTED → RUNNINGtransition performed by the framework ensures subsequent consumer passes dispatch tocheckStatus()rather than re-invokingexecute().Test plan
CommandResultHandedOffTestcovers:handedOff()produces a non-terminalHANDED_OFFresult;HANDED_OFFis not terminal; onlySUCCEEDED/FAILED/CANCELLEDare terminal;state.started()correctly drivesSUBMITTED → RUNNINGbefore ACK.:lib-cmd-queue-redis:test,:lib-data-stream-redis:test) after the DI reshape, withTestCommandQueueFactoryandTestRedisStreamConfigproviding the beans the library no longer self-registers.CommandServicebeans wired from namedRedisStreamConfigs, handlers registered on both, hand-off routes correctly.🤖 Generated with Claude Code